YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/periodic-test.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <atomic>
19
#include <cstdint>
20
#include <memory>
21
#include <string>
22
#include <utility>
23
24
#include <gtest/gtest.h>
25
26
#include "yb/rpc/messenger.h"
27
#include "yb/rpc/periodic.h"
28
#include "yb/util/monotime.h"
29
#include "yb/util/result.h"
30
#include "yb/util/test_macros.h"
31
#include "yb/util/test_util.h"
32
33
using std::atomic;
34
using std::shared_ptr;
35
36
namespace yb {
37
namespace rpc {
38
39
class PeriodicTimerTest : public YBTest {
40
 public:
41
  PeriodicTimerTest()
42
12
      : period_ms_(200) {}
43
44
12
  void SetUp() override {
45
12
    ASSERT_OK(MessengerBuilder("test").Build().MoveTo(&messenger_));
46
12
  }
47
48
12
  void TearDown() override {
49
12
    messenger_->Shutdown();
50
12
    YBTest::TearDown();
51
12
  }
52
53
 protected:
54
  const int64_t period_ms_;
55
  std::unique_ptr<Messenger> messenger_;
56
};
57
58
class JitteredPeriodicTimerTest : public PeriodicTimerTest,
59
                                  public ::testing::WithParamInterface<double> {
60
 public:
61
  // In TSAN builds it takes a long time to de-schedule a thread. Also,
62
  // the actual time that thread spends sleeping in SleepFor() scenarios
63
  // might be much longer than requested. Setting the task period to be long
64
  // enough allows for more stable behavior of the test, so no flakiness
65
  // is observed even under substantial load. Otherwise it would be necessary
66
  // to introduce additional logic to verify that the actual timings satisfy
67
  // the implicit constraints of the test scenarios below.
68
  JitteredPeriodicTimerTest()
69
10
      : counter_(0) {
70
10
  }
71
72
10
  virtual void SetUp() override {
73
10
    PeriodicTimerTest::SetUp();
74
10
    timer_ = PeriodicTimer::Create(messenger_.get(),
75
17
                                   [&] { counter_++; },
76
10
                                   MonoDelta::FromMilliseconds(period_ms_),
77
10
                                   GetOptions());
78
10
  }
79
80
10
  virtual void TearDown() override {
81
    // Ensure that the reactor threads are fully quiesced (and thus no timer
82
    // callbacks are running) by the time 'counter_' is destroyed.
83
10
    messenger_->Shutdown();
84
85
10
    PeriodicTimerTest::TearDown();
86
10
  }
87
88
 protected:
89
90
8
  virtual PeriodicTimer::Options GetOptions() {
91
8
    PeriodicTimer::Options opts;
92
8
    opts.jitter_pct = GetParam();
93
8
    return opts;
94
8
  }
95
96
  atomic<int64_t> counter_;
97
  shared_ptr<PeriodicTimer> timer_;
98
};
99
100
// Run every JitteredPeriodicTimerTest case once per jitter setting: a
// jitter_pct of 0.0 and a jitter_pct of 0.25.
INSTANTIATE_TEST_CASE_P(
    AllJitterModes, JitteredPeriodicTimerTest, ::testing::Values(0.0, 0.25));
103
104
2
// Verifies that the timer only fires between Start() and Stop().
TEST_P(JitteredPeriodicTimerTest, TestStartStop) {
  const MonoDelta two_periods = MonoDelta::FromMilliseconds(period_ms_ * 2);

  // The timer has not been started yet, so the counter must stay untouched.
  SleepFor(two_periods);
  ASSERT_EQ(0, counter_);

  // After Start() the counter should go up; by how much depends on load and
  // on the underlying OS scheduler.
  timer_->Start();
  SleepFor(two_periods);
  ASSERT_EVENTUALLY([&]{
    ASSERT_GT(counter_, 0);
  });

  // Once stopped, the counter either keeps its value or grows by exactly one
  // (when Stop() raced with an already scheduled task).
  timer_->Stop();
  const int64_t count_at_stop = counter_;
  messenger_->Shutdown();
  ASSERT_TRUE(counter_ == count_at_stop ||
              counter_ == count_at_stop + 1);
}
125
126
2
// Verifies that a timer which keeps being snoozed never fires.
TEST_P(JitteredPeriodicTimerTest, TestReset) {
  timer_->Start();
  const MonoTime start_time = MonoTime::Now();
  const MonoDelta run_for = MonoDelta::FromMilliseconds(period_ms_ * 5);

  // Keep snoozing the timer for five periods. Because each Snooze() pushes
  // the next firing out again, the functor must never run in the meantime.
  while (!(MonoTime::Now() - start_time > run_for)) {
    timer_->Snooze();
    ASSERT_EQ(0, counter_);
    SleepFor(MonoDelta::FromMilliseconds(1));
  }
}
142
143
2
// Verifies that Snooze() with an explicit delta postpones the next firing.
TEST_P(JitteredPeriodicTimerTest, TestResetWithDelta) {
  const MonoDelta one_period = MonoDelta::FromMilliseconds(period_ms_);
  const MonoDelta five_periods = MonoDelta::FromMilliseconds(period_ms_ * 5);

  timer_->Start();
  timer_->Snooze(five_periods);

  // A single period in, the snoozed timer must not have fired yet...
  SleepFor(one_period);
  ASSERT_EQ(0, counter_);

  // ...although it does fire at some later point.
  ASSERT_EVENTUALLY([&](){
    ASSERT_GT(counter_, 0);
  });
}
156
157
2
// Verifies that Start() with an explicit delta delays the first firing.
TEST_P(JitteredPeriodicTimerTest, TestStartWithDelta) {
  const MonoDelta one_period = MonoDelta::FromMilliseconds(period_ms_);
  const MonoDelta five_periods = MonoDelta::FromMilliseconds(period_ms_ * 5);

  timer_->Start(five_periods);

  // A single period in, the delayed timer must not have fired yet...
  SleepFor(one_period);
  ASSERT_EQ(0, counter_);

  // ...although it does fire at some later point.
  ASSERT_EVENTUALLY([&](){
    ASSERT_GT(counter_, 0);
  });
}
169
170
1
// Verifies that a timer whose functor stops and restarts the timer itself
// does not fire substantially more often than once per period.
TEST_F(PeriodicTimerTest, TestCallbackRestartsTimer) {
  const int64_t kPeriods = 10;

  // Create a timer that restarts itself from within its functor.
  PeriodicTimer::Options opts;
  opts.jitter_pct = 0.0; // don't need jittering
  shared_ptr<PeriodicTimer> timer = PeriodicTimer::Create(
      messenger_.get(),
      [&] {
        timer->Stop();
        timer->Start();
      },
      MonoDelta::FromMilliseconds(period_ms_),
      std::move(opts));

  // Run the timer for a fixed amount of time.
  timer->Start();
  SleepFor(MonoDelta::FromMilliseconds(period_ms_ * kPeriods));
  timer->Stop();

  // Take the measurement right after Stop(), before quiescing the reactors.
  const auto num_callbacks = timer->NumCallbacksForTests();

  // The functor captures the local 'timer' by reference and restarts the
  // timer, so a callback racing with the Stop() above could still be running.
  // Ensure that the reactor threads are fully quiesced (and thus no timer
  // callbacks are running) by the time 'timer' is destroyed. This happens
  // before the assertion because a failed ASSERT_* returns immediately.
  messenger_->Shutdown();

  // Although the timer is restarted by its functor, its overall period should
  // remain more or less the same (since the period expired just as the functor
  // ran). As such, we should see no more than three callbacks per period:
  // one to start scheduling the callback loop, one when it fires, and one more
  // after it has been replaced by a new callback loop.
  ASSERT_LE(num_callbacks, kPeriods * 3);
}
197
198
class JitteredOneShotPeriodicTimerTest : public JitteredPeriodicTimerTest {
199
 protected:
200
2
  virtual PeriodicTimer::Options GetOptions() override {
201
2
    PeriodicTimer::Options opts;
202
2
    opts.jitter_pct = GetParam();
203
2
    opts.one_shot = true;
204
2
    return opts;
205
2
  }
206
};
207
208
// Run every JitteredOneShotPeriodicTimerTest case once per jitter setting: a
// jitter_pct of 0.0 and a jitter_pct of 0.25.
INSTANTIATE_TEST_CASE_P(
    AllJitterModes, JitteredOneShotPeriodicTimerTest, ::testing::Values(0.0, 0.25));
211
212
2
// Verifies that each Start() of a one-shot timer runs the functor exactly
// once.
TEST_P(JitteredOneShotPeriodicTimerTest, TestBasics) {
  const int kRounds = 3;
  const MonoDelta two_periods = MonoDelta::FromMilliseconds(period_ms_ * 2);

  // Arm the one-shot timer several times in a row.
  for (int round = 0; round < kRounds; ++round) {
    const int fired_after_round = round + 1;
    ASSERT_EQ(round, counter_);

    // Starting the timer makes the task run at some point.
    timer_->Start();
    ASSERT_EVENTUALLY([&](){
      ASSERT_EQ(fired_after_round, counter_);
    });

    // Waiting a couple more periods must not produce any further run.
    SleepFor(two_periods);
    ASSERT_EQ(fired_after_round, counter_);
  }
}
229
230
1
// Verifies that a one-shot timer keeps firing when its functor re-arms it.
TEST_F(PeriodicTimerTest, TestCallbackRestartsOneShotTimer) {
  atomic<int64_t> counter(0);

  // One-shot timer without jitter (jittering is not needed here).
  PeriodicTimer::Options opts;
  opts.jitter_pct = 0.0;
  opts.one_shot = true;

  // The functor counts the run and then starts the timer again, so the timer
  // restarts itself from within its own callback.
  shared_ptr<PeriodicTimer> timer;
  timer = PeriodicTimer::Create(
      messenger_.get(),
      [&counter, &timer] {
        counter++;
        timer->Start();
      },
      MonoDelta::FromMilliseconds(period_ms_),
      std::move(opts));

  // Since every run re-arms the timer, the counter is expected to keep
  // growing, one increment per period.
  timer->Start();
  ASSERT_EVENTUALLY([&](){
    ASSERT_GE(counter, 5);
  });

  // Quiesce the reactor threads so that no timer callback can still be
  // running when 'counter' goes out of scope.
  messenger_->Shutdown();
}
257
258
} // namespace rpc
259
} // namespace yb