YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/rate_limiter.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/util/rate_limiter.h"
25
#include "yb/rocksdb/env.h"
26
#include <glog/logging.h>
27
28
namespace rocksdb {
29
30
31
// Pending request
32
// A request that could not be served immediately and is parked in one of the
// priority queues. Instances live on the requesting thread's stack for the
// duration of GenericRateLimiter::Request().
struct GenericRateLimiter::Req {
  // `mu` is the mutex the condition variable waits on (the limiter's
  // request_mutex_ at the only visible call site).
  explicit Req(int64_t requested_bytes, port::Mutex* mu)
      : bytes(requested_bytes), cv(mu) {}

  // Number of bytes this request needs before it may proceed.
  int64_t bytes;
  // Signalled when the request is granted or must re-run leader election.
  port::CondVar cv;
  // Set to true (under the mutex) once the quota has been assigned.
  bool granted = false;
};
39
40
// Creates a limiter that admits about `rate_bytes_per_sec` bytes per second.
// Quota is replenished every `refill_period_us` microseconds.
//
// `fairness` controls how often the low-priority queue is served before the
// high-priority one (see Refill()). It is clamped into [1, 100]:
//  * the upper bound keeps the original behaviour;
//  * the lower bound prevents 0 (or a negative value) from reaching
//    rnd_.OneIn(fairness_). OneIn(0) is presumably a modulo by zero.
// NewGenericRateLimiter() only asserts `fairness > 0`, which vanishes in
// release builds, and this constructor can also be called directly.
GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
                                       int64_t refill_period_us,
                                       int32_t fairness)
    : refill_period_us_(refill_period_us),
      refill_bytes_per_period_(
          CalculateRefillBytesPerPeriod(rate_bytes_per_sec)),
      env_(Env::Default()),
      stop_(false),
      exit_cv_(&request_mutex_),
      requests_to_wait_(0),
      available_bytes_(0),
      // The first leader to wait will time out immediately and trigger a refill.
      next_refill_us_(env_->NowMicros()),
      fairness_(fairness < 1 ? 1 : (fairness > 100 ? 100 : fairness)),
      rnd_(static_cast<uint32_t>(time(nullptr))),
      leader_(nullptr) {
  DCHECK_GT(fairness, 0);
  total_requests_[0] = 0;
  total_requests_[1] = 0;
  total_bytes_through_[0] = 0;
  total_bytes_through_[1] = 0;
}
60
61
52.1k
// Shuts the limiter down. It sets stop_, wakes every request still sitting in
// a queue (high priority first), then blocks on exit_cv_ until each woken
// request has seen stop_ and decremented requests_to_wait_.
//
// NOTE(review): only requests still queued are counted. A request that
// Refill() already granted and signalled, but that has not yet reacquired
// request_mutex_, is not waited for. Presumably callers must ensure no
// Request() is in flight when the limiter is destroyed; confirm.
GenericRateLimiter::~GenericRateLimiter() {
  MutexLock lock(&request_mutex_);
  stop_ = true;

  // Each queued request acknowledges shutdown exactly once in Request().
  requests_to_wait_ = static_cast<int32_t>(queue_[Env::IO_HIGH].size() +
                                           queue_[Env::IO_LOW].size());

  const Env::IOPriority wake_order[] = {Env::IO_HIGH, Env::IO_LOW};
  for (const Env::IOPriority pri : wake_order) {
    for (auto* waiter : queue_[pri]) {
      waiter->cv.Signal();
    }
  }

  // Wait() releases request_mutex_ so the woken requests can run.
  while (requests_to_wait_ > 0) {
    exit_cv_.Wait();
  }
}
76
77
// This API allows user to dynamically change rate limiter's bytes per second.
78
4
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
79
4
  DCHECK_GT(bytes_per_second, 0);
80
4
  refill_bytes_per_period_.store(
81
4
      CalculateRefillBytesPerPeriod(bytes_per_second),
82
4
      std::memory_order_relaxed);
83
4
  MutexLock g(&request_mutex_);
84
4
  available_bytes_ = 0;
