YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/threadpool.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <algorithm>
34
#include <functional>
35
#include <limits>
36
#include <memory>
37
38
#include <gflags/gflags.h>
39
#include <glog/logging.h>
40
41
#include "yb/gutil/callback.h"
42
#include "yb/gutil/macros.h"
43
#include "yb/gutil/map-util.h"
44
#include "yb/gutil/stl_util.h"
45
#include "yb/gutil/strings/substitute.h"
46
#include "yb/gutil/sysinfo.h"
47
48
#include "yb/util/errno.h"
49
#include "yb/util/logging.h"
50
#include "yb/util/metrics.h"
51
#include "yb/util/thread.h"
52
#include "yb/util/threadpool.h"
53
#include "yb/util/trace.h"
54
55
namespace yb {
56
57
using strings::Substitute;
58
using std::unique_ptr;
59
60
61
1.72M
ThreadPoolMetrics::~ThreadPoolMetrics() = default;
62
63
////////////////////////////////////////////////////////
64
// ThreadPoolBuilder
65
///////////////////////////////////////////////////////
66
67
ThreadPoolBuilder::ThreadPoolBuilder(std::string name)
68
    : name_(std::move(name)),
69
      min_threads_(0),
70
      max_threads_(base::NumCPUs()),
71
      max_queue_size_(std::numeric_limits<int>::max()),
72
147k
      idle_timeout_(MonoDelta::FromMilliseconds(500)) {}
73
74
52.3k
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
75
52.3k
  CHECK_GE(min_threads, 0);
76
52.3k
  min_threads_ = min_threads;
77
52.3k
  return *this;
78
52.3k
}
79
80
77.6k
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
81
77.6k
  CHECK_GE(max_threads, 0);
82
77.6k
  max_threads_ = max_threads;
83
77.6k
  return *this;
84
77.6k
}
85
86
24.4k
ThreadPoolBuilder& ThreadPoolBuilder::unlimited_threads() {
87
24.4k
  max_threads_ = std::numeric_limits<int>::max();
88
24.4k
  return *this;
89
24.4k
}
90
91
12.2k
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
92
12.2k
  CHECK_GE(max_queue_size, 0);
93
12.2k
  max_queue_size_ = max_queue_size;
94
12.2k
  return *this;
95
12.2k
}
96
97
30.2k
ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
98
30.2k
  metrics_ = std::move(metrics);
99
30.2k
  return *this;
100
30.2k
}
101
102
6.10k
ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) {
103
6.10k
  idle_timeout_ = idle_timeout;
104
6.10k
  return *this;
105
6.10k
}
106
107
146k
Status ThreadPoolBuilder::Build(std::unique_ptr<ThreadPool>* pool) const {
108
146k
  pool->reset(new ThreadPool(*this));
109
146k
  RETURN_NOT_OK((*pool)->Init());
110
146k
  return Status::OK();
111
146k
}
112
113
////////////////////////////////////////////////////////
114
// ThreadPoolToken
115
////////////////////////////////////////////////////////
116
117
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool,
118
                                 ThreadPool::ExecutionMode mode,
119
                                 ThreadPoolMetrics metrics)
120
    : mode_(mode),
121
      pool_(pool),
122
      metrics_(std::move(metrics)),
123
      state_(ThreadPoolTokenState::kIdle),
124
      not_running_cond_(&pool->lock_),
