/Users/deen/code/yugabyte-db/src/yb/tablet/mvcc-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <sstream> |
34 | | |
35 | | #include <glog/logging.h> |
36 | | |
37 | | #include "yb/gutil/casts.h" |
38 | | |
39 | | #include "yb/server/logical_clock.h" |
40 | | |
41 | | #include "yb/tablet/mvcc.h" |
42 | | |
43 | | #include "yb/util/atomic.h" |
44 | | #include "yb/util/enums.h" |
45 | | #include "yb/util/random_util.h" |
46 | | #include "yb/util/scope_exit.h" |
47 | | #include "yb/util/status.h" |
48 | | #include "yb/util/test_util.h" |
49 | | |
50 | | using namespace std::literals; |
51 | | using std::vector; |
52 | | |
53 | | using yb::server::LogicalClock; |
54 | | |
55 | | namespace yb { |
56 | | namespace tablet { |
57 | | |
// Test fixture shared by all MvccManager tests in this file.
// It owns a logical clock created at HybridTime::kInitial and an MvccManager that is
// constructed with an empty string and a raw pointer to that clock.
58 | | class MvccTest : public YBTest { |
59 | | public: |
60 | | MvccTest() |
61 | | : clock_(server::LogicalClock::CreateStartingAt(HybridTime::kInitial)), |
62 | 6 | manager_(std::string(), clock_.get()) { |
63 | 6 | } |
64 | | 
65 | | protected: |
// Shared body of the RandomWithoutHTLease / RandomWithHTLease tests; defined out of line.
66 | | void RunRandomizedTest(bool use_ht_lease); |
67 | | 
// clock_ is declared before manager_, so it is initialized before manager_ receives
// clock_.get() in the constructor's initializer list.
68 | | server::ClockPtr clock_; |
69 | | MvccManager manager_; |
70 | | }; |
71 | | |
72 | | namespace { |
73 | | |
// Returns a HybridTime built from the raw 64-bit value of `input` plus `delta`.
// The status returned by FromUint64 is checked with EXPECT_OK.
// NOTE(review): the name suggests `delta` lands in the logical component of the hybrid time;
// that depends on HybridTime's bit layout, which is not visible here - confirm.
74 | 5 | HybridTime AddLogical(HybridTime input, uint64_t delta) { |
75 | 5 | HybridTime result; |
76 | 5 | EXPECT_OK(result.FromUint64(input.ToUint64() + delta)); |
77 | 5 | return result; |
78 | 5 | } |
79 | | |
80 | | } |
81 | | |
// Registers kTotalEntries leader-side pending operations (OpId(1, 0) .. OpId(1, 9)), then
// marks them replicated in the same order, checking after each one that
// LastReplicatedHybridTime() equals the hybrid time that was handed out for that operation.
// NOTE(review): `i` is an int while kTotalEntries is a size_t, so `i != kTotalEntries` in both
// loops is a mixed signed/unsigned comparison.
82 | 1 | TEST_F(MvccTest, Basic) { |
83 | 1 | constexpr size_t kTotalEntries = 10; |
84 | 1 | vector<HybridTime> hts(kTotalEntries); |
85 | 11 | for (int i = 0; i != kTotalEntries; ++i) { |
86 | 10 | hts[i] = manager_.AddLeaderPending(OpId(1, i)); |
87 | 10 | } |
88 | 11 | for (int i = 0; i != kTotalEntries; ++i) { |
89 | 10 | manager_.Replicated(hts[i], OpId(1, i)); |
90 | 10 | ASSERT_EQ(hts[i], manager_.LastReplicatedHybridTime()); |
91 | 10 | } |
92 | 1 | } |
93 | | |
// Checks the value returned by MvccManager::SafeTime while operations are added and
// replicated, and checks that TEST_DumpTrace lists the calls made along the way.
94 | 1 | TEST_F(MvccTest, SafeHybridTimeToReadAt) { |
95 | 1 | std::ostringstream mvcc_op_trace_stream; |
// Before any MVCC call is made the dump is expected to say "No MVCC operations".
96 | 1 | manager_.TEST_DumpTrace(&mvcc_op_trace_stream); |
97 | 1 | ASSERT_STR_CONTAINS(mvcc_op_trace_stream.str(), "No MVCC operations"); |
// NOTE(review): std::ostringstream::clear() only resets the stream's error-state flags; it does
// not discard the buffered text. The second dump below is therefore appended to the first one.
// The later substring checks still hold, but str("") is probably what was intended here.
98 | 1 | mvcc_op_trace_stream.clear(); |
99 | | 
100 | 1 | constexpr uint64_t kLease = 10; |
101 | 1 | constexpr uint64_t kDelta = 10; |
// Build a lease kLease raw units ahead of the clock, then move the clock kDelta units past it.
// With nothing pending, SafeTime is expected to return exactly the lease value.
102 | 1 | auto time = AddLogical(clock_->Now(), kLease); |
103 | 1 | FixedHybridTimeLease ht_lease { |
104 | 1 | .time = time, |
105 | 1 | .lease = time, |
106 | 1 | }; |
107 | 1 | clock_->Update(AddLogical(ht_lease.lease, kDelta)); |
108 | 1 | ASSERT_EQ(ht_lease.lease, manager_.SafeTime(ht_lease)); |
109 | | 
// With ht1 pending, the expected safe time is ht1.Decremented().
110 | 1 | HybridTime ht1 = clock_->Now(); |
111 | 1 | manager_.AddFollowerPending(ht1, OpId(1, 1)); |
112 | 1 | ASSERT_EQ(ht1.Decremented(), manager_.SafeTime(FixedHybridTimeLease())); |
113 | | 
// Adding a second pending operation (ht2) is expected to leave the safe time unchanged.
114 | 1 | HybridTime ht2 = manager_.AddLeaderPending(OpId(1, 2)); |
115 | 1 | ASSERT_EQ(ht1.Decremented(), manager_.SafeTime(FixedHybridTimeLease())); |
116 | | 
// Once ht1 is replicated only ht2 is pending, so the expected safe time is ht2.Decremented().
117 | 1 | manager_.Replicated(ht1, OpId(1, 1)); |
118 | 1 | ASSERT_EQ(ht2.Decremented(), manager_.SafeTime(FixedHybridTimeLease())); |
119 | | 
// With nothing pending and a lease whose time and lease both equal the current clock reading,
// SafeTime is expected to return that reading.
120 | 1 | manager_.Replicated(ht2, OpId(1, 2)); |
121 | 1 | time = clock_->Now(); |
122 | 1 | ht_lease = { |
123 | 1 | .time = time, |
124 | 1 | .lease = time, |
125 | 1 | }; |
126 | 1 | ASSERT_EQ(time, manager_.SafeTime(ht_lease)); |
127 | | 
// The trace is expected to contain numbered entries for the calls above; entries 1, 2, 8 and 9
// are spot-checked.
128 | 1 | manager_.TEST_DumpTrace(&mvcc_op_trace_stream); |
129 | 1 | const auto mvcc_trace = mvcc_op_trace_stream.str(); |
130 | 1 | ASSERT_STR_CONTAINS(mvcc_trace, "1. SafeTime"); |
131 | 1 | ASSERT_STR_CONTAINS(mvcc_trace, "2. AddFollowerPending"); |
132 | 1 | ASSERT_STR_CONTAINS(mvcc_trace, "8. Replicated"); |
133 | 1 | ASSERT_STR_CONTAINS(mvcc_trace, "9. SafeTime"); |
134 | 1 | } |
135 | | |
// Adds kTotalEntries leader-side pending operations, then finishes all of them by alternating
// between aborting the newest remaining one (even iterations) and replicating the oldest
// remaining one (odd iterations). Before each replication the safe time is expected to be the
// oldest remaining hybrid time, decremented. Once nothing is pending, SafeTime with a lease at
// the current clock reading is expected to return that reading.
// NOTE(review): in the first loop `i` is an int compared against the size_t kTotalEntries.
136 | 1 | TEST_F(MvccTest, Abort) { |
137 | 1 | constexpr size_t kTotalEntries = 10; |
138 | 1 | vector<HybridTime> hts(kTotalEntries); |
139 | 11 | for (int i = 0; i != kTotalEntries; ++i) { |
140 | 10 | hts[i] = manager_.AddLeaderPending(OpId(1, i)); |
141 | 10 | } |
// [begin, end) is the range of indices in hts that are still pending.
142 | 1 | size_t begin = 0; |
143 | 1 | size_t end = hts.size(); |
144 | 11 | for (size_t i = 0; i < hts.size(); ++i) { |
145 | 10 | if (i & 1) { |
146 | 5 | ASSERT_EQ(hts[begin].Decremented(), manager_.SafeTime(FixedHybridTimeLease())); |
147 | 5 | manager_.Replicated(hts[begin], OpId(1, begin)); |
148 | 5 | ++begin; |
149 | 5 | } else { |
150 | 5 | --end; |
151 | 5 | manager_.Aborted(hts[end], OpId(1, end)); |
152 | 5 | } |
153 | 10 | } |
154 | 1 | auto now = clock_->Now(); |
155 | 1 | ASSERT_EQ(now, manager_.SafeTime({ |
156 | 1 | .time = now, |
157 | 1 | .lease = now, |
158 | 1 | })); |
159 | 1 | } |
160 | | |
// Randomized test of MvccManager with a concurrent safe-time reader.
//
// Phase 1 (leader mode): performs kTotalOperations steps, each of which either starts a new
// operation via AddLeaderPending or finishes an in-flight one (Replicated for the oldest,
// Aborted for the newest). After every step the value returned by SafeTime is checked against
// the oldest in-flight hybrid time (or against the clock when nothing is in flight). Every step
// is recorded in `ops`.
//
// Phase 2 (follower mode): replays the recorded steps with all hybrid times shifted forward,
// using AddFollowerPending in place of AddLeaderPending.
//
// use_ht_lease: when true, SafeTime is called with a randomly extended lease and the returned
// safe time is additionally checked against the largest lease handed out; when false, a
// default-constructed FixedHybridTimeLease is passed.
161 | 2 | void MvccTest::RunRandomizedTest(bool use_ht_lease) { |
162 | 2 | constexpr size_t kTotalOperations = 20000; |
163 | 2 | enum class OpType { kAdd, kReplicated, kAborted }; |
164 | | 
// One recorded step: what was done, at which hybrid time, and for which operation id.
165 | 2 | struct Op { |
166 | 2 | OpType type; |
167 | 2 | HybridTime ht; |
168 | 2 | OpId op_id; |
169 | | 
170 | 20.0k | Op CopyAndChangeType(OpType new_type) const { |
171 | 20.0k | Op result = *this; |
172 | 20.0k | result.type = new_type; |
173 | 20.0k | return result; |
174 | 20.0k | } |
175 | 2 | }; |
176 | | 
// `alive` holds the in-flight operations; `queue` maps each in-flight hybrid time to that
// operation's index in `alive`; `counts` is indexed by the underlying value of OpType.
177 | 2 | std::map<HybridTime, size_t> queue; |
178 | 2 | vector<Op> alive; |
179 | 2 | size_t counts[] = { 0, 0, 0 }; |
180 | | 
181 | 2 | std::atomic<bool> stopped { false }; |
182 | | 
183 | 14 | const auto get_count = [&counts](OpType op) { return counts[to_underlying(op)]; }; |
184 | 2 | LogicalClock* const logical_clock = down_cast<LogicalClock*>(clock_.get()); |
185 | | 
186 | 2 | std::atomic<uint64_t> max_ht_lease{0}; |
187 | 2 | std::atomic<bool> is_leader{true}; |
188 | | 
// Produces the lease passed to SafeTime. Without leases this is a default-constructed
// FixedHybridTimeLease; with leases it is the Peek() value of the clock extended by a random
// 0..50 microseconds, and the largest lease produced so far is kept in max_ht_lease.
189 | 2 | const auto ht_lease_provider = |
190 | 63.4k | [use_ht_lease, logical_clock, &max_ht_lease]() -> FixedHybridTimeLease { |
191 | 63.4k | if (!use_ht_lease) { |
192 | 29.4k | return FixedHybridTimeLease(); |
193 | 29.4k | } |
194 | 34.0k | auto now = logical_clock->Peek(); |
195 | 34.0k | auto ht_lease = now.AddMicroseconds(RandomUniformInt(0, 50)); |
196 | | 
197 | | // Update the maximum HT lease that we gave to any caller. |
198 | 34.0k | UpdateAtomicMax(&max_ht_lease, ht_lease.ToUint64()); |
199 | | 
200 | 34.0k | return { |
201 | 34.0k | .time = now, |
202 | 34.0k | .lease = ht_lease, |
203 | 34.0k | }; |
204 | 34.0k | }; |
205 | | 
206 | | // This thread will keep getting the safe time in the background. |
// It uses the leader call while is_leader is set and SafeTimeForFollower afterwards; the
// returned values are not inspected.
207 | 2 | std::thread safetime_query_thread([this, &stopped, &ht_lease_provider, &is_leader]() { |
208 | 25.2k | while (!stopped.load(std::memory_order_acquire)) { |
209 | 25.2k | if (is_leader.load(std::memory_order_acquire)) { |
210 | 23.6k | manager_.SafeTime(HybridTime::kMin, CoarseTimePoint::max(), ht_lease_provider()); |
211 | 1.62k | } else { |
212 | 1.62k | manager_.SafeTimeForFollower(HybridTime::kMin, CoarseTimePoint::max()); |
213 | 1.62k | } |
214 | 25.2k | std::this_thread::yield(); |
215 | 25.2k | } |
216 | 2 | }); |
217 | | 
// Signals the background thread to stop and joins it.
// NOTE(review): presumably ScopeExit runs the lambda when `se` is destroyed, which would also
// cover early returns from a failed ASSERT_* below - confirm against yb/util/scope_exit.h.
218 | 2 | auto se = ScopeExit([&stopped, &safetime_query_thread] { |
219 | 2 | stopped = true; |
220 | 2 | safetime_query_thread.join(); |
221 | 2 | }); |
222 | | 
223 | 2 | vector<Op> ops; |
224 | 2 | ops.reserve(kTotalOperations); |
225 | | 
226 | 2 | const ssize_t kTargetConcurrency = 50; |
227 | | 
228 | 2 | int op_idx = 0; |
229 | 40.0k | for (size_t i = 0; i < kTotalOperations || !alive.empty(); ++i) { |
// rnd < kTargetConcurrency starts a new operation; otherwise an in-flight one is finished and
// the low bit of rnd picks between replicating and aborting.
230 | 40.0k | ssize_t rnd; |
231 | 40.0k | if (kTotalOperations - i <= alive.size()) { |
232 | | // We have (kTotalOperations - i) operations left to do, so let's finish operations that are |
233 | | // already in progress. |
234 | 313 | rnd = kTargetConcurrency + RandomUniformInt(0, 1); |
235 | 39.6k | } else { |
236 | | // If alive.size() < kTargetConcurrency, we'll be starting new operations with probability of |
237 | | // 1 - alive.size() / (2 * kTargetConcurrency), starting at almost 100% and approaching 50% |
238 | | // as alive.size() reaches kTargetConcurrency. |
239 | | // |
240 | | // If alive.size() >= kTargetConcurrency: we keep starting new operations in half of the |
241 | | // cases, and finishing existing ones in half the cases. |
242 | 39.6k | rnd = RandomUniformInt(-kTargetConcurrency, kTargetConcurrency - 1) + |
243 | 39.6k | std::min<ssize_t>(kTargetConcurrency, alive.size()); |
244 | 39.6k | } |
245 | 40.0k | if (rnd < kTargetConcurrency) { |
246 | | // Start a new operation. |
247 | 20.0k | OpId op_id(1, ++op_idx); |
248 | 20.0k | HybridTime ht = manager_.AddLeaderPending(op_id); |
249 | 20.0k | alive.push_back(Op {.type = OpType::kAdd, .ht = ht, .op_id = op_id}); |
250 | 20.0k | queue.emplace(alive.back().ht, alive.size() - 1); |
251 | 20.0k | ops.push_back(alive.back()); |
252 | 20.0k | } else { |
253 | 20.0k | size_t idx; |
254 | 20.0k | if (rnd & 1) { |
255 | | // Finish replication for the next operation. |
256 | 9.87k | idx = queue.begin()->second; |
257 | 9.87k | ops.push_back(alive[idx].CopyAndChangeType(OpType::kReplicated)); |
258 | 9.87k | manager_.Replicated(alive[idx].ht, alive[idx].op_id); |
259 | 10.1k | } else { |
260 | | // Abort the last operation that is alive. |
261 | 10.1k | idx = queue.rbegin()->second; |
262 | 10.1k | ops.push_back(alive[idx].CopyAndChangeType(OpType::kAborted)); |
263 | 10.1k | manager_.Aborted(alive[idx].ht, alive[idx].op_id); |
264 | 10.1k | } |
// Remove alive[idx] by overwriting it with the last element and popping the back. If a
// different element was moved into slot idx, repoint its queue entry from the old last index
// to idx.
265 | 20.0k | queue.erase(alive[idx].ht); |
266 | 20.0k | alive[idx] = alive.back(); |
267 | 20.0k | alive.pop_back(); |
268 | 20.0k | if (idx != alive.size()) { |
269 | 14.1k | ASSERT_EQ(queue[alive[idx].ht], alive.size()); |
270 | 14.1k | queue[alive[idx].ht] = idx; |
271 | 14.1k | } |
272 | 20.0k | } |
273 | 40.0k | ++counts[to_underlying(ops.back().type)]; |
274 | | 
// Verify the safe time after this step: bounded by clock readings taken before and after the
// call when nothing is in flight, otherwise exactly the oldest in-flight hybrid time,
// decremented.
275 | 40.0k | HybridTime safe_time; |
276 | 40.0k | if (alive.empty()) { |
277 | 2 | auto time_before = clock_->Now(); |
278 | 2 | safe_time = manager_.SafeTime(ht_lease_provider()); |
279 | 2 | auto time_after = clock_->Now(); |
280 | 2 | ASSERT_GE(safe_time.ToUint64(), time_before.ToUint64()); |
281 | 2 | ASSERT_LE(safe_time.ToUint64(), time_after.ToUint64()); |
282 | 39.9k | } else { |
283 | 39.9k | auto min = queue.begin()->first; |
284 | 39.9k | safe_time = manager_.SafeTime(ht_lease_provider()); |
285 | 39.9k | ASSERT_EQ(min.Decremented(), safe_time); |
286 | 39.9k | } |
287 | 40.0k | if (use_ht_lease) { |
288 | 20.0k | ASSERT_LE(safe_time.ToUint64(), max_ht_lease.load(std::memory_order_acquire)); |
289 | 20.0k | } |
290 | 40.0k | } |
// Every started operation must have been finished, and the step total must match exactly.
291 | 2 | LOG(INFO) << "Adds: " << get_count(OpType::kAdd) |
292 | 2 | << ", replicates: " << get_count(OpType::kReplicated) |
293 | 2 | << ", aborts: " << get_count(OpType::kAborted); |
294 | 2 | const size_t replicated_and_aborted = |
295 | 2 | get_count(OpType::kReplicated) + get_count(OpType::kAborted); |
296 | 2 | ASSERT_EQ(kTotalOperations, get_count(OpType::kAdd) + replicated_and_aborted); |
297 | 2 | ASSERT_EQ(get_count(OpType::kAdd), replicated_and_aborted); |
298 | | 
299 | | // Replay the recorded operations as if we are a follower receiving these operations from the |
300 | | // leader. |
// The shift is larger than both the biggest lease handed out and the current clock reading, so
// every replayed hybrid time is above anything used in the leader phase.
301 | 2 | is_leader = false; |
302 | 2 | uint64_t shift = std::max(max_ht_lease + 1, clock_->Now().ToUint64() + 1); |
303 | 2 | LOG(INFO) << "Shifting hybrid times by " << shift << " units and replaying in follower mode"; |
304 | 2 | auto start = std::chrono::steady_clock::now(); |
305 | 40.0k | for (auto& op : ops) { |
306 | 40.0k | op.ht = HybridTime(op.ht.ToUint64() + shift); |
307 | 40.0k | switch (op.type) { |
308 | 20.0k | case OpType::kAdd: |
309 | 20.0k | manager_.AddFollowerPending(op.ht, op.op_id); |
310 | 20.0k | break; |
311 | 9.87k | case OpType::kReplicated: |
312 | 9.87k | manager_.Replicated(op.ht, op.op_id); |
313 | 9.87k | break; |
314 | 10.1k | case OpType::kAborted: |
315 | 10.1k | manager_.Aborted(op.ht, op.op_id); |
316 | 10.1k | break; |
317 | 40.0k | } |
318 | 40.0k | } |
319 | 2 | auto end = std::chrono::steady_clock::now(); |
320 | 2 | LOG(INFO) << "Passed: " << yb::ToString(end - start); |
321 | 2 | } |
322 | | |
// Runs the randomized scenario with use_ht_lease = false, i.e. SafeTime is always called with a
// default-constructed FixedHybridTimeLease.
323 | 1 | TEST_F(MvccTest, RandomWithoutHTLease) { |
324 | 1 | RunRandomizedTest(false); |
325 | 1 | } |
326 | | |
// Runs the randomized scenario with use_ht_lease = true, i.e. SafeTime is called with a randomly
// extended lease and the result is also checked against the largest lease handed out.
327 | 1 | TEST_F(MvccTest, RandomWithHTLease) { |
328 | 1 | RunRandomizedTest(true); |
329 | 1 | } |
330 | | |
// Exercises the three-argument SafeTime overload, which is expected to block until the safe
// time reaches the requested value or the deadline passes.
// NOTE(review): the first argument appears to be the minimum safe time to wait for, and the
// result appears to be falsy on timeout - confirm against MvccManager::SafeTime.
// NOTE(review): progress is observed through fixed 100ms sleeps, so the checks are
// timing-dependent. Also, if ASSERT_* returns from the test body on failure (as GoogleTest fatal
// assertions do), t1/t2 would be destroyed while still joinable, which calls std::terminate.
331 | 1 | TEST_F(MvccTest, WaitForSafeTime) { |
332 | 1 | constexpr uint64_t kLease = 10; |
333 | 1 | constexpr uint64_t kDelta = 10; |
334 | 1 | auto limit = AddLogical(clock_->Now(), kLease); |
335 | 1 | clock_->Update(AddLogical(limit, kDelta)); |
// Two pending operations: ht1 (follower-style, explicit time) and ht2 (leader-style).
336 | 1 | HybridTime ht1 = clock_->Now(); |
337 | 1 | manager_.AddFollowerPending(ht1, OpId(1, 1)); |
338 | 1 | HybridTime ht2 = manager_.AddLeaderPending(OpId(1, 2)); |
// t1 waits for ht2.Decremented(); it is expected to finish once ht1 is replicated.
339 | 1 | std::atomic<bool> t1_done(false); |
340 | 1 | std::thread t1([this, ht2, &t1_done] { |
341 | 1 | manager_.SafeTime(ht2.Decremented(), CoarseTimePoint::max(), FixedHybridTimeLease()); |
342 | 1 | t1_done = true; |
343 | 1 | }); |
// t2 waits for a value just above ht2; it is expected to finish only after ht2 is replicated.
344 | 1 | std::atomic<bool> t2_done(false); |
345 | 1 | std::thread t2([this, ht2, &t2_done] { |
346 | 1 | manager_.SafeTime(AddLogical(ht2, 1), CoarseTimePoint::max(), FixedHybridTimeLease()); |
347 | 1 | t2_done = true; |
348 | 1 | }); |
// While both operations are pending, neither waiter should have returned.
349 | 1 | std::this_thread::sleep_for(100ms); |
350 | 1 | ASSERT_FALSE(t1_done.load()); |
351 | 1 | ASSERT_FALSE(t2_done.load()); |
352 | | 
353 | 1 | manager_.Replicated(ht1, OpId(1, 1)); |
354 | 1 | std::this_thread::sleep_for(100ms); |
355 | 1 | ASSERT_TRUE(t1_done.load()); |
356 | 1 | ASSERT_FALSE(t2_done.load()); |
357 | | 
358 | 1 | manager_.Replicated(ht2, OpId(1, 2)); |
359 | 1 | std::this_thread::sleep_for(100ms); |
360 | 1 | ASSERT_TRUE(t1_done.load()); |
361 | 1 | ASSERT_TRUE(t2_done.load()); |
362 | | 
363 | 1 | t1.join(); |
364 | 1 | t2.join(); |
365 | | 
// With ht3 still pending, waiting for ht3 itself with a 100ms deadline is expected to fail.
366 | 1 | HybridTime ht3 = manager_.AddLeaderPending(OpId(1, 3)); |
367 | 1 | ASSERT_FALSE(manager_.SafeTime(ht3, CoarseMonoClock::now() + 100ms, FixedHybridTimeLease())); |
368 | 1 | } |
369 | | |
370 | | } // namespace tablet |
371 | | } // namespace yb |