YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/log_util.cc
Line | Count | Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/log_util.h"
34
35
#include <algorithm>
36
#include <array>
37
#include <limits>
38
#include <utility>
39
40
#include <glog/logging.h>
41
42
#include "yb/common/hybrid_time.h"
43
44
#include "yb/consensus/opid_util.h"
45
46
#include "yb/fs/fs_manager.h"
47
48
#include "yb/gutil/casts.h"
49
#include "yb/gutil/strings/split.h"
50
#include "yb/gutil/strings/util.h"
51
52
#include "yb/util/coding-inl.h"
53
#include "yb/util/coding.h"
54
#include "yb/util/crc.h"
55
#include "yb/util/debug/trace_event.h"
56
#include "yb/util/env_util.h"
57
#include "yb/util/flag_tags.h"
58
#include "yb/util/pb_util.h"
59
#include "yb/util/result.h"
60
#include "yb/util/size_literals.h"
61
#include "yb/util/status_format.h"
62
#include "yb/util/status_log.h"
63
64
DEFINE_int32(log_segment_size_mb, 64,
65
             "The default segment size for log roll-overs, in MB");
66
TAG_FLAG(log_segment_size_mb, advanced);
67
68
DEFINE_uint64(log_segment_size_bytes, 0,
69
             "The default segment size for log roll-overs, in bytes. "
70
             "If 0 then log_segment_size_mb is used.");
71
72
DEFINE_uint64(initial_log_segment_size_bytes, 1024 * 1024,
73
              "The maximum segment size we want for a new WAL segment, in bytes. "
74
              "This value keeps doubling (for each subsequent WAL segment) till it gets to the "
75
              "maximum configured segment size (log_segment_size_bytes or log_segment_size_mb).");
76
77
DEFINE_bool(durable_wal_write, false,
78
            "Whether the Log/WAL should explicitly call fsync() after each write.");
79
TAG_FLAG(durable_wal_write, stable);
80
81
DEFINE_int32(interval_durable_wal_write_ms, 1000,
82
            "Interval in ms after which the Log/WAL should explicitly call fsync(). "
83
            "If 0 fsysnc() is not called.");
84
TAG_FLAG(interval_durable_wal_write_ms, stable);
85
86
DEFINE_int32(bytes_durable_wal_write_mb, 1,
87
             "Amount of data in MB after which the Log/WAL should explicitly call fsync(). "
88
             "If 0 fsysnc() is not called.");
89
TAG_FLAG(bytes_durable_wal_write_mb, stable);
90
91
DEFINE_bool(log_preallocate_segments, true,
92
            "Whether the WAL should preallocate the entire segment before writing to it");
93
TAG_FLAG(log_preallocate_segments, advanced);
94
95
DEFINE_bool(log_async_preallocate_segments, true,
96
            "Whether the WAL segments preallocation should happen asynchronously");
97
TAG_FLAG(log_async_preallocate_segments, advanced);
98
99
DECLARE_string(fs_data_dirs);
100
101
DEFINE_bool(require_durable_wal_write, false, "Whether durable WAL write is required. "
102
    "In case you cannot write using O_DIRECT in WAL and data directories and this flag is set true"
103
    "the system will deliberately crash with the appropriate error. If this flag is set false, "
104
    "the system will soft downgrade the durable_wal_write flag.");
105
TAG_FLAG(require_durable_wal_write, stable);
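// Taken together, the flags above define the WAL durability policy described by their help
// strings: with durable_wal_write=true every append is followed by fsync(); otherwise fsync()
// is driven by interval_durable_wal_write_ms and bytes_durable_wal_write_mb, with a value of 0
// disabling the corresponding trigger, and require_durable_wal_write decides whether a missing
// O_DIRECT capability is fatal (see ModifyDurableWriteFlagIfNotODirect() below). This summary
// is inferred from the flag definitions in this file; the enforcement itself lives in the Log
// implementation.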
106
107
namespace yb {
108
namespace log {
109
110
using env_util::ReadFully;
111
using std::vector;
112
using std::shared_ptr;
113
using strings::Substitute;
114
using strings::SubstituteAndAppend;
115
116
const char kTmpSuffix[] = ".tmp";
117
118
const char kLogSegmentHeaderMagicString[] = "yugalogf";
119
120
// A magic that is written as the very last thing when a segment is closed.
121
// Segments that were not closed (usually the last one being written) will not
122
// have this magic.
123
const char kLogSegmentFooterMagicString[] = "closedls";
124
125
// Header is prefixed with the header magic (8 bytes) and the header length (4 bytes).
126
const size_t kLogSegmentHeaderMagicAndHeaderLength = 12;
127
128
// Footer is suffixed with the footer magic (8 bytes) and the footer length (4 bytes).
129
const size_t kLogSegmentFooterMagicAndFooterLength  = 12;
130
131
const size_t kEntryHeaderSize = 12;
132
133
const int kLogMajorVersion = 1;
134
const int kLogMinorVersion = 0;
135
136
// Maximum log segment header/footer size, in bytes (8 MB).
137
const uint32_t kLogSegmentMaxHeaderOrFooterSize = 8 * 1024 * 1024;
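// On-disk layout sketch, inferred from the read/write paths in this file (WriteHeaderAndOpen,
// WriteEntryBatch, WriteFooterAndClose, ReadHeader, ReadFooter); byte widths follow the
// constants above and this is an editor's sketch rather than an authoritative specification:
//
//   "yugalogf"             8 bytes   header magic
//   header length          4 bytes   fixed32
//   LogSegmentHeaderPB     <header length> bytes
//   entry batches ...                each framed by a 12-byte entry header
//   LogSegmentFooterPB     <footer length> bytes   (cleanly closed segments only)
//   "closedls"             8 bytes   footer magic
//   footer length          4 bytes   fixed32
//
// Each entry batch is framed as:
//   batch length (4 bytes) | CRC32C of the batch (4 bytes) |
//   CRC32C of the preceding 8 bytes (4 bytes) | serialized LogEntryBatchPB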
138
139
LogOptions::LogOptions()
140
    : segment_size_bytes(FLAGS_log_segment_size_bytes == 0 ? FLAGS_log_segment_size_mb * 1_MB
141
                                                           : FLAGS_log_segment_size_bytes),
142
      initial_segment_size_bytes(FLAGS_initial_log_segment_size_bytes),
143
      durable_wal_write(FLAGS_durable_wal_write),
144
      interval_durable_wal_write(FLAGS_interval_durable_wal_write_ms > 0 ?
145
                                     MonoDelta::FromMilliseconds(
146
                                         FLAGS_interval_durable_wal_write_ms) : MonoDelta()),
147
      bytes_durable_wal_write_mb(FLAGS_bytes_durable_wal_write_mb),
148
      preallocate_segments(FLAGS_log_preallocate_segments),
149
      async_preallocate_segments(FLAGS_log_async_preallocate_segments),
150
150k
      env(Env::Default()) {
151
150k
}
152
153
Result<scoped_refptr<ReadableLogSegment>> ReadableLogSegment::Open(
154
10.6k
    Env* env, const std::string& path) {
155
10.6k
  VLOG(1) << "Parsing wal segment: " << path;
156
10.6k
  shared_ptr<RandomAccessFile> readable_file;
157
10.6k
  RETURN_NOT_OK_PREPEND(env_util::OpenFileForRandom(env, path, &readable_file),
158
10.6k
                        "Unable to open file for reading");
159
160
10.6k
  auto segment = make_scoped_refptr<ReadableLogSegment>(path, readable_file);
161
10.6k
  if (!VERIFY_RESULT_PREPEND(segment->Init(), "Unable to initialize segment")) {
162
1
    return nullptr;
163
1
  }
164
10.6k
  return segment;
165
10.6k
}
166
167
ReadableLogSegment::ReadableLogSegment(
168
    std::string path, shared_ptr<RandomAccessFile> readable_file)
169
    : path_(std::move(path)),
170
      file_size_(0),
171
      readable_to_offset_(0),
172
      readable_file_(std::move(readable_file)),
173
      is_initialized_(false),
174
255k
      footer_was_rebuilt_(false) {
175
255k
  CHECK_OK(env_util::OpenFileForRandom(Env::Default(), path_, &readable_file_checkpoint_));
176
255k
}
177
178
Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header,
179
                                const LogSegmentFooterPB& footer,
180
84.6k
                                int64_t first_entry_offset) {
181
84.6k
  DCHECK(!IsInitialized()) << "Can only call Init() once";
182
18.4E
  DCHECK(header.IsInitialized()) << "Log segment header must be initialized";
183
18.4E
  DCHECK(footer.IsInitialized()) << "Log segment footer must be initialized";
184
185
84.6k
  RETURN_NOT_OK(ReadFileSize());
186
187
84.6k
  header_.CopyFrom(header);
188
84.6k
  footer_.CopyFrom(footer);
189
84.6k
  first_entry_offset_ = first_entry_offset;
190
84.6k
  is_initialized_ = true;
191
84.6k
  readable_to_offset_.Store(file_size());
192
193
84.6k
  return Status::OK();
194
84.6k
}
195
196
Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header,
197
159k
                                int64_t first_entry_offset) {
198
159k
  DCHECK(!IsInitialized()) << "Can only call Init() once";
199
18.4E
  DCHECK(header.IsInitialized()) << "Log segment header must be initialized";
200
201
159k
  RETURN_NOT_OK(ReadFileSize());
202
203
159k
  header_.CopyFrom(header);
204
159k
  first_entry_offset_ = first_entry_offset;
205
159k
  is_initialized_ = true;
206
207
  // On a new segment, we don't expect any readable entries yet.
208
159k
  readable_to_offset_.Store(first_entry_offset);
209
210
159k
  return Status::OK();
211
159k
}
212
213
10.5k
Result<bool> ReadableLogSegment::Init() {
214
10.5k
  DCHECK(!IsInitialized()) << "Can only call Init() once";
215
216
10.5k
  RETURN_NOT_OK(ReadFileSize());
217
218
10.5k
  if (!VERIFY_RESULT(ReadHeader())) {
219
1
    return false;
220
1
  }
221
222
10.5k
  Status s = ReadFooter();
223
10.5k
  if (!s.ok()) {
224
3.74k
    LOG(WARNING) << "Could not read footer for segment: " << path_
225
3.74k
        << ": " << s.ToString();
226
3.74k
  }
227
228
10.5k
  is_initialized_ = true;
229
230
10.5k
  readable_to_offset_.Store(file_size());
231
232
10.5k
  return true;
233
10.5k
}
234
235
3.57M
int64_t ReadableLogSegment::readable_up_to() const {
236
3.57M
  return readable_to_offset_.Load();
237
3.57M
}
238
239
25.1M
void ReadableLogSegment::UpdateReadableToOffset(int64_t readable_to_offset) {
240
25.1M
  readable_to_offset_.Store(readable_to_offset);
241
25.1M
  file_size_.StoreMax(readable_to_offset);
242
25.1M
}
243
244
3.75k
Status ReadableLogSegment::RebuildFooterByScanning() {
245
3.75k
  TRACE_EVENT1("log", "ReadableLogSegment::RebuildFooterByScanning",
246
3.75k
               "path", path_);
247
248
3.75k
  DCHECK(!footer_.IsInitialized());
249
3.75k
  auto read_entries = ReadEntries();
250
3.75k
  RETURN_NOT_OK(read_entries.status);
251
252
3.75k
  footer_.set_num_entries(read_entries.entries.size());
253
254
3.75k
  uint64_t latest_ht = 0;
255
  // Rebuild the min/max replicate index (by scanning)
256
710k
  for (const auto& entry : read_entries.entries) {
257
710k
    if (entry->has_replicate()) {
258
710k
      int64_t index = entry->replicate().id().index();
259
      // TODO: common code with Log::UpdateFooterForBatch
260
710k
      if (!footer_.has_min_replicate_index() ||
261
710k
          index < footer_.min_replicate_index()) {
262
3.82k
        footer_.set_min_replicate_index(index);
263
3.82k
      }
264
710k
      if (!footer_.has_max_replicate_index() ||
265
710k
          index > footer_.max_replicate_index()) {
266
701k
        footer_.set_max_replicate_index(index);
267
701k
      }
268
710k
      latest_ht = std::max(latest_ht, entry->replicate().hybrid_time());
269
710k
    }
270
710k
  }
271
272
3.75k
  DCHECK(footer_.IsInitialized());
273
3.75k
  DCHECK_EQ(read_entries.entries.size(), footer_.num_entries());
274
3.75k
  footer_was_rebuilt_ = true;
275
276
3.75k
  if (latest_ht > 0) {
277
3.74k
    footer_.set_close_timestamp_micros(yb::HybridTime(latest_ht).GetPhysicalValueMicros());
278
3.74k
  }
279
280
3.75k
  readable_to_offset_.Store(read_entries.end_offset);
281
282
3.75k
  LOG(INFO) << "Successfully rebuilt footer for segment: " << path_
283
3.75k
            << " (valid entries through byte offset " << read_entries.end_offset << ")";
284
3.75k
  return Status::OK();
285
3.75k
}
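// Recovery-flow sketch (hypothetical caller, assuming a local wal_path variable): a segment
// with no footer magic, typically the one being written when the server crashed, is opened,
// its footer is rebuilt by scanning, and it is then read as usual.
//
//   auto segment = VERIFY_RESULT(ReadableLogSegment::Open(Env::Default(), wal_path));
//   if (segment && !segment->HasFooter()) {
//     RETURN_NOT_OK(segment->RebuildFooterByScanning());
//   }
//   auto read_result = segment->ReadEntries();
//   RETURN_NOT_OK(read_result.status);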
286
287
255k
Status ReadableLogSegment::ReadFileSize() {
288
  // Check the size of the file.
289
  // Env uses uint here, even though we generally prefer signed ints to avoid
290
  // underflow bugs. Use a local to convert.
291
255k
  uint64_t size = VERIFY_RESULT_PREPEND(readable_file_->Size(), "Unable to read file size");
292
0
  file_size_.Store(size);
293
255k
  if (size == 0) {
294
3
    VLOG(1) << "Log segment file $0 is zero-length: " << path();
295
3
    return Status::OK();
296
3
  }
297
255k
  return Status::OK();
298
255k
}
299
300
10.5k
Result<bool> ReadableLogSegment::ReadHeader() {
301
10.5k
  uint32_t header_size;
302
10.5k
  RETURN_NOT_OK(ReadHeaderMagicAndHeaderLength(&header_size));
303
10.5k
  if (header_size == 0) {
304
    // If a log file has been pre-allocated but not initialized, then
305
    // 'header_size' will be 0 even if the file size is > 0; in this
306
    // case, 'is_initialized_' remains set to false and we return
307
    // Status::OK() early. LogReader ignores segments where
308
    // IsInitialized() returns false.
309
1
    return false;
310
1
  }
311
312
10.5k
  if (header_size > kLogSegmentMaxHeaderOrFooterSize) {
313
0
    return STATUS(Corruption,
314
0
        Substitute("File is corrupted. "
315
0
                   "Parsed header size: $0 is zero or bigger than max header size: $1",
316
0
                   header_size, kLogSegmentMaxHeaderOrFooterSize));
317
0
  }
318
319
10.5k
  std::vector<uint8_t> header_space(header_size);
320
10.5k
  Slice header_slice;
321
10.5k
  LogSegmentHeaderPB header;
322
323
  // Read and parse the log segment header.
324
10.5k
  RETURN_NOT_OK_PREPEND(ReadFully(readable_file_.get(), kLogSegmentHeaderMagicAndHeaderLength,
325
10.5k
                                  header_size, &header_slice, header_space.data()),
326
10.5k
                                      "Unable to read fully");
327
328
10.5k
  RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&header,
329
10.5k
                                                header_slice.data(),
330
10.5k
                                                header_size),
331
10.5k
                        "Unable to parse protobuf");
332
10.5k
  DCHECK(header.IsInitialized()) << "Log segment header must be initialized";
333
334
10.5k
  header_.CopyFrom(header);
335
10.5k
  first_entry_offset_ = header_size + kLogSegmentHeaderMagicAndHeaderLength;
336
337
10.5k
  return true;
338
10.5k
}
339
340
341
10.5k
Status ReadableLogSegment::ReadHeaderMagicAndHeaderLength(uint32_t *len) {
342
10.5k
  uint8_t scratch[kLogSegmentHeaderMagicAndHeaderLength];
343
10.5k
  Slice slice;
344
10.5k
  RETURN_NOT_OK(ReadFully(readable_file_.get(), 0, kLogSegmentHeaderMagicAndHeaderLength,
345
10.5k
                          &slice, scratch));
346
10.5k
  RETURN_NOT_OK(ParseHeaderMagicAndHeaderLength(slice, len));
347
10.5k
  return Status::OK();
348
10.5k
}
349
350
namespace {
351
352
// We don't run TSAN on this function because it makes it really slow and causes some
353
// test timeouts. This is only used on local buffers anyway, so we don't lose much
354
// by not checking it.
355
ATTRIBUTE_NO_SANITIZE_THREAD
356
26.3k
bool IsAllZeros(const Slice& s) {
357
  // Walk a pointer through the slice instead of using s[i]
358
  // since this is way faster in debug mode builds. We also do some
359
  // manual unrolling for the same purpose.
360
26.3k
  const uint8_t* p = s.data();
361
26.3k
  size_t rem = s.size();
362
363
3.41G
  while (rem >= 8) {
364
3.41G
    if (UNALIGNED_LOAD64(p) != 0) return false;
365
3.41G
    rem -= 8;
366
3.41G
    p += 8;
367
3.41G
  }
368
369
30.7k
  while (rem > 0) {
370
4.38k
    if (*p++ != '\0') return false;
371
4.38k
    rem--;
372
4.38k
  }
373
26.3k
  return true;
374
26.3k
}
375
} // anonymous namespace
376
377
Status ReadableLogSegment::ParseHeaderMagicAndHeaderLength(const Slice &data,
378
10.5k
                                                           uint32_t *parsed_len) {
379
10.5k
  RETURN_NOT_OK_PREPEND(data.check_size(kLogSegmentHeaderMagicAndHeaderLength),
380
10.5k
                        "Log segment file is too small to contain initial magic number");
381
382
10.5k
  if (memcmp(kLogSegmentHeaderMagicString, data.data(),
383
10.5k
             strlen(kLogSegmentHeaderMagicString)) != 0) {
384
    // As a special case, we check whether the file was allocated but no header
385
    // was written. We treat that case as an uninitialized file, much in the
386
    // same way we treat zero-length files.
387
    // Note: While the above comparison checks 8 bytes, this one checks the full 12
388
    // to ensure we have a full 12 bytes of NULL data.
389
1
    if (IsAllZeros(data)) {
390
      // 12 bytes of NULLs, good enough for us to consider this a file that
391
      // was never written to (but apparently preallocated).
392
1
      LOG(WARNING) << "Log segment file " << path() << " has 12 initial NULL bytes instead of "
393
1
                   << "magic and header length: " << data.ToDebugString()
394
1
                   << " and will be treated as a blank segment.";
395
1
      *parsed_len = 0;
396
1
      return Status::OK();
397
1
    }
398
    // If no magic and not uninitialized, the file is considered corrupt.
399
0
    return STATUS(Corruption, Substitute("Invalid log segment file $0: Bad magic. $1",
400
1
                                         path(), data.ToDebugString()));
401
1
  }
402
403
10.5k
  *parsed_len = DecodeFixed32(data.data() + strlen(kLogSegmentHeaderMagicString));
404
10.5k
  return Status::OK();
405
10.5k
}
406
407
10.5k
Status ReadableLogSegment::ReadFooter() {
408
10.5k
  uint32_t footer_size;
409
10.5k
  RETURN_NOT_OK(ReadFooterMagicAndFooterLength(&footer_size));
410
411
6.85k
  if (footer_size == 0 || footer_size > kLogSegmentMaxHeaderOrFooterSize) {
412
0
    return STATUS(NotFound,
413
0
        Substitute("File is corrupted. "
414
0
                   "Parsed header size: $0 is zero or bigger than max header size: $1",
415
0
                   footer_size, kLogSegmentMaxHeaderOrFooterSize));
416
0
  }
417
418
6.85k
  if (footer_size > (file_size() - first_entry_offset_)) {
419
0
    return STATUS(NotFound, "Footer not found. File corrupted. "
420
0
        "Decoded footer length pointed at a footer before the first entry.");
421
0
  }
422
423
6.85k
  std::vector<uint8_t> footer_space(footer_size);
424
6.85k
  Slice footer_slice;
425
426
6.85k
  int64_t footer_offset = file_size() - kLogSegmentFooterMagicAndFooterLength - footer_size;
427
428
6.85k
  LogSegmentFooterPB footer;
429
430
  // Read and parse the log segment footer.
431
6.85k
  RETURN_NOT_OK_PREPEND(ReadFully(readable_file_.get(), footer_offset,
432
6.85k
                                  footer_size, &footer_slice, footer_space.data()),
433
6.85k
                        "Footer not found. Could not read fully.");
434
435
6.85k
  RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&footer,
436
6.85k
                                                footer_slice.data(),
437
6.85k
                                                footer_size),
438
6.85k
                        "Unable to parse protobuf");
439
440
6.85k
  footer_.Swap(&footer);
441
6.85k
  return Status::OK();
442
6.85k
}
443
444
10.5k
Status ReadableLogSegment::ReadFooterMagicAndFooterLength(uint32_t *len) {
445
10.5k
  uint8_t scratch[kLogSegmentFooterMagicAndFooterLength];
446
10.5k
  Slice slice;
447
448
10.5k
  CHECK_GT(file_size(), kLogSegmentFooterMagicAndFooterLength);
449
10.5k
  RETURN_NOT_OK(ReadFully(readable_file_.get(),
450
10.5k
                          file_size() - kLogSegmentFooterMagicAndFooterLength,
451
10.5k
                          kLogSegmentFooterMagicAndFooterLength,
452
10.5k
                          &slice,
453
10.5k
                          scratch));
454
455
10.5k
  RETURN_NOT_OK(ParseFooterMagicAndFooterLength(slice, len));
456
6.85k
  return Status::OK();
457
10.5k
}
458
459
Status ReadableLogSegment::ParseFooterMagicAndFooterLength(const Slice &data,
460
10.5k
                                                           uint32_t *parsed_len) {
461
10.5k
  RETURN_NOT_OK_PREPEND(data.check_size(kLogSegmentFooterMagicAndFooterLength),
462
10.5k
                        "Slice is too small to contain final magic number");
463
464
10.5k
  if (memcmp(kLogSegmentFooterMagicString, data.data(),
465
10.5k
             strlen(kLogSegmentFooterMagicString)) != 0) {
466
3.75k
    return STATUS(NotFound, "Footer not found. Footer magic doesn't match");
467
3.75k
  }
468
469
6.84k
  *parsed_len = DecodeFixed32(data.data() + strlen(kLogSegmentFooterMagicString));
470
6.84k
  return Status::OK();
471
10.5k
}
472
473
19.2k
ReadEntriesResult ReadableLogSegment::ReadEntries(int64_t max_entries_to_read) {
474
19.2k
  TRACE_EVENT1("log", "ReadableLogSegment::ReadEntries",
475
19.2k
               "path", path_);
476
477
19.2k
  ReadEntriesResult result;
478
479
19.2k
  std::vector<int64_t> recent_offsets(4, -1);
480
19.2k
  int64_t batches_read = 0;
481
482
19.2k
  int64_t offset = first_entry_offset();
483
19.2k
  int64_t readable_to_offset = readable_to_offset_.Load();
484
19.2k
  VLOG(1) << "Reading segment entries from "
485
1
          << path_ << ": offset=" << offset << " file_size="
486
1
          << file_size() << " readable_to_offset=" << readable_to_offset;
487
19.2k
  faststring tmp_buf;
488
489
  // If we have a footer we only read up to it. If we don't we likely crashed
490
  // and always read to the end.
491
19.2k
  int64_t read_up_to = (footer_.IsInitialized() && !footer_was_rebuilt_) ?
492
7.99k
      file_size() - footer_.ByteSize() - kLogSegmentFooterMagicAndFooterLength :
493
19.2k
      readable_to_offset;
494
495
19.2k
  result.end_offset = offset;
496
497
19.2k
  int64_t num_entries_read = 0;
498
3.54M
  while (offset < read_up_to) {
499
3.53M
    const int64_t this_batch_offset = offset;
500
3.53M
    recent_offsets[batches_read++ % recent_offsets.size()] = offset;
501
502
3.53M
    LogEntryBatchPB current_batch;
503
504
    // Read and validate the entry header first.
505
3.53M
    Status s;
506
3.53M
    if (offset + implicit_cast<ssize_t>(kEntryHeaderSize) < read_up_to) {
507
3.53M
      s = ReadEntryHeaderAndBatch(&offset, &tmp_buf, &current_batch);
508
3.53M
    } else {
509
1.91k
      s = STATUS(Corruption, Substitute("Truncated log entry at offset $0", offset));
510
1.91k
    }
511
512
3.53M
    if (PREDICT_FALSE(!s.ok())) {
513
1.13k
      if (!s.IsCorruption()) {
514
        // IO errors should always propagate back
515
0
        result.status = s.CloneAndPrepend(Substitute("Error reading from log $0", path_));
516
0
        return result;
517
0
      }
518
519
1.13k
      result.status = MakeCorruptionStatus(
520
1.13k
          batches_read, this_batch_offset, &recent_offsets, result.entries, s);
521
522
      // If we have a valid footer in the segment, then the segment was correctly
523
      // closed, and we shouldn't see any corruption anywhere (including the last
524
      // batch).
525
1.13k
      if (HasFooter() && !footer_was_rebuilt_) {
526
3
        LOG(WARNING) << "Found a corruption in a closed log segment: " << result.status;
527
3
        return result;
528
3
      }
529
530
      // If we read a corrupt entry, but we don't have a footer, then it's
531
      // possible that we crashed in the middle of writing an entry.
532
      // In this case, we scan forward to see if there are any more valid looking
533
      // entries after this one in the file. If there are, it's really a corruption.
534
      // if not, we just WARN it, since it's OK for the last entry to be partially
535
      // written.
536
1.13k
      bool has_valid_entries;
537
1.13k
      auto status = ScanForValidEntryHeaders(offset, &has_valid_entries);
538
1.13k
      if (!status.ok()) {
539
0
        result.status = s.CloneAndPrepend("Scanning forward for valid entries");
540
0
      }
541
542
1.13k
      if (has_valid_entries) {
543
0
        return result;
544
0
      }
545
546
1.13k
      LOG(INFO) << "Ignoring log segment corruption in " << path_ << " because "
547
1.13k
                << "there are no log entries following the corrupted one. "
548
1.13k
                << "The server probably crashed in the middle of writing an entry "
549
1.13k
                << "to the write-ahead log or downloaded an active log via remote bootstrap. "
550
1.13k
                << "Error detail: " << result.status.ToString();
551
1.13k
      break;
552
1.13k
    }
553
554
3.53M
    if (VLOG_IS_ON(3)) {
555
0
      VLOG(3) << "Read Log entry batch: " << current_batch.DebugString();
556
0
    }
557
3.53M
    if (current_batch.has_committed_op_id()) {
558
3.50M
      result.committed_op_id = yb::OpId::FromPB(current_batch.committed_op_id());
559
3.50M
    }
560
561
    // Number of entries to extract from the protobuf repeated field because the ownership of those
562
    // entries will be transferred to the caller.
563
3.53M
    int num_entries_to_extract = 0;
564
565
5.65M
    for (int i = 0; i < current_batch.entry_size(); ++i) {
566
2.12M
      result.entries.emplace_back(current_batch.mutable_entry(i));
567
2.12M
      DCHECK_NE(current_batch.mono_time(), 0);
568
2.12M
      LogEntryMetadata entry_metadata;
569
2.12M
      entry_metadata.offset = this_batch_offset;
570
2.12M
      entry_metadata.active_segment_sequence_number = header().sequence_number();
571
2.12M
      entry_metadata.entry_time = RestartSafeCoarseTimePoint::FromUInt64(current_batch.mono_time());
572
2.12M
      result.entry_metadata.emplace_back(std::move(entry_metadata));
573
2.12M
      num_entries_read++;
574
2.12M
      num_entries_to_extract++;
575
2.12M
      if (num_entries_read >= max_entries_to_read) {
576
7.60k
        break;
577
7.60k
      }
578
2.12M
    }
579
3.53M
    current_batch.mutable_entry()->ExtractSubrange(
580
3.53M
        0, num_entries_to_extract, /* elements */ nullptr);
581
3.53M
    result.end_offset = offset;
582
3.53M
    if (num_entries_read >= max_entries_to_read) {
583
7.60k
      result.status = Status::OK();
584
7.60k
      return result;
585
7.60k
    }
586
3.53M
  }
587
588
11.6k
  if (footer_.IsInitialized() && footer_.num_entries() != num_entries_read) {
589
0
    result.status = STATUS_FORMAT(
590
0
        Corruption,
591
0
        "Read $0 log entries from $1, but expected $2 based on the footer",
592
0
        num_entries_read, path_, footer_.num_entries());
593
0
  }
594
595
11.6k
  result.status = Status::OK();
596
11.6k
  return result;
597
19.2k
}
598
599
7.64k
Result<FirstEntryMetadata> ReadableLogSegment::ReadFirstEntryMetadata() {
600
7.64k
  auto read_result = ReadEntries(/* max_entries_to_read */ 1);
601
7.64k
  const auto& entries = read_result.entries;
602
7.64k
  const auto& entry_metadata_records = read_result.entry_metadata;
603
7.64k
  if (entries.empty()) {
604
35
    return STATUS(NotFound, "No entries found");
605
35
  }
606
7.61k
  if (entry_metadata_records.empty()) {
607
0
    return STATUS(NotFound, "No entry metadata found");
608
0
  }
609
7.61k
  auto& first_entry = *entries.front();
610
7.61k
  if (!first_entry.has_replicate()) {
611
0
    return STATUS(NotFound, "No REPLICATE message found in the first entry");
612
0
  }
613
614
7.61k
  return FirstEntryMetadata {
615
7.61k
    .op_id = OpId::FromPB(first_entry.replicate().id()),
616
7.61k
    .entry_time = entry_metadata_records.front().entry_time
617
7.61k
  };
618
7.61k
}
619
620
1.12k
Status ReadableLogSegment::ScanForValidEntryHeaders(int64_t offset, bool* has_valid_entries) {
621
1.12k
  TRACE_EVENT1("log", "ReadableLogSegment::ScanForValidEntryHeaders",
622
1.12k
               "path", path_);
623
1.12k
  LOG(INFO) << "Scanning " << path_ << " for valid entry headers "
624
1.12k
            << "following offset " << offset << "...";
625
1.12k
  *has_valid_entries = false;
626
627
1.12k
  const int kChunkSize = 1024 * 1024;
628
1.12k
  std::unique_ptr<uint8_t[]> buf(new uint8_t[kChunkSize]);
629
630
  // We overlap the reads by the size of the header, so that if a header
631
  // spans chunks, we don't miss it.
632
1.12k
  for (;
633
27.4k
       offset < implicit_cast<int64_t>(file_size() - kEntryHeaderSize);
634
26.3k
       offset += kChunkSize - kEntryHeaderSize) {
635
26.3k
    auto rem = std::min<int64_t>(file_size() - offset, kChunkSize);
636
26.3k
    Slice chunk;
637
    // If encryption is enabled, need to use checkpoint file to read pre-allocated file since
638
    // we want to preserve all 0s.
639
26.3k
    RETURN_NOT_OK(ReadFully(
640
26.3k
        readable_file_checkpoint().get(), offset + readable_file()->GetEncryptionHeaderSize(), rem,
641
26.3k
        &chunk, &buf[0]));
642
643
    // Optimization for the case where a chunk is all zeros -- this is common in the
644
    // case of pre-allocated files. This avoids a lot of redundant CRC calculation.
645
26.3k
    if (IsAllZeros(chunk)) {
646
26.3k
      continue;
647
26.3k
    }
648
649
0
    if (readable_file()->IsEncrypted()) {
650
      // If encryption enabled, decrypt the contents of the file.
651
0
      RETURN_NOT_OK(ReadFully(readable_file().get(), offset, rem, &chunk, &buf[0]));
652
0
    }
653
654
    // Check if this chunk has a valid entry header.
655
0
    for (size_t off_in_chunk = 0;
656
0
         off_in_chunk < chunk.size() - kEntryHeaderSize;
657
0
         off_in_chunk++) {
658
0
      const Slice potential_header = Slice(chunk.data() + off_in_chunk, kEntryHeaderSize);
659
660
0
      EntryHeader header;
661
0
      if (DecodeEntryHeader(potential_header, &header).ok()) {
662
0
        LOG(INFO) << "Found a valid entry header at offset " << (offset + off_in_chunk);
663
0
        *has_valid_entries = true;
664
0
        return Status::OK();
665
0
      }
666
0
    }
667
0
  }
668
669
1.12k
  LOG(INFO) << "Found no log entry headers";
670
1.12k
  return Status::OK();
671
1.12k
}
672
673
Status ReadableLogSegment::MakeCorruptionStatus(
674
    size_t batch_number,
675
    int64_t batch_offset,
676
    std::vector<int64_t>* recent_offsets,
677
    const std::vector<std::unique_ptr<LogEntryPB>>& entries,
678
1.13k
    const Status& status) const {
679
680
1.13k
  string err = "Log file corruption detected. ";
681
1.13k
  SubstituteAndAppend(&err, "Failed trying to read batch #$0 at offset $1 for log segment $2: ",
682
1.13k
                      batch_number, batch_offset, path_);
683
1.13k
  err.append("Prior batch offsets:");
684
1.13k
  std::sort(recent_offsets->begin(), recent_offsets->end());
685
4.52k
  for (int64_t offset : *recent_offsets) {
686
4.52k
    if (offset >= 0) {
687
4.40k
      SubstituteAndAppend(&err, " $0", offset);
688
4.40k
    }
689
4.52k
  }
690
1.13k
  if (!entries.empty()) {
691
1.12k
    err.append("; Last log entries read:");
692
1.12k
    const int kNumEntries = 4; // Include up to the last 4 entries in the segment.
693
1.12k
    for (size_t i = std::max(0, static_cast<int>(entries.size()) - kNumEntries);
694
4.83k
        i < entries.size(); i++) {
695
3.71k
      LogEntryPB* entry = entries[i].get();
696
3.71k
      LogEntryTypePB type = entry->type();
697
3.71k
      string opid_str;
698
3.71k
      if (type == log::REPLICATE && entry->has_replicate()) {
699
3.70k
        opid_str = consensus::OpIdToString(entry->replicate().id());
700
3.70k
      } else {
701
2
        opid_str = "<unknown>";
702
2
      }
703
3.71k
      SubstituteAndAppend(&err, " [$0 ($1)]", LogEntryTypePB_Name(type), opid_str);
704
3.71k
    }
705
1.12k
  }
706
707
1.13k
  return status.CloneAndAppend(err);
708
1.13k
}
709
710
Status ReadableLogSegment::ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf,
711
3.56M
                                                   LogEntryBatchPB* batch) {
712
3.56M
  EntryHeader header;
713
3.56M
  RETURN_NOT_OK(ReadEntryHeader(offset, &header));
714
3.56M
  RETURN_NOT_OK(ReadEntryBatch(offset, header, tmp_buf, batch));
715
3.56M
  return Status::OK();
716
3.56M
}
717
718
719
3.56M
Status ReadableLogSegment::ReadEntryHeader(int64_t *offset, EntryHeader* header) {
720
3.56M
  uint8_t scratch[kEntryHeaderSize];
721
3.56M
  Slice slice;
722
3.56M
  RETURN_NOT_OK_PREPEND(ReadFully(readable_file().get(), *offset, kEntryHeaderSize,
723
3.56M
                                  &slice, scratch),
724
3.56M
                        "Could not read log entry header");
725
726
3.56M
  RETURN_NOT_OK(DecodeEntryHeader(slice, header));
727
3.56M
  *offset += slice.size();
728
3.56M
  return Status::OK();
729
3.56M
}
730
731
3.56M
Status ReadableLogSegment::DecodeEntryHeader(const Slice& data, EntryHeader* header) {
732
3.56M
  DCHECK_EQ(kEntryHeaderSize, data.size());
733
3.56M
  header->msg_length = DecodeFixed32(data.data());
734
3.56M
  header->msg_crc = DecodeFixed32(data.data() + 4);
735
3.56M
  header->header_crc = DecodeFixed32(data.data() + 8);
736
737
  // Verify the header.
738
3.56M
  uint32_t computed_crc = crc::Crc32c(data.data(), 8);
739
3.56M
  if (computed_crc != header->header_crc) {
740
1.13k
    return STATUS_FORMAT(
741
1.13k
        Corruption, "Invalid checksum in log entry head header: found=$0, computed=$1",
742
1.13k
        header->header_crc, computed_crc);
743
1.13k
  }
744
3.56M
  return Status::OK();
745
3.56M
}
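// Worked example with illustrative values: for a 100-byte serialized batch whose CRC32C is
// 0xAABBCCDD, WriteEntryBatch() emits Fixed32(100) | Fixed32(0xAABBCCDD) |
// Fixed32(CRC32C of those first 8 bytes) ahead of the batch. DecodeEntryHeader() recomputes
// the CRC over the first 8 bytes and returns Corruption on a mismatch, so a torn or partially
// written tail entry is rejected before the batch payload is read or parsed.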
746
747
748
Status ReadableLogSegment::ReadEntryBatch(int64_t *offset,
749
                                          const EntryHeader& header,
750
                                          faststring *tmp_buf,
751
3.56M
                                          LogEntryBatchPB* entry_batch) {
752
3.56M
  TRACE_EVENT2("log", "ReadableLogSegment::ReadEntryBatch",
753
3.56M
               "path", path_,
754
3.56M
               "range", Substitute("offset=$0 entry_len=$1",
755
3.56M
                                   *offset, header.msg_length));
756
757
3.56M
  if (header.msg_length == 0) {
758
0
    return STATUS(Corruption, "Invalid 0 entry length");
759
0
  }
760
3.56M
  int64_t limit = readable_up_to();
761
3.56M
  if (PREDICT_FALSE(header.msg_length + *offset > limit)) {
762
    // The log was likely truncated during writing.
763
1
    return STATUS(Corruption,
764
1
        Substitute("Could not read $0-byte log entry from offset $1 in $2: "
765
1
                   "log only readable up to offset $3",
766
1
                   header.msg_length, *offset, path_, limit));
767
1
  }
768
769
3.56M
  tmp_buf->clear();
770
3.56M
  tmp_buf->resize(header.msg_length);
771
3.56M
  Slice entry_batch_slice;
772
773
3.56M
  Status s =  readable_file()->Read(*offset,
774
3.56M
                                    header.msg_length,
775
3.56M
                                    &entry_batch_slice,
776
3.56M
                                    tmp_buf->data());
777
778
3.56M
  if (!s.ok()) return STATUS(IOError, Substitute("Could not read entry. Cause: $0",
779
3.56M
                                                 s.ToString()));
780
781
  // Verify the CRC.
782
3.56M
  uint32_t read_crc = crc::Crc32c(entry_batch_slice.data(), entry_batch_slice.size());
783
3.56M
  if (PREDICT_FALSE(read_crc != header.msg_crc)) {
784
2
    return STATUS(Corruption, Substitute("Entry CRC mismatch in byte range $0-$1: "
785
2
                                         "expected CRC=$2, computed=$3",
786
2
                                         *offset, *offset + header.msg_length,
787
2
                                         header.msg_crc, read_crc));
788
2
  }
789
790
791
3.56M
  LogEntryBatchPB read_entry_batch;
792
3.56M
  s = pb_util::ParseFromArray(&read_entry_batch,
793
3.56M
                              entry_batch_slice.data(),
794
3.56M
                              header.msg_length);
795
796
3.56M
  if (!s.ok()) return STATUS(Corruption, Substitute("Could not parse PB. Cause: $0",
797
3.56M
                                                    s.ToString()));
798
799
3.56M
  *offset += entry_batch_slice.size();
800
3.56M
  entry_batch->Swap(&read_entry_batch);
801
3.56M
  return Status::OK();
802
3.56M
}
803
804
26.6M
const LogSegmentHeaderPB& ReadableLogSegment::header() const {
805
26.6M
  DCHECK(header_.IsInitialized());
806
26.6M
  return header_;
807
26.6M
}
808
809
WritableLogSegment::WritableLogSegment(string path,
810
                                       shared_ptr<WritableFile> writable_file)
811
    : path_(std::move(path)),
812
      writable_file_(std::move(writable_file)),
813
      is_header_written_(false),
814
      is_footer_written_(false),
815
160k
      written_offset_(0) {}
816
817
160k
Status WritableLogSegment::WriteHeaderAndOpen(const LogSegmentHeaderPB& new_header) {
818
18.4E
  DCHECK(!IsHeaderWritten()) << "Can only call WriteHeader() once";
819
18.4E
  DCHECK(new_header.IsInitialized())
820
18.4E
      << "Log segment header must be initialized" << new_header.InitializationErrorString();
821
160k
  faststring buf;
822
823
  // First the magic.
824
160k
  buf.append(kLogSegmentHeaderMagicString);
825
  // Then Length-prefixed header.
826
160k
  PutFixed32(&buf, new_header.ByteSize());
827
  // Then Serialize the PB.
828
160k
  pb_util::AppendToString(new_header, &buf);
829
160k
  RETURN_NOT_OK(writable_file()->Append(Slice(buf)));
830
831
160k
  header_.CopyFrom(new_header);
832
160k
  first_entry_offset_ = buf.size();
833
160k
  written_offset_ = first_entry_offset_;
834
160k
  is_header_written_ = true;
835
836
160k
  return Status::OK();
837
160k
}
838
839
85.4k
Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer) {
840
85.4k
  TRACE_EVENT1("log", "WritableLogSegment::WriteFooterAndClose",
841
85.4k
               "path", path_);
842
85.4k
  DCHECK(IsHeaderWritten());
843
85.4k
  DCHECK(!IsFooterWritten());
844
85.4k
  DCHECK(footer.IsInitialized()) << footer.InitializationErrorString();
845
846
85.4k
  faststring buf;
847
848
85.4k
  pb_util::AppendToString(footer, &buf);
849
850
85.4k
  buf.append(kLogSegmentFooterMagicString);
851
85.4k
  PutFixed32(&buf, footer.ByteSize());
852
853
85.4k
  RETURN_NOT_OK_PREPEND(writable_file()->Append(Slice(buf)), "Could not write the footer");
854
855
85.4k
  footer_.CopyFrom(footer);
856
85.4k
  is_footer_written_ = true;
857
858
85.4k
  RETURN_NOT_OK(writable_file_->Close());
859
860
85.4k
  written_offset_ += buf.size();
861
862
85.4k
  return Status::OK();
863
85.4k
}
864
865
866
25.0M
Status WritableLogSegment::WriteEntryBatch(const Slice& data) {
867
25.0M
  DCHECK(is_header_written_);
868
25.0M
  DCHECK(!is_footer_written_);
869
25.0M
  uint8_t header_buf[kEntryHeaderSize];
870
871
  // First encode the length of the message.
872
25.0M
  auto len = data.size();
873
25.0M
  InlineEncodeFixed32(&header_buf[0], narrow_cast<uint32_t>(len));
874
875
  // Then the CRC of the message.
876
25.0M
  uint32_t msg_crc = crc::Crc32c(data.data(), data.size());
877
25.0M
  InlineEncodeFixed32(&header_buf[4], msg_crc);
878
879
  // Then the CRC of the header
880
25.0M
  uint32_t header_crc = crc::Crc32c(&header_buf, 8);
881
25.0M
  InlineEncodeFixed32(&header_buf[8], header_crc);
882
883
25.0M
  std::array<Slice, 2> slices = {
884
25.0M
      Slice(header_buf, sizeof(header_buf)),
885
25.0M
      Slice(data),
886
25.0M
  };
887
888
  // Write the header to the file, followed by the batch data itself.
889
25.0M
  RETURN_NOT_OK(writable_file_->AppendSlices(slices.data(), slices.size()));
890
25.0M
  written_offset_ += sizeof(header_buf) + data.size();
891
892
25.0M
  return Status::OK();
893
25.0M
}
894
895
547k
Status WritableLogSegment::Sync() {
896
547k
  return writable_file_->Sync();
897
547k
}
898
899
// Creates a LogEntryBatchPB from pre-allocated ReplicateMsgs managed using shared pointers. The
900
// caller has to ensure these messages are not deleted twice, both by LogEntryBatchPB and by
901
// the shared pointers.
902
25.0M
LogEntryBatchPB CreateBatchFromAllocatedOperations(const ReplicateMsgs& msgs) {
903
25.0M
  LogEntryBatchPB result;
904
25.0M
  result.set_mono_time(RestartSafeCoarseMonoClock().Now().ToUInt64());
905
25.0M
  result.mutable_entry()->Reserve(narrow_cast<int>(msgs.size()));
906
25.0M
  for (const auto& msg_ptr : msgs) {
907
15.2M
    LogEntryPB* entry_pb = result.add_entry();
908
15.2M
    entry_pb->set_type(log::REPLICATE);
909
    // entry_pb does not actually own the ReplicateMsg object, even though it thinks it does,
910
    // because we release it in ~LogEntryBatch. LogEntryBatchPB has a separate vector of shared
911
    // pointers to messages.
912
15.2M
    entry_pb->set_allocated_replicate(msg_ptr.get());
913
15.2M
  }
914
25.0M
  return result;
915
25.0M
}
916
917
324k
bool IsLogFileName(const string& fname) {
918
324k
  if (HasPrefixString(fname, ".")) {
919
    // Hidden file or ./..
920
302k
    VLOG(1) << "Ignoring hidden file: " << fname;
921
302k
    return false;
922
302k
  }
923
924
22.5k
  if (HasSuffixString(fname, kTmpSuffix)) {
925
0
    LOG(WARNING) << "Ignoring tmp file: " << fname;
926
0
    return false;
927
0
  }
928
929
22.5k
  vector<string> v = strings::Split(fname, "-");
930
22.5k
  if (v.size() != 2 || v[0] != FsManager::kWalFileNamePrefix) {
931
18.4E
    VLOG(1) << "Not a log file: " << fname;
932
1.60k
    return false;
933
1.60k
  }
934
935
20.9k
  return true;
936
22.5k
}
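// Example, assuming FsManager::kWalFileNamePrefix is "wal": "wal-000000000017" is accepted as
// a log file name, while ".nfs0001" (hidden), "wal-000000000017.tmp" (temporary), and
// "index_metadata" (wrong prefix/shape) are all rejected.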
937
938
177
std::vector<std::string> ParseDirFlags(string flag_dirs, string flag_name) {
939
177
  std::vector<std::string> paths = strings::Split(flag_dirs, ",", strings::SkipEmpty());
940
177
  return paths;
941
177
}
942
943
177
Status CheckPathsAreODirectWritable(const std::vector<std::string> &paths) {
944
177
  Env *def_env = Env::Default();
945
177
  for (const auto &path : paths) {
946
163
    RETURN_NOT_OK(CheckODirectTempFileCreationInDir(def_env, path));
947
163
  }
948
177
  return Status::OK();
949
177
}
950
951
63
Status CheckRelevantPathsAreODirectWritable() {
952
63
  if (!FLAGS_log_dir.empty()) {
953
51
    RETURN_NOT_OK_PREPEND(CheckPathsAreODirectWritable(ParseDirFlags(
954
51
        FLAGS_log_dir, "--log_dir")), "Not all log_dirs are O_DIRECT Writable.");
955
51
  }
956
63
  RETURN_NOT_OK_PREPEND(CheckPathsAreODirectWritable(ParseDirFlags(
957
63
      FLAGS_fs_data_dirs, "--data_dirs")), "Not all fs_data_dirs are O_DIRECT Writable.");
958
959
63
  RETURN_NOT_OK_PREPEND(CheckPathsAreODirectWritable(ParseDirFlags(
960
63
      FLAGS_fs_wal_dirs, "--wal_dirs")), "Not all fs_wal_dirs are O_DIRECT Writable.");
961
63
  return Status::OK();
962
63
}
963
964
14.6k
Status ModifyDurableWriteFlagIfNotODirect() {
965
14.6k
  if (FLAGS_durable_wal_write) {
966
63
    Status s = CheckRelevantPathsAreODirectWritable();
967
63
    if (!s.ok()) {
968
0
      if (FLAGS_require_durable_wal_write) {
969
        // Crash with appropriate error.
970
0
        RETURN_NOT_OK_PREPEND(s, "require_durable_wal_write is set true, but O_DIRECT is "
971
0
            "not allowed.")
972
0
      } else {
973
        // Report error but do not crash.
974
0
        LOG(ERROR) << "O_DIRECT is not allowed in some of the directories. "
975
0
            "Setting durable wal write flag to false.";
976
0
        FLAGS_durable_wal_write = false;
977
0
      }
978
0
    }
979
63
  }
980
14.6k
  return Status::OK();
981
14.6k
}
982
983
}  // namespace log
984
}  // namespace yb