YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/async_util.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
// Utility functions which are handy when doing async/callback-based programming.
33
#ifndef YB_UTIL_ASYNC_UTIL_H
34
#define YB_UTIL_ASYNC_UTIL_H
35
36
#include <pthread.h>
37
38
#include <future>
39
40
#include <boost/function.hpp>
41
42
#include "yb/gutil/macros.h"
43
44
#include "yb/util/monotime.h"
45
#include "yb/util/status.h"
46
#include "yb/util/status_callback.h"
47
48
namespace yb {
49
50
typedef boost::function<void(const Status&)> StatusFunctor;
51
52
// Simple class which can be used to make async methods synchronous.
53
// For example:
54
//   Synchronizer s;
55
//   SomeAsyncMethod(s.callback());
56
//   CHECK_OK(s.Wait());
57
class Synchronizer {
58
 public:
59
  Synchronizer(const Synchronizer&) = delete;
60
  void operator=(const Synchronizer&) = delete;
61
62
2.64M
  Synchronizer() {}
63
  ~Synchronizer();
64
65
  void StatusCB(const Status& status);
66
67
  // Use this for synchronizers declared on the stack. The callback does not take a reference to
68
  // its synchronizer, so the returned callback _must_ go out of scope before its synchronizer.
69
  StatusCallback AsStatusCallback();
70
71
  // Same semantics as AsStatusCallback.
72
  StdStatusCallback AsStdStatusCallback();
73
74
  // This version of AsStatusCallback is for cases when the callback can outlive the synchronizer.
75
  // The callback holds a weak pointer to the synchronizer.
76
  static StatusCallback AsStatusCallback(const std::shared_ptr<Synchronizer>& synchronizer);
77
78
0
  StatusFunctor AsStatusFunctor() {
79
0
    return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1);
80
0
  }
81
82
2.22M
  CHECKED_STATUS Wait() {
83
2.22M
    return WaitUntil(std::chrono::steady_clock::time_point::max());
84
2.22M
  }
85
86
418k
  CHECKED_STATUS WaitFor(const MonoDelta& delta) {
87
418k
    return WaitUntil(std::chrono::steady_clock::now() + delta.ToSteadyDuration());
88
418k
  }
89
90
  CHECKED_STATUS WaitUntil(const std::chrono::steady_clock::time_point& time);
91
92
  void Reset();
93
94
 private:
95
96
  // Invoked in the destructor and in Reset() to make sure Wait() was invoked if it had to be.
97
  void EnsureWaitDone();
98
99
  std::mutex mutex_;
100
  std::condition_variable cond_;
101
  bool assigned_ = false;
102
103
  // If we've created a callback and given it out to an asynchronous operation, we must call Wait()
104
  // on the synchronizer before destroying it. Not doing any locking around this variable because
105
  // Wait() is supposed to be called on the same thread as AsStatusCallback(), or with adequate
106
  // synchronization after that. Most frequently Wait() is called right after creating the
107
  // synchronizer.
108
  bool must_wait_ = false;
109
110
  Status status_;
111
};
112
113
// Functor is any functor that accepts callback as only argument.
114
template <class Result, class Functor>
115
392k
std::future<Result> MakeFuture(const Functor& functor) {
116
392k
  auto promise = std::make_shared<std::promise<Result>>();
117
392k
  auto future = promise->get_future();
118
393k
  functor([promise](Result result) {
119
393k
    promise->set_value(std::move(result));
120
393k
  });
client.cc:std::__1::future<yb::Result<std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> > > yb::MakeFuture<yb::Result<std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> >, yb::client::(anonymous namespace)::FetchPartitionsFuture(yb::client::YBClient*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)::$_5>(yb::client::(anonymous namespace)::FetchPartitionsFuture(yb::client::YBClient*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)::$_5 const&)::'lambda'(yb::Result<std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> >)::operator()(yb::Result<std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> >) const
Line
Count
Source
118
134k
  functor([promise](Result result) {
119
134k
    promise->set_value(std::move(result));
120
134k
  });
Unexecuted instantiation: client.cc:std::__1::future<yb::Result<std::__1::vector<scoped_refptr<yb::client::internal::RemoteTablet>, std::__1::allocator<scoped_refptr<yb::client::internal::RemoteTablet> > > > > yb::MakeFuture<yb::Result<std::__1::vector<scoped_refptr<yb::client::internal::RemoteTablet>, std::__1::allocator<scoped_refptr<yb::client::internal::RemoteTablet> > > >, yb::client::YBClient::LookupAllTabletsFuture(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_4>(yb::client::YBClient::LookupAllTabletsFuture(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_4 const&)::'lambda'(yb::Result<std::__1::vector<scoped_refptr<yb::client::internal::RemoteTablet>, std::__1::allocator<scoped_refptr<yb::client::internal::RemoteTablet> > > >)::operator()(yb::Result<std::__1::vector<scoped_refptr<yb::client::internal::RemoteTablet>, std::__1::allocator<scoped_refptr<yb::client::internal::RemoteTablet> > > >) const
Unexecuted instantiation: meta_cache.cc:std::__1::future<yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > > yb::MakeFuture<yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> >, yb::client::internal::MetaCache::LookupTabletByKeyFuture(std::__1::shared_ptr<yb::client::YBTable> const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_5>(yb::client::internal::MetaCache::LookupTabletByKeyFuture(std::__1::shared_ptr<yb::client::YBTable> const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_5 const&)::'lambda'(yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> >)::operator()(yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> >) const
transaction.cc:std::__1::future<yb::Status> yb::MakeFuture<yb::Status, yb::client::YBTransaction::CommitFuture(std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::SealOnly_Tag>)::$_0>(yb::client::YBTransaction::CommitFuture(std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::SealOnly_Tag>)::$_0 const&)::'lambda'(yb::Status)::operator()(yb::Status) const
Line
Count
Source
118
191k
  functor([promise](Result result) {
119
191k
    promise->set_value(std::move(result));
120
191k
  });
transaction.cc:std::__1::future<yb::Result<yb::ChildTransactionDataPB> > yb::MakeFuture<yb::Result<yb::ChildTransactionDataPB>, yb::client::YBTransaction::PrepareChildFuture(yb::StronglyTypedBool<yb::client::ForceConsistentRead_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_1>(yb::client::YBTransaction::PrepareChildFuture(yb::StronglyTypedBool<yb::client::ForceConsistentRead_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_1 const&)::'lambda'(yb::Result<yb::ChildTransactionDataPB>)::operator()(yb::Result<yb::ChildTransactionDataPB>) const
Line
Count
Source
118
66.6k
  functor([promise](Result result) {
119
66.6k
    promise->set_value(std::move(result));
120
66.6k
  });
121
392k
  return future;
122
392k
}
client.cc:std::__1::future<yb::Result<std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> > > yb::MakeFuture<yb::Result<std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> >, yb::client::(anonymous namespace)::FetchPartitionsFuture(yb::client::YBClient*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)::$_5>(yb::client::(anonymous namespace)::FetchPartitionsFuture(yb::client::YBClient*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)::$_5 const&)
Line
Count
Source
115
134k
std::future<Result> MakeFuture(const Functor& functor) {
116
134k
  auto promise = std::make_shared<std::promise<Result>>();
117
134k
  auto future = promise->get_future();
118
134k
  functor([promise](Result result) {
119
134k
    promise->set_value(std::move(result));
120
134k
  });
121
134k
  return future;
122
134k
}
Unexecuted instantiation: client.cc:std::__1::future<yb::Result<std::__1::vector<scoped_refptr<yb::client::internal::RemoteTablet>, std::__1::allocator<scoped_refptr<yb::client::internal::RemoteTablet> > > > > yb::MakeFuture<yb::Result<std::__1::vector<scoped_refptr<yb::client::internal::RemoteTablet>, std::__1::allocator<scoped_refptr<yb::client::internal::RemoteTablet> > > >, yb::client::YBClient::LookupAllTabletsFuture(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_4>(yb::client::YBClient::LookupAllTabletsFuture(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_4 const&)
Unexecuted instantiation: meta_cache.cc:std::__1::future<yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > > yb::MakeFuture<yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> >, yb::client::internal::MetaCache::LookupTabletByKeyFuture(std::__1::shared_ptr<yb::client::YBTable> const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_5>(yb::client::internal::MetaCache::LookupTabletByKeyFuture(std::__1::shared_ptr<yb::client::YBTable> const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_5 const&)
transaction.cc:std::__1::future<yb::Status> yb::MakeFuture<yb::Status, yb::client::YBTransaction::CommitFuture(std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::SealOnly_Tag>)::$_0>(yb::client::YBTransaction::CommitFuture(std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::SealOnly_Tag>)::$_0 const&)
Line
Count
Source
115
191k
std::future<Result> MakeFuture(const Functor& functor) {
116
191k
  auto promise = std::make_shared<std::promise<Result>>();
117
191k
  auto future = promise->get_future();
118
191k
  functor([promise](Result result) {
119
191k
    promise->set_value(std::move(result));
120
191k
  });
121
191k
  return future;
122
191k
}
transaction.cc:std::__1::future<yb::Result<yb::ChildTransactionDataPB> > yb::MakeFuture<yb::Result<yb::ChildTransactionDataPB>, yb::client::YBTransaction::PrepareChildFuture(yb::StronglyTypedBool<yb::client::ForceConsistentRead_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_1>(yb::client::YBTransaction::PrepareChildFuture(yb::StronglyTypedBool<yb::client::ForceConsistentRead_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)::$_1 const&)
Line
Count
Source
115
66.5k
std::future<Result> MakeFuture(const Functor& functor) {
116
66.5k
  auto promise = std::make_shared<std::promise<Result>>();
117
66.5k
  auto future = promise->get_future();
118
66.5k
  functor([promise](Result result) {
119
66.5k
    promise->set_value(std::move(result));
120
66.5k
  });
121
66.5k
  return future;
122
66.5k
}
123
124
template <class T>
125
bool IsReady(const std::shared_future<T>& f) {
126
  return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
127
}
128
129
template <class T>
130
bool IsReady(const std::future<T>& f) {
131
  return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
132
}
133
134
} // namespace yb
135
#endif /* YB_UTIL_ASYNC_UTIL_H */