YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/reactor.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/reactor.h"
34
35
#include <netinet/in.h>
36
#include <stdlib.h>
37
#include <sys/types.h>
38
39
#include <functional>
40
#include <map>
41
#include <mutex>
42
#include <set>
43
#include <string>
44
45
#include <boost/preprocessor/cat.hpp>
46
#include <boost/preprocessor/stringize.hpp>
47
#include <ev++.h>
48
#include <glog/logging.h>
49
50
#include "yb/gutil/ref_counted.h"
51
#include "yb/gutil/stringprintf.h"
52
53
#include "yb/rpc/connection.h"
54
#include "yb/rpc/connection_context.h"
55
#include "yb/rpc/messenger.h"
56
#include "yb/rpc/rpc_controller.h"
57
#include "yb/rpc/rpc_introspection.pb.h"
58
#include "yb/rpc/server_event.h"
59
60
#include "yb/util/atomic.h"
61
#include "yb/util/countdown_latch.h"
62
#include "yb/util/errno.h"
63
#include "yb/util/format.h"
64
#include "yb/util/logging.h"
65
#include "yb/util/memory/memory.h"
66
#include "yb/util/metric_entity.h"
67
#include "yb/util/monotime.h"
68
#include "yb/util/net/socket.h"
69
#include "yb/util/scope_exit.h"
70
#include "yb/util/size_literals.h"
71
#include "yb/util/status.h"
72
#include "yb/util/status_format.h"
73
#include "yb/util/status_log.h"
74
#include "yb/util/thread.h"
75
#include "yb/util/thread_restrictions.h"
76
#include "yb/util/trace.h"
77
78
using namespace std::literals;
79
80
DEFINE_uint64(rpc_read_buffer_size, 0,
81
              "RPC connection read buffer size. 0 to auto detect.");
