YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/thread.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_UTIL_THREAD_H
34
#define YB_UTIL_THREAD_H
35
36
#include <pthread.h>
37
#include <sys/syscall.h>
38
#include <sys/types.h>
39
40
#include <functional>
41
#include <string>
42
#include <vector>
43
44
#include "yb/gutil/atomicops.h"
45
#include "yb/gutil/callback.h"
46
#include "yb/gutil/ref_counted.h"
47
48
#include "yb/util/countdown_latch.h"
49
#include "yb/util/monotime.h"
50
#include "yb/util/result.h"
51
#include "yb/util/stack_trace.h"
52
53
namespace yb {
54
55
class MetricEntity;
56
class Thread;
57
class WebCallbackRegistry;
58
59
// Utility to join on a thread, printing warning messages if it
60
// takes too long. For example:
61
//
62
//   ThreadJoiner(&my_thread)
63
//     .warn_after(MonoDelta::FromMilliseconds(1000))
64
//     .warn_every(MonoDelta::FromMilliseconds(5000))
65
//     .Join();
66
//
67
// TODO: would be nice to offer a way to use ptrace() or signals to
68
// dump the stack trace of the thread we're trying to join on if it
69
// gets stuck. But, after looking for 20 minutes or so, it seems
70
// pretty complicated to get right.
71
class ThreadJoiner {
72
 public:
73
  // Creates a joiner for the given thread.
  // NOTE(review): presumably `thread` must stay valid until Join() returns — confirm in thread.cc.
  explicit ThreadJoiner(Thread* thread);
74
75
  // Start emitting warnings after specified duration.
  //
  // Returns a ThreadJoiner reference, which permits the chained-call style
  // shown in the usage example above this class.
76
  //
77
  // Default: 1000 ms.
78
  ThreadJoiner& warn_after(MonoDelta duration);
79
80
  // After the warnings have started, emit another warning at the
81
  // given interval.
82
  //
83
  // Default: 1000 ms.
84
  ThreadJoiner& warn_every(MonoDelta duration);
85
86
  // If the thread has not stopped after this duration, give up
87
  // joining on it and return Status::Aborted.
88
  //
89
  // MonoDelta::kMax (the default) means to wait forever trying to join.
90
  ThreadJoiner& give_up_after(MonoDelta duration);
91
92
  // Join the thread, subject to the above parameters. If the thread joining
93
  // fails for any reason, returns RuntimeError. If it times out, returns
94
  // Aborted.
95
  CHECKED_STATUS Join();
96
97
 private:
98
  // Thread to be joined, held as a raw pointer (not a scoped_refptr).
  Thread* thread_;
99
100
  // Tunables for Join(); the initializers below are the documented defaults
  // (1000 ms, 1000 ms, and MonoDelta::kMax respectively).
  MonoDelta warn_after_ = MonoDelta::FromMilliseconds(1000);
101
  MonoDelta warn_every_ = MonoDelta::FromMilliseconds(1000);
102
  MonoDelta give_up_after_ = MonoDelta::kMax;
103
104
  DISALLOW_COPY_AND_ASSIGN(ThreadJoiner);
105
};
106
107
typedef scoped_refptr<Thread> ThreadPtr;
108
109
// Thin wrapper around pthread that can register itself with the singleton ThreadMgr
110
// (a private class implemented in thread.cc entirely, which tracks all live threads so
111
// that they may be monitored via the debug webpages). This class has a limited subset of
112
// std::thread's API. Construction is almost the same, but clients must supply a
113
// category and a name for each thread so that they can be identified in the debug web
114
// UI. Otherwise, Join() is the only supported method from std::thread.
115
//
116
// Each Thread object knows its operating system thread ID (TID), which can be used to
117
// attach debuggers to specific threads, to retrieve resource-usage statistics from the
118
// operating system, and to assign threads to resource control groups.
119
//
120
// Threads are shared objects, but in a degenerate way. They may only have
121
// up to two referents: the caller that created the thread (parent), and
122
// the thread itself (child). Moreover, the only two methods to mutate state
123
// (Join() and the destructor) are constrained: the child may not Join() on
124
// itself, and the destructor is only run when there's one referent left.
125
// These constraints allow us to access thread internals without any locks.
126
//
127
// TODO: Consider allowing fragment IDs as category parameters.
128
class Thread : public RefCountedThreadSafe<Thread> {
129
 public:
130
  static const char kPaddingChar;
131
132
  // This constructor pattern mimics that in std::thread. There is
133
  // one constructor for each number of arguments that the thread
134
  // function accepts. To extend the set of acceptable signatures, add
135
  // another constructor with <class F, class A1.... class An>.
136
  //
137
  // In general:
138
  //  - category: string identifying the thread category to which this thread belongs,
139
  //    used for organising threads together on the debug UI.
140
  //  - name: name of this thread. Will be appended with "-<thread-id>" to ensure
141
  //    uniqueness.
142
  //  - F - a method type that supports operator(), and the instance passed to the
143
  //    constructor is executed immediately in a separate thread.
144
  //  - A1...An - argument types whose instances are passed to f(...)
145
  //  - holder - optional shared pointer to hold a reference to the created thread.
146
  template <class F>
147
  static CHECKED_STATUS Create(const std::string& category, const std::string& name, const F& f,
148
106k
                       scoped_refptr<Thread>* holder) {
149
106k
    return StartThread(category, name, f, holder);
150
106k
  }
yb::Status yb::Thread::Create<std::__1::__bind<void (yb::MaintenanceManager::*)(), yb::MaintenanceManager*> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::__bind<void (yb::MaintenanceManager::*)(), yb::MaintenanceManager*> const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
148
16.6k
                       scoped_refptr<Thread>* holder) {
149
16.6k
    return StartThread(category, name, f, holder);
150
16.6k
  }
thread_posix.cc:yb::Status yb::Thread::Create<rocksdb::ThreadPool::StartBGThreads()::$_0>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, rocksdb::ThreadPool::StartBGThreads()::$_0 const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
148
89.8k
                       scoped_refptr<Thread>* holder) {
149
89.8k
    return StartThread(category, name, f, holder);
150
89.8k
  }
