YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/log_reader.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/log_reader.h"
34
35
#include <algorithm>
36
#include <mutex>
37
38
#include <glog/logging.h>
39
40
#include "yb/consensus/consensus_util.h"
41
#include "yb/consensus/log_index.h"
42
#include "yb/consensus/log_util.h"
43
44
#include "yb/gutil/dynamic_annotations.h"
45
46
#include "yb/util/env_util.h"
47
#include "yb/util/flag_tags.h"
48
#include "yb/util/logging.h"
49
#include "yb/util/metrics.h"
50
#include "yb/util/monotime.h"
51
#include "yb/util/path_util.h"
52
#include "yb/util/result.h"
53
54
DEFINE_bool(enable_log_retention_by_op_idx, true,
55
            "If true, logs will be retained based on an op id passed by the cdc service");
56
57
DEFINE_int32(log_max_seconds_to_retain, 24 * 3600, "Log files that are older will be "
58
             "deleted even if they contain cdc unreplicated entries. If 0, this flag will be "
59
             "ignored. This flag is ignored if a log segment contains entries that haven't been"
60
             "flushed to RocksDB.");
61
62
DEFINE_int64(log_stop_retaining_min_disk_mb, 100 * 1024, "Stop retaining logs if the space "
63
             "available for the logs falls below this limit. This flag is ignored if a log segment "
64
             "contains unflushed entries.");
65
66
METRIC_DEFINE_counter(tablet, log_reader_bytes_read, "Bytes Read From Log",
67
                      yb::MetricUnit::kBytes,
68
                      "Data read from the WAL since tablet start");
69
70
METRIC_DEFINE_counter(tablet, log_reader_entries_read, "Entries Read From Log",
71
                      yb::MetricUnit::kEntries,
72
                      "Number of entries read from the WAL since tablet start");
73
74
METRIC_DEFINE_coarse_histogram(table, log_reader_read_batch_latency, "Log Read Latency",
75
                        yb::MetricUnit::kBytes,
76
                        "Microseconds spent reading log entry batches");
77
78
DEFINE_test_flag(bool, record_segments_violate_max_time_policy, false,
79
    "If set, everytime GetSegmentPrefixNotIncluding runs, segments that violate the max time "
80
    "policy will be appended to LogReader::segments_violate_max_time_policy_.");
81
82
DEFINE_test_flag(bool, record_segments_violate_min_space_policy, false,
83
    "If set, everytime GetSegmentPrefixNotIncluding runs, segments that violate the max time "
84
    "policy will be appended to LogReader::segments_violate_min_space_policy_.");
85
86
DEFINE_bool(get_changes_honor_deadline, true,
87
            "Toggle whether to honor the deadline passed to log reader");
88
89
DEFINE_test_flag(int32, get_changes_read_loop_delay_ms, 0,
90
                 "Amount of time to sleep for between each iteration of the loop in "
91
                 "ReadReplicatesInRange. This is used to test the return of partial results.");
