/Users/deen/code/yugabyte-db/src/yb/rpc/messenger.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | #ifndef YB_RPC_MESSENGER_H_ |
33 | | #define YB_RPC_MESSENGER_H_ |
34 | | |
35 | | #include <stdint.h> |
36 | | |
37 | | #include <atomic> |
38 | | #include <list> |
39 | | #include <memory> |
40 | | #include <string> |
41 | | #include <unordered_map> |
42 | | #include <unordered_set> |
43 | | #include <vector> |
44 | | |
45 | | #include <gtest/gtest_prod.h> |
46 | | |
47 | | #include "yb/gutil/ref_counted.h" |
48 | | |
49 | | #include "yb/rpc/rpc_fwd.h" |
50 | | #include "yb/rpc/io_thread_pool.h" |
51 | | #include "yb/rpc/proxy_context.h" |
52 | | #include "yb/rpc/scheduler.h" |
53 | | |
54 | | #include "yb/util/metrics_fwd.h" |
55 | | #include "yb/util/status_fwd.h" |
56 | | #include "yb/util/async_util.h" |
57 | | #include "yb/util/atomic.h" |
58 | | #include "yb/util/locks.h" |
59 | | #include "yb/util/monotime.h" |
60 | | #include "yb/util/net/sockaddr.h" |
61 | | #include "yb/util/operation_counter.h" |
62 | | #include "yb/util/stack_trace.h" |
63 | | |
64 | | namespace yb { |
65 | | |
66 | | class MemTracker; |
67 | | class Socket; |
68 | | struct SourceLocation; |
69 | | |
70 | | namespace rpc { |
71 | | |
72 | | template <class ContextType> |
73 | | class ConnectionContextFactoryImpl; |
74 | | |
75 | | typedef std::unordered_map<const Protocol*, StreamFactoryPtr> StreamFactories; |
76 | | |
77 | | // Used to construct a Messenger. |
78 | | class MessengerBuilder { |
79 | | public: |
80 | | friend class Messenger; |
81 | | |
82 | | explicit MessengerBuilder(std::string name); |
83 | | ~MessengerBuilder(); |
84 | | |
85 | | MessengerBuilder(const MessengerBuilder&); |
86 | | |
87 | | // Set the length of time we will keep a TCP connection alive with no traffic. |
88 | | MessengerBuilder &set_connection_keepalive_time(CoarseMonoClock::Duration keepalive); |
89 | | |
90 | | // Set the number of reactor threads that will be used for sending and receiving. |
91 | | MessengerBuilder &set_num_reactors(int num_reactors); |
92 | | |
93 | | // Set the granularity with which connections are checked for keepalive. |
94 | | MessengerBuilder &set_coarse_timer_granularity(CoarseMonoClock::Duration granularity); |
95 | | |
96 | | // Set metric entity for use by RPC systems. |
97 | | MessengerBuilder &set_metric_entity(const scoped_refptr<MetricEntity>& metric_entity); |
98 | | |
99 | | // Uses the given connection type to handle the incoming connections. |
100 | 0 | MessengerBuilder &UseConnectionContextFactory(const ConnectionContextFactoryPtr& factory) { |
101 | 0 | connection_context_factory_ = factory; |
102 | 0 | return *this; |
103 | 0 | } |
104 | | |
105 | | MessengerBuilder &UseDefaultConnectionContextFactory( |
106 | | const std::shared_ptr<MemTracker>& parent_mem_tracker = nullptr); |
107 | | |
108 | | MessengerBuilder &AddStreamFactory(const Protocol* protocol, StreamFactoryPtr factory); |
109 | | |
110 | 16.2k | MessengerBuilder &SetListenProtocol(const Protocol* protocol) { |
111 | 16.2k | listen_protocol_ = protocol; |
112 | 16.2k | return *this; |
113 | 16.2k | } |
114 | | |
115 | | template <class ContextType> |
116 | | MessengerBuilder &CreateConnectionContextFactory( |
117 | 202 | size_t memory_limit, const std::shared_ptr<MemTracker>& parent_mem_tracker = nullptr) { |
118 | 202 | if (parent_mem_tracker) { |
119 | 202 | last_used_parent_mem_tracker_ = parent_mem_tracker; |
120 | 202 | } |
121 | 202 | connection_context_factory_ = |
122 | 202 | std::make_shared<ConnectionContextFactoryImpl<ContextType>>( |
123 | 202 | memory_limit, parent_mem_tracker); |
124 | 202 | return *this; |
125 | 202 | } |
126 | | |
127 | | Result<std::unique_ptr<Messenger>> Build(); |
128 | | |
129 | 188k | CoarseMonoClock::Duration connection_keepalive_time() const { |
130 | 188k | return connection_keepalive_time_; |
131 | 188k | } |
132 | | |
133 | 188k | CoarseMonoClock::Duration coarse_timer_granularity() const { |
134 | 188k | return coarse_timer_granularity_; |
135 | 188k | } |
136 | | |
137 | 0 | const ConnectionContextFactoryPtr& connection_context_factory() const { |
138 | 0 | return connection_context_factory_; |
139 | 0 | } |
140 | | |
141 | 0 | MessengerBuilder& set_thread_pool_options(size_t queue_limit, size_t workers_limit) { |
142 | 0 | queue_limit_ = queue_limit; |
143 | 0 | workers_limit_ = workers_limit; |
144 | 0 | return *this; |
145 | 0 | } |
146 | | |
147 | 8 | MessengerBuilder& set_num_connections_to_server(int value) { |
148 | 8 | num_connections_to_server_ = value; |
149 | 8 | return *this; |
150 | 8 | } |
151 | | |
152 | 188k | int num_connections_to_server() const { |
153 | 188k | return num_connections_to_server_; |
154 | 188k | } |
155 | | |
156 | 16.1k | const std::shared_ptr<MemTracker>& last_used_parent_mem_tracker() const { |
157 | 16.1k | return last_used_parent_mem_tracker_; |
158 | 16.1k | } |
159 | | |
160 | | private: |
161 | | const std::string name_; |
162 | | CoarseMonoClock::Duration connection_keepalive_time_; |
163 | | int num_reactors_ = 4; |
164 | | CoarseMonoClock::Duration coarse_timer_granularity_ = std::chrono::milliseconds(100); |
165 | | scoped_refptr<MetricEntity> metric_entity_; |
166 | | ConnectionContextFactoryPtr connection_context_factory_; |
167 | | StreamFactories stream_factories_; |
168 | | const Protocol* listen_protocol_; |
169 | | size_t queue_limit_; |
170 | | size_t workers_limit_; |
171 | | int num_connections_to_server_; |
172 | | std::shared_ptr<MemTracker> last_used_parent_mem_tracker_; |
173 | | }; |
174 | | |
175 | | // A Messenger is a container for the reactor threads which run event loops for the RPC services. |
176 | | // If the process is a server, a Messenger will also have an Acceptor. In this case, calls received |
177 | | // over the connection are enqueued into the messenger's service_queue for processing by a |
178 | | // ServicePool. |
179 | | // |
180 | | // Users do not typically interact with the Messenger directly except to create one as a singleton, |
181 | | // and then make calls using Proxy objects. |
182 | | // |
183 | | // See rpc-test.cc and rpc-bench.cc for example usages. |
184 | | class Messenger : public ProxyContext { |
185 | | public: |
186 | | friend class MessengerBuilder; |
187 | | friend class Proxy; |
188 | | friend class Reactor; |
189 | | typedef std::unordered_map<std::string, scoped_refptr<RpcService>> RpcServicesMap; |
190 | | |
191 | | ~Messenger(); |
192 | | |
193 | | // Stop all communication and prevent further use. Should be called explicitly by messenger owner. |
194 | | void Shutdown(); |
195 | | |
196 | | // Set up the messenger to listen for connections on the given address. |
197 | | CHECKED_STATUS ListenAddress( |
198 | | ConnectionContextFactoryPtr factory, const Endpoint& accept_endpoint, |
199 | | Endpoint* bound_endpoint = nullptr); |
200 | | |
201 | | // Stop accepting connections. |
202 | | void ShutdownAcceptor(); |
203 | | |
204 | | // Start accepting connections. |
205 | | CHECKED_STATUS StartAcceptor(); |
206 | | |
207 | | // Register a new RpcService to handle inbound requests. |
208 | | CHECKED_STATUS RegisterService(const std::string& service_name, const RpcServicePtr& service); |
209 | | |
210 | | void UnregisterAllServices(); |
211 | | |
212 | | void ShutdownThreadPools(); |
213 | | |
214 | | // Queue a call for transmission. This will pick the appropriate reactor, and enqueue a task on |
215 | | // that reactor to assign and send the call. |
216 | | void QueueOutboundCall(OutboundCallPtr call) override; |
217 | | |
218 | | // Invoke the RpcService to handle a call directly. |
219 | | void Handle(InboundCallPtr call, Queue queue) override; |
220 | | |
221 | 145k | const Protocol* DefaultProtocol() override { return listen_protocol_; } |
222 | | |
223 | 18.1M | rpc::ThreadPool& CallbackThreadPool(ServicePriority priority) override { |
224 | 18.1M | return ThreadPool(priority); |
225 | 18.1M | } |
226 | | |
227 | | CHECKED_STATUS QueueEventOnAllReactors( |
228 | | ServerEventListPtr server_event, const SourceLocation& source_location); |
229 | | |
230 | | // Dump the current RPCs into the given protobuf. |
231 | | CHECKED_STATUS DumpRunningRpcs(const DumpRunningRpcsRequestPB& req, |
232 | | DumpRunningRpcsResponsePB* resp); |
233 | | |
234 | | void RemoveScheduledTask(ScheduledTaskId task_id); |
235 | | |
236 | | // This method will run 'func' with an ABORT status argument. It's not guaranteed that the task |
237 | | // will cancel because TimerHandler could run before this method. |
238 | | void AbortOnReactor(ScheduledTaskId task_id); |
239 | | |
240 | | // Run 'func' on a reactor thread after 'when' time elapses. |
241 | | // |
242 | | // The status argument conveys whether 'func' was run correctly (i.e. after the elapsed time) or |
243 | | // not. |
244 | | MUST_USE_RESULT ScheduledTaskId ScheduleOnReactor( |
245 | | StatusFunctor func, MonoDelta when, |
246 | | const SourceLocation& source_location, |
247 | | rpc::Messenger* msgr); |
248 | | |
249 | 375k | std::string name() const { |
250 | 375k | return name_; |
251 | 375k | } |
252 | | |
253 | | scoped_refptr<MetricEntity> metric_entity() const override; |
254 | | |
255 | | RpcServicePtr TEST_rpc_service(const std::string& service_name) const; |
256 | | |
257 | | size_t max_concurrent_requests() const; |
258 | | |
259 | 135k | const IpAddress& outbound_address_v4() const { return outbound_address_v4_; } |
260 | 1 | const IpAddress& outbound_address_v6() const { return outbound_address_v6_; } |
261 | | |
262 | | void BreakConnectivityWith(const IpAddress& address); |
263 | | void BreakConnectivityTo(const IpAddress& address); |
264 | | void BreakConnectivityFrom(const IpAddress& address); |
265 | | void RestoreConnectivityWith(const IpAddress& address); |
266 | | void RestoreConnectivityTo(const IpAddress& address); |
267 | | void RestoreConnectivityFrom(const IpAddress& address); |
268 | | |
269 | 3.75M | Scheduler& scheduler() { |
270 | 3.75M | return scheduler_; |
271 | 3.75M | } |
272 | | |
273 | 909 | IoService& io_service() override { |
274 | 909 | return io_thread_pool_.io_service(); |
275 | 909 | } |
276 | | |
277 | 259k | DnsResolver& resolver() override { |
278 | 259k | return *resolver_; |
279 | 259k | } |
280 | | |
281 | | rpc::ThreadPool& ThreadPool(ServicePriority priority = ServicePriority::kNormal); |
282 | | |
283 | 26.5M | const std::shared_ptr<RpcMetrics>& rpc_metrics() override { |
284 | 26.5M | return rpc_metrics_; |
285 | 26.5M | } |
286 | | |
287 | | const std::shared_ptr<MemTracker>& parent_mem_tracker() override; |
288 | | |
289 | 144k | int num_connections_to_server() const override { |
290 | 144k | return num_connections_to_server_; |
291 | 144k | } |
292 | | |
293 | 0 | size_t num_reactors() const { |
294 | 0 | return reactors_.size(); |
295 | 0 | } |
296 | | |
297 | | // Use specified IP address as base address for outbound connections from messenger. |
298 | 2.37k | void TEST_SetOutboundIpBase(const IpAddress& value) { |
299 | 2.37k | test_outbound_ip_base_ = value; |
300 | 2.37k | has_outbound_ip_base_.store(true, std::memory_order_release); |
301 | 2.37k | } |
302 | | |
303 | | bool TEST_ShouldArtificiallyRejectIncomingCallsFrom(const IpAddress &remote); |
304 | | |
305 | | CHECKED_STATUS TEST_GetReactorMetrics(size_t reactor_idx, ReactorMetrics* metrics); |
306 | | |
307 | | private: |
308 | | friend class DelayedTask; |
309 | | |
310 | | explicit Messenger(const MessengerBuilder &bld); |
311 | | |
312 | | Reactor* RemoteToReactor(const Endpoint& remote, uint32_t idx = 0); |
313 | | CHECKED_STATUS Init(); |
314 | | |
315 | | void BreakConnectivity(const IpAddress& address, bool incoming, bool outgoing); |
316 | | void RestoreConnectivity(const IpAddress& address, bool incoming, bool outgoing); |
317 | | |
318 | | // Take ownership of the socket via Socket::Release |
319 | | void RegisterInboundSocket( |
320 | | const ConnectionContextFactoryPtr& factory, Socket *new_socket, const Endpoint& remote); |
321 | | |
322 | | bool TEST_ShouldArtificiallyRejectOutgoingCallsTo(const IpAddress &remote); |
323 | | |
324 | | const std::string name_; |
325 | | |
326 | | ConnectionContextFactoryPtr connection_context_factory_; |
327 | | |
328 | | const StreamFactories stream_factories_; |
329 | | |
330 | | const Protocol* const listen_protocol_; |
331 | | |
332 | | // Protects closing_, acceptor_pools_. |
333 | | mutable percpu_rwlock lock_; |
334 | | |
335 | | bool closing_ = false; |
336 | | |
337 | | // RPC services that handle inbound requests. |
338 | | mutable RWOperationCounter rpc_services_counter_; |
339 | | std::unordered_multimap<std::string, RpcServicePtr> rpc_services_; |
340 | | RpcEndpointMap rpc_endpoints_; |
341 | | |
342 | | std::vector<std::unique_ptr<Reactor>> reactors_; |
343 | | |
344 | | const scoped_refptr<MetricEntity> metric_entity_; |
345 | | const scoped_refptr<Histogram> outgoing_queue_time_; |
346 | | |
347 | | // Acceptor which is listening on behalf of this messenger. |
348 | | std::unique_ptr<Acceptor> acceptor_; |
349 | | IpAddress outbound_address_v4_; |
350 | | IpAddress outbound_address_v6_; |
351 | | |
352 | | // Id that will be assigned to the next task that is scheduled on the reactor. |
353 | | std::atomic<ScheduledTaskId> next_task_id_ = {1}; |
354 | | std::atomic<uint64_t> num_connections_accepted_ = {0}; |
355 | | |
356 | | std::mutex mutex_scheduled_tasks_; |
357 | | |
358 | | std::unordered_map<ScheduledTaskId, std::shared_ptr<DelayedTask>> scheduled_tasks_; |
359 | | |
360 | | // Flag that we have at least one address with artificially broken connectivity. |
361 | | std::atomic<bool> has_broken_connectivity_ = {false}; |
362 | | |
363 | | // Set of addresses with artificially broken connectivity. |
364 | | std::unordered_set<IpAddress, IpAddressHash> broken_connectivity_from_; |
365 | | std::unordered_set<IpAddress, IpAddressHash> broken_connectivity_to_; |
366 | | |
367 | | IoThreadPool io_thread_pool_; |
368 | | Scheduler scheduler_; |
369 | | |
370 | | // Thread pools that are used by services running in this messenger. |
371 | | std::unique_ptr<rpc::ThreadPool> normal_thread_pool_; |
372 | | |
373 | | std::mutex mutex_high_priority_thread_pool_; |
374 | | |
375 | | // This could be used for high-priority services such as Consensus. |
376 | | AtomicUniquePtr<rpc::ThreadPool> high_priority_thread_pool_; |
377 | | |
378 | | std::unique_ptr<DnsResolver> resolver_; |
379 | | |
380 | | std::shared_ptr<RpcMetrics> rpc_metrics_; |
381 | | |
382 | | // Use this IP address as base address for outbound connections from messenger. |
383 | | IpAddress test_outbound_ip_base_; |
384 | | std::atomic<bool> has_outbound_ip_base_{false}; |
385 | | |
386 | | // Number of outbound connections to create for each destination server address. |
387 | | int num_connections_to_server_; |
388 | | |
389 | | #ifndef NDEBUG |
390 | | // This is so we can log where exactly a Messenger was instantiated to better diagnose a CHECK |
391 | | // failure in the destructor (ENG-2838). This can be removed when that is fixed. |
392 | | StackTrace creation_stack_trace_; |
393 | | #endif |
394 | | DISALLOW_COPY_AND_ASSIGN(Messenger); |
395 | | }; |
396 | | |
397 | | } // namespace rpc |
398 | | } // namespace yb |
399 | | |
400 | | #endif // YB_RPC_MESSENGER_H_ |