YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/log_test.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include <string>
25
#include <gtest/gtest.h>
26
27
#include "yb/rocksdb/db/log_reader.h"
28
#include "yb/rocksdb/db/log_writer.h"
29
#include "yb/rocksdb/env.h"
30
#include "yb/rocksdb/util/coding.h"
31
#include "yb/rocksdb/util/crc32c.h"
32
#include "yb/rocksdb/util/file_reader_writer.h"
33
#include "yb/rocksdb/util/random.h"
34
#include "yb/util/test_macros.h"
35
#include "yb/rocksdb/util/testutil.h"
36
37
namespace rocksdb {
38
namespace log {
39
40
// Construct a string of the specified length made out of the supplied
41
// partial string.
42
2.05k
static std::string BigString(const std::string& partial_string, size_t n) {
43
2.05k
  std::string result;
44
5.04M
  while (result.size() < n) {
45
5.04M
    result.append(partial_string);
46
5.04M
  }
47
2.05k
  result.resize(n);
48
2.05k
  return result;
49
2.05k
}
50
51
// Construct a string from a number: its decimal digits followed by a '.'
52
402k
static std::string NumberString(int n) {
53
402k
  char buf[50];
54
402k
  snprintf(buf, sizeof(buf), "%d.", n);
55
402k
  return std::string(buf);
56
402k
}
57
58
// Return a skewed potentially long string
59
2.00k
static std::string RandomSkewedString(int i, Random* rnd) {
60
2.00k
  return BigString(NumberString(i), rnd->Skewed(17));
61
2.00k
}
62
63
class LogTest : public RocksDBTest,
64
                public ::testing::WithParamInterface<int> {
65
 private:
66
  class StringSource : public SequentialFile {
67
   public:
68
    Slice& contents_;
69
    bool force_error_;
70
    size_t force_error_position_;
71
    bool force_eof_;
72
    size_t force_eof_position_;
73
    bool returned_partial_;
74
75
    explicit StringSource(Slice& contents) : // NOLINT
76
      contents_(contents),
77
      force_error_(false),
78
      force_error_position_(0),
79
      force_eof_(false),
80
      force_eof_position_(0),
81
112
      returned_partial_(false) { }
82
83
480
    Status Read(size_t n, Slice* result, uint8_t* scratch) override {
84
960
      EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
85
86
480
      if (force_error_) {
87
6
        if (force_error_position_ >= n) {
88
0
          force_error_position_ -= n;
89
6
        } else {
90
6
          *result = Slice(contents_.data(), force_error_position_);
91
6
          contents_.remove_prefix(force_error_position_);
92
6
          force_error_ = false;
93
6
          returned_partial_ = true;
94
6
          return STATUS(Corruption, "read error");
95
6
        }
96
474
      }
97
98
474
      if (contents_.size() < n) {
99
82
        n = contents_.size();
100
82
        returned_partial_ = true;
101
82
      }
102
103
474
      if (force_eof_) {
104
14
        if (force_eof_position_ >= n) {
105
10
          force_eof_position_ -= n;
106
4
        } else {
107
4
          force_eof_ = false;
108
4
          n = force_eof_position_;
109
4
          returned_partial_ = true;
110
4
        }
111
14
      }
112
113
      // By using scratch we ensure that the caller has control over the
114
      // lifetime of result.data()
115
474
      memcpy(scratch, contents_.data(), n);
116
474
      *result = Slice(scratch, n);
117
118
474
      contents_.remove_prefix(n);
119
474
      return Status::OK();
120
474
    }
121
122
12
    Status Skip(uint64_t n) override {
123
12
      if (n > contents_.size()) {
124
0
        contents_.clear();
125
0
        return STATUS(NotFound, "in-memory file skipepd past end");
126
0
      }
127
128
12
      contents_.remove_prefix(n);
129
130
12
      return Status::OK();
131
12
    }
132
133
0
    const std::string& filename() const override {
134
0
      static const std::string kFilename = "StringSource";
135
0
      return kFilename;
136
0
    }
137
  };
138
139
  class ReportCollector : public Reader::Reporter {
140
   public:
141
    size_t dropped_bytes_;
142
    std::string message_;
143
144
86
    ReportCollector() : dropped_bytes_(0) { }
145
45
    void Corruption(size_t bytes, const Status& status) override {
146
45
      dropped_bytes_ += bytes;
147
45
      message_.append(status.ToString(false));
148
45
    }
149
  };
150
151
65.5k
  std::string& dest_contents() {
152
65.5k
    auto dest =
153
65.5k
      dynamic_cast<test::StringSink*>(writer_.file()->writable_file());
154
65.5k
    assert(dest);
155
65.5k
    return dest->contents_;
156
65.5k
  }
157
158
12
  const std::string& dest_contents() const {
159
12
    auto dest =
160
12
      dynamic_cast<const test::StringSink*>(writer_.file()->writable_file());
161
12
    assert(dest);
162
12
    return dest->contents_;
163
12
  }
164
165
0
  void reset_source_contents() {
166
0
    auto src = dynamic_cast<StringSource*>(reader_.file()->file());
167
0
    assert(src);
168
0
    src->contents_ = dest_contents();
169
0
  }
170
171
  Slice reader_contents_;
172
  unique_ptr<WritableFileWriter> dest_holder_;
173
  unique_ptr<SequentialFileReader> source_holder_;
174
  ReportCollector report_;
175
  Writer writer_;
176
  Reader reader_;
177
178
  // Record metadata for testing initial offset functionality
179
  static size_t initial_offset_record_sizes_[];
180
  uint64_t initial_offset_last_record_offsets_[4];
181
182
 public:
183
  LogTest()
184
      : reader_contents_(),
185
        dest_holder_(test::GetWritableFileWriter(
186
            new test::StringSink(&reader_contents_))),
187
        source_holder_(
188
            test::GetSequentialFileReader(new StringSource(reader_contents_))),
189
        writer_(std::move(dest_holder_), 123, GetParam()),
190
        reader_(NULL, std::move(source_holder_), &report_, true /*checksum*/,
191
86
                0 /*initial_offset*/, 123) {
192
43
    int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
193
86
    initial_offset_last_record_offsets_[0] = 0;
194
86
    initial_offset_last_record_offsets_[1] = header_size + 10000;
195
86
    initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000);
196
86
    initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) +
197
86
                                             (2 * log::kBlockSize - 1000) +
198
86
                                             3 * header_size;
199
86
  }
200
201
201k
  void Write(const std::string& msg) {
202
201k
    ASSERT_OK(writer_.AddRecord(Slice(msg)));
203
201k
  }
204
205
12
  size_t WrittenBytes() const {
206
12
    return dest_contents().size();
207
12
  }
208
209
  std::string Read(const WALRecoveryMode wal_recovery_mode =
210
201k
                       WALRecoveryMode::kTolerateCorruptedTailRecords) {
211
201k
    std::string scratch;
212
201k
    Slice record;
213
201k
    if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) {
214
201k
      return record.ToString();
215
58
    } else {
216
58
      return "EOF";
217
58
    }
218
201k
  }
