/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_controller.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #include "yb/rocksdb/db/write_controller.h" |
22 | | |
23 | | #include <atomic> |
24 | | |
25 | | #include "yb/rocksdb/env.h" |
26 | | |
27 | | #include "yb/util/atomic.h" |
28 | | #include "yb/util/flag_tags.h" |
29 | | |
30 | | DEFINE_test_flag(bool, allow_stop_writes, true, |
31 | | "Whether it is allowed to stop writes in tests."); |
32 | | |
33 | | namespace rocksdb { |
34 | | |
35 | 4.41k | std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() { |
36 | 4.41k | CHECK(yb::GetAtomicFlag(&FLAGS_TEST_allow_stop_writes)); |
37 | 4.41k | ++total_stopped_; |
38 | 4.41k | return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this)); |
39 | 4.41k | } |
40 | | |
41 | | std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken( |
42 | 906 | uint64_t write_rate) { |
43 | 906 | total_delayed_++; |
44 | | // Reset counters. |
45 | 906 | last_refill_time_ = 0; |
46 | 906 | bytes_left_ = 0; |
47 | 906 | set_delayed_write_rate(write_rate); |
48 | 906 | return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this)); |
49 | 906 | } |
50 | | |
51 | | std::unique_ptr<WriteControllerToken> |
52 | 1.23M | WriteController::GetCompactionPressureToken() { |
53 | 1.23M | ++total_compaction_pressure_; |
54 | 1.23M | return std::unique_ptr<WriteControllerToken>( |
55 | 1.23M | new CompactionPressureToken(this)); |
56 | 1.23M | } |
57 | | |
58 | 29.2M | bool WriteController::IsStopped() const { return total_stopped_ > 0; } |
59 | | // This runs inside the DB mutex, so we can't sleep here and need to minimize |
60 | | // how often we query the current time. |
61 | | // If it turns out to be a performance issue, we can redesign the thread |
62 | | // synchronization model here. |
63 | | // The function trusts that the caller will sleep for the number of micros returned. |
64 | 14.1k | uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) { |
65 | 14.1k | if (total_stopped_ > 0) { |
66 | 1.67k | return 0; |
67 | 1.67k | } |
68 | 12.4k | if (total_delayed_ == 0) { |
69 | 1 | return 0; |
70 | 1 | } |
71 | | |
72 | 12.4k | const uint64_t kMicrosPerSecond = 1000000; |
73 | 12.4k | const uint64_t kRefillInterval = 1024U; |
74 | | |
75 | 12.4k | if (bytes_left_ >= num_bytes) { |
76 | 10.2k | bytes_left_ -= num_bytes; |
77 | 10.2k | return 0; |
78 | 10.2k | } |
79 | | // The frequency to get time inside DB mutex is less than one per refill |
80 | | // interval. |
81 | 2.23k | auto time_now = env->NowMicros(); |
82 | | |
83 | 2.23k | uint64_t sleep_debt = 0; |
84 | 2.23k | uint64_t time_since_last_refill = 0; |
85 | 2.23k | if (last_refill_time_ != 0) { |
86 | 1.70k | if (last_refill_time_ > time_now) { |
87 | 2 | sleep_debt = last_refill_time_ - time_now; |
88 | 1.70k | } else { |
89 | 1.70k | time_since_last_refill = time_now - last_refill_time_; |
90 | 1.70k | bytes_left_ += |
91 | 1.70k | static_cast<uint64_t>(static_cast<double>(time_since_last_refill) / |
92 | 1.70k | kMicrosPerSecond * delayed_write_rate_); |
93 | 1.70k | if (time_since_last_refill >= kRefillInterval && |
94 | 1.70k | bytes_left_ > num_bytes) { |
95 | | // If refill interval already passed and we have enough bytes |
96 | | // return without extra sleeping. |
97 | 131 | last_refill_time_ = time_now; |
98 | 131 | bytes_left_ -= num_bytes; |
99 | 131 | return 0; |
100 | 131 | } |
101 | 1.70k | } |
102 | 1.70k | } |
103 | | |
104 | 2.10k | uint64_t single_refill_amount = |
105 | 2.10k | delayed_write_rate_ * kRefillInterval / kMicrosPerSecond; |
106 | 2.10k | if (bytes_left_ + single_refill_amount >= num_bytes) { |
107 | | // Wait until a refill interval |
108 | | // Never trigger expiration for less than one refill interval, to avoid |
109 | | // having to get the time. |
110 | 1.28k | bytes_left_ = bytes_left_ + single_refill_amount - num_bytes; |
111 | 1.28k | last_refill_time_ = time_now + kRefillInterval; |
112 | 1.28k | return kRefillInterval + sleep_debt; |
113 | 1.28k | } |
114 | | |
115 | | // Need to refill more than one interval. Need to sleep longer. Check |
116 | | // whether expiration will hit |
117 | | |
118 | | // Sleep just until `num_bytes` is allowed. |
119 | 819 | uint64_t sleep_amount = |
120 | 819 | static_cast<uint64_t>(num_bytes / |
121 | 819 | static_cast<long double>(delayed_write_rate_) * |
122 | 819 | kMicrosPerSecond) + |
123 | 819 | sleep_debt; |
124 | 819 | last_refill_time_ = time_now + sleep_amount; |
125 | 819 | return sleep_amount; |
126 | 2.10k | } |
127 | | |
128 | 4.41k | StopWriteToken::~StopWriteToken() { |
129 | 4.41k | assert(controller_->total_stopped_ >= 1); |
130 | 0 | --controller_->total_stopped_; |
131 | 4.41k | } |
132 | | |
133 | 906 | DelayWriteToken::~DelayWriteToken() { |
134 | 906 | controller_->total_delayed_--; |
135 | 906 | assert(controller_->total_delayed_ >= 0); |
136 | 906 | } |
137 | | |
138 | 1.19M | CompactionPressureToken::~CompactionPressureToken() { |
139 | 1.19M | controller_->total_compaction_pressure_--; |
140 | 1.19M | assert(controller_->total_compaction_pressure_ >= 0); |
141 | 1.19M | } |
142 | | |
143 | | } // namespace rocksdb |