/Users/deen/code/yugabyte-db/src/yb/consensus/log_util.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #ifndef YB_CONSENSUS_LOG_UTIL_H_ |
34 | | #define YB_CONSENSUS_LOG_UTIL_H_ |
35 | | |
36 | | #include <iosfwd> |
37 | | #include <map> |
38 | | #include <memory> |
39 | | #include <string> |
40 | | #include <utility> |
41 | | #include <vector> |
42 | | |
43 | | #include <gtest/gtest_prod.h> |
44 | | |
45 | | #include "yb/consensus/consensus_fwd.h" |
46 | | #include "yb/consensus/log_fwd.h" |
47 | | #include "yb/consensus/log.pb.h" |
48 | | |
49 | | #include "yb/gutil/macros.h" |
50 | | #include "yb/gutil/ref_counted.h" |
51 | | |
52 | | #include "yb/util/atomic.h" |
53 | | #include "yb/util/compare_util.h" |
54 | | #include "yb/util/env.h" |
55 | | #include "yb/util/monotime.h" |
56 | | #include "yb/util/opid.h" |
57 | | #include "yb/util/restart_safe_clock.h" |
58 | | #include "yb/util/status.h" |
59 | | #include "yb/util/tostring.h" |
60 | | |
61 | | // Used by other classes, now part of the API. |
62 | | DECLARE_bool(durable_wal_write); |
63 | | DECLARE_bool(require_durable_wal_write); |
64 | | DECLARE_string(fs_wal_dirs); |
65 | | DECLARE_string(fs_data_dirs); |
66 | | |
67 | | namespace yb { |
68 | | namespace log { |
69 | | |
70 | | // Suffix for temporary files |
71 | | extern const char kTmpSuffix[]; |
72 | | |
73 | | // Each log entry batch is prefixed by its length (4 bytes), the CRC of its data (4 bytes), |
74 | | // and a checksum of those two fields (see the EntryHeader struct below). |
75 | | extern const size_t kEntryHeaderSize; |
76 | | |
77 | | extern const int kLogMajorVersion; |
78 | | extern const int kLogMinorVersion; |
79 | | |
80 | | // Options for the Write Ahead Log. The LogOptions constructor initializes default field values |
81 | | // based on flags. See log_util.cc for details. |
82 | | struct LogOptions { |
83 | | |
84 | | // The size of a Log segment. |
85 | | // Logs will roll over upon reaching this size. |
86 | | size_t segment_size_bytes; |
87 | | |
88 | | size_t initial_segment_size_bytes; |
89 | | |
90 | | // Whether to call fsync on every call to Append(). |
91 | | bool durable_wal_write; |
92 | | |
93 | | // If non-zero, call fsync on a call to Append() every interval of time. |
94 | | MonoDelta interval_durable_wal_write; |
95 | | |
96 | | // If non-zero, call fsync on a call to Append() if more than given amount of data to sync. |
97 | | int32_t bytes_durable_wal_write_mb; |
98 | | |
99 | | // Whether to fallocate segments before writing to them. |
100 | | bool preallocate_segments; |
101 | | |
102 | | // Whether the allocation should happen asynchronously. |
103 | | bool async_preallocate_segments; |
104 | | |
105 | | uint32_t retention_secs = 0; |
106 | | |
107 | | // Env for log file operations. |
108 | | Env* env; |
109 | | |
110 | | std::string peer_uuid; |
111 | | |
112 | | uint64_t initial_active_segment_sequence_number = 0; |
113 | | |
114 | | LogOptions(); |
115 | | }; |
116 | | |
117 | | struct LogEntryMetadata { |
118 | | RestartSafeCoarseTimePoint entry_time; |
119 | | int64_t offset; |
120 | | uint64_t active_segment_sequence_number; |
121 | | |
122 | 6.02k | std::string ToString() const { |
123 | 6.02k | return YB_STRUCT_TO_STRING(entry_time, offset, active_segment_sequence_number); |
124 | 6.02k | } |
125 | | |
126 | 550 | friend bool operator==(const LogEntryMetadata& lhs, const LogEntryMetadata& rhs) { |
127 | 550 | return YB_STRUCT_EQUALS(entry_time, offset, active_segment_sequence_number); |
128 | 550 | } |
129 | | }; |
130 | | |
131 | | // A sequence of log entries, each held through a std::unique_ptr. |
132 | | typedef std::vector<std::unique_ptr<LogEntryPB>> LogEntries; |
133 | | |
134 | | struct ReadEntriesResult { |
135 | | // Read entries |
136 | | LogEntries entries; |
137 | | |
138 | | // Time, offset in WAL, and sequence number of respective entry |
139 | | std::vector<LogEntryMetadata> entry_metadata; |
140 | | |
141 | | // Where we finished reading |
142 | | int64_t end_offset; |
143 | | |
144 | | yb::OpId committed_op_id; |
145 | | |
146 | | // Failure status |
147 | | Status status; |
148 | | }; |
149 | | |
150 | | struct FirstEntryMetadata { |
151 | | OpId op_id; |
152 | | RestartSafeCoarseTimePoint entry_time; |
153 | | |
154 | 0 | std::string ToString() const { |
155 | 0 | return YB_STRUCT_TO_STRING(op_id, entry_time); |
156 | 0 | } |
157 | | }; |
158 | | |
159 | | // A segment of the log can either be a ReadableLogSegment (for replay and |
160 | | // consensus catch-up) or a WritableLogSegment (where the Log actually stores |
161 | | // state). LogSegments have a maximum size defined in LogOptions (set from the |
162 | | // log_segment_size_mb flag, which defaults to 64). Upon reaching this size |
163 | | // segments are rolled over and the Log continues in a new segment. |
164 | | |
165 | | // A readable log segment for recovery and follower catch-up. |
166 | | class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> { |
167 | | public: |
168 | | // Factory method to construct a ReadableLogSegment from a file on the FS. |
169 | | static Result<scoped_refptr<ReadableLogSegment>> Open(Env* env, const std::string& path); |
170 | | |
171 | | // Build a readable segment to read entries from the provided path. |
172 | | ReadableLogSegment(std::string path, |
173 | | std::shared_ptr<RandomAccessFile> readable_file); |
174 | | |
175 | | // Initialize the ReadableLogSegment. |
176 | | // This initializer provides methods for avoiding disk IO when creating a |
177 | | // ReadableLogSegment for the current WritableLogSegment, i.e. for reading |
178 | | // the log entries in the same segment that is currently being written to. |
179 | | CHECKED_STATUS Init(const LogSegmentHeaderPB& header, |
180 | | int64_t first_entry_offset); |
181 | | |
182 | | // Initialize the ReadableLogSegment. |
183 | | // This initializer provides methods for avoiding disk IO when creating a |
184 | | // ReadableLogSegment from a WritableLogSegment (i.e. for log rolling). |
185 | | CHECKED_STATUS Init(const LogSegmentHeaderPB& header, |
186 | | const LogSegmentFooterPB& footer, |
187 | | int64_t first_entry_offset); |
188 | | |
189 | | // Initialize the ReadableLogSegment. |
190 | | // This initializer will parse the log segment header and footer. |
191 | | // Returns false if the segment is empty and can therefore be ignored. |
192 | | Result<bool> Init(); |
193 | | |
194 | | // Reads all entries of the provided segment. |
195 | | // |
196 | | // If the log is corrupted (i.e. the returned 'Status' is 'Corruption') all |
197 | | // the log entries read up to the corrupted one are returned in the 'entries' |
198 | | // vector. |
199 | | // |
200 | | // All gathered information is returned in result. |
201 | | // On failure, the status field of the result is not OK. |
202 | | // |
203 | | // Will stop after reading max_entries_to_read entries. |
204 | | ReadEntriesResult ReadEntries(int64_t max_entries_to_read = std::numeric_limits<int64_t>::max()); |
205 | | |
206 | | // Reads the metadata of the first entry in the segment |
207 | | Result<FirstEntryMetadata> ReadFirstEntryMetadata(); |
208 | | |
209 | | // Rebuilds this segment's footer by scanning its entries. |
210 | | // This is an expensive operation as it reads and parses the whole segment |
211 | | // so it should be only used in the case of a crash, where the footer is |
212 | | // missing because we didn't have the time to write it out. |
213 | | CHECKED_STATUS RebuildFooterByScanning(); |
214 | | |
215 | 1.11M | bool IsInitialized() const { |
216 | 1.11M | return is_initialized_; |
217 | 1.11M | } |
218 | | |
219 | | // Returns the path this segment reads its entries from. |
220 | 17.7k | const std::string &path() const { |
221 | 17.7k | return path_; |
222 | 17.7k | } |
223 | | |
224 | | const LogSegmentHeaderPB& header() const; |
225 | | |
226 | | // Indicates whether this segment has a footer. |
227 | | // |
228 | | // Segments that were properly closed, e.g. because they were rolled over, |
229 | | // will have properly written footers. On the other hand if there was a |
230 | | // crash and the segment was not closed properly the footer will be missing. |
231 | | // In this case calling ReadEntries() will rebuild the footer. |
232 | 21.7M | bool HasFooter() const { |
233 | 21.7M | return footer_.IsInitialized(); |
234 | 21.7M | } |
235 | | |
236 | | // Returns this log segment's footer. |
237 | | // |
238 | | // If HasFooter() returns false this cannot be called. |
239 | 838k | const LogSegmentFooterPB& footer() const { |
240 | 838k | DCHECK(IsInitialized()); |
241 | 838k | CHECK(HasFooter()); |
242 | 838k | return footer_; |
243 | 838k | } |
244 | | |
245 | 4.72M | std::shared_ptr<RandomAccessFile> readable_file() const { |
246 | 4.72M | return readable_file_; |
247 | 4.72M | } |
248 | | |
249 | 27.5k | std::shared_ptr<RandomAccessFile> readable_file_checkpoint() const { |
250 | 27.5k | return readable_file_checkpoint_; |
251 | 27.5k | } |
252 | | |
253 | 497k | int64_t file_size() const { |
254 | 497k | return file_size_.Load(); |
255 | 497k | } |
256 | | |
257 | 15.1k | int64_t first_entry_offset() const { |
258 | 15.1k | return first_entry_offset_; |
259 | 15.1k | } |
260 | | |
261 | 1.24k | int64_t get_header_size() const { |
262 | 1.24k | return readable_file_->GetEncryptionHeaderSize(); |
263 | 1.24k | } |
264 | | |
265 | | // Returns the full size of the file, if the segment is closed and has |
266 | | // a footer, or the offset where the last written, non corrupt entry |
267 | | // ends. |
268 | | int64_t readable_up_to() const; |
269 | | |
270 | | private: |
271 | | friend class RefCountedThreadSafe<ReadableLogSegment>; |
272 | | friend class LogReader; |
273 | | FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment); |
274 | | |
275 | | struct EntryHeader { |
276 | | // The length of the batch data. |
277 | | uint32_t msg_length; |
278 | | |
279 | | // The CRC32C of the batch data. |
280 | | uint32_t msg_crc; |
281 | | |
282 | | // The CRC32C of this EntryHeader. |
283 | | uint32_t header_crc; |
284 | | }; |
285 | | |
286 | 118k | ~ReadableLogSegment() {} |
287 | | |
288 | | // Helper functions called by Init(). |
289 | | |
290 | | CHECKED_STATUS ReadFileSize(); |
291 | | |
292 | | Result<bool> ReadHeader(); |
293 | | |
294 | | CHECKED_STATUS ReadHeaderMagicAndHeaderLength(uint32_t *len); |
295 | | |
296 | | CHECKED_STATUS ParseHeaderMagicAndHeaderLength(const Slice &data, uint32_t *parsed_len); |
297 | | |
298 | | CHECKED_STATUS ReadFooter(); |
299 | | |
300 | | CHECKED_STATUS ReadFooterMagicAndFooterLength(uint32_t *len); |
301 | | |
302 | | CHECKED_STATUS ParseFooterMagicAndFooterLength(const Slice &data, uint32_t *parsed_len); |
303 | | |
304 | | // Starting at 'offset', read the rest of the log file, looking for any |
305 | | // valid log entry headers. If any are found, sets *has_valid_entries to true. |
306 | | // |
307 | | // Returns a bad Status only in the case that some IO error occurred reading the |
308 | | // file. |
309 | | CHECKED_STATUS ScanForValidEntryHeaders(int64_t offset, bool* has_valid_entries); |
310 | | |
311 | | // Format a nice error message to report on a corruption in a log file. |
312 | | CHECKED_STATUS MakeCorruptionStatus( |
313 | | size_t batch_number, int64_t batch_offset, std::vector<int64_t>* recent_offsets, |
314 | | const std::vector<std::unique_ptr<LogEntryPB>>& entries, const Status& status) const; |
315 | | |
316 | | CHECKED_STATUS ReadEntryHeaderAndBatch(int64_t* offset, |
317 | | faststring* tmp_buf, |
318 | | LogEntryBatchPB* batch); |
319 | | |
320 | | // Reads a log entry header from the segment. |
321 | | // Also increments the passed offset* by the length of the entry. |
322 | | CHECKED_STATUS ReadEntryHeader(int64_t *offset, EntryHeader* header); |
323 | | |
324 | | // Decode a log entry header from the given slice, which must be kEntryHeaderSize |
325 | | // bytes long. Returns OK if successful, a bad Status if the header is corrupt. |
326 | | // |
327 | | // NOTE: this is performance-critical since it is used by ScanForValidEntryHeaders |
328 | | // while it scans the rest of the log file for valid entry headers. |
329 | | CHECKED_STATUS DecodeEntryHeader(const Slice& data, EntryHeader* header); |
330 | | |
331 | | // Reads a log entry batch from the provided readable segment, which gets decoded |
332 | | // into 'entry_batch' and increments 'offset' by the batch's length. |
333 | | CHECKED_STATUS ReadEntryBatch(int64_t *offset, |
334 | | const EntryHeader& header, |
335 | | faststring* tmp_buf, |
336 | | LogEntryBatchPB* entry_batch); |
337 | | |
338 | | void UpdateReadableToOffset(int64_t readable_to_offset); |
339 | | |
340 | | const std::string path_; |
341 | | |
342 | | // The size of the readable file. |
343 | | // This is set by Init(). In the case of a log being written to, |
344 | | // this may be increased by UpdateReadableToOffset() |
345 | | AtomicInt<int64_t> file_size_; |
346 | | |
347 | | // The offset up to which we can read the file. |
348 | | // For already written segments this is fixed and equal to the file size |
349 | | // but for the segments currently written to this is the offset up to which |
350 | | // we can read without the fear of reading garbage/zeros. |
351 | | // This is atomic because the Log thread might be updating the segment's |
352 | | // readable offset at the same time as an async reader is reading the |
353 | | // segment's entries. |
354 | | AtomicInt<int64_t> readable_to_offset_; |
355 | | |
356 | | // a readable file for a log segment (used on replay) |
357 | | const std::shared_ptr<RandomAccessFile> readable_file_; |
358 | | |
359 | | std::shared_ptr<RandomAccessFile> readable_file_checkpoint_; |
360 | | |
361 | | bool is_initialized_; |
362 | | |
363 | | LogSegmentHeaderPB header_; |
364 | | |
365 | | LogSegmentFooterPB footer_; |
366 | | |
367 | | // True if the footer was rebuilt, rather than actually found on disk. |
368 | | bool footer_was_rebuilt_; |
369 | | |
370 | | // the offset of the first entry in the log. |
371 | | int64_t first_entry_offset_; |
372 | | |
373 | | DISALLOW_COPY_AND_ASSIGN(ReadableLogSegment); |
374 | | }; |
375 | | |
376 | | // A writable log segment where state data is stored. |
377 | | class WritableLogSegment { |
378 | | public: |
379 | | WritableLogSegment(std::string path, |
380 | | std::shared_ptr<WritableFile> writable_file); |
381 | | |
382 | | // Opens the segment by writing the header. |
383 | | CHECKED_STATUS WriteHeaderAndOpen(const LogSegmentHeaderPB& new_header); |
384 | | |
385 | | // Closes the segment by writing the footer and then actually closing the |
386 | | // underlying WritableFile. |
387 | | CHECKED_STATUS WriteFooterAndClose(const LogSegmentFooterPB& footer); |
388 | | |
389 | 55.9k | bool IsClosed() { |
390 | 55.9k | return IsHeaderWritten() && IsFooterWritten(); |
391 | 55.9k | } |
392 | | |
393 | 13.9M | int64_t Size() const { |
394 | 13.9M | return writable_file_->Size(); |
395 | 13.9M | } |
396 | | |
397 | | // Appends the provided batch of data, including a header |
398 | | // and checksum. |
399 | | // Makes sure that the log segment has not been closed. |
400 | | CHECKED_STATUS WriteEntryBatch(const Slice& entry_batch_data); |
401 | | |
402 | | // Makes sure the I/O buffers in the underlying writable file are flushed. |
403 | | CHECKED_STATUS Sync(); |
404 | | |
405 | | // Returns true if the segment header has already been written to disk. |
406 | 264k | bool IsHeaderWritten() const { |
407 | 264k | return is_header_written_; |
408 | 264k | } |
409 | | |
410 | 55.4k | const LogSegmentHeaderPB& header() const { |
411 | 55.4k | DCHECK(IsHeaderWritten()); |
412 | 55.4k | return header_; |
413 | 55.4k | } |
414 | | |
415 | 167k | bool IsFooterWritten() const { |
416 | 167k | return is_footer_written_; |
417 | 167k | } |
418 | | |
419 | 55.2k | const LogSegmentFooterPB& footer() const { |
420 | 55.2k | DCHECK(IsFooterWritten()); |
421 | 55.2k | return footer_; |
422 | 55.2k | } |
423 | | |
424 | | // Returns the path to the log file. |
425 | 125k | const std::string &path() const { |
426 | 125k | return path_; |
427 | 125k | } |
428 | | |
429 | 152k | int64_t first_entry_offset() const { |
430 | 152k | return first_entry_offset_; |
431 | 152k | } |
432 | | |
433 | 41.7M | int64_t written_offset() const { |
434 | 41.7M | return written_offset_; |
435 | 41.7M | } |
436 | | |
437 | | private: |
438 | | |
439 | 153k | const std::shared_ptr<WritableFile>& writable_file() const { |
440 | 153k | return writable_file_; |
441 | 153k | } |
442 | | |
443 | | // The path to the log file. |
444 | | const std::string path_; |
445 | | |
446 | | // The writable file to which this LogSegment will be written. |
447 | | const std::shared_ptr<WritableFile> writable_file_; |
448 | | |
449 | | bool is_header_written_; |
450 | | |
451 | | bool is_footer_written_; |
452 | | |
453 | | LogSegmentHeaderPB header_; |
454 | | |
455 | | LogSegmentFooterPB footer_; |
456 | | |
457 | | // the offset of the first entry in the log |
458 | | int64_t first_entry_offset_; |
459 | | |
460 | | // The offset where the last written entry ends. |
461 | | int64_t written_offset_; |
462 | | |
463 | | DISALLOW_COPY_AND_ASSIGN(WritableLogSegment); |
464 | | }; |
465 | | |
466 | | using consensus::ReplicateMsgs; |
467 | | |
468 | | // Returns a newly created batch that contains the pre-allocated |
469 | | // ReplicateMsgs in 'msgs'. |
470 | | // The batch is returned by value; 'msgs' is only read (it is passed by |
471 | | // const reference). |
472 | | LogEntryBatchPB CreateBatchFromAllocatedOperations(const ReplicateMsgs& msgs); |
473 | | |
474 | | // Checks if 'fname' is a correctly formatted name of a log segment file. |
475 | | bool IsLogFileName(const std::string& fname); |
476 | | |
477 | | CHECKED_STATUS CheckPathsAreODirectWritable(const std::vector<std::string>& paths); |
478 | | CHECKED_STATUS CheckRelevantPathsAreODirectWritable(); |
479 | | |
480 | | // Modify durable wal write flag depending on the value of FLAGS_require_durable_wal_write. |
481 | | CHECKED_STATUS ModifyDurableWriteFlagIfNotODirect(); |
482 | | |
483 | | } // namespace log |
484 | | } // namespace yb |
485 | | |
486 | | #endif /* YB_CONSENSUS_LOG_UTIL_H_ */ |