yb::Status yb::Thread::Create<std::__1::__bind<void (yb::PstackWatcher::*)(), yb::PstackWatcher*> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::__bind<void (yb::PstackWatcher::*)(), yb::PstackWatcher*> const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
148
3
                       scoped_refptr<Thread>* holder) {
149
3
    return StartThread(category, name, f, holder);
150
3
  }
151
152
  template <class F, class A1>
153
  static CHECKED_STATUS Create(const std::string& category, const std::string& name, const F& f,
154
601k
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
601k
    return StartThread(category, name, std::bind(f, a1), holder);
156
601k
  }
yb::Status yb::Thread::Create<void (yb::tserver::Heartbeater::Thread::*)(), yb::tserver::Heartbeater::Thread*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::tserver::Heartbeater::Thread::* const&)(), yb::tserver::Heartbeater::Thread* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
8.58k
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
8.58k
    return StartThread(category, name, std::bind(f, a1), holder);
156
8.58k
  }
yb::Status yb::Thread::Create<void (yb::tserver::MetricsSnapshotter::Thread::*)(), yb::tserver::MetricsSnapshotter::Thread*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::tserver::MetricsSnapshotter::Thread::* const&)(), yb::tserver::MetricsSnapshotter::Thread* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
1
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
1
    return StartThread(category, name, std::bind(f, a1), holder);
156
1
  }
yb::Status yb::Thread::Create<void (yb::tserver::RemoteBootstrapServiceImpl::*)(), yb::tserver::RemoteBootstrapServiceImpl*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::tserver::RemoteBootstrapServiceImpl::* const&)(), yb::tserver::RemoteBootstrapServiceImpl* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
16.7k
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
16.7k
    return StartThread(category, name, std::bind(f, a1), holder);
156
16.7k
  }
Unexecuted instantiation: yb::Status yb::Thread::Create<void (yb::tserver::enterprise::CDCConsumer::*)(), yb::tserver::enterprise::CDCConsumer*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::tserver::enterprise::CDCConsumer::* const&)(), yb::tserver::enterprise::CDCConsumer* const&, scoped_refptr<yb::Thread>*)
yb::Status yb::Thread::Create<void (yb::rpc::Acceptor::*)(), yb::rpc::Acceptor*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::rpc::Acceptor::* const&)(), yb::rpc::Acceptor* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
25.7k
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
25.7k
    return StartThread(category, name, std::bind(f, a1), holder);
156
25.7k
  }
yb::Status yb::Thread::Create<void (yb::rpc::Reactor::*)(), yb::rpc::Reactor*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::rpc::Reactor::* const&)(), yb::rpc::Reactor* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
284k
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
284k
    return StartThread(category, name, std::bind(f, a1), holder);
