YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/reactor.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/reactor.h"
34
35
#include <netinet/in.h>
36
#include <stdlib.h>
37
#include <sys/types.h>
38
39
#include <functional>
40
#include <map>
41
#include <mutex>
42
#include <set>
43
#include <string>
44
45
#include <boost/preprocessor/cat.hpp>
46
#include <boost/preprocessor/stringize.hpp>
47
#include <ev++.h>
48
#include <glog/logging.h>
49
50
#include "yb/gutil/ref_counted.h"
51
#include "yb/gutil/stringprintf.h"
52
53
#include "yb/rpc/connection.h"
54
#include "yb/rpc/connection_context.h"
55
#include "yb/rpc/messenger.h"
56
#include "yb/rpc/rpc_controller.h"
57
#include "yb/rpc/rpc_introspection.pb.h"
58
#include "yb/rpc/server_event.h"
59
60
#include "yb/util/atomic.h"
61
#include "yb/util/countdown_latch.h"
62
#include "yb/util/errno.h"
63
#include "yb/util/format.h"
64
#include "yb/util/logging.h"
65
#include "yb/util/memory/memory.h"
66
#include "yb/util/metric_entity.h"
67
#include "yb/util/monotime.h"
68
#include "yb/util/net/socket.h"
69
#include "yb/util/scope_exit.h"
70
#include "yb/util/size_literals.h"
71
#include "yb/util/status.h"
72
#include "yb/util/status_format.h"
73
#include "yb/util/status_log.h"
74
#include "yb/util/thread.h"
75
#include "yb/util/thread_restrictions.h"
76
#include "yb/util/trace.h"
77
78
using namespace std::literals;
79
80
DEFINE_uint64(rpc_read_buffer_size, 0,
81
              "RPC connection read buffer size. 0 to auto detect.");
