YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/log_util.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_CONSENSUS_LOG_UTIL_H_
34
#define YB_CONSENSUS_LOG_UTIL_H_
35
36
#include <iosfwd>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
42
43
#include <gtest/gtest_prod.h>
44
45
#include "yb/consensus/consensus_fwd.h"
46
#include "yb/consensus/log_fwd.h"
47
#include "yb/consensus/log.pb.h"
48
49
#include "yb/gutil/macros.h"
50
#include "yb/gutil/ref_counted.h"
51
52
#include "yb/util/atomic.h"
53
#include "yb/util/compare_util.h"
54
#include "yb/util/env.h"
55
#include "yb/util/monotime.h"
56
#include "yb/util/opid.h"
57
#include "yb/util/restart_safe_clock.h"
58
#include "yb/util/status.h"
59
#include "yb/util/tostring.h"
60
61
// Used by other classes, now part of the API.
62
DECLARE_bool(durable_wal_write);
63
DECLARE_bool(require_durable_wal_write);
64
DECLARE_string(fs_wal_dirs);
65
DECLARE_string(fs_data_dirs);
66
67
namespace yb {
68
namespace log {
69
70
// Suffix for temporary files
71
extern const char kTmpSuffix[];
72
73
// Each log entry is prefixed by its length (4 bytes), CRC (4 bytes),
74
// and checksum of the other two fields (see EntryHeader struct below).
75
extern const size_t kEntryHeaderSize;
76
77
extern const int kLogMajorVersion;
78
extern const int kLogMinorVersion;
79
80
// Options for the Write Ahead Log. The LogOptions constructor initializes default field values
81
// based on flags. See log_util.cc for details.
82
struct LogOptions {
83
84
  // The size of a Log segment.
85
  // Logs will roll over upon reaching this size.
86
  size_t segment_size_bytes;
87
88
  size_t initial_segment_size_bytes;
89
90
  // Whether to call fsync on every call to Append().
91
  bool durable_wal_write;
92
93
  // If non-zero, call fsync on a call to Append() every interval of time.
94
  MonoDelta interval_durable_wal_write;
95
96
  // If non-zero, call fsync on a call to Append() if more than given amount of data to sync.
97
  int32_t bytes_durable_wal_write_mb;
98
99
  // Whether to fallocate segments before writing to them.
100
  bool preallocate_segments;
101
102
  // Whether the allocation should happen asynchronously.
103
  bool async_preallocate_segments;
104
105
  uint32_t retention_secs = 0;
106
107
  // Env for log file operations.
108
  Env* env;
109
110
  std::string peer_uuid;
111
112
  uint64_t initial_active_segment_sequence_number = 0;
113
114
  LogOptions();
115
};
116
117
struct LogEntryMetadata {
118
  RestartSafeCoarseTimePoint entry_time;
119
  int64_t offset;
120
  uint64_t active_segment_sequence_number;
121
122
7.44k
  std::string ToString() const {
123
7.44k
    return YB_STRUCT_TO_STRING(entry_time, offset, active_segment_sequence_number);
124
7.44k
  }
125
126
  friend bool operator==(const LogEntryMetadata& lhs, const LogEntryMetadata& rhs) {
127
    return YB_STRUCT_EQUALS(entry_time, offset, active_segment_sequence_number);
128
  }
129
};
130
131
// A sequence of segments, ordered by increasing sequence number.
132
typedef std::vector<std::unique_ptr<LogEntryPB>> LogEntries;
133
134
struct ReadEntriesResult {
135
  // Read entries
136
  LogEntries entries;
137
138
  // Time, offset in WAL, and sequence number of respective entry
139
  std::vector<LogEntryMetadata> entry_metadata;
140
141
  // Where we finished reading
142
  int64_t end_offset;
143
144
  yb::OpId committed_op_id;
145
146
  // Failure status
147
  Status status;
148
};
149
150
struct FirstEntryMetadata {
151
  OpId op_id;
152
  RestartSafeCoarseTimePoint entry_time;
153
154
0
  std::string ToString() const {
155
0
    return YB_STRUCT_TO_STRING(op_id, entry_time);
156
0
  }
157
};
158
159
// A segment of the log can either be a ReadableLogSegment (for replay and
160
// consensus catch-up) or a WritableLogSegment (where the Log actually stores
161
// state). LogSegments have a maximum size defined in LogOptions (set from the
162
// log_segment_size_mb flag, which defaults to 64). Upon reaching this size
163
// segments are rolled over and the Log continues in a new segment.
164
165
// A readable log segment for recovery and follower catch-up.
166
class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
 public:
  // Factory method to construct a ReadableLogSegment from a file on the FS.
  static Result<scoped_refptr<ReadableLogSegment>> Open(Env* env, const std::string& path);

