/Users/deen/code/yugabyte-db/src/yb/rpc/reactor.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/reactor.h" |
34 | | |
35 | | #include <netinet/in.h> |
36 | | #include <stdlib.h> |
37 | | #include <sys/types.h> |
38 | | |
39 | | #include <functional> |
40 | | #include <map> |
41 | | #include <mutex> |
42 | | #include <set> |
43 | | #include <string> |
44 | | |
45 | | #include <boost/preprocessor/cat.hpp> |
46 | | #include <boost/preprocessor/stringize.hpp> |
47 | | #include <ev++.h> |
48 | | #include <glog/logging.h> |
49 | | |
50 | | #include "yb/gutil/ref_counted.h" |
51 | | #include "yb/gutil/stringprintf.h" |
52 | | |
53 | | #include "yb/rpc/connection.h" |
54 | | #include "yb/rpc/connection_context.h" |
55 | | #include "yb/rpc/messenger.h" |
56 | | #include "yb/rpc/rpc_controller.h" |
57 | | #include "yb/rpc/rpc_introspection.pb.h" |
58 | | #include "yb/rpc/server_event.h" |
59 | | |
60 | | #include "yb/util/atomic.h" |
61 | | #include "yb/util/countdown_latch.h" |
62 | | #include "yb/util/errno.h" |
63 | | #include "yb/util/format.h" |
64 | | #include "yb/util/logging.h" |
65 | | #include "yb/util/memory/memory.h" |
66 | | #include "yb/util/metric_entity.h" |
67 | | #include "yb/util/monotime.h" |
68 | | #include "yb/util/net/socket.h" |
69 | | #include "yb/util/scope_exit.h" |
70 | | #include "yb/util/size_literals.h" |
71 | | #include "yb/util/status.h" |
72 | | #include "yb/util/status_format.h" |
73 | | #include "yb/util/status_log.h" |
74 | | #include "yb/util/thread.h" |
75 | | #include "yb/util/thread_restrictions.h" |
76 | | #include "yb/util/trace.h" |
77 | | |
78 | | using namespace std::literals; |
79 | | |
80 | | DEFINE_uint64(rpc_read_buffer_size, 0, |
81 | | "RPC connection read buffer size. 0 to auto detect."); |
82 | | DECLARE_string(local_ip_for_outbound_sockets); |
83 | | DECLARE_int32(num_connections_to_server); |
84 | | DECLARE_int32(socket_receive_buffer_size); |
85 | | |
86 | | namespace yb { |
87 | | namespace rpc { |
88 | | |
89 | | namespace { |
90 | | |
91 | | static const char* kShutdownMessage = "Shutdown connection"; |
92 | | |
93 | 24.6k | const Status& AbortedError() { |
94 | 24.6k | static Status result = STATUS(Aborted, kShutdownMessage, "" /* msg2 */, Errno(ESHUTDOWN)); |
95 | 24.6k | return result; |
96 | 24.6k | } |
97 | | |
98 | 3.10M | const Status& ServiceUnavailableError() { |
99 | 3.10M | static Status result = STATUS( |
100 | 3.10M | ServiceUnavailable, kShutdownMessage, "" /* msg2 */, Errno(ESHUTDOWN)); |
101 | 3.10M | return result; |
102 | 3.10M | } |
103 | | |
104 | | // Callback for libev fatal errors (eg running out of file descriptors). |
105 | | // Unfortunately libev doesn't plumb these back through to the caller, but |
106 | | // instead just expects the callback to abort. |
107 | | // |
108 | | // This implementation is slightly preferable to the built-in one since |
109 | | // it uses a FATAL log message instead of printing to stderr, which might |
110 | | // not end up anywhere useful in a daemonized context. |
111 | 357 | void LibevSysErr(const char* msg) noexcept { |
112 | 357 | PLOG(FATAL) << "LibEV fatal error: " << msg; |
113 | 357 | } |
114 | | |
115 | 22.5k | void DoInitLibEv() { |
116 | 22.5k | ev::set_syserr_cb(LibevSysErr); |
117 | 22.5k | } |
118 | | |
119 | 321M | bool HasReactorStartedClosing(ReactorState state) { |
120 | 321M | return state == ReactorState::kClosing || state == ReactorState::kClosed321M ; |
121 | 321M | } |
122 | | |
123 | 3.73M | size_t PatchReceiveBufferSize(size_t receive_buffer_size) { |
124 | 3.73M | return std::max<size_t>( |
125 | 3.73M | 64_KB, FLAGS_rpc_read_buffer_size ? FLAGS_rpc_read_buffer_size10 : receive_buffer_size3.73M ); |
126 | 3.73M | } |
127 | | |
128 | | } // anonymous namespace |
129 | | |
130 | | // ------------------------------------------------------------------------------------------------ |
131 | | // Reactor class members |
132 | | // ------------------------------------------------------------------------------------------------ |
133 | | |
134 | | Reactor::Reactor(Messenger* messenger, |
135 | | int index, |
136 | | const MessengerBuilder &bld) |
137 | | : messenger_(messenger), |
138 | | name_(StringPrintf("%s_R%03d", messenger->name().c_str(), index)), |
139 | | log_prefix_(name_ + ": "), |
140 | | loop_(kDefaultLibEvFlags), |
141 | | cur_time_(CoarseMonoClock::Now()), |
142 | | last_unused_tcp_scan_(cur_time_), |
143 | | connection_keepalive_time_(bld.connection_keepalive_time()), |
144 | | coarse_timer_granularity_(bld.coarse_timer_granularity()), |
145 | 289k | num_connections_to_server_(bld.num_connections_to_server()) { |
146 | 289k | static std::once_flag libev_once; |
147 | 289k | std::call_once(libev_once, DoInitLibEv); |
148 | | |
149 | 289k | VLOG_WITH_PREFIX478 (1) << "Create reactor with keep alive_time: " |
150 | 478 | << yb::ToString(connection_keepalive_time_) |
151 | 478 | << ", coarse timer granularity: " << yb::ToString(coarse_timer_granularity_); |
152 | | |
153 | 289k | process_outbound_queue_task_ = |
154 | 289k | MakeFunctorReactorTask(std::bind(&Reactor::ProcessOutboundQueue, this), SOURCE_LOCATION()); |
155 | 289k | } |
156 | | |
157 | 25.1k | Reactor::~Reactor() { |
158 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, !pending_tasks_.empty()) |
159 | 18.4E | << "Not empty pending tasks when destroyed reactor: " << yb::ToString(pending_tasks_); |
160 | 25.1k | } |
161 | | |
162 | 284k | Status Reactor::Init() { |
163 | 284k | DCHECK(thread_.get() == nullptr) << "Already started"5 ; |
164 | 18.4E | DVLOG_WITH_PREFIX(6) << "Called Reactor::Init()"; |
165 | | // Register to get async notifications in our epoll loop. |
166 | 284k | async_.set(loop_); |
167 | 284k | async_.set<Reactor, &Reactor::AsyncHandler>(this); |
168 | 284k | async_.start(); |
169 | | |
170 | | // Register the timer watcher. |
171 | | // The timer is used for closing old TCP connections and applying |
172 | | // backpressure. |
173 | 284k | timer_.set(loop_); |
174 | 284k | timer_.set<Reactor, &Reactor::TimerHandler>(this); |
175 | 284k | timer_.start(ToSeconds(coarse_timer_granularity_), |
176 | 284k | ToSeconds(coarse_timer_granularity_)); |
177 | | |
178 | | // Create Reactor thread. |
179 | 284k | const std::string group_name = messenger_->name() + "_reactor"; |
180 | 284k | return yb::Thread::Create(group_name, group_name, &Reactor::RunThread, this, &thread_); |
181 | 284k | } |
182 | | |
183 | 24.7k | void Reactor::Shutdown() { |
184 | 24.7k | ReactorState old_state = ReactorState::kRunning; |
185 | 49.5k | do { |
186 | 49.5k | if (state_.compare_exchange_weak(old_state, |
187 | 49.5k | ReactorState::kClosing, |
188 | 49.5k | std::memory_order_acq_rel)) { |
189 | 18.4E | VLOG_WITH_PREFIX(1) << "shutting down Reactor thread."; |
190 | 24.7k | WakeThread(); |
191 | 24.7k | } |
192 | 49.5k | } while (!HasReactorStartedClosing(old_state)); |
193 | | |
194 | | // Another thread already switched the state to closing before us. |
195 | 24.7k | } |
196 | | |
197 | 3.10M | void Reactor::ShutdownConnection(const ConnectionPtr& conn) { |
198 | 3.10M | DCHECK(IsCurrentThread()); |
199 | | |
200 | 3.10M | VLOG_WITH_PREFIX3.41k (1) << "shutting down " << conn->ToString()3.41k ; |
201 | 3.10M | conn->Shutdown(ServiceUnavailableError()); |
202 | 3.10M | if (!conn->context().Idle()) { |
203 | 18.4E | VLOG_WITH_PREFIX(1) << "connection is not idle: " << conn->ToString(); |
204 | 3.01k | std::weak_ptr<Connection> weak_conn(conn); |
205 | 3.01k | conn->context().ListenIdle([this, weak_conn]() { |
206 | 2.37k | DCHECK(IsCurrentThreadOrStartedClosing()); |
207 | 2.37k | auto conn = weak_conn.lock(); |
208 | 2.37k | if (conn) { |
209 | 2.37k | VLOG_WITH_PREFIX0 (1) << "connection became idle " << conn->ToString()0 ; |
210 | | // The access to waiting_conns_ is safe here, because this code can only be called on the |
211 | | // reactor thread or when holding final_abort_mutex_ during shutdown. |
212 | 2.37k | waiting_conns_.erase(conn); |
213 | 2.37k | } |
214 | 2.37k | }); |
215 | 3.01k | waiting_conns_.insert(conn); |
216 | 3.10M | } else { |
217 | 3.10M | VLOG_WITH_PREFIX9.71k (1) << "connection is idle: " << conn->ToString()9.71k ; |
218 | 3.10M | } |
219 | 3.10M | } |
220 | | |
221 | 24.6k | void Reactor::ShutdownInternal() { |
222 | 24.6k | DCHECK(IsCurrentThread()); |
223 | | |
224 | 24.6k | stopping_ = true; |
225 | 24.6k | stop_start_time_ = CoarseMonoClock::Now(); |
226 | | |
227 | | // Tear down any outbound TCP connections. |
228 | 24.6k | VLOG_WITH_PREFIX50 (1) << "tearing down outbound TCP connections..."50 ; |
229 | 24.6k | decltype(client_conns_) client_conns = std::move(client_conns_); |
230 | 24.6k | for (auto& pair : client_conns) { |
231 | 22.3k | ShutdownConnection(pair.second); |
232 | 22.3k | } |
233 | 24.6k | client_conns.clear(); |
234 | | |
235 | | // Tear down any inbound TCP connections. |
236 | 24.6k | VLOG_WITH_PREFIX47 (1) << "tearing down inbound TCP connections..."47 ; |
237 | 24.6k | for (const ConnectionPtr& conn : server_conns_) { |
238 | 778 | ShutdownConnection(conn); |
239 | 778 | } |
240 | 24.6k | server_conns_.clear(); |
241 | | |
242 | | // Abort any scheduled tasks. |
243 | | // |
244 | | // These won't be found in the Reactor's list of pending tasks |
245 | | // because they've been "run" (that is, they've been scheduled). |
246 | 24.6k | VLOG_WITH_PREFIX88 (1) << "aborting scheduled tasks"88 ; |
247 | 24.6k | Status aborted = AbortedError(); |
248 | 24.6k | for (const auto& task : scheduled_tasks_) { |
249 | 890 | task->Abort(aborted); |
250 | 890 | } |
251 | 24.6k | scheduled_tasks_.clear(); |
252 | | |
253 | | // async_handler_tasks_ are the tasks added by ScheduleReactorTask. |
254 | 24.6k | VLOG_WITH_PREFIX232 (1) << "aborting async handler tasks"232 ; |
255 | 24.6k | for (const auto& task : async_handler_tasks_) { |
256 | 1 | task->Abort(aborted); |
257 | 1 | } |
258 | | |
259 | 24.6k | VLOG_WITH_PREFIX254 (1) << "aborting outbound calls"254 ; |
260 | 24.6k | CHECK(processing_outbound_queue_.empty()) << yb::ToString(processing_outbound_queue_)76 ; |
261 | 24.6k | { |
262 | 24.6k | std::lock_guard<simple_spinlock> lock(outbound_queue_lock_); |
263 | 24.6k | outbound_queue_stopped_ = true; |
264 | 24.6k | outbound_queue_.swap(processing_outbound_queue_); |
265 | 24.6k | } |
266 | 24.6k | for (auto& call : processing_outbound_queue_) { |
267 | 2 | call->Transferred(aborted, nullptr); |
268 | 2 | } |
269 | 24.6k | processing_outbound_queue_.clear(); |
270 | 24.6k | } |
271 | | |
272 | 8 | Status Reactor::GetMetrics(ReactorMetrics *metrics) { |
273 | 8 | return RunOnReactorThread([metrics](Reactor* reactor) { |
274 | 8 | metrics->num_client_connections = reactor->client_conns_.size(); |
275 | 8 | metrics->num_server_connections = reactor->server_conns_.size(); |
276 | 8 | return Status::OK(); |
277 | 8 | }, SOURCE_LOCATION()); |
278 | 8 | } |
279 | | |
280 | 24.7k | void Reactor::Join() { |
281 | 24.7k | auto join_result = ThreadJoiner(thread_.get()).give_up_after(30s).Join(); |
282 | 24.7k | if (join_result.ok()24.7k ) { |
283 | 24.7k | return; |
284 | 24.7k | } |
285 | 18.4E | if (join_result.IsInvalidArgument()) { |
286 | 0 | LOG_WITH_PREFIX(WARNING) << join_result; |
287 | 0 | return; |
288 | 0 | } |
289 | 18.4E | LOG_WITH_PREFIX(DFATAL) << "Failed to join Reactor " << thread_->ToString() << ": " |
290 | 18.4E | << join_result; |
291 | | // Fallback to endless join in release mode. |
292 | 18.4E | thread_->Join(); |
293 | 18.4E | } |
294 | | |
295 | | void Reactor::QueueEventOnAllConnections( |
296 | 370k | ServerEventListPtr server_event, const SourceLocation& source_location) { |
297 | 370k | ScheduleReactorFunctor([server_event = std::move(server_event)](Reactor* reactor) { |
298 | 365k | for (const ConnectionPtr& conn : reactor->server_conns_) { |
299 | 28.4k | conn->QueueOutboundData(server_event); |
300 | 28.4k | } |
301 | 365k | }, source_location); |
302 | 370k | } |
303 | | |
304 | | Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req, |
305 | 15 | DumpRunningRpcsResponsePB* resp) { |
306 | 15 | return RunOnReactorThread([&req, resp](Reactor* reactor) -> Status { |
307 | 15 | for (const ConnectionPtr& conn : reactor->server_conns_) { |
308 | 11 | RETURN_NOT_OK(conn->DumpPB(req, resp->add_inbound_connections())); |
309 | 11 | } |
310 | 15 | for (const auto& entry : reactor->client_conns_) { |
311 | 10 | Connection* conn = entry.second.get(); |
312 | 10 | RETURN_NOT_OK(conn->DumpPB(req, resp->add_outbound_connections())); |
313 | 10 | } |
314 | 15 | return Status::OK(); |
315 | 15 | }, SOURCE_LOCATION()); |
316 | 15 | } |
317 | | |
318 | 158M | void Reactor::WakeThread() { |
319 | 158M | async_.send(); |
320 | 158M | } |
321 | | |
322 | 24.7k | void Reactor::CheckReadyToStop() { |
323 | 24.7k | DCHECK(IsCurrentThread()); |
324 | | |
325 | 24.7k | VLOG_WITH_PREFIX149 (4) << "Check ready to stop: " << thread_->ToString() << ", " |
326 | 149 | << "waiting connections: " << yb::ToString(waiting_conns_); |
327 | | |
328 | 24.7k | if (VLOG_IS_ON(4)) { |
329 | 0 | for (const auto& conn : waiting_conns_) { |
330 | 0 | VLOG_WITH_PREFIX(4) << "Connection: " << conn->ToString() << ", idle=" << conn->Idle() |
331 | 0 | << ", why: " << conn->ReasonNotIdle(); |
332 | 0 | } |
333 | 0 | } |
334 | | |
335 | 24.7k | if (waiting_conns_.empty()) { |
336 | 18.4E | VLOG_WITH_PREFIX(4) << "Reactor ready to stop, breaking loop: " << this; |
337 | | |
338 | 18.4E | VLOG_WITH_PREFIX(2) << "Marking reactor as closed: " << thread_.get()->ToString(); |
339 | 24.4k | ReactorTasks final_tasks; |
340 | 24.4k | { |
341 | 24.4k | std::lock_guard<simple_spinlock> lock(pending_tasks_mtx_); |
342 | 24.4k | state_.store(ReactorState::kClosed, std::memory_order_release); |
343 | 24.4k | final_tasks.swap(pending_tasks_); |
344 | 24.4k | } |
345 | 18.4E | VLOG_WITH_PREFIX(2) << "Running final pending task aborts: " << thread_.get()->ToString();; |
346 | 24.4k | for (auto task : final_tasks) { |
347 | 0 | task->Abort(ServiceUnavailableError()); |
348 | 0 | } |
349 | 18.4E | VLOG_WITH_PREFIX(2) << "Breaking reactor loop: " << thread_.get()->ToString();; |
350 | 24.4k | loop_.break_loop(); // break the epoll loop and terminate the thread |
351 | 24.4k | } |
352 | 24.7k | } |
353 | | |
354 | | // Handle async events. These events are sent to the reactor by other threads that want to bring |
355 | | // something to our attention, like the fact that we're shutting down, or the fact that there is a |
356 | | // new outbound Transfer ready to send. |
357 | 158M | void Reactor::AsyncHandler(ev::async &watcher, int revents) { |
358 | 158M | VLOG_WITH_PREFIX_AND_FUNC90.9k (4) << "Events: " << revents90.9k ; |
359 | | |
360 | 158M | DCHECK(IsCurrentThread()); |
361 | | |
362 | 158M | auto se = ScopeExit([this] { |
363 | 157M | async_handler_tasks_.clear(); |
364 | 157M | }); |
365 | | |
366 | 158M | if (PREDICT_FALSE(DrainTaskQueueAndCheckIfClosing())) { |
367 | 24.6k | ShutdownInternal(); |
368 | 24.6k | CheckReadyToStop(); |
369 | 24.6k | return; |
370 | 24.6k | } |
371 | | |
372 | 160M | for (const auto &task : async_handler_tasks_)158M { |
373 | 160M | task->Run(this); |
374 | 160M | } |
375 | 158M | } |
376 | | |
377 | 622k | void Reactor::RegisterConnection(const ConnectionPtr& conn) { |
378 | 622k | DCHECK(IsCurrentThread()); |
379 | | |
380 | 622k | Status s = conn->Start(&loop_); |
381 | 622k | if (s.ok()622k ) { |
382 | 622k | server_conns_.push_back(conn); |
383 | 18.4E | } else { |
384 | 18.4E | LOG_WITH_PREFIX(WARNING) << "Failed to start connection: " << conn->ToString() << ": " << s; |
385 | 18.4E | } |
386 | 622k | } |
387 | | |
388 | 75.3M | ConnectionPtr Reactor::AssignOutboundCall(const OutboundCallPtr& call) { |
389 | 75.3M | DCHECK(IsCurrentThread()); |
390 | 75.3M | ConnectionPtr conn; |
391 | | |
392 | | // TODO: Move call deadline timeout computation into OutboundCall constructor. |
393 | 75.3M | const MonoDelta &timeout = call->controller()->timeout(); |
394 | 75.3M | MonoTime deadline; |
395 | 75.3M | if (!timeout.Initialized()) { |
396 | 10.1k | LOG_WITH_PREFIX(WARNING) << "Client call " << call->remote_method().ToString() |
397 | 10.1k | << " has no timeout set for connection id: " |
398 | 10.1k | << call->conn_id().ToString(); |
399 | 10.1k | deadline = MonoTime::Max(); |
400 | 75.3M | } else { |
401 | 75.3M | deadline = MonoTime::Now(); |
402 | 75.3M | deadline.AddDelta(timeout); |
403 | 75.3M | } |
404 | | |
405 | 75.3M | Status s = FindOrStartConnection(call->conn_id(), call->hostname(), deadline, &conn); |
406 | 75.3M | if (PREDICT_FALSE(!s.ok())) { |
407 | 4.53k | call->SetFailed(s); |
408 | 4.53k | return ConnectionPtr(); |
409 | 4.53k | } |
410 | | |
411 | 75.3M | conn->QueueOutboundCall(call); |
412 | 75.3M | return conn; |
413 | 75.3M | } |
414 | | |
415 | | // |
416 | | // Handles timer events. The periodic timer: |
417 | | // |
418 | | // 1. updates Reactor::cur_time_ |
419 | | // 2. every tcp_conn_timeo_ seconds, close down connections older than |
420 | | // tcp_conn_timeo_ seconds. |
421 | | // |
422 | 1.81G | void Reactor::TimerHandler(ev::timer &watcher, int revents) { |
423 | 1.81G | DCHECK(IsCurrentThread()); |
424 | | |
425 | 1.81G | if (EV_ERROR & revents) { |
426 | 0 | LOG_WITH_PREFIX(WARNING) << "Reactor got an error in the timer handler."; |
427 | 0 | return; |
428 | 0 | } |
429 | | |
430 | 1.81G | if (stopping_) { |
431 | 200 | CheckReadyToStop(); |
432 | 200 | return; |
433 | 200 | } |
434 | | |
435 | 1.81G | auto now = CoarseMonoClock::Now(); |
436 | 1.81G | VLOG_WITH_PREFIX4.10M (4) << "timer tick at " << ToSeconds(now.time_since_epoch())4.10M ; |
437 | 1.81G | cur_time_ = now; |
438 | | |
439 | 1.81G | ScanIdleConnections(); |
440 | 1.81G | } |
441 | | |
442 | 1.80G | void Reactor::ScanIdleConnections() { |
443 | 1.80G | DCHECK(IsCurrentThread()); |
444 | 1.80G | if (connection_keepalive_time_ == CoarseMonoClock::Duration::zero()) { |
445 | 18.4E | VLOG_WITH_PREFIX(3) << "Skipping Idle connections check since connection_keepalive_time_ = 0"; |
446 | 518M | return; |
447 | 518M | } |
448 | | |
449 | | // enforce TCP connection timeouts |
450 | 1.28G | auto c = server_conns_.begin(); |
451 | 1.28G | auto c_end = server_conns_.end(); |
452 | 1.28G | uint64_t timed_out = 0; |
453 | 2.20G | for (; c != c_end; ) { |
454 | 916M | const ConnectionPtr& conn = *c; |
455 | 916M | if (!conn->Idle()) { |
456 | 18.4E | VLOG_WITH_PREFIX(3) << "Connection " << conn->ToString() << " not idle"; |
457 | 8.08M | ++c; // TODO: clean up this loop |
458 | 8.08M | continue; |
459 | 8.08M | } |
460 | | |
461 | 908M | auto last_activity_time = conn->last_activity_time(); |
462 | 908M | auto connection_delta = cur_time_ - last_activity_time; |
463 | 908M | if (connection_delta > connection_keepalive_time_) { |
464 | 155k | conn->Shutdown(STATUS_FORMAT( |
465 | 155k | NetworkError, "Connection timed out after $0", ToSeconds(connection_delta))); |
466 | 155k | LOG_WITH_PREFIX(INFO) |
467 | 155k | << "DEBUG: Closing idle connection: " << conn->ToString() |
468 | 155k | << " - it has been idle for " << ToSeconds(connection_delta) << "s"; |
469 | 155k | VLOG(1) << "(delta: " << ToSeconds(connection_delta) |
470 | 11 | << ", current time: " << ToSeconds(cur_time_.time_since_epoch()) |
471 | 11 | << ", last activity time: " << ToSeconds(last_activity_time.time_since_epoch()) << ")"; |
472 | 155k | server_conns_.erase(c++); |
473 | 155k | ++timed_out; |
474 | 908M | } else { |
475 | 908M | ++c; |
476 | 908M | } |
477 | 908M | } |
478 | | |
479 | | // TODO: above only times out on the server side. |
480 | | // Clients may want to set their keepalive timeout as well. |
481 | | |
482 | 18.4E | VLOG_IF_WITH_PREFIX(1, timed_out > 0) << "timed out " << timed_out << " TCP connections."; |
483 | 1.28G | } |
484 | | |
485 | 5.74G | bool Reactor::IsCurrentThread() const { |
486 | 5.74G | return thread_.get() == yb::Thread::current_thread(); |
487 | 5.74G | } |
488 | | |
489 | 79.7M | bool Reactor::IsCurrentThreadOrStartedClosing() const { |
490 | 79.7M | return thread_.get() == yb::Thread::current_thread() || |
491 | 79.7M | HasReactorStartedClosing(state_.load(std::memory_order_acquire))0 ; |
492 | 79.7M | } |
493 | | |
494 | 284k | void Reactor::RunThread() { |
495 | 284k | ThreadRestrictions::SetWaitAllowed(false); |
496 | 284k | ThreadRestrictions::SetIOAllowed(false); |
497 | 18.4E | DVLOG_WITH_PREFIX(6) << "Calling Reactor::RunThread()..."; |
498 | 284k | loop_.run(/* flags */ 0); |
499 | 284k | VLOG_WITH_PREFIX259k (1) << "thread exiting."259k ; |
500 | 284k | } |
501 | | |
502 | | namespace { |
503 | | |
504 | 3.13M | Result<Socket> CreateClientSocket(const Endpoint& remote) { |
505 | 3.13M | int flags = Socket::FLAG_NONBLOCKING; |
506 | 3.13M | if (remote.address().is_v6()) { |
507 | 1 | flags |= Socket::FLAG_IPV6; |
508 | 1 | } |
509 | 3.13M | Socket socket; |
510 | 3.13M | Status status = socket.Init(flags); |
511 | 3.13M | if (status.ok()) { |
512 | 3.12M | status = socket.SetNoDelay(true); |
513 | 3.12M | } |
514 | 3.13M | LOG_IF(WARNING, !status.ok()) << "failed to create an " |
515 | 22.9k | "outbound connection because a new socket could not " |
516 | 22.9k | "be created: " << status.ToString(); |
517 | 3.13M | if (!status.ok()) |
518 | 4.30k | return status; |
519 | 3.13M | return std::move(socket); |
520 | 3.13M | } |
521 | | |
522 | | template <class... Args> |
523 | | Result<std::unique_ptr<Stream>> CreateStream( |
524 | 3.73M | const StreamFactories& factories, const Protocol* protocol, const StreamCreateData& data) { |
525 | 3.73M | auto it = factories.find(protocol); |
526 | 3.73M | if (it == factories.end()) { |
527 | 0 | return STATUS_FORMAT(NotFound, "Unknown protocol: $0", protocol); |
528 | 0 | } |
529 | 3.73M | return it->second->Create(data); |
530 | 3.73M | } |
531 | | |
532 | | } // namespace |
533 | | |
534 | | Status Reactor::FindOrStartConnection(const ConnectionId &conn_id, |
535 | | const std::string& hostname, |
536 | | const MonoTime &deadline, |
537 | 75.2M | ConnectionPtr* conn) { |
538 | 75.2M | DCHECK(IsCurrentThread()); |
539 | 75.2M | auto c = client_conns_.find(conn_id); |
540 | 75.2M | if (c != client_conns_.end()) { |
541 | 72.1M | *conn = (*c).second; |
542 | 72.1M | return Status::OK(); |
543 | 72.1M | } |
544 | | |
545 | 3.03M | if (HasReactorStartedClosing(state_.load(std::memory_order_acquire))) { |
546 | 0 | return ServiceUnavailableError(); |
547 | 0 | } |
548 | | |
549 | | // No connection to this remote. Need to create one. |
550 | 18.4E | VLOG_WITH_PREFIX(2) << "FindOrStartConnection: creating new connection for " |
551 | 18.4E | << conn_id.ToString(); |
552 | | |
553 | | // Create a new socket and start connecting to the remote. |
554 | 3.03M | auto sock = VERIFY_RESULT3.02M (CreateClientSocket(conn_id.remote()));3.02M |
555 | 3.02M | if (messenger_->has_outbound_ip_base_.load(std::memory_order_acquire) && |
556 | 3.02M | !messenger_->test_outbound_ip_base_.is_unspecified()27.5k ) { |
557 | 27.5k | auto address_bytes(messenger_->test_outbound_ip_base_.to_v4().to_bytes()); |
558 | | // Use different addresses for public/private endpoints. |
559 | | // Private addresses are even, and public are odd. |
560 | | // So if base address is "private" and destination address is "public" we will modify |
561 | | // originating address to be "public" also. |
562 | 27.5k | address_bytes[3] |= conn_id.remote().address().to_v4().to_bytes()[3] & 1; |
563 | 27.5k | boost::asio::ip::address_v4 outbound_address(address_bytes); |
564 | 27.5k | auto status = sock.SetReuseAddr(true); |
565 | 27.6k | if (status.ok()27.5k ) { |
566 | 27.6k | status = sock.Bind(Endpoint(outbound_address, 0)); |
567 | 27.6k | } |
568 | 18.4E | LOG_IF_WITH_PREFIX(WARNING, !status.ok()) << "Bind " << outbound_address << " failed: " |
569 | 18.4E | << status; |
570 | 3.00M | } else if (FLAGS_local_ip_for_outbound_sockets.empty()) { |
571 | 2.46M | auto outbound_address = conn_id.remote().address().is_v6() |
572 | 2.46M | ? messenger_->outbound_address_v6()1 |
573 | 2.46M | : messenger_->outbound_address_v4()2.46M ; |
574 | 2.46M | if (!outbound_address.is_unspecified()) { |
575 | 2.41M | auto status = sock.SetReuseAddr(true); |
576 | 2.41M | if (status.ok()2.41M ) { |
577 | 2.41M | status = sock.Bind(Endpoint(outbound_address, 0)); |
578 | 2.41M | } |
579 | 18.4E | LOG_IF_WITH_PREFIX(WARNING, !status.ok()) << "Bind " << outbound_address << " failed: " |
580 | 18.4E | << status; |
581 | 2.41M | } |
582 | 2.46M | } |
583 | | |
584 | 3.02M | if (FLAGS_socket_receive_buffer_size) { |
585 | 0 | WARN_NOT_OK(sock.SetReceiveBufferSize(FLAGS_socket_receive_buffer_size), |
586 | 0 | "Set receive buffer size failed: "); |
587 | 0 | } |
588 | | |
589 | 3.02M | auto receive_buffer_size = PatchReceiveBufferSize(VERIFY_RESULT(sock.GetReceiveBufferSize())); |
590 | | |
591 | 3.02M | auto stream = VERIFY_RESULT(CreateStream( |
592 | 3.02M | messenger_->stream_factories_, conn_id.protocol(), |
593 | 3.02M | StreamCreateData { |
594 | 3.02M | .remote = conn_id.remote(), |
595 | 3.02M | .remote_hostname = hostname, |
596 | 3.02M | .socket = &sock, |
597 | 3.02M | .receive_buffer_size = receive_buffer_size, |
598 | 3.02M | .mem_tracker = messenger_->connection_context_factory_->buffer_tracker(), |
599 | 3.02M | .metric_entity = messenger_->metric_entity(), |
600 | 3.02M | })); |
601 | 0 | auto context = messenger_->connection_context_factory_->Create(receive_buffer_size); |
602 | | |
603 | | // Register the new connection in our map. |
604 | 3.02M | auto connection = std::make_shared<Connection>( |
605 | 3.02M | this, |
606 | 3.02M | std::move(stream), |
607 | 3.02M | ConnectionDirection::CLIENT, |
608 | 3.02M | messenger()->rpc_metrics().get(), |
609 | 3.02M | std::move(context)); |
610 | | |
611 | 3.02M | RETURN_NOT_OK(connection->Start(&loop_)); |
612 | | |
613 | | // Insert into the client connection map to avoid duplicate connection requests. |
614 | 3.02M | CHECK(client_conns_.emplace(conn_id, connection).second); |
615 | | |
616 | 3.02M | conn->swap(connection); |
617 | 3.02M | return Status::OK(); |
618 | 3.02M | } |
619 | | |
620 | | namespace { |
621 | | |
622 | 54.6k | void ShutdownIfRemoteAddressIs(const ConnectionPtr& conn, const IpAddress& address) { |
623 | 54.6k | Endpoint peer = conn->remote(); |
624 | | |
625 | 54.6k | if (peer.address() != address) { |
626 | 52.3k | return; |
627 | 52.3k | } |
628 | | |
629 | 2.28k | conn->Close(); |
630 | 2.28k | LOG(INFO) << "Dropped connection: " << conn->ToString(); |
631 | 2.28k | } |
632 | | |
633 | | } // namespace |
634 | | |
635 | 0 | void Reactor::DropWithRemoteAddress(const IpAddress& address) { |
636 | 0 | DropIncomingWithRemoteAddress(address); |
637 | 0 | DropOutgoingWithRemoteAddress(address); |
638 | 0 | } |
639 | | |
640 | 347k | void Reactor::DropIncomingWithRemoteAddress(const IpAddress& address) { |
641 | 347k | DCHECK(IsCurrentThread()); |
642 | | |
643 | 347k | VLOG_WITH_PREFIX317 (1) << "Dropping Incoming connections from " << address317 ; |
644 | 347k | for (auto& conn : server_conns_) { |
645 | 2 | ShutdownIfRemoteAddressIs(conn, address); |
646 | 2 | } |
647 | 347k | } |
648 | | |
649 | 347k | void Reactor::DropOutgoingWithRemoteAddress(const IpAddress& address) { |
650 | 347k | DCHECK(IsCurrentThread()); |
651 | 18.4E | VLOG_WITH_PREFIX(1) << "Dropping Outgoing connections to " << address; |
652 | 347k | for (auto& pair : client_conns_) { |
653 | 54.6k | ShutdownIfRemoteAddressIs(pair.second, address); |
654 | 54.6k | } |
655 | 347k | } |
656 | | |
657 | 3.08M | void Reactor::DestroyConnection(Connection *conn, const Status &conn_status) { |
658 | 3.08M | DCHECK(IsCurrentThread()); |
659 | | |
660 | 3.08M | VLOG_WITH_PREFIX10.7k (3) << "DestroyConnection(" << conn->ToString() << ", " << conn_status.ToString() |
661 | 10.7k | << ")"; |
662 | | |
663 | 3.08M | ConnectionPtr retained_conn = conn->shared_from_this(); |
664 | 3.08M | conn->Shutdown(conn_status); |
665 | | |
666 | | // Unlink connection from lists. |
667 | 3.08M | if (conn->direction() == ConnectionDirection::CLIENT) { |
668 | 2.88M | bool erased = false; |
669 | 25.9M | for (int idx = 0; idx < num_connections_to_server_; idx++23.0M ) { |
670 | 23.0M | auto it = client_conns_.find(ConnectionId(conn->remote(), idx, conn->protocol())); |
671 | 23.0M | if (it != client_conns_.end() && it->second.get() == conn2.89M ) { |
672 | 2.87M | client_conns_.erase(it); |
673 | 2.87M | erased = true; |
674 | 2.87M | } |
675 | 23.0M | } |
676 | 2.88M | if (!erased) { |
677 | 0 | LOG_WITH_PREFIX(WARNING) << "Looking for " << conn->ToString(); |
678 | 0 | for (auto &p : client_conns_) { |
679 | 0 | LOG_WITH_PREFIX(WARNING) << " Client connection: " << p.first.ToString() << ", " |
680 | 0 | << p.second->ToString(); |
681 | 0 | } |
682 | 0 | } |
683 | 18.4E | CHECK(erased) << "Couldn't find connection for any index to " << conn->ToString(); |
684 | 2.88M | } else if (200k conn->direction() == ConnectionDirection::SERVER200k ) { |
685 | 198k | auto it = server_conns_.begin(); |
686 | 408k | while (it != server_conns_.end()408k ) { |
687 | 408k | if ((*it).get() == conn) { |
688 | 198k | server_conns_.erase(it); |
689 | 198k | break; |
690 | 198k | } |
691 | 209k | ++it; |
692 | 209k | } |
693 | 198k | } |
694 | | |
695 | 3.08M | ShutdownConnection(retained_conn); |
696 | 3.08M | } |
697 | | |
698 | 70.0M | void Reactor::ProcessOutboundQueue() { |
699 | 70.0M | CHECK(processing_outbound_queue_.empty()) << yb::ToString(processing_outbound_queue_)71.4k ; |
700 | 70.0M | { |
701 | 70.0M | std::lock_guard<simple_spinlock> lock(outbound_queue_lock_); |
702 | 70.0M | outbound_queue_.swap(processing_outbound_queue_); |
703 | 70.0M | } |
704 | 70.0M | if (processing_outbound_queue_.empty()) { |
705 | 0 | return; |
706 | 0 | } |
707 | | |
708 | 70.0M | processing_connections_.reserve(processing_outbound_queue_.size()); |
709 | 75.3M | for (auto& call : processing_outbound_queue_) { |
710 | 75.3M | auto conn = AssignOutboundCall(call); |
711 | 75.3M | processing_connections_.push_back(std::move(conn)); |
712 | 75.3M | } |
713 | 70.0M | processing_outbound_queue_.clear(); |
714 | | |
715 | 70.0M | std::sort(processing_connections_.begin(), processing_connections_.end()); |
716 | 70.0M | auto new_end = std::unique(processing_connections_.begin(), processing_connections_.end()); |
717 | 70.0M | processing_connections_.erase(new_end, processing_connections_.end()); |
718 | 74.3M | for (auto& conn : processing_connections_) { |
719 | 74.3M | if (conn) { |
720 | 74.1M | conn->OutboundQueued(); |
721 | 74.1M | } |
722 | 74.3M | } |
723 | 70.0M | processing_connections_.clear(); |
724 | 70.0M | } |
725 | | |
726 | 75.4M | void Reactor::QueueOutboundCall(OutboundCallPtr call) { |
727 | 75.4M | DVLOG_WITH_PREFIX91.4k (3) << "Queueing outbound call " |
728 | 91.4k | << call->ToString() << " to remote " << call->conn_id().remote(); |
729 | | |
730 | 75.4M | bool was_empty = false; |
731 | 75.4M | bool closing = false; |
732 | 75.4M | { |
733 | 75.4M | std::lock_guard<simple_spinlock> lock(outbound_queue_lock_); |
734 | 75.5M | if (!outbound_queue_stopped_75.4M ) { |
735 | 75.5M | was_empty = outbound_queue_.empty(); |
736 | 75.5M | outbound_queue_.push_back(call); |
737 | 18.4E | } else { |
738 | 18.4E | closing = true; |
739 | 18.4E | } |
740 | 75.4M | } |
741 | 75.4M | if (closing) { |
742 | 10 | call->Transferred(AbortedError(), nullptr /* conn */); |
743 | 10 | return; |
744 | 10 | } |
745 | 75.4M | if (was_empty) { |
746 | 70.1M | auto scheduled = ScheduleReactorTask(process_outbound_queue_task_); |
747 | 18.4E | LOG_IF_WITH_PREFIX(WARNING, !scheduled) << "Failed to schedule process outbound queue task"; |
748 | 70.1M | } |
749 | 75.4M | TRACE_TO(call->trace(), "Scheduled."); |
750 | 75.4M | } |
751 | | |
752 | | // ------------------------------------------------------------------------------------------------ |
753 | | // ReactorTask class members |
754 | | // ------------------------------------------------------------------------------------------------ |
755 | | |
// Records where in the code this task was created; used by ToString() for diagnostics.
ReactorTask::ReactorTask(const SourceLocation& source_location)
    : source_location_(source_location) {
}
759 | | |
// Out-of-line virtual destructor; no resources to release beyond members.
ReactorTask::~ReactorTask() {
}
762 | | |
763 | 891 | void ReactorTask::Abort(const Status& abort_status) { |
764 | 891 | if (!abort_called_.exchange(true, std::memory_order_acq_rel)) { |
765 | 891 | DoAbort(abort_status); |
766 | 891 | } |
767 | 891 | } |
768 | | |
// Human-readable description of the task, showing its creation site.
std::string ReactorTask::ToString() const {
  return Format("{ source: $0 }", source_location_);
}
772 | | |
773 | | // ------------------------------------------------------------------------------------------------ |
774 | | // DelayedTask class members |
775 | | // ------------------------------------------------------------------------------------------------ |
776 | | |
// A task that invokes func after a delay of `when`, or with an aborted status
// if cancelled first. `id` identifies this task within `messenger` so it can be
// removed from the messenger's scheduled-task registry on completion/abort.
DelayedTask::DelayedTask(StatusFunctor func, MonoDelta when, int64_t id,
                         const SourceLocation& source_location, Messenger* messenger)
    : ReactorTask(source_location),
      func_(std::move(func)),
      when_(when),
      id_(id),
      messenger_(messenger) {
}
785 | | |
// Runs on the reactor thread to arm the libev timer for this delayed task.
// If the task was already aborted (done_ set), this is a no-op. On success the
// reactor keeps a shared_ptr in scheduled_tasks_ so the task outlives this call.
void DelayedTask::Run(Reactor* reactor) {
  DCHECK(reactor_ == nullptr) << "Task has already been scheduled";
  DCHECK(reactor->IsCurrentThread());

  const auto reactor_state = reactor->state();
  if (reactor_state != ReactorState::kRunning) {
    LOG(WARNING) << "Reactor is not running (state: " << reactor_state
                 << "), not scheduling a delayed task.";
    return;
  }

  // Acquire lock to prevent task from being aborted in the middle of scheduling, in case abort
  // will be requested in the middle of scheduling - task will be aborted right after return
  // from this method.
  std::lock_guard<LockType> l(lock_);

  VLOG_WITH_PREFIX_AND_FUNC(4) << "Done: " << done_ << ", when: " << when_;

  if (done_) {
    // Task has been aborted.
    return;
  }

  // Schedule the task to run later.
  reactor_ = reactor;
  timer_.set(reactor->loop_);

  // timer_ is owned by this task and will be stopped through AbortTask/Abort before this task
  // is removed from list of scheduled tasks, so it is safe for timer_ to remember pointer to task.
  timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this);

  timer_.start(when_.ToSeconds(), // after
               0);                // repeat (0 = one-shot)
  // Keep this task alive until the timer fires or the task is aborted.
  reactor_->scheduled_tasks_.insert(shared_from(this));
}
821 | | |
822 | 10.2M | MarkAsDoneResult DelayedTask::MarkAsDone() { |
823 | 10.2M | std::lock_guard<LockType> l(lock_); |
824 | 10.2M | if (done_) { |
825 | 822 | return MarkAsDoneResult::kAlreadyDone; |
826 | 822 | } |
827 | 10.2M | done_ = true; |
828 | | |
829 | | // ENG-2879: we need to check if reactor_ is nullptr, because that would mean that the task has |
830 | | // not even started. AbortTask uses the return value of this function to check if it needs to |
831 | | // stop the timer, and that is only possible / necessary if Run has been called and reactor_ is |
832 | | // not nullptr. |
833 | 10.2M | return reactor_ == nullptr ? MarkAsDoneResult::kNotScheduled0 |
834 | 10.2M | : MarkAsDoneResult::kSuccess; |
835 | 10.2M | } |
836 | | |
// Human-readable description including the messenger-assigned task id.
std::string DelayedTask::ToString() const {
  return Format("{ id: $0 source: $1 }", id_, source_location_);
}
840 | | |
// Cancels the delayed task. If the timer was armed it is stopped (on the
// reactor thread, since libev timers are not thread-safe), and unless the task
// had already completed/aborted, func_ is invoked once with abort_status.
void DelayedTask::AbortTask(const Status& abort_status) {
  auto mark_as_done_result = MarkAsDone();

  VLOG_WITH_PREFIX_AND_FUNC(4)
      << "Status: " << abort_status << ", " << AsString(mark_as_done_result);

  if (mark_as_done_result == MarkAsDoneResult::kSuccess) {
    // Stop the libev timer. We don't need to do this in the kNotScheduled case, because the timer
    // has not started in that case.
    if (reactor_->IsCurrentThread()) {
      timer_.stop();
    } else {
      // Must call timer_.stop() on the reactor thread. Keep a refcount to prevent this DelayedTask
      // from being deleted. If the reactor thread has already been shut down, this will be a no-op.
      reactor_->ScheduleReactorFunctor([this, holder = shared_from(this)](Reactor* reactor) {
        timer_.stop();
      }, SOURCE_LOCATION());
    }
  }
  if (mark_as_done_result != MarkAsDoneResult::kAlreadyDone) {
    // We need to call the callback whenever we successfully switch the done_ flag to true, whether
    // or not the task has been scheduled.
    func_(abort_status);
  }
}
866 | | |
// ReactorTask abort hook: deregisters the task from the messenger's scheduled
// registry (if any), then performs the actual cancellation via AbortTask.
void DelayedTask::DoAbort(const Status& abort_status) {
  if (messenger_ != nullptr) {
    messenger_->RemoveScheduledTask(id_);
  }

  AbortTask(abort_status);
}
874 | | |
// libev callback invoked on the reactor thread when the timer expires.
// Races with AbortTask via MarkAsDone: exactly one of them invokes func_.
void DelayedTask::TimerHandler(ev::timer& watcher, int revents) {
  DCHECK(reactor_->IsCurrentThread());

  auto mark_as_done_result = MarkAsDone();
  if (mark_as_done_result != MarkAsDoneResult::kSuccess) {
    DCHECK_EQ(MarkAsDoneResult::kAlreadyDone, mark_as_done_result)
        << "Can't get kNotScheduled here, because the timer handler is already being called";
    // An abort won the race and will invoke (or already invoked) func_.
    return;
  }

  // Hold shared_ptr, so this task wouldn't be destroyed upon removal below until func_ is called.
  auto holder = shared_from(this);

  reactor_->scheduled_tasks_.erase(holder);
  if (messenger_ != nullptr) {
    messenger_->RemoveScheduledTask(id_);
  }

  if (EV_ERROR & revents) {
    // libev reported an error for this watcher; surface it as an aborted status.
    std::string msg = "Delayed task got an error in its timer handler";
    LOG(WARNING) << msg;
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Abort";
    func_(STATUS(Aborted, msg));
  } else {
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Execute";
    func_(Status::OK());
  }
}
903 | | |
904 | | // ------------------------------------------------------------------------------------------------ |
905 | | // More Reactor class members |
906 | | // ------------------------------------------------------------------------------------------------ |
907 | | |
// Wraps a freshly accepted server-side socket in a stream + Connection and
// hands it to this reactor's thread for registration. May be called from a
// non-reactor thread; the actual registration runs via ScheduleReactorFunctor.
void Reactor::RegisterInboundSocket(
    Socket *socket, size_t receive_buffer_size, const Endpoint& remote,
    const ConnectionContextFactoryPtr& factory) {
  VLOG_WITH_PREFIX(3) << "New inbound connection to " << remote;
  // Clamp/adjust the requested buffer size to configured limits.
  receive_buffer_size = PatchReceiveBufferSize(receive_buffer_size);

  auto stream = CreateStream(
      messenger_->stream_factories_, messenger_->listen_protocol_,
      StreamCreateData {
        .remote = remote,
        .remote_hostname = std::string(),  // inbound side: no hostname known
        .socket = socket,
        .receive_buffer_size = receive_buffer_size,
        .mem_tracker = factory->buffer_tracker(),
        .metric_entity = messenger_->metric_entity()
      });
  if (!stream.ok()) {
    LOG_WITH_PREFIX(DFATAL) << "Failed to create stream for " << remote << ": " << stream.status();
    return;
  }
  auto conn = std::make_shared<Connection>(this,
                                           std::move(*stream),
                                           ConnectionDirection::SERVER,
                                           messenger()->rpc_metrics().get(),
                                           factory->Create(receive_buffer_size));
  // Registration must happen on the reactor thread; the lambda keeps the
  // connection alive until then.
  ScheduleReactorFunctor([conn = std::move(conn)](Reactor* reactor) {
    reactor->RegisterConnection(conn);
  }, SOURCE_LOCATION());
}
937 | | |
938 | 160M | bool Reactor::ScheduleReactorTask(ReactorTaskPtr task, bool schedule_even_closing) { |
939 | 160M | bool was_empty; |
940 | 160M | { |
941 | | // Even though state_ is atomic, we still need to take the lock to make sure state_ |
942 | | // and pending_tasks_mtx_ are being modified in a consistent way. |
943 | 160M | std::unique_lock<simple_spinlock> pending_lock(pending_tasks_mtx_); |
944 | 160M | auto state = state_.load(std::memory_order_acquire); |
945 | 160M | bool failure = schedule_even_closing ? state == ReactorState::kClosed3.24k |
946 | 160M | : HasReactorStartedClosing(state)160M ; |
947 | 160M | if (failure) { |
948 | 4 | return false; |
949 | 4 | } |
950 | 160M | was_empty = pending_tasks_.empty(); |
951 | 160M | pending_tasks_.emplace_back(std::move(task)); |
952 | 160M | } |
953 | 160M | if (was_empty) { |
954 | 158M | WakeThread(); |
955 | 158M | } |
956 | | |
957 | 160M | return true; |
958 | 160M | } |
959 | | |
960 | 157M | bool Reactor::DrainTaskQueueAndCheckIfClosing() { |
961 | 157M | CHECK(async_handler_tasks_.empty()); |
962 | | |
963 | 157M | std::lock_guard<simple_spinlock> lock(pending_tasks_mtx_); |
964 | 157M | async_handler_tasks_.swap(pending_tasks_); |
965 | 157M | return HasReactorStartedClosing(state_.load(std::memory_order_acquire)); |
966 | 157M | } |
967 | | |
// Task to call an arbitrary function within the reactor thread.
// The submitting thread blocks in Wait() until the reactor either runs the
// function (Run) or aborts the task (DoAbort); either path counts down latch_.
template<class F>
class RunFunctionTask : public ReactorTask {
 public:
  RunFunctionTask(const F& f, const SourceLocation& source_location)
      : ReactorTask(source_location), function_(f) {}

