YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/messenger.cc
Line|Count|Source
1||// Licensed to the Apache Software Foundation (ASF) under one
2||// or more contributor license agreements.  See the NOTICE file
3||// distributed with this work for additional information
4||// regarding copyright ownership.  The ASF licenses this file
5||// to you under the Apache License, Version 2.0 (the
6||// "License"); you may not use this file except in compliance
7||// with the License.  You may obtain a copy of the License at
8||//
9||//   http://www.apache.org/licenses/LICENSE-2.0
10||//
11||// Unless required by applicable law or agreed to in writing,
12||// software distributed under the License is distributed on an
13||// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14||// KIND, either express or implied.  See the License for the
15||// specific language governing permissions and limitations
16||// under the License.
17||//
18||// The following only applies to changes made to this file as part of YugaByte development.
19||//
20||// Portions Copyright (c) YugaByte, Inc.
21||//
22||// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23||// in compliance with the License.  You may obtain a copy of the License at
24||//
25||// http://www.apache.org/licenses/LICENSE-2.0
26||//
27||// Unless required by applicable law or agreed to in writing, software distributed under the License
28||// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29||// or implied.  See the License for the specific language governing permissions and limitations
30||// under the License.
31||//
32||
33||#include "yb/rpc/messenger.h"
34||
35||#include <sys/types.h>
36||
37||#include <list>
38||#include <mutex>
39||#include <set>
40||#include <string>
41||#include <thread>
42||
43||#include <gflags/gflags.h>
44||#include <glog/logging.h>
45||
46||#include "yb/gutil/map-util.h"
47||#include "yb/gutil/stl_util.h"
48||#include "yb/gutil/strings/substitute.h"
49||
50||#include "yb/rpc/acceptor.h"
51||#include "yb/rpc/constants.h"
52||#include "yb/rpc/reactor.h"
53||#include "yb/rpc/rpc_header.pb.h"
54||#include "yb/rpc/rpc_metrics.h"
55||#include "yb/rpc/rpc_service.h"
56||#include "yb/rpc/rpc_util.h"
57||#include "yb/rpc/tcp_stream.h"
58||#include "yb/rpc/yb_rpc.h"
59||
60||#include "yb/util/debug-util.h"
61||#include "yb/util/errno.h"
62||#include "yb/util/flag_tags.h"
63||#include "yb/util/format.h"
64||#include "yb/util/metrics.h"
65||#include "yb/util/monotime.h"
66||#include "yb/util/net/dns_resolver.h"
67||#include "yb/util/net/socket.h"
68||#include "yb/util/result.h"
69||#include "yb/util/size_literals.h"
70||#include "yb/util/status.h"
71||#include "yb/util/status_format.h"
72||#include "yb/util/status_log.h"
73||#include "yb/util/thread_restrictions.h"
74||#include "yb/util/trace.h"
75||
76||using namespace std::literals;
77||using namespace std::placeholders;
78||using namespace yb::size_literals;
79||
80||using std::string;
81||using std::shared_ptr;
82||using strings::Substitute;
83||
84||DECLARE_int32(num_connections_to_server);
85||DEFINE_int32(rpc_default_keepalive_time_ms, 65000,
86||             "If an RPC connection from a client is idle for this amount of time, the server "
87||             "will disconnect the client. Setting flag to 0 disables this clean up.");
88||TAG_FLAG(rpc_default_keepalive_time_ms, advanced);
89||DEFINE_uint64(io_thread_pool_size, 4, "Size of allocated IO Thread Pool.");
90||
91||DEFINE_int64(outbound_rpc_memory_limit, 0, "Outbound RPC memory limit");
92||
93||DEFINE_int32(rpc_queue_limit, 10000, "Queue limit for rpc server");
94||DEFINE_int32(rpc_workers_limit, 1024, "Workers limit for rpc server");
95||
96||DEFINE_int32(socket_receive_buffer_size, 0, "Socket receive buffer size, 0 to use default");
97||
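The flags above are plain gflags definitions, so their defaults can be overridden at process start or, in tests, by writing to the generated FLAGS_ variables. A minimal sketch under that assumption; the helper function is hypothetical, and only the flag names come from this file:

#include <gflags/gflags.h>

DECLARE_int32(rpc_default_keepalive_time_ms);
DECLARE_int32(rpc_workers_limit);

// Hypothetical test helper: disable idle-connection cleanup and shrink the worker pool.
void ConfigureRpcFlagsForTest() {
  FLAGS_rpc_default_keepalive_time_ms = 0;  // 0 disables the idle disconnect described above
  FLAGS_rpc_workers_limit = 16;             // smaller worker pool for a lightweight test server
}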
98||namespace yb {
99||namespace rpc {
100||
101||class Messenger;
102||class ServerBuilder;
103||
104||// ------------------------------------------------------------------------------------------------
105||// MessengerBuilder
106||// ------------------------------------------------------------------------------------------------
107||
108||MessengerBuilder::MessengerBuilder(std::string name)
109||    : name_(std::move(name)),
110||      connection_keepalive_time_(FLAGS_rpc_default_keepalive_time_ms * 1ms),
111||      coarse_timer_granularity_(100ms),
112||      listen_protocol_(TcpStream::StaticProtocol()),
113||      queue_limit_(FLAGS_rpc_queue_limit),
114||      workers_limit_(FLAGS_rpc_workers_limit),
115|21.3k|      num_connections_to_server_(GetAtomicFlag(&FLAGS_num_connections_to_server)) {
116|21.3k|  AddStreamFactory(TcpStream::StaticProtocol(), TcpStream::Factory());
117|21.3k|}
118||
119|21.0k|MessengerBuilder::~MessengerBuilder() = default;
120|0|MessengerBuilder::MessengerBuilder(const MessengerBuilder&) = default;
121||
122||MessengerBuilder& MessengerBuilder::set_connection_keepalive_time(
123|17.7k|    CoarseMonoClock::Duration keepalive) {
124|17.7k|  connection_keepalive_time_ = keepalive;
125|17.7k|  return *this;
126|17.7k|}
127||
128|19.9k|MessengerBuilder& MessengerBuilder::set_num_reactors(int num_reactors) {
129|19.9k|  num_reactors_ = num_reactors;
130|19.9k|  return *this;
131|19.9k|}
132||
133||MessengerBuilder& MessengerBuilder::set_coarse_timer_granularity(
134|202|    CoarseMonoClock::Duration granularity) {
135|202|  coarse_timer_granularity_ = granularity;
136|202|  return *this;
137|202|}
138||
139||MessengerBuilder &MessengerBuilder::set_metric_entity(
140|19.6k|    const scoped_refptr<MetricEntity>& metric_entity) {
141|19.6k|  metric_entity_ = metric_entity;
142|19.6k|  return *this;
143|19.6k|}
144||
145|21.3k|Result<std::unique_ptr<Messenger>> MessengerBuilder::Build() {
146|21.3k|  if (!connection_context_factory_) {
147|1.69k|    UseDefaultConnectionContextFactory();
148|1.69k|  }
149|21.3k|  std::unique_ptr<Messenger> messenger(new Messenger(*this));
150|21.3k|  RETURN_NOT_OK(messenger->Init());
151||
152|21.3k|  return messenger;
153|21.3k|}
154||
155||MessengerBuilder &MessengerBuilder::AddStreamFactory(
156|37.6k|    const Protocol* protocol, StreamFactoryPtr factory) {
157|37.6k|  auto p = stream_factories_.emplace(protocol, std::move(factory));
158|0|  LOG_IF(DFATAL, !p.second) << "Duplicate stream factory: " << protocol->ToString();
159|37.6k|  return *this;
160|37.6k|}
161||
162||MessengerBuilder &MessengerBuilder::UseDefaultConnectionContextFactory(
163|21.1k|    const std::shared_ptr<MemTracker>& parent_mem_tracker) {
164|21.1k|  if (parent_mem_tracker) {
165|19.1k|    last_used_parent_mem_tracker_ = parent_mem_tracker;
166|19.1k|  }
167|21.1k|  connection_context_factory_ = rpc::CreateConnectionContextFactory<YBOutboundConnectionContext>(
168|21.1k|      FLAGS_outbound_rpc_memory_limit, parent_mem_tracker);
169|21.1k|  return *this;
170|21.1k|}
171||
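MessengerBuilder is a fluent builder: each setter above returns *this and Build() returns a Result wrapping the new Messenger. A usage sketch based only on the methods defined in this file; the messenger name, reactor count, and metric entity are illustrative, and Result is assumed to expose ok()/status()/operator* as it does elsewhere in YugabyteDB:

MessengerBuilder builder("example-messenger");
builder.set_num_reactors(4)
       .set_connection_keepalive_time(30s)
       .set_metric_entity(metric_entity);  // scoped_refptr<MetricEntity> created elsewhere

auto messenger_result = builder.Build();   // Result<std::unique_ptr<Messenger>>
if (!messenger_result.ok()) {
  return messenger_result.status();        // propagate the failure
}
std::unique_ptr<Messenger> messenger = std::move(*messenger_result);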
172||// ------------------------------------------------------------------------------------------------
173||// Messenger
174||// ------------------------------------------------------------------------------------------------
175||
176|4.18k|void Messenger::Shutdown() {
177|4.18k|  ShutdownThreadPools();
178|4.18k|  ShutdownAcceptor();
179|4.18k|  UnregisterAllServices();
180||
181||  // Since we're shutting down, it's OK to block.
182|4.18k|  ThreadRestrictions::ScopedAllowWait allow_wait;
183||
184|4.18k|  std::vector<Reactor*> reactors;
185|4.18k|  std::unique_ptr<Acceptor> acceptor;
186|4.18k|  {
187|4.18k|    std::lock_guard<percpu_rwlock> guard(lock_);
188|4.18k|    if (closing_) {
189|431|      return;
190|431|    }
191|18|    VLOG(1) << "shutting down messenger " << name_;
192|3.75k|    closing_ = true;
193||
194|25|    DCHECK(rpc_services_.empty()) << "Unregister RPC services before shutting down Messenger";
195|3.75k|    rpc_services_.clear();
196||
197|3.75k|    acceptor.swap(acceptor_);
198||
199|13.6k|    for (const auto& reactor : reactors_) {
200|13.6k|      reactors.push_back(reactor.get());
201|13.6k|    }
202|3.75k|  }
203||
204|3.75k|  if (acceptor) {
205|0|    acceptor->Shutdown();
206|0|  }
207||
208|13.6k|  for (auto* reactor : reactors) {
209|13.6k|    reactor->Shutdown();
210|13.6k|  }
211||
212|3.75k|  scheduler_.Shutdown();
213|3.75k|  io_thread_pool_.Shutdown();
214||
215|13.6k|  for (auto* reactor : reactors) {
216|13.6k|    reactor->Join();
217|13.6k|  }
218||
219|3.75k|  io_thread_pool_.Join();
220||
221|3.75k|  {
222|3.75k|    std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
223|8|    LOG_IF(DFATAL, !scheduled_tasks_.empty())
224|8|        << "Scheduled tasks is not empty after messenger shutdown: "
225|8|        << yb::ToString(scheduled_tasks_);
226|3.75k|  }
227|3.75k|}
228||
229||Status Messenger::ListenAddress(
230||    ConnectionContextFactoryPtr factory, const Endpoint& accept_endpoint,
231|17.2k|    Endpoint* bound_endpoint) {
232|17.2k|  Acceptor* acceptor;
233|17.2k|  {
234|17.2k|    std::lock_guard<percpu_rwlock> guard(lock_);
235|17.2k|    if (!acceptor_) {
236|17.2k|      acceptor_.reset(new Acceptor(
237|17.2k|          metric_entity_, std::bind(&Messenger::RegisterInboundSocket, this, factory, _1, _2)));
238|17.2k|    }
239|17.2k|    auto accept_host = accept_endpoint.address();
240|2|    auto& outbound_address = accept_host.is_v6() ? outbound_address_v6_
241|17.2k|                                                 : outbound_address_v4_;
242|17.2k|    if (outbound_address.is_unspecified() && !accept_host.is_unspecified()) {
243|17.2k|      outbound_address = accept_host;
244|17.2k|    }
245|17.2k|    acceptor = acceptor_.get();
246|17.2k|  }
247|17.2k|  return acceptor->Listen(accept_endpoint, bound_endpoint);
248|17.2k|}
249||
250|17.2k|Status Messenger::StartAcceptor() {
251|140k|  for (const auto& p : rpc_services_) {
252|140k|    p.second->FillEndpoints(&rpc_endpoints_);
253|140k|  }
254||
255|17.2k|  std::lock_guard<percpu_rwlock> guard(lock_);
256|17.2k|  if (acceptor_) {
257|17.2k|    return acceptor_->Start();
258|0|  } else {
259|0|    return STATUS(IllegalState, "Trying to start acceptor w/o active addresses");
260|0|  }
261|17.2k|}
262||
263|18.1k|void Messenger::BreakConnectivityWith(const IpAddress& address) {
264|18.1k|  BreakConnectivity(address, /* incoming */ true, /* outgoing */ true);
265|18.1k|}
266|0|void Messenger::BreakConnectivityTo(const IpAddress& address) {
267|0|  BreakConnectivity(address, /* incoming */ false, /* outgoing */ true);
268|0|}
269||
270|0|void Messenger::BreakConnectivityFrom(const IpAddress& address) {
271|0|  BreakConnectivity(address, /* incoming */ true, /* outgoing */ false);
272|0|}
273||
274|18.1k|void Messenger::BreakConnectivity(const IpAddress& address, bool incoming, bool outgoing) {
275|18.1k|  LOG(INFO) << "TEST: Break " << (incoming ? "incoming" : "") << "/" << (outgoing ? "outgoing" : "")
276|18.1k|            << " connectivity with: " << address;
277||
278|18.1k|  boost::optional<CountDownLatch> latch;
279|18.1k|  {
280|18.1k|    std::lock_guard<percpu_rwlock> guard(lock_);
281|18.1k|    if (broken_connectivity_from_.empty() || broken_connectivity_to_.empty()) {
282|910|      has_broken_connectivity_.store(true, std::memory_order_release);
283|910|    }
284|18.1k|    bool inserted_from = false;
285|18.1k|    if (incoming) {
286|18.1k|      inserted_from = broken_connectivity_from_.insert(address).second;
287|18.1k|    }
288|18.1k|    bool inserted_to = false;
289|18.1k|    if (outgoing) {
290|18.1k|      inserted_to = broken_connectivity_to_.insert(address).second;
291|18.1k|    }
292|18.1k|    if (inserted_from || inserted_to) {
293|18.1k|      latch.emplace(reactors_.size());
294|181k|      for (const auto& reactor : reactors_) {
295|181k|        auto scheduled = reactor->ScheduleReactorTask(MakeFunctorReactorTask(
296|178k|            [&latch, address, incoming, outgoing](Reactor* reactor) {
297|178k|              if (incoming) {
298|178k|                reactor->DropIncomingWithRemoteAddress(address);
299|178k|              }
300|178k|              if (outgoing) {
301|178k|                reactor->DropOutgoingWithRemoteAddress(address);
302|178k|              }
303|178k|              latch->CountDown();
304|178k|            },
305|181k|            SOURCE_LOCATION()));
306|181k|        if (!scheduled) {
307|0|          LOG(INFO) << "Failed to schedule drop connection with: " << address.to_string();
308|0|          latch->CountDown();
309|0|        }
310|181k|      }
311|18.1k|    }
312|18.1k|  }
313||
314|18.1k|  if (latch) {
315|18.1k|    latch->Wait();
316|18.1k|  }
317|18.1k|}
318||
319|18.1k|void Messenger::RestoreConnectivityWith(const IpAddress& address) {
320|18.1k|  RestoreConnectivity(address, /* incoming */ true, /* outgoing */ true);
321|18.1k|}
322|0|void Messenger::RestoreConnectivityTo(const IpAddress& address) {
323|0|  RestoreConnectivity(address, /* incoming */ false, /* outgoing */ true);
324|0|}
325||
326|0|void Messenger::RestoreConnectivityFrom(const IpAddress& address) {
327|0|  RestoreConnectivity(address, /* incoming */ true, /* outgoing */ false);
328|0|}
329||
330|18.1k|void Messenger::RestoreConnectivity(const IpAddress& address, bool incoming, bool outgoing) {
331|18.1k|  LOG(INFO) << "TEST: Restore " << (incoming ? "incoming" : "") << "/"
332|18.1k|            << (outgoing ? "outgoing" : "") << " connectivity with: " << address;
333||
334|18.1k|  std::lock_guard<percpu_rwlock> guard(lock_);
335|18.1k|  if (incoming) {
336|18.1k|    broken_connectivity_from_.erase(address);
337|18.1k|  }
338|18.1k|  if (outgoing) {
339|18.1k|    broken_connectivity_to_.erase(address);
340|18.1k|  }
341|18.1k|  if (broken_connectivity_from_.empty() && broken_connectivity_to_.empty()) {
342|1|    has_broken_connectivity_.store(false, std::memory_order_release);
343|1|  }
344|18.1k|}
345||
346|314k|bool Messenger::TEST_ShouldArtificiallyRejectIncomingCallsFrom(const IpAddress &remote) {
347|314k|  if (has_broken_connectivity_.load(std::memory_order_acquire)) {
348|7.11k|    shared_lock<rw_spinlock> guard(lock_.get_lock());
349|7.11k|    return broken_connectivity_from_.count(remote) != 0;
350|7.11k|  }
351|307k|  return false;
352|307k|}
353||
354|19.9M|bool Messenger::TEST_ShouldArtificiallyRejectOutgoingCallsTo(const IpAddress &remote) {
355|19.9M|  if (has_broken_connectivity_.load(std::memory_order_acquire)) {
356|100k|    shared_lock<rw_spinlock> guard(lock_.get_lock());
357|100k|    return broken_connectivity_to_.count(remote) != 0;
358|100k|  }
359|19.8M|  return false;
360|19.8M|}
361||
362|8|Status Messenger::TEST_GetReactorMetrics(size_t reactor_idx, ReactorMetrics* metrics) {
363|8|  if (reactor_idx >= reactors_.size()) {
364|0|    return STATUS_FORMAT(
365|0|        InvalidArgument, "Invalid reactor index $0, should be >=0 and <$1", reactor_idx,
366|0|        reactors_.size());
367|0|  }
368|8|  return reactors_[reactor_idx]->GetMetrics(metrics);
369|8|}
370||
371|4.93k|void Messenger::ShutdownAcceptor() {
372|4.93k|  std::unique_ptr<Acceptor> acceptor;
373|4.93k|  {
374|4.93k|    std::lock_guard<percpu_rwlock> guard(lock_);
375|4.93k|    acceptor.swap(acceptor_);
376|4.93k|  }
377|4.93k|  if (acceptor) {
378|258|    acceptor->Shutdown();
379|258|  }
380|4.93k|}
381||
382|28.3M|rpc::ThreadPool& Messenger::ThreadPool(ServicePriority priority) {
383|28.3M|  switch (priority) {
384|17.8M|    case ServicePriority::kNormal:
385|17.8M|      return *normal_thread_pool_;
386|10.4M|    case ServicePriority::kHigh:
387|10.4M|      auto high_priority_thread_pool = high_priority_thread_pool_.get();
388|10.4M|      if (high_priority_thread_pool) {
389|10.4M|        return *high_priority_thread_pool;
390|10.4M|      }
391|52.2k|      std::lock_guard<std::mutex> lock(mutex_high_priority_thread_pool_);
392|52.2k|      high_priority_thread_pool = high_priority_thread_pool_.get();
393|52.2k|      if (high_priority_thread_pool) {
394|0|        return *high_priority_thread_pool;
395|0|      }
396|52.2k|      const ThreadPoolOptions& options = normal_thread_pool_->options();
397|52.2k|      high_priority_thread_pool_.reset(new rpc::ThreadPool(
398|52.2k|          name_ + "-high-pri", options.queue_limit, options.max_workers));
399|52.2k|      return *high_priority_thread_pool_.get();
400|0|  }
401|0|  FATAL_INVALID_ENUM_VALUE(ServicePriority, priority);
402|0|}
403||
404||// Register a new RpcService to handle inbound requests.
405||Status Messenger::RegisterService(
406|140k|    const std::string& service_name, const scoped_refptr<RpcService>& service) {
407|140k|  DCHECK(service);
408|140k|  rpc_services_.emplace(service_name, service);
409|140k|  return Status::OK();
410|140k|}
411||
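RegisterService() above only records the service in rpc_services_; the per-method endpoints become servable once StartAcceptor() calls FillEndpoints() (see earlier in this file). A rough sketch of that flow; the service object, connection context factory, listen endpoint, and service name are assumptions supplied by the caller:

// Sketch: register one service, bind a listen address, then start accepting.
Status RegisterAndStart(Messenger* messenger,
                        const scoped_refptr<RpcService>& service,
                        const ConnectionContextFactoryPtr& factory,
                        const Endpoint& listen_endpoint) {
  RETURN_NOT_OK(messenger->RegisterService("yb.ExampleService", service));
  Endpoint bound_endpoint;
  RETURN_NOT_OK(messenger->ListenAddress(factory, listen_endpoint, &bound_endpoint));
  // StartAcceptor() copies each registered service's endpoints into rpc_endpoints_
  // before the acceptor begins handing sockets to RegisterInboundSocket().
  return messenger->StartAcceptor();
}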
412|4.93k|void Messenger::ShutdownThreadPools() {
413|4.93k|  normal_thread_pool_->Shutdown();
414|4.93k|  auto high_priority_thread_pool = high_priority_thread_pool_.get();
415|4.93k|  if (high_priority_thread_pool) {
416|1.28k|    high_priority_thread_pool->Shutdown();
417|1.28k|  }
418|4.93k|}
419||
420|5.01k|void Messenger::UnregisterAllServices() {
421|5.01k|  CHECK_OK(rpc_services_counter_.DisableAndWaitForOps(CoarseTimePoint::max(), Stop::kTrue));
422|5.01k|  rpc_services_counter_.UnlockExclusiveOpMutex();
423||
424|2.02k|  for (const auto& p : rpc_services_) {
425|2.02k|    p.second->StartShutdown();
426|2.02k|  }
427|2.02k|  for (const auto& p : rpc_services_) {
428|2.02k|    p.second->CompleteShutdown();
429|2.02k|  }
430||
431|5.01k|  rpc_services_.clear();
432|5.01k|  rpc_endpoints_.clear();
433|5.01k|}
434||
435||class NotifyDisconnectedReactorTask : public ReactorTask {
436|| public:
437||  NotifyDisconnectedReactorTask(OutboundCallPtr call, const SourceLocation& source_location)
438|48.8k|      : ReactorTask(source_location), call_(std::move(call)) {}
439||
440|49.0k|  void Run(Reactor* reactor) override  {
441|49.0k|    call_->Transferred(STATUS_FORMAT(
442|49.0k|        NetworkError, "TEST: Connectivity is broken with $0",
443|49.0k|        call_->conn_id().remote().address()), nullptr);
444|49.0k|  }
445|| private:
446|0|  void DoAbort(const Status &abort_status) override {
447|0|    call_->Transferred(abort_status, nullptr);
448|0|  }
449||
450||  OutboundCallPtr call_;
451||};
452||
453|19.9M|void Messenger::QueueOutboundCall(OutboundCallPtr call) {
454|19.9M|  const auto& remote = call->conn_id().remote();
455|19.9M|  Reactor *reactor = RemoteToReactor(remote, call->conn_id().idx());
456||
457|19.9M|  if (TEST_ShouldArtificiallyRejectOutgoingCallsTo(remote.address())) {
458|4|    VLOG(1) << "TEST: Rejected connection to " << remote;
459|49.0k|    auto scheduled = reactor->ScheduleReactorTask(std::make_shared<NotifyDisconnectedReactorTask>(
460|49.0k|        call, SOURCE_LOCATION()));
461|49.0k|    if (!scheduled) {
462|10|      call->Transferred(STATUS(Aborted, "Reactor is closing"), nullptr /* conn */);
463|10|    }
464|49.0k|    return;
465|49.0k|  }
466||
467|19.8M|  reactor->QueueOutboundCall(std::move(call));
468|19.8M|}
469||
470|29.8M|void Messenger::Handle(InboundCallPtr call, Queue queue) {
471|29.8M|  ScopedRWOperation op(&rpc_services_counter_, CoarseTimePoint::min());
472|29.8M|  if (!op.ok()) {
473|15|    call->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, MoveStatus(op));
474|15|    return;
475|15|  }
476|29.8M|  auto it = rpc_endpoints_.find(call->serialized_remote_method());
477|29.8M|  if (it == rpc_endpoints_.end()) {
478|9|    auto remote_method = ParseRemoteMethod(call->serialized_remote_method());
479|9|    Status s;
480|9|    ErrorStatusPB::RpcErrorCodePB error_code = ErrorStatusPB::ERROR_NO_SUCH_SERVICE;
481|9|    if (remote_method.ok()) {
482|9|      if (rpc_services_.count(remote_method->service.ToBuffer())) {
483|2|        error_code = ErrorStatusPB::ERROR_NO_SUCH_METHOD;
484|2|        s = STATUS_FORMAT(
485|2|            InvalidArgument, "Call on service $0 received from $1 with an invalid method name: $2",
486|2|            remote_method->service.ToBuffer(),
487|2|            call->remote_address(),
488|2|            remote_method->method.ToBuffer());
489|7|      } else {
490|7|        s = STATUS_FORMAT(
491|7|            ServiceUnavailable, "Service $0 not registered on $1",
492|7|            remote_method->service.ToBuffer(), name_);
493|7|      }
494|0|    } else {
495|0|      s = remote_method.status();
496|0|    }
497|9|    LOG(WARNING) << s;
498|9|    call->RespondFailure(error_code, s);
499|9|    return;
500|9|  }
501||
502||  // The RpcService will respond to the client on success or failure.
503|29.8M|  call->set_method_index(it->second.second);
504|29.8M|  it->second.first->QueueInboundCall(std::move(call));
505|29.8M|}
506||
507|0|RpcServicePtr Messenger::TEST_rpc_service(const std::string& service_name) const {
508|0|  ScopedRWOperation op(&rpc_services_counter_, CoarseTimePoint::min());
509|0|  if (!op.ok()) {
510|0|    return nullptr;
511|0|  }
512|0|  auto it = rpc_services_.find(service_name);
513|0|  return it != rpc_services_.end() ? it->second : nullptr;
514|0|}
515||
516|290k|const std::shared_ptr<MemTracker>& Messenger::parent_mem_tracker() {
517|290k|  return connection_context_factory_->buffer_tracker();
518|290k|}
519||
520||void Messenger::RegisterInboundSocket(
521|314k|    const ConnectionContextFactoryPtr& factory, Socket *new_socket, const Endpoint& remote) {
522|314k|  if (TEST_ShouldArtificiallyRejectIncomingCallsFrom(remote.address())) {
523|1|    auto status = new_socket->Close();
524|0|    VLOG(1) << "TEST: Rejected connection from " << remote
525|0|            << ", close status: " << status.ToString();
526|1|    return;
527|1|  }
528||
529|314k|  if (FLAGS_socket_receive_buffer_size) {
530|0|    WARN_NOT_OK(new_socket->SetReceiveBufferSize(FLAGS_socket_receive_buffer_size),
531|0|                "Set receive buffer size failed: ");
532|0|  }
533||
534|314k|  auto receive_buffer_size = new_socket->GetReceiveBufferSize();
535|314k|  if (!receive_buffer_size.ok()) {
536|0|    LOG(WARNING) << "Register inbound socket failed: " << receive_buffer_size.status();
537|0|    return;
538|0|  }
539||
540|314k|  int idx = num_connections_accepted_.fetch_add(1) % num_connections_to_server_;
541|314k|  Reactor *reactor = RemoteToReactor(remote, idx);
542|314k|  reactor->RegisterInboundSocket(new_socket, *receive_buffer_size, remote, factory);
543|314k|}
544||
545||Messenger::Messenger(const MessengerBuilder &bld)
546||    : name_(bld.name_),
547||      connection_context_factory_(bld.connection_context_factory_),
548||      stream_factories_(bld.stream_factories_),
549||      listen_protocol_(bld.listen_protocol_),
550||      rpc_services_counter_(name_ + " endpoints"),
551||      metric_entity_(bld.metric_entity_),
552||      io_thread_pool_(name_, FLAGS_io_thread_pool_size),
553||      scheduler_(&io_thread_pool_.io_service()),
554||      normal_thread_pool_(new rpc::ThreadPool(name_, bld.queue_limit_, bld.workers_limit_)),
555||      resolver_(new DnsResolver(&io_thread_pool_.io_service())),
556||      rpc_metrics_(std::make_shared<RpcMetrics>(bld.metric_entity_)),
557|21.3k|      num_connections_to_server_(bld.num_connections_to_server_) {
558|21.3k|#ifndef NDEBUG
559|21.3k|  creation_stack_trace_.Collect(/* skip_frames */ 1);
560|21.3k|#endif
561|37|  VLOG(1) << "Messenger constructor for " << this << " called at:\n" << GetStackTrace();
562|209k|  for (int i = 0; i < bld.num_reactors_; i++) {
563|188k|    reactors_.emplace_back(std::make_unique<Reactor>(this, i, bld));
564|188k|  }
565||  // Make sure skip buffer is allocated before we hit memory limit and try to use it.
566|21.3k|  GetGlobalSkipBuffer();
567|21.3k|}
568||
569|3.74k|Messenger::~Messenger() {
570|3.74k|  std::lock_guard<percpu_rwlock> guard(lock_);
571||  // This logging and the corresponding logging in the constructor is here to track down the
572||  // occasional CHECK(closing_) failure below in some tests (ENG-2838).
573|1|  VLOG(1) << "Messenger destructor for " << this << " called at:\n" << GetStackTrace();
574|3.74k|#ifndef NDEBUG
575|3.74k|  if (!closing_) {
576|0|    LOG(ERROR) << "Messenger created here:\n" << creation_stack_trace_.Symbolize()
577|0|               << "Messenger destructor for " << this << " called at:\n" << GetStackTrace();
578|0|  }
579|3.74k|#endif
580|0|  CHECK(closing_) << "Should have already shut down";
581|3.74k|  reactors_.clear();
582|3.74k|}
583||
584|1|size_t Messenger::max_concurrent_requests() const {
585|1|  return num_connections_to_server_;
586|1|}
587||
588|20.2M|Reactor* Messenger::RemoteToReactor(const Endpoint& remote, uint32_t idx) {
589|20.2M|  auto hash_code = hash_value(remote);
590|20.2M|  auto reactor_idx = (hash_code + idx) % reactors_.size();
591||  // This is just a static partitioning; where each connection
592||  // to a remote is assigned to a particular reactor. We could
593||  // get a lot fancier with assigning Sockaddrs to Reactors,
594||  // but this should be good enough.
595|20.2M|  return reactors_[reactor_idx].get();
596|20.2M|}
597||
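The comment inside RemoteToReactor() describes a static partitioning scheme: hashing the remote endpoint and adding the connection index pins each (remote, index) pair to one reactor for its lifetime. The same arithmetic in isolation; the function and parameter names below are illustrative, not part of this file:

// Deterministic slot choice: the same endpoint hash and connection index always
// map to the same reactor, while different remotes spread across all reactors.
size_t PickReactorSlot(size_t endpoint_hash, uint32_t connection_idx, size_t num_reactors) {
  return (endpoint_hash + connection_idx) % num_reactors;
}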
598|21.0k|Status Messenger::Init() {
599|21.0k|  Status status;
600|185k|  for (const auto& r : reactors_) {
601|185k|    RETURN_NOT_OK(r->Init());
602|185k|  }
603||
604|21.0k|  return Status::OK();
605|21.0k|}
606||
607||Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
608|4|                                  DumpRunningRpcsResponsePB* resp) {
609|4|  shared_lock<rw_spinlock> guard(lock_.get_lock());
610|15|  for (const auto& reactor : reactors_) {
611|15|    RETURN_NOT_OK(reactor->DumpRunningRpcs(req, resp));
612|15|  }
613|4|  return Status::OK();
614|4|}
615||
616||Status Messenger::QueueEventOnAllReactors(
617|10.8k|    ServerEventListPtr server_event, const SourceLocation& source_location) {
618|10.8k|  shared_lock<rw_spinlock> guard(lock_.get_lock());
619|108k|  for (const auto& reactor : reactors_) {
620|108k|    reactor->QueueEventOnAllConnections(server_event, source_location);
621|108k|  }
622|10.8k|  return Status::OK();
623|10.8k|}
624||
625|984k|void Messenger::RemoveScheduledTask(ScheduledTaskId id) {
626|984k|  CHECK_GT(id, 0);
627|984k|  std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
628|984k|  scheduled_tasks_.erase(id);
629|984k|}
630||
631|885|void Messenger::AbortOnReactor(ScheduledTaskId task_id) {
632|885|  DCHECK(!reactors_.empty());
633|885|  CHECK_GT(task_id, 0);
634||
635|885|  std::shared_ptr<DelayedTask> task;
636|885|  {
637|885|    std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
638|885|    auto iter = scheduled_tasks_.find(task_id);
639|885|    if (iter != scheduled_tasks_.end()) {
640|633|      task = iter->second;
641|633|      scheduled_tasks_.erase(iter);
642|633|    }
643|885|  }
644|885|  if (task) {
645|633|    task->AbortTask(STATUS(Aborted, "Task aborted by messenger"));
646|633|  }
647|885|}
648||
649||ScheduledTaskId Messenger::ScheduleOnReactor(
650|986k|    StatusFunctor func, MonoDelta when, const SourceLocation& source_location, Messenger* msgr) {
651|986k|  DCHECK(!reactors_.empty());
652||
653||  // If we're already running on a reactor thread, reuse it.
654|986k|  Reactor* chosen = nullptr;
655|9.66M|  for (const auto& r : reactors_) {
656|9.66M|    if (r->IsCurrentThread()) {
657|3.35k|      chosen = r.get();
658|3.35k|      break;
659|3.35k|    }
660|9.66M|  }
661|986k|  if (chosen == nullptr) {
662||    // Not running on a reactor thread, pick one at random.
663|981k|    chosen = reactors_[rand() % reactors_.size()].get();
664|981k|  }
665||
666|986k|  ScheduledTaskId task_id = 0;
667|986k|  if (msgr != nullptr) {
668|985k|    task_id = next_task_id_.fetch_add(1);
669|985k|  }
670|986k|  auto task = std::make_shared<DelayedTask>(
671|986k|      std::move(func), when, task_id, source_location, msgr);
672|986k|  if (msgr != nullptr) {
673|986k|    std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
674|986k|    scheduled_tasks_.emplace(task_id, task);
675|986k|  }
676||
677|987k|  if (chosen->ScheduleReactorTask(task)) {
678|987k|    return task_id;
679|987k|  }
680||
681|18.4E|  {
682|18.4E|    std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
683|18.4E|    scheduled_tasks_.erase(task_id);
684|18.4E|  }
685||
686|18.4E|  return kInvalidTaskId;
687|18.4E|}
688||
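ScheduleOnReactor() hands a DelayedTask to a reactor and, when a Messenger pointer is supplied, tracks it in scheduled_tasks_ so that it can later be cancelled with AbortOnReactor(). A hedged sketch of that pairing; it assumes StatusFunctor is a callable taking a Status, that these methods are reachable from the calling code, and that 'messenger' is a Messenger* obtained elsewhere:

// Schedule a functor to run on some reactor thread after roughly one second.
auto task_id = messenger->ScheduleOnReactor(
    [](const Status& status) {
      // status is OK when the delay elapses, Aborted if the task was cancelled.
      LOG(INFO) << "Delayed task completed: " << status;
    },
    MonoDelta::FromSeconds(1), SOURCE_LOCATION(), messenger);

// ScheduleOnReactor() returns kInvalidTaskId if the chosen reactor refused the task,
// so only cancel ids that were actually handed out.
if (task_id != kInvalidTaskId) {
  messenger->AbortOnReactor(task_id);
}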
689|2.50M|scoped_refptr<MetricEntity> Messenger::metric_entity() const {
690|2.50M|  return metric_entity_;
691|2.50M|}
692||
693||} // namespace rpc
694||} // namespace yb