YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/io_thread_pool.cc
Line
Count
Source
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/io_thread_pool.h"
17
18
#include <thread>
19
20
#include <boost/asio/io_service.hpp>
21
#include <boost/optional.hpp>
22
#include <glog/logging.h>
23
24
#include "yb/util/status_log.h"
25
#include "yb/util/thread.h"
26
27
using namespace std::literals;
28
29
namespace yb {
30
namespace rpc {
31
32
// Private implementation of IoThreadPool.
//
// Owns a single IoService together with the worker threads that call run() on it. A `work_`
// object is attached to the service at construction; Shutdown() destroys it and Join() waits for
// the service to report stopped() before joining the workers.
class IoThreadPool::Impl {
 public:
  // Starts `num_threads` workers. Every worker uses the thread category "iotp_<name>" and the
  // thread name "iotp_<name>_<index>", and runs Execute().
  // Thread creation failure is handled by CHECK_RESULT.
  Impl(const std::string& name, size_t num_threads) : name_(name) {
    threads_.reserve(num_threads);
    for (size_t index = 0; index != num_threads; ++index) {
      auto worker = CHECK_RESULT(Thread::Make(
          Format("iotp_$0", name_), Format("iotp_$0_$1", name_, index),
          [this] { Execute(); }));
      threads_.push_back(std::move(worker));
    }
  }

  // Performs the full Shutdown() + Join() sequence so the workers are joined before the members
  // they use are destroyed.
  ~Impl() {
    Shutdown();
    Join();
  }

  // Gives access to the service that the workers are running.
  IoService& io_service() {
    return io_service_;
  }

  // Drops the work object attached to the service. Calling this more than once just resets an
  // already empty optional.
  void Shutdown() {
    work_.reset();
  }

  // Polls the service every 10ms, for at most 15s, until it reports stopped(). When the deadline
  // passes first, an error is logged and the service is stopped explicitly. In both cases every
  // worker thread is joined afterwards.
  void Join() {
    const auto deadline = std::chrono::steady_clock::now() + 15s;
    while (!io_service_.stopped()) {
      if (std::chrono::steady_clock::now() < deadline) {
        std::this_thread::sleep_for(10ms);
        continue;
      }
      // Deadline expired while the service was still running: force it to stop.
      LOG(ERROR) << "Io service failed to stop";
      io_service_.stop();
      break;
    }
    for (const auto& worker : threads_) {
      worker->Join();
    }
  }

 private:
  // Body of every worker thread: runs the service and logs the error code if run() reports one.
  void Execute() {
    boost::system::error_code run_error;
    io_service_.run(run_error);
    LOG_IF(ERROR, run_error) << "Failed to run io service: " << run_error;
  }

  std::string name_;
  std::vector<ThreadPtr> threads_;
  IoService io_service_;
  // Declared after io_service_ because it is constructed from it.
  boost::optional<IoService::work> work_{io_service_};
};
85
86
// Creates the implementation object, which starts `num_threads` worker threads named after
// `name`.
IoThreadPool::IoThreadPool(const std::string& name, size_t num_threads)
    : impl_(new Impl(name, num_threads)) {
}
88
89
9.09k
// Defined in this translation unit, after Impl is a complete type; all cleanup happens in
// Impl's destructor.
IoThreadPool::~IoThreadPool() = default;
90
91
91.7k
// Returns the IoService owned by the implementation object.
IoService& IoThreadPool::io_service() {
  IoService& service = impl_->io_service();
  return service;
}
94
95
8.56k
void IoThreadPool::Shutdown() {
96
8.56k
  impl_->Shutdown();
97
8.56k
}
98
99
8.56k
void IoThreadPool::Join() {
100
8.56k
  impl_->Join();
101
8.56k
}
102
103
} // namespace rpc
104
} // namespace yb