YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_compaction_filter_test.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "yb/rocksdb/db/db_test_util.h"
#include "yb/rocksdb/port/stack_trace.h"

#include "yb/util/stopwatch.h"
#include "yb/util/tsan_util.h"

namespace rocksdb {

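// Global invocation counter shared by the filters below; tests reset it before
// compacting and then assert on how many keys the filter saw.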
static int cfilter_count = 0;

// The replacement value that ChangeFilter installs for every kv
// during the compaction process.
static std::string NEW_VALUE = "NewValue";

class DBTestCompactionFilter : public DBTestBase {
 public:
  DBTestCompactionFilter() : DBTestBase("/db_compaction_filter_test") {}
};

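// Counts every kv it sees and keeps all of them.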
class KeepFilter : public CompactionFilter {
 public:
  FilterDecision Filter(int level, const Slice& key, const Slice& value,
                        std::string* new_value, bool* value_changed)
      override {
    cfilter_count++;
    return FilterDecision::kKeep;
  }

  const char* Name() const override { return "KeepFilter"; }
};

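// Counts every kv it sees and discards all of them.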
class DeleteFilter : public CompactionFilter {
 public:
  FilterDecision Filter(int level, const Slice& key, const Slice& value,
                        std::string* new_value, bool* value_changed) override {
    cfilter_count++;
    return FilterDecision::kDiscard;
  }

  const char* Name() const override { return "DeleteFilter"; }
};

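// Discards keys that parse to an integer in (5, 105] and, because
// IgnoreSnapshots() returns true, is applied even to records that are still
// visible to a live snapshot.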
class DeleteISFilter : public CompactionFilter {
 public:
  FilterDecision Filter(int level, const Slice& key, const Slice& value,
                        std::string* new_value,
                        bool* value_changed) override {
    cfilter_count++;
    int i = std::stoi(key.ToString());
    if (i > 5 && i <= 105) {
      return FilterDecision::kDiscard;
    }
    return FilterDecision::kKeep;
  }

  bool IgnoreSnapshots() const override { return true; }

  const char* Name() const override { return "DeleteISFilter"; }
};

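// Advances the test environment's time offset on every kv, simulating a slow
// filter, and discards everything it sees.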
class DelayFilter : public CompactionFilter {
 public:
  explicit DelayFilter(DBTestBase* d) : db_test(d) {}
  FilterDecision Filter(int level, const Slice& key, const Slice& value,
                        std::string* new_value,
                        bool* value_changed) override {
    db_test->env_->addon_time_.fetch_add(1000);
    return FilterDecision::kDiscard;
  }

  const char* Name() const override { return "DelayFilter"; }

 private:
  DBTestBase* db_test;
};

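// Discards a kv only when its value matches *filtered_value_; everything else
// is kept.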
class ConditionalFilter : public CompactionFilter {
 public:
  explicit ConditionalFilter(const std::string* filtered_value)
      : filtered_value_(filtered_value) {}
  FilterDecision Filter(int level, const Slice& key, const Slice& value,
                        std::string* new_value, bool* value_changed) override {
    return value.ToBuffer() == *filtered_value_ ? FilterDecision::kDiscard : FilterDecision::kKeep;
  }

  const char* Name() const override { return "ConditionalFilter"; }

 private:
  const std::string* filtered_value_;
};

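// Keeps every kv but rewrites its value to NEW_VALUE.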
class ChangeFilter : public CompactionFilter {
 public:
  ChangeFilter() {}

  FilterDecision Filter(int level, const Slice& key, const Slice& value,
                        std::string* new_value, bool* value_changed) override {
    assert(new_value != nullptr);
    *new_value = NEW_VALUE;
    *value_changed = true;
    return FilterDecision::kKeep;
  }

  const char* Name() const override { return "ChangeFilter"; }
};

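// Produces KeepFilter instances and can optionally verify the
// CompactionFilter::Context it is handed: whether the compaction is full
// and/or manual, and which column family it runs against.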
class KeepFilterFactory : public CompactionFilterFactory {
 public:
  explicit KeepFilterFactory(bool check_context = false,
                             bool check_context_cf_id = false)
      : check_context_(check_context),
        check_context_cf_id_(check_context_cf_id),
        compaction_filter_created_(false) {}

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    if (check_context_) {
      EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
      EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
    }
    if (check_context_cf_id_) {
      EXPECT_EQ(expect_cf_id_.load(), context.column_family_id);
    }
    compaction_filter_created_ = true;
    return std::unique_ptr<CompactionFilter>(new KeepFilter());
  }

  bool compaction_filter_created() const { return compaction_filter_created_; }

  const char* Name() const override { return "KeepFilterFactory"; }
  bool check_context_;
  bool check_context_cf_id_;
  std::atomic_bool expect_full_compaction_;
  std::atomic_bool expect_manual_compaction_;
  std::atomic<uint32_t> expect_cf_id_;
  bool compaction_filter_created_;
};

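// Hands out a DeleteFilter for manual compactions only; automatic compactions
// get no filter (nullptr).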
class DeleteFilterFactory : public CompactionFilterFactory {
 public:
  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    if (context.is_manual_compaction) {
      return std::unique_ptr<CompactionFilter>(new DeleteFilter());
    } else {
      return std::unique_ptr<CompactionFilter>(nullptr);
    }
  }

  const char* Name() const override { return "DeleteFilterFactory"; }
};

// Delete filter factory which ignores snapshots.
class DeleteISFilterFactory : public CompactionFilterFactory {
 public:
  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    if (context.is_manual_compaction) {
      return std::unique_ptr<CompactionFilter>(new DeleteISFilter());
    } else {
      return std::unique_ptr<CompactionFilter>(nullptr);
    }
  }

  const char* Name() const override { return "DeleteISFilterFactory"; }
};

class DelayFilterFactory : public CompactionFilterFactory {
 public:
  explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
  }

  const char* Name() const override { return "DelayFilterFactory"; }

 private:
  DBTestBase* db_test;
};

class ConditionalFilterFactory : public CompactionFilterFactory {
 public:
  explicit ConditionalFilterFactory(const Slice& filtered_value)
      : filtered_value_(filtered_value.ToString()) {}

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    return std::unique_ptr<CompactionFilter>(
        new ConditionalFilter(&filtered_value_));
  }

  const char* Name() const override {
    return "ConditionalFilterFactory";
  }

 private:
  std::string filtered_value_;
};

class ChangeFilterFactory : public CompactionFilterFactory {
 public:
  ChangeFilterFactory() {}

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    return std::unique_ptr<CompactionFilter>(new ChangeFilter());
  }

  const char* Name() const override { return "ChangeFilterFactory"; }
};

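// The tests below all follow the same wiring pattern: install one of the
// factories above via Options before (re)opening the DB, write and flush some
// keys, force a compaction, and assert on cfilter_count and on what survives.
// A minimal sketch of that pattern (illustrative only, not a test):
//
//   Options options = CurrentOptions();
//   options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
//   DestroyAndReopen(options);
//   ASSERT_OK(Put("key", "val"));
//   ASSERT_OK(Flush());
//   cfilter_count = 0;
//   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
//   // DeleteFilter ran once per kv and discarded each one.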
#ifndef ROCKSDB_LITE
TEST_F(DBTestCompactionFilter, CompactionFilter) {
  Options options = CurrentOptions();
  options.max_open_files = -1;
  options.num_levels = 3;
  options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
  options = CurrentOptions(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write 100K keys, these are written to a few files in L0.
  const std::string value(10, 'x');
  for (int i = 0; i < 100000; i++) {
    char key[100];
    snprintf(key, sizeof(key), "B%010d", i);
    ASSERT_OK(Put(1, key, value));
  }
  ASSERT_OK(Flush(1));

  // Push all files to the highest level L2. Verify that
  // the compaction in each level invokes the filter for
  // all the keys in that level.
  cfilter_count = 0;
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
  ASSERT_EQ(cfilter_count, 100000);
  cfilter_count = 0;
  ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]));
  ASSERT_EQ(cfilter_count, 100000);

  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
  ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
  cfilter_count = 0;

  // All the files are in the lowest level.
  // Verify that all but the 100001st record
  // have sequence number zero. The 100001st record
  // is at the tip of this snapshot and cannot
  // be zeroed out.
  int count = 0;
  int total = 0;
  Arena arena;
  {
    ScopedArenaIterator iter(
        dbfull()->NewInternalIterator(&arena, handles_[1]));
    iter->SeekToFirst();
    ASSERT_OK(iter->status());
    while (iter->Valid()) {
      ParsedInternalKey ikey(Slice(), 0, kTypeValue);
      ikey.sequence = -1;
      ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
      total++;
      if (ikey.sequence != 0) {
        count++;
      }
      iter->Next();
    }
  }
  ASSERT_EQ(total, 100000);
  ASSERT_EQ(count, 1);

  // Overwrite all the 100K keys once again.
  for (int i = 0; i < 100000; i++) {
    char key[100];
    snprintf(key, sizeof(key), "B%010d", i);
    ASSERT_OK(Put(1, key, value));
  }
  ASSERT_OK(Flush(1));

  // Push all files to the highest level L2. This
  // means that all keys should pass at least once
  // through the compaction filter.
  cfilter_count = 0;
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
  ASSERT_EQ(cfilter_count, 100000);
  cfilter_count = 0;
  ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]));
  ASSERT_EQ(cfilter_count, 100000);
  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
  ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);

  // Create a new database with the compaction
  // filter in such a way that it deletes all keys.
  options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
  options.create_if_missing = true;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write all the keys once again.
  for (int i = 0; i < 100000; i++) {
    char key[100];
    snprintf(key, sizeof(key), "B%010d", i);
    ASSERT_OK(Put(1, key, value));
  }
  ASSERT_OK(Flush(1));
  ASSERT_NE(NumTableFilesAtLevel(0, 1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0);

  // Push all files to the highest level L2. This
  // triggers the compaction filter to delete all keys;
  // verify that at the end of the compaction process,
  // nothing is left.
  cfilter_count = 0;
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
  ASSERT_EQ(cfilter_count, 100000);
  cfilter_count = 0;
  ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]));
  ASSERT_EQ(cfilter_count, 0);
  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);

  {
    // Scan the entire database to ensure that nothing is left
    std::unique_ptr<Iterator> iter(
        db_->NewIterator(ReadOptions(), handles_[1]));
    iter->SeekToFirst();
    count = 0;
    while (iter->Valid()) {
      count++;
      iter->Next();
    }
    ASSERT_EQ(count, 0);
  }

  // The sequence number of the remaining record
  // is not zeroed out even though it is at the
  // level Lmax because this record is at the tip
  count = 0;
  {
    ScopedArenaIterator iter(
        dbfull()->NewInternalIterator(&arena, handles_[1]));
    iter->SeekToFirst();
    ASSERT_OK(iter->status());
    while (iter->Valid()) {
      ParsedInternalKey ikey(Slice(), 0, kTypeValue);
      ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
      ASSERT_NE(ikey.sequence, (unsigned)0);
      count++;
      iter->Next();
    }
    ASSERT_EQ(count, 0);
  }
}

// Tests the edge case where compaction does not produce any output -- all
// entries are deleted. The compaction should create a bunch of 'DeleteFile'
// entries in VersionEdit, but none of the 'AddFile's.
TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) {
  Options options;
  options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
  options.disable_auto_compactions = true;
  options.create_if_missing = true;
  options = CurrentOptions(options);
  DestroyAndReopen(options);

  // Put some data.
  for (int table = 0; table < 4; ++table) {
    for (int i = 0; i < 10 + table; ++i) {
      ASSERT_OK(Put(ToString(table * 100 + i), "val"));
    }
    ASSERT_OK(Flush());
  }

  // This will produce an empty file (delete compaction filter).
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(0U, CountLiveFiles());

  Reopen(options);

  Iterator* itr = db_->NewIterator(ReadOptions());
  itr->SeekToFirst();
  // empty db
  ASSERT_TRUE(!itr->Valid());

  delete itr;
}
#endif  // ROCKSDB_LITE

TEST_F(DBTestCompactionFilter, CompactionFilterWithValueChange) {
  do {
    Options options;
    options.num_levels = 3;
    options.compaction_filter_factory =
      std::make_shared<ChangeFilterFactory>();
    options = CurrentOptions(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // Lower number of runs for tsan due to low perf.
    constexpr int kNumKeys = yb::NonTsanVsTsan(100001, 10001);

    // Write 'kNumKeys' keys, these are written to a few files
    // in L0. We do this so that the current snapshot points
    // to the last key that we write. The compaction filter is not invoked
    // on keys that are visible via a snapshot because we
    // cannot delete them anyway.
    const std::string value(10, 'x');
    LOG_TIMING(INFO, "Writing Keys") {
      for (int i = 0; i < kNumKeys; i++) {
        char key[100];
        snprintf(key, sizeof(key), "B%010d", i);
        ASSERT_OK(Put(1, key, value));
      }
    }

    LOG_TIMING(INFO, "Pushing files to lower levels") {
      // Push all files to lower levels.
      ASSERT_OK(Flush(1));
      if (option_config_ != kUniversalCompactionMultiLevel &&
          option_config_ != kUniversalSubcompactions) {
        ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
        ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]));
      } else {
        ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));
      }
    }

    // Re-write all the data again.
    LOG_TIMING(INFO, "Rewriting data") {
      for (int i = 0; i < kNumKeys; i++) {
        char key[100];
        snprintf(key, sizeof(key), "B%010d", i);
        ASSERT_OK(Put(1, key, value));
      }
    }

    // Push all files to lower levels. This should
    // invoke the compaction filter for all kNumKeys - 1 keys.
    LOG_TIMING(INFO, "Pushing files to lower levels after rewrite") {
      ASSERT_OK(Flush(1));
      if (option_config_ != kUniversalCompactionMultiLevel &&
          option_config_ != kUniversalSubcompactions) {
        ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
        ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]));
      } else {
        ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));
      }
    }

    // Verify that all keys now have the new value that
    // was set by the compaction process.
    LOG_TIMING(INFO, "Verify keys") {
      for (int i = 0; i < kNumKeys; i++) {
        char key[100];
        snprintf(key, sizeof(key), "B%010d", i);
        std::string newvalue = Get(1, key);
        ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
      }
    }
  } while (ChangeCompactOptions());
}

TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) {
  std::string one, two, three, four;
  PutFixed64(&one, 1);
  PutFixed64(&two, 2);
  PutFixed64(&three, 3);
  PutFixed64(&four, 4);
495
1
  Options options;
496
1
  options = CurrentOptions(options);
497
1
  options.create_if_missing = true;
498
1
  options.merge_operator = MergeOperators::CreateUInt64AddOperator();
499
1
  options.num_levels = 3;
500
  // Filter out keys with value is 2.
501
1
  options.compaction_filter_factory =
502
1
      std::make_shared<ConditionalFilterFactory>(two);
503
1
  DestroyAndReopen(options);
504
505
  // In the same compaction, a value type needs to be deleted based on the
  // compaction filter, and there is a merge type for the key. The compaction
  // filter result is ignored.
  ASSERT_OK(db_->Put(WriteOptions(), "foo", two));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->Merge(WriteOptions(), "foo", one));
  ASSERT_OK(Flush());
  std::string newvalue = Get("foo");
  ASSERT_EQ(newvalue, three);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  newvalue = Get("foo");
  ASSERT_EQ(newvalue, three);

  // The value key can be deleted based on the compaction filter, leaving only
  // merge keys.
  ASSERT_OK(db_->Put(WriteOptions(), "bar", two));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  newvalue = Get("bar");
  ASSERT_EQ("NOT_FOUND", newvalue);
  ASSERT_OK(db_->Merge(WriteOptions(), "bar", two));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  newvalue = Get("bar");
  ASSERT_EQ(two, newvalue);

  // Compaction filter never applies to merge keys.
  ASSERT_OK(db_->Put(WriteOptions(), "foobar", one));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->Merge(WriteOptions(), "foobar", two));
  ASSERT_OK(Flush());
  newvalue = Get("foobar");
  ASSERT_EQ(newvalue, three);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  newvalue = Get("foobar");
  ASSERT_EQ(newvalue, three);

  // In the same compaction, both value type and merge type keys need to be
  // deleted based on the compaction filter, and there is a merge type for the
  // key. For both keys, compaction filter results are ignored.
  ASSERT_OK(db_->Put(WriteOptions(), "barfoo", two));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->Merge(WriteOptions(), "barfoo", two));
  ASSERT_OK(Flush());
  newvalue = Get("barfoo");
  ASSERT_EQ(newvalue, four);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  newvalue = Get("barfoo");
  ASSERT_EQ(newvalue, four);
}

#ifndef ROCKSDB_LITE
TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
  KeepFilterFactory* filter = new KeepFilterFactory(true, true);

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_filter_factory.reset(filter);
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 8;
  Reopen(options);
  int num_keys_per_file = 400;
  for (int j = 0; j < 3; j++) {
    // Write several keys.
    const std::string value(10, 'x');
    for (int i = 0; i < num_keys_per_file; i++) {
      char key[100];
      snprintf(key, sizeof(key), "B%08d%02d", i, j);
      ASSERT_OK(Put(key, value));
    }
    ASSERT_OK(dbfull()->TEST_FlushMemTable());
    // Make sure next file is much smaller so automatic compaction will not
    // be triggered.
    num_keys_per_file /= 2;
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Force a manual compaction
  cfilter_count = 0;
  filter->expect_manual_compaction_.store(true);
  filter->expect_full_compaction_.store(true);
  filter->expect_cf_id_.store(0);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(cfilter_count, 700);
  ASSERT_EQ(NumSortedRuns(0), 1);
  ASSERT_TRUE(filter->compaction_filter_created());

  // Verify total number of keys is correct after manual compaction.
  {
    int count = 0;
    int total = 0;
    Arena arena;
    ScopedArenaIterator iter(dbfull()->NewInternalIterator(&arena));
    iter->SeekToFirst();
    ASSERT_OK(iter->status());
    while (iter->Valid()) {
      ParsedInternalKey ikey(Slice(), 0, kTypeValue);
      ikey.sequence = -1;
      ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
      total++;
      if (ikey.sequence != 0) {
        count++;
      }
      iter->Next();
    }
    ASSERT_EQ(total, 700);
    ASSERT_EQ(count, 2);
  }
}
#endif  // ROCKSDB_LITE

TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) {
  KeepFilterFactory* filter = new KeepFilterFactory(false, true);
  filter->expect_cf_id_.store(1);

  Options options = CurrentOptions();
  options.compaction_filter_factory.reset(filter);
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 2;
  CreateAndReopenWithCF({"pikachu"}, options);

  int num_keys_per_file = 400;
  for (int j = 0; j < 3; j++) {
    // Write several keys.
    const std::string value(10, 'x');
    for (int i = 0; i < num_keys_per_file; i++) {
      char key[100];
      snprintf(key, sizeof(key), "B%08d%02d", i, j);
      ASSERT_OK(Put(1, key, value));
    }
    ASSERT_OK(Flush(1));
    // Make sure next file is much smaller so automatic compaction will not
    // be triggered.
    num_keys_per_file /= 2;
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_TRUE(filter->compaction_filter_created());
}

#ifndef ROCKSDB_LITE
// Compaction filters should only be applied to records that are newer than the
// latest snapshot. This test inserts records and applies a delete filter.
TEST_F(DBTestCompactionFilter, CompactionFilterSnapshot) {
  Options options;
  options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
  options.disable_auto_compactions = true;
  options.create_if_missing = true;
  options = CurrentOptions(options);
  DestroyAndReopen(options);

  // Put some data.
  const Snapshot* snapshot = nullptr;
  for (int table = 0; table < 4; ++table) {
    for (int i = 0; i < 10; ++i) {
      ASSERT_OK(Put(ToString(table * 100 + i), "val"));
    }
    ASSERT_OK(Flush());

    if (table == 0) {
      snapshot = db_->GetSnapshot();
    }
  }
  assert(snapshot != nullptr);

  cfilter_count = 0;
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // The filter should delete the 30 records not protected by the snapshot.
  ASSERT_EQ(30U, cfilter_count);

  // Release the snapshot and compact again -> now all records should be
  // removed.
  db_->ReleaseSnapshot(snapshot);
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(0U, CountLiveFiles());
}

// Compaction filters should only be applied to records that are newer than the
// latest snapshot. However, if the compaction filter asks to ignore snapshots,
// records visible to the snapshot will also be processed.
TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
  std::string five = ToString(5);
  Options options;
  options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>();
  options.disable_auto_compactions = true;
  options.create_if_missing = true;
  options = CurrentOptions(options);
  DestroyAndReopen(options);

  // Put some data.
  const Snapshot* snapshot = nullptr;
  for (int table = 0; table < 4; ++table) {
    for (int i = 0; i < 10; ++i) {
      ASSERT_OK(Put(ToString(table * 100 + i), "val"));
    }
    ASSERT_OK(Flush());

    if (table == 0) {
      snapshot = db_->GetSnapshot();
    }
  }
  assert(snapshot != nullptr);

  cfilter_count = 0;
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // The filter is invoked on all 40 records; it discards 10 of them.
  ASSERT_EQ(40U, cfilter_count);

  {
    // Scan the entire database as of the snapshot to verify that the keys
    // deleted despite it are gone.
    ReadOptions read_options;
    read_options.snapshot = snapshot;
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
    iter->SeekToFirst();
    int count = 0;
    while (iter->Valid()) {
      count++;
      iter->Next();
    }
    ASSERT_EQ(count, 6);
    read_options.snapshot = 0;
    std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options));
    iter1->SeekToFirst();
    count = 0;
    while (iter1->Valid()) {
      count++;
      iter1->Next();
    }
    // We have deleted 10 keys from 40 using the compaction filter:
    // keys 6-9 before the snapshot and 100-105 after the snapshot.
    ASSERT_EQ(count, 30);
  }

  // Release the snapshot and compact again -> now all records should be
  // removed.
  db_->ReleaseSnapshot(snapshot);
}
#endif  // ROCKSDB_LITE

}  // namespace rocksdb

int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}