YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/server/hybrid_clock.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/server/hybrid_clock.h"
34
35
#include <algorithm>
36
#include <mutex>
37
38
#include <glog/logging.h>
39
40
#include "yb/gutil/bind.h"
41
#include "yb/gutil/walltime.h"
42
43
#include "yb/util/errno.h"
44
#include "yb/util/flag_tags.h"
45
#include "yb/util/locks.h"
46
#include "yb/util/logging.h"
47
#include "yb/util/metrics.h"
48
#include "yb/util/result.h"
49
50
DEFINE_bool(use_hybrid_clock, true,
            "Whether HybridClock should be used as the default clock"
            " implementation. This should be disabled for testing purposes only.");
TAG_FLAG(use_hybrid_clock, hidden);

// Gauges registered by HybridClock::RegisterMetrics().
METRIC_DEFINE_gauge_uint64(server, hybrid_clock_hybrid_time,
                           "Hybrid Clock HybridTime",
                           yb::MetricUnit::kMicroseconds,
                           "Hybrid clock hybrid_time.");
METRIC_DEFINE_gauge_uint64(server, hybrid_clock_error,
                           "Hybrid Clock Error",
                           yb::MetricUnit::kMicroseconds,
                           "Server clock maximum error.");
METRIC_DEFINE_gauge_int64(server, hybrid_clock_skew,
                           "Hybrid Clock Skew",
                           yb::MetricUnit::kMicroseconds,
                           "Server clock skew.");

// Parsed by GetClock() below: empty selects WallClock(), otherwise clock_name[,extra_data].
DEFINE_string(time_source, "",
              "The clock source that HybridClock should use (for tests only). "
              "Leave empty for WallClock. Other values have the format clock_name[,extra_data] "
              "and depend on the clock providers registered by the specific tests that use them.");
TAG_FLAG(time_source, hidden);

// Both flags below are consulted in HybridClock::NowWithError() only when the physical clock is
// behind the last issued hybrid time by more than --max_clock_skew_usec, and only crash once
// HybridClock::EnableClockSkewControl() has been called.
DEFINE_bool(fail_on_out_of_range_clock_skew, true,
            "In case transactional tables are present, crash the process if clock skew greater "
            "than the configured maximum.");

DEFINE_uint64(clock_skew_force_crash_bound_usec, 60000000,
              "If a clock skew larger than this amount (microseconds) is observed, we will force "
              "a crash regardless of the value of fail_on_out_of_range_clock_skew. This is useful "
              "for avoiding really large hybrid clock jumps. Set to 0 to disable the check. Note "
              "that this check is only performed for clock skew greater than max_clock_skew_usec.");

