/Users/deen/code/yugabyte-db/src/yb/rpc/messenger.cc
Line | Count | Source
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/messenger.h" |
34 | | |
35 | | #include <sys/types.h> |
36 | | |
37 | | #include <list> |
38 | | #include <mutex> |
39 | | #include <set> |
40 | | #include <string> |
41 | | #include <thread> |
42 | | |
43 | | #include <gflags/gflags.h> |
44 | | #include <glog/logging.h> |
45 | | |
46 | | #include "yb/gutil/map-util.h" |
47 | | #include "yb/gutil/stl_util.h" |
48 | | #include "yb/gutil/strings/substitute.h" |
49 | | |
50 | | #include "yb/rpc/acceptor.h" |
51 | | #include "yb/rpc/constants.h" |
52 | | #include "yb/rpc/reactor.h" |
53 | | #include "yb/rpc/rpc_header.pb.h" |
54 | | #include "yb/rpc/rpc_metrics.h" |
55 | | #include "yb/rpc/rpc_service.h" |
56 | | #include "yb/rpc/rpc_util.h" |
57 | | #include "yb/rpc/tcp_stream.h" |
58 | | #include "yb/rpc/yb_rpc.h" |
59 | | |
60 | | #include "yb/util/debug-util.h" |
61 | | #include "yb/util/errno.h" |
62 | | #include "yb/util/flag_tags.h" |
63 | | #include "yb/util/format.h" |
64 | | #include "yb/util/metrics.h" |
65 | | #include "yb/util/monotime.h" |
66 | | #include "yb/util/net/dns_resolver.h" |
67 | | #include "yb/util/net/socket.h" |
68 | | #include "yb/util/result.h" |
69 | | #include "yb/util/size_literals.h" |
70 | | #include "yb/util/status.h" |
71 | | #include "yb/util/status_format.h" |
72 | | #include "yb/util/status_log.h" |
73 | | #include "yb/util/thread_restrictions.h" |
74 | | #include "yb/util/trace.h" |
75 | | |
76 | | using namespace std::literals; |
77 | | using namespace std::placeholders; |
78 | | using namespace yb::size_literals; |
79 | | |
80 | | using std::string; |
81 | | using std::shared_ptr; |
82 | | using strings::Substitute; |
83 | | |
84 | | DECLARE_int32(num_connections_to_server); |
85 | | DEFINE_int32(rpc_default_keepalive_time_ms, 65000, |
86 | | "If an RPC connection from a client is idle for this amount of time, the server " |
87 | | "will disconnect the client. Setting the flag to 0 disables this cleanup.");
88 | | TAG_FLAG(rpc_default_keepalive_time_ms, advanced); |
89 | | DEFINE_uint64(io_thread_pool_size, 4, "Size of allocated IO Thread Pool."); |
90 | | |
91 | | DEFINE_int64(outbound_rpc_memory_limit, 0, "Outbound RPC memory limit"); |
92 | | |
93 | | DEFINE_int32(rpc_queue_limit, 10000, "Queue limit for rpc server"); |
94 | | DEFINE_int32(rpc_workers_limit, 1024, "Workers limit for rpc server"); |
95 | | |
96 | | DEFINE_int32(socket_receive_buffer_size, 0, "Socket receive buffer size, 0 to use default"); |
97 | | |
98 | | namespace yb { |
99 | | namespace rpc { |
100 | | |
101 | | class Messenger; |
102 | | class ServerBuilder; |
103 | | |
104 | | // ------------------------------------------------------------------------------------------------ |
105 | | // MessengerBuilder |
106 | | // ------------------------------------------------------------------------------------------------ |
107 | | |
108 | | MessengerBuilder::MessengerBuilder(std::string name) |
109 | | : name_(std::move(name)), |
110 | | connection_keepalive_time_(FLAGS_rpc_default_keepalive_time_ms * 1ms), |
111 | | coarse_timer_granularity_(100ms), |
112 | | listen_protocol_(TcpStream::StaticProtocol()), |
113 | | queue_limit_(FLAGS_rpc_queue_limit), |
114 | | workers_limit_(FLAGS_rpc_workers_limit), |
115 | 35.3k | num_connections_to_server_(GetAtomicFlag(&FLAGS_num_connections_to_server)) { |
116 | 35.3k | AddStreamFactory(TcpStream::StaticProtocol(), TcpStream::Factory()); |
117 | 35.3k | } |
118 | | |
119 | 34.6k | MessengerBuilder::~MessengerBuilder() = default; |
120 | 0 | MessengerBuilder::MessengerBuilder(const MessengerBuilder&) = default; |
121 | | |
122 | | MessengerBuilder& MessengerBuilder::set_connection_keepalive_time( |
123 | 26.5k | CoarseMonoClock::Duration keepalive) { |
124 | 26.5k | connection_keepalive_time_ = keepalive; |
125 | 26.5k | return *this; |
126 | 26.5k | } |
127 | | |
128 | 33.7k | MessengerBuilder& MessengerBuilder::set_num_reactors(int num_reactors) { |
129 | 33.7k | num_reactors_ = num_reactors; |
130 | 33.7k | return *this; |
131 | 33.7k | } |
132 | | |
133 | | MessengerBuilder& MessengerBuilder::set_coarse_timer_granularity( |
134 | 202 | CoarseMonoClock::Duration granularity) { |
135 | 202 | coarse_timer_granularity_ = granularity; |
136 | 202 | return *this; |
137 | 202 | } |
138 | | |
139 | | MessengerBuilder &MessengerBuilder::set_metric_entity( |
140 | 33.3k | const scoped_refptr<MetricEntity>& metric_entity) { |
141 | 33.3k | metric_entity_ = metric_entity; |
142 | 33.3k | return *this; |
143 | 33.3k | } |
144 | | |
145 | 35.3k | Result<std::unique_ptr<Messenger>> MessengerBuilder::Build() { |
146 | 35.3k | if (!connection_context_factory_) { |
147 | 2.01k | UseDefaultConnectionContextFactory(); |
148 | 2.01k | } |
149 | 35.3k | std::unique_ptr<Messenger> messenger(new Messenger(*this)); |
150 | 35.3k | RETURN_NOT_OK(messenger->Init()); |
151 | | |
152 | 35.3k | return messenger; |
153 | 35.3k | } |
154 | | |
155 | | MessengerBuilder &MessengerBuilder::AddStreamFactory( |
156 | 59.0k | const Protocol* protocol, StreamFactoryPtr factory) { |
157 | 59.0k | auto p = stream_factories_.emplace(protocol, std::move(factory)); |
158 | 18.4E | LOG_IF(DFATAL, !p.second) << "Duplicate stream factory: " << protocol->ToString(); |
159 | 59.0k | return *this; |
160 | 59.0k | } |
161 | | |
162 | | MessengerBuilder &MessengerBuilder::UseDefaultConnectionContextFactory( |
163 | 35.1k | const std::shared_ptr<MemTracker>& parent_mem_tracker) { |
164 | 35.1k | if (parent_mem_tracker) { |
165 | 32.4k | last_used_parent_mem_tracker_ = parent_mem_tracker; |
166 | 32.4k | } |
167 | 35.1k | connection_context_factory_ = rpc::CreateConnectionContextFactory<YBOutboundConnectionContext>( |
168 | 35.1k | FLAGS_outbound_rpc_memory_limit, parent_mem_tracker); |
169 | 35.1k | return *this; |
170 | 35.1k | } |
171 | | |
172 | | // ------------------------------------------------------------------------------------------------ |
173 | | // Messenger |
174 | | // ------------------------------------------------------------------------------------------------ |
175 | | |
176 | 9.02k | void Messenger::Shutdown() { |
177 | 9.02k | ShutdownThreadPools(); |
178 | 9.02k | ShutdownAcceptor(); |
179 | 9.02k | UnregisterAllServices(); |
180 | | |
181 | | // Since we're shutting down, it's OK to block. |
182 | 9.02k | ThreadRestrictions::ScopedAllowWait allow_wait; |
183 | | |
184 | 9.02k | std::vector<Reactor*> reactors; |
185 | 9.02k | std::unique_ptr<Acceptor> acceptor; |
186 | 9.02k | { |
187 | 9.02k | std::lock_guard<percpu_rwlock> guard(lock_); |
188 | 9.02k | if (closing_) { |
189 | 456 | return; |
190 | 456 | } |
191 | 8.56k | VLOG(1) << "shutting down messenger " << name_;
192 | 8.56k | closing_ = true; |
193 | | |
194 | 8.56k | DCHECK(rpc_services_.empty()) << "Unregister RPC services before shutting down Messenger";
195 | 8.56k | rpc_services_.clear(); |
196 | | |
197 | 8.56k | acceptor.swap(acceptor_); |
198 | | |
199 | 24.7k | for (const auto& reactor : reactors_) { |
200 | 24.7k | reactors.push_back(reactor.get()); |
201 | 24.7k | } |
202 | 8.56k | } |
203 | | |
204 | 8.56k | if (acceptor) { |
205 | 0 | acceptor->Shutdown(); |
206 | 0 | } |
207 | | |
208 | 24.7k | for (auto* reactor : reactors) { |
209 | 24.7k | reactor->Shutdown(); |
210 | 24.7k | } |
211 | | |
212 | 8.56k | scheduler_.Shutdown(); |
213 | 8.56k | io_thread_pool_.Shutdown(); |
214 | | |
215 | 24.7k | for (auto* reactor : reactors) { |
216 | 24.7k | reactor->Join(); |
217 | 24.7k | } |
218 | | |
219 | 8.56k | io_thread_pool_.Join(); |
220 | | |
221 | 8.56k | { |
222 | 8.56k | std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_); |
223 | 8.56k | LOG_IF(DFATAL, !scheduled_tasks_.empty()) |
224 | 17 | << "Scheduled tasks are not empty after messenger shutdown: "
225 | 17 | << yb::ToString(scheduled_tasks_); |
226 | 8.56k | } |
227 | 8.56k | } |
228 | | |
229 | | Status Messenger::ListenAddress( |
230 | | ConnectionContextFactoryPtr factory, const Endpoint& accept_endpoint, |
231 | 25.9k | Endpoint* bound_endpoint) { |
232 | 25.9k | Acceptor* acceptor; |
233 | 25.9k | { |
234 | 25.9k | std::lock_guard<percpu_rwlock> guard(lock_); |
235 | 25.9k | if (!acceptor_) { |
236 | 25.9k | acceptor_.reset(new Acceptor( |
237 | 25.9k | metric_entity_, std::bind(&Messenger::RegisterInboundSocket, this, factory, _1, _2))); |
238 | 25.9k | } |
239 | 25.9k | auto accept_host = accept_endpoint.address(); |
240 | 25.9k | auto& outbound_address = accept_host.is_v6() ? outbound_address_v6_
241 | 25.9k | : outbound_address_v4_;
242 | 25.9k | if (outbound_address.is_unspecified() && !accept_host.is_unspecified()) { |
243 | 25.8k | outbound_address = accept_host; |
244 | 25.8k | } |
245 | 25.9k | acceptor = acceptor_.get(); |
246 | 25.9k | } |
247 | 25.9k | return acceptor->Listen(accept_endpoint, bound_endpoint); |
248 | 25.9k | } |
249 | | |
250 | 25.8k | Status Messenger::StartAcceptor() { |
251 | 208k | for (const auto& p : rpc_services_) { |
252 | 208k | p.second->FillEndpoints(&rpc_endpoints_); |
253 | 208k | } |
254 | | |
255 | 25.8k | std::lock_guard<percpu_rwlock> guard(lock_); |
256 | 25.8k | if (acceptor_) { |
257 | 25.8k | return acceptor_->Start(); |
258 | 25.8k | } else { |
259 | 0 | return STATUS(IllegalState, "Trying to start acceptor w/o active addresses"); |
260 | 0 | } |
261 | 25.8k | } |
262 | | |
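ListenAddress() lazily creates the Acceptor and records an outbound bind address, while StartAcceptor() snapshots the registered services into rpc_endpoints_ before starting to accept, so services should be registered first. A hedged sketch of the server-side startup sequence, with the factory, endpoint and service objects assumed to be provided by the caller (RegisterService() is defined later in this file; the service name is illustrative):

    Endpoint bound_endpoint;
    RETURN_NOT_OK(messenger->RegisterService("yb.MyService", my_service));
    RETURN_NOT_OK(messenger->ListenAddress(context_factory, accept_endpoint, &bound_endpoint));
    RETURN_NOT_OK(messenger->StartAcceptor());  // IllegalState if ListenAddress() was never called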
263 | 35.2k | void Messenger::BreakConnectivityWith(const IpAddress& address) { |
264 | 35.2k | BreakConnectivity(address, /* incoming */ true, /* outgoing */ true); |
265 | 35.2k | } |
266 | 0 | void Messenger::BreakConnectivityTo(const IpAddress& address) { |
267 | 0 | BreakConnectivity(address, /* incoming */ false, /* outgoing */ true); |
268 | 0 | } |
269 | | |
270 | 0 | void Messenger::BreakConnectivityFrom(const IpAddress& address) { |
271 | 0 | BreakConnectivity(address, /* incoming */ true, /* outgoing */ false); |
272 | 0 | } |
273 | | |
274 | 35.2k | void Messenger::BreakConnectivity(const IpAddress& address, bool incoming, bool outgoing) { |
275 | 35.2k | LOG(INFO) << "TEST: Break " << (incoming ? "incoming" : "") << "/" << (outgoing ? "outgoing" : "")
276 | 35.2k | << " connectivity with: " << address; |
277 | | |
278 | 35.2k | boost::optional<CountDownLatch> latch; |
279 | 35.2k | { |
280 | 35.2k | std::lock_guard<percpu_rwlock> guard(lock_); |
281 | 35.2k | if (broken_connectivity_from_.empty() || broken_connectivity_to_.empty()) {
282 | 1.76k | has_broken_connectivity_.store(true, std::memory_order_release); |
283 | 1.76k | } |
284 | 35.2k | bool inserted_from = false; |
285 | 35.2k | if (incoming) { |
286 | 35.2k | inserted_from = broken_connectivity_from_.insert(address).second; |
287 | 35.2k | } |
288 | 35.2k | bool inserted_to = false; |
289 | 35.2k | if (outgoing) { |
290 | 35.2k | inserted_to = broken_connectivity_to_.insert(address).second; |
291 | 35.2k | } |
292 | 35.2k | if (inserted_from || inserted_to) {
293 | 35.2k | latch.emplace(reactors_.size()); |
294 | 352k | for (const auto& reactor : reactors_) { |
295 | 352k | auto scheduled = reactor->ScheduleReactorTask(MakeFunctorReactorTask( |
296 | 352k | [&latch, address, incoming, outgoing](Reactor* reactor) { |
297 | 347k | if (incoming) { |
298 | 347k | reactor->DropIncomingWithRemoteAddress(address); |
299 | 347k | } |
300 | 348k | if (outgoing) {
301 | 348k | reactor->DropOutgoingWithRemoteAddress(address); |
302 | 348k | } |
303 | 347k | latch->CountDown(); |
304 | 347k | }, |
305 | 352k | SOURCE_LOCATION())); |
306 | 352k | if (!scheduled) { |
307 | 0 | LOG(INFO) << "Failed to schedule drop connection with: " << address.to_string(); |
308 | 0 | latch->CountDown(); |
309 | 0 | } |
310 | 352k | } |
311 | 35.2k | } |
312 | 35.2k | } |
313 | | |
314 | 35.2k | if (latch) { |
315 | 35.2k | latch->Wait(); |
316 | 35.2k | } |
317 | 35.2k | } |
318 | | |
319 | 35.2k | void Messenger::RestoreConnectivityWith(const IpAddress& address) { |
320 | 35.2k | RestoreConnectivity(address, /* incoming */ true, /* outgoing */ true); |
321 | 35.2k | } |
322 | 0 | void Messenger::RestoreConnectivityTo(const IpAddress& address) { |
323 | 0 | RestoreConnectivity(address, /* incoming */ false, /* outgoing */ true); |
324 | 0 | } |
325 | | |
326 | 0 | void Messenger::RestoreConnectivityFrom(const IpAddress& address) { |
327 | 0 | RestoreConnectivity(address, /* incoming */ true, /* outgoing */ false); |
328 | 0 | } |
329 | | |
330 | 35.2k | void Messenger::RestoreConnectivity(const IpAddress& address, bool incoming, bool outgoing) { |
331 | 35.2k | LOG(INFO) << "TEST: Restore " << (incoming ? "incoming" : "") << "/"
332 | 35.2k | << (outgoing ? "outgoing" : "") << " connectivity with: " << address;
333 | | |
334 | 35.2k | std::lock_guard<percpu_rwlock> guard(lock_); |
335 | 35.2k | if (incoming) { |
336 | 35.2k | broken_connectivity_from_.erase(address); |
337 | 35.2k | } |
338 | 35.2k | if (outgoing) { |
339 | 35.2k | broken_connectivity_to_.erase(address); |
340 | 35.2k | } |
341 | 35.2k | if (broken_connectivity_from_.empty() && broken_connectivity_to_.empty()) {
342 | 1 | has_broken_connectivity_.store(false, std::memory_order_release); |
343 | 1 | } |
344 | 35.2k | } |
345 | | |
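These Break*/Restore* hooks exist purely for tests: they blacklist an address in broken_connectivity_from_/_to_ and drop matching connections on every reactor, and the TEST_ShouldArtificiallyReject* checks below consult those sets. A hedged sketch of how a test might simulate and then heal a partition (the messenger pointer comes from the test fixture; IpAddress is the boost::asio address type used throughout yb):

    IpAddress peer = IpAddress::from_string("127.0.0.2");
    messenger->BreakConnectivityWith(peer);    // drop existing connections, reject new traffic both ways
    // ... exercise the code path that must tolerate the unreachable peer ...
    messenger->RestoreConnectivityWith(peer);  // traffic to/from the peer is allowed again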
346 | 628k | bool Messenger::TEST_ShouldArtificiallyRejectIncomingCallsFrom(const IpAddress &remote) { |
347 | 628k | if (has_broken_connectivity_.load(std::memory_order_acquire)) { |
348 | 13.9k | shared_lock<rw_spinlock> guard(lock_.get_lock()); |
349 | 13.9k | return broken_connectivity_from_.count(remote) != 0; |
350 | 13.9k | } |
351 | 614k | return false; |
352 | 628k | } |
353 | | |
354 | 75.5M | bool Messenger::TEST_ShouldArtificiallyRejectOutgoingCallsTo(const IpAddress &remote) { |
355 | 75.5M | if (has_broken_connectivity_.load(std::memory_order_acquire)) { |
356 | 319k | shared_lock<rw_spinlock> guard(lock_.get_lock()); |
357 | 319k | return broken_connectivity_to_.count(remote) != 0; |
358 | 319k | } |
359 | 75.2M | return false; |
360 | 75.5M | } |
361 | | |
362 | 8 | Status Messenger::TEST_GetReactorMetrics(size_t reactor_idx, ReactorMetrics* metrics) { |
363 | 8 | if (reactor_idx >= reactors_.size()) { |
364 | 0 | return STATUS_FORMAT( |
365 | 0 | InvalidArgument, "Invalid reactor index $0, should be >=0 and <$1", reactor_idx, |
366 | 0 | reactors_.size()); |
367 | 0 | } |
368 | 8 | return reactors_[reactor_idx]->GetMetrics(metrics); |
369 | 8 | } |
370 | | |
371 | 9.90k | void Messenger::ShutdownAcceptor() { |
372 | 9.90k | std::unique_ptr<Acceptor> acceptor; |
373 | 9.90k | { |
374 | 9.90k | std::lock_guard<percpu_rwlock> guard(lock_); |
375 | 9.90k | acceptor.swap(acceptor_); |
376 | 9.90k | } |
377 | 9.90k | if (acceptor) { |
378 | 363 | acceptor->Shutdown(); |
379 | 363 | } |
380 | 9.90k | } |
381 | | |
382 | 85.8M | rpc::ThreadPool& Messenger::ThreadPool(ServicePriority priority) { |
383 | 85.8M | switch (priority) { |
384 | 58.5M | case ServicePriority::kNormal: |
385 | 58.5M | return *normal_thread_pool_; |
386 | 27.3M | case ServicePriority::kHigh: |
387 | 27.3M | auto high_priority_thread_pool = high_priority_thread_pool_.get(); |
388 | 27.3M | if (high_priority_thread_pool) { |
389 | 27.2M | return *high_priority_thread_pool; |
390 | 27.2M | } |
391 | 129k | std::lock_guard<std::mutex> lock(mutex_high_priority_thread_pool_); |
392 | 129k | high_priority_thread_pool = high_priority_thread_pool_.get(); |
393 | 129k | if (high_priority_thread_pool) { |
394 | 0 | return *high_priority_thread_pool; |
395 | 0 | } |
396 | 129k | const ThreadPoolOptions& options = normal_thread_pool_->options(); |
397 | 129k | high_priority_thread_pool_.reset(new rpc::ThreadPool( |
398 | 129k | name_ + "-high-pri", options.queue_limit, options.max_workers)); |
399 | 129k | return *high_priority_thread_pool_.get(); |
400 | 85.8M | } |
401 | 0 | FATAL_INVALID_ENUM_VALUE(ServicePriority, priority); |
402 | 0 | } |
403 | | |
404 | | // Register a new RpcService to handle inbound requests. |
405 | | Status Messenger::RegisterService( |
406 | 209k | const std::string& service_name, const scoped_refptr<RpcService>& service) { |
407 | 209k | DCHECK(service); |
408 | 209k | rpc_services_.emplace(service_name, service); |
409 | 209k | return Status::OK(); |
410 | 209k | } |
411 | | |
412 | 9.93k | void Messenger::ShutdownThreadPools() { |
413 | 9.93k | normal_thread_pool_->Shutdown(); |
414 | 9.93k | auto high_priority_thread_pool = high_priority_thread_pool_.get(); |
415 | 9.93k | if (high_priority_thread_pool) { |
416 | 1.47k | high_priority_thread_pool->Shutdown(); |
417 | 1.47k | } |
418 | 9.93k | } |
419 | | |
420 | 9.90k | void Messenger::UnregisterAllServices() { |
421 | 9.90k | CHECK_OK(rpc_services_counter_.DisableAndWaitForOps(CoarseTimePoint::max(), Stop::kTrue)); |
422 | 9.90k | rpc_services_counter_.UnlockExclusiveOpMutex(); |
423 | | |
424 | 9.90k | for (const auto& p : rpc_services_) { |
425 | 2.30k | p.second->StartShutdown(); |
426 | 2.30k | } |
427 | 9.90k | for (const auto& p : rpc_services_) { |
428 | 2.25k | p.second->CompleteShutdown(); |
429 | 2.25k | } |
430 | | |
431 | 9.90k | rpc_services_.clear(); |
432 | 9.90k | rpc_endpoints_.clear(); |
433 | 9.90k | } |
434 | | |
435 | | class NotifyDisconnectedReactorTask : public ReactorTask { |
436 | | public: |
437 | | NotifyDisconnectedReactorTask(OutboundCallPtr call, const SourceLocation& source_location) |
438 | 154k | : ReactorTask(source_location), call_(std::move(call)) {} |
439 | | |
440 | 154k | void Run(Reactor* reactor) override { |
441 | 154k | call_->Transferred(STATUS_FORMAT( |
442 | 154k | NetworkError, "TEST: Connectivity is broken with $0", |
443 | 154k | call_->conn_id().remote().address()), nullptr); |
444 | 154k | } |
445 | | private: |
446 | 0 | void DoAbort(const Status &abort_status) override { |
447 | 0 | call_->Transferred(abort_status, nullptr); |
448 | 0 | } |
449 | | |
450 | | OutboundCallPtr call_; |
451 | | }; |
452 | | |
453 | 75.5M | void Messenger::QueueOutboundCall(OutboundCallPtr call) { |
454 | 75.5M | const auto& remote = call->conn_id().remote(); |
455 | 75.5M | Reactor *reactor = RemoteToReactor(remote, call->conn_id().idx()); |
456 | | |
457 | 75.5M | if (TEST_ShouldArtificiallyRejectOutgoingCallsTo(remote.address())) { |
458 | 154k | VLOG(1) << "TEST: Rejected connection to " << remote;
459 | 154k | auto scheduled = reactor->ScheduleReactorTask(std::make_shared<NotifyDisconnectedReactorTask>( |
460 | 154k | call, SOURCE_LOCATION())); |
461 | 154k | if (!scheduled) { |
462 | 2 | call->Transferred(STATUS(Aborted, "Reactor is closing"), nullptr /* conn */); |
463 | 2 | } |
464 | 154k | return; |
465 | 154k | } |
466 | | |
467 | 75.4M | reactor->QueueOutboundCall(std::move(call)); |
468 | 75.4M | } |
469 | | |
470 | 90.7M | void Messenger::Handle(InboundCallPtr call, Queue queue) { |
471 | 90.7M | ScopedRWOperation op(&rpc_services_counter_, CoarseTimePoint::min()); |
472 | 90.7M | if (!op.ok()) { |
473 | 6 | call->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, MoveStatus(op)); |
474 | 6 | return; |
475 | 6 | } |
476 | 90.7M | auto it = rpc_endpoints_.find(call->serialized_remote_method()); |
477 | 90.7M | if (it == rpc_endpoints_.end()) { |
478 | 265k | auto remote_method = ParseRemoteMethod(call->serialized_remote_method()); |
479 | 265k | Status s; |
480 | 265k | ErrorStatusPB::RpcErrorCodePB error_code = ErrorStatusPB::ERROR_NO_SUCH_SERVICE; |
481 | 265k | if (remote_method.ok()) {
482 | 265k | if (rpc_services_.count(remote_method->service.ToBuffer())) { |
483 | 2 | error_code = ErrorStatusPB::ERROR_NO_SUCH_METHOD; |
484 | 2 | s = STATUS_FORMAT( |
485 | 2 | InvalidArgument, "Call on service $0 received from $1 with an invalid method name: $2", |
486 | 2 | remote_method->service.ToBuffer(), |
487 | 2 | call->remote_address(), |
488 | 2 | remote_method->method.ToBuffer()); |
489 | 265k | } else { |
490 | 265k | s = STATUS_FORMAT( |
491 | 265k | ServiceUnavailable, "Service $0 not registered on $1", |
492 | 265k | remote_method->service.ToBuffer(), name_); |
493 | 265k | } |
494 | 18.4E | } else { |
495 | 18.4E | s = remote_method.status(); |
496 | 18.4E | } |
497 | 265k | LOG(WARNING) << s; |
498 | 265k | call->RespondFailure(error_code, s); |
499 | 265k | return; |
500 | 265k | } |
501 | | |
502 | | // The RpcService will respond to the client on success or failure. |
503 | 90.5M | call->set_method_index(it->second.second); |
504 | 90.5M | it->second.first->QueueInboundCall(std::move(call)); |
505 | 90.5M | } |
506 | | |
507 | 0 | RpcServicePtr Messenger::TEST_rpc_service(const std::string& service_name) const { |
508 | 0 | ScopedRWOperation op(&rpc_services_counter_, CoarseTimePoint::min()); |
509 | 0 | if (!op.ok()) { |
510 | 0 | return nullptr; |
511 | 0 | } |
512 | 0 | auto it = rpc_services_.find(service_name); |
513 | 0 | return it != rpc_services_.end() ? it->second : nullptr; |
514 | 0 | } |
515 | | |
516 | 425k | const std::shared_ptr<MemTracker>& Messenger::parent_mem_tracker() { |
517 | 425k | return connection_context_factory_->buffer_tracker(); |
518 | 425k | } |
519 | | |
520 | | void Messenger::RegisterInboundSocket( |
521 | 628k | const ConnectionContextFactoryPtr& factory, Socket *new_socket, const Endpoint& remote) { |
522 | 628k | if (TEST_ShouldArtificiallyRejectIncomingCallsFrom(remote.address())) { |
523 | 1 | auto status = new_socket->Close(); |
524 | 1 | VLOG(1) << "TEST: Rejected connection from " << remote |
525 | 0 | << ", close status: " << status.ToString(); |
526 | 1 | return; |
527 | 1 | } |
528 | | |
529 | 628k | if (FLAGS_socket_receive_buffer_size) { |
530 | 0 | WARN_NOT_OK(new_socket->SetReceiveBufferSize(FLAGS_socket_receive_buffer_size), |
531 | 0 | "Set receive buffer size failed: "); |
532 | 0 | } |
533 | | |
534 | 628k | auto receive_buffer_size = new_socket->GetReceiveBufferSize(); |
535 | 628k | if (!receive_buffer_size.ok()) { |
536 | 0 | LOG(WARNING) << "Register inbound socket failed: " << receive_buffer_size.status(); |
537 | 0 | return; |
538 | 0 | } |
539 | | |
540 | 628k | int idx = num_connections_accepted_.fetch_add(1) % num_connections_to_server_; |
541 | 628k | Reactor *reactor = RemoteToReactor(remote, idx); |
542 | 628k | reactor->RegisterInboundSocket(new_socket, *receive_buffer_size, remote, factory); |
543 | 628k | } |
544 | | |
545 | | Messenger::Messenger(const MessengerBuilder &bld) |
546 | | : name_(bld.name_), |
547 | | connection_context_factory_(bld.connection_context_factory_), |
548 | | stream_factories_(bld.stream_factories_), |
549 | | listen_protocol_(bld.listen_protocol_), |
550 | | rpc_services_counter_(name_ + " endpoints"), |
551 | | metric_entity_(bld.metric_entity_), |
552 | | io_thread_pool_(name_, FLAGS_io_thread_pool_size), |
553 | | scheduler_(&io_thread_pool_.io_service()), |
554 | | normal_thread_pool_(new rpc::ThreadPool(name_, bld.queue_limit_, bld.workers_limit_)), |
555 | | resolver_(new DnsResolver(&io_thread_pool_.io_service())), |
556 | | rpc_metrics_(std::make_shared<RpcMetrics>(bld.metric_entity_)), |
557 | 35.3k | num_connections_to_server_(bld.num_connections_to_server_) { |
558 | 35.3k | #ifndef NDEBUG |
559 | 35.3k | creation_stack_trace_.Collect(/* skip_frames */ 1); |
560 | 35.3k | #endif |
561 | 35.3k | VLOG(1) << "Messenger constructor for " << this << " called at:\n" << GetStackTrace();
562 | 325k | for (int i = 0; i < bld.num_reactors_; i++) {
563 | 289k | reactors_.emplace_back(std::make_unique<Reactor>(this, i, bld)); |
564 | 289k | } |
565 | | // Make sure skip buffer is allocated before we hit memory limit and try to use it. |
566 | 35.3k | GetGlobalSkipBuffer(); |
567 | 35.3k | } |
568 | | |
569 | 8.54k | Messenger::~Messenger() { |
570 | 8.54k | std::lock_guard<percpu_rwlock> guard(lock_); |
571 | | // This logging and the corresponding logging in the constructor is here to track down the |
572 | | // occasional CHECK(closing_) failure below in some tests (ENG-2838). |
573 | 8.54k | VLOG(1) << "Messenger destructor for " << this << " called at:\n" << GetStackTrace();
574 | 8.54k | #ifndef NDEBUG |
575 | 8.54k | if (!closing_) { |
576 | 0 | LOG(ERROR) << "Messenger created here:\n" << creation_stack_trace_.Symbolize() |
577 | 0 | << "Messenger destructor for " << this << " called at:\n" << GetStackTrace(); |
578 | 0 | } |
579 | 8.54k | #endif |
580 | 8.54k | CHECK(closing_) << "Should have already shut down";
581 | 8.54k | reactors_.clear(); |
582 | 8.54k | } |
583 | | |
584 | 1 | size_t Messenger::max_concurrent_requests() const { |
585 | 1 | return num_connections_to_server_; |
586 | 1 | } |
587 | | |
588 | 76.1M | Reactor* Messenger::RemoteToReactor(const Endpoint& remote, uint32_t idx) { |
589 | 76.1M | auto hash_code = hash_value(remote); |
590 | 76.1M | auto reactor_idx = (hash_code + idx) % reactors_.size(); |
591 | | // This is just a static partitioning; where each connection |
592 | | // to a remote is assigned to a particular reactor. We could |
593 | | // get a lot fancier with assigning Sockaddrs to Reactors, |
594 | | // but this should be good enough. |
595 | 76.1M | return reactors_[reactor_idx].get(); |
596 | 76.1M | } |
597 | | |
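To make the partitioning above concrete with illustrative numbers: with reactors_.size() == 8, hash_value(remote) == 41, and connection indexes 0..3 (from num_connections_to_server), the four connections to that peer map to reactors (41+0)%8 = 1, (41+1)%8 = 2, (41+2)%8 = 3 and (41+3)%8 = 4, so connections to a single remote spread across adjacent reactors while the mapping stays deterministic for a given endpoint and index.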
598 | 34.7k | Status Messenger::Init() { |
599 | 34.7k | Status status; |
600 | 284k | for (const auto& r : reactors_) { |
601 | 284k | RETURN_NOT_OK(r->Init()); |
602 | 284k | } |
603 | | |
604 | 34.7k | return Status::OK(); |
605 | 34.7k | } |
606 | | |
607 | | Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req, |
608 | 4 | DumpRunningRpcsResponsePB* resp) { |
609 | 4 | shared_lock<rw_spinlock> guard(lock_.get_lock()); |
610 | 15 | for (const auto& reactor : reactors_) { |
611 | 15 | RETURN_NOT_OK(reactor->DumpRunningRpcs(req, resp)); |
612 | 15 | } |
613 | 4 | return Status::OK(); |
614 | 4 | } |
615 | | |
616 | | Status Messenger::QueueEventOnAllReactors( |
617 | 37.0k | ServerEventListPtr server_event, const SourceLocation& source_location) { |
618 | 37.0k | shared_lock<rw_spinlock> guard(lock_.get_lock()); |
619 | 370k | for (const auto& reactor : reactors_) { |
620 | 370k | reactor->QueueEventOnAllConnections(server_event, source_location); |
621 | 370k | } |
622 | 37.0k | return Status::OK(); |
623 | 37.0k | } |
624 | | |
625 | 10.2M | void Messenger::RemoveScheduledTask(ScheduledTaskId id) { |
626 | 10.2M | CHECK_GT(id, 0); |
627 | 10.2M | std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_); |
628 | 10.2M | scheduled_tasks_.erase(id); |
629 | 10.2M | } |
630 | | |
631 | 1.33k | void Messenger::AbortOnReactor(ScheduledTaskId task_id) { |
632 | 1.33k | DCHECK(!reactors_.empty()); |
633 | 1.33k | CHECK_GT(task_id, 0); |
634 | | |
635 | 1.33k | std::shared_ptr<DelayedTask> task; |
636 | 1.33k | { |
637 | 1.33k | std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_); |
638 | 1.33k | auto iter = scheduled_tasks_.find(task_id); |
639 | 1.33k | if (iter != scheduled_tasks_.end()) { |
640 | 864 | task = iter->second; |
641 | 864 | scheduled_tasks_.erase(iter); |
642 | 864 | } |
643 | 1.33k | } |
644 | 1.33k | if (task) { |
645 | 864 | task->AbortTask(STATUS(Aborted, "Task aborted by messenger")); |
646 | 864 | } |
647 | 1.33k | } |
648 | | |
649 | | ScheduledTaskId Messenger::ScheduleOnReactor( |
650 | 10.2M | StatusFunctor func, MonoDelta when, const SourceLocation& source_location, Messenger* msgr) { |
651 | 10.2M | DCHECK(!reactors_.empty()); |
652 | | |
653 | | // If we're already running on a reactor thread, reuse it. |
654 | 10.2M | Reactor* chosen = nullptr; |
655 | 102M | for (const auto& r : reactors_) { |
656 | 102M | if (r->IsCurrentThread()) { |
657 | 3.47k | chosen = r.get(); |
658 | 3.47k | break; |
659 | 3.47k | } |
660 | 102M | } |
661 | 10.2M | if (chosen == nullptr) { |
662 | | // Not running on a reactor thread, pick one at random. |
663 | 10.2M | chosen = reactors_[rand() % reactors_.size()].get(); |
664 | 10.2M | } |
665 | | |
666 | 10.2M | ScheduledTaskId task_id = 0; |
667 | 10.2M | if (msgr != nullptr) { |
668 | 10.2M | task_id = next_task_id_.fetch_add(1); |
669 | 10.2M | } |
670 | 10.2M | auto task = std::make_shared<DelayedTask>( |
671 | 10.2M | std::move(func), when, task_id, source_location, msgr); |
672 | 10.2M | if (msgr != nullptr) { |
673 | 10.2M | std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_); |
674 | 10.2M | scheduled_tasks_.emplace(task_id, task); |
675 | 10.2M | } |
676 | | |
677 | 10.2M | if (chosen->ScheduleReactorTask(task)) {
678 | 10.2M | return task_id; |
679 | 10.2M | } |
680 | | |
681 | 18.4E | { |
682 | 18.4E | std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_); |
683 | 18.4E | scheduled_tasks_.erase(task_id); |
684 | 18.4E | } |
685 | | |
686 | 18.4E | return kInvalidTaskId; |
687 | 10.2M | } |
688 | | |
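A hedged sketch of the delayed-task lifecycle implemented above: the caller schedules a StatusFunctor with a delay, keeps the returned id, and may cancel it with AbortOnReactor(), in which case the functor is expected to observe the Aborted status passed in AbortOnReactor() above (the one-second delay and the log message are illustrative):

    auto task_id = messenger->ScheduleOnReactor(
        [](const Status& status) {
          // status indicates whether the timer fired normally or the task was aborted.
          LOG(INFO) << "Delayed task completed: " << status;
        },
        MonoDelta::FromSeconds(1), SOURCE_LOCATION(), messenger.get());
    if (task_id != kInvalidTaskId) {
      messenger->AbortOnReactor(task_id);  // cancels the pending task
    }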
689 | 8.40M | scoped_refptr<MetricEntity> Messenger::metric_entity() const { |
690 | 8.40M | return metric_entity_; |
691 | 8.40M | } |
692 | | |
693 | | } // namespace rpc |
694 | | } // namespace yb |