YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/poller.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rpc/poller.h"
15
16
#include "yb/rpc/scheduler.h"
17
18
#include "yb/util/logging.h"
19
20
using namespace std::placeholders;
21
22
namespace yb {
23
namespace rpc {
24
25
Poller::Poller(const std::string& log_prefix, std::function<void()> callback)
26
    : log_prefix_(log_prefix), callback_(callback),
27
79.5k
      poll_task_id_(rpc::kUninitializedScheduledTaskId) {
28
79.5k
}
29
30
73.6k
void Poller::Start(Scheduler* scheduler, MonoDelta interval) {
31
73.6k
  scheduler_ = scheduler;
32
73.6k
  interval_ = interval;
33
34
73.6k
  std::lock_guard<std::mutex> lock(mutex_);
35
73.6k
  if (!closing_) {
36
73.6k
    Schedule();
37
73.6k
  }
38
73.6k
}
39
40
21.5k
void Poller::Shutdown() NO_THREAD_SAFETY_ANALYSIS {
41
21.5k
  std::unique_lock<std::mutex> lock(mutex_);
42
21.5k
  if (!closing_) {
43
21.4k
    closing_ = true;
44
21.4k
    if (scheduler_ == nullptr) {
45
      // Never started
46
99
      return;
47
99
    }
48
21.3k
    if (poll_task_id_ != rpc::kUninitializedScheduledTaskId) {
49
19.6k
      scheduler_->Abort(poll_task_id_);
50
19.6k
    }
51
21.3k
  }
52
41.1k
  cond_.wait(lock, [this]() NO_THREAD_SAFETY_ANALYSIS {
53
41.1k
    return poll_task_id_ == rpc::kUninitializedScheduledTaskId;
54
41.1k
  });
55
21.4k
}
56
57
2.37M
void Poller::Schedule() {
58
2.37M
  poll_task_id_ = scheduler_->Schedule(
59
2.37M
      std::bind(&Poller::Poll, this, _1), interval_.ToSteadyDuration());
60
2.37M
}
61
62
2.39M
void Poller::Poll(const Status& status) {
63
2.39M
  {
64
2.39M
    std::lock_guard<std::mutex> lock(mutex_);
65
2.39M
    if (!status.ok() || closing_) {
66
21.3k
      LOG_WITH_PREFIX(INFO) << "Poll stopped: " << status;
67
21.3k
      poll_task_id_ = rpc::kUninitializedScheduledTaskId;
68
21.3k
      cond_.notify_one();
69
21.3k
      return;
70
21.3k
    }
71
2.37M
  }
72
73
2.37M
  callback_();
74
75
2.37M
  {
76
2.37M
    std::lock_guard<std::mutex> lock(mutex_);
77
2.37M
    if (!closing_) {
78
2.31M
      Schedule();
79
60.1k
    } else {
80
60.1k
      poll_task_id_ = rpc::kUninitializedScheduledTaskId;
81
60.1k
    }
82
2.37M
    if (poll_task_id_ == rpc::kUninitializedScheduledTaskId) {
83
0
      cond_.notify_one();
84
0
    }
85
2.37M
  }
86
2.37M
}
87
88
} // namespace rpc
89
} // namespace yb