219
220
6
  void IncrementByte(int offset, int delta) {
221
6
    dest_contents()[offset] += delta;
222
6
  }
223
224
65.5k
  void SetByte(int offset, char new_byte) {
225
65.5k
    dest_contents()[offset] = new_byte;
226
65.5k
  }
227
228
16
  void ShrinkSize(int bytes) {
229
16
    auto dest =
230
16
      dynamic_cast<test::StringSink*>(writer_.file()->writable_file());
231
16
    assert(dest);
232
16
    dest->Drop(bytes);
233
16
  }
234
235
10
  void FixChecksum(int header_offset, int len, bool recyclable) {
236
    // Compute crc of type/len/data
237
6
    int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize;
238
10
    uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6],
239
10
                                 header_size - 6 + len);
240
10
    crc = crc32c::Mask(crc);
241
10
    EncodeFixed32(&dest_contents()[header_offset], crc);
242
10
  }
243
244
6
  void ForceError(size_t position = 0) {
245
6
    auto src = dynamic_cast<StringSource*>(reader_.file()->file());
246
6
    src->force_error_ = true;
247
6
    src->force_error_position_ = position;
248
6
  }
249
250
38
  size_t DroppedBytes() const {
251
38
    return report_.dropped_bytes_;
252
38
  }
