YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/periodic.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "yb/rpc/periodic.h"
19
20
#include <algorithm>
21
#include <memory>
22
#include <mutex>
23
24
#include <boost/function.hpp>
25
#include <glog/logging.h>
26
27
#include "yb/rpc/messenger.h"
28
#include "yb/util/monotime.h"
29
#include "yb/util/random.h"
30
#include "yb/util/random_util.h"
31
#include "yb/util/status.h"
32
33
using std::shared_ptr;
34
using std::weak_ptr;
35
36
namespace yb {
37
namespace rpc {
38
39
PeriodicTimer::Options::Options()
40
    : jitter_pct(0.25),
41
273k
      one_shot(false) {
42
273k
}
43
44
shared_ptr<PeriodicTimer> PeriodicTimer::Create(
45
    Messenger* messenger,
46
    RunTaskFunctor functor,
47
    MonoDelta period,
48
273k
    Options options) {
49
273k
  return std::make_shared<PeriodicTimer>(messenger, std::move(functor), period, options);
50
273k
}
51
52
PeriodicTimer::PeriodicTimer(
53
    Messenger* messenger,
54
    RunTaskFunctor functor,
55
    MonoDelta period,
56
    Options options)
57
    : messenger_(messenger),
58
      functor_(std::move(functor)),
59
      period_(period),
60
      options_(std::move(options)),
61
      rng_(GetRandomSeed32()),
62
      current_callback_generation_(0),
63
      num_callbacks_for_tests_(0),
64
273k
      started_(false) {
65
273k
  DCHECK_GE(options_.jitter_pct, 0);
66
273k
  DCHECK_LE(options_.jitter_pct, 1);
67
273k
}
68
69
151k
PeriodicTimer::~PeriodicTimer() {
70
151k
  Stop();
71
151k
}
72
73
284k
void PeriodicTimer::Start(MonoDelta next_task_delta) {
74
284k
  std::unique_lock<simple_spinlock> l(lock_);
75
284k
  if (
!started_284k
) {
76
284k
    started_ = true;
77
284k
    SnoozeUnlocked(next_task_delta);
78
284k
    auto new_callback_generation = ++current_callback_generation_;
79
80
    // Invoke Callback() with the lock released.
81
284k
    l.unlock();
82
284k
    Callback(new_callback_generation);
83
284k
  }
84
284k
}
85
86
364k
void PeriodicTimer::Stop() {
87
364k
  std::lock_guard<simple_spinlock> l(lock_);
88
364k
  StopUnlocked();
89
364k
}
90
91
364k
void PeriodicTimer::StopUnlocked() {
92
364k
  DCHECK(lock_.is_locked());
93
364k
  started_ = false;
94
364k
}
95
96
41.6M
void PeriodicTimer::Snooze(MonoDelta next_task_delta) {
97
41.6M
  std::lock_guard<simple_spinlock> l(lock_);
98
41.6M
  SnoozeUnlocked(next_task_delta);
99
41.6M
}
100
101
53.8M
void PeriodicTimer::SnoozeUnlocked(MonoDelta next_task_delta) {
102
53.8M
  DCHECK(lock_.is_locked());
103
53.8M
  if (!started_) {
104
293
    return;
105
293
  }
106
107
53.8M
  if (!next_task_delta) {
108
    // Given jitter percentage J and period P, this yields a delay somewhere
109
    // between (1-J)*P and (1+J)*P.
110
27.2M
    next_task_delta = MonoDelta::FromMilliseconds(
111
27.2M
        GetMinimumPeriod().ToMilliseconds() +
112
27.2M
        rng_.NextDoubleFraction() *
113
27.2M
        options_.jitter_pct *
114
27.2M
        (2 * period_.ToMilliseconds()));
115
27.2M
  }
116
53.8M
  next_task_time_ = MonoTime::Now() + next_task_delta;
117
53.8M
}
118
119
46.0M
MonoDelta PeriodicTimer::GetMinimumPeriod() {
120
  // Given jitter percentage J and period P, this returns (1-J)*P, which is
121
  // the lowest possible jittered value.
122
46.0M
  return MonoDelta::FromMilliseconds((1.0 - options_.jitter_pct) *
123
46.0M
                                     period_.ToMilliseconds());
124
46.0M
}
125
126
1
int64_t PeriodicTimer::NumCallbacksForTests() const {
127
1
  std::lock_guard<simple_spinlock> l(lock_);
128
1
  return num_callbacks_for_tests_;
129
1
}
130
131
18.7M
void PeriodicTimer::Callback(int64_t my_callback_generation) {
132
  // To simplify the implementation, a timer may have only one outstanding
133
  // callback scheduled at a time. This means that once the callback is
134
  // scheduled, the timer's task cannot run any earlier than whenever the
135
  // callback runs. Thus, the delay used when scheduling the callback dictates
136
  // the lowest possible value of 'next_task_delta' that Snooze() can honor.
137
  //
138
  // If the callback's delay is very low, Snooze() can honor a low
139
  // 'next_task_delta', but the callback will run often and burn more CPU
140
  // cycles. If the delay is very high, the timer will be more efficient but
141
  // the granularity for 'next_task_delta' will rise accordingly.
142
  //
143
  // As a "happy medium" we use GetMinimumPeriod() as the delay. This ensures
144
  // that a no-arg Snooze() on a jittered timer will always be honored, and as
145
  // long as the caller passes a value of at least GetMinimumPeriod() to
146
  // Snooze(), that too will be honored.
147
18.7M
  MonoDelta delay = GetMinimumPeriod();
148
18.7M
  bool run_task = false;
149
18.7M
  {
150
18.7M
    std::lock_guard<simple_spinlock> l(lock_);
151
18.7M
    num_callbacks_for_tests_++;
152
153
    // If the timer was stopped, exit.
154
18.7M
    if (!started_) {
155
52.3k
      return;
156
52.3k
    }
157
158
    // If there's a new callback loop in town, exit.
159
    //
160
    // We could check again just before calling Messenger::ScheduleOnReactor()
161
    // (in case someone else restarted the timer while the functor ran, or in
162
    // case the functor itself restarted the timer), but there's no real reason
163
    // to do so: the very next iteration of this callback loop will wind up here
164
    // and exit.
165
18.7M
    if (current_callback_generation_ > my_callback_generation) {
166
3.11k
      return;
167
3.11k
    }
168
169
18.7M
    MonoTime now = MonoTime::Now();
170
18.7M
    if (now < next_task_time_) {
171
      // It's not yet time to run the task. Reduce the scheduled delay if
172
      // enough time has elapsed, but don't increase it.
173
6.81M
      delay = std::min(delay, next_task_time_ - now);
174
11.9M
    } else {
175
      // It's time to run the task. Although the next task time is reset now,
176
      // it may be reset again by virtue of running the task itself.
177
11.9M
      run_task = true;
178
179
11.9M
      if (options_.one_shot) {
180
        // Stop the timer first, in case the task wants to restart it.
181
11
        StopUnlocked();
182
11
      }
183
11.9M
      SnoozeUnlocked();
184
11.9M
      delay = next_task_time_ - now;
185
11.9M
    }
186
18.7M
  }
187
188
18.7M
  if (run_task) {
189
11.9M
    functor_();
190
191
11.9M
    if (options_.one_shot) {
192
      // The task was run; exit the loop. Even if the task restarted the timer,
193
      // that will have started a new callback loop, so exiting here is always
194
      // the correct thing to do.
195
11
      return;
196
11
    }
197
11.9M
  }
198
199
  // Capture a weak_ptr reference into the submitted functor so that we can
200
  // safely handle the functor outliving its timer.
201
18.7M
  weak_ptr<PeriodicTimer> w = shared_from_this();
202
18.7M
  messenger_->scheduler().Schedule([w, my_callback_generation](const Status& s) {
203
18.6M
    if (!s.ok()) {
204
      // The reactor was shut down.
205
190
      return;
206
190
    }
207
18.6M
    if (auto timer = w.lock()) {
208
18.5M
      timer->Callback(my_callback_generation);
209
18.5M
    }
210
18.6M
  }, delay.ToSteadyDuration());
211
18.7M
}
212
213
} // namespace rpc
214
} // namespace yb