  // Build a readable segment to read entries from the provided path.
  ReadableLogSegment(std::string path,
                     std::shared_ptr<RandomAccessFile> readable_file);

  // Initialize the ReadableLogSegment from an already known header.
  // This initializer avoids disk IO when creating a ReadableLogSegment for the
  // current WritableLogSegment, i.e. for reading the log entries in the same
  // segment that is currently being written to.
  CHECKED_STATUS Init(const LogSegmentHeaderPB& header,
                      int64_t first_entry_offset);

  // Initialize the ReadableLogSegment from an already known header and footer.
  // This initializer avoids disk IO when creating a ReadableLogSegment from a
  // WritableLogSegment (i.e. for log rolling).
  CHECKED_STATUS Init(const LogSegmentHeaderPB& header,
                      const LogSegmentFooterPB& footer,
                      int64_t first_entry_offset);

  // Initialize the ReadableLogSegment.
  // This initializer will parse the log segment header and footer.
  // Returns false if the segment is empty and can therefore be ignored.
  Result<bool> Init();

  // Reads all entries of the provided segment.
  //
  // If the log is corrupted (i.e. the returned 'Status' is 'Corruption') all
  // the log entries read up to the corrupted one are returned in the 'entries'
  // vector.
  //
  // All gathered information is returned in the result.
  // In case of failure the status field of the result is not OK.
  //
  // Will stop after reading max_entries_to_read entries.
  ReadEntriesResult ReadEntries(int64_t max_entries_to_read = std::numeric_limits<int64_t>::max());

  // Reads the metadata of the first entry in the segment.
  Result<FirstEntryMetadata> ReadFirstEntryMetadata();

  // Rebuilds this segment's footer by scanning its entries.
  // This is an expensive operation as it reads and parses the whole segment,
  // so it should only be used in the case of a crash, where the footer is
  // missing because we didn't have the time to write it out.
  CHECKED_STATUS RebuildFooterByScanning();

  bool IsInitialized() const { return is_initialized_; }

  // Returns the path this segment was constructed with.
  const std::string& path() const { return path_; }

  const LogSegmentHeaderPB& header() const;

  // Indicates whether this segment has a footer.
  //
  // Segments that were properly closed, e.g. because they were rolled over,
  // will have properly written footers. On the other hand if there was a
  // crash and the segment was not closed properly the footer will be missing.
  // In this case calling ReadEntries() will rebuild the footer.
  bool HasFooter() const { return footer_.IsInitialized(); }

  // Returns this log segment's footer.
  //
  // Must not be called if HasFooter() returns false: that is CHECK-enforced.
  const LogSegmentFooterPB& footer() const {
    DCHECK(IsInitialized());
    CHECK(HasFooter());
    return footer_;
  }

  std::shared_ptr<RandomAccessFile> readable_file() const { return readable_file_; }

  std::shared_ptr<RandomAccessFile> readable_file_checkpoint() const {
    return readable_file_checkpoint_;
  }

  int64_t file_size() const { return file_size_.Load(); }

  int64_t first_entry_offset() const { return first_entry_offset_; }

  // Returns the encryption header size reported by the underlying readable file.
  int64_t get_header_size() const { return readable_file_->GetEncryptionHeaderSize(); }

  // Returns the full size of the file, if the segment is closed and has
  // a footer, or the offset where the last written, non corrupt entry
  // ends.
  int64_t readable_up_to() const;

 private:
  friend class RefCountedThreadSafe<ReadableLogSegment>;
  friend class LogReader;
  FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);