82
DECLARE_string(local_ip_for_outbound_sockets);
83
DECLARE_int32(num_connections_to_server);
84
DECLARE_int32(socket_receive_buffer_size);
85
86
namespace yb {
87
namespace rpc {
88
89
namespace {
90
91
// Message attached to the statuses built by AbortedError() and ServiceUnavailableError().
// Both the characters and the pointer are const so the message cannot be re-pointed by accident.
// The enclosing anonymous namespace already gives the constant internal linkage, so `static`
// is not needed.
const char* const kShutdownMessage = "Shutdown connection";
92
93
24.6k
// Returns a reference to a lazily constructed Aborted status that carries the shutdown message
// and the ESHUTDOWN errno. The same object is returned on every call.
const Status& AbortedError() {
  static Status aborted_status =
      STATUS(Aborted, kShutdownMessage, "" /* msg2 */, Errno(ESHUTDOWN));
  return aborted_status;
}
97
98
3.10M
// Returns a reference to a lazily constructed ServiceUnavailable status that carries the shutdown
// message and the ESHUTDOWN errno. The same object is returned on every call.
const Status& ServiceUnavailableError() {
  static Status unavailable_status =
      STATUS(ServiceUnavailable, kShutdownMessage, "" /* msg2 */, Errno(ESHUTDOWN));
  return unavailable_status;
}
103
104
// Callback for libev fatal errors (eg running out of file descriptors).
105
// Unfortunately libev doesn't plumb these back through to the caller, but
106
// instead just expects the callback to abort.
107
//
108
// This implementation is slightly preferable to the built-in one since
109
// it uses a FATAL log message instead of printing to stderr, which might
110
// not end up anywhere useful in a daemonized context.
111
357
void LibevSysErr(const char* msg) noexcept {
112
357
  PLOG(FATAL) << "LibEV fatal error: " << msg;
113
357
}
114
115
22.5k
void DoInitLibEv() {
116
22.5k
  ev::set_syserr_cb(LibevSysErr);
117
22.5k
}
118
119
321M
// Returns true when `state` is kClosing or kClosed, i.e. shutdown has at least been requested.
bool HasReactorStartedClosing(ReactorState state) {
  switch (state) {
    case ReactorState::kClosing:
    case ReactorState::kClosed:
      return true;
    default:
      return false;
  }
}
122
123
3.73M
size_t PatchReceiveBufferSize(size_t receive_buffer_size) {
124
3.73M
  return std::max<size_t>(
125
3.73M
      64_KB, FLAGS_rpc_read_buffer_size ? 
FLAGS_rpc_read_buffer_size10
:
receive_buffer_size3.73M
);
126
3.73M
}
127
128
} // anonymous namespace
129
130
// ------------------------------------------------------------------------------------------------
131
// Reactor class members
132
// ------------------------------------------------------------------------------------------------
133
134
// Builds a reactor owned by `messenger`.
//
// `index` is combined with the messenger name to form this reactor's name ("<name>_R<index>"),
// which is also used as the log prefix. Keepalive time, timer granularity and the number of
// connections per server come from `bld`. The constructor only prepares state: it performs the
// process-wide libev initialization once and creates the reusable task that drains the outbound
// queue. The event loop thread is started later by Init().
Reactor::Reactor(Messenger* messenger,
                 int index,
                 const MessengerBuilder &bld)
    : messenger_(messenger),
      name_(StringPrintf("%s_R%03d", messenger->name().c_str(), index)),
      log_prefix_(name_ + ": "),
      loop_(kDefaultLibEvFlags),
      cur_time_(CoarseMonoClock::Now()),
      last_unused_tcp_scan_(cur_time_),
      connection_keepalive_time_(bld.connection_keepalive_time()),
      coarse_timer_granularity_(bld.coarse_timer_granularity()),
      num_connections_to_server_(bld.num_connections_to_server()) {
  // The libev error callback is global, so install it exactly once per process.
  static std::once_flag libev_init_flag;
  std::call_once(libev_init_flag, DoInitLibEv);

  VLOG_WITH_PREFIX(1)
      << "Create reactor with keep alive_time: " << yb::ToString(connection_keepalive_time_)
      << ", coarse timer granularity: " << yb::ToString(coarse_timer_granularity_);

  // A single task object is reused every time the outbound queue needs processing.
  process_outbound_queue_task_ = MakeFunctorReactorTask(
      std::bind(&Reactor::ProcessOutboundQueue, this), SOURCE_LOCATION());
}
156
157
25.1k
// Destructor. Logs at DFATAL if tasks are still pending, which indicates the reactor was
// destroyed without its pending task list having been drained.
Reactor::~Reactor() {
  const bool has_pending_tasks = !pending_tasks_.empty();
  LOG_IF_WITH_PREFIX(DFATAL, has_pending_tasks)
      << "Not empty pending tasks when destroyed reactor: " << yb::ToString(pending_tasks_);
}
161
162
284k
// Attaches the async and timer watchers to the event loop and starts the reactor thread.
//
// Returns the result of yb::Thread::Create. Must be called at most once (debug-checked via
// thread_ being unset).
Status Reactor::Init() {
  DCHECK(thread_.get() == nullptr) << "Already started";
  DVLOG_WITH_PREFIX(6) << "Called Reactor::Init()";

  // Async watcher: lets other threads wake up the loop (see WakeThread / AsyncHandler).
  async_.set(loop_);
  async_.set<Reactor, &Reactor::AsyncHandler>(this);
  async_.start();

  // Periodic timer watcher: drives TimerHandler, which is used for closing old TCP connections
  // and applying backpressure. The first firing and the repeat interval are both one
  // coarse-timer granularity.
  const auto timer_period = ToSeconds(coarse_timer_granularity_);
  timer_.set(loop_);
  timer_.set<Reactor, &Reactor::TimerHandler>(this);
  timer_.start(timer_period, timer_period);

  // Spawn the thread that runs the event loop. The same string is used for the thread's
  // category and its name.
  const std::string thread_name = messenger_->name() + "_reactor";
  return yb::Thread::Create(thread_name, thread_name, &Reactor::RunThread, this, &thread_);
}
182
183
24.7k
void Reactor::Shutdown() {
184
24.7k
  ReactorState old_state = ReactorState::kRunning;
185
49.5k
  do {
186
49.5k
    if (state_.compare_exchange_weak(old_state,
187
49.5k
                                     ReactorState::kClosing,
188
49.5k
                                     std::memory_order_acq_rel)) {
189
18.4E
      VLOG_WITH_PREFIX(1) << "shutting down Reactor thread.";
190
24.7k
      WakeThread();
191
24.7k
    }
192
49.5k
  } while (!HasReactorStartedClosing(old_state));
193
194
  // Another thread already switched the state to closing before us.
195
24.7k
}
196
197
3.10M
void Reactor::ShutdownConnection(const ConnectionPtr& conn) {
198
3.10M
  DCHECK(IsCurrentThread());
199
200
3.10M
  
VLOG_WITH_PREFIX3.41k
(1) << "shutting down " << conn->ToString()3.41k
;
201
3.10M
  conn->Shutdown(ServiceUnavailableError());
202
3.10M
  if (!conn->context().Idle()) {
203
18.4E
    VLOG_WITH_PREFIX(1) << "connection is not idle: " << conn->ToString();
204
3.01k
    std::weak_ptr<Connection> weak_conn(conn);
205
3.01k
    conn->context().ListenIdle([this, weak_conn]() {
206
2.37k
      DCHECK(IsCurrentThreadOrStartedClosing());
207
2.37k
      auto conn = weak_conn.lock();
208
2.37k
      if (conn) {
209
2.37k
        
VLOG_WITH_PREFIX0
(1) << "connection became idle " << conn->ToString()0
;
210
        // The access to waiting_conns_ is safe here, because this code can only be called on the
211
        // reactor thread or when holding final_abort_mutex_ during shutdown.
212
2.37k
        waiting_conns_.erase(conn);
213
2.37k
      }
214
2.37k
    });
215
3.01k
    waiting_conns_.insert(conn);
216
3.10M
  } else {
217
3.10M
    
VLOG_WITH_PREFIX9.71k
(1) << "connection is idle: " << conn->ToString()9.71k
;
218
3.10M
  }
219
3.10M
}
220
221
24.6k
void Reactor::ShutdownInternal() {
222
24.6k
  DCHECK(IsCurrentThread());
223
224
24.6k
  stopping_ = true;
225
24.6k
  stop_start_time_ = CoarseMonoClock::Now();
226
227
  // Tear down any outbound TCP connections.
228
24.6k
  
VLOG_WITH_PREFIX50
(1) << "tearing down outbound TCP connections..."50
;
229
24.6k
  decltype(client_conns_) client_conns = std::move(client_conns_);
230
24.6k
  for (auto& pair : client_conns) {
231
22.3k
    ShutdownConnection(pair.second);
232
22.3k
  }
233
24.6k
  client_conns.clear();
234
235
  // Tear down any inbound TCP connections.
236
24.6k
  
VLOG_WITH_PREFIX47
(1) << "tearing down inbound TCP connections..."47
;
237
24.6k
  for (const ConnectionPtr& conn : server_conns_) {
238
778
    ShutdownConnection(conn);
239
778
  }
240
24.6k
  server_conns_.clear();
241
242
  // Abort any scheduled tasks.
243
  //
244
  // These won't be found in the Reactor's list of pending tasks
245
  // because they've been "run" (that is, they've been scheduled).
246
24.6k
  
VLOG_WITH_PREFIX88
(1) << "aborting scheduled tasks"88
;
247
24.6k
  Status aborted = AbortedError();
248
24.6k
  for (const auto& task : scheduled_tasks_) {
249
890
    task->Abort(aborted);
250
890
  }
251
24.6k
  scheduled_tasks_.clear();
252
253
  // async_handler_tasks_ are the tasks added by ScheduleReactorTask.
254
24.6k
  
VLOG_WITH_PREFIX232
(1) << "aborting async handler tasks"232
;
255
24.6k
  for (const auto& task : async_handler_tasks_) {
256
1
    task->Abort(aborted);
257
1
  }
258
259
24.6k
  
VLOG_WITH_PREFIX254
(1) << "aborting outbound calls"254
;
260
24.6k
  CHECK
(processing_outbound_queue_.empty()) << yb::ToString(processing_outbound_queue_)76
;
261
24.6k
  {
262
24.6k
    std::lock_guard<simple_spinlock> lock(outbound_queue_lock_);
263
24.6k
    outbound_queue_stopped_ = true;
264
24.6k
    outbound_queue_.swap(processing_outbound_queue_);
265
24.6k
  }
266
24.6k
  for (auto& call : processing_outbound_queue_) {
267
2
    call->Transferred(aborted, nullptr);
268
2
  }
269
24.6k
  processing_outbound_queue_.clear();
270
24.6k
}
271
272
8
// Fills `metrics` with the current number of client and server connections.
//
// The counts are read inside a functor passed to RunOnReactorThread; the status returned by
// that call is propagated to the caller.
Status Reactor::GetMetrics(ReactorMetrics *metrics) {
  auto collect = [metrics](Reactor* reactor) {
    metrics->num_client_connections = reactor->client_conns_.size();
    metrics->num_server_connections = reactor->server_conns_.size();
    return Status::OK();
  };
  return RunOnReactorThread(collect, SOURCE_LOCATION());
}
279
280
24.7k
void Reactor::Join() {
281
24.7k
  auto join_result = ThreadJoiner(thread_.get()).give_up_after(30s).Join();
282
24.7k
  if (
join_result.ok()24.7k
) {
283
24.7k
    return;
284
24.7k
  }
285
18.4E
  if (join_result.IsInvalidArgument()) {
286
0
    LOG_WITH_PREFIX(WARNING) << join_result;
287
0
    return;
288
0
  }
289
18.4E
  LOG_WITH_PREFIX(DFATAL) << "Failed to join Reactor " << thread_->ToString() << ": "
290
18.4E
                          << join_result;
291
  // Fallback to endless join in release mode.
292
18.4E
  thread_->Join();
293
18.4E
}
294
295
void Reactor::QueueEventOnAllConnections(
296
370k
    ServerEventListPtr server_event, const SourceLocation& source_location) {
297
370k
  ScheduleReactorFunctor([server_event = std::move(server_event)](Reactor* reactor) {
298
365k
    for (const ConnectionPtr& conn : reactor->server_conns_) {
299
28.4k
      conn->QueueOutboundData(server_event);
300
28.4k
    }
301
365k
  }, source_location);
302
370k
}
303
304
// Dumps every connection of this reactor into `resp`.
//
// Server connections are appended as inbound connections and client connections as outbound
// connections. The work runs through RunOnReactorThread; the first DumpPB failure stops the
// dump and is returned.
Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
                                DumpRunningRpcsResponsePB* resp) {
  return RunOnReactorThread([&req, resp](Reactor* reactor) -> Status {
    for (const ConnectionPtr& inbound : reactor->server_conns_) {
      RETURN_NOT_OK(inbound->DumpPB(req, resp->add_inbound_connections()));
    }
    for (const auto& id_and_conn : reactor->client_conns_) {
      RETURN_NOT_OK(id_and_conn.second->DumpPB(req, resp->add_outbound_connections()));
    }
    return Status::OK();
  }, SOURCE_LOCATION());
}
317
318
158M
void Reactor::WakeThread() {
319
158M
  async_.send();
320
158M
}
321
322
24.7k
void Reactor::CheckReadyToStop() {
323
24.7k
  DCHECK(IsCurrentThread());
324
325
24.7k
  
VLOG_WITH_PREFIX149
(4) << "Check ready to stop: " << thread_->ToString() << ", "
326
149
          << "waiting connections: " << yb::ToString(waiting_conns_);
327
328
24.7k
  if (VLOG_IS_ON(4)) {
329
0
    for (const auto& conn : waiting_conns_) {
330
0
      VLOG_WITH_PREFIX(4) << "Connection: " << conn->ToString() << ", idle=" << conn->Idle()
331
0
                          << ", why: " << conn->ReasonNotIdle();
332
0
    }
333
0
  }
334
335
24.7k
  if (waiting_conns_.empty()) {
336
18.4E
    VLOG_WITH_PREFIX(4) << "Reactor ready to stop, breaking loop: " << this;
337
338
18.4E
    VLOG_WITH_PREFIX(2) << "Marking reactor as closed: " << thread_.get()->ToString();
339
24.4k
    ReactorTasks final_tasks;
340
24.4k
    {
341
24.4k
      std::lock_guard<simple_spinlock> lock(pending_tasks_mtx_);
342
24.4k
      state_.store(ReactorState::kClosed, std::memory_order_release);
343
24.4k
      final_tasks.swap(pending_tasks_);
344
24.4k
    }
345
18.4E
    VLOG_WITH_PREFIX(2) << "Running final pending task aborts: " << thread_.get()->ToString();;
346
24.4k
    for (auto task : final_tasks) {
347
0
      task->Abort(ServiceUnavailableError());
348
0
    }
349
18.4E
    VLOG_WITH_PREFIX(2) << "Breaking reactor loop: " << thread_.get()->ToString();;
350
24.4k
    loop_.break_loop(); // break the epoll loop and terminate the thread
351
24.4k
  }
352
24.7k
}
353
354
// Handle async events.  These events are sent to the reactor by other threads that want to bring
355
// something to our attention, like the fact that we're shutting down, or the fact that there is a
356
// new outbound Transfer ready to send.
357
158M
void Reactor::AsyncHandler(ev::async &watcher, int revents) {
358
158M
  
VLOG_WITH_PREFIX_AND_FUNC90.9k
(4) << "Events: " << revents90.9k
;
359
360
158M
  DCHECK(IsCurrentThread());
361
362
158M
  auto se = ScopeExit([this] {
363
157M
    async_handler_tasks_.clear();
364
157M
  });
365
366
158M
  if (PREDICT_FALSE(DrainTaskQueueAndCheckIfClosing())) {
367
24.6k
    ShutdownInternal();
368
24.6k
    CheckReadyToStop();
369
24.6k
    return;
370
24.6k
  }
371
372
160M
  
for (const auto &task : async_handler_tasks_)158M
{
373
160M
    task->Run(this);
374
160M
  }
375
158M
}
376
377
622k
void Reactor::RegisterConnection(const ConnectionPtr& conn) {
378
622k
  DCHECK(IsCurrentThread());
379
380
622k
  Status s = conn->Start(&loop_);
381
622k
  if (
s.ok()622k
) {
382
622k
    server_conns_.push_back(conn);
383
18.4E
  } else {
384
18.4E
    LOG_WITH_PREFIX(WARNING) << "Failed to start connection: " << conn->ToString() << ": " << s;
385
18.4E
  }
386
622k
}
387
388
75.3M
// Finds (or creates) the client connection for `call` and queues the call on it.
//
// Returns the connection the call was queued on. If no connection could be obtained, the call
// is marked failed with the error and an empty ConnectionPtr is returned.
// Must be called on the reactor thread.
ConnectionPtr Reactor::AssignOutboundCall(const OutboundCallPtr& call) {
  DCHECK(IsCurrentThread());

  // TODO: Move call deadline timeout computation into OutboundCall constructor.
  const MonoDelta& call_timeout = call->controller()->timeout();
  MonoTime deadline;
  if (call_timeout.Initialized()) {
    deadline = MonoTime::Now();
    deadline.AddDelta(call_timeout);
  } else {
    LOG_WITH_PREFIX(WARNING) << "Client call " << call->remote_method().ToString()
                             << " has no timeout set for connection id: "
                             << call->conn_id().ToString();
    deadline = MonoTime::Max();
  }

  ConnectionPtr target_conn;
  const Status find_status =
      FindOrStartConnection(call->conn_id(), call->hostname(), deadline, &target_conn);
  if (PREDICT_FALSE(!find_status.ok())) {
    call->SetFailed(find_status);
    return ConnectionPtr();
  }

  target_conn->QueueOutboundCall(call);
  return target_conn;
}
414
415
//
416
// Handles timer events.  The periodic timer:
417
//
418
// 1. updates Reactor::cur_time_
419
// 2. every tcp_conn_timeo_ seconds, close down connections older than
420
//    tcp_conn_timeo_ seconds.
421
//
422
1.81G
void Reactor::TimerHandler(ev::timer &watcher, int revents) {
423
1.81G
  DCHECK(IsCurrentThread());
424
425
1.81G
  if (EV_ERROR & revents) {
426
0
    LOG_WITH_PREFIX(WARNING) << "Reactor got an error in the timer handler.";
427
0
    return;
428
0
  }
429
430
1.81G
  if (stopping_) {
431
200
    CheckReadyToStop();
432
200
    return;
433
200
  }
434
435
1.81G
  auto now = CoarseMonoClock::Now();
436
1.81G
  
VLOG_WITH_PREFIX4.10M
(4) << "timer tick at " << ToSeconds(now.time_since_epoch())4.10M
;
437
1.81G
  cur_time_ = now;
438
439
1.81G
  ScanIdleConnections();
440
1.81G
}
441
442
1.80G
void Reactor::ScanIdleConnections() {
443
1.80G
  DCHECK(IsCurrentThread());
444
1.80G
  if (connection_keepalive_time_ == CoarseMonoClock::Duration::zero()) {
445
18.4E
    VLOG_WITH_PREFIX(3) << "Skipping Idle connections check since connection_keepalive_time_ = 0";
446
518M
    return;
447
518M
  }
448
449
  // enforce TCP connection timeouts
450
1.28G
  auto c = server_conns_.begin();
451
1.28G
  auto c_end = server_conns_.end();
452
1.28G
  uint64_t timed_out = 0;
453
2.20G
  for (; c != c_end; ) {
454
916M
    const ConnectionPtr& conn = *c;
455
916M
    if (!conn->Idle()) {
456
18.4E
      VLOG_WITH_PREFIX(3) << "Connection " << conn->ToString() << " not idle";
457
8.08M
      ++c; // TODO: clean up this loop
458
8.08M
      continue;
459
8.08M
    }
460
461
908M
    auto last_activity_time = conn->last_activity_time();
462
908M
    auto connection_delta = cur_time_ - last_activity_time;
463
908M
    if (connection_delta > connection_keepalive_time_) {
464
155k
      conn->Shutdown(STATUS_FORMAT(
465
155k
          NetworkError, "Connection timed out after $0", ToSeconds(connection_delta)));
466
155k
      LOG_WITH_PREFIX(INFO)
467
155k
          << "DEBUG: Closing idle connection: " << conn->ToString()
468
155k
          << " - it has been idle for " << ToSeconds(connection_delta) << "s";
469
155k
      VLOG(1) << "(delta: " << ToSeconds(connection_delta)
470
11
          << ", current time: " << ToSeconds(cur_time_.time_since_epoch())
471
11
          << ", last activity time: " << ToSeconds(last_activity_time.time_since_epoch()) << ")";
472
155k
      server_conns_.erase(c++);
473
155k
      ++timed_out;
474
908M
    } else {
475
908M
      ++c;
476
908M
    }
477
908M
  }
478
479
  // TODO: above only times out on the server side.
480
  // Clients may want to set their keepalive timeout as well.
481
482
18.4E
  VLOG_IF_WITH_PREFIX(1, timed_out > 0) << "timed out " << timed_out << " TCP connections.";
483
1.28G
}
484
485
5.74G
bool Reactor::IsCurrentThread() const {
486
5.74G
  return thread_.get() == yb::Thread::current_thread();
487
5.74G
}
488
489
79.7M
bool Reactor::IsCurrentThreadOrStartedClosing() const {
490
79.7M
  return thread_.get() == yb::Thread::current_thread() ||
491
79.7M
         
HasReactorStartedClosing(state_.load(std::memory_order_acquire))0
;
492
79.7M
}
493
494
284k
void Reactor::RunThread() {
495
284k
  ThreadRestrictions::SetWaitAllowed(false);
496
284k
  ThreadRestrictions::SetIOAllowed(false);
497
18.4E
  DVLOG_WITH_PREFIX(6) << "Calling Reactor::RunThread()...";
498
284k
  loop_.run(/* flags */ 0);
499
284k
  
VLOG_WITH_PREFIX259k
(1) << "thread exiting."259k
;
500
284k
}
501
502
namespace {
503
504
3.13M
// Creates a non-blocking socket with no-delay enabled for connecting to `remote`.
// The IPv6 flag is added when the remote address is IPv6.
//
// Returns the socket, or the failing status (after logging a warning) if the socket could not
// be initialized or configured.
Result<Socket> CreateClientSocket(const Endpoint& remote) {
  const int flags = remote.address().is_v6()
      ? Socket::FLAG_NONBLOCKING | Socket::FLAG_IPV6
      : Socket::FLAG_NONBLOCKING;

  Socket socket;
  Status status = socket.Init(flags);
  if (status.ok()) {
    status = socket.SetNoDelay(true);
  }
  if (!status.ok()) {
    LOG(WARNING) << "failed to create an "
        "outbound connection because a new socket could not "
        "be created: " << status.ToString();
    return status;
  }
  return std::move(socket);
}
521
522
template <class... Args>
523
Result<std::unique_ptr<Stream>> CreateStream(
524
3.73M
    const StreamFactories& factories, const Protocol* protocol, const StreamCreateData& data) {
525
3.73M
  auto it = factories.find(protocol);
526
3.73M
  if (it == factories.end()) {
527
0
    return STATUS_FORMAT(NotFound, "Unknown protocol: $0", protocol);
528
0
  }
529
3.73M
  return it->second->Create(data);
530
3.73M
}
531
532
} // namespace
533
534
// Looks up the client connection for `conn_id`, creating and starting a new one if none exists.
//
// On success `*conn` holds the connection. A new connection is not created once the reactor has
// started closing (ServiceUnavailable is returned instead). For a new connection the socket is
// optionally bound to a local address, wrapped into a stream for the connection's protocol,
// started on this reactor's loop and registered in client_conns_. Failures to bind or to set the
// receive buffer size are only logged; other failures are returned.
// `hostname` is passed to the stream as the remote hostname. `deadline` is not used by the
// visible body. Must be called on the reactor thread.
Status Reactor::FindOrStartConnection(const ConnectionId &conn_id,
                                      const std::string& hostname,
                                      const MonoTime &deadline,
                                      ConnectionPtr* conn) {
  DCHECK(IsCurrentThread());

  const auto existing = client_conns_.find(conn_id);
  if (existing != client_conns_.end()) {
    *conn = existing->second;
    return Status::OK();
  }

  if (HasReactorStartedClosing(state_.load(std::memory_order_acquire))) {
    return ServiceUnavailableError();
  }

  // No connection to this remote. Need to create one.
  VLOG_WITH_PREFIX(2) << "FindOrStartConnection: creating new connection for "
                      << conn_id.ToString();

  // Create a new socket and start connecting to the remote.
  auto sock = VERIFY_RESULT(CreateClientSocket(conn_id.remote()));

  // Binds the socket to `local_address` (port 0) with address reuse enabled.
  // A failure is logged and otherwise ignored.
  const auto bind_socket_to = [this, &sock](const auto& local_address) {
    auto status = sock.SetReuseAddr(true);
    if (status.ok()) {
      status = sock.Bind(Endpoint(local_address, 0));
    }
    LOG_IF_WITH_PREFIX(WARNING, !status.ok()) << "Bind " << local_address << " failed: "
                                              << status;
  };

  if (messenger_->has_outbound_ip_base_.load(std::memory_order_acquire) &&
      !messenger_->test_outbound_ip_base_.is_unspecified()) {
    auto address_bytes(messenger_->test_outbound_ip_base_.to_v4().to_bytes());
    // Use different addresses for public/private endpoints.
    // Private addresses are even, and public are odd.
    // So if base address is "private" and destination address is "public" we will modify
    // originating address to be "public" also.
    address_bytes[3] |= conn_id.remote().address().to_v4().to_bytes()[3] & 1;
    boost::asio::ip::address_v4 outbound_address(address_bytes);
    bind_socket_to(outbound_address);
  } else if (FLAGS_local_ip_for_outbound_sockets.empty()) {
    // Use the messenger's outbound address of the same IP family as the remote, if one is set.
    auto outbound_address = conn_id.remote().address().is_v6()
        ? messenger_->outbound_address_v6()
        : messenger_->outbound_address_v4();
    if (!outbound_address.is_unspecified()) {
      bind_socket_to(outbound_address);
    }
  }

  if (FLAGS_socket_receive_buffer_size) {
    WARN_NOT_OK(sock.SetReceiveBufferSize(FLAGS_socket_receive_buffer_size),
                "Set receive buffer size failed: ");
  }

  auto receive_buffer_size = PatchReceiveBufferSize(VERIFY_RESULT(sock.GetReceiveBufferSize()));

  auto stream = VERIFY_RESULT(CreateStream(
      messenger_->stream_factories_, conn_id.protocol(),
      StreamCreateData {
        .remote = conn_id.remote(),
        .remote_hostname = hostname,
        .socket = &sock,
        .receive_buffer_size = receive_buffer_size,
        .mem_tracker = messenger_->connection_context_factory_->buffer_tracker(),
        .metric_entity = messenger_->metric_entity(),
      }));
  auto context = messenger_->connection_context_factory_->Create(receive_buffer_size);

  // Register the new connection in our map.
  auto new_connection = std::make_shared<Connection>(
      this,
      std::move(stream),
      ConnectionDirection::CLIENT,
      messenger()->rpc_metrics().get(),
      std::move(context));

  RETURN_NOT_OK(new_connection->Start(&loop_));

  // Insert into the client connection map to avoid duplicate connection requests.
  CHECK(client_conns_.emplace(conn_id, new_connection).second);

  conn->swap(new_connection);
  return Status::OK();
}
619
620
namespace {
621
622
54.6k
void ShutdownIfRemoteAddressIs(const ConnectionPtr& conn, const IpAddress& address) {
623
54.6k
  Endpoint peer = conn->remote();
624
625
54.6k
  if (peer.address() != address) {
626
52.3k
    return;
627
52.3k
  }
628
629
2.28k
  conn->Close();
630
2.28k
  LOG(INFO) << "Dropped connection: " << conn->ToString();
631
2.28k
}
632
633
} // namespace
634
635
0
void Reactor::DropWithRemoteAddress(const IpAddress& address) {
636
0
  DropIncomingWithRemoteAddress(address);
637
0
  DropOutgoingWithRemoteAddress(address);
638
0
}
639
640
347k
void Reactor::DropIncomingWithRemoteAddress(const IpAddress& address) {
641
347k
  DCHECK(IsCurrentThread());
642
643
347k
  
VLOG_WITH_PREFIX317
(1) << "Dropping Incoming connections from " << address317
;
644
347k
  for (auto& conn : server_conns_) {
645
2
    ShutdownIfRemoteAddressIs(conn, address);
646
2
  }
647
347k
}
648
649
347k
void Reactor::DropOutgoingWithRemoteAddress(const IpAddress& address) {
650
347k
  DCHECK(IsCurrentThread());
651
18.4E
  VLOG_WITH_PREFIX(1) << "Dropping Outgoing connections to " << address;
652
347k
  for (auto& pair : client_conns_) {
653
54.6k
    ShutdownIfRemoteAddressIs(pair.second, address);
654
54.6k
  }
655
347k
}
656
657
3.08M
// Shuts `conn` down with `conn_status` and removes it from this reactor's connection containers.
//
// A client connection is looked up under every connection index in
// [0, num_connections_to_server_) for its remote/protocol; not finding it at all is fatal
// (the known client connections are logged first). A server connection is removed from
// server_conns_ if present. The connection is finally passed to ShutdownConnection.
// Must be called on the reactor thread.
void Reactor::DestroyConnection(Connection *conn, const Status &conn_status) {
  DCHECK(IsCurrentThread());

  VLOG_WITH_PREFIX(3) << "DestroyConnection(" << conn->ToString() << ", "
                      << conn_status.ToString() << ")";

  // Hold a strong reference so the connection outlives its removal from the containers below.
  ConnectionPtr keep_alive = conn->shared_from_this();
  conn->Shutdown(conn_status);

  // Unlink connection from lists.
  if (conn->direction() == ConnectionDirection::CLIENT) {
    bool found = false;
    for (int index = 0; index < num_connections_to_server_; index++) {
      const auto entry = client_conns_.find(ConnectionId(conn->remote(), index, conn->protocol()));
      if (entry == client_conns_.end() || entry->second.get() != conn) {
        continue;
      }
      client_conns_.erase(entry);
      found = true;
    }
    if (!found) {
      LOG_WITH_PREFIX(WARNING) << "Looking for " << conn->ToString();
      for (auto& id_and_conn : client_conns_) {
        LOG_WITH_PREFIX(WARNING) << "  Client connection: " << id_and_conn.first.ToString()
                                 << ", " << id_and_conn.second->ToString();
      }
    }
    CHECK(found) << "Couldn't find connection for any index to " << conn->ToString();
  } else if (conn->direction() == ConnectionDirection::SERVER) {
    for (auto pos = server_conns_.begin(); pos != server_conns_.end(); ++pos) {
      if (pos->get() == conn) {
        server_conns_.erase(pos);
        break;
      }
    }
  }

  ShutdownConnection(keep_alive);
}
697
698
70.0M
void Reactor::ProcessOutboundQueue() {
699
70.0M
  CHECK
(processing_outbound_queue_.empty()) << yb::ToString(processing_outbound_queue_)71.4k
;
700
70.0M
  {
701
70.0M
    std::lock_guard<simple_spinlock> lock(outbound_queue_lock_);
702
70.0M
    outbound_queue_.swap(processing_outbound_queue_);
703
70.0M
  }
704
70.0M
  if (processing_outbound_queue_.empty()) {
705
0
    return;
706
0
  }
707
708
70.0M
  processing_connections_.reserve(processing_outbound_queue_.size());
709
75.3M
  for (auto& call : processing_outbound_queue_) {
710
75.3M
    auto conn = AssignOutboundCall(call);
711
75.3M
    processing_connections_.push_back(std::move(conn));
712
75.3M
  }
713
70.0M
  processing_outbound_queue_.clear();
714
715
70.0M
  std::sort(processing_connections_.begin(), processing_connections_.end());
716
70.0M
  auto new_end = std::unique(processing_connections_.begin(), processing_connections_.end());
717
70.0M
  processing_connections_.erase(new_end, processing_connections_.end());
718
74.3M
  for (auto& conn : processing_connections_) {
719
74.3M
    if (conn) {
720
74.1M
      conn->OutboundQueued();
721
74.1M
    }
722
74.3M
  }
723
70.0M
  processing_connections_.clear();
724
70.0M
}
725
726
75.4M
// Enqueues `call` for sending.
//
// If the outbound queue has already been stopped, the call is completed immediately with an
// Aborted status. Otherwise the call is appended to the queue, and when the queue was empty
// before the append, the queue-processing task is scheduled on the reactor.
void Reactor::QueueOutboundCall(OutboundCallPtr call) {
  DVLOG_WITH_PREFIX(3) << "Queueing outbound call "
                       << call->ToString() << " to remote " << call->conn_id().remote();

  bool queue_stopped;
  bool queue_was_empty = false;
  {
    std::lock_guard<simple_spinlock> lock(outbound_queue_lock_);
    queue_stopped = outbound_queue_stopped_;
    if (!queue_stopped) {
      queue_was_empty = outbound_queue_.empty();
      outbound_queue_.push_back(call);
    }
  }

  if (queue_stopped) {
    call->Transferred(AbortedError(), nullptr /* conn */);
    return;
  }

  // Only the transition from empty to non-empty needs to schedule processing.
  if (queue_was_empty) {
    auto scheduled = ScheduleReactorTask(process_outbound_queue_task_);
    LOG_IF_WITH_PREFIX(WARNING, !scheduled) << "Failed to schedule process outbound queue task";
  }
  TRACE_TO(call->trace(), "Scheduled.");
}
751
752
// ------------------------------------------------------------------------------------------------
753
// ReactorTask class members
754
// ------------------------------------------------------------------------------------------------
755
756
// Stores the source location the task was created from; ToString() reports it.
ReactorTask::ReactorTask(const SourceLocation& source_location)
    : source_location_(source_location) {}
759
760
12.1M
// Out-of-line destructor with no custom cleanup.
ReactorTask::~ReactorTask() = default;
762
763
891
void ReactorTask::Abort(const Status& abort_status) {
764
891
  if (!abort_called_.exchange(true, std::memory_order_acq_rel)) {
765
891
    DoAbort(abort_status);
766
891
  }
767
891
}
768
769
0
std::string ReactorTask::ToString() const {
770
0
  return Format("{ source: $0 }", source_location_);
771
0
}
772
773
// ------------------------------------------------------------------------------------------------
774
// DelayedTask class members
775
// ------------------------------------------------------------------------------------------------
776
777
// Creates a delayed task.
//
//   func            - callback that receives the final status (see AbortTask / TimerHandler).
//   when            - delay after which the timer started by Run() fires.
//   id              - identifier passed to Messenger::RemoveScheduledTask and shown by ToString().
//   source_location - forwarded to the ReactorTask base.
//   messenger       - may be nullptr; when set, it is told to remove the task by id once the
//                     task fires or is aborted.
DelayedTask::DelayedTask(
    StatusFunctor func, MonoDelta when, int64_t id, const SourceLocation& source_location,
    Messenger* messenger)
    : ReactorTask(source_location),
      func_(std::move(func)),
      when_(when),
      id_(id),
      messenger_(messenger) {}
785
786
10.2M
// Arms the one-shot timer for this task on the given reactor and records the task in the
// reactor's set of scheduled tasks. Must be invoked on the reactor thread.
//
// Nothing is scheduled when the reactor is not in the running state or when the task has
// already been marked as done.
void DelayedTask::Run(Reactor* reactor) {
  DCHECK(reactor_ == nullptr) << "Task has already been scheduled";
  DCHECK(reactor->IsCurrentThread());

  const auto current_state = reactor->state();
  if (current_state != ReactorState::kRunning) {
    LOG(WARNING) << "Reactor is not running (state: " << current_state
                 << "), not scheduling a delayed task.";
    return;
  }

  // Hold the lock for the whole scheduling sequence, so that an abort requested concurrently
  // cannot interleave with it. Such an abort takes effect right after this method returns.
  std::lock_guard<LockType> guard(lock_);

  VLOG_WITH_PREFIX_AND_FUNC(4) << "Done: " << done_ << ", when: " << when_;

  if (done_) {
    // The task was aborted before it got a chance to be scheduled.
    return;
  }

  // From here on the task is bound to this reactor and its event loop.
  reactor_ = reactor;
  timer_.set(reactor->loop_);

  // The timer is owned by this task and is stopped through AbortTask/Abort before the task is
  // removed from the list of scheduled tasks, so the timer may safely keep a pointer to the task.
  timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this);

