YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/thread_pool.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/thread_pool.h"
17
18
#include <condition_variable>
19
#include <mutex>
20
21
#include <cds/container/basket_queue.h>
22
#include <cds/gc/dhp.h>
23
24
#include "yb/util/scope_exit.h"
25
#include "yb/util/status_format.h"
26
#include "yb/util/thread.h"
27
28
namespace yb {
29
namespace rpc {
30
31
namespace {

class Worker;

// Lock-free MPMC queues (libcds BasketQueue with Dynamic Hazard Pointer GC)
// holding raw pointers; ownership of tasks/workers lives elsewhere.
typedef cds::container::BasketQueue<cds::gc::DHP, ThreadPoolTask*> TaskQueue;
typedef cds::container::BasketQueue<cds::gc::DHP, Worker*> WaitingWorkers;

// State shared between ThreadPool::Impl and all of its Worker threads.
struct ThreadPoolShare {
  ThreadPoolOptions options;
  // Tasks pushed by Enqueue and popped by workers.
  TaskQueue task_queue;
  // Idle workers parked here waiting to be notified about new tasks.
  WaitingWorkers waiting_workers;

  explicit ThreadPoolShare(ThreadPoolOptions o)
      : options(std::move(o)) {}
};
46
47
namespace {

// Thread category assigned to every worker thread (see Worker::Start);
// ThreadPool::IsCurrentThreadRpcWorker() identifies workers by this category.
const std::string kRpcThreadCategory = "rpc_thread_pool";

} // namespace
52
53
// A single pool thread. Lifecycle: Start() spawns the thread running
// Execute(), which loops popping tasks until Stop() is called; the
// destructor joins the thread.
class Worker {
 public:
  explicit Worker(ThreadPoolShare* share)
      : share_(share) {
  }

  // Spawns the underlying thread. Must be called exactly once before use;
  // on failure thread_ stays null and the destructor skips the join.
  CHECKED_STATUS Start(size_t index) {
    auto name = strings::Substitute("rpc_tp_$0_$1", share_->options.name, index);
    return yb::Thread::Create(kRpcThreadCategory, name, &Worker::Execute, this, &thread_);
  }

  ~Worker() {
    if (thread_) {
      thread_->Join();
    }
  }

  Worker(const Worker& worker) = delete;
  void operator=(const Worker& worker) = delete;

  // Asks the worker to exit its loop. Taking mutex_ before notify ensures the
  // worker is either before its stop_requested_ check or already parked in
  // cond_.wait, so the wakeup cannot be lost.
  void Stop() {
    stop_requested_ = true;
    std::lock_guard<std::mutex> lock(mutex_);
    cond_.notify_one();
  }

  // Called by Enqueue after popping this worker from waiting_workers.
  // Returns true if this worker was actually waiting for a task and has been
  // woken to take it.
  bool Notify() {
    std::lock_guard<std::mutex> lock(mutex_);
    added_to_waiting_workers_ = false;
    // There could be cases when we popped task after adding ourselves to worker queue (see below).
    // So we are already processing task, but reside in worker queue.
    // To handle this case we use waiting_task_ flag.
    // If we don't wait task, we return false here, and next worker would be popped from queue
    // and notified.
    if (!waiting_task_) {
      return false;
    }
    cond_.notify_one();
    return true;
  }

 private:
  // Our main invariant is empty task queue or empty worker queue.
  // In other words, one of those queues should be empty.
  // Meaning that we does not have work (task queue empty) or
  // does not have free hands (worker queue empty)
  void Execute() {
    // Tag the thread so Impl::Owns can recognize it via user_data.
    Thread::current_thread()->SetUserData(share_);
    while (!stop_requested_) {
      ThreadPoolTask* task = nullptr;
      if (PopTask(&task)) {
        task->Run();
        task->Done(Status::OK());
      }
    }
    // NOTE(review): tasks still queued at stop time are not drained here;
    // Impl::Shutdown drains them and completes each with shutdown_status_.
  }

