YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/thread.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/util/thread.h"
34
35
#include <sys/resource.h>
36
#include <sys/syscall.h>
37
#include <sys/time.h>
38
#include <sys/types.h>
39
40
#if defined(__linux__)
41
#include <sys/prctl.h>
42
#endif // defined(__linux__)
43
44
#include <algorithm>
45
#include <array>
46
#include <functional>
47
#include <map>
48
#include <memory>
49
#include <set>
50
#include <vector>
51
52
#include <cds/init.h>
53
#include <cds/gc/dhp.h>
54
55
#include "yb/gutil/atomicops.h"
56
#include "yb/gutil/bind.h"
57
#include "yb/gutil/dynamic_annotations.h"
58
#include "yb/gutil/once.h"
59
#include "yb/gutil/strings/substitute.h"
60
#include "yb/util/debug-util.h"
61
#include "yb/util/errno.h"
62
#include "yb/util/format.h"
63
#include "yb/util/logging.h"
64
#include "yb/util/metrics.h"
65
#include "yb/util/mutex.h"
66
#include "yb/util/os-util.h"
67
#include "yb/util/status_format.h"
68
#include "yb/util/status_log.h"
69
#include "yb/util/stopwatch.h"
70
#include "yb/util/url-coding.h"
71
#include "yb/util/web_callback_registry.h"
72
73
METRIC_DEFINE_gauge_uint64(server, threads_started,
74
                           "Threads Started",
75
                           yb::MetricUnit::kThreads,
76
                           "Total number of threads started on this server",
77
                           yb::EXPOSE_AS_COUNTER);
78
79
METRIC_DEFINE_gauge_uint64(server, threads_running,
80
                           "Threads Running",
81
                           yb::MetricUnit::kThreads,
82
                           "Current number of running threads");
83
84
METRIC_DEFINE_gauge_uint64(server, cpu_utime,
85
                           "User CPU Time",
86
                           yb::MetricUnit::kMilliseconds,
87
                           "Total user CPU time of the process",
88
                           yb::EXPOSE_AS_COUNTER);
89
90
METRIC_DEFINE_gauge_uint64(server, cpu_stime,
91
                           "System CPU Time",
92
                           yb::MetricUnit::kMilliseconds,
93
                           "Total system CPU time of the process",
94
                           yb::EXPOSE_AS_COUNTER);
95
96
METRIC_DEFINE_gauge_uint64(server, voluntary_context_switches,
97
                           "Voluntary Context Switches",
98
                           yb::MetricUnit::kContextSwitches,
99
                           "Total voluntary context switches",
100
                           yb::EXPOSE_AS_COUNTER);
101
102
METRIC_DEFINE_gauge_uint64(server, involuntary_context_switches,
103
                           "Involuntary Context Switches",
104
                           yb::MetricUnit::kContextSwitches,
105
                           "Total involuntary context switches",
106
                           yb::EXPOSE_AS_COUNTER);
