YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/threadpool.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_UTIL_THREADPOOL_H
33
#define YB_UTIL_THREADPOOL_H
34
35
#include <condition_variable>
36
#include <deque>
37
#include <functional>
38
#include <memory>
39
#include <mutex>
40
#include <string>
41
#include <unordered_set>
42
43
#include <gtest/gtest_prod.h>
44
45
#include "yb/gutil/callback_forward.h"
46
#include "yb/gutil/macros.h"
47
#include "yb/gutil/port.h"
48
#include "yb/gutil/ref_counted.h"
49
50
#include "yb/util/metrics_fwd.h"
51
#include "yb/util/condition_variable.h"
52
#include "yb/util/enums.h"
53
#include "yb/util/math_util.h"
54
#include "yb/util/monotime.h"
55
#include "yb/util/mutex.h"
56
#include "yb/util/status.h"
57
58
namespace yb {
59
60
class Thread;
61
class ThreadPool;
62
class ThreadPoolToken;
63
class Trace;
64
65
class Runnable {
66
 public:
67
  virtual void Run() = 0;
68
106M
  virtual ~Runnable() = default;
69
};
70
71
template <class F>
72
class RunnableImpl : public Runnable {
73
 public:
74
45.7k
  explicit RunnableImpl(const F& f) : f_(f) {}
75
1.84M
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
Unexecuted instantiation: data-patcher.cc:yb::RunnableImpl<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)::'lambda'()>::RunnableImpl(void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)::'lambda'()&&)
Unexecuted instantiation: data-patcher.cc:yb::RunnableImpl<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)::'lambda'()>::RunnableImpl(void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)::'lambda'()&&)
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&>&&)
Line
Count
Source
75
209
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata>&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata>&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&>&&)
Line
Count
Source
75
140k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&>&&)
Line
Count
Source
75
134
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>), yb::tserver::TSTabletManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>), yb::tserver::TSTabletManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>&>&&)
Line
Count
Source
75
592k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(), yb::tserver::enterprise::CDCPoller*> >::RunnableImpl(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(), yb::tserver::enterprise::CDCPoller*>&&)
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::Status, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>), yb::tserver::enterprise::CDCPoller*, yb::Status const&, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::Status, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>), yb::tserver::enterprise::CDCPoller*, yb::Status const&, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>&>&&)
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::cdc::OutputClientResponse), yb::tserver::enterprise::CDCPoller*, yb::cdc::OutputClientResponse&> >::RunnableImpl(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::cdc::OutputClientResponse), yb::tserver::enterprise::CDCPoller*, yb::cdc::OutputClientResponse&>&&)
yb::RunnableImpl<std::__1::__bind<void (yb::MaintenanceManager::*)(yb::ScopedMaintenanceOpRun const&), yb::MaintenanceManager*, yb::ScopedMaintenanceOpRun> >::RunnableImpl(std::__1::__bind<void (yb::MaintenanceManager::*)(yb::ScopedMaintenanceOpRun const&), yb::MaintenanceManager*, yb::ScopedMaintenanceOpRun>&&)
Line
Count
Source
75
18
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
yb::RunnableImpl<std::__1::__bind<void (yb::master::RetryingTSRpcTask::*)(), std::__1::shared_ptr<yb::master::RetryingTSRpcTask> > >::RunnableImpl(std::__1::__bind<void (yb::master::RetryingTSRpcTask::*)(), std::__1::shared_ptr<yb::master::RetryingTSRpcTask> >&&)
Line
Count
Source
75
462k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(), yb::master::CatalogManager*> >::RunnableImpl(std::__1::__bind<void (yb::master::CatalogManager::*)(), yb::master::CatalogManager*>&&)
Line
Count
Source
75
4.18k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
yb::RunnableImpl<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata&, std::__1::function<yb::Status (bool)>&> >::RunnableImpl(std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata&, std::__1::function<yb::Status (bool)>&>&&)
Line
Count
Source
75
4.86k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::VerifyTablePgLayer(scoped_refptr<yb::master::TableInfo>, bool)::$_9>::RunnableImpl(yb::master::CatalogManager::VerifyTablePgLayer(scoped_refptr<yb::master::TableInfo>, bool)::$_9&&)
Line
Count
Source
75
63
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >::RunnableImpl(std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&>&&)
Line
Count
Source
75
125
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >::RunnableImpl(std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&>&&)
Line
Count
Source
75
2
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*, scoped_refptr<yb::master::NamespaceInfo>&> >::RunnableImpl(std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*, scoped_refptr<yb::master::NamespaceInfo>&>&&)
Line
Count
Source
75
98
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::EnableBgTasks()::$_14>::RunnableImpl(yb::master::CatalogManager::EnableBgTasks()::$_14&&)
Line
Count
Source
75
7.94k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::ScheduleTask(std::__1::shared_ptr<yb::master::RetryingTSRpcTask>)::$_18>::RunnableImpl(yb::master::CatalogManager::ScheduleTask(std::__1::shared_ptr<yb::master::RetryingTSRpcTask>)::$_18&&)
Line
Count
Source
75
363k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::RebuildYQLSystemPartitions()::$_20::operator()(yb::Status const&) const::'lambda'()>::RunnableImpl(yb::master::CatalogManager::RebuildYQLSystemPartitions()::$_20::operator()(yb::Status const&) const::'lambda'()&&)
Line
Count
Source
75
237k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*&, scoped_refptr<yb::master::NamespaceInfo>&> >::RunnableImpl(std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*&, scoped_refptr<yb::master::NamespaceInfo>&>&&)
Line
Count
Source
75
2
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
sys_catalog.cc:yb::RunnableImpl<yb::master::SysCatalogTable::SysCatalogStateChanged(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>)::$_0>::RunnableImpl(yb::master::SysCatalogTable::SysCatalogStateChanged(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>)::$_0&&)
Line
Count
Source
75
44
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_1>::RunnableImpl(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_1&&)
Line
Count
Source
75
1
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
Unexecuted instantiation: ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_2>::RunnableImpl(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_2&&)
yb::RunnableImpl<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>&> >::RunnableImpl(std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>&>&&)
Line
Count
Source
75
15.6k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_3>::RunnableImpl(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_3&&)
Line
Count
Source
75
4.85k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(), yb::load_generator::MultiThreadedAction*> >::RunnableImpl(std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(), yb::load_generator::MultiThreadedAction*>&&)
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(int), yb::load_generator::MultiThreadedAction*, int&> >::RunnableImpl(std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(int), yb::load_generator::MultiThreadedAction*, int&>&&)
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedWriter::*)(), yb::load_generator::MultiThreadedWriter*> >::RunnableImpl(std::__1::__bind<void (yb::load_generator::MultiThreadedWriter::*)(), yb::load_generator::MultiThreadedWriter*>&&)
yb::RunnableImpl<std::__1::__bind<void (yb::Callback<void ()>::*)() const, yb::Callback<void ()> const&> >::RunnableImpl(std::__1::__bind<void (yb::Callback<void ()>::*)() const, yb::Callback<void ()> const&>&&)
Line
Count
Source
75
11.0k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
76
77
 private:
78
1.89M
  void Run() override {
79
1.89M
    f_();
80
1.89M
  }
Unexecuted instantiation: data-patcher.cc:yb::RunnableImpl<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)::'lambda'()>::Run()
Unexecuted instantiation: data-patcher.cc:yb::RunnableImpl<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)::'lambda'()>::Run()
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >::Run()
Line
Count
Source
78
209
  void Run() override {
79
209
    f_();
80
209
  }
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata>&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >::Run()
Line
Count
Source
78
140k
  void Run() override {
79
140k
    f_();
80
140k
  }
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&> >::Run()
Line
Count
Source
78
134
  void Run() override {
79
134
    f_();
80
134
  }
yb::RunnableImpl<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>), yb::tserver::TSTabletManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>&> >::Run()
Line
Count
Source
78
592k
  void Run() override {
79
592k
    f_();
80
592k
  }
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(), yb::tserver::enterprise::CDCPoller*> >::Run()
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::Status, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>), yb::tserver::enterprise::CDCPoller*, yb::Status const&, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>&> >::Run()
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::cdc::OutputClientResponse), yb::tserver::enterprise::CDCPoller*, yb::cdc::OutputClientResponse&> >::Run()
yb::RunnableImpl<std::__1::__bind<void (yb::MaintenanceManager::*)(yb::ScopedMaintenanceOpRun const&), yb::MaintenanceManager*, yb::ScopedMaintenanceOpRun> >::Run()
Line
Count
Source
78
18
  void Run() override {
79
18
    f_();
80
18
  }
yb::RunnableImpl<std::__1::__bind<void (yb::master::RetryingTSRpcTask::*)(), std::__1::shared_ptr<yb::master::RetryingTSRpcTask> > >::Run()
Line
Count
Source
78
463k
  void Run() override {
79
463k
    f_();
80
463k
  }
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(), yb::master::CatalogManager*> >::Run()
Line
Count
Source
78
4.18k
  void Run() override {
79
4.18k
    f_();
80
4.18k
  }
yb::RunnableImpl<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata&, std::__1::function<yb::Status (bool)>&> >::Run()
Line
Count
Source
78
4.86k
  void Run() override {
79
4.86k
    f_();
80
4.86k
  }
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::VerifyTablePgLayer(scoped_refptr<yb::master::TableInfo>, bool)::$_9>::Run()
Line
Count
Source
78
63
  void Run() override {
79
63
    f_();
80
63
  }
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >::Run()
Line
Count
Source
78
125
  void Run() override {
79
125
    f_();
80
125
  }
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >::Run()
Line
Count
Source
78
1
  void Run() override {
79
1
    f_();
80
1
  }
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*, scoped_refptr<yb::master::NamespaceInfo>&> >::Run()
Line
Count
Source
78
98
  void Run() override {
79
98
    f_();
80
98
  }
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::EnableBgTasks()::$_14>::Run()
Line
Count
Source
78
7.94k
  void Run() override {
79
7.94k
    f_();
80
7.94k
  }
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::ScheduleTask(std::__1::shared_ptr<yb::master::RetryingTSRpcTask>)::$_18>::Run()
Line
Count
Source
78
363k
  void Run() override {
79
363k
    f_();
80
363k
  }
