YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_universal_compaction_deletion_test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rocksdb/db/db_test_util.h"
15
#include "yb/rocksdb/util/testutil.h"
16
17
#include "yb/util/path_util.h"
18
#include "yb/util/test_macros.h"
19
#include "yb/util/test_util.h"
20
21
using namespace std::literals;
22
23
namespace rocksdb {
24
25
namespace {

// Number of SST files the tests write before expecting a compaction; also used as the
// level0_file_num_compaction_trigger option value.
constexpr int kNumCompactionTrigger = 4;

// Timeout passed to every LoggedWaitFor call in this file.
constexpr std::chrono::seconds kWaitTimeout{60};

}  // namespace
29
30
class OnFileCreationListener : public EventListener {
31
 public:
32
3
  OnFileCreationListener() {}
33
34
  void OnTableFileCreated(
35
26
      const TableFileCreationInfo& info) override {
36
26
    LOG(INFO) << "Created SST file: " << info.file_path;
37
38
26
    auto file_name = yb::BaseName(info.file_path);
39
40
26
    bool do_pause;
41
26
    {
42
26
      std::lock_guard<std::mutex> l(mutex_);
43
26
      created_file_names_.push_back(file_name);
44
26
      do_pause = created_file_names_.size() > pause_after_num_files_created_;
45
26
    }
46
47
26
    if (do_pause) {
48
5
      ASSERT_OK(yb::LoggedWaitFor(
49
5
          [this, &file_name] {
50
5
            std::lock_guard<std::mutex> l(mutex_);
51
5
            return file_names_to_resume_.erase(file_name) > 0;
52
5
          }, kWaitTimeout, yb::Format("Pausing on $0 ...", file_name)));
53
5
    }
54
26
  }
55
56
4
  void SetPauseAfterFilesCreated(size_t n) {
57
4
    pause_after_num_files_created_ = n;
58
4
  }
59
60
  // Disable pausing newly created files, but will hold already paused ones until they are resumed
61
  // by ResumeFileName call.
62
2
  void DisablePausing() {
63
2
    pause_after_num_files_created_ = std::numeric_limits<size_t>::max();
64
2
  }
65
66
5
  void ResumeFileName(const std::string& file_name) {
67
5
    std::lock_guard<std::mutex> l(mutex_);
68
5
    file_names_to_resume_.insert(file_name);
69
5
  }
70
71
0
  std::vector<std::string> CreatedFileNames() {
72
0
    std::lock_guard<std::mutex> l(mutex_);
73
0
    return created_file_names_;
74
0
  }
75
76
5
  const std::string& GetLastCreatedFileName() {
77
5
    std::lock_guard<std::mutex> l(mutex_);
78
5
    return created_file_names_.back();
79
5
  }
80
81
50
  size_t NumFilesCreated() {
82
50
    std::lock_guard<std::mutex> l(mutex_);
83
50
    return created_file_names_.size();
84
50
  }
85
86
 private:
87
  std::atomic<size_t> pause_after_num_files_created_{std::numeric_limits<size_t>::max()};
88
  std::mutex mutex_;
89
  std::unordered_set<std::string> file_names_to_resume_;
90
  std::vector<std::string> created_file_names_;
91
};
92
93
class DBTestUniversalCompactionDeletion : public DBTestBase {
94
 public:
95
  // Passes this test's path argument to DBTestBase and initializes rnd_ with the fixed value 301.
  DBTestUniversalCompactionDeletion()
      : DBTestBase("/db_universal_compaction_deletion_test"),
        rnd_(301) {}
97
98
  // Creates SST file of size around, but not less than 1MB, uses key range
99
  // [num_sst_files_ * 50; num_sst_files_ * 50 + 100).
100
21
  void CreateSstFile(bool do_flush = true) {
101
2.12k
    for (int j = 0; j < 100; ++j) {
102
2.10k
      ASSERT_OK(Put(Key(num_sst_files_ * 50 + j), RandomString(&rnd_, 10_KB)));
103
2.10k
    }
104
21
    if (do_flush) {
105
20
      ASSERT_OK(Flush());
106
20
    }
107
21
    ++num_sst_files_;
108
21
  }
109
110
3
  // Returns DBTestBase::CurrentOptions() adjusted for these tests: single-level universal
  // compaction triggered by kNumCompactionTrigger files, small write buffer, two background
  // flushes and compactions, and file_create_listener_ registered as an event listener.
  Options CurrentOptions() {
    Options result = DBTestBase::CurrentOptions();
    result.env = env_;
    result.listeners.push_back(file_create_listener_);

    // Compaction shape.
    result.compaction_style = kCompactionStyleUniversal;
    result.num_levels = 1;
    result.level0_file_num_compaction_trigger = kNumCompactionTrigger;

    // Sizes.
    result.write_buffer_size = 2_MB;
    result.max_bytes_for_level_base = 1_MB;

    // Background parallelism; presumably 2 so that one paused job does not block the next one.
    result.max_background_flushes = 2;
    result.max_background_compactions = 2;

    return result;
  }
124
125
4
  // Waits (up to kWaitTimeout) until the listener has seen at least num_files created files.
  // desc is the description passed to LoggedWaitFor.
  CHECKED_STATUS WaitForNumFilesCreated(const std::string& desc, size_t num_files) {
    const auto enough_files_created = [this, num_files] {
      return file_create_listener_->NumFilesCreated() >= num_files;
    };
    return yb::LoggedWaitFor(enough_files_created, kWaitTimeout, desc);
  }
130
131
  template <class FilePathsContainer>
132
  CHECKED_STATUS WaitFilePathsDeleted(
133
3
      FilePathsContainer file_paths, const std::string& description) {
134
3
    RETURN_NOT_OK_PREPEND(
135
3
        yb::LoggedWaitFor(
136
3
            [this, &file_paths] {
137
3
              for (auto it = file_paths.begin(); it != file_paths.end();) {
138
3
                if (env_->FileExists(*it).IsNotFound()) {
139
3
                  it = file_paths.erase(it);
140
3
                } else {
141
3
                  ++it;
142
3
                }
143
3
              }
144
3
              return file_paths.empty();
145
3
            },
146
3
            kWaitTimeout, yb::Format("Waiting for $0 to be deleted", description)),
147
3
        yb::Format("$0 should be deleted: $1", description, file_paths));
148
3
    return Status::OK();
149
3
  }
_ZN7rocksdb33DBTestUniversalCompactionDeletion20WaitFilePathsDeletedINSt3__16vectorINS2_12basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEENS7_IS9_EEEEEEN2yb6StatusET_RKS9_
Line
Count
Source
133
2
      FilePathsContainer file_paths, const std::string& description) {
134
2
    RETURN_NOT_OK_PREPEND(
135
2
        yb::LoggedWaitFor(
136
2
            [this, &file_paths] {
137
2
              for (auto it = file_paths.begin(); it != file_paths.end();) {
138
2
                if (env_->FileExists(*it).IsNotFound()) {
139
2
                  it = file_paths.erase(it);
140
2
                } else {
141
2
                  ++it;
142
2
                }
143
2
              }
144
2
              return file_paths.empty();
145
2
            },
146
2
            kWaitTimeout, yb::Format("Waiting for $0 to be deleted", description)),
147
2
        yb::Format("$0 should be deleted: $1", description, file_paths));
148
2
    return Status::OK();
149
2
  }
_ZN7rocksdb33DBTestUniversalCompactionDeletion20WaitFilePathsDeletedINSt3__113unordered_setINS2_12basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEENS2_4hashIS9_EENS2_8equal_toIS9_EENS7_IS9_EEEEEEN2yb6StatusET_RKS9_
Line
Count
Source
133
1
      FilePathsContainer file_paths, const std::string& description) {
134
1
    RETURN_NOT_OK_PREPEND(
135
1
        yb::LoggedWaitFor(
136
1
            [this, &file_paths] {
137
1
              for (auto it = file_paths.begin(); it != file_paths.end();) {
138
1
                if (env_->FileExists(*it).IsNotFound()) {
139
1
                  it = file_paths.erase(it);
140
1
                } else {
141
1
                  ++it;
142
1
                }
143
1
              }
144
1
              return file_paths.empty();
145
1
            },
146
1
            kWaitTimeout, yb::Format("Waiting for $0 to be deleted", description)),
147
1
        yb::Format("$0 should be deleted: $1", description, file_paths));
148
1
    return Status::OK();
149
1
  }
