YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/priority_thread_pool-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include <algorithm>
14
#include <thread>
15
16
#include <gtest/gtest.h>
17
18
#include "yb/util/priority_thread_pool.h"
19
#include "yb/util/random_util.h"
20
#include "yb/util/scope_exit.h"
21
#include "yb/util/test_macros.h"
22
#include "yb/util/test_thread_holder.h"
23
#include "yb/util/tostring.h"
24
25
using namespace std::literals;
26
27
namespace yb {
28
29
// Test Macs in our Jenkins environment switch threads rarely, so a longer step time has to be used
// there.
#if defined(__APPLE__)
constexpr std::chrono::milliseconds kStepTime{100};
#else
constexpr std::chrono::milliseconds kStepTime{25};
#endif
// Interval the tests wait for: three task steps.
constexpr auto kWaitTime = 3 * kStepTime;
36
37
class Share {
38
 public:
39
7.15k
  bool Step(int index) {
40
7.15k
    running_.fetch_or(1ULL << index, std::memory_order_acq_rel);
41
7.15k
    return (stop_.load(std::memory_order_acquire) & (1ULL << index)) == 0;
42
7.15k
  }
43
44
42
  void Stop(int index) {
45
42
    stop_.fetch_or(1ULL << index, std::memory_order_acq_rel);
46
42
  }
47
48
4
  void StopAll() {
49
4
    stop_.store(std::numeric_limits<uint64_t>::max(), std::memory_order_release);
50
4
  }
51
52
486
  void ResetRunning() {
53
486
    running_.store(0, std::memory_order_release);
54
486
  }
55
56
486
  uint64_t running() {
57
486
    return running_.load(std::memory_order_acquire);
58
486
  }
59
60
  // Fill `out` with priorities of running tasks.
61
  // `divisor` is used to convert index to priority.
62
486
  void FillRunningTaskPriorities(std::vector<int>* out, int divisor = 1) {
63
486
    std::this_thread::sleep_for(kWaitTime);
64
486
    ResetRunning();
65
486
    std::this_thread::sleep_for(kWaitTime);
66
486
    auto running_mask = running();
67
486
    out->clear();
68
486
    int i = 0;
69
6.84k
    while (running_mask != 0) {
70
6.35k
      if (running_mask & 1) {
71
1.21k
        out->push_back(i / divisor);
72
1.21k
      }
73
6.35k
      running_mask >>= 1;
74
6.35k
      ++i;
75
6.35k
    }
76
486
  }
77
78
 private:
79
  // ith bit is set to 1 when task i was reported as running after last reset.
80
  std::atomic<uint64_t> running_{0};
81
  // ith bit is set to 1 when we should stop task i.
82
  std::atomic<uint64_t> stop_{0};
83
};
84
85
class Task : public PriorityThreadPoolTask {
86
 public:
87
52
  Task(int index, Share* share) : index_(index), share_(share) {}
88
89
52
  void Run(const Status& status, PriorityThreadPoolSuspender* suspender) override {
90
52
    if (!status.ok()) {
91
1
      return;
92
1
    }
93
7.17k
    while (share_->Step(index_)) {
94
7.12k
      suspender->PauseIfNecessary();
95
7.12k
      std::this_thread::sleep_for(kStepTime);
96
7.12k
    }
97
51
  }
98
99
0
  bool ShouldRemoveWithKey(void* key) override {
100
0
    return false;
101
0
  }
102
103
80
  int Index() const {
104
80
    return index_;
105
80
  }
106
107
32
  std::string ToString() const override {
108
32
    return YB_CLASS_TO_STRING(index);
109
32
  }
110
111
 private:
112
  const int index_;
113
  Share* const share_;
114
};
115
116
// Test randomly submits and stops task to priority thread pool.
117
// Checking that only expected tasks are running.
118
// Task priority is calculated as index/divisor.
119
// So with divisor == 1 we get unique priorities, and with divisor > 1 multiple tasks could have
120
// the same priority.
121
2
void TestRandom(int divisor) {
122
2
  const int kTasks = 20;
123
2
  const int kMaxRunningTasks = 3;
124
2
  PriorityThreadPool thread_pool(kMaxRunningTasks);
125
2
  std::vector<std::unique_ptr<Task>> tasks;
126
2
  tasks.reserve(kTasks);
127
2
  Share share;
128
42
  for (int i = 0; i != kTasks; ++i) {
129
40
    tasks.emplace_back(std::make_unique<Task>(i, &share));
130
40
  }
131
2
  std::shuffle(tasks.begin(), tasks.end(), ThreadLocalRandom());
132
2
  std::set<int> scheduled;
133
2
  size_t schedule_idx = 0;
134
2
  std::set<int> stopped;
135
2
  std::vector<int> running_vector;
136
2
  std::vector<int> expected_running;
137
138
2
  auto se = ScopeExit([&share, &thread_pool] {
139
2
    thread_pool.StartShutdown();
140
2
    share.StopAll();
141
2
    thread_pool.CompleteShutdown();
142
2
  });
143
144
477
  while (stopped.size() != kTasks) {
145
475
    if (schedule_idx < kTasks && RandomUniformInt<size_t>(0, 2 + scheduled.size()) == 0) {
146
40
      auto& task = tasks[schedule_idx];
147
40
      auto index = task->Index();
148
40
      scheduled.insert(index);
149
40
      ++schedule_idx;
150
40
      auto priority = task->Index() / divisor;
151
40
      ASSERT_OK(thread_pool.Submit(priority, &task));
152
40
      ASSERT_TRUE(task == nullptr);
153
40
      LOG(INFO) << "Submitted: " << index << ", scheduled: " << yb::ToString(scheduled);
154
435
    } else if (!scheduled.empty() &&
155
431
               RandomUniformInt<size_t>(0, std::max<size_t>(0, 13 - scheduled.size())) == 0) {
156
40
      auto it = scheduled.end();
157
40
      std::advance(
158
40
          it, -RandomUniformInt<ssize_t>(1, std::min<ssize_t>(scheduled.size(), kMaxRunningTasks)));
159
40
      auto idx = *it;
160
40
      stopped.insert(idx);
161
40
      share.Stop(idx);
162
40
      scheduled.erase(it);
163
40
      LOG(INFO) << "Stopped: " << idx << ", scheduled: " << yb::ToString(scheduled);
164
40
    }
165
475
    share.FillRunningTaskPriorities(&running_vector, divisor);
166
475
    expected_running.clear();
167
475
    auto it = scheduled.end();
168
475
    auto left = kMaxRunningTasks;
169
1.66k
    while (it != scheduled.begin() && left > 0) {
170
1.19k
      --it;
171
1.19k
      expected_running.push_back(*it / divisor);
172
1.19k
      --left;
173
1.19k
    }
174
475
    std::reverse(expected_running.begin(), expected_running.end());
175
950
    ASSERT_EQ(expected_running, running_vector) << "Scheduled: " << yb::ToString(scheduled)
176
950
                                                << ", running: " << yb::ToString(running_vector)
177
950
                                                << ", state: " << thread_pool.StateToString();
178
475
  }
179
2
}
180
181
1
// Random submit/stop scenario in which every task has its own priority (priority == index).
TEST(PriorityThreadPoolTest, RandomUnique) {
  constexpr int kDivisor = 1;
  TestRandom(kDivisor);
}
184
185
1
// Random submit/stop scenario in which up to three tasks share a priority (priority == index / 3).
TEST(PriorityThreadPoolTest, RandomNonUnique) {
  constexpr int kDivisor = 3;
  TestRandom(kDivisor);
}
188
189
constexpr int kMaxRandomTaskTimeMs = 40;
190
191
class RandomTask : public PriorityThreadPoolTask {
192
 public:
193
20.9k
  RandomTask() = default;
194
195
20.9k
  void Run(const Status& status, PriorityThreadPoolSuspender* suspender) override {
196
20.9k
    if (!status.ok()) {
197
0
      return;
198
0
    }
199
20.9k
    std::this_thread::sleep_for(1ms * RandomUniformInt(1, kMaxRandomTaskTimeMs));
200
20.9k
  }
201
202
  // Returns true if the task belongs to specified key, which was passed to
203
  // PriorityThreadPool::Remove and should be removed.
204
0
  bool ShouldRemoveWithKey(void* key) override {
205
0
    return false;
206
0
  }
207
208
0
  std::string ToString() const override {
209
0
    return "RandomTask";
210
0
  }
211
};
212
213
1
// Several threads keep submitting RandomTask instances with random priorities (0..4) until the
// thread holder raises its stop flag, then the pool is shut down. Every submission must succeed
// and hand the task over to the pool.
TEST(PriorityThreadPoolTest, RandomTasks) {
  constexpr int kMaxRunningTasks = 10;
  // Submit tasks slightly slower than they complete.
  // To frequently get case of empty thread pool.
  constexpr int kMaxSubmitDelayMs = kMaxRandomTaskTimeMs * 5 / 4;

  PriorityThreadPool thread_pool(kMaxRunningTasks);
  TestThreadHolder holder;
  // One submitting thread per pool slot.
  for (int thread_idx = 0; thread_idx < kMaxRunningTasks; ++thread_idx) {
    holder.AddThread([&thread_pool, &stop = holder.stop_flag()] {
      while (!stop.load()) {
        const auto priority = RandomUniformInt(0, 4);
        auto new_task = std::make_unique<RandomTask>();
        ASSERT_OK(thread_pool.Submit(priority, &new_task));
        ASSERT_TRUE(new_task == nullptr);
        const auto delay_ms = RandomUniformInt(1, kMaxSubmitDelayMs);
        std::this_thread::sleep_for(1ms * delay_ms);
      }
    });
  }
  holder.WaitAndStop(60s);
  thread_pool.Shutdown();
}
233
234
namespace {
235
236
12
size_t SubmitTask(int index, Share* share, PriorityThreadPool* thread_pool) {
237
12
  auto task = std::make_unique<Task>(index, share);
238
12
  size_t serial_no = task->SerialNo();
239
12
  EXPECT_OK(thread_pool->Submit(index /* priority */, &task));
240
12
  EXPECT_TRUE(task == nullptr);
241
12
  LOG(INFO) << "Started " << index << ", serial no: " << serial_no;
242
12
  return serial_no;
243
12
}
244
245
} // namespace
246
247
1
// Checks how ChangeTaskPriority influences the set of running tasks.
// Note that `running` always holds task indexes (i.e. the priorities the tasks were submitted
// with, since the divisor is 1), not the priorities assigned later.
TEST(PriorityThreadPoolTest, ChangePriority) {
  constexpr int kMaxRunningTasks = 3;
  PriorityThreadPool thread_pool(kMaxRunningTasks);
  Share share;
  std::vector<int> running;

  // Stops all tasks and shuts the pool down, also when an assertion below fails.
  auto shutdown_guard = ScopeExit([&thread_pool, &share] {
    thread_pool.StartShutdown();
    share.StopAll();
    thread_pool.CompleteShutdown();
  });

  // Six tasks for three slots. Serial numbers are kept for the tasks whose priority is changed.
  const auto serial_no_5 = SubmitTask(5, &share, &thread_pool);
  SubmitTask(6, &share, &thread_pool);
  SubmitTask(7, &share, &thread_pool);
  const auto serial_no_1 = SubmitTask(1, &share, &thread_pool);
  const auto serial_no_2 = SubmitTask(2, &share, &thread_pool);
  SubmitTask(3, &share, &thread_pool);

  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({5, 6, 7}));

