/Users/deen/code/yugabyte-db/src/yb/util/threadpool.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | #ifndef YB_UTIL_THREADPOOL_H |
33 | | #define YB_UTIL_THREADPOOL_H |
34 | | |
35 | | #include <condition_variable> |
36 | | #include <deque> |
37 | | #include <functional> |
38 | | #include <memory> |
39 | | #include <mutex> |
40 | | #include <string> |
41 | | #include <unordered_set> |
42 | | |
43 | | #include <gtest/gtest_prod.h> |
44 | | |
45 | | #include "yb/gutil/callback_forward.h" |
46 | | #include "yb/gutil/macros.h" |
47 | | #include "yb/gutil/port.h" |
48 | | #include "yb/gutil/ref_counted.h" |
49 | | |
50 | | #include "yb/util/metrics_fwd.h" |
51 | | #include "yb/util/condition_variable.h" |
52 | | #include "yb/util/enums.h" |
53 | | #include "yb/util/math_util.h" |
54 | | #include "yb/util/monotime.h" |
55 | | #include "yb/util/mutex.h" |
56 | | #include "yb/util/status.h" |
57 | | |
58 | | namespace yb { |
59 | | |
60 | | class Thread; |
61 | | class ThreadPool; |
62 | | class ThreadPoolToken; |
63 | | class Trace; |
64 | | |
65 | | class Runnable { |
66 | | public: |
67 | | virtual void Run() = 0; |
68 | 106M | virtual ~Runnable() = default; |
69 | | }; |
70 | | |
71 | | template <class F> |
72 | | class RunnableImpl : public Runnable { |
73 | | public: |
74 | 45.7k | explicit RunnableImpl(const F& f) : f_(f) {} |
75 | 1.84M | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} Unexecuted instantiation: data-patcher.cc:yb::RunnableImpl<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)::'lambda'()>::RunnableImpl(void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)::'lambda'()&&) Unexecuted instantiation: data-patcher.cc:yb::RunnableImpl<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)::'lambda'()>::RunnableImpl(void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)::'lambda'()&&) yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&>&&) Line | Count | Source | 75 | 209 | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata>&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata>&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&>&&) Line | Count | Source | 75 | 140k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&>&&) Line | Count | Source | 75 | 134 | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>), yb::tserver::TSTabletManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>), yb::tserver::TSTabletManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>&>&&) Line | Count | Source | 75 | 592k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(), yb::tserver::enterprise::CDCPoller*> >::RunnableImpl(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(), yb::tserver::enterprise::CDCPoller*>&&) Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::Status, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>), yb::tserver::enterprise::CDCPoller*, yb::Status const&, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::Status, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>), yb::tserver::enterprise::CDCPoller*, yb::Status const&, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>&>&&) Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::cdc::OutputClientResponse), yb::tserver::enterprise::CDCPoller*, yb::cdc::OutputClientResponse&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::cdc::OutputClientResponse), yb::tserver::enterprise::CDCPoller*, yb::cdc::OutputClientResponse&>&&) yb::RunnableImpl<std::__1::__bind<void (yb::MaintenanceManager::*)(yb::ScopedMaintenanceOpRun const&), yb::MaintenanceManager*, yb::ScopedMaintenanceOpRun> >::RunnableImpl(std::__1::__bind<void (yb::MaintenanceManager::*)(yb::ScopedMaintenanceOpRun const&), yb::MaintenanceManager*, yb::ScopedMaintenanceOpRun>&&) Line | Count | Source | 75 | 18 | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
yb::RunnableImpl<std::__1::__bind<void (yb::master::RetryingTSRpcTask::*)(), std::__1::shared_ptr<yb::master::RetryingTSRpcTask> > >::RunnableImpl(std::__1::__bind<void (yb::master::RetryingTSRpcTask::*)(), std::__1::shared_ptr<yb::master::RetryingTSRpcTask> >&&) Line | Count | Source | 75 | 462k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(), yb::master::CatalogManager*> >::RunnableImpl(std::__1::__bind<void (yb::master::CatalogManager::*)(), yb::master::CatalogManager*>&&) Line | Count | Source | 75 | 4.18k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
yb::RunnableImpl<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata&, std::__1::function<yb::Status (bool)>&> >::RunnableImpl(std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata&, std::__1::function<yb::Status (bool)>&>&&) Line | Count | Source | 75 | 4.86k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::VerifyTablePgLayer(scoped_refptr<yb::master::TableInfo>, bool)::$_9>::RunnableImpl(yb::master::CatalogManager::VerifyTablePgLayer(scoped_refptr<yb::master::TableInfo>, bool)::$_9&&) Line | Count | Source | 75 | 63 | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >::RunnableImpl(std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&>&&) Line | Count | Source | 75 | 125 | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >::RunnableImpl(std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&>&&) Line | Count | Source | 75 | 2 | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*, scoped_refptr<yb::master::NamespaceInfo>&> >::RunnableImpl(std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*, scoped_refptr<yb::master::NamespaceInfo>&>&&) Line | Count | Source | 75 | 98 | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::EnableBgTasks()::$_14>::RunnableImpl(yb::master::CatalogManager::EnableBgTasks()::$_14&&) Line | Count | Source | 75 | 7.94k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::ScheduleTask(std::__1::shared_ptr<yb::master::RetryingTSRpcTask>)::$_18>::RunnableImpl(yb::master::CatalogManager::ScheduleTask(std::__1::shared_ptr<yb::master::RetryingTSRpcTask>)::$_18&&) Line | Count | Source | 75 | 363k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::RebuildYQLSystemPartitions()::$_20::operator()(yb::Status const&) const::'lambda'()>::RunnableImpl(yb::master::CatalogManager::RebuildYQLSystemPartitions()::$_20::operator()(yb::Status const&) const::'lambda'()&&) Line | Count | Source | 75 | 237k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*&, scoped_refptr<yb::master::NamespaceInfo>&> >::RunnableImpl(std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*&, scoped_refptr<yb::master::NamespaceInfo>&>&&) Line | Count | Source | 75 | 2 | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
sys_catalog.cc:yb::RunnableImpl<yb::master::SysCatalogTable::SysCatalogStateChanged(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>)::$_0>::RunnableImpl(yb::master::SysCatalogTable::SysCatalogStateChanged(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>)::$_0&&) Line | Count | Source | 75 | 44 | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_1>::RunnableImpl(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_1&&) Line | Count | Source | 75 | 1 | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
Unexecuted instantiation: ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_2>::RunnableImpl(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_2&&) yb::RunnableImpl<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>&> >::RunnableImpl(std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>&>&&) Line | Count | Source | 75 | 15.6k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_3>::RunnableImpl(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_3&&) Line | Count | Source | 75 | 4.85k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(), yb::load_generator::MultiThreadedAction*> >::RunnableImpl(std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(), yb::load_generator::MultiThreadedAction*>&&) Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(int), yb::load_generator::MultiThreadedAction*, int&> >::RunnableImpl(std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(int), yb::load_generator::MultiThreadedAction*, int&>&&) Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedWriter::*)(), yb::load_generator::MultiThreadedWriter*> >::RunnableImpl(std::__1::__bind<void (yb::load_generator::MultiThreadedWriter::*)(), yb::load_generator::MultiThreadedWriter*>&&) yb::RunnableImpl<std::__1::__bind<void (yb::Callback<void ()>::*)() const, yb::Callback<void ()> const&> >::RunnableImpl(std::__1::__bind<void (yb::Callback<void ()>::*)() const, yb::Callback<void ()> const&>&&) Line | Count | Source | 75 | 11.0k | explicit RunnableImpl(F&& f) : f_(std::move(f)) {} |
|
76 | | |
77 | | private: |
78 | 1.89M | void Run() override { |
79 | 1.89M | f_(); |
80 | 1.89M | } Unexecuted instantiation: data-patcher.cc:yb::RunnableImpl<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)::'lambda'()>::Run() Unexecuted instantiation: data-patcher.cc:yb::RunnableImpl<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)::'lambda'()>::Run() yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >::Run() Line | Count | Source | 78 | 209 | void Run() override { | 79 | 209 | f_(); | 80 | 209 | } |
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata>&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >::Run() Line | Count | Source | 78 | 140k | void Run() override { | 79 | 140k | f_(); | 80 | 140k | } |
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&> >::Run() Line | Count | Source | 78 | 134 | void Run() override { | 79 | 134 | f_(); | 80 | 134 | } |
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>), yb::tserver::TSTabletManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>&> >::Run() Line | Count | Source | 78 | 592k | void Run() override { | 79 | 592k | f_(); | 80 | 592k | } |
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(), yb::tserver::enterprise::CDCPoller*> >::Run() Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::Status, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>), yb::tserver::enterprise::CDCPoller*, yb::Status const&, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>&> >::Run() Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::cdc::OutputClientResponse), yb::tserver::enterprise::CDCPoller*, yb::cdc::OutputClientResponse&> >::Run() yb::RunnableImpl<std::__1::__bind<void (yb::MaintenanceManager::*)(yb::ScopedMaintenanceOpRun const&), yb::MaintenanceManager*, yb::ScopedMaintenanceOpRun> >::Run() Line | Count | Source | 78 | 18 | void Run() override { | 79 | 18 | f_(); | 80 | 18 | } |
yb::RunnableImpl<std::__1::__bind<void (yb::master::RetryingTSRpcTask::*)(), std::__1::shared_ptr<yb::master::RetryingTSRpcTask> > >::Run() Line | Count | Source | 78 | 463k | void Run() override { | 79 | 463k | f_(); | 80 | 463k | } |
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(), yb::master::CatalogManager*> >::Run() Line | Count | Source | 78 | 4.18k | void Run() override { | 79 | 4.18k | f_(); | 80 | 4.18k | } |
yb::RunnableImpl<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata&, std::__1::function<yb::Status (bool)>&> >::Run() Line | Count | Source | 78 | 4.86k | void Run() override { | 79 | 4.86k | f_(); | 80 | 4.86k | } |
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::VerifyTablePgLayer(scoped_refptr<yb::master::TableInfo>, bool)::$_9>::Run() Line | Count | Source | 78 | 63 | void Run() override { | 79 | 63 | f_(); | 80 | 63 | } |
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >::Run() Line | Count | Source | 78 | 125 | void Run() override { | 79 | 125 | f_(); | 80 | 125 | } |
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >::Run() Line | Count | Source | 78 | 1 | void Run() override { | 79 | 1 | f_(); | 80 | 1 | } |
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*, scoped_refptr<yb::master::NamespaceInfo>&> >::Run() Line | Count | Source | 78 | 98 | void Run() override { | 79 | 98 | f_(); | 80 | 98 | } |
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::EnableBgTasks()::$_14>::Run() Line | Count | Source | 78 | 7.94k | void Run() override { | 79 | 7.94k | f_(); | 80 | 7.94k | } |
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::ScheduleTask(std::__1::shared_ptr<yb::master::RetryingTSRpcTask>)::$_18>::Run() Line | Count | Source | 78 | 363k | void Run() override { | 79 | 363k | f_(); | 80 | 363k | } |
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::RebuildYQLSystemPartitions()::$_20::operator()(yb::Status const&) const::'lambda'()>::Run() Line | Count | Source | 78 | 237k | void Run() override { | 79 | 237k | f_(); | 80 | 237k | } |
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*&, scoped_refptr<yb::master::NamespaceInfo>&> >::Run() Line | Count | Source | 78 | 2 | void Run() override { | 79 | 2 | f_(); | 80 | 2 | } |
sys_catalog.cc:yb::RunnableImpl<yb::master::SysCatalogTable::SysCatalogStateChanged(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>)::$_0>::Run() Line | Count | Source | 78 | 44 | void Run() override { | 79 | 44 | f_(); | 80 | 44 | } |
ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_1>::Run() Line | Count | Source | 78 | 1 | void Run() override { | 79 | 1 | f_(); | 80 | 1 | } |
Unexecuted instantiation: ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_2>::Run() yb::RunnableImpl<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>&> >::Run() Line | Count | Source | 78 | 15.6k | void Run() override { | 79 | 15.6k | f_(); | 80 | 15.6k | } |
ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_3>::Run() Line | Count | Source | 78 | 4.85k | void Run() override { | 79 | 4.85k | f_(); | 80 | 4.85k | } |
yb::RunnableImpl<std::__1::__bind<void (yb::client::YBClient::Data::*)(std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, bool, bool), yb::client::YBClient::Data*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >&, bool&, bool&> >::Run() Line | Count | Source | 78 | 48.0k | void Run() override { | 79 | 48.0k | f_(); | 80 | 48.0k | } |
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(), yb::load_generator::MultiThreadedAction*> >::Run() Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(int), yb::load_generator::MultiThreadedAction*, int&> >::Run() Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedWriter::*)(), yb::load_generator::MultiThreadedWriter*> >::Run() yb::RunnableImpl<std::__1::__bind<void (yb::Callback<void ()>::*)() const, yb::Callback<void ()> const&> >::Run() Line | Count | Source | 78 | 11.0k | void Run() override { | 79 | 11.0k | f_(); | 80 | 11.0k | } |
|
81 | | |
82 | | F f_; |
83 | | }; |
84 | | |
85 | | // Interesting thread pool metrics. Can be applied to the entire pool (see |
86 | | // ThreadPoolBuilder) or to individual tokens. |
87 | | struct ThreadPoolMetrics { |
88 | | // Measures the queue length seen by tasks when they enter the queue. |
89 | | scoped_refptr<Histogram> queue_length_histogram; |
90 | | |
91 | | // Measures the amount of time that tasks spend waiting in a queue. |
92 | | scoped_refptr<Histogram> queue_time_us_histogram; |
93 | | |
94 | | // Measures the amount of time that tasks spend running. |
95 | | scoped_refptr<Histogram> run_time_us_histogram; |
96 | | |
97 | | ~ThreadPoolMetrics(); |
98 | | }; |
99 | | |
100 | | |
101 | | // THREAD_POOL_METRICS_DEFINE / THREAD_POOL_METRICS_INSTANCE are helpers which define the metrics |
102 | | // required for a ThreadPoolMetrics object and instantiate said objects, respectively. Example |
103 | | // usage: |
104 | | // // At the top of the file: |
105 | | // THREAD_POOL_METRICS_DEFINE(server, thread_pool_foo, "Thread pool for Foo jobs.") |
106 | | // ... |
107 | | // // Inline: |
108 | | // ThreadPoolBuilder("foo") |
109 | | // .set_metrics(THREAD_POOL_METRICS_INSTANCE(server_->metric_entity(), thread_pool_foo)) |
110 | | // ... |
111 | | // .Build(...); |
112 | | #define THREAD_POOL_METRICS_DEFINE(entity, name, label) \ |
113 | | METRIC_DEFINE_coarse_histogram(entity, BOOST_PP_CAT(name, _queue_length), \ |
114 | | label " Queue Length", yb::MetricUnit::kMicroseconds, \ |
115 | | label " - queue length histogram."); \ |
116 | | METRIC_DEFINE_coarse_histogram(entity, BOOST_PP_CAT(name, _queue_time_us), \ |
117 | | label " Queue Time", yb::MetricUnit::kMicroseconds, \ |
118 | | label " - queue time histogram, microseconds."); \ |
119 | | METRIC_DEFINE_coarse_histogram(entity, BOOST_PP_CAT(name, _run_time_us), \ |
120 | | label " Run Time", yb::MetricUnit::kMicroseconds, \ |
121 | | label " - run time histogram, microseconds.") |
122 | | |
123 | | #define THREAD_POOL_METRICS_INSTANCE(entity, name) { \ |
124 | | BOOST_PP_CAT(METRIC_, BOOST_PP_CAT(name, _run_time_us)).Instantiate(entity), \ |
125 | | BOOST_PP_CAT(METRIC_, BOOST_PP_CAT(name, _queue_time_us)).Instantiate(entity), \ |
126 | | BOOST_PP_CAT(METRIC_, BOOST_PP_CAT(name, _run_time_us)).Instantiate(entity) \ |
127 | | } |
128 | | |
129 | | // ThreadPool takes a lot of arguments. We provide sane defaults with a builder. |
130 | | // |
131 | | // name: Used for debugging output and default names of the worker threads. |
132 | | // Since thread names are limited to 16 characters on Linux, it's good to |
133 | | // choose a short name here. |
134 | | // Required. |
135 | | // |
136 | | // min_threads: Minimum number of threads we'll have at any time. |
137 | | // Default: 0. |
138 | | // |
139 | | // max_threads: Maximum number of threads we'll have at any time. |
140 | | // Default: Number of CPUs detected on the system. |
141 | | // |
142 | | // max_queue_size: Maximum number of items to enqueue before returning a |
143 | | // Status::ServiceUnavailable message from Submit(). |
144 | | // Default: INT_MAX. |
145 | | // |
146 | | // timeout: How long we'll keep around an idle thread before timing it out. |
147 | | // We always keep at least min_threads. |
148 | | // Default: 500 milliseconds. |
149 | | // |
150 | | // metrics: Histograms, counters, etc. to update on various threadpool events. |
151 | | // Default: not set. |
152 | | // |
153 | | class ThreadPoolBuilder { |
154 | | public: |
155 | | explicit ThreadPoolBuilder(std::string name); |
156 | | |
157 | | // Note: We violate the style guide by returning mutable references here |
158 | | // in order to provide traditional Builder pattern conveniences. |
159 | | ThreadPoolBuilder& set_min_threads(int min_threads); |
160 | | ThreadPoolBuilder& set_max_threads(int max_threads); |
161 | | ThreadPoolBuilder& unlimited_threads(); |
162 | | ThreadPoolBuilder& set_max_queue_size(int max_queue_size); |
163 | | ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout); |
164 | | ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics); |
165 | | |
166 | 0 | const std::string& name() const { return name_; } |
167 | 0 | int min_threads() const { return min_threads_; } |
168 | 0 | int max_threads() const { return max_threads_; } |
169 | 0 | int max_queue_size() const { return max_queue_size_; } |
170 | 0 | const MonoDelta& idle_timeout() const { return idle_timeout_; } |
171 | | |
172 | | // Instantiate a new ThreadPool with the existing builder arguments. |
173 | | CHECKED_STATUS Build(std::unique_ptr<ThreadPool>* pool) const; |
174 | | |
175 | | private: |
176 | | friend class ThreadPool; |
177 | | const std::string name_; |
178 | | int min_threads_; |
179 | | int max_threads_; |
180 | | int max_queue_size_; |
181 | | MonoDelta idle_timeout_; |
182 | | ThreadPoolMetrics metrics_; |
183 | | |
184 | | DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder); |
185 | | }; |
186 | | |
187 | | // Thread pool with a variable number of threads. |
188 | | // The pool can execute a class that implements the Runnable interface, or a |
189 | | // std::function, which can be obtained via std::bind(). |
190 | | // Tasks submitted directly to the thread pool enter a FIFO queue and are |
191 | | // dispatched to a worker thread when one becomes free. Tasks may also be |
192 | | // submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions |
193 | | // can then be used to block on logical groups of tasks. |
194 | | // |
195 | | // A token operates in one of two ExecutionModes, determined at token |
196 | | // construction time: |
197 | | // 1. SERIAL: submitted tasks are run one at a time. |
198 | | // 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike |
199 | | // tasks submitted without a token, but the logical grouping that tokens |
200 | | // impart can be useful when a pool is shared by many contexts (e.g. to |
201 | | // safely shut down one context, to derive context-specific metrics, etc.). |
202 | | // |
203 | | // Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are |
204 | | // processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are |
205 | | // processed in a round-robin fashion, one task at a time. This prevents them |
206 | | // from starving one another. However, tokenless (and CONCURRENT token-based) |
207 | | // tasks can starve SERIAL token-based tasks. |
208 | | // |
209 | | // Usage Example: |
210 | | // static void Func(int n) { ... } |
211 | | // class Task : public Runnable { ... } |
212 | | // |
213 | | // std::unique_ptr<ThreadPool> thread_pool; |
214 | | // CHECK_OK( |
215 | | // ThreadPoolBuilder("my_pool") |
216 | | // .set_min_threads(0) |
217 | | // .set_max_threads(5) |
218 | | // .set_max_queue_size(10) |
219 | | // .set_timeout(MonoDelta::FromMilliseconds(2000)) |
220 | | // .Build(&thread_pool)); |
221 | | // thread_pool->Submit(shared_ptr<Runnable>(new Task())); |
222 | | // thread_pool->Submit(std::bind(&Func, 10)); |
223 | | class ThreadPool { |
224 | | public: |
225 | | ~ThreadPool(); |
226 | | |
227 | | // Wait for the running tasks to complete and then shutdown the threads. |
228 | | // All the other pending tasks in the queue will be removed. |
229 | | // NOTE: That the user may implement an external abort logic for the |
230 | | // runnables, that must be called before Shutdown(), if the system |
231 | | // should know about the non-execution of these tasks, or the runnable |
232 | | // require an explicit "abort" notification to exit from the run loop. |
233 | | void Shutdown(); |
234 | | |
235 | | // Submit a function using the yb Closure system. |
236 | | CHECKED_STATUS SubmitClosure(const Closure& task); |
237 | | |
238 | | // Submit a function binded using std::bind(&FuncName, args...) |
239 | | CHECKED_STATUS SubmitFunc(const std::function<void()>& func); |
240 | | CHECKED_STATUS SubmitFunc(std::function<void()>&& func); |
241 | | |
242 | | CHECKED_STATUS SubmitFunc(std::function<void()>& func) { // NOLINT |
243 | | const auto& const_func = func; |
244 | | return SubmitFunc(const_func); |
245 | | } |
246 | | |
247 | | template <class F> |
248 | 1.84M | CHECKED_STATUS SubmitFunc(F&& f) { |
249 | 1.84M | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); |
250 | 1.84M | } Unexecuted instantiation: data-patcher.cc:yb::Status yb::ThreadPool::SubmitFunc<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)::'lambda'()>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&) Unexecuted instantiation: data-patcher.cc:yb::Status yb::ThreadPool::SubmitFunc<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)::'lambda'()>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&) yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&>&&) Line | Count | Source | 248 | 209 | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 209 | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 209 | } |
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata>&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata>&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&>&&) Line | Count | Source | 248 | 140k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 140k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 140k | } |
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&> >(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&>&&) Line | Count | Source | 248 | 134 | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 134 | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 134 | } |
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>), yb::tserver::TSTabletManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>&> >(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>), yb::tserver::TSTabletManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>&>&&) Line | Count | Source | 248 | 592k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 592k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 592k | } |
Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(), yb::tserver::enterprise::CDCPoller*> >(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(), yb::tserver::enterprise::CDCPoller*>&&) Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::Status, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>), yb::tserver::enterprise::CDCPoller*, yb::Status const&, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>&> >(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::Status, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>), yb::tserver::enterprise::CDCPoller*, yb::Status const&, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>&>&&) Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::cdc::OutputClientResponse), yb::tserver::enterprise::CDCPoller*, yb::cdc::OutputClientResponse&> >(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::cdc::OutputClientResponse), yb::tserver::enterprise::CDCPoller*, yb::cdc::OutputClientResponse&>&&) yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::MaintenanceManager::*)(yb::ScopedMaintenanceOpRun const&), yb::MaintenanceManager*, yb::ScopedMaintenanceOpRun> >(std::__1::__bind<void (yb::MaintenanceManager::*)(yb::ScopedMaintenanceOpRun const&), yb::MaintenanceManager*, yb::ScopedMaintenanceOpRun>&&) Line | Count | Source | 248 | 18 | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 18 | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 18 | } |
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::RetryingTSRpcTask::*)(), std::__1::shared_ptr<yb::master::RetryingTSRpcTask> > >(std::__1::__bind<void (yb::master::RetryingTSRpcTask::*)(), std::__1::shared_ptr<yb::master::RetryingTSRpcTask> >&&) Line | Count | Source | 248 | 462k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 462k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 462k | } |
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::CatalogManager::*)(), yb::master::CatalogManager*> >(std::__1::__bind<void (yb::master::CatalogManager::*)(), yb::master::CatalogManager*>&&) Line | Count | Source | 248 | 4.18k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 4.18k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 4.18k | } |
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata&, std::__1::function<yb::Status (bool)>&> >(std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata&, std::__1::function<yb::Status (bool)>&>&&) Line | Count | Source | 248 | 4.86k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 4.86k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 4.86k | } |
catalog_manager.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::CatalogManager::VerifyTablePgLayer(scoped_refptr<yb::master::TableInfo>, bool)::$_9>(yb::master::CatalogManager::VerifyTablePgLayer(scoped_refptr<yb::master::TableInfo>, bool)::$_9&&) Line | Count | Source | 248 | 63 | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 63 | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 63 | } |
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >(std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&>&&) Line | Count | Source | 248 | 125 | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 125 | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 125 | } |
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >(std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&>&&) Line | Count | Source | 248 | 2 | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 2 | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 2 | } |
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*, scoped_refptr<yb::master::NamespaceInfo>&> >(std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*, scoped_refptr<yb::master::NamespaceInfo>&>&&) Line | Count | Source | 248 | 98 | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 98 | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 98 | } |
catalog_manager.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::CatalogManager::EnableBgTasks()::$_14>(yb::master::CatalogManager::EnableBgTasks()::$_14&&) Line | Count | Source | 248 | 7.94k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 7.94k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 7.94k | } |
catalog_manager.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::CatalogManager::ScheduleTask(std::__1::shared_ptr<yb::master::RetryingTSRpcTask>)::$_18>(yb::master::CatalogManager::ScheduleTask(std::__1::shared_ptr<yb::master::RetryingTSRpcTask>)::$_18&&) Line | Count | Source | 248 | 363k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 363k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 363k | } |
catalog_manager.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::CatalogManager::RebuildYQLSystemPartitions()::$_20::operator()(yb::Status const&) const::'lambda'()>(yb::master::CatalogManager::RebuildYQLSystemPartitions()::$_20::operator()(yb::Status const&) const::'lambda'()&&) Line | Count | Source | 248 | 237k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 237k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 237k | } |
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*&, scoped_refptr<yb::master::NamespaceInfo>&> >(std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*&, scoped_refptr<yb::master::NamespaceInfo>&>&&) Line | Count | Source | 248 | 2 | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 2 | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 2 | } |
sys_catalog.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::SysCatalogTable::SysCatalogStateChanged(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>)::$_0>(yb::master::SysCatalogTable::SysCatalogStateChanged(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>)::$_0&&) Line | Count | Source | 248 | 44 | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 44 | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 44 | } |
ysql_transaction_ddl.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_1>(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_1&&) Line | Count | Source | 248 | 1 | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 1 | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 1 | } |
Unexecuted instantiation: ysql_transaction_ddl.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_2>(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_2&&) yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>&> >(std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>&>&&) Line | Count | Source | 248 | 15.6k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 15.6k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 15.6k | } |
ysql_transaction_ddl.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_3>(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_3&&) Line | Count | Source | 248 | 4.85k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 4.85k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 4.85k | } |
Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(), yb::load_generator::MultiThreadedAction*> >(std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(), yb::load_generator::MultiThreadedAction*>&&) Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(int), yb::load_generator::MultiThreadedAction*, int&> >(std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(int), yb::load_generator::MultiThreadedAction*, int&>&&) Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::load_generator::MultiThreadedWriter::*)(), yb::load_generator::MultiThreadedWriter*> >(std::__1::__bind<void (yb::load_generator::MultiThreadedWriter::*)(), yb::load_generator::MultiThreadedWriter*>&&) yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::Callback<void ()>::*)() const, yb::Callback<void ()> const&> >(std::__1::__bind<void (yb::Callback<void ()>::*)() const, yb::Callback<void ()> const&>&&) Line | Count | Source | 248 | 11.0k | CHECKED_STATUS SubmitFunc(F&& f) { | 249 | 11.0k | return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f))); | 250 | 11.0k | } |
|
251 | | |
252 | | template <class F> |
253 | 48.0k | CHECKED_STATUS SubmitFunc(const F& f) { |
254 | 48.0k | return Submit(std::make_shared<RunnableImpl<F>>(f)); |
255 | 48.0k | } |
256 | | |
257 | | template <class F> |
258 | 46.9k | CHECKED_STATUS SubmitFunc(F& f) { // NOLINT |
259 | 46.9k | const auto& const_f = f; |
260 | 46.9k | return SubmitFunc(const_f); |
261 | 46.9k | } |
262 | | |
263 | | // Submit a Runnable class |
264 | | CHECKED_STATUS Submit(const std::shared_ptr<Runnable>& task); |
265 | | |
266 | | // Wait until all the tasks are completed. |
267 | | void Wait(); |
268 | | |
269 | | // Waits for the pool to reach the idle state, or until 'until' time is reached. |
270 | | // Returns true if the pool reached the idle state, false otherwise. |
271 | | bool WaitUntil(const MonoTime& until); |
272 | | |
273 | | // Waits for the pool to reach the idle state, or until 'delta' time elapses. |
274 | | // Returns true if the pool reached the idle state, false otherwise. |
275 | | bool WaitFor(const MonoDelta& delta); |
276 | | |
277 | | // Allocates a new token for use in token-based task submission. All tokens |
278 | | // must be destroyed before their ThreadPool is destroyed. |
279 | | // |
280 | | // There is no limit on the number of tokens that may be allocated. |
281 | | enum class ExecutionMode { |
282 | | // Tasks submitted via this token will be executed serially. |
283 | | SERIAL, |
284 | | |
285 | | // Tasks submitted via this token may be executed concurrently. |
286 | | CONCURRENT, |
287 | | }; |
288 | | std::unique_ptr<ThreadPoolToken> NewToken(ExecutionMode mode); |
289 | | |
290 | | // Like NewToken(), but lets the caller provide metrics for the token. These |
291 | | // metrics are incremented/decremented in addition to the configured |
292 | | // pool-wide metrics (if any). |
293 | | std::unique_ptr<ThreadPoolToken> NewTokenWithMetrics(ExecutionMode mode, |
294 | | ThreadPoolMetrics metrics); |
295 | | |
296 | | private: |
297 | | friend class ThreadPoolBuilder; |
298 | | friend class ThreadPoolToken; |
299 | | |
300 | | |
301 | | // Create a new thread pool using a builder. |
302 | | explicit ThreadPool(const ThreadPoolBuilder& builder); |
303 | | |
304 | | // Initialize the thread pool by starting the minimum number of threads. |
305 | | CHECKED_STATUS Init(); |
306 | | |
307 | | // Dispatcher responsible for dequeueing and executing the tasks |
308 | | void DispatchThread(bool permanent); |
309 | | |
310 | | // Create new thread. Required that lock_ is held. |
311 | | CHECKED_STATUS CreateThreadUnlocked(); |
312 | | |
313 | | private: |
314 | | FRIEND_TEST(TestThreadPool, TestThreadPoolWithNoMinimum); |
315 | | FRIEND_TEST(TestThreadPool, TestThreadPoolWithNoMaxThreads); |
316 | | FRIEND_TEST(TestThreadPool, TestVariableSizeThreadPool); |
317 | | // Aborts if the current thread is a member of this thread pool. |
318 | | void CheckNotPoolThreadUnlocked(); |
319 | | |
320 | | struct Task { |
321 | | std::shared_ptr<Runnable> runnable; |
322 | | Trace* trace; |
323 | | |
324 | | // Time at which the entry was submitted to the pool. |
325 | | MonoTime submit_time; |
326 | | }; |
327 | | // Submits a task to be run via token. |
328 | | Status DoSubmit(std::shared_ptr<Runnable> r, ThreadPoolToken* token); |
329 | | |
330 | | // Releases token 't' and invalidates it. |
331 | | void ReleaseToken(ThreadPoolToken* t); |
332 | | |
333 | | const std::string name_; |
334 | | const int min_threads_; |
335 | | const int max_threads_; |
336 | | const int max_queue_size_; |
337 | | const MonoDelta idle_timeout_; |
338 | | |
339 | | Status pool_status_; |
340 | | Mutex lock_; |
341 | | ConditionVariable idle_cond_; |
342 | | ConditionVariable no_threads_cond_; |
343 | | ConditionVariable not_empty_; |
344 | | int num_threads_; |
345 | | int active_threads_; |
346 | | |
347 | | // Total number of client tasks queued, either directly (queue_) or |
348 | | // indirectly (tokens_). |
349 | | // Protected by lock_. |
350 | | int total_queued_tasks_; |
351 | | |
352 | | // All allocated tokens. |
353 | | // Tokens are owned by the clients. |
354 | | // |
355 | | // Protected by lock_. |
356 | | std::unordered_set<ThreadPoolToken*> tokens_; |
357 | | |
358 | | // FIFO of tokens from which tasks should be executed. Does not own the |
359 | | // tokens; they are owned by clients and are removed from the FIFO on shutdown. |
360 | | // |
361 | | // Protected by lock_. |
362 | | std::deque<ThreadPoolToken*> queue_; |
363 | | |
364 | | // Pointers to all running threads. Raw pointers are safe because a Thread |
365 | | // may only go out of scope after being removed from threads_. |
366 | | // |
367 | | // Protected by lock_. |
368 | | std::unordered_set<Thread*> threads_; |
369 | | |
370 | | // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. |
371 | | std::unique_ptr<ThreadPoolToken> tokenless_; |
372 | | |
373 | | // Metrics for the entire thread pool. |
374 | | const ThreadPoolMetrics metrics_; |
375 | | |
376 | | DISALLOW_COPY_AND_ASSIGN(ThreadPool); |
377 | | }; |
378 | | |
379 | | // All possible token states. Legal state transitions: |
380 | | // kIdle -> kRunning: task is submitted via token |
381 | | // kIdle -> kQuiesced: token or pool is shut down |
382 | | // kRunning -> kIdle: worker thread finishes executing a task and |
383 | | // there are no more tasks queued to the token |
384 | | // kRunning -> kQuiescing: token or pool is shut down while worker thread |
385 | | // is executing a task |
386 | | // kRunning -> kQuiesced: token or pool is shut down |
387 | | // kQuiescing -> kQuiesced: worker thread finishes executing a task |
388 | | // belonging to a shut down token or pool |
389 | | YB_DEFINE_ENUM(ThreadPoolTokenState, |
390 | | // Token has no queued tasks. |
391 | | (kIdle) |
392 | | |
393 | | // A worker thread is running one of the token's previously queued tasks. |
394 | | (kRunning) |
395 | | |
396 | | // No new tasks may be submitted to the token. A worker thread is still |
397 | | // running a previously queued task. |
398 | | (kQuiescing) |
399 | | |
400 | | // No new tasks may be submitted to the token. There are no active tasks |
401 | | // either. At this state, the token may only be destroyed. |
402 | | (kQuiesced)); |
403 | | |
404 | | // Entry point for token-based task submission and blocking for a particular |
405 | | // thread pool. Tokens can only be created via ThreadPool::NewToken(). |
406 | | // |
407 | | // All functions are thread-safe. Mutable members are protected via the |
408 | | // ThreadPool's lock. |
409 | | class ThreadPoolToken { |
410 | | public: |
411 | | // Destroys the token. |
412 | | // |
413 | | // May be called on a token with outstanding tasks, as Shutdown() will be |
414 | | // called first to take care of them. |
415 | | ~ThreadPoolToken(); |
416 | | |
417 | | // Submits a function using the yb Closure system. |
418 | | Status SubmitClosure(Closure c) WARN_UNUSED_RESULT; |
419 | | |
420 | | // Submits a function bound using boost::bind(&FuncName, args...). |
421 | | Status SubmitFunc(const std::function<void()> f) WARN_UNUSED_RESULT; |
422 | | |
423 | | // Submits a Runnable class. |
424 | | Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT; |
425 | | |
426 | | // Marks the token as unusable for future submissions. Any queued tasks not |
427 | | // yet running are destroyed. If tasks are in flight, Shutdown() will wait |
428 | | // on their completion before returning. |
429 | | void Shutdown(); |
430 | | |
431 | | // Waits until all the tasks submitted via this token are completed. |
432 | | void Wait(); |
433 | | |
434 | | // Waits for all submissions using this token are complete, or until 'until' |
435 | | // time is reached. |
436 | | // |
437 | | // Returns true if all submissions are complete, false otherwise. |
438 | | bool WaitUntil(const MonoTime& until); |
439 | | |
440 | | // Waits for all submissions using this token are complete, or until 'delta' |
441 | | // time elapses. |
442 | | // |
443 | | // Returns true if all submissions are complete, false otherwise. |
444 | | bool WaitFor(const MonoDelta& delta); |
445 | | |
446 | | private: |
447 | | friend class ThreadPool; |
448 | | |
449 | | // Returns a textual representation of 's' suitable for debugging. |
450 | | static const char* StateToString(ThreadPoolTokenState s); |
451 | | |
452 | | // Constructs a new token. |
453 | | // |
454 | | // The token may not outlive its thread pool ('pool'). |
455 | | ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, ThreadPoolMetrics metrics); |
456 | | |
457 | | // Changes this token's state to 'new_state' taking actions as needed. |
458 | | void Transition(ThreadPoolTokenState new_state); |
459 | | |
460 | | // Returns true if this token has a task queued and ready to run, or if a |
461 | | // task belonging to this token is already running. |
462 | 95.3M | bool IsActive() const { |
463 | 95.3M | return state_ == ThreadPoolTokenState::kRunning || |
464 | 95.3M | state_ == ThreadPoolTokenState::kQuiescing81.2M ; |
465 | 95.3M | } |
466 | | |
467 | | // Returns true if new tasks may be submitted to this token. |
468 | 94.9M | bool MaySubmitNewTasks() const { |
469 | 94.9M | return state_ != ThreadPoolTokenState::kQuiescing && |
470 | 94.9M | state_ != ThreadPoolTokenState::kQuiesced94.9M ; |
471 | 94.9M | } |
472 | | |
473 | 285M | ThreadPoolTokenState state() const { return state_; } |
474 | 36.7M | ThreadPool::ExecutionMode mode() const { return mode_; } |
475 | | |
476 | | // Token's configured execution mode. |
477 | | const ThreadPool::ExecutionMode mode_; |
478 | | |
479 | | // Pointer to the token's thread pool. |
480 | | ThreadPool* pool_; |
481 | | |
482 | | // Metrics for just this token. |
483 | | const ThreadPoolMetrics metrics_; |
484 | | |
485 | | // Token state machine. |
486 | | ThreadPoolTokenState state_; |
487 | | |
488 | | // Queued client tasks. |
489 | | std::deque<ThreadPool::Task> entries_; |
490 | | |
491 | | // Condition variable for "token is idle". Waiters wake up when the token |
492 | | // transitions to kIdle or kQuiesced. |
493 | | ConditionVariable not_running_cond_; |
494 | | |
495 | | // Number of worker threads currently executing tasks belonging to this |
496 | | // token. |
497 | | int active_threads_; |
498 | | |
499 | | DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken); |
500 | | }; |
501 | | |
502 | | //////////////////////////////////////////////////////// |
503 | | // FunctionRunnable |
504 | | //////////////////////////////////////////////////////// |
505 | | |
506 | | class FunctionRunnable : public Runnable { |
507 | | public: |
508 | 92.9M | explicit FunctionRunnable(std::function<void()> func) : func_(std::move(func)) {} |
509 | | |
510 | 93.0M | void Run() override { |
511 | 93.0M | func_(); |
512 | 93.0M | } |
513 | | |
514 | | private: |
515 | | std::function<void()> func_; |
516 | | }; |
517 | | |
518 | | // Runs submitted tasks in created thread pool with specified concurrency. |
519 | | class TaskRunner { |
520 | | public: |
521 | 0 | TaskRunner() = default; |
522 | | |
523 | | CHECKED_STATUS Init(int concurrency); |
524 | | |
525 | | template <class F> |
526 | 0 | void Submit(F&& f) { |
527 | 0 | ++running_tasks_; |
528 | 0 | auto status = thread_pool_->SubmitFunc([this, f = std::forward<F>(f)]() { |
529 | 0 | auto status = f(); |
530 | 0 | CompleteTask(status); |
531 | 0 | }); Unexecuted instantiation: data-patcher.cc:void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)::'lambda'()::operator()() const Unexecuted instantiation: data-patcher.cc:void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)::'lambda'()::operator()() const |
532 | 0 | if (!status.ok()) { |
533 | 0 | CompleteTask(status); |
534 | 0 | } |
535 | 0 | } Unexecuted instantiation: data-patcher.cc:void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&) Unexecuted instantiation: data-patcher.cc:void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&) |
536 | | |
537 | | CHECKED_STATUS Wait(); |
538 | | |
539 | | private: |
540 | | void CompleteTask(const Status& status); |
541 | | |
542 | | std::unique_ptr<ThreadPool> thread_pool_; |
543 | | std::atomic<size_t> running_tasks_{0}; |
544 | | std::atomic<bool> failed_{false}; |
545 | | Status first_failure_; |
546 | | std::mutex mutex_; |
547 | | std::condition_variable cond_; |
548 | | }; |
549 | | |
550 | | } // namespace yb |
551 | | #endif // YB_UTIL_THREADPOOL_H |