YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/delayer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/util/delayer.h"
15
16
#include <vector>
17
18
#include <cds/init.h>
19
20
#include "yb/util/scope_exit.h"
21
22
namespace yb {
23
24
0
// Queues `action` to be invoked by the worker thread once `when` has been reached.
//
// The worker thread is created lazily, on the first call. Both the thread creation and the
// enqueue happen under mutex_, so the new thread cannot observe the queue before the entry is
// added.
//
// NOTE(review): entries are appended in call order and Execute() only examines the front of the
// queue, so this presumably relies on callers passing non-decreasing `when` values - confirm.
void Delayer::Delay(MonoTime when, std::function<void()> action) {
  std::lock_guard<std::mutex> guard(mutex_);

  const bool worker_started = thread_.joinable();
  if (!worker_started) {
    thread_ = std::thread(&Delayer::Execute, this);
  }

  queue_.emplace_back(when, std::move(action));
  // Wake the worker so it re-evaluates how long it has to sleep.
  cond_.notify_one();
}
34
35
44.6k
// Asks the worker thread to stop and waits for it to exit. Does nothing if Delay() was never
// called, i.e. the worker thread was never started.
//
// Execute() leaves its loop as soon as it observes stop_, so actions that are still queued at
// that point are not invoked.
Delayer::~Delayer() {
  bool worker_started = false;
  {
    std::lock_guard<std::mutex> guard(mutex_);
    worker_started = thread_.joinable();
    if (worker_started) {
      stop_ = true;
      cond_.notify_one();
    }
  }

  // Join outside of the critical section: the worker needs mutex_ to wake up and see stop_.
  if (worker_started) {
    thread_.join();
  }
}
46
47
0
void Delayer::Execute() {
48
0
  cds::threading::Manager::attachThread();
49
50
0
  std::vector<std::function<void()>> actions;
51
0
  std::unique_lock<std::mutex> lock(mutex_);
52
0
  while (!stop_) {
53
0
    if (!queue_.empty()) {
54
0
      auto now = MonoTime::Now();
55
0
      auto it = queue_.begin();
56
0
      while (it != queue_.end() && it->first <= now) {
57
0
        actions.push_back(std::move(it->second));
58
0
        ++it;
59
0
      }
60
0
      if (it != queue_.begin()) {
61
0
        queue_.erase(queue_.begin(), it);
62
0
        lock.unlock();
63
0
        auto se = ScopeExit([&lock, &actions] {
64
0
          actions.clear();
65
0
          lock.lock();
66
0
        });
67
0
        for (auto& action : actions) {
68
0
          action();
69
0
        }
70
0
      } else {
71
0
        cond_.wait_until(lock, queue_.front().first.ToSteadyTimePoint());
72
0
      }
73
0
    } else {
74
0
      cond_.wait(lock);
75
0
    }
76
0
  }
77
0
}
78
79
} // namespace yb