YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/thread_pool.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/thread_pool.h"
17
18
#include <condition_variable>
19
#include <mutex>
20
21
#include <cds/container/basket_queue.h>
22
#include <cds/gc/dhp.h>
23
24
#include "yb/util/scope_exit.h"
25
#include "yb/util/status_format.h"
26
#include "yb/util/thread.h"
27
28
namespace yb {
29
namespace rpc {
30
31
namespace {
32
33
class Worker;
34
35
typedef cds::container::BasketQueue<cds::gc::DHP, ThreadPoolTask*> TaskQueue;
36
typedef cds::container::BasketQueue<cds::gc::DHP, Worker*> WaitingWorkers;
37
38
// State shared between the pool front-end (ThreadPool::Impl) and all of its
// Worker threads. Both queues are lock-free MPMC basket queues, so producers
// (Enqueue) and consumers (workers) touch them without taking a mutex.
struct ThreadPoolShare {
  ThreadPoolOptions options;
  // Tasks submitted but not yet picked up by a worker.
  TaskQueue task_queue;
  // Workers that are (probably) idle and waiting to be notified.
  // A worker may appear here while already busy; Worker::Notify resolves that.
  WaitingWorkers waiting_workers;

  explicit ThreadPoolShare(ThreadPoolOptions o)
      : options(std::move(o)) {}
};
46
47
namespace {

// Thread category used both when spawning workers and by
// ThreadPool::IsCurrentThreadRpcWorker to recognize pool threads.
const std::string kRpcThreadCategory = "rpc_thread_pool";

} // namespace
52
53
// A single pool thread. Workers pull tasks from the shared lock-free task
// queue; when the queue is empty they park themselves on the shared
// waiting_workers queue and sleep on a per-worker condition variable until
// ThreadPool::Impl::Enqueue pops and notifies them.
class Worker {
 public:
  explicit Worker(ThreadPoolShare* share)
      : share_(share) {
  }

  // Spawns the underlying yb::Thread running Execute(). Must be called once
  // before the worker is useful; on failure the worker owns no thread.
  CHECKED_STATUS Start(size_t index) {
    auto name = strings::Substitute("rpc_tp_$0_$1", share_->options.name, index);
    return yb::Thread::Create(kRpcThreadCategory, name, &Worker::Execute, this, &thread_);
  }

  // Joins the thread (if Start succeeded). Callers must have invoked Stop()
  // first, otherwise Execute() never exits and Join() blocks forever.
  ~Worker() {
    if (thread_) {
      thread_->Join();
    }
  }

  Worker(const Worker& worker) = delete;
  void operator=(const Worker& worker) = delete;

  // Asks the worker loop to exit. The stop flag is set before taking the
  // mutex so that Execute()/PopTask() observe it no later than the wakeup;
  // notify under the lock guarantees a parked worker cannot miss the signal.
  void Stop() {
    stop_requested_ = true;
    std::lock_guard<std::mutex> lock(mutex_);
    cond_.notify_one();
  }

  // Called by Enqueue after popping this worker from waiting_workers.
  // Returns true if this worker was actually waiting and has been woken;
  // false means the caller should pop and notify the next waiting worker.
  bool Notify() {
    std::lock_guard<std::mutex> lock(mutex_);
    added_to_waiting_workers_ = false;
    // There could be cases when we popped task after adding ourselves to worker queue (see below).
    // So we are already processing task, but reside in worker queue.
    // To handle this case we use waiting_task_ flag.
    // If we don't wait task, we return false here, and next worker would be popped from queue
    // and notified.
    if (!waiting_task_) {
      return false;
    }
    cond_.notify_one();
    return true;
  }

 private:
  // Our main invariant is empty task queue or empty worker queue.
  // In other words, one of those queues should be empty.
  // Meaning that we does not have work (task queue empty) or
  // does not have free hands (worker queue empty)

  // Thread entry point: loop pulling tasks until Stop() is requested.
  // User data is set so ThreadPool::Owns can identify threads of this pool.
  void Execute() {
    Thread::current_thread()->SetUserData(share_);
    while (!stop_requested_) {
      ThreadPoolTask* task = nullptr;
      if (PopTask(&task)) {
        task->Run();
        task->Done(Status::OK());
      }
    }
  }

