YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/net/rate_limiter.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include "yb/util/net/rate_limiter.h"
14
15
#include "yb/util/size_literals.h"
16
#include "yb/util/status.h"
17
18
using namespace yb::size_literals;
19
20
DEFINE_int32(rate_limiter_min_size, 32_KB, "Minimum size for each transmission request");
21
DEFINE_uint64(rate_limiter_min_rate, 1000, "Minimum transmission rate in bytes/sec");
22
23
namespace yb {
24
25
2.03k
RateLimiter::RateLimiter() {}
26
27
RateLimiter::RateLimiter(const std::function<uint64_t()>& max_transmission_rate_updater)
28
11.2k
    : target_rate_updater_(max_transmission_rate_updater) {}
29
30
13.2k
RateLimiter::~RateLimiter() {
31
13.2k
  VLOG
(1) << "Total rate limiter rate: " << GetRate()2
;
32
13.2k
}
33
34
22.6k
uint64_t RateLimiter::GetMaxSizeForNextTransmission() {
35
22.6k
  if (!active()) {
36
0
    return 0;
37
0
  }
38
22.6k
  UpdateRate();
39
18.4E
  VLOG(2) << "Max size for next time slot: "
40
18.4E
             << target_rate_ * time_slot_ms_ / MonoTime::kMillisecondsPerSecond;
41
18.4E
  VLOG(2) << "max_transmission_rate: " << target_rate_;
42
22.6k
  VLOG
(2) << "time_slot_ms_: " << time_slot_ms_0
;
43
22.6k
  return GetSizeForNextTimeSlot();
44
22.6k
}
45
46
6
uint64_t RateLimiter::GetRate() {
47
6
  if (!init_) {
48
0
    return 0;
49
0
  }
50
51
6
  auto elapsed = end_time_.GetDeltaSince(start_time_);
52
6
  if (elapsed.ToMicroseconds() == 0) {
53
0
    return 0;
54
0
  }
55
56
6
  VLOG
(2) << "Elapsed: " << elapsed0
;
57
6
  VLOG
(2) << "Total bytes: " << total_bytes_0
;
58
6
  return MonoTime::kMicrosecondsPerSecond * total_bytes_ / elapsed.ToMicroseconds();
59
6
}
60
61
9.30k
void RateLimiter::UpdateDataSizeAndMaybeSleep(uint64_t data_size) {
62
9.30k
  auto now = MonoTime::Now();
63
9.30k
  auto elapsed = now.GetDeltaSince(end_time_);
64
9.30k
  end_time_ = now;
65
9.30k
  total_bytes_ += data_size;
66
9.30k
  UpdateRate();
67
9.30k
  UpdateTimeSlotSizeAndMaybeSleep(data_size, elapsed);
68
9.30k
}
69
70
18.8k
void RateLimiter::UpdateTimeSlotSizeAndMaybeSleep(uint64_t data_size, MonoDelta elapsed) {
71
18.8k
  if (!active()) {
72
10
    return;
73
10
  }
74
75
  // If the rate is greater than target_rate_, sleep until both rates are equal.
76
18.8k
  if (MonoTime::kMillisecondsPerSecond * data_size > target_rate_ * elapsed.ToMilliseconds()) {
77
13.1k
    auto sleep_time =
78
13.1k
        MonoTime::kMillisecondsPerSecond * data_size / target_rate_ - elapsed.ToMilliseconds();
79
13.1k
    VLOG(1) << " target_rate_=" << target_rate_
80
0
            << " elapsed=" << elapsed.ToMilliseconds()
81
0
            << " received size=" << data_size
82
0
            << " and sleeping for=" << sleep_time;
83
13.1k
    SleepFor(MonoDelta::FromMilliseconds(sleep_time));
84
13.1k
#if defined(OS_MACOSX)
85
13.1k
    total_time_slept_ += MonoDelta::FromMilliseconds(sleep_time);
86
13.1k
#endif
87
13.1k
    end_time_ = MonoTime::Now();
88
    // If we slept for more than 80% of time_slot_ms_, reduce the size of this time slot.
89
13.1k
    if (sleep_time > time_slot_ms_ * 80 / 100) {
90
112
      time_slot_ms_ = std::max(min_time_slot_, time_slot_ms_ / 2);
91
112
    }
92
13.1k
  } else {
93
5.65k
    time_slot_ms_ = std::min(max_time_slot_, time_slot_ms_ * 2);
94
5.65k
  }
95
18.8k
}
96
97
43.5k
void RateLimiter::UpdateRate() {
98
43.5k
  if (!active()) {
99
10
    VLOG
(1) << "RateLimiter inactive"0
;
100
10
    return;
101
10
  }
102
43.5k
  auto target_rate = target_rate_updater_();
103
43.5k
  VLOG
(1) << "New target_rate: " << target_rate4
;
104
43.5k
  if (target_rate != target_rate_) {
105
13.4k
    rate_start_time_ = MonoTime::Now();
106
13.4k
  }
107
43.5k
  target_rate_ = target_rate;
108
109
43.5k
  if (target_rate_ < FLAGS_rate_limiter_min_rate) {
110
4
    VLOG
(1) << "Received transmission rate is less than minimum " << FLAGS_rate_limiter_min_rate0
;
111
4
    target_rate_ = FLAGS_rate_limiter_min_rate;
112
4
  }
113
43.5k
}
114
115
22.6k
inline uint64_t RateLimiter::GetSizeForNextTimeSlot() {
116
22.6k
  VLOG
(1) << "target_rate_ " << target_rate_0
;
117
  // We don't want to transmit less than min_size_ bytes.
118
22.6k
  min_time_slot_ = (MonoTime::kMillisecondsPerSecond * FLAGS_rate_limiter_min_size + target_rate_) /
119
22.6k
                   target_rate_;
120
18.4E
  VLOG(1) << "min_size=" << FLAGS_rate_limiter_min_size
121
18.4E
          << " max_transmission_rate=" << target_rate_
122
18.4E
          << " min_time_slot=" << min_time_slot_;
123
22.6k
  VLOG(1) << "Max allowed bytes per time slot: "
124
2
             << target_rate_  * min_time_slot_  / MonoTime::kMillisecondsPerSecond;
125
22.6k
  VLOG
(1) << "time_slot_size: " << time_slot_ms_1
;
126
22.6k
  return target_rate_ * time_slot_ms_ / MonoTime::kMillisecondsPerSecond;
127
22.6k
}
128
129
13.2k
void RateLimiter::Init() {
130
13.2k
  start_time_ = MonoTime::Now();
131
13.2k
  rate_start_time_ = start_time_;
132
13.2k
  end_time_ = start_time_;
133
13.2k
  init_ = true;
134
13.2k
}
135
136
0
void RateLimiter::SetTargetRate(uint64_t target_rate) {
137
0
  target_rate_ = target_rate;
138
0
}
139
140
Status RateLimiter::SendOrReceiveData(std::function<Status()> send_rcv_func,
141
11.5k
                                      std::function<uint64()> reply_size_func) {
142
11.5k
  auto start = MonoTime::Now();
143
11.5k
  if (!init_) {
144
11.2k
    Init();
145
11.2k
  }
146
147
11.5k
  UpdateRate();
148
11.5k
  auto status = send_rcv_func();
149
11.5k
  auto now = MonoTime::Now();
150
11.5k
  auto elapsed = now.GetDeltaSince(start);
151
11.5k
  if (status.ok()) {
152
9.52k
    auto data_size = reply_size_func();
153
9.52k
    total_bytes_ += data_size;
154
9.52k
    end_time_ = MonoTime::Now();
155
9.52k
    UpdateTimeSlotSizeAndMaybeSleep(data_size, elapsed);
156
9.52k
  }
157
11.5k
  return status;
158
11.5k
}
159
} // namespace yb