/Users/deen/code/yugabyte-db/src/yb/util/threadpool.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <algorithm> |
34 | | #include <functional> |
35 | | #include <limits> |
36 | | #include <memory> |
37 | | |
38 | | #include <gflags/gflags.h> |
39 | | #include <glog/logging.h> |
40 | | |
41 | | #include "yb/gutil/callback.h" |
42 | | #include "yb/gutil/macros.h" |
43 | | #include "yb/gutil/map-util.h" |
44 | | #include "yb/gutil/stl_util.h" |
45 | | #include "yb/gutil/strings/substitute.h" |
46 | | #include "yb/gutil/sysinfo.h" |
47 | | |
48 | | #include "yb/util/errno.h" |
49 | | #include "yb/util/logging.h" |
50 | | #include "yb/util/metrics.h" |
51 | | #include "yb/util/thread.h" |
52 | | #include "yb/util/threadpool.h" |
53 | | #include "yb/util/trace.h" |
54 | | |
55 | | namespace yb { |
56 | | |
57 | | using strings::Substitute; |
58 | | using std::unique_ptr; |
59 | | |
60 | | |
61 | 2.82M | ThreadPoolMetrics::~ThreadPoolMetrics() = default; |
62 | | |
63 | | //////////////////////////////////////////////////////// |
64 | | // ThreadPoolBuilder |
65 | | /////////////////////////////////////////////////////// |
66 | | |
// Builder defaults: no permanent (minimum) threads, at most one worker per
// CPU, an effectively unbounded queue, and a 500ms idle timeout after which
// non-permanent workers exit.
ThreadPoolBuilder::ThreadPoolBuilder(std::string name)
    : name_(std::move(name)),
      min_threads_(0),
      max_threads_(base::NumCPUs()),
      max_queue_size_(std::numeric_limits<int>::max()),
      idle_timeout_(MonoDelta::FromMilliseconds(500)) {}
73 | | |
// Sets the number of permanent worker threads (created eagerly in Init() and
// never reaped by the idle timeout). Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
  CHECK_GE(min_threads, 0);  // A negative minimum is a programming error.
  min_threads_ = min_threads;
  return *this;
}
79 | | |
// Caps the total number of worker threads the pool may create.
// Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
  CHECK_GE(max_threads, 0);  // A negative cap is a programming error.
  max_threads_ = max_threads;
  return *this;
}
85 | | |
86 | 37.1k | ThreadPoolBuilder& ThreadPoolBuilder::unlimited_threads() { |
87 | 37.1k | max_threads_ = std::numeric_limits<int>::max(); |
88 | 37.1k | return *this; |
89 | 37.1k | } |
90 | | |
// Caps the number of queued (not yet running) tasks; submissions beyond the
// cap fail with ServiceUnavailable. Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
  CHECK_GE(max_queue_size, 0);  // A negative queue size is a programming error.
  max_queue_size_ = max_queue_size;
  return *this;
}
96 | | |
// Installs pool-wide histograms (queue length, queue time, run time) to be
// updated on every submission/execution. Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
  metrics_ = std::move(metrics);
  return *this;
}
101 | | |
// Sets how long a non-permanent worker may sit idle before exiting.
// Returns *this for chaining.
ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) {
  idle_timeout_ = idle_timeout;
  return *this;
}
106 | | |
107 | 219k | Status ThreadPoolBuilder::Build(std::unique_ptr<ThreadPool>* pool) const { |
108 | 219k | pool->reset(new ThreadPool(*this)); |
109 | 219k | RETURN_NOT_OK((*pool)->Init()); |
110 | 219k | return Status::OK(); |
111 | 219k | } |
112 | | |
113 | | //////////////////////////////////////////////////////// |
114 | | // ThreadPoolToken |
115 | | //////////////////////////////////////////////////////// |
116 | | |
// A token starts out kIdle with no active threads. Note that
// not_running_cond_ is built on the *pool's* lock: all token state is
// protected by ThreadPool::lock_.
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool,
                                 ThreadPool::ExecutionMode mode,
                                 ThreadPoolMetrics metrics)
    : mode_(mode),
      pool_(pool),
      metrics_(std::move(metrics)),
      state_(ThreadPoolTokenState::kIdle),
      not_running_cond_(&pool->lock_),
      active_threads_(0) {
}
127 | | |
// Order matters here: Shutdown() first drains/quiesces the token (waiting for
// any in-flight tasks), and only then is the token unregistered from the pool.
ThreadPoolToken::~ThreadPoolToken() {
  Shutdown();
  pool_->ReleaseToken(this);
}
132 | | |
133 | 34.8M | Status ThreadPoolToken::SubmitClosure(Closure c) { |
134 | 34.8M | return Submit(std::make_shared<FunctionRunnable>((std::bind(&Closure::Run, c)))); |
135 | 34.8M | } |
136 | | |
137 | 58.0M | Status ThreadPoolToken::SubmitFunc(std::function<void()> f) { |
138 | 58.0M | return Submit(std::make_shared<FunctionRunnable>(std::move(f))); |
139 | 58.0M | } |
140 | | |
// Submits a runnable to the owning pool, tagged with this token so the pool
// can enforce the token's execution mode (SERIAL vs CONCURRENT).
Status ThreadPoolToken::Submit(std::shared_ptr<Runnable> r) {
  return pool_->DoSubmit(std::move(r), this);
}
144 | | |
// Removes all queued-but-not-started tasks belonging to this token, waits for
// any currently-running ones to finish, and drives the token to the terminal
// kQuiesced state. Safe to call multiple times (a quiesced token hits the
// default branch and does nothing). Must not be called from a pool worker
// thread, since it may block on those threads.
void ThreadPoolToken::Shutdown() {
  MutexLock unique_lock(pool_->lock_);
  pool_->CheckNotPoolThreadUnlocked();

  // Clear the queue under the lock, but defer the releasing of the tasks
  // outside the lock, in case there are concurrent threads wanting to access
  // the ThreadPool. The task's destructors may acquire locks, etc, so this
  // also prevents lock inversions.
  deque<ThreadPool::Task> to_release = std::move(entries_);
  pool_->total_queued_tasks_ -= to_release.size();

  switch (state()) {
    case ThreadPoolTokenState::kIdle:
      // There were no tasks outstanding; we can quiesce the token immediately.
      Transition(ThreadPoolTokenState::kQuiesced);
      break;
    case ThreadPoolTokenState::kRunning:
      // There were outstanding tasks. If any are still running, switch to
      // kQuiescing and wait for them to finish (the worker thread executing
      // the token's last task will switch the token to kQuiesced). Otherwise,
      // we can quiesce the token immediately.

      // Note: this is an O(n) operation, but it's expected to be infrequent.
      // Plus doing it this way (rather than switching to kQuiescing and waiting
      // for a worker thread to process the queue entry) helps retain state
      // transition symmetry with ThreadPool::Shutdown.
      for (auto it = pool_->queue_.begin(); it != pool_->queue_.end();) {
        if (*it == this) {
          it = pool_->queue_.erase(it);
        } else {
          it++;
        }
      }

      if (active_threads_ == 0) {
        Transition(ThreadPoolTokenState::kQuiesced);
        break;
      }
      Transition(ThreadPoolTokenState::kQuiescing);
      FALLTHROUGH_INTENDED;
    case ThreadPoolTokenState::kQuiescing:
      // The token is already quiescing. Just wait for a worker thread to
      // switch it to kQuiesced.
      while (state() != ThreadPoolTokenState::kQuiesced) {
        not_running_cond_.Wait();
      }
      break;
    default:
      // Already kQuiesced: nothing to do.
      break;
  }

  // Finally release the queued tasks, outside the lock.
  unique_lock.Unlock();
  for (auto& t : to_release) {
    if (t.trace) {
      t.trace->Release();
    }
  }
}
204 | | |
// Blocks until this token has no queued or running tasks. Must not be called
// from a pool worker thread (that would deadlock).
void ThreadPoolToken::Wait() {
  MutexLock unique_lock(pool_->lock_);
  pool_->CheckNotPoolThreadUnlocked();
  while (IsActive()) {
    not_running_cond_.Wait();
  }
}
212 | | |
213 | 0 | bool ThreadPoolToken::WaitUntil(const MonoTime& until) { |
214 | 0 | MutexLock unique_lock(pool_->lock_); |
215 | 0 | pool_->CheckNotPoolThreadUnlocked(); |
216 | 0 | while (IsActive()) { |
217 | 0 | if (!not_running_cond_.WaitUntil(until)) { |
218 | 0 | return false; |
219 | 0 | } |
220 | 0 | } |
221 | 0 | return true; |
222 | 0 | } |
223 | | |
224 | 0 | bool ThreadPoolToken::WaitFor(const MonoDelta& delta) { |
225 | 0 | return WaitUntil(MonoTime::Now() + delta); |
226 | 0 | } |
227 | | |
// Moves the token's state machine to new_state. In debug builds the first
// switch validates that the transition is legal and that the queue/active
// thread invariants hold for it. Waiters (Wait/Shutdown) are woken whenever
// the token stops running. Caller must hold the pool's lock.
void ThreadPoolToken::Transition(ThreadPoolTokenState new_state) {
#ifndef NDEBUG
  CHECK_NE(state_, new_state);

  switch (state_) {
    case ThreadPoolTokenState::kIdle:
      CHECK(new_state == ThreadPoolTokenState::kRunning ||
            new_state == ThreadPoolTokenState::kQuiesced);
      if (new_state == ThreadPoolTokenState::kRunning) {
        // Can only start running because a task was queued.
        CHECK(!entries_.empty());
      } else {
        // Quiescing straight from idle implies nothing queued or running.
        CHECK(entries_.empty());
        CHECK_EQ(active_threads_, 0);
      }
      break;
    case ThreadPoolTokenState::kRunning:
      CHECK(new_state == ThreadPoolTokenState::kIdle ||
            new_state == ThreadPoolTokenState::kQuiescing ||
            new_state == ThreadPoolTokenState::kQuiesced);
      CHECK(entries_.empty());
      if (new_state == ThreadPoolTokenState::kQuiescing) {
        // kQuiescing only exists to wait out in-flight tasks.
        CHECK_GT(active_threads_, 0);
      }
      break;
    case ThreadPoolTokenState::kQuiescing:
      CHECK(new_state == ThreadPoolTokenState::kQuiesced);
      CHECK_EQ(active_threads_, 0);
      break;
    case ThreadPoolTokenState::kQuiesced:
      CHECK(false); // kQuiesced is a terminal state
      break;
    default:
      LOG(FATAL) << "Unknown token state: " << state_;
  }
#endif

  // Take actions based on the state we're entering.
  switch (new_state) {
    case ThreadPoolTokenState::kIdle:
    case ThreadPoolTokenState::kQuiesced:
      // The token is no longer running anything; wake Wait()/Shutdown()ers.
      not_running_cond_.Broadcast();
      break;
    default:
      break;
  }

  state_ = new_state;
}
276 | | |
277 | 0 | const char* ThreadPoolToken::StateToString(ThreadPoolTokenState s) { |
278 | 0 | switch (s) { |
279 | 0 | case ThreadPoolTokenState::kIdle: return "kIdle"; break; |
280 | 0 | case ThreadPoolTokenState::kRunning: return "kRunning"; break; |
281 | 0 | case ThreadPoolTokenState::kQuiescing: return "kQuiescing"; break; |
282 | 0 | case ThreadPoolTokenState::kQuiesced: return "kQuiesced"; break; |
283 | 0 | } |
284 | 0 | return "<cannot reach here>"; |
285 | 0 | } |
286 | | |
287 | | //////////////////////////////////////////////////////// |
288 | | // ThreadPool |
289 | | //////////////////////////////////////////////////////// |
290 | | |
// Copies the builder's configuration and creates the pool in an uninitialized
// state; worker threads are only started by Init(). tokenless_ is the
// internal CONCURRENT token used for submissions made directly on the pool
// (note: NewToken() acquires lock_, which is safe here because no other
// thread can see this pool yet).
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
  : name_(builder.name_),
    min_threads_(builder.min_threads_),
    max_threads_(builder.max_threads_),
    max_queue_size_(builder.max_queue_size_),
    idle_timeout_(builder.idle_timeout_),
    pool_status_(STATUS(Uninitialized, "The pool was not initialized.")),
    idle_cond_(&lock_),
    no_threads_cond_(&lock_),
    not_empty_(&lock_),
    num_threads_(0),
    active_threads_(0),
    total_queued_tasks_(0),
    tokenless_(NewToken(ExecutionMode::CONCURRENT)),
    metrics_(builder.metrics_) {
}
307 | | |
// All client-created tokens must be destroyed before the pool; only the
// internal tokenless_ token may remain. Shutdown() is idempotent, so it is
// safe whether or not the caller already shut the pool down.
ThreadPool::~ThreadPool() {
  // There should only be one live token: the one used in tokenless submission.
  CHECK_EQ(1, tokens_.size()) << Substitute(
      "Threadpool $0 destroyed with $1 allocated tokens",
      name_, tokens_.size());
  Shutdown();
}
315 | | |
// Marks the pool as running and eagerly creates the min_threads_ permanent
// workers. If the very first thread cannot be created, the pool is shut down
// and the error returned; failures after at least one thread exists are
// tolerated (more workers will be created on demand in DoSubmit).
Status ThreadPool::Init() {
  MutexLock unique_lock(lock_);
  if (!pool_status_.IsUninitialized()) {
    return STATUS(NotSupported, "The thread pool is already initialized");
  }
  pool_status_ = Status::OK();
  for (int i = 0; i < min_threads_; i++) {
    Status status = CreateThreadUnlocked();
    if (!status.ok()) {
      if (i != 0) {
        YB_LOG_EVERY_N_SECS(WARNING, 5) << "Cannot create thread: " << status << ", will try later";
        // Cannot create enough threads now, will try later.
        break;
      }
      // Not even one thread could be started: tear everything down.
      // Shutdown() acquires lock_, so it must be released first.
      unique_lock.Unlock();
      Shutdown();
      return status;
    }
  }
  return Status::OK();
}
337 | | |
// Stops the pool: rejects new submissions, discards all queued (not yet
// started) tasks, quiesces every token, and joins all worker threads.
// Idempotent. Must not be called from a pool worker thread.
void ThreadPool::Shutdown() {
  MutexLock unique_lock(lock_);
  CheckNotPoolThreadUnlocked();

  // Note: this is the same error seen at submission if the pool is at
  // capacity, so clients can't tell them apart. This isn't really a practical
  // concern though because shutting down a pool typically requires clients to
  // be quiesced first, so there's no danger of a client getting confused.
  pool_status_ = STATUS(ServiceUnavailable, "The pool has been shut down.");

  // Clear the various queues under the lock, but defer the releasing
  // of the tasks outside the lock, in case there are concurrent threads
  // wanting to access the ThreadPool. The task's destructors may acquire
  // locks, etc, so this also prevents lock inversions.
  queue_.clear();
  deque<deque<Task>> to_release;
  for (auto* t : tokens_) {
    if (!t->entries_.empty()) {
      to_release.emplace_back(std::move(t->entries_));
    }
    switch (t->state()) {
      case ThreadPoolTokenState::kIdle:
        // The token is idle; we can quiesce it immediately.
        t->Transition(ThreadPoolTokenState::kQuiesced);
        break;
      case ThreadPoolTokenState::kRunning:
        // The token has tasks associated with it. If they're merely queued
        // (i.e. there are no active threads), the tasks will have been removed
        // above and we can quiesce immediately. Otherwise, we need to wait for
        // the threads to finish.
        t->Transition(t->active_threads_ > 0 ?
            ThreadPoolTokenState::kQuiescing :
            ThreadPoolTokenState::kQuiesced);
        break;
      default:
        // Already kQuiescing or kQuiesced; worker threads will finish the job.
        break;
    }
  }

  // The queues are empty. Wake any sleeping worker threads and wait for all
  // of them to exit. Some worker threads will exit immediately upon waking,
  // while others will exit after they finish executing an outstanding task.
  total_queued_tasks_ = 0;
  not_empty_.Broadcast();
  while (num_threads_ > 0) {
    no_threads_cond_.Wait();
  }

  // All the threads have exited. Check the state of each token.
  for (auto* t : tokens_) {
    DCHECK(t->state() == ThreadPoolTokenState::kIdle ||
           t->state() == ThreadPoolTokenState::kQuiesced);
  }

  // Finally release the queued tasks, outside the lock.
  unique_lock.Unlock();
  for (auto& token : to_release) {
    for (auto& t : token) {
      if (t.trace) {
        t.trace->Release();
      }
    }
  }
}
402 | | |
// Creates a submission token with no per-token metrics attached.
unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) {
  return NewTokenWithMetrics(mode, {});
}
406 | | |
407 | | unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics( |
408 | 1.03M | ExecutionMode mode, ThreadPoolMetrics metrics) { |
409 | 1.03M | MutexLock guard(lock_); |
410 | 1.03M | unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, std::move(metrics))); |
411 | 1.03M | InsertOrDie(&tokens_, t.get()); |
412 | 1.03M | return t; |
413 | 1.03M | } |
414 | | |
// Unregisters a token from the pool (called from ~ThreadPoolToken). The token
// must be fully quiesced — releasing a token with pending or running work is
// a programming error.
void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
  MutexLock guard(lock_);
  CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
                                      ThreadPoolToken::StateToString(t->state()));
  CHECK_EQ(1, tokens_.erase(t));
}
421 | | |
422 | | |
423 | 11.0k | Status ThreadPool::SubmitClosure(const Closure& task) { |
424 | | // TODO: once all uses of std::bind-based tasks are dead, implement this |
425 | | // in a more straight-forward fashion. |
426 | 11.0k | return SubmitFunc(std::bind(&Closure::Run, task)); |
427 | 11.0k | } |
428 | | |
// Copying overload: the function object is copied into the FunctionRunnable.
Status ThreadPool::SubmitFunc(const std::function<void()>& func) {
  return Submit(std::make_shared<FunctionRunnable>(func));
}
432 | | |
// Moving overload: avoids copying the function object's captured state.
Status ThreadPool::SubmitFunc(std::function<void()>&& func) {
  return Submit(std::make_shared<FunctionRunnable>(std::move(func)));
}
436 | | |
437 | 1.89M | Status ThreadPool::Submit(const std::shared_ptr<Runnable>& r) { |
438 | 1.89M | return DoSubmit(std::move(r), tokenless_.get()); |
439 | 1.89M | } |
440 | | |
// Core submission path shared by all Submit* entry points. Validates the pool
// and token state, enforces the capacity limit, possibly spawns a new worker,
// then enqueues the task on the token (and, depending on the token's mode and
// state, on the pool's run queue). Returns ServiceUnavailable if the pool is
// shut down, the token is quiescing/quiesced, or capacity is exhausted.
Status ThreadPool::DoSubmit(const std::shared_ptr<Runnable> task, ThreadPoolToken* token) {
  DCHECK(token);
  // Captured before taking the lock so queue-time metrics include lock wait.
  MonoTime submit_time = MonoTime::Now();

  MutexLock guard(lock_);
  if (PREDICT_FALSE(!pool_status_.ok())) {
    return pool_status_;
  }

  if (PREDICT_FALSE(!token->MaySubmitNewTasks())) {
    return STATUS(ServiceUnavailable, "Thread pool token was shut down.", "", Errno(ESHUTDOWN));
  }

  // Size limit check.
  int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ +
                               static_cast<int64_t>(max_queue_size_) - total_queued_tasks_;
  if (capacity_remaining < 1) {
    return STATUS(ServiceUnavailable,
                  Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
                             num_threads_, max_threads_, total_queued_tasks_, max_queue_size_),
                  "", Errno(ESHUTDOWN));
  }

  // Should we create another thread?
  // We assume that each current inactive thread will grab one item from the
  // queue. If it seems like we'll need another thread, we create one.
  // In theory, a currently active thread could finish immediately after this
  // calculation. This would mean we created a thread we didn't really need.
  // However, this race is unavoidable, since we don't do the work under a lock.
  // It's also harmless.
  //
  // Of course, we never create more than max_threads_ threads no matter what.
  int threads_from_this_submit =
      token->IsActive() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
  int inactive_threads = num_threads_ - active_threads_;
  int64_t additional_threads = (queue_.size() + threads_from_this_submit) - inactive_threads;
  if (additional_threads > 0 && num_threads_ < max_threads_) {
    Status status = CreateThreadUnlocked();
    if (!status.ok()) {
      // If we failed to create a thread, but there are still some other
      // worker threads, log a warning message and continue.
      LOG(WARNING) << "Thread pool failed to create thread: " << status << ", num_threads: "
                   << num_threads_ << ", max_threads: " << max_threads_;
      if (num_threads_ == 0) {
        // If we have no threads, we can't do any work.
        return status;
      }
    }
  }

  Task e;
  e.runnable = task;
  e.trace = Trace::CurrentTrace();
  // Need to AddRef, since the thread which submitted the task may go away,
  // and we don't want the trace to be destructed while waiting in the queue.
  if (e.trace) {
    e.trace->AddRef();
  }
  e.submit_time = submit_time;

  // Add the task to the token's queue.
  ThreadPoolTokenState state = token->state();
  DCHECK(state == ThreadPoolTokenState::kIdle ||
         state == ThreadPoolTokenState::kRunning);
  token->entries_.emplace_back(std::move(e));
  // A SERIAL token that is already running must not appear on the run queue
  // twice; its worker re-queues it after finishing the current task.
  if (state == ThreadPoolTokenState::kIdle ||
      token->mode() == ExecutionMode::CONCURRENT) {
    queue_.emplace_back(token);
    if (state == ThreadPoolTokenState::kIdle) {
      token->Transition(ThreadPoolTokenState::kRunning);
    }
  }
  int length_at_submit = total_queued_tasks_++;

  // Signal outside the lock so the woken worker doesn't immediately block.
  guard.Unlock();
  not_empty_.Signal();

  if (metrics_.queue_length_histogram) {
    metrics_.queue_length_histogram->Increment(length_at_submit);
  }
  if (token->metrics_.queue_length_histogram) {
    token->metrics_.queue_length_histogram->Increment(length_at_submit);
  }

  return Status::OK();
}
527 | | |
// Blocks until the pool has no queued and no running tasks. Note: unlike
// Shutdown(), this does not prevent new tasks from being submitted while
// waiting.
void ThreadPool::Wait() {
  MutexLock unique_lock(lock_);
  while ((!queue_.empty()) || (active_threads_ > 0)) {
    idle_cond_.Wait();
  }
}
534 | | |
535 | 1.01k | bool ThreadPool::WaitUntil(const MonoTime& until) { |
536 | 1.01k | MutexLock unique_lock(lock_); |
537 | 1.01k | while ((!queue_.empty()) || (active_threads_ > 0)) { |
538 | 0 | if (!idle_cond_.WaitUntil(until)) { |
539 | 0 | return false; |
540 | 0 | } |
541 | 0 | } |
542 | 1.01k | return true; |
543 | 1.01k | } |
544 | | |
545 | 1.01k | bool ThreadPool::WaitFor(const MonoDelta& delta) { |
546 | 1.01k | return WaitUntil(MonoTime::Now() + delta); |
547 | 1.01k | } |
548 | | |
// Main loop of every worker thread. Permanent workers (the first
// min_threads_) wait indefinitely for work; the rest exit after
// idle_timeout_ of inactivity. The loop pops a token off the run queue, runs
// that token's next task with the lock dropped, then re-acquires the lock to
// update the token's state machine. On exit, the thread unregisters itself
// and wakes Shutdown() if it was the last one.
void ThreadPool::DispatchThread(bool permanent) {
  MutexLock unique_lock(lock_);
  while (true) {
    // Note: STATUS(Aborted, ) is used to indicate normal shutdown.
    if (!pool_status_.ok()) {
      VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString();
      break;
    }

    if (queue_.empty()) {
      if (permanent) {
        not_empty_.Wait();
      } else {
        if (!not_empty_.TimedWait(idle_timeout_)) {
          // After much investigation, it appears that pthread condition variables have
          // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
          // another thread did in fact signal. Apparently after a timeout there is some
          // brief period during which another thread may actually grab the internal mutex
          // protecting the state, signal, and release again before we get the mutex. So,
          // we'll recheck the empty queue case regardless.
          if (queue_.empty()) {
            VLOG(3) << "Releasing worker thread from pool " << name_ << " after "
                    << idle_timeout_.ToMilliseconds() << "ms of idle time.";
            break;
          }
        }
      }
      continue;
    }

    // Get the next token and task to execute.
    ThreadPoolToken* token = queue_.front();
    queue_.pop_front();
    DCHECK_EQ(ThreadPoolTokenState::kRunning, token->state());
    DCHECK(!token->entries_.empty());
    Task task = std::move(token->entries_.front());
    token->entries_.pop_front();
    token->active_threads_++;
    --total_queued_tasks_;
    ++active_threads_;

    // Run the task without holding the pool lock.
    unique_lock.Unlock();

    // Release the reference which was held by the queued item.
    ADOPT_TRACE(task.trace);
    if (task.trace) {
      task.trace->Release();
    }

    // Update metrics
    MonoTime now(MonoTime::Now());
    int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
    if (metrics_.queue_time_us_histogram) {
      metrics_.queue_time_us_histogram->Increment(queue_time_us);
    }
    if (token->metrics_.queue_time_us_histogram) {
      token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
    }

    // Execute the task
    {
      MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
      task.runnable->Run();
      int64_t wall_us = GetMonoTimeMicros() - start_wall_us;

      if (metrics_.run_time_us_histogram) {
        metrics_.run_time_us_histogram->Increment(wall_us);
      }
      if (token->metrics_.run_time_us_histogram) {
        token->metrics_.run_time_us_histogram->Increment(wall_us);
      }
    }
    // Destruct the task while we do not hold the lock.
    //
    // The task's destructor may be expensive if it has a lot of bound
    // objects, and we don't want to block submission of the threadpool.
    // In the worst case, the destructor might even try to do something
    // with this threadpool, and produce a deadlock.
    task.runnable.reset();
    unique_lock.Lock();

    // Possible states:
    // 1. The token was shut down while we ran its task. Transition to kQuiesced.
    // 2. The token has no more queued tasks. Transition back to kIdle.
    // 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
    ThreadPoolTokenState state = token->state();
    DCHECK(state == ThreadPoolTokenState::kRunning ||
           state == ThreadPoolTokenState::kQuiescing);
    if (--token->active_threads_ == 0) {
      if (state == ThreadPoolTokenState::kQuiescing) {
        DCHECK(token->entries_.empty());
        token->Transition(ThreadPoolTokenState::kQuiesced);
      } else if (token->entries_.empty()) {
        token->Transition(ThreadPoolTokenState::kIdle);
      } else if (token->mode() == ExecutionMode::SERIAL) {
        // SERIAL tokens only re-enter the run queue once their current task
        // is done; this preserves one-at-a-time execution.
        queue_.emplace_back(token);
      }
    }
    if (--active_threads_ == 0) {
      idle_cond_.Broadcast();
    }
  }

  // It's important that we hold the lock between exiting the loop and dropping
  // num_threads_. Otherwise it's possible someone else could come along here
  // and add a new task just as the last running thread is about to exit.
  CHECK(unique_lock.OwnsLock());

  CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
  if (--num_threads_ == 0) {
    no_threads_cond_.Broadcast();

    // Sanity check: if we're the last thread exiting, the queue ought to be
    // empty. Otherwise it will never get processed.
    CHECK(queue_.empty());
    DCHECK_EQ(0, total_queued_tasks_);
  }
}
667 | | |
668 | 1.01M | Status ThreadPool::CreateThreadUnlocked() { |
669 | | // The first few threads are permanent, and do not time out. |
670 | 1.01M | bool permanent = (num_threads_ < min_threads_); |
671 | 1.01M | scoped_refptr<Thread> t; |
672 | 1.01M | Status s = yb::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_), |
673 | 1.01M | &ThreadPool::DispatchThread, this, permanent, &t); |
674 | 1.01M | if (s.ok()1.01M ) { |
675 | 1.01M | InsertOrDie(&threads_, t.get()); |
676 | 1.01M | num_threads_++; |
677 | 1.01M | } |
678 | 1.01M | return s; |
679 | 1.01M | } |
680 | | |
// Crashes (LOG(FATAL)) if the calling thread is one of this pool's workers.
// Used by blocking operations (Wait, Shutdown) that would deadlock if a
// worker invoked them on its own pool.
void ThreadPool::CheckNotPoolThreadUnlocked() {
  Thread* current = Thread::current_thread();
  if (ContainsKey(threads_, current)) {
    LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with "
        "name '$1' called pool function that would result in deadlock",
        name_, current->name());
  }
}
689 | | |
// Builds the underlying thread pool. concurrency <= 0 keeps the builder's
// default thread cap (one per CPU) rather than meaning "unlimited".
CHECKED_STATUS TaskRunner::Init(int concurrency) {
  ThreadPoolBuilder builder("Task Runner");
  if (concurrency > 0) {
    builder.set_max_threads(concurrency);
  }
  return builder.Build(&thread_pool_);
}
697 | | |
// Blocks until every submitted task has completed, then returns the first
// failure recorded by CompleteTask() (OK if none failed).
CHECKED_STATUS TaskRunner::Wait() {
  std::unique_lock<std::mutex> lock(mutex_);
  cond_.wait(lock, [this] { return running_tasks_ == 0; });
  return first_failure_;
}
703 | | |
704 | 0 | void TaskRunner::CompleteTask(const Status& status) { |
705 | 0 | if (!status.ok()) { |
706 | 0 | bool expected = false; |
707 | 0 | if (failed_.compare_exchange_strong(expected, true)) { |
708 | 0 | first_failure_ = status; |
709 | 0 | } else { |
710 | 0 | LOG(WARNING) << status.message() << std::endl; |
711 | 0 | } |
712 | 0 | } |
713 | 0 | if (--running_tasks_ == 0) { |
714 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
715 | 0 | cond_.notify_one(); |
716 | 0 | } |
717 | 0 | } |
718 | | |
719 | | } // namespace yb |