catalog_manager.cc:yb::RunnableImpl<yb::master::CatalogManager::RebuildYQLSystemPartitions()::$_20::operator()(yb::Status const&) const::'lambda'()>::Run()
Line
Count
Source
78
237k
  void Run() override {
79
237k
    f_();
80
237k
  }
yb::RunnableImpl<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*&, scoped_refptr<yb::master::NamespaceInfo>&> >::Run()
Line
Count
Source
78
2
  void Run() override {
79
2
    f_();
80
2
  }
sys_catalog.cc:yb::RunnableImpl<yb::master::SysCatalogTable::SysCatalogStateChanged(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>)::$_0>::Run()
Line
Count
Source
78
44
  void Run() override {
79
44
    f_();
80
44
  }
ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_1>::Run()
Line
Count
Source
78
1
  void Run() override {
79
1
    f_();
80
1
  }
Unexecuted instantiation: ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_2>::Run()
yb::RunnableImpl<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>&> >::Run()
Line
Count
Source
78
15.6k
  void Run() override {
79
15.6k
    f_();
80
15.6k
  }
ysql_transaction_ddl.cc:yb::RunnableImpl<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_3>::Run()
Line
Count
Source
78
4.85k
  void Run() override {
79
4.85k
    f_();
80
4.85k
  }
yb::RunnableImpl<std::__1::__bind<void (yb::client::YBClient::Data::*)(std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, bool, bool), yb::client::YBClient::Data*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >&, bool&, bool&> >::Run()
Line
Count
Source
78
48.0k
  void Run() override {
79
48.0k
    f_();
80
48.0k
  }
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(), yb::load_generator::MultiThreadedAction*> >::Run()
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(int), yb::load_generator::MultiThreadedAction*, int&> >::Run()
Unexecuted instantiation: yb::RunnableImpl<std::__1::__bind<void (yb::load_generator::MultiThreadedWriter::*)(), yb::load_generator::MultiThreadedWriter*> >::Run()
yb::RunnableImpl<std::__1::__bind<void (yb::Callback<void ()>::*)() const, yb::Callback<void ()> const&> >::Run()
Line
Count
Source
78
11.0k
  void Run() override {
79
11.0k
    f_();
80
11.0k
  }
81
82
  F f_;
83
};
84
85
// Interesting thread pool metrics. Can be applied to the entire pool (see
86
// ThreadPoolBuilder) or to individual tokens.
87
struct ThreadPoolMetrics {
88
  // Measures the queue length seen by tasks when they enter the queue.
89
  scoped_refptr<Histogram> queue_length_histogram;
90
91
  // Measures the amount of time that tasks spend waiting in a queue.
92
  scoped_refptr<Histogram> queue_time_us_histogram;
93
94
  // Measures the amount of time that tasks spend running.
95
  scoped_refptr<Histogram> run_time_us_histogram;
96
97
  ~ThreadPoolMetrics();
98
};
99
100
101
// THREAD_POOL_METRICS_DEFINE / THREAD_POOL_METRICS_INSTANCE are helpers which define the metrics
102
// required for a ThreadPoolMetrics object and instantiate said objects, respectively. Example
103
// usage:
104
// // At the top of the file:
105
// THREAD_POOL_METRICS_DEFINE(server, thread_pool_foo, "Thread pool for Foo jobs.")
106
// ...
107
// // Inline:
108
// ThreadPoolBuilder("foo")
109
//   .set_metrics(THREAD_POOL_METRICS_INSTANCE(server_->metric_entity(), thread_pool_foo))
110
//   ...
111
//   .Build(...);
112
#define THREAD_POOL_METRICS_DEFINE(entity, name, label) \
113
    METRIC_DEFINE_coarse_histogram(entity, BOOST_PP_CAT(name, _queue_length), \
114
        label " Queue Length", yb::MetricUnit::kMicroseconds, \
115
        label " - queue length histogram."); \
116
    METRIC_DEFINE_coarse_histogram(entity, BOOST_PP_CAT(name, _queue_time_us), \
117
        label " Queue Time", yb::MetricUnit::kMicroseconds, \
118
        label " - queue time histogram, microseconds."); \
119
    METRIC_DEFINE_coarse_histogram(entity, BOOST_PP_CAT(name, _run_time_us), \
120
        label " Run Time", yb::MetricUnit::kMicroseconds, \
121
        label " - run time histogram, microseconds.")
122
123
#define THREAD_POOL_METRICS_INSTANCE(entity, name) { \
124
      BOOST_PP_CAT(METRIC_, BOOST_PP_CAT(name, _run_time_us)).Instantiate(entity), \
125
      BOOST_PP_CAT(METRIC_, BOOST_PP_CAT(name, _queue_time_us)).Instantiate(entity), \
126
      BOOST_PP_CAT(METRIC_, BOOST_PP_CAT(name, _run_time_us)).Instantiate(entity) \
