YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/inbound_call.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_RPC_INBOUND_CALL_H_
33
#define YB_RPC_INBOUND_CALL_H_
34
35
#include <string>
36
#include <vector>
37
38
#include <glog/logging.h>
39
40
#include "yb/gutil/stl_util.h"
41
#include "yb/gutil/macros.h"
42
#include "yb/gutil/ref_counted.h"
43
44
#include "yb/rpc/rpc_fwd.h"
45
#include "yb/rpc/call_data.h"
46
#include "yb/rpc/rpc_call.h"
47
#include "yb/rpc/remote_method.h"
48
#include "yb/rpc/rpc_header.pb.h"
49
#include "yb/rpc/thread_pool.h"
50
51
#include "yb/yql/cql/ql/ql_session.h"
52
53
#include "yb/util/faststring.h"
54
#include "yb/util/lockfree.h"
55
#include "yb/util/metrics_fwd.h"
56
#include "yb/util/memory/memory.h"
57
#include "yb/util/monotime.h"
58
#include "yb/util/net/net_fwd.h"
59
#include "yb/util/ref_cnt_buffer.h"
60
#include "yb/util/slice.h"
61
#include "yb/util/status_fwd.h"
62
63
namespace google {
64
namespace protobuf {
65
class Message;
66
}  // namespace protobuf
67
}  // namespace google
68
69
namespace yb {
70
71
class Histogram;
72
class Trace;
73
74
namespace rpc {
75
76
struct InboundCallTiming {
77
  MonoTime time_received;   // Time the call was first accepted.
78
  MonoTime time_handled;    // Time the call handler was kicked off.
79
  MonoTime time_completed;  // Time the call handler completed.
80
};
81
82
class InboundCallHandler {
83
 public:
84
  virtual void Handle(InboundCallPtr call) = 0;
85
86
  virtual void Failure(const InboundCallPtr& call, const Status& status) = 0;
87
88
  virtual bool CallQueued() = 0;
89
90
  virtual void CallDequeued() = 0;
91
92
 protected:
93
  ~InboundCallHandler() = default;
94
};
95
96
// Inbound call on server
97
class InboundCall : public RpcCall, public MPSCQueueEntry<InboundCall> {
98
 public:
99
  typedef std::function<void(InboundCall*)> CallProcessedListener;
100
101
  InboundCall(ConnectionPtr conn, RpcMetrics* rpc_metrics,
102
              CallProcessedListener call_processed_listener);
103
  virtual ~InboundCall();
104
105
  void SetRpcMethodMetrics(std::reference_wrapper<const RpcMethodMetrics> value);
106
107
  // Return the serialized request parameter protobuf.
108
24.2M
  const Slice &serialized_request() const {
109
24.2M
    return serialized_request_;
110
24.2M
  }
111
112
29.8M
  void set_method_index(size_t value) {
113
29.8M
    method_index_ = value;
114
29.8M
  }
115
116
25.2M
  size_t method_index() const {
117
25.2M
    return method_index_;
118
25.2M
  }
119
120
  virtual const Endpoint& remote_address() const;
121
  virtual const Endpoint& local_address() const;
122
123
  ConnectionPtr connection() const;
124
  ConnectionContext& connection_context() const;
125
126
  Trace* trace();
127
128
  // When this InboundCall was received (instantiated).
129
  // Should only be called once on a given instance.
130
  // Not thread-safe. Should only be called by the current "owner" thread.
131
  void RecordCallReceived();
132
133
  // When RPC call Handle() was called on the server side.
134
  // Updates the Histogram with time elapsed since the call was received,
135
  // and should only be called once on a given instance.
136
  // Not thread-safe. Should only be called by the current "owner" thread.
137
  virtual void RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_time);
138
139
  // When RPC call Handle() completed execution on the server side.
140
  // Updates the Histogram with time elapsed since the call was started,
141
  // and should only be called once on a given instance.
142
  // Not thread-safe. Should only be called by the current "owner" thread.
143
  void RecordHandlingCompleted();
144
145
  // Return true if the deadline set by the client has already elapsed.
146
  // In this case, the server may stop processing the call, since the
147
  // call response will be ignored anyway.
148
  bool ClientTimedOut() const;
149
150
  // Return an upper bound on the client timeout deadline. This does not
151
  // account for transmission delays between the client and the server.
152
  // If the client did not specify a deadline, returns MonoTime::Max().
153
  virtual CoarseTimePoint GetClientDeadline() const = 0;
154
155
  virtual void DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) = 0;
156
157
  // Returns the time spent in the service queue -- from the time the call was received, until
158
  // it gets handled.
159
  MonoDelta GetTimeInQueue() const;
160
161
  ThreadPoolTask* BindTask(InboundCallHandler* handler);
162
163
1
  void ResetCallProcessedListener() {
164
1
    call_processed_listener_ = decltype(call_processed_listener_)();
165
1
  }
166
167
  virtual Slice serialized_remote_method() const = 0;
168
  virtual Slice method_name() const = 0;
169
170
//  virtual const std::string& service_name() const = 0;
171
  virtual void RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code, const Status& status) = 0;
172
173
  // Do appropriate actions when call is timed out.
174
  //
175
  // message contains human readable information on why call timed out.
176
  //
177
  // Returns true if actions were applied, false if call was already processed.
178
  bool RespondTimedOutIfPending(const char* message);
179
180
29.8M
  bool TryStartProcessing() {
181
29.8M
    bool expected = false;
182
29.8M
    if (!processing_started_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
183
11.8k
      return false;
184
11.8k
    }
185
29.8M
    if (tracker_) {
186
29.8M
      tracker_->CallDequeued();
187
29.8M
    }
188
29.8M
    return true;
189
29.8M
  }
