YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/yb_rpc.h
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#ifndef YB_RPC_YB_RPC_H
17
#define YB_RPC_YB_RPC_H
18
19
#include <stdint.h>
20
21
#include <cstdint>
22
#include <cstdlib>
23
#include <string>
24
#include <type_traits>
25
26
#include <boost/version.hpp>
27
28
#include "yb/rpc/rpc_fwd.h"
29
#include "yb/rpc/binary_call_parser.h"
30
#include "yb/rpc/circular_read_buffer.h"
31
#include "yb/rpc/connection_context.h"
32
#include "yb/rpc/rpc_with_call_id.h"
33
#include "yb/rpc/serialization.h"
34
35
#include "yb/util/ev_util.h"
36
#include "yb/util/net/net_fwd.h"
37
#include "yb/util/size_literals.h"
38
39
namespace yb {
40
namespace rpc {
41
42
// Common base of the YB RPC connection contexts declared in this header.
// Combines ConnectionContextWithCallId with BinaryCallParserListener and holds the state both
// directions need: a BinaryCallParser, a CircularReadBuffer and the call memory tracker.
class YBConnectionContext : public ConnectionContextWithCallId, public BinaryCallParserListener {
43
 public:
44
  // Defined out of line.
  // NOTE(review): presumably receive_buffer_size and buffer_tracker configure read_buffer_,
  // while call_tracker is stored in call_tracker_ - confirm against the definition.
  YBConnectionContext(
45
      size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker,
46
      const MemTrackerPtr& call_tracker);
47
  ~YBConnectionContext();
48
49
58.2M
  // Returns the memory tracker stored in call_tracker_.
  const MemTrackerPtr& call_tracker() const { return call_tracker_; }
50
51
  void SetEventLoop(ev::loop_ref* loop) override;
52
53
  void Shutdown(const Status& status) override;
54
55
 protected:
56
38.4M
  // Mutable access to the call parser for derived contexts.
  BinaryCallParser& parser() { return parser_; }
57
58
  // Null until assigned. NOTE(review): presumably set by SetEventLoop() - confirm.
  ev::loop_ref* loop_ = nullptr;
59
60
  // NOTE(review): presumably the timer behind the derived classes' HandleTimeout() - confirm.
  EvTimerHolder timer_;
61
62
 private:
63
  uint64_t ExtractCallId(InboundCall* call) override;
64
65
288M
  // Exposes read_buffer_ through the StreamReadBuffer interface.
  StreamReadBuffer& ReadBuffer() override {
66
288M
    return read_buffer_;
67
288M
  }
68
69
  BinaryCallParser parser_;
70
71
  CircularReadBuffer read_buffer_;
72
73
  // Returned by call_tracker(); const, so it cannot be reseated after construction.
  const MemTrackerPtr call_tracker_;
74
};
75
76
// Connection context that identifies itself as "Inbound RPC".
// NOTE(review): presumably used for connections accepted from remote callers - confirm.
// Besides the YBConnectionContext state it tracks a connection state value and the timestamps
// of the last write and of the last queued heartbeat.
class YBInboundConnectionContext : public YBConnectionContext {
77
 public:
78
  // Forwards all three arguments unchanged to YBConnectionContext.
  YBInboundConnectionContext(
79
      size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker,
80
      const MemTrackerPtr& call_tracker)
81
304k
      : YBConnectionContext(receive_buffer_size, buffer_tracker, call_tracker) {}
82
83
11.6k
  // Human-readable label of this context type.
  static std::string Name() { return "Inbound RPC"; }
84
 private:
85
  // Takes ownership of call_data content.
86
  CHECKED_STATUS HandleCall(const ConnectionPtr& connection, CallData* call_data) override;
87
  void Connected(const ConnectionPtr& connection) override;
88
  Result<ProcessCallsResult> ProcessCalls(const ConnectionPtr& connection,
89
                                          const IoVecs& data,
90
                                          ReadBufferFull read_buffer_full) override;
91
92
  // Takes ownership of call_data content.
93
  CHECKED_STATUS HandleInboundCall(const ConnectionPtr& connection, std::vector<char>* call_data);
94
95
  // NOTE(review): the (ev::timer&, int) signature has the shape of a libev C++ watcher callback;
  // presumably registered on timer_ - confirm.
  void HandleTimeout(ev::timer& watcher, int revents); // NOLINT
96
97
11
  // Reports the current value of state_.
  RpcConnectionPB::StateType State() override { return state_; }
98
99
  // Starts as UNKNOWN; updated outside this header.
  RpcConnectionPB::StateType state_ = RpcConnectionPB::UNKNOWN;
100
101
  void UpdateLastWrite(const ConnectionPtr& connection) override;
102
103
  // Non-owning reference to the connection. NOTE(review): presumably assigned in Connected()
  // - confirm.
  std::weak_ptr<Connection> connection_;
104
105
  // Last time data was sent to network layer below application.
106
  CoarseTimePoint last_write_time_;
107
  // Last time we queued heartbeat for sending.
108
  CoarseTimePoint last_heartbeat_sending_time_;
109
};
110
111
// Inbound call of the YB RPC protocol.
// Extends InboundCall with parsing of the request header, bookkeeping for RPC sidecars and
// serialization of the success / failure response.
class YBInboundCall : public InboundCall {
112
 public:
113
  YBInboundCall(ConnectionPtr conn, CallProcessedListener call_processed_listener);
114
  explicit YBInboundCall(RpcMetrics* rpc_metrics, const RemoteMethod& remote_method);
115
  virtual ~YBInboundCall();
116
117
  // Is this a local call?
  // This implementation always answers false; the method is virtual so a subclass can override it.
118
19.3M
  virtual bool IsLocalCall() const { return false; }
119
120
  // Parse an inbound call message.
121
  //
122
  // This only deserializes the call header, populating the 'header_' and
123
  // 'serialized_request_' member variables. The actual call parameter is
124
  // not deserialized, as this may be CPU-expensive, and this is called
125
  // from the reactor thread.
126
  //
127
  // Takes ownership of call_data content.
128
  CHECKED_STATUS ParseFrom(const MemTrackerPtr& mem_tracker, CallData* call_data);
129
130
39.1M
  // Call id read from the parsed request header (header_.call_id), returned as int32_t.
  int32_t call_id() const {
131
39.1M
    return header_.call_id;
132
39.1M
  }
133
134
25.2M
  // Remote method exactly as stored in the parsed request header (header_.remote_method).
  Slice serialized_remote_method() const override {
135
25.2M
    return header_.remote_method;
136
25.2M
  }
137
138
  Slice method_name() const override;
139
140
  // See RpcContext::AddRpcSidecar()
141
  virtual size_t AddRpcSidecar(Slice car);
142
143
  // See RpcContext::ResetRpcSidecars()
144
  void ResetRpcSidecars();
145
146
  void ReserveSidecarSpace(size_t space);
147
148
  // Serializes 'response' into the InboundCall's internal buffer, and marks
149
  // the call as a success. Enqueues the response back to the connection
150
  // that made the call.
151
  //
152
  // This method deletes the InboundCall object, so no further calls may be
153
  // made after this one.
154
  void RespondSuccess(AnyMessageConstPtr response);
155
156
  // Serializes a failure response into the internal buffer, marking the
157
  // call as a failure. Enqueues the response back to the connection that
158
  // made the call.
159
  //
160
  // This method deletes the InboundCall object, so no further calls may be
161
  // made after this one.
162
  void RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
163
                      const Status &status) override;
164
165
  void RespondApplicationError(int error_ext_id, const std::string& message,
166
                               const google::protobuf::MessageLite& app_error_pb);
167
168
  // Convert an application error extension to an ErrorStatusPB.
169
  // These ErrorStatusPB objects are what are returned in application error responses.
170
  static void ApplicationErrorToPB(int error_ext_id, const std::string& message,
171
                                   const google::protobuf::MessageLite& app_error_pb,
172
                                   ErrorStatusPB* err);
173
174
  // Serialize the response packet for the finished call.
175
  // The resulting slices refer to memory in this object.
176
  void DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) override;
177
178
  void LogTrace() const override;
179
  std::string ToString() const override;
180
  bool DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp) override;
