YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/threadpool-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <atomic>
34
#include <functional>
35
#include <limits>
36
#include <memory>
37
#include <mutex>
38
#include <string>
39
#include <thread>
40
#include <vector>
41
42
#include <glog/logging.h>
43
#include <gtest/gtest.h>
44
45
#include "yb/gutil/atomicops.h"
46
#include "yb/gutil/bind.h"
47
#include "yb/gutil/sysinfo.h"
48
49
#include "yb/util/barrier.h"
50
#include "yb/util/countdown_latch.h"
51
#include "yb/util/locks.h"
52
#include "yb/util/metrics.h"
53
#include "yb/util/promise.h"
54
#include "yb/util/random.h"
55
#include "yb/util/scope_exit.h"
56
#include "yb/util/test_macros.h"
57
#include "yb/util/test_util.h"
58
#include "yb/util/threadpool.h"
59
#include "yb/util/trace.h"
60
61
using std::atomic;
62
using std::shared_ptr;
63
using std::string;
64
using std::thread;
65
using std::unique_ptr;
66
using std::vector;
67
68
using strings::Substitute;
69
DECLARE_bool(enable_tracing);
70
71
using std::shared_ptr;
72
73
namespace yb {
74
75
namespace {
76
77
// Builds a thread pool named "test" configured with the given minimum and
// maximum thread counts and stores it in '*pool'. Returns the status reported
// by ThreadPoolBuilder::Build().
static Status BuildMinMaxTestPool(
    int min_threads, int max_threads, std::unique_ptr<ThreadPool>* pool) {
  ThreadPoolBuilder builder("test");
  builder.set_min_threads(min_threads);
  builder.set_max_threads(max_threads);
  return builder.Build(pool);
}
83
84
} // anonymous namespace
85
86
class TestThreadPool : public ::testing::Test {
87
 public:
88
27
  TestThreadPool() {
89
27
    FLAGS_enable_tracing = true;
90
27
  }
91
};
92
93
1
// A pool that never receives a task can be built and then shut down.
TEST_F(TestThreadPool, TestNoTaskOpenClose) {
  const int kThreads = 4;
  std::unique_ptr<ThreadPool> pool;
  ASSERT_OK(BuildMinMaxTestPool(kThreads, kThreads, &pool));
  pool->Shutdown();
}
98
99
5
static void SimpleTaskMethod(int n, Atomic32 *counter) {
100
188
  while (n--) {
101
183
    base::subtle::NoBarrier_AtomicIncrement(counter, 1);
102
183
    boost::detail::yield(n);
103
183
  }
104
5
}
105
106
class SimpleTask : public Runnable {
107
 public:
108
  SimpleTask(int n, Atomic32 *counter)
109
1
    : n_(n), counter_(counter) {
110
1
  }
111
112
2
  void Run() override {
113
2
    SimpleTaskMethod(n_, counter_);
114
2
  }
115
116
 private:
117
  int n_;
118
  Atomic32 *counter_;
119
};
120
121
1
// Submits work through SubmitFunc(), Submit() and SubmitClosure() and checks
// that, once the pool has drained, every task performed all its increments.
TEST_F(TestThreadPool, TestSimpleTasks) {
  std::unique_ptr<ThreadPool> pool;
  ASSERT_OK(BuildMinMaxTestPool(4, 4, &pool));

  Atomic32 counter(0);
  std::shared_ptr<Runnable> runnable(new SimpleTask(15, &counter));

  ASSERT_OK(pool->SubmitFunc(std::bind(&SimpleTaskMethod, 10, &counter)));
  ASSERT_OK(pool->Submit(runnable));
  ASSERT_OK(pool->SubmitFunc(std::bind(&SimpleTaskMethod, 20, &counter)));
  ASSERT_OK(pool->Submit(runnable));
  ASSERT_OK(pool->SubmitClosure(Bind(&SimpleTaskMethod, 123, &counter)));
  pool->Wait();

  // 10 and 20 come from the bound functions, 15 twice from the runnable that
  // was submitted two times, and 123 from the closure.
  const auto total = base::subtle::NoBarrier_Load(&counter);
  ASSERT_EQ(10 + 15 + 20 + 15 + 123, total);
  pool->Shutdown();
}
137
138
1
static void IssueTraceStatement() {
139
1
  TRACE("hello from task");
140
1
}
141
142
// Test that the thread-local trace is propagated to tasks
143
// submitted to the threadpool.
144
1
TEST_F(TestThreadPool, TestTracePropagation) {
145
1
  std::unique_ptr<ThreadPool> thread_pool;
146
1
  ASSERT_OK(BuildMinMaxTestPool(1, 1, &thread_pool));
147
148
1
  scoped_refptr<Trace> t(new Trace);
149
1
  {
150
1
    ADOPT_TRACE(t.get());
151
1
    ASSERT_OK(thread_pool->SubmitFunc(&IssueTraceStatement));
152
1
  }
153
1
  thread_pool->Wait();
154
1
  ASSERT_STR_CONTAINS(t->DumpToString(true), "hello from task");
155
1
}
156
157
1
// Submitting to a pool that has already been shut down must be rejected with
// the exact "Service unavailable" message checked below.
TEST_F(TestThreadPool, TestSubmitAfterShutdown) {
  std::unique_ptr<ThreadPool> pool;
  ASSERT_OK(BuildMinMaxTestPool(1, 1, &pool));
  pool->Shutdown();

  const Status submit_status = pool->SubmitFunc(&IssueTraceStatement);
  ASSERT_EQ("Service unavailable: The pool has been shut down.",
            submit_status.ToString(/* no file/line */ false));
}
165
166
class SlowTask : public Runnable {
167
 public:
168
  explicit SlowTask(CountDownLatch* latch)
169
78
    : latch_(latch) {
170
78
  }
171
172
68
  void Run() override {
173
68
    latch_->Wait();
174
68
  }
175
176
78
  static shared_ptr<Runnable> NewSlowTask(CountDownLatch* latch) {
177
78
    return std::make_shared<SlowTask>(latch);
178
78
  }
179
 private:
180
  CountDownLatch* latch_;
181
};
182
183
1
// With min_threads == 0 the pool starts without threads, grows to at most
// max_threads (3) while tasks are blocked, and queues any further work.
TEST_F(TestThreadPool, TestThreadPoolWithNoMinimum) {
  std::unique_ptr<ThreadPool> pool;
  ThreadPoolBuilder builder("test");
  builder.set_min_threads(0);
  builder.set_max_threads(3);
  builder.set_idle_timeout(MonoDelta::FromMilliseconds(1));
  ASSERT_OK(builder.Build(&pool));

  // Nothing has been submitted yet, so no thread exists.
  ASSERT_EQ(0, pool->num_threads_);

  // Every task blocks on this latch; it is also released on scope exit so
  // that an early return from a failed assertion leaves no task blocked.
  CountDownLatch latch(1);
  auto release_tasks = ScopeExit([&latch] {
    latch.CountDown();
  });

  // Submitting work brings the pool up to its three-thread limit.
  ASSERT_OK(pool->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_OK(pool->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_EQ(2, pool->num_threads_);
  ASSERT_OK(pool->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_EQ(3, pool->num_threads_);

  // A fourth task is accepted but does not add a thread.
  ASSERT_OK(pool->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_EQ(3, pool->num_threads_);

  // Let all tasks finish and check that the pool winds down completely.
  latch.CountDown();
  pool->Wait();
  ASSERT_EQ(0, pool->active_threads_);
  pool->Shutdown();
  ASSERT_EQ(0, pool->num_threads_);
}
211
212
1
// Checks that a pool built with max_threads == INT_MAX keeps adding threads
// well past the number of CPUs.
TEST_F(TestThreadPool, TestThreadPoolWithNoMaxThreads) {
  // By default a threadpool's max_threads is set to the number of CPUs, so
  // the batches below are sized as multiples of it to show that the CPU count
  // is not acting as an upper bound.
  const int num_cpus = base::NumCPUs();
  const int first_batch = num_cpus * 2;

  // Build a threadpool with no limit on the maximum number of threads.
  std::unique_ptr<ThreadPool> pool;
  ThreadPoolBuilder builder("test");
  builder.set_max_threads(std::numeric_limits<int>::max());
  ASSERT_OK(builder.Build(&pool));

  CountDownLatch latch(1);
  auto release_tasks = ScopeExit([&latch] {
    latch.CountDown();
  });

  // Tokenless tasks: each one should get a thread of its own.
  for (int submitted = 0; submitted < first_batch; ++submitted) {
    ASSERT_OK(pool->Submit(SlowTask::NewSlowTask(&latch)));
  }
  ASSERT_EQ(first_batch, pool->num_threads_);

  // Tasks alternating between two serial tokens: only two more threads
  // should be created.
  unique_ptr<ThreadPoolToken> t1 = pool->NewToken(ThreadPool::ExecutionMode::SERIAL);
  unique_ptr<ThreadPoolToken> t2 = pool->NewToken(ThreadPool::ExecutionMode::SERIAL);
  ThreadPoolToken* const by_parity[] = {t1.get(), t2.get()};
  for (int submitted = 0; submitted < first_batch; ++submitted) {
    ASSERT_OK(by_parity[submitted % 2]->Submit(SlowTask::NewSlowTask(&latch)));
  }
  ASSERT_EQ(first_batch + 2, pool->num_threads_);

  // More tokenless tasks: again one new thread per task.
  for (int submitted = 0; submitted < num_cpus; ++submitted) {
    ASSERT_OK(pool->Submit(SlowTask::NewSlowTask(&latch)));
  }
  ASSERT_EQ((num_cpus * 3) + 2, pool->num_threads_);

  latch.CountDown();
  pool->Wait();
  pool->Shutdown();
}
253
254
// Regression test for a bug where a task is submitted exactly
255
// as a thread is about to exit. Previously this could hang forever.
256
1
TEST_F(TestThreadPool, TestRace) {
257
1
  alarm(10);
258
1
  MonoDelta idle_timeout = MonoDelta::FromMicroseconds(1);
259
1
  std::unique_ptr<ThreadPool> thread_pool;
260
1
  ASSERT_OK(ThreadPoolBuilder("test")
261
1
      .set_min_threads(0).set_max_threads(1)
262
1
      .set_idle_timeout(idle_timeout).Build(&thread_pool));
263
264
501
  for (int i = 0; i < 500; i++) {
265
500
    CountDownLatch l(1);
266
500
    ASSERT_OK(thread_pool->SubmitFunc([&l]() { l.CountDown(); }));
267
500
    l.Wait();
268
    // Sleeping a different amount in each iteration makes it more likely to hit
269
    // the bug.
270
500
    SleepFor(MonoDelta::FromMicroseconds(i));
271
500
  }
272
1
}
273
274
1
// With min_threads == 1, max_threads == 4 and max_queue_size == 1 the pool
// grows one thread per blocked task up to four, queues a fifth task and
// rejects a sixth.
TEST_F(TestThreadPool, TestVariableSizeThreadPool) {
  // Declared before the pool so that the latch is destroyed after it: tasks
  // hold a raw pointer to the latch while they run.
  CountDownLatch latch(1);
  MonoDelta idle_timeout = MonoDelta::FromMilliseconds(1);
  std::unique_ptr<ThreadPool> thread_pool;
  ASSERT_OK(ThreadPoolBuilder("test")
      .set_min_threads(1).set_max_threads(4).set_max_queue_size(1)
      .set_idle_timeout(idle_timeout).Build(&thread_pool));
  // Release the tasks on every exit path. Without this, a failed assertion
  // below would return with the workers still blocked on the latch.
  auto se = ScopeExit([&latch] {
    latch.CountDown();
  });
  // There is 1 thread to start with.
  ASSERT_EQ(1, thread_pool->num_threads_);
  // We get up to 4 threads when submitting work.
  ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_EQ(1, thread_pool->num_threads_);
  ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_EQ(2, thread_pool->num_threads_);
  ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_EQ(3, thread_pool->num_threads_);
  ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_EQ(4, thread_pool->num_threads_);
  // The 5th piece of work gets queued.
  ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_EQ(4, thread_pool->num_threads_);
  // The 6th piece of work gets rejected.
  ASSERT_NOK(thread_pool->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_EQ(4, thread_pool->num_threads_);
  // Finish all work
  latch.CountDown();
  thread_pool->Wait();
  ASSERT_EQ(0, thread_pool->active_threads_);
  thread_pool->Shutdown();
  ASSERT_EQ(0, thread_pool->num_threads_);
}
305
306
1
// With one thread and max_queue_size == 1, submissions beyond the running
// task plus one queued task must fail with ServiceUnavailable.
TEST_F(TestThreadPool, TestMaxQueueSize) {
  // Declared before the pool so that the latch is destroyed after it: tasks
  // hold a raw pointer to the latch while they run.
  CountDownLatch latch(1);
  std::unique_ptr<ThreadPool> thread_pool;
  ASSERT_OK(ThreadPoolBuilder("test")
      .set_min_threads(1).set_max_threads(1)
      .set_max_queue_size(1).Build(&thread_pool));
  // Release the tasks on every exit path so that a failed assertion does not
  // leave the worker blocked on the latch.
  auto se = ScopeExit([&latch] {
    latch.CountDown();
  });

  ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch)));
  Status s = thread_pool->Submit(SlowTask::NewSlowTask(&latch));
  // We race against the worker thread to re-enqueue.
  // If we get there first, we fail on the 2nd Submit().
  // If the worker dequeues first, we fail on the 3rd.
  if (s.ok()) {
    s = thread_pool->Submit(SlowTask::NewSlowTask(&latch));
  }
  // A gtest assertion reports the failure for this test only; CHECK would
  // abort the whole test binary.
  ASSERT_TRUE(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString();
  latch.CountDown();
  thread_pool->Wait();
  thread_pool->Shutdown();
}
326
327
3
void TestQueueSizeZero(int max_threads) {
328
3
  std::unique_ptr<ThreadPool> thread_pool;
329
3
  ASSERT_OK(ThreadPoolBuilder("test")
330
3
                .set_min_threads(0).set_max_threads(max_threads)
331
3
                .set_max_queue_size(0).Build(&thread_pool));
332
333
3
  CountDownLatch latch(1);
334
9
  for (int i = 0; i < max_threads; i++) {
335
6
    ASSERT_OK(thread_pool->Submit(SlowTask::NewSlowTask(&latch)));
336
6
  }
337
3
  Status s = thread_pool->Submit(SlowTask::NewSlowTask(&latch));
338
6
  ASSERT_TRUE(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString();
339
3
  latch.CountDown();
340
3
  thread_pool->Wait();
341
3
  thread_pool->Shutdown();
342
3
}
343
344
1
// Runs the zero-length-queue scenario for pools of one and of five threads.
TEST_F(TestThreadPool, TestMaxQueueZero) {
  for (const int max_threads : {1, 5}) {
    TestQueueSizeZero(max_threads);
  }
}
348
349
350
1
// Runs the zero-length-queue scenario for a pool whose max_threads is zero.
TEST_F(TestThreadPool, TestMaxQueueZeroNoThreads) {
  const int kNoThreads = 0;
  TestQueueSizeZero(kNoThreads);
}
353
354
// Test that setting a promise from another thread yields
355
// a value on the current thread.
356
1
TEST_F(TestThreadPool, TestPromises) {
357
1
  std::unique_ptr<ThreadPool> thread_pool;
358
1
  ASSERT_OK(ThreadPoolBuilder("test")
359
1
      .set_min_threads(1).set_max_threads(1)
360
1
      .set_max_queue_size(1).Build(&thread_pool));
361
362
1
  Promise<int> my_promise;
363
1
  ASSERT_OK(thread_pool->SubmitClosure(
364
1
                     Bind(&Promise<int>::Set, Unretained(&my_promise), 5)));
365
1
  ASSERT_EQ(5, my_promise.Get());
366
1
  thread_pool->Shutdown();
367
1
}
368
369
370
// Metric entity and the three histogram prototypes (queue length, queue time,
// run time) that TestMetrics instantiates.
METRIC_DEFINE_entity(test_entity);

METRIC_DEFINE_coarse_histogram(
    test_entity, queue_length, "queue length", MetricUnit::kTasks, "queue length");

METRIC_DEFINE_coarse_histogram(
    test_entity, queue_time, "queue time", MetricUnit::kMicroseconds, "queue time");

METRIC_DEFINE_coarse_histogram(
    test_entity, run_time, "run time", MetricUnit::kMicroseconds, "run time");
379
380
1
// Checks the histogram sample counts recorded for the pool as a whole and for
// two tokens that were created with their own metrics.
TEST_F(TestThreadPool, TestMetrics) {
  MetricRegistry registry;

  // Index 0 is used for the pool itself, 1 and 2 for the two tokens.
  vector<ThreadPoolMetrics> all_metrics;
  for (int idx = 0; idx < 3; ++idx) {
    scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(
        &registry, Substitute("test $0", idx));
    ThreadPoolMetrics metrics{
        METRIC_queue_length.Instantiate(entity),
        METRIC_queue_time.Instantiate(entity),
        METRIC_run_time.Instantiate(entity)
    };
    all_metrics.emplace_back(std::move(metrics));
  }

  // Enable metrics for the thread pool.
  std::unique_ptr<ThreadPool> pool;
  ThreadPoolBuilder builder("test");
  builder.set_min_threads(1);
  builder.set_max_threads(1);
  builder.set_metrics(all_metrics[0]);
  ASSERT_OK(builder.Build(&pool));

  unique_ptr<ThreadPoolToken> t1 = pool->NewTokenWithMetrics(
      ThreadPool::ExecutionMode::SERIAL, all_metrics[1]);
  unique_ptr<ThreadPoolToken> t2 = pool->NewTokenWithMetrics(
      ThreadPool::ExecutionMode::SERIAL, all_metrics[2]);

  // Submit once to t1, twice to t2, and three times without a token.
  ASSERT_OK(t1->SubmitFunc([]() {}));
  ASSERT_OK(t2->SubmitFunc([]() {}));
  ASSERT_OK(t2->SubmitFunc([]() {}));
  ASSERT_OK(pool->SubmitFunc([]() {}));
  ASSERT_OK(pool->SubmitFunc([]() {}));
  ASSERT_OK(pool->SubmitFunc([]() {}));
  pool->Wait();

  const ThreadPoolMetrics& pool_metrics = all_metrics[0];
  const ThreadPoolMetrics& t1_metrics = all_metrics[1];
  const ThreadPoolMetrics& t2_metrics = all_metrics[2];

  // The total counts should reflect the number of submissions to each token.
  ASSERT_EQ(1, t1_metrics.queue_length_histogram->TotalCount());
  ASSERT_EQ(1, t1_metrics.queue_time_us_histogram->TotalCount());
  ASSERT_EQ(1, t1_metrics.run_time_us_histogram->TotalCount());
  ASSERT_EQ(2, t2_metrics.queue_length_histogram->TotalCount());
  ASSERT_EQ(2, t2_metrics.queue_time_us_histogram->TotalCount());
  ASSERT_EQ(2, t2_metrics.run_time_us_histogram->TotalCount());

  // And the counts on the pool-wide metrics should reflect all submissions.
  ASSERT_EQ(6, pool_metrics.queue_length_histogram->TotalCount());
  ASSERT_EQ(6, pool_metrics.queue_time_us_histogram->TotalCount());
  ASSERT_EQ(6, pool_metrics.run_time_us_histogram->TotalCount());
}
427
428
// For test cases that should run with both kinds of tokens.
429
class TestThreadPoolTokenTypes : public TestThreadPool,
430
                                 public testing::WithParamInterface<ThreadPool::ExecutionMode> {};
431
432
INSTANTIATE_TEST_CASE_P(Tokens, TestThreadPoolTokenTypes,
433
                        ::testing::Values(ThreadPool::ExecutionMode::SERIAL,
434
                                          ThreadPool::ExecutionMode::CONCURRENT));
435
436
437
2
// A token's Wait() must not return before a task submitted through that token
// has run.
TEST_P(TestThreadPoolTokenTypes, TestTokenSubmitAndWait) {
  std::unique_ptr<ThreadPool> pool;
  ASSERT_OK(ThreadPoolBuilder("test").Build(&pool));
  unique_ptr<ThreadPoolToken> token = pool->NewToken(GetParam());

  int runs = 0;
  ASSERT_OK(token->SubmitFunc([&runs]() {
    // Sleep first so that Wait() is likely to be reached before the increment.
    SleepFor(MonoDelta::FromMilliseconds(1));
    runs++;
  }));
  token->Wait();
  ASSERT_EQ(1, runs);
}
450
451
1
// Tasks submitted through one SERIAL token must run in submission order: each
// task appends its letter and the result must read "abcde".
TEST_F(TestThreadPool, TestTokenSubmitsProcessedSerially) {
  std::unique_ptr<ThreadPool> pool;
  ASSERT_OK(ThreadPoolBuilder("test").Build(&pool));
  unique_ptr<ThreadPoolToken> token = pool->NewToken(ThreadPool::ExecutionMode::SERIAL);
  Random r(SeedRandom());

  string result;
  const string letters = "abcde";
  for (const char c : letters) {
    // Sleep a little first so that there's a higher chance of out-of-order
    // appends if the submissions did execute in parallel.
    const int sleep_ms = r.Next() % 5;
    ASSERT_OK(token->SubmitFunc([&result, c, sleep_ms]() {
      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      result += c;
    }));
  }
  token->Wait();
  ASSERT_EQ("abcde", result);
}
470
471
2
// Tasks submitted through different tokens must be able to run at the same
// time: one task per token plus the test thread all meet at one barrier.
TEST_P(TestThreadPoolTokenTypes, TestTokenSubmitsProcessedConcurrently) {
  const int kNumTokens = 5;
  std::unique_ptr<ThreadPool> pool;
  ThreadPoolBuilder builder("test");
  builder.set_max_threads(kNumTokens);
  ASSERT_OK(builder.Build(&pool));
  vector<unique_ptr<ThreadPoolToken>> tokens;

  // A violation of the tested invariant would yield a deadlock, so set up an
  // alarm to bail us out.
  alarm(60);
  auto disarm_alarm = ScopeExit([] {
    alarm(0); // Disable alarm on test exit.
  });

  // One participant per token plus the test thread itself.
  shared_ptr<Barrier> barrier = std::make_shared<Barrier>(kNumTokens + 1);
  for (int created = 0; created < kNumTokens; ++created) {
    tokens.emplace_back(pool->NewToken(GetParam()));
    ThreadPoolToken* const token = tokens.back().get();
    ASSERT_OK(token->SubmitFunc([barrier]() {
      barrier->Wait();
    }));
  }

  // This will deadlock if the above tasks weren't all running concurrently.
  barrier->Wait();
}
496
497
1
// Tasks submitted through a single CONCURRENT token must be able to run at
// the same time: all of them plus the test thread meet at one barrier.
TEST_F(TestThreadPool, TestTokenSubmitsNonSequential) {
  const int kNumSubmissions = 5;
  std::unique_ptr<ThreadPool> pool;
  ThreadPoolBuilder builder("test");
  builder.set_max_threads(kNumSubmissions);
  ASSERT_OK(builder.Build(&pool));

  // A violation of the tested invariant would yield a deadlock, so set up an
  // alarm to bail us out.
  alarm(60);
  auto disarm_alarm = ScopeExit([] {
    alarm(0); // Disable alarm on test exit.
  });

  // One participant per submission plus the test thread itself.
  shared_ptr<Barrier> barrier = std::make_shared<Barrier>(kNumSubmissions + 1);
  unique_ptr<ThreadPoolToken> token = pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
  for (int submitted = 0; submitted < kNumSubmissions; ++submitted) {
    ASSERT_OK(token->SubmitFunc([barrier]() {
      barrier->Wait();
    }));
  }

  // This will deadlock if the above tasks weren't all running concurrently.
  barrier->Wait();
}
521
522
2
// Shutting down one token must wait only for that token's tasks and must not
// affect submissions made through another token of the same pool.
TEST_P(TestThreadPoolTokenTypes, TestTokenShutdown) {
  // The latches are declared before the pool and the tokens so that they are
  // destroyed last: the submitted tasks reference them while they run.
  CountDownLatch l1(1);
  CountDownLatch l2(1);

  std::unique_ptr<ThreadPool> thread_pool;
  ASSERT_OK(ThreadPoolBuilder("test")
                .set_max_threads(4)
                .Build(&thread_pool));

  unique_ptr<ThreadPoolToken> t1(thread_pool->NewToken(GetParam()));
  unique_ptr<ThreadPoolToken> t2(thread_pool->NewToken(GetParam()));

  // A violation to the tested invariant would yield a deadlock, so let's set
  // up an alarm to bail us out.
  alarm(60);
  auto se = ScopeExit([&l1, &l2] {
    // Unblock any task that is still waiting. This matters when a failed
    // assertion returns early: otherwise tasks would stay blocked while the
    // alarm that was meant to bail us out is being disabled.
    l1.CountDown();
    l2.CountDown();
    alarm(0); // Disable alarm on test exit.
  });

  for (int i = 0; i < 3; i++) {
    ASSERT_OK(t1->SubmitFunc([&]() {
      l1.Wait();
    }));
  }
  for (int i = 0; i < 3; i++) {
    ASSERT_OK(t2->SubmitFunc([&]() {
      l2.Wait();
    }));
  }

  // Unblock all of t1's tasks, but not t2's tasks.
  l1.CountDown();

  // If this also waited for t2's tasks, it would deadlock.
  t1->Shutdown();

  // We can no longer submit to t1 but we can still submit to t2.
  ASSERT_TRUE(t1->SubmitFunc([](){}).IsServiceUnavailable());
  ASSERT_OK(t2->SubmitFunc([](){}));

  // Unblock t2's tasks.
  l2.CountDown();
  t2->Shutdown();
}
565
566
2
// ThreadPool::Wait() must cover every submission, whether it was made
// directly on the pool or through one of its tokens.
TEST_P(TestThreadPoolTokenTypes, TestTokenWaitForAll) {
  const int kNumTokens = 3;
  const int kNumSubmissions = 20;
  std::unique_ptr<ThreadPool> pool;
  ASSERT_OK(ThreadPoolBuilder("test").Build(&pool));
  Random r(SeedRandom());

  vector<unique_ptr<ThreadPoolToken>> tokens;
  for (int created = 0; created < kNumTokens; ++created) {
    tokens.emplace_back(pool->NewToken(GetParam()));
  }

  // Incremented once by every task.
  atomic<int32_t> completed(0);
  for (int submission = 0; submission < kNumSubmissions; ++submission) {
    // Sleep a little first to raise the likelihood of the test thread
    // reaching Wait() before the submissions finish.
    const int sleep_ms = r.Next() % 5;
    auto task = [&completed, sleep_ms]() {
      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      completed++;
    };

    // Even-numbered submissions go straight to the pool; odd-numbered ones
    // go through a randomly chosen token.
    const bool tokenless = (submission % 2 == 0);
    if (tokenless) {
      ASSERT_OK(pool->SubmitFunc(task));
    } else {
      const size_t token_idx = r.Next() % tokens.size();
      ASSERT_OK(tokens[token_idx]->SubmitFunc(task));
    }
  }
  pool->Wait();
  ASSERT_EQ(kNumSubmissions, completed);
}
600
601
1
// Applies 1000 randomly chosen operations (submit, allocate / wait on / shut
// down / deallocate a token, wait on everything) to a single pool.
TEST_F(TestThreadPool, TestFuzz) {
  const int kNumOperations = 1000;
  Random r(SeedRandom());
  std::unique_ptr<ThreadPool> pool;
  ASSERT_OK(ThreadPoolBuilder("test").Build(&pool));
  vector<unique_ptr<ThreadPoolToken>> tokens;

  // Draws the index of one of the currently allocated tokens. Only called
  // while 'tokens' is non-empty.
  const auto random_token_index = [&r, &tokens]() -> size_t {
    return r.Next() % tokens.size();
  };

  for (int iteration = 0; iteration < kNumOperations; ++iteration) {
    // Operation distribution:
    //
    // - Submit without a token: 40%
    // - Submit with a randomly selected token: 35%
    // - Allocate a new token: 10%
    // - Wait on a randomly selected token: 7%
    // - Shutdown a randomly selected token: 4%
    // - Deallocate a randomly selected token: 2%
    // - Wait for all submissions: 2%
    const int op = r.Next() % 100;
    if (op < 40) {
      // Submit without a token.
      const int sleep_ms = r.Next() % 5;
      ASSERT_OK(pool->SubmitFunc([sleep_ms]() {
        // Sleep a little first to increase task overlap.
        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      }));
    } else if (op < 75) {
      // Submit with a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      const int sleep_ms = r.Next() % 5;
      const size_t token_idx = random_token_index();
      const Status s = tokens[token_idx]->SubmitFunc([sleep_ms]() {
        // Sleep a little first to increase task overlap.
        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      });
      // The chosen token may already have been shut down by an earlier
      // operation, so ServiceUnavailable is accepted as well.
      ASSERT_TRUE(s.ok() || s.IsServiceUnavailable());
    } else if (op < 85) {
      // Allocate a token with a randomly selected policy.
      const ThreadPool::ExecutionMode mode = r.Next() % 2 ?
          ThreadPool::ExecutionMode::SERIAL :
          ThreadPool::ExecutionMode::CONCURRENT;
      tokens.emplace_back(pool->NewToken(mode));
    } else if (op < 92) {
      // Wait on a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      tokens[random_token_index()]->Wait();
    } else if (op < 96) {
      // Shutdown a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      tokens[random_token_index()]->Shutdown();
    } else if (op < 98) {
      // Deallocate a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      const size_t token_idx = random_token_index();
      tokens.erase(tokens.begin() + token_idx);
    } else {
      // Wait on everything.
      ASSERT_LT(op, 100);
      ASSERT_GE(op, 98);
      pool->Wait();
    }
  }

  // Some test runs will shut down the pool before the tokens, and some won't.
  // Either way should be safe.
  const bool shut_down_pool_first = (r.Next() % 2 == 0);
  if (shut_down_pool_first) {
    pool->Shutdown();
  }
}
682
683
2
// Submissions made through a token are subject to the pool's max_queue_size,
// just like submissions made directly on the pool.
TEST_P(TestThreadPoolTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) {
  std::unique_ptr<ThreadPool> pool;
  ThreadPoolBuilder builder("test");
  builder.set_min_threads(1);
  builder.set_max_threads(1);
  builder.set_max_queue_size(1);
  ASSERT_OK(builder.Build(&pool));

  CountDownLatch latch(1);
  unique_ptr<ThreadPoolToken> token = pool->NewToken(GetParam());
  // Release the blocked tasks when the test exits.
  auto release_tasks = ScopeExit([&latch] {
    latch.CountDown();
  });

  // We will be able to submit two tasks: one for max_threads == 1 and one for
  // max_queue_size == 1.
  ASSERT_OK(token->Submit(SlowTask::NewSlowTask(&latch)));
  ASSERT_OK(token->Submit(SlowTask::NewSlowTask(&latch)));

  // The third one must be turned away.
  const Status third = token->Submit(SlowTask::NewSlowTask(&latch));
  ASSERT_TRUE(third.IsServiceUnavailable());
}
703
704
1
// Stress test: for kTestRuntimeSecs, several groups of threads concurrently
// replace, shut down, wait on and submit to a shared set of tokens. The test
// passes if nothing crashes or hangs and every submission either succeeds or
// is rejected with ServiceUnavailable.
TEST_F(TestThreadPool, TestTokenConcurrency) {
  const int kNumTokens = 20;
  const int kTestRuntimeSecs = 1;
  const int kCycleThreads = 2;
  const int kShutdownThreads = 2;
  const int kWaitThreads = 2;
  const int kSubmitThreads = 8;

  std::unique_ptr<ThreadPool> thread_pool;
  ASSERT_OK(ThreadPoolBuilder("test")
                .Build(&thread_pool));
  vector<shared_ptr<ThreadPoolToken>> tokens;
  Random rng(SeedRandom());

  // Protects 'tokens' and 'rng'.
  simple_spinlock lock;

  // Fetch a token from 'tokens' at random.
  auto GetRandomToken = [&]() -> shared_ptr<ThreadPoolToken> {
    std::lock_guard<simple_spinlock> l(lock);
    int idx = rng.Uniform(kNumTokens);
    return tokens[idx];
  };

  // Preallocate all of the tokens.
  for (int i = 0; i < kNumTokens; i++) {
    ThreadPool::ExecutionMode mode;
    {
      std::lock_guard<simple_spinlock> l(lock);
      mode = rng.Next() % 2 ?
          ThreadPool::ExecutionMode::SERIAL :
          ThreadPool::ExecutionMode::CONCURRENT;
    }
    // Hand the unique_ptr over directly so that ownership is never held by a
    // raw pointer: with release(), the token would leak if growing the vector
    // threw.
    tokens.emplace_back(thread_pool->NewToken(mode));
  }

  atomic<int64_t> total_num_tokens_cycled(0);
  atomic<int64_t> total_num_tokens_shutdown(0);
  atomic<int64_t> total_num_tokens_waited(0);
  atomic<int64_t> total_num_tokens_submitted(0);

  // All worker threads loop until this latch has been counted down.
  CountDownLatch latch(1);
  vector<thread> threads;

  for (int i = 0; i < kCycleThreads; i++) {
    // Pick a token at random and replace it.
    //
    // The replaced token is only destroyed when the last ref is dropped,
    // possibly by another thread.
    threads.emplace_back([&]() {
      int num_tokens_cycled = 0;
      while (latch.count()) {
        {
          std::lock_guard<simple_spinlock> l(lock);
          int idx = rng.Uniform(kNumTokens);
          ThreadPool::ExecutionMode mode = rng.Next() % 2 ?
              ThreadPool::ExecutionMode::SERIAL :
              ThreadPool::ExecutionMode::CONCURRENT;
          // shared_ptr can be assigned from a unique_ptr rvalue directly.
          tokens[idx] = thread_pool->NewToken(mode);
        }
        num_tokens_cycled++;

        // Sleep a bit, otherwise this thread outpaces the other threads and
        // nothing interesting happens to most tokens.
        SleepFor(MonoDelta::FromMicroseconds(10));
      }
      total_num_tokens_cycled += num_tokens_cycled;
    });
  }

  for (int i = 0; i < kShutdownThreads; i++) {
    // Pick a token at random and shut it down. Submitting a task to a shut
    // down token will return a ServiceUnavailable error.
    threads.emplace_back([&]() {
      int num_tokens_shutdown = 0;
      while (latch.count()) {
        GetRandomToken()->Shutdown();
        num_tokens_shutdown++;
      }
      total_num_tokens_shutdown += num_tokens_shutdown;
    });
  }

  for (int i = 0; i < kWaitThreads; i++) {
    // Pick a token at random and wait for any outstanding tasks.
    threads.emplace_back([&]() {
      int num_tokens_waited  = 0;
      while (latch.count()) {
        GetRandomToken()->Wait();
        num_tokens_waited++;
      }
      total_num_tokens_waited += num_tokens_waited;
    });
  }

  for (int i = 0; i < kSubmitThreads; i++) {
    // Pick a token at random and submit a task to it.
    threads.emplace_back([&]() {
      int num_tokens_submitted = 0;
      // Thread-private generator for the sleep times. It has its own name so
      // that it cannot be mistaken for the shared, lock-protected 'rng'.
      Random local_rng(SeedRandom());
      while (latch.count()) {
        int sleep_ms = local_rng.Next() % 5;
        Status s = GetRandomToken()->SubmitFunc([sleep_ms]() {
          // Sleep a little first so that tasks are running during other events.
          SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
        });
        ASSERT_TRUE(s.ok() || s.IsServiceUnavailable());
        num_tokens_submitted++;
      }
      total_num_tokens_submitted += num_tokens_submitted;
    });
  }

  // Let the threads run for the configured time, then stop and join them.
  SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs));
  latch.CountDown();
  for (auto& t : threads) {
    t.join();
  }

  LOG(INFO) << Substitute("Tokens cycled ($0 threads): $1",
                          kCycleThreads, total_num_tokens_cycled.load());
  LOG(INFO) << Substitute("Tokens shutdown ($0 threads): $1",
                          kShutdownThreads, total_num_tokens_shutdown.load());
  LOG(INFO) << Substitute("Tokens waited ($0 threads): $1",
                          kWaitThreads, total_num_tokens_waited.load());
  LOG(INFO) << Substitute("Tokens submitted ($0 threads): $1",
                          kSubmitThreads, total_num_tokens_submitted.load());
}
832
833
} // namespace yb