YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/threadpool.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <algorithm>
34
#include <functional>
35
#include <limits>
36
#include <memory>
37
38
#include <gflags/gflags.h>
39
#include <glog/logging.h>
40
41
#include "yb/gutil/callback.h"
42
#include "yb/gutil/macros.h"
43
#include "yb/gutil/map-util.h"
44
#include "yb/gutil/stl_util.h"
45
#include "yb/gutil/strings/substitute.h"
46
#include "yb/gutil/sysinfo.h"
47
48
#include "yb/util/errno.h"
49
#include "yb/util/logging.h"
50
#include "yb/util/metrics.h"
51
#include "yb/util/thread.h"
52
#include "yb/util/threadpool.h"
53
#include "yb/util/trace.h"
54
55
namespace yb {
56
57
using strings::Substitute;
58
using std::unique_ptr;
59
60
61
2.82M
// Defaulted destructor, defined out of line in this translation unit.
ThreadPoolMetrics::~ThreadPoolMetrics() = default;
62
63
////////////////////////////////////////////////////////
64
// ThreadPoolBuilder
65
///////////////////////////////////////////////////////
66
67
// Creates a builder for a pool called `name` with these defaults:
//   - minimum thread count: 0
//   - maximum thread count: base::NumCPUs()
//   - maximum queue size:   std::numeric_limits<int>::max()
//   - idle timeout:         500 ms
ThreadPoolBuilder::ThreadPoolBuilder(std::string name)
    : name_(std::move(name)),
      min_threads_(0),
      max_threads_(base::NumCPUs()),
      max_queue_size_(std::numeric_limits<int>::max()),
      idle_timeout_(MonoDelta::FromMilliseconds(500)) {
}
73
74
78.7k
// Sets the minimum thread count. A negative value trips the CHECK.
// Returns this builder so calls can be chained.
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
  CHECK_GE(min_threads, 0);
  this->min_threads_ = min_threads;
  return *this;
}
79
80
116k
// Sets the maximum thread count. A negative value trips the CHECK.
// Returns this builder so calls can be chained.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
  CHECK_GE(max_threads, 0);
  this->max_threads_ = max_threads;
  return *this;
}
85
86
37.1k
// Raises the maximum thread count to the largest representable int.
// Returns this builder so calls can be chained.
ThreadPoolBuilder& ThreadPoolBuilder::unlimited_threads() {
  this->max_threads_ = std::numeric_limits<int>::max();
  return *this;
}
90
91
18.5k
// Sets the maximum number of queued tasks. A negative value trips the CHECK.
// Returns this builder so calls can be chained.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
  CHECK_GE(max_queue_size, 0);
  this->max_queue_size_ = max_queue_size;
  return *this;
}
96
97
45.8k
// Stores the metrics bundle that the built pool will copy from this builder.
// The argument is taken by value and moved into place.
// Returns this builder so calls can be chained.
ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
  this->metrics_ = std::move(metrics);
  return *this;
}
101
102
9.28k
// Sets the idle timeout handed to the built pool.
// Returns this builder so calls can be chained.
ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) {
  this->idle_timeout_ = idle_timeout;
  return *this;
}
106
107
219k
// Constructs a ThreadPool from this builder's settings, stores it in `*pool`
// and initializes it.
//
// Returns the status of ThreadPool::Init(). `*pool` is assigned before Init()
// runs, so it holds the new pool even when a non-OK status is returned.
Status ThreadPoolBuilder::Build(std::unique_ptr<ThreadPool>* pool) const {
  *pool = std::unique_ptr<ThreadPool>(new ThreadPool(*this));
  ThreadPool* const created = pool->get();
  RETURN_NOT_OK(created->Init());
  return Status::OK();
}
112
113
////////////////////////////////////////////////////////
114
// ThreadPoolToken
115
////////////////////////////////////////////////////////
116
117
// Creates an idle token bound to `pool`.
//
// The token's condition variable is tied to the pool's lock, and the token
// starts in kIdle with no active threads. `metrics` is moved into the token.
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool,
                                 ThreadPool::ExecutionMode mode,
                                 ThreadPoolMetrics metrics)
    : mode_(mode),
      pool_(pool),
      metrics_(std::move(metrics)),
      state_(ThreadPoolTokenState::kIdle),
      not_running_cond_(&pool->lock_),
      active_threads_(0) {}