92
93
namespace yb {
94
namespace log {
95
96
namespace {
97
struct LogSegmentSeqnoComparator {
98
  bool operator() (const scoped_refptr<ReadableLogSegment>& a,
99
27.7k
                   const scoped_refptr<ReadableLogSegment>& b) {
100
27.7k
    return a->header().sequence_number() < b->header().sequence_number();
101
27.7k
  }
102
};
103
}
104
105
using consensus::ReplicateMsg;
106
using env_util::ReadFully;
107
using strings::Substitute;
108
109
// Sentinel value meaning "no size limit". NOTE(review): presumably passed by callers as
// max_bytes_to_read; ReadReplicatesInRange treats any max_bytes_to_read <= 0 as unlimited.
const int64_t LogReader::kNoSizeLimit = -1;
110
111
// Factory: builds a LogReader over `tablet_wal_path` and, only if Init() succeeds, hands it to
// the caller through `reader`. On failure `*reader` is left untouched and the error is returned.
Status LogReader::Open(Env *env,
                       const scoped_refptr<LogIndex>& index,
                       std::string log_prefix,
                       const std::string& tablet_wal_path,
                       const scoped_refptr<MetricEntity>& table_metric_entity,
                       const scoped_refptr<MetricEntity>& tablet_metric_entity,
                       std::unique_ptr<LogReader> *reader) {
  std::unique_ptr<LogReader> candidate{new LogReader(
      env, index, std::move(log_prefix), table_metric_entity, tablet_metric_entity)};

  RETURN_NOT_OK(candidate->Init(tablet_wal_path));

  *reader = std::move(candidate);
  return Status::OK();
}
125
126
// Creates a reader in the kLogReaderInitialized state; Init() (or InitEmptyReaderForTests())
// must move it to kLogReaderReading before the accessor methods are used.
// Metrics are instantiated only for the metric entities actually supplied. The test-only
// policy-violation recorders are allocated only when op-index retention and at least one of the
// recording test flags are enabled at construction time.
LogReader::LogReader(Env* env,
                     const scoped_refptr<LogIndex>& index,
                     std::string log_prefix,
                     const scoped_refptr<MetricEntity>& table_metric_entity,
                     const scoped_refptr<MetricEntity>& tablet_metric_entity)
    : env_(env),
      log_index_(index),
      log_prefix_(std::move(log_prefix)),
      state_(kLogReaderInitialized) {
  if (table_metric_entity) {
    read_batch_latency_ = METRIC_log_reader_read_batch_latency.Instantiate(table_metric_entity);
  }

  if (tablet_metric_entity) {
    bytes_read_ = METRIC_log_reader_bytes_read.Instantiate(tablet_metric_entity);
    entries_read_ = METRIC_log_reader_entries_read.Instantiate(tablet_metric_entity);
  }

  const bool record_policy_violations =
      FLAGS_enable_log_retention_by_op_idx &&
      (FLAGS_TEST_record_segments_violate_max_time_policy ||
       FLAGS_TEST_record_segments_violate_min_space_policy);
  if (PREDICT_FALSE(record_policy_violations)) {
    segments_violate_max_time_policy_ = std::make_unique<SegmentSequence>();
    segments_violate_min_space_policy_ = std::make_unique<SegmentSequence>();
  }
}
149
150
76.3k
// Nothing to release explicitly; members clean themselves up.
LogReader::~LogReader() = default;
152
153
151k
// Scans `tablet_wal_path` for WAL segment files and registers them with this reader.
// Non-log files are ignored and segments without a header are skipped with a log message.
// A segment without a footer (likely left in progress by a crash) gets its footer rebuilt by
// scanning. The segments are sorted by sequence number. Returns IllegalState if the directory
// does not exist, and Corruption if the sorted sequence numbers are not consecutive.
// Must be called while the reader is in the kLogReaderInitialized state; on success the state
// becomes kLogReaderReading.
Status LogReader::Init(const string& tablet_wal_path) {
  {
    std::lock_guard<simple_spinlock> lock(lock_);
    CHECK_EQ(state_, kLogReaderInitialized) << "bad state for Init(): " << state_;
  }
  VLOG_WITH_PREFIX(1) << "Reading wal from path:" << tablet_wal_path;

  if (!env_->FileExists(tablet_wal_path)) {
    return STATUS(IllegalState, "Cannot find wal location at", tablet_wal_path);
  }

  VLOG_WITH_PREFIX(1) << "Parsing segments from path: " << tablet_wal_path;

  std::vector<string> files_from_log_directory;
  RETURN_NOT_OK_PREPEND(env_->GetChildren(tablet_wal_path, &files_from_log_directory),
                        "Unable to read children from path");

  SegmentSequence read_segments;

  // Build a log segment from log files, ignoring non log files.
  for (const string &potential_log_file : files_from_log_directory) {
    if (!IsLogFileName(potential_log_file)) {
      continue;
    }

    string fqp = JoinPathSegments(tablet_wal_path, potential_log_file);
    auto segment = VERIFY_RESULT_PREPEND(ReadableLogSegment::Open(env_, fqp),
                                         Format("Unable to open readable log segment: $0", fqp));
    if (!segment) {
      LOG_WITH_PREFIX(INFO) << "Log segment w/o header: " << fqp << ", skipping";
      continue;
    }
    CHECK(segment->IsInitialized()) << "Uninitialized segment at: " << segment->path();

    if (!segment->HasFooter()) {
      LOG_WITH_PREFIX(WARNING)
          << "Log segment " << fqp << " was likely left in-progress "
             "after a previous crash. Will try to rebuild footer by scanning data.";
      RETURN_NOT_OK(segment->RebuildFooterByScanning());
    }

    read_segments.push_back(segment);
  }

  // Sort the segments by sequence number.
  std::sort(read_segments.begin(), read_segments.end(), LogSegmentSeqnoComparator());

  {
    std::lock_guard<simple_spinlock> lock(lock_);

    string previous_seg_path;
    int64_t previous_seg_seqno = -1;
    for (const SegmentSequence::value_type& entry : read_segments) {
      VLOG_WITH_PREFIX(1) << " Log Reader Indexed: " << entry->footer().ShortDebugString();
      // Check that the log segments are in sequence. A gap is fatal for Init(), so there is
      // nothing to update after returning (the old code had an unreachable increment here).
      if (previous_seg_seqno != -1 &&
          entry->header().sequence_number() != implicit_cast<size_t>(previous_seg_seqno) + 1) {
        return STATUS(Corruption, Substitute("Segment sequence numbers are not consecutive. "
            "Previous segment: seqno $0, path $1; Current segment: seqno $2, path $3",
            previous_seg_seqno, previous_seg_path,
            entry->header().sequence_number(), entry->path()));
      }
      previous_seg_seqno = entry->header().sequence_number();
      previous_seg_path = entry->path();
      RETURN_NOT_OK(AppendSegmentUnlocked(entry));
    }

    state_ = kLogReaderReading;
  }
  return Status::OK();
}
226
227
1
// Test hook: marks the reader as ready for reading without scanning any WAL directory, so it
// starts with no segments.
Status LogReader::InitEmptyReaderForTests() {
  std::lock_guard<simple_spinlock> guard(lock_);
  state_ = kLogReaderReading;
  return Status::OK();
}
232
233
0
bool LogReader::ViolatesMaxTimePolicy(const scoped_refptr<ReadableLogSegment>& segment) const {
234
0
  if (FLAGS_log_max_seconds_to_retain <= 0) {
235
0
    return false;
236
0
  }
237
238
0
  if (!segment->HasFooter()) {
239
0
    return false;
240
0
  }
241
242
0
  int64_t now = GetCurrentTimeMicros();
243
0
  int64_t age_seconds = (now - segment->footer().close_timestamp_micros()) / 1000000;
244
0
  if (age_seconds > FLAGS_log_max_seconds_to_retain) {
245
0
    YB_LOG_EVERY_N_SECS(WARNING, 300)
246
0
        << "Segment " << segment->path() << " violates max retention time policy. "
247
0
        << "Segment age: " << age_seconds << " seconds. "
248
0
        << "log_max_seconds_to_retain: " << FLAGS_log_max_seconds_to_retain;
249
0
    if (PREDICT_FALSE(FLAGS_TEST_record_segments_violate_max_time_policy)) {
250
0
      segments_violate_max_time_policy_->push_back(segment);
251
0
    }
252
0
    return true;
253
0
  }
254
0
  return false;
255
0
}
256
257
bool LogReader::ViolatesMinSpacePolicy(const scoped_refptr<ReadableLogSegment>& segment,
258
0
                                       int64_t *potential_reclaimed_space) const {
259
0
  if (FLAGS_log_stop_retaining_min_disk_mb <= 0) {
260
0
    return false;
261
0
  }
262
0
  auto free_space_result = env_->GetFreeSpaceBytes(segment->path());
263
0
  if (!free_space_result.ok()) {
264
0
    YB_LOG_EVERY_N_SECS(WARNING, 300) << "Unable to get free space: " << free_space_result;
265
0
    return false;
266
0
  } else {
267
0
    uint64_t free_space = *free_space_result;
268
0
    if (free_space + *potential_reclaimed_space <
269
0
            implicit_cast<size_t>(FLAGS_log_stop_retaining_min_disk_mb) * 1024) {
270
0
      YB_LOG_EVERY_N_SECS(WARNING, 300)
271
0
          << "Segment " << segment->path() << " violates minimum free space policy "
272
0
          << "specified by log_stop_retaining_min_disk_mb: "
273
0
          << FLAGS_log_stop_retaining_min_disk_mb;
274
0
      *potential_reclaimed_space += segment->file_size();
275
0
      if (PREDICT_FALSE(FLAGS_TEST_record_segments_violate_min_space_policy)) {
276
0
        segments_violate_min_space_policy_->push_back(segment);
277
0
      }
278
0
      return true;
279
0
    }
280
0
  }
281
0
  return false;
282
0
}
283
284
5
// Single-bound overload: `index` serves both as the flushed-entry bound and as the cdc
// replicated bound.
Status LogReader::GetSegmentPrefixNotIncluding(int64_t index, SegmentSequence* segments) const {
  const int64_t cdc_bound = index;
  return GetSegmentPrefixNotIncluding(index, cdc_bound, segments);
}
287
288
// Fills `segments` (cleared first) with the longest leading run of segments_ that may be
// garbage collected. The scan stops at the first segment that:
//   * has no footer (the segment still being written), or
//   * contains a replicate index >= `index` (entries not yet flushed), or
//   * contains entries at or past `cdc_max_replicated_index`. This applies only when op-index
//     retention is enabled, and only if that segment violates neither the max-age policy nor
//     the min-free-space policy.
// The policies are consulted in that order, and the space policy only runs when the age policy
// did not fire. Both may record state, so this order is significant.
Status LogReader::GetSegmentPrefixNotIncluding(int64_t index, int64_t cdc_max_replicated_index,
                                               SegmentSequence* segments) const {
  DCHECK_GE(index, 0);
  DCHECK(segments);
  segments->clear();

  std::lock_guard<simple_spinlock> lock(lock_);
  CHECK_EQ(state_, kLogReaderReading);

  int64_t reclaimed_space = 0;
  for (const auto& segment : segments_) {
    if (!segment->HasFooter()) {
      break;
    }

    const auto max_index_in_segment = segment->footer().max_replicate_index();
    if (max_index_in_segment >= index) {
      break;
    }

    const bool holds_cdc_unreplicated = FLAGS_enable_log_retention_by_op_idx &&
                                        max_index_in_segment >= cdc_max_replicated_index;
    if (holds_cdc_unreplicated &&
        !ViolatesMaxTimePolicy(segment) &&
        !ViolatesMinSpacePolicy(segment, &reclaimed_space)) {
      // Every later segment also holds cdc-unreplicated entries, so stop here.
      break;
    }

    // TODO: tests for edge cases here with backwards ordered replicates.
    segments->push_back(segment);
  }

  return Status::OK();
}
331
332
711
int64_t LogReader::GetMinReplicateIndex() const {
333
711
  std::lock_guard<simple_spinlock> lock(lock_);
334
711
  int64_t min_remaining_op_idx = -1;
335
336
835
  for (const scoped_refptr<ReadableLogSegment>& segment : segments_) {
337
835
    if (!segment->HasFooter()) 
continue710
;
338
125
    if (!segment->footer().has_min_replicate_index()) 
continue0
;
339
125
    if (min_remaining_op_idx == -1 ||
340
125
        
segment->footer().min_replicate_index() < min_remaining_op_idx22
) {
341
103
      min_remaining_op_idx = segment->footer().min_replicate_index();
342
103
    }
343
125
  }
344
711
  return min_remaining_op_idx;
345
711
}
346
347
4.85M
// Returns the segment with sequence number `seq`, or nullptr if it is not held by this reader
// (never existed, or already trimmed).
scoped_refptr<ReadableLogSegment> LogReader::GetSegmentBySequenceNumber(int64_t seq) const {
  std::lock_guard<simple_spinlock> guard(lock_);
  if (segments_.empty()) {
    return nullptr;
  }

  // Segments are contiguous, so the position is the offset from the first sequence number.
  // A `seq` below the first one wraps to a huge unsigned value and fails the bounds check.
  const int64_t first_seqno = segments_.front()->header().sequence_number();
  const size_t relative = seq - first_seqno;
  if (relative < segments_.size()) {
    DCHECK_EQ(segments_[relative]->header().sequence_number(), seq);
    return segments_[relative];
  }
  return nullptr;
}
364
365
// Reads the entry batch that `index_entry` points at into `batch`, using `tmp_buf` as scratch.
// Returns NotFound if the segment has been garbage collected. Read errors are annotated with
// the op index, segment and offset. Read latency is recorded, and byte/entry counters are
// bumped when tablet metrics are present.
Status LogReader::ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                           faststring* tmp_buf,
                                           LogEntryBatchPB* batch) const {
  const int64_t index = index_entry.op_id.index;

  auto segment = GetSegmentBySequenceNumber(index_entry.segment_sequence_number);
  if (PREDICT_FALSE(!segment)) {
    return STATUS(NotFound, Substitute("Segment $0 which contained index $1 has been GCed",
                                       index_entry.segment_sequence_number,
                                       index));
  }

  CHECK_GT(index_entry.offset_in_segment, 0);
  int64_t offset = index_entry.offset_in_segment;
  ScopedLatencyMetric scoped(read_batch_latency_.get());
  RETURN_NOT_OK_PREPEND(segment->ReadEntryHeaderAndBatch(&offset, tmp_buf, batch),
                        Substitute("Failed to read LogEntry for index $0 from log segment "
                                   "$1 offset $2",
                                   index,
                                   index_entry.segment_sequence_number,
                                   index_entry.offset_in_segment));

  if (!bytes_read_) {
    return Status::OK();
  }
  bytes_read_->IncrementBy(kEntryHeaderSize + tmp_buf->length());
  entries_read_->IncrementBy(batch->entry_size());
  return Status::OK();
}
395
396
// Reads the REPLICATE messages for op indexes [starting_at, up_to] into `*replicates`, using the
// log index to locate each one.
// - The first message is always taken. Later ones are taken while the accumulated SpaceUsed()
//   stays strictly below `max_bytes_to_read`; a value <= 0 means no limit.
// - Reading stops early once `deadline` passes (unless FLAGS_get_changes_honor_deadline is off),
//   which yields partial results.
// - `*starting_op_segment_seq_num` (if non-null) receives the segment holding `starting_at`.
// - For each CHANGE_METADATA_OP taken, the schema and its version are copied into
//   `modified_schema` / `modified_schema_version`, when both are non-null.
// Consecutive indexes stored in the same entry batch are served without re-reading the batch.
Status LogReader::ReadReplicatesInRange(
    const int64_t starting_at,
    const int64_t up_to,
    int64_t max_bytes_to_read,
    ReplicateMsgs* replicates,
    int64_t* starting_op_segment_seq_num,
    yb::SchemaPB* modified_schema,
    uint32_t* modified_schema_version,
    CoarseTimePoint deadline) const {
  DCHECK_GT(starting_at, 0);
  DCHECK_GE(up_to, starting_at);
  DCHECK(log_index_) << "Require an index to random-read logs";

  ReplicateMsgs collected;
  LogIndexEntry previous_location;
  previous_location.segment_sequence_number = -1;
  previous_location.offset_in_segment = -1;

  // Remove the deadline if the GetChanges deadline feature is disabled.
  if (!ANNOTATE_UNPROTECTED_READ(FLAGS_get_changes_honor_deadline)) {
    deadline = CoarseTimePoint::max();
  }
  const bool has_deadline = deadline != CoarseTimePoint::max();

  int64_t bytes_accumulated = 0;
  bool over_limit = false;
  faststring scratch;
  LogEntryBatchPB batch;

  for (int64_t op_index = starting_at; op_index <= up_to && !over_limit; ++op_index) {
    if (has_deadline && CoarseMonoClock::Now() >= deadline) {
      break;
    }

    if (PREDICT_FALSE(FLAGS_TEST_get_changes_read_loop_delay_ms > 0)) {
      SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_get_changes_read_loop_delay_ms));
    }

