YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/column_family_test.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include <algorithm>
25
#include <atomic>
26
#include <string>
27
#include <thread>
28
29
#include <gtest/gtest.h>
30
31
#include "yb/rocksdb/db.h"
32
#include "yb/rocksdb/db/db_impl.h"
33
#include "yb/rocksdb/db/version_set.h"
34
#include "yb/rocksdb/env.h"
35
#include "yb/rocksdb/iterator.h"
36
#include "yb/rocksdb/util/coding.h"
37
#include "yb/rocksdb/util/options_parser.h"
38
#include "yb/rocksdb/util/sync_point.h"
39
#include "yb/rocksdb/util/testharness.h"
40
#include "yb/rocksdb/util/testutil.h"
41
#include "yb/rocksdb/utilities/merge_operators.h"
42
43
#include "yb/util/string_util.h"
44
#include "yb/util/test_macros.h"
45
46
DECLARE_int32(memstore_arena_size_kb);
47
48
using std::atomic;
49
50
namespace rocksdb {
51
52
// counts how many operations were performed
53
class EnvCounter : public EnvWrapper {
54
 public:
55
  explicit EnvCounter(Env* base)
56
34
      : EnvWrapper(base) {
57
34
    num_new_writable_file_.store(0);
58
34
  }
59
60
2
  int GetNumberOfNewWritableFileCalls() {
61
2
    return num_new_writable_file_.load();
62
2
  }
63
  Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
64
1.31k
                         const EnvOptions& soptions) override {
65
1.31k
    num_new_writable_file_.fetch_add(1);
66
1.31k
    return EnvWrapper::NewWritableFile(f, r, soptions);
67
1.31k
  }
68
69
 private:
70
  atomic<int> num_new_writable_file_;
71
};
72
73
class ColumnFamilyTest : public RocksDBTest {
74
 public:
75
34
  ColumnFamilyTest() : rnd_(139) {
76
34
    env_ = new EnvCounter(Env::Default());
77
34
    dbname_ = test::TmpDir() + "/column_family_test";
78
34
    db_options_.create_if_missing = true;
79
34
    db_options_.fail_if_options_file_error = true;
80
34
    db_options_.env = env_;
81
34
    CHECK_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_)));
82
34
  }
83
84
34
  ~ColumnFamilyTest() {
85
34
    Close();
86
34
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
87
34
    Destroy();
88
34
    delete env_;
89
34
  }
90
91
154
  void Close() {
92
248
    for (auto h : handles_) {
93
248
      if (h) {
94
231
        delete h;
95
231
      }
96
248
    }
97
154
    handles_.clear();
98
154
    names_.clear();
99
154
    delete db_;
100
154
    db_ = nullptr;
101
154
  }
102
103
  Status TryOpen(std::vector<std::string> cf,
104
83
                 std::vector<ColumnFamilyOptions> options = {}) {
105
83
    std::vector<ColumnFamilyDescriptor> column_families;
106
83
    names_.clear();
107
262
    for (size_t i = 0; i < cf.size(); ++i) {
108
179
      column_families.push_back(ColumnFamilyDescriptor(
109
128
          cf[i], options.size() == 0 ? column_family_options_ : options[i]));
110
179
      names_.push_back(cf[i]);
111
179
    }
112
83
    return DB::Open(db_options_, dbname_, column_families, &handles_, &db_);
113
83
  }
114
115
  Status OpenReadOnly(std::vector<std::string> cf,
116
3
                         std::vector<ColumnFamilyOptions> options = {}) {
117
3
    std::vector<ColumnFamilyDescriptor> column_families;
118
3
    names_.clear();
119
11
    for (size_t i = 0; i < cf.size(); ++i) {
120
8
      column_families.push_back(ColumnFamilyDescriptor(
121
8
          cf[i], options.size() == 0 ? column_family_options_ : options[i]));
122
8
      names_.push_back(cf[i]);
123
8
    }
124
3
    return DB::OpenForReadOnly(db_options_, dbname_, column_families, &handles_,
125
3
                               &db_);
126
3
  }
127
128
#ifndef ROCKSDB_LITE  // ReadOnlyDB is not supported
129
  void AssertOpenReadOnly(std::vector<std::string> cf,
130
1
                    std::vector<ColumnFamilyOptions> options = {}) {
131
1
    ASSERT_OK(OpenReadOnly(cf, options));
132
1
  }
133
#endif  // !ROCKSDB_LITE
134
135
136
  void Open(std::vector<std::string> cf,
137
80
            std::vector<ColumnFamilyOptions> options = {}) {
138
80
    ASSERT_OK(TryOpen(cf, options));
139
80
  }
140
141
29
  void Open() {
142
29
    Open({"default"});
143
29
  }
144
145
566
  DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_); }
146
147
177
  int GetProperty(int cf, std::string property) {
148
177
    std::string value;
149
177
    EXPECT_TRUE(dbfull()->GetProperty(handles_[cf], property, &value));
150
177
#ifndef CYGWIN
151
177
    return std::stoi(value);
152
#else
153
    return std::strtol(value.c_str(), 0 /* off */, 10 /* base */);
154
#endif
155
177
  }
156
157
46
  void Destroy() {
158
46
    Close();
159
46
    ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_)));
160
46
  }
161
162
  void CreateColumnFamilies(
163
      const std::vector<std::string>& cfs,
164
36
      const std::vector<ColumnFamilyOptions>& options = {}) {
165
36
    int cfi = static_cast<int>(handles_.size());
166
36
    handles_.resize(cfi + cfs.size());
167
36
    names_.resize(cfi + cfs.size());
168
105
    for (size_t i = 0; i < cfs.size(); ++i) {
169
69
      const auto& current_cf_opt =
170
66
          options.empty() ? column_family_options_ : options[i];
171
69
      ASSERT_OK(
172
69
          db_->CreateColumnFamily(current_cf_opt, cfs[i], &handles_[cfi]));
173
69
      names_[cfi] = cfs[i];
174
175
69
#ifndef ROCKSDB_LITE  // RocksDBLite does not support GetDescriptor
176
      // Verify the CF options of the returned CF handle.
177
69
      ColumnFamilyDescriptor desc;
178
69
      ASSERT_OK(handles_[cfi]->GetDescriptor(&desc));
179
69
      if (current_cf_opt.arena_block_size == 0) {
180
        // When column family is created and specified arena_block_size is 0, we modify
181
        // arena_block_size to value specified by flags. See SanitizeOptions for info.
182
        // So reset it back to 0, to make verifier happy.
183
62
        desc.options.arena_block_size = 0;
184
62
      }
185
69
      ASSERT_OK(RocksDBOptionsParser::VerifyCFOptions(desc.options, current_cf_opt));
186
69
#endif  // !ROCKSDB_LITE
187
69
      cfi++;
188
69
    }
189
36
  }
190
191
38
  void Reopen(const std::vector<ColumnFamilyOptions> options = {}) {
192
38
    std::vector<std::string> names;
193
115
    for (auto name : names_) {
194
115
      if (name != "") {
195
111
        names.push_back(name);
196
111
      }
197
115
    }
198
38
    Close();
199
38
    assert(options.size() == 0 || names.size() == options.size());
200
38
    Open(names, options);
201
38
  }
202
203
10
  void CreateColumnFamiliesAndReopen(const std::vector<std::string>& cfs) {
204
10
    CreateColumnFamilies(cfs);
205
10
    Reopen();
206
10
  }
207
208
15
  void DropColumnFamilies(const std::vector<int>& cfs) {
209
16
    for (auto cf : cfs) {
210
16
      ASSERT_OK(db_->DropColumnFamily(handles_[cf]));
211
16
      delete handles_[cf];
212
16
      handles_[cf] = nullptr;
213
16
      names_[cf] = "";
214
16
    }
215
15
  }
216
217
156
  void PutRandomData(int cf, int num, int key_value_size, bool save = false) {
218
127k
    for (int i = 0; i < num; ++i) {
219
      // 10 bytes for key, rest is value
220
127k
      if (!save) {
221
126k
        ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 11),
222
126k
                      RandomString(&rnd_, key_value_size - 10)));
223
440
      } else {
224
440
        std::string key = test::RandomKey(&rnd_, 11);
225
440
        keys_.insert(key);
226
440
        ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10)));
227
440
      }
228
127k
    }
229
156
  }
230
231
80
  void WaitForFlush(int cf) {
232
80
#ifndef ROCKSDB_LITE  // TEST functions are not supported in lite
233
80
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
234
80
#endif  // !ROCKSDB_LITE
235
80
  }
236
237
8
  void WaitForCompaction() {
238
8
#ifndef ROCKSDB_LITE  // TEST functions are not supported in lite
239
8
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
240
8
#endif  // !ROCKSDB_LITE
241
8
  }
242
243
12
  uint64_t MaxTotalInMemoryState() {
244
12
#ifndef ROCKSDB_LITE
245
12
    return dbfull()->TEST_MaxTotalInMemoryState();
246
#else
247
    return 0;
248
#endif  // !ROCKSDB_LITE
249
12
  }
250
251
6
  void AssertMaxTotalInMemoryState(uint64_t value) {
252
6
    ASSERT_EQ(value, MaxTotalInMemoryState());
253
6
  }
254
255
127k
  Status Put(int cf, const std::string& key, const std::string& value) {
256
127k
    return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value));
257
127k
  }
258
12
  Status Merge(int cf, const std::string& key, const std::string& value) {
259
12
    return db_->Merge(WriteOptions(), handles_[cf], Slice(key), Slice(value));
260
12
  }
261
15
  Status Flush(int cf) {
262
15
    return db_->Flush(FlushOptions(), handles_[cf]);
263
15
  }
264
265
516
  std::string Get(int cf, const std::string& key) {
266
516
    ReadOptions options;
267
516
    options.verify_checksums = true;
268
516
    std::string result;
269
516
    Status s = db_->Get(options, handles_[cf], Slice(key), &result);
270
516
    if (s.IsNotFound()) {
271
27
      result = "NOT_FOUND";
272
489
    } else if (!s.ok()) {
273
0
      result = s.ToString();
274
0
    }
275
516
    return result;
276
516
  }
277
278
4
  void CompactAll(int cf) {
279
4
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr,
280
4
                                nullptr));
281
4
  }
282
283
0
  void Compact(int cf, const Slice& start, const Slice& limit) {
284
0
    ASSERT_OK(
285
0
        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
286
0
  }
287
288
113
  int NumTableFilesAtLevel(int level, int cf) {
289
113
    return GetProperty(cf,
290
113
                       "rocksdb.num-files-at-level" + ToString(level));
291
113
  }
292
293
#ifndef ROCKSDB_LITE
294
  // Return spread of files per level
295
58
  std::string FilesPerLevel(int cf) {
296
58
    std::string result;
297
58
    int last_non_zero_offset = 0;
298
168
    for (int level = 0; level < dbfull()->NumberLevels(handles_[cf]); level++) {
299
110
      int f = NumTableFilesAtLevel(level, cf);
300
110
      char buf[100];
301
58
      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
302
110
      result += buf;
303
110
      if (f > 0) {
304
58
        last_non_zero_offset = static_cast<int>(result.size());
305
58
      }
306
110
    }
307
58
    result.resize(last_non_zero_offset);
308
58
    return result;
309
58
  }
310
#endif
311
312
58
  void AssertFilesPerLevel(const std::string& value, int cf) {
313
58
#ifndef ROCKSDB_LITE
314
58
    ASSERT_EQ(value, FilesPerLevel(cf));
315
58
#endif
316
58
  }
317
318
#ifndef ROCKSDB_LITE  // GetLiveFilesMetaData is not supported
319
7
  int CountLiveFiles() {
320
7
    std::vector<LiveFileMetaData> metadata;
321
7
    db_->GetLiveFilesMetaData(&metadata);
322
7
    return static_cast<int>(metadata.size());
323
7
  }
324
#endif  // !ROCKSDB_LITE
325
326
7
  void AssertCountLiveFiles(int expected_value) {
327
7
#ifndef ROCKSDB_LITE
328
7
    ASSERT_EQ(expected_value, CountLiveFiles());
329
7
#endif
330
7
  }
331
332
  // Do n memtable flushes, each of which produces an sstable
333
  // covering the range [small,large].
334
  void MakeTables(int cf, int n, const std::string& small,
335
0
                  const std::string& large) {
336
0
    for (int i = 0; i < n; i++) {
337
0
      ASSERT_OK(Put(cf, small, "begin"));
338
0
      ASSERT_OK(Put(cf, large, "end"));
339
0
      ASSERT_OK(db_->Flush(FlushOptions(), handles_[cf]));
340
0
    }
341
0
  }
342
343
#ifndef ROCKSDB_LITE  // GetSortedWalFiles is not supported
344
28
  int CountLiveLogFiles() {
345
28
    int micros_wait_for_log_deletion = 20000;
346
28
    env_->SleepForMicroseconds(micros_wait_for_log_deletion);
347
28
    int ret = 0;
348
28
    VectorLogPtr wal_files;
349
28
    Status s;
350
    // GetSortedWalFiles is a flakey function -- it gets all the wal_dir
351
    // children files and then later checks for their existence. if some of the
352
    // log files doesn't exist anymore, it reports an error. it does all of this
353
    // without DB mutex held, so if a background process deletes the log file
354
    // while the function is being executed, it returns an error. We retry the
355
    // function 10 times to avoid the error failing the test
356
28
    for (int retries = 0; retries < 10; ++retries) {
357
28
      wal_files.clear();
358
28
      s = db_->GetSortedWalFiles(&wal_files);
359
28
      if (s.ok()) {
360
28
        break;
361
28
      }
362
28
    }
363
28
    EXPECT_OK(s);
364
156
    for (const auto& wal : wal_files) {
365
156
      if (wal->Type() == kAliveLogFile) {
366
156
        ++ret;
367
156
      }
368
156
    }
369
28
    return ret;
370
0
    return 0;
371
28
  }
372
#endif  // !ROCKSDB_LITE
373
374
28
  void AssertCountLiveLogFiles(int value) {
375
28
#ifndef ROCKSDB_LITE  // GetSortedWalFiles is not supported
376
28
    ASSERT_EQ(value, CountLiveLogFiles());
377
28
#endif  // !ROCKSDB_LITE
378
28
  }
379
380
16
  void AssertNumberOfImmutableMemtables(std::vector<int> num_per_cf) {
381
16
    assert(num_per_cf.size() == handles_.size());
382
383
16
#ifndef ROCKSDB_LITE  // GetProperty is not supported in lite
384
80
    for (size_t i = 0; i < num_per_cf.size(); ++i) {
385
64
      ASSERT_EQ(num_per_cf[i], GetProperty(static_cast<int>(i),
386
64
                                           "rocksdb.num-immutable-mem-table"));
387
64
    }
388
16
#endif  // !ROCKSDB_LITE
389
16
  }
390
391
  void CopyFile(const std::string& source, const std::string& destination,
392
2
                uint64_t size = 0) {
393
2
    const EnvOptions soptions;
394
2
    unique_ptr<SequentialFile> srcfile;
395
2
    ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions));