107
108
namespace yb {
109
110
using std::endl;
111
using std::map;
112
using std::shared_ptr;
113
using std::stringstream;
114
using strings::Substitute;
115
116
using namespace std::placeholders;
117
118
// See comment below in SetThreadName.
119
constexpr int kMaxThreadNameInPerf = 15;
120
const char Thread::kPaddingChar = 'x';
121
122
namespace {
123
124
15.1k
uint64_t GetCpuUTime() {
125
15.1k
  rusage ru;
126
15.1k
  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
127
15.1k
  return ru.ru_utime.tv_sec * 1000UL + ru.ru_utime.tv_usec / 1000UL;
128
15.1k
}
129
130
15.1k
uint64_t GetCpuSTime() {
131
15.1k
  rusage ru;
132
15.1k
  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
133
15.1k
  return ru.ru_stime.tv_sec * 1000UL + ru.ru_stime.tv_usec / 1000UL;
134
15.1k
}
135
136
15.1k
uint64_t GetVoluntaryContextSwitches() {
137
15.1k
  rusage ru;
138
15.1k
  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
139
0
  return ru.ru_nvcsw;;
140
0
}
141
142
15.1k
uint64_t GetInVoluntaryContextSwitches() {
143
15.1k
  rusage ru;
144
15.1k
  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
145
15.1k
  return ru.ru_nivcsw;
146
15.1k
}
147
148
class ThreadCategoryTracker {
149
 public:
150
  ThreadCategoryTracker(const string& name, const scoped_refptr<MetricEntity> &metrics) :
151
35.0k
      name_(name), metrics_(metrics) {}
152
153
  void IncrementCategory(const string& category);
154
  void DecrementCategory(const string& category);
155
156
  scoped_refptr<AtomicGauge<uint64>> FindOrCreateGauge(const string& category);
157
158
 private:
159
  string name_;
160
  scoped_refptr<MetricEntity> metrics_;
161
  map<string, scoped_refptr<AtomicGauge<uint64>>> gauges_;
162
};
163
164
1.76M
void ThreadCategoryTracker::IncrementCategory(const string& category) {
165
1.76M
  auto gauge = FindOrCreateGauge(category);
166
1.76M
  gauge->Increment();
167
1.76M
}
168
169
244k
void ThreadCategoryTracker::DecrementCategory(const string& category) {
170
244k
  auto gauge = FindOrCreateGauge(category);
171
244k
  gauge->Decrement();
172
244k
}
173
174
// Returns the gauge tracking 'category'.
//
// On first use of a category a new gauge is created on metrics_ with the id
// "<name_>_<category>" (escaped for Prometheus), an initial value of 0, and is cached
// in gauges_. Later calls return the cached gauge. The map is searched once per call.
scoped_refptr<AtomicGauge<uint64>> ThreadCategoryTracker::FindOrCreateGauge(
    const string& category) {
  auto it = gauges_.find(category);
  if (it == gauges_.end()) {
    string id = name_ + "_" + category;
    EscapeMetricNameForPrometheus(&id);
    const string description = id + " metric in ThreadCategoryTracker";
    std::unique_ptr<GaugePrototype<uint64>> gauge = std::make_unique<OwningGaugePrototype<uint64>>(
        "server", id, description, yb::MetricUnit::kThreads, description,
        yb::MetricLevel::kInfo, yb::EXPOSE_AS_COUNTER);
    it = gauges_.emplace(
        category,
        metrics_->FindOrCreateGauge(
            std::move(gauge), static_cast<uint64>(0) /* initial_value */)).first;
  }
  return it->second;
}
188
189
// A singleton class that tracks all live threads, and groups them together for easy
190
// auditing. Used only by Thread.
191
class ThreadMgr {
192
 public:
193
  ThreadMgr()
194
      : metrics_enabled_(false),
195
        threads_started_metric_(0),
196
14.8k
        threads_running_metric_(0) {
197
14.8k
    cds::Initialize();
198
14.8k
    cds::gc::dhp::GarbageCollector::construct();
199
14.8k
    cds::threading::Manager::attachThread();
200
14.8k
  }
201
202
4.03k
  ~ThreadMgr() {
203
4.03k
    cds::Terminate();
204
4.03k
    MutexLock l(lock_);
205
4.03k
    thread_categories_.clear();
206
4.03k
  }
207
208
  static void SetThreadName(const std::string& name, int64 tid);
209
210
  Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, WebCallbackRegistry* web);
211
212
  // Registers a thread to the supplied category. The key is a pthread_t,
213
  // not the system TID, since pthread_t is less prone to being recycled.
214
  void AddThread(const pthread_t& pthread_id, const string& name, const string& category,
215
      int64_t tid);
216
217
  // Removes a thread from the supplied category. If the thread has
218
  // already been removed, this is a no-op.
219
  void RemoveThread(const pthread_t& pthread_id, const string& category);
220
221
 private:
222
  // Container class for any details we want to capture about a thread
223
  // TODO: Add start-time.
224
  // TODO: Track fragment ID.
225
  class ThreadDescriptor {
226
   public:
227
923k
    ThreadDescriptor() { }
228
    ThreadDescriptor(string category, string name, int64_t thread_id)
229
        : name_(std::move(name)),
230
          category_(std::move(category)),
231
923k
          thread_id_(thread_id) {}
232
233
0
    const string& name() const { return name_; }
234
0
    const string& category() const { return category_; }
235
0
    int64_t thread_id() const { return thread_id_; }
236
237
   private:
238
    string name_;
239
    string category_;
240
    int64_t thread_id_;
241
  };
242
243
  // A ThreadCategory is a set of threads that are logically related.
244
  // TODO: unordered_map is incompatible with pthread_t, but would be more
245
  // efficient here.
246
  typedef map<const pthread_t, ThreadDescriptor> ThreadCategory;
247
248
  // All thread categorys, keyed on the category name.
249
  typedef map<string, ThreadCategory> ThreadCategoryMap;
250
251
  // Protects thread_categories_ and metrics_enabled_
252
  Mutex lock_;
253
254
  // All thread categorys that ever contained a thread, even if empty
255
  ThreadCategoryMap thread_categories_;
256
257
  // True after StartInstrumentation(..) returns
258
  bool metrics_enabled_;
259
260
  // Counters to track all-time total number of threads, and the
261
  // current number of running threads.
262
  uint64_t threads_started_metric_;
263
  uint64_t threads_running_metric_;
264
265
  // Tracker to track the number of started threads and the number of running threads for each
266
  // category.
267
  std::unique_ptr<ThreadCategoryTracker> started_category_tracker_;
268
  std::unique_ptr<ThreadCategoryTracker> running_category_tracker_;
269
270
  // Metric callbacks.
271
  uint64_t ReadThreadsStarted();
272
  uint64_t ReadThreadsRunning();
273
274
  // Webpage callback; prints all threads by category
275
  void ThreadPathHandler(const WebCallbackRegistry::WebRequest& args,
276
                                WebCallbackRegistry::WebResponse* resp);
277
  void RenderThreadCategoryRows(const ThreadCategory& category, std::string* output);
278
};
279
280
923k
void ThreadMgr::SetThreadName(const string& name, int64 tid) {
281
  // On linux we can get the thread names to show up in the debugger by setting
282
  // the process name for the LWP.  We don't want to do this for the main
283
  // thread because that would rename the process, causing tools like killall
284
  // to stop working.
285
923k
  if (tid == getpid()) {
286
0
    return;
287
0
  }
288
289
923k
  yb::SetThreadName(name);
290
923k
}
291
292
// Turns on metric accounting for threads.
//
// Under lock_: marks metrics as enabled, creates the per-category trackers, registers
// the function gauges (thread counts, CPU times, context switches) on 'metrics' and
// installs the "/threadz" page on 'web'. Always returns Status::OK().
Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metrics,
                                       WebCallbackRegistry* web) {
  MutexLock l(lock_);
  metrics_enabled_ = true;
  started_category_tracker_ = std::make_unique<ThreadCategoryTracker>("threads_started", metrics);
  running_category_tracker_ = std::make_unique<ThreadCategoryTracker>("threads_running", metrics);

  // Function gauges are used so that a unique copy of these metrics can be registered
  // in multiple tservers, even though the ThreadMgr is itself a singleton.
  metrics->NeverRetire(
      METRIC_threads_started.InstantiateFunctionGauge(
          metrics, Bind(&ThreadMgr::ReadThreadsStarted, Unretained(this))));
  metrics->NeverRetire(
      METRIC_threads_running.InstantiateFunctionGauge(
          metrics, Bind(&ThreadMgr::ReadThreadsRunning, Unretained(this))));
  metrics->NeverRetire(
      METRIC_cpu_utime.InstantiateFunctionGauge(metrics, Bind(&GetCpuUTime)));
  metrics->NeverRetire(
      METRIC_cpu_stime.InstantiateFunctionGauge(metrics, Bind(&GetCpuSTime)));
  metrics->NeverRetire(
      METRIC_voluntary_context_switches.InstantiateFunctionGauge(
          metrics, Bind(&GetVoluntaryContextSwitches)));
  metrics->NeverRetire(
      METRIC_involuntary_context_switches.InstantiateFunctionGauge(
          metrics, Bind(&GetInVoluntaryContextSwitches)));

  WebCallbackRegistry::PathHandlerCallback threadz_handler =
      std::bind(&ThreadMgr::ThreadPathHandler, this, _1, _2);
  DCHECK_NOTNULL(web)->RegisterPathHandler("/threadz", "Threads", threadz_handler, true, false);
  return Status::OK();
}
325
326
15.1k
uint64_t ThreadMgr::ReadThreadsStarted() {
327
15.1k
  MutexLock l(lock_);
328
15.1k
  return threads_started_metric_;
329
15.1k
}
330
331
15.1k
uint64_t ThreadMgr::ReadThreadsRunning() {
332
15.1k
  MutexLock l(lock_);
333
15.1k
  return threads_running_metric_;
334
15.1k
}
335
336
void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name,
337
923k
    const string& category, int64_t tid) {
338
  // These annotations cause TSAN to ignore the synchronization on lock_
339
  // without causing the subsequent mutations to be treated as data races
340
  // in and of themselves (that's what IGNORE_READS_AND_WRITES does).
341
  //
342
  // Why do we need them here and in SuperviseThread()? TSAN operates by
343
  // observing synchronization events and using them to establish "happens
344
  // before" relationships between threads. Where these relationships are
345
  // not built, shared state access constitutes a data race. The
346
  // synchronization events here, in RemoveThread(), and in
347
  // SuperviseThread() may cause TSAN to establish a "happens before"
348
  // relationship between thread functors, ignoring potential data races.
349
  // The annotations prevent this from happening.
350
923k
  ANNOTATE_IGNORE_SYNC_BEGIN();
351
923k
  ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
352
923k
  {
353
923k
    MutexLock l(lock_);
354
923k
    thread_categories_[category][pthread_id] = ThreadDescriptor(category, name, tid);
355
923k
    if (metrics_enabled_) {
356
883k
      threads_running_metric_++;
357
883k
      threads_started_metric_++;
358
883k
      started_category_tracker_->IncrementCategory(category);
359
883k
      running_category_tracker_->IncrementCategory(category);
360
883k
    }
361
923k
  }
362
923k
  ANNOTATE_IGNORE_SYNC_END();
363
923k
  ANNOTATE_IGNORE_READS_AND_WRITES_END();
364
923k
}
365
366
278k
void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category) {
367
278k
  ANNOTATE_IGNORE_SYNC_BEGIN();
368
278k
  ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
369
278k
  {
370
278k
    MutexLock l(lock_);
371
278k
    auto category_it = thread_categories_.find(category);
372
278k
    DCHECK(category_it != thread_categories_.end());
373
278k
    category_it->second.erase(pthread_id);
374
278k
    if (metrics_enabled_) {
375
244k
      threads_running_metric_--;
376
244k
      running_category_tracker_->DecrementCategory(category);
377
244k
    }
378
278k
  }
379
278k
  ANNOTATE_IGNORE_SYNC_END();
380
278k
  ANNOTATE_IGNORE_READS_AND_WRITES_END();
381
278k
}
382
383
0
int Compare(const Result<StackTrace>& lhs, const Result<StackTrace>& rhs) {
384
0
  if (lhs.ok()) {
385
0
    if (!rhs.ok()) {
386
0
      return -1;
387
0
    }
388
0
    return lhs->compare(*rhs);
389
0
  }
390
0
  if (rhs.ok()) {
391
0
    return 1;
392
0
  }
393
0
  return lhs.status().message().compare(rhs.status().message());
394
395
0
}
396
397
0
void ThreadMgr::RenderThreadCategoryRows(const ThreadCategory& category, std::string* output) {
398
0
  struct ThreadData {
399
0
    int64_t tid = 0;
400
0
    ThreadIdForStack tid_for_stack = 0;
401
0
    const std::string* name = nullptr;
402
0
    ThreadStats stats;
403
0
    Result<StackTrace> stack_trace = StackTrace();
404
0
    int rowspan = -1;
405
0
  };
406
0
  std::vector<ThreadData> threads;
407
0
  std::vector<ThreadIdForStack> thread_ids;
408
0
  threads.resize(category.size());
409
0
  thread_ids.reserve(category.size());
410
0
  {
411
0
    auto* data = threads.data();
412
0
    for (const ThreadCategory::value_type& thread : category) {
413
0
      data->name = &thread.second.name();
414
0
      data->tid = thread.second.thread_id();
415
#if defined(__linux__)
416
      data->tid_for_stack = data->tid;
417
#else
418
0
      data->tid_for_stack = thread.first;
419
0
#endif
420
0
      Status status = GetThreadStats(data->tid, &data->stats);
421
0
      if (!status.ok()) {
422
0
        YB_LOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
423
0
                                  << status.ToString();
424
0
      }
425
0
      thread_ids.push_back(data->tid_for_stack);
426
0
      ++data;
427
0
    }
428
0
  }
429
430
0
  if (threads.empty()) {
431
0
    return;
432
0
  }
433
434
0
  std::sort(thread_ids.begin(), thread_ids.end());
435
0
  auto stacks = ThreadStacks(thread_ids);
436
437
0
  for (ThreadData& data : threads) {
438
0
    auto it = std::lower_bound(thread_ids.begin(), thread_ids.end(), data.tid_for_stack);
439
0
    DCHECK(it != thread_ids.end() && *it == data.tid_for_stack);
440
0
    data.stack_trace = stacks[it - thread_ids.begin()];
441
0
  }
442
443
0
  std::sort(threads.begin(), threads.end(), [](const ThreadData& lhs, const ThreadData& rhs) {
444
0
    return Compare(lhs.stack_trace, rhs.stack_trace) < 0;
445
0
  });
446
447
0
  auto it = threads.begin();
448
0
  auto first = it;
449
0
  first->rowspan = 1;
450
0
  while (++it != threads.end()) {
451
0
    if (Compare(it->stack_trace, first->stack_trace) != 0) {
452
0
      first = it;
453
0
      first->rowspan = 1;
454
0
    } else {
455
0
      ++first->rowspan;
456
0
    }
457
0
  }
458
459
0
  std::string* active_out = output;
460
0
  for (const auto& thread : threads) {
461
0
    std::string symbolized;
462
0
    if (thread.rowspan > 0) {
463
0
      StackTraceGroup group = StackTraceGroup::kActive;
464
0
      if (thread.stack_trace.ok()) {
465
0
        symbolized = thread.stack_trace->Symbolize(StackTraceLineFormat::DEFAULT, &group);
466
0
      } else {
467
0
        symbolized = thread.stack_trace.status().message().ToBuffer();
468
0
      }
469
0
      active_out = output + to_underlying(group);
470
0
    }
471
472
0
    *active_out += Format(
473
0
         "<tr><td>$0</td><td>$1</td><td>$2</td><td>$3</td>",
474
0
         *thread.name, MonoDelta::FromNanoseconds(thread.stats.user_ns),
475
0
         MonoDelta::FromNanoseconds(thread.stats.kernel_ns / 1e9),
476
0
         MonoDelta::FromNanoseconds(thread.stats.iowait_ns / 1e9));
477
0
    if (thread.rowspan > 0) {
478
0
      *active_out += Format("<td rowspan=\"$0\"><pre>$1\nTotal number of threads: $0</pre></td>",
479
0
                            thread.rowspan, symbolized);
480
0
    }
481
0
    *active_out += "</tr>\n";
482
0
  }
483
0
}
484
485
// Handler of the "/threadz" web page.
//
// Without a "group" argument an index of all thread categories is written to
// resp->output. With "group=<category>" (or "group=all") a table with one row per
// thread of the selected categories is written. The category is looked up by the raw
// argument value; every name written into the page is HTML-escaped.
void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
    WebCallbackRegistry::WebResponse* resp) {
  std::stringstream *output = &resp->output;
  MutexLock l(lock_);
  auto category_name = req.parsed_args.find("group");
  if (category_name != req.parsed_args.end()) {
    const auto& requested = category_name->second;
    const string group = EscapeForHtmlToString(requested);
    std::vector<const ThreadCategory*> categories_to_print;
    (*output) << "<h2>Thread Group: " << group << "</h2>" << endl;
    if (requested != "all") {
      ThreadCategoryMap::const_iterator category = thread_categories_.find(requested);
      if (category == thread_categories_.end()) {
        (*output) << "Thread group '" << group << "' not found" << endl;
        return;
      }
      categories_to_print.push_back(&category->second);
      // The key equals the requested name, so its escaped form is 'group'.
      (*output) << "<h3>" << group << " : " << category->second.size()
                << "</h3>";
    } else {
      for (const ThreadCategoryMap::value_type& category : thread_categories_) {
        categories_to_print.push_back(&category.second);
      }
      (*output) << "<h3>All Threads : </h3>";
    }

    (*output) << "<table class='table table-hover table-border'>";
    (*output) << "<tr><th>Thread name</th><th>Cumulative User CPU(s)</th>"
              << "<th>Cumulative Kernel CPU(s)</th>"
              << "<th>Cumulative IO-wait(s)</th></tr>";

    // Rows are collected per stack trace group and then emitted in the order given
    // by kStackTraceGroupList.
    std::array<std::string, kStackTraceGroupMapSize> groups;

    for (const ThreadCategory* category : categories_to_print) {
      RenderThreadCategoryRows(*category, groups.data());
    }

    for (auto g : kStackTraceGroupList) {
      *output << groups[to_underlying(g)];
    }
    (*output) << "</table>";
  } else {
    (*output) << "<h2>Thread Groups</h2>";
    if (metrics_enabled_) {
      (*output) << "<h4>" << threads_running_metric_ << " thread(s) running</h4>";
    }
    (*output) << "<a href='/threadz?group=all'><h3>All Threads</h3></a>";

    for (const ThreadCategoryMap::value_type& category : thread_categories_) {
      string category_arg;
      UrlEncode(category.first, &category_arg);
      (*output) << "<a href='/threadz?group=" << category_arg << "'><h3>"
                << EscapeForHtmlToString(category.first) << " : " << category.second.size()
                << "</h3></a>";
    }
  }
}
540
541
// Singleton instance of ThreadMgr. Only visible in this file, used only by Thread.
542
// The Thread class adds a reference to thread_manager while it is supervising a thread so
543
// that a race between the end of the process's main thread (and therefore the destruction
544
// of thread_manager) and the end of a thread that tries to remove itself from the
545
// manager after the destruction can be avoided.
546
shared_ptr<ThreadMgr> thread_manager;
547
548
// Controls the single (lazy) initialization of thread_manager.
549
std::once_flag init_threading_internal_once_flag;
550
551
14.8k
void InitThreadingInternal() {
552
  // Warm up the stack trace library. This avoids a race in libunwind initialization
553
  // by making sure we initialize it before we start any other threads.
554
14.8k
  GetStackTraceHex();
555
14.8k
  thread_manager = std::make_shared<ThreadMgr>();
556
14.8k
}
557
558
} // anonymous namespace
559
560
949k
void SetThreadName(const std::string& name) {
561
#if defined(__linux__)
562
  // http://0pointer.de/blog/projects/name-your-threads.html
563
  // Set the name for the LWP (which gets truncated to 15 characters).
564
  // Note that glibc also has a 'pthread_setname_np' api, but it may not be
565
  // available everywhere and it's only benefit over using prctl directly is
566
  // that it can set the name of threads other than the current thread.
567
  int err = prctl(PR_SET_NAME, name.c_str());
568
#else
569
949k
  int err = pthread_setname_np(name.c_str());
570
949k
#endif // defined(__linux__)
571
  // We expect EPERM failures in sandboxed processes, just ignore those.
572
949k
  if (err < 0 && errno != EPERM) {
573
0
    PLOG(ERROR) << "SetThreadName";
574
0
  }
575
949k
}
576
577
944k
void InitThreading() {
578
944k
  std::call_once(init_threading_internal_once_flag, InitThreadingInternal);
579
944k
}
580
581
__thread Thread* Thread::tls_ = nullptr;
582
583
// Initializes threading if necessary and starts thread instrumentation on the
// ThreadMgr singleton, forwarding 'server_metrics' and 'web'.
Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics,
                                  WebCallbackRegistry* web) {
  // thread_manager is only valid once InitThreading() has run.
  InitThreading();
  ThreadMgr& manager = *thread_manager;
  return manager.StartInstrumentation(server_metrics, web);
}
588
589
// Creates a joiner for 'thr', which must not be null (enforced by CHECK_NOTNULL).
ThreadJoiner::ThreadJoiner(Thread* thr)
    : thread_(CHECK_NOTNULL(thr)) {}