    LogIndexEntry location;
    RETURN_NOT_OK_PREPEND(log_index_->GetEntry(op_index, &location),
                          Substitute("Failed to read log index for op $0", op_index));

    const bool is_first = op_index == starting_at;
    if (is_first && starting_op_segment_seq_num != nullptr) {
      *starting_op_segment_seq_num = location.segment_sequence_number;
    }

    // A batch may hold several REPLICATEs; if this op lives in the batch we just read, reuse it.
    const bool same_batch_as_before =
        !is_first &&
        location.segment_sequence_number == previous_location.segment_sequence_number &&
        location.offset_in_segment == previous_location.offset_in_segment;
    if (!same_batch_as_before) {
      RETURN_NOT_OK(ReadBatchUsingIndexEntry(location, &scratch, &batch));

      // Sanity-check the property that a batch should only have increasing indexes.
      int64_t prev_index = 0;
      for (const LogEntryPB& candidate : batch.entry()) {
        if (!candidate.has_replicate()) {
          continue;
        }
        int64_t this_index = candidate.replicate().id().index();
        CHECK_GT(this_index, prev_index)
          << "Expected that an entry batch should only include increasing log indexes: "
          << location.ToString()
          << "\nBatch: " << batch.DebugString();
        prev_index = this_index;
      }
    }