150
151
  // Waits until all of the given live files are deleted from disk. Each file's name is appended
  // to dbname_ to build the path that is checked; description is only used in log and error
  // messages.
  CHECKED_STATUS WaitLiveFilesDeleted(
      const std::vector<LiveFileMetaData>& files, const std::string& description) {
    std::vector<std::string> file_paths;
    file_paths.reserve(files.size());
    for (const auto& file : files) {
      file_paths.push_back(dbname_ + file.name);
    }
    return WaitFilePathsDeleted(std::move(file_paths), description);
  }
159
160
  Random rnd_;
161
  int num_sst_files_ = 0;
162
  std::shared_ptr<OnFileCreationListener> file_create_listener_ =
163
      std::make_shared<OnFileCreationListener>();
164
};
165
166
// This reproduces an issue where we delete a file too late because when it was supposed to be
167
// deleted, it was blocked by concurrent flush.
168
// Consider following scenario which was possible before the issue was fixed:
169
// - Compaction (1) starts with base version #1 and input files #111-#114.
170
// - Flush (2) starts with base version #2 (which also includes files #111-#114) and increments ref
171
// counter of version #2.
172
// - Compaction (1) finishes, but input files #111 and #111-#114 are not deleted, because they are
173
// being held by version #2, which is being held by flush (2).
174
// - Flush (2) finishes and decrements ref counter of version #2.
175
// - Compaction (3) starts.
176
// - Compaction (3) finishes and purging obsolete SST files including #111-#114.
177
1
TEST_F(DBTestUniversalCompactionDeletion, DeleteObsoleteFilesDelayedByFlush) {
178
1
  Options options = CurrentOptions();
179
1
  Reopen(options);
180
181
1
  file_create_listener_->SetPauseAfterFilesCreated(kNumCompactionTrigger);
182
5
  for (int i = 0; i < kNumCompactionTrigger; ++i) {
183
4
    CreateSstFile();
184
4
  }
185
186
1
  std::vector<LiveFileMetaData> input_files;
187
1
  db_->GetLiveFilesMetaData(&input_files);
188
4
  for (auto file : input_files) {
189
4
    LOG(INFO) << "Input file: " << file.ToString();
190
4
  }
191
192
1
  ASSERT_OK(
193
1
      WaitForNumFilesCreated("Waiting for compaction (1) delay ...", kNumCompactionTrigger + 1));
194
1
  const auto compaction_1_output = file_create_listener_->GetLastCreatedFileName();
195
196
1
  size_t num_files = file_create_listener_->NumFilesCreated();
197
1
  CreateSstFile(false /* do_flush */);
198
1
  std::thread flusher([this] {
199
1
    ASSERT_OK(Flush());
200
1
  });
201
202
1
  ASSERT_OK(WaitForNumFilesCreated("Waiting for flush (2) delay ...", num_files + 1));
203
1
  const auto flush_2_output = file_create_listener_->GetLastCreatedFileName();
204
1
  file_create_listener_->DisablePausing();
205
206
1
  LOG(INFO) << "Resuming compaction (1) ...";
207
1
  file_create_listener_->ResumeFileName(compaction_1_output);
208
1
  ASSERT_OK(yb::LoggedWaitFor(
209
1
      [this] { return dbfull()->TEST_NumTotalRunningCompactions() == 0; },
210
1
      kWaitTimeout, "Waiting for compaction (1) to be completed ..."));
211
212
  // Compaction (1) input files should be deleted before flush (2) is completed.
213
1
  ASSERT_OK(WaitLiveFilesDeleted(input_files, "compaction (1) input files"));
214
215
1
  LOG(INFO) << "Resuming flush (2) ...";
216
1
  file_create_listener_->ResumeFileName(flush_2_output);
217
1
  flusher.join();
218
1
}
219
220
// This reproduces an issue where we delete compacted files too late because when they were
221
// supposed to be deleted, it was blocked by concurrent huge compaction job with lower pending
222
// output file number.
223
// Consider following scenario which was possible before the issue was fixed:
224
// - Huge compaction (1) starts to write output file #110.
225
// - New files #111-#114 are written.
226
// - Compaction (2) starts with input files #111-#114.
227
// - Compaction (2) finishes, but input files #111-#114 are not deleted, because their numbers
228
// are bigger than #110.
229
// - Huge compaction (1) finishes.
230
// - Compaction (3) starts.
231
// - Compaction (3) finishes and purging obsolete SST files including #111-#114.
232
1
TEST_F(DBTestUniversalCompactionDeletion, DeleteObsoleteFilesMinPendingOutput) {
233
1
  Options options = CurrentOptions();
234
1
  Reopen(options);
235
236
  // Simulate huge long-running compaction (1).
237
1
  file_create_listener_->SetPauseAfterFilesCreated(kNumCompactionTrigger);
238
5
  for (int i = 0; i < kNumCompactionTrigger; ++i) {
239
4
    CreateSstFile();
240
4
  }
241
1
  ASSERT_OK(
242
1
      WaitForNumFilesCreated("Waiting for compaction (1) delay ...", kNumCompactionTrigger + 1));
243
1
  const auto compaction_1_output = file_create_listener_->GetLastCreatedFileName();
244
1
  file_create_listener_->DisablePausing();
245
246
1
  std::vector<LiveFileMetaData> live_files_1;
247
1
  db_->GetLiveFilesMetaData(&live_files_1);
248
  // Write new files to be compacted by compaction (2).
249
5
  for (int i = 0; i < kNumCompactionTrigger; ++i) {
250
4
    CreateSstFile();
251
4
  }
252
1
  std::unordered_set<std::string> input_files_2;
253
1
  {
254
1
    std::vector<LiveFileMetaData> live_files;
255
1
    db_->GetLiveFilesMetaData(&live_files);
256
8
    for (auto file : live_files) {
257
8
      input_files_2.insert(file.name);
258
8
    }
259
4
    for (auto file : live_files_1) {
260
4
      input_files_2.erase(file.name);
261
4
    }
262
1
  }
263
264
1
  ASSERT_OK(yb::LoggedWaitFor(
265
1
      [this] { return dbfull()->TEST_NumTotalRunningCompactions() == 1; }, kWaitTimeout,
266
1
      "Waiting for compaction (2) to be completed ..."));
267
268
  // Compaction (2) input files should be deleted before compaction (1) is completed.
269
1
  ASSERT_OK(WaitFilePathsDeleted(input_files_2, "compaction (2) input files"));
270
271
1
  LOG(INFO) << "Resuming compaction (1)  ...";
272
1
  file_create_listener_->ResumeFileName(compaction_1_output);
273
1
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
274
1
}
275
276
// This reproduces an issue where we delete compacted files too late because when they were
277
// supposed to be deleted, it was blocked by scheduled compaction holding input version
278
// referring these files.
279
// Consider following scenario which was possible before the issue was fixed:
280
// - Compaction (1) starts with input files #111-#114.
281
// - Flush job (2) starts with base version #10 including files #111-#114 and increments ref
282
// counter of version #10.
283
// - Right before finishing flush job (2) it schedules another compaction (3) with base version #10
284
// and due to this increments ref counter of version #10 again.
285
// - Flush job (2) finishes, but input files #111-#114 are not deleted, because they are being
286
// held by version #10 (blocked by scheduled compaction (3)).
287
// - Compaction (1) finishes, but input files #111-#114 are not deleted, because they are being
288
// held by version #10.
289
// - Compaction (3) starts.
290
// - Compaction (3) finishes and purging obsolete SST files including #111-#114.
291
1
TEST_F(DBTestUniversalCompactionDeletion, DeleteObsoleteFilesDelayedByScheduledCompaction) {
292
1
  Options options = CurrentOptions();
293
1
  Reopen(options);
294
295
1
  file_create_listener_->SetPauseAfterFilesCreated(kNumCompactionTrigger);
296
  // Trigger compaction (1).
297
5
  for (int i = 0; i < kNumCompactionTrigger; ++i) {
298
4
    CreateSstFile();
299
4
  }
300
301
1
  std::vector<LiveFileMetaData> input_files;
302
1
  db_->GetLiveFilesMetaData(&input_files);
303
4
  for (auto file : input_files) {
304
4
    LOG(INFO) << "Input file: " << file.ToString();
305
4
  }
306
307
1
  ASSERT_OK(
308
1
      WaitForNumFilesCreated("Waiting for compaction (1) delay ...", kNumCompactionTrigger + 1));
309
1
  const auto compaction_1_output = file_create_listener_->GetLastCreatedFileName();
310
311
  // Allow kNumCompactionTrigger more files to be created without delay and enqueue compaction (3).
312
1
  file_create_listener_->SetPauseAfterFilesCreated(
313
1
      file_create_listener_->NumFilesCreated() + kNumCompactionTrigger);
314
5
  for (int i = 0; i < kNumCompactionTrigger; ++i) {
315
4
    CreateSstFile();
316
4
  }
317
318
1
  ASSERT_OK(yb::LoggedWaitFor(
319
1
      [this] { return dbfull()->TEST_NumRunningFlushes() == 0; }, kWaitTimeout,
320
1
      "Waiting for flush (2) completion ..."));
321
322
1
  ASSERT_OK(yb::LoggedWaitFor(
323
1
      [this] { return dbfull()->TEST_NumBackgroundCompactionsScheduled() == 2; }, kWaitTimeout,
324
1
      "Waiting for compaction (3) to be enqueued ..."));
325
326
1
  LOG(INFO) << "Resuming compaction (1)  ...";
327
1
  file_create_listener_->ResumeFileName(compaction_1_output);
328
1
  ASSERT_OK(yb::LoggedWaitFor(
329
1
      [this, &compaction_1_output] {
330
1
        std::vector<LiveFileMetaData> files;
331
1
        db_->GetLiveFilesMetaData(&files);
332
1
        for (auto file : files) {
333
1
          if (file.name == '/' + compaction_1_output) {
334
1
            return true;
335
1
          }
336
1
        }
337
1
        return false;
338
1
      }, kWaitTimeout, "Waiting for compaction (1) to be completed ..."));
339
340
  // Compaction (1) input files should be deleted before compaction (3) is completed.
341
1
  ASSERT_OK(WaitLiveFilesDeleted(input_files, "compaction (1) input files"));
342
343
  // Need to wait for compaction 3 to actually generate its output file, before we try to get that
344
  // file's name below.
345
1
  ASSERT_OK(yb::LoggedWaitFor(
346
1
      [this]{ return file_create_listener_->NumFilesCreated() == 2 * kNumCompactionTrigger + 2; },
347
1
      kWaitTimeout, "Waiting for compaction (3) to be completed"));
348
349
1
  const auto compaction_3_output = file_create_listener_->GetLastCreatedFileName();
350
1
  file_create_listener_->ResumeFileName(compaction_3_output);
351
1
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
352
1
}
353
354
}  // namespace rocksdb
355
356
13.2k
int main(int argc, char** argv) {
357
13.2k
  ::testing::InitGoogleTest(&argc, argv);
358
13.2k
  return RUN_ALL_TESTS();
359
13.2k
}