592
593
0
// Sets the waiting time after which Join() starts to log warnings.
// Returns *this so that calls can be chained.
ThreadJoiner& ThreadJoiner::warn_after(MonoDelta duration) {
  this->warn_after_ = duration;
  return *this;
}
597
598
16
// Sets the interval Join() waits between attempts once warn_after_ has elapsed.
// Returns *this so that calls can be chained.
ThreadJoiner& ThreadJoiner::warn_every(MonoDelta duration) {
  this->warn_every_ = duration;
  return *this;
}
602
603
13.6k
// Sets the total waiting time after which Join() gives up.
// Returns *this so that calls can be chained.
ThreadJoiner& ThreadJoiner::give_up_after(MonoDelta duration) {
  this->give_up_after_ = duration;
  return *this;
}
607
608
54.7k
// Waits for the thread to finish and joins it.
//
// Returns InvalidArgument when called from the thread that is to be joined, OK when
// the thread was joined (or is not joinable anymore), and Aborted when
// give_up_after_ elapsed first. A warning is logged before every wait attempt that
// starts once warn_after_ has been reached.
Status ThreadJoiner::Join() {
  if (Thread::current_thread() &&
      Thread::current_thread()->tid() == thread_->tid()) {
    return STATUS(InvalidArgument, "Can't join on own thread", thread_->name_);
  }

  // Early exit: double join is a no-op.
  if (!thread_->joinable_) {
    return Status::OK();
  }

  MonoDelta waited = MonoDelta::kZero;
  for (;;) {
    if (waited >= warn_after_) {
      LOG(WARNING) << Format("Waited for $0 trying to join with $1 (tid $2)",
                             waited, thread_->name_, thread_->tid_);
    }

    // Time left until we give up; an unlimited budget is never reduced.
    auto remaining_before_giveup = give_up_after_;
    if (remaining_before_giveup != MonoDelta::kMax) {
      remaining_before_giveup -= waited;
    }

    // Time left until the next warning is due.
    auto remaining_before_next_warn = warn_every_;
    if (waited < warn_after_) {
      remaining_before_next_warn = warn_after_ - waited;
    }

    // When the give-up deadline comes first, this wait is the final attempt.
    const bool final_attempt = remaining_before_giveup < remaining_before_next_warn;
    auto wait_for = std::min(remaining_before_giveup, remaining_before_next_warn);

    if (thread_->done_.WaitFor(wait_for)) {
      // Unconditionally join before returning, to guarantee that any TLS
      // has been destroyed (pthread_key_create() destructors only run
      // after a pthread's user method has returned).
      int ret = pthread_join(thread_->thread_, nullptr);
      CHECK_EQ(ret, 0);
      thread_->joinable_ = false;
      return Status::OK();
    }
    waited += wait_for;

    if (final_attempt) {
      break;
    }
  }

#ifndef NDEBUG
  LOG(WARNING) << "Failed to join:\n" << DumpThreadStack(thread_->tid_for_stack());
#endif

  return STATUS_FORMAT(Aborted, "Timed out after $0 joining on $1", waited, thread_->name_);
}
661
662
280k
// A thread that is still joinable when the Thread object goes away is detached.
Thread::~Thread() {
  if (!joinable_) {
    return;
  }
  const int detach_rc = pthread_detach(thread_);
  CHECK_EQ(detach_rc, 0);
}
668
669
10
void Thread::CallAtExit(const Closure& cb) {
670
10
  CHECK_EQ(Thread::current_thread(), this);
671
10
  exit_callbacks_.push_back(cb);
672
10
}
673
674
2
std::string Thread::ToString() const {
675
2
  return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid_, name_, category_);