190
191
  std::string LogPrefix() const override;
192
193
  template <class T, class ...Args>
194
29.8M
  static std::shared_ptr<T> Create(Args&&... args) {
195
29.8M
    auto result = std::make_shared<T>(std::forward<Args>(args)...);
196
29.8M
    result->RecordCallReceived();
197
29.8M
    return result;
198
29.8M
  }
_ZN2yb3rpc11InboundCall6CreateINS_9cqlserver14CQLInboundCallEJRKNSt3__110shared_ptrINS0_10ConnectionEEENS5_8functionIFvPS1_EEERNS6_INS_2ql9QLSessionEEEEEENS6_IT_EEDpOT0_
Line
Count
Source
194
4.56M
  static std::shared_ptr<T> Create(Args&&... args) {
195
4.56M
    auto result = std::make_shared<T>(std::forward<Args>(args)...);
196
4.56M
    result->RecordCallReceived();
197
4.56M
    return result;
198
4.56M
  }
_ZN2yb3rpc11InboundCall6CreateINS_11redisserver16RedisInboundCallEJRKNSt3__110shared_ptrINS0_10ConnectionEEEmNS5_8functionIFvPS1_EEEEEENS6_IT_EEDpOT0_
Line
Count
Source
194
104k
  static std::shared_ptr<T> Create(Args&&... args) {
195
104k
    auto result = std::make_shared<T>(std::forward<Args>(args)...);
196
104k
    result->RecordCallReceived();
197
104k
    return result;
198
104k
  }
_ZN2yb3rpc11InboundCall6CreateINS0_18LocalYBInboundCallEJPNS0_10RpcMetricsERKNS0_12RemoteMethodERNSt3__110shared_ptrINS0_17LocalOutboundCallEEERKNS9_6chrono10time_pointINS_15CoarseMonoClockENSE_8durationIxNS9_5ratioILl1ELl1000000000EEEEEEEEEENSA_IT_EEDpOT0_
Line
Count
Source
194
5.59M
  static std::shared_ptr<T> Create(Args&&... args) {
195
5.59M
    auto result = std::make_shared<T>(std::forward<Args>(args)...);
196
5.59M
    result->RecordCallReceived();
197
5.59M
    return result;
198
5.59M
  }
_ZN2yb3rpc11InboundCall6CreateINS0_13YBInboundCallEJRKNSt3__110shared_ptrINS0_10ConnectionEEENS4_8functionIFvPS1_EEEEEENS5_IT_EEDpOT0_
Line
Count
Source
194
19.5M
  static std::shared_ptr<T> Create(Args&&... args) {
195
19.5M
    auto result = std::make_shared<T>(std::forward<Args>(args)...);
196
19.5M
    result->RecordCallReceived();
197
19.5M
    return result;
198
19.5M
  }
199
200
  size_t DynamicMemoryUsage() const override;
201
202
  void Serialize(boost::container::small_vector_base<RefCntBuffer>* output) override final;
203
204
38.9M
  const CallData& request_data() const { return request_data_; }
205
206
 protected:
207
  void NotifyTransferred(const Status& status, Connection* conn) override;
208
209
  virtual void Clear();
210
211
  // Log a WARNING message if the RPC response was slow enough that the
212
  // client likely timed out. This is based on the client-provided timeout
213
  // value.
214
  // Also can be configured to log _all_ RPC traces for help debugging.
215
  virtual void LogTrace() const = 0;
216
217
  void QueueResponse(bool is_success);
218
219
  // The serialized bytes of the request param protobuf. Set by ParseFrom().
220
  // This references memory held by 'request_data_'.
221
  Slice serialized_request_;
222
223
  // Data source of this call.
224
  CallData request_data_;
225
  std::atomic<size_t> request_data_memory_usage_{0};
226
227
  // The trace buffer.
228
  scoped_refptr<Trace> trace_;
229
230
  // Timing information related to this RPC call.
231
  InboundCallTiming timing_;
232
233
  std::atomic<bool> processing_started_{false};
234
235
  std::atomic<bool> responded_{false};
236
237
  scoped_refptr<Counter> rpc_method_response_bytes_;
238
  scoped_refptr<Histogram> rpc_method_handler_latency_;
239
240
 private:
241
  // The connection on which this inbound call arrived. Can be null for LocalYBInboundCall.
242
  ConnectionPtr conn_ = nullptr;
243
  RpcMetrics* rpc_metrics_;
244
  std::function<void(InboundCall*)> call_processed_listener_;
245
246
  class InboundCallTask : public ThreadPoolTask {
247
   public:
248
29.8M
    void Bind(InboundCallHandler* handler, InboundCallPtr call) {
249
29.8M
      handler_ = handler;
250
29.8M
      call_ = std::move(call);
251
29.8M
    }
252
253
    void Run() override;
254
255
    void Done(const Status& status) override;
256
257
29.8M
    virtual ~InboundCallTask() = default;
258
259
   private:
260
    InboundCallHandler* handler_;
261
    InboundCallPtr call_;
262
  };
263
264
  InboundCallTask task_;
265
  InboundCallHandler* tracker_ = nullptr;
266
267
  size_t method_index_ = 0;
268
269
  DISALLOW_COPY_AND_ASSIGN(InboundCall);
270
};
271
272
}  // namespace rpc
273
}  // namespace yb
274
275
#endif  // YB_RPC_INBOUND_CALL_H_