YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/log_reader.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/log_reader.h"
25
26
#include <stdio.h>
27
#include "yb/rocksdb/env.h"
28
#include "yb/rocksdb/util/coding.h"
29
#include "yb/rocksdb/util/crc32c.h"
30
#include "yb/rocksdb/util/file_reader_writer.h"
31
32
#include "yb/gutil/macros.h"
33
34
namespace rocksdb {
35
namespace log {
36
37
449k
// Out-of-line definition of the Reporter destructor; it performs no work.
Reader::Reporter::~Reporter() = default;
39
40
// Builds a log reader over `_file`.
//
//   info_log       - logger handle kept by the reader.
//   _file          - sequential file to read from; ownership moves into the
//                    reader.
//   reporter       - receives dropped-byte notifications (see ReportDrop);
//                    may be nullptr, in which case nothing is reported.
//   checksum       - when true, ReadPhysicalRecord verifies each record's CRC.
//   initial_offset - physical records that start before this file offset are
//                    skipped.
//   log_num        - log number that recyclable record headers must carry;
//                    records with a different number are treated as
//                    kOldRecord.
//
// A scratch block of kBlockSize bytes is allocated here and released in the
// destructor.
Reader::Reader(std::shared_ptr<Logger> info_log,
               unique_ptr<SequentialFileReader>&& _file,
               Reporter* reporter, bool checksum, uint64_t initial_offset,
               uint64_t log_num)
    // info_log is a by-value sink parameter: move it instead of copying to
    // avoid an extra atomic reference-count round trip.
    : info_log_(std::move(info_log)),
      file_(std::move(_file)),
      reporter_(reporter),
      checksum_(checksum),
      backing_store_(new uint8_t[kBlockSize]),
      buffer_(),
      eof_(false),
      read_error_(false),
      eof_offset_(0),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      initial_offset_(initial_offset),
      log_number_(log_num) {}
57
58
449k
// Frees the block-sized scratch array allocated by the constructor.
Reader::~Reader() { delete[] backing_store_; }
61
62
24
bool Reader::SkipToInitialBlock() {
63
24
  size_t initial_offset_in_block = initial_offset_ % kBlockSize;
64
24
  uint64_t block_start_location = initial_offset_ - initial_offset_in_block;
65
66
  // Don't search a block if we'd be in the trailer
67
24
  if (initial_offset_in_block > kBlockSize - 6) {
68
2
    block_start_location += kBlockSize;
69
2
  }
70
71
24
  end_of_buffer_offset_ = block_start_location;
72
73
  // Skip to start of first block that can contain the initial record
74
24
  if (block_start_location > 0) {
75
12
    Status skip_status = file_->Skip(block_start_location);
76
12
    if (!skip_status.ok()) {
77
0
      ReportDrop(static_cast<size_t>(block_start_location), skip_status);
78
0
      return false;
79
0
    }
80
12
  }
81
82
24
  return true;
83
24
}
84
85
// For kAbsoluteConsistency, on clean shutdown we don't expect any error
86
// in the log files.  For other modes, we can ignore only incomplete records
87
// in the last log file, which are presumably due to a write in progress
88
// during restart (or from log recycling).
89
//
90
// TODO krad: Evaluate if we need to move to a more strict mode where we
91
// restrict the inconsistency to only the last log
92
bool Reader::ReadRecord(Slice* record, std::string* scratch,
93
4.09M
                        WALRecoveryMode wal_recovery_mode) {
94
4.09M
  if (last_record_offset_ < initial_offset_) {
95
24
    if (!SkipToInitialBlock()) {
96
0
      return false;
97
0
    }
98
24
  }
99
100
4.09M
  scratch->clear();
101
4.09M
  record->clear();
102
4.09M
  bool in_fragmented_record = false;
103
  // Record offset of the logical record that we're reading
104
  // 0 is a dummy value to make compilers happy
105
4.09M
  uint64_t prospective_record_offset = 0;
106
107
4.09M
  Slice fragment;
108
4.10M
  while (true) {
109
4.10M
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
110
4.10M
    size_t drop_size = 0;
111
4.10M
    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
112
113
4.10M
    switch (record_type) {
114
3.52M
      case kFullType:
115
3.63M
      case kRecyclableFullType:
116
3.63M
        if (in_fragmented_record && 
!scratch->empty()2
) {
117
          // Handle bug in earlier versions of log::Writer where
118
          // it could emit an empty kFirstType record at the tail end
119
          // of a block followed by a kFullType or kFirstType record
120
          // at the beginning of the next block.
121
2
          ReportCorruption(scratch->size(), "partial record without end(1)");
122
2
        }
123
3.63M
        prospective_record_offset = physical_record_offset;
124
3.63M
        scratch->clear();
125
3.63M
        *record = fragment;
126
3.63M
        last_record_offset_ = prospective_record_offset;
127
3.63M
        return true;
128
129
8.84k
      case kFirstType:
130
9.02k
      case kRecyclableFirstType:
131
9.02k
        if (in_fragmented_record && 
!scratch->empty()2
) {
132
          // Handle bug in earlier versions of log::Writer where
133
          // it could emit an empty kFirstType record at the tail end
134
          // of a block followed by a kFullType or kFirstType record
135
          // at the beginning of the next block.
136
2
          ReportCorruption(scratch->size(), "partial record without end(2)");
137
2
        }
138
9.02k
        prospective_record_offset = physical_record_offset;
139
9.02k
        scratch->assign(fragment.cdata(), fragment.size());
140
9.02k
        in_fragmented_record = true;
141
9.02k
        break;
142
143
5.83k
      case kMiddleType:
144
6.06k
      case kRecyclableMiddleType:
145
6.06k
        if (!in_fragmented_record) {
146
6
          ReportCorruption(fragment.size(),
147
6
                           "missing start of fragmented record(1)");
148
6.05k
        } else {
149
6.05k
          scratch->append(fragment.cdata(), fragment.size());
150
6.05k
        }
151
6.06k
        break;
152
153
8.96k
      case kLastType:
154
9.14k
      case kRecyclableLastType:
155
9.14k
        if (!in_fragmented_record) {
156
134
          ReportCorruption(fragment.size(),
157
134
                           "missing start of fragmented record(2)");
158
9.00k
        } else {
159
9.00k
          scratch->append(fragment.cdata(), fragment.size());
160
9.00k
          *record = Slice(*scratch);
161
9.00k
          last_record_offset_ = prospective_record_offset;
162
9.00k
          return true;
163
9.00k
        }
164
134
        break;
165
166
187
      case kBadHeader:
167
187
        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
168
          // in clean shutdown we don't expect any error in the log files
169
37
          ReportCorruption(drop_size, "truncated header");
170
37
        }
171
187
        FALLTHROUGH_INTENDED;
172
173
448k
      case kEof:
174
448k
        if (in_fragmented_record) {
175
8
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
176
            // in clean shutdown we don't expect any error in the log files
177
4
            ReportCorruption(scratch->size(), "error reading trailing data");
178
4
          }
179
          // This can be caused by the writer dying immediately after
180
          //  writing a physical record but before completing the next; don't
181
          //  treat it as a corruption, just ignore the entire logical record.
182
8
          scratch->clear();
183
8
        }
184
448k
        return false;
185
186
0
      case kOldRecord:
187
0
        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
188
          // Treat a record from a previous instance of the log as EOF.
189
0
          if (in_fragmented_record) {
190
0
            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
191
              // in clean shutdown we don't expect any error in the log files
192
0
              ReportCorruption(scratch->size(), "error reading trailing data");
193
0
            }
194
            // This can be caused by the writer dying immediately after
195
            //  writing a physical record but before completing the next; don't
196
            //  treat it as a corruption, just ignore the entire logical record.
197
0
            scratch->clear();
198
0
          }
199
0
          return false;
200
0
        }
201
0
        FALLTHROUGH_INTENDED;
202
203
250
      case kBadRecord:
204
250
        if (in_fragmented_record) {
205
2
          ReportCorruption(scratch->size(), "error in middle of record");
206
2
          in_fragmented_record = false;
207
2
          scratch->clear();
208
2
        }
209
250
        break;
210
211
2
      default: {
212
2
        char buf[40];
213
2
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
214
2
        ReportCorruption(
215
2
            (fragment.size() + (in_fragmented_record ? 
scratch->size()0
: 0)),
216
2
            buf);
217
2
        in_fragmented_record = false;
218
2
        scratch->clear();
219
2
        break;
220
0
      }
221
4.10M
    }
