YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/service_pool.cc
 Count  Source
        // Licensed to the Apache Software Foundation (ASF) under one
        // or more contributor license agreements.  See the NOTICE file
        // distributed with this work for additional information
        // regarding copyright ownership.  The ASF licenses this file
        // to you under the Apache License, Version 2.0 (the
        // "License"); you may not use this file except in compliance
        // with the License.  You may obtain a copy of the License at
        //
        //   http://www.apache.org/licenses/LICENSE-2.0
        //
        // Unless required by applicable law or agreed to in writing,
        // software distributed under the License is distributed on an
        // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        // KIND, either express or implied.  See the License for the
        // specific language governing permissions and limitations
        // under the License.
        //
        // The following only applies to changes made to this file as part of YugaByte development.
        //
        // Portions Copyright (c) YugaByte, Inc.
        //
        // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
        // in compliance with the License.  You may obtain a copy of the License at
        //
        // http://www.apache.org/licenses/LICENSE-2.0
        //
        // Unless required by applicable law or agreed to in writing, software distributed under the License
        // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
        // or implied.  See the License for the specific language governing permissions and limitations
        // under the License.
        //

        #include "yb/rpc/service_pool.h"

        #include <pthread.h>
        #include <sys/types.h>

        #include <functional>
        #include <memory>
        #include <queue>
        #include <string>
        #include <vector>

        #include <boost/asio/strand.hpp>
        #include <cds/container/basket_queue.h>
        #include <cds/gc/dhp.h>
        #include <glog/logging.h>

        #include "yb/gutil/atomicops.h"
        #include "yb/gutil/ref_counted.h"
        #include "yb/gutil/strings/substitute.h"

        #include "yb/rpc/inbound_call.h"
        #include "yb/rpc/scheduler.h"
        #include "yb/rpc/service_if.h"

        #include "yb/util/countdown_latch.h"
        #include "yb/util/flag_tags.h"
        #include "yb/util/lockfree.h"
        #include "yb/util/logging.h"
        #include "yb/util/metrics.h"
        #include "yb/util/monotime.h"
        #include "yb/util/net/sockaddr.h"
        #include "yb/util/scope_exit.h"
        #include "yb/util/status.h"
        #include "yb/util/trace.h"

        using namespace std::literals;
        using namespace std::placeholders;
        using std::shared_ptr;
        using strings::Substitute;

        DEFINE_int64(max_time_in_queue_ms, 6000,
                     "Fail calls that get stuck in the queue longer than the specified amount of time "
                         "(in ms)");
        TAG_FLAG(max_time_in_queue_ms, advanced);
        TAG_FLAG(max_time_in_queue_ms, runtime);
        DEFINE_int64(backpressure_recovery_period_ms, 600000,
                     "Once we hit backpressure/service overflow, we will consider dropping stale requests "
                     "for this duration (in ms)");
        TAG_FLAG(backpressure_recovery_period_ms, advanced);
        TAG_FLAG(backpressure_recovery_period_ms, runtime);
        DEFINE_test_flag(bool, enable_backpressure_mode_for_testing, false,
                    "For testing purposes. Enables RPCs to be considered timed out in the queue even "
                    "when we have not had any backpressure in the recent past.");

        METRIC_DEFINE_coarse_histogram(server, rpc_incoming_queue_time,
                                "RPC Queue Time",
                                yb::MetricUnit::kMicroseconds,
                                "Number of microseconds incoming RPC requests spend in the worker queue");

        METRIC_DEFINE_counter(server, rpcs_timed_out_in_queue,
                              "RPC Queue Timeouts",
                              yb::MetricUnit::kRequests,
                              "Number of RPCs whose timeout elapsed while waiting "
                              "in the service queue, and thus were not processed. "
                              "Does not include calls that were expired before we tried to execute them.");

        METRIC_DEFINE_counter(server, rpcs_timed_out_early_in_queue,
                              "RPC Queue Timeouts",
                              yb::MetricUnit::kRequests,
                              "Number of RPCs whose timeout elapsed while waiting "
                              "in the service queue, and thus were not processed. "
                              "Timeouts for those calls were detected before the calls tried to execute.");

        METRIC_DEFINE_counter(server, rpcs_queue_overflow,
                              "RPC Queue Overflows",
                              yb::MetricUnit::kRequests,
                              "Number of RPCs dropped because the service queue "
                              "was full.");

        namespace yb {
        namespace rpc {

        namespace {

        const CoarseDuration kTimeoutCheckGranularity = 100ms;
        const char* const kTimedOutInQueue = "Call waited in the queue past deadline";

        } // namespace

        class ServicePoolImpl final : public InboundCallHandler {
         public:
          ServicePoolImpl(size_t max_tasks,
                          ThreadPool* thread_pool,
                          Scheduler* scheduler,
                          ServiceIfPtr service,
                          const scoped_refptr<MetricEntity>& entity)
              : max_queued_calls_(max_tasks),
                thread_pool_(*thread_pool),
                scheduler_(*scheduler),
                service_(std::move(service)),
                incoming_queue_time_(METRIC_rpc_incoming_queue_time.Instantiate(entity)),
                rpcs_timed_out_in_queue_(METRIC_rpcs_timed_out_in_queue.Instantiate(entity)),
                rpcs_timed_out_early_in_queue_(
                    METRIC_rpcs_timed_out_early_in_queue.Instantiate(entity)),
                rpcs_queue_overflow_(METRIC_rpcs_queue_overflow.Instantiate(entity)),
                check_timeout_strand_(scheduler->io_service()),
  140k          log_prefix_(Format("$0: ", service_->service_name())) {

                  // Create a per-service counter for rpcs_in_queue_.
  140k            auto id = Format("rpcs_in_queue_$0", service_->service_name());
  140k            EscapeMetricNameForPrometheus(&id);
  140k            string description = id + " metric for ServicePoolImpl";
  140k            rpcs_in_queue_ = entity->FindOrCreateGauge(
  140k                std::unique_ptr<GaugePrototype<int64_t>>(new OwningGaugePrototype<int64_t>(
  140k                    entity->prototype().name(), std::move(id),
  140k                    description, MetricUnit::kRequests, description, MetricLevel::kInfo)),
  140k                static_cast<int64>(0) /* initial_value */);

  140k            LOG_WITH_PREFIX(INFO) << "yb::rpc::ServicePoolImpl created at " << this;
  140k    }

 2.02k    ~ServicePoolImpl() {
 2.02k      StartShutdown();
 2.02k      CompleteShutdown();
 2.02k    }

 4.12k    void CompleteShutdown() {
 4.12k      shutdown_complete_latch_.Wait();
 4.21k      while (scheduled_tasks_.load(std::memory_order_acquire) != 0) {
    90        std::this_thread::sleep_for(10ms);
    90      }
 4.12k    }

 4.12k    void StartShutdown() {
 4.12k      bool closing_state = false;
 4.12k      if (closing_.compare_exchange_strong(closing_state, true)) {
 2.02k        service_->Shutdown();

 2.02k        auto check_timeout_task = check_timeout_task_.load(std::memory_order_acquire);
 2.02k        if (check_timeout_task != kUninitializedScheduledTaskId) {
   326          scheduler_.Abort(check_timeout_task);
   326        }

 2.01k        check_timeout_strand_.dispatch([this] {
 2.01k          std::weak_ptr<InboundCall> inbound_call_wrapper;
 44.1k          while (pre_check_timeout_queue_.pop(inbound_call_wrapper)) {}
 2.01k          shutdown_complete_latch_.CountDown();
 2.01k        });
 2.02k      }
 4.12k    }
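
StartShutdown() above relies on compare_exchange_strong on the closing_ flag so the shutdown body runs exactly once even if the destructor and an explicit shutdown race, while CompleteShutdown() waits on a latch and then spins until outstanding scheduler tasks drain. A minimal sketch of that run-once-then-wait idiom using only the standard library (the class and member names here are illustrative, not part of the YugabyteDB code):

    #include <atomic>
    #include <condition_variable>
    #include <mutex>

    class ShutdownOnce {
     public:
      // Returns true for exactly one caller, which then performs the real teardown.
      bool StartShutdown() {
        bool expected = false;
        return closing_.compare_exchange_strong(expected, true);
      }

      // The tearing-down thread signals completion once it is finished.
      void MarkComplete() {
        {
          std::lock_guard<std::mutex> lock(mutex_);
          complete_ = true;
        }
        complete_cv_.notify_all();
      }

      // Any thread may wait; safe to call multiple times, like CompleteShutdown().
      void WaitForComplete() {
        std::unique_lock<std::mutex> lock(mutex_);
        complete_cv_.wait(lock, [this] { return complete_; });
      }

     private:
      std::atomic<bool> closing_{false};
      std::mutex mutex_;
      std::condition_variable complete_cv_;
      bool complete_ = false;
    };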

 29.8M    void Enqueue(const InboundCallPtr& call) {
 29.8M      TRACE_TO(call->trace(), "Inserting onto call queue");

 29.8M      auto task = call->BindTask(this);
 29.8M      if (!task) {
 1.10k        Overflow(call, "service", queued_calls_.load(std::memory_order_relaxed));
 1.10k        return;
 1.10k      }

 29.8M      auto call_deadline = call->GetClientDeadline();
 29.8M      if (call_deadline != CoarseTimePoint::max()) {
 29.7M        pre_check_timeout_queue_.push(call);
 29.7M        ScheduleCheckTimeout(call_deadline);
 29.7M      }

 29.8M      thread_pool_.Enqueue(task);
 29.8M    }
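
Enqueue() only feeds a call into the timeout machinery when the client supplied a finite deadline; CoarseTimePoint::max() acts as a "no deadline" sentinel, so such calls never touch the timeout queues. The same sentinel pattern in plain std::chrono terms (a hypothetical Request type, not the RPC classes above):

    #include <chrono>
    #include <optional>

    using Clock = std::chrono::steady_clock;

    struct Request {
      // Clock::time_point::max() means "the client did not set a deadline".
      Clock::time_point deadline = Clock::time_point::max();
    };

    // Returns a deadline only when timeout tracking is actually required.
    std::optional<Clock::time_point> TrackedDeadline(const Request& req) {
      if (req.deadline == Clock::time_point::max()) {
        return std::nullopt;  // No deadline: skip the timeout queues entirely.
      }
      return req.deadline;
    }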

     1    const Counter* RpcsTimedOutInQueueMetricForTests() const {
     1      return rpcs_timed_out_early_in_queue_.get();
     1    }

     0    const Counter* RpcsQueueOverflowMetric() const {
     0      return rpcs_queue_overflow_.get();
     0    }

     0    std::string service_name() const {
     0      return service_->service_name();
     0    }

     0    ServiceIfPtr TEST_get_service() const {
     0      return service_;
     0    }

 1.10k    void Overflow(const InboundCallPtr& call, const char* type, size_t limit) {
 1.10k      const auto err_msg =
 1.10k          Format("$0 request on $1 from $2 dropped due to backpressure. "
 1.10k                     "The $3 queue is full, it has $4 items.",
 1.10k              call->method_name().ToBuffer(),
 1.10k              service_->service_name(),
 1.10k              call->remote_address(),
 1.10k              type,
 1.10k              limit);
 1.10k      YB_LOG_EVERY_N_SECS(WARNING, 3) << LogPrefix() << err_msg;
 1.10k      const auto response_status = STATUS(ServiceUnavailable, err_msg);
 1.10k      rpcs_queue_overflow_->Increment();
 1.10k      call->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, response_status);
 1.10k      last_backpressure_at_.store(
 1.10k          CoarseMonoClock::Now().time_since_epoch(), std::memory_order_release);
 1.10k    }

     7    void Failure(const InboundCallPtr& call, const Status& status) override {
     7      if (!call->TryStartProcessing()) {
     3        return;
     3      }

     4      if (status.IsServiceUnavailable()) {
     0        Overflow(call, "global", thread_pool_.options().queue_limit);
     0        return;
     0      }
     4      YB_LOG_EVERY_N_SECS(WARNING, 1)
     3          << LogPrefix()
     3          << call->method_name() << " request on " << service_->service_name() << " from "
     3          << call->remote_address() << " dropped because of: " << status.ToString();
     4      const auto response_status = STATUS(ServiceUnavailable, "Service is shutting down");
     4      call->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, response_status);
     4    }

  140k    void FillEndpoints(const RpcServicePtr& service, RpcEndpointMap* map) {
  140k      service_->FillEndpoints(service, map);
  140k    }

 29.8M    void Handle(InboundCallPtr incoming) override {
 29.8M      incoming->RecordHandlingStarted(incoming_queue_time_);
 29.8M      ADOPT_TRACE(incoming->trace());

 29.8M      const char* error_message;
 29.8M      if (PREDICT_FALSE(incoming->ClientTimedOut())) {
    52        error_message = kTimedOutInQueue;
 29.8M      } else if (PREDICT_FALSE(ShouldDropRequestDuringHighLoad(incoming))) {
     0        error_message = "The server is overloaded. Call waited in the queue past max_time_in_queue.";
 29.8M      } else {
 29.8M        TRACE_TO(incoming->trace(), "Handling call $0", yb::ToString(incoming->method_name()));

 29.8M        if (incoming->TryStartProcessing()) {
 29.8M          service_->Handle(std::move(incoming));
 29.8M        }
 29.8M        return;
 29.8M      }

    52      TRACE_TO(incoming->trace(), error_message);
     0      VLOG_WITH_PREFIX(4)
     0          << "Timing out call " << incoming->ToString() << " due to: " << error_message;

            // Respond as a failure, even though the client will probably ignore
            // the response anyway.
    52      TimedOut(incoming.get(), error_message, rpcs_timed_out_in_queue_.get());
    52    }

         private:
 11.9k    void TimedOut(InboundCall* call, const char* error_message, Counter* metric) {
 11.9k      if (call->RespondTimedOutIfPending(error_message)) {
    55        metric->Increment();
    55      }
 11.9k    }

 29.8M    bool ShouldDropRequestDuringHighLoad(const InboundCallPtr& incoming) {
 29.8M      CoarseTimePoint last_backpressure_at(last_backpressure_at_.load(std::memory_order_acquire));

            // For testing purposes.
 29.8M      if (GetAtomicFlag(&FLAGS_TEST_enable_backpressure_mode_for_testing)) {
     0        last_backpressure_at = CoarseMonoClock::Now();
     0      }

            // Test for a sentinel value, to avoid reading the clock.
 29.8M      if (last_backpressure_at == CoarseTimePoint()) {
 28.6M        return false;
 28.6M      }

 1.20M      auto now = CoarseMonoClock::Now();
 1.20M      if (now > last_backpressure_at + FLAGS_backpressure_recovery_period_ms * 1ms) {
     0        last_backpressure_at_.store(CoarseTimePoint().time_since_epoch(), std::memory_order_release);
     0        return false;
     0      }

 1.20M      return incoming->GetTimeInQueue().ToMilliseconds() > FLAGS_max_time_in_queue_ms;
 1.20M    }
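
ShouldDropRequestDuringHighLoad() sheds stale requests only for a limited window after the most recent queue overflow: last_backpressure_at_ holds an epoch-zero sentinel when there has been no recent backpressure (so the hot path can skip reading the clock), and the sentinel is restored once the recovery period elapses. A rough standalone sketch of that policy, with the two flag defaults from above hard-coded and all names purely illustrative:

    #include <atomic>
    #include <chrono>
    #include <cstdint>

    using Clock = std::chrono::steady_clock;

    class LoadShedPolicy {
     public:
      // Called whenever the service queue overflows.
      void RecordOverflow() {
        last_backpressure_ns_.store(Clock::now().time_since_epoch().count(),
                                    std::memory_order_release);
      }

      // Drop a request only if we saw backpressure recently and the request is stale.
      bool ShouldDrop(Clock::duration time_in_queue) {
        int64_t last_ns = last_backpressure_ns_.load(std::memory_order_acquire);
        if (last_ns == 0) {
          return false;  // Sentinel: no recent backpressure, avoid reading the clock.
        }
        auto last = Clock::time_point(Clock::duration(last_ns));
        if (Clock::now() > last + kRecoveryPeriod) {
          // The recovery period is over; restore the sentinel and stop shedding load.
          last_backpressure_ns_.store(0, std::memory_order_release);
          return false;
        }
        return time_in_queue > kMaxTimeInQueue;
      }

     private:
      static constexpr std::chrono::milliseconds kRecoveryPeriod{600000};
      static constexpr std::chrono::milliseconds kMaxTimeInQueue{6000};
      std::atomic<int64_t> last_backpressure_ns_{0};
    };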

  237k    void CheckTimeout(ScheduledTaskId task_id, CoarseTimePoint time, const Status& status) {
  237k      auto se = ScopeExit([this, task_id, time] {
  237k        auto expected_duration = time.time_since_epoch();
  237k        next_check_timeout_.compare_exchange_strong(
  237k            expected_duration, CoarseTimePoint::max().time_since_epoch(),
  237k            std::memory_order_acq_rel);
  237k        auto expected_task_id = task_id;
  237k        check_timeout_task_.compare_exchange_strong(
  237k            expected_task_id, kUninitializedScheduledTaskId, std::memory_order_acq_rel);
  237k        scheduled_tasks_.fetch_sub(1, std::memory_order_acq_rel);
  237k      });
  237k      if (!status.ok()) {
  139k        return;
  139k      }

 97.7k      auto now = CoarseMonoClock::now();
 97.7k      {
 97.7k        std::weak_ptr<InboundCall> weak_inbound_call;
 25.5M        while (pre_check_timeout_queue_.pop(weak_inbound_call)) {
 25.4M          auto inbound_call = weak_inbound_call.lock();
 25.4M          if (!inbound_call) {
 25.3M            continue;
 25.3M          }
 70.1k          if (now > inbound_call->GetClientDeadline()) {
 5.59k            TimedOut(inbound_call.get(), kTimedOutInQueue, rpcs_timed_out_early_in_queue_.get());
 64.5k          } else {
 64.5k            check_timeout_queue_.emplace(inbound_call);
 64.5k          }
 70.1k        }
 97.7k      }

  147k      while (!check_timeout_queue_.empty() && now > check_timeout_queue_.top().time) {
 50.0k        auto call = check_timeout_queue_.top().call.lock();
 50.0k        if (call) {
 6.28k          TimedOut(call.get(), kTimedOutInQueue, rpcs_timed_out_early_in_queue_.get());
 6.28k        }
 50.0k        check_timeout_queue_.pop();
 50.0k      }

 97.7k      if (!check_timeout_queue_.empty()) {
 10.8k        ScheduleCheckTimeout(check_timeout_queue_.top().time);
 10.8k      }
 97.7k    }

 29.7M    void ScheduleCheckTimeout(CoarseTimePoint time) {
 29.7M      if (closing_.load(std::memory_order_acquire)) {
     0        return;
     0      }
 29.7M      CoarseDuration next_check_timeout = next_check_timeout_.load(std::memory_order_acquire);
 29.7M      time += kTimeoutCheckGranularity;
 29.7M      while (CoarseTimePoint(next_check_timeout) > time) {
  266k        if (next_check_timeout_.compare_exchange_weak(
  265k                next_check_timeout, time.time_since_epoch(), std::memory_order_acq_rel)) {
  266k          check_timeout_strand_.dispatch([this, time] {
  266k            auto check_timeout_task = check_timeout_task_.load(std::memory_order_acquire);
  266k            if (check_timeout_task != kUninitializedScheduledTaskId) {
  139k              scheduler_.Abort(check_timeout_task);
  139k            }
  266k            scheduled_tasks_.fetch_add(1, std::memory_order_acq_rel);
  266k            auto task_id = scheduler_.Schedule(
  237k                [this, time](ScheduledTaskId task_id, const Status& status) {
  237k                  check_timeout_strand_.dispatch([this, time, task_id, status] {
  237k                    CheckTimeout(task_id, time, status);
  237k                  });
  237k                },
  266k                ToSteady(time));
  266k            check_timeout_task_.store(task_id, std::memory_order_release);
  266k          });
  265k          break;
  265k        }
  266k      }
 29.7M    }
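
ScheduleCheckTimeout() keeps at most one pending timeout check per pool: next_check_timeout_ remembers the earliest check currently scheduled, and the compare_exchange_weak loop lets a caller claim an earlier slot (and reschedule the timer) while calls with later deadlines return without doing any work. The core of that idea, reduced to an atomic nanosecond timestamp, as a sketch rather than the Scheduler API used above:

    #include <atomic>
    #include <chrono>
    #include <cstdint>
    #include <limits>

    using Clock = std::chrono::steady_clock;

    class EarliestWakeup {
     public:
      // Returns true if the caller moved the wakeup earlier and should (re)schedule
      // the actual timer; false if an earlier wakeup is already pending.
      bool TryClaim(Clock::time_point when) {
        int64_t desired = when.time_since_epoch().count();
        int64_t current = next_ns_.load(std::memory_order_acquire);
        while (desired < current) {
          if (next_ns_.compare_exchange_weak(current, desired,
                                             std::memory_order_acq_rel)) {
            return true;  // We now own rescheduling the timer for `when`.
          }
          // On failure compare_exchange_weak refreshed `current`; loop and re-check.
        }
        return false;
      }

      // Called from the timer callback after it fires, so future claims start fresh.
      void Reset(Clock::time_point fired) {
        int64_t expected = fired.time_since_epoch().count();
        next_ns_.compare_exchange_strong(expected, kNoWakeup,
                                         std::memory_order_acq_rel);
      }

     private:
      static constexpr int64_t kNoWakeup = std::numeric_limits<int64_t>::max();
      std::atomic<int64_t> next_ns_{kNoWakeup};
    };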

  140k    const std::string& LogPrefix() const {
  140k      return log_prefix_;
  140k    }

 29.8M    bool CallQueued() override {
 29.8M      auto queued_calls = queued_calls_.fetch_add(1, std::memory_order_acq_rel);
 29.8M      if (queued_calls < 0) {
     0        YB_LOG_EVERY_N_SECS(DFATAL, 5) << "Negative number of queued calls: " << queued_calls;
     0      }

 29.8M      if (implicit_cast<size_t>(queued_calls) >= max_queued_calls_) {
 1.10k        queued_calls_.fetch_sub(1, std::memory_order_relaxed);
 1.10k        return false;
 1.10k      }

 29.8M      rpcs_in_queue_->Increment();
 29.8M      return true;
 29.8M    }

 29.8M    void CallDequeued() override {
 29.8M      queued_calls_.fetch_sub(1, std::memory_order_relaxed);
 29.8M      rpcs_in_queue_->Decrement();
 29.8M    }
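
CallQueued() admits a call by optimistically incrementing queued_calls_ and rolling the increment back if the configured limit was already reached, which keeps the common path to a single fetch_add. A compact sketch of the same admission pattern (a hypothetical helper, not the InboundCallHandler interface):

    #include <atomic>
    #include <cstddef>
    #include <cstdint>

    class AdmissionCounter {
     public:
      explicit AdmissionCounter(size_t limit) : limit_(limit) {}

      // Returns true if the caller may enqueue; false if the queue is already full.
      bool TryAdmit() {
        int64_t before = queued_.fetch_add(1, std::memory_order_acq_rel);
        if (static_cast<size_t>(before) >= limit_) {
          queued_.fetch_sub(1, std::memory_order_relaxed);  // Roll back the optimistic add.
          return false;
        }
        return true;
      }

      // Mirrors CallDequeued(): release a previously admitted slot.
      void Release() {
        queued_.fetch_sub(1, std::memory_order_relaxed);
      }

     private:
      const size_t limit_;
      std::atomic<int64_t> queued_{0};
    };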

          const size_t max_queued_calls_;
          ThreadPool& thread_pool_;
          Scheduler& scheduler_;
          ServiceIfPtr service_;
          scoped_refptr<Histogram> incoming_queue_time_;
          scoped_refptr<Counter> rpcs_timed_out_in_queue_;
          scoped_refptr<Counter> rpcs_timed_out_early_in_queue_;
          scoped_refptr<Counter> rpcs_queue_overflow_;
          scoped_refptr<AtomicGauge<int64_t>> rpcs_in_queue_;
          // We have to use CoarseDuration here, since CoarseTimePoint does not work with clang + libstdc++.
          std::atomic<CoarseDuration> last_backpressure_at_{CoarseTimePoint().time_since_epoch()};
          std::atomic<int64_t> queued_calls_{0};

          // It is too expensive to update the timeout priority queue when each call is received,
          // so we use the following trick: all calls are first added to pre_check_timeout_queue_,
          // without any priority. Then, before the timeout check, calls are moved from this queue
          // to the priority queue.
          typedef cds::container::BasketQueue<cds::gc::DHP, std::weak_ptr<InboundCall>>
              PreCheckTimeoutQueue;
          PreCheckTimeoutQueue pre_check_timeout_queue_;

          // Used to track the scheduled time, to avoid unnecessary rescheduling.
          std::atomic<CoarseDuration> next_check_timeout_{CoarseTimePoint::max().time_since_epoch()};

          // The last scheduled task; required to abort the scheduled task during a reschedule.
          std::atomic<ScheduledTaskId> check_timeout_task_{kUninitializedScheduledTaskId};

          std::atomic<int> scheduled_tasks_{0};

          // Timeout checking synchronization.
          IoService::strand check_timeout_strand_;

          struct QueuedCheckDeadline {
            CoarseTimePoint time;
            // We use a weak pointer to avoid retaining a call that was already processed.
            std::weak_ptr<InboundCall> call;

            explicit QueuedCheckDeadline(const InboundCallPtr& inp)
 67.1k          : time(inp->GetClientDeadline()), call(inp) {
 67.1k      }
          };

          // The priority queue puts the greatest value on top, so we invert the comparison.
 1.16M    friend bool operator<(const QueuedCheckDeadline& lhs, const QueuedCheckDeadline& rhs) {
 1.16M      return lhs.time > rhs.time;
 1.16M    }

          std::priority_queue<QueuedCheckDeadline> check_timeout_queue_;
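
The comments above describe a two-stage design: the hot path only appends to the unordered, lock-free pre_check_timeout_queue_, and the periodic timeout check later drains it into check_timeout_queue_, a priority queue ordered by deadline, with the comparison inverted so the earliest deadline sits on top. A simplified, single-threaded sketch of those drain and expire steps, with std::deque standing in for the lock-free BasketQueue and stand-in types instead of the RPC classes:

    #include <chrono>
    #include <deque>
    #include <memory>
    #include <queue>

    using Clock = std::chrono::steady_clock;

    struct Call {
      Clock::time_point deadline;
    };

    // Intake stage: cheap, unordered appends on the hot path.
    std::deque<std::weak_ptr<Call>> intake;

    struct Pending {
      Clock::time_point time;
      std::weak_ptr<Call> call;  // weak_ptr so finished calls are not kept alive.

      // std::priority_queue puts the greatest element on top, so invert the
      // comparison to get the earliest deadline on top (a min-heap).
      bool operator<(const Pending& rhs) const { return time > rhs.time; }
    };

    std::priority_queue<Pending> by_deadline;

    // Drain stage: runs only during the periodic timeout check.
    void DrainIntake(Clock::time_point now) {
      while (!intake.empty()) {
        std::weak_ptr<Call> weak = intake.front();
        intake.pop_front();
        auto call = weak.lock();
        if (!call) {
          continue;  // The call already finished; nothing to track.
        }
        if (now > call->deadline) {
          // Expired while still queued: respond with a timeout here.
        } else {
          by_deadline.push(Pending{call->deadline, weak});
        }
      }
    }

    // Expire stage: pop entries whose deadline has passed; stop at the first entry
    // that is still in the future, since the heap top is always the earliest.
    void ExpireDue(Clock::time_point now) {
      while (!by_deadline.empty() && now > by_deadline.top().time) {
        if (auto call = by_deadline.top().call.lock()) {
          // The call is still pending: respond with a timeout here.
        }
        by_deadline.pop();
      }
    }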

          std::atomic<bool> closing_ = {false};
          CountDownLatch shutdown_complete_latch_{1};
          std::string log_prefix_;
        };

        ServicePool::ServicePool(size_t max_tasks,
                                 ThreadPool* thread_pool,
                                 Scheduler* scheduler,
                                 ServiceIfPtr service,
                                 const scoped_refptr<MetricEntity>& metric_entity)
            : impl_(new ServicePoolImpl(
  140k          max_tasks, thread_pool, scheduler, std::move(service), metric_entity)) {
  140k  }

 2.02k  ServicePool::~ServicePool() {
 2.02k  }

 2.10k  void ServicePool::StartShutdown() {
 2.10k    impl_->StartShutdown();
 2.10k  }

 2.10k  void ServicePool::CompleteShutdown() {
 2.10k    impl_->CompleteShutdown();
 2.10k  }

 29.8M  void ServicePool::QueueInboundCall(InboundCallPtr call) {
 29.8M    impl_->Enqueue(std::move(call));
 29.8M  }

     0  void ServicePool::Handle(InboundCallPtr call) {
     0    impl_->Handle(std::move(call));
     0  }

  140k  void ServicePool::FillEndpoints(RpcEndpointMap* map) {
  140k    impl_->FillEndpoints(RpcServicePtr(this), map);
  140k  }

     1  const Counter* ServicePool::RpcsTimedOutInQueueMetricForTests() const {
     1    return impl_->RpcsTimedOutInQueueMetricForTests();
     1  }

     0  const Counter* ServicePool::RpcsQueueOverflowMetric() const {
     0    return impl_->RpcsQueueOverflowMetric();
     0  }

     0  std::string ServicePool::service_name() const {
     0    return impl_->service_name();
     0  }

     0  ServiceIfPtr ServicePool::TEST_get_service() const {
     0    return impl_->TEST_get_service();
     0  }

        } // namespace rpc
        } // namespace yb