YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_bootstrap-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <vector>
34
35
#include "yb/common/index.h"
36
37
#include "yb/consensus/consensus-test-util.h"
38
#include "yb/consensus/consensus_meta.h"
39
#include "yb/consensus/log-test-base.h"
40
#include "yb/consensus/log_util.h"
41
#include "yb/consensus/opid_util.h"
42
43
#include "yb/docdb/ql_rowwise_iterator_interface.h"
44
45
#include "yb/server/logical_clock.h"
46
47
#include "yb/tablet/tablet-test-util.h"
48
#include "yb/tablet/tablet.h"
49
#include "yb/tablet/tablet_bootstrap_if.h"
50
#include "yb/tablet/tablet_metadata.h"
51
52
#include "yb/util/logging.h"
53
#include "yb/util/path_util.h"
54
#include "yb/util/random_util.h"
55
#include "yb/util/tostring.h"
56
#include "yb/util/tsan_util.h"
57
58
DECLARE_bool(skip_flushed_entries);
59
DECLARE_int32(retryable_request_timeout_secs);
60
61
using std::shared_ptr;
62
using std::string;
63
using std::vector;
64
65
namespace yb {
66
67
namespace log {
68
69
extern const char* kTestTable;
70
extern const char* kTestTablet;
71
extern const char* kTestNamespace;
72
73
} // namespace log
74
75
namespace tablet {
76
77
using consensus::ConsensusBootstrapInfo;
78
using consensus::ConsensusMetadata;
79
using consensus::kMinimumTerm;
80
using consensus::MakeOpId;
81
using consensus::ReplicateMsg;
82
using consensus::ReplicateMsgPtr;
83
using log::Log;
84
using log::LogAnchorRegistry;
85
using log::LogTestBase;
86
using log::ReadableLogSegment;
87
using log::AppendSync;
88
using server::Clock;
89
using server::LogicalClock;
90
using tserver::WriteRequestPB;
91
92
struct BootstrapReport {
93
  // OpIds replayed using Play... functions.
94
  std::vector<OpId> replayed;
95
96
  // OpIds replayed only into the intents RocksDB (already flushed in the regular RocksDB).
97
  std::vector<OpId> replayed_to_intents_only;
98
99
  // Entries overwritten by a later entry with the same or lower index, from a leader of a later
100
  // term.
101
  std::vector<OpId> overwritten;
102
103
  // OpIds registered with RetryableRequests. This sometimes includes flushed entries.
104
  std::vector<OpId> retryable_requests;
105
106
  // First OpIds of segments to replay, in reverse order (we traverse them from latest to earliest
107
  // in TabletBootstrap).
108
  std::vector<OpId> first_op_ids_of_segments_reversed;
109
};
110
111
struct BootstrapTestHooksImpl : public TabletBootstrapTestHooksIf {
112
410
  virtual ~BootstrapTestHooksImpl() {}
113
114
400
  void Clear() {
115
400
    *this = BootstrapTestHooksImpl();
116
400
  }
117
118
407
  boost::optional<DocDbOpIds> GetFlushedOpIdsOverride() const override {
119
407
    return flushed_op_ids;
120
407
  }
121
122
255k
  void Replayed(OpId op_id, AlreadyAppliedToRegularDB already_applied_to_regular_db) override {
123
255k
    actual_report.replayed.push_back(op_id);
124
255k
    if (already_applied_to_regular_db) {
125
6.69k
      actual_report.replayed_to_intents_only.push_back(op_id);
126
6.69k
    }
127
255k
  }
128
129
66.1k
  void Overwritten(OpId op_id) override {
130
66.1k
    actual_report.overwritten.push_back(op_id);
131
66.1k
  };
132
133
294k
  void RetryableRequest(OpId op_id) override {
134
294k
    actual_report.retryable_requests.push_back(op_id);
135
294k
  }
136
137
39.2k
  bool ShouldSkipTransactionUpdates() const override {
138
39.2k
    return true;
139
39.2k
  }
140
141
58.3k
  bool ShouldSkipWritingIntents() const override {
142
58.3k
    return true;
143
58.3k
  }
144
145
407
  bool HasIntentsDB() const override {
146
407
    return transactional;
147
407
  }
148
149
3.70k
  void FirstOpIdOfSegment(const std::string& path, OpId first_op_id) override {
150
3.70k
    LOG(INFO) << "First OpId of segment " << DirName(path) << ": " << first_op_id;
151
3.70k
    actual_report.first_op_ids_of_segments_reversed.push_back(first_op_id);
152
3.70k
  }
153
154
  // ----------------------------------------------------------------------------------------------
155
  // These fields are populated based in callbacks from TabletBootstrap.
156
  // ----------------------------------------------------------------------------------------------
157
158
  // This is queried by TabletBootstrap during its initialization.
159
  boost::optional<DocDbOpIds> flushed_op_ids;
160
161
  BootstrapReport actual_report;
162
163
  // A parameter set by the test.
164
  bool transactional = false;
165
};
166
167
// Table type of the test table whose metadata BootstrapTest creates. A namespace-scope constexpr
// variable already has internal linkage, so no `static` is needed.
constexpr TableType kTableType = TableType::YQL_TABLE_TYPE;
168
169
class BootstrapTest : public LogTestBase {
170
 protected:
171
10
  void SetUp() override {
172
10
    LogTestBase::SetUp();
173
10
    test_hooks_ = std::make_shared<BootstrapTestHooksImpl>();
174
10
  }
175
176
410
  Status LoadTestRaftGroupMetadata(RaftGroupMetadataPtr* meta) {
177
410
    Schema schema = SchemaBuilder(schema_).Build();
178
410
    std::pair<PartitionSchema, Partition> partition = CreateDefaultPartition(schema);
179
180
410
    auto table_info = std::make_shared<TableInfo>(
181
410
        log::kTestTable, log::kTestNamespace, log::kTestTable, kTableType, schema, IndexMap(),
182
410
        boost::none /* index_info */, 0 /* schema_version */, partition.first);
183
410
    *meta = VERIFY_RESULT(RaftGroupMetadata::LoadOrCreate(RaftGroupMetadataData {
184
410
      .fs_manager = fs_manager_.get(),
185
410
      .table_info = table_info,
186
410
      .raft_group_id = log::kTestTablet,
187
410
      .partition = partition.second,
188
410
      .tablet_data_state = TABLET_DATA_READY,
189
410
    }));
190
410
    return (*meta)->Flush();
191
410
  }
192
193
1
  Status PersistTestRaftGroupMetadataState(TabletDataState state) {
194
1
    RaftGroupMetadataPtr meta;
195
1
    RETURN_NOT_OK(LoadTestRaftGroupMetadata(&meta));
196
1
    meta->set_tablet_data_state(state);
197
1
    RETURN_NOT_OK(meta->Flush());
198
1
    return Status::OK();
199
1
  }
200
201
  Status RunBootstrapOnTestTablet(const RaftGroupMetadataPtr& meta,
202
                                  TabletPtr* tablet,
203
409
                                  ConsensusBootstrapInfo* boot_info) {
204
409
    std::unique_ptr<TabletStatusListener> listener(new TabletStatusListener(meta));
205
409
    scoped_refptr<LogAnchorRegistry> log_anchor_registry(new LogAnchorRegistry());
206
    // Now attempt to recover the log
207
409
    TabletOptions tablet_options;
208
409
    TabletInitData tablet_init_data = {
209
409
      .metadata = meta,
210
409
      .client_future = std::shared_future<client::YBClient*>(),
211
409
      .clock = scoped_refptr<Clock>(LogicalClock::CreateStartingAt(HybridTime::kInitial)),
212
409
      .parent_mem_tracker = shared_ptr<MemTracker>(),
213
409
      .block_based_table_mem_tracker = shared_ptr<MemTracker>(),
214
409
      .metric_registry = nullptr,
215
409
      .log_anchor_registry = log_anchor_registry,
216
409
      .tablet_options = tablet_options,
217
409
      .log_prefix_suffix = std::string(),
218
409
      .transaction_participant_context = nullptr,
219
409
      .local_tablet_filter = client::LocalTabletFilter(),
220
409
      .transaction_coordinator_context = nullptr,
221
409
      .txns_enabled = TransactionsEnabled::kTrue,
222
409
      .is_sys_catalog = IsSysCatalogTablet::kFalse,
223
409
    };
224
409
    BootstrapTabletData data = {
225
409
      .tablet_init_data = tablet_init_data,
226
409
      .listener = listener.get(),
227
409
      .append_pool = log_thread_pool_.get(),
228
409
      .allocation_pool = log_thread_pool_.get(),
229
409
      .retryable_requests = nullptr,
230
409
      .test_hooks = test_hooks_
231
409
    };
232
409
    RETURN_NOT_OK(BootstrapTablet(data, tablet, &log_, boot_info));
233
407
    return Status::OK();
234
409
  }
235
236
  Status BootstrapTestTablet(
237
      TabletPtr* tablet,
238
408
      ConsensusBootstrapInfo* boot_info) {
239
408
    RaftGroupMetadataPtr meta;
240
408
    RETURN_NOT_OK_PREPEND(LoadTestRaftGroupMetadata(&meta),
241
408
                          "Unable to load test tablet metadata");
242
243
408
    consensus::RaftConfigPB config;
244
408
    config.set_opid_index(consensus::kInvalidOpIdIndex);
245
408
    consensus::RaftPeerPB* peer = config.add_peers();
246
408
    peer->set_permanent_uuid(meta->fs_manager()->uuid());
247
408
    peer->set_member_type(consensus::PeerMemberType::VOTER);
248
249
408
    std::unique_ptr<ConsensusMetadata> cmeta;
250
408
    RETURN_NOT_OK_PREPEND(ConsensusMetadata::Create(meta->fs_manager(), meta->raft_group_id(),
251
408
                                                    meta->fs_manager()->uuid(),
252
408
                                                    config, kMinimumTerm, &cmeta),
253
408
                          "Unable to create consensus metadata");
254
255
408
    RETURN_NOT_OK_PREPEND(RunBootstrapOnTestTablet(meta, tablet, boot_info),
256
407
                          "Unable to bootstrap test tablet");
257
407
    return Status::OK();
258
408
  }
259
260
  void IterateTabletRows(const Tablet* tablet,
261
6
                         vector<string>* results) {
262
6
    auto iter = tablet->NewRowIterator(schema_);
263
6
    ASSERT_OK(iter);
264
6
    ASSERT_OK(IterateToStringList(iter->get(), results));
265
6
    for (const string& result : *results) {
266
0
      VLOG(1) << result;
267
6
    }
268
6
  }
269
270
  std::shared_ptr<BootstrapTestHooksImpl> test_hooks_;
271
};
272
273
// ===============================================================================================
274
// TESTS
275
// ===============================================================================================
276
277
// Tests a normal bootstrap scenario.
278
1
TEST_F(BootstrapTest, TestBootstrap) {
279
1
  BuildLog();
280
1
  const auto current_op_id = MakeOpId(1, current_index_);
281
1
  AppendReplicateBatch(current_op_id, current_op_id);
282
1
  TabletPtr tablet;
283
1
  ConsensusBootstrapInfo boot_info;
284
1
  ASSERT_OK(BootstrapTestTablet(&tablet, &boot_info));
285
286
1
  vector<string> results;
287
1
  IterateTabletRows(tablet.get(), &results);
288
1
}
289
290
// Tests attempting a local bootstrap of a tablet that was in the middle of a remote bootstrap
291
// before "crashing".
292
1
TEST_F(BootstrapTest, TestIncompleteRemoteBootstrap) {
293
1
  BuildLog();
294
295
1
  ASSERT_OK(PersistTestRaftGroupMetadataState(TABLET_DATA_COPYING));
296
1
  TabletPtr tablet;
297
1
  ConsensusBootstrapInfo boot_info;
298
1
  Status s = BootstrapTestTablet(&tablet, &boot_info);
299
2
  ASSERT_TRUE(s.IsCorruption()) << "Expected corruption: " << s.ToString();
300
1
  ASSERT_STR_CONTAINS(s.ToString(), "RaftGroupMetadata bootstrap state is TABLET_DATA_COPYING");
301
1
  LOG(INFO) << "State is still TABLET_DATA_COPYING, as expected: " << s.ToString();
302
1
}
303
304
// Test a crash before a REPLICATE message is marked as committed by a future REPLICATE message.
305
// Bootstrap should not replay the operation, but should return it in the ConsensusBootstrapInfo.
306
1
TEST_F(BootstrapTest, TestOrphanedReplicate) {
307
1
  BuildLog();
308
309
  // Append a REPLICATE with no commit
310
1
  auto replicate_index = current_index_++;
311
312
1
  OpIdPB opid = MakeOpId(1, replicate_index);
313
314
1
  AppendReplicateBatch(opid);
315
316
  // Bootstrap the tablet. It shouldn't replay anything.
317
1
  ConsensusBootstrapInfo boot_info;
318
1
  TabletPtr tablet;
319
1
  ASSERT_OK(BootstrapTestTablet(&tablet, &boot_info));
320
321
  // Table should be empty because we didn't replay the REPLICATE.
322
1
  vector<string> results;
323
1
  IterateTabletRows(tablet.get(), &results);
324
1
  ASSERT_EQ(0, results.size());
325
326
  // The consensus bootstrap info should include the orphaned REPLICATE.
327
2
  ASSERT_EQ(1, boot_info.orphaned_replicates.size())
328
2
      << yb::ToString(boot_info.orphaned_replicates);
329
1
  ASSERT_STR_CONTAINS(boot_info.orphaned_replicates[0]->ShortDebugString(),
330
1
                      "this is a test mutate");
331
332
  // And it should also include the latest opids.
333
1
  EXPECT_EQ("term: 1 index: 1", boot_info.last_id.ShortDebugString());
334
1
}
335
336
// Bootstrap should fail if no ConsensusMetadata file exists.
337
1
TEST_F(BootstrapTest, TestMissingConsensusMetadata) {
338
1
  BuildLog();
339
340
1
  RaftGroupMetadataPtr meta;
341
1
  ASSERT_OK(LoadTestRaftGroupMetadata(&meta));
342
343
1
  TabletPtr tablet;
344
1
  ConsensusBootstrapInfo boot_info;
345
1
  Status s = RunBootstrapOnTestTablet(meta, &tablet, &boot_info);
346
347
1
  ASSERT_TRUE(s.IsNotFound());
348
1
  ASSERT_STR_CONTAINS(s.ToString(), "Unable to load Consensus metadata");
349
1
}
350
351
// Tests that when we have two consecutive replicates and the commit index specified in the second
352
// is that of the first, only the first one is committed.
353
1
TEST_F(BootstrapTest, TestCommitFirstMessageBySpecifyingCommittedIndexInSecond) {
354
1
  BuildLog();
355
356
  // This appends a write with op 1.1
357
1
  const OpIdPB insert_opid = MakeOpId(1, 1);
358
1
  AppendReplicateBatch(insert_opid, MakeOpId(0, 0),
359
1
                       {TupleForAppend(10, 1, "this is a test insert")}, AppendSync::kTrue);
360
361
  // This appends a write with op 1.2 and commits the previous one.
362
1
  const OpIdPB mutate_opid = MakeOpId(1, 2);
363
1
  AppendReplicateBatch(mutate_opid, insert_opid,
364
1
                       {TupleForAppend(10, 2, "this is a test mutate")}, AppendSync::kTrue);
365
1
  ConsensusBootstrapInfo boot_info;
366
1
  TabletPtr tablet;
367
1
  ASSERT_OK(BootstrapTestTablet(&tablet, &boot_info));
368
1
  ASSERT_EQ(boot_info.orphaned_replicates.size(), 1);
369
1
  ASSERT_OPID_EQ(boot_info.last_committed_id, insert_opid);
370
371
  // Confirm that one operation was applied.
372
1
  vector<string> results;
373
1
  IterateTabletRows(tablet.get(), &results);
374
1
  ASSERT_EQ(1, results.size());
375
1
}
376
377
1
// 1.1 is followed by 4.2 and 4.3. After a log roll, 3.2 (committing 1.1) overwrites them.
// Bootstrap should apply 1.1 and report 3.2 as the only pending (orphaned) replicate.
TEST_F(BootstrapTest, TestOperationOverwriting) {
  BuildLog();

  const OpIdPB first_op_id = MakeOpId(1, 1);

  // A term-1 replicate carrying a single row.
  AppendReplicateBatch(first_op_id, MakeOpId(0, 0),
                       {TupleForAppend(1, 0, "this is a test insert")});

  // Then replicates 4.2 and 4.3.
  AppendReplicateBatch(MakeOpId(4, 2));
  AppendReplicateBatch(MakeOpId(4, 3));

  // Roll to a new segment and overwrite with 3.2.
  ASSERT_OK(RollLog());
  AppendReplicateBatch(MakeOpId(3, 2), MakeOpId(1, 1), {}, AppendSync::kTrue);

  ConsensusBootstrapInfo boot_info;
  TabletPtr tablet;
  ASSERT_OK(BootstrapTestTablet(&tablet, &boot_info));

  ASSERT_EQ(boot_info.orphaned_replicates.size(), 1);
  ASSERT_OPID_EQ(boot_info.orphaned_replicates[0]->id(), MakeOpId(3, 2));

  // Only the legitimate row from 1.1 is present.
  vector<string> rows;
  IterateTabletRows(tablet.get(), &rows);
  ASSERT_EQ(1, rows.size());

  ASSERT_EQ("{ int32_value: 1 int32_value: 0 string_value: \"this is a test insert\" }",
            rows[0]);
}
409
410
1
// A term-2 tail is overwritten by term 3 while the (overridden) flushed OpIds are 3.2. Only the
// entries after the flushed index that are committed should be replayed.
TEST_F(BootstrapTest, OverwriteTailWithFlushedIndex) {
  BuildLog();

  // Pretend both DocDB flushed OpIds (presumably regular and intents) are 3.2.
  test_hooks_->flushed_op_ids = DocDbOpIds{{3, 2}, {3, 2}};

  const std::string kTestStr("this is a test insert");
  const auto tuple_with_key = [&kTestStr](int key) {
    return TupleForAppend(key, 0, kTestStr);
  };

  // The term-1 and term-2 entries commit nothing.
  const auto no_commit = MakeOpId(0, 0);

  // 1.1 with a single row.
  AppendReplicateBatch(MakeOpId(1, 1), no_commit, {tuple_with_key(10)});

  // 2.2 and 2.3.
  AppendReplicateBatch(MakeOpId(2, 2), no_commit, {tuple_with_key(1020)});
  AppendReplicateBatch(MakeOpId(2, 3), no_commit, {tuple_with_key(1030)});

  // 3.2 overwrites the term-2 tail, which should abort 2.2 and 2.3; its committed OpId is 3.2.
  AppendReplicateBatch(MakeOpId(3, 2), MakeOpId(3, 2), {tuple_with_key(20)});

  AppendReplicateBatch(MakeOpId(3, 3), MakeOpId(3, 2), {tuple_with_key(30)});
  AppendReplicateBatch(MakeOpId(3, 4), MakeOpId(3, 3), {tuple_with_key(40)});

  ConsensusBootstrapInfo boot_info;
  TabletPtr tablet;
  ASSERT_OK(BootstrapTestTablet(&tablet, &boot_info));

  LOG(INFO) << "Replayed OpIds: " << ToString(test_hooks_->actual_report.replayed);

  // Nothing commits 3.4, so it is the single orphaned replicate.
  ASSERT_EQ(boot_info.orphaned_replicates.size(), 1);
  ASSERT_OPID_EQ(boot_info.orphaned_replicates[0]->id(), MakeOpId(3, 4));

  const std::vector<OpId> expected_replayed_op_ids{{3, 3}};
  ASSERT_EQ(expected_replayed_op_ids, test_hooks_->actual_report.replayed);

  // Only the row of 3.3 is visible: nothing for previously flushed OpIds (index 2 or below) was
  // replayed.
  vector<string> rows;
  IterateTabletRows(tablet.get(), &rows);
  ASSERT_EQ(1, rows.size());

  ASSERT_EQ(
      Format("{ int32_value: 30 int32_value: 0 string_value: \"$0\" }", kTestStr),
      rows[0]);
}
457
458
// Test that we do not crash when a consensus-only operation has a hybrid_time that is higher than a
459
// hybrid_time assigned to a write operation that follows it in the log.
460
// TODO: this must not happen in YB. Ensure this is not happening and update the test.
461
1
TEST_F(BootstrapTest, TestConsensusOnlyOperationOutOfOrderHybridTime) {
462
1
  BuildLog();
463
464
  // Append NO_OP.
465
1
  auto noop_replicate = std::make_shared<ReplicateMsg>();
466
1
  noop_replicate->set_op_type(consensus::NO_OP);
467
1
  *noop_replicate->mutable_id() = MakeOpId(1, 1);
468
1
  noop_replicate->set_hybrid_time(2);
469
470
  // All YB REPLICATEs require this:
471
1
  *noop_replicate->mutable_committed_op_id() = MakeOpId(0, 0);
472
473
1
  AppendReplicateBatch(noop_replicate, AppendSync::kTrue);
474
475
  // Append WRITE_OP with higher OpId and lower hybrid_time, and commit both messages.
476
1
  const auto second_opid = MakeOpId(1, 2);
477
1
  AppendReplicateBatch(second_opid, second_opid, {TupleForAppend(1, 1, "foo")});
478
479
1
  ConsensusBootstrapInfo boot_info;
480
1
  TabletPtr tablet;
481
1
  ASSERT_OK(BootstrapTestTablet(&tablet, &boot_info));
482
1
  ASSERT_EQ(boot_info.orphaned_replicates.size(), 0);
483
1
  ASSERT_OPID_EQ(boot_info.last_committed_id, second_opid);
484
485
  // Confirm that the insert op was applied.
486
1
  vector<string> results;
487
1
  IterateTabletRows(tablet.get(), &results);
488
1
  ASSERT_EQ(1, results.size());
489
1
}
490
491
// Test that we don't overflow opids. Regression test for KUDU-1933.
492
1
TEST_F(BootstrapTest, TestBootstrapHighOpIdIndex) {
493
  // Start appending with a log index 3 under the int32 max value.
494
  // Append 6 log entries, which will roll us right through the int32 max.
495
1
  const int64_t first_log_index = std::numeric_limits<int32_t>::max() - 3;
496
1
  const int kNumEntries = 6;
497
1
  BuildLog();
498
1
  current_index_ = first_log_index;
499
7
  for (int i = 0; i < kNumEntries; i++) {
500
6
    AppendReplicateBatchToLog(1);
501
6
  }
502
503
  // Kick off tablet bootstrap and ensure everything worked.
504
1
  TabletPtr tablet;
505
1
  ConsensusBootstrapInfo boot_info;
506
1
  ASSERT_OK(BootstrapTestTablet(&tablet, &boot_info));
507
1
  OpIdPB last_opid;
508
1
  last_opid.set_term(1);
509
1
  last_opid.set_index(current_index_ - 1);
510
1
  ASSERT_OPID_EQ(last_opid, boot_info.last_id);
511
1
  ASSERT_OPID_EQ(last_opid, boot_info.last_committed_id);
512
1
}
513
514
struct BootstrapInputEntry {
515
2.16M
  const OpId& op_id() const { return batch_data.op_id; }
516
517
600k
  bool IsTransactional() const { return !batch_data.txn_id.IsNil(); }
518
519
0
  const std::string ToString() const {
520
0
    std::ostringstream ss;
521
0
    ss << "{ ";
522
0
    ss << "op_id: " << op_id() << " ";
523
0
    ss << "committed_op_id: " << batch_data.committed_op_id << " ";
524
0
    ss << "op_type: " << consensus::OperationType_Name(batch_data.op_type) << " ";
525
0
    if (IsTransactional()) {
526
0
      ss << "txn: " << batch_data.txn_id << " ";
527
0
      ss << "txn_status: " << TransactionStatus_Name(batch_data.txn_status) << " ";
528
0
    }
529
0
    if (start_new_segment_with_this_entry) {
530
0
      ss << "start_new_segment_with_this_entry: true ";
531
0
    }
532
0
    ss << "}";
533
0
    return ss.str();
534
0
  }
535
536
  LogTestBase::AppendReplicateBatchData batch_data;
537
538
  bool start_new_segment_with_this_entry = false;
539
};
540
541
struct BootstrapInput {
542
  // All entries that are written to the log and then bootstrapped.
543
  std::vector<BootstrapInputEntry> entries;
544
545
  BootstrapReport expected_report;
546
547
  // This should match the OpIds of entries we call "orphaned replicates" at the end of bootstrap.
548
  std::vector<OpId> uncommitted_tail;
549
550
  // Entries that can be overwritten. None of these are comitted.
551
  std::set<OpId> overwritable;
552
553
  // Committed entries. None of these can be overwritten.
554
  std::set<OpId> committed;
555
556
  DocDbOpIds flushed_op_ids;
557
  OpId final_committed_op_id;
558
  bool transactional = false;
559
};
560
561
// ------------------------------------------------------------------------------------------------
562
// Randomized bootstrap test
563
// ------------------------------------------------------------------------------------------------
564
565
// An internal function for generating a randomized tablet bootstrap input. Populates the entries
566
// vector in the res_input struct. Also returns the final map of index to OpId, with all overwrites
567
// of entries by a new leader having already taken place.
568
std::map<int64_t, OpId> GenerateRawEntriesAndFinalOpByIndex(
569
    const size_t num_entries,
570
    std::mt19937_64* const rng,
571
400
    BootstrapInput* const res_input) {
572
400
  const bool transactional = res_input->transactional;
573
400
  auto& entries = res_input->entries;
574
575
  // This map holds the final OpId at any given index, provided that it has not been overwritten
576
  // by an entry at a later term and the same or earlier index.
577
400
  std::map<int64_t, OpId> final_op_id_by_index;
578
579
400
  int64_t index = 1;
580
400
  int64_t term = 1;
581
400
  int64_t max_index = 1;
582
400
  entries.resize(num_entries);
583
600k
  for (size_t i = 0; i < num_entries; ++i) {
584
600k
    auto& entry = entries[i];
585
600k
    auto& batch_data = entry.batch_data;
586
600k
    batch_data.op_id = {term, index};
587
600k
    if (transactional && RandomUniformInt(1, 4, rng) == 1) {
588
80.5k
      batch_data.op_type = consensus::OperationType::UPDATE_TRANSACTION_OP;
589
80.5k
      batch_data.txn_status = TransactionStatus::APPLYING;
590
80.5k
      batch_data.txn_id = TransactionId::GenerateRandom(rng);
591
519k
    } else {
592
519k
      batch_data.op_type = consensus::OperationType::WRITE_OP;
593
519k
      if (transactional && RandomUniformInt(1, 2, rng) == 1) {
594
120k
        batch_data.txn_id = TransactionId::GenerateRandom(rng);
595
120k
      }
596
519k
    }
597
598
600k
    final_op_id_by_index[index] = batch_data.op_id;
599
600k
    max_index = std::max(max_index, index);
600
601
    // The first entry always start a new segment. Otherwise, we start a new segment randomly.
602
600k
    entry.start_new_segment_with_this_entry = i == 0 || RandomUniformInt(1, 100, rng) == 1;
603
604
    // Advance to the next OpId.
605
600k
    if (i < num_entries - 1) {
606
599k
      if (RandomUniformInt(1, 30) == 1) {
607
        // We advance to the next term and the new leader overwrites a tail of the log.
608
20.1k
        term++;
609
610
        // Jump back in most cases, but index_delta of 0 (keeping the same index) or even
611
        // index_delta of 1 (increasing the index by 1) are also possible.
612
20.1k
        const auto index_delta = RandomUniformInt(
613
            // Jump back by some amount. Very rarely, we might jump back pretty far.
614
20.1k
            RandomUniformInt(1, 500) == 0 ? -200 : -10,
615
            // Upper bound is 1, meaning we increment both term and index.
616
20.1k
            1,
617
20.1k
            rng);
618
20.1k
        index = std::max<int64_t>(1, index + index_delta);
619
579k
      } else {
620
        // In the majority of cases we just advance to the next index.
621
579k
        index++;
622
579k
      }
623
624
599k
      const auto lower_bound_it = final_op_id_by_index.lower_bound(index);
625
599k
      final_op_id_by_index.erase(lower_bound_it, final_op_id_by_index.end());
626
599k
    }
627
600k
  }
628
629
400
  return final_op_id_by_index;
630
400
}
631
632
400
void GenerateRandomInput(size_t num_entries, std::mt19937_64* rng, BootstrapInput* res_input) {
633
400
  auto& entries = res_input->entries;
634
400
  const bool transactional = RandomUniformBool(rng);
635
400
  res_input->transactional = transactional;
636
637
400
  const auto final_op_id_by_index = GenerateRawEntriesAndFinalOpByIndex(
638
400
      num_entries, rng, res_input);
639
640
1.80M
  const auto committed_op_id_for_index = [&final_op_id_by_index](int64_t index) -> OpId {
641
1.80M
    auto it = final_op_id_by_index.find(index);
642
1.80M
    if (it == final_op_id_by_index.end()) {
643
1.56k
      return OpId();
644
1.56k
    }
645
1.79M
    return it->second;
646
1.79M
  };
647
648
1.20M
  const auto is_op_id_committable = [&committed_op_id_for_index](const OpId& op_id) {
649
1.20M
    return committed_op_id_for_index(op_id.index) == op_id;
650
1.20M
  };
651
652
  // ----------------------------------------------------------------------------------------------
653
  // Compute committed OpId for every entry, as well as the final committed OpId.
654
400
  {
655
400
    OpId committed_op_id;
656
657
    // Entries that have not been overwritten by future entries with the same index and a later
658
    // term.
659
400
    std::set<int64_t> finalized_indexes;
660
400
    int64_t committable_up_to_index = 0;
661
662
600k
    for (auto& entry : entries) {
663
600k
      const auto& op_id = entry.op_id();
664
665
600k
      if (is_op_id_committable(op_id)) {
666
489k
        finalized_indexes.insert(op_id.index);
667
489k
      }
668
669
1.08M
      while (finalized_indexes.count(committable_up_to_index + 1)) {
670
489k
        committable_up_to_index++;
671
489k
      }
672
673
600k
      if (committable_up_to_index >= 1) {
674
599k
        const int64_t new_committed_index =
675
300k
            RandomUniformBool(rng) ? committed_op_id.index
676
299k
                                   : RandomUniformInt(committed_op_id.index,
677
299k
                                                      committable_up_to_index,
678
299k
                                                      rng);
679
599k
        const auto new_committed_op_id = committed_op_id_for_index(new_committed_index);
680
599k
        ASSERT_GE(new_committed_op_id, committed_op_id);
681
599k
        committed_op_id = new_committed_op_id;
682
599k
      }
683
684
600k
      entry.batch_data.committed_op_id = committed_op_id;
685
600k
    }
686
687
400
    res_input->final_committed_op_id = committed_op_id;
688
400
  }
689
690
  // ----------------------------------------------------------------------------------------------
691
  // Choose flushed OpIds for regular and intents RocksDBs.
692
  // ----------------------------------------------------------------------------------------------
693
694
  // Test the important case of the flushed OpIds being exactly equal to the last entry in the
695
  // log. In this case we would previously fail to correctly overwrite the tail of the log
696
  // because we would not even look at these entries.
697
  //
698
  // More details https://github.com/yugabyte/yugabyte-db/issues/5003
699
400
  const bool all_entries_committed_and_flushed = RandomUniformInt(1, 20, rng) == 1;
700
400
  if (all_entries_committed_and_flushed) {
701
20
    res_input->final_committed_op_id = entries.back().op_id();
702
20
  }
703
400
  const auto final_committed_op_id = res_input->final_committed_op_id;
704
705
400
  const auto regular_flushed_op_id = all_entries_committed_and_flushed
706
20
      ? final_committed_op_id
707
380
      : committed_op_id_for_index(RandomUniformInt<int64_t>(0, final_committed_op_id.index, rng));
708
709
  // Intents RocksDB cannot be ahead of regular RocksDB in its flushed OpId.
710
214
  const auto intents_flushed_op_id = transactional ? (all_entries_committed_and_flushed
711
9
      ? final_committed_op_id
712
205
      : (RandomUniformBool(rng)
713
          // Make intents and regular DB's flushed OpId the same with a 50% probability.
714
114
          ? regular_flushed_op_id
715
          // Otherwise, the flushed index in the intents DB will be lagging that of the regular DB.
716
91
          : committed_op_id_for_index(RandomUniformInt<int64>(0, regular_flushed_op_id.index, rng))
717
205
      )
718
186
  ) : /* or, in the non-transactional case: */ OpId();
719
720
400
  res_input->flushed_op_ids = {
721
400
    .regular = regular_flushed_op_id,
722
400
    .intents = intents_flushed_op_id
723
400
  };
724
400
  const int64_t intents_flushed_index = intents_flushed_op_id.index;
725
400
  const int64_t regular_flushed_index = regular_flushed_op_id.index;
726
727
400
  const std::vector<OpId> first_opids_in_segments = [&entries]() {
728
400
    std::vector<OpId> first_op_ids;
729
600k
    for (const auto& entry : entries) {
730
600k
      if (entry.start_new_segment_with_this_entry) {
731
6.51k
        first_op_ids.push_back(entry.op_id());
732
6.51k
      }
733
600k
    }
734
400
    return first_op_ids;
735
400
  }();
736
737
  // ----------------------------------------------------------------------------------------------
738
  // Determine segments that the bootstrap procedure will replay.
739
  // ----------------------------------------------------------------------------------------------
740
741
  // Find the first OpId of the segment that we'll look at in the --skip_wal_rewrite mode.
742
400
  const OpId first_op_id_of_segment_to_replay = [&]() {
743
    // This is the cut-off OpId that we use in the "bootstrap optimizer" (--skip_wal_rewrite) logic
744
    // to find the first log segment to replay. The production code uses min of intents and regular
745
    // flushed OpId, but we know that intents_flushed_op_id <= regular_flushed_op_id.
746
400
    const auto flushed_op_id_for_first_segment_search = transactional ?
747
214
        intents_flushed_op_id : regular_flushed_op_id;
748
749
    // Find the first OpId in the array of first OpIds of segments such that it is greater than the
750
    // cut-off. Then, the segment before that will be the last segment with OpId <= cutoff, which is
751
    // what we need.
752
400
    const auto first_segment_to_replay_op_id_it = std::upper_bound(
753
400
        first_opids_in_segments.begin(), first_opids_in_segments.end(),
754
400
        flushed_op_id_for_first_segment_search);
755
400
    return first_segment_to_replay_op_id_it == first_opids_in_segments.begin()
756
0
        ? entries.front().op_id()
757
400
        : *(first_segment_to_replay_op_id_it - 1);
758
400
  }();
759
760
4.09k
  for (auto it = first_opids_in_segments.rbegin(); it != first_opids_in_segments.rend(); it++) {
761
4.05k
    if (*it < first_op_id_of_segment_to_replay)
762
361
      break;
763
3.69k
    res_input->expected_report.first_op_ids_of_segments_reversed.push_back(*it);
764
3.69k
  }
765
766
  // ----------------------------------------------------------------------------------------------
767
  // Compute expected overwritten OpIds.
768
  // ----------------------------------------------------------------------------------------------
769
770
  // Compute the set of OpIds to be overwritten by iterating starting with the first segment
771
  // that will be replayed.
772
400
  {
773
400
    auto& exact_overwrites = res_input->expected_report.overwritten;
774
400
    std::map<int64_t, OpId> pending_replicates;
775
600k
    for (const auto& entry : entries) {
776
600k
      const auto& op_id = entry.op_id();
777
600k
      if (op_id >= first_op_id_of_segment_to_replay) {
778
361k
        auto remove_from_it = pending_replicates.lower_bound(entry.op_id().index);
779
427k
        for (auto it = remove_from_it; it != pending_replicates.end(); ++it) {
780
66.1k
          exact_overwrites.push_back(it->second);
781
66.1k
        }
782
361k
        pending_replicates.erase(remove_from_it, pending_replicates.end());
783
361k
        ASSERT_TRUE(pending_replicates.emplace(op_id.index, op_id).second);
784
361k
      }
785
600k
    }
786
400
  }
787
788
  // ----------------------------------------------------------------------------------------------
789
  // Compute expected replayed OpIds, OpIds to be added to RetryableRequests, "overwritable" OpIds.
790
  // ----------------------------------------------------------------------------------------------
791
792
400
  {
793
400
    auto& replayed = res_input->expected_report.replayed;
794
400
    auto& replayed_to_intents_only = res_input->expected_report.replayed_to_intents_only;
795
600k
    for (const auto& entry : entries) {
796
600k
      const auto op_id = entry.op_id();
797
600k
      const auto& batch_data = entry.batch_data;
798
600k
      const auto op_type = batch_data.op_type;
799
600k
      const int64_t index = op_id.index;
800
600k
      const bool is_transactional = entry.IsTransactional();
801
600k
      if (is_op_id_committable(op_id) && op_id <= final_committed_op_id) {
802
        // This operation has been committed in Raft.
803
488k
        res_input->committed.insert(op_id);
804
488k
        if (op_id >= first_op_id_of_segment_to_replay) {
805
294k
          res_input->expected_report.retryable_requests.push_back(op_id);
806
294k
        }
807
488k
        if (index > intents_flushed_index) {
808
382k
          if (op_id >= first_op_id_of_segment_to_replay) {
809
278k
            bool replay = true;
810
278k
            if (index <= regular_flushed_index) {
811
              // We are in the (intents_flushed_index, regular_flushed_index] range. Special rules
812
              // are used to decide whether to replay these operations.
813
40.2k
              if (op_type == consensus::OperationType::WRITE_OP) {
814
                // Only need to replay intent writes in this index range.
815
33.5k
                replay = is_transactional;
816
6.69k
              } else if (op_type == consensus::OperationType::UPDATE_TRANSACTION_OP) {
817
6.69k
                replay = batch_data.txn_status == TransactionStatus::APPLYING;
818
6.69k
                if (replay) {
819
6.69k
                  replayed_to_intents_only.push_back(op_id);
820
6.69k
                }
821
0
              } else {
822
0
                FAIL() << "Unknown operation type: " << consensus::OperationType_Name(op_type);
823
0
              }
824
278k
            }
825
278k
            if (replay) {
826
255k
              replayed.push_back(op_id);
827
255k
            }
828
278k
          }
829
382k
        }
830
111k
      } else {
831
        // This operation was never committed. Mark it as "overwritable", meaning it _could_ be
832
        // overwritten as part of tablet bootstrap, but is not guaranteed to be.
833
111k
        res_input->overwritable.insert(op_id);
834
111k
      }
835
600k
    }
836
400
  }
837
838
  // ----------------------------------------------------------------------------------------------
839
  // Uncommitted tail / orphaned replicates
840
  // ----------------------------------------------------------------------------------------------
841
842
  // Compute the expected "uncommitted tail" of operations, i.e. those operations that will be left
843
  // in "orphaned replicates" at the end of tablet bootstrap because we don't know if they are
844
  // Raft-committed yet.
845
400
  res_input->uncommitted_tail.clear();
846
489k
  for (const auto& index_and_final_op_id : final_op_id_by_index) {
847
489k
    const auto& op_id = index_and_final_op_id.second;
848
489k
    if (op_id > final_committed_op_id) {
849
1.11k
      res_input->uncommitted_tail.push_back(index_and_final_op_id.second);
850
1.11k
    }
851
489k
  }
852
400
}
853
854
1
// Randomized end-to-end test of tablet bootstrap. Each iteration generates a random log with
// overwrites, segment boundaries, and committed and flushed OpIds. It writes the log out,
// bootstraps a tablet from it, and compares what bootstrap reported through test_hooks_ against
// the expectations computed by GenerateRandomInput().
TEST_F(BootstrapTest, RandomizedInput) {
  std::mt19937_64 rng;

  // Do not change this random seed so we can keep the tests repeatable.
  rng.seed(3141592653);

  constexpr bool kVerboseOutput = false;  // Turn this on when debugging the test.

  // This is to avoid non-deterministic time-based behavior in "bootstrap optimizer"
  // (skip_wal_rewrite mode).
  FLAGS_retryable_request_timeout_secs = 0;

  const auto kNumIter = NonTsanVsTsan(400, 150);
  const auto kNumEntries = NonTsanVsTsan(1500, 500);
  for (int iteration = 1; iteration <= kNumIter; ++iteration) {
    LOG(INFO) << "Starting test iteration " << iteration;
    SCOPED_TRACE(Format("Test iteration $0", iteration));
    BootstrapInput input;
    ASSERT_NO_FATALS(GenerateRandomInput(kNumEntries, &rng, &input));
    if (kVerboseOutput) {
      for (const auto& entry : input.entries) {
        LOG(INFO) << "Entry: " << entry.ToString();
      }
    }
    LOG(INFO) << "Flushed OpIds in the test case: " << input.flushed_op_ids.ToString();

    // Start from a clean tablet and hand this iteration's flushed OpIds and table type to the
    // test hooks.
    CleanTablet();
    test_hooks_->Clear();
    test_hooks_->flushed_op_ids = input.flushed_op_ids;
    test_hooks_->transactional = input.transactional;
    LOG(INFO) << "Test iteration " << iteration << " is "
              << (input.transactional ? "TRANSACTIONAL" : "NON-TRANSACTIONAL");
    SCOPED_TRACE(Format("Test iteration $0 is transactional: $1", iteration, input.transactional));

    BuildLog();

    // Write the generated entries, rolling to a new segment wherever the generator requested one.
    // The first entry is never preceded by a roll.
    for (size_t i = 0; i < input.entries.size(); ++i) {
      const auto& entry = input.entries[i];
      if (entry.start_new_segment_with_this_entry && i != 0) {
        ASSERT_OK(RollLog());
      }
      AppendReplicateBatch(entry.batch_data);
    }

    TabletPtr tablet;
    ConsensusBootstrapInfo boot_info;
    ASSERT_OK(BootstrapTestTablet(&tablet, &boot_info));

    const auto& expected_report = input.expected_report;
    const auto& actual_report = test_hooks_->actual_report;
    const auto& actual_replayed_op_ids = actual_report.replayed;

    // Check first that no uncommitted entry was replayed. Such an entry would also make the
    // replayed-vector comparison below fail. Reporting this first keeps the more specific
    // diagnostic from being masked.
    std::ostringstream error_details;
    bool test_failed = false;
    for (const auto& op_id : actual_replayed_op_ids) {
      if (input.committed.count(op_id) == 0) {
        const auto msg = Format("An uncommitted entry was replayed: $0", op_id);
        LOG(ERROR) << "Failure: " << msg;
        error_details << msg << '\n';
        test_failed = true;
      }
    }
    if (test_failed) {
      FAIL() << error_details.str();
    }

    ASSERT_VECTORS_EQ(expected_report.replayed, actual_replayed_op_ids);
    ASSERT_VECTORS_EQ(
        expected_report.replayed_to_intents_only,
        actual_report.replayed_to_intents_only);

    // Only entries that were never committed may be overwritten.
    for (const auto& op_id : actual_report.overwritten) {
      if (input.overwritable.count(op_id) == 0) {
        FAIL() << "Entry " << op_id << " was overwritten but was not suppossed to be.";
      }
    }

    std::vector<OpId> actual_uncommitted_tail;
    actual_uncommitted_tail.reserve(boot_info.orphaned_replicates.size());
    for (const auto& orphaned_replicate : boot_info.orphaned_replicates) {
      actual_uncommitted_tail.push_back(OpId::FromPB(orphaned_replicate->id()));
    }

    ASSERT_VECTORS_EQ(input.uncommitted_tail, actual_uncommitted_tail);
    ASSERT_VECTORS_EQ(
        expected_report.overwritten,
        actual_report.overwritten);
    ASSERT_VECTORS_EQ(
        expected_report.retryable_requests,
        actual_report.retryable_requests);
    ASSERT_VECTORS_EQ(
        expected_report.first_op_ids_of_segments_reversed,
        actual_report.first_op_ids_of_segments_reversed);

    LOG(INFO) << "Test iteration " << iteration << " has succeeded";
  }
}
951
952
} // namespace tablet
953
} // namespace yb