127
    }
128
129
// ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
130
//
131
// name: Used for debugging output and default names of the worker threads.
132
//    Since thread names are limited to 16 characters on Linux, it's good to
133
//    choose a short name here.
134
//    Required.
135
//
136
// min_threads: Minimum number of threads we'll have at any time.
137
//    Default: 0.
138
//
139
// max_threads: Maximum number of threads we'll have at any time.
140
//    Default: Number of CPUs detected on the system.
141
//
142
// max_queue_size: Maximum number of items to enqueue before returning a
143
//    Status::ServiceUnavailable message from Submit().
144
//    Default: INT_MAX.
145
//
146
// timeout: How long we'll keep around an idle thread before timing it out.
147
//    We always keep at least min_threads.
148
//    Default: 500 milliseconds.
149
//
150
// metrics: Histograms, counters, etc. to update on various threadpool events.
151
//    Default: not set.
152
//
153
class ThreadPoolBuilder {
154
 public:
155
  explicit ThreadPoolBuilder(std::string name);
156
157
  // Note: We violate the style guide by returning mutable references here
158
  // in order to provide traditional Builder pattern conveniences.
159
  ThreadPoolBuilder& set_min_threads(int min_threads);
160
  ThreadPoolBuilder& set_max_threads(int max_threads);
161
  ThreadPoolBuilder& unlimited_threads();
162
  ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
163
  ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
164
  ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);
165
166
0
  const std::string& name() const { return name_; }
167
0
  int min_threads() const { return min_threads_; }
168
0
  int max_threads() const { return max_threads_; }
169
0
  int max_queue_size() const { return max_queue_size_; }
170
0
  const MonoDelta& idle_timeout() const { return idle_timeout_; }
171
172
  // Instantiate a new ThreadPool with the existing builder arguments.
173
  CHECKED_STATUS Build(std::unique_ptr<ThreadPool>* pool) const;
174
175
 private:
176
  friend class ThreadPool;
177
  const std::string name_;
178
  int min_threads_;
179
  int max_threads_;
180
  int max_queue_size_;
181
  MonoDelta idle_timeout_;
182
  ThreadPoolMetrics metrics_;
183
184
  DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
