/Users/deen/code/yugabyte-db/src/yb/util/threadpool-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <atomic> |
34 | | #include <functional> |
35 | | #include <limits> |
36 | | #include <memory> |
37 | | #include <mutex> |
38 | | #include <string> |
39 | | #include <thread> |
40 | | #include <vector> |
41 | | |
42 | | #include <glog/logging.h> |
43 | | #include <gtest/gtest.h> |
44 | | |
45 | | #include "yb/gutil/atomicops.h" |
46 | | #include "yb/gutil/bind.h" |
47 | | #include "yb/gutil/sysinfo.h" |
48 | | |
49 | | #include "yb/util/barrier.h" |
50 | | #include "yb/util/countdown_latch.h" |
51 | | #include "yb/util/locks.h" |
52 | | #include "yb/util/metrics.h" |
53 | | #include "yb/util/promise.h" |
54 | | #include "yb/util/random.h" |
55 | | #include "yb/util/scope_exit.h" |
56 | | #include "yb/util/test_macros.h" |
57 | | #include "yb/util/test_util.h" |
58 | | #include "yb/util/threadpool.h" |
59 | | #include "yb/util/trace.h" |
60 | | |
61 | | using std::atomic; |
62 | | using std::shared_ptr; |
63 | | using std::string; |
64 | | using std::thread; |
65 | | using std::unique_ptr; |
66 | | using std::vector; |
67 | | |
68 | | using strings::Substitute; |
69 | | DECLARE_bool(enable_tracing); |
70 | | |
71 | | using std::shared_ptr; |
72 | | |
73 | | namespace yb { |
74 | | |
75 | | namespace { |
76 | | |
77 | | static Status BuildMinMaxTestPool( |
78 | 4 | int min_threads, int max_threads, std::unique_ptr<ThreadPool>* pool) { |
79 | 4 | return ThreadPoolBuilder("test").set_min_threads(min_threads) |
80 | 4 | .set_max_threads(max_threads) |
81 | 4 | .Build(pool); |
82 | 4 | } |
83 | | |
84 | | } // anonymous namespace |
85 | | |
86 | | class TestThreadPool : public ::testing::Test { |
87 | | public: |
88 | 27 | TestThreadPool() { |
89 | 27 | FLAGS_enable_tracing = true; |
90 | 27 | } |
91 | | }; |
92 | | |
93 | 1 | TEST_F(TestThreadPool, TestNoTaskOpenClose) { |
94 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
95 | 1 | ASSERT_OK(BuildMinMaxTestPool(4, 4, &thread_pool)); |
96 | 1 | thread_pool->Shutdown(); |
97 | 1 | } |
98 | | |
99 | 5 | static void SimpleTaskMethod(int n, Atomic32 *counter) { |
100 | 188 | while (n--) { |
101 | 183 | base::subtle::NoBarrier_AtomicIncrement(counter, 1); |
102 | 183 | boost::detail::yield(n); |
103 | 183 | } |
104 | 5 | } |
105 | | |
106 | | class SimpleTask : public Runnable { |
107 | | public: |
108 | | SimpleTask(int n, Atomic32 *counter) |
109 | 1 | : n_(n), counter_(counter) { |
110 | 1 | } |
111 | | |
112 | 2 | void Run() override { |
113 | 2 | SimpleTaskMethod(n_, counter_); |
114 | 2 | } |
115 | | |
116 | | private: |
117 | | int n_; |
118 | | Atomic32 *counter_; |
119 | | }; |
120 | | |
121 | 1 | TEST_F(TestThreadPool, TestSimpleTasks) { |
122 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
123 | 1 | ASSERT_OK(BuildMinMaxTestPool(4, 4, &thread_pool)); |
124 | | |
125 | 1 | Atomic32 counter(0); |
126 | 1 | std::shared_ptr<Runnable> task(new SimpleTask(15, &counter)); |
127 | | |
128 | 1 | ASSERT_OK(thread_pool->SubmitFunc(std::bind(&SimpleTaskMethod, 10, &counter))); |
129 | 1 | ASSERT_OK(thread_pool->Submit(task)); |
130 | 1 | ASSERT_OK(thread_pool->SubmitFunc(std::bind(&SimpleTaskMethod, 20, &counter))); |
131 | 1 | ASSERT_OK(thread_pool->Submit(task)); |
132 | 1 | ASSERT_OK(thread_pool->SubmitClosure(Bind(&SimpleTaskMethod, 123, &counter))); |
133 | 1 | thread_pool->Wait(); |
134 | 1 | ASSERT_EQ(10 + 15 + 20 + 15 + 123, base::subtle::NoBarrier_Load(&counter)); |
135 | 1 | thread_pool->Shutdown(); |
136 | 1 | } |
137 | | |
// Emits a single trace line; used by the tests below to verify that the
// submitting thread's Trace is adopted by pool worker threads.
static void IssueTraceStatement() {
  TRACE("hello from task");
}
141 | | |
142 | | // Test that the thread-local trace is propagated to tasks |
143 | | // submitted to the threadpool. |
144 | 1 | TEST_F(TestThreadPool, TestTracePropagation) { |
145 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
146 | 1 | ASSERT_OK(BuildMinMaxTestPool(1, 1, &thread_pool)); |
147 | | |
148 | 1 | scoped_refptr<Trace> t(new Trace); |
149 | 1 | { |
150 | 1 | ADOPT_TRACE(t.get()); |
151 | 1 | ASSERT_OK(thread_pool->SubmitFunc(&IssueTraceStatement)); |
152 | 1 | } |
153 | 1 | thread_pool->Wait(); |
154 | 1 | ASSERT_STR_CONTAINS(t->DumpToString(true), "hello from task"); |
155 | 1 | } |
156 | | |
157 | 1 | TEST_F(TestThreadPool, TestSubmitAfterShutdown) { |
158 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
159 | 1 | ASSERT_OK(BuildMinMaxTestPool(1, 1, &thread_pool)); |
160 | 1 | thread_pool->Shutdown(); |
161 | 1 | Status s = thread_pool->SubmitFunc(&IssueTraceStatement); |
162 | 1 | ASSERT_EQ("Service unavailable: The pool has been shut down.", |
163 | 1 | s.ToString(/* no file/line */ false)); |
164 | 1 | } |
165 | | |
166 | | class SlowTask : public Runnable { |
167 | | public: |
168 | | explicit SlowTask(CountDownLatch* latch) |
169 | 78 | : latch_(latch) { |
170 | 78 | } |
171 | | |
172 | 68 | void Run() override { |
173 | 68 | latch_->Wait(); |
174 | 68 | } |
175 | | |
176 | 78 | static shared_ptr<Runnable> NewSlowTask(CountDownLatch* latch) { |
177 | 78 | return std::make_shared<SlowTask>(latch); |
178 | 78 | } |
179 | | private: |
180 | | CountDownLatch* latch_; |
181 | | }; |
182 | | |
183 | 1 | TEST_F(TestThreadPool, TestThreadPoolWithNoMinimum) { |
184 | 1 | MonoDelta idle_timeout = MonoDelta::FromMilliseconds(1); |
185 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
186 | 1 | ASSERT_OK(ThreadPoolBuilder("test") |
187 | 1 | .set_min_threads(0).set_max_threads(3) |
188 | 1 | .set_idle_timeout(idle_timeout).Build(&thread_pool)); |
189 | | // There are no threads to start with. |
190 | 1 | ASSERT_EQ(0, thread_pool->num_threads_); |
191 | | // We get up to 3 threads when submitting work. |
192 | 1 | CountDownLatch latch(1); |
193 | 1 | auto se = ScopeExit([&latch] { |
194 | 1 | latch.CountDown(); |
195 | 1 | }); |
196 | 1 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
197 | 1 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
198 | 1 | ASSERT_EQ(2, thread_pool->num_threads_); |
199 | 1 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
200 | 1 | ASSERT_EQ(3, thread_pool->num_threads_); |
201 | | // The 4th piece of work gets queued. |
202 | 1 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
203 | 1 | ASSERT_EQ(3, thread_pool->num_threads_); |
204 | | // Finish all work |
205 | 1 | latch.CountDown(); |
206 | 1 | thread_pool->Wait(); |
207 | 1 | ASSERT_EQ(0, thread_pool->active_threads_); |
208 | 1 | thread_pool->Shutdown(); |
209 | 1 | ASSERT_EQ(0, thread_pool->num_threads_); |
210 | 1 | } |
211 | | |
212 | 1 | TEST_F(TestThreadPool, TestThreadPoolWithNoMaxThreads) { |
213 | | // By default a threadpool's max_threads is set to the number of CPUs, so |
214 | | // this test submits more tasks than that to ensure that the number of CPUs |
215 | | // isn't some kind of upper bound. |
216 | 1 | const int kNumCPUs = base::NumCPUs(); |
217 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
218 | | |
219 | | // Build a threadpool with no limit on the maximum number of threads. |
220 | 1 | ASSERT_OK(ThreadPoolBuilder("test") |
221 | 1 | .set_max_threads(std::numeric_limits<int>::max()) |
222 | 1 | .Build(&thread_pool)); |
223 | 1 | CountDownLatch latch(1); |
224 | 1 | auto se = ScopeExit([&latch] { |
225 | 1 | latch.CountDown(); |
226 | 1 | }); |
227 | | |
228 | | // Submit tokenless tasks. Each should create a new thread. |
229 | 21 | for (int i = 0; i < kNumCPUs * 2; i++) { |
230 | 20 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
231 | 20 | } |
232 | 1 | ASSERT_EQ(kNumCPUs * 2, thread_pool->num_threads_); |
233 | | |
234 | | // Submit tasks on two tokens. Only two threads should be created. |
235 | 1 | unique_ptr<ThreadPoolToken> t1 = thread_pool->NewToken(ThreadPool::ExecutionMode::SERIAL); |
236 | 1 | unique_ptr<ThreadPoolToken> t2 = thread_pool->NewToken(ThreadPool::ExecutionMode::SERIAL); |
237 | 21 | for (int i = 0; i < kNumCPUs * 2; i++) { |
238 | 10 | ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get(); |
239 | 20 | ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch))); |
240 | 20 | } |
241 | 1 | ASSERT_EQ((kNumCPUs * 2) + 2, thread_pool->num_threads_); |
242 | | |
243 | | // Submit more tokenless tasks. Each should create a new thread. |
244 | 11 | for (int i = 0; i < kNumCPUs; i++) { |
245 | 10 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
246 | 10 | } |
247 | 1 | ASSERT_EQ((kNumCPUs * 3) + 2, thread_pool->num_threads_); |
248 | | |
249 | 1 | latch.CountDown(); |
250 | 1 | thread_pool->Wait(); |
251 | 1 | thread_pool->Shutdown(); |
252 | 1 | } |
253 | | |
254 | | // Regression test for a bug where a task is submitted exactly |
255 | | // as a thread is about to exit. Previously this could hang forever. |
256 | 1 | TEST_F(TestThreadPool, TestRace) { |
257 | 1 | alarm(10); |
258 | 1 | MonoDelta idle_timeout = MonoDelta::FromMicroseconds(1); |
259 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
260 | 1 | ASSERT_OK(ThreadPoolBuilder("test") |
261 | 1 | .set_min_threads(0).set_max_threads(1) |
262 | 1 | .set_idle_timeout(idle_timeout).Build(&thread_pool)); |
263 | | |
264 | 501 | for (int i = 0; i < 500; i++) { |
265 | 500 | CountDownLatch l(1); |
266 | 500 | ASSERT_OK(thread_pool->SubmitFunc([&l]() { l.CountDown(); })); |
267 | 500 | l.Wait(); |
268 | | // Sleeping a different amount in each iteration makes it more likely to hit |
269 | | // the bug. |
270 | 500 | SleepFor(MonoDelta::FromMicroseconds(i)); |
271 | 500 | } |
272 | 1 | } |
273 | | |
274 | 1 | TEST_F(TestThreadPool, TestVariableSizeThreadPool) { |
275 | 1 | MonoDelta idle_timeout = MonoDelta::FromMilliseconds(1); |
276 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
277 | 1 | ASSERT_OK(ThreadPoolBuilder("test") |
278 | 1 | .set_min_threads(1).set_max_threads(4).set_max_queue_size(1) |
279 | 1 | .set_idle_timeout(idle_timeout).Build(&thread_pool)); |
280 | | // There is 1 thread to start with. |
281 | 1 | ASSERT_EQ(1, thread_pool->num_threads_); |
282 | | // We get up to 4 threads when submitting work. |
283 | 1 | CountDownLatch latch(1); |
284 | 1 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
285 | 1 | ASSERT_EQ(1, thread_pool->num_threads_); |
286 | 1 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
287 | 1 | ASSERT_EQ(2, thread_pool->num_threads_); |
288 | 1 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
289 | 1 | ASSERT_EQ(3, thread_pool->num_threads_); |
290 | 1 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
291 | 1 | ASSERT_EQ(4, thread_pool->num_threads_); |
292 | | // The 5th piece of work gets queued. |
293 | 1 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
294 | 1 | ASSERT_EQ(4, thread_pool->num_threads_); |
295 | | // The 6th piece of work gets rejected. |
296 | 1 | ASSERT_NOK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
297 | 1 | ASSERT_EQ(4, thread_pool->num_threads_); |
298 | | // Finish all work |
299 | 1 | latch.CountDown(); |
300 | 1 | thread_pool->Wait(); |
301 | 1 | ASSERT_EQ(0, thread_pool->active_threads_); |
302 | 1 | thread_pool->Shutdown(); |
303 | 1 | ASSERT_EQ(0, thread_pool->num_threads_); |
304 | 1 | } |
305 | | |
306 | 1 | TEST_F(TestThreadPool, TestMaxQueueSize) { |
307 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
308 | 1 | ASSERT_OK(ThreadPoolBuilder("test") |
309 | 1 | .set_min_threads(1).set_max_threads(1) |
310 | 1 | .set_max_queue_size(1).Build(&thread_pool)); |
311 | | |
312 | 1 | CountDownLatch latch(1); |
313 | 1 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
314 | 1 | Status s = thread_pool->Submit(SlowTask::NewSlowTask(&latch)); |
315 | | // We race against the worker thread to re-enqueue. |
316 | | // If we get there first, we fail on the 2nd Submit(). |
317 | | // If the worker dequeues first, we fail on the 3rd. |
318 | 1 | if (s.ok()) { |
319 | 1 | s = thread_pool->Submit(SlowTask::NewSlowTask(&latch)); |
320 | 1 | } |
321 | 0 | CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString(); |
322 | 1 | latch.CountDown(); |
323 | 1 | thread_pool->Wait(); |
324 | 1 | thread_pool->Shutdown(); |
325 | 1 | } |
326 | | |
327 | 3 | void TestQueueSizeZero(int max_threads) { |
328 | 3 | std::unique_ptr<ThreadPool> thread_pool; |
329 | 3 | ASSERT_OK(ThreadPoolBuilder("test") |
330 | 3 | .set_min_threads(0).set_max_threads(max_threads) |
331 | 3 | .set_max_queue_size(0).Build(&thread_pool)); |
332 | | |
333 | 3 | CountDownLatch latch(1); |
334 | 9 | for (int i = 0; i < max_threads; i++) { |
335 | 6 | ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch))); |
336 | 6 | } |
337 | 3 | Status s = thread_pool->Submit(SlowTask::NewSlowTask(&latch)); |
338 | 6 | ASSERT_TRUE(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString(); |
339 | 3 | latch.CountDown(); |
340 | 3 | thread_pool->Wait(); |
341 | 3 | thread_pool->Shutdown(); |
342 | 3 | } |
343 | | |
344 | 1 | TEST_F(TestThreadPool, TestMaxQueueZero) { |
345 | 1 | TestQueueSizeZero(1); |
346 | 1 | TestQueueSizeZero(5); |
347 | 1 | } |
348 | | |
349 | | |
350 | 1 | TEST_F(TestThreadPool, TestMaxQueueZeroNoThreads) { |
351 | 1 | TestQueueSizeZero(0); |
352 | 1 | } |
353 | | |
354 | | // Test that setting a promise from another thread yields |
355 | | // a value on the current thread. |
356 | 1 | TEST_F(TestThreadPool, TestPromises) { |
357 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
358 | 1 | ASSERT_OK(ThreadPoolBuilder("test") |
359 | 1 | .set_min_threads(1).set_max_threads(1) |
360 | 1 | .set_max_queue_size(1).Build(&thread_pool)); |
361 | | |
362 | 1 | Promise<int> my_promise; |
363 | 1 | ASSERT_OK(thread_pool->SubmitClosure( |
364 | 1 | Bind(&Promise<int>::Set, Unretained(&my_promise), 5))); |
365 | 1 | ASSERT_EQ(5, my_promise.Get()); |
366 | 1 | thread_pool->Shutdown(); |
367 | 1 | } |

