/Users/deen/code/yugabyte-db/src/yb/rpc/poller.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/rpc/poller.h" |
15 | | |
16 | | #include "yb/rpc/scheduler.h" |
17 | | |
18 | | #include "yb/util/logging.h" |
19 | | |
20 | | using namespace std::placeholders; |
21 | | |
22 | | namespace yb { |
23 | | namespace rpc { |
24 | | |
25 | | Poller::Poller(const std::string& log_prefix, std::function<void()> callback) |
26 | | : log_prefix_(log_prefix), callback_(callback), |
27 | 79.5k | poll_task_id_(rpc::kUninitializedScheduledTaskId) { |
28 | 79.5k | } |
29 | | |
30 | 73.6k | void Poller::Start(Scheduler* scheduler, MonoDelta interval) { |
31 | 73.6k | scheduler_ = scheduler; |
32 | 73.6k | interval_ = interval; |
33 | | |
34 | 73.6k | std::lock_guard<std::mutex> lock(mutex_); |
35 | 73.6k | if (!closing_) { |
36 | 73.6k | Schedule(); |
37 | 73.6k | } |
38 | 73.6k | } |
39 | | |
40 | 21.5k | void Poller::Shutdown() NO_THREAD_SAFETY_ANALYSIS { |
41 | 21.5k | std::unique_lock<std::mutex> lock(mutex_); |
42 | 21.5k | if (!closing_) { |
43 | 21.4k | closing_ = true; |
44 | 21.4k | if (scheduler_ == nullptr) { |
45 | | // Never started |
46 | 99 | return; |
47 | 99 | } |
48 | 21.3k | if (poll_task_id_ != rpc::kUninitializedScheduledTaskId) { |
49 | 19.6k | scheduler_->Abort(poll_task_id_); |
50 | 19.6k | } |
51 | 21.3k | } |
52 | 41.1k | cond_.wait(lock, [this]() NO_THREAD_SAFETY_ANALYSIS { |
53 | 41.1k | return poll_task_id_ == rpc::kUninitializedScheduledTaskId; |
54 | 41.1k | }); |
55 | 21.4k | } |
56 | | |
57 | 2.37M | void Poller::Schedule() { |
58 | 2.37M | poll_task_id_ = scheduler_->Schedule( |
59 | 2.37M | std::bind(&Poller::Poll, this, _1), interval_.ToSteadyDuration()); |
60 | 2.37M | } |
61 | | |
62 | 2.39M | void Poller::Poll(const Status& status) { |
63 | 2.39M | { |
64 | 2.39M | std::lock_guard<std::mutex> lock(mutex_); |
65 | 2.39M | if (!status.ok() || closing_) { |
66 | 21.3k | LOG_WITH_PREFIX(INFO) << "Poll stopped: " << status; |
67 | 21.3k | poll_task_id_ = rpc::kUninitializedScheduledTaskId; |
68 | 21.3k | cond_.notify_one(); |
69 | 21.3k | return; |
70 | 21.3k | } |
71 | 2.37M | } |
72 | | |
73 | 2.37M | callback_(); |
74 | | |
75 | 2.37M | { |
76 | 2.37M | std::lock_guard<std::mutex> lock(mutex_); |
77 | 2.37M | if (!closing_) { |
78 | 2.31M | Schedule(); |
79 | 60.1k | } else { |
80 | 60.1k | poll_task_id_ = rpc::kUninitializedScheduledTaskId; |
81 | 60.1k | } |
82 | 2.37M | if (poll_task_id_ == rpc::kUninitializedScheduledTaskId) { |
83 | 0 | cond_.notify_one(); |
84 | 0 | } |
85 | 2.37M | } |
86 | 2.37M | } |
87 | | |
88 | | } // namespace rpc |
89 | | } // namespace yb |