156
284k
  }
thread_pool.cc:yb::Status yb::Thread::Create<void (yb::rpc::(anonymous namespace)::Worker::*)(), yb::rpc::(anonymous namespace)::Worker*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::rpc::(anonymous namespace)::Worker::* const&)(), yb::rpc::(anonymous namespace)::Worker* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
239k
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
239k
    return StartThread(category, name, std::bind(f, a1), holder);
156
239k
  }
Unexecuted instantiation: yb::Status yb::Thread::Create<void (yb::server::RpcServerBase::*)(), yb::server::RpcServerBase*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::server::RpcServerBase::* const&)(), yb::server::RpcServerBase* const&, scoped_refptr<yb::Thread>*)
yb::Status yb::Thread::Create<void (yb::master::CatalogManagerBgTasks::*)(), yb::master::CatalogManagerBgTasks*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::master::CatalogManagerBgTasks::* const&)(), yb::master::CatalogManagerBgTasks* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
7.94k
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
7.94k
    return StartThread(category, name, std::bind(f, a1), holder);
156
7.94k
  }
yb::Status yb::Thread::Create<void (yb::pgwrapper::PgSupervisor::*)(), yb::pgwrapper::PgSupervisor*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::pgwrapper::PgSupervisor::* const&)(), yb::pgwrapper::PgSupervisor* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
1.99k
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
1.99k
    return StartThread(category, name, std::bind(f, a1), holder);
156
1.99k
  }
yb::Status yb::Thread::Create<void (yb::BackgroundTask::*)(), yb::BackgroundTask*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::BackgroundTask::* const&)(), yb::BackgroundTask* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
16.6k
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
16.6k
    return StartThread(category, name, std::bind(f, a1), holder);
156
16.6k
  }
yb::Status yb::Thread::Create<void (yb::debug::TraceSamplingThread::*)(), yb::debug::TraceSamplingThread*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::debug::TraceSamplingThread::* const&)(), yb::debug::TraceSamplingThread* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
1
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
1
    return StartThread(category, name, std::bind(f, a1), holder);
156
1
  }
yb::Status yb::Thread::Create<void (yb::RandomizedFailureMonitor::*)(), yb::RandomizedFailureMonitor*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::RandomizedFailureMonitor::* const&)(), yb::RandomizedFailureMonitor* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
1
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
1
    return StartThread(category, name, std::bind(f, a1), holder);
156
1
  }
yb::Status yb::Thread::Create<void (yb::TimeSeriesCollector::*)(), yb::TimeSeriesCollector*>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::TimeSeriesCollector::* const&)(), yb::TimeSeriesCollector* const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
154
6
                       const A1& a1, scoped_refptr<Thread>* holder) {
155
6
    return StartThread(category, name, std::bind(f, a1), holder);
156
6
  }
157
158
  template <class F, class A1, class A2>
159
  static CHECKED_STATUS Create(const std::string& category, const std::string& name, const F& f,
160
1.01M
                       const A1& a1, const A2& a2, scoped_refptr<Thread>* holder) {
161
1.01M
    return StartThread(category, name, std::bind(f, a1, a2), holder);
162
1.01M
  }
yb::Status yb::Thread::Create<void (yb::TestWorkload::State::*)(yb::TestWorkloadOptions const&), yb::TestWorkload::State*, yb::TestWorkloadOptions>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::TestWorkload::State::* const&)(yb::TestWorkloadOptions const&), yb::TestWorkload::State* const&, yb::TestWorkloadOptions const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
160
165
                       const A1& a1, const A2& a2, scoped_refptr<Thread>* holder) {
161
165
    return StartThread(category, name, std::bind(f, a1, a2), holder);
162
165
  }
yb::Status yb::Thread::Create<void (yb::ThreadPool::*)(bool), yb::ThreadPool*, bool>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, void (yb::ThreadPool::* const&)(bool), yb::ThreadPool* const&, bool const&, scoped_refptr<yb::Thread>*)
Line
Count
Source
160
1.01M
                       const A1& a1, const A2& a2, scoped_refptr<Thread>* holder) {
161
1.01M
    return StartThread(category, name, std::bind(f, a1, a2), holder);
162
1.01M
  }
163
164
  template <class F, class A1, class A2, class A3>
