YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc.cc
  Count|Source
       |// Licensed to the Apache Software Foundation (ASF) under one
       |// or more contributor license agreements.  See the NOTICE file
       |// distributed with this work for additional information
       |// regarding copyright ownership.  The ASF licenses this file
       |// to you under the Apache License, Version 2.0 (the
       |// "License"); you may not use this file except in compliance
       |// with the License.  You may obtain a copy of the License at
       |//
       |//   http://www.apache.org/licenses/LICENSE-2.0
       |//
       |// Unless required by applicable law or agreed to in writing,
       |// software distributed under the License is distributed on an
       |// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
       |// KIND, either express or implied.  See the License for the
       |// specific language governing permissions and limitations
       |// under the License.
       |//
       |// The following only applies to changes made to this file as part of YugaByte development.
       |//
       |// Portions Copyright (c) YugaByte, Inc.
       |//
       |// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
       |// in compliance with the License.  You may obtain a copy of the License at
       |//
       |// http://www.apache.org/licenses/LICENSE-2.0
       |//
       |// Unless required by applicable law or agreed to in writing, software distributed under the License
       |// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
       |// or implied.  See the License for the specific language governing permissions and limitations
       |// under the License.
       |//
       |
       |#include "yb/rpc/rpc.h"
       |
       |#include <functional>
       |#include <string>
       |#include <thread>
       |
       |#include "yb/gutil/strings/substitute.h"
       |
       |#include "yb/rpc/messenger.h"
       |#include "yb/rpc/rpc_header.pb.h"
       |
       |#include "yb/util/flag_tags.h"
       |#include "yb/util/logging.h"
       |#include "yb/util/random_util.h"
       |#include "yb/util/source_location.h"
       |#include "yb/util/status_format.h"
       |#include "yb/util/trace.h"
       |#include "yb/util/tsan_util.h"
       |
       |using namespace std::literals;
       |using namespace std::placeholders;
       |
       |DEFINE_int64(rpcs_shutdown_timeout_ms, 15000 * yb::kTimeMultiplier,
       |             "Timeout to shut down a batch of multiple RPCs invoked in parallel.");
       |DEFINE_int64(rpcs_shutdown_extra_delay_ms, 5000 * yb::kTimeMultiplier,
       |             "Extra time allowed for a single RPC command to complete after its deadline.");
       |DEFINE_int32(min_backoff_ms_exponent, 7,
       |             "Minimum backoff delay if the server responds with TOO BUSY (default: 128ms). "
       |             "Set this to an interval during which the server might have recovered.");
       |DEFINE_int32(max_backoff_ms_exponent, 16,
       |             "Maximum backoff delay if the server responds with TOO BUSY (default: 64 sec). "
       |             "Set this to a duration past which you are okay having no further backoff against "
       |             "a DDoS-style build-up while the server is overloaded and unable to recover.");
       |
       |DEFINE_int32(linear_backoff_ms, 1,
       |             "Number of milliseconds added to delay while using linear backoff strategy.");
       |
       |TAG_FLAG(min_backoff_ms_exponent, hidden);
       |TAG_FLAG(min_backoff_ms_exponent, advanced);
       |TAG_FLAG(max_backoff_ms_exponent, hidden);
       |TAG_FLAG(max_backoff_ms_exponent, advanced);
       |
       |namespace yb {
       |
       |using std::shared_ptr;
       |using strings::Substitute;
       |using strings::SubstituteAndAppend;
       |
       |namespace rpc {
       |
  12.6M|RpcCommand::RpcCommand() : trace_(new Trace) {
  12.6M|  if (Trace::CurrentTrace()) {
   183k|    Trace::CurrentTrace()->AddChildTrace(trace_.get());
   183k|  }
  12.6M|}
       |
  12.6M|RpcCommand::~RpcCommand() {}
       |
       |RpcRetrier::RpcRetrier(CoarseTimePoint deadline, Messenger* messenger, ProxyCache *proxy_cache)
       |    : start_(CoarseMonoClock::now()),
       |      deadline_(deadline),
       |      messenger_(messenger),
  12.6M|      proxy_cache_(*proxy_cache) {
  12.6M|  DCHECK(deadline != CoarseTimePoint());
  12.6M|}
       |
       |bool RpcRetrier::HandleResponse(
  12.2M|    RpcCommand* rpc, Status* out_status, RetryWhenBusy retry_when_busy) {
  12.2M|  DCHECK_ONLY_NOTNULL(rpc);
  12.2M|  DCHECK_ONLY_NOTNULL(out_status);
       |
       |  // Always retry a TOO_BUSY error.
  12.2M|  Status controller_status = controller_.status();
  12.2M|  if (controller_status.IsRemoteError() && retry_when_busy) {
  1.15k|    const ErrorStatusPB* err = controller_.error_response();
  1.15k|    if (err &&
  1.15k|        err->has_code() &&
  1.15k|        err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
  1.13k|      auto status = DelayedRetry(rpc, controller_status, BackoffStrategy::kExponential);
  1.13k|      if (!status.ok()) {
     84|        *out_status = status;
     84|        return false;
     84|      }
  1.05k|      return true;
  1.05k|    }
  1.15k|  }
       |
  12.2M|  *out_status = controller_status;
  12.2M|  return false;
  12.2M|}
       |
       |Status RpcRetrier::DelayedRetry(
   956k|    RpcCommand* rpc, const Status& why_status, BackoffStrategy strategy) {
       |  // Add some jitter to the retry delay.
       |  //
       |  // If the delay causes us to miss our deadline, RetryCb will fail the
       |  // RPC on our behalf, which makes the call redundant by then.
   956k|  if (strategy == BackoffStrategy::kExponential) {
  1.13k|    retry_delay_ = fit_bounds<MonoDelta>(
  1.13k|        retry_delay_ * 2,
  1.13k|        1ms * (1ULL << (FLAGS_min_backoff_ms_exponent + 1)),
  1.13k|        1ms * (1ULL << FLAGS_max_backoff_ms_exponent));
   955k|  } else {
   955k|    retry_delay_ += 1ms * FLAGS_linear_backoff_ms;
   955k|  }
       |
   956k|  return DoDelayedRetry(rpc, why_status);
   956k|}
       |
    816|Status RpcRetrier::DelayedRetry(RpcCommand* rpc, const Status& why_status, MonoDelta add_delay) {
    816|  retry_delay_ += add_delay;
    816|  return DoDelayedRetry(rpc, why_status);
    816|}
       |
   956k|Status RpcRetrier::DoDelayedRetry(RpcCommand* rpc, const Status& why_status) {
   956k|  if (!why_status.ok() && (last_error_.ok() || last_error_.IsTimedOut())) {
  30.8k|    last_error_ = why_status;
  30.8k|  }
   956k|  attempt_num_++;
       |
   956k|  RpcRetrierState expected_state = RpcRetrierState::kIdle;
   956k|  while (!state_.compare_exchange_strong(expected_state, RpcRetrierState::kScheduling)) {
    208|    if (expected_state == RpcRetrierState::kFinished) {
     84|      auto result = STATUS_FORMAT(IllegalState, "Retry of finished command: $0", rpc);
     84|      LOG(WARNING) << result;
     84|      return result;
     84|    }
    124|    if (expected_state == RpcRetrierState::kWaiting) {
      0|      auto result = STATUS_FORMAT(IllegalState, "Retry of already waiting command: $0", rpc);
      0|      LOG(WARNING) << result;
      0|      return result;
      0|    }
    124|  }
       |
   956k|  auto retain_rpc = rpc->shared_from_this();
   956k|  task_id_ = messenger_->ScheduleOnReactor(
   956k|      std::bind(&RpcRetrier::DoRetry, this, rpc, _1),
   956k|      retry_delay_ + MonoDelta::FromMilliseconds(RandomUniformInt(0, 4)),
   956k|      SOURCE_LOCATION(), messenger_);
       |
       |  // The scheduling state can be changed only in this method, so we expect both
       |  // exchanges below to succeed.
   956k|  expected_state = RpcRetrierState::kScheduling;
   956k|  if (task_id_.load(std::memory_order_acquire) == kInvalidTaskId) {
      0|    auto result = STATUS_FORMAT(Aborted, "Failed to schedule: $0", rpc);
      0|    LOG(WARNING) << result;
      0|    CHECK(state_.compare_exchange_strong(
      0|        expected_state, RpcRetrierState::kFinished, std::memory_order_acq_rel));
      0|    return result;
      0|  }
   956k|  CHECK(state_.compare_exchange_strong(
   956k|      expected_state, RpcRetrierState::kWaiting, std::memory_order_acq_rel));
   956k|  return Status::OK();
   956k|}
       |
   955k|void RpcRetrier::DoRetry(RpcCommand* rpc, const Status& status) {
   955k|  auto retain_rpc = rpc->shared_from_this();
       |
   955k|  RpcRetrierState expected_state = RpcRetrierState::kWaiting;
   955k|  bool run = state_.compare_exchange_strong(expected_state, RpcRetrierState::kRunning);
       |  // There is a very rare case where we get here before the switch from the scheduling to the
       |  // waiting state. It happens only during shutdown, when this is invoked soon after the retry
       |  // was scheduled. So we busy-wait here to avoid overhead in the general case.
   955k|  while (!run && expected_state == RpcRetrierState::kScheduling) {
      0|    expected_state = RpcRetrierState::kWaiting;
      0|    run = state_.compare_exchange_strong(expected_state, RpcRetrierState::kRunning);
      0|    if (run) {
      0|      break;
      0|    }
      0|    std::this_thread::sleep_for(1ms);
      0|  }
   955k|  task_id_ = kInvalidTaskId;
   955k|  if (!run) {
    572|    auto status = STATUS_FORMAT(
    572|        Aborted, "$0 aborted: $1", rpc->ToString(), yb::rpc::ToString(expected_state));
    572|    VTRACE_TO(1, rpc->trace(), "Rpc Finished with status $0", status.ToString());
    572|    rpc->Finished(status);
    572|    return;
    572|  }
       |
   954k|  Status new_status = status;
   954k|  if (new_status.ok()) {
       |    // Has this RPC timed out?
   954k|    if (deadline_ != CoarseTimePoint::max()) {
   954k|      auto now = CoarseMonoClock::Now();
   954k|      if (deadline_ < now) {
  6.40k|        string err_str = Format(
  6.40k|          "$0 passed its deadline $1 (passed: $2)", *rpc, deadline_, now - start_);
  6.40k|        if (!last_error_.ok()) {
  6.34k|          SubstituteAndAppend(&err_str, ": $0", last_error_.ToString());
  6.34k|        }
  6.40k|        new_status = STATUS(TimedOut, err_str);
  6.40k|      }
   954k|    }
   954k|  }
   954k|  if (new_status.ok()) {
   947k|    controller_.Reset();
   947k|    VTRACE_TO(1, rpc->trace(), "Sending Rpc");
   947k|    rpc->SendRpc();
  7.24k|  } else {
    // Service unavailable here means that we failed to to schedule delayed task, i.e. reactor
234
    // is shutted down.
235
7.24k
    if (new_status.IsServiceUnavailable()) {
236
0
      new_status = STATUS_FORMAT(Aborted, "Aborted because of $0", new_status);
237
0
    }
238
7.24k
    VTRACE_TO(1, rpc->trace(), "Rpc Finished with status $0", new_status.ToString());
239
7.24k
    rpc->Finished(new_status);
240
7.24k
  }
241
954k
  expected_state = RpcRetrierState::kRunning;
242
954k
  state_.compare_exchange_strong(expected_state, RpcRetrierState::kIdle);
243
954k
}
244
245
12.6M
RpcRetrier::~RpcRetrier() {
246
12.6M
  auto task_id = task_id_.load(std::memory_order_acquire);
247
12.6M
  auto state = state_.load(std::memory_order_acquire);
248
249
10.2k
  LOG_IF(
250
10.2k
      DFATAL,
251
10.2k
      (kInvalidTaskId != task_id) ||
252
10.2k
          (RpcRetrierState::kFinished != state && RpcRetrierState::kIdle != state))
253
10.2k
      << "Destroying RpcRetrier in invalid state: " << ToString();
254
12.6M
}
255
256
247k
void RpcRetrier::Abort() {
257
247k
  RpcRetrierState expected_state = RpcRetrierState::kIdle;
258
247k
  while (!state_.compare_exchange_weak(expected_state, RpcRetrierState::kFinished)) {
259
611
    if (expected_state == RpcRetrierState::kFinished) {
260
0
      break;
261
0
    }
262
611
    if (expected_state != RpcRetrierState::kWaiting) {
263
24
      expected_state = RpcRetrierState::kIdle;
264
24
    }
265
611
    std::this_thread::sleep_for(10ms);
266
611
  }
267
247k
  for (;;) {
268
247k
    auto task_id = task_id_.load(std::memory_order_acquire);
269
247k
    if (task_id == kInvalidTaskId) {
270
247k
      break;
271
247k
    }
272
586
    messenger_->AbortOnReactor(task_id);
273
586
    std::this_thread::sleep_for(10ms);
274
586
  }
275
247k
}
276
277
55.6k
std::string RpcRetrier::ToString() const {
278
55.6k
  return Format("{ task_id: $0 state: $1 deadline: $2 }",
279
55.6k
                task_id_.load(std::memory_order_acquire),
280
55.6k
                state_.load(std::memory_order_acquire),
281
55.6k
                deadline_);
282
55.6k
}
283
284
11.9M
RpcController* RpcRetrier::PrepareController() {
285
11.9M
  controller_.set_timeout(deadline_ - CoarseMonoClock::now());
286
11.9M
  return &controller_;
287
11.9M
}
288
289
1
void Rpc::ScheduleRetry(const Status& status) {
290
1
  auto retry_status = mutable_retrier()->DelayedRetry(this, status);
291
1
  if (!retry_status.ok()) {
292
0
    LOG(WARNING) << "Failed to schedule retry: " << retry_status;
293
0
    VTRACE_TO(1, trace(), "Rpc Finished with status $0", retry_status.ToString());
294
0
    Finished(retry_status);
295
0
  }
296
1
}
297
298
506k
Rpcs::Rpcs(std::mutex* mutex) {
299
506k
  if (mutex) {
300
0
    mutex_ = mutex;
301
506k
  } else {
302
506k
    mutex_holder_.emplace();
303
506k
    mutex_ = &mutex_holder_.get();
304
506k
  }
305
506k
}
306
307
471k
CoarseTimePoint Rpcs::DoRequestAbortAll(RequestShutdown shutdown) {
308
471k
  std::vector<Calls::value_type> calls;
309
471k
  {
310
471k
    std::lock_guard<std::mutex> lock(*mutex_);
311
471k
    if (!shutdown_) {
312
436k
      shutdown_ = shutdown;
313
436k
      calls.reserve(calls_.size());
314
436k
      calls.assign(calls_.begin(), calls_.end());
315
436k
    }
316
471k
  }
317
471k
  auto deadline = CoarseMonoClock::now() + FLAGS_rpcs_shutdown_timeout_ms * 1ms;
       |  // It takes some time to complete an RPC command after its deadline has passed,
       |  // so we add extra time for it.
   471k|  auto single_call_extra_delay = std::chrono::milliseconds(FLAGS_rpcs_shutdown_extra_delay_ms);
   247k|  for (auto& call : calls) {
   247k|    CHECK(call);
   247k|    call->Abort();
   247k|    deadline = std::max(deadline, call->deadline() + single_call_extra_delay);
   247k|  }
       |
   471k|  return deadline;
   471k|}
       |
   466k|void Rpcs::Shutdown() {
   466k|  auto deadline = DoRequestAbortAll(RequestShutdown::kTrue);
   466k|  {
   466k|    std::unique_lock<std::mutex> lock(*mutex_);
   696k|    while (!calls_.empty()) {
   229k|      LOG(INFO) << "Waiting calls: " << calls_.size();
   229k|      if (cond_.wait_until(lock, deadline) == std::cv_status::timeout) {
      0|        break;
      0|      }
   229k|    }
    199|    CHECK(calls_.empty()) << "Calls: " << AsString(calls_);
   466k|  }
   466k|}
       |
  1.20M|void Rpcs::Register(RpcCommandPtr call, Handle* handle) {
  1.20M|  if (*handle == calls_.end()) {
  1.20M|    *handle = Register(std::move(call));
  1.20M|  }
  1.20M|}
       |
  1.20M|Rpcs::Handle Rpcs::Register(RpcCommandPtr call) {
  1.20M|  std::lock_guard<std::mutex> lock(*mutex_);
  1.20M|  if (shutdown_) {
      0|    call->Abort();
      0|    return InvalidHandle();
      0|  }
  1.20M|  calls_.push_back(std::move(call));
  1.20M|  return --calls_.end();
  1.20M|}
       |
  1.17M|bool Rpcs::RegisterAndStart(RpcCommandPtr call, Handle* handle) {
  1.17M|  CHECK(*handle == calls_.end());
  1.17M|  Register(std::move(call), handle);
  1.17M|  if (*handle == InvalidHandle()) {
      0|    return false;
      0|  }
       |
  1.17M|  VTRACE_TO(1, (***handle).trace(), "Sending Rpc");
  1.17M|  (***handle).SendRpc();
  1.17M|  return true;
  1.17M|}
       |
  6.24M|RpcCommandPtr Rpcs::Unregister(Handle* handle) {
  6.24M|  if (*handle == calls_.end()) {
  3.79k|    return RpcCommandPtr();
  3.79k|  }
  6.24M|  auto result = **handle;
  6.24M|  {
  6.24M|    std::lock_guard<std::mutex> lock(*mutex_);
  6.24M|    calls_.erase(*handle);
  6.24M|    cond_.notify_one();
  6.24M|  }
  6.24M|  *handle = calls_.end();
  6.24M|  return result;
  6.24M|}
       |
  5.04M|Rpcs::Handle Rpcs::Prepare() {
  5.04M|  std::lock_guard<std::mutex> lock(*mutex_);
  5.04M|  if (shutdown_) {
      0|    return InvalidHandle();
      0|  }
  5.04M|  calls_.emplace_back();
  5.04M|  return --calls_.end();
  5.04M|}
       |
  5.42k|void Rpcs::RequestAbortAll() {
  5.42k|  DoRequestAbortAll(RequestShutdown::kFalse);
  5.42k|}
       |
  1.22M|void Rpcs::Abort(std::initializer_list<Handle*> list) {
  1.22M|  std::vector<RpcCommandPtr> to_abort;
  1.22M|  {
  1.22M|    std::lock_guard<std::mutex> lock(*mutex_);
  2.69M|    for (auto& handle : list) {
  2.69M|      if (*handle != calls_.end()) {
      0|        to_abort.push_back(**handle);
      0|      }
  2.69M|    }
  1.22M|  }
  1.22M|  if (to_abort.empty()) {
  1.22M|    return;
  1.22M|  }
    307|  for (auto& rpc : to_abort) {
      0|    rpc->Abort();
      0|  }
    307|  {
    307|    std::unique_lock<std::mutex> lock(*mutex_);
      0|    for (auto& handle : list) {
      0|      while (*handle != calls_.end()) {
      0|        cond_.wait(lock);
      0|      }
      0|    }
    307|  }
    307|}
       |
       |} // namespace rpc
       |} // namespace yb
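
A note on the backoff arithmetic covered above: the exponential path in RpcRetrier::DelayedRetry doubles retry_delay_ and clamps the result between 1ms * 2^(min_backoff_ms_exponent + 1) and 1ms * 2^max_backoff_ms_exponent (256 ms and 65536 ms with the default exponents of 7 and 16), while the linear path adds linear_backoff_ms on each attempt. The following is a minimal standalone sketch of that clamping and is not part of rpc.cc; it uses std::chrono and std::clamp in place of YugabyteDB's MonoDelta and fit_bounds helpers, and the constant names are illustrative stand-ins for the flags.

// Illustrative sketch only -- not part of rpc.cc.
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <iostream>

using std::chrono::milliseconds;

// Hypothetical stand-ins for the gflags defaults shown in the listing above.
constexpr int kMinBackoffMsExponent = 7;
constexpr int kMaxBackoffMsExponent = 16;
constexpr int kLinearBackoffMs = 1;

enum class BackoffStrategy { kLinear, kExponential };

// Mirrors the delay update in RpcRetrier::DelayedRetry: double and clamp for the
// exponential strategy, add a fixed increment for the linear strategy.
milliseconds NextRetryDelay(milliseconds current, BackoffStrategy strategy) {
  if (strategy == BackoffStrategy::kExponential) {
    const milliseconds lo{int64_t{1} << (kMinBackoffMsExponent + 1)};  // 256 ms
    const milliseconds hi{int64_t{1} << kMaxBackoffMsExponent};        // 65536 ms
    return std::clamp(current * 2, lo, hi);
  }
  return current + milliseconds(kLinearBackoffMs);
}

int main() {
  milliseconds delay{0};
  for (int attempt = 1; attempt <= 5; ++attempt) {
    delay = NextRetryDelay(delay, BackoffStrategy::kExponential);
    std::cout << "attempt " << attempt << ": " << delay.count() << " ms\n";  // 256, 512, 1024, ...
  }
}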