DECLARE_uint64(max_clock_skew_usec);
85
86
using yb::Status;
87
using strings::Substitute;
88
89
namespace yb {
90
namespace server {
91
92
namespace {
93
94
std::mutex providers_mutex;
95
std::unordered_map<std::string, PhysicalClockProvider> providers;
96
97
std::atomic<bool> clock_skew_control_enabled{false};
98
99
// options should be in format clock_name[,extra_data] and extra_data would be passed to
100
// clock factory.
101
26.5k
PhysicalClockPtr GetClock(const std::string& options) {
102
26.5k
  if (options.empty()) {
103
26.0k
    return WallClock();
104
26.0k
  }
105
106
532
  auto pos = options.find(',');
107
532
  auto name = pos == std::string::npos ? 
options523
:
options.substr(0, pos)9
;
108
532
  auto arg = pos == std::string::npos ? 
std::string()523
:
options.substr(pos + 1)9
;
109
532
  std::lock_guard<std::mutex> lock(providers_mutex);
110
532
  auto it = providers.find(name);
111
532
  if (it == providers.end()) {
112
0
    LOG(DFATAL) << "Unknown time source: " << name;
113
0
    return WallClock();
114
0
  }
115
532
  return it->second(arg);
116
532
}
117
118
} // namespace
119
120
14.8k
void HybridClock::RegisterProvider(std::string name, PhysicalClockProvider provider) {
121
14.8k
  std::lock_guard<std::mutex> lock(providers_mutex);
122
14.8k
  providers.emplace(std::move(name), std::move(provider));
123
14.8k
}
124
125
26.5k
HybridClock::HybridClock() : HybridClock(FLAGS_time_source) {}
Unexecuted instantiation: yb::server::HybridClock::HybridClock()
yb::server::HybridClock::HybridClock()
Line
Count
Source
125
26.5k
HybridClock::HybridClock() : HybridClock(FLAGS_time_source) {}
126
127
26.7k
HybridClock::HybridClock(PhysicalClockPtr clock) : clock_(std::move(clock)) {}
128
129
26.5k
HybridClock::HybridClock(const std::string& time_source) : HybridClock(GetClock(time_source)) {}
Unexecuted instantiation: yb::server::HybridClock::HybridClock(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)
yb::server::HybridClock::HybridClock(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)
Line
Count
Source
129
26.5k
HybridClock::HybridClock(const std::string& time_source) : HybridClock(GetClock(time_source)) {}
130
131
26.4k
// Moves the clock into the initialized state; NowWithError() DCHECKs this, so Init() must be
// called first. Always returns OK. On Apple platforms it additionally warns that this mode is not
// suitable for distributed clusters.
Status HybridClock::Init() {
#if defined(__APPLE__)
  LOG(WARNING) << "HybridClock initialized in local mode (OS X only). "
                  "Not suitable for distributed clusters.";
#endif  // defined(__APPLE__)

  state_ = kInitialized;
  return Status::OK();
}
141
142
206M
// Returns a pair whose first element is the current hybrid time from NowWithError(). The second
// element is the physical clock's MaxGlobalTime() for that reading and its error, converted to a
// HybridTime. It is presumably an upper bound on the current time anywhere in the cluster; confirm
// against PhysicalClock::MaxGlobalTime.
HybridTimeRange HybridClock::NowRange() {
  HybridTime local_now;
  uint64_t error_usec;
  NowWithError(&local_now, &error_usec);

  const auto upper_bound_micros =
      clock_->MaxGlobalTime({local_now.GetPhysicalValueMicros(), error_usec});
  return std::make_pair(local_now, HybridTimeFromMicroseconds(upper_bound_micros));
}
151
152
206M
// Produces the next hybrid time in `*hybrid_time` and an error bound, in microseconds, in
// `*max_error_usec`. The stored components_ are always left one logical tick past the value
// returned.
//
// Fast path: if the physical reading is strictly ahead of the stored time, components_ is CASed to
// {reading, 1}. The reading (logical 0) is returned with the physical clock's own max_error.
//
// Slow path: the reading is at or behind the stored time, or a concurrent update moved the stored
// time past it. The stored logical component is incremented (carrying into the physical part on
// overflow), and the value just before the new state is returned. The error is widened to cover
// the gap between the stored time and the reading (see the diagram below).
//
// If the reading is behind the stored time by more than --max_clock_skew_usec, this is LOG(FATAL)
// when all of the following hold:
//   * EnableClockSkewControl() has been called, and
//   * either --fail_on_out_of_range_clock_skew is set, or the skew exceeds a non-zero
//     --clock_skew_force_crash_bound_usec.
// Otherwise the skew is logged at ERROR, rate-limited via YB_LOG_EVERY_N_SECS(ERROR, 1).
// Failure to read the physical clock is always FATAL.
void HybridClock::NowWithError(HybridTime *hybrid_time, uint64_t *max_error_usec) {
  DCHECK_EQ(state_, kInitialized) << "Clock not initialized. Must call Init() first.";

  HybridClockComponents current_components = components_.load(boost::memory_order_acquire);

  auto now = clock_->Now();
  if (PREDICT_FALSE(!now.ok())) {
    LOG(FATAL) << Substitute("Couldn't get the current time: Clock unsynchronized. "
        "Status: $0", now.status().ToString());
  }

  // If the current time surpasses the last update just return it
  HybridClockComponents new_components = { now->time_point, 1 };

  // Guarded LOG(INFO) rather than VLOG(4), matching Update(): VLOG(4) crashes in TSAN mode.
  if (VLOG_IS_ON(4)) {
    LOG(INFO) << __func__ << ", new: " << new_components << ", current: " << current_components;
  }

  if (now->time_point < current_components.last_usec) {
    auto delta_us = current_components.last_usec - now->time_point;
    if (delta_us > FLAGS_max_clock_skew_usec) {
      auto delta = MonoDelta::FromMicroseconds(delta_us);
      auto max_allowed = MonoDelta::FromMicroseconds(FLAGS_max_clock_skew_usec);
      if ((ANNOTATE_UNPROTECTED_READ(FLAGS_fail_on_out_of_range_clock_skew) ||
           (ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec) > 0 &&
            delta_us > ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec))) &&
          clock_skew_control_enabled.load(std::memory_order_acquire)) {
        LOG(FATAL) << "Too big clock skew is detected: " << delta << ", while max allowed is: "
                   << max_allowed << "; clock_skew_force_crash_bound_usec="
                   << ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec);
      } else {
        YB_LOG_EVERY_N_SECS(ERROR, 1)
            << "Too big clock skew is detected: " << delta << ", while max allowed is: "
            << max_allowed << "; clock_skew_force_crash_bound_usec="
            << ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec);
      }
    }
  } else {
    // Loop over the check in case of concurrent updates making the CAS fail.
    // A failed CAS refreshes current_components; once it is no longer behind the reading, fall
    // through to the logical-increment path below.
    while (now->time_point > current_components.last_usec) {
      if (components_.compare_exchange_weak(current_components, new_components)) {
        *hybrid_time = HybridTimeFromMicroseconds(new_components.last_usec);
        *max_error_usec = now->max_error;
        if (PREDICT_FALSE(VLOG_IS_ON(2))) {
          VLOG(2) << "Current clock is higher than the last one. Resetting logical values."
              << " Time: " << *hybrid_time << ", Error: " << *max_error_usec;
        }
        return;
      }
    }
  }

  // We don't have the last time read max error since it might have originated
  // in another machine, but we can put a bound on the maximum error of the
  // hybrid_time we are providing.
  // In particular we know that the "true" time falls within the interval
  // now_usec +- now.maxerror so we get the following situations:
  //
  // 1)
  // --------|----------|----|---------|--------------------------> time
  //     now - e       now  last   now + e
  // 2)
  // --------|----------|--------------|------|-------------------> time
  //     now - e       now         now + e   last
  //
  // Assuming, in the worst case, that the "true" time is now - error we need to
  // always return: last - (now - e) as the new maximum error.
  // This broadens the error interval for both cases but always returns
  // a correct error interval.

  do {
    new_components.last_usec = current_components.last_usec;
    new_components.logical = current_components.logical + 1;
    new_components.HandleLogicalComponentOverflow();
    // Loop over the check until the CAS succeeds, in case there are concurrent updates.
  } while (!components_.compare_exchange_weak(current_components, new_components));

  *max_error_usec = new_components.last_usec - (now->time_point - now->max_error);

  // We've already atomically incremented the logical, so subtract 1.
  *hybrid_time = HybridTimeFromMicrosecondsAndLogicalValue(
      new_components.last_usec,
      narrow_cast<LogicalTimeComponent>(new_components.logical)).Decremented();
  if (PREDICT_FALSE(VLOG_IS_ON(2))) {
    VLOG(2) << "Current clock is lower than the last one. Returning last read and incrementing"
        " logical values. Hybrid time: " << *hybrid_time << " Error: " << *max_error_usec;
  }
}
238
239
104M
void HybridClock::Update(const HybridTime& to_update) {
240
104M
  if (!to_update.is_valid()) {
241
0
    return;
242
0
  }
243
244
104M
  HybridClockComponents current_components = components_.load(boost::memory_order_acquire);
245
104M
  HybridClockComponents new_components = {
246
104M
    GetPhysicalValueMicros(to_update), GetLogicalValue(to_update) + 1
247
104M
  };
248
249
  // VLOG(4) crashes in TSAN mode
250
104M
  if (VLOG_IS_ON(4)) {
251
0
    LOG(INFO) << __func__ << ", new: " << new_components << ", current: " << current_components;
252
0
  }
253
254
104M
  new_components.HandleLogicalComponentOverflow();
255
256
  // Keep trying to CAS until it works or until HT has advanced past this update.
257
107M
  while (current_components < new_components &&
258
107M
      
!components_.compare_exchange_weak(current_components, new_components)48.4M
)
{}3.38M
259
104M
}
260
261
// Used to get the hybrid_time for metrics.
262
15.4k
uint64_t HybridClock::NowForMetrics() {
263
15.4k
  return Now().ToUint64();
264
15.4k
}
265
266
// Used to get the current error, for metrics.
267
15.4k
uint64_t HybridClock::ErrorForMetrics() {
268
15.4k
  HybridTime now;
269
15.4k
  uint64_t error;
270
271
15.4k
  NowWithError(&now, &error);
272
15.4k
  return error;
273
15.4k
}
274
275
15.4k
int64_t HybridClock::SkewForMetrics() {
276
15.4k
  HybridClockComponents current_components = components_.load(boost::memory_order_acquire);
277
15.4k
  auto now = clock_->Now();
278
15.4k
  if (PREDICT_FALSE(!now.ok())) {
279
0
    LOG(DFATAL) << Substitute("Couldn't get the current time: Clock unsynchronized. "
280
0
        "Status: $0", now.status().ToString());
281
0
    return 0;
282
0
  }
283
  // Making sure we don't return a negative value.
284
15.4k
  int64_t potential_skew = current_components.last_usec - now->time_point;
285
15.4k
  return std::max<int64_t>(0, potential_skew);
286
15.4k
}
287
288
0
std::string HybridClockComponents::ToString() const {
289
0
  return Format("{ last_usec: $0 logical: $1 }", last_usec, logical);
290
0
}
291
292
0
std::ostream& operator<<(std::ostream& out, const HybridClockComponents& components) {
293
0
  return out << components.ToString();
294
0
}
295
296
124M
void HybridClockComponents::HandleLogicalComponentOverflow() {
297
124M
  if (logical > HybridTime::kLogicalBitMask) {
298
5.62k
    static constexpr uint64_t kMaxOverflowValue = 1ULL << HybridTime::kBitsForLogicalComponent;
299
5.62k
    if (logical > kMaxOverflowValue) {
300
0
      LOG(FATAL) << "Logical component is too high: last_usec=" << last_usec
301
0
                 << "logical=" << logical << ", max allowed is " << kMaxOverflowValue;
302
0
    }
303
5.62k
    YB_LOG_EVERY_N_SECS(WARNING, 5) << "Logical component overflow: "
304
3
        << "last_usec=" << last_usec << ", logical=" << logical;
305
306
5.62k
    last_usec += logical >> HybridTime::kBitsForLogicalComponent;
307
5.62k
    logical &= HybridTime::kLogicalBitMask;
308
5.62k
  }
309
124M
}
310
311
25.7k
void HybridClock::RegisterMetrics(const scoped_refptr<MetricEntity>& metric_entity) {
312
25.7k
  METRIC_hybrid_clock_hybrid_time.InstantiateFunctionGauge(
313
25.7k
      metric_entity,
314
25.7k
      Bind(&HybridClock::NowForMetrics, Unretained(this)))
315
25.7k
    ->AutoDetachToLastValue(&metric_detacher_);
316
25.7k
  METRIC_hybrid_clock_error.InstantiateFunctionGauge(
317
25.7k
      metric_entity,
318
25.7k
      Bind(&HybridClock::ErrorForMetrics, Unretained(this)))
319
25.7k
    ->AutoDetachToLastValue(&metric_detacher_);
320
25.7k
  METRIC_hybrid_clock_skew.InstantiateFunctionGauge(
321
25.7k
      metric_entity,
322
25.7k
      Bind(&HybridClock::SkewForMetrics, Unretained(this)))
323
25.7k
    ->AutoDetachToLastValue(&metric_detacher_);
324
25.7k
}
325
326
110M
// Static helper: the logical component of `hybrid_time` (delegates to HybridTime).
LogicalTimeComponent HybridClock::GetLogicalValue(const HybridTime& hybrid_time) {
  const HybridTime& source = hybrid_time;
  return source.GetLogicalValue();
}
329
330
158M
// Static helper: the physical component of `hybrid_time`, in microseconds (delegates to HybridTime).
MicrosTime HybridClock::GetPhysicalValueMicros(const HybridTime& hybrid_time) {
  const HybridTime& source = hybrid_time;
  return source.GetPhysicalValueMicros();
}
333
334
48.5M
uint64_t HybridClock::GetPhysicalValueNanos(const HybridTime& hybrid_time) {
335
  // Conversion to nanoseconds here is safe from overflow since 2^kBitsForLogicalComponent is less
336
  // than MonoTime::kNanosecondsPerMicrosecond. Although, we still just check for sanity.
337
48.5M
  uint64_t micros = hybrid_time.value() >> HybridTime::kBitsForLogicalComponent;
338
48.5M
  CHECK(micros <= std::numeric_limits<uint64_t>::max() / MonoTime::kNanosecondsPerMicrosecond);
339
48.5M
  return micros * MonoTime::kNanosecondsPerMicrosecond;
340
48.5M
}
341
342
403M
// Static helper: builds a HybridTime from a physical time in microseconds (HybridTime::FromMicros).
HybridTime HybridClock::HybridTimeFromMicroseconds(uint64_t micros) {
  const uint64_t physical_micros = micros;
  return HybridTime::FromMicros(physical_micros);
}
345
346
// Static helper: builds a HybridTime from a physical time in microseconds plus a logical
// component (HybridTime::FromMicrosecondsAndLogicalValue).
HybridTime HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(
    MicrosTime micros, LogicalTimeComponent logical_value) {
  return HybridTime::FromMicrosecondsAndLogicalValue(
      micros,
      logical_value);
}
350
351
// CAUTION: USE WITH EXTREME CARE!!! This function does not have overflow checking.
352
// It is recommended to use CompareHybridClocksToDelta, below.
353
HybridTime HybridClock::AddPhysicalTimeToHybridTime(const HybridTime& original,
354
6.29M
                                                    const MonoDelta& to_add) {
355
6.29M
  uint64_t new_physical = GetPhysicalValueMicros(original) + to_add.ToMicroseconds();
356
6.29M
  auto old_logical = GetLogicalValue(original);
357
6.29M
  return HybridTimeFromMicrosecondsAndLogicalValue(new_physical, old_logical);
358
6.29M
}
359
360
int HybridClock::CompareHybridClocksToDelta(const HybridTime& begin,
361
                                            const HybridTime& end,
362
24.2M
                                            const MonoDelta& delta) {
363
24.2M
  if (end < begin) {
364
3
    return -1;
365
3
  }
366
  // We use nanoseconds since MonoDelta has nanosecond granularity.
367
24.2M
  uint64_t begin_nanos = GetPhysicalValueNanos(begin);
368
24.2M
  uint64_t end_nanos = GetPhysicalValueNanos(end);
369
24.2M
  uint64_t delta_nanos = delta.ToNanoseconds();
370
24.2M
  if (end_nanos - begin_nanos > delta_nanos) {
371
17.9k
    return 1;
372
24.2M
  } else if (end_nanos - begin_nanos == delta_nanos) {
373
662
    uint64_t begin_logical = GetLogicalValue(begin);
374
662
    uint64_t end_logical = GetLogicalValue(end);
375
662
    if (end_logical > begin_logical) {
376
1
      return 1;
377
661
    } else if (end_logical < begin_logical) {
378
1
      return -1;
379
660
    } else {
380
660
      return 0;
381
660
    }
382
24.2M
  } else {
383
24.2M
    return -1;
384
24.2M
  }
385
24.2M
}
386
387
49.0k
void HybridClock::EnableClockSkewControl() {
388
49.0k
  clock_skew_control_enabled.store(true, std::memory_order_release);
389
49.0k
}
390
391
}  // namespace server
392
}  // namespace yb