/Users/deen/code/yugabyte-db/src/yb/rpc/tasks_pool.h
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #ifndef YB_RPC_TASKS_POOL_H |
17 | | #define YB_RPC_TASKS_POOL_H |
18 | | |
19 | | #include <boost/lockfree/queue.hpp> |
20 | | |
21 | | #include "yb/rpc/thread_pool.h" |
22 | | |
23 | | namespace yb { |
24 | | namespace rpc { |
25 | | |
26 | | class ThreadPool; |
27 | | |
28 | | // Tasks pool that could be used in conjunction with ThreadPool, to preallocate a buffer for a fixed |
29 | | // number of tasks and avoid allocating memory for each task separately. |
30 | | template <class Task> |
31 | | class TasksPool { |
32 | | public: |
33 | 19.6k | explicit TasksPool(size_t size) : tasks_(size), queue_(size) { |
34 | 9.77M | for (auto& task : tasks_) { |
35 | 9.77M | CHECK(queue_.bounded_push(&task)); |
36 | 9.77M | } |
37 | 19.6k | } transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::TasksPool(unsigned long) Line | Count | Source | 33 | 9.81k | explicit TasksPool(size_t size) : tasks_(size), queue_(size) { | 34 | 4.88M | for (auto& task : tasks_) { | 35 | 4.88M | CHECK(queue_.bounded_push(&task)); | 36 | 4.88M | } | 37 | 9.81k | } |
transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::TasksPool(unsigned long) Line | Count | Source | 33 | 9.81k | explicit TasksPool(size_t size) : tasks_(size), queue_(size) { | 34 | 4.89M | for (auto& task : tasks_) { | 35 | 4.89M | CHECK(queue_.bounded_push(&task)); | 36 | 4.89M | } | 37 | 9.81k | } |
|
38 | | |
39 | | template <class... Args> |
40 | 3.36k | bool Enqueue(ThreadPool* thread_pool, Args&&... args) { |
41 | 3.36k | WrappedTask* task = nullptr; |
42 | 3.36k | if (queue_.pop(task)3.36k ) { |
43 | 3.36k | task->pool = this; |
44 | 3.36k | new (&task->storage) Task(std::forward<Args>(args)...); |
45 | 3.36k | thread_pool->Enqueue(task); |
46 | 3.36k | return true; |
47 | 18.4E | } else { |
48 | 18.4E | return false; |
49 | 18.4E | } |
50 | 3.36k | } transaction_manager.cc:bool yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::Enqueue<yb::client::YBClient* const&, yb::client::(anonymous namespace)::TransactionTableState*, unsigned long long&>(yb::rpc::ThreadPool*, yb::client::YBClient* const&, yb::client::(anonymous namespace)::TransactionTableState*&&, unsigned long long&) Line | Count | Source | 40 | 1.23k | bool Enqueue(ThreadPool* thread_pool, Args&&... args) { | 41 | 1.23k | WrappedTask* task = nullptr; | 42 | 1.23k | if (queue_.pop(task)) { | 43 | 1.23k | task->pool = this; | 44 | 1.23k | new (&task->storage) Task(std::forward<Args>(args)...); | 45 | 1.23k | thread_pool->Enqueue(task); | 46 | 1.23k | return true; | 47 | 1.23k | } else { | 48 | 0 | return false; | 49 | 0 | } | 50 | 1.23k | } |
Unexecuted instantiation: transaction_manager.cc:bool yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::Enqueue<yb::client::(anonymous namespace)::TransactionTableState*, std::__1::function<void (yb::Result<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > const&)>&, yb::TransactionLocality&>(yb::rpc::ThreadPool*, yb::client::(anonymous namespace)::TransactionTableState*&&, std::__1::function<void (yb::Result<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > const&)>&, yb::TransactionLocality&) transaction_manager.cc:bool yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::Enqueue<yb::client::YBClient* const&, yb::client::(anonymous namespace)::TransactionTableState*, int, std::__1::function<void (yb::Result<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > const&)>&, yb::TransactionLocality&>(yb::rpc::ThreadPool*, yb::client::YBClient* const&, yb::client::(anonymous namespace)::TransactionTableState*&&, int&&, std::__1::function<void (yb::Result<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > const&)>&, yb::TransactionLocality&) Line | Count | Source | 40 | 2.13k | bool Enqueue(ThreadPool* thread_pool, Args&&... args) { | 41 | 2.13k | WrappedTask* task = nullptr; | 42 | 2.13k | if (queue_.pop(task)2.13k ) { | 43 | 2.13k | task->pool = this; | 44 | 2.13k | new (&task->storage) Task(std::forward<Args>(args)...); | 45 | 2.13k | thread_pool->Enqueue(task); | 46 | 2.13k | return true; | 47 | 18.4E | } else { | 48 | 18.4E | return false; | 49 | 18.4E | } | 50 | 2.13k | } |
|
51 | | |
52 | 0 | size_t size() const { |
53 | 0 | return tasks_.size(); |
54 | 0 | } Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::size() const Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::size() const |
55 | | private: |
56 | | struct WrappedTask; |
57 | | friend struct WrappedTask; |
58 | | |
59 | 3.31k | void Released(WrappedTask* task) { |
60 | 3.31k | CHECK(queue_.bounded_push(task)); |
61 | 3.31k | } transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::Released(yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::WrappedTask*) Line | Count | Source | 59 | 3.31k | void Released(WrappedTask* task) { | 60 | 3.31k | CHECK(queue_.bounded_push(task)); | 61 | 3.31k | } |
Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::Released(yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::WrappedTask*) |
62 | | |
63 | | struct WrappedTask : public ThreadPoolTask { |
64 | | TasksPool<Task>* pool = nullptr; |
65 | | typename std::aligned_storage<sizeof(Task), alignof(Task)>::type storage; |
66 | | |
67 | 9.97k | Task& task() { |
68 | 9.97k | return *reinterpret_cast<Task*>(&storage);; |
69 | 0 | } transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::WrappedTask::task() Line | Count | Source | 67 | 9.97k | Task& task() { | 68 | 9.97k | return *reinterpret_cast<Task*>(&storage);; | 69 | 0 | } |
Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::WrappedTask::task() |
70 | | |
71 | 3.36k | void Run() override { |
72 | 3.36k | task().Run(); |
73 | 3.36k | } transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::WrappedTask::Run() Line | Count | Source | 71 | 3.36k | void Run() override { | 72 | 3.36k | task().Run(); | 73 | 3.36k | } |
Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::WrappedTask::Run() |
74 | | |
75 | 3.31k | void Done(const Status& status) override { |
76 | 3.31k | task().Done(status); |
77 | 3.31k | task().~Task(); |
78 | 3.31k | TasksPool<Task>* tasks_pool = pool; |
79 | 3.31k | pool = nullptr; |
80 | 3.31k | tasks_pool->Released(this); |
81 | 3.31k | } transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::WrappedTask::Done(yb::Status const&) Line | Count | Source | 75 | 3.31k | void Done(const Status& status) override { | 76 | 3.31k | task().Done(status); | 77 | 3.31k | task().~Task(); | 78 | 3.31k | TasksPool<Task>* tasks_pool = pool; | 79 | 3.31k | pool = nullptr; | 80 | 3.31k | tasks_pool->Released(this); | 81 | 3.31k | } |
Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::WrappedTask::Done(yb::Status const&) |
82 | | |
83 | 6.66M | virtual ~WrappedTask() { |
84 | 6.66M | CHECK(!pool); |
85 | 6.66M | } transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::WrappedTask::~WrappedTask() Line | Count | Source | 83 | 3.33M | virtual ~WrappedTask() { | 84 | 3.33M | CHECK(!pool); | 85 | 3.33M | } |
transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::WrappedTask::~WrappedTask() Line | Count | Source | 83 | 3.33M | virtual ~WrappedTask() { | 84 | 3.33M | CHECK(!pool); | 85 | 3.33M | } |
|
86 | | }; |
87 | | |
88 | | std::vector<WrappedTask> tasks_; |
89 | | boost::lockfree::queue<WrappedTask*> queue_; |
90 | | }; |
91 | | |
92 | | } // namespace rpc |
93 | | } // namespace yb |
94 | | |
95 | | #endif // YB_RPC_TASKS_POOL_H |