82
DECLARE_string(local_ip_for_outbound_sockets);
83
DECLARE_int32(num_connections_to_server);
84
DECLARE_int32(socket_receive_buffer_size);
85
86
namespace yb {
87
namespace rpc {
88
89
namespace {
90
91
static const char* kShutdownMessage = "Shutdown connection";
92
93
13.5k
const Status& AbortedError() {
94
13.5k
  static Status result = STATUS(Aborted, kShutdownMessage, "" /* msg2 */, Errno(ESHUTDOWN));
95
13.5k
  return result;
96
13.5k
}
97
98
618k
const Status& ServiceUnavailableError() {
99
618k
  static Status result = STATUS(
100
618k
      ServiceUnavailable, kShutdownMessage, "" /* msg2 */, Errno(ESHUTDOWN));
101
618k
  return result;
102
618k
}
103
104
// Callback for libev fatal errors (eg running out of file descriptors).
105
// Unfortunately libev doesn't plumb these back through to the caller, but
106
// instead just expects the callback to abort.
107
//
108
// This implementation is slightly preferable to the built-in one since
109
// it uses a FATAL log message instead of printing to stderr, which might
110
// not end up anywhere useful in a daemonized context.
111
92
void LibevSysErr(const char* msg) noexcept {
112
92
  PLOG(FATAL) << "LibEV fatal error: " << msg;
113
92
}
114
115
12.8k
void DoInitLibEv() {
116
12.8k
  ev::set_syserr_cb(LibevSysErr);
117
12.8k
}
118
119
85.4M
bool HasReactorStartedClosing(ReactorState state) {
120
85.4M
  return state == ReactorState::kClosing || state == ReactorState::kClosed;
121
85.4M
}
122
123
928k
size_t PatchReceiveBufferSize(size_t receive_buffer_size) {
124
928k
  return std::max<size_t>(
125
928k
      64_KB, FLAGS_rpc_read_buffer_size ? FLAGS_rpc_read_buffer_size : receive_buffer_size);
126
928k
}
127
128
} // anonymous namespace
129
130
// ------------------------------------------------------------------------------------------------
131
// Reactor class members
132
// ------------------------------------------------------------------------------------------------
133
134
Reactor::Reactor(Messenger* messenger,
135
                 int index,
136
                 const MessengerBuilder &bld)
137
    : messenger_(messenger),
138
      name_(StringPrintf("%s_R%03d", messenger->name().c_str(), index)),
139
      log_prefix_(name_ + ": "),
140
      loop_(kDefaultLibEvFlags),
141
      cur_time_(CoarseMonoClock::Now()),
142
      last_unused_tcp_scan_(cur_time_),
143
      connection_keepalive_time_(bld.connection_keepalive_time()),
144
      coarse_timer_granularity_(bld.coarse_timer_granularity()),
145
188k
      num_connections_to_server_(bld.num_connections_to_server()) {
146
188k
  static std::once_flag libev_once;
147
188k
  std::call_once(libev_once, DoInitLibEv);
148
149
294
  VLOG_WITH_PREFIX(1) << "Create reactor with keep alive_time: "
150
294
                      << yb::ToString(connection_keepalive_time_)
151
294
                      << ", coarse timer granularity: " << yb::ToString(coarse_timer_granularity_);
152
153
188k
  process_outbound_queue_task_ =
154
188k
      MakeFunctorReactorTask(std::bind(&Reactor::ProcessOutboundQueue, this), SOURCE_LOCATION());
155
188k
}
156
157
13.8k
Reactor::~Reactor() {
158
8
  LOG_IF_WITH_PREFIX(DFATAL, !pending_tasks_.empty())
159
8
      << "Not empty pending tasks when destroyed reactor: " << yb::ToString(pending_tasks_);
160
13.8k
}
161
162
185k
Status Reactor::Init() {
163
1
  DCHECK(thread_.get() == nullptr) << "Already started";
164
0
  DVLOG_WITH_PREFIX(6) << "Called Reactor::Init()";
165
  // Register to get async notifications in our epoll loop.
166
185k
  async_.set(loop_);
167
185k
  async_.set<Reactor, &Reactor::AsyncHandler>(this);
168
185k
  async_.start();
169
170
  // Register the timer watcher.
171
  // The timer is used for closing old TCP connections and applying
172
  // backpressure.
173
185k
  timer_.set(loop_);
174
185k
  timer_.set<Reactor, &Reactor::TimerHandler>(this);
175
185k
  timer_.start(ToSeconds(coarse_timer_granularity_),
176
185k
               ToSeconds(coarse_timer_granularity_));
177
178
  // Create Reactor thread.
179
185k
  const std::string group_name = messenger_->name() + "_reactor";
180
185k
  return yb::Thread::Create(group_name, group_name, &Reactor::RunThread, this, &thread_);
181
185k
}
182
183
13.6k
void Reactor::Shutdown() {
184
13.6k
  ReactorState old_state = ReactorState::kRunning;
185
27.3k
  do {
186
27.3k
    if (state_.compare_exchange_weak(old_state,
187
27.3k
                                     ReactorState::kClosing,
188
13.6k
                                     std::memory_order_acq_rel)) {
189
9
      VLOG_WITH_PREFIX(1) << "shutting down Reactor thread.";
190
13.6k
      WakeThread();
191
13.6k
    }
192
27.3k
  } while (!HasReactorStartedClosing(old_state));
193
194
  // Another thread already switched the state to closing before us.
195
13.6k
}
196
197
618k
void Reactor::ShutdownConnection(const ConnectionPtr& conn) {
198
618k
  DCHECK(IsCurrentThread());
199
200
1.33k
  VLOG_WITH_PREFIX(1) << "shutting down " << conn->ToString();
201
618k
  conn->Shutdown(ServiceUnavailableError());
202
618k
  if (!conn->context().Idle()) {
203
0
    VLOG_WITH_PREFIX(1) << "connection is not idle: " << conn->ToString();
204
337
    std::weak_ptr<Connection> weak_conn(conn);
205
166
    conn->context().ListenIdle([this, weak_conn]() {
206
166
      DCHECK(IsCurrentThreadOrStartedClosing());
207
166
      auto conn = weak_conn.lock();
208
166
      if (conn) {
209
0
        VLOG_WITH_PREFIX(1) << "connection became idle " << conn->ToString();
210
        // The access to waiting_conns_ is safe here, because this code can only be called on the
211
        // reactor thread or when holding final_abort_mutex_ during shutdown.
212
166
        waiting_conns_.erase(conn);
213
166
      }
214
166
    });
215
337
    waiting_conns_.insert(conn);
216
618k
  } else {
217
4.58k
    VLOG_WITH_PREFIX(1) << "connection is idle: " << conn->ToString();
218
618k
  }
219
618k
}
220
221
13.3k
void Reactor::ShutdownInternal() {
222
13.3k
  DCHECK(IsCurrentThread());
223
224
13.3k
  stopping_ = true;
225
13.3k
  stop_start_time_ = CoarseMonoClock::Now();
226
227
  // Tear down any outbound TCP connections.
228
18.4E
  VLOG_WITH_PREFIX(1) << "tearing down outbound TCP connections...";
229
13.3k
  decltype(client_conns_) client_conns = std::move(client_conns_);
230
21.0k
  for (auto& pair : client_conns) {
231
21.0k
    ShutdownConnection(pair.second);
232
21.0k
  }
233
13.3k
  client_conns.clear();
234
235
  // Tear down any inbound TCP connections.
236
18.4E
  VLOG_WITH_PREFIX(1) << "tearing down inbound TCP connections...";
237
736
  for (const ConnectionPtr& conn : server_conns_) {
238
736
    ShutdownConnection(conn);
239
736
  }
240
13.3k
  server_conns_.clear();
241
242
  // Abort any scheduled tasks.
243
  //
244
  // These won't be found in the Reactor's list of pending tasks
245
  // because they've been "run" (that is, they've been scheduled).
246
18.4E
  VLOG_WITH_PREFIX(1) << "aborting scheduled tasks";
247
13.3k
  Status aborted = AbortedError();
248
638
  for (const auto& task : scheduled_tasks_) {
249
638
    task->Abort(aborted);
250
638
  }
251
13.3k
  scheduled_tasks_.clear();
252
253
  // async_handler_tasks_ are the tasks added by ScheduleReactorTask.
254
42
  VLOG_WITH_PREFIX(1) << "aborting async handler tasks";
255
0
  for (const auto& task : async_handler_tasks_) {
256
0
    task->Abort(aborted);
257
0
  }
258
259
18.4E
  VLOG_WITH_PREFIX(1) << "aborting outbound calls";
260
112
  CHECK(processing_outbound_queue_.empty()) << yb::ToString(processing_outbound_queue_);
261
13.3k
  {
262
13.3k
    std::lock_guard<simple_spinlock> lock(outbound_queue_lock_);
263
13.3k
    outbound_queue_stopped_ = true;
264
13.3k
    outbound_queue_.swap(processing_outbound_queue_);
265
13.3k
  }
266
0
  for (auto& call : processing_outbound_queue_) {
267
0
    call->Transferred(aborted, nullptr);
268
0
  }
269
13.3k
  processing_outbound_queue_.clear();
270
13.3k
}
271
272
8
Status Reactor::GetMetrics(ReactorMetrics *metrics) {
273
8
  return RunOnReactorThread([metrics](Reactor* reactor) {
274
8
    metrics->num_client_connections = reactor->client_conns_.size();
275
8
    metrics->num_server_connections = reactor->server_conns_.size();
276
8
    return Status::OK();
277
8
  }, SOURCE_LOCATION());
278
8
}
279
280
13.6k
void Reactor::Join() {
281
13.6k
  auto join_result = ThreadJoiner(thread_.get()).give_up_after(30s).Join();
282
13.6k
  if (join_result.ok()) {
283
13.6k
    return;
284
13.6k
  }
285
18.4E
  if (join_result.IsInvalidArgument()) {
286
0
    LOG_WITH_PREFIX(WARNING) << join_result;
287
0
    return;
288
0
  }
289
18.4E
  LOG_WITH_PREFIX(DFATAL) << "Failed to join Reactor " << thread_->ToString() << ": "
290
18.4E
                          << join_result;
291
  // Fallback to endless join in release mode.
292
18.4E
  thread_->Join();
293
18.4E
}
294
295
void Reactor::QueueEventOnAllConnections(
296
108k
    ServerEventListPtr server_event, const SourceLocation& source_location) {
297
107k
  ScheduleReactorFunctor([server_event = std::move(server_event)](Reactor* reactor) {
298
19.3k
    for (const ConnectionPtr& conn : reactor->server_conns_) {
299
19.3k
      conn->QueueOutboundData(server_event);
300
19.3k
    }
301
107k
  }, source_location);
302
108k
}
303
304
Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
305
15
                                DumpRunningRpcsResponsePB* resp) {
306
15
  return RunOnReactorThread([&req, resp](Reactor* reactor) -> Status {
307
11
    for (const ConnectionPtr& conn : reactor->server_conns_) {
308
11
      RETURN_NOT_OK(conn->DumpPB(req, resp->add_inbound_connections()));
309
11
    }
310
15
    for (const auto& entry : reactor->client_conns_) {
311
10
      Connection* conn = entry.second.get();
312
10
      RETURN_NOT_OK(conn->DumpPB(req, resp->add_outbound_connections()));
313
10
    }
314
15
    return Status::OK();
315
15
  }, SOURCE_LOCATION());
316
15
}
317
318
41.8M
void Reactor::WakeThread() {
319
41.8M
  async_.send();
320
41.8M
}
321
322
13.5k
void Reactor::CheckReadyToStop() {
323
13.5k
  DCHECK(IsCurrentThread());
324
325
291
  VLOG_WITH_PREFIX(4) << "Check ready to stop: " << thread_->ToString() << ", "
326
291
          << "waiting connections: " << yb::ToString(waiting_conns_);
327
328
1
  if (VLOG_IS_ON(4)) {
329
0
    for (const auto& conn : waiting_conns_) {
330
0
      VLOG_WITH_PREFIX(4) << "Connection: " << conn->ToString() << ", idle=" << conn->Idle()
331
0
                          << ", why: " << conn->ReasonNotIdle();
332
0
    }
333
1
  }
334
335
13.5k
  if (waiting_conns_.empty()) {
336
18.4E
    VLOG_WITH_PREFIX(4) << "Reactor ready to stop, breaking loop: " << this;
337
338
18.4E
    VLOG_WITH_PREFIX(2) << "Marking reactor as closed: " << thread_.get()->ToString();
339
13.1k
    ReactorTasks final_tasks;
340
13.1k
    {
341
13.1k
      std::lock_guard<simple_spinlock> lock(pending_tasks_mtx_);
342
13.1k
      state_.store(ReactorState::kClosed, std::memory_order_release);
343
13.1k
      final_tasks.swap(pending_tasks_);
344
13.1k
    }
345
18.4E
    VLOG_WITH_PREFIX(2) << "Running final pending task aborts: " << thread_.get()->ToString();;
346
0
    for (auto task : final_tasks) {
347
0
      task->Abort(ServiceUnavailableError());
348
0
    }
349
18.4E
    VLOG_WITH_PREFIX(2) << "Breaking reactor loop: " << thread_.get()->ToString();;
350
13.1k
    loop_.break_loop(); // break the epoll loop and terminate the thread
351
13.1k
  }
352
13.5k
}
353
354
// Handle async events.  These events are sent to the reactor by other threads that want to bring
355
// something to our attention, like the fact that we're shutting down, or the fact that there is a
356
// new outbound Transfer ready to send.
357
41.9M
void Reactor::AsyncHandler(ev::async &watcher, int revents) {
358
9.28k
  VLOG_WITH_PREFIX_AND_FUNC(4) << "Events: " << revents;
359
360
41.9M
  DCHECK(IsCurrentThread());
361
362
41.8M
  auto se = ScopeExit([this] {
363
41.8M
    async_handler_tasks_.clear();
364
41.8M
  });
365
366
41.9M
  if (PREDICT_FALSE(DrainTaskQueueAndCheckIfClosing())) {
367
13.5k
    ShutdownInternal();
368
13.5k
    CheckReadyToStop();
369
13.5k
    return;
370
13.5k
  }
371
372
42.9M
  for (const auto &task : async_handler_tasks_) {
373
42.9M
    task->Run(this);
374
42.9M
  }
375
41.8M
}
376
377
314k
void Reactor::RegisterConnection(const ConnectionPtr& conn) {
378
314k
  DCHECK(IsCurrentThread());
379
380
314k
  Status s = conn->Start(&loop_);
381
314k
  if (s.ok()) {
382
314k
    server_conns_.push_back(conn);
383
18.4E
  } else {
384
18.4E
    LOG_WITH_PREFIX(WARNING) << "Failed to start connection: " << conn->ToString() << ": " << s;
385
18.4E
  }
386
314k
}
387
388
19.9M
ConnectionPtr Reactor::AssignOutboundCall(const OutboundCallPtr& call) {
389
19.9M
  DCHECK(IsCurrentThread());
390
19.9M
  ConnectionPtr conn;
391
392
  // TODO: Move call deadline timeout computation into OutboundCall constructor.
393
19.9M
  const MonoDelta &timeout = call->controller()->timeout();
394
19.9M
  MonoTime deadline;
395
19.9M
  if (!timeout.Initialized()) {
396
9.31k
    LOG_WITH_PREFIX(WARNING) << "Client call " << call->remote_method().ToString()
397
9.31k
                 << " has no timeout set for connection id: "
398
9.31k
                 << call->conn_id().ToString();
399
9.31k
    deadline = MonoTime::Max();
400
19.9M
  } else {
401
19.9M
    deadline = MonoTime::Now();
402
19.9M
    deadline.AddDelta(timeout);
403
19.9M
  }
404
405
19.9M
  Status s = FindOrStartConnection(call->conn_id(), call->hostname(), deadline, &conn);
406
19.9M
  if (PREDICT_FALSE(!s.ok())) {
407
3.30k
    call->SetFailed(s);
408
3.30k
    return ConnectionPtr();
409
3.30k
  }
410
411
19.9M
  conn->QueueOutboundCall(call);
412
19.9M
  return conn;
413
19.9M
}
414
415
//
416
// Handles timer events.  The periodic timer:
417
//
418
// 1. updates Reactor::cur_time_
419
// 2. every tcp_conn_timeo_ seconds, close down connections older than
420
//    tcp_conn_timeo_ seconds.
421
//
422
57.1M
void Reactor::TimerHandler(ev::timer &watcher, int revents) {
423
57.1M
  DCHECK(IsCurrentThread());
424
425
57.1M
  if (EV_ERROR & revents) {
426
0
    LOG_WITH_PREFIX(WARNING) << "Reactor got an error in the timer handler.";
427
0
    return;
428
0
  }
429
430
57.1M
  if (stopping_) {
431
91
    CheckReadyToStop();
432
91
    return;
433
91
  }
434
435
57.1M
  auto now = CoarseMonoClock::Now();
436
1.00M
  VLOG_WITH_PREFIX(4) << "timer tick at " << ToSeconds(now.time_since_epoch());
437
57.1M
  cur_time_ = now;
438
439
57.1M
  ScanIdleConnections();
440
57.1M
}
441
442
56.2M
void Reactor::ScanIdleConnections() {
443
56.2M
  DCHECK(IsCurrentThread());
444
56.2M
  if (connection_keepalive_time_ == CoarseMonoClock::Duration::zero()) {
445
18.4E
    VLOG_WITH_PREFIX(3) << "Skipping Idle connections check since connection_keepalive_time_ = 0";
446
10.2M
    return;
447
10.2M
  }
448
449
  // enforce TCP connection timeouts
450
46.0M
  auto c = server_conns_.begin();
451
46.0M
  auto c_end = server_conns_.end();
452
46.0M
  uint64_t timed_out = 0;
453
105M
  for (; c != c_end; ) {
454
59.8M
    const ConnectionPtr& conn = *c;
455
59.8M
    if (!conn->Idle()) {
456
18.4E
      VLOG_WITH_PREFIX(3) << "Connection " << conn->ToString() << " not idle";
457
596k
      ++c; // TODO: clean up this loop
458
596k
      continue;
459
596k
    }
460
461
59.2M
    auto last_activity_time = conn->last_activity_time();
462
59.2M
    auto connection_delta = cur_time_ - last_activity_time;
463
59.2M
    if (connection_delta > connection_keepalive_time_) {
464
5.34k
      conn->Shutdown(STATUS_FORMAT(
465
5.34k
          NetworkError, "Connection timed out after $0", ToSeconds(connection_delta)));
466
5.34k
      LOG_WITH_PREFIX(INFO)
467
5.34k
          << "DEBUG: Closing idle connection: " << conn->ToString()
468
5.34k
          << " - it has been idle for " << ToSeconds(connection_delta) << "s";
469
18.4E
      VLOG(1) << "(delta: " << ToSeconds(connection_delta)
470
18.4E
          << ", current time: " << ToSeconds(cur_time_.time_since_epoch())
471
18.4E
          << ", last activity time: " << ToSeconds(last_activity_time.time_since_epoch()) << ")";
472
5.34k
      server_conns_.erase(c++);
473
5.34k
      ++timed_out;
474
59.2M
    } else {
475
59.2M
      ++c;
476
59.2M
    }
477
59.2M
  }
478
479
  // TODO: above only times out on the server side.
480
  // Clients may want to set their keepalive timeout as well.
481
482
18.4E
  VLOG_IF_WITH_PREFIX(1, timed_out > 0) << "timed out " << timed_out << " TCP connections.";
483
46.0M
}
484
485
480M
bool Reactor::IsCurrentThread() const {
486
480M
  return thread_.get() == yb::Thread::current_thread();
487
480M
}
488
489
24.1M
bool Reactor::IsCurrentThreadOrStartedClosing() const {
490
24.1M
  return thread_.get() == yb::Thread::current_thread() ||
491
0
         HasReactorStartedClosing(state_.load(std::memory_order_acquire));
492
24.1M
}
493
494
185k
void Reactor::RunThread() {
495
185k
  ThreadRestrictions::SetWaitAllowed(false);
496
185k
  ThreadRestrictions::SetIOAllowed(false);
497
14
  DVLOG_WITH_PREFIX(6) << "Calling Reactor::RunThread()...";
498
185k
  loop_.run(/* flags */ 0);
499
172k
  VLOG_WITH_PREFIX(1) << "thread exiting.";
500
185k
}
501
502
namespace {
503
504
634k
Result<Socket> CreateClientSocket(const Endpoint& remote) {
505
634k
  int flags = Socket::FLAG_NONBLOCKING;
506
634k
  if (remote.address().is_v6()) {
507
1
    flags |= Socket::FLAG_IPV6;
508
1
  }
509
634k
  Socket socket;
510
634k
  Status status = socket.Init(flags);
511
634k
  if (status.ok()) {
512
627k
    status = socket.SetNoDelay(true);
513
627k
  }
514
10.1k
  LOG_IF(WARNING, !status.ok()) << "failed to create an "
515
10.1k
      "outbound connection because a new socket could not "
516
10.1k
      "be created: " << status.ToString();
517
634k
  if (!status.ok())
518
3.07k
    return status;
519
631k
  return std::move(socket);
520
631k
}
521
522
template <class... Args>
523
Result<std::unique_ptr<Stream>> CreateStream(
524
934k
    const StreamFactories& factories, const Protocol* protocol, const StreamCreateData& data) {
525
934k
  auto it = factories.find(protocol);
526
934k
  if (it == factories.end()) {
527
0
    return STATUS_FORMAT(NotFound, "Unknown protocol: $0", protocol);
528
0
  }
529
934k
  return it->second->Create(data);
530
934k
}
531
532
} // namespace
533
534
Status Reactor::FindOrStartConnection(const ConnectionId &conn_id,
535
                                      const std::string& hostname,
536
                                      const MonoTime &deadline,
537
19.8M
                                      ConnectionPtr* conn) {
538
19.8M
  DCHECK(IsCurrentThread());
539
19.8M
  auto c = client_conns_.find(conn_id);
540
19.8M
  if (c != client_conns_.end()) {
541
19.2M
    *conn = (*c).second;
542
19.2M
    return Status::OK();
543
19.2M
  }
544
545
618k
  if (HasReactorStartedClosing(state_.load(std::memory_order_acquire))) {
546
0
    return ServiceUnavailableError();
547
0
  }
548
549
  // No connection to this remote. Need to create one.
550
18.4E
  VLOG_WITH_PREFIX(2) << "FindOrStartConnection: creating new connection for "
551
18.4E
                      << conn_id.ToString();
552
553
  // Create a new socket and start connecting to the remote.
554
615k
  auto sock = VERIFY_RESULT(CreateClientSocket(conn_id.remote()));
555
615k
  if (messenger_->has_outbound_ip_base_.load(std::memory_order_acquire) &&
556
22.1k
      !messenger_->test_outbound_ip_base_.is_unspecified()) {
557
22.1k
    auto address_bytes(messenger_->test_outbound_ip_base_.to_v4().to_bytes());
558
    // Use different addresses for public/private endpoints.
559
    // Private addresses are even, and public are odd.
560
    // So if base address is "private" and destination address is "public" we will modify
561
    // originating address to be "public" also.
562
22.1k
    address_bytes[3] |= conn_id.remote().address().to_v4().to_bytes()[3] & 1;
563
22.1k
    boost::asio::ip::address_v4 outbound_address(address_bytes);
564
22.1k
    auto status = sock.SetReuseAddr(true);
565
22.1k
    if (status.ok()) {
566
22.1k
      status = sock.Bind(Endpoint(outbound_address, 0));
567
22.1k
    }
568
18.4E
    LOG_IF_WITH_PREFIX(WARNING, !status.ok()) << "Bind " << outbound_address << " failed: "
569
18.4E
                                              << status;
570
593k
  } else if (FLAGS_local_ip_for_outbound_sockets.empty()) {
571
134k
    auto outbound_address = conn_id.remote().address().is_v6()
572
1
        ? messenger_->outbound_address_v6()
573
134k
        : messenger_->outbound_address_v4();
574
134k
    if (!outbound_address.is_unspecified()) {
575
88.2k
      auto status = sock.SetReuseAddr(true);
576
89.4k
      if (status.ok()) {
577
89.4k
        status = sock.Bind(Endpoint(outbound_address, 0));
578
89.4k
      }
579
18.4E
      LOG_IF_WITH_PREFIX(WARNING, !status.ok()) << "Bind " << outbound_address << " failed: "
580
18.4E
                                                << status;
581
88.2k
    }
582
134k
  }
583
584
615k
  if (FLAGS_socket_receive_buffer_size) {
585
0
    WARN_NOT_OK(sock.SetReceiveBufferSize(FLAGS_socket_receive_buffer_size),
586
0
                "Set receive buffer size failed: ");
587
0
  }
588
589
615k
  auto receive_buffer_size = PatchReceiveBufferSize(VERIFY_RESULT(sock.GetReceiveBufferSize()));
590
591
615k
  auto stream = VERIFY_RESULT(CreateStream(
592
615k
      messenger_->stream_factories_, conn_id.protocol(),
593
615k
      StreamCreateData {
594
615k
        .remote = conn_id.remote(),
595
615k
        .remote_hostname = hostname,
596
615k
        .socket = &sock,
597
615k
        .receive_buffer_size = receive_buffer_size,
598
615k
        .mem_tracker = messenger_->connection_context_factory_->buffer_tracker(),
599
615k
        .metric_entity = messenger_->metric_entity(),
600
615k
      }));
601
615k
  auto context = messenger_->connection_context_factory_->Create(receive_buffer_size);
602
603
  // Register the new connection in our map.
604
615k
  auto connection = std::make_shared<Connection>(
605
615k
      this,
606
615k
      std::move(stream),
607
615k
      ConnectionDirection::CLIENT,
608
615k
      messenger()->rpc_metrics().get(),
609
615k
      std::move(context));
610
611
615k
  RETURN_NOT_OK(connection->Start(&loop_));
612
613
  // Insert into the client connection map to avoid duplicate connection requests.
614
615k
  CHECK(client_conns_.emplace(conn_id, connection).second);
615
616
615k
  conn->swap(connection);
617
615k
  return Status::OK();
618
615k
}
619
620
namespace {
621
622
26.0k
void ShutdownIfRemoteAddressIs(const ConnectionPtr& conn, const IpAddress& address) {
623
26.0k
  Endpoint peer = conn->remote();
624
625
26.0k
  if (peer.address() != address) {
626
24.8k
    return;
627
24.8k
  }
628
629
1.17k
  conn->Close();
630
1.17k
  LOG(INFO) << "Dropped connection: " << conn->ToString();
631
1.17k
}
632
633
} // namespace
634
635
0
void Reactor::DropWithRemoteAddress(const IpAddress& address) {
636
0
  DropIncomingWithRemoteAddress(address);
637
0
  DropOutgoingWithRemoteAddress(address);
638
0
}
639
640
178k
void Reactor::DropIncomingWithRemoteAddress(const IpAddress& address) {
641
178k
  DCHECK(IsCurrentThread());
642
643
322
  VLOG_WITH_PREFIX(1) << "Dropping Incoming connections from " << address;
644
2
  for (auto& conn : server_conns_) {
645
2
    ShutdownIfRemoteAddressIs(conn, address);
646
2
  }
647
178k
}
648
649
178k
void Reactor::DropOutgoingWithRemoteAddress(const IpAddress& address) {
650
178k
  DCHECK(IsCurrentThread());
651
18.4E
  VLOG_WITH_PREFIX(1) << "Dropping Outgoing connections to " << address;
652
26.0k
  for (auto& pair : client_conns_) {
653
26.0k
    ShutdownIfRemoteAddressIs(pair.second, address);
654
26.0k
  }
655
178k
}
656
657
597k
void Reactor::DestroyConnection(Connection *conn, const Status &conn_status) {
658
597k
  DCHECK(IsCurrentThread());
659
660
6.54k
  VLOG_WITH_PREFIX(3) << "DestroyConnection(" << conn->ToString() << ", " << conn_status.ToString()
661
6.54k
                      << ")";
662
663
597k
  ConnectionPtr retained_conn = conn->shared_from_this();
664
597k
  conn->Shutdown(conn_status);
665
666
  // Unlink connection from lists.
667
597k
  if (conn->direction() == ConnectionDirection::CLIENT) {
668
461k
    bool erased = false;
669
4.15M
    for (int idx = 0; idx < num_connections_to_server_; idx++) {
670
3.69M
      auto it = client_conns_.find(ConnectionId(conn->remote(), idx, conn->protocol()));
671
3.69M
      if (it != client_conns_.end() && it->second.get() == conn) {
672
461k
        client_conns_.erase(it);
673
461k
        erased = true;
674
461k
      }
675
3.69M
    }
676
461k
    if (!erased) {
677
0
      LOG_WITH_PREFIX(WARNING) << "Looking for " << conn->ToString();
678
0
      for (auto &p : client_conns_) {
679
0
        LOG_WITH_PREFIX(WARNING) << "  Client connection: " << p.first.ToString() << ", "
680
0
                     << p.second->ToString();
681
0
      }
682
0
    }
683
18.4E
    CHECK(erased) << "Couldn't find connection for any index to " << conn->ToString();
684
135k
  } else if (conn->direction() == ConnectionDirection::SERVER) {
685
133k
    auto it = server_conns_.begin();
686
301k
    while (it != server_conns_.end()) {
687
301k
      if ((*it).get() == conn) {
688
134k
        server_conns_.erase(it);
689
134k
        break;
690
134k
      }
691
167k
      ++it;
692
167k
    }
693
133k
  }
694
695
597k
  ShutdownConnection(retained_conn);
696
597k
}
697
698
17.8M
void Reactor::ProcessOutboundQueue() {
699
8.07k
  CHECK(processing_outbound_queue_.empty()) << yb::ToString(processing_outbound_queue_);
700
17.8M
  {
701
17.8M
    std::lock_guard<simple_spinlock> lock(outbound_queue_lock_);
702
17.8M
    outbound_queue_.swap(processing_outbound_queue_);
703
17.8M
  }
704
17.8M
  if (processing_outbound_queue_.empty()) {
705
0
    return;
706
0
  }
707
708
17.8M
  processing_connections_.reserve(processing_outbound_queue_.size());
709
19.9M
  for (auto& call : processing_outbound_queue_) {
710
19.9M
    auto conn = AssignOutboundCall(call);
711
19.9M
    processing_connections_.push_back(std::move(conn));
712
19.9M
  }
713
17.8M
  processing_outbound_queue_.clear();
714
715
17.8M
  std::sort(processing_connections_.begin(), processing_connections_.end());
716
17.8M
  auto new_end = std::unique(processing_connections_.begin(), processing_connections_.end());
717
17.8M
  processing_connections_.erase(new_end, processing_connections_.end());
718
19.4M
  for (auto& conn : processing_connections_) {
719
19.4M
    if (conn) {
720
19.4M
      conn->OutboundQueued();
721
19.4M
    }
722
19.4M
  }
723
17.8M
  processing_connections_.clear();
724
17.8M
}
725
726
19.8M
void Reactor::QueueOutboundCall(OutboundCallPtr call) {
727
29.2k
  DVLOG_WITH_PREFIX(3) << "Queueing outbound call "
728
29.2k
                       << call->ToString() << " to remote " << call->conn_id().remote();
729
730
19.8M
  bool was_empty = false;
731
19.8M
  bool closing = false;
732
19.8M
  {
733
19.8M
    std::lock_guard<simple_spinlock> lock(outbound_queue_lock_);
734
19.9M
    if (!outbound_queue_stopped_) {
735
19.9M
      was_empty = outbound_queue_.empty();
736
19.9M
      outbound_queue_.push_back(call);
737
18.4E
    } else {
738
18.4E
      closing = true;
739
18.4E
    }
740
19.8M
  }
741
19.8M
  if (closing) {
742
15
    call->Transferred(AbortedError(), nullptr /* conn */);
743
15
    return;
744
15
  }
745
19.8M
  if (was_empty) {
746
17.8M
    auto scheduled = ScheduleReactorTask(process_outbound_queue_task_);
747
18.4E
    LOG_IF_WITH_PREFIX(WARNING, !scheduled) << "Failed to schedule process outbound queue task";
748
17.8M
  }
749
19.8M
  TRACE_TO(call->trace(), "Scheduled.");
750
19.8M
}
751
752
// ------------------------------------------------------------------------------------------------
753
// ReactorTask class members
754
// ------------------------------------------------------------------------------------------------
755
756
ReactorTask::ReactorTask(const SourceLocation& source_location)
757
2.15M
    : source_location_(source_location) {
758
2.15M
}
759
760
1.79M
ReactorTask::~ReactorTask() {
761
1.79M
}
762
763
638
void ReactorTask::Abort(const Status& abort_status) {
764
638
  if (!abort_called_.exchange(true, std::memory_order_acq_rel)) {
765
638
    DoAbort(abort_status);
766
638
  }
767
638
}
768
769
0
std::string ReactorTask::ToString() const {
770
0
  return Format("{ source: $0 }", source_location_);
771
0
}
772
773
// ------------------------------------------------------------------------------------------------
774
// DelayedTask class members
775
// ------------------------------------------------------------------------------------------------
776
777
DelayedTask::DelayedTask(StatusFunctor func, MonoDelta when, int64_t id,
778
                         const SourceLocation& source_location, Messenger* messenger)
779
    : ReactorTask(source_location),
780
      func_(std::move(func)),
781
      when_(when),
782
      id_(id),
783
986k
      messenger_(messenger) {
784
986k
}
785
786
986k
void DelayedTask::Run(Reactor* reactor) {
787
17
  DCHECK(reactor_ == nullptr) << "Task has already been scheduled";
788
986k
  DCHECK(reactor->IsCurrentThread());
789
790
986k
  const auto reactor_state = reactor->state();
791
986k
  if (reactor_state != ReactorState::kRunning) {
792
0
    LOG(WARNING) << "Reactor is not running (state: " << reactor_state
793
0
                 << "), not scheduling a delayed task.";
794
0
    return;
795
0
  }
796
797
  // Acquire lock to prevent task from being aborted in the middle of scheduling, in case abort
798
  // will be requested in the middle of scheduling - task will be aborted right after return
799
  // from this method.
800
986k
  std::lock_guard<LockType> l(lock_);
801
802
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4) << "Done: " << done_ << ", when: " << when_;
803
804
986k
  if (done_) {
805
    // Task has been aborted.
806
0
    return;
807
0
  }
808
809
  // Schedule the task to run later.
810
986k
  reactor_ = reactor;
811
986k
  timer_.set(reactor->loop_);
812
813
  // timer_ is owned by this task and will be stopped through AbortTask/Abort before this task
814
  // is removed from list of scheduled tasks, so it is safe for timer_ to remember pointer to task.
815
986k
  timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this);
