YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc.cc
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/rpc/rpc.h"

#include <functional>
#include <string>
#include <thread>

#include "yb/gutil/strings/substitute.h"

#include "yb/rpc/messenger.h"
#include "yb/rpc/rpc_header.pb.h"

#include "yb/util/flag_tags.h"
#include "yb/util/logging.h"
#include "yb/util/random_util.h"
#include "yb/util/source_location.h"
#include "yb/util/status_format.h"
#include "yb/util/trace.h"
#include "yb/util/tsan_util.h"

using namespace std::literals;
using namespace std::placeholders;

DEFINE_int64(rpcs_shutdown_timeout_ms, 15000 * yb::kTimeMultiplier,
             "Timeout for a batch of multiple RPCs invoked in parallel to shutdown.");
DEFINE_int64(rpcs_shutdown_extra_delay_ms, 5000 * yb::kTimeMultiplier,
             "Extra allowed time for a single RPC command to complete after its deadline.");
DEFINE_int32(min_backoff_ms_exponent, 7,
             "Min amount of backoff delay if the server responds with TOO BUSY (default: 128ms). "
             "Set this to some amount, during which the server might have recovered.");
DEFINE_int32(max_backoff_ms_exponent, 16,
             "Max amount of backoff delay if the server responds with TOO BUSY (default: 64 sec). "
             "Set this to some duration, past which you are okay having no backoff for a Ddos "
             "style build-up, during times when the server is overloaded, and unable to recover.");

DEFINE_int32(linear_backoff_ms, 1,
             "Number of milliseconds added to delay while using linear backoff strategy.");

TAG_FLAG(min_backoff_ms_exponent, hidden);
TAG_FLAG(min_backoff_ms_exponent, advanced);
TAG_FLAG(max_backoff_ms_exponent, hidden);
TAG_FLAG(max_backoff_ms_exponent, advanced);
75
namespace yb {
76
77
using std::shared_ptr;
78
using strings::Substitute;
79
using strings::SubstituteAndAppend;
80
81
namespace rpc {
82
83
49.5M
RpcCommand::RpcCommand() : trace_(new Trace) {
84
49.5M
  if (Trace::CurrentTrace()) {
85
515k
    Trace::CurrentTrace()->AddChildTrace(trace_.get());
86
515k
  }
87
49.5M
}
88
89
49.5M
RpcCommand::~RpcCommand() {}
90
91
RpcRetrier::RpcRetrier(CoarseTimePoint deadline, Messenger* messenger, ProxyCache *proxy_cache)
92
    : start_(CoarseMonoClock::now()),
93
      deadline_(deadline),
94
      messenger_(messenger),
95
49.5M
      proxy_cache_(*proxy_cache) {
96
49.5M
  DCHECK(deadline != CoarseTimePoint());
97
49.5M
}
98
99
bool RpcRetrier::HandleResponse(
100
48.5M
    RpcCommand* rpc, Status* out_status, RetryWhenBusy retry_when_busy) {
101
48.5M
  DCHECK_ONLY_NOTNULL(rpc);
102
48.5M
  DCHECK_ONLY_NOTNULL(out_status);
103
104
  // Always retry a TOO_BUSY error.
105
48.5M
  Status controller_status = controller_.status();
106
48.5M
  if (controller_status.IsRemoteError() && 
retry_when_busy266k
) {
107
266k
    const ErrorStatusPB* err = controller_.error_response();
108
266k
    if (err &&
109
266k
        err->has_code() &&
110
266k
        err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
111
1.29k
      auto status = DelayedRetry(rpc, controller_status, BackoffStrategy::kExponential);
112
1.29k
      if (!status.ok()) {
113
221
        *out_status = status;
114
221
        return false;
115
221
      }
116
1.07k
      return true;
117
1.29k
    }
118
266k
  }
119
120
48.5M
  *out_status = controller_status;
121
48.5M
  return false;
122
48.5M
}
123
124
Status RpcRetrier::DelayedRetry(
125
10.2M
    RpcCommand* rpc, const Status& why_status, BackoffStrategy strategy) {
126
  // Add some jitter to the retry delay.
127
  //
128
  // If the delay causes us to miss our deadline, RetryCb will fail the
129
  // RPC on our behalf.
130
  // makes the call redundant by then.
131
10.2M
  if (strategy == BackoffStrategy::kExponential) {
132
1.29k
    retry_delay_ = fit_bounds<MonoDelta>(
133
1.29k
        retry_delay_ * 2,
134
1.29k
        1ms * (1ULL << (FLAGS_min_backoff_ms_exponent + 1)),
135
1.29k
        1ms * (1ULL << FLAGS_max_backoff_ms_exponent));
136
10.2M
  } else {
137
10.2M
    retry_delay_ += 1ms * FLAGS_linear_backoff_ms;
138
10.2M
  }
139
140
10.2M
  return DoDelayedRetry(rpc, why_status);
141
10.2M
}
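
The doubling in the exponential branch is easiest to see by simulating it. Below is a minimal standalone sketch, assuming the default exponents and using std::clamp in place of the fit_bounds helper used above; note that the lower clamp uses FLAGS_min_backoff_ms_exponent + 1, i.e. 256 ms with the default of 7, and DoDelayedRetry below adds a further 0-4 ms of random jitter before scheduling the retry.

#include <algorithm>
#include <chrono>
#include <iostream>

int main() {
  using std::chrono::milliseconds;
  // Bounds as computed in RpcRetrier::DelayedRetry with the default flags:
  // lower = 1 << (7 + 1) = 256 ms, upper = 1 << 16 = 65536 ms.
  const milliseconds lower(1LL << (7 + 1));
  const milliseconds upper(1LL << 16);

  milliseconds retry_delay(0);
  for (int attempt = 1; attempt <= 10; ++attempt) {
    retry_delay = std::clamp(retry_delay * 2, lower, upper);
    std::cout << "attempt " << attempt << ": " << retry_delay.count() << " ms\n";
  }
}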

Status RpcRetrier::DelayedRetry(RpcCommand* rpc, const Status& why_status, MonoDelta add_delay) {
  retry_delay_ += add_delay;
  return DoDelayedRetry(rpc, why_status);
}

Status RpcRetrier::DoDelayedRetry(RpcCommand* rpc, const Status& why_status) {
  if (!why_status.ok() && (last_error_.ok() || last_error_.IsTimedOut())) {
    last_error_ = why_status;
  }
  attempt_num_++;

  RpcRetrierState expected_state = RpcRetrierState::kIdle;
  while (!state_.compare_exchange_strong(expected_state, RpcRetrierState::kScheduling)) {
    if (expected_state == RpcRetrierState::kFinished) {
      auto result = STATUS_FORMAT(IllegalState, "Retry of finished command: $0", rpc);
      LOG(WARNING) << result;
      return result;
    }
    if (expected_state == RpcRetrierState::kWaiting) {
      auto result = STATUS_FORMAT(IllegalState, "Retry of already waiting command: $0", rpc);
      LOG(WARNING) << result;
      return result;
    }
  }

  auto retain_rpc = rpc->shared_from_this();
  task_id_ = messenger_->ScheduleOnReactor(
      std::bind(&RpcRetrier::DoRetry, this, rpc, _1),
      retry_delay_ + MonoDelta::FromMilliseconds(RandomUniformInt(0, 4)),
      SOURCE_LOCATION(), messenger_);

  // Scheduling state can be changed only in this method, so we expect both
  // exchanges below to succeed.
  expected_state = RpcRetrierState::kScheduling;
  if (task_id_.load(std::memory_order_acquire) == kInvalidTaskId) {
    auto result = STATUS_FORMAT(Aborted, "Failed to schedule: $0", rpc);
    LOG(WARNING) << result;
    CHECK(state_.compare_exchange_strong(
        expected_state, RpcRetrierState::kFinished, std::memory_order_acq_rel));
    return result;
  }
  CHECK(state_.compare_exchange_strong(
      expected_state, RpcRetrierState::kWaiting, std::memory_order_acq_rel));
  return Status::OK();
}

void RpcRetrier::DoRetry(RpcCommand* rpc, const Status& status) {
  auto retain_rpc = rpc->shared_from_this();

  RpcRetrierState expected_state = RpcRetrierState::kWaiting;
  bool run = state_.compare_exchange_strong(expected_state, RpcRetrierState::kRunning);
  // There is a very rare case where we get here before switching from the scheduling to the
  // waiting state. It happens only during shutdown, when this is invoked soon after the retry
  // was scheduled. So we do a busy wait here, to avoid overhead in the general case.
  while (!run && expected_state == RpcRetrierState::kScheduling) {
    expected_state = RpcRetrierState::kWaiting;
    run = state_.compare_exchange_strong(expected_state, RpcRetrierState::kRunning);
    if (run) {
      break;
    }
    std::this_thread::sleep_for(1ms);
  }
  task_id_ = kInvalidTaskId;
  if (!run) {
    auto status = STATUS_FORMAT(
        Aborted, "$0 aborted: $1", rpc->ToString(), yb::rpc::ToString(expected_state));
    VTRACE_TO(1, rpc->trace(), "Rpc Finished with status $0", status.ToString());
    rpc->Finished(status);
    return;
  }
  Status new_status = status;
  if (new_status.ok()) {
    // Has this RPC timed out?
    if (deadline_ != CoarseTimePoint::max()) {
      auto now = CoarseMonoClock::Now();
      if (deadline_ < now) {
        string err_str = Format(
          "$0 passed its deadline $1 (passed: $2)", *rpc, deadline_, now - start_);
        if (!last_error_.ok()) {
          SubstituteAndAppend(&err_str, ": $0", last_error_.ToString());
        }
        new_status = STATUS(TimedOut, err_str);
      }
    }
  }
  if (new_status.ok()) {
    controller_.Reset();
    VTRACE_TO(1, rpc->trace(), "Sending Rpc");
    rpc->SendRpc();
  } else {
    // Service unavailable here means that we failed to schedule the delayed task, i.e. the
    // reactor is shut down.
    if (new_status.IsServiceUnavailable()) {
      new_status = STATUS_FORMAT(Aborted, "Aborted because of $0", new_status);
    }
    VTRACE_TO(1, rpc->trace(), "Rpc Finished with status $0", new_status.ToString());
    rpc->Finished(new_status);
  }
  expected_state = RpcRetrierState::kRunning;
  state_.compare_exchange_strong(expected_state, RpcRetrierState::kIdle);
}
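
The retrier above is a small state machine driven by compare-and-swap: DoDelayedRetry moves kIdle -> kScheduling -> kWaiting, DoRetry moves kWaiting -> kRunning and back to kIdle, and Abort forces kFinished. The following standalone sketch walks the happy path; the enum values mirror the code above, while everything else is illustrative only.

#include <atomic>
#include <iostream>

enum class RpcRetrierState { kIdle, kScheduling, kWaiting, kRunning, kFinished };

int main() {
  std::atomic<RpcRetrierState> state{RpcRetrierState::kIdle};

  // DoDelayedRetry: only an idle retrier may start scheduling a retry.
  auto expected = RpcRetrierState::kIdle;
  bool scheduled = state.compare_exchange_strong(expected, RpcRetrierState::kScheduling);
  // Once the reactor task is queued, the retrier is left waiting for it to fire.
  state.store(RpcRetrierState::kWaiting);

  // DoRetry: only a waiting retrier may actually run the retry.
  expected = RpcRetrierState::kWaiting;
  bool ran = state.compare_exchange_strong(expected, RpcRetrierState::kRunning);
  // After the RPC has been re-sent, the retrier goes back to idle.
  state.store(RpcRetrierState::kIdle);

  std::cout << std::boolalpha << "scheduled: " << scheduled << ", ran: " << ran << std::endl;
}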

RpcRetrier::~RpcRetrier() {
  auto task_id = task_id_.load(std::memory_order_acquire);
  auto state = state_.load(std::memory_order_acquire);

  LOG_IF(
      DFATAL,
      (kInvalidTaskId != task_id) ||
          (RpcRetrierState::kFinished != state && RpcRetrierState::kIdle != state))
      << "Destroying RpcRetrier in invalid state: " << ToString();
}

void RpcRetrier::Abort() {
  RpcRetrierState expected_state = RpcRetrierState::kIdle;
  while (!state_.compare_exchange_weak(expected_state, RpcRetrierState::kFinished)) {
    if (expected_state == RpcRetrierState::kFinished) {
      break;
    }
    if (expected_state != RpcRetrierState::kWaiting) {
      expected_state = RpcRetrierState::kIdle;
    }
    std::this_thread::sleep_for(10ms);
  }
  for (;;) {
    auto task_id = task_id_.load(std::memory_order_acquire);
    if (task_id == kInvalidTaskId) {
      break;
    }
    messenger_->AbortOnReactor(task_id);
    std::this_thread::sleep_for(10ms);
  }
}

std::string RpcRetrier::ToString() const {
  return Format("{ task_id: $0 state: $1 deadline: $2 }",
                task_id_.load(std::memory_order_acquire),
                state_.load(std::memory_order_acquire),
                deadline_);
}

RpcController* RpcRetrier::PrepareController() {
  controller_.set_timeout(deadline_ - CoarseMonoClock::now());
  return &controller_;
}

void Rpc::ScheduleRetry(const Status& status) {
  auto retry_status = mutable_retrier()->DelayedRetry(this, status);
  if (!retry_status.ok()) {
    LOG(WARNING) << "Failed to schedule retry: " << retry_status;
    VTRACE_TO(1, trace(), "Rpc Finished with status $0", retry_status.ToString());
    Finished(retry_status);
  }
}

Rpcs::Rpcs(std::mutex* mutex) {
  if (mutex) {
    mutex_ = mutex;
  } else {
    mutex_holder_.emplace();
    mutex_ = &mutex_holder_.get();
  }
}

CoarseTimePoint Rpcs::DoRequestAbortAll(RequestShutdown shutdown) {
  std::vector<Calls::value_type> calls;
  {
    std::lock_guard<std::mutex> lock(*mutex_);
    if (!shutdown_) {
      shutdown_ = shutdown;
      calls.reserve(calls_.size());
      calls.assign(calls_.begin(), calls_.end());
    }
  }
  auto deadline = CoarseMonoClock::now() + FLAGS_rpcs_shutdown_timeout_ms * 1ms;
  // It takes some time to complete an RPC command after its deadline has passed.
  // So we add extra time for it.
  auto single_call_extra_delay = std::chrono::milliseconds(FLAGS_rpcs_shutdown_extra_delay_ms);
  for (auto& call : calls) {
    CHECK(call);
    call->Abort();
    deadline = std::max(deadline, call->deadline() + single_call_extra_delay);
  }

  return deadline;
}
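
The deadline returned above is the later of "now + rpcs_shutdown_timeout_ms" and each aborted call's own deadline plus rpcs_shutdown_extra_delay_ms. A standalone sketch of that rule, assuming the flag defaults and using std::chrono::steady_clock in place of CoarseMonoClock; the per-call deadlines are illustrative values only.

#include <algorithm>
#include <chrono>
#include <iostream>
#include <vector>

int main() {
  using namespace std::chrono;
  const auto now = steady_clock::now();

  const milliseconds shutdown_timeout(15000);         // FLAGS_rpcs_shutdown_timeout_ms default
  const milliseconds single_call_extra_delay(5000);   // FLAGS_rpcs_shutdown_extra_delay_ms default

  // Hypothetical per-call deadlines relative to now.
  std::vector<steady_clock::time_point> call_deadlines = {
      now + seconds(2), now + seconds(30)};

  auto deadline = now + shutdown_timeout;
  for (const auto& call_deadline : call_deadlines) {
    deadline = std::max(deadline, call_deadline + single_call_extra_delay);
  }

  std::cout << "shutdown deadline is "
            << duration_cast<seconds>(deadline - now).count() << " s from now\n";
}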

void Rpcs::Shutdown() {
  auto deadline = DoRequestAbortAll(RequestShutdown::kTrue);
  {
    std::unique_lock<std::mutex> lock(*mutex_);
    while (!calls_.empty()) {
      LOG(INFO) << "Waiting calls: " << calls_.size();
      if (cond_.wait_until(lock, deadline) == std::cv_status::timeout) {
        break;
      }
    }
    CHECK(calls_.empty()) << "Calls: " << AsString(calls_);
  }
}

void Rpcs::Register(RpcCommandPtr call, Handle* handle) {
  if (*handle == calls_.end()) {
    *handle = Register(std::move(call));
  }
}

Rpcs::Handle Rpcs::Register(RpcCommandPtr call) {
  std::lock_guard<std::mutex> lock(*mutex_);
  if (shutdown_) {
    call->Abort();
    return InvalidHandle();
  }
  calls_.push_back(std::move(call));
  return --calls_.end();
}

bool Rpcs::RegisterAndStart(RpcCommandPtr call, Handle* handle) {
  CHECK(*handle == calls_.end());
  Register(std::move(call), handle);
  if (*handle == InvalidHandle()) {
    return false;
  }

  VTRACE_TO(1, (***handle).trace(), "Sending Rpc");
  (***handle).SendRpc();
  return true;
}

RpcCommandPtr Rpcs::Unregister(Handle* handle) {
  if (*handle == calls_.end()) {
    return RpcCommandPtr();
  }
  auto result = **handle;
  {
    std::lock_guard<std::mutex> lock(*mutex_);
    calls_.erase(*handle);
    cond_.notify_one();
  }
  *handle = calls_.end();
  return result;
}
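
Register hands back a Handle that is simply an iterator into calls_ (hence --calls_.end() above, and the erase in Unregister). That pattern relies on the container being list-like, so inserting or erasing other entries does not invalidate an outstanding handle. A minimal standalone sketch of the same handle pattern using std::list, with std::string standing in for RpcCommandPtr (both choices are illustrative, not from rpc.cc):

#include <iostream>
#include <list>
#include <string>

int main() {
  std::list<std::string> calls;
  using Handle = std::list<std::string>::iterator;

  // Register: append and keep an iterator to the new element as the handle.
  calls.push_back("rpc-a");
  Handle a = --calls.end();
  calls.push_back("rpc-b");
  Handle b = --calls.end();

  // Unregistering one call does not invalidate the other handle.
  calls.erase(a);
  std::cout << *b << " still reachable, " << calls.size() << " call(s) left\n";
}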

Rpcs::Handle Rpcs::Prepare() {
  std::lock_guard<std::mutex> lock(*mutex_);
  if (shutdown_) {
    return InvalidHandle();
  }
  calls_.emplace_back();
  return --calls_.end();
}

void Rpcs::RequestAbortAll() {
  DoRequestAbortAll(RequestShutdown::kFalse);
}

void Rpcs::Abort(std::initializer_list<Handle*> list) {
  std::vector<RpcCommandPtr> to_abort;
  {
    std::lock_guard<std::mutex> lock(*mutex_);
    for (auto& handle : list) {
      if (*handle != calls_.end()) {
        to_abort.push_back(**handle);
      }
    }
  }
  if (to_abort.empty()) {
    return;
  }
  for (auto& rpc : to_abort) {
    rpc->Abort();
  }
  {
    std::unique_lock<std::mutex> lock(*mutex_);
    for (auto& handle : list) {
      while (*handle != calls_.end()) {
        cond_.wait(lock);
      }
    }
  }
}

} // namespace rpc
} // namespace yb