165
  static CHECKED_STATUS Create(const std::string& category, const std::string& name, const F& f,
166
                       const A1& a1, const A2& a2, const A3& a3, scoped_refptr<Thread>* holder) {
167
    return StartThread(category, name, std::bind(f, a1, a2, a3), holder);
168
  }
169
170
  template <class F, class A1, class A2, class A3, class A4>
171
  static CHECKED_STATUS Create(const std::string& category, const std::string& name, const F& f,
172
                       const A1& a1, const A2& a2, const A3& a3, const A4& a4,
173
                       scoped_refptr<Thread>* holder) {
174
    return StartThread(category, name, std::bind(f, a1, a2, a3, a4), holder);
175
  }
176
177
  template <class F, class A1, class A2, class A3, class A4, class A5>
178
  static CHECKED_STATUS Create(const std::string& category, const std::string& name, const F& f,
179
                       const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
180
                       scoped_refptr<Thread>* holder) {
181
    return StartThread(category, name, std::bind(f, a1, a2, a3, a4, a5), holder);
182
  }
183
184
  template <class F, class A1, class A2, class A3, class A4, class A5, class A6>
185
  static CHECKED_STATUS Create(const std::string& category, const std::string& name, const F& f,
186
                       const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
187
                       const A6& a6, scoped_refptr<Thread>* holder) {
188
    return StartThread(category, name, std::bind(f, a1, a2, a3, a4, a5, a6), holder);
189
  }
190
191
  // Starts a thread running `f` via StartThread() and returns the new thread as a
  // Result<ThreadPtr> rather than through an out-parameter. If StartThread() returns
  // a non-OK status, that status is propagated by RETURN_NOT_OK.
  template <class F>
192
  static Result<ThreadPtr> Make(
193
      const std::string& category, const std::string& name, const F& f) {
194
    ThreadPtr result;
195
    RETURN_NOT_OK(StartThread(category, name, f, &result));
196
    return result;
197
  }
198
199
  // Variadic form of Make(): `args` are perfectly forwarded to std::bind, and the
  // resulting bound callable is handed to StartThread(). Returns the new thread on
  // success, or the non-OK status from StartThread() otherwise.
  template <class... Args>
200
  static Result<ThreadPtr> Make(
201
160k
      const std::string& category, const std::string& name, Args&&... args) {
202
160k
    ThreadPtr result;
203
160k
    RETURN_NOT_OK(StartThread(category, name, std::bind(std::forward<Args>(args)...), &result));
204
160k
    return result;
205
160k
  }
yb::Result<scoped_refptr<yb::Thread> > yb::Thread::Make<std::__1::__bind<void (yb::rpc::IoThreadPool::Impl::*)(), yb::rpc::IoThreadPool::Impl*> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::__bind<void (yb::rpc::IoThreadPool::Impl::*)(), yb::rpc::IoThreadPool::Impl*>&&)
Line
Count
Source
201
160k
      const std::string& category, const std::string& name, Args&&... args) {
202
160k
    ThreadPtr result;
203
160k
    RETURN_NOT_OK(StartThread(category, name, std::bind(std::forward<Args>(args)...), &result));
204
160k
    return result;
205
160k
  }
priority_thread_pool.cc:yb::Result<scoped_refptr<yb::Thread> > yb::Thread::Make<std::__1::__bind<void (yb::(anonymous namespace)::PriorityThreadPoolWorker::*)(), yb::(anonymous namespace)::PriorityThreadPoolWorker*&> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::__bind<void (yb::(anonymous namespace)::PriorityThreadPoolWorker::*)(), yb::(anonymous namespace)::PriorityThreadPoolWorker*&>&&)
Line
Count
Source
201
172
      const std::string& category, const std::string& name, Args&&... args) {
202
172
    ThreadPtr result;
203
172
    RETURN_NOT_OK(StartThread(category, name, std::bind(std::forward<Args>(args)...), &result));
204
172
    return result;
205
172
  }
206
207
  // Emulates std::thread and detaches.
208
  ~Thread();
209
210
  // Blocks until this thread finishes execution. Once this method returns, the thread
211
  // will be unregistered with the ThreadMgr and will not appear in the debug UI.
212
  void Join();
213
214
  // Call the given Closure on the thread before it exits. The closures are executed
215
  // in the order they are added.
216
  //
217
  // NOTE: This must only be called on the currently executing thread, to avoid having
218
  // to reason about complicated races (eg registering a callback on an already-dead
