/Users/deen/code/yugabyte-db/src/yb/util/operation_counter.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/util/operation_counter.h" |
15 | | |
16 | | #include <thread> |
17 | | |
18 | | #include <glog/logging.h> |
19 | | |
20 | | #include "yb/gutil/strings/substitute.h" |
21 | | |
22 | | #include "yb/util/debug/long_operation_tracker.h" |
23 | | #include "yb/util/logging.h" |
24 | | #include "yb/util/status_format.h" |
25 | | #include "yb/util/status_log.h" |
26 | | #include "yb/util/trace.h" |
27 | | |
28 | | using namespace std::literals; |
29 | | |
30 | | using strings::Substitute; |
31 | | |
32 | | namespace yb { |
33 | | |
34 | 88.7k | OperationCounter::OperationCounter(const std::string& log_prefix) : log_prefix_(log_prefix) { |
35 | 88.7k | } |
36 | | |
37 | 47.7k | void OperationCounter::Shutdown() { |
38 | 47.7k | auto wait_start = CoarseMonoClock::now(); |
39 | 47.7k | auto last_report = wait_start; |
40 | 47.7k | for (;;) { |
41 | 47.7k | auto value = value_.load(std::memory_order_acquire); |
42 | 47.7k | if (value == 0) { |
43 | 47.7k | break; |
44 | 47.7k | } |
45 | 15 | auto now = CoarseMonoClock::now(); |
46 | 15 | if (now > last_report + std::chrono::seconds(10)) { |
47 | 0 | LOG_WITH_PREFIX(WARNING) |
48 | 0 | << "Long wait for scope counter shutdown " << value << ": " << AsString(now - wait_start); |
49 | 0 | last_report = now; |
50 | 0 | } |
51 | 15 | std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
52 | 15 | } |
53 | 47.7k | } |
54 | | |
55 | 1.65M | void OperationCounter::Release() { |
56 | 1.65M | value_.fetch_sub(1, std::memory_order_acq_rel); |
57 | 1.65M | } |
58 | | |
59 | 1.65M | void OperationCounter::Acquire() { |
60 | 1.65M | value_.fetch_add(1, std::memory_order_acq_rel); |
61 | 1.65M | } |
62 | | |
63 | 1.65M | ScopedOperation::ScopedOperation(OperationCounter* counter) : counter_(counter) { |
64 | 1.65M | counter->Acquire(); |
65 | 1.65M | } |
66 | | |
namespace {

// The upper bits of the 64-bit counter word are used as special flags; the low bits count
// pending operations.

// Added to the word by DisableAndWaitForOps() when called with stop set (bit 63).
constexpr uint64_t kStopDelta = uint64_t{1} << 63;

// Added to the word by DisableAndWaitForOps() when called without stop (bit 48).
constexpr uint64_t kDisabledDelta = uint64_t{1} << 48;

// Selects the low 48 bits, i.e. the pending operations counter.
constexpr uint64_t kOpCounterMask = kDisabledDelta - 1;

// Selects everything above the pending operations counter (bits 48..63).
constexpr uint64_t kDisabledCounterMask = ~kOpCounterMask;

}  // namespace
76 | | |
77 | | // Return pending operations counter value only. |
78 | 490k | uint64_t RWOperationCounter::GetOpCounter() const { |
79 | 490k | return Get() & kOpCounterMask; |
80 | 490k | } |
81 | | |
82 | 152M | uint64_t RWOperationCounter::Update(uint64_t delta) { |
83 | 152M | uint64_t result = counters_.fetch_add(delta, std::memory_order::memory_order_acq_rel) + delta; |
84 | 18.4E | VLOG(2) << "[" << this << "] Update(" << static_cast<int64_t>(delta) << "), result = " << result; |
85 | | // Ensure that there is no underflow in either counter. |
86 | 152M | DCHECK_EQ((result & (kStopDelta >> 1u)), 0); // Counter of DisableAndWaitForOps() calls. |
87 | 152M | DCHECK_EQ((result & (kDisabledDelta >> 1u)), 0); // Counter of pending operations. |
88 | 152M | return result; |
89 | 152M | } |
90 | | |
91 | 169k | bool RWOperationCounter::WaitMutexAndIncrement(CoarseTimePoint deadline) { |
92 | 169k | if (deadline == CoarseTimePoint()) { |
93 | 169k | deadline = CoarseMonoClock::now() + 10ms; |
94 | 27 | } else if (deadline == CoarseTimePoint::min()) { |
95 | 15 | return false; |
96 | 15 | } |
97 | 169k | for (;;) { |
98 | 169k | std::unique_lock<decltype(disable_)> lock(disable_, deadline); |
99 | 169k | if (!lock.owns_lock()) { |
100 | 168k | return false; |
101 | 168k | } |
102 | | |
103 | 1.18k | if (Increment()) { |
104 | 1.10k | return true; |
105 | 1.10k | } |
106 | | |
107 | 75 | if (counters_.load(std::memory_order_acquire) & kStopDelta) { |
108 | 38 | return false; |
109 | 38 | } |
110 | 75 | } |
111 | 169k | } |
112 | | |
113 | 387k | void RWOperationCounter::Enable(Unlock unlock, Stop was_stop) { |
114 | 387k | Update(-(was_stop ? kStopDelta : kDisabledDelta)); |
115 | 387k | if (unlock) { |
116 | 387k | UnlockExclusiveOpMutex(); |
117 | 387k | } |
118 | 387k | } |
119 | | |
120 | 76.1M | bool RWOperationCounter::Increment() { |
121 | 76.1M | if (Update(1) & kDisabledCounterMask) { |
122 | 169k | Update(-1); |
123 | 169k | return false; |
124 | 169k | } |
125 | | |
126 | 75.9M | return true; |
127 | 75.9M | } |
128 | | |
129 | 488k | Status RWOperationCounter::DisableAndWaitForOps(const CoarseTimePoint& deadline, Stop stop) { |
130 | 488k | LongOperationTracker long_operation_tracker(__func__, 1s); |
131 | | |
132 | 488k | const auto start_time = CoarseMonoClock::now(); |
133 | 488k | std::unique_lock<decltype(disable_)> lock(disable_, deadline); |
134 | 488k | if (!lock.owns_lock()) { |
135 | 0 | return STATUS(TimedOut, "Timed out waiting to disable the resource exclusively"); |
136 | 0 | } |
137 | | |
138 | 488k | Update(stop ? kStopDelta : kDisabledDelta); |
139 | 488k | auto status = WaitForOpsToFinish(start_time, deadline); |
140 | 488k | if (!status.ok()) { |
141 | 0 | Enable(Unlock::kFalse, stop); |
142 | 0 | return status; |
143 | 0 | } |
144 | | |
145 | 488k | lock.release(); |
146 | 488k | return Status::OK(); |
147 | 488k | } |
148 | | |
149 | | // The implementation is based on OperationTracker::WaitForAllToFinish. |
150 | | Status RWOperationCounter::WaitForOpsToFinish( |
151 | 489k | const CoarseTimePoint& start_time, const CoarseTimePoint& deadline) { |
152 | 489k | const auto complain_interval = 1s; |
153 | 489k | int64_t num_pending_ops = 0; |
154 | 489k | int num_complaints = 0; |
155 | 489k | auto wait_time = 250us; |
156 | | |
157 | 490k | while ((num_pending_ops = GetOpCounter()) > 0) { |
158 | 1.24k | auto now = CoarseMonoClock::now(); |
159 | 1.24k | auto waited_time = now - start_time; |
160 | 1.24k | if (now > deadline) { |
161 | 0 | return STATUS_FORMAT( |
162 | 0 | TimedOut, |
163 | 0 | "Timed out waiting for all pending operations to complete. " |
164 | 0 | "$0 transactions pending. Waited for $1", |
165 | 0 | num_pending_ops, waited_time); |
166 | 0 | } |
167 | 1.24k | if (waited_time > num_complaints * complain_interval) { |
168 | 164 | LOG(WARNING) << Format("Waiting for $0 pending operations to complete now for $1", |
169 | 164 | num_pending_ops, waited_time); |
170 | 164 | num_complaints++; |
171 | 164 | } |
172 | 1.24k | std::this_thread::sleep_until(std::min(deadline, now + wait_time)); |
173 | 1.24k | wait_time = std::min(wait_time * 5 / 4, 1000000us); |
174 | 1.24k | } |
175 | | |
176 | 489k | return Status::OK(); |
177 | 489k | } |
178 | | |
179 | | ScopedRWOperation::ScopedRWOperation(RWOperationCounter* counter, const CoarseTimePoint& deadline) |
180 | | : data_{counter, counter ? counter->resource_name() : "" |
181 | | #ifndef NDEBUG |
182 | | , counter ? LongOperationTracker("ScopedRWOperation", 1s) : LongOperationTracker() |
183 | | #endif |
184 | 88.5M | } { |
185 | 88.5M | if (counter != nullptr) { |
186 | | // The race condition between IsReady() and Increment() is OK, because we are checking if |
187 | | // anyone has started an exclusive operation since we did the increment, and don't proceed |
188 | | // with this shared-ownership operation in that case. |
189 | 76.1M | VTRACE(1, "$0 $1", __func__, resource_name()); |
190 | 76.1M | if (!counter->Increment() && !counter->WaitMutexAndIncrement(deadline)) { |
191 | 168k | data_.counter_ = nullptr; |
192 | 168k | } |
193 | 76.1M | } |
194 | 88.5M | } |
195 | | |
196 | 90.3M | ScopedRWOperation::~ScopedRWOperation() { |
197 | 90.3M | Reset(); |
198 | 90.3M | } |
199 | | |
200 | 95.5M | void ScopedRWOperation::Reset() { |
201 | 95.5M | VTRACE(1, "$0 $1", __func__, resource_name()); |
202 | 95.5M | if (data_.counter_ != nullptr) { |
203 | 75.9M | data_.counter_->Decrement(); |
204 | 75.9M | data_.counter_ = nullptr; |
205 | 75.9M | } |
206 | 95.5M | data_.resource_name_ = ""; |
207 | 95.5M | } |
208 | | |
209 | | ScopedRWOperationPause::ScopedRWOperationPause( |
210 | 483k | RWOperationCounter* counter, const CoarseTimePoint& deadline, Stop stop) { |
211 | 483k | VTRACE(1, "$0 $1", __func__, resource_name()); |
212 | 483k | if (counter != nullptr) { |
213 | 483k | data_.status_ = counter->DisableAndWaitForOps(deadline, stop); |
214 | 484k | if (data_.status_.ok()) { |
215 | 484k | data_.counter_ = counter; |
216 | 484k | } |
217 | 483k | } |
218 | 483k | data_.was_stop_ = stop; |
219 | 483k | } |
220 | | |
221 | 3.04M | ScopedRWOperationPause::~ScopedRWOperationPause() { |
222 | 3.04M | VTRACE(1, "$0 $1", __func__, resource_name()); |
223 | 3.04M | Reset(); |
224 | 3.04M | } |
225 | | |
226 | 4.04M | void ScopedRWOperationPause::Reset() { |
227 | 4.04M | if (data_.counter_ != nullptr) { |
228 | 387k | data_.counter_->Enable(Unlock(data_.status_.ok()), data_.was_stop_); |
229 | | // Prevent from the destructor calling Enable again. |
230 | 387k | data_.counter_ = nullptr; |
231 | 387k | } |
232 | 4.04M | } |
233 | | |
234 | 96.3k | void ScopedRWOperationPause::ReleaseMutexButKeepDisabled() { |
235 | 96.3k | CHECK_OK(data_.status_); |
236 | 96.3k | CHECK_NOTNULL(data_.counter_); |
237 | 96.3k | CHECK(data_.was_stop_); |
238 | 96.3k | data_.counter_->UnlockExclusiveOpMutex(); |
239 | | // Make sure the destructor has no effect when it runs. |
240 | 96.3k | data_.counter_ = nullptr; |
241 | 96.3k | } |
242 | | |
243 | 7.43k | Status MoveStatus(const ScopedRWOperation& scoped) { |
244 | 0 | return scoped.ok() ? Status::OK() |
245 | 7.43k | : STATUS_FORMAT(TryAgain, "Resource unavailable: $0", scoped.resource_name()); |
246 | 7.43k | } |
247 | | |
248 | | } // namespace yb |