    LogEntryPB* match = nullptr;
    for (int i = 0; i < batch.entry_size(); ++i) {
      LogEntryPB* candidate = batch.mutable_entry(i);
      if (candidate->has_replicate() && candidate->replicate().id().index() == op_index) {
        match = candidate;
        break;
      }
    }
    const bool found = match != nullptr;
    CHECK(found) << "Incorrect index entry didn't yield expected log entry: "
                 << location.ToString();

    const int64_t space_required = match->replicate().SpaceUsed();
    const bool fits = collected.empty() ||
                      max_bytes_to_read <= 0 ||
                      bytes_accumulated + space_required < max_bytes_to_read;
    if (fits) {
      bytes_accumulated += space_required;
      collected.emplace_back(match->release_replicate());
      const auto& taken = collected.back();
      if (taken->op_type() == consensus::OperationType::CHANGE_METADATA_OP &&
          modified_schema != nullptr && modified_schema_version != nullptr) {
        modified_schema->CopyFrom(taken->change_metadata_request().schema());
        *modified_schema_version = taken->change_metadata_request().schema_version();
      }
    } else {
      over_limit = true;
    }

    previous_location = location;
  }

  replicates->swap(collected);
  return Status::OK();
}
501
502
40.8M
// Returns the OpId recorded in the log index for `op_index`, or the (annotated) lookup error.
Result<yb::OpId> LogReader::LookupOpId(int64_t op_index) const {
  LogIndexEntry entry;
  RETURN_NOT_OK_PREPEND(log_index_->GetEntry(op_index, &entry),
                        Substitute("Failed to read log index for op $0", op_index));
  return entry.op_id;
}
508
509
8.27M
// Returns the sequence number of the log segment holding operation `op_index`, or -1 when the
// log index has no entry for it (Status::NotFound). Any other index lookup error is propagated.
Result<int64_t> LogReader::LookupHeader(int64_t op_index) const {
  LogIndexEntry index_entry;
  Status st = log_index_->GetEntry(op_index, &index_entry);
  if (st.IsNotFound()) {
    return -1;
  }
  // Previously any other failure fell through and returned segment_sequence_number from an
  // entry that GetEntry may not have populated.
  RETURN_NOT_OK(st);
  return index_entry.segment_sequence_number;
}
517
518
2.37M
// Copies the current segment list (the references, not the segments) into `*segments`,
// replacing its previous contents.
Status LogReader::GetSegmentsSnapshot(SegmentSequence* segments) const {
  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK_EQ(state_, kLogReaderReading);
  *segments = segments_;
  return Status::OK();
}
524
525
94
// Drops the leading run of segments whose sequence number is <= `segment_sequence_number`,
// stopping at the first segment past it, and logs the sequence numbers that were removed.
Status LogReader::TrimSegmentsUpToAndIncluding(uint64_t segment_sequence_number) {
  std::lock_guard<simple_spinlock> lock(lock_);
  CHECK_EQ(state_, kLogReaderReading);

  const auto first_kept = std::find_if(
      segments_.begin(), segments_.end(),
      [segment_sequence_number](const scoped_refptr<ReadableLogSegment>& seg) {
        return seg->header().sequence_number() > segment_sequence_number;
      });

  std::vector<int64_t> deleted_segments;
  for (auto it = segments_.begin(); it != first_kept; ++it) {
    deleted_segments.push_back((*it)->header().sequence_number());
  }
  segments_.erase(segments_.begin(), first_kept);

  LOG_WITH_PREFIX(INFO) << "Removed log segment sequence numbers from log reader: "
                        << yb::ToString(deleted_segments);
  return Status::OK();
}
543
544
25.1M
void LogReader::UpdateLastSegmentOffset(int64_t readable_to_offset) {
545
25.1M
  std::lock_guard<simple_spinlock> lock(lock_);
546
25.1M
  CHECK_EQ(state_, kLogReaderReading);
547
25.1M
  DCHECK(!segments_.empty());
548
  // Get the last segment
549
25.1M
  ReadableLogSegment* segment = segments_.back().get();
550
25.1M
  DCHECK(!segment->HasFooter());
551
25.1M
  segment->UpdateReadableToOffset(readable_to_offset);
552
25.1M
}
553
554
84.2k
// Swaps in `segment` (a properly closed, footer-bearing version) for the last segment. The
// replacement must carry the same sequence number as the segment it replaces.
Status LogReader::ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& segment) {
  DCHECK(segment->HasFooter());

  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK_EQ(state_, kLogReaderReading);
  CHECK(!segments_.empty());
  CHECK_EQ(segment->header().sequence_number(), segments_.back()->header().sequence_number());

  segments_.back() = segment;
  return Status::OK();
}
568
569
3
// Appends an initialized segment, first rebuilding its footer by scanning if it has none.
// The footer rebuild happens before the reader lock is taken.
Status LogReader::AppendSegment(const scoped_refptr<ReadableLogSegment>& segment) {
  DCHECK(segment->IsInitialized());

  const bool needs_footer = !segment->HasFooter();
  if (PREDICT_FALSE(needs_footer)) {
    RETURN_NOT_OK(segment->RebuildFooterByScanning());
  }

  std::lock_guard<simple_spinlock> guard(lock_);
  return AppendSegmentUnlocked(segment);
}
577
578
10.5k
// Appends a footer-bearing segment whose sequence number directly follows the current last one.
// The caller must already hold lock_.
Status LogReader::AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>& segment) {
  DCHECK(segment->IsInitialized());
  DCHECK(segment->HasFooter());

  const bool has_predecessor = !segments_.empty();
  if (has_predecessor) {
    CHECK_EQ(segments_.back()->header().sequence_number() + 1,
             segment->header().sequence_number());
  }

  segments_.push_back(segment);
  return Status::OK();
}
589
590
160k
// Appends a freshly created (not necessarily footer-bearing) segment that directly follows the
// current last one. Takes lock_ itself.
Status LogReader::AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& segment) {
  DCHECK(segment->IsInitialized());

  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK_EQ(state_, kLogReaderReading);

  const bool has_predecessor = !segments_.empty();
  if (has_predecessor) {
    CHECK_EQ(segments_.back()->header().sequence_number() + 1,
             segment->header().sequence_number());
  }

  segments_.push_back(segment);
  return Status::OK();
}
601
602
50.6M
size_t LogReader::num_segments() const {
603
50.6M
  std::lock_guard<simple_spinlock> lock(lock_);
604
50.6M
  return segments_.size();
605
50.6M
}
606
607
0
// Human-readable dump: one line per segment with its sequence number and footer ("NONE" when
// the segment has no footer).
string LogReader::ToString() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  string result = "Reader's SegmentSequence: \n";
  for (const auto& seg : segments_) {
    const string footer_text = seg->HasFooter() ? seg->footer().ShortDebugString() : "NONE";
    result.append(Substitute("Segment: $0 Footer: $1\n",
                             seg->header().sequence_number(),
                             footer_text));
  }
  return result;
}
617
618
}  // namespace log
619
}  // namespace yb