/Users/deen/code/yugabyte-db/src/yb/tablet/mvcc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tablet/mvcc.h" |
34 | | |
35 | | #include <boost/circular_buffer.hpp> |
36 | | #include <boost/variant.hpp> |
37 | | |
38 | | #include "yb/gutil/macros.h" |
39 | | |
40 | | #include "yb/util/atomic.h" |
41 | | #include "yb/util/compare_util.h" |
42 | | #include "yb/util/enums.h" |
43 | | #include "yb/util/flag_tags.h" |
44 | | #include "yb/util/format.h" |
45 | | #include "yb/util/logging.h" |
46 | | |
47 | | using namespace std::literals; |
48 | | |
49 | | DEFINE_test_flag(int64, mvcc_op_trace_num_items, 32, |
50 | | "Number of items to keep in an MvccManager operation trace. Set to 0 to disable " |
51 | | "MVCC operation tracing."); |
52 | | |
53 | | DEFINE_test_flag(int32, inject_mvcc_delay_add_leader_pending_ms, 0, |
54 | | "Inject delay after MvccManager::AddLeaderPending read clock."); |
55 | | |
56 | | namespace yb { |
57 | | namespace tablet { |
58 | | |
59 | | namespace { |
60 | | |
61 | | struct SetLeaderOnlyModeTraceItem { |
62 | | bool leader_only; |
63 | | |
64 | 0 | std::string ToString() const { |
65 | 0 | return Format("SetLeaderOnlyMode $0", YB_STRUCT_TO_STRING(leader_only)); |
66 | 0 | } |
67 | | }; |
68 | | |
69 | | struct SetLastReplicatedTraceItem { |
70 | | HybridTime ht; |
71 | | |
72 | 0 | std::string ToString() const { |
73 | 0 | return Format("SetLastReplicated $0", YB_STRUCT_TO_STRING(ht)); |
74 | 0 | } |
75 | | }; |
76 | | |
77 | | struct SetPropagatedSafeTimeOnFollowerTraceItem { |
78 | | HybridTime ht; |
79 | | |
80 | 0 | std::string ToString() const { |
81 | 0 | return Format("SetPropagatedSafeTimeOnFollower $0", YB_STRUCT_TO_STRING(ht)); |
82 | 0 | } |
83 | | }; |
84 | | |
85 | | struct UpdatePropagatedSafeTimeOnLeaderTraceItem { |
86 | | FixedHybridTimeLease ht_lease; |
87 | | HybridTime safe_time; |
88 | | |
89 | 0 | std::string ToString() const { |
90 | 0 | return Format("UpdatePropagatedSafeTimeOnLeader $0", |
91 | 0 | YB_STRUCT_TO_STRING(ht_lease, safe_time)); |
92 | 0 | } |
93 | | }; |
94 | | |
95 | | struct AddLeaderPendingTraceItem { |
96 | | HybridTime ht; |
97 | | OpId op_id; |
98 | | |
99 | 1 | std::string ToString() const { |
100 | 1 | return Format("AddLeaderPending $0", YB_STRUCT_TO_STRING(ht, op_id)); |
101 | 1 | } |
102 | | }; |
103 | | |
104 | | struct AddFollowerPendingTraceItem { |
105 | | HybridTime ht; |
106 | | OpId op_id; |
107 | | |
108 | 1 | std::string ToString() const { |
109 | 1 | return Format("AddFollowerPending $0", YB_STRUCT_TO_STRING(ht, op_id)); |
110 | 1 | } |
111 | | }; |
112 | | |
113 | | struct ReplicatedTraceItem { |
114 | | HybridTime ht; |
115 | | OpId op_id; |
116 | | |
117 | 2 | std::string ToString() const { |
118 | 2 | return Format("Replicated $0", YB_STRUCT_TO_STRING(ht, op_id)); |
119 | 2 | } |
120 | | }; |
121 | | |
122 | | struct AbortedTraceItem { |
123 | | HybridTime ht; |
124 | | OpId op_id; |
125 | | |
126 | 0 | std::string ToString() const { |
127 | 0 | return Format("Aborted $0", YB_STRUCT_TO_STRING(ht, op_id)); |
128 | 0 | } |
129 | | }; |
130 | | |
131 | | struct SafeTimeTraceItem { |
132 | | HybridTime min_allowed; |
133 | | CoarseTimePoint deadline; |
134 | | FixedHybridTimeLease ht_lease; |
135 | | HybridTime safe_time; |
136 | | |
137 | 5 | std::string ToString() const { |
138 | 5 | return Format("SafeTime $0", YB_STRUCT_TO_STRING(min_allowed, deadline, ht_lease, safe_time)); |
139 | 5 | } |
140 | | }; |
141 | | |
142 | | struct SafeTimeForFollowerTraceItem { |
143 | | HybridTime min_allowed; |
144 | | CoarseTimePoint deadline; |
145 | | SafeTimeWithSource safe_time_with_source; |
146 | | |
147 | 0 | std::string ToString() const { |
148 | 0 | return Format("SafeTimeForFollower $0", |
149 | 0 | YB_STRUCT_TO_STRING(min_allowed, deadline, safe_time_with_source)); |
150 | 0 | } |
151 | | }; |
152 | | |
153 | | struct LastReplicatedHybridTimeTraceItem { |
154 | | HybridTime last_replicated; |
155 | | |
156 | 0 | std::string ToString() const { |
157 | 0 | return Format("LastReplicatedHybridTime $0", YB_STRUCT_TO_STRING(last_replicated)); |
158 | 0 | } |
159 | | }; |
160 | | |
161 | | typedef boost::variant< |
162 | | SetLeaderOnlyModeTraceItem, |
163 | | SetLastReplicatedTraceItem, |
164 | | SetPropagatedSafeTimeOnFollowerTraceItem, |
165 | | UpdatePropagatedSafeTimeOnLeaderTraceItem, |
166 | | AddLeaderPendingTraceItem, |
167 | | AddFollowerPendingTraceItem, |
168 | | ReplicatedTraceItem, |
169 | | AbortedTraceItem, |
170 | | SafeTimeTraceItem, |
171 | | SafeTimeForFollowerTraceItem, |
172 | | LastReplicatedHybridTimeTraceItem |
173 | | > TraceItemVariant; |
174 | | |
175 | | class ItemPrintingVisitor : public boost::static_visitor<>{ |
176 | | public: |
177 | | explicit ItemPrintingVisitor(std::ostream* out, size_t index) |
178 | | : out_(*out), |
179 | 9 | index_(index) { |
180 | 9 | } |
181 | | |
182 | 9 | template<typename T> void operator()(const T& t) const { |
183 | 9 | out_ << index_ << ". " << t.ToString() << std::endl; |
184 | 9 | } Unexecuted instantiation: mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_26SetLeaderOnlyModeTraceItemEEEvRKT_ Unexecuted instantiation: mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_26SetLastReplicatedTraceItemEEEvRKT_ Unexecuted instantiation: mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_40SetPropagatedSafeTimeOnFollowerTraceItemEEEvRKT_ Unexecuted instantiation: mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_41UpdatePropagatedSafeTimeOnLeaderTraceItemEEEvRKT_ mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_25AddLeaderPendingTraceItemEEEvRKT_ Line | Count | Source | 182 | 1 | template<typename T> void operator()(const T& t) const { | 183 | 1 | out_ << index_ << ". " << t.ToString() << std::endl; | 184 | 1 | } |
mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_27AddFollowerPendingTraceItemEEEvRKT_ Line | Count | Source | 182 | 1 | template<typename T> void operator()(const T& t) const { | 183 | 1 | out_ << index_ << ". " << t.ToString() << std::endl; | 184 | 1 | } |
mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_19ReplicatedTraceItemEEEvRKT_ Line | Count | Source | 182 | 2 | template<typename T> void operator()(const T& t) const { | 183 | 2 | out_ << index_ << ". " << t.ToString() << std::endl; | 184 | 2 | } |
Unexecuted instantiation: mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_16AbortedTraceItemEEEvRKT_ mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_17SafeTimeTraceItemEEEvRKT_ Line | Count | Source | 182 | 5 | template<typename T> void operator()(const T& t) const { | 183 | 5 | out_ << index_ << ". " << t.ToString() << std::endl; | 184 | 5 | } |
Unexecuted instantiation: mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_28SafeTimeForFollowerTraceItemEEEvRKT_ Unexecuted instantiation: mvcc.cc:_ZNK2yb6tablet12_GLOBAL__N_119ItemPrintingVisitorclINS1_33LastReplicatedHybridTimeTraceItemEEEvRKT_ |
185 | | |
186 | | private: |
187 | | std::ostream& out_; |
188 | | size_t index_; |
189 | | }; |
190 | | |
191 | | } // namespace |
192 | | |
193 | 5 | std::string FixedHybridTimeLease::ToString() const { |
194 | 5 | return YB_STRUCT_TO_STRING(time, lease); |
195 | 5 | } |
196 | | |
197 | | class MvccManager::MvccOpTrace { |
198 | | public: |
199 | 89.2k | explicit MvccOpTrace(size_t capacity) : items_(capacity) {} |
200 | 48.1k | ~MvccOpTrace() = default; |
201 | | |
202 | 71.3M | void Add(TraceItemVariant v) { |
203 | 71.3M | items_.push_back(std::move(v)); |
204 | 71.3M | } |
205 | | |
206 | 2 | void DumpTrace(ostream* out) const { |
207 | 2 | if (items_.empty()) { |
208 | 1 | *out << "No MVCC operations" << std::endl; |
209 | 1 | return; |
210 | 1 | } |
211 | 1 | *out << "Recent " << items_.size() << " MVCC operations:" << std::endl; |
212 | 1 | size_t i = 1; |
213 | 9 | for (const auto& item : items_) { |
214 | 9 | boost::apply_visitor(ItemPrintingVisitor(out, i), item); |
215 | 9 | ++i; |
216 | 9 | } |
217 | 1 | } |
218 | | |
219 | | private: |
220 | | boost::circular_buffer_space_optimized<TraceItemVariant, std::allocator<TraceItemVariant>> items_; |
221 | | }; |
222 | | |
223 | | struct MvccManager::InvariantViolationLoggingHelper { |
224 | | const std::string& log_prefix; |
225 | | MvccOpTrace* mvcc_op_trace; |
226 | | }; |
227 | | |
228 | | std::ostream& operator<< ( |
229 | | std::ostream& out, |
230 | 0 | const MvccManager::InvariantViolationLoggingHelper& log_helper) { |
231 | 0 | out << log_helper.log_prefix; |
232 | 0 | log_helper.mvcc_op_trace->DumpTrace(&out); |
233 | 0 | return out; |
234 | 0 | } |
235 | | |
236 | | // ------------------------------------------------------------------------------------------------ |
237 | | // SafeTimeWithSource |
238 | | // ------------------------------------------------------------------------------------------------ |
239 | | |
240 | 0 | std::string SafeTimeWithSource::ToString() const { |
241 | 0 | return Format("{ safe_time: $0 source: $1 }", safe_time, source); |
242 | 0 | } |
243 | | |
244 | | // ------------------------------------------------------------------------------------------------ |
245 | | // MvccManager |
246 | | // ------------------------------------------------------------------------------------------------ |
247 | | |
248 | | MvccManager::MvccManager(std::string prefix, server::ClockPtr clock) |
249 | | : prefix_(std::move(prefix)), |
250 | 89.2k | clock_(std::move(clock)) { |
251 | 89.2k | auto op_trace_num_items = GetAtomicFlag(&FLAGS_TEST_mvcc_op_trace_num_items); |
252 | 89.2k | if (op_trace_num_items > 0) { |
253 | 89.2k | op_trace_ = std::make_unique<MvccManager::MvccOpTrace>(op_trace_num_items); |
254 | 89.2k | } |
255 | 89.2k | } |
256 | | |
257 | 48.1k | MvccManager::~MvccManager() { |
258 | 48.1k | } |
259 | | |
260 | 7.48M | void MvccManager::Replicated(HybridTime ht, const OpId& op_id) { |
261 | 882 | VLOG_WITH_PREFIX(1) << __func__ << "(" << ht << ", " << op_id << ")"; |
262 | 7.48M | CHECK(!op_id.empty()); |
263 | | |
264 | 7.48M | { |
265 | 7.48M | std::lock_guard<std::mutex> lock(mutex_); |
266 | 7.48M | if (op_trace_) { |
267 | 7.48M | op_trace_->Add(ReplicatedTraceItem { .ht = ht, .op_id = op_id }); |
268 | 7.48M | } |
269 | 18.4E | CHECK(!queue_.empty()) << InvariantViolationLogPrefix(); |
270 | 0 | CHECK_EQ(queue_.front(), |
271 | 0 | (QueueItem{ .hybrid_time = ht, .op_id = op_id })) << InvariantViolationLogPrefix(); |
272 | 7.48M | queue_.pop_front(); |
273 | 7.48M | last_replicated_ = ht; |
274 | 7.48M | } |
275 | 7.48M | cond_.notify_all(); |
276 | 7.48M | } |
277 | | |
278 | 20.6k | void MvccManager::Aborted(HybridTime ht, const OpId& op_id) { |
279 | 0 | VLOG_WITH_PREFIX(1) << __func__ << "(" << ht << ", " << op_id << ")"; |
280 | | |
281 | 20.6k | { |
282 | 20.6k | std::lock_guard<std::mutex> lock(mutex_); |
283 | 20.6k | if (op_trace_) { |
284 | 20.6k | op_trace_->Add(AbortedTraceItem { .ht = ht, .op_id = op_id }); |
285 | 20.6k | } |
286 | 0 | CHECK(!queue_.empty()) << InvariantViolationLogPrefix(); |
287 | 0 | CHECK_EQ(queue_.back(), |
288 | 0 | (QueueItem{ .hybrid_time = ht, .op_id = op_id })) |
289 | 0 | << InvariantViolationLogPrefix() << "It is allowed to abort only last operation"; |
290 | 20.6k | queue_.pop_back(); |
291 | 20.6k | } |
292 | 20.6k | cond_.notify_all(); |
293 | 20.6k | } |
294 | | |
295 | 1.16M | bool BadNextOpId(const OpId& prev, const OpId& next) { |
296 | 1.16M | if (prev.index >= next.index) { |
297 | 0 | return true; |
298 | 0 | } |
299 | 1.16M | if (prev.term > next.term) { |
300 | 0 | return true; |
301 | 0 | } |
302 | 1.16M | return false; |
303 | 1.16M | } |
304 | | |
305 | 2.53M | HybridTime MvccManager::AddLeaderPending(const OpId& op_id) { |
306 | 2.53M | std::lock_guard<std::mutex> lock(mutex_); |
307 | 2.53M | auto ht = clock_->Now(); |
308 | 2.53M | AtomicFlagSleepMs(&FLAGS_TEST_inject_mvcc_delay_add_leader_pending_ms); |
309 | 18.4E | VLOG_WITH_PREFIX(1) << __func__ << "(" << op_id << "), time: " << ht; |
310 | 2.53M | AddPending(ht, op_id, /* is_follower_side= */ false); |
311 | | |
312 | 2.53M | if (op_trace_) { |
313 | 2.53M | op_trace_->Add(AddLeaderPendingTraceItem { |
314 | 2.53M | .ht = ht, |
315 | 2.53M | .op_id = op_id, |
316 | 2.53M | }); |
317 | 2.53M | } |
318 | | |
319 | 2.53M | return ht; |
320 | 2.53M | } |
321 | | |
322 | 4.97M | void MvccManager::AddFollowerPending(HybridTime ht, const OpId& op_id) { |
323 | 4.97M | std::lock_guard<std::mutex> lock(mutex_); |
324 | 803 | VLOG_WITH_PREFIX(1) << __func__ << "(" << ht << ", " << op_id << ")"; |
325 | | |
326 | 4.97M | AddPending(ht, op_id, /* is_follower_side= */ true); |
327 | | |
328 | 4.97M | if (op_trace_) { |
329 | 4.96M | op_trace_->Add(AddFollowerPendingTraceItem { |
330 | 4.96M | .ht = ht, |
331 | 4.96M | .op_id = op_id, |
332 | 4.96M | }); |
333 | 4.96M | } |
334 | 4.97M | } |
335 | | |
336 | 7.51M | void MvccManager::AddPending(HybridTime ht, const OpId& op_id, bool is_follower_side) { |
337 | 7.51M | CHECK(!op_id.empty()); |
338 | | |
339 | 6.33M | HybridTime last_ht_in_queue = queue_.empty() ? HybridTime::kMin : queue_.back().hybrid_time; |
340 | | |
341 | 7.51M | HybridTime sanity_check_lower_bound = |
342 | 7.51M | std::max({ |
343 | 7.51M | max_safe_time_returned_with_lease_.safe_time, |
344 | 7.51M | max_safe_time_returned_without_lease_.safe_time, |
345 | 7.51M | max_safe_time_returned_for_follower_.safe_time, |
346 | 7.51M | propagated_safe_time_, |
347 | 7.51M | last_replicated_, |
348 | 7.51M | last_ht_in_queue}); |
349 | | |
350 | 7.51M | if (ht <= sanity_check_lower_bound) { |
351 | 0 | auto get_details_msg = [&](bool drain_aborted) { |
352 | 0 | std::ostringstream ss; |
353 | 0 | #define LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(full, safe_time) \ |
354 | 0 | "\n " << EXPR_VALUE_FOR_LOG(full) \ |
355 | 0 | << "\n " << (ht <= safe_time ? "!!! " : "") << EXPR_VALUE_FOR_LOG(ht <= safe_time) \ |
356 | 0 | << "\n " << EXPR_VALUE_FOR_LOG( \ |
357 | 0 | static_cast<int64_t>(ht.ToUint64() - safe_time.ToUint64())) \ |
358 | 0 | << "\n " << EXPR_VALUE_FOR_LOG(ht.PhysicalDiff(safe_time)) \ |
359 | 0 | << "\n " |
360 | |
|
361 | 0 | #define LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(t) LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(t, t.safe_time) |
362 | 0 | #define LOG_INFO_FOR_HT_LOWER_BOUND(t) LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(t, t) |
363 | |
|
364 | 0 | ss << "New operation's hybrid time too low: " << ht << ", op id: " << op_id |
365 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_with_lease_) |
366 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_without_lease_) |
367 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_for_follower_) |
368 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND(last_replicated_) |
369 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND(last_ht_in_queue) |
370 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND(propagated_safe_time_) |
371 | 0 | << "\n " << EXPR_VALUE_FOR_LOG(queue_.size()) |
372 | 0 | << "\n " << EXPR_VALUE_FOR_LOG(queue_); |
373 | 0 | return ss.str(); |
374 | 0 | #undef LOG_INFO_FOR_HT_LOWER_BOUND |
375 | 0 | }; |
376 | |
|
377 | | #ifdef NDEBUG |
378 | | // In release mode, let's try to avoid crashing if possible if we ever hit this situation. |
379 | | // On the leader side, we can assign a timestamp that is high enough. |
380 | | if (!is_follower_side && |
381 | | sanity_check_lower_bound && |
382 | | sanity_check_lower_bound != HybridTime::kMax) { |
383 | | HybridTime incremented_hybrid_time = sanity_check_lower_bound.Incremented(); |
384 | | YB_LOG_EVERY_N_SECS(ERROR, 5) << LogPrefix() |
385 | | << "Assigning an artificially incremented hybrid time: " << incremented_hybrid_time |
386 | | << ". This needs to be investigated. " << get_details_msg(/* drain_aborted */ false); |
387 | | ht = incremented_hybrid_time; |
388 | | } |
389 | | #endif |
390 | |
|
391 | 0 | if (ht <= sanity_check_lower_bound) { |
392 | 0 | LOG_WITH_PREFIX(FATAL) << InvariantViolationLogPrefix() |
393 | 0 | << get_details_msg(/* drain_aborted */ true); |
394 | 0 | } |
395 | 0 | } |
396 | | |
397 | 2.18k | LOG_IF_WITH_PREFIX(DFATAL, |
398 | 2.18k | !queue_.empty() && BadNextOpId(queue_.back().op_id, op_id)) |
399 | 2.18k | << "Op sequence failure: " << AsString(queue_.back().op_id) << " followed by " |
400 | 2.18k | << AsString(op_id) << " " << InvariantViolationLogPrefix(); |
401 | | |
402 | 7.51M | queue_.push_back(QueueItem { |
403 | 7.51M | .hybrid_time = ht, |
404 | 7.51M | .op_id = op_id, |
405 | 7.51M | }); |
406 | 7.51M | } |
407 | | |
408 | 1.72k | void MvccManager::SetLastReplicated(HybridTime ht) { |
409 | 0 | VLOG_WITH_PREFIX(1) << __func__ << "(" << ht << ")"; |
410 | | |
411 | 1.72k | { |
412 | 1.72k | std::lock_guard<std::mutex> lock(mutex_); |
413 | 1.72k | if (op_trace_) { |
414 | 1.72k | op_trace_->Add(SetLastReplicatedTraceItem { .ht = ht }); |
415 | 1.72k | } |
416 | 1.72k | last_replicated_ = ht; |
417 | 1.72k | } |
418 | 1.72k | cond_.notify_all(); |
419 | 1.72k | } |
420 | | |
421 | 10.1M | void MvccManager::SetPropagatedSafeTimeOnFollower(HybridTime ht) { |
422 | 2.15k | VLOG_WITH_PREFIX(1) << __func__ << "(" << ht << ")"; |
423 | | |
424 | 10.1M | { |
425 | 10.1M | std::lock_guard<std::mutex> lock(mutex_); |
426 | 10.1M | if (op_trace_) { |
427 | 10.1M | op_trace_->Add(SetPropagatedSafeTimeOnFollowerTraceItem { .ht = ht }); |
428 | 10.1M | } |
429 | 10.1M | if (ht >= propagated_safe_time_) { |
430 | 10.1M | propagated_safe_time_ = ht; |
431 | 8.80k | } else { |
432 | 8.80k | LOG_WITH_PREFIX(WARNING) |
433 | 8.80k | << "Received propagated safe time " << ht << " less than the old value: " |
434 | 8.80k | << propagated_safe_time_ << ". This could happen on followers when a new leader " |
435 | 8.80k | << "is elected."; |
436 | 8.80k | } |
437 | 10.1M | } |
438 | 10.1M | cond_.notify_all(); |
439 | 10.1M | } |
440 | | |
441 | | // NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock. |
442 | | void MvccManager::UpdatePropagatedSafeTimeOnLeader(const FixedHybridTimeLease& ht_lease) |
443 | 15.1M | NO_THREAD_SAFETY_ANALYSIS { |
444 | 1.03k | VLOG_WITH_PREFIX(1) << __func__ << "(" << ht_lease << ")"; |
445 | | |
446 | 15.1M | { |
447 | 15.1M | std::unique_lock<std::mutex> lock(mutex_); |
448 | 15.1M | auto safe_time = DoGetSafeTime(HybridTime::kMin, // min_allowed |
449 | 15.1M | CoarseTimePoint::max(), // deadline |
450 | 15.1M | ht_lease, |
451 | 15.1M | &lock); |
452 | 15.1M | #ifndef NDEBUG |
453 | | // This should only be called from RaftConsensus::UpdateMajorityReplicated, and ht_lease passed |
454 | | // in here should keep increasing, so we should not see propagated_safe_time_ going backwards. |
455 | 0 | CHECK_GE(safe_time, propagated_safe_time_) |
456 | 0 | << InvariantViolationLogPrefix() |
457 | 0 | << "ht_lease: " << ht_lease; |
458 | 15.1M | propagated_safe_time_ = safe_time; |
459 | | #else |
460 | | // Do not crash in production. |
461 | | if (safe_time < propagated_safe_time_) { |
462 | | YB_LOG_EVERY_N_SECS(ERROR, 5) << LogPrefix() |
463 | | << "Previously saw " << EXPR_VALUE_FOR_LOG(propagated_safe_time_) |
464 | | << ", but now safe time is " << safe_time; |
465 | | } else { |
466 | | propagated_safe_time_ = safe_time; |
467 | | } |
468 | | #endif |
469 | | |
470 | 15.1M | if (op_trace_) { |
471 | 15.1M | op_trace_->Add(UpdatePropagatedSafeTimeOnLeaderTraceItem { |
472 | 15.1M | .ht_lease = ht_lease, |
473 | 15.1M | .safe_time = safe_time |
474 | 15.1M | }); |
475 | 15.1M | } |
476 | 15.1M | } |
477 | 15.1M | cond_.notify_all(); |
478 | 15.1M | } |
479 | | |
480 | 98.8k | void MvccManager::SetLeaderOnlyMode(bool leader_only) { |
481 | 98.8k | std::lock_guard<std::mutex> lock(mutex_); |
482 | 98.8k | if (op_trace_) { |
483 | 98.8k | op_trace_->Add(SetLeaderOnlyModeTraceItem { |
484 | 98.8k | .leader_only = leader_only |
485 | 98.8k | }); |
486 | 98.8k | } |
487 | 98.8k | leader_only_mode_ = leader_only; |
488 | 98.8k | } |
489 | | |
490 | | // NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock. |
491 | | HybridTime MvccManager::SafeTimeForFollower( |
492 | 453k | HybridTime min_allowed, CoarseTimePoint deadline) const NO_THREAD_SAFETY_ANALYSIS { |
493 | 453k | std::unique_lock<std::mutex> lock(mutex_); |
494 | | |
495 | 453k | if (leader_only_mode_) { |
496 | | // If there are no followers (RF == 1), use SafeTime() because propagated_safe_time_ might not |
497 | | // have a valid value. |
498 | 5.21k | return DoGetSafeTime(min_allowed, deadline, FixedHybridTimeLease(), &lock); |
499 | 5.21k | } |
500 | | |
501 | 448k | SafeTimeWithSource result; |
502 | 450k | auto predicate = [this, &result, min_allowed] { |
503 | | // last_replicated_ is updated earlier than propagated_safe_time_, so because of concurrency it |
504 | | // could be greater than propagated_safe_time_. |
505 | 450k | if (propagated_safe_time_ > last_replicated_) { |
506 | 318k | if (queue_.empty() || propagated_safe_time_ < queue_.front().hybrid_time) { |
507 | 309k | result.safe_time = propagated_safe_time_; |
508 | 309k | result.source = SafeTimeSource::kPropagated; |
509 | 9.43k | } else { |
510 | 9.43k | result.safe_time = queue_.front().hybrid_time.Decremented(); |
511 | 9.43k | result.source = SafeTimeSource::kNextInQueue; |
512 | 9.43k | } |
513 | 131k | } else { |
514 | 131k | result.safe_time = last_replicated_; |
515 | 131k | result.source = SafeTimeSource::kLastReplicated; |
516 | 131k | } |
517 | 450k | return result.safe_time >= min_allowed; |
518 | 450k | }; |
519 | 448k | if (deadline == CoarseTimePoint::max()) { |
520 | 112k | cond_.wait(lock, predicate); |
521 | 335k | } else if (!cond_.wait_until(lock, deadline, predicate)) { |
522 | 0 | return HybridTime::kInvalid; |
523 | 0 | } |
524 | 18.4E | VLOG_WITH_PREFIX(1) << "SafeTimeForFollower(" << min_allowed |
525 | 18.4E | << "), result = " << result.ToString(); |
526 | 0 | CHECK_GE(result.safe_time, max_safe_time_returned_for_follower_.safe_time) |
527 | 0 | << InvariantViolationLogPrefix() |
528 | 0 | << "result: " << result.ToString() |
529 | 0 | << ", max_safe_time_returned_for_follower_: " |
530 | 0 | << max_safe_time_returned_for_follower_.ToString(); |
531 | 448k | max_safe_time_returned_for_follower_ = result; |
532 | 448k | if (op_trace_) { |
533 | 448k | op_trace_->Add(SafeTimeForFollowerTraceItem { |
534 | 448k | .min_allowed = min_allowed, |
535 | 448k | .deadline = deadline, |
536 | 448k | .safe_time_with_source = result |
537 | 448k | }); |
538 | 448k | } |
539 | 448k | return result.safe_time; |
540 | 448k | } |
541 | | |
542 | | // NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock. |
543 | | HybridTime MvccManager::SafeTime( |
544 | | HybridTime min_allowed, |
545 | | CoarseTimePoint deadline, |
546 | 18.2M | const FixedHybridTimeLease& ht_lease) const NO_THREAD_SAFETY_ANALYSIS { |
547 | 18.2M | std::unique_lock<std::mutex> lock(mutex_); |
548 | 18.2M | auto safe_time = DoGetSafeTime(min_allowed, deadline, ht_lease, &lock); |
549 | 18.2M | if (op_trace_) { |
550 | 18.2M | op_trace_->Add(SafeTimeTraceItem { |
551 | 18.2M | .min_allowed = min_allowed, |
552 | 18.2M | .deadline = deadline, |
553 | 18.2M | .ht_lease = ht_lease, |
554 | 18.2M | .safe_time = safe_time |
555 | 18.2M | }); |
556 | 18.2M | } |
557 | 18.2M | return safe_time; |
558 | 18.2M | } |
559 | | |
560 | | HybridTime MvccManager::DoGetSafeTime(const HybridTime min_allowed, |
561 | | const CoarseTimePoint deadline, |
562 | | const FixedHybridTimeLease& ht_lease, |
563 | 33.3M | std::unique_lock<std::mutex>* lock) const { |
564 | 33.3M | DCHECK_ONLY_NOTNULL(lock); |
565 | 12.3k | CHECK(ht_lease.lease.is_valid()) << InvariantViolationLogPrefix(); |
566 | 0 | CHECK_LE(min_allowed, ht_lease.lease) << InvariantViolationLogPrefix(); |
567 | | |
568 | 33.3M | const bool has_lease = !ht_lease.empty(); |
569 | | // Because different calls that have current hybrid time leader lease as an argument can come to |
570 | | // us out of order, we might see an older value of hybrid time leader lease expiration after a |
571 | | // newer value. We mitigate this by always using the highest value we've seen. |
572 | 33.3M | if (has_lease) { |
573 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, !ht_lease.time.is_valid()) << "Bad ht lease: " << ht_lease; |
574 | 32.9M | } |
575 | | |
576 | 33.3M | HybridTime result; |
577 | 33.3M | SafeTimeSource source = SafeTimeSource::kUnknown; |
578 | 33.3M | auto predicate = [this, &result, &source, min_allowed, ht_lease, has_lease] { |
579 | 33.3M | if (queue_.empty()) { |
580 | 24.0M | result = ht_lease.time.is_valid() |
581 | 23.6M | ? std::max(max_safe_time_returned_with_lease_.safe_time, ht_lease.time) |
582 | 381k | : clock_->Now(); |
583 | 24.0M | source = SafeTimeSource::kNow; |
584 | 8.09k | VLOG_WITH_PREFIX(2) << "DoGetSafeTime, Now: " << result; |
585 | 9.34M | } else { |
586 | 9.34M | result = queue_.front().hybrid_time.Decremented(); |
587 | 9.34M | source = SafeTimeSource::kNextInQueue; |
588 | 18.4E | VLOG_WITH_PREFIX(2) << "DoGetSafeTime, Queue front (decremented): " << result; |
589 | 9.34M | } |
590 | | |
591 | 33.3M | if (has_lease) { |
592 | 32.9M | auto used_lease = std::max({ht_lease.lease, max_safe_time_returned_with_lease_.safe_time}); |
593 | 32.9M | if (result > used_lease) { |
594 | 3.17M | result = used_lease; |
595 | 3.17M | source = SafeTimeSource::kHybridTimeLease; |
596 | 3.17M | } |
597 | 32.9M | } |
598 | | |
599 | | // This function could be invoked at a follower, so it has a very old ht_lease. In this case it |
600 | | // is safe to read at least at last_replicated_. |
601 | 33.3M | result = std::max(result, last_replicated_); |
602 | | |
603 | 33.3M | return result >= min_allowed; |
604 | 33.3M | }; |
605 | | |
606 | | // In the case of an empty queue, the safe hybrid time to read at is only limited by hybrid time |
607 | | // ht_lease, which is by definition higher than min_allowed, so we would not get blocked. |
608 | 33.3M | if (deadline == CoarseTimePoint::max()) { |
609 | 32.7M | cond_.wait(*lock, predicate); |
610 | 595k | } else if (!cond_.wait_until(*lock, deadline, predicate)) { |
611 | 1 | return HybridTime::kInvalid; |
612 | 1 | } |
613 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(1) |
614 | 18.4E | << "(" << min_allowed << ", " << ht_lease << "), result = " << result; |
615 | | |
616 | 32.9M | auto enforced_min_time = has_lease ? max_safe_time_returned_with_lease_.safe_time |
617 | 419k | : max_safe_time_returned_without_lease_.safe_time; |
618 | 0 | CHECK_GE(result, enforced_min_time) |
619 | 0 | << InvariantViolationLogPrefix() |
620 | 0 | << ": " << EXPR_VALUE_FOR_LOG(has_lease) |
621 | 0 | << ", " << EXPR_VALUE_FOR_LOG(enforced_min_time.ToUint64() - result.ToUint64()) |
622 | 0 | << ", " << EXPR_VALUE_FOR_LOG(ht_lease) |
623 | 0 | << ", " << EXPR_VALUE_FOR_LOG(last_replicated_) |
624 | 0 | << ", " << EXPR_VALUE_FOR_LOG(clock_->Now()) |
625 | 0 | << ", " << EXPR_VALUE_FOR_LOG(ToString(deadline)) |
626 | 0 | << ", " << EXPR_VALUE_FOR_LOG(queue_.size()) |
627 | 0 | << ", " << EXPR_VALUE_FOR_LOG(queue_); |
628 | | |
629 | 33.3M | if (has_lease) { |
630 | 32.9M | max_safe_time_returned_with_lease_ = { result, source }; |
631 | 418k | } else { |
632 | 418k | max_safe_time_returned_without_lease_ = { result, source }; |
633 | 418k | } |
634 | 33.3M | return result; |
635 | 33.3M | } |
636 | | |
637 | 12.3M | HybridTime MvccManager::LastReplicatedHybridTime() const { |
638 | 12.3M | std::lock_guard<std::mutex> lock(mutex_); |
639 | 18.4E | VLOG_WITH_PREFIX(1) << __func__ << "(), result = " << last_replicated_; |
640 | 12.3M | if (op_trace_) { |
641 | 12.3M | op_trace_->Add(LastReplicatedHybridTimeTraceItem { |
642 | 12.3M | .last_replicated = last_replicated_ |
643 | 12.3M | }); |
644 | 12.3M | } |
645 | 12.3M | return last_replicated_; |
646 | 12.3M | } |
647 | | |
648 | | // Using NO_THREAD_SAFETY_ANALYSIS here because we're only reading op_trace_ here and it is set |
649 | | // in the constructor. |
650 | 0 | MvccManager::InvariantViolationLoggingHelper MvccManager::InvariantViolationLogPrefix() const { |
651 | 0 | return { prefix_, op_trace_.get() }; |
652 | 0 | } |
653 | | |
654 | | // Ditto regarding NO_THREAD_SAFETY_ANALYSIS. |
655 | 2 | void MvccManager::TEST_DumpTrace(std::ostream* out) NO_THREAD_SAFETY_ANALYSIS { |
656 | 2 | if (op_trace_) |
657 | 2 | op_trace_->DumpTrace(out); |
658 | 2 | } |
659 | | |
660 | 0 | std::string MvccManager::QueueItem::ToString() const { |
661 | 0 | return YB_STRUCT_TO_STRING(hybrid_time, op_id); |
662 | 0 | } |
663 | | |
664 | 7.51M | bool MvccManager::QueueItem::Eq(const MvccManager::QueueItem& rhs) const { |
665 | 7.51M | const auto& lhs = *this; |
666 | 7.51M | return YB_STRUCT_EQUALS(hybrid_time, op_id); |
667 | 7.51M | } |
668 | | |
669 | | } // namespace tablet |
670 | | } // namespace yb |