85
4
}
86
87
397k
// Blocks the caller until `bytes` of quota at priority `pri` have been
// granted. It returns early, without a grant, once the limiter is shutting
// down (stop_ set by the destructor).
//
// Everything below runs under request_mutex_:
//  * Fast path: if available_bytes_ already covers the request, it is charged
//    immediately. This path does not consult the wait queues, so a request can
//    be served ahead of requests that are already queued.
//  * Slow path: the request is appended to queue_[pri].
//    - If no leader exists and this request is at the head of a queue, it
//      becomes leader_ and sleeps until next_refill_us_.
//    - On timeout the leader calls Refill(), which grants queued requests in
//      FIFO order and signals the granted non-leader waiters.
//    - All other waiters block on their own condition variable.
//
// NOTE(review): `bytes` is checked against refill_bytes_per_period_ only by
// assert(). In release builds, a request larger than the quota Refill() can
// accumulate would presumably never be granted. Confirm callers chunk their
// requests.
void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
  assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));

  MutexLock lock(&request_mutex_);
  if (stop_) {
    // Shutting down: neither throttle nor account.
    return;
  }

  ++total_requests_[pri];

  if (available_bytes_ >= bytes) {
    available_bytes_ -= bytes;
    total_bytes_through_[pri] += bytes;
    return;
  }

  // Not enough quota right now: park a stack-allocated request and wait.
  Req req(bytes, &request_mutex_);
  queue_[pri].push_back(&req);

  // True iff `candidate` is the front element of either priority queue.
  const auto at_queue_head = [this](const Req* candidate) {
    const auto& high = queue_[Env::IO_HIGH];
    const auto& low = queue_[Env::IO_LOW];
    return (!high.empty() && candidate == high.front()) ||
           (!low.empty() && candidate == low.front());
  };

  // req.granted is false on entry (freshly constructed, mutex held), so the
  // body runs at least once, exactly like a do/while loop.
  while (!req.granted) {
    bool timed_out = false;

    // Leader election. Candidates are:
    //  (1) a newly arrived request;
    //  (2) a previous leader whose quota has not been assigned yet because of
    //      its lower priority;
    //  (3) a queue head that was signalled by the previous leader.
    if (leader_ == nullptr && at_queue_head(&req)) {
      leader_ = &req;
      timed_out = req.cv.TimedWait(next_refill_us_);
    } else {
      // Not at a queue head, or another request is already leader.
      req.cv.Wait();
    }

    // request_mutex_ is held again from here on.
    if (stop_) {
      // The destructor waits until every queued request has acknowledged.
      --requests_to_wait_;
      exit_cv_.Signal();
      return;
    }

    // An ungranted request that woke up must still be at the head of its
    // queue, and any current leader must be at the head of one of the queues.
    assert(req.granted || at_queue_head(&req));
    assert(leader_ == nullptr || at_queue_head(leader_));

    if (leader_ != &req) {
      // Woken by the previous leader. Either the quota was granted (the loop
      // ends), or this request was picked as the next leader candidate. In
      // that case it must re-run the election, because a new request may have
      // arrived before this thread woke up.
      assert(!timed_out);
      continue;
    }

    if (!timed_out) {
      // The leader woke before its deadline without a refill: step down and
      // take part in the election again.
      assert(!req.granted);
      leader_ = nullptr;
      continue;
    }

    // The leader reached next_refill_us_: refill, then always give up
    // leadership so the next round elects afresh.
    Refill();
    leader_ = nullptr;

    if (req.granted) {
      // Refill() pops a request before granting it, so req is no longer
      // queued. Wake the new head (high priority first) so that it can stand
      // for election.
      assert(!at_queue_head(&req));
      if (!queue_[Env::IO_HIGH].empty()) {
        queue_[Env::IO_HIGH].front()->cv.Signal();
      } else if (!queue_[Env::IO_LOW].empty()) {
        queue_[Env::IO_LOW].front()->cv.Signal();
      }
    }
  }
}
192
193
974
void GenericRateLimiter::Refill() {
194
974
  next_refill_us_ = env_->NowMicros() + refill_period_us_;
195
  // Carry over the left over quota from the last period
196
974
  auto refill_bytes_per_period =
197
974
      refill_bytes_per_period_.load(std::memory_order_relaxed);
198
974
  if (available_bytes_ < refill_bytes_per_period) {
199
974
    available_bytes_ += refill_bytes_per_period;
200
974
  }
201
202
870
  int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1;
203
2.92k
  for (int q = 0; q < 2; ++q) {
204
974
    auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH;
205
1.94k
    auto* queue = &queue_[use_pri];
206
3.09k
    while (!queue->empty()) {
207
1.15k
      auto* next_req = queue->front();
208
1.15k
      if (available_bytes_ < next_req->bytes) {
209
11
        break;
210
11
      }
211
1.14k
      available_bytes_ -= next_req->bytes;
212
1.14k
      total_bytes_through_[use_pri] += next_req->bytes;
213
1.14k
      queue->pop_front();
214
215
1.14k
      next_req->granted = true;
216
1.14k
      if (next_req != leader_) {
217
        // Quota granted, signal the thread
218
183
        next_req->cv.Signal();
219
183
      }
220
1.14k
    }
221
1.94k
  }
222
974
}
223
224
// Factory for the default RateLimiter implementation.
// The returned object is allocated with `new`; the caller owns it and is
// responsible for deleting it. Argument validation here is debug-only
// (assert).
RateLimiter* NewGenericRateLimiter(
    int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) {
  assert(rate_bytes_per_sec > 0);
  assert(refill_period_us > 0);
  assert(fairness > 0);
  auto* limiter =
      new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness);
  return limiter;
}
232
233
}  // namespace rocksdb