YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/thread_pool.h
Line
Count
Source
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#ifndef YB_RPC_THREAD_POOL_H
17
#define YB_RPC_THREAD_POOL_H
18
19
#include <memory>
20
#include <string>
21
22
#include "yb/gutil/port.h"
23
24
#include "yb/util/tostring.h"
25
26
namespace yb {
27
28
class Status;
29
class Thread;
30
31
namespace rpc {
32
33
// Abstract unit of work for the thread pool: Run() carries the work and Done() is the
// completion notification. Both are pure virtual and must be provided by subclasses.
class ThreadPoolTask {
34
 public:
35
  // Invoked in the thread pool to execute the task.
36
  virtual void Run() = 0;
37
38
  // Invoked by the thread pool when it is done with the task, i.e. the task completed or failed.
  // `status` carries the outcome reported by the pool.
39
  virtual void Done(const Status& status) = 0;
40
41
 protected:
42
65.7M
  // Protected and non-virtual: code outside the hierarchy cannot delete a task through a
  // ThreadPoolTask pointer, so destruction has to be initiated by the concrete task type.
  ~ThreadPoolTask() {}
43
};
44
45
// Task adapter that stores a callable of type F and invokes it from Run().
// Base is the task interface to derive from and defaults to ThreadPoolTask.
// Done() executes `delete this`, so instances must be allocated with `new` and must not be
// touched after Done() has been called.
template <class F, class Base = ThreadPoolTask>
46
class FunctorThreadPoolTask : public Base {
47
 public:
48
  // Stores a copy of `f`.
  explicit FunctorThreadPoolTask(const F& f) : f_(f) {}
49
121
  // Stores `f` by moving from it.
  explicit FunctorThreadPoolTask(F&& f) : f_(std::move(f)) {}
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand26ThreadPoolTest_Strand_Test8TestBodyEvE3$_3NS0_10StrandTaskEEC2EOS4_
Line
Count
Source
49
100
  explicit FunctorThreadPoolTask(F&& f) : f_(std::move(f)) {}
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand34ThreadPoolTest_StrandShutdown_Test8TestBodyEvE3$_4NS0_10StrandTaskEEC2EOS4_
Line
Count
Source
49
1
  explicit FunctorThreadPoolTask(F&& f) : f_(std::move(f)) {}
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand48ThreadPoolTest_StrandShutdownAndDestroyRace_Test8TestBodyEvE3$_7NS0_10StrandTaskEEC2EOS4_
Line
Count
Source
49
20
  explicit FunctorThreadPoolTask(F&& f) : f_(std::move(f)) {}
50
51
121
  // Virtual so that `delete this` in Done() runs the destructor of the most derived type.
  virtual ~FunctorThreadPoolTask() = default;
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand26ThreadPoolTest_Strand_Test8TestBodyEvE3$_3NS0_10StrandTaskEED2Ev
Line
Count
Source
51
100
  virtual ~FunctorThreadPoolTask() = default;
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand34ThreadPoolTest_StrandShutdown_Test8TestBodyEvE3$_4NS0_10StrandTaskEED2Ev
Line
Count
Source
51
1
  virtual ~FunctorThreadPoolTask() = default;
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand48ThreadPoolTest_StrandShutdownAndDestroyRace_Test8TestBodyEvE3$_7NS0_10StrandTaskEED2Ev
Line
Count
Source
51
20
  virtual ~FunctorThreadPoolTask() = default;
52
53
 private:
54
111
  // Executes the stored callable with no arguments; any return value is ignored.
  void Run() override {
55
111
    f_();
56
111
  }
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand26ThreadPoolTest_Strand_Test8TestBodyEvE3$_3NS0_10StrandTaskEE3RunEv
Line
Count
Source
54
100
  void Run() override {
55
100
    f_();
56
100
  }
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand34ThreadPoolTest_StrandShutdown_Test8TestBodyEvE3$_4NS0_10StrandTaskEE3RunEv
Line
Count
Source
54
1
  void Run() override {
55
1
    f_();
56
1
  }
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand48ThreadPoolTest_StrandShutdownAndDestroyRace_Test8TestBodyEvE3$_7NS0_10StrandTaskEE3RunEv
Line
Count
Source
54
10
  void Run() override {
55
10
    f_();
56
10
  }
57
58
121
  // Destroys this task; `status` is not inspected.
  // The counts in this report (Done: 121, Run: 111) show Done() being called for tasks whose
  // Run() was never executed, so cleanup must not depend on Run() having happened.
  void Done(const Status& status) override {
59
121
    delete this;
60
121
  }
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand26ThreadPoolTest_Strand_Test8TestBodyEvE3$_3NS0_10StrandTaskEE4DoneERKNS_6StatusE
Line
Count
Source
58
100
  void Done(const Status& status) override {
59
100
    delete this;
60
100
  }
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand34ThreadPoolTest_StrandShutdown_Test8TestBodyEvE3$_4NS0_10StrandTaskEE4DoneERKNS_6StatusE
Line
Count
Source
58
1
  void Done(const Status& status) override {
59
1
    delete this;
60
1
  }
thread_pool-test.cc:_ZN2yb3rpc21FunctorThreadPoolTaskIZNS0_6strand48ThreadPoolTest_StrandShutdownAndDestroyRace_Test8TestBodyEvE3$_7NS0_10StrandTaskEE4DoneERKNS_6StatusE
Line
Count
Source
58
20
  void Done(const Status& status) override {
59
20
    delete this;
60
20
  }
61
62
  // The wrapped callable, owned by value.
  F f_;
63
};
64
65
// Heap-allocates a FunctorThreadPoolTask<F> holding a copy of `f` and returns the raw pointer.
// The returned task deletes itself in FunctorThreadPoolTask::Done(), so the caller must not
// delete it after handing it to code that will call Done().
template <class F>
66
FunctorThreadPoolTask<F>* MakeFunctorThreadPoolTask(const F& f) {
67
  return new FunctorThreadPoolTask<F>(f);
68
}
69
70
// Heap-allocates a FunctorThreadPoolTask<F> that takes over `f` via std::move and returns the
// raw pointer. The returned task deletes itself in FunctorThreadPoolTask::Done().
// NOTE(review): F is deduced here, so `F&&` is a forwarding reference. A non-const lvalue
// argument would select this overload with F deduced as an lvalue reference type, yielding
// FunctorThreadPoolTask<T&> -- confirm callers only pass rvalues or const lvalues.
template <class F>
70
FunctorThreadPoolTask<F>* MakeFunctorThreadPoolTask(F&& f) {
71
  return new FunctorThreadPoolTask<F>(std::move(f));
72
}
74
75
// Plain aggregate of thread pool settings. Field order matters: ThreadPool's variadic
// constructor brace-initializes this struct positionally as {name, queue_limit, max_workers}.
struct ThreadPoolOptions {
76
  // Pool name -- presumably used for identification/diagnostics; confirm in the implementation.
  std::string name;
77
  // Presumably the maximum number of queued tasks -- TODO confirm against ThreadPool::Impl.
  size_t queue_limit;
78
  // Presumably the maximum number of worker threads -- TODO confirm against ThreadPool::Impl.
  size_t max_workers;
79
80
37.4k
  // Returns a textual form of the three fields, produced by the YB_STRUCT_TO_STRING macro.
  std::string ToString() const {
81
37.4k
    return YB_STRUCT_TO_STRING(name, queue_limit, max_workers);
82
37.4k
  }
83
};
84
85
// Public handle for the RPC thread pool. Only the interface is declared here; all state lives
// in the forward-declared Impl held through impl_. The class declares move operations, so it
// is movable and has no implicit copy operations.
class ThreadPool {
86
 public:
87
  // Constructs a pool from an explicit options object.
  explicit ThreadPool(ThreadPoolOptions options);
88
89
  // Convenience constructor: forwards `args` into a brace-initialized ThreadPoolOptions and
  // delegates to the constructor above.
  // NOTE(review): this template is unconstrained, so it is a candidate for any argument list,
  // including a single non-const ThreadPool lvalue -- confirm that is acceptable.
  template <class... Args>
90
  explicit ThreadPool(Args&&... args)
91
37.4k
      : ThreadPool(ThreadPoolOptions{std::forward<Args>(args)...}) {
92
93
37.4k
  }
_ZN2yb3rpc10ThreadPoolC1IJRA11_KcRKmmEEEDpOT_
Line
Count
Source
91
1
      : ThreadPool(ThreadPoolOptions{std::forward<Args>(args)...}) {
92
93
1
  }
_ZN2yb3rpc10ThreadPoolC1IJRA5_KcRKmS7_EEEDpOT_
Line
Count
Source
91
10
      : ThreadPool(ThreadPoolOptions{std::forward<Args>(args)...}) {
92
93
10
  }
_ZN2yb3rpc10ThreadPoolC1IJRA9_KcRKmS7_EEEDpOT_
Line
Count
Source
91
82
      : ThreadPool(ThreadPoolOptions{std::forward<Args>(args)...}) {
92
93
82
  }
_ZN2yb3rpc10ThreadPoolC1IJRA19_KcRyS6_EEEDpOT_
Line
Count
Source
91
4.72k
      : ThreadPool(ThreadPoolOptions{std::forward<Args>(args)...}) {
92
93
4.72k
  }
_ZN2yb3rpc10ThreadPoolC1IJNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEERKmSB_EEEDpOT_
Line
Count
Source
91
11.2k
      : ThreadPool(ThreadPoolOptions{std::forward<Args>(args)...}) {
92
93
11.2k
  }
_ZN2yb3rpc10ThreadPoolC1IJRKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEERKmSD_EEEDpOT_
Line
Count
Source
91
21.3k
      : ThreadPool(ThreadPoolOptions{std::forward<Args>(args)...}) {
92
93
21.3k
  }
94
95
  // Defined out of line (required because Impl is an incomplete type in this header).
  ~ThreadPool();
96
97
  // Move operations; both are declared noexcept and defined out of line.
  ThreadPool(ThreadPool&& rhs) noexcept;
98
  ThreadPool& operator=(ThreadPool&& rhs) noexcept;
99
100
  // Read-only access to the options of this pool.
  const ThreadPoolOptions& options() const;
101
102
  // Submits `task` to the pool. The meaning of the bool result is defined by the out-of-line
  // implementation -- presumably whether the task was accepted; TODO confirm.
  bool Enqueue(ThreadPoolTask* task);
103
104
  // Wraps a copy of `f` in a heap-allocated FunctorThreadPoolTask and enqueues it.
  // The bool returned by Enqueue() is discarded.
  // NOTE(review): confirm Enqueue() calls Done() on tasks it rejects; otherwise the task
  // allocated here would never be deleted.
  template <class F>
105
  void EnqueueFunctor(const F& f) {
106
    Enqueue(MakeFunctorThreadPoolTask(f));
107
  }
108
109
  // Same as above, but hands `f` over with std::move. The bool returned by Enqueue() is
  // discarded here as well.
  // NOTE(review): `F&&` is a forwarding reference, so a non-const lvalue argument also binds
  // here and would be moved from -- confirm callers expect that.
  template <class F>
110
  void EnqueueFunctor(F&& f) {
111
    Enqueue(MakeFunctorThreadPoolTask(std::move(f)));
112
  }
113
114
  // Shuts the pool down; exact semantics are defined by the out-of-line implementation.
  void Shutdown();
115
116
  // Static query about the calling thread; by its name, reports whether the current thread is
  // an RPC worker thread -- TODO confirm against the implementation.
  static bool IsCurrentThreadRpcWorker();
117
118
  // Ownership queries for a given thread and for the calling thread, respectively.
  // Presumably true when the thread belongs to this pool -- TODO confirm.
  bool Owns(Thread* thread);
119
  bool OwnsThisThread();
120
121
 private:
122
  // Implementation type, declared here and defined elsewhere.
  class Impl;
123
124
  // Sole data member: owning pointer to the implementation.
  std::unique_ptr<Impl> impl_;
125
};
126
127
} // namespace rpc
128
} // namespace yb
129
130
#endif // YB_RPC_THREAD_POOL_H