/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_universal_compaction_deletion_test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/rocksdb/db/db_test_util.h" |
15 | | #include "yb/rocksdb/util/testutil.h" |
16 | | |
17 | | #include "yb/util/path_util.h" |
18 | | #include "yb/util/test_macros.h" |
19 | | #include "yb/util/test_util.h" |
20 | | |
21 | | using namespace std::literals; |
22 | | |
23 | | namespace rocksdb { |
24 | | |
namespace {

// Number of flushed files used as the level-0 compaction trigger by the tests in this file.
constexpr int kNumCompactionTrigger = 4;

// Timeout handed to the LoggedWaitFor calls in this file.
constexpr std::chrono::seconds kWaitTimeout{60};

}  // namespace
29 | | |
30 | | class OnFileCreationListener : public EventListener { |
31 | | public: |
32 | 3 | OnFileCreationListener() {} |
33 | | |
34 | | void OnTableFileCreated( |
35 | 26 | const TableFileCreationInfo& info) override { |
36 | 26 | LOG(INFO) << "Created SST file: " << info.file_path; |
37 | | |
38 | 26 | auto file_name = yb::BaseName(info.file_path); |
39 | | |
40 | 26 | bool do_pause; |
41 | 26 | { |
42 | 26 | std::lock_guard<std::mutex> l(mutex_); |
43 | 26 | created_file_names_.push_back(file_name); |
44 | 26 | do_pause = created_file_names_.size() > pause_after_num_files_created_; |
45 | 26 | } |
46 | | |
47 | 26 | if (do_pause) { |
48 | 5 | ASSERT_OK(yb::LoggedWaitFor( |
49 | 5 | [this, &file_name] { |
50 | 5 | std::lock_guard<std::mutex> l(mutex_); |
51 | 5 | return file_names_to_resume_.erase(file_name) > 0; |
52 | 5 | }, kWaitTimeout, yb::Format("Pausing on $0 ...", file_name))); |
53 | 5 | } |
54 | 26 | } |
55 | | |
56 | 4 | void SetPauseAfterFilesCreated(size_t n) { |
57 | 4 | pause_after_num_files_created_ = n; |
58 | 4 | } |
59 | | |
60 | | // Disable pausing newly created files, but will hold already paused ones until they are resumed |
61 | | // by ResumeFileName call. |
62 | 2 | void DisablePausing() { |
63 | 2 | pause_after_num_files_created_ = std::numeric_limits<size_t>::max(); |
64 | 2 | } |
65 | | |
66 | 5 | void ResumeFileName(const std::string& file_name) { |
67 | 5 | std::lock_guard<std::mutex> l(mutex_); |
68 | 5 | file_names_to_resume_.insert(file_name); |
69 | 5 | } |
70 | | |
71 | 0 | std::vector<std::string> CreatedFileNames() { |
72 | 0 | std::lock_guard<std::mutex> l(mutex_); |
73 | 0 | return created_file_names_; |
74 | 0 | } |
75 | | |
76 | 5 | const std::string& GetLastCreatedFileName() { |
77 | 5 | std::lock_guard<std::mutex> l(mutex_); |
78 | 5 | return created_file_names_.back(); |
79 | 5 | } |
80 | | |
81 | 50 | size_t NumFilesCreated() { |
82 | 50 | std::lock_guard<std::mutex> l(mutex_); |
83 | 50 | return created_file_names_.size(); |
84 | 50 | } |
85 | | |
86 | | private: |
87 | | std::atomic<size_t> pause_after_num_files_created_{std::numeric_limits<size_t>::max()}; |
88 | | std::mutex mutex_; |
89 | | std::unordered_set<std::string> file_names_to_resume_; |
90 | | std::vector<std::string> created_file_names_; |
91 | | }; |
92 | | |
93 | | class DBTestUniversalCompactionDeletion : public DBTestBase { |
94 | | public: |
95 | | DBTestUniversalCompactionDeletion() : |
96 | 3 | DBTestBase("/db_universal_compaction_deletion_test"), rnd_(301) {} |
97 | | |
98 | | // Creates SST file of size around, but not less than 1MB, uses key range |
99 | | // [num_sst_files_ * 50; num_sst_files_ * 50 + 100). |
100 | 21 | void CreateSstFile(bool do_flush = true) { |
101 | 2.12k | for (int j = 0; j < 100; ++j) { |
102 | 2.10k | ASSERT_OK(Put(Key(num_sst_files_ * 50 + j), RandomString(&rnd_, 10_KB))); |
103 | 2.10k | } |
104 | 21 | if (do_flush) { |
105 | 20 | ASSERT_OK(Flush()); |
106 | 20 | } |
107 | 21 | ++num_sst_files_; |
108 | 21 | } |
109 | | |
110 | 3 | Options CurrentOptions() { |
111 | 3 | Options options = DBTestBase::CurrentOptions(); |
112 | 3 | options.env = env_; |
113 | 3 | options.compaction_style = kCompactionStyleUniversal; |
114 | 3 | options.num_levels = 1; |
115 | 3 | options.write_buffer_size = 2_MB; |
116 | 3 | options.max_bytes_for_level_base = 1_MB; |
117 | 3 | options.level0_file_num_compaction_trigger = kNumCompactionTrigger; |
118 | 3 | options.max_background_flushes = 2; |
119 | 3 | options.max_background_compactions = 2; |
120 | 3 | options.listeners.push_back(file_create_listener_); |
121 | | |
122 | 3 | return options; |
123 | 3 | } |
124 | | |
125 | 4 | CHECKED_STATUS WaitForNumFilesCreated(const std::string& desc, size_t num_files) { |
126 | 4 | return yb::LoggedWaitFor( |
127 | 38 | [this, num_files] { return file_create_listener_->NumFilesCreated() >= num_files; }, |
128 | 4 | kWaitTimeout, desc); |
129 | 4 | } |
130 | | |
131 | | template <class FilePathsContainer> |
132 | | CHECKED_STATUS WaitFilePathsDeleted( |
133 | 3 | FilePathsContainer file_paths, const std::string& description) { |
134 | 3 | RETURN_NOT_OK_PREPEND( |
135 | 3 | yb::LoggedWaitFor( |
136 | 3 | [this, &file_paths] { |
137 | 3 | for (auto it = file_paths.begin(); it != file_paths.end();) { |
138 | 3 | if (env_->FileExists(*it).IsNotFound()) { |
139 | 3 | it = file_paths.erase(it); |
140 | 3 | } else { |
141 | 3 | ++it; |
142 | 3 | } |
143 | 3 | } |
144 | 3 | return file_paths.empty(); |
145 | 3 | }, |
146 | 3 | kWaitTimeout, yb::Format("Waiting for $0 to be deleted", description)), |
147 | 3 | yb::Format("$0 should be deleted: $1", description, file_paths)); |
148 | 3 | return Status::OK(); |
149 | 3 | } _ZN7rocksdb33DBTestUniversalCompactionDeletion20WaitFilePathsDeletedINSt3__16vectorINS2_12basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEENS7_IS9_EEEEEEN2yb6StatusET_RKS9_ Line | Count | Source | 133 | 2 | FilePathsContainer file_paths, const std::string& description) { | 134 | 2 | RETURN_NOT_OK_PREPEND( | 135 | 2 | yb::LoggedWaitFor( | 136 | 2 | [this, &file_paths] { | 137 | 2 | for (auto it = file_paths.begin(); it != file_paths.end();) { | 138 | 2 | if (env_->FileExists(*it).IsNotFound()) { | 139 | 2 | it = file_paths.erase(it); | 140 | 2 | } else { | 141 | 2 | ++it; | 142 | 2 | } | 143 | 2 | } | 144 | 2 | return file_paths.empty(); | 145 | 2 | }, | 146 | 2 | kWaitTimeout, yb::Format("Waiting for $0 to be deleted", description)), | 147 | 2 | yb::Format("$0 should be deleted: $1", description, file_paths)); | 148 | 2 | return Status::OK(); | 149 | 2 | } |
_ZN7rocksdb33DBTestUniversalCompactionDeletion20WaitFilePathsDeletedINSt3__113unordered_setINS2_12basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEENS2_4hashIS9_EENS2_8equal_toIS9_EENS7_IS9_EEEEEEN2yb6StatusET_RKS9_ Line | Count | Source | 133 | 1 | FilePathsContainer file_paths, const std::string& description) { | 134 | 1 | RETURN_NOT_OK_PREPEND( | 135 | 1 | yb::LoggedWaitFor( | 136 | 1 | [this, &file_paths] { | 137 | 1 | for (auto it = file_paths.begin(); it != file_paths.end();) { | 138 | 1 | if (env_->FileExists(*it).IsNotFound()) { | 139 | 1 | it = file_paths.erase(it); | 140 | 1 | } else { | 141 | 1 | ++it; | 142 | 1 | } | 143 | 1 | } | 144 | 1 | return file_paths.empty(); | 145 | 1 | }, | 146 | 1 | kWaitTimeout, yb::Format("Waiting for $0 to be deleted", description)), | 147 | 1 | yb::Format("$0 should be deleted: $1", description, file_paths)); | 148 | 1 | return Status::OK(); | 149 | 1 | } |
|
150 | | |
151 | | CHECKED_STATUS WaitLiveFilesDeleted( |
152 | 2 | const std::vector<LiveFileMetaData>& files, const std::string& description) { |
153 | 2 | std::vector<std::string> file_paths; |
154 | 8 | for (const auto& file : files) { |
155 | 8 | file_paths.push_back(dbname_ + file.name); |
156 | 8 | } |
157 | 2 | return WaitFilePathsDeleted(std::move(file_paths), description); |
158 | 2 | } |
159 | | |
160 | | Random rnd_; |
161 | | int num_sst_files_ = 0; |
162 | | std::shared_ptr<OnFileCreationListener> file_create_listener_ = |
163 | | std::make_shared<OnFileCreationListener>(); |
164 | | }; |
165 | | |
166 | | // This reproduces an issue where we delete a file too late because when it was supposed to be |
167 | | // deleted, it was blocked by concurrent flush. |
168 | | // Consider following scenario which was possible before the issue was fixed: |
169 | | // - Compaction (1) starts with base version #1 and input files #111-#114. |
170 | | // - Flush (2) starts with base version #2 (which also includes files #111-#114) and increments ref |
171 | | // counter of version #2. |
172 | | // - Compaction (1) finishes, but input files #111-#114 are not deleted, because they are |
173 | | // being held by version #2, which is being held by flush (2). |
174 | | // - Flush (2) finishes and decrements ref counter of version #2. |
175 | | // - Compaction (3) starts. |
176 | | // - Compaction (3) finishes and purges obsolete SST files, including #111-#114. |
177 | 1 | TEST_F(DBTestUniversalCompactionDeletion, DeleteObsoleteFilesDelayedByFlush) { |
178 | 1 | Options options = CurrentOptions(); |
179 | 1 | Reopen(options); |
180 | | |
181 | 1 | file_create_listener_->SetPauseAfterFilesCreated(kNumCompactionTrigger); |
182 | 5 | for (int i = 0; i < kNumCompactionTrigger; ++i) { |
183 | 4 | CreateSstFile(); |
184 | 4 | } |
185 | | |
186 | 1 | std::vector<LiveFileMetaData> input_files; |
187 | 1 | db_->GetLiveFilesMetaData(&input_files); |
188 | 4 | for (auto file : input_files) { |
189 | 4 | LOG(INFO) << "Input file: " << file.ToString(); |
190 | 4 | } |
191 | | |
192 | 1 | ASSERT_OK( |
193 | 1 | WaitForNumFilesCreated("Waiting for compaction (1) delay ...", kNumCompactionTrigger + 1)); |
194 | 1 | const auto compaction_1_output = file_create_listener_->GetLastCreatedFileName(); |
195 | | |
196 | 1 | size_t num_files = file_create_listener_->NumFilesCreated(); |
197 | 1 | CreateSstFile(false /* do_flush */); |
198 | 1 | std::thread flusher([this] { |
199 | 1 | ASSERT_OK(Flush()); |
200 | 1 | }); |
201 | | |
202 | 1 | ASSERT_OK(WaitForNumFilesCreated("Waiting for flush (2) delay ...", num_files + 1)); |
203 | 1 | const auto flush_2_output = file_create_listener_->GetLastCreatedFileName(); |
204 | 1 | file_create_listener_->DisablePausing(); |
205 | | |
206 | 1 | LOG(INFO) << "Resuming compaction (1) ..."; |
207 | 1 | file_create_listener_->ResumeFileName(compaction_1_output); |
208 | 1 | ASSERT_OK(yb::LoggedWaitFor( |
209 | 1 | [this] { return dbfull()->TEST_NumTotalRunningCompactions() == 0; }, |
210 | 1 | kWaitTimeout, "Waiting for compaction (1) to be completed ...")); |
211 | | |
212 | | // Compaction (1) input files should be deleted before flush (2) is completed. |
213 | 1 | ASSERT_OK(WaitLiveFilesDeleted(input_files, "compaction (1) input files")); |
214 | | |
215 | 1 | LOG(INFO) << "Resuming flush (2) ..."; |
216 | 1 | file_create_listener_->ResumeFileName(flush_2_output); |
217 | 1 | flusher.join(); |
218 | 1 | } |
219 | | |
220 | | // This reproduces an issue where we delete compacted files too late because when they were |
221 | | // supposed to be deleted, it was blocked by concurrent huge compaction job with lower pending |
222 | | // output file number. |
223 | | // Consider following scenario which was possible before the issue was fixed: |
224 | | // - Huge compaction (1) starts to write output file #110. |
225 | | // - New files #111-#114 are written. |
226 | | // - Compaction (2) starts with input files #111-#114. |
227 | | // - Compaction (2) finishes, but input files #111-#114 are not deleted, because their numbers |
228 | | // are bigger than #110. |
229 | | // - Huge compaction (1) finishes. |
230 | | // - Compaction (3) starts. |
231 | | // - Compaction (3) finishes and purges obsolete SST files, including #111-#114. |
232 | 1 | TEST_F(DBTestUniversalCompactionDeletion, DeleteObsoleteFilesMinPendingOutput) { |
233 | 1 | Options options = CurrentOptions(); |
234 | 1 | Reopen(options); |
235 | | |
236 | | // Simulate huge long-running compaction (1). |
237 | 1 | file_create_listener_->SetPauseAfterFilesCreated(kNumCompactionTrigger); |
238 | 5 | for (int i = 0; i < kNumCompactionTrigger; ++i) { |
239 | 4 | CreateSstFile(); |
240 | 4 | } |
241 | 1 | ASSERT_OK( |
242 | 1 | WaitForNumFilesCreated("Waiting for compaction (1) delay ...", kNumCompactionTrigger + 1)); |
243 | 1 | const auto compaction_1_output = file_create_listener_->GetLastCreatedFileName(); |
244 | 1 | file_create_listener_->DisablePausing(); |
245 | | |
246 | 1 | std::vector<LiveFileMetaData> live_files_1; |
247 | 1 | db_->GetLiveFilesMetaData(&live_files_1); |
248 | | // Write new files to be compacted by compaction (2). |
249 | 5 | for (int i = 0; i < kNumCompactionTrigger; ++i) { |
250 | 4 | CreateSstFile(); |
251 | 4 | } |
252 | 1 | std::unordered_set<std::string> input_files_2; |
253 | 1 | { |
254 | 1 | std::vector<LiveFileMetaData> live_files; |
255 | 1 | db_->GetLiveFilesMetaData(&live_files); |
256 | 8 | for (auto file : live_files) { |
257 | 8 | input_files_2.insert(file.name); |
258 | 8 | } |
259 | 4 | for (auto file : live_files_1) { |
260 | 4 | input_files_2.erase(file.name); |
261 | 4 | } |
262 | 1 | } |
263 | | |
264 | 1 | ASSERT_OK(yb::LoggedWaitFor( |
265 | 1 | [this] { return dbfull()->TEST_NumTotalRunningCompactions() == 1; }, kWaitTimeout, |
266 | 1 | "Waiting for compaction (2) to be completed ...")); |
267 | | |
268 | | // Compaction (2) input files should be deleted before compaction (1) is completed. |
269 | 1 | ASSERT_OK(WaitFilePathsDeleted(input_files_2, "compaction (2) input files")); |
270 | | |
271 | 1 | LOG(INFO) << "Resuming compaction (1) ..."; |
272 | 1 | file_create_listener_->ResumeFileName(compaction_1_output); |
273 | 1 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
274 | 1 | } |
275 | | |
276 | | // This reproduces an issue where we delete compacted files too late because when they were |
277 | | // supposed to be deleted, it was blocked by scheduled compaction holding input version |
278 | | // referring these files. |
279 | | // Consider following scenario which was possible before the issue was fixed: |
280 | | // - Compaction (1) starts with input files #111-#114. |
281 | | // - Flush job (2) starts with base version #10 including files #111-#114 and increments ref |
282 | | // counter of version #10. |
283 | | // - Right before finishing flush job (2) it schedules another compaction (3) with base version #10 |
284 | | // and due to this increments ref counter of version #10 again. |
285 | | // - Flush job (2) finishes, but input files #111-#114 are not deleted, because they are being |
286 | | // held by version #10 (blocked by scheduled compaction (3)). |
287 | | // - Compaction (1) finishes, but input files #111-#114 are not deleted, because they are being |
288 | | // held by version #10. |
289 | | // - Compaction (3) starts. |
290 | | // - Compaction (3) finishes and purges obsolete SST files, including #111-#114. |
291 | 1 | TEST_F(DBTestUniversalCompactionDeletion, DeleteObsoleteFilesDelayedByScheduledCompaction) { |
292 | 1 | Options options = CurrentOptions(); |
293 | 1 | Reopen(options); |
294 | | |
295 | 1 | file_create_listener_->SetPauseAfterFilesCreated(kNumCompactionTrigger); |
296 | | // Trigger compaction (1). |
297 | 5 | for (int i = 0; i < kNumCompactionTrigger; ++i) { |
298 | 4 | CreateSstFile(); |
299 | 4 | } |
300 | | |
301 | 1 | std::vector<LiveFileMetaData> input_files; |
302 | 1 | db_->GetLiveFilesMetaData(&input_files); |
303 | 4 | for (auto file : input_files) { |
304 | 4 | LOG(INFO) << "Input file: " << file.ToString(); |
305 | 4 | } |
306 | | |
307 | 1 | ASSERT_OK( |
308 | 1 | WaitForNumFilesCreated("Waiting for compaction (1) delay ...", kNumCompactionTrigger + 1)); |
309 | 1 | const auto compaction_1_output = file_create_listener_->GetLastCreatedFileName(); |
310 | | |
311 | | // Allow kNumCompactionTrigger more files to be created without delay and enqueue compaction (3). |
312 | 1 | file_create_listener_->SetPauseAfterFilesCreated( |
313 | 1 | file_create_listener_->NumFilesCreated() + kNumCompactionTrigger); |
314 | 5 | for (int i = 0; i < kNumCompactionTrigger; ++i) { |
315 | 4 | CreateSstFile(); |
316 | 4 | } |
317 | | |
318 | 1 | ASSERT_OK(yb::LoggedWaitFor( |
319 | 1 | [this] { return dbfull()->TEST_NumRunningFlushes() == 0; }, kWaitTimeout, |
320 | 1 | "Waiting for flush (2) completion ...")); |
321 | | |
322 | 1 | ASSERT_OK(yb::LoggedWaitFor( |
323 | 1 | [this] { return dbfull()->TEST_NumBackgroundCompactionsScheduled() == 2; }, kWaitTimeout, |
324 | 1 | "Waiting for compaction (3) to be enqueued ...")); |
325 | | |
326 | 1 | LOG(INFO) << "Resuming compaction (1) ..."; |
327 | 1 | file_create_listener_->ResumeFileName(compaction_1_output); |
328 | 1 | ASSERT_OK(yb::LoggedWaitFor( |
329 | 1 | [this, &compaction_1_output] { |
330 | 1 | std::vector<LiveFileMetaData> files; |
331 | 1 | db_->GetLiveFilesMetaData(&files); |
332 | 1 | for (auto file : files) { |
333 | 1 | if (file.name == '/' + compaction_1_output) { |
334 | 1 | return true; |
335 | 1 | } |
336 | 1 | } |
337 | 1 | return false; |
338 | 1 | }, kWaitTimeout, "Waiting for compaction (1) to be completed ...")); |
339 | | |
340 | | // Compaction (1) input files should be deleted before compaction (3) is completed. |
341 | 1 | ASSERT_OK(WaitLiveFilesDeleted(input_files, "compaction (1) input files")); |
342 | | |
343 | | // Need to wait for compaction 3 to actually generate its output file, before we try to get that |
344 | | // file's name below. |
345 | 1 | ASSERT_OK(yb::LoggedWaitFor( |
346 | 1 | [this]{ return file_create_listener_->NumFilesCreated() == 2 * kNumCompactionTrigger + 2; }, |
347 | 1 | kWaitTimeout, "Waiting for compaction (3) to be completed")); |
348 | | |
349 | 1 | const auto compaction_3_output = file_create_listener_->GetLastCreatedFileName(); |
350 | 1 | file_create_listener_->ResumeFileName(compaction_3_output); |
351 | 1 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
352 | 1 | } |
353 | | |
354 | | } // namespace rocksdb |
355 | | |
356 | 13.2k | int main(int argc, char** argv) { |
357 | 13.2k | ::testing::InitGoogleTest(&argc, argv); |
358 | 13.2k | return RUN_ALL_TESTS(); |
359 | 13.2k | } |