/Users/deen/code/yugabyte-db/src/yb/consensus/log_util.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/consensus/log_util.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <array> |
37 | | #include <limits> |
38 | | #include <utility> |
39 | | |
40 | | #include <glog/logging.h> |
41 | | |
42 | | #include "yb/common/hybrid_time.h" |
43 | | |
44 | | #include "yb/consensus/opid_util.h" |
45 | | |
46 | | #include "yb/fs/fs_manager.h" |
47 | | |
48 | | #include "yb/gutil/casts.h" |
49 | | #include "yb/gutil/strings/split.h" |
50 | | #include "yb/gutil/strings/util.h" |
51 | | |
52 | | #include "yb/util/coding-inl.h" |
53 | | #include "yb/util/coding.h" |
54 | | #include "yb/util/crc.h" |
55 | | #include "yb/util/debug/trace_event.h" |
56 | | #include "yb/util/env_util.h" |
57 | | #include "yb/util/flag_tags.h" |
58 | | #include "yb/util/pb_util.h" |
59 | | #include "yb/util/result.h" |
60 | | #include "yb/util/size_literals.h" |
61 | | #include "yb/util/status_format.h" |
62 | | #include "yb/util/status_log.h" |
63 | | |
64 | | DEFINE_int32(log_segment_size_mb, 64, |
65 | | "The default segment size for log roll-overs, in MB"); |
66 | | TAG_FLAG(log_segment_size_mb, advanced); |
67 | | |
68 | | DEFINE_uint64(log_segment_size_bytes, 0, |
69 | | "The default segment size for log roll-overs, in bytes. " |
70 | | "If 0 then log_segment_size_mb is used."); |
71 | | |
72 | | DEFINE_uint64(initial_log_segment_size_bytes, 1024 * 1024, |
73 | | "The maximum segment size we want for a new WAL segment, in bytes. " |
74 | | "This value keeps doubling (for each subsequent WAL segment) till it gets to the " |
75 | | "maximum configured segment size (log_segment_size_bytes or log_segment_size_mb)."); |
76 | | |
77 | | DEFINE_bool(durable_wal_write, false, |
78 | | "Whether the Log/WAL should explicitly call fsync() after each write."); |
79 | | TAG_FLAG(durable_wal_write, stable); |
80 | | |
81 | | DEFINE_int32(interval_durable_wal_write_ms, 1000, |
82 | | "Interval in ms after which the Log/WAL should explicitly call fsync(). " |
83 | | "If 0 fsysnc() is not called."); |
84 | | TAG_FLAG(interval_durable_wal_write_ms, stable); |
85 | | |
86 | | DEFINE_int32(bytes_durable_wal_write_mb, 1, |
87 | | "Amount of data in MB after which the Log/WAL should explicitly call fsync(). " |
88 | | "If 0 fsysnc() is not called."); |
89 | | TAG_FLAG(bytes_durable_wal_write_mb, stable); |
90 | | |
91 | | DEFINE_bool(log_preallocate_segments, true, |
92 | | "Whether the WAL should preallocate the entire segment before writing to it"); |
93 | | TAG_FLAG(log_preallocate_segments, advanced); |
94 | | |
95 | | DEFINE_bool(log_async_preallocate_segments, true, |
96 | | "Whether the WAL segments preallocation should happen asynchronously"); |
97 | | TAG_FLAG(log_async_preallocate_segments, advanced); |
98 | | |
99 | | DECLARE_string(fs_data_dirs); |
100 | | |
101 | | DEFINE_bool(require_durable_wal_write, false, "Whether durable WAL write is required." |
102 | | "In case you cannot write using O_DIRECT in WAL and data directories and this flag is set true" |
103 | | "the system will deliberately crash with the appropriate error. If this flag is set false, " |
104 | | "the system will soft downgrade the durable_wal_write flag."); |
105 | | TAG_FLAG(require_durable_wal_write, stable); |
106 | | |
107 | | namespace yb { |
108 | | namespace log { |
109 | | |
110 | | using env_util::ReadFully; |
111 | | using std::vector; |
112 | | using std::shared_ptr; |
113 | | using strings::Substitute; |
114 | | using strings::SubstituteAndAppend; |
115 | | |
116 | | const char kTmpSuffix[] = ".tmp"; |
117 | | |
118 | | const char kLogSegmentHeaderMagicString[] = "yugalogf"; |
119 | | |
120 | | // A magic that is written as the very last thing when a segment is closed. |
121 | | // Segments that were not closed (usually the last one being written) will not |
122 | | // have this magic. |
123 | | const char kLogSegmentFooterMagicString[] = "closedls"; |
124 | | |
125 | | // Header is prefixed with the header magic (8 bytes) and the header length (4 bytes). |
126 | | const size_t kLogSegmentHeaderMagicAndHeaderLength = 12; |
127 | | |
128 | | // Footer is suffixed with the footer magic (8 bytes) and the footer length (4 bytes). |
129 | | const size_t kLogSegmentFooterMagicAndFooterLength = 12; |
130 | | |
131 | | const size_t kEntryHeaderSize = 12; |
132 | | |
133 | | const int kLogMajorVersion = 1; |
134 | | const int kLogMinorVersion = 0; |
135 | | |
136 | | // Maximum log segment header/footer size, in bytes (8 MB). |
137 | | const uint32_t kLogSegmentMaxHeaderOrFooterSize = 8 * 1024 * 1024; |
138 | | |
139 | | LogOptions::LogOptions() |
140 | | : segment_size_bytes(FLAGS_log_segment_size_bytes == 0 ? FLAGS_log_segment_size_mb * 1_MB |
141 | | : FLAGS_log_segment_size_bytes), |
142 | | initial_segment_size_bytes(FLAGS_initial_log_segment_size_bytes), |
143 | | durable_wal_write(FLAGS_durable_wal_write), |
144 | | interval_durable_wal_write(FLAGS_interval_durable_wal_write_ms > 0 ? |
145 | | MonoDelta::FromMilliseconds( |
146 | | FLAGS_interval_durable_wal_write_ms) : MonoDelta()), |
147 | | bytes_durable_wal_write_mb(FLAGS_bytes_durable_wal_write_mb), |
148 | | preallocate_segments(FLAGS_log_preallocate_segments), |
149 | | async_preallocate_segments(FLAGS_log_async_preallocate_segments), |
150 | 150k | env(Env::Default()) { |
151 | 150k | } |
152 | | |
153 | | Result<scoped_refptr<ReadableLogSegment>> ReadableLogSegment::Open( |
154 | 10.6k | Env* env, const std::string& path) { |
155 | 10.6k | VLOG(1) << "Parsing wal segment: " << path1 ; |
156 | 10.6k | shared_ptr<RandomAccessFile> readable_file; |
157 | 10.6k | RETURN_NOT_OK_PREPEND(env_util::OpenFileForRandom(env, path, &readable_file), |
158 | 10.6k | "Unable to open file for reading"); |
159 | | |
160 | 10.6k | auto segment = make_scoped_refptr<ReadableLogSegment>(path, readable_file); |
161 | 10.6k | if (!VERIFY_RESULT_PREPEND(segment->Init(), "Unable to initialize segment")) { |
162 | 1 | return nullptr; |
163 | 1 | } |
164 | 10.6k | return segment; |
165 | 10.6k | } |
166 | | |
167 | | ReadableLogSegment::ReadableLogSegment( |
168 | | std::string path, shared_ptr<RandomAccessFile> readable_file) |
169 | | : path_(std::move(path)), |
170 | | file_size_(0), |
171 | | readable_to_offset_(0), |
172 | | readable_file_(std::move(readable_file)), |
173 | | is_initialized_(false), |
174 | 255k | footer_was_rebuilt_(false) { |
175 | 255k | CHECK_OK(env_util::OpenFileForRandom(Env::Default(), path_, &readable_file_checkpoint_)); |
176 | 255k | } |
177 | | |
178 | | Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header, |
179 | | const LogSegmentFooterPB& footer, |
180 | 84.6k | int64_t first_entry_offset) { |
181 | 84.6k | DCHECK(!IsInitialized()) << "Can only call Init() once"444 ; |
182 | 18.4E | DCHECK(header.IsInitialized()) << "Log segment header must be initialized"; |
183 | 18.4E | DCHECK(footer.IsInitialized()) << "Log segment footer must be initialized"; |
184 | | |
185 | 84.6k | RETURN_NOT_OK(ReadFileSize()); |
186 | | |
187 | 84.6k | header_.CopyFrom(header); |
188 | 84.6k | footer_.CopyFrom(footer); |
189 | 84.6k | first_entry_offset_ = first_entry_offset; |
190 | 84.6k | is_initialized_ = true; |
191 | 84.6k | readable_to_offset_.Store(file_size()); |
192 | | |
193 | 84.6k | return Status::OK(); |
194 | 84.6k | } |
195 | | |
196 | | Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header, |
197 | 159k | int64_t first_entry_offset) { |
198 | 159k | DCHECK(!IsInitialized()) << "Can only call Init() once"661 ; |
199 | 18.4E | DCHECK(header.IsInitialized()) << "Log segment header must be initialized"; |
200 | | |
201 | 159k | RETURN_NOT_OK(ReadFileSize()); |
202 | | |
203 | 159k | header_.CopyFrom(header); |
204 | 159k | first_entry_offset_ = first_entry_offset; |
205 | 159k | is_initialized_ = true; |
206 | | |
207 | | // On a new segment, we don't expect any readable entries yet. |
208 | 159k | readable_to_offset_.Store(first_entry_offset); |
209 | | |
210 | 159k | return Status::OK(); |
211 | 159k | } |
212 | | |
213 | 10.5k | Result<bool> ReadableLogSegment::Init() { |
214 | 10.5k | DCHECK(!IsInitialized()) << "Can only call Init() once"12 ; |
215 | | |
216 | 10.5k | RETURN_NOT_OK(ReadFileSize()); |
217 | | |
218 | 10.5k | if (!VERIFY_RESULT(ReadHeader())) { |
219 | 1 | return false; |
220 | 1 | } |
221 | | |
222 | 10.5k | Status s = ReadFooter(); |
223 | 10.5k | if (!s.ok()) { |
224 | 3.74k | LOG(WARNING) << "Could not read footer for segment: " << path_ |
225 | 3.74k | << ": " << s.ToString(); |
226 | 3.74k | } |
227 | | |
228 | 10.5k | is_initialized_ = true; |
229 | | |
230 | 10.5k | readable_to_offset_.Store(file_size()); |
231 | | |
232 | 10.5k | return true; |
233 | 10.5k | } |
234 | | |
235 | 3.57M | int64_t ReadableLogSegment::readable_up_to() const { |
236 | 3.57M | return readable_to_offset_.Load(); |
237 | 3.57M | } |
238 | | |
239 | 25.1M | void ReadableLogSegment::UpdateReadableToOffset(int64_t readable_to_offset) { |
240 | 25.1M | readable_to_offset_.Store(readable_to_offset); |
241 | 25.1M | file_size_.StoreMax(readable_to_offset); |
242 | 25.1M | } |
243 | | |
244 | 3.75k | Status ReadableLogSegment::RebuildFooterByScanning() { |
245 | 3.75k | TRACE_EVENT1("log", "ReadableLogSegment::RebuildFooterByScanning", |
246 | 3.75k | "path", path_); |
247 | | |
248 | 3.75k | DCHECK(!footer_.IsInitialized()); |
249 | 3.75k | auto read_entries = ReadEntries(); |
250 | 3.75k | RETURN_NOT_OK(read_entries.status); |
251 | | |
252 | 3.75k | footer_.set_num_entries(read_entries.entries.size()); |
253 | | |
254 | 3.75k | uint64_t latest_ht = 0; |
255 | | // Rebuild the min/max replicate index (by scanning) |
256 | 710k | for (const auto& entry : read_entries.entries) { |
257 | 710k | if (entry->has_replicate()710k ) { |
258 | 710k | int64_t index = entry->replicate().id().index(); |
259 | | // TODO: common code with Log::UpdateFooterForBatch |
260 | 710k | if (!footer_.has_min_replicate_index() || |
261 | 710k | index < footer_.min_replicate_index()706k ) { |
262 | 3.82k | footer_.set_min_replicate_index(index); |
263 | 3.82k | } |
264 | 710k | if (!footer_.has_max_replicate_index() || |
265 | 710k | index > footer_.max_replicate_index()706k ) { |
266 | 701k | footer_.set_max_replicate_index(index); |
267 | 701k | } |
268 | 710k | latest_ht = std::max(latest_ht, entry->replicate().hybrid_time()); |
269 | 710k | } |
270 | 710k | } |
271 | | |
272 | 3.75k | DCHECK(footer_.IsInitialized()); |
273 | 3.75k | DCHECK_EQ(read_entries.entries.size(), footer_.num_entries()); |
274 | 3.75k | footer_was_rebuilt_ = true; |
275 | | |
276 | 3.75k | if (latest_ht > 0) { |
277 | 3.74k | footer_.set_close_timestamp_micros(yb::HybridTime(latest_ht).GetPhysicalValueMicros()); |
278 | 3.74k | } |
279 | | |
280 | 3.75k | readable_to_offset_.Store(read_entries.end_offset); |
281 | | |
282 | 3.75k | LOG(INFO) << "Successfully rebuilt footer for segment: " << path_ |
283 | 3.75k | << " (valid entries through byte offset " << read_entries.end_offset << ")"; |
284 | 3.75k | return Status::OK(); |
285 | 3.75k | } |
286 | | |
287 | 255k | Status ReadableLogSegment::ReadFileSize() { |
288 | | // Check the size of the file. |
289 | | // Env uses uint here, even though we generally prefer signed ints to avoid |
290 | | // underflow bugs. Use a local to convert. |
291 | 255k | uint64_t size = VERIFY_RESULT_PREPEND(readable_file_->Size(), "Unable to read file size"); |
292 | 0 | file_size_.Store(size); |
293 | 255k | if (size == 0) { |
294 | 3 | VLOG(1) << "Log segment file $0 is zero-length: " << path()0 ; |
295 | 3 | return Status::OK(); |
296 | 3 | } |
297 | 255k | return Status::OK(); |
298 | 255k | } |
299 | | |
300 | 10.5k | Result<bool> ReadableLogSegment::ReadHeader() { |
301 | 10.5k | uint32_t header_size; |
302 | 10.5k | RETURN_NOT_OK(ReadHeaderMagicAndHeaderLength(&header_size)); |
303 | 10.5k | if (header_size == 0) { |
304 | | // If a log file has been pre-allocated but not initialized, then |
305 | | // 'header_size' will be 0 even the file size is > 0; in this |
306 | | // case, 'is_initialized_' remains set to false and return |
307 | | // Status::OK() early. LogReader ignores segments where |
308 | | // IsInitialized() returns false. |
309 | 1 | return false; |
310 | 1 | } |
311 | | |
312 | 10.5k | if (header_size > kLogSegmentMaxHeaderOrFooterSize) { |
313 | 0 | return STATUS(Corruption, |
314 | 0 | Substitute("File is corrupted. " |
315 | 0 | "Parsed header size: $0 is zero or bigger than max header size: $1", |
316 | 0 | header_size, kLogSegmentMaxHeaderOrFooterSize)); |
317 | 0 | } |
318 | | |
319 | 10.5k | std::vector<uint8_t> header_space(header_size); |
320 | 10.5k | Slice header_slice; |
321 | 10.5k | LogSegmentHeaderPB header; |
322 | | |
323 | | // Read and parse the log segment header. |
324 | 10.5k | RETURN_NOT_OK_PREPEND(ReadFully(readable_file_.get(), kLogSegmentHeaderMagicAndHeaderLength, |
325 | 10.5k | header_size, &header_slice, header_space.data()), |
326 | 10.5k | "Unable to read fully"); |
327 | | |
328 | 10.5k | RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&header, |
329 | 10.5k | header_slice.data(), |
330 | 10.5k | header_size), |
331 | 10.5k | "Unable to parse protobuf"); |
332 | 10.5k | DCHECK(header.IsInitialized()) << "Log segment header must be initialized"13 ; |
333 | | |
334 | 10.5k | header_.CopyFrom(header); |
335 | 10.5k | first_entry_offset_ = header_size + kLogSegmentHeaderMagicAndHeaderLength; |
336 | | |
337 | 10.5k | return true; |
338 | 10.5k | } |
339 | | |
340 | | |
341 | 10.5k | Status ReadableLogSegment::ReadHeaderMagicAndHeaderLength(uint32_t *len) { |
342 | 10.5k | uint8_t scratch[kLogSegmentHeaderMagicAndHeaderLength]; |
343 | 10.5k | Slice slice; |
344 | 10.5k | RETURN_NOT_OK(ReadFully(readable_file_.get(), 0, kLogSegmentHeaderMagicAndHeaderLength, |
345 | 10.5k | &slice, scratch)); |
346 | 10.5k | RETURN_NOT_OK(ParseHeaderMagicAndHeaderLength(slice, len)); |
347 | 10.5k | return Status::OK(); |
348 | 10.5k | } |
349 | | |
350 | | namespace { |
351 | | |
352 | | // We don't run TSAN on this function because it makes it really slow and causes some |
353 | | // test timeouts. This is only used on local buffers anyway, so we don't lose much |
354 | | // by not checking it. |
355 | | ATTRIBUTE_NO_SANITIZE_THREAD |
356 | 26.3k | bool IsAllZeros(const Slice& s) { |
357 | | // Walk a pointer through the slice instead of using s[i] |
358 | | // since this is way faster in debug mode builds. We also do some |
359 | | // manual unrolling for the same purpose. |
360 | 26.3k | const uint8_t* p = s.data(); |
361 | 26.3k | size_t rem = s.size(); |
362 | | |
363 | 3.41G | while (rem >= 8) { |
364 | 3.41G | if (UNALIGNED_LOAD64(p) != 0) return false0 ; |
365 | 3.41G | rem -= 8; |
366 | 3.41G | p += 8; |
367 | 3.41G | } |
368 | | |
369 | 30.7k | while (26.3k rem > 0) { |
370 | 4.38k | if (*p++ != '\0') return false0 ; |
371 | 4.38k | rem--; |
372 | 4.38k | } |
373 | 26.3k | return true; |
374 | 26.3k | } |
375 | | } // anonymous namespace |
376 | | |
377 | | Status ReadableLogSegment::ParseHeaderMagicAndHeaderLength(const Slice &data, |
378 | 10.5k | uint32_t *parsed_len) { |
379 | 10.5k | RETURN_NOT_OK_PREPEND(data.check_size(kLogSegmentHeaderMagicAndHeaderLength), |
380 | 10.5k | "Log segment file is too small to contain initial magic number"); |
381 | | |
382 | 10.5k | if (memcmp(kLogSegmentHeaderMagicString, data.data(), |
383 | 10.5k | strlen(kLogSegmentHeaderMagicString)) != 0) { |
384 | | // As a special case, we check whether the file was allocated but no header |
385 | | // was written. We treat that case as an uninitialized file, much in the |
386 | | // same way we treat zero-length files. |
387 | | // Note: While the above comparison checks 8 bytes, this one checks the full 12 |
388 | | // to ensure we have a full 12 bytes of NULL data. |
389 | 1 | if (IsAllZeros(data)) { |
390 | | // 12 bytes of NULLs, good enough for us to consider this a file that |
391 | | // was never written to (but apparently preallocated). |
392 | 1 | LOG(WARNING) << "Log segment file " << path() << " has 12 initial NULL bytes instead of " |
393 | 1 | << "magic and header length: " << data.ToDebugString() |
394 | 1 | << " and will be treated as a blank segment."; |
395 | 1 | *parsed_len = 0; |
396 | 1 | return Status::OK(); |
397 | 1 | } |
398 | | // If no magic and not uninitialized, the file is considered corrupt. |
399 | 0 | return STATUS(Corruption, Substitute("Invalid log segment file $0: Bad magic. $1", |
400 | 1 | path(), data.ToDebugString())); |
401 | 1 | } |
402 | | |
403 | 10.5k | *parsed_len = DecodeFixed32(data.data() + strlen(kLogSegmentHeaderMagicString)); |
404 | 10.5k | return Status::OK(); |
405 | 10.5k | } |
406 | | |
407 | 10.5k | Status ReadableLogSegment::ReadFooter() { |
408 | 10.5k | uint32_t footer_size; |
409 | 10.5k | RETURN_NOT_OK(ReadFooterMagicAndFooterLength(&footer_size)); |
410 | | |
411 | 6.85k | if (footer_size == 0 || footer_size > kLogSegmentMaxHeaderOrFooterSize6.85k ) { |
412 | 0 | return STATUS(NotFound, |
413 | 0 | Substitute("File is corrupted. " |
414 | 0 | "Parsed header size: $0 is zero or bigger than max header size: $1", |
415 | 0 | footer_size, kLogSegmentMaxHeaderOrFooterSize)); |
416 | 0 | } |
417 | | |
418 | 6.85k | if (footer_size > (file_size() - first_entry_offset_)) { |
419 | 0 | return STATUS(NotFound, "Footer not found. File corrupted. " |
420 | 0 | "Decoded footer length pointed at a footer before the first entry."); |
421 | 0 | } |
422 | | |
423 | 6.85k | std::vector<uint8_t> footer_space(footer_size); |
424 | 6.85k | Slice footer_slice; |
425 | | |
426 | 6.85k | int64_t footer_offset = file_size() - kLogSegmentFooterMagicAndFooterLength - footer_size; |
427 | | |
428 | 6.85k | LogSegmentFooterPB footer; |
429 | | |
430 | | // Read and parse the log segment footer. |
431 | 6.85k | RETURN_NOT_OK_PREPEND(ReadFully(readable_file_.get(), footer_offset, |
432 | 6.85k | footer_size, &footer_slice, footer_space.data()), |
433 | 6.85k | "Footer not found. Could not read fully."); |
434 | | |
435 | 6.85k | RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&footer, |
436 | 6.85k | footer_slice.data(), |
437 | 6.85k | footer_size), |
438 | 6.85k | "Unable to parse protobuf"); |
439 | | |
440 | 6.85k | footer_.Swap(&footer); |
441 | 6.85k | return Status::OK(); |
442 | 6.85k | } |
443 | | |
444 | 10.5k | Status ReadableLogSegment::ReadFooterMagicAndFooterLength(uint32_t *len) { |
445 | 10.5k | uint8_t scratch[kLogSegmentFooterMagicAndFooterLength]; |
446 | 10.5k | Slice slice; |
447 | | |
448 | 10.5k | CHECK_GT(file_size(), kLogSegmentFooterMagicAndFooterLength); |
449 | 10.5k | RETURN_NOT_OK(ReadFully(readable_file_.get(), |
450 | 10.5k | file_size() - kLogSegmentFooterMagicAndFooterLength, |
451 | 10.5k | kLogSegmentFooterMagicAndFooterLength, |
452 | 10.5k | &slice, |
453 | 10.5k | scratch)); |
454 | | |
455 | 10.5k | RETURN_NOT_OK(ParseFooterMagicAndFooterLength(slice, len)); |
456 | 6.85k | return Status::OK(); |
457 | 10.5k | } |
458 | | |
459 | | Status ReadableLogSegment::ParseFooterMagicAndFooterLength(const Slice &data, |
460 | 10.5k | uint32_t *parsed_len) { |
461 | 10.5k | RETURN_NOT_OK_PREPEND(data.check_size(kLogSegmentFooterMagicAndFooterLength), |
462 | 10.5k | "Slice is too small to contain final magic number"); |
463 | | |
464 | 10.5k | if (memcmp(kLogSegmentFooterMagicString, data.data(), |
465 | 10.5k | strlen(kLogSegmentFooterMagicString)) != 0) { |
466 | 3.75k | return STATUS(NotFound, "Footer not found. Footer magic doesn't match"); |
467 | 3.75k | } |
468 | | |
469 | 6.84k | *parsed_len = DecodeFixed32(data.data() + strlen(kLogSegmentFooterMagicString)); |
470 | 6.84k | return Status::OK(); |
471 | 10.5k | } |
472 | | |
473 | 19.2k | ReadEntriesResult ReadableLogSegment::ReadEntries(int64_t max_entries_to_read) { |
474 | 19.2k | TRACE_EVENT1("log", "ReadableLogSegment::ReadEntries", |
475 | 19.2k | "path", path_); |
476 | | |
477 | 19.2k | ReadEntriesResult result; |
478 | | |
479 | 19.2k | std::vector<int64_t> recent_offsets(4, -1); |
480 | 19.2k | int64_t batches_read = 0; |
481 | | |
482 | 19.2k | int64_t offset = first_entry_offset(); |
483 | 19.2k | int64_t readable_to_offset = readable_to_offset_.Load(); |
484 | 19.2k | VLOG(1) << "Reading segment entries from " |
485 | 1 | << path_ << ": offset=" << offset << " file_size=" |
486 | 1 | << file_size() << " readable_to_offset=" << readable_to_offset; |
487 | 19.2k | faststring tmp_buf; |
488 | | |
489 | | // If we have a footer we only read up to it. If we don't we likely crashed |
490 | | // and always read to the end. |
491 | 19.2k | int64_t read_up_to = (footer_.IsInitialized() && !footer_was_rebuilt_15.5k ) ? |
492 | 7.99k | file_size() - footer_.ByteSize() - kLogSegmentFooterMagicAndFooterLength : |
493 | 19.2k | readable_to_offset11.2k ; |
494 | | |
495 | 19.2k | result.end_offset = offset; |
496 | | |
497 | 19.2k | int64_t num_entries_read = 0; |
498 | 3.54M | while (offset < read_up_to) { |
499 | 3.53M | const int64_t this_batch_offset = offset; |
500 | 3.53M | recent_offsets[batches_read++ % recent_offsets.size()] = offset; |
501 | | |
502 | 3.53M | LogEntryBatchPB current_batch; |
503 | | |
504 | | // Read and validate the entry header first. |
505 | 3.53M | Status s; |
506 | 3.53M | if (offset + implicit_cast<ssize_t>(kEntryHeaderSize) < read_up_to) { |
507 | 3.53M | s = ReadEntryHeaderAndBatch(&offset, &tmp_buf, ¤t_batch); |
508 | 3.53M | } else { |
509 | 1.91k | s = STATUS(Corruption, Substitute("Truncated log entry at offset $0", offset)); |
510 | 1.91k | } |
511 | | |
512 | 3.53M | if (PREDICT_FALSE(!s.ok())) { |
513 | 1.13k | if (!s.IsCorruption()) { |
514 | | // IO errors should always propagate back |
515 | 0 | result.status = s.CloneAndPrepend(Substitute("Error reading from log $0", path_)); |
516 | 0 | return result; |
517 | 0 | } |
518 | | |
519 | 1.13k | result.status = MakeCorruptionStatus( |
520 | 1.13k | batches_read, this_batch_offset, &recent_offsets, result.entries, s); |
521 | | |
522 | | // If we have a valid footer in the segment, then the segment was correctly |
523 | | // closed, and we shouldn't see any corruption anywhere (including the last |
524 | | // batch). |
525 | 1.13k | if (HasFooter() && !footer_was_rebuilt_3 ) { |
526 | 3 | LOG(WARNING) << "Found a corruption in a closed log segment: " << result.status; |
527 | 3 | return result; |
528 | 3 | } |
529 | | |
530 | | // If we read a corrupt entry, but we don't have a footer, then it's |
531 | | // possible that we crashed in the middle of writing an entry. |
532 | | // In this case, we scan forward to see if there are any more valid looking |
533 | | // entries after this one in the file. If there are, it's really a corruption. |
534 | | // if not, we just WARN it, since it's OK for the last entry to be partially |
535 | | // written. |
536 | 1.13k | bool has_valid_entries; |
537 | 1.13k | auto status = ScanForValidEntryHeaders(offset, &has_valid_entries); |
538 | 1.13k | if (!status.ok()) { |
539 | 0 | result.status = s.CloneAndPrepend("Scanning forward for valid entries"); |
540 | 0 | } |
541 | | |
542 | 1.13k | if (has_valid_entries) { |
543 | 0 | return result; |
544 | 0 | } |
545 | | |
546 | 1.13k | LOG(INFO) << "Ignoring log segment corruption in " << path_ << " because " |
547 | 1.13k | << "there are no log entries following the corrupted one. " |
548 | 1.13k | << "The server probably crashed in the middle of writing an entry " |
549 | 1.13k | << "to the write-ahead log or downloaded an active log via remote bootstrap. " |
550 | 1.13k | << "Error detail: " << result.status.ToString(); |
551 | 1.13k | break; |
552 | 1.13k | } |
553 | | |
554 | 3.53M | if (VLOG_IS_ON(3)) { |
555 | 0 | VLOG(3) << "Read Log entry batch: " << current_batch.DebugString(); |
556 | 0 | } |
557 | 3.53M | if (current_batch.has_committed_op_id()) { |
558 | 3.50M | result.committed_op_id = yb::OpId::FromPB(current_batch.committed_op_id()); |
559 | 3.50M | } |
560 | | |
561 | | // Number of entries to extract from the protobuf repeated field because the ownership of those |
562 | | // entries will be transferred to the caller. |
563 | 3.53M | int num_entries_to_extract = 0; |
564 | | |
565 | 5.65M | for (int i = 0; i < current_batch.entry_size(); ++i2.12M ) { |
566 | 2.12M | result.entries.emplace_back(current_batch.mutable_entry(i)); |
567 | 2.12M | DCHECK_NE(current_batch.mono_time(), 0); |
568 | 2.12M | LogEntryMetadata entry_metadata; |
569 | 2.12M | entry_metadata.offset = this_batch_offset; |
570 | 2.12M | entry_metadata.active_segment_sequence_number = header().sequence_number(); |
571 | 2.12M | entry_metadata.entry_time = RestartSafeCoarseTimePoint::FromUInt64(current_batch.mono_time()); |
572 | 2.12M | result.entry_metadata.emplace_back(std::move(entry_metadata)); |
573 | 2.12M | num_entries_read++; |
574 | 2.12M | num_entries_to_extract++; |
575 | 2.12M | if (num_entries_read >= max_entries_to_read) { |
576 | 7.60k | break; |
577 | 7.60k | } |
578 | 2.12M | } |
579 | 3.53M | current_batch.mutable_entry()->ExtractSubrange( |
580 | 3.53M | 0, num_entries_to_extract, /* elements */ nullptr); |
581 | 3.53M | result.end_offset = offset; |
582 | 3.53M | if (num_entries_read >= max_entries_to_read) { |
583 | 7.60k | result.status = Status::OK(); |
584 | 7.60k | return result; |
585 | 7.60k | } |
586 | 3.53M | } |
587 | | |
588 | 11.6k | if (footer_.IsInitialized() && footer_.num_entries() != num_entries_read7.89k ) { |
589 | 0 | result.status = STATUS_FORMAT( |
590 | 0 | Corruption, |
591 | 0 | "Read $0 log entries from $1, but expected $2 based on the footer", |
592 | 0 | num_entries_read, path_, footer_.num_entries()); |
593 | 0 | } |
594 | | |
595 | 11.6k | result.status = Status::OK(); |
596 | 11.6k | return result; |
597 | 19.2k | } |
598 | | |
599 | 7.64k | Result<FirstEntryMetadata> ReadableLogSegment::ReadFirstEntryMetadata() { |
600 | 7.64k | auto read_result = ReadEntries(/* max_entries_to_read */ 1); |
601 | 7.64k | const auto& entries = read_result.entries; |
602 | 7.64k | const auto& entry_metadata_records = read_result.entry_metadata; |
603 | 7.64k | if (entries.empty()) { |
604 | 35 | return STATUS(NotFound, "No entries found"); |
605 | 35 | } |
606 | 7.61k | if (entry_metadata_records.empty()) { |
607 | 0 | return STATUS(NotFound, "No entry metadata found"); |
608 | 0 | } |
609 | 7.61k | auto& first_entry = *entries.front(); |
610 | 7.61k | if (!first_entry.has_replicate()) { |
611 | 0 | return STATUS(NotFound, "No REPLICATE message found in the first entry"); |
612 | 0 | } |
613 | | |
614 | 7.61k | return FirstEntryMetadata { |
615 | 7.61k | .op_id = OpId::FromPB(first_entry.replicate().id()), |
616 | 7.61k | .entry_time = entry_metadata_records.front().entry_time |
617 | 7.61k | }; |
618 | 7.61k | } |
619 | | |
620 | 1.12k | Status ReadableLogSegment::ScanForValidEntryHeaders(int64_t offset, bool* has_valid_entries) { |
621 | 1.12k | TRACE_EVENT1("log", "ReadableLogSegment::ScanForValidEntryHeaders", |
622 | 1.12k | "path", path_); |
623 | 1.12k | LOG(INFO) << "Scanning " << path_ << " for valid entry headers " |
624 | 1.12k | << "following offset " << offset << "..."; |
625 | 1.12k | *has_valid_entries = false; |
626 | | |
627 | 1.12k | const int kChunkSize = 1024 * 1024; |
628 | 1.12k | std::unique_ptr<uint8_t[]> buf(new uint8_t[kChunkSize]); |
629 | | |
630 | | // We overlap the reads by the size of the header, so that if a header |
631 | | // spans chunks, we don't miss it. |
632 | 1.12k | for (; |
633 | 27.4k | offset < implicit_cast<int64_t>(file_size() - kEntryHeaderSize); |
634 | 26.3k | offset += kChunkSize - kEntryHeaderSize) { |
635 | 26.3k | auto rem = std::min<int64_t>(file_size() - offset, kChunkSize); |
636 | 26.3k | Slice chunk; |
637 | | // If encryption is enabled, need to use checkpoint file to read pre-allocated file since |
638 | | // we want to preserve all 0s. |
639 | 26.3k | RETURN_NOT_OK(ReadFully( |
640 | 26.3k | readable_file_checkpoint().get(), offset + readable_file()->GetEncryptionHeaderSize(), rem, |
641 | 26.3k | &chunk, &buf[0])); |
642 | | |
643 | | // Optimization for the case where a chunk is all zeros -- this is common in the |
644 | | // case of pre-allocated files. This avoids a lot of redundant CRC calculation. |
645 | 26.3k | if (IsAllZeros(chunk)) { |
646 | 26.3k | continue; |
647 | 26.3k | } |
648 | | |
649 | 0 | if (readable_file()->IsEncrypted()) { |
650 | | // If encryption enabled, decrypt the contents of the file. |
651 | 0 | RETURN_NOT_OK(ReadFully(readable_file().get(), offset, rem, &chunk, &buf[0])); |
652 | 0 | } |
653 | | |
654 | | // Check if this chunk has a valid entry header. |
655 | 0 | for (size_t off_in_chunk = 0; |
656 | 0 | off_in_chunk < chunk.size() - kEntryHeaderSize; |
657 | 0 | off_in_chunk++) { |
658 | 0 | const Slice potential_header = Slice(chunk.data() + off_in_chunk, kEntryHeaderSize); |
659 | |
|
660 | 0 | EntryHeader header; |
661 | 0 | if (DecodeEntryHeader(potential_header, &header).ok()) { |
662 | 0 | LOG(INFO) << "Found a valid entry header at offset " << (offset + off_in_chunk); |
663 | 0 | *has_valid_entries = true; |
664 | 0 | return Status::OK(); |
665 | 0 | } |
666 | 0 | } |
667 | 0 | } |
668 | | |
669 | 1.12k | LOG(INFO) << "Found no log entry headers"; |
670 | 1.12k | return Status::OK(); |
671 | 1.12k | } |
672 | | |
673 | | Status ReadableLogSegment::MakeCorruptionStatus( |
674 | | size_t batch_number, |
675 | | int64_t batch_offset, |
676 | | std::vector<int64_t>* recent_offsets, |
677 | | const std::vector<std::unique_ptr<LogEntryPB>>& entries, |
678 | 1.13k | const Status& status) const { |
679 | | |
680 | 1.13k | string err = "Log file corruption detected. "; |
681 | 1.13k | SubstituteAndAppend(&err, "Failed trying to read batch #$0 at offset $1 for log segment $2: ", |
682 | 1.13k | batch_number, batch_offset, path_); |
683 | 1.13k | err.append("Prior batch offsets:"); |
684 | 1.13k | std::sort(recent_offsets->begin(), recent_offsets->end()); |
685 | 4.52k | for (int64_t offset : *recent_offsets) { |
686 | 4.52k | if (offset >= 0) { |
687 | 4.40k | SubstituteAndAppend(&err, " $0", offset); |
688 | 4.40k | } |
689 | 4.52k | } |
690 | 1.13k | if (!entries.empty()) { |
691 | 1.12k | err.append("; Last log entries read:"); |
692 | 1.12k | const int kNumEntries = 4; // Include up to the last 4 entries in the segment. |
693 | 1.12k | for (size_t i = std::max(0, static_cast<int>(entries.size()) - kNumEntries); |
694 | 4.83k | i < entries.size(); i++3.71k ) { |
695 | 3.71k | LogEntryPB* entry = entries[i].get(); |
696 | 3.71k | LogEntryTypePB type = entry->type(); |
697 | 3.71k | string opid_str; |
698 | 3.71k | if (type == log::REPLICATE && entry->has_replicate()3.70k ) { |
699 | 3.70k | opid_str = consensus::OpIdToString(entry->replicate().id()); |
700 | 3.70k | } else { |
701 | 2 | opid_str = "<unknown>"; |
702 | 2 | } |
703 | 3.71k | SubstituteAndAppend(&err, " [$0 ($1)]", LogEntryTypePB_Name(type), opid_str); |
704 | 3.71k | } |
705 | 1.12k | } |
706 | | |
707 | 1.13k | return status.CloneAndAppend(err); |
708 | 1.13k | } |
709 | | |
710 | | Status ReadableLogSegment::ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf, |
711 | 3.56M | LogEntryBatchPB* batch) { |
712 | 3.56M | EntryHeader header; |
713 | 3.56M | RETURN_NOT_OK(ReadEntryHeader(offset, &header)); |
714 | 3.56M | RETURN_NOT_OK(ReadEntryBatch(offset, header, tmp_buf, batch)); |
715 | 3.56M | return Status::OK(); |
716 | 3.56M | } |
717 | | |
718 | | |
719 | 3.56M | Status ReadableLogSegment::ReadEntryHeader(int64_t *offset, EntryHeader* header) { |
720 | 3.56M | uint8_t scratch[kEntryHeaderSize]; |
721 | 3.56M | Slice slice; |
722 | 3.56M | RETURN_NOT_OK_PREPEND(ReadFully(readable_file().get(), *offset, kEntryHeaderSize, |
723 | 3.56M | &slice, scratch), |
724 | 3.56M | "Could not read log entry header"); |
725 | | |
726 | 3.56M | RETURN_NOT_OK(DecodeEntryHeader(slice, header)); |
727 | 3.56M | *offset += slice.size(); |
728 | 3.56M | return Status::OK(); |
729 | 3.56M | } |
730 | | |
731 | 3.56M | Status ReadableLogSegment::DecodeEntryHeader(const Slice& data, EntryHeader* header) { |
732 | 3.56M | DCHECK_EQ(kEntryHeaderSize, data.size()); |
733 | 3.56M | header->msg_length = DecodeFixed32(data.data()); |
734 | 3.56M | header->msg_crc = DecodeFixed32(data.data() + 4); |
735 | 3.56M | header->header_crc = DecodeFixed32(data.data() + 8); |
736 | | |
737 | | // Verify the header. |
738 | 3.56M | uint32_t computed_crc = crc::Crc32c(data.data(), 8); |
739 | 3.56M | if (computed_crc != header->header_crc) { |
740 | 1.13k | return STATUS_FORMAT( |
741 | 1.13k | Corruption, "Invalid checksum in log entry head header: found=$0, computed=$1", |
742 | 1.13k | header->header_crc, computed_crc); |
743 | 1.13k | } |
744 | 3.56M | return Status::OK(); |
745 | 3.56M | } |
746 | | |
747 | | |
748 | | Status ReadableLogSegment::ReadEntryBatch(int64_t *offset, |
749 | | const EntryHeader& header, |
750 | | faststring *tmp_buf, |
751 | 3.56M | LogEntryBatchPB* entry_batch) { |
752 | 3.56M | TRACE_EVENT2("log", "ReadableLogSegment::ReadEntryBatch", |
753 | 3.56M | "path", path_, |
754 | 3.56M | "range", Substitute("offset=$0 entry_len=$1", |
755 | 3.56M | *offset, header.msg_length)); |
756 | | |
757 | 3.56M | if (header.msg_length == 0) { |
758 | 0 | return STATUS(Corruption, "Invalid 0 entry length"); |
759 | 0 | } |
760 | 3.56M | int64_t limit = readable_up_to(); |
761 | 3.56M | if (PREDICT_FALSE(header.msg_length + *offset > limit)) { |
762 | | // The log was likely truncated during writing. |
763 | 1 | return STATUS(Corruption, |
764 | 1 | Substitute("Could not read $0-byte log entry from offset $1 in $2: " |
765 | 1 | "log only readable up to offset $3", |
766 | 1 | header.msg_length, *offset, path_, limit)); |
767 | 1 | } |
768 | | |
769 | 3.56M | tmp_buf->clear(); |
770 | 3.56M | tmp_buf->resize(header.msg_length); |
771 | 3.56M | Slice entry_batch_slice; |
772 | | |
773 | 3.56M | Status s = readable_file()->Read(*offset, |
774 | 3.56M | header.msg_length, |
775 | 3.56M | &entry_batch_slice, |
776 | 3.56M | tmp_buf->data()); |
777 | | |
778 | 3.56M | if (!s.ok()) return 0 STATUS0 (IOError, Substitute("Could not read entry. Cause: $0", |
779 | 3.56M | s.ToString())); |
780 | | |
781 | | // Verify the CRC. |
782 | 3.56M | uint32_t read_crc = crc::Crc32c(entry_batch_slice.data(), entry_batch_slice.size()); |
783 | 3.56M | if (PREDICT_FALSE(read_crc != header.msg_crc)) { |
784 | 2 | return STATUS(Corruption, Substitute("Entry CRC mismatch in byte range $0-$1: " |
785 | 2 | "expected CRC=$2, computed=$3", |
786 | 2 | *offset, *offset + header.msg_length, |
787 | 2 | header.msg_crc, read_crc)); |
788 | 2 | } |
789 | | |
790 | | |
791 | 3.56M | LogEntryBatchPB read_entry_batch; |
792 | 3.56M | s = pb_util::ParseFromArray(&read_entry_batch, |
793 | 3.56M | entry_batch_slice.data(), |
794 | 3.56M | header.msg_length); |
795 | | |
796 | 3.56M | if (!s.ok()) return 0 STATUS0 (Corruption, Substitute("Could parse PB. Cause: $0", |
797 | 3.56M | s.ToString())); |
798 | | |
799 | 3.56M | *offset += entry_batch_slice.size(); |
800 | 3.56M | entry_batch->Swap(&read_entry_batch); |
801 | 3.56M | return Status::OK(); |
802 | 3.56M | } |
803 | | |
// Returns the segment header. Must only be called once the header has been
// populated (the DCHECK enforces this in debug builds).
const LogSegmentHeaderPB& ReadableLogSegment::header() const {
  DCHECK(header_.IsInitialized());
  return header_;
}
808 | | |
// Constructs a not-yet-open writable segment. WriteHeaderAndOpen() must be
// called before any entries are appended; until then no bytes are counted as
// written (written_offset_ starts at 0).
WritableLogSegment::WritableLogSegment(string path,
                                       shared_ptr<WritableFile> writable_file)
    : path_(std::move(path)),
      writable_file_(std::move(writable_file)),
      is_header_written_(false),
      is_footer_written_(false),
      written_offset_(0) {}