396
2
    unique_ptr<WritableFile> destfile;
397
2
    ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions));
398
399
2
    if (size == 0) {
400
      // default argument means copy everything
401
2
      ASSERT_OK(env_->GetFileSize(source, &size));
402
2
    }
403
404
2
    uint8_t buffer[4096];
405
2
    Slice slice;
406
4
    while (size > 0) {
407
2
      uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
408
2
      ASSERT_OK(srcfile->Read(one, &slice, buffer));
409
2
      ASSERT_OK(destfile->Append(slice));
410
2
      size -= slice.size();
411
2
    }
412
2
    ASSERT_OK(destfile->Close());
413
2
  }
414
415
  std::vector<ColumnFamilyHandle*> handles_;
416
  std::vector<std::string> names_;
417
  std::set<std::string> keys_;
418
  ColumnFamilyOptions column_family_options_;
419
  DBOptions db_options_;
420
  std::string dbname_;
421
  DB* db_ = nullptr;
422
  EnvCounter* env_;
423
  Random rnd_;
424
};
425
426
1
TEST_F(ColumnFamilyTest, DontReuseColumnFamilyID) {
427
4
  for (int iter = 0; iter < 3; ++iter) {
428
3
    Open();
429
3
    CreateColumnFamilies({"one", "two", "three"});
430
15
    for (size_t i = 0; i < handles_.size(); ++i) {
431
12
      auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(handles_[i]);
432
12
      ASSERT_EQ(i, cfh->GetID());
433
12
    }
434
3
    if (iter == 1) {
435
1
      Reopen();
436
1
    }
437
3
    DropColumnFamilies({3});
438
3
    Reopen();
439
3
    if (iter == 2) {
440
      // this tests if max_column_family is correctly persisted with
441
      // WriteSnapshot()
442
1
      Reopen();
443
1
    }
444
3
    CreateColumnFamilies({"three2"});
445
    // ID 3 that was used for dropped column family "three" should not be reused
446
3
    auto cfh3 = reinterpret_cast<ColumnFamilyHandleImpl*>(handles_[3]);
447
3
    ASSERT_EQ(4U, cfh3->GetID());
448
3
    Close();
449
3
    Destroy();
450
3
  }
451
1
}
452
453
1
TEST_F(ColumnFamilyTest, AddDrop) {
454
1
  Open();
455
1
  CreateColumnFamilies({"one", "two", "three"});
456
1
  ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
457
1
  ASSERT_EQ("NOT_FOUND", Get(2, "fodor"));
458
1
  DropColumnFamilies({2});
459
1
  ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
460
1
  CreateColumnFamilies({"four"});
461
1
  ASSERT_EQ("NOT_FOUND", Get(3, "fodor"));
462
1
  ASSERT_OK(Put(1, "fodor", "mirko"));
463
1
  ASSERT_EQ("mirko", Get(1, "fodor"));
464
1
  ASSERT_EQ("NOT_FOUND", Get(3, "fodor"));
465
1
  Close();
466
1
  ASSERT_TRUE(TryOpen({"default"}).IsInvalidArgument());
467
1
  Open({"default", "one", "three", "four"});
468
1
  DropColumnFamilies({1});
469
1
  Reopen();
470
1
  Close();
471
472
1
  std::vector<std::string> families;
473
1
  ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families));
474
1
  sort(families.begin(), families.end());
475
1
  ASSERT_TRUE(families ==
476
1
              std::vector<std::string>({"default", "four", "three"}));