181
182
  CoarseTimePoint GetClientDeadline() const override;
183
184
0
  // Returns timing_.time_received. The coverage count above is 0, i.e. this accessor was not
  // executed in the reported run.
  MonoTime ReceiveTime() const {
185
0
    return timing_.time_received;
186
0
  }
187
188
  virtual CHECKED_STATUS ParseParam(RpcCallParams* params);
189
190
19.5M
  // sizeof is evaluated on the static type, so this yields sizeof(YBInboundCall).
  // NOTE(review): a subclass that adds members presumably needs its own override - confirm.
  size_t ObjectSize() const override { return sizeof(*this); }
191
192
19.6M
  // Base class usage plus the dynamic memory attributed to response_buf_.
  size_t DynamicMemoryUsage() const override {
193
19.6M
    return InboundCall::DynamicMemoryUsage() + DynamicMemoryUsageOf(response_buf_);
194
19.6M
  }
195
196
 protected:
197
  // Fields to store sidecars state. See rpc/rpc_sidecar.h for more info.
198
  size_t num_sidecars_ = 0;
199
  size_t filled_bytes_in_last_sidecar_buffer_ = 0;
200
  size_t total_sidecars_size_ = 0;
201
  boost::container::small_vector<RefCntBuffer, kMinBufferForSidecarSlices> sidecar_buffers_;
