/Users/deen/code/yugabyte-db/src/yb/server/hybrid_clock.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/server/hybrid_clock.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <mutex> |
37 | | |
38 | | #include <glog/logging.h> |
39 | | |
40 | | #include "yb/gutil/bind.h" |
41 | | #include "yb/gutil/walltime.h" |
42 | | |
43 | | #include "yb/util/errno.h" |
44 | | #include "yb/util/flag_tags.h" |
45 | | #include "yb/util/locks.h" |
46 | | #include "yb/util/logging.h" |
47 | | #include "yb/util/metrics.h" |
48 | | #include "yb/util/result.h" |
49 | | |
50 | | DEFINE_bool(use_hybrid_clock, true, |
51 | | "Whether HybridClock should be used as the default clock" |
52 | | " implementation. This should be disabled for testing purposes only."); |
53 | | TAG_FLAG(use_hybrid_clock, hidden); |
54 | | |
55 | | METRIC_DEFINE_gauge_uint64(server, hybrid_clock_hybrid_time, |
56 | | "Hybrid Clock HybridTime", |
57 | | yb::MetricUnit::kMicroseconds, |
58 | | "Hybrid clock hybrid_time."); |
59 | | METRIC_DEFINE_gauge_uint64(server, hybrid_clock_error, |
60 | | "Hybrid Clock Error", |
61 | | yb::MetricUnit::kMicroseconds, |
62 | | "Server clock maximum error."); |
63 | | METRIC_DEFINE_gauge_int64(server, hybrid_clock_skew, |
64 | | "Hybrid Clock Skew", |
65 | | yb::MetricUnit::kMicroseconds, |
66 | | "Server clock skew."); |
67 | | |
68 | | DEFINE_string(time_source, "", |
69 | | "The clock source that HybridClock should use (for tests only). " |
70 | | "Leave empty for WallClock, other values depend on added clock providers and " |
71 | | "specific for appropriate tests, that adds them."); |
72 | | TAG_FLAG(time_source, hidden); |
73 | | |
74 | | DEFINE_bool(fail_on_out_of_range_clock_skew, true, |
75 | | "In case transactional tables are present, crash the process if clock skew greater " |
76 | | "than the configured maximum."); |
77 | | |
78 | | DEFINE_uint64(clock_skew_force_crash_bound_usec, 60000000, |
79 | | "If the clock skew larger than this amount (microseconds) is observed, we will force " |
80 | | "a crash regardless of the value of fail_on_out_of_range_clock_skew. This is useful " |
81 | | "for avoiding really large hybrid clock jumps. Set to 0 to disable the check. Note " |
82 | | "that this check is only preformed for clock skew greater than max_clock_skew_usec."); |
83 | | |
84 | | DECLARE_uint64(max_clock_skew_usec); |
85 | | |
86 | | using yb::Status; |
87 | | using strings::Substitute; |
88 | | |
89 | | namespace yb { |
90 | | namespace server { |
91 | | |
92 | | namespace { |
93 | | |
94 | | std::mutex providers_mutex; |
95 | | std::unordered_map<std::string, PhysicalClockProvider> providers; |
96 | | |
97 | | std::atomic<bool> clock_skew_control_enabled{false}; |
98 | | |
99 | | // options should be in format clock_name[,extra_data] and extra_data would be passed to |
100 | | // clock factory. |
101 | 14.6k | PhysicalClockPtr GetClock(const std::string& options) { |
102 | 14.6k | if (options.empty()) { |
103 | 14.1k | return WallClock(); |
104 | 14.1k | } |
105 | | |
106 | 496 | auto pos = options.find(','); |
107 | 494 | auto name = pos == std::string::npos ? options : options.substr(0, pos); |
108 | 494 | auto arg = pos == std::string::npos ? std::string() : options.substr(pos + 1); |
109 | 496 | std::lock_guard<std::mutex> lock(providers_mutex); |
110 | 496 | auto it = providers.find(name); |
111 | 496 | if (it == providers.end()) { |
112 | 0 | LOG(DFATAL) << "Unknown time source: " << name; |
113 | 0 | return WallClock(); |
114 | 0 | } |
115 | 496 | return it->second(arg); |
116 | 496 | } |
117 | | |
118 | | } // namespace |
119 | | |
120 | 10.5k | void HybridClock::RegisterProvider(std::string name, PhysicalClockProvider provider) { |
121 | 10.5k | std::lock_guard<std::mutex> lock(providers_mutex); |
122 | 10.5k | providers.emplace(std::move(name), std::move(provider)); |
123 | 10.5k | } |
124 | | |
125 | 14.6k | HybridClock::HybridClock() : HybridClock(FLAGS_time_source) {} Unexecuted instantiation: _ZN2yb6server11HybridClockC2Ev _ZN2yb6server11HybridClockC1Ev Line | Count | Source | 125 | 14.6k | HybridClock::HybridClock() : HybridClock(FLAGS_time_source) {} |
|
126 | | |
127 | 14.8k | HybridClock::HybridClock(PhysicalClockPtr clock) : clock_(std::move(clock)) {} |
128 | | |
129 | 14.6k | HybridClock::HybridClock(const std::string& time_source) : HybridClock(GetClock(time_source)) {} Unexecuted instantiation: _ZN2yb6server11HybridClockC2ERKNSt3__112basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEE _ZN2yb6server11HybridClockC1ERKNSt3__112basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEE Line | Count | Source | 129 | 14.6k | HybridClock::HybridClock(const std::string& time_source) : HybridClock(GetClock(time_source)) {} |
|
130 | | |
131 | 14.6k | Status HybridClock::Init() { |
132 | 14.6k | #if defined(__APPLE__) |
133 | 14.6k | LOG(WARNING) << "HybridClock initialized in local mode (OS X only). " |
134 | 14.6k | << "Not suitable for distributed clusters."; |
135 | 14.6k | #endif // defined(__APPLE__) |
136 | | |
137 | 14.6k | state_ = kInitialized; |
138 | | |
139 | 14.6k | return Status::OK(); |
140 | 14.6k | } |
141 | | |
142 | 100M | HybridTimeRange HybridClock::NowRange() { |
143 | 100M | HybridTime now; |
144 | 100M | uint64_t error; |
145 | | |
146 | 100M | NowWithError(&now, &error); |
147 | 100M | auto max_global_now = HybridTimeFromMicroseconds( |
148 | 100M | clock_->MaxGlobalTime({now.GetPhysicalValueMicros(), error})); |
149 | 100M | return std::make_pair(now, max_global_now); |
150 | 100M | } |
151 | | |
152 | 100M | void HybridClock::NowWithError(HybridTime *hybrid_time, uint64_t *max_error_usec) { |
153 | 0 | DCHECK_EQ(state_, kInitialized) << "Clock not initialized. Must call Init() first."; |
154 | | |
155 | 100M | HybridClockComponents current_components = components_.load(boost::memory_order_acquire); |
156 | | |
157 | 100M | auto now = clock_->Now(); |
158 | 100M | if (PREDICT_FALSE(!now.ok())) { |
159 | 0 | LOG(FATAL) << Substitute("Couldn't get the current time: Clock unsynchronized. " |
160 | 0 | "Status: $0", now.status().ToString()); |
161 | 0 | } |
162 | | |
163 | | // If the current time surpasses the last update just return it |
164 | 100M | HybridClockComponents new_components = { now->time_point, 1 }; |
165 | | |
166 | 18.4E | VLOG(4) << __func__ << ", new: " << new_components << ", current: " << current_components; |
167 | | |
168 | 100M | if (now->time_point < current_components.last_usec) { |
169 | 5.74M | auto delta_us = current_components.last_usec - now->time_point; |
170 | 5.74M | if (delta_us > FLAGS_max_clock_skew_usec) { |
171 | 5.73M | auto delta = MonoDelta::FromMicroseconds(delta_us); |
172 | 5.73M | auto max_allowed = MonoDelta::FromMicroseconds(FLAGS_max_clock_skew_usec); |
173 | 5.73M | if ((ANNOTATE_UNPROTECTED_READ(FLAGS_fail_on_out_of_range_clock_skew) || |
174 | 0 | (ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec) > 0 && |
175 | 0 | delta_us > ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec))) && |
176 | 5.72M | clock_skew_control_enabled.load(std::memory_order_acquire)) { |
177 | 0 | LOG(FATAL) << "Too big clock skew is detected: " << delta << ", while max allowed is: " |
178 | 0 | << max_allowed << "; clock_skew_force_crash_bound_usec=" |
179 | 0 | << ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec); |
180 | 5.73M | } else { |
181 | 5.73M | YB_LOG_EVERY_N_SECS(ERROR, 1) |
182 | 13 | << "Too big clock skew is detected: " << delta << ", while max allowed is: " |
183 | 13 | << max_allowed << "; clock_skew_force_crash_bound_usec=" |
184 | 13 | << ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec); |
185 | 5.73M | } |
186 | 5.73M | } |
187 | 95.0M | } else { |
188 | | // Loop over the check in case of concurrent updates making the CAS fail. |
189 | 109M | while (now->time_point > current_components.last_usec) { |
190 | 106M | if (components_.compare_exchange_weak(current_components, new_components)) { |
191 | 91.7M | *hybrid_time = HybridTimeFromMicroseconds(new_components.last_usec); |
192 | 91.7M | *max_error_usec = now->max_error; |
193 | 91.7M | if (PREDICT_FALSE(VLOG_IS_ON(2))) { |
194 | 0 | VLOG(2) << "Current clock is higher than the last one. Resetting logical values." |
195 | 0 | << " Time: " << *hybrid_time << ", Error: " << *max_error_usec; |
196 | 0 | } |
197 | 91.7M | return; |
198 | 91.7M | } |
199 | 106M | } |
200 | 95.0M | } |
201 | | |
202 | | // We don't have the last time read max error since it might have originated |
203 | | // in another machine, but we can put a bound on the maximum error of the |
204 | | // hybrid_time we are providing. |
205 | | // In particular we know that the "true" time falls within the interval |
206 | | // now_usec +- now.maxerror so we get the following situations: |
207 | | // |
208 | | // 1) |
209 | | // --------|----------|----|---------|--------------------------> time |
210 | | // now - e now last now + e |
211 | | // 2) |
212 | | // --------|----------|--------------|------|-------------------> time |
213 | | // now - e now now + e last |
214 | | // |
215 | | // Assuming, in the worst case, that the "true" time is now - error we need to |
216 | | // always return: last - (now - e) as the new maximum error. |
217 | | // This broadens the error interval for both cases but always returns |
218 | | // a correct error interval. |
219 | | |
220 | 16.0M | do { |
221 | 16.0M | new_components.last_usec = current_components.last_usec; |
222 | 16.0M | new_components.logical = current_components.logical + 1; |
223 | 16.0M | new_components.HandleLogicalComponentOverflow(); |
224 | | // Loop over the check until the CAS succeeds, in case there are concurrent updates. |
225 | 16.0M | } while (!components_.compare_exchange_weak(current_components, new_components)); |
226 | | |
227 | 8.96M | *max_error_usec = new_components.last_usec - (now->time_point - now->max_error); |
228 | | |
229 | | // We've already atomically incremented the logical, so subtract 1. |
230 | 8.96M | *hybrid_time = HybridTimeFromMicrosecondsAndLogicalValue( |
231 | 8.96M | new_components.last_usec, |
232 | 8.96M | narrow_cast<LogicalTimeComponent>(new_components.logical)).Decremented(); |
233 | 8.96M | if (PREDICT_FALSE(VLOG_IS_ON(2))) { |
234 | 0 | VLOG(2) << "Current clock is lower than the last one. Returning last read and incrementing" |
235 | 0 | " logical values. Hybrid time: " << *hybrid_time << " Error: " << *max_error_usec; |
236 | 1 | } |
237 | 8.96M | } |
238 | | |
239 | 50.7M | void HybridClock::Update(const HybridTime& to_update) { |
240 | 50.7M | if (!to_update.is_valid()) { |
241 | 0 | return; |
242 | 0 | } |
243 | | |
244 | 50.7M | HybridClockComponents current_components = components_.load(boost::memory_order_acquire); |
245 | 50.7M | HybridClockComponents new_components = { |
246 | 50.7M | GetPhysicalValueMicros(to_update), GetLogicalValue(to_update) + 1 |
247 | 50.7M | }; |
248 | | |
249 | | // VLOG(4) crashes in TSAN mode |
250 | 2 | if (VLOG_IS_ON(4)) { |
251 | 2 | LOG(INFO) << __func__ << ", new: " << new_components << ", current: " << current_components; |
252 | 2 | } |
253 | | |
254 | 50.7M | new_components.HandleLogicalComponentOverflow(); |
255 | | |
256 | | // Keep trying to CAS until it works or until HT has advanced past this update. |
257 | 52.9M | while (current_components < new_components && |
258 | 21.1M | !components_.compare_exchange_weak(current_components, new_components)) {} |
259 | 50.7M | } |
260 | | |
261 | | // Used to get the hybrid_time for metrics. |
262 | 15.2k | uint64_t HybridClock::NowForMetrics() { |
263 | 15.2k | return Now().ToUint64(); |
264 | 15.2k | } |
265 | | |
266 | | // Used to get the current error, for metrics. |
267 | 15.2k | uint64_t HybridClock::ErrorForMetrics() { |
268 | 15.2k | HybridTime now; |
269 | 15.2k | uint64_t error; |
270 | | |
271 | 15.2k | NowWithError(&now, &error); |
272 | 15.2k | return error; |
273 | 15.2k | } |
274 | | |
275 | 15.2k | int64_t HybridClock::SkewForMetrics() { |
276 | 15.2k | HybridClockComponents current_components = components_.load(boost::memory_order_acquire); |
277 | 15.2k | auto now = clock_->Now(); |
278 | 15.2k | if (PREDICT_FALSE(!now.ok())) { |
279 | 0 | LOG(DFATAL) << Substitute("Couldn't get the current time: Clock unsynchronized. " |
280 | 0 | "Status: $0", now.status().ToString()); |
281 | 0 | return 0; |
282 | 0 | } |
283 | | // Making sure we don't return a negative value. |
284 | 15.2k | int64_t potential_skew = current_components.last_usec - now->time_point; |
285 | 15.2k | return std::max<int64_t>(0, potential_skew); |
286 | 15.2k | } |
287 | | |
288 | 4 | std::string HybridClockComponents::ToString() const { |
289 | 4 | return Format("{ last_usec: $0 logical: $1 }", last_usec, logical); |
290 | 4 | } |
291 | | |
292 | 4 | std::ostream& operator<<(std::ostream& out, const HybridClockComponents& components) { |
293 | 4 | return out << components.ToString(); |
294 | 4 | } |
295 | | |
296 | 67.0M | void HybridClockComponents::HandleLogicalComponentOverflow() { |
297 | 67.0M | if (logical > HybridTime::kLogicalBitMask) { |
298 | 4.86k | static constexpr uint64_t kMaxOverflowValue = 1ULL << HybridTime::kBitsForLogicalComponent; |
299 | 4.86k | if (logical > kMaxOverflowValue) { |
300 | 0 | LOG(FATAL) << "Logical component is too high: last_usec=" << last_usec |
301 | 0 | << "logical=" << logical << ", max allowed is " << kMaxOverflowValue; |
302 | 0 | } |
303 | 4.86k | YB_LOG_EVERY_N_SECS(WARNING, 5) << "Logical component overflow: " |
304 | 2 | << "last_usec=" << last_usec << ", logical=" << logical; |
305 | | |
306 | 4.86k | last_usec += logical >> HybridTime::kBitsForLogicalComponent; |
307 | 4.86k | logical &= HybridTime::kLogicalBitMask; |
308 | 4.86k | } |
309 | 67.0M | } |
310 | | |
311 | 17.1k | void HybridClock::RegisterMetrics(const scoped_refptr<MetricEntity>& metric_entity) { |
312 | 17.1k | METRIC_hybrid_clock_hybrid_time.InstantiateFunctionGauge( |
313 | 17.1k | metric_entity, |
314 | 17.1k | Bind(&HybridClock::NowForMetrics, Unretained(this))) |
315 | 17.1k | ->AutoDetachToLastValue(&metric_detacher_); |
316 | 17.1k | METRIC_hybrid_clock_error.InstantiateFunctionGauge( |
317 | 17.1k | metric_entity, |
318 | 17.1k | Bind(&HybridClock::ErrorForMetrics, Unretained(this))) |
319 | 17.1k | ->AutoDetachToLastValue(&metric_detacher_); |
320 | 17.1k | METRIC_hybrid_clock_skew.InstantiateFunctionGauge( |
321 | 17.1k | metric_entity, |
322 | 17.1k | Bind(&HybridClock::SkewForMetrics, Unretained(this))) |
323 | 17.1k | ->AutoDetachToLastValue(&metric_detacher_); |
324 | 17.1k | } |
325 | | |
326 | 55.5M | LogicalTimeComponent HybridClock::GetLogicalValue(const HybridTime& hybrid_time) { |
327 | 55.5M | return hybrid_time.GetLogicalValue(); |
328 | 55.5M | } |
329 | | |
330 | 82.2M | MicrosTime HybridClock::GetPhysicalValueMicros(const HybridTime& hybrid_time) { |
331 | 82.2M | return hybrid_time.GetPhysicalValueMicros(); |
332 | 82.2M | } |
333 | | |
334 | 26.3M | uint64_t HybridClock::GetPhysicalValueNanos(const HybridTime& hybrid_time) { |
335 | | // Conversion to nanoseconds here is safe from overflow since 2^kBitsForLogicalComponent is less |
336 | | // than MonoTime::kNanosecondsPerMicrosecond. Although, we still just check for sanity. |
337 | 26.3M | uint64_t micros = hybrid_time.value() >> HybridTime::kBitsForLogicalComponent; |
338 | 26.3M | CHECK(micros <= std::numeric_limits<uint64_t>::max() / MonoTime::kNanosecondsPerMicrosecond); |
339 | 26.3M | return micros * MonoTime::kNanosecondsPerMicrosecond; |
340 | 26.3M | } |
341 | | |
342 | 192M | HybridTime HybridClock::HybridTimeFromMicroseconds(uint64_t micros) { |
343 | 192M | return HybridTime::FromMicros(micros); |
344 | 192M | } |
345 | | |
346 | | HybridTime HybridClock::HybridTimeFromMicrosecondsAndLogicalValue( |
347 | 13.7M | MicrosTime micros, LogicalTimeComponent logical_value) { |
348 | 13.7M | return HybridTime::FromMicrosecondsAndLogicalValue(micros, logical_value); |
349 | 13.7M | } |
350 | | |
351 | | // CAUTION: USE WITH EXTREME CARE!!! This function does not have overflow checking. |
352 | | // It is recommended to use CompareHybridClocksToDelta, below. |
353 | | HybridTime HybridClock::AddPhysicalTimeToHybridTime(const HybridTime& original, |
354 | 5.58M | const MonoDelta& to_add) { |
355 | 5.58M | uint64_t new_physical = GetPhysicalValueMicros(original) + to_add.ToMicroseconds(); |
356 | 5.58M | auto old_logical = GetLogicalValue(original); |
357 | 5.58M | return HybridTimeFromMicrosecondsAndLogicalValue(new_physical, old_logical); |
358 | 5.58M | } |
359 | | |
360 | | int HybridClock::CompareHybridClocksToDelta(const HybridTime& begin, |
361 | | const HybridTime& end, |
362 | 13.2M | const MonoDelta& delta) { |
363 | 13.2M | if (end < begin) { |
364 | 1 | return -1; |
365 | 1 | } |
366 | | // We use nanoseconds since MonoDelta has nanosecond granularity. |
367 | 13.2M | uint64_t begin_nanos = GetPhysicalValueNanos(begin); |
368 | 13.2M | uint64_t end_nanos = GetPhysicalValueNanos(end); |
369 | 13.2M | uint64_t delta_nanos = delta.ToNanoseconds(); |
370 | 13.2M | if (end_nanos - begin_nanos > delta_nanos) { |
371 | 9.10k | return 1; |
372 | 13.1M | } else if (end_nanos - begin_nanos == delta_nanos) { |
373 | 267 | uint64_t begin_logical = GetLogicalValue(begin); |
374 | 267 | uint64_t end_logical = GetLogicalValue(end); |
375 | 267 | if (end_logical > begin_logical) { |
376 | 1 | return 1; |
377 | 266 | } else if (end_logical < begin_logical) { |
378 | 1 | return -1; |
379 | 265 | } else { |
380 | 265 | return 0; |
381 | 265 | } |
382 | 13.1M | } else { |
383 | 13.1M | return -1; |
384 | 13.1M | } |
385 | 13.2M | } |
386 | | |
387 | 20.4k | void HybridClock::EnableClockSkewControl() { |
388 | 20.4k | clock_skew_control_enabled.store(true, std::memory_order_release); |
389 | 20.4k | } |
390 | | |
391 | | } // namespace server |
392 | | } // namespace yb |