  struct EntryHeader {
    // The length of the batch data.
    uint32_t msg_length;

    // The CRC32C of the batch data.
    uint32_t msg_crc;

    // The CRC32C of this EntryHeader.
    uint32_t header_crc;
  };

  // Private: the friend declaration above lets RefCountedThreadSafe destroy the object.
  ~ReadableLogSegment() = default;

  // Helper functions called by Init().

  CHECKED_STATUS ReadFileSize();

  Result<bool> ReadHeader();

  CHECKED_STATUS ReadHeaderMagicAndHeaderLength(uint32_t *len);

  CHECKED_STATUS ParseHeaderMagicAndHeaderLength(const Slice &data, uint32_t *parsed_len);

  CHECKED_STATUS ReadFooter();

  CHECKED_STATUS ReadFooterMagicAndFooterLength(uint32_t *len);

  CHECKED_STATUS ParseFooterMagicAndFooterLength(const Slice &data, uint32_t *parsed_len);

  // Starting at 'offset', read the rest of the log file, looking for any
  // valid log entry headers. If any are found, sets *has_valid_entries to true.
  //
  // Returns a bad Status only in the case that some IO error occurred reading the
  // file.
  CHECKED_STATUS ScanForValidEntryHeaders(int64_t offset, bool* has_valid_entries);

  // Format a nice error message to report on a corruption in a log file.
  CHECKED_STATUS MakeCorruptionStatus(
      size_t batch_number, int64_t batch_offset, std::vector<int64_t>* recent_offsets,
      const std::vector<std::unique_ptr<LogEntryPB>>& entries, const Status& status) const;

  CHECKED_STATUS ReadEntryHeaderAndBatch(int64_t* offset,
                                         faststring* tmp_buf,
                                         LogEntryBatchPB* batch);

  // Reads a log entry header from the segment.
  // Also increments the passed offset* by the length of the entry.
  CHECKED_STATUS ReadEntryHeader(int64_t *offset, EntryHeader* header);

  // Decode a log entry header from the given slice, which must be kEntryHeaderSize
  // bytes long. The outcome is reported through the returned Status.
  // NOTE(review): presumably a non-OK status means the header is corrupt -- confirm
  // in log_util.cc.
  //
  // NOTE: this is performance-critical since it is used by ScanForValidEntryHeaders.
  CHECKED_STATUS DecodeEntryHeader(const Slice& data, EntryHeader* header);

  // Reads a log entry batch from the provided readable segment, which gets decoded
  // into 'entry_batch' and increments 'offset' by the batch's length.
  CHECKED_STATUS ReadEntryBatch(int64_t *offset,
                                const EntryHeader& header,
                                faststring* tmp_buf,
                                LogEntryBatchPB* entry_batch);

  void UpdateReadableToOffset(int64_t readable_to_offset);

  const std::string path_;

  // The size of the readable file.
  // This is set by Init(). In the case of a log being written to,
  // this may be increased by UpdateReadableToOffset().
  AtomicInt<int64_t> file_size_;

  // The offset up to which we can read the file.
  // For already written segments this is fixed and equal to the file size,
  // but for the segment currently being written to this is the offset up to which
  // we can read without the fear of reading garbage/zeros.
  // This is atomic because the Log thread might be updating the segment's readable
  // offset while an async reader is reading the segment's entries.
  AtomicInt<int64_t> readable_to_offset_;

  // A readable file for a log segment (used on replay).
  const std::shared_ptr<RandomAccessFile> readable_file_;

  std::shared_ptr<RandomAccessFile> readable_file_checkpoint_;

  // Defaulted in-class so the accessors never observe an indeterminate value, whatever
  // the constructor (defined outside this header) chooses to set.
  bool is_initialized_ = false;

  LogSegmentHeaderPB header_;

  LogSegmentFooterPB footer_;

  // True if the footer was rebuilt, rather than actually found on disk.
  bool footer_was_rebuilt_ = false;

  // The offset of the first entry in the log.
  int64_t first_entry_offset_ = 0;