477
1
}
478
479
1
TEST_F(ColumnFamilyTest, DropTest) {
480
  // first iteration - dont reopen DB before dropping
481
  // second iteration - reopen DB before dropping
482
3
  for (int iter = 0; iter < 2; ++iter) {
483
2
    Open({"default"});
484
2
    CreateColumnFamiliesAndReopen({"pikachu"});
485
202
    for (int i = 0; i < 100; ++i) {
486
200
      ASSERT_OK(Put(1, ToString(i), "bar" + ToString(i)));
487
200
    }
488
2
    ASSERT_OK(Flush(1));
489
490
2
    if (iter == 1) {
491
1
      Reopen();
492
1
    }
493
2
    ASSERT_EQ("bar1", Get(1, "1"));
494
495
2
    AssertCountLiveFiles(1);
496
2
    DropColumnFamilies({1});
497
    // make sure that all files are deleted when we drop the column family
498
2
    AssertCountLiveFiles(0);
499
2
    Destroy();
500
2
  }
501
1
}
502
503
1
TEST_F(ColumnFamilyTest, WriteBatchFailure) {
504
1
  Open();
505
1
  CreateColumnFamiliesAndReopen({"one", "two"});
506
1
  WriteBatch batch;
507
1
  batch.Put(handles_[0], Slice("existing"), Slice("column-family"));
508
1
  batch.Put(handles_[1], Slice("non-existing"), Slice("column-family"));
509
1
  ASSERT_OK(db_->Write(WriteOptions(), &batch));
510
1
  DropColumnFamilies({1});
511
1
  WriteOptions woptions_ignore_missing_cf;
512
1
  woptions_ignore_missing_cf.ignore_missing_column_families = true;
513
1
  batch.Put(handles_[0], Slice("still here"), Slice("column-family"));
514
1
  ASSERT_OK(db_->Write(woptions_ignore_missing_cf, &batch));
515
1
  ASSERT_EQ("column-family", Get(0, "still here"));
516
1
  Status s = db_->Write(WriteOptions(), &batch);
517
1
  ASSERT_TRUE(s.IsInvalidArgument());
518
1
  Close();
519
1
}
520
521
1
TEST_F(ColumnFamilyTest, ReadWrite) {
522
1
  Open();
523
1
  CreateColumnFamiliesAndReopen({"one", "two"});
524
1
  ASSERT_OK(Put(0, "foo", "v1"));
525
1
  ASSERT_OK(Put(0, "bar", "v2"));
526
1
  ASSERT_OK(Put(1, "mirko", "v3"));
527
1
  ASSERT_OK(Put(0, "foo", "v2"));
528
1
  ASSERT_OK(Put(2, "fodor", "v5"));
529
530
5
  for (int iter = 0; iter <= 3; ++iter) {
531
4
    ASSERT_EQ("v2", Get(0, "foo"));
532
4
    ASSERT_EQ("v2", Get(0, "bar"));
533
4
    ASSERT_EQ("v3", Get(1, "mirko"));
534
4
    ASSERT_EQ("v5", Get(2, "fodor"));
535
4
    ASSERT_EQ("NOT_FOUND", Get(0, "fodor"));
536
4
    ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
537
4
    ASSERT_EQ("NOT_FOUND", Get(2, "foo"));
538
4
    if (iter <= 1) {
539
2
      Reopen();
540
2
    }
541
4
  }
542
1
  Close();
543
1
}
544
545
1
TEST_F(ColumnFamilyTest, IgnoreRecoveredLog) {
546
1
  std::string backup_logs = dbname_ + "/backup_logs";
547
548
  // delete old files in backup_logs directory
549
1
  ASSERT_OK(env_->CreateDirIfMissing(dbname_));
550
1
  ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
551
1
  std::vector<std::string> old_files;
552
1
  ASSERT_OK(env_->GetChildren(backup_logs, &old_files));
553
2
  for (auto& file : old_files) {
554
2
    if (file != "." && file != "..") {
555
0
      ASSERT_OK(env_->DeleteFile(backup_logs + "/" + file));
556
0
    }
557
2
  }
558
559
1
  column_family_options_.merge_operator =
560
1
      MergeOperators::CreateUInt64AddOperator();
561
1
  db_options_.wal_dir = dbname_ + "/logs";
562
1
  Destroy();
563
1
  Open();
564
1
  CreateColumnFamilies({"cf1", "cf2"});
565
566
  // fill up the DB
567
1
  std::string one, two, three;
568
1
  PutFixed64(&one, 1);
569
1
  PutFixed64(&two, 2);
570
1
  PutFixed64(&three, 3);
571
1
  ASSERT_OK(Merge(0, "foo", one));
572
1
  ASSERT_OK(Merge(1, "mirko", one));
573
1
  ASSERT_OK(Merge(0, "foo", one));
574
1
  ASSERT_OK(Merge(2, "bla", one));
575
1
  ASSERT_OK(Merge(2, "fodor", one));
576
1
  ASSERT_OK(Merge(0, "bar", one));
577
1
  ASSERT_OK(Merge(2, "bla", one));
578
1
  ASSERT_OK(Merge(1, "mirko", two));
579
1
  ASSERT_OK(Merge(1, "franjo", one));
580
581
  // copy the logs to backup
582
1
  std::vector<std::string> logs;
583
1
  ASSERT_OK(env_->GetChildren(db_options_.wal_dir, &logs));
584
3
  for (auto& log : logs) {
585
3
    if (log != ".." && log != ".") {
586
1
      CopyFile(db_options_.wal_dir + "/" + log, backup_logs + "/" + log);
587
1
    }
588
3
  }
589
590
  // recover the DB
591
1
  Close();
592
593
  // 1. check consistency
594
  // 2. copy the logs from backup back to WAL dir. if the recovery happens
595
  // again on the same log files, this should lead to incorrect results
596
  // due to applying merge operator twice
597
  // 3. check consistency
598
3
  for (int iter = 0; iter < 2; ++iter) {
599
    // assert consistency
600
2
    Open({"default", "cf1", "cf2"});
601
2
    ASSERT_EQ(two, Get(0, "foo"));
602
2
    ASSERT_EQ(one, Get(0, "bar"));
603
2
    ASSERT_EQ(three, Get(1, "mirko"));
604
2
    ASSERT_EQ(one, Get(1, "franjo"));
605
2
    ASSERT_EQ(one, Get(2, "fodor"));
606
2
    ASSERT_EQ(two, Get(2, "bla"));
607
2
    Close();
608
609
2
    if (iter == 0) {
610
      // copy the logs from backup back to wal dir
611
3
      for (auto& log : logs) {
612
3
        if (log != ".." && log != ".") {
613
1
          CopyFile(backup_logs + "/" + log, db_options_.wal_dir + "/" + log);
614
1
        }
615
3
      }
616
1
    }
617
2
  }
618
1
}
619
620
1
TEST_F(ColumnFamilyTest, FlushTest) {
621
1
  Open();
622
1
  CreateColumnFamiliesAndReopen({"one", "two"});
623
1
  ASSERT_OK(Put(0, "foo", "v1"));
624
1
  ASSERT_OK(Put(0, "bar", "v2"));
625
1
  ASSERT_OK(Put(1, "mirko", "v3"));
626
1
  ASSERT_OK(Put(0, "foo", "v2"));
627
1
  ASSERT_OK(Put(2, "fodor", "v5"));
628
629
3
  for (int j = 0; j < 2; j++) {
630
2
    ReadOptions ro;
631
2
    std::vector<Iterator*> iterators;
632
    // Hold super version.
633
2
    if (j == 0) {
634
1
      ASSERT_OK(db_->NewIterators(ro, handles_, &iterators));
635
1
    }
636
637
8
    for (int i = 0; i < 3; ++i) {
638
6
      uint64_t max_total_in_memory_state =
639
6
          MaxTotalInMemoryState();
640
6
      ASSERT_OK(Flush(i));
641
6
      AssertMaxTotalInMemoryState(max_total_in_memory_state);
642
6
    }
643
2
    ASSERT_OK(Put(1, "foofoo", "bar"));
644
2
    ASSERT_OK(Put(0, "foofoo", "bar"));
645
646
3
    for (auto* it : iterators) {
647
3
      delete it;
648
3
    }
649
2
  }
650
1
  Reopen();
651
652
4
  for (int iter = 0; iter <= 2; ++iter) {
653
3
    ASSERT_EQ("v2", Get(0, "foo"));
654
3
    ASSERT_EQ("v2", Get(0, "bar"));
655
3
    ASSERT_EQ("v3", Get(1, "mirko"));
656
3
    ASSERT_EQ("v5", Get(2, "fodor"));
657
3
    ASSERT_EQ("NOT_FOUND", Get(0, "fodor"));
658
3
    ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
659
3
    ASSERT_EQ("NOT_FOUND", Get(2, "foo"));
660
3
    if (iter <= 1) {
661
2
      Reopen();
662
2
    }
663
3
  }
664
1
  Close();
665
1
}
666
667
// Makes sure that obsolete log files get deleted
668
1
TEST_F(ColumnFamilyTest, LogDeletionTest) {
669
1
  db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
670
1
  column_family_options_.arena_block_size = 4 * 1024;
671
1
  column_family_options_.write_buffer_size = 100000;  // 100KB
672
1
  Open();
673
1
  CreateColumnFamilies({"one", "two", "three", "four"});
674
  // Each bracket is one log file. if number is in (), it means
675
  // we don't need it anymore (it's been flushed)
676
  // []
677
1
  AssertCountLiveLogFiles(0);
678
1
  PutRandomData(0, 1, 100);
679
  // [0]
680
1
  PutRandomData(1, 1, 100);
681
  // [0, 1]
682
1
  PutRandomData(1, 1000, 100);
683
1
  WaitForFlush(1);
684
  // [0, (1)] [1]
685
1
  AssertCountLiveLogFiles(2);
686
1
  PutRandomData(0, 1, 100);
687
  // [0, (1)] [0, 1]
688
1
  AssertCountLiveLogFiles(2);
689
1
  PutRandomData(2, 1, 100);
690
  // [0, (1)] [0, 1, 2]
691
1
  PutRandomData(2, 1000, 100);
692
1
  WaitForFlush(2);
693
  // [0, (1)] [0, 1, (2)] [2]
694
1
  AssertCountLiveLogFiles(3);
695
1
  PutRandomData(2, 1000, 100);
696
1
  WaitForFlush(2);
697
  // [0, (1)] [0, 1, (2)] [(2)] [2]
698
1
  AssertCountLiveLogFiles(4);
699
1
  PutRandomData(3, 1, 100);
700
  // [0, (1)] [0, 1, (2)] [(2)] [2, 3]
701
1
  PutRandomData(1, 1, 100);
702
  // [0, (1)] [0, 1, (2)] [(2)] [1, 2, 3]
703
1
  AssertCountLiveLogFiles(4);
704
1
  PutRandomData(1, 1000, 100);
705
1
  WaitForFlush(1);
706
  // [0, (1)] [0, (1), (2)] [(2)] [(1), 2, 3] [1]
707
1
  AssertCountLiveLogFiles(5);
708
1
  PutRandomData(0, 1000, 100);
709
1
  WaitForFlush(0);
710
  // [(0), (1)] [(0), (1), (2)] [(2)] [(1), 2, 3] [1, (0)] [0]
711
  // delete obsolete logs -->
712
  // [(1), 2, 3] [1, (0)] [0]
713
1
  AssertCountLiveLogFiles(3);
714
1
  PutRandomData(0, 1000, 100);
715
1
  WaitForFlush(0);
716
  // [(1), 2, 3] [1, (0)], [(0)] [0]
717
1
  AssertCountLiveLogFiles(4);
718
1
  PutRandomData(1, 1000, 100);
719
1
  WaitForFlush(1);
720
  // [(1), 2, 3] [(1), (0)] [(0)] [0, (1)] [1]
721
1
  AssertCountLiveLogFiles(5);
722
1
  PutRandomData(2, 1000, 100);
723
1
  WaitForFlush(2);
724
  // [(1), (2), 3] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2]
725
1
  AssertCountLiveLogFiles(6);
726
1
  PutRandomData(3, 1000, 100);
727
1
  WaitForFlush(3);
728
  // [(1), (2), (3)] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2, (3)] [3]
729
  // delete obsolete logs -->
730
  // [0, (1)] [1, (2)], [2, (3)] [3]
731
1
  AssertCountLiveLogFiles(4);
732
1
  Close();
733
1
}
734
735
// Makes sure that obsolete log files get deleted
736
1
TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) {
737
  // disable flushing stale column families
738
1
  db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
739
1
  Open();
740
1
  CreateColumnFamilies({"one", "two", "three"});
741
1
  ColumnFamilyOptions default_cf, one, two, three;
742
  // setup options. all column families have max_write_buffer_number setup to 10
743
  // "default" -> 100KB memtable, start flushing immediatelly
744
  // "one" -> 200KB memtable, start flushing with two immutable memtables
745
  // "two" -> 1MB memtable, start flushing with three immutable memtables
746
  // "three" -> 90KB memtable, start flushing with four immutable memtables
747
1
  default_cf.write_buffer_size = 100000;
748
1
  default_cf.arena_block_size = 4 * 4096;
749
1
  default_cf.max_write_buffer_number = 10;
750
1
  default_cf.min_write_buffer_number_to_merge = 1;
751
1
  default_cf.max_write_buffer_number_to_maintain = 0;
752
1
  one.write_buffer_size = 200000;
753
1
  one.arena_block_size = 4 * 4096;
754
1
  one.max_write_buffer_number = 10;
755
1
  one.min_write_buffer_number_to_merge = 2;
756
1
  one.max_write_buffer_number_to_maintain = 1;
757
1
  two.write_buffer_size = 1000000;
758
1
  two.arena_block_size = 4 * 4096;
759
1
  two.max_write_buffer_number = 10;
760
1
  two.min_write_buffer_number_to_merge = 3;
761
1
  two.max_write_buffer_number_to_maintain = 2;
762
1
  three.write_buffer_size = 4096 * 22;
763
1
  three.arena_block_size = 4096;
764
1
  three.max_write_buffer_number = 10;
765
1
  three.min_write_buffer_number_to_merge = 4;
766
1
  three.max_write_buffer_number_to_maintain = -1;
767
768
1
  Reopen({default_cf, one, two, three});
769
770
1
  int micros_wait_for_flush = 10000;
771
1
  PutRandomData(0, 100, 1000);
772
1
  WaitForFlush(0);
773
1
  AssertNumberOfImmutableMemtables({0, 0, 0, 0});
774
1
  AssertCountLiveLogFiles(1);
775
1
  PutRandomData(1, 200, 1000);
776
1
  env_->SleepForMicroseconds(micros_wait_for_flush);
777
1
  AssertNumberOfImmutableMemtables({0, 1, 0, 0});
778
1
  AssertCountLiveLogFiles(2);
779
1
  PutRandomData(2, 1000, 1000);
780
1
  env_->SleepForMicroseconds(micros_wait_for_flush);
781
1
  AssertNumberOfImmutableMemtables({0, 1, 1, 0});
782
1
  AssertCountLiveLogFiles(3);
783
1
  PutRandomData(2, 1000, 1000);
784
1
  env_->SleepForMicroseconds(micros_wait_for_flush);
785
1
  AssertNumberOfImmutableMemtables({0, 1, 2, 0});
786
1
  AssertCountLiveLogFiles(4);
787
1
  PutRandomData(3, 93, 990);
788
1
  env_->SleepForMicroseconds(micros_wait_for_flush);
789
1
  AssertNumberOfImmutableMemtables({0, 1, 2, 1});
790
1
  AssertCountLiveLogFiles(5);
791
1
  PutRandomData(3, 88, 990);
792
1
  env_->SleepForMicroseconds(micros_wait_for_flush);
793
1
  AssertNumberOfImmutableMemtables({0, 1, 2, 2});
794
1
  AssertCountLiveLogFiles(6);
795
1
  PutRandomData(3, 88, 990);
796
1
  env_->SleepForMicroseconds(micros_wait_for_flush);
797
1
  AssertNumberOfImmutableMemtables({0, 1, 2, 3});
798
1
  AssertCountLiveLogFiles(7);
799
1
  PutRandomData(0, 100, 1000);
800
1
  WaitForFlush(0);
801
1
  AssertNumberOfImmutableMemtables({0, 1, 2, 3});
802
1
  AssertCountLiveLogFiles(8);
803
1
  PutRandomData(2, 100, 10000);
804
1
  WaitForFlush(2);
805
1
  AssertNumberOfImmutableMemtables({0, 1, 0, 3});
806
1
  AssertCountLiveLogFiles(9);
807
1
  PutRandomData(3, 88, 990);
808
1
  WaitForFlush(3);
809
1
  AssertNumberOfImmutableMemtables({0, 1, 0, 0});
810
1
  AssertCountLiveLogFiles(10);
811
1
  PutRandomData(3, 88, 990);
812
1
  env_->SleepForMicroseconds(micros_wait_for_flush);
813
1
  AssertNumberOfImmutableMemtables({0, 1, 0, 1});
814
1
  AssertCountLiveLogFiles(11);
815
1
  PutRandomData(1, 200, 1000);
816
1
  WaitForFlush(1);
817
1
  AssertNumberOfImmutableMemtables({0, 0, 0, 1});
818
1
  AssertCountLiveLogFiles(5);
819
1
  PutRandomData(3, 88 * 3, 990);
820
1
  WaitForFlush(3);
821
1
  PutRandomData(3, 88 * 4, 990);
822
1
  WaitForFlush(3);
823
1
  AssertNumberOfImmutableMemtables({0, 0, 0, 0});
824
1
  AssertCountLiveLogFiles(12);
825
1
  PutRandomData(0, 100, 1000);
826
1
  WaitForFlush(0);
827
1
  AssertNumberOfImmutableMemtables({0, 0, 0, 0});
828
1
  AssertCountLiveLogFiles(12);
829
1
  PutRandomData(2, 3 * 1000, 1000);
830
1
  WaitForFlush(2);
831
1
  AssertNumberOfImmutableMemtables({0, 0, 0, 0});
832
1
  AssertCountLiveLogFiles(12);
833
1
  PutRandomData(1, 2*200, 1000);
834
1
  WaitForFlush(1);
835
1
  AssertNumberOfImmutableMemtables({0, 0, 0, 0});
836
1
  AssertCountLiveLogFiles(7);
837
1
  Close();
838
1
}
839
840
1
TEST_F(ColumnFamilyTest, DifferentMergeOperators) {
841
1
  Open();
842
1
  CreateColumnFamilies({"first", "second"});
843
1
  ColumnFamilyOptions default_cf, first, second;
844
1
  first.merge_operator = MergeOperators::CreateUInt64AddOperator();
845
1
  second.merge_operator = MergeOperators::CreateStringAppendOperator();
846
1
  Reopen({default_cf, first, second});
847
848
1
  std::string one, two, three;
849
1
  PutFixed64(&one, 1);
850
1
  PutFixed64(&two, 2);
851
1
  PutFixed64(&three, 3);
852
853
1
  ASSERT_OK(Put(0, "foo", two));
854
1
  ASSERT_OK(Put(0, "foo", one));
855
1
  ASSERT_TRUE(Merge(0, "foo", two).IsNotSupported());
856
1
  ASSERT_EQ(Get(0, "foo"), one);
857
858
1
  ASSERT_OK(Put(1, "foo", two));
859
1
  ASSERT_OK(Put(1, "foo", one));
860
1
  ASSERT_OK(Merge(1, "foo", two));
861
1
  ASSERT_EQ(Get(1, "foo"), three);
862
863
1
  ASSERT_OK(Put(2, "foo", two));
864
1
  ASSERT_OK(Put(2, "foo", one));
865
1
  ASSERT_OK(Merge(2, "foo", two));
866
1
  ASSERT_EQ(Get(2, "foo"), one + "," + two);
867
1
  Close();
868
1
}
869
870
1
TEST_F(ColumnFamilyTest, DifferentCompactionStyles) {
871
1
  Open();
872
1
  CreateColumnFamilies({"one", "two"});
873
1
  ColumnFamilyOptions default_cf, one, two;
874
1
  db_options_.max_open_files = 20;  // only 10 files in file cache
875
1
  db_options_.disableDataSync = true;
876
877
1
  default_cf.compaction_style = kCompactionStyleLevel;
878
1
  default_cf.num_levels = 3;
879
1
  default_cf.write_buffer_size = 64 << 10;  // 64KB
880
1
  default_cf.target_file_size_base = 30 << 10;
881
1
  default_cf.source_compaction_factor = 100;
882
1
  BlockBasedTableOptions table_options;
883
1
  table_options.no_block_cache = true;
884
1
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
885
886
1
  one.compaction_style = kCompactionStyleUniversal;
887
888
1
  one.num_levels = 1;
889
  // trigger compaction if there are >= 4 files
890
1
  one.level0_file_num_compaction_trigger = 4;
891
1
  one.write_buffer_size = 120000;
892
893
1
  two.compaction_style = kCompactionStyleLevel;
894
1
  two.num_levels = 4;
895
1
  two.level0_file_num_compaction_trigger = 3;
896
1
  two.write_buffer_size = 100000;
897
898
1
  Reopen({default_cf, one, two});
899
900
  // SETUP column family "one" -- universal style
901
4
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 1; ++i) {
902
3
    PutRandomData(1, 10, 12000);
903
3
    PutRandomData(1, 1, 10);
904
3
    WaitForFlush(1);
905
3
    AssertFilesPerLevel(ToString(i + 1), 1);
906
3
  }
907
908
  // SETUP column family "two" -- level style with 4 levels
909
3
  for (int i = 0; i < two.level0_file_num_compaction_trigger - 1; ++i) {
910
2
    PutRandomData(2, 10, 12000);
911
2
    PutRandomData(2, 1, 10);
912
2
    WaitForFlush(2);
913
2
    AssertFilesPerLevel(ToString(i + 1), 2);
914
2
  }
915
916
  // TRIGGER compaction "one"
