YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/operation_counter.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/util/operation_counter.h"
15
16
#include <thread>
17
18
#include <glog/logging.h>
19
20
#include "yb/gutil/strings/substitute.h"
21
22
#include "yb/util/debug/long_operation_tracker.h"
23
#include "yb/util/logging.h"
24
#include "yb/util/status_format.h"
25
#include "yb/util/status_log.h"
26
#include "yb/util/trace.h"
27
#include "yb/util/tsan_util.h"
28
29
using namespace std::literals;
30
31
using strings::Substitute;
32
33
namespace yb {
34
35
150k
// Constructs the counter, keeping a copy of log_prefix in log_prefix_.
// NOTE(review): log_prefix_ is presumably what LOG_WITH_PREFIX uses in Shutdown() - confirm in the
// header.
OperationCounter::OperationCounter(const std::string& log_prefix)
    : log_prefix_(log_prefix) {}
37
38
75.6k
void OperationCounter::Shutdown() {
39
75.6k
  auto wait_start = CoarseMonoClock::now();
40
75.6k
  auto last_report = wait_start;
41
75.6k
  for (;;) {
42
75.6k
    auto value = value_.load(std::memory_order_acquire);
43
75.6k
    if (value == 0) {
44
75.5k
      break;
45
75.5k
    }
46
9
    auto now = CoarseMonoClock::now();
47
9
    if (now > last_report + std::chrono::seconds(10)) {
48
0
      LOG_WITH_PREFIX(WARNING)
49
0
          << "Long wait for scope counter shutdown " << value << ": " << AsString(now - wait_start);
50
0
      last_report = now;
51
0
    }
52
9
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
53
9
  }
54
75.6k
}
55
56
3.17M
void OperationCounter::Release() {
57
3.17M
  value_.fetch_sub(1, std::memory_order_acq_rel);
58
3.17M
}
59
60
3.16M
void OperationCounter::Acquire() {
61
3.16M
  value_.fetch_add(1, std::memory_order_acq_rel);
62
3.16M
}
63
64
3.16M
// Stores the counter in counter_ and registers one operation on it via Acquire().
// The pointer is dereferenced unconditionally, so `counter` must not be null.
// NOTE(review): the matching Release() is presumably issued when counter_ is destroyed - confirm
// in the header.
ScopedOperation::ScopedOperation(OperationCounter* counter)
    : counter_(counter) {
  counter->Acquire();
}
67
68
namespace {

// The upper bits of the 64-bit counter word are used as special flags.
//
// Value added to / subtracted from the counter word when disabling with Stop set (bit 63).
constexpr uint64_t kStopDelta = 1ull << 63u;
// Value added to / subtracted from the counter word when disabling without Stop (bit 48).
constexpr uint64_t kDisabledDelta = 1ull << 48u;
// Selects bits 0..47, which GetOpCounter() reports as the number of pending operations.
constexpr uint64_t kOpCounterMask = kDisabledDelta - 1;
// Selects bits 48..63; Increment() is rejected while any of these bits is set.
constexpr uint64_t kDisabledCounterMask = ~kOpCounterMask;

}  // namespace
77
78
// Return pending operations counter value only.
79
758k
uint64_t RWOperationCounter::GetOpCounter() const {
80
758k
  return Get() & kOpCounterMask;
81
758k
}
82
83
561M
uint64_t RWOperationCounter::Update(uint64_t delta) {
84
561M
  uint64_t result = counters_.fetch_add(delta, std::memory_order::memory_order_acq_rel) + delta;
85
18.4E
  VLOG(2) << "[" << this << "] Update(" << static_cast<int64_t>(delta) << "), result = " << result;
86
  // Ensure that there is no underflow in either counter.
87
561M
  DCHECK_EQ((result & (kStopDelta >> 1u)), 0); // Counter of DisableAndWaitForOps() calls.
88
561M
  DCHECK_EQ((result & (kDisabledDelta >> 1u)), 0); // Counter of pending operations.
89
561M
  return result;
90
561M
}
91
92
19.2k
// Slow path for registering a shared operation: waits for the disable_ mutex and retries
// Increment() while holding it.
//
// deadline handling:
//   - a default-constructed CoarseTimePoint is replaced by now + 10ms * kTimeMultiplier;
//   - CoarseTimePoint::min() means "do not wait": the function returns false immediately.
//
// Returns true if the operation was registered. Returns false if the mutex could not be acquired
// before the deadline, or if Increment() failed and the kStopDelta bit is set in counters_.
bool RWOperationCounter::WaitMutexAndIncrement(CoarseTimePoint deadline) {
  if (deadline == CoarseTimePoint()) {
    deadline = CoarseMonoClock::now() + 10ms * kTimeMultiplier;
  } else if (deadline == CoarseTimePoint::min()) {
    return false;
  }

  while (true) {
    // The lock is scoped to a single iteration, so it is dropped before the next retry.
    std::unique_lock<decltype(disable_)> lock(disable_, deadline);
    if (!lock.owns_lock()) {
      return false;
    }

    if (Increment()) {
      return true;
    }

    const bool stopped = (counters_.load(std::memory_order_acquire) & kStopDelta) != 0;
    if (stopped) {
      return false;
    }
  }
}
113
114
593k
// Reverts the counter change made by DisableAndWaitForOps(): subtracts kStopDelta when was_stop is
// set and kDisabledDelta otherwise. If unlock is set, the exclusive operation mutex is released
// afterwards via UnlockExclusiveOpMutex().
void RWOperationCounter::Enable(Unlock unlock, Stop was_stop) {
  const uint64_t delta = was_stop ? kStopDelta : kDisabledDelta;
  // Unsigned negation wraps around, so adding -delta subtracts delta.
  Update(-delta);
  if (unlock) {
    UnlockExclusiveOpMutex();
  }
}
120
121
280M
bool RWOperationCounter::Increment() {
122
280M
  if (Update(1) & kDisabledCounterMask) {
123
19.3k
    Update(-1);
124
19.3k
    return false;
125
19.3k
  }
126
127
280M
  return true;
128
280M
}
129
130
754k
// Takes the disable_ mutex, marks the counter as disabled (kStopDelta when stop is set,
// kDisabledDelta otherwise) and waits for the pending operations to drain.
//
// Returns:
//   - TimedOut if the mutex could not be acquired before the deadline;
//   - the error from WaitForOpsToFinish() if draining failed. In that case the flag is removed
//     again with Enable(Unlock::kFalse, stop) and the mutex is unlocked by the unique_lock
//     destructor;
//   - OK on success. The mutex is intentionally left locked: lock.release() gives up ownership
//     without unlocking.
Status RWOperationCounter::DisableAndWaitForOps(const CoarseTimePoint& deadline, Stop stop) {
  LongOperationTracker long_operation_tracker(__func__, 1s);

  const auto start_time = CoarseMonoClock::now();
  std::unique_lock<decltype(disable_)> lock(disable_, deadline);
  if (!lock.owns_lock()) {
    return STATUS(TimedOut, "Timed out waiting to disable the resource exclusively");
  }

  const uint64_t delta = stop ? kStopDelta : kDisabledDelta;
  Update(delta);

  auto status = WaitForOpsToFinish(start_time, deadline);
  if (status.ok()) {
    lock.release();
    return Status::OK();
  }

  Enable(Unlock::kFalse, stop);
  return status;
}
149
150
// The implementation is based on OperationTracker::WaitForAllToFinish.
151
Status RWOperationCounter::WaitForOpsToFinish(
152
755k
    const CoarseTimePoint& start_time, const CoarseTimePoint& deadline) {
153
755k
  const auto complain_interval = 1s;
154
755k
  int64_t num_pending_ops = 0;
155
755k
  int num_complaints = 0;
156
755k
  auto wait_time = 250us;
157
158
758k
  while ((num_pending_ops = GetOpCounter()) > 0) {
159
2.78k
    auto now = CoarseMonoClock::now();
160
2.78k
    auto waited_time = now - start_time;
161
2.78k
    if (now > deadline) {
162
0
      return STATUS_FORMAT(
163
0
          TimedOut,
164
0
          "Timed out waiting for all pending operations to complete. "
165
0
              "$0 transactions pending. Waited for $1",
166
0
          num_pending_ops, waited_time);
167
0
    }
168
2.78k
    if (waited_time > num_complaints * complain_interval) {
169
271
      LOG(WARNING) << Format("Waiting for $0 pending operations to complete now for $1",
170
271
                             num_pending_ops, waited_time);
171
271
      num_complaints++;
172
271
    }
173
2.78k
    std::this_thread::sleep_until(std::min(deadline, now + wait_time));
174
2.78k
    wait_time = std::min(wait_time * 5 / 4, 1000000us);
175
2.78k
  }
176
177
755k
  return Status::OK();
178
755k
}
179
180
// Tries to register a shared operation on `counter`.
//
// A null counter is accepted and nothing is registered for it. Otherwise Increment() is attempted
// first and, if it fails, WaitMutexAndIncrement(deadline). When both fail, data_.counter_ is set
// to nullptr so that Reset() will not decrement the counter.
// In debug builds (NDEBUG not defined) a LongOperationTracker with a 1s threshold is also stored
// when the counter is non-null.
ScopedRWOperation::ScopedRWOperation(RWOperationCounter* counter, const CoarseTimePoint& deadline)
    : data_{counter, counter ? counter->resource_name() : ""
#ifndef NDEBUG
            , counter ? LongOperationTracker("ScopedRWOperation", 1s) : LongOperationTracker()
#endif
      } {
  if (counter == nullptr) {
    return;
  }

  // The race condition between IsReady() and Increment() is OK, because we are checking if
  // anyone has started an exclusive operation since we did the increment, and don't proceed
  // with this shared-ownership operation in that case.
  VTRACE(1, "$0 $1", __func__, resource_name());
  if (counter->Increment()) {
    return;
  }

  if (!counter->WaitMutexAndIncrement(deadline)) {
    data_.counter_ = nullptr;
  }
}
196
197
310M
// Releases the registered operation, if any, by delegating to Reset().
ScopedRWOperation::~ScopedRWOperation() {
  Reset();
}
200
201
320M
void ScopedRWOperation::Reset() {
202
320M
  VTRACE(1, "$0 $1", __func__, resource_name());
203
320M
  if (data_.counter_ != nullptr) {
204
280M
    data_.counter_->Decrement();
205
280M
    data_.counter_ = nullptr;
206
280M
  }
207
320M
  data_.resource_name_ = "";
208
320M
}
209
210
// Disables `counter` for the lifetime of this object by calling DisableAndWaitForOps().
//
// The outcome is kept in data_.status_. data_.counter_ is set only when a non-null counter was
// disabled successfully, so Reset() re-enables the counter only in that case. data_.was_stop_
// always records `stop`.
ScopedRWOperationPause::ScopedRWOperationPause(
    RWOperationCounter* counter, const CoarseTimePoint& deadline, Stop stop) {
  VTRACE(1, "$0 $1", __func__, resource_name());
  if (counter != nullptr) {
    data_.status_ = counter->DisableAndWaitForOps(deadline, stop);
    if (data_.status_.ok()) {
      data_.counter_ = counter;
    }
  }
  data_.was_stop_ = stop;
}
221
222
4.22M
// Traces the destruction and re-enables the counter, if it is still held, via Reset().
ScopedRWOperationPause::~ScopedRWOperationPause() {
  VTRACE(1, "$0 $1", __func__, resource_name());
  Reset();
}
226
227
5.89M
void ScopedRWOperationPause::Reset() {
228
5.89M
  if (data_.counter_ != nullptr) {
229
593k
    data_.counter_->Enable(Unlock(data_.status_.ok()), data_.was_stop_);
230
    // Prevent from the destructor calling Enable again.
231
593k
    data_.counter_ = nullptr;
232
593k
  }
233
5.89M
}
234
235
151k
void ScopedRWOperationPause::ReleaseMutexButKeepDisabled() {
236
151k
  CHECK_OK(data_.status_);
237
151k
  CHECK_NOTNULL(data_.counter_);
238
151k
  CHECK(data_.was_stop_);
239
151k
  data_.counter_->UnlockExclusiveOpMutex();
240
  // Make sure the destructor has no effect when it runs.
241
151k
  data_.counter_ = nullptr;
242
151k
}
243
244
16.6k
// Converts the state of a ScopedRWOperation into a Status: OK when scoped.ok() holds, otherwise
// TryAgain with a message naming the unavailable resource.
Status MoveStatus(const ScopedRWOperation& scoped) {
  if (scoped.ok()) {
    return Status::OK();
  }
  return STATUS_FORMAT(TryAgain, "Resource unavailable: $0", scoped.resource_name());
}
248
249
}  // namespace yb