  // Blocks until a task is available or stop is requested.
  // Returns true with *task set, or false when stopping.
  bool PopTask(ThreadPoolTask** task) {
    // First of all we try to get already queued task, w/o locking.
    // If there is no task, so we could go to waiting state.
    if (share_->task_queue.pop(*task)) {
      return true;
    }
    std::unique_lock<std::mutex> lock(mutex_);
    waiting_task_ = true;
    auto se = ScopeExit([this] {
      waiting_task_ = false;
    });

    while (!stop_requested_) {
      AddToWaitingWorkers();

      // There could be situation, when task was queued before we added ourselves to
      // the worker queue. So worker queue could be empty in this case, and nobody was notified
      // about new task. So we check there for this case. This technique is similar to
      // double check.
      if (share_->task_queue.pop(*task)) {
        return true;
      }

      cond_.wait(lock);

      // Sometimes another worker could steal task before we wake up. In this case we will
      // just enqueue ourselves back.
      if (share_->task_queue.pop(*task)) {
        return true;
      }
    }
    return false;
  }

  // Registers this worker in the shared waiting queue at most once per
  // notification cycle (Notify() clears the flag under mutex_).
  void AddToWaitingWorkers() {
    if (!added_to_waiting_workers_) {
      auto pushed = share_->waiting_workers.push(this);
      DCHECK(pushed); // BasketQueue always succeed.
      added_to_waiting_workers_ = true;
    }
  }

  ThreadPoolShare* share_;
  scoped_refptr<yb::Thread> thread_;
  // Protects waiting_task_/added_to_waiting_workers_ and pairs with cond_.
  std::mutex mutex_;
  std::condition_variable cond_;
  std::atomic<bool> stop_requested_ = {false};
  // True while this worker is inside PopTask's waiting section; read by
  // Notify under mutex_ to decide whether a wakeup will be consumed.
  bool waiting_task_ = false;
  bool added_to_waiting_workers_ = false;
};
160
161
} // namespace
162
163
// Pimpl body of ThreadPool. Workers are created lazily, one per Enqueue that
// finds no waiting worker, up to options.max_workers.
class ThreadPool::Impl {
 public:
  explicit Impl(ThreadPoolOptions options)
      : share_(std::move(options)),
        queue_full_status_(STATUS_SUBSTITUTE(ServiceUnavailable,
                                             "Queue is full, max items: $0",
                                             share_.options.queue_limit)) {
    LOG(INFO) << "Starting thread pool " << share_.options.ToString();
    workers_.reserve(share_.options.max_workers);
  }

  const ThreadPoolOptions& options() const {
    return share_.options;
  }

  // Queues a task for execution. Returns false (after completing the task
  // with shutdown_status_) only when the pool is closing; otherwise true,
  // even if spawning an extra worker failed (the task stays queued for
  // existing workers).
  bool Enqueue(ThreadPoolTask* task) {
    // adding_ is the "atomic lock" that Shutdown busy-waits on, so a task
    // pushed after the closing_ check cannot be missed by the drain loop.
    ++adding_;
    if (closing_) {
      --adding_;
      task->Done(shutdown_status_);
      return false;
    }
    bool added = share_.task_queue.push(task);
    DCHECK(added); // BasketQueue always succeed.
    Worker* worker = nullptr;
    // Pop parked workers until one accepts the wakeup; workers that already
    // grabbed a task return false from Notify and are simply discarded.
    while (share_.waiting_workers.pop(worker)) {
      if (worker->Notify()) {
        --adding_;
        return true;
      }
    }
    --adding_;

    // We increment created_workers_ every time, the first max_worker increments would produce
    // a new worker. And after that, we will just increment it doing nothing after that.
    // So we could be lock free here.
    auto index = created_workers_++;
    if (index < share_.options.max_workers) {
      std::lock_guard<std::mutex> lock(mutex_);
      if (!closing_) {
        auto new_worker = std::make_unique<Worker>(&share_);
        auto status = new_worker->Start(workers_.size());
        if (status.ok()) {
          workers_.push_back(std::move(new_worker));
        } else if (workers_.empty()) {
          LOG(FATAL) << "Unable to start first worker: " << status;
        } else {
          LOG(WARNING) << "Unable to start worker: " << status;
        }
      }
    } else {
      --created_workers_;
    }
    return true;
  }

