YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/server/hybrid_clock.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/server/hybrid_clock.h"
34
35
#include <algorithm>
36
#include <mutex>
37
38
#include <glog/logging.h>
39
40
#include "yb/gutil/bind.h"
41
#include "yb/gutil/walltime.h"
42
43
#include "yb/util/errno.h"
44
#include "yb/util/flag_tags.h"
45
#include "yb/util/locks.h"
46
#include "yb/util/logging.h"
47
#include "yb/util/metrics.h"
48
#include "yb/util/result.h"
49
50
DEFINE_bool(use_hybrid_clock, true,
51
            "Whether HybridClock should be used as the default clock"
52
            " implementation. This should be disabled for testing purposes only.");
53
TAG_FLAG(use_hybrid_clock, hidden);
54
55
METRIC_DEFINE_gauge_uint64(server, hybrid_clock_hybrid_time,
56
                           "Hybrid Clock HybridTime",
57
                           yb::MetricUnit::kMicroseconds,
58
                           "Hybrid clock hybrid_time.");
59
METRIC_DEFINE_gauge_uint64(server, hybrid_clock_error,
60
                           "Hybrid Clock Error",
61
                           yb::MetricUnit::kMicroseconds,
62
                           "Server clock maximum error.");
63
METRIC_DEFINE_gauge_int64(server, hybrid_clock_skew,
64
                           "Hybrid Clock Skew",
65
                           yb::MetricUnit::kMicroseconds,
66
                           "Server clock skew.");
67
68
DEFINE_string(time_source, "",
69
              "The clock source that HybridClock should use (for tests only). "
70
              "Leave empty for WallClock, other values depend on added clock providers and "
71
              "specific for appropriate tests, that adds them.");
72
TAG_FLAG(time_source, hidden);
73
74
DEFINE_bool(fail_on_out_of_range_clock_skew, true,
75
            "In case transactional tables are present, crash the process if clock skew greater "
76
            "than the configured maximum.");
77
78
DEFINE_uint64(clock_skew_force_crash_bound_usec, 60000000,
79
              "If the clock skew larger than this amount (microseconds) is observed, we will force "
80
              "a crash regardless of the value of fail_on_out_of_range_clock_skew. This is useful "
81
              "for avoiding really large hybrid clock jumps. Set to 0 to disable the check. Note "
82
              "that this check is only preformed for clock skew greater than max_clock_skew_usec.");