917
1
  PutRandomData(1, 10, 12000);
918
1
  PutRandomData(1, 1, 10);
919
920
  // TRIGGER compaction "two"
921
1
  PutRandomData(2, 10, 12000);
922
1
  PutRandomData(2, 1, 10);
923
924
  // WAIT for compactions
925
1
  WaitForCompaction();
926
927
  // VERIFY compaction "one"
928
1
  AssertFilesPerLevel("1", 1);
929
930
  // VERIFY compaction "two"
931
1
  AssertFilesPerLevel("0,1", 2);
932
1
  CompactAll(2);
933
1
  AssertFilesPerLevel("0,1", 2);
934
935
1
  Close();
936
1
}
937
938
#ifndef ROCKSDB_LITE
939
// Sync points not supported in RocksDB Lite
940
941
1
TEST_F(ColumnFamilyTest, MultipleManualCompactions) {
942
1
  Open();
943
1
  CreateColumnFamilies({"one", "two"});
944
1
  ColumnFamilyOptions default_cf, one, two;
945
1
  db_options_.max_open_files = 20;  // only 10 files in file cache
946
1
  db_options_.disableDataSync = true;
947
1
  db_options_.max_background_compactions = 3;
948
949
1
  default_cf.compaction_style = kCompactionStyleLevel;
950
1
  default_cf.num_levels = 3;
951
1
  default_cf.write_buffer_size = 64 << 10;  // 64KB
952
1
  default_cf.target_file_size_base = 30 << 10;
953
1
  default_cf.source_compaction_factor = 100;
954
1
  BlockBasedTableOptions table_options;
955
1
  table_options.no_block_cache = true;
956
1
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
957
958
1
  one.compaction_style = kCompactionStyleUniversal;
959
960
1
  one.num_levels = 1;
961
  // trigger compaction if there are >= 4 files
962
1
  one.level0_file_num_compaction_trigger = 4;
963
1
  one.write_buffer_size = 120000;
964
965
1
  two.compaction_style = kCompactionStyleLevel;
966
1
  two.num_levels = 4;
967
1
  two.level0_file_num_compaction_trigger = 3;
968
1
  two.write_buffer_size = 100000;
969
970
1
  Reopen({default_cf, one, two});
971
972
  // SETUP column family "one" -- universal style
973
3
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
974
2
    PutRandomData(1, 10, 12000, true);
975
2
    PutRandomData(1, 1, 10, true);
976
2
    WaitForFlush(1);
977
2
    AssertFilesPerLevel(ToString(i + 1), 1);
978
2
  }
979
1
  bool cf_1_1 = true;
980
1
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
981
1
      {{"ColumnFamilyTest::MultiManual:4", "ColumnFamilyTest::MultiManual:1"},
982
1
       {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:5"},
983
1
       {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:3"}});
984
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
985
2
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
986
2
        if (cf_1_1) {
987
1
          TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:4");
988
1
          cf_1_1 = false;
989
1
          TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:3");
990
1
        }
991
2
      });
992
993
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
994
1
  std::vector<std::thread> threads;
995
1
  threads.emplace_back([&] {
996
1
    CompactRangeOptions compact_options;
997
1
    compact_options.exclusive_manual_compaction = false;
998
1
    ASSERT_OK(
999
1
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1000
1
  });
1001
1002
  // SETUP column family "two" -- level style with 4 levels
1003
2
  for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) {
1004
1
    PutRandomData(2, 10, 12000);
1005
1
    PutRandomData(2, 1, 10);
1006
1
    WaitForFlush(2);
1007
1
    AssertFilesPerLevel(ToString(i + 1), 2);
1008
1
  }
1009
1
  threads.emplace_back([&] {
1010
1
    TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:1");
1011
1
    CompactRangeOptions compact_options;
1012
1
    compact_options.exclusive_manual_compaction = false;
1013
1
    ASSERT_OK(
1014
1
        db_->CompactRange(compact_options, handles_[2], nullptr, nullptr));
1015
1
    TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:2");
1016
1
  });
1017
1018
1
  TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:5");
1019
2
  for (auto& t : threads) {
1020
2
    t.join();
1021
2
  }
1022
1023
  // VERIFY compaction "one"
1024
1
  AssertFilesPerLevel("1", 1);
1025
1026
  // VERIFY compaction "two"
1027
1
  AssertFilesPerLevel("0,1", 2);
1028
1
  CompactAll(2);
1029
1
  AssertFilesPerLevel("0,1", 2);
1030
  // Compare against saved keys
1031
1
  std::set<std::string>::iterator key_iter = keys_.begin();
1032
23
  while (key_iter != keys_.end()) {
1033
22
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1034
22
    key_iter++;
1035
22
  }
1036
1
  Close();
1037
1
}
1038
1039
1
TEST_F(ColumnFamilyTest, AutomaticAndManualCompactions) {
1040
1
  Open();
1041
1
  CreateColumnFamilies({"one", "two"});
1042
1
  ColumnFamilyOptions default_cf, one, two;
1043
1
  db_options_.max_open_files = 20;  // only 10 files in file cache
1044
1
  db_options_.disableDataSync = true;
1045
1
  db_options_.max_background_compactions = 3;
1046
1047
1
  default_cf.compaction_style = kCompactionStyleLevel;
1048
1
  default_cf.num_levels = 3;
1049
1
  default_cf.write_buffer_size = 64 << 10;  // 64KB
1050
1
  default_cf.target_file_size_base = 30 << 10;
1051
1
  default_cf.source_compaction_factor = 100;
1052
1
  BlockBasedTableOptions table_options;
1053
1
  table_options.no_block_cache = true;
1054
1
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1055
1056
1
  one.compaction_style = kCompactionStyleUniversal;
1057
1058
1
  one.num_levels = 1;
1059
  // trigger compaction if there are >= 4 files
1060
1
  one.level0_file_num_compaction_trigger = 4;
1061
1
  one.write_buffer_size = 120000;
1062
1063
1
  two.compaction_style = kCompactionStyleLevel;
1064
1
  two.num_levels = 4;
1065
1
  two.level0_file_num_compaction_trigger = 3;
1066
1
  two.write_buffer_size = 100000;
1067
1068
1
  Reopen({default_cf, one, two});
1069
1070
1
  bool cf_1_1 = true;
1071
1
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
1072
1
      {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:1"},
1073
1
       {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:5"},
1074
1
       {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:3"}});
1075
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
1076
2
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
1077
2
        if (cf_1_1) {
1078
1
          cf_1_1 = false;
1079
1
          TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4");
1080
1
          TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3");
1081
1
        }
1082
2
      });
1083
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1084
  // SETUP column family "one" -- universal style
1085
5
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1086
4
    PutRandomData(1, 10, 12000, true);
1087
4
    PutRandomData(1, 1, 10, true);
1088
4
    WaitForFlush(1);
1089
4
    AssertFilesPerLevel(ToString(i + 1), 1);
1090
4
  }
1091
1092
1
  TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1");
1093
1094
  // SETUP column family "two" -- level style with 4 levels
1095
2
  for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) {
1096
1
    PutRandomData(2, 10, 12000);
1097
1
    PutRandomData(2, 1, 10);
1098
1
    WaitForFlush(2);
1099
1
    AssertFilesPerLevel(ToString(i + 1), 2);
1100
1
  }
1101
1
  std::thread threads([&] {
1102
1
    CompactRangeOptions compact_options;
1103
1
    compact_options.exclusive_manual_compaction = false;
1104
1
    ASSERT_OK(
1105
1
        db_->CompactRange(compact_options, handles_[2], nullptr, nullptr));
1106
1
    TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2");
1107
1
  });
1108
1109
1
  TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5");
1110
1
  threads.join();
1111
1112
  // WAIT for compactions
1113
1
  WaitForCompaction();
1114
1115
  // VERIFY compaction "one"
1116
1
  AssertFilesPerLevel("1", 1);
1117
1118
  // VERIFY compaction "two"
1119
1
  AssertFilesPerLevel("0,1", 2);
1120
1
  CompactAll(2);
1121
1
  AssertFilesPerLevel("0,1", 2);
1122
  // Compare against saved keys
1123
1
  std::set<std::string>::iterator key_iter = keys_.begin();
1124
45
  while (key_iter != keys_.end()) {
1125
44
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1126
44
    key_iter++;
1127
44
  }
1128
1
  Close();
1129
1
}
1130
1131
1
TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) {
1132
1
  Open();
1133
1
  CreateColumnFamilies({"one", "two"});
1134
1
  ColumnFamilyOptions default_cf, one, two;
1135
1
  db_options_.max_open_files = 20;  // only 10 files in file cache
1136
1
  db_options_.disableDataSync = true;
1137
1
  db_options_.max_background_compactions = 3;
1138
1139
1
  default_cf.compaction_style = kCompactionStyleLevel;
1140
1
  default_cf.num_levels = 3;
1141
1
  default_cf.write_buffer_size = 64 << 10;  // 64KB
1142
1
  default_cf.target_file_size_base = 30 << 10;
1143
1
  default_cf.source_compaction_factor = 100;
1144
1
  BlockBasedTableOptions table_options;
1145
1
  table_options.no_block_cache = true;
1146
1
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1147
1148
1
  one.compaction_style = kCompactionStyleUniversal;
1149
1150
1
  one.num_levels = 1;
1151
  // trigger compaction if there are >= 4 files
1152
1
  one.level0_file_num_compaction_trigger = 4;
1153
1
  one.write_buffer_size = 120000;
1154
1155
1
  two.compaction_style = kCompactionStyleLevel;
1156
1
  two.num_levels = 4;
1157
1
  two.level0_file_num_compaction_trigger = 3;
1158
1
  two.write_buffer_size = 100000;
1159
1160
1
  Reopen({default_cf, one, two});
1161
1162
  // SETUP column family "one" -- universal style
1163
3
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1164
2
    PutRandomData(1, 10, 12000, true);
1165
2
    PutRandomData(1, 1, 10, true);
1166
2
    WaitForFlush(1);
1167
2
    AssertFilesPerLevel(ToString(i + 1), 1);
1168
2
  }
1169
1
  bool cf_1_1 = true;
1170
1
  bool cf_1_2 = true;
1171
1
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
1172
1
      {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:1"},
1173
1
       {"ColumnFamilyTest::ManualAuto:5", "ColumnFamilyTest::ManualAuto:2"},
1174
1
       {"ColumnFamilyTest::ManualAuto:2", "ColumnFamilyTest::ManualAuto:3"}});
1175
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
1176
3
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
1177
3
        if (cf_1_1) {
1178
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
1179
1
          cf_1_1 = false;
1180
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
1181
2
        } else if (cf_1_2) {
1182
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
1183
1
          cf_1_2 = false;
1184
1
        }
1185
3
      });
1186
1187
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1188
1
  std::thread threads([&] {
1189
1
    CompactRangeOptions compact_options;
1190
1
    compact_options.exclusive_manual_compaction = false;
1191
1
    ASSERT_OK(
1192
1
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1193
1
  });
1194
1195
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
1196
1197
  // SETUP column family "two" -- level style with 4 levels
1198
4
  for (int i = 0; i < two.level0_file_num_compaction_trigger; ++i) {
1199
3
    PutRandomData(2, 10, 12000);
1200
3
    PutRandomData(2, 1, 10);
1201
3
    WaitForFlush(2);
1202
3
    AssertFilesPerLevel(ToString(i + 1), 2);
1203
3
  }
1204
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
1205
1
  threads.join();
1206
1207
  // WAIT for compactions
1208
1
  WaitForCompaction();
1209
1210
  // VERIFY compaction "one"
1211
1
  AssertFilesPerLevel("1", 1);
1212
1213
  // VERIFY compaction "two"
1214
1
  AssertFilesPerLevel("0,1", 2);
1215
1
  CompactAll(2);
1216
1
  AssertFilesPerLevel("0,1", 2);
1217
  // Compare against saved keys
1218
1
  std::set<std::string>::iterator key_iter = keys_.begin();
1219
23
  while (key_iter != keys_.end()) {
1220
22
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1221
22
    key_iter++;
1222
22
  }
1223
1
  Close();
1224
1
}
1225
1226
1
TEST_F(ColumnFamilyTest, SameCFManualManualCompactions) {
1227
1
  Open();
1228
1
  CreateColumnFamilies({"one"});
1229
1
  ColumnFamilyOptions default_cf, one;
1230
1
  db_options_.max_open_files = 20;  // only 10 files in file cache
1231
1
  db_options_.disableDataSync = true;
1232
1
  db_options_.max_background_compactions = 3;
1233
1234
1
  default_cf.compaction_style = kCompactionStyleLevel;
1235
1
  default_cf.num_levels = 3;
1236
1
  default_cf.write_buffer_size = 64 << 10;  // 64KB
1237
1
  default_cf.target_file_size_base = 30 << 10;
1238
1
  default_cf.source_compaction_factor = 100;
1239
1
  BlockBasedTableOptions table_options;
1240
1
  table_options.no_block_cache = true;
1241
1
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1242
1243
1
  one.compaction_style = kCompactionStyleUniversal;
1244
1245
1
  one.num_levels = 1;
1246
  // trigger compaction if there are >= 4 files
1247
1
  one.level0_file_num_compaction_trigger = 4;
1248
1
  one.write_buffer_size = 120000;
1249
1250
1
  Reopen({default_cf, one});
1251
1252
  // SETUP column family "one" -- universal style
1253
3
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1254
2
    PutRandomData(1, 10, 12000, true);
1255
2
    PutRandomData(1, 1, 10, true);
1256
2
    WaitForFlush(1);
1257
2
    AssertFilesPerLevel(ToString(i + 1), 1);
1258
2
  }