127
128
446k
// Shuts the token down (see Shutdown()) and then unregisters it from the
// owning pool.
ThreadPoolToken::~ThreadPoolToken() {
  this->Shutdown();
  pool_->ReleaseToken(this);
}
132
133
34.8M
// Submits `c` for execution through this token by wrapping Closure::Run in a
// FunctionRunnable. Returns the status of Submit().
//
// `c` is already a by-value copy owned by this call, so it is moved into the
// bind object rather than copied a second time.
Status ThreadPoolToken::SubmitClosure(Closure c) {
  return Submit(std::make_shared<FunctionRunnable>(std::bind(&Closure::Run, std::move(c))));
}
136
137
58.0M
// Submits `f` for execution through this token by wrapping it in a
// FunctionRunnable. Returns the status of Submit().
Status ThreadPoolToken::SubmitFunc(std::function<void()> f) {
  auto runnable = std::make_shared<FunctionRunnable>(std::move(f));
  return Submit(std::move(runnable));
}
140
141
92.9M
// Hands `r` to the owning pool, tagged with this token.
// Returns whatever ThreadPool::DoSubmit() returns.
Status ThreadPoolToken::Submit(std::shared_ptr<Runnable> r) {
  ThreadPoolToken* const self = this;
  return pool_->DoSubmit(std::move(r), self);
}
144
145
674k
void ThreadPoolToken::Shutdown() {
146
674k
  MutexLock unique_lock(pool_->lock_);
147
674k
  pool_->CheckNotPoolThreadUnlocked();
148
149
  // Clear the queue under the lock, but defer the releasing of the tasks
150
  // outside the lock, in case there are concurrent threads wanting to access
151
  // the ThreadPool. The task's destructors may acquire locks, etc, so this
152
  // also prevents lock inversions.
153
674k
  deque<ThreadPool::Task> to_release = std::move(entries_);
154
674k
  pool_->total_queued_tasks_ -= to_release.size();
155
156
674k
  switch (state()) {
157
429k
    case ThreadPoolTokenState::kIdle:
158
      // There were no tasks outstanding; we can quiesce the token immediately.
159
429k
      Transition(ThreadPoolTokenState::kQuiesced);
160
429k
      break;
161
729
    case ThreadPoolTokenState::kRunning:
162
      // There were outstanding tasks. If any are still running, switch to
163
      // kQuiescing and wait for them to finish (the worker thread executing
164
      // the token's last task will switch the token to kQuiesced). Otherwise,
165
      // we can quiesce the token immediately.
166
167
      // Note: this is an O(n) operation, but it's expected to be infrequent.
168
      // Plus doing it this way (rather than switching to kQuiescing and waiting
169
      // for a worker thread to process the queue entry) helps retain state
170
      // transition symmetry with ThreadPool::Shutdown.
171
2.12k
      for (auto it = pool_->queue_.begin(); it != pool_->queue_.end();) {
172
1.39k
        if (*it == this) {
173
127
          it = pool_->queue_.erase(it);
174
1.26k
        } else {
175
1.26k
          it++;
176
1.26k
        }
177
1.39k
      }
178
179
729
      if (active_threads_ == 0) {
180
82
        Transition(ThreadPoolTokenState::kQuiesced);
181
82
        break;
182
82
      }
183
647
      Transition(ThreadPoolTokenState::kQuiescing);
184
647
      FALLTHROUGH_INTENDED;
185
647
    case ThreadPoolTokenState::kQuiescing:
186
      // The token is already quiescing. Just wait for a worker thread to
187
      // switch it to kQuiesced.
188
1.29k
      while (state() != ThreadPoolTokenState::kQuiesced) {
189
647
        not_running_cond_.Wait();
190
647
      }
191
647
      break;
192
245k
    default:
193
245k
      break;
194
674k
  }
195
196
  // Finally release the queued tasks, outside the lock.
197
675k
  unique_lock.Unlock();
198
675k
  for (auto& t : to_release) {
199
313
    if (t.trace) {
200
0
      t.trace->Release();
201
0
    }
202
313
  }
203
675k
}
204
205
4.82k
void ThreadPoolToken::Wait() {
206
4.82k
  MutexLock unique_lock(pool_->lock_);
207
4.82k
  pool_->CheckNotPoolThreadUnlocked();
208
4.89k
  while (IsActive()) {
209
78
    not_running_cond_.Wait();
210
78
  }
211
4.82k
}
212
213
0
bool ThreadPoolToken::WaitUntil(const MonoTime& until) {
214
0
  MutexLock unique_lock(pool_->lock_);
215
0
  pool_->CheckNotPoolThreadUnlocked();
216
0
  while (IsActive()) {
217
0
    if (!not_running_cond_.WaitUntil(until)) {
218
0
      return false;
219
0
    }
220
0
  }
221
0
  return true;
222
0
}
223
224
0
bool ThreadPoolToken::WaitFor(const MonoDelta& delta) {
225
0
  return WaitUntil(MonoTime::Now() + delta);
226
0
}
227
228
161M
// Moves the token's state machine to `new_state`.
//
// In debug builds (NDEBUG undefined) the transition is validated first:
//   kIdle      -> kRunning (entries queued) | kQuiesced (no entries, no active threads)
//   kRunning   -> kIdle | kQuiescing (active threads > 0) | kQuiesced; entries must be empty
//   kQuiescing -> kQuiesced (no active threads)
//   kQuiesced  -> nothing (terminal)
//
// Entering kIdle or kQuiesced broadcasts on not_running_cond_, which is what
// the token's Wait*/Shutdown loops block on.
void ThreadPoolToken::Transition(ThreadPoolTokenState new_state) {
#ifndef NDEBUG
  CHECK_NE(state_, new_state);

  switch (state_) {
    case ThreadPoolTokenState::kIdle: {
      CHECK(new_state == ThreadPoolTokenState::kRunning ||
            new_state == ThreadPoolTokenState::kQuiesced);
      if (new_state == ThreadPoolTokenState::kRunning) {
        CHECK(!entries_.empty());
      } else {
        CHECK(entries_.empty());
        CHECK_EQ(active_threads_, 0);
      }
      break;
    }
    case ThreadPoolTokenState::kRunning: {
      CHECK(new_state == ThreadPoolTokenState::kIdle ||
            new_state == ThreadPoolTokenState::kQuiescing ||
            new_state == ThreadPoolTokenState::kQuiesced);
      CHECK(entries_.empty());
      if (new_state == ThreadPoolTokenState::kQuiescing) {
        CHECK_GT(active_threads_, 0);
      }
      break;
    }
    case ThreadPoolTokenState::kQuiescing: {
      CHECK(new_state == ThreadPoolTokenState::kQuiesced);
      CHECK_EQ(active_threads_, 0);
      break;
    }
    case ThreadPoolTokenState::kQuiesced: {
      CHECK(false); // kQuiesced is a terminal state
      break;
    }
    default:
      LOG(FATAL) << "Unknown token state: " << state_;
  }
#endif

  // Wake waiters when the token stops running; the other target states need
  // no action on entry.
  if (new_state == ThreadPoolTokenState::kIdle ||
      new_state == ThreadPoolTokenState::kQuiesced) {
    not_running_cond_.Broadcast();
  }

  state_ = new_state;
}
276
277
0
// Returns the enumerator name of `s` as a string literal, or a placeholder
// string for a value outside the four known states.
const char* ThreadPoolToken::StateToString(ThreadPoolTokenState s) {
  switch (s) {
    case ThreadPoolTokenState::kIdle:
      return "kIdle";
    case ThreadPoolTokenState::kRunning:
      return "kRunning";
    case ThreadPoolTokenState::kQuiescing:
      return "kQuiescing";
    case ThreadPoolTokenState::kQuiesced:
      return "kQuiesced";
  }
  return "<cannot reach here>";
}
286
287
////////////////////////////////////////////////////////
288
// ThreadPool
289
////////////////////////////////////////////////////////
290
291
// Copies the configuration out of `builder` and leaves the pool in the
// "Uninitialized" status until Init() is called. All counters start at zero.
//
// tokenless_ is created through NewToken(), which locks lock_ and registers
// the token in tokens_.
// NOTE(review): this presumably relies on lock_ and tokens_ being declared
// before tokenless_ in the class — confirm against the header.
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
    : name_(builder.name_),
      min_threads_(builder.min_threads_),
      max_threads_(builder.max_threads_),
      max_queue_size_(builder.max_queue_size_),
      idle_timeout_(builder.idle_timeout_),
      pool_status_(STATUS(Uninitialized, "The pool was not initialized.")),
      idle_cond_(&lock_),
      no_threads_cond_(&lock_),
      not_empty_(&lock_),
      num_threads_(0),
      active_threads_(0),
      total_queued_tasks_(0),
      tokenless_(NewToken(ExecutionMode::CONCURRENT)),
      metrics_(builder.metrics_) {}