202
  google::protobuf::RepeatedField<uint32_t> sidecar_offsets_;
203
204
  // Serialize and queue the response.
205
  virtual void Respond(AnyMessageConstPtr response, bool is_success);
206
207
 private:
208
  // Serialize a response message for either success or failure. If it is a success,
209
  // 'response' should be the user-defined response type for the call. If it is a
210
  // failure, 'response' should be an ErrorStatusPB instance.
211
  CHECKED_STATUS SerializeResponseBuffer(AnyMessageConstPtr response, bool is_success);
212
213
  // Returns number of bytes copied.
214
  size_t CopyToLastSidecarBuffer(const Slice& slice);
215
  void AllocateSidecarBuffer(size_t size);
216
217
  // The header of the incoming call. Set by ParseFrom().
218
  ParsedRequestHeader header_;
219
220
  // The buffer holding the serialized response. Set by SerializeResponseBuffer().
221
  RefCntBuffer response_buf_;
222
223
  // NOTE(review): presumably the memory consumption registered with the tracker passed to
  // ParseFrom() - confirm against the definition.
  ScopedTrackedConsumption consumption_;
224
};
225
226
// Connection context that identifies itself as "Outbound RPC".
// NOTE(review): presumably used for connections this process initiates - confirm.
// Besides the YBConnectionContext state it tracks the time of the last read.
class YBOutboundConnectionContext : public YBConnectionContext {
227
 public:
228
  // Forwards all three arguments unchanged to YBConnectionContext.
  YBOutboundConnectionContext(
229
      size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker,
230
      const MemTrackerPtr& call_tracker)
231
629k
      : YBConnectionContext(receive_buffer_size, buffer_tracker, call_tracker) {}
232
233
21.3k
  // Human-readable label of this context type.
  static std::string Name() { return "Outbound RPC"; }
234
235
 private:
236
10
  // Always reports OPEN; unlike the inbound context, no state field is kept here.
  RpcConnectionPB::StateType State() override {
237
10
    return RpcConnectionPB::OPEN;
238
10
  }
239
240
  // Takes ownership of call_data content.
241
  CHECKED_STATUS HandleCall(const ConnectionPtr& connection, CallData* call_data) override;
242
  void Connected(const ConnectionPtr& connection) override;
243
  void AssignConnection(const ConnectionPtr& connection) override;
244
  Result<ProcessCallsResult> ProcessCalls(const ConnectionPtr& connection,
245
                                          const IoVecs& data,
246
                                          ReadBufferFull read_buffer_full) override;
247
248
  void UpdateLastRead(const ConnectionPtr& connection) override;
249
250
  // NOTE(review): the (ev::timer&, int) signature has the shape of a libev C++ watcher callback;
  // presumably registered on timer_ - confirm.
  void HandleTimeout(ev::timer& watcher, int revents); // NOLINT
251
252
  // Non-owning reference to the connection. NOTE(review): presumably assigned in
  // AssignConnection() or Connected() - confirm.
  std::weak_ptr<Connection> connection_;
253
254
  // NOTE(review): presumably refreshed by UpdateLastRead() - confirm.
  CoarseTimePoint last_read_time_;
255
};
256
257
} // namespace rpc
258
} // namespace yb
259
260
#endif // YB_RPC_YB_RPC_H