YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/tasks_pool.h
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#ifndef YB_RPC_TASKS_POOL_H
17
#define YB_RPC_TASKS_POOL_H
18
19
#include <boost/lockfree/queue.hpp>
20
21
#include "yb/rpc/thread_pool.h"
22
23
namespace yb {
24
namespace rpc {
25
26
class ThreadPool;
27
28
// Tasks pool that could be used in conjunction with ThreadPool, to preallocate a buffer for a fixed
29
// number of tasks and avoid allocating memory for each task separately.
30
template <class Task>
31
class TasksPool {
32
 public:
33
19.6k
  explicit TasksPool(size_t size) : tasks_(size), queue_(size) {
34
9.77M
    for (auto& task : tasks_) {
35
9.77M
      CHECK(queue_.bounded_push(&task));
36
9.77M
    }
37
19.6k
  }
transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::TasksPool(unsigned long)
Line
Count
Source
33
9.81k
  explicit TasksPool(size_t size) : tasks_(size), queue_(size) {
34
4.88M
    for (auto& task : tasks_) {
35
4.88M
      CHECK(queue_.bounded_push(&task));
36
4.88M
    }
37
9.81k
  }
transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::TasksPool(unsigned long)
Line
Count
Source
33
9.81k
  explicit TasksPool(size_t size) : tasks_(size), queue_(size) {
34
4.89M
    for (auto& task : tasks_) {
35
4.89M
      CHECK(queue_.bounded_push(&task));
36
4.89M
    }
37
9.81k
  }
38
39
  template <class... Args>
40
3.36k
  bool Enqueue(ThreadPool* thread_pool, Args&&... args) {
41
3.36k
    WrappedTask* task = nullptr;
42
3.36k
    if (
queue_.pop(task)3.36k
) {
43
3.36k
      task->pool = this;
44
3.36k
      new (&task->storage) Task(std::forward<Args>(args)...);
45
3.36k
      thread_pool->Enqueue(task);
46
3.36k
      return true;
47
18.4E
    } else {
48
18.4E
      return false;
49
18.4E
    }
50
3.36k
  }
transaction_manager.cc:bool yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::Enqueue<yb::client::YBClient* const&, yb::client::(anonymous namespace)::TransactionTableState*, unsigned long long&>(yb::rpc::ThreadPool*, yb::client::YBClient* const&, yb::client::(anonymous namespace)::TransactionTableState*&&, unsigned long long&)
Line
Count
Source
40
1.23k
  bool Enqueue(ThreadPool* thread_pool, Args&&... args) {
41
1.23k
    WrappedTask* task = nullptr;
42
1.23k
    if (queue_.pop(task)) {
43
1.23k
      task->pool = this;
44
1.23k
      new (&task->storage) Task(std::forward<Args>(args)...);
45
1.23k
      thread_pool->Enqueue(task);
46
1.23k
      return true;
47
1.23k
    } else {
48
0
      return false;
49
0
    }
50
1.23k
  }
Unexecuted instantiation: transaction_manager.cc:bool yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::Enqueue<yb::client::(anonymous namespace)::TransactionTableState*, std::__1::function<void (yb::Result<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > const&)>&, yb::TransactionLocality&>(yb::rpc::ThreadPool*, yb::client::(anonymous namespace)::TransactionTableState*&&, std::__1::function<void (yb::Result<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > const&)>&, yb::TransactionLocality&)
transaction_manager.cc:bool yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::Enqueue<yb::client::YBClient* const&, yb::client::(anonymous namespace)::TransactionTableState*, int, std::__1::function<void (yb::Result<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > const&)>&, yb::TransactionLocality&>(yb::rpc::ThreadPool*, yb::client::YBClient* const&, yb::client::(anonymous namespace)::TransactionTableState*&&, int&&, std::__1::function<void (yb::Result<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > const&)>&, yb::TransactionLocality&)
Line
Count
Source
40
2.13k
  bool Enqueue(ThreadPool* thread_pool, Args&&... args) {
41
2.13k
    WrappedTask* task = nullptr;
42
2.13k
    if (
queue_.pop(task)2.13k
) {
43
2.13k
      task->pool = this;
44
2.13k
      new (&task->storage) Task(std::forward<Args>(args)...);
45
2.13k
      thread_pool->Enqueue(task);
46
2.13k
      return true;
47
18.4E
    } else {
48
18.4E
      return false;
49
18.4E
    }
50
2.13k
  }
51
52
0
  size_t size() const {
53
0
    return tasks_.size();
54
0
  }
Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::size() const
Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::size() const
55
 private:
56
  struct WrappedTask;
57
  friend struct WrappedTask;
58
59
3.31k
  void Released(WrappedTask* task) {
60
3.31k
    CHECK(queue_.bounded_push(task));
61
3.31k
  }
transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::Released(yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::WrappedTask*)
Line
Count
Source
59
3.31k
  void Released(WrappedTask* task) {
60
3.31k
    CHECK(queue_.bounded_push(task));
61
3.31k
  }
Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::Released(yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::WrappedTask*)
62
63
  struct WrappedTask : public ThreadPoolTask {
64
    TasksPool<Task>* pool = nullptr;
65
    typename std::aligned_storage<sizeof(Task), alignof(Task)>::type storage;
66
67
9.97k
    Task& task() {
68
9.97k
      return *reinterpret_cast<Task*>(&storage);;
69
0
    }
transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::WrappedTask::task()
Line
Count
Source
67
9.97k
    Task& task() {
68
9.97k
      return *reinterpret_cast<Task*>(&storage);;
69
0
    }
Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::WrappedTask::task()
70
71
3.36k
    void Run() override {
72
3.36k
      task().Run();
73
3.36k
    }
transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::WrappedTask::Run()
Line
Count
Source
71
3.36k
    void Run() override {
72
3.36k
      task().Run();
73
3.36k
    }
Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::WrappedTask::Run()
74
75
3.31k
    void Done(const Status& status) override {
76
3.31k
      task().Done(status);
77
3.31k
      task().~Task();
78
3.31k
      TasksPool<Task>* tasks_pool = pool;
79
3.31k
      pool = nullptr;
80
3.31k
      tasks_pool->Released(this);
81
3.31k
    }
transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::WrappedTask::Done(yb::Status const&)
Line
Count
Source
75
3.31k
    void Done(const Status& status) override {
76
3.31k
      task().Done(status);
77
3.31k
      task().~Task();
78
3.31k
      TasksPool<Task>* tasks_pool = pool;
79
3.31k
      pool = nullptr;
80
3.31k
      tasks_pool->Released(this);
81
3.31k
    }
Unexecuted instantiation: transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::WrappedTask::Done(yb::Status const&)
82
83
6.66M
    virtual ~WrappedTask() {
84
6.66M
      CHECK(!pool);
85
6.66M
    }
transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::LoadStatusTabletsTask>::WrappedTask::~WrappedTask()
Line
Count
Source
83
3.33M
    virtual ~WrappedTask() {
84
3.33M
      CHECK(!pool);
85
3.33M
    }
transaction_manager.cc:yb::rpc::TasksPool<yb::client::(anonymous namespace)::InvokeCallbackTask>::WrappedTask::~WrappedTask()
Line
Count
Source
83
3.33M
    virtual ~WrappedTask() {
84
3.33M
      CHECK(!pool);
85
3.33M
    }
86
  };
87
88
  std::vector<WrappedTask> tasks_;
89
  boost::lockfree::queue<WrappedTask*> queue_;
90
};
91
92
} // namespace rpc
93
} // namespace yb
94
95
#endif // YB_RPC_TASKS_POOL_H