1259
1
  bool cf_1_1 = true;
1260
1
  bool cf_1_2 = true;
1261
1
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
1262
1
      {{"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:2"},
1263
1
       {"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:5"},
1264
1
       {"ColumnFamilyTest::ManualManual:1", "ColumnFamilyTest::ManualManual:2"},
1265
1
       {"ColumnFamilyTest::ManualManual:1",
1266
1
        "ColumnFamilyTest::ManualManual:3"}});
1267
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
1268
3
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
1269
3
        if (cf_1_1) {
1270
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:4");
1271
1
          cf_1_1 = false;
1272
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:3");
1273
2
        } else if (cf_1_2) {
1274
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:2");
1275
1
          cf_1_2 = false;
1276
1
        }
1277
3
      });
1278
1279
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1280
1
  std::thread threads([&] {
1281
1
    CompactRangeOptions compact_options;
1282
1
    compact_options.exclusive_manual_compaction = true;
1283
1
    ASSERT_OK(
1284
1
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1285
1
  });
1286
1287
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:5");
1288
1289
1
  WaitForFlush(1);
1290
1291
  // Add more L0 files and force another manual compaction
1292
3
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1293
2
    PutRandomData(1, 10, 12000, true);
1294
2
    PutRandomData(1, 1, 10, true);
1295
2
    WaitForFlush(1);
1296
2
    AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1297
2
                        1);
1298
2
  }
1299
1300
1
  std::thread threads1([&] {
1301
1
    CompactRangeOptions compact_options;
1302
1
    compact_options.exclusive_manual_compaction = false;
1303
1
    ASSERT_OK(
1304
1
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1305
1
  });
1306
1307
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:1");
1308
1309
1
  threads.join();
1310
1
  threads1.join();
1311
1
  WaitForCompaction();
1312
  // VERIFY compaction "one"
1313
1
  ASSERT_LE(NumTableFilesAtLevel(0, 1), 2);
1314
1315
  // Compare against saved keys
1316
1
  std::set<std::string>::iterator key_iter = keys_.begin();
1317
45
  while (key_iter != keys_.end()) {
1318
44
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1319
44
    key_iter++;
1320
44
  }
1321
1
  Close();
1322
1
}
1323
1324
1
TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) {
1325
1
  Open();
1326
1
  CreateColumnFamilies({"one"});
1327
1
  ColumnFamilyOptions default_cf, one;
1328
1
  db_options_.max_open_files = 20;  // only 10 files in file cache
1329
1
  db_options_.disableDataSync = true;
1330
1
  db_options_.max_background_compactions = 3;
1331
1332
1
  default_cf.compaction_style = kCompactionStyleLevel;
1333
1
  default_cf.num_levels = 3;
1334
1
  default_cf.write_buffer_size = 64 << 10;  // 64KB
1335
1
  default_cf.target_file_size_base = 30 << 10;
1336
1
  default_cf.source_compaction_factor = 100;
1337
1
  BlockBasedTableOptions table_options;
1338
1
  table_options.no_block_cache = true;
1339
1
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1340
1341
1
  one.compaction_style = kCompactionStyleUniversal;
1342
1343
1
  one.num_levels = 1;
1344
  // trigger compaction if there are >= 4 files
1345
1
  one.level0_file_num_compaction_trigger = 4;
1346
1
  one.write_buffer_size = 120000;
1347
1348
1
  Reopen({default_cf, one});
1349
1350
  // SETUP column family "one" -- universal style
1351
3
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1352
2
    PutRandomData(1, 10, 12000, true);
1353
2
    PutRandomData(1, 1, 10, true);
1354
2
    WaitForFlush(1);
1355
2
    AssertFilesPerLevel(ToString(i + 1), 1);
1356
2
  }
1357
1
  bool cf_1_1 = true;
1358
1
  bool cf_1_2 = true;
1359
1
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
1360
1
      {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"},
1361
1
       {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
1362
1
       {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:2"},
1363
1
       {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}});
1364
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
1365
2
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
1366
2
        if (cf_1_1) {
1367
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
1368
1
          cf_1_1 = false;
1369
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
1370
1
        } else if (cf_1_2) {
1371
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
1372
1
          cf_1_2 = false;
1373
1
        }
1374
2
      });
1375
1376
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1377
1
  std::thread threads([&] {
1378
1
    CompactRangeOptions compact_options;
1379
1
    compact_options.exclusive_manual_compaction = false;
1380
1
    ASSERT_OK(
1381
1
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1382
1
  });
1383
1384
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
1385
1386
1
  WaitForFlush(1);
1387
1388
  // Add more L0 files and force automatic compaction
1389
5
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1390
4
    PutRandomData(1, 10, 12000, true);
1391
4
    PutRandomData(1, 1, 10, true);
1392
4
    WaitForFlush(1);
1393
4
    AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1394
4
                        1);
1395
4
  }
1396
1397
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
1398
1399
1
  threads.join();
1400
1
  WaitForCompaction();
1401
  // VERIFY compaction "one"
1402
1
  ASSERT_LE(NumTableFilesAtLevel(0, 1), 2);
1403
1404
  // Compare against saved keys
1405
1
  std::set<std::string>::iterator key_iter = keys_.begin();
1406
67
  while (key_iter != keys_.end()) {
1407
66
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1408
66
    key_iter++;
1409
66
  }
1410
1
  Close();
1411
1
}
1412
1413
1
TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) {
1414
1
  Open();
1415
1
  CreateColumnFamilies({"one"});
1416
1
  ColumnFamilyOptions default_cf, one;
1417
1
  db_options_.max_open_files = 20;  // only 10 files in file cache
1418
1
  db_options_.disableDataSync = true;
1419
1
  db_options_.max_background_compactions = 3;
1420
1421
1
  default_cf.compaction_style = kCompactionStyleLevel;
1422
1
  default_cf.num_levels = 3;
1423
1
  default_cf.write_buffer_size = 64 << 10;  // 64KB
1424
1
  default_cf.target_file_size_base = 30 << 10;
1425
1
  default_cf.source_compaction_factor = 100;
1426
1
  BlockBasedTableOptions table_options;
1427
1
  table_options.no_block_cache = true;
1428
1
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1429
1430
1
  one.compaction_style = kCompactionStyleLevel;
1431
1432
1
  one.num_levels = 1;
1433
  // trigger compaction if there are >= 4 files
1434
1
  one.level0_file_num_compaction_trigger = 4;
1435
1
  one.write_buffer_size = 120000;
1436
1437
1
  Reopen({default_cf, one});
1438
1439
  // SETUP column family "one" -- level style
1440
3
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1441
2
    PutRandomData(1, 10, 12000, true);
1442
2
    PutRandomData(1, 1, 10, true);
1443
2
    WaitForFlush(1);
1444
2
    AssertFilesPerLevel(ToString(i + 1), 1);
1445
2
  }
1446
1
  bool cf_1_1 = true;
1447
1
  bool cf_1_2 = true;
1448
1
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
1449
1
      {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"},
1450
1
       {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
1451
1
       {"ColumnFamilyTest::ManualAuto:3", "ColumnFamilyTest::ManualAuto:2"},
1452
1
       {"LevelCompactionPicker::PickCompactionBySize:0",
1453
1
        "ColumnFamilyTest::ManualAuto:3"},
1454
1
       {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}});
1455
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
1456
2
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
1457
2
        if (cf_1_1) {
1458
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
1459
1
          cf_1_1 = false;
1460
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
1461
1
        } else if (cf_1_2) {
1462
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
1463
1
          cf_1_2 = false;
1464
1
        }
1465
2
      });
1466
1467
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1468
1
  std::thread threads([&] {
1469
1
    CompactRangeOptions compact_options;
1470
1
    compact_options.exclusive_manual_compaction = false;
1471
1
    ASSERT_OK(
1472
1
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1473
1
  });
1474
1475
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
1476
1477
  // Add more L0 files and force automatic compaction
1478
5
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1479
4
    PutRandomData(1, 10, 12000, true);
1480
4
    PutRandomData(1, 1, 10, true);
1481
4
    WaitForFlush(1);
1482
4
    AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1483
4
                        1);
1484
4
  }
1485
1486
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
1487
1488
1
  threads.join();
1489
1
  WaitForCompaction();
1490
  // VERIFY compaction "one"
1491
1
  AssertFilesPerLevel("0,1", 1);
1492
1493
  // Compare against saved keys
1494
1
  std::set<std::string>::iterator key_iter = keys_.begin();
1495
67
  while (key_iter != keys_.end()) {
1496
66
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1497
66
    key_iter++;
1498
66
  }
1499
1
  Close();
1500
1
}
1501
1502
// This test checks for automatic getting a conflict if there is a
1503
// manual which has not yet been scheduled.
1504
// The manual compaction waits in NotScheduled
1505
// We generate more files and then trigger an automatic compaction
1506
// This will wait because there is an unscheduled manual compaction.
1507
// Once the conflict is hit, the manual compaction starts and ends
1508
// Then another automatic will start and end.
1509
1
TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) {
1510
1
  Open();
1511
1
  CreateColumnFamilies({"one"});
1512
1
  ColumnFamilyOptions default_cf, one;
1513
1
  db_options_.max_open_files = 20;  // only 10 files in file cache
1514
1
  db_options_.disableDataSync = true;
1515
1
  db_options_.max_background_compactions = 3;
1516
1517
1
  default_cf.compaction_style = kCompactionStyleLevel;
1518
1
  default_cf.num_levels = 3;
1519
1
  default_cf.write_buffer_size = 64 << 10;  // 64KB
1520
1
  default_cf.target_file_size_base = 30 << 10;
1521
1
  default_cf.source_compaction_factor = 100;
1522
1
  BlockBasedTableOptions table_options;
1523
1
  table_options.no_block_cache = true;
1524
1
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1525
1526
1
  one.compaction_style = kCompactionStyleUniversal;
1527
1528
1
  one.num_levels = 1;
1529
  // trigger compaction if there are >= 4 files
1530
1
  one.level0_file_num_compaction_trigger = 4;
1531
1
  one.write_buffer_size = 120000;
1532
1533
1
  Reopen({default_cf, one});
1534
1535
  // SETUP column family "one" -- universal style
1536
3
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1537
2
    PutRandomData(1, 10, 12000, true);
1538
2
    PutRandomData(1, 1, 10, true);
1539
2
    WaitForFlush(1);
1540
2
    AssertFilesPerLevel(ToString(i + 1), 1);
1541
2
  }
1542
1
  bool cf_1_1 = true;
1543
1
  bool cf_1_2 = true;
1544
1
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
1545
1
      {{"ColumnFamilyTest::ManualAutoCon:7",
1546
1
        "DBImpl::RunManualCompaction()::Conflict"},
1547
1
       {"ColumnFamilyTest::ManualAutoCon:9",
1548
1
        "ColumnFamilyTest::ManualAutoCon:8"},
1549
1
       {"ColumnFamilyTest::ManualAutoCon:2",
1550
1
        "ColumnFamilyTest::ManualAutoCon:6"},
1551
1
       {"ColumnFamilyTest::ManualAutoCon:4",
1552
1
        "ColumnFamilyTest::ManualAutoCon:5"},
1553
1
       {"ColumnFamilyTest::ManualAutoCon:1",
1554
1
        "ColumnFamilyTest::ManualAutoCon:2"},
1555
1
       {"ColumnFamilyTest::ManualAutoCon:1",
1556
1
        "ColumnFamilyTest::ManualAutoCon:3"}});
1557
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
1558
3
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
1559
3
        if (cf_1_1) {
1560
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:4");
1561
1
          cf_1_1 = false;
1562
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:3");
1563
2
        } else if (cf_1_2) {
1564
1
          cf_1_2 = false;
1565
1
          TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:2");
1566
1
        }
1567
3
      });
1568
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
1569
1
      "DBImpl::RunManualCompaction:NotScheduled", [&](void* arg) {
1570
1
        InstrumentedMutex* mutex = static_cast<InstrumentedMutex*>(arg);
1571
1
        mutex->Unlock();
1572
1
        TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:9");
1573
1
        TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:7");
1574
1
        mutex->Lock();
1575
1
      });
1576
1577
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1578
1
  std::thread threads([&] {
1579
1
    CompactRangeOptions compact_options;
1580
1
    compact_options.exclusive_manual_compaction = false;
1581
1
    ASSERT_OK(
1582
1
        db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1583
1
    TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:6");
1584
1
  });
1585
1586
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:8");
1587
1
  WaitForFlush(1);
1588
1589
  // Add more L0 files and force automatic compaction
1590
5
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1591
4
    PutRandomData(1, 10, 12000, true);
1592
4
    PutRandomData(1, 1, 10, true);
1593
4
    WaitForFlush(1);
1594
4
    AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1595
4
                        1);
1596
4
  }
1597
1598
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:5");
1599
  // Add more L0 files and force automatic compaction
1600
5
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1601
4
    PutRandomData(1, 10, 12000, true);
1602
4
    PutRandomData(1, 1, 10, true);
1603
4
    WaitForFlush(1);
1604
4
  }
1605
1
  TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:1");
1606
1607
1
  threads.join();
1608
1
  WaitForCompaction();
1609
  // VERIFY compaction "one"
1610
1
  ASSERT_LE(NumTableFilesAtLevel(0, 1), 3);
1611
1612
  // Compare against saved keys
1613
1
  std::set<std::string>::iterator key_iter = keys_.begin();
