YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/async_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/util/async_util.h"
14
15
#include "yb/gutil/bind.h"
16
17
#include "yb/util/logging.h" // Required in NDEBUG mode
18
#include "yb/util/status_log.h"
19
20
namespace yb {
21
22
namespace {
23
24
0
void CallStatusCBMaybe(std::weak_ptr<Synchronizer> weak_sync, const Status& status) {
25
0
  auto sync = weak_sync.lock();
26
0
  if (sync) {
27
0
    sync->StatusCB(status);
28
0
  }
29
0
}
30
31
} // anonymous namespace
32
33
2.63M
Synchronizer::~Synchronizer() {
34
2.63M
  EnsureWaitDone();
35
2.63M
}
36
37
2.63M
void Synchronizer::StatusCB(const Status& status) {
38
2.63M
  std::lock_guard<std::mutex> lock(mutex_);
39
2.63M
  if (
!assigned_2.63M
) {
40
2.63M
    assigned_ = true;
41
2.63M
    status_ = status;
42
2.63M
    cond_.notify_all();
43
18.4E
  } else {
44
18.4E
    LOG(DFATAL) << "Status already assigned, existing: " << status_ << ", new: " << status;
45
18.4E
  }
46
2.63M
}
47
48
907k
StatusCallback Synchronizer::AsStatusCallback() {
49
907k
  DCHECK(!assigned_);
50
51
  // Cannot destroy the synchronizer without calling Wait().
52
907k
  must_wait_ = true;
53
907k
  return Bind(&Synchronizer::StatusCB, Unretained(this));
54
907k
}
55
56
476k
StdStatusCallback Synchronizer::AsStdStatusCallback() {
57
476k
  DCHECK(!assigned_);
58
59
  // Cannot destroy the synchronizer without calling Wait().
60
476k
  must_wait_ = true;
61
476k
  return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1);
62
476k
}
63
64
0
StatusCallback Synchronizer::AsStatusCallback(const std::shared_ptr<Synchronizer>& synchronizer) {
65
0
  DCHECK(!synchronizer->assigned_);
66
  // No need to set must_wait_ here -- the callback knows whether Synchronizer still exists.
67
0
  std::weak_ptr<Synchronizer> weak_sync(synchronizer);
68
0
  return Bind(CallStatusCBMaybe, weak_sync);
69
0
}
70
71
2.64M
Status Synchronizer::WaitUntil(const std::chrono::steady_clock::time_point& time) {
72
2.64M
  std::unique_lock<std::mutex> lock(mutex_);
73
5.27M
  auto predicate = [this] { return assigned_; };
74
2.64M
  if (time == std::chrono::steady_clock::time_point::max()) {
75
2.22M
    cond_.wait(lock, predicate);
76
2.22M
  } else 
if (418k
!cond_.wait_until(lock, time, predicate)418k
) {
77
372
    return STATUS(TimedOut, "Timed out while waiting for the callback to be called.");
78
372
  }
79
80
  // The callback that keep a pointer to this potentially stack-allocated synchronizer has been
81
  // called, assuming there was only one such callback. OK for the synchronizer to go out of
82
  // scope.
83
2.64M
  must_wait_ = false;
84
85
2.64M
  return status_;
86
2.64M
}
87
88
0
void Synchronizer::Reset() {
89
0
  std::lock_guard<std::mutex> lock(mutex_);
90
0
  EnsureWaitDone();
91
0
  assigned_ = false;
92
0
  status_ = Status::OK();
93
0
  must_wait_ = false;
94
0
}
95
96
2.63M
void Synchronizer::EnsureWaitDone() {
97
2.63M
  if (must_wait_) {
98
0
    static const char* kErrorMsg =
99
0
        "Synchronizer went out of scope, Wait() has returned success, callbacks may "
100
0
        "access invalid memory!";
101
102
0
#ifndef NDEBUG
103
0
    LOG(FATAL) << kErrorMsg;
104
#else
105
    const int kWaitSec = 10;
106
    YB_LOG_EVERY_N_SECS(ERROR, 1) << kErrorMsg << " Waiting up to " << kWaitSec << " seconds";
107
    CHECK_OK(WaitFor(MonoDelta::FromSeconds(kWaitSec)));
108
#endif
109
0
  }
110
2.63M
}
111
112
}  // namespace yb