/Users/deen/code/yugabyte-db/src/yb/util/thread.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/util/thread.h" |
34 | | |
35 | | #include <sys/resource.h> |
36 | | #include <sys/syscall.h> |
37 | | #include <sys/time.h> |
38 | | #include <sys/types.h> |
39 | | |
40 | | #if defined(__linux__) |
41 | | #include <sys/prctl.h> |
42 | | #endif // defined(__linux__) |
43 | | |
44 | | #include <algorithm> |
45 | | #include <array> |
46 | | #include <functional> |
47 | | #include <map> |
48 | | #include <memory> |
49 | | #include <set> |
50 | | #include <vector> |
51 | | |
52 | | #include <cds/init.h> |
53 | | #include <cds/gc/dhp.h> |
54 | | |
55 | | #include "yb/gutil/atomicops.h" |
56 | | #include "yb/gutil/bind.h" |
57 | | #include "yb/gutil/dynamic_annotations.h" |
58 | | #include "yb/gutil/once.h" |
59 | | #include "yb/gutil/strings/substitute.h" |
60 | | #include "yb/util/debug-util.h" |
61 | | #include "yb/util/errno.h" |
62 | | #include "yb/util/format.h" |
63 | | #include "yb/util/logging.h" |
64 | | #include "yb/util/metrics.h" |
65 | | #include "yb/util/mutex.h" |
66 | | #include "yb/util/os-util.h" |
67 | | #include "yb/util/status_format.h" |
68 | | #include "yb/util/status_log.h" |
69 | | #include "yb/util/stopwatch.h" |
70 | | #include "yb/util/url-coding.h" |
71 | | #include "yb/util/web_callback_registry.h" |
72 | | |
73 | | METRIC_DEFINE_gauge_uint64(server, threads_started, |
74 | | "Threads Started", |
75 | | yb::MetricUnit::kThreads, |
76 | | "Total number of threads started on this server", |
77 | | yb::EXPOSE_AS_COUNTER); |
78 | | |
79 | | METRIC_DEFINE_gauge_uint64(server, threads_running, |
80 | | "Threads Running", |
81 | | yb::MetricUnit::kThreads, |
82 | | "Current number of running threads"); |
83 | | |
84 | | METRIC_DEFINE_gauge_uint64(server, cpu_utime, |
85 | | "User CPU Time", |
86 | | yb::MetricUnit::kMilliseconds, |
87 | | "Total user CPU time of the process", |
88 | | yb::EXPOSE_AS_COUNTER); |
89 | | |
90 | | METRIC_DEFINE_gauge_uint64(server, cpu_stime, |
91 | | "System CPU Time", |
92 | | yb::MetricUnit::kMilliseconds, |
93 | | "Total system CPU time of the process", |
94 | | yb::EXPOSE_AS_COUNTER); |
95 | | |
96 | | METRIC_DEFINE_gauge_uint64(server, voluntary_context_switches, |
97 | | "Voluntary Context Switches", |
98 | | yb::MetricUnit::kContextSwitches, |
99 | | "Total voluntary context switches", |
100 | | yb::EXPOSE_AS_COUNTER); |
101 | | |
102 | | METRIC_DEFINE_gauge_uint64(server, involuntary_context_switches, |
103 | | "Involuntary Context Switches", |
104 | | yb::MetricUnit::kContextSwitches, |
105 | | "Total involuntary context switches", |
106 | | yb::EXPOSE_AS_COUNTER); |
107 | | |
108 | | namespace yb { |
109 | | |
110 | | using std::endl; |
111 | | using std::map; |
112 | | using std::shared_ptr; |
113 | | using std::stringstream; |
114 | | using strings::Substitute; |
115 | | |
116 | | using namespace std::placeholders; |
117 | | |
118 | | // See comment below in SetThreadName. |
119 | | constexpr int kMaxThreadNameInPerf = 15; |
120 | | const char Thread::kPaddingChar = 'x'; |
121 | | |
122 | | namespace { |
123 | | |
124 | 15.1k | uint64_t GetCpuUTime() { |
125 | 15.1k | rusage ru; |
126 | 15.1k | CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); |
127 | 15.1k | return ru.ru_utime.tv_sec * 1000UL + ru.ru_utime.tv_usec / 1000UL; |
128 | 15.1k | } |
129 | | |
130 | 15.1k | uint64_t GetCpuSTime() { |
131 | 15.1k | rusage ru; |
132 | 15.1k | CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); |
133 | 15.1k | return ru.ru_stime.tv_sec * 1000UL + ru.ru_stime.tv_usec / 1000UL; |
134 | 15.1k | } |
135 | | |
136 | 15.1k | uint64_t GetVoluntaryContextSwitches() { |
137 | 15.1k | rusage ru; |
138 | 15.1k | CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); |
139 | 0 | return ru.ru_nvcsw;; |
140 | 0 | } |
141 | | |
142 | 15.1k | uint64_t GetInVoluntaryContextSwitches() { |
143 | 15.1k | rusage ru; |
144 | 15.1k | CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); |
145 | 15.1k | return ru.ru_nivcsw; |
146 | 15.1k | } |
147 | | |
148 | | class ThreadCategoryTracker { |
149 | | public: |
150 | | ThreadCategoryTracker(const string& name, const scoped_refptr<MetricEntity> &metrics) : |
151 | 35.0k | name_(name), metrics_(metrics) {} |
152 | | |
153 | | void IncrementCategory(const string& category); |
154 | | void DecrementCategory(const string& category); |
155 | | |
156 | | scoped_refptr<AtomicGauge<uint64>> FindOrCreateGauge(const string& category); |
157 | | |
158 | | private: |
159 | | string name_; |
160 | | scoped_refptr<MetricEntity> metrics_; |
161 | | map<string, scoped_refptr<AtomicGauge<uint64>>> gauges_; |
162 | | }; |
163 | | |
164 | 1.76M | void ThreadCategoryTracker::IncrementCategory(const string& category) { |
165 | 1.76M | auto gauge = FindOrCreateGauge(category); |
166 | 1.76M | gauge->Increment(); |
167 | 1.76M | } |
168 | | |
169 | 244k | void ThreadCategoryTracker::DecrementCategory(const string& category) { |
170 | 244k | auto gauge = FindOrCreateGauge(category); |
171 | 244k | gauge->Decrement(); |
172 | 244k | } |
173 | | |
174 | | scoped_refptr<AtomicGauge<uint64>> ThreadCategoryTracker::FindOrCreateGauge( |
175 | 2.01M | const string& category) { |
176 | 2.01M | if (gauges_.find(category) == gauges_.end()) { |
177 | 324k | string id = name_ + "_" + category; |
178 | 324k | EscapeMetricNameForPrometheus(&id); |
179 | 324k | const string description = id + " metric in ThreadCategoryTracker"; |
180 | 324k | std::unique_ptr<GaugePrototype<uint64>> gauge = std::make_unique<OwningGaugePrototype<uint64>>( |
181 | 324k | "server", id, description, yb::MetricUnit::kThreads, description, |
182 | 324k | yb::MetricLevel::kInfo, yb::EXPOSE_AS_COUNTER); |
183 | 324k | gauges_[category] = |
184 | 324k | metrics_->FindOrCreateGauge(std::move(gauge), static_cast<uint64>(0) /* initial_value */); |
185 | 324k | } |
186 | 2.01M | return gauges_[category]; |
187 | 2.01M | } |
188 | | |
189 | | // A singleton class that tracks all live threads, and groups them together for easy |
190 | | // auditing. Used only by Thread. |
191 | | class ThreadMgr { |
192 | | public: |
193 | | ThreadMgr() |
194 | | : metrics_enabled_(false), |
195 | | threads_started_metric_(0), |
196 | 14.8k | threads_running_metric_(0) { |
197 | 14.8k | cds::Initialize(); |
198 | 14.8k | cds::gc::dhp::GarbageCollector::construct(); |
199 | 14.8k | cds::threading::Manager::attachThread(); |
200 | 14.8k | } |
201 | | |
202 | 4.03k | ~ThreadMgr() { |
203 | 4.03k | cds::Terminate(); |
204 | 4.03k | MutexLock l(lock_); |
205 | 4.03k | thread_categories_.clear(); |
206 | 4.03k | } |
207 | | |
208 | | static void SetThreadName(const std::string& name, int64 tid); |
209 | | |
210 | | Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, WebCallbackRegistry* web); |
211 | | |
212 | | // Registers a thread to the supplied category. The key is a pthread_t, |
213 | | // not the system TID, since pthread_t is less prone to being recycled. |
214 | | void AddThread(const pthread_t& pthread_id, const string& name, const string& category, |
215 | | int64_t tid); |
216 | | |
217 | | // Removes a thread from the supplied category. If the thread has |
218 | | // already been removed, this is a no-op. |
219 | | void RemoveThread(const pthread_t& pthread_id, const string& category); |
220 | | |
221 | | private: |
222 | | // Container class for any details we want to capture about a thread |
223 | | // TODO: Add start-time. |
224 | | // TODO: Track fragment ID. |
225 | | class ThreadDescriptor { |
226 | | public: |
227 | 923k | ThreadDescriptor() { } |
228 | | ThreadDescriptor(string category, string name, int64_t thread_id) |
229 | | : name_(std::move(name)), |
230 | | category_(std::move(category)), |
231 | 923k | thread_id_(thread_id) {} |
232 | | |
233 | 0 | const string& name() const { return name_; } |
234 | 0 | const string& category() const { return category_; } |
235 | 0 | int64_t thread_id() const { return thread_id_; } |
236 | | |
237 | | private: |
238 | | string name_; |
239 | | string category_; |
240 | | int64_t thread_id_; |
241 | | }; |
242 | | |
243 | | // A ThreadCategory is a set of threads that are logically related. |
244 | | // TODO: unordered_map is incompatible with pthread_t, but would be more |
245 | | // efficient here. |
246 | | typedef map<const pthread_t, ThreadDescriptor> ThreadCategory; |
247 | | |
248 | | // All thread categorys, keyed on the category name. |
249 | | typedef map<string, ThreadCategory> ThreadCategoryMap; |
250 | | |
251 | | // Protects thread_categories_ and metrics_enabled_ |
252 | | Mutex lock_; |
253 | | |
254 | | // All thread categorys that ever contained a thread, even if empty |
255 | | ThreadCategoryMap thread_categories_; |
256 | | |
257 | | // True after StartInstrumentation(..) returns |
258 | | bool metrics_enabled_; |
259 | | |
260 | | // Counters to track all-time total number of threads, and the |
261 | | // current number of running threads. |
262 | | uint64_t threads_started_metric_; |
263 | | uint64_t threads_running_metric_; |
264 | | |
265 | | // Tracker to track the number of started threads and the number of running threads for each |
266 | | // category. |
267 | | std::unique_ptr<ThreadCategoryTracker> started_category_tracker_; |
268 | | std::unique_ptr<ThreadCategoryTracker> running_category_tracker_; |
269 | | |
270 | | // Metric callbacks. |
271 | | uint64_t ReadThreadsStarted(); |
272 | | uint64_t ReadThreadsRunning(); |
273 | | |
274 | | // Webpage callback; prints all threads by category |
275 | | void ThreadPathHandler(const WebCallbackRegistry::WebRequest& args, |
276 | | WebCallbackRegistry::WebResponse* resp); |
277 | | void RenderThreadCategoryRows(const ThreadCategory& category, std::string* output); |
278 | | }; |
279 | | |
280 | 923k | void ThreadMgr::SetThreadName(const string& name, int64 tid) { |
281 | | // On linux we can get the thread names to show up in the debugger by setting |
282 | | // the process name for the LWP. We don't want to do this for the main |
283 | | // thread because that would rename the process, causing tools like killall |
284 | | // to stop working. |
285 | 923k | if (tid == getpid()) { |
286 | 0 | return; |
287 | 0 | } |
288 | | |
289 | 923k | yb::SetThreadName(name); |
290 | 923k | } |
291 | | |
292 | | Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, |
293 | 17.5k | WebCallbackRegistry* web) { |
294 | 17.5k | MutexLock l(lock_); |
295 | 17.5k | metrics_enabled_ = true; |
296 | 17.5k | started_category_tracker_ = std::make_unique<ThreadCategoryTracker>("threads_started", metrics); |
297 | 17.5k | running_category_tracker_ = std::make_unique<ThreadCategoryTracker>("threads_running", metrics); |
298 | | |
299 | | // Use function gauges here so that we can register a unique copy of these metrics in |
300 | | // multiple tservers, even though the ThreadMgr is itself a singleton. |
301 | 17.5k | metrics->NeverRetire( |
302 | 17.5k | METRIC_threads_started.InstantiateFunctionGauge(metrics, |
303 | 17.5k | Bind(&ThreadMgr::ReadThreadsStarted, Unretained(this)))); |
304 | 17.5k | metrics->NeverRetire( |
305 | 17.5k | METRIC_threads_running.InstantiateFunctionGauge(metrics, |
306 | 17.5k | Bind(&ThreadMgr::ReadThreadsRunning, Unretained(this)))); |
307 | 17.5k | metrics->NeverRetire( |
308 | 17.5k | METRIC_cpu_utime.InstantiateFunctionGauge(metrics, |
309 | 17.5k | Bind(&GetCpuUTime))); |
310 | 17.5k | metrics->NeverRetire( |
311 | 17.5k | METRIC_cpu_stime.InstantiateFunctionGauge(metrics, |
312 | 17.5k | Bind(&GetCpuSTime))); |
313 | 17.5k | metrics->NeverRetire( |
314 | 17.5k | METRIC_voluntary_context_switches.InstantiateFunctionGauge(metrics, |
315 | 17.5k | Bind(&GetVoluntaryContextSwitches))); |
316 | 17.5k | metrics->NeverRetire( |
317 | 17.5k | METRIC_involuntary_context_switches.InstantiateFunctionGauge(metrics, |
318 | 17.5k | Bind(&GetInVoluntaryContextSwitches))); |
319 | | |
320 | 17.5k | WebCallbackRegistry::PathHandlerCallback thread_callback = |
321 | 17.5k | std::bind(&ThreadMgr::ThreadPathHandler, this, _1, _2); |
322 | 17.5k | DCHECK_NOTNULL(web)->RegisterPathHandler("/threadz", "Threads", thread_callback, true, false); |
323 | 17.5k | return Status::OK(); |
324 | 17.5k | } |
325 | | |
326 | 15.1k | uint64_t ThreadMgr::ReadThreadsStarted() { |
327 | 15.1k | MutexLock l(lock_); |
328 | 15.1k | return threads_started_metric_; |
329 | 15.1k | } |
330 | | |
331 | 15.1k | uint64_t ThreadMgr::ReadThreadsRunning() { |
332 | 15.1k | MutexLock l(lock_); |
333 | 15.1k | return threads_running_metric_; |
334 | 15.1k | } |
335 | | |
336 | | void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name, |
337 | 923k | const string& category, int64_t tid) { |
338 | | // These annotations cause TSAN to ignore the synchronization on lock_ |
339 | | // without causing the subsequent mutations to be treated as data races |
340 | | // in and of themselves (that's what IGNORE_READS_AND_WRITES does). |
341 | | // |
342 | | // Why do we need them here and in SuperviseThread()? TSAN operates by |
343 | | // observing synchronization events and using them to establish "happens |
344 | | // before" relationships between threads. Where these relationships are |
345 | | // not built, shared state access constitutes a data race. The |
346 | | // synchronization events here, in RemoveThread(), and in |
347 | | // SuperviseThread() may cause TSAN to establish a "happens before" |
348 | | // relationship between thread functors, ignoring potential data races. |
349 | | // The annotations prevent this from happening. |
350 | 923k | ANNOTATE_IGNORE_SYNC_BEGIN(); |
351 | 923k | ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); |
352 | 923k | { |
353 | 923k | MutexLock l(lock_); |
354 | 923k | thread_categories_[category][pthread_id] = ThreadDescriptor(category, name, tid); |
355 | 923k | if (metrics_enabled_) { |
356 | 883k | threads_running_metric_++; |
357 | 883k | threads_started_metric_++; |
358 | 883k | started_category_tracker_->IncrementCategory(category); |
359 | 883k | running_category_tracker_->IncrementCategory(category); |
360 | 883k | } |
361 | 923k | } |
362 | 923k | ANNOTATE_IGNORE_SYNC_END(); |
363 | 923k | ANNOTATE_IGNORE_READS_AND_WRITES_END(); |
364 | 923k | } |
365 | | |
366 | 278k | void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category) { |
367 | 278k | ANNOTATE_IGNORE_SYNC_BEGIN(); |
368 | 278k | ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); |
369 | 278k | { |
370 | 278k | MutexLock l(lock_); |
371 | 278k | auto category_it = thread_categories_.find(category); |
372 | 278k | DCHECK(category_it != thread_categories_.end()); |
373 | 278k | category_it->second.erase(pthread_id); |
374 | 278k | if (metrics_enabled_) { |
375 | 244k | threads_running_metric_--; |
376 | 244k | running_category_tracker_->DecrementCategory(category); |
377 | 244k | } |
378 | 278k | } |
379 | 278k | ANNOTATE_IGNORE_SYNC_END(); |
380 | 278k | ANNOTATE_IGNORE_READS_AND_WRITES_END(); |
381 | 278k | } |
382 | | |
383 | 0 | int Compare(const Result<StackTrace>& lhs, const Result<StackTrace>& rhs) { |
384 | 0 | if (lhs.ok()) { |
385 | 0 | if (!rhs.ok()) { |
386 | 0 | return -1; |
387 | 0 | } |
388 | 0 | return lhs->compare(*rhs); |
389 | 0 | } |
390 | 0 | if (rhs.ok()) { |
391 | 0 | return 1; |
392 | 0 | } |
393 | 0 | return lhs.status().message().compare(rhs.status().message()); |
394 | |
|
395 | 0 | } |
396 | | |
397 | 0 | void ThreadMgr::RenderThreadCategoryRows(const ThreadCategory& category, std::string* output) { |
398 | 0 | struct ThreadData { |
399 | 0 | int64_t tid = 0; |
400 | 0 | ThreadIdForStack tid_for_stack = 0; |
401 | 0 | const std::string* name = nullptr; |
402 | 0 | ThreadStats stats; |
403 | 0 | Result<StackTrace> stack_trace = StackTrace(); |
404 | 0 | int rowspan = -1; |
405 | 0 | }; |
406 | 0 | std::vector<ThreadData> threads; |
407 | 0 | std::vector<ThreadIdForStack> thread_ids; |
408 | 0 | threads.resize(category.size()); |
409 | 0 | thread_ids.reserve(category.size()); |
410 | 0 | { |
411 | 0 | auto* data = threads.data(); |
412 | 0 | for (const ThreadCategory::value_type& thread : category) { |
413 | 0 | data->name = &thread.second.name(); |
414 | 0 | data->tid = thread.second.thread_id(); |
415 | | #if defined(__linux__) |
416 | | data->tid_for_stack = data->tid; |
417 | | #else |
418 | 0 | data->tid_for_stack = thread.first; |
419 | 0 | #endif |
420 | 0 | Status status = GetThreadStats(data->tid, &data->stats); |
421 | 0 | if (!status.ok()) { |
422 | 0 | YB_LOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: " |
423 | 0 | << status.ToString(); |
424 | 0 | } |
425 | 0 | thread_ids.push_back(data->tid_for_stack); |
426 | 0 | ++data; |
427 | 0 | } |
428 | 0 | } |
429 | |
|
430 | 0 | if (threads.empty()) { |
431 | 0 | return; |
432 | 0 | } |
433 | | |
434 | 0 | std::sort(thread_ids.begin(), thread_ids.end()); |
435 | 0 | auto stacks = ThreadStacks(thread_ids); |
436 | |
|
437 | 0 | for (ThreadData& data : threads) { |
438 | 0 | auto it = std::lower_bound(thread_ids.begin(), thread_ids.end(), data.tid_for_stack); |
439 | 0 | DCHECK(it != thread_ids.end() && *it == data.tid_for_stack); |
440 | 0 | data.stack_trace = stacks[it - thread_ids.begin()]; |
441 | 0 | } |
442 | |
|
443 | 0 | std::sort(threads.begin(), threads.end(), [](const ThreadData& lhs, const ThreadData& rhs) { |
444 | 0 | return Compare(lhs.stack_trace, rhs.stack_trace) < 0; |
445 | 0 | }); |
446 | |
|
447 | 0 | auto it = threads.begin(); |
448 | 0 | auto first = it; |
449 | 0 | first->rowspan = 1; |
450 | 0 | while (++it != threads.end()) { |
451 | 0 | if (Compare(it->stack_trace, first->stack_trace) != 0) { |
452 | 0 | first = it; |
453 | 0 | first->rowspan = 1; |
454 | 0 | } else { |
455 | 0 | ++first->rowspan; |
456 | 0 | } |
457 | 0 | } |
458 | |
|
459 | 0 | std::string* active_out = output; |
460 | 0 | for (const auto& thread : threads) { |
461 | 0 | std::string symbolized; |
462 | 0 | if (thread.rowspan > 0) { |
463 | 0 | StackTraceGroup group = StackTraceGroup::kActive; |
464 | 0 | if (thread.stack_trace.ok()) { |
465 | 0 | symbolized = thread.stack_trace->Symbolize(StackTraceLineFormat::DEFAULT, &group); |
466 | 0 | } else { |
467 | 0 | symbolized = thread.stack_trace.status().message().ToBuffer(); |
468 | 0 | } |
469 | 0 | active_out = output + to_underlying(group); |
470 | 0 | } |
471 | |
|
472 | 0 | *active_out += Format( |
473 | 0 | "<tr><td>$0</td><td>$1</td><td>$2</td><td>$3</td>", |
474 | 0 | *thread.name, MonoDelta::FromNanoseconds(thread.stats.user_ns), |
475 | 0 | MonoDelta::FromNanoseconds(thread.stats.kernel_ns / 1e9), |
476 | 0 | MonoDelta::FromNanoseconds(thread.stats.iowait_ns / 1e9)); |
477 | 0 | if (thread.rowspan > 0) { |
478 | 0 | *active_out += Format("<td rowspan=\"$0\"><pre>$1\nTotal number of threads: $0</pre></td>", |
479 | 0 | thread.rowspan, symbolized); |
480 | 0 | } |
481 | 0 | *active_out += "</tr>\n"; |
482 | 0 | } |
483 | 0 | } |
484 | | |
485 | | void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req, |
486 | 0 | WebCallbackRegistry::WebResponse* resp) { |
487 | 0 | std::stringstream *output = &resp->output; |
488 | 0 | MutexLock l(lock_); |
489 | 0 | vector<const ThreadCategory*> categories_to_print; |
490 | 0 | auto category_name = req.parsed_args.find("group"); |
491 | 0 | if (category_name != req.parsed_args.end()) { |
492 | 0 | string group = EscapeForHtmlToString(category_name->second); |
493 | 0 | (*output) << "<h2>Thread Group: " << group << "</h2>" << endl; |
494 | 0 | if (group != "all") { |
495 | 0 | ThreadCategoryMap::const_iterator category = thread_categories_.find(group); |
496 | 0 | if (category == thread_categories_.end()) { |
497 | 0 | (*output) << "Thread group '" << group << "' not found" << endl; |
498 | 0 | return; |
499 | 0 | } |
500 | 0 | categories_to_print.push_back(&category->second); |
501 | 0 | (*output) << "<h3>" << category->first << " : " << category->second.size() |
502 | 0 | << "</h3>"; |
503 | 0 | } else { |
504 | 0 | for (const ThreadCategoryMap::value_type& category : thread_categories_) { |
505 | 0 | categories_to_print.push_back(&category.second); |
506 | 0 | } |
507 | 0 | (*output) << "<h3>All Threads : </h3>"; |
508 | 0 | } |
509 | |
|
510 | 0 | (*output) << "<table class='table table-hover table-border'>"; |
511 | 0 | (*output) << "<tr><th>Thread name</th><th>Cumulative User CPU(s)</th>" |
512 | 0 | << "<th>Cumulative Kernel CPU(s)</th>" |
513 | 0 | << "<th>Cumulative IO-wait(s)</th></tr>"; |
514 | |
|
515 | 0 | std::array<std::string, kStackTraceGroupMapSize> groups; |
516 | |
|
517 | 0 | for (const ThreadCategory* category : categories_to_print) { |
518 | 0 | RenderThreadCategoryRows(*category, groups.data()); |
519 | 0 | } |
520 | |
|
521 | 0 | for (auto g : kStackTraceGroupList) { |
522 | 0 | *output << groups[to_underlying(g)]; |
523 | 0 | } |
524 | 0 | (*output) << "</table>"; |
525 | 0 | } else { |
526 | 0 | (*output) << "<h2>Thread Groups</h2>"; |
527 | 0 | if (metrics_enabled_) { |
528 | 0 | (*output) << "<h4>" << threads_running_metric_ << " thread(s) running"; |
529 | 0 | } |
530 | 0 | (*output) << "<a href='/threadz?group=all'><h3>All Threads</h3>"; |
531 | |
|
532 | 0 | for (const ThreadCategoryMap::value_type& category : thread_categories_) { |
533 | 0 | string category_arg; |
534 | 0 | UrlEncode(category.first, &category_arg); |
535 | 0 | (*output) << "<a href='/threadz?group=" << category_arg << "'><h3>" |
536 | 0 | << category.first << " : " << category.second.size() << "</h3></a>"; |
537 | 0 | } |
538 | 0 | } |
539 | 0 | } |
540 | | |
541 | | // Singleton instance of ThreadMgr. Only visible in this file, used only by Thread. |
542 | | // The Thread class adds a reference to thread_manager while it is supervising a thread so |
543 | | // that a race between the end of the process's main thread (and therefore the destruction |
544 | | // of thread_manager) and the end of a thread that tries to remove itself from the |
545 | | // manager after the destruction can be avoided. |
546 | | shared_ptr<ThreadMgr> thread_manager; |
547 | | |
548 | | // Controls the single (lazy) initialization of thread_manager. |
549 | | std::once_flag init_threading_internal_once_flag; |
550 | | |
551 | 14.8k | void InitThreadingInternal() { |
552 | | // Warm up the stack trace library. This avoids a race in libunwind initialization |
553 | | // by making sure we initialize it before we start any other threads. |
554 | 14.8k | GetStackTraceHex(); |
555 | 14.8k | thread_manager = std::make_shared<ThreadMgr>(); |
556 | 14.8k | } |
557 | | |
558 | | } // anonymous namespace |
559 | | |
560 | 949k | void SetThreadName(const std::string& name) { |
561 | | #if defined(__linux__) |
562 | | // http://0pointer.de/blog/projects/name-your-threads.html |
563 | | // Set the name for the LWP (which gets truncated to 15 characters). |
564 | | // Note that glibc also has a 'pthread_setname_np' api, but it may not be |
565 | | // available everywhere and it's only benefit over using prctl directly is |
566 | | // that it can set the name of threads other than the current thread. |
567 | | int err = prctl(PR_SET_NAME, name.c_str()); |
568 | | #else |
569 | 949k | int err = pthread_setname_np(name.c_str()); |
570 | 949k | #endif // defined(__linux__) |
571 | | // We expect EPERM failures in sandboxed processes, just ignore those. |
572 | 949k | if (err < 0 && errno != EPERM) { |
573 | 0 | PLOG(ERROR) << "SetThreadName"; |
574 | 0 | } |
575 | 949k | } |
576 | | |
577 | 944k | void InitThreading() { |
578 | 944k | std::call_once(init_threading_internal_once_flag, InitThreadingInternal); |
579 | 944k | } |
580 | | |
581 | | __thread Thread* Thread::tls_ = nullptr; |
582 | | |
583 | | Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics, |
584 | 17.5k | WebCallbackRegistry* web) { |
585 | 17.5k | InitThreading(); |
586 | 17.5k | return thread_manager->StartInstrumentation(server_metrics, web); |
587 | 17.5k | } |
588 | | |
589 | | ThreadJoiner::ThreadJoiner(Thread* thr) |
590 | 54.7k | : thread_(CHECK_NOTNULL(thr)) { |
591 | 54.7k | } |
592 | | |
593 | 0 | ThreadJoiner& ThreadJoiner::warn_after(MonoDelta duration) { |
594 | 0 | warn_after_ = duration; |
595 | 0 | return *this; |
596 | 0 | } |
597 | | |
598 | 16 | ThreadJoiner& ThreadJoiner::warn_every(MonoDelta duration) { |
599 | 16 | warn_every_ = duration; |
600 | 16 | return *this; |
601 | 16 | } |
602 | | |
603 | 13.6k | ThreadJoiner& ThreadJoiner::give_up_after(MonoDelta duration) { |
604 | 13.6k | give_up_after_ = duration; |
605 | 13.6k | return *this; |
606 | 13.6k | } |
607 | | |
608 | 54.7k | Status ThreadJoiner::Join() { |
609 | 54.7k | if (Thread::current_thread() && |
610 | 142 | Thread::current_thread()->tid() == thread_->tid()) { |
611 | 1 | return STATUS(InvalidArgument, "Can't join on own thread", thread_->name_); |
612 | 1 | } |
613 | | |
614 | | // Early exit: double join is a no-op. |
615 | 54.7k | if (!thread_->joinable_) { |
616 | 14.9k | return Status::OK(); |
617 | 14.9k | } |
618 | | |
619 | 39.7k | MonoDelta waited = MonoDelta::kZero; |
620 | 39.7k | bool keep_trying = true; |
621 | 39.7k | while (keep_trying) { |
622 | 39.7k | if (waited >= warn_after_) { |
623 | 22 | LOG(WARNING) << Format("Waited for $0 trying to join with $1 (tid $2)", |
624 | 22 | waited, thread_->name_, thread_->tid_); |
625 | 22 | } |
626 | | |
627 | 39.7k | auto remaining_before_giveup = give_up_after_; |
628 | 39.7k | if (remaining_before_giveup != MonoDelta::kMax) { |
629 | 13.6k | remaining_before_giveup -= waited; |
630 | 13.6k | } |
631 | | |
632 | 39.7k | auto remaining_before_next_warn = warn_every_; |
633 | 39.7k | if (waited < warn_after_) { |
634 | 39.7k | remaining_before_next_warn = warn_after_ - waited; |
635 | 39.7k | } |
636 | | |
637 | 39.7k | if (remaining_before_giveup < remaining_before_next_warn) { |
638 | 0 | keep_trying = false; |
639 | 0 | } |
640 | | |
641 | 39.7k | auto wait_for = std::min(remaining_before_giveup, remaining_before_next_warn); |
642 | | |
643 | 39.7k | if (thread_->done_.WaitFor(wait_for)) { |
644 | | // Unconditionally join before returning, to guarantee that any TLS |
645 | | // has been destroyed (pthread_key_create() destructors only run |
646 | | // after a pthread's user method has returned). |
647 | 39.7k | int ret = pthread_join(thread_->thread_, NULL); |
648 | 39.7k | CHECK_EQ(ret, 0); |
649 | 39.7k | thread_->joinable_ = false; |
650 | 39.7k | return Status::OK(); |
651 | 39.7k | } |
652 | 18.4E | waited += wait_for; |
653 | 18.4E | } |
654 | | |
655 | 39.7k | #ifndef NDEBUG |
656 | 18.4E | LOG(WARNING) << "Failed to join:\n" << DumpThreadStack(thread_->tid_for_stack()); |
657 | 18.4E | #endif |
658 | | |
659 | 18.4E | return STATUS_FORMAT(Aborted, "Timed out after $0 joining on $1", waited, thread_->name_); |
660 | 39.7k | } |
661 | | |
662 | 280k | Thread::~Thread() { |
663 | 280k | if (joinable_) { |
664 | 241k | int ret = pthread_detach(thread_); |
665 | 241k | CHECK_EQ(ret, 0); |
666 | 241k | } |
667 | 280k | } |
668 | | |
669 | 10 | void Thread::CallAtExit(const Closure& cb) { |
670 | 10 | CHECK_EQ(Thread::current_thread(), this); |
671 | 10 | exit_callbacks_.push_back(cb); |
672 | 10 | } |
673 | | |
674 | 2 | std::string Thread::ToString() const { |
675 | 2 | return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid_, name_, category_); |
676 | 2 | } |
677 | | |
678 | | Status Thread::StartThread(const std::string& category, const std::string& name, |
679 | 922k | ThreadFunctor functor, scoped_refptr<Thread> *holder) { |
680 | 922k | InitThreading(); |
681 | 922k | string padded_name = name; |
682 | | // See comment in SetThreadName. We're padding names to the 15 char limit in order to get |
683 | | // aggregations when using the linux perf tool, as that groups up stacks based on the 15 char |
684 | | // prefix of all the thread names. |
685 | 922k | if (name.length() < kMaxThreadNameInPerf) { |
686 | 279k | padded_name += string(kMaxThreadNameInPerf - name.length(), Thread::kPaddingChar); |
687 | 279k | } |
688 | 922k | const string log_prefix = Substitute("$0 ($1) ", padded_name, category); |
689 | 922k | SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread"); |
690 | | |
691 | | // Temporary reference for the duration of this function. |
692 | 922k | scoped_refptr<Thread> t(new Thread(category, padded_name, std::move(functor))); |
693 | | |
694 | 922k | { |
695 | 922k | SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "creating pthread"); |
696 | 922k | int ret = pthread_create(&t->thread_, NULL, &Thread::SuperviseThread, t.get()); |
697 | 922k | if (ret) { |
698 | 0 | return STATUS(RuntimeError, "Could not create thread", Errno(ret)); |
699 | 0 | } |
700 | 922k | } |
701 | | |
702 | | // The thread has been created and is now joinable. |
703 | | // |
704 | | // Why set this in the parent and not the child? Because only the parent |
705 | | // (or someone communicating with the parent) can join, so joinable must |
706 | | // be set before the parent returns. |
707 | 922k | t->joinable_ = true; |
708 | | |
709 | | // Optional, and only set if the thread was successfully created. |
710 | 923k | if (holder) { |
711 | 923k | *holder = t; |
712 | 923k | } |
713 | | |
714 | | // The tid_ member goes through the following states: |
715 | | // 1 CHILD_WAITING_TID: the child has just been spawned and is waiting |
716 | | // for the parent to finish writing to caller state (i.e. 'holder'). |
717 | | // 2. PARENT_WAITING_TID: the parent has updated caller state and is now |
718 | | // waiting for the child to write the tid. |
719 | | // 3. <value>: both the parent and the child are free to continue. If the |
720 | | // value is INVALID_TID, the child could not discover its tid. |
721 | 922k | Release_Store(&t->tid_, PARENT_WAITING_TID); |
722 | 922k | { |
723 | 922k | SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, |
724 | 922k | "waiting for new thread to publish its TID"); |
725 | 922k | int loop_count = 0; |
726 | 3.87M | while (Acquire_Load(&t->tid_) == PARENT_WAITING_TID) { |
727 | 2.95M | boost::detail::yield(loop_count++); |
728 | 2.95M | } |
729 | 922k | } |
730 | | |
731 | 18.4E | VLOG(2) << "Started thread " << t->tid()<< " - " << category << ":" << padded_name; |
732 | 922k | return Status::OK(); |
733 | 922k | } |
734 | | |
735 | 923k | void* Thread::SuperviseThread(void* arg) { |
736 | 923k | Thread* t = static_cast<Thread*>(arg); |
737 | 923k | int64_t system_tid = Thread::CurrentThreadId(); |
738 | 923k | if (system_tid == -1) { |
739 | 0 | string error_msg = ErrnoToString(errno); |
740 | 0 | YB_LOG_EVERY_N(INFO, 100) << "Could not determine thread ID: " << error_msg; |
741 | 0 | } |
742 | 923k | string name = strings::Substitute("$0-$1", t->name(), system_tid); |
743 | | |
744 | | // Take an additional reference to the thread manager, which we'll need below. |
745 | 923k | ANNOTATE_IGNORE_SYNC_BEGIN(); |
746 | 923k | shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager; |
747 | 923k | ANNOTATE_IGNORE_SYNC_END(); |
748 | | |
749 | | // Set up the TLS. |
750 | | // |
751 | | // We could store a scoped_refptr in the TLS itself, but as its |
752 | | // lifecycle is poorly defined, we'll use a bare pointer and take an |
753 | | // additional reference on t out of band, in thread_ref. |
754 | 923k | scoped_refptr<Thread> thread_ref = t; |
755 | 923k | t->tls_ = t; |
756 | | |
757 | | // Wait until the parent has updated all caller-visible state, then write |
758 | | // the TID to 'tid_', thus completing the parent<-->child handshake. |
759 | 923k | int loop_count = 0; |
760 | 1.32M | while (Acquire_Load(&t->tid_) == CHILD_WAITING_TID) { |
761 | 397k | boost::detail::yield(loop_count++); |
762 | 397k | } |
763 | 923k | Release_Store(&t->tid_, system_tid); |
764 | | |
765 | 923k | thread_manager->SetThreadName(name, t->tid()); |
766 | 923k | thread_manager->AddThread(pthread_self(), name, t->category(), t->tid()); |
767 | | |
768 | 923k | cds::threading::Manager::attachThread(); |
769 | | |
770 | | // FinishThread() is guaranteed to run (even if functor_ throws an |
771 | | // exception) because pthread_cleanup_push() creates a scoped object |
772 | | // whose destructor invokes the provided callback. |
773 | 923k | pthread_cleanup_push(&Thread::FinishThread, t); |
774 | 923k | t->functor_(); |
775 | 923k | pthread_cleanup_pop(true); |
776 | | |
777 | 923k | return NULL; |
778 | 923k | } |
779 | | |
780 | 39.8k | void Thread::Join() { |
781 | 39.8k | WARN_NOT_OK(ThreadJoiner(this).Join(), "Thread join failed"); |
782 | 39.8k | } |
783 | | |
784 | 280k | void Thread::FinishThread(void* arg) { |
785 | 280k | cds::threading::Manager::detachThread(); |
786 | | |
787 | 280k | Thread* t = static_cast<Thread*>(arg); |
788 | | |
789 | 10 | for (Closure& c : t->exit_callbacks_) { |
790 | 10 | c.Run(); |
791 | 10 | } |
792 | | |
793 | | // We're here either because of the explicit pthread_cleanup_pop() in |
794 | | // SuperviseThread() or through pthread_exit(). In either case, |
795 | | // thread_manager is guaranteed to be live because thread_mgr_ref in |
796 | | // SuperviseThread() is still live. |
797 | 280k | thread_manager->RemoveThread(pthread_self(), t->category()); |
798 | | |
799 | | // Signal any Joiner that we're done. |
800 | 280k | t->done_.CountDown(); |
801 | | |
802 | 18.4E | VLOG(2) << "Ended thread " << t->tid() << " - " |
803 | 18.4E | << t->category() << ":" << t->name(); |
804 | 280k | } |
805 | | |
806 | 52.0k | CDSAttacher::CDSAttacher() { |
807 | 52.0k | cds::threading::Manager::attachThread(); |
808 | 52.0k | } |
809 | | |
810 | 50.3k | CDSAttacher::~CDSAttacher() { |
811 | 50.3k | cds::threading::Manager::detachThread(); |
812 | 50.3k | } |
813 | | |
814 | | } // namespace yb |