/Users/deen/code/yugabyte-db/src/yb/consensus/log_reader.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/consensus/log_reader.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <mutex> |
37 | | |
38 | | #include <glog/logging.h> |
39 | | |
40 | | #include "yb/consensus/consensus_util.h" |
41 | | #include "yb/consensus/log_index.h" |
42 | | #include "yb/consensus/log_util.h" |
43 | | |
44 | | #include "yb/gutil/dynamic_annotations.h" |
45 | | |
46 | | #include "yb/util/env_util.h" |
47 | | #include "yb/util/flag_tags.h" |
48 | | #include "yb/util/logging.h" |
49 | | #include "yb/util/metrics.h" |
50 | | #include "yb/util/monotime.h" |
51 | | #include "yb/util/path_util.h" |
52 | | #include "yb/util/result.h" |
53 | | |
54 | | DEFINE_bool(enable_log_retention_by_op_idx, true, |
55 | | "If true, logs will be retained based on an op id passed by the cdc service"); |
56 | | |
57 | | DEFINE_int32(log_max_seconds_to_retain, 24 * 3600, "Log files that are older than this " |
58 | | "number of seconds will be deleted even if they contain cdc unreplicated entries. If 0, " |
59 | | "this flag will be ignored. This flag is also ignored if a log segment contains entries " |
60 | | "that haven't been flushed to RocksDB."); |
61 | | |
62 | | DEFINE_int64(log_stop_retaining_min_disk_mb, 100 * 1024, "Stop retaining logs if the space " |
63 | | "available for the logs falls below this limit. This flag is ignored if a log segment " |
64 | | "contains unflushed entries."); |
65 | | |
66 | | METRIC_DEFINE_counter(tablet, log_reader_bytes_read, "Bytes Read From Log", |
67 | | yb::MetricUnit::kBytes, |
68 | | "Data read from the WAL since tablet start"); |
69 | | |
70 | | METRIC_DEFINE_counter(tablet, log_reader_entries_read, "Entries Read From Log", |
71 | | yb::MetricUnit::kEntries, |
72 | | "Number of entries read from the WAL since tablet start"); |
73 | | |
74 | | METRIC_DEFINE_coarse_histogram(table, log_reader_read_batch_latency, "Log Read Latency", |
75 | | yb::MetricUnit::kMicroseconds, |
76 | | "Microseconds spent reading log entry batches"); |
77 | | |
78 | | DEFINE_test_flag(bool, record_segments_violate_max_time_policy, false, |
79 | | "If set, everytime GetSegmentPrefixNotIncluding runs, segments that violate the max time " |
80 | | "policy will be appended to LogReader::segments_violate_max_time_policy_."); |
81 | | |
82 | | DEFINE_test_flag(bool, record_segments_violate_min_space_policy, false, |
83 | | "If set, everytime GetSegmentPrefixNotIncluding runs, segments that violate the max time " |
84 | | "policy will be appended to LogReader::segments_violate_min_space_policy_."); |
85 | | |
86 | | DEFINE_bool(get_changes_honor_deadline, true, |
87 | | "Toggle whether to honor the deadline passed to log reader"); |
88 | | |
89 | | DEFINE_test_flag(int32, get_changes_read_loop_delay_ms, 0, |
90 | | "Amount of time to sleep for between each iteration of the loop in " |
91 | | "ReadReplicatesInRange. This is used to test the return of partial results."); |
92 | | |
93 | | namespace yb { |
94 | | namespace log { |
95 | | |
96 | | namespace { |
97 | | struct LogSegmentSeqnoComparator { |
98 | | bool operator() (const scoped_refptr<ReadableLogSegment>& a, |
99 | 27.7k | const scoped_refptr<ReadableLogSegment>& b) { |
100 | 27.7k | return a->header().sequence_number() < b->header().sequence_number(); |
101 | 27.7k | } |
102 | | }; |
103 | | } |
104 | | |
105 | | using consensus::ReplicateMsg; |
106 | | using env_util::ReadFully; |
107 | | using strings::Substitute; |
108 | | |
109 | | const int64_t LogReader::kNoSizeLimit = -1; |
110 | | |
111 | | Status LogReader::Open(Env *env, |
112 | | const scoped_refptr<LogIndex>& index, |
113 | | std::string log_prefix, |
114 | | const std::string& tablet_wal_path, |
115 | | const scoped_refptr<MetricEntity>& table_metric_entity, |
116 | | const scoped_refptr<MetricEntity>& tablet_metric_entity, |
117 | 151k | std::unique_ptr<LogReader> *reader) { |
118 | 151k | std::unique_ptr<LogReader> log_reader(new LogReader( |
119 | 151k | env, index, std::move(log_prefix), table_metric_entity, tablet_metric_entity)); |
120 | | |
121 | 151k | RETURN_NOT_OK(log_reader->Init(tablet_wal_path)); |
122 | 151k | *reader = std::move(log_reader); |
123 | 151k | return Status::OK(); |
124 | 151k | } |
125 | | |
126 | | LogReader::LogReader(Env* env, |
127 | | const scoped_refptr<LogIndex>& index, |
128 | | std::string log_prefix, |
129 | | const scoped_refptr<MetricEntity>& table_metric_entity, |
130 | | const scoped_refptr<MetricEntity>& tablet_metric_entity) |
131 | | : env_(env), |
132 | | log_index_(index), |
133 | | log_prefix_(std::move(log_prefix)), |
134 | 151k | state_(kLogReaderInitialized) { |
135 | 151k | if (table_metric_entity) { |
136 | 150k | read_batch_latency_ = METRIC_log_reader_read_batch_latency.Instantiate(table_metric_entity); |
137 | 150k | } |
138 | 151k | if (tablet_metric_entity) { |
139 | 150k | bytes_read_ = METRIC_log_reader_bytes_read.Instantiate(tablet_metric_entity); |
140 | 150k | entries_read_ = METRIC_log_reader_entries_read.Instantiate(tablet_metric_entity); |
141 | 150k | } |
142 | 151k | if (PREDICT_FALSE(FLAGS_enable_log_retention_by_op_idx && |
143 | 151k | (FLAGS_TEST_record_segments_violate_max_time_policy || |
144 | 151k | FLAGS_TEST_record_segments_violate_min_space_policy))) { |
145 | 2 | segments_violate_max_time_policy_ = std::make_unique<SegmentSequence>(); |
146 | 2 | segments_violate_min_space_policy_ = std::make_unique<SegmentSequence>(); |
147 | 2 | } |
148 | 151k | } |
149 | | |
150 | 76.3k | LogReader::~LogReader() { |
151 | 76.3k | } |
152 | | |
153 | 151k | Status LogReader::Init(const string& tablet_wal_path) { |
154 | 151k | { |
155 | 151k | std::lock_guard<simple_spinlock> lock(lock_); |
156 | 151k | CHECK_EQ(state_, kLogReaderInitialized) << "bad state for Init(): " << state_; |
157 | 151k | } |
158 | 18.4E | VLOG_WITH_PREFIX(1) << "Reading wal from path: " << tablet_wal_path; |
159 | | |
160 | 151k | if (!env_->FileExists(tablet_wal_path)) { |
161 | 0 | return STATUS(IllegalState, "Cannot find wal location at", tablet_wal_path); |
162 | 0 | } |
163 | | |
164 | 18.4E | VLOG_WITH_PREFIX(1) << "Parsing segments from path: " << tablet_wal_path; |
165 | | |
166 | 151k | std::vector<string> files_from_log_directory; |
167 | 151k | RETURN_NOT_OK_PREPEND(env_->GetChildren(tablet_wal_path, &files_from_log_directory), |
168 | 151k | "Unable to read children from path"); |
169 | | |
170 | 151k | SegmentSequence read_segments; |
171 | | |
172 | | // Build a log segment from log files, ignoring non log files. |
173 | 313k | for (const string &potential_log_file : files_from_log_directory) { |
174 | 313k | if (!IsLogFileName(potential_log_file)) { |
175 | 302k | continue; |
176 | 302k | } |
177 | | |
178 | 10.6k | string fqp = JoinPathSegments(tablet_wal_path, potential_log_file); |
179 | 10.6k | auto segment = VERIFY_RESULT_PREPEND(ReadableLogSegment::Open(env_, fqp), |
180 | 10.6k | Format("Unable to open readable log segment: $0", fqp)); |
181 | 10.6k | if (!segment) { |
182 | 1 | LOG_WITH_PREFIX(INFO) << "Log segment w/o header: " << fqp << ", skipping"; |
183 | 1 | continue; |
184 | 1 | } |
185 | 10.6k | CHECK(segment->IsInitialized()) << "Uninitialized segment at: " << segment->path(); |
186 | | |
187 | 10.6k | if (!segment->HasFooter()) { |
188 | 3.75k | LOG_WITH_PREFIX(WARNING) |
189 | 3.75k | << "Log segment " << fqp << " was likely left in-progress " |
190 | 3.75k | "after a previous crash. Will try to rebuild footer by scanning data."; |
191 | 3.75k | RETURN_NOT_OK(segment->RebuildFooterByScanning()); |
192 | 3.75k | } |
193 | | |
194 | 10.6k | read_segments.push_back(segment); |
195 | 10.6k | } |
196 | | |
197 | | // Sort the segments by sequence number. |
198 | 151k | std::sort(read_segments.begin(), read_segments.end(), LogSegmentSeqnoComparator()); |
199 | | |
200 | 151k | { |
201 | 151k | std::lock_guard<simple_spinlock> lock(lock_); |
202 | | |
203 | 151k | string previous_seg_path; |
204 | 151k | int64_t previous_seg_seqno = -1; |
205 | 151k | for (const SegmentSequence::value_type& entry : read_segments) { |
206 | 18.4E | VLOG_WITH_PREFIX(1) << " Log Reader Indexed: " << entry->footer().ShortDebugString(); |
207 | | // Check that the log segments are in sequence. |
208 | 10.5k | if (previous_seg_seqno != -1 && |
209 | 10.5k | entry->header().sequence_number() != implicit_cast<size_t>(previous_seg_seqno) + 1) { |
210 | 0 | return STATUS(Corruption, Substitute("Segment sequence numbers are not consecutive. " |
211 | 0 | "Previous segment: seqno $0, path $1; Current segment: seqno $2, path $3", |
212 | 0 | previous_seg_seqno, previous_seg_path, |
213 | 0 | entry->header().sequence_number(), entry->path())); |
214 | 0 | previous_seg_seqno++; |
215 | 10.5k | } else { |
216 | 10.5k | previous_seg_seqno = entry->header().sequence_number(); |
217 | 10.5k | } |
218 | 10.5k | previous_seg_path = entry->path(); |
219 | 10.5k | RETURN_NOT_OK(AppendSegmentUnlocked(entry)); |
220 | 10.5k | } |
221 | | |
222 | 151k | state_ = kLogReaderReading; |
223 | 151k | } |
224 | 0 | return Status::OK(); |
225 | 151k | } |
226 | | |
227 | 1 | Status LogReader::InitEmptyReaderForTests() { |
228 | 1 | std::lock_guard<simple_spinlock> lock(lock_); |
229 | 1 | state_ = kLogReaderReading; |
230 | 1 | return Status::OK(); |
231 | 1 | } |
232 | | |
233 | 0 | bool LogReader::ViolatesMaxTimePolicy(const scoped_refptr<ReadableLogSegment>& segment) const { |
234 | 0 | if (FLAGS_log_max_seconds_to_retain <= 0) { |
235 | 0 | return false; |
236 | 0 | } |
237 | | |
238 | 0 | if (!segment->HasFooter()) { |
239 | 0 | return false; |
240 | 0 | } |
241 | | |
242 | 0 | int64_t now = GetCurrentTimeMicros(); |
243 | 0 | int64_t age_seconds = (now - segment->footer().close_timestamp_micros()) / 1000000; |
244 | 0 | if (age_seconds > FLAGS_log_max_seconds_to_retain) { |
245 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 300) |
246 | 0 | << "Segment " << segment->path() << " violates max retention time policy. " |
247 | 0 | << "Segment age: " << age_seconds << " seconds. " |
248 | 0 | << "log_max_seconds_to_retain: " << FLAGS_log_max_seconds_to_retain; |
249 | 0 | if (PREDICT_FALSE(FLAGS_TEST_record_segments_violate_max_time_policy)) { |
250 | 0 | segments_violate_max_time_policy_->push_back(segment); |
251 | 0 | } |
252 | 0 | return true; |
253 | 0 | } |
254 | 0 | return false; |
255 | 0 | } |
256 | | |
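| | // *potential_reclaimed_space accumulates the file sizes of segments already selected for GC, |
| | // so the free-space check below reflects the space that would be available after this GC pass. |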
257 | | bool LogReader::ViolatesMinSpacePolicy(const scoped_refptr<ReadableLogSegment>& segment, |
258 | 0 | int64_t *potential_reclaimed_space) const { |
259 | 0 | if (FLAGS_log_stop_retaining_min_disk_mb <= 0) { |
260 | 0 | return false; |
261 | 0 | } |
262 | 0 | auto free_space_result = env_->GetFreeSpaceBytes(segment->path()); |
263 | 0 | if (!free_space_result.ok()) { |
264 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 300) << "Unable to get free space: " << free_space_result; |
265 | 0 | return false; |
266 | 0 | } else { |
267 | 0 | uint64_t free_space = *free_space_result; |
268 | 0 | if (free_space + *potential_reclaimed_space < |
269 | 0 | implicit_cast<size_t>(FLAGS_log_stop_retaining_min_disk_mb) * 1024 * 1024) { |
270 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 300) |
271 | 0 | << "Segment " << segment->path() << " violates minimum free space policy " |
272 | 0 | << "specified by log_stop_retaining_min_disk_mb: " |
273 | 0 | << FLAGS_log_stop_retaining_min_disk_mb; |
274 | 0 | *potential_reclaimed_space += segment->file_size(); |
275 | 0 | if (PREDICT_FALSE(FLAGS_TEST_record_segments_violate_min_space_policy)) { |
276 | 0 | segments_violate_min_space_policy_->push_back(segment); |
277 | 0 | } |
278 | 0 | return true; |
279 | 0 | } |
280 | 0 | } |
281 | 0 | return false; |
282 | 0 | } |
283 | | |
284 | 5 | Status LogReader::GetSegmentPrefixNotIncluding(int64_t index, SegmentSequence* segments) const { |
285 | 5 | return GetSegmentPrefixNotIncluding(index, index, segments); |
286 | 5 | } |
287 | | |
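| | // Returns in *segments the prefix of closed segments that are safe to GC: segments whose |
| | // entries are all flushed to RocksDB (max_replicate_index < index) and, when CDC retention is |
| | // enabled, already replicated to CDC, unless the max-time or min-free-space policy applies. |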
288 | | Status LogReader::GetSegmentPrefixNotIncluding(int64_t index, int64_t cdc_max_replicated_index, |
289 | 50.4M | SegmentSequence* segments) const { |
290 | 50.4M | DCHECK_GE(index, 0); |
291 | 50.4M | DCHECK(segments); |
292 | 50.4M | segments->clear(); |
293 | | |
294 | 50.4M | std::lock_guard<simple_spinlock> lock(lock_); |
295 | 50.4M | CHECK_EQ(state_, kLogReaderReading); |
296 | | |
297 | 50.4M | int64_t reclaimed_space = 0; |
298 | 53.1M | for (const scoped_refptr<ReadableLogSegment>& segment : segments_) { |
299 | | // The last segment doesn't have a footer. Never include that one. |
300 | 53.1M | if (!segment->HasFooter()) { |
301 | 46.9M | break; |
302 | 46.9M | } |
303 | | |
304 | | // Never garbage collect log segments with unflushed entries. |
305 | 6.20M | if (segment->footer().max_replicate_index() >= index) { |
306 | 3.49M | break; |
307 | 3.49M | } |
308 | | |
309 | | // This log segment contains cdc unreplicated entries. Don't GC it unless the file is too old |
310 | | // (controlled by flag FLAGS_log_max_seconds_to_retain) or we don't have enough space for the |
311 | | // logs (controlled by flag FLAGS_log_stop_retaining_min_disk_mb). |
312 | 2.70M | if (FLAGS_enable_log_retention_by_op_idx && |
313 | 2.70M | segment->footer().max_replicate_index() >= cdc_max_replicated_index) { |
314 | | |
315 | | // Since this log file contains cdc unreplicated entries, we don't want to GC it unless |
316 | | // it's too old, or we don't have enough space to store log files. |
317 | |
318 | 0 | if (!ViolatesMaxTimePolicy(segment) && !ViolatesMinSpacePolicy(segment, &reclaimed_space)) { |
319 | | // We exit the loop since this log segment already contains cdc unreplicated entries and so |
320 | | // do all subsequent files. |
321 | 0 | break; |
322 | 0 | } |
323 | 0 | } |
324 | | |
325 | | // TODO: tests for edge cases here with backwards ordered replicates. |
326 | 2.70M | segments->push_back(segment); |
327 | 2.70M | } |
328 | | |
329 | 50.4M | return Status::OK(); |
330 | 50.4M | } |
331 | | |
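| | // Returns the smallest min_replicate_index recorded in any closed segment's footer, |
| | // or -1 if no closed segment records one. |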
332 | 711 | int64_t LogReader::GetMinReplicateIndex() const { |
333 | 711 | std::lock_guard<simple_spinlock> lock(lock_); |
334 | 711 | int64_t min_remaining_op_idx = -1; |
335 | | |
336 | 835 | for (const scoped_refptr<ReadableLogSegment>& segment : segments_) { |
337 | 835 | if (!segment->HasFooter()) continue710 ; |
338 | 125 | if (!segment->footer().has_min_replicate_index()) continue0 ; |
339 | 125 | if (min_remaining_op_idx == -1 || |
340 | 125 | segment->footer().min_replicate_index() < min_remaining_op_idx22 ) { |
341 | 103 | min_remaining_op_idx = segment->footer().min_replicate_index(); |
342 | 103 | } |
343 | 125 | } |
344 | 711 | return min_remaining_op_idx; |
345 | 711 | } |
346 | | |
347 | 4.85M | scoped_refptr<ReadableLogSegment> LogReader::GetSegmentBySequenceNumber(int64_t seq) const { |
348 | 4.85M | std::lock_guard<simple_spinlock> lock(lock_); |
349 | 4.85M | if (segments_.empty()) { |
350 | 0 | return nullptr; |
351 | 0 | } |
352 | | |
353 | | // We always have a contiguous set of log segments, so we can find the requested |
354 | | // segment in our vector by calculating its offset vs the first element. |
355 | 4.85M | int64_t first_seqno = segments_[0]->header().sequence_number(); |
356 | 4.85M | size_t relative = seq - first_seqno; |
357 | 4.85M | if (relative >= segments_.size()) { |
358 | 2.02k | return nullptr; |
359 | 2.02k | } |
360 | | |
361 | 4.85M | DCHECK_EQ(segments_[relative]->header().sequence_number(), seq); |
362 | 4.85M | return segments_[relative]; |
363 | 4.85M | } |
364 | | |
365 | | Status LogReader::ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry, |
366 | | faststring* tmp_buf, |
367 | 30.1k | LogEntryBatchPB* batch) const { |
368 | 30.1k | const int64_t index = index_entry.op_id.index; |
369 | | |
370 | 30.1k | scoped_refptr<ReadableLogSegment> segment = GetSegmentBySequenceNumber( |
371 | 30.1k | index_entry.segment_sequence_number); |
372 | 30.1k | if (PREDICT_FALSE(!segment)) { |
373 | 1 | return STATUS(NotFound, Substitute("Segment $0 which contained index $1 has been GCed", |
374 | 1 | index_entry.segment_sequence_number, |
375 | 1 | index)); |
376 | 1 | } |
377 | | |
378 | 30.1k | CHECK_GT(index_entry.offset_in_segment, 0); |
379 | 30.1k | int64_t offset = index_entry.offset_in_segment; |
380 | 30.1k | ScopedLatencyMetric scoped(read_batch_latency_.get()); |
381 | 30.1k | RETURN_NOT_OK_PREPEND(segment->ReadEntryHeaderAndBatch(&offset, tmp_buf, batch), |
382 | 30.1k | Substitute("Failed to read LogEntry for index $0 from log segment " |
383 | 30.1k | "$1 offset $2", |
384 | 30.1k | index, |
385 | 30.1k | index_entry.segment_sequence_number, |
386 | 30.1k | index_entry.offset_in_segment)); |
387 | | |
388 | 30.1k | if (bytes_read_) { |
389 | 29.7k | bytes_read_->IncrementBy(kEntryHeaderSize + tmp_buf->length()); |
390 | 29.7k | entries_read_->IncrementBy(batch->entry_size()); |
391 | 29.7k | } |
392 | | |
393 | 30.1k | return Status::OK(); |
394 | 30.1k | } |
395 | | |
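| | // Reads the REPLICATE messages with indexes in [starting_at, up_to]. Stops early once adding |
| | // another entry would exceed max_bytes_to_read or once the deadline passes. Also reports the |
| | // sequence number of the segment holding the first op and the latest schema change seen. |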
396 | | Status LogReader::ReadReplicatesInRange( |
397 | | const int64_t starting_at, |
398 | | const int64_t up_to, |
399 | | int64_t max_bytes_to_read, |
400 | | ReplicateMsgs* replicates, |
401 | | int64_t* starting_op_segment_seq_num, |
402 | | yb::SchemaPB* modified_schema, |
403 | | uint32_t* modified_schema_version, |
404 | 3.30k | CoarseTimePoint deadline) const { |
405 | 3.30k | DCHECK_GT(starting_at, 0); |
406 | 3.30k | DCHECK_GE(up_to, starting_at); |
407 | 3.30k | DCHECK(log_index_) << "Require an index to random-read logs"; |
408 | 3.30k | ReplicateMsgs replicates_tmp; |
409 | 3.30k | LogIndexEntry prev_index_entry; |
410 | 3.30k | prev_index_entry.segment_sequence_number = -1; |
411 | 3.30k | prev_index_entry.offset_in_segment = -1; |
412 | | |
413 | | // Remove the deadline if the GetChanges deadline feature is disabled. |
414 | 3.30k | if (!ANNOTATE_UNPROTECTED_READ(FLAGS_get_changes_honor_deadline)) { |
415 | 0 | deadline = CoarseTimePoint::max(); |
416 | 0 | } |
417 | | |
418 | 3.30k | int64_t total_size = 0; |
419 | 3.30k | bool limit_exceeded = false; |
420 | 3.30k | faststring tmp_buf; |
421 | 3.30k | LogEntryBatchPB batch; |
422 | 33.8k | for (int64_t index = starting_at; index <= up_to && !limit_exceeded; index++) { |
423 | | // Stop reading if a deadline was specified and the deadline has been exceeded. |
424 | 30.5k | if (deadline != CoarseTimePoint::max() && CoarseMonoClock::Now() >= deadline) { |
425 | 0 | break; |
426 | 0 | } |
427 | | |
428 | 30.5k | if (PREDICT_FALSE(FLAGS_TEST_get_changes_read_loop_delay_ms > 0)) { |
429 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_get_changes_read_loop_delay_ms)); |
430 | 0 | } |
431 | | |
432 | 30.5k | LogIndexEntry index_entry; |
433 | 30.5k | RETURN_NOT_OK_PREPEND(log_index_->GetEntry(index, &index_entry), |
434 | 30.5k | Substitute("Failed to read log index for op $0", index)); |
435 | | |
436 | 30.5k | if (index == starting_at && starting_op_segment_seq_num != nullptr) { |
437 | 3.30k | *starting_op_segment_seq_num = index_entry.segment_sequence_number; |
438 | 3.30k | } |
439 | | // Since a given LogEntryBatch may contain multiple REPLICATE messages, |
440 | | // it's likely that this index entry points to the same batch as the previous |
441 | | // one. If that's the case, we've already read this REPLICATE and we can |
442 | | // skip reading the batch again. |
443 | 30.5k | if (index == starting_at || |
444 | 30.5k | index_entry.segment_sequence_number != prev_index_entry.segment_sequence_number || |
445 | 30.5k | index_entry.offset_in_segment != prev_index_entry.offset_in_segment) { |
446 | | // Make read operation. |
447 | 30.1k | RETURN_NOT_OK(ReadBatchUsingIndexEntry(index_entry, &tmp_buf, &batch)); |
448 | | |
449 | | // Sanity-check the property that a batch should only have increasing indexes. |
450 | 30.1k | int64_t prev_index = 0; |
451 | 60.7k | for (int i = 0; i < batch.entry_size(); ++i) { |
452 | 30.5k | LogEntryPB* entry = batch.mutable_entry(i); |
453 | 30.5k | if (!entry->has_replicate()) continue; |
454 | 30.5k | int64_t this_index = entry->replicate().id().index(); |
455 | 30.5k | CHECK_GT(this_index, prev_index) |
456 | 0 | << "Expected that an entry batch should only include increasing log indexes: " |
457 | 0 | << index_entry.ToString() |
458 | 0 | << "\nBatch: " << batch.DebugString(); |
459 | 30.5k | prev_index = this_index; |
460 | 30.5k | } |
461 | 30.1k | } |
462 | | |
463 | 30.5k | bool found = false; |
464 | 31.0k | for (int i = 0; i < batch.entry_size(); ++i) { |
465 | 31.0k | LogEntryPB* entry = batch.mutable_entry(i); |
466 | 31.0k | if (!entry->has_replicate()) { |
467 | 529 | continue; |
468 | 529 | } |
469 | | |
470 | 30.5k | if (entry->replicate().id().index() != index) { |
471 | 2 | continue; |
472 | 2 | } |
473 | | |
474 | 30.5k | int64_t space_required = entry->replicate().SpaceUsed(); |
475 | 30.5k | if (replicates_tmp.empty() || |
476 | 30.5k | max_bytes_to_read <= 0 || |
477 | 30.5k | total_size + space_required < max_bytes_to_read) { |
478 | 28.8k | total_size += space_required; |
479 | 28.8k | replicates_tmp.emplace_back(entry->release_replicate()); |
480 | 28.8k | if (replicates_tmp.back()->op_type() == consensus::OperationType::CHANGE_METADATA_OP && |
481 | 28.8k | modified_schema != nullptr && modified_schema_version != nullptr) { |
482 | 1.78k | (*modified_schema).CopyFrom(replicates_tmp.back()->change_metadata_request().schema()); |
483 | 1.78k | *modified_schema_version = replicates_tmp.back()->change_metadata_request(). |
484 | 1.78k | schema_version(); |
485 | 1.78k | } |
486 | 28.8k | } else { |
487 | 1.70k | limit_exceeded = true; |
488 | 1.70k | } |
489 | 30.5k | found = true; |
490 | 30.5k | break; |
491 | 30.5k | } |
492 | 30.5k | CHECK(found) << "Incorrect index entry didn't yield expected log entry: " |
493 | 0 | << index_entry.ToString(); |
494 | | |
495 | 30.5k | prev_index_entry = index_entry; |
496 | 30.5k | } |
497 | | |
498 | 3.30k | replicates->swap(replicates_tmp); |
499 | 3.30k | return Status::OK(); |
500 | 3.30k | } |
501 | | |
502 | 40.8M | Result<yb::OpId> LogReader::LookupOpId(int64_t op_index) const { |
503 | 40.8M | LogIndexEntry index_entry; |
504 | 40.8M | RETURN_NOT_OK_PREPEND(log_index_->GetEntry(op_index, &index_entry), |
505 | 40.8M | strings::Substitute("Failed to read log index for op $0", op_index)); |
506 | 40.8M | return index_entry.op_id; |
507 | 40.8M | } |
508 | | |
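| | // Returns the sequence number of the segment that contains op_index, or -1 if the op is not |
| | // present in the log index. |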
509 | 8.27M | Result<int64_t> LogReader::LookupHeader(int64_t op_index) const { |
510 | 8.27M | LogIndexEntry index_entry; |
511 | 8.27M | Status st = log_index_->GetEntry(op_index, &index_entry); |
512 | 8.27M | if (st.IsNotFound()) { |
513 | 3.45M | return -1; |
514 | 3.45M | } |
515 | 4.82M | return index_entry.segment_sequence_number; |
516 | 8.27M | } |
517 | | |
518 | 2.37M | Status LogReader::GetSegmentsSnapshot(SegmentSequence* segments) const { |
519 | 2.37M | std::lock_guard<simple_spinlock> lock(lock_); |
520 | 2.37M | CHECK_EQ(state_, kLogReaderReading); |
521 | 2.37M | segments->assign(segments_.begin(), segments_.end()); |
522 | 2.37M | return Status::OK(); |
523 | 2.37M | } |
524 | | |
525 | 94 | Status LogReader::TrimSegmentsUpToAndIncluding(uint64_t segment_sequence_number) { |
526 | 94 | std::lock_guard<simple_spinlock> lock(lock_); |
527 | 94 | CHECK_EQ(state_, kLogReaderReading); |
528 | 94 | auto iter = segments_.begin(); |
529 | 94 | std::vector<int64_t> deleted_segments; |
530 | | |
531 | 198 | while (iter != segments_.end()) { |
532 | 198 | auto current_seq_no = (*iter)->header().sequence_number(); |
533 | 198 | if (current_seq_no > segment_sequence_number) { |
534 | 94 | break; |
535 | 94 | } |
536 | 104 | deleted_segments.push_back(current_seq_no); |
537 | 104 | iter = segments_.erase(iter); |
538 | 104 | } |
539 | 94 | LOG_WITH_PREFIX(INFO) << "Removed log segment sequence numbers from log reader: " |
540 | 94 | << yb::ToString(deleted_segments); |
541 | 94 | return Status::OK(); |
542 | 94 | } |
543 | | |
544 | 25.1M | void LogReader::UpdateLastSegmentOffset(int64_t readable_to_offset) { |
545 | 25.1M | std::lock_guard<simple_spinlock> lock(lock_); |
546 | 25.1M | CHECK_EQ(state_, kLogReaderReading); |
547 | 25.1M | DCHECK(!segments_.empty()); |
548 | | // Get the last segment |
549 | 25.1M | ReadableLogSegment* segment = segments_.back().get(); |
550 | 25.1M | DCHECK(!segment->HasFooter()); |
551 | 25.1M | segment->UpdateReadableToOffset(readable_to_offset); |
552 | 25.1M | } |
553 | | |
554 | 84.2k | Status LogReader::ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& segment) { |
555 | | // This is used to replace the last segment once we close it properly so it must |
556 | | // have a footer. |
557 | 84.2k | DCHECK(segment->HasFooter()); |
558 | | |
559 | 84.2k | std::lock_guard<simple_spinlock> lock(lock_); |
560 | 84.2k | CHECK_EQ(state_, kLogReaderReading); |
561 | | // Make sure the segment we're replacing has the same sequence number |
562 | 84.2k | CHECK(!segments_.empty()); |
563 | 84.2k | CHECK_EQ(segment->header().sequence_number(), segments_.back()->header().sequence_number()); |
564 | 84.2k | segments_[segments_.size() - 1] = segment; |
565 | | |
566 | 84.2k | return Status::OK(); |
567 | 84.2k | } |
568 | | |
569 | 3 | Status LogReader::AppendSegment(const scoped_refptr<ReadableLogSegment>& segment) { |
570 | 3 | DCHECK(segment->IsInitialized()); |
571 | 3 | if (PREDICT_FALSE(!segment->HasFooter())) { |
572 | 0 | RETURN_NOT_OK(segment->RebuildFooterByScanning()); |
573 | 0 | } |
574 | 3 | std::lock_guard<simple_spinlock> lock(lock_); |
575 | 3 | return AppendSegmentUnlocked(segment); |
576 | 3 | } |
577 | | |
578 | 10.5k | Status LogReader::AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>& segment) { |
579 | 10.5k | DCHECK(segment->IsInitialized()); |
580 | 10.5k | DCHECK(segment->HasFooter()); |
581 | | |
582 | 10.5k | if (!segments_.empty()) { |
583 | 7.75k | CHECK_EQ(segments_.back()->header().sequence_number() + 1, |
584 | 7.75k | segment->header().sequence_number()); |
585 | 7.75k | } |
586 | 10.5k | segments_.push_back(segment); |
587 | 10.5k | return Status::OK(); |
588 | 10.5k | } |
589 | | |
590 | 160k | Status LogReader::AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& segment) { |
591 | 160k | DCHECK(segment->IsInitialized()); |
592 | 160k | std::lock_guard<simple_spinlock> lock(lock_); |
593 | 160k | CHECK_EQ(state_, kLogReaderReading); |
594 | 160k | if (!segments_.empty()) { |
595 | 11.7k | CHECK_EQ(segments_.back()->header().sequence_number() + 1, |
596 | 11.7k | segment->header().sequence_number()); |
597 | 11.7k | } |
598 | 160k | segments_.push_back(segment); |
599 | 160k | return Status::OK(); |
600 | 160k | } |
601 | | |
602 | 50.6M | size_t LogReader::num_segments() const { |
603 | 50.6M | std::lock_guard<simple_spinlock> lock(lock_); |
604 | 50.6M | return segments_.size(); |
605 | 50.6M | } |
606 | | |
607 | 0 | string LogReader::ToString() const { |
608 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
609 | 0 | string ret = "Reader's SegmentSequence: \n"; |
610 | 0 | for (const SegmentSequence::value_type& entry : segments_) { |
611 | 0 | ret.append(Substitute("Segment: $0 Footer: $1\n", |
612 | 0 | entry->header().sequence_number(), |
613 | 0 | !entry->HasFooter() ? "NONE" : entry->footer().ShortDebugString())); |
614 | 0 | } |
615 | 0 | return ret; |
616 | 0 | } |
617 | | |
618 | | } // namespace log |
619 | | } // namespace yb |