83
84
DECLARE_uint64(max_clock_skew_usec);
85
86
using yb::Status;
87
using strings::Substitute;
88
89
namespace yb {
90
namespace server {
91
92
namespace {
93
94
std::mutex providers_mutex;
95
std::unordered_map<std::string, PhysicalClockProvider> providers;
96
97
std::atomic<bool> clock_skew_control_enabled{false};
98
99
// options should be in format clock_name[,extra_data] and extra_data would be passed to
100
// clock factory.
101
14.6k
PhysicalClockPtr GetClock(const std::string& options) {
102
14.6k
  if (options.empty()) {
103
14.1k
    return WallClock();
104
14.1k
  }
105
106
496
  auto pos = options.find(',');
107
494
  auto name = pos == std::string::npos ? options : options.substr(0, pos);
108
494
  auto arg = pos == std::string::npos ? std::string() : options.substr(pos + 1);
109
496
  std::lock_guard<std::mutex> lock(providers_mutex);
110
496
  auto it = providers.find(name);
111
496
  if (it == providers.end()) {
112
0
    LOG(DFATAL) << "Unknown time source: " << name;
113
0
    return WallClock();
114
0
  }
115
496
  return it->second(arg);
116
496
}
117
118
} // namespace
119
120
10.5k
void HybridClock::RegisterProvider(std::string name, PhysicalClockProvider provider) {
121
10.5k
  std::lock_guard<std::mutex> lock(providers_mutex);
122
10.5k
  providers.emplace(std::move(name), std::move(provider));
123
10.5k
}
124
125
14.6k
// Default construction uses the physical clock selected by --time_source.
HybridClock::HybridClock()
    : HybridClock(FLAGS_time_source) {
}

// Wraps an explicitly supplied physical clock.
HybridClock::HybridClock(PhysicalClockPtr clock)
    : clock_(std::move(clock)) {
}

// Resolves `time_source` (clock_name[,extra_data]; empty means WallClock) through GetClock().
HybridClock::HybridClock(const std::string& time_source)
    : HybridClock(GetClock(time_source)) {
}
130
131
14.6k
// Marks the clock as initialized; NowWithError() DCHECKs that this has happened.
// On Apple platforms a warning is logged because the clock runs in local mode there.
Status HybridClock::Init() {
#if defined(__APPLE__)
  LOG(WARNING) << "HybridClock initialized in local mode (OS X only). "
               << "Not suitable for distributed clusters.";
#endif  // defined(__APPLE__)

  state_ = kInitialized;
  return Status::OK();
}
141
142
100M
// Returns a pair (now, max_global_now).
// `now` is a fresh NowWithError() reading. `max_global_now` is the value returned by
// clock_->MaxGlobalTime() for that reading and its error bound, converted to a HybridTime.
// It is presumably an upper bound on what any clock can currently report; confirm against
// PhysicalClock::MaxGlobalTime.
HybridTimeRange HybridClock::NowRange() {
  HybridTime local_now;
  uint64_t error_usec;
  NowWithError(&local_now, &error_usec);

  const auto upper_bound = HybridTimeFromMicroseconds(
      clock_->MaxGlobalTime({local_now.GetPhysicalValueMicros(), error_usec}));
  return std::make_pair(local_now, upper_bound);
}
151
152
100M
// Reads the current hybrid time into *hybrid_time and an upper bound on its error into
// *max_error_usec. The physical clock reading is compared with the last time stored in
// components_:
//  * If the physical clock is ahead, the reading is published with logical component 1 and
//    HybridTimeFromMicroseconds(reading) is returned together with the clock's own max_error.
//  * Otherwise the stored physical value is kept and its logical component is incremented; the
//    pre-increment value is returned with a widened error bound (see the comment further down).
//    If the physical clock lags by more than --max_clock_skew_usec, this is logged, or the
//    process is crashed when the skew flags request it and EnableClockSkewControl() was called.
// Aborts via LOG(FATAL) if the physical clock cannot be read.
void HybridClock::NowWithError(HybridTime *hybrid_time, uint64_t *max_error_usec) {
  DCHECK_EQ(state_, kInitialized) << "Clock not initialized. Must call Init() first.";

  HybridClockComponents current_components = components_.load(boost::memory_order_acquire);

  auto now = clock_->Now();
  if (PREDICT_FALSE(!now.ok())) {
    LOG(FATAL) << Substitute("Couldn't get the current time: Clock unsynchronized. "
        "Status: $0", now.status().ToString());
  }

  // If the current time surpasses the last update just return it
  HybridClockComponents new_components = { now->time_point, 1 };

  // Same guard as in Update(): VLOG(4) crashes in TSAN mode.
  if (VLOG_IS_ON(4)) {
    LOG(INFO) << __func__ << ", new: " << new_components << ", current: " << current_components;
  }

  if (now->time_point < current_components.last_usec) {
    auto delta_us = current_components.last_usec - now->time_point;
    // Read each flag once so that the decision and the logged values always agree, even if a
    // flag is changed concurrently.
    const auto max_skew_usec = FLAGS_max_clock_skew_usec;
    if (delta_us > max_skew_usec) {
      const auto force_crash_bound_usec =
          ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec);
      auto delta = MonoDelta::FromMicroseconds(delta_us);
      auto max_allowed = MonoDelta::FromMicroseconds(max_skew_usec);
      const bool crash_requested =
          ANNOTATE_UNPROTECTED_READ(FLAGS_fail_on_out_of_range_clock_skew) ||
          (force_crash_bound_usec > 0 && delta_us > force_crash_bound_usec);
      if (crash_requested && clock_skew_control_enabled.load(std::memory_order_acquire)) {
        LOG(FATAL) << "Too big clock skew is detected: " << delta << ", while max allowed is: "
                   << max_allowed << "; clock_skew_force_crash_bound_usec="
                   << force_crash_bound_usec;
      } else {
        YB_LOG_EVERY_N_SECS(ERROR, 1)
            << "Too big clock skew is detected: " << delta << ", while max allowed is: "
            << max_allowed << "; clock_skew_force_crash_bound_usec="
            << force_crash_bound_usec;
      }
    }
  } else {
    // Loop over the check in case of concurrent updates making the CAS fail.
    while (now->time_point > current_components.last_usec) {
      if (components_.compare_exchange_weak(current_components, new_components)) {
        *hybrid_time = HybridTimeFromMicroseconds(new_components.last_usec);
        *max_error_usec = now->max_error;
        if (PREDICT_FALSE(VLOG_IS_ON(2))) {
          VLOG(2) << "Current clock is higher than the last one. Resetting logical values."
              << " Time: " << *hybrid_time << ", Error: " << *max_error_usec;
        }
        return;
      }
    }
  }

  // We don't have the last time read max error since it might have originated
  // in another machine, but we can put a bound on the maximum error of the
  // hybrid_time we are providing.
  // In particular we know that the "true" time falls within the interval
  // now_usec +- now.maxerror so we get the following situations:
  //
  // 1)
  // --------|----------|----|---------|--------------------------> time
  //     now - e       now  last   now + e
  // 2)
  // --------|----------|--------------|------|-------------------> time
  //     now - e       now         now + e   last
  //
  // Assuming, in the worst case, that the "true" time is now - error we need to
  // always return: last - (now - e) as the new maximum error.
  // This broadens the error interval for both cases but always returns
  // a correct error interval.

