YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/debug/long_operation_tracker.cc
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/util/debug/long_operation_tracker.h"
15
16
#include <condition_variable>
17
#include <mutex>
18
#include <queue>
19
#include <thread>
20
21
#include "yb/util/debug-util.h"
22
#include "yb/util/thread.h"
23
#include "yb/util/tsan_util.h"
24
25
namespace yb {
26
27
// Record describing one operation that has been registered for long-run detection.
struct LongOperationTracker::TrackedOperation {
  // Thread that registered the operation; its stack is dumped if the operation overruns.
  ThreadIdForStack thread_id;
  // Description printed in warnings. Only the pointer is stored, the text is not copied.
  const char* message;
  // Moment at which the operation was registered.
  CoarseTimePoint start;
  // Moment after which a warning should be logged.
  CoarseTimePoint time;
  // NOTE(review): not read or written anywhere in the visible part of this file.
  bool complete = false;

  TrackedOperation(
      ThreadIdForStack tid, const char* description, CoarseTimePoint started_at,
      CoarseTimePoint warn_at)
      : thread_id(tid), message(description), start(started_at), time(warn_at) {}
};
41
42
namespace {
43
44
// Shared handle to a tracked operation; held by both the tracker object and the helper's queue.
using TrackedOperationPtr = std::shared_ptr<LongOperationTracker::TrackedOperation>;
45
46
struct TrackedOperationComparer {
47
  // Order is reversed, because priority_queue keeps track of the "largest" element.
48
6.86G
  bool operator()(const TrackedOperationPtr& lhs, const TrackedOperationPtr& rhs) {
49
6.86G
    return lhs->time > rhs->time;
50
6.86G
  }
51
};
52
53
// Singleton that maintains queue of tracked operation and runs thread that checks for expired
54
// operations.
55
class LongOperationTrackerHelper {
56
 public:
57
22.0k
  LongOperationTrackerHelper() : thread_(std::bind(&LongOperationTrackerHelper::Execute, this)) {
58
22.0k
  }
59
60
  LongOperationTrackerHelper(const LongOperationTrackerHelper&) = delete;
61
  void operator=(const LongOperationTrackerHelper&) = delete;
62
63
6.85k
  ~LongOperationTrackerHelper() {
64
6.85k
    {
65
6.85k
      std::lock_guard<std::mutex> lock(mutex_);
66
6.85k
      stop_ = true;
67
6.85k
    }
68
6.85k
    cond_.notify_one();
69
6.85k
    thread_.join();
70
6.85k
  }
71
72
371M
  static LongOperationTrackerHelper& Instance() {
73
371M
    static LongOperationTrackerHelper result;
74
371M
    return result;
75
371M
  }
76
77
371M
  TrackedOperationPtr Register(const char* message, MonoDelta duration) {
78
371M
    auto start = CoarseMonoClock::now();
79
371M
    auto result = std::make_shared<LongOperationTracker::TrackedOperation>(
80
371M
        Thread::CurrentThreadIdForStack(), message, start, start + duration * kTimeMultiplier);
81
371M
    {
82
371M
      std::lock_guard<std::mutex> lock(mutex_);
83
371M
      queue_.push(result);
84
371M
    }
85
86
371M
    cond_.notify_one();
87
88
371M
    return result;
89
371M
  }
90
91
 private:
92
22.0k
  void Execute() {
93
22.0k
    std::unique_lock<std::mutex> lock(mutex_);
94
565M
    while (!stop_) {
95
565M
      if (queue_.empty()) {
96
31.9k
        cond_.wait(lock);
97
31.9k
        continue;
98
31.9k
      }
99
100
565M
      const CoarseTimePoint first_entry_time = queue_.top()->time;
101
102
565M
      auto now = CoarseMonoClock::now();
103
565M
      if (now < first_entry_time) {
104
331M
        if (cond_.wait_for(lock, first_entry_time - now) != std::cv_status::timeout) {
105
198M
          continue;
106
198M
        }
107
132M
        now = CoarseMonoClock::now();
108
132M
      }
109
110
366M
      TrackedOperationPtr operation = queue_.top();
111
366M
      queue_.pop();
112
366M
      if (!operation.unique()) {
113
6.78k
        lock.unlock();
114
6.78k
        LOG(WARNING) << operation->message << " running for " << MonoDelta(now - operation->start)
115
6.78k
                     << " in thread " << operation->thread_id << ":\n"
116
6.78k
                     << DumpThreadStack(operation->thread_id);
117
6.78k
        lock.lock();
118
6.78k
      }
119
366M
    }
120
22.0k
  }
121
122
  std::priority_queue<
123
      TrackedOperationPtr, std::vector<TrackedOperationPtr>, TrackedOperationComparer> queue_;
124
125
  std::mutex mutex_;
126
  std::condition_variable cond_;
127
  bool stop_;
128
  std::thread thread_;
129
};
130
131
} // namespace
132
133
// Registers the operation with the shared helper and keeps the returned handle for the lifetime
// of this tracker.
LongOperationTracker::LongOperationTracker(const char* message, MonoDelta duration)
    : tracked_operation_(LongOperationTrackerHelper::Instance().Register(message, duration)) {}
136
137
401M
// On destruction, logs a warning if the tracked operation ran past its warning deadline.
// Does nothing when this tracker holds no operation.
LongOperationTracker::~LongOperationTracker() {
  if (tracked_operation_) {
    const auto finished_at = CoarseMonoClock::now();
    const bool overran = tracked_operation_->time < finished_at;
    if (overran) {
      LOG(WARNING) << tracked_operation_->message << " took a long time: "
                   << MonoDelta(finished_at - tracked_operation_->start);
    }
  }
}
147
148
} // namespace yb