/Users/deen/code/yugabyte-db/src/yb/rpc/thread_pool.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/rpc/thread_pool.h" |
17 | | |
18 | | #include <condition_variable> |
19 | | #include <mutex> |
20 | | |
21 | | #include <cds/container/basket_queue.h> |
22 | | #include <cds/gc/dhp.h> |
23 | | |
24 | | #include "yb/util/scope_exit.h" |
25 | | #include "yb/util/status_format.h" |
26 | | #include "yb/util/thread.h" |
27 | | |
28 | | namespace yb { |
29 | | namespace rpc { |
30 | | |
31 | | namespace { |
32 | | |
33 | | class Worker; |
34 | | |
35 | | typedef cds::container::BasketQueue<cds::gc::DHP, ThreadPoolTask*> TaskQueue; |
36 | | typedef cds::container::BasketQueue<cds::gc::DHP, Worker*> WaitingWorkers; |
37 | | |
38 | | struct ThreadPoolShare { |
39 | | ThreadPoolOptions options; |
40 | | TaskQueue task_queue; |
41 | | WaitingWorkers waiting_workers; |
42 | | |
43 | | explicit ThreadPoolShare(ThreadPoolOptions o) |
44 | 61.9k | : options(std::move(o)) {} |
45 | | }; |
46 | | |
// Thread category used when registering worker threads with yb::Thread; also
// checked by ThreadPool::IsCurrentThreadRpcWorker(). The extra nested
// anonymous namespace that used to wrap this constant was redundant — this
// code already lives inside the file's anonymous namespace.
const std::string kRpcThreadCategory = "rpc_thread_pool";
52 | | |
// A single pool thread. Workers pull tasks from the shared lock-free task
// queue and park themselves on the shared waiting_workers queue when idle.
// Wakeups are delivered through a per-worker mutex + condition variable.
class Worker {
 public:
  explicit Worker(ThreadPoolShare* share)
      : share_(share) {
  }

  // Spawns the underlying yb::Thread running Execute(). `index` is only used
  // to build a human-readable thread name.
  CHECKED_STATUS Start(size_t index) {
    auto name = strings::Substitute("rpc_tp_$0_$1", share_->options.name, index);
    return yb::Thread::Create(kRpcThreadCategory, name, &Worker::Execute, this, &thread_);
  }

  // Joins the worker thread. Stop() must have been requested beforehand,
  // otherwise this blocks until the worker loop exits.
  ~Worker() {
    if (thread_) {
      thread_->Join();
    }
  }

  Worker(const Worker& worker) = delete;
  void operator=(const Worker& worker) = delete;

  // Requests the worker loop to exit and wakes the worker if it is parked.
  // Taking mutex_ before notify ensures the wakeup is not lost between the
  // worker's stop check and its cond_.wait().
  void Stop() {
    stop_requested_ = true;
    std::lock_guard<std::mutex> lock(mutex_);
    cond_.notify_one();
  }

  // Called by Impl::Enqueue after popping this worker from waiting_workers.
  // Returns true if this worker will pick up a task, false if the queue entry
  // was stale so the caller should notify the next waiting worker instead.
  bool Notify() {
    std::lock_guard<std::mutex> lock(mutex_);
    added_to_waiting_workers_ = false;
    // There could be cases when we popped a task after adding ourselves to the
    // worker queue (see PopTask below). So we are already processing a task,
    // but still reside in the worker queue. To handle this case we use the
    // waiting_task_ flag. If we are not waiting for a task, we return false
    // here, and the next worker will be popped from the queue and notified.
    if (!waiting_task_) {
      return false;
    }
    cond_.notify_one();
    return true;
  }

 private:
  // Our main invariant is: empty task queue or empty worker queue.
  // In other words, one of those queues should be empty,
  // meaning that we do not have work (task queue empty) or
  // do not have free hands (worker queue empty).
  void Execute() {
    // Tag this thread with the shared state so ThreadPool::Owns() can
    // recognize it via user data.
    Thread::current_thread()->SetUserData(share_);
    while (!stop_requested_) {
      ThreadPoolTask* task = nullptr;
      if (PopTask(&task)) {
        task->Run();
        task->Done(Status::OK());
      }
    }
  }

  // Blocks until a task is available or stop was requested.
  // Returns true and fills *task on success; false when stopping.
  bool PopTask(ThreadPoolTask** task) {
    // First of all we try to get an already queued task, w/o locking.
    // If there is no task, we can go into the waiting state.
    if (share_->task_queue.pop(*task)) {
      return true;
    }
    std::unique_lock<std::mutex> lock(mutex_);
    waiting_task_ = true;
    auto se = ScopeExit([this] {
      waiting_task_ = false;
    });

    while (!stop_requested_) {
      AddToWaitingWorkers();

      // There could be a situation when a task was queued before we added
      // ourselves to the worker queue. So the worker queue could be empty in
      // this case, and nobody was notified about the new task. So we check
      // here for this case. This technique is similar to double checking.
      if (share_->task_queue.pop(*task)) {
        return true;
      }

      cond_.wait(lock);

      // Sometimes another worker could steal the task before we wake up.
      // In this case we will just enqueue ourselves back.
      if (share_->task_queue.pop(*task)) {
        return true;
      }
    }
    return false;
  }

  // Pushes this worker onto the shared waiting queue at most once per notify
  // cycle. mutex_ must be held; the flag is reset by Notify().
  void AddToWaitingWorkers() {
    if (!added_to_waiting_workers_) {
      auto pushed = share_->waiting_workers.push(this);
      DCHECK(pushed); // BasketQueue always succeeds.
      added_to_waiting_workers_ = true;
    }
  }

  ThreadPoolShare* share_;
  scoped_refptr<yb::Thread> thread_;
  std::mutex mutex_;              // Guards waiting_task_ and added_to_waiting_workers_.
  std::condition_variable cond_;  // Signaled by Notify() and Stop().
  std::atomic<bool> stop_requested_ = {false};
  bool waiting_task_ = false;              // True while parked in PopTask waiting for work.
  bool added_to_waiting_workers_ = false;  // True while present in share_->waiting_workers.
};
160 | | |
161 | | } // namespace |
162 | | |
// Implementation of ThreadPool. Owns the shared state and the worker threads.
// Workers are created lazily: a new worker is started from Enqueue when no
// waiting worker could be notified and max_workers has not been reached.
class ThreadPool::Impl {
 public:
  explicit Impl(ThreadPoolOptions options)
      : share_(std::move(options)),
        queue_full_status_(STATUS_SUBSTITUTE(ServiceUnavailable,
                                             "Queue is full, max items: $0",
                                             share_.options.queue_limit)) {
    LOG(INFO) << "Starting thread pool " << share_.options.ToString();
    workers_.reserve(share_.options.max_workers);
  }

  const ThreadPoolOptions& options() const {
    return share_.options;
  }

  // Queues `task` for execution. Returns false (after completing the task
  // with shutdown_status_) if the pool is closing, true otherwise.
  bool Enqueue(ThreadPoolTask* task) {
    // adding_ acts as a lightweight "reader lock": Shutdown busy-waits for it
    // to reach zero so no Enqueue is mid-flight when the queue is drained.
    ++adding_;
    if (closing_) {
      --adding_;
      task->Done(shutdown_status_);
      return false;
    }
    bool added = share_.task_queue.push(task);
    DCHECK(added); // BasketQueue always succeeds.
    Worker* worker = nullptr;
    // Pop waiting workers until one accepts the wakeup; stale entries (see
    // Worker::Notify) return false and are simply discarded.
    while (share_.waiting_workers.pop(worker)) {
      if (worker->Notify()) {
        --adding_;
        return true;
      }
    }
    --adding_;

    // We increment created_workers_ every time; only the first max_workers
    // increments produce a new worker. After that we just increment and then
    // undo it, doing nothing else. So we can stay lock free here.
    auto index = created_workers_++;
    if (index < share_.options.max_workers) {
      std::lock_guard<std::mutex> lock(mutex_);
      if (!closing_) {
        auto new_worker = std::make_unique<Worker>(&share_);
        auto status = new_worker->Start(workers_.size());
        if (status.ok()) {
          workers_.push_back(std::move(new_worker));
        } else if (workers_.empty()) {
          // Without any worker the queued task would never run.
          LOG(FATAL) << "Unable to start first worker: " << status;
        } else {
          LOG(WARNING) << "Unable to start worker: " << status;
        }
      }
    } else {
      --created_workers_;
    }
    return true;
  }

  // Stops all workers, waits for concurrent Enqueue calls to drain, and
  // aborts any tasks still left in the queue. Safe to call more than once.
  void Shutdown() {
    // Block creating new workers.
    created_workers_ += share_.options.max_workers;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (closing_) {
        // A previous Shutdown already completed the drain below.
        CHECK(share_.task_queue.empty());
        CHECK(workers_.empty());
        return;
      }
      closing_ = true;
    }
    for (auto& worker : workers_) {
      if (worker) {
        worker->Stop();
      }
    }
    // Shutdown is quite a rare situation, while enqueue is quite frequent.
    // Because of this we use an "atomic lock" in enqueue and busy wait in
    // shutdown. So we process enqueue quickly, and may be stuck in shutdown
    // for some time.
    while (adding_ != 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    workers_.clear();
    ThreadPoolTask* task = nullptr;
    while (share_.task_queue.pop(task)) {
      task->Done(shutdown_status_);
    }
  }

  // Returns true iff `thread` is a worker of this particular pool, identified
  // by the user-data pointer set in Worker::Execute.
  bool Owns(Thread* thread) {
    return thread && thread->user_data() == &share_;
  }

 private:
  ThreadPoolShare share_;
  std::vector<std::unique_ptr<Worker>> workers_;  // Guarded by mutex_.
  // Lock-free counter bounding worker creation to max_workers (see Enqueue).
  std::atomic<size_t> created_workers_ = {0};
  std::mutex mutex_;
  std::atomic<bool> closing_ = {false};
  // Number of Enqueue calls currently in flight (see Shutdown busy wait).
  std::atomic<size_t> adding_ = {0};
  const Status shutdown_status_ = STATUS(Aborted, "Service is shutting down");
  // NOTE(review): queue_full_status_ is initialized but never returned in this
  // file — queue_limit does not appear to be enforced here. Confirm whether
  // enforcement happens elsewhere or was dropped.
  const Status queue_full_status_;
};
263 | | |
// Constructs a pool with the given options; worker threads are started
// lazily on first Enqueue (see Impl::Enqueue).
ThreadPool::ThreadPool(ThreadPoolOptions options)
    : impl_(new Impl(std::move(options))) {
}
267 | | |
// Move constructor: steals the implementation. `rhs` is left with a null
// impl_ and must only be destroyed or assigned to afterwards.
ThreadPool::ThreadPool(ThreadPool&& rhs) noexcept
    : impl_(std::move(rhs.impl_)) {}
270 | | |
271 | 0 | ThreadPool& ThreadPool::operator=(ThreadPool&& rhs) noexcept { |
272 | 0 | impl_->Shutdown(); |
273 | 0 | impl_ = std::move(rhs.impl_); |
274 | 0 | return *this; |
275 | 0 | } |
276 | | |
// Destructor shuts the pool down. impl_ may be null if this pool was moved
// from, hence the guard.
ThreadPool::~ThreadPool() {
  if (impl_) {
    impl_->Shutdown();
  }
}
282 | | |
283 | 8.78M | bool ThreadPool::IsCurrentThreadRpcWorker() { |
284 | 8.78M | const Thread* thread = Thread::current_thread(); |
285 | 8.78M | return thread != nullptr && thread->category() == kRpcThreadCategory8.76M ; |
286 | 8.78M | } |
287 | | |
// Forwards to Impl::Enqueue: queues the task, or completes it with an Aborted
// status and returns false when the pool is shutting down.
bool ThreadPool::Enqueue(ThreadPoolTask* task) {
  return impl_->Enqueue(task);
}
291 | | |
// Forwards to Impl::Shutdown: stops workers and aborts queued tasks.
// Idempotent; also invoked by the destructor.
void ThreadPool::Shutdown() {
  impl_->Shutdown();
}
295 | | |
// Accessor for the options this pool was constructed with.
const ThreadPoolOptions& ThreadPool::options() const {
  return impl_->options();
}
299 | | |
// Returns true iff `thread` is a worker of this particular pool (matched via
// the user-data pointer workers set on their threads).
bool ThreadPool::Owns(Thread* thread) {
  return impl_->Owns(thread);
}
303 | | |
304 | 9.27M | bool ThreadPool::OwnsThisThread() { |
305 | 9.27M | return Owns(Thread::current_thread()); |
306 | 9.27M | } |
307 | | |
308 | | } // namespace rpc |
309 | | } // namespace yb |