219
  // thread).
220
  //
221
  // This callback is guaranteed to be called except in the case of a process crash.
222
  void CallAtExit(const Closure& cb);
223
224
  // The thread ID assigned to this thread by the operating system. If the OS does not
225
  // support retrieving the tid, returns Thread::INVALID_TID.
226
4.01M
  int64_t tid() const { return tid_; }
227
228
  // Returns the thread's pthread ID.
229
4
  pthread_t pthread_id() const { return thread_; }
230
231
4
  // Returns this thread's identifier as a ThreadIdForStack: tid() when compiled
  // for Linux, pthread_id() on every other platform.
  ThreadIdForStack tid_for_stack() {
232
#if defined(__linux__)
233
    return tid();
234
#else
235
4
    return pthread_id();
236
4
#endif
237
4
  }
238
239
1.93M
  const std::string& name() const { return name_; }
240
11.5M
  const std::string& category() const { return category_; }
241
242
  // Return a string representation of the thread identifying information.
243
  std::string ToString() const;
244
245
  // The current thread of execution, or NULL if the current thread isn't a yb::Thread.
246
  // This call is signal-safe.
247
5.85G
  static Thread* current_thread() { return tls_; }
248
249
  // Returns a unique, stable identifier for this thread. Note that this is a static
250
  // method and thus can be used on any thread, including the main thread of the
251
  // process.
252
  //
253
  // In general, this should be used when a value is required that is unique to
254
  // a thread and must work on any thread including the main process thread.
255
  //
256
  // NOTE: this is _not_ the TID, but rather a unique value assigned by the
257
  // thread implementation. So, this value should not be presented to the user
258
  // in log messages, etc.
259
84.7M
  static int64_t UniqueThreadId() {
260
#if defined(__linux__)
261
    // This cast is a little bit ugly, but it is significantly faster than
262
    // calling syscall(SYS_gettid). In particular, this speeds up some code
263
    // paths in the tracing implementation.
264
    return static_cast<int64_t>(pthread_self());
265
#elif defined(__APPLE__)
266
84.7M
    uint64_t tid;
267
84.7M
    CHECK_EQ(0, pthread_threadid_np(NULL, &tid));
268
84.7M
    return tid;
269
#else
270
#error Unsupported platform
271
#endif
272
84.7M
  }
273
274
  // Returns the system thread ID (tid on Linux) for the current thread. Note
275
  // that this is a static method and thus can be used from any thread,
276
  // including the main thread of the process. This is in contrast to
277
  // Thread::tid(), which only works on yb::Threads.
278
  //
279
  // Thread::tid() will return the same value, but the value is cached in the
280
  // Thread object, so will be faster to call.
281
  //
282
  // Thread::UniqueThreadId() (or Thread::tid()) should be preferred for
283
  // performance-sensitive code; however, it is only guaranteed to return a
284
  // unique and stable thread ID, not necessarily the system thread ID.
285
82.4M
  static int64_t CurrentThreadId() {
286
#if defined(__linux__)
287
    return syscall(SYS_gettid);
288
#else
289
82.4M
    return UniqueThreadId();
290
82.4M
#endif
291
82.4M
  }
292
293
371M
  // Static counterpart of tid_for_stack() for the calling thread: returns
  // CurrentThreadId() when compiled for Linux and pthread_self() otherwise.
  static ThreadIdForStack CurrentThreadIdForStack() {
294
#if defined(__linux__)
295
    return CurrentThreadId();
296
#else
297
371M
    return pthread_self();
298
371M
#endif
299
371M
  }
300
301
9.31M
  // Returns the opaque user-data pointer stored on this Thread object
  // (nullptr unless a value has been stored).
  void* user_data() {
302
9.31M
    return user_data_;
303
9.31M
  }
304
305
239k
  // Stores `value` as this Thread's user-data pointer. The pointer is kept as-is
  // (plain assignment, no locking in this method) and is returned by user_data().
  void SetUserData(void* value) {
306
239k
    user_data_ = value;
307
239k
  }
308
309
 private:
310
  friend class ThreadJoiner;
311
312
  // The various special values for tid_ that describe the various steps
313
  // in the parent<-->child handshake.
314
  enum {
315
    INVALID_TID = -1,
316
    CHILD_WAITING_TID = -2,
317
    PARENT_WAITING_TID = -3,
318
  };
319
320
  // Function object that wraps the user-supplied function to run in a separate thread.
