YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/log-test.cc
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include <fcntl.h>
#include <unistd.h>

#include <algorithm>
#include <vector>

#include <boost/function.hpp>

#include <glog/stl_logging.h>

#include "yb/common/wire_protocol.h"

#include "yb/consensus/log-test-base.h"
#include "yb/consensus/log_index.h"
#include "yb/consensus/opid_util.h"

#include "yb/gutil/stl_util.h"
#include "yb/gutil/strings/substitute.h"

#include "yb/util/random.h"
#include "yb/util/size_literals.h"
#include "yb/util/stopwatch.h"

DEFINE_int32(num_batches, 10000,
             "Number of batches to write to/read from the Log in TestWriteManyBatches");

DECLARE_int32(log_min_segments_to_retain);
DECLARE_bool(never_fsync);
DECLARE_bool(writable_file_use_fsync);
DECLARE_int32(o_direct_block_alignment_bytes);
DECLARE_int32(o_direct_block_size_bytes);

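// A note on the flag declarations above (an illustration, not original code):
// DEFINE_int32 creates FLAGS_num_batches in this test binary, while the
// DECLARE_* lines only reference flags defined in the log implementation.
// Tests below override such flags temporarily; the assumed pattern, using
// gflags' FlagSaver, restores the previous value on scope exit:
//
//   {
//     google::FlagSaver saver;
//     FLAGS_log_min_segments_to_retain = 10;  // effective until end of scope
//     // ... exercise code that reads the flag ...
//   }  // saver restores the old value here
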
namespace yb {
namespace log {

using std::shared_ptr;
using consensus::MakeOpId;
using strings::Substitute;

extern const char* kTestTable;
extern const char* kTestTablet;

struct TestLogSequenceElem {
  enum ElemType {
    REPLICATE,
    ROLL
  };
  ElemType type;
  OpIdPB id;
};

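// A minimal sketch (not part of the original file) of building a deterministic
// sequence by hand from the struct above; GenerateTestSequence further below
// produces randomized sequences of the same shape. MakeReplicateElem is a
// hypothetical helper introduced only for illustration.
inline TestLogSequenceElem MakeReplicateElem(int64_t term, int64_t index) {
  TestLogSequenceElem elem;
  elem.type = TestLogSequenceElem::REPLICATE;
  elem.id = MakeOpId(term, index);  // uses the MakeOpId imported above
  return elem;
}
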
class LogTest : public LogTestBase {
 public:
  static constexpr TableType kTableType = TableType::YQL_TABLE_TYPE;

  void CreateAndRegisterNewAnchor(int64_t log_index, vector<LogAnchor*>* anchors) {
    anchors->push_back(new LogAnchor());
    log_anchor_registry_->Register(log_index, CURRENT_TEST_NAME(), anchors->back());
  }

  // Create a series of NO_OP entries in the log.
  // Anchor each segment on the first OpId of each log segment,
  // and update op_id to point to the next valid OpId.
  Status AppendMultiSegmentSequence(
      int num_total_segments, int num_ops_per_segment, OpIdPB* op_id, vector<LogAnchor*>* anchors) {
    CHECK(op_id->IsInitialized());
    for (int i = 0; i < num_total_segments - 1; i++) {
      if (anchors) {
        CreateAndRegisterNewAnchor(op_id->index(), anchors);
      }
      RETURN_NOT_OK(AppendNoOps(op_id, num_ops_per_segment));
      RETURN_NOT_OK(RollLog());
    }

    if (anchors) {
      CreateAndRegisterNewAnchor(op_id->index(), anchors);
    }
    RETURN_NOT_OK(AppendNoOps(op_id, num_ops_per_segment));
    return Status::OK();
  }

  Status AppendNewEmptySegmentToReader(int sequence_number,
                                       int first_repl_index,
                                       LogReader* reader) {
    string fqp = GetTestPath(strings::Substitute("wal-00000000$0", sequence_number));
    std::unique_ptr<WritableFile> w_log_seg;
    RETURN_NOT_OK(fs_manager_->env()->NewWritableFile(fqp, &w_log_seg));
    std::unique_ptr<RandomAccessFile> r_log_seg;
    RETURN_NOT_OK(fs_manager_->env()->NewRandomAccessFile(fqp, &r_log_seg));

    scoped_refptr<ReadableLogSegment> readable_segment(
        new ReadableLogSegment(fqp, shared_ptr<RandomAccessFile>(r_log_seg.release())));

    LogSegmentHeaderPB header;
    header.set_sequence_number(sequence_number);
    header.set_major_version(0);
    header.set_minor_version(0);
    header.set_unused_tablet_id(kTestTablet);
    SchemaToPB(GetSimpleTestSchema(), header.mutable_unused_schema());

    LogSegmentFooterPB footer;
    footer.set_num_entries(10);
    footer.set_min_replicate_index(first_repl_index);
    footer.set_max_replicate_index(first_repl_index + 9);

    RETURN_NOT_OK(readable_segment->Init(header, footer, 0));
    RETURN_NOT_OK(reader->AppendSegment(readable_segment));
    return Status::OK();
  }

  void GenerateTestSequence(size_t seq_len,
                            vector<TestLogSequenceElem>* ops,
                            vector<int64_t>* terms_by_index);
  void AppendTestSequence(const vector<TestLogSequenceElem>& seq);

  // Where to corrupt the log entry.
  enum CorruptionPosition {
    // Corrupt/truncate within the header.
    IN_HEADER,
    // Corrupt/truncate within the entry data itself.
    IN_ENTRY
  };

  void DoCorruptionTest(CorruptionType type, CorruptionPosition place,
                        Status expected_status, int expected_entries);

  Result<std::vector<OpId>> AppendAndCopy(size_t num_batches, size_t num_entries_per_batch);

  std::string GetLogCopyPath(size_t copy_idx) {
    return Format("$0.copy-$1", tablet_wal_path_, copy_idx);
  }

  Result<SegmentSequence> GetSegmentsFromLogCopyAndCheckLastOpIndex(
      size_t copy_idx, int64_t last_op_min_idx);
};

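// Ownership note for the helpers above (an illustration, not original code):
// CreateAndRegisterNewAnchor heap-allocates each LogAnchor and appends the raw
// pointer to the caller's vector, so tests pair that vector with an
// ElementDeleter that frees the anchors on scope exit, e.g.:
//
//   vector<LogAnchor*> anchors;
//   ElementDeleter deleter(&anchors);  // deletes every anchor at scope exit
//   CreateAndRegisterNewAnchor(/* log_index = */ 1, &anchors);
//   ASSERT_OK(log_anchor_registry_->Unregister(anchors[0]));  // before deletion
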
// If we write more than one entry in a batch, we should be able to
// read all of those entries back.
TEST_F(LogTest, TestMultipleEntriesInABatch) {
  BuildLog();

  OpIdPB opid;
  opid.set_term(1);
  opid.set_index(1);

  ASSERT_OK(AppendNoOpsToLogSync(clock_, log_.get(), &opid, 2));

  // RollOver() the batch so that we have a properly formed footer.
  ASSERT_OK(log_->AllocateSegmentAndRollOver());

  SegmentSequence segments;
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));

  auto read_entries = segments[0]->ReadEntries();
  ASSERT_OK(read_entries.status);

  ASSERT_EQ(2, read_entries.entries.size());

  // Verify the index.
  {
    LogIndexEntry entry;
    ASSERT_OK(log_->log_index_->GetEntry(1, &entry));
    ASSERT_EQ(1, entry.op_id.term);
    ASSERT_EQ(1, entry.segment_sequence_number);
    int64_t offset = entry.offset_in_segment;

    ASSERT_OK(log_->log_index_->GetEntry(2, &entry));
    ASSERT_EQ(1, entry.op_id.term);
    ASSERT_EQ(1, entry.segment_sequence_number);
    int64_t second_offset = entry.offset_in_segment;

    // The second entry should be at the same offset as the first entry
    // since they were written in the same batch.
    ASSERT_EQ(second_offset, offset);
  }

  // Test LookupOpId.
  {
    auto loaded_op = ASSERT_RESULT(log_->GetLogReader()->LookupOpId(1));
    ASSERT_EQ(yb::OpId(1, 1), loaded_op);
    loaded_op = ASSERT_RESULT(log_->GetLogReader()->LookupOpId(2));
    ASSERT_EQ(yb::OpId(1, 2), loaded_op);
    auto result = log_->GetLogReader()->LookupOpId(3);
    ASSERT_TRUE(!result.ok() && result.status().IsNotFound())
        << "unexpected status: " << result.status();
  }

  ASSERT_OK(log_->Close());
}

// Tests that everything works properly with fsync enabled.
// This also tests SyncDir() (see KUDU-261), which is called whenever
// a new log segment is initialized.
TEST_F(LogTest, TestFsync) {
  options_.durable_wal_write = true;
  BuildLog();

  OpIdPB opid;
  opid.set_term(0);
  opid.set_index(1);

  ASSERT_OK(AppendNoOp(&opid));
  ASSERT_OK(log_->Close());
}

// Tests the interval-based durable WAL write: an fsync is issued once
// interval_durable_wal_write has elapsed since the last sync.
TEST_F(LogTest, TestFsyncInterval) {
  options_.interval_durable_wal_write = MonoDelta::FromMilliseconds(1);
  BuildLog();

  OpIdPB opid;
  opid.set_term(0);
  opid.set_index(1);

  ASSERT_OK(AppendNoOp(&opid));
  SleepFor(MonoDelta::FromMilliseconds(2));
  ASSERT_OK(AppendNoOp(&opid));
  ASSERT_OK(log_->Close());
}

// Tests the durable WAL write interval at the physical level, verifying via a
// direct (uncached) read that the data actually reached disk.
TEST_F(LogTest, TestFsyncIntervalPhysical) {
  int interval = 1;
  options_.interval_durable_wal_write = MonoDelta::FromMilliseconds(interval);
  FLAGS_never_fsync = false;
  FLAGS_durable_wal_write = false;
  options_.preallocate_segments = false;
  BuildLog();

  OpIdPB opid;
  opid.set_term(0);
  opid.set_index(1);

  SegmentSequence segments;
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(segments.size(), 1);
  int64_t orig_size = segments[0]->file_size();
  string fileName = segments.back()->readable_file()->filename();

  ASSERT_OK(AppendNoOp(&opid));
  SleepFor(MonoDelta::FromMilliseconds(interval + 1));
  ASSERT_OK(AppendNoOp(&opid));

  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(segments.size(), 1);
  int64_t new_size = segments[0]->file_size();
  ASSERT_GT(new_size, orig_size);

#if defined(__linux__)
  int fd = open(fileName.c_str(), O_RDONLY | O_DIRECT);
  ASSERT_GE(fd, 0);
#elif defined(__APPLE__)
  int fd = open(fileName.c_str(), O_RDONLY);
  ASSERT_GE(fd, 0);
  ASSERT_NE(fcntl(fd, F_NOCACHE, 1), -1);
#endif
  void* temp_buf = nullptr;
  ASSERT_EQ(posix_memalign(&temp_buf, FLAGS_o_direct_block_alignment_bytes,
                           FLAGS_o_direct_block_size_bytes), 0);
  ASSERT_GT(pread(fd, temp_buf, FLAGS_o_direct_block_size_bytes, 0), orig_size);
  ASSERT_OK(log_->Close());
  free(temp_buf);
}

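// The direct-read pattern used above, summarized (not original code): the test
// bypasses the page cache (O_DIRECT on Linux, fcntl F_NOCACHE on macOS) so
// pread() can only observe bytes that physically reached disk, which is what
// demonstrates the interval-based fsync happened. O_DIRECT requires an aligned
// buffer, hence posix_memalign with FLAGS_o_direct_block_alignment_bytes.
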
// Tests the size-based durable WAL write: an fsync is issued once
// bytes_durable_wal_write_mb of data has been appended since the last sync.
TEST_F(LogTest, TestFsyncDataSize) {
  options_.bytes_durable_wal_write_mb = 1;
  options_.interval_durable_wal_write = MonoDelta::FromMilliseconds(10000);
  BuildLog();

  OpIdPB opid;
  opid.set_term(0);
  opid.set_index(1);

  ssize_t size = 0;
  ASSERT_OK(AppendNoOps(&opid, 100 * 1024, &size));
  SleepFor(MonoDelta::FromMilliseconds(1));
  ASSERT_OK(AppendNoOp(&opid));
  ASSERT_OK(log_->Close());
  LOG(INFO) << "Wrote " << size << " bytes to the log";
}

// Regression test for part of KUDU-735:
// if a log is not preallocated, we should properly track its on-disk size as we append to it.
TEST_F(LogTest, TestSizeIsMaintained) {
  options_.preallocate_segments = false;
  BuildLog();

  OpIdPB opid = MakeOpId(0, 1);
  ASSERT_OK(AppendNoOp(&opid));

  SegmentSequence segments;
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  int64_t orig_size = segments[0]->file_size();
  ASSERT_GT(orig_size, 0);

  ASSERT_OK(AppendNoOp(&opid));

  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  int64_t new_size = segments[0]->file_size();
  ASSERT_GT(new_size, orig_size);

  ASSERT_OK(log_->Close());
}

// Test that the reader can read from the log even if it hasn't been
// properly closed.
TEST_F(LogTest, TestLogNotTrimmed) {
  BuildLog();

  OpIdPB opid;
  opid.set_term(0);
  opid.set_index(1);

  ASSERT_OK(AppendNoOp(&opid));

  LogEntries entries;
  SegmentSequence segments;
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));

  ASSERT_OK(segments[0]->ReadEntries().status);
  // Close after testing to ensure correct shutdown.
  // TODO: put this in TearDown() with a test on log state?
  ASSERT_OK(log_->Close());
}

// Test that the reader will not fail if a log file is completely blank.
// This happens when it's opened but nothing has been written.
// The reader should gracefully handle this situation, but somehow expose that
// the segment is uninitialized. See KUDU-140.
TEST_F(LogTest, TestBlankLogFile) {
  BuildLog();

  // The log's reader will have a segment...
  ASSERT_EQ(log_->GetLogReader()->num_segments(), 1);

  // ...and we're able to read from it.
  SegmentSequence segments;
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));

  auto read_entries = segments[0]->ReadEntries();
  ASSERT_OK(read_entries.status);

  // ...it's just that it's empty.
  ASSERT_EQ(read_entries.entries.size(), 0);
}

void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place,
                               Status expected_status, int expected_entries) {
  const int kNumEntries = 4;
  BuildLog();
  OpIdPB op_id = MakeOpId(1, 1);
  ASSERT_OK(AppendNoOps(&op_id, kNumEntries));

  // Find the entry that we want to corrupt before closing the log.
  LogIndexEntry entry;
  ASSERT_OK(log_->log_index_->GetEntry(4, &entry));

  ASSERT_OK(log_->Close());

  // Corrupt the log as specified.
  ssize_t offset = 0;
  switch (place) {
    case IN_HEADER:
      offset = entry.offset_in_segment + 1;
      break;
    case IN_ENTRY:
      offset = entry.offset_in_segment + kEntryHeaderSize + 1;
      break;
  }
  ASSERT_OK(CorruptLogFile(env_.get(), log_->ActiveSegmentForTests()->path(), type, offset));

  // Open a new reader -- we don't reuse the existing LogReader from log_
  // because it has a cached header.
  std::unique_ptr<LogReader> reader;
  ASSERT_OK(LogReader::Open(fs_manager_->env(),
                            make_scoped_refptr(new LogIndex(log_->wal_dir_)), "Log reader: ",
                            tablet_wal_path_, nullptr, nullptr, &reader));
  ASSERT_EQ(1, reader->num_segments());

  SegmentSequence segments;
  ASSERT_OK(reader->GetSegmentsSnapshot(&segments));
  auto read_entries = segments[0]->ReadEntries();
  ASSERT_EQ(read_entries.status.CodeAsString(), expected_status.CodeAsString())
      << "Got unexpected status: " << read_entries.status;

  // The last entry is ignored, but we should still see the previous ones.
  ASSERT_EQ(expected_entries, read_entries.entries.size());
}

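// On-disk framing assumed by the offset arithmetic above (a sketch, not
// original code): each appended batch is a fixed-size entry header followed by
// the serialized batch, so offset_in_segment + 1 falls inside the header while
// offset_in_segment + kEntryHeaderSize + 1 falls inside the entry data:
//
//   |<--- kEntryHeaderSize --->|<--- serialized LogEntryBatchPB --->|
//   ^
//   entry.offset_in_segment
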
// Tests that the log reader reads up until some truncated entry is found.
// It should still return OK, since on a crash, it's acceptable to have
// a partial entry at EOF.
TEST_F(LogTest, TestTruncateLogInEntry) {
  DoCorruptionTest(TRUNCATE_FILE, IN_ENTRY, Status::OK(), 3);
}

// Same, but truncate in the middle of the header of that entry.
TEST_F(LogTest, TestTruncateLogInHeader) {
  DoCorruptionTest(TRUNCATE_FILE, IN_HEADER, Status::OK(), 3);
}

// Similar to the above, except flips a byte. In this case, it should return
// a Corruption instead of an OK, because we still have a valid footer in
// the file (indicating that all of the entries should be valid as well).
TEST_F(LogTest, TestCorruptLogInEntry) {
  DoCorruptionTest(FLIP_BYTE, IN_ENTRY, STATUS(Corruption, ""), 3);
}

// Same, but corrupt in the middle of the header of that entry.
TEST_F(LogTest, TestCorruptLogInHeader) {
  DoCorruptionTest(FLIP_BYTE, IN_HEADER, STATUS(Corruption, ""), 3);
}

// Tests log metrics for WAL file size.
TEST_F(LogTest, TestLogMetrics) {
  BuildLog();
  // Set a small segment size so that we have rollovers.
  log_->SetMaxSegmentSizeForTests(990);
  const int kNumEntriesPerBatch = 100;

  OpIdPB op_id = MakeOpId(1, 1);

  SegmentSequence segments;
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(segments.size(), 1);

  while (segments.size() < 3) {
    ASSERT_OK(AppendNoOps(&op_id, kNumEntriesPerBatch));
    // Update the segments.
    ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  }

  ASSERT_OK(log_->Close());

  int64_t wal_size_old = log_->metrics_->wal_size->value();
  BuildLog();
  int64_t wal_size_new = log_->metrics_->wal_size->value();

  ASSERT_EQ(wal_size_old, wal_size_new);
}

// Tests that segments roll over when max segment size is reached
// and that the player plays all entries in the correct order.
TEST_F(LogTest, TestSegmentRollover) {
  BuildLog();
  // Set a small segment size so that we have rollovers.
  log_->SetMaxSegmentSizeForTests(990);
  const int kNumEntriesPerBatch = 100;

  OpIdPB op_id = MakeOpId(1, 1);
  int num_entries = 0;

  SegmentSequence segments;
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));

  while (segments.size() < 3) {
    ASSERT_OK(AppendNoOps(&op_id, kNumEntriesPerBatch));
    num_entries += kNumEntriesPerBatch;
    // Update the segments.
    ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  }

  ASSERT_FALSE(segments.back()->HasFooter());
  ASSERT_OK(log_->Close());

  std::unique_ptr<LogReader> reader;
  ASSERT_OK(LogReader::Open(
      fs_manager_->env(), nullptr, "Log reader: ", tablet_wal_path_, nullptr, nullptr, &reader));
  ASSERT_OK(reader->GetSegmentsSnapshot(&segments));

  ASSERT_TRUE(segments.back()->HasFooter());

  size_t total_read = 0;
  for (const scoped_refptr<ReadableLogSegment>& entry : segments) {
    auto read_entries = entry->ReadEntries();
    if (!read_entries.status.ok()) {
      FAIL() << "Failed to read entries in segment: " << entry->path()
             << ". Status: " << read_entries.status
             << ".\nSegments: " << DumpSegmentsToString(segments);
    }
    total_read += read_entries.entries.size();
  }

  ASSERT_EQ(num_entries, total_read);
}

TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) {
  const int kNumEntries = 4;
  BuildLog();

  SegmentSequence segments;
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(segments.size(), 1);
  scoped_refptr<ReadableLogSegment> readable_segment = segments[0];

  auto header_size = log_->active_segment_->written_offset();
  ASSERT_GT(header_size, 0);
  readable_segment->UpdateReadableToOffset(header_size);

  // Reading the readable segment now should return OK but yield no
  // entries.
  auto read_entries = readable_segment->ReadEntries();
  ASSERT_OK(read_entries.status);
  ASSERT_EQ(read_entries.entries.size(), 0);

  // Dummy add_entry to help us estimate the size of what
  // gets written to disk.
  LogEntryBatchPB batch;
  OpIdPB op_id = MakeOpId(1, 1);
  batch.set_mono_time(1);
  LogEntryPB* log_entry = batch.add_entry();
  log_entry->set_type(REPLICATE);
  ReplicateMsg* repl = log_entry->mutable_replicate();
  repl->mutable_id()->CopyFrom(op_id);
  repl->set_op_type(NO_OP);
  repl->set_hybrid_time(0L);

  // Entries are prefixed with a header.
  auto single_entry_size = batch.ByteSize() + kEntryHeaderSize;

  ssize_t written_entries_size = header_size;
  ASSERT_OK(AppendNoOps(&op_id, kNumEntries, &written_entries_size));
  ASSERT_EQ(written_entries_size, log_->active_segment_->written_offset());
  ASSERT_EQ(single_entry_size * kNumEntries, written_entries_size - header_size);

  // Updating the readable segment with the offset of the first entry should
  // make it read a single entry even though there are several in the log.
  readable_segment->UpdateReadableToOffset(header_size + single_entry_size);
  read_entries = readable_segment->ReadEntries();
  ASSERT_OK(read_entries.status);
  ASSERT_EQ(read_entries.entries.size(), 1);

  // Now append another entry so that the Log sets the correct readable offset
  // on the reader.
  ASSERT_OK(AppendNoOps(&op_id, 1, &written_entries_size));

  // Now the reader should be able to read all 5 entries.
  read_entries = readable_segment->ReadEntries();
  ASSERT_OK(read_entries.status);
  ASSERT_EQ(read_entries.entries.size(), 5);

  // Offset should get updated for an additional entry.
  ASSERT_EQ(single_entry_size * (kNumEntries + 1) + header_size,
            written_entries_size);
  ASSERT_EQ(written_entries_size, log_->active_segment_->written_offset());

  // When we roll it should go back to the header size.
  ASSERT_OK(log_->AllocateSegmentAndRollOver());
  ASSERT_EQ(header_size, log_->active_segment_->written_offset());
  written_entries_size = header_size;

  // Now that we closed the original segment, if we get a segment from the reader
  // again, we should get one with a footer and we should be able to read all entries.
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(segments.size(), 2);
  readable_segment = segments[0];
  read_entries = readable_segment->ReadEntries();
  ASSERT_OK(read_entries.status);
  ASSERT_EQ(read_entries.entries.size(), 5);

  // Offset should get updated for an additional entry, again.
  ASSERT_OK(AppendNoOp(&op_id, &written_entries_size));
  ASSERT_EQ(single_entry_size + header_size, written_entries_size);
  ASSERT_EQ(written_entries_size, log_->active_segment_->written_offset());
}

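// A hypothetical helper (illustration only, not part of the original file)
// capturing the size arithmetic the test above relies on: starting from the
// segment header, every appended no-op adds the same number of bytes, namely
// its serialized size plus the fixed per-entry header (single_entry_size).
inline int64_t ExpectedWrittenOffset(
    int64_t header_size, int64_t single_entry_size, int num_entries) {
  return header_size + single_entry_size * num_entries;
}
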
// Tests that segments can be GC'd while the log is running.
TEST_F(LogTest, TestGCWithLogRunning) {
  BuildLog();

  vector<LogAnchor*> anchors;
  ElementDeleter deleter(&anchors);

  SegmentSequence segments;

  const int kNumTotalSegments = 4;
  const int kNumOpsPerSegment = 5;
  int num_gced_segments;
  OpIdPB op_id = MakeOpId(1, 1);
  int64_t anchored_index = -1;

  ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment,
                                       &op_id, &anchors));

  // We should get 4 anchors, each pointing at the beginning of a new segment.
  ASSERT_EQ(anchors.size(), 4);

  // Anchors should prevent GC.
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(4, segments.size()) << DumpSegmentsToString(segments);
  ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&anchored_index));
  ASSERT_OK(log_->GC(anchored_index, &num_gced_segments));
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(4, segments.size()) << DumpSegmentsToString(segments);

  // Freeing the first 2 anchors should allow GC of them.
  ASSERT_OK(log_anchor_registry_->Unregister(anchors[0]));
  ASSERT_OK(log_anchor_registry_->Unregister(anchors[1]));
  ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&anchored_index));
  // We should now be anchored on op 1.11, i.e. on the 3rd segment.
  ASSERT_EQ(anchors[2]->log_index, anchored_index);

  // However, first, we'll try bumping the min retention threshold and
  // verify that we don't GC any.
  {
    google::FlagSaver saver;
    FLAGS_log_min_segments_to_retain = 10;
    ASSERT_OK(log_->GC(anchored_index, &num_gced_segments));
    ASSERT_EQ(0, num_gced_segments);
  }

  // Try again without the modified flag.
  ASSERT_OK(log_->GC(anchored_index, &num_gced_segments));
  ASSERT_EQ(2, num_gced_segments) << DumpSegmentsToString(segments);
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(2, segments.size()) << DumpSegmentsToString(segments);

  // Release the remaining "rolled segment" anchor. GC will not delete the
  // last rolled segment.
  ASSERT_OK(log_anchor_registry_->Unregister(anchors[2]));
  ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&anchored_index));
  ASSERT_OK(log_->GC(anchored_index, &num_gced_segments));
  ASSERT_EQ(0, num_gced_segments) << DumpSegmentsToString(segments);
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(2, segments.size()) << DumpSegmentsToString(segments);

  // Check that we get a NotFound if we try to read before the GCed point.
  {
    ReplicateMsgs repls;
    int64_t starting_op_segment_seq_num;
    yb::SchemaPB schema;
    uint32_t schema_version;
    Status s = log_->GetLogReader()->ReadReplicatesInRange(
        1, 2, LogReader::kNoSizeLimit, &repls, &starting_op_segment_seq_num,
        &schema, &schema_version);
    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
  }

  ASSERT_OK(log_->Close());
  CheckRightNumberOfSegmentFiles(2);

  // We skip the first three, since we unregistered them above.
  for (int i = 3; i < kNumTotalSegments; i++) {
    ASSERT_OK(log_anchor_registry_->Unregister(anchors[i]));
  }
}

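// Timeline of the GC test above, for reference (derived from its assertions):
// 4 segments with 5 ops each are anchored at indexes 1, 6, 11 and 16.
//   unregister anchors[0], anchors[1] -> earliest anchor becomes 11 (segment 3)
//   GC(11)                            -> segments 1 and 2 collected, 2 remain
//   unregister anchors[2], GC(16)     -> nothing collected, since
//                                        log_min_segments_to_retain (2) holds
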
// Test that, when we are set to retain a given number of log segments,
// we also retain any relevant log index chunks, even if those operations
// are not necessary for recovery.
TEST_F(LogTest, TestGCOfIndexChunks) {
  FLAGS_log_min_segments_to_retain = 4;
  BuildLog();

  // Append some segments which cross from one index chunk into another.
  // 999990-999994         \___ the first index
  // 999995-999999         /    chunk points to these
  // 1000000-1000004       \_
  // 1000005-1000009        _|- the second index chunk points to these
  // 1000010-<still open>  /
  const int kNumTotalSegments = 5;
  const int kNumOpsPerSegment = 5;
  OpIdPB op_id = MakeOpId(1, 999990);
  ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment,
                                       &op_id, nullptr));

  // Run a GC on an op in the second index chunk. We should remove only the
  // earliest segment, because we are set to retain 4.
  int num_gced_segments = 0;
  ASSERT_OK(log_->GC(1000006, &num_gced_segments));
  ASSERT_EQ(1, num_gced_segments);

  // And we should still be able to read ops in the retained segment, even though
  // the GC index was higher.
  auto loaded_op = ASSERT_RESULT(log_->GetLogReader()->LookupOpId(999995));
  ASSERT_EQ(yb::OpId(1, 999995), loaded_op);

  // If we drop the retention count down to 1, we can now GC, and the log index
  // chunk should also be GCed.
  FLAGS_log_min_segments_to_retain = 1;
  ASSERT_OK(log_->GC(1000003, &num_gced_segments));
  ASSERT_EQ(1, num_gced_segments);

  auto result = log_->GetLogReader()->LookupOpId(999995);
// This test relies on kEntriesPerIndexChunk being 1000000, and that's no longer
// the case after D1719 (2fe27d886390038bc734ea28638a1b1435e7d0d4) on Mac.
#if !defined(__APPLE__)
  ASSERT_TRUE(!result.ok() && result.status().IsNotFound()) << "unexpected status: " << result;
#endif
}

// Tests that we can append FLUSH_MARKER messages to the log queue to make sure
// all messages up to a certain point were fsync()ed without actually
// writing them to the log.
TEST_F(LogTest, TestWaitUntilAllFlushed) {
  BuildLog();
  // Append 2 replicate entries synchronously.
  AppendReplicateBatchToLog(2, AppendSync::kTrue);

  ASSERT_OK(log_->WaitUntilAllFlushed());

  // Make sure we only get the 2 replicate entries back and that no FLUSH_MARKER
  // entry is found.
  vector<scoped_refptr<ReadableLogSegment> > segments;
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));

  auto read_entries = segments[0]->ReadEntries();
  ASSERT_OK(read_entries.status);
  ASSERT_EQ(read_entries.entries.size(), 2);
  for (size_t i = 0; i < read_entries.entries.size(); i++) {
    ASSERT_TRUE(read_entries.entries[i]->has_replicate());
  }
}

// Tests log reopening and that GC'ing the old log's segments works.
TEST_F(LogTest, TestLogReopenAndGC) {
  BuildLog();

  SegmentSequence segments;

  vector<LogAnchor*> anchors;
  ElementDeleter deleter(&anchors);

  const int kNumTotalSegments = 3;
  const int kNumOpsPerSegment = 5;
  int num_gced_segments;
  OpIdPB op_id = MakeOpId(1, 1);
  int64_t anchored_index = -1;

  ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment,
                                       &op_id, &anchors));
  // Anchors should prevent GC.
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(3, segments.size());
  ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&anchored_index));
  ASSERT_OK(log_->GC(anchored_index, &num_gced_segments));
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(3, segments.size());

  ASSERT_OK(log_->Close());

  // Now reopen the log as if we had replayed the state into the stores
  // that were in memory, and do GC.
  BuildLog();

  // The "old" data consists of 3 segments. We still hold anchors.
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(4, segments.size());

  // Write to a new log segment, as if we had taken new requests and the
  // mem stores are holding anchors, but don't roll it.
  CreateAndRegisterNewAnchor(op_id.index(), &anchors);
  ASSERT_OK(AppendNoOps(&op_id, kNumOpsPerSegment));

  // Now release the "old" anchors and GC them.
  for (int i = 0; i < 3; i++) {
    ASSERT_OK(log_anchor_registry_->Unregister(anchors[i]));
  }
  ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&anchored_index));

  // If we set min_seconds_to_retain high, then we'll retain the logs even
  // though we could GC them based on our anchoring.
  FLAGS_log_min_seconds_to_retain = 500;
  ASSERT_OK(log_->GC(anchored_index, &num_gced_segments));
  ASSERT_EQ(0, num_gced_segments);

  // Turn off the time-based retention and try GCing again. This time
  // we should succeed.
  FLAGS_log_min_seconds_to_retain = 0;
  log_->set_wal_retention_secs(0);
  ASSERT_OK(log_->GC(anchored_index, &num_gced_segments));
  ASSERT_EQ(2, num_gced_segments);

  // After GC there should be only one left, besides the one currently being
  // written to. That is because min_segments_to_retain defaults to 2.
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(2, segments.size()) << DumpSegmentsToString(segments);
  ASSERT_OK(log_->Close());

  CheckRightNumberOfSegmentFiles(2);

  // Unregister the final anchor.
  ASSERT_OK(log_anchor_registry_->Unregister(anchors[3]));
}

// Helper to measure the performance of the log.
TEST_F(LogTest, TestWriteManyBatches) {
  uint64_t num_batches = 10;
  if (AllowSlowTests()) {
    num_batches = FLAGS_num_batches;
  }
  BuildLog();

  LOG(INFO) << "Starting to write " << num_batches << " batches to the log";
  LOG_TIMING(INFO, "Wrote all batches to log") {
    AppendReplicateBatchToLog(num_batches);
  }
  ASSERT_OK(log_->Close());
  LOG(INFO) << "Done writing";

  LOG_TIMING(INFO, "Read all entries from Log") {
    LOG(INFO) << "Starting to read log";
    uint32_t num_entries = 0;

    std::unique_ptr<LogReader> reader;
    ASSERT_OK(LogReader::Open(
        fs_manager_->env(), /* index= */ nullptr, "Log reader: ", tablet_wal_path_,
        /* table_metric_entity= */ nullptr, /* tablet_metric_entity= */ nullptr, &reader));

    std::vector<scoped_refptr<ReadableLogSegment> > segments;
    ASSERT_OK(reader->GetSegmentsSnapshot(&segments));

    for (const scoped_refptr<ReadableLogSegment>& entry : segments) {
      auto read_entries = entry->ReadEntries();
      ASSERT_OK(read_entries.status);
      num_entries += read_entries.entries.size();
    }
    ASSERT_EQ(num_entries, num_batches);
    LOG(INFO) << "Done reading the log";
  }
}

// This tests that querying the LogReader works.
// This sets up a reader with some segments to query, which amount to the
// following:
// seg002: 0.10 through 0.19
// seg003: 0.20 through 0.29
// seg004: 0.30 through 0.39
TEST_F(LogTest, TestLogReader) {
  LogReader reader(fs_manager_->env(),
                   scoped_refptr<LogIndex>(),
                   "Log reader: ",
                   nullptr,
                   nullptr);
  ASSERT_OK(reader.InitEmptyReaderForTests());
  ASSERT_OK(AppendNewEmptySegmentToReader(2, 10, &reader));
  ASSERT_OK(AppendNewEmptySegmentToReader(3, 20, &reader));
  ASSERT_OK(AppendNewEmptySegmentToReader(4, 30, &reader));

  OpIdPB op;
  op.set_term(0);
  SegmentSequence segments;

  // Queries for segment prefixes (used for GC).

  // Asking the reader for the prefix of segments that does not include op 1
  // should return the empty set.
  ASSERT_OK(reader.GetSegmentPrefixNotIncluding(1, &segments));
  ASSERT_TRUE(segments.empty());

  // ... same for op 10.
  ASSERT_OK(reader.GetSegmentPrefixNotIncluding(10, &segments));
  ASSERT_TRUE(segments.empty());

  // Asking for the prefix of segments not including op 20 should return
  // the first segment, since 20 is the first operation in segment 3.
  ASSERT_OK(reader.GetSegmentPrefixNotIncluding(20, &segments));
  ASSERT_EQ(segments.size(), 1);
  ASSERT_EQ(segments[0]->header().sequence_number(), 2);

  // Asking for 30 should include the first two.
  ASSERT_OK(reader.GetSegmentPrefixNotIncluding(30, &segments));
  ASSERT_EQ(segments.size(), 2);
  ASSERT_EQ(segments[0]->header().sequence_number(), 2);
  ASSERT_EQ(segments[1]->header().sequence_number(), 3);

  // Asking for anything higher should return all segments.
  ASSERT_OK(reader.GetSegmentPrefixNotIncluding(1000, &segments));
  ASSERT_EQ(segments.size(), 3);
  ASSERT_EQ(segments[0]->header().sequence_number(), 2);
  ASSERT_EQ(segments[1]->header().sequence_number(), 3);

  // Queries for specific segment sequence numbers.
  scoped_refptr<ReadableLogSegment> segment = reader.GetSegmentBySequenceNumber(2);
  ASSERT_EQ(2, segment->header().sequence_number());
  segment = reader.GetSegmentBySequenceNumber(3);
  ASSERT_EQ(3, segment->header().sequence_number());

  segment = reader.GetSegmentBySequenceNumber(4);
  ASSERT_EQ(4, segment->header().sequence_number());

  segment = reader.GetSegmentBySequenceNumber(5);
  ASSERT_TRUE(segment.get() == nullptr);
}

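// The prefix-query semantics exercised above, restated (derived from the
// assertions): GetSegmentPrefixNotIncluding(op) returns the longest prefix of
// segments whose operations all precede op. With seg002 = [10, 19],
// seg003 = [20, 29] and seg004 = [30, 39]:
//   op <= 10 -> {}                   op = 20   -> {seg002}
//   op = 30  -> {seg002, seg003}     op = 1000 -> all three segments
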
// Test that, even if the LogReader's index is empty because no segments
// have been properly closed, we can still read the entries as the reader
// returns the current segment.
TEST_F(LogTest, TestLogReaderReturnsLatestSegmentIfIndexEmpty) {
  BuildLog();

  AppendReplicateBatch({
    .op_id = {1, 1},
    .committed_op_id = {0, 0},
    .writes = {},
    .sync = AppendSync::kTrue,
  });

  SegmentSequence segments;
  ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
  ASSERT_EQ(segments.size(), 1);

  auto read_entries = segments[0]->ReadEntries();
  ASSERT_OK(read_entries.status);
  ASSERT_EQ(1, read_entries.entries.size());
}

TEST_F(LogTest, TestOpIdUtils) {
  OpIdPB id = MakeOpId(1, 2);
  ASSERT_EQ("1.2", consensus::OpIdToString(id));
  ASSERT_EQ(1, id.term());
  ASSERT_EQ(2, id.index());
}

std::ostream& operator<<(std::ostream& os, const TestLogSequenceElem& elem) {
  switch (elem.type) {
    case TestLogSequenceElem::ROLL:
      os << "ROLL";
      break;
    case TestLogSequenceElem::REPLICATE:
      os << "R" << elem.id;
      break;
  }
  return os;
}

// Generates a plausible sequence of items in the log, including term changes, moving the
// index backwards, log rolls, etc.
//
// NOTE: this log sequence may contain some aberrations which would not occur in a real
// consensus log, but our API supports them. In the future we may want to add assertions
// to the Log implementation that prevent such aberrations, in which case we'd need to
// modify this.
void LogTest::GenerateTestSequence(size_t seq_len,
                                   vector<TestLogSequenceElem>* ops,
                                   vector<int64_t>* terms_by_index) {
  auto rng = &ThreadLocalRandom();
  terms_by_index->assign(seq_len + 1, -1);
  int64_t committed_index = 0;
  int64_t max_repl_index = 0;

  OpIdPB id = MakeOpId(1, 0);
  for (size_t i = 0; i < seq_len; i++) {
    if (RandomUniformInt(0, 4, rng) == 0) {
      // Reset the term - it may stay the same, or go up/down.
      id.set_term(std::max(static_cast<int64_t>(1), id.term() + RandomUniformInt(0, 4, rng) - 2));
    }

    // Advance the index by exactly one.
    id.set_index(id.index() + 1);

    if (RandomUniformInt(0, 4, rng) == 0) {
      // Move the index backward a bit, but not past the committed index.
      id.set_index(std::max(committed_index + 1, id.index() - RandomUniformInt(0, 4, rng)));
    }

    // Roll the log sometimes.
    if (i != 0 && RandomUniformInt(0, 14, rng) == 0) {
      TestLogSequenceElem op;
      op.type = TestLogSequenceElem::ROLL;
      ops->push_back(op);
    }

    TestLogSequenceElem op;
    op.type = TestLogSequenceElem::REPLICATE;
    op.id = id;
    ops->push_back(op);
    (*terms_by_index)[id.index()] = id.term();
    max_repl_index = std::max(max_repl_index, id.index());
  }
  terms_by_index->resize(max_repl_index + 1);
}

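// For intuition, a short sequence this generator could plausibly produce
// (hypothetical output, not recorded from a real run):
//   R1.1 R1.2 R1.3 ROLL R2.2 R2.3
// Here the index moves back to 2 after a term bump, so terms_by_index keeps
// the term of the *last* REPLICATE written per index: [_, 1, 2, 2].
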
void LogTest::AppendTestSequence(const vector<TestLogSequenceElem>& seq) {
  for (const TestLogSequenceElem& e : seq) {
    VLOG(1) << "Appending: " << e;
    switch (e.type) {
      case TestLogSequenceElem::REPLICATE:
      {
        OpIdPB id(e.id);
        ASSERT_OK(AppendNoOp(&id));
        break;
      }
      case TestLogSequenceElem::ROLL:
      {
        ASSERT_OK(RollLog());
      }
    }
  }
}

// Test that if multiple REPLICATE entries are written for the same index,
// we read the latest one.
//
// This is a randomized test: we generate a plausible sequence of log messages,
// write it out, and then read random ranges of log indexes, making sure we
// always see the correct term for each REPLICATE message (i.e. whichever term
// was the last to append it).
TEST_F(LogTest, TestReadLogWithReplacedReplicates) {
  const int kSequenceLength = AllowSlowTests() ? 1000 : 50;

  vector<int64_t> terms_by_index;
  vector<TestLogSequenceElem> seq;
  GenerateTestSequence(kSequenceLength, &seq, &terms_by_index);
  LOG(INFO) << "test sequence: " << seq;
  const int64_t max_repl_index = terms_by_index.size() - 1;
  LOG(INFO) << "max_repl_index: " << max_repl_index;

  // Write the test sequence to the log.
  // TODO: should consider adding batching here of multiple replicates.
  BuildLog();
  AppendTestSequence(seq);

  const int kNumRandomReads = 100;

  // We'll advance 'gc_index' randomly through the log until we've gotten to
  // the end. This ensures that, when we GC, we don't ever remove the latest
  // version of a replicate message unintentionally.
  LogReader* reader = log_->GetLogReader();
  for (int gc_index = 1; gc_index < max_repl_index;) {
    SCOPED_TRACE(Substitute("after GCing $0", gc_index));

    // Test reading random ranges of indexes and verifying that we get back the
    // REPLICATE messages with the correct terms.
    for (int random_read = 0; random_read < kNumRandomReads; random_read++) {
      auto start_index = RandomUniformInt<int64_t>(gc_index, max_repl_index - 1);
      auto end_index = RandomUniformInt<int64_t>(start_index, max_repl_index);
      int64_t starting_op_segment_seq_num;
      yb::SchemaPB schema;
      uint32_t schema_version;
      {
        SCOPED_TRACE(Substitute("Reading $0-$1", start_index, end_index));
        consensus::ReplicateMsgs repls;
        ASSERT_OK(reader->ReadReplicatesInRange(start_index, end_index,
                                                LogReader::kNoSizeLimit, &repls,
                                                &starting_op_segment_seq_num,
                                                &schema, &schema_version));
        ASSERT_EQ(end_index - start_index + 1, repls.size());
        auto expected_index = start_index;
        for (const auto& repl : repls) {
          ASSERT_EQ(expected_index, repl->id().index());
          ASSERT_EQ(terms_by_index[expected_index], repl->id().term());
          expected_index++;
        }
      }

      int64_t bytes_read = log_->reader_->bytes_read_->value();
      int64_t entries_read = log_->reader_->entries_read_->value();
      int64_t read_batch_count = log_->reader_->read_batch_latency_->TotalCount();
      EXPECT_GT(log_->reader_->bytes_read_->value(), 0);
      EXPECT_GT(log_->reader_->entries_read_->value(), 0);
      EXPECT_GT(log_->reader_->read_batch_latency_->TotalCount(), 0);

      // Test a size-limited read.
      int size_limit = RandomUniformInt(1, 1000);
      {
        SCOPED_TRACE(Substitute("Reading $0-$1 with size limit $2",
                                start_index, end_index, size_limit));
        ReplicateMsgs repls;
        ASSERT_OK(reader->ReadReplicatesInRange(start_index, end_index, size_limit, &repls,
                                                &starting_op_segment_seq_num,
                                                &schema, &schema_version));
        ASSERT_LE(repls.size(), end_index - start_index + 1);
        int total_size = 0;
        auto expected_index = start_index;
        for (const auto& repl : repls) {
          ASSERT_EQ(expected_index, repl->id().index());
          ASSERT_EQ(terms_by_index[expected_index], repl->id().term());
          expected_index++;
          total_size += repl->SpaceUsed();
        }
        if (total_size > size_limit) {
          ASSERT_EQ(1, repls.size());
        } else {
          ASSERT_LE(total_size, size_limit);
        }
      }

      EXPECT_GT(log_->reader_->bytes_read_->value(), bytes_read);
      EXPECT_GT(log_->reader_->entries_read_->value(), entries_read);
      EXPECT_GT(log_->reader_->read_batch_latency_->TotalCount(), read_batch_count);
    }

    int num_gced = 0;
    ASSERT_OK(log_->GC(gc_index, &num_gced));
    gc_index += RandomUniformInt(0, 9);
  }
}

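// The size-limit contract checked above, restated as a hypothetical predicate
// (illustration only, not part of the original file): a limited read returns a
// prefix of the range whose total SpaceUsed() fits within the limit, except
// that the first message is always returned even when it alone exceeds it.
inline bool SizeLimitedReadLooksValid(size_t num_returned, int total_size, int size_limit) {
  return total_size <= size_limit || num_returned == 1;
}
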
// Ensure that we can read replicate messages from the LogReader with a very
// high (> 32 bit) log index and term. Regression test for KUDU-1933.
TEST_F(LogTest, TestReadReplicatesHighIndex) {
  const int64_t first_log_index = std::numeric_limits<int32_t>::max() - 3;
  const int kSequenceLength = 10;

  BuildLog();
  OpIdPB op_id;
  op_id.set_term(first_log_index);
  op_id.set_index(first_log_index);
  ASSERT_OK(AppendNoOps(&op_id, kSequenceLength));

  auto* reader = log_->GetLogReader();
  ReplicateMsgs repls;
  int64_t starting_op_segment_seq_num;
  yb::SchemaPB schema;
  uint32_t schema_version;
  ASSERT_OK(reader->ReadReplicatesInRange(first_log_index, first_log_index + kSequenceLength - 1,
                                          LogReader::kNoSizeLimit, &repls,
                                          &starting_op_segment_seq_num,
                                          &schema, &schema_version));
  ASSERT_EQ(kSequenceLength, repls.size());
}

TEST_F(LogTest, AllocateSegmentAndRollOver) {
  constexpr auto kNumIters = 10;

  // Big enough to not trigger automated rollover, so the test exercises the manual one.
  options_.segment_size_bytes = 1_MB;

  BuildLog();

  ASSERT_EQ(log_->num_segments(), 1);
  ASSERT_EQ(log_->active_segment_sequence_number(), 1);

  for (auto i = 0; i < kNumIters; ++i) {
    AppendReplicateBatchToLog(1, AppendSync::kTrue);
    ASSERT_OK(log_->AllocateSegmentAndRollOver());
  }

  ASSERT_EQ(log_->num_segments(), kNumIters + 1);
  ASSERT_EQ(log_->active_segment_sequence_number(), kNumIters + 1);

  ASSERT_OK(log_->Close());
}

TEST_F(LogTest, ConcurrentAllocateSegmentAndRollOver) {
  constexpr auto kNumBatches = 10;
  constexpr auto kNumEntriesPerBatch = 10;

  // Trigger rollover aggressively during normal append.
  options_.segment_size_bytes = 1;

  BuildLog();

  for (auto i = 0; i < kNumBatches; ++i) {
    AppendReplicateBatchToLog(kNumEntriesPerBatch, AppendSync::kFalse);
    ASSERT_OK(log_->AllocateSegmentAndRollOver());
  }

  LOG(INFO) << "Log segments: " << log_->num_segments();
  ASSERT_GE(log_->num_segments(), kNumBatches);

  ASSERT_OK(log_->Close());
}

Result<std::vector<OpId>> LogTest::AppendAndCopy(size_t num_batches, size_t num_entries_per_batch) {
  std::vector<OpId> last_op_id_before_copy;
  last_op_id_before_copy.reserve(num_batches);
  for (size_t i = 0; i < num_batches; ++i) {
    AppendReplicateBatchToLog(
        num_entries_per_batch, i % 2 == 0 ? AppendSync::kFalse : AppendSync::kTrue);
    last_op_id_before_copy.push_back(log_->GetLatestEntryOpId());
    RETURN_NOT_OK(log_->CopyTo(GetLogCopyPath(i)));
  }
  return last_op_id_before_copy;
}

Result<SegmentSequence> LogTest::GetSegmentsFromLogCopyAndCheckLastOpIndex(
    const size_t copy_idx, const int64_t last_op_min_idx) {
  const auto log_copy_dir = GetLogCopyPath(copy_idx);
  std::unique_ptr<LogReader> copied_log_reader;
  RETURN_NOT_OK(LogReader::Open(
      fs_manager_->env(), make_scoped_refptr<LogIndex>(log_copy_dir), "Log reader: ",
      log_copy_dir, /* table_metric_entity = */ nullptr,
      /* tablet_metric_entity = */ nullptr, &copied_log_reader));

  SegmentSequence copied_segments;
  RETURN_NOT_OK(copied_log_reader->GetSegmentsSnapshot(&copied_segments));

  SCHECK_GE(
      copied_segments.back()->footer().max_replicate_index(), last_op_min_idx, InternalError,
      "Last copied operation index should be >= index of last log operation added before calling "
      "Log::CopyTo.");

  return copied_segments;
}

// Verifies that CopyTo works in parallel with rollovers triggered by
// concurrent log entry writes.
TEST_F(LogTest, CopyTo) {
  constexpr auto kNumBatches = 10;
  constexpr auto kNumEntriesPerBatch = 10;

  // Trigger rollover aggressively during normal append.
  options_.segment_size_bytes = 1;

  BuildLog();

  auto last_op_id_before_copy = ASSERT_RESULT(AppendAndCopy(kNumBatches, kNumEntriesPerBatch));

  SegmentSequence segments;
  ASSERT_OK(log_->GetSegmentsSnapshot(&segments));

  for (auto i = 0; i < kNumBatches; ++i) {
    auto copied_segments = ASSERT_RESULT(
        GetSegmentsFromLogCopyAndCheckLastOpIndex(i, last_op_id_before_copy[i].index));
    ASSERT_LE(copied_segments.size(), segments.size());

    // Copied log segments should match log segments of the original log.
    for (size_t seg_idx = 0; seg_idx < copied_segments.size(); ++seg_idx) {
      auto& segment = segments[seg_idx];
      auto& segment_copy = copied_segments[seg_idx];

      auto entries_result = segment->ReadEntries();
      ASSERT_OK(entries_result.status);
      auto entries_copy_result = segment_copy->ReadEntries();
      ASSERT_OK(entries_copy_result.status);

      ASSERT_EQ(entries_copy_result.committed_op_id, entries_result.committed_op_id);
      ASSERT_EQ(entries_copy_result.end_offset, entries_result.end_offset);
      ASSERT_EQ(entries_copy_result.entry_metadata, entries_result.entry_metadata);
      ASSERT_EQ(entries_copy_result.entries.size(), entries_result.entries.size());
      for (size_t entry_idx = 0; entry_idx < entries_copy_result.entries.size(); ++entry_idx) {
        ASSERT_EQ(
            entries_copy_result.entries[entry_idx]->DebugString(),
            entries_result.entries[entry_idx]->DebugString());
      }
    }
  }

  ASSERT_OK(log_->Close());
}

// Verifies that CopyTo works in parallel with rollovers triggered by
// concurrent log entry writes and with log GC.
TEST_F(LogTest, CopyToWithConcurrentGc) {
  constexpr auto kNumBatches = 20;
  constexpr auto kNumEntriesPerBatch = 10;

  // Trigger rollover aggressively during normal append.
  options_.segment_size_bytes = 1;

  BuildLog();

  log_->set_wal_retention_secs(0);
  std::atomic<bool> stop_gc{false};
  std::thread gc_thread([log = log_.get(), &stop_gc]{
    while (!stop_gc.load()) {
      auto gc_index = log->GetLatestEntryOpId().index;
      int num_gced = 0;
      ASSERT_OK(log->GC(gc_index, &num_gced));
    }
  });

  auto last_op_id_before_copy_result = AppendAndCopy(kNumBatches, kNumEntriesPerBatch);
  stop_gc = true;
  gc_thread.join();
  auto last_op_id_before_copy = ASSERT_RESULT(std::move(last_op_id_before_copy_result));

  for (auto i = 0; i < kNumBatches; ++i) {
    auto copied_segments = ASSERT_RESULT(
        GetSegmentsFromLogCopyAndCheckLastOpIndex(i, last_op_id_before_copy[i].index));

    // Make sure the copied log contains a sequence of entries without gaps in index.
    int64_t last_index = -1;
    for (size_t seg_idx = 0; seg_idx < copied_segments.size(); ++seg_idx) {
      auto& segment_copy = copied_segments[seg_idx];
      auto entries_copy_result = segment_copy->ReadEntries();
      ASSERT_OK(entries_copy_result.status);

      for (const auto& entry : entries_copy_result.entries) {
        const auto index = entry->replicate().id().index();
        if (last_index >= 0) {
          ASSERT_EQ(index, last_index + 1);
        }
        last_index = index;
      }
    }
  }

  ASSERT_OK(log_->Close());
}

} // namespace log
} // namespace yb