YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/failure_detector.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/util/failure_detector.h"
34
35
#include <mutex>
36
#include <unordered_map>
37
38
#include <glog/logging.h>
39
40
#include "yb/gutil/map-util.h"
41
#include "yb/gutil/stl_util.h"
42
43
#include "yb/util/locks.h"
44
#include "yb/util/status.h"
45
#include "yb/util/status_log.h"
46
#include "yb/util/thread.h"
47
48
namespace yb {
49
50
using std::unordered_map;
51
using strings::Substitute;
52
53
const int64_t RandomizedFailureMonitor::kMinWakeUpTimeMillis = 10;
54
55
TimedFailureDetector::TimedFailureDetector(MonoDelta failure_period)
56
1
    : failure_period_(std::move(failure_period)) {}
57
58
1
TimedFailureDetector::~TimedFailureDetector() {
59
1
  STLDeleteValues(&nodes_);
60
1
}
61
62
Status TimedFailureDetector::Track(const string& name,
63
                                   const MonoTime& now,
64
1
                                   const FailureDetectedCallback& callback) {
65
1
  std::lock_guard<simple_spinlock> lock(lock_);
66
1
  auto node = std::make_unique<Node>();
67
1
  node->permanent_name = name;
68
1
  node->callback = callback;
69
1
  node->last_heard_of = now;
70
1
  node->status = ALIVE;
71
1
  if (!InsertIfNotPresent(&nodes_, name, node.get())) {
72
0
    return STATUS(AlreadyPresent,
73
0
        Substitute("Node with name '$0' is already being monitored", name));
74
0
  }
75
1
  node.release();
76
1
  return Status::OK();
77
1
}
78
79
1
Status TimedFailureDetector::UnTrack(const string& name) {
80
1
  std::lock_guard<simple_spinlock> lock(lock_);
81
1
  Node* node = EraseKeyReturnValuePtr(&nodes_, name);
82
1
  if (PREDICT_FALSE(node == NULL)) {
83
0
    return STATUS(NotFound, Substitute("Node with name '$0' not found", name));
84
0
  }
85
1
  delete node;
86
1
  return Status::OK();
87
1
}
88
89
3
bool TimedFailureDetector::IsTracking(const std::string& name) {
90
3
  std::lock_guard<simple_spinlock> lock(lock_);
91
3
  return ContainsKey(nodes_, name);
92
3
}
93
94
40
Status TimedFailureDetector::MessageFrom(const std::string& name, const MonoTime& now) {
95
0
  VLOG(3) << "Received message from " << name << " at " << now.ToString();
96
40
  std::lock_guard<simple_spinlock> lock(lock_);
97
40
  Node* node = FindPtrOrNull(nodes_, name);
98
40
  if (node == NULL) {
99
0
    VLOG(1) << "Not tracking node: " << name;
100
0
    return STATUS(NotFound, Substitute("Message from unknown node '$0'", name));
101
0
  }
102
40
  node->last_heard_of = now;
103
40
  node->status = ALIVE;
104
40
  return Status::OK();
105
40
}
106
107
FailureDetector::NodeStatus TimedFailureDetector::GetNodeStatusUnlocked(const std::string& name,
108
7
                                                                        const MonoTime& now) {
109
7
  Node* node = FindOrDie(nodes_, name);
110
7
  if (now.GetDeltaSince(node->last_heard_of).MoreThan(failure_period_)) {
111
1
    node->status = DEAD;
112
1
  }
113
7
  return node->status;
114
7
}
115
116
7
void TimedFailureDetector::CheckForFailures(const MonoTime& now) {
117
7
  unordered_map<string, FailureDetectedCallback> callbacks;
118
7
  {
119
7
    std::lock_guard<simple_spinlock> lock(lock_);
120
7
    for (const auto& entry : nodes_) {
121
7
      if (GetNodeStatusUnlocked(entry.first, now) == DEAD) {
122
1
        InsertOrDie(&callbacks, entry.first, entry.second->callback);
123
1
      }
124
7
    }
125
7
  }
126
127
  // Invoke failure callbacks outside of lock.
128
1
  for (const auto& entry : callbacks) {
129
1
    const string& node_name = entry.first;
130
1
    const FailureDetectedCallback& callback = entry.second;
131
1
    callback.Run(node_name, STATUS(RemoteError, Substitute("Node '$0' failed", node_name)));
132
1
  }
133
7
}
134
135
RandomizedFailureMonitor::RandomizedFailureMonitor(uint32_t random_seed,
136
                                                   int64_t period_mean_millis,
137
                                                   int64_t period_stddev_millis)
138
    : period_mean_millis_(period_mean_millis),
139
      period_stddev_millis_(period_stddev_millis),
140
      random_(random_seed),
141
      run_latch_(0),
142
1
      shutdown_(false) {
143
1
}
144
145
1
RandomizedFailureMonitor::~RandomizedFailureMonitor() {
146
1
  Shutdown();
147
1
}
148
149
1
Status RandomizedFailureMonitor::Start() {
150
1
  CHECK(!thread_);
151
1
  run_latch_.Reset(1);
152
1
  return Thread::Create("failure-monitors", "failure-monitor",
153
1
                        &RandomizedFailureMonitor::RunThread,
154
1
                        this, &thread_);
155
1
}
156
157
2
void RandomizedFailureMonitor::Shutdown() {
158
2
  if (!thread_) {
159
1
    return;
160
1
  }
161
162
1
  {
163
1
    std::lock_guard<simple_spinlock> l(lock_);
164
1
    if (shutdown_) {
165
0
      return;
166
0
    }
167
1
    shutdown_ = true;
168
1
  }
169
170
1
  run_latch_.CountDown();
171
1
  CHECK_OK(ThreadJoiner(thread_.get()).Join());
172
1
  thread_.reset();
173
1
}
174
175
Status RandomizedFailureMonitor::MonitorFailureDetector(const string& name,
176
1
                                                        const scoped_refptr<FailureDetector>& fd) {
177
1
  std::lock_guard<simple_spinlock> l(lock_);
178
1
  bool inserted = InsertIfNotPresent(&fds_, name, fd);
179
1
  if (PREDICT_FALSE(!inserted)) {
180
0
    return STATUS(AlreadyPresent, Substitute("Already monitoring failure detector '$0'", name));
181
0
  }
182
1
  return Status::OK();
183
1
}
184
185
1
Status RandomizedFailureMonitor::UnmonitorFailureDetector(const string& name) {
186
1
  std::lock_guard<simple_spinlock> l(lock_);
187
1
  auto count = fds_.erase(name);
188
1
  if (PREDICT_FALSE(count == 0)) {
189
0
    return STATUS(NotFound, Substitute("Failure detector '$0' not found", name));
190
0
  }
191
1
  return Status::OK();
192
1
}
193
194
1
void RandomizedFailureMonitor::RunThread() {
195
0
  VLOG(1) << "Failure monitor thread starting";
196
197
8
  while (true) {
198
8
    int64_t wait_millis = random_.Normal(period_mean_millis_, period_stddev_millis_);
199
8
    if (wait_millis < kMinWakeUpTimeMillis) {
200
0
      wait_millis = kMinWakeUpTimeMillis;
201
0
    }
202
203
8
    MonoDelta wait_delta = MonoDelta::FromMilliseconds(wait_millis);
204
0
    VLOG(3) << "RandomizedFailureMonitor sleeping for: " << wait_delta.ToString();
205
8
    if (run_latch_.WaitFor(wait_delta)) {
206
      // CountDownLatch reached 0.
207
1
      std::lock_guard<simple_spinlock> lock(lock_);
208
      // Check if we were told to shutdown.
209
1
      if (shutdown_) {
210
        // Latch fired: exit loop.
211
0
        VLOG(1) << "RandomizedFailureMonitor thread shutting down";
212
1
        return;
213
1
      }
214
7
    }
215
216
    // Take a copy of the FD map under the lock.
217
7
    FDMap fds_copy;
218
7
    {
219
7
      std::lock_guard<simple_spinlock> l(lock_);
220
7
      fds_copy = fds_;
221
7
    }
222
223
7
    MonoTime now = MonoTime::Now();
224
7
    for (const FDMap::value_type& entry : fds_copy) {
225
7
      entry.second->CheckForFailures(now);
226
7
    }
227
7
  }
228
1
}
229
230
}  // namespace yb