/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_universal_compaction_test.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/db/db_test_util.h" |
25 | | #include "yb/rocksdb/db/job_context.h" |
26 | | #include "yb/rocksdb/port/stack_trace.h" |
27 | | #if !defined(ROCKSDB_LITE) |
28 | | #include "yb/rocksdb/util/file_util.h" |
29 | | #include "yb/rocksdb/util/sync_point.h" |
30 | | |
31 | | namespace rocksdb { |
32 | | |
33 | 1.93k | static std::string CompressibleString(Random* rnd, int len) { |
34 | 1.93k | std::string r; |
35 | 1.93k | CompressibleString(rnd, 0.8, len, &r); |
36 | 1.93k | return r; |
37 | 1.93k | } |
38 | | |
39 | | class DBTestUniversalCompactionBase |
40 | | : public DBTestBase, |
41 | | public ::testing::WithParamInterface<std::tuple<int, bool>> { |
42 | | public: |
43 | | explicit DBTestUniversalCompactionBase( |
44 | 106 | const std::string& path) : DBTestBase(path) {} |
45 | 106 | void SetUp() override { |
46 | 106 | num_levels_ = std::get<0>(GetParam()); |
47 | 106 | exclusive_manual_compaction_ = std::get<1>(GetParam()); |
48 | 106 | } |
49 | | int num_levels_; |
50 | | bool exclusive_manual_compaction_; |
51 | | }; |
52 | | |
53 | | class DBTestUniversalCompactionWithParam : public DBTestUniversalCompactionBase { |
54 | | public: |
55 | | DBTestUniversalCompactionWithParam() : |
56 | 90 | DBTestUniversalCompactionBase("/db_universal_compaction_test") {} |
57 | | }; |
58 | | |
59 | | namespace { |
60 | | void VerifyCompactionResult( |
61 | | const ColumnFamilyMetaData& cf_meta, |
62 | 6 | const std::set<std::string>& overlapping_file_numbers) { |
63 | 6 | #ifndef NDEBUG |
64 | 6 | for (auto& level : cf_meta.levels) { |
65 | 8 | for (auto& file : level.files) { |
66 | 8 | assert(overlapping_file_numbers.find(file.name) == |
67 | 8 | overlapping_file_numbers.end()); |
68 | 8 | } |
69 | 6 | } |
70 | 6 | #endif |
71 | 6 | } |
72 | | |
73 | | class KeepFilter : public CompactionFilter { |
74 | | public: |
75 | | FilterDecision Filter(int level, const Slice& key, const Slice& value, |
76 | 86.8k | std::string* new_value, bool* value_changed) override { |
77 | 86.8k | return FilterDecision::kKeep; |
78 | 86.8k | } |
79 | | |
80 | 0 | const char* Name() const override { return "KeepFilter"; } |
81 | | }; |
82 | | |
83 | | class KeepFilterFactory : public CompactionFilterFactory { |
84 | | public: |
85 | | explicit KeepFilterFactory(bool check_context = false) |
86 | 12 | : check_context_(check_context) {} |
87 | | |
88 | | virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
89 | 114 | const CompactionFilter::Context& context) override { |
90 | 114 | if (check_context_) { |
91 | 114 | EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction); |
92 | 114 | EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction); |
93 | 114 | } |
94 | 114 | return std::unique_ptr<CompactionFilter>(new KeepFilter()); |
95 | 114 | } |
96 | | |
97 | 102 | const char* Name() const override { return "KeepFilterFactory"; } |
98 | | bool check_context_; |
99 | | std::atomic_bool expect_full_compaction_; |
100 | | std::atomic_bool expect_manual_compaction_; |
101 | | }; |
102 | | |
103 | | class DelayFilter : public CompactionFilter { |
104 | | public: |
105 | 0 | explicit DelayFilter(DBTestBase* d) : db_test(d) {} |
106 | | FilterDecision Filter(int level, const Slice& key, const Slice& value, |
107 | | std::string* new_value, |
108 | 0 | bool* value_changed) override { |
109 | 0 | db_test->env_->addon_time_.fetch_add(1000); |
110 | 0 | return FilterDecision::kDiscard; |
111 | 0 | } |
112 | | |
113 | 0 | const char* Name() const override { return "DelayFilter"; } |
114 | | |
115 | | private: |
116 | | DBTestBase* db_test; |
117 | | }; |
118 | | |
119 | | class DelayFilterFactory : public CompactionFilterFactory { |
120 | | public: |
121 | 0 | explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {} |
122 | | virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
123 | 0 | const CompactionFilter::Context& context) override { |
124 | 0 | return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test)); |
125 | 0 | } |
126 | | |
127 | 0 | const char* Name() const override { return "DelayFilterFactory"; } |
128 | | |
129 | | private: |
130 | | DBTestBase* db_test; |
131 | | }; |
132 | | } // namespace |
133 | | |
134 | | // Make sure we don't hit a problem if the trigger condition is given |
135 | | // as 0, which is invalid. |
136 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionSingleSortedRun) { |
137 | 6 | Options options; |
138 | 6 | options = CurrentOptions(options); |
139 | | |
140 | 6 | options.compaction_style = kCompactionStyleUniversal; |
141 | 6 | options.num_levels = num_levels_; |
142 | | // Configure universal compaction to always compact down to a single sorted run. |
143 | 6 | options.level0_file_num_compaction_trigger = 0; |
144 | 6 | options.compaction_options_universal.size_ratio = 10; |
145 | 6 | options.compaction_options_universal.min_merge_width = 2; |
146 | 6 | options.compaction_options_universal.max_size_amplification_percent = 1; |
147 | | |
148 | 6 | options.write_buffer_size = 105 << 10; // 105KB |
149 | 6 | options.arena_block_size = 4 << 10; |
150 | 6 | options.target_file_size_base = 32 << 10; // 32KB |
151 | | // The invalid trigger of 0 above is sanitized to 1 when the DB is opened. |
152 | 6 | KeepFilterFactory* filter = new KeepFilterFactory(true); |
153 | 6 | filter->expect_manual_compaction_.store(false); |
154 | 6 | options.compaction_filter_factory.reset(filter); |
155 | | |
156 | 6 | DestroyAndReopen(options); |
157 | 6 | ASSERT_EQ(1, db_->GetOptions().level0_file_num_compaction_trigger); |
158 | | |
159 | 6 | Random rnd(301); |
160 | 6 | int key_idx = 0; |
161 | | |
162 | 6 | filter->expect_full_compaction_.store(true); |
163 | | |
164 | 102 | for (int num = 0; num < 16; num++) { |
165 | | // Write a 100KB file; it should immediately be compacted to a single sorted run. |
166 | 96 | GenerateNewFile(&rnd, &key_idx); |
167 | 96 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
168 | 96 | ASSERT_EQ(NumSortedRuns(0), 1); |
169 | 96 | } |
170 | 6 | } |
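 | |
 | | // Rough sketch of why the options above collapse everything into a single |
 | | // sorted run (an approximation of the space-amplification rule, not code |
 | | // from this file): |
 | | // |
 | | //   size_amp_percent = 100 * (sum of all runs except the oldest) |
 | | //                          / (size of the oldest run) |
 | | // |
 | | // With max_size_amplification_percent = 1, any newer data larger than 1% |
 | | // of the oldest run trips the trigger, so each flush is merged straight |
 | | // back into one run, which is what the NumSortedRuns(0) == 1 check asserts. |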
171 | | |
172 | 6 | TEST_P(DBTestUniversalCompactionWithParam, OptimizeFiltersForHits) { |
173 | 6 | Options options; |
174 | 6 | options = CurrentOptions(options); |
175 | 6 | options.compaction_style = kCompactionStyleUniversal; |
176 | 6 | options.compaction_options_universal.size_ratio = 5; |
177 | 6 | options.num_levels = num_levels_; |
178 | 6 | options.write_buffer_size = 105 << 10; // 105KB |
179 | 6 | options.arena_block_size = 4 << 10; |
180 | 6 | options.target_file_size_base = 32 << 10; // 32KB |
181 | | // trigger compaction if there are >= 4 files |
182 | 6 | options.level0_file_num_compaction_trigger = 4; |
183 | 6 | BlockBasedTableOptions bbto; |
184 | 6 | bbto.cache_index_and_filter_blocks = true; |
185 | 6 | bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); |
186 | 6 | bbto.whole_key_filtering = true; |
187 | 6 | options.table_factory.reset(NewBlockBasedTableFactory(bbto)); |
188 | 6 | options.optimize_filters_for_hits = true; |
189 | 6 | options.statistics = rocksdb::CreateDBStatisticsForTests(); |
190 | 6 | options.memtable_factory.reset(new SpecialSkipListFactory(3)); |
191 | | |
192 | 6 | DestroyAndReopen(options); |
193 | | |
194 | | // Block compactions from happening. |
195 | 6 | env_->SetBackgroundThreads(1, Env::LOW); |
196 | 6 | test::SleepingBackgroundTask sleeping_task_low; |
197 | 6 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, |
198 | 6 | Env::Priority::LOW); |
199 | | |
200 | 30 | for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { |
201 | 24 | ASSERT_OK(Put(Key(num * 10), "val")); |
202 | 24 | if (num) { |
203 | 18 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); |
204 | 18 | } |
205 | 24 | ASSERT_OK(Put(Key(30 + num * 10), "val")); |
206 | 24 | ASSERT_OK(Put(Key(60 + num * 10), "val")); |
207 | 24 | } |
208 | 6 | ASSERT_OK(Put("", "")); |
209 | 6 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); |
210 | | |
211 | | // Query a set of non-existent keys. |
212 | 60 | for (int i = 5; i < 90; i += 10) { |
213 | 54 | ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); |
214 | 54 | } |
215 | | |
216 | | // Make sure bloom filter is used at least once. |
217 | 6 | ASSERT_GT(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); |
218 | 6 | auto prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL); |
219 | | |
220 | | // Make sure bloom filter is used for all but the last L0 file when looking |
221 | | // up a non-existent key that's in the range of all L0 files. |
222 | 6 | ASSERT_EQ(Get(Key(35)), "NOT_FOUND"); |
223 | 6 | ASSERT_EQ(prev_counter + NumTableFilesAtLevel(0) - 1, |
224 | 6 | TestGetTickerCount(options, BLOOM_FILTER_USEFUL)); |
225 | 6 | prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL); |
226 | | |
227 | | // Unblock compaction and wait for it to happen. |
228 | 6 | sleeping_task_low.WakeUp(); |
229 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
230 | | |
231 | | // The same queries will not trigger the bloom filter. |
232 | 60 | for (int i = 5; i < 90; i += 10) { |
233 | 54 | ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); |
234 | 54 | } |
235 | 6 | ASSERT_EQ(prev_counter, TestGetTickerCount(options, BLOOM_FILTER_USEFUL)); |
236 | 6 | } |
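 | |
 | | // Context for the counter checks above (a summary of the option's intent, |
 | | // not a restatement of the implementation): with |
 | | // optimize_filters_for_hits = true, reads skip the bloom-filter check for |
 | | // the bottommost data, where a miss is expected to be a true miss. That is |
 | | // why the pre-compaction lookups consult a filter for all but the last L0 |
 | | // file, and why BLOOM_FILTER_USEFUL stops growing once everything has been |
 | | // compacted into a single bottom sorted run. |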
237 | | |
238 | | // TODO(kailiu) The tests on UniversalCompaction have some issues: |
239 | | // 1. A lot of magic numbers ("11" or "12"). |
240 | | // 2. They make assumptions about the memtable flush conditions, which may |
241 | | // change from time to time. |
242 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionTrigger) { |
243 | 6 | Options options; |
244 | 6 | options.compaction_style = kCompactionStyleUniversal; |
245 | 6 | options.compaction_options_universal.size_ratio = 5; |
246 | 6 | options.num_levels = num_levels_; |
247 | 6 | options.write_buffer_size = 105 << 10; // 105KB |
248 | 6 | options.arena_block_size = 4 << 10; |
249 | 6 | options.target_file_size_base = 32 << 10; // 32KB |
250 | | // trigger compaction if there are >= 4 files |
251 | 6 | options.level0_file_num_compaction_trigger = 4; |
252 | 6 | KeepFilterFactory* filter = new KeepFilterFactory(true); |
253 | 6 | filter->expect_manual_compaction_.store(false); |
254 | 6 | options.compaction_filter_factory.reset(filter); |
255 | | |
256 | 6 | options = CurrentOptions(options); |
257 | 6 | DestroyAndReopen(options); |
258 | 6 | CreateAndReopenWithCF({"pikachu"}, options); |
259 | | |
260 | 6 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
261 | 520 | "DBTestWritableFile.GetPreallocationStatus", [&](void* arg) { |
262 | 520 | ASSERT_TRUE(arg != nullptr); |
263 | 520 | size_t preallocation_size = *(static_cast<size_t*>(arg)); |
264 | 520 | if (num_levels_ > 3) { |
265 | 240 | ASSERT_LE(preallocation_size, options.target_file_size_base * 1.1); |
266 | 240 | } |
267 | 520 | }); |
268 | 6 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
269 | | |
270 | 6 | Random rnd(301); |
271 | 6 | int key_idx = 0; |
272 | | |
273 | 6 | filter->expect_full_compaction_.store(true); |
274 | | // Stage 1: |
275 | | // Generate a set of files at level 0, but don't trigger level-0 |
276 | | // compaction. |
277 | 24 | for (int num = 0; num < options.level0_file_num_compaction_trigger - 1; |
278 | 18 | num++) { |
279 | | // Write 100KB |
280 | 18 | GenerateNewFile(1, &rnd, &key_idx); |
281 | 18 | } |
282 | | |
283 | | // Generate one more file at level-0, which should trigger level-0 |
284 | | // compaction. |
285 | 6 | GenerateNewFile(1, &rnd, &key_idx); |
286 | | // Suppose each file flushed from the memtable has size 1. Now we compact |
287 | | // level0_file_num_compaction_trigger = 4 files and should end up with one |
288 | | // big file of size 4. |
289 | 6 | ASSERT_EQ(NumSortedRuns(1), 1); |
290 | | |
291 | | // Stage 2: |
292 | | // Now we have one file at level 0, with size 4. We also have some data in |
293 | | // mem table. Let's continue generating new files at level 0, but don't |
294 | | // trigger level-0 compaction. |
295 | | // First, clean up memtable before inserting new data. This will generate |
296 | | // a level-0 file, with size around 0.4 (according to previously written |
297 | | // data amount). |
298 | 6 | filter->expect_full_compaction_.store(false); |
299 | 6 | ASSERT_OK(Flush(1)); |
300 | 12 | for (int num = 0; num < options.level0_file_num_compaction_trigger - 3; |
301 | 6 | num++) { |
302 | 6 | GenerateNewFile(1, &rnd, &key_idx); |
303 | 6 | ASSERT_EQ(NumSortedRuns(1), num + 3); |
304 | 6 | } |
305 | | |
306 | | // Generate one more file at level-0, which should trigger level-0 |
307 | | // compaction. |
308 | 6 | GenerateNewFile(1, &rnd, &key_idx); |
309 | | // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1. |
310 | | // After compaction, we should have 2 files, with size 4, 2.4. |
311 | 6 | ASSERT_EQ(NumSortedRuns(1), 2); |
312 | | |
313 | | // Stage 3: |
314 | | // Now we have 2 files at level 0, with size 4 and 2.4. Continue |
315 | | // generating new files at level 0. |
316 | 12 | for (int num = 0; num < options.level0_file_num_compaction_trigger - 3; |
317 | 6 | num++) { |
318 | 6 | GenerateNewFile(1, &rnd, &key_idx); |
319 | 6 | ASSERT_EQ(NumSortedRuns(1), num + 3); |
320 | 6 | } |
321 | | |
322 | | // Generate one more file at level-0, which should trigger level-0 |
323 | | // compaction. |
324 | 6 | GenerateNewFile(1, &rnd, &key_idx); |
325 | | // Before compaction, we have 4 files at level 0, with size 4, 2.4, 1, 1. |
326 | | // After compaction, we should have 3 files, with size 4, 2.4, 2. |
327 | 6 | ASSERT_EQ(NumSortedRuns(1), 3); |
328 | | |
329 | | // Stage 4: |
330 | | // Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a |
331 | | // new file of size 1. |
332 | 6 | GenerateNewFile(1, &rnd, &key_idx); |
333 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
334 | | // Level-0 compaction is triggered, but no file will be picked up. |
335 | 6 | ASSERT_EQ(NumSortedRuns(1), 4); |
336 | | |
337 | | // Stage 5: |
338 | | // Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate |
339 | | // a new file of size 1. |
340 | 6 | filter->expect_full_compaction_.store(true); |
341 | 6 | GenerateNewFile(1, &rnd, &key_idx); |
342 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
343 | | // All files at level 0 will be compacted into a single one. |
344 | 6 | ASSERT_EQ(NumSortedRuns(1), 1); |
345 | | |
346 | 6 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); |
347 | 6 | } |
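 | |
 | | // Worked example for the stage-2 numbers above, using the documented |
 | | // size-ratio rule (approximate): with size_ratio = 5, starting from the |
 | | // newest sorted run, the next-older run is added while |
 | | // size(next) / size(picked so far) <= (100 + 5) / 100 = 1.05: |
 | | // |
 | | //   runs newest-to-oldest: 1, 1, 0.4, 4 |
 | | //   1   / 1   = 1.00 <= 1.05  -> include |
 | | //   0.4 / 2   = 0.20 <= 1.05  -> include |
 | | //   4   / 2.4 = 1.67 >  1.05  -> stop |
 | | // |
 | | // So (1, 1, 0.4) merge into a 2.4 run, leaving (4, 2.4) as asserted. |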
348 | | |
349 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionSizeAmplification) { |
350 | 6 | Options options; |
351 | 6 | options.compaction_style = kCompactionStyleUniversal; |
352 | 6 | options.num_levels = num_levels_; |
353 | 6 | options.write_buffer_size = 100 << 10; // 100KB |
354 | 6 | options.target_file_size_base = 32 << 10; // 32KB |
355 | 6 | options.level0_file_num_compaction_trigger = 3; |
356 | 6 | options = CurrentOptions(options); |
357 | 6 | DestroyAndReopen(options); |
358 | 6 | CreateAndReopenWithCF({"pikachu"}, options); |
359 | | |
360 | | // Trigger compaction if size amplification exceeds 110% |
361 | 6 | options.compaction_options_universal.max_size_amplification_percent = 110; |
362 | 6 | options = CurrentOptions(options); |
363 | 6 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
364 | | |
365 | 6 | Random rnd(301); |
366 | 6 | int key_idx = 0; |
367 | | |
368 | | // Generate two files in Level 0. Both files are approx the same size. |
369 | 18 | for (int num = 0; num < options.level0_file_num_compaction_trigger - 1; |
370 | 12 | num++) { |
371 | | // Write 110KB (11 values, each 10K) |
372 | 144 | for (int i = 0; i < 11; i++) { |
373 | 132 | ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000))); |
374 | 132 | key_idx++; |
375 | 132 | } |
376 | 12 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); |
377 | 12 | ASSERT_EQ(NumSortedRuns(1), num + 1); |
378 | 12 | } |
379 | 6 | ASSERT_EQ(NumSortedRuns(1), 2); |
380 | | |
381 | | // Flush whatever is remaining in memtable. This is typically |
382 | | // small, which should not trigger size ratio based compaction |
383 | | // but will instead trigger size amplification. |
384 | 6 | ASSERT_OK(Flush(1)); |
385 | | |
386 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
387 | | |
388 | | // Verify that size amplification did occur |
389 | 6 | ASSERT_EQ(NumSortedRuns(1), 1); |
390 | 6 | } |
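 | |
 | | // The trigger above follows the documented space-amplification rule |
 | | // (roughly): size_amp_percent = 100 * (sum of newer runs) / (oldest run). |
 | | // The small final flush pushes that past the 110% limit, so the whole DB |
 | | // is merged into one sorted run instead of waiting for a size-ratio pick. |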
391 | | |
392 | 6 | TEST_P(DBTestUniversalCompactionWithParam, CompactFilesOnUniversalCompaction) { |
393 | 6 | const int kTestKeySize = 16; |
394 | 6 | const int kTestValueSize = 984; |
395 | 6 | const int kEntrySize = kTestKeySize + kTestValueSize; |
396 | 6 | const int kEntriesPerBuffer = 10; |
397 | | |
398 | 6 | ChangeCompactOptions(); |
399 | 6 | Options options; |
400 | 6 | options.create_if_missing = true; |
401 | 6 | options.write_buffer_size = kEntrySize * kEntriesPerBuffer; |
402 | 6 | options.compaction_style = kCompactionStyleLevel; |
403 | 6 | options.num_levels = 1; |
404 | 6 | options.target_file_size_base = options.write_buffer_size; |
405 | 6 | options.compression = kNoCompression; |
406 | 6 | options = CurrentOptions(options); |
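 | | // Note: ChangeCompactOptions() above switched the harness to a universal |
 | | // option config, and CurrentOptions() applies that config on top of the |
 | | // explicit kCompactionStyleLevel setting, so the assertion two lines below |
 | | // expects universal compaction to be in effect. |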
407 | 6 | CreateAndReopenWithCF({"pikachu"}, options); |
408 | 6 | ASSERT_EQ(options.compaction_style, kCompactionStyleUniversal); |
409 | 6 | Random rnd(301); |
410 | 61.4k | for (int key = 1024 * kEntriesPerBuffer; key >= 0; --key) { |
411 | 61.4k | ASSERT_OK(Put(1, ToString(key), RandomString(&rnd, kTestValueSize))); |
412 | 61.4k | } |
413 | 6 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); |
414 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
415 | 6 | ColumnFamilyMetaData cf_meta; |
416 | 6 | dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta); |
417 | 6 | std::vector<std::string> compaction_input_file_names; |
418 | 18 | for (auto file : cf_meta.levels[0].files) { |
419 | 18 | if (rnd.OneIn(2)) { |
420 | 14 | compaction_input_file_names.push_back(file.name); |
421 | 14 | } |
422 | 18 | } |
423 | | |
424 | 6 | if (compaction_input_file_names.size() == 0) { |
425 | 0 | compaction_input_file_names.push_back( |
426 | 0 | cf_meta.levels[0].files[0].name); |
427 | 0 | } |
428 | | |
429 | | // Expect failure, since universal compaction only allows L0 output. |
430 | 6 | ASSERT_FALSE(dbfull() |
431 | 6 | ->CompactFiles(CompactionOptions(), handles_[1], |
432 | 6 | compaction_input_file_names, 1) |
433 | 6 | .ok()); |
434 | | |
435 | | // Expect OK and verify the compacted files no longer exist. |
436 | 6 | ASSERT_OK(dbfull()->CompactFiles( |
437 | 6 | CompactionOptions(), handles_[1], |
438 | 6 | compaction_input_file_names, 0)); |
439 | | |
440 | 6 | dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta); |
441 | 6 | VerifyCompactionResult( |
442 | 6 | cf_meta, |
443 | 6 | std::set<std::string>(compaction_input_file_names.begin(), |
444 | 6 | compaction_input_file_names.end())); |
445 | | |
446 | 6 | compaction_input_file_names.clear(); |
447 | | |
448 | | // Pick the first and the last file, expect everything is |
449 | | // compacted into one single file. |
450 | 6 | compaction_input_file_names.push_back( |
451 | 6 | cf_meta.levels[0].files[0].name); |
452 | 6 | compaction_input_file_names.push_back( |
453 | 6 | cf_meta.levels[0].files[ |
454 | 6 | cf_meta.levels[0].files.size() - 1].name); |
455 | 6 | ASSERT_OK(dbfull()->CompactFiles( |
456 | 6 | CompactionOptions(), handles_[1], |
457 | 6 | compaction_input_file_names, 0)); |
458 | | |
459 | 6 | dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta); |
460 | 6 | ASSERT_EQ(cf_meta.levels[0].files.size(), 1U); |
461 | 6 | } |
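 | |
 | | // For reference, the manual-compaction API exercised above, reduced to a |
 | | // minimal sketch against the default column family (assumes `db` is an |
 | | // open rocksdb::DB*; everything except the API calls is illustrative): |
 | | // |
 | | //   ColumnFamilyMetaData meta; |
 | | //   db->GetColumnFamilyMetaData(&meta); |
 | | //   std::vector<std::string> inputs; |
 | | //   for (const auto& f : meta.levels[0].files) { |
 | | //     inputs.push_back(f.name); |
 | | //   } |
 | | //   // Under this test's config the only valid output level is 0. |
 | | //   Status s = db->CompactFiles(CompactionOptions(), inputs, 0); |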
462 | | |
463 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionTargetLevel) { |
464 | 6 | Options options; |
465 | 6 | options.compaction_style = kCompactionStyleUniversal; |
466 | 6 | options.write_buffer_size = 100 << 10; // 100KB |
467 | 6 | options.num_levels = 7; |
468 | 6 | options.disable_auto_compactions = true; |
469 | 6 | options = CurrentOptions(options); |
470 | 6 | DestroyAndReopen(options); |
471 | | |
472 | | // Generate 3 overlapping files |
473 | 6 | Random rnd(301); |
474 | 1.26k | for (int i = 0; i < 210; i++) { |
475 | 1.26k | ASSERT_OK(Put(Key(i), RandomString(&rnd, 100))); |
476 | 1.26k | } |
477 | 6 | ASSERT_OK(Flush()); |
478 | | |
479 | 606 | for (int i = 200; i < 300; i++) { |
480 | 600 | ASSERT_OK(Put(Key(i), RandomString(&rnd, 100))); |
481 | 600 | } |
482 | 6 | ASSERT_OK(Flush()); |
483 | | |
484 | 66 | for (int i = 250; i < 260; i++) { |
485 | 60 | ASSERT_OK(Put(Key(i), RandomString(&rnd, 100))); |
486 | 60 | } |
487 | 6 | ASSERT_OK(Flush()); |
488 | | |
489 | 6 | ASSERT_EQ("3", FilesPerLevel(0)); |
490 | | // Compact all files into 1 file and put it in L4 |
491 | 6 | CompactRangeOptions compact_options; |
492 | 6 | compact_options.change_level = true; |
493 | 6 | compact_options.target_level = 4; |
494 | 6 | compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; |
495 | 6 | ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); |
496 | 6 | ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0)); |
497 | 6 | } |
498 | | |
499 | | |
500 | | class DBTestUniversalCompactionMultiLevels |
501 | | : public DBTestUniversalCompactionBase { |
502 | | public: |
503 | | DBTestUniversalCompactionMultiLevels() : |
504 | | DBTestUniversalCompactionBase( |
505 | 8 | "/db_universal_compaction_multi_levels_test") {} |
506 | | }; |
507 | | |
508 | 4 | TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) { |
509 | 4 | Options options; |
510 | 4 | options.compaction_style = kCompactionStyleUniversal; |
511 | 4 | options.num_levels = num_levels_; |
512 | 4 | options.write_buffer_size = 100 << 10; // 100KB |
513 | 4 | options.level0_file_num_compaction_trigger = 8; |
514 | 4 | options.max_background_compactions = 3; |
515 | 4 | options.target_file_size_base = 32 * 1024; |
516 | 4 | options = CurrentOptions(options); |
517 | 4 | CreateAndReopenWithCF({"pikachu"}, options); |
518 | | |
519 | | // Trigger compaction if size amplification exceeds 110% |
520 | 4 | options.compaction_options_universal.max_size_amplification_percent = 110; |
521 | 4 | options = CurrentOptions(options); |
522 | 4 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
523 | | |
524 | 4 | Random rnd(301); |
525 | 4 | int num_keys = 100000; |
526 | 800k | for (int i = 0; i < num_keys * 2; i++) { |
527 | 800k | ASSERT_OK(Put(1, Key(i % num_keys), Key(i))); |
528 | 800k | } |
529 | | |
530 | 4 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
531 | | |
532 | 400k | for (int i = num_keys; i < num_keys * 2; i++) { |
533 | 400k | ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i)); |
534 | 400k | } |
535 | 4 | } |
536 | | // Tests universal compaction with trivial move enabled |
537 | 4 | TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) { |
538 | 4 | int32_t trivial_move = 0; |
539 | 4 | int32_t non_trivial_move = 0; |
540 | 4 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
541 | 4 | "DBImpl::BackgroundCompaction:TrivialMove", |
542 | 56 | [&](void* arg) { trivial_move++; }); |
543 | 4 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
544 | 72 | "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) { |
545 | 72 | non_trivial_move++; |
546 | 72 | ASSERT_TRUE(arg != nullptr); |
547 | 72 | int output_level = *(static_cast<int*>(arg)); |
548 | 72 | ASSERT_EQ(output_level, 0); |
549 | 72 | }); |
550 | 4 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
551 | | |
552 | 4 | Options options; |
553 | 4 | options.compaction_style = kCompactionStyleUniversal; |
554 | 4 | options.compaction_options_universal.allow_trivial_move = true; |
555 | 4 | options.num_levels = 3; |
556 | 4 | options.write_buffer_size = 100 << 10; // 100KB |
557 | 4 | options.level0_file_num_compaction_trigger = 3; |
558 | 4 | options.max_background_compactions = 2; |
559 | 4 | options.target_file_size_base = 32 * 1024; |
560 | 4 | options = CurrentOptions(options); |
561 | 4 | DestroyAndReopen(options); |
562 | 4 | CreateAndReopenWithCF({"pikachu"}, options); |
563 | | |
564 | | // Trigger compaction if size amplification exceeds 110% |
565 | 4 | options.compaction_options_universal.max_size_amplification_percent = 110; |
566 | 4 | options = CurrentOptions(options); |
567 | 4 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
568 | | |
569 | 4 | Random rnd(301); |
570 | 4 | int num_keys = 150000; |
571 | 600k | for (int i = 0; i < num_keys; i++) { |
572 | 600k | ASSERT_OK(Put(1, Key(i), Key(i))); |
573 | 600k | } |
574 | 4 | std::vector<std::string> values; |
575 | | |
576 | 4 | ASSERT_OK(Flush(1)); |
577 | 4 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
578 | | |
579 | 4 | ASSERT_GT(trivial_move, 0); |
580 | 4 | ASSERT_GT(non_trivial_move, 0); |
581 | | |
582 | 4 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); |
583 | 4 | } |
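 | |
 | | // A "trivial move" relocates an input file to the output level by only |
 | | // updating its level in the manifest, with no rewrite of the file itself; |
 | | // the sync points above count how often each path is taken, and the test |
 | | // only requires that both kinds of compaction occur at least once. |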
584 | | |
585 | | INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels, |
586 | | DBTestUniversalCompactionMultiLevels, |
587 | | ::testing::Combine(::testing::Values(3, 20), |
588 | | ::testing::Bool())); |
589 | | |
590 | | class DBTestUniversalCompactionParallel : |
591 | | public DBTestUniversalCompactionBase { |
592 | | public: |
593 | | DBTestUniversalCompactionParallel() : |
594 | | DBTestUniversalCompactionBase( |
595 | 4 | "/db_universal_compaction_parallel_test") {} |
596 | | }; |
597 | | |
598 | 4 | TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) { |
599 | 4 | Options options; |
600 | 4 | options.compaction_style = kCompactionStyleUniversal; |
601 | 4 | options.num_levels = num_levels_; |
602 | 4 | options.write_buffer_size = 1 << 10; // 1KB |
603 | 4 | options.level0_file_num_compaction_trigger = 3; |
604 | 4 | options.max_background_compactions = 3; |
605 | 4 | options.max_background_flushes = 3; |
606 | 4 | options.target_file_size_base = 1 * 1024; |
607 | 4 | options.compaction_options_universal.max_size_amplification_percent = 110; |
608 | 4 | options = CurrentOptions(options); |
609 | 4 | DestroyAndReopen(options); |
610 | 4 | CreateAndReopenWithCF({"pikachu"}, options); |
611 | | |
612 | | // Delay every compaction so multiple compactions will happen. |
613 | 4 | std::atomic<int> num_compactions_running(0); |
614 | 4 | std::atomic<bool> has_parallel(false); |
615 | 4 | rocksdb::SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Start", |
616 | 50 | [&](void* arg) { |
617 | 50 | if (num_compactions_running.fetch_add(1) > 0) { |
618 | 6 | has_parallel.store(true); |
619 | 6 | return; |
620 | 6 | } |
621 | 208 | for (int nwait = 0; nwait < 20000; nwait++) { |
622 | 208 | if (has_parallel.load() || num_compactions_running.load() > 1) { |
623 | 44 | has_parallel.store(true); |
624 | 44 | break; |
625 | 44 | } |
626 | 164 | env_->SleepForMicroseconds(1000); |
627 | 164 | } |
628 | 44 | }); |
629 | 4 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
630 | 4 | "CompactionJob::Run():End", |
631 | 50 | [&](void* arg) { num_compactions_running.fetch_add(-1); }); |
632 | 4 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
633 | | |
634 | 4 | options = CurrentOptions(options); |
635 | 4 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
636 | | |
637 | 4 | Random rnd(301); |
638 | 4 | int num_keys = 30000; |
639 | 160k | for (int i = 0; i < num_keys * 2; i++) { |
640 | 160k | ASSERT_OK(Put(1, Key(i % num_keys), Key(i))); |
641 | 160k | } |
642 | 2 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
643 | | |
644 | 2 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); |
645 | 2 | ASSERT_EQ(num_compactions_running.load(), 0); |
646 | 2 | ASSERT_TRUE(has_parallel.load()); |
647 | | |
648 | 60.0k | for (int i = num_keys; i < num_keys * 2; i++) { |
649 | 60.0k | ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i)); |
650 | 60.0k | } |
651 | | |
652 | | // Reopen and check. |
653 | 2 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
654 | 60.0k | for (int i = num_keys; i < num_keys * 2; i++) { |
655 | 60.0k | ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i)); |
656 | 60.0k | } |
657 | 2 | } |
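 | |
 | | // The Run():Start callback above parks the first compaction thread until |
 | | // a second one enters CompactionJob::Run() (bounded by 20000 sleeps of |
 | | // 1ms, i.e. ~20s), so the final asserts can claim that compactions truly |
 | | // overlapped rather than merely being scheduled back to back. |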
658 | | |
659 | | INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel, |
660 | | DBTestUniversalCompactionParallel, |
661 | | ::testing::Combine(::testing::Values(1, 10), |
662 | | ::testing::Bool())); |
663 | | |
664 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionOptions) { |
665 | 6 | Options options; |
666 | 6 | options.compaction_style = kCompactionStyleUniversal; |
667 | 6 | options.write_buffer_size = 105 << 10; // 105KB |
668 | 6 | options.arena_block_size = 4 << 10; // 4KB |
669 | 6 | options.target_file_size_base = 32 << 10; // 32KB |
670 | 6 | options.level0_file_num_compaction_trigger = 4; |
671 | 6 | options.num_levels = num_levels_; |
672 | 6 | options.compaction_options_universal.compression_size_percent = -1; |
673 | 6 | options = CurrentOptions(options); |
674 | 6 | DestroyAndReopen(options); |
675 | 6 | CreateAndReopenWithCF({"pikachu"}, options); |
676 | | |
677 | 6 | Random rnd(301); |
678 | 6 | int key_idx = 0; |
679 | | |
680 | 30 | for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { |
681 | | // Write 100KB (100 values, each 1K) |
682 | 2.42k | for (int i = 0; i < 100; i++) { |
683 | 2.40k | ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 990))); |
684 | 2.40k | key_idx++; |
685 | 2.40k | } |
686 | 24 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); |
687 | | |
688 | 24 | if (num < options.level0_file_num_compaction_trigger - 1) { |
689 | 18 | ASSERT_EQ(NumSortedRuns(1), num + 1); |
690 | 18 | } |
691 | 24 | } |
692 | | |
693 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
694 | 6 | ASSERT_EQ(NumSortedRuns(1), 1); |
695 | 6 | } |
696 | | |
697 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionStopStyleSimilarSize) { |
698 | 6 | Options options = CurrentOptions(); |
699 | 6 | options.compaction_style = kCompactionStyleUniversal; |
700 | 6 | options.write_buffer_size = 105 << 10; // 105KB |
701 | 6 | options.arena_block_size = 4 << 10; // 4KB |
702 | 6 | options.target_file_size_base = 32 << 10; // 32KB |
703 | | // trigger compaction if there are >= 4 files |
704 | 6 | options.level0_file_num_compaction_trigger = 4; |
705 | 6 | options.compaction_options_universal.size_ratio = 10; |
706 | 6 | options.compaction_options_universal.stop_style = |
707 | 6 | kCompactionStopStyleSimilarSize; |
708 | 6 | options.num_levels = num_levels_; |
709 | 6 | DestroyAndReopen(options); |
710 | | |
711 | 6 | Random rnd(301); |
712 | 6 | int key_idx = 0; |
713 | | |
714 | | // Stage 1: |
715 | | // Generate a set of files at level 0, but don't trigger level-0 |
716 | | // compaction. |
717 | 24 | for (int num = 0; num < options.level0_file_num_compaction_trigger - 1; |
718 | 18 | num++) { |
719 | | // Write 100KB (100 values, each 1K) |
720 | 1.81k | for (int i = 0; i < 100; i++) { |
721 | 1.80k | ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990))); |
722 | 1.80k | key_idx++; |
723 | 1.80k | } |
724 | 18 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); |
725 | 18 | ASSERT_EQ(NumSortedRuns(), num + 1); |
726 | 18 | } |
727 | | |
728 | | // Generate one more file at level-0, which should trigger level-0 |
729 | | // compaction. |
730 | 606 | for (int i = 0; i < 100; i++) { |
731 | 600 | ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990))); |
732 | 600 | key_idx++; |
733 | 600 | } |
734 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
735 | | // Suppose each file flushed from the memtable has size 1. Now we compact |
736 | | // level0_file_num_compaction_trigger = 4 files and should end up with one |
737 | | // big file of size 4. |
738 | 6 | ASSERT_EQ(NumSortedRuns(), 1); |
739 | | |
740 | | // Stage 2: |
741 | | // Now we have one file at level 0, with size 4. We also have some data in |
742 | | // mem table. Let's continue generating new files at level 0, but don't |
743 | | // trigger level-0 compaction. |
744 | | // First, clean up memtable before inserting new data. This will generate |
745 | | // a level-0 file, with size around 0.4 (according to previously written |
746 | | // data amount). |
747 | 6 | ASSERT_OK(dbfull()->Flush(FlushOptions())); |
748 | 12 | for (int num = 0; num < options.level0_file_num_compaction_trigger - 3; |
749 | 6 | num++) { |
750 | | // Write 100KB (100 values, each 1K) |
751 | 606 | for (int i = 0; i < 100; i++) { |
752 | 600 | ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990))); |
753 | 600 | key_idx++; |
754 | 600 | } |
755 | 6 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); |
756 | 6 | ASSERT_EQ(NumSortedRuns(), num + 3); |
757 | 6 | } |
758 | | |
759 | | // Generate one more file at level-0, which should trigger level-0 |
760 | | // compaction. |
761 | 606 | for (int i = 0; i < 100; i++) { |
762 | 600 | ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990))); |
763 | 600 | key_idx++; |
764 | 600 | } |
765 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
766 | | // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1. |
767 | | // After compaction, we should have 3 files, with size 4, 0.4, 2. |
768 | 6 | ASSERT_EQ(NumSortedRuns(), 3); |
769 | | // Stage 3: |
770 | | // Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one |
771 | | // more file at level-0, which should trigger level-0 compaction. |
772 | 606 | for (int i = 0; i < 100; i++) { |
773 | 600 | ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990))); |
774 | 600 | key_idx++; |
775 | 600 | } |
776 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
777 | | // Level-0 compaction is triggered, but no file will be picked up. |
778 | 6 | ASSERT_EQ(NumSortedRuns(), 4); |
779 | 6 | } |
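 | |
 | | // With kCompactionStopStyleSimilarSize the picker stops adding runs once |
 | | // the next candidate is no longer of similar size to what has been picked |
 | | // so far (rather than the default total-size stop style), which is why the |
 | | // 4 and 0.4 runs above survive while only the similarly sized runs merge. |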
780 | | |
781 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionCompressRatio1) { |
782 | 6 | if (!Snappy_Supported()) { |
783 | 0 | return; |
784 | 0 | } |
785 | | |
786 | 6 | Options options; |
787 | 6 | options.compaction_style = kCompactionStyleUniversal; |
788 | 6 | options.write_buffer_size = 100 << 10; // 100KB |
789 | 6 | options.target_file_size_base = 32 << 10; // 32KB |
790 | 6 | options.level0_file_num_compaction_trigger = 2; |
791 | 6 | options.num_levels = num_levels_; |
792 | 6 | options.compaction_options_universal.compression_size_percent = 70; |
793 | 6 | options = CurrentOptions(options); |
794 | 6 | DestroyAndReopen(options); |
795 | | |
796 | 6 | Random rnd(301); |
797 | 6 | int key_idx = 0; |
798 | | |
799 | | // The first compaction (2) is compressed. |
800 | 18 | for (int num = 0; num < 2; num++) { |
801 | | // Write 110KB (11 values, each 10K) |
802 | 144 | for (int i = 0; i < 11; i++) { |
803 | 132 | ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); |
804 | 132 | key_idx++; |
805 | 132 | } |
806 | 12 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); |
807 | 12 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
808 | 12 | } |
809 | 6 | ASSERT_LT(TotalSize(), 110000U * 2 * 0.9); |
810 | | |
811 | | // The second compaction (4) is compressed |
812 | 18 | for (int num = 0; num < 2; num++) { |
813 | | // Write 110KB (11 values, each 10K) |
814 | 144 | for (int i = 0; i < 11; i++) { |
815 | 132 | ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); |
816 | 132 | key_idx++; |
817 | 132 | } |
818 | 12 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); |
819 | 12 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
820 | 12 | } |
821 | 6 | ASSERT_LT(TotalSize(), 110000 * 4 * 0.9); |
822 | | |
823 | | // The third compaction (2 4) is also compressed, since at this point the |
824 | | // runs are (1 1 3.2) and 3.2/5.2 doesn't reach the compression ratio. |
825 | 18 | for (int num = 0; num < 2; num++) { |
826 | | // Write 110KB (11 values, each 10K) |
827 | 144 | for (int i = 0; i < 11; i++) { |
828 | 132 | ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); |
829 | 132 | key_idx++; |
830 | 132 | } |
831 | 12 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); |
832 | 12 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
833 | 12 | } |
834 | 6 | ASSERT_LT(TotalSize(), 110000 * 6 * 0.9); |
835 | | |
836 | | // When compaction proceeds up to (2 4 8), the newest portion of the |
837 | | // data is not compressed. |
838 | 54 | for (int num = 0; num < 8; num++) { |
839 | | // Write 110KB (11 values, each 10K) |
840 | 576 | for (int i = 0; i < 11; i++) { |
841 | 528 | ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); |
842 | 528 | key_idx++; |
843 | 528 | } |
844 | 48 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); |
845 | 48 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
846 | 48 | } |
847 | 6 | ASSERT_GT(TotalSize(), 110000 * 11 * 0.8 + 110000 * 2); |
848 | 6 | } |
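 | |
 | | // compression_size_percent = 70 roughly means: compress the oldest ~70% |
 | | // of the data and leave the newest ~30% uncompressed. Early compactions |
 | | // therefore shrink well below the raw written size, while the final |
 | | // (2 4 8) shape keeps a large uncompressed newest portion, so TotalSize() |
 | | // stays above 110000 * 11 * 0.8 + 110000 * 2. |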
849 | | |
850 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionCompressRatio2) { |
851 | 6 | if (!Snappy_Supported()) { |
852 | 0 | return; |
853 | 0 | } |
854 | 6 | Options options; |
855 | 6 | options.compaction_style = kCompactionStyleUniversal; |
856 | 6 | options.write_buffer_size = 100 << 10; // 100KB |
857 | 6 | options.target_file_size_base = 32 << 10; // 32KB |
858 | 6 | options.level0_file_num_compaction_trigger = 2; |
859 | 6 | options.num_levels = num_levels_; |
860 | 6 | options.compaction_options_universal.compression_size_percent = 95; |
861 | 6 | options = CurrentOptions(options); |
862 | 6 | DestroyAndReopen(options); |
863 | | |
864 | 6 | Random rnd(301); |
865 | 6 | int key_idx = 0; |
866 | | |
867 | | // When compaction proceeds up to (2 4 8), the newest portion is still |
868 | | // compressed, given the 95% size ratio to compress. |
869 | 90 | for (int num = 0; num < 14; num++) { |
870 | | // Write 120KB (12 values, each 10K) |
871 | 1.09k | for (int i = 0; i < 12; i++) { |
872 | 1.00k | ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); |
873 | 1.00k | key_idx++; |
874 | 1.00k | } |
875 | 84 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); |
876 | 84 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
877 | 84 | } |
878 | 6 | ASSERT_LT(TotalSize(), 120000U * 12 * 0.8 + 120000 * 2); |
879 | 6 | } |
880 | | |
881 | | // Test that checks trivial move in universal compaction |
882 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionTrivialMoveTest1) { |
883 | 6 | int32_t trivial_move = 0; |
884 | 6 | int32_t non_trivial_move = 0; |
885 | 6 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
886 | 6 | "DBImpl::BackgroundCompaction:TrivialMove", |
887 | 30 | [&](void* arg) { trivial_move++; }); |
888 | 6 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
889 | 305 | "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) { |
890 | 305 | non_trivial_move++; |
891 | 305 | ASSERT_TRUE(arg != nullptr); |
892 | 305 | int output_level = *(static_cast<int*>(arg)); |
893 | 305 | ASSERT_EQ(output_level, 0); |
894 | 305 | }); |
895 | 6 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
896 | | |
897 | 6 | Options options; |
898 | 6 | options.compaction_style = kCompactionStyleUniversal; |
899 | 6 | options.compaction_options_universal.allow_trivial_move = true; |
900 | 6 | options.num_levels = 2; |
901 | 6 | options.write_buffer_size = 100 << 10; // 100KB |
902 | 6 | options.level0_file_num_compaction_trigger = 3; |
903 | 6 | options.max_background_compactions = 1; |
904 | 6 | options.target_file_size_base = 32 * 1024; |
905 | 6 | options = CurrentOptions(options); |
906 | 6 | DestroyAndReopen(options); |
907 | 6 | CreateAndReopenWithCF({"pikachu"}, options); |
908 | | |
909 | | // Trigger compaction if size amplification exceeds 110% |
910 | 6 | options.compaction_options_universal.max_size_amplification_percent = 110; |
911 | 6 | options = CurrentOptions(options); |
912 | 6 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
913 | | |
914 | 6 | Random rnd(301); |
915 | 6 | int num_keys = 250000; |
916 | 1.50M | for (int i = 0; i < num_keys; i++) { |
917 | 1.50M | ASSERT_OK(Put(1, Key(i), Key(i))); |
918 | 1.50M | } |
919 | 6 | std::vector<std::string> values; |
920 | | |
921 | 6 | ASSERT_OK(Flush(1)); |
922 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
923 | | |
924 | 6 | ASSERT_GT(trivial_move, 0); |
925 | 6 | ASSERT_GT(non_trivial_move, 0); |
926 | | |
927 | 6 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); |
928 | 6 | } |
929 | | // Test that checks trivial move in universal compaction |
930 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionTrivialMoveTest2) { |
931 | 6 | int32_t trivial_move = 0; |
932 | 6 | int32_t non_trivial_move = 0; |
933 | 6 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
934 | 6 | "DBImpl::BackgroundCompaction:TrivialMove", |
935 | 138 | [&](void* arg) { trivial_move++; }); |
936 | 6 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
937 | 6 | "DBImpl::BackgroundCompaction:NonTrivial", |
938 | 0 | [&](void* arg) { non_trivial_move++; }); |
939 | | |
940 | 6 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
941 | | |
942 | 6 | Options options; |
943 | 6 | options.compaction_style = kCompactionStyleUniversal; |
944 | 6 | options.compaction_options_universal.allow_trivial_move = true; |
945 | 6 | options.num_levels = 15; |
946 | 6 | options.write_buffer_size = 100 << 10; // 100KB |
947 | 6 | options.level0_file_num_compaction_trigger = 8; |
948 | 6 | options.max_background_compactions = 4; |
949 | 6 | options.target_file_size_base = 64 * 1024; |
950 | 6 | options = CurrentOptions(options); |
951 | 6 | DestroyAndReopen(options); |
952 | 6 | CreateAndReopenWithCF({"pikachu"}, options); |
953 | | |
954 | | // Trigger compaction if size amplification exceeds 110% |
955 | 6 | options.compaction_options_universal.max_size_amplification_percent = 110; |
956 | 6 | options = CurrentOptions(options); |
957 | 6 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
958 | | |
959 | 6 | Random rnd(301); |
960 | 6 | int num_keys = 500000; |
961 | 1.49M | for (int i = 0; i < num_keys; i++) { |
962 | 1.49M | ASSERT_OK(Put(1, Key(i), Key(i))); |
963 | 1.49M | } |
964 | 0 | std::vector<std::string> values; |
965 | |
966 | 0 | ASSERT_OK(Flush(1)); |
967 | 0 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
968 | |
969 | 0 | ASSERT_GT(trivial_move, 0); |
970 | 0 | ASSERT_EQ(non_trivial_move, 0); |
971 | |
972 | 0 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); |
973 | 0 | } |
974 | | |
975 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionFourPaths) { |
976 | 6 | Options options; |
977 | 6 | options.db_paths.emplace_back(dbname_, 300 * 1024); |
978 | 6 | options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024); |
979 | 6 | options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024); |
980 | 6 | options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024); |
981 | 6 | options.memtable_factory.reset( |
982 | 6 | new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1)); |
983 | 6 | options.compaction_style = kCompactionStyleUniversal; |
984 | 6 | options.compaction_options_universal.size_ratio = 5; |
985 | 6 | options.write_buffer_size = 110 << 10; // 110KB |
986 | 6 | options.arena_block_size = 4 << 10; |
987 | 6 | options.level0_file_num_compaction_trigger = 2; |
988 | 6 | options.num_levels = 1; |
989 | 6 | options = CurrentOptions(options); |
990 | | |
991 | 6 | ASSERT_OK(DeleteRecursively(env_, options.db_paths[1].path)); |
992 | 6 | Reopen(options); |
993 | | |
994 | 6 | Random rnd(301); |
995 | 6 | int key_idx = 0; |
996 | | |
997 | | // The first three 110KB files do not leave the first path. |
998 | | // After that, the runs are (100K, 200K). |
999 | 24 | for (int num = 0; num < 3; num++) { |
1000 | 18 | GenerateNewFile(&rnd, &key_idx); |
1001 | 18 | } |
1002 | | |
1003 | | // Another 110KB file triggers a compaction to a 400K file in the third path. |
1004 | 6 | GenerateNewFile(&rnd, &key_idx); |
1005 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path)); |
1006 | | |
1007 | | // (1, 4) |
1008 | 6 | GenerateNewFile(&rnd, &key_idx); |
1009 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path)); |
1010 | 6 | ASSERT_EQ(1, GetSstFileCount(dbname_)); |
1011 | | |
1012 | | // (1,1,4) -> (2, 4) |
1013 | 6 | GenerateNewFile(&rnd, &key_idx); |
1014 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path)); |
1015 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1016 | 6 | ASSERT_EQ(0, GetSstFileCount(dbname_)); |
1017 | | |
1018 | | // (1, 2, 4) |
1019 | 6 | GenerateNewFile(&rnd, &key_idx); |
1020 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path)); |
1021 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1022 | 6 | ASSERT_EQ(1, GetSstFileCount(dbname_)); |
1023 | | |
1024 | | // (1, 1, 2, 4) -> (8) |
1025 | 6 | GenerateNewFile(&rnd, &key_idx); |
1026 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path)); |
1027 | | |
1028 | | // (1, 8) |
1029 | 6 | GenerateNewFile(&rnd, &key_idx); |
1030 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path)); |
1031 | 6 | ASSERT_EQ(1, GetSstFileCount(dbname_)); |
1032 | | |
1033 | | // (1, 1, 8) -> (2, 8) |
1034 | 6 | GenerateNewFile(&rnd, &key_idx); |
1035 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path)); |
1036 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1037 | | |
1038 | | // (1, 2, 8) |
1039 | 6 | GenerateNewFile(&rnd, &key_idx); |
1040 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path)); |
1041 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1042 | 6 | ASSERT_EQ(1, GetSstFileCount(dbname_)); |
1043 | | |
1044 | | // (1, 1, 2, 8) -> (4, 8) |
1045 | 6 | GenerateNewFile(&rnd, &key_idx); |
1046 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path)); |
1047 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path)); |
1048 | | |
1049 | | // (1, 4, 8) |
1050 | 6 | GenerateNewFile(&rnd, &key_idx); |
1051 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path)); |
1052 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path)); |
1053 | 6 | ASSERT_EQ(1, GetSstFileCount(dbname_)); |
1054 | | |
1055 | 7.80k | for (int i = 0; i < key_idx; i++) { |
1056 | 7.80k | auto v = Get(Key(i)); |
1057 | 7.80k | ASSERT_NE(v, "NOT_FOUND"); |
1058 | 7.80k | ASSERT_TRUE(v.size() == 1 || v.size() == 990); |
1059 | 7.80k | } |
1060 | | |
1061 | 6 | Reopen(options); |
1062 | | |
1063 | 7.80k | for (int i = 0; i < key_idx; i++) { |
1064 | 7.80k | auto v = Get(Key(i)); |
1065 | 7.80k | ASSERT_NE(v, "NOT_FOUND"); |
1066 | 7.80k | ASSERT_TRUE(v.size() == 1 || v.size() == 990); |
1067 | 7.80k | } |
1068 | | |
1069 | 6 | Destroy(options); |
1070 | 6 | } |
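 | |
 | | // Placement rule being exercised (paraphrased from the db_paths |
 | | // contract): a compaction output is written to the first listed path with |
 | | // room under its target size, so the progressively larger merged runs |
 | | // migrate toward the later, larger-target paths while fresh flushes stay |
 | | // in the first path (dbname_). |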
1071 | | |
1072 | 6 | TEST_P(DBTestUniversalCompactionWithParam, IncreaseUniversalCompactionNumLevels) { |
1073 | 18 | std::function<void(int)> verify_func = [&](int num_keys_in_db) { |
1074 | 18 | std::string keys_in_db; |
1075 | 18 | Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]); |
1076 | 9.63k | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { |
1077 | 9.61k | keys_in_db.append(iter->key().ToString()); |
1078 | 9.61k | keys_in_db.push_back(','); |
1079 | 9.61k | } |
1080 | 18 | delete iter; |
1081 | | |
1082 | 18 | std::string expected_keys; |
1083 | 9.63k | for (int i = 0; i <= num_keys_in_db; i++) { |
1084 | 9.61k | expected_keys.append(Key(i)); |
1085 | 9.61k | expected_keys.push_back(','); |
1086 | 9.61k | } |
1087 | | |
1088 | 18 | ASSERT_EQ(keys_in_db, expected_keys); |
1089 | 18 | }; |
1090 | | |
1091 | 6 | Random rnd(301); |
1092 | 6 | int max_key1 = 200; |
1093 | 6 | int max_key2 = 600; |
1094 | 6 | int max_key3 = 800; |
1095 | 6 | const int KNumKeysPerFile = 10; |
1096 | | |
1097 | | // Stage 1: open a DB with universal compaction, num_levels=1 |
1098 | 6 | Options options = CurrentOptions(); |
1099 | 6 | options.compaction_style = kCompactionStyleUniversal; |
1100 | 6 | options.num_levels = 1; |
1101 | 6 | options.write_buffer_size = 200 << 10; // 200KB |
1102 | 6 | options.level0_file_num_compaction_trigger = 3; |
1103 | 6 | options.memtable_factory.reset(new SpecialSkipListFactory(KNumKeysPerFile)); |
1104 | 6 | options = CurrentOptions(options); |
1105 | 6 | CreateAndReopenWithCF({"pikachu"}, options); |
1106 | | |
1107 | 1.21k | for (int i = 0; i <= max_key1; i++) { |
1108 | | // each value is 10K |
1109 | 1.20k | ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000))); |
1110 | 1.20k | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); |
1111 | 1.20k | } |
1112 | 6 | ASSERT_OK(Flush(1)); |
1113 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
1114 | | |
1115 | | // Stage 2: reopen with universal compaction, num_levels=4 |
1116 | 6 | options.compaction_style = kCompactionStyleUniversal; |
1117 | 6 | options.num_levels = 4; |
1118 | 6 | options = CurrentOptions(options); |
1119 | 6 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
1120 | | |
1121 | 6 | verify_func(max_key1); |
1122 | | |
1123 | | // Insert more keys |
1124 | 2.40k | for (int i = max_key1 + 1; i <= max_key2; i++) { |
1125 | | // each value is 10K |
1126 | 2.40k | ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000))); |
1127 | 2.40k | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); |
1128 | 2.40k | } |
1129 | 6 | ASSERT_OK(Flush(1)); |
1130 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
1131 | | |
1132 | 6 | verify_func(max_key2); |
1133 | | // Compaction to non-L0 has happened. |
1134 | 6 | ASSERT_GT(NumTableFilesAtLevel(options.num_levels - 1, 1), 0); |
1135 | | |
1136 | | // Stage 3: Revert to num_levels=1: compact all data back to level 0 first. |
1137 | 6 | options.num_levels = 4; |
1138 | 6 | options.target_file_size_base = INT_MAX; |
1139 | 6 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
1140 | | // Compact all to level 0 |
1141 | 6 | CompactRangeOptions compact_options; |
1142 | 6 | compact_options.change_level = true; |
1143 | 6 | compact_options.target_level = 0; |
1144 | 6 | compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; |
1145 | 6 | ASSERT_OK(dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr)); |
1146 | | // Need to restart once to remove higher-level records from the manifest. |
1147 | 6 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
1148 | | // Final reopen |
1149 | 6 | options.compaction_style = kCompactionStyleUniversal; |
1150 | 6 | options.num_levels = 1; |
1151 | 6 | options = CurrentOptions(options); |
1152 | 6 | ReopenWithColumnFamilies({"default", "pikachu"}, options); |
1153 | | |
1154 | | // Insert more keys |
1155 | 1.20k | for (int i = max_key2 + 1; i <= max_key3; i++) { |
1156 | | // each value is 10K |
1157 | 1.20k | ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000))); |
1158 | 1.20k | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); |
1159 | 1.20k | } |
1160 | 6 | ASSERT_OK(Flush(1)); |
1161 | 6 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
1162 | 6 | verify_func(max_key3); |
1163 | 6 | } |
1164 | | |
1165 | | |
1166 | 6 | TEST_P(DBTestUniversalCompactionWithParam, UniversalCompactionSecondPathRatio) { |
1167 | 6 | if (!Snappy_Supported()) { |
1168 | 0 | return; |
1169 | 0 | } |
1170 | 6 | Options options; |
1171 | 6 | options.db_paths.emplace_back(dbname_, 500 * 1024); |
1172 | 6 | options.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024); |
1173 | 6 | options.compaction_style = kCompactionStyleUniversal; |
1174 | 6 | options.compaction_options_universal.size_ratio = 5; |
1175 | 6 | options.write_buffer_size = 110 << 10; // 110KB |
1176 | 6 | options.arena_block_size = 4 << 10; // 4KB |
1178 | 6 | options.level0_file_num_compaction_trigger = 2; |
1179 | 6 | options.num_levels = 1; |
1180 | 6 | options.memtable_factory.reset( |
1181 | 6 | new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1)); |
1182 | 6 | options = CurrentOptions(options); |
1183 | | |
1184 | 6 | ASSERT_OK(DeleteRecursively(env_, options.db_paths[1].path)); |
1185 | 6 | Reopen(options); |
1186 | | |
1187 | 6 | Random rnd(301); |
1188 | 6 | int key_idx = 0; |
1189 | | |
1190 | | // The first three 110KB files do not go to the second path. |
1191 | | // After that, the runs are (100K, 200K). |
1192 | 24 | for (int num = 0; num < 3; num++) { |
1193 | 18 | GenerateNewFile(&rnd, &key_idx); |
1194 | 18 | } |
1195 | | |
1196 | | // Another 110KB triggers a compaction to 400K file to second path |
1197 | 6 | GenerateNewFile(&rnd, &key_idx); |
1198 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1199 | | |
1200 | | // (1, 4) |
1201 | 6 | GenerateNewFile(&rnd, &key_idx); |
1202 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1203 | 6 | ASSERT_EQ(1, GetSstFileCount(dbname_)); |
1204 | | |
1205 | | // (1,1,4) -> (2, 4) |
1206 | 6 | GenerateNewFile(&rnd, &key_idx); |
1207 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1208 | 6 | ASSERT_EQ(1, GetSstFileCount(dbname_)); |
1209 | | |
1210 | | // (1, 2, 4) |
1211 | 6 | GenerateNewFile(&rnd, &key_idx); |
1212 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1213 | 6 | ASSERT_EQ(2, GetSstFileCount(dbname_)); |
1214 | | |
1215 | | // (1, 1, 2, 4) -> (8) |
1216 | 6 | GenerateNewFile(&rnd, &key_idx); |
1217 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1218 | 6 | ASSERT_EQ(0, GetSstFileCount(dbname_)); |
1219 | | |
1220 | | // (1, 8) |
1221 | 6 | GenerateNewFile(&rnd, &key_idx); |
1222 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1223 | 6 | ASSERT_EQ(1, GetSstFileCount(dbname_)); |
1224 | | |
1225 | | // (1, 1, 8) -> (2, 8) |
1226 | 6 | GenerateNewFile(&rnd, &key_idx); |
1227 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1228 | 6 | ASSERT_EQ(1, GetSstFileCount(dbname_)); |
1229 | | |
1230 | | // (1, 2, 8) |
1231 | 6 | GenerateNewFile(&rnd, &key_idx); |
1232 | 6 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1233 | 6 | ASSERT_EQ(2, GetSstFileCount(dbname_)); |
1234 | | |
1235 | | // (1, 1, 2, 8) -> (4, 8) |
1236 | 6 | GenerateNewFile(&rnd, &key_idx); |
1237 | 6 | ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path)); |
1238 | 6 | ASSERT_EQ(0, GetSstFileCount(dbname_)); |
1239 | | |
1240 | | // (1, 4, 8) |
1241 | 6 | GenerateNewFile(&rnd, &key_idx); |
1242 | 6 | ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path)); |
1243 | 6 | ASSERT_EQ(1, GetSstFileCount(dbname_)); |
1244 | | |
1245 | 7.80k | for (int i = 0; i < key_idx; i++) { |
1246 | 7.80k | auto v = Get(Key(i)); |
1247 | 7.80k | ASSERT_NE(v, "NOT_FOUND"); |
1248 | 7.80k | ASSERT_TRUE(v.size() == 1 || v.size() == 990); |
1249 | 7.80k | } |
1250 | | |
1251 | 6 | Reopen(options); |
1252 | | |
1253 | 7.80k | for (int i = 0; i < key_idx; i++) { |
1254 | 7.80k | auto v = Get(Key(i)); |
1255 | 7.80k | ASSERT_NE(v, "NOT_FOUND"); |
1256 | 7.80k | ASSERT_TRUE(v.size() == 1 || v.size() == 990); |
1257 | 7.80k | } |
1258 | | |
1259 | 6 | Destroy(options); |
1260 | 6 | } |
1261 | | |
1262 | | INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompactionWithParam, |
1263 | | ::testing::Combine(::testing::Values(1, 3, 5), |
1264 | | ::testing::Bool())); |
1265 | | |
1266 | | class DBTestUniversalManualCompactionOutputPathId |
1267 | | : public DBTestUniversalCompactionBase { |
1268 | | public: |
1269 | | DBTestUniversalManualCompactionOutputPathId() : |
1270 | | DBTestUniversalCompactionBase( |
1271 | 4 | "/db_universal_compaction_manual_pid_test") {} |
1272 | | }; |
1273 | | |
1274 | | TEST_P(DBTestUniversalManualCompactionOutputPathId, |
1275 | 4 | ManualCompactionOutputPathId) { |
1276 | 4 | Options options = CurrentOptions(); |
1277 | 4 | options.create_if_missing = true; |
1278 | 4 | options.db_paths.emplace_back(dbname_, 1000000000); |
1279 | 4 | options.db_paths.emplace_back(dbname_ + "_2", 1000000000); |
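 | | // Two paths are configured, so valid target_path_id values are 0 and 1;
 | | // CompactRange's target_path_id chooses which of them receives the output.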
1280 | 4 | options.compaction_style = kCompactionStyleUniversal; |
1281 | 4 | options.num_levels = num_levels_; |
1282 | 4 | options.target_file_size_base = 1 << 30; // Big enough that compaction outputs are never split by size.
1283 | 4 | options.level0_file_num_compaction_trigger = 10; |
1284 | 4 | Destroy(options); |
1285 | 4 | DestroyAndReopen(options); |
1286 | 4 | CreateAndReopenWithCF({"pikachu"}, options); |
1287 | 4 | MakeTables(3, "p", "q", 1); |
1288 | 4 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
1289 | 4 | ASSERT_EQ(2, TotalLiveFiles(1)); |
1290 | 4 | ASSERT_EQ(2, GetSstFileCount(options.db_paths[0].path)); |
1291 | 4 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path)); |
1292 | | |
1293 | | // Full compaction to DB path 1
1294 | 4 | CompactRangeOptions compact_options; |
1295 | 4 | compact_options.target_path_id = 1; |
1296 | 4 | compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; |
1297 | 4 | ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); |
1298 | 4 | ASSERT_EQ(1, TotalLiveFiles(1)); |
1299 | 4 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); |
1300 | 4 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1301 | | |
1302 | 4 | ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); |
1303 | 4 | ASSERT_EQ(1, TotalLiveFiles(1)); |
1304 | 4 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); |
1305 | 4 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1306 | | |
1307 | 4 | MakeTables(1, "p", "q", 1); |
1308 | 4 | ASSERT_EQ(2, TotalLiveFiles(1)); |
1309 | 4 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); |
1310 | 4 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1311 | | |
1312 | 4 | ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); |
1313 | 4 | ASSERT_EQ(2, TotalLiveFiles(1)); |
1314 | 4 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); |
1315 | 4 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); |
1316 | | |
1317 | | // Full compaction to DB path 0 |
1318 | 4 | compact_options.target_path_id = 0; |
1319 | 4 | compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; |
1320 | 4 | ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); |
1321 | 4 | ASSERT_EQ(1, TotalLiveFiles(1)); |
1322 | 4 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); |
1323 | 4 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path)); |
1324 | | |
1325 | | // Compacting to an invalid path ID must fail (only paths 0 and 1 are configured)
1326 | 4 | compact_options.target_path_id = 2; |
1327 | 4 | compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; |
1328 | 4 | ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr) |
1329 | 4 | .IsInvalidArgument()); |
1330 | 4 | } |
1331 | | |
1332 | | INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId, |
1333 | | DBTestUniversalManualCompactionOutputPathId, |
1334 | | ::testing::Combine(::testing::Values(1, 8), |
1335 | | ::testing::Bool())); |
1336 | | |
1337 | | class DBTestUniversalCompaction : public DBTestBase { |
1338 | | public: |
1339 | 2 | DBTestUniversalCompaction() : DBTestBase("/db_universal_compaction_test") {} |
1340 | | void GenerateFilesAndCheckCompactionResult( |
1341 | | const Options& options, const std::vector<size_t>& keys_per_file, int value_size, |
1342 | | int num_output_files); |
1343 | | }; |
1344 | | |
1345 | | void DBTestUniversalCompaction::GenerateFilesAndCheckCompactionResult( |
1346 | | const Options& options, const std::vector<size_t>& keys_per_file, int value_size, |
1347 | 9 | int num_output_files) { |
1348 | 9 | DestroyAndReopen(options); |
1349 | | |
1350 | 9 | ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "true"}})); |
1351 | | |
1352 | 9 | LOG(INFO) << "Generating files with key counts: " << yb::ToString(keys_per_file);
1353 | | |
1354 | 9 | Random rnd(301); |
1355 | 9 | int key_idx = 0; |
1356 | | |
1357 | 57 | for (size_t num = 0; num < keys_per_file.size(); num++) { |
1358 | 2.47k | for (size_t i = 0; i < keys_per_file[num]; i++) { |
1359 | 2.42k | ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, value_size))); |
1360 | 2.42k | key_idx++; |
1361 | 2.42k | } |
1362 | 48 | ASSERT_OK(Flush()); |
1363 | 48 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); |
1364 | 48 | ASSERT_EQ(NumSortedRuns(0), num + 1); |
1365 | 48 | } |
1366 | | |
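 | | // With all files in place, let the universal compaction picker run once and
 | | // check how many sorted runs it leaves behind.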
1367 | 9 | ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()})); |
1368 | | |
1369 | 9 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
1370 | | |
1371 | 9 | ASSERT_EQ(NumSortedRuns(0), num_output_files); |
1372 | 9 | } |
1373 | | |
1374 | 1 | TEST_F(DBTestUniversalCompaction, DontDeleteOutput) { |
1375 | 1 | Options options; |
1376 | 1 | options.env = env_; |
1377 | 1 | options.create_if_missing = true; |
1378 | 1 | DestroyAndReopen(options); |
1379 | | |
1380 | 1 | std::atomic<bool> stop_requested(false); |
1381 | | |
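 | | // Background thread that repeatedly collects and purges obsolete files,
 | | // racing against the flushes and compactions issued below.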
1382 | 1 | auto purge_thread = std::thread([this, &stop_requested] { |
1383 | 14.0k | while (!stop_requested) { |
1384 | 14.0k | JobContext job_context(0); |
1385 | 14.0k | dbfull()->TEST_LockMutex(); |
1386 | 14.0k | dbfull()->FindObsoleteFiles(&job_context, true /*force*/); |
1387 | 14.0k | dbfull()->TEST_UnlockMutex(); |
1388 | 14.0k | dbfull()->PurgeObsoleteFiles(job_context); |
1389 | 14.0k | job_context.Clean(); |
1390 | 14.0k | } |
1391 | 1 | }); |
1392 | | |
1393 | 301 | for (int iter = 0; iter < 300; ++iter) { |
1394 | 900 | for (int i = 0; i < 2; ++i) { |
1395 | 600 | ASSERT_OK(Put("a", "begin")); |
1396 | 600 | ASSERT_OK(Put("z", "end")); |
1397 | 600 | ASSERT_OK(Flush()); |
1398 | 600 | } |
1399 | | |
1400 | | // If output files were not locked while in use, PurgeObsoleteFiles() would delete the file
1401 | | // that Flush/Compaction just created, causing an error like:
1402 | | // /tmp/rocksdbtest-1552237650/db_test/000009.sst: No such file or directory |
1403 | 300 | Compact("a", "b"); |
1404 | 300 | } |
1405 | | |
1406 | 1 | stop_requested = true; |
1407 | 1 | purge_thread.join(); |
1408 | 1 | } |
1409 | | |
1410 | 1 | TEST_F(DBTestUniversalCompaction, IncludeFilesSmallerThanThreshold) { |
1411 | 1 | const auto value_size = 10_KB; |
1412 | 1 | Options options; |
1413 | 1 | options.compaction_style = kCompactionStyleUniversal; |
1414 | 1 | options.num_levels = 1; |
1415 | | // Make write_buffer_size high to avoid auto flush. |
1416 | 1 | options.write_buffer_size = 10000 * value_size; |
1417 | 1 | options.level0_file_num_compaction_trigger = 5; |
1418 | | // Set high percentage to avoid triggering compactions based on size amplification for this test. |
1419 | 1 | options.compaction_options_universal.max_size_amplification_percent = 10000; |
1420 | 1 | options.compaction_options_universal.stop_style = kCompactionStopStyleTotalSize; |
1421 | 1 | options.compaction_options_universal.size_ratio = 20; |
1422 | 1 | options.compaction_options_universal.always_include_size_threshold = 10 * value_size; |
1423 | 1 | options.compaction_options_universal.min_merge_width = 4; |
1424 | 1 | options = CurrentOptions(options); |
1425 | | |
1426 | | // A sequence of SST files matches the read-amplification compaction rule if each earlier file
1427 | | // is smaller than <sum of newer file sizes> * (100 + size_ratio) / 100, or smaller than
1428 | | // always_include_size_threshold. See UniversalCompactionPicker::PickCompactionUniversalReadAmp.
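 | | // A minimal sketch of that rule (editor's illustration, not the picker's actual code),
 | | // assuming file sizes in bytes and the universal options configured above:
 | | const auto matches_read_amp_rule = [&](uint64_t earlier_size, uint64_t sum_newer_sizes) {
 | |   const auto& u = options.compaction_options_universal;
 | |   return earlier_size < sum_newer_sizes * (100 + u.size_ratio) / 100 ||
 | |          earlier_size < u.always_include_size_threshold;
 | | };
 | | (void) matches_read_amp_rule;  // Not executed here; the assertions below exercise the real picker.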
1429 | | |
1430 | | // Should be compacted into 2 files since 150 > 1.2 * (10+11+25+55) = 121.2.
1431 | 1 | GenerateFilesAndCheckCompactionResult(options, {150, 55, 25, 11, 10}, value_size, 2); |
1432 | | |
1433 | | // Should be compacted into 1 file since the whole file sequence matches size_ratio
1434 | | // (each earlier file is less than 1.2 * <sum of newer files>). |
1435 | 1 | GenerateFilesAndCheckCompactionResult(options, {120, 55, 25, 11, 10}, value_size, 1); |
1436 | | |
1437 | | // No compaction should happen since 60 > 1.2*(10+11+25) = 55.2. |
1438 | 1 | GenerateFilesAndCheckCompactionResult(options, {120, 60, 25, 11, 10}, value_size, 5); |
1439 | | |
1440 | 1 | options.compaction_options_universal.always_include_size_threshold = 35 * value_size; |
1441 | | |
1442 | | // No compaction should happen even with the higher threshold: only 3 files are below it,
1443 | | // fewer than min_merge_width.
1443 | 1 | GenerateFilesAndCheckCompactionResult(options, {120, 60, 25, 11, 10}, value_size, 5); |
1444 | | |
1445 | | // No compaction should happen since each earlier file is more than 1.2 * <sum of newer files> |
1446 | | // and only 3 files are smaller than the threshold, fewer than min_merge_width = 4.
1447 | 1 | GenerateFilesAndCheckCompactionResult(options, {100, 40, 16, 8, 4}, value_size, 5); |
1448 | | |
1449 | | // Should be compacted into 1 file since all files are smaller than threshold. |
1450 | 1 | GenerateFilesAndCheckCompactionResult(options, {25, 10, 4, 2, 1}, value_size, 1); |
1451 | | |
1452 | | // Should be compacted into 1 file since {180, 80, 40} matches size_ratio and {25, 10} are smaller |
1453 | | // than threshold. |
1454 | 1 | GenerateFilesAndCheckCompactionResult(options, {180, 80, 40, 25, 10}, value_size, 1); |
1455 | | |
1456 | | // Should be compacted into 2 files since {80, 40} matches size_ratio and {25, 10} are
1457 | | // smaller than the threshold, while 200 > 1.2*(10+25+40+80) = 186 and so stays uncompacted.
1458 | 1 | GenerateFilesAndCheckCompactionResult(options, {200, 80, 40, 25, 10}, value_size, 2); |
1459 | | |
1460 | | // Should be compacted into 1 file since all files are smaller than threshold. |
1461 | 1 | const std::vector<size_t> file_sizes = {350, 150, 60, 25, 10, 4, 2, 1}; |
1462 | 1 | options.compaction_options_universal.always_include_size_threshold = |
1463 | 1 | *std::max_element(file_sizes.begin(), file_sizes.end()) * value_size * 1.2; |
1464 | 1 | GenerateFilesAndCheckCompactionResult(options, file_sizes, value_size, 1); |
1465 | 1 | } |
1466 | | |
1467 | | } // namespace rocksdb |
1468 | | |
1469 | | #endif // !defined(ROCKSDB_LITE) |
1470 | | |
1471 | 13.2k | int main(int argc, char** argv) { |
1472 | 13.2k | #if !defined(ROCKSDB_LITE) |
1473 | 13.2k | rocksdb::port::InstallStackTraceHandler(); |
1474 | 13.2k | ::testing::InitGoogleTest(&argc, argv); |
1475 | 13.2k | return RUN_ALL_TESTS(); |
1476 | | #else |
1477 | | return 0; |
1478 | | #endif |
1479 | 13.2k | } |