676
2
}
677
678
// Starts a new thread running 'functor'.
//
// 'name' is padded with kPaddingChar up to kMaxThreadNameInPerf characters. If
// 'holder' is not null it receives a reference to the new Thread. Returns
// RuntimeError if the pthread could not be created; otherwise returns OK once the
// child has published its TID.
Status Thread::StartThread(const std::string& category, const std::string& name,
                           ThreadFunctor functor, scoped_refptr<Thread> *holder) {
  InitThreading();

  // See comment in SetThreadName. Names are padded to the 15 char limit in order to
  // get aggregations when using the linux perf tool, as that groups up stacks based
  // on the 15 char prefix of all the thread names.
  string padded_name = name;
  if (name.length() < kMaxThreadNameInPerf) {
    padded_name.append(kMaxThreadNameInPerf - name.length(), Thread::kPaddingChar);
  }
  const string log_prefix = Substitute("$0 ($1) ", padded_name, category);
  SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread");

  // Temporary reference for the duration of this function.
  scoped_refptr<Thread> t(new Thread(category, padded_name, std::move(functor)));

  {
    SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "creating pthread");
    const int create_rc = pthread_create(&t->thread_, nullptr, &Thread::SuperviseThread, t.get());
    if (create_rc != 0) {
      return STATUS(RuntimeError, "Could not create thread", Errno(create_rc));
    }
  }

  // The thread has been created and is now joinable.
  //
  // This is set in the parent and not the child because only the parent (or
  // someone communicating with the parent) can join, so joinable must be set
  // before the parent returns.
  t->joinable_ = true;

  // Optional, and only set if the thread was successfully created.
  if (holder) {
    *holder = t;
  }

  // The tid_ member goes through the following states:
  // 1. CHILD_WAITING_TID: the child has just been spawned and is waiting
  //    for the parent to finish writing to caller state (i.e. 'holder').
  // 2. PARENT_WAITING_TID: the parent has updated caller state and is now
  //    waiting for the child to write the tid.
  // 3. <value>: both the parent and the child are free to continue. If the
  //    value is INVALID_TID, the child could not discover its tid.
  Release_Store(&t->tid_, PARENT_WAITING_TID);
  {
    SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix,
                                     "waiting for new thread to publish its TID");
    int spin_count = 0;
    while (Acquire_Load(&t->tid_) == PARENT_WAITING_TID) {
      boost::detail::yield(spin_count++);
    }
  }

  VLOG(2) << "Started thread " << t->tid()<< " - " << category << ":" << padded_name;
  return Status::OK();
}
734
735
923k
void* Thread::SuperviseThread(void* arg) {
736
923k
  Thread* t = static_cast<Thread*>(arg);
737
923k
  int64_t system_tid = Thread::CurrentThreadId();
738
923k
  if (system_tid == -1) {
739
0
    string error_msg = ErrnoToString(errno);
740
0
    YB_LOG_EVERY_N(INFO, 100) << "Could not determine thread ID: " << error_msg;
741
0
  }
742
923k
  string name = strings::Substitute("$0-$1", t->name(), system_tid);
743
744
  // Take an additional reference to the thread manager, which we'll need below.
745
923k
  ANNOTATE_IGNORE_SYNC_BEGIN();
746
923k
  shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
747
923k
  ANNOTATE_IGNORE_SYNC_END();
748
749
  // Set up the TLS.
750
  //
751
  // We could store a scoped_refptr in the TLS itself, but as its
752
  // lifecycle is poorly defined, we'll use a bare pointer and take an
753
  // additional reference on t out of band, in thread_ref.
754
923k
  scoped_refptr<Thread> thread_ref = t;
755
923k
  t->tls_ = t;
756
757
  // Wait until the parent has updated all caller-visible state, then write
758
  // the TID to 'tid_', thus completing the parent<-->child handshake.
759
923k
  int loop_count = 0;
760
1.32M
  while (Acquire_Load(&t->tid_) == CHILD_WAITING_TID) {
761
397k
    boost::detail::yield(loop_count++);
762
397k
  }
763
923k
  Release_Store(&t->tid_, system_tid);
764
765
923k
  thread_manager->SetThreadName(name, t->tid());
766
923k
  thread_manager->AddThread(pthread_self(), name, t->category(), t->tid());
767
768
923k
  cds::threading::Manager::attachThread();
769
770
  // FinishThread() is guaranteed to run (even if functor_ throws an
771
  // exception) because pthread_cleanup_push() creates a scoped object
772
  // whose destructor invokes the provided callback.
773
923k
  pthread_cleanup_push(&Thread::FinishThread, t);
774
923k
  t->functor_();
775
923k
  pthread_cleanup_pop(true);
776
777
923k
  return NULL;
778
923k
}
779
780
39.8k
void Thread::Join() {
781
39.8k
  WARN_NOT_OK(ThreadJoiner(this).Join(), "Thread join failed");
782
39.8k
}
783
784
280k
void Thread::FinishThread(void* arg) {
785
280k
  cds::threading::Manager::detachThread();
786
787
280k
  Thread* t = static_cast<Thread*>(arg);
788
789
10
  for (Closure& c : t->exit_callbacks_) {
790
10
    c.Run();
791
10
  }
792
793
  // We're here either because of the explicit pthread_cleanup_pop() in
794
  // SuperviseThread() or through pthread_exit(). In either case,
795
  // thread_manager is guaranteed to be live because thread_mgr_ref in
796
  // SuperviseThread() is still live.
797
280k
  thread_manager->RemoveThread(pthread_self(), t->category());
798
799
  // Signal any Joiner that we're done.
800
280k
  t->done_.CountDown();
801
802
18.4E
  VLOG(2) << "Ended thread " << t->tid() << " - "
803
18.4E
          << t->category() << ":" << t->name();
804
280k
}
805
806
52.0k
// Attaches the calling thread to libcds; the destructor detaches it again.
CDSAttacher::CDSAttacher() {
  using cds::threading::Manager;
  Manager::attachThread();
}
809
810
50.3k
// Detaches the calling thread from libcds, undoing the attach done by the constructor.
CDSAttacher::~CDSAttacher() {
  using cds::threading::Manager;
  Manager::detachThread();
}
813
814
} // namespace yb