/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_log_iter_test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
// The introduction of SyncPoint effectively disabled building and running this
// test in Release builds — which is a pity, because it is a good test.
27 | | #if !defined(ROCKSDB_LITE) |
28 | | |
29 | | #include "yb/rocksdb/db/db_test_util.h" |
30 | | #include "yb/rocksdb/port/stack_trace.h" |
31 | | |
32 | | #include "yb/util/test_macros.h" |
33 | | |
34 | | namespace rocksdb { |
35 | | |
36 | | class DBTestXactLogIterator : public DBTestBase { |
37 | | public: |
38 | 7 | DBTestXactLogIterator() : DBTestBase("/db_log_iter_test") {} |
39 | | |
40 | | std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter( |
41 | 48 | const SequenceNumber seq) { |
42 | 48 | unique_ptr<TransactionLogIterator> iter; |
43 | 48 | Status status = dbfull()->GetUpdatesSince(seq, &iter); |
44 | 48 | EXPECT_OK(status); |
45 | 48 | EXPECT_TRUE(iter->Valid()); |
46 | 48 | return iter; |
47 | 48 | } |
48 | | }; |
49 | | |
50 | | namespace { |
51 | | SequenceNumber ReadRecords( |
52 | | const std::unique_ptr<TransactionLogIterator>& iter, |
53 | 42 | int* const count) { |
54 | 42 | *count = 0; |
55 | 42 | SequenceNumber lastSequence = 0; |
56 | 42 | BatchResult res; |
57 | 2.73k | while (iter->Valid()) { |
58 | 2.68k | res = iter->GetBatch(); |
59 | 2.68k | EXPECT_TRUE(res.sequence > lastSequence); |
60 | 2.68k | ++*count; |
61 | 2.68k | lastSequence = res.sequence; |
62 | 2.68k | EXPECT_OK(iter->status()); |
63 | 2.68k | iter->Next(); |
64 | 2.68k | } |
65 | 42 | return res.sequence; |
66 | 42 | } |
67 | | |
68 | | void ExpectRecords( |
69 | | const int expected_no_records, |
70 | 37 | const std::unique_ptr<TransactionLogIterator>& iter) { |
71 | 37 | int num_records; |
72 | 37 | ReadRecords(iter, &num_records); |
73 | 37 | ASSERT_EQ(num_records, expected_no_records); |
74 | 37 | } |
75 | | } // namespace |
76 | | |
// Writes across two column families, then verifies that GetUpdatesSince(0)
// replays every write — both before and after a DB reopen.
TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    ASSERT_OK(Put(0, "key1", DummyString(1024)));
    ASSERT_OK(Put(1, "key2", DummyString(1024)));
    ASSERT_OK(Put(1, "key2", DummyString(1024)));
    // Three puts so far; sequence numbers presumably start at 1 — hence 3U.
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
    {
      auto iter = OpenTransactionLogIter(0);
      ExpectRecords(3, iter);
    }
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    // NOTE(review): 2-second sleep presumably separates WAL file timestamps
    // across the reopen — confirm whether archival ordering needs it.
    env_->SleepForMicroseconds(2 * 1000 * 1000);
    {
      ASSERT_OK(Put(0, "key4", DummyString(1024)));
      ASSERT_OK(Put(1, "key5", DummyString(1024)));
      ASSERT_OK(Put(0, "key6", DummyString(1024)));
    }
    {
      // All six writes must be visible, spanning the reopen boundary.
      auto iter = OpenTransactionLogIter(0);
      ExpectRecords(6, iter);
    }
  } while (ChangeCompactOptions());
}
103 | | |
104 | | #ifndef NDEBUG // sync point is not included with DNDEBUG build |
// Reproduces a race between listing WAL files (GetSortedWalFiles /
// GetSortedWalsOfType) and PurgeObsoleteFiles moving a log to the archive
// directory in the middle of the listing: the iterator must not drop records
// ("key4" below) when that interleaving happens.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
  static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
  // Two interleavings: one racing GetSortedWalFiles, one GetSortedWalsOfType.
  // Each row is {list:1 -> purge:1, purge:2 -> list:2} dependency pairs.
  static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
    {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1",
     "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
    {"WalManager::GetSortedWalsOfType:1",
     "WalManager::PurgeObsoleteFiles:1",
     "WalManager::PurgeObsoleteFiles:2",
     "WalManager::GetSortedWalsOfType:2"}};
  for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
    // Setup sync point dependency to reproduce the race condition of
    // a log file moved to archived dir, in the middle of GetSortedWalFiles
    rocksdb::SyncPoint::GetInstance()->LoadDependency(
        { { sync_points[test][0], sync_points[test][1] },
          { sync_points[test][2], sync_points[test][3] },
        });

    do {
      // Reset sync-point state so traces/processing from a previous pass
      // cannot leak into this iteration.
      rocksdb::SyncPoint::GetInstance()->ClearTrace();
      rocksdb::SyncPoint::GetInstance()->DisableProcessing();
      Options options = OptionsForLogIterTest();
      DestroyAndReopen(options);
      // Each Put+Flush pair lands in its own WAL file, giving the purge
      // thread multiple logs to archive.
      ASSERT_OK(Put("key1", DummyString(1024)));
      ASSERT_OK(dbfull()->Flush(FlushOptions()));
      ASSERT_OK(Put("key2", DummyString(1024)));
      ASSERT_OK(dbfull()->Flush(FlushOptions()));
      ASSERT_OK(Put("key3", DummyString(1024)));
      ASSERT_OK(dbfull()->Flush(FlushOptions()));
      ASSERT_OK(Put("key4", DummyString(1024)));
      ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);

      {
        // Sanity check before enabling the race: all four records readable.
        auto iter = OpenTransactionLogIter(0);
        ExpectRecords(4, iter);
      }

      rocksdb::SyncPoint::GetInstance()->EnableProcessing();
      // trigger async flush, and log move. Well, log move will
      // wait until the GetSortedWalFiles:1 to reproduce the race
      // condition
      FlushOptions flush_options;
      flush_options.wait = false;
      ASSERT_OK(dbfull()->Flush(flush_options));

      // "key5" would be written in a new memtable and log
      ASSERT_OK(Put("key5", DummyString(1024)));
      {
        // this iter would miss "key4" if not fixed
        auto iter = OpenTransactionLogIter(0);
        ExpectRecords(5, iter);
      }
    } while (ChangeCompactOptions());
  }
}
159 | | #endif |
160 | | |
// Verifies that an iterator which has consumed every record becomes invalid
// without error, and becomes valid again on Next() once a new record is
// written — i.e. it stalls at the tail rather than failing.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    ASSERT_OK(Put("key1", DummyString(1024)));
    auto iter = OpenTransactionLogIter(0);
    ASSERT_OK(iter->status());
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    // Past the last record: invalid, but status stays OK (no error).
    ASSERT_TRUE(!iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_OK(Put("key2", DummyString(1024)));
    // A new write makes the same iterator valid again after Next().
    iter->Next();
    ASSERT_OK(iter->status());
    ASSERT_TRUE(iter->Valid());
  } while (ChangeCompactOptions());
}
178 | | |
// Verifies that records flushed before a DB restart are still visible to a
// transaction-log iterator opened after the restart.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    ASSERT_OK(Put("key1", DummyString(1024)));
    ASSERT_OK(Put("key2", DummyString(1023)));
    ASSERT_OK(dbfull()->Flush(FlushOptions()));
    Reopen(options);
    auto iter = OpenTransactionLogIter(0);
    ExpectRecords(2, iter);
  } while (ChangeCompactOptions());
}
191 | | |
// Truncates the oldest WAL file in half to create a sequence-number gap, then
// checks that iteration from 0 stops cleanly before the gap, and that a fresh
// iterator can seek past the gap to the record in the newer log.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    for (int i = 0; i < 1024; i++) {
      ASSERT_OK(Put("key"+ToString(i), DummyString(10)));
    }
    ASSERT_OK(dbfull()->Flush(FlushOptions()));
    // Corrupt this log to create a gap
    rocksdb::VectorLogPtr wal_files;
    ASSERT_OK(dbfull()->GetSortedWalFiles(&wal_files));
    const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName();
    if (mem_env_) {
      // In-memory env: truncate through the Env API.
      ASSERT_OK(mem_env_->Truncate(logfile_path, wal_files.front()->SizeFileBytes() / 2));
    } else {
      // Real filesystem: POSIX truncate() returns 0 on success.
      ASSERT_EQ(0, truncate(logfile_path.c_str(),
                 wal_files.front()->SizeFileBytes() / 2));
    }

    // Insert a new entry to a new log file
    ASSERT_OK(Put("key1025", DummyString(10)));
    // Try to read from the beginning. Should stop before the gap and read less
    // than 1025 entries
    auto iter = OpenTransactionLogIter(0);
    int count;
    SequenceNumber last_sequence_read = ReadRecords(iter, &count);
    ASSERT_LT(last_sequence_read, 1025U);
    // Try to read past the gap, should be able to seek to key1025
    auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
    ExpectRecords(1, iter2);
  } while (ChangeCompactOptions());
}
224 | | |
// Verifies replay of an atomic WriteBatch: the 4-entry batch presumably
// occupies sequence numbers 1-4 (TODO confirm), so GetUpdatesSince(3) should
// yield that whole batch plus the later single Put — two records in total.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    WriteBatch batch;
    batch.Put(handles_[1], "key1", DummyString(1024));
    batch.Put(handles_[0], "key2", DummyString(1024));
    batch.Put(handles_[1], "key3", DummyString(1024));
    batch.Delete(handles_[0], "key2");
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
    // Flush both column families so the batch survives the reopen below.
    ASSERT_OK(Flush(1));
    ASSERT_OK(Flush(0));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_OK(Put(1, "key4", DummyString(1024)));
    auto iter = OpenTransactionLogIter(3);
    ExpectRecords(2, iter);
  } while (ChangeCompactOptions());
}
244 | | |
245 | 1 | TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) { |
246 | 1 | Options options = OptionsForLogIterTest(); |
247 | 1 | DestroyAndReopen(options); |
248 | 1 | CreateAndReopenWithCF({"pikachu"}, options); |
249 | 1 | { |
250 | 1 | WriteBatch batch; |
251 | 1 | batch.Put(handles_[1], "key1", DummyString(1024)); |
252 | 1 | batch.Put(handles_[0], "key2", DummyString(1024)); |
253 | 1 | batch.PutLogData(Slice("blob1")); |
254 | 1 | batch.Put(handles_[1], "key3", DummyString(1024)); |
255 | 1 | batch.PutLogData(Slice("blob2")); |
256 | 1 | batch.Delete(handles_[0], "key2"); |
257 | 1 | ASSERT_OK(dbfull()->Write(WriteOptions(), &batch)); |
258 | 1 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
259 | 1 | } |
260 | | |
261 | 1 | auto res = OpenTransactionLogIter(0)->GetBatch(); |
262 | 1 | struct Handler : public WriteBatch::Handler { |
263 | 1 | std::string seen; |
264 | 1 | virtual Status PutCF(uint32_t cf, const SliceParts& key, |
265 | 3 | const SliceParts& value) override { |
266 | 3 | seen += "Put(" + ToString(cf) + ", " + key.TheOnlyPart().ToString() + ", " + |
267 | 3 | ToString(value.TheOnlyPart().size()) + ")"; |
268 | 3 | return Status::OK(); |
269 | 3 | } |
270 | 1 | virtual Status MergeCF(uint32_t cf, const Slice& key, |
271 | 0 | const Slice& value) override { |
272 | 0 | seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " + |
273 | 0 | ToString(value.size()) + ")"; |
274 | 0 | return Status::OK(); |
275 | 0 | } |
276 | 2 | void LogData(const Slice& blob) override { |
277 | 2 | seen += "LogData(" + blob.ToString() + ")"; |
278 | 2 | } |
279 | 1 | Status DeleteCF(uint32_t cf, const Slice& key) override { |
280 | 1 | seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")"; |
281 | 1 | return Status::OK(); |
282 | 1 | } |
283 | 1 | } handler; |
284 | 1 | ASSERT_OK(res.writeBatchPtr->Iterate(&handler)); |
285 | 1 | ASSERT_EQ( |
286 | 1 | "Put(1, key1, 1024)" |
287 | 1 | "Put(0, key2, 1024)" |
288 | 1 | "LogData(blob1)" |
289 | 1 | "Put(1, key3, 1024)" |
290 | 1 | "LogData(blob2)" |
291 | 1 | "Delete(0, key2)", |
292 | 1 | handler.seen); |
293 | 1 | } |
294 | | } // namespace rocksdb |
295 | | |
296 | | #endif // !defined(ROCKSDB_LITE) |
297 | | |
// Test entry point. In ROCKSDB_LITE builds the transaction-log iterator is
// unavailable, so the whole suite compiles out and main simply returns 0.
int main(int argc, char** argv) {
#if !defined(ROCKSDB_LITE)
  // Install the crash handler before running tests so failures produce
  // stack traces.
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
#else
  return 0;
#endif
}