  // Fire once after the configured delay; a repeat value of 0 means no repetition.
  timer_.start(when_.ToSeconds(), /* repeat = */ 0);
  reactor_->scheduled_tasks_.insert(shared_from(this));
}
821
822
10.2M
// Flips the done_ flag under the task lock and reports what the caller should do next:
//   kAlreadyDone  - the flag was already set, nothing was changed.
//   kNotScheduled - the flag was set by this call, but Run() has not bound the task to a reactor.
//   kSuccess      - the flag was set by this call and the task is bound to a reactor.
MarkAsDoneResult DelayedTask::MarkAsDone() {
  std::lock_guard<LockType> guard(lock_);
  if (done_) {
    return MarkAsDoneResult::kAlreadyDone;
  }
  done_ = true;

  // ENG-2879: a null reactor_ means that the task has not even started. AbortTask relies on this
  // result to decide whether the timer has to be stopped, which is only possible / necessary
  // once Run has been called and reactor_ is set.
  if (reactor_ == nullptr) {
    return MarkAsDoneResult::kNotScheduled;
  }
  return MarkAsDoneResult::kSuccess;
}
836
837
1
std::string DelayedTask::ToString() const {
838
1
  return Format("{ id: $0 source: $1 }", id_, source_location_);
839
1
}
840
841
1.75k
void DelayedTask::AbortTask(const Status& abort_status) {
842
1.75k
  auto mark_as_done_result = MarkAsDone();
843
844
1.75k
  
VLOG_WITH_PREFIX_AND_FUNC0
(4)
845
0
      << "Status: " << abort_status << ", " << AsString(mark_as_done_result);
846
847
1.75k
  if (mark_as_done_result == MarkAsDoneResult::kSuccess) {
848
    // Stop the libev timer. We don't need to do this in the kNotScheduled case, because the timer
849
    // has not started in that case.
850
932
    if (reactor_->IsCurrentThread()) {
851
68
      timer_.stop();
852
864
    } else {
853
      // Must call timer_.stop() on the reactor thread. Keep a refcount to prevent this DelayedTask
854
      // from being deleted. If the reactor thread has already been shut down, this will be a no-op.
855
864
      reactor_->ScheduleReactorFunctor([this, holder = shared_from(this)](Reactor* reactor) {
856
864
        timer_.stop();
857
864
      }, SOURCE_LOCATION());
858
864
    }
859
932
  }
860
1.75k
  if (mark_as_done_result != MarkAsDoneResult::kAlreadyDone) {
861
    // We need to call the callback whenever we successfully switch the done_ flag to true, whether
862
    // or not the task has been scheduled.
863
932
    func_(abort_status);
864
932
  }
865
1.75k
}
866
867
890
void DelayedTask::DoAbort(const Status& abort_status) {
868
890
  if (messenger_ != nullptr) {
869
889
    messenger_->RemoveScheduledTask(id_);
870
889
  }
871
872
890
  AbortTask(abort_status);
873
890
}
874
875
10.2M
// Timer callback registered in Run(). Marks the task as done, removes it from the reactor's
// scheduled tasks (and from the messenger, when present), and finally invokes the callback:
// with an Aborted status if the event loop reported an error, with OK otherwise.
void DelayedTask::TimerHandler(ev::timer& watcher, int revents) {
  DCHECK(reactor_->IsCurrentThread());

  const auto result = MarkAsDone();
  if (result != MarkAsDoneResult::kSuccess) {
    DCHECK_EQ(MarkAsDoneResult::kAlreadyDone, result)
        << "Can't get kNotScheduled here, because the timer handler is already being called";
    return;
  }

  // Keep the task alive until func_ has been called: erasing it from scheduled_tasks_ below
  // could otherwise drop the last reference.
  auto self = shared_from(this);

  reactor_->scheduled_tasks_.erase(self);
  if (messenger_ != nullptr) {
    messenger_->RemoveScheduledTask(id_);
  }

  const bool timer_failed = (revents & EV_ERROR) != 0;
  if (!timer_failed) {
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Execute";
    func_(Status::OK());
    return;
  }

  std::string error_message = "Delayed task got an error in its timer handler";
  LOG(WARNING) << error_message;
  VLOG_WITH_PREFIX_AND_FUNC(4) << "Abort";
  func_(STATUS(Aborted, error_message));
}
903
904
// ------------------------------------------------------------------------------------------------
905
// More Reactor class members
906
// ------------------------------------------------------------------------------------------------
907
908
void Reactor::RegisterInboundSocket(
909
    Socket *socket, size_t receive_buffer_size, const Endpoint& remote,
910
628k
    const ConnectionContextFactoryPtr& factory) {
911
628k
  
VLOG_WITH_PREFIX0
(3) << "New inbound connection to " << remote0
;
912
628k
  receive_buffer_size = PatchReceiveBufferSize(receive_buffer_size);
913
914
628k
  auto stream = CreateStream(
915
628k
      messenger_->stream_factories_, messenger_->listen_protocol_,
916
628k
      StreamCreateData {
917
628k
        .remote = remote,
918
628k
        .remote_hostname = std::string(),
919
628k
        .socket = socket,
920
628k
        .receive_buffer_size = receive_buffer_size,
921
628k
        .mem_tracker = factory->buffer_tracker(),
922
628k
        .metric_entity = messenger_->metric_entity()
923
628k
      });
924
628k
  if (!stream.ok()) {
925
0
    LOG_WITH_PREFIX(DFATAL) << "Failed to create stream for " << remote << ": " << stream.status();
926
0
    return;
927
0
  }
928
628k
  auto conn = std::make_shared<Connection>(this,
929
628k
                                           std::move(*stream),
930
628k
                                           ConnectionDirection::SERVER,
931
628k
                                           messenger()->rpc_metrics().get(),
932
628k
                                           factory->Create(receive_buffer_size));
933
628k
  ScheduleReactorFunctor([conn = std::move(conn)](Reactor* reactor) {
934
622k
    reactor->RegisterConnection(conn);
935
628k
  }, SOURCE_LOCATION());
936
628k
}
937
938
160M
// Appends `task` to the list of pending tasks and wakes the reactor thread if the list was empty.
//
// Returns false without queueing the task when the reactor refuses new work:
//   - with schedule_even_closing set, only a reactor in the kClosed state refuses;
//   - otherwise, a reactor that has started closing refuses.
// Returns true when the task has been queued.
bool Reactor::ScheduleReactorTask(ReactorTaskPtr task, bool schedule_even_closing) {
  bool queue_was_empty = false;
  {
    // Even though state_ is atomic, the lock is still required so that state_ and
    // pending_tasks_ are observed and modified consistently.
    std::lock_guard<simple_spinlock> guard(pending_tasks_mtx_);

    const auto current_state = state_.load(std::memory_order_acquire);
    bool rejected;
    if (schedule_even_closing) {
      rejected = current_state == ReactorState::kClosed;
    } else {
      rejected = HasReactorStartedClosing(current_state);
    }
    if (rejected) {
      return false;
    }

    queue_was_empty = pending_tasks_.empty();
    pending_tasks_.emplace_back(std::move(task));
  }

  // The wake-up happens after the lock is released, and only on the empty -> non-empty
  // transition.
  if (queue_was_empty) {
    WakeThread();
  }
  return true;
}
959
960
157M
bool Reactor::DrainTaskQueueAndCheckIfClosing() {
961
157M
  CHECK(async_handler_tasks_.empty());
962
963
157M
  std::lock_guard<simple_spinlock> lock(pending_tasks_mtx_);
964
157M
  async_handler_tasks_.swap(pending_tasks_);
965
157M
  return HasReactorStartedClosing(state_.load(std::memory_order_acquire));
966
157M
}
967
968
// Task to call an arbitrary function within the reactor thread.
969
template<class F>
970
class RunFunctionTask : public ReactorTask {
971
 public:
972
  RunFunctionTask(const F& f, const SourceLocation& source_location)
973
23
      : ReactorTask(source_location), function_(f) {}
reactor.cc:yb::rpc::RunFunctionTask<yb::rpc::Reactor::GetMetrics(yb::rpc::ReactorMetrics*)::$_1>::RunFunctionTask(yb::rpc::Reactor::GetMetrics(yb::rpc::ReactorMetrics*)::$_1 const&, yb::SourceLocation const&)
Line
Count
Source
973
8
      : ReactorTask(source_location), function_(f) {}
reactor.cc:yb::rpc::RunFunctionTask<yb::rpc::Reactor::DumpRunningRpcs(yb::rpc::DumpRunningRpcsRequestPB const&, yb::rpc::DumpRunningRpcsResponsePB*)::$_3>::RunFunctionTask(yb::rpc::Reactor::DumpRunningRpcs(yb::rpc::DumpRunningRpcsRequestPB const&, yb::rpc::DumpRunningRpcsResponsePB*)::$_3 const&, yb::SourceLocation const&)
Line
Count
Source
973
15
      : ReactorTask(source_location), function_(f) {}
974
975
23
  void Run(Reactor *reactor) override {
976
23
    status_ = function_(reactor);
977
23
    latch_.CountDown();
978
23
  }
reactor.cc:yb::rpc::RunFunctionTask<yb::rpc::Reactor::GetMetrics(yb::rpc::ReactorMetrics*)::$_1>::Run(yb::rpc::Reactor*)
Line
Count
Source
975
8
  void Run(Reactor *reactor) override {
976
8
    status_ = function_(reactor);
977
8
    latch_.CountDown();
978
8
  }
reactor.cc:yb::rpc::RunFunctionTask<yb::rpc::Reactor::DumpRunningRpcs(yb::rpc::DumpRunningRpcsRequestPB const&, yb::rpc::DumpRunningRpcsResponsePB*)::$_3>::Run(yb::rpc::Reactor*)
Line
Count
Source
975
15
  void Run(Reactor *reactor) override {
976
15
    status_ = function_(reactor);
977
15
    latch_.CountDown();
978
15
  }
979
980
  // Wait until the function has completed, and return the Status returned by the function.
981
23
  Status Wait() {
982
23
    latch_.Wait();
983
23
    return status_;
984
23
  }
reactor.cc:yb::rpc::RunFunctionTask<yb::rpc::Reactor::GetMetrics(yb::rpc::ReactorMetrics*)::$_1>::Wait()
Line
Count
Source
981
8
  Status Wait() {
982
8
    latch_.Wait();
983
8
    return status_;
984
8
  }
reactor.cc:yb::rpc::RunFunctionTask<yb::rpc::Reactor::DumpRunningRpcs(yb::rpc::DumpRunningRpcsRequestPB const&, yb::rpc::DumpRunningRpcsResponsePB*)::$_3>::Wait()
Line
Count
Source
981
15
  Status Wait() {
982
15
    latch_.Wait();
983
15
    return status_;
984
15
  }
985
986
 private:
987
0
  void DoAbort(const Status &status) override {
988
0
    status_ = status;
989
0
    latch_.CountDown();
990
0
  }
Unexecuted instantiation: reactor.cc:yb::rpc::RunFunctionTask<yb::rpc::Reactor::GetMetrics(yb::rpc::ReactorMetrics*)::$_1>::DoAbort(yb::Status const&)
Unexecuted instantiation: reactor.cc:yb::rpc::RunFunctionTask<yb::rpc::Reactor::DumpRunningRpcs(yb::rpc::DumpRunningRpcsRequestPB const&, yb::rpc::DumpRunningRpcsResponsePB*)::$_3>::DoAbort(yb::Status const&)
991
992
  F function_;
993
  Status status_;
994
  CountDownLatch latch_{1};
995
};
996
997
template<class F>
998
23
Status Reactor::RunOnReactorThread(const F& f, const SourceLocation& source_location) {
999
23
  auto task = std::make_shared<RunFunctionTask<F>>(f, source_location);
1000
23
  if (!ScheduleReactorTask(task)) {
1001
0
    return ServiceUnavailableError();
1002
0
  }
1003
23
  return task->Wait();
1004
23
}
reactor.cc:yb::Status yb::rpc::Reactor::RunOnReactorThread<yb::rpc::Reactor::GetMetrics(yb::rpc::ReactorMetrics*)::$_1>(yb::rpc::Reactor::GetMetrics(yb::rpc::ReactorMetrics*)::$_1 const&, yb::SourceLocation const&)
Line
Count
Source
998
8
Status Reactor::RunOnReactorThread(const F& f, const SourceLocation& source_location) {
999
8
  auto task = std::make_shared<RunFunctionTask<F>>(f, source_location);
1000
8
  if (!ScheduleReactorTask(task)) {
1001
0
    return ServiceUnavailableError();
1002
0
  }
1003
8
  return task->Wait();
1004
8
}
reactor.cc:yb::Status yb::rpc::Reactor::RunOnReactorThread<yb::rpc::Reactor::DumpRunningRpcs(yb::rpc::DumpRunningRpcsRequestPB const&, yb::rpc::DumpRunningRpcsResponsePB*)::$_3>(yb::rpc::Reactor::DumpRunningRpcs(yb::rpc::DumpRunningRpcsRequestPB const&, yb::rpc::DumpRunningRpcsResponsePB*)::$_3 const&, yb::SourceLocation const&)
Line
Count
Source
998
15
Status Reactor::RunOnReactorThread(const F& f, const SourceLocation& source_location) {
999
15
  auto task = std::make_shared<RunFunctionTask<F>>(f, source_location);
1000
15
  if (!ScheduleReactorTask(task)) {
1001
0
    return ServiceUnavailableError();
1002
0
  }
1003
15
  return task->Wait();
1004
15
}
1005
1006
}  // namespace rpc
1007
}  // namespace yb