/Users/deen/code/yugabyte-db/src/yb/util/debug/long_operation_tracker.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/util/debug/long_operation_tracker.h" |
15 | | |
16 | | #include <condition_variable> |
17 | | #include <mutex> |
18 | | #include <queue> |
19 | | #include <thread> |
20 | | |
21 | | #include "yb/util/debug-util.h" |
22 | | #include "yb/util/thread.h" |
23 | | #include "yb/util/tsan_util.h" |
24 | | |
25 | | namespace yb { |
26 | | |
27 | | struct LongOperationTracker::TrackedOperation { |
28 | | ThreadIdForStack thread_id; |
29 | | const char* message; |
30 | | CoarseTimePoint start; |
31 | | // time when we should log warning |
32 | | CoarseTimePoint time; |
33 | | bool complete = false; |
34 | | |
35 | | TrackedOperation( |
36 | | ThreadIdForStack thread_id_, const char* message_, CoarseTimePoint start_, |
37 | | CoarseTimePoint time_) |
38 | 371M | : thread_id(thread_id_), message(message_), start(start_), time(time_) { |
39 | 371M | } |
40 | | }; |
41 | | |
42 | | namespace { |
43 | | |
44 | | typedef std::shared_ptr<LongOperationTracker::TrackedOperation> TrackedOperationPtr; |
45 | | |
46 | | struct TrackedOperationComparer { |
47 | | // Order is reversed, because priority_queue keeps track of the "largest" element. |
48 | 6.86G | bool operator()(const TrackedOperationPtr& lhs, const TrackedOperationPtr& rhs) { |
49 | 6.86G | return lhs->time > rhs->time; |
50 | 6.86G | } |
51 | | }; |
52 | | |
53 | | // Singleton that maintains queue of tracked operation and runs thread that checks for expired |
54 | | // operations. |
55 | | class LongOperationTrackerHelper { |
56 | | public: |
57 | 22.0k | LongOperationTrackerHelper() : thread_(std::bind(&LongOperationTrackerHelper::Execute, this)) { |
58 | 22.0k | } |
59 | | |
60 | | LongOperationTrackerHelper(const LongOperationTrackerHelper&) = delete; |
61 | | void operator=(const LongOperationTrackerHelper&) = delete; |
62 | | |
63 | 6.85k | ~LongOperationTrackerHelper() { |
64 | 6.85k | { |
65 | 6.85k | std::lock_guard<std::mutex> lock(mutex_); |
66 | 6.85k | stop_ = true; |
67 | 6.85k | } |
68 | 6.85k | cond_.notify_one(); |
69 | 6.85k | thread_.join(); |
70 | 6.85k | } |
71 | | |
72 | 371M | static LongOperationTrackerHelper& Instance() { |
73 | 371M | static LongOperationTrackerHelper result; |
74 | 371M | return result; |
75 | 371M | } |
76 | | |
77 | 371M | TrackedOperationPtr Register(const char* message, MonoDelta duration) { |
78 | 371M | auto start = CoarseMonoClock::now(); |
79 | 371M | auto result = std::make_shared<LongOperationTracker::TrackedOperation>( |
80 | 371M | Thread::CurrentThreadIdForStack(), message, start, start + duration * kTimeMultiplier); |
81 | 371M | { |
82 | 371M | std::lock_guard<std::mutex> lock(mutex_); |
83 | 371M | queue_.push(result); |
84 | 371M | } |
85 | | |
86 | 371M | cond_.notify_one(); |
87 | | |
88 | 371M | return result; |
89 | 371M | } |
90 | | |
91 | | private: |
92 | 22.0k | void Execute() { |
93 | 22.0k | std::unique_lock<std::mutex> lock(mutex_); |
94 | 565M | while (!stop_) { |
95 | 565M | if (queue_.empty()) { |
96 | 31.9k | cond_.wait(lock); |
97 | 31.9k | continue; |
98 | 31.9k | } |
99 | | |
100 | 565M | const CoarseTimePoint first_entry_time = queue_.top()->time; |
101 | | |
102 | 565M | auto now = CoarseMonoClock::now(); |
103 | 565M | if (now < first_entry_time) { |
104 | 331M | if (cond_.wait_for(lock, first_entry_time - now) != std::cv_status::timeout) { |
105 | 198M | continue; |
106 | 198M | } |
107 | 132M | now = CoarseMonoClock::now(); |
108 | 132M | } |
109 | | |
110 | 366M | TrackedOperationPtr operation = queue_.top(); |
111 | 366M | queue_.pop(); |
112 | 366M | if (!operation.unique()) { |
113 | 6.78k | lock.unlock(); |
114 | 6.78k | LOG(WARNING) << operation->message << " running for " << MonoDelta(now - operation->start) |
115 | 6.78k | << " in thread " << operation->thread_id << ":\n" |
116 | 6.78k | << DumpThreadStack(operation->thread_id); |
117 | 6.78k | lock.lock(); |
118 | 6.78k | } |
119 | 366M | } |
120 | 22.0k | } |
121 | | |
122 | | std::priority_queue< |
123 | | TrackedOperationPtr, std::vector<TrackedOperationPtr>, TrackedOperationComparer> queue_; |
124 | | |
125 | | std::mutex mutex_; |
126 | | std::condition_variable cond_; |
127 | | bool stop_; |
128 | | std::thread thread_; |
129 | | }; |
130 | | |
131 | | } // namespace |
132 | | |
133 | | LongOperationTracker::LongOperationTracker(const char* message, MonoDelta duration) |
134 | 371M | : tracked_operation_(LongOperationTrackerHelper::Instance().Register(message, duration)) { |
135 | 371M | } |
136 | | |
137 | 401M | LongOperationTracker::~LongOperationTracker() { |
138 | 401M | if (!tracked_operation_) { |
139 | 30.6M | return; |
140 | 30.6M | } |
141 | 371M | auto now = CoarseMonoClock::now(); |
142 | 371M | if (now > tracked_operation_->time) { |
143 | 9.32k | LOG(WARNING) << tracked_operation_->message << " took a long time: " |
144 | 9.32k | << MonoDelta(now - tracked_operation_->start); |
145 | 9.32k | } |
146 | 371M | } |
147 | | |
148 | | } // namespace yb |