321
  typedef std::function<void()> ThreadFunctor;
322
323
  // Private constructor: takes ownership of the category, name and functor (moved
  // into the members) and sets the initial state -- thread_ = 0,
  // tid_ = CHILD_WAITING_TID, done_ latch with a count of 1, joinable_ = false.
  // The body is empty, so no OS thread is created here.
  Thread(std::string category, std::string name, ThreadFunctor functor)
324
      : thread_(0),
325
        category_(std::move(category)),
326
        name_(std::move(name)),
327
        tid_(CHILD_WAITING_TID),
328
        functor_(std::move(functor)),
329
        done_(1),
330
1.88M
        joinable_(false) {}
331
332
  // Library-specific thread ID.
333
  pthread_t thread_;
334
335
  // Name and category for this thread.
336
  const std::string category_;
337
  const std::string name_;
338
339
  // OS-specific thread ID. Once the constructor finishes StartThread(),
340
  // guaranteed to be set either to a non-negative integer, or to INVALID_TID.
341
  int64_t tid_;
342
343
  // User function to be executed by this thread.
344
  const ThreadFunctor functor_;
345
346
  // Joiners wait on this latch to be notified if the thread is done.
347
  //
348
  // Note that Joiners must additionally pthread_join(), otherwise certain
349
  // resources that callers expect to be destroyed (like TLS) may still be
350
  // alive when a Joiner finishes.
351
  CountDownLatch done_;
352
353
  bool joinable_;
354
355
  // Thread local pointer to the current thread of execution. Will be NULL if the current
356
  // thread is not a Thread.
357
  static __thread Thread* tls_;
358
359
  std::vector<Closure> exit_callbacks_;
360
361
  // Some generic user data. For instance could be used to identify thread pool, which started
362
  // this thread.
363
  void* user_data_ = nullptr;
364
365
  // Starts the thread running SuperviseThread(), and returns once that thread has
366
  // initialised and its TID has been read. Waits for notification from the started
367
  // thread that initialisation is complete before returning. On success, stores a
368
  // reference to the thread in holder.
369
  static CHECKED_STATUS StartThread(
370
      const std::string& category, const std::string& name,
371
      ThreadFunctor functor, ThreadPtr* holder);
372
373
  // Wrapper for the user-supplied function. Invoked from the new thread,
374
  // with the Thread as its only argument. Executes functor_, but before
375
  // doing so registers with the global ThreadMgr and reads the thread's
376
  // system ID. After functor_ terminates, unregisters with the ThreadMgr.
377
  // Always returns NULL.
378
  //
379
  // SuperviseThread() notifies StartThread() when thread initialisation is
380
  // completed via the tid_, which is set to the new thread's system ID.
381
  // By that point in time SuperviseThread() has also taken a reference to
382
  // the Thread object, allowing it to safely refer to it even after the
383
  // caller drops its reference.
384
  //
385
  // Additionally, StartThread() notifies SuperviseThread() when the actual
386
  // Thread object has been assigned (SuperviseThread() is spinning during
387
  // this time). Without this, the new thread may reference the actual
388
  // Thread object before it has been assigned by StartThread(). See
389
  // KUDU-11 for more details.
390
  static void* SuperviseThread(void* arg);
391
392
  // Invoked when the user-supplied function finishes or in the case of an
393
  // abrupt exit (i.e. pthread_exit()). Cleans up after SuperviseThread().
394
  static void FinishThread(void* arg);
395
};
396
397
typedef scoped_refptr<Thread> ThreadPtr;
398
399
// Registers /threadz with the debug webserver, and creates thread-tracking metrics under
400
// the given entity.
401
Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics,
402
                                  WebCallbackRegistry* web);
403
404
// This initializes the thread manager and warms up libunwind's state (see ENG-1402).
405
void InitThreading();
406
407
// Sets a thread's name to `name`.
// NOTE(review): no thread handle is passed, so this presumably names the calling
// thread — confirm against the definition.
void SetThreadName(const std::string& name);
408
409
// Helper type exposing only a constructor and a destructor, both defined out of line.
// NOTE(review): presumably attaches the current thread to a concurrent-data-structures
// library on construction and detaches it on destruction — confirm in the implementation.
class CDSAttacher {
410
 public:
411
  CDSAttacher();
412
  ~CDSAttacher();
413
};
414
415
} // namespace yb
416
417
#endif /* YB_UTIL_THREAD_H */