/Users/deen/code/yugabyte-db/src/yb/rpc/connection.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #ifndef YB_RPC_CONNECTION_H_ |
34 | | #define YB_RPC_CONNECTION_H_ |
35 | | |
36 | | #include <stdint.h> |
37 | | |
38 | | #include <atomic> |
39 | | #include <cstdint> |
40 | | #include <limits> |
41 | | #include <memory> |
42 | | #include <queue> |
43 | | #include <string> |
44 | | #include <thread> |
45 | | #include <type_traits> |
46 | | #include <unordered_map> |
47 | | #include <vector> |
48 | | |
49 | | #include <boost/container/small_vector.hpp> |
50 | | #include <boost/version.hpp> |
51 | | #include <ev++.h> |
52 | | #include <gflags/gflags_declare.h> |
53 | | #include <glog/logging.h> |
54 | | |
55 | | #include "yb/gutil/ref_counted.h" |
56 | | |
57 | | #include "yb/rpc/rpc_fwd.h" |
58 | | #include "yb/rpc/stream.h" |
59 | | |
60 | | #include "yb/util/metrics_fwd.h" |
61 | | #include "yb/util/enums.h" |
62 | | #include "yb/util/ev_util.h" |
63 | | #include "yb/util/locks.h" |
64 | | #include "yb/util/monotime.h" |
65 | | #include "yb/util/net/net_fwd.h" |
66 | | #include "yb/util/net/sockaddr.h" |
67 | | #include "yb/util/net/socket.h" |
68 | | #include "yb/util/status.h" |
69 | | #include "yb/util/strongly_typed_bool.h" |
70 | | |
71 | | namespace yb { |
72 | | namespace rpc { |
73 | | |
74 | | YB_DEFINE_ENUM(ConnectionDirection, (CLIENT)(SERVER)); |
75 | | |
76 | | typedef boost::container::small_vector_base<OutboundDataPtr> OutboundDataBatch; |
77 | | |
78 | | // |
79 | | // A connection between an endpoint and us. |
80 | | // |
81 | | // Inbound connections are created by AcceptorPools, which eventually schedule |
82 | | // RegisterConnection() to be called from the reactor thread. |
83 | | // |
84 | | // Outbound connections are created by the Reactor thread in order to service |
85 | | // outbound calls. |
86 | | // |
87 | | // Once a Connection is created, it can be used both for sending messages and |
88 | | // receiving them, but any given connection is explicitly a client or server. |
89 | | // If a pair of servers are making bidirectional RPCs, they will use two separate |
90 | | // TCP connections (and Connection objects). |
91 | | // |
92 | | // This class is not fully thread-safe. It is accessed only from the context of a |
93 | | // single Reactor except where otherwise specified. |
94 | | // |
95 | | class Connection final : public StreamContext, public std::enable_shared_from_this<Connection> { |
96 | | public: |
97 | | typedef ConnectionDirection Direction; |
98 | | |
99 | | // Create a new Connection. |
100 | | // reactor: the reactor that owns us. |
101 | | // remote: the address of the remote end |
102 | | // socket: the socket to take ownership of. |
103 | | // direction: whether we are the client or server side |
104 | | // context: context for this connection. Context is used by connection to handle |
105 | | // protocol specific actions, such as parsing of incoming data into calls. |
106 | | Connection(Reactor* reactor, |
107 | | std::unique_ptr<Stream> stream, |
108 | | Direction direction, |
109 | | RpcMetrics* rpc_metrics, |
110 | | std::unique_ptr<ConnectionContext> context); |
111 | | |
112 | | ~Connection(); |
113 | | |
114 | 908M | CoarseTimePoint last_activity_time() const { |
115 | 908M | return last_activity_time_; |
116 | 908M | } |
117 | | |
118 | | void UpdateLastActivity() override; |
119 | | |
120 | | // Returns true if we are not in the process of receiving or sending a |
121 | | // message, and we have no outstanding calls. |
122 | | // When reason_not_idle is specified it contains reason why this connection is not idle. |
123 | | bool Idle(std::string* reason_not_idle = nullptr) const; |
124 | | |
125 | | // A human-readable reason why the connection is not idle. Empty string if connection is idle. |
126 | | std::string ReasonNotIdle() const; |
127 | | |
128 | | // Fail any calls which are currently queued or awaiting response. |
129 | | // Prohibits any future calls (they will be failed immediately with this |
130 | | // same Status). |
131 | | void Shutdown(const Status& status); |
132 | | |
133 | | // Queue a new call to be made. If the queueing fails, the call will be |
134 | | // marked failed. |
135 | | // Takes ownership of the 'call' object regardless of whether it succeeds or fails. |
136 | | // This may be called from a non-reactor thread. |
137 | | void QueueOutboundCall(const OutboundCallPtr& call); |
138 | | |
139 | | // The address of the remote end of the connection. |
140 | | const Endpoint& remote() const; |
141 | | |
142 | | const Protocol* protocol() const; |
143 | | |
144 | | // The address of the local end of the connection. |
145 | | const Endpoint& local() const; |
146 | | |
147 | | void HandleTimeout(ev::timer& watcher, int revents); // NOLINT |
148 | | |
149 | | // Safe to be called from other threads. |
150 | | std::string ToString() const; |
151 | | |
152 | 4.48M | Direction direction() const { return direction_; } |
153 | | |
154 | | // Queue a call response back to the client on the server side. |
155 | | // |
156 | | // This is usually called by the IPC worker thread when the response is set, but in some |
157 | | // circumstances may also be called by the reactor thread (e.g. if the service has shut down). |
158 | | // In addition to this, its also called for processing events generated by the server. |
159 | | void QueueOutboundData(OutboundDataPtr outbound_data); |
160 | | |
161 | | void QueueOutboundDataBatch(const OutboundDataBatch& batch); |
162 | | |
163 | 1.42G | Reactor* reactor() const { return reactor_; } |
164 | | |
165 | | CHECKED_STATUS DumpPB(const DumpRunningRpcsRequestPB& req, |
166 | | RpcConnectionPB* resp); |
167 | | |
168 | | // Do appropriate actions after adding outbound call. |
169 | | void OutboundQueued(); |
170 | | |
171 | | // An incoming packet has completed on the client side. This parses the |
172 | | // call response, looks up the CallAwaitingResponse, and calls the |
173 | | // client callback. |
174 | | CHECKED_STATUS HandleCallResponse(CallData* call_data); |
175 | | |
176 | 109M | ConnectionContext& context() { return *context_; } |
177 | | |
178 | | void CallSent(OutboundCallPtr call); |
179 | | |
180 | | CHECKED_STATUS Start(ev::loop_ref* loop); |
181 | | |
182 | | // Try to parse already received data. |
183 | | void ParseReceived(); |
184 | | |
185 | | void Close(); |
186 | | |
187 | 79.9M | RpcMetrics& rpc_metrics() { |
188 | 79.9M | return *rpc_metrics_; |
189 | 79.9M | } |
190 | | |
191 | | private: |
192 | | CHECKED_STATUS DoWrite(); |
193 | | |
194 | | // Does actual outbound data queueing. Invoked in appropriate reactor thread. |
195 | | size_t DoQueueOutboundData(OutboundDataPtr call, bool batch); |
196 | | |
197 | | void ProcessResponseQueue(); |
198 | | |
199 | | // Stream context implementation |
200 | | void UpdateLastRead() override; |
201 | | |
202 | | void UpdateLastWrite() override; |
203 | | |
204 | | void Transferred(const OutboundDataPtr& data, const Status& status) override; |
205 | | void Destroy(const Status& status) override; |
206 | | Result<size_t> ProcessReceived(ReadBufferFull read_buffer_full) override; |
207 | | void Connected() override; |
208 | | StreamReadBuffer& ReadBuffer() override; |
209 | | |
210 | | std::string LogPrefix() const; |
211 | | |
212 | | // The reactor thread that created this connection. |
213 | | Reactor* const reactor_; |
214 | | |
215 | | std::unique_ptr<Stream> stream_; |
216 | | |
217 | | // whether we are client or server |
218 | | Direction direction_; |
219 | | |
220 | | // The last time we read or wrote from the socket. |
221 | | CoarseTimePoint last_activity_time_; |
222 | | |
223 | | // Calls which have been sent and are now waiting for a response. |
224 | | std::unordered_map<int32_t, OutboundCallPtr> awaiting_response_; |
225 | | |
226 | | // Starts as Status::OK, gets set to a shutdown status upon Shutdown(). Guarded by |
227 | | // outbound_data_queue_lock_. |
228 | | Status shutdown_status_; |
229 | | |
230 | | // We instantiate and store this metric instance at the level of connection, but not at the level |
231 | | // of the class emitting metrics (OutboundTransfer) as recommended in metrics.h. This is on |
232 | | // purpose, because OutboundTransfer is instantiated each time we need to send payload over a |
233 | | // connection and creating a metric instance each time could be a performance hit, because |
234 | | // it involves spin lock and search in a metrics map. Therefore we prepare metric instances |
235 | | // at connection level. |
236 | | scoped_refptr<Histogram> handler_latency_outbound_transfer_; |
237 | | |
238 | | struct ExpirationEntry { |
239 | | CoarseTimePoint time; |
240 | | std::weak_ptr<OutboundCall> call; |
241 | | // See Stream::Send for details. |
242 | | size_t handle; |
243 | | }; |
244 | | |
245 | | struct CompareExpiration { |
246 | 579M | bool operator()(const ExpirationEntry& lhs, const ExpirationEntry& rhs) const { |
247 | 579M | return rhs.time < lhs.time; |
248 | 579M | } |
249 | | }; |
250 | | |
251 | | std::priority_queue<ExpirationEntry, |
252 | | std::vector<ExpirationEntry>, |
253 | | CompareExpiration> expiration_queue_; |
254 | | |
255 | | EvTimerHolder timer_; |
256 | | |
257 | | simple_spinlock outbound_data_queue_lock_; |
258 | | |
259 | | // Responses we are going to process. |
260 | | std::vector<OutboundDataPtr> outbound_data_to_process_; |
261 | | |
262 | | // Responses that are currently being processed. |
263 | | // It could be in function variable, but declared as member for optimization. |
264 | | std::vector<OutboundDataPtr> outbound_data_being_processed_; |
265 | | |
266 | | std::shared_ptr<ReactorTask> process_response_queue_task_; |
267 | | |
268 | | // RPC related metrics. |
269 | | RpcMetrics* rpc_metrics_; |
270 | | |
271 | | // Connection is responsible for sending and receiving bytes. |
272 | | // Context is responsible for what to do with them. |
273 | | std::unique_ptr<ConnectionContext> context_; |
274 | | |
275 | | std::atomic<uint64_t> responded_call_count_{0}; |
276 | | }; |
277 | | |
278 | | } // namespace rpc |
279 | | } // namespace yb |
280 | | |
281 | | #endif // YB_RPC_CONNECTION_H_ |