/Users/deen/code/yugabyte-db/src/yb/rpc/outbound_call.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | #ifndef YB_RPC_OUTBOUND_CALL_H_ |
33 | | #define YB_RPC_OUTBOUND_CALL_H_ |
34 | | |
35 | | #include <stdint.h> |
36 | | |
37 | | #include <cstdint> |
38 | | #include <cstdlib> |
39 | | #include <deque> |
40 | | #include <memory> |
41 | | #include <string> |
42 | | #include <thread> |
43 | | #include <type_traits> |
44 | | #include <vector> |
45 | | |
46 | | #include <boost/functional/hash.hpp> |
47 | | #include <gflags/gflags_declare.h> |
48 | | #include <glog/logging.h> |
49 | | |
50 | | #include "yb/gutil/integral_types.h" |
51 | | #include "yb/gutil/macros.h" |
52 | | |
53 | | #include "yb/rpc/rpc_fwd.h" |
54 | | #include "yb/rpc/call_data.h" |
55 | | #include "yb/rpc/constants.h" |
56 | | #include "yb/rpc/lightweight_message.h" |
57 | | #include "yb/rpc/remote_method.h" |
58 | | #include "yb/rpc/rpc_call.h" |
59 | | #include "yb/rpc/rpc_header.pb.h" |
60 | | #include "yb/rpc/serialization.h" |
61 | | #include "yb/rpc/rpc_introspection.pb.h" |
62 | | #include "yb/rpc/service_if.h" |
63 | | #include "yb/rpc/thread_pool.h" |
64 | | |
65 | | #include "yb/util/status_fwd.h" |
66 | | #include "yb/util/atomic.h" |
67 | | #include "yb/util/locks.h" |
68 | | #include "yb/util/mem_tracker.h" |
69 | | #include "yb/util/memory/memory_usage.h" |
70 | | #include "yb/util/monotime.h" |
71 | | #include "yb/util/net/sockaddr.h" |
72 | | #include "yb/util/object_pool.h" |
73 | | #include "yb/util/ref_cnt_buffer.h" |
74 | | #include "yb/util/shared_lock.h" |
75 | | #include "yb/util/slice.h" |
76 | | #include "yb/util/trace.h" |
77 | | |
78 | | namespace google { |
79 | | namespace protobuf { |
80 | | class Message; |
81 | | } // namespace protobuf |
82 | | } // namespace google |
83 | | |
84 | | namespace yb { |
85 | | namespace rpc { |
86 | | |
87 | | // Used to key on Connection information. |
88 | | // For use as a key in an unordered STL collection, use ConnectionIdHash together with the operator== declared in this header. |
89 | | // This class is copyable and assignable through its implicitly generated members (no CopyFrom() is declared). |
90 | | class ConnectionId { |
91 | | public: |
92 | 86.4M | ConnectionId() {} |
93 | | |
94 | | // Convenience constructor. NOTE(review): idx arrives as size_t but is stored in the uint8_t idx_ member, so values above 255 would be truncated - confirm callers. |
95 | | ConnectionId(const Endpoint& remote, size_t idx, const Protocol* protocol) |
96 | 98.6M | : remote_(remote), idx_(idx), protocol_(protocol) {} |
97 | | |
98 | | // The remote address. |
99 | 238M | const Endpoint& remote() const { return remote_; } |
100 | 225M | uint8_t idx() const { return idx_; } |
101 | 153M | const Protocol* protocol() const { return protocol_; } |
102 | | |
103 | | // Returns a string representation of the object. |
104 | | std::string ToString() const; |
105 | | |
106 | | size_t HashCode() const; |
107 | | |
108 | | private: |
109 | | // Remember to update HashCode() and operator== when new fields are added. |
110 | | Endpoint remote_; |
111 | | uint8_t idx_ = 0; // Connection index, used to support multiple connections to the same server. |
112 | | const Protocol* protocol_ = nullptr; |
113 | | }; |
114 | | |
// Hash functor for ConnectionId, for use with unordered containers. The call operator is defined
// outside this header; presumably it forwards to ConnectionId::HashCode() - verify in the .cc file.
115 | | class ConnectionIdHash { |
116 | | public: |
117 | | std::size_t operator() (const ConnectionId& conn_id) const; |
118 | | }; |
119 | | |
// Two ConnectionIds compare equal when their remote endpoint, connection index and protocol all
// match. The protocol is compared by pointer, since protocol() returns const Protocol*.
120 | 76.7M | inline bool operator==(const ConnectionId& lhs, const ConnectionId& rhs) { |
121 | 76.7M | return lhs.remote() == rhs.remote() && lhs.idx() == rhs.idx()75.0M && lhs.protocol() == rhs.protocol()74.9M ; |
122 | 76.7M | } |
123 | | |
124 | | // Container for OutboundCall metrics: three histograms (queue_time, send_time, time_to_response), constructed from a MetricEntity. |
125 | | struct OutboundCallMetrics { |
126 | | explicit OutboundCallMetrics(const scoped_refptr<MetricEntity>& metric_entity); |
127 | | |
128 | | scoped_refptr<Histogram> queue_time; |
129 | | scoped_refptr<Histogram> send_time; |
130 | | scoped_refptr<Histogram> time_to_response; |
131 | | }; |
132 | | |
133 | | // A response to a call, on the client side. |
134 | | // Upon receiving a response, this is allocated in the reactor thread and filled |
135 | | // into the OutboundCall instance via OutboundCall::SetResponse. |
136 | | // |
137 | | // This may either be a success or error response. |
138 | | // |
139 | | // This class takes care of separating out the distinct payload slices sent |
140 | | // over. |
141 | | class CallResponse { |
142 | | public: |
143 | | CallResponse(); |
144 | | |
145 | | CallResponse(CallResponse&& rhs) = default; |
146 | 72.7M | CallResponse& operator=(CallResponse&& rhs) = default; |
147 | | |
148 | | // Parse the response received from a call. This must be called before any |
149 | | // other methods on this object. Takes ownership of data content. |
150 | | CHECKED_STATUS ParseFrom(CallData* data); |
151 | | |
152 | | // Return true if the call succeeded, i.e. the parsed header is not flagged as an error. |
153 | 72.5M | bool is_success() const { |
154 | 72.5M | DCHECK(parsed_); |
155 | 72.5M | return !header_.is_error(); |
156 | 72.5M | } |
157 | | |
158 | | // Return the call ID that this response is related to. |
159 | 72.8M | int32_t call_id() const { |
160 | 72.8M | DCHECK(parsed_); |
161 | 72.8M | return header_.call_id(); |
162 | 72.8M | } |
163 | | |
164 | | // Return the serialized response data. This is just the response "body" -- |
165 | | // either a serialized ErrorStatusPB, or the serialized user response protobuf. |
166 | 72.8M | const Slice &serialized_response() const { |
167 | 72.8M | DCHECK(parsed_); |
168 | 72.8M | return serialized_response_; |
169 | 72.8M | } |
170 | | |
171 | | Result<const uint8_t*const*> GetSidecarPtr(size_t idx) const; |
172 | | |
173 | | Result<SidecarHolder> GetSidecarHolder(size_t idx) const; |
174 | | |
175 | 72.6M | size_t DynamicMemoryUsage() const { |
176 | 72.6M | return DynamicMemoryUsageOf(header_, response_data_) + |
177 | 72.6M | GetFlatDynamicMemoryUsageOf(sidecar_bounds_); |
178 | 72.6M | } |
179 | | |
180 | | private: |
181 | | // True once ParseFrom() is called. No in-class initializer; presumably set to false by CallResponse(), which is defined elsewhere. |
182 | | bool parsed_; |
183 | | |
184 | | // The parsed header. |
185 | | ResponseHeader header_; |
186 | | |
187 | | // The slice of data for the encoded protobuf response. |
188 | | // This slice refers to memory owned by response_data_. |
189 | | Slice serialized_response_; |
190 | | |
191 | | // Boundaries of the rpc sidecars. They point into memory owned by response_data_. |
192 | | // The number of sidecars can be obtained from header_. |
193 | | boost::container::small_vector<const uint8_t*, kMinBufferForSidecarSlices> sidecar_bounds_; |
194 | | |
195 | | // The incoming transfer data - retained because serialized_response_ |
196 | | // and sidecar_bounds_ refer into its data. |
197 | | CallData response_data_; |
198 | | |
199 | | DISALLOW_COPY_AND_ASSIGN(CallResponse); |
200 | | }; |
201 | | |
// ThreadPoolTask that holds an OutboundCallPtr, set via SetOutboundCall(). OutboundCall owns one
// instance as callback_task_. Run() and Done() are defined outside this header; presumably they
// run the call's callback - verify in the .cc file.
202 | | class InvokeCallbackTask : public rpc::ThreadPoolTask { |
203 | | public: |
204 | 86.5M | InvokeCallbackTask() {} |
205 | | |
206 | 66.8M | void SetOutboundCall(OutboundCallPtr call) { call_ = std::move(call); } |
207 | | |
208 | | void Run() override; |
209 | | |
210 | | void Done(const Status& status) override; |
211 | | |
212 | 86.3M | virtual ~InvokeCallbackTask() {} |
213 | | |
214 | | private: |
215 | | OutboundCallPtr call_; |
216 | | }; |
217 | | |
218 | | // Tracks the status of a call on the client side. |
219 | | // |
220 | | // This is an internal-facing class -- clients interact with the |
221 | | // RpcController class. |
222 | | // |
223 | | // This is allocated by the Proxy when a call is first created, |
224 | | // then passed to the reactor thread to send on the wire. It's typically |
225 | | // kept using a shared_ptr because a call may terminate in any number |
226 | | // of different threads, making it tricky to enforce single ownership. |
227 | | class OutboundCall : public RpcCall { |
228 | | public: |
229 | | OutboundCall(const RemoteMethod* remote_method, |
230 | | const std::shared_ptr<OutboundCallMetrics>& outbound_call_metrics, |
231 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
232 | | AnyMessagePtr response_storage, |
233 | | RpcController* controller, |
234 | | std::shared_ptr<RpcMetrics> rpc_metrics, |
235 | | ResponseCallback callback, |
236 | | ThreadPool* callback_thread_pool); |
237 | | |
238 | | virtual ~OutboundCall(); |
239 | | |
240 | | // Serialize the given request PB into this call's internal storage. |
241 | | // |
242 | | // Because the data is fully serialized by this call, 'req' may be |
243 | | // subsequently mutated with no ill effects. |
244 | | virtual CHECKED_STATUS SetRequestParam( |
245 | | AnyMessageConstPtr req, const MemTrackerPtr& mem_tracker); |
246 | | |
247 | | // Serialize the call for the wire. Requires that SetRequestParam() |
248 | | // is called first. This is called from the Reactor thread. |
249 | | void Serialize(boost::container::small_vector_base<RefCntBuffer>* output) override; |
250 | | |
251 | | // Sets thread pool to be used by `InvokeCallback` for callback execution. |
252 | 0 | void SetCallbackThreadPool(ThreadPool* callback_thread_pool) { |
253 | 0 | callback_thread_pool_ = callback_thread_pool; |
254 | 0 | } |
255 | | |
256 | | // Callback after the call has been put on the outbound connection queue. |
257 | | void SetQueued(); |
258 | | |
259 | | // Update the call state to show that the request has been sent. |
260 | | // Could be called on already finished call in case it was already timed out. |
261 | | void SetSent(); |
262 | | |
263 | | // Outbound call could be moved to final state only once, |
264 | | // so only one of SetFinished/SetTimedOut/SetFailed/SetResponse can be called. |
265 | | |
266 | | // Update the call state to show that the call has finished. |
267 | | void SetFinished(); |
268 | | |
269 | | // Mark the call as failed. This also triggers the callback to notify |
270 | | // the caller. If the call failed due to a remote error, then err_pb |
271 | | // should be set to the error returned by the remote server. |
272 | | void SetFailed(const Status& status, std::unique_ptr<ErrorStatusPB> err_pb = nullptr); |
273 | | |
274 | | // Mark the call as timed out. This also triggers the callback to notify |
275 | | // the caller. |
276 | | void SetTimedOut(); |
277 | | |
278 | | // Fill in the call response. |
279 | | void SetResponse(CallResponse&& resp); |
280 | | |
281 | | bool IsTimedOut() const; |
282 | | |
283 | | // Is the call finished? |
284 | | bool IsFinished() const override final; |
285 | | |
286 | | std::string ToString() const override; |
287 | | |
288 | | bool DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp) override; |
289 | | |
290 | | std::string LogPrefix() const override; |
291 | | |
292 | 75.5M | void SetConnectionId(const ConnectionId& value, const std::string* hostname) { |
293 | 75.5M | conn_id_ = value; |
294 | 75.5M | hostname_ = hostname; |
295 | 75.5M | } |
296 | | |
297 | 284 | void SetThreadPoolFailure(const Status& status) { |
298 | 284 | thread_pool_failure_ = status; |
299 | 284 | } |
300 | | |
301 | 25.6M | const Status& thread_pool_failure() const { |
302 | 25.6M | return thread_pool_failure_; |
303 | 25.6M | } |
304 | | |
305 | | void InvokeCallbackSync(); |
306 | | |
307 | | //////////////////////////////////////////////////////////// |
308 | | // Getters |
309 | | //////////////////////////////////////////////////////////// |
310 | | |
311 | 226M | const ConnectionId& conn_id() const { return conn_id_; } |
312 | 75.3M | const std::string& hostname() const { return *hostname_; } |
313 | 10.8M | const RemoteMethod& remote_method() const { return *remote_method_; } |
314 | 0 | const ResponseCallback& callback() const { return callback_; } |
315 | 161M | RpcController* controller() { return controller_; } |
316 | 0 | const RpcController* controller() const { return controller_; } |
317 | 21.6M | AnyMessagePtr response() const { return response_; } |
318 | | |
319 | 72.7M | int32_t call_id() const { |
320 | 72.7M | return call_id_; |
321 | 72.7M | } |
322 | | |
323 | 86.2M | Trace* trace() { |
324 | 86.2M | return trace_.get(); |
325 | 86.2M | } |
326 | | |
327 | 10.8M | RpcMetrics& rpc_metrics() { |
328 | 10.8M | return *rpc_metrics_; |
329 | 10.8M | } |
330 | | |
331 | 72.7M | size_t ObjectSize() const override { return sizeof(*this); } |
332 | | |
333 | 72.6M | size_t DynamicMemoryUsage() const override { |
334 | 72.6M | return DynamicMemoryUsageAllowSizeOf(error_pb_) + |
335 | 72.6M | DynamicMemoryUsageOf(buffer_, call_response_, trace_); |
336 | 72.6M | } |
337 | | |
338 | | protected: |
339 | | friend class RpcController; |
340 | | |
341 | | Result<Slice> GetSidecar(size_t idx) const; |
342 | | virtual Result<const uint8_t*const*> GetSidecarPtr(size_t idx) const; |
343 | | virtual Result<SidecarHolder> GetSidecarHolder(size_t idx) const; |
344 | | |
345 | | ConnectionId conn_id_; |
// Assigned by SetConnectionId() and dereferenced unconditionally by hostname(). It has no in-class
// initializer. NOTE(review): confirm it is always set before hostname() is called.
346 | | const std::string* hostname_; |
347 | | CoarseTimePoint start_; |
348 | | RpcController* controller_; |
349 | | // Pointer for the protobuf where the response should be written. |
350 | | // Can be used only while callback_ object is alive. |
351 | | AnyMessagePtr response_; |
352 | | |
353 | | // The trace buffer. |
354 | | scoped_refptr<Trace> trace_; |
355 | | |
356 | | private: |
// NOTE(review): this repeats the friend declaration already made in the protected section.
357 | | friend class RpcController; |
358 | | |
359 | | typedef RpcCallState State; |
360 | | |
361 | | static std::string StateName(State state); |
362 | | |
363 | | void NotifyTransferred(const Status& status, Connection* conn) override; |
364 | | |
365 | | bool SetState(State new_state); |
366 | | State state() const; |
367 | | |
368 | | // Same as SetState(), but requires that the caller already holds |
369 | | // lock_ |
370 | | void set_state_unlocked(State new_state); |
371 | | |
372 | | // return current status |
373 | | CHECKED_STATUS status() const; |
374 | | |
375 | | // Return the error protobuf, if a remote error occurred. |
376 | | // This will only be non-NULL if status().IsRemoteError(). |
377 | | const ErrorStatusPB* error_pb() const; |
378 | | |
379 | | CHECKED_STATUS InitHeader(RequestHeader* header); |
380 | | |
381 | | // Lock for the state_, status_ and error_pb_ fields, since they |
382 | | // may be mutated by the reactor thread while the client thread |
383 | | // reads them. |
384 | | mutable simple_spinlock lock_; |
385 | | std::atomic<State> state_ = {READY}; |
386 | | Status status_; |
387 | | std::unique_ptr<ErrorStatusPB> error_pb_; |
388 | | |
389 | | // Invokes the user-provided callback. Uses callback_thread_pool_ if set. |
390 | | void InvokeCallback(); |
391 | | |
392 | | Result<uint32_t> TimeoutMs() const; |
393 | | |
394 | | int32_t call_id_; |
395 | | |
396 | | // The remote method being called. |
397 | | const RemoteMethod* remote_method_; |
398 | | |
399 | | ResponseCallback callback_; |
400 | | |
401 | | InvokeCallbackTask callback_task_; |
402 | | |
403 | | ThreadPool* callback_thread_pool_; |
404 | | |
405 | | // Buffer for storing the wire-format request. |
406 | | RefCntBuffer buffer_; |
407 | | |
408 | | // Consumption of buffer_. |
409 | | ScopedTrackedConsumption buffer_consumption_; |
410 | | |
411 | | // Once a response has been received for this call, contains that response. |
412 | | CallResponse call_response_; |
413 | | |
414 | | std::shared_ptr<OutboundCallMetrics> outbound_call_metrics_; |
415 | | |
416 | | std::shared_ptr<RpcMetrics> rpc_metrics_; |
417 | | |
418 | | Status thread_pool_failure_; |
419 | | |
420 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics_; |
421 | | |
422 | | DISALLOW_COPY_AND_ASSIGN(OutboundCall); |
423 | | }; |
424 | | |
// Integral error tag (category 15) whose value type is ErrorStatusPB::RpcErrorCodePB.
// ToMessage() returns the protobuf-generated name of the enum value. RpcError is the
// StatusErrorCodeImpl instantiation for this tag.
425 | | class RpcErrorTag : public IntegralErrorTag<ErrorStatusPB::RpcErrorCodePB> { |
426 | | public: |
427 | | static constexpr uint8_t kCategory = 15; |
428 | | |
429 | 0 | static std::string ToMessage(Value value) { |
430 | 0 | return ErrorStatusPB::RpcErrorCodePB_Name(value); |
431 | 0 | } |
432 | | }; |
433 | | |
434 | | typedef StatusErrorCodeImpl<RpcErrorTag> RpcError; |
435 | | |
436 | | } // namespace rpc |
437 | | } // namespace yb |
438 | | |
439 | | #endif // YB_RPC_OUTBOUND_CALL_H_ |