/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/transaction_log_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #ifndef ROCKSDB_LITE |
22 | | #ifndef __STDC_FORMAT_MACROS |
23 | | #define __STDC_FORMAT_MACROS |
24 | | #endif |
25 | | |
26 | | #include <inttypes.h> |
27 | | #include "yb/rocksdb/db/transaction_log_impl.h" |
28 | | #include "yb/rocksdb/db/write_batch_internal.h" |
29 | | #include "yb/rocksdb/util/file_reader_writer.h" |
30 | | |
31 | | namespace rocksdb { |
32 | | |
// Constructs a WAL (write-ahead-log) iterator over the given set of log
// files, then immediately seeks to the first batch whose sequence range
// covers `seq`.
//
// dir      - directory containing the live/archived WAL files.
// options  - DB options; supplies env and info_log for the corruption
//            reporter (must outlive the iterator).
// read_options - iterator read options (e.g. checksum verification).
// soptions - env options used when opening the sequential log files.
// seq      - starting sequence number to seek to.
// files    - ordered list of candidate log files to iterate (owned).
// versions - used to bound reads at the last flushed sequence number
//            (must outlive the iterator).
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
    const std::string& dir, const DBOptions* options,
    const TransactionLogIterator::ReadOptions& read_options,
    const EnvOptions& soptions, const SequenceNumber seq,
    std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions)
    : dir_(dir),
      options_(options),
      read_options_(read_options),
      soptions_(soptions),
      startingSequenceNumber_(seq),
      files_(std::move(files)),
      started_(false),
      isValid_(false),
      currentFileIndex_(0),
      currentBatchSeq_(0),
      currentLastSeq_(0),
      versions_(versions) {
  assert(files_ != nullptr);
  assert(versions_ != nullptr);

  reporter_.env = options_->env;
  reporter_.info_log = options_->info_log.get();
  SeekToStartSequence();  // Seek till starting sequence
}
57 | | |
58 | | Status TransactionLogIteratorImpl::OpenLogFile( |
59 | 117 | const LogFile* logFile, unique_ptr<SequentialFileReader>* file_reader) { |
60 | 117 | Env* env = options_->env; |
61 | 117 | unique_ptr<SequentialFile> file; |
62 | 117 | Status s; |
63 | 117 | if (logFile->Type() == kArchivedLogFile) { |
64 | 75 | std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber()); |
65 | 75 | s = env->NewSequentialFile(fname, &file, soptions_); |
66 | 42 | } else { |
67 | 42 | std::string fname = LogFileName(dir_, logFile->LogNumber()); |
68 | 42 | s = env->NewSequentialFile(fname, &file, soptions_); |
69 | 42 | if (!s.ok()) { |
70 | | // If cannot open file in DB directory. |
71 | | // Try the archive dir, as it could have moved in the meanwhile. |
72 | 0 | fname = ArchivedLogFileName(dir_, logFile->LogNumber()); |
73 | 0 | s = env->NewSequentialFile(fname, &file, soptions_); |
74 | 0 | } |
75 | 42 | } |
76 | 117 | if (s.ok()) { |
77 | 117 | file_reader->reset(new SequentialFileReader(std::move(file))); |
78 | 117 | } |
79 | 117 | return s; |
80 | 117 | } |
81 | | |
82 | 2.69k | BatchResult TransactionLogIteratorImpl::GetBatch() { |
83 | 2.69k | assert(isValid_); // cannot call in a non valid state. |
84 | 2.69k | BatchResult result; |
85 | 2.69k | result.sequence = currentBatchSeq_; |
86 | 2.69k | result.writeBatchPtr = std::move(currentBatch_); |
87 | 2.69k | return result; |
88 | 2.69k | } |
89 | | |
// Returns the status of the most recent iterator operation (OK, or the
// corruption/not-found status recorded by the last read or seek).
Status TransactionLogIteratorImpl::status() {
  return currentStatus_;
}
93 | | |
94 | 2.79k | bool TransactionLogIteratorImpl::Valid() { |
95 | 2.79k | return started_ && isValid_; |
96 | 2.79k | } |
97 | | |
98 | | bool TransactionLogIteratorImpl::RestrictedRead( |
99 | | Slice* record, |
100 | 7.95k | std::string* scratch) { |
101 | | // Don't read if no more complete entries to read from logs |
102 | 7.95k | if (currentLastSeq_ >= versions_->LastSequence()) { |
103 | 43 | return false; |
104 | 43 | } |
105 | 7.91k | return currentLogReader_->ReadRecord(record, scratch); |
106 | 7.91k | } |
107 | | |
// Scans forward from files_[startFileIndex] until it finds the first batch
// whose last sequence number is >= startingSequenceNumber_.
//
// strict == false (initial seek): landing anywhere at-or-past the target is
//   acceptable; if the first file does not contain it and there are more
//   files, fall through to NextImpl to find the next available entry.
// strict == true (reseek after a detected gap): the batch found must start
//   exactly at startingSequenceNumber_, otherwise currentStatus_ is set to
//   Corruption.
//
// On success sets isValid_ and started_; on failure leaves them false and
// records the error in currentStatus_.
void TransactionLogIteratorImpl::SeekToStartSequence(
    uint64_t startFileIndex,
    bool strict) {
  std::string scratch;
  Slice record;
  started_ = false;
  isValid_ = false;
  if (files_->size() <= startFileIndex) {
    // Nothing to scan — requested start file does not exist.
    return;
  }
  Status s = OpenLogReader(files_->at(startFileIndex).get());
  if (!s.ok()) {
    currentStatus_ = s;
    reporter_.Info(currentStatus_.ToString().c_str());
    return;
  }
  while (RestrictedRead(&record, &scratch)) {
    // 12 bytes appears to be the minimum size of a valid WriteBatch
    // payload (sequence + count header) — records shorter than that are
    // reported as corruption and skipped. TODO confirm against
    // WriteBatchInternal.
    if (record.size() < 12) {
      reporter_.Corruption(
        record.size(), STATUS(Corruption, "very small log record"));
      continue;
    }
    UpdateCurrentWriteBatch(record);
    if (currentLastSeq_ >= startingSequenceNumber_) {
      // The target sequence falls inside (or before) this batch.
      if (strict && currentBatchSeq_ != startingSequenceNumber_) {
        // Reseek demanded an exact match but the batch starts elsewhere.
        currentStatus_ = STATUS(Corruption, "Gap in sequence number. Could not "
                                "seek to required sequence number");
        reporter_.Info(currentStatus_.ToString().c_str());
        return;
      } else if (strict) {
        reporter_.Info("Could seek required sequence number. Iterator will "
                       "continue.");
      }
      isValid_ = true;
      started_ = true;  // set started_ as we could seek till starting sequence
      return;
    } else {
      // Still before the target — keep scanning.
      isValid_ = false;
    }
  }

  // Could not find start sequence in first file. Normally this must be the
  // only file. Otherwise log the error and let the iterator return next entry
  // If strict is set, we want to seek exactly till the start sequence and it
  // should have been present in the file we scanned above
  if (strict) {
    currentStatus_ = STATUS(Corruption, "Gap in sequence number. Could not "
                            "seek to required sequence number");
    reporter_.Info(currentStatus_.ToString().c_str());
  } else if (files_->size() != 1) {
    currentStatus_ = STATUS(Corruption, "Start sequence was not found, "
                            "skipping to the next available");
    reporter_.Info(currentStatus_.ToString().c_str());
    // Let NextImpl find the next available entry. started_ remains false
    // because we don't want to check for gaps while moving to start sequence
    NextImpl(true);
  }
}
166 | | |
167 | 2.70k | void TransactionLogIteratorImpl::Next() { |
168 | 2.70k | return NextImpl(false); |
169 | 2.70k | } |
170 | | |
// Advances to the next write batch, crossing log-file boundaries as needed.
//
// internal == true means this call was made from SeekToStartSequence while
// still positioning the iterator (started_ is false and gap checking is
// suppressed); internal == false is an application-driven Next().
//
// Invariants asserted below: application calls only happen after started_,
// internal calls only happen before it.
void TransactionLogIteratorImpl::NextImpl(bool internal) {
  std::string scratch;
  Slice record;
  isValid_ = false;
  if (!internal && !started_) {
    // Runs every time until we can seek to the start sequence
    return SeekToStartSequence();
  }
  while(true) {
    assert(currentLogReader_);
    if (currentLogReader_->IsEOF()) {
      // The live log may have grown since we hit EOF; clear the EOF mark so
      // ReadRecord can pick up newly written data.
      currentLogReader_->UnmarkEOF();
    }
    while (RestrictedRead(&record, &scratch)) {
      // Records below 12 bytes cannot hold a decodable batch header —
      // presumably the WriteBatch sequence+count prefix; TODO confirm.
      if (record.size() < 12) {
        reporter_.Corruption(
          record.size(), STATUS(Corruption, "very small log record"));
        continue;
      } else {
        // started_ should be true if called by application
        assert(internal || started_);
        // started_ should be false if called internally
        assert(!internal || !started_);
        UpdateCurrentWriteBatch(record);
        if (internal && !started_) {
          // First batch found during an internal scan marks the iterator
          // as started.
          started_ = true;
        }
        return;
      }
    }

    // Open the next file
    if (currentFileIndex_ < files_->size() - 1) {
      ++currentFileIndex_;
      Status s = OpenLogReader(files_->at(currentFileIndex_).get());
      if (!s.ok()) {
        isValid_ = false;
        currentStatus_ = s;
        return;
      }
    } else {
      // No more files. If we consumed everything up to the DB's last
      // sequence this is a clean end; otherwise data is missing.
      isValid_ = false;
      if (currentLastSeq_ == versions_->LastSequence()) {
        currentStatus_ = Status::OK();
      } else {
        currentStatus_ = STATUS(Corruption, "NO MORE DATA LEFT");
      }
      return;
    }
  }
}
222 | | |
223 | | bool TransactionLogIteratorImpl::IsBatchExpected( |
224 | | const WriteBatch* batch, |
225 | 2.65k | const SequenceNumber expectedSeq) { |
226 | 2.65k | assert(batch); |
227 | 2.65k | SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch); |
228 | 2.65k | if (batchSeq != expectedSeq) { |
229 | 5 | char buf[200]; |
230 | 5 | snprintf(buf, sizeof(buf), |
231 | 5 | "Discontinuity in log records. Got seq=%" PRIu64 |
232 | 5 | ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64 |
233 | 5 | ".Log iterator will reseek the correct batch.", |
234 | 5 | batchSeq, expectedSeq, versions_->LastSequence()); |
235 | 5 | reporter_.Info(buf); |
236 | 5 | return false; |
237 | 5 | } |
238 | 2.65k | return true; |
239 | 2.65k | } |
240 | | |
241 | 7.83k | void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { |
242 | 7.83k | std::unique_ptr<WriteBatch> batch(new WriteBatch()); |
243 | 7.83k | WriteBatchInternal::SetContents(batch.get(), record); |
244 | | |
245 | 7.83k | SequenceNumber expectedSeq = currentLastSeq_ + 1; |
246 | | // If the iterator has started, then confirm that we get continuous batches |
247 | 7.83k | if (started_ && !IsBatchExpected(batch.get(), expectedSeq)) { |
248 | | // Seek to the batch having expected sequence number |
249 | 5 | if (expectedSeq < files_->at(currentFileIndex_)->StartSequence()) { |
250 | | // Expected batch must lie in the previous log file |
251 | | // Avoid underflow. |
252 | 5 | if (currentFileIndex_ != 0) { |
253 | 5 | currentFileIndex_--; |
254 | 5 | } |
255 | 5 | } |
256 | 5 | startingSequenceNumber_ = expectedSeq; |
257 | | // currentStatus_ will be set to Ok if reseek succeeds |
258 | 5 | currentStatus_ = STATUS(NotFound, "Gap in sequence numbers"); |
259 | 5 | return SeekToStartSequence(currentFileIndex_, true); |
260 | 5 | } |
261 | | |
262 | 7.83k | currentBatchSeq_ = WriteBatchInternal::Sequence(batch.get()); |
263 | 7.83k | currentLastSeq_ = currentBatchSeq_ + |
264 | 7.83k | WriteBatchInternal::Count(batch.get()) - 1; |
265 | | // currentBatchSeq_ can only change here |
266 | 7.83k | assert(currentLastSeq_ <= versions_->LastSequence()); |
267 | | |
268 | 7.83k | currentBatch_ = move(batch); |
269 | 7.83k | isValid_ = true; |
270 | 7.83k | currentStatus_ = Status::OK(); |
271 | 7.83k | } |
272 | | |
273 | 117 | Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) { |
274 | 117 | unique_ptr<SequentialFileReader> file; |
275 | 117 | Status s = OpenLogFile(logFile, &file); |
276 | 117 | if (!s.ok()) { |
277 | 0 | return s; |
278 | 0 | } |
279 | 117 | assert(file); |
280 | 117 | currentLogReader_.reset(new log::Reader(options_->info_log, |
281 | 117 | std::move(file), |
282 | 117 | &reporter_, |
283 | 117 | read_options_.verify_checksums_, |
284 | 117 | 0, |
285 | 117 | logFile->LogNumber())); |
286 | 117 | return Status::OK(); |
287 | 117 | } |
288 | | } // namespace rocksdb |
289 | | #endif // ROCKSDB_LITE |