/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/thread_posix.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/util/thread_posix.h" |
25 | | |
26 | | #include <pthread.h> |
27 | | #include <unistd.h> |
28 | | |
29 | | #include <atomic> |
30 | | #ifdef __linux__ |
31 | | #include <sys/syscall.h> |
32 | | #endif |
33 | | |
34 | | #include "yb/util/format.h" |
35 | | #include "yb/util/status_log.h" |
36 | | #include "yb/util/thread.h" |
37 | | |
38 | | namespace rocksdb { |
39 | | |
40 | 6.26M | void ThreadPool::PthreadCall(const char* label, int result) { |
41 | 6.26M | if (result != 0) { |
42 | 0 | fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); |
43 | 0 | abort(); |
44 | 0 | } |
45 | 6.26M | } |
46 | | |
47 | 52.6k | ThreadPool::ThreadPool() { |
48 | 52.6k | PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); |
49 | 52.6k | PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); |
50 | 52.6k | } |
51 | | |
52 | 3.15k | ThreadPool::~ThreadPool() { |
53 | 3.15k | DCHECK(bgthreads_.empty()); |
54 | 3.15k | } |
55 | | |
56 | 3.15k | void ThreadPool::JoinAllThreads() { |
57 | 3.15k | PthreadCall("lock", pthread_mutex_lock(&mu_)); |
58 | 3.15k | DCHECK(!exit_all_threads_); |
59 | 3.15k | exit_all_threads_ = true; |
60 | 3.15k | PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); |
61 | 3.15k | PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
62 | 3.15k | for (const auto& thread : bgthreads_) { |
63 | 2.71k | thread->Join(); |
64 | 2.71k | } |
65 | 3.15k | bgthreads_.clear(); |
66 | 3.15k | } |
67 | | |
68 | 0 | void ThreadPool::LowerIOPriority() { |
69 | | #ifdef __linux__ |
70 | | PthreadCall("lock", pthread_mutex_lock(&mu_)); |
71 | | low_io_priority_ = true; |
72 | | PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
73 | | #endif |
74 | 0 | } |
75 | | |
76 | 89.8k | void ThreadPool::BGThread(size_t thread_id) { |
77 | 89.8k | bool low_io_priority = false; |
78 | 347k | while (true) { |
79 | | // Wait until there is an item that is ready to run |
80 | 260k | PthreadCall("lock", pthread_mutex_lock(&mu_)); |
81 | | // Stop waiting if the thread needs to do work or needs to terminate. |
82 | 464k | while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id)374k && |
83 | 464k | (374k queue_.empty()374k || IsExcessiveThread(thread_id)170k )) { |
84 | 203k | PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); |
85 | 203k | } |
86 | 260k | if (exit_all_threads_) { // mechanism to let BG threads exit safely |
87 | 2.71k | PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
88 | 2.71k | break; |
89 | 2.71k | } |
90 | 257k | if (IsLastExcessiveThread(thread_id)) { |
91 | | // Current thread is the last generated one and is excessive. |
92 | | // We always terminate excessive thread in the reverse order of |
93 | | // generation time. |
94 | 28 | bgthreads_.pop_back(); |
95 | 28 | if (HasExcessiveThread()) { |
96 | | // There is still at least more excessive thread to terminate. |
97 | 17 | WakeUpAllThreads(); |
98 | 17 | } |
99 | 28 | PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
100 | 28 | break; |
101 | 28 | } |
102 | 257k | void (*function)(void*) = queue_.front().function; |
103 | 257k | void* arg = queue_.front().arg; |
104 | 257k | queue_.pop_front(); |
105 | 257k | queue_len_.store(static_cast<unsigned int>(queue_.size()), |
106 | 257k | std::memory_order_relaxed); |
107 | | |
108 | 257k | bool decrease_io_priority = (low_io_priority != low_io_priority_); |
109 | 257k | PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
110 | | |
111 | | #ifdef __linux__ |
112 | | if (decrease_io_priority) { |
113 | | #define IOPRIO_CLASS_SHIFT (13) |
114 | | #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) |
115 | | // Put schedule into IOPRIO_CLASS_IDLE class (lowest) |
116 | | // These system calls only have an effect when used in conjunction |
117 | | // with an I/O scheduler that supports I/O priorities. As at |
118 | | // kernel 2.6.17 the only such scheduler is the Completely |
119 | | // Fair Queuing (CFQ) I/O scheduler. |
120 | | // To change scheduler: |
121 | | // echo cfq > /sys/block/<device_name>/queue/schedule |
122 | | // Tunables to consider: |
123 | | // /sys/block/<device_name>/queue/slice_idle |
124 | | // /sys/block/<device_name>/queue/slice_sync |
125 | | syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS |
126 | | 0, // current thread |
127 | | IOPRIO_PRIO_VALUE(3, 0)); |
128 | | low_io_priority = true; |
129 | | } |
130 | | #else |
131 | 257k | (void)decrease_io_priority; // avoid 'unused variable' error |
132 | 257k | #endif |
133 | 257k | (*function)(arg); |
134 | 257k | } |
135 | 89.8k | } |
136 | | |
137 | 35.5k | void ThreadPool::WakeUpAllThreads() { |
138 | 35.5k | PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); |
139 | 35.5k | } |
140 | | |
141 | 1.71M | void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) { |
142 | 1.71M | PthreadCall("lock", pthread_mutex_lock(&mu_)); |
143 | 1.71M | if (exit_all_threads_) { |
144 | 0 | PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
145 | 0 | return; |
146 | 0 | } |
147 | 1.71M | if (num > total_threads_limit_ || |
148 | 1.71M | (1.68M num < total_threads_limit_1.68M && allow_reduce120 )) { |
149 | 35.5k | total_threads_limit_ = std::max(1, num); |
150 | 35.5k | WakeUpAllThreads(); |
151 | 35.5k | StartBGThreads(); |
152 | 35.5k | } |
153 | 1.71M | PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
154 | 1.71M | } |
155 | | |
156 | 1.71M | void ThreadPool::IncBackgroundThreadsIfNeeded(int num) { |
157 | 1.71M | SetBackgroundThreadsInternal(num, false); |
158 | 1.71M | } |
159 | | |
160 | 1.17k | void ThreadPool::SetBackgroundThreads(int num) { |
161 | 1.17k | SetBackgroundThreadsInternal(num, true); |
162 | 1.17k | } |
163 | | |
164 | 206k | void ThreadPool::StartBGThreads() { |
165 | | // Start background thread if necessary |
166 | 206k | std::string category_name = yb::Format( |
167 | 206k | "rocksdb:$0", GetThreadPriority() == Env::Priority::HIGH ? "high"77.3k : "low"129k ); |
168 | 296k | while (static_cast<int>(bgthreads_.size()) < total_threads_limit_) { |
169 | 89.8k | size_t tid = bgthreads_.size(); |
170 | 89.8k | std::string thread_name = yb::Format("$0:$1", category_name, tid); |
171 | 89.8k | yb::ThreadPtr thread; |
172 | 89.8k | CHECK_OK(yb::Thread::Create( |
173 | 89.8k | category_name, std::move(thread_name), |
174 | 89.8k | [this, tid]() { this->BGThread(tid); }, &thread)); |
175 | | |
176 | 89.8k | bgthreads_.push_back(thread); |
177 | 89.8k | } |
178 | 206k | } |
179 | | |
180 | | void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag, |
181 | 170k | void (*unschedFunction)(void* arg)) { |
182 | 170k | PthreadCall("lock", pthread_mutex_lock(&mu_)); |
183 | | |
184 | 170k | if (exit_all_threads_) { |
185 | 0 | PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
186 | 0 | return; |
187 | 0 | } |
188 | | |
189 | 170k | StartBGThreads(); |
190 | | |
191 | | // Add to priority queue |
192 | 170k | queue_.push_back(BGItem()); |
193 | 170k | queue_.back().function = function; |
194 | 170k | queue_.back().arg = arg; |
195 | 170k | queue_.back().tag = tag; |
196 | 170k | queue_.back().unschedFunction = unschedFunction; |
197 | 170k | queue_len_.store(static_cast<unsigned int>(queue_.size()), |
198 | 170k | std::memory_order_relaxed); |
199 | | |
200 | 170k | if (!HasExcessiveThread()) { |
201 | | // Wake up at least one waiting thread. |
202 | 170k | PthreadCall("signal", pthread_cond_signal(&bgsignal_)); |
203 | 170k | } else { |
204 | | // Need to wake up all threads to make sure the one woken |
205 | | // up is not the one to terminate. |
206 | 0 | WakeUpAllThreads(); |
207 | 0 | } |
208 | | |
209 | 170k | PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
210 | 170k | } |
211 | | |
212 | 792k | int ThreadPool::UnSchedule(void* arg) { |
213 | 792k | int count = 0; |
214 | 792k | PthreadCall("lock", pthread_mutex_lock(&mu_)); |
215 | | |
216 | | // Remove from priority queue |
217 | 792k | BGQueue::iterator it = queue_.begin(); |
218 | 792k | while (it != queue_.end()) { |
219 | 28 | if (arg == (*it).tag) { |
220 | 8 | void (*unschedFunction)(void*) = (*it).unschedFunction; |
221 | 8 | void* arg1 = (*it).arg; |
222 | 8 | if (unschedFunction != nullptr) { |
223 | 2 | (*unschedFunction)(arg1); |
224 | 2 | } |
225 | 8 | it = queue_.erase(it); |
226 | 8 | count++; |
227 | 20 | } else { |
228 | 20 | it++; |
229 | 20 | } |
230 | 28 | } |
231 | 792k | queue_len_.store(static_cast<unsigned int>(queue_.size()), |
232 | 792k | std::memory_order_relaxed); |
233 | 792k | PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
234 | 792k | return count; |
235 | 792k | } |
236 | | |
237 | | } // namespace rocksdb |