  // Blocks until a task is available or stop is requested.
  // Returns true with *task filled in, or false on shutdown.
  bool PopTask(ThreadPoolTask** task) {
    // First of all we try to get already queued task, w/o locking.
    // If there is no task, so we could go to waiting state.
    if (share_->task_queue.pop(*task)) {
      return true;
    }
    std::unique_lock<std::mutex> lock(mutex_);
    waiting_task_ = true;
    auto se = ScopeExit([this] {
      waiting_task_ = false;
    });

    while (!stop_requested_) {
      AddToWaitingWorkers();

      // There could be situation, when task was queued before we added ourselves to
      // the worker queue. So worker queue could be empty in this case, and nobody was notified
      // about new task. So we check there for this case. This technique is similar to
      // double check.
      if (share_->task_queue.pop(*task)) {
        return true;
      }

      cond_.wait(lock);

      // Sometimes another worker could steal task before we wake up. In this case we will
      // just enqueue ourselves back.
      if (share_->task_queue.pop(*task)) {
        return true;
      }
    }
    return false;
  }

  // Registers this worker in the shared waiting queue at most once between
  // notifications (added_to_waiting_workers_ is reset by Notify()).
  void AddToWaitingWorkers() {
    if (!added_to_waiting_workers_) {
      auto pushed = share_->waiting_workers.push(this);
      DCHECK(pushed); // BasketQueue always succeed.
      added_to_waiting_workers_ = true;
    }
  }

  ThreadPoolShare* share_;
  scoped_refptr<yb::Thread> thread_;
  // Guards waiting_task_ / added_to_waiting_workers_ and pairs with cond_.
  std::mutex mutex_;
  std::condition_variable cond_;
  // Atomic: written by Stop() (another thread), read by the worker loop.
  std::atomic<bool> stop_requested_ = {false};
  // True only while this worker is inside PopTask's waiting section.
  bool waiting_task_ = false;
  // True while this worker believes it is present in share_->waiting_workers.
  bool added_to_waiting_workers_ = false;
};
160
161
} // namespace
162
163
// Implementation of ThreadPool (pImpl). Enqueue is designed to be cheap and
// (mostly) lock-free; Shutdown is the rare path and may busy-wait.
class ThreadPool::Impl {
 public:
  explicit Impl(ThreadPoolOptions options)
      : share_(std::move(options)),
        queue_full_status_(STATUS_SUBSTITUTE(ServiceUnavailable,
                                             "Queue is full, max items: $0",
                                             share_.options.queue_limit)) {
    LOG(INFO) << "Starting thread pool " << share_.options.ToString();
    workers_.reserve(share_.options.max_workers);
  }

  const ThreadPoolOptions& options() const {
    return share_.options;
  }

  // Submits a task. Returns false (and completes the task with the shutdown
  // status) if the pool is closing; true otherwise. adding_ acts as an
  // "atomic lock": Shutdown waits for it to reach zero before draining.
  bool Enqueue(ThreadPoolTask* task) {
    ++adding_;
    if (closing_) {
      --adding_;
      task->Done(shutdown_status_);
      return false;
    }
    bool added = share_.task_queue.push(task);
    DCHECK(added); // BasketQueue always succeed.
    Worker* worker = nullptr;
    // Wake one waiting worker; Notify() returning false means that worker was
    // stale (already busy), so try the next one.
    while (share_.waiting_workers.pop(worker)) {
      if (worker->Notify()) {
        --adding_;
        return true;
      }
    }
    --adding_;

    // We increment created_workers_ every time, the first max_worker increments would produce
    // a new worker. And after that, we will just increment it doing nothing after that.
    // So we could be lock free here.
    auto index = created_workers_++;
    if (index < share_.options.max_workers) {
      std::lock_guard<std::mutex> lock(mutex_);
      if (!closing_) {
        auto new_worker = std::make_unique<Worker>(&share_);
        auto status = new_worker->Start(workers_.size());
        if (status.ok()) {
          workers_.push_back(std::move(new_worker));
        } else if (workers_.empty()) {
          LOG(FATAL) << "Unable to start first worker: " << status;
        } else {
          LOG(WARNING) << "Unable to start worker: " << status;
        }
      }
    } else {
      // Pool already at max_workers: undo the speculative increment.
      --created_workers_;
    }
    return true;
  }

