/Users/deen/code/yugabyte-db/src/yb/rpc/periodic-test.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include <atomic> |
19 | | #include <cstdint> |
20 | | #include <memory> |
21 | | #include <string> |
22 | | #include <utility> |
23 | | |
24 | | #include <gtest/gtest.h> |
25 | | |
26 | | #include "yb/rpc/messenger.h" |
27 | | #include "yb/rpc/periodic.h" |
28 | | #include "yb/util/monotime.h" |
29 | | #include "yb/util/result.h" |
30 | | #include "yb/util/test_macros.h" |
31 | | #include "yb/util/test_util.h" |
32 | | |
33 | | using std::atomic; |
34 | | using std::shared_ptr; |
35 | | |
36 | | namespace yb { |
37 | | namespace rpc { |
38 | | |
39 | | class PeriodicTimerTest : public YBTest {  /* Base fixture: owns the Messenger that is passed to the timers under test. */ |
40 | | public: |
41 | | PeriodicTimerTest() |
42 | 12 | : period_ms_(200) {}  /* timer period used by the tests: 200 ms */ |
43 | | |
44 | 12 | void SetUp() override {  /* builds a Messenger and moves it into messenger_ */ |
45 | 12 | ASSERT_OK(MessengerBuilder("test").Build().MoveTo(&messenger_)); |
46 | 12 | } |
47 | | |
48 | 12 | void TearDown() override {  /* shuts the Messenger down, then runs YBTest::TearDown() */ |
49 | 12 | messenger_->Shutdown(); |
50 | 12 | YBTest::TearDown(); |
51 | 12 | } |
52 | | |
53 | | protected: |
54 | | const int64_t period_ms_;  /* timer period, in milliseconds */ |
55 | | std::unique_ptr<Messenger> messenger_;  /* created in SetUp(), shut down in TearDown() */ |
56 | | }; |
57 | | |
58 | | class JitteredPeriodicTimerTest : public PeriodicTimerTest,  /* Fixture parameterized on the jitter_pct given to the timer. */ |
59 | | public ::testing::WithParamInterface<double> { |
60 | | public: |
61 | | // Note on the timer period (period_ms_, set by the PeriodicTimerTest |
62 | | // constructor): in TSAN builds it takes a long time to de-schedule a thread. |
63 | | // Also, the actual time a thread spends sleeping in SleepFor() scenarios |
64 | | // might be much longer than requested. Keeping the task period long enough |
65 | | // makes the tests stable, so no flakiness is observed even under substantial |
66 | | // load. Otherwise it would be necessary to add logic verifying that the |
67 | | // actual timings satisfy the implicit constraints of the scenarios below. |
68 | | JitteredPeriodicTimerTest() |
69 | 10 | : counter_(0) { |
70 | 10 | } |
71 | | |
72 | 10 | virtual void SetUp() override {  /* creates the timer (Start() is left to each test); its callback increments counter_ */ |
73 | 10 | PeriodicTimerTest::SetUp(); |
74 | 10 | timer_ = PeriodicTimer::Create(messenger_.get(), |
75 | 17 | [&] { counter_++; }, |
76 | 10 | MonoDelta::FromMilliseconds(period_ms_), |
77 | 10 | GetOptions()); |
78 | 10 | } |
79 | | |
80 | 10 | virtual void TearDown() override { |
81 | | // Ensure that the reactor threads are fully quiesced (and thus no timer |
82 | | // callbacks are running) by the time 'counter_' is destroyed. Note that PeriodicTimerTest::TearDown() below calls Shutdown() once more. |
83 | 10 | messenger_->Shutdown(); |
84 | | |
85 | 10 | PeriodicTimerTest::TearDown(); |
86 | 10 | } |
87 | | |
88 | | protected: |
89 | | |
90 | 8 | virtual PeriodicTimer::Options GetOptions() {  /* default-constructed options with jitter_pct taken from the test parameter */ |
91 | 8 | PeriodicTimer::Options opts; |
92 | 8 | opts.jitter_pct = GetParam(); |
93 | 8 | return opts; |
94 | 8 | } |
95 | | |
96 | | atomic<int64_t> counter_;  /* incremented once per timer callback */ |
97 | | shared_ptr<PeriodicTimer> timer_;  /* timer under test, created in SetUp() */ |
98 | | }; |
99 | | |
100 | | INSTANTIATE_TEST_CASE_P(AllJitterModes, |
101 | | JitteredPeriodicTimerTest, |
102 | | ::testing::Values(0.0, 0.25));  /* each TEST_P of this fixture runs with jitter_pct 0.0 and 0.25 */ |
103 | | |
104 | 2 | TEST_P(JitteredPeriodicTimerTest, TestStartStop) {  /* Checks the counter stays at 0 before Start(), grows after it, and moves by at most one after Stop(). */ |
105 | | // Before the timer starts, the counter's value should not change. |
106 | 2 | SleepFor(MonoDelta::FromMilliseconds(period_ms_ * 2)); |
107 | 2 | ASSERT_EQ(0, counter_); |
108 | | |
109 | | // Once started, it should increase (exactly how much depends on load and the |
110 | | // underlying OS scheduler). |
111 | 2 | timer_->Start(); |
112 | 2 | SleepFor(MonoDelta::FromMilliseconds(period_ms_ * 2)); |
113 | 2 | ASSERT_EVENTUALLY([&]{ |
114 | 2 | ASSERT_GT(counter_, 0); |
115 | 2 | }); |
116 | | |
117 | | // After stopping the timer, the value should either remain the same or |
118 | | // increment once (if Stop() raced with a scheduled task). |
119 | 2 | timer_->Stop(); |
120 | 2 | int64_t v = counter_;  /* snapshot taken right after Stop() */ |
121 | 2 | messenger_->Shutdown();  /* NOTE(review): presumably waits out any in-flight callback so the check below is race-free; confirm */ |
122 | 2 | ASSERT_TRUE(counter_ == v || |
123 | 2 | counter_ == v + 1); |
124 | 2 | } |
125 | | |
126 | 2 | TEST_P(JitteredPeriodicTimerTest, TestReset) {  /* Checks that a timer which is snoozed continuously never runs its callback. */ |
127 | 2 | timer_->Start(); |
128 | 2 | MonoTime start_time = MonoTime::Now(); |
129 | | |
130 | | // Loop for a little while (five periods), snoozing the timer roughly every |
131 | | // millisecond. As a result, the timer should never fire. |
132 | 1.36k | while (true) { |
133 | 1.36k | MonoTime now = MonoTime::Now(); |
134 | 1.36k | if (now - start_time > MonoDelta::FromMilliseconds(period_ms_ * 5)) { |
135 | 2 | break; |
136 | 2 | } |
137 | 1.35k | timer_->Snooze(); |
138 | 1.35k | ASSERT_EQ(0, counter_); |
139 | 1.35k | SleepFor(MonoDelta::FromMilliseconds(1)); |
140 | 1.35k | } |
141 | 2 | } |
142 | | |
143 | 2 | TEST_P(JitteredPeriodicTimerTest, TestResetWithDelta) {  /* Checks that Snooze() with an explicit delta of five periods delays the first callback past one period. */ |
144 | 2 | timer_->Start(); |
145 | 2 | timer_->Snooze(MonoDelta::FromMilliseconds(period_ms_ * 5)); |
146 | | |
147 | | // One period later, the counter still hasn't incremented... |
148 | 2 | SleepFor(MonoDelta::FromMilliseconds(period_ms_)); |
149 | 2 | ASSERT_EQ(0, counter_); |
150 | | |
151 | | // ...but it will increment eventually. |
152 | 2 | ASSERT_EVENTUALLY([&](){ |
153 | 2 | ASSERT_GT(counter_, 0); |
154 | 2 | }); |
155 | 2 | } |
156 | | |
157 | 2 | TEST_P(JitteredPeriodicTimerTest, TestStartWithDelta) {  /* Checks that Start() with an explicit delta of five periods delays the first callback past one period. */ |
158 | 2 | timer_->Start(MonoDelta::FromMilliseconds(period_ms_ * 5)); |
159 | | |
160 | | // One period later, the counter still hasn't incremented... |
161 | 2 | SleepFor(MonoDelta::FromMilliseconds(period_ms_)); |
162 | 2 | ASSERT_EQ(0, counter_); |
163 | | |
164 | | // ...but it will increment eventually. |
165 | 2 | ASSERT_EVENTUALLY([&](){ |
166 | 2 | ASSERT_GT(counter_, 0); |
167 | 2 | }); |
168 | 2 | } |
169 | | |
170 | 1 | TEST_F(PeriodicTimerTest, TestCallbackRestartsTimer) {  /* Checks that a callback which stops and restarts its own timer does not inflate the callback count. */ |
171 | 1 | const int64_t kPeriods = 10;  /* number of periods the timer is left running */ |
172 | | |
173 | | // Create a timer that restarts itself from within its functor. |
174 | 1 | PeriodicTimer::Options opts; |
175 | 1 | opts.jitter_pct = 0.0; // don't need jittering |
176 | 1 | shared_ptr<PeriodicTimer> timer = PeriodicTimer::Create( |
177 | 1 | messenger_.get(), |
178 | 9 | [&] { |
179 | 9 | timer->Stop(); |
180 | 9 | timer->Start(); |
181 | 9 | }, |
182 | 1 | MonoDelta::FromMilliseconds(period_ms_), |
183 | 1 | std::move(opts)); |
184 | | |
185 | | // Run the timer for a fixed amount of time. The functor captured the local 'timer' by reference; it is assigned before the Start() call below. |
186 | 1 | timer->Start(); |
187 | 1 | SleepFor(MonoDelta::FromMilliseconds(period_ms_ * kPeriods)); |
188 | 1 | timer->Stop();  /* NOTE(review): unlike TestCallbackRestartsOneShotTimer, the messenger is not shut down before the local timer goes out of scope; confirm a late callback cannot touch the by-reference capture */ |
189 | | |
190 | | // Although the timer is restarted by its functor, its overall period should |
191 | | // remain more or less the same (since the period expired just as the functor |
192 | | // ran). As such, we should see no more than three callbacks per period: |
193 | | // one to start scheduling the callback loop, one when it fires, and one more |
194 | | // after it has been replaced by a new callback loop. |
195 | 1 | ASSERT_LE(timer->NumCallbacksForTests(), kPeriods * 3); |
196 | 1 | } |
197 | | |
198 | | class JitteredOneShotPeriodicTimerTest : public JitteredPeriodicTimerTest {  /* Same fixture, but the timer is created with one_shot set. */ |
199 | | protected: |
200 | 2 | virtual PeriodicTimer::Options GetOptions() override {  /* jitter_pct from the test parameter, plus one_shot = true */ |
201 | 2 | PeriodicTimer::Options opts; |
202 | 2 | opts.jitter_pct = GetParam(); |
203 | 2 | opts.one_shot = true; |
204 | 2 | return opts; |
205 | 2 | } |
206 | | }; |
207 | | |
208 | | INSTANTIATE_TEST_CASE_P(AllJitterModes, |
209 | | JitteredOneShotPeriodicTimerTest, |
210 | | ::testing::Values(0.0, 0.25));  /* each TEST_P of this fixture runs with jitter_pct 0.0 and 0.25 */ |
211 | | |
212 | 2 | TEST_P(JitteredOneShotPeriodicTimerTest, TestBasics) {  /* Checks that each Start() of the one-shot timer produces exactly one callback. */ |
213 | | // Kick off the one-shot timer a few times. |
214 | 8 | for (int i = 0; i < 3; i++) { |
215 | 6 | ASSERT_EQ(i, counter_); |
216 | | |
217 | | // Eventually the task will run. |
218 | 6 | timer_->Start(); |
219 | 6 | ASSERT_EVENTUALLY([&](){ |
220 | 6 | ASSERT_EQ(i + 1, counter_); |
221 | 6 | }); |
222 | | |
223 | | // Even if we explicitly wait another two periods, the counter value |
224 | | // shouldn't change. |
225 | 6 | SleepFor(MonoDelta::FromMilliseconds(period_ms_ * 2)); |
226 | 6 | ASSERT_EQ(i + 1, counter_); |
227 | 6 | } |
228 | 2 | } |
229 | | |
230 | 1 | TEST_F(PeriodicTimerTest, TestCallbackRestartsOneShotTimer) {  /* Checks that a one-shot timer restarted from its own callback keeps firing. */ |
231 | 1 | atomic<int64_t> counter(0);  /* incremented once per callback */ |
232 | | |
233 | | // Create a timer that restarts itself from within its functor. |
234 | 1 | PeriodicTimer::Options opts; |
235 | 1 | opts.jitter_pct = 0.0; // don't need jittering |
236 | 1 | opts.one_shot = true; |
237 | 1 | shared_ptr<PeriodicTimer> timer = PeriodicTimer::Create( |
238 | 1 | messenger_.get(), |
239 | 5 | [&] { |
240 | 5 | counter++; |
241 | 5 | timer->Start(); |
242 | 5 | }, |
243 | 1 | MonoDelta::FromMilliseconds(period_ms_), |
244 | 1 | std::move(opts)); |
245 | | |
246 | | // Because the timer restarts itself every time the functor runs, we |
247 | | // should see the counter value increase with each period. |
248 | 1 | timer->Start(); |
249 | 1 | ASSERT_EVENTUALLY([&](){ |
250 | 1 | ASSERT_GE(counter, 5); |
251 | 1 | }); |
252 | | |
253 | | // Ensure that the reactor threads are fully quiesced (and thus no timer |
254 | | // callbacks are running) by the time 'counter' and 'timer' are destroyed. |
255 | 1 | messenger_->Shutdown(); |
256 | 1 | } |
257 | | |
258 | | } // namespace rpc |
259 | | } // namespace yb |