/Users/deen/code/yugabyte-db/src/yb/util/operation_counter.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/util/operation_counter.h" |
15 | | |
16 | | #include <thread> |
17 | | |
18 | | #include <glog/logging.h> |
19 | | |
20 | | #include "yb/gutil/strings/substitute.h" |
21 | | |
22 | | #include "yb/util/debug/long_operation_tracker.h" |
23 | | #include "yb/util/logging.h" |
24 | | #include "yb/util/status_format.h" |
25 | | #include "yb/util/status_log.h" |
26 | | #include "yb/util/trace.h" |
27 | | #include "yb/util/tsan_util.h" |
28 | | |
29 | | using namespace std::literals; |
30 | | |
31 | | using strings::Substitute; |
32 | | |
33 | | namespace yb { |
34 | | |
35 | 150k | OperationCounter::OperationCounter(const std::string& log_prefix) : log_prefix_(log_prefix) { |
36 | 150k | } |
37 | | |
38 | 75.6k | void OperationCounter::Shutdown() { |
39 | 75.6k | auto wait_start = CoarseMonoClock::now(); |
40 | 75.6k | auto last_report = wait_start; |
41 | 75.6k | for (;;) { |
42 | 75.6k | auto value = value_.load(std::memory_order_acquire); |
43 | 75.6k | if (value == 0) { |
44 | 75.5k | break; |
45 | 75.5k | } |
46 | 9 | auto now = CoarseMonoClock::now(); |
47 | 9 | if (now > last_report + std::chrono::seconds(10)) { |
48 | 0 | LOG_WITH_PREFIX(WARNING) |
49 | 0 | << "Long wait for scope counter shutdown " << value << ": " << AsString(now - wait_start); |
50 | 0 | last_report = now; |
51 | 0 | } |
52 | 9 | std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
53 | 9 | } |
54 | 75.6k | } |
55 | | |
56 | 3.17M | void OperationCounter::Release() { |
57 | 3.17M | value_.fetch_sub(1, std::memory_order_acq_rel); |
58 | 3.17M | } |
59 | | |
60 | 3.16M | void OperationCounter::Acquire() { |
61 | 3.16M | value_.fetch_add(1, std::memory_order_acq_rel); |
62 | 3.16M | } |
63 | | |
64 | 3.16M | ScopedOperation::ScopedOperation(OperationCounter* counter) : counter_(counter) { |
65 | 3.16M | counter->Acquire(); |
66 | 3.16M | } |
67 | | |
namespace {

// Bit layout of the 64-bit RWOperationCounter::counters_ word; the upper bits act as special
// flags/counters and the lower bits count pending operations.

// Added/subtracted when the resource is disabled with Stop set (bit 63).
constexpr uint64_t kStopDelta = uint64_t{1} << 63;

// Added/subtracted when the resource is disabled without Stop (unit of the counter at bit 48).
constexpr uint64_t kDisabledDelta = uint64_t{1} << 48;

// Selects the pending operations counter: every bit below bit 48.
constexpr uint64_t kOpCounterMask = kDisabledDelta - 1;

// Selects everything at or above bit 48, i.e. the disable counter together with the stop bit.
constexpr uint64_t kDisabledCounterMask = ~kOpCounterMask;

}  // namespace
77 | | |
78 | | // Return pending operations counter value only. |
79 | 758k | uint64_t RWOperationCounter::GetOpCounter() const { |
80 | 758k | return Get() & kOpCounterMask; |
81 | 758k | } |
82 | | |
83 | 561M | uint64_t RWOperationCounter::Update(uint64_t delta) { |
84 | 561M | uint64_t result = counters_.fetch_add(delta, std::memory_order::memory_order_acq_rel) + delta; |
85 | 18.4E | VLOG(2) << "[" << this << "] Update(" << static_cast<int64_t>(delta) << "), result = " << result; |
86 | | // Ensure that there is no underflow in either counter. |
87 | 561M | DCHECK_EQ((result & (kStopDelta >> 1u)), 0); // Counter of DisableAndWaitForOps() calls. |
88 | 561M | DCHECK_EQ((result & (kDisabledDelta >> 1u)), 0); // Counter of pending operations. |
89 | 561M | return result; |
90 | 561M | } |
91 | | |
92 | 19.2k | bool RWOperationCounter::WaitMutexAndIncrement(CoarseTimePoint deadline) { |
93 | 19.2k | if (deadline == CoarseTimePoint()) { |
94 | 19.2k | deadline = CoarseMonoClock::now() + 10ms * kTimeMultiplier; |
95 | 19.2k | } else if (31 deadline == CoarseTimePoint::min()31 ) { |
96 | 6 | return false; |
97 | 6 | } |
98 | 19.2k | for (;;)19.2k { |
99 | 19.2k | std::unique_lock<decltype(disable_)> lock(disable_, deadline); |
100 | 19.2k | if (!lock.owns_lock()) { |
101 | 17.6k | return false; |
102 | 17.6k | } |
103 | | |
104 | 1.62k | if (Increment()) { |
105 | 1.57k | return true; |
106 | 1.57k | } |
107 | | |
108 | 52 | if (counters_.load(std::memory_order_acquire) & kStopDelta) { |
109 | 45 | return false; |
110 | 45 | } |
111 | 52 | } |
112 | 19.2k | } |
113 | | |
114 | 593k | void RWOperationCounter::Enable(Unlock unlock, Stop was_stop) { |
115 | 593k | Update(-(was_stop ? kStopDelta150k : kDisabledDelta442k )); |
116 | 593k | if (unlock593k ) { |
117 | 593k | UnlockExclusiveOpMutex(); |
118 | 593k | } |
119 | 593k | } |
120 | | |
121 | 280M | bool RWOperationCounter::Increment() { |
122 | 280M | if (Update(1) & kDisabledCounterMask) { |
123 | 19.3k | Update(-1); |
124 | 19.3k | return false; |
125 | 19.3k | } |
126 | | |
127 | 280M | return true; |
128 | 280M | } |
129 | | |
130 | 754k | Status RWOperationCounter::DisableAndWaitForOps(const CoarseTimePoint& deadline, Stop stop) { |
131 | 754k | LongOperationTracker long_operation_tracker(__func__, 1s); |
132 | | |
133 | 754k | const auto start_time = CoarseMonoClock::now(); |
134 | 754k | std::unique_lock<decltype(disable_)> lock(disable_, deadline); |
135 | 754k | if (!lock.owns_lock()) { |
136 | 0 | return STATUS(TimedOut, "Timed out waiting to disable the resource exclusively"); |
137 | 0 | } |
138 | | |
139 | 754k | Update(stop ? kStopDelta312k : kDisabledDelta441k ); |
140 | 754k | auto status = WaitForOpsToFinish(start_time, deadline); |
141 | 754k | if (!status.ok()) { |
142 | 0 | Enable(Unlock::kFalse, stop); |
143 | 0 | return status; |
144 | 0 | } |
145 | | |
146 | 754k | lock.release(); |
147 | 754k | return Status::OK(); |
148 | 754k | } |
149 | | |
150 | | // The implementation is based on OperationTracker::WaitForAllToFinish. |
151 | | Status RWOperationCounter::WaitForOpsToFinish( |
152 | 755k | const CoarseTimePoint& start_time, const CoarseTimePoint& deadline) { |
153 | 755k | const auto complain_interval = 1s; |
154 | 755k | int64_t num_pending_ops = 0; |
155 | 755k | int num_complaints = 0; |
156 | 755k | auto wait_time = 250us; |
157 | | |
158 | 758k | while ((num_pending_ops = GetOpCounter()) > 0) { |
159 | 2.78k | auto now = CoarseMonoClock::now(); |
160 | 2.78k | auto waited_time = now - start_time; |
161 | 2.78k | if (now > deadline) { |
162 | 0 | return STATUS_FORMAT( |
163 | 0 | TimedOut, |
164 | 0 | "Timed out waiting for all pending operations to complete. " |
165 | 0 | "$0 transactions pending. Waited for $1", |
166 | 0 | num_pending_ops, waited_time); |
167 | 0 | } |
168 | 2.78k | if (waited_time > num_complaints * complain_interval) { |
169 | 271 | LOG(WARNING) << Format("Waiting for $0 pending operations to complete now for $1", |
170 | 271 | num_pending_ops, waited_time); |
171 | 271 | num_complaints++; |
172 | 271 | } |
173 | 2.78k | std::this_thread::sleep_until(std::min(deadline, now + wait_time)); |
174 | 2.78k | wait_time = std::min(wait_time * 5 / 4, 1000000us); |
175 | 2.78k | } |
176 | | |
177 | 755k | return Status::OK(); |
178 | 755k | } |
179 | | |
180 | | ScopedRWOperation::ScopedRWOperation(RWOperationCounter* counter, const CoarseTimePoint& deadline) |
181 | | : data_{counter, counter ? counter->resource_name() : "" |
182 | | #ifndef NDEBUG |
183 | | , counter ? LongOperationTracker("ScopedRWOperation", 1s) : LongOperationTracker() |
184 | | #endif |
185 | 307M | } { |
186 | 307M | if (counter != nullptr) { |
187 | | // The race condition between IsReady() and Increment() is OK, because we are checking if |
188 | | // anyone has started an exclusive operation since we did the increment, and don't proceed |
189 | | // with this shared-ownership operation in that case. |
190 | 280M | VTRACE(1, "$0 $1", __func__, resource_name()); |
191 | 280M | if (!counter->Increment() && !counter->WaitMutexAndIncrement(deadline)19.2k ) { |
192 | 17.7k | data_.counter_ = nullptr; |
193 | 17.7k | } |
194 | 280M | } |
195 | 307M | } |
196 | | |
197 | 310M | ScopedRWOperation::~ScopedRWOperation() { |
198 | 310M | Reset(); |
199 | 310M | } |
200 | | |
201 | 320M | void ScopedRWOperation::Reset() { |
202 | 320M | VTRACE(1, "$0 $1", __func__, resource_name()); |
203 | 320M | if (data_.counter_ != nullptr) { |
204 | 280M | data_.counter_->Decrement(); |
205 | 280M | data_.counter_ = nullptr; |
206 | 280M | } |
207 | 320M | data_.resource_name_ = ""; |
208 | 320M | } |
209 | | |
210 | | ScopedRWOperationPause::ScopedRWOperationPause( |
211 | 744k | RWOperationCounter* counter, const CoarseTimePoint& deadline, Stop stop) { |
212 | 744k | VTRACE(1, "$0 $1", __func__, resource_name()); |
213 | 745k | if (counter != nullptr744k ) { |
214 | 745k | data_.status_ = counter->DisableAndWaitForOps(deadline, stop); |
215 | 745k | if (data_.status_.ok()745k ) { |
216 | 745k | data_.counter_ = counter; |
217 | 745k | } |
218 | 745k | } |
219 | 744k | data_.was_stop_ = stop; |
220 | 744k | } |
221 | | |
222 | 4.22M | ScopedRWOperationPause::~ScopedRWOperationPause() { |
223 | 4.22M | VTRACE(1, "$0 $1", __func__, resource_name()); |
224 | 4.22M | Reset(); |
225 | 4.22M | } |
226 | | |
227 | 5.89M | void ScopedRWOperationPause::Reset() { |
228 | 5.89M | if (data_.counter_ != nullptr) { |
229 | 593k | data_.counter_->Enable(Unlock(data_.status_.ok()), data_.was_stop_); |
230 | | // Prevent from the destructor calling Enable again. |
231 | 593k | data_.counter_ = nullptr; |
232 | 593k | } |
233 | 5.89M | } |
234 | | |
235 | 151k | void ScopedRWOperationPause::ReleaseMutexButKeepDisabled() { |
236 | 151k | CHECK_OK(data_.status_); |
237 | 151k | CHECK_NOTNULL(data_.counter_); |
238 | 151k | CHECK(data_.was_stop_); |
239 | 151k | data_.counter_->UnlockExclusiveOpMutex(); |
240 | | // Make sure the destructor has no effect when it runs. |
241 | 151k | data_.counter_ = nullptr; |
242 | 151k | } |
243 | | |
244 | 16.6k | Status MoveStatus(const ScopedRWOperation& scoped) { |
245 | 16.6k | return scoped.ok() ? Status::OK()0 |
246 | 16.6k | : STATUS_FORMAT(TryAgain, "Resource unavailable: $0", scoped.resource_name()); |
247 | 16.6k | } |
248 | | |
249 | | } // namespace yb |