  // Idempotent. Stops all workers, waits for in-flight Enqueue calls, then
  // fails every remaining queued task with shutdown_status_.
  void Shutdown() {
    // Block creating new workers.
    created_workers_ += share_.options.max_workers;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (closing_) {
        CHECK(share_.task_queue.empty());
        CHECK(workers_.empty());
        return;
      }
      closing_ = true;
    }
    for (auto& worker : workers_) {
      if (worker) {
        worker->Stop();
      }
    }
    // Shutdown is quite rare situation otherwise enqueue is quite frequent.
    // Because of this we use "atomic lock" in enqueue and busy wait in shutdown.
    // So we could process enqueue quickly, and stuck in shutdown for sometime.
    while (adding_ != 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    // Joins all worker threads (Worker dtor joins).
    workers_.clear();
    ThreadPoolTask* task = nullptr;
    while (share_.task_queue.pop(task)) {
      task->Done(shutdown_status_);
    }
  }

  // True iff thread is a worker of THIS pool (tagged in Worker::Execute).
  bool Owns(Thread* thread) {
    return thread && thread->user_data() == &share_;
  }

 private:
  ThreadPoolShare share_;
  std::vector<std::unique_ptr<Worker>> workers_;
  std::atomic<size_t> created_workers_ = {0};
  // Guards workers_ and the closing_ transition in Enqueue/Shutdown.
  std::mutex mutex_;
  std::atomic<bool> closing_ = {false};
  // Count of Enqueue calls in flight; Shutdown spins until it drops to zero.
  std::atomic<size_t> adding_ = {0};
  const Status shutdown_status_ = STATUS(Aborted, "Service is shutting down");
  // NOTE(review): constructed but not referenced in the code visible here —
  // presumably intended for a queue_limit check; confirm against full file.
  const Status queue_full_status_;
};
263
264
// Creates the pool; worker threads are spawned lazily on first enqueues.
ThreadPool::ThreadPool(ThreadPoolOptions options)
    : impl_(new Impl(std::move(options))) {
}
267
268
// Move constructor: rhs is left with a null impl_ (safe to destroy only;
// the destructor checks for this state).
ThreadPool::ThreadPool(ThreadPool&& rhs) noexcept
    : impl_(std::move(rhs.impl_)) {}
270
271
0
// Move assignment: shuts down the current pool (if any), then takes over
// rhs's implementation, leaving rhs with a null impl_ (destroy-only state).
ThreadPool& ThreadPool::operator=(ThreadPool&& rhs) noexcept {
  if (this != &rhs) {
    // impl_ is null when this pool was itself moved from; the destructor
    // guards the same way, so guard here too instead of dereferencing null.
    if (impl_) {
      impl_->Shutdown();
    }
    impl_ = std::move(rhs.impl_);
  }
  return *this;
}
276
277
16.0k
ThreadPool::~ThreadPool() {
  // impl_ is null if this instance was moved from.
  if (impl_) {
    impl_->Shutdown();
  }
}
282
283
8.78M
bool ThreadPool::IsCurrentThreadRpcWorker() {
284
8.78M
  const Thread* thread = Thread::current_thread();
285
8.78M
  return thread != nullptr && 
thread->category() == kRpcThreadCategory8.76M
;
286
8.78M
}
287
288
168M
// Queues task; on rejection (pool closing) task->Done is invoked with an
// Aborted status and false is returned. See Impl::Enqueue.
bool ThreadPool::Enqueue(ThreadPoolTask* task) {
  return impl_->Enqueue(task);
}
291
292
18.1k
// Idempotent shutdown; joins workers and fails remaining tasks.
void ThreadPool::Shutdown() {
  impl_->Shutdown();
}
295
296
16.7k
const ThreadPoolOptions& ThreadPool::options() const {
  return impl_->options();
}
299
300
9.26M
// True iff thread is a worker thread of THIS pool (null-safe).
bool ThreadPool::Owns(Thread* thread) {
  return impl_->Owns(thread);
}
303
304
9.27M
// Convenience wrapper: does the calling thread belong to this pool?
bool ThreadPool::OwnsThisThread() {
  return Owns(Thread::current_thread());
}
307
308
} // namespace rpc
309
} // namespace yb