/Users/deen/code/yugabyte-db/src/yb/util/net/rate_limiter.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | #include "yb/util/net/rate_limiter.h" |
14 | | |
15 | | #include "yb/util/size_literals.h" |
16 | | #include "yb/util/status.h" |
17 | | |
18 | | using namespace yb::size_literals; |
19 | | |
20 | | DEFINE_int32(rate_limiter_min_size, 32_KB, "Minimum size for each transmission request"); |
21 | | DEFINE_uint64(rate_limiter_min_rate, 1000, "Minimum transmission rate in bytes/sec"); |
22 | | |
23 | | namespace yb { |
24 | | |
25 | 1.45k | RateLimiter::RateLimiter() {} |
26 | | |
27 | | RateLimiter::RateLimiter(const std::function<uint64_t()>& max_transmission_rate_updater) |
28 | 5.78k | : target_rate_updater_(max_transmission_rate_updater) {} |
29 | | |
30 | 6.74k | RateLimiter::~RateLimiter() { |
31 | 18.4E | VLOG(1) << "Total rate limiter rate: " << GetRate(); |
32 | 6.74k | } |
33 | | |
34 | 11.6k | uint64_t RateLimiter::GetMaxSizeForNextTransmission() { |
35 | 11.6k | if (!active()) { |
36 | 0 | return 0; |
37 | 0 | } |
38 | 11.6k | UpdateRate(); |
39 | 1 | VLOG(2) << "Max size for next time slot: " |
40 | 1 | << target_rate_ * time_slot_ms_ / MonoTime::kMillisecondsPerSecond; |
41 | 0 | VLOG(2) << "max_transmission_rate: " << target_rate_; |
42 | 0 | VLOG(2) << "time_slot_ms_: " << time_slot_ms_; |
43 | 11.6k | return GetSizeForNextTimeSlot(); |
44 | 11.6k | } |
45 | | |
46 | 6 | uint64_t RateLimiter::GetRate() { |
47 | 6 | if (!init_) { |
48 | 0 | return 0; |
49 | 0 | } |
50 | | |
51 | 6 | auto elapsed = end_time_.GetDeltaSince(start_time_); |
52 | 6 | if (elapsed.ToMicroseconds() == 0) { |
53 | 0 | return 0; |
54 | 0 | } |
55 | | |
56 | 0 | VLOG(2) << "Elapsed: " << elapsed; |
57 | 0 | VLOG(2) << "Total bytes: " << total_bytes_; |
58 | 6 | return MonoTime::kMicrosecondsPerSecond * total_bytes_ / elapsed.ToMicroseconds(); |
59 | 6 | } |
60 | | |
61 | 4.82k | void RateLimiter::UpdateDataSizeAndMaybeSleep(uint64_t data_size) { |
62 | 4.82k | auto now = MonoTime::Now(); |
63 | 4.82k | auto elapsed = now.GetDeltaSince(end_time_); |
64 | 4.82k | end_time_ = now; |
65 | 4.82k | total_bytes_ += data_size; |
66 | 4.82k | UpdateRate(); |
67 | 4.82k | UpdateTimeSlotSizeAndMaybeSleep(data_size, elapsed); |
68 | 4.82k | } |
69 | | |
70 | 9.87k | void RateLimiter::UpdateTimeSlotSizeAndMaybeSleep(uint64_t data_size, MonoDelta elapsed) { |
71 | 9.87k | if (!active()) { |
72 | 10 | return; |
73 | 10 | } |
74 | | |
75 | | // If the rate is greater than target_rate_, sleep until both rates are equal. |
76 | 9.86k | if (MonoTime::kMillisecondsPerSecond * data_size > target_rate_ * elapsed.ToMilliseconds()) { |
77 | 7.12k | auto sleep_time = |
78 | 7.12k | MonoTime::kMillisecondsPerSecond * data_size / target_rate_ - elapsed.ToMilliseconds(); |
79 | 0 | VLOG(1) << " target_rate_=" << target_rate_ |
80 | 0 | << " elapsed=" << elapsed.ToMilliseconds() |
81 | 0 | << " received size=" << data_size |
82 | 0 | << " and sleeping for=" << sleep_time; |
83 | 7.12k | SleepFor(MonoDelta::FromMilliseconds(sleep_time)); |
84 | 7.12k | #if defined(OS_MACOSX) |
85 | 7.12k | total_time_slept_ += MonoDelta::FromMilliseconds(sleep_time); |
86 | 7.12k | #endif |
87 | 7.12k | end_time_ = MonoTime::Now(); |
88 | | // If we slept for more than 80% of time_slot_ms_, reduce the size of this time slot. |
89 | 7.12k | if (sleep_time > time_slot_ms_ * 80 / 100) { |
90 | 112 | time_slot_ms_ = std::max(min_time_slot_, time_slot_ms_ / 2); |
91 | 112 | } |
92 | 2.73k | } else { |
93 | 2.73k | time_slot_ms_ = std::min(max_time_slot_, time_slot_ms_ * 2); |
94 | 2.73k | } |
95 | 9.86k | } |
96 | | |
97 | 22.5k | void RateLimiter::UpdateRate() { |
98 | 22.5k | if (!active()) { |
99 | 0 | VLOG(1) << "RateLimiter inactive"; |
100 | 10 | return; |
101 | 10 | } |
102 | 22.5k | auto target_rate = target_rate_updater_(); |
103 | 7 | VLOG(1) << "New target_rate: " << target_rate; |
104 | 22.5k | if (target_rate != target_rate_) { |
105 | 6.97k | rate_start_time_ = MonoTime::Now(); |
106 | 6.97k | } |
107 | 22.5k | target_rate_ = target_rate; |
108 | | |
109 | 22.5k | if (target_rate_ < FLAGS_rate_limiter_min_rate) { |
110 | 0 | VLOG(1) << "Received transmission rate is less than minimum " << FLAGS_rate_limiter_min_rate; |
111 | 4 | target_rate_ = FLAGS_rate_limiter_min_rate; |
112 | 4 | } |
113 | 22.5k | } |
114 | | |
115 | 11.6k | inline uint64_t RateLimiter::GetSizeForNextTimeSlot() { |
116 | 18.4E | VLOG(1) << "target_rate_ " << target_rate_; |
117 | | // We don't want to transmit less than min_size_ bytes. |
118 | 11.6k | min_time_slot_ = (MonoTime::kMillisecondsPerSecond * FLAGS_rate_limiter_min_size + target_rate_) / |
119 | 11.6k | target_rate_; |
120 | 0 | VLOG(1) << "min_size=" << FLAGS_rate_limiter_min_size |
121 | 0 | << " max_transmission_rate=" << target_rate_ |
122 | 0 | << " min_time_slot=" << min_time_slot_; |
123 | 18.4E | VLOG(1) << "Max allowed bytes per time slot: " |
124 | 18.4E | << target_rate_ * min_time_slot_ / MonoTime::kMillisecondsPerSecond; |
125 | 1 | VLOG(1) << "time_slot_size: " << time_slot_ms_; |
126 | 11.6k | return target_rate_ * time_slot_ms_ / MonoTime::kMillisecondsPerSecond; |
127 | 11.6k | } |
128 | | |
129 | 6.77k | void RateLimiter::Init() { |
130 | 6.77k | start_time_ = MonoTime::Now(); |
131 | 6.77k | rate_start_time_ = start_time_; |
132 | 6.77k | end_time_ = start_time_; |
133 | 6.77k | init_ = true; |
134 | 6.77k | } |
135 | | |
136 | 0 | void RateLimiter::SetTargetRate(uint64_t target_rate) { |
137 | 0 | target_rate_ = target_rate; |
138 | 0 | } |
139 | | |
140 | | Status RateLimiter::SendOrReceiveData(std::function<Status()> send_rcv_func, |
141 | 6.04k | std::function<uint64()> reply_size_func) { |
142 | 6.04k | auto start = MonoTime::Now(); |
143 | 6.04k | if (!init_) { |
144 | 5.77k | Init(); |
145 | 5.77k | } |
146 | | |
147 | 6.04k | UpdateRate(); |
148 | 6.04k | auto status = send_rcv_func(); |
149 | 6.04k | auto now = MonoTime::Now(); |
150 | 6.04k | auto elapsed = now.GetDeltaSince(start); |
151 | 6.04k | if (status.ok()) { |
152 | 5.04k | auto data_size = reply_size_func(); |
153 | 5.04k | total_bytes_ += data_size; |
154 | 5.04k | end_time_ = MonoTime::Now(); |
155 | 5.04k | UpdateTimeSlotSizeAndMaybeSleep(data_size, elapsed); |
156 | 5.04k | } |
157 | 6.04k | return status; |
158 | 6.04k | } |
159 | | } // namespace yb |