YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/log_reader.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/log_reader.h"
34
35
#include <algorithm>
36
#include <mutex>
37
38
#include <glog/logging.h>
39
40
#include "yb/consensus/consensus_util.h"
41
#include "yb/consensus/log_index.h"
42
#include "yb/consensus/log_util.h"
43
44
#include "yb/gutil/dynamic_annotations.h"
45
46
#include "yb/util/env_util.h"
47
#include "yb/util/flag_tags.h"
48
#include "yb/util/logging.h"
49
#include "yb/util/metrics.h"
50
#include "yb/util/monotime.h"
51
#include "yb/util/path_util.h"
52
#include "yb/util/result.h"
53
54
// Runtime flags that control how long WAL segments are retained.
DEFINE_bool(enable_log_retention_by_op_idx, true,
            "If true, logs will be retained based on an op id passed by the cdc service");

// Adjacent string literals are concatenated verbatim, so every fragment that is followed by
// another one must end with its own separating space.
DEFINE_int32(log_max_seconds_to_retain, 24 * 3600, "Log files that are older will be "
             "deleted even if they contain cdc unreplicated entries. If 0, this flag will be "
             "ignored. This flag is ignored if a log segment contains entries that haven't been "
             "flushed to RocksDB.");

DEFINE_int64(log_stop_retaining_min_disk_mb, 100 * 1024, "Stop retaining logs if the space "
             "available for the logs falls below this limit. This flag is ignored if a log segment "
             "contains unflushed entries.");
65
66
// Per-tablet counters updated by ReadBatchUsingIndexEntry().
METRIC_DEFINE_counter(tablet, log_reader_bytes_read, "Bytes Read From Log",
                      yb::MetricUnit::kBytes,
                      "Data read from the WAL since tablet start");

METRIC_DEFINE_counter(tablet, log_reader_entries_read, "Entries Read From Log",
                      yb::MetricUnit::kEntries,
                      "Number of entries read from the WAL since tablet start");

// Per-table latency histogram. The values recorded are durations (see the description), so the
// unit is microseconds rather than bytes.
METRIC_DEFINE_coarse_histogram(table, log_reader_read_batch_latency, "Log Read Latency",
                        yb::MetricUnit::kMicroseconds,
                        "Microseconds spent reading log entry batches");
77
78
DEFINE_test_flag(bool, record_segments_violate_max_time_policy, false,
79
    "If set, everytime GetSegmentPrefixNotIncluding runs, segments that violate the max time "
80
    "policy will be appended to LogReader::segments_violate_max_time_policy_.");
81
82
DEFINE_test_flag(bool, record_segments_violate_min_space_policy, false,
83
    "If set, everytime GetSegmentPrefixNotIncluding runs, segments that violate the max time "
84
    "policy will be appended to LogReader::segments_violate_min_space_policy_.");
85
86
DEFINE_bool(get_changes_honor_deadline, true,
87
            "Toggle whether to honor the deadline passed to log reader");
88
89
DEFINE_test_flag(int32, get_changes_read_loop_delay_ms, 0,
90
                 "Amount of time to sleep for between each iteration of the loop in "
91
                 "ReadReplicatesInRange. This is used to test the return of partial results.");
