YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/fault_injection_test.cc
Line | Count | Source
1 | | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 | | //  This source code is licensed under the BSD-style license found in the
3 | | //  LICENSE file in the root directory of this source tree. An additional grant
4 | | //  of patent rights can be found in the PATENTS file in the same directory.
5 | | //
6 | | // The following only applies to changes made to this file as part of YugaByte development.
7 | | //
8 | | // Portions Copyright (c) YugaByte, Inc.
9 | | //
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11 | | // in compliance with the License.  You may obtain a copy of the License at
12 | | //
13 | | // http://www.apache.org/licenses/LICENSE-2.0
14 | | //
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17 | | // or implied.  See the License for the specific language governing permissions and limitations
18 | | // under the License.
19 | | //
20 | | // Copyright 2014 The LevelDB Authors. All rights reserved.
21 | | // Use of this source code is governed by a BSD-style license that can be
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors.
23 | |
24 | | // This test uses a custom Env to keep track of the state of a filesystem as of
25 | | // the last "sync". It then checks for data loss errors by purposely dropping
26 | | // file data (or entire files) not protected by a "sync".
27 | |
28 | | #include <map>
29 | | #include <set>
30 | |
31 | | #include <gtest/gtest.h>
32 | |
33 | | #include "yb/rocksdb/cache.h"
34 | | #include "yb/rocksdb/db.h"
35 | | #include "yb/rocksdb/db/db_impl.h"
36 | | #include "yb/rocksdb/env.h"
37 | | #include "yb/rocksdb/table.h"
38 | | #include "yb/rocksdb/util/mock_env.h"
39 | | #include "yb/rocksdb/util/mutexlock.h"
40 | | #include "yb/rocksdb/util/sync_point.h"
41 | | #include "yb/rocksdb/util/testharness.h"
42 | | #include "yb/rocksdb/util/testutil.h"
43 | |
44 | | #include "yb/util/test_macros.h"
45 | |
46 | | namespace rocksdb {
47 | |
48 | | static const int kValueSize = 1000;
49 | | static const int kMaxNumValues = 2000;
50 | | static const size_t kNumIterations = 3;
51 | |
52 | | class TestWritableFile;
53 | | class FaultInjectionTestEnv;
54 | |
55 | | namespace {
56 | |
57 | | // Assume a filename, and not a directory name like "/foo/bar/"
58 | 10.4k | static std::string GetDirName(const std::string filename) {
59 | 10.4k |   size_t found = filename.find_last_of("/\\");
60 | 10.4k |   if (found == std::string::npos) {
61 | 0 |     return "";
62 | 10.4k |   } else {
63 | 10.4k |     return filename.substr(0, found);
64 | 10.4k |   }
65 | 10.4k | }
66 | |
67 | | // Trim the trailing "/" at the end of `str`
68 | 308 | static std::string TrimDirname(const std::string& str) {
69 | 308 |   size_t found = str.find_last_not_of("/");
70 | 308 |   if (found == std::string::npos) {
71 | 0 |     return str;
72 | 0 |   }
73 | 308 |   return str.substr(0, found + 1);
74 | 308 | }
75 | |
76 | | // Return pair <parent directory name, file name> of a full path.
77 | | static std::pair<std::string, std::string> GetDirAndName(
78 | 10.2k |     const std::string& name) {
79 | 10.2k |   std::string dirname = GetDirName(name);
80 | 10.2k |   std::string fname = name.substr(dirname.size() + 1);
81 | 10.2k |   return std::make_pair(dirname, fname);
82 | 10.2k | }
83 | |
84 | | // A basic file truncation function suitable for this test.
85 | 218 | Status Truncate(Env* env, const std::string& filename, uint64_t length) {
86 | 218 |   unique_ptr<SequentialFile> orig_file;
87 | 218 |   const EnvOptions options;
88 | 218 |   Status s = env->NewSequentialFile(filename, &orig_file, options);
89 | 218 |   if (!s.ok()) {
90 | 0 |     fprintf(stderr, "Cannot truncate file %s: %s\n", filename.c_str(),
91 | 0 |             s.ToString().c_str());
92 | 0 |     return s;
93 | 0 |   }
94 | |
95 | 218 |   std::unique_ptr<uint8_t[]> scratch(new uint8_t[length]);
96 | 218 |   rocksdb::Slice result;
97 | 218 |   s = orig_file->Read(length, &result, scratch.get());
98 | | #ifdef OS_WIN
99 | |   orig_file.reset();
100 | | #endif
101 | 218 |   if (s.ok()) {
102 | 218 |     std::string tmp_name = GetDirName(filename) + "/truncate.tmp";
103 | 218 |     unique_ptr<WritableFile> tmp_file;
104 | 218 |     s = env->NewWritableFile(tmp_name, &tmp_file, options);
105 | 218 |     if (s.ok()) {
106 | 218 |       s = tmp_file->Append(result);
107 | 218 |       if (s.ok()) {
108 | 218 |         s = env->RenameFile(tmp_name, filename);
109 | 0 |       } else {
110 | 0 |         fprintf(stderr, "Cannot rename file %s to %s: %s\n", tmp_name.c_str(),
111 | 0 |                 filename.c_str(), s.ToString().c_str());
112 | 0 |         RETURN_NOT_OK(env->DeleteFile(tmp_name));
113 | 0 |       }
114 | 218 |     }
115 | 218 |   }
116 | 218 |   if (!s.ok()) {
117 | 0 |     fprintf(stderr, "Cannot truncate file %s: %s\n", filename.c_str(),
118 | 0 |             s.ToString().c_str());
119 | 0 |   }
120 | |
121 | 218 |   return s;
122 | 218 | }
123 | |
124 | | struct FileState {
125 | |   std::string filename_;
126 | |   ssize_t pos_;
127 | |   ssize_t pos_at_last_sync_;
128 | |   ssize_t pos_at_last_flush_;
129 | |
130 | |   explicit FileState(const std::string& filename)
131 | |       : filename_(filename),
132 | |         pos_(-1),
133 | |         pos_at_last_sync_(-1),
134 | 2.78k |         pos_at_last_flush_(-1) { }
135 | |
136 | 3.09k |   FileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {}
137 | |
138 | 1.26k |   bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; }
139 | |
140 | |   Status DropUnsyncedData(Env* env) const;
141 | |
142 | |   Status DropRandomUnsyncedData(Env* env, Random* rand) const;
143 | | };
144 | |
145 | | }  // anonymous namespace
146 | |
147 | | // A wrapper around WritableFile that tracks how much of the file
148 | | // has been written to or sync'ed.
149 | | class TestWritableFile : public WritableFile {
150 | |  public:
151 | |   explicit TestWritableFile(const std::string& fname,
152 | |                             unique_ptr<WritableFile>&& f,
153 | |                             FaultInjectionTestEnv* env);
154 | |   virtual ~TestWritableFile();
155 | |   Status Append(const Slice& data) override;
156 | 2.15k |   Status Truncate(uint64_t size) override { return target_->Truncate(size); }
157 | |   Status Close() override;
158 | |   Status Flush() override;
159 | |   Status Sync() override;
160 | 8 |   bool IsSyncThreadSafe() const override { return true; }
161 | |
162 | |  private:
163 | |   FileState state_;
164 | |   unique_ptr<WritableFile> target_;
165 | |   bool writable_file_opened_;
166 | |   FaultInjectionTestEnv* env_;
167 | | };
168 | |
169 | | class TestDirectory : public Directory {
170 | |  public:
171 | |   explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname,
172 | |                          Directory* dir)
173 | 308 |       : env_(env), dirname_(dirname), dir_(dir) {}
174 | 308 |   ~TestDirectory() {}
175 | |
176 | |   Status Fsync() override;
177 | |
178 | |  private:
179 | |   FaultInjectionTestEnv* env_;
180 | |   std::string dirname_;
181 | |   unique_ptr<Directory> dir_;
182 | | };
183 | |
184 | | class FaultInjectionTestEnv : public EnvWrapper {
185 | |  public:
186 | |   explicit FaultInjectionTestEnv(Env* base)
187 | |       : EnvWrapper(base),
188 | 8 |         filesystem_active_(true) {}
189 | 8 |   virtual ~FaultInjectionTestEnv() { }
190 | |
191 | |   Status NewDirectory(const std::string& name,
192 | 308 |                       unique_ptr<Directory>* result) override {
193 | 308 |     unique_ptr<Directory> r;
194 | 308 |     Status s = target()->NewDirectory(name, &r);
195 | 308 |     EXPECT_OK(s);
196 | 308 |     if (!s.ok()) {
197 | 0 |       return s;
198 | 0 |     }
199 | 308 |     result->reset(new TestDirectory(this, TrimDirname(name), r.release()));
200 | 308 |     return Status::OK();
201 | 308 |   }
202 | |
203 | |   Status NewWritableFile(const std::string& fname,
204 | |                          unique_ptr<WritableFile>* result,
205 | 2.78k |                          const EnvOptions& soptions) override {
206 | 2.78k |     if (!IsFilesystemActive()) {
207 | 0 |       return STATUS(Corruption, "Not Active");
208 | 0 |     }
209 | |     // Do not allow overwriting files.
210 | 2.78k |     Status s = target()->FileExists(fname);
211 | 2.78k |     if (s.ok()) {
212 | 0 |       return STATUS(Corruption, "File already exists.");
213 | 2.78k |     } else if (!s.IsNotFound()) {
214 | 0 |       assert(s.IsIOError());
215 | 0 |       return s;
216 | 0 |     }
217 | 2.78k |     s = target()->NewWritableFile(fname, result, soptions);
218 | 2.78k |     if (s.ok()) {
219 | 2.78k |       result->reset(new TestWritableFile(fname, std::move(*result), this));
220 | |       // If the file is opened again it will be truncated,
221 | |       // so forget our saved state.
222 | 2.78k |       UntrackFile(fname);
223 | 2.78k |       MutexLock l(&mutex_);
224 | 2.78k |       open_files_.insert(fname);
225 | 2.78k |       auto dir_and_name = GetDirAndName(fname);
226 | 2.78k |       auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
227 | 2.78k |       list.insert(dir_and_name.second);
228 | 2.78k |     }
229 | 2.78k |     return s;
230 | 2.78k |   }
231 | |
232 | 2.85k |   Status DeleteFile(const std::string& f) override {
233 | 2.85k |     if (!IsFilesystemActive()) {
234 | 32 |       return STATUS(Corruption, "Not Active");
235 | 32 |     }
236 | 2.81k |     Status s = EnvWrapper::DeleteFile(f);
237 | 2.81k |     if (!s.ok()) {
238 | 0 |       fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(),
239 | 0 |               s.ToString().c_str());
240 | 0 |     }
241 | 2.81k |     EXPECT_OK(s);
242 | 2.81k |     if (s.ok()) {
243 | 2.81k |       UntrackFile(f);
244 | 2.81k |     }
245 | 2.81k |     return s;
246 | 2.81k |   }
247 | |
248 | |   virtual Status RenameFile(const std::string& s,
249 | 934 |                             const std::string& t) override {
250 | 934 |     if (!IsFilesystemActive()) {
251 | 0 |       return STATUS(Corruption, "Not Active");
252 | 0 |     }
253 | 934 |     Status ret = EnvWrapper::RenameFile(s, t);
254 | |
255 | 934 |     if (ret.ok()) {
256 | 934 |       MutexLock l(&mutex_);
257 | 934 |       if (db_file_state_.find(s) != db_file_state_.end()) {
258 | 626 |         db_file_state_[t] = db_file_state_[s];
259 | 626 |         db_file_state_.erase(s);
260 | 626 |       }
261 | |
262 | 934 |       auto sdn = GetDirAndName(s);
263 | 934 |       auto tdn = GetDirAndName(t);
264 | 934 |       if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) {
265 | 626 |         auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
266 | 626 |         assert(tlist.find(tdn.second) == tlist.end());
267 | 626 |         tlist.insert(tdn.second);
268 | 626 |       }
269 | 934 |     }
270 | |
271 | 934 |     return ret;
272 | 934 |   }
273 | |
274 | 2.78k |   void WritableFileClosed(const FileState& state) {
275 | 2.78k |     MutexLock l(&mutex_);
276 | 2.78k |     if (open_files_.find(state.filename_) != open_files_.end()) {
277 | 2.47k |       db_file_state_[state.filename_] = state;
278 | 2.47k |       open_files_.erase(state.filename_);
279 | 2.47k |     }
280 | 2.78k |   }
281 | |
282 | |   // For every file that is not fully synced, make a call to `func` with
283 | |   // the FileState of the file as the parameter.
284 | 222 |   Status DropFileData(std::function<Status(Env*, FileState)> func) {
285 | 222 |     Status s;
286 | 222 |     MutexLock l(&mutex_);
287 | 222 |     for (std::map<std::string, FileState>::const_iterator it =
288 | 222 |              db_file_state_.begin();
289 | 1.49k |          s.ok() && it != db_file_state_.end(); ++it) {
290 | 1.26k |       const FileState& state = it->second;
291 | 1.26k |       if (!state.IsFullySynced()) {
292 | 218 |         s = func(target(), state);
293 | 218 |       }
294 | 1.26k |     }
295 | 222 |     return s;
296 | 222 |   }
297 | |
298 | 186 |   Status DropUnsyncedFileData() {
299 | 182 |     return DropFileData([&](Env* env, const FileState& state) {
300 | 182 |       return state.DropUnsyncedData(env);
301 | 182 |     });
302 | 186 |   }
303 | |
304 | 36 |   Status DropRandomUnsyncedFileData(Random* rnd) {
305 | 36 |     return DropFileData([&](Env* env, const FileState& state) {
306 | 36 |       return state.DropRandomUnsyncedData(env, rnd);
307 | 36 |     });
308 | 36 |   }
309 | |
310 | 148 |   Status DeleteFilesCreatedAfterLastDirSync() {
311 | |     // Because DeleteFile accesses this container, make a copy to avoid deadlock.
312 | 148 |     std::map<std::string, std::set<std::string>> map_copy;
313 | 148 |     {
314 | 148 |       MutexLock l(&mutex_);
315 | 148 |       map_copy.insert(dir_to_new_files_since_last_sync_.begin(),
316 | 148 |                       dir_to_new_files_since_last_sync_.end());
317 | 148 |     }
318 | |
319 | 288 |     for (auto& pair : map_copy) {
320 | 72 |       for (std::string name : pair.second) {
321 | 72 |         Status s = DeleteFile(pair.first + "/" + name);
322 | 72 |         if (!s.ok()) {
323 | 0 |           return s;
324 | 0 |         }
325 | 72 |       }
326 | 288 |     }
327 | 148 |     return Status::OK();
328 | 148 |   }
329 | 308 |   void ResetState() {
330 | 308 |     MutexLock l(&mutex_);
331 | 308 |     db_file_state_.clear();
332 | 308 |     dir_to_new_files_since_last_sync_.clear();
333 | 308 |     SetFilesystemActiveNoLock(true);
334 | 308 |   }
335 | |
336 | 5.59k |   void UntrackFile(const std::string& f) {
337 | 5.59k |     MutexLock l(&mutex_);
338 | 5.59k |     auto dir_and_name = GetDirAndName(f);
339 | 5.59k |     dir_to_new_files_since_last_sync_[dir_and_name.first].erase(
340 | 5.59k |         dir_and_name.second);
341 | 5.59k |     db_file_state_.erase(f);
342 | 5.59k |     open_files_.erase(f);
343 | 5.59k |   }
344 | |
345 | 776 |   void SyncDir(const std::string& dirname) {
346 | 776 |     MutexLock l(&mutex_);
347 | 776 |     dir_to_new_files_since_last_sync_.erase(dirname);
348 | 776 |   }
349 | |
350 | |   // Setting the filesystem to inactive is the test equivalent to simulating a
351 | |   // system reset. Setting to inactive will freeze our saved filesystem state so
352 | |   // that it will stop being recorded. It can then be reset back to the state at
353 | |   // the time of the reset.
354 | 1.43M |   bool IsFilesystemActive() {
355 | 1.43M |     MutexLock l(&mutex_);
356 | 1.43M |     return filesystem_active_;
357 | 1.43M |   }
358 | 458 |   void SetFilesystemActiveNoLock(bool active) { filesystem_active_ = active; }
359 | 150 |   void SetFilesystemActive(bool active) {
360 | 150 |     MutexLock l(&mutex_);
361 | 150 |     SetFilesystemActiveNoLock(active);
362 | 150 |   }
363 | 294 |   void AssertNoOpenFile() { ASSERT_TRUE(open_files_.empty()); }
364 | |
365 | |  private:
366 | |   port::Mutex mutex_;
367 | |   std::map<std::string, FileState> db_file_state_;
368 | |   std::set<std::string> open_files_;
369 | |   std::unordered_map<std::string, std::set<std::string>>
370 | |       dir_to_new_files_since_last_sync_;
371 | |   bool filesystem_active_;  // Record flushes, syncs, writes
372 | | };
373 | |
374 | 182 | Status FileState::DropUnsyncedData(Env* env) const {
375 | 182 |   ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_;
376 | 182 |   return Truncate(env, filename_, sync_pos);
377 | 182 | }
378 | |
379 | 36 | Status FileState::DropRandomUnsyncedData(Env* env, Random* rand) const {
380 | 36 |   ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_;
381 | 36 |   assert(pos_ >= sync_pos);
382 | 36 |   int range = static_cast<int>(pos_ - sync_pos);
383 | 36 |   uint64_t truncated_size =
384 | 36 |       static_cast<uint64_t>(sync_pos) + rand->Uniform(range);
385 | 36 |   return Truncate(env, filename_, truncated_size);
386 | 36 | }
387 | |
388 | 776 | Status TestDirectory::Fsync() {
389 | 776 |   env_->SyncDir(dirname_);
390 | 776 |   return dir_->Fsync();
391 | 776 | }
392 | |
393 | | TestWritableFile::TestWritableFile(const std::string& fname,
394 | |                                    unique_ptr<WritableFile>&& f,
395 | |                                    FaultInjectionTestEnv* env)
396 | |       : state_(fname),
397 | |         target_(std::move(f)),
398 | |         writable_file_opened_(true),
399 | 2.78k |         env_(env) {
400 | 2.78k |   assert(target_ != nullptr);
401 | 2.78k |   state_.pos_ = 0;
402 | 2.78k | }
403 | |
404 | 2.78k | TestWritableFile::~TestWritableFile() {
405 | 2.78k |   if (writable_file_opened_) {
406 | 318 |     CHECK_OK(Close());
407 | 318 |   }
408 | 2.78k | }
409 | |
410 | 711k | Status TestWritableFile::Append(const Slice& data) {
411 | 711k |   if (!env_->IsFilesystemActive()) {
412 | 2 |     return STATUS(Corruption, "Not Active");
413 | 2 |   }
414 | 711k |   Status s = target_->Append(data);
415 | 711k |   if (s.ok()) {
416 | 711k |     state_.pos_ += data.size();
417 | 711k |   }
418 | 711k |   return s;
419 | 711k | }
420 | |
421 | 2.78k | Status TestWritableFile::Close() {
422 | 2.78k |   writable_file_opened_ = false;
423 | 2.78k |   Status s = target_->Close();
424 | 2.78k |   if (s.ok()) {
425 | 2.78k |     env_->WritableFileClosed(state_);
426 | 2.78k |   }
427 | 2.78k |   return s;
428 | 2.78k | }
429 | |
430 | 711k | Status TestWritableFile::Flush() {
431 | 711k |   Status s = target_->Flush();
432 | 711k |   if (s.ok() && env_->IsFilesystemActive()) {
433 | 711k |     state_.pos_at_last_flush_ = state_.pos_;
434 | 711k |   }
435 | 711k |   return s;
436 | 711k | }
437 | |
438 | 2.62k | Status TestWritableFile::Sync() {
439 | 2.62k |   if (!env_->IsFilesystemActive()) {
440 | 0 |     return Status::OK();
441 | 0 |   }
442 | |   // No need to actually sync.
443 | 2.62k |   state_.pos_at_last_sync_ = state_.pos_;
444 | 2.62k |   return Status::OK();
445 | 2.62k | }
446 | |
447 | | class FaultInjectionTest : public RocksDBTest,
448 | |                            public testing::WithParamInterface<bool> {
449 | |  protected:
450 | |   enum OptionConfig {
451 | |     kDefault,
452 | |     kDifferentDataDir,
453 | |     kWalDir,
454 | |     kSyncWal,
455 | |     kWalDirSyncWal,
456 | |     kMultiLevels,
457 | |     kEnd,
458 | |   };
459 | |   int option_config_;
460 | |   // When we need to make sure data is persistent, sync the WAL.
461 | |   bool sync_use_wal_;
462 | |   // When we need to make sure data is persistent, call DB::CompactRange().
463 | |   bool sync_use_compact_;
464 | |
465 | |   bool sequential_order_;
466 | |
467 | |  protected:
468 | |  public:
469 | |   enum ExpectedVerifResult { kValExpectFound, kValExpectNoError };
470 | |   enum ResetMethod {
471 | |     kResetDropUnsyncedData,
472 | |     kResetDropRandomUnsyncedData,
473 | |     kResetDeleteUnsyncedFiles,
474 | |     kResetDropAndDeleteUnsynced
475 | |   };
476 | |
477 | |   std::unique_ptr<Env> base_env_;
478 | |   FaultInjectionTestEnv* env_;
479 | |   std::string dbname_;
480 | |   shared_ptr<Cache> tiny_cache_;
481 | |   Options options_;
482 | |   DB* db_;
483 | |
484 | |   FaultInjectionTest()
485 | |       : option_config_(kDefault),
486 | |         sync_use_wal_(false),
487 | |         sync_use_compact_(true),
488 | |         base_env_(nullptr),
489 | |         env_(NULL),
490 | 8 |         db_(NULL) {
491 | 8 |   }
492 | |
493 | 8 |   ~FaultInjectionTest() {
494 | 8 |     rocksdb::SyncPoint::GetInstance()->DisableProcessing();
495 | 8 |     rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
496 | 8 |   }
497 | |
498 | 12 |   bool ChangeOptions() {
499 | 12 |     option_config_++;
500 | 12 |     if (option_config_ >= kEnd) {
501 | 2 |       return false;
502 | 10 |     } else {
503 | 10 |       if (option_config_ == kMultiLevels) {
504 | 2 |         base_env_.reset(new MockEnv(Env::Default()));
505 | 2 |       }
506 | 10 |       return true;
507 | 10 |     }
508 | 12 |   }
509 | |
510 | |   // Return the current option configuration.
511 | 8 |   Options CurrentOptions() {
512 | 8 |     sync_use_wal_ = false;
513 | 8 |     sync_use_compact_ = true;
514 | 8 |     Options options;
515 | 8 |     switch (option_config_) {
516 | 0 |       case kWalDir:
517 | 0 |         options.wal_dir = test::TmpDir(env_) + "/fault_test_wal";
518 | 0 |         break;
519 | 0 |       case kDifferentDataDir:
520 | 0 |         options.db_paths.emplace_back(test::TmpDir(env_) + "/fault_test_data",
521 | 0 |                                       1000000U);
522 | 0 |         break;
523 | 0 |       case kSyncWal:
524 | 0 |         sync_use_wal_ = true;
525 | 0 |         sync_use_compact_ = false;
526 | 0 |         break;
527 | 0 |       case kWalDirSyncWal:
528 | 0 |         options.wal_dir = test::TmpDir(env_) + "/fault_test_wal";
529 | 0 |         sync_use_wal_ = true;
530 | 0 |         sync_use_compact_ = false;
531 | 0 |         break;
532 | 0 |       case kMultiLevels:
533 | 0 |         options.write_buffer_size = 64 * 1024;
534 | 0 |         options.target_file_size_base = 64 * 1024;
535 | 0 |         options.level0_file_num_compaction_trigger = 2;
536 | 0 |         options.level0_slowdown_writes_trigger = 2;
537 | 0 |         options.level0_stop_writes_trigger = 4;
538 | 0 |         options.max_bytes_for_level_base = 128 * 1024;
539 | 0 |         options.max_write_buffer_number = 2;
540 | 0 |         options.max_background_compactions = 8;
541 | 0 |         options.max_background_flushes = 8;
542 | 0 |         sync_use_wal_ = true;
543 | 0 |         sync_use_compact_ = false;
544 | 0 |         break;
545 | 8 |       default:
546 | 8 |         break;
547 | 8 |     }
548 | 8 |     return options;
549 | 8 |   }
550 | |
551 | 8 |   Status NewDB() {
552 | 8 |     assert(db_ == NULL);
553 | 8 |     assert(tiny_cache_ == nullptr);
554 | 8 |     assert(env_ == NULL);
555 | |
556 | 8 |     env_ =
557 | 8 |         new FaultInjectionTestEnv(base_env_ ? base_env_.get() : Env::Default());
558 | |
559 | 8 |     options_ = CurrentOptions();
560 | 8 |     options_.env = env_;
561 | 8 |     options_.paranoid_checks = true;
562 | |
563 | 8 |     BlockBasedTableOptions table_options;
564 | 8 |     tiny_cache_ = NewLRUCache(100);
565 | 8 |     table_options.block_cache = tiny_cache_;
566 | 8 |     options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
567 | |
568 | 8 |     dbname_ = test::TmpDir() + "/fault_test";
569 | |
570 | 8 |     EXPECT_OK(DestroyDB(dbname_, options_));
571 | |
572 | 8 |     options_.create_if_missing = true;
573 | 8 |     Status s = OpenDB();
574 | 8 |     options_.create_if_missing = false;
575 | 8 |     return s;
576 | 8 |   }
577 | |
578 | 8 |   void SetUp() override {
579 | 8 |     sequential_order_ = GetParam();
580 | 8 |     ASSERT_OK(NewDB());
581 | 8 |   }
582 | |
583 | 8 |   void TearDown() override {
584 | 8 |     CloseDB();
585 | |
586 | 8 |     Status s = DestroyDB(dbname_, options_);
587 | |
588 | 8 |     delete env_;
589 | 8 |     env_ = NULL;
590 | |
591 | 8 |     tiny_cache_.reset();
592 | |
593 | 8 |     ASSERT_OK(s);
594 | 8 |   }
595 | |
596 | 290 |   void Build(const WriteOptions& write_options, int start_idx, int num_vals) {
597 | 290 |     std::string key_space, value_space;
598 | 290 |     WriteBatch batch;
599 | 327k |     for (int i = start_idx; i < start_idx + num_vals; i++) {
600 | 327k |       Slice key = Key(i, &key_space);
601 | 327k |       batch.Clear();
602 | 327k |       batch.Put(key, Value(i, &value_space));
603 | 327k |       ASSERT_OK(db_->Write(write_options, &batch));
604 | 327k |     }
605 | 290 |   }
606 | |
607 | 512k |   Status ReadValue(int i, std::string* val) const {
608 | 512k |     std::string key_space, value_space;
609 | 512k |     Slice key = Key(i, &key_space);
610 | 512k |     Value(i, &value_space);
611 | 512k |     ReadOptions options;
612 | 512k |     return db_->Get(options, key, val);
613 | 512k |   }
614 | |
615 | |   Status Verify(int start_idx, int num_vals,
616 | 580 |                 ExpectedVerifResult expected) const {
617 | 580 |     std::string val;
618 | 580 |     std::string value_space;
619 | 580 |     Status s;
620 | 513k |     for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) {
621 | 512k |       Value(i, &value_space);
622 | 512k |       s = ReadValue(i, &val);
623 | 512k |       if (s.ok()) {
624 | 512k |         EXPECT_EQ(value_space, val);
625 | 512k |       }
626 | 512k |       if (expected == kValExpectFound) {
627 | 423k |         if (!s.ok()) {
628 | 0 |           fprintf(stderr, "Error when read %dth record (expect found): %s\n", i,
629 | 0 |                   s.ToString().c_str());
630 | 0 |           return s;
631 | 0 |         }
632 | 89.7k |       } else if (!s.ok() && !s.IsNotFound()) {
633 | 0 |         fprintf(stderr, "Error when read %dth record: %s\n", i,
634 | 0 |                 s.ToString().c_str());
635 | 0 |         return s;
636 | 0 |       }
637 | 512k |     }
638 | 580 |     return Status::OK();
639 | 580 |   }
640 | |
641 | |   // Return the ith key
642 | 840k |   Slice Key(int i, std::string* storage) const {
643 | 840k |     int num = i;
644 | 840k |     if (!sequential_order_) {
645 | |       // pseudo-randomly scramble the key index
646 | 420k |       int64_t m = 0x5bd1e995LL * num;
647 | 420k |       m ^= (m & 0xff) << 24;
648 | 420k |       num = static_cast<int>(m);
649 | 420k |     }
650 | 840k |     char buf[100];
651 | 840k |     snprintf(buf, sizeof(buf), "%016d", num);
652 | 840k |     storage->assign(buf, strlen(buf));
653 | 840k |     return Slice(*storage);
654 | 840k |   }
655 | |
656 | |   // Return the value to associate with the specified key
657 | 1.35M |   Slice Value(int k, std::string* storage) const {
658 | 1.35M |     Random r(k);
659 | 1.35M |     return RandomString(&r, kValueSize, storage);
660 | 1.35M |   }
661 | |
662 | 610 |   void CloseDB() {
663 | 610 |     delete db_;
664 | 610 |     db_ = NULL;
665 | 610 |   }
666 | |
667 | 308 |   Status OpenDB() {
668 | 308 |     CloseDB();
669 | 308 |     env_->ResetState();
670 | 308 |     return DB::Open(options_, dbname_, &db_);
671 | 308 |   }
672 | |
673 | 144 |   void DeleteAllData() {
674 | 144 |     Iterator* iter = db_->NewIterator(ReadOptions());
675 | 144 |     WriteOptions options;
676 | 250k |     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
677 | 250k |       ASSERT_OK(db_->Delete(WriteOptions(), iter->key()));
678 | 250k |     }
679 | |
680 | 144 |     delete iter;
681 | |
682 | 144 |     FlushOptions flush_options;
683 | 144 |     flush_options.wait = true;
684 | 144 |     ASSERT_OK(db_->Flush(flush_options));
685 | 144 |   }
686 | |
687 | |   // rnd cannot be null for kResetDropRandomUnsyncedData
688 | 294 |   void ResetDBState(ResetMethod reset_method, Random* rnd = nullptr) {
689 | 294 |     env_->AssertNoOpenFile();
690 | 294 |     switch (reset_method) {
691 | 110 |       case kResetDropUnsyncedData:
692 | 110 |         ASSERT_OK(env_->DropUnsyncedFileData());
693 | 110 |         break;
694 | 36 |       case kResetDropRandomUnsyncedData:
695 | 36 |         ASSERT_OK(env_->DropRandomUnsyncedFileData(rnd));
696 | 36 |         break;
697 | 72 |       case kResetDeleteUnsyncedFiles:
698 | 72 |         ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync());
699 | 72 |         break;
700 | 76 |       case kResetDropAndDeleteUnsynced:
701 | 76 |         ASSERT_OK(env_->DropUnsyncedFileData());
702 | 76 |         ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync());
703 | 76 |         break;
704 | 0 |       default:
705 | 0 |         assert(false);
706 | 294 |     }
707 | 294 |   }
708 | |
709 | 144 |   void PartialCompactTestPreFault(int num_pre_sync, int num_post_sync) {
710 | 144 |     DeleteAllData();
711 | |
712 | 144 |     WriteOptions write_options;
713 | 144 |     write_options.sync = sync_use_wal_;
714 | |
715 | 144 |     Build(write_options, 0, num_pre_sync);
716 | 144 |     if (sync_use_compact_) {
717 | 144 |       ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
718 | 144 |     }
719 | 144 |     write_options.sync = false;
720 | 144 |     Build(write_options, num_pre_sync, num_post_sync);
721 | 144 |   }
722 | |
723 | |   void PartialCompactTestReopenWithFault(ResetMethod reset_method,
724 | |                                          int num_pre_sync, int num_post_sync,
725 | 144 |                                          Random* rnd = nullptr) {
726 | 144 |     env_->SetFilesystemActive(false);
727 | 144 |     CloseDB();
728 | 144 |     ResetDBState(reset_method, rnd);
729 | 144 |     ASSERT_OK(OpenDB());
730 | 144 |     ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound));
731 | 144 |     ASSERT_OK(Verify(num_pre_sync, num_post_sync,
732 | 144 |                      FaultInjectionTest::kValExpectNoError));
733 | 144 |     WaitCompactionFinish();
734 | 144 |     ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound));
735 | 144 |     ASSERT_OK(Verify(num_pre_sync, num_post_sync,
736 | 144 |                      FaultInjectionTest::kValExpectNoError));
737 | 144 |   }
738 | |
739 | 144 |   void NoWriteTestPreFault() {
740 | 144 |   }
741 | |
742 | 148 |   void NoWriteTestReopenWithFault(ResetMethod reset_method) {
743 | 148 |     CloseDB();
744 | 148 |     ResetDBState(reset_method);
745 | 148 |     ASSERT_OK(OpenDB());
746 | 148 |   }
747 | |
748 | 146 |   void WaitCompactionFinish() {
749 | 146 |     ASSERT_OK(static_cast<DBImpl*>(db_)->TEST_WaitForCompact());
750 | 146 |     ASSERT_OK(db_->Put(WriteOptions(), "", ""));
751 | 146 |   }
752 | | };
753 | |
754 | 2 | TEST_P(FaultInjectionTest, FaultTest) {
755 | 12 |   do {
756 | 12 |     Random rnd(301);
757 | |
758 | 48 |     for (size_t idx = 0; idx < kNumIterations; idx++) {
759 | 36 |       int num_pre_sync = rnd.Uniform(kMaxNumValues);
760 | 36 |       int num_post_sync = rnd.Uniform(kMaxNumValues);
761 | |
762 | 36 |       PartialCompactTestPreFault(num_pre_sync, num_post_sync);
763 | 36 |       PartialCompactTestReopenWithFault(kResetDropUnsyncedData, num_pre_sync,
764 | 36 |                                         num_post_sync);
765 | 36 |       NoWriteTestPreFault();
766 | 36 |       NoWriteTestReopenWithFault(kResetDropUnsyncedData);
767 | |
768 | 36 |       PartialCompactTestPreFault(num_pre_sync, num_post_sync);
769 | 36 |       PartialCompactTestReopenWithFault(kResetDropRandomUnsyncedData,
770 | 36 |                                         num_pre_sync, num_post_sync, &rnd);
771 | 36 |       NoWriteTestPreFault();
772 | 36 |       NoWriteTestReopenWithFault(kResetDropUnsyncedData);
773 | |
774 | |       // Setting a separate data path won't pass the test as we don't sync
775 | |       // it after creating new files.
776 | 36 |       PartialCompactTestPreFault(num_pre_sync, num_post_sync);
777 | 36 |       PartialCompactTestReopenWithFault(kResetDropAndDeleteUnsynced,
778 | 36 |                                         num_pre_sync, num_post_sync);
779 | 36 |       NoWriteTestPreFault();
780 | 36 |       NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
781 | |
782 | 36 |       PartialCompactTestPreFault(num_pre_sync, num_post_sync);
783 | |       // No new files created so we expect all values since no files will be
784 | |       // dropped.
785 | 36 |       PartialCompactTestReopenWithFault(kResetDeleteUnsyncedFiles, num_pre_sync,
786 | 36 |                                         num_post_sync);
787 | 36 |       NoWriteTestPreFault();
788 | 36 |       NoWriteTestReopenWithFault(kResetDeleteUnsyncedFiles);
789 | 36 |     }
790 | 12 |   } while (ChangeOptions());
791 | 2 | }
792 | |
793 | | // Previous log file is not fsynced if sync is forced after log rolling.
794 | 2 | TEST_P(FaultInjectionTest, WriteOptionSyncTest) {
795 | 2 |   test::SleepingBackgroundTask sleeping_task_low;
796 | 2 |   env_->SetBackgroundThreads(1, Env::HIGH);
797 | |   // Block the job queue to prevent flush job from running.
798 | 2 |   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
799 | 2 |                  Env::Priority::HIGH);
800 | 2 |   sleeping_task_low.WaitUntilSleeping();
801 | |
802 | 2 |   WriteOptions write_options;
803 | 2 |   write_options.sync = false;
804 | |
805 | 2 |   std::string key_space, value_space;
806 | 2 |   ASSERT_OK(
807 | 2 |       db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
808 | 2 |   FlushOptions flush_options;
809 | 2 |   flush_options.wait = false;
810 | 2 |   ASSERT_OK(db_->Flush(flush_options));
811 | 2 |   write_options.sync = true;
812 | 2 |   ASSERT_OK(
813 | 2 |       db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
814 | |
815 | 2 |   env_->SetFilesystemActive(false);
816 | 2 |   NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
817 | 2 |   sleeping_task_low.WakeUp();
818 | |
819 | 2 |   ASSERT_OK(OpenDB());
820 | 2 |   std::string val;
821 | 2 |   Value(2, &value_space);
822 | 2 |   ASSERT_OK(ReadValue(2, &val));
823 | 2 |   ASSERT_EQ(value_space, val);
824 | |
825 | 2 |   Value(1, &value_space);
826 | 2 |   ASSERT_OK(ReadValue(1, &val));
827 | 2 |   ASSERT_EQ(value_space, val);
828 | |
829 | 2 |   sleeping_task_low.WaitUntilDone();
830 | 2 | }
831 | |
832 | 2 | TEST_P(FaultInjectionTest, UninstalledCompaction) {
833 | 2 |   options_.target_file_size_base = 32 * 1024;
834 | 2 |   options_.write_buffer_size = 100 << 10;  // 100KB
835 | 2 |   options_.level0_file_num_compaction_trigger = 6;
836 | 2 |   options_.level0_stop_writes_trigger = 1 << 10;
837 | 2 |   options_.level0_slowdown_writes_trigger = 1 << 10;
838 | 2 |   options_.max_background_compactions = 1;
839 | 2 |   ASSERT_OK(OpenDB());
840 | |
841 | 2 |   if (!sequential_order_) {
842 | 1 |     rocksdb::SyncPoint::GetInstance()->LoadDependency({
843 | 1 |         {"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"},
844 | 1 |         {"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"},
845 | 1 |         {"FaultInjectionTest::FaultTest:2",
846 | 1 |          "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
847 | 1 |     });
848 | 1 |   }
849 | 2 |   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
850 | |
851 | 2 |   int kNumKeys = 1000;
852 | 2 |   Build(WriteOptions(), 0, kNumKeys);
853 | 2 |   FlushOptions flush_options;
854 | 2 |   flush_options.wait = true;
855 | 2 |   ASSERT_OK(db_->Flush(flush_options));
856 | 2 |   ASSERT_OK(db_->Put(WriteOptions(), "", ""));
857 | 2 |   TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0");
858 | 2 |   TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1");
859 | 2 |   env_->SetFilesystemActive(false);
860 | 2 |   TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2");
861 | 2 |   CloseDB();
862 | 2 |   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
863 | 2 |   ResetDBState(kResetDropUnsyncedData);
864 | |
865 | 2 |   std::atomic<bool> opened(false);
866 | 2 |   rocksdb::SyncPoint::GetInstance()->SetCallBack(
867 | 2 |       "DBImpl::Open:Opened", [&](void* arg) { opened.store(true); });
868 | 2 |   rocksdb::SyncPoint::GetInstance()->SetCallBack(
869 | 2 |       "DBImpl::BGWorkCompaction",
870 | 1 |       [&](void* arg) { ASSERT_TRUE(opened.load()); });
871 | 2 |   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
872 | 2 |   ASSERT_OK(OpenDB());
873 | 2 |   ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound));
874 | 2 |   WaitCompactionFinish();
875 | 2 |   ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound));
876 | 2 |   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
877 | 2 |   rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
878 | 2 | }
879 | |
880 | 2 | TEST_P(FaultInjectionTest, ManualLogSyncTest) {
881 | 2 |   test::SleepingBackgroundTask sleeping_task_low;
882 | 2 |   env_->SetBackgroundThreads(1, Env::HIGH);
883 | |   // Block the job queue to prevent flush job from running.
884 | 2 |   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
885 | 2 |                  Env::Priority::HIGH);
886 | 2 |   sleeping_task_low.WaitUntilSleeping();
887 | |
888 | 2 |   WriteOptions write_options;
889 | 2 |   write_options.sync = false;
890 | |
891 | 2 |   std::string key_space, value_space;
892 | 2 |   ASSERT_OK(
893 | 2 |       db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
894 | 2 |   FlushOptions flush_options;
895 | 2 |   flush_options.wait = false;
896 | 2 |   ASSERT_OK(db_->Flush(flush_options));
897 | 2 |   ASSERT_OK(
898 | 2 |       db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
899 | 2 |   ASSERT_OK(db_->SyncWAL());
900 | |
901 | 2 |   env_->SetFilesystemActive(false);
902 | 2 |   NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
903 | 2 |   sleeping_task_low.WakeUp();
904 | |
905 | 2 |   ASSERT_OK(OpenDB());
906 | 2 |   std::string val;
907 | 2 |   Value(2, &value_space);
908 | 2 |   ASSERT_OK(ReadValue(2, &val));
909 | 2 |   ASSERT_EQ(value_space, val);
910 | |
911 | 2 |   Value(1, &value_space);
912 | 2 |   ASSERT_OK(ReadValue(1, &val));
913 | 2 |   ASSERT_EQ(value_space, val);
914 | 2 |   sleeping_task_low.WaitUntilDone();
915 | 2 | }
916 | |
917 | | INSTANTIATE_TEST_CASE_P(FaultTest, FaultInjectionTest, ::testing::Bool());
918 | |
919 | | }  // namespace rocksdb
920 | |
921 | 13.2k | int main(int argc, char** argv) {
922 | 13.2k |   ::testing::InitGoogleTest(&argc, argv);
923 | 13.2k |   return RUN_ALL_TESTS();
924 | 13.2k | }