/Users/deen/code/yugabyte-db/src/yb/rpc/yb_rpc.h
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #ifndef YB_RPC_YB_RPC_H |
17 | | #define YB_RPC_YB_RPC_H |
18 | | |
19 | | #include <stdint.h> |
20 | | |
21 | | #include <cstdint> |
22 | | #include <cstdlib> |
23 | | #include <string> |
24 | | #include <type_traits> |
25 | | |
26 | | #include <boost/version.hpp> |
27 | | |
28 | | #include "yb/rpc/rpc_fwd.h" |
29 | | #include "yb/rpc/binary_call_parser.h" |
30 | | #include "yb/rpc/circular_read_buffer.h" |
31 | | #include "yb/rpc/connection_context.h" |
32 | | #include "yb/rpc/rpc_with_call_id.h" |
33 | | #include "yb/rpc/serialization.h" |
34 | | |
35 | | #include "yb/util/ev_util.h" |
36 | | #include "yb/util/net/net_fwd.h" |
37 | | #include "yb/util/size_literals.h" |
38 | | |
39 | | namespace yb { |
40 | | namespace rpc { |
41 | | |
42 | | class YBConnectionContext : public ConnectionContextWithCallId, public BinaryCallParserListener { |
43 | | public: |
44 | | YBConnectionContext( |
45 | | size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker, |
46 | | const MemTrackerPtr& call_tracker); |
47 | | ~YBConnectionContext(); |
48 | | |
49 | 58.2M | const MemTrackerPtr& call_tracker() const { return call_tracker_; } |
50 | | |
51 | | void SetEventLoop(ev::loop_ref* loop) override; |
52 | | |
53 | | void Shutdown(const Status& status) override; |
54 | | |
55 | | protected: |
56 | 38.4M | BinaryCallParser& parser() { return parser_; } |
57 | | |
58 | | ev::loop_ref* loop_ = nullptr; |
59 | | |
60 | | EvTimerHolder timer_; |
61 | | |
62 | | private: |
63 | | uint64_t ExtractCallId(InboundCall* call) override; |
64 | | |
65 | 288M | StreamReadBuffer& ReadBuffer() override { |
66 | 288M | return read_buffer_; |
67 | 288M | } |
68 | | |
69 | | BinaryCallParser parser_; |
70 | | |
71 | | CircularReadBuffer read_buffer_; |
72 | | |
73 | | const MemTrackerPtr call_tracker_; |
74 | | }; |
75 | | |
76 | | class YBInboundConnectionContext : public YBConnectionContext { |
77 | | public: |
78 | | YBInboundConnectionContext( |
79 | | size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker, |
80 | | const MemTrackerPtr& call_tracker) |
81 | 304k | : YBConnectionContext(receive_buffer_size, buffer_tracker, call_tracker) {} |
82 | | |
83 | 11.6k | static std::string Name() { return "Inbound RPC"; } |
84 | | private: |
85 | | // Takes ownership of call_data content. |
86 | | CHECKED_STATUS HandleCall(const ConnectionPtr& connection, CallData* call_data) override; |
87 | | void Connected(const ConnectionPtr& connection) override; |
88 | | Result<ProcessCallsResult> ProcessCalls(const ConnectionPtr& connection, |
89 | | const IoVecs& data, |
90 | | ReadBufferFull read_buffer_full) override; |
91 | | |
92 | | // Takes ownership of call_data content. |
93 | | CHECKED_STATUS HandleInboundCall(const ConnectionPtr& connection, std::vector<char>* call_data); |
94 | | |
95 | | void HandleTimeout(ev::timer& watcher, int revents); // NOLINT |
96 | | |
97 | 11 | RpcConnectionPB::StateType State() override { return state_; } |
98 | | |
99 | | RpcConnectionPB::StateType state_ = RpcConnectionPB::UNKNOWN; |
100 | | |
101 | | void UpdateLastWrite(const ConnectionPtr& connection) override; |
102 | | |
103 | | std::weak_ptr<Connection> connection_; |
104 | | |
105 | | // Last time data was sent to network layer below application. |
106 | | CoarseTimePoint last_write_time_; |
107 | | // Last time we queued heartbeat for sending. |
108 | | CoarseTimePoint last_heartbeat_sending_time_; |
109 | | }; |
110 | | |
111 | | class YBInboundCall : public InboundCall { |
112 | | public: |
113 | | YBInboundCall(ConnectionPtr conn, CallProcessedListener call_processed_listener); |
114 | | explicit YBInboundCall(RpcMetrics* rpc_metrics, const RemoteMethod& remote_method); |
115 | | virtual ~YBInboundCall(); |
116 | | |
117 | | // Is this a local call? |
118 | 19.3M | virtual bool IsLocalCall() const { return false; } |
119 | | |
120 | | // Parse an inbound call message. |
121 | | // |
122 | | // This only deserializes the call header, populating the 'header_' and |
123 | | // 'serialized_request_' member variables. The actual call parameter is |
124 | | // not deserialized, as this may be CPU-expensive, and this is called |
125 | | // from the reactor thread. |
126 | | // |
127 | | // Takes ownership of call_data content. |
128 | | CHECKED_STATUS ParseFrom(const MemTrackerPtr& mem_tracker, CallData* call_data); |
129 | | |
130 | 39.1M | int32_t call_id() const { |
131 | 39.1M | return header_.call_id; |
132 | 39.1M | } |
133 | | |
134 | 25.2M | Slice serialized_remote_method() const override { |
135 | 25.2M | return header_.remote_method; |
136 | 25.2M | } |
137 | | |
138 | | Slice method_name() const override; |
139 | | |
140 | | // See RpcContext::AddRpcSidecar() |
141 | | virtual size_t AddRpcSidecar(Slice car); |
142 | | |
143 | | // See RpcContext::ResetRpcSidecars() |
144 | | void ResetRpcSidecars(); |
145 | | |
146 | | void ReserveSidecarSpace(size_t space); |
147 | | |
148 | | // Serializes 'response' into the InboundCall's internal buffer, and marks |
149 | | // the call as a success. Enqueues the response back to the connection |
150 | | // that made the call. |
151 | | // |
152 | | // This method deletes the InboundCall object, so no further calls may be |
153 | | // made after this one. |
154 | | void RespondSuccess(AnyMessageConstPtr response); |
155 | | |
156 | | // Serializes a failure response into the internal buffer, marking the |
157 | | // call as a failure. Enqueues the response back to the connection that |
158 | | // made the call. |
159 | | // |
160 | | // This method deletes the InboundCall object, so no further calls may be |
161 | | // made after this one. |
162 | | void RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code, |
163 | | const Status &status) override; |
164 | | |
165 | | void RespondApplicationError(int error_ext_id, const std::string& message, |
166 | | const google::protobuf::MessageLite& app_error_pb); |
167 | | |
168 | | // Convert an application error extension to an ErrorStatusPB. |
169 | | // These ErrorStatusPB objects are what are returned in application error responses. |
170 | | static void ApplicationErrorToPB(int error_ext_id, const std::string& message, |
171 | | const google::protobuf::MessageLite& app_error_pb, |
172 | | ErrorStatusPB* err); |
173 | | |
174 | | // Serialize the response packet for the finished call. |
175 | | // The resulting slices refer to memory in this object. |
176 | | void DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) override; |
177 | | |
178 | | void LogTrace() const override; |
179 | | std::string ToString() const override; |
180 | | bool DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp) override; |
181 | | |
182 | | CoarseTimePoint GetClientDeadline() const override; |
183 | | |
184 | 0 | MonoTime ReceiveTime() const { |
185 | 0 | return timing_.time_received; |
186 | 0 | } |
187 | | |
188 | | virtual CHECKED_STATUS ParseParam(RpcCallParams* params); |
189 | | |
190 | 19.5M | size_t ObjectSize() const override { return sizeof(*this); } |
191 | | |
192 | 19.6M | size_t DynamicMemoryUsage() const override { |
193 | 19.6M | return InboundCall::DynamicMemoryUsage() + DynamicMemoryUsageOf(response_buf_); |
194 | 19.6M | } |
195 | | |
196 | | protected: |
197 | | // Fields to store sidecars state. See rpc/rpc_sidecar.h for more info. |
198 | | size_t num_sidecars_ = 0; |
199 | | size_t filled_bytes_in_last_sidecar_buffer_ = 0; |
200 | | size_t total_sidecars_size_ = 0; |
201 | | boost::container::small_vector<RefCntBuffer, kMinBufferForSidecarSlices> sidecar_buffers_; |
202 | | google::protobuf::RepeatedField<uint32_t> sidecar_offsets_; |
203 | | |
204 | | // Serialize and queue the response. |
205 | | virtual void Respond(AnyMessageConstPtr response, bool is_success); |
206 | | |
207 | | private: |
208 | | // Serialize a response message for either success or failure. If it is a success, |
209 | | // 'response' should be the user-defined response type for the call. If it is a |
210 | | // failure, 'response' should be an ErrorStatusPB instance. |
211 | | CHECKED_STATUS SerializeResponseBuffer(AnyMessageConstPtr response, bool is_success); |
212 | | |
213 | | // Returns number of bytes copied. |
214 | | size_t CopyToLastSidecarBuffer(const Slice& slice); |
215 | | void AllocateSidecarBuffer(size_t size); |
216 | | |
217 | | // The header of the incoming call. Set by ParseFrom() |
218 | | ParsedRequestHeader header_; |
219 | | |
220 | | // The buffers for serialized response. Set by SerializeResponseBuffer(). |
221 | | RefCntBuffer response_buf_; |
222 | | |
223 | | ScopedTrackedConsumption consumption_; |
224 | | }; |
225 | | |
226 | | class YBOutboundConnectionContext : public YBConnectionContext { |
227 | | public: |
228 | | YBOutboundConnectionContext( |
229 | | size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker, |
230 | | const MemTrackerPtr& call_tracker) |
231 | 629k | : YBConnectionContext(receive_buffer_size, buffer_tracker, call_tracker) {} |
232 | | |
233 | 21.3k | static std::string Name() { return "Outbound RPC"; } |
234 | | |
235 | | private: |
236 | 10 | RpcConnectionPB::StateType State() override { |
237 | 10 | return RpcConnectionPB::OPEN; |
238 | 10 | } |
239 | | |
240 | | // Takes ownership of call_data content. |
241 | | CHECKED_STATUS HandleCall(const ConnectionPtr& connection, CallData* call_data) override; |
242 | | void Connected(const ConnectionPtr& connection) override; |
243 | | void AssignConnection(const ConnectionPtr& connection) override; |
244 | | Result<ProcessCallsResult> ProcessCalls(const ConnectionPtr& connection, |
245 | | const IoVecs& data, |
246 | | ReadBufferFull read_buffer_full) override; |
247 | | |
248 | | void UpdateLastRead(const ConnectionPtr& connection) override; |
249 | | |
250 | | void HandleTimeout(ev::timer& watcher, int revents); // NOLINT |
251 | | |
252 | | std::weak_ptr<Connection> connection_; |
253 | | |
254 | | CoarseTimePoint last_read_time_; |
255 | | }; |
256 | | |
257 | | } // namespace rpc |
258 | | } // namespace yb |
259 | | |
260 | | #endif // YB_RPC_YB_RPC_H |