307
308
16.2k
// Verifies that the only token still registered is the internal one used for
// tokenless submission, then shuts the pool down.
ThreadPool::~ThreadPool() {
  // Any other live token at this point is a caller bug.
  CHECK_EQ(1, tokens_.size()) << Substitute(
      "Threadpool $0 destroyed with $1 allocated tokens",
      name_, tokens_.size());
  this->Shutdown();
}
315
316
219k
// Marks the pool as usable and tries to start min_threads_ worker threads.
//
// Returns:
//   - NotSupported if the pool has already left the Uninitialized status;
//   - the thread-creation error if the very first thread cannot be created
//     (the pool is shut down before returning);
//   - OK otherwise. If a later thread fails to start, a rate-limited warning
//     is logged and the pool continues with the threads it has.
Status ThreadPool::Init() {
  MutexLock unique_lock(lock_);
  if (!pool_status_.IsUninitialized()) {
    return STATUS(NotSupported, "The thread pool is already initialized");
  }
  pool_status_ = Status::OK();

  for (int created = 0; created < min_threads_; ++created) {
    Status status = CreateThreadUnlocked();
    if (status.ok()) {
      continue;
    }
    if (created == 0) {
      // Not even one worker could be started. Shutdown() takes the lock
      // itself, so release it first.
      unique_lock.Unlock();
      Shutdown();
      return status;
    }
    // Cannot create enough threads now, will try later.
    YB_LOG_EVERY_N_SECS(WARNING, 5) << "Cannot create thread: " << status << ", will try later";
    break;
  }
  return Status::OK();
}
337
338
31.2k
void ThreadPool::Shutdown() {
339
31.2k
  MutexLock unique_lock(lock_);
340
31.2k
  CheckNotPoolThreadUnlocked();
341
342
  // Note: this is the same error seen at submission if the pool is at
343
  // capacity, so clients can't tell them apart. This isn't really a practical
344
  // concern though because shutting down a pool typically requires clients to
345
  // be quiesced first, so there's no danger of a client getting confused.
346
31.2k
  pool_status_ = STATUS(ServiceUnavailable, "The pool has been shut down.");
347
348
  // Clear the various queues under the lock, but defer the releasing
349
  // of the tasks outside the lock, in case there are concurrent threads
350
  // wanting to access the ThreadPool. The task's destructors may acquire
351
  // locks, etc, so this also prevents lock inversions.
352
31.2k
  queue_.clear();
353
31.2k
  deque<deque<Task>> to_release;
354
31.4k
  for (auto* t : tokens_) {
355
31.4k
    if (!t->entries_.empty()) {
356
14
      to_release.emplace_back(std::move(t->entries_));
357
14
    }
358
31.4k
    switch (t->state()) {
359
16.4k
      case ThreadPoolTokenState::kIdle:
360
        // The token is idle; we can quiesce it immediately.
361
16.4k
        t->Transition(ThreadPoolTokenState::kQuiesced);
362
16.4k
        break;
363
23
      case ThreadPoolTokenState::kRunning:
364
        // The token has tasks associated with it. If they're merely queued
365
        // (i.e. there are no active threads), the tasks will have been removed
366
        // above and we can quiesce immediately. Otherwise, we need to wait for
367
        // the threads to finish.
368
23
        t->Transition(t->active_threads_ > 0 ?
369
10
            ThreadPoolTokenState::kQuiescing :
370
23
            
ThreadPoolTokenState::kQuiesced13
);
371
23
        break;
372
14.9k
      default:
373
14.9k
        break;
374
31.4k
    }
375
31.4k
  }
376
377
  // The queues are empty. Wake any sleeping worker threads and wait for all
378
  // of them to exit. Some worker threads will exit immediately upon waking,
379
  // while others will exit after they finish executing an outstanding task.
380
31.2k
  total_queued_tasks_ = 0;
381
31.2k
  not_empty_.Broadcast();
382
35.4k
  while (num_threads_ > 0) {
383
4.14k
    no_threads_cond_.Wait();
384
4.14k
  }
385
386
  // All the threads have exited. Check the state of each token.
387
31.4k
  for (auto* t : tokens_) {
388
31.4k
    DCHECK(t->state() == ThreadPoolTokenState::kIdle ||
389
31.4k
           t->state() == ThreadPoolTokenState::kQuiesced);
390
31.4k
  }
391
392
  // Finally release the queued tasks, outside the lock.
393
31.2k
  unique_lock.Unlock();
394
31.2k
  for (auto& token : to_release) {
395
42
    for (auto& t : token) {
396
42
      if (t.trace) {
397
0
        t.trace->Release();
398
0
      }
399
42
    }
400
14
  }
401
31.2k
}
402
403
1.03M
// Creates a token with the given execution mode and default-constructed
// (empty) metrics. See NewTokenWithMetrics().
unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) {
  return NewTokenWithMetrics(mode, ThreadPoolMetrics{});
}
406
407
// Creates a token bound to this pool, registers it in tokens_ under the pool
// lock, and returns ownership to the caller. InsertOrDie aborts if the token
// pointer is already registered.
unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics(
    ExecutionMode mode, ThreadPoolMetrics metrics) {
  MutexLock guard(lock_);
  unique_ptr<ThreadPoolToken> token(new ThreadPoolToken(this, mode, std::move(metrics)));
  ThreadPoolToken* const raw_token = token.get();
  InsertOrDie(&tokens_, raw_token);
  return token;
}
414
415
446k
// Unregisters `t` from the pool. The token must not be active and must be
// registered exactly once; either violation trips a CHECK.
void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
  MutexLock guard(lock_);
  CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
                                      ThreadPoolToken::StateToString(t->state()));
  CHECK_EQ(1, tokens_.erase(t));
}
421
422
423
11.0k
// Submits `task` without a user token by binding Closure::Run to a copy of
// it and forwarding to SubmitFunc().
//
// TODO: once all uses of std::bind-based tasks are dead, implement this
// in a more straight-forward fashion.
Status ThreadPool::SubmitClosure(const Closure& task) {
  std::function<void()> run_closure = std::bind(&Closure::Run, task);
  return SubmitFunc(std::move(run_closure));
}
428
429
963
// Submits a copy of `func` without a user token, wrapped in a
// FunctionRunnable. Returns the status of Submit().
Status ThreadPool::SubmitFunc(const std::function<void()>& func) {
  auto runnable = std::make_shared<FunctionRunnable>(func);
  return Submit(runnable);
}
432
433
0
// Rvalue overload: moves `func` into a FunctionRunnable and submits it
// without a user token. Returns the status of Submit().
Status ThreadPool::SubmitFunc(std::function<void()>&& func) {
  auto runnable = std::make_shared<FunctionRunnable>(std::move(func));
  return Submit(runnable);
}
436
437
1.89M
// Submits `r` through the pool's internal token (tokenless_).
// Returns whatever DoSubmit() returns.
//
// `r` is a const reference, so it cannot be moved from; DoSubmit() receives a
// copy of the shared_ptr.
Status ThreadPool::Submit(const std::shared_ptr<Runnable>& r) {
  return DoSubmit(r, tokenless_.get());
}
440
441
94.8M
// Queues `task` on `token` and, if needed, starts another worker thread.
//
// Parameters:
//   task  - the runnable to execute; ownership is moved into the queued Task.
//   token - the token the task is submitted through; must not be null.
//
// Returns:
//   - the pool's current status if it is not OK (uninitialized or shut down);
//   - ServiceUnavailable if the token no longer accepts tasks;
//   - ServiceUnavailable if the pool has no remaining capacity;
//   - the thread-creation error if a thread was needed, could not be created,
//     and the pool has no threads at all;
//   - OK once the task has been queued.
//
// The top-level `const` on `task` was dropped in this definition (it is not
// part of the function's type) so that the shared_ptr can be moved into the
// queue entry instead of paying for a reference-count copy on every submit.
Status ThreadPool::DoSubmit(std::shared_ptr<Runnable> task, ThreadPoolToken* token) {
  DCHECK(token);
  // Sampled before taking the lock so that lock wait counts as queue time.
  MonoTime submit_time = MonoTime::Now();

  MutexLock guard(lock_);
  if (PREDICT_FALSE(!pool_status_.ok())) {
    return pool_status_;
  }

  if (PREDICT_FALSE(!token->MaySubmitNewTasks())) {
    return STATUS(ServiceUnavailable, "Thread pool token was shut down.", "", Errno(ESHUTDOWN));
  }

  // Size limit check.
  int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ +
                               static_cast<int64_t>(max_queue_size_) - total_queued_tasks_;
  if (capacity_remaining < 1) {
    return STATUS(ServiceUnavailable,
                  Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
                             num_threads_, max_threads_, total_queued_tasks_, max_queue_size_),
                  "", Errno(ESHUTDOWN));
  }

  // Should we create another thread?
  // We assume that each current inactive thread will grab one item from the
  // queue.  If it seems like we'll need another thread, we create one.
  // In theory, a currently active thread could finish immediately after this
  // calculation.  This would mean we created a thread we didn't really need.
  // However, this race is unavoidable, since we don't do the work under a lock.
  // It's also harmless.
  //
  // Of course, we never create more than max_threads_ threads no matter what.
  //
  // A submit to an already-active SERIAL token adds no entry to queue_ (see
  // below), so it does not count towards the demand for threads.
  int threads_from_this_submit =
      token->IsActive() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
  int inactive_threads = num_threads_ - active_threads_;
  // Convert the queue size to a signed type first so the subtraction is done
  // in signed arithmetic rather than relying on unsigned wrap-around.
  int64_t additional_threads =
      static_cast<int64_t>(queue_.size()) + threads_from_this_submit - inactive_threads;
  if (additional_threads > 0 && num_threads_ < max_threads_) {
    Status status = CreateThreadUnlocked();
    if (!status.ok()) {
      // If we failed to create a thread, but there are still some other
      // worker threads, log a warning message and continue.
      LOG(WARNING) << "Thread pool failed to create thread: " << status << ", num_threads: "
                   << num_threads_ << ", max_threads: " << max_threads_;
      if (num_threads_ == 0) {
        // If we have no threads, we can't do any work.
        return status;
      }
    }
  }

  Task e;
  e.runnable = std::move(task);
  e.trace = Trace::CurrentTrace();
  // Need to AddRef, since the thread which submitted the task may go away,
  // and we don't want the trace to be destructed while waiting in the queue.
  if (e.trace) {
    e.trace->AddRef();
  }
  e.submit_time = submit_time;

  // Add the task to the token's queue.
  ThreadPoolTokenState state = token->state();
  DCHECK(state == ThreadPoolTokenState::kIdle ||
         state == ThreadPoolTokenState::kRunning);
  token->entries_.emplace_back(std::move(e));
  // The token gets one queue_ entry per runnable task. A running SERIAL token
  // is not re-queued here; the worker thread re-queues it after finishing the
  // current task.
  if (state == ThreadPoolTokenState::kIdle ||
      token->mode() == ExecutionMode::CONCURRENT) {
    queue_.emplace_back(token);
    if (state == ThreadPoolTokenState::kIdle) {
      token->Transition(ThreadPoolTokenState::kRunning);
    }
  }
  int length_at_submit = total_queued_tasks_++;

  guard.Unlock();
  not_empty_.Signal();

  // Metrics are updated after the lock is released.
  if (metrics_.queue_length_histogram) {
    metrics_.queue_length_histogram->Increment(length_at_submit);
  }
  if (token->metrics_.queue_length_histogram) {
    token->metrics_.queue_length_histogram->Increment(length_at_submit);
  }

  return Status::OK();
}
527
528
284
void ThreadPool::Wait() {
529
284
  MutexLock unique_lock(lock_);
530
359
  while ((!queue_.empty()) || 
(active_threads_ > 0)327
) {
531
75
    idle_cond_.Wait();
532
75
  }
533
284
}
534
535
1.01k
bool ThreadPool::WaitUntil(const MonoTime& until) {
536
1.01k
  MutexLock unique_lock(lock_);
537
1.01k
  while ((!queue_.empty()) || (active_threads_ > 0)) {
538
0
    if (!idle_cond_.WaitUntil(until)) {
539
0
      return false;
540
0
    }
541
0
  }
542
1.01k
  return true;
543
1.01k
}
544
545
1.01k
bool ThreadPool::WaitFor(const MonoDelta& delta) {
546
1.01k
  return WaitUntil(MonoTime::Now() + delta);
547
1.01k
}
548
549
1.01M
void ThreadPool::DispatchThread(bool permanent) {
550
1.01M
  MutexLock unique_lock(lock_);
551
189M
  while (true) {
552
    // Note: STATUS(Aborted, ) is used to indicate normal shutdown.
553
189M
    if (!pool_status_.ok()) {
554
18.4E
      VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString();
555
4.55k
      break;
556
4.55k
    }
557
558
189M
    if (queue_.empty()) {
559
94.5M
      if (permanent) {
560
24.8M
        not_empty_.Wait();
561
69.7M
      } else {
562
69.7M
        if (!not_empty_.TimedWait(idle_timeout_)) {
563
          // After much investigation, it appears that pthread condition variables have
564
          // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
565
          // another thread did in fact signal. Apparently after a timeout there is some
566
          // brief period during which another thread may actually grab the internal mutex
567
          // protecting the state, signal, and release again before we get the mutex. So,
568
          // we'll recheck the empty queue case regardless.
569
837k
          if (queue_.empty()) {
570
18.4E
            VLOG(3) << "Releasing worker thread from pool " << name_ << " after "
571
18.4E
                    << idle_timeout_.ToMilliseconds() << "ms of idle time.";
572
832k
            break;
573
832k
          }
574
837k
        }
575
69.7M
      }
576
93.7M
      continue;
577
94.5M
    }
578
579
    // Get the next token and task to execute.
580
94.9M
    ThreadPoolToken* token = queue_.front();
581
94.9M
    queue_.pop_front();
582
94.9M
    DCHECK_EQ(ThreadPoolTokenState::kRunning, token->state());
583
94.9M
    DCHECK(!token->entries_.empty());
584
94.9M
    Task task = std::move(token->entries_.front());
585
94.9M
    token->entries_.pop_front();
586
94.9M
    token->active_threads_++;
587
94.9M
    --total_queued_tasks_;
588
94.9M
    ++active_threads_;
589
590
94.9M
    unique_lock.Unlock();
591
592
    // Release the reference which was held by the queued item.
593
94.9M
    ADOPT_TRACE(task.trace);
594
94.9M
    if (task.trace) {
595
7.30M
      task.trace->Release();
596
7.30M
    }
597
598
    // Update metrics
599
94.9M
    MonoTime now(MonoTime::Now());
600
94.9M
    int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
601
94.9M
    if (metrics_.queue_time_us_histogram) {
602
593k
      metrics_.queue_time_us_histogram->Increment(queue_time_us);
603
593k
    }
604
94.9M
    if (token->metrics_.queue_time_us_histogram) {
605
3
      token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
606
3
    }
607
608
    // Execute the task
609
94.9M
    {
610
94.9M
      MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
611
94.9M
      task.runnable->Run();
612
94.9M
      int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
613
614
94.9M
      if (metrics_.run_time_us_histogram) {
615
733k
        metrics_.run_time_us_histogram->Increment(wall_us);
616
733k
      }
617
94.9M
      if (token->metrics_.run_time_us_histogram) {
618
3
        token->metrics_.run_time_us_histogram->Increment(wall_us);
619
3
      }
620
94.9M
    }
621
    // Destruct the task while we do not hold the lock.
622
    //
623
    // The task's destructor may be expensive if it has a lot of bound
624
    // objects, and we don't want to block submission of the threadpool.
625
    // In the worst case, the destructor might even try to do something
626
    // with this threadpool, and produce a deadlock.
627
94.9M
    task.runnable.reset();
628
94.9M
    unique_lock.Lock();
629
630
    // Possible states:
631
    // 1. The token was shut down while we ran its task. Transition to kQuiesced.
632
    // 2. The token has no more queued tasks. Transition back to kIdle.
633
    // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
634
94.9M
    ThreadPoolTokenState state = token->state();
635
94.9M
    DCHECK(state == ThreadPoolTokenState::kRunning ||
636
94.9M
           state == ThreadPoolTokenState::kQuiescing);
637
94.9M
    if (--token->active_threads_ == 0) {
638
89.1M
      if (state == ThreadPoolTokenState::kQuiescing) {
639
656
        DCHECK(token->entries_.empty());
640
656
        token->Transition(ThreadPoolTokenState::kQuiesced);
641
89.1M
      } else if (token->entries_.empty()) {
642
80.7M
        token->Transition(ThreadPoolTokenState::kIdle);
643
80.7M
      } else 
if (8.42M
token->mode() == ExecutionMode::SERIAL8.42M
) {
644
5.08M
        queue_.emplace_back(token);
645
5.08M
      }
646
89.1M
    }
647
94.9M
    if (--active_threads_ == 0) {
648
75.7M
      idle_cond_.Broadcast();
649
75.7M
    }
650
94.9M
  }
651
652
  // It's important that we hold the lock between exiting the loop and dropping
653
  // num_threads_. Otherwise it's possible someone else could come along here
654
  // and add a new task just as the last running thread is about to exit.
655
1.01M
  CHECK(unique_lock.OwnsLock());
656
657
1.01M
  CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
658
1.01M
  if (--num_threads_ == 0) {
659
527k
    no_threads_cond_.Broadcast();
660
661
    // Sanity check: if we're the last thread exiting, the queue ought to be
662
    // empty. Otherwise it will never get processed.
663
527k
    CHECK(queue_.empty());
664
527k
    DCHECK_EQ(0, total_queued_tasks_);
665
527k
  }
666
1.01M
}
667
668
1.01M
// Starts one more worker thread running DispatchThread(). On success the
// thread is recorded in threads_ and num_threads_ is incremented.
//
// Returns the status of yb::Thread::Create().
// NOTE(review): callers in this file invoke this while holding lock_ —
// confirm that is the intended meaning of the "Unlocked" suffix.
Status ThreadPool::CreateThreadUnlocked() {
  // The first few threads are permanent, and do not time out.
  bool permanent = (num_threads_ < min_threads_);
  scoped_refptr<Thread> new_thread;
  Status create_status = yb::Thread::Create(
      "thread pool", strings::Substitute("$0 [worker]", name_),
      &ThreadPool::DispatchThread, this, permanent, &new_thread);
  if (!create_status.ok()) {
    return create_status;
  }
  InsertOrDie(&threads_, new_thread.get());
  num_threads_++;
  return create_status;
}
680
681
711k
void ThreadPool::CheckNotPoolThreadUnlocked() {
682
711k
  Thread* current = Thread::current_thread();
683
711k
  if (ContainsKey(threads_, current)) {
684
0
    LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with "
685
0
        "name '$1' called pool function that would result in deadlock",
686
0
        name_, current->name());
687
0
  }