185
};
186
187
// Thread pool with a variable number of threads.
188
// The pool can execute a class that implements the Runnable interface, or a
189
// std::function, which can be obtained via std::bind().
190
// Tasks submitted directly to the thread pool enter a FIFO queue and are
191
// dispatched to a worker thread when one becomes free. Tasks may also be
192
// submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions
193
// can then be used to block on logical groups of tasks.
194
//
195
// A token operates in one of two ExecutionModes, determined at token
196
// construction time:
197
// 1. SERIAL: submitted tasks are run one at a time.
198
// 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike
199
//    tasks submitted without a token, but the logical grouping that tokens
200
//    impart can be useful when a pool is shared by many contexts (e.g. to
201
//    safely shut down one context, to derive context-specific metrics, etc.).
202
//
203
// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are
204
// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are
205
// processed in a round-robin fashion, one task at a time. This prevents them
206
// from starving one another. However, tokenless (and CONCURRENT token-based)
207
// tasks can starve SERIAL token-based tasks.
208
//
209
// Usage Example:
210
//    static void Func(int n) { ... }
211
//    class Task : public Runnable { ... }
212
//
213
//    std::unique_ptr<ThreadPool> thread_pool;
214
//    CHECK_OK(
215
//        ThreadPoolBuilder("my_pool")
216
//            .set_min_threads(0)
217
//            .set_max_threads(5)
218
//            .set_max_queue_size(10)
219
//            .set_timeout(MonoDelta::FromMilliseconds(2000))
220
//            .Build(&thread_pool));
221
//    thread_pool->Submit(shared_ptr<Runnable>(new Task()));
222
//    thread_pool->Submit(std::bind(&Func, 10));
223
class ThreadPool {
224
 public:
225
  ~ThreadPool();
226
227
  // Wait for the running tasks to complete and then shutdown the threads.
228
  // All the other pending tasks in the queue will be removed.
229
  // NOTE: That the user may implement an external abort logic for the
230
  //       runnables, that must be called before Shutdown(), if the system
231
  //       should know about the non-execution of these tasks, or the runnable
232
  //       require an explicit "abort" notification to exit from the run loop.
233
  void Shutdown();
234
235
  // Submit a function using the yb Closure system.
236
  CHECKED_STATUS SubmitClosure(const Closure& task);
237
238
  // Submit a function binded using std::bind(&FuncName, args...)
239
  CHECKED_STATUS SubmitFunc(const std::function<void()>& func);
240
  CHECKED_STATUS SubmitFunc(std::function<void()>&& func);
241
242
  CHECKED_STATUS SubmitFunc(std::function<void()>& func) { // NOLINT
243
    const auto& const_func = func;
244
    return SubmitFunc(const_func);
245
  }
246
247
  template <class F>
248
1.84M
  CHECKED_STATUS SubmitFunc(F&& f) {
249
1.84M
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
1.84M
  }
Unexecuted instantiation: data-patcher.cc:yb::Status yb::ThreadPool::SubmitFunc<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)::'lambda'()>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)
Unexecuted instantiation: data-patcher.cc:yb::Status yb::ThreadPool::SubmitFunc<void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)::'lambda'()>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&>&&)
Line
Count
Source
248
209
  CHECKED_STATUS SubmitFunc(F&& f) {
249
209
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
209
  }
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata>&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&> >(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata>&, scoped_refptr<yb::tserver::TransitionInProgressDeleter>&>&&)
Line
Count
Source
248
140k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
140k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
140k
  }
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&> >(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&), yb::tserver::TSTabletManager*, scoped_refptr<yb::tablet::RaftGroupMetadata> const&, scoped_refptr<yb::tserver::TransitionInProgressDeleter> const&>&&)
Line
Count
Source
248
134
  CHECKED_STATUS SubmitFunc(F&& f) {
249
134
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
134
  }
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::TSTabletManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>), yb::tserver::TSTabletManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>&> >(std::__1::__bind<void (yb::tserver::TSTabletManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>), yb::tserver::TSTabletManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>&>&&)
Line
Count
Source
248
592k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
592k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
592k
  }
Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(), yb::tserver::enterprise::CDCPoller*> >(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(), yb::tserver::enterprise::CDCPoller*>&&)
Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::Status, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>), yb::tserver::enterprise::CDCPoller*, yb::Status const&, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>&> >(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::Status, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>), yb::tserver::enterprise::CDCPoller*, yb::Status const&, std::__1::shared_ptr<yb::cdc::GetChangesResponsePB>&>&&)
Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::cdc::OutputClientResponse), yb::tserver::enterprise::CDCPoller*, yb::cdc::OutputClientResponse&> >(std::__1::__bind<void (yb::tserver::enterprise::CDCPoller::*)(yb::cdc::OutputClientResponse), yb::tserver::enterprise::CDCPoller*, yb::cdc::OutputClientResponse&>&&)
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::MaintenanceManager::*)(yb::ScopedMaintenanceOpRun const&), yb::MaintenanceManager*, yb::ScopedMaintenanceOpRun> >(std::__1::__bind<void (yb::MaintenanceManager::*)(yb::ScopedMaintenanceOpRun const&), yb::MaintenanceManager*, yb::ScopedMaintenanceOpRun>&&)
Line
Count
Source
248
18
  CHECKED_STATUS SubmitFunc(F&& f) {
249
18
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
18
  }
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::RetryingTSRpcTask::*)(), std::__1::shared_ptr<yb::master::RetryingTSRpcTask> > >(std::__1::__bind<void (yb::master::RetryingTSRpcTask::*)(), std::__1::shared_ptr<yb::master::RetryingTSRpcTask> >&&)
Line
Count
Source
248
462k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
462k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
462k
  }
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::CatalogManager::*)(), yb::master::CatalogManager*> >(std::__1::__bind<void (yb::master::CatalogManager::*)(), yb::master::CatalogManager*>&&)
Line
Count
Source
248
4.18k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
4.18k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
4.18k
  }
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata&, std::__1::function<yb::Status (bool)>&> >(std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata&, std::__1::function<yb::Status (bool)>&>&&)
Line
Count
Source
248
4.86k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
4.86k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
4.86k
  }
catalog_manager.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::CatalogManager::VerifyTablePgLayer(scoped_refptr<yb::master::TableInfo>, bool)::$_9>(yb::master::CatalogManager::VerifyTablePgLayer(scoped_refptr<yb::master::TableInfo>, bool)::$_9&&)
Line
Count
Source
248
63
  CHECKED_STATUS SubmitFunc(F&& f) {
249
63
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
63
  }
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >(std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&>&&)
Line
Count
Source
248
125
  CHECKED_STATUS SubmitFunc(F&& f) {
249
125
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
125
  }
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&> >(std::__1::__bind<void (yb::master::CatalogManager::*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >, yb::TransactionMetadata), yb::master::CatalogManager*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, std::__1::vector<scoped_refptr<yb::master::TableInfo>, std::__1::allocator<scoped_refptr<yb::master::TableInfo> > >&, yb::TransactionMetadata&>&&)
Line
Count
Source
248
2
  CHECKED_STATUS SubmitFunc(F&& f) {
249
2
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
2
  }
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*, scoped_refptr<yb::master::NamespaceInfo>&> >(std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*, scoped_refptr<yb::master::NamespaceInfo>&>&&)
Line
Count
Source
248
98
  CHECKED_STATUS SubmitFunc(F&& f) {
249
98
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
98
  }
catalog_manager.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::CatalogManager::EnableBgTasks()::$_14>(yb::master::CatalogManager::EnableBgTasks()::$_14&&)
Line
Count
Source
248
7.94k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
7.94k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
7.94k
  }
catalog_manager.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::CatalogManager::ScheduleTask(std::__1::shared_ptr<yb::master::RetryingTSRpcTask>)::$_18>(yb::master::CatalogManager::ScheduleTask(std::__1::shared_ptr<yb::master::RetryingTSRpcTask>)::$_18&&)
Line
Count
Source
248
363k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
363k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
363k
  }
