/Users/deen/code/yugabyte-db/src/yb/rpc/rpc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/rpc.h" |
34 | | |
35 | | #include <functional> |
36 | | #include <string> |
37 | | #include <thread> |
38 | | |
39 | | #include "yb/gutil/strings/substitute.h" |
40 | | |
41 | | #include "yb/rpc/messenger.h" |
42 | | #include "yb/rpc/rpc_header.pb.h" |
43 | | |
44 | | #include "yb/util/flag_tags.h" |
45 | | #include "yb/util/logging.h" |
46 | | #include "yb/util/random_util.h" |
47 | | #include "yb/util/source_location.h" |
48 | | #include "yb/util/status_format.h" |
49 | | #include "yb/util/trace.h" |
50 | | #include "yb/util/tsan_util.h" |
51 | | |
52 | | using namespace std::literals; |
53 | | using namespace std::placeholders; |
54 | | |
55 | | DEFINE_int64(rpcs_shutdown_timeout_ms, 15000 * yb::kTimeMultiplier, |
56 | | "Timeout for a batch of multiple RPCs invoked in parallel to shutdown."); |
57 | | DEFINE_int64(rpcs_shutdown_extra_delay_ms, 5000 * yb::kTimeMultiplier, |
58 | | "Extra allowed time for a single RPC command to complete after its deadline."); |
59 | | DEFINE_int32(min_backoff_ms_exponent, 7, |
60 | | "Min amount of backoff delay if the server responds with TOO BUSY (default: 128ms). " |
61 | | "Set this to some amount, during which the server might have recovered."); |
62 | | DEFINE_int32(max_backoff_ms_exponent, 16, |
63 | | "Max amount of backoff delay if the server responds with TOO BUSY (default: 64 sec). " |
64 | | "Set this to some duration, past which you are okay having no backoff for a Ddos " |
65 | | "style build-up, during times when the server is overloaded, and unable to recover."); |
66 | | |
67 | | DEFINE_int32(linear_backoff_ms, 1, |
68 | | "Number of milliseconds added to delay while using linear backoff strategy."); |
69 | | |
70 | | TAG_FLAG(min_backoff_ms_exponent, hidden); |
71 | | TAG_FLAG(min_backoff_ms_exponent, advanced); |
72 | | TAG_FLAG(max_backoff_ms_exponent, hidden); |
73 | | TAG_FLAG(max_backoff_ms_exponent, advanced); |
74 | | |
75 | | namespace yb { |
76 | | |
77 | | using std::shared_ptr; |
78 | | using strings::Substitute; |
79 | | using strings::SubstituteAndAppend; |
80 | | |
81 | | namespace rpc { |
82 | | |
// Every RPC command owns its own Trace. If the constructing thread already
// has an active trace, the new trace is attached as a child so this RPC's
// events appear inside the parent trace's output.
RpcCommand::RpcCommand() : trace_(new Trace) {
  if (Trace::CurrentTrace()) {
    Trace::CurrentTrace()->AddChildTrace(trace_.get());
  }
}
88 | | |
89 | 12.6M | RpcCommand::~RpcCommand() {} |
90 | | |
// Creates a retrier for an RPC that must complete by `deadline`. The deadline
// must be explicitly initialized (a default-constructed CoarseTimePoint is a
// programming error); CoarseTimePoint::max() denotes "no deadline" — see the
// check in DoRetry().
RpcRetrier::RpcRetrier(CoarseTimePoint deadline, Messenger* messenger, ProxyCache *proxy_cache)
    : start_(CoarseMonoClock::now()),
      deadline_(deadline),
      messenger_(messenger),
      proxy_cache_(*proxy_cache) {
  DCHECK(deadline != CoarseTimePoint());
}
98 | | |
// Examines the controller status after a call attempt. If the server replied
// with ERROR_SERVER_TOO_BUSY (and retry_when_busy permits), schedules a
// delayed retry with exponential backoff and returns true ("handled; a retry
// is pending"). Otherwise copies the controller status into *out_status and
// returns false so the caller can finish the RPC itself.
bool RpcRetrier::HandleResponse(
    RpcCommand* rpc, Status* out_status, RetryWhenBusy retry_when_busy) {
  DCHECK_ONLY_NOTNULL(rpc);
  DCHECK_ONLY_NOTNULL(out_status);

  // Always retry a TOO_BUSY error.
  Status controller_status = controller_.status();
  if (controller_status.IsRemoteError() && retry_when_busy) {
    const ErrorStatusPB* err = controller_.error_response();
    if (err &&
        err->has_code() &&
        err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
      auto status = DelayedRetry(rpc, controller_status, BackoffStrategy::kExponential);
      if (!status.ok()) {
        // Could not schedule the retry (retrier finished, retry already
        // pending, or reactor shut down) — surface that failure instead.
        *out_status = status;
        return false;
      }
      return true;
    }
  }

  *out_status = controller_status;
  return false;
}
123 | | |
// Grows retry_delay_ per the chosen backoff strategy and schedules a retry of
// `rpc`. With kExponential the delay doubles each time, clamped to the range
// [2^(min_backoff_ms_exponent+1), 2^max_backoff_ms_exponent] milliseconds;
// otherwise a fixed linear increment (FLAGS_linear_backoff_ms) is added.
//
// A small random jitter is added when the task is scheduled (see
// DoDelayedRetry). If the delay causes us to miss our deadline, the retry
// callback will fail the RPC on our behalf.
Status RpcRetrier::DelayedRetry(
    RpcCommand* rpc, const Status& why_status, BackoffStrategy strategy) {
  if (strategy == BackoffStrategy::kExponential) {
    retry_delay_ = fit_bounds<MonoDelta>(
        retry_delay_ * 2,
        1ms * (1ULL << (FLAGS_min_backoff_ms_exponent + 1)),
        1ms * (1ULL << FLAGS_max_backoff_ms_exponent));
  } else {
    retry_delay_ += 1ms * FLAGS_linear_backoff_ms;
  }

  return DoDelayedRetry(rpc, why_status);
}
142 | | |
// Schedules a retry of `rpc` after extending the current retry delay by a
// caller-supplied increment instead of a backoff strategy.
Status RpcRetrier::DelayedRetry(RpcCommand* rpc, const Status& why_status, MonoDelta add_delay) {
  retry_delay_ += add_delay;
  return DoDelayedRetry(rpc, why_status);
}
147 | | |
// Common tail of both DelayedRetry overloads. Records the triggering error,
// drives the state machine kIdle -> kScheduling -> kWaiting, and schedules
// DoRetry on a reactor thread after retry_delay_ plus 0-4ms of jitter.
// Fails with IllegalState if the retrier is already finished or a retry is
// already pending, and with Aborted if the reactor refuses the task.
Status RpcRetrier::DoDelayedRetry(RpcCommand* rpc, const Status& why_status) {
  // Remember the triggering error; a previously recorded error is only
  // overwritten while it is OK or a plain timeout.
  if (!why_status.ok() && (last_error_.ok() || last_error_.IsTimedOut())) {
    last_error_ = why_status;
  }
  attempt_num_++;

  RpcRetrierState expected_state = RpcRetrierState::kIdle;
  while (!state_.compare_exchange_strong(expected_state, RpcRetrierState::kScheduling)) {
    if (expected_state == RpcRetrierState::kFinished) {
      auto result = STATUS_FORMAT(IllegalState, "Retry of finished command: $0", rpc);
      LOG(WARNING) << result;
      return result;
    }
    if (expected_state == RpcRetrierState::kWaiting) {
      auto result = STATUS_FORMAT(IllegalState, "Retry of already waiting command: $0", rpc);
      LOG(WARNING) << result;
      return result;
    }
    // Any other observed state: retry the exchange with the freshly loaded
    // value (compare_exchange_strong updated expected_state for us).
  }

  // Keep the command alive while the reactor task is being scheduled.
  auto retain_rpc = rpc->shared_from_this();
  task_id_ = messenger_->ScheduleOnReactor(
      std::bind(&RpcRetrier::DoRetry, this, rpc, _1),
      retry_delay_ + MonoDelta::FromMilliseconds(RandomUniformInt(0, 4)),
      SOURCE_LOCATION(), messenger_);

  // Scheduling state can be changed only in this method, so we expected both
  // exchanges below to succeed.
  expected_state = RpcRetrierState::kScheduling;
  if (task_id_.load(std::memory_order_acquire) == kInvalidTaskId) {
    // The reactor did not accept the task (e.g. it is shutting down).
    auto result = STATUS_FORMAT(Aborted, "Failed to schedule: $0", rpc);
    LOG(WARNING) << result;
    CHECK(state_.compare_exchange_strong(
        expected_state, RpcRetrierState::kFinished, std::memory_order_acq_rel));
    return result;
  }
  CHECK(state_.compare_exchange_strong(
      expected_state, RpcRetrierState::kWaiting, std::memory_order_acq_rel));
  return Status::OK();
}
188 | | |
// Reactor callback that fires once the retry delay has elapsed. Transitions
// the state machine kWaiting -> kRunning, then either re-sends the RPC (task
// ran normally and the deadline has not passed) or finishes it with an error.
// `status` comes from the reactor and is non-OK when the task was aborted.
void RpcRetrier::DoRetry(RpcCommand* rpc, const Status& status) {
  // Keep the command alive for the duration of this callback.
  auto retain_rpc = rpc->shared_from_this();

  RpcRetrierState expected_state = RpcRetrierState::kWaiting;
  bool run = state_.compare_exchange_strong(expected_state, RpcRetrierState::kRunning);
  // There is very rare case when we get here before switching from scheduling to waiting state.
  // It happens only during shutdown, when it invoked soon after we scheduled retry.
  // So we are doing busy wait here, to avoid overhead in general case.
  while (!run && expected_state == RpcRetrierState::kScheduling) {
    expected_state = RpcRetrierState::kWaiting;
    run = state_.compare_exchange_strong(expected_state, RpcRetrierState::kRunning);
    if (run) {
      break;
    }
    std::this_thread::sleep_for(1ms);
  }
  task_id_ = kInvalidTaskId;
  if (!run) {
    // The retrier was finished (e.g. via Abort) before we could run.
    auto status = STATUS_FORMAT(
        Aborted, "$0 aborted: $1", rpc->ToString(), yb::rpc::ToString(expected_state));
    VTRACE_TO(1, rpc->trace(), "Rpc Finished with status $0", status.ToString());
    rpc->Finished(status);
    return;
  }
  Status new_status = status;
  if (new_status.ok()) {
    // Has this RPC timed out?
    if (deadline_ != CoarseTimePoint::max()) {
      auto now = CoarseMonoClock::Now();
      if (deadline_ < now) {
        string err_str = Format(
            "$0 passed its deadline $1 (passed: $2)", *rpc, deadline_, now - start_);
        if (!last_error_.ok()) {
          // Include the last recorded error to aid debugging the timeout.
          SubstituteAndAppend(&err_str, ": $0", last_error_.ToString());
        }
        new_status = STATUS(TimedOut, err_str);
      }
    }
  }
  if (new_status.ok()) {
    controller_.Reset();
    VTRACE_TO(1, rpc->trace(), "Sending Rpc");
    rpc->SendRpc();
  } else {
    // Service unavailable here means that we failed to schedule the delayed
    // task, i.e. the reactor is shut down — report that as an abort.
    if (new_status.IsServiceUnavailable()) {
      new_status = STATUS_FORMAT(Aborted, "Aborted because of $0", new_status);
    }
    VTRACE_TO(1, rpc->trace(), "Rpc Finished with status $0", new_status.ToString());
    rpc->Finished(new_status);
  }
  // Return to kIdle so a further retry can be scheduled. The exchange result
  // is deliberately ignored: Abort() may have moved us to kFinished meanwhile.
  expected_state = RpcRetrierState::kRunning;
  state_.compare_exchange_strong(expected_state, RpcRetrierState::kIdle);
}
244 | | |
// Sanity check on destruction: a retrier must not be destroyed while a retry
// task is still scheduled or in flight. The only valid terminal conditions
// are "no pending task" plus state kFinished or kIdle.
RpcRetrier::~RpcRetrier() {
  auto task_id = task_id_.load(std::memory_order_acquire);
  auto state = state_.load(std::memory_order_acquire);

  LOG_IF(
      DFATAL,
      (kInvalidTaskId != task_id) ||
          (RpcRetrierState::kFinished != state && RpcRetrierState::kIdle != state))
      << "Destroying RpcRetrier in invalid state: " << ToString();
}
255 | | |
// Permanently stops the retrier: moves the state machine to kFinished, then
// cancels any still-scheduled reactor task and waits for task_id_ to clear.
// Uses short sleep-and-retry loops, so this may block briefly while a retry
// is concurrently scheduling or running.
void RpcRetrier::Abort() {
  RpcRetrierState expected_state = RpcRetrierState::kIdle;
  while (!state_.compare_exchange_weak(expected_state, RpcRetrierState::kFinished)) {
    if (expected_state == RpcRetrierState::kFinished) {
      // Someone else already finished this retrier.
      break;
    }
    if (expected_state != RpcRetrierState::kWaiting) {
      // kScheduling/kRunning: reset to kIdle and retry once the transition
      // settles. For kWaiting we attempt kWaiting -> kFinished directly.
      expected_state = RpcRetrierState::kIdle;
    }
    std::this_thread::sleep_for(10ms);
  }
  // Abort any pending reactor task; loop until DoRetry clears task_id_.
  for (;;) {
    auto task_id = task_id_.load(std::memory_order_acquire);
    if (task_id == kInvalidTaskId) {
      break;
    }
    messenger_->AbortOnReactor(task_id);
    std::this_thread::sleep_for(10ms);
  }
}
276 | | |
277 | 55.6k | std::string RpcRetrier::ToString() const { |
278 | 55.6k | return Format("{ task_id: $0 state: $1 deadline: $2 }", |
279 | 55.6k | task_id_.load(std::memory_order_acquire), |
280 | 55.6k | state_.load(std::memory_order_acquire), |
281 | 55.6k | deadline_); |
282 | 55.6k | } |
283 | | |
284 | 11.9M | RpcController* RpcRetrier::PrepareController() { |
285 | 11.9M | controller_.set_timeout(deadline_ - CoarseMonoClock::now()); |
286 | 11.9M | return &controller_; |
287 | 11.9M | } |
288 | | |
289 | 1 | void Rpc::ScheduleRetry(const Status& status) { |
290 | 1 | auto retry_status = mutable_retrier()->DelayedRetry(this, status); |
291 | 1 | if (!retry_status.ok()) { |
292 | 0 | LOG(WARNING) << "Failed to schedule retry: " << retry_status; |
293 | 0 | VTRACE_TO(1, trace(), "Rpc Finished with status $0", retry_status.ToString()); |
294 | 0 | Finished(retry_status); |
295 | 0 | } |
296 | 1 | } |
297 | | |
298 | 506k | Rpcs::Rpcs(std::mutex* mutex) { |
299 | 506k | if (mutex) { |
300 | 0 | mutex_ = mutex; |
301 | 506k | } else { |
302 | 506k | mutex_holder_.emplace(); |
303 | 506k | mutex_ = &mutex_holder_.get(); |
304 | 506k | } |
305 | 506k | } |
306 | | |
// Aborts every currently registered call and returns the deadline by which
// all of them should have completed. When `shutdown` is kTrue, also marks
// the container as shutting down so no further calls can be registered.
// Idempotent: once shutdown_ is set, later invocations abort nothing.
CoarseTimePoint Rpcs::DoRequestAbortAll(RequestShutdown shutdown) {
  std::vector<Calls::value_type> calls;
  {
    // Snapshot the registered calls under the lock; Abort() below runs
    // unlocked — presumably because aborting a call can re-enter this
    // container (e.g. via Unregister). TODO(review): confirm.
    std::lock_guard<std::mutex> lock(*mutex_);
    if (!shutdown_) {
      shutdown_ = shutdown;
      calls.reserve(calls_.size());
      calls.assign(calls_.begin(), calls_.end());
    }
  }
  auto deadline = CoarseMonoClock::now() + FLAGS_rpcs_shutdown_timeout_ms * 1ms;
  // It takes some time to complete rpc command after its deadline has passed.
  // So we add extra time for it.
  auto single_call_extra_delay = std::chrono::milliseconds(FLAGS_rpcs_shutdown_extra_delay_ms);
  for (auto& call : calls) {
    CHECK(call);
    call->Abort();
    deadline = std::max(deadline, call->deadline() + single_call_extra_delay);
  }

  return deadline;
}
329 | | |
// Aborts all registered calls and blocks until each has unregistered itself
// (Unregister signals cond_) or the computed shutdown deadline passes.
// CHECK-fails if any call is still registered after the deadline.
void Rpcs::Shutdown() {
  auto deadline = DoRequestAbortAll(RequestShutdown::kTrue);
  {
    std::unique_lock<std::mutex> lock(*mutex_);
    while (!calls_.empty()) {
      LOG(INFO) << "Waiting calls: " << calls_.size();
      if (cond_.wait_until(lock, deadline) == std::cv_status::timeout) {
        break;
      }
    }
    CHECK(calls_.empty()) << "Calls: " << AsString(calls_);
  }
}
343 | | |
344 | 1.20M | void Rpcs::Register(RpcCommandPtr call, Handle* handle) { |
345 | 1.20M | if (*handle == calls_.end()) { |
346 | 1.20M | *handle = Register(std::move(call)); |
347 | 1.20M | } |
348 | 1.20M | } |
349 | | |
350 | 1.20M | Rpcs::Handle Rpcs::Register(RpcCommandPtr call) { |
351 | 1.20M | std::lock_guard<std::mutex> lock(*mutex_); |
352 | 1.20M | if (shutdown_) { |
353 | 0 | call->Abort(); |
354 | 0 | return InvalidHandle(); |
355 | 0 | } |
356 | 1.20M | calls_.push_back(std::move(call)); |
357 | 1.20M | return --calls_.end(); |
358 | 1.20M | } |
359 | | |
360 | 1.17M | bool Rpcs::RegisterAndStart(RpcCommandPtr call, Handle* handle) { |
361 | 1.17M | CHECK(*handle == calls_.end()); |
362 | 1.17M | Register(std::move(call), handle); |
363 | 1.17M | if (*handle == InvalidHandle()) { |
364 | 0 | return false; |
365 | 0 | } |
366 | | |
367 | 1.17M | VTRACE_TO(1, (***handle).trace(), "Sending Rpc"); |
368 | 1.17M | (***handle).SendRpc(); |
369 | 1.17M | return true; |
370 | 1.17M | } |
371 | | |
// Removes the call referenced by *handle from the registry, resets *handle
// to the invalid (end) value, and returns the removed call. Returns a null
// pointer if the handle is already invalid. Wakes one waiter (Shutdown /
// Abort) so it can re-check whether all calls are gone.
RpcCommandPtr Rpcs::Unregister(Handle* handle) {
  if (*handle == calls_.end()) {
    return RpcCommandPtr();
  }
  // Take a reference to the command before erasing its registry slot.
  auto result = **handle;
  {
    std::lock_guard<std::mutex> lock(*mutex_);
    calls_.erase(*handle);
    cond_.notify_one();
  }
  *handle = calls_.end();
  return result;
}
385 | | |
// Reserves an empty slot in the registry, to be filled in later via
// Register(call, handle). Returns InvalidHandle() if the container is
// already shutting down.
Rpcs::Handle Rpcs::Prepare() {
  std::lock_guard<std::mutex> lock(*mutex_);
  if (shutdown_) {
    return InvalidHandle();
  }
  calls_.emplace_back();
  return --calls_.end();
}
394 | | |
// Aborts all currently registered calls without marking the container as
// shut down, so new calls may still be registered afterwards.
void Rpcs::RequestAbortAll() {
  DoRequestAbortAll(RequestShutdown::kFalse);
}
398 | | |
// Aborts the calls referenced by the given handles and blocks until each has
// unregistered itself (Unregister resets the handle to end() and signals
// cond_). Handles that are already invalid are skipped; if none are live,
// returns without blocking.
void Rpcs::Abort(std::initializer_list<Handle*> list) {
  std::vector<RpcCommandPtr> to_abort;
  {
    // Collect live calls under the lock; abort them outside it, since
    // aborting presumably finishes the call and re-enters this container
    // via Unregister. TODO(review): confirm.
    std::lock_guard<std::mutex> lock(*mutex_);
    for (auto& handle : list) {
      if (*handle != calls_.end()) {
        to_abort.push_back(**handle);
      }
    }
  }
  if (to_abort.empty()) {
    return;
  }
  for (auto& rpc : to_abort) {
    rpc->Abort();
  }
  {
    // Wait until every handle has been reset by Unregister.
    std::unique_lock<std::mutex> lock(*mutex_);
    for (auto& handle : list) {
      while (*handle != calls_.end()) {
        cond_.wait(lock);
      }
    }
  }
}
424 | | |
425 | | } // namespace rpc |
426 | | } // namespace yb |