222
4.10M
  }
223
445
  return false;
224
4.09M
}
225
226
22
uint64_t Reader::LastRecordOffset() {
227
22
  return last_record_offset_;
228
22
}
229
230
2.72k
void Reader::UnmarkEOF() {
231
2.72k
  if (read_error_) {
232
0
    return;
233
0
  }
234
235
2.72k
  eof_ = false;
236
237
2.72k
  if (eof_offset_ == 0) {
238
4
    return;
239
4
  }
240
241
  // If the EOF was in the middle of a block (a partial block was read) we have
242
  // to read the rest of the block as ReadPhysicalRecord can only read full
243
  // blocks and expects the file position indicator to be aligned to the start
244
  // of a block.
245
  //
246
  //      consumed_bytes + buffer_size() + remaining == kBlockSize
247
248
2.71k
  size_t consumed_bytes = eof_offset_ - buffer_.size();
249
2.71k
  size_t remaining = kBlockSize - eof_offset_;
250
251
  // backing_store_ is used to concatenate what is left in buffer_ and
252
  // the remainder of the block. If buffer_ already uses backing_store_,
253
  // we just append the new data.
254
2.71k
  if (buffer_.data() != backing_store_ + consumed_bytes) {
255
    // Buffer_ does not use backing_store_ for storage.
256
    // Copy what is left in buffer_ to backing_store.
257
7
    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
258
7
  }
259
260
2.71k
  Slice read_buffer;
261
2.71k
  Status status = file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_);