catalog_manager.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::CatalogManager::RebuildYQLSystemPartitions()::$_20::operator()(yb::Status const&) const::'lambda'()>(yb::master::CatalogManager::RebuildYQLSystemPartitions()::$_20::operator()(yb::Status const&) const::'lambda'()&&)
Line
Count
Source
248
237k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
237k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
237k
  }
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*&, scoped_refptr<yb::master::NamespaceInfo>&> >(std::__1::__bind<void (yb::master::CatalogManager::*)(scoped_refptr<yb::master::NamespaceInfo>), yb::master::CatalogManager*&, scoped_refptr<yb::master::NamespaceInfo>&>&&)
Line
Count
Source
248
2
  CHECKED_STATUS SubmitFunc(F&& f) {
249
2
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
2
  }
sys_catalog.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::SysCatalogTable::SysCatalogStateChanged(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>)::$_0>(yb::master::SysCatalogTable::SysCatalogStateChanged(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::consensus::StateChangeContext>)::$_0&&)
Line
Count
Source
248
44
  CHECKED_STATUS SubmitFunc(F&& f) {
249
44
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
44
  }
ysql_transaction_ddl.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_1>(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_1&&)
Line
Count
Source
248
1
  CHECKED_STATUS SubmitFunc(F&& f) {
249
1
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
1
  }
Unexecuted instantiation: ysql_transaction_ddl.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_2>(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_2&&)
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>&> >(std::__1::__bind<void (yb::master::YsqlTransactionDdl::*)(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>), yb::master::YsqlTransactionDdl*, yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>&>&&)
Line
Count
Source
248
15.6k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
15.6k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
15.6k
  }
ysql_transaction_ddl.cc:yb::Status yb::ThreadPool::SubmitFunc<yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_3>(yb::master::YsqlTransactionDdl::TransactionReceived(yb::TransactionMetadata const&, std::__1::function<yb::Status (bool)>, yb::Status, yb::tserver::GetTransactionStatusResponsePB const&)::$_3&&)
Line
Count
Source
248
4.85k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
4.85k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
4.85k
  }
Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(), yb::load_generator::MultiThreadedAction*> >(std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(), yb::load_generator::MultiThreadedAction*>&&)
Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(int), yb::load_generator::MultiThreadedAction*, int&> >(std::__1::__bind<void (yb::load_generator::MultiThreadedAction::*)(int), yb::load_generator::MultiThreadedAction*, int&>&&)
Unexecuted instantiation: yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::load_generator::MultiThreadedWriter::*)(), yb::load_generator::MultiThreadedWriter*> >(std::__1::__bind<void (yb::load_generator::MultiThreadedWriter::*)(), yb::load_generator::MultiThreadedWriter*>&&)
yb::Status yb::ThreadPool::SubmitFunc<std::__1::__bind<void (yb::Callback<void ()>::*)() const, yb::Callback<void ()> const&> >(std::__1::__bind<void (yb::Callback<void ()>::*)() const, yb::Callback<void ()> const&>&&)
Line
Count
Source
248
11.0k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
11.0k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
11.0k
  }
251
252
  template <class F>
253
48.0k
  CHECKED_STATUS SubmitFunc(const F& f) {
254
48.0k
    return Submit(std::make_shared<RunnableImpl<F>>(f));
255
48.0k
  }
256
257
  template <class F>
258
46.9k
  CHECKED_STATUS SubmitFunc(F& f) { // NOLINT
259
46.9k
    const auto& const_f = f;
260
46.9k
    return SubmitFunc(const_f);
261
46.9k
  }
262
263
  // Submit a Runnable class
264
  CHECKED_STATUS Submit(const std::shared_ptr<Runnable>& task);
265
266
  // Wait until all the tasks are completed.
267
  void Wait();
268
269
  // Waits for the pool to reach the idle state, or until 'until' time is reached.
270
  // Returns true if the pool reached the idle state, false otherwise.
271
  bool WaitUntil(const MonoTime& until);
272
273
  // Waits for the pool to reach the idle state, or until 'delta' time elapses.
274
  // Returns true if the pool reached the idle state, false otherwise.
275
  bool WaitFor(const MonoDelta& delta);
276
277
  // Allocates a new token for use in token-based task submission. All tokens
278
  // must be destroyed before their ThreadPool is destroyed.
279
  //
280
  // There is no limit on the number of tokens that may be allocated.
281
  enum class ExecutionMode {
282
    // Tasks submitted via this token will be executed serially.
283
    SERIAL,
284
285
    // Tasks submitted via this token may be executed concurrently.
286
    CONCURRENT,
287
  };
288
  std::unique_ptr<ThreadPoolToken> NewToken(ExecutionMode mode);
289
290
  // Like NewToken(), but lets the caller provide metrics for the token. These
291
  // metrics are incremented/decremented in addition to the configured
292
  // pool-wide metrics (if any).
