YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/outbound_call.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_RPC_OUTBOUND_CALL_H_
33
#define YB_RPC_OUTBOUND_CALL_H_
34
35
#include <stdint.h>
36
37
#include <cstdint>
38
#include <cstdlib>
39
#include <deque>
40
#include <memory>
41
#include <string>
42
#include <thread>
43
#include <type_traits>
44
#include <vector>
45
46
#include <boost/functional/hash.hpp>
47
#include <gflags/gflags_declare.h>
48
#include <glog/logging.h>
49
50
#include "yb/gutil/integral_types.h"
51
#include "yb/gutil/macros.h"
52
53
#include "yb/rpc/rpc_fwd.h"
54
#include "yb/rpc/call_data.h"
55
#include "yb/rpc/constants.h"
56
#include "yb/rpc/lightweight_message.h"
57
#include "yb/rpc/remote_method.h"
58
#include "yb/rpc/rpc_call.h"
59
#include "yb/rpc/rpc_header.pb.h"
60
#include "yb/rpc/serialization.h"
61
#include "yb/rpc/rpc_introspection.pb.h"
62
#include "yb/rpc/service_if.h"
63
#include "yb/rpc/thread_pool.h"
64
65
#include "yb/util/status_fwd.h"
66
#include "yb/util/atomic.h"
67
#include "yb/util/locks.h"
68
#include "yb/util/mem_tracker.h"
69
#include "yb/util/memory/memory_usage.h"
70
#include "yb/util/monotime.h"
71
#include "yb/util/net/sockaddr.h"
72
#include "yb/util/object_pool.h"
73
#include "yb/util/ref_cnt_buffer.h"
74
#include "yb/util/shared_lock.h"
75
#include "yb/util/slice.h"
76
#include "yb/util/trace.h"
77
78
// Forward declaration of google::protobuf::Message.
namespace google {
79
namespace protobuf {
80
class Message;
81
}  // namespace protobuf
82
}  // namespace google
83
84
namespace yb {
85
namespace rpc {
86
87
// Used to key on Connection information: remote endpoint, connection index and protocol.
88
// For use as a key in an unordered STL collection, hash with ConnectionIdHash; equality is
// provided by the operator== declared after this class.
// NOTE(review): an earlier comment referred to ConnectionIdEqual, which is not declared in this
// header - confirm whether it still exists elsewhere.
89
// This class is copyable and assignable (OutboundCall::SetConnectionId assigns it by value).
// NOTE(review): an earlier comment said "not assignable (use CopyFrom() for that)", but no
// CopyFrom() is declared here.
90
class ConnectionId {
91
 public:
  // Creates an id with a default-constructed endpoint, index 0 and a null protocol.
92
86.4M
  ConnectionId() {}
93
94
  // Convenience constructor.
  // NOTE(review): idx is a size_t but is stored in the uint8_t idx_, so values above 255 are
  // silently truncated - confirm callers stay within range.
95
  ConnectionId(const Endpoint& remote, size_t idx, const Protocol* protocol)
96
98.6M
      : remote_(remote), idx_(idx), protocol_(protocol) {}
97
98
  // The remote address.
99
238M
  const Endpoint& remote() const { return remote_; }
  // The connection index.
100
225M
  uint8_t idx() const { return idx_; }
  // The protocol pointer given at construction (nullptr for a default-constructed id).
101
153M
  const Protocol* protocol() const { return protocol_; }
102
103
  // Returns a string representation of the object.
104
  std::string ToString() const;
105
  // Returns a hash of this id (defined outside this header).
106
  size_t HashCode() const;
107
108
 private:
109
  // Remember to update HashCode() and operator== when new fields are added.
110
  Endpoint remote_;
111
  uint8_t idx_ = 0;  // Connection index, used to support multiple connections to the same server.
112
  const Protocol* protocol_ = nullptr;
113
};
114
115
// Hash functor for ConnectionId, for use with unordered STL collections.
// NOTE(review): presumably forwards to ConnectionId::HashCode(); the definition is not in this
// header - confirm.
class ConnectionIdHash {
116
 public:
117
  std::size_t operator() (const ConnectionId& conn_id) const;
118
};
119
120
76.7M
// Two ConnectionIds are equal when their remote endpoint, connection index and protocol pointer
// all compare equal. The protocol is compared by pointer, not by value.
inline bool operator==(const ConnectionId& lhs, const ConnectionId& rhs) {
121
76.7M
  return lhs.remote() == rhs.remote() && 
lhs.idx() == rhs.idx()75.0M
&&
lhs.protocol() == rhs.protocol()74.9M
;
122
76.7M
}
123
124
// Container for OutboundCall metrics: three histograms created from a metric entity.
// NOTE(review): judging by the names these record time spent queued, time spent sending and
// time until the response arrives - confirm against the constructor in the .cc file.
125
struct OutboundCallMetrics {
126
  explicit OutboundCallMetrics(const scoped_refptr<MetricEntity>& metric_entity);
127
128
  scoped_refptr<Histogram> queue_time;
129
  scoped_refptr<Histogram> send_time;
130
  scoped_refptr<Histogram> time_to_response;
131
};
132
133
// A response to a call, on the client side.
134
// Upon receiving a response, this is allocated in the reactor thread and filled
135
// into the OutboundCall instance via OutboundCall::SetResponse.
136
//
137
// This may either be a success or error response.
138
//
139
// This class takes care of separating out the distinct payload slices sent
140
// over. It is movable but not copyable.
141
class CallResponse {
142
 public:
143
  CallResponse();
144
145
  CallResponse(CallResponse&& rhs) = default;
146
72.7M
  CallResponse& operator=(CallResponse&& rhs) = default;
147
148
  // Parse the response received from a call. This must be called before any
149
  // other methods on this object. Takes ownership of data content.
150
  CHECKED_STATUS ParseFrom(CallData* data);
151
152
  // Return true if the call succeeded, i.e. the parsed header is not marked as an error.
153
72.5M
  bool is_success() const {
154
72.5M
    DCHECK(parsed_);
155
72.5M
    return !header_.is_error();
156
72.5M
  }
157
158
  // Return the call ID that this response is related to.
159
72.8M
  int32_t call_id() const {
160
72.8M
    DCHECK(parsed_);
161
72.8M
    return header_.call_id();
162
72.8M
  }
163
164
  // Return the serialized response data. This is just the response "body" --
165
  // either a serialized ErrorStatusPB, or the serialized user response protobuf.
166
72.8M
  const Slice &serialized_response() const {
167
72.8M
    DCHECK(parsed_);
168
72.8M
    return serialized_response_;
169
72.8M
  }
170
  // Sidecar accessors for the sidecar with the given index (defined outside this header).
171
  Result<const uint8_t*const*> GetSidecarPtr(size_t idx) const;
172
173
  Result<SidecarHolder> GetSidecarHolder(size_t idx) const;
174
  // Dynamic memory attributed to the header, the retained response data and the sidecar
  // bounds vector.
175
72.6M
  size_t DynamicMemoryUsage() const {
176
72.6M
    return DynamicMemoryUsageOf(header_, response_data_) +
177
72.6M
           GetFlatDynamicMemoryUsageOf(sidecar_bounds_);
178
72.6M
  }
179
180
 private:
181
  // True once ParseFrom() is called.
  // NOTE(review): there is no in-class initializer; presumably CallResponse() (defined outside
  // this header) sets it to false - confirm.
182
  bool parsed_;
183
184
  // The parsed header.
185
  ResponseHeader header_;
186
187
  // The slice of data for the encoded protobuf response.
188
  // This slice refers to memory owned by response_data_.
189
  Slice serialized_response_;
190
191
  // Boundary pointers of the rpc sidecars. They point into memory owned by response_data_.
192
  // Number of sidecars could be obtained from header_.
193
  boost::container::small_vector<const uint8_t*, kMinBufferForSidecarSlices> sidecar_bounds_;
194
195
  // The incoming transfer data - retained because serialized_response_
196
  // and sidecar_bounds_ refer into its data.
197
  CallData response_data_;
198
199
  DISALLOW_COPY_AND_ASSIGN(CallResponse);
200
};
201
202
// Thread pool task that holds a reference to an OutboundCall. OutboundCall embeds one instance
// of it as callback_task_.
// NOTE(review): presumably Run() invokes the call's callback on the pool thread and Done()
// releases the reference; both are defined outside this header - confirm.
class InvokeCallbackTask : public rpc::ThreadPoolTask {
203
 public:
204
86.5M
  InvokeCallbackTask() {}
205
  // Stores the call this task belongs to; takes the pointer by value and moves it in.
206
66.8M
  void SetOutboundCall(OutboundCallPtr call) { call_ = std::move(call); }
207
208
  void Run() override;
209
210
  void Done(const Status& status) override;
211
212
86.3M
  virtual ~InvokeCallbackTask() {}
213
214
 private:
215
  OutboundCallPtr call_;
216
};
217
218
// Tracks the status of a call on the client side.
219
//
220
// This is an internal-facing class -- clients interact with the
221
// RpcController class.
222
//
223
// This is allocated by the Proxy when a call is first created,
224
// then passed to the reactor thread to send on the wire. It's typically
225
// kept using a shared_ptr because a call may terminate in any number
226
// of different threads, making it tricky to enforce single ownership.
227
class OutboundCall : public RpcCall {
228
 public:
229
  OutboundCall(const RemoteMethod* remote_method,
230
               const std::shared_ptr<OutboundCallMetrics>& outbound_call_metrics,
231
               std::shared_ptr<const OutboundMethodMetrics> method_metrics,
232
               AnyMessagePtr response_storage,
233
               RpcController* controller,
234
               std::shared_ptr<RpcMetrics> rpc_metrics,
235
               ResponseCallback callback,
236
               ThreadPool* callback_thread_pool);
237
238
  virtual ~OutboundCall();
239
240
  // Serialize the given request PB into this call's internal storage.
241
  //
242
  // Because the data is fully serialized by this call, 'req' may be
243
  // subsequently mutated with no ill effects.
244
  virtual CHECKED_STATUS SetRequestParam(
245
      AnyMessageConstPtr req, const MemTrackerPtr& mem_tracker);
246
247
  // Serialize the call for the wire. Requires that SetRequestParam()
248
  // is called first. This is called from the Reactor thread.
249
  void Serialize(boost::container::small_vector_base<RefCntBuffer>* output) override;
250
251
  // Sets thread pool to be used by `InvokeCallback` for callback execution.
252
0
  void SetCallbackThreadPool(ThreadPool* callback_thread_pool) {
253
0
    callback_thread_pool_ = callback_thread_pool;
254
0
  }
255
256
  // Callback after the call has been put on the outbound connection queue.
257
  void SetQueued();
258
259
  // Update the call state to show that the request has been sent.
260
  // Could be called on already finished call in case it was already timed out.
261
  void SetSent();
262
263
  // Outbound call could be moved to final state only once,
264
  // so only one of SetFinished/SetTimedOut/SetFailed/SetResponse can be called.
265
266
  // Update the call state to show that the call has finished.
267
  void SetFinished();
268
269
  // Mark the call as failed. This also triggers the callback to notify
270
  // the caller. If the call failed due to a remote error, then err_pb
271
  // should be set to the error returned by the remote server.
272
  void SetFailed(const Status& status, std::unique_ptr<ErrorStatusPB> err_pb = nullptr);
273
274
  // Mark the call as timed out. This also triggers the callback to notify
275
  // the caller.
276
  void SetTimedOut();
277
278
  // Fill in the call response.
279
  void SetResponse(CallResponse&& resp);
280
281
  bool IsTimedOut() const;
282
283
  // Is the call finished?
284
  bool IsFinished() const override final;
285
286
  std::string ToString() const override;
287
288
  bool DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp) override;
289
290
  std::string LogPrefix() const override;
291
  // Stores a copy of the connection id and the hostname pointer. Only the pointer is kept, not a
  // copy of the string, so the pointed-to string must stay alive for as long as hostname() may
  // be called.
292
75.5M
  void SetConnectionId(const ConnectionId& value, const std::string* hostname) {
293
75.5M
    conn_id_ = value;
294
75.5M
    hostname_ = hostname;
295
75.5M
  }
296
  // Records the status returned by thread_pool_failure().
297
284
  void SetThreadPoolFailure(const Status& status) {
298
284
    thread_pool_failure_ = status;
299
284
  }
300
  // Returns the status last stored by SetThreadPoolFailure().
301
25.6M
  const Status& thread_pool_failure() const {
302
25.6M
    return thread_pool_failure_;
303
25.6M
  }
304
305
  void InvokeCallbackSync();
306
307
  ////////////////////////////////////////////////////////////
308
  // Getters
309
  ////////////////////////////////////////////////////////////
310
311
226M
  const ConnectionId& conn_id() const { return conn_id_; }
  // Dereferences hostname_, so SetConnectionId() must have supplied a non-null hostname first.
  // NOTE(review): hostname_ has no in-class initializer - confirm the constructor sets it.
312
75.3M
  const std::string& hostname() const { return *hostname_; }
313
10.8M
  const RemoteMethod& remote_method() const { return *remote_method_; }
314
0
  const ResponseCallback& callback() const { return callback_; }
315
161M
  RpcController* controller() { return controller_; }
316
0
  const RpcController* controller() const { return controller_; }
317
21.6M
  AnyMessagePtr response() const { return response_; }
318
319
72.7M
  int32_t call_id() const {
320
72.7M
    return call_id_;
321
72.7M
  }
322
323
86.2M
  Trace* trace() {
324
86.2M
    return trace_.get();
325
86.2M
  }
326
  // Dereferences rpc_metrics_, so it must be non-null.
327
10.8M
  RpcMetrics& rpc_metrics() {
328
10.8M
    return *rpc_metrics_;
329
10.8M
  }
330
  // Size of this object itself, excluding dynamically allocated members.
331
72.7M
  size_t ObjectSize() const override { return sizeof(*this); }
332
  // Dynamic memory attributed to the error protobuf, request buffer, response and trace.
333
72.6M
  size_t DynamicMemoryUsage() const override {
334
72.6M
    return DynamicMemoryUsageAllowSizeOf(error_pb_) +
335
72.6M
           DynamicMemoryUsageOf(buffer_, call_response_, trace_);
336
72.6M
  }
337
338
 protected:
339
  friend class RpcController;
340
  // Sidecar accessors for the sidecar with the given index (defined outside this header).
341
  Result<Slice> GetSidecar(size_t idx) const;
342
  virtual Result<const uint8_t*const*> GetSidecarPtr(size_t idx) const;
343
  virtual Result<SidecarHolder> GetSidecarHolder(size_t idx) const;
344
345
  ConnectionId conn_id_;
  // Not owned; set by SetConnectionId().
346
  const std::string* hostname_;
347
  CoarseTimePoint start_;
348
  RpcController* controller_;
349
  // Pointer for the protobuf where the response should be written.
350
  // Can be used only while callback_ object is alive.
351
  AnyMessagePtr response_;
352
353
  // The trace buffer.
354
  scoped_refptr<Trace> trace_;
355
356
 private:
  // NOTE(review): RpcController is already declared a friend in the protected section above;
  // this second declaration is redundant.
357
  friend class RpcController;
358
359
  typedef RpcCallState State;
360
361
  static std::string StateName(State state);
362
363
  void NotifyTransferred(const Status& status, Connection* conn) override;
364
365
  bool SetState(State new_state);
366
  State state() const;
367
368
  // Same as SetState(), but requires that the caller already holds
369
  // lock_.
370
  void set_state_unlocked(State new_state);
371
372
  // Return the current status.
373
  CHECKED_STATUS status() const;
374
375
  // Return the error protobuf, if a remote error occurred.
376
  // This will only be non-NULL if status().IsRemoteError().
377
  const ErrorStatusPB* error_pb() const;
378
379
  CHECKED_STATUS InitHeader(RequestHeader* header);
380
381
  // Lock for the state_, status_ and error_pb_ fields, since they
382
  // may be mutated by the reactor thread while the client thread
383
  // reads them.
384
  mutable simple_spinlock lock_;
385
  std::atomic<State> state_ = {READY};
386
  Status status_;
387
  std::unique_ptr<ErrorStatusPB> error_pb_;
388
389
  // Invokes the user-provided callback. Uses callback_thread_pool_ if set.
390
  void InvokeCallback();
391
392
  Result<uint32_t> TimeoutMs() const;
393
394
  int32_t call_id_;
395
396
  // The remote method being called.
397
  const RemoteMethod* remote_method_;
398
399
  ResponseCallback callback_;
400
401
  InvokeCallbackTask callback_task_;
402
  // Not owned; may be replaced through SetCallbackThreadPool().
403
  ThreadPool* callback_thread_pool_;
404
405
  // Buffer for storing the wire-format request.
406
  RefCntBuffer buffer_;
407
408
  // Consumption of buffer_.
409
  ScopedTrackedConsumption buffer_consumption_;
410
411
  // Once a response has been received for this call, contains that response.
412
  CallResponse call_response_;
413
414
  std::shared_ptr<OutboundCallMetrics> outbound_call_metrics_;
415
416
  std::shared_ptr<RpcMetrics> rpc_metrics_;
417
  // Status stored by SetThreadPoolFailure().
418
  Status thread_pool_failure_;
419
420
  std::shared_ptr<const OutboundMethodMetrics> method_metrics_;
421
422
  DISALLOW_COPY_AND_ASSIGN(OutboundCall);
423
};
424
425
// Error tag whose value type is ErrorStatusPB::RpcErrorCodePB; used to define RpcError.
class RpcErrorTag : public IntegralErrorTag<ErrorStatusPB::RpcErrorCodePB> {
426
 public:
  // NOTE(review): presumably this category id must be unique among all error tags - confirm.
427
  static constexpr uint8_t kCategory = 15;
428
  // Returns the protobuf enum name of the given RPC error code.
429
0
  static std::string ToMessage(Value value) {
430
0
    return ErrorStatusPB::RpcErrorCodePB_Name(value);
431
0
  }
432
};
433
434
// Status error code type built on RpcErrorTag.
typedef StatusErrorCodeImpl<RpcErrorTag> RpcError;
435
436
}  // namespace rpc
437
}  // namespace yb
438
439
#endif  // YB_RPC_OUTBOUND_CALL_H_