1614
111
  while (key_iter != keys_.end()) {
1615
110
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1616
110
    key_iter++;
1617
110
  }
1618
1619
1
  Close();
1620
1
}
1621
1622
// In this test, we generate enough files to trigger automatic compactions.
1623
// The automatic compaction waits in NonTrivial:AfterRun
1624
// We generate more files and then trigger an automatic compaction
1625
// This will wait because the automatic compaction has files it needs.
1626
// Once the conflict is hit, the automatic compaction starts and ends
1627
// Then the manual will run and end.
1628
1
TEST_F(ColumnFamilyTest, SameCFAutomaticManualCompactions) {
1629
1
  Open();
1630
1
  CreateColumnFamilies({"one"});
1631
1
  ColumnFamilyOptions default_cf, one;
1632
1
  db_options_.max_open_files = 20;  // only 10 files in file cache
1633
1
  db_options_.disableDataSync = true;
1634
1
  db_options_.max_background_compactions = 3;
1635
1636
1
  default_cf.compaction_style = kCompactionStyleLevel;
1637
1
  default_cf.num_levels = 3;
1638
1
  default_cf.write_buffer_size = 64 << 10;  // 64KB
1639
1
  default_cf.target_file_size_base = 30 << 10;
1640
1
  default_cf.source_compaction_factor = 100;
1641
1
  BlockBasedTableOptions table_options;
1642
1
  table_options.no_block_cache = true;
1643
1
  default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1644
1645
1
  one.compaction_style = kCompactionStyleUniversal;
1646
1647
1
  one.num_levels = 1;
1648
  // trigger compaction if there are >= 4 files
1649
1
  one.level0_file_num_compaction_trigger = 4;
1650
1
  one.write_buffer_size = 120000;
1651
1652
1
  Reopen({default_cf, one});
1653
1654
1
  bool cf_1_1 = true;
1655
1
  bool cf_1_2 = true;
1656
1
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
1657
1
      {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:2"},
1658
1
       {"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:5"},
1659
1
       {"CompactionPicker::CompactRange:Conflict",
1660
1
        "ColumnFamilyTest::AutoManual:3"}});
1661
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
1662
2
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
1663
2
        if (cf_1_1) {
1664
1
          TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4");
1665
1
          cf_1_1 = false;
1666
1
          TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3");
1667
1
        } else if (cf_1_2) {
1668
1
          TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2");
1669
1
          cf_1_2 = false;
1670
1
        }
1671
2
      });
1672
1673
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1674
1675
  // SETUP column family "one" -- universal style
1676
5
  for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1677
4
    PutRandomData(1, 10, 12000, true);
1678
4
    PutRandomData(1, 1, 10, true);
1679
4
    WaitForFlush(1);
1680
4
    AssertFilesPerLevel(ToString(i + 1), 1);
1681
4
  }
1682
1683
1
  TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5");
1684
1685
  // Add another L0 file and force automatic compaction
1686
3
  for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1687
2
    PutRandomData(1, 10, 12000, true);
1688
2
    PutRandomData(1, 1, 10, true);
1689
2
    WaitForFlush(1);
1690
2
  }
1691
1692
1
  CompactRangeOptions compact_options;
1693
1
  compact_options.exclusive_manual_compaction = false;
1694
1
  ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1695
1696
1
  TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1");
1697
1698
1
  WaitForCompaction();
1699
  // VERIFY compaction "one"
1700
1
  AssertFilesPerLevel("1", 1);
1701
  // Compare against saved keys
1702
1
  std::set<std::string>::iterator key_iter = keys_.begin();
1703
67
  while (key_iter != keys_.end()) {
1704
66
    ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1705
66
    key_iter++;
1706
66
  }
1707
1708
1
  Close();
1709
1
}
1710
#endif  // !ROCKSDB_LITE
1711
1712
#ifndef ROCKSDB_LITE  // Tailing interator not supported
1713
namespace {
1714
18
std::string IterStatus(Iterator* iter) {
1715
18
  std::string result;
1716
18
  if (iter->Valid()) {
1717
12
    result = iter->key().ToString() + "->" + iter->value().ToString();
1718
6
  } else {
1719
6
    result = "(invalid)";
1720
6
  }
1721
18
  return result;
1722
18
}
1723
}  // anonymous namespace
1724
1725
1
TEST_F(ColumnFamilyTest, NewIteratorsTest) {
1726
  // iter == 0 -- no tailing
1727
  // iter == 2 -- tailing
1728
3
  for (int iter = 0; iter < 2; ++iter) {
1729
2
    Open();
1730
2
    CreateColumnFamiliesAndReopen({"one", "two"});
1731
2
    ASSERT_OK(Put(0, "a", "b"));
1732
2
    ASSERT_OK(Put(1, "b", "a"));
1733
2
    ASSERT_OK(Put(2, "c", "m"));
1734
2
    ASSERT_OK(Put(2, "v", "t"));
1735
2
    std::vector<Iterator*> iterators;
1736
2
    ReadOptions options;
1737
2
    options.tailing = (iter == 1);
1738
2
    ASSERT_OK(db_->NewIterators(options, handles_, &iterators));
1739
1740
6
    for (auto it : iterators) {
1741
6
      it->SeekToFirst();
1742
6
    }
1743
2
    ASSERT_EQ(IterStatus(iterators[0]), "a->b");
1744
2
    ASSERT_EQ(IterStatus(iterators[1]), "b->a");
1745
2
    ASSERT_EQ(IterStatus(iterators[2]), "c->m");
1746
1747
2
    ASSERT_OK(Put(1, "x", "x"));
1748
1749
6
    for (auto it : iterators) {
1750
6
      it->Next();
1751
6
    }
1752
1753
2
    ASSERT_EQ(IterStatus(iterators[0]), "(invalid)");
1754
2
    if (iter == 0) {
1755
      // no tailing
1756
1
      ASSERT_EQ(IterStatus(iterators[1]), "(invalid)");
1757
1
    } else {
1758
      // tailing
1759
1
      ASSERT_EQ(IterStatus(iterators[1]), "x->x");
1760
1
    }
1761
2
    ASSERT_EQ(IterStatus(iterators[2]), "v->t");
1762
1763
6
    for (auto it : iterators) {
1764
6
      delete it;
1765
6
    }
1766
2
    Destroy();
1767
2
  }
1768
1
}
1769
#endif  // !ROCKSDB_LITE
1770
1771
#ifndef ROCKSDB_LITE  // ReadOnlyDB is not supported
1772
1
TEST_F(ColumnFamilyTest, ReadOnlyDBTest) {
1773
1
  Open();
1774
1
  CreateColumnFamiliesAndReopen({"one", "two", "three", "four"});
1775
1
  ASSERT_OK(Put(0, "a", "b"));
1776
1
  ASSERT_OK(Put(1, "foo", "bla"));
1777
1
  ASSERT_OK(Put(2, "foo", "blabla"));
1778
1
  ASSERT_OK(Put(3, "foo", "blablabla"));
1779
1
  ASSERT_OK(Put(4, "foo", "blablablabla"));
1780
1781
1
  DropColumnFamilies({2});
1782
1
  Close();
1783
  // open only a subset of column families
1784
1
  AssertOpenReadOnly({"default", "one", "four"});
1785
1
  ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
1786
1
  ASSERT_EQ("bla", Get(1, "foo"));
1787
1
  ASSERT_EQ("blablablabla", Get(2, "foo"));
1788
1789
1790
  // test newiterators
1791
1
  {
1792
1
    std::vector<Iterator*> iterators;
1793
1
    ASSERT_OK(db_->NewIterators(ReadOptions(), handles_, &iterators));
1794
3
    for (auto it : iterators) {
1795
3
      it->SeekToFirst();
1796
3
    }
1797
1
    ASSERT_EQ(IterStatus(iterators[0]), "a->b");
1798
1
    ASSERT_EQ(IterStatus(iterators[1]), "foo->bla");
1799
1
    ASSERT_EQ(IterStatus(iterators[2]), "foo->blablablabla");
1800
3
    for (auto it : iterators) {
1801
3
      it->Next();
1802
3
    }
1803
1
    ASSERT_EQ(IterStatus(iterators[0]), "(invalid)");
1804
1
    ASSERT_EQ(IterStatus(iterators[1]), "(invalid)");
1805
1
    ASSERT_EQ(IterStatus(iterators[2]), "(invalid)");
1806
1807
3
    for (auto it : iterators) {
1808
3
      delete it;
1809
3
    }
1810
1
  }
1811
1812
1
  Close();
1813
  // can't open dropped column family
1814
1
  Status s = OpenReadOnly({"default", "one", "two"});
1815
1
  ASSERT_TRUE(!s.ok());
1816
1817
  // Can't open without specifying default column family
1818
1
  s = OpenReadOnly({"one", "four"});
1819
1
  ASSERT_TRUE(!s.ok());
1820
1
}
1821
#endif  // !ROCKSDB_LITE
1822
1823
1
TEST_F(ColumnFamilyTest, DontRollEmptyLogs) {
1824
1
  Open();
1825
1
  CreateColumnFamiliesAndReopen({"one", "two", "three", "four"});
1826
1827
6
  for (size_t i = 0; i < handles_.size(); ++i) {
1828
5
    PutRandomData(static_cast<int>(i), 10, 100);
1829
5
  }
1830
1
  int num_writable_file_start = env_->GetNumberOfNewWritableFileCalls();
1831
  // this will trigger the flushes
1832
6
  for (int i = 0; i <= 4; ++i) {
1833
5
    ASSERT_OK(Flush(i));
1834
5
  }
1835
1836
5
  for (int i = 0; i < 4; ++i) {
1837
4
    WaitForFlush(i);
1838
4
  }
1839
1
  int total_new_writable_files =
1840
1
      env_->GetNumberOfNewWritableFileCalls() - num_writable_file_start;
1841
1
  ASSERT_EQ(static_cast<size_t>(total_new_writable_files), handles_.size() * 2 + 1);
1842
1
  Close();
1843
1
}
1844
1845
1
TEST_F(ColumnFamilyTest, FlushStaleColumnFamilies) {
1846
1
  Open();
1847
1
  CreateColumnFamilies({"one", "two"});
1848
1
  ColumnFamilyOptions default_cf, one, two;
1849
1
  default_cf.write_buffer_size = 100000;  // small write buffer size
1850
1
  default_cf.arena_block_size = 4096;
1851
1
  default_cf.disable_auto_compactions = true;
1852
1
  one.disable_auto_compactions = true;
1853
1
  two.disable_auto_compactions = true;
1854
1
  db_options_.max_total_wal_size = 210000;
1855
1856
1
  Reopen({default_cf, one, two});
1857
1858
1
  PutRandomData(2, 1, 10);  // 10 bytes
1859
3
  for (int i = 0; i < 2; ++i) {
1860
2
    PutRandomData(0, 100, 1000);  // flush
1861
2
    WaitForFlush(0);
1862
1863
2
    AssertCountLiveFiles(i + 1);
1864
2
  }
1865
  // third flush. now, CF [two] should be detected as stale and flushed
1866
  // column family 1 should not be flushed since it's empty
1867
1
  PutRandomData(0, 100, 1000);  // flush
1868
1
  WaitForFlush(0);
1869
1
  WaitForFlush(2);
1870
  // 3 files for default column families, 1 file for column family [two], zero
1871
  // files for column family [one], because it's empty
1872
1
  AssertCountLiveFiles(4);
1873
1
  Close();
1874
1
}
1875
1876
1
TEST_F(ColumnFamilyTest, CreateMissingColumnFamilies) {
1877
1
  Status s = TryOpen({"one", "two"});
1878
1
  ASSERT_TRUE(!s.ok());
1879
1
  db_options_.create_missing_column_families = true;
1880
1
  s = TryOpen({"default", "one", "two"});
1881
1
  ASSERT_TRUE(s.ok());
1882
1
  Close();
1883
1
}
1884
1885
1
TEST_F(ColumnFamilyTest, SanitizeOptions) {
1886
1
  DBOptions db_options;
1887
3
  for (int s = kCompactionStyleLevel; s <= kCompactionStyleUniversal; ++s) {
1888
8
    for (int l = 0; l <= 2; l++) {
1889
24
      for (int i = 1; i <= 3; i++) {
1890
72
        for (int j = 1; j <= 3; j++) {
1891
216
          for (int k = 1; k <= 3; k++) {
1892
162
            ColumnFamilyOptions original;
1893
162
            original.compaction_style = static_cast<CompactionStyle>(s);
1894
162
            original.num_levels = l;
1895
162
            original.level0_stop_writes_trigger = i;
1896
162
            original.level0_slowdown_writes_trigger = j;
1897
162
            original.level0_file_num_compaction_trigger = k;
1898
162
            original.write_buffer_size =
1899
162
                l * 4 * 1024 * 1024 + i * 1024 * 1024 + j * 1024 + k;
1900
1901
162
            ColumnFamilyOptions result =
1902
162
                SanitizeOptions(db_options, nullptr, original);
1903
162
            ASSERT_TRUE(result.level0_stop_writes_trigger >=
1904
162
                        result.level0_slowdown_writes_trigger);
1905
162
            ASSERT_TRUE(result.level0_slowdown_writes_trigger >=
1906
162
                        result.level0_file_num_compaction_trigger);
1907
162
            ASSERT_TRUE(result.level0_file_num_compaction_trigger ==
1908
162
                        original.level0_file_num_compaction_trigger);
1909
162
            if (s == kCompactionStyleLevel) {
1910
81
              ASSERT_GE(result.num_levels, 2);
1911
81
            } else {
1912
81
              ASSERT_GE(result.num_levels, 1);
1913
81
              if (original.num_levels >= 1) {
1914
54
                ASSERT_EQ(result.num_levels, original.num_levels);
1915
54
              }
1916
81
            }
1917
1918
            // Make sure Sanitize options sets arena_block_size default of 1MB or to 1/8 of
1919
            // the write_buffer_size, rounded up to a multiple of 4k.
1920
162
            size_t expected_arena_block_size = l * 4 * 1024 * 1024 / 8 + i * 1024 * 1024 / 8;
1921
162
            if (j + k != 0) {
1922
              // not a multiple of 4k, round up 4k
1923
162
              expected_arena_block_size += 4 * 1024;
1924
162
            }
1925
162
            expected_arena_block_size = std::min(
1926
162
                static_cast<size_t>(FLAGS_memstore_arena_size_kb << 10), expected_arena_block_size);
1927
162
            ASSERT_EQ(expected_arena_block_size, result.arena_block_size);
1928
162
          }
1929
54
        }
1930
18
      }
1931
6
    }
1932
2
  }
1933
1
}
1934
1935
1
TEST_F(ColumnFamilyTest, ReadDroppedColumnFamily) {
1936
  // iter 0 -- drop CF, don't reopen
1937
  // iter 1 -- delete CF, reopen
1938
3
  for (int iter = 0; iter < 2; ++iter) {
1939
2
    db_options_.create_missing_column_families = true;
1940
2
    db_options_.max_open_files = 20;
1941
    // delete obsolete files always
1942
2
    db_options_.delete_obsolete_files_period_micros = 0;
1943
2
    Open({"default", "one", "two"});
1944
2
    ColumnFamilyOptions options;
1945
2
    options.level0_file_num_compaction_trigger = 100;
1946
2
    options.level0_slowdown_writes_trigger = 200;
1947
2
    options.level0_stop_writes_trigger = 200;
1948
2
    options.write_buffer_size = 100000;  // small write buffer size
1949
2
    Reopen({options, options, options});
1950
1951
    // 1MB should create ~10 files for each CF
1952
2
    int kKeysNum = 10000;
1953
2
    PutRandomData(0, kKeysNum, 100);
1954
2
    PutRandomData(1, kKeysNum, 100);
1955
2
    PutRandomData(2, kKeysNum, 100);
1956
1957
2
    {
1958
2
      std::unique_ptr<Iterator> iterator(
1959
2
          db_->NewIterator(ReadOptions(), handles_[2]));
1960
2
      iterator->SeekToFirst();
1961
1962
2
      if (iter == 0) {
1963
        // Drop CF two
1964
1
        ASSERT_OK(db_->DropColumnFamily(handles_[2]));
1965
1
      } else {
1966
        // delete CF two
1967
1
        delete handles_[2];
1968
1
        handles_[2] = nullptr;
1969
1
      }
1970
      // Make sure iterator created can still be used.
1971
2
      int count = 0;
1972
20.0k
      for (; iterator->Valid(); iterator->Next()) {
1973
20.0k
        ASSERT_OK(iterator->status());
1974
20.0k
        ++count;
1975
20.0k
      }
1976
2
      ASSERT_OK(iterator->status());
1977
2
      ASSERT_EQ(count, kKeysNum);
1978
2
    }
1979
1980
    // Add bunch more data to other CFs
1981
2
    PutRandomData(0, kKeysNum, 100);
1982
2
    PutRandomData(1, kKeysNum, 100);
1983
1984
2
    if (iter == 1) {
1985
1
      Reopen();
1986
1
    }
1987
1988
    // Since we didn't delete CF handle, RocksDB's contract guarantees that
1989
    // we're still able to read dropped CF
1990
8
    for (int i = 0; i < 3; ++i) {
1991
6
      std::unique_ptr<Iterator> iterator(
1992
6
          db_->NewIterator(ReadOptions(), handles_[i]));
1993
6
      int count = 0;
1994
100k
      for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
1995
100k
        ASSERT_OK(iterator->status());
1996
100k
        ++count;
1997
100k
      }
1998
6
      ASSERT_OK(iterator->status());
1999
6
      ASSERT_EQ(count, kKeysNum * ((i == 2) ? 1 : 2));
2000
6
    }
2001
2002
2
    Close();
2003
2
    Destroy();
2004
2
  }
