YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/messenger.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/messenger.h"
34
35
#include <sys/types.h>
36
37
#include <list>
38
#include <mutex>
39
#include <set>
40
#include <string>
41
#include <thread>
42
43
#include <gflags/gflags.h>
44
#include <glog/logging.h>
45
46
#include "yb/gutil/map-util.h"
47
#include "yb/gutil/stl_util.h"
48
#include "yb/gutil/strings/substitute.h"
49
50
#include "yb/rpc/acceptor.h"
51
#include "yb/rpc/constants.h"
52
#include "yb/rpc/reactor.h"
53
#include "yb/rpc/rpc_header.pb.h"
54
#include "yb/rpc/rpc_metrics.h"
55
#include "yb/rpc/rpc_service.h"
56
#include "yb/rpc/rpc_util.h"
57
#include "yb/rpc/tcp_stream.h"
58
#include "yb/rpc/yb_rpc.h"
59
60
#include "yb/util/debug-util.h"
61
#include "yb/util/errno.h"
62
#include "yb/util/flag_tags.h"
63
#include "yb/util/format.h"
64
#include "yb/util/metrics.h"
65
#include "yb/util/monotime.h"
66
#include "yb/util/net/dns_resolver.h"
67
#include "yb/util/net/socket.h"
68
#include "yb/util/result.h"
69
#include "yb/util/size_literals.h"
70
#include "yb/util/status.h"
71
#include "yb/util/status_format.h"
72
#include "yb/util/status_log.h"
73
#include "yb/util/thread_restrictions.h"
74
#include "yb/util/trace.h"
75
76
using namespace std::literals;
77
using namespace std::placeholders;
78
using namespace yb::size_literals;
79
80
using std::string;
81
using std::shared_ptr;
82
using strings::Substitute;
83
84
DECLARE_int32(num_connections_to_server);
85
DEFINE_int32(rpc_default_keepalive_time_ms, 65000,
86
             "If an RPC connection from a client is idle for this amount of time, the server "
87
             "will disconnect the client. Setting flag to 0 disables this clean up.");
