YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/reactor.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_RPC_REACTOR_H_
33
#define YB_RPC_REACTOR_H_
34
35
#include <pthread.h>
36
#include <stdint.h>
37
#include <sys/types.h>
38
39
#include <atomic>
40
#include <condition_variable>
41
#include <functional>
42
#include <list>
43
#include <map>
44
#include <memory>
45
#include <mutex>
46
#include <set>
47
#include <string>
48
49
#include <boost/intrusive/list.hpp>
50
#include <boost/utility.hpp>
51
#include <ev++.h> // NOLINT
52
#include <gflags/gflags_declare.h>
53
#include <glog/logging.h>
54
55
#include "yb/gutil/bind.h"
56
#include "yb/gutil/integral_types.h"
57
#include "yb/gutil/ref_counted.h"
58
59
#include "yb/rpc/outbound_call.h"
60
61
#include "yb/util/status_fwd.h"
62
#include "yb/util/async_util.h"
63
#include "yb/util/condition_variable.h"
64
#include "yb/util/countdown_latch.h"
65
#include "yb/util/locks.h"
66
#include "yb/util/monotime.h"
67
#include "yb/util/mutex.h"
68
#include "yb/util/net/socket.h"
69
#include "yb/util/shared_lock.h"
70
#include "yb/util/source_location.h"
71
72
namespace yb {
73
namespace rpc {
74
75
// When compiling on Mac OS X, use 'kqueue' instead of the default, 'select', for the event loop.
76
// Otherwise we run into problems because 'select' can't handle connections when more than 1024
77
// file descriptors are open by the process.
78
#if defined(__APPLE__)
79
constexpr unsigned int kDefaultLibEvFlags = ev::KQUEUE;
80
#else
81
constexpr unsigned int kDefaultLibEvFlags = ev::AUTO;
82
#endif
83
84
typedef std::list<ConnectionPtr> ConnectionList;
85
86
class DumpRunningRpcsRequestPB;
87
class DumpRunningRpcsResponsePB;
88
class Messenger;
89
class MessengerBuilder;
90
class Reactor;
91
92
// Simple metrics information from within a reactor.
93
struct ReactorMetrics {
94
  // Number of client RPC connections currently connected.
95
  size_t num_client_connections;
96
  // Number of server RPC connections currently connected.
97
  size_t num_server_connections;
98
};
99
100
// ------------------------------------------------------------------------------------------------
101
// A task which can be enqueued to run on the reactor thread.
102
103
class ReactorTask : public std::enable_shared_from_this<ReactorTask> {
104
 public:
105
  // source_location - location of code that initiated this task.
106
  explicit ReactorTask(const SourceLocation& source_location);
107
108
  ReactorTask(const ReactorTask&) = delete;
109
  void operator=(const ReactorTask&) = delete;
110
111
  // Run the task. 'reactor' is guaranteed to be the current thread.
112
  virtual void Run(Reactor *reactor) = 0;
113
114
  // Abort the task, in the case that the reactor shut down before the task could be processed. This
115
  // may or may not run on the reactor thread itself.  If this is run not on the reactor thread,
116
  // then reactor thread should have already been shut down. It is guaranteed that Abort() will be
117
  // called at most once.
118
  //
119
  // The Reactor guarantees that the Reactor lock is free when this method is called.
120
  void Abort(const Status& abort_status);
121
122
  virtual std::string ToString() const;
123
124
  virtual ~ReactorTask();
125
126
 protected:
127
  const SourceLocation source_location_;
128
129
 private:
130
  // To be overridden by subclasses.
131
1
  virtual void DoAbort(const Status &abort_status) {}
132
133
  // Used to prevent Abort() from being called twice from multiple threads.
134
  std::atomic<bool> abort_called_{false};
135
};
136
137
typedef std::shared_ptr<ReactorTask> ReactorTaskPtr;
138
139
// ------------------------------------------------------------------------------------------------
140
// A task that runs the given user functor on success. Abort is ignored.
141
142
template <class F>
143
class FunctorReactorTask : public ReactorTask {
144
 public:
145
  explicit FunctorReactorTask(const F& f, const SourceLocation& source_location)
146
1.64M
      : ReactorTask(source_location), f_(f) {}
messenger.cc:yb::rpc::FunctorReactorTask<yb::rpc::Messenger::BreakConnectivity(boost::asio::ip::address const&, bool, bool)::$_0>::FunctorReactorTask(yb::rpc::Messenger::BreakConnectivity(boost::asio::ip::address const&, bool, bool)::$_0 const&, yb::SourceLocation const&)
Line
Count
Source
146
352k
      : ReactorTask(source_location), f_(f) {}
yb::rpc::FunctorReactorTask<std::__1::__bind<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*> >::FunctorReactorTask(std::__1::__bind<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*> const&, yb::SourceLocation const&)
Line
Count
Source
146
289k
      : ReactorTask(source_location), f_(f) {}
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2>::FunctorReactorTask(yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2 const&, yb::SourceLocation const&)
Line
Count
Source
146
370k
      : ReactorTask(source_location), f_(f) {}
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5>::FunctorReactorTask(yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5 const&, yb::SourceLocation const&)
Line
Count
Source
146
864
      : ReactorTask(source_location), f_(f) {}
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6>::FunctorReactorTask(yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6 const&, yb::SourceLocation const&)
Line
Count
Source
146
628k
      : ReactorTask(source_location), f_(f) {}
rpc_context.cc:yb::rpc::FunctorReactorTask<yb::rpc::RpcContext::CloseConnection()::$_0>::FunctorReactorTask(yb::rpc::RpcContext::CloseConnection()::$_0 const&, yb::SourceLocation const&)
Line
Count
Source
146
5.24k
      : ReactorTask(source_location), f_(f) {}
147
148
71.4M
  void Run(Reactor* reactor) override  {
149
71.4M
    f_(reactor);
150
71.4M
  }
messenger.cc:yb::rpc::FunctorReactorTask<yb::rpc::Messenger::BreakConnectivity(boost::asio::ip::address const&, bool, bool)::$_0>::Run(yb::rpc::Reactor*)
Line
Count
Source
148
347k
  void Run(Reactor* reactor) override  {
149
347k
    f_(reactor);
150
347k
  }
yb::rpc::FunctorReactorTask<std::__1::__bind<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*> >::Run(yb::rpc::Reactor*)
Line
Count
Source
148
70.1M
  void Run(Reactor* reactor) override  {
149
70.1M
    f_(reactor);
150
70.1M
  }
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2>::Run(yb::rpc::Reactor*)
Line
Count
Source
148
364k
  void Run(Reactor* reactor) override  {
149
364k
    f_(reactor);
150
364k
  }
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5>::Run(yb::rpc::Reactor*)
Line
Count
Source
148
864
  void Run(Reactor* reactor) override  {
149
864
    f_(reactor);
150
864
  }
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6>::Run(yb::rpc::Reactor*)
Line
Count
Source
148
622k
  void Run(Reactor* reactor) override  {
149
622k
    f_(reactor);
150
622k
  }
rpc_context.cc:yb::rpc::FunctorReactorTask<yb::rpc::RpcContext::CloseConnection()::$_0>::Run(yb::rpc::Reactor*)
Line
Count
Source
148
5.30k
  void Run(Reactor* reactor) override  {
149
5.30k
    f_(reactor);
150
5.30k
  }
151
 private:
152
  F f_;
153
};
154
155
template <class F>
156
1.64M
ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) {
157
1.64M
  return std::make_shared<FunctorReactorTask<F>>(f, source_location);
158
1.64M
}
messenger.cc:std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<yb::rpc::Messenger::BreakConnectivity(boost::asio::ip::address const&, bool, bool)::$_0>(yb::rpc::Messenger::BreakConnectivity(boost::asio::ip::address const&, bool, bool)::$_0 const&, yb::SourceLocation const&)
Line
Count
Source
156
352k
ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) {
157
352k
  return std::make_shared<FunctorReactorTask<F>>(f, source_location);
158
352k
}
std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<std::__1::__bind<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*> >(std::__1::__bind<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*> const&, yb::SourceLocation const&)
Line
Count
Source
156
289k
ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) {
157
289k
  return std::make_shared<FunctorReactorTask<F>>(f, source_location);
158
289k
}
reactor.cc:std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2>(yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2 const&, yb::SourceLocation const&)
Line
Count
Source
156
370k
ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) {
157
370k
  return std::make_shared<FunctorReactorTask<F>>(f, source_location);
158
370k
}
reactor.cc:std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5>(yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5 const&, yb::SourceLocation const&)
Line
Count
Source
156
864
ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) {
157
864
  return std::make_shared<FunctorReactorTask<F>>(f, source_location);
158
864
}
reactor.cc:std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6>(yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6 const&, yb::SourceLocation const&)
Line
Count
Source
156
628k
ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) {
157
628k
  return std::make_shared<FunctorReactorTask<F>>(f, source_location);
158
628k
}
rpc_context.cc:std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<yb::rpc::RpcContext::CloseConnection()::$_0>(yb::rpc::RpcContext::CloseConnection()::$_0 const&, yb::SourceLocation const&)
Line
Count
Source
156
5.12k
ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) {
157
5.12k
  return std::make_shared<FunctorReactorTask<F>>(f, source_location);
158
5.12k
}
159
160
// ------------------------------------------------------------------------------------------------
161
// A task that runs the given user functor on success or abort.
162
163
template <class F>
164
class FunctorReactorTaskWithAbort : public ReactorTask {
165
 public:
166
  FunctorReactorTaskWithAbort(const F& f, const SourceLocation& source_location)
167
3.24k
      : ReactorTask(source_location), f_(f) {}
168
169
3.24k
  void Run(Reactor* reactor) override  {
170
3.24k
    f_(reactor, Status::OK());
171
3.24k
  }
172
173
 private:
174
0
  void DoAbort(const Status &abort_status) override {
175
0
    f_(nullptr, abort_status);
176
0
  }
177
178
  F f_;
179
};
180
181
template <class F>
182
3.24k
ReactorTaskPtr MakeFunctorReactorTaskWithAbort(const F& f, const SourceLocation& source_location) {
183
3.24k
  return std::make_shared<FunctorReactorTaskWithAbort<F>>(f, source_location);
184
3.24k
}
185
186
// ------------------------------------------------------------------------------------------------
187
// A task that runs the user functor if the given weak pointer is still valid by the time the
188
// reactor runs the task.
189
190
template <class F, class Object>
191
class FunctorReactorTaskWithWeakPtr : public ReactorTask {
192
 public:
193
  FunctorReactorTaskWithWeakPtr(const F& f, const std::weak_ptr<Object>& ptr,
194
                                const SourceLocation& source_location)
195
618k
      : ReactorTask(source_location), f_(f), ptr_(ptr) {}
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor*&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>::FunctorReactorTaskWithWeakPtr(std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor*&, yb::rpc::Connection*, yb::Status&> const&, std::__1::weak_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&)
Line
Count
Source
195
116
      : ReactorTask(source_location), f_(f), ptr_(ptr) {}
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor* const&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>::FunctorReactorTaskWithWeakPtr(std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor* const&, yb::rpc::Connection*, yb::Status&> const&, std::__1::weak_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&)
Line
Count
Source
195
1.47k
      : ReactorTask(source_location), f_(f), ptr_(ptr) {}
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Connection::*)(), yb::rpc::Connection*>, yb::rpc::Connection>::FunctorReactorTaskWithWeakPtr(std::__1::__bind<void (yb::rpc::Connection::*)(), yb::rpc::Connection*> const&, std::__1::weak_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&)
Line
Count
Source
195
615k
      : ReactorTask(source_location), f_(f), ptr_(ptr) {}
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::ConnectionContextWithQueue::*)(yb::rpc::Connection*), yb::rpc::ConnectionContextWithQueue*, yb::rpc::Connection*>, yb::rpc::Connection>::FunctorReactorTaskWithWeakPtr(std::__1::__bind<void (yb::rpc::ConnectionContextWithQueue::*)(yb::rpc::Connection*), yb::rpc::ConnectionContextWithQueue*, yb::rpc::Connection*> const&, std::__1::weak_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&)
Line
Count
Source
195
1.51k
      : ReactorTask(source_location), f_(f), ptr_(ptr) {}