253
254
10
  std::string ReportMessage() const {
255
10
    return report_.message_;
256
10
  }
257
258
4
  void ForceEOF(size_t position = 0) {
259
4
    auto src = dynamic_cast<StringSource*>(reader_.file()->file());
260
4
    src->force_eof_ = true;
261
4
    src->force_eof_position_ = position;
262
4
  }
263
264
16
  void UnmarkEOF() {
265
16
    auto src = dynamic_cast<StringSource*>(reader_.file()->file());
266
16
    src->returned_partial_ = false;
267
16
    reader_.UnmarkEOF();
268
16
  }
269
270
12
  bool IsEOF() {
271
12
    return reader_.IsEOF();
272
12
  }
273
274
  // Returns "OK" if the recorded error message contains "msg"; otherwise returns the recorded message
275
26
  std::string MatchError(const std::string& msg) const {
276
26
    if (report_.message_.find(msg) == std::string::npos) {
277
0
      return report_.message_;
278
26
    } else {
279
26
      return "OK";
280
26
    }
281
26
  }
282
283
26
  void WriteInitialOffsetLog() {
284
130
    for (int i = 0; i < 4; i++) {
285
104
      std::string record(initial_offset_record_sizes_[i],
286
104
                         static_cast<char>('a' + i));
287
104
      Write(record);
288
104
    }
289
26
  }
290
291
4
  void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
292
4
    WriteInitialOffsetLog();
293
4
    unique_ptr<SequentialFileReader> file_reader(
294
4
        test::GetSequentialFileReader(new StringSource(reader_contents_)));
295
4
    unique_ptr<Reader> offset_reader(
296
4
        new Reader(NULL, std::move(file_reader), &report_,
297
4
                   true /*checksum*/, WrittenBytes() + offset_past_end, 123));
298
4
    Slice record;
299
4
    std::string scratch;
300
4
    ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
301
4
  }
302
303
  void CheckInitialOffsetRecord(uint64_t initial_offset,
304
22
                                int expected_record_offset) {
305
22
    WriteInitialOffsetLog();
306
22
    unique_ptr<SequentialFileReader> file_reader(
307
22
        test::GetSequentialFileReader(new StringSource(reader_contents_)));
308
22
    unique_ptr<Reader> offset_reader(
309
22
        new Reader(NULL, std::move(file_reader), &report_,
310
22
                   true /*checksum*/, initial_offset, 123));
311
22
    Slice record;
312
22
    std::string scratch;
313
22
    ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
314
22
    ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
315
22
              record.size());
316
22
    ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
317
22
              offset_reader->LastRecordOffset());
318
22
    ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
319
22
  }
320
321
};
322
323
size_t LogTest::initial_offset_record_sizes_[] =
324
    {10000,  // Two sizable records in first block
325
     10000,
326
     2 * log::kBlockSize - 1000,  // Span three blocks
327
     1};
