YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/transaction_log_impl.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
#ifndef __STDC_FORMAT_MACROS
23
#define __STDC_FORMAT_MACROS
24
#endif
25
26
#include <inttypes.h>
27
#include "yb/rocksdb/db/transaction_log_impl.h"
28
#include "yb/rocksdb/db/write_batch_internal.h"
29
#include "yb/rocksdb/util/file_reader_writer.h"
30
31
namespace rocksdb {
32
33
// Builds an iterator over the write-ahead log files in `files`.
//
// dir          - directory used to build log file names when opening them.
// options      - DB options; its env and info_log are handed to reporter_.
//                Must not be null (dereferenced below).
// read_options - iterator read options (verify_checksums_ is used when a log
//                reader is created).
// soptions     - environment options used when opening log files.
// seq          - sequence number the iterator should start from.
// files        - log files to iterate over; must not be null.
// versions     - source of the last known sequence number; must not be null.
//
// The constructor immediately tries to position the iterator at `seq` by
// calling SeekToStartSequence().
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
    const std::string& dir, const DBOptions* options,
    const TransactionLogIterator::ReadOptions& read_options,
    const EnvOptions& soptions, const SequenceNumber seq,
    std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions)
    : dir_(dir),
      options_(options),
      read_options_(read_options),
      soptions_(soptions),
      startingSequenceNumber_(seq),
      files_(std::move(files)),
      started_(false),
      isValid_(false),
      currentFileIndex_(0),
      currentBatchSeq_(0),
      currentLastSeq_(0),
      versions_(versions) {
  // options_ is dereferenced just below, so check it like the other pointers.
  assert(options_ != nullptr);
  assert(files_ != nullptr);
  assert(versions_ != nullptr);

  reporter_.env = options_->env;
  reporter_.info_log = options_->info_log.get();
  SeekToStartSequence();  // Seek till starting sequence
}
57
58
// Opens `logFile` for sequential reading and, on success, stores a new
// SequentialFileReader in `*file_reader`.
//
// An archived log file is opened from its archive name only. Any other log
// file is first opened by its live name in dir_; if that fails, the archive
// name is tried, as the file could have been moved in the meanwhile.
//
// Returns the status of the last open attempt; `*file_reader` is left
// untouched when that status is not OK.
Status TransactionLogIteratorImpl::OpenLogFile(
    const LogFile* logFile, unique_ptr<SequentialFileReader>* file_reader) {
  Env* const env = options_->env;
  const bool is_archived = logFile->Type() == kArchivedLogFile;
  const auto log_number = logFile->LogNumber();

  unique_ptr<SequentialFile> sequential_file;
  Status open_status = env->NewSequentialFile(
      is_archived ? ArchivedLogFileName(dir_, log_number)
                  : LogFileName(dir_, log_number),
      &sequential_file, soptions_);

  if (!is_archived && !open_status.ok()) {
    // Could not open the file in the DB directory; fall back to the archive
    // directory.
    open_status = env->NewSequentialFile(
        ArchivedLogFileName(dir_, log_number), &sequential_file, soptions_);
  }

  if (!open_status.ok()) {
    return open_status;
  }
  file_reader->reset(new SequentialFileReader(std::move(sequential_file)));
  return open_status;
}
81
82
2.69k
// Returns the batch the iterator is currently positioned on, together with
// its starting sequence number. Ownership of the write batch is moved out of
// the iterator into the result. Must only be called while isValid_ is set.
BatchResult TransactionLogIteratorImpl::GetBatch()  {
  assert(isValid_);  //  cannot call in a non valid state.
  BatchResult batch_result;
  batch_result.writeBatchPtr = std::move(currentBatch_);
  batch_result.sequence = currentBatchSeq_;
  return batch_result;
}
89
90
2.75k
// Returns a copy of the status recorded by the most recent seek / advance.
Status TransactionLogIteratorImpl::status() { return currentStatus_; }
93
94
2.79k
bool TransactionLogIteratorImpl::Valid() {
95
2.79k
  return started_ && isValid_;
96
2.79k
}
97
98
bool TransactionLogIteratorImpl::RestrictedRead(
99
    Slice* record,
100
7.95k
    std::string* scratch) {
101
  // Don't read if no more complete entries to read from logs
102
7.95k
  if (currentLastSeq_ >= versions_->LastSequence()) {
103
43
    return false;
104
43
  }
105
7.91k
  return currentLogReader_->ReadRecord(record, scratch);
106
7.91k
}
107
108
void TransactionLogIteratorImpl::SeekToStartSequence(
109
    uint64_t startFileIndex,
110
55
    bool strict) {
111
55
  std::string scratch;
112
55
  Slice record;
113
55
  started_ = false;
114
55
  isValid_ = false;
115
55
  if (files_->size() <= startFileIndex) {
116
1
    return;
117
1
  }
118
54
  Status s = OpenLogReader(files_->at(startFileIndex).get());
119
54
  if (!s.ok()) {
120
0
    currentStatus_ = s;
121
0
    reporter_.Info(currentStatus_.ToString().c_str());
122
0
    return;
123
0
  }
124
5.18k
  while (RestrictedRead(&record, &scratch)) {
125
5.17k
    if (record.size() < 12) {
126
0
      reporter_.Corruption(
127
0
        record.size(), STATUS(Corruption, "very small log record"));
128
0
      continue;
129
0
    }
130
5.17k
    UpdateCurrentWriteBatch(record);
131
5.17k
    if (currentLastSeq_ >= startingSequenceNumber_) {
132
44
      if (strict && currentBatchSeq_ != startingSequenceNumber_) {
133
0
        currentStatus_ = STATUS(Corruption, "Gap in sequence number. Could not "
134
0
                                            "seek to required sequence number");
135
0
        reporter_.Info(currentStatus_.ToString().c_str());
136
0
        return;
137
44
      } else if (strict) {
138
0
        reporter_.Info("Could seek required sequence number. Iterator will "
139
0
                       "continue.");
140
0
      }
141
44
      isValid_ = true;
142
44
      started_ = true; // set started_ as we could seek till starting sequence
143
44
      return;
144
5.13k
    } else {
145
5.13k
      isValid_ = false;
146
5.13k
    }
147
5.17k
  }
148
149
  // Could not find start sequence in first file. Normally this must be the
150
  // only file. Otherwise log the error and let the iterator return next entry
151
  // If strict is set, we want to seek exactly till the start sequence and it
152
  // should have been present in the file we scanned above
153
10
  if (strict) {
154
5
    currentStatus_ = STATUS(Corruption, "Gap in sequence number. Could not "
155
5
                                        "seek to required sequence number");
156
5
    reporter_.Info(currentStatus_.ToString().c_str());
157
5
  } else if (files_->size() != 1) {
158
5
    currentStatus_ = STATUS(Corruption, "Start sequence was not found, "
159
5
                                        "skipping to the next available");
160
5
    reporter_.Info(currentStatus_.ToString().c_str());
161
    // Let NextImpl find the next available entry. started_ remains false
162
    // because we don't want to check for gaps while moving to start sequence
163
5
    NextImpl(true);
164
5
  }
165
10
}
166
167
2.70k
void TransactionLogIteratorImpl::Next() {
168
2.70k
  return NextImpl(false);
169
2.70k
}
170
171
2.70k
void TransactionLogIteratorImpl::NextImpl(bool internal) {
172
2.70k
  std::string scratch;
173
2.70k
  Slice record;
174
2.70k
  isValid_ = false;
175
2.70k
  if (!internal && !started_) {
176
    // Runs every time until we can seek to the start sequence
177
0
    return SeekToStartSequence();
178
0
  }
179
2.76k
  while(true) {
180
2.76k
    assert(currentLogReader_);
181
2.76k
    if (currentLogReader_->IsEOF()) {
182
2.70k
      currentLogReader_->UnmarkEOF();
183
2.70k
    }
184
2.76k
    while (RestrictedRead(&record, &scratch)) {
185
2.66k
      if (record.size() < 12) {
186
0
        reporter_.Corruption(
187
0
          record.size(), STATUS(Corruption, "very small log record"));
188
0
        continue;
189
2.66k
      } else {
190
        // started_ should be true if called by application
191
2.66k
        assert(internal || started_);
192
        // started_ should be false if called internally
193
2.66k
        assert(!internal || !started_);
194
2.66k
        UpdateCurrentWriteBatch(record);
195
2.66k
        if (internal && !started_) {
196
5
          started_ = true;
197
5
        }
198
2.66k
        return;
199
2.66k
      }
200
2.66k
    }
201
202
    // Open the next file
203
106
    if (currentFileIndex_ < files_->size() - 1) {
204
63
      ++currentFileIndex_;
205
63
      Status s = OpenLogReader(files_->at(currentFileIndex_).get());
206
63
      if (!s.ok()) {
207
0
        isValid_ = false;
208
0
        currentStatus_ = s;
209
0
        return;
210
0
      }
211
43
    } else {
212
43
      isValid_ = false;
213
43
      if (currentLastSeq_ == versions_->LastSequence()) {
214
43
        currentStatus_ = Status::OK();
215
0
      } else {
216
0
        currentStatus_ = STATUS(Corruption, "NO MORE DATA LEFT");
217
0
      }
218
43
      return;
219
43
    }
220
106
  }
221
2.70k
}
222
223
bool TransactionLogIteratorImpl::IsBatchExpected(
224
    const WriteBatch* batch,
225
2.65k
    const SequenceNumber expectedSeq) {
226
2.65k
  assert(batch);
227
2.65k
  SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
228
2.65k
  if (batchSeq != expectedSeq) {
229
5
    char buf[200];
230
5
    snprintf(buf, sizeof(buf),
231
5
             "Discontinuity in log records. Got seq=%" PRIu64
232
5
             ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
233
5
             ".Log iterator will reseek the correct batch.",
234
5
             batchSeq, expectedSeq, versions_->LastSequence());
235
5
    reporter_.Info(buf);
236
5
    return false;
237
5
  }
238
2.65k
  return true;
239
2.65k
}
240
241
7.83k
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
242
7.83k
  std::unique_ptr<WriteBatch> batch(new WriteBatch());