92
93
namespace yb {
94
namespace log {
95
96
namespace {
97
struct LogSegmentSeqnoComparator {
98
  bool operator() (const scoped_refptr<ReadableLogSegment>& a,
99
27.6k
                   const scoped_refptr<ReadableLogSegment>& b) {
100
27.6k
    return a->header().sequence_number() < b->header().sequence_number();
101
27.6k
  }
102
};
103
}
104
105
using consensus::ReplicateMsg;
106
using env_util::ReadFully;
107
using strings::Substitute;
108
109
// Out-of-line definition of the LogReader::kNoSizeLimit static constant (value -1).
// NOTE(review): it is not referenced elsewhere in this file; presumably callers pass it to mean
// "no byte limit" -- confirm against the declaration in log_reader.h.
const int64_t LogReader::kNoSizeLimit = -1;
110
111
// Factory for LogReader: builds a reader, runs Init() against 'tablet_wal_path' and, only when
// that succeeds, transfers ownership of the new reader into '*reader'. If Init() fails its
// status is returned and '*reader' is left unmodified.
Status LogReader::Open(Env *env,
                       const scoped_refptr<LogIndex>& index,
                       std::string log_prefix,
                       const std::string& tablet_wal_path,
                       const scoped_refptr<MetricEntity>& table_metric_entity,
                       const scoped_refptr<MetricEntity>& tablet_metric_entity,
                       std::unique_ptr<LogReader> *reader) {
  auto new_reader = std::unique_ptr<LogReader>(new LogReader(
      env, index, std::move(log_prefix), table_metric_entity, tablet_metric_entity));

  RETURN_NOT_OK(new_reader->Init(tablet_wal_path));

  *reader = std::move(new_reader);
  return Status::OK();
}
125
126
// Stores the collaborators, starts in the kLogReaderInitialized state and instantiates the
// metrics for whichever metric entities were supplied (either may be null).
LogReader::LogReader(Env* env,
                     const scoped_refptr<LogIndex>& index,
                     std::string log_prefix,
                     const scoped_refptr<MetricEntity>& table_metric_entity,
                     const scoped_refptr<MetricEntity>& tablet_metric_entity)
    : env_(env),
      log_index_(index),
      log_prefix_(std::move(log_prefix)),
      state_(kLogReaderInitialized) {
  // The batch-read latency histogram is attached to the table-level entity.
  if (table_metric_entity) {
    read_batch_latency_ = METRIC_log_reader_read_batch_latency.Instantiate(table_metric_entity);
  }

  // The byte and entry counters are attached to the tablet-level entity.
  if (tablet_metric_entity) {
    bytes_read_ = METRIC_log_reader_bytes_read.Instantiate(tablet_metric_entity);
    entries_read_ = METRIC_log_reader_entries_read.Instantiate(tablet_metric_entity);
  }

  // The recording sequences are only allocated when op-index retention is enabled and at least
  // one of the test recording flags is set at construction time.
  const bool record_policy_violations =
      FLAGS_enable_log_retention_by_op_idx &&
      (FLAGS_TEST_record_segments_violate_max_time_policy ||
       FLAGS_TEST_record_segments_violate_min_space_policy);
  if (PREDICT_FALSE(record_policy_violations)) {
    segments_violate_max_time_policy_ = std::make_unique<SegmentSequence>();
    segments_violate_min_space_policy_ = std::make_unique<SegmentSequence>();
  }
}
149
150
48.5k
// No explicit teardown is needed; members release their own resources.
LogReader::~LogReader() = default;
152
153
89.6k
// Populates the reader from the WAL directory 'tablet_wal_path'.
//
// Every directory entry whose name passes IsLogFileName() is opened as a ReadableLogSegment.
// Segments that come back null (no header) are skipped, and segments without a footer get one
// rebuilt by scanning. The segments are then sorted by sequence number, verified to be
// consecutive, appended to the reader, and the state moves to kLogReaderReading.
//
// Returns IllegalState if the directory does not exist, Corruption if the sequence numbers are
// not consecutive, or the first error reported while listing, opening or scanning segments.
// Must be called while the reader is still in the kLogReaderInitialized state.
Status LogReader::Init(const string& tablet_wal_path) {
  {
    std::lock_guard<simple_spinlock> lock(lock_);
    CHECK_EQ(state_, kLogReaderInitialized) << "bad state for Init(): " << state_;
  }
  VLOG_WITH_PREFIX(1) << "Reading wal from path:" << tablet_wal_path;

  if (!env_->FileExists(tablet_wal_path)) {
    return STATUS(IllegalState, "Cannot find wal location at", tablet_wal_path);
  }

  VLOG_WITH_PREFIX(1) << "Parsing segments from path: " << tablet_wal_path;

  std::vector<string> files_from_log_directory;
  RETURN_NOT_OK_PREPEND(env_->GetChildren(tablet_wal_path, &files_from_log_directory),
                        "Unable to read children from path");

  SegmentSequence read_segments;

  // Build a log segment from log files, ignoring non log files.
  for (const string& potential_log_file : files_from_log_directory) {
    if (!IsLogFileName(potential_log_file)) {
      continue;
    }

    string fqp = JoinPathSegments(tablet_wal_path, potential_log_file);
    auto segment = VERIFY_RESULT_PREPEND(ReadableLogSegment::Open(env_, fqp),
                                         Format("Unable to open readable log segment: $0", fqp));
    if (!segment) {
      LOG_WITH_PREFIX(INFO) << "Log segment w/o header: " << fqp << ", skipping";
      continue;
    }
    CHECK(segment->IsInitialized()) << "Uninitialized segment at: " << segment->path();

    if (!segment->HasFooter()) {
      LOG_WITH_PREFIX(WARNING)
          << "Log segment " << fqp << " was likely left in-progress "
             "after a previous crash. Will try to rebuild footer by scanning data.";
      RETURN_NOT_OK(segment->RebuildFooterByScanning());
    }

    read_segments.push_back(segment);
  }

  // Sort the segments by sequence number.
  std::sort(read_segments.begin(), read_segments.end(), LogSegmentSeqnoComparator());

  {
    std::lock_guard<simple_spinlock> lock(lock_);

    string previous_seg_path;
    int64_t previous_seg_seqno = -1;  // -1 means "no segment seen yet".
    for (const SegmentSequence::value_type& entry : read_segments) {
      VLOG_WITH_PREFIX(1) << " Log Reader Indexed: " << entry->footer().ShortDebugString();
      // Check that the log segments are in sequence.
      if (previous_seg_seqno != -1 &&
          entry->header().sequence_number() != implicit_cast<size_t>(previous_seg_seqno) + 1) {
        return STATUS(Corruption, Substitute("Segment sequence numbers are not consecutive. "
            "Previous segment: seqno $0, path $1; Current segment: seqno $2, path $3",
            previous_seg_seqno, previous_seg_path,
            entry->header().sequence_number(), entry->path()));
      }
      previous_seg_seqno = entry->header().sequence_number();
      previous_seg_path = entry->path();
      RETURN_NOT_OK(AppendSegmentUnlocked(entry));
    }

    state_ = kLogReaderReading;
  }
  return Status::OK();
}
226
227
1
// Test helper: switches the reader straight to kLogReaderReading without loading any segments.
// Always returns OK.
Status LogReader::InitEmptyReaderForTests() {
  {
    std::lock_guard<simple_spinlock> guard(lock_);
    state_ = kLogReaderReading;
  }
  return Status::OK();
}
232
233
0
bool LogReader::ViolatesMaxTimePolicy(const scoped_refptr<ReadableLogSegment>& segment) const {
234
0
  if (FLAGS_log_max_seconds_to_retain <= 0) {
235
0
    return false;
236
0
  }
237
238
0
  if (!segment->HasFooter()) {
239
0
    return false;
240
0
  }
241
242
0
  int64_t now = GetCurrentTimeMicros();
243
0
  int64_t age_seconds = (now - segment->footer().close_timestamp_micros()) / 1000000;
244
0
  if (age_seconds > FLAGS_log_max_seconds_to_retain) {
245
0
    YB_LOG_EVERY_N_SECS(WARNING, 300)
246
0
        << "Segment " << segment->path() << " violates max retention time policy. "
247
0
        << "Segment age: " << age_seconds << " seconds. "
248
0
        << "log_max_seconds_to_retain: " << FLAGS_log_max_seconds_to_retain;
249
0
    if (PREDICT_FALSE(FLAGS_TEST_record_segments_violate_max_time_policy)) {
250
0
      segments_violate_max_time_policy_->push_back(segment);
251
0
    }
252
0
    return true;
253
0
  }
254
0
  return false;
255
0
}
256
257
bool LogReader::ViolatesMinSpacePolicy(const scoped_refptr<ReadableLogSegment>& segment,
258
0
                                       int64_t *potential_reclaimed_space) const {
259
0
  if (FLAGS_log_stop_retaining_min_disk_mb <= 0) {
260
0
    return false;
261
0
  }
262
0
  auto free_space_result = env_->GetFreeSpaceBytes(segment->path());
263
0
  if (!free_space_result.ok()) {
264
0
    YB_LOG_EVERY_N_SECS(WARNING, 300) << "Unable to get free space: " << free_space_result;
265
0
    return false;
266
0
  } else {
267
0
    uint64_t free_space = *free_space_result;
268
0
    if (free_space + *potential_reclaimed_space <
269
0
            implicit_cast<size_t>(FLAGS_log_stop_retaining_min_disk_mb) * 1024) {
270
0
      YB_LOG_EVERY_N_SECS(WARNING, 300)
271
0
          << "Segment " << segment->path() << " violates minimum free space policy "
272
0
          << "specified by log_stop_retaining_min_disk_mb: "
273
0
          << FLAGS_log_stop_retaining_min_disk_mb;
274
0
      *potential_reclaimed_space += segment->file_size();
275
0
      if (PREDICT_FALSE(FLAGS_TEST_record_segments_violate_min_space_policy)) {
276
0
        segments_violate_min_space_policy_->push_back(segment);
277
0
      }
278
0
      return true;
279
0
    }
280
0
  }
281
0
  return false;
282
0
}
283
284
5
// Convenience overload: forwards to the three-argument version, passing 'index' as the CDC
// replicated index as well.
Status LogReader::GetSegmentPrefixNotIncluding(int64_t index, SegmentSequence* segments) const {
  const int64_t cdc_max_replicated_index = index;
  return GetSegmentPrefixNotIncluding(index, cdc_max_replicated_index, segments);
}
287
288
// Fills '*segments' (cleared first) with the leading run of segments whose footer
// max_replicate_index is below 'index'. The walk stops at the first segment that
// - has no footer, or
// - has max_replicate_index >= index, or
// - (with FLAGS_enable_log_retention_by_op_idx) has max_replicate_index >=
//   'cdc_max_replicated_index' and violates neither the max-time nor the min-space policy.
// Always returns OK. Requires the reader to be in the kLogReaderReading state.
Status LogReader::GetSegmentPrefixNotIncluding(int64_t index, int64_t cdc_max_replicated_index,
                                               SegmentSequence* segments) const {
  DCHECK_GE(index, 0);
  DCHECK(segments);
  segments->clear();

  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK_EQ(state_, kLogReaderReading);

  // Running total handed to the min-space policy check, which adds to it for each segment it
  // flags.
  int64_t space_reclaimed_so_far = 0;
  for (const auto& candidate : segments_) {
    // The last segment doesn't have a footer. Never include that one.
    if (!candidate->HasFooter()) {
      break;
    }

    // Never garbage collect log segments with unflushed entries.
    const auto max_replicate_index = candidate->footer().max_replicate_index();
    if (max_replicate_index >= index) {
      break;
    }

    // A segment holding cdc unreplicated entries is kept unless the file is too old
    // (FLAGS_log_max_seconds_to_retain) or there is not enough space for the logs
    // (FLAGS_log_stop_retaining_min_disk_mb).
    const bool holds_cdc_unreplicated_entries =
        FLAGS_enable_log_retention_by_op_idx && max_replicate_index >= cdc_max_replicated_index;
    if (holds_cdc_unreplicated_entries) {
      const bool violates_a_policy =
          ViolatesMaxTimePolicy(candidate) ||
          ViolatesMinSpacePolicy(candidate, &space_reclaimed_so_far);
      if (!violates_a_policy) {
        // This segment and all subsequent ones contain cdc unreplicated entries, so stop here.
        break;
      }
    }

    // TODO: tests for edge cases here with backwards ordered replicates.
    segments->push_back(candidate);
  }

  return Status::OK();
}
331
332
354
int64_t LogReader::GetMinReplicateIndex() const {
333
354
  std::lock_guard<simple_spinlock> lock(lock_);
334
354
  int64_t min_remaining_op_idx = -1;
335
336
442
  for (const scoped_refptr<ReadableLogSegment>& segment : segments_) {
337
442
    if (!segment->HasFooter()) continue;
338
90
    if (!segment->footer().has_min_replicate_index()) continue;
339
90
    if (min_remaining_op_idx == -1 ||
340
50
        segment->footer().min_replicate_index() < min_remaining_op_idx) {
341
50
      min_remaining_op_idx = segment->footer().min_replicate_index();
342
50
    }
343
90
  }
344
354
  return min_remaining_op_idx;
345
354
}
346
347
2.53M
// Returns the segment whose header sequence number is 'seq', or nullptr if the reader holds no
// segments or 'seq' falls outside the range currently held.
scoped_refptr<ReadableLogSegment> LogReader::GetSegmentBySequenceNumber(int64_t seq) const {
  std::lock_guard<simple_spinlock> guard(lock_);

  if (!segments_.empty()) {
    // We always have a contiguous set of log segments, so the requested segment sits at a fixed
    // offset from the first element.
    const int64_t first_seqno = segments_.front()->header().sequence_number();
    // The offset is unsigned on purpose: a 'seq' below 'first_seqno' wraps around to a huge
    // value and is rejected by the bounds check below.
    const size_t offset = seq - first_seqno;
    if (offset < segments_.size()) {
      DCHECK_EQ(segments_[offset]->header().sequence_number(), seq);
      return segments_[offset];
    }
  }

  return nullptr;
}
364
365
// Reads the entry batch that 'index_entry' points at into '*batch', using '*tmp_buf' as scratch
// space. Returns NotFound when the referenced segment is no longer held by the reader, or the
// (prefixed) read error. On success the byte and entry counters are bumped if they exist.
Status LogReader::ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                           faststring* tmp_buf,
                                           LogEntryBatchPB* batch) const {
  const int64_t op_index = index_entry.op_id.index;

  scoped_refptr<ReadableLogSegment> source_segment =
      GetSegmentBySequenceNumber(index_entry.segment_sequence_number);
  if (PREDICT_FALSE(!source_segment)) {
    return STATUS(NotFound, Substitute("Segment $0 which contained index $1 has been GCed",
                                       index_entry.segment_sequence_number,
                                       op_index));
  }

  CHECK_GT(index_entry.offset_in_segment, 0);
  // Working copy of the offset: the read call takes it by pointer.
  int64_t read_offset = index_entry.offset_in_segment;
  // Times everything from here to the end of the function.
  ScopedLatencyMetric latency_timer(read_batch_latency_.get());
  RETURN_NOT_OK_PREPEND(source_segment->ReadEntryHeaderAndBatch(&read_offset, tmp_buf, batch),
                        Substitute("Failed to read LogEntry for index $0 from log segment "
                                   "$1 offset $2",
                                   op_index,
                                   index_entry.segment_sequence_number,
                                   index_entry.offset_in_segment));

  // Both counters are instantiated together in the constructor, so checking one is enough.
  if (bytes_read_) {
    bytes_read_->IncrementBy(kEntryHeaderSize + tmp_buf->length());
    entries_read_->IncrementBy(batch->entry_size());
  }

  return Status::OK();
}
395
396
// Collects the REPLICATE messages for op indexes [starting_at, up_to] into '*replicates'.
//
// The loop ends early, still returning OK with whatever was gathered, when the deadline passes
// (only honored if FLAGS_get_changes_honor_deadline is set) or when adding the next message
// would reach 'max_bytes_to_read'. The first message is always accepted, and a limit <= 0
// disables the size check.
//
// Optional outputs:
//  - '*starting_op_segment_seq_num' receives the segment sequence number of 'starting_at'.
//  - '*modified_schema' / '*modified_schema_version' are filled from each collected
//    CHANGE_METADATA_OP message, and only when both pointers are non-null.
//
// Index lookup and batch read failures are returned to the caller; '*replicates' is only
// modified on success.
Status LogReader::ReadReplicatesInRange(
    const int64_t starting_at,
    const int64_t up_to,
    int64_t max_bytes_to_read,
    ReplicateMsgs* replicates,
    int64_t* starting_op_segment_seq_num,
    yb::SchemaPB* modified_schema,
    uint32_t* modified_schema_version,
    CoarseTimePoint deadline) const {
  DCHECK_GT(starting_at, 0);
  DCHECK_GE(up_to, starting_at);
  DCHECK(log_index_) << "Require an index to random-read logs";

  ReplicateMsgs collected;
  LogIndexEntry last_index_entry;
  last_index_entry.segment_sequence_number = -1;
  last_index_entry.offset_in_segment = -1;

  // Remove the deadline if the GetChanges deadline feature is disabled.
  if (!ANNOTATE_UNPROTECTED_READ(FLAGS_get_changes_honor_deadline)) {
    deadline = CoarseTimePoint::max();
  }
  const bool has_deadline = deadline != CoarseTimePoint::max();

  int64_t bytes_collected = 0;
  bool limit_exceeded = false;
  faststring scratch;
  LogEntryBatchPB batch;
  for (int64_t index = starting_at; index <= up_to && !limit_exceeded; index++) {
    // Stop reading if a deadline was specified and the deadline has been exceeded.
    if (has_deadline && CoarseMonoClock::Now() >= deadline) {
      break;
    }

    if (PREDICT_FALSE(FLAGS_TEST_get_changes_read_loop_delay_ms > 0)) {
      SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_get_changes_read_loop_delay_ms));
    }