2005
1
}
2006
2007
1
TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) {
2008
1
  db_options_.create_missing_column_families = true;
2009
1
  Open({"default", "one"});
2010
1
  ColumnFamilyOptions options;
2011
1
  options.level0_file_num_compaction_trigger = 100;
2012
1
  options.level0_slowdown_writes_trigger = 200;
2013
1
  options.level0_stop_writes_trigger = 200;
2014
1
  options.max_write_buffer_number = 20;
2015
1
  options.write_buffer_size = 100000;  // small write buffer size
2016
1
  Reopen({options, options});
2017
2018
1
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
2019
1
      {{"VersionSet::LogAndApply::ColumnFamilyDrop:0",
2020
1
        "FlushJob::WriteLevel0Table"},
2021
1
       {"VersionSet::LogAndApply::ColumnFamilyDrop:1",
2022
1
        "FlushJob::InstallResults"},
2023
1
       {"FlushJob::InstallResults",
2024
1
        "VersionSet::LogAndApply::ColumnFamilyDrop:2"}});
2025
2026
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2027
1
  test::SleepingBackgroundTask sleeping_task;
2028
2029
1
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2030
1
                 Env::Priority::HIGH);
2031
2032
  // 1MB should create ~10 files for each CF
2033
1
  int kKeysNum = 10000;
2034
1
  PutRandomData(1, kKeysNum, 100);
2035
2036
1
  std::vector<std::thread> threads;
2037
1
  threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); });
2038
2039
1
  sleeping_task.WakeUp();
2040
1
  sleeping_task.WaitUntilDone();
2041
1
  sleeping_task.Reset();
2042
  // now we sleep again. this is just so we're certain that flush job finished
2043
1
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2044
1
                 Env::Priority::HIGH);
2045
1
  sleeping_task.WakeUp();
2046
1
  sleeping_task.WaitUntilDone();
2047
2048
1
  {
2049
    // Since we didn't delete CF handle, RocksDB's contract guarantees that
2050
    // we're still able to read dropped CF
2051
1
    std::unique_ptr<Iterator> iterator(
2052
1
        db_->NewIterator(ReadOptions(), handles_[1]));
2053
1
    int count = 0;
2054
10.0k
    for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
2055
10.0k
      ASSERT_OK(iterator->status());
2056
10.0k
      ++count;
2057
10.0k
    }
2058
1
    ASSERT_OK(iterator->status());
2059
1
    ASSERT_EQ(count, kKeysNum);
2060
1
  }
2061
1
  for (auto& t : threads) {
2062
1
    t.join();
2063
1
  }
2064
2065
1
  Close();
2066
1
  Destroy();
2067
1
}
2068
2069
#ifndef ROCKSDB_LITE
2070
// skipped as persisting options is not supported in ROCKSDB_LITE
2071
namespace {
2072
std::atomic<int> test_stage(0);
2073
const int kMainThreadStartPersistingOptionsFile = 1;
2074
const int kChildThreadFinishDroppingColumnFamily = 2;
2075
const int kChildThreadWaitingMainThreadPersistOptions = 3;
2076
void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id,
2077
1
                            std::vector<Comparator*>* comparators) {
2078
2
  while (test_stage < kMainThreadStartPersistingOptionsFile) {
2079
1
    Env::Default()->SleepForMicroseconds(100);
2080
1
  }
2081
1
  cf_test->DropColumnFamilies({cf_id});
2082
1
  if ((*comparators)[cf_id]) {
2083
1
    delete (*comparators)[cf_id];
2084
1
    (*comparators)[cf_id] = nullptr;
2085
1
  }
2086
1
  test_stage = kChildThreadFinishDroppingColumnFamily;
2087
1
}
2088
}  // namespace
2089
2090
1
TEST_F(ColumnFamilyTest, CreateAndDropRace) {
2091
1
  const int kCfCount = 5;
2092
1
  std::vector<ColumnFamilyOptions> cf_opts;
2093
1
  std::vector<Comparator*> comparators;
2094
6
  for (int i = 0; i < kCfCount; ++i) {
2095
5
    cf_opts.emplace_back();
2096
5
    comparators.push_back(new test::SimpleSuffixReverseComparator());
2097
5
    cf_opts.back().comparator = comparators.back();
2098
5
  }
2099
1
  db_options_.create_if_missing = true;
2100
1
  db_options_.create_missing_column_families = true;
2101
2102
1
  auto main_thread_id = std::this_thread::get_id();
2103
2104
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack("PersistRocksDBOptions:start",
2105
2
                                                 [&](void* arg) {
2106
2
    auto current_thread_id = std::this_thread::get_id();
2107
    // If it's the main thread hitting this sync-point, then it
2108
    // will be blocked until some other thread update the test_stage.
2109
2
    if (main_thread_id == current_thread_id) {
2110
1
      test_stage = kMainThreadStartPersistingOptionsFile;
2111
2
      while (test_stage < kChildThreadFinishDroppingColumnFamily) {
2112
1
        Env::Default()->SleepForMicroseconds(100);
2113
1
      }
2114
1
    }
2115
2
  });
2116
2117
1
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
2118
1
      "WriteThread::EnterUnbatched:Wait", [&](void* arg) {
2119
        // This means a thread doing DropColumnFamily() is waiting for
2120
        // other thread to finish persisting options.
2121
        // In such case, we update the test_stage to unblock the main thread.
2122
1
        test_stage = kChildThreadWaitingMainThreadPersistOptions;
2123
2124
        // Note that based on the test setting, this must not be the
2125
        // main thread.
2126
1
        ASSERT_NE(main_thread_id, std::this_thread::get_id());
2127
1
      });
2128
2129
  // Create a database with four column families
2130
1
  Open({"default", "one", "two", "three"},
2131
1
       {cf_opts[0], cf_opts[1], cf_opts[2], cf_opts[3]});
2132
2133
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2134
2135
  // Start a thread that will drop the first column family
2136
  // and its comparator
2137
1
  std::thread drop_cf_thread(DropSingleColumnFamily, this, 1, &comparators);
2138
2139
1
  DropColumnFamilies({2});
2140
2141
1
  drop_cf_thread.join();
2142
1
  Close();
2143
1
  Destroy();
2144
5
  for (auto* comparator : comparators) {
2145
5
    if (comparator) {
2146
4
      delete comparator;
2147
4
    }
2148
5
  }
2149
1
}
2150
#endif  // !ROCKSDB_LITE
2151
2152
1
TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
2153
1
  const uint64_t kBaseRate = 810000u;
2154
1
  db_options_.delayed_write_rate = kBaseRate;
2155
1
  db_options_.base_background_compactions = 2;
2156
1
  db_options_.max_background_compactions = 6;
2157
2158
1
  Open({"default"});
2159
1
  ColumnFamilyData* cfd =
2160
1
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2161
2162
1
  VersionStorageInfo* vstorage = cfd->current()->storage_info();
2163
2164
1
  MutableCFOptions mutable_cf_options(
2165
1
      Options(db_options_, column_family_options_),
2166
1
      ImmutableCFOptions(Options(db_options_, column_family_options_)));
2167
2168
1
  mutable_cf_options.level0_slowdown_writes_trigger = 20;
2169
1
  mutable_cf_options.level0_stop_writes_trigger = 10000;
2170
1
  mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2171
1
  mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2172
