/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_compaction_filter_test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/db/db_test_util.h" |
25 | | #include "yb/rocksdb/port/stack_trace.h" |
26 | | |
27 | | #include "yb/util/stopwatch.h" |
28 | | #include "yb/util/tsan_util.h" |
29 | | |
30 | | namespace rocksdb { |
31 | | |
32 | | static int cfilter_count = 0; |
33 | | |
34 | | // This is a static filter used for filtering |
35 | | // kvs during the compaction process. |
36 | | static std::string NEW_VALUE = "NewValue"; |
37 | | |
38 | | class DBTestCompactionFilter : public DBTestBase { |
39 | | public: |
40 | 8 | DBTestCompactionFilter() : DBTestBase("/db_compaction_filter_test") {} |
41 | | }; |
42 | | |
43 | | class KeepFilter : public CompactionFilter { |
44 | | public: |
45 | | FilterDecision Filter(int level, const Slice& key, const Slice& value, |
46 | | std::string* new_value, bool* value_changed) |
47 | 401k | override { |
48 | 401k | cfilter_count++; |
49 | 401k | return FilterDecision::kKeep; |
50 | 401k | } |
51 | | |
52 | 0 | const char* Name() const override { return "KeepFilter"; } |
53 | | }; |
54 | | |
55 | | class DeleteFilter : public CompactionFilter { |
56 | | public: |
57 | | FilterDecision Filter(int level, const Slice& key, const Slice& value, |
58 | 100k | std::string* new_value, bool* value_changed) override { |
59 | 100k | cfilter_count++; |
60 | 100k | return FilterDecision::kDiscard; |
61 | 100k | } |
62 | | |
63 | 0 | const char* Name() const override { return "DeleteFilter"; } |
64 | | }; |
65 | | |
66 | | class DeleteISFilter : public CompactionFilter { |
67 | | public: |
68 | | FilterDecision Filter(int level, const Slice& key, const Slice& value, |
69 | | std::string* new_value, |
70 | 40 | bool* value_changed) override { |
71 | 40 | cfilter_count++; |
72 | 40 | int i = std::stoi(key.ToString()); |
73 | 40 | if (i > 5 && i <= 105) { |
74 | 10 | return FilterDecision::kDiscard; |
75 | 10 | } |
76 | 30 | return FilterDecision::kKeep; |
77 | 30 | } |
78 | | |
79 | 1 | bool IgnoreSnapshots() const override { return true; } |
80 | | |
81 | 0 | const char* Name() const override { return "DeleteFilter"; } |
82 | | }; |
83 | | |
84 | | class DelayFilter : public CompactionFilter { |
85 | | public: |
86 | 0 | explicit DelayFilter(DBTestBase* d) : db_test(d) {} |
87 | | FilterDecision Filter(int level, const Slice& key, const Slice& value, |
88 | | std::string* new_value, |
89 | 0 | bool* value_changed) override { |
90 | 0 | db_test->env_->addon_time_.fetch_add(1000); |
91 | 0 | return FilterDecision::kDiscard; |
92 | 0 | } |
93 | | |
94 | 0 | const char* Name() const override { return "DelayFilter"; } |
95 | | |
96 | | private: |
97 | | DBTestBase* db_test; |
98 | | }; |
99 | | |
100 | | class ConditionalFilter : public CompactionFilter { |
101 | | public: |
102 | | explicit ConditionalFilter(const std::string* filtered_value) |
103 | 9 | : filtered_value_(filtered_value) {} |
104 | | FilterDecision Filter(int level, const Slice& key, const Slice& value, |
105 | 9 | std::string* new_value, bool* value_changed) override { |
106 | 7 | return value.ToBuffer() == *filtered_value_ ? FilterDecision::kDiscard : FilterDecision::kKeep; |
107 | 9 | } |
108 | | |
109 | 0 | const char* Name() const override { return "ConditionalFilter"; } |
110 | | |
111 | | private: |
112 | | const std::string* filtered_value_; |
113 | | }; |
114 | | |
115 | | class ChangeFilter : public CompactionFilter { |
116 | | public: |
117 | 14 | ChangeFilter() {} |
118 | | |
119 | | FilterDecision Filter(int level, const Slice& key, const Slice& value, |
120 | 1.40M | std::string* new_value, bool* value_changed) override { |
121 | 1.40M | assert(new_value != nullptr); |
122 | 1.40M | *new_value = NEW_VALUE; |
123 | 1.40M | *value_changed = true; |
124 | 1.40M | return FilterDecision::kKeep; |
125 | 1.40M | } |
126 | | |
127 | 0 | const char* Name() const override { return "ChangeFilter"; } |
128 | | }; |
129 | | |
130 | | class KeepFilterFactory : public CompactionFilterFactory { |
131 | | public: |
132 | | explicit KeepFilterFactory(bool check_context = false, |
133 | | bool check_context_cf_id = false) |
134 | | : check_context_(check_context), |
135 | | check_context_cf_id_(check_context_cf_id), |
136 | 3 | compaction_filter_created_(false) {} |
137 | | |
138 | | virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
139 | 6 | const CompactionFilter::Context& context) override { |
140 | 6 | if (check_context_) { |
141 | 1 | EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction); |
142 | 1 | EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction); |
143 | 1 | } |
144 | 6 | if (check_context_cf_id_) { |
145 | 2 | EXPECT_EQ(expect_cf_id_.load(), context.column_family_id); |
146 | 2 | } |
147 | 6 | compaction_filter_created_ = true; |
148 | 6 | return std::unique_ptr<CompactionFilter>(new KeepFilter()); |
149 | 6 | } |
150 | | |
151 | 2 | bool compaction_filter_created() const { return compaction_filter_created_; } |
152 | | |
153 | 21 | const char* Name() const override { return "KeepFilterFactory"; } |
154 | | bool check_context_; |
155 | | bool check_context_cf_id_; |
156 | | std::atomic_bool expect_full_compaction_; |
157 | | std::atomic_bool expect_manual_compaction_; |
158 | | std::atomic<uint32_t> expect_cf_id_; |
159 | | bool compaction_filter_created_; |
160 | | }; |
161 | | |
162 | | class DeleteFilterFactory : public CompactionFilterFactory { |
163 | | public: |
164 | | virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
165 | 4 | const CompactionFilter::Context& context) override { |
166 | 4 | if (context.is_manual_compaction) { |
167 | 4 | return std::unique_ptr<CompactionFilter>(new DeleteFilter()); |
168 | 0 | } else { |
169 | 0 | return std::unique_ptr<CompactionFilter>(nullptr); |
170 | 0 | } |
171 | 4 | } |
172 | | |
173 | 23 | const char* Name() const override { return "DeleteFilterFactory"; } |
174 | | }; |
175 | | |
176 | | // Delete Filter Factory which ignores snapshots |
177 | | class DeleteISFilterFactory : public CompactionFilterFactory { |
178 | | public: |
179 | | virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
180 | 1 | const CompactionFilter::Context& context) override { |
181 | 1 | if (context.is_manual_compaction) { |
182 | 1 | return std::unique_ptr<CompactionFilter>(new DeleteISFilter()); |
183 | 0 | } else { |
184 | 0 | return std::unique_ptr<CompactionFilter>(nullptr); |
185 | 0 | } |
186 | 1 | } |
187 | | |
188 | 3 | const char* Name() const override { return "DeleteFilterFactory"; } |
189 | | }; |
190 | | |
191 | | class DelayFilterFactory : public CompactionFilterFactory { |
192 | | public: |
193 | 0 | explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {} |
194 | | virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
195 | 0 | const CompactionFilter::Context& context) override { |
196 | 0 | return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test)); |
197 | 0 | } |
198 | | |
199 | 0 | const char* Name() const override { return "DelayFilterFactory"; } |
200 | | |
201 | | private: |
202 | | DBTestBase* db_test; |
203 | | }; |
204 | | |
205 | | class ConditionalFilterFactory : public CompactionFilterFactory { |
206 | | public: |
207 | | explicit ConditionalFilterFactory(const Slice& filtered_value) |
208 | 1 | : filtered_value_(filtered_value.ToString()) {} |
209 | | |
210 | | virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
211 | 9 | const CompactionFilter::Context& context) override { |
212 | 9 | return std::unique_ptr<CompactionFilter>( |
213 | 9 | new ConditionalFilter(&filtered_value_)); |
214 | 9 | } |
215 | | |
216 | 3 | const char* Name() const override { |
217 | 3 | return "ConditionalFilterFactory"; |
218 | 3 | } |
219 | | |
220 | | private: |
221 | | std::string filtered_value_; |
222 | | }; |
223 | | |
224 | | class ChangeFilterFactory : public CompactionFilterFactory { |
225 | | public: |
226 | 5 | ChangeFilterFactory() {} |
227 | | |
228 | | virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
229 | 14 | const CompactionFilter::Context& context) override { |
230 | 14 | return std::unique_ptr<CompactionFilter>(new ChangeFilter()); |
231 | 14 | } |
232 | | |
233 | 45 | const char* Name() const override { return "ChangeFilterFactory"; } |
234 | | }; |
235 | | |
236 | | #ifndef ROCKSDB_LITE |
237 | 1 | TEST_F(DBTestCompactionFilter, CompactionFilter) { |
238 | 1 | Options options = CurrentOptions(); |
239 | 1 | options.max_open_files = -1; |
240 | 1 | options.num_levels = 3; |
241 | 1 | options.compaction_filter_factory = std::make_shared<KeepFilterFactory>(); |
242 | 1 | options = CurrentOptions(options); |
243 | 1 | CreateAndReopenWithCF({"pikachu"}, options); |
244 | | |
245 | | // Write 100K keys, these are written to a few files in L0. |
246 | 1 | const std::string value(10, 'x'); |
247 | 100k | for (int i = 0; i < 100000; i++) { |
248 | 100k | char key[100]; |
249 | 100k | snprintf(key, sizeof(key), "B%010d", i); |
250 | 100k | ASSERT_OK(Put(1, key, value)); |
251 | 100k | } |
252 | 1 | ASSERT_OK(Flush(1)); |
253 | | |
254 | | // Push all files to the highest level L2. Verify that |
255 | | // the compaction is each level invokes the filter for |
256 | | // all the keys in that level. |
257 | 1 | cfilter_count = 0; |
258 | 1 | ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1])); |
259 | 1 | ASSERT_EQ(cfilter_count, 100000); |
260 | 1 | cfilter_count = 0; |
261 | 1 | ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1])); |
262 | 1 | ASSERT_EQ(cfilter_count, 100000); |
263 | | |
264 | 1 | ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); |
265 | 1 | ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); |
266 | 1 | ASSERT_NE(NumTableFilesAtLevel(2, 1), 0); |
267 | 1 | cfilter_count = 0; |
268 | | |
269 | | // All the files are in the lowest level. |
270 | | // Verify that all but the 100001st record |
271 | | // has sequence number zero. The 100001st record |
272 | | // is at the tip of this snapshot and cannot |
273 | | // be zeroed out. |
274 | 1 | int count = 0; |
275 | 1 | int total = 0; |
276 | 1 | Arena arena; |
277 | 1 | { |
278 | 1 | ScopedArenaIterator iter( |
279 | 1 | dbfull()->NewInternalIterator(&arena, handles_[1])); |
280 | 1 | iter->SeekToFirst(); |
281 | 1 | ASSERT_OK(iter->status()); |
282 | 100k | while (iter->Valid()) { |
283 | 100k | ParsedInternalKey ikey(Slice(), 0, kTypeValue); |
284 | 100k | ikey.sequence = -1; |
285 | 100k | ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); |
286 | 100k | total++; |
287 | 100k | if (ikey.sequence != 0) { |
288 | 1 | count++; |
289 | 1 | } |
290 | 100k | iter->Next(); |
291 | 100k | } |
292 | 1 | } |
293 | 1 | ASSERT_EQ(total, 100000); |
294 | 1 | ASSERT_EQ(count, 1); |
295 | | |
296 | | // overwrite all the 100K keys once again. |
297 | 100k | for (int i = 0; i < 100000; i++) { |
298 | 100k | char key[100]; |
299 | 100k | snprintf(key, sizeof(key), "B%010d", i); |
300 | 100k | ASSERT_OK(Put(1, key, value)); |
301 | 100k | } |
302 | 1 | ASSERT_OK(Flush(1)); |
303 | | |
304 | | // push all files to the highest level L2. This |
305 | | // means that all keys should pass at least once |
306 | | // via the compaction filter |
307 | 1 | cfilter_count = 0; |
308 | 1 | ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1])); |
309 | 1 | ASSERT_EQ(cfilter_count, 100000); |
310 | 1 | cfilter_count = 0; |
311 | 1 | ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1])); |
312 | 1 | ASSERT_EQ(cfilter_count, 100000); |
313 | 1 | ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); |
314 | 1 | ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); |
315 | 1 | ASSERT_NE(NumTableFilesAtLevel(2, 1), 0); |
316 | | |
317 | | // create a new database with the compaction |
318 | | // filter in such a way that it deletes all keys |
319 | 1 | options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(); |
320 | 1 | options.create_if_missing = true; |
321 | 1 | DestroyAndReopen(options); |
322 | 1 | CreateAndReopenWithCF({"pikachu"}, options); |
323 | | |
324 | | // write all the keys once again. |
325 | 100k | for (int i = 0; i < 100000; i++) { |
326 | 100k | char key[100]; |
327 | 100k | snprintf(key, sizeof(key), "B%010d", i); |
328 | 100k | ASSERT_OK(Put(1, key, value)); |
329 | 100k | } |
330 | 1 | ASSERT_OK(Flush(1)); |
331 | 1 | ASSERT_NE(NumTableFilesAtLevel(0, 1), 0); |
332 | 1 | ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); |
333 | 1 | ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0); |
334 | | |
335 | | // Push all files to the highest level L2. This |
336 | | // triggers the compaction filter to delete all keys, |
337 | | // verify that at the end of the compaction process, |
338 | | // nothing is left. |
339 | 1 | cfilter_count = 0; |
340 | 1 | ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1])); |
341 | 1 | ASSERT_EQ(cfilter_count, 100000); |
342 | 1 | cfilter_count = 0; |
343 | 1 | ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1])); |
344 | 1 | ASSERT_EQ(cfilter_count, 0); |
345 | 1 | ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); |
346 | 1 | ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); |
347 | | |
348 | 1 | { |
349 | | // Scan the entire database to ensure that nothing is left |
350 | 1 | std::unique_ptr<Iterator> iter( |
351 | 1 | db_->NewIterator(ReadOptions(), handles_[1])); |
352 | 1 | iter->SeekToFirst(); |
353 | 1 | count = 0; |
354 | 1 | while (iter->Valid()) { |
355 | 0 | count++; |
356 | 0 | iter->Next(); |
357 | 0 | } |
358 | 1 | ASSERT_EQ(count, 0); |
359 | 1 | } |
360 | | |
361 | | // The sequence number of the remaining record |
362 | | // is not zeroed out even though it is at the |
363 | | // level Lmax because this record is at the tip |
364 | 1 | count = 0; |
365 | 1 | { |
366 | 1 | ScopedArenaIterator iter( |
367 | 1 | dbfull()->NewInternalIterator(&arena, handles_[1])); |
368 | 1 | iter->SeekToFirst(); |
369 | 1 | ASSERT_OK(iter->status()); |
370 | 1 | while (iter->Valid()) { |
371 | 0 | ParsedInternalKey ikey(Slice(), 0, kTypeValue); |
372 | 0 | ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); |
373 | 0 | ASSERT_NE(ikey.sequence, (unsigned)0); |
374 | 0 | count++; |
375 | 0 | iter->Next(); |
376 | 0 | } |
377 | 1 | ASSERT_EQ(count, 0); |
378 | 1 | } |
379 | 1 | } |
380 | | |
381 | | // Tests the edge case where compaction does not produce any output -- all |
382 | | // entries are deleted. The compaction should create bunch of 'DeleteFile' |
383 | | // entries in VersionEdit, but none of the 'AddFile's. |
384 | 1 | TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) { |
385 | 1 | Options options; |
386 | 1 | options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(); |
387 | 1 | options.disable_auto_compactions = true; |
388 | 1 | options.create_if_missing = true; |
389 | 1 | options = CurrentOptions(options); |
390 | 1 | DestroyAndReopen(options); |
391 | | |
392 | | // put some data |
393 | 5 | for (int table = 0; table < 4; ++table) { |
394 | 50 | for (int i = 0; i < 10 + table; ++i) { |
395 | 46 | ASSERT_OK(Put(ToString(table * 100 + i), "val")); |
396 | 46 | } |
397 | 4 | ASSERT_OK(Flush()); |
398 | 4 | } |
399 | | |
400 | | // this will produce empty file (delete compaction filter) |
401 | 1 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
402 | 1 | ASSERT_EQ(0U, CountLiveFiles()); |
403 | | |
404 | 1 | Reopen(options); |
405 | | |
406 | 1 | Iterator* itr = db_->NewIterator(ReadOptions()); |
407 | 1 | itr->SeekToFirst(); |
408 | | // empty db |
409 | 1 | ASSERT_TRUE(!itr->Valid()); |
410 | | |
411 | 1 | delete itr; |
412 | 1 | } |
413 | | #endif // ROCKSDB_LITE |
414 | | |
415 | 1 | TEST_F(DBTestCompactionFilter, CompactionFilterWithValueChange) { |
416 | 5 | do { |
417 | 5 | Options options; |
418 | 5 | options.num_levels = 3; |
419 | 5 | options.compaction_filter_factory = |
420 | 5 | std::make_shared<ChangeFilterFactory>(); |
421 | 5 | options = CurrentOptions(options); |
422 | 5 | CreateAndReopenWithCF({"pikachu"}, options); |
423 | | |
424 | | // Lower number of runs for tsan due to low perf. |
425 | 5 | constexpr int kNumKeys = yb::NonTsanVsTsan(100001, 10001); |
426 | | |
427 | | // Write 'kNumKeys' keys, these are written to a few files |
428 | | // in L0. We do this so that the current snapshot points |
429 | | // to the last key that we write. The compaction filter is not invoked |
430 | | // on keys that are visible via a snapshot because we |
431 | | // anyways cannot delete it. |
432 | 5 | const std::string value(10, 'x'); |
433 | 5 | LOG_TIMING(INFO, "Writing Keys") { |
434 | 500k | for (int i = 0; i < kNumKeys; i++) { |
435 | 500k | char key[100]; |
436 | 500k | snprintf(key, sizeof(key), "B%010d", i); |
437 | 500k | ASSERT_OK(Put(1, key, value)); |
438 | 500k | } |
439 | 5 | } |
440 | | |
441 | 5 | LOG_TIMING(INFO, "Pushing files to lower levels") { |
442 | | // push all files to lower levels |
443 | 5 | ASSERT_OK(Flush(1)); |
444 | 5 | if (option_config_ != kUniversalCompactionMultiLevel && |
445 | 4 | option_config_ != kUniversalSubcompactions) { |
446 | 3 | ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1])); |
447 | 3 | ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1])); |
448 | 2 | } else { |
449 | 2 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr)); |
450 | 2 | } |
451 | 5 | } |
452 | | |
453 | | // re-write all data again |
454 | 5 | LOG_TIMING(INFO, "Rewriting data") { |
455 | 500k | for (int i = 0; i < kNumKeys; i++) { |
456 | 500k | char key[100]; |
457 | 500k | snprintf(key, sizeof(key), "B%010d", i); |
458 | 500k | ASSERT_OK(Put(1, key, value)); |
459 | 500k | } |
460 | 5 | } |
461 | | |
462 | | // push all files to lower levels. This should |
463 | | // invoke the compaction filter for all kNumKeys - 1 keys. |
464 | 5 | LOG_TIMING(INFO, "Pushing files to lower levels after rewrite") { |
465 | 5 | ASSERT_OK(Flush(1)); |
466 | 5 | if (option_config_ != kUniversalCompactionMultiLevel && |
467 | 4 | option_config_ != kUniversalSubcompactions) { |
468 | 3 | ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1])); |
469 | 3 | ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1])); |
470 | 2 | } else { |
471 | 2 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr)); |
472 | 2 | } |
473 | 5 | } |
474 | | |
475 | | // verify that all keys now have the new value that |
476 | | // was set by the compaction process. |
477 | 5 | LOG_TIMING(INFO, "Verify keys") { |
478 | 500k | for (int i = 0; i < kNumKeys; i++) { |
479 | 500k | char key[100]; |
480 | 500k | snprintf(key, sizeof(key), "B%010d", i); |
481 | 500k | std::string newvalue = Get(1, key); |
482 | 500k | ASSERT_EQ(newvalue.compare(NEW_VALUE), 0); |
483 | 500k | } |
484 | 5 | } |
485 | 5 | } while (ChangeCompactOptions()); |
486 | 1 | } |
487 | | |
488 | 1 | TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) { |
489 | 1 | std::string one, two, three, four; |
490 | 1 | PutFixed64(&one, 1); |
491 | 1 | PutFixed64(&two, 2); |
492 | 1 | PutFixed64(&three, 3); |
493 | 1 | PutFixed64(&four, 4); |
494 | | |
495 | 1 | Options options; |
496 | 1 | options = CurrentOptions(options); |
497 | 1 | options.create_if_missing = true; |
498 | 1 | options.merge_operator = MergeOperators::CreateUInt64AddOperator(); |
499 | 1 | options.num_levels = 3; |
500 | | // Filter out keys with value is 2. |
501 | 1 | options.compaction_filter_factory = |
502 | 1 | std::make_shared<ConditionalFilterFactory>(two); |
503 | 1 | DestroyAndReopen(options); |
504 | | |
505 | | // In the same compaction, a value type needs to be deleted based on |
506 | | // compaction filter, and there is a merge type for the key. compaction |
507 | | // filter result is ignored. |
508 | 1 | ASSERT_OK(db_->Put(WriteOptions(), "foo", two)); |
509 | 1 | ASSERT_OK(Flush()); |
510 | 1 | ASSERT_OK(db_->Merge(WriteOptions(), "foo", one)); |
511 | 1 | ASSERT_OK(Flush()); |
512 | 1 | std::string newvalue = Get("foo"); |
513 | 1 | ASSERT_EQ(newvalue, three); |
514 | 1 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
515 | 1 | newvalue = Get("foo"); |
516 | 1 | ASSERT_EQ(newvalue, three); |
517 | | |
518 | | // value key can be deleted based on compaction filter, leaving only |
519 | | // merge keys. |
520 | 1 | ASSERT_OK(db_->Put(WriteOptions(), "bar", two)); |
521 | 1 | ASSERT_OK(Flush()); |
522 | 1 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
523 | 1 | newvalue = Get("bar"); |
524 | 1 | ASSERT_EQ("NOT_FOUND", newvalue); |
525 | 1 | ASSERT_OK(db_->Merge(WriteOptions(), "bar", two)); |
526 | 1 | ASSERT_OK(Flush()); |
527 | 1 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
528 | 1 | newvalue = Get("bar"); |
529 | 1 | ASSERT_EQ(two, two); |
530 | | |
531 | | // Compaction filter never applies to merge keys. |
532 | 1 | ASSERT_OK(db_->Put(WriteOptions(), "foobar", one)); |
533 | 1 | ASSERT_OK(Flush()); |
534 | 1 | ASSERT_OK(db_->Merge(WriteOptions(), "foobar", two)); |
535 | 1 | ASSERT_OK(Flush()); |
536 | 1 | newvalue = Get("foobar"); |
537 | 1 | ASSERT_EQ(newvalue, three); |
538 | 1 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
539 | 1 | newvalue = Get("foobar"); |
540 | 1 | ASSERT_EQ(newvalue, three); |
541 | | |
542 | | // In the same compaction, both of value type and merge type keys need to be |
543 | | // deleted based on compaction filter, and there is a merge type for the key. |
544 | | // For both keys, compaction filter results are ignored. |
545 | 1 | ASSERT_OK(db_->Put(WriteOptions(), "barfoo", two)); |
546 | 1 | ASSERT_OK(Flush()); |
547 | 1 | ASSERT_OK(db_->Merge(WriteOptions(), "barfoo", two)); |
548 | 1 | ASSERT_OK(Flush()); |
549 | 1 | newvalue = Get("barfoo"); |
550 | 1 | ASSERT_EQ(newvalue, four); |
551 | 1 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
552 | 1 | newvalue = Get("barfoo"); |
553 | 1 | ASSERT_EQ(newvalue, four); |
554 | 1 | } |
555 | | |
556 | | #ifndef ROCKSDB_LITE |
557 | 1 | TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) { |
558 | 1 | KeepFilterFactory* filter = new KeepFilterFactory(true, true); |
559 | | |
560 | 1 | Options options = CurrentOptions(); |
561 | 1 | options.compaction_style = kCompactionStyleUniversal; |
562 | 1 | options.compaction_filter_factory.reset(filter); |
563 | 1 | options.compression = kNoCompression; |
564 | 1 | options.level0_file_num_compaction_trigger = 8; |
565 | 1 | Reopen(options); |
566 | 1 | int num_keys_per_file = 400; |
567 | 4 | for (int j = 0; j < 3; j++) { |
568 | | // Write several keys. |
569 | 3 | const std::string value(10, 'x'); |
570 | 703 | for (int i = 0; i < num_keys_per_file; i++) { |
571 | 700 | char key[100]; |
572 | 700 | snprintf(key, sizeof(key), "B%08d%02d", i, j); |
573 | 700 | ASSERT_OK(Put(key, value)); |
574 | 700 | } |
575 | 3 | ASSERT_OK(dbfull()->TEST_FlushMemTable()); |
576 | | // Make sure next file is much smaller so automatic compaction will not |
577 | | // be triggered. |
578 | 3 | num_keys_per_file /= 2; |
579 | 3 | } |
580 | 1 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
581 | | |
582 | | // Force a manual compaction |
583 | 1 | cfilter_count = 0; |
584 | 1 | filter->expect_manual_compaction_.store(true); |
585 | 1 | filter->expect_full_compaction_.store(true); |
586 | 1 | filter->expect_cf_id_.store(0); |
587 | 1 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
588 | 1 | ASSERT_EQ(cfilter_count, 700); |
589 | 1 | ASSERT_EQ(NumSortedRuns(0), 1); |
590 | 1 | ASSERT_TRUE(filter->compaction_filter_created()); |
591 | | |
592 | | // Verify total number of keys is correct after manual compaction. |
593 | 1 | { |
594 | 1 | int count = 0; |
595 | 1 | int total = 0; |
596 | 1 | Arena arena; |
597 | 1 | ScopedArenaIterator iter(dbfull()->NewInternalIterator(&arena)); |
598 | 1 | iter->SeekToFirst(); |
599 | 1 | ASSERT_OK(iter->status()); |
600 | 701 | while (iter->Valid()) { |
601 | 700 | ParsedInternalKey ikey(Slice(), 0, kTypeValue); |
602 | 700 | ikey.sequence = -1; |
603 | 700 | ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); |
604 | 700 | total++; |
605 | 700 | if (ikey.sequence != 0) { |
606 | 2 | count++; |
607 | 2 | } |
608 | 700 | iter->Next(); |
609 | 700 | } |
610 | 1 | ASSERT_EQ(total, 700); |
611 | 1 | ASSERT_EQ(count, 2); |
612 | 1 | } |
613 | 1 | } |
614 | | #endif // ROCKSDB_LITE |
615 | | |
616 | 1 | TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) { |
617 | 1 | KeepFilterFactory* filter = new KeepFilterFactory(false, true); |
618 | 1 | filter->expect_cf_id_.store(1); |
619 | | |
620 | 1 | Options options = CurrentOptions(); |
621 | 1 | options.compaction_filter_factory.reset(filter); |
622 | 1 | options.compression = kNoCompression; |
623 | 1 | options.level0_file_num_compaction_trigger = 2; |
624 | 1 | CreateAndReopenWithCF({"pikachu"}, options); |
625 | | |
626 | 1 | int num_keys_per_file = 400; |
627 | 4 | for (int j = 0; j < 3; j++) { |
628 | | // Write several keys. |
629 | 3 | const std::string value(10, 'x'); |
630 | 703 | for (int i = 0; i < num_keys_per_file; i++) { |
631 | 700 | char key[100]; |
632 | 700 | snprintf(key, sizeof(key), "B%08d%02d", i, j); |
633 | 700 | ASSERT_OK(Put(1, key, value)); |
634 | 700 | } |
635 | 3 | ASSERT_OK(Flush(1)); |
636 | | // Make sure next file is much smaller so automatic compaction will not |
637 | | // be triggered. |
638 | 3 | num_keys_per_file /= 2; |
639 | 3 | } |
640 | 1 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
641 | | |
642 | 1 | ASSERT_TRUE(filter->compaction_filter_created()); |
643 | 1 | } |
644 | | |
645 | | #ifndef ROCKSDB_LITE |
646 | | // Compaction filters should only be applied to records that are newer than the |
647 | | // latest snapshot. This test inserts records and applies a delete filter. |
648 | 1 | TEST_F(DBTestCompactionFilter, CompactionFilterSnapshot) { |
649 | 1 | Options options; |
650 | 1 | options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(); |
651 | 1 | options.disable_auto_compactions = true; |
652 | 1 | options.create_if_missing = true; |
653 | 1 | options = CurrentOptions(options); |
654 | 1 | DestroyAndReopen(options); |
655 | | |
656 | | // Put some data. |
657 | 1 | const Snapshot* snapshot = nullptr; |
658 | 5 | for (int table = 0; table < 4; ++table) { |
659 | 44 | for (int i = 0; i < 10; ++i) { |
660 | 40 | ASSERT_OK(Put(ToString(table * 100 + i), "val")); |
661 | 40 | } |
662 | 4 | ASSERT_OK(Flush()); |
663 | | |
664 | 4 | if (table == 0) { |
665 | 1 | snapshot = db_->GetSnapshot(); |
666 | 1 | } |
667 | 4 | } |
668 | 1 | assert(snapshot != nullptr); |
669 | | |
670 | 1 | cfilter_count = 0; |
671 | 1 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
672 | | // The filter should delete 10 records. |
673 | 1 | ASSERT_EQ(30U, cfilter_count); |
674 | | |
675 | | // Release the snapshot and compact again -> now all records should be |
676 | | // removed. |
677 | 1 | db_->ReleaseSnapshot(snapshot); |
678 | 1 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
679 | 1 | ASSERT_EQ(0U, CountLiveFiles()); |
680 | 1 | } |
681 | | |
682 | | // Compaction filters should only be applied to records that are newer than the |
683 | | // latest snapshot. However, if the compaction filter asks to ignore snapshots |
684 | | // records newer than the snapshot will also be processed |
685 | 1 | TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) { |
686 | 1 | std::string five = ToString(5); |
687 | 1 | Options options; |
688 | 1 | options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>(); |
689 | 1 | options.disable_auto_compactions = true; |
690 | 1 | options.create_if_missing = true; |
691 | 1 | options = CurrentOptions(options); |
692 | 1 | DestroyAndReopen(options); |
693 | | |
694 | | // Put some data. |
695 | 1 | const Snapshot* snapshot = nullptr; |
696 | 5 | for (int table = 0; table < 4; ++table) { |
697 | 44 | for (int i = 0; i < 10; ++i) { |
698 | 40 | ASSERT_OK(Put(ToString(table * 100 + i), "val")); |
699 | 40 | } |
700 | 4 | ASSERT_OK(Flush()); |
701 | | |
702 | 4 | if (table == 0) { |
703 | 1 | snapshot = db_->GetSnapshot(); |
704 | 1 | } |
705 | 4 | } |
706 | 1 | assert(snapshot != nullptr); |
707 | | |
708 | 1 | cfilter_count = 0; |
709 | 1 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
710 | | // The filter should delete 40 records. |
711 | 1 | ASSERT_EQ(40U, cfilter_count); |
712 | | |
713 | 1 | { |
714 | | // Scan the entire database as of the snapshot to ensure |
715 | | // that nothing is left |
716 | 1 | ReadOptions read_options; |
717 | 1 | read_options.snapshot = snapshot; |
718 | 1 | std::unique_ptr<Iterator> iter(db_->NewIterator(read_options)); |
719 | 1 | iter->SeekToFirst(); |
720 | 1 | int count = 0; |
721 | 7 | while (iter->Valid()) { |
722 | 6 | count++; |
723 | 6 | iter->Next(); |
724 | 6 | } |
725 | 1 | ASSERT_EQ(count, 6); |
726 | 1 | read_options.snapshot = 0; |
727 | 1 | std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options)); |
728 | 1 | iter1->SeekToFirst(); |
729 | 1 | count = 0; |
730 | 31 | while (iter1->Valid()) { |
731 | 30 | count++; |
732 | 30 | iter1->Next(); |
733 | 30 | } |
734 | | // We have deleted 10 keys from 40 using the compaction filter |
735 | | // Keys 6-9 before the snapshot and 100-105 after the snapshot |
736 | 1 | ASSERT_EQ(count, 30); |
737 | 1 | } |
738 | | |
739 | | // Release the snapshot and compact again -> now all records should be |
740 | | // removed. |
741 | 1 | db_->ReleaseSnapshot(snapshot); |
742 | 1 | } |
743 | | #endif // ROCKSDB_LITE |
744 | | |
745 | | } // namespace rocksdb |
746 | | |
747 | 13.2k | int main(int argc, char** argv) { |
748 | 13.2k | rocksdb::port::InstallStackTraceHandler(); |
749 | 13.2k | ::testing::InitGoogleTest(&argc, argv); |
750 | 13.2k | return RUN_ALL_TESTS(); |
751 | 13.2k | } |