125
622k
      active_threads_(0) {
126
622k
}
127
128
276k
ThreadPoolToken::~ThreadPoolToken() {
129
276k
  Shutdown();
130
276k
  pool_->ReleaseToken(this);
131
276k
}
132
133
15.2M
Status ThreadPoolToken::SubmitClosure(Closure c) {
134
15.2M
  return Submit(std::make_shared<FunctionRunnable>((std::bind(&Closure::Run, c))));
135
15.2M
}
136
137
24.9M
Status ThreadPoolToken::SubmitFunc(std::function<void()> f) {
138
24.9M
  return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
139
24.9M
}
140
141
40.2M
Status ThreadPoolToken::Submit(std::shared_ptr<Runnable> r) {
142
40.2M
  return pool_->DoSubmit(std::move(r), this);
143
40.2M
}
144
145
421k
void ThreadPoolToken::Shutdown() {
146
421k
  MutexLock unique_lock(pool_->lock_);
147
421k
  pool_->CheckNotPoolThreadUnlocked();
148
149
  // Clear the queue under the lock, but defer the releasing of the tasks
150
  // outside the lock, in case there are concurrent threads wanting to access
151
  // the ThreadPool. The task's destructors may acquire locks, etc, so this
152
  // also prevents lock inversions.
153
421k
  deque<ThreadPool::Task> to_release = std::move(entries_);
154
421k
  pool_->total_queued_tasks_ -= to_release.size();
155
156
421k
  switch (state()) {
157
263k
    case ThreadPoolTokenState::kIdle:
158
      // There were no tasks outstanding; we can quiesce the token immediately.
159
263k
      Transition(ThreadPoolTokenState::kQuiesced);
160
263k
      break;
161
726
    case ThreadPoolTokenState::kRunning:
162
      // There were outstanding tasks. If any are still running, switch to
163
      // kQuiescing and wait for them to finish (the worker thread executing
164
      // the token's last task will switch the token to kQuiesced). Otherwise,
165
      // we can quiesce the token immediately.
166
167
      // Note: this is an O(n) operation, but it's expected to be infrequent.
168
      // Plus doing it this way (rather than switching to kQuiescing and waiting
169
      // for a worker thread to process the queue entry) helps retain state
170
      // transition symmetry with ThreadPool::Shutdown.
171
24.5k
      for (auto it = pool_->queue_.begin(); it != pool_->queue_.end();) {
172
23.8k
        if (*it == this) {
173
2.03k
          it = pool_->queue_.erase(it);
174
21.7k
        } else {
175
21.7k
          it++;
176
21.7k
        }
177
23.8k
      }
178
179
726
      if (active_threads_ == 0) {
180
153
        Transition(ThreadPoolTokenState::kQuiesced);
181
153
        break;
182
153
      }
183
573
      Transition(ThreadPoolTokenState::kQuiescing);
184
573
      FALLTHROUGH_INTENDED;
185
574
    case ThreadPoolTokenState::kQuiescing:
186
      // The token is already quiescing. Just wait for a worker thread to
187
      // switch it to kQuiesced.
188
1.14k
      while (state() != ThreadPoolTokenState::kQuiesced) {
189
574
        not_running_cond_.Wait();
190
574
      }
191
574
      break;
192
157k
    default:
193
157k
      break;
194
421k
  }
195
196
  // Finally release the queued tasks, outside the lock.
197
421k
  unique_lock.Unlock();
198
4.37k
  for (auto& t : to_release) {
199
4.37k
    if (t.trace) {
200
0
      t.trace->Release();
201
0
    }
202
4.37k
  }
203
421k
}
204
205
2.59k
void ThreadPoolToken::Wait() {
206
2.59k
  MutexLock unique_lock(pool_->lock_);
207
2.59k
  pool_->CheckNotPoolThreadUnlocked();
208
2.67k
  while (IsActive()) {
209
79
    not_running_cond_.Wait();
210
79
  }
211
2.59k
}
212
213
0
bool ThreadPoolToken::WaitUntil(const MonoTime& until) {
214
0
  MutexLock unique_lock(pool_->lock_);
215
0
  pool_->CheckNotPoolThreadUnlocked();
216
0
  while (IsActive()) {
217
0
    if (!not_running_cond_.WaitUntil(until)) {
218
0
      return false;
219
0
    }
220
0
  }
221
0
  return true;
222
0
}
223
224
0
bool ThreadPoolToken::WaitFor(const MonoDelta& delta) {
225
0
  return WaitUntil(MonoTime::Now() + delta);
226
0
}
227
228
64.3M
void ThreadPoolToken::Transition(ThreadPoolTokenState new_state) {
229
64.3M
#ifndef NDEBUG
230
64.3M
  CHECK_NE(state_, new_state);
231
232
64.3M
  switch (state_) {
233
32.3M
    case ThreadPoolTokenState::kIdle:
234
32.3M
      CHECK(new_state == ThreadPoolTokenState::kRunning ||
235
32.3M
            new_state == ThreadPoolTokenState::kQuiesced);
236
32.3M
      if (new_state == ThreadPoolTokenState::kRunning) {
237
32.0M
        CHECK(!entries_.empty());
238
275k
      } else {
239
275k
        CHECK(entries_.empty());
240
275k
        CHECK_EQ(active_threads_, 0);
241
275k
      }
242
32.3M
      break;
243
32.0M
    case ThreadPoolTokenState::kRunning:
244
32.0M
      CHECK(new_state == ThreadPoolTokenState::kIdle ||
245
32.0M
            new_state == ThreadPoolTokenState::kQuiescing ||
246
32.0M
            new_state == ThreadPoolTokenState::kQuiesced);
247
32.0M
      CHECK(entries_.empty());
248
32.0M
      if (new_state == ThreadPoolTokenState::kQuiescing) {
249
592
        CHECK_GT(active_threads_, 0);
250
592
      }
251
32.0M
      break;
252
591
    case ThreadPoolTokenState::kQuiescing:
253
591
      CHECK(new_state == ThreadPoolTokenState::kQuiesced);
254
591
      CHECK_EQ(active_threads_, 0);
255
591
      break;
256
0
    case ThreadPoolTokenState::kQuiesced:
257
0
      CHECK(false); // kQuiesced is a terminal state
258
0
      break;
259
0
    default:
260
0
      LOG(FATAL) << "Unknown token state: " << state_;
261
64.3M
  }
262
64.3M
#endif
263
264
  // Take actions based on the state we're entering.
265
64.3M
  switch (new_state) {
266
32.0M
    case ThreadPoolTokenState::kIdle:
267
32.3M
    case ThreadPoolTokenState::kQuiesced:
268
32.3M
      not_running_cond_.Broadcast();
269
32.3M
      break;
270
32.0M
    default:
271
32.0M
      break;
272
64.3M
  }
273
274
64.3M
  state_ = new_state;
275
64.3M
}
276
277
0
const char* ThreadPoolToken::StateToString(ThreadPoolTokenState s) {
278
0
  switch (s) {
279
0
    case ThreadPoolTokenState::kIdle: return "kIdle"; break;
280
0
    case ThreadPoolTokenState::kRunning: return "kRunning"; break;
281
0
    case ThreadPoolTokenState::kQuiescing: return "kQuiescing"; break;
282
0
    case ThreadPoolTokenState::kQuiesced: return "kQuiesced"; break;
283
0
  }
284
0
  return "<cannot reach here>";
285
0
}
286
287
////////////////////////////////////////////////////////
288
// ThreadPool
289
////////////////////////////////////////////////////////
290
291
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
292
  : name_(builder.name_),
