/Users/deen/code/yugabyte-db/src/yb/tablet/mvcc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tablet/mvcc.h" |
34 | | |
35 | | #include <boost/circular_buffer.hpp> |
36 | | #include <boost/variant.hpp> |
37 | | |
38 | | #include "yb/gutil/macros.h" |
39 | | |
40 | | #include "yb/util/atomic.h" |
41 | | #include "yb/util/compare_util.h" |
42 | | #include "yb/util/enums.h" |
43 | | #include "yb/util/flag_tags.h" |
44 | | #include "yb/util/format.h" |
45 | | #include "yb/util/logging.h" |
46 | | |
47 | | using namespace std::literals; |
48 | | |
49 | | DEFINE_test_flag(int64, mvcc_op_trace_num_items, 32, |
50 | | "Number of items to keep in an MvccManager operation trace. Set to 0 to disable " |
51 | | "MVCC operation tracing."); |
52 | | |
53 | | DEFINE_test_flag(int32, inject_mvcc_delay_add_leader_pending_ms, 0, |
54 | | "Inject delay after MvccManager::AddLeaderPending read clock."); |
55 | | |
56 | | namespace yb { |
57 | | namespace tablet { |
58 | | |
59 | | namespace { |
60 | | |
61 | | struct SetLeaderOnlyModeTraceItem { |
62 | | bool leader_only; |
63 | | |
64 | 0 | std::string ToString() const { |
65 | 0 | return Format("SetLeaderOnlyMode $0", YB_STRUCT_TO_STRING(leader_only)); |
66 | 0 | } |
67 | | }; |
68 | | |
69 | | struct SetLastReplicatedTraceItem { |
70 | | HybridTime ht; |
71 | | |
72 | 0 | std::string ToString() const { |
73 | 0 | return Format("SetLastReplicated $0", YB_STRUCT_TO_STRING(ht)); |
74 | 0 | } |
75 | | }; |
76 | | |
77 | | struct SetPropagatedSafeTimeOnFollowerTraceItem { |
78 | | HybridTime ht; |
79 | | |
80 | 0 | std::string ToString() const { |
81 | 0 | return Format("SetPropagatedSafeTimeOnFollower $0", YB_STRUCT_TO_STRING(ht)); |
82 | 0 | } |
83 | | }; |
84 | | |
85 | | struct UpdatePropagatedSafeTimeOnLeaderTraceItem { |
86 | | FixedHybridTimeLease ht_lease; |
87 | | HybridTime safe_time; |
88 | | |
89 | 0 | std::string ToString() const { |
90 | 0 | return Format("UpdatePropagatedSafeTimeOnLeader $0", |
91 | 0 | YB_STRUCT_TO_STRING(ht_lease, safe_time)); |
92 | 0 | } |
93 | | }; |
94 | | |
95 | | struct AddLeaderPendingTraceItem { |
96 | | HybridTime ht; |
97 | | OpId op_id; |
98 | | |
99 | 1 | std::string ToString() const { |
100 | 1 | return Format("AddLeaderPending $0", YB_STRUCT_TO_STRING(ht, op_id)); |
101 | 1 | } |
102 | | }; |
103 | | |
104 | | struct AddFollowerPendingTraceItem { |
105 | | HybridTime ht; |
106 | | OpId op_id; |
107 | | |
108 | 1 | std::string ToString() const { |
109 | 1 | return Format("AddFollowerPending $0", YB_STRUCT_TO_STRING(ht, op_id)); |
110 | 1 | } |
111 | | }; |
112 | | |
113 | | struct ReplicatedTraceItem { |
114 | | HybridTime ht; |
115 | | OpId op_id; |
116 | | |
117 | 2 | std::string ToString() const { |
118 | 2 | return Format("Replicated $0", YB_STRUCT_TO_STRING(ht, op_id)); |
119 | 2 | } |
120 | | }; |
121 | | |
122 | | struct AbortedTraceItem { |
123 | | HybridTime ht; |
124 | | OpId op_id; |
125 | | |
126 | 0 | std::string ToString() const { |
127 | 0 | return Format("Aborted $0", YB_STRUCT_TO_STRING(ht, op_id)); |
128 | 0 | } |
129 | | }; |
130 | | |
131 | | struct SafeTimeTraceItem { |
132 | | HybridTime min_allowed; |
133 | | CoarseTimePoint deadline; |
134 | | FixedHybridTimeLease ht_lease; |
135 | | HybridTime safe_time; |
136 | | |
137 | 5 | std::string ToString() const { |
138 | 5 | return Format("SafeTime $0", YB_STRUCT_TO_STRING(min_allowed, deadline, ht_lease, safe_time)); |
139 | 5 | } |
140 | | }; |
141 | | |
142 | | struct SafeTimeForFollowerTraceItem { |
143 | | HybridTime min_allowed; |
144 | | CoarseTimePoint deadline; |
145 | | SafeTimeWithSource safe_time_with_source; |
146 | | |
147 | 0 | std::string ToString() const { |
148 | 0 | return Format("SafeTimeForFollower $0", |
149 | 0 | YB_STRUCT_TO_STRING(min_allowed, deadline, safe_time_with_source)); |
150 | 0 | } |
151 | | }; |
152 | | |
153 | | struct LastReplicatedHybridTimeTraceItem { |
154 | | HybridTime last_replicated; |
155 | | |
156 | 0 | std::string ToString() const { |
157 | 0 | return Format("LastReplicatedHybridTime $0", YB_STRUCT_TO_STRING(last_replicated)); |
158 | 0 | } |
159 | | }; |
160 | | |
161 | | typedef boost::variant< |
162 | | SetLeaderOnlyModeTraceItem, |
163 | | SetLastReplicatedTraceItem, |
164 | | SetPropagatedSafeTimeOnFollowerTraceItem, |
165 | | UpdatePropagatedSafeTimeOnLeaderTraceItem, |
166 | | AddLeaderPendingTraceItem, |
167 | | AddFollowerPendingTraceItem, |
168 | | ReplicatedTraceItem, |
169 | | AbortedTraceItem, |
170 | | SafeTimeTraceItem, |
171 | | SafeTimeForFollowerTraceItem, |
172 | | LastReplicatedHybridTimeTraceItem |
173 | | > TraceItemVariant; |
174 | | |
175 | | class ItemPrintingVisitor : public boost::static_visitor<>{ |
176 | | public: |
177 | | explicit ItemPrintingVisitor(std::ostream* out, size_t index) |
178 | | : out_(*out), |
179 | 9 | index_(index) { |
180 | 9 | } |
181 | | |
182 | 9 | template<typename T> void operator()(const T& t) const { |
183 | 9 | out_ << index_ << ". " << t.ToString() << std::endl; |
184 | 9 | } Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::SetLeaderOnlyModeTraceItem>(yb::tablet::(anonymous namespace)::SetLeaderOnlyModeTraceItem const&) const Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::SetLastReplicatedTraceItem>(yb::tablet::(anonymous namespace)::SetLastReplicatedTraceItem const&) const Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::SetPropagatedSafeTimeOnFollowerTraceItem>(yb::tablet::(anonymous namespace)::SetPropagatedSafeTimeOnFollowerTraceItem const&) const Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::UpdatePropagatedSafeTimeOnLeaderTraceItem>(yb::tablet::(anonymous namespace)::UpdatePropagatedSafeTimeOnLeaderTraceItem const&) const mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::AddLeaderPendingTraceItem>(yb::tablet::(anonymous namespace)::AddLeaderPendingTraceItem const&) const Line | Count | Source | 182 | 1 | template<typename T> void operator()(const T& t) const { | 183 | 1 | out_ << index_ << ". " << t.ToString() << std::endl; | 184 | 1 | } |
mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::AddFollowerPendingTraceItem>(yb::tablet::(anonymous namespace)::AddFollowerPendingTraceItem const&) const Line | Count | Source | 182 | 1 | template<typename T> void operator()(const T& t) const { | 183 | 1 | out_ << index_ << ". " << t.ToString() << std::endl; | 184 | 1 | } |
mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::ReplicatedTraceItem>(yb::tablet::(anonymous namespace)::ReplicatedTraceItem const&) const Line | Count | Source | 182 | 2 | template<typename T> void operator()(const T& t) const { | 183 | 2 | out_ << index_ << ". " << t.ToString() << std::endl; | 184 | 2 | } |
Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::AbortedTraceItem>(yb::tablet::(anonymous namespace)::AbortedTraceItem const&) const mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::SafeTimeTraceItem>(yb::tablet::(anonymous namespace)::SafeTimeTraceItem const&) const Line | Count | Source | 182 | 5 | template<typename T> void operator()(const T& t) const { | 183 | 5 | out_ << index_ << ". " << t.ToString() << std::endl; | 184 | 5 | } |
Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::SafeTimeForFollowerTraceItem>(yb::tablet::(anonymous namespace)::SafeTimeForFollowerTraceItem const&) const Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::LastReplicatedHybridTimeTraceItem>(yb::tablet::(anonymous namespace)::LastReplicatedHybridTimeTraceItem const&) const |
185 | | |
186 | | private: |
187 | | std::ostream& out_; |
188 | | size_t index_; |
189 | | }; |
190 | | |
191 | | } // namespace |
192 | | |
193 | 5 | std::string FixedHybridTimeLease::ToString() const { |
194 | 5 | return YB_STRUCT_TO_STRING(time, lease); |
195 | 5 | } |
196 | | |
197 | | class MvccManager::MvccOpTrace { |
198 | | public: |
199 | 150k | explicit MvccOpTrace(size_t capacity) : items_(capacity) {} |
200 | 75.9k | ~MvccOpTrace() = default; |
201 | | |
202 | 155M | void Add(TraceItemVariant v) { |
203 | 155M | items_.push_back(std::move(v)); |
204 | 155M | } |
205 | | |
206 | 2 | void DumpTrace(ostream* out) const { |
207 | 2 | if (items_.empty()) { |
208 | 1 | *out << "No MVCC operations" << std::endl; |
209 | 1 | return; |
210 | 1 | } |
211 | 1 | *out << "Recent " << items_.size() << " MVCC operations:" << std::endl; |
212 | 1 | size_t i = 1; |
213 | 9 | for (const auto& item : items_) { |
214 | 9 | boost::apply_visitor(ItemPrintingVisitor(out, i), item); |
215 | 9 | ++i; |
216 | 9 | } |
217 | 1 | } |
218 | | |
219 | | private: |
220 | | boost::circular_buffer_space_optimized<TraceItemVariant, std::allocator<TraceItemVariant>> items_; |
221 | | }; |
222 | | |
223 | | struct MvccManager::InvariantViolationLoggingHelper { |
224 | | const std::string& log_prefix; |
225 | | MvccOpTrace* mvcc_op_trace; |
226 | | }; |
227 | | |
228 | | std::ostream& operator<< ( |
229 | | std::ostream& out, |
230 | 0 | const MvccManager::InvariantViolationLoggingHelper& log_helper) { |
231 | 0 | out << log_helper.log_prefix; |
232 | 0 | log_helper.mvcc_op_trace->DumpTrace(&out); |
233 | 0 | return out; |
234 | 0 | } |
235 | | |
236 | | // ------------------------------------------------------------------------------------------------ |
237 | | // SafeTimeWithSource |
238 | | // ------------------------------------------------------------------------------------------------ |
239 | | |
240 | 0 | std::string SafeTimeWithSource::ToString() const { |
241 | 0 | return Format("{ safe_time: $0 source: $1 }", safe_time, source); |
242 | 0 | } |
243 | | |
244 | | // ------------------------------------------------------------------------------------------------ |
245 | | // MvccManager |
246 | | // ------------------------------------------------------------------------------------------------ |
247 | | |
248 | | MvccManager::MvccManager(std::string prefix, server::ClockPtr clock) |
249 | | : prefix_(std::move(prefix)), |
250 | 150k | clock_(std::move(clock)) { |
251 | 150k | auto op_trace_num_items = GetAtomicFlag(&FLAGS_TEST_mvcc_op_trace_num_items); |
252 | 150k | if (op_trace_num_items > 0) { |
253 | 150k | op_trace_ = std::make_unique<MvccManager::MvccOpTrace>(op_trace_num_items); |
254 | 150k | } |
255 | 150k | } |
256 | | |
257 | 76.0k | MvccManager::~MvccManager() { |
258 | 76.0k | } |
259 | | |
260 | 13.4M | void MvccManager::Replicated(HybridTime ht, const OpId& op_id) { |
261 | 13.4M | VLOG_WITH_PREFIX991 (1) << __func__ << "(" << ht << ", " << op_id << ")"991 ; |
262 | 13.4M | CHECK(!op_id.empty()); |
263 | | |
264 | 13.4M | { |
265 | 13.4M | std::lock_guard<std::mutex> lock(mutex_); |
266 | 13.4M | if (op_trace_) { |
267 | 13.4M | op_trace_->Add(ReplicatedTraceItem { .ht = ht, .op_id = op_id }); |
268 | 13.4M | } |
269 | 18.4E | CHECK(!queue_.empty()) << InvariantViolationLogPrefix(); |
270 | 13.4M | CHECK_EQ(queue_.front(), |
271 | 0 | (QueueItem{ .hybrid_time = ht, .op_id = op_id })) << InvariantViolationLogPrefix(); |
272 | 13.4M | queue_.pop_front(); |
273 | 13.4M | last_replicated_ = ht; |
274 | 13.4M | } |
275 | 13.4M | cond_.notify_all(); |
276 | 13.4M | } |
277 | | |
278 | 20.4k | void MvccManager::Aborted(HybridTime ht, const OpId& op_id) { |
279 | 20.4k | VLOG_WITH_PREFIX0 (1) << __func__ << "(" << ht << ", " << op_id << ")"0 ; |
280 | | |
281 | 20.4k | { |
282 | 20.4k | std::lock_guard<std::mutex> lock(mutex_); |
283 | 20.4k | if (op_trace_) { |
284 | 20.4k | op_trace_->Add(AbortedTraceItem { .ht = ht, .op_id = op_id }); |
285 | 20.4k | } |
286 | 20.4k | CHECK(!queue_.empty()) << InvariantViolationLogPrefix()0 ; |
287 | 20.4k | CHECK_EQ(queue_.back(), |
288 | 0 | (QueueItem{ .hybrid_time = ht, .op_id = op_id })) |
289 | 0 | << InvariantViolationLogPrefix() << "It is allowed to abort only last operation"; |
290 | 20.4k | queue_.pop_back(); |
291 | 20.4k | } |
292 | 20.4k | cond_.notify_all(); |
293 | 20.4k | } |
294 | | |
295 | 2.02M | bool BadNextOpId(const OpId& prev, const OpId& next) { |
296 | 2.02M | if (prev.index >= next.index) { |
297 | 0 | return true; |
298 | 0 | } |
299 | 2.02M | if (prev.term > next.term) { |
300 | 0 | return true; |
301 | 0 | } |
302 | 2.02M | return false; |
303 | 2.02M | } |
304 | | |
305 | 4.64M | HybridTime MvccManager::AddLeaderPending(const OpId& op_id) { |
306 | 4.64M | std::lock_guard<std::mutex> lock(mutex_); |
307 | 4.64M | auto ht = clock_->Now(); |
308 | 4.64M | AtomicFlagSleepMs(&FLAGS_TEST_inject_mvcc_delay_add_leader_pending_ms); |
309 | 18.4E | VLOG_WITH_PREFIX(1) << __func__ << "(" << op_id << "), time: " << ht; |
310 | 4.64M | AddPending(ht, op_id, /* is_follower_side= */ false); |
311 | | |
312 | 4.64M | if (op_trace_) { |
313 | 4.64M | op_trace_->Add(AddLeaderPendingTraceItem { |
314 | 4.64M | .ht = ht, |
315 | 4.64M | .op_id = op_id, |
316 | 4.64M | }); |
317 | 4.64M | } |
318 | | |
319 | 4.64M | return ht; |
320 | 4.64M | } |
321 | | |
322 | 8.80M | void MvccManager::AddFollowerPending(HybridTime ht, const OpId& op_id) { |
323 | 8.80M | std::lock_guard<std::mutex> lock(mutex_); |
324 | 8.80M | VLOG_WITH_PREFIX2.62k (1) << __func__ << "(" << ht << ", " << op_id << ")"2.62k ; |
325 | | |
326 | 8.80M | AddPending(ht, op_id, /* is_follower_side= */ true); |
327 | | |
328 | 8.80M | if (op_trace_) { |
329 | 8.79M | op_trace_->Add(AddFollowerPendingTraceItem { |
330 | 8.79M | .ht = ht, |
331 | 8.79M | .op_id = op_id, |
332 | 8.79M | }); |
333 | 8.79M | } |
334 | 8.80M | } |
335 | | |
336 | 13.4M | void MvccManager::AddPending(HybridTime ht, const OpId& op_id, bool is_follower_side) { |
337 | 13.4M | CHECK(!op_id.empty()); |
338 | | |
339 | 13.4M | HybridTime last_ht_in_queue = queue_.empty() ? HybridTime::kMin11.4M : queue_.back().hybrid_time2.03M ; |
340 | | |
341 | 13.4M | HybridTime sanity_check_lower_bound = |
342 | 13.4M | std::max({ |
343 | 13.4M | max_safe_time_returned_with_lease_.safe_time, |
344 | 13.4M | max_safe_time_returned_without_lease_.safe_time, |
345 | 13.4M | max_safe_time_returned_for_follower_.safe_time, |
346 | 13.4M | propagated_safe_time_, |
347 | 13.4M | last_replicated_, |
348 | 13.4M | last_ht_in_queue}); |
349 | | |
350 | 13.4M | if (ht <= sanity_check_lower_bound) { |
351 | 0 | auto get_details_msg = [&](bool drain_aborted) { |
352 | 0 | std::ostringstream ss; |
353 | 0 | #define LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(full, safe_time) \ |
354 | 0 | "\n " << EXPR_VALUE_FOR_LOG(full) \ |
355 | 0 | << "\n " << (ht <= safe_time ? "!!! " : "") << EXPR_VALUE_FOR_LOG(ht <= safe_time) \ |
356 | 0 | << "\n " << EXPR_VALUE_FOR_LOG( \ |
357 | 0 | static_cast<int64_t>(ht.ToUint64() - safe_time.ToUint64())) \ |
358 | 0 | << "\n " << EXPR_VALUE_FOR_LOG(ht.PhysicalDiff(safe_time)) \ |
359 | 0 | << "\n " |
360 | |
|
361 | 0 | #define LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(t) LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(t, t.safe_time) |
362 | 0 | #define LOG_INFO_FOR_HT_LOWER_BOUND(t) LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(t, t) |
363 | |
|
364 | 0 | ss << "New operation's hybrid time too low: " << ht << ", op id: " << op_id |
365 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_with_lease_) |
366 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_without_lease_) |
367 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_for_follower_) |
368 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND(last_replicated_) |
369 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND(last_ht_in_queue) |
370 | 0 | << LOG_INFO_FOR_HT_LOWER_BOUND(propagated_safe_time_) |
371 | 0 | << "\n " << EXPR_VALUE_FOR_LOG(queue_.size()) |
372 | 0 | << "\n " << EXPR_VALUE_FOR_LOG(queue_); |
373 | 0 | return ss.str(); |
374 | 0 | #undef LOG_INFO_FOR_HT_LOWER_BOUND |
375 | 0 | }; |
376 | |
|
377 | | #ifdef NDEBUG |
378 | | // In release mode, let's try to avoid crashing if possible if we ever hit this situation. |
379 | | // On the leader side, we can assign a timestamp that is high enough. |
380 | | if (!is_follower_side && |
381 | | sanity_check_lower_bound && |
382 | | sanity_check_lower_bound != HybridTime::kMax) { |
383 | | HybridTime incremented_hybrid_time = sanity_check_lower_bound.Incremented(); |
384 | | YB_LOG_EVERY_N_SECS(ERROR, 5) << LogPrefix() |
385 | | << "Assigning an artificially incremented hybrid time: " << incremented_hybrid_time |
386 | | << ". This needs to be investigated. " << get_details_msg(/* drain_aborted */ false); |
387 | | ht = incremented_hybrid_time; |
388 | | } |
389 | | #endif |
390 | |
|
391 | 0 | if (ht <= sanity_check_lower_bound) { |
392 | 0 | LOG_WITH_PREFIX(FATAL) << InvariantViolationLogPrefix() |
393 | 0 | << get_details_msg(/* drain_aborted */ true); |
394 | 0 | } |
395 | 0 | } |
396 | | |
397 | 13.4M | LOG_IF_WITH_PREFIX3.38k (DFATAL, |
398 | 3.38k | !queue_.empty() && BadNextOpId(queue_.back().op_id, op_id)) |
399 | 3.38k | << "Op sequence failure: " << AsString(queue_.back().op_id) << " followed by " |
400 | 3.38k | << AsString(op_id) << " " << InvariantViolationLogPrefix(); |
401 | | |
402 | 13.4M | queue_.push_back(QueueItem { |
403 | 13.4M | .hybrid_time = ht, |
404 | 13.4M | .op_id = op_id, |
405 | 13.4M | }); |
406 | 13.4M | } |
407 | | |
408 | 2.79k | void MvccManager::SetLastReplicated(HybridTime ht) { |
409 | 2.79k | VLOG_WITH_PREFIX0 (1) << __func__ << "(" << ht << ")"0 ; |
410 | | |
411 | 2.79k | { |
412 | 2.79k | std::lock_guard<std::mutex> lock(mutex_); |
413 | 2.79k | if (op_trace_) { |
414 | 2.79k | op_trace_->Add(SetLastReplicatedTraceItem { .ht = ht }); |
415 | 2.79k | } |
416 | 2.79k | last_replicated_ = ht; |
417 | 2.79k | } |
418 | 2.79k | cond_.notify_all(); |
419 | 2.79k | } |
420 | | |
421 | 25.2M | void MvccManager::SetPropagatedSafeTimeOnFollower(HybridTime ht) { |
422 | 25.2M | VLOG_WITH_PREFIX7.42k (1) << __func__ << "(" << ht << ")"7.42k ; |
423 | | |
424 | 25.2M | { |
425 | 25.2M | std::lock_guard<std::mutex> lock(mutex_); |
426 | 25.2M | if (op_trace_) { |
427 | 25.2M | op_trace_->Add(SetPropagatedSafeTimeOnFollowerTraceItem { .ht = ht }); |
428 | 25.2M | } |
429 | 25.2M | if (ht >= propagated_safe_time_) { |
430 | 25.2M | propagated_safe_time_ = ht; |
431 | 25.2M | } else { |
432 | 30.3k | LOG_WITH_PREFIX(WARNING) |
433 | 30.3k | << "Received propagated safe time " << ht << " less than the old value: " |
434 | 30.3k | << propagated_safe_time_ << ". This could happen on followers when a new leader " |
435 | 30.3k | << "is elected."; |
436 | 30.3k | } |
437 | 25.2M | } |
438 | 25.2M | cond_.notify_all(); |
439 | 25.2M | } |
440 | | |
441 | | // NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock. |
442 | | void MvccManager::UpdatePropagatedSafeTimeOnLeader(const FixedHybridTimeLease& ht_lease) |
443 | 34.6M | NO_THREAD_SAFETY_ANALYSIS { |
444 | 34.6M | VLOG_WITH_PREFIX7.66k (1) << __func__ << "(" << ht_lease << ")"7.66k ; |
445 | | |
446 | 34.6M | { |
447 | 34.6M | std::unique_lock<std::mutex> lock(mutex_); |
448 | 34.6M | auto safe_time = DoGetSafeTime(HybridTime::kMin, // min_allowed |
449 | 34.6M | CoarseTimePoint::max(), // deadline |
450 | 34.6M | ht_lease, |
451 | 34.6M | &lock); |
452 | 34.6M | #ifndef NDEBUG |
453 | | // This should only be called from RaftConsensus::UpdateMajorityReplicated, and ht_lease passed |
454 | | // in here should keep increasing, so we should not see propagated_safe_time_ going backwards. |
455 | 34.6M | CHECK_GE(safe_time, propagated_safe_time_) |
456 | 0 | << InvariantViolationLogPrefix() |
457 | 0 | << "ht_lease: " << ht_lease; |
458 | 34.6M | propagated_safe_time_ = safe_time; |
459 | | #else |
460 | | // Do not crash in production. |
461 | | if (safe_time < propagated_safe_time_) { |
462 | | YB_LOG_EVERY_N_SECS(ERROR, 5) << LogPrefix() |
463 | | << "Previously saw " << EXPR_VALUE_FOR_LOG(propagated_safe_time_) |
464 | | << ", but now safe time is " << safe_time; |
465 | | } else { |
466 | | propagated_safe_time_ = safe_time; |
467 | | } |
468 | | #endif |
469 | | |
470 | 34.6M | if (op_trace_) { |
471 | 34.6M | op_trace_->Add(UpdatePropagatedSafeTimeOnLeaderTraceItem { |
472 | 34.6M | .ht_lease = ht_lease, |
473 | 34.6M | .safe_time = safe_time |
474 | 34.6M | }); |
475 | 34.6M | } |
476 | 34.6M | } |
477 | 34.6M | cond_.notify_all(); |
478 | 34.6M | } |
479 | | |
480 | 169k | void MvccManager::SetLeaderOnlyMode(bool leader_only) { |
481 | 169k | std::lock_guard<std::mutex> lock(mutex_); |
482 | 169k | if (op_trace_) { |
483 | 169k | op_trace_->Add(SetLeaderOnlyModeTraceItem { |
484 | 169k | .leader_only = leader_only |
485 | 169k | }); |
486 | 169k | } |
487 | 169k | leader_only_mode_ = leader_only; |
488 | 169k | } |
489 | | |
490 | | // NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock. |
491 | | HybridTime MvccManager::SafeTimeForFollower( |
492 | 904k | HybridTime min_allowed, CoarseTimePoint deadline) const NO_THREAD_SAFETY_ANALYSIS { |
493 | 904k | std::unique_lock<std::mutex> lock(mutex_); |
494 | | |
495 | 904k | if (leader_only_mode_) { |
496 | | // If there are no followers (RF == 1), use SafeTime() because propagated_safe_time_ might not |
497 | | // have a valid value. |
498 | 81.1k | return DoGetSafeTime(min_allowed, deadline, FixedHybridTimeLease(), &lock); |
499 | 81.1k | } |
500 | | |
501 | 823k | SafeTimeWithSource result; |
502 | 827k | auto predicate = [this, &result, min_allowed] { |
503 | | // last_replicated_ is updated earlier than propagated_safe_time_, so because of concurrency it |
504 | | // could be greater than propagated_safe_time_. |
505 | 827k | if (propagated_safe_time_ > last_replicated_) { |
506 | 636k | if (queue_.empty() || propagated_safe_time_ < queue_.front().hybrid_time215k ) { |
507 | 617k | result.safe_time = propagated_safe_time_; |
508 | 617k | result.source = SafeTimeSource::kPropagated; |
509 | 617k | } else { |
510 | 18.8k | result.safe_time = queue_.front().hybrid_time.Decremented(); |
511 | 18.8k | result.source = SafeTimeSource::kNextInQueue; |
512 | 18.8k | } |
513 | 636k | } else { |
514 | 190k | result.safe_time = last_replicated_; |
515 | 190k | result.source = SafeTimeSource::kLastReplicated; |
516 | 190k | } |
517 | 827k | return result.safe_time >= min_allowed; |
518 | 827k | }; |
519 | 823k | if (deadline == CoarseTimePoint::max()) { |
520 | 322k | cond_.wait(lock, predicate); |
521 | 500k | } else if (!cond_.wait_until(lock, deadline, predicate)) { |
522 | 0 | return HybridTime::kInvalid; |
523 | 0 | } |
524 | 18.4E | VLOG_WITH_PREFIX(1) << "SafeTimeForFollower(" << min_allowed |
525 | 18.4E | << "), result = " << result.ToString(); |
526 | 823k | CHECK_GE(result.safe_time, max_safe_time_returned_for_follower_.safe_time) |
527 | 0 | << InvariantViolationLogPrefix() |
528 | 0 | << "result: " << result.ToString() |
529 | 0 | << ", max_safe_time_returned_for_follower_: " |
530 | 0 | << max_safe_time_returned_for_follower_.ToString(); |
531 | 823k | max_safe_time_returned_for_follower_ = result; |
532 | 823k | if (op_trace_823k ) { |
533 | 823k | op_trace_->Add(SafeTimeForFollowerTraceItem { |
534 | 823k | .min_allowed = min_allowed, |
535 | 823k | .deadline = deadline, |
536 | 823k | .safe_time_with_source = result |
537 | 823k | }); |
538 | 823k | } |
539 | 823k | return result.safe_time; |
540 | 823k | } |
541 | | |
542 | | // NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock. |
543 | | HybridTime MvccManager::SafeTime( |
544 | | HybridTime min_allowed, |
545 | | CoarseTimePoint deadline, |
546 | 38.7M | const FixedHybridTimeLease& ht_lease) const NO_THREAD_SAFETY_ANALYSIS { |
547 | 38.7M | std::unique_lock<std::mutex> lock(mutex_); |
548 | 38.7M | auto safe_time = DoGetSafeTime(min_allowed, deadline, ht_lease, &lock); |
549 | 38.7M | if (op_trace_38.7M ) { |
550 | 38.7M | op_trace_->Add(SafeTimeTraceItem { |
551 | 38.7M | .min_allowed = min_allowed, |
552 | 38.7M | .deadline = deadline, |
553 | 38.7M | .ht_lease = ht_lease, |
554 | 38.7M | .safe_time = safe_time |
555 | 38.7M | }); |
556 | 38.7M | } |
557 | 38.7M | return safe_time; |
558 | 38.7M | } |
559 | | |
560 | | HybridTime MvccManager::DoGetSafeTime(const HybridTime min_allowed, |
561 | | const CoarseTimePoint deadline, |
562 | | const FixedHybridTimeLease& ht_lease, |
563 | 73.5M | std::unique_lock<std::mutex>* lock) const { |
564 | 73.5M | DCHECK_ONLY_NOTNULL(lock); |
565 | 73.5M | CHECK(ht_lease.lease.is_valid()) << InvariantViolationLogPrefix()36.9k ; |
566 | 73.5M | CHECK_LE(min_allowed, ht_lease.lease) << InvariantViolationLogPrefix()0 ; |
567 | | |
568 | 73.5M | const bool has_lease = !ht_lease.empty(); |
569 | | // Because different calls that have current hybrid time leader lease as an argument can come to |
570 | | // us out of order, we might see an older value of hybrid time leader lease expiration after a |
571 | | // newer value. We mitigate this by always using the highest value we've seen. |
572 | 73.5M | if (has_lease) { |
573 | 72.3M | LOG_IF_WITH_PREFIX3.30k (DFATAL, !ht_lease.time.is_valid()) << "Bad ht lease: " << ht_lease3.30k ; |
574 | 72.3M | } |
575 | | |
576 | 73.5M | HybridTime result; |
577 | 73.5M | SafeTimeSource source = SafeTimeSource::kUnknown; |
578 | 73.5M | auto predicate = [this, &result, &source, min_allowed, ht_lease, has_lease] { |
579 | 73.5M | if (queue_.empty()) { |
580 | 55.8M | result = ht_lease.time.is_valid() |
581 | 55.8M | ? std::max(max_safe_time_returned_with_lease_.safe_time, ht_lease.time)54.7M |
582 | 55.8M | : clock_->Now()1.17M ; |
583 | 55.8M | source = SafeTimeSource::kNow; |
584 | 55.8M | VLOG_WITH_PREFIX12.6k (2) << "DoGetSafeTime, Now: " << result12.6k ; |
585 | 55.8M | } else { |
586 | 17.7M | result = queue_.front().hybrid_time.Decremented(); |
587 | 17.7M | source = SafeTimeSource::kNextInQueue; |
588 | 17.7M | VLOG_WITH_PREFIX48.3k (2) << "DoGetSafeTime, Queue front (decremented): " << result48.3k ; |
589 | 17.7M | } |
590 | | |
591 | 73.5M | if (has_lease) { |
592 | 72.3M | auto used_lease = std::max({ht_lease.lease, max_safe_time_returned_with_lease_.safe_time}); |
593 | 72.3M | if (result > used_lease) { |
594 | 3.38M | result = used_lease; |
595 | 3.38M | source = SafeTimeSource::kHybridTimeLease; |
596 | 3.38M | } |
597 | 72.3M | } |
598 | | |
599 | | // This function could be invoked at a follower, so it has a very old ht_lease. In this case it |
600 | | // is safe to read at least at last_replicated_. |
601 | 73.5M | result = std::max(result, last_replicated_); |
602 | | |
603 | 73.5M | return result >= min_allowed; |
604 | 73.5M | }; |
605 | | |
606 | | // In the case of an empty queue, the safe hybrid time to read at is only limited by hybrid time |
607 | | // ht_lease, which is by definition higher than min_allowed, so we would not get blocked. |
608 | 73.5M | if (deadline == CoarseTimePoint::max()) { |
609 | 71.9M | cond_.wait(*lock, predicate); |
610 | 71.9M | } else if (1.58M !cond_.wait_until(*lock, deadline, predicate)1.58M ) { |
611 | 1 | return HybridTime::kInvalid; |
612 | 1 | } |
613 | 73.5M | VLOG_WITH_PREFIX_AND_FUNC13.2k (1) |
614 | 13.2k | << "(" << min_allowed << ", " << ht_lease << "), result = " << result; |
615 | | |
616 | 73.5M | auto enforced_min_time = has_lease ? max_safe_time_returned_with_lease_.safe_time72.3M |
617 | 73.5M | : max_safe_time_returned_without_lease_.safe_time1.21M ; |
618 | 73.5M | CHECK_GE(result, enforced_min_time) |
619 | 0 | << InvariantViolationLogPrefix() |
620 | 0 | << ": " << EXPR_VALUE_FOR_LOG(has_lease) |
621 | 0 | << ", " << EXPR_VALUE_FOR_LOG(enforced_min_time.ToUint64() - result.ToUint64()) |
622 | 0 | << ", " << EXPR_VALUE_FOR_LOG(ht_lease) |
623 | 0 | << ", " << EXPR_VALUE_FOR_LOG(last_replicated_) |
624 | 0 | << ", " << EXPR_VALUE_FOR_LOG(clock_->Now()) |
625 | 0 | << ", " << EXPR_VALUE_FOR_LOG(ToString(deadline)) |
626 | 0 | << ", " << EXPR_VALUE_FOR_LOG(queue_.size()) |
627 | 0 | << ", " << EXPR_VALUE_FOR_LOG(queue_); |
628 | | |
629 | 73.5M | if (has_lease) { |
630 | 72.3M | max_safe_time_returned_with_lease_ = { result, source }; |
631 | 72.3M | } else { |
632 | 1.22M | max_safe_time_returned_without_lease_ = { result, source }; |
633 | 1.22M | } |
634 | 73.5M | return result; |
635 | 73.5M | } |
636 | | |
637 | 29.2M | HybridTime MvccManager::LastReplicatedHybridTime() const { |
638 | 29.2M | std::lock_guard<std::mutex> lock(mutex_); |
639 | 18.4E | VLOG_WITH_PREFIX(1) << __func__ << "(), result = " << last_replicated_; |
640 | 29.2M | if (op_trace_29.2M ) { |
641 | 29.2M | op_trace_->Add(LastReplicatedHybridTimeTraceItem { |
642 | 29.2M | .last_replicated = last_replicated_ |
643 | 29.2M | }); |
644 | 29.2M | } |
645 | 29.2M | return last_replicated_; |
646 | 29.2M | } |
647 | | |
648 | | // Using NO_THREAD_SAFETY_ANALYSIS here because we're only reading op_trace_ here and it is set |
649 | | // in the constructor. |
650 | 0 | MvccManager::InvariantViolationLoggingHelper MvccManager::InvariantViolationLogPrefix() const { |
651 | 0 | return { prefix_, op_trace_.get() }; |
652 | 0 | } |
653 | | |
654 | | // Ditto regarding NO_THREAD_SAFETY_ANALYSIS. |
655 | 2 | void MvccManager::TEST_DumpTrace(std::ostream* out) NO_THREAD_SAFETY_ANALYSIS { |
656 | 2 | if (op_trace_) |
657 | 2 | op_trace_->DumpTrace(out); |
658 | 2 | } |
659 | | |
660 | 0 | std::string MvccManager::QueueItem::ToString() const { |
661 | 0 | return YB_STRUCT_TO_STRING(hybrid_time, op_id); |
662 | 0 | } |
663 | | |
664 | 13.4M | bool MvccManager::QueueItem::Eq(const MvccManager::QueueItem& rhs) const { |
665 | 13.4M | const auto& lhs = *this; |
666 | 13.4M | return YB_STRUCT_EQUALS(hybrid_time, op_id); |
667 | 13.4M | } |
668 | | |
669 | | } // namespace tablet |
670 | | } // namespace yb |