YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/outbound_data.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_RPC_OUTBOUND_DATA_H
15
#define YB_RPC_OUTBOUND_DATA_H
16
17
#include <float.h>
18
#include <stdint.h>
19
20
#include <chrono>
21
#include <memory>
22
#include <sstream>
23
#include <string>
24
#include <type_traits>
25
26
#include <boost/container/small_vector.hpp>
27
#include <boost/mpl/and.hpp>
28
29
#include "yb/util/format.h"
30
#include "yb/util/memory/memory_usage.h"
31
#include "yb/util/ref_cnt_buffer.h"
32
#include "yb/util/tostring.h"
33
#include "yb/util/type_traits.h"
34
35
namespace yb {
36
37
class Status;
38
39
namespace rpc {
40
41
class Connection;
42
class DumpRunningRpcsRequestPB;
43
class RpcCallInProgressPB;
44
45
// Interface for outbound transfers from the RPC framework. Implementations include:
46
// - RpcCall
47
// - LocalOutboundCall
48
// - ConnectionHeader
49
// - ServerEventList
50
class OutboundData : public std::enable_shared_from_this<OutboundData> {
51
 public:
52
  virtual void Transferred(const Status& status, Connection* conn) = 0;
53
54
  // Serializes the data to be sent out via the RPC framework.
55
  virtual void Serialize(boost::container::small_vector_base<RefCntBuffer>* output) = 0;
56
57
  virtual std::string ToString() const = 0;
58
59
  virtual bool DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp) = 0;
60
61
25.0M
  virtual bool IsFinished() const { return false; }
62
63
44.1M
  virtual bool IsHeartbeat() const { return false; }
64
65
  virtual size_t ObjectSize() const = 0;
66
67
  virtual size_t DynamicMemoryUsage() const = 0;
68
69
55.4M
  virtual ~OutboundData() {}
70
};
71
72
typedef std::shared_ptr<OutboundData> OutboundDataPtr;
73
74
class StringOutboundData : public OutboundData {
75
 public:
76
  StringOutboundData(const std::string& data, const std::string& name)
77
30
      : buffer_(data), name_(name) {}
78
  StringOutboundData(const char* data, size_t len, const std::string& name)
79
19.6k
      : buffer_(data, len), name_(name) {}
80
1.07M
  void Transferred(const Status& status, Connection* conn) override {}
81
82
  // Serializes the data to be sent out via the RPC framework.
83
581k
  void Serialize(boost::container::small_vector_base<RefCntBuffer>* output) override {
84
581k
    output->push_back(buffer_);
85
581k
  }
86
87
8
  std::string ToString() const override { return name_; }
88
89
0
  bool DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp) override {
90
0
    return false;
91
0
  }
92
93
785k
  size_t ObjectSize() const override { return sizeof(*this); }
94
95
785k
  size_t DynamicMemoryUsage() const override { return DynamicMemoryUsageOf(name_, buffer_); }
96
97
 private:
98
  RefCntBuffer buffer_;
99
  std::string name_;
100
};
101
102
// OutboundData wrapper, that is used for altered streams, where we modify the data that should be
103
// sent. Examples could be that we encrypt or compress it.
104
// This wrapper would contain modified data and reference to original data, that will be used
105
// for notifications.
106
class SingleBufferOutboundData : public OutboundData {
107
 public:
108
  SingleBufferOutboundData(RefCntBuffer buffer, OutboundDataPtr lower_data)
109
112k
      : buffer_(std::move(buffer)), lower_data_(std::move(lower_data)) {}
110
111
112k
  void Transferred(const Status& status, Connection* conn) override {
112
112k
    if (lower_data_) {
113
57.3k
      lower_data_->Transferred(status, conn);
114
57.3k
    }
115
112k
  }
116
117
0
  bool DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp) override {
118
0
    return false;
119
0
  }
120
121
35.7k
  void Serialize(boost::container::small_vector_base<RefCntBuffer>* output) override {
122
35.7k
    output->push_back(std::move(buffer_));
123
35.7k
  }
124
125
0
  std::string ToString() const override {
126
0
    return Format("SingleBuffer[$0]", lower_data_);
127
0
  }
128
129
112k
  size_t ObjectSize() const override { return sizeof(*this); }
130
131
112k
  size_t DynamicMemoryUsage() const override { return DynamicMemoryUsageOf(buffer_, lower_data_); }
132
133
 private:
134
  RefCntBuffer buffer_;
135
  OutboundDataPtr lower_data_;
136
};
137
138
}  // namespace rpc
139
}  // namespace yb
140
141
#endif // YB_RPC_OUTBOUND_DATA_H