293
    min_threads_(builder.min_threads_),
294
    max_threads_(builder.max_threads_),
295
    max_queue_size_(builder.max_queue_size_),
296
    idle_timeout_(builder.idle_timeout_),
297
    pool_status_(STATUS(Uninitialized, "The pool was not initialized.")),
298
    idle_cond_(&lock_),
299
    no_threads_cond_(&lock_),
300
    not_empty_(&lock_),
301
    num_threads_(0),
302
    active_threads_(0),
303
    total_queued_tasks_(0),
304
    tokenless_(NewToken(ExecutionMode::CONCURRENT)),
305
147k
    metrics_(builder.metrics_) {
306
147k
}
307
308
12.0k
ThreadPool::~ThreadPool() {
309
  // There should only be one live token: the one used in tokenless submission.
310
0
  CHECK_EQ(1, tokens_.size()) << Substitute(
311
0
      "Threadpool $0 destroyed with $1 allocated tokens",
312
0
      name_, tokens_.size());
313
12.0k
  Shutdown();
314
12.0k
}
315
316
147k
Status ThreadPool::Init() {
317
147k
  MutexLock unique_lock(lock_);
318
147k
  if (!pool_status_.IsUninitialized()) {
319
0
    return STATUS(NotSupported, "The thread pool is already initialized");
320
0
  }
321
147k
  pool_status_ = Status::OK();
322
199k
  for (int i = 0; i < min_threads_; i++) {
323
52.3k
    Status status = CreateThreadUnlocked();
324
52.3k
    if (!status.ok()) {
325
0
      if (i != 0) {
326
0
        YB_LOG_EVERY_N_SECS(WARNING, 5) << "Cannot create thread: " << status << ", will try later";
327
        // Cannot create enough threads now, will try later.
328
0
        break;
329
0
      }
330
0
      unique_lock.Unlock();
331
0
      Shutdown();
332
0
      return status;
333
0
    }
334
52.3k
  }
335
147k
  return Status::OK();
336
147k
}
337
338
24.7k
void ThreadPool::Shutdown() {
339
24.7k
  MutexLock unique_lock(lock_);
340
24.7k
  CheckNotPoolThreadUnlocked();
341
342
  // Note: this is the same error seen at submission if the pool is at
343
  // capacity, so clients can't tell them apart. This isn't really a practical
344
  // concern though because shutting down a pool typically requires clients to
345
  // be quiesced first, so there's no danger of a client getting confused.
346
24.7k
  pool_status_ = STATUS(ServiceUnavailable, "The pool has been shut down.");
347
348
  // Clear the various queues under the lock, but defer the releasing
349
  // of the tasks outside the lock, in case there are concurrent threads
350
  // wanting to access the ThreadPool. The task's destructors may acquire
351
  // locks, etc, so this also prevents lock inversions.
352
24.7k
  queue_.clear();
353
24.7k
  deque<deque<Task>> to_release;
354
24.8k
  for (auto* t : tokens_) {
355
24.8k
    if (!t->entries_.empty()) {
356
7
      to_release.emplace_back(std::move(t->entries_));
357
7
    }
358
24.8k
    switch (t->state()) {
359
12.1k
      case ThreadPoolTokenState::kIdle:
360
        // The token is idle; we can quiesce it immediately.
361
12.1k
        t->Transition(ThreadPoolTokenState::kQuiesced);
362
12.1k
        break;
363
22
      case ThreadPoolTokenState::kRunning:
364
        // The token has tasks associated with it. If they're merely queued
365
        // (i.e. there are no active threads), the tasks will have been removed
366
        // above and we can quiesce immediately. Otherwise, we need to wait for
367
        // the threads to finish.
368
22
        t->Transition(t->active_threads_ > 0 ?
369
19
            ThreadPoolTokenState::kQuiescing :
370
3
            ThreadPoolTokenState::kQuiesced);
371
22
        break;
372
12.7k
      default:
373
12.7k
        break;
374
24.8k
    }
375
24.8k
  }
376
377
  // The queues are empty. Wake any sleeping worker threads and wait for all
378
  // of them to exit. Some worker threads will exit immediately upon waking,
379
  // while others will exit after they finish executing an outstanding task.
380
24.7k
  total_queued_tasks_ = 0;
381
24.7k
  not_empty_.Broadcast();
382
27.8k
  while (num_threads_ > 0) {
383
3.04k
    no_threads_cond_.Wait();
384
3.04k
  }
385
386
  // All the threads have exited. Check the state of each token.
387
24.8k
  for (auto* t : tokens_) {
388
24.8k
    DCHECK(t->state() == ThreadPoolTokenState::kIdle ||
389
24.8k
           t->state() == ThreadPoolTokenState::kQuiesced);
390
24.8k
  }
391
392
  // Finally release the queued tasks, outside the lock.
393
24.7k
  unique_lock.Unlock();
394
7
  for (auto& token : to_release) {
395
24
    for (auto& t : token) {
396
24
      if (t.trace) {
397
0
        t.trace->Release();
398
0
      }
399
24
    }
400
7
  }
401
24.7k
}
402
403
621k
unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) {
404
621k
  return NewTokenWithMetrics(mode, {});
405
621k
}
406
407
unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics(
408
623k
    ExecutionMode mode, ThreadPoolMetrics metrics) {
409
623k
  MutexLock guard(lock_);
410
623k
  unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, std::move(metrics)));