196
197
78.4M
  void Run(Reactor* reactor) override  {
198
78.4M
    auto shared_ptr = ptr_.lock();
199
78.4M
    if (shared_ptr) {
200
78.4M
      f_(reactor);
201
78.4M
    }
202
78.4M
  }
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor*&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>::Run(yb::rpc::Reactor*)
Line
Count
Source
197
116
  void Run(Reactor* reactor) override  {
198
116
    auto shared_ptr = ptr_.lock();
199
116
    if (shared_ptr) {
200
116
      f_(reactor);
201
116
    }
202
116
  }
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor* const&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>::Run(yb::rpc::Reactor*)
Line
Count
Source
197
1.47k
  void Run(Reactor* reactor) override  {
198
1.47k
    auto shared_ptr = ptr_.lock();
199
1.47k
    if (shared_ptr) {
200
1.18k
      f_(reactor);
201
1.18k
    }
202
1.47k
  }
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Connection::*)(), yb::rpc::Connection*>, yb::rpc::Connection>::Run(yb::rpc::Reactor*)
Line
Count
Source
197
78.2M
  void Run(Reactor* reactor) override  {
198
78.2M
    auto shared_ptr = ptr_.lock();
199
78.2M
    if (shared_ptr) {
200
78.2M
      f_(reactor);
201
78.2M
    }
202
78.2M
  }
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::ConnectionContextWithQueue::*)(yb::rpc::Connection*), yb::rpc::ConnectionContextWithQueue*, yb::rpc::Connection*>, yb::rpc::Connection>::Run(yb::rpc::Reactor*)
Line
Count
Source
197
209k
  void Run(Reactor* reactor) override  {
198
209k
    auto shared_ptr = ptr_.lock();
199
209k
    if (shared_ptr) {
200
209k
      f_(reactor);
201
209k
    }
202
209k
  }