  // Raising the priority of a queued task above a running one: task 1 runs instead of task 5.
  ASSERT_TRUE(thread_pool.ChangeTaskPriority(serial_no_1, 8));
  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({1, 6, 7}));

  // The priority of a queued task can be updated: with priority 4, task 2 is preferred over
  // task 3 for one of the two slots freed by stopping tasks 1 and 7.
  ASSERT_TRUE(thread_pool.ChangeTaskPriority(serial_no_2, 4));
  share.Stop(1);
  share.Stop(7);

  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({2, 5, 6}));

  // Lowering the priority of a running task: task 3 runs instead of task 5.
  ASSERT_TRUE(thread_pool.ChangeTaskPriority(serial_no_5, 1));
  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({2, 3, 6}));

  // Giving task 5 the same priority as task 6: task 5 runs again instead of task 3.
  ASSERT_TRUE(thread_pool.ChangeTaskPriority(serial_no_5, 6));
  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({2, 5, 6}));
}
292
293
1
// Checks that the pool stays usable after failing to create worker threads: tasks submitted
// while thread creation fails are kept waiting, and tasks get started again once thread creation
// is allowed.
TEST(PriorityThreadPoolTest, FailureToCreateThread) {
  constexpr int kMaxRunningTasks = 3;
  PriorityThreadPool thread_pool(kMaxRunningTasks);
  Share share;
  std::vector<int> running;

  // Logs the pool state, stops all tasks and shuts the pool down, also when an assertion below
  // fails.
  auto shutdown_guard = ScopeExit([&thread_pool, &share] {
    LOG(INFO) << "Final state of the pool: " << thread_pool.StateToString();
    thread_pool.StartShutdown();
    share.StopAll();
    thread_pool.CompleteShutdown();
  });

  // Two tasks are started while thread creation still works.
  SubmitTask(5, &share, &thread_pool);
  SubmitTask(6, &share, &thread_pool);

  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({5, 6}));

  LOG(INFO) << "DISABLING CREATION OF NEW THREADS";

  // From now on every attempt to launch a new thread fails, so a submitted task is only added to
  // the list of waiting tasks.
  thread_pool.TEST_SetThreadCreationFailureProbability(1);

  // New tasks are added as "not started" and the same two tasks keep running.
  //
  // Prior to the #8348 fix, these failures also incorrectly increased the paused_workers_ counter
  // each time, which then caused an underflow in PickWorker, and we would not be able to start
  // any new tasks at all.
  SubmitTask(7, &share, &thread_pool);
  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({5, 6}));

  SubmitTask(8, &share, &thread_pool);
  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({5, 6}));

  SubmitTask(9, &share, &thread_pool);
  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({5, 6}));

  LOG(INFO) << "ALLOWING CREATION OF NEW THREADS";

  // Thread creation works again. Tasks 5 and 6 should immediately get paused and replaced with
  // 8 and 9. Because exactly one task is launched per paused one, task 7 does not get started,
  // even though there are enough threads and enough quota for it.
  // We might consider changing this in the future.
  thread_pool.TEST_SetThreadCreationFailureProbability(0);
  std::this_thread::sleep_for(kWaitTime);

  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({8, 9}));

  // A newly submitted task with the highest priority is started in the remaining slot.
  SubmitTask(10, &share, &thread_pool);
  share.FillRunningTaskPriorities(&running);
  ASSERT_EQ(running, std::vector<int>({8, 9, 10}));
}
351
352
} // namespace yb