YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/operation_counter.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/util/operation_counter.h"
15
16
#include <thread>
17
18
#include <glog/logging.h>
19
20
#include "yb/gutil/strings/substitute.h"
21
22
#include "yb/util/debug/long_operation_tracker.h"
23
#include "yb/util/logging.h"
24
#include "yb/util/status_format.h"
25
#include "yb/util/status_log.h"
26
#include "yb/util/trace.h"
27
28
using namespace std::literals;
29
30
using strings::Substitute;
31
32
namespace yb {
33
34
88.7k
// Constructs the counter, initializing log_prefix_ from `log_prefix`.
OperationCounter::OperationCounter(const std::string& log_prefix)
    : log_prefix_(log_prefix) {}
36
37
47.7k
void OperationCounter::Shutdown() {
38
47.7k
  auto wait_start = CoarseMonoClock::now();
39
47.7k
  auto last_report = wait_start;
40
47.7k
  for (;;) {
41
47.7k
    auto value = value_.load(std::memory_order_acquire);
42
47.7k
    if (value == 0) {
43
47.7k
      break;
44
47.7k
    }
45
15
    auto now = CoarseMonoClock::now();
46
15
    if (now > last_report + std::chrono::seconds(10)) {
47
0
      LOG_WITH_PREFIX(WARNING)
48
0
          << "Long wait for scope counter shutdown " << value << ": " << AsString(now - wait_start);
49
0
      last_report = now;
50
0
    }
51
15
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
52
15
  }
53
47.7k
}
54
55
1.65M
void OperationCounter::Release() {
56
1.65M
  value_.fetch_sub(1, std::memory_order_acq_rel);
57
1.65M
}
58
59
1.65M
void OperationCounter::Acquire() {
60
1.65M
  value_.fetch_add(1, std::memory_order_acq_rel);
61
1.65M
}
62
63
1.65M
// Remembers `counter` in counter_ and immediately calls Acquire() on it.
// `counter` is dereferenced unconditionally, so it must not be null.
ScopedOperation::ScopedOperation(OperationCounter* counter)
    : counter_(counter) {
  counter->Acquire();
}
66
67
namespace {

// The upper bits of the 64-bit counter word are used as special flags:
//   bit 63      - added/removed as kStopDelta (used when disabling with the stop flag set);
//   bits 48..62 - incremented/decremented in units of kDisabledDelta on each disable/enable;
//   bits 0..47  - the pending-operations counter proper.

// Increment applied to the counter word for a "stop" style disable.
constexpr uint64_t kStopDelta = uint64_t{1} << 63;
// Increment applied to the counter word for a regular disable.
constexpr uint64_t kDisabledDelta = uint64_t{1} << 48;
// Selects only the pending-operations counter (everything below kDisabledDelta).
constexpr uint64_t kOpCounterMask = kDisabledDelta - 1;
// Selects every bit above the pending-operations counter; non-zero means disabled or stopped.
constexpr uint64_t kDisabledCounterMask = ~kOpCounterMask;

}  // namespace
76
77
// Return pending operations counter value only.
78
490k
uint64_t RWOperationCounter::GetOpCounter() const {
79
490k
  return Get() & kOpCounterMask;
80
490k
}
81
82
152M
uint64_t RWOperationCounter::Update(uint64_t delta) {
83
152M
  uint64_t result = counters_.fetch_add(delta, std::memory_order::memory_order_acq_rel) + delta;
84
18.4E
  VLOG(2) << "[" << this << "] Update(" << static_cast<int64_t>(delta) << "), result = " << result;
85
  // Ensure that there is no underflow in either counter.
86
152M
  DCHECK_EQ((result & (kStopDelta >> 1u)), 0); // Counter of DisableAndWaitForOps() calls.
87
152M
  DCHECK_EQ((result & (kDisabledDelta >> 1u)), 0); // Counter of pending operations.
88
152M
  return result;
89
152M
}
90
91
169k
// Tries to take the disable_ mutex before `deadline` and, while holding it, to register a
// pending operation via Increment().
//
// Special deadline values:
//   - a default-constructed CoarseTimePoint is replaced by "now + 10ms";
//   - CoarseTimePoint::min() makes the call fail immediately without touching the mutex.
//
// Returns true when Increment() succeeded. Returns false when the mutex could not be acquired
// in time, or when Increment() failed and the kStopDelta bit is set in counters_. If Increment()
// failed but the stop bit is not set, the mutex is dropped and the whole attempt is retried.
bool RWOperationCounter::WaitMutexAndIncrement(CoarseTimePoint deadline) {
  const bool deadline_unset = (deadline == CoarseTimePoint());
  if (deadline_unset) {
    deadline = CoarseMonoClock::now() + 10ms;
  } else if (deadline == CoarseTimePoint::min()) {
    return false;
  }

  while (true) {
    // Timed acquisition; the guard is released at the end of every iteration.
    std::unique_lock<decltype(disable_)> guard(disable_, deadline);
    if (!guard.owns_lock()) {
      return false;
    }

    if (Increment()) {
      return true;
    }

    const auto snapshot = counters_.load(std::memory_order_acquire);
    if (snapshot & kStopDelta) {
      return false;
    }
  }
}
112
113
387k
// Reverts one earlier disable by subtracting the matching delta from the counter word:
// kStopDelta when `was_stop` is set, kDisabledDelta otherwise. When `unlock` is set, also
// calls UnlockExclusiveOpMutex() afterwards.
void RWOperationCounter::Enable(Unlock unlock, Stop was_stop) {
  const uint64_t delta = was_stop ? kStopDelta : kDisabledDelta;
  // Unsigned negation: adding the result is the same as subtracting `delta` modulo 2^64.
  Update(-delta);
  if (unlock) {
    UnlockExclusiveOpMutex();
  }
}
119
120
76.1M
bool RWOperationCounter::Increment() {
121
76.1M
  if (Update(1) & kDisabledCounterMask) {
122
169k
    Update(-1);
123
169k
    return false;
124
169k
  }
125
126
75.9M
  return true;
127
75.9M
}
128
129
488k
// Marks the counter as disabled (or stopped, when `stop` is set) and waits until no pending
// operations remain.
//
// Steps:
//   1. Acquire the disable_ mutex, giving up at `deadline` with a TimedOut status.
//   2. Add kStopDelta or kDisabledDelta to the counter word.
//   3. Wait for pending operations via WaitForOpsToFinish().
//
// On success the mutex is intentionally left locked (the guard is released without unlocking).
// If step 3 fails, the delta added in step 2 is removed again through Enable(Unlock::kFalse, ...),
// the guard unlocks the mutex on scope exit, and the failing status is returned.
Status RWOperationCounter::DisableAndWaitForOps(const CoarseTimePoint& deadline, Stop stop) {
  LongOperationTracker long_operation_tracker(__func__, 1s);

  const auto start_time = CoarseMonoClock::now();
  std::unique_lock<decltype(disable_)> guard(disable_, deadline);
  if (!guard.owns_lock()) {
    return STATUS(TimedOut, "Timed out waiting to disable the resource exclusively");
  }

  Update(stop ? kStopDelta : kDisabledDelta);
  auto wait_status = WaitForOpsToFinish(start_time, deadline);
  if (wait_status.ok()) {
    // Keep the mutex held past this function.
    guard.release();
    return Status::OK();
  }

  Enable(Unlock::kFalse, stop);
  return wait_status;
}
148
149
// The implementation is based on OperationTracker::WaitForAllToFinish.
150
Status RWOperationCounter::WaitForOpsToFinish(
151
489k
    const CoarseTimePoint& start_time, const CoarseTimePoint& deadline) {
152
489k
  const auto complain_interval = 1s;
153
489k
  int64_t num_pending_ops = 0;
154
489k
  int num_complaints = 0;
155
489k
  auto wait_time = 250us;
156
157
490k
  while ((num_pending_ops = GetOpCounter()) > 0) {
158
1.24k
    auto now = CoarseMonoClock::now();
159
1.24k
    auto waited_time = now - start_time;
160
1.24k
    if (now > deadline) {
161
0
      return STATUS_FORMAT(
162
0
          TimedOut,
163
0
          "Timed out waiting for all pending operations to complete. "
164
0
              "$0 transactions pending. Waited for $1",
165
0
          num_pending_ops, waited_time);
166
0
    }
167
1.24k
    if (waited_time > num_complaints * complain_interval) {
168
164
      LOG(WARNING) << Format("Waiting for $0 pending operations to complete now for $1",
169
164
                             num_pending_ops, waited_time);
170
164
      num_complaints++;
171
164
    }
172
1.24k
    std::this_thread::sleep_until(std::min(deadline, now + wait_time));
173
1.24k
    wait_time = std::min(wait_time * 5 / 4, 1000000us);
174
1.24k
  }
175
176
489k
  return Status::OK();
177
489k
}
178
179
// Tries to register a shared (non-exclusive) operation on `counter`.
//
// A null `counter` is accepted and leaves the object with nothing to release. Otherwise a plain
// Increment() is attempted first; if it is refused, WaitMutexAndIncrement(deadline) is tried as a
// fallback. If both fail, data_.counter_ is cleared so that Reset() (and therefore the destructor)
// will not decrement a counter that was never incremented.
//
// In debug builds (NDEBUG not defined) a LongOperationTracker with a 1s threshold is also created
// when `counter` is non-null.
ScopedRWOperation::ScopedRWOperation(RWOperationCounter* counter, const CoarseTimePoint& deadline)
    : data_{counter, counter ? counter->resource_name() : ""
#ifndef NDEBUG
            , counter ? LongOperationTracker("ScopedRWOperation", 1s) : LongOperationTracker()
#endif
      } {
  if (counter == nullptr) {
    return;
  }

  VTRACE(1, "$0 $1", __func__, resource_name());
  // Short-circuit: the mutex-based fallback runs only when the fast-path increment is refused.
  const bool acquired = counter->Increment() || counter->WaitMutexAndIncrement(deadline);
  if (!acquired) {
    data_.counter_ = nullptr;
  }
}
195
196
90.3M
// Releases the operation, if one is still held, by delegating to Reset().
ScopedRWOperation::~ScopedRWOperation() { Reset(); }
199
200
95.5M
void ScopedRWOperation::Reset() {
201
95.5M
  VTRACE(1, "$0 $1", __func__, resource_name());
202
95.5M
  if (data_.counter_ != nullptr) {
203
75.9M
    data_.counter_->Decrement();
204
75.9M
    data_.counter_ = nullptr;
205
75.9M
  }
206
95.5M
  data_.resource_name_ = "";
207
95.5M
}
208
209
// Disables `counter` (or stops it, when `stop` is set) and waits for pending operations via
// DisableAndWaitForOps(deadline, stop), storing the outcome in data_.status_.
//
// data_.counter_ is set only when that call succeeded, so Reset() re-enables the counter only
// after a successful disable. A null `counter` is accepted; in that case only data_.was_stop_
// is recorded.
ScopedRWOperationPause::ScopedRWOperationPause(
    RWOperationCounter* counter, const CoarseTimePoint& deadline, Stop stop) {
  VTRACE(1, "$0 $1", __func__, resource_name());
  data_.was_stop_ = stop;

  if (counter == nullptr) {
    return;
  }

  data_.status_ = counter->DisableAndWaitForOps(deadline, stop);
  if (data_.status_.ok()) {
    data_.counter_ = counter;
  }
}
220
221
3.04M
// Emits a trace entry and then re-enables the counter, if still held, through Reset().
ScopedRWOperationPause::~ScopedRWOperationPause() {
  VTRACE(1, "$0 $1", __func__, resource_name());

  Reset();
}
225
226
4.04M
void ScopedRWOperationPause::Reset() {
227
4.04M
  if (data_.counter_ != nullptr) {
228
387k
    data_.counter_->Enable(Unlock(data_.status_.ok()), data_.was_stop_);
229
    // Prevent from the destructor calling Enable again.
230
387k
    data_.counter_ = nullptr;
231
387k
  }
232
4.04M
}
233
234
96.3k
void ScopedRWOperationPause::ReleaseMutexButKeepDisabled() {
235
96.3k
  CHECK_OK(data_.status_);
236
96.3k
  CHECK_NOTNULL(data_.counter_);
237
96.3k
  CHECK(data_.was_stop_);
238
96.3k
  data_.counter_->UnlockExclusiveOpMutex();
239
  // Make sure the destructor has no effect when it runs.
240
96.3k
  data_.counter_ = nullptr;
241
96.3k
}
242
243
7.43k
// Converts the state of `scoped` into a Status: OK when scoped.ok() is true, otherwise a
// TryAgain status whose message names the resource reported by scoped.resource_name().
Status MoveStatus(const ScopedRWOperation& scoped) {
  if (scoped.ok()) {
    return Status::OK();
  }
  return STATUS_FORMAT(TryAgain, "Resource unavailable: $0", scoped.resource_name());
}
247
248
}  // namespace yb