243
7.83k
  WriteBatchInternal::SetContents(batch.get(), record);
244
245
7.83k
  SequenceNumber expectedSeq = currentLastSeq_ + 1;
246
  // If the iterator has started, then confirm that we get continuous batches
247
7.83k
  if (started_ && !IsBatchExpected(batch.get(), expectedSeq)) {
248
    // Seek to the batch having expected sequence number
249
5
    if (expectedSeq < files_->at(currentFileIndex_)->StartSequence()) {
250
      // Expected batch must lie in the previous log file
251
      // Avoid underflow.
252
5
      if (currentFileIndex_ != 0) {
253
5
        currentFileIndex_--;
254
5
      }
255
5
    }
256
5
    startingSequenceNumber_ = expectedSeq;
257
    // currentStatus_ will be set to Ok if reseek succeeds
258
5
    currentStatus_ = STATUS(NotFound, "Gap in sequence numbers");
259
5
    return SeekToStartSequence(currentFileIndex_, true);
260
5
  }
261
262
7.83k
  currentBatchSeq_ = WriteBatchInternal::Sequence(batch.get());
263
7.83k
  currentLastSeq_ = currentBatchSeq_ +
264
7.83k
                    WriteBatchInternal::Count(batch.get()) - 1;
265
  // currentBatchSeq_ can only change here
266
7.83k
  assert(currentLastSeq_ <= versions_->LastSequence());
267
268
7.83k
  currentBatch_ = move(batch);
269
7.83k
  isValid_ = true;
270
7.83k
  currentStatus_ = Status::OK();
271
7.83k
}
272
273
117
// Opens `logFile` and replaces currentLogReader_ with a reader over it.
//
// Returns the failure status from OpenLogFile() unchanged (leaving the
// current reader untouched), or OK once the new reader is installed. The
// reader reports through reporter_ and verifies checksums according to
// read_options_.verify_checksums_.
Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
  unique_ptr<SequentialFileReader> reader_file;
  Status open_status = OpenLogFile(logFile, &reader_file);
  if (open_status.ok()) {
    assert(reader_file);
    currentLogReader_.reset(new log::Reader(options_->info_log,
                                            std::move(reader_file),
                                            &reporter_,
                                            read_options_.verify_checksums_,
                                            0,
                                            logFile->LogNumber()));
    return Status::OK();
  }
  return open_status;
}
288
}  //  namespace rocksdb
289
#endif  // ROCKSDB_LITE