/Users/deen/code/yugabyte-db/src/yb/server/hybrid_clock.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/server/hybrid_clock.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <mutex> |
37 | | |
38 | | #include <glog/logging.h> |
39 | | |
40 | | #include "yb/gutil/bind.h" |
41 | | #include "yb/gutil/walltime.h" |
42 | | |
43 | | #include "yb/util/errno.h" |
44 | | #include "yb/util/flag_tags.h" |
45 | | #include "yb/util/locks.h" |
46 | | #include "yb/util/logging.h" |
47 | | #include "yb/util/metrics.h" |
48 | | #include "yb/util/result.h" |
49 | | |
50 | | DEFINE_bool(use_hybrid_clock, true, |
51 | | "Whether HybridClock should be used as the default clock" |
52 | | " implementation. This should be disabled for testing purposes only."); |
53 | | TAG_FLAG(use_hybrid_clock, hidden); |
54 | | |
55 | | METRIC_DEFINE_gauge_uint64(server, hybrid_clock_hybrid_time, |
56 | | "Hybrid Clock HybridTime", |
57 | | yb::MetricUnit::kMicroseconds, |
58 | | "Hybrid clock hybrid_time."); |
59 | | METRIC_DEFINE_gauge_uint64(server, hybrid_clock_error, |
60 | | "Hybrid Clock Error", |
61 | | yb::MetricUnit::kMicroseconds, |
62 | | "Server clock maximum error."); |
63 | | METRIC_DEFINE_gauge_int64(server, hybrid_clock_skew, |
64 | | "Hybrid Clock Skew", |
65 | | yb::MetricUnit::kMicroseconds, |
66 | | "Server clock skew."); |
67 | | |
68 | | DEFINE_string(time_source, "", |
69 | | "The clock source that HybridClock should use (for tests only). " |
70 | | "Leave empty for WallClock, other values depend on added clock providers and " |
71 | | "specific for appropriate tests, that adds them."); |
72 | | TAG_FLAG(time_source, hidden); |
73 | | |
74 | | DEFINE_bool(fail_on_out_of_range_clock_skew, true, |
75 | | "In case transactional tables are present, crash the process if clock skew greater " |
76 | | "than the configured maximum."); |
77 | | |
78 | | DEFINE_uint64(clock_skew_force_crash_bound_usec, 60000000, |
79 | | "If the clock skew larger than this amount (microseconds) is observed, we will force " |
80 | | "a crash regardless of the value of fail_on_out_of_range_clock_skew. This is useful " |
81 | | "for avoiding really large hybrid clock jumps. Set to 0 to disable the check. Note " |
82 | | "that this check is only preformed for clock skew greater than max_clock_skew_usec."); |
83 | | |
84 | | DECLARE_uint64(max_clock_skew_usec); |
85 | | |
86 | | using yb::Status; |
87 | | using strings::Substitute; |
88 | | |
89 | | namespace yb { |
90 | | namespace server { |
91 | | |
92 | | namespace { |
93 | | |
94 | | std::mutex providers_mutex; |
95 | | std::unordered_map<std::string, PhysicalClockProvider> providers; |
96 | | |
97 | | std::atomic<bool> clock_skew_control_enabled{false}; |
98 | | |
99 | | // options should be in format clock_name[,extra_data] and extra_data would be passed to |
100 | | // clock factory. |
101 | 26.5k | PhysicalClockPtr GetClock(const std::string& options) { |
102 | 26.5k | if (options.empty()) { |
103 | 26.0k | return WallClock(); |
104 | 26.0k | } |
105 | | |
106 | 532 | auto pos = options.find(','); |
107 | 532 | auto name = pos == std::string::npos ? options523 : options.substr(0, pos)9 ; |
108 | 532 | auto arg = pos == std::string::npos ? std::string()523 : options.substr(pos + 1)9 ; |
109 | 532 | std::lock_guard<std::mutex> lock(providers_mutex); |
110 | 532 | auto it = providers.find(name); |
111 | 532 | if (it == providers.end()) { |
112 | 0 | LOG(DFATAL) << "Unknown time source: " << name; |
113 | 0 | return WallClock(); |
114 | 0 | } |
115 | 532 | return it->second(arg); |
116 | 532 | } |
117 | | |
118 | | } // namespace |
119 | | |
120 | 14.8k | void HybridClock::RegisterProvider(std::string name, PhysicalClockProvider provider) { |
121 | 14.8k | std::lock_guard<std::mutex> lock(providers_mutex); |
122 | 14.8k | providers.emplace(std::move(name), std::move(provider)); |
123 | 14.8k | } |
124 | | |
125 | 26.5k | HybridClock::HybridClock() : HybridClock(FLAGS_time_source) {} Unexecuted instantiation: yb::server::HybridClock::HybridClock() yb::server::HybridClock::HybridClock() Line | Count | Source | 125 | 26.5k | HybridClock::HybridClock() : HybridClock(FLAGS_time_source) {} |
|
126 | | |
127 | 26.7k | HybridClock::HybridClock(PhysicalClockPtr clock) : clock_(std::move(clock)) {} |
128 | | |
129 | 26.5k | HybridClock::HybridClock(const std::string& time_source) : HybridClock(GetClock(time_source)) {} Unexecuted instantiation: yb::server::HybridClock::HybridClock(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) yb::server::HybridClock::HybridClock(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) Line | Count | Source | 129 | 26.5k | HybridClock::HybridClock(const std::string& time_source) : HybridClock(GetClock(time_source)) {} |
|
130 | | |
131 | 26.4k | Status HybridClock::Init() { |
132 | 26.4k | #if defined(__APPLE__) |
133 | 26.4k | LOG(WARNING) << "HybridClock initialized in local mode (OS X only). " |
134 | 26.4k | << "Not suitable for distributed clusters."; |
135 | 26.4k | #endif // defined(__APPLE__) |
136 | | |
137 | 26.4k | state_ = kInitialized; |
138 | | |
139 | 26.4k | return Status::OK(); |
140 | 26.4k | } |
141 | | |
142 | 206M | HybridTimeRange HybridClock::NowRange() { |
143 | 206M | HybridTime now; |
144 | 206M | uint64_t error; |
145 | | |
146 | 206M | NowWithError(&now, &error); |
147 | 206M | auto max_global_now = HybridTimeFromMicroseconds( |
148 | 206M | clock_->MaxGlobalTime({now.GetPhysicalValueMicros(), error})); |
149 | 206M | return std::make_pair(now, max_global_now); |
150 | 206M | } |
151 | | |
152 | 206M | void HybridClock::NowWithError(HybridTime *hybrid_time, uint64_t *max_error_usec) { |
153 | 206M | DCHECK_EQ(state_, kInitialized) << "Clock not initialized. Must call Init() first."0 ; |
154 | | |
155 | 206M | HybridClockComponents current_components = components_.load(boost::memory_order_acquire); |
156 | | |
157 | 206M | auto now = clock_->Now(); |
158 | 206M | if (PREDICT_FALSE(!now.ok())) { |
159 | 0 | LOG(FATAL) << Substitute("Couldn't get the current time: Clock unsynchronized. " |
160 | 0 | "Status: $0", now.status().ToString()); |
161 | 0 | } |
162 | | |
163 | | // If the current time surpasses the last update just return it |
164 | 206M | HybridClockComponents new_components = { now->time_point, 1 }; |
165 | | |
166 | 18.4E | VLOG(4) << __func__ << ", new: " << new_components << ", current: " << current_components; |
167 | | |
168 | 206M | if (now->time_point < current_components.last_usec) { |
169 | 6.64M | auto delta_us = current_components.last_usec - now->time_point; |
170 | 6.64M | if (delta_us > FLAGS_max_clock_skew_usec) { |
171 | 6.63M | auto delta = MonoDelta::FromMicroseconds(delta_us); |
172 | 6.63M | auto max_allowed = MonoDelta::FromMicroseconds(FLAGS_max_clock_skew_usec); |
173 | 6.63M | if ((ANNOTATE_UNPROTECTED_READ(FLAGS_fail_on_out_of_range_clock_skew) || |
174 | 6.63M | (0 ANNOTATE_UNPROTECTED_READ0 (FLAGS_clock_skew_force_crash_bound_usec) > 00 && |
175 | 0 | delta_us > ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec))) && |
176 | 6.63M | clock_skew_control_enabled.load(std::memory_order_acquire)6.62M ) { |
177 | 0 | LOG(FATAL) << "Too big clock skew is detected: " << delta << ", while max allowed is: " |
178 | 0 | << max_allowed << "; clock_skew_force_crash_bound_usec=" |
179 | 0 | << ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec); |
180 | 6.63M | } else { |
181 | 6.63M | YB_LOG_EVERY_N_SECS(ERROR, 1) |
182 | 13 | << "Too big clock skew is detected: " << delta << ", while max allowed is: " |
183 | 13 | << max_allowed << "; clock_skew_force_crash_bound_usec=" |
184 | 13 | << ANNOTATE_UNPROTECTED_READ(FLAGS_clock_skew_force_crash_bound_usec); |
185 | 6.63M | } |
186 | 6.63M | } |
187 | 200M | } else { |
188 | | // Loop over the check in case of concurrent updates making the CAS fail. |
189 | 221M | while (now->time_point > current_components.last_usec) { |
190 | 217M | if (components_.compare_exchange_weak(current_components, new_components)) { |
191 | 196M | *hybrid_time = HybridTimeFromMicroseconds(new_components.last_usec); |
192 | 196M | *max_error_usec = now->max_error; |
193 | 196M | if (PREDICT_FALSE(VLOG_IS_ON(2))) { |
194 | 0 | VLOG(2) << "Current clock is higher than the last one. Resetting logical values." |
195 | 0 | << " Time: " << *hybrid_time << ", Error: " << *max_error_usec; |
196 | 0 | } |
197 | 196M | return; |
198 | 196M | } |
199 | 217M | } |
200 | 200M | } |
201 | | |
202 | | // We don't have the last time read max error since it might have originated |
203 | | // in another machine, but we can put a bound on the maximum error of the |
204 | | // hybrid_time we are providing. |
205 | | // In particular we know that the "true" time falls within the interval |
206 | | // now_usec +- now.maxerror so we get the following situations: |
207 | | // |
208 | | // 1) |
209 | | // --------|----------|----|---------|--------------------------> time |
210 | | // now - e now last now + e |
211 | | // 2) |
212 | | // --------|----------|--------------|------|-------------------> time |
213 | | // now - e now now + e last |
214 | | // |
215 | | // Assuming, in the worst case, that the "true" time is now - error we need to |
216 | | // always return: last - (now - e) as the new maximum error. |
217 | | // This broadens the error interval for both cases but always returns |
218 | | // a correct error interval. |
219 | | |
220 | 19.1M | do 10.3M { |
221 | 19.1M | new_components.last_usec = current_components.last_usec; |
222 | 19.1M | new_components.logical = current_components.logical + 1; |
223 | 19.1M | new_components.HandleLogicalComponentOverflow(); |
224 | | // Loop over the check until the CAS succeeds, in case there are concurrent updates. |
225 | 19.1M | } while (!components_.compare_exchange_weak(current_components, new_components)); |
226 | | |
227 | 10.3M | *max_error_usec = new_components.last_usec - (now->time_point - now->max_error); |
228 | | |
229 | | // We've already atomically incremented the logical, so subtract 1. |
230 | 10.3M | *hybrid_time = HybridTimeFromMicrosecondsAndLogicalValue( |
231 | 10.3M | new_components.last_usec, |
232 | 10.3M | narrow_cast<LogicalTimeComponent>(new_components.logical)).Decremented(); |
233 | 10.3M | if (PREDICT_FALSE(VLOG_IS_ON(2))) { |
234 | 1 | VLOG(2) << "Current clock is lower than the last one. Returning last read and incrementing" |
235 | 0 | " logical values. Hybrid time: " << *hybrid_time << " Error: " << *max_error_usec; |
236 | 1 | } |
237 | 10.3M | } |
238 | | |
239 | 104M | void HybridClock::Update(const HybridTime& to_update) { |
240 | 104M | if (!to_update.is_valid()) { |
241 | 0 | return; |
242 | 0 | } |
243 | | |
244 | 104M | HybridClockComponents current_components = components_.load(boost::memory_order_acquire); |
245 | 104M | HybridClockComponents new_components = { |
246 | 104M | GetPhysicalValueMicros(to_update), GetLogicalValue(to_update) + 1 |
247 | 104M | }; |
248 | | |
249 | | // VLOG(4) crashes in TSAN mode |
250 | 104M | if (VLOG_IS_ON(4)) { |
251 | 0 | LOG(INFO) << __func__ << ", new: " << new_components << ", current: " << current_components; |
252 | 0 | } |
253 | | |
254 | 104M | new_components.HandleLogicalComponentOverflow(); |
255 | | |
256 | | // Keep trying to CAS until it works or until HT has advanced past this update. |
257 | 107M | while (current_components < new_components && |
258 | 107M | !components_.compare_exchange_weak(current_components, new_components)48.4M ) {}3.38M |
259 | 104M | } |
260 | | |
261 | | // Used to get the hybrid_time for metrics. |
262 | 15.4k | uint64_t HybridClock::NowForMetrics() { |
263 | 15.4k | return Now().ToUint64(); |
264 | 15.4k | } |
265 | | |
266 | | // Used to get the current error, for metrics. |
267 | 15.4k | uint64_t HybridClock::ErrorForMetrics() { |
268 | 15.4k | HybridTime now; |
269 | 15.4k | uint64_t error; |
270 | | |
271 | 15.4k | NowWithError(&now, &error); |
272 | 15.4k | return error; |
273 | 15.4k | } |
274 | | |
275 | 15.4k | int64_t HybridClock::SkewForMetrics() { |
276 | 15.4k | HybridClockComponents current_components = components_.load(boost::memory_order_acquire); |
277 | 15.4k | auto now = clock_->Now(); |
278 | 15.4k | if (PREDICT_FALSE(!now.ok())) { |
279 | 0 | LOG(DFATAL) << Substitute("Couldn't get the current time: Clock unsynchronized. " |
280 | 0 | "Status: $0", now.status().ToString()); |
281 | 0 | return 0; |
282 | 0 | } |
283 | | // Making sure we don't return a negative value. |
284 | 15.4k | int64_t potential_skew = current_components.last_usec - now->time_point; |
285 | 15.4k | return std::max<int64_t>(0, potential_skew); |
286 | 15.4k | } |
287 | | |
288 | 0 | std::string HybridClockComponents::ToString() const { |
289 | 0 | return Format("{ last_usec: $0 logical: $1 }", last_usec, logical); |
290 | 0 | } |
291 | | |
292 | 0 | std::ostream& operator<<(std::ostream& out, const HybridClockComponents& components) { |
293 | 0 | return out << components.ToString(); |
294 | 0 | } |
295 | | |
296 | 124M | void HybridClockComponents::HandleLogicalComponentOverflow() { |
297 | 124M | if (logical > HybridTime::kLogicalBitMask) { |
298 | 5.62k | static constexpr uint64_t kMaxOverflowValue = 1ULL << HybridTime::kBitsForLogicalComponent; |
299 | 5.62k | if (logical > kMaxOverflowValue) { |
300 | 0 | LOG(FATAL) << "Logical component is too high: last_usec=" << last_usec |
301 | 0 | << "logical=" << logical << ", max allowed is " << kMaxOverflowValue; |
302 | 0 | } |
303 | 5.62k | YB_LOG_EVERY_N_SECS(WARNING, 5) << "Logical component overflow: " |
304 | 3 | << "last_usec=" << last_usec << ", logical=" << logical; |
305 | | |
306 | 5.62k | last_usec += logical >> HybridTime::kBitsForLogicalComponent; |
307 | 5.62k | logical &= HybridTime::kLogicalBitMask; |
308 | 5.62k | } |
309 | 124M | } |
310 | | |
311 | 25.7k | void HybridClock::RegisterMetrics(const scoped_refptr<MetricEntity>& metric_entity) { |
312 | 25.7k | METRIC_hybrid_clock_hybrid_time.InstantiateFunctionGauge( |
313 | 25.7k | metric_entity, |
314 | 25.7k | Bind(&HybridClock::NowForMetrics, Unretained(this))) |
315 | 25.7k | ->AutoDetachToLastValue(&metric_detacher_); |
316 | 25.7k | METRIC_hybrid_clock_error.InstantiateFunctionGauge( |
317 | 25.7k | metric_entity, |
318 | 25.7k | Bind(&HybridClock::ErrorForMetrics, Unretained(this))) |
319 | 25.7k | ->AutoDetachToLastValue(&metric_detacher_); |
320 | 25.7k | METRIC_hybrid_clock_skew.InstantiateFunctionGauge( |
321 | 25.7k | metric_entity, |
322 | 25.7k | Bind(&HybridClock::SkewForMetrics, Unretained(this))) |
323 | 25.7k | ->AutoDetachToLastValue(&metric_detacher_); |
324 | 25.7k | } |
325 | | |
326 | 110M | LogicalTimeComponent HybridClock::GetLogicalValue(const HybridTime& hybrid_time) { |
327 | 110M | return hybrid_time.GetLogicalValue(); |
328 | 110M | } |
329 | | |
330 | 158M | MicrosTime HybridClock::GetPhysicalValueMicros(const HybridTime& hybrid_time) { |
331 | 158M | return hybrid_time.GetPhysicalValueMicros(); |
332 | 158M | } |
333 | | |
334 | 48.5M | uint64_t HybridClock::GetPhysicalValueNanos(const HybridTime& hybrid_time) { |
335 | | // Conversion to nanoseconds here is safe from overflow since 2^kBitsForLogicalComponent is less |
336 | | // than MonoTime::kNanosecondsPerMicrosecond. Although, we still just check for sanity. |
337 | 48.5M | uint64_t micros = hybrid_time.value() >> HybridTime::kBitsForLogicalComponent; |
338 | 48.5M | CHECK(micros <= std::numeric_limits<uint64_t>::max() / MonoTime::kNanosecondsPerMicrosecond); |
339 | 48.5M | return micros * MonoTime::kNanosecondsPerMicrosecond; |
340 | 48.5M | } |
341 | | |
342 | 403M | HybridTime HybridClock::HybridTimeFromMicroseconds(uint64_t micros) { |
343 | 403M | return HybridTime::FromMicros(micros); |
344 | 403M | } |
345 | | |
346 | | HybridTime HybridClock::HybridTimeFromMicrosecondsAndLogicalValue( |
347 | 16.9M | MicrosTime micros, LogicalTimeComponent logical_value) { |
348 | 16.9M | return HybridTime::FromMicrosecondsAndLogicalValue(micros, logical_value); |
349 | 16.9M | } |
350 | | |
351 | | // CAUTION: USE WITH EXTREME CARE!!! This function does not have overflow checking. |
352 | | // It is recommended to use CompareHybridClocksToDelta, below. |
353 | | HybridTime HybridClock::AddPhysicalTimeToHybridTime(const HybridTime& original, |
354 | 6.29M | const MonoDelta& to_add) { |
355 | 6.29M | uint64_t new_physical = GetPhysicalValueMicros(original) + to_add.ToMicroseconds(); |
356 | 6.29M | auto old_logical = GetLogicalValue(original); |
357 | 6.29M | return HybridTimeFromMicrosecondsAndLogicalValue(new_physical, old_logical); |
358 | 6.29M | } |
359 | | |
360 | | int HybridClock::CompareHybridClocksToDelta(const HybridTime& begin, |
361 | | const HybridTime& end, |
362 | 24.2M | const MonoDelta& delta) { |
363 | 24.2M | if (end < begin) { |
364 | 3 | return -1; |
365 | 3 | } |
366 | | // We use nanoseconds since MonoDelta has nanosecond granularity. |
367 | 24.2M | uint64_t begin_nanos = GetPhysicalValueNanos(begin); |
368 | 24.2M | uint64_t end_nanos = GetPhysicalValueNanos(end); |
369 | 24.2M | uint64_t delta_nanos = delta.ToNanoseconds(); |
370 | 24.2M | if (end_nanos - begin_nanos > delta_nanos) { |
371 | 17.9k | return 1; |
372 | 24.2M | } else if (end_nanos - begin_nanos == delta_nanos) { |
373 | 662 | uint64_t begin_logical = GetLogicalValue(begin); |
374 | 662 | uint64_t end_logical = GetLogicalValue(end); |
375 | 662 | if (end_logical > begin_logical) { |
376 | 1 | return 1; |
377 | 661 | } else if (end_logical < begin_logical) { |
378 | 1 | return -1; |
379 | 660 | } else { |
380 | 660 | return 0; |
381 | 660 | } |
382 | 24.2M | } else { |
383 | 24.2M | return -1; |
384 | 24.2M | } |
385 | 24.2M | } |
386 | | |
387 | 49.0k | void HybridClock::EnableClockSkewControl() { |
388 | 49.0k | clock_skew_control_enabled.store(true, std::memory_order_release); |
389 | 49.0k | } |
390 | | |
391 | | } // namespace server |
392 | | } // namespace yb |