YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/poller.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rpc/poller.h"
15
16
#include "yb/rpc/scheduler.h"
17
18
#include "yb/util/logging.h"
19
20
using namespace std::placeholders;
21
22
namespace yb {
23
namespace rpc {
24
25
Poller::Poller(const std::string& log_prefix, std::function<void()> callback)
26
    : log_prefix_(log_prefix), callback_(callback),
27
145k
      poll_task_id_(rpc::kUninitializedScheduledTaskId) {
28
145k
}
29
30
136k
void Poller::Start(Scheduler* scheduler, MonoDelta interval) {
31
136k
  scheduler_ = scheduler;
32
136k
  interval_ = interval;
33
34
136k
  std::lock_guard<std::mutex> lock(mutex_);
35
136k
  if (!closing_) {
36
136k
    Schedule();
37
136k
  }
38
136k
}
39
40
52.4k
void Poller::Shutdown() NO_THREAD_SAFETY_ANALYSIS {
41
52.4k
  std::unique_lock<std::mutex> lock(mutex_);
42
52.4k
  if (!closing_) {
43
51.9k
    closing_ = true;
44
51.9k
    if (scheduler_ == nullptr) {
45
      // Never started
46
397
      return;
47
397
    }
48
51.5k
    if (poll_task_id_ != rpc::kUninitializedScheduledTaskId) {
49
45.4k
      scheduler_->Abort(poll_task_id_);
50
45.4k
    }
51
51.5k
  }
52
97.4k
  
cond_.wait(lock, [this]() NO_THREAD_SAFETY_ANALYSIS 52.0k
{
53
97.4k
    return poll_task_id_ == rpc::kUninitializedScheduledTaskId;
54
97.4k
  });
55
52.0k
}
56
57
16.4M
void Poller::Schedule() {
58
16.4M
  poll_task_id_ = scheduler_->Schedule(
59
16.4M
      std::bind(&Poller::Poll, this, _1), interval_.ToSteadyDuration());
60
16.4M
}
61
62
16.7M
void Poller::Poll(const Status& status) {
63
16.7M
  {
64
16.7M
    std::lock_guard<std::mutex> lock(mutex_);
65
16.7M
    if (!status.ok() || 
closing_16.6M
) {
66
51.5k
      LOG_WITH_PREFIX(INFO) << "Poll stopped: " << status;
67
51.5k
      poll_task_id_ = rpc::kUninitializedScheduledTaskId;
68
51.5k
      cond_.notify_one();
69
51.5k
      return;
70
51.5k
    }
71
16.7M
  }
72
73
16.6M
  callback_();
74
75
16.6M
  {
76
16.6M
    std::lock_guard<std::mutex> lock(mutex_);
77
16.6M
    if (!closing_) {
78
16.4M
      Schedule();
79
16.4M
    } else {
80
258k
      poll_task_id_ = rpc::kUninitializedScheduledTaskId;
81
258k
    }
82
16.6M
    if (poll_task_id_ == rpc::kUninitializedScheduledTaskId) {
83
0
      cond_.notify_one();
84
0
    }
85
16.6M
  }
86
16.6M
}
87
88
} // namespace rpc
89
} // namespace yb