  // Stops all workers, waits for in-flight Enqueue calls, and aborts any
  // tasks still queued. Safe to call multiple times.
  void Shutdown() {
    // Block creating new workers.
    created_workers_ += share_.options.max_workers;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (closing_) {
        CHECK(share_.task_queue.empty());
        CHECK(workers_.empty());
        return;
      }
      closing_ = true;
    }
    for (auto& worker : workers_) {
      if (worker) {
        worker->Stop();
      }
    }
    // Shutdown is quite rare situation otherwise enqueue is quite frequent.
    // Because of this we use "atomic lock" in enqueue and busy wait in shutdown.
    // So we could process enqueue quickly, and stuck in shutdown for sometime.
    while (adding_ != 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    // Joins all worker threads (Worker dtor joins after Stop()).
    workers_.clear();
    ThreadPoolTask* task = nullptr;
    while (share_.task_queue.pop(task)) {
      task->Done(shutdown_status_);
    }
  }

  // True iff the given thread is one of this pool's workers
  // (identified via the user data set in Worker::Execute).
  bool Owns(Thread* thread) {
    return thread && thread->user_data() == &share_;
  }

 private:
  ThreadPoolShare share_;
  std::vector<std::unique_ptr<Worker>> workers_;
  // Speculative counter gating worker creation; may transiently exceed
  // max_workers and is decremented back on the losing path in Enqueue.
  std::atomic<size_t> created_workers_ = {0};
  // Guards workers_ and the closing_ transition.
  std::mutex mutex_;
  std::atomic<bool> closing_ = {false};
  // Count of Enqueue calls currently in flight; Shutdown spins on it.
  std::atomic<size_t> adding_ = {0};
  const Status shutdown_status_ = STATUS(Aborted, "Service is shutting down");
  // NOTE(review): not referenced anywhere in this translation unit —
  // presumably intended for a queue_limit check in Enqueue; confirm.
  const Status queue_full_status_;
};
263
264
// Constructs the pool; workers are created lazily on first Enqueue,
// not here (see Impl::Enqueue).
ThreadPool::ThreadPool(ThreadPoolOptions options)
    : impl_(new Impl(std::move(options))) {
}
267
268
// Move constructor: rhs is left with a null impl_ (destructor tolerates this).
ThreadPool::ThreadPool(ThreadPool&& rhs) noexcept
    : impl_(std::move(rhs.impl_)) {}
270
271
0
// Move assignment: shuts down the currently-owned pool (if any) before taking
// over rhs's implementation.
//
// Fixes two defects in the previous version, which called impl_->Shutdown()
// unconditionally:
//  * *this may be in the moved-from state (impl_ == nullptr) — the destructor
//    already guards for this, so assignment must too, or it dereferences null;
//  * self-move-assignment would shut the pool down and then move from itself,
//    leaving impl_ null.
ThreadPool& ThreadPool::operator=(ThreadPool&& rhs) noexcept {
  if (this != &rhs) {
    if (impl_) {
      impl_->Shutdown();
    }
    impl_ = std::move(rhs.impl_);
  }
  return *this;
}
276
277
7.95k
// Shuts the pool down on destruction; the null check covers moved-from pools.
ThreadPool::~ThreadPool() {
  if (impl_) {
    impl_->Shutdown();
  }
}
282
283
4.44M
bool ThreadPool::IsCurrentThreadRpcWorker() {
284
4.44M
  const Thread* thread = Thread::current_thread();
285
4.44M
  return thread != nullptr && thread->category() == kRpcThreadCategory;
286
4.44M
}
287
288
53.8M
// Forwards to Impl::Enqueue. Returns false (after completing the task with
// an Aborted status) if the pool is shutting down.
bool ThreadPool::Enqueue(ThreadPoolTask* task) {
  return impl_->Enqueue(task);
}
291
292
9.89k
// Forwards to Impl::Shutdown; idempotent.
void ThreadPool::Shutdown() {
  impl_->Shutdown();
}
295
296
11.2k
// Read-only access to the options the pool was constructed with.
const ThreadPoolOptions& ThreadPool::options() const {
  return impl_->options();
}
299
300
4.95M
// True iff `thread` is a worker of *this* pool (matched via thread user data).
bool ThreadPool::Owns(Thread* thread) {
  return impl_->Owns(thread);
}
303
304
4.95M
// Convenience wrapper: does this pool own the calling thread?
bool ThreadPool::OwnsThisThread() {
  return Owns(Thread::current_thread());
}
307
308
} // namespace rpc
309
} // namespace yb