YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/io_thread_pool.cc
Line
Count
Source
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/io_thread_pool.h"
17
18
#include <thread>
19
20
#include <boost/asio/io_service.hpp>
21
#include <boost/optional.hpp>
22
#include <glog/logging.h>
23
24
#include "yb/util/status_log.h"
25
#include "yb/util/thread.h"
26
27
using namespace std::literals;
28
29
namespace yb {
30
namespace rpc {
31
32
// Private implementation of IoThreadPool: holds the io service, the work object that is
// attached to it, and the threads that call run() on it.
class IoThreadPool::Impl {
 public:
  // Creates `num_threads` threads. Every thread gets the category "iotp_<name>" and the
  // name "iotp_<name>_<index>", and executes Impl::Execute.
  // The result of Thread::Make is unwrapped with CHECK_RESULT.
  Impl(const std::string& name, size_t num_threads) : name_(name) {
    threads_.reserve(num_threads);
    for (size_t thread_idx = 0; thread_idx != num_threads; ++thread_idx) {
      threads_.push_back(CHECK_RESULT(Thread::Make(
          Format("iotp_$0", name_), Format("iotp_$0_$1", name_, thread_idx),
          [this] { Execute(); })));
    }
  }

  // Releases the work object and then waits for the threads.
  ~Impl() {
    Shutdown();
    Join();
  }

  // Gives access to the io service that the pool threads are running.
  IoService& io_service() {
    return io_service_;
  }

  // Drops the work object attached to the io service. Does not wait for anything.
  // NOTE(review): per Boost.Asio semantics, run() is expected to return once no work
  // object and no pending handlers remain - confirm against the Boost version in use.
  void Shutdown() {
    work_.reset();
  }

  // Polls the io service every 10ms until it reports being stopped. If that does not
  // happen within 15 seconds, logs an error and stops the io service explicitly.
  // Afterwards joins every thread of the pool.
  void Join() {
    const auto give_up_at = std::chrono::steady_clock::now() + 15s;
    for (;;) {
      if (io_service_.stopped()) {
        break;
      }
      if (std::chrono::steady_clock::now() >= give_up_at) {
        LOG(ERROR) << "Io service failed to stop";
        io_service_.stop();
        break;
      }
      std::this_thread::sleep_for(10ms);
    }
    for (auto& worker : threads_) {
      worker->Join();
    }
  }

 private:
  // Thread body: runs the io service and logs the error code if run() reported one.
  void Execute() {
    boost::system::error_code run_error;
    io_service_.run(run_error);
    if (run_error) {
      LOG(ERROR) << "Failed to run io service: " << run_error;
    }
  }

  std::string name_;
  std::vector<ThreadPtr> threads_;
  IoService io_service_;
  // Declared after io_service_ because it is constructed from it.
  boost::optional<IoService::work> work_{io_service_};
};
85
86
// Builds the pool by creating the implementation object with the given name and
// number of threads.
IoThreadPool::IoThreadPool(const std::string& name, size_t num_threads)
    : impl_(new Impl(name, num_threads)) {
}
88
89
4.08k
// Defined in this file, where Impl is a complete type.
IoThreadPool::~IoThreadPool() = default;
90
91
58.4k
// Returns the io service owned by the implementation object.
IoService& IoThreadPool::io_service() {
  IoService& service = impl_->io_service();
  return service;
}
94
95
3.76k
void IoThreadPool::Shutdown() {
96
3.76k
  impl_->Shutdown();
97
3.76k
}
98
99
3.76k
void IoThreadPool::Join() {
100
3.76k
  impl_->Join();
101
3.76k
}
102
103
} // namespace rpc
104
} // namespace yb