2173
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(50);
2174
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2175
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2176
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2177
2178
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(201);
2179
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2180
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2181
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2182
1
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2183
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2184
2185
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(400);
2186
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2187
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2188
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2189
1
  ASSERT_EQ(kBaseRate / 1.2,
2190
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2191
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2192
2193
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(500);
2194
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2195
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2196
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2197
1
  ASSERT_EQ(kBaseRate / 1.2 / 1.2,
2198
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2199
2200
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(450);
2201
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2202
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2203
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2204
1
  ASSERT_EQ(kBaseRate / 1.2,
2205
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2206
2207
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(205);
2208
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2209
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2210
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2211
1
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2212
2213
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(202);
2214
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2215
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2216
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2217
1
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2218
2219
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(201);
2220
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2221
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2222
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2223
1
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2224
2225
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(198);
2226
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2227
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2228
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2229
2230
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(399);
2231
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2232
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2233
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2234
1
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2235
2236
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(599);
2237
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2238
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2239
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2240
1
  ASSERT_EQ(kBaseRate / 1.2,
2241
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2242
2243
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(2001);
2244
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2245
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
2246
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2247
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2248
2249
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(3001);
2250
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2251
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
2252
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2253
2254
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(390);
2255
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2256
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2257
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2258
1
  ASSERT_EQ(kBaseRate / 1.2,
2259
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2260
2261
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(100);
2262
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2263
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2264
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2265
2266
1
  vstorage->set_l0_delay_trigger_count(100);
2267
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2268
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2269
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2270
1
  ASSERT_EQ(kBaseRate / 1.2,
2271
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2272
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2273
2274
1
  vstorage->set_l0_delay_trigger_count(101);
2275
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2276
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2277
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2278
1
  ASSERT_EQ(kBaseRate / 1.2 / 1.2,
2279
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2280
2281
1
  vstorage->set_l0_delay_trigger_count(0);
2282
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2283
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2284
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2285
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2286
1
  ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2,
2287
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2288
2289
1
  vstorage->set_l0_delay_trigger_count(101);
2290
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2291
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2292
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2293
1
  ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2 / 1.2,
2294
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2295
2296
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(200);
2297
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2298
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2299
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2300
1
  ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2,
2301
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2302
2303
1
  vstorage->set_l0_delay_trigger_count(0);
2304
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(0);
2305
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2306
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2307
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2308
2309
1
  mutable_cf_options.disable_auto_compactions = true;
2310
1
  dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate);
2311
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2312
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2313
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2314
2315
1
  vstorage->set_l0_delay_trigger_count(50);
2316
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2317
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2318
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2319
1
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2320
2321
1
  vstorage->set_l0_delay_trigger_count(60);
2322
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2323
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2324
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2325
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2326
1
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2327
2328
1
  vstorage->set_l0_delay_trigger_count(70);
2329
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(500);
2330
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2331
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2332
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2333
1
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2334
2335
1
  mutable_cf_options.disable_auto_compactions = false;
2336
1
  vstorage->set_l0_delay_trigger_count(71);
2337
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(501);
2338
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2339
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2340
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2341
1
  ASSERT_EQ(kBaseRate / 1.2,
2342
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2343
1
}
2344
2345
1
TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
2346
1
  db_options_.base_background_compactions = 2;
2347
1
  db_options_.max_background_compactions = 6;
2348
1
  Open({"default"});
2349
1
  ColumnFamilyData* cfd =
2350
1
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2351
2352
1
  VersionStorageInfo* vstorage = cfd->current()->storage_info();
2353
2354
1
  MutableCFOptions mutable_cf_options(
2355
1
      Options(db_options_, column_family_options_),
2356
1
      ImmutableCFOptions(Options(db_options_, column_family_options_)));
2357
2358
  // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
2359
1
  mutable_cf_options.level0_file_num_compaction_trigger = 4;
2360
1
  mutable_cf_options.level0_slowdown_writes_trigger = 36;
2361
1
  mutable_cf_options.level0_stop_writes_trigger = 50;
2362
  // Speedup threshold = 200 / 4 = 50
2363
1
  mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2364
1
  mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2365
2366
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(40);
2367
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2368
1
  ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
2369
2370
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(50);
2371
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2372
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2373
2374
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2375
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2376
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2377
2378
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(45);
2379
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2380
1
  ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
2381
2382
1
  vstorage->set_l0_delay_trigger_count(7);
2383
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2384
1
  ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
2385
2386
1
  vstorage->set_l0_delay_trigger_count(9);
2387
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2388
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2389
2390
1
  vstorage->set_l0_delay_trigger_count(6);
2391
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2392
1
  ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
2393
2394
  // Speed up threshold = min(4 * 2, 4 + (12 - 4)/4) = 6
2395
1
  mutable_cf_options.level0_file_num_compaction_trigger = 4;
2396
1
  mutable_cf_options.level0_slowdown_writes_trigger = 16;
2397
1
  mutable_cf_options.level0_stop_writes_trigger = 30;
2398
2399
1
  vstorage->set_l0_delay_trigger_count(5);
2400
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2401
1
  ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
2402
2403
1
  vstorage->set_l0_delay_trigger_count(7);
2404
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2405
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2406
2407
1
  vstorage->set_l0_delay_trigger_count(3);
2408
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2409
1
  ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
2410
1
}
2411
2412
1
TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
2413
1
  const uint64_t kBaseRate = 810000u;
2414
1
  db_options_.delayed_write_rate = kBaseRate;
2415
1
  Open();
2416
1
  CreateColumnFamilies({"one"});
2417
1
  ColumnFamilyData* cfd =
2418
1
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2419
1
  VersionStorageInfo* vstorage = cfd->current()->storage_info();
2420
2421
1
  ColumnFamilyData* cfd1 =
2422
1
      static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
2423
1
  VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();
2424
2425
1
  MutableCFOptions mutable_cf_options(
2426
1
      Options(db_options_, column_family_options_),
2427
1
      ImmutableCFOptions(Options(db_options_, column_family_options_)));
2428
1
  mutable_cf_options.level0_slowdown_writes_trigger = 20;
2429
1
  mutable_cf_options.level0_stop_writes_trigger = 10000;
2430
1
  mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2431
1
  mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2432
2433
1
  MutableCFOptions mutable_cf_options1 = mutable_cf_options;
2434
1
  mutable_cf_options1.soft_pending_compaction_bytes_limit = 500;
2435
2436
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(50);
2437
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2438
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2439
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2440
2441
1
  vstorage1->TEST_set_estimated_compaction_needed_bytes(201);
2442
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2443
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2444
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2445
2446
1
  vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
2447
1
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
2448
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2449
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2450
1
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2451
2452
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(70);
2453
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2454
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2455
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2456
1
  ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2457
2458
1
  vstorage1->TEST_set_estimated_compaction_needed_bytes(800);
2459
1
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
2460
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2461
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2462
1
  ASSERT_EQ(kBaseRate / 1.2,
2463
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2464
2465
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2466
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2467
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2468
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2469
1
  ASSERT_EQ(kBaseRate / 1.2 / 1.2,
2470
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2471
2472
1
  vstorage1->TEST_set_estimated_compaction_needed_bytes(700);
2473
1
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
2474
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2475
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2476
1
  ASSERT_EQ(kBaseRate / 1.2,
2477
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2478
2479
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(500);
2480
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2481
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2482
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2483
1
  ASSERT_EQ(kBaseRate / 1.2 / 1.2,
2484
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2485
2486
1
  vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
2487
1
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
2488
1
  ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
2489
1
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2490
1
  ASSERT_EQ(kBaseRate / 1.2,
2491
1
            dbfull()->TEST_write_controler().delayed_write_rate());
2492
1
}
2493
2494
1
TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
2495
1
  db_options_.base_background_compactions = 2;
2496
1
  db_options_.max_background_compactions = 6;
2497
1
  column_family_options_.soft_pending_compaction_bytes_limit = 200;
2498
1
  column_family_options_.hard_pending_compaction_bytes_limit = 2000;
2499
1
  Open();
2500
1
  CreateColumnFamilies({"one"});
2501
1
  ColumnFamilyData* cfd =
2502
1
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2503
1
  VersionStorageInfo* vstorage = cfd->current()->storage_info();
2504
2505
1
  ColumnFamilyData* cfd1 =
2506
1
      static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
2507
1
  VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();
2508
2509
1
  MutableCFOptions mutable_cf_options(
2510
1
      Options(db_options_, column_family_options_),
2511
1
      ImmutableCFOptions(Options(db_options_, column_family_options_)));
2512
  // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
2513
1
  mutable_cf_options.level0_file_num_compaction_trigger = 4;
2514
1
  mutable_cf_options.level0_slowdown_writes_trigger = 36;
2515
1
  mutable_cf_options.level0_stop_writes_trigger = 30;
2516
  // Speedup threshold = 200 / 4 = 50
2517
1
  mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2518
1
  mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2519
2520
1
  MutableCFOptions mutable_cf_options1 = mutable_cf_options;
2521
1
  mutable_cf_options1.level0_slowdown_writes_trigger = 16;
2522
2523
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(40);
2524
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2525
1
  ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
2526
2527
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(60);
2528
1
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
2529
1
  ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
2530
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2531
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2532
2533
1
  vstorage1->TEST_set_estimated_compaction_needed_bytes(30);
2534
1
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
2535
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2536
2537
1
  vstorage1->TEST_set_estimated_compaction_needed_bytes(70);
2538
1
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
2539
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2540
2541
1
  vstorage->TEST_set_estimated_compaction_needed_bytes(20);
2542
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2543
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2544
2545
1
  vstorage1->TEST_set_estimated_compaction_needed_bytes(3);
2546
1
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
2547
1
  ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
2548
2549
1
  vstorage->set_l0_delay_trigger_count(9);
2550
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2551
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2552
2553
1
  vstorage1->set_l0_delay_trigger_count(2);
2554
1
  cfd1->RecalculateWriteStallConditions(mutable_cf_options);
2555
1
  ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
2556
2557
1
  vstorage->set_l0_delay_trigger_count(0);
2558
1
  cfd->RecalculateWriteStallConditions(mutable_cf_options);
2559
1
  ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
2560
1
}
2561
2562
1
TEST_F(ColumnFamilyTest, LogSyncConflictFlush) {
2563
1
  Open();
2564
1
  CreateColumnFamiliesAndReopen({"one", "two"});
2565
2566
1
  ASSERT_OK(Put(0, "", ""));
2567
1
  ASSERT_OK(Put(1, "foo", "bar"));
2568
2569
1
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
2570
1
      {{"DBImpl::SyncWAL:BeforeMarkLogsSynced:1",
2571
1
        "ColumnFamilyTest::LogSyncConflictFlush:1"},
2572
1
       {"ColumnFamilyTest::LogSyncConflictFlush:2",
2573
1
        "DBImpl::SyncWAL:BeforeMarkLogsSynced:2"}});
2574
2575
1
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2576
2577
1
  std::thread thread([&] { ASSERT_OK(db_->SyncWAL()); });
2578
2579
1
  TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:1");
2580
1
  ASSERT_OK(Flush(1));
2581
1
  ASSERT_OK(Put(1, "foo", "bar"));
2582
1
  ASSERT_OK(Flush(1));
2583
2584
1
  TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:2");
2585
2586
1
  thread.join();
2587
2588
1
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2589
1
  Close();
2590
1
}
2591
2592
1
TEST_F(ColumnFamilyTest, GetColumnFamiliesOptions) {
2593
1
  column_family_options_.arena_block_size = 4096;
2594
2595
  // Source options
2596
1
  ColumnFamilyOptions src_d = column_family_options_;
2597
1
  ColumnFamilyOptions src_1;
2598
1
  ColumnFamilyOptions src_2;
2599
1
  ColumnFamilyOptions src_3;
2600
1
  src_1.arena_block_size = 2 * column_family_options_.arena_block_size;
2601
1
  src_2.arena_block_size = 3 * column_family_options_.arena_block_size;
2602
1
  src_3.arena_block_size = 4 * column_family_options_.arena_block_size;
2603
2604
1
  using OptionsRef  = std::reference_wrapper<ColumnFamilyOptions>;
2605
1
  using Descriptors = std::map<std::string, OptionsRef>;
2606
2607
9
  auto compare_with = [this](Descriptors src) {
2608
9
    std::vector<std::string> cf_names;
2609
9
    std::vector<ColumnFamilyOptions> cf_options;
2610
9
    db_->GetColumnFamiliesOptions(&cf_names, &cf_options);
2611
2612
    // Comapre sizes
2613
9
    ASSERT_EQ(cf_names.size(), cf_options.size());
2614
9
    ASSERT_EQ(cf_names.size(), src.size());
2615
2616
    // Keep sorted order for the case descriptors are stored in an unpredictable way
2617
9
    Descriptors dst;
2618
30
    for (size_t i = 0; i < cf_names.size(); ++i) {
2619
21
      dst.insert(std::make_pair(cf_names[i], std::ref(cf_options[i])));
2620
21
    }
2621
2622
    // Compare options
2623
9
    for (auto it1 = src.begin(), it2 = dst.begin();
2624
30
         it1 != src.end() && it2 != dst.end(); ++it1, ++it2) {
2625
21
      ASSERT_EQ(it1->first, it2->first);
2626
21
      ASSERT_OK(RocksDBOptionsParser::VerifyCFOptions(it1->second, it2->second));
2627
21
    }
2628
9
  };
2629
2630
1
  Open();
2631
1
  compare_with({{"default", std::ref(src_d)}});
2632
2633
1
  CreateColumnFamilies({"1", "2"}, {src_1, src_2});
2634
1
  compare_with({{"default", std::ref(src_d)}, {"1", std::ref(src_1)}, {"2", std::ref(src_2)}});
2635
2636
1
  DropColumnFamilies({1});
2637
1
  compare_with({{"default", std::ref(src_d)}, {"2", std::ref(src_2)}});
2638
2639
1
  CreateColumnFamilies({"3"}, {src_3});
2640
1
  compare_with({{"default", std::ref(src_d)}, {"3", std::ref(src_3)}, {"2", std::ref(src_2)}});
2641
1
  Close();
2642
2643
1
  src_3.arena_block_size += 1024;
2644
1
  Open({"3", "2", "default"}, {src_3, src_2, src_d});
2645
1
  compare_with({{"default", std::ref(src_d)}, {"2", std::ref(src_2)}, {"3", std::ref(src_3)}});
2646
2647
1
  DropColumnFamilies({0});
2648
1
  compare_with({{"default", std::ref(src_d)}, {"2", std::ref(src_2)}});
2649
1
  Close();
2650
2651
1
  db_options_.create_missing_column_families = true;
2652
1
  src_3.arena_block_size -= 1024;
2653
1
  Open({"1", "default", "2", "3"}, {src_1, src_d, src_2, src_3});
2654
1
  compare_with({{"default", std::ref(src_d)}, {"2", std::ref(src_2)},
2655
1
                {"3", std::ref(src_3)}, {"1", std::ref(src_1)}});
2656
2657
1
  DropColumnFamilies({0, 2});
2658
1
  compare_with({{"3", std::ref(src_3)}, {"default", std::ref(src_d)}});
2659
2660
1
  DropColumnFamilies({3});
2661
1
  compare_with({{"default", std::ref(src_d)}});
2662
1
  Close();
2663
1
}
2664
2665
}  // namespace rocksdb
2666
2667
13.2k
int main(int argc, char** argv) {
2668
13.2k
  ::testing::InitGoogleTest(&argc, argv);
2669
13.2k
  return RUN_ALL_TESTS();
2670
13.2k
}