/Users/deen/code/yugabyte-db/src/yb/rpc/tasks_pool.h
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #ifndef YB_RPC_TASKS_POOL_H |
17 | | #define YB_RPC_TASKS_POOL_H |
18 | | |
19 | | #include <boost/lockfree/queue.hpp> |
20 | | |
21 | | #include "yb/rpc/thread_pool.h" |
22 | | |
23 | | namespace yb { |
24 | | namespace rpc { |
25 | | |
26 | | class ThreadPool; |
27 | | |
28 | | // Tasks pool that could be used in conjunction with ThreadPool, to preallocate a buffer for a fixed |
29 | | // number of tasks and avoid allocating memory for each task separately. |
30 | | template <class Task> |
31 | | class TasksPool { |
32 | | public: |
33 | 9.48k | explicit TasksPool(size_t size) : tasks_(size), queue_(size) { |
34 | 4.72M | for (auto& task : tasks_) { |
35 | 4.72M | CHECK(queue_.bounded_push(&task)); |
36 | 4.72M | } |
37 | 9.48k | } transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEEC2Em Line | Count | Source | 33 | 4.74k | explicit TasksPool(size_t size) : tasks_(size), queue_(size) { | 34 | 2.36M | for (auto& task : tasks_) { | 35 | 2.36M | CHECK(queue_.bounded_push(&task)); | 36 | 2.36M | } | 37 | 4.74k | } |
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEEC2Em Line | Count | Source | 33 | 4.74k | explicit TasksPool(size_t size) : tasks_(size), queue_(size) { | 34 | 2.36M | for (auto& task : tasks_) { | 35 | 2.36M | CHECK(queue_.bounded_push(&task)); | 36 | 2.36M | } | 37 | 4.74k | } |
|
38 | | |
39 | | template <class... Args> |
40 | 2.01k | bool Enqueue(ThreadPool* thread_pool, Args&&... args) { |
41 | 2.01k | WrappedTask* task = nullptr; |
42 | 2.01k | if (queue_.pop(task)) { |
43 | 2.01k | task->pool = this; |
44 | 2.01k | new (&task->storage) Task(std::forward<Args>(args)...); |
45 | 2.01k | thread_pool->Enqueue(task); |
46 | 2.01k | return true; |
47 | 0 | } else { |
48 | 0 | return false; |
49 | 0 | } |
50 | 2.01k | } transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE7EnqueueIJRKPNS2_8YBClientEPNS3_21TransactionTableStateERyEEEbPNS0_10ThreadPoolEDpOT_ Line | Count | Source | 40 | 684 | bool Enqueue(ThreadPool* thread_pool, Args&&... args) { | 41 | 684 | WrappedTask* task = nullptr; | 42 | 684 | if (queue_.pop(task)) { | 43 | 684 | task->pool = this; | 44 | 684 | new (&task->storage) Task(std::forward<Args>(args)...); | 45 | 684 | thread_pool->Enqueue(task); | 46 | 684 | return true; | 47 | 0 | } else { | 48 | 0 | return false; | 49 | 0 | } | 50 | 684 | } |
Unexecuted instantiation: transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE7EnqueueIJPNS3_21TransactionTableStateERNSt3__18functionIFvRKNS_6ResultINS9_12basic_stringIcNS9_11char_traitsIcEENS9_9allocatorIcEEEEEEEEERNS_19TransactionLocalityEEEEbPNS0_10ThreadPoolEDpOT_ transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE7EnqueueIJRKPNS2_8YBClientEPNS3_21TransactionTableStateEiRNSt3__18functionIFvRKNS_6ResultINSD_12basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEEEEEEERNS_19TransactionLocalityEEEEbPNS0_10ThreadPoolEDpOT_ Line | Count | Source | 40 | 1.33k | bool Enqueue(ThreadPool* thread_pool, Args&&... args) { | 41 | 1.33k | WrappedTask* task = nullptr; | 42 | 1.33k | if (queue_.pop(task)) { | 43 | 1.33k | task->pool = this; | 44 | 1.33k | new (&task->storage) Task(std::forward<Args>(args)...); | 45 | 1.33k | thread_pool->Enqueue(task); | 46 | 1.33k | return true; | 47 | 0 | } else { | 48 | 0 | return false; | 49 | 0 | } | 50 | 1.33k | } |
|
51 | | |
52 | 0 | size_t size() const { |
53 | 0 | return tasks_.size(); |
54 | 0 | } Unexecuted instantiation: transaction_manager.cc:_ZNK2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE4sizeEv Unexecuted instantiation: transaction_manager.cc:_ZNK2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE4sizeEv |
55 | | private: |
56 | | struct WrappedTask; |
57 | | friend struct WrappedTask; |
58 | | |
59 | 2.01k | void Released(WrappedTask* task) { |
60 | 2.01k | CHECK(queue_.bounded_push(task)); |
61 | 2.01k | } transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE8ReleasedEPNS5_11WrappedTaskE Line | Count | Source | 59 | 2.01k | void Released(WrappedTask* task) { | 60 | 2.01k | CHECK(queue_.bounded_push(task)); | 61 | 2.01k | } |
Unexecuted instantiation: transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE8ReleasedEPNS5_11WrappedTaskE |
62 | | |
63 | | struct WrappedTask : public ThreadPoolTask { |
64 | | TasksPool<Task>* pool = nullptr; |
65 | | typename std::aligned_storage<sizeof(Task), alignof(Task)>::type storage; |
66 | | |
67 | 6.04k | Task& task() { |
68 | 0 | return *reinterpret_cast<Task*>(&storage);; |
69 | 0 | } transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE11WrappedTask4taskEv Line | Count | Source | 67 | 6.04k | Task& task() { | 68 | 0 | return *reinterpret_cast<Task*>(&storage);; | 69 | 0 | } |
Unexecuted instantiation: transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE11WrappedTask4taskEv |
70 | | |
71 | 2.01k | void Run() override { |
72 | 2.01k | task().Run(); |
73 | 2.01k | } transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE11WrappedTask3RunEv Line | Count | Source | 71 | 2.01k | void Run() override { | 72 | 2.01k | task().Run(); | 73 | 2.01k | } |
Unexecuted instantiation: transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE11WrappedTask3RunEv |
74 | | |
75 | 2.01k | void Done(const Status& status) override { |
76 | 2.01k | task().Done(status); |
77 | 2.01k | task().~Task(); |
78 | 2.01k | TasksPool<Task>* tasks_pool = pool; |
79 | 2.01k | pool = nullptr; |
80 | 2.01k | tasks_pool->Released(this); |
81 | 2.01k | } transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE11WrappedTask4DoneERKNS_6StatusE Line | Count | Source | 75 | 2.01k | void Done(const Status& status) override { | 76 | 2.01k | task().Done(status); | 77 | 2.01k | task().~Task(); | 78 | 2.01k | TasksPool<Task>* tasks_pool = pool; | 79 | 2.01k | pool = nullptr; | 80 | 2.01k | tasks_pool->Released(this); | 81 | 2.01k | } |
Unexecuted instantiation: transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE11WrappedTask4DoneERKNS_6StatusE |
82 | | |
83 | 3.64M | virtual ~WrappedTask() { |
84 | 3.64M | CHECK(!pool); |
85 | 3.64M | } transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_121LoadStatusTabletsTaskEE11WrappedTaskD2Ev Line | Count | Source | 83 | 1.82M | virtual ~WrappedTask() { | 84 | 1.82M | CHECK(!pool); | 85 | 1.82M | } |
transaction_manager.cc:_ZN2yb3rpc9TasksPoolINS_6client12_GLOBAL__N_118InvokeCallbackTaskEE11WrappedTaskD2Ev Line | Count | Source | 83 | 1.82M | virtual ~WrappedTask() { | 84 | 1.82M | CHECK(!pool); | 85 | 1.82M | } |
|
86 | | }; |
87 | | |
88 | | std::vector<WrappedTask> tasks_; |
89 | | boost::lockfree::queue<WrappedTask*> queue_; |
90 | | }; |
91 | | |
92 | | } // namespace rpc |
93 | | } // namespace yb |
94 | | |
95 | | #endif // YB_RPC_TASKS_POOL_H |