YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/service_pool.cc
Line | Count | Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/service_pool.h"
34
35
#include <pthread.h>
36
#include <sys/types.h>
37
38
#include <functional>
39
#include <memory>
40
#include <queue>
41
#include <string>
42
#include <vector>
43
44
#include <boost/asio/strand.hpp>
45
#include <cds/container/basket_queue.h>
46
#include <cds/gc/dhp.h>
47
#include <glog/logging.h>
48
49
#include "yb/gutil/atomicops.h"
50
#include "yb/gutil/ref_counted.h"
51
#include "yb/gutil/strings/substitute.h"
52
53
#include "yb/rpc/inbound_call.h"
54
#include "yb/rpc/scheduler.h"
55
#include "yb/rpc/service_if.h"
56
57
#include "yb/util/countdown_latch.h"
58
#include "yb/util/flag_tags.h"
59
#include "yb/util/lockfree.h"
60
#include "yb/util/logging.h"
61
#include "yb/util/metrics.h"
62
#include "yb/util/monotime.h"
63
#include "yb/util/net/sockaddr.h"
64
#include "yb/util/scope_exit.h"
65
#include "yb/util/status.h"
66
#include "yb/util/trace.h"
67
68
using namespace std::literals;
69
using namespace std::placeholders;
70
using std::shared_ptr;
71
using strings::Substitute;
72
73
DEFINE_int64(max_time_in_queue_ms, 6000,
74
             "Fail calls that get stuck in the queue longer than the specified amount of time "
75
                 "(in ms)");
76
TAG_FLAG(max_time_in_queue_ms, advanced);
77
TAG_FLAG(max_time_in_queue_ms, runtime);
78
DEFINE_int64(backpressure_recovery_period_ms, 600000,
79
             "Once we hit a backpressure/service-overflow we will consider dropping stale requests "
80
             "for this duration (in ms)");
81
TAG_FLAG(backpressure_recovery_period_ms, advanced);
82
TAG_FLAG(backpressure_recovery_period_ms, runtime);
83
DEFINE_test_flag(bool, enable_backpressure_mode_for_testing, false,
84
            "For testing purposes. Enables the rpc's to be considered timed out in the queue even "
85
            "when we have not had any backpressure in the recent past.");
86
87
METRIC_DEFINE_coarse_histogram(server, rpc_incoming_queue_time,
88
                        "RPC Queue Time",
89
                        yb::MetricUnit::kMicroseconds,
90
                        "Number of microseconds incoming RPC requests spend in the worker queue");
91
92
METRIC_DEFINE_counter(server, rpcs_timed_out_in_queue,
93
                      "RPC Queue Timeouts",
94
                      yb::MetricUnit::kRequests,
95
                      "Number of RPCs whose timeout elapsed while waiting "
96
                      "in the service queue, and thus were not processed. "
97
                      "Does not include calls that were expired before we tried to execute them.");
98
99
METRIC_DEFINE_counter(server, rpcs_timed_out_early_in_queue,
100
                      "RPC Queue Timeouts",
101
                      yb::MetricUnit::kRequests,
102
                      "Number of RPCs whose timeout elapsed while waiting "
103
                      "in the service queue, and thus were not processed. "
104
                      "Timeout for those calls were detected before the calls tried to execute.");
105
106
METRIC_DEFINE_counter(server, rpcs_queue_overflow,
107
                      "RPC Queue Overflows",
108
                      yb::MetricUnit::kRequests,
109
                      "Number of RPCs dropped because the service queue "
110
                      "was full.");
111
112
namespace yb {
113
namespace rpc {
114
115
namespace {
116
117
const CoarseDuration kTimeoutCheckGranularity = 100ms;
118
const char* const kTimedOutInQueue = "Call waited in the queue past deadline";
119
120
} // namespace
121
122
class ServicePoolImpl final : public InboundCallHandler {
123
 public:
124
  ServicePoolImpl(size_t max_tasks,
125
                  ThreadPool* thread_pool,
126
                  Scheduler* scheduler,
127
                  ServiceIfPtr service,
128
                  const scoped_refptr<MetricEntity>& entity)
129
      : max_queued_calls_(max_tasks),
130
        thread_pool_(*thread_pool),
131
        scheduler_(*scheduler),
132
        service_(std::move(service)),
133
        incoming_queue_time_(METRIC_rpc_incoming_queue_time.Instantiate(entity)),
134
        rpcs_timed_out_in_queue_(METRIC_rpcs_timed_out_in_queue.Instantiate(entity)),
135
        rpcs_timed_out_early_in_queue_(
136
            METRIC_rpcs_timed_out_early_in_queue.Instantiate(entity)),
137
        rpcs_queue_overflow_(METRIC_rpcs_queue_overflow.Instantiate(entity)),
138
        check_timeout_strand_(scheduler->io_service()),
139
209k
        log_prefix_(Format("$0: ", service_->service_name())) {
140
141
          // Create per service counter for rpcs_in_queue_.
142
209k
          auto id = Format("rpcs_in_queue_$0", service_->service_name());
143
209k
          EscapeMetricNameForPrometheus(&id);
144
209k
          string description = id + " metric for ServicePoolImpl";
145
209k
          rpcs_in_queue_ = entity->FindOrCreateGauge(
146
209k
              std::unique_ptr<GaugePrototype<int64_t>>(new OwningGaugePrototype<int64_t>(
147
209k
                  entity->prototype().name(), std::move(id),
148
209k
                  description, MetricUnit::kRequests, description, MetricLevel::kInfo)),
149
209k
              static_cast<int64>(0) /* initial_value */);
150
151
209k
          LOG_WITH_PREFIX(INFO) << "yb::rpc::ServicePoolImpl created at " << this;
152
209k
  }
153
154
2.23k
  ~ServicePoolImpl() {
155
2.23k
    StartShutdown();
156
2.23k
    CompleteShutdown();
157
2.23k
  }
158
159
4.56k
  void CompleteShutdown() {
160
4.56k
    shutdown_complete_latch_.Wait();
161
4.64k
    while (scheduled_tasks_.load(std::memory_order_acquire) != 0) {
162
74
      std::this_thread::sleep_for(10ms);
163
74
    }
164
4.56k
  }
165
166
4.62k
  void StartShutdown() {
167
4.62k
    bool closing_state = false;
168
4.62k
    if (closing_.compare_exchange_strong(closing_state, true)) {
169
2.30k
      service_->Shutdown();
170
171
2.30k
      auto check_timeout_task = check_timeout_task_.load(std::memory_order_acquire);
172
2.30k
      if (check_timeout_task != kUninitializedScheduledTaskId) {
173
352
        scheduler_.Abort(check_timeout_task);
174
352
      }
175
176
2.30k
      check_timeout_strand_.dispatch([this] {
177
2.29k
        std::weak_ptr<InboundCall> inbound_call_wrapper;
178
43.4k
        while (pre_check_timeout_queue_.pop(inbound_call_wrapper)) {}
179
2.29k
        shutdown_complete_latch_.CountDown();
180
2.29k
      });
181
2.30k
    }
182
4.62k
  }
183
184
90.4M
  void Enqueue(const InboundCallPtr& call) {
185
90.4M
    TRACE_TO(call->trace(), "Inserting onto call queue");
186
187
90.4M
    auto task = call->BindTask(this);
188
90.4M
    if (!task) {
189
1.26k
      Overflow(call, "service", queued_calls_.load(std::memory_order_relaxed));
190
1.26k
      return;
191
1.26k
    }
192
193
90.4M
    auto call_deadline = call->GetClientDeadline();
194
90.4M
    if (call_deadline != CoarseTimePoint::max()) {
195
90.2M
      pre_check_timeout_queue_.push(call);
196
90.2M
      ScheduleCheckTimeout(call_deadline);
197
90.2M
    }
198
199
90.4M
    thread_pool_.Enqueue(task);
200
90.4M
  }
201
202
1
  const Counter* RpcsTimedOutInQueueMetricForTests() const {
203
1
    return rpcs_timed_out_early_in_queue_.get();
204
1
  }
205
206
0
  const Counter* RpcsQueueOverflowMetric() const {
207
0
    return rpcs_queue_overflow_.get();
208
0
  }
209
210
0
  std::string service_name() const {
211
0
    return service_->service_name();
212
0
  }
213
214
0
  ServiceIfPtr TEST_get_service() const {
215
0
    return service_;
216
0
  }
217
218
1.26k
  void Overflow(const InboundCallPtr& call, const char* type, size_t limit) {
219
1.26k
    const auto err_msg =
220
1.26k
        Format("$0 request on $1 from $2 dropped due to backpressure. "
221
1.26k
                   "The $3 queue is full, it has $4 items.",
222
1.26k
            call->method_name().ToBuffer(),
223
1.26k
            service_->service_name(),
224
1.26k
            call->remote_address(),
225
1.26k
            type,
226
1.26k
            limit);
227
1.26k
    YB_LOG_EVERY_N_SECS(WARNING, 3) << LogPrefix() << err_msg;
228
1.26k
    const auto response_status = STATUS(ServiceUnavailable, err_msg);
229
1.26k
    rpcs_queue_overflow_->Increment();
230
1.26k
    call->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, response_status);
231
1.26k
    last_backpressure_at_.store(
232
1.26k
        CoarseMonoClock::Now().time_since_epoch(), std::memory_order_release);
233
1.26k
  }
234
235
16
  void Failure(const InboundCallPtr& call, const Status& status) override {
236
16
    if (!call->TryStartProcessing()) {
237
3
      return;
238
3
    }
239
240
13
    if (status.IsServiceUnavailable()) {
241
0
      Overflow(call, "global", thread_pool_.options().queue_limit);
242
0
      return;
243
0
    }
244
13
    YB_LOG_EVERY_N_SECS(WARNING, 1)
245
4
        << LogPrefix()
246
4
        << call->method_name() << " request on " << service_->service_name() << " from "
247
4
        << call->remote_address() << " dropped because of: " << status.ToString();
248
13
    const auto response_status = STATUS(ServiceUnavailable, "Service is shutting down");
249
13
    call->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, response_status);
250
13
  }
251
252
208k
  void FillEndpoints(const RpcServicePtr& service, RpcEndpointMap* map) {
253
208k
    service_->FillEndpoints(service, map);
254
208k
  }
255
256
90.5M
  void Handle(InboundCallPtr incoming) override {
257
90.5M
    incoming->RecordHandlingStarted(incoming_queue_time_);
258
90.5M
    ADOPT_TRACE(incoming->trace());
259
260
90.5M
    const char* error_message;
261
90.5M
    if (PREDICT_FALSE(incoming->ClientTimedOut())) {
262
3.89k
      error_message = kTimedOutInQueue;
263
90.5M
    } else if (PREDICT_FALSE(ShouldDropRequestDuringHighLoad(incoming))) {
264
0
      error_message = "The server is overloaded. Call waited in the queue past max_time_in_queue.";
265
90.5M
    } else {
266
90.5M
      TRACE_TO(incoming->trace(), "Handling call $0", yb::ToString(incoming->method_name()));
267
268
90.5M
      if (incoming->TryStartProcessing()) {
269
90.5M
        service_->Handle(std::move(incoming));
270
90.5M
      }
271
90.5M
      return;
272
90.5M
    }
273
274
3.89k
    TRACE_TO(incoming->trace(), error_message);
275
3.89k
    VLOG_WITH_PREFIX(4)
276
1
        << "Timing out call " << incoming->ToString() << " due to: " << error_message;
277
278
    // Respond as a failure, even though the client will probably ignore
279
    // the response anyway.
280
3.89k
    TimedOut(incoming.get(), error_message, rpcs_timed_out_in_queue_.get());
281
3.89k
  }
282
283
 private:
284
19.5k
  void TimedOut(InboundCall* call, const char* error_message, Counter* metric) {
285
19.5k
    if (call->RespondTimedOutIfPending(error_message)) {
286
3.89k
      metric->Increment();
287
3.89k
    }
288
19.5k
  }
289
290
90.5M
  bool ShouldDropRequestDuringHighLoad(const InboundCallPtr& incoming) {
291
90.5M
    CoarseTimePoint last_backpressure_at(last_backpressure_at_.load(std::memory_order_acquire));
292
293
    // For testing purposes.
294
90.5M
    if (GetAtomicFlag(&FLAGS_TEST_enable_backpressure_mode_for_testing)) {
295
0
      last_backpressure_at = CoarseMonoClock::Now();
296
0
    }
297
298
    // Test for a sentinel value, to avoid reading the clock.
299
90.5M
    if (last_backpressure_at == CoarseTimePoint()) {
300
89.2M
      return false;
301
89.2M
    }
302
303
1.29M
    auto now = CoarseMonoClock::Now();
304
1.29M
    if (now > last_backpressure_at + FLAGS_backpressure_recovery_period_ms * 1ms) {
305
0
      last_backpressure_at_.store(CoarseTimePoint().time_since_epoch(), std::memory_order_release);
306
0
      return false;
307
0
    }
308
309
1.29M
    return incoming->GetTimeInQueue().ToMilliseconds() > FLAGS_max_time_in_queue_ms;
310
1.29M
  }
311
312
1.59M
  void CheckTimeout(ScheduledTaskId task_id, CoarseTimePoint time, const Status& status) {
313
1.59M
    auto se = ScopeExit([this, task_id, time] {
314
1.59M
      auto expected_duration = time.time_since_epoch();
315
1.59M
      next_check_timeout_.compare_exchange_strong(
316
1.59M
          expected_duration, CoarseTimePoint::max().time_since_epoch(),
317
1.59M
          std::memory_order_acq_rel);
318
1.59M
      auto expected_task_id = task_id;
319
1.59M
      check_timeout_task_.compare_exchange_strong(
320
1.59M
          expected_task_id, kUninitializedScheduledTaskId, std::memory_order_acq_rel);
321
1.59M
      scheduled_tasks_.fetch_sub(1, std::memory_order_acq_rel);
322
1.59M
    });
323
1.59M
    if (!status.ok()) {
324
403k
      return;
325
403k
    }
326
327
1.18M
    auto now = CoarseMonoClock::now();
328
1.18M
    {
329
1.18M
      std::weak_ptr<InboundCall> weak_inbound_call;
330
79.4M
      while (pre_check_timeout_queue_.pop(weak_inbound_call)) {
331
78.2M
        auto inbound_call = weak_inbound_call.lock();
332
78.2M
        if (!inbound_call) {
333
78.1M
          continue;
334
78.1M
        }
335
84.6k
        if (now > inbound_call->GetClientDeadline()) {
336
10.8k
          TimedOut(inbound_call.get(), kTimedOutInQueue, rpcs_timed_out_early_in_queue_.get());
337
73.8k
        } else {
338
73.8k
          check_timeout_queue_.emplace(inbound_call);
339
73.8k
        }
340
84.6k
      }
341
1.18M
    }
342
343
1.24M
    while (!check_timeout_queue_.empty() && now > check_timeout_queue_.top().time) {
344
52.6k
      auto call = check_timeout_queue_.top().call.lock();
345
52.6k
      if (call) {
346
4.82k
        TimedOut(call.get(), kTimedOutInQueue, rpcs_timed_out_early_in_queue_.get());
347
4.82k
      }
348
52.6k
      check_timeout_queue_.pop();
349
52.6k
    }
350
351
1.18M
    if (!check_timeout_queue_.empty()) {
352
38.7k
      ScheduleCheckTimeout(check_timeout_queue_.top().time);
353
38.7k
    }
354
1.18M
  }
355
356
90.3M
  void ScheduleCheckTimeout(CoarseTimePoint time) {
357
90.3M
    if (closing_.load(std::memory_order_acquire)) {
358
0
      return;
359
0
    }
360
90.3M
    CoarseDuration next_check_timeout = next_check_timeout_.load(std::memory_order_acquire);
361
90.3M
    time += kTimeoutCheckGranularity;
362
90.3M
    while (CoarseTimePoint(next_check_timeout) > time) {
363
1.63M
      if (next_check_timeout_.compare_exchange_weak(
364
1.63M
              next_check_timeout, time.time_since_epoch(), std::memory_order_acq_rel)) {
365
1.63M
        check_timeout_strand_.dispatch([this, time] {
366
1.63M
          auto check_timeout_task = check_timeout_task_.load(std::memory_order_acquire);
367
1.63M
          if (check_timeout_task != kUninitializedScheduledTaskId) {
368
403k
            scheduler_.Abort(check_timeout_task);
369
403k
          }
370
1.63M
          scheduled_tasks_.fetch_add(1, std::memory_order_acq_rel);
371
1.63M
          auto task_id = scheduler_.Schedule(
372
1.63M
              [this, time](ScheduledTaskId task_id, const Status& status) {
373
1.59M
                check_timeout_strand_.dispatch([this, time, task_id, status] {
374
1.59M
                  CheckTimeout(task_id, time, status);
375
1.59M
                });
376
1.59M
              },
377
1.63M
              ToSteady(time));
378
1.63M
          check_timeout_task_.store(task_id, std::memory_order_release);
379
1.63M
        });
380
1.63M
        break;
381
1.63M
      }
382
1.63M
    }
383
90.3M
  }
384
385
209k
  const std::string& LogPrefix() const {
386
209k
    return log_prefix_;
387
209k
  }
388
389
90.5M
  bool CallQueued() override {
390
90.5M
    auto queued_calls = queued_calls_.fetch_add(1, std::memory_order_acq_rel);
391
90.5M
    if (queued_calls < 0) {
392
0
      YB_LOG_EVERY_N_SECS(DFATAL, 5) << "Negative number of queued calls: " << queued_calls;
393
0
    }
394
395
90.5M
    if (implicit_cast<size_t>(queued_calls) >= max_queued_calls_) {
396
1.25k
      queued_calls_.fetch_sub(1, std::memory_order_relaxed);
397
1.25k
      return false;
398
1.25k
    }
399
400
90.5M
    rpcs_in_queue_->Increment();
401
90.5M
    return true;
402
90.5M
  }
403
404
90.5M
  void CallDequeued() override {
405
90.5M
    queued_calls_.fetch_sub(1, std::memory_order_relaxed);
406
90.5M
    rpcs_in_queue_->Decrement();
407
90.5M
  }
408
409
  const size_t max_queued_calls_;
410
  ThreadPool& thread_pool_;
411
  Scheduler& scheduler_;
412
  ServiceIfPtr service_;
413
  scoped_refptr<Histogram> incoming_queue_time_;
414
  scoped_refptr<Counter> rpcs_timed_out_in_queue_;
415
  scoped_refptr<Counter> rpcs_timed_out_early_in_queue_;
416
  scoped_refptr<Counter> rpcs_queue_overflow_;
417
  scoped_refptr<AtomicGauge<int64_t>> rpcs_in_queue_;
418
  // Have to use CoarseDuration here, since CoarseTimePoint does not work with clang + libstdc++
419
  std::atomic<CoarseDuration> last_backpressure_at_{CoarseTimePoint().time_since_epoch()};
420
  std::atomic<int64_t> queued_calls_{0};
421
422
  // It is too expensive to update timeout priority queue when each call is received.
423
  // So we are doing the following trick.
424
  // All calls are added to pre_check_timeout_queue_, w/o priority.
425
  // Then before timeout check we move calls from this queue to priority queue.
426
  typedef cds::container::BasketQueue<cds::gc::DHP, std::weak_ptr<InboundCall>>
427
      PreCheckTimeoutQueue;
428
  PreCheckTimeoutQueue pre_check_timeout_queue_;
429
430
  // Used to track scheduled time, to avoid unnecessary rescheduling.
431
  std::atomic<CoarseDuration> next_check_timeout_{CoarseTimePoint::max().time_since_epoch()};
432
433
  // Last scheduled task, required to abort scheduled task during reschedule.
434
  std::atomic<ScheduledTaskId> check_timeout_task_{kUninitializedScheduledTaskId};
435
436
  std::atomic<int> scheduled_tasks_{0};
437
438
  // Timeout checking synchronization.
439
  IoService::strand check_timeout_strand_;
440
441
  struct QueuedCheckDeadline {
442
    CoarseTimePoint time;
443
    // We use weak pointer to avoid retaining call that was already processed.
444
    std::weak_ptr<InboundCall> call;
445
446
    explicit QueuedCheckDeadline(const InboundCallPtr& inp)
447
84.6k
        : time(inp->GetClientDeadline()), call(inp) {
448
84.6k
    }
449
  };
450
451
  // Priority queue puts the greatest value on top, so we invert comparison.
452
578k
  friend bool operator<(const QueuedCheckDeadline& lhs, const QueuedCheckDeadline& rhs) {
453
578k
    return lhs.time > rhs.time;
454
578k
  }
455
456
  std::priority_queue<QueuedCheckDeadline> check_timeout_queue_;
457
458
  std::atomic<bool> closing_ = {false};
459
  CountDownLatch shutdown_complete_latch_{1};
460
  std::string log_prefix_;
461
};
462
463
ServicePool::ServicePool(size_t max_tasks,
464
                         ThreadPool* thread_pool,
465
                         Scheduler* scheduler,
466
                         ServiceIfPtr service,
467
                         const scoped_refptr<MetricEntity>& metric_entity)
468
    : impl_(new ServicePoolImpl(
469
209k
        max_tasks, thread_pool, scheduler, std::move(service), metric_entity)) {
470
209k
}
471
472
2.23k
ServicePool::~ServicePool() {
473
2.23k
}
474
475
2.38k
void ServicePool::StartShutdown() {
476
2.38k
  impl_->StartShutdown();
477
2.38k
}
478
479
2.33k
void ServicePool::CompleteShutdown() {
480
2.33k
  impl_->CompleteShutdown();
481
2.33k
}
482
483
90.5M
void ServicePool::QueueInboundCall(InboundCallPtr call) {
484
90.5M
  impl_->Enqueue(std::move(call));
485
90.5M
}
486
487
0
void ServicePool::Handle(InboundCallPtr call) {
488
0
  impl_->Handle(std::move(call));
489
0
}
490
491
208k
void ServicePool::FillEndpoints(RpcEndpointMap* map) {
492
208k
  impl_->FillEndpoints(RpcServicePtr(this), map);
493
208k
}
494
495
1
const Counter* ServicePool::RpcsTimedOutInQueueMetricForTests() const {
496
1
  return impl_->RpcsTimedOutInQueueMetricForTests();
497
1
}
498
499
0
const Counter* ServicePool::RpcsQueueOverflowMetric() const {
500
0
  return impl_->RpcsQueueOverflowMetric();
501
0
}
502
503
0
std::string ServicePool::service_name() const {
504
0
  return impl_->service_name();
505
0
}
506
507
0
ServiceIfPtr ServicePool::TEST_get_service() const {
508
0
  return impl_->TEST_get_service();
509
0
}
510
511
} // namespace rpc
512
} // namespace yb
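
The two-stage timeout tracking above (each new call is pushed onto the lock-free pre_check_timeout_queue_, and CheckTimeout later drains that queue into the deadline-ordered check_timeout_queue_) can be illustrated with a simplified, self-contained sketch. This is not YugabyteDB code: it substitutes a mutex-protected staging vector for the lock-free basket queue and uses hypothetical Call and TimeoutTracker types, purely to show the pattern of deferring priority-queue maintenance to the periodic timeout check.

    #include <chrono>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <utility>
    #include <vector>

    using Clock = std::chrono::steady_clock;

    // Hypothetical stand-in for an inbound call with a client deadline.
    struct Call {
      Clock::time_point deadline;
      bool timed_out = false;
    };

    class TimeoutTracker {
     public:
      // Called for every incoming call: O(1), no heap reordering
      // (analogue of pushing onto pre_check_timeout_queue_ in Enqueue).
      void Track(const std::shared_ptr<Call>& call) {
        std::lock_guard<std::mutex> lock(mutex_);
        staging_.push_back(call);
      }

      // Called periodically (analogue of CheckTimeout).
      void CheckTimeouts(Clock::time_point now) {
        std::lock_guard<std::mutex> lock(mutex_);
        // Move staged calls into the deadline-ordered heap, expiring any
        // that are already past deadline ("timed out early in queue").
        for (auto& weak : staging_) {
          if (auto call = weak.lock()) {
            if (now > call->deadline) {
              call->timed_out = true;
            } else {
              heap_.emplace(call->deadline, weak);
            }
          }
        }
        staging_.clear();
        // Expire heap entries whose deadline has now passed.
        while (!heap_.empty() && now > heap_.top().first) {
          if (auto call = heap_.top().second.lock()) {
            call->timed_out = true;
          }
          heap_.pop();
        }
      }

     private:
      using Entry = std::pair<Clock::time_point, std::weak_ptr<Call>>;
      // Earliest deadline on top: invert the comparison, as QueuedCheckDeadline does.
      struct Later {
        bool operator()(const Entry& a, const Entry& b) const { return a.first > b.first; }
      };

      std::mutex mutex_;
      std::vector<std::weak_ptr<Call>> staging_;
      std::priority_queue<Entry, std::vector<Entry>, Later> heap_;
    };

In the real implementation, pre_check_timeout_queue_ is a lock-free cds::container::BasketQueue, weak pointers likewise keep processed calls from being retained, and the periodic check is serialized on check_timeout_strand_; the ordering trick is the same.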