/Users/deen/code/yugabyte-db/src/yb/util/threadpool.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <algorithm> |
34 | | #include <functional> |
35 | | #include <limits> |
36 | | #include <memory> |
37 | | |
38 | | #include <gflags/gflags.h> |
39 | | #include <glog/logging.h> |
40 | | |
41 | | #include "yb/gutil/callback.h" |
42 | | #include "yb/gutil/macros.h" |
43 | | #include "yb/gutil/map-util.h" |
44 | | #include "yb/gutil/stl_util.h" |
45 | | #include "yb/gutil/strings/substitute.h" |
46 | | #include "yb/gutil/sysinfo.h" |
47 | | |
48 | | #include "yb/util/errno.h" |
49 | | #include "yb/util/logging.h" |
50 | | #include "yb/util/metrics.h" |
51 | | #include "yb/util/thread.h" |
52 | | #include "yb/util/threadpool.h" |
53 | | #include "yb/util/trace.h" |
54 | | |
55 | | namespace yb { |
56 | | |
57 | | using strings::Substitute; |
58 | | using std::unique_ptr; |
59 | | |
60 | | |
// Defaulted out-of-line so the destructor is emitted in this translation
// unit (the header can keep the member types incomplete).
ThreadPoolMetrics::~ThreadPoolMetrics() = default;
62 | | |
63 | | //////////////////////////////////////////////////////// |
64 | | // ThreadPoolBuilder |
65 | | /////////////////////////////////////////////////////// |
66 | | |
// Defaults: no permanent threads, at most one worker per CPU, an effectively
// unbounded queue, and a 500ms idle timeout for surplus workers.
ThreadPoolBuilder::ThreadPoolBuilder(std::string name)
    : name_(std::move(name)),
      min_threads_(0),
      max_threads_(base::NumCPUs()),
      max_queue_size_(std::numeric_limits<int>::max()),
      idle_timeout_(MonoDelta::FromMilliseconds(500)) {}
73 | | |
// Sets the number of permanent worker threads (created eagerly in Init(),
// never reaped on idle). Must be non-negative. Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
  CHECK_GE(min_threads, 0);
  min_threads_ = min_threads;
  return *this;
}
79 | | |
// Caps the total number of worker threads. Must be non-negative.
// Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
  CHECK_GE(max_threads, 0);
  max_threads_ = max_threads;
  return *this;
}
85 | | |
// Removes the worker-thread cap (sets it to INT_MAX). Returns *this.
ThreadPoolBuilder& ThreadPoolBuilder::unlimited_threads() {
  max_threads_ = std::numeric_limits<int>::max();
  return *this;
}
90 | | |
// Caps the number of queued (not-yet-running) tasks; submissions beyond the
// cap fail with ServiceUnavailable. Must be non-negative. Returns *this.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
  CHECK_GE(max_queue_size, 0);
  max_queue_size_ = max_queue_size;
  return *this;
}
96 | | |
// Attaches pool-wide histograms (queue length, queue time, run time).
// Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
  metrics_ = std::move(metrics);
  return *this;
}
101 | | |
// Sets how long a non-permanent worker waits for work before exiting.
// Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) {
  idle_timeout_ = idle_timeout;
  return *this;
}
106 | | |
107 | 146k | Status ThreadPoolBuilder::Build(std::unique_ptr<ThreadPool>* pool) const { |
108 | 146k | pool->reset(new ThreadPool(*this)); |
109 | 146k | RETURN_NOT_OK((*pool)->Init()); |
110 | 146k | return Status::OK(); |
111 | 146k | } |
112 | | |
113 | | //////////////////////////////////////////////////////// |
114 | | // ThreadPoolToken |
115 | | //////////////////////////////////////////////////////// |
116 | | |
// A token is born kIdle and shares the owning pool's lock for all of its
// state; not_running_cond_ is signalled when the token returns to
// kIdle/kQuiesced (see Transition()).
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool,
                                 ThreadPool::ExecutionMode mode,
                                 ThreadPoolMetrics metrics)
    : mode_(mode),
      pool_(pool),
      metrics_(std::move(metrics)),
      state_(ThreadPoolTokenState::kIdle),
      not_running_cond_(&pool->lock_),
      active_threads_(0) {
}
127 | | |
// Quiesces any outstanding work, then deregisters from the pool.
// Shutdown() is idempotent, so a token already shut down is fine.
ThreadPoolToken::~ThreadPoolToken() {
  Shutdown();
  pool_->ReleaseToken(this);
}
132 | | |
133 | 15.2M | Status ThreadPoolToken::SubmitClosure(Closure c) { |
134 | 15.2M | return Submit(std::make_shared<FunctionRunnable>((std::bind(&Closure::Run, c)))); |
135 | 15.2M | } |
136 | | |
137 | 24.9M | Status ThreadPoolToken::SubmitFunc(std::function<void()> f) { |
138 | 24.9M | return Submit(std::make_shared<FunctionRunnable>(std::move(f))); |
139 | 24.9M | } |
140 | | |
// Routes the task to the owning pool, tagged with this token so the pool can
// honor the token's execution mode and quiescing state.
Status ThreadPoolToken::Submit(std::shared_ptr<Runnable> r) {
  return pool_->DoSubmit(std::move(r), this);
}
144 | | |
// Stops the token accepting new work, drops its queued-but-unstarted tasks,
// and waits for any task currently running under it to finish. Idempotent.
// Must not be called from a pool worker thread (checked below) — that would
// deadlock waiting for ourselves.
void ThreadPoolToken::Shutdown() {
  MutexLock unique_lock(pool_->lock_);
  pool_->CheckNotPoolThreadUnlocked();

  // Clear the queue under the lock, but defer the releasing of the tasks
  // outside the lock, in case there are concurrent threads wanting to access
  // the ThreadPool. The task's destructors may acquire locks, etc, so this
  // also prevents lock inversions.
  deque<ThreadPool::Task> to_release = std::move(entries_);
  pool_->total_queued_tasks_ -= to_release.size();

  switch (state()) {
    case ThreadPoolTokenState::kIdle:
      // There were no tasks outstanding; we can quiesce the token immediately.
      Transition(ThreadPoolTokenState::kQuiesced);
      break;
    case ThreadPoolTokenState::kRunning:
      // There were outstanding tasks. If any are still running, switch to
      // kQuiescing and wait for them to finish (the worker thread executing
      // the token's last task will switch the token to kQuiesced). Otherwise,
      // we can quiesce the token immediately.

      // Note: this is an O(n) operation, but it's expected to be infrequent.
      // Plus doing it this way (rather than switching to kQuiescing and waiting
      // for a worker thread to process the queue entry) helps retain state
      // transition symmetry with ThreadPool::Shutdown.
      for (auto it = pool_->queue_.begin(); it != pool_->queue_.end();) {
        if (*it == this) {
          // Remove this token's pending dispatch entries from the pool queue.
          it = pool_->queue_.erase(it);
        } else {
          it++;
        }
      }

      if (active_threads_ == 0) {
        Transition(ThreadPoolTokenState::kQuiesced);
        break;
      }
      Transition(ThreadPoolTokenState::kQuiescing);
      FALLTHROUGH_INTENDED;
    case ThreadPoolTokenState::kQuiescing:
      // The token is already quiescing. Just wait for a worker thread to
      // switch it to kQuiesced.
      while (state() != ThreadPoolTokenState::kQuiesced) {
        not_running_cond_.Wait();
      }
      break;
    default:
      // Already kQuiesced (e.g. Shutdown() called twice): nothing to do.
      break;
  }

  // Finally release the queued tasks, outside the lock.
  unique_lock.Unlock();
  for (auto& t : to_release) {
    if (t.trace) {
      t.trace->Release();
    }
  }
}
204 | | |
205 | 2.59k | void ThreadPoolToken::Wait() { |
206 | 2.59k | MutexLock unique_lock(pool_->lock_); |
207 | 2.59k | pool_->CheckNotPoolThreadUnlocked(); |
208 | 2.67k | while (IsActive()) { |
209 | 79 | not_running_cond_.Wait(); |
210 | 79 | } |
211 | 2.59k | } |
212 | | |
213 | 0 | bool ThreadPoolToken::WaitUntil(const MonoTime& until) { |
214 | 0 | MutexLock unique_lock(pool_->lock_); |
215 | 0 | pool_->CheckNotPoolThreadUnlocked(); |
216 | 0 | while (IsActive()) { |
217 | 0 | if (!not_running_cond_.WaitUntil(until)) { |
218 | 0 | return false; |
219 | 0 | } |
220 | 0 | } |
221 | 0 | return true; |
222 | 0 | } |
223 | | |
224 | 0 | bool ThreadPoolToken::WaitFor(const MonoDelta& delta) { |
225 | 0 | return WaitUntil(MonoTime::Now() + delta); |
226 | 0 | } |
227 | | |
// Moves the token to `new_state`. In debug builds, enforces the legal state
// machine edges:
//   kIdle      -> kRunning | kQuiesced
//   kRunning   -> kIdle | kQuiescing | kQuiesced
//   kQuiescing -> kQuiesced
//   kQuiesced is terminal.
// Caller must hold pool_->lock_ (the condition variable below is built on it).
void ThreadPoolToken::Transition(ThreadPoolTokenState new_state) {
#ifndef NDEBUG
  CHECK_NE(state_, new_state);

  switch (state_) {
    case ThreadPoolTokenState::kIdle:
      CHECK(new_state == ThreadPoolTokenState::kRunning ||
            new_state == ThreadPoolTokenState::kQuiesced);
      if (new_state == ThreadPoolTokenState::kRunning) {
        // Running implies there is something to run.
        CHECK(!entries_.empty());
      } else {
        CHECK(entries_.empty());
        CHECK_EQ(active_threads_, 0);
      }
      break;
    case ThreadPoolTokenState::kRunning:
      CHECK(new_state == ThreadPoolTokenState::kIdle ||
            new_state == ThreadPoolTokenState::kQuiescing ||
            new_state == ThreadPoolTokenState::kQuiesced);
      CHECK(entries_.empty());
      if (new_state == ThreadPoolTokenState::kQuiescing) {
        // Quiescing only makes sense while a worker is still running a task.
        CHECK_GT(active_threads_, 0);
      }
      break;
    case ThreadPoolTokenState::kQuiescing:
      CHECK(new_state == ThreadPoolTokenState::kQuiesced);
      CHECK_EQ(active_threads_, 0);
      break;
    case ThreadPoolTokenState::kQuiesced:
      CHECK(false); // kQuiesced is a terminal state
      break;
    default:
      LOG(FATAL) << "Unknown token state: " << state_;
  }
#endif

  // Take actions based on the state we're entering.
  switch (new_state) {
    case ThreadPoolTokenState::kIdle:
    case ThreadPoolTokenState::kQuiesced:
      // Both states mean "no active tasks": wake anyone blocked in
      // Wait()/WaitUntil()/Shutdown().
      not_running_cond_.Broadcast();
      break;
    default:
      break;
  }

  state_ = new_state;
}
276 | | |
277 | 0 | const char* ThreadPoolToken::StateToString(ThreadPoolTokenState s) { |
278 | 0 | switch (s) { |
279 | 0 | case ThreadPoolTokenState::kIdle: return "kIdle"; break; |
280 | 0 | case ThreadPoolTokenState::kRunning: return "kRunning"; break; |
281 | 0 | case ThreadPoolTokenState::kQuiescing: return "kQuiescing"; break; |
282 | 0 | case ThreadPoolTokenState::kQuiesced: return "kQuiesced"; break; |
283 | 0 | } |
284 | 0 | return "<cannot reach here>"; |
285 | 0 | } |
286 | | |
287 | | //////////////////////////////////////////////////////// |
288 | | // ThreadPool |
289 | | //////////////////////////////////////////////////////// |
290 | | |
// Copies the builder's configuration; the pool is unusable (Uninitialized
// status) until Init() succeeds. tokenless_ is the implicit CONCURRENT token
// used by the token-free Submit* overloads — note it is created via
// NewToken(), so it registers itself in tokens_.
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
    : name_(builder.name_),
      min_threads_(builder.min_threads_),
      max_threads_(builder.max_threads_),
      max_queue_size_(builder.max_queue_size_),
      idle_timeout_(builder.idle_timeout_),
      pool_status_(STATUS(Uninitialized, "The pool was not initialized.")),
      idle_cond_(&lock_),
      no_threads_cond_(&lock_),
      not_empty_(&lock_),
      num_threads_(0),
      active_threads_(0),
      total_queued_tasks_(0),
      tokenless_(NewToken(ExecutionMode::CONCURRENT)),
      metrics_(builder.metrics_) {
}
307 | | |
// All client-created tokens must be destroyed before the pool; the only
// remaining token at this point should be the internal tokenless_ one.
ThreadPool::~ThreadPool() {
  // There should only be one live token: the one used in tokenless submission.
  CHECK_EQ(1, tokens_.size()) << Substitute(
      "Threadpool $0 destroyed with $1 allocated tokens",
      name_, tokens_.size());
  Shutdown();
}
315 | | |
// Marks the pool live and eagerly spawns the min_threads_ permanent workers.
// Failing to create the very first thread is fatal to the pool (it is shut
// down and the error returned); failure after at least one thread exists is
// tolerated — more workers can be created later on demand in DoSubmit().
// Calling Init() twice is an error.
Status ThreadPool::Init() {
  MutexLock unique_lock(lock_);
  if (!pool_status_.IsUninitialized()) {
    return STATUS(NotSupported, "The thread pool is already initialized");
  }
  pool_status_ = Status::OK();
  for (int i = 0; i < min_threads_; i++) {
    Status status = CreateThreadUnlocked();
    if (!status.ok()) {
      if (i != 0) {
        YB_LOG_EVERY_N_SECS(WARNING, 5) << "Cannot create thread: " << status << ", will try later";
        // Cannot create enough threads now, will try later.
        break;
      }
      // Zero workers were created: the pool cannot function. Release the
      // lock before Shutdown(), which re-acquires it.
      unique_lock.Unlock();
      Shutdown();
      return status;
    }
  }
  return Status::OK();
}
337 | | |
// Rejects further submissions, drops all queued tasks, quiesces every token,
// and waits for all worker threads to exit. Must not be called from a worker
// thread (checked below).
void ThreadPool::Shutdown() {
  MutexLock unique_lock(lock_);
  CheckNotPoolThreadUnlocked();

  // Note: this is the same error seen at submission if the pool is at
  // capacity, so clients can't tell them apart. This isn't really a practical
  // concern though because shutting down a pool typically requires clients to
  // be quiesced first, so there's no danger of a client getting confused.
  pool_status_ = STATUS(ServiceUnavailable, "The pool has been shut down.");

  // Clear the various queues under the lock, but defer the releasing
  // of the tasks outside the lock, in case there are concurrent threads
  // wanting to access the ThreadPool. The task's destructors may acquire
  // locks, etc, so this also prevents lock inversions.
  queue_.clear();
  deque<deque<Task>> to_release;
  for (auto* t : tokens_) {
    if (!t->entries_.empty()) {
      to_release.emplace_back(std::move(t->entries_));
    }
    switch (t->state()) {
      case ThreadPoolTokenState::kIdle:
        // The token is idle; we can quiesce it immediately.
        t->Transition(ThreadPoolTokenState::kQuiesced);
        break;
      case ThreadPoolTokenState::kRunning:
        // The token has tasks associated with it. If they're merely queued
        // (i.e. there are no active threads), the tasks will have been removed
        // above and we can quiesce immediately. Otherwise, we need to wait for
        // the threads to finish.
        t->Transition(t->active_threads_ > 0 ?
            ThreadPoolTokenState::kQuiescing :
            ThreadPoolTokenState::kQuiesced);
        break;
      default:
        break;
    }
  }

  // The queues are empty. Wake any sleeping worker threads and wait for all
  // of them to exit. Some worker threads will exit immediately upon waking,
  // while others will exit after they finish executing an outstanding task.
  total_queued_tasks_ = 0;
  not_empty_.Broadcast();
  while (num_threads_ > 0) {
    no_threads_cond_.Wait();
  }

  // All the threads have exited. Check the state of each token.
  for (auto* t : tokens_) {
    DCHECK(t->state() == ThreadPoolTokenState::kIdle ||
           t->state() == ThreadPoolTokenState::kQuiesced);
  }

  // Finally release the queued tasks, outside the lock.
  unique_lock.Unlock();
  for (auto& token : to_release) {
    for (auto& t : token) {
      if (t.trace) {
        t.trace->Release();
      }
    }
  }
}
402 | | |
403 | 621k | unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) { |
404 | 621k | return NewTokenWithMetrics(mode, {}); |
405 | 621k | } |
406 | | |
407 | | unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics( |
408 | 623k | ExecutionMode mode, ThreadPoolMetrics metrics) { |
409 | 623k | MutexLock guard(lock_); |
410 | 623k | unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, std::move(metrics))); |
411 | 623k | InsertOrDie(&tokens_, t.get()); |
412 | 623k | return t; |
413 | 623k | } |
414 | | |
// Deregisters a token (called from ~ThreadPoolToken). The token must already
// be quiesced — releasing one with queued or running tasks is a bug.
void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
  MutexLock guard(lock_);
  CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
                                      ThreadPoolToken::StateToString(t->state()));
  CHECK_EQ(1, tokens_.erase(t));
}
421 | | |
422 | | |
423 | 7.43k | Status ThreadPool::SubmitClosure(const Closure& task) { |
424 | | // TODO: once all uses of std::bind-based tasks are dead, implement this |
425 | | // in a more straight-forward fashion. |
426 | 7.43k | return SubmitFunc(std::bind(&Closure::Run, task)); |
427 | 7.43k | } |
428 | | |
// Tokenless submission: copies `func` into a FunctionRunnable.
Status ThreadPool::SubmitFunc(const std::function<void()>& func) {
  return Submit(std::make_shared<FunctionRunnable>(func));
}
432 | | |
// Tokenless submission: moves `func` into a FunctionRunnable (no copy).
Status ThreadPool::SubmitFunc(std::function<void()>&& func) {
  return Submit(std::make_shared<FunctionRunnable>(std::move(func)));
}
436 | | |
437 | 971k | Status ThreadPool::Submit(const std::shared_ptr<Runnable>& r) { |
438 | 971k | return DoSubmit(std::move(r), tokenless_.get()); |
439 | 971k | } |
440 | | |
441 | 41.1M | Status ThreadPool::DoSubmit(const std::shared_ptr<Runnable> task, ThreadPoolToken* token) { |
442 | 41.1M | DCHECK(token); |
443 | 41.1M | MonoTime submit_time = MonoTime::Now(); |
444 | | |
445 | 41.1M | MutexLock guard(lock_); |
446 | 41.1M | if (PREDICT_FALSE(!pool_status_.ok())) { |
447 | 2 | return pool_status_; |
448 | 2 | } |
449 | | |
450 | 41.1M | if (PREDICT_FALSE(!token->MaySubmitNewTasks())) { |
451 | 1.17k | return STATUS(ServiceUnavailable, "Thread pool token was shut down.", "", Errno(ESHUTDOWN)); |
452 | 1.17k | } |
453 | | |
454 | | // Size limit check. |
455 | 41.1M | int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ + |
456 | 41.1M | static_cast<int64_t>(max_queue_size_) - total_queued_tasks_; |
457 | 41.1M | if (capacity_remaining < 1) { |
458 | 7 | return STATUS(ServiceUnavailable, |
459 | 7 | Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)", |
460 | 7 | num_threads_, max_threads_, total_queued_tasks_, max_queue_size_), |
461 | 7 | "", Errno(ESHUTDOWN)); |
462 | 7 | } |
463 | | |
464 | | // Should we create another thread? |
465 | | // We assume that each current inactive thread will grab one item from the |
466 | | // queue. If it seems like we'll need another thread, we create one. |
467 | | // In theory, a currently active thread could finish immediately after this |
468 | | // calculation. This would mean we created a thread we didn't really need. |
469 | | // However, this race is unavoidable, since we don't do the work under a lock. |
470 | | // It's also harmless. |
471 | | // |
472 | | // Of course, we never create more than max_threads_ threads no matter what. |
473 | 41.1M | int threads_from_this_submit = |
474 | 41.1M | token->IsActive() && token->mode() == ExecutionMode::SERIAL ? 0 : 1; |
475 | 41.1M | int inactive_threads = num_threads_ - active_threads_; |
476 | 41.1M | int64_t additional_threads = (queue_.size() + threads_from_this_submit) - inactive_threads; |
477 | 41.1M | if (additional_threads > 0 && num_threads_ < max_threads_) { |
478 | 301k | Status status = CreateThreadUnlocked(); |
479 | 301k | if (!status.ok()) { |
480 | | // If we failed to create a thread, but there are still some other |
481 | | // worker threads, log a warning message and continue. |
482 | 0 | LOG(WARNING) << "Thread pool failed to create thread: " << status << ", num_threads: " |
483 | 0 | << num_threads_ << ", max_threads: " << max_threads_; |
484 | 0 | if (num_threads_ == 0) { |
485 | | // If we have no threads, we can't do any work. |
486 | 0 | return status; |
487 | 0 | } |
488 | 41.1M | } |
489 | 301k | } |
490 | | |
491 | 41.1M | Task e; |
492 | 41.1M | e.runnable = task; |
493 | 41.1M | e.trace = Trace::CurrentTrace(); |
494 | | // Need to AddRef, since the thread which submitted the task may go away, |
495 | | // and we don't want the trace to be destructed while waiting in the queue. |
496 | 41.1M | if (e.trace) { |
497 | 1.46M | e.trace->AddRef(); |
498 | 1.46M | } |
499 | 41.1M | e.submit_time = submit_time; |
500 | | |
501 | | // Add the task to the token's queue. |
502 | 41.1M | ThreadPoolTokenState state = token->state(); |
503 | 41.1M | DCHECK(state == ThreadPoolTokenState::kIdle || |
504 | 41.1M | state == ThreadPoolTokenState::kRunning); |
505 | 41.1M | token->entries_.emplace_back(std::move(e)); |
506 | 41.1M | if (state == ThreadPoolTokenState::kIdle || |
507 | 38.5M | token->mode() == ExecutionMode::CONCURRENT) { |
508 | 38.5M | queue_.emplace_back(token); |
509 | 38.5M | if (state == ThreadPoolTokenState::kIdle) { |
510 | 32.0M | token->Transition(ThreadPoolTokenState::kRunning); |
511 | 32.0M | } |
512 | 38.5M | } |
513 | 41.1M | int length_at_submit = total_queued_tasks_++; |
514 | | |
515 | 41.1M | guard.Unlock(); |
516 | 41.1M | not_empty_.Signal(); |
517 | | |
518 | 41.1M | if (metrics_.queue_length_histogram) { |
519 | 338k | metrics_.queue_length_histogram->Increment(length_at_submit); |
520 | 338k | } |
521 | 41.1M | if (token->metrics_.queue_length_histogram) { |
522 | 3 | token->metrics_.queue_length_histogram->Increment(length_at_submit); |
523 | 3 | } |
524 | | |
525 | 41.1M | return Status::OK(); |
526 | 41.1M | } |
527 | | |
528 | 292 | void ThreadPool::Wait() { |
529 | 292 | MutexLock unique_lock(lock_); |
530 | 426 | while ((!queue_.empty()) || (active_threads_ > 0)) { |
531 | 134 | idle_cond_.Wait(); |
532 | 134 | } |
533 | 292 | } |
534 | | |
535 | 429 | bool ThreadPool::WaitUntil(const MonoTime& until) { |
536 | 429 | MutexLock unique_lock(lock_); |
537 | 429 | while ((!queue_.empty()) || (active_threads_ > 0)) { |
538 | 0 | if (!idle_cond_.WaitUntil(until)) { |
539 | 0 | return false; |
540 | 0 | } |
541 | 0 | } |
542 | 429 | return true; |
543 | 429 | } |
544 | | |
545 | 429 | bool ThreadPool::WaitFor(const MonoDelta& delta) { |
546 | 429 | return WaitUntil(MonoTime::Now() + delta); |
547 | 429 | } |
548 | | |
// Worker thread main loop. `permanent` workers block indefinitely for work;
// non-permanent workers exit after idle_timeout_ without a task. The loop
// holds lock_ except while actually running (and destroying) a task.
void ThreadPool::DispatchThread(bool permanent) {
  MutexLock unique_lock(lock_);
  while (true) {
    // Note: STATUS(Aborted, ) is used to indicate normal shutdown.
    if (!pool_status_.ok()) {
      VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString();
      break;
    }

    if (queue_.empty()) {
      if (permanent) {
        not_empty_.Wait();
      } else {
        if (!not_empty_.TimedWait(idle_timeout_)) {
          // After much investigation, it appears that pthread condition variables have
          // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
          // another thread did in fact signal. Apparently after a timeout there is some
          // brief period during which another thread may actually grab the internal mutex
          // protecting the state, signal, and release again before we get the mutex. So,
          // we'll recheck the empty queue case regardless.
          if (queue_.empty()) {
            VLOG(3) << "Releasing worker thread from pool " << name_ << " after "
                    << idle_timeout_.ToMilliseconds() << "ms of idle time.";
            break;
          }
        }
      }
      continue;
    }

    // Get the next token and task to execute.
    ThreadPoolToken* token = queue_.front();
    queue_.pop_front();
    DCHECK_EQ(ThreadPoolTokenState::kRunning, token->state());
    DCHECK(!token->entries_.empty());
    Task task = std::move(token->entries_.front());
    token->entries_.pop_front();
    token->active_threads_++;
    --total_queued_tasks_;
    ++active_threads_;

    // Run the task without holding the pool lock.
    unique_lock.Unlock();

    // Release the reference which was held by the queued item.
    ADOPT_TRACE(task.trace);
    if (task.trace) {
      task.trace->Release();
    }

    // Update metrics
    MonoTime now(MonoTime::Now());
    int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
    if (metrics_.queue_time_us_histogram) {
      metrics_.queue_time_us_histogram->Increment(queue_time_us);
    }
    if (token->metrics_.queue_time_us_histogram) {
      token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
    }

    // Execute the task
    {
      MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
      task.runnable->Run();
      int64_t wall_us = GetMonoTimeMicros() - start_wall_us;

      if (metrics_.run_time_us_histogram) {
        metrics_.run_time_us_histogram->Increment(wall_us);
      }
      if (token->metrics_.run_time_us_histogram) {
        token->metrics_.run_time_us_histogram->Increment(wall_us);
      }
    }
    // Destruct the task while we do not hold the lock.
    //
    // The task's destructor may be expensive if it has a lot of bound
    // objects, and we don't want to block submission of the threadpool.
    // In the worst case, the destructor might even try to do something
    // with this threadpool, and produce a deadlock.
    task.runnable.reset();
    unique_lock.Lock();

    // Possible states:
    // 1. The token was shut down while we ran its task. Transition to kQuiesced.
    // 2. The token has no more queued tasks. Transition back to kIdle.
    // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
    ThreadPoolTokenState state = token->state();
    DCHECK(state == ThreadPoolTokenState::kRunning ||
           state == ThreadPoolTokenState::kQuiescing);
    if (--token->active_threads_ == 0) {
      if (state == ThreadPoolTokenState::kQuiescing) {
        DCHECK(token->entries_.empty());
        token->Transition(ThreadPoolTokenState::kQuiesced);
      } else if (token->entries_.empty()) {
        token->Transition(ThreadPoolTokenState::kIdle);
      } else if (token->mode() == ExecutionMode::SERIAL) {
        // SERIAL tokens only re-enter the dispatch queue once their previous
        // task has finished, which preserves execution order.
        queue_.emplace_back(token);
      }
    }
    if (--active_threads_ == 0) {
      idle_cond_.Broadcast();
    }
  }

  // It's important that we hold the lock between exiting the loop and dropping
  // num_threads_. Otherwise it's possible someone else could come along here
  // and add a new task just as the last running thread is about to exit.
  CHECK(unique_lock.OwnsLock());

  CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
  if (--num_threads_ == 0) {
    no_threads_cond_.Broadcast();

    // Sanity check: if we're the last thread exiting, the queue ought to be
    // empty. Otherwise it will never get processed.
    CHECK(queue_.empty());
    DCHECK_EQ(0, total_queued_tasks_);
  }
}
667 | | |
668 | 353k | Status ThreadPool::CreateThreadUnlocked() { |
669 | | // The first few threads are permanent, and do not time out. |
670 | 353k | bool permanent = (num_threads_ < min_threads_); |
671 | 353k | scoped_refptr<Thread> t; |
672 | 353k | Status s = yb::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_), |
673 | 353k | &ThreadPool::DispatchThread, this, permanent, &t); |
674 | 354k | if (s.ok()) { |
675 | 354k | InsertOrDie(&threads_, t.get()); |
676 | 354k | num_threads_++; |
677 | 354k | } |
678 | 353k | return s; |
679 | 353k | } |
680 | | |
681 | 448k | void ThreadPool::CheckNotPoolThreadUnlocked() { |
682 | 448k | Thread* current = Thread::current_thread(); |
683 | 448k | if (ContainsKey(threads_, current)) { |
684 | 0 | LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with " |
685 | 0 | "name '$1' called pool function that would result in deadlock", |
686 | 0 | name_, current->name()); |
687 | 0 | } |
688 | 448k | } |
689 | | |
// Creates the backing thread pool. A non-positive `concurrency` leaves the
// builder's default thread cap (one per CPU) in place.
CHECKED_STATUS TaskRunner::Init(int concurrency) {
  ThreadPoolBuilder builder("Task Runner");
  if (concurrency > 0) {
    builder.set_max_threads(concurrency);
  }
  return builder.Build(&thread_pool_);
}
697 | | |
// Blocks until every submitted task has reported via CompleteTask(), then
// returns the first failure observed (OK if all tasks succeeded).
CHECKED_STATUS TaskRunner::Wait() {
  std::unique_lock<std::mutex> lock(mutex_);
  cond_.wait(lock, [this] { return running_tasks_ == 0; });
  return first_failure_;
}
703 | | |
704 | 0 | void TaskRunner::CompleteTask(const Status& status) { |
705 | 0 | if (!status.ok()) { |
706 | 0 | bool expected = false; |
707 | 0 | if (failed_.compare_exchange_strong(expected, true)) { |
708 | 0 | first_failure_ = status; |
709 | 0 | } else { |
710 | 0 | LOG(WARNING) << status.message() << std::endl; |
711 | 0 | } |
712 | 0 | } |
713 | 0 | if (--running_tasks_ == 0) { |
714 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
715 | 0 | cond_.notify_one(); |
716 | 0 | } |
717 | 0 | } |
718 | | |
719 | | } // namespace yb |