411
623k
  InsertOrDie(&tokens_, t.get());
412
623k
  return t;
413
623k
}
414
415
276k
void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
416
276k
  MutexLock guard(lock_);
417
18.4E
  CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
418
18.4E
                                      ThreadPoolToken::StateToString(t->state()));
419
276k
  CHECK_EQ(1, tokens_.erase(t));
420
276k
}
421
422
423
7.43k
Status ThreadPool::SubmitClosure(const Closure& task) {
424
  // TODO: once all uses of std::bind-based tasks are dead, implement this
425
  // in a more straight-forward fashion.
426
7.43k
  return SubmitFunc(std::bind(&Closure::Run, task));
427
7.43k
}
428
429
1.01k
Status ThreadPool::SubmitFunc(const std::function<void()>& func) {
430
1.01k
  return Submit(std::make_shared<FunctionRunnable>(func));
431
1.01k
}
432
433
0
Status ThreadPool::SubmitFunc(std::function<void()>&& func) {
434
0
  return Submit(std::make_shared<FunctionRunnable>(std::move(func)));
435
0
}
436
437
971k
Status ThreadPool::Submit(const std::shared_ptr<Runnable>& r) {
438
971k
  return DoSubmit(std::move(r), tokenless_.get());
439
971k
}
440
441
41.1M
Status ThreadPool::DoSubmit(const std::shared_ptr<Runnable> task, ThreadPoolToken* token) {
442
41.1M
  DCHECK(token);
443
41.1M
  MonoTime submit_time = MonoTime::Now();
444
445
41.1M
  MutexLock guard(lock_);
446
41.1M
  if (PREDICT_FALSE(!pool_status_.ok())) {
447
2
    return pool_status_;
448
2
  }
449
450
41.1M
  if (PREDICT_FALSE(!token->MaySubmitNewTasks())) {
451
1.17k
    return STATUS(ServiceUnavailable, "Thread pool token was shut down.", "", Errno(ESHUTDOWN));
452
1.17k
  }
453
454
  // Size limit check.
455
41.1M
  int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ +
456
41.1M
                               static_cast<int64_t>(max_queue_size_) - total_queued_tasks_;
457
41.1M
  if (capacity_remaining < 1) {
458
7
    return STATUS(ServiceUnavailable,
459
7
                  Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
460
7
                             num_threads_, max_threads_, total_queued_tasks_, max_queue_size_),
461
7
                  "", Errno(ESHUTDOWN));
462
7
  }