293
  std::unique_ptr<ThreadPoolToken> NewTokenWithMetrics(ExecutionMode mode,
294
                                                       ThreadPoolMetrics metrics);
295
296
 private:
297
  friend class ThreadPoolBuilder;
298
  friend class ThreadPoolToken;
299
300
301
  // Create a new thread pool using a builder.
302
  explicit ThreadPool(const ThreadPoolBuilder& builder);
303
304
  // Initialize the thread pool by starting the minimum number of threads.
305
  CHECKED_STATUS Init();
306
307
  // Dispatcher responsible for dequeueing and executing the tasks
308
  void DispatchThread(bool permanent);
309
310
  // Create new thread. Required that lock_ is held.
311
  CHECKED_STATUS CreateThreadUnlocked();
312
313
 private:
314
  FRIEND_TEST(TestThreadPool, TestThreadPoolWithNoMinimum);
315
  FRIEND_TEST(TestThreadPool, TestThreadPoolWithNoMaxThreads);
316
  FRIEND_TEST(TestThreadPool, TestVariableSizeThreadPool);
317
  // Aborts if the current thread is a member of this thread pool.
318
  void CheckNotPoolThreadUnlocked();
319
320
  struct Task {
321
    std::shared_ptr<Runnable> runnable;
322
    Trace* trace;
323
324
    // Time at which the entry was submitted to the pool.
325
    MonoTime submit_time;
326
  };
327
  // Submits a task to be run via token.
328
  Status DoSubmit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);
329
330
  // Releases token 't' and invalidates it.
331
  void ReleaseToken(ThreadPoolToken* t);
332
333
  const std::string name_;
334
  const int min_threads_;
335
  const int max_threads_;
336
  const int max_queue_size_;
337
  const MonoDelta idle_timeout_;
338
339
  Status pool_status_;
340
  Mutex lock_;
341
  ConditionVariable idle_cond_;
342
  ConditionVariable no_threads_cond_;
343
  ConditionVariable not_empty_;
344
  int num_threads_;
345
  int active_threads_;
346
347
  // Total number of client tasks queued, either directly (queue_) or
348
  // indirectly (tokens_).
349
  // Protected by lock_.
350
  int total_queued_tasks_;
351
352
  // All allocated tokens.
353
  // Tokens are owned by the clients.
354
  //
355
  // Protected by lock_.
356
  std::unordered_set<ThreadPoolToken*> tokens_;
357
358
  // FIFO of tokens from which tasks should be executed. Does not own the
359
  // tokens; they are owned by clients and are removed from the FIFO on shutdown.
360
  //
361
  // Protected by lock_.
362
  std::deque<ThreadPoolToken*> queue_;
363
364
  // Pointers to all running threads. Raw pointers are safe because a Thread
365
  // may only go out of scope after being removed from threads_.
366
  //
367
  // Protected by lock_.
368
  std::unordered_set<Thread*> threads_;
369
370
  // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
371
  std::unique_ptr<ThreadPoolToken> tokenless_;
372
373
  // Metrics for the entire thread pool.
374
  const ThreadPoolMetrics metrics_;
375
376
  DISALLOW_COPY_AND_ASSIGN(ThreadPool);
377
};
378
379
// All possible token states. Legal state transitions:
380
//   kIdle      -> kRunning: task is submitted via token
381
//   kIdle      -> kQuiesced: token or pool is shut down
382
//   kRunning   -> kIdle: worker thread finishes executing a task and
383
//                      there are no more tasks queued to the token
384
//   kRunning   -> kQuiescing: token or pool is shut down while worker thread
385
//                           is executing a task
386
//   kRunning   -> kQuiesced: token or pool is shut down
387
//   kQuiescing -> kQuiesced:  worker thread finishes executing a task
388
//                           belonging to a shut down token or pool
389
YB_DEFINE_ENUM(ThreadPoolTokenState,
390
                   // Token has no queued tasks.
391
                   (kIdle)
392
393
                   // A worker thread is running one of the token's previously queued tasks.
394
                   (kRunning)
395
396
                   // No new tasks may be submitted to the token. A worker thread is still
397
                   // running a previously queued task.
398
                   (kQuiescing)
399
400
                   // No new tasks may be submitted to the token. There are no active tasks
401
                   // either. At this state, the token may only be destroyed.
402
                   (kQuiesced));