    LogIndexEntry index_entry;
    RETURN_NOT_OK_PREPEND(log_index_->GetEntry(index, &index_entry),
                          Substitute("Failed to read log index for op $0", index));

    const bool is_first_op = (index == starting_at);
    if (is_first_op && starting_op_segment_seq_num != nullptr) {
      *starting_op_segment_seq_num = index_entry.segment_sequence_number;
    }

    // A LogEntryBatch may hold several REPLICATE messages, so consecutive index entries often
    // point at the batch that is already loaded. Only hit the segment when the location changed.
    const bool batch_already_loaded =
        !is_first_op &&
        index_entry.segment_sequence_number == last_index_entry.segment_sequence_number &&
        index_entry.offset_in_segment == last_index_entry.offset_in_segment;
    if (!batch_already_loaded) {
      RETURN_NOT_OK(ReadBatchUsingIndexEntry(index_entry, &scratch, &batch));

      // Sanity-check the property that a batch should only have increasing indexes.
      int64_t highest_seen = 0;
      for (int pos = 0; pos < batch.entry_size(); ++pos) {
        LogEntryPB* candidate = batch.mutable_entry(pos);
        if (!candidate->has_replicate()) continue;
        const int64_t candidate_index = candidate->replicate().id().index();
        CHECK_GT(candidate_index, highest_seen)
          << "Expected that an entry batch should only include increasing log indexes: "
          << index_entry.ToString()
          << "\nBatch: " << batch.DebugString();
        highest_seen = candidate_index;
      }
    }

