/Users/deen/code/yugabyte-db/src/yb/util/delayer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/util/delayer.h" |
15 | | |
16 | | #include <vector> |
17 | | |
18 | | #include <cds/init.h> |
19 | | |
20 | | #include "yb/util/scope_exit.h" |
21 | | |
22 | | namespace yb { |
23 | | |
24 | 0 | void Delayer::Delay(MonoTime when, std::function<void()> action) { |
25 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
26 | 0 | if (!thread_.joinable()) { |
27 | 0 | thread_ = std::thread([this] { |
28 | 0 | Execute(); |
29 | 0 | }); |
30 | 0 | } |
31 | 0 | queue_.emplace_back(when, std::move(action)); |
32 | 0 | cond_.notify_one(); |
33 | 0 | } |
34 | | |
35 | 44.6k | Delayer::~Delayer() { |
36 | 44.6k | { |
37 | 44.6k | std::lock_guard<std::mutex> lock(mutex_); |
38 | 44.6k | if (!thread_.joinable()) { |
39 | 44.6k | return; |
40 | 44.6k | } |
41 | 51 | stop_ = true; |
42 | 51 | cond_.notify_one(); |
43 | 51 | } |
44 | 0 | thread_.join(); |
45 | 51 | } |
46 | | |
47 | 0 | void Delayer::Execute() { |
48 | 0 | cds::threading::Manager::attachThread(); |
49 | |
|
50 | 0 | std::vector<std::function<void()>> actions; |
51 | 0 | std::unique_lock<std::mutex> lock(mutex_); |
52 | 0 | while (!stop_) { |
53 | 0 | if (!queue_.empty()) { |
54 | 0 | auto now = MonoTime::Now(); |
55 | 0 | auto it = queue_.begin(); |
56 | 0 | while (it != queue_.end() && it->first <= now) { |
57 | 0 | actions.push_back(std::move(it->second)); |
58 | 0 | ++it; |
59 | 0 | } |
60 | 0 | if (it != queue_.begin()) { |
61 | 0 | queue_.erase(queue_.begin(), it); |
62 | 0 | lock.unlock(); |
63 | 0 | auto se = ScopeExit([&lock, &actions] { |
64 | 0 | actions.clear(); |
65 | 0 | lock.lock(); |
66 | 0 | }); |
67 | 0 | for (auto& action : actions) { |
68 | 0 | action(); |
69 | 0 | } |
70 | 0 | } else { |
71 | 0 | cond_.wait_until(lock, queue_.front().first.ToSteadyTimePoint()); |
72 | 0 | } |
73 | 0 | } else { |
74 | 0 | cond_.wait(lock); |
75 | 0 | } |
76 | 0 | } |
77 | 0 | } |
78 | | |
79 | | } // namespace yb |