/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/rate_limiter.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/util/rate_limiter.h" |
25 | | #include "yb/rocksdb/env.h" |
26 | | #include <glog/logging.h> |
27 | | |
28 | | namespace rocksdb { |
29 | | |
30 | | |
31 | | // Pending request |
32 | | struct GenericRateLimiter::Req { |
33 | | explicit Req(int64_t _bytes, port::Mutex* _mu) |
34 | 1.32k | : bytes(_bytes), cv(_mu), granted(false) {} |
35 | | int64_t bytes; |
36 | | port::CondVar cv; |
37 | | bool granted; |
38 | | }; |
39 | | |
40 | | GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec, |
41 | | int64_t refill_period_us, |
42 | | int32_t fairness) |
43 | | : refill_period_us_(refill_period_us), |
44 | | refill_bytes_per_period_( |
45 | | CalculateRefillBytesPerPeriod(rate_bytes_per_sec)), |
46 | | env_(Env::Default()), |
47 | | stop_(false), |
48 | | exit_cv_(&request_mutex_), |
49 | | requests_to_wait_(0), |
50 | | available_bytes_(0), |
51 | | next_refill_us_(env_->NowMicros()), |
52 | | fairness_(fairness > 100 ? 100 : fairness), |
53 | | rnd_((uint32_t)time(nullptr)), |
54 | 98.3k | leader_(nullptr) { |
55 | 98.3k | total_requests_[0] = 0; |
56 | 98.3k | total_requests_[1] = 0; |
57 | 98.3k | total_bytes_through_[0] = 0; |
58 | 98.3k | total_bytes_through_[1] = 0; |
59 | 98.3k | } |
60 | | |
61 | 81.8k | GenericRateLimiter::~GenericRateLimiter() { |
62 | 81.8k | MutexLock g(&request_mutex_); |
63 | 81.8k | stop_ = true; |
64 | 81.8k | requests_to_wait_ = static_cast<int32_t>(queue_[Env::IO_LOW].size() + |
65 | 81.8k | queue_[Env::IO_HIGH].size()); |
66 | 81.8k | for (auto& r : queue_[Env::IO_HIGH]) { |
67 | 0 | r->cv.Signal(); |
68 | 0 | } |
69 | 81.8k | for (auto& r : queue_[Env::IO_LOW]) { |
70 | 0 | r->cv.Signal(); |
71 | 0 | } |
72 | 81.8k | while (requests_to_wait_ > 0) { |
73 | 0 | exit_cv_.Wait(); |
74 | 0 | } |
75 | 81.8k | } |
76 | | |
77 | | // This API allows user to dynamically change rate limiter's bytes per second. |
78 | 4 | void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { |
79 | 4 | DCHECK_GT(bytes_per_second, 0); |
80 | 4 | refill_bytes_per_period_.store( |
81 | 4 | CalculateRefillBytesPerPeriod(bytes_per_second), |
82 | 4 | std::memory_order_relaxed); |
83 | 4 | MutexLock g(&request_mutex_); |
84 | 4 | available_bytes_ = 0; |
85 | 4 | } |
86 | | |
// Block until `bytes` of quota are available at priority `pri`, or return
// early if the limiter is being destroyed. `bytes` must not exceed one
// refill period's quota, otherwise the request could never be satisfied.
//
// There is no dedicated refill thread: waiting requests cooperate. The
// request at the head of a queue elects itself "leader", sleeps until
// next_refill_us_, runs Refill() under the mutex, and hands leadership to
// the next queue head before leaving.
void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
  assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));

  MutexLock g(&request_mutex_);
  if (stop_) {
    // Destructor is draining the queues; do not enqueue.
    return;
  }

  ++total_requests_[pri];

  if (available_bytes_ >= bytes) {
    // Refill thread assigns quota and notifies requests waiting on
    // the queue under mutex. So if we get here, that means nobody
    // is waiting?
    available_bytes_ -= bytes;
    total_bytes_through_[pri] += bytes;
    return;
  }

  // Request cannot be satisfied at this moment, enqueue.
  // Note: r lives on this stack frame; Refill() and the destructor only
  // touch it while request_mutex_ is held.
  Req r(bytes, &request_mutex_);
  queue_[pri].push_back(&r);

  do {
    bool timedout = false;
    // Leader election, candidates can be:
    // (1) a new incoming request,
    // (2) a previous leader, whose quota has not been assigned yet due
    //     to lower priority
    // (3) a previous waiter at the front of queue, who got notified by
    //     previous leader
    if (leader_ == nullptr &&
        ((!queue_[Env::IO_HIGH].empty() &&
            &r == queue_[Env::IO_HIGH].front()) ||
         (!queue_[Env::IO_LOW].empty() &&
            &r == queue_[Env::IO_LOW].front()))) {
      // We are a queue head and nobody is leading: become leader and sleep
      // until the next scheduled refill time.
      leader_ = &r;
      timedout = r.cv.TimedWait(next_refill_us_);
    } else {
      // Not at the front of queue or a leader has already been elected
      r.cv.Wait();
    }

    // request_mutex_ is held from now on
    if (stop_) {
      // Destructor is waiting for us; acknowledge and bail out without quota.
      --requests_to_wait_;
      exit_cv_.Signal();
      return;
    }

    // Make sure the waken up request is always the header of its queue
    assert(r.granted ||
           (!queue_[Env::IO_HIGH].empty() &&
            &r == queue_[Env::IO_HIGH].front()) ||
           (!queue_[Env::IO_LOW].empty() &&
            &r == queue_[Env::IO_LOW].front()));
    assert(leader_ == nullptr ||
           (!queue_[Env::IO_HIGH].empty() &&
            leader_ == queue_[Env::IO_HIGH].front()) ||
           (!queue_[Env::IO_LOW].empty() &&
            leader_ == queue_[Env::IO_LOW].front()));

    if (leader_ == &r) {
      // Waken up from TimedWait()
      if (timedout) {
        // Time to do refill!
        Refill();

        // Re-elect a new leader regardless. This is to simplify the
        // election handling.
        leader_ = nullptr;

        // Notify the header of queue if current leader is going away
        if (r.granted) {
          // Current leader already got granted with quota. Notify header
          // of waiting queue to participate next round of election.
          assert((queue_[Env::IO_HIGH].empty() ||
                    &r != queue_[Env::IO_HIGH].front()) &&
                 (queue_[Env::IO_LOW].empty() ||
                    &r != queue_[Env::IO_LOW].front()));
          if (!queue_[Env::IO_HIGH].empty()) {
            queue_[Env::IO_HIGH].front()->cv.Signal();
          } else if (!queue_[Env::IO_LOW].empty()) {
            queue_[Env::IO_LOW].front()->cv.Signal();
          }
          // Done
          break;
        }
        // Leader timed out but was not granted (quota too small this
        // round): loop and run for leader again.
      } else {
        // Spontaneous wake up, need to continue to wait
        assert(!r.granted);
        leader_ = nullptr;
      }
    } else {
      // Waken up by previous leader:
      // (1) if requested quota is granted, it is done.
      // (2) if requested quota is not granted, this means current thread
      // was picked as a new leader candidate (previous leader got quota).
      // It needs to participate leader election because a new request may
      // come in before this thread gets waken up. So it may actually need
      // to do Wait() again.
      assert(!timedout);
    }
  } while (!r.granted);
}
192 | | |
193 | 1.17k | void GenericRateLimiter::Refill() { |
194 | 1.17k | next_refill_us_ = env_->NowMicros() + refill_period_us_; |
195 | | // Carry over the left over quota from the last period |
196 | 1.17k | auto refill_bytes_per_period = |
197 | 1.17k | refill_bytes_per_period_.load(std::memory_order_relaxed); |
198 | 1.17k | if (available_bytes_ < refill_bytes_per_period) { |
199 | 1.17k | available_bytes_ += refill_bytes_per_period; |
200 | 1.17k | } |
201 | | |
202 | 1.17k | int use_low_pri_first = rnd_.OneIn(fairness_) ? 0127 : 11.04k ; |
203 | 3.52k | for (int q = 0; q < 2; ++q2.35k ) { |
204 | 2.35k | auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW1.17k : Env::IO_HIGH1.17k ; |
205 | 2.35k | auto* queue = &queue_[use_pri]; |
206 | 3.67k | while (!queue->empty()) { |
207 | 1.33k | auto* next_req = queue->front(); |
208 | 1.33k | if (available_bytes_ < next_req->bytes) { |
209 | 12 | break; |
210 | 12 | } |
211 | 1.32k | available_bytes_ -= next_req->bytes; |
212 | 1.32k | total_bytes_through_[use_pri] += next_req->bytes; |
213 | 1.32k | queue->pop_front(); |
214 | | |
215 | 1.32k | next_req->granted = true; |
216 | 1.32k | if (next_req != leader_) { |
217 | | // Quota granted, signal the thread |
218 | 153 | next_req->cv.Signal(); |
219 | 153 | } |
220 | 1.32k | } |
221 | 2.35k | } |
222 | 1.17k | } |
223 | | |
224 | | RateLimiter* NewGenericRateLimiter( |
225 | 98.3k | int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) { |
226 | 98.3k | assert(rate_bytes_per_sec > 0); |
227 | 0 | assert(refill_period_us > 0); |
228 | 0 | assert(fairness > 0); |
229 | 0 | return new GenericRateLimiter( |
230 | 98.3k | rate_bytes_per_sec, refill_period_us, fairness); |
231 | 98.3k | } |
232 | | |
233 | | } // namespace rocksdb |