  // Invoked on the reactor thread: runs the function and releases the waiter.
  void Run(Reactor *reactor) override {
    status_ = function_(reactor);
    latch_.CountDown();
  }

  // Wait until the function has completed, and return the Status returned by the function.
  Status Wait() {
    latch_.Wait();
    return status_;
  }

 private:
  // Invoked if the task is aborted before running (e.g. reactor shutting
  // down): records the abort status and releases the waiter.
  void DoAbort(const Status &status) override {
    status_ = status;
    latch_.CountDown();
  }

  F function_;              // the callable to run on the reactor thread
  Status status_;           // result of function_ or the abort status
  CountDownLatch latch_{1}; // released exactly once by Run or DoAbort
};
996 | | |
997 | | template<class F> |
998 | 23 | Status Reactor::RunOnReactorThread(const F& f, const SourceLocation& source_location) { |
999 | 23 | auto task = std::make_shared<RunFunctionTask<F>>(f, source_location); |
1000 | 23 | if (!ScheduleReactorTask(task)) { |
1001 | 0 | return ServiceUnavailableError(); |
1002 | 0 | } |
1003 | 23 | return task->Wait(); |
1004 | 23 | } reactor.cc:yb::Status yb::rpc::Reactor::RunOnReactorThread<yb::rpc::Reactor::GetMetrics(yb::rpc::ReactorMetrics*)::$_1>(yb::rpc::Reactor::GetMetrics(yb::rpc::ReactorMetrics*)::$_1 const&, yb::SourceLocation const&) Line | Count | Source | 998 | 8 | Status Reactor::RunOnReactorThread(const F& f, const SourceLocation& source_location) { | 999 | 8 | auto task = std::make_shared<RunFunctionTask<F>>(f, source_location); | 1000 | 8 | if (!ScheduleReactorTask(task)) { | 1001 | 0 | return ServiceUnavailableError(); | 1002 | 0 | } | 1003 | 8 | return task->Wait(); | 1004 | 8 | } |
reactor.cc:yb::Status yb::rpc::Reactor::RunOnReactorThread<yb::rpc::Reactor::DumpRunningRpcs(yb::rpc::DumpRunningRpcsRequestPB const&, yb::rpc::DumpRunningRpcsResponsePB*)::$_3>(yb::rpc::Reactor::DumpRunningRpcs(yb::rpc::DumpRunningRpcsRequestPB const&, yb::rpc::DumpRunningRpcsResponsePB*)::$_3 const&, yb::SourceLocation const&) Line | Count | Source | 998 | 15 | Status Reactor::RunOnReactorThread(const F& f, const SourceLocation& source_location) { | 999 | 15 | auto task = std::make_shared<RunFunctionTask<F>>(f, source_location); | 1000 | 15 | if (!ScheduleReactorTask(task)) { | 1001 | 0 | return ServiceUnavailableError(); | 1002 | 0 | } | 1003 | 15 | return task->Wait(); | 1004 | 15 | } |
|
1005 | | |
1006 | | } // namespace rpc |
1007 | | } // namespace yb |