688
711k
}
689
690
0
// Builds the runner's thread pool, named "Task Runner".
//
// `concurrency` - when positive, used as the pool's maximum thread count;
//                 otherwise the builder's default maximum is kept.
// Returns the status of ThreadPoolBuilder::Build().
CHECKED_STATUS TaskRunner::Init(int concurrency) {
  ThreadPoolBuilder builder("Task Runner");
  const bool has_explicit_limit = concurrency > 0;
  if (has_explicit_limit) {
    builder.set_max_threads(concurrency);
  }
  return builder.Build(&thread_pool_);
}
697
698
0
// Blocks until running_tasks_ reaches zero, then returns first_failure_.
CHECKED_STATUS TaskRunner::Wait() {
  std::unique_lock<std::mutex> lock(mutex_);
  // Explicit loop form of a predicate wait: re-check after every wake-up.
  while (!(running_tasks_ == 0)) {
    cond_.wait(lock);
  }
  return first_failure_;
}
703
704
0
void TaskRunner::CompleteTask(const Status& status) {
705
0
  if (!status.ok()) {
706
0
    bool expected = false;
707
0
    if (failed_.compare_exchange_strong(expected, true)) {
708
0
      first_failure_ = status;
709
0
    } else {
710
0
      LOG(WARNING) << status.message() << std::endl;
711
0
    }
712
0
  }
713
0
  if (--running_tasks_ == 0) {
714
0
    std::lock_guard<std::mutex> lock(mutex_);
715
0
    cond_.notify_one();
716
0
  }
717
0
}
718
719
} // namespace yb