YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_controller.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#include "yb/rocksdb/db/write_controller.h"
22
23
#include <atomic>
24
25
#include "yb/rocksdb/env.h"
26
27
#include "yb/util/atomic.h"
28
#include "yb/util/flag_tags.h"
29
30
DEFINE_test_flag(bool, allow_stop_writes, true,
31
                 "Whether it is allowed to stop writes in tests.");
32
33
namespace rocksdb {
34
35
4.41k
std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
36
4.41k
  CHECK(yb::GetAtomicFlag(&FLAGS_TEST_allow_stop_writes));
37
4.41k
  ++total_stopped_;
38
4.41k
  return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
39
4.41k
}
40
41
std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
42
906
    uint64_t write_rate) {
43
906
  total_delayed_++;
44
  // Reset counters.
45
906
  last_refill_time_ = 0;
46
906
  bytes_left_ = 0;
47
906
  set_delayed_write_rate(write_rate);
48
906
  return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
49
906
}
50
51
std::unique_ptr<WriteControllerToken>
52
1.23M
WriteController::GetCompactionPressureToken() {
53
1.23M
  ++total_compaction_pressure_;
54
1.23M
  return std::unique_ptr<WriteControllerToken>(
55
1.23M
      new CompactionPressureToken(this));
56
1.23M
}
57
58
29.2M
bool WriteController::IsStopped() const { return total_stopped_ > 0; }
59
// This is inside DB mutex, so we can't sleep and need to minimize
60
// frequency to get time.
61
// If it turns out to be a performance issue, we can redesign the thread
62
// synchronization model here.
63
// The function trust caller will sleep micros returned.
64
14.1k
uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
65
14.1k
  if (total_stopped_ > 0) {
66
1.67k
    return 0;
67
1.67k
  }
68
12.4k
  if (total_delayed_ == 0) {
69
1
    return 0;
70
1
  }
71
72
12.4k
  const uint64_t kMicrosPerSecond = 1000000;
73
12.4k
  const uint64_t kRefillInterval = 1024U;
74
75
12.4k
  if (bytes_left_ >= num_bytes) {
76
10.2k
    bytes_left_ -= num_bytes;
77
10.2k
    return 0;
78
10.2k
  }
79
  // The frequency to get time inside DB mutex is less than one per refill
80
  // interval.
81
2.23k
  auto time_now = env->NowMicros();
82
83
2.23k
  uint64_t sleep_debt = 0;
84
2.23k
  uint64_t time_since_last_refill = 0;
85
2.23k
  if (last_refill_time_ != 0) {
86
1.70k
    if (last_refill_time_ > time_now) {
87
2
      sleep_debt = last_refill_time_ - time_now;
88
1.70k
    } else {
89
1.70k
      time_since_last_refill = time_now - last_refill_time_;
90
1.70k
      bytes_left_ +=
91
1.70k
          static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
92
1.70k
                                kMicrosPerSecond * delayed_write_rate_);
93
1.70k
      if (time_since_last_refill >= kRefillInterval &&
94
1.70k
          
bytes_left_ > num_bytes170
) {
95
        // If refill interval already passed and we have enough bytes
96
        // return without extra sleeping.
97
131
        last_refill_time_ = time_now;
98
131
        bytes_left_ -= num_bytes;
99
131
        return 0;
100
131
      }
101
1.70k
    }
102
1.70k
  }
103
104
2.10k
  uint64_t single_refill_amount =
105
2.10k
      delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
106
2.10k
  if (bytes_left_ + single_refill_amount >= num_bytes) {
107
    // Wait until a refill interval
108
    // Never trigger expire for less than one refill interval to avoid to get
109
    // time.
110
1.28k
    bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
111
1.28k
    last_refill_time_ = time_now + kRefillInterval;
112
1.28k
    return kRefillInterval + sleep_debt;
113
1.28k
  }
114
115
  // Need to refill more than one interval. Need to sleep longer. Check
116
  // whether expiration will hit
117
118
  // Sleep just until `num_bytes` is allowed.
119
819
  uint64_t sleep_amount =
120
819
      static_cast<uint64_t>(num_bytes /
121
819
                            static_cast<long double>(delayed_write_rate_) *
122
819
                            kMicrosPerSecond) +
123
819
      sleep_debt;
124
819
  last_refill_time_ = time_now + sleep_amount;
125
819
  return sleep_amount;
126
2.10k
}
127
128
4.41k
StopWriteToken::~StopWriteToken() {
129
4.41k
  assert(controller_->total_stopped_ >= 1);
130
0
  --controller_->total_stopped_;
131
4.41k
}
132
133
906
DelayWriteToken::~DelayWriteToken() {
134
906
  controller_->total_delayed_--;
135
906
  assert(controller_->total_delayed_ >= 0);
136
906
}
137
138
1.19M
CompactionPressureToken::~CompactionPressureToken() {
139
1.19M
  controller_->total_compaction_pressure_--;
140
1.19M
  assert(controller_->total_compaction_pressure_ >= 0);
141
1.19M
}
142
143
}  // namespace rocksdb