/Users/deen/code/yugabyte-db/src/yb/rpc/inbound_call.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | #ifndef YB_RPC_INBOUND_CALL_H_ |
33 | | #define YB_RPC_INBOUND_CALL_H_ |
34 | | |
35 | | #include <string> |
36 | | #include <vector> |
37 | | |
38 | | #include <glog/logging.h> |
39 | | |
40 | | #include "yb/gutil/stl_util.h" |
41 | | #include "yb/gutil/macros.h" |
42 | | #include "yb/gutil/ref_counted.h" |
43 | | |
44 | | #include "yb/rpc/rpc_fwd.h" |
45 | | #include "yb/rpc/call_data.h" |
46 | | #include "yb/rpc/rpc_call.h" |
47 | | #include "yb/rpc/remote_method.h" |
48 | | #include "yb/rpc/rpc_header.pb.h" |
49 | | #include "yb/rpc/thread_pool.h" |
50 | | |
51 | | #include "yb/yql/cql/ql/ql_session.h" |
52 | | |
53 | | #include "yb/util/faststring.h" |
54 | | #include "yb/util/lockfree.h" |
55 | | #include "yb/util/metrics_fwd.h" |
56 | | #include "yb/util/memory/memory.h" |
57 | | #include "yb/util/monotime.h" |
58 | | #include "yb/util/net/net_fwd.h" |
59 | | #include "yb/util/ref_cnt_buffer.h" |
60 | | #include "yb/util/slice.h" |
61 | | #include "yb/util/status_fwd.h" |
62 | | |
63 | | namespace google { |
64 | | namespace protobuf { |
65 | | class Message; |
66 | | } // namespace protobuf |
67 | | } // namespace google |
68 | | |
69 | | namespace yb { |
70 | | |
71 | | class Histogram; |
72 | | class Trace; |
73 | | |
74 | | namespace rpc { |
75 | | |
76 | | struct InboundCallTiming { |
77 | | MonoTime time_received; // Time the call was first accepted. |
78 | | MonoTime time_handled; // Time the call handler was kicked off. |
79 | | MonoTime time_completed; // Time the call handler completed. |
80 | | }; |
81 | | |
82 | | class InboundCallHandler { |
83 | | public: |
84 | | virtual void Handle(InboundCallPtr call) = 0; |
85 | | |
86 | | virtual void Failure(const InboundCallPtr& call, const Status& status) = 0; |
87 | | |
88 | | virtual bool CallQueued() = 0; |
89 | | |
90 | | virtual void CallDequeued() = 0; |
91 | | |
92 | | protected: |
93 | | ~InboundCallHandler() = default; |
94 | | }; |
95 | | |
96 | | // Inbound call on server |
97 | | class InboundCall : public RpcCall, public MPSCQueueEntry<InboundCall> { |
98 | | public: |
99 | | typedef std::function<void(InboundCall*)> CallProcessedListener; |
100 | | |
101 | | InboundCall(ConnectionPtr conn, RpcMetrics* rpc_metrics, |
102 | | CallProcessedListener call_processed_listener); |
103 | | virtual ~InboundCall(); |
104 | | |
105 | | void SetRpcMethodMetrics(std::reference_wrapper<const RpcMethodMetrics> value); |
106 | | |
107 | | // Return the serialized request parameter protobuf. |
108 | 24.2M | const Slice &serialized_request() const { |
109 | 24.2M | return serialized_request_; |
110 | 24.2M | } |
111 | | |
112 | 29.8M | void set_method_index(size_t value) { |
113 | 29.8M | method_index_ = value; |
114 | 29.8M | } |
115 | | |
116 | 25.2M | size_t method_index() const { |
117 | 25.2M | return method_index_; |
118 | 25.2M | } |
119 | | |
120 | | virtual const Endpoint& remote_address() const; |
121 | | virtual const Endpoint& local_address() const; |
122 | | |
123 | | ConnectionPtr connection() const; |
124 | | ConnectionContext& connection_context() const; |
125 | | |
126 | | Trace* trace(); |
127 | | |
128 | | // When this InboundCall was received (instantiated). |
129 | | // Should only be called once on a given instance. |
130 | | // Not thread-safe. Should only be called by the current "owner" thread. |
131 | | void RecordCallReceived(); |
132 | | |
133 | | // When RPC call Handle() was called on the server side. |
134 | | // Updates the Histogram with time elapsed since the call was received, |
135 | | // and should only be called once on a given instance. |
136 | | // Not thread-safe. Should only be called by the current "owner" thread. |
137 | | virtual void RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_time); |
138 | | |
139 | | // When RPC call Handle() completed execution on the server side. |
140 | | // Updates the Histogram with time elapsed since the call was started, |
141 | | // and should only be called once on a given instance. |
142 | | // Not thread-safe. Should only be called by the current "owner" thread. |
143 | | void RecordHandlingCompleted(); |
144 | | |
145 | | // Return true if the deadline set by the client has already elapsed. |
146 | | // In this case, the server may stop processing the call, since the |
147 | | // call response will be ignored anyway. |
148 | | bool ClientTimedOut() const; |
149 | | |
150 | | // Return an upper bound on the client timeout deadline. This does not |
151 | | // account for transmission delays between the client and the server. |
152 | | // If the client did not specify a deadline, returns MonoTime::Max(). |
153 | | virtual CoarseTimePoint GetClientDeadline() const = 0; |
154 | | |
155 | | virtual void DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) = 0; |
156 | | |
157 | | // Returns the time spent in the service queue -- from the time the call was received, until |
158 | | // it gets handled. |
159 | | MonoDelta GetTimeInQueue() const; |
160 | | |
161 | | ThreadPoolTask* BindTask(InboundCallHandler* handler); |
162 | | |
163 | 1 | void ResetCallProcessedListener() { |
164 | 1 | call_processed_listener_ = decltype(call_processed_listener_)(); |
165 | 1 | } |
166 | | |
167 | | virtual Slice serialized_remote_method() const = 0; |
168 | | virtual Slice method_name() const = 0; |
169 | | |
170 | | // virtual const std::string& service_name() const = 0; |
171 | | virtual void RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code, const Status& status) = 0; |
172 | | |
173 | | // Do appropriate actions when call is timed out. |
174 | | // |
175 | | // message contains human readable information on why call timed out. |
176 | | // |
177 | | // Returns true if actions were applied, false if call was already processed. |
178 | | bool RespondTimedOutIfPending(const char* message); |
179 | | |
180 | 29.8M | bool TryStartProcessing() { |
181 | 29.8M | bool expected = false; |
182 | 29.8M | if (!processing_started_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
183 | 11.8k | return false; |
184 | 11.8k | } |
185 | 29.8M | if (tracker_) { |
186 | 29.8M | tracker_->CallDequeued(); |
187 | 29.8M | } |
188 | 29.8M | return true; |
189 | 29.8M | } |
190 | | |
191 | | std::string LogPrefix() const override; |
192 | | |
193 | | template <class T, class ...Args> |
194 | 29.8M | static std::shared_ptr<T> Create(Args&&... args) { |
195 | 29.8M | auto result = std::make_shared<T>(std::forward<Args>(args)...); |
196 | 29.8M | result->RecordCallReceived(); |
197 | 29.8M | return result; |
198 | 29.8M | } _ZN2yb3rpc11InboundCall6CreateINS_9cqlserver14CQLInboundCallEJRKNSt3__110shared_ptrINS0_10ConnectionEEENS5_8functionIFvPS1_EEERNS6_INS_2ql9QLSessionEEEEEENS6_IT_EEDpOT0_ Line | Count | Source | 194 | 4.56M | static std::shared_ptr<T> Create(Args&&... args) { | 195 | 4.56M | auto result = std::make_shared<T>(std::forward<Args>(args)...); | 196 | 4.56M | result->RecordCallReceived(); | 197 | 4.56M | return result; | 198 | 4.56M | } |
_ZN2yb3rpc11InboundCall6CreateINS_11redisserver16RedisInboundCallEJRKNSt3__110shared_ptrINS0_10ConnectionEEEmNS5_8functionIFvPS1_EEEEEENS6_IT_EEDpOT0_ Line | Count | Source | 194 | 104k | static std::shared_ptr<T> Create(Args&&... args) { | 195 | 104k | auto result = std::make_shared<T>(std::forward<Args>(args)...); | 196 | 104k | result->RecordCallReceived(); | 197 | 104k | return result; | 198 | 104k | } |
_ZN2yb3rpc11InboundCall6CreateINS0_18LocalYBInboundCallEJPNS0_10RpcMetricsERKNS0_12RemoteMethodERNSt3__110shared_ptrINS0_17LocalOutboundCallEEERKNS9_6chrono10time_pointINS_15CoarseMonoClockENSE_8durationIxNS9_5ratioILl1ELl1000000000EEEEEEEEEENSA_IT_EEDpOT0_ Line | Count | Source | 194 | 5.59M | static std::shared_ptr<T> Create(Args&&... args) { | 195 | 5.59M | auto result = std::make_shared<T>(std::forward<Args>(args)...); | 196 | 5.59M | result->RecordCallReceived(); | 197 | 5.59M | return result; | 198 | 5.59M | } |
_ZN2yb3rpc11InboundCall6CreateINS0_13YBInboundCallEJRKNSt3__110shared_ptrINS0_10ConnectionEEENS4_8functionIFvPS1_EEEEEENS5_IT_EEDpOT0_ Line | Count | Source | 194 | 19.5M | static std::shared_ptr<T> Create(Args&&... args) { | 195 | 19.5M | auto result = std::make_shared<T>(std::forward<Args>(args)...); | 196 | 19.5M | result->RecordCallReceived(); | 197 | 19.5M | return result; | 198 | 19.5M | } |
|
199 | | |
200 | | size_t DynamicMemoryUsage() const override; |
201 | | |
202 | | void Serialize(boost::container::small_vector_base<RefCntBuffer>* output) override final; |
203 | | |
204 | 38.9M | const CallData& request_data() const { return request_data_; } |
205 | | |
206 | | protected: |
207 | | void NotifyTransferred(const Status& status, Connection* conn) override; |
208 | | |
209 | | virtual void Clear(); |
210 | | |
211 | | // Log a WARNING message if the RPC response was slow enough that the |
212 | | // client likely timed out. This is based on the client-provided timeout |
213 | | // value. |
214 | | // Also can be configured to log _all_ RPC traces for help debugging. |
215 | | virtual void LogTrace() const = 0; |
216 | | |
217 | | void QueueResponse(bool is_success); |
218 | | |
219 | | // The serialized bytes of the request param protobuf. Set by ParseFrom(). |
220 | | // This references memory held by 'request_data_'. |
221 | | Slice serialized_request_; |
222 | | |
223 | | // Data source of this call. |
224 | | CallData request_data_; |
225 | | std::atomic<size_t> request_data_memory_usage_{0}; |
226 | | |
227 | | // The trace buffer. |
228 | | scoped_refptr<Trace> trace_; |
229 | | |
230 | | // Timing information related to this RPC call. |
231 | | InboundCallTiming timing_; |
232 | | |
233 | | std::atomic<bool> processing_started_{false}; |
234 | | |
235 | | std::atomic<bool> responded_{false}; |
236 | | |
237 | | scoped_refptr<Counter> rpc_method_response_bytes_; |
238 | | scoped_refptr<Histogram> rpc_method_handler_latency_; |
239 | | |
240 | | private: |
241 | | // The connection on which this inbound call arrived. Can be null for LocalYBInboundCall. |
242 | | ConnectionPtr conn_ = nullptr; |
243 | | RpcMetrics* rpc_metrics_; |
244 | | std::function<void(InboundCall*)> call_processed_listener_; |
245 | | |
246 | | class InboundCallTask : public ThreadPoolTask { |
247 | | public: |
248 | 29.8M | void Bind(InboundCallHandler* handler, InboundCallPtr call) { |
249 | 29.8M | handler_ = handler; |
250 | 29.8M | call_ = std::move(call); |
251 | 29.8M | } |
252 | | |
253 | | void Run() override; |
254 | | |
255 | | void Done(const Status& status) override; |
256 | | |
257 | 29.8M | virtual ~InboundCallTask() = default; |
258 | | |
259 | | private: |
260 | | InboundCallHandler* handler_; |
261 | | InboundCallPtr call_; |
262 | | }; |
263 | | |
264 | | InboundCallTask task_; |
265 | | InboundCallHandler* tracker_ = nullptr; |
266 | | |
267 | | size_t method_index_ = 0; |
268 | | |
269 | | DISALLOW_COPY_AND_ASSIGN(InboundCall); |
270 | | }; |
271 | | |
272 | | } // namespace rpc |
273 | | } // namespace yb |
274 | | |
275 | | #endif // YB_RPC_INBOUND_CALL_H_ |