// Metric definitions used by TestMetrics below: one test entity plus one
// histogram each for queue length, time spent queued, and task run time.
METRIC_DEFINE_entity(test_entity);
METRIC_DEFINE_coarse_histogram(test_entity, queue_length, "queue length",
                               MetricUnit::kTasks, "queue length");

METRIC_DEFINE_coarse_histogram(test_entity, queue_time, "queue time",
                               MetricUnit::kMicroseconds, "queue time");

METRIC_DEFINE_coarse_histogram(test_entity, run_time, "run time",
                               MetricUnit::kMicroseconds, "run time");
379 | | |
380 | 1 | TEST_F(TestThreadPool, TestMetrics) { |
381 | 1 | MetricRegistry registry; |
382 | 1 | vector<ThreadPoolMetrics> all_metrics; |
383 | 4 | for (int i = 0; i < 3; i++) { |
384 | 3 | scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate( |
385 | 3 | ®istry, Substitute("test $0", i)); |
386 | 3 | all_metrics.emplace_back(ThreadPoolMetrics{ |
387 | 3 | METRIC_queue_length.Instantiate(entity), |
388 | 3 | METRIC_queue_time.Instantiate(entity), |
389 | 3 | METRIC_run_time.Instantiate(entity) |
390 | 3 | }); |
391 | 3 | } |
392 | | |
393 | | // Enable metrics for the thread pool. |
394 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
395 | 1 | ASSERT_OK(ThreadPoolBuilder("test") |
396 | 1 | .set_min_threads(1).set_max_threads(1) |
397 | 1 | .set_metrics(all_metrics[0]) |
398 | 1 | .Build(&thread_pool)); |
399 | | |
400 | 1 | unique_ptr<ThreadPoolToken> t1 = thread_pool->NewTokenWithMetrics( |
401 | 1 | ThreadPool::ExecutionMode::SERIAL, all_metrics[1]); |
402 | 1 | unique_ptr<ThreadPoolToken> t2 = thread_pool->NewTokenWithMetrics( |
403 | 1 | ThreadPool::ExecutionMode::SERIAL, all_metrics[2]); |
404 | | |
405 | | // Submit once to t1, twice to t2, and three times without a token. |
406 | 1 | ASSERT_OK(t1->SubmitFunc([](){})); |
407 | 1 | ASSERT_OK(t2->SubmitFunc([](){})); |
408 | 1 | ASSERT_OK(t2->SubmitFunc([](){})); |
409 | 1 | ASSERT_OK(thread_pool->SubmitFunc([](){})); |
410 | 1 | ASSERT_OK(thread_pool->SubmitFunc([](){})); |
411 | 1 | ASSERT_OK(thread_pool->SubmitFunc([](){})); |
412 | 1 | thread_pool->Wait(); |
413 | | |
414 | | // The total counts should reflect the number of submissions to each token. |
415 | 1 | ASSERT_EQ(1, all_metrics[1].queue_length_histogram->TotalCount()); |
416 | 1 | ASSERT_EQ(1, all_metrics[1].queue_time_us_histogram->TotalCount()); |
417 | 1 | ASSERT_EQ(1, all_metrics[1].run_time_us_histogram->TotalCount()); |
418 | 1 | ASSERT_EQ(2, all_metrics[2].queue_length_histogram->TotalCount()); |
419 | 1 | ASSERT_EQ(2, all_metrics[2].queue_time_us_histogram->TotalCount()); |
420 | 1 | ASSERT_EQ(2, all_metrics[2].run_time_us_histogram->TotalCount()); |
421 | | |
422 | | // And the counts on the pool-wide metrics should reflect all submissions. |
423 | 1 | ASSERT_EQ(6, all_metrics[0].queue_length_histogram->TotalCount()); |
424 | 1 | ASSERT_EQ(6, all_metrics[0].queue_time_us_histogram->TotalCount()); |
425 | 1 | ASSERT_EQ(6, all_metrics[0].run_time_us_histogram->TotalCount()); |
426 | 1 | } |
427 | | |
// For test cases that should run with both kinds of tokens.
class TestThreadPoolTokenTypes : public TestThreadPool,
                                 public testing::WithParamInterface<ThreadPool::ExecutionMode> {};

