YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/net/rate_limiter.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include "yb/util/net/rate_limiter.h"
14
15
#include "yb/util/size_literals.h"
16
#include "yb/util/status.h"
17
18
using namespace yb::size_literals;
19
20
// Runtime flag: minimum number of bytes per transmission request. GetSizeForNextTimeSlot() uses it
// to derive min_time_slot_. The default is written with a literal from yb::size_literals
// (presumably 32 * 1024 bytes -- confirm against size_literals.h).
DEFINE_int32(rate_limiter_min_size, 32_KB, "Minimum size for each transmission request");
21
// Runtime flag: lower bound, in bytes/sec, that UpdateRate() applies to target_rate_.
DEFINE_uint64(rate_limiter_min_rate, 1000, "Minimum transmission rate in bytes/sec");
22
23
namespace yb {
24
25
1.45k
// Default constructor: has no initializer list and an empty body, so no rate-updater callback is
// supplied here. Member defaults come from the class declaration, which is not part of this file.
RateLimiter::RateLimiter() {}
26
27
// Constructs a limiter whose target rate is supplied by a callback.
//
// max_transmission_rate_updater: copied into target_rate_updater_; UpdateRate() invokes it to
// fetch the current target rate, which that function compares against
// FLAGS_rate_limiter_min_rate (a bytes/sec value).
RateLimiter::RateLimiter(const std::function<uint64_t()>& max_transmission_rate_updater)
28
5.78k
    : target_rate_updater_(max_transmission_rate_updater) {}
29
30
6.74k
// Destructor: its only action is to log, at verbosity level 1, the overall rate returned by
// GetRate().
RateLimiter::~RateLimiter() {
31
18.4E
  VLOG(1) << "Total rate limiter rate: " << GetRate();
32
6.74k
}
33
34
11.6k
// Returns the number of bytes that may be transmitted in the next time slot.
//
// Returns 0 when active() (defined outside this file) is false. Otherwise refreshes target_rate_
// through UpdateRate() and returns the value computed by GetSizeForNextTimeSlot().
uint64_t RateLimiter::GetMaxSizeForNextTransmission() {
35
11.6k
  if (!active()) {
36
0
    return 0;
37
0
  }
38
11.6k
  // Pull the latest target rate before sizing the slot.
  UpdateRate();
39
1
  // This logged expression is the same one GetSizeForNextTimeSlot() returns below.
  VLOG(2) << "Max size for next time slot: "
40
1
             << target_rate_ * time_slot_ms_ / MonoTime::kMillisecondsPerSecond;
41
0
  VLOG(2) << "max_transmission_rate: " << target_rate_;
42
0
  VLOG(2) << "time_slot_ms_: " << time_slot_ms_;
43
11.6k
  return GetSizeForNextTimeSlot();
44
11.6k
}
45
46
6
// Returns the average rate over the limiter's lifetime: total_bytes_ divided by the time between
// start_time_ and end_time_, scaled to a per-second value.
//
// Returns 0 when Init() has not run (init_ is false) or when the elapsed time is 0 microseconds.
uint64_t RateLimiter::GetRate() {
47
6
  if (!init_) {
48
0
    return 0;
49
0
  }
50
51
6
  auto elapsed = end_time_.GetDeltaSince(start_time_);
52
6
  // Guard against dividing by zero in the return expression below.
  if (elapsed.ToMicroseconds() == 0) {
53
0
    return 0;
54
0
  }
55
56
0
  VLOG(2) << "Elapsed: " << elapsed;
57
0
  VLOG(2) << "Total bytes: " << total_bytes_;
58
6
  // NOTE(review): the multiplication happens before the division, so a very large total_bytes_
  // could overflow the intermediate product -- confirm the operand widths in the header.
  return MonoTime::kMicrosecondsPerSecond * total_bytes_ / elapsed.ToMicroseconds();
59
6
}
60
61
4.82k
// Accounts for data_size bytes transferred since the previous end_time_ and throttles if needed.
//
// data_size: number of bytes to add to total_bytes_.
//
// The elapsed interval is measured from the old end_time_ to now; end_time_ is then moved to now,
// the target rate is refreshed, and UpdateTimeSlotSizeAndMaybeSleep() decides whether to sleep.
//
// NOTE(review): unlike SendOrReceiveData(), this function does not call Init() when init_ is
// false, so end_time_ may not have been set yet on the first call -- confirm that callers
// initialize the limiter first.
void RateLimiter::UpdateDataSizeAndMaybeSleep(uint64_t data_size) {
62
4.82k
  auto now = MonoTime::Now();
63
4.82k
  // Must be computed before end_time_ is overwritten on the next line.
  auto elapsed = now.GetDeltaSince(end_time_);
64
4.82k
  end_time_ = now;
65
4.82k
  total_bytes_ += data_size;
66
4.82k
  UpdateRate();
67
4.82k
  UpdateTimeSlotSizeAndMaybeSleep(data_size, elapsed);
68
4.82k
}
69
70
9.87k
// Sleeps if data_size bytes were transferred faster than target_rate_ allows, and adapts
// time_slot_ms_ to how much throttling was needed.
//
// data_size: bytes transferred during the interval.
// elapsed:   how long the transfer took.
//
// Does nothing when active() is false. When no sleep is needed, time_slot_ms_ is doubled (capped
// at max_time_slot_). When a sleep is needed and it exceeds 80% of time_slot_ms_, time_slot_ms_
// is halved (floored at min_time_slot_).
void RateLimiter::UpdateTimeSlotSizeAndMaybeSleep(uint64_t data_size, MonoDelta elapsed) {
71
9.87k
  if (!active()) {
72
10
    return;
73
10
  }
74
75
  // If the rate is greater than target_rate_, sleep until both rates are equal.
76
9.86k
  // Cross-multiplied form of (data_size / elapsed) > target_rate_, with elapsed in milliseconds.
  if (MonoTime::kMillisecondsPerSecond * data_size > target_rate_ * elapsed.ToMilliseconds()) {
77
7.12k
    // Milliseconds the transfer should have taken at target_rate_, minus the time it actually took.
    // NOTE(review): this divides by target_rate_. UpdateRate() clamps it to
    // FLAGS_rate_limiter_min_rate, but SetTargetRate() stores any value and the flag itself could
    // be set to 0 -- confirm that target_rate_ cannot be 0 here.
    auto sleep_time =
78
7.12k
        MonoTime::kMillisecondsPerSecond * data_size / target_rate_ - elapsed.ToMilliseconds();
79
0
    VLOG(1) << " target_rate_=" << target_rate_
80
0
            << " elapsed=" << elapsed.ToMilliseconds()
81
0
            << " received size=" << data_size
82
0
            << " and sleeping for=" << sleep_time;
83
7.12k
    SleepFor(MonoDelta::FromMilliseconds(sleep_time));
84
7.12k
    // The cumulative sleep time is only tracked when OS_MACOSX is defined.
#if defined(OS_MACOSX)
85
7.12k
    total_time_slept_ += MonoDelta::FromMilliseconds(sleep_time);
86
7.12k
#endif
87
7.12k
    // Re-stamp end_time_ after the sleep; GetRate() measures up to end_time_.
    end_time_ = MonoTime::Now();
88
    // If we slept for more than 80% of time_slot_ms_, reduce the size of this time slot.
89
7.12k
    if (sleep_time > time_slot_ms_ * 80 / 100) {
90
112
      time_slot_ms_ = std::max(min_time_slot_, time_slot_ms_ / 2);
91
112
    }
92
2.73k
  } else {
93
2.73k
    // The transfer was at or below the target rate: grow the time slot, up to max_time_slot_.
    time_slot_ms_ = std::min(max_time_slot_, time_slot_ms_ * 2);
94
2.73k
  }
95
9.86k
}
96
97
22.5k
// Refreshes target_rate_ from target_rate_updater_ and enforces FLAGS_rate_limiter_min_rate as a
// lower bound. Does nothing (apart from a verbose log) when active() is false.
//
// rate_start_time_ is reset to the current time whenever the value returned by the updater
// differs from the stored target_rate_.
void RateLimiter::UpdateRate() {
98
22.5k
  if (!active()) {
99
0
    VLOG(1) << "RateLimiter inactive";
100
10
    return;
101
10
  }
102
22.5k
  auto target_rate = target_rate_updater_();
103
7
  VLOG(1) << "New target_rate: " << target_rate;
104
22.5k
  // NOTE(review): this compares the raw updater value with the stored value, which may already
  // have been clamped below. While the updater keeps returning a value under the minimum, the two
  // differ on every call, so rate_start_time_ is reset each time -- confirm whether the comparison
  // should happen after clamping.
  if (target_rate != target_rate_) {
105
6.97k
    rate_start_time_ = MonoTime::Now();
106
6.97k
  }
107
22.5k
  target_rate_ = target_rate;
108
109
22.5k
  // Clamp to the configured minimum rate.
  if (target_rate_ < FLAGS_rate_limiter_min_rate) {
110
0
    VLOG(1) << "Received transmission rate is less than minimum " << FLAGS_rate_limiter_min_rate;
111
4
    target_rate_ = FLAGS_rate_limiter_min_rate;
112
4
  }
113
22.5k
}
114
115
11.6k
// Recomputes min_time_slot_ from the current target_rate_ and returns the number of bytes that
// fit into time_slot_ms_ milliseconds at target_rate_ bytes per second.
//
// NOTE(review): both the min_time_slot_ computation and nothing else here guard against
// target_rate_ == 0. The caller in this file, GetMaxSizeForNextTransmission(), calls UpdateRate()
// first -- confirm that this always leaves target_rate_ non-zero.
inline uint64_t RateLimiter::GetSizeForNextTimeSlot() {
116
18.4E
  VLOG(1) << "target_rate_ " << target_rate_;
117
  // We don't want to transmit less than min_size_ bytes.
118
11.6k
  // With integer division, (x + r) / r equals x / r + 1, so this is the number of milliseconds
  // needed to send FLAGS_rate_limiter_min_size bytes at target_rate_, rounded down, plus one.
  min_time_slot_ = (MonoTime::kMillisecondsPerSecond * FLAGS_rate_limiter_min_size + target_rate_) /
119
11.6k
                   target_rate_;
120
0
  VLOG(1) << "min_size=" << FLAGS_rate_limiter_min_size
121
0
          << " max_transmission_rate=" << target_rate_
122
0
          << " min_time_slot=" << min_time_slot_;
123
18.4E
  // The value logged here is computed from min_time_slot_, not from time_slot_ms_.
  VLOG(1) << "Max allowed bytes per time slot: "
124
18.4E
             << target_rate_  * min_time_slot_  / MonoTime::kMillisecondsPerSecond;
125
1
  VLOG(1) << "time_slot_size: " << time_slot_ms_;
126
11.6k
  return target_rate_ * time_slot_ms_ / MonoTime::kMillisecondsPerSecond;
127
11.6k
}
128
129
6.77k
// Starts the limiter's clocks: start_time_, rate_start_time_ and end_time_ are all set to the
// same current timestamp, and init_ is set so that GetRate() and SendOrReceiveData() treat the
// limiter as initialized.
void RateLimiter::Init() {
130
6.77k
  start_time_ = MonoTime::Now();
131
6.77k
  rate_start_time_ = start_time_;
132
6.77k
  end_time_ = start_time_;
133
6.77k
  init_ = true;
134
6.77k
}
135
136
0
// Stores target_rate directly into target_rate_.
//
// NOTE(review): the value is not clamped to FLAGS_rate_limiter_min_rate here, and UpdateRate()
// overwrites target_rate_ with the updater's value whenever active() is true -- confirm the
// intended use of this setter (the coverage counts in this report show it was never called).
void RateLimiter::SetTargetRate(uint64_t target_rate) {
137
0
  target_rate_ = target_rate;
138
0
}
139
140
// Runs one transfer through the limiter and throttles afterwards if it was too fast.
//
// send_rcv_func:   performs the transfer and returns its Status.
// reply_size_func: called only after a successful transfer; returns the number of bytes moved.
//
// Returns the Status produced by send_rcv_func. On failure nothing is added to total_bytes_ and
// no sleep is performed. The limiter is lazily initialized through Init() on first use.
Status RateLimiter::SendOrReceiveData(std::function<Status()> send_rcv_func,
141
6.04k
                                      std::function<uint64()> reply_size_func) {
142
6.04k
  // Taken before Init() and UpdateRate(), so `elapsed` below includes the time they take in
  // addition to the transfer itself.
  auto start = MonoTime::Now();
143
6.04k
  if (!init_) {
144
5.77k
    Init();
145
5.77k
  }
146
147
6.04k
  UpdateRate();
148
6.04k
  auto status = send_rcv_func();
149
6.04k
  auto now = MonoTime::Now();
150
6.04k
  auto elapsed = now.GetDeltaSince(start);
151
6.04k
  if (status.ok()) {
152
5.04k
    auto data_size = reply_size_func();
153
5.04k
    total_bytes_ += data_size;
154
5.04k
    // A fresh timestamp is taken here rather than reusing `now`.
    end_time_ = MonoTime::Now();
155
5.04k
    UpdateTimeSlotSizeAndMaybeSleep(data_size, elapsed);
156
5.04k
  }
157
6.04k
  return status;
158
6.04k
}
159
} // namespace yb