/Users/deen/code/yugabyte-db/src/yb/util/priority_thread_pool-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | #include <algorithm> |
14 | | #include <thread> |
15 | | |
16 | | #include <gtest/gtest.h> |
17 | | |
18 | | #include "yb/util/priority_thread_pool.h" |
19 | | #include "yb/util/random_util.h" |
20 | | #include "yb/util/scope_exit.h" |
21 | | #include "yb/util/test_macros.h" |
22 | | #include "yb/util/test_thread_holder.h" |
23 | | #include "yb/util/tostring.h" |
24 | | |
25 | | using namespace std::literals; |
26 | | |
27 | | namespace yb { |
28 | | |
29 | | // In our Jenkins environment test Macs switch threads rarely, so we have to use a higher step time. |
30 | | #if defined(__APPLE__) |
31 | | const auto kStepTime = 100ms; |
32 | | #else |
33 | | const auto kStepTime = 25ms; |
34 | | #endif |
35 | | const auto kWaitTime = kStepTime * 3; |
36 | | |
37 | | class Share { |
38 | | public: |
39 | 7.15k | bool Step(int index) { |
40 | 7.15k | running_.fetch_or(1ULL << index, std::memory_order_acq_rel); |
41 | 7.15k | return (stop_.load(std::memory_order_acquire) & (1ULL << index)) == 0; |
42 | 7.15k | } |
43 | | |
44 | 42 | void Stop(int index) { |
45 | 42 | stop_.fetch_or(1ULL << index, std::memory_order_acq_rel); |
46 | 42 | } |
47 | | |
48 | 4 | void StopAll() { |
49 | 4 | stop_.store(std::numeric_limits<uint64_t>::max(), std::memory_order_release); |
50 | 4 | } |
51 | | |
52 | 486 | void ResetRunning() { |
53 | 486 | running_.store(0, std::memory_order_release); |
54 | 486 | } |
55 | | |
56 | 486 | uint64_t running() { |
57 | 486 | return running_.load(std::memory_order_acquire); |
58 | 486 | } |
59 | | |
60 | | // Fill `out` with priorities of running tasks. |
61 | | // `divisor` is used to convert index to priority. |
62 | 486 | void FillRunningTaskPriorities(std::vector<int>* out, int divisor = 1) { |
63 | 486 | std::this_thread::sleep_for(kWaitTime); |
64 | 486 | ResetRunning(); |
65 | 486 | std::this_thread::sleep_for(kWaitTime); |
66 | 486 | auto running_mask = running(); |
67 | 486 | out->clear(); |
68 | 486 | int i = 0; |
69 | 6.84k | while (running_mask != 0) { |
70 | 6.35k | if (running_mask & 1) { |
71 | 1.21k | out->push_back(i / divisor); |
72 | 1.21k | } |
73 | 6.35k | running_mask >>= 1; |
74 | 6.35k | ++i; |
75 | 6.35k | } |
76 | 486 | } |
77 | | |
78 | | private: |
79 | | // ith bit is set to 1 when task i was reported as running after last reset. |
80 | | std::atomic<uint64_t> running_{0}; |
81 | | // ith bit is set to 1 when we should stop task i. |
82 | | std::atomic<uint64_t> stop_{0}; |
83 | | }; |
84 | | |
85 | | class Task : public PriorityThreadPoolTask { |
86 | | public: |
87 | 52 | Task(int index, Share* share) : index_(index), share_(share) {} |
88 | | |
89 | 52 | void Run(const Status& status, PriorityThreadPoolSuspender* suspender) override { |
90 | 52 | if (!status.ok()) { |
91 | 1 | return; |
92 | 1 | } |
93 | 7.17k | while (share_->Step(index_)) { |
94 | 7.12k | suspender->PauseIfNecessary(); |
95 | 7.12k | std::this_thread::sleep_for(kStepTime); |
96 | 7.12k | } |
97 | 51 | } |
98 | | |
99 | 0 | bool ShouldRemoveWithKey(void* key) override { |
100 | 0 | return false; |
101 | 0 | } |
102 | | |
103 | 80 | int Index() const { |
104 | 80 | return index_; |
105 | 80 | } |
106 | | |
107 | 32 | std::string ToString() const override { |
108 | 32 | return YB_CLASS_TO_STRING(index); |
109 | 32 | } |
110 | | |
111 | | private: |
112 | | const int index_; |
113 | | Share* const share_; |
114 | | }; |
115 | | |
116 | | // The test randomly submits tasks to the priority thread pool and stops them, |
117 | | // checking that only the expected tasks are running. |
118 | | // Task priority is calculated as index/divisor. |
119 | | // So with divisor == 1 we get unique priorities, and with divisor > 1 multiple tasks could have |
120 | | // the same priority. |
121 | 2 | void TestRandom(int divisor) { |
122 | 2 | const int kTasks = 20; |
123 | 2 | const int kMaxRunningTasks = 3; |
124 | 2 | PriorityThreadPool thread_pool(kMaxRunningTasks); |
125 | 2 | std::vector<std::unique_ptr<Task>> tasks; |
126 | 2 | tasks.reserve(kTasks); |
127 | 2 | Share share; |
128 | 42 | for (int i = 0; i != kTasks; ++i) { |
129 | 40 | tasks.emplace_back(std::make_unique<Task>(i, &share)); |
130 | 40 | } |
131 | 2 | std::shuffle(tasks.begin(), tasks.end(), ThreadLocalRandom()); |
132 | 2 | std::set<int> scheduled; |
133 | 2 | size_t schedule_idx = 0; |
134 | 2 | std::set<int> stopped; |
135 | 2 | std::vector<int> running_vector; |
136 | 2 | std::vector<int> expected_running; |
137 | | |
138 | 2 | auto se = ScopeExit([&share, &thread_pool] { |
139 | 2 | thread_pool.StartShutdown(); |
140 | 2 | share.StopAll(); |
141 | 2 | thread_pool.CompleteShutdown(); |
142 | 2 | }); |
143 | | |
144 | 477 | while (stopped.size() != kTasks) { |
145 | 475 | if (schedule_idx < kTasks && RandomUniformInt<size_t>(0, 2 + scheduled.size()) == 0) { |
146 | 40 | auto& task = tasks[schedule_idx]; |
147 | 40 | auto index = task->Index(); |
148 | 40 | scheduled.insert(index); |
149 | 40 | ++schedule_idx; |
150 | 40 | auto priority = task->Index() / divisor; |
151 | 40 | ASSERT_OK(thread_pool.Submit(priority, &task)); |
152 | 40 | ASSERT_TRUE(task == nullptr); |
153 | 40 | LOG(INFO) << "Submitted: " << index << ", scheduled: " << yb::ToString(scheduled); |
154 | 435 | } else if (!scheduled.empty() && |
155 | 431 | RandomUniformInt<size_t>(0, std::max<size_t>(0, 13 - scheduled.size())) == 0) { |
156 | 40 | auto it = scheduled.end(); |
157 | 40 | std::advance( |
158 | 40 | it, -RandomUniformInt<ssize_t>(1, std::min<ssize_t>(scheduled.size(), kMaxRunningTasks))); |
159 | 40 | auto idx = *it; |
160 | 40 | stopped.insert(idx); |
161 | 40 | share.Stop(idx); |
162 | 40 | scheduled.erase(it); |
163 | 40 | LOG(INFO) << "Stopped: " << idx << ", scheduled: " << yb::ToString(scheduled); |
164 | 40 | } |
165 | 475 | share.FillRunningTaskPriorities(&running_vector, divisor); |
166 | 475 | expected_running.clear(); |
167 | 475 | auto it = scheduled.end(); |
168 | 475 | auto left = kMaxRunningTasks; |
169 | 1.66k | while (it != scheduled.begin() && left > 0) { |
170 | 1.19k | --it; |
171 | 1.19k | expected_running.push_back(*it / divisor); |
172 | 1.19k | --left; |
173 | 1.19k | } |
174 | 475 | std::reverse(expected_running.begin(), expected_running.end()); |
175 | 950 | ASSERT_EQ(expected_running, running_vector) << "Scheduled: " << yb::ToString(scheduled) |
176 | 950 | << ", running: " << yb::ToString(running_vector) |
177 | 950 | << ", state: " << thread_pool.StateToString(); |
178 | 475 | } |
179 | 2 | } |
180 | | |
181 | 1 | TEST(PriorityThreadPoolTest, RandomUnique) { |
182 | 1 | TestRandom(1 /* divisor */); |
183 | 1 | } |
184 | | |
185 | 1 | TEST(PriorityThreadPoolTest, RandomNonUnique) { |
186 | 1 | TestRandom(3 /* divisor */); |
187 | 1 | } |
188 | | |
189 | | constexpr int kMaxRandomTaskTimeMs = 40; |
190 | | |
191 | | class RandomTask : public PriorityThreadPoolTask { |
192 | | public: |
193 | 20.9k | RandomTask() = default; |
194 | | |
195 | 20.9k | void Run(const Status& status, PriorityThreadPoolSuspender* suspender) override { |
196 | 20.9k | if (!status.ok()) { |
197 | 0 | return; |
198 | 0 | } |
199 | 20.9k | std::this_thread::sleep_for(1ms * RandomUniformInt(1, kMaxRandomTaskTimeMs)); |
200 | 20.9k | } |
201 | | |
202 | | // Returns true if the task belongs to specified key, which was passed to |
203 | | // PriorityThreadPool::Remove and should be removed. |
204 | 0 | bool ShouldRemoveWithKey(void* key) override { |
205 | 0 | return false; |
206 | 0 | } |
207 | | |
208 | 0 | std::string ToString() const override { |
209 | 0 | return "RandomTask"; |
210 | 0 | } |
211 | | }; |
212 | | |
213 | 1 | TEST(PriorityThreadPoolTest, RandomTasks) { |
214 | 1 | constexpr int kMaxRunningTasks = 10; |
215 | 1 | PriorityThreadPool thread_pool(kMaxRunningTasks); |
216 | 1 | TestThreadHolder holder; |
217 | 11 | for (int i = 0; i != kMaxRunningTasks; ++i) { |
218 | 10 | holder.AddThread([&stop = holder.stop_flag(), &thread_pool] { |
219 | 21.0k | while (!stop.load()) { |
220 | 20.9k | auto priority = RandomUniformInt(0, 4); |
221 | 20.9k | auto temp_task = std::make_unique<RandomTask>(); |
222 | 20.9k | ASSERT_OK(thread_pool.Submit(priority, &temp_task)); |
223 | 20.9k | ASSERT_TRUE(temp_task == nullptr); |
224 | | // Submit tasks slightly slower than they complete, |
225 | | // so that we frequently hit the case of an empty thread pool. |
226 | 21.0k | std::this_thread::sleep_for(1ms * RandomUniformInt(1, kMaxRandomTaskTimeMs * 5 / 4)); |
227 | 21.0k | } |
228 | 10 | }); |
229 | 10 | } |
230 | 1 | holder.WaitAndStop(60s); |
231 | 1 | thread_pool.Shutdown(); |
232 | 1 | } |
233 | | |
234 | | namespace { |
235 | | |
236 | 12 | size_t SubmitTask(int index, Share* share, PriorityThreadPool* thread_pool) { |
237 | 12 | auto task = std::make_unique<Task>(index, share); |
238 | 12 | size_t serial_no = task->SerialNo(); |
239 | 12 | EXPECT_OK(thread_pool->Submit(index /* priority */, &task)); |
240 | 12 | EXPECT_TRUE(task == nullptr); |
241 | 12 | LOG(INFO) << "Started " << index << ", serial no: " << serial_no; |
242 | 12 | return serial_no; |
243 | 12 | } |
244 | | |
245 | | } // namespace |
246 | | |
247 | 1 | TEST(PriorityThreadPoolTest, ChangePriority) { |
248 | 1 | const int kMaxRunningTasks = 3; |
249 | 1 | PriorityThreadPool thread_pool(kMaxRunningTasks); |
250 | 1 | Share share; |
251 | 1 | std::vector<int> running; |
252 | | |
253 | 1 | auto se = ScopeExit([&share, &thread_pool] { |
254 | 1 | thread_pool.StartShutdown(); |
255 | 1 | share.StopAll(); |
256 | 1 | thread_pool.CompleteShutdown(); |
257 | 1 | }); |
258 | | |
259 | 1 | auto task5 = SubmitTask(5, &share, &thread_pool); |
260 | 1 | SubmitTask(6, &share, &thread_pool); |
261 | 1 | SubmitTask(7, &share, &thread_pool); |
262 | 1 | auto task1 = SubmitTask(1, &share, &thread_pool); |
263 | 1 | auto task2 = SubmitTask(2, &share, &thread_pool); |
264 | 1 | SubmitTask(3, &share, &thread_pool); |
265 | | |
266 | 1 | share.FillRunningTaskPriorities(&running); |
267 | 1 | ASSERT_EQ(running, std::vector<int>({5, 6, 7})); |
268 | | |
269 | | // Check that we could pause running task when priority increased. |
270 | 1 | ASSERT_TRUE(thread_pool.ChangeTaskPriority(task1, 8)); |
271 | 1 | share.FillRunningTaskPriorities(&running); |
272 | 1 | ASSERT_EQ(running, std::vector<int>({1, 6, 7})); |
273 | | |
274 | | // Check that priority of queued task could be updated. |
275 | 1 | ASSERT_TRUE(thread_pool.ChangeTaskPriority(task2, 4)); |
276 | 1 | share.Stop(1); |
277 | 1 | share.Stop(7); |
278 | | |
279 | 1 | share.FillRunningTaskPriorities(&running); |
280 | 1 | ASSERT_EQ(running, std::vector<int>({2, 5, 6})); |
281 | | |
282 | | // Check decrease of priority. |
283 | 1 | ASSERT_TRUE(thread_pool.ChangeTaskPriority(task5, 1)); |
284 | 1 | share.FillRunningTaskPriorities(&running); |
285 | 1 | ASSERT_EQ(running, std::vector<int>({2, 3, 6})); |
286 | | |
287 | | // Check same priority. |
288 | 1 | ASSERT_TRUE(thread_pool.ChangeTaskPriority(task5, 6)); |
289 | 1 | share.FillRunningTaskPriorities(&running); |
290 | 1 | ASSERT_EQ(running, std::vector<int>({2, 5, 6})); |
291 | 1 | } |
292 | | |
293 | 1 | TEST(PriorityThreadPoolTest, FailureToCreateThread) { |
294 | 1 | const int kMaxRunningTasks = 3; |
295 | 1 | PriorityThreadPool thread_pool(kMaxRunningTasks); |
296 | 1 | Share share; |
297 | 1 | std::vector<int> running; |
298 | | |
299 | 1 | auto se = ScopeExit([&share, &thread_pool] { |
300 | 1 | LOG(INFO) << "Final state of the pool: " << thread_pool.StateToString(); |
301 | 1 | thread_pool.StartShutdown(); |
302 | 1 | share.StopAll(); |
303 | 1 | thread_pool.CompleteShutdown(); |
304 | 1 | }); |
305 | | |
306 | 1 | SubmitTask(5, &share, &thread_pool); |
307 | 1 | SubmitTask(6, &share, &thread_pool); |
308 | | |
309 | 1 | share.FillRunningTaskPriorities(&running); |
310 | 1 | ASSERT_EQ(running, std::vector<int>({5, 6})); |
311 | | |
312 | 1 | LOG(INFO) << "DISABLING CREATION OF NEW THREADS"; |
313 | | |
314 | | // We fail to launch a new thread and just add the task to the list of waiting tasks. |
315 | 1 | thread_pool.TEST_SetThreadCreationFailureProbability(1); |
316 | | |
317 | | // We cannot launch new threads now so we just add tasks as "not started" and the same two tasks |
318 | | // keep running. |
319 | | // |
320 | | // Prior to the #8348 fix, these failures also incorrectly increased the paused_workers_ counter |
321 | | // each time, which then caused an underflow in PickWorker, and we would not be able to start |
322 | | // any new tasks at all. |
323 | 1 | SubmitTask(7, &share, &thread_pool); |
324 | 1 | share.FillRunningTaskPriorities(&running); |
325 | 1 | ASSERT_EQ(running, std::vector<int>({5, 6})); |
326 | | |
327 | 1 | SubmitTask(8, &share, &thread_pool); |
328 | 1 | share.FillRunningTaskPriorities(&running); |
329 | 1 | ASSERT_EQ(running, std::vector<int>({5, 6})); |
330 | | |
331 | 1 | SubmitTask(9, &share, &thread_pool); |
332 | 1 | share.FillRunningTaskPriorities(&running); |
333 | 1 | ASSERT_EQ(running, std::vector<int>({5, 6})); |
334 | | |
335 | 1 | LOG(INFO) << "ALLOWING CREATION OF NEW THREADS"; |
336 | | |
337 | | // Now allow adding new threads and launch more tasks. Tasks 5 and 6 should immediately get |
338 | | // paused and replaced with 8 and 9, but because we pause and launch exactly one task, 7 does |
339 | | // not actually get started, even though we have enough threads and enough quota for it. |
340 | | // We might consider changing this in the future. |
341 | 1 | thread_pool.TEST_SetThreadCreationFailureProbability(0); |
342 | 1 | std::this_thread::sleep_for(kWaitTime); |
343 | | |
344 | 1 | share.FillRunningTaskPriorities(&running); |
345 | 1 | ASSERT_EQ(running, std::vector<int>({8, 9})); |
346 | | |
347 | 1 | SubmitTask(10, &share, &thread_pool); |
348 | 1 | share.FillRunningTaskPriorities(&running); |
349 | 1 | ASSERT_EQ(running, std::vector<int>({8, 9, 10})); |
350 | 1 | } |
351 | | |
352 | | } // namespace yb |