203
204
 private:
205
  F f_;
206
  std::weak_ptr<Object> ptr_;
207
};
208
209
template <class F, class Object>
210
ReactorTaskPtr MakeFunctorReactorTask(
211
    const F& f, const std::weak_ptr<Object>& ptr,
212
    const SourceLocation& source_location) {
213
  return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location);
214
}
215
216
template <class F, class Object>
217
ReactorTaskPtr MakeFunctorReactorTask(
218
620k
    const F& f, const std::shared_ptr<Object>& ptr, const SourceLocation& source_location) {
219
620k
  return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location);
220
620k
}
std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor*&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>(std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor*&, yb::rpc::Connection*, yb::Status&> const&, std::__1::shared_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&)
Line
Count
Source
218
116
    const F& f, const std::shared_ptr<Object>& ptr, const SourceLocation& source_location) {
219
116
  return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location);
220
116
}
std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor* const&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>(std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor* const&, yb::rpc::Connection*, yb::Status&> const&, std::__1::shared_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&)
Line
Count
Source
218
1.47k
    const F& f, const std::shared_ptr<Object>& ptr, const SourceLocation& source_location) {
219
1.47k
  return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location);
220
1.47k
}
std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<std::__1::__bind<void (yb::rpc::Connection::*)(), yb::rpc::Connection*>, yb::rpc::Connection>(std::__1::__bind<void (yb::rpc::Connection::*)(), yb::rpc::Connection*> const&, std::__1::shared_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&)
Line
Count
Source
218
616k
    const F& f, const std::shared_ptr<Object>& ptr, const SourceLocation& source_location) {
219
616k
  return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location);