816 | | |
817 | 160k | Status WritableLogSegment::WriteHeaderAndOpen(const LogSegmentHeaderPB& new_header) { |
818 | 18.4E | DCHECK(!IsHeaderWritten()) << "Can only call WriteHeader() once"; |
819 | 18.4E | DCHECK(new_header.IsInitialized()) |
820 | 18.4E | << "Log segment header must be initialized" << new_header.InitializationErrorString(); |
821 | 160k | faststring buf; |
822 | | |
823 | | // First the magic. |
824 | 160k | buf.append(kLogSegmentHeaderMagicString); |
825 | | // Then Length-prefixed header. |
826 | 160k | PutFixed32(&buf, new_header.ByteSize()); |
827 | | // Then Serialize the PB. |
828 | 160k | pb_util::AppendToString(new_header, &buf); |
829 | 160k | RETURN_NOT_OK(writable_file()->Append(Slice(buf))); |
830 | | |
831 | 160k | header_.CopyFrom(new_header); |
832 | 160k | first_entry_offset_ = buf.size(); |
833 | 160k | written_offset_ = first_entry_offset_; |
834 | 160k | is_header_written_ = true; |
835 | | |
836 | 160k | return Status::OK(); |
837 | 160k | } |
838 | | |
// Serializes and appends the segment footer, then closes the underlying file.
// Footer layout at the segment tail: serialized LogSegmentFooterPB, footer
// magic string, then a fixed32 footer length — in that order so a reader can
// locate the footer by scanning backwards from the end of the file.
Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer) {
  TRACE_EVENT1("log", "WritableLogSegment::WriteFooterAndClose",
               "path", path_);
  DCHECK(IsHeaderWritten());
  DCHECK(!IsFooterWritten());
  DCHECK(footer.IsInitialized()) << footer.InitializationErrorString();

  faststring buf;

  pb_util::AppendToString(footer, &buf);

  buf.append(kLogSegmentFooterMagicString);
  PutFixed32(&buf, footer.ByteSize());

  RETURN_NOT_OK_PREPEND(writable_file()->Append(Slice(buf)), "Could not write the footer");

  footer_.CopyFrom(footer);
  is_footer_written_ = true;

  RETURN_NOT_OK(writable_file_->Close());

  // NOTE: written_offset_ is only advanced after a successful Close(), so an
  // early return above leaves the offset reflecting the pre-footer position.
  written_offset_ += buf.size();

  return Status::OK();
}
864 | | |
865 | | |
866 | 25.0M | Status WritableLogSegment::WriteEntryBatch(const Slice& data) { |
867 | 25.0M | DCHECK(is_header_written_); |
868 | 25.0M | DCHECK(!is_footer_written_); |
869 | 25.0M | uint8_t header_buf[kEntryHeaderSize]; |
870 | | |
871 | | // First encode the length of the message. |
872 | 25.0M | auto len = data.size(); |
873 | 25.0M | InlineEncodeFixed32(&header_buf[0], narrow_cast<uint32_t>(len)); |
874 | | |
875 | | // Then the CRC of the message. |
876 | 25.0M | uint32_t msg_crc = crc::Crc32c(data.data(), data.size()); |
877 | 25.0M | InlineEncodeFixed32(&header_buf[4], msg_crc); |
878 | | |
879 | | // Then the CRC of the header |
880 | 25.0M | uint32_t header_crc = crc::Crc32c(&header_buf, 8); |
881 | 25.0M | InlineEncodeFixed32(&header_buf[8], header_crc); |
882 | | |
883 | 25.0M | std::array<Slice, 2> slices = { |
884 | 25.0M | Slice(header_buf, sizeof(header_buf)), |
885 | 25.0M | Slice(data), |
886 | 25.0M | }; |
887 | | |
888 | | // Write the header to the file, followed by the batch data itself. |
889 | 25.0M | RETURN_NOT_OK(writable_file_->AppendSlices(slices.data(), slices.size())); |
890 | 25.0M | written_offset_ += sizeof(header_buf) + data.size(); |
891 | | |
892 | 25.0M | return Status::OK(); |
893 | 25.0M | } |
894 | | |
// Flushes buffered segment data to durable storage via the underlying file.
Status WritableLogSegment::Sync() {
  return writable_file_->Sync();
}
898 | | |
// Creates a LogEntryBatchPB from pre-allocated ReplicateMsgs managed using shared pointers. The
// caller has to ensure these messages are not deleted twice, both by LogEntryBatchPB and by
// the shared pointers.
LogEntryBatchPB CreateBatchFromAllocatedOperations(const ReplicateMsgs& msgs) {
  LogEntryBatchPB result;
  // Stamp the batch with a restart-safe monotonic time for later bookkeeping.
  result.set_mono_time(RestartSafeCoarseMonoClock().Now().ToUInt64());
  // Reserve up front so add_entry() below never reallocates the repeated field.
  result.mutable_entry()->Reserve(narrow_cast<int>(msgs.size()));
  for (const auto& msg_ptr : msgs) {
    LogEntryPB* entry_pb = result.add_entry();
    entry_pb->set_type(log::REPLICATE);
    // entry_pb does not actually own the ReplicateMsg object, even though it thinks it does,
    // because we release it in ~LogEntryBatch. LogEntryBatchPB has a separate vector of shared
    // pointers to messages.
    entry_pb->set_allocated_replicate(msg_ptr.get());
  }
  return result;
}
916 | | |
917 | 324k | bool IsLogFileName(const string& fname) { |
918 | 324k | if (HasPrefixString(fname, ".")) { |
919 | | // Hidden file or ./.. |
920 | 302k | VLOG(1) << "Ignoring hidden file: " << fname98 ; |
921 | 302k | return false; |
922 | 302k | } |
923 | | |
924 | 22.5k | if (HasSuffixString(fname, kTmpSuffix)) { |
925 | 0 | LOG(WARNING) << "Ignoring tmp file: " << fname; |
926 | 0 | return false; |
927 | 0 | } |
928 | | |
929 | 22.5k | vector<string> v = strings::Split(fname, "-"); |
930 | 22.5k | if (v.size() != 2 || v[0] != FsManager::kWalFileNamePrefix20.9k ) { |
931 | 18.4E | VLOG(1) << "Not a log file: " << fname; |
932 | 1.60k | return false; |
933 | 1.60k | } |
934 | | |
935 | 20.9k | return true; |
936 | 22.5k | } |
937 | | |
// Splits a comma-separated directory-list flag value into individual paths,
// dropping empty components. NOTE: 'flag_name' is currently unused; it is
// kept so call sites document which flag the value came from (and removing it
// would change the public signature).
std::vector<std::string> ParseDirFlags(string flag_dirs, string flag_name) {
  std::vector<std::string> paths = strings::Split(flag_dirs, ",", strings::SkipEmpty());
  return paths;
}
942 | | |
943 | 177 | Status CheckPathsAreODirectWritable(const std::vector<std::string> &paths) { |
944 | 177 | Env *def_env = Env::Default(); |
945 | 177 | for (const auto &path : paths) { |
946 | 163 | RETURN_NOT_OK(CheckODirectTempFileCreationInDir(def_env, path)); |
947 | 163 | } |
948 | 177 | return Status::OK(); |
949 | 177 | } |
950 | | |
// Checks O_DIRECT writability of every directory named by the log, data, and
// WAL directory flags. FLAGS_log_dir is skipped entirely when empty; the
// other two flags are always parsed (ParseDirFlags drops empty components,
// so an empty flag simply yields no paths to check).
Status CheckRelevantPathsAreODirectWritable() {
  if (!FLAGS_log_dir.empty()) {
    RETURN_NOT_OK_PREPEND(CheckPathsAreODirectWritable(ParseDirFlags(
        FLAGS_log_dir, "--log_dir")), "Not all log_dirs are O_DIRECT Writable.");
  }
  RETURN_NOT_OK_PREPEND(CheckPathsAreODirectWritable(ParseDirFlags(
      FLAGS_fs_data_dirs, "--data_dirs")), "Not all fs_data_dirs are O_DIRECT Writable.");

  RETURN_NOT_OK_PREPEND(CheckPathsAreODirectWritable(ParseDirFlags(
      FLAGS_fs_wal_dirs, "--wal_dirs")), "Not all fs_wal_dirs are O_DIRECT Writable.");
  return Status::OK();
}
963 | | |
964 | 14.6k | Status ModifyDurableWriteFlagIfNotODirect() { |
965 | 14.6k | if (FLAGS_durable_wal_write) { |
966 | 63 | Status s = CheckRelevantPathsAreODirectWritable(); |
967 | 63 | if (!s.ok()) { |
968 | 0 | if (FLAGS_require_durable_wal_write) { |
969 | | // Crash with appropriate error. |
970 | 0 | RETURN_NOT_OK_PREPEND(s, "require_durable_wal_write is set true, but O_DIRECT is " |
971 | 0 | "not allowed.") |
972 | 0 | } else { |
973 | | // Report error but do not crash. |
974 | 0 | LOG(ERROR) << "O_DIRECT is not allowed in some of the directories. " |
975 | 0 | "Setting durable wal write flag to false."; |
976 | 0 | FLAGS_durable_wal_write = false; |
977 | 0 | } |
978 | 0 | } |
979 | 63 | } |
980 | 14.6k | return Status::OK(); |
981 | 14.6k | } |
982 | | |
983 | | } // namespace log |
984 | | } // namespace yb |