463
464
  // Should we create another thread?
465
  // We assume that each current inactive thread will grab one item from the
466
  // queue.  If it seems like we'll need another thread, we create one.
467
  // In theory, a currently active thread could finish immediately after this
468
  // calculation.  This would mean we created a thread we didn't really need.
469
  // However, this race is unavoidable, since we don't do the work under a lock.
470
  // It's also harmless.
471
  //
472
  // Of course, we never create more than max_threads_ threads no matter what.
473
41.1M
  int threads_from_this_submit =
474
41.1M
      token->IsActive() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
475
41.1M
  int inactive_threads = num_threads_ - active_threads_;
476
41.1M
  int64_t additional_threads = (queue_.size() + threads_from_this_submit) - inactive_threads;
477
41.1M
  if (additional_threads > 0 && num_threads_ < max_threads_) {
478
301k
    Status status = CreateThreadUnlocked();
479
301k
    if (!status.ok()) {
480
      // If we failed to create a thread, but there are still some other
481
      // worker threads, log a warning message and continue.
482
0
      LOG(WARNING) << "Thread pool failed to create thread: " << status << ", num_threads: "
483
0
                   << num_threads_ << ", max_threads: " << max_threads_;
484
0
      if (num_threads_ == 0) {
485
        // If we have no threads, we can't do any work.
486
0
        return status;
487
0
      }
488
41.1M
    }
489
301k
  }
490
491
41.1M
  Task e;
492
41.1M
  e.runnable = task;
493
41.1M
  e.trace = Trace::CurrentTrace();
494
  // Need to AddRef, since the thread which submitted the task may go away,
495
  // and we don't want the trace to be destructed while waiting in the queue.
496
41.1M
  if (e.trace) {
497
1.46M
    e.trace->AddRef();
498
1.46M
  }
499
41.1M
  e.submit_time = submit_time;
500
501
  // Add the task to the token's queue.
502
41.1M
  ThreadPoolTokenState state = token->state();
503
41.1M
  DCHECK(state == ThreadPoolTokenState::kIdle ||
504
41.1M
         state == ThreadPoolTokenState::kRunning);
505
41.1M
  token->entries_.emplace_back(std::move(e));
506
41.1M
  if (state == ThreadPoolTokenState::kIdle ||
507
38.5M
      token->mode() == ExecutionMode::CONCURRENT) {
508
38.5M
    queue_.emplace_back(token);
509
38.5M
    if (state == ThreadPoolTokenState::kIdle) {
510
32.0M
      token->Transition(ThreadPoolTokenState::kRunning);
511
32.0M
    }
512
38.5M
  }
513
41.1M
  int length_at_submit = total_queued_tasks_++;
514
515
41.1M
  guard.Unlock();
516
41.1M
  not_empty_.Signal();