816
817
986k
  timer_.start(when_.ToSeconds(), // after
818
986k
               0);                // repeat
819
986k
  reactor_->scheduled_tasks_.insert(shared_from(this));
820
986k
}
821
822
984k
MarkAsDoneResult DelayedTask::MarkAsDone() {
823
984k
  std::lock_guard<LockType> l(lock_);
824
984k
  if (done_) {
825
581
    return MarkAsDoneResult::kAlreadyDone;
826
581
  }
827
983k
  done_ = true;
828
829
  // ENG-2879: we need to check if reactor_ is nullptr, because that would mean that the task has
830
  // not even started.  AbortTask uses the return value of this function to check if it needs to
831
  // stop the timer, and that is only possible / necessary if Run has been called and reactor_ is
832
  // not nullptr.
833
0
  return reactor_ == nullptr ? MarkAsDoneResult::kNotScheduled
834
983k
                             : MarkAsDoneResult::kSuccess;
835
983k
}
836
837
2
std::string DelayedTask::ToString() const {
838
2
  return Format("{ id: $0 source: $1 }", id_, source_location_);
839
2
}
840
841
1.27k
void DelayedTask::AbortTask(const Status& abort_status) {
842
1.27k
  auto mark_as_done_result = MarkAsDone();
843
844
0
  VLOG_WITH_PREFIX_AND_FUNC(4)
845
0
      << "Status: " << abort_status << ", " << AsString(mark_as_done_result);
846
847
1.27k
  if (mark_as_done_result == MarkAsDoneResult::kSuccess) {
848
    // Stop the libev timer. We don't need to do this in the kNotScheduled case, because the timer
849
    // has not started in that case.
850
690
    if (reactor_->IsCurrentThread()) {
851
57
      timer_.stop();
852
633
    } else {
853
      // Must call timer_.stop() on the reactor thread. Keep a refcount to prevent this DelayedTask
854
      // from being deleted. If the reactor thread has already been shut down, this will be a no-op.
855
633
      reactor_->ScheduleReactorFunctor([this, holder = shared_from(this)](Reactor* reactor) {
856
633
        timer_.stop();
857
633
      }, SOURCE_LOCATION());
858
633
    }
859
690
  }
860
1.27k
  if (mark_as_done_result != MarkAsDoneResult::kAlreadyDone) {
861
    // We need to call the callback whenever we successfully switch the done_ flag to true, whether
862
    // or not the task has been scheduled.
863
690
    func_(abort_status);
864
690
  }
865
1.27k
}
866
867
638
void DelayedTask::DoAbort(const Status& abort_status) {
868
638
  if (messenger_ != nullptr) {
869
637
    messenger_->RemoveScheduledTask(id_);
870
637
  }
871
872
638
  AbortTask(abort_status);
873
638
}
874
875
984k
void DelayedTask::TimerHandler(ev::timer& watcher, int revents) {
876
984k
  DCHECK(reactor_->IsCurrentThread());
877
878
984k
  auto mark_as_done_result = MarkAsDone();
879
984k
  if (mark_as_done_result != MarkAsDoneResult::kSuccess) {
880
0
    DCHECK_EQ(MarkAsDoneResult::kAlreadyDone, mark_as_done_result)
881
0
        << "Can't get kNotScheduled here, because the timer handler is already being called";
882
0
    return;
883
0
  }
884
885
  // Hold shared_ptr, so this task wouldn't be destroyed upon removal below until func_ is called.
886
984k
  auto holder = shared_from(this);
887
888
984k
  reactor_->scheduled_tasks_.erase(holder);
889
984k
  if (messenger_ != nullptr) {
890
983k
    messenger_->RemoveScheduledTask(id_);
891
983k
  }
892
893
984k
  if (EV_ERROR & revents) {
894
0
    std::string msg = "Delayed task got an error in its timer handler";
895
0
    LOG(WARNING) << msg;
896
0
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Abort";
897
0
    func_(STATUS(Aborted, msg));
898
984k
  } else {
899
18.4E
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Execute";
900
984k
    func_(Status::OK());
901
984k
  }
902
984k
}
903
904
// ------------------------------------------------------------------------------------------------
905
// More Reactor class members
906
// ------------------------------------------------------------------------------------------------
907
908
void Reactor::RegisterInboundSocket(
909
    Socket *socket, size_t receive_buffer_size, const Endpoint& remote,
910
314k
    const ConnectionContextFactoryPtr& factory) {
911
0
  VLOG_WITH_PREFIX(3) << "New inbound connection to " << remote;
912
314k
  receive_buffer_size = PatchReceiveBufferSize(receive_buffer_size);
913
914
314k
  auto stream = CreateStream(
915
314k
      messenger_->stream_factories_, messenger_->listen_protocol_,
916
314k
      StreamCreateData {
917
314k
        .remote = remote,
918
314k
        .remote_hostname = std::string(),
919
314k
        .socket = socket,
920
314k
        .receive_buffer_size = receive_buffer_size,
921
314k
        .mem_tracker = factory->buffer_tracker(),
922
314k
        .metric_entity = messenger_->metric_entity()
923
314k
      });
924
314k
  if (!stream.ok()) {
925
0
    LOG_WITH_PREFIX(DFATAL) << "Failed to create stream for " << remote << ": " << stream.status();
926
0
    return;
927
0
  }
928
314k
  auto conn = std::make_shared<Connection>(this,
929
314k
                                           std::move(*stream),
930
314k
                                           ConnectionDirection::SERVER,
931
314k
                                           messenger()->rpc_metrics().get(),
932
314k
                                           factory->Create(receive_buffer_size));
933
314k
  ScheduleReactorFunctor([conn = std::move(conn)](Reactor* reactor) {
934
314k
    reactor->RegisterConnection(conn);
935
314k
  }, SOURCE_LOCATION());
936
314k
}
937
938
42.8M
bool Reactor::ScheduleReactorTask(ReactorTaskPtr task, bool schedule_even_closing) {
939
42.8M
  bool was_empty;
940
42.8M
  {
941
    // Even though state_ is atomic, we still need to take the lock to make sure state_
942
    // and pending_tasks_mtx_ are being modified in a consistent way.
943
42.8M
    std::unique_lock<simple_spinlock> pending_lock(pending_tasks_mtx_);
944
42.8M
    auto state = state_.load(std::memory_order_acquire);
945
424
    bool failure = schedule_even_closing ? state == ReactorState::kClosed
946
42.8M
                                         : HasReactorStartedClosing(state);
947
42.8M
    if (failure) {
948
11
      return false;
949
11
    }
950
42.8M
    was_empty = pending_tasks_.empty();
951
42.8M
    pending_tasks_.emplace_back(std::move(task));
952
42.8M
  }
953
42.8M
  if (was_empty) {
954
41.8M
    WakeThread();
955
41.8M
  }
956
957
42.8M
  return true;
958
42.8M
}
959
960
41.8M
bool Reactor::DrainTaskQueueAndCheckIfClosing() {
961
41.8M
  CHECK(async_handler_tasks_.empty());
962
963
41.8M
  std::lock_guard<simple_spinlock> lock(pending_tasks_mtx_);
964
41.8M
  async_handler_tasks_.swap(pending_tasks_);
965
41.8M
  return HasReactorStartedClosing(state_.load(std::memory_order_acquire));
966
41.8M
}
967
968
// Task to call an arbitrary function within the reactor thread.
969
template<class F>
970
class RunFunctionTask : public ReactorTask {
971
 public:
972
  RunFunctionTask(const F& f, const SourceLocation& source_location)
973
23
      : ReactorTask(source_location), function_(f) {}
reactor.cc:_ZN2yb3rpc15RunFunctionTaskIZNS0_7Reactor10GetMetricsEPNS0_14ReactorMetricsEE3$_1EC2ERKS5_RKNS_14SourceLocationE
Line
Count
Source
973
8
      : ReactorTask(source_location), function_(f) {}
reactor.cc:_ZN2yb3rpc15RunFunctionTaskIZNS0_7Reactor15DumpRunningRpcsERKNS0_24DumpRunningRpcsRequestPBEPNS0_25DumpRunningRpcsResponsePBEE3$_3EC2ERKS8_RKNS_14SourceLocationE
Line
Count
Source
973
15
      : ReactorTask(source_location), function_(f) {}
974
975
23
  void Run(Reactor *reactor) override {
976
23
    status_ = function_(reactor);
977
23
    latch_.CountDown();
978
23
  }
reactor.cc:_ZN2yb3rpc15RunFunctionTaskIZNS0_7Reactor10GetMetricsEPNS0_14ReactorMetricsEE3$_1E3RunEPS2_
Line
Count
Source
975
8
  void Run(Reactor *reactor) override {
976
8
    status_ = function_(reactor);
977
8
    latch_.CountDown();
978
8
  }
reactor.cc:_ZN2yb3rpc15RunFunctionTaskIZNS0_7Reactor15DumpRunningRpcsERKNS0_24DumpRunningRpcsRequestPBEPNS0_25DumpRunningRpcsResponsePBEE3$_3E3RunEPS2_
Line
Count
Source
975
15
  void Run(Reactor *reactor) override {
976
15
    status_ = function_(reactor);
977
15
    latch_.CountDown();
978
15
  }
979
980
  // Wait until the function has completed, and return the Status returned by the function.
981
23
  Status Wait() {
982
23
    latch_.Wait();
983
23
    return status_;
984
23
  }
reactor.cc:_ZN2yb3rpc15RunFunctionTaskIZNS0_7Reactor10GetMetricsEPNS0_14ReactorMetricsEE3$_1E4WaitEv
Line
Count
Source
981
8
  Status Wait() {
982
8
    latch_.Wait();
983
8
    return status_;
984
8
  }
reactor.cc:_ZN2yb3rpc15RunFunctionTaskIZNS0_7Reactor15DumpRunningRpcsERKNS0_24DumpRunningRpcsRequestPBEPNS0_25DumpRunningRpcsResponsePBEE3$_3E4WaitEv
Line
Count
Source
981
15
  Status Wait() {
982
15
    latch_.Wait();
983
15
    return status_;
984
15
  }
985
986
 private:
987
0
  void DoAbort(const Status &status) override {
988
0
    status_ = status;
989
0
    latch_.CountDown();
990
0
  }
Unexecuted instantiation: reactor.cc:_ZN2yb3rpc15RunFunctionTaskIZNS0_7Reactor10GetMetricsEPNS0_14ReactorMetricsEE3$_1E7DoAbortERKNS_6StatusE
Unexecuted instantiation: reactor.cc:_ZN2yb3rpc15RunFunctionTaskIZNS0_7Reactor15DumpRunningRpcsERKNS0_24DumpRunningRpcsRequestPBEPNS0_25DumpRunningRpcsResponsePBEE3$_3E7DoAbortERKNS_6StatusE
991
992
  F function_;
993
  Status status_;
994
  CountDownLatch latch_{1};
995
};
996
997
template<class F>
998
23
Status Reactor::RunOnReactorThread(const F& f, const SourceLocation& source_location) {
999
23
  auto task = std::make_shared<RunFunctionTask<F>>(f, source_location);
1000
23
  if (!ScheduleReactorTask(task)) {
1001
0
    return ServiceUnavailableError();
1002
0
  }
1003
23
  return task->Wait();
1004
23
}
reactor.cc:_ZN2yb3rpc7Reactor18RunOnReactorThreadIZNS1_10GetMetricsEPNS0_14ReactorMetricsEE3$_1EENS_6StatusERKT_RKNS_14SourceLocationE
Line
Count
Source
998
8
Status Reactor::RunOnReactorThread(const F& f, const SourceLocation& source_location) {
999
8
  auto task = std::make_shared<RunFunctionTask<F>>(f, source_location);
1000
8
  if (!ScheduleReactorTask(task)) {
1001
0
    return ServiceUnavailableError();
1002
0
  }
1003
8
  return task->Wait();
1004
8
}
reactor.cc:_ZN2yb3rpc7Reactor18RunOnReactorThreadIZNS1_15DumpRunningRpcsERKNS0_24DumpRunningRpcsRequestPBEPNS0_25DumpRunningRpcsResponsePBEE3$_3EENS_6StatusERKT_RKNS_14SourceLocationE
Line
Count
Source
998
15
Status Reactor::RunOnReactorThread(const F& f, const SourceLocation& source_location) {
999
15
  auto task = std::make_shared<RunFunctionTask<F>>(f, source_location);
1000
15
  if (!ScheduleReactorTask(task)) {
1001
0
    return ServiceUnavailableError();
1002
0
  }
1003
15
  return task->Wait();
1004
15
}
1005
1006
}  // namespace rpc
1007
}  // namespace yb