  do {
    new_components.last_usec = current_components.last_usec;
    new_components.logical = current_components.logical + 1;
    new_components.HandleLogicalComponentOverflow();
    // Loop over the check until the CAS succeeds, in case there are concurrent updates.
  } while (!components_.compare_exchange_weak(current_components, new_components));

  *max_error_usec = new_components.last_usec - (now->time_point - now->max_error);

  // We've already atomically incremented the logical, so subtract 1.
  *hybrid_time = HybridTimeFromMicrosecondsAndLogicalValue(
      new_components.last_usec,
      narrow_cast<LogicalTimeComponent>(new_components.logical)).Decremented();
  if (PREDICT_FALSE(VLOG_IS_ON(2))) {
    VLOG(2) << "Current clock is lower than the last one. Returning last read and incrementing"
        " logical values. Hybrid time: " << *hybrid_time << " Error: " << *max_error_usec;
  }
}
238
239
50.7M
void HybridClock::Update(const HybridTime& to_update) {
240
50.7M
  if (!to_update.is_valid()) {
241
0
    return;
242
0
  }
243
244
50.7M
  HybridClockComponents current_components = components_.load(boost::memory_order_acquire);
245
50.7M
  HybridClockComponents new_components = {
246
50.7M
    GetPhysicalValueMicros(to_update), GetLogicalValue(to_update) + 1
247
50.7M
  };
248
249
  // VLOG(4) crashes in TSAN mode
250
2
  if (VLOG_IS_ON(4)) {
251
2
    LOG(INFO) << __func__ << ", new: " << new_components << ", current: " << current_components;
252
2
  }
253
254
50.7M
  new_components.HandleLogicalComponentOverflow();
255
256
  // Keep trying to CAS until it works or until HT has advanced past this update.
257
52.9M
  while (current_components < new_components &&
258
21.1M
      !components_.compare_exchange_weak(current_components, new_components)) {}
259
50.7M
}
260
261
// Used to get the hybrid_time for metrics.
262
15.2k
uint64_t HybridClock::NowForMetrics() {
263
15.2k
  return Now().ToUint64();
264
15.2k
}
265
266
// Used to get the current error, for metrics.
267
15.2k
uint64_t HybridClock::ErrorForMetrics() {
268
15.2k
  HybridTime now;
269
15.2k
  uint64_t error;
270
271
15.2k
  NowWithError(&now, &error);
272
15.2k
  return error;
273
15.2k
}
274
275
15.2k
int64_t HybridClock::SkewForMetrics() {
276
15.2k
  HybridClockComponents current_components = components_.load(boost::memory_order_acquire);
277
15.2k
  auto now = clock_->Now();
278
15.2k
  if (PREDICT_FALSE(!now.ok())) {
279
0
    LOG(DFATAL) << Substitute("Couldn't get the current time: Clock unsynchronized. "
280
0
        "Status: $0", now.status().ToString());
281
0
    return 0;
282
0
  }
283
  // Making sure we don't return a negative value.
284
15.2k
  int64_t potential_skew = current_components.last_usec - now->time_point;
285
15.2k
  return std::max<int64_t>(0, potential_skew);
286
15.2k
}
287
288
4
std::string HybridClockComponents::ToString() const {
289
4
  return Format("{ last_usec: $0 logical: $1 }", last_usec, logical);
290
4
}
291
292
4
std::ostream& operator<<(std::ostream& out, const HybridClockComponents& components) {
293
4
  return out << components.ToString();
294
4
}
295
296
67.0M
void HybridClockComponents::HandleLogicalComponentOverflow() {
297
67.0M
  if (logical > HybridTime::kLogicalBitMask) {
298
4.86k
    static constexpr uint64_t kMaxOverflowValue = 1ULL << HybridTime::kBitsForLogicalComponent;
299
4.86k
    if (logical > kMaxOverflowValue) {
300
0
      LOG(FATAL) << "Logical component is too high: last_usec=" << last_usec
301
0
                 << "logical=" << logical << ", max allowed is " << kMaxOverflowValue;
302
0
    }
303
4.86k
    YB_LOG_EVERY_N_SECS(WARNING, 5) << "Logical component overflow: "
304
2
        << "last_usec=" << last_usec << ", logical=" << logical;
305
306
4.86k
    last_usec += logical >> HybridTime::kBitsForLogicalComponent;
307
4.86k
    logical &= HybridTime::kLogicalBitMask;
308
4.86k
  }
309
67.0M
}
310
311
17.1k
void HybridClock::RegisterMetrics(const scoped_refptr<MetricEntity>& metric_entity) {
312
17.1k
  METRIC_hybrid_clock_hybrid_time.InstantiateFunctionGauge(
313
17.1k
      metric_entity,
314
17.1k
      Bind(&HybridClock::NowForMetrics, Unretained(this)))
315
17.1k
    ->AutoDetachToLastValue(&metric_detacher_);
316
17.1k
  METRIC_hybrid_clock_error.InstantiateFunctionGauge(
317
17.1k
      metric_entity,
318
17.1k
      Bind(&HybridClock::ErrorForMetrics, Unretained(this)))
319
17.1k
    ->AutoDetachToLastValue(&metric_detacher_);
320
17.1k
  METRIC_hybrid_clock_skew.InstantiateFunctionGauge(
321
17.1k
      metric_entity,
322
17.1k
      Bind(&HybridClock::SkewForMetrics, Unretained(this)))
323
17.1k
    ->AutoDetachToLastValue(&metric_detacher_);
324
17.1k
}
325
326
55.5M
LogicalTimeComponent HybridClock::GetLogicalValue(const HybridTime& hybrid_time) {
327
55.5M
  return hybrid_time.GetLogicalValue();
328
55.5M
}
329
330
82.2M
MicrosTime HybridClock::GetPhysicalValueMicros(const HybridTime& hybrid_time) {
331
82.2M
  return hybrid_time.GetPhysicalValueMicros();
332
82.2M
}
333
334
26.3M
uint64_t HybridClock::GetPhysicalValueNanos(const HybridTime& hybrid_time) {
335
  // Conversion to nanoseconds here is safe from overflow since 2^kBitsForLogicalComponent is less
336
  // than MonoTime::kNanosecondsPerMicrosecond. Although, we still just check for sanity.
337
26.3M
  uint64_t micros = hybrid_time.value() >> HybridTime::kBitsForLogicalComponent;
338
26.3M
  CHECK(micros <= std::numeric_limits<uint64_t>::max() / MonoTime::kNanosecondsPerMicrosecond);
339
26.3M
  return micros * MonoTime::kNanosecondsPerMicrosecond;
340
26.3M
}
341
342
192M
// Builds a HybridTime from a physical time in microseconds (via HybridTime::FromMicros).
HybridTime HybridClock::HybridTimeFromMicroseconds(uint64_t micros) {
  const HybridTime result = HybridTime::FromMicros(micros);
  return result;
}

// Builds a HybridTime from explicit physical (microseconds) and logical components
// (via HybridTime::FromMicrosecondsAndLogicalValue).
HybridTime HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(
    MicrosTime micros, LogicalTimeComponent logical_value) {
  const HybridTime result = HybridTime::FromMicrosecondsAndLogicalValue(micros, logical_value);
  return result;
}
350
351
// CAUTION: USE WITH EXTREME CARE!!! This function does not have overflow checking.
352
// It is recommended to use CompareHybridClocksToDelta, below.
353
HybridTime HybridClock::AddPhysicalTimeToHybridTime(const HybridTime& original,
354
5.58M
                                                    const MonoDelta& to_add) {
355
5.58M
  uint64_t new_physical = GetPhysicalValueMicros(original) + to_add.ToMicroseconds();
356
5.58M
  auto old_logical = GetLogicalValue(original);
357
5.58M
  return HybridTimeFromMicrosecondsAndLogicalValue(new_physical, old_logical);
358
5.58M
}
359
360
int HybridClock::CompareHybridClocksToDelta(const HybridTime& begin,
361
                                            const HybridTime& end,
362
13.2M
                                            const MonoDelta& delta) {
363
13.2M
  if (end < begin) {
364
1
    return -1;
365
1
  }
366
  // We use nanoseconds since MonoDelta has nanosecond granularity.
367
13.2M
  uint64_t begin_nanos = GetPhysicalValueNanos(begin);
368
13.2M
  uint64_t end_nanos = GetPhysicalValueNanos(end);
369
13.2M
  uint64_t delta_nanos = delta.ToNanoseconds();
370
13.2M
  if (end_nanos - begin_nanos > delta_nanos) {
371
9.10k
    return 1;
372
13.1M
  } else if (end_nanos - begin_nanos == delta_nanos) {
373
267
    uint64_t begin_logical = GetLogicalValue(begin);
374
267
    uint64_t end_logical = GetLogicalValue(end);
375
267
    if (end_logical > begin_logical) {
376
1
      return 1;
377
266
    } else if (end_logical < begin_logical) {
378
1
      return -1;
379
265
    } else {
380
265
      return 0;
381
265
    }
382
13.1M
  } else {
383
13.1M
    return -1;
384
13.1M
  }
385
13.2M
}
386
387
20.4k
void HybridClock::EnableClockSkewControl() {
388
20.4k
  clock_skew_control_enabled.store(true, std::memory_order_release);
389
20.4k
}
390
391
}  // namespace server
392
}  // namespace yb