220
616k
}
std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<std::__1::__bind<void (yb::rpc::ConnectionContextWithQueue::*)(yb::rpc::Connection*), yb::rpc::ConnectionContextWithQueue*, yb::rpc::Connection*>, yb::rpc::Connection>(std::__1::__bind<void (yb::rpc::ConnectionContextWithQueue::*)(yb::rpc::Connection*), yb::rpc::ConnectionContextWithQueue*, yb::rpc::Connection*> const&, std::__1::shared_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&)
Line
Count
Source
218
1.51k
    const F& f, const std::shared_ptr<Object>& ptr, const SourceLocation& source_location) {
219
1.51k
  return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location);
220
1.51k
}
221
222
YB_DEFINE_ENUM(MarkAsDoneResult,
223
    // Successfully marked as done with this call to MarkAsDone.
224
    (kSuccess)
225
    // Task already marked as done by another caller to MarkAsDone.
226
    (kAlreadyDone)
227
    // We've switched the done_ flag to true, but the task is not scheduled on a reactor thread and
228
    // reactor_ is nullptr. Next calls to MarkAsDone will return kAlreadyDone.
229
    (kNotScheduled))
230
231
// A ReactorTask that is scheduled to run at some point in the future.
232
//
233
// Semantically it works like RunFunctionTask with a few key differences:
234
// 1. The user function is called during Abort. Put another way, the user function is _always_
235
//    invoked, even during reactor shutdown.
236
// 2. To differentiate between Abort and non-Abort, the user function receives a Status as its first
237
//    argument.
238
class DelayedTask : public ReactorTask {
239
 public:
240
  DelayedTask(StatusFunctor func, MonoDelta when, int64_t id,
241
              const SourceLocation& source_location, Messenger* messenger);
242
243
  // Schedules the task for running later but doesn't actually run it yet.
244
  void Run(Reactor* reactor) override;
245
246
  // Could be called from non-reactor thread even before reactor thread shutdown.
247
  void AbortTask(const Status& abort_status);
248
249
  std::string ToString() const override;
250
251
1
  std::string LogPrefix() const {
252
1
    return ToString() + ": ";
253
1
  }
254
255
 private:
256
  void DoAbort(const Status& abort_status) override;
257
258
  // Set done_ to true if not set and return true. If done_ is already set, return false.
259
  MarkAsDoneResult MarkAsDone();
260
261
  // libev callback for when the registered timer fires.
262
  void TimerHandler(ev::timer& rwatcher, int revents); // NOLINT
263
264
  // User function to invoke when timer fires or when task is aborted.
265
  StatusFunctor func_;
266
267
  // Delay to apply to this task.
268
  const MonoDelta when_;
269
270
  // Link back to registering reactor thread.
271
  Reactor* reactor_ = nullptr;
272
273
  // libev timer. Set when Run() is invoked.
274
  ev::timer timer_;
275
276
  // This task's id.
277
  const int64_t id_;
278
279
  Messenger* const messenger_;
280
281
  // Set to true whenever a Run or Abort methods are called.
282
  // Guarded by lock_.
283
  bool done_ = false;
284
285
  typedef simple_spinlock LockType;
286
  mutable LockType lock_;
287
};
288
289
typedef std::vector<ReactorTaskPtr> ReactorTasks;
290
291
YB_DEFINE_ENUM(ReactorState, (kRunning)(kClosing)(kClosed));
292
293
class Reactor {
294
 public:
295
  // Client-side connection map.
296
  typedef std::unordered_map<const ConnectionId, ConnectionPtr, ConnectionIdHash> ConnectionMap;
297
298
  Reactor(Messenger* messenger,
299
          int index,
300
          const MessengerBuilder &bld);
301
302
  ~Reactor();
303
304
  Reactor(const Reactor&) = delete;
305
  void operator=(const Reactor&) = delete;
306
307
  // This may be called from another thread.
308
  CHECKED_STATUS Init();
309
310
  // Add any connections on this reactor thread into the given status dump.
311
  // May be called from another thread.
312
  CHECKED_STATUS DumpRunningRpcs(
313
      const DumpRunningRpcsRequestPB& req, DumpRunningRpcsResponsePB* resp);
314
315
  // Block until the Reactor thread is shut down
316
  //
317
  // This must be called from another thread.
318
  void Shutdown();
319
320
  // This method is thread-safe.
321
  void WakeThread();
322
323
  // libev callback for handling async notifications in our epoll thread.
324
  void AsyncHandler(ev::async &watcher, int revents); // NOLINT
325
326
  // libev callback for handling timer events in our epoll thread.
327
  void TimerHandler(ev::timer &watcher, int revents); // NOLINT
328
329
  // This may be called from another thread.
330
0
  const std::string &name() const { return name_; }
331
332
166k
  const std::string& LogPrefix() { return log_prefix_; }
333
334
87.5M
  Messenger *messenger() const { return messenger_; }
335
336
1.64G
  CoarseTimePoint cur_time() const { return cur_time_; }
337
338
  // Drop all connections with remote address. Used in tests with broken connectivity.
339
  void DropIncomingWithRemoteAddress(const IpAddress& address);
340
  void DropOutgoingWithRemoteAddress(const IpAddress& address);
341
  void DropWithRemoteAddress(const IpAddress& address);
342
343
  // Return true if this reactor thread is the thread currently
344
  // running. Should be used in DCHECK assertions.
345
  bool IsCurrentThread() const;
346
347
  // Return true if this reactor thread is the thread currently running, or the reactor is closing.
348
  // This is the condition under which the Abort method can be called for tasks. Should be used in
349
  // DCHECK assertions.
350
  bool IsCurrentThreadOrStartedClosing() const;
351
352
  // Shut down the given connection, removing it from the connection tracking
353
  // structures of this reactor.
354
  //
355
  // The connection is not explicitly deleted -- shared_ptr reference counting
356
  // may hold on to the object after this, but callers should assume that it
357
  // _may_ be deleted by this call.
358
  void DestroyConnection(Connection *conn, const Status &conn_status);
359
360
  // Queue a new call to be sent. If the reactor is already shut down, marks
361
  // the call as failed.
362
  void QueueOutboundCall(OutboundCallPtr call);
363
364
  // Collect metrics.
365
  // Must be called from the reactor thread.
366
  CHECKED_STATUS GetMetrics(ReactorMetrics *metrics);
367
368
  void Join();
369
370
  // Queues a server event on all the connections, such that every client receives it.
371
  void QueueEventOnAllConnections(
372
      ServerEventListPtr server_event, const SourceLocation& source_location);
373
374
  // Queue a new incoming connection. Takes ownership of the underlying fd from
375
  // 'socket', but not the Socket object itself.
376
  // If the reactor is already shut down, takes care of closing the socket.
377
  void RegisterInboundSocket(
378
      Socket *socket, size_t receive_buffer_size, const Endpoint& remote,
379
      const ConnectionContextFactoryPtr& factory);
380
381
  // Schedule the given task's Run() method to be called on the reactor thread. If the reactor shuts
382
  // down before it is run, the Abort method will be called.
383
  // Returns true if task was scheduled.
384
160M
  MUST_USE_RESULT bool ScheduleReactorTask(ReactorTaskPtr task) {
385
160M
    return ScheduleReactorTask(std::move(task), false /* schedule_even_closing */);
386
160M
  }
387
388
  template<class F>
389
1.00M
  bool ScheduleReactorFunctor(const F& f, const SourceLocation& source_location) {
390
1.00M
    return ScheduleReactorTask(MakeFunctorReactorTask(f, source_location));
391
1.00M
  }
reactor.cc:bool yb::rpc::Reactor::ScheduleReactorFunctor<yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2>(yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2 const&, yb::SourceLocation const&)
Line
Count
Source
389
370k
  bool ScheduleReactorFunctor(const F& f, const SourceLocation& source_location) {
390
370k
    return ScheduleReactorTask(MakeFunctorReactorTask(f, source_location));
391
370k
  }
reactor.cc:bool yb::rpc::Reactor::ScheduleReactorFunctor<yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5>(yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5 const&, yb::SourceLocation const&)
Line
Count
Source
389
864
  bool ScheduleReactorFunctor(const F& f, const SourceLocation& source_location) {
390
864
    return ScheduleReactorTask(MakeFunctorReactorTask(f, source_location));
391
864
  }
reactor.cc:bool yb::rpc::Reactor::ScheduleReactorFunctor<yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6>(yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6 const&, yb::SourceLocation const&)
Line
Count
Source
389
628k
  bool ScheduleReactorFunctor(const F& f, const SourceLocation& source_location) {
390
628k
    return ScheduleReactorTask(MakeFunctorReactorTask(f, source_location));
391
628k
  }
rpc_context.cc:bool yb::rpc::Reactor::ScheduleReactorFunctor<yb::rpc::RpcContext::CloseConnection()::$_0>(yb::rpc::RpcContext::CloseConnection()::$_0 const&, yb::SourceLocation const&)
Line
Count
Source
389
5.14k
  bool ScheduleReactorFunctor(const F& f, const SourceLocation& source_location) {
390
5.14k
    return ScheduleReactorTask(MakeFunctorReactorTask(f, source_location));
391
5.14k
  }
392
393
10.2M
  ReactorState state() {
394
10.2M
    return state_.load(std::memory_order_acquire);
395
10.2M
  }
396
397
 private:
398
  friend class Connection;
399
  friend class AssignOutboundCallTask;
400
  friend class DelayedTask;
401
402
  // Run the main event loop of the reactor.
403
  void RunThread();
404
405
  MUST_USE_RESULT bool ScheduleReactorTask(ReactorTaskPtr task, bool schedule_even_closing);
406
407
  // Find or create a new connection to the given remote.
408
  // If such a connection already exists, returns that, otherwise creates a new one.
409
  // May return a bad Status if the connect() call fails.
410
  // The resulting connection object is managed internally by the reactor thread.
411
  // Deadline specifies latest time allowed for initializing the connection.
412
  CHECKED_STATUS FindOrStartConnection(const ConnectionId &conn_id,
413
                                       const std::string& hostname,
414
                                       const MonoTime &deadline,
415
                                       ConnectionPtr* conn);
416
417
  // Scan any open connections for idle ones that have been idle longer than
418
  // connection_keepalive_time_
419
  void ScanIdleConnections();
420
421
  // Assign a new outbound call to the appropriate connection object.
422
  // If this fails, the call is marked failed and completed.
423
  ConnectionPtr AssignOutboundCall(const OutboundCallPtr &call);
424
425
  // Register a new connection.
426
  void RegisterConnection(const ConnectionPtr& conn);
427
428
  // Actually perform shutdown of the thread, tearing down any connections,
429
  // etc. This is called from within the thread.
430
  void ShutdownInternal();
431
432
  void ProcessOutboundQueue();
433
434
  void CheckReadyToStop();
435
436
  // Drains the pending_tasks_ queue into async_handler_tasks_. Returns true if the reactor is
437
  // closing.
438
  bool DrainTaskQueueAndCheckIfClosing();
439
440
  template<class F>
441
  CHECKED_STATUS RunOnReactorThread(const F& f, const SourceLocation& source_location);
442
443
  void ShutdownConnection(const ConnectionPtr& conn);
444
445
  // parent messenger
446
  Messenger* const messenger_;
447
448
  const std::string name_;
449
450
  const std::string log_prefix_;
451
452
  mutable simple_spinlock pending_tasks_mtx_;
453
454
  // Reactor status, mostly used when shutting down. Guarded by pending_tasks_mtx_, but also read
455
  // without a lock for sanity checking.
456
  std::atomic<ReactorState> state_{ReactorState::kRunning};
457
458
  // This mutex is used to make sure that multiple threads that end up running Abort() in case the
459
  // reactor has already shut down will not have data races accessing data that is normally only
460
  // accessed from the reactor thread. We are using a recursive mutex because Abort() could try to
461
  // submit another reactor task, which will result in Abort() being called on that other task
462
  // as well.
463
  std::recursive_mutex final_abort_mutex_;
464
465
  // Tasks to be run within the reactor thread.
466
  // Guarded by pending_tasks_mtx_.
467
  ReactorTasks pending_tasks_;
468
469
  scoped_refptr<yb::Thread> thread_;
470
471
  // our epoll object (or kqueue, etc).
472
  ev::dynamic_loop loop_;
473
474
  // Used by other threads to notify the reactor thread
475
  ev::async async_;
476
477
  // Handles the periodic timer.
478
  ev::timer timer_;
479
480
  // Scheduled (but not yet run) delayed tasks.
481
  std::set<std::shared_ptr<DelayedTask>> scheduled_tasks_;
482
483
  ReactorTasks async_handler_tasks_;
484
485
  // The current monotonic time.  Updated every coarse_timer_granularity_.
486
  CoarseTimePoint cur_time_;
487
488
  // last time we did TCP timeouts.
489
  CoarseTimePoint last_unused_tcp_scan_;
490
491
  // Map of sockaddrs to Connection objects for outbound (client) connections.
492
  ConnectionMap client_conns_;
493
494
  // List of current connections coming into the server.
495
  ConnectionList server_conns_;
496
497
  // Set of connections that should be completed before we can stop this thread.
498
  std::unordered_set<ConnectionPtr> waiting_conns_;
499
500
  // If a connection has been idle for this much time, it is torn down.
501
  CoarseMonoClock::Duration connection_keepalive_time_;
502
503
  // Scan for idle connections on this granularity.
504
  CoarseMonoClock::Duration coarse_timer_granularity_;
505
506
  simple_spinlock outbound_queue_lock_;
507
  bool outbound_queue_stopped_ = false;
508
509
  // We found that we should shut down, but not all connections are ready for it.  Only accessed in
510
  // the reactor thread.
511
  bool stopping_ = false;
512
513
  CoarseTimePoint stop_start_time_;
514
515
  std::vector<OutboundCallPtr> outbound_queue_;
516
517
  // Outbound calls currently being processed. Only accessed on the reactor thread. Could be a local
518
  // variable, but implemented as a member field as an optimization to avoid memory allocation.
519
  std::vector<OutboundCallPtr> processing_outbound_queue_;
520
521
  std::vector<ConnectionPtr> processing_connections_;
522
  ReactorTaskPtr process_outbound_queue_task_;
523
524
  // Number of outbound connections to create per each destination server address.
525
  int num_connections_to_server_;
526
};
527
528
}  // namespace rpc
529
}  // namespace yb
530
531
#endif // YB_RPC_REACTOR_H_