YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/tasks_pool.h
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#ifndef YB_RPC_TASKS_POOL_H
17
#define YB_RPC_TASKS_POOL_H
18
19
#include <boost/lockfree/queue.hpp>
20
21
#include "yb/rpc/thread_pool.h"
22
23
namespace yb {
24
namespace rpc {
25
26
class ThreadPool;
27
28
// Tasks pool that could be used in conjunction with ThreadPool, to preallocate a buffer for a fixed
29
// number of tasks and avoid allocating memory for each task separately.
30
template <class Task>
31
class TasksPool {
32
 public:
33
9.48k
  explicit TasksPool(size_t size) : tasks_(size), queue_(size) {
34
4.72M
    for (auto& task : tasks_) {
35
4.72M
      CHECK(queue_.bounded_push(&task));
36
4.72M
    }
37
9.48k
  }
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEEC2Em
Line
Count
Source
33
4.74k
  explicit TasksPool(size_t size) : tasks_(size), queue_(size) {
34
2.36M
    for (auto& task : tasks_) {
35
2.36M
      CHECK(queue_.bounded_push(&task));
36
2.36M
    }
37
4.74k
  }
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEEC2Em
Line
Count
Source
33
4.74k
  explicit TasksPool(size_t size) : tasks_(size), queue_(size) {
34
2.36M
    for (auto& task : tasks_) {
35
2.36M
      CHECK(queue_.bounded_push(&task));
36
2.36M
    }
37
4.74k
  }
38
39
  template <class... Args>
40
2.01k
  bool Enqueue(ThreadPool* thread_pool, Args&&... args) {
41
2.01k
    WrappedTask* task = nullptr;
42
2.01k
    if (queue_.pop(task)) {
43
2.01k
      task->pool = this;
44
2.01k
      new (&task->storage) Task(std::forward<Args>(args)...);
45
2.01k
      thread_pool->Enqueue(task);
46
2.01k
      return true;
47
0
    } else {
48
0
      return false;
49
0
    }
50
2.01k
  }
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE7EnqueueIJRKPNS2_8YBClientEPNS3_21TransactionTableStateERyEEEbPNS0_10ThreadPoolEDpOT_
Line
Count
Source
40
684
  bool Enqueue(ThreadPool* thread_pool, Args&&... args) {
41
684
    WrappedTask* task = nullptr;
42
684
    if (queue_.pop(task)) {
43
684
      task->pool = this;
44
684
      new (&task->storage) Task(std::forward<Args>(args)...);
45
684
      thread_pool->Enqueue(task);
46
684
      return true;
47
0
    } else {
48
0
      return false;
49
0
    }
50
684
  }
Unexecuted instantiation: transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE7EnqueueIJPNS3_21TransactionTableStateERNSt3__18functionIFvRKNS_6ResultINS9_12basic_stringIcNS9_11char_traitsIcEENS9_9allocatorIcEEEEEEEEERNS_19TransactionLocalityEEEEbPNS0_10ThreadPoolEDpOT_
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE7EnqueueIJRKPNS2_8YBClientEPNS3_21TransactionTableStateEiRNSt3__18functionIFvRKNS_6ResultINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEEEEEEERNS_19TransactionLocalityEEEEbPNS0_10ThreadPoolEDpOT_
Line
Count
Source
40
1.33k
  bool Enqueue(ThreadPool* thread_pool, Args&&... args) {
41
1.33k
    WrappedTask* task = nullptr;
42
1.33k
    if (queue_.pop(task)) {
43
1.33k
      task->pool = this;
44
1.33k
      new (&task->storage) Task(std::forward<Args>(args)...);
45
1.33k
      thread_pool->Enqueue(task);
46
1.33k
      return true;
47
0
    } else {
48
0
      return false;
49
0
    }
50
1.33k
  }
51
52
0
  size_t size() const {
53
0
    return tasks_.size();
54
0
  }
Unexecuted instantiation: transaction_manager.cc:_ZNK2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE4sizeEv
Unexecuted instantiation: transaction_manager.cc:_ZNK2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE4sizeEv
55
 private:
56
  struct WrappedTask;
57
  friend struct WrappedTask;
58
59
2.01k
  void Released(WrappedTask* task) {
60
2.01k
    CHECK(queue_.bounded_push(task));
61
2.01k
  }
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE8ReleasedEPNS5_11WrappedTaskE
Line
Count
Source
59
2.01k
  void Released(WrappedTask* task) {
60
2.01k
    CHECK(queue_.bounded_push(task));
61
2.01k
  }
Unexecuted instantiation: transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE8ReleasedEPNS5_11WrappedTaskE
62
63
  struct WrappedTask : public ThreadPoolTask {
64
    TasksPool<Task>* pool = nullptr;
65
    typename std::aligned_storage<sizeof(Task), alignof(Task)>::type storage;
66
67
6.04k
    Task& task() {
68
0
      return *reinterpret_cast<Task*>(&storage);;
69
0
    }
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE11WrappedTask4taskEv
Line
Count
Source
67
6.04k
    Task& task() {
68
0
      return *reinterpret_cast<Task*>(&storage);;
69
0
    }
Unexecuted instantiation: transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE11WrappedTask4taskEv
70
71
2.01k
    void Run() override {
72
2.01k
      task().Run();
73
2.01k
    }
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE11WrappedTask3RunEv
Line
Count
Source
71
2.01k
    void Run() override {
72
2.01k
      task().Run();
73
2.01k
    }
Unexecuted instantiation: transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE11WrappedTask3RunEv
74
75
2.01k
    void Done(const Status& status) override {
76
2.01k
      task().Done(status);
77
2.01k
      task().~Task();
78
2.01k
      TasksPool<Task>* tasks_pool = pool;
79
2.01k
      pool = nullptr;
80
2.01k
      tasks_pool->Released(this);
81
2.01k
    }
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE11WrappedTask4DoneERKNS_6StatusE
Line
Count
Source
75
2.01k
    void Done(const Status& status) override {
76
2.01k
      task().Done(status);
77
2.01k
      task().~Task();
78
2.01k
      TasksPool<Task>* tasks_pool = pool;
79
2.01k
      pool = nullptr;
80
2.01k
      tasks_pool->Released(this);
81
2.01k
    }
Unexecuted instantiation: transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE11WrappedTask4DoneERKNS_6StatusE
82
83
3.64M
    virtual ~WrappedTask() {
84
3.64M
      CHECK(!pool);
85
3.64M
    }
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE11WrappedTaskD2Ev
Line
Count
Source
83
1.82M
    virtual ~WrappedTask() {
84
1.82M
      CHECK(!pool);
85
1.82M
    }
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE11WrappedTaskD2Ev
Line
Count
Source
83
1.82M
    virtual ~WrappedTask() {
84
1.82M
      CHECK(!pool);
85
1.82M
    }
86
  };
87
88
  std::vector<WrappedTask> tasks_;
89
  boost::lockfree::queue<WrappedTask*> queue_;
90
};
91
92
} // namespace rpc
93
} // namespace yb
94
95
#endif // YB_RPC_TASKS_POOL_H