403
404
// Entry point for token-based task submission and blocking for a particular
405
// thread pool. Tokens can only be created via ThreadPool::NewToken().
406
//
407
// All functions are thread-safe. Mutable members are protected via the
408
// ThreadPool's lock.
409
class ThreadPoolToken {
410
 public:
411
  // Destroys the token.
412
  //
413
  // May be called on a token with outstanding tasks, as Shutdown() will be
414
  // called first to take care of them.
415
  ~ThreadPoolToken();
416
417
  // Submits a function using the yb Closure system.
418
  Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
419
420
  // Submits a function bound using boost::bind(&FuncName, args...).
421
  Status SubmitFunc(const std::function<void()> f) WARN_UNUSED_RESULT;
422
423
  // Submits a Runnable class.
424
  Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
425
426
  // Marks the token as unusable for future submissions. Any queued tasks not
427
  // yet running are destroyed. If tasks are in flight, Shutdown() will wait
428
  // on their completion before returning.
429
  void Shutdown();
430
431
  // Waits until all the tasks submitted via this token are completed.
432
  void Wait();
433
434
  // Waits for all submissions using this token are complete, or until 'until'
435
  // time is reached.
436
  //
437
  // Returns true if all submissions are complete, false otherwise.
438
  bool WaitUntil(const MonoTime& until);
439
440
  // Waits for all submissions using this token are complete, or until 'delta'
441
  // time elapses.
442
  //
443
  // Returns true if all submissions are complete, false otherwise.
444
  bool WaitFor(const MonoDelta& delta);
445
446
 private:
447
  friend class ThreadPool;
448
449
  // Returns a textual representation of 's' suitable for debugging.
450
  static const char* StateToString(ThreadPoolTokenState s);
451
452
  // Constructs a new token.
453
  //
454
  // The token may not outlive its thread pool ('pool').
455
  ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, ThreadPoolMetrics metrics);
456
457
  // Changes this token's state to 'new_state' taking actions as needed.
458
  void Transition(ThreadPoolTokenState new_state);
459
460
  // Returns true if this token has a task queued and ready to run, or if a
461
  // task belonging to this token is already running.
462
95.3M
  bool IsActive() const {
463
95.3M
    return state_ == ThreadPoolTokenState::kRunning ||
464
95.3M
           
state_ == ThreadPoolTokenState::kQuiescing81.2M
;
465
95.3M
  }
466
467
  // Returns true if new tasks may be submitted to this token.
468
94.9M
  bool MaySubmitNewTasks() const {
469
94.9M
    return state_ != ThreadPoolTokenState::kQuiescing &&
470
94.9M
           
state_ != ThreadPoolTokenState::kQuiesced94.9M
;
471
94.9M
  }
472
473
285M
  ThreadPoolTokenState state() const { return state_; }
474
36.7M
  ThreadPool::ExecutionMode mode() const { return mode_; }
475
476
  // Token's configured execution mode.
477
  const ThreadPool::ExecutionMode mode_;
478
479
  // Pointer to the token's thread pool.
480
  ThreadPool* pool_;
481
482
  // Metrics for just this token.
483
  const ThreadPoolMetrics metrics_;
484
485
  // Token state machine.
486
  ThreadPoolTokenState state_;
487
488
  // Queued client tasks.
489
  std::deque<ThreadPool::Task> entries_;
490
491
  // Condition variable for "token is idle". Waiters wake up when the token
492
  // transitions to kIdle or kQuiesced.
493
  ConditionVariable not_running_cond_;
494
495
  // Number of worker threads currently executing tasks belonging to this
496
  // token.
497
  int active_threads_;
498
499
  DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken);
500
};
501
502
////////////////////////////////////////////////////////
503
// FunctionRunnable
504
////////////////////////////////////////////////////////
505
506
class FunctionRunnable : public Runnable {
507
 public:
508
92.9M
  explicit FunctionRunnable(std::function<void()> func) : func_(std::move(func)) {}
509
510
93.0M
  void Run() override {
511
93.0M
    func_();
512
93.0M
  }
513
514
 private:
515
  std::function<void()> func_;
516
};
517
518
// Runs submitted tasks in created thread pool with specified concurrency.
519
class TaskRunner {
520
 public:
521
0
  TaskRunner() = default;
522
523
  CHECKED_STATUS Init(int concurrency);
524
525
  template <class F>
526
0
  void Submit(F&& f) {
527
0
    ++running_tasks_;
528
0
    auto status = thread_pool_->SubmitFunc([this, f = std::forward<F>(f)]() {
529
0
      auto status = f();
530
0
      CompleteTask(status);
531
0
    });
Unexecuted instantiation: data-patcher.cc:void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)::'lambda'()::operator()() const
Unexecuted instantiation: data-patcher.cc:void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)::'lambda'()::operator()() const
532
0
    if (!status.ok()) {
533
0
      CompleteTask(status);
534
0
    }
535
0
  }
Unexecuted instantiation: data-patcher.cc:void yb::TaskRunner::Submit<yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3>(yb::tools::ChangeTimeInDataFiles(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, bool, yb::TaskRunner*)::$_3&&)
Unexecuted instantiation: data-patcher.cc:void yb::TaskRunner::Submit<yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8>(yb::tools::ChangeTimeInWalDirs(yb::MonoDelta, yb::HybridTime, unsigned long, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, yb::TaskRunner*)::$_8&&)
536
537
  CHECKED_STATUS Wait();
538
539
 private:
540
  void CompleteTask(const Status& status);
541
542
  std::unique_ptr<ThreadPool> thread_pool_;
543
  std::atomic<size_t> running_tasks_{0};
544
  std::atomic<bool> failed_{false};
545
  Status first_failure_;
546
  std::mutex mutex_;
547
  std::condition_variable cond_;
548
};
549
550
} // namespace yb
551
#endif // YB_UTIL_THREADPOOL_H