/Users/deen/code/yugabyte-db/src/yb/consensus/log-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <fcntl.h> |
34 | | #include <unistd.h> |
35 | | |
36 | | #include <algorithm> |
37 | | #include <vector> |
38 | | |
39 | | #include <boost/function.hpp> |
40 | | |
41 | | #include <glog/stl_logging.h> |
42 | | |
43 | | #include "yb/common/wire_protocol.h" |
44 | | |
45 | | #include "yb/consensus/log-test-base.h" |
46 | | #include "yb/consensus/log_index.h" |
47 | | #include "yb/consensus/opid_util.h" |
48 | | |
49 | | #include "yb/gutil/stl_util.h" |
50 | | #include "yb/gutil/strings/substitute.h" |
51 | | |
52 | | #include "yb/util/random.h" |
53 | | #include "yb/util/size_literals.h" |
54 | | #include "yb/util/stopwatch.h" |
55 | | |
56 | | DEFINE_int32(num_batches, 10000, |
57 | | "Number of batches to write to/read from the Log in TestWriteManyBatches"); |
58 | | |
59 | | DECLARE_int32(log_min_segments_to_retain); |
60 | | DECLARE_bool(never_fsync); |
61 | | DECLARE_bool(writable_file_use_fsync); |
62 | | DECLARE_int32(o_direct_block_alignment_bytes); |
63 | | DECLARE_int32(o_direct_block_size_bytes); |
64 | | |
65 | | namespace yb { |
66 | | namespace log { |
67 | | |
68 | | using std::shared_ptr; |
69 | | using consensus::MakeOpId; |
70 | | using strings::Substitute; |
71 | | |
72 | | extern const char* kTestTable; |
73 | | extern const char* kTestTablet; |
74 | | |
75 | | struct TestLogSequenceElem { |
76 | | enum ElemType { |
77 | | REPLICATE, |
78 | | ROLL |
79 | | }; |
80 | | ElemType type; |
81 | | OpIdPB id; |
82 | | }; |
83 | | |
84 | | class LogTest : public LogTestBase { |
85 | | public: |
86 | | static constexpr TableType kTableType = TableType::YQL_TABLE_TYPE; |
87 | | |
88 | 8 | void CreateAndRegisterNewAnchor(int64_t log_index, vector<LogAnchor*>* anchors) { |
89 | 8 | anchors->push_back(new LogAnchor()); |
90 | 8 | log_anchor_registry_->Register(log_index, CURRENT_TEST_NAME(), anchors->back()); |
91 | 8 | } |
92 | | |
93 | | // Create a series of NO_OP entries in the log. |
94 | | // Anchor each segment on the first OpId of each log segment, |
95 | | // and update op_id to point to the next valid OpId. |
96 | | Status AppendMultiSegmentSequence( |
97 | 3 | int num_total_segments, int num_ops_per_segment, OpIdPB* op_id, vector<LogAnchor*>* anchors) { |
98 | 3 | CHECK(op_id->IsInitialized()); |
99 | 12 | for (int i = 0; i < num_total_segments - 1; i++) { |
100 | 9 | if (anchors) { |
101 | 5 | CreateAndRegisterNewAnchor(op_id->index(), anchors); |
102 | 5 | } |
103 | 9 | RETURN_NOT_OK(AppendNoOps(op_id, num_ops_per_segment)); |
104 | 9 | RETURN_NOT_OK(RollLog()); |
105 | 9 | } |
106 | | |
107 | 3 | if (anchors) { |
108 | 2 | CreateAndRegisterNewAnchor(op_id->index(), anchors); |
109 | 2 | } |
110 | 3 | RETURN_NOT_OK(AppendNoOps(op_id, num_ops_per_segment)); |
111 | 3 | return Status::OK(); |
112 | 3 | } |
113 | | |
114 | | Status AppendNewEmptySegmentToReader(int sequence_number, |
115 | | int first_repl_index, |
116 | 3 | LogReader* reader) { |
117 | 3 | string fqp = GetTestPath(strings::Substitute("wal-00000000$0", sequence_number)); |
118 | 3 | std::unique_ptr<WritableFile> w_log_seg; |
119 | 3 | RETURN_NOT_OK(fs_manager_->env()->NewWritableFile(fqp, &w_log_seg)); |
120 | 3 | std::unique_ptr<RandomAccessFile> r_log_seg; |
121 | 3 | RETURN_NOT_OK(fs_manager_->env()->NewRandomAccessFile(fqp, &r_log_seg)); |
122 | | |
123 | 3 | scoped_refptr<ReadableLogSegment> readable_segment( |
124 | 3 | new ReadableLogSegment(fqp, shared_ptr<RandomAccessFile>(r_log_seg.release()))); |
125 | | |
126 | 3 | LogSegmentHeaderPB header; |
127 | 3 | header.set_sequence_number(sequence_number); |
128 | 3 | header.set_major_version(0); |
129 | 3 | header.set_minor_version(0); |
130 | 3 | header.set_unused_tablet_id(kTestTablet); |
131 | 3 | SchemaToPB(GetSimpleTestSchema(), header.mutable_unused_schema()); |
132 | | |
133 | 3 | LogSegmentFooterPB footer; |
134 | 3 | footer.set_num_entries(10); |
135 | 3 | footer.set_min_replicate_index(first_repl_index); |
136 | 3 | footer.set_max_replicate_index(first_repl_index + 9); |
137 | | |
138 | 3 | RETURN_NOT_OK(readable_segment->Init(header, footer, 0)); |
139 | 3 | RETURN_NOT_OK(reader->AppendSegment(readable_segment)); |
140 | 3 | return Status::OK(); |
141 | 3 | } |
142 | | |
143 | | void GenerateTestSequence(size_t seq_len, |
144 | | vector<TestLogSequenceElem>* ops, |
145 | | vector<int64_t>* terms_by_index); |
146 | | void AppendTestSequence(const vector<TestLogSequenceElem>& seq); |
147 | | |
148 | | // Where to corrupt the log entry. |
149 | | enum CorruptionPosition { |
150 | | // Corrupt/truncate within the header. |
151 | | IN_HEADER, |
152 | | // Corrupt/truncate within the entry data itself. |
153 | | IN_ENTRY |
154 | | }; |
155 | | |
156 | | void DoCorruptionTest(CorruptionType type, CorruptionPosition place, |
157 | | Status expected_status, int expected_entries); |
158 | | |
159 | | Result<std::vector<OpId>> AppendAndCopy(size_t num_batches, size_t num_entries_per_batch); |
160 | | |
161 | 60 | std::string GetLogCopyPath(size_t copy_idx) { |
162 | 60 | return Format("$0.copy-$1", tablet_wal_path_, copy_idx); |
163 | 60 | } |
164 | | |
165 | | Result<SegmentSequence> GetSegmentsFromLogCopyAndCheckLastOpIndex( |
166 | | size_t copy_idx, int64_t last_op_min_idx); |
167 | | }; |
168 | | |
169 | | // If we write more than one entry in a batch, we should be able to |
170 | | // read all of those entries back. |
171 | 1 | TEST_F(LogTest, TestMultipleEntriesInABatch) { |
172 | 1 | BuildLog(); |
173 | | |
174 | 1 | OpIdPB opid; |
175 | 1 | opid.set_term(1); |
176 | 1 | opid.set_index(1); |
177 | | |
178 | 1 | ASSERT_OK(AppendNoOpsToLogSync(clock_, log_.get(), &opid, 2)); |
179 | | |
180 | | // RollOver() the batch so that we have a properly formed footer. |
181 | 1 | ASSERT_OK(log_->AllocateSegmentAndRollOver()); |
182 | | |
183 | 1 | SegmentSequence segments; |
184 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
185 | | |
186 | 1 | auto read_entries = segments[0]->ReadEntries(); |
187 | 1 | ASSERT_OK(read_entries.status); |
188 | | |
189 | 1 | ASSERT_EQ(2, read_entries.entries.size()); |
190 | | |
191 | | // Verify the index. |
192 | 1 | { |
193 | 1 | LogIndexEntry entry; |
194 | 1 | ASSERT_OK(log_->log_index_->GetEntry(1, &entry)); |
195 | 1 | ASSERT_EQ(1, entry.op_id.term); |
196 | 1 | ASSERT_EQ(1, entry.segment_sequence_number); |
197 | 1 | int64_t offset = entry.offset_in_segment; |
198 | | |
199 | 1 | ASSERT_OK(log_->log_index_->GetEntry(2, &entry)); |
200 | 1 | ASSERT_EQ(1, entry.op_id.term); |
201 | 1 | ASSERT_EQ(1, entry.segment_sequence_number); |
202 | 1 | int64_t second_offset = entry.offset_in_segment; |
203 | | |
204 | | // The second entry should be at the same offset as the first entry |
205 | | // since they were written in the same batch. |
206 | 1 | ASSERT_EQ(second_offset, offset); |
207 | 1 | } |
208 | | |
209 | | // Test LookupOpId |
210 | 1 | { |
211 | 1 | auto loaded_op = ASSERT_RESULT(log_->GetLogReader()->LookupOpId(1)); |
212 | 1 | ASSERT_EQ(yb::OpId(1, 1), loaded_op); |
213 | 1 | loaded_op = ASSERT_RESULT(log_->GetLogReader()->LookupOpId(2)); |
214 | 1 | ASSERT_EQ(yb::OpId(1, 2), loaded_op); |
215 | 1 | auto result = log_->GetLogReader()->LookupOpId(3); |
216 | 2 | ASSERT_TRUE(!result.ok() && result.status().IsNotFound()) |
217 | 2 | << "unexpected status: " << result.status(); |
218 | 1 | } |
219 | | |
220 | 1 | ASSERT_OK(log_->Close()); |
221 | 1 | } |
222 | | |
223 | | // Tests that everything works properly with fsync enabled: |
224 | | // This also tests SyncDir() (see KUDU-261), which is called whenever |
225 | | // a new log segment is initialized. |
226 | 1 | TEST_F(LogTest, TestFsync) { |
227 | 1 | options_.durable_wal_write = true; |
228 | 1 | BuildLog(); |
229 | | |
230 | 1 | OpIdPB opid; |
231 | 1 | opid.set_term(0); |
232 | 1 | opid.set_index(1); |
233 | | |
234 | 1 | ASSERT_OK(AppendNoOp(&opid)); |
235 | 1 | ASSERT_OK(log_->Close()); |
236 | 1 | } |
237 | | |
238 | | // Tests interval for durable wal write |
239 | 1 | TEST_F(LogTest, TestFsyncInterval) { |
240 | 1 | options_.interval_durable_wal_write = MonoDelta::FromMilliseconds(1); |
241 | 1 | BuildLog(); |
242 | | |
243 | 1 | OpIdPB opid; |
244 | 1 | opid.set_term(0); |
245 | 1 | opid.set_index(1); |
246 | | |
247 | 1 | ASSERT_OK(AppendNoOp(&opid)); |
248 | 1 | SleepFor(MonoDelta::FromMilliseconds(2)); |
249 | 1 | ASSERT_OK(AppendNoOp(&opid)); |
250 | 1 | ASSERT_OK(log_->Close()); |
251 | 1 | } |
252 | | |
253 | | // Tests interval for durable wal write physically |
254 | 1 | TEST_F(LogTest, TestFsyncIntervalPhysical) { |
255 | 1 | int interval = 1; |
256 | 1 | options_.interval_durable_wal_write = MonoDelta::FromMilliseconds(interval); |
257 | 1 | FLAGS_never_fsync = false; |
258 | 1 | FLAGS_durable_wal_write = false; |
259 | 1 | options_.preallocate_segments = false; |
260 | 1 | BuildLog(); |
261 | | |
262 | 1 | OpIdPB opid; |
263 | 1 | opid.set_term(0); |
264 | 1 | opid.set_index(1); |
265 | | |
266 | 1 | SegmentSequence segments; |
267 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
268 | 1 | ASSERT_EQ(segments.size(), 1); |
269 | 1 | int64_t orig_size = segments[0]->file_size(); |
270 | 1 | string fileName = segments.back()->readable_file()->filename(); |
271 | | |
272 | 1 | ASSERT_OK(AppendNoOp(&opid)); |
273 | 1 | SleepFor(MonoDelta::FromMilliseconds(interval + 1)); |
274 | 1 | ASSERT_OK(AppendNoOp(&opid)); |
275 | | |
276 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
277 | 1 | ASSERT_EQ(segments.size(), 1); |
278 | 1 | int64_t new_size = segments[0]->file_size(); |
279 | 1 | ASSERT_GT(new_size, orig_size); |
280 | | |
281 | | #if defined(__linux__) |
282 | | int fd = open(fileName.c_str(), O_RDONLY | O_DIRECT); |
283 | | ASSERT_GE(fd, 0); |
284 | | #elif defined(__APPLE__) |
285 | 1 | int fd = open(fileName.c_str(), O_RDONLY); |
286 | 1 | ASSERT_GE(fd, 0); |
287 | 1 | ASSERT_NE(fcntl(fd, F_NOCACHE, 1), -1); |
288 | 1 | #endif |
289 | 1 | void* temp_buf = nullptr; |
290 | 1 | ASSERT_EQ(posix_memalign(&temp_buf, FLAGS_o_direct_block_alignment_bytes, |
291 | 1 | FLAGS_o_direct_block_size_bytes), 0); |
292 | 1 | ASSERT_GT(pread(fd, temp_buf, FLAGS_o_direct_block_size_bytes, 0), orig_size); |
293 | 1 | ASSERT_OK(log_->Close()); |
294 | 1 | free(temp_buf); |
295 | 1 | } |
296 | | |
297 | | // Tests data size for durable wal write |
298 | 1 | TEST_F(LogTest, TestFsyncDataSize) { |
299 | 1 | options_.bytes_durable_wal_write_mb = 1; |
300 | 1 | options_.interval_durable_wal_write = MonoDelta::FromMilliseconds(10000); |
301 | 1 | BuildLog(); |
302 | | |
303 | 1 | OpIdPB opid; |
304 | 1 | opid.set_term(0); |
305 | 1 | opid.set_index(1); |
306 | | |
307 | 1 | ssize_t size = 0; |
308 | 1 | ASSERT_OK(AppendNoOps(&opid, 100 * 1024, &size)); |
309 | 1 | SleepFor(MonoDelta::FromMilliseconds(1)); |
310 | 1 | ASSERT_OK(AppendNoOp(&opid)); |
311 | 1 | ASSERT_OK(log_->Close()); |
312 | 1 | LOG(INFO)<< "Wrote " << size << " batches to log"; |
313 | 1 | } |
314 | | |
315 | | // Regression test for part of KUDU-735: |
316 | | // if a log is not preallocated, we should properly track its on-disk size as we append to |
317 | | // it. |
318 | 1 | TEST_F(LogTest, TestSizeIsMaintained) { |
319 | 1 | options_.preallocate_segments = false; |
320 | 1 | BuildLog(); |
321 | | |
322 | 1 | OpIdPB opid = MakeOpId(0, 1); |
323 | 1 | ASSERT_OK(AppendNoOp(&opid)); |
324 | | |
325 | 1 | SegmentSequence segments; |
326 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
327 | 1 | int64_t orig_size = segments[0]->file_size(); |
328 | 1 | ASSERT_GT(orig_size, 0); |
329 | | |
330 | 1 | ASSERT_OK(AppendNoOp(&opid)); |
331 | | |
332 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
333 | 1 | int64_t new_size = segments[0]->file_size(); |
334 | 1 | ASSERT_GT(new_size, orig_size); |
335 | | |
336 | 1 | ASSERT_OK(log_->Close()); |
337 | 1 | } |
338 | | |
339 | | // Test that the reader can read from the log even if it hasn't been |
340 | | // properly closed. |
341 | 1 | TEST_F(LogTest, TestLogNotTrimmed) { |
342 | 1 | BuildLog(); |
343 | | |
344 | 1 | OpIdPB opid; |
345 | 1 | opid.set_term(0); |
346 | 1 | opid.set_index(1); |
347 | | |
348 | 1 | ASSERT_OK(AppendNoOp(&opid)); |
349 | | |
350 | 1 | LogEntries entries; |
351 | 1 | SegmentSequence segments; |
352 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
353 | | |
354 | 1 | ASSERT_OK(segments[0]->ReadEntries().status); |
355 | | // Close after testing to ensure correct shutdown |
356 | | // TODO : put this in TearDown() with a test on log state? |
357 | 1 | ASSERT_OK(log_->Close()); |
358 | 1 | } |
359 | | |
360 | | // Test that the reader will not fail if a log file is completely blank. |
361 | | // This happens when it's opened but nothing has been written. |
362 | | // The reader should gracefully handle this situation, but somehow expose that |
363 | | // the segment is uninitialized. See KUDU-140. |
364 | 1 | TEST_F(LogTest, TestBlankLogFile) { |
365 | 1 | BuildLog(); |
366 | | |
367 | | // The log's reader will have a segment... |
368 | 1 | ASSERT_EQ(log_->GetLogReader()->num_segments(), 1); |
369 | | |
370 | | // ...and we're able to read from it. |
371 | 1 | SegmentSequence segments; |
372 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
373 | | |
374 | 1 | auto read_entries = segments[0]->ReadEntries(); |
375 | 1 | ASSERT_OK(read_entries.status); |
376 | | |
377 | | // ...It's just that it's empty. |
378 | 1 | ASSERT_EQ(read_entries.entries.size(), 0); |
379 | 1 | } |
380 | | |
381 | | void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place, |
382 | 4 | Status expected_status, int expected_entries) { |
383 | 4 | const int kNumEntries = 4; |
384 | 4 | BuildLog(); |
385 | 4 | OpIdPB op_id = MakeOpId(1, 1); |
386 | 4 | ASSERT_OK(AppendNoOps(&op_id, kNumEntries)); |
387 | | |
388 | | // Find the entry that we want to corrupt before closing the log. |
389 | 4 | LogIndexEntry entry; |
390 | 4 | ASSERT_OK(log_->log_index_->GetEntry(4, &entry)); |
391 | | |
392 | 4 | ASSERT_OK(log_->Close()); |
393 | | |
394 | | // Corrupt the log as specified. |
395 | 4 | ssize_t offset = 0; |
396 | 4 | switch (place) { |
397 | 2 | case IN_HEADER: |
398 | 2 | offset = entry.offset_in_segment + 1; |
399 | 2 | break; |
400 | 2 | case IN_ENTRY: |
401 | 2 | offset = entry.offset_in_segment + kEntryHeaderSize + 1; |
402 | 2 | break; |
403 | 4 | } |
404 | 4 | ASSERT_OK(CorruptLogFile(env_.get(), log_->ActiveSegmentForTests()->path(), type, offset)); |
405 | | |
406 | | // Open a new reader -- we don't reuse the existing LogReader from log_ |
407 | | // because it has a cached header. |
408 | 4 | std::unique_ptr<LogReader> reader; |
409 | 4 | ASSERT_OK(LogReader::Open(fs_manager_->env(), |
410 | 4 | make_scoped_refptr(new LogIndex(log_->wal_dir_)), "Log reader: ", |
411 | 4 | tablet_wal_path_, nullptr, nullptr, &reader)); |
412 | 4 | ASSERT_EQ(1, reader->num_segments()); |
413 | | |
414 | 4 | SegmentSequence segments; |
415 | 4 | ASSERT_OK(reader->GetSegmentsSnapshot(&segments)); |
416 | 4 | auto read_entries = segments[0]->ReadEntries(); |
417 | 8 | ASSERT_EQ(read_entries.status.CodeAsString(), expected_status.CodeAsString()) |
418 | 8 | << "Got unexpected status: " << read_entries.status; |
419 | | |
420 | | // Last entry is ignored, but we should still see the previous ones. |
421 | 4 | ASSERT_EQ(expected_entries, read_entries.entries.size()); |
422 | 4 | } |
423 | | |
424 | | // Tests that the log reader reads up until some truncated entry is found. |
425 | | // It should still return OK, since on a crash, it's acceptable to have |
426 | | // a partial entry at EOF. |
427 | 1 | TEST_F(LogTest, TestTruncateLogInEntry) { |
428 | 1 | DoCorruptionTest(TRUNCATE_FILE, IN_ENTRY, Status::OK(), 3); |
429 | 1 | } |
430 | | |
431 | | // Same, but truncate in the middle of the header of that entry. |
432 | 1 | TEST_F(LogTest, TestTruncateLogInHeader) { |
433 | 1 | DoCorruptionTest(TRUNCATE_FILE, IN_HEADER, Status::OK(), 3); |
434 | 1 | } |
435 | | |
436 | | // Similar to the above, except flips a byte. In this case, it should return |
437 | | // a Corruption instead of an OK, because we still have a valid footer in |
438 | | // the file (indicating that all of the entries should be valid as well). |
439 | 1 | TEST_F(LogTest, TestCorruptLogInEntry) { |
440 | 1 | DoCorruptionTest(FLIP_BYTE, IN_ENTRY, STATUS(Corruption, ""), 3); |
441 | 1 | } |
442 | | |
443 | | // Same, but corrupt in the middle of the header of that entry. |
444 | 1 | TEST_F(LogTest, TestCorruptLogInHeader) { |
445 | 1 | DoCorruptionTest(FLIP_BYTE, IN_HEADER, STATUS(Corruption, ""), 3); |
446 | 1 | } |
447 | | |
448 | | // Tests log metrics for WAL files size |
449 | 1 | TEST_F(LogTest, TestLogMetrics) { |
450 | 1 | BuildLog(); |
451 | | // Set a small segment size so that we have roll overs. |
452 | 1 | log_->SetMaxSegmentSizeForTests(990); |
453 | 1 | const int kNumEntriesPerBatch = 100; |
454 | | |
455 | 1 | OpIdPB op_id = MakeOpId(1, 1); |
456 | | |
457 | 1 | SegmentSequence segments; |
458 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
459 | 1 | ASSERT_EQ(segments.size(), 1); |
460 | | |
461 | 275 | while (segments.size() < 3) { |
462 | 274 | ASSERT_OK(AppendNoOps(&op_id, kNumEntriesPerBatch)); |
463 | | // Update the segments |
464 | 274 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
465 | 274 | } |
466 | | |
467 | 1 | ASSERT_OK(log_->Close()); |
468 | | |
469 | 1 | int64_t wal_size_old = log_->metrics_->wal_size->value(); |
470 | 1 | BuildLog(); |
471 | 1 | int64_t wal_size_new = log_->metrics_->wal_size->value(); |
472 | | |
473 | 1 | ASSERT_EQ(wal_size_old, wal_size_new); |
474 | 1 | } |
475 | | |
476 | | // Tests that segments roll over when max segment size is reached |
477 | | // and that the player plays all entries in the correct order. |
478 | 1 | TEST_F(LogTest, TestSegmentRollover) { |
479 | 1 | BuildLog(); |
480 | | // Set a small segment size so that we have roll overs. |
481 | 1 | log_->SetMaxSegmentSizeForTests(990); |
482 | 1 | const int kNumEntriesPerBatch = 100; |
483 | | |
484 | 1 | OpIdPB op_id = MakeOpId(1, 1); |
485 | 1 | int num_entries = 0; |
486 | | |
487 | 1 | SegmentSequence segments; |
488 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
489 | | |
490 | 275 | while (segments.size() < 3) { |
491 | 274 | ASSERT_OK(AppendNoOps(&op_id, kNumEntriesPerBatch)); |
492 | 274 | num_entries += kNumEntriesPerBatch; |
493 | | // Update the segments |
494 | 274 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
495 | 274 | } |
496 | | |
497 | 1 | ASSERT_FALSE(segments.back()->HasFooter()); |
498 | 1 | ASSERT_OK(log_->Close()); |
499 | | |
500 | 1 | std::unique_ptr<LogReader> reader; |
501 | 1 | ASSERT_OK(LogReader::Open( |
502 | 1 | fs_manager_->env(), nullptr, "Log reader: ", tablet_wal_path_, nullptr, nullptr, &reader)); |
503 | 1 | ASSERT_OK(reader->GetSegmentsSnapshot(&segments)); |
504 | | |
505 | 1 | ASSERT_TRUE(segments.back()->HasFooter()); |
506 | | |
507 | 1 | size_t total_read = 0; |
508 | 5 | for (const scoped_refptr<ReadableLogSegment>& entry : segments) { |
509 | 5 | auto read_entries = entry->ReadEntries(); |
510 | 5 | if (!read_entries.status.ok()) { |
511 | 0 | FAIL() << "Failed to read entries in segment: " << entry->path() |
512 | 0 | << ". Status: " << read_entries.status |
513 | 0 | << ".\nSegments: " << DumpSegmentsToString(segments); |
514 | 0 | } |
515 | 5 | total_read += read_entries.entries.size(); |
516 | 5 | } |
517 | | |
518 | 1 | ASSERT_EQ(num_entries, total_read); |
519 | 1 | } |
520 | | |
521 | 1 | TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) { |
522 | 1 | const int kNumEntries = 4; |
523 | 1 | BuildLog(); |
524 | | |
525 | 1 | SegmentSequence segments; |
526 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
527 | 1 | ASSERT_EQ(segments.size(), 1); |
528 | 1 | scoped_refptr<ReadableLogSegment> readable_segment = segments[0]; |
529 | | |
530 | 1 | auto header_size = log_->active_segment_->written_offset(); |
531 | 1 | ASSERT_GT(header_size, 0); |
532 | 1 | readable_segment->UpdateReadableToOffset(header_size); |
533 | | |
534 | | // Reading the readable segment now should return OK but yield no |
535 | | // entries. |
536 | 1 | auto read_entries = readable_segment->ReadEntries(); |
537 | 1 | ASSERT_OK(read_entries.status); |
538 | 1 | ASSERT_EQ(read_entries.entries.size(), 0); |
539 | | |
540 | | // Dummy add_entry to help us estimate the size of what |
541 | | // gets written to disk. |
542 | 1 | LogEntryBatchPB batch; |
543 | 1 | OpIdPB op_id = MakeOpId(1, 1); |
544 | 1 | batch.set_mono_time(1); |
545 | 1 | LogEntryPB* log_entry = batch.add_entry(); |
546 | 1 | log_entry->set_type(REPLICATE); |
547 | 1 | ReplicateMsg* repl = log_entry->mutable_replicate(); |
548 | 1 | repl->mutable_id()->CopyFrom(op_id); |
549 | 1 | repl->set_op_type(NO_OP); |
550 | 1 | repl->set_hybrid_time(0L); |
551 | | |
552 | | // Entries are prefixed with a header. |
553 | 1 | auto single_entry_size = batch.ByteSize() + kEntryHeaderSize; |
554 | | |
555 | 1 | ssize_t written_entries_size = header_size; |
556 | 1 | ASSERT_OK(AppendNoOps(&op_id, kNumEntries, &written_entries_size)); |
557 | 1 | ASSERT_EQ(written_entries_size, log_->active_segment_->written_offset()); |
558 | 1 | ASSERT_EQ(single_entry_size * kNumEntries, written_entries_size - header_size); |
559 | | |
560 | | // Updating the readable segment with the offset of the first entry should |
561 | | // make it read a single entry even though there are several in the log. |
562 | 1 | readable_segment->UpdateReadableToOffset(header_size + single_entry_size); |
563 | 1 | read_entries = readable_segment->ReadEntries(); |
564 | 1 | ASSERT_OK(read_entries.status); |
565 | 1 | ASSERT_EQ(read_entries.entries.size(), 1); |
566 | | |
567 | | // Now append another entry so that the Log sets the correct readable offset |
568 | | // on the reader. |
569 | 1 | ASSERT_OK(AppendNoOps(&op_id, 1, &written_entries_size)); |
570 | | |
571 | | // Now the reader should be able to read all 5 entries. |
572 | 1 | read_entries = readable_segment->ReadEntries(); |
573 | 1 | ASSERT_OK(read_entries.status); |
574 | 1 | ASSERT_EQ(read_entries.entries.size(), 5); |
575 | | |
576 | | // Offset should get updated for an additional entry. |
577 | 1 | ASSERT_EQ(single_entry_size * (kNumEntries + 1) + header_size, |
578 | 1 | written_entries_size); |
579 | 1 | ASSERT_EQ(written_entries_size, log_->active_segment_->written_offset()); |
580 | | |
581 | | // When we roll it should go back to the header size. |
582 | 1 | ASSERT_OK(log_->AllocateSegmentAndRollOver()); |
583 | 1 | ASSERT_EQ(header_size, log_->active_segment_->written_offset()); |
584 | 1 | written_entries_size = header_size; |
585 | | |
586 | | // Now that we have closed the original segment, if we get a segment from the reader |
587 | | // again, we should get one with a footer and we should be able to read all entries. |
588 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
589 | 1 | ASSERT_EQ(segments.size(), 2); |
590 | 1 | readable_segment = segments[0]; |
591 | 1 | read_entries = readable_segment->ReadEntries(); |
592 | 1 | ASSERT_OK(read_entries.status); |
593 | 1 | ASSERT_EQ(read_entries.entries.size(), 5); |
594 | | |
595 | | // Offset should get updated for an additional entry, again. |
596 | 1 | ASSERT_OK(AppendNoOp(&op_id, &written_entries_size)); |
597 | 1 | ASSERT_EQ(single_entry_size + header_size, written_entries_size); |
598 | 1 | ASSERT_EQ(written_entries_size, log_->active_segment_->written_offset()); |
599 | 1 | } |
600 | | |
601 | | // Tests that segments can be GC'd while the log is running. |
602 | 1 | TEST_F(LogTest, TestGCWithLogRunning) { |
603 | 1 | BuildLog(); |
604 | | |
605 | 1 | vector<LogAnchor*> anchors; |
606 | 1 | ElementDeleter deleter(&anchors); |
607 | | |
608 | 1 | SegmentSequence segments; |
609 | | |
610 | 1 | const int kNumTotalSegments = 4; |
611 | 1 | const int kNumOpsPerSegment = 5; |
612 | 1 | int num_gced_segments; |
613 | 1 | OpIdPB op_id = MakeOpId(1, 1); |
614 | 1 | int64_t anchored_index = -1; |
615 | | |
616 | 1 | ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment, |
617 | 1 | &op_id, &anchors)); |
618 | | |
619 | | // We should get 4 anchors, each pointing at the beginning of a new segment |
620 | 1 | ASSERT_EQ(anchors.size(), 4); |
621 | | |
622 | | // Anchors should prevent GC. |
623 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
624 | 2 | ASSERT_EQ(4, segments.size()) << DumpSegmentsToString(segments); |
625 | 1 | ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&anchored_index)); |
626 | 1 | ASSERT_OK(log_->GC(anchored_index, &num_gced_segments)); |
627 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
628 | 2 | ASSERT_EQ(4, segments.size()) << DumpSegmentsToString(segments); |
629 | | |
630 | | // Freeing the first 2 anchors should allow GC of them. |
631 | 1 | ASSERT_OK(log_anchor_registry_->Unregister(anchors[0])); |
632 | 1 | ASSERT_OK(log_anchor_registry_->Unregister(anchors[1])); |
633 | 1 | ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&anchored_index)); |
634 | | // We should now be anchored on op 1.11, i.e. on the 3rd segment |
635 | 1 | ASSERT_EQ(anchors[2]->log_index, anchored_index); |
636 | | |
637 | | // However, first, we'll try bumping the min retention threshold and |
638 | | // verify that we don't GC any. |
639 | 1 | { |
640 | 1 | google::FlagSaver saver; |
641 | 1 | FLAGS_log_min_segments_to_retain = 10; |
642 | 1 | ASSERT_OK(log_->GC(anchored_index, &num_gced_segments)); |
643 | 1 | ASSERT_EQ(0, num_gced_segments); |
644 | 1 | } |
645 | | |
646 | | // Try again without the modified flag. |
647 | 1 | ASSERT_OK(log_->GC(anchored_index, &num_gced_segments)); |
648 | 2 | ASSERT_EQ(2, num_gced_segments) << DumpSegmentsToString(segments); |
649 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
650 | 2 | ASSERT_EQ(2, segments.size()) << DumpSegmentsToString(segments); |
651 | | |
652 | | // Release the remaining "rolled segment" anchor. GC will not delete the |
653 | | // last rolled segment. |
654 | 1 | ASSERT_OK(log_anchor_registry_->Unregister(anchors[2])); |
655 | 1 | ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&anchored_index)); |
656 | 1 | ASSERT_OK(log_->GC(anchored_index, &num_gced_segments)); |
657 | 2 | ASSERT_EQ(0, num_gced_segments) << DumpSegmentsToString(segments); |
658 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
659 | 2 | ASSERT_EQ(2, segments.size()) << DumpSegmentsToString(segments); |
660 | | |
661 | | // Check that we get a NotFound if we try to read before the GCed point. |
662 | 1 | { |
663 | 1 | ReplicateMsgs repls; |
664 | 1 | int64_t starting_op_segment_seq_num; |
665 | 1 | yb::SchemaPB schema; |
666 | 1 | uint32_t schema_version; |
667 | 1 | Status s = log_->GetLogReader()->ReadReplicatesInRange( |
668 | 1 | 1, 2, LogReader::kNoSizeLimit, &repls, &starting_op_segment_seq_num, |
669 | 1 | &schema, &schema_version); |
670 | 2 | ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
671 | 1 | } |
672 | | |
673 | 1 | ASSERT_OK(log_->Close()); |
674 | 1 | CheckRightNumberOfSegmentFiles(2); |
675 | | |
676 | | // We skip the first three, since we unregistered them above. |
677 | 2 | for (int i = 3; i < kNumTotalSegments; i++) { |
678 | 1 | ASSERT_OK(log_anchor_registry_->Unregister(anchors[i])); |
679 | 1 | } |
680 | 1 | } |
681 | | |
682 | | // Test that, when we are set to retain a given number of log segments, |
683 | | // we also retain any relevant log index chunks, even if those operations |
684 | | // are not necessary for recovery. |
685 | 1 | TEST_F(LogTest, TestGCOfIndexChunks) { |
686 | 1 | FLAGS_log_min_segments_to_retain = 4; |
687 | 1 | BuildLog(); |
688 | | |
689 | | // Append some segments which cross from one index chunk into another. |
690 | | // 999990-999994 \___ the first index |
691 | | // 999995-999999 / chunk points to these |
692 | | // 1000000-1000004 \_ |
693 | | // 1000005-1000009 _|- the second index chunk points to these |
694 | | // 1000010-<still open> / |
695 | 1 | const int kNumTotalSegments = 5; |
696 | 1 | const int kNumOpsPerSegment = 5; |
697 | 1 | OpIdPB op_id = MakeOpId(1, 999990); |
698 | 1 | ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment, |
699 | 1 | &op_id, nullptr)); |
700 | | |
701 | | // Run a GC on an op in the second index chunk. We should remove only the |
702 | | // earliest segment, because we are set to retain 4. |
703 | 1 | int num_gced_segments = 0; |
704 | 1 | ASSERT_OK(log_->GC(1000006, &num_gced_segments)); |
705 | 1 | ASSERT_EQ(1, num_gced_segments); |
706 | | |
707 | | // And we should still be able to read ops in the retained segment, even though |
708 | | // the GC index was higher. |
709 | 1 | auto loaded_op = ASSERT_RESULT(log_->GetLogReader()->LookupOpId(999995)); |
710 | 1 | ASSERT_EQ(yb::OpId(1, 999995), loaded_op); |
711 | | |
712 | | // If we drop the retention count down to 1, we can now GC, and the log index |
713 | | // chunk should also be GCed. |
714 | 1 | FLAGS_log_min_segments_to_retain = 1; |
715 | 1 | ASSERT_OK(log_->GC(1000003, &num_gced_segments)); |
716 | 1 | ASSERT_EQ(1, num_gced_segments); |
717 | | |
718 | 1 | auto result = log_->GetLogReader()->LookupOpId(999995); |
719 | | // This test relies on kEntriesPerIndexChunk being 1000000, and that's no longer |
720 | | // the case after D1719 (2fe27d886390038bc734ea28638a1b1435e7d0d4) on Mac. |
721 | | #if !defined(__APPLE__) |
722 | | ASSERT_TRUE(!result.ok() && result.status().IsNotFound()) << "unexpected status: " << result; |
723 | | #endif |
724 | 1 | } |
725 | | |
726 | | // Tests Log::WaitUntilAllFlushed(): per the original note it queues a FLUSH_MARKER so that |
727 | | // everything appended before it is fsync()ed, and the marker itself must never show up |
728 | | // as an entry when the segment is read back. |
729 | 1 | TEST_F(LogTest, TestWaitUntilAllFlushed) { |
730 | 1 | BuildLog(); |
731 | | // Append 2 REPLICATE entries (note: AppendSync::kTrue is passed here). |
732 | 1 | AppendReplicateBatchToLog(2, AppendSync::kTrue); |
733 | | |
734 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
735 | | |
736 | | // Exactly 2 entries must come back, all of them REPLICATEs, i.e. no FLUSH_MARKER entry. |
737 | 1 | vector<scoped_refptr<ReadableLogSegment> > segments; |
738 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
739 | | |
740 | 1 | auto read_entries = segments[0]->ReadEntries(); |
741 | 1 | ASSERT_OK(read_entries.status); |
742 | 1 | ASSERT_EQ(read_entries.entries.size(), 2); |
743 | 3 | for (size_t i = 0; i < read_entries.entries.size(); i++) { |
744 | 2 | ASSERT_TRUE(read_entries.entries[i]->has_replicate()); |
745 | 2 | } |
746 | 1 | } |
747 | | |
748 | | // Reopens a closed log and checks that its old segments are GC'ed only once anchors and retention allow it. |
749 | 1 | TEST_F(LogTest, TestLogReopenAndGC) { |
750 | 1 | BuildLog(); |
751 | | |
752 | 1 | SegmentSequence segments; |
753 | | |
754 | 1 | vector<LogAnchor*> anchors; |
755 | 1 | ElementDeleter deleter(&anchors); |
756 | | |
757 | 1 | const int kNumTotalSegments = 3; |
758 | 1 | const int kNumOpsPerSegment = 5; |
759 | 1 | int num_gced_segments; |
760 | 1 | OpIdPB op_id = MakeOpId(1, 1); |
761 | 1 | int64_t anchored_index = -1; |
762 | | |
763 | 1 | ASSERT_OK(AppendMultiSegmentSequence(kNumTotalSegments, kNumOpsPerSegment, |
764 | 1 | &op_id, &anchors)); |
765 | | // All 3 segments are still anchored, so GC at the earliest anchored index must remove nothing. |
766 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
767 | 1 | ASSERT_EQ(3, segments.size()); |
768 | 1 | ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&anchored_index)); |
769 | 1 | ASSERT_OK(log_->GC(anchored_index, &num_gced_segments)); |
770 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
771 | 1 | ASSERT_EQ(3, segments.size()); |
772 | | |
773 | 1 | ASSERT_OK(log_->Close()); |
774 | | |
775 | | // Now reopen the log, as if we had replayed its contents into the in-memory stores, |
776 | | // and then exercise GC on the reopened log. |
777 | 1 | BuildLog(); |
778 | | |
779 | | // The 3 "old" segments are still anchored; the 4th is presumably the one opened by BuildLog(). |
780 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
781 | 1 | ASSERT_EQ(4, segments.size()); |
782 | | |
783 | | // Write to a new log segment, as if we had taken new requests and the |
784 | | // mem stores are holding anchors, but don't roll it. |
785 | 1 | CreateAndRegisterNewAnchor(op_id.index(), &anchors); |
786 | 1 | ASSERT_OK(AppendNoOps(&op_id, kNumOpsPerSegment)); |
787 | | |
788 | | // Release the 3 "old" anchors; only the anchor registered after the reopen remains. |
789 | 4 | for (int i = 0; i < 3; i++) { |
790 | 3 | ASSERT_OK(log_anchor_registry_->Unregister(anchors[i])); |
791 | 3 | } |
792 | 1 | ASSERT_OK(log_anchor_registry_->GetEarliestRegisteredLogIndex(&anchored_index)); |
793 | | |
794 | | // If we set the min_seconds_to_retain high, then we'll retain the logs even |
795 | | // though we could GC them based on our anchoring. |
796 | 1 | FLAGS_log_min_seconds_to_retain = 500; |
797 | 1 | ASSERT_OK(log_->GC(anchored_index, &num_gced_segments)); |
798 | 1 | ASSERT_EQ(0, num_gced_segments); |
799 | | |
800 | | // Turn off the time-based retention (both the flag and the log's own WAL retention setting) |
801 | | // and try GCing again. This time we should succeed. |
802 | 1 | FLAGS_log_min_seconds_to_retain = 0; |
803 | 1 | log_->set_wal_retention_secs(0); |
804 | 1 | ASSERT_OK(log_->GC(anchored_index, &num_gced_segments)); |
805 | 1 | ASSERT_EQ(2, num_gced_segments); |
806 | | |
807 | | // After GC there should be only one left, besides the one currently being |
808 | | // written to. That is because min_segments_to_retain defaults to 2. |
809 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
810 | 2 | ASSERT_EQ(2, segments.size()) << DumpSegmentsToString(segments); |
811 | 1 | ASSERT_OK(log_->Close()); |
812 | | |
813 | 1 | CheckRightNumberOfSegmentFiles(2); |
814 | | |
815 | | // Unregister the final anchor (the one registered after the reopen). |
816 | 1 | ASSERT_OK(log_anchor_registry_->Unregister(anchors[3])); |
817 | 1 | } |
818 | | |
819 | | // Times writing batches to the log and reading them all back (10 batches unless slow tests are allowed). |
820 | 1 | TEST_F(LogTest, TestWriteManyBatches) { |
821 | 1 | uint64_t num_batches = 10; |
822 | 1 | if (AllowSlowTests()) { |
823 | 0 | num_batches = FLAGS_num_batches; |
824 | 0 | } |
825 | 1 | BuildLog(); |
826 | | |
827 | 1 | LOG(INFO)<< "Starting to write " << num_batches << " to log"; |
828 | 1 | LOG_TIMING(INFO, "Wrote all batches to log") { |
829 | 1 | AppendReplicateBatchToLog(num_batches); |
830 | 1 | } |
831 | 1 | ASSERT_OK(log_->Close()); |
832 | 1 | LOG(INFO) << "Done writing"; |
833 | | |
834 | 1 | LOG_TIMING(INFO, "Read all entries from Log") { |
835 | 1 | LOG(INFO) << "Starting to read log"; |
836 | 1 | uint32_t num_entries = 0; |
837 | | |
838 | 1 | std::unique_ptr<LogReader> reader; |
839 | 1 | ASSERT_OK(LogReader::Open( |
840 | 1 | fs_manager_->env(), /* index= */ nullptr, "Log reader: ", tablet_wal_path_, |
841 | 1 | /* table_metric_entity= */ nullptr, /* tablet_metric_entity= */ nullptr, &reader)); |
842 | | |
843 | 1 | std::vector<scoped_refptr<ReadableLogSegment> > segments; |
844 | 1 | ASSERT_OK(reader->GetSegmentsSnapshot(&segments)); |
845 | | |
846 | 1 | for (const scoped_refptr<ReadableLogSegment>& entry : segments) { |
847 | 1 | auto read_entries = entry->ReadEntries(); |
848 | 1 | ASSERT_OK(read_entries.status); |
849 | 1 | num_entries += read_entries.entries.size(); |
850 | 1 | } |
851 | 1 | ASSERT_EQ(num_entries, num_batches); |
852 | 1 | LOG(INFO) << "End readfile"; |
853 | 1 | } |
854 | 1 | } |
855 | | |
856 | | // Tests LogReader queries: segment prefixes (as used for GC) and lookup by sequence number. |
857 | | // This sets up a reader with some empty segments to query, which amount to the |
858 | | // following: |
859 | | // seg002: 0.10 through 0.19 |
860 | | // seg003: 0.20 through 0.29 |
861 | | // seg004: 0.30 through 0.39 |
862 | 1 | TEST_F(LogTest, TestLogReader) { |
863 | 1 | LogReader reader(fs_manager_->env(), |
864 | 1 | scoped_refptr<LogIndex>(), |
865 | 1 | "Log reader: ", |
866 | 1 | nullptr, |
867 | 1 | nullptr); |
868 | 1 | ASSERT_OK(reader.InitEmptyReaderForTests()); |
869 | 1 | ASSERT_OK(AppendNewEmptySegmentToReader(2, 10, &reader)); |
870 | 1 | ASSERT_OK(AppendNewEmptySegmentToReader(3, 20, &reader)); |
871 | 1 | ASSERT_OK(AppendNewEmptySegmentToReader(4, 30, &reader)); |
872 | | |
873 | 1 | OpIdPB op; |
874 | 1 | op.set_term(0); |
875 | 1 | SegmentSequence segments; |
876 | | |
877 | | // Queries for segment prefixes (used for GC) |
878 | | |
879 | | // Asking the reader the prefix of segments that does not include op 1 |
880 | | // should return the empty set. |
881 | 1 | ASSERT_OK(reader.GetSegmentPrefixNotIncluding(1, &segments)); |
882 | 1 | ASSERT_TRUE(segments.empty()); |
883 | | |
884 | | // .. same for op 10 |
885 | 1 | ASSERT_OK(reader.GetSegmentPrefixNotIncluding(10, &segments)); |
886 | 1 | ASSERT_TRUE(segments.empty()); |
887 | | |
888 | | // Asking for the prefix of segments not including op 20 should return |
889 | | // the first segment, since 20 is the first operation in segment 3. |
890 | 1 | ASSERT_OK(reader.GetSegmentPrefixNotIncluding(20, &segments)); |
891 | 1 | ASSERT_EQ(segments.size(), 1); |
892 | 1 | ASSERT_EQ(segments[0]->header().sequence_number(), 2); |
893 | | |
894 | | // Asking for 30 should include the first two. |
895 | 1 | ASSERT_OK(reader.GetSegmentPrefixNotIncluding(30, &segments)); |
896 | 1 | ASSERT_EQ(segments.size(), 2); |
897 | 1 | ASSERT_EQ(segments[0]->header().sequence_number(), 2); |
898 | 1 | ASSERT_EQ(segments[1]->header().sequence_number(), 3); |
899 | | |
900 | | // Asking for anything higher should return all segments (only the first two are spot-checked). |
901 | 1 | ASSERT_OK(reader.GetSegmentPrefixNotIncluding(1000, &segments)); |
902 | 1 | ASSERT_EQ(segments.size(), 3); |
903 | 1 | ASSERT_EQ(segments[0]->header().sequence_number(), 2); |
904 | 1 | ASSERT_EQ(segments[1]->header().sequence_number(), 3); |
905 | | |
906 | | // Queries for specific segment sequence numbers; an unknown number (5) yields a null segment. |
907 | 1 | scoped_refptr<ReadableLogSegment> segment = reader.GetSegmentBySequenceNumber(2); |
908 | 1 | ASSERT_EQ(2, segment->header().sequence_number()); |
909 | 1 | segment = reader.GetSegmentBySequenceNumber(3); |
910 | 1 | ASSERT_EQ(3, segment->header().sequence_number()); |
911 | | |
912 | 1 | segment = reader.GetSegmentBySequenceNumber(4); |
913 | 1 | ASSERT_EQ(4, segment->header().sequence_number()); |
914 | | |
915 | 1 | segment = reader.GetSegmentBySequenceNumber(5); |
916 | 1 | ASSERT_TRUE(segment.get() == nullptr); |
917 | 1 | } |
918 | | |
919 | | // Tests that, even when no segment has been closed yet (so, per the original note, the |
920 | | // LogReader's index is empty), the segments snapshot still returns the segment being written |
921 | | // and the single entry appended to it can be read back. |
922 | 1 | TEST_F(LogTest, TestLogReaderReturnsLatestSegmentIfIndexEmpty) { |
923 | 1 | BuildLog(); |
924 | | |
925 | 1 | AppendReplicateBatch({ |
926 | 1 | .op_id = {1, 1}, |
927 | 1 | .committed_op_id = {0, 0}, |
928 | 1 | .writes = {}, |
929 | 1 | .sync = AppendSync::kTrue, |
930 | 1 | }); |
931 | | |
932 | 1 | SegmentSequence segments; |
933 | 1 | ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments)); |
934 | 1 | ASSERT_EQ(segments.size(), 1); |
935 | | |
936 | 1 | auto read_entries = segments[0]->ReadEntries(); |
937 | 1 | ASSERT_OK(read_entries.status); |
938 | 1 | ASSERT_EQ(1, read_entries.entries.size()); |
939 | 1 | } |
940 | | /* Checks MakeOpId(term, index) argument order and the "term.index" form of OpIdToString(). */ |
941 | 1 | TEST_F(LogTest, TestOpIdUtils) { |
942 | 1 | OpIdPB id = MakeOpId(1, 2); |
943 | 1 | ASSERT_EQ("1.2", consensus::OpIdToString(id)); |
944 | 1 | ASSERT_EQ(1, id.term()); |
945 | 1 | ASSERT_EQ(2, id.index()); |
946 | 1 | } |
947 | | /* Streams a TestLogSequenceElem: "ROLL" for a roll, "R" followed by the op id for a REPLICATE. */ |
948 | 53 | std::ostream& operator<<(std::ostream& os, const TestLogSequenceElem& elem) { |
949 | 53 | switch (elem.type) { |
950 | 3 | case TestLogSequenceElem::ROLL: |
951 | 3 | os << "ROLL"; |
952 | 3 | break; |
953 | 50 | case TestLogSequenceElem::REPLICATE: |
954 | 50 | os << "R" << elem.id; |
955 | 50 | break; |
956 | 53 | } |
957 | 53 | return os; |
958 | 53 | } |
959 | | |
960 | | // Appends to *ops a randomized but plausible sequence of seq_len REPLICATEs, with term changes, |
961 | | // backward index moves and occasional ROLLs; (*terms_by_index)[i] is the last term written at i. |
962 | | // |
963 | | // NOTE: this log sequence may contain some aberrations which would not occur in a real |
964 | | // consensus log, but our API supports them. In the future we may want to add assertions |
965 | | // to the Log implementation that prevent such aberrations, in which case we'd need to |
966 | | // modify this. |
967 | | void LogTest::GenerateTestSequence(size_t seq_len, |
968 | | vector<TestLogSequenceElem>* ops, |
969 | 1 | vector<int64_t>* terms_by_index) { |
970 | 1 | auto rng = &ThreadLocalRandom(); |
971 | 1 | terms_by_index->assign(seq_len + 1, -1); |
972 | 1 | int64_t committed_index = 0; |
973 | 1 | int64_t max_repl_index = 0; |
974 | | |
975 | 1 | OpIdPB id = MakeOpId(1, 0); |
976 | 51 | for (size_t i = 0; i < seq_len; i++) { |
977 | 50 | if (RandomUniformInt(0, 4, rng) == 0) { |
978 | | // Reset term - it may stay the same, or go up/down by at most 2, but never below 1 |
979 | 12 | id.set_term(std::max(static_cast<int64_t>(1), id.term() + RandomUniformInt(0, 4, rng) - 2)); |
980 | 12 | } |
981 | | |
982 | | // Advance index by exactly one |
983 | 50 | id.set_index(id.index() + 1); |
984 | | |
985 | 50 | if (RandomUniformInt(0, 4, rng) == 0) { |
986 | | // Move index backward a bit, but not past the committed index (which stays 0 in this function) |
987 | 8 | id.set_index(std::max(committed_index + 1, id.index() - RandomUniformInt(0, 4, rng))); |
988 | 8 | } |
989 | | |
990 | | // Roll the log sometimes (never before the first op) |
991 | 50 | if (i != 0 && RandomUniformInt(0, 14, rng) == 0) { |
992 | 3 | TestLogSequenceElem op; |
993 | 3 | op.type = TestLogSequenceElem::ROLL; |
994 | 3 | ops->push_back(op); |
995 | 3 | } |
996 | | |
997 | 50 | TestLogSequenceElem op; |
998 | 50 | op.type = TestLogSequenceElem::REPLICATE; |
999 | 50 | op.id = id; |
1000 | 50 | ops->push_back(op); |
1001 | 50 | (*terms_by_index)[id.index()] = id.term(); |
1002 | 50 | max_repl_index = std::max(max_repl_index, id.index()); |
1003 | 50 | } |
1004 | 1 | terms_by_index->resize(max_repl_index + 1); |
1005 | 1 | } |
1006 | | /* Replays seq against the log: AppendNoOp() for each REPLICATE, RollLog() for each ROLL. */ |
1007 | 1 | void LogTest::AppendTestSequence(const vector<TestLogSequenceElem>& seq) { |
1008 | 53 | for (const TestLogSequenceElem& e : seq) { |
1009 | 0 | VLOG(1) << "Appending: " << e; |
1010 | 53 | switch (e.type) { |
1011 | 50 | case TestLogSequenceElem::REPLICATE: |
1012 | 50 | { |
1013 | 50 | OpIdPB id(e.id); |
1014 | 50 | ASSERT_OK(AppendNoOp(&id)); |
1015 | 50 | break; |
1016 | 50 | } |
1017 | 3 | case TestLogSequenceElem::ROLL: |
1018 | 3 | { |
1019 | 3 | ASSERT_OK(RollLog()); |
1020 | 3 | } |
1021 | 53 | } |
1022 | 53 | } |
1023 | 1 | } |
1024 | | |
1025 | | // Test that if multiple REPLICATE entries are written for the same index, |
1026 | | // we read the latest one. |
1027 | | // |
1028 | | // This is a randomized test: we generate a plausible sequence of log messages, |
1029 | | // write it out, and then read random ranges of log indexes, making sure we |
1030 | | // always see the correct term for each REPLICATE message (i.e. whichever term |
1031 | | // was the last to append it). |
1032 | 1 | TEST_F(LogTest, TestReadLogWithReplacedReplicates) { |
1033 | 1 | const int kSequenceLength = AllowSlowTests() ? 1000 : 50; |
1034 | | |
1035 | 1 | vector<int64_t> terms_by_index; |
1036 | 1 | vector<TestLogSequenceElem> seq; |
1037 | 1 | GenerateTestSequence(kSequenceLength, &seq, &terms_by_index); |
1038 | 1 | LOG(INFO) << "test sequence: " << seq; |
1039 | 1 | const int64_t max_repl_index = terms_by_index.size() - 1; |
1040 | 1 | LOG(INFO) << "max_repl_index: " << max_repl_index; |
1041 | | |
1042 | | // Write the test sequence to the log. |
1043 | | // TODO: should consider adding batching here of multiple replicates |
1044 | 1 | BuildLog(); |
1045 | 1 | AppendTestSequence(seq); |
1046 | | |
1047 | 1 | const int kNumRandomReads = 100; |
1048 | | |
1049 | | // We'll advance 'gc_index' randomly (by 0 to 9 per pass) through the log until we've |
1050 | | // gotten to the end. This ensures that, when we GC, we don't ever remove the latest |
1051 | | // version of a replicate message unintentionally. |
1052 | 1 | LogReader* reader = log_->GetLogReader(); |
1053 | 10 | for (int gc_index = 1; gc_index < max_repl_index;) { |
1054 | 9 | SCOPED_TRACE(Substitute("after GCing $0", gc_index)); |
1055 | | |
1056 | | // Test reading random ranges of indexes and verifying that we get back the |
1057 | | // REPLICATE messages with the correct terms |
1058 | 909 | for (int random_read = 0; random_read < kNumRandomReads; random_read++) { |
1059 | 900 | auto start_index = RandomUniformInt<int64_t>(gc_index, max_repl_index - 1); |
1060 | 900 | auto end_index = RandomUniformInt<int64_t>(start_index, max_repl_index); |
1061 | 900 | int64_t starting_op_segment_seq_num; |
1062 | 900 | yb::SchemaPB schema; |
1063 | 900 | uint32_t schema_version; |
1064 | 900 | { |
1065 | 900 | SCOPED_TRACE(Substitute("Reading $0-$1", start_index, end_index)); |
1066 | 900 | consensus::ReplicateMsgs repls; |
1067 | 900 | ASSERT_OK(reader->ReadReplicatesInRange(start_index, end_index, |
1068 | 900 | LogReader::kNoSizeLimit, &repls, |
1069 | 900 | &starting_op_segment_seq_num, |
1070 | 900 | &schema, &schema_version)); |
1071 | 900 | ASSERT_EQ(end_index - start_index + 1, repls.size()); |
1072 | 900 | auto expected_index = start_index; |
1073 | 5.81k | for (const auto& repl : repls) { |
1074 | 5.81k | ASSERT_EQ(expected_index, repl->id().index()); |
1075 | 5.81k | ASSERT_EQ(terms_by_index[expected_index], repl->id().term()); |
1076 | 5.81k | expected_index++; |
1077 | 5.81k | } |
1078 | 900 | } |
1079 | | |
1080 | 900 | int64_t bytes_read = log_->reader_->bytes_read_->value(); |
1081 | 900 | int64_t entries_read = log_->reader_->entries_read_->value(); |
1082 | 900 | int64_t read_batch_count = log_->reader_->read_batch_latency_->TotalCount(); |
1083 | 900 | EXPECT_GT(log_->reader_->bytes_read_->value(), 0); |
1084 | 900 | EXPECT_GT(log_->reader_->entries_read_->value(), 0); |
1085 | 900 | EXPECT_GT(log_->reader_->read_batch_latency_->TotalCount(), 0); |
1086 | | |
1087 | | // Test a size-limited read: either the total fits the limit or exactly one message comes back. |
1088 | 900 | int size_limit = RandomUniformInt(1, 1000); |
1089 | 900 | { |
1090 | 900 | SCOPED_TRACE(Substitute("Reading $0-$1 with size limit $2", |
1091 | 900 | start_index, end_index, size_limit)); |
1092 | 900 | ReplicateMsgs repls; |
1093 | 900 | ASSERT_OK(reader->ReadReplicatesInRange(start_index, end_index, size_limit, &repls, |
1094 | 900 | &starting_op_segment_seq_num, |
1095 | 900 | &schema, &schema_version)); |
1096 | 900 | ASSERT_LE(repls.size(), end_index - start_index + 1); |
1097 | 900 | int total_size = 0; |
1098 | 900 | auto expected_index = start_index; |
1099 | 1.87k | for (const auto& repl : repls) { |
1100 | 1.87k | ASSERT_EQ(expected_index, repl->id().index()); |
1101 | 1.87k | ASSERT_EQ(terms_by_index[expected_index], repl->id().term()); |
1102 | 1.87k | expected_index++; |
1103 | 1.87k | total_size += repl->SpaceUsed(); |
1104 | 1.87k | } |
1105 | 900 | if (total_size > size_limit) { |
1106 | 155 | ASSERT_EQ(1, repls.size()); |
1107 | 745 | } else { |
1108 | 745 | ASSERT_LE(total_size, size_limit); |
1109 | 745 | } |
1110 | 900 | } |
1111 | | |
1112 | 900 | EXPECT_GT(log_->reader_->bytes_read_->value(), bytes_read); |
1113 | 900 | EXPECT_GT(log_->reader_->entries_read_->value(), entries_read); |
1114 | 900 | EXPECT_GT(log_->reader_->read_batch_latency_->TotalCount(), read_batch_count); |
1115 | 900 | } |
1116 | | |
1117 | 9 | int num_gced = 0; |
1118 | 9 | ASSERT_OK(log_->GC(gc_index, &num_gced)); |
1119 | 9 | gc_index += RandomUniformInt(0, 9); |
1120 | 9 | } |
1121 | 1 | } |
1122 | | |
1123 | | // Ensure that the LogReader can read replicate messages whose log index runs past the signed |
1124 | | // 32-bit maximum (ops start at INT32_MAX - 3, term set likewise). Regression test for KUDU-1933. |
1125 | 1 | TEST_F(LogTest, TestReadReplicatesHighIndex) { |
1126 | 1 | const int64_t first_log_index = std::numeric_limits<int32_t>::max() - 3; |
1127 | 1 | const int kSequenceLength = 10; |
1128 | | |
1129 | 1 | BuildLog(); |
1130 | 1 | OpIdPB op_id; |
1131 | 1 | op_id.set_term(first_log_index); |
1132 | 1 | op_id.set_index(first_log_index); |
1133 | 1 | ASSERT_OK(AppendNoOps(&op_id, kSequenceLength)); |
1134 | | |
1135 | 1 | auto* reader = log_->GetLogReader(); |
1136 | 1 | ReplicateMsgs repls; |
1137 | 1 | int64_t starting_op_segment_seq_num; |
1138 | 1 | yb::SchemaPB schema; |
1139 | 1 | uint32_t schema_version; |
1140 | 1 | ASSERT_OK(reader->ReadReplicatesInRange(first_log_index, first_log_index + kSequenceLength - 1, |
1141 | 1 | LogReader::kNoSizeLimit, &repls, |
1142 | 1 | &starting_op_segment_seq_num, |
1143 | 1 | &schema, &schema_version)); |
1144 | 1 | ASSERT_EQ(kSequenceLength, repls.size()); |
1145 | 1 | } |
1146 | | /* Manual rollover: after kNumIters AllocateSegmentAndRollOver() calls, both the segment count and the active sequence number go from 1 to kNumIters + 1. */ |
1147 | 1 | TEST_F(LogTest, AllocateSegmentAndRollOver) { |
1148 | 1 | constexpr auto kNumIters = 10; |
1149 | | |
1150 | | // Big enough to not trigger automated rollover and test manual one. |
1151 | 1 | options_.segment_size_bytes = 1_MB; |
1152 | | |
1153 | 1 | BuildLog(); |
1154 | | |
1155 | 1 | ASSERT_EQ(log_->num_segments(), 1); |
1156 | 1 | ASSERT_EQ(log_->active_segment_sequence_number(), 1); |
1157 | | |
1158 | 11 | for (auto i = 0; i < kNumIters; ++i) { |
1159 | 10 | AppendReplicateBatchToLog(1, AppendSync::kTrue); |
1160 | 10 | ASSERT_OK(log_->AllocateSegmentAndRollOver()); |
1161 | 10 | } |
1162 | | |
1163 | 1 | ASSERT_EQ(log_->num_segments(), kNumIters + 1); |
1164 | 1 | ASSERT_EQ(log_->active_segment_sequence_number(), kNumIters + 1); |
1165 | | |
1166 | 1 | ASSERT_OK(log_->Close()); |
1167 | 1 | } |
1168 | | /* Interleaves manual rollovers with AppendSync::kFalse appends into 1-byte segments; expects at least kNumBatches segments. */ |
1169 | 1 | TEST_F(LogTest, ConcurrentAllocateSegmentAndRollOver) { |
1170 | 1 | constexpr auto kNumBatches = 10; |
1171 | 1 | constexpr auto kNumEntriesPerBatch = 10; |
1172 | | |
1173 | | // Trigger rollover aggressively during normal append. |
1174 | 1 | options_.segment_size_bytes = 1; |
1175 | | |
1176 | 1 | BuildLog(); |
1177 | | |
1178 | 11 | for (auto i = 0; i < kNumBatches; ++i) { |
1179 | 10 | AppendReplicateBatchToLog(kNumEntriesPerBatch, AppendSync::kFalse); |
1180 | 10 | ASSERT_OK(log_->AllocateSegmentAndRollOver()); |
1181 | 10 | } |
1182 | | |
1183 | 1 | LOG(INFO) << "Log segments: " << log_->num_segments(); |
1184 | 1 | ASSERT_GE(log_->num_segments(), kNumBatches); |
1185 | | |
1186 | 1 | ASSERT_OK(log_->Close()); |
1187 | 1 | } |
1188 | | /* Appends num_batches batches (alternating AppendSync::kFalse / kTrue) and calls CopyTo() after each one; returns the latest entry op id recorded just before each copy. */ |
1189 | 2 | Result<std::vector<OpId>> LogTest::AppendAndCopy(size_t num_batches, size_t num_entries_per_batch) { |
1190 | 2 | std::vector<OpId> last_op_id_before_copy; |
1191 | 2 | last_op_id_before_copy.reserve(num_batches); |
1192 | 32 | for (size_t i = 0; i < num_batches; ++i) { |
1193 | 30 | AppendReplicateBatchToLog( |
1194 | 15 | num_entries_per_batch, i % 2 == 0 ? AppendSync::kFalse : AppendSync::kTrue); |
1195 | 30 | last_op_id_before_copy.push_back(log_->GetLatestEntryOpId()); |
1196 | 30 | RETURN_NOT_OK(log_->CopyTo(GetLogCopyPath(i))); |
1197 | 30 | } |
1198 | 2 | return last_op_id_before_copy; |
1199 | 2 | } |
1200 | | /* Opens log copy number copy_idx with a LogReader and returns its segments; fails with InternalError if the last segment footer's max_replicate_index is below last_op_min_idx. */ |
1201 | | Result<SegmentSequence> LogTest::GetSegmentsFromLogCopyAndCheckLastOpIndex( |
1202 | 30 | const size_t copy_idx, const int64_t last_op_min_idx) { |
1203 | 30 | const auto log_copy_dir = GetLogCopyPath(copy_idx); |
1204 | 30 | std::unique_ptr<LogReader> copied_log_reader; |
1205 | 30 | RETURN_NOT_OK(LogReader::Open( |
1206 | 30 | fs_manager_->env(), make_scoped_refptr<LogIndex>(log_copy_dir), "Log reader: ", |
1207 | 30 | log_copy_dir, /* table_metric_entity = */ nullptr, |
1208 | 30 | /* tablet_metric_entity = */ nullptr, &copied_log_reader)); |
1209 | | |
1210 | 30 | SegmentSequence copied_segments; |
1211 | 30 | RETURN_NOT_OK(copied_log_reader->GetSegmentsSnapshot(&copied_segments)); |
1212 | | |
1213 | 30 | SCHECK_GE( |
1214 | 30 | copied_segments.back()->footer().max_replicate_index(), last_op_min_idx, InternalError, |
1215 | 30 | "Last copied operation index should be >= index of last log operation added before calling " |
1216 | 30 | "Log::CopyTo."); |
1217 | | |
1218 | 30 | return copied_segments; |
1219 | 30 | } |
1220 | | |
1221 | | // Verifies that Log::CopyTo works while appends keep triggering rollovers (1-byte segments): |
1222 | | // every copy must match the leading segments of the original log entry-for-entry. |
1223 | 1 | TEST_F(LogTest, CopyTo) { |
1224 | 1 | constexpr auto kNumBatches = 10; |
1225 | 1 | constexpr auto kNumEntriesPerBatch = 10; |
1226 | | |
1227 | | // Trigger rollover aggressively during normal append. |
1228 | 1 | options_.segment_size_bytes = 1; |
1229 | | |
1230 | 1 | BuildLog(); |
1231 | | |
1232 | 1 | auto last_op_id_before_copy = ASSERT_RESULT(AppendAndCopy(kNumBatches, kNumEntriesPerBatch)); |
1233 | | |
1234 | 1 | SegmentSequence segments; |
1235 | 1 | ASSERT_OK(log_->GetSegmentsSnapshot(&segments)); |
1236 | | |
1237 | 11 | for (auto i = 0; i < kNumBatches; ++i) { |
1238 | 10 | auto copied_segments = ASSERT_RESULT( |
1239 | 10 | GetSegmentsFromLogCopyAndCheckLastOpIndex(i, last_op_id_before_copy[i].index)); |
1240 | 10 | ASSERT_LE(copied_segments.size(), segments.size()); |
1241 | | |
1242 | | // Copied log segments should match, position by position, the log segments of the original log. |
1243 | 190 | for (size_t seg_idx = 0; seg_idx < copied_segments.size(); ++seg_idx) { |
1244 | 180 | auto& segment = segments[seg_idx]; |
1245 | 180 | auto& segment_copy = copied_segments[seg_idx]; |
1246 | | |
1247 | 180 | auto entries_result = segment->ReadEntries(); |
1248 | 180 | ASSERT_OK(entries_result.status); |
1249 | 180 | auto entries_copy_result = segment_copy->ReadEntries(); |
1250 | 180 | ASSERT_OK(entries_copy_result.status); |
1251 | | |
1252 | 180 | ASSERT_EQ(entries_copy_result.committed_op_id, entries_result.committed_op_id); |
1253 | 180 | ASSERT_EQ(entries_copy_result.end_offset, entries_result.end_offset); |
1254 | 180 | ASSERT_EQ(entries_copy_result.entry_metadata, entries_result.entry_metadata); |
1255 | 180 | ASSERT_EQ(entries_copy_result.entries.size(), entries_result.entries.size()); |
1256 | 730 | for (size_t entry_idx = 0; entry_idx < entries_copy_result.entries.size(); ++entry_idx) { |
1257 | 550 | ASSERT_EQ( |
1258 | 550 | entries_copy_result.entries[entry_idx]->DebugString(), |
1259 | 550 | entries_result.entries[entry_idx]->DebugString()); |
1260 | 550 | } |
1261 | 180 | } |
1262 | 10 | } |
1263 | | |
1264 | 1 | ASSERT_OK(log_->Close()); |
1265 | 1 | } |
1266 | | |
1267 | | // Verifies that Log::CopyTo works while appends trigger rollovers and a background thread |
1268 | | // keeps running log GC: every copy must hold a gap-free run of replicate indexes. |
1269 | 1 | TEST_F(LogTest, CopyToWithConcurrentGc) { |
1270 | 1 | constexpr auto kNumBatches = 20; |
1271 | 1 | constexpr auto kNumEntriesPerBatch = 10; |
1272 | | |
1273 | | // Trigger rollover aggressively during normal append. |
1274 | 1 | options_.segment_size_bytes = 1; |
1275 | | |
1276 | 1 | BuildLog(); |
1277 | | |
1278 | 1 | log_->set_wal_retention_secs(0); |
1279 | 1 | std::atomic<bool> stop_gc{false}; |
1280 | 1 | std::thread gc_thread([log = log_.get(), &stop_gc]{ |
1281 | 2.47k | while (!stop_gc.load()) { |
1282 | 2.47k | auto gc_index = log->GetLatestEntryOpId().index; |
1283 | 2.47k | int num_gced = 0; |
1284 | 2.47k | ASSERT_OK(log->GC(gc_index, &num_gced)); |
1285 | 2.47k | } |
1286 | 1 | }); |
1287 | | |
1288 | 1 | auto last_op_id_before_copy_result = AppendAndCopy(kNumBatches, kNumEntriesPerBatch); |
1289 | 1 | stop_gc = true; |
1290 | 1 | gc_thread.join(); |
1291 | 1 | auto last_op_id_before_copy = ASSERT_RESULT(std::move(last_op_id_before_copy_result)); |
1292 | | |
1293 | 21 | for (auto i = 0; i < kNumBatches; ++i) { |
1294 | 20 | auto copied_segments = ASSERT_RESULT( |
1295 | 20 | GetSegmentsFromLogCopyAndCheckLastOpIndex(i, last_op_id_before_copy[i].index)); |
1296 | | |
1297 | | // Make sure copied log contains a sequence of entries without gaps in index (across all its segments). |
1298 | 20 | int64_t last_index = -1; |
1299 | 123 | for (size_t seg_idx = 0; seg_idx < copied_segments.size(); ++seg_idx) { |
1300 | 103 | auto& segment_copy = copied_segments[seg_idx]; |
1301 | 103 | auto entries_copy_result = segment_copy->ReadEntries(); |
1302 | 103 | ASSERT_OK(entries_copy_result.status); |
1303 | | |
1304 | 291 | for (const auto& entry : entries_copy_result.entries) { |
1305 | 291 | const auto index = entry->replicate().id().index(); |
1306 | 291 | if (last_index >= 0) { |
1307 | 271 | ASSERT_EQ(index, last_index + 1); |
1308 | 271 | } |
1309 | 291 | last_index = index; |
1310 | 291 | } |
1311 | 103 | } |
1312 | 20 | } |
1313 | | |
1314 | 1 | ASSERT_OK(log_->Close()); |
1315 | 1 | } |
1316 | | |
1317 | | } // namespace log |
1318 | | } // namespace yb |