/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/fault_injection_test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright 2014 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | // This test uses a custom Env to keep track of the state of a filesystem as of |
25 | | // the last "sync". It then checks for data loss errors by purposely dropping |
26 | | // file data (or entire files) not protected by a "sync". |
27 | | |
28 | | #include <map> |
29 | | #include <set> |
30 | | |
31 | | #include <gtest/gtest.h> |
32 | | |
33 | | #include "yb/rocksdb/cache.h" |
34 | | #include "yb/rocksdb/db.h" |
35 | | #include "yb/rocksdb/db/db_impl.h" |
36 | | #include "yb/rocksdb/env.h" |
37 | | #include "yb/rocksdb/table.h" |
38 | | #include "yb/rocksdb/util/mock_env.h" |
39 | | #include "yb/rocksdb/util/mutexlock.h" |
40 | | #include "yb/rocksdb/util/sync_point.h" |
41 | | #include "yb/rocksdb/util/testharness.h" |
42 | | #include "yb/rocksdb/util/testutil.h" |
43 | | |
44 | | #include "yb/util/test_macros.h" |
45 | | |
46 | | namespace rocksdb { |
47 | | |
// Length handed to RandomString() for every generated test value.
static constexpr int kValueSize = 1000;
// Exclusive upper bound for the randomly drawn pre-/post-sync record counts.
static constexpr int kMaxNumValues = 2000;
// Number of rounds FaultTest executes for each option configuration.
static constexpr size_t kNumIterations = 3;
51 | | |
52 | | class TestWritableFile; |
53 | | class FaultInjectionTestEnv; |
54 | | |
55 | | namespace { |
56 | | |
// Returns the directory part of `filename`: everything before the last '/'
// or '\\'. Returns "" when `filename` contains no separator at all.
// Assumes a file name, and not a directory name like "/foo/bar/".
static std::string GetDirName(const std::string& filename) {
  const size_t last_sep = filename.find_last_of("/\\");
  if (last_sep == std::string::npos) {
    return "";
  }
  return filename.substr(0, last_sep);
}
66 | | |
// Removes every trailing '/' from `str`. A string consisting only of '/'
// characters (or an empty string) is returned unchanged.
static std::string TrimDirname(const std::string& str) {
  // Single-character overload: no need to scan a one-element C string.
  const size_t last_kept = str.find_last_not_of('/');
  if (last_kept == std::string::npos) {
    return str;
  }
  return str.substr(0, last_kept + 1);
}
75 | | |
// Returns the pair <parent directory name, file name> of a full path.
// The path is split at its last '/' or '\\'. A path without any separator
// yields an empty directory name and the whole input as the file name.
static std::pair<std::string, std::string> GetDirAndName(
    const std::string& name) {
  const size_t last_sep = name.find_last_of("/\\");
  if (last_sep == std::string::npos) {
    // Without this guard the "skip the separator" offset would drop the
    // first character of the file name.
    return std::make_pair(std::string(), name);
  }
  return std::make_pair(name.substr(0, last_sep), name.substr(last_sep + 1));
}
83 | | |
84 | | // A basic file truncation function suitable for this test. |
85 | 218 | Status Truncate(Env* env, const std::string& filename, uint64_t length) { |
86 | 218 | unique_ptr<SequentialFile> orig_file; |
87 | 218 | const EnvOptions options; |
88 | 218 | Status s = env->NewSequentialFile(filename, &orig_file, options); |
89 | 218 | if (!s.ok()) { |
90 | 0 | fprintf(stderr, "Cannot truncate file %s: %s\n", filename.c_str(), |
91 | 0 | s.ToString().c_str()); |
92 | 0 | return s; |
93 | 0 | } |
94 | | |
95 | 218 | std::unique_ptr<uint8_t[]> scratch(new uint8_t[length]); |
96 | 218 | rocksdb::Slice result; |
97 | 218 | s = orig_file->Read(length, &result, scratch.get()); |
98 | | #ifdef OS_WIN |
99 | | orig_file.reset(); |
100 | | #endif |
101 | 218 | if (s.ok()) { |
102 | 218 | std::string tmp_name = GetDirName(filename) + "/truncate.tmp"; |
103 | 218 | unique_ptr<WritableFile> tmp_file; |
104 | 218 | s = env->NewWritableFile(tmp_name, &tmp_file, options); |
105 | 218 | if (s.ok()) { |
106 | 218 | s = tmp_file->Append(result); |
107 | 218 | if (s.ok()) { |
108 | 218 | s = env->RenameFile(tmp_name, filename); |
109 | 0 | } else { |
110 | 0 | fprintf(stderr, "Cannot rename file %s to %s: %s\n", tmp_name.c_str(), |
111 | 0 | filename.c_str(), s.ToString().c_str()); |
112 | 0 | RETURN_NOT_OK(env->DeleteFile(tmp_name)); |
113 | 0 | } |
114 | 218 | } |
115 | 218 | } |
116 | 218 | if (!s.ok()) { |
117 | 0 | fprintf(stderr, "Cannot truncate file %s: %s\n", filename.c_str(), |
118 | 0 | s.ToString().c_str()); |
119 | 0 | } |
120 | | |
121 | 218 | return s; |
122 | 218 | } |
123 | | |
124 | | struct FileState { |
125 | | std::string filename_; |
126 | | ssize_t pos_; |
127 | | ssize_t pos_at_last_sync_; |
128 | | ssize_t pos_at_last_flush_; |
129 | | |
130 | | explicit FileState(const std::string& filename) |
131 | | : filename_(filename), |
132 | | pos_(-1), |
133 | | pos_at_last_sync_(-1), |
134 | 2.78k | pos_at_last_flush_(-1) { } |
135 | | |
136 | 3.09k | FileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {} |
137 | | |
138 | 1.26k | bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; } |
139 | | |
140 | | Status DropUnsyncedData(Env* env) const; |
141 | | |
142 | | Status DropRandomUnsyncedData(Env* env, Random* rand) const; |
143 | | }; |
144 | | |
145 | | } // anonymous namespace |
146 | | |
147 | | // A wrapper around WritableFileWriter* file |
148 | | // is written to or sync'ed. |
149 | | class TestWritableFile : public WritableFile { |
150 | | public: |
151 | | explicit TestWritableFile(const std::string& fname, |
152 | | unique_ptr<WritableFile>&& f, |
153 | | FaultInjectionTestEnv* env); |
154 | | virtual ~TestWritableFile(); |
155 | | Status Append(const Slice& data) override; |
156 | 2.15k | Status Truncate(uint64_t size) override { return target_->Truncate(size); } |
157 | | Status Close() override; |
158 | | Status Flush() override; |
159 | | Status Sync() override; |
160 | 8 | bool IsSyncThreadSafe() const override { return true; } |
161 | | |
162 | | private: |
163 | | FileState state_; |
164 | | unique_ptr<WritableFile> target_; |
165 | | bool writable_file_opened_; |
166 | | FaultInjectionTestEnv* env_; |
167 | | }; |
168 | | |
169 | | class TestDirectory : public Directory { |
170 | | public: |
171 | | explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname, |
172 | | Directory* dir) |
173 | 308 | : env_(env), dirname_(dirname), dir_(dir) {} |
174 | 308 | ~TestDirectory() {} |
175 | | |
176 | | Status Fsync() override; |
177 | | |
178 | | private: |
179 | | FaultInjectionTestEnv* env_; |
180 | | std::string dirname_; |
181 | | unique_ptr<Directory> dir_; |
182 | | }; |
183 | | |
184 | | class FaultInjectionTestEnv : public EnvWrapper { |
185 | | public: |
186 | | explicit FaultInjectionTestEnv(Env* base) |
187 | | : EnvWrapper(base), |
188 | 8 | filesystem_active_(true) {} |
189 | 8 | virtual ~FaultInjectionTestEnv() { } |
190 | | |
191 | | Status NewDirectory(const std::string& name, |
192 | 308 | unique_ptr<Directory>* result) override { |
193 | 308 | unique_ptr<Directory> r; |
194 | 308 | Status s = target()->NewDirectory(name, &r); |
195 | 308 | EXPECT_OK(s); |
196 | 308 | if (!s.ok()) { |
197 | 0 | return s; |
198 | 0 | } |
199 | 308 | result->reset(new TestDirectory(this, TrimDirname(name), r.release())); |
200 | 308 | return Status::OK(); |
201 | 308 | } |
202 | | |
203 | | Status NewWritableFile(const std::string& fname, |
204 | | unique_ptr<WritableFile>* result, |
205 | 2.78k | const EnvOptions& soptions) override { |
206 | 2.78k | if (!IsFilesystemActive()) { |
207 | 0 | return STATUS(Corruption, "Not Active"); |
208 | 0 | } |
209 | | // Not allow overwriting files |
210 | 2.78k | Status s = target()->FileExists(fname); |
211 | 2.78k | if (s.ok()) { |
212 | 0 | return STATUS(Corruption, "File already exists."); |
213 | 2.78k | } else if (!s.IsNotFound()) { |
214 | 0 | assert(s.IsIOError()); |
215 | 0 | return s; |
216 | 0 | } |
217 | 2.78k | s = target()->NewWritableFile(fname, result, soptions); |
218 | 2.78k | if (s.ok()) { |
219 | 2.78k | result->reset(new TestWritableFile(fname, std::move(*result), this)); |
220 | | // WritableFileWriter* file is opened |
221 | | // again then it will be truncated - so forget our saved state. |
222 | 2.78k | UntrackFile(fname); |
223 | 2.78k | MutexLock l(&mutex_); |
224 | 2.78k | open_files_.insert(fname); |
225 | 2.78k | auto dir_and_name = GetDirAndName(fname); |
226 | 2.78k | auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; |
227 | 2.78k | list.insert(dir_and_name.second); |
228 | 2.78k | } |
229 | 2.78k | return s; |
230 | 2.78k | } |
231 | | |
232 | 2.85k | Status DeleteFile(const std::string& f) override { |
233 | 2.85k | if (!IsFilesystemActive()) { |
234 | 32 | return STATUS(Corruption, "Not Active"); |
235 | 32 | } |
236 | 2.81k | Status s = EnvWrapper::DeleteFile(f); |
237 | 2.81k | if (!s.ok()) { |
238 | 0 | fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(), |
239 | 0 | s.ToString().c_str()); |
240 | 0 | } |
241 | 2.81k | EXPECT_OK(s); |
242 | 2.81k | if (s.ok()) { |
243 | 2.81k | UntrackFile(f); |
244 | 2.81k | } |
245 | 2.81k | return s; |
246 | 2.81k | } |
247 | | |
248 | | virtual Status RenameFile(const std::string& s, |
249 | 934 | const std::string& t) override { |
250 | 934 | if (!IsFilesystemActive()) { |
251 | 0 | return STATUS(Corruption, "Not Active"); |
252 | 0 | } |
253 | 934 | Status ret = EnvWrapper::RenameFile(s, t); |
254 | | |
255 | 934 | if (ret.ok()) { |
256 | 934 | MutexLock l(&mutex_); |
257 | 934 | if (db_file_state_.find(s) != db_file_state_.end()) { |
258 | 626 | db_file_state_[t] = db_file_state_[s]; |
259 | 626 | db_file_state_.erase(s); |
260 | 626 | } |
261 | | |
262 | 934 | auto sdn = GetDirAndName(s); |
263 | 934 | auto tdn = GetDirAndName(t); |
264 | 934 | if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) { |
265 | 626 | auto& tlist = dir_to_new_files_since_last_sync_[tdn.first]; |
266 | 626 | assert(tlist.find(tdn.second) == tlist.end()); |
267 | 626 | tlist.insert(tdn.second); |
268 | 626 | } |
269 | 934 | } |
270 | | |
271 | 934 | return ret; |
272 | 934 | } |
273 | | |
274 | 2.78k | void WritableFileClosed(const FileState& state) { |
275 | 2.78k | MutexLock l(&mutex_); |
276 | 2.78k | if (open_files_.find(state.filename_) != open_files_.end()) { |
277 | 2.47k | db_file_state_[state.filename_] = state; |
278 | 2.47k | open_files_.erase(state.filename_); |
279 | 2.47k | } |
280 | 2.78k | } |
281 | | |
282 | | // For every file that is not fully synced, make a call to `func` with |
283 | | // InMemoryFileState of the file as the parameter. |
284 | 222 | Status DropFileData(std::function<Status(Env*, FileState)> func) { |
285 | 222 | Status s; |
286 | 222 | MutexLock l(&mutex_); |
287 | 222 | for (std::map<std::string, FileState>::const_iterator it = |
288 | 222 | db_file_state_.begin(); |
289 | 1.49k | s.ok() && it != db_file_state_.end(); ++it) { |
290 | 1.26k | const FileState& state = it->second; |
291 | 1.26k | if (!state.IsFullySynced()) { |
292 | 218 | s = func(target(), state); |
293 | 218 | } |
294 | 1.26k | } |
295 | 222 | return s; |
296 | 222 | } |
297 | | |
298 | 186 | Status DropUnsyncedFileData() { |
299 | 182 | return DropFileData([&](Env* env, const FileState& state) { |
300 | 182 | return state.DropUnsyncedData(env); |
301 | 182 | }); |
302 | 186 | } |
303 | | |
304 | 36 | Status DropRandomUnsyncedFileData(Random* rnd) { |
305 | 36 | return DropFileData([&](Env* env, const FileState& state) { |
306 | 36 | return state.DropRandomUnsyncedData(env, rnd); |
307 | 36 | }); |
308 | 36 | } |
309 | | |
310 | 148 | Status DeleteFilesCreatedAfterLastDirSync() { |
311 | | // Because DeleteFile access this container make a copy to avoid deadlock |
312 | 148 | std::map<std::string, std::set<std::string>> map_copy; |
313 | 148 | { |
314 | 148 | MutexLock l(&mutex_); |
315 | 148 | map_copy.insert(dir_to_new_files_since_last_sync_.begin(), |
316 | 148 | dir_to_new_files_since_last_sync_.end()); |
317 | 148 | } |
318 | | |
319 | 288 | for (auto& pair : map_copy) { |
320 | 72 | for (std::string name : pair.second) { |
321 | 72 | Status s = DeleteFile(pair.first + "/" + name); |
322 | 72 | if (!s.ok()) { |
323 | 0 | return s; |
324 | 0 | } |
325 | 72 | } |
326 | 288 | } |
327 | 148 | return Status::OK(); |
328 | 148 | } |
329 | 308 | void ResetState() { |
330 | 308 | MutexLock l(&mutex_); |
331 | 308 | db_file_state_.clear(); |
332 | 308 | dir_to_new_files_since_last_sync_.clear(); |
333 | 308 | SetFilesystemActiveNoLock(true); |
334 | 308 | } |
335 | | |
336 | 5.59k | void UntrackFile(const std::string& f) { |
337 | 5.59k | MutexLock l(&mutex_); |
338 | 5.59k | auto dir_and_name = GetDirAndName(f); |
339 | 5.59k | dir_to_new_files_since_last_sync_[dir_and_name.first].erase( |
340 | 5.59k | dir_and_name.second); |
341 | 5.59k | db_file_state_.erase(f); |
342 | 5.59k | open_files_.erase(f); |
343 | 5.59k | } |
344 | | |
345 | 776 | void SyncDir(const std::string& dirname) { |
346 | 776 | MutexLock l(&mutex_); |
347 | 776 | dir_to_new_files_since_last_sync_.erase(dirname); |
348 | 776 | } |
349 | | |
350 | | // Setting the filesystem to inactive is the test equivalent to simulating a |
351 | | // system reset. Setting to inactive will freeze our saved filesystem state so |
352 | | // that it will stop being recorded. It can then be reset back to the state at |
353 | | // the time of the reset. |
354 | 1.43M | bool IsFilesystemActive() { |
355 | 1.43M | MutexLock l(&mutex_); |
356 | 1.43M | return filesystem_active_; |
357 | 1.43M | } |
358 | 458 | void SetFilesystemActiveNoLock(bool active) { filesystem_active_ = active; } |
359 | 150 | void SetFilesystemActive(bool active) { |
360 | 150 | MutexLock l(&mutex_); |
361 | 150 | SetFilesystemActiveNoLock(active); |
362 | 150 | } |
363 | 294 | void AssertNoOpenFile() { ASSERT_TRUE(open_files_.empty()); } |
364 | | |
365 | | private: |
366 | | port::Mutex mutex_; |
367 | | std::map<std::string, FileState> db_file_state_; |
368 | | std::set<std::string> open_files_; |
369 | | std::unordered_map<std::string, std::set<std::string>> |
370 | | dir_to_new_files_since_last_sync_; |
371 | | bool filesystem_active_; // Record flushes, syncs, writes |
372 | | }; |
373 | | |
374 | 182 | Status FileState::DropUnsyncedData(Env* env) const { |
375 | 182 | ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_; |
376 | 182 | return Truncate(env, filename_, sync_pos); |
377 | 182 | } |
378 | | |
379 | 36 | Status FileState::DropRandomUnsyncedData(Env* env, Random* rand) const { |
380 | 36 | ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_; |
381 | 36 | assert(pos_ >= sync_pos); |
382 | 36 | int range = static_cast<int>(pos_ - sync_pos); |
383 | 36 | uint64_t truncated_size = |
384 | 36 | static_cast<uint64_t>(sync_pos) + rand->Uniform(range); |
385 | 36 | return Truncate(env, filename_, truncated_size); |
386 | 36 | } |
387 | | |
388 | 776 | Status TestDirectory::Fsync() { |
389 | 776 | env_->SyncDir(dirname_); |
390 | 776 | return dir_->Fsync(); |
391 | 776 | } |
392 | | |
393 | | TestWritableFile::TestWritableFile(const std::string& fname, |
394 | | unique_ptr<WritableFile>&& f, |
395 | | FaultInjectionTestEnv* env) |
396 | | : state_(fname), |
397 | | target_(std::move(f)), |
398 | | writable_file_opened_(true), |
399 | 2.78k | env_(env) { |
400 | 2.78k | assert(target_ != nullptr); |
401 | 2.78k | state_.pos_ = 0; |
402 | 2.78k | } |
403 | | |
404 | 2.78k | TestWritableFile::~TestWritableFile() { |
405 | 2.78k | if (writable_file_opened_) { |
406 | 318 | CHECK_OK(Close()); |
407 | 318 | } |
408 | 2.78k | } |
409 | | |
410 | 711k | Status TestWritableFile::Append(const Slice& data) { |
411 | 711k | if (!env_->IsFilesystemActive()) { |
412 | 2 | return STATUS(Corruption, "Not Active"); |
413 | 2 | } |
414 | 711k | Status s = target_->Append(data); |
415 | 711k | if (s.ok()) { |
416 | 711k | state_.pos_ += data.size(); |
417 | 711k | } |
418 | 711k | return s; |
419 | 711k | } |
420 | | |
421 | 2.78k | Status TestWritableFile::Close() { |
422 | 2.78k | writable_file_opened_ = false; |
423 | 2.78k | Status s = target_->Close(); |
424 | 2.78k | if (s.ok()) { |
425 | 2.78k | env_->WritableFileClosed(state_); |
426 | 2.78k | } |
427 | 2.78k | return s; |
428 | 2.78k | } |
429 | | |
430 | 711k | Status TestWritableFile::Flush() { |
431 | 711k | Status s = target_->Flush(); |
432 | 711k | if (s.ok() && env_->IsFilesystemActive()) { |
433 | 711k | state_.pos_at_last_flush_ = state_.pos_; |
434 | 711k | } |
435 | 711k | return s; |
436 | 711k | } |
437 | | |
438 | 2.62k | Status TestWritableFile::Sync() { |
439 | 2.62k | if (!env_->IsFilesystemActive()) { |
440 | 0 | return Status::OK(); |
441 | 0 | } |
442 | | // No need to actual sync. |
443 | 2.62k | state_.pos_at_last_sync_ = state_.pos_; |
444 | 2.62k | return Status::OK(); |
445 | 2.62k | } |
446 | | |
447 | | class FaultInjectionTest : public RocksDBTest, |
448 | | public testing::WithParamInterface<bool> { |
449 | | protected: |
450 | | enum OptionConfig { |
451 | | kDefault, |
452 | | kDifferentDataDir, |
453 | | kWalDir, |
454 | | kSyncWal, |
455 | | kWalDirSyncWal, |
456 | | kMultiLevels, |
457 | | kEnd, |
458 | | }; |
459 | | int option_config_; |
460 | | // When need to make sure data is persistent, sync WAL |
461 | | bool sync_use_wal_; |
462 | | // When need to make sure data is persistent, call DB::CompactRange() |
463 | | bool sync_use_compact_; |
464 | | |
465 | | bool sequential_order_; |
466 | | |
467 | | protected: |
468 | | public: |
469 | | enum ExpectedVerifResult { kValExpectFound, kValExpectNoError }; |
470 | | enum ResetMethod { |
471 | | kResetDropUnsyncedData, |
472 | | kResetDropRandomUnsyncedData, |
473 | | kResetDeleteUnsyncedFiles, |
474 | | kResetDropAndDeleteUnsynced |
475 | | }; |
476 | | |
477 | | std::unique_ptr<Env> base_env_; |
478 | | FaultInjectionTestEnv* env_; |
479 | | std::string dbname_; |
480 | | shared_ptr<Cache> tiny_cache_; |
481 | | Options options_; |
482 | | DB* db_; |
483 | | |
484 | | FaultInjectionTest() |
485 | | : option_config_(kDefault), |
486 | | sync_use_wal_(false), |
487 | | sync_use_compact_(true), |
488 | | base_env_(nullptr), |
489 | | env_(NULL), |
490 | 8 | db_(NULL) { |
491 | 8 | } |
492 | | |
493 | 8 | ~FaultInjectionTest() { |
494 | 8 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); |
495 | 8 | rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); |
496 | 8 | } |
497 | | |
498 | 12 | bool ChangeOptions() { |
499 | 12 | option_config_++; |
500 | 12 | if (option_config_ >= kEnd) { |
501 | 2 | return false; |
502 | 10 | } else { |
503 | 10 | if (option_config_ == kMultiLevels) { |
504 | 2 | base_env_.reset(new MockEnv(Env::Default())); |
505 | 2 | } |
506 | 10 | return true; |
507 | 10 | } |
508 | 12 | } |
509 | | |
510 | | // Return the current option configuration. |
511 | 8 | Options CurrentOptions() { |
512 | 8 | sync_use_wal_ = false; |
513 | 8 | sync_use_compact_ = true; |
514 | 8 | Options options; |
515 | 8 | switch (option_config_) { |
516 | 0 | case kWalDir: |
517 | 0 | options.wal_dir = test::TmpDir(env_) + "/fault_test_wal"; |
518 | 0 | break; |
519 | 0 | case kDifferentDataDir: |
520 | 0 | options.db_paths.emplace_back(test::TmpDir(env_) + "/fault_test_data", |
521 | 0 | 1000000U); |
522 | 0 | break; |
523 | 0 | case kSyncWal: |
524 | 0 | sync_use_wal_ = true; |
525 | 0 | sync_use_compact_ = false; |
526 | 0 | break; |
527 | 0 | case kWalDirSyncWal: |
528 | 0 | options.wal_dir = test::TmpDir(env_) + "/fault_test_wal"; |
529 | 0 | sync_use_wal_ = true; |
530 | 0 | sync_use_compact_ = false; |
531 | 0 | break; |
532 | 0 | case kMultiLevels: |
533 | 0 | options.write_buffer_size = 64 * 1024; |
534 | 0 | options.target_file_size_base = 64 * 1024; |
535 | 0 | options.level0_file_num_compaction_trigger = 2; |
536 | 0 | options.level0_slowdown_writes_trigger = 2; |
537 | 0 | options.level0_stop_writes_trigger = 4; |
538 | 0 | options.max_bytes_for_level_base = 128 * 1024; |
539 | 0 | options.max_write_buffer_number = 2; |
540 | 0 | options.max_background_compactions = 8; |
541 | 0 | options.max_background_flushes = 8; |
542 | 0 | sync_use_wal_ = true; |
543 | 0 | sync_use_compact_ = false; |
544 | 0 | break; |
545 | 8 | default: |
546 | 8 | break; |
547 | 8 | } |
548 | 8 | return options; |
549 | 8 | } |
550 | | |
551 | 8 | Status NewDB() { |
552 | 8 | assert(db_ == NULL); |
553 | 8 | assert(tiny_cache_ == nullptr); |
554 | 8 | assert(env_ == NULL); |
555 | | |
556 | 8 | env_ = |
557 | 8 | new FaultInjectionTestEnv(base_env_ ? base_env_.get() : Env::Default()); |
558 | | |
559 | 8 | options_ = CurrentOptions(); |
560 | 8 | options_.env = env_; |
561 | 8 | options_.paranoid_checks = true; |
562 | | |
563 | 8 | BlockBasedTableOptions table_options; |
564 | 8 | tiny_cache_ = NewLRUCache(100); |
565 | 8 | table_options.block_cache = tiny_cache_; |
566 | 8 | options_.table_factory.reset(NewBlockBasedTableFactory(table_options)); |
567 | | |
568 | 8 | dbname_ = test::TmpDir() + "/fault_test"; |
569 | | |
570 | 8 | EXPECT_OK(DestroyDB(dbname_, options_)); |
571 | | |
572 | 8 | options_.create_if_missing = true; |
573 | 8 | Status s = OpenDB(); |
574 | 8 | options_.create_if_missing = false; |
575 | 8 | return s; |
576 | 8 | } |
577 | | |
578 | 8 | void SetUp() override { |
579 | 8 | sequential_order_ = GetParam(); |
580 | 8 | ASSERT_OK(NewDB()); |
581 | 8 | } |
582 | | |
583 | 8 | void TearDown() override { |
584 | 8 | CloseDB(); |
585 | | |
586 | 8 | Status s = DestroyDB(dbname_, options_); |
587 | | |
588 | 8 | delete env_; |
589 | 8 | env_ = NULL; |
590 | | |
591 | 8 | tiny_cache_.reset(); |
592 | | |
593 | 8 | ASSERT_OK(s); |
594 | 8 | } |
595 | | |
596 | 290 | void Build(const WriteOptions& write_options, int start_idx, int num_vals) { |
597 | 290 | std::string key_space, value_space; |
598 | 290 | WriteBatch batch; |
599 | 327k | for (int i = start_idx; i < start_idx + num_vals; i++) { |
600 | 327k | Slice key = Key(i, &key_space); |
601 | 327k | batch.Clear(); |
602 | 327k | batch.Put(key, Value(i, &value_space)); |
603 | 327k | ASSERT_OK(db_->Write(write_options, &batch)); |
604 | 327k | } |
605 | 290 | } |
606 | | |
607 | 512k | Status ReadValue(int i, std::string* val) const { |
608 | 512k | std::string key_space, value_space; |
609 | 512k | Slice key = Key(i, &key_space); |
610 | 512k | Value(i, &value_space); |
611 | 512k | ReadOptions options; |
612 | 512k | return db_->Get(options, key, val); |
613 | 512k | } |
614 | | |
615 | | Status Verify(int start_idx, int num_vals, |
616 | 580 | ExpectedVerifResult expected) const { |
617 | 580 | std::string val; |
618 | 580 | std::string value_space; |
619 | 580 | Status s; |
620 | 513k | for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) { |
621 | 512k | Value(i, &value_space); |
622 | 512k | s = ReadValue(i, &val); |
623 | 512k | if (s.ok()) { |
624 | 512k | EXPECT_EQ(value_space, val); |
625 | 512k | } |
626 | 512k | if (expected == kValExpectFound) { |
627 | 423k | if (!s.ok()) { |
628 | 0 | fprintf(stderr, "Error when read %dth record (expect found): %s\n", i, |
629 | 0 | s.ToString().c_str()); |
630 | 0 | return s; |
631 | 0 | } |
632 | 89.7k | } else if (!s.ok() && !s.IsNotFound()) { |
633 | 0 | fprintf(stderr, "Error when read %dth record: %s\n", i, |
634 | 0 | s.ToString().c_str()); |
635 | 0 | return s; |
636 | 0 | } |
637 | 512k | } |
638 | 580 | return Status::OK(); |
639 | 580 | } |
640 | | |
641 | | // Return the ith key |
642 | 840k | Slice Key(int i, std::string* storage) const { |
643 | 840k | int num = i; |
644 | 840k | if (!sequential_order_) { |
645 | | // random transfer |
646 | 420k | int64_t m = 0x5bd1e995LL * num; |
647 | 420k | m ^= (m & 0xff) << 24; |
648 | 420k | num = static_cast<int>(m); |
649 | 420k | } |
650 | 840k | char buf[100]; |
651 | 840k | snprintf(buf, sizeof(buf), "%016d", num); |
652 | 840k | storage->assign(buf, strlen(buf)); |
653 | 840k | return Slice(*storage); |
654 | 840k | } |
655 | | |
656 | | // Return the value to associate with the specified key |
657 | 1.35M | Slice Value(int k, std::string* storage) const { |
658 | 1.35M | Random r(k); |
659 | 1.35M | return RandomString(&r, kValueSize, storage); |
660 | 1.35M | } |
661 | | |
662 | 610 | void CloseDB() { |
663 | 610 | delete db_; |
664 | 610 | db_ = NULL; |
665 | 610 | } |
666 | | |
667 | 308 | Status OpenDB() { |
668 | 308 | CloseDB(); |
669 | 308 | env_->ResetState(); |
670 | 308 | return DB::Open(options_, dbname_, &db_); |
671 | 308 | } |
672 | | |
673 | 144 | void DeleteAllData() { |
674 | 144 | Iterator* iter = db_->NewIterator(ReadOptions()); |
675 | 144 | WriteOptions options; |
676 | 250k | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { |
677 | 250k | ASSERT_OK(db_->Delete(WriteOptions(), iter->key())); |
678 | 250k | } |
679 | | |
680 | 144 | delete iter; |
681 | | |
682 | 144 | FlushOptions flush_options; |
683 | 144 | flush_options.wait = true; |
684 | 144 | ASSERT_OK(db_->Flush(flush_options)); |
685 | 144 | } |
686 | | |
687 | | // rnd cannot be null for kResetDropRandomUnsyncedData |
688 | 294 | void ResetDBState(ResetMethod reset_method, Random* rnd = nullptr) { |
689 | 294 | env_->AssertNoOpenFile(); |
690 | 294 | switch (reset_method) { |
691 | 110 | case kResetDropUnsyncedData: |
692 | 110 | ASSERT_OK(env_->DropUnsyncedFileData()); |
693 | 110 | break; |
694 | 36 | case kResetDropRandomUnsyncedData: |
695 | 36 | ASSERT_OK(env_->DropRandomUnsyncedFileData(rnd)); |
696 | 36 | break; |
697 | 72 | case kResetDeleteUnsyncedFiles: |
698 | 72 | ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync()); |
699 | 72 | break; |
700 | 76 | case kResetDropAndDeleteUnsynced: |
701 | 76 | ASSERT_OK(env_->DropUnsyncedFileData()); |
702 | 76 | ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync()); |
703 | 76 | break; |
704 | 0 | default: |
705 | 0 | assert(false); |
706 | 294 | } |
707 | 294 | } |
708 | | |
709 | 144 | void PartialCompactTestPreFault(int num_pre_sync, int num_post_sync) { |
710 | 144 | DeleteAllData(); |
711 | | |
712 | 144 | WriteOptions write_options; |
713 | 144 | write_options.sync = sync_use_wal_; |
714 | | |
715 | 144 | Build(write_options, 0, num_pre_sync); |
716 | 144 | if (sync_use_compact_) { |
717 | 144 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
718 | 144 | } |
719 | 144 | write_options.sync = false; |
720 | 144 | Build(write_options, num_pre_sync, num_post_sync); |
721 | 144 | } |
722 | | |
723 | | void PartialCompactTestReopenWithFault(ResetMethod reset_method, |
724 | | int num_pre_sync, int num_post_sync, |
725 | 144 | Random* rnd = nullptr) { |
726 | 144 | env_->SetFilesystemActive(false); |
727 | 144 | CloseDB(); |
728 | 144 | ResetDBState(reset_method, rnd); |
729 | 144 | ASSERT_OK(OpenDB()); |
730 | 144 | ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound)); |
731 | 144 | ASSERT_OK(Verify(num_pre_sync, num_post_sync, |
732 | 144 | FaultInjectionTest::kValExpectNoError)); |
733 | 144 | WaitCompactionFinish(); |
734 | 144 | ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound)); |
735 | 144 | ASSERT_OK(Verify(num_pre_sync, num_post_sync, |
736 | 144 | FaultInjectionTest::kValExpectNoError)); |
737 | 144 | } |
738 | | |
739 | 144 | void NoWriteTestPreFault() { |
740 | 144 | } |
741 | | |
742 | 148 | void NoWriteTestReopenWithFault(ResetMethod reset_method) { |
743 | 148 | CloseDB(); |
744 | 148 | ResetDBState(reset_method); |
745 | 148 | ASSERT_OK(OpenDB()); |
746 | 148 | } |
747 | | |
748 | 146 | void WaitCompactionFinish() { |
749 | 146 | ASSERT_OK(static_cast<DBImpl*>(db_)->TEST_WaitForCompact()); |
750 | 146 | ASSERT_OK(db_->Put(WriteOptions(), "", "")); |
751 | 146 | } |
752 | | }; |
753 | | |
754 | 2 | TEST_P(FaultInjectionTest, FaultTest) { |
755 | 12 | do { |
756 | 12 | Random rnd(301); |
757 | | |
758 | 48 | for (size_t idx = 0; idx < kNumIterations; idx++) { |
759 | 36 | int num_pre_sync = rnd.Uniform(kMaxNumValues); |
760 | 36 | int num_post_sync = rnd.Uniform(kMaxNumValues); |
761 | | |
762 | 36 | PartialCompactTestPreFault(num_pre_sync, num_post_sync); |
763 | 36 | PartialCompactTestReopenWithFault(kResetDropUnsyncedData, num_pre_sync, |
764 | 36 | num_post_sync); |
765 | 36 | NoWriteTestPreFault(); |
766 | 36 | NoWriteTestReopenWithFault(kResetDropUnsyncedData); |
767 | | |
768 | 36 | PartialCompactTestPreFault(num_pre_sync, num_post_sync); |
769 | 36 | PartialCompactTestReopenWithFault(kResetDropRandomUnsyncedData, |
770 | 36 | num_pre_sync, num_post_sync, &rnd); |
771 | 36 | NoWriteTestPreFault(); |
772 | 36 | NoWriteTestReopenWithFault(kResetDropUnsyncedData); |
773 | | |
774 | | // Setting a separate data path won't pass the test as we don't sync |
775 | | // it after creating new files, |
776 | 36 | PartialCompactTestPreFault(num_pre_sync, num_post_sync); |
777 | 36 | PartialCompactTestReopenWithFault(kResetDropAndDeleteUnsynced, |
778 | 36 | num_pre_sync, num_post_sync); |
779 | 36 | NoWriteTestPreFault(); |
780 | 36 | NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); |
781 | | |
782 | 36 | PartialCompactTestPreFault(num_pre_sync, num_post_sync); |
783 | | // No new files created so we expect all values since no files will be |
784 | | // dropped. |
785 | 36 | PartialCompactTestReopenWithFault(kResetDeleteUnsyncedFiles, num_pre_sync, |
786 | 36 | num_post_sync); |
787 | 36 | NoWriteTestPreFault(); |
788 | 36 | NoWriteTestReopenWithFault(kResetDeleteUnsyncedFiles); |
789 | 36 | } |
790 | 12 | } while (ChangeOptions()); |
791 | 2 | } |
792 | | |
793 | | // Previous log file is not fsynced if sync is forced after log rolling. |
// Sequence exercised by this test:
//   1. Put key 1 with write_options.sync == false.
//   2. Request a Flush() with flush_options.wait == false.
//   3. Put key 2 with write_options.sync == true.
//   4. Deactivate the filesystem and call
//      NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced).
//   5. Reopen the DB and require that BOTH keys read back with the expected
//      values.
794 | 2 | TEST_P(FaultInjectionTest, WriteOptionSyncTest) { |
// NOTE(review): despite the "_low" suffix, this task is scheduled on the HIGH
// priority pool (see the SetBackgroundThreads/Schedule calls below).
// NOTE(review): Schedule() is handed the address of this stack object. A
// failing ASSERT_* returns from the test body immediately, so a failure before
// WakeUp()/WaitUntilDone() would leave the scheduled task pointing at a
// destroyed object -- TODO confirm whether fixture teardown tolerates that.
795 | 2 | test::SleepingBackgroundTask sleeping_task_low; |
796 | 2 | env_->SetBackgroundThreads(1, Env::HIGH); |
797 | | // Block the job queue to prevent flush job from running. |
798 | 2 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, |
799 | 2 | Env::Priority::HIGH); |
800 | 2 | sleeping_task_low.WaitUntilSleeping(); |
801 | | |
// First write is deliberately NOT synced.
802 | 2 | WriteOptions write_options; |
803 | 2 | write_options.sync = false; |
804 | | |
805 | 2 | std::string key_space, value_space; |
806 | 2 | ASSERT_OK( |
807 | 2 | db_->Put(write_options, Key(1, &key_space), Value(1, &value_space))); |
// Ask for a flush without waiting for it to complete.
808 | 2 | FlushOptions flush_options; |
809 | 2 | flush_options.wait = false; |
810 | 2 | ASSERT_OK(db_->Flush(flush_options)); |
// Second write is synced; the test expects key 1 to survive as well.
811 | 2 | write_options.sync = true; |
812 | 2 | ASSERT_OK( |
813 | 2 | db_->Put(write_options, Key(2, &key_space), Value(2, &value_space))); |
814 | | |
// Inject the fault, and only afterwards release the blocked background task.
815 | 2 | env_->SetFilesystemActive(false); |
816 | 2 | NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); |
817 | 2 | sleeping_task_low.WakeUp(); |
818 | | |
819 | 2 | ASSERT_OK(OpenDB()); |
820 | 2 | std::string val; |
// Value()'s return value is discarded here; the call is made so that
// value_space holds the expected value that is compared against `val` below.
821 | 2 | Value(2, &value_space); |
822 | 2 | ASSERT_OK(ReadValue(2, &val)); |
823 | 2 | ASSERT_EQ(value_space, val); |
824 | | |
825 | 2 | Value(1, &value_space); |
826 | 2 | ASSERT_OK(ReadValue(1, &val)); |
827 | 2 | ASSERT_EQ(value_space, val); |
828 | | |
// Do not leave the test while the background task may still be running.
829 | 2 | sleeping_task_low.WaitUntilDone(); |
830 | 2 | } |
831 | | |
// Writes kNumKeys keys, deactivates the filesystem between the test sync
// points FaultTest:1 and FaultTest:2, closes the DB and resets its state with
// kResetDropUnsyncedData. It then reopens the DB and verifies that every key
// is found, both before and after WaitCompactionFinish().
832 | 2 | TEST_P(FaultInjectionTest, UninstalledCompaction) { |
833 | 2 | options_.target_file_size_base = 32 * 1024; |
834 | 2 | options_.write_buffer_size = 100 << 10; // 100KB |
835 | 2 | options_.level0_file_num_compaction_trigger = 6; |
836 | 2 | options_.level0_stop_writes_trigger = 1 << 10; |
837 | 2 | options_.level0_slowdown_writes_trigger = 1 << 10; |
838 | 2 | options_.max_background_compactions = 1; |
839 | 2 | ASSERT_OK(OpenDB()); |
840 | | |
// Sync-point ordering is only installed when sequential_order_ is false.
// Each pair is presumably {predecessor, successor}, i.e. the second point
// waits for the first -- confirm against SyncPoint::LoadDependency.
841 | 2 | if (!sequential_order_) { |
842 | 1 | rocksdb::SyncPoint::GetInstance()->LoadDependency({ |
843 | 1 | {"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"}, |
844 | 1 | {"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"}, |
845 | 1 | {"FaultInjectionTest::FaultTest:2", |
846 | 1 | "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}, |
847 | 1 | }); |
848 | 1 | } |
849 | 2 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
850 | | |
// NOTE(review): kNumKeys uses constant-style naming but is a mutable int; it
// is never modified in this test.
851 | 2 | int kNumKeys = 1000; |
852 | 2 | Build(WriteOptions(), 0, kNumKeys); |
853 | 2 | FlushOptions flush_options; |
854 | 2 | flush_options.wait = true; |
855 | 2 | ASSERT_OK(db_->Flush(flush_options)); |
856 | 2 | ASSERT_OK(db_->Put(WriteOptions(), "", "")); |
// The filesystem is switched off after FaultTest:1 and before FaultTest:2.
857 | 2 | TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0"); |
858 | 2 | TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1"); |
859 | 2 | env_->SetFilesystemActive(false); |
860 | 2 | TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2"); |
861 | 2 | CloseDB(); |
862 | 2 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); |
863 | 2 | ResetDBState(kResetDropUnsyncedData); |
864 | | |
// Second phase: on reopen, the BGWorkCompaction sync point must not be reached
// before the "DBImpl::Open:Opened" sync point has fired.
// NOTE(review): the `arg` parameter of both lambdas is unused.
// NOTE(review): ASSERT_TRUE inside the callback returns only from the lambda
// on failure, not from the test body.
865 | 2 | std::atomic<bool> opened(false); |
866 | 2 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
867 | 2 | "DBImpl::Open:Opened", [&](void* arg) { opened.store(true); }); |
868 | 2 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
869 | 2 | "DBImpl::BGWorkCompaction", |
870 | 1 | [&](void* arg) { ASSERT_TRUE(opened.load()); }); |
871 | 2 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
872 | 2 | ASSERT_OK(OpenDB()); |
// All keys must be found right after reopening and again once compaction has
// finished.
873 | 2 | ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound)); |
874 | 2 | WaitCompactionFinish(); |
875 | 2 | ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound)); |
// The callbacks capture locals by reference, so they are cleared before the
// test returns.
876 | 2 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); |
877 | 2 | rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); |
878 | 2 | } |
879 | | |
// Same shape as WriteOptionSyncTest, except that neither Put() is synced;
// instead db_->SyncWAL() is called once after the second Put(). After the
// filesystem is deactivated and
// NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced) has run, both keys
// must read back with the expected values from the reopened DB.
880 | 2 | TEST_P(FaultInjectionTest, ManualLogSyncTest) { |
// NOTE(review): despite the "_low" suffix, this task is scheduled on the HIGH
// priority pool (see the SetBackgroundThreads/Schedule calls below).
// NOTE(review): Schedule() is handed the address of this stack object. A
// failing ASSERT_* returns from the test body immediately, so a failure before
// WakeUp()/WaitUntilDone() would leave the scheduled task pointing at a
// destroyed object -- TODO confirm whether fixture teardown tolerates that.
881 | 2 | test::SleepingBackgroundTask sleeping_task_low; |
882 | 2 | env_->SetBackgroundThreads(1, Env::HIGH); |
883 | | // Block the job queue to prevent flush job from running. |
884 | 2 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, |
885 | 2 | Env::Priority::HIGH); |
886 | 2 | sleeping_task_low.WaitUntilSleeping(); |
887 | | |
// Both writes in this test use sync == false.
888 | 2 | WriteOptions write_options; |
889 | 2 | write_options.sync = false; |
890 | | |
891 | 2 | std::string key_space, value_space; |
892 | 2 | ASSERT_OK( |
893 | 2 | db_->Put(write_options, Key(1, &key_space), Value(1, &value_space))); |
// Ask for a flush without waiting for it to complete.
894 | 2 | FlushOptions flush_options; |
895 | 2 | flush_options.wait = false; |
896 | 2 | ASSERT_OK(db_->Flush(flush_options)); |
897 | 2 | ASSERT_OK( |
898 | 2 | db_->Put(write_options, Key(2, &key_space), Value(2, &value_space))); |
// Explicit WAL sync: the only sync request issued before the fault.
899 | 2 | ASSERT_OK(db_->SyncWAL()); |
900 | | |
// Inject the fault, and only afterwards release the blocked background task.
901 | 2 | env_->SetFilesystemActive(false); |
902 | 2 | NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); |
903 | 2 | sleeping_task_low.WakeUp(); |
904 | | |
905 | 2 | ASSERT_OK(OpenDB()); |
906 | 2 | std::string val; |
// Value()'s return value is discarded here; the call is made so that
// value_space holds the expected value that is compared against `val` below.
907 | 2 | Value(2, &value_space); |
908 | 2 | ASSERT_OK(ReadValue(2, &val)); |
909 | 2 | ASSERT_EQ(value_space, val); |
910 | | |
911 | 2 | Value(1, &value_space); |
912 | 2 | ASSERT_OK(ReadValue(1, &val)); |
913 | 2 | ASSERT_EQ(value_space, val); |
// Do not leave the test while the background task may still be running.
914 | 2 | sleeping_task_low.WaitUntilDone(); |
915 | 2 | } |
916 | | |
// Instantiates every FaultInjectionTest TEST_P under the "FaultTest" prefix,
// once per value produced by ::testing::Bool() (false and true).
// NOTE(review): presumably the bool parameter feeds sequential_order_ --
// confirm in the fixture definition.
917 | | INSTANTIATE_TEST_CASE_P(FaultTest, FaultInjectionTest, ::testing::Bool()); |
918 | | |
919 | | } // namespace rocksdb |
920 | | |
// Test binary entry point: hands the command line to googletest, then runs all
// registered tests and returns the value of RUN_ALL_TESTS() as the exit code.
921 | 13.2k | int main(int argc, char** argv) { |
922 | 13.2k | ::testing::InitGoogleTest(&argc, argv); |
923 | 13.2k | return RUN_ALL_TESTS(); |
924 | 13.2k | } |