YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/operation_counter.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_UTIL_OPERATION_COUNTER_H
15
#define YB_UTIL_OPERATION_COUNTER_H
16
17
#include <atomic>
18
#include <mutex>
19
20
#include "yb/util/cross_thread_mutex.h"
21
#include "yb/util/debug/long_operation_tracker.h"
22
#include "yb/util/monotime.h"
23
#include "yb/util/status.h"
24
#include "yb/util/strongly_typed_bool.h"
25
26
namespace yb {
27
28
YB_STRONGLY_TYPED_BOOL(Stop);
29
YB_STRONGLY_TYPED_BOOL(Unlock);
30
31
class ScopedOperation;
32
class ScopedRWOperation;
33
34
// Class that counts acquired tokens and doesn't shut down until this count drops to zero.
35
class OperationCounter {
36
 public:
37
  explicit OperationCounter(const std::string& log_prefix);
38
39
  void Shutdown();
40
41
  void Acquire();
42
  void Release();
43
44
 private:
45
0
  const std::string& LogPrefix() const {
46
0
    return log_prefix_;
47
0
  }
48
49
  std::string log_prefix_;
50
  std::atomic<size_t> value_{0};
51
};
52
53
class ScopedOperation {
54
 public:
55
16.9M
  ScopedOperation() = default;
56
  explicit ScopedOperation(OperationCounter* counter);
57
58
 private:
59
  struct ScopedCounterDeleter {
60
1.65M
    void operator()(OperationCounter* counter) {
61
1.65M
      counter->Release();
62
1.65M
    }
63
  };
64
65
  std::unique_ptr<OperationCounter, ScopedCounterDeleter> counter_;
66
};
67
68
// This is used to track the number of pending operations using a certain resource (such as
69
// the RocksDB database or the schema within a tablet) so we can safely wait for all operations to
70
// complete and destroy or replace the resource. This is similar to a shared mutex, but allows
71
// fine-grained control, such as preventing new operations from being started.
72
class RWOperationCounter {
73
 public:
74
289k
  // Takes the name by const reference so that only one copy (into resource_name_) is made.
  explicit RWOperationCounter(const std::string& resource_name) : resource_name_(resource_name) {}
75
76
  CHECKED_STATUS DisableAndWaitForOps(const CoarseTimePoint& deadline, Stop stop);
77
78
  void Enable(Unlock unlock, Stop was_stop);
79
80
489k
  void UnlockExclusiveOpMutex() {
81
489k
    disable_.unlock();
82
489k
  }
83
84
  bool Increment();
85
86
75.9M
  // Passes UINT64_MAX (i.e. -1 converted to uint64_t) to Update; the conversion is made explicit.
  void Decrement() { Update(static_cast<uint64_t>(-1)); }
87
490k
  uint64_t Get() const {
88
490k
    return counters_.load(std::memory_order::memory_order_acquire);
89
490k
  }
90
91
  // Return pending operations counter value only.
92
  uint64_t GetOpCounter() const;
93
94
  bool WaitMutexAndIncrement(CoarseTimePoint deadline);
95
96
76.0M
  std::string resource_name() const {
97
76.0M
    return resource_name_;
98
76.0M
  }
99
 private:
100
  CHECKED_STATUS WaitForOpsToFinish(
101
      const CoarseTimePoint& start_time, const CoarseTimePoint& deadline);
102
103
  uint64_t Update(uint64_t delta);
104
105
  // The upper 16 bits are used for storing the number of separate operations that have disabled the
106
  // resource. E.g. tablet shutdown running at the same time as Truncate/RestoreSnapshot.
107
  // The lower 48 bits are used to keep track of the number of concurrent read/write operations.
108
  std::atomic<uint64_t> counters_{0};
109
110
  // Mutex to disable the resource exclusively. This mutex is locked by DisableAndWaitForOps after
111
  // waiting for all shared-ownership operations to complete. We need this to avoid a race condition
112
  // between Raft operations that replace RocksDB (apply snapshot / truncate) and tablet shutdown.
113
  yb::CrossThreadMutex disable_;
114
115
  std::string resource_name_;
116
};
117
118
// A convenience class to automatically increment/decrement a RWOperationCounter. This is used
119
// for regular RocksDB read/write operations that are allowed to proceed in parallel. Constructing
120
// a ScopedRWOperation might fail because the counter is in the disabled state. An instance
121
// of this class resembles a Result or a Status, because it can be used with the RETURN_NOT_OK
122
// macro.
123
class ScopedRWOperation {
124
 public:
125
  // Object is not copyable, but movable.
126
  void operator=(const ScopedRWOperation&) = delete;
127
  ScopedRWOperation(const ScopedRWOperation&) = delete;
128
129
  explicit ScopedRWOperation(RWOperationCounter* counter = nullptr,
130
                             const CoarseTimePoint& deadline = CoarseTimePoint());
131
132
1.75M
  ScopedRWOperation(ScopedRWOperation&& op) : data_{std::move(op.data_)} {
133
1.75M
    op.data_.counter_ = nullptr;  // Moved ownership.
134
1.75M
  }
135
136
  ~ScopedRWOperation();
137
138
3.48M
  void operator=(ScopedRWOperation&& op) {
139
3.48M
    Reset();
140
3.48M
    data_ = std::move(op.data_);
141
3.48M
    op.data_.counter_ = nullptr;
142
3.48M
  }
143
144
75.8M
  bool ok() const {
145
75.8M
    return data_.counter_ != nullptr;
146
75.8M
  }
147
148
  void Reset();
149
150
7.43k
  std::string resource_name() const {
151
7.43k
    return data_.counter_ ? data_.counter_->resource_name() : "null";
152
7.43k
  }
153
 private:
154
  struct Data {
155
    RWOperationCounter* counter_ = nullptr;
156
    std::string resource_name_;
157
#ifndef NDEBUG
158
    LongOperationTracker long_operation_tracker_;
159
#endif
160
  };
161
162
  Data data_;
163
};
164
165
// RETURN_NOT_OK macro support.
166
Status MoveStatus(const ScopedRWOperation& scoped);
167
168
// A convenience class to automatically pause/resume a RWOperationCounter.
169
class ScopedRWOperationPause {
170
 public:
171
  // Object is not copyable, but movable.
172
  void operator=(const ScopedRWOperationPause&) = delete;
173
  ScopedRWOperationPause(const ScopedRWOperationPause&) = delete;
174
175
983k
  ScopedRWOperationPause() {}
176
  ScopedRWOperationPause(RWOperationCounter* counter, const CoarseTimePoint& deadline, Stop stop);
177
178
1.57M
  ScopedRWOperationPause(ScopedRWOperationPause&& p) : data_(std::move(p.data_)) {
179
1.57M
    p.data_.counter_ = nullptr;  // Moved ownership.
180
1.57M
  }
181
182
  ~ScopedRWOperationPause();
183
184
  void Reset();
185
186
0
  std::string resource_name() const {
187
0
    return data_.counter_ ? data_.counter_->resource_name() : "null";
188
0
  }
189
190
454k
  void operator=(ScopedRWOperationPause&& p) {
191
454k
    Reset();
192
454k
    data_ = std::move(p.data_);
193
454k
    p.data_.counter_ = nullptr;  // Moved ownership.
194
454k
  }
195
196
  // This is called during tablet shutdown to release the mutex that we took to prevent concurrent
197
  // exclusive-ownership operations on the RocksDB instance, such as truncation and snapshot
198
  // restoration. It is fine to release the mutex because these exclusive operations are not allowed
199
  // to happen after tablet shutdown anyway.
200
  void ReleaseMutexButKeepDisabled();
201
202
484k
  bool ok() const {
203
484k
    return data_.status_.ok();
204
484k
  }
205
206
739k
  Status&& status() {
207
739k
    return std::move(data_.status_);
208
739k
  }
209
210
 private:
211
  struct Data {
212
    RWOperationCounter* counter_ = nullptr;
213
    Status status_;
214
    Stop was_stop_ = Stop::kFalse;
215
  };
216
  Data data_;
217
};
218
219
// RETURN_NOT_OK macro support.
220
0
inline Status&& MoveStatus(ScopedRWOperationPause&& p) {
221
0
  return std::move(p.status());
222
0
}
223
224
} // namespace yb
225
226
#endif // YB_UTIL_OPERATION_COUNTER_H