  DISALLOW_COPY_AND_ASSIGN(ReadableLogSegment);
};
375
376
// A writable log segment where state data is stored.
377
class WritableLogSegment {
378
 public:
379
  WritableLogSegment(std::string path,
380
                     std::shared_ptr<WritableFile> writable_file);
381
382
  // Opens the segment by writing the header.
383
  CHECKED_STATUS WriteHeaderAndOpen(const LogSegmentHeaderPB& new_header);
384
385
  // Closes the segment by writing the footer and then actually closing the
386
  // underlying WritableFile.
387
  CHECKED_STATUS WriteFooterAndClose(const LogSegmentFooterPB& footer);
388
389
85.3k
  bool IsClosed() {
390
85.3k
    return IsHeaderWritten() && 
IsFooterWritten()85.2k
;
391
85.3k
  }
392
393
25.1M
  int64_t Size() const {
394
25.1M
    return writable_file_->Size();
395
25.1M
  }
396
397
  // Appends the provided batch of data, including a header
398
  // and checksum.
399
  // Makes sure that the log segment has not been closed.
400
  CHECKED_STATUS WriteEntryBatch(const Slice& entry_batch_data);
401
402
  // Makes sure the I/O buffers in the underlying writable file are flushed.
403
  CHECKED_STATUS Sync();
404
405
  // Returns true if the segment header has already been written to disk.
406
415k
  bool IsHeaderWritten() const {
407
415k
    return is_header_written_;
408
415k
  }
409
410
84.6k
  const LogSegmentHeaderPB& header() const {
411
84.6k
    DCHECK(IsHeaderWritten());
412
84.6k
    return header_;
413
84.6k
  }
414
415
255k
  bool IsFooterWritten() const {
416
255k
    return is_footer_written_;
417
255k
  }
418
419
84.5k
  const LogSegmentFooterPB& footer() const {
420
84.5k
    DCHECK(IsFooterWritten());
421
84.5k
    return footer_;
422
84.5k
  }
423
424
  // Returns the parent directory where log segments are stored.
425
186k
  const std::string &path() const {
426
186k
    return path_;
427
186k
  }
428
429
244k
  int64_t first_entry_offset() const {
430
244k
    return first_entry_offset_;
431
244k
  }
432
433
75.2M
  int64_t written_offset() const {
434
75.2M
    return written_offset_;
435
75.2M
  }
436
437
 private:
438
439
245k
  const std::shared_ptr<WritableFile>& writable_file() const {
440
245k
    return writable_file_;
441
245k
  }
442
443
  // The path to the log file.
444
  const std::string path_;
445
446
  // The writable file to which this LogSegment will be written.
447
  const std::shared_ptr<WritableFile> writable_file_;
448
449
  bool is_header_written_;
450
451
  bool is_footer_written_;
452
453
  LogSegmentHeaderPB header_;
454
455
  LogSegmentFooterPB footer_;
456
457
  // the offset of the first entry in the log
458
  int64_t first_entry_offset_;
459
460
  // The offset where the last written entry ends.
461
  int64_t written_offset_;
462
463
  DISALLOW_COPY_AND_ASSIGN(WritableLogSegment);
464
};
465
466
using consensus::ReplicateMsgs;
467
468
// Returns a newly created batch that contains the pre-allocated
469
// ReplicateMsgs in 'msgs'.
470
// 'msgs' is taken by const reference, so the caller's container is not
471
// copied on hot paths.
472
LogEntryBatchPB CreateBatchFromAllocatedOperations(const ReplicateMsgs& msgs);
473
474
// Checks if 'fname' is a correctly formatted name of log segment file.
475
bool IsLogFileName(const std::string& fname);
476
477
CHECKED_STATUS CheckPathsAreODirectWritable(const std::vector<std::string>& paths);
478
CHECKED_STATUS CheckRelevantPathsAreODirectWritable();
479
480
// Modify durable wal write flag depending on the value of FLAGS_require_durable_wal_write.
481
CHECKED_STATUS ModifyDurableWriteFlagIfNotODirect();
482
483
}  // namespace log
484
}  // namespace yb
485
486
#endif /* YB_CONSENSUS_LOG_UTIL_H_ */