262
263
2.71k
  size_t added = read_buffer.size();
264
2.71k
  end_of_buffer_offset_ += added;
265
266
2.71k
  if (!status.ok()) {
267
4
    if (added > 0) {
268
2
      ReportDrop(added, status);
269
2
    }
270
271
4
    read_error_ = true;
272
4
    return;
273
4
  }
274
275
2.71k
  if (read_buffer.data() != backing_store_ + eof_offset_) {
276
    // Read did not write to backing_store_
277
2
    memmove(backing_store_ + eof_offset_, read_buffer.data(),
278
2
      read_buffer.size());
279
2
  }
280
281
2.71k
  buffer_ = Slice(backing_store_ + consumed_bytes,
282
2.71k
    eof_offset_ + added - consumed_bytes);
283
284
2.71k
  if (added < remaining) {
285
2.71k
    eof_ = true;
286
2.71k
    eof_offset_ += added;
287
2.71k
  } else {
288
4
    eof_offset_ = 0;
289
4
  }
290
2.71k
}
291
292
359
void Reader::ReportCorruption(size_t bytes, const char* reason) {
293
359
  ReportDrop(bytes, STATUS(Corruption, reason));
294
359
}
295
296
363
void Reader::ReportDrop(size_t bytes, const Status& reason) {
297
363
  if (reporter_ != nullptr &&
298
363
      end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
299
363
    reporter_->Corruption(bytes, reason);
300
363
  }
301
363
}
302
303
912k
bool Reader::ReadMore(size_t* drop_size, int *error) {
304
912k
  if (!eof_ && 
!read_error_464k
) {
305
    // Last read was a full read, so this is a trailer to skip
306
464k
    buffer_.clear();
307
464k
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
308
464k
    end_of_buffer_offset_ += buffer_.size();
309
464k
    if (!status.ok()) {
310
2
      buffer_.clear();
311
2
      ReportDrop(kBlockSize, status);
312
2
      read_error_ = true;
313
2
      *error = kEof;
314
2
      return false;
315
464k
    } else if (buffer_.size() < (size_t)kBlockSize) {
316
449k
      eof_ = true;
317
449k
      eof_offset_ = buffer_.size();
318
449k
    }
319
464k
    return true;
320
464k
  } else {
321
    // Note that if buffer_ is non-empty, we have a truncated header at the
322
    //  end of the file, which can be caused by the writer crashing in the
323
    //  middle of writing the header. Unless explicitly requested we don't
324
    //  considering this an error, just report EOF.
325
447k
    if (buffer_.size()) {
326
15
      *drop_size = buffer_.size();
327
15
      buffer_.clear();
328
15
      *error = kBadHeader;
329
15
      return false;
330
15
    }
331
447k
    buffer_.clear();
332
447k
    *error = kEof;
333
447k
    return false;
334
447k
  }
335
912k
}
336
337
4.10M
// Reads one physical record from buffer_, refilling it from the file when
// needed.
//
// Header layout as parsed below: bytes 0-3 hold the masked CRC, bytes 4-5 the
// payload length (little endian), byte 6 the record type; recyclable record
// types carry a 32-bit log number starting at byte 7.
//
// Returns the record type and points *result at the payload, or one of the
// special codes:
//   kEof       - no more data (or a read error occurred).
//   kBadHeader - the file ended inside a header or payload; *drop_size holds
//                the number of bytes discarded.
//   kBadRecord - the record was dropped (bad length, checksum mismatch,
//                zero-filled record, or it started before initial_offset_).
//   kOldRecord - a recyclable record whose log number differs from
//                log_number_.
unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
  while (true) {
    // We need at least the minimum header size
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      int r = 0;
      if (!ReadMore(drop_size, &r)) {
        return r;
      }
      continue;
    }

    // Parse the header. The bytes are read through a (possibly signed) char
    // pointer, so each one is masked to keep values >= 0x80 from being
    // sign-extended.
    const char* header = buffer_.cdata();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = static_cast<unsigned int>(header[6]) & 0xff;
    const uint32_t length = a | (b << 8);
    int header_size = kHeaderSize;
    if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
      header_size = kRecyclableHeaderSize;
      // We need enough for the larger header
      if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
        int r = 0;
        if (!ReadMore(drop_size, &r)) {
          return r;
        }
        continue;
      }
      const uint32_t log_num = DecodeFixed32(header + 7);
      if (log_num != log_number_) {
        return kOldRecord;
      }
    }
    if (header_size + length > buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(*drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption unless requested.
      if (*drop_size) {
        return kBadHeader;
      }
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in DB written by new RocksDB versions,
      // since we turn off mmap writes to manifest and log files
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc. The checksum covers the header from the type byte onwards
    // plus the payload.
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        *drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(*drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(header_size + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - header_size - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + header_size, length);
    return type;
  }
}
425
426
}  // namespace log
427
}  // namespace rocksdb