// Instantiate every TEST_P below once per execution mode.
INSTANTIATE_TEST_CASE_P(Tokens, TestThreadPoolTokenTypes,
                        ::testing::Values(ThreadPool::ExecutionMode::SERIAL,
                                          ThreadPool::ExecutionMode::CONCURRENT));
435 | | |
436 | | |
437 | 2 | TEST_P(TestThreadPoolTokenTypes, TestTokenSubmitAndWait) { |
438 | 2 | std::unique_ptr<ThreadPool> thread_pool; |
439 | 2 | ASSERT_OK(ThreadPoolBuilder("test") |
440 | 2 | .Build(&thread_pool)); |
441 | 2 | unique_ptr<ThreadPoolToken> t = thread_pool->NewToken(GetParam()); |
442 | 2 | int i = 0; |
443 | 2 | ASSERT_OK(t->SubmitFunc([&]() { |
444 | 2 | SleepFor(MonoDelta::FromMilliseconds(1)); |
445 | 2 | i++; |
446 | 2 | })); |
447 | 2 | t->Wait(); |
448 | 2 | ASSERT_EQ(1, i); |
449 | 2 | } |
450 | | |
451 | 1 | TEST_F(TestThreadPool, TestTokenSubmitsProcessedSerially) { |
452 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
453 | 1 | ASSERT_OK(ThreadPoolBuilder("test") |
454 | 1 | .Build(&thread_pool)); |
455 | 1 | unique_ptr<ThreadPoolToken> t = thread_pool->NewToken(ThreadPool::ExecutionMode::SERIAL); |
456 | 1 | Random r(SeedRandom()); |
457 | 1 | string result; |
458 | 6 | for (char c = 'a'; c < 'f'; c++) { |
459 | | // Sleep a little first so that there's a higher chance of out-of-order |
460 | | // appends if the submissions did execute in parallel. |
461 | 5 | int sleep_ms = r.Next() % 5; |
462 | 5 | ASSERT_OK(t->SubmitFunc([&result, c, sleep_ms]() { |
463 | 5 | SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); |
464 | 5 | result += c; |
465 | 5 | })); |
466 | 5 | } |
467 | 1 | t->Wait(); |
468 | 1 | ASSERT_EQ("abcde", result); |
469 | 1 | } |
470 | | |
471 | 2 | TEST_P(TestThreadPoolTokenTypes, TestTokenSubmitsProcessedConcurrently) { |
472 | 2 | const int kNumTokens = 5; |
473 | 2 | std::unique_ptr<ThreadPool> thread_pool; |
474 | 2 | ASSERT_OK(ThreadPoolBuilder("test") |
475 | 2 | .set_max_threads(kNumTokens) |
476 | 2 | .Build(&thread_pool)); |
477 | 2 | vector<unique_ptr<ThreadPoolToken>> tokens; |
478 | | |
479 | | // A violation to the tested invariant would yield a deadlock, so let's set |
480 | | // up an alarm to bail us out. |
481 | 2 | alarm(60); |
482 | 2 | auto se = ScopeExit([] { |
483 | 2 | alarm(0); // Disable alarm on test exit. |
484 | 2 | }); |
485 | 2 | shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumTokens + 1); |
486 | 12 | for (int i = 0; i < kNumTokens; i++) { |
487 | 10 | tokens.emplace_back(thread_pool->NewToken(GetParam())); |
488 | 10 | ASSERT_OK(tokens.back()->SubmitFunc([b]() { |
489 | 10 | b->Wait(); |
490 | 10 | })); |
491 | 10 | } |
492 | | |
493 | | // This will deadlock if the above tasks weren't all running concurrently. |
494 | 2 | b->Wait(); |
495 | 2 | } |
496 | | |
497 | 1 | TEST_F(TestThreadPool, TestTokenSubmitsNonSequential) { |
498 | 1 | const int kNumSubmissions = 5; |
499 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
500 | 1 | ASSERT_OK(ThreadPoolBuilder("test") |
501 | 1 | .set_max_threads(kNumSubmissions) |
502 | 1 | .Build(&thread_pool)); |
503 | | |
504 | | // A violation to the tested invariant would yield a deadlock, so let's set |
505 | | // up an alarm to bail us out. |
506 | 1 | alarm(60); |
507 | 1 | auto se = ScopeExit([] { |
508 | 1 | alarm(0); // Disable alarm on test exit. |
509 | 1 | }); |
510 | 1 | shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumSubmissions + 1); |
511 | 1 | unique_ptr<ThreadPoolToken> t = thread_pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT); |
512 | 6 | for (int i = 0; i < kNumSubmissions; i++) { |
513 | 5 | ASSERT_OK(t->SubmitFunc([b]() { |
514 | 5 | b->Wait(); |
515 | 5 | })); |
516 | 5 | } |
517 | | |
518 | | // This will deadlock if the above tasks weren't all running concurrently. |
519 | 1 | b->Wait(); |
520 | 1 | } |
521 | | |
522 | 2 | TEST_P(TestThreadPoolTokenTypes, TestTokenShutdown) { |
523 | 2 | std::unique_ptr<ThreadPool> thread_pool; |
524 | 2 | ASSERT_OK(ThreadPoolBuilder("test") |
525 | 2 | .set_max_threads(4) |
526 | 2 | .Build(&thread_pool)); |
527 | | |
528 | 2 | unique_ptr<ThreadPoolToken> t1(thread_pool->NewToken(GetParam())); |
529 | 2 | unique_ptr<ThreadPoolToken> t2(thread_pool->NewToken(GetParam())); |
530 | 2 | CountDownLatch l1(1); |
531 | 2 | CountDownLatch l2(1); |
532 | | |
533 | | // A violation to the tested invariant would yield a deadlock, so let's set |
534 | | // up an alarm to bail us out. |
535 | 2 | alarm(60); |
536 | 2 | auto se = ScopeExit([] { |
537 | 2 | alarm(0); // Disable alarm on test exit. |
538 | 2 | }); |
539 | | |
540 | 8 | for (int i = 0; i < 3; i++) { |
541 | 6 | ASSERT_OK(t1->SubmitFunc([&]() { |
542 | 6 | l1.Wait(); |
543 | 6 | })); |
544 | 6 | } |
545 | 8 | for (int i = 0; i < 3; i++) { |
546 | 6 | ASSERT_OK(t2->SubmitFunc([&]() { |
547 | 6 | l2.Wait(); |
548 | 6 | })); |
549 | 6 | } |
550 | | |
551 | | // Unblock all of t1's tasks, but not t2's tasks. |
552 | 2 | l1.CountDown(); |
553 | | |
554 | | // If this also waited for t2's tasks, it would deadlock. |
555 | 2 | t1->Shutdown(); |
556 | | |
557 | | // We can no longer submit to t1 but we can still submit to t2. |
558 | 2 | ASSERT_TRUE(t1->SubmitFunc([](){}).IsServiceUnavailable()); |
559 | 2 | ASSERT_OK(t2->SubmitFunc([](){})); |
560 | | |
561 | | // Unblock t2's tasks. |
562 | 2 | l2.CountDown(); |
563 | 2 | t2->Shutdown(); |
564 | 2 | } |
565 | | |
566 | 2 | TEST_P(TestThreadPoolTokenTypes, TestTokenWaitForAll) { |
567 | 2 | const int kNumTokens = 3; |
568 | 2 | const int kNumSubmissions = 20; |
569 | 2 | std::unique_ptr<ThreadPool> thread_pool; |
570 | 2 | ASSERT_OK(ThreadPoolBuilder("test") |
571 | 2 | .Build(&thread_pool)); |
572 | 2 | Random r(SeedRandom()); |
573 | 2 | vector<unique_ptr<ThreadPoolToken>> tokens; |
574 | 8 | for (int i = 0; i < kNumTokens; i++) { |
575 | 6 | tokens.emplace_back(thread_pool->NewToken(GetParam())); |
576 | 6 | } |
577 | | |
578 | 2 | atomic<int32_t> v(0); |
579 | 42 | for (int i = 0; i < kNumSubmissions; i++) { |
580 | | // Sleep a little first to raise the likelihood of the test thread |
581 | | // reaching Wait() before the submissions finish. |
582 | 40 | int sleep_ms = r.Next() % 5; |
583 | | |
584 | 40 | auto task = [&v, sleep_ms]() { |
585 | 40 | SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); |
586 | 40 | v++; |
587 | 40 | }; |
588 | | |
589 | | // Half of the submissions will be token-less, and half will use a token. |
590 | 40 | if (i % 2 == 0) { |
591 | 20 | ASSERT_OK(thread_pool->SubmitFunc(task)); |
592 | 20 | } else { |
593 | 20 | int token_idx = r.Next() % tokens.size(); |
594 | 20 | ASSERT_OK(tokens[token_idx]->SubmitFunc(task)); |
595 | 20 | } |
596 | 40 | } |
597 | 2 | thread_pool->Wait(); |
598 | 2 | ASSERT_EQ(kNumSubmissions, v); |
599 | 2 | } |
600 | | |
// Randomized stress test: applies a weighted mix of token/pool operations
// and checks that every interleaving of submit/wait/shutdown/deallocate is
// safe. Note: the exact behavior depends on the RNG draw sequence, so the
// structure of the r.Next() calls below must not be reordered.
TEST_F(TestThreadPool, TestFuzz) {
  const int kNumOperations = 1000;
  Random r(SeedRandom());
  std::unique_ptr<ThreadPool> thread_pool;
  ASSERT_OK(ThreadPoolBuilder("test")
      .Build(&thread_pool));
  vector<unique_ptr<ThreadPoolToken>> tokens;

  for (int i = 0; i < kNumOperations; i++) {
    // Operation distribution:
    //
    // - Submit without a token: 40%
    // - Submit with a randomly selected token: 35%
    // - Allocate a new token: 10%
    // - Wait on a randomly selected token: 7%
    // - Shutdown a randomly selected token: 4%
    // - Deallocate a randomly selected token: 2%
    // - Wait for all submissions: 2%
    int op = r.Next() % 100;
    if (op < 40) {
      // Submit without a token.
      int sleep_ms = r.Next() % 5;
      ASSERT_OK(thread_pool->SubmitFunc([sleep_ms]() {
        // Sleep a little first to increase task overlap.
        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      }));
    } else if (op < 75) {
      // Submit with a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      int sleep_ms = r.Next() % 5;
      int token_idx = r.Next() % tokens.size();
      Status s = tokens[token_idx]->SubmitFunc([sleep_ms]() {
        // Sleep a little first to increase task overlap.
        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      });
      // The token may have been shut down by an earlier iteration, in which
      // case rejection with ServiceUnavailable is the expected outcome.
      ASSERT_TRUE(s.ok() || s.IsServiceUnavailable());
    } else if (op < 85) {
      // Allocate a token with a randomly selected policy.
      ThreadPool::ExecutionMode mode = r.Next() % 2 ?
          ThreadPool::ExecutionMode::SERIAL :
          ThreadPool::ExecutionMode::CONCURRENT;
      tokens.emplace_back(thread_pool->NewToken(mode));
    } else if (op < 92) {
      // Wait on a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      int token_idx = r.Next() % tokens.size();
      tokens[token_idx]->Wait();
    } else if (op < 96) {
      // Shutdown a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      int token_idx = r.Next() % tokens.size();
      tokens[token_idx]->Shutdown();
    } else if (op < 98) {
      // Deallocate a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      auto it = tokens.begin();
      int token_idx = r.Next() % tokens.size();
      std::advance(it, token_idx);
      tokens.erase(it);
    } else {
      // Wait on everything.
      ASSERT_LT(op, 100);
      ASSERT_GE(op, 98);
      thread_pool->Wait();
    }
  }

  // Some test runs will shut down the pool before the tokens, and some won't.
  // Either way should be safe.
  if (r.Next() % 2 == 0) {
    thread_pool->Shutdown();
  }
}
682 | | |
683 | 2 | TEST_P(TestThreadPoolTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) { |
684 | 2 | std::unique_ptr<ThreadPool> thread_pool; |
685 | 2 | ASSERT_OK(ThreadPoolBuilder("test") |
686 | 2 | .set_min_threads(1) |
687 | 2 | .set_max_threads(1) |
688 | 2 | .set_max_queue_size(1) |
689 | 2 | .Build(&thread_pool)); |
690 | | |
691 | 2 | CountDownLatch latch(1); |
692 | 2 | unique_ptr<ThreadPoolToken> t = thread_pool->NewToken(GetParam()); |
693 | 2 | auto se = ScopeExit([&latch] { |
694 | 2 | latch.CountDown(); |
695 | 2 | }); |
696 | | // We will be able to submit two tasks: one for max_threads == 1 and one for |
697 | | // max_queue_size == 1. |
698 | 2 | ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch))); |
699 | 2 | ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch))); |
700 | 2 | Status s = t->Submit(SlowTask::NewSlowTask(&latch)); |
701 | 2 | ASSERT_TRUE(s.IsServiceUnavailable()); |
702 | 2 | } |
703 | | |
// Concurrency stress test: for kTestRuntimeSecs, several groups of threads
// simultaneously cycle (replace), shut down, wait on, and submit to a
// shared set of tokens, verifying that no combination of these operations
// crashes or deadlocks.
TEST_F(TestThreadPool, TestTokenConcurrency) {
  const int kNumTokens = 20;
  const int kTestRuntimeSecs = 1;
  const int kCycleThreads = 2;
  const int kShutdownThreads = 2;
  const int kWaitThreads = 2;
  const int kSubmitThreads = 8;

  std::unique_ptr<ThreadPool> thread_pool;
  ASSERT_OK(ThreadPoolBuilder("test")
      .Build(&thread_pool));
  vector<shared_ptr<ThreadPoolToken>> tokens;
  Random rng(SeedRandom());

  // Protects 'tokens' and 'rng'.
  simple_spinlock lock;

  // Fetch a token from 'tokens' at random.
  // Returns a shared_ptr copy so the caller keeps the token alive even if a
  // cycle thread replaces that slot concurrently.
  auto GetRandomToken = [&]() -> shared_ptr<ThreadPoolToken> {
    std::lock_guard<simple_spinlock> l(lock);
    int idx = rng.Uniform(kNumTokens);
    return tokens[idx];
  };

  // Preallocate all of the tokens.
  for (int i = 0; i < kNumTokens; i++) {
    ThreadPool::ExecutionMode mode;
    {
      std::lock_guard<simple_spinlock> l(lock);
      mode = rng.Next() % 2 ?
          ThreadPool::ExecutionMode::SERIAL :
          ThreadPool::ExecutionMode::CONCURRENT;
    }
    tokens.emplace_back(thread_pool->NewToken(mode).release());
  }

  // Per-category totals, accumulated by the worker threads below.
  atomic<int64_t> total_num_tokens_cycled(0);
  atomic<int64_t> total_num_tokens_shutdown(0);
  atomic<int64_t> total_num_tokens_waited(0);
  atomic<int64_t> total_num_tokens_submitted(0);

  // All worker threads loop until this latch is counted down.
  CountDownLatch latch(1);
  vector<thread> threads;

  for (int i = 0; i < kCycleThreads; i++) {
    // Pick a token at random and replace it.
    //
    // The replaced token is only destroyed when the last ref is dropped,
    // possibly by another thread.
    threads.emplace_back([&]() {
      int num_tokens_cycled = 0;
      while (latch.count()) {
        {
          std::lock_guard<simple_spinlock> l(lock);
          int idx = rng.Uniform(kNumTokens);
          ThreadPool::ExecutionMode mode = rng.Next() % 2 ?
              ThreadPool::ExecutionMode::SERIAL :
              ThreadPool::ExecutionMode::CONCURRENT;
          tokens[idx] = shared_ptr<ThreadPoolToken>(thread_pool->NewToken(mode).release());
        }
        num_tokens_cycled++;

        // Sleep a bit, otherwise this thread outpaces the other threads and
        // nothing interesting happens to most tokens.
        SleepFor(MonoDelta::FromMicroseconds(10));
      }
      total_num_tokens_cycled += num_tokens_cycled;
    });
  }

  for (int i = 0; i < kShutdownThreads; i++) {
    // Pick a token at random and shut it down. Submitting a task to a shut
    // down token will return a ServiceUnavailable error.
    threads.emplace_back([&]() {
      int num_tokens_shutdown = 0;
      while (latch.count()) {
        GetRandomToken()->Shutdown();
        num_tokens_shutdown++;
      }
      total_num_tokens_shutdown += num_tokens_shutdown;
    });
  }

  for (int i = 0; i < kWaitThreads; i++) {
    // Pick a token at random and wait for any outstanding tasks.
    threads.emplace_back([&]() {
      int num_tokens_waited = 0;
      while (latch.count()) {
        GetRandomToken()->Wait();
        num_tokens_waited++;
      }
      total_num_tokens_waited += num_tokens_waited;
    });
  }

  for (int i = 0; i < kSubmitThreads; i++) {
    // Pick a token at random and submit a task to it.
    threads.emplace_back([&]() {
      int num_tokens_submitted = 0;
      // Thread-local RNG: avoids contending on 'lock' for each sleep draw.
      Random rng(SeedRandom());
      while (latch.count()) {
        int sleep_ms = rng.Next() % 5;
        Status s = GetRandomToken()->SubmitFunc([sleep_ms]() {
          // Sleep a little first so that tasks are running during other events.
          SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
        });
        // ServiceUnavailable is expected when the token was concurrently
        // shut down by one of the shutdown threads.
        ASSERT_TRUE(s.ok() || s.IsServiceUnavailable());
        num_tokens_submitted++;
      }
      total_num_tokens_submitted += num_tokens_submitted;
    });
  }

  // Let the chaos run for the configured duration, then stop and join.
  SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs));
  latch.CountDown();
  for (auto& t : threads) {
    t.join();
  }

  LOG(INFO) << Substitute("Tokens cycled ($0 threads): $1",
                          kCycleThreads, total_num_tokens_cycled.load());
  LOG(INFO) << Substitute("Tokens shutdown ($0 threads): $1",
                          kShutdownThreads, total_num_tokens_shutdown.load());
  LOG(INFO) << Substitute("Tokens waited ($0 threads): $1",
                          kWaitThreads, total_num_tokens_waited.load());
  LOG(INFO) << Substitute("Tokens submitted ($0 threads): $1",
                          kSubmitThreads, total_num_tokens_submitted.load());
}
832 | | |
833 | | } // namespace yb |