YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/compaction_picker.cc
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
//
14
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
15
//  This source code is licensed under the BSD-style license found in the
16
//  LICENSE file in the root directory of this source tree. An additional grant
17
//  of patent rights can be found in the PATENTS file in the same directory.
18
//
19
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
20
// Use of this source code is governed by a BSD-style license that can be
21
// found in the LICENSE file. See the AUTHORS file for names of contributors.
22
23
#include "yb/rocksdb/db/compaction_picker.h"
24
#include "yb/rocksdb/immutable_options.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
32
#include <limits>
33
#include <queue>
34
#include <string>
35
#include <utility>
36
37
38
#include "yb/rocksdb/compaction_filter.h"
39
#include "yb/rocksdb/db/column_family.h"
40
#include "yb/rocksdb/db/filename.h"
41
#include "yb/rocksdb/db/version_set.h"
42
#include "yb/rocksdb/util/log_buffer.h"
43
#include "yb/rocksdb/util/logging.h"
44
#include "yb/rocksdb/util/random.h"
45
#include "yb/rocksdb/util/statistics.h"
46
#include "yb/util/string_util.h"
47
#include "yb/rocksdb/util/sync_point.h"
48
49
#include "yb/util/logging.h"
50
#include <glog/logging.h>
51
52
DEFINE_bool(aggressive_compaction_for_read_amp, false,
53
            "Determines if we should compact aggressively to reduce read amplification based on "
54
            "number of files alone, without regards to relative sizes of the SSTable files.");
55
56
namespace rocksdb {
57
58
namespace {
59
16.2k
uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
60
16.2k
  uint64_t sum = 0;
61
55.7k
  for (size_t i = 0; i < files.size() && files[i]; i++) {
62
39.5k
    sum += files[i]->compensated_file_size;
63
39.5k
  }
64
16.2k
  return sum;
65
16.2k
}
66
67
// Universal compaction is not supported in ROCKSDB_LITE
68
#ifndef ROCKSDB_LITE
69
70
// Used in universal compaction when trivial move is enabled.
71
// This structure is used for the construction of min heap
72
// that contains the file meta data, the level of the file
73
// and the index of the file in that level
74
75
struct InputFileInfo {
76
2.41k
  InputFileInfo() : f(nullptr) {}
77
78
  FileMetaData* f;
79
  size_t level;
80
  size_t index;
81
};
82
83
// Used in universal compaction when trivial move is enabled.
84
// This comparator is used for the construction of min heap
85
// based on the smallest key of the file.
86
struct UserKeyComparator {
87
603
  explicit UserKeyComparator(const Comparator* ucmp) { ucmp_ = ucmp; }
88
89
11.5k
  bool operator()(InputFileInfo i1, InputFileInfo i2) const {
90
11.5k
    return (ucmp_->Compare(i1.f->smallest.key.user_key(),
91
11.5k
                           i2.f->smallest.key.user_key()) > 0);
92
11.5k
  }
93
94
 private:
95
  const Comparator* ucmp_;
96
};
97
98
typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
99
                            UserKeyComparator> SmallestKeyHeap;
100
constexpr auto kFormatFileSizeInfoBufSize = 256;
101
102
// This function creates the heap that is used to find if the files are
103
// overlapping during universal compaction when the allow_trivial_move
104
// is set.
105
603
SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
106
603
  SmallestKeyHeap smallest_key_priority_q =
107
603
      SmallestKeyHeap(UserKeyComparator(ucmp));
108
109
603
  InputFileInfo input_file;
110
111
3.73k
  for (size_t l = 0; l < c->num_input_levels(); l++) {
112
3.13k
    if (c->num_input_files(l) != 0) {
113
799
      if (l == 0 && c->start_level() == 0) {
114
2.54k
        for (size_t i = 0; i < c->num_input_files(0); i++) {
115
1.94k
          input_file.f = c->input(0, i);
116
1.94k
          input_file.level = 0;
117
1.94k
          input_file.index = i;
118
1.94k
          smallest_key_priority_q.push(std::move(input_file));
119
1.94k
        }
120
196
      } else {
121
196
        input_file.f = c->input(l, 0);
122
196
        input_file.level = l;
123
196
        input_file.index = 0;
124
196
        smallest_key_priority_q.push(std::move(input_file));
125
196
      }
126
799
    }
127
3.13k
  }
128
603
  return smallest_key_priority_q;
129
603
}
130
#endif  // !ROCKSDB_LITE
131
}  // anonymous namespace
132
133
// Determine compression type, based on user options, level of the output
134
// file and whether compression is disabled.
135
// If enable_compression is false, then compression is always disabled no
136
// matter what the values of the other two parameters are.
137
// Otherwise, the compression type is determined based on options and level.
138
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
139
                                   int level, int base_level,
140
35.2k
                                   const bool enable_compression) {
141
35.2k
  if (!enable_compression) {
142
    // disable compression
143
12
    return kNoCompression;
144
12
  }
145
  // If the user has specified a different compression level for each level,
146
  // then pick the compression for that level.
147
35.2k
  if (!ioptions.compression_per_level.empty()) {
148
1.62k
    assert(level == 0 || level >= base_level);
149
1.57k
    int idx = (level == 0) ? 0 : level - base_level + 1;
150
151
1.62k
    const int n = static_cast<int>(ioptions.compression_per_level.size()) - 1;
152
    // It is possible for level_ to be -1; in that case, we use level
153
    // 0's compression.  This occurs mostly in backwards compatibility
154
    // situations when the builder doesn't know what level the file
155
    // belongs to.  Likewise, if level is beyond the end of the
156
    // specified compression levels, use the last value.
157
1.62k
    return ioptions.compression_per_level[std::max(0, std::min(idx, n))];
158
33.5k
  } else {
159
33.5k
    return ioptions.compression;
160
33.5k
  }
161
35.2k
}
162
163
CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
164
                                   const InternalKeyComparator* icmp)
165
347k
    : ioptions_(ioptions), icmp_(icmp) {}
166
167
328k
CompactionPicker::~CompactionPicker() {}
168
169
// Delete this compaction from the list of running compactions.
170
22.9k
void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
171
22.9k
  if (c->start_level() == 0 ||
172
12.6k
      ioptions_.compaction_style == kCompactionStyleUniversal) {
173
12.6k
    level0_compactions_in_progress_.erase(c);
174
12.6k
  }
175
22.9k
  if (!status.ok()) {
176
112
    c->ResetNextCompactionIndex();
177
112
  }
178
22.9k
}
179
180
void CompactionPicker::GetRange(const CompactionInputFiles& inputs,
181
76.4k
                                InternalKey* smallest, InternalKey* largest) {
182
76.4k
  const int level = inputs.level;
183
76.4k
  assert(!inputs.empty());
184
76.4k
  smallest->Clear();
185
76.4k
  largest->Clear();
186
187
76.4k
  if (level == 0) {
188
95.0k
    for (size_t i = 0; i < inputs.size(); i++) {
189
62.9k
      FileMetaData* f = inputs[i];
190
62.9k
      if (i == 0) {
191
32.1k
        *smallest = f->smallest.key;
192
32.1k
        *largest = f->largest.key;
193
30.8k
      } else {
194
30.8k
        if (icmp_->Compare(f->smallest.key, *smallest) < 0) {
195
6.52k
          *smallest = f->smallest.key;
196
6.52k
        }
197
30.8k
        if (icmp_->Compare(f->largest.key, *largest) > 0) {
198
10.5k
          *largest = f->largest.key;
199
10.5k
        }
200
30.8k
      }
201
62.9k
    }
202
44.3k
  } else {
203
44.3k
    *smallest = inputs[0]->smallest.key;
204
44.3k
    *largest = inputs[inputs.size() - 1]->largest.key;
205
44.3k
  }
206
76.4k
}
207
208
void CompactionPicker::GetRange(const CompactionInputFiles& inputs1,
209
                                const CompactionInputFiles& inputs2,
210
23.8k
                                InternalKey* smallest, InternalKey* largest) {
211
23.8k
  assert(!inputs1.empty() || !inputs2.empty());
212
23.8k
  if (inputs1.empty()) {
213
0
    GetRange(inputs2, smallest, largest);
214
23.8k
  } else if (inputs2.empty()) {
215
13.0k
    GetRange(inputs1, smallest, largest);
216
10.8k
  } else {
217
10.8k
    InternalKey smallest1, smallest2, largest1, largest2;
218
10.8k
    GetRange(inputs1, &smallest1, &largest1);
219
10.8k
    GetRange(inputs2, &smallest2, &largest2);
220
10.8k
    *smallest = icmp_->Compare(smallest1, smallest2) < 0 ?
221
6.20k
                smallest1 : smallest2;
222
10.8k
    *largest = icmp_->Compare(largest1, largest2) < 0 ?
223
8.44k
               largest2 : largest1;
224
10.8k
  }
225
23.8k
}
226
227
bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name,
228
                                              VersionStorageInfo* vstorage,
229
18.4k
                                              CompactionInputFiles* inputs) {
230
  // An empty input set is not a valid compaction.
231
18.4k
  assert(!inputs->empty());
232
233
18.4k
  const int level = inputs->level;
234
  // GetOverlappingInputs will always do the right thing for level-0.
235
  // So we don't need to do any expansion if level == 0.
236
18.4k
  if (level == 0) {
237
8.09k
    return true;
238
8.09k
  }
239
240
10.3k
  InternalKey smallest, largest;
241
242
  // Keep expanding inputs until we are sure that there is a "clean cut"
243
  // boundary between the files in input and the surrounding files.
244
  // This will ensure that no parts of a key are lost during compaction.
245
10.3k
  int hint_index = -1;
246
10.3k
  size_t old_size;
247
10.3k
  do {
248
10.3k
    old_size = inputs->size();
249
10.3k
    GetRange(*inputs, &smallest, &largest);
250
10.3k
    inputs->clear();
251
10.3k
    vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files,
252
10.3k
                                   hint_index, &hint_index);
253
10.3k
  } while (inputs->size() > old_size);
254
255
  // we started off with inputs non-empty and the previous loop only grew
256
  // inputs. thus, inputs should be non-empty here
257
10.3k
  assert(!inputs->empty());
258
259
  // If, after the expansion, there are files that are already under
260
  // compaction, then we must drop/cancel this compaction.
261
10.3k
  if (FilesInCompaction(inputs->files)) {
262
0
    RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
263
0
        "[%s] ExpandWhileOverlapping() failure because some of the necessary"
264
0
        " compaction input files are currently being compacted.",
265
0
        cf_name.c_str());
266
0
    return false;
267
0
  }
268
10.3k
  return true;
269
10.3k
}
270
271
// Returns true if any one of the specified files is being compacted
272
bool CompactionPicker::FilesInCompaction(
273
58.5k
    const std::vector<FileMetaData*>& files) {
274
116k
  for (size_t i = 0; i < files.size(); i++) {
275
59.3k
    if (files[i]->being_compacted) {
276
956
      return true;
277
956
    }
278
59.3k
  }
279
57.6k
  return false;
280
58.5k
}
281
282
std::unique_ptr<Compaction> CompactionPicker::FormCompaction(
283
    const CompactionOptions& compact_options,
284
    const std::vector<CompactionInputFiles>& input_files, int output_level,
285
    VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
286
15
    uint32_t output_path_id) const {
287
15
  uint64_t max_grandparent_overlap_bytes =
288
15
      output_level + 1 < vstorage->num_levels() ?
289
3
          mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) :
290
12
          std::numeric_limits<uint64_t>::max();
291
15
  DCHECK_GT(input_files.size(), 0);
292
15
  return Compaction::Create(
293
15
      vstorage, mutable_cf_options, input_files, output_level,
294
15
      compact_options.output_file_size_limit, max_grandparent_overlap_bytes, output_path_id,
295
15
      compact_options.compression,
296
15
      /* grandparents = */ std::vector<FileMetaData*>(), ioptions_.info_log, true);
297
15
}
298
299
Status CompactionPicker::GetCompactionInputsFromFileNumbers(
300
    std::vector<CompactionInputFiles>* input_files,
301
    std::unordered_set<uint64_t>* input_set,
302
    const VersionStorageInfo* vstorage,
303
15
    const CompactionOptions& compact_options) const {
304
15
  if (input_set->size() == 0U) {
305
0
    return STATUS(InvalidArgument,
306
0
        "Compaction must include at least one file.");
307
0
  }
308
15
  assert(input_files);
309
310
15
  std::vector<CompactionInputFiles> matched_input_files;
311
15
  matched_input_files.resize(vstorage->num_levels());
312
15
  int first_non_empty_level = -1;
313
15
  int last_non_empty_level = -1;
314
  // TODO(yhchiang): use a lazy-initialized mapping from
315
  //                 file_number to FileMetaData in Version.
316
48
  for (int level = 0; level < vstorage->num_levels(); ++level) {
317
49
    for (auto file : vstorage->LevelFiles(level)) {
318
49
      auto iter = input_set->find(file->fd.GetNumber());
319
49
      if (iter != input_set->end()) {
320
47
        matched_input_files[level].files.push_back(file);
321
47
        input_set->erase(iter);
322
47
        last_non_empty_level = level;
323
47
        if (first_non_empty_level == -1) {
324
15
          first_non_empty_level = level;
325
15
        }
326
47
      }
327
49
    }
328
33
  }
329
330
15
  if (!input_set->empty()) {
331
0
    std::string message(
332
0
        "Cannot find matched SST files for the following file numbers:");
333
0
    for (auto fn : *input_set) {
334
0
      message += " ";
335
0
      message += ToString(fn);
336
0
    }
337
0
    return STATUS(InvalidArgument, message);
338
0
  }
339
340
15
  for (int level = first_non_empty_level;
341
30
       level <= last_non_empty_level; ++level) {
342
15
    matched_input_files[level].level = level;
343
15
    input_files->emplace_back(std::move(matched_input_files[level]));
344
15
  }
345
346
15
  return Status::OK();
347
15
}
348
349
350
351
// Returns true if any one of the parent files is being compacted
352
bool CompactionPicker::RangeInCompaction(VersionStorageInfo* vstorage,
353
                                         const InternalKey* smallest,
354
                                         const InternalKey* largest,
355
                                         int level,
356
20.3k
                                         int* level_index) {
357
20.3k
  std::vector<FileMetaData*> inputs;
358
20.3k
  assert(level < NumberLevels());
359
360
20.3k
  vstorage->GetOverlappingInputs(level, smallest, largest, &inputs,
361
20.3k
                                 *level_index, level_index);
362
20.3k
  return FilesInCompaction(inputs);
363
20.3k
}
364
365
// Populates the set of inputs of all other levels that overlap with the
366
// start level.
367
// Now we assume all levels except start level and output level are empty.
368
// Will also attempt to expand "start level" if that doesn't expand
369
// "output level" or cause "level" to include a file for compaction that has an
370
// overlapping user-key with another file.
371
// REQUIRES: input_level and output_level are different
372
// REQUIRES: inputs->empty() == false
373
// Returns false if files on parent level are currently in compaction, which
374
// means that we can't compact them
375
bool CompactionPicker::SetupOtherInputs(
376
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
377
    VersionStorageInfo* vstorage, CompactionInputFiles* inputs,
378
    CompactionInputFiles* output_level_inputs, int* parent_index,
379
18.2k
    int base_index) {
380
18.2k
  assert(!inputs->empty());
381
18.2k
  assert(output_level_inputs->empty());
382
18.2k
  const int input_level = inputs->level;
383
18.2k
  const int output_level = output_level_inputs->level;
384
18.2k
  assert(input_level != output_level);
385
386
  // For now, we only support merging two levels, start level and output level.
387
  // We need to assert other levels are empty.
388
19.9k
  for (int l = input_level + 1; l < output_level; l++) {
389
1.75k
    assert(vstorage->NumLevelFiles(l) == 0);
390
1.75k
  }
391
392
18.2k
  InternalKey smallest, largest;
393
394
  // Get the range one last time.
395
18.2k
  GetRange(*inputs, &smallest, &largest);
396
397
  // Populate the set of next-level files (output_level_inputs) to
398
  // include in compaction
399
18.2k
  vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
400
18.2k
                                 &output_level_inputs->files, *parent_index,
401
18.2k
                                 parent_index);
402
403
18.2k
  if (FilesInCompaction(output_level_inputs->files)) {
404
0
    return false;
405
0
  }
406
407
  // See if we can further grow the number of inputs in "level" without
408
  // changing the number of "level+1" files we pick up. We also choose NOT
409
  // to expand if this would cause "level" to include some entries for some
410
  // user key, while excluding other entries for the same user key. This
411
  // can happen when one user key spans multiple files.
412
18.2k
  if (!output_level_inputs->empty()) {
413
5.41k
    CompactionInputFiles expanded0;
414
5.41k
    expanded0.level = input_level;
415
    // Get entire range covered by compaction
416
5.41k
    InternalKey all_start, all_limit;
417
5.41k
    GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
418
419
5.41k
    vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
420
5.41k
                                   &expanded0.files, base_index, nullptr);
421
5.41k
    const uint64_t inputs0_size = TotalCompensatedFileSize(inputs->files);
422
5.41k
    const uint64_t inputs1_size =
423
5.41k
        TotalCompensatedFileSize(output_level_inputs->files);
424
5.41k
    const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0.files);
425
5.41k
    uint64_t limit =
426
5.41k
        mutable_cf_options.ExpandedCompactionByteSizeLimit(input_level);
427
5.41k
    if (expanded0.size() > inputs->size() &&
428
1.67k
        inputs1_size + expanded0_size < limit &&
429
1.66k
        !FilesInCompaction(expanded0.files) &&
430
1.66k
        !vstorage->HasOverlappingUserKey(&expanded0.files, input_level)) {
431
1.66k
      InternalKey new_start, new_limit;
432
1.66k
      GetRange(expanded0, &new_start, &new_limit);
433
1.66k
      std::vector<FileMetaData*> expanded1;
434
1.66k
      vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
435
1.66k
                                     &expanded1, *parent_index, parent_index);
436
1.66k
      if (expanded1.size() == output_level_inputs->size() &&
437
954
          !FilesInCompaction(expanded1)) {
438
954
        RLOG(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
439
954
            "[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt "(%" PRIu64
440
954
            "+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
441
954
            " (%" PRIu64 "+%" PRIu64 "bytes)\n",
442
954
            cf_name.c_str(), input_level, inputs->size(),
443
954
            output_level_inputs->size(), inputs0_size, inputs1_size,
444
954
            expanded0.size(), expanded1.size(), expanded0_size, inputs1_size);
445
954
        smallest = new_start;
446
954
        largest = new_limit;
447
954
        inputs->files = expanded0.files;
448
954
        output_level_inputs->files = expanded1;
449
954
      }
450
1.66k
    }
451
5.41k
  }
452
453
18.2k
  return true;
454
18.2k
}
455
456
void CompactionPicker::GetGrandparents(
457
    VersionStorageInfo* vstorage, const CompactionInputFiles& inputs,
458
    const CompactionInputFiles& output_level_inputs,
459
18.4k
    std::vector<FileMetaData*>* grandparents) {
460
18.4k
  InternalKey start, limit;
461
18.4k
  GetRange(inputs, output_level_inputs, &start, &limit);
462
  // Compute the set of grandparent files that overlap this compaction
463
  // (parent == level+1; grandparent == level+2)
464
18.4k
  if (output_level_inputs.level + 1 < NumberLevels()) {
465
16.7k
    vstorage->GetOverlappingInputs(output_level_inputs.level + 1, &start,
466
16.7k
                                   &limit, grandparents);
467
16.7k
  }
468
18.4k
}
469
470
void CompactionPicker::MarkL0FilesForDeletion(
471
    const VersionStorageInfo* vstorage,
472
16.5k
    const ImmutableCFOptions* ioptions) {
473
  // CompactionFileFilterFactory is used to determine files that can be directly removed during
474
  // compaction rather than requiring a full iteration through the files.
475
16.5k
  if (!ioptions->compaction_file_filter_factory) {
476
16.5k
    return;
477
16.5k
  }
478
  // Compaction file filter factory should only look at files in Level 0.
479
0
  auto file_filter = ioptions->compaction_file_filter_factory->CreateCompactionFileFilter(
480
0
      vstorage->LevelFiles(0));
481
0
  for (FileMetaData* f : vstorage->LevelFiles(0)) {
482
0
    if (file_filter && file_filter->Filter(f) == FilterDecision::kDiscard) {
483
0
      f->delete_after_compaction = true;
484
0
    }
485
0
  }
486
0
}
487
488
std::unique_ptr<Compaction> CompactionPicker::CompactRange(
489
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
490
    VersionStorageInfo* vstorage, int input_level, int output_level,
491
    uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
492
5.86k
    InternalKey** compaction_end, bool* manual_conflict) {
493
  // CompactionPickerFIFO has its own implementation of compact range
494
5.86k
  assert(ioptions_.compaction_style != kCompactionStyleFIFO);
495
496
5.86k
  if (input_level == ColumnFamilyData::kCompactAllLevels) {
497
88
    assert(ioptions_.compaction_style == kCompactionStyleUniversal);
498
499
    // Universal compaction with more than one level always compacts all the
500
    // files together to the last level.
501
88
    assert(vstorage->num_levels() > 1);
502
    // DBImpl::CompactRange() set output level to be the last level
503
88
    assert(output_level == vstorage->num_levels() - 1);
504
    // DBImpl::RunManualCompaction will make full range for universal compaction
505
88
    assert(begin == nullptr);
506
88
    assert(end == nullptr);
507
88
    *compaction_end = nullptr;
508
509
88
    int start_level = 0;
510
172
    for (; start_level < vstorage->num_levels() &&
511
172
           vstorage->NumLevelFiles(start_level) == 0;
512
84
         start_level++) {
513
84
    }
514
88
    if (start_level == vstorage->num_levels()) {
515
0
      return nullptr;
516
0
    }
517
518
88
    if ((start_level == 0) && (!level0_compactions_in_progress_.empty())) {
519
0
      *manual_conflict = true;
520
      // Only one level 0 compaction allowed
521
0
      return nullptr;
522
0
    }
523
524
88
    std::vector<CompactionInputFiles> inputs(vstorage->num_levels() -
525
88
                                             start_level);
526
695
    for (int level = start_level; level < vstorage->num_levels(); level++) {
527
607
      inputs[level - start_level].level = level;
528
607
      auto& files = inputs[level - start_level].files;
529
530
206
      for (FileMetaData* f : vstorage->LevelFiles(level)) {
531
206
        files.push_back(f);
532
206
      }
533
607
      if (FilesInCompaction(files)) {
534
0
        *manual_conflict = true;
535
0
        return nullptr;
536
0
      }
537
607
    }
538
88
    auto c = Compaction::Create(
539
88
        vstorage, mutable_cf_options, std::move(inputs), output_level,
540
88
        mutable_cf_options.MaxFileSizeForLevel(output_level),
541
        /* max_grandparent_overlap_bytes = */ LLONG_MAX, output_path_id,
542
88
        GetCompressionType(ioptions_, output_level, 1),
543
88
        /* grandparents = */ std::vector<FileMetaData*>(), ioptions_.info_log,
544
88
        /* is_manual = */ true);
545
88
    if (c && start_level == 0) {
546
76
      MarkL0FilesForDeletion(vstorage, &ioptions_);
547
76
      level0_compactions_in_progress_.insert(c.get());
548
76
    }
549
88
    return c;
550
5.77k
  }
551
552
5.77k
  CompactionInputFiles inputs;
553
5.77k
  inputs.level = input_level;
554
5.77k
  bool covering_the_whole_range = true;
555
556
  // All files are 'overlapping' in universal style compaction.
557
  // We have to compact the entire range in one shot.
558
5.77k
  if (ioptions_.compaction_style == kCompactionStyleUniversal) {
559
362
    begin = nullptr;
560
362
    end = nullptr;
561
362
  }
562
563
5.77k
  vstorage->GetOverlappingInputs(input_level, begin, end, &inputs.files);
564
5.77k
  if (inputs.empty()) {
565
1.00k
    return nullptr;
566
1.00k
  }
567
568
4.77k
  if ((input_level == 0) && (!level0_compactions_in_progress_.empty())) {
569
    // Only one level 0 compaction allowed
570
2
    TEST_SYNC_POINT("CompactionPicker::CompactRange:Conflict");
571
2
    *manual_conflict = true;
572
2
    return nullptr;
573
2
  }
574
575
  // Avoid compacting too much in one shot in case the range is large.
576
  // But we cannot do this for level-0 since level-0 files can overlap
577
  // and we must not pick one file and drop another older file if the
578
  // two files overlap.
579
4.76k
  if (input_level > 0) {
580
2.43k
    const uint64_t limit = mutable_cf_options.MaxFileSizeForLevel(input_level) *
581
2.43k
      mutable_cf_options.source_compaction_factor;
582
2.43k
    uint64_t total = 0;
583
2.47k
    for (size_t i = 0; i + 1 < inputs.size(); ++i) {
584
229
      uint64_t s = inputs[i]->compensated_file_size;
585
229
      total += s;
586
229
      if (total >= limit) {
587
190
        **compaction_end = inputs[i + 1]->smallest.key;
588
190
        covering_the_whole_range = false;
589
190
        inputs.files.resize(i + 1);
590
190
        break;
591
190
      }
592
229
    }
593
2.43k
  }
594
4.76k
  assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
595
596
4.76k
  if (ExpandWhileOverlapping(cf_name, vstorage, &inputs) == false) {
597
    // manual compaction is now multi-threaded, so it can
598
    // happen that ExpandWhileOverlapping fails
599
    // we handle it higher in RunManualCompaction
600
0
    *manual_conflict = true;
601
0
    return nullptr;
602
0
  }
603
604
4.76k
  if (covering_the_whole_range) {
605
4.57k
    *compaction_end = nullptr;
606
4.57k
  }
607
608
4.76k
  CompactionInputFiles output_level_inputs;
609
4.76k
  if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
610
4
    assert(input_level == 0);
611
4
    output_level = vstorage->base_level();
612
4
    assert(output_level > 0);
613
4
  }
614
4.76k
  output_level_inputs.level = output_level;
615
4.76k
  if (input_level != output_level) {
616
4.54k
    int parent_index = -1;
617
4.54k
    if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
618
0
                          &output_level_inputs, &parent_index, -1)) {
619
      // manual compaction is now multi-threaded, so it can
620
      // happen that SetupOtherInputs fails
621
      // we handle it higher in RunManualCompaction
622
0
      *manual_conflict = true;
623
0
      return nullptr;
624
0
    }
625
4.76k
  }
626
627
4.76k
  std::vector<CompactionInputFiles> compaction_inputs({inputs});
628
4.76k
  if (!output_level_inputs.empty()) {
629
1.55k
    compaction_inputs.push_back(output_level_inputs);
630
1.55k
  }
631
11.0k
  for (size_t i = 0; i < compaction_inputs.size(); i++) {
632
6.32k
    if (FilesInCompaction(compaction_inputs[i].files)) {
633
0
      *manual_conflict = true;
634
0
      return nullptr;
635
0
    }
636
6.32k
  }
637
638
4.76k
  std::vector<FileMetaData*> grandparents;
639
4.76k
  GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
640
4.76k
  auto compaction = Compaction::Create(
641
4.76k
      vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
642
4.76k
      mutable_cf_options.MaxFileSizeForLevel(output_level),
643
4.76k
      mutable_cf_options.MaxGrandParentOverlapBytes(input_level), output_path_id,
644
4.76k
      GetCompressionType(ioptions_, output_level, vstorage->base_level()), std::move(grandparents),
645
4.76k
      ioptions_.info_log, /* is manual compaction = */ true);
646
4.76k
  if (!compaction) {
647
1
    return nullptr;
648
1
  }
649
650
4.76k
  TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction.get());
651
4.76k
  if (input_level == 0) {
652
2.33k
    MarkL0FilesForDeletion(vstorage, &ioptions_);
653
2.33k
    level0_compactions_in_progress_.insert(compaction.get());
654
2.33k
  }
655
656
  // Creating a compaction influences the compaction score because the score
657
  // takes running compactions into account (by skipping files that are already
658
  // being compacted). Since we just changed compaction score, we recalculate it
659
  // here
660
4.76k
  {  // this piece of code recomputes compaction score
661
4.76k
    CompactionOptionsFIFO dummy_compaction_options_fifo;
662
4.76k
    vstorage->ComputeCompactionScore(mutable_cf_options,
663
4.76k
                                     dummy_compaction_options_fifo);
664
4.76k
  }
665
666
4.76k
  return compaction;
667
4.76k
}
668
669
// Test whether two files have overlapping key-ranges.
670
bool HaveOverlappingKeyRanges(const Comparator* c,
671
                              const SstFileMetaData& a,
672
0
                              const SstFileMetaData& b) {
673
0
  return c->Compare(a.largest.key, b.smallest.key) >= 0 &&
674
0
         c->Compare(b.largest.key, a.smallest.key) >= 0;
675
0
}
676
677
#ifndef ROCKSDB_LITE
678
namespace {
679
680
// Updates smallest/largest keys using keys from specified file.
681
void UpdateBoundaryKeys(const Comparator* comparator,
682
                        const SstFileMetaData& file,
683
                        SstFileMetaData::BoundaryValues* smallest,
684
47
                        SstFileMetaData::BoundaryValues* largest) {
685
47
  if (smallest != nullptr && comparator->Compare(smallest->key, file.smallest.key) > 0) {
686
20
    smallest->key = file.smallest.key;
687
20
  }
688
47
  if (largest != nullptr && comparator->Compare(largest->key, file.largest.key) < 0) {
689
9
    largest->key = file.largest.key;
690
9
  }
691
47
}
692
693
}  // namespace
694
695
Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
696
      std::unordered_set<uint64_t>* input_files,
697
      const ColumnFamilyMetaData& cf_meta,
698
15
      const int output_level) const {
699
15
  auto& levels = cf_meta.levels;
700
15
  auto comparator = icmp_->user_comparator();
701
702
  // TODO(yhchiang): If there are any input files at L1 or above and there
703
  // is at least one L0 file, all L0 files older than that L0 file need
704
  // to be included. Otherwise, it is a false condition.
705
706
  // TODO(yhchiang): add is_adjustable to CompactionOptions
707
708
  // the smallest and largest key of the current compaction input
709
15
  SstFileMetaData::BoundaryValues smallest, largest;
710
  // a flag for initializing smallest and largest key
711
15
  bool is_first = true;
712
15
  const int kNotFound = -1;
713
714
  // For each level, it does the following things:
715
  // 1. Find the first and the last compaction input files
716
  //    in the current level.
717
  // 2. Include all files between the first and the last
718
  //    compaction input files.
719
  // 3. Update the compaction key-range.
720
  // 4. For all remaining levels, include files that have
721
  //    overlapping key-range with the compaction key-range.
722
33
  for (int l = 0; l <= output_level; ++l) {
723
18
    auto& current_files = levels[l].files;
724
18
    int first_included = static_cast<int>(current_files.size());
725
18
    int last_included = kNotFound;
726
727
    // identify the first and the last compaction input files
728
    // in the current level.
729
67
    for (size_t f = 0; f < current_files.size(); ++f) {
730
49
      if (input_files->find(TableFileNameToNumber(current_files[f].name)) !=
731
45
          input_files->end()) {
732
45
        first_included = std::min(first_included, static_cast<int>(f));
733
45
        last_included = std::max(last_included, static_cast<int>(f));
734
45
        if (is_first) {
735
15
          smallest = current_files[f].smallest;
736
15
          largest = current_files[f].largest;
737
15
          is_first = false;
738
15
        }
739
45
      }
740
49
    }
741
18
    if (last_included == kNotFound) {
742
3
      continue;
743
3
    }
744
745
15
    if (l != 0) {
746
      // expand the compaction input of the current level if it
747
      // has overlapping key-range with other non-compaction input
748
      // files in the same level.
749
0
      while (first_included > 0) {
750
0
        if (comparator->Compare(
751
0
                current_files[first_included - 1].largest.key,
752
0
                current_files[first_included].smallest.key) < 0) {
753
0
          break;
754
0
        }
755
0
        first_included--;
756
0
      }
757
758
0
      while (last_included < static_cast<int>(current_files.size()) - 1) {
759
0
        if (comparator->Compare(
760
0
                current_files[last_included + 1].smallest.key,
761
0
                current_files[last_included].largest.key) > 0) {
762
0
          break;
763
0
        }
764
0
        last_included++;
765
0
      }
766
0
    }
767
768
    // include all files between the first and the last compaction input files.
769
62
    for (int f = first_included; f <= last_included; ++f) {
770
47
      if (current_files[f].being_compacted) {
771
0
        return STATUS(Aborted,
772
0
            "Necessary compaction input file " + current_files[f].name +
773
0
            " is currently being compacted.");
774
0
      }
775
47
      input_files->insert(
776
47
          TableFileNameToNumber(current_files[f].name));
777
47
    }
778
779
    // update smallest and largest key
780
15
    if (l == 0) {
781
62
      for (int f = first_included; f <= last_included; ++f) {
782
47
        UpdateBoundaryKeys(comparator, current_files[f], &smallest, &largest);
783
47
      }
784
0
    } else {
785
0
      UpdateBoundaryKeys(comparator, current_files[first_included], &smallest, nullptr);
786
0
      UpdateBoundaryKeys(comparator, current_files[last_included], nullptr, &largest);
787
0
    }
788
789
15
    SstFileMetaData aggregated_file_meta;
790
15
    aggregated_file_meta.smallest = smallest;
791
15
    aggregated_file_meta.largest = largest;
792
793
    // For all lower levels, include all overlapping files.
794
    // We need to add overlapping files from the current level too because even
795
    // if there are no input_files in level l, we would still need to add files
796
    // which overlap with the range containing the input_files in levels 0 to l
797
    // Level 0 doesn't need to be handled this way because files are sorted by
798
    // time and not by key
799
18
    for (int m = std::max(l, 1); m <= output_level; ++m) {
800
0
      for (auto& next_lv_file : levels[m].files) {
801
0
        if (HaveOverlappingKeyRanges(
802
0
            comparator, aggregated_file_meta, next_lv_file)) {
803
0
          if (next_lv_file.being_compacted) {
804
0
            return STATUS(Aborted,
805
0
                "File " + next_lv_file.name +
806
0
                " that has overlapping key range with one of the compaction "
807
0
                " input file is currently being compacted.");
808
0
          }
809
0
          input_files->insert(
810
0
              TableFileNameToNumber(next_lv_file.name));
811
0
        }
812
0
      }
813
3
    }
814
15
  }
815
15
  return Status::OK();
816
15
}
817
818
Status CompactionPicker::SanitizeCompactionInputFiles(
819
    std::unordered_set<uint64_t>* input_files,
820
    const ColumnFamilyMetaData& cf_meta,
821
21
    const int output_level) const {
822
21
  assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
823
21
         cf_meta.levels[cf_meta.levels.size() - 1].level);
824
21
  if (output_level >= static_cast<int>(cf_meta.levels.size())) {
825
6
    return STATUS(InvalidArgument,
826
6
        "Output level for column family " + cf_meta.name +
827
6
        " must between [0, " +
828
6
        ToString(cf_meta.levels[cf_meta.levels.size() - 1].level) +
829
6
        "].");
830
6
  }
831
832
15
  if (output_level > MaxOutputLevel()) {
833
0
    return STATUS(InvalidArgument,
834
0
        "Exceed the maximum output level defined by "
835
0
        "the current compaction algorithm --- " +
836
0
            ToString(MaxOutputLevel()));
837
0
  }
838
839
15
  if (output_level < 0) {
840
0
    return STATUS(InvalidArgument,
841
0
        "Output level cannot be negative.");
842
0
  }
843
844
15
  if (input_files->size() == 0) {
845
0
    return STATUS(InvalidArgument,
846
0
        "A compaction must contain at least one file.");
847
0
  }
848
849
15
  Status s = SanitizeCompactionInputFilesForAllLevels(
850
15
      input_files, cf_meta, output_level);
851
852
15
  if (!s.ok()) {
853
0
    return s;
854
0
  }
855
856
  // for all input files, check whether the file number matches
857
  // any currently-existing files.
858
47
  for (auto file_num : *input_files) {
859
47
    bool found = false;
860
47
    for (auto level_meta : cf_meta.levels) {
861
199
      for (auto file_meta : level_meta.files) {
862
199
        if (file_num == TableFileNameToNumber(file_meta.name)) {
863
47
          if (file_meta.being_compacted) {
864
0
            return STATUS(Aborted,
865
0
                "Specified compaction input file " +
866
0
                MakeTableFileName("", file_num) +
867
0
                " is already being compacted.");
868
0
          }
869
47
          found = true;
870
47
          break;
871
47
        }
872
199
      }
873
47
      if (found) {
874
47
        break;
875
47
      }
876
47
    }
877
47
    if (!found) {
878
0
      return STATUS(InvalidArgument,
879
0
          "Specified compaction input file " +
880
0
          MakeTableFileName("", file_num) +
881
0
          " does not exist in column family " + cf_meta.name + ".");
882
0
    }
883
47
  }
884
885
15
  return Status::OK();
886
15
}
887
#endif  // !ROCKSDB_LITE
888
889
bool LevelCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage)
890
87.3k
    const {
891
87.3k
  if (!vstorage->FilesMarkedForCompaction().empty()) {
892
29
    return true;
893
29
  }
894
481k
  for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
895
414k
    if (vstorage->CompactionScore(i) >= 1) {
896
21.1k
      return true;
897
21.1k
    }
898
414k
  }
899
66.1k
  return false;
900
87.3k
}
901
902
void LevelCompactionPicker::PickFilesMarkedForCompactionExperimental(
903
    const std::string& cf_name, VersionStorageInfo* vstorage,
904
6.03k
    CompactionInputFiles* inputs, int* level, int* output_level) {
905
6.03k
  if (vstorage->FilesMarkedForCompaction().empty()) {
906
6.01k
    return;
907
6.01k
  }
908
909
39
  auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
910
    // If it's already being compacted, there's nothing to do here.
911
    // If this assert() fails, it means that some function marked some
912
    // files as being_compacted, but didn't call ComputeCompactionScore()
913
39
    assert(!level_file.second->being_compacted);
914
39
    *level = level_file.first;
915
35
    *output_level = (*level == 0) ? vstorage->base_level() : *level + 1;
916
917
39
    if (*level == 0 && !level0_compactions_in_progress_.empty()) {
918
28
      return false;
919
28
    }
920
921
11
    inputs->files = {level_file.second};
922
11
    inputs->level = *level;
923
11
    return ExpandWhileOverlapping(cf_name, vstorage, inputs);
924
11
  };
925
926
  // take a chance on a random file first
927
25
  Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
928
25
  size_t random_file_index = static_cast<size_t>(rnd.Uniform(
929
25
      static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));
930
931
25
  if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
932
    // found the compaction!
933
11
    return;
934
11
  }
935
936
14
  for (auto& level_file : vstorage->FilesMarkedForCompaction()) {
937
14
    if (continuation(level_file)) {
938
      // found the compaction!
939
0
      return;
940
0
    }
941
14
  }
942
14
  inputs->files.clear();
943
14
}
944
945
std::unique_ptr<Compaction> LevelCompactionPicker::PickCompaction(
946
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
947
19.7k
    VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
948
19.7k
  int level = -1;
949
19.7k
  int output_level = -1;
950
19.7k
  int parent_index = -1;
951
19.7k
  int base_index = -1;
952
19.7k
  CompactionInputFiles inputs;
953
19.7k
  double score = 0;
954
19.7k
  CompactionReason compaction_reason = CompactionReason::kUnknown;
955
956
  // Find the compactions by size on all levels.
957
19.7k
  bool skipped_l0 = false;
958
55.3k
  for (int i = 0; i < NumberLevels() - 1; i++) {
959
49.3k
    score = vstorage->CompactionScore(i);
960
49.3k
    level = vstorage->CompactionScoreLevel(i);
961
49.3k
    assert(i == 0 || score <= vstorage->CompactionScore(i - 1));
962
49.3k
    if (score >= 1) {
963
21.9k
      if (skipped_l0 && level == vstorage->base_level()) {
964
        // If L0->base_level compaction is pending, don't schedule further
965
        // compaction from base level. Otherwise L0->base_level compaction
966
        // may starve.
967
2.19k
        continue;
968
2.19k
      }
969
19.7k
      output_level = (level == 0) ? vstorage->base_level() : level + 1;
970
19.7k
      if (PickCompactionBySize(vstorage, level, output_level, &inputs,
971
19.7k
                               &parent_index, &base_index) &&
972
13.6k
          ExpandWhileOverlapping(cf_name, vstorage, &inputs)) {
973
        // found the compaction!
974
13.6k
        if (level == 0) {
975
          // L0 score = `num L0 files` / `level0_file_num_compaction_trigger`
976
5.75k
          compaction_reason = CompactionReason::kLevelL0FilesNum;
977
7.94k
        } else {
978
          // L1+ score = `Level files size` / `MaxBytesForLevel`
979
7.94k
          compaction_reason = CompactionReason::kLevelMaxLevelSize;
980
7.94k
        }
981
13.6k
        break;
982
6.02k
      } else {
983
        // didn't find the compaction, clear the inputs
984
6.02k
        inputs.clear();
985
6.02k
        if (level == 0) {
986
6.01k
          skipped_l0 = true;
987
6.01k
        }
988
6.02k
      }
989
19.7k
    }
990
49.3k
  }
991
992
19.7k
  bool is_manual = false;
993
  // if we didn't find a compaction, check if there are any files marked for
994
  // compaction
995
19.7k
  if (inputs.empty()) {
996
6.03k
    is_manual = true;
997
6.03k
    parent_index = base_index = -1;
998
6.03k
    PickFilesMarkedForCompactionExperimental(cf_name, vstorage, &inputs, &level,
999
6.03k
                                             &output_level);
1000
6.03k
    if (!inputs.empty()) {
1001
11
      compaction_reason = CompactionReason::kFilesMarkedForCompaction;
1002
11
    }
1003
6.03k
  }
1004
19.7k
  if (inputs.empty()) {
1005
6.02k
    return nullptr;
1006
6.02k
  }
1007
13.7k
  assert(level >= 0 && output_level >= 0);
1008
1009
  // Two level 0 compactions won't run at the same time, so we don't need to worry
1010
  // about files on level 0 being compacted.
1011
13.7k
  if (level == 0) {
1012
5.75k
    assert(level0_compactions_in_progress_.empty());
1013
5.75k
    InternalKey smallest, largest;
1014
5.75k
    GetRange(inputs, &smallest, &largest);
1015
    // Note that the next call will discard the file we placed in
1016
    // c->inputs_[0] earlier and replace it with an overlapping set
1017
    // which will include the picked file.
1018
5.75k
    inputs.files.clear();
1019
5.75k
    vstorage->GetOverlappingInputs(0, &smallest, &largest, &inputs.files);
1020
1021
    // If we include more L0 files in the same compaction run it can
1022
    // cause the 'smallest' and 'largest' key to get extended to a
1023
    // larger range. So, re-invoke GetRange to get the new key range
1024
5.75k
    GetRange(inputs, &smallest, &largest);
1025
5.75k
    if (RangeInCompaction(vstorage, &smallest, &largest, output_level,
1026
13
                          &parent_index)) {
1027
13
      return nullptr;
1028
13
    }
1029
5.74k
    assert(!inputs.files.empty());
1030
5.74k
  }
1031
1032
  // Setup input files from output level
1033
13.6k
  CompactionInputFiles output_level_inputs;
1034
13.6k
  output_level_inputs.level = output_level;
1035
13.6k
  if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
1036
0
                   &output_level_inputs, &parent_index, base_index)) {
1037
0
    return nullptr;
1038
0
  }
1039
1040
13.6k
  std::vector<CompactionInputFiles> compaction_inputs({inputs});
1041
13.6k
  if (!output_level_inputs.empty()) {
1042
3.85k
    compaction_inputs.push_back(output_level_inputs);
1043
3.85k
  }
1044
1045
13.6k
  std::vector<FileMetaData*> grandparents;
1046
13.6k
  GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
1047
13.6k
  auto c = Compaction::Create(
1048
13.6k
      vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
1049
13.6k
      mutable_cf_options.MaxFileSizeForLevel(output_level),
1050
13.6k
      mutable_cf_options.MaxGrandParentOverlapBytes(level),
1051
13.6k
      GetPathId(ioptions_, mutable_cf_options, output_level),
1052
13.6k
      GetCompressionType(ioptions_, output_level, vstorage->base_level()), std::move(grandparents),
1053
13.6k
      ioptions_.info_log, is_manual, score,
1054
13.6k
      /* deletion_compaction = */ false, compaction_reason);
1055
13.6k
  if (!c) {
1056
0
    return nullptr;
1057
0
  }
1058
1059
  // If it's level 0 compaction, make sure we don't execute any other level 0
1060
  // compactions in parallel
1061
13.6k
  if (level == 0) {
1062
5.74k
    level0_compactions_in_progress_.insert(c.get());
1063
5.74k
  }
1064
1065
  // Creating a compaction influences the compaction score because the score
1066
  // takes running compactions into account (by skipping files that are already
1067
  // being compacted). Since we just changed compaction score, we recalculate it
1068
  // here
1069
13.6k
  {  // this piece of code recomputes compaction score
1070
13.6k
    CompactionOptionsFIFO dummy_compaction_options_fifo;
1071
13.6k
    vstorage->ComputeCompactionScore(mutable_cf_options,
1072
13.6k
                                     dummy_compaction_options_fifo);
1073
13.6k
  }
1074
1075
13.6k
  TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c.get());
1076
1077
13.6k
  return c;
1078
13.6k
}
1079
1080
/*
1081
 * Find the optimal path to place a file
1082
 * Given a level, finds the path such that all levels up to and including it
1083
 * fit within the target sizes of the paths up to and including that path
1084
 */
1085
uint32_t LevelCompactionPicker::GetPathId(
1086
    const ImmutableCFOptions& ioptions,
1087
13.6k
    const MutableCFOptions& mutable_cf_options, int level) {
1088
13.6k
  uint32_t p = 0;
1089
13.6k
  assert(!ioptions.db_paths.empty());
1090
1091
  // size remaining in the most recent path
1092
13.6k
  uint64_t current_path_size = ioptions.db_paths[0].target_size;
1093
1094
13.6k
  uint64_t level_size;
1095
13.6k
  int cur_level = 0;
1096
1097
13.6k
  level_size = mutable_cf_options.max_bytes_for_level_base;
1098
1099
  // Last path is the fallback
1100
13.9k
  while (p < ioptions.db_paths.size() - 1) {
1101
384
    if (level_size <= current_path_size) {
1102
232
      if (cur_level == level) {
1103
        // Does desired level fit in this path?
1104
80
        return p;
1105
152
      } else {
1106
152
        current_path_size -= level_size;
1107
152
        level_size *= mutable_cf_options.max_bytes_for_level_multiplier;
1108
152
        cur_level++;
1109
152
        continue;
1110
152
      }
1111
152
    }
1112
152
    p++;
1113
152
    current_path_size = ioptions.db_paths[p].target_size;
1114
152
  }
1115
13.6k
  return p;
1116
13.6k
}
1117
1118
bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage,
1119
                                                 int level, int output_level,
1120
                                                 CompactionInputFiles* inputs,
1121
                                                 int* parent_index,
1122
19.7k
                                                 int* base_index) {
1123
  // Level 0 files are overlapping, so we cannot pick more
1124
  // than one concurrent compaction at this level. This
1125
  // could be made better by looking at key-ranges that are
1126
  // being compacted at level 0.
1127
19.7k
  if (level == 0 && !level0_compactions_in_progress_.empty()) {
1128
5.82k
    TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0");
1129
5.82k
    return false;
1130
5.82k
  }
1131
1132
13.8k
  inputs->clear();
1133
1134
13.8k
  assert(level >= 0);
1135
1136
  // Pick the largest file in this level that is not already
1137
  // being compacted
1138
13.8k
  const std::vector<int>& file_size = vstorage->FilesByCompactionPri(level);
1139
13.8k
  const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(level);
1140
1141
  // record the first file that is not yet compacted
1142
13.8k
  int nextIndex = -1;
1143
1144
13.8k
  for (unsigned int i = vstorage->NextCompactionIndex(level);
1145
15.4k
       i < file_size.size(); i++) {
1146
15.2k
    int index = file_size[i];
1147
15.2k
    auto* f = level_files[index];
1148
1149
    // do not pick a file to compact if it is being compacted
1150
    // from n-1 level.
1151
15.2k
    if (f->being_compacted) {
1152
645
      continue;
1153
645
    }
1154
1155
    // remember the startIndex for the next call to PickCompaction
1156
14.6k
    if (nextIndex == -1) {
1157
13.8k
      nextIndex = i;
1158
13.8k
    }
1159
1160
    // Do not pick this file if its parents at level+1 are being compacted.
1161
    // Maybe we can avoid redoing this work in SetupOtherInputs
1162
14.6k
    *parent_index = -1;
1163
14.6k
    if (RangeInCompaction(vstorage, &f->smallest.key, &f->largest.key, output_level,
1164
943
                          parent_index)) {
1165
943
      continue;
1166
943
    }
1167
13.6k
    inputs->files.push_back(f);
1168
13.6k
    inputs->level = level;
1169
13.6k
    *base_index = index;
1170
13.6k
    break;
1171
13.6k
  }
1172
1173
  // store where to start the iteration in the next call to PickCompaction
1174
13.8k
  vstorage->SetNextCompactionIndex(level, nextIndex);
1175
1176
13.8k
  return inputs->size() > 0;
1177
13.8k
}
1178
1179
#ifndef ROCKSDB_LITE
1180
bool UniversalCompactionPicker::NeedsCompaction(
1181
700k
    const VersionStorageInfo* vstorage) const {
1182
700k
  const int kLevel0 = 0;
1183
700k
  return vstorage->CompactionScore(kLevel0) >= 1;
1184
700k
}
1185
1186
struct UniversalCompactionPicker::SortedRun {
1187
  SortedRun(int _level, FileMetaData* _file, uint64_t _size,
1188
            uint64_t _compensated_file_size, bool _being_compacted)
1189
      : level(_level),
1190
        file(_file),
1191
        size(_size),
1192
        compensated_file_size(_compensated_file_size),
1193
69.7k
        being_compacted(_being_compacted) {
1194
69.7k
    assert(compensated_file_size > 0);
1195
    // Allowed either one of level and file.
1196
69.7k
    assert((level != 0) != (file != nullptr));
1197
69.7k
  }
1198
1199
  void Dump(char* out_buf, size_t out_buf_size,
1200
            bool print_path = false) const;
1201
1202
  // sorted_run_count is added into the string to print
1203
  void DumpSizeInfo(char* out_buf, size_t out_buf_size,
1204
                    size_t sorted_run_count) const;
1205
1206
23.5k
  std::string FileNumAsString() const {
1207
23.5k
    char file_num_buf[kFormatFileNumberBufSize];
1208
23.5k
    Dump(file_num_buf, sizeof(file_num_buf), true);
1209
23.5k
    return file_num_buf;
1210
23.5k
  }
1211
1212
0
  bool delete_after_compaction() const {
1213
0
    return file ? file->delete_after_compaction : false;
1214
0
  }
1215
1216
  int level;
1217
  // `file` will be null for level > 0. For level = 0, the sorted run is
1218
  // for this file.
1219
  FileMetaData* file;
1220
  // For level > 0, `size` and `compensated_file_size` are the sum of the sizes of all
1221
  // files in the level. `being_compacted` should be the same for all files
1222
  // in a non-zero level. Use the value here.
1223
  uint64_t size;
1224
  uint64_t compensated_file_size;
1225
  bool being_compacted;
1226
};
1227
1228
void UniversalCompactionPicker::SortedRun::Dump(char* out_buf,
1229
                                                size_t out_buf_size,
1230
80.6k
                                                bool print_path) const {
1231
80.6k
  if (level == 0) {
1232
65.4k
    assert(file != nullptr);
1233
65.4k
    if (file->fd.GetPathId() == 0 || !print_path) {
1234
65.0k
      snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
1235
480
    } else {
1236
480
      snprintf(out_buf, out_buf_size, "file %" PRIu64
1237
480
                                      "(path "
1238
480
                                      "%" PRIu32 ")",
1239
480
               file->fd.GetNumber(), file->fd.GetPathId());
1240
480
    }
1241
15.1k
  } else {
1242
15.1k
    snprintf(out_buf, out_buf_size, "level %d", level);
1243
15.1k
  }
1244
80.6k
}
1245
1246
void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
1247
33.3k
    char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
1248
33.3k
  if (level == 0) {
1249
23.4k
    assert(file != nullptr);
1250
23.4k
    snprintf(out_buf, out_buf_size,
1251
23.4k
             "file %" PRIu64 "[%" ROCKSDB_PRIszt
1252
23.4k
             "] "
1253
23.4k
             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
1254
23.4k
             file->fd.GetNumber(), sorted_run_count, file->fd.GetTotalFileSize(),
1255
23.4k
             file->compensated_file_size);
1256
9.90k
  } else {
1257
9.90k
    snprintf(out_buf, out_buf_size,
1258
9.90k
             "level %d[%" ROCKSDB_PRIszt
1259
9.90k
             "] "
1260
9.90k
             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
1261
9.90k
             level, sorted_run_count, size, compensated_file_size);
1262
9.90k
  }
1263
33.3k
}
1264
1265
std::vector<std::vector<UniversalCompactionPicker::SortedRun>>
1266
    UniversalCompactionPicker::CalculateSortedRuns(const VersionStorageInfo& vstorage,
1267
                                                   const ImmutableCFOptions& ioptions,
1268
14.1k
                                                   uint64_t max_file_size) {
1269
14.1k
  std::vector<std::vector<SortedRun>> ret(1);
1270
14.1k
  MarkL0FilesForDeletion(&vstorage, &ioptions);
1271
1272
52.9k
  for (FileMetaData* f : vstorage.LevelFiles(0)) {
1273
    // Any files that can be directly removed during compaction can be included, even if they
1274
    // exceed the "max file size for compaction."
1275
52.9k
    if (f->fd.GetTotalFileSize() <= max_file_size || f->delete_after_compaction) {
1276
52.9k
      ret.back().emplace_back(0, f, f->fd.GetTotalFileSize(), f->compensated_file_size,
1277
52.9k
          f->being_compacted);
1278
    // If last sequence is empty it means that there are multiple too-large-to-compact files in
1279
    // a row. So we just don't start new sequence in this case.
1280
0
    } else if (!ret.back().empty()) {
1281
0
      ret.emplace_back();
1282
0
    }
1283
52.9k
  }
1284
1285
63.7k
  for (int level = 1; level < vstorage.num_levels(); level++) {
1286
49.6k
    uint64_t total_compensated_size = 0U;
1287
49.6k
    uint64_t total_size = 0U;
1288
49.6k
    bool being_compacted = false;
1289
49.6k
    bool is_first = true;
1290
87.0k
    for (FileMetaData* f : vstorage.LevelFiles(level)) {
1291
87.0k
      total_compensated_size += f->compensated_file_size;
1292
87.0k
      total_size += f->fd.GetTotalFileSize();
1293
87.0k
      if (ioptions.compaction_options_universal.allow_trivial_move == true) {
1294
58.5k
        if (f->being_compacted) {
1295
1.52k
          being_compacted = f->being_compacted;
1296
1.52k
        }
1297
28.4k
      } else {
1298
        // Compaction always includes all files for a non-zero level, so for a
1299
        // non-zero level, all the files should share the same being_compacted
1300
        // value.
1301
        // This assumption is only valid when
1302
        // ioptions.compaction_options_universal.allow_trivial_move is false
1303
28.4k
        assert(is_first || f->being_compacted == being_compacted);
1304
28.4k
      }
1305
87.0k
      if (is_first) {
1306
16.7k
        being_compacted = f->being_compacted;
1307
16.7k
        is_first = false;
1308
16.7k
      }
1309
87.0k
    }
1310
49.6k
    if (total_compensated_size > 0) {
1311
16.7k
      ret.back().emplace_back(level, nullptr, total_size, total_compensated_size, being_compacted);
1312
16.7k
    }
1313
49.6k
  }
1314
1315
  // If the last sequence is empty, it means that we don't have any files after the too-large-to-compact file.
1316
  // So just drop this sequence.
1317
14.1k
  if (ret.back().empty())
1318
0
    ret.pop_back();
1319
14.1k
  return ret;
1320
14.1k
}
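The grouping rule above can be illustrated in isolation: each L0 file becomes its own sorted run in the current sequence, and a too-large L0 file ends the sequence unless it is already empty. Below is a minimal standalone sketch under simplifying assumptions (plain file sizes instead of FileMetaData, and delete_after_compaction and the non-zero levels ignored); GroupL0RunsBySize is a hypothetical helper, not part of the picker.

#include <cstdint>
#include <vector>

// Minimal sketch (not the function above): split L0 file sizes into sequences,
// starting a new sequence whenever a file exceeds max_file_size, and dropping a
// trailing empty sequence, mirroring the splitting rule in CalculateSortedRuns.
std::vector<std::vector<uint64_t>> GroupL0RunsBySize(
    const std::vector<uint64_t>& l0_file_sizes, uint64_t max_file_size) {
  std::vector<std::vector<uint64_t>> sequences(1);
  for (uint64_t size : l0_file_sizes) {
    if (size <= max_file_size) {
      sequences.back().push_back(size);  // file joins the current sequence
    } else if (!sequences.back().empty()) {
      sequences.emplace_back();          // too-large file ends the current sequence
    }
  }
  if (sequences.back().empty()) {
    sequences.pop_back();                // no files followed the last too-large file
  }
  return sequences;
}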
1321
1322
#ifndef NDEBUG
1323
namespace {
1324
// smallest_seqno and largest_seqno are set iff. `files` is not empty.
1325
void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
1326
                             SequenceNumber* smallest_seqno,
1327
2.27k
                             SequenceNumber* largest_seqno) {
1328
2.27k
  DCHECK_ONLY_NOTNULL(smallest_seqno);
1329
2.27k
  DCHECK_ONLY_NOTNULL(largest_seqno);
1330
2.27k
  bool is_first = true;
1331
5.32k
  for (FileMetaData* f : files) {
1332
5.32k
    DCHECK_LE(f->smallest.seqno, f->largest.seqno);
1333
5.32k
    if (is_first) {
1334
2.27k
      is_first = false;
1335
2.27k
      *smallest_seqno = f->smallest.seqno;
1336
2.27k
      *largest_seqno = f->largest.seqno;
1337
3.04k
    } else {
1338
3.04k
      if (f->smallest.seqno < *smallest_seqno) {
1339
1
        *smallest_seqno = f->smallest.seqno;
1340
1
      }
1341
3.04k
      if (f->largest.seqno > *largest_seqno) {
1342
1.33k
        *largest_seqno = f->largest.seqno;
1343
1.33k
      }
1344
3.04k
    }
1345
5.32k
  }
1346
2.27k
}
1347
}  // namespace
1348
#endif
1349
1350
// Algorithm that checks to see if there are any overlapping
1351
// files in the input
1352
603
bool CompactionPicker::IsInputNonOverlapping(Compaction* c) {
1353
603
  auto comparator = icmp_->user_comparator();
1354
603
  int first_iter = 1;
1355
1356
603
  InputFileInfo prev, curr, next;
1357
1358
603
  SmallestKeyHeap smallest_key_priority_q =
1359
603
      create_level_heap(c, icmp_->user_comparator());
1360
1361
4.07k
  while (!smallest_key_priority_q.empty()) {
1362
3.47k
    curr = smallest_key_priority_q.top();
1363
3.47k
    smallest_key_priority_q.pop();
1364
1365
3.47k
    if (first_iter) {
1366
603
      prev = curr;
1367
603
      first_iter = 0;
1368
2.87k
    } else {
1369
2.87k
      if (comparator->Compare(prev.f->largest.key.user_key(),
1370
1
                              curr.f->smallest.key.user_key()) >= 0) {
1371
        // found overlapping files, return false
1372
1
        return false;
1373
1
      }
1374
2.87k
      assert(comparator->Compare(curr.f->largest.key.user_key(),
1375
2.87k
                                 prev.f->largest.key.user_key()) > 0);
1376
2.87k
      prev = curr;
1377
2.87k
    }
1378
1379
3.47k
    next.f = nullptr;
1380
1381
3.47k
    if (curr.level != 0 && curr.index < c->num_input_files(curr.level) - 1) {
1382
1.33k
      next.f = c->input(curr.level, curr.index + 1);
1383
1.33k
      next.level = curr.level;
1384
1.33k
      next.index = curr.index + 1;
1385
1.33k
    }
1386
1387
3.47k
    if (next.f) {
1388
1.33k
      smallest_key_priority_q.push(std::move(next));
1389
1.33k
    }
1390
3.47k
  }
1391
602
  return true;
1392
603
}
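The check above visits the input files in smallest-user-key order via the min heap and requires each file's smallest key to be strictly greater than the previous file's largest key. The same invariant can be sketched on plain integer key ranges; RangesAreNonOverlapping is a hypothetical helper used only for illustration, with sorting standing in for the heap walk.

#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Minimal sketch: files are given as [smallest, largest] integer key ranges.
// The inputs are non-overlapping iff, after ordering by smallest key, each
// range starts strictly after the previous range ends.
bool RangesAreNonOverlapping(std::vector<std::pair<int64_t, int64_t>> ranges) {
  std::sort(ranges.begin(), ranges.end());
  for (size_t i = 1; i < ranges.size(); ++i) {
    if (ranges[i - 1].second >= ranges[i].first) {
      return false;  // previous largest key reaches into the current range
    }
  }
  return true;
}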
1393
1394
// Universal style of compaction. Pick files that are contiguous in
1395
// time-range to compact.
1396
//
1397
std::unique_ptr<Compaction> UniversalCompactionPicker::PickCompaction(
1398
    const std::string& cf_name,
1399
    const MutableCFOptions& mutable_cf_options,
1400
    VersionStorageInfo* vstorage,
1401
14.1k
    LogBuffer* log_buffer) {
1402
14.1k
  std::vector<std::vector<SortedRun>> sorted_runs = CalculateSortedRuns(
1403
14.1k
      *vstorage,
1404
14.1k
      ioptions_,
1405
14.1k
      mutable_cf_options.MaxFileSizeForCompaction());
1406
1407
14.1k
  for (const auto& block : sorted_runs) {
1408
14.1k
    auto result = DoPickCompaction(cf_name, mutable_cf_options, vstorage, log_buffer, block);
1409
14.1k
    if (result != nullptr) {
1410
4.43k
      return result;
1411
4.43k
    }
1412
14.1k
  }
1413
9.71k
  TEST_SYNC_POINT("UniversalCompactionPicker::PickCompaction:SkippingCompaction");
1414
9.71k
  return nullptr;
1415
14.1k
}
1416
1417
std::unique_ptr<Compaction> UniversalCompactionPicker::DoPickCompaction(
1418
    const std::string& cf_name,
1419
    const MutableCFOptions& mutable_cf_options,
1420
    VersionStorageInfo* vstorage,
1421
    LogBuffer* log_buffer,
1422
14.1k
    const std::vector<UniversalCompactionPicker::SortedRun>& sorted_runs) {
1423
14.1k
  const int kLevel0 = 0;
1424
14.1k
  double score = vstorage->CompactionScore(kLevel0);
1425
14.1k
  std::unique_ptr<Compaction> c;
1426
1427
14.1k
  if (sorted_runs.size() == 0) {
1428
0
    RDEBUG(ioptions_.info_log, "[%s] Universal: nothing to do\n", cf_name.c_str());
1429
0
    return nullptr;
1430
0
  }
1431
14.1k
  VersionStorageInfo::LevelSummaryStorage tmp;
1432
14.1k
  RDEBUG(ioptions_.info_log,
1433
14.1k
         "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n",
1434
14.1k
         cf_name.c_str(), sorted_runs.size(),
1435
14.1k
         vstorage->LevelSummary(&tmp));
1436
1437
  // First, if we're using a compaction_file_filter_factory, check if we can directly delete this
1438
  // set of files. This type of compaction may include any number of files, regardless of
1439
  // level0_file_num_compaction_trigger.
1440
14.1k
  if (ioptions_.compaction_file_filter_factory) {
1441
0
    c = PickCompactionUniversalDeletion(
1442
0
        cf_name, mutable_cf_options, vstorage, score, sorted_runs, log_buffer);
1443
0
  }
1444
14.1k
  if (c) {
1445
0
    LOG_TO_BUFFER(log_buffer, "[%s] Universal: compacting for direct deletion\n",
1446
0
                  cf_name.c_str());
1447
14.1k
  } else {
1448
    // Check if the number of files to compact is greater than or equal to
1449
    // level0_file_num_compaction_trigger. If so, consider size amplification and
1450
    // read amplification as compaction reasons.
1451
14.1k
    if (sorted_runs.size() <
1452
0
            (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) {
1453
0
      RDEBUG(ioptions_.info_log, "[%s] Universal: nothing to do\n", cf_name.c_str());
1454
0
      return nullptr;
1455
0
    }
1456
1457
    // Check for size amplification next.
1458
14.1k
    c = PickCompactionUniversalSizeAmp(cf_name, mutable_cf_options, vstorage,
1459
14.1k
                                            score, sorted_runs, log_buffer);
1460
14.1k
    if (c) {
1461
526
      LOG_TO_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n",
1462
526
                  cf_name.c_str());
1463
13.6k
    } else {
1464
      // Size amplification is within limits. Try reducing read
1465
      // amplification while maintaining file size ratios.
1466
13.6k
      unsigned int ratio = ioptions_.compaction_options_universal.size_ratio;
1467
1468
13.6k
      c = PickCompactionUniversalReadAmp(
1469
13.6k
          cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX,
1470
13.6k
          ioptions_.compaction_options_universal.always_include_size_threshold,
1471
13.6k
          sorted_runs, log_buffer);
1472
13.6k
      if (c) {
1473
3.90k
        LOG_TO_BUFFER(log_buffer, "[%s] Universal: compacting for size ratio\n",
1474
3.90k
                    cf_name.c_str());
1475
9.71k
      } else {
1476
        // ENG-1401: We trigger compaction logic when num files exceeds
1477
        // level0_file_num_compaction_trigger. However, we only want to compact based on
1478
        // files being of comparable sizes. This is already checked in the block above.
1479
        // Previously, if we didn't find any such candidates, then we were falling down
1480
        // into the block below to compact files without regards to their relative sizes,
1481
        // if the number of files is greater than level0_file_num_compaction_trigger.
1482
        // This was causing a lot of read/write amplification.
1483
        //
1484
        // Ideally, we should just remove this block below. For now, putting this
1485
        // under a gflag.
1486
9.71k
        if (FLAGS_aggressive_compaction_for_read_amp) {
1487
          // Size amplification and file size ratios are within configured limits.
1488
          // If max read amplification is exceeding configured limits, then force
1489
          // compaction without looking at filesize ratios and try to reduce
1490
          // the number of files to fewer than level0_file_num_compaction_trigger.
1491
          // This is guaranteed by NeedsCompaction()
1492
0
          assert(sorted_runs.size() >=
1493
0
                static_cast<size_t>(
1494
0
                    mutable_cf_options.level0_file_num_compaction_trigger));
1495
0
          unsigned int num_files =
1496
0
          static_cast<unsigned int>(sorted_runs.size()) -
1497
0
            mutable_cf_options.level0_file_num_compaction_trigger;
1498
0
          if ((c = PickCompactionUniversalReadAmp(
1499
0
                      cf_name, mutable_cf_options, vstorage, score, UINT_MAX, num_files,
1500
0
                      ioptions_.compaction_options_universal.always_include_size_threshold,
1501
0
                      sorted_runs, log_buffer)) != nullptr) {
1502
0
            LOG_TO_BUFFER(log_buffer,
1503
0
                          "[%s] Universal: compacting for file num -- %u\n",
1504
0
                          cf_name.c_str(), num_files);
1505
0
          }
1506
0
        }
1507
9.71k
      }
1508
13.6k
    }
1509
14.1k
  }
1510
14.1k
  if (c == nullptr) {
1511
9.71k
    return nullptr;
1512
9.71k
  }
1513
1514
4.43k
  if (ioptions_.compaction_options_universal.allow_trivial_move == true) {
1515
603
    c->set_is_trivial_move(IsInputNonOverlapping(c.get()));
1516
603
  }
1517
1518
// Validate that all the chosen files of L0 are non-overlapping in time.
1519
4.43k
#ifndef NDEBUG
1520
4.43k
  SequenceNumber prev_smallest_seqno = 0U;
1521
4.43k
  bool is_first = true;
1522
1523
4.43k
  size_t level_index = 0U;
1524
4.43k
  if (c->start_level() == 0) {
1525
13.0k
    for (auto f : *c->inputs(0)) {
1526
13.0k
      DCHECK_LE(f->smallest.seqno, f->largest.seqno);
1527
13.0k
      if (is_first) {
1528
4.43k
        is_first = false;
1529
8.64k
      } else {
1530
8.64k
        DCHECK_GT(prev_smallest_seqno, f->largest.seqno);
1531
8.64k
      }
1532
13.0k
      prev_smallest_seqno = f->smallest.seqno;
1533
13.0k
    }
1534
4.43k
    level_index = 1U;
1535
4.43k
  }
1536
21.0k
  for (; level_index < c->num_input_levels(); level_index++) {
1537
16.6k
    if (c->num_input_files(level_index) != 0) {
1538
2.27k
      SequenceNumber smallest_seqno = 0U;
1539
2.27k
      SequenceNumber largest_seqno = 0U;
1540
2.27k
      GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
1541
2.27k
                              &largest_seqno);
1542
2.27k
      if (is_first) {
1543
2
        is_first = false;
1544
2.27k
      } else if (prev_smallest_seqno > 0) {
1545
        // A level is considered as the bottommost level if there are
1546
        // no files in higher levels or if files in higher levels do
1547
        // not overlap with the files being compacted. Sequence numbers
1548
        // of files in bottommost level can be set to 0 to help
1549
        // compression. As a result, the following assert may not hold
1550
        // if the prev_smallest_seqno is 0.
1551
2.24k
        assert(prev_smallest_seqno > largest_seqno);
1552
2.24k
      }
1553
2.27k
      prev_smallest_seqno = smallest_seqno;
1554
2.27k
    }
1555
16.6k
  }
1556
4.43k
#endif
1557
  // update statistics
1558
4.43k
  MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
1559
4.43k
              c->inputs(0)->size());
1560
1561
4.43k
  level0_compactions_in_progress_.insert(c.get());
1562
1563
4.43k
  return c;
1564
4.43k
}
1565
1566
uint32_t UniversalCompactionPicker::GetPathId(
1567
4.43k
    const ImmutableCFOptions& ioptions, uint64_t file_size) {
1568
  // Two conditions need to be satisfied:
1569
  // (1) the target path needs to be able to hold the file's size
1570
  // (2) Total size left in this and previous paths need to be not
1571
  //     smaller than expected future file size before this new file is
1572
  //     compacted, which is estimated based on size_ratio.
1573
  // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
1574
  // we will make sure the target file, probably with size of 16, will be
1575
  // placed in a path so that eventually when new files are generated and
1576
  // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
1577
  // before the path we chose.
1578
  //
1579
  // TODO(sdong): now the case of multiple column families is not
1580
  // considered in this algorithm. So the target size can be violated in
1581
  // that case. We need to improve it.
1582
4.43k
  uint64_t accumulated_size = 0;
1583
4.43k
  uint64_t future_size = file_size *
1584
4.43k
    (100 - ioptions.compaction_options_universal.size_ratio) / 100;
1585
4.43k
  uint32_t p = 0;
1586
4.43k
  assert(!ioptions.db_paths.empty());
1587
4.51k
  for (; p < ioptions.db_paths.size() - 1; p++) {
1588
126
    uint64_t target_size = ioptions.db_paths[p].target_size;
1589
126
    if (target_size > file_size &&
1590
78
        accumulated_size + (target_size - file_size) > future_size) {
1591
48
      return p;
1592
48
    }
1593
78
    accumulated_size += target_size;
1594
78
  }
1595
4.38k
  return p;
1596
4.43k
}
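A rough standalone illustration of the rule described above, with db_paths reduced to a vector of target sizes; PickPathForFile is a hypothetical helper, not the real GetPathId.

#include <cstdint>
#include <vector>

// Minimal sketch: choose the first path whose target size can hold the file and
// whose spare capacity, together with the capacity of earlier paths, covers the
// estimated future file size; otherwise fall through to the last path.
uint32_t PickPathForFile(const std::vector<uint64_t>& path_target_sizes,
                         uint64_t file_size, unsigned int size_ratio) {
  const uint64_t future_size = file_size * (100 - size_ratio) / 100;
  uint64_t accumulated_size = 0;
  uint32_t p = 0;
  for (; p + 1 < path_target_sizes.size(); ++p) {
    const uint64_t target_size = path_target_sizes[p];
    if (target_size > file_size &&
        accumulated_size + (target_size - file_size) > future_size) {
      return p;
    }
    accumulated_size += target_size;
  }
  return p;
}

For example, with target sizes {20, 100}, a 16-unit compaction output, and size_ratio 0 (so the estimated future size is 16), the first path can hold the file but leaves only 4 units of headroom, so the output is placed on the second path.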
1597
1598
//
1599
// Consider compaction files based on their size differences with
1600
// the next file in time order.
1601
//
1602
std::unique_ptr<Compaction> UniversalCompactionPicker::PickCompactionUniversalReadAmp(
1603
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1604
    VersionStorageInfo* vstorage, double score, unsigned int ratio,
1605
    unsigned int max_number_of_files_to_compact, size_t always_include_size_threshold,
1606
13.6k
    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
1607
13.6k
  unsigned int min_merge_width =
1608
13.6k
    ioptions_.compaction_options_universal.min_merge_width;
1609
13.6k
  unsigned int max_merge_width =
1610
13.6k
    ioptions_.compaction_options_universal.max_merge_width;
1611
1612
13.6k
  const SortedRun* sr = nullptr;
1613
13.6k
  bool done = false;
1614
13.6k
  size_t start_index = 0;
1615
13.6k
  unsigned int candidate_count = 0;
1616
1617
13.6k
  unsigned int max_files_to_compact = std::min(max_merge_width,
1618
13.6k
                                       max_number_of_files_to_compact);
1619
13.6k
  min_merge_width = std::max(min_merge_width, 2U);
1620
1621
  // Caller checks the size before executing this function. This invariant is
1622
  // important because otherwise we may have a possible integer underflow when
1623
  // dealing with unsigned types.
1624
13.6k
  assert(sorted_runs.size() > 0);
1625
1626
  // Considers a candidate file only if it is smaller than the
1627
  // total size accumulated so far.
1628
33.5k
  for (size_t loop = 0; loop < sorted_runs.size(); loop++) {
1629
23.8k
    candidate_count = 0;
1630
1631
    // Skip files that are already being compacted
1632
50.8k
    for (sr = nullptr; loop < sorted_runs.size(); loop++) {
1633
48.4k
      sr = &sorted_runs[loop];
1634
1635
48.4k
      if (!sr->being_compacted) {
1636
21.4k
        candidate_count = 1;
1637
21.4k
        break;
1638
21.4k
      }
1639
27.0k
      char file_num_buf[kFormatFileNumberBufSize];
1640
27.0k
      sr->Dump(file_num_buf, sizeof(file_num_buf));
1641
27.0k
      LOG_TO_BUFFER(log_buffer,
1642
27.0k
                  "[%s] Universal: %s"
1643
27.0k
                  "[%d] being compacted, skipping",
1644
27.0k
                  cf_name.c_str(), file_num_buf, loop);
1645
1646
27.0k
      sr = nullptr;
1647
27.0k
    }
1648
1649
    // This file is not being compacted. Consider it as the
1650
    // first candidate to be compacted.
1651
21.4k
    uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
1652
23.8k
    if (sr != nullptr) {
1653
21.4k
      char file_num_buf[kFormatFileNumberBufSize];
1654
21.4k
      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
1655
21.4k
      RDEBUG(ioptions_.info_log, "[%s] Universal: Possible candidate %s[%d].",
1656
21.4k
             cf_name.c_str(), file_num_buf, loop);
1657
21.4k
    }
1658
1659
    // Check if the succeeding files need compaction.
1660
23.8k
    for (size_t i = loop + 1;
1661
33.4k
         candidate_count < max_files_to_compact && i < sorted_runs.size();
1662
23.0k
         i++) {
1663
23.0k
      const SortedRun* succeeding_sr = &sorted_runs[i];
1664
23.0k
      if (succeeding_sr->being_compacted) {
1665
1.84k
        break;
1666
1.84k
      }
1667
      // Pick files if the total/last candidate file size (increased by the specified ratio) is
1668
      // still larger than the next candidate file or if the next candidate file has size no more
1669
      // than always_include_size_threshold.
1670
      // candidate_size is the total size of files picked so far with the default
1671
      // kCompactionStopStyleTotalSize;
1672
      // with kCompactionStopStyleSimilarSize, it's simply the size of the last picked file.
1673
21.1k
      const bool is_include_by_threshold = succeeding_sr->size <= always_include_size_threshold;
1674
21.1k
      double sz = candidate_size * (100.0 + ratio) / 100.0;
1675
21.1k
      if (sz < static_cast<double>(succeeding_sr->size) && !is_include_by_threshold) {
1676
11.5k
        break;
1677
11.5k
      }
1678
9.67k
      if (ioptions_.compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) {
1679
18
        if (!is_include_by_threshold) {
1680
          // Similar-size stopping rule: also check the last picked file isn't
1681
          // far larger than the next candidate file.
1682
18
          sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
1683
18
          if (sz < static_cast<double>(candidate_size)) {
1684
            // If the small file we've encountered begins a run of similar-size
1685
            // files, we'll pick them up on a future iteration of the outer
1686
            // loop. If it's some lonely straggler, it'll eventually get picked
1687
            // by the last-resort read amp strategy which disregards size ratios.
1688
12
            break;
1689
12
          }
1690
6
          candidate_size = succeeding_sr->compensated_file_size;
1691
6
        }
1692
9.65k
      } else {  // default kCompactionStopStyleTotalSize
1693
9.65k
        candidate_size += succeeding_sr->compensated_file_size;
1694
9.65k
      }
1695
9.66k
      candidate_count++;
1696
9.66k
    }
1697
1698
    // Found a series of consecutive files that need compaction.
1699
23.8k
    if (candidate_count >= (unsigned int)min_merge_width) {
1700
3.90k
      start_index = loop;
1701
3.90k
      done = true;
1702
3.90k
      break;
1703
19.9k
    } else {
1704
19.9k
#ifndef NDEBUG
1705
19.9k
      for (size_t i = loop;
1706
37.8k
           i < loop + candidate_count && i < sorted_runs.size(); i++) {
1707
17.9k
        const SortedRun* skipping_sr = &sorted_runs[i];
1708
17.9k
        char file_num_buf[kFormatFileSizeInfoBufSize];
1709
17.9k
        skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
1710
17.9k
        RDEBUG(ioptions_.info_log, "[%s] Universal: Skipping %s", cf_name.c_str(), file_num_buf);
1711
17.9k
      }
1712
19.9k
#endif
1713
19.9k
    }
1714
23.8k
  }
1715
13.6k
  if (!done || candidate_count <= 1) {
1716
9.71k
    return nullptr;
1717
9.71k
  }
1718
3.90k
  size_t first_index_after = start_index + candidate_count;
1719
  // Compression is enabled if files compacted earlier already reached
1720
  // size ratio of compression.
1721
3.90k
  bool enable_compression = true;
1722
3.90k
  int ratio_to_compress =
1723
3.90k
      ioptions_.compaction_options_universal.compression_size_percent;
1724
3.90k
  if (ratio_to_compress >= 0) {
1725
90
    uint64_t total_size = 0;
1726
306
    for (auto& sorted_run : sorted_runs) {
1727
306
      total_size += sorted_run.compensated_file_size;
1728
306
    }
1729
1730
90
    uint64_t older_file_size = 0;
1731
138
    for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) {
1732
60
      older_file_size += sorted_runs[i].size;
1733
60
      if (older_file_size * 100L >= total_size * static_cast<int64_t>(ratio_to_compress)) {
1734
12
        enable_compression = false;
1735
12
        break;
1736
12
      }
1737
60
    }
1738
90
  }
1739
1740
3.90k
  uint64_t estimated_total_size = 0;
1741
17.5k
  for (unsigned int i = 0; i < first_index_after; i++) {
1742
13.6k
    estimated_total_size += sorted_runs[i].size;
1743
13.6k
  }
1744
3.90k
  uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
1745
3.90k
  int start_level = sorted_runs[start_index].level;
1746
3.90k
  int output_level;
1747
3.90k
  if (first_index_after == sorted_runs.size()) {
1748
672
    output_level = vstorage->num_levels() - 1;
1749
3.23k
  } else if (sorted_runs[first_index_after].level == 0) {
1750
1.44k
    output_level = 0;
1751
1.78k
  } else {
1752
1.78k
    output_level = sorted_runs[first_index_after].level - 1;
1753
1.78k
  }
1754
1755
3.90k
  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
1756
22.3k
  for (size_t i = 0; i < inputs.size(); ++i) {
1757
18.3k
    inputs[i].level = start_level + static_cast<int>(i);
1758
18.3k
  }
1759
17.0k
  for (size_t i = start_index; i < first_index_after; i++) {
1760
13.1k
    auto& picking_sr = sorted_runs[i];
1761
13.1k
    if (picking_sr.level == 0) {
1762
11.2k
      FileMetaData* picking_file = picking_sr.file;
1763
11.2k
      inputs[0].files.push_back(picking_file);
1764
1.89k
    } else {
1765
1.89k
      auto& files = inputs[picking_sr.level - start_level].files;
1766
2.51k
      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
1767
2.51k
        files.push_back(f);
1768
2.51k
      }
1769
1.89k
    }
1770
13.1k
    char file_num_buf[kFormatFileSizeInfoBufSize];
1771
13.1k
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
1772
13.1k
    LOG_TO_BUFFER(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(),
1773
13.1k
                file_num_buf);
1774
13.1k
  }
1775
1776
3.90k
  CompactionReason compaction_reason;
1777
3.90k
  if (max_number_of_files_to_compact == UINT_MAX) {
1778
3.90k
    compaction_reason = CompactionReason::kUniversalSortedRunNum;
1779
0
  } else {
1780
0
    compaction_reason = CompactionReason::kUniversalSizeRatio;
1781
0
  }
1782
3.90k
  return Compaction::Create(
1783
3.90k
      vstorage, mutable_cf_options, std::move(inputs), output_level,
1784
3.90k
      mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
1785
3.90k
      GetCompressionType(ioptions_, start_level, 1, enable_compression),
1786
3.90k
      /* grandparents = */ std::vector<FileMetaData*>(), ioptions_.info_log,
1787
3.90k
      /* is_manual = */ false, score,
1788
3.90k
      /* deletion_compaction = */ false, compaction_reason);
1789
3.90k
}
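The core size-ratio rule above keeps absorbing the next sorted run while the accumulated candidate size, inflated by size_ratio percent, still covers it. A minimal sketch under the default total-size stop style, ignoring being_compacted, always_include_size_threshold, and the similar-size variant; CountCandidateRuns is a hypothetical helper and assumes start_index is a valid index.

#include <cstdint>
#include <vector>

// Minimal sketch: starting at start_index, count how many consecutive runs would
// be grouped into one compaction by the size-ratio rule (total-size stop style).
size_t CountCandidateRuns(const std::vector<uint64_t>& run_sizes,
                          size_t start_index, unsigned int size_ratio,
                          size_t max_files_to_compact) {
  uint64_t candidate_size = run_sizes[start_index];
  size_t candidate_count = 1;
  for (size_t i = start_index + 1;
       candidate_count < max_files_to_compact && i < run_sizes.size(); ++i) {
    const double allowed = candidate_size * (100.0 + size_ratio) / 100.0;
    if (allowed < static_cast<double>(run_sizes[i])) {
      break;  // next run is too large relative to what has been picked so far
    }
    candidate_size += run_sizes[i];
    ++candidate_count;
  }
  return candidate_count;
}

With run sizes {1, 1, 2, 4, 16}, start_index 0, and size_ratio 1, the first four runs are grouped (the accumulated size grows 1, 2, 4, 8), while the 16-unit run stops the scan because 8 * 1.01 < 16.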
1790
1791
// Look to see if all files within the run are marked for deletion.
1792
// If so, we can run a low-cost compaction that just deletes those files.
1793
std::unique_ptr<Compaction> UniversalCompactionPicker::PickCompactionUniversalDeletion(
1794
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1795
    VersionStorageInfo* vstorage, double score,
1796
0
    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
1797
1798
  // Universal deletion compaction is currently only compatible with Level-0
1799
  // universal compactions.
1800
0
  if (vstorage->num_levels() > 1) {
1801
0
    RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
1802
0
        "[%s] Unexpected number of storage levels during universal deletion "
1803
0
        "(expected 1, was %d).",
1804
0
        cf_name.c_str(), vstorage->num_levels());
1805
0
    return nullptr;
1806
0
  }
1807
0
  std::vector<CompactionInputFiles> inputs(1);
1808
0
  auto& input_files = inputs[0];
1809
1810
  // Check each file to see if it is marked for deletion.
1811
0
  for (size_t loop = 0; loop < sorted_runs.size(); loop++) {
1812
0
    const auto sr = &sorted_runs[loop];
1813
1814
0
    if (!sr->being_compacted && sr->delete_after_compaction()) {
1815
0
      FileMetaData* f = sr->file;
1816
0
      input_files.files.push_back(f);
1817
1818
0
      char file_num_buf[kFormatFileSizeInfoBufSize];
1819
0
      sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
1820
0
      LOG_TO_BUFFER(log_buffer, "[%s] Universal: file deletion picking %s",
1821
0
                  cf_name.c_str(), file_num_buf);
1822
0
    } else {
1823
0
      RDEBUG(ioptions_.info_log, "[%s] Universal: skipping %s[%d] compacted %s\n",
1824
0
          cf_name.c_str(), sr->FileNumAsString().c_str(), loop,
1825
0
          sr->delete_after_compaction() ?
1826
0
              "is not marked for deletion." : "is already being compacted.");
1827
0
    }
1828
0
  }
1829
  // If no files are marked for deletion, return nullptr.
1830
0
  if (input_files.size() == 0) {
1831
0
    return nullptr; // no candidate files
1832
0
  }
1833
1834
0
  return Compaction::Create(
1835
0
      vstorage,
1836
0
      mutable_cf_options,
1837
0
      std::move(inputs),
1838
0
      /* output level = */ 0,
1839
0
      mutable_cf_options.MaxFileSizeForLevel(0),
1840
      /* max_grandparent_overlap_bytes = */ LLONG_MAX,
1841
0
      /* path_id = */ 0,
1842
0
      GetCompressionType(ioptions_, 0, 1),
1843
0
      /* grandparents = */ {},
1844
0
      ioptions_.info_log,
1845
0
      /* is manual = */ false,
1846
0
      score,
1847
0
      /* deletion_compaction not currently used, but could be in the future */ false,
1848
0
      CompactionReason::kUniversalDirectDeletion);
1849
0
}
1850
1851
// Look at overall size amplification. If size amplification
1852
// exceeds the configured value, then do a compaction
1853
// of the candidate files all the way up to the earliest
1854
// base file (overrides configured values of file-size ratios,
1855
// min_merge_width and max_merge_width).
1856
//
1857
std::unique_ptr<Compaction> UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
1858
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1859
    VersionStorageInfo* vstorage, double score,
1860
14.1k
    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
1861
  // percentage flexibility while reducing size amplification
1862
14.1k
  uint64_t ratio = ioptions_.compaction_options_universal.
1863
14.1k
                     max_size_amplification_percent;
1864
1865
14.1k
  unsigned int candidate_count = 0;
1866
14.1k
  uint64_t candidate_size = 0;
1867
14.1k
  size_t start_index = 0;
1868
14.1k
  const SortedRun* sr = nullptr;
1869
1870
  // Skip files that are already being compacted
1871
36.1k
  for (size_t loop = 0; loop < sorted_runs.size() - 1; loop++) {
1872
30.6k
    sr = &sorted_runs[loop];
1873
30.6k
    if (!sr->being_compacted) {
1874
8.65k
      start_index = loop;         // Consider this as the first candidate.
1875
8.65k
      break;
1876
8.65k
    }
1877
22.0k
    RDEBUG(ioptions_.info_log, "[%s] Universal: Possible candidate %s[%d] is already being "
1878
22.0k
        "compacted. Cannot be a candidate to reduce size amp.\n",
1879
22.0k
        cf_name.c_str(), sr->FileNumAsString().c_str(), loop);
1880
22.0k
    sr = nullptr;
1881
22.0k
  }
1882
1883
14.1k
  if (sr == nullptr) {
1884
5.49k
    return nullptr;             // no candidate files
1885
5.49k
  }
1886
8.65k
  {
1887
8.65k
    char file_num_buf[kFormatFileNumberBufSize];
1888
8.65k
    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
1889
8.65k
    RDEBUG(ioptions_.info_log,
1890
8.65k
           "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
1891
8.65k
           cf_name.c_str(), file_num_buf, start_index,
1892
8.65k
           " to reduce size amp.\n");
1893
8.65k
  }
1894
1895
  // keep adding up all the remaining files
1896
33.0k
  for (size_t loop = start_index; loop < sorted_runs.size() - 1; loop++) {
1897
25.9k
    sr = &sorted_runs[loop];
1898
25.9k
    if (sr->being_compacted) {
1899
1.49k
      RDEBUG(ioptions_.info_log, "[%s] Universal: Possible candidate %s[%d] is already being "
1900
1.49k
          "compacted. No size amp reduction possible.\n",
1901
1.49k
          cf_name.c_str(), sr->FileNumAsString().c_str(), loop);
1902
1.49k
      return nullptr;
1903
1.49k
    }
1904
24.4k
    candidate_size += sr->compensated_file_size;
1905
24.4k
    candidate_count++;
1906
24.4k
  }
1907
7.15k
  if (candidate_count == 0) {
1908
0
    return nullptr;
1909
0
  }
1910
1911
  // Check that the earliest file is not already being compacted.
1912
7.15k
  sr = &sorted_runs.back();
1913
7.15k
  if (sr->being_compacted) {
1914
1
    RDEBUG(ioptions_.info_log, "[%s] Universal: Possible candidate %s[%d] is already being "
1915
1
          "compacted. No size amp reduction possible.\n",
1916
1
          cf_name.c_str(), sr->FileNumAsString().c_str(), sorted_runs.size() - 1);
1917
1
    return nullptr;
1918
1
  }
1919
  // size of earliest file
1920
7.15k
  uint64_t earliest_file_size = sr->size;
1921
1922
  // size amplification = percentage of additional size
1923
7.15k
  if (candidate_size * 100 < ratio * earliest_file_size) {
1924
6.62k
    RDEBUG(ioptions_.info_log,
1925
6.62k
           "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
1926
6.62k
           " earliest-file-size %" PRIu64,
1927
6.62k
           cf_name.c_str(), candidate_size, earliest_file_size);
1928
6.62k
    return nullptr;
1929
526
  } else {
1930
526
    RDEBUG(ioptions_.info_log,
1931
526
           "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
1932
526
           " earliest-file-size %" PRIu64,
1933
526
           cf_name.c_str(), candidate_size, earliest_file_size);
1934
526
  }
1935
526
  assert(start_index < sorted_runs.size() - 1);
1936
1937
  // Estimate total file size
1938
526
  uint64_t estimated_total_size = 0;
1939
2.73k
  for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
1940
2.21k
    estimated_total_size += sorted_runs[loop].size;
1941
2.21k
  }
1942
526
  uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
1943
526
  int start_level = sorted_runs[start_index].level;
1944
1945
526
  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
1946
3.17k
  for (size_t i = 0; i < inputs.size(); ++i) {
1947
2.64k
    inputs[i].level = start_level + static_cast<int>(i);
1948
2.64k
  }
1949
  // We always compact all the files, so always compress.
1950
2.73k
  for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
1951
2.21k
    auto& picking_sr = sorted_runs[loop];
1952
2.21k
    if (picking_sr.level == 0) {
1953
1.82k
      FileMetaData* f = picking_sr.file;
1954
1.82k
      inputs[0].files.push_back(f);
1955
383
    } else {
1956
383
      auto& files = inputs[picking_sr.level - start_level].files;
1957
2.80k
      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
1958
2.80k
        files.push_back(f);
1959
2.80k
      }
1960
383
    }
1961
2.21k
    char file_num_buf[kFormatFileSizeInfoBufSize];
1962
2.21k
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
1963
2.21k
    LOG_TO_BUFFER(log_buffer, "[%s] Universal: size amp picking %s",
1964
2.21k
                cf_name.c_str(), file_num_buf);
1965
2.21k
  }
1966
1967
526
  return Compaction::Create(
1968
526
      vstorage, mutable_cf_options, std::move(inputs), vstorage->num_levels() - 1,
1969
526
      mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
1970
      /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
1971
526
      GetCompressionType(ioptions_, vstorage->num_levels() - 1, 1),
1972
526
      /* grandparents */ std::vector<FileMetaData*>(), ioptions_.info_log, /* is manual = */ false,
1973
526
      score, false /* deletion_compaction */, CompactionReason::kUniversalSizeAmplification);
1974
7.15k
}
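The trigger itself reduces to a single comparison: the full compaction runs only when the newer sorted runs add up to at least max_size_amplification_percent of the earliest (oldest) run. A minimal sketch of that check; SizeAmpCompactionNeeded is a hypothetical helper used only for illustration.

#include <cstdint>

// Minimal sketch of the size-amplification trigger: newer_files_total_size is
// the candidate_size accumulated above, earliest_file_size is the size of the
// oldest sorted run, and ratio is max_size_amplification_percent.
bool SizeAmpCompactionNeeded(uint64_t newer_files_total_size,
                             uint64_t earliest_file_size, uint64_t ratio) {
  return newer_files_total_size * 100 >= ratio * earliest_file_size;
}

For example, with ratio = 200 and an earliest run of 100 units, a size-amp compaction is considered only once the newer runs total at least 200 units.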
1975
1976
bool FIFOCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage)
1977
549
    const {
1978
549
  const int kLevel0 = 0;
1979
549
  return vstorage->CompactionScore(kLevel0) >= 1;
1980
549
}
1981
1982
std::unique_ptr<Compaction> FIFOCompactionPicker::PickCompaction(
1983
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1984
27
    VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
1985
27
  assert(vstorage->num_levels() == 1);
1986
27
  const int kLevel0 = 0;
1987
27
  const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
1988
27
  uint64_t total_size = 0;
1989
94
  for (const auto& file : level_files) {
1990
94
    total_size += file->fd.total_file_size;
1991
94
  }
1992
1993
27
  if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size ||
1994
20
      level_files.size() == 0) {
1995
    // total size not exceeded
1996
7
    RDEBUG(ioptions_.info_log,
1997
7
           "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
1998
7
           ", max size %" PRIu64 "\n",
1999
7
           cf_name.c_str(), total_size,
2000
7
           ioptions_.compaction_options_fifo.max_table_files_size);
2001
7
    return nullptr;
2002
7
  }
2003
2004
20
  if (!level0_compactions_in_progress_.empty()) {
2005
8
    RDEBUG(ioptions_.info_log,
2006
8
           "[%s] FIFO compaction: Already executing compaction. No need "
2007
8
           "to run parallel compactions since compactions are very fast",
2008
8
           cf_name.c_str());
2009
8
    return nullptr;
2010
8
  }
2011
2012
12
  std::vector<CompactionInputFiles> inputs;
2013
12
  inputs.emplace_back();
2014
12
  inputs[0].level = 0;
2015
  // delete old files (FIFO)
2016
16
  for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
2017
16
    auto f = *ritr;
2018
16
    total_size -= f->compensated_file_size;
2019
16
    inputs[0].files.push_back(f);
2020
16
    char tmp_fsize[16];
2021
16
    AppendHumanBytes(f->fd.GetTotalFileSize(), tmp_fsize, sizeof(tmp_fsize));
2022
16
    LOG_TO_BUFFER(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
2023
16
                            " with size %s for deletion",
2024
16
                cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
2025
16
    if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) {
2026
12
      break;
2027
12
    }
2028
16
  }
2029
12
  auto c = Compaction::Create(
2030
12
      vstorage, mutable_cf_options, std::move(inputs), 0 /* output_level */,
2031
12
      0 /* target_file_size */, 0 /* max_grandparent_overlap_bytes */, 0 /* output_path_id */,
2032
12
      kNoCompression, std::vector<FileMetaData*>(), ioptions_.info_log, /* is manual */ false,
2033
12
      vstorage->CompactionScore(0),
2034
12
      /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
2035
12
  if (c) {
2036
12
    level0_compactions_in_progress_.insert(c.get());
2037
12
  }
2038
12
  return c;
2039
12
}
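The eviction rule above drops the oldest files until the level's total size is back under max_table_files_size. A minimal sketch, assuming the file sizes are passed oldest first and a single size per file; CountFilesToEvict is a hypothetical helper, not the picker itself.

#include <cstdint>
#include <vector>

// Minimal sketch: returns how many of the oldest files the FIFO rule would pick
// for deletion so that the remaining total fits under max_table_files_size.
size_t CountFilesToEvict(const std::vector<uint64_t>& sizes_oldest_first,
                         uint64_t max_table_files_size) {
  uint64_t total_size = 0;
  for (uint64_t size : sizes_oldest_first) {
    total_size += size;
  }
  size_t evicted = 0;
  for (size_t i = 0;
       i < sizes_oldest_first.size() && total_size > max_table_files_size; ++i) {
    total_size -= sizes_oldest_first[i];
    ++evicted;
  }
  return evicted;
}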
2040
2041
std::unique_ptr<Compaction> FIFOCompactionPicker::CompactRange(
2042
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
2043
    VersionStorageInfo* vstorage, int input_level, int output_level,
2044
    uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
2045
11
    InternalKey** compaction_end, bool* manual_conflict) {
2046
11
  assert(input_level == 0);
2047
11
  assert(output_level == 0);
2048
11
  *compaction_end = nullptr;
2049
11
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
2050
11
  auto c = PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer);
2051
11
  log_buffer.FlushBufferToLog();
2052
11
  return c;
2053
11
}
2054
2055
#endif  // !ROCKSDB_LITE
2056
2057
}  // namespace rocksdb