YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/async_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/util/async_util.h"
14
15
#include "yb/gutil/bind.h"
16
17
#include "yb/util/logging.h" // Required in NDEBUG mode
18
#include "yb/util/status_log.h"
19
20
namespace yb {
21
22
namespace {
23
24
0
void CallStatusCBMaybe(std::weak_ptr<Synchronizer> weak_sync, const Status& status) {
25
0
  auto sync = weak_sync.lock();
26
0
  if (sync) {
27
0
    sync->StatusCB(status);
28
0
  }
29
0
}
30
31
} // anonymous namespace
32
33
1.86M
// Before the object goes away, verify that no raw-pointer callback is still outstanding
// (see EnsureWaitDone()).
Synchronizer::~Synchronizer() { EnsureWaitDone(); }
36
37
1.86M
void Synchronizer::StatusCB(const Status& status) {
38
1.86M
  std::lock_guard<std::mutex> lock(mutex_);
39
1.86M
  if (!assigned_) {
40
1.86M
    assigned_ = true;
41
1.86M
    status_ = status;
42
1.86M
    cond_.notify_all();
43
216
  } else {
44
216
    LOG(DFATAL) << "Status already assigned, existing: " << status_ << ", new: " << status;
45
216
  }
46
1.86M
}
47
48
854k
// Builds a StatusCallback that invokes StatusCB() on this object through an unretained (raw)
// pointer.
//
// Because the callback does not keep the Synchronizer alive, must_wait_ is raised here; it is
// only lowered again once a wait completes successfully (see WaitUntil()).
StatusCallback Synchronizer::AsStatusCallback() {
  DCHECK(!assigned_);

  // From this point on the synchronizer must not be destroyed without calling Wait().
  must_wait_ = true;

  StatusCallback raw_ptr_callback = Bind(&Synchronizer::StatusCB, Unretained(this));
  return raw_ptr_callback;
}
55
56
236k
// Same contract as AsStatusCallback(), but returns a StdStatusCallback.
//
// The returned callable captures `this` as a raw pointer, so must_wait_ is raised to flag that
// the synchronizer must be waited on before it is destroyed.
StdStatusCallback Synchronizer::AsStdStatusCallback() {
  DCHECK(!assigned_);

  // From this point on the synchronizer must not be destroyed without calling Wait().
  must_wait_ = true;

  return [this](const Status& status) { StatusCB(status); };
}
63
64
0
// Builds a StatusCallback for a shared_ptr-owned Synchronizer.
//
// The callback holds only a weak reference and goes through CallStatusCBMaybe(), which checks
// whether the Synchronizer still exists before touching it. For that reason must_wait_ does not
// need to be set on this path.
StatusCallback Synchronizer::AsStatusCallback(const std::shared_ptr<Synchronizer>& synchronizer) {
  DCHECK(!synchronizer->assigned_);

  std::weak_ptr<Synchronizer> weak_ref{synchronizer};
  return Bind(CallStatusCBMaybe, weak_ref);
}
70
71
1.86M
// Blocks until StatusCB() has stored a status or until `time` is reached.
//
// A `time` equal to steady_clock::time_point::max() means "wait without a deadline".
// Returns the stored status on success, or a TimedOut status if the deadline passes first.
// On timeout must_wait_ is left unchanged.
Status Synchronizer::WaitUntil(const std::chrono::steady_clock::time_point& time) {
  std::unique_lock<std::mutex> guard(mutex_);
  auto has_status = [this] { return assigned_; };

  const bool no_deadline = (time == std::chrono::steady_clock::time_point::max());
  if (no_deadline) {
    cond_.wait(guard, has_status);
  } else {
    const bool completed_in_time = cond_.wait_until(guard, time, has_status);
    if (!completed_in_time) {
      return STATUS(TimedOut, "Timed out while waiting for the callback to be called.");
    }
  }

  // The callback that keeps a pointer to this potentially stack-allocated synchronizer has been
  // called, assuming there was only one such callback. It is now OK for the synchronizer to go
  // out of scope.
  must_wait_ = false;

  return status_;
}
87
88
0
void Synchronizer::Reset() {
89
0
  std::lock_guard<std::mutex> lock(mutex_);
90
0
  EnsureWaitDone();
91
0
  assigned_ = false;
92
0
  status_ = Status::OK();
93
0
  must_wait_ = false;
94
0
}
95
96
1.86M
void Synchronizer::EnsureWaitDone() {
97
1.86M
  if (must_wait_) {
98
0
    static const char* kErrorMsg =
99
0
        "Synchronizer went out of scope, Wait() has returned success, callbacks may "
100
0
        "access invalid memory!";
101
102
0
#ifndef NDEBUG
103
0
    LOG(FATAL) << kErrorMsg;
104
#else
105
    const int kWaitSec = 10;
106
    YB_LOG_EVERY_N_SECS(ERROR, 1) << kErrorMsg << " Waiting up to " << kWaitSec << " seconds";
107
    CHECK_OK(WaitFor(MonoDelta::FromSeconds(kWaitSec)));
108
#endif
109
0
  }
110
1.86M
}
111
112
}  // namespace yb