328
329
2
TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
330
331
2
TEST_P(LogTest, ReadWrite) {
332
2
  Write("foo");
333
2
  Write("bar");
334
2
  Write("");
335
2
  Write("xxxx");
336
2
  ASSERT_EQ("foo", Read());
337
2
  ASSERT_EQ("bar", Read());
338
2
  ASSERT_EQ("", Read());
339
2
  ASSERT_EQ("xxxx", Read());
340
2
  ASSERT_EQ("EOF", Read());
341
2
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
342
2
}
343
344
2
TEST_P(LogTest, ManyBlocks) {
345
200k
  for (int i = 0; i < 100000; i++) {
346
200k
    Write(NumberString(i));
347
200k
  }
348
200k
  for (int i = 0; i < 100000; i++) {
349
200k
    ASSERT_EQ(NumberString(i), Read());
350
200k
  }
351
2
  ASSERT_EQ("EOF", Read());
352
2
}
353
354
2
TEST_P(LogTest, Fragmentation) {
355
2
  Write("small");
356
2
  Write(BigString("medium", 50000));
357
2
  Write(BigString("large", 100000));
358
2
  ASSERT_EQ("small", Read());
359
2
  ASSERT_EQ(BigString("medium", 50000), Read());
360
2
  ASSERT_EQ(BigString("large", 100000), Read());
361
2
  ASSERT_EQ("EOF", Read());
362
2
}
363
364
2
TEST_P(LogTest, MarginalTrailer) {
365
  // Make a trailer that is exactly the same length as an empty record.
366
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
367
2
  const int n = kBlockSize - 2 * header_size;
368
2
  Write(BigString("foo", n));
369
2
  ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
370
2
  Write("");
371
2
  Write("bar");
372
2
  ASSERT_EQ(BigString("foo", n), Read());
373
2
  ASSERT_EQ("", Read());
374
2
  ASSERT_EQ("bar", Read());
375
2
  ASSERT_EQ("EOF", Read());
376
2
}
377
378
2
TEST_P(LogTest, MarginalTrailer2) {
379
  // Make a trailer that is exactly the same length as an empty record.
380
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
381
2
  const int n = kBlockSize - 2 * header_size;
382
2
  Write(BigString("foo", n));
383
2
  ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
384
2
  Write("bar");
385
2
  ASSERT_EQ(BigString("foo", n), Read());
386
2
  ASSERT_EQ("bar", Read());
387
2
  ASSERT_EQ("EOF", Read());
388
2
  ASSERT_EQ(0U, DroppedBytes());
389
2
  ASSERT_EQ("", ReportMessage());
390
2
}
391
392
2
TEST_P(LogTest, ShortTrailer) {
393
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
394
2
  const int n = kBlockSize - 2 * header_size + 4;
395
2
  Write(BigString("foo", n));
396
2
  ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
397
2
  Write("");
398
2
  Write("bar");
399
2
  ASSERT_EQ(BigString("foo", n), Read());
400
2
  ASSERT_EQ("", Read());
401
2
  ASSERT_EQ("bar", Read());
402
2
  ASSERT_EQ("EOF", Read());
403
2
}
404
405
2
TEST_P(LogTest, AlignedEof) {
406
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
407
2
  const int n = kBlockSize - 2 * header_size + 4;
408
2
  Write(BigString("foo", n));
409
2
  ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
410
2
  ASSERT_EQ(BigString("foo", n), Read());
411
2
  ASSERT_EQ("EOF", Read());
412
2
}
413
414
2
TEST_P(LogTest, RandomRead) {
415
2
  const int N = 500;
416
2
  Random write_rnd(301);
417
1.00k
  for (int i = 0; i < N; i++) {
418
1.00k
    Write(RandomSkewedString(i, &write_rnd));
419
1.00k
  }
420
2
  Random read_rnd(301);
421
1.00k
  for (int i = 0; i < N; i++) {
422
1.00k
    ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
423
1.00k
  }
424
2
  ASSERT_EQ("EOF", Read());
425
2
}
426
427
// Tests of all the error paths in log_reader.cc follow:
428
429
2
TEST_P(LogTest, ReadError) {
430
2
  Write("foo");
431
2
  ForceError();
432
2
  ASSERT_EQ("EOF", Read());
433
2
  ASSERT_EQ((unsigned int)kBlockSize, DroppedBytes());
434
2
  ASSERT_EQ("OK", MatchError("read error"));
435
2
}
436
437
2
TEST_P(LogTest, BadRecordType) {
438
2
  Write("foo");
439
  // Type is stored in header[6]
440
2
  IncrementByte(6, 100);
441
2
  FixChecksum(0, 3, false);
442
2
  ASSERT_EQ("EOF", Read());
443
2
  ASSERT_EQ(3U, DroppedBytes());
444
2
  ASSERT_EQ("OK", MatchError("unknown record type"));
445
2
}
446
447
2
TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
448
2
  Write("foo");
