YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/threadpool.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_UTIL_THREADPOOL_H
33
#define YB_UTIL_THREADPOOL_H
34
35
#include <condition_variable>
36
#include <deque>
37
#include <functional>
38
#include <memory>
39
#include <mutex>
40
#include <string>
41
#include <unordered_set>
42
43
#include <gtest/gtest_prod.h>
44
45
#include "yb/gutil/callback_forward.h"
46
#include "yb/gutil/macros.h"
47
#include "yb/gutil/port.h"
48
#include "yb/gutil/ref_counted.h"
49
50
#include "yb/util/metrics_fwd.h"
51
#include "yb/util/condition_variable.h"
52
#include "yb/util/enums.h"
53
#include "yb/util/math_util.h"
54
#include "yb/util/monotime.h"
55
#include "yb/util/mutex.h"
56
#include "yb/util/status.h"
57
58
namespace yb {
59
60
class Thread;
61
class ThreadPool;
62
class ThreadPoolToken;
63
class Trace;
64
65
class Runnable {
66
 public:
67
  virtual void Run() = 0;
68
46.9M
  virtual ~Runnable() = default;
69
};
70
71
template <class F>
72
class RunnableImpl : public Runnable {
73
 public:
74
30.9k
  explicit RunnableImpl(const F& f) : f_(f) {}
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_49TestThreadPoolTokenTypes_TestTokenWaitForAll_Test8TestBodyEvE4$_20EC2ERKS2_
Line
Count
Source
74
20
  explicit RunnableImpl(const F& f) : f_(f) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6client8YBClient4DataEFvNS1_6chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS1_5ratioILl1ELl1000000000EEEEEEEbbEJPS5_RSD_RbSI_EEEEC2ERKSJ_
Line
Count
Source
74
30.9k
  explicit RunnableImpl(const F& f) : f_(f) {}
75
938k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
Unexecuted instantiation: data-patcher.cc:_ZN2yb12RunnableImplIZNS_10TaskRunner6SubmitIZNS_5tools21ChangeTimeInDataFilesENS_9MonoDeltaENS_10HybridTimeEmRKNSt3__16vectorINS6_12basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEENSB_ISD_EEEEbPS1_E3$_3EEvOT_EUlvE_EC2EOSM_
Unexecuted instantiation: data-patcher.cc:_ZN2yb12RunnableImplIZNS_10TaskRunner6SubmitIZNS_5tools19ChangeTimeInWalDirsENS_9MonoDeltaENS_10HybridTimeEmRKNSt3__16vectorINS6_12basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEENSB_ISD_EEEEPS1_E3$_8EEvOT_EUlvE_EC2EOSM_
_ZN2yb12RunnableImplINSt3__16__bindIMNS_9consensus18LocalTestPeerProxyEFvNS3_18ConsensusRequestPBEPNS3_19ConsensusResponsePBEEJPS4_RKS5_RS7_EEEEC2EOSE_
Line
Count
Source
75
631
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_9consensus18LocalTestPeerProxyEFvPKNS3_13VoteRequestPBEPNS3_14VoteResponsePBEEJPS4_RS7_RS9_EEEEC2EOSF_
Line
Count
Source
75
16
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_9consensus10TestDriverEFvvEJPS4_EEEEC2EOS8_
_ZN2yb12RunnableImplINSt3__16__bindIRFvPNS_14CountDownLatchEiEJS4_iEEEEC2EOS7_
Line
Count
Source
75
2
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIPFviPiEJiS3_EEEEC2EOS6_
Line
Count
Source
75
2
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplIPFvvEEC2EOS2_
Line
Count
Source
75
2
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_28TestThreadPool_TestRace_Test8TestBodyEvE3$_2EC2EOS2_
Line
Count
Source
75
500
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_31TestThreadPool_TestMetrics_Test8TestBodyEvE3$_6EC2EOS2_
Line
Count
Source
75
1
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_31TestThreadPool_TestMetrics_Test8TestBodyEvE3$_7EC2EOS2_
Line
Count
Source
75
1
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_31TestThreadPool_TestMetrics_Test8TestBodyEvE3$_8EC2EOS2_
Line
Count
Source
75
1
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_28TestThreadPool_TestFuzz_Test8TestBodyEvE4$_21EC2EOS2_
Line
Count
Source
75
382
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_14load_generator19MultiThreadedActionEFvvEJPS4_EEEEC2EOS8_
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_14load_generator19MultiThreadedActionEFviEJPS4_RiEEEEC2EOS9_
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_14load_generator19MultiThreadedWriterEFvvEJPS4_EEEEC2EOS8_
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master17RetryingTSRpcTaskEFvvEJNS1_10shared_ptrIS4_EEEEEEC2EOS9_
Line
Count
Source
75
271k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master14CatalogManagerEFvvEJPS4_EEEEC2EOS8_
Line
Count
Source
75
561
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master18YsqlTransactionDdlEFvRKNS_19TransactionMetadataENS1_8functionIFNS_6StatusEbEEEEJPS4_RS5_RSB_EEEEC2EOSH_
Line
Count
Source
75
1.43k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
catalog_manager.cc:_ZN2yb12RunnableImplIZNS_6master14CatalogManager18VerifyTablePgLayerE13scoped_refptrINS1_9TableInfoEEbE3$_7EC2EOS6_
Line
Count
Source
75
23
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master14CatalogManagerEFvNS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEENS1_6vectorI13scoped_refptrINS3_9TableInfoEENS8_ISE_EEEENS_19TransactionMetadataEEJPS4_RKSA_RSG_RSH_EEEEC2EOSP_
Line
Count
Source
75
24
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master14CatalogManagerEFvNS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEENS1_6vectorI13scoped_refptrINS3_9TableInfoEENS8_ISE_EEEENS_19TransactionMetadataEEJPS4_RSA_RSG_RSH_EEEEC2EOSO_
Line
Count
Source
75
2
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master14CatalogManagerEFv13scoped_refptrINS3_13NamespaceInfoEEEJPS4_RS7_EEEEC2EOSC_
Line
Count
Source
75
47
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
catalog_manager.cc:_ZN2yb12RunnableImplIZNS_6master14CatalogManager13EnableBgTasksEvE4$_12EC2EOS3_
Line
Count
Source
75
5.35k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
catalog_manager.cc:_ZN2yb12RunnableImplIZNS_6master14CatalogManager12ScheduleTaskENSt3__110shared_ptrINS1_17RetryingTSRpcTaskEEEE4$_16EC2EOS7_
Line
Count
Source
75
212k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
catalog_manager.cc:_ZN2yb12RunnableImplIZZNS_6master14CatalogManager26RebuildYQLSystemPartitionsEvENK4$_18clERKNS_6StatusEEUlvE_EC2EOS7_
Line
Count
Source
75
10.6k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master14CatalogManagerEFv13scoped_refptrINS3_13NamespaceInfoEEEJRPS4_RS7_EEEEC2EOSD_
Line
Count
Source
75
2
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
sys_catalog.cc:_ZN2yb12RunnableImplIZNS_6master15SysCatalogTable22SysCatalogStateChangedERKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEENS3_10shared_ptrINS_9consensus18StateChangeContextEEEE3$_0EC2EOSG_
Line
Count
Source
75
32
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
Unexecuted instantiation: ysql_transaction_ddl.cc:_ZN2yb12RunnableImplIZNS_6master18YsqlTransactionDdl19TransactionReceivedERKNS_19TransactionMetadataENSt3__18functionIFNS_6StatusEbEEES8_RKNS_7tserver30GetTransactionStatusResponsePBEE3$_1EC2EOSF_
Unexecuted instantiation: ysql_transaction_ddl.cc:_ZN2yb12RunnableImplIZNS_6master18YsqlTransactionDdl19TransactionReceivedERKNS_19TransactionMetadataENSt3__18functionIFNS_6StatusEbEEES8_RKNS_7tserver30GetTransactionStatusResponsePBEE3$_2EC2EOSF_
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master18YsqlTransactionDdlEFvRKNS_19TransactionMetadataENS1_8functionIFNS_6StatusEbEEEEJPS4_S7_RSB_EEEEC2EOSG_
Line
Count
Source
75
4.20k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
ysql_transaction_ddl.cc:_ZN2yb12RunnableImplIZNS_6master18YsqlTransactionDdl19TransactionReceivedERKNS_19TransactionMetadataENSt3__18functionIFNS_6StatusEbEEES8_RKNS_7tserver30GetTransactionStatusResponsePBEE3$_3EC2EOSF_
Line
Count
Source
75
1.43k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_18MaintenanceManagerEFvRKNS_22ScopedMaintenanceOpRunEEJPS3_S4_EEEEC2EOSA_
Line
Count
Source
75
8
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRK13scoped_refptrINS_6tablet17RaftGroupMetadataEERKS5_INS3_27TransitionInProgressDeleterEEEJPS4_SA_RSC_EEEEC2EOSJ_
Line
Count
Source
75
219
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRK13scoped_refptrINS_6tablet17RaftGroupMetadataEERKS5_INS3_27TransitionInProgressDeleterEEEJPS4_RS8_RSC_EEEEC2EOSK_
Line
Count
Source
75
82.2k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRK13scoped_refptrINS_6tablet17RaftGroupMetadataEERKS5_INS3_27TransitionInProgressDeleterEEEJPS4_SA_SE_EEEEC2EOSI_
Line
Count
Source
75
86
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
_ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRKNS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEENS1_10shared_ptrINS_9consensus18StateChangeContextEEEEJPS4_SC_RSG_EEEEC2EOSL_
Line
Count
Source
75
338k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver10enterprise9CDCPollerEFvvEJPS5_EEEEC2EOS9_
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver10enterprise9CDCPollerEFvNS_6StatusENS1_10shared_ptrINS_3cdc20GetChangesResponsePBEEEEJPS5_RKS6_RSA_EEEEC2EOSH_
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver10enterprise9CDCPollerEFvNS_3cdc20OutputClientResponseEEJPS5_RS7_EEEEC2EOSC_
_ZN2yb12RunnableImplINSt3__16__bindIMNS_8CallbackIFvvEEEKFvvEJRKS5_EEEEC2EOSA_
Line
Count
Source
75
7.43k
  explicit RunnableImpl(F&& f) : f_(std::move(f)) {}
76
77
 private:
78
971k
  void Run() override {
79
971k
    f_();
80
971k
  }
Unexecuted instantiation: data-patcher.cc:_ZN2yb12RunnableImplIZNS_10TaskRunner6SubmitIZNS_5tools21ChangeTimeInDataFilesENS_9MonoDeltaENS_10HybridTimeEmRKNSt3__16vectorINS6_12basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEENSB_ISD_EEEEbPS1_E3$_3EEvOT_EUlvE_E3RunEv
Unexecuted instantiation: data-patcher.cc:_ZN2yb12RunnableImplIZNS_10TaskRunner6SubmitIZNS_5tools19ChangeTimeInWalDirsENS_9MonoDeltaENS_10HybridTimeEmRKNSt3__16vectorINS6_12basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEENSB_ISD_EEEEPS1_E3$_8EEvOT_EUlvE_E3RunEv
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_9consensus10TestDriverEFvvEJPS4_EEEE3RunEv
_ZN2yb12RunnableImplINSt3__16__bindIMNS_9consensus18LocalTestPeerProxyEFvNS3_18ConsensusRequestPBEPNS3_19ConsensusResponsePBEEJPS4_RKS5_RS7_EEEE3RunEv
Line
Count
Source
78
634
  void Run() override {
79
634
    f_();
80
634
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_9consensus18LocalTestPeerProxyEFvPKNS3_13VoteRequestPBEPNS3_14VoteResponsePBEEJPS4_RS7_RS9_EEEE3RunEv
Line
Count
Source
78
16
  void Run() override {
79
16
    f_();
80
16
  }
_ZN2yb12RunnableImplINSt3__16__bindIRFvPNS_14CountDownLatchEiEJS4_iEEEE3RunEv
Line
Count
Source
78
2
  void Run() override {
79
2
    f_();
80
2
  }
_ZN2yb12RunnableImplINSt3__16__bindIPFviPiEJiS3_EEEE3RunEv
Line
Count
Source
78
2
  void Run() override {
79
2
    f_();
80
2
  }
_ZN2yb12RunnableImplIPFvvEE3RunEv
Line
Count
Source
78
1
  void Run() override {
79
1
    f_();
80
1
  }
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_28TestThreadPool_TestRace_Test8TestBodyEvE3$_2E3RunEv
Line
Count
Source
78
500
  void Run() override {
79
500
    f_();
80
500
  }
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_31TestThreadPool_TestMetrics_Test8TestBodyEvE3$_6E3RunEv
Line
Count
Source
78
1
  void Run() override {
79
1
    f_();
80
1
  }
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_31TestThreadPool_TestMetrics_Test8TestBodyEvE3$_7E3RunEv
Line
Count
Source
78
1
  void Run() override {
79
1
    f_();
80
1
  }
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_31TestThreadPool_TestMetrics_Test8TestBodyEvE3$_8E3RunEv
Line
Count
Source
78
1
  void Run() override {
79
1
    f_();
80
1
  }
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_49TestThreadPoolTokenTypes_TestTokenWaitForAll_Test8TestBodyEvE4$_20E3RunEv
Line
Count
Source
78
20
  void Run() override {
79
20
    f_();
80
20
  }
threadpool-test.cc:_ZN2yb12RunnableImplIZNS_28TestThreadPool_TestFuzz_Test8TestBodyEvE4$_21E3RunEv
Line
Count
Source
78
363
  void Run() override {
79
363
    f_();
80
363
  }
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_14load_generator19MultiThreadedActionEFvvEJPS4_EEEE3RunEv
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_14load_generator19MultiThreadedActionEFviEJPS4_RiEEEE3RunEv
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_14load_generator19MultiThreadedWriterEFvvEJPS4_EEEE3RunEv
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master17RetryingTSRpcTaskEFvvEJNS1_10shared_ptrIS4_EEEEEE3RunEv
Line
Count
Source
78
272k
  void Run() override {
79
272k
    f_();
80
272k
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master14CatalogManagerEFvvEJPS4_EEEE3RunEv
Line
Count
Source
78
560
  void Run() override {
79
560
    f_();
80
560
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master18YsqlTransactionDdlEFvRKNS_19TransactionMetadataENS1_8functionIFNS_6StatusEbEEEEJPS4_RS5_RSB_EEEE3RunEv
Line
Count
Source
78
1.43k
  void Run() override {
79
1.43k
    f_();
80
1.43k
  }
catalog_manager.cc:_ZN2yb12RunnableImplIZNS_6master14CatalogManager18VerifyTablePgLayerE13scoped_refptrINS1_9TableInfoEEbE3$_7E3RunEv
Line
Count
Source
78
23
  void Run() override {
79
23
    f_();
80
23
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master14CatalogManagerEFvNS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEENS1_6vectorI13scoped_refptrINS3_9TableInfoEENS8_ISE_EEEENS_19TransactionMetadataEEJPS4_RKSA_RSG_RSH_EEEE3RunEv
Line
Count
Source
78
24
  void Run() override {
79
24
    f_();
80
24
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master14CatalogManagerEFvNS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEENS1_6vectorI13scoped_refptrINS3_9TableInfoEENS8_ISE_EEEENS_19TransactionMetadataEEJPS4_RSA_RSG_RSH_EEEE3RunEv
Line
Count
Source
78
1
  void Run() override {
79
1
    f_();
80
1
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master14CatalogManagerEFv13scoped_refptrINS3_13NamespaceInfoEEEJPS4_RS7_EEEE3RunEv
Line
Count
Source
78
47
  void Run() override {
79
47
    f_();
80
47
  }
catalog_manager.cc:_ZN2yb12RunnableImplIZNS_6master14CatalogManager13EnableBgTasksEvE4$_12E3RunEv
Line
Count
Source
78
5.35k
  void Run() override {
79
5.35k
    f_();
80
5.35k
  }
catalog_manager.cc:_ZN2yb12RunnableImplIZNS_6master14CatalogManager12ScheduleTaskENSt3__110shared_ptrINS1_17RetryingTSRpcTaskEEEE4$_16E3RunEv
Line
Count
Source
78
212k
  void Run() override {
79
212k
    f_();
80
212k
  }
catalog_manager.cc:_ZN2yb12RunnableImplIZZNS_6master14CatalogManager26RebuildYQLSystemPartitionsEvENK4$_18clERKNS_6StatusEEUlvE_E3RunEv
Line
Count
Source
78
10.6k
  void Run() override {
79
10.6k
    f_();
80
10.6k
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master14CatalogManagerEFv13scoped_refptrINS3_13NamespaceInfoEEEJRPS4_RS7_EEEE3RunEv
Line
Count
Source
78
2
  void Run() override {
79
2
    f_();
80
2
  }
sys_catalog.cc:_ZN2yb12RunnableImplIZNS_6master15SysCatalogTable22SysCatalogStateChangedERKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEENS3_10shared_ptrINS_9consensus18StateChangeContextEEEE3$_0E3RunEv
Line
Count
Source
78
32
  void Run() override {
79
32
    f_();
80
32
  }
Unexecuted instantiation: ysql_transaction_ddl.cc:_ZN2yb12RunnableImplIZNS_6master18YsqlTransactionDdl19TransactionReceivedERKNS_19TransactionMetadataENSt3__18functionIFNS_6StatusEbEEES8_RKNS_7tserver30GetTransactionStatusResponsePBEE3$_1E3RunEv
Unexecuted instantiation: ysql_transaction_ddl.cc:_ZN2yb12RunnableImplIZNS_6master18YsqlTransactionDdl19TransactionReceivedERKNS_19TransactionMetadataENSt3__18functionIFNS_6StatusEbEEES8_RKNS_7tserver30GetTransactionStatusResponsePBEE3$_2E3RunEv
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6master18YsqlTransactionDdlEFvRKNS_19TransactionMetadataENS1_8functionIFNS_6StatusEbEEEEJPS4_S7_RSB_EEEE3RunEv
Line
Count
Source
78
4.20k
  void Run() override {
79
4.20k
    f_();
80
4.20k
  }
ysql_transaction_ddl.cc:_ZN2yb12RunnableImplIZNS_6master18YsqlTransactionDdl19TransactionReceivedERKNS_19TransactionMetadataENSt3__18functionIFNS_6StatusEbEEES8_RKNS_7tserver30GetTransactionStatusResponsePBEE3$_3E3RunEv
Line
Count
Source
78
1.43k
  void Run() override {
79
1.43k
    f_();
80
1.43k
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_18MaintenanceManagerEFvRKNS_22ScopedMaintenanceOpRunEEJPS3_S4_EEEE3RunEv
Line
Count
Source
78
8
  void Run() override {
79
8
    f_();
80
8
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRK13scoped_refptrINS_6tablet17RaftGroupMetadataEERKS5_INS3_27TransitionInProgressDeleterEEEJPS4_SA_RSC_EEEE3RunEv
Line
Count
Source
78
219
  void Run() override {
79
219
    f_();
80
219
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRK13scoped_refptrINS_6tablet17RaftGroupMetadataEERKS5_INS3_27TransitionInProgressDeleterEEEJPS4_RS8_RSC_EEEE3RunEv
Line
Count
Source
78
82.2k
  void Run() override {
79
82.2k
    f_();
80
82.2k
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRK13scoped_refptrINS_6tablet17RaftGroupMetadataEERKS5_INS3_27TransitionInProgressDeleterEEEJPS4_SA_SE_EEEE3RunEv
Line
Count
Source
78
86
  void Run() override {
79
86
    f_();
80
86
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRKNS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEENS1_10shared_ptrINS_9consensus18StateChangeContextEEEEJPS4_SC_RSG_EEEE3RunEv
Line
Count
Source
78
338k
  void Run() override {
79
338k
    f_();
80
338k
  }
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver10enterprise9CDCPollerEFvvEJPS5_EEEE3RunEv
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver10enterprise9CDCPollerEFvNS_6StatusENS1_10shared_ptrINS_3cdc20GetChangesResponsePBEEEEJPS5_RKS6_RSA_EEEE3RunEv
Unexecuted instantiation: _ZN2yb12RunnableImplINSt3__16__bindIMNS_7tserver10enterprise9CDCPollerEFvNS_3cdc20OutputClientResponseEEJPS5_RS7_EEEE3RunEv
_ZN2yb12RunnableImplINSt3__16__bindIMNS_6client8YBClient4DataEFvNS1_6chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS1_5ratioILl1ELl1000000000EEEEEEEbbEJPS5_RSD_RbSI_EEEE3RunEv
Line
Count
Source
78
32.9k
  void Run() override {
79
32.9k
    f_();
80
32.9k
  }
_ZN2yb12RunnableImplINSt3__16__bindIMNS_8CallbackIFvvEEEKFvvEJRKS5_EEEE3RunEv
Line
Count
Source
78
7.43k
  void Run() override {
79
7.43k
    f_();
80
7.43k
  }
81
82
  F f_;
83
};
84
85
// Interesting thread pool metrics. Can be applied to the entire pool (see
86
// ThreadPoolBuilder) or to individual tokens.
87
struct ThreadPoolMetrics {
88
  // Measures the queue length seen by tasks when they enter the queue.
89
  scoped_refptr<Histogram> queue_length_histogram;
90
91
  // Measures the amount of time that tasks spend waiting in a queue.
92
  scoped_refptr<Histogram> queue_time_us_histogram;
93
94
  // Measures the amount of time that tasks spend running.
95
  scoped_refptr<Histogram> run_time_us_histogram;
96
97
  ~ThreadPoolMetrics();
98
};
99
100
101
// THREAD_POOL_METRICS_DEFINE / THREAD_POOL_METRICS_INSTANCE are helpers which define the metrics
102
// required for a ThreadPoolMetrics object and instantiate said objects, respectively. Example
103
// usage:
104
// // At the top of the file:
105
// THREAD_POOL_METRICS_DEFINE(server, thread_pool_foo, "Thread pool for Foo jobs.")
106
// ...
107
// // Inline:
108
// ThreadPoolBuilder("foo")
109
//   .set_metrics(THREAD_POOL_METRICS_INSTANCE(server_->metric_entity(), thread_pool_foo))
110
//   ...
111
//   .Build(...);
112
#define THREAD_POOL_METRICS_DEFINE(entity, name, label) \
113
    METRIC_DEFINE_coarse_histogram(entity, BOOST_PP_CAT(name, _queue_length), \
114
        label " Queue Length", yb::MetricUnit::kTasks, \
115
        label " - queue length histogram."); \
116
    METRIC_DEFINE_coarse_histogram(entity, BOOST_PP_CAT(name, _queue_time_us), \
117
        label " Queue Time", yb::MetricUnit::kMicroseconds, \
118
        label " - queue time histogram, microseconds."); \
119
    METRIC_DEFINE_coarse_histogram(entity, BOOST_PP_CAT(name, _run_time_us), \
120
        label " Run Time", yb::MetricUnit::kMicroseconds, \
121
        label " - run time histogram, microseconds.")
122
123
#define THREAD_POOL_METRICS_INSTANCE(entity, name) { /* order matches ThreadPoolMetrics */ \
124
      BOOST_PP_CAT(METRIC_, BOOST_PP_CAT(name, _queue_length)).Instantiate(entity), \
125
      BOOST_PP_CAT(METRIC_, BOOST_PP_CAT(name, _queue_time_us)).Instantiate(entity), \
126
      BOOST_PP_CAT(METRIC_, BOOST_PP_CAT(name, _run_time_us)).Instantiate(entity) \
127
    }
128
129
// ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
130
//
131
// name: Used for debugging output and default names of the worker threads.
132
//    Since thread names are limited to 16 characters on Linux, it's good to
133
//    choose a short name here.
134
//    Required.
135
//
136
// min_threads: Minimum number of threads we'll have at any time.
137
//    Default: 0.
138
//
139
// max_threads: Maximum number of threads we'll have at any time.
140
//    Default: Number of CPUs detected on the system.
141
//
142
// max_queue_size: Maximum number of items to enqueue before returning a
143
//    Status::ServiceUnavailable message from Submit().
144
//    Default: INT_MAX.
145
//
146
// idle_timeout: How long we'll keep around an idle thread before timing it out.
147
//    We always keep at least min_threads.
148
//    Default: 500 milliseconds.
149
//
150
// metrics: Histograms, counters, etc. to update on various threadpool events.
151
//    Default: not set.
152
//
153
class ThreadPoolBuilder {
154
 public:
155
  explicit ThreadPoolBuilder(std::string name);
156
157
  // Note: We violate the style guide by returning mutable references here
158
  // in order to provide traditional Builder pattern conveniences.
159
  ThreadPoolBuilder& set_min_threads(int min_threads);
160
  ThreadPoolBuilder& set_max_threads(int max_threads);
161
  ThreadPoolBuilder& unlimited_threads();
162
  ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
163
  ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
164
  ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);
165
166
0
  const std::string& name() const { return name_; }
167
0
  int min_threads() const { return min_threads_; }
168
0
  int max_threads() const { return max_threads_; }
169
0
  int max_queue_size() const { return max_queue_size_; }
170
0
  const MonoDelta& idle_timeout() const { return idle_timeout_; }
171
172
  // Instantiate a new ThreadPool with the existing builder arguments.
173
  CHECKED_STATUS Build(std::unique_ptr<ThreadPool>* pool) const;
174
175
 private:
176
  friend class ThreadPool;
177
  const std::string name_;
178
  int min_threads_;
179
  int max_threads_;
180
  int max_queue_size_;
181
  MonoDelta idle_timeout_;
182
  ThreadPoolMetrics metrics_;
183
184
  DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
185
};
186
187
// Thread pool with a variable number of threads.
188
// The pool can execute a class that implements the Runnable interface, or a
189
// std::function, which can be obtained via std::bind().
190
// Tasks submitted directly to the thread pool enter a FIFO queue and are
191
// dispatched to a worker thread when one becomes free. Tasks may also be
192
// submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions
193
// can then be used to block on logical groups of tasks.
194
//
195
// A token operates in one of two ExecutionModes, determined at token
196
// construction time:
197
// 1. SERIAL: submitted tasks are run one at a time.
198
// 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike
199
//    tasks submitted without a token, but the logical grouping that tokens
200
//    impart can be useful when a pool is shared by many contexts (e.g. to
201
//    safely shut down one context, to derive context-specific metrics, etc.).
202
//
203
// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are
204
// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are
205
// processed in a round-robin fashion, one task at a time. This prevents them
206
// from starving one another. However, tokenless (and CONCURRENT token-based)
207
// tasks can starve SERIAL token-based tasks.
208
//
209
// Usage Example:
210
//    static void Func(int n) { ... }
211
//    class Task : public Runnable { ... }
212
//
213
//    std::unique_ptr<ThreadPool> thread_pool;
214
//    CHECK_OK(
215
//        ThreadPoolBuilder("my_pool")
216
//            .set_min_threads(0)
217
//            .set_max_threads(5)
218
//            .set_max_queue_size(10)
219
//            .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
220
//            .Build(&thread_pool));
221
//    thread_pool->Submit(shared_ptr<Runnable>(new Task()));
222
//    thread_pool->SubmitFunc(std::bind(&Func, 10));
223
class ThreadPool {
224
 public:
225
  ~ThreadPool();
226
227
  // Wait for the running tasks to complete and then shut down the threads.
228
  // All the other pending tasks in the queue will be removed.
229
  // NOTE: The user may implement external abort logic for the runnables,
230
  //       which must be called before Shutdown(), if the system should
231
  //       know about the non-execution of these tasks, or if the runnables
232
  //       require an explicit "abort" notification to exit from the run loop.
233
  void Shutdown();
234
235
  // Submit a function using the yb Closure system.
236
  CHECKED_STATUS SubmitClosure(const Closure& task);
237
238
  // Submit a function bound using std::bind(&FuncName, args...)
239
  CHECKED_STATUS SubmitFunc(const std::function<void()>& func);
240
  CHECKED_STATUS SubmitFunc(std::function<void()>&& func);
241
242
1.01k
  // Overload for a non-const lvalue std::function: rebinds it as a const
  // reference and forwards to the const std::function<void()>& overload.
  // NOTE(review): presumably exists so that such arguments do not select the
  // generic template overloads below -- confirm.
  CHECKED_STATUS SubmitFunc(std::function<void()>& func) { // NOLINT
243
1.01k
    const auto& const_func = func;
244
1.01k
    return SubmitFunc(const_func);
245
1.01k
  }
246
247
  // Submit an arbitrary callable passed as a forwarding reference. The
  // argument is forwarded into a newly allocated RunnableImpl<F>, which is
  // then handed to Submit().
  template <class F>
248
938k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
938k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
938k
  }
Unexecuted instantiation: data-patcher.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_10TaskRunner6SubmitIZNS_5tools21ChangeTimeInDataFilesENS_9MonoDeltaENS_10HybridTimeEmRKNSt3__16vectorINS7_12basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEENSC_ISE_EEEEbPS2_E3$_3EEvOT_EUlvE_EENS_6StatusESM_
Unexecuted instantiation: data-patcher.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_10TaskRunner6SubmitIZNS_5tools19ChangeTimeInWalDirsENS_9MonoDeltaENS_10HybridTimeEmRKNSt3__16vectorINS7_12basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEENSC_ISE_EEEEPS2_E3$_8EEvOT_EUlvE_EENS_6StatusESM_
Unexecuted instantiation: _ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_9consensus10TestDriverEFvvEJPS5_EEEEENS_6StatusEOT_
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_9consensus18LocalTestPeerProxyEFvNS4_18ConsensusRequestPBEPNS4_19ConsensusResponsePBEEJPS5_RKS6_RS8_EEEEENS_6StatusEOT_
Line
Count
Source
248
634
  CHECKED_STATUS SubmitFunc(F&& f) {
249
634
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
634
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_9consensus18LocalTestPeerProxyEFvPKNS4_13VoteRequestPBEPNS4_14VoteResponsePBEEJPS5_RS8_RSA_EEEEENS_6StatusEOT_
Line
Count
Source
248
16
  CHECKED_STATUS SubmitFunc(F&& f) {
249
16
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
16
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIRFvPNS_14CountDownLatchEiEJS5_iEEEEENS_6StatusEOT_
Line
Count
Source
248
2
  CHECKED_STATUS SubmitFunc(F&& f) {
249
2
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
2
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIPFviPiEJiS4_EEEEENS_6StatusEOT_
Line
Count
Source
248
2
  CHECKED_STATUS SubmitFunc(F&& f) {
249
2
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
2
  }
_ZN2yb10ThreadPool10SubmitFuncIPFvvEEENS_6StatusEOT_
Line
Count
Source
248
2
  CHECKED_STATUS SubmitFunc(F&& f) {
249
2
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
2
  }
threadpool-test.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_28TestThreadPool_TestRace_Test8TestBodyEvE3$_2EENS_6StatusEOT_
Line
Count
Source
248
500
  CHECKED_STATUS SubmitFunc(F&& f) {
249
500
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
500
  }
threadpool-test.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_31TestThreadPool_TestMetrics_Test8TestBodyEvE3$_6EENS_6StatusEOT_
Line
Count
Source
248
1
  CHECKED_STATUS SubmitFunc(F&& f) {
249
1
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
1
  }
threadpool-test.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_31TestThreadPool_TestMetrics_Test8TestBodyEvE3$_7EENS_6StatusEOT_
Line
Count
Source
248
1
  CHECKED_STATUS SubmitFunc(F&& f) {
249
1
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
1
  }
threadpool-test.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_31TestThreadPool_TestMetrics_Test8TestBodyEvE3$_8EENS_6StatusEOT_
Line
Count
Source
248
1
  CHECKED_STATUS SubmitFunc(F&& f) {
249
1
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
1
  }
threadpool-test.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_28TestThreadPool_TestFuzz_Test8TestBodyEvE4$_21EENS_6StatusEOT_
Line
Count
Source
248
382
  CHECKED_STATUS SubmitFunc(F&& f) {
249
382
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
382
  }
Unexecuted instantiation: _ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_14load_generator19MultiThreadedActionEFvvEJPS5_EEEEENS_6StatusEOT_
Unexecuted instantiation: _ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_14load_generator19MultiThreadedActionEFviEJPS5_RiEEEEENS_6StatusEOT_
Unexecuted instantiation: _ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_14load_generator19MultiThreadedWriterEFvvEJPS5_EEEEENS_6StatusEOT_
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_6master17RetryingTSRpcTaskEFvvEJNS2_10shared_ptrIS5_EEEEEEENS_6StatusEOT_
Line
Count
Source
248
271k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
271k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
271k
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_6master14CatalogManagerEFvvEJPS5_EEEEENS_6StatusEOT_
Line
Count
Source
248
561
  CHECKED_STATUS SubmitFunc(F&& f) {
249
561
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
561
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_6master18YsqlTransactionDdlEFvRKNS_19TransactionMetadataENS2_8functionIFNS_6StatusEbEEEEJPS5_RS6_RSC_EEEEESA_OT_
Line
Count
Source
248
1.43k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
1.43k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
1.43k
  }
catalog_manager.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_6master14CatalogManager18VerifyTablePgLayerE13scoped_refptrINS2_9TableInfoEEbE3$_7EENS_6StatusEOT_
Line
Count
Source
248
23
  CHECKED_STATUS SubmitFunc(F&& f) {
249
23
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
23
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_6master14CatalogManagerEFvNS2_12basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEENS2_6vectorI13scoped_refptrINS4_9TableInfoEENS9_ISF_EEEENS_19TransactionMetadataEEJPS5_RKSB_RSH_RSI_EEEEENS_6StatusEOT_
Line
Count
Source
248
24
  CHECKED_STATUS SubmitFunc(F&& f) {
249
24
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
24
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_6master14CatalogManagerEFvNS2_12basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEENS2_6vectorI13scoped_refptrINS4_9TableInfoEENS9_ISF_EEEENS_19TransactionMetadataEEJPS5_RSB_RSH_RSI_EEEEENS_6StatusEOT_
Line
Count
Source
248
2
  CHECKED_STATUS SubmitFunc(F&& f) {
249
2
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
2
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_6master14CatalogManagerEFv13scoped_refptrINS4_13NamespaceInfoEEEJPS5_RS8_EEEEENS_6StatusEOT_
Line
Count
Source
248
47
  CHECKED_STATUS SubmitFunc(F&& f) {
249
47
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
47
  }
catalog_manager.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_6master14CatalogManager13EnableBgTasksEvE4$_12EENS_6StatusEOT_
Line
Count
Source
248
5.35k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
5.35k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
5.35k
  }
catalog_manager.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_6master14CatalogManager12ScheduleTaskENSt3__110shared_ptrINS2_17RetryingTSRpcTaskEEEE4$_16EENS_6StatusEOT_
Line
Count
Source
248
212k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
212k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
212k
  }
catalog_manager.cc:_ZN2yb10ThreadPool10SubmitFuncIZZNS_6master14CatalogManager26RebuildYQLSystemPartitionsEvENK4$_18clERKNS_6StatusEEUlvE_EES5_OT_
Line
Count
Source
248
10.6k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
10.6k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
10.6k
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_6master14CatalogManagerEFv13scoped_refptrINS4_13NamespaceInfoEEEJRPS5_RS8_EEEEENS_6StatusEOT_
Line
Count
Source
248
2
  CHECKED_STATUS SubmitFunc(F&& f) {
249
2
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
2
  }
sys_catalog.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_6master15SysCatalogTable22SysCatalogStateChangedERKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEENS4_10shared_ptrINS_9consensus18StateChangeContextEEEE3$_0EENS_6StatusEOT_
Line
Count
Source
248
32
  CHECKED_STATUS SubmitFunc(F&& f) {
249
32
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
32
  }
Unexecuted instantiation: ysql_transaction_ddl.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_6master18YsqlTransactionDdl19TransactionReceivedERKNS_19TransactionMetadataENSt3__18functionIFNS_6StatusEbEEES9_RKNS_7tserver30GetTransactionStatusResponsePBEE3$_1EES9_OT_
Unexecuted instantiation: ysql_transaction_ddl.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_6master18YsqlTransactionDdl19TransactionReceivedERKNS_19TransactionMetadataENSt3__18functionIFNS_6StatusEbEEES9_RKNS_7tserver30GetTransactionStatusResponsePBEE3$_2EES9_OT_
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_6master18YsqlTransactionDdlEFvRKNS_19TransactionMetadataENS2_8functionIFNS_6StatusEbEEEEJPS5_S8_RSC_EEEEESA_OT_
Line
Count
Source
248
4.20k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
4.20k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
4.20k
  }
ysql_transaction_ddl.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_6master18YsqlTransactionDdl19TransactionReceivedERKNS_19TransactionMetadataENSt3__18functionIFNS_6StatusEbEEES9_RKNS_7tserver30GetTransactionStatusResponsePBEE3$_3EES9_OT_
Line
Count
Source
248
1.43k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
1.43k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
1.43k
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_18MaintenanceManagerEFvRKNS_22ScopedMaintenanceOpRunEEJPS4_S5_EEEEENS_6StatusEOT_
Line
Count
Source
248
8
  CHECKED_STATUS SubmitFunc(F&& f) {
249
8
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
8
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRK13scoped_refptrINS_6tablet17RaftGroupMetadataEERKS6_INS4_27TransitionInProgressDeleterEEEJPS5_SB_RSD_EEEEENS_6StatusEOT_
Line
Count
Source
248
219
  CHECKED_STATUS SubmitFunc(F&& f) {
249
219
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
219
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRK13scoped_refptrINS_6tablet17RaftGroupMetadataEERKS6_INS4_27TransitionInProgressDeleterEEEJPS5_RS9_RSD_EEEEENS_6StatusEOT_
Line
Count
Source
248
82.2k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
82.2k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
82.2k
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRK13scoped_refptrINS_6tablet17RaftGroupMetadataEERKS6_INS4_27TransitionInProgressDeleterEEEJPS5_SB_SF_EEEEENS_6StatusEOT_
Line
Count
Source
248
86
  CHECKED_STATUS SubmitFunc(F&& f) {
249
86
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
86
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_7tserver15TSTabletManagerEFvRKNS2_12basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEENS2_10shared_ptrINS_9consensus18StateChangeContextEEEEJPS5_SD_RSH_EEEEENS_6StatusEOT_
Line
Count
Source
248
338k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
338k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
338k
  }
Unexecuted instantiation: _ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_7tserver10enterprise9CDCPollerEFvvEJPS6_EEEEENS_6StatusEOT_
Unexecuted instantiation: _ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_7tserver10enterprise9CDCPollerEFvNS_6StatusENS2_10shared_ptrINS_3cdc20GetChangesResponsePBEEEEJPS6_RKS7_RSB_EEEEES7_OT_
Unexecuted instantiation: _ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_7tserver10enterprise9CDCPollerEFvNS_3cdc20OutputClientResponseEEJPS6_RS8_EEEEENS_6StatusEOT_
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_8CallbackIFvvEEEKFvvEJRKS6_EEEEENS_6StatusEOT_
Line
Count
Source
248
7.43k
  CHECKED_STATUS SubmitFunc(F&& f) {
249
7.43k
    return Submit(std::make_shared<RunnableImpl<F>>(std::forward<F>(f)));
250
7.43k
  }
251
252
  // Submit an arbitrary callable passed by const reference. A newly allocated
  // RunnableImpl<F> is constructed from 'f' and handed to Submit().
  template <class F>
253
32.3k
  CHECKED_STATUS SubmitFunc(const F& f) {
254
32.3k
    return Submit(std::make_shared<RunnableImpl<F>>(f));
255
32.3k
  }
threadpool-test.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_49TestThreadPoolTokenTypes_TestTokenWaitForAll_Test8TestBodyEvE4$_20EENS_6StatusERKT_
Line
Count
Source
253
20
  CHECKED_STATUS SubmitFunc(const F& f) {
254
20
    return Submit(std::make_shared<RunnableImpl<F>>(f));
255
20
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_6client8YBClient4DataEFvNS2_6chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS2_5ratioILl1ELl1000000000EEEEEEEbbEJPS6_RSE_RbSJ_EEEEENS_6StatusERKT_
Line
Count
Source
253
32.2k
  CHECKED_STATUS SubmitFunc(const F& f) {
254
32.2k
    return Submit(std::make_shared<RunnableImpl<F>>(f));
255
32.2k
  }
256
257
  // Overload for a non-const lvalue callable: rebinds 'f' as a const
  // reference and re-dispatches to SubmitFunc() with that const reference.
  template <class F>
258
32.1k
  CHECKED_STATUS SubmitFunc(F& f) { // NOLINT
259
32.1k
    const auto& const_f = f;
260
32.1k
    return SubmitFunc(const_f);
261
32.1k
  }
threadpool-test.cc:_ZN2yb10ThreadPool10SubmitFuncIZNS_49TestThreadPoolTokenTypes_TestTokenWaitForAll_Test8TestBodyEvE4$_20EENS_6StatusERT_
Line
Count
Source
258
20
  CHECKED_STATUS SubmitFunc(F& f) { // NOLINT
259
20
    const auto& const_f = f;
260
20
    return SubmitFunc(const_f);
261
20
  }
_ZN2yb10ThreadPool10SubmitFuncINSt3__16__bindIMNS_6client8YBClient4DataEFvNS2_6chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS2_5ratioILl1ELl1000000000EEEEEEEbbEJPS6_RSE_RbSJ_EEEEENS_6StatusERT_
Line
Count
Source
258
32.0k
  CHECKED_STATUS SubmitFunc(F& f) { // NOLINT
259
32.0k
    const auto& const_f = f;
260
32.0k
    return SubmitFunc(const_f);
261
32.0k
  }
262
263
  // Submit a Runnable class
264
  CHECKED_STATUS Submit(const std::shared_ptr<Runnable>& task);
265
266
  // Wait until all the tasks are completed.
267
  void Wait();
268
269
  // Waits for the pool to reach the idle state, or until 'until' time is reached.
270
  // Returns true if the pool reached the idle state, false otherwise.
271
  bool WaitUntil(const MonoTime& until);
272
273
  // Waits for the pool to reach the idle state, or until 'delta' time elapses.
274
  // Returns true if the pool reached the idle state, false otherwise.
275
  bool WaitFor(const MonoDelta& delta);
276
277
  // NewToken(), declared after the ExecutionMode enum below, allocates a new
278
  // token for use in token-based task submission. All tokens must be
279
  // destroyed before their ThreadPool is destroyed.
280
  // There is no limit on the number of tokens that may be allocated.
281
  enum class ExecutionMode {
282
    // Tasks submitted via this token will be executed serially.
283
    SERIAL,
284
285
    // Tasks submitted via this token may be executed concurrently.
286
    CONCURRENT,
287
  };
288
  std::unique_ptr<ThreadPoolToken> NewToken(ExecutionMode mode);
289
290
  // Like NewToken(), but lets the caller provide metrics for the token. These
291
  // metrics are incremented/decremented in addition to the configured
292
  // pool-wide metrics (if any).
293
  std::unique_ptr<ThreadPoolToken> NewTokenWithMetrics(ExecutionMode mode,
294
                                                       ThreadPoolMetrics metrics);
295
296
 private:
297
  friend class ThreadPoolBuilder;
298
  friend class ThreadPoolToken;
299
300
301
  // Create a new thread pool using a builder.
302
  explicit ThreadPool(const ThreadPoolBuilder& builder);
303
304
  // Initialize the thread pool by starting the minimum number of threads.
305
  CHECKED_STATUS Init();
306
307
  // Dispatcher responsible for dequeueing and executing the tasks
308
  void DispatchThread(bool permanent);
309
310
  // Create a new thread. Requires that lock_ is held.
311
  CHECKED_STATUS CreateThreadUnlocked();
312
313
 private:
314
  FRIEND_TEST(TestThreadPool, TestThreadPoolWithNoMinimum);
315
  FRIEND_TEST(TestThreadPool, TestThreadPoolWithNoMaxThreads);
316
  FRIEND_TEST(TestThreadPool, TestVariableSizeThreadPool);
317
  // Aborts if the current thread is a member of this thread pool.
318
  void CheckNotPoolThreadUnlocked();
319
320
  struct Task {
321
    std::shared_ptr<Runnable> runnable;
322
    Trace* trace;
323
324
    // Time at which the entry was submitted to the pool.
325
    MonoTime submit_time;
326
  };
327
  // Submits a task to be run via token.
328
  Status DoSubmit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);
329
330
  // Releases token 't' and invalidates it.
331
  void ReleaseToken(ThreadPoolToken* t);
332
333
  const std::string name_;
334
  const int min_threads_;
335
  const int max_threads_;
336
  const int max_queue_size_;
337
  const MonoDelta idle_timeout_;
338
339
  Status pool_status_;
340
  Mutex lock_;
341
  ConditionVariable idle_cond_;
342
  ConditionVariable no_threads_cond_;
343
  ConditionVariable not_empty_;
344
  int num_threads_;
345
  int active_threads_;
346
347
  // Total number of client tasks queued, either directly (queue_) or
348
  // indirectly (tokens_).
349
  // Protected by lock_.
350
  int total_queued_tasks_;
351
352
  // All allocated tokens.
353
  // Tokens are owned by the clients.
354
  //
355
  // Protected by lock_.
356
  std::unordered_set<ThreadPoolToken*> tokens_;
357
358
  // FIFO of tokens from which tasks should be executed. Does not own the
359
  // tokens; they are owned by clients and are removed from the FIFO on shutdown.
360
  //
361
  // Protected by lock_.
362
  std::deque<ThreadPoolToken*> queue_;
363
364
  // Pointers to all running threads. Raw pointers are safe because a Thread
365
  // may only go out of scope after being removed from threads_.
366
  //
367
  // Protected by lock_.
368
  std::unordered_set<Thread*> threads_;
369
370
  // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
371
  std::unique_ptr<ThreadPoolToken> tokenless_;
372
373
  // Metrics for the entire thread pool.
374
  const ThreadPoolMetrics metrics_;
375
376
  DISALLOW_COPY_AND_ASSIGN(ThreadPool);
377
};
378
379
// All possible token states. Legal state transitions:
380
//   kIdle      -> kRunning: task is submitted via token
381
//   kIdle      -> kQuiesced: token or pool is shut down
382
//   kRunning   -> kIdle: worker thread finishes executing a task and
383
//                      there are no more tasks queued to the token
384
//   kRunning   -> kQuiescing: token or pool is shut down while worker thread
385
//                           is executing a task
386
//   kRunning   -> kQuiesced: token or pool is shut down
387
//   kQuiescing -> kQuiesced:  worker thread finishes executing a task
388
//                           belonging to a shut down token or pool
389
YB_DEFINE_ENUM(ThreadPoolTokenState,
390
                   // Token has no queued tasks.
391
                   (kIdle)
392
393
                   // A worker thread is running one of the token's previously queued tasks.
394
                   (kRunning)
395
396
                   // No new tasks may be submitted to the token. A worker thread is still
397
                   // running a previously queued task.
398
                   (kQuiescing)
399
400
                   // No new tasks may be submitted to the token. There are no active tasks
401
                   // either. At this state, the token may only be destroyed.
402
                   (kQuiesced));
403
404
// Entry point for token-based task submission and blocking for a particular
405
// thread pool. Tokens can only be created via ThreadPool::NewToken().
406
//
407
// All functions are thread-safe. Mutable members are protected via the
408
// ThreadPool's lock.
409
class ThreadPoolToken {
410
 public:
411
  // Destroys the token.
412
  //
413
  // May be called on a token with outstanding tasks, as Shutdown() will be
414
  // called first to take care of them.
415
  ~ThreadPoolToken();
416
417
  // Submits a function using the yb Closure system.
418
  Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
419
420
  // Submits a function, e.g. one bound using std::bind(&FuncName, args...).
421
  Status SubmitFunc(const std::function<void()> f) WARN_UNUSED_RESULT;
422
423
  // Submits a Runnable class.
424
  Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
425
426
  // Marks the token as unusable for future submissions. Any queued tasks not
427
  // yet running are destroyed. If tasks are in flight, Shutdown() will wait
428
  // on their completion before returning.
429
  void Shutdown();
430
431
  // Waits until all the tasks submitted via this token are completed.
432
  void Wait();
433
434
  // Waits until all submissions using this token are complete, or until
435
  // 'until' time is reached.
436
  //
437
  // Returns true if all submissions are complete, false otherwise.
438
  bool WaitUntil(const MonoTime& until);
439
440
  // Waits until all submissions using this token are complete, or until
441
  // 'delta' time elapses.
442
  //
443
  // Returns true if all submissions are complete, false otherwise.
444
  bool WaitFor(const MonoDelta& delta);
445
446
 private:
447
  friend class ThreadPool;
448
449
  // Returns a textual representation of 's' suitable for debugging.
450
  static const char* StateToString(ThreadPoolTokenState s);
451
452
  // Constructs a new token.
453
  //
454
  // The token may not outlive its thread pool ('pool').
455
  ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, ThreadPoolMetrics metrics);
456
457
  // Changes this token's state to 'new_state' taking actions as needed.
458
  void Transition(ThreadPoolTokenState new_state);
459
460
  // Returns true if the token is in the kRunning or kQuiescing state, i.e. it
461
  // has a task queued and ready to run, or one of its tasks is already running.
462
41.4M
  bool IsActive() const {
463
41.4M
    return state_ == ThreadPoolTokenState::kRunning ||
464
32.3M
           state_ == ThreadPoolTokenState::kQuiescing;
465
41.4M
  }
466
467
  // Returns true if new tasks may be submitted to this token.
468
41.1M
  bool MaySubmitNewTasks() const {
469
41.1M
    return state_ != ThreadPoolTokenState::kQuiescing &&
470
41.1M
           state_ != ThreadPoolTokenState::kQuiesced;
471
41.1M
  }
472
473
124M
  ThreadPoolTokenState state() const { return state_; }
474
24.1M
  ThreadPool::ExecutionMode mode() const { return mode_; }
475
476
  // Token's configured execution mode.
477
  const ThreadPool::ExecutionMode mode_;
478
479
  // Pointer to the token's thread pool.
480
  ThreadPool* pool_;
481
482
  // Metrics for just this token.
483
  const ThreadPoolMetrics metrics_;
484
485
  // Token state machine.
486
  ThreadPoolTokenState state_;
487
488
  // Queued client tasks.
489
  std::deque<ThreadPool::Task> entries_;
490
491
  // Condition variable for "token is idle". Waiters wake up when the token
492
  // transitions to kIdle or kQuiesced.
493
  ConditionVariable not_running_cond_;
494
495
  // Number of worker threads currently executing tasks belonging to this
496
  // token.
497
  int active_threads_;
498
499
  DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken);
500
};
501
502
////////////////////////////////////////////////////////
503
// FunctionRunnable
504
////////////////////////////////////////////////////////
505
506
// Runnable adapter around a std::function<void()>: the function is moved into
// the object at construction and invoked each time Run() is called.
class FunctionRunnable : public Runnable {
507
 public:
508
40.1M
  explicit FunctionRunnable(std::function<void()> func) : func_(std::move(func)) {}
509
510
40.2M
  void Run() override {
511
40.2M
    func_();
512
40.2M
  }
513
514
 private:
515
  std::function<void()> func_;
516
};
517
518
// Runs submitted tasks in created thread pool with specified concurrency.
519
class TaskRunner {
520
 public:
521
0
  TaskRunner() = default;
522
523
  CHECKED_STATUS Init(int concurrency);
524
525
  // Submits 'f' to thread_pool_. running_tasks_ is incremented first; the
  // submitted wrapper invokes 'f' and passes its result to CompleteTask().
  // If the submission itself fails, CompleteTask() is called right away with
  // the submission status.
  // NOTE(review): presumably Init() must have created thread_pool_ before this
  // is called -- confirm against the implementation.
  template <class F>
526
0
  void Submit(F&& f) {
527
0
    ++running_tasks_;
528
0
    auto status = thread_pool_->SubmitFunc([this, f = std::forward<F>(f)]() {
529
0
      auto status = f();
530
0
      CompleteTask(status);
531
0
    });
Unexecuted instantiation: data-patcher.cc:_ZZN2yb10TaskRunner6SubmitIZNS_5tools21ChangeTimeInDataFilesENS_9MonoDeltaENS_10HybridTimeEmRKNSt3__16vectorINS5_12basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEENSA_ISC_EEEEbPS0_E3$_3EEvOT_ENKUlvE_clEv
Unexecuted instantiation: data-patcher.cc:_ZZN2yb10TaskRunner6SubmitIZNS_5tools19ChangeTimeInWalDirsENS_9MonoDeltaENS_10HybridTimeEmRKNSt3__16vectorINS5_12basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEENSA_ISC_EEEEPS0_E3$_8EEvOT_ENKUlvE_clEv
532
0
    // The wrapper never runs when submission fails, so complete the task here.
    if (!status.ok()) {
533
0
      CompleteTask(status);
534
0
    }
535
0
  }
Unexecuted instantiation: data-patcher.cc:_ZN2yb10TaskRunner6SubmitIZNS_5tools21ChangeTimeInDataFilesENS_9MonoDeltaENS_10HybridTimeEmRKNSt3__16vectorINS5_12basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEENSA_ISC_EEEEbPS0_E3$_3EEvOT_
Unexecuted instantiation: data-patcher.cc:_ZN2yb10TaskRunner6SubmitIZNS_5tools19ChangeTimeInWalDirsENS_9MonoDeltaENS_10HybridTimeEmRKNSt3__16vectorINS5_12basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEENSA_ISC_EEEEPS0_E3$_8EEvOT_
536
537
  CHECKED_STATUS Wait();
538
539
 private:
540
  void CompleteTask(const Status& status);
541
542
  std::unique_ptr<ThreadPool> thread_pool_;
543
  std::atomic<size_t> running_tasks_{0};
544
  std::atomic<bool> failed_{false};
545
  Status first_failure_;
546
  std::mutex mutex_;
547
  std::condition_variable cond_;
548
};
549
550
} // namespace yb
551
#endif // YB_UTIL_THREADPOOL_H