YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/messenger.h
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//
#ifndef YB_RPC_MESSENGER_H_
#define YB_RPC_MESSENGER_H_

#include <stdint.h>

#include <atomic>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <gtest/gtest_prod.h>

#include "yb/gutil/ref_counted.h"

#include "yb/rpc/rpc_fwd.h"
#include "yb/rpc/io_thread_pool.h"
#include "yb/rpc/proxy_context.h"
#include "yb/rpc/scheduler.h"

#include "yb/util/metrics_fwd.h"
#include "yb/util/status_fwd.h"
#include "yb/util/async_util.h"
#include "yb/util/atomic.h"
#include "yb/util/locks.h"
#include "yb/util/monotime.h"
#include "yb/util/net/sockaddr.h"
#include "yb/util/operation_counter.h"
#include "yb/util/stack_trace.h"

namespace yb {

class MemTracker;
class Socket;
struct SourceLocation;

namespace rpc {

template <class ContextType>
class ConnectionContextFactoryImpl;

typedef std::unordered_map<const Protocol*, StreamFactoryPtr> StreamFactories;

// Used to construct a Messenger.
class MessengerBuilder {
 public:
  friend class Messenger;

  explicit MessengerBuilder(std::string name);
  ~MessengerBuilder();

  MessengerBuilder(const MessengerBuilder&);

  // Set the length of time we will keep a TCP connection alive with no traffic.
  MessengerBuilder &set_connection_keepalive_time(CoarseMonoClock::Duration keepalive);

  // Set the number of reactor threads that will be used for sending and receiving.
  MessengerBuilder &set_num_reactors(int num_reactors);

  // Set the granularity with which connections are checked for keepalive.
  MessengerBuilder &set_coarse_timer_granularity(CoarseMonoClock::Duration granularity);

  // Set metric entity for use by RPC systems.
  MessengerBuilder &set_metric_entity(const scoped_refptr<MetricEntity>& metric_entity);

  // Use the given connection context factory to handle incoming connections.
  MessengerBuilder &UseConnectionContextFactory(const ConnectionContextFactoryPtr& factory) {
    connection_context_factory_ = factory;
    return *this;
  }

  MessengerBuilder &UseDefaultConnectionContextFactory(
      const std::shared_ptr<MemTracker>& parent_mem_tracker = nullptr);

  MessengerBuilder &AddStreamFactory(const Protocol* protocol, StreamFactoryPtr factory);

  MessengerBuilder &SetListenProtocol(const Protocol* protocol) {
    listen_protocol_ = protocol;
    return *this;
  }

  template <class ContextType>
  MessengerBuilder &CreateConnectionContextFactory(
      size_t memory_limit, const std::shared_ptr<MemTracker>& parent_mem_tracker = nullptr) {
    if (parent_mem_tracker) {
      last_used_parent_mem_tracker_ = parent_mem_tracker;
    }
    connection_context_factory_ =
        std::make_shared<ConnectionContextFactoryImpl<ContextType>>(
            memory_limit, parent_mem_tracker);
    return *this;
  }
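
  // Illustrative sketch, not part of the original header: how a caller might install a context
  // factory for a concrete context type. "MyConnectionContext", "parent_mem_tracker", and the
  // memory-limit value are hypothetical placeholders for this example only.
  //
  //   MessengerBuilder builder("example");
  //   builder.CreateConnectionContextFactory<MyConnectionContext>(
  //       64 * 1024 * 1024 /* memory_limit */, parent_mem_tracker);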

  Result<std::unique_ptr<Messenger>> Build();

  CoarseMonoClock::Duration connection_keepalive_time() const {
    return connection_keepalive_time_;
  }

  CoarseMonoClock::Duration coarse_timer_granularity() const {
    return coarse_timer_granularity_;
  }

  const ConnectionContextFactoryPtr& connection_context_factory() const {
    return connection_context_factory_;
  }

  MessengerBuilder& set_thread_pool_options(size_t queue_limit, size_t workers_limit) {
    queue_limit_ = queue_limit;
    workers_limit_ = workers_limit;
    return *this;
  }

  MessengerBuilder& set_num_connections_to_server(int value) {
    num_connections_to_server_ = value;
    return *this;
  }

  int num_connections_to_server() const {
    return num_connections_to_server_;
  }

  const std::shared_ptr<MemTracker>& last_used_parent_mem_tracker() const {
    return last_used_parent_mem_tracker_;
  }

 private:
  const std::string name_;
  CoarseMonoClock::Duration connection_keepalive_time_;
  int num_reactors_ = 4;
  CoarseMonoClock::Duration coarse_timer_granularity_ = std::chrono::milliseconds(100);
  scoped_refptr<MetricEntity> metric_entity_;
  ConnectionContextFactoryPtr connection_context_factory_;
  StreamFactories stream_factories_;
  const Protocol* listen_protocol_;
  size_t queue_limit_;
  size_t workers_limit_;
  int num_connections_to_server_;
  std::shared_ptr<MemTracker> last_used_parent_mem_tracker_;
};
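
// Illustrative usage sketch, not part of the original header. It shows one way a component might
// configure and build a Messenger; the name "example-server", the metric_entity variable, and the
// error handling (assuming the Result<> API from yb/util/result.h: ok()/status()/operator*) are
// assumptions for this example only.
//
//   MessengerBuilder builder("example-server");
//   builder.set_num_reactors(4)
//       .set_connection_keepalive_time(std::chrono::seconds(65))
//       .set_metric_entity(metric_entity);
//   auto messenger_result = builder.Build();
//   if (!messenger_result.ok()) {
//     return messenger_result.status();
//   }
//   std::unique_ptr<Messenger> messenger = std::move(*messenger_result);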

// A Messenger is a container for the reactor threads which run event loops for the RPC services.
// If the process is a server, a Messenger will also have an Acceptor.  In this case, calls received
// over the connection are enqueued into the messenger's service_queue for processing by a
// ServicePool.
//
// Users do not typically interact with the Messenger directly except to create one as a singleton,
// and then make calls using Proxy objects.
//
// See rpc-test.cc and rpc-bench.cc for example usages.
class Messenger : public ProxyContext {
 public:
  friend class MessengerBuilder;
  friend class Proxy;
  friend class Reactor;
  typedef std::unordered_map<std::string, scoped_refptr<RpcService>> RpcServicesMap;

  ~Messenger();

  // Stop all communication and prevent further use. Should be called explicitly by the messenger
  // owner.
  void Shutdown();

  // Set up the messenger to listen for connections on the given address.
  CHECKED_STATUS ListenAddress(
      ConnectionContextFactoryPtr factory, const Endpoint& accept_endpoint,
      Endpoint* bound_endpoint = nullptr);

  // Stop accepting connections.
  void ShutdownAcceptor();

  // Start accepting connections.
  CHECKED_STATUS StartAcceptor();

  // Register a new RpcService to handle inbound requests.
  CHECKED_STATUS RegisterService(const std::string& service_name, const RpcServicePtr& service);

  void UnregisterAllServices();

  void ShutdownThreadPools();

  // Queue a call for transmission. This will pick the appropriate reactor, and enqueue a task on
  // that reactor to assign and send the call.
  void QueueOutboundCall(OutboundCallPtr call) override;

  // Invoke the RpcService to handle a call directly.
  void Handle(InboundCallPtr call, Queue queue) override;

  const Protocol* DefaultProtocol() override { return listen_protocol_; }

  rpc::ThreadPool& CallbackThreadPool(ServicePriority priority) override {
    return ThreadPool(priority);
  }

  CHECKED_STATUS QueueEventOnAllReactors(
      ServerEventListPtr server_event, const SourceLocation& source_location);

  // Dump the current RPCs into the given protobuf.
  CHECKED_STATUS DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
                                 DumpRunningRpcsResponsePB* resp);

  void RemoveScheduledTask(ScheduledTaskId task_id);

  // This method will run 'func' with an ABORT status argument. It is not guaranteed that the task
  // will be canceled, because TimerHandler could run before this method.
  void AbortOnReactor(ScheduledTaskId task_id);

  // Run 'func' on a reactor thread after 'when' time elapses.
  //
  // The status argument conveys whether 'func' was run correctly (i.e. after the elapsed time) or
  // not.
  MUST_USE_RESULT ScheduledTaskId ScheduleOnReactor(
      StatusFunctor func, MonoDelta when,
      const SourceLocation& source_location,
      rpc::Messenger* msgr);
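
  // Illustrative sketch, not part of the original header: scheduling a delayed task and then
  // attempting to abort it. The SOURCE_LOCATION() macro, the lambda signature, and the exact
  // Status checks are assumptions for this example; the functor may still run with a non-OK
  // status if the abort loses the race with the timer.
  //
  //   auto task_id = messenger->ScheduleOnReactor(
  //       [](const Status& status) {
  //         if (status.ok()) {
  //           // The timer fired normally.
  //         }
  //       },
  //       MonoDelta::FromSeconds(1), SOURCE_LOCATION(), messenger.get());
  //   ...
  //   messenger->AbortOnReactor(task_id);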

  std::string name() const {
    return name_;
  }

  scoped_refptr<MetricEntity> metric_entity() const override;

  RpcServicePtr TEST_rpc_service(const std::string& service_name) const;

  size_t max_concurrent_requests() const;

  const IpAddress& outbound_address_v4() const { return outbound_address_v4_; }
  const IpAddress& outbound_address_v6() const { return outbound_address_v6_; }

  void BreakConnectivityWith(const IpAddress& address);
  void BreakConnectivityTo(const IpAddress& address);
  void BreakConnectivityFrom(const IpAddress& address);
  void RestoreConnectivityWith(const IpAddress& address);
  void RestoreConnectivityTo(const IpAddress& address);
  void RestoreConnectivityFrom(const IpAddress& address);

  Scheduler& scheduler() {
    return scheduler_;
  }

  IoService& io_service() override {
    return io_thread_pool_.io_service();
  }

  DnsResolver& resolver() override {
    return *resolver_;
  }

  rpc::ThreadPool& ThreadPool(ServicePriority priority = ServicePriority::kNormal);

  const std::shared_ptr<RpcMetrics>& rpc_metrics() override {
    return rpc_metrics_;
  }

  const std::shared_ptr<MemTracker>& parent_mem_tracker() override;

  int num_connections_to_server() const override {
    return num_connections_to_server_;
  }

  size_t num_reactors() const {
    return reactors_.size();
  }

  // Use specified IP address as base address for outbound connections from messenger.
  void TEST_SetOutboundIpBase(const IpAddress& value) {
    test_outbound_ip_base_ = value;
    has_outbound_ip_base_.store(true, std::memory_order_release);
  }

  bool TEST_ShouldArtificiallyRejectIncomingCallsFrom(const IpAddress &remote);

  CHECKED_STATUS TEST_GetReactorMetrics(size_t reactor_idx, ReactorMetrics* metrics);

 private:
  friend class DelayedTask;

  explicit Messenger(const MessengerBuilder &bld);

  Reactor* RemoteToReactor(const Endpoint& remote, uint32_t idx = 0);
  CHECKED_STATUS Init();

  void BreakConnectivity(const IpAddress& address, bool incoming, bool outgoing);
  void RestoreConnectivity(const IpAddress& address, bool incoming, bool outgoing);

  // Take ownership of the socket via Socket::Release
  void RegisterInboundSocket(
      const ConnectionContextFactoryPtr& factory, Socket *new_socket, const Endpoint& remote);

  bool TEST_ShouldArtificiallyRejectOutgoingCallsTo(const IpAddress &remote);

  const std::string name_;

  ConnectionContextFactoryPtr connection_context_factory_;

  const StreamFactories stream_factories_;

  const Protocol* const listen_protocol_;

  // Protects closing_, acceptor_pools_.
  mutable percpu_rwlock lock_;

  bool closing_ = false;

  // RPC services that handle inbound requests.
  mutable RWOperationCounter rpc_services_counter_;
  std::unordered_multimap<std::string, RpcServicePtr> rpc_services_;
  RpcEndpointMap rpc_endpoints_;

  std::vector<std::unique_ptr<Reactor>> reactors_;

  const scoped_refptr<MetricEntity> metric_entity_;
  const scoped_refptr<Histogram> outgoing_queue_time_;

  // Acceptor which is listening on behalf of this messenger.
  std::unique_ptr<Acceptor> acceptor_;
  IpAddress outbound_address_v4_;
  IpAddress outbound_address_v6_;

  // Id that will be assigned to the next task that is scheduled on the reactor.
  std::atomic<ScheduledTaskId> next_task_id_ = {1};
  std::atomic<uint64_t> num_connections_accepted_ = {0};

  std::mutex mutex_scheduled_tasks_;

  std::unordered_map<ScheduledTaskId, std::shared_ptr<DelayedTask>> scheduled_tasks_;

  // Flag that we have at least one address with artificially broken connectivity.
  std::atomic<bool> has_broken_connectivity_ = {false};

  // Sets of addresses with artificially broken connectivity.
  std::unordered_set<IpAddress, IpAddressHash> broken_connectivity_from_;
  std::unordered_set<IpAddress, IpAddressHash> broken_connectivity_to_;

  IoThreadPool io_thread_pool_;
  Scheduler scheduler_;

  // Thread pools that are used by services running in this messenger.
  std::unique_ptr<rpc::ThreadPool> normal_thread_pool_;

  std::mutex mutex_high_priority_thread_pool_;

  // This could be used for high-priority services such as Consensus.
  AtomicUniquePtr<rpc::ThreadPool> high_priority_thread_pool_;

  std::unique_ptr<DnsResolver> resolver_;

  std::shared_ptr<RpcMetrics> rpc_metrics_;

  // Use this IP address as base address for outbound connections from messenger.
  IpAddress test_outbound_ip_base_;
  std::atomic<bool> has_outbound_ip_base_{false};

  // Number of outbound connections to create per destination server address.
  int num_connections_to_server_;

#ifndef NDEBUG
  // This is so we can log where exactly a Messenger was instantiated to better diagnose a CHECK
  // failure in the destructor (ENG-2838). This can be removed when that is fixed.
  StackTrace creation_stack_trace_;
#endif
  DISALLOW_COPY_AND_ASSIGN(Messenger);
};
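
// Illustrative server-side sketch, not part of the original header: building a Messenger,
// binding an acceptor, registering a service, and shutting down. The endpoint, service name,
// service pool, connection context factory, and the RETURN_NOT_OK/VERIFY_RESULT-style error
// handling are assumptions made only for this example.
//
//   auto messenger = VERIFY_RESULT(MessengerBuilder("example-server").Build());
//   Endpoint bound_endpoint;
//   RETURN_NOT_OK(messenger->ListenAddress(
//       connection_context_factory, Endpoint(IpAddress(), 9100), &bound_endpoint));
//   RETURN_NOT_OK(messenger->RegisterService("yb.example.ExampleService", service_pool));
//   RETURN_NOT_OK(messenger->StartAcceptor());
//   // ... serve traffic ...
//   messenger->Shutdown();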

}  // namespace rpc
}  // namespace yb

#endif  // YB_RPC_MESSENGER_H_