517
518
41.1M
  if (metrics_.queue_length_histogram) {
519
338k
    metrics_.queue_length_histogram->Increment(length_at_submit);
520
338k
  }
521
41.1M
  if (token->metrics_.queue_length_histogram) {
522
3
    token->metrics_.queue_length_histogram->Increment(length_at_submit);
523
3
  }
524
525
41.1M
  return Status::OK();
526
41.1M
}
527
528
292
void ThreadPool::Wait() {
529
292
  MutexLock unique_lock(lock_);
530
426
  while ((!queue_.empty()) || (active_threads_ > 0)) {
531
134
    idle_cond_.Wait();
532
134
  }
533
292
}
534
535
429
bool ThreadPool::WaitUntil(const MonoTime& until) {
536
429
  MutexLock unique_lock(lock_);
537
429
  while ((!queue_.empty()) || (active_threads_ > 0)) {
538
0
    if (!idle_cond_.WaitUntil(until)) {
539
0
      return false;
540
0
    }
541
0
  }
542
429
  return true;
543
429
}
544
545
429
bool ThreadPool::WaitFor(const MonoDelta& delta) {
546
429
  return WaitUntil(MonoTime::Now() + delta);
547
429
}
548
549
354k
void ThreadPool::DispatchThread(bool permanent) {
550
354k
  MutexLock unique_lock(lock_);
551
82.2M
  while (true) {
552
    // Note: STATUS(Aborted, ) is used to indicate normal shutdown.
553
82.1M
    if (!pool_status_.ok()) {
554
2
      VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString();
555
3.45k
      break;
556
3.45k
    }
557
558
82.1M
    if (queue_.empty()) {
559
40.9M
      if (permanent) {
560
9.50M
        not_empty_.Wait();
561
31.4M
      } else {
562
31.4M
        if (!not_empty_.TimedWait(idle_timeout_)) {
563
          // After much investigation, it appears that pthread condition variables have
564
          // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
565
          // another thread did in fact signal. Apparently after a timeout there is some
566
          // brief period during which another thread may actually grab the internal mutex
567
          // protecting the state, signal, and release again before we get the mutex. So,
568
          // we'll recheck the empty queue case regardless.
569
238k
          if (queue_.empty()) {
570
18.4E
            VLOG(3) << "Releasing worker thread from pool " << name_ << " after "
571
18.4E
                    << idle_timeout_.ToMilliseconds() << "ms of idle time.";
572
237k
            break;
573
237k
          }
574
40.7M
        }
575
31.4M
      }
576
40.7M
      continue;
577
40.7M
    }
578
579
    // Get the next token and task to execute.
580
41.1M
    ThreadPoolToken* token = queue_.front();
581
41.1M
    queue_.pop_front();
582
41.1M
    DCHECK_EQ(ThreadPoolTokenState::kRunning, token->state());
583
41.1M
    DCHECK(!token->entries_.empty());
584
41.1M
    Task task = std::move(token->entries_.front());
585
41.1M
    token->entries_.pop_front();
586
41.1M
    token->active_threads_++;
587
41.1M
    --total_queued_tasks_;
588
41.1M
    ++active_threads_;
589
590
41.1M
    unique_lock.Unlock();
591
592
    // Release the reference which was held by the queued item.
593
41.1M
    ADOPT_TRACE(task.trace);
594
41.1M
    if (task.trace) {
595
1.46M
      task.trace->Release();
596
1.46M
    }
597
598
    // Update metrics
599
41.1M
    MonoTime now(MonoTime::Now());
600
41.1M
    int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
601
41.1M
    if (metrics_.queue_time_us_histogram) {
602
338k
      metrics_.queue_time_us_histogram->Increment(queue_time_us);
603
338k
    }
604
41.1M
    if (token->metrics_.queue_time_us_histogram) {
605
3
      token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
606
3
    }
607
608
    // Execute the task
609
41.1M
    {
610
41.1M
      MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
611
41.1M
      task.runnable->Run();
612
41.1M
      int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
613
614
41.1M
      if (metrics_.run_time_us_histogram) {
615
421k
        metrics_.run_time_us_histogram->Increment(wall_us);
616
421k
      }
617
41.1M
      if (token->metrics_.run_time_us_histogram) {
618
3
        token->metrics_.run_time_us_histogram->Increment(wall_us);
619
3
      }
620
41.1M
    }
621
    // Destruct the task while we do not hold the lock.
622
    //
623
    // The task's destructor may be expensive if it has a lot of bound
624
    // objects, and we don't want to block submission of the threadpool.
625
    // In the worst case, the destructor might even try to do something
626
    // with this threadpool, and produce a deadlock.
627
41.1M
    task.runnable.reset();
628
41.1M
    unique_lock.Lock();
629
630
    // Possible states:
631
    // 1. The token was shut down while we ran its task. Transition to kQuiesced.
632
    // 2. The token has no more queued tasks. Transition back to kIdle.
633
    // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
634
41.1M
    ThreadPoolTokenState state = token->state();
635
41.1M
    DCHECK(state == ThreadPoolTokenState::kRunning ||
636
41.1M
           state == ThreadPoolTokenState::kQuiescing);
637
41.1M
    if (--token->active_threads_ == 0) {
638
37.9M
      if (state == ThreadPoolTokenState::kQuiescing) {
639
591
        DCHECK(token->entries_.empty());
640
591
        token->Transition(ThreadPoolTokenState::kQuiesced);
641
37.9M
      } else if (token->entries_.empty()) {
642
32.0M
        token->Transition(ThreadPoolTokenState::kIdle);
643
5.90M
      } else if (token->mode() == ExecutionMode::SERIAL) {
644
2.67M
        queue_.emplace_back(token);
645
2.67M
      }
646
37.9M
    }
647
41.1M
    if (--active_threads_ == 0) {
648
30.9M
      idle_cond_.Broadcast();
649
30.9M
    }
650
41.1M
  }
651
652
  // It's important that we hold the lock between exiting the loop and dropping
653
  // num_threads_. Otherwise it's possible someone else could come along here
654
  // and add a new task just as the last running thread is about to exit.
655
354k
  CHECK(unique_lock.OwnsLock());
656
657
354k
  CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
658
354k
  if (--num_threads_ == 0) {
659
101k
    no_threads_cond_.Broadcast();
660
661
    // Sanity check: if we're the last thread exiting, the queue ought to be
662
    // empty. Otherwise it will never get processed.
663
101k
    CHECK(queue_.empty());
664
101k
    DCHECK_EQ(0, total_queued_tasks_);
665
101k
  }
666
354k
}
667
668
353k
Status ThreadPool::CreateThreadUnlocked() {
669
  // The first few threads are permanent, and do not time out.
670
353k
  bool permanent = (num_threads_ < min_threads_);
671
353k
  scoped_refptr<Thread> t;
672
353k
  Status s = yb::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_),
