YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/periodic.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "yb/rpc/periodic.h"
19
20
#include <algorithm>
21
#include <memory>
22
#include <mutex>
23
24
#include <boost/function.hpp>
25
#include <glog/logging.h>
26
27
#include "yb/rpc/messenger.h"
28
#include "yb/util/monotime.h"
29
#include "yb/util/random.h"
30
#include "yb/util/random_util.h"
31
#include "yb/util/status.h"
32
33
using std::shared_ptr;
34
using std::weak_ptr;
35
36
namespace yb {
37
namespace rpc {
38
39
PeriodicTimer::Options::Options()
40
    : jitter_pct(0.25),
41
158k
      one_shot(false) {
42
158k
}
43
44
// Factory for PeriodicTimer.
//
// The timer is always handed out inside a shared_ptr: Callback() relies on
// shared_from_this() to give scheduled work a weak reference to the timer.
//
// messenger: used by Callback() to schedule the timer's wake-ups.
// functor:   the task the timer runs when it fires.
// period:    the nominal interval between task runs, before jitter.
// options:   jitter / one-shot configuration.
//
// The returned timer is not started; call Start() to begin the callback loop.
shared_ptr<PeriodicTimer> PeriodicTimer::Create(
    Messenger* messenger,
    RunTaskFunctor functor,
    MonoDelta period,
    Options options) {
  auto timer = std::make_shared<PeriodicTimer>(
      messenger, std::move(functor), period, options);
  return timer;
}
51
52
// Constructs a stopped timer.
//
// The random generator used for jitter is seeded from GetRandomSeed32(), the
// callback generation and the test-only callback counter both start at zero,
// and the timer stays idle until Start() is called.
//
// In debug builds, the jitter fraction is checked to lie within [0, 1].
PeriodicTimer::PeriodicTimer(
    Messenger* messenger, RunTaskFunctor functor, MonoDelta period, Options options)
    : messenger_(messenger),
      functor_(std::move(functor)),
      period_(period),
      options_(std::move(options)),
      rng_(GetRandomSeed32()),
      current_callback_generation_(0),
      num_callbacks_for_tests_(0),
      started_(false) {
  // Lower bound first, then upper bound.
  DCHECK_GE(options_.jitter_pct, 0);
  DCHECK_LE(options_.jitter_pct, 1);
}
68
69
91.0k
// Marks the timer as stopped on destruction.
//
// Scheduled wake-ups hold only a weak_ptr to the timer (see Callback()), so
// any that fire after destruction fail to lock it and do nothing.
PeriodicTimer::~PeriodicTimer() {
  Stop();
}
72
73
163k
// Starts the timer and kicks off a new callback loop.
//
// next_task_delta: delay until the first task run; when unset, a jittered
//                  delay is chosen by SnoozeUnlocked().
//
// Calling Start() on a timer that is already started does nothing.
void PeriodicTimer::Start(MonoDelta next_task_delta) {
  std::unique_lock<simple_spinlock> guard(lock_);
  if (started_) {
    return;
  }

  started_ = true;
  SnoozeUnlocked(next_task_delta);

  // Bumping the generation makes any callback loop left over from an earlier
  // Start() exit the next time it wakes up.
  auto generation = ++current_callback_generation_;

  // Callback() acquires lock_ itself, so release it before calling.
  guard.unlock();
  Callback(generation);
}
85
86
217k
void PeriodicTimer::Stop() {
87
217k
  std::lock_guard<simple_spinlock> l(lock_);
88
217k
  StopUnlocked();
89
217k
}
90
91
217k
void PeriodicTimer::StopUnlocked() {
92
217k
  DCHECK(lock_.is_locked());
93
217k
  started_ = false;
94
217k
}
95
96
18.7M
// Pushes the next task run out to 'next_task_delta' from now.
//
// Takes lock_ and delegates to SnoozeUnlocked(), which picks a jittered delay
// when 'next_task_delta' is unset and does nothing on a stopped timer.
void PeriodicTimer::Snooze(MonoDelta next_task_delta) {
  std::lock_guard<simple_spinlock> guard(lock_);
  SnoozeUnlocked(next_task_delta);
}
100
101
20.9M
// Resets the time of the next task run. The caller must already hold lock_
// (checked in debug builds).
//
// next_task_delta: delay from now until the next task run. When unset, a
//                  random delay between (1-J)*P and (1+J)*P is used, where J
//                  is the jitter fraction and P is the period.
//
// Has no effect on a timer that is not started.
void PeriodicTimer::SnoozeUnlocked(MonoDelta next_task_delta) {
  DCHECK(lock_.is_locked());
  if (!started_) {
    return;
  }

  if (!next_task_delta) {
    // Begin at the smallest jittered delay, (1-J)*P, and add a random share
    // of the full jitter window, which is 2*J*P wide.
    const auto min_delay_ms = GetMinimumPeriod().ToMilliseconds();
    const auto doubled_period_ms = 2 * period_.ToMilliseconds();
    const auto jitter_ms =
        rng_.NextDoubleFraction() * options_.jitter_pct * doubled_period_ms;
    next_task_delta = MonoDelta::FromMilliseconds(min_delay_ms + jitter_ms);
  }

  next_task_time_ = MonoTime::Now() + next_task_delta;
}
118
119
13.6M
// Returns the shortest delay that jitter can produce: (1-J)*P for jitter
// fraction J and period P. The computation is done in milliseconds.
MonoDelta PeriodicTimer::GetMinimumPeriod() {
  const auto unjittered_fraction = 1.0 - options_.jitter_pct;
  return MonoDelta::FromMilliseconds(unjittered_fraction * period_.ToMilliseconds());
}
125
126
1
int64_t PeriodicTimer::NumCallbacksForTests() const {
127
1
  std::lock_guard<simple_spinlock> l(lock_);
128
1
  return num_callbacks_for_tests_;
129
1
}
130
131
3.17M
void PeriodicTimer::Callback(int64_t my_callback_generation) {
132
  // To simplify the implementation, a timer may have only one outstanding
133
  // callback scheduled at a time. This means that once the callback is
134
  // scheduled, the timer's task cannot run any earlier than whenever the
135
  // callback runs. Thus, the delay used when scheduling the callback dictates
136
  // the lowest possible value of 'next_task_delta' that Snooze() can honor.
137
  //
138
  // If the callback's delay is very low, Snooze() can honor a low
139
  // 'next_task_delta', but the callback will run often and burn more CPU
140
  // cycles. If the delay is very high, the timer will be more efficient but
141
  // the granularity for 'next_task_delta' will rise accordingly.
142
  //
143
  // As a "happy medium" we use GetMinimumPeriod() as the delay. This ensures
144
  // that a no-arg Snooze() on a jittered timer will always be honored, and as
145
  // long as the caller passes a value of at least GetMinimumPeriod() to
146
  // Snooze(), that too will be honored.
147
3.17M
  MonoDelta delay = GetMinimumPeriod();
148
3.17M
  bool run_task = false;
149
3.17M
  {
150
3.17M
    std::lock_guard<simple_spinlock> l(lock_);
151
3.17M
    num_callbacks_for_tests_++;
152
153
    // If the timer was stopped, exit.
154
3.17M
    if (!started_) {
155
28.5k
      return;
156
28.5k
    }
157
158
    // If there's a new callback loop in town, exit.
159
    //
160
    // We could check again just before calling Messenger::ScheduleOnReactor()
161
    // (in case someone else restarted the timer while the functor ran, or in
162
    // case the functor itself restarted the timer), but there's no real reason
163
    // to do so: the very next iteration of this callback loop will wind up here
164
    // and exit.
165
3.14M
    if (current_callback_generation_ > my_callback_generation) {
166
1.21k
      return;
167
1.21k
    }
168
169
3.14M
    MonoTime now = MonoTime::Now();
170
3.14M
    if (now < next_task_time_) {
171
      // It's not yet time to run the task. Reduce the scheduled delay if
172
      // enough time has elapsed, but don't increase it.
173
1.08M
      delay = std::min(delay, next_task_time_ - now);
174
2.05M
    } else {
175
      // It's time to run the task. Although the next task time is reset now,
176
      // it may be reset again by virtue of running the task itself.
177
2.05M
      run_task = true;
178
179
2.05M
      if (options_.one_shot) {
180
        // Stop the timer first, in case the task wants to restart it.
181
11
        StopUnlocked();
182
11
      }
183
2.05M
      SnoozeUnlocked();
184
2.05M
      delay = next_task_time_ - now;
185
2.05M
    }
186
3.14M
  }
187
188
3.14M
  if (run_task) {
189
2.05M
    functor_();
190
191
2.05M
    if (options_.one_shot) {
192
      // The task was run; exit the loop. Even if the task restarted the timer,
193
      // that will have started a new callback loop, so exiting here is always
194
      // the correct thing to do.
195
11
      return;
196
11
    }
197
3.14M
  }
198
199
  // Capture a weak_ptr reference into the submitted functor so that we can
200
  // safely handle the functor outliving its timer.
201
3.14M
  weak_ptr<PeriodicTimer> w = shared_from_this();
202
3.08M
  messenger_->scheduler().Schedule([w, my_callback_generation](const Status& s) {
203
3.08M
    if (!s.ok()) {
204
      // The reactor was shut down.
205
184
      return;
206
184
    }
207
3.08M
    if (auto timer = w.lock()) {
208
3.01M
      timer->Callback(my_callback_generation);
209
3.01M
    }
210
3.08M
  }, delay.ToSteadyDuration());
211
3.14M
}
212
213
} // namespace rpc
214
} // namespace yb