YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/mvcc-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <sstream>
34
35
#include <glog/logging.h>
36
37
#include "yb/gutil/casts.h"
38
39
#include "yb/server/logical_clock.h"
40
41
#include "yb/tablet/mvcc.h"
42
43
#include "yb/util/atomic.h"
44
#include "yb/util/enums.h"
45
#include "yb/util/random_util.h"
46
#include "yb/util/scope_exit.h"
47
#include "yb/util/status.h"
48
#include "yb/util/test_util.h"
49
50
using namespace std::literals;
51
using std::vector;
52
53
using yb::server::LogicalClock;
54
55
namespace yb {
56
namespace tablet {
57
58
class MvccTest : public YBTest {
59
 public:
60
  MvccTest()
61
      : clock_(server::LogicalClock::CreateStartingAt(HybridTime::kInitial)),
62
6
        manager_(std::string(), clock_.get()) {
63
6
  }
64
65
 protected:
66
  void RunRandomizedTest(bool use_ht_lease);
67
68
  server::ClockPtr clock_;
69
  MvccManager manager_;
70
};
71
72
namespace {
73
74
5
HybridTime AddLogical(HybridTime input, uint64_t delta) {
75
5
  HybridTime result;
76
5
  EXPECT_OK(result.FromUint64(input.ToUint64() + delta));
77
5
  return result;
78
5
}
79
80
}
81
82
1
TEST_F(MvccTest, Basic) {
83
1
  constexpr size_t kTotalEntries = 10;
84
1
  vector<HybridTime> hts(kTotalEntries);
85
11
  for (int i = 0; i != kTotalEntries; ++i) {
86
10
    hts[i] = manager_.AddLeaderPending(OpId(1, i));
87
10
  }
88
11
  for (int i = 0; i != kTotalEntries; ++i) {
89
10
    manager_.Replicated(hts[i], OpId(1, i));
90
10
    ASSERT_EQ(hts[i], manager_.LastReplicatedHybridTime());
91
10
  }
92
1
}
93
94
1
TEST_F(MvccTest, SafeHybridTimeToReadAt) {
95
1
  std::ostringstream mvcc_op_trace_stream;
96
1
  manager_.TEST_DumpTrace(&mvcc_op_trace_stream);
97
1
  ASSERT_STR_CONTAINS(mvcc_op_trace_stream.str(), "No MVCC operations");
98
1
  mvcc_op_trace_stream.clear();
99
100
1
  constexpr uint64_t kLease = 10;
101
1
  constexpr uint64_t kDelta = 10;
102
1
  auto time = AddLogical(clock_->Now(), kLease);
103
1
  FixedHybridTimeLease ht_lease {
104
1
    .time = time,
105
1
    .lease = time,
106
1
  };
107
1
  clock_->Update(AddLogical(ht_lease.lease, kDelta));
108
1
  ASSERT_EQ(ht_lease.lease, manager_.SafeTime(ht_lease));
109
110
1
  HybridTime ht1 = clock_->Now();
111
1
  manager_.AddFollowerPending(ht1, OpId(1, 1));
112
1
  ASSERT_EQ(ht1.Decremented(), manager_.SafeTime(FixedHybridTimeLease()));
113
114
1
  HybridTime ht2 = manager_.AddLeaderPending(OpId(1, 2));
115
1
  ASSERT_EQ(ht1.Decremented(), manager_.SafeTime(FixedHybridTimeLease()));
116
117
1
  manager_.Replicated(ht1, OpId(1, 1));
118
1
  ASSERT_EQ(ht2.Decremented(), manager_.SafeTime(FixedHybridTimeLease()));
119
120
1
  manager_.Replicated(ht2, OpId(1, 2));
121
1
  time = clock_->Now();
122
1
  ht_lease = {
123
1
    .time = time,
124
1
    .lease = time,
125
1
  };
126
1
  ASSERT_EQ(time, manager_.SafeTime(ht_lease));
127
128
1
  manager_.TEST_DumpTrace(&mvcc_op_trace_stream);
129
1
  const auto mvcc_trace = mvcc_op_trace_stream.str();
130
1
  ASSERT_STR_CONTAINS(mvcc_trace, "1. SafeTime");
131
1
  ASSERT_STR_CONTAINS(mvcc_trace, "2. AddFollowerPending");
132
1
  ASSERT_STR_CONTAINS(mvcc_trace, "8. Replicated");
133
1
  ASSERT_STR_CONTAINS(mvcc_trace, "9. SafeTime");
134
1
}
135
136
1
TEST_F(MvccTest, Abort) {
137
1
  constexpr size_t kTotalEntries = 10;
138
1
  vector<HybridTime> hts(kTotalEntries);
139
11
  for (int i = 0; i != kTotalEntries; ++i) {
140
10
    hts[i] = manager_.AddLeaderPending(OpId(1, i));
141
10
  }
142
1
  size_t begin = 0;
143
1
  size_t end = hts.size();
144
11
  for (size_t i = 0; i < hts.size(); ++i) {
145
10
    if (i & 1) {
146
5
      ASSERT_EQ(hts[begin].Decremented(), manager_.SafeTime(FixedHybridTimeLease()));
147
5
      manager_.Replicated(hts[begin], OpId(1, begin));
148
5
      ++begin;
149
5
    } else {
150
5
      --end;
151
5
      manager_.Aborted(hts[end], OpId(1, end));
152
5
    }
153
10
  }
154
1
  auto now = clock_->Now();
155
1
  ASSERT_EQ(now, manager_.SafeTime({
156
1
    .time = now,
157
1
    .lease = now,
158
1
  }));
159
1
}
160
161
2
void MvccTest::RunRandomizedTest(bool use_ht_lease) {
162
2
  constexpr size_t kTotalOperations = 20000;
163
2
  enum class OpType { kAdd, kReplicated, kAborted };
164
165
2
  struct Op {
166
2
    OpType type;
167
2
    HybridTime ht;
168
2
    OpId op_id;
169
170
20.0k
    Op CopyAndChangeType(OpType new_type) const {
171
20.0k
      Op result = *this;
172
20.0k
      result.type = new_type;
173
20.0k
      return result;
174
20.0k
    }
175
2
  };
176
177
2
  std::map<HybridTime, size_t> queue;
178
2
  vector<Op> alive;
179
2
  size_t counts[] = { 0, 0, 0 };
180
181
2
  std::atomic<bool> stopped { false };
182
183
14
  const auto get_count = [&counts](OpType op) { return counts[to_underlying(op)]; };
184
2
  LogicalClock* const logical_clock = down_cast<LogicalClock*>(clock_.get());
185
186
2
  std::atomic<uint64_t> max_ht_lease{0};
187
2
  std::atomic<bool> is_leader{true};
188
189
2
  const auto ht_lease_provider =
190
63.4k
      [use_ht_lease, logical_clock, &max_ht_lease]() -> FixedHybridTimeLease {
191
63.4k
        if (!use_ht_lease) {
192
29.4k
          return FixedHybridTimeLease();
193
29.4k
        }
194
34.0k
        auto now = logical_clock->Peek();
195
34.0k
        auto ht_lease = now.AddMicroseconds(RandomUniformInt(0, 50));
196
197
        // Update the maximum HT lease that we gave to any caller.
198
34.0k
        UpdateAtomicMax(&max_ht_lease, ht_lease.ToUint64());
199
200
34.0k
        return {
201
34.0k
          .time = now,
202
34.0k
          .lease = ht_lease,
203
34.0k
        };
204
34.0k
      };
205
206
  // This thread will keep getting the safe time in the background.
207
2
  std::thread safetime_query_thread([this, &stopped, &ht_lease_provider, &is_leader]() {
208
25.2k
    while (!stopped.load(std::memory_order_acquire)) {
209
25.2k
      if (is_leader.load(std::memory_order_acquire)) {
210
23.6k
        manager_.SafeTime(HybridTime::kMin, CoarseTimePoint::max(), ht_lease_provider());
211
1.62k
      } else {
212
1.62k
        manager_.SafeTimeForFollower(HybridTime::kMin, CoarseTimePoint::max());
213
1.62k
      }
214
25.2k
      std::this_thread::yield();
215
25.2k
    }
216
2
  });
217
218
2
  auto se = ScopeExit([&stopped, &safetime_query_thread] {
219
2
    stopped = true;
220
2
    safetime_query_thread.join();
221
2
  });
222
223
2
  vector<Op> ops;
224
2
  ops.reserve(kTotalOperations);
225
226
2
  const ssize_t kTargetConcurrency = 50;
227
228
2
  int op_idx = 0;
229
40.0k
  for (size_t i = 0; i < kTotalOperations || !alive.empty(); ++i) {
230
40.0k
    ssize_t rnd;
231
40.0k
    if (kTotalOperations - i <= alive.size()) {
232
      // We have (kTotalOperations - i) operations left to do, so let's finish operations that are
233
      // already in progress.
234
313
      rnd = kTargetConcurrency + RandomUniformInt(0, 1);
235
39.6k
    } else {
236
      // If alive.size() < kTargetConcurrency, we'll be starting new operations with probability of
237
      // 1 - alive.size() / (2 * kTargetConcurrency), starting at almost 100% and approaching 50%
238
      // as alive.size() reaches kTargetConcurrency.
239
      //
240
      // If alive.size() >= kTargetConcurrency: we keep starting new operations in half of the
241
      // cases, and finishing existing ones in half the cases.
242
39.6k
      rnd = RandomUniformInt(-kTargetConcurrency, kTargetConcurrency - 1) +
243
39.6k
          std::min<ssize_t>(kTargetConcurrency, alive.size());
244
39.6k
    }
245
40.0k
    if (rnd < kTargetConcurrency) {
246
      // Start a new operation.
247
20.0k
      OpId op_id(1, ++op_idx);
248
20.0k
      HybridTime ht = manager_.AddLeaderPending(op_id);
249
20.0k
      alive.push_back(Op {.type = OpType::kAdd, .ht = ht, .op_id = op_id});
250
20.0k
      queue.emplace(alive.back().ht, alive.size() - 1);
251
20.0k
      ops.push_back(alive.back());
252
20.0k
    } else {
253
20.0k
      size_t idx;
254
20.0k
      if (rnd & 1) {
255
        // Finish replication for the next operation.
256
9.87k
        idx = queue.begin()->second;
257
9.87k
        ops.push_back(alive[idx].CopyAndChangeType(OpType::kReplicated));
258
9.87k
        manager_.Replicated(alive[idx].ht, alive[idx].op_id);
259
10.1k
      } else {
260
        // Abort the last operation that is alive.
261
10.1k
        idx = queue.rbegin()->second;
262
10.1k
        ops.push_back(alive[idx].CopyAndChangeType(OpType::kAborted));
263
10.1k
        manager_.Aborted(alive[idx].ht, alive[idx].op_id);
264
10.1k
      }
265
20.0k
      queue.erase(alive[idx].ht);
266
20.0k
      alive[idx] = alive.back();
267
20.0k
      alive.pop_back();
268
20.0k
      if (idx != alive.size()) {
269
14.1k
        ASSERT_EQ(queue[alive[idx].ht], alive.size());
270
14.1k
        queue[alive[idx].ht] = idx;
271
14.1k
      }
272
20.0k
    }
273
40.0k
    ++counts[to_underlying(ops.back().type)];
274
275
40.0k
    HybridTime safe_time;
276
40.0k
    if (alive.empty()) {
277
2
      auto time_before = clock_->Now();
278
2
      safe_time = manager_.SafeTime(ht_lease_provider());
279
2
      auto time_after = clock_->Now();
280
2
      ASSERT_GE(safe_time.ToUint64(), time_before.ToUint64());
281
2
      ASSERT_LE(safe_time.ToUint64(), time_after.ToUint64());
282
39.9k
    } else {
283
39.9k
      auto min = queue.begin()->first;
284
39.9k
      safe_time = manager_.SafeTime(ht_lease_provider());
285
39.9k
      ASSERT_EQ(min.Decremented(), safe_time);
286
39.9k
    }
287
40.0k
    if (use_ht_lease) {
288
20.0k
      ASSERT_LE(safe_time.ToUint64(), max_ht_lease.load(std::memory_order_acquire));
289
20.0k
    }
290
40.0k
  }
291
2
  LOG(INFO) << "Adds: " << get_count(OpType::kAdd)
292
2
            << ", replicates: " << get_count(OpType::kReplicated)
293
2
            << ", aborts: " << get_count(OpType::kAborted);
294
2
  const size_t replicated_and_aborted =
295
2
      get_count(OpType::kReplicated) + get_count(OpType::kAborted);
296
2
  ASSERT_EQ(kTotalOperations, get_count(OpType::kAdd) + replicated_and_aborted);
297
2
  ASSERT_EQ(get_count(OpType::kAdd), replicated_and_aborted);
298
299
  // Replay the recorded operations as if we are a follower receiving these operations from the
300
  // leader.
301
2
  is_leader = false;
302
2
  uint64_t shift = std::max(max_ht_lease + 1, clock_->Now().ToUint64() + 1);
303
2
  LOG(INFO) << "Shifting hybrid times by " << shift << " units and replaying in follower mode";
304
2
  auto start = std::chrono::steady_clock::now();
305
40.0k
  for (auto& op : ops) {
306
40.0k
    op.ht = HybridTime(op.ht.ToUint64() + shift);
307
40.0k
    switch (op.type) {
308
20.0k
      case OpType::kAdd:
309
20.0k
        manager_.AddFollowerPending(op.ht, op.op_id);
310
20.0k
        break;
311
9.87k
      case OpType::kReplicated:
312
9.87k
        manager_.Replicated(op.ht, op.op_id);
313
9.87k
        break;
314
10.1k
      case OpType::kAborted:
315
10.1k
        manager_.Aborted(op.ht, op.op_id);
316
10.1k
        break;
317
40.0k
    }
318
40.0k
  }
319
2
  auto end = std::chrono::steady_clock::now();
320
2
  LOG(INFO) << "Passed: " << yb::ToString(end - start);
321
2
}
322
323
1
TEST_F(MvccTest, RandomWithoutHTLease) {
324
1
  RunRandomizedTest(false);
325
1
}
326
327
1
TEST_F(MvccTest, RandomWithHTLease) {
328
1
  RunRandomizedTest(true);
329
1
}
330
331
1
TEST_F(MvccTest, WaitForSafeTime) {
332
1
  constexpr uint64_t kLease = 10;
333
1
  constexpr uint64_t kDelta = 10;
334
1
  auto limit = AddLogical(clock_->Now(), kLease);
335
1
  clock_->Update(AddLogical(limit, kDelta));
336
1
  HybridTime ht1 = clock_->Now();
337
1
  manager_.AddFollowerPending(ht1, OpId(1, 1));
338
1
  HybridTime ht2 = manager_.AddLeaderPending(OpId(1, 2));
339
1
  std::atomic<bool> t1_done(false);
340
1
  std::thread t1([this, ht2, &t1_done] {
341
1
    manager_.SafeTime(ht2.Decremented(), CoarseTimePoint::max(), FixedHybridTimeLease());
342
1
    t1_done = true;
343
1
  });
344
1
  std::atomic<bool> t2_done(false);
345
1
  std::thread t2([this, ht2, &t2_done] {
346
1
    manager_.SafeTime(AddLogical(ht2, 1), CoarseTimePoint::max(), FixedHybridTimeLease());
347
1
    t2_done = true;
348
1
  });
349
1
  std::this_thread::sleep_for(100ms);
350
1
  ASSERT_FALSE(t1_done.load());
351
1
  ASSERT_FALSE(t2_done.load());
352
353
1
  manager_.Replicated(ht1, OpId(1, 1));
354
1
  std::this_thread::sleep_for(100ms);
355
1
  ASSERT_TRUE(t1_done.load());
356
1
  ASSERT_FALSE(t2_done.load());
357
358
1
  manager_.Replicated(ht2, OpId(1, 2));
359
1
  std::this_thread::sleep_for(100ms);
360
1
  ASSERT_TRUE(t1_done.load());
361
1
  ASSERT_TRUE(t2_done.load());
362
363
1
  t1.join();
364
1
  t2.join();
365
366
1
  HybridTime ht3 = manager_.AddLeaderPending(OpId(1, 3));
367
1
  ASSERT_FALSE(manager_.SafeTime(ht3, CoarseMonoClock::now() + 100ms, FixedHybridTimeLease()));
368
1
}
369
370
} // namespace tablet
371
} // namespace yb