88
TAG_FLAG(rpc_default_keepalive_time_ms, advanced);
89
DEFINE_uint64(io_thread_pool_size, 4, "Size of allocated IO Thread Pool.");
90
91
DEFINE_int64(outbound_rpc_memory_limit, 0, "Outbound RPC memory limit");
92
93
DEFINE_int32(rpc_queue_limit, 10000, "Queue limit for rpc server");
94
DEFINE_int32(rpc_workers_limit, 1024, "Workers limit for rpc server");
95
96
DEFINE_int32(socket_receive_buffer_size, 0, "Socket receive buffer size, 0 to use default");
97
98
namespace yb {
99
namespace rpc {
100
101
class Messenger;
102
class ServerBuilder;
103
104
// ------------------------------------------------------------------------------------------------
105
// MessengerBuilder
106
// ------------------------------------------------------------------------------------------------
107
108
MessengerBuilder::MessengerBuilder(std::string name)
109
    : name_(std::move(name)),
110
      connection_keepalive_time_(FLAGS_rpc_default_keepalive_time_ms * 1ms),
111
      coarse_timer_granularity_(100ms),
112
      listen_protocol_(TcpStream::StaticProtocol()),
113
      queue_limit_(FLAGS_rpc_queue_limit),
114
      workers_limit_(FLAGS_rpc_workers_limit),
115
35.3k
      num_connections_to_server_(GetAtomicFlag(&FLAGS_num_connections_to_server)) {
116
35.3k
  AddStreamFactory(TcpStream::StaticProtocol(), TcpStream::Factory());
117
35.3k
}
118
119
34.6k
MessengerBuilder::~MessengerBuilder() = default;
120
0
MessengerBuilder::MessengerBuilder(const MessengerBuilder&) = default;
121
122
MessengerBuilder& MessengerBuilder::set_connection_keepalive_time(
123
26.5k
    CoarseMonoClock::Duration keepalive) {
124
26.5k
  connection_keepalive_time_ = keepalive;
125
26.5k
  return *this;
126
26.5k
}
127
128
33.7k
MessengerBuilder& MessengerBuilder::set_num_reactors(int num_reactors) {
129
33.7k
  num_reactors_ = num_reactors;
130
33.7k
  return *this;
131
33.7k
}
132
133
MessengerBuilder& MessengerBuilder::set_coarse_timer_granularity(
134
202
    CoarseMonoClock::Duration granularity) {
135
202
  coarse_timer_granularity_ = granularity;
136
202
  return *this;
137
202
}
138
139
MessengerBuilder &MessengerBuilder::set_metric_entity(
140
33.3k
    const scoped_refptr<MetricEntity>& metric_entity) {
141
33.3k
  metric_entity_ = metric_entity;
142
33.3k
  return *this;
143
33.3k
}
144
145
35.3k
Result<std::unique_ptr<Messenger>> MessengerBuilder::Build() {
146
35.3k
  if (!connection_context_factory_) {
147
2.01k
    UseDefaultConnectionContextFactory();
148
2.01k
  }
149
35.3k
  std::unique_ptr<Messenger> messenger(new Messenger(*this));
150
35.3k
  RETURN_NOT_OK(messenger->Init());
151
152
35.3k
  return messenger;
153
35.3k
}
154
155
MessengerBuilder &MessengerBuilder::AddStreamFactory(
156
59.0k
    const Protocol* protocol, StreamFactoryPtr factory) {
157
59.0k
  auto p = stream_factories_.emplace(protocol, std::move(factory));
158
18.4E
  LOG_IF(DFATAL, !p.second) << "Duplicate stream factory: " << protocol->ToString();
159
59.0k
  return *this;
160
59.0k
}
161
162
MessengerBuilder &MessengerBuilder::UseDefaultConnectionContextFactory(
163
35.1k
    const std::shared_ptr<MemTracker>& parent_mem_tracker) {
164
35.1k
  if (parent_mem_tracker) {
165
32.4k
    last_used_parent_mem_tracker_ = parent_mem_tracker;
166
32.4k
  }
167
35.1k
  connection_context_factory_ = rpc::CreateConnectionContextFactory<YBOutboundConnectionContext>(
168
35.1k
      FLAGS_outbound_rpc_memory_limit, parent_mem_tracker);
169
35.1k
  return *this;
170
35.1k
}
171
172
// ------------------------------------------------------------------------------------------------
173
// Messenger
174
// ------------------------------------------------------------------------------------------------
175
176
9.02k
void Messenger::Shutdown() {
177
9.02k
  ShutdownThreadPools();
178
9.02k
  ShutdownAcceptor();
179
9.02k
  UnregisterAllServices();
180
181
  // Since we're shutting down, it's OK to block.
182
9.02k
  ThreadRestrictions::ScopedAllowWait allow_wait;
183
184
9.02k
  std::vector<Reactor*> reactors;
185
9.02k
  std::unique_ptr<Acceptor> acceptor;
186
9.02k
  {
187
9.02k
    std::lock_guard<percpu_rwlock> guard(lock_);
188
9.02k
    if (closing_) {
189
456
      return;
190
456
    }
191
8.56k
    VLOG
(1) << "shutting down messenger " << name_33
;
192
8.56k
    closing_ = true;
193
194
8.56k
    DCHECK
(rpc_services_.empty()) << "Unregister RPC services before shutting down Messenger"47
;
195
8.56k
    rpc_services_.clear();
196
197
8.56k
    acceptor.swap(acceptor_);
198
199
24.7k
    for (const auto& reactor : reactors_) {
200
24.7k
      reactors.push_back(reactor.get());
201
24.7k
    }
202
8.56k
  }
203
204
8.56k
  if (acceptor) {
205
0
    acceptor->Shutdown();
206
0
  }
207
208
24.7k
  for (auto* reactor : reactors) {
209
24.7k
    reactor->Shutdown();
210
24.7k
  }
211
212
8.56k
  scheduler_.Shutdown();
213
8.56k
  io_thread_pool_.Shutdown();
214
215
24.7k
  for (auto* reactor : reactors) {
216
24.7k
    reactor->Join();
217
24.7k
  }
218
219
8.56k
  io_thread_pool_.Join();
220
221
8.56k
  {
222
8.56k
    std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
223
8.56k
    LOG_IF(DFATAL, !scheduled_tasks_.empty())
224
17
        << "Scheduled tasks is not empty after messenger shutdown: "
225
17
        << yb::ToString(scheduled_tasks_);
226
8.56k
  }
227
8.56k
}
228
229
Status Messenger::ListenAddress(
230
    ConnectionContextFactoryPtr factory, const Endpoint& accept_endpoint,
231
25.9k
    Endpoint* bound_endpoint) {
232
25.9k
  Acceptor* acceptor;
233
25.9k
  {
234
25.9k
    std::lock_guard<percpu_rwlock> guard(lock_);
235
25.9k
    if (!acceptor_) {
236
25.9k
      acceptor_.reset(new Acceptor(
237
25.9k
          metric_entity_, std::bind(&Messenger::RegisterInboundSocket, this, factory, _1, _2)));
238
25.9k
    }
239
25.9k
    auto accept_host = accept_endpoint.address();
240
25.9k
    auto& outbound_address = accept_host.is_v6() ? 
outbound_address_v6_2
241
25.9k
                                                 : 
outbound_address_v4_25.9k
;
242
25.9k
    if (outbound_address.is_unspecified() && !accept_host.is_unspecified()) {
243
25.8k
      outbound_address = accept_host;
244
25.8k
    }
245
25.9k
    acceptor = acceptor_.get();
246
25.9k
  }
247
25.9k
  return acceptor->Listen(accept_endpoint, bound_endpoint);
248
25.9k
}
249
250
25.8k
Status Messenger::StartAcceptor() {
251
208k
  for (const auto& p : rpc_services_) {
252
208k
    p.second->FillEndpoints(&rpc_endpoints_);
253
208k
  }
254
255
25.8k
  std::lock_guard<percpu_rwlock> guard(lock_);
256
25.8k
  if (acceptor_) {
257
25.8k
    return acceptor_->Start();
258
25.8k
  } else {
259
0
    return STATUS(IllegalState, "Trying to start acceptor w/o active addresses");
260
0
  }
261
25.8k
}
262
263
35.2k
void Messenger::BreakConnectivityWith(const IpAddress& address) {
264
35.2k
  BreakConnectivity(address, /* incoming */ true, /* outgoing */ true);
265
35.2k
}
266
0
void Messenger::BreakConnectivityTo(const IpAddress& address) {
267
0
  BreakConnectivity(address, /* incoming */ false, /* outgoing */ true);
268
0
}
269
270
0
void Messenger::BreakConnectivityFrom(const IpAddress& address) {
271
0
  BreakConnectivity(address, /* incoming */ true, /* outgoing */ false);
272
0
}
273
274
35.2k
void Messenger::BreakConnectivity(const IpAddress& address, bool incoming, bool outgoing) {
275
35.2k
  LOG(INFO) << "TEST: Break " << (incoming ? "incoming" : 
""0
) << "/" << (outgoing ? "outgoing" :
""0
)
276
35.2k
            << " connectivity with: " << address;
277
278
35.2k
  boost::optional<CountDownLatch> latch;
279
35.2k
  {
280
35.2k
    std::lock_guard<percpu_rwlock> guard(lock_);
281
35.2k
    if (broken_connectivity_from_.empty() || 
broken_connectivity_to_.empty()33.4k
) {
282
1.76k
      has_broken_connectivity_.store(true, std::memory_order_release);
283
1.76k
    }
284
35.2k
    bool inserted_from = false;
285
35.2k
    if (incoming) {
286
35.2k
      inserted_from = broken_connectivity_from_.insert(address).second;
287
35.2k
    }
288
35.2k
    bool inserted_to = false;
289
35.2k
    if (outgoing) {
290
35.2k
      inserted_to = broken_connectivity_to_.insert(address).second;
291
35.2k
    }
292
35.2k
    if (inserted_from || 
inserted_to0
) {
293
35.2k
      latch.emplace(reactors_.size());
294
352k
      for (const auto& reactor : reactors_) {
295
352k
        auto scheduled = reactor->ScheduleReactorTask(MakeFunctorReactorTask(
296
352k
            [&latch, address, incoming, outgoing](Reactor* reactor) {
297
347k
              if (incoming) {
298
347k
                reactor->DropIncomingWithRemoteAddress(address);
299
347k
              }
300
348k
              if (
outgoing347k
) {
301
348k
                reactor->DropOutgoingWithRemoteAddress(address);
302
348k
              }
303
347k
              latch->CountDown();
304
347k
            },
305
352k
            SOURCE_LOCATION()));
306
352k
        if (!scheduled) {
307
0
          LOG(INFO) << "Failed to schedule drop connection with: " << address.to_string();
308
0
          latch->CountDown();
309
0
        }
310
352k
      }
311
35.2k
    }
312
35.2k
  }
313
314
35.2k
  if (latch) {
315
35.2k
    latch->Wait();
316
35.2k
  }
317
35.2k
}
318
319
35.2k
void Messenger::RestoreConnectivityWith(const IpAddress& address) {
320
35.2k
  RestoreConnectivity(address, /* incoming */ true, /* outgoing */ true);
321
35.2k
}
322
0
void Messenger::RestoreConnectivityTo(const IpAddress& address) {
323
0
  RestoreConnectivity(address, /* incoming */ false, /* outgoing */ true);
324
0
}
325
326
0
void Messenger::RestoreConnectivityFrom(const IpAddress& address) {
327
0
  RestoreConnectivity(address, /* incoming */ true, /* outgoing */ false);
328
0
}
329
330
35.2k
void Messenger::RestoreConnectivity(const IpAddress& address, bool incoming, bool outgoing) {
331
35.2k
  LOG(INFO) << "TEST: Restore " << (incoming ? "incoming" : 
""0
) << "/"
332
35.2k
            << (outgoing ? "outgoing" : 
""0
) << " connectivity with: " << address;
333
334
35.2k
  std::lock_guard<percpu_rwlock> guard(lock_);
335
35.2k
  if (incoming) {
336
35.2k
    broken_connectivity_from_.erase(address);
337
35.2k
  }
338
35.2k
  if (outgoing) {
339
35.2k
    broken_connectivity_to_.erase(address);
340
35.2k
  }
341
35.2k
  if (broken_connectivity_from_.empty() && 
broken_connectivity_to_.empty()1
) {
342
1
    has_broken_connectivity_.store(false, std::memory_order_release);
343
1
  }
344
35.2k
}
345
346
628k
bool Messenger::TEST_ShouldArtificiallyRejectIncomingCallsFrom(const IpAddress &remote) {
347
628k
  if (has_broken_connectivity_.load(std::memory_order_acquire)) {
348
13.9k
    shared_lock<rw_spinlock> guard(lock_.get_lock());
349
13.9k
    return broken_connectivity_from_.count(remote) != 0;
350
13.9k
  }
351
614k
  return false;
352
628k
}
353
354
75.5M
bool Messenger::TEST_ShouldArtificiallyRejectOutgoingCallsTo(const IpAddress &remote) {
355
75.5M
  if (has_broken_connectivity_.load(std::memory_order_acquire)) {
356
319k
    shared_lock<rw_spinlock> guard(lock_.get_lock());
357
319k
    return broken_connectivity_to_.count(remote) != 0;
358
319k
  }
359
75.2M
  return false;
360
75.5M
}
361
362
8
Status Messenger::TEST_GetReactorMetrics(size_t reactor_idx, ReactorMetrics* metrics) {
363
8
  if (reactor_idx >= reactors_.size()) {
364
0
    return STATUS_FORMAT(
365
0
        InvalidArgument, "Invalid reactor index $0, should be >=0 and <$1", reactor_idx,
366
0
        reactors_.size());
367
0
  }
368
8
  return reactors_[reactor_idx]->GetMetrics(metrics);
369
8
}
370
371
9.90k
void Messenger::ShutdownAcceptor() {
372
9.90k
  std::unique_ptr<Acceptor> acceptor;
373
9.90k
  {
374
9.90k
    std::lock_guard<percpu_rwlock> guard(lock_);
375
9.90k
    acceptor.swap(acceptor_);
376
9.90k
  }
377
9.90k
  if (acceptor) {
378
363
    acceptor->Shutdown();
379
363
  }
380
9.90k
}
381
382
85.8M
rpc::ThreadPool& Messenger::ThreadPool(ServicePriority priority) {
383
85.8M
  switch (priority) {
384
58.5M
    case ServicePriority::kNormal:
385
58.5M
      return *normal_thread_pool_;
386
27.3M
    case ServicePriority::kHigh:
387
27.3M
      auto high_priority_thread_pool = high_priority_thread_pool_.get();
388
27.3M
      if (high_priority_thread_pool) {
389
27.2M
        return *high_priority_thread_pool;
390
27.2M
      }
391
129k
      std::lock_guard<std::mutex> lock(mutex_high_priority_thread_pool_);
392
129k
      high_priority_thread_pool = high_priority_thread_pool_.get();
393
129k
      if (high_priority_thread_pool) {
394
0
        return *high_priority_thread_pool;
395
0
      }
396
129k
      const ThreadPoolOptions& options = normal_thread_pool_->options();
397
129k
      high_priority_thread_pool_.reset(new rpc::ThreadPool(
398
129k
          name_ + "-high-pri", options.queue_limit, options.max_workers));
399
129k
      return *high_priority_thread_pool_.get();
400
85.8M
  }
401
0
  FATAL_INVALID_ENUM_VALUE(ServicePriority, priority);
402
0
}
403
404
// Register a new RpcService to handle inbound requests.
405
Status Messenger::RegisterService(
406
209k
    const std::string& service_name, const scoped_refptr<RpcService>& service) {
407
209k
  DCHECK(service);
408
209k
  rpc_services_.emplace(service_name, service);
409
209k
  return Status::OK();
410
209k
}
411
412
9.93k
void Messenger::ShutdownThreadPools() {
413
9.93k
  normal_thread_pool_->Shutdown();
414
9.93k
  auto high_priority_thread_pool = high_priority_thread_pool_.get();
415
9.93k
  if (high_priority_thread_pool) {
416
1.47k
    high_priority_thread_pool->Shutdown();
417
1.47k
  }
418
9.93k
}
419
420
9.90k
void Messenger::UnregisterAllServices() {
421
9.90k
  CHECK_OK(rpc_services_counter_.DisableAndWaitForOps(CoarseTimePoint::max(), Stop::kTrue));
422
9.90k
  rpc_services_counter_.UnlockExclusiveOpMutex();
423
424
9.90k
  for (const auto& p : rpc_services_) {
425
2.30k
    p.second->StartShutdown();
426
2.30k
  }
427
9.90k
  for (const auto& p : rpc_services_) {
428
2.25k
    p.second->CompleteShutdown();
429
2.25k
  }
430
431
9.90k
  rpc_services_.clear();
432
9.90k
  rpc_endpoints_.clear();
433
9.90k
}
434
435
class NotifyDisconnectedReactorTask : public ReactorTask {
436
 public:
437
  NotifyDisconnectedReactorTask(OutboundCallPtr call, const SourceLocation& source_location)
438
154k
      : ReactorTask(source_location), call_(std::move(call)) {}
439
440
154k
  void Run(Reactor* reactor) override  {
441
154k
    call_->Transferred(STATUS_FORMAT(
442
154k
        NetworkError, "TEST: Connectivity is broken with $0",
443
154k
        call_->conn_id().remote().address()), nullptr);
444
154k
  }
445
 private:
446
0
  void DoAbort(const Status &abort_status) override {
447
0
    call_->Transferred(abort_status, nullptr);
448
0
  }
449
450
  OutboundCallPtr call_;
451
};
452
453
75.5M
void Messenger::QueueOutboundCall(OutboundCallPtr call) {
454
75.5M
  const auto& remote = call->conn_id().remote();
455
75.5M
  Reactor *reactor = RemoteToReactor(remote, call->conn_id().idx());
456
457
75.5M
  if (TEST_ShouldArtificiallyRejectOutgoingCallsTo(remote.address())) {
458
154k
    VLOG
(1) << "TEST: Rejected connection to " << remote7
;
459
154k
    auto scheduled = reactor->ScheduleReactorTask(std::make_shared<NotifyDisconnectedReactorTask>(
460
154k
        call, SOURCE_LOCATION()));
461
154k
    if (!scheduled) {
462
2
      call->Transferred(STATUS(Aborted, "Reactor is closing"), nullptr /* conn */);
463
2
    }
464
154k
    return;
465
154k
  }
466
467
75.4M
  reactor->QueueOutboundCall(std::move(call));
468
75.4M
}
469
470
90.7M
void Messenger::Handle(InboundCallPtr call, Queue queue) {
471
90.7M
  ScopedRWOperation op(&rpc_services_counter_, CoarseTimePoint::min());
472
90.7M
  if (!op.ok()) {
473
6
    call->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, MoveStatus(op));
474
6
    return;
475
6
  }
476
90.7M
  auto it = rpc_endpoints_.find(call->serialized_remote_method());
477
90.7M
  if (it == rpc_endpoints_.end()) {
478
265k
    auto remote_method = ParseRemoteMethod(call->serialized_remote_method());
479
265k
    Status s;
480
265k
    ErrorStatusPB::RpcErrorCodePB error_code = ErrorStatusPB::ERROR_NO_SUCH_SERVICE;
481
265k
    if (
remote_method.ok()265k
) {
482
265k
      if (rpc_services_.count(remote_method->service.ToBuffer())) {
483
2
        error_code = ErrorStatusPB::ERROR_NO_SUCH_METHOD;
484
2
        s = STATUS_FORMAT(
485
2
            InvalidArgument, "Call on service $0 received from $1 with an invalid method name: $2",
486
2
            remote_method->service.ToBuffer(),
487
2
            call->remote_address(),
488
2
            remote_method->method.ToBuffer());
489
265k
      } else {
490
265k
        s = STATUS_FORMAT(
491
265k
            ServiceUnavailable, "Service $0 not registered on $1",
492
265k
            remote_method->service.ToBuffer(), name_);
493
265k
      }
494
18.4E
    } else {
495
18.4E
      s = remote_method.status();
496
18.4E
    }
497
265k
    LOG(WARNING) << s;
498
265k
    call->RespondFailure(error_code, s);
499
265k
    return;
500
265k
  }
501
502
  // The RpcService will respond to the client on success or failure.
503
90.5M
  call->set_method_index(it->second.second);
504
90.5M
  it->second.first->QueueInboundCall(std::move(call));
505
90.5M
}
506
507
0
RpcServicePtr Messenger::TEST_rpc_service(const std::string& service_name) const {
508
0
  ScopedRWOperation op(&rpc_services_counter_, CoarseTimePoint::min());
509
0
  if (!op.ok()) {
510
0
    return nullptr;
511
0
  }
512
0
  auto it = rpc_services_.find(service_name);
513
0
  return it != rpc_services_.end() ? it->second : nullptr;
514
0
}
515
516
425k
const std::shared_ptr<MemTracker>& Messenger::parent_mem_tracker() {
517
425k
  return connection_context_factory_->buffer_tracker();
518
425k
}
519
520
void Messenger::RegisterInboundSocket(
521
628k
    const ConnectionContextFactoryPtr& factory, Socket *new_socket, const Endpoint& remote) {
522
628k
  if (TEST_ShouldArtificiallyRejectIncomingCallsFrom(remote.address())) {
523
1
    auto status = new_socket->Close();
524
1
    VLOG(1) << "TEST: Rejected connection from " << remote
525
0
            << ", close status: " << status.ToString();
526
1
    return;
527
1
  }
528
529
628k
  if (FLAGS_socket_receive_buffer_size) {
530
0
    WARN_NOT_OK(new_socket->SetReceiveBufferSize(FLAGS_socket_receive_buffer_size),
531
0
                "Set receive buffer size failed: ");
532
0
  }
533
534
628k
  auto receive_buffer_size = new_socket->GetReceiveBufferSize();
535
628k
  if (!receive_buffer_size.ok()) {
536
0
    LOG(WARNING) << "Register inbound socket failed: " << receive_buffer_size.status();
537
0
    return;
538
0
  }
539
540
628k
  int idx = num_connections_accepted_.fetch_add(1) % num_connections_to_server_;
541
628k
  Reactor *reactor = RemoteToReactor(remote, idx);
542
628k
  reactor->RegisterInboundSocket(new_socket, *receive_buffer_size, remote, factory);
543
628k
}
544
545
Messenger::Messenger(const MessengerBuilder &bld)
546
    : name_(bld.name_),
547
      connection_context_factory_(bld.connection_context_factory_),
548
      stream_factories_(bld.stream_factories_),
549
      listen_protocol_(bld.listen_protocol_),
550
      rpc_services_counter_(name_ + " endpoints"),
551
      metric_entity_(bld.metric_entity_),
552
      io_thread_pool_(name_, FLAGS_io_thread_pool_size),
553
      scheduler_(&io_thread_pool_.io_service()),
554
      normal_thread_pool_(new rpc::ThreadPool(name_, bld.queue_limit_, bld.workers_limit_)),
555
      resolver_(new DnsResolver(&io_thread_pool_.io_service())),
556
      rpc_metrics_(std::make_shared<RpcMetrics>(bld.metric_entity_)),
557
35.3k
      num_connections_to_server_(bld.num_connections_to_server_) {
558
35.3k
#ifndef NDEBUG
559
35.3k
  creation_stack_trace_.Collect(/* skip_frames */ 1);
560
35.3k
#endif
561
35.3k
  VLOG
(1) << "Messenger constructor for " << this << " called at:\n" << GetStackTrace()66
;
562
325k
  for (int i = 0; i < bld.num_reactors_; 
i++289k
) {
563
289k
    reactors_.emplace_back(std::make_unique<Reactor>(this, i, bld));
564
289k
  }
565
  // Make sure skip buffer is allocated before we hit memory limit and try to use it.
566
35.3k
  GetGlobalSkipBuffer();
567
35.3k
}
568
569
8.54k
Messenger::~Messenger() {
570
8.54k
  std::lock_guard<percpu_rwlock> guard(lock_);
571
  // This logging and the corresponding logging in the constructor is here to track down the
572
  // occasional CHECK(closing_) failure below in some tests (ENG-2838).
573
8.54k
  VLOG
(1) << "Messenger destructor for " << this << " called at:\n" << GetStackTrace()5
;
574
8.54k
#ifndef NDEBUG
575
8.54k
  if (!closing_) {
576
0
    LOG(ERROR) << "Messenger created here:\n" << creation_stack_trace_.Symbolize()
577
0
               << "Messenger destructor for " << this << " called at:\n" << GetStackTrace();
578
0
  }
579
8.54k
#endif
580
8.54k
  CHECK
(closing_) << "Should have already shut down"10
;
581
8.54k
  reactors_.clear();
582
8.54k
}
583
584
1
size_t Messenger::max_concurrent_requests() const {
585
1
  return num_connections_to_server_;
586
1
}
587
588
76.1M
Reactor* Messenger::RemoteToReactor(const Endpoint& remote, uint32_t idx) {
589
76.1M
  auto hash_code = hash_value(remote);
590
76.1M
  auto reactor_idx = (hash_code + idx) % reactors_.size();
591
  // This is just a static partitioning; where each connection
592
  // to a remote is assigned to a particular reactor. We could
593
  // get a lot fancier with assigning Sockaddrs to Reactors,
594
  // but this should be good enough.
595
76.1M
  return reactors_[reactor_idx].get();
596
76.1M
}
597
598
34.7k
Status Messenger::Init() {
599
34.7k
  Status status;
600
284k
  for (const auto& r : reactors_) {
601
284k
    RETURN_NOT_OK(r->Init());
602
284k
  }
603
604
34.7k
  return Status::OK();
605
34.7k
}
606
607
Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
608
4
                                  DumpRunningRpcsResponsePB* resp) {
609
4
  shared_lock<rw_spinlock> guard(lock_.get_lock());
610
15
  for (const auto& reactor : reactors_) {
611
15
    RETURN_NOT_OK(reactor->DumpRunningRpcs(req, resp));
612
15
  }
613
4
  return Status::OK();
614
4
}
615
616
Status Messenger::QueueEventOnAllReactors(
617
37.0k
    ServerEventListPtr server_event, const SourceLocation& source_location) {
618
37.0k
  shared_lock<rw_spinlock> guard(lock_.get_lock());
619
370k
  for (const auto& reactor : reactors_) {
620
370k
    reactor->QueueEventOnAllConnections(server_event, source_location);
621
370k
  }
622
37.0k
  return Status::OK();
623
37.0k
}
624
625
10.2M
void Messenger::RemoveScheduledTask(ScheduledTaskId id) {
626
10.2M
  CHECK_GT(id, 0);
627
10.2M
  std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
628
10.2M
  scheduled_tasks_.erase(id);
629
10.2M
}
630
631
1.33k
void Messenger::AbortOnReactor(ScheduledTaskId task_id) {
632
1.33k
  DCHECK(!reactors_.empty());
633
1.33k
  CHECK_GT(task_id, 0);
634
635
1.33k
  std::shared_ptr<DelayedTask> task;
636
1.33k
  {
637
1.33k
    std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
638
1.33k
    auto iter = scheduled_tasks_.find(task_id);
639
1.33k
    if (iter != scheduled_tasks_.end()) {
640
864
      task = iter->second;
641
864
      scheduled_tasks_.erase(iter);
642
864
    }
643
1.33k
  }
644
1.33k
  if (task) {
645
864
    task->AbortTask(STATUS(Aborted, "Task aborted by messenger"));
646
864
  }
647
1.33k
}
648
649
ScheduledTaskId Messenger::ScheduleOnReactor(
650
10.2M
    StatusFunctor func, MonoDelta when, const SourceLocation& source_location, Messenger* msgr) {
651
10.2M
  DCHECK(!reactors_.empty());
652
653
  // If we're already running on a reactor thread, reuse it.
654
10.2M
  Reactor* chosen = nullptr;
655
102M
  for (const auto& r : reactors_) {
656
102M
    if (r->IsCurrentThread()) {
657
3.47k
      chosen = r.get();
658
3.47k
      break;
659
3.47k
    }
660
102M
  }
661
10.2M
  if (chosen == nullptr) {
662
    // Not running on a reactor thread, pick one at random.
663
10.2M
    chosen = reactors_[rand() % reactors_.size()].get();
664
10.2M
  }
665
666
10.2M
  ScheduledTaskId task_id = 0;
667
10.2M
  if (msgr != nullptr) {
668
10.2M
    task_id = next_task_id_.fetch_add(1);
669
10.2M
  }
670
10.2M
  auto task = std::make_shared<DelayedTask>(
671
10.2M
      std::move(func), when, task_id, source_location, msgr);
672
10.2M
  if (msgr != nullptr) {
673
10.2M
    std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
674
10.2M
    scheduled_tasks_.emplace(task_id, task);
675
10.2M
  }
676
677
10.2M
  if (
chosen->ScheduleReactorTask(task)10.2M
) {
678
10.2M
    return task_id;
679
10.2M
  }
680
681
18.4E
  {
682
18.4E
    std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
683
18.4E
    scheduled_tasks_.erase(task_id);
684
18.4E
  }
685
686
18.4E
  return kInvalidTaskId;
687
10.2M
}
688
689
8.40M
scoped_refptr<MetricEntity> Messenger::metric_entity() const {
690
8.40M
  return metric_entity_;
691
8.40M
}
692
693
} // namespace rpc
694
} // namespace yb