673
353k
                                  &ThreadPool::DispatchThread, this, permanent, &t);
674
354k
  if (s.ok()) {
675
354k
    InsertOrDie(&threads_, t.get());
676
354k
    num_threads_++;
677
354k
  }
678
353k
  return s;
679
353k
}
680
681
448k
void ThreadPool::CheckNotPoolThreadUnlocked() {
682
448k
  Thread* current = Thread::current_thread();
683
448k
  if (ContainsKey(threads_, current)) {
684
0
    LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with "
685
0
        "name '$1' called pool function that would result in deadlock",
686
0
        name_, current->name());
687
0
  }
688
448k
}
689
690
0
CHECKED_STATUS TaskRunner::Init(int concurrency) {
691
0
  ThreadPoolBuilder builder("Task Runner");
692
0
  if (concurrency > 0) {
693
0
    builder.set_max_threads(concurrency);
694
0
  }
695
0
  return builder.Build(&thread_pool_);
696
0
}
697
698
0
CHECKED_STATUS TaskRunner::Wait() {
699
0
  std::unique_lock<std::mutex> lock(mutex_);
700
0
  cond_.wait(lock, [this] { return running_tasks_ == 0; });
701
0
  return first_failure_;
702
0
}
703
704
0
void TaskRunner::CompleteTask(const Status& status) {
705
0
  if (!status.ok()) {
706
0
    bool expected = false;
707
0
    if (failed_.compare_exchange_strong(expected, true)) {
708
0
      first_failure_ = status;
709
0
    } else {
710
0
      LOG(WARNING) << status.message() << std::endl;
711
0
    }
712
0
  }
713
0
  if (--running_tasks_ == 0) {
714
0
    std::lock_guard<std::mutex> lock(mutex_);
715
0
    cond_.notify_one();
716
0
  }
717
0
}
718
719
} // namespace yb