/Users/deen/code/yugabyte-db/src/yb/rpc/reactor.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | #ifndef YB_RPC_REACTOR_H_ |
33 | | #define YB_RPC_REACTOR_H_ |
34 | | |
35 | | #include <pthread.h> |
36 | | #include <stdint.h> |
37 | | #include <sys/types.h> |
38 | | |
39 | | #include <atomic> |
40 | | #include <condition_variable> |
41 | | #include <functional> |
42 | | #include <list> |
43 | | #include <map> |
44 | | #include <memory> |
45 | | #include <mutex> |
46 | | #include <set> |
47 | | #include <string> |
48 | | |
49 | | #include <boost/intrusive/list.hpp> |
50 | | #include <boost/utility.hpp> |
51 | | #include <ev++.h> // NOLINT |
52 | | #include <gflags/gflags_declare.h> |
53 | | #include <glog/logging.h> |
54 | | |
55 | | #include "yb/gutil/bind.h" |
56 | | #include "yb/gutil/integral_types.h" |
57 | | #include "yb/gutil/ref_counted.h" |
58 | | |
59 | | #include "yb/rpc/outbound_call.h" |
60 | | |
61 | | #include "yb/util/status_fwd.h" |
62 | | #include "yb/util/async_util.h" |
63 | | #include "yb/util/condition_variable.h" |
64 | | #include "yb/util/countdown_latch.h" |
65 | | #include "yb/util/locks.h" |
66 | | #include "yb/util/monotime.h" |
67 | | #include "yb/util/mutex.h" |
68 | | #include "yb/util/net/socket.h" |
69 | | #include "yb/util/shared_lock.h" |
70 | | #include "yb/util/source_location.h" |
71 | | |
72 | | namespace yb { |
73 | | namespace rpc { |
74 | | |
75 | | // When compiling on Mac OS X, use 'kqueue' instead of the default, 'select', for the event loop. |
76 | | // Otherwise we run into problems because 'select' can't handle connections when more than 1024 |
77 | | // file descriptors are open by the process. |
78 | | #if defined(__APPLE__) |
79 | | constexpr unsigned int kDefaultLibEvFlags = ev::KQUEUE; |
80 | | #else |
81 | | constexpr unsigned int kDefaultLibEvFlags = ev::AUTO; |
82 | | #endif |
83 | | |
84 | | typedef std::list<ConnectionPtr> ConnectionList; |
85 | | |
86 | | class DumpRunningRpcsRequestPB; |
87 | | class DumpRunningRpcsResponsePB; |
88 | | class Messenger; |
89 | | class MessengerBuilder; |
90 | | class Reactor; |
91 | | |
92 | | // Simple metrics information from within a reactor. |
93 | | struct ReactorMetrics { |
94 | | // Number of client RPC connections currently connected. |
95 | | size_t num_client_connections; |
96 | | // Number of server RPC connections currently connected. |
97 | | size_t num_server_connections; |
98 | | }; |
99 | | |
100 | | // ------------------------------------------------------------------------------------------------ |
101 | | // A task which can be enqueued to run on the reactor thread. |
102 | | |
103 | | class ReactorTask : public std::enable_shared_from_this<ReactorTask> { |
104 | | public: |
105 | | // source_location - location of code that initiated this task. |
106 | | explicit ReactorTask(const SourceLocation& source_location); |
107 | | |
108 | | ReactorTask(const ReactorTask&) = delete; |
109 | | void operator=(const ReactorTask&) = delete; |
110 | | |
111 | | // Run the task. The calling thread is guaranteed to be the thread of 'reactor'. |
112 | | virtual void Run(Reactor *reactor) = 0; |
113 | | |
114 | | // Abort the task, in the case that the reactor shut down before the task could be processed. This |
115 | | // may or may not run on the reactor thread itself. If this is not run on the reactor thread, |
116 | | // then the reactor thread should have already been shut down. It is guaranteed that Abort() will |
117 | | // be called at most once. |
118 | | // |
119 | | // The Reactor guarantees that the Reactor lock is free when this method is called. |
120 | | void Abort(const Status& abort_status); |
121 | | |
122 | | virtual std::string ToString() const; |
123 | | |
124 | | virtual ~ReactorTask(); |
125 | | |
126 | | protected: |
127 | | const SourceLocation source_location_; |
128 | | |
129 | | private: |
130 | | // To be overridden by subclasses. |
131 | 1 | virtual void DoAbort(const Status &abort_status) {} |
132 | | |
133 | | // Used to prevent Abort() from being called twice from multiple threads. |
134 | | std::atomic<bool> abort_called_{false}; |
135 | | }; |
136 | | |
137 | | typedef std::shared_ptr<ReactorTask> ReactorTaskPtr; |
138 | | |
139 | | // ------------------------------------------------------------------------------------------------ |
140 | | // A task that runs the given user functor on success. Abort is ignored. |
141 | | |
142 | | template <class F> |
143 | | class FunctorReactorTask : public ReactorTask { |
144 | | public: |
145 | | explicit FunctorReactorTask(const F& f, const SourceLocation& source_location) |
146 | 1.64M | : ReactorTask(source_location), f_(f) {} messenger.cc:yb::rpc::FunctorReactorTask<yb::rpc::Messenger::BreakConnectivity(boost::asio::ip::address const&, bool, bool)::$_0>::FunctorReactorTask(yb::rpc::Messenger::BreakConnectivity(boost::asio::ip::address const&, bool, bool)::$_0 const&, yb::SourceLocation const&) Line | Count | Source | 146 | 352k | : ReactorTask(source_location), f_(f) {} |
yb::rpc::FunctorReactorTask<std::__1::__bind<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*> >::FunctorReactorTask(std::__1::__bind<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*> const&, yb::SourceLocation const&) Line | Count | Source | 146 | 289k | : ReactorTask(source_location), f_(f) {} |
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2>::FunctorReactorTask(yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2 const&, yb::SourceLocation const&) Line | Count | Source | 146 | 370k | : ReactorTask(source_location), f_(f) {} |
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5>::FunctorReactorTask(yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5 const&, yb::SourceLocation const&) Line | Count | Source | 146 | 864 | : ReactorTask(source_location), f_(f) {} |
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6>::FunctorReactorTask(yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6 const&, yb::SourceLocation const&) Line | Count | Source | 146 | 628k | : ReactorTask(source_location), f_(f) {} |
rpc_context.cc:yb::rpc::FunctorReactorTask<yb::rpc::RpcContext::CloseConnection()::$_0>::FunctorReactorTask(yb::rpc::RpcContext::CloseConnection()::$_0 const&, yb::SourceLocation const&) Line | Count | Source | 146 | 5.24k | : ReactorTask(source_location), f_(f) {} |
|
147 | | |
148 | 71.4M | void Run(Reactor* reactor) override { |
149 | 71.4M | f_(reactor); |
150 | 71.4M | } messenger.cc:yb::rpc::FunctorReactorTask<yb::rpc::Messenger::BreakConnectivity(boost::asio::ip::address const&, bool, bool)::$_0>::Run(yb::rpc::Reactor*) Line | Count | Source | 148 | 347k | void Run(Reactor* reactor) override { | 149 | 347k | f_(reactor); | 150 | 347k | } |
yb::rpc::FunctorReactorTask<std::__1::__bind<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*> >::Run(yb::rpc::Reactor*) Line | Count | Source | 148 | 70.1M | void Run(Reactor* reactor) override { | 149 | 70.1M | f_(reactor); | 150 | 70.1M | } |
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2>::Run(yb::rpc::Reactor*) Line | Count | Source | 148 | 364k | void Run(Reactor* reactor) override { | 149 | 364k | f_(reactor); | 150 | 364k | } |
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5>::Run(yb::rpc::Reactor*) Line | Count | Source | 148 | 864 | void Run(Reactor* reactor) override { | 149 | 864 | f_(reactor); | 150 | 864 | } |
reactor.cc:yb::rpc::FunctorReactorTask<yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6>::Run(yb::rpc::Reactor*) Line | Count | Source | 148 | 622k | void Run(Reactor* reactor) override { | 149 | 622k | f_(reactor); | 150 | 622k | } |
rpc_context.cc:yb::rpc::FunctorReactorTask<yb::rpc::RpcContext::CloseConnection()::$_0>::Run(yb::rpc::Reactor*) Line | Count | Source | 148 | 5.30k | void Run(Reactor* reactor) override { | 149 | 5.30k | f_(reactor); | 150 | 5.30k | } |
|
151 | | private: |
152 | | F f_; |
153 | | }; |
154 | | |
155 | | template <class F> |
156 | 1.64M | ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) { |
157 | 1.64M | return std::make_shared<FunctorReactorTask<F>>(f, source_location); |
158 | 1.64M | } messenger.cc:std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<yb::rpc::Messenger::BreakConnectivity(boost::asio::ip::address const&, bool, bool)::$_0>(yb::rpc::Messenger::BreakConnectivity(boost::asio::ip::address const&, bool, bool)::$_0 const&, yb::SourceLocation const&) Line | Count | Source | 156 | 352k | ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) { | 157 | 352k | return std::make_shared<FunctorReactorTask<F>>(f, source_location); | 158 | 352k | } |
std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<std::__1::__bind<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*> >(std::__1::__bind<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*> const&, yb::SourceLocation const&) Line | Count | Source | 156 | 289k | ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) { | 157 | 289k | return std::make_shared<FunctorReactorTask<F>>(f, source_location); | 158 | 289k | } |
reactor.cc:std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2>(yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2 const&, yb::SourceLocation const&) Line | Count | Source | 156 | 370k | ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) { | 157 | 370k | return std::make_shared<FunctorReactorTask<F>>(f, source_location); | 158 | 370k | } |
reactor.cc:std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5>(yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5 const&, yb::SourceLocation const&) Line | Count | Source | 156 | 864 | ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) { | 157 | 864 | return std::make_shared<FunctorReactorTask<F>>(f, source_location); | 158 | 864 | } |
reactor.cc:std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6>(yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6 const&, yb::SourceLocation const&) Line | Count | Source | 156 | 628k | ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) { | 157 | 628k | return std::make_shared<FunctorReactorTask<F>>(f, source_location); | 158 | 628k | } |
rpc_context.cc:std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<yb::rpc::RpcContext::CloseConnection()::$_0>(yb::rpc::RpcContext::CloseConnection()::$_0 const&, yb::SourceLocation const&) Line | Count | Source | 156 | 5.12k | ReactorTaskPtr MakeFunctorReactorTask(const F& f, const SourceLocation& source_location) { | 157 | 5.12k | return std::make_shared<FunctorReactorTask<F>>(f, source_location); | 158 | 5.12k | } |
|
159 | | |
160 | | // ------------------------------------------------------------------------------------------------ |
161 | | // A task that runs the given user functor on success or abort. |
162 | | |
163 | | template <class F> |
164 | | class FunctorReactorTaskWithAbort : public ReactorTask { |
165 | | public: |
166 | | FunctorReactorTaskWithAbort(const F& f, const SourceLocation& source_location) |
167 | 3.24k | : ReactorTask(source_location), f_(f) {} |
168 | | |
169 | 3.24k | void Run(Reactor* reactor) override { |
170 | 3.24k | f_(reactor, Status::OK()); |
171 | 3.24k | } |
172 | | |
173 | | private: |
174 | 0 | void DoAbort(const Status &abort_status) override { |
175 | 0 | f_(nullptr, abort_status); |
176 | 0 | } |
177 | | |
178 | | F f_; |
179 | | }; |
180 | | |
181 | | template <class F> |
182 | 3.24k | ReactorTaskPtr MakeFunctorReactorTaskWithAbort(const F& f, const SourceLocation& source_location) { |
183 | 3.24k | return std::make_shared<FunctorReactorTaskWithAbort<F>>(f, source_location); |
184 | 3.24k | } |
185 | | |
186 | | // ------------------------------------------------------------------------------------------------ |
187 | | // A task that runs the user functor if the given weak pointer is still valid by the time the |
188 | | // reactor runs the task. |
189 | | |
190 | | template <class F, class Object> |
191 | | class FunctorReactorTaskWithWeakPtr : public ReactorTask { |
192 | | public: |
193 | | FunctorReactorTaskWithWeakPtr(const F& f, const std::weak_ptr<Object>& ptr, |
194 | | const SourceLocation& source_location) |
195 | 618k | : ReactorTask(source_location), f_(f), ptr_(ptr) {} yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor*&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>::FunctorReactorTaskWithWeakPtr(std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor*&, yb::rpc::Connection*, yb::Status&> const&, std::__1::weak_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&) Line | Count | Source | 195 | 116 | : ReactorTask(source_location), f_(f), ptr_(ptr) {} |
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor* const&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>::FunctorReactorTaskWithWeakPtr(std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor* const&, yb::rpc::Connection*, yb::Status&> const&, std::__1::weak_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&) Line | Count | Source | 195 | 1.47k | : ReactorTask(source_location), f_(f), ptr_(ptr) {} |
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Connection::*)(), yb::rpc::Connection*>, yb::rpc::Connection>::FunctorReactorTaskWithWeakPtr(std::__1::__bind<void (yb::rpc::Connection::*)(), yb::rpc::Connection*> const&, std::__1::weak_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&) Line | Count | Source | 195 | 615k | : ReactorTask(source_location), f_(f), ptr_(ptr) {} |
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::ConnectionContextWithQueue::*)(yb::rpc::Connection*), yb::rpc::ConnectionContextWithQueue*, yb::rpc::Connection*>, yb::rpc::Connection>::FunctorReactorTaskWithWeakPtr(std::__1::__bind<void (yb::rpc::ConnectionContextWithQueue::*)(yb::rpc::Connection*), yb::rpc::ConnectionContextWithQueue*, yb::rpc::Connection*> const&, std::__1::weak_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&) Line | Count | Source | 195 | 1.51k | : ReactorTask(source_location), f_(f), ptr_(ptr) {} |
|
196 | | |
197 | 78.4M | void Run(Reactor* reactor) override { |
198 | 78.4M | auto shared_ptr = ptr_.lock(); |
199 | 78.4M | if (shared_ptr) { |
200 | 78.4M | f_(reactor); |
201 | 78.4M | } |
202 | 78.4M | } yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor*&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>::Run(yb::rpc::Reactor*) Line | Count | Source | 197 | 116 | void Run(Reactor* reactor) override { | 198 | 116 | auto shared_ptr = ptr_.lock(); | 199 | 116 | if (shared_ptr) { | 200 | 116 | f_(reactor); | 201 | 116 | } | 202 | 116 | } |
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor* const&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>::Run(yb::rpc::Reactor*) Line | Count | Source | 197 | 1.47k | void Run(Reactor* reactor) override { | 198 | 1.47k | auto shared_ptr = ptr_.lock(); | 199 | 1.47k | if (shared_ptr) { | 200 | 1.18k | f_(reactor); | 201 | 1.18k | } | 202 | 1.47k | } |
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::Connection::*)(), yb::rpc::Connection*>, yb::rpc::Connection>::Run(yb::rpc::Reactor*) Line | Count | Source | 197 | 78.2M | void Run(Reactor* reactor) override { | 198 | 78.2M | auto shared_ptr = ptr_.lock(); | 199 | 78.2M | if (shared_ptr) { | 200 | 78.2M | f_(reactor); | 201 | 78.2M | } | 202 | 78.2M | } |
yb::rpc::FunctorReactorTaskWithWeakPtr<std::__1::__bind<void (yb::rpc::ConnectionContextWithQueue::*)(yb::rpc::Connection*), yb::rpc::ConnectionContextWithQueue*, yb::rpc::Connection*>, yb::rpc::Connection>::Run(yb::rpc::Reactor*) Line | Count | Source | 197 | 209k | void Run(Reactor* reactor) override { | 198 | 209k | auto shared_ptr = ptr_.lock(); | 199 | 209k | if (shared_ptr) { | 200 | 209k | f_(reactor); | 201 | 209k | } | 202 | 209k | } |
|
203 | | |
204 | | private: |
205 | | F f_; |
206 | | std::weak_ptr<Object> ptr_; |
207 | | }; |
208 | | |
209 | | template <class F, class Object> |
210 | | ReactorTaskPtr MakeFunctorReactorTask( |
211 | | const F& f, const std::weak_ptr<Object>& ptr, |
212 | | const SourceLocation& source_location) { |
213 | | return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location); |
214 | | } |
215 | | |
216 | | template <class F, class Object> |
217 | | ReactorTaskPtr MakeFunctorReactorTask( |
218 | 620k | const F& f, const std::shared_ptr<Object>& ptr, const SourceLocation& source_location) { |
219 | 620k | return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location); |
220 | 620k | } std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor*&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>(std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor*&, yb::rpc::Connection*, yb::Status&> const&, std::__1::shared_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&) Line | Count | Source | 218 | 116 | const F& f, const std::shared_ptr<Object>& ptr, const SourceLocation& source_location) { | 219 | 116 | return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location); | 220 | 116 | } |
std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor* const&, yb::rpc::Connection*, yb::Status&>, yb::rpc::Connection>(std::__1::__bind<void (yb::rpc::Reactor::*)(yb::rpc::Connection*, yb::Status const&), yb::rpc::Reactor* const&, yb::rpc::Connection*, yb::Status&> const&, std::__1::shared_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&) Line | Count | Source | 218 | 1.47k | const F& f, const std::shared_ptr<Object>& ptr, const SourceLocation& source_location) { | 219 | 1.47k | return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location); | 220 | 1.47k | } |
std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<std::__1::__bind<void (yb::rpc::Connection::*)(), yb::rpc::Connection*>, yb::rpc::Connection>(std::__1::__bind<void (yb::rpc::Connection::*)(), yb::rpc::Connection*> const&, std::__1::shared_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&) Line | Count | Source | 218 | 616k | const F& f, const std::shared_ptr<Object>& ptr, const SourceLocation& source_location) { | 219 | 616k | return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location); | 220 | 616k | } |
std::__1::shared_ptr<yb::rpc::ReactorTask> yb::rpc::MakeFunctorReactorTask<std::__1::__bind<void (yb::rpc::ConnectionContextWithQueue::*)(yb::rpc::Connection*), yb::rpc::ConnectionContextWithQueue*, yb::rpc::Connection*>, yb::rpc::Connection>(std::__1::__bind<void (yb::rpc::ConnectionContextWithQueue::*)(yb::rpc::Connection*), yb::rpc::ConnectionContextWithQueue*, yb::rpc::Connection*> const&, std::__1::shared_ptr<yb::rpc::Connection> const&, yb::SourceLocation const&) Line | Count | Source | 218 | 1.51k | const F& f, const std::shared_ptr<Object>& ptr, const SourceLocation& source_location) { | 219 | 1.51k | return std::make_shared<FunctorReactorTaskWithWeakPtr<F, Object>>(f, ptr, source_location); | 220 | 1.51k | } |
|
221 | | |
222 | | YB_DEFINE_ENUM(MarkAsDoneResult, |
223 | | // Successfully marked as done with this call to MarkAsDone. |
224 | | (kSuccess) |
225 | | // Task already marked as done by another caller to MarkAsDone. |
226 | | (kAlreadyDone) |
227 | | // We've switched the done_ flag to true, but the task is not scheduled on a reactor thread and |
228 | | // reactor_ is nullptr. Next calls to MarkAsDone will return kAlreadyDone. |
229 | | (kNotScheduled)) |
230 | | |
231 | | // A ReactorTask that is scheduled to run at some point in the future. |
232 | | // |
233 | | // Semantically it works like RunFunctionTask with a few key differences: |
234 | | // 1. The user function is called during Abort. Put another way, the user function is _always_ |
235 | | // invoked, even during reactor shutdown. |
236 | | // 2. To differentiate between Abort and non-Abort, the user function receives a Status as its first |
237 | | // argument. |
238 | | class DelayedTask : public ReactorTask { |
239 | | public: |
240 | | DelayedTask(StatusFunctor func, MonoDelta when, int64_t id, |
241 | | const SourceLocation& source_location, Messenger* messenger); |
242 | | |
243 | | // Schedules the task for running later but doesn't actually run it yet. |
244 | | void Run(Reactor* reactor) override; |
245 | | |
246 | | // Could be called from non-reactor thread even before reactor thread shutdown. |
247 | | void AbortTask(const Status& abort_status); |
248 | | |
249 | | std::string ToString() const override; |
250 | | |
251 | 1 | std::string LogPrefix() const { |
252 | 1 | return ToString() + ": "; |
253 | 1 | } |
254 | | |
255 | | private: |
256 | | void DoAbort(const Status& abort_status) override; |
257 | | |
258 | | // Marks the task as done. Returns kSuccess, kAlreadyDone or kNotScheduled; see MarkAsDoneResult. |
259 | | MarkAsDoneResult MarkAsDone(); |
260 | | |
261 | | // libev callback for when the registered timer fires. |
262 | | void TimerHandler(ev::timer& rwatcher, int revents); // NOLINT |
263 | | |
264 | | // User function to invoke when timer fires or when task is aborted. |
265 | | StatusFunctor func_; |
266 | | |
267 | | // Delay to apply to this task. |
268 | | const MonoDelta when_; |
269 | | |
270 | | // Link back to registering reactor thread. |
271 | | Reactor* reactor_ = nullptr; |
272 | | |
273 | | // libev timer. Set when Run() is invoked. |
274 | | ev::timer timer_; |
275 | | |
276 | | // This task's id. |
277 | | const int64_t id_; |
278 | | |
279 | | Messenger* const messenger_; |
280 | | |
281 | | // Set to true whenever the Run or Abort method is called. |
282 | | // Guarded by lock_. |
283 | | bool done_ = false; |
284 | | |
285 | | typedef simple_spinlock LockType; |
286 | | mutable LockType lock_; |
287 | | }; |
288 | | |
289 | | typedef std::vector<ReactorTaskPtr> ReactorTasks; |
290 | | |
291 | | YB_DEFINE_ENUM(ReactorState, (kRunning)(kClosing)(kClosed)); |
292 | | |
293 | | class Reactor { |
294 | | public: |
295 | | // Client-side connection map. |
296 | | typedef std::unordered_map<const ConnectionId, ConnectionPtr, ConnectionIdHash> ConnectionMap; |
297 | | |
298 | | Reactor(Messenger* messenger, |
299 | | int index, |
300 | | const MessengerBuilder &bld); |
301 | | |
302 | | ~Reactor(); |
303 | | |
304 | | Reactor(const Reactor&) = delete; |
305 | | void operator=(const Reactor&) = delete; |
306 | | |
307 | | // This may be called from another thread. |
308 | | CHECKED_STATUS Init(); |
309 | | |
310 | | // Add any connections on this reactor thread into the given status dump. |
311 | | // May be called from another thread. |
312 | | CHECKED_STATUS DumpRunningRpcs( |
313 | | const DumpRunningRpcsRequestPB& req, DumpRunningRpcsResponsePB* resp); |
314 | | |
315 | | // Block until the Reactor thread is shut down. |
316 | | // |
317 | | // This must be called from another thread. |
318 | | void Shutdown(); |
319 | | |
320 | | // This method is thread-safe. |
321 | | void WakeThread(); |
322 | | |
323 | | // libev callback for handling async notifications in our reactor thread. |
324 | | void AsyncHandler(ev::async &watcher, int revents); // NOLINT |
325 | | |
326 | | // libev callback for handling timer events in our reactor thread. |
327 | | void TimerHandler(ev::timer &watcher, int revents); // NOLINT |
328 | | |
329 | | // This may be called from another thread. |
330 | 0 | const std::string &name() const { return name_; } |
331 | | |
332 | 166k | const std::string& LogPrefix() { return log_prefix_; } |
333 | | |
334 | 87.5M | Messenger *messenger() const { return messenger_; } |
335 | | |
336 | 1.64G | CoarseTimePoint cur_time() const { return cur_time_; } |
337 | | |
338 | | // Drop all connections with remote address. Used in tests with broken connectivity. |
339 | | void DropIncomingWithRemoteAddress(const IpAddress& address); |
340 | | void DropOutgoingWithRemoteAddress(const IpAddress& address); |
341 | | void DropWithRemoteAddress(const IpAddress& address); |
342 | | |
343 | | // Return true if this reactor thread is the thread currently |
344 | | // running. Should be used in DCHECK assertions. |
345 | | bool IsCurrentThread() const; |
346 | | |
347 | | // Return true if this reactor thread is the thread currently running, or the reactor is closing. |
348 | | // This is the condition under which the Abort method can be called for tasks. Should be used in |
349 | | // DCHECK assertions. |
350 | | bool IsCurrentThreadOrStartedClosing() const; |
351 | | |
352 | | // Shut down the given connection, removing it from the connection tracking |
353 | | // structures of this reactor. |
354 | | // |
355 | | // The connection is not explicitly deleted -- shared_ptr reference counting |
356 | | // may hold on to the object after this, but callers should assume that it |
357 | | // _may_ be deleted by this call. |
358 | | void DestroyConnection(Connection *conn, const Status &conn_status); |
359 | | |
360 | | // Queue a new call to be sent. If the reactor is already shut down, marks |
361 | | // the call as failed. |
362 | | void QueueOutboundCall(OutboundCallPtr call); |
363 | | |
364 | | // Collect metrics. |
365 | | // Must be called from the reactor thread. |
366 | | CHECKED_STATUS GetMetrics(ReactorMetrics *metrics); |
367 | | |
368 | | void Join(); |
369 | | |
370 | | // Queues a server event on all the connections, such that every client receives it. |
371 | | void QueueEventOnAllConnections( |
372 | | ServerEventListPtr server_event, const SourceLocation& source_location); |
373 | | |
374 | | // Queue a new incoming connection. Takes ownership of the underlying fd from |
375 | | // 'socket', but not the Socket object itself. |
376 | | // If the reactor is already shut down, takes care of closing the socket. |
377 | | void RegisterInboundSocket( |
378 | | Socket *socket, size_t receive_buffer_size, const Endpoint& remote, |
379 | | const ConnectionContextFactoryPtr& factory); |
380 | | |
381 | | // Schedule the given task's Run() method to be called on the reactor thread. If the reactor shuts |
382 | | // down before it is run, the Abort method will be called. |
383 | | // Returns true if task was scheduled. |
384 | 160M | MUST_USE_RESULT bool ScheduleReactorTask(ReactorTaskPtr task) { |
385 | 160M | return ScheduleReactorTask(std::move(task), false /* schedule_even_closing */); |
386 | 160M | } |
387 | | |
388 | | template<class F> |
389 | 1.00M | bool ScheduleReactorFunctor(const F& f, const SourceLocation& source_location) { |
390 | 1.00M | return ScheduleReactorTask(MakeFunctorReactorTask(f, source_location)); |
391 | 1.00M | } reactor.cc:bool yb::rpc::Reactor::ScheduleReactorFunctor<yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2>(yb::rpc::Reactor::QueueEventOnAllConnections(std::__1::shared_ptr<yb::rpc::ServerEventList>, yb::SourceLocation const&)::$_2 const&, yb::SourceLocation const&) Line | Count | Source | 389 | 370k | bool ScheduleReactorFunctor(const F& f, const SourceLocation& source_location) { | 390 | 370k | return ScheduleReactorTask(MakeFunctorReactorTask(f, source_location)); | 391 | 370k | } |
reactor.cc:bool yb::rpc::Reactor::ScheduleReactorFunctor<yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5>(yb::rpc::DelayedTask::AbortTask(yb::Status const&)::$_5 const&, yb::SourceLocation const&) Line | Count | Source | 389 | 864 | bool ScheduleReactorFunctor(const F& f, const SourceLocation& source_location) { | 390 | 864 | return ScheduleReactorTask(MakeFunctorReactorTask(f, source_location)); | 391 | 864 | } |
reactor.cc:bool yb::rpc::Reactor::ScheduleReactorFunctor<yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6>(yb::rpc::Reactor::RegisterInboundSocket(yb::Socket*, unsigned long, boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> const&, std::__1::shared_ptr<yb::rpc::ConnectionContextFactory> const&)::$_6 const&, yb::SourceLocation const&) Line | Count | Source | 389 | 628k | bool ScheduleReactorFunctor(const F& f, const SourceLocation& source_location) { | 390 | 628k | return ScheduleReactorTask(MakeFunctorReactorTask(f, source_location)); | 391 | 628k | } |
rpc_context.cc:bool yb::rpc::Reactor::ScheduleReactorFunctor<yb::rpc::RpcContext::CloseConnection()::$_0>(yb::rpc::RpcContext::CloseConnection()::$_0 const&, yb::SourceLocation const&) Line | Count | Source | 389 | 5.14k | bool ScheduleReactorFunctor(const F& f, const SourceLocation& source_location) { | 390 | 5.14k | return ScheduleReactorTask(MakeFunctorReactorTask(f, source_location)); | 391 | 5.14k | } |
|
392 | | |
// Returns the current reactor state. The value is read from the atomic state_ with
// std::memory_order_acquire; no lock is taken here.
393 | 10.2M | ReactorState state() { |
394 | 10.2M | return state_.load(std::memory_order_acquire); |
395 | 10.2M | } |
396 | | |
397 | | private: |
398 | | friend class Connection; |
399 | | friend class AssignOutboundCallTask; |
400 | | friend class DelayedTask; |
401 | | |
402 | | // Run the main event loop of the reactor. |
403 | | void RunThread(); |
404 | | |
405 | | MUST_USE_RESULT bool ScheduleReactorTask(ReactorTaskPtr task, bool schedule_even_closing); |
406 | | |
407 | | // Find or create a new connection to the given remote. |
408 | | // If such a connection already exists, returns that, otherwise creates a new one. |
409 | | // May return a bad Status if the connect() call fails. |
410 | | // The resulting connection object is managed internally by the reactor thread. |
411 | | // Deadline specifies the latest time allowed for initializing the connection. |
412 | | CHECKED_STATUS FindOrStartConnection(const ConnectionId &conn_id, |
413 | | const std::string& hostname, |
414 | | const MonoTime &deadline, |
415 | | ConnectionPtr* conn); |
416 | | |
417 | | // Scan open connections for ones that have been idle for longer than |
418 | | // connection_keepalive_time_. |
419 | | void ScanIdleConnections(); |
420 | | |
421 | | // Assign a new outbound call to the appropriate connection object. |
422 | | // If this fails, the call is marked failed and completed. |
423 | | ConnectionPtr AssignOutboundCall(const OutboundCallPtr &call); |
424 | | |
425 | | // Register a new connection. |
426 | | void RegisterConnection(const ConnectionPtr& conn); |
427 | | |
428 | | // Actually perform shutdown of the thread, tearing down any connections, |
429 | | // etc. This is called from within the thread. |
430 | | void ShutdownInternal(); |
431 | | |
432 | | void ProcessOutboundQueue(); |
433 | | |
434 | | void CheckReadyToStop(); |
435 | | |
436 | | // Drains the pending_tasks_ queue into async_handler_tasks_. Returns true if the reactor is |
437 | | // closing. |
438 | | bool DrainTaskQueueAndCheckIfClosing(); |
439 | | |
440 | | template<class F> |
441 | | CHECKED_STATUS RunOnReactorThread(const F& f, const SourceLocation& source_location); |
442 | | |
443 | | void ShutdownConnection(const ConnectionPtr& conn); |
444 | | |
445 | | // Parent messenger. |
446 | | Messenger* const messenger_; |
447 | | |
448 | | const std::string name_; |
449 | | |
450 | | const std::string log_prefix_; |
451 | | |
452 | | mutable simple_spinlock pending_tasks_mtx_; |
453 | | |
454 | | // Reactor status, mostly used when shutting down. Guarded by pending_tasks_mtx_, but also read |
455 | | // without a lock for sanity checking. |
456 | | std::atomic<ReactorState> state_{ReactorState::kRunning}; |
457 | | |
458 | | // Once the reactor has shut down, multiple threads may end up running Abort(). This mutex |
459 | | // makes sure those threads do not have data races when accessing data that is normally only |
460 | | // accessed from the reactor thread. We are using a recursive mutex because Abort() could try |
461 | | // to submit another reactor task, which will result in Abort() being called on that other |
462 | | // task as well. |
463 | | std::recursive_mutex final_abort_mutex_; |
464 | | |
465 | | // Tasks to be run within the reactor thread. |
466 | | // Guarded by pending_tasks_mtx_. |
467 | | ReactorTasks pending_tasks_; |
468 | | |
469 | | scoped_refptr<yb::Thread> thread_; |
470 | | |
471 | | // Our epoll object (or kqueue, etc.). |
472 | | ev::dynamic_loop loop_; |
473 | | |
474 | | // Used by other threads to notify the reactor thread. |
475 | | ev::async async_; |
476 | | |
477 | | // Handles the periodic timer. |
478 | | ev::timer timer_; |
479 | | |
480 | | // Scheduled (but not yet run) delayed tasks. |
481 | | std::set<std::shared_ptr<DelayedTask>> scheduled_tasks_; |
482 | | |
483 | | ReactorTasks async_handler_tasks_; |
484 | | |
485 | | // The current monotonic time. Updated every coarse_timer_granularity_. |
486 | | CoarseTimePoint cur_time_; |
487 | | |
488 | | // Last time we did TCP timeouts. |
489 | | CoarseTimePoint last_unused_tcp_scan_; |
490 | | |
491 | | // Map of sockaddrs to Connection objects for outbound (client) connections. |
492 | | ConnectionMap client_conns_; |
493 | | |
494 | | // List of current connections coming into the server. |
495 | | ConnectionList server_conns_; |
496 | | |
497 | | // Set of connections that should be completed before we can stop this thread. |
498 | | std::unordered_set<ConnectionPtr> waiting_conns_; |
499 | | |
500 | | // If a connection has been idle for this much time, it is torn down. |
501 | | CoarseMonoClock::Duration connection_keepalive_time_; |
502 | | |
503 | | // Scan for idle connections at this granularity. |
504 | | CoarseMonoClock::Duration coarse_timer_granularity_; |
505 | | |
506 | | simple_spinlock outbound_queue_lock_; |
507 | | bool outbound_queue_stopped_ = false; |
508 | | |
509 | | // We found that we should shut down, but not all connections are ready for it. Only accessed in |
510 | | // the reactor thread. |
511 | | bool stopping_ = false; |
512 | | |
513 | | CoarseTimePoint stop_start_time_; |
514 | | |
515 | | std::vector<OutboundCallPtr> outbound_queue_; |
516 | | |
517 | | // Outbound calls currently being processed. Only accessed on the reactor thread. Could be a local |
518 | | // variable, but implemented as a member field as an optimization to avoid memory allocation. |
519 | | std::vector<OutboundCallPtr> processing_outbound_queue_; |
520 | | |
521 | | std::vector<ConnectionPtr> processing_connections_; |
522 | | ReactorTaskPtr process_outbound_queue_task_; |
523 | | |
524 | | // Number of outbound connections to create for each destination server address. |
525 | | int num_connections_to_server_; |
526 | | }; |
527 | | |
528 | | } // namespace rpc |
529 | | } // namespace yb |
530 | | |
531 | | #endif // YB_RPC_REACTOR_H_ |