    // Locate the message for 'index' inside the loaded batch.
    bool found = false;
    for (int pos = 0; pos < batch.entry_size(); ++pos) {
      LogEntryPB* candidate = batch.mutable_entry(pos);
      if (!candidate->has_replicate() || candidate->replicate().id().index() != index) {
        continue;
      }

      const int64_t space_required = candidate->replicate().SpaceUsed();
      const bool fits = collected.empty() ||
                        max_bytes_to_read <= 0 ||
                        bytes_collected + space_required < max_bytes_to_read;
      if (fits) {
        bytes_collected += space_required;
        // Ownership of the message moves from the batch into the result list.
        collected.emplace_back(candidate->release_replicate());
        const auto& appended = *collected.back();
        if (appended.op_type() == consensus::OperationType::CHANGE_METADATA_OP &&
            modified_schema != nullptr && modified_schema_version != nullptr) {
          modified_schema->CopyFrom(appended.change_metadata_request().schema());
          *modified_schema_version = appended.change_metadata_request().schema_version();
        }
      } else {
        limit_exceeded = true;
      }
      found = true;
      break;
    }
    CHECK(found) << "Incorrect index entry didn't yield expected log entry: "
                 << index_entry.ToString();

    last_index_entry = index_entry;
  }

  replicates->swap(collected);
  return Status::OK();
}
501
502
14.7M
// Looks 'op_index' up in the log index and returns the op id stored in its entry, or the
// (prefixed) lookup error.
Result<yb::OpId> LogReader::LookupOpId(int64_t op_index) const {
  LogIndexEntry entry;
  RETURN_NOT_OK_PREPEND(
      log_index_->GetEntry(op_index, &entry),
      strings::Substitute("Failed to read log index for op $0", op_index));
  return entry.op_id;
}
508
509
4.21M
// Returns the sequence number of the segment that the log index records for 'op_index'.
// Returns -1 when the index has no entry for 'op_index' (NotFound). Any other lookup failure is
// returned to the caller rather than being ignored, because 'index_entry' may not have been
// populated in that case.
Result<int64_t> LogReader::LookupHeader(int64_t op_index) const {
  LogIndexEntry index_entry;
  Status st = log_index_->GetEntry(op_index, &index_entry);
  if (st.IsNotFound()) {
    return -1;
  }
  RETURN_NOT_OK(st);
  return index_entry.segment_sequence_number;
}
517
518
308k
// Replaces the contents of '*segments' with a copy of the reader's current segment list, taken
// under the lock. Always returns OK. Requires the kLogReaderReading state.
Status LogReader::GetSegmentsSnapshot(SegmentSequence* segments) const {
  {
    std::lock_guard<simple_spinlock> guard(lock_);
    CHECK_EQ(state_, kLogReaderReading);
    segments->assign(segments_.cbegin(), segments_.cend());
  }
  return Status::OK();
}
524
525
41
// Drops, from the front of the segment list, every segment whose header sequence number is
// <= 'segment_sequence_number', stopping at the first segment with a larger number. The removed
// sequence numbers are logged. Always returns OK. Requires the kLogReaderReading state.
Status LogReader::TrimSegmentsUpToAndIncluding(uint64_t segment_sequence_number) {
  std::lock_guard<simple_spinlock> lock(lock_);
  CHECK_EQ(state_, kLogReaderReading);

  // Find the first segment that must be kept.
  const auto first_kept = std::find_if(
      segments_.begin(), segments_.end(),
      [segment_sequence_number](const scoped_refptr<ReadableLogSegment>& segment) {
        return segment->header().sequence_number() > segment_sequence_number;
      });

  std::vector<int64_t> deleted_segments;
  deleted_segments.reserve(first_kept - segments_.begin());
  for (auto iter = segments_.begin(); iter != first_kept; ++iter) {
    deleted_segments.push_back((*iter)->header().sequence_number());
  }

  // Erase the whole prefix at once so the remaining elements are shifted only one time, instead
  // of once per removed segment.
  segments_.erase(segments_.begin(), first_kept);

  LOG_WITH_PREFIX(INFO) << "Removed log segment sequence numbers from log reader: "
                        << yb::ToString(deleted_segments);
  return Status::OK();
}
543
544
13.9M
void LogReader::UpdateLastSegmentOffset(int64_t readable_to_offset) {
545
13.9M
  std::lock_guard<simple_spinlock> lock(lock_);
546
13.9M
  CHECK_EQ(state_, kLogReaderReading);
547
13.9M
  DCHECK(!segments_.empty());
548
  // Get the last segment
549
13.9M
  ReadableLogSegment* segment = segments_.back().get();
550
13.9M
  DCHECK(!segment->HasFooter());
551
13.9M
  segment->UpdateReadableToOffset(readable_to_offset);
552
13.9M
}
553
554
55.1k
// Swaps the last segment in the list for 'segment', which must carry the same sequence number.
// Always returns OK. Requires the kLogReaderReading state and a non-empty segment list.
Status LogReader::ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& segment) {
  // The replacement is the properly closed version of the last segment, so it must have a footer.
  DCHECK(segment->HasFooter());

  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK_EQ(state_, kLogReaderReading);

  // Make sure the segment we're replacing has the same sequence number
  CHECK(!segments_.empty());
  CHECK_EQ(segment->header().sequence_number(), segments_.back()->header().sequence_number());
  segments_.back() = segment;

  return Status::OK();
}
568
569
3
// Appends an initialized segment to the reader. A segment without a footer first gets one
// rebuilt by scanning (done before the lock is taken); a failure there is returned and nothing
// is appended.
Status LogReader::AppendSegment(const scoped_refptr<ReadableLogSegment>& segment) {
  DCHECK(segment->IsInitialized());

  const bool footer_missing = !segment->HasFooter();
  if (PREDICT_FALSE(footer_missing)) {
    RETURN_NOT_OK(segment->RebuildFooterByScanning());
  }

  std::lock_guard<simple_spinlock> guard(lock_);
  return AppendSegmentUnlocked(segment);
}
577
578
9.20k
// Appends 'segment' (initialized, with footer) to the end of the list without taking the lock.
// When the list is non-empty, the new sequence number must directly follow the last one.
// Always returns OK.
Status LogReader::AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>& segment) {
  DCHECK(segment->IsInitialized());
  DCHECK(segment->HasFooter());

  const bool has_predecessor = !segments_.empty();
  if (has_predecessor) {
    CHECK_EQ(segments_.back()->header().sequence_number() + 1,
             segment->header().sequence_number());
  }

  segments_.push_back(segment);
  return Status::OK();
}
589
590
97.0k
// Appends an initialized segment to the end of the list; unlike AppendSegment() no footer is
// required or rebuilt. When the list is non-empty, the new sequence number must directly follow
// the last one. Always returns OK. Requires the kLogReaderReading state.
Status LogReader::AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& segment) {
  DCHECK(segment->IsInitialized());

  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK_EQ(state_, kLogReaderReading);

  const bool has_predecessor = !segments_.empty();
  if (has_predecessor) {
    CHECK_EQ(segments_.back()->header().sequence_number() + 1,
             segment->header().sequence_number());
  }

  segments_.push_back(segment);
  return Status::OK();
}
601
602
6.70M
size_t LogReader::num_segments() const {
603
6.70M
  std::lock_guard<simple_spinlock> lock(lock_);
604
6.70M
  return segments_.size();
605
6.70M
}
606
607
0
// Builds a multi-line description of the segment list: one line per segment with its sequence
// number and its footer ("NONE" when the segment has no footer).
string LogReader::ToString() const {
  std::lock_guard<simple_spinlock> guard(lock_);

  string description = "Reader's SegmentSequence: \n";
  for (const auto& segment : segments_) {
    const string footer_text =
        segment->HasFooter() ? segment->footer().ShortDebugString() : "NONE";
    description += Substitute("Segment: $0 Footer: $1\n",
                              segment->header().sequence_number(),
                              footer_text);
  }
  return description;
}
617
618
}  // namespace log
619
}  // namespace yb