449
2
  ShrinkSize(4);   // Drop all payload as well as a header byte
450
2
  ASSERT_EQ("EOF", Read());
451
  // Truncated last record is ignored, not treated as an error
452
2
  ASSERT_EQ(0U, DroppedBytes());
453
2
  ASSERT_EQ("", ReportMessage());
454
2
}
455
456
2
TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
457
2
  Write("foo");
458
2
  ShrinkSize(4);  // Drop all payload as well as a header byte
459
2
  ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
460
  // In kAbsoluteConsistency mode the truncated last record is reported as corruption, not ignored
461
2
  ASSERT_GT(DroppedBytes(), 0U);
462
2
  ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
463
2
}
464
465
2
TEST_P(LogTest, BadLength) {
466
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
467
2
  const int kPayloadSize = kBlockSize - header_size;
468
2
  Write(BigString("bar", kPayloadSize));
469
2
  Write("foo");
470
  // Least significant size byte is stored in header[4].
471
2
  IncrementByte(4, 1);
472
2
  ASSERT_EQ("foo", Read());
473
2
  ASSERT_EQ(kBlockSize, DroppedBytes());
474
2
  ASSERT_EQ("OK", MatchError("bad record length"));
475
2
}
476
477
2
TEST_P(LogTest, BadLengthAtEndIsIgnored) {
478
2
  Write("foo");
479
2
  ShrinkSize(1);
480
2
  ASSERT_EQ("EOF", Read());
481
2
  ASSERT_EQ(0U, DroppedBytes());
482
2
  ASSERT_EQ("", ReportMessage());
483
2
}
484
485
2
TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
486
2
  Write("foo");
487
2
  ShrinkSize(1);
488
2
  ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
489
2
  ASSERT_GT(DroppedBytes(), 0U);
490
2
  ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
491
2
}
492
493
2
TEST_P(LogTest, ChecksumMismatch) {
494
2
  Write("foooooo");
495
2
  IncrementByte(0, 14);
496
2
  ASSERT_EQ("EOF", Read());
497
2
  ASSERT_EQ(14U + 4 * !!GetParam(), DroppedBytes());
498
2
  ASSERT_EQ("OK", MatchError("checksum mismatch"));
499
2
}
500
501
2
TEST_P(LogTest, UnexpectedMiddleType) {
502
2
  Write("foo");
503
1
  SetByte(6, GetParam() ? kRecyclableMiddleType : kMiddleType);
504
2
  FixChecksum(0, 3, !!GetParam());
505
2
  ASSERT_EQ("EOF", Read());
506
2
  ASSERT_EQ(3U, DroppedBytes());
507
2
  ASSERT_EQ("OK", MatchError("missing start"));
508
2
}
509
510
2
TEST_P(LogTest, UnexpectedLastType) {
511
2
  Write("foo");
512
1
  SetByte(6, GetParam() ? kRecyclableLastType : kLastType);
513
2
  FixChecksum(0, 3, !!GetParam());
514
2
  ASSERT_EQ("EOF", Read());
515
2
  ASSERT_EQ(3U, DroppedBytes());
516
2
  ASSERT_EQ("OK", MatchError("missing start"));
517
2
}
518
519
2
TEST_P(LogTest, UnexpectedFullType) {
520
2
  Write("foo");
521
2
  Write("bar");
522
1
  SetByte(6, GetParam() ? kRecyclableFirstType : kFirstType);
523
2
  FixChecksum(0, 3, !!GetParam());
524
2
  ASSERT_EQ("bar", Read());
525
2
  ASSERT_EQ("EOF", Read());
526
2
  ASSERT_EQ(3U, DroppedBytes());
527
2
  ASSERT_EQ("OK", MatchError("partial record without end"));
528
2
}
529
530
2
TEST_P(LogTest, UnexpectedFirstType) {
531
2
  Write("foo");
532
2
  Write(BigString("bar", 100000));
533
1
  SetByte(6, GetParam() ? kRecyclableFirstType : kFirstType);
534
2
  FixChecksum(0, 3, !!GetParam());
535
2
  ASSERT_EQ(BigString("bar", 100000), Read());
536
2
  ASSERT_EQ("EOF", Read());
537
2
  ASSERT_EQ(3U, DroppedBytes());
538
2
  ASSERT_EQ("OK", MatchError("partial record without end"));
539
2
}
540
541
2
TEST_P(LogTest, MissingLastIsIgnored) {
542
2
  Write(BigString("bar", kBlockSize));
543
  // Remove the LAST block, including header.
544
2
  ShrinkSize(14);
545
2
  ASSERT_EQ("EOF", Read());
546
2
  ASSERT_EQ("", ReportMessage());
547
2
  ASSERT_EQ(0U, DroppedBytes());
548
2
}
549
550
2
TEST_P(LogTest, MissingLastIsNotIgnored) {
551
2
  Write(BigString("bar", kBlockSize));
552
  // Remove the LAST block, including header.
553
2
  ShrinkSize(14);
554
2
  ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
555
2
  ASSERT_GT(DroppedBytes(), 0U);
556
2
  ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
557
2
}
558
559
2
TEST_P(LogTest, PartialLastIsIgnored) {
560
2
  Write(BigString("bar", kBlockSize));
561
  // Cause a bad record length in the LAST block.
562
2
  ShrinkSize(1);
563
2
  ASSERT_EQ("EOF", Read());
564
2
  ASSERT_EQ("", ReportMessage());
565
2
  ASSERT_EQ(0U, DroppedBytes());
566
2
}
567
568
2
TEST_P(LogTest, PartialLastIsNotIgnored) {
569
2
  Write(BigString("bar", kBlockSize));
570
  // Cause a bad record length in the LAST block.
571
2
  ShrinkSize(1);
572
2
  ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
573
2
  ASSERT_GT(DroppedBytes(), 0U);
574
2
  ASSERT_EQ("OK", MatchError(
575
2
                      "Corruption: truncated headerCorruption: "
576
2
                      "error reading trailing data"));
577
2
}
578
579
2
TEST_P(LogTest, ErrorJoinsRecords) {
580
  // Consider two fragmented records:
581
  //    first(R1) last(R1) first(R2) last(R2)
582
  // where the middle two fragments disappear.  We do not want
583
  // first(R1),last(R2) to get joined and returned as a valid record.
584
585
  // Write records that span two blocks
586
2
  Write(BigString("foo", kBlockSize));
587
2
  Write(BigString("bar", kBlockSize));
588
2
  Write("correct");
589
590
  // Wipe the middle block
591
65.5k
  for (unsigned int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
592
65.5k
    SetByte(offset, 'x');
593
65.5k
  }
594
595
2
  ASSERT_EQ("correct", Read());
596
2
  ASSERT_EQ("EOF", Read());
597
2
  size_t dropped = DroppedBytes();
598
2
  ASSERT_LE(dropped, 2 * kBlockSize + 100);
599
2
  ASSERT_GE(dropped, 2 * kBlockSize);
600
2
}
601
602
2
TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
603
604
2
TEST_P(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }
605
606
2
TEST_P(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }
607
608
2
TEST_P(LogTest, ReadSecondStart) {
609
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
610
2
  CheckInitialOffsetRecord(10000 + header_size, 1);
611
2
}
612
613
2
TEST_P(LogTest, ReadThirdOneOff) {
614
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
615
2
  CheckInitialOffsetRecord(10000 + header_size + 1, 2);
616
2
}
617
618
2
TEST_P(LogTest, ReadThirdStart) {
619
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
620
2
  CheckInitialOffsetRecord(20000 + 2 * header_size, 2);
621
2
}
622
623
2
TEST_P(LogTest, ReadFourthOneOff) {
624
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
625
2
  CheckInitialOffsetRecord(20000 + 2 * header_size + 1, 3);
626
2
}
627
628
2
TEST_P(LogTest, ReadFourthFirstBlockTrailer) {
629
2
  CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
630
2
}
631
632
2
TEST_P(LogTest, ReadFourthMiddleBlock) {
633
2
  CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
634
2
}
635
636
2
TEST_P(LogTest, ReadFourthLastBlock) {
637
2
  CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
638
2
}
639
640
2
TEST_P(LogTest, ReadFourthStart) {
641
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
642
2
  CheckInitialOffsetRecord(
643
2
      2 * (header_size + 1000) + (2 * log::kBlockSize - 1000) + 3 * header_size,
644
2
      3);
645
2
}
646
647
2
TEST_P(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
648
649
2
TEST_P(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
650
651
2
TEST_P(LogTest, ClearEofSingleBlock) {
652
2
  Write("foo");
653
2
  Write("bar");
654
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
655
2
  ForceEOF(3 + header_size + 2);
656
2
  ASSERT_EQ("foo", Read());
657
2
  UnmarkEOF();
658
2
  ASSERT_EQ("bar", Read());
659
2
  ASSERT_TRUE(IsEOF());
660
2
  ASSERT_EQ("EOF", Read());
661
2
  Write("xxx");
662
2
  UnmarkEOF();
663
2
  ASSERT_EQ("xxx", Read());
664
2
  ASSERT_TRUE(IsEOF());
665
2
}
666
667
2
TEST_P(LogTest, ClearEofMultiBlock) {
668
2
  size_t num_full_blocks = 5;
669
1
  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
670
2
  size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
671
2
  Write(BigString("foo", n));
672
2
  Write(BigString("bar", n));
673
2
  ForceEOF(n + num_full_blocks * header_size + header_size + 3);
674
2
  ASSERT_EQ(BigString("foo", n), Read());
675
2
  ASSERT_TRUE(IsEOF());
676
2
  UnmarkEOF();
677
2
  ASSERT_EQ(BigString("bar", n), Read());
678
2
  ASSERT_TRUE(IsEOF());
679
2
  Write(BigString("xxx", n));
680
2
  UnmarkEOF();
681
2
  ASSERT_EQ(BigString("xxx", n), Read());
682
2
  ASSERT_TRUE(IsEOF());
683
2
}
684
685
2
TEST_P(LogTest, ClearEofError) {
686
  // If an error occurs during Read() in UnmarkEOF(), the records contained
687
  // in the buffer should be returned on subsequent calls of ReadRecord()
688
  // until no more full records are left, whereafter ReadRecord() should return
689
  // false to indicate that it cannot read any further.
690
691
2
  Write("foo");
692
2
  Write("bar");
693
2
  UnmarkEOF();
694
2
  ASSERT_EQ("foo", Read());
695
2
  ASSERT_TRUE(IsEOF());
696
2
  Write("xxx");
697
2
  ForceError(0);
698
2
  UnmarkEOF();
699
2
  ASSERT_EQ("bar", Read());
700
2
  ASSERT_EQ("EOF", Read());
701
2
}
702
703
2
TEST_P(LogTest, ClearEofError2) {
704
2
  Write("foo");
705
2
  Write("bar");
706
2
  UnmarkEOF();
707
2
  ASSERT_EQ("foo", Read());
708
2
  Write("xxx");
709
2
  ForceError(3);
710
2
  UnmarkEOF();
711
2
  ASSERT_EQ("bar", Read());
712
2
  ASSERT_EQ("EOF", Read());
713
2
  ASSERT_EQ(3U, DroppedBytes());
714
2
  ASSERT_EQ("OK", MatchError("read error"));
715
2
}
716
717
INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
718
719
}  // namespace log
720
}  // namespace rocksdb
721
722
13.2k
int main(int argc, char** argv) {
723
13.2k
  ::testing::InitGoogleTest(&argc, argv);
724
13.2k
  return RUN_ALL_TESTS();
725
13.2k
}