YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/compaction_picker.cc
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
//
14
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
15
//  This source code is licensed under the BSD-style license found in the
16
//  LICENSE file in the root directory of this source tree. An additional grant
17
//  of patent rights can be found in the PATENTS file in the same directory.
18
//
19
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
20
// Use of this source code is governed by a BSD-style license that can be
21
// found in the LICENSE file. See the AUTHORS file for names of contributors.
22
23
#include "yb/rocksdb/db/compaction_picker.h"
24
#include "yb/rocksdb/immutable_options.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
32
#include <limits>
33
#include <queue>
34
#include <string>
35
#include <utility>
36
37
38
#include "yb/rocksdb/compaction_filter.h"
39
#include "yb/rocksdb/db/column_family.h"
40
#include "yb/rocksdb/db/filename.h"
41
#include "yb/rocksdb/db/version_set.h"
42
#include "yb/rocksdb/util/log_buffer.h"
43
#include "yb/rocksdb/util/logging.h"
44
#include "yb/rocksdb/util/random.h"
45
#include "yb/rocksdb/util/statistics.h"
46
#include "yb/util/string_util.h"
47
#include "yb/rocksdb/util/sync_point.h"
48
49
#include "yb/util/logging.h"
50
#include <glog/logging.h>
51
52
DEFINE_bool(aggressive_compaction_for_read_amp, false,
53
            "Determines if we should compact aggressively to reduce read amplification based on "
54
            "number of files alone, without regards to relative sizes of the SSTable files.");
55
56
namespace rocksdb {
57
58
namespace {
59
16.3k
uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
60
16.3k
  uint64_t sum = 0;
61
55.8k
  for (size_t i = 0; i < files.size() && files[i]; i++) {
62
39.4k
    sum += files[i]->compensated_file_size;
63
39.4k
  }
64
16.3k
  return sum;
65
16.3k
}
66
67
// Universal compaction is not supported in ROCKSDB_LITE
68
#ifndef ROCKSDB_LITE
69
70
// Used in universal compaction when trivial move is enabled.
71
// This structure is used for the construction of min heap
72
// that contains the file meta data, the level of the file
73
// and the index of the file in that level
74
75
struct InputFileInfo {
76
2.41k
  InputFileInfo() : f(nullptr) {}
77
78
  FileMetaData* f;
79
  size_t level;
80
  size_t index;
81
};
82
83
// Used in universal compaction when trivial move is enabled.
84
// This comparator is used for the construction of min heap
85
// based on the smallest key of the file.
86
struct UserKeyComparator {
87
604
  explicit UserKeyComparator(const Comparator* ucmp) { ucmp_ = ucmp; }
88
89
11.5k
  bool operator()(InputFileInfo i1, InputFileInfo i2) const {
90
11.5k
    return (ucmp_->Compare(i1.f->smallest.key.user_key(),
91
11.5k
                           i2.f->smallest.key.user_key()) > 0);
92
11.5k
  }
93
94
 private:
95
  const Comparator* ucmp_;
96
};
97
98
typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
99
                            UserKeyComparator> SmallestKeyHeap;
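
Note: the comparator above returns "greater than" so that std::priority_queue, which is a max-heap by default, pops the file with the smallest user key first. A minimal standalone sketch of the same idea (plain strings instead of FileMetaData; SimpleFile is a hypothetical stand-in, not a RocksDB type):

#include <iostream>
#include <queue>
#include <string>
#include <vector>

struct SimpleFile {            // hypothetical stand-in for FileMetaData
  std::string smallest_key;
  std::string largest_key;
};

struct BySmallestKey {
  // Returning "greater" turns the max-heap priority_queue into a min-heap.
  bool operator()(const SimpleFile& a, const SimpleFile& b) const {
    return a.smallest_key > b.smallest_key;
  }
};

int main() {
  std::priority_queue<SimpleFile, std::vector<SimpleFile>, BySmallestKey> heap;
  heap.push({"k30", "k40"});
  heap.push({"k10", "k15"});
  heap.push({"k20", "k25"});
  while (!heap.empty()) {
    std::cout << heap.top().smallest_key << "\n";  // prints k10, k20, k30
    heap.pop();
  }
  return 0;
}
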
100
constexpr auto kFormatFileSizeInfoBufSize = 256;
101
102
// This function creates the heap that is used to find if the files are
103
// overlapping during universal compaction when the allow_trivial_move
104
// is set.
105
604
SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
106
604
  SmallestKeyHeap smallest_key_priority_q =
107
604
      SmallestKeyHeap(UserKeyComparator(ucmp));
108
109
604
  InputFileInfo input_file;
110
111
3.73k
  for (size_t l = 0; l < c->num_input_levels(); l++) {
112
3.13k
    if (c->num_input_files(l) != 0) {
113
800
      if (l == 0 && c->start_level() == 0) {
114
2.55k
        for (size_t i = 0; i < c->num_input_files(0); i++) {
115
1.94k
          input_file.f = c->input(0, i);
116
1.94k
          input_file.level = 0;
117
1.94k
          input_file.index = i;
118
1.94k
          smallest_key_priority_q.push(std::move(input_file));
119
1.94k
        }
120
604
      } else {
121
196
        input_file.f = c->input(l, 0);
122
196
        input_file.level = l;
123
196
        input_file.index = 0;
124
196
        smallest_key_priority_q.push(std::move(input_file));
125
196
      }
126
800
    }
127
3.13k
  }
128
604
  return smallest_key_priority_q;
129
604
}
130
#endif  // !ROCKSDB_LITE
131
}  // anonymous namespace
132
133
// Determine compression type, based on user options, level of the output
134
// file and whether compression is disabled.
135
// If enable_compression is false, then compression is always disabled no
136
// matter what the values of the other two parameters are.
137
// Otherwise, the compression type is determined based on options and level.
138
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
139
                                   int level, int base_level,
140
36.0k
                                   const bool enable_compression) {
141
36.0k
  if (!enable_compression) {
142
    // disable compression
143
12
    return kNoCompression;
144
12
  }
145
  // If the use has specified a different compression level for each level,
146
  // then pick the compression for that level.
147
35.9k
  if (!ioptions.compression_per_level.empty()) {
148
1.61k
    assert(level == 0 || level >= base_level);
149
1.61k
    int idx = (level == 0) ? 0 : level - base_level + 1;
150
151
1.61k
    const int n = static_cast<int>(ioptions.compression_per_level.size()) - 1;
152
    // It is possible for level_ to be -1; in that case, we use level
153
    // 0's compression.  This occurs mostly in backwards compatibility
154
    // situations when the builder doesn't know what level the file
155
    // belongs to.  Likewise, if level is beyond the end of the
156
    // specified compression levels, use the last value.
157
1.61k
    return ioptions.compression_per_level[std::max(0, std::min(idx, n))];
158
34.3k
  } else {
159
34.3k
    return ioptions.compression;
160
34.3k
  }
161
35.9k
}
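
A rough standalone illustration of the index clamping above (level 0 maps to index 0, higher levels to level - base_level + 1, clamped into the configured vector). The types here are simplified stand-ins, not the real ImmutableCFOptions:

#include <algorithm>
#include <cassert>
#include <iostream>
#include <string>
#include <vector>

std::string PickCompression(const std::vector<std::string>& per_level,
                            const std::string& default_compression,
                            int level, int base_level, bool enable_compression) {
  if (!enable_compression) return "none";
  if (per_level.empty()) return default_compression;
  assert(level == 0 || level >= base_level);
  // Level 0 uses index 0; other levels are offset by base_level, and the
  // index is clamped to the last configured entry.
  const int idx = (level == 0) ? 0 : level - base_level + 1;
  const int n = static_cast<int>(per_level.size()) - 1;
  return per_level[std::max(0, std::min(idx, n))];
}

int main() {
  const std::vector<std::string> per_level = {"none", "snappy", "zlib"};
  std::cout << PickCompression(per_level, "snappy", 0, 2, true) << "\n";  // none
  std::cout << PickCompression(per_level, "snappy", 5, 2, true) << "\n";  // zlib (clamped)
  return 0;
}
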
162
163
CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
164
                                   const InternalKeyComparator* icmp)
165
442k
    : ioptions_(ioptions), icmp_(icmp) {}
166
167
402k
CompactionPicker::~CompactionPicker() {}
168
169
// Delete this compaction from the list of running compactions.
170
23.7k
void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
171
23.7k
  if (c->start_level() == 0 ||
172
23.7k
      ioptions_.compaction_style == kCompactionStyleUniversal) {
173
13.3k
    level0_compactions_in_progress_.erase(c);
174
13.3k
  }
175
23.7k
  if (!status.ok()) {
176
120
    c->ResetNextCompactionIndex();
177
120
  }
178
23.7k
}
179
180
void CompactionPicker::GetRange(const CompactionInputFiles& inputs,
181
77.1k
                                InternalKey* smallest, InternalKey* largest) {
182
77.1k
  const int level = inputs.level;
183
77.1k
  assert(!inputs.empty());
184
0
  smallest->Clear();
185
77.1k
  largest->Clear();
186
187
77.1k
  if (level == 0) {
188
95.9k
    for (size_t i = 0; i < inputs.size(); i++) {
189
63.4k
      FileMetaData* f = inputs[i];
190
63.4k
      if (i == 0) {
191
32.5k
        *smallest = f->smallest.key;
192
32.5k
        *largest = f->largest.key;
193
32.5k
      } else {
194
30.8k
        if (icmp_->Compare(f->smallest.key, *smallest) < 0) {
195
6.61k
          *smallest = f->smallest.key;
196
6.61k
        }
197
30.8k
        if (icmp_->Compare(f->largest.key, *largest) > 0) {
198
10.8k
          *largest = f->largest.key;
199
10.8k
        }
200
30.8k
      }
201
63.4k
    }
202
44.6k
  } else {
203
44.6k
    *smallest = inputs[0]->smallest.key;
204
44.6k
    *largest = inputs[inputs.size() - 1]->largest.key;
205
44.6k
  }
206
77.1k
}
207
208
void CompactionPicker::GetRange(const CompactionInputFiles& inputs1,
209
                                const CompactionInputFiles& inputs2,
210
24.3k
                                InternalKey* smallest, InternalKey* largest) {
211
24.3k
  assert(!inputs1.empty() || !inputs2.empty());
212
24.3k
  if (inputs1.empty()) {
213
0
    GetRange(inputs2, smallest, largest);
214
24.3k
  } else if (inputs2.empty()) {
215
13.4k
    GetRange(inputs1, smallest, largest);
216
13.4k
  } else {
217
10.8k
    InternalKey smallest1, smallest2, largest1, largest2;
218
10.8k
    GetRange(inputs1, &smallest1, &largest1);
219
10.8k
    GetRange(inputs2, &smallest2, &largest2);
220
10.8k
    *smallest = icmp_->Compare(smallest1, smallest2) < 0 ?
221
6.25k
                smallest1 : smallest2;
222
10.8k
    *largest = icmp_->Compare(largest1, largest2) < 0 ?
223
8.49k
               largest2 : largest1;
224
10.8k
  }
225
24.3k
}
226
227
bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name,
228
                                              VersionStorageInfo* vstorage,
229
18.8k
                                              CompactionInputFiles* inputs) {
230
  // This isn't good compaction
231
18.8k
  assert(!inputs->empty());
232
233
0
  const int level = inputs->level;
234
  // GetOverlappingInputs will always do the right thing for level-0.
235
  // So we don't need to do any expansion if level == 0.
236
18.8k
  if (level == 0) {
237
8.46k
    return true;
238
8.46k
  }
239
240
10.4k
  InternalKey smallest, largest;
241
242
  // Keep expanding inputs until we are sure that there is a "clean cut"
243
  // boundary between the files in input and the surrounding files.
244
  // This will ensure that no parts of a key are lost during compaction.
245
10.4k
  int hint_index = -1;
246
10.4k
  size_t old_size;
247
10.4k
  do {
248
10.4k
    old_size = inputs->size();
249
10.4k
    GetRange(*inputs, &smallest, &largest);
250
10.4k
    inputs->clear();
251
10.4k
    vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files,
252
10.4k
                                   hint_index, &hint_index);
253
10.4k
  } while (inputs->size() > old_size);
254
255
  // we started off with inputs non-empty and the previous loop only grew
256
  // inputs. thus, inputs should be non-empty here
257
10.4k
  assert(!inputs->empty());
258
259
  // If, after the expansion, there are files that are already under
260
  // compaction, then we must drop/cancel this compaction.
261
10.4k
  if (FilesInCompaction(inputs->files)) {
262
0
    RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
263
0
        "[%s] ExpandWhileOverlapping() failure because some of the necessary"
264
0
        " compaction input files are currently being compacted.",
265
0
        cf_name.c_str());
266
0
    return false;
267
0
  }
268
10.4k
  return true;
269
10.4k
}
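
The do/while above keeps re-querying overlapping files until the input set stops growing, which is what guarantees the "clean cut". A self-contained sketch of that fixed-point pattern, with files reduced to integer key ranges (GetOverlapping is a stand-in for GetOverlappingInputs on a single level):

#include <algorithm>
#include <iostream>
#include <vector>

struct Range { int smallest; int largest; };

std::vector<Range> GetOverlapping(const std::vector<Range>& level_files,
                                  int smallest, int largest) {
  std::vector<Range> out;
  for (const Range& f : level_files) {
    if (f.largest >= smallest && f.smallest <= largest) out.push_back(f);
  }
  return out;
}

int main() {
  // 20-35 overlaps 30-45, so starting from {20-35} the inputs must expand.
  const std::vector<Range> level_files = {{0, 10}, {20, 35}, {30, 45}, {60, 70}};
  std::vector<Range> inputs = {{20, 35}};
  size_t old_size;
  do {
    old_size = inputs.size();
    int smallest = inputs.front().smallest, largest = inputs.front().largest;
    for (const Range& r : inputs) {
      smallest = std::min(smallest, r.smallest);
      largest = std::max(largest, r.largest);
    }
    inputs = GetOverlapping(level_files, smallest, largest);
  } while (inputs.size() > old_size);
  for (const Range& r : inputs) std::cout << r.smallest << "-" << r.largest << "\n";
  return 0;  // prints 20-35 and 30-45: the expansion reached a fixed point
}
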
270
271
// Returns true if any one of specified files are being compacted
272
bool CompactionPicker::FilesInCompaction(
273
59.2k
    const std::vector<FileMetaData*>& files) {
274
117k
  for (size_t i = 0; i < files.size(); i++) {
275
59.3k
    if (files[i]->being_compacted) {
276
1.03k
      return true;
277
1.03k
    }
278
59.3k
  }
279
58.2k
  return false;
280
59.2k
}
281
282
std::unique_ptr<Compaction> CompactionPicker::FormCompaction(
283
    const CompactionOptions& compact_options,
284
    const std::vector<CompactionInputFiles>& input_files, int output_level,
285
    VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
286
39
    uint32_t output_path_id) const {
287
39
  uint64_t max_grandparent_overlap_bytes =
288
39
      output_level + 1 < vstorage->num_levels() ?
289
3
          mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) :
290
39
          std::numeric_limits<uint64_t>::max();
291
39
  DCHECK_GT(input_files.size(), 0);
292
39
  return Compaction::Create(
293
39
      vstorage, mutable_cf_options, input_files, output_level,
294
39
      compact_options.output_file_size_limit, max_grandparent_overlap_bytes, output_path_id,
295
39
      compact_options.compression,
296
39
      /* grandparents = */ std::vector<FileMetaData*>(), ioptions_.info_log, true);
297
39
}
298
299
Status CompactionPicker::GetCompactionInputsFromFileNumbers(
300
    std::vector<CompactionInputFiles>* input_files,
301
    std::unordered_set<uint64_t>* input_set,
302
    const VersionStorageInfo* vstorage,
303
39
    const CompactionOptions& compact_options) const {
304
39
  if (input_set->size() == 0U) {
305
0
    return STATUS(InvalidArgument,
306
0
        "Compaction must include at least one file.");
307
0
  }
308
39
  assert(input_files);
309
310
0
  std::vector<CompactionInputFiles> matched_input_files;
311
39
  matched_input_files.resize(vstorage->num_levels());
312
39
  int first_non_empty_level = -1;
313
39
  int last_non_empty_level = -1;
314
  // TODO(yhchiang): use a lazy-initialized mapping from
315
  //                 file_number to FileMetaData in Version.
316
96
  for (int level = 0; level < vstorage->num_levels(); ++level) {
317
161
    for (auto file : vstorage->LevelFiles(level)) {
318
161
      auto iter = input_set->find(file->fd.GetNumber());
319
161
      if (iter != input_set->end()) {
320
107
        matched_input_files[level].files.push_back(file);
321
107
        input_set->erase(iter);
322
107
        last_non_empty_level = level;
323
107
        if (first_non_empty_level == -1) {
324
39
          first_non_empty_level = level;
325
39
        }
326
107
      }
327
161
    }
328
57
  }
329
330
39
  if (!input_set->empty()) {
331
0
    std::string message(
332
0
        "Cannot find matched SST files for the following file numbers:");
333
0
    for (auto fn : *input_set) {
334
0
      message += " ";
335
0
      message += ToString(fn);
336
0
    }
337
0
    return STATUS(InvalidArgument, message);
338
0
  }
339
340
39
  for (int level = first_non_empty_level;
341
78
       level <= last_non_empty_level; ++level) {
342
39
    matched_input_files[level].level = level;
343
39
    input_files->emplace_back(std::move(matched_input_files[level]));
344
39
  }
345
346
39
  return Status::OK();
347
39
}
348
349
350
351
// Returns true if any one of the parent files are being compacted
352
bool CompactionPicker::RangeInCompaction(VersionStorageInfo* vstorage,
353
                                         const InternalKey* smallest,
354
                                         const InternalKey* largest,
355
                                         int level,
356
20.5k
                                         int* level_index) {
357
20.5k
  std::vector<FileMetaData*> inputs;
358
20.5k
  assert(level < NumberLevels());
359
360
0
  vstorage->GetOverlappingInputs(level, smallest, largest, &inputs,
361
20.5k
                                 *level_index, level_index);
362
20.5k
  return FilesInCompaction(inputs);
363
20.5k
}
364
365
// Populates the set of inputs of all other levels that overlap with the
366
// start level.
367
// Now we assume all levels except start level and output level are empty.
368
// Will also attempt to expand "start level" if that doesn't expand
369
// "output level" or cause "level" to include a file for compaction that has an
370
// overlapping user-key with another file.
371
// REQUIRES: input_level and output_level are different
372
// REQUIRES: inputs->empty() == false
373
// Returns false if files on parent level are currently in compaction, which
374
// means that we can't compact them
375
bool CompactionPicker::SetupOtherInputs(
376
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
377
    VersionStorageInfo* vstorage, CompactionInputFiles* inputs,
378
    CompactionInputFiles* output_level_inputs, int* parent_index,
379
18.3k
    int base_index) {
380
18.3k
  assert(!inputs->empty());
381
0
  assert(output_level_inputs->empty());
382
0
  const int input_level = inputs->level;
383
18.3k
  const int output_level = output_level_inputs->level;
384
18.3k
  assert(input_level != output_level);
385
386
  // For now, we only support merging two levels, start level and output level.
387
  // We need to assert other levels are empty.
388
20.0k
  for (int l = input_level + 1; l < output_level; l++) {
389
1.74k
    assert(vstorage->NumLevelFiles(l) == 0);
390
1.74k
  }
391
392
18.3k
  InternalKey smallest, largest;
393
394
  // Get the range one last time.
395
18.3k
  GetRange(*inputs, &smallest, &largest);
396
397
  // Populate the set of next-level files (output_level_inputs) to
398
  // include in compaction
399
18.3k
  vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
400
18.3k
                                 &output_level_inputs->files, *parent_index,
401
18.3k
                                 parent_index);
402
403
18.3k
  if (FilesInCompaction(output_level_inputs->files)) {
404
1
    return false;
405
1
  }
406
407
  // See if we can further grow the number of inputs in "level" without
408
  // changing the number of "level+1" files we pick up. We also choose NOT
409
  // to expand if this would cause "level" to include some entries for some
410
  // user key, while excluding other entries for the same user key. This
411
  // can happen when one user key spans multiple files.
412
18.3k
  if (!output_level_inputs->empty()) {
413
5.44k
    CompactionInputFiles expanded0;
414
5.44k
    expanded0.level = input_level;
415
    // Get entire range covered by compaction
416
5.44k
    InternalKey all_start, all_limit;
417
5.44k
    GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
418
419
5.44k
    vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
420
5.44k
                                   &expanded0.files, base_index, nullptr);
421
5.44k
    const uint64_t inputs0_size = TotalCompensatedFileSize(inputs->files);
422
5.44k
    const uint64_t inputs1_size =
423
5.44k
        TotalCompensatedFileSize(output_level_inputs->files);
424
5.44k
    const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0.files);
425
5.44k
    uint64_t limit =
426
5.44k
        mutable_cf_options.ExpandedCompactionByteSizeLimit(input_level);
427
5.44k
    if (expanded0.size() > inputs->size() &&
428
5.44k
        inputs1_size + expanded0_size < limit &&
429
5.44k
        !FilesInCompaction(expanded0.files) &&
430
5.44k
        !vstorage->HasOverlappingUserKey(&expanded0.files, input_level)) {
431
1.65k
      InternalKey new_start, new_limit;
432
1.65k
      GetRange(expanded0, &new_start, &new_limit);
433
1.65k
      std::vector<FileMetaData*> expanded1;
434
1.65k
      vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
435
1.65k
                                     &expanded1, *parent_index, parent_index);
436
1.65k
      if (expanded1.size() == output_level_inputs->size() &&
437
1.65k
          !FilesInCompaction(expanded1)) {
438
965
        RLOG(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
439
965
            "[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt "(%" PRIu64
440
965
            "+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
441
965
            " (%" PRIu64 "+%" PRIu64 "bytes)\n",
442
965
            cf_name.c_str(), input_level, inputs->size(),
443
965
            output_level_inputs->size(), inputs0_size, inputs1_size,
444
965
            expanded0.size(), expanded1.size(), expanded0_size, inputs1_size);
445
965
        smallest = new_start;
446
965
        largest = new_limit;
447
965
        inputs->files = expanded0.files;
448
965
        output_level_inputs->files = expanded1;
449
965
      }
450
1.65k
    }
451
5.44k
  }
452
453
18.3k
  return true;
454
18.3k
}
455
456
void CompactionPicker::GetGrandparents(
457
    VersionStorageInfo* vstorage, const CompactionInputFiles& inputs,
458
    const CompactionInputFiles& output_level_inputs,
459
18.8k
    std::vector<FileMetaData*>* grandparents) {
460
18.8k
  InternalKey start, limit;
461
18.8k
  GetRange(inputs, output_level_inputs, &start, &limit);
462
  // Compute the set of grandparent files that overlap this compaction
463
  // (parent == level+1; grandparent == level+2)
464
18.8k
  if (output_level_inputs.level + 1 < NumberLevels()) {
465
16.8k
    vstorage->GetOverlappingInputs(output_level_inputs.level + 1, &start,
466
16.8k
                                   &limit, grandparents);
467
16.8k
  }
468
18.8k
}
469
470
void CompactionPicker::MarkL0FilesForDeletion(
471
    const VersionStorageInfo* vstorage,
472
17.9k
    const ImmutableCFOptions* ioptions) {
473
  // CompactionFileFilterFactory is used to determine files that can be directly removed during
474
  // compaction rather than requiring a full iteration through the files.
475
17.9k
  if (!ioptions->compaction_file_filter_factory) {
476
17.9k
    return;
477
17.9k
  }
478
  // Compaction file filter factory should only look at files in Level 0.
479
15
  auto file_filter = ioptions->compaction_file_filter_factory->CreateCompactionFileFilter(
480
15
      vstorage->LevelFiles(0));
481
221
  for (FileMetaData* f : vstorage->LevelFiles(0)) {
482
221
    if (file_filter && file_filter->Filter(f) == FilterDecision::kDiscard) {
483
55
      f->set_delete_after_compaction(true);
484
55
    }
485
221
  }
486
15
}
487
488
std::unique_ptr<Compaction> CompactionPicker::CompactRange(
489
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
490
    VersionStorageInfo* vstorage, int input_level, int output_level,
491
    uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
492
6.26k
    InternalKey** compaction_end, bool* manual_conflict) {
493
  // CompactionPickerFIFO has its own implementation of compact range
494
6.26k
  assert(ioptions_.compaction_style != kCompactionStyleFIFO);
495
496
6.26k
  if (input_level == ColumnFamilyData::kCompactAllLevels) {
497
88
    assert(ioptions_.compaction_style == kCompactionStyleUniversal);
498
499
    // Universal compaction with more than one level always compacts all the
500
    // files together to the last level.
501
0
    assert(vstorage->num_levels() > 1);
502
    // DBImpl::CompactRange() set output level to be the last level
503
0
    assert(output_level == vstorage->num_levels() - 1);
504
    // DBImpl::RunManualCompaction will make full range for universal compaction
505
0
    assert(begin == nullptr);
506
0
    assert(end == nullptr);
507
0
    *compaction_end = nullptr;
508
509
88
    int start_level = 0;
510
172
    for (; start_level < vstorage->num_levels() &&
511
172
           vstorage->NumLevelFiles(start_level) == 0;
512
88
         start_level++) {
513
84
    }
514
88
    if (start_level == vstorage->num_levels()) {
515
0
      return nullptr;
516
0
    }
517
518
88
    if ((start_level == 0) && (!level0_compactions_in_progress_.empty())) {
519
0
      *manual_conflict = true;
520
      // Only one level 0 compaction allowed
521
0
      return nullptr;
522
0
    }
523
524
88
    std::vector<CompactionInputFiles> inputs(vstorage->num_levels() -
525
88
                                             start_level);
526
695
    for (int level = start_level; level < vstorage->num_levels(); level++) {
527
607
      inputs[level - start_level].level = level;
528
607
      auto& files = inputs[level - start_level].files;
529
530
607
      for (FileMetaData* f : vstorage->LevelFiles(level)) {
531
205
        files.push_back(f);
532
205
      }
533
607
      if (FilesInCompaction(files)) {
534
0
        *manual_conflict = true;
535
0
        return nullptr;
536
0
      }
537
607
    }
538
88
    auto c = Compaction::Create(
539
88
        vstorage, mutable_cf_options, std::move(inputs), output_level,
540
88
        mutable_cf_options.MaxFileSizeForLevel(output_level),
541
        /* max_grandparent_overlap_bytes = */ LLONG_MAX, output_path_id,
542
88
        GetCompressionType(ioptions_, output_level, 1),
543
88
        /* grandparents = */ std::vector<FileMetaData*>(), ioptions_.info_log,
544
88
        /* is_manual = */ true);
545
88
    if (c && start_level == 0) {
546
76
      MarkL0FilesForDeletion(vstorage, &ioptions_);
547
76
      level0_compactions_in_progress_.insert(c.get());
548
76
    }
549
88
    return c;
550
88
  }
551
552
6.17k
  CompactionInputFiles inputs;
553
6.17k
  inputs.level = input_level;
554
6.17k
  bool covering_the_whole_range = true;
555
556
  // All files are 'overlapping' in universal style compaction.
557
  // We have to compact the entire range in one shot.
558
6.17k
  if (ioptions_.compaction_style == kCompactionStyleUniversal) {
559
756
    begin = nullptr;
560
756
    end = nullptr;
561
756
  }
562
563
6.17k
  vstorage->GetOverlappingInputs(input_level, begin, end, &inputs.files);
564
6.17k
  if (inputs.empty()) {
565
1.05k
    return nullptr;
566
1.05k
  }
567
568
5.12k
  if ((input_level == 0) && (!level0_compactions_in_progress_.empty())) {
569
    // Only one level 0 compaction allowed
570
2
    TEST_SYNC_POINT("CompactionPicker::CompactRange:Conflict");
571
2
    *manual_conflict = true;
572
2
    return nullptr;
573
2
  }
574
575
  // Avoid compacting too much in one shot in case the range is large.
576
  // But we cannot do this for level-0 since level-0 files can overlap
577
  // and we must not pick one file and drop another older file if the
578
  // two files overlap.
579
5.12k
  if (input_level > 0) {
580
2.43k
    const uint64_t limit = mutable_cf_options.MaxFileSizeForLevel(input_level) *
581
2.43k
      mutable_cf_options.source_compaction_factor;
582
2.43k
    uint64_t total = 0;
583
2.48k
    for (size_t i = 0; i + 1 < inputs.size(); ++i) {
584
236
      uint64_t s = inputs[i]->compensated_file_size;
585
236
      total += s;
586
236
      if (total >= limit) {
587
192
        **compaction_end = inputs[i + 1]->smallest.key;
588
192
        covering_the_whole_range = false;
589
192
        inputs.files.resize(i + 1);
590
192
        break;
591
192
      }
592
236
    }
593
2.43k
  }
594
5.12k
  assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
595
596
5.12k
  if (ExpandWhileOverlapping(cf_name, vstorage, &inputs) == false) {
597
    // manual compaction is now multi-threaded, so it can
598
    // happen that ExpandWhileOverlapping fails
599
    // we handle it higher in RunManualCompaction
600
0
    *manual_conflict = true;
601
0
    return nullptr;
602
0
  }
603
604
5.12k
  if (covering_the_whole_range) {
605
4.92k
    *compaction_end = nullptr;
606
4.92k
  }
607
608
5.12k
  CompactionInputFiles output_level_inputs;
609
5.12k
  if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
610
4
    assert(input_level == 0);
611
0
    output_level = vstorage->base_level();
612
4
    assert(output_level > 0);
613
4
  }
614
0
  output_level_inputs.level = output_level;
615
5.12k
  if (input_level != output_level) {
616
4.54k
    int parent_index = -1;
617
4.54k
    if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
618
4.54k
                          &output_level_inputs, &parent_index, -1)) {
619
      // manual compaction is now multi-threaded, so it can
620
      // happen that SetupOtherInputs fails
621
      // we handle it higher in RunManualCompaction
622
0
      *manual_conflict = true;
623
0
      return nullptr;
624
0
    }
625
4.54k
  }
626
627
5.12k
  std::vector<CompactionInputFiles> compaction_inputs({inputs});
628
5.12k
  if (!output_level_inputs.empty()) {
629
1.57k
    compaction_inputs.push_back(output_level_inputs);
630
1.57k
  }
631
11.8k
  for (size_t i = 0; i < compaction_inputs.size(); i++) {
632
6.69k
    if (FilesInCompaction(compaction_inputs[i].files)) {
633
0
      *manual_conflict = true;
634
0
      return nullptr;
635
0
    }
636
6.69k
  }
637
638
5.12k
  std::vector<FileMetaData*> grandparents;
639
5.12k
  GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
640
5.12k
  auto compaction = Compaction::Create(
641
5.12k
      vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
642
5.12k
      mutable_cf_options.MaxFileSizeForLevel(output_level),
643
5.12k
      mutable_cf_options.MaxGrandParentOverlapBytes(input_level), output_path_id,
644
5.12k
      GetCompressionType(ioptions_, output_level, vstorage->base_level()), std::move(grandparents),
645
5.12k
      ioptions_.info_log, /* is manual compaction = */ true);
646
5.12k
  if (!compaction) {
647
1
    return nullptr;
648
1
  }
649
650
5.11k
  TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction.get());
651
5.11k
  if (input_level == 0) {
652
2.68k
    MarkL0FilesForDeletion(vstorage, &ioptions_);
653
2.68k
    level0_compactions_in_progress_.insert(compaction.get());
654
2.68k
  }
655
656
  // Creating a compaction influences the compaction score because the score
657
  // takes running compactions into account (by skipping files that are already
658
  // being compacted). Since we just changed compaction score, we recalculate it
659
  // here
660
5.11k
  {  // this piece of code recomputes compaction score
661
5.11k
    CompactionOptionsFIFO dummy_compaction_options_fifo;
662
5.11k
    vstorage->ComputeCompactionScore(mutable_cf_options,
663
5.11k
                                     dummy_compaction_options_fifo);
664
5.11k
  }
665
666
5.11k
  return compaction;
667
5.12k
}
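
For input levels above 0, CompactRange caps how much goes into one manual compaction: it accumulates compensated file sizes and, once the running total reaches MaxFileSizeForLevel * source_compaction_factor, truncates the input list and records where the next round resumes. A simplified standalone illustration of that truncation (the sizes and the limit are made-up numbers):

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  const std::vector<uint64_t> file_sizes = {40, 30, 50, 20, 60};  // compensated sizes
  const uint64_t limit = 100;          // stands in for MaxFileSizeForLevel * factor
  size_t picked = file_sizes.size();   // how many files this round keeps
  bool covering_whole_range = true;
  uint64_t total = 0;
  for (size_t i = 0; i + 1 < file_sizes.size(); ++i) {
    total += file_sizes[i];
    if (total >= limit) {
      picked = i + 1;                  // keep files [0, i]; resume at file i+1 next time
      covering_whole_range = false;
      break;
    }
  }
  std::cout << "files this round: " << picked
            << ", whole range: " << covering_whole_range << "\n";  // 3, 0
  return 0;
}
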
668
669
// Test whether two files have overlapping key-ranges.
670
bool HaveOverlappingKeyRanges(const Comparator* c,
671
                              const SstFileMetaData& a,
672
0
                              const SstFileMetaData& b) {
673
0
  return c->Compare(a.largest.key, b.smallest.key) >= 0 &&
674
0
         c->Compare(b.largest.key, a.smallest.key) >= 0;
675
0
}
676
677
#ifndef ROCKSDB_LITE
678
namespace {
679
680
// Updates smallest/largest keys using keys from specified file.
681
void UpdateBoundaryKeys(const Comparator* comparator,
682
                        const SstFileMetaData& file,
683
                        SstFileMetaData::BoundaryValues* smallest,
684
107
                        SstFileMetaData::BoundaryValues* largest) {
685
107
  if (smallest != nullptr && comparator->Compare(smallest->key, file.smallest.key) > 0) {
686
30
    smallest->key = file.smallest.key;
687
30
  }
688
107
  if (largest != nullptr && comparator->Compare(largest->key, file.largest.key) < 0) {
689
33
    largest->key = file.largest.key;
690
33
  }
691
107
}
692
693
}  // namespace
694
695
Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
696
      std::unordered_set<uint64_t>* input_files,
697
      const ColumnFamilyMetaData& cf_meta,
698
39
      const int output_level) const {
699
39
  auto& levels = cf_meta.levels;
700
39
  auto comparator = icmp_->user_comparator();
701
702
  // TODO(yhchiang): If there are any input files of L1 or up and there
703
  // is at least one L0 file, all L0 files older than that L0 file need
704
  // to be included. Otherwise, it is a false condition
705
706
  // TODO(yhchiang): add is_adjustable to CompactionOptions
707
708
  // the smallest and largest key of the current compaction input
709
39
  SstFileMetaData::BoundaryValues smallest, largest;
710
  // a flag for initializing smallest and largest key
711
39
  bool is_first = true;
712
39
  const int kNotFound = -1;
713
714
  // For each level, it does the following things:
715
  // 1. Find the first and the last compaction input files
716
  //    in the current level.
717
  // 2. Include all files between the first and the last
718
  //    compaction input files.
719
  // 3. Update the compaction key-range.
720
  // 4. For all remaining levels, include files that have
721
  //    overlapping key-range with the compaction key-range.
722
81
  for (int l = 0; l <= output_level; ++l) {
723
42
    auto& current_files = levels[l].files;
724
42
    int first_included = static_cast<int>(current_files.size());
725
42
    int last_included = kNotFound;
726
727
    // identify the first and the last compaction input files
728
    // in the current level.
729
203
    for (size_t f = 0; f < current_files.size(); ++f) {
730
161
      if (input_files->find(TableFileNameToNumber(current_files[f].name)) !=
731
161
          input_files->end()) {
732
103
        first_included = std::min(first_included, static_cast<int>(f));
733
103
        last_included = std::max(last_included, static_cast<int>(f));
734
103
        if (is_first) {
735
39
          smallest = current_files[f].smallest;
736
39
          largest = current_files[f].largest;
737
39
          is_first = false;
738
39
        }
739
103
      }
740
161
    }
741
42
    if (last_included == kNotFound) {
742
3
      continue;
743
3
    }
744
745
39
    if (l != 0) {
746
      // expand the compaction input of the current level if it
747
      // has overlapping key-range with other non-compaction input
748
      // files in the same level.
749
0
      while (first_included > 0) {
750
0
        if (comparator->Compare(
751
0
                current_files[first_included - 1].largest.key,
752
0
                current_files[first_included].smallest.key) < 0) {
753
0
          break;
754
0
        }
755
0
        first_included--;
756
0
      }
757
758
0
      while (last_included < static_cast<int>(current_files.size()) - 1) {
759
0
        if (comparator->Compare(
760
0
                current_files[last_included + 1].smallest.key,
761
0
                current_files[last_included].largest.key) > 0) {
762
0
          break;
763
0
        }
764
0
        last_included++;
765
0
      }
766
0
    }
767
768
    // include all files between the first and the last compaction input files.
769
146
    for (int f = first_included; f <= last_included; ++f) {
770
107
      if (current_files[f].being_compacted) {
771
0
        return STATUS(Aborted,
772
0
            "Necessary compaction input file " + current_files[f].name +
773
0
            " is currently being compacted.");
774
0
      }
775
107
      input_files->insert(
776
107
          TableFileNameToNumber(current_files[f].name));
777
107
    }
778
779
    // update smallest and largest key
780
39
    if (l == 0) {
781
146
      for (int f = first_included; f <= last_included; ++f) {
782
107
        UpdateBoundaryKeys(comparator, current_files[f], &smallest, &largest);
783
107
      }
784
39
    } else {
785
0
      UpdateBoundaryKeys(comparator, current_files[first_included], &smallest, nullptr);
786
0
      UpdateBoundaryKeys(comparator, current_files[last_included], nullptr, &largest);
787
0
    }
788
789
39
    SstFileMetaData aggregated_file_meta;
790
39
    aggregated_file_meta.smallest = smallest;
791
39
    aggregated_file_meta.largest = largest;
792
793
    // For all lower levels, include all overlapping files.
794
    // We need to add overlapping files from the current level too because even
795
    // if there no input_files in level l, we would still need to add files
796
    // which overlap with the range containing the input_files in levels 0 to l
797
    // Level 0 doesn't need to be handled this way because files are sorted by
798
    // time and not by key
799
42
    for (int m = std::max(l, 1); m <= output_level; ++m) {
800
3
      for (auto& next_lv_file : levels[m].files) {
801
0
        if (HaveOverlappingKeyRanges(
802
0
            comparator, aggregated_file_meta, next_lv_file)) {
803
0
          if (next_lv_file.being_compacted) {
804
0
            return STATUS(Aborted,
805
0
                "File " + next_lv_file.name +
806
0
                " that has overlapping key range with one of the compaction "
807
0
                " input file is currently being compacted.");
808
0
          }
809
0
          input_files->insert(
810
0
              TableFileNameToNumber(next_lv_file.name));
811
0
        }
812
0
      }
813
3
    }
814
39
  }
815
39
  return Status::OK();
816
39
}
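
Step 2 of the procedure above ("include all files between the first and the last compaction input files") turns an arbitrary subset of a level into a contiguous slice of that level. A minimal standalone sketch of that step over plain file numbers:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <unordered_set>
#include <vector>

int main() {
  const std::vector<uint64_t> level_files = {11, 12, 13, 14, 15};  // files in key order
  std::unordered_set<uint64_t> input_files = {12, 15};             // user-selected subset
  int first_included = static_cast<int>(level_files.size());
  int last_included = -1;
  for (size_t f = 0; f < level_files.size(); ++f) {
    if (input_files.count(level_files[f]) > 0) {
      first_included = std::min(first_included, static_cast<int>(f));
      last_included = std::max(last_included, static_cast<int>(f));
    }
  }
  // Include everything between the first and last selected file.
  for (int f = first_included; f <= last_included; ++f) {
    input_files.insert(level_files[f]);
  }
  std::cout << "selected " << input_files.size() << " files\n";  // 4 files: 12..15
  return 0;
}
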
817
818
Status CompactionPicker::SanitizeCompactionInputFiles(
819
    std::unordered_set<uint64_t>* input_files,
820
    const ColumnFamilyMetaData& cf_meta,
821
45
    const int output_level) const {
822
45
  assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
823
45
         cf_meta.levels[cf_meta.levels.size() - 1].level);
824
45
  if (output_level >= static_cast<int>(cf_meta.levels.size())) {
825
6
    return STATUS(InvalidArgument,
826
6
        "Output level for column family " + cf_meta.name +
827
6
        " must between [0, " +
828
6
        ToString(cf_meta.levels[cf_meta.levels.size() - 1].level) +
829
6
        "].");
830
6
  }
831
832
39
  if (output_level > MaxOutputLevel()) {
833
0
    return STATUS(InvalidArgument,
834
0
        "Exceed the maximum output level defined by "
835
0
        "the current compaction algorithm --- " +
836
0
            ToString(MaxOutputLevel()));
837
0
  }
838
839
39
  if (output_level < 0) {
840
0
    return STATUS(InvalidArgument,
841
0
        "Output level cannot be negative.");
842
0
  }
843
844
39
  if (input_files->size() == 0) {
845
0
    return STATUS(InvalidArgument,
846
0
        "A compaction must contain at least one file.");
847
0
  }
848
849
39
  Status s = SanitizeCompactionInputFilesForAllLevels(
850
39
      input_files, cf_meta, output_level);
851
852
39
  if (!s.ok()) {
853
0
    return s;
854
0
  }
855
856
  // for all input files, check whether the file number matches
857
  // any currently-existing files.
858
107
  for (auto file_num : *input_files) {
859
107
    bool found = false;
860
107
    for (auto level_meta : cf_meta.levels) {
861
332
      for (auto file_meta : level_meta.files) {
862
332
        if (file_num == TableFileNameToNumber(file_meta.name)) {
863
107
          if (file_meta.being_compacted) {
864
0
            return STATUS(Aborted,
865
0
                "Specified compaction input file " +
866
0
                MakeTableFileName("", file_num) +
867
0
                " is already being compacted.");
868
0
          }
869
107
          found = true;
870
107
          break;
871
107
        }
872
332
      }
873
107
      if (found) {
874
107
        break;
875
107
      }
876
107
    }
877
107
    if (!found) {
878
0
      return STATUS(InvalidArgument,
879
0
          "Specified compaction input file " +
880
0
          MakeTableFileName("", file_num) +
881
0
          " does not exist in column family " + cf_meta.name + ".");
882
0
    }
883
107
  }
884
885
39
  return Status::OK();
886
39
}
887
#endif  // !ROCKSDB_LITE
888
889
bool LevelCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage)
890
87.4k
    const {
891
87.4k
  if (!vstorage->FilesMarkedForCompaction().empty()) {
892
30
    return true;
893
30
  }
894
483k
  for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
895
416k
    if (vstorage->CompactionScore(i) >= 1) {
896
20.8k
      return true;
897
20.8k
    }
898
416k
  }
899
66.4k
  return false;
900
87.3k
}
901
902
void LevelCompactionPicker::PickFilesMarkedForCompactionExperimental(
903
    const std::string& cf_name, VersionStorageInfo* vstorage,
904
5.74k
    CompactionInputFiles* inputs, int* level, int* output_level) {
905
5.74k
  if (vstorage->FilesMarkedForCompaction().empty()) {
906
5.71k
    return;
907
5.71k
  }
908
909
40
  auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
910
    // If it's being compacted it has nothing to do here.
911
    // If this assert() fails that means that some function marked some
912
    // files as being_compacted, but didn't call ComputeCompactionScore()
913
40
    assert(!level_file.second->being_compacted);
914
0
    *level = level_file.first;
915
40
    *output_level = (*level == 0) ? vstorage->base_level() : *level + 1;
916
917
40
    if (*level == 0 && !level0_compactions_in_progress_.empty()) {
918
28
      return false;
919
28
    }
920
921
12
    inputs->files = {level_file.second};
922
12
    inputs->level = *level;
923
12
    return ExpandWhileOverlapping(cf_name, vstorage, inputs);
924
40
  };
925
926
  // take a chance on a random file first
927
26
  Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
928
26
  size_t random_file_index = static_cast<size_t>(rnd.Uniform(
929
26
      static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));
930
931
26
  if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
932
    // found the compaction!
933
12
    return;
934
12
  }
935
936
14
  for (auto& level_file : vstorage->FilesMarkedForCompaction()) {
937
14
    if (continuation(level_file)) {
938
      // found the compaction!
939
0
      return;
940
0
    }
941
14
  }
942
14
  inputs->files.clear();
943
14
}
944
945
std::unique_ptr<Compaction> LevelCompactionPicker::PickCompaction(
946
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
947
19.5k
    VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
948
19.5k
  int level = -1;
949
19.5k
  int output_level = -1;
950
19.5k
  int parent_index = -1;
951
19.5k
  int base_index = -1;
952
19.5k
  CompactionInputFiles inputs;
953
19.5k
  double score = 0;
954
19.5k
  CompactionReason compaction_reason = CompactionReason::kUnknown;
955
956
  // Find the compactions by size on all levels.
957
19.5k
  bool skipped_l0 = false;
958
53.3k
  for (int i = 0; i < NumberLevels() - 1; i++) {
959
47.6k
    score = vstorage->CompactionScore(i);
960
47.6k
    level = vstorage->CompactionScoreLevel(i);
961
47.6k
    assert(i == 0 || score <= vstorage->CompactionScore(i - 1));
962
47.6k
    if (score >= 1) {
963
21.4k
      if (skipped_l0 && level == vstorage->base_level()) {
964
        // If L0->base_level compaction is pending, don't schedule further
965
        // compaction from base level. Otherwise L0->base_level compaction
966
        // may starve.
967
1.99k
        continue;
968
1.99k
      }
969
19.4k
      output_level = (level == 0) ? vstorage->base_level() : level + 1;
970
19.4k
      if (PickCompactionBySize(vstorage, level, output_level, &inputs,
971
19.4k
                               &parent_index, &base_index) &&
972
19.4k
          
ExpandWhileOverlapping(cf_name, vstorage, &inputs)13.7k
) {
973
        // found the compaction!
974
13.7k
        if (level == 0) {
975
          // L0 score = `num L0 files` / `level0_file_num_compaction_trigger`
976
5.77k
          compaction_reason = CompactionReason::kLevelL0FilesNum;
977
7.98k
        } else {
978
          // L1+ score = `Level files size` / `MaxBytesForLevel`
979
7.98k
          compaction_reason = CompactionReason::kLevelMaxLevelSize;
980
7.98k
        }
981
13.7k
        break;
982
13.7k
      } else {
983
        // didn't find the compaction, clear the inputs
984
5.72k
        inputs.clear();
985
5.72k
        if (level == 0) {
986
5.72k
          skipped_l0 = true;
987
5.72k
        }
988
5.72k
      }
989
19.4k
    }
990
47.6k
  }
991
992
19.5k
  bool is_manual = false;
993
  // if we didn't find a compaction, check if there are any files marked for
994
  // compaction
995
19.5k
  if (inputs.empty()) {
996
5.74k
    is_manual = true;
997
5.74k
    parent_index = base_index = -1;
998
5.74k
    PickFilesMarkedForCompactionExperimental(cf_name, vstorage, &inputs, &level,
999
5.74k
                                             &output_level);
1000
5.74k
    if (!inputs.empty()) {
1001
12
      compaction_reason = CompactionReason::kFilesMarkedForCompaction;
1002
12
    }
1003
5.74k
  }
1004
19.5k
  if (inputs.empty()) {
1005
5.73k
    return nullptr;
1006
5.73k
  }
1007
13.7k
  assert(level >= 0 && output_level >= 0);
1008
1009
  // Two level 0 compactions won't run at the same time, so we don't need to worry
1010
  // about files on level 0 being compacted.
1011
13.7k
  if (level == 0) {
1012
5.78k
    assert(level0_compactions_in_progress_.empty());
1013
0
    InternalKey smallest, largest;
1014
5.78k
    GetRange(inputs, &smallest, &largest);
1015
    // Note that the next call will discard the file we placed in
1016
    // c->inputs_[0] earlier and replace it with an overlapping set
1017
    // which will include the picked file.
1018
5.78k
    inputs.files.clear();
1019
5.78k
    vstorage->GetOverlappingInputs(0, &smallest, &largest, &inputs.files);
1020
1021
    // If we include more L0 files in the same compaction run it can
1022
    // cause the 'smallest' and 'largest' key to get extended to a
1023
    // larger range. So, re-invoke GetRange to get the new key range
1024
5.78k
    GetRange(inputs, &smallest, &largest);
1025
5.78k
    if (RangeInCompaction(vstorage, &smallest, &largest, output_level,
1026
5.78k
                          &parent_index)) {
1027
18
      return nullptr;
1028
18
    }
1029
5.76k
    assert(!inputs.files.empty());
1030
5.76k
  }
1031
1032
  // Setup input files from output level
1033
13.7k
  CompactionInputFiles output_level_inputs;
1034
13.7k
  output_level_inputs.level = output_level;
1035
13.7k
  if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
1036
13.7k
                   &output_level_inputs, &parent_index, base_index)) {
1037
1
    return nullptr;
1038
1
  }
1039
1040
13.7k
  std::vector<CompactionInputFiles> compaction_inputs({inputs});
1041
13.7k
  if (!output_level_inputs.empty()) {
1042
3.86k
    compaction_inputs.push_back(output_level_inputs);
1043
3.86k
  }
1044
1045
13.7k
  std::vector<FileMetaData*> grandparents;
1046
13.7k
  GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
1047
13.7k
  auto c = Compaction::Create(
1048
13.7k
      vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
1049
13.7k
      mutable_cf_options.MaxFileSizeForLevel(output_level),
1050
13.7k
      mutable_cf_options.MaxGrandParentOverlapBytes(level),
1051
13.7k
      GetPathId(ioptions_, mutable_cf_options, output_level),
1052
13.7k
      GetCompressionType(ioptions_, output_level, vstorage->base_level()), std::move(grandparents),
1053
13.7k
      ioptions_.info_log, is_manual, score,
1054
13.7k
      /* deletion_compaction = */ false, compaction_reason);
1055
13.7k
  if (!c) {
1056
0
    return nullptr;
1057
0
  }
1058
1059
  // If it's level 0 compaction, make sure we don't execute any other level 0
1060
  // compactions in parallel
1061
13.7k
  if (level == 0) {
1062
5.76k
    level0_compactions_in_progress_.insert(c.get());
1063
5.76k
  }
1064
1065
  // Creating a compaction influences the compaction score because the score
1066
  // takes running compactions into account (by skipping files that are already
1067
  // being compacted). Since we just changed compaction score, we recalculate it
1068
  // here
1069
13.7k
  {  // this piece of code recomputes compaction score
1070
13.7k
    CompactionOptionsFIFO dummy_compaction_options_fifo;
1071
13.7k
    vstorage->ComputeCompactionScore(mutable_cf_options,
1072
13.7k
                                     dummy_compaction_options_fifo);
1073
13.7k
  }
1074
1075
13.7k
  TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c.get());
1076
1077
13.7k
  return c;
1078
13.7k
}
1079
1080
/*
1081
 * Find the optimal path to place a file
1082
 * Given a level, finds the path where levels up to it will fit in levels
1083
 * up to and including this path
1084
 */
1085
uint32_t LevelCompactionPicker::GetPathId(
1086
    const ImmutableCFOptions& ioptions,
1087
13.7k
    const MutableCFOptions& mutable_cf_options, int level) {
1088
13.7k
  uint32_t p = 0;
1089
13.7k
  assert(!ioptions.db_paths.empty());
1090
1091
  // size remaining in the most recent path
1092
0
  uint64_t current_path_size = ioptions.db_paths[0].target_size;
1093
1094
13.7k
  uint64_t level_size;
1095
13.7k
  int cur_level = 0;
1096
1097
13.7k
  level_size = mutable_cf_options.max_bytes_for_level_base;
1098
1099
  // Last path is the fallback
1100
14.0k
  while (p < ioptions.db_paths.size() - 1) {
1101
384
    if (level_size <= current_path_size) {
1102
232
      if (cur_level == level) {
1103
        // Does desired level fit in this path?
1104
80
        return p;
1105
152
      } else {
1106
152
        current_path_size -= level_size;
1107
152
        level_size *= mutable_cf_options.max_bytes_for_level_multiplier;
1108
152
        cur_level++;
1109
152
        continue;
1110
152
      }
1111
232
    }
1112
152
    p++;
1113
152
    current_path_size = ioptions.db_paths[p].target_size;
1114
152
  }
1115
13.6k
  return p;
1116
13.7k
}
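
GetPathId walks db_paths, subtracting each level's expected size (starting at max_bytes_for_level_base and growing by max_bytes_for_level_multiplier) from the current path's target size until the requested level fits; the last path is the fallback. A standalone sketch of that walk with made-up path sizes:

#include <cstdint>
#include <iostream>
#include <vector>

uint32_t PickPath(const std::vector<uint64_t>& path_target_sizes,
                  uint64_t level_base, double multiplier, int level) {
  uint32_t p = 0;
  uint64_t remaining = path_target_sizes[0];
  uint64_t level_size = level_base;
  int cur_level = 0;
  while (p + 1 < path_target_sizes.size()) {   // last path is the fallback
    if (level_size <= remaining) {
      if (cur_level == level) return p;        // desired level fits in this path
      remaining -= level_size;
      level_size = static_cast<uint64_t>(level_size * multiplier);
      ++cur_level;
      continue;
    }
    ++p;
    remaining = path_target_sizes[p];
  }
  return p;
}

int main() {
  const std::vector<uint64_t> paths = {100, 1000, 100000};  // target size per path
  std::cout << PickPath(paths, 64, 10.0, 0) << "\n";  // level 0 (64) fits in path 0
  std::cout << PickPath(paths, 64, 10.0, 2) << "\n";  // level 2 (6400) falls to path 2
  return 0;
}
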
1117
1118
bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage,
1119
                                                 int level, int output_level,
1120
                                                 CompactionInputFiles* inputs,
1121
                                                 int* parent_index,
1122
19.4k
                                                 int* base_index) {
1123
  // level 0 files are overlapping. So we cannot pick more
1124
  // than one concurrent compactions at this level. This
1125
  // could be made better by looking at key-ranges that are
1126
  // being compacted at level 0.
1127
19.4k
  if (level == 0 && !level0_compactions_in_progress_.empty()) {
1128
5.52k
    TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0");
1129
5.52k
    return false;
1130
5.52k
  }
1131
1132
13.9k
  inputs->clear();
1133
1134
13.9k
  assert(level >= 0);
1135
1136
  // Pick the largest file in this level that is not already
1137
  // being compacted
1138
0
  const std::vector<int>& file_size = vstorage->FilesByCompactionPri(level);
1139
13.9k
  const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(level);
1140
1141
  // record the first file that is not yet compacted
1142
13.9k
  int nextIndex = -1;
1143
1144
13.9k
  for (unsigned int i = vstorage->NextCompactionIndex(level);
1145
15.6k
       i < file_size.size(); i++) {
1146
15.4k
    int index = file_size[i];
1147
15.4k
    auto* f = level_files[index];
1148
1149
    // do not pick a file to compact if it is being compacted
1150
    // from n-1 level.
1151
15.4k
    if (f->being_compacted) {
1152
651
      continue;
1153
651
    }
1154
1155
    // remember the startIndex for the next call to PickCompaction
1156
14.7k
    if (nextIndex == -1) {
1157
13.9k
      nextIndex = i;
1158
13.9k
    }
1159
1160
    // Do not pick this file if its parents at level+1 are being compacted.
1161
    // Maybe we can avoid redoing this work in SetupOtherInputs
1162
14.7k
    *parent_index = -1;
1163
14.7k
    if (RangeInCompaction(vstorage, &f->smallest.key, &f->largest.key, output_level,
1164
14.7k
                          parent_index)) {
1165
1.01k
      continue;
1166
1.01k
    }
1167
13.7k
    inputs->files.push_back(f);
1168
13.7k
    inputs->level = level;
1169
13.7k
    *base_index = index;
1170
13.7k
    break;
1171
14.7k
  }
1172
1173
  // store where to start the iteration in the next call to PickCompaction
1174
13.9k
  vstorage->SetNextCompactionIndex(level, nextIndex);
1175
1176
13.9k
  return inputs->size() > 0;
1177
19.4k
}
1178
1179
#ifndef ROCKSDB_LITE
1180
bool UniversalCompactionPicker::NeedsCompaction(
1181
919k
    const VersionStorageInfo* vstorage) const {
1182
919k
  const int kLevel0 = 0;
1183
919k
  return vstorage->CompactionScore(kLevel0) >= 1;
1184
919k
}
1185
1186
struct UniversalCompactionPicker::SortedRun {
1187
  SortedRun(int _level, FileMetaData* _file, uint64_t _size,
1188
            uint64_t _compensated_file_size, bool _being_compacted)
1189
      : level(_level),
1190
        file(_file),
1191
        size(_size),
1192
        compensated_file_size(_compensated_file_size),
1193
73.2k
        being_compacted(_being_compacted) {
1194
73.2k
    assert(compensated_file_size > 0);
1195
    // Allowed either one of level and file.
1196
0
    assert((level != 0) != (file != nullptr));
1197
73.2k
  }
1198
1199
  void Dump(char* out_buf, size_t out_buf_size,
1200
            bool print_path = false) const;
1201
1202
  // sorted_run_count is added into the string to print
1203
  void DumpSizeInfo(char* out_buf, size_t out_buf_size,
1204
                    size_t sorted_run_count) const;
1205
1206
25.8k
  std::string FileNumAsString() const {
1207
25.8k
    char file_num_buf[kFormatFileNumberBufSize];
1208
25.8k
    Dump(file_num_buf, sizeof(file_num_buf), true);
1209
25.8k
    return file_num_buf;
1210
25.8k
  }
1211
1212
207
  bool delete_after_compaction() const {
1213
207
    return file ? file->delete_after_compaction() : false;
1214
207
  }
1215
1216
  int level;
1217
  // `file` Will be null for level > 0. For level = 0, the sorted run is
1218
  // for this file.
1219
  FileMetaData* file;
1220
  // For level > 0, `size` and `compensated_file_size` are sum of sizes all
1221
  // files in the level. `being_compacted` should be the same for all files
1222
  // in a non-zero level. Use the value here.
1223
  uint64_t size;
1224
  uint64_t compensated_file_size;
1225
  bool being_compacted;
1226
};
1227
1228
void UniversalCompactionPicker::SortedRun::Dump(char* out_buf,
1229
                                                size_t out_buf_size,
1230
86.4k
                                                bool print_path) const {
1231
86.4k
  if (level == 0) {
1232
70.9k
    assert(file != nullptr);
1233
70.9k
    if (file->fd.GetPathId() == 0 || !print_path) {
1234
70.4k
      snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
1235
70.4k
    } else {
1236
480
      snprintf(out_buf, out_buf_size, "file %" PRIu64
1237
480
                                      "(path "
1238
480
                                      "%" PRIu32 ")",
1239
480
               file->fd.GetNumber(), file->fd.GetPathId());
1240
480
    }
1241
70.9k
  } else {
1242
15.4k
    snprintf(out_buf, out_buf_size, "level %d", level);
1243
15.4k
  }
1244
86.4k
}
1245
1246
void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
1247
35.1k
    char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
1248
35.1k
  if (level == 0) {
1249
25.1k
    assert(file != nullptr);
1250
0
    snprintf(out_buf, out_buf_size,
1251
25.1k
             "file %" PRIu64 "[%" ROCKSDB_PRIszt
1252
25.1k
             "] "
1253
25.1k
             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
1254
25.1k
             file->fd.GetNumber(), sorted_run_count, file->fd.GetTotalFileSize(),
1255
25.1k
             file->compensated_file_size);
1256
25.1k
  } else {
1257
10.0k
    snprintf(out_buf, out_buf_size,
1258
10.0k
             "level %d[%" ROCKSDB_PRIszt
1259
10.0k
             "] "
1260
10.0k
             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
1261
10.0k
             level, sorted_run_count, size, compensated_file_size);
1262
10.0k
  }
1263
35.1k
}
1264
1265
std::vector<std::vector<UniversalCompactionPicker::SortedRun>>
1266
    UniversalCompactionPicker::CalculateSortedRuns(const VersionStorageInfo& vstorage,
1267
                                                   const ImmutableCFOptions& ioptions,
1268
15.1k
                                                   uint64_t max_file_size) {
1269
15.1k
  std::vector<std::vector<SortedRun>> ret(1);
1270
15.1k
  MarkL0FilesForDeletion(&vstorage, &ioptions);
1271
1272
56.2k
  for (FileMetaData* f : vstorage.LevelFiles(0)) {
1273
    // Any files that can be directly removed during compaction can be included, even if they
1274
    // exceed the "max file size for compaction."
1275
56.2k
    if (f->fd.GetTotalFileSize() <= max_file_size || f->delete_after_compaction()) {
1276
56.1k
      ret.back().emplace_back(0, f, f->fd.GetTotalFileSize(), f->compensated_file_size,
1277
56.1k
          f->being_compacted);
1278
    // If last sequence is empty it means that there are multiple too-large-to-compact files in
1279
    // a row. So we just don't start a new sequence in this case.
1280
56.1k
    } else if (!ret.back().empty()) {
1281
77
      ret.emplace_back();
1282
77
    }
1283
56.2k
  }
1284
1285
65.8k
  for (int level = 1; level < vstorage.num_levels(); level++) {
1286
50.6k
    uint64_t total_compensated_size = 0U;
1287
50.6k
    uint64_t total_size = 0U;
1288
50.6k
    bool being_compacted = false;
1289
50.6k
    bool is_first = true;
1290
87.5k
    for (FileMetaData* f : vstorage.LevelFiles(level)) {
1291
87.5k
      total_compensated_size += f->compensated_file_size;
1292
87.5k
      total_size += f->fd.GetTotalFileSize();
1293
87.5k
      if (ioptions.compaction_options_universal.allow_trivial_move == true) {
1294
58.5k
        if (f->being_compacted) {
1295
1.51k
          being_compacted = f->being_compacted;
1296
1.51k
        }
1297
58.5k
      } else {
1298
        // Compaction always includes all files for a non-zero level, so for a
1299
        // non-zero level, all the files should share the same being_compacted
1300
        // value.
1301
        // This assumption is only valid when
1302
        // ioptions.compaction_options_universal.allow_trivial_move is false
1303
29.0k
        assert(is_first || f->being_compacted == being_compacted);
1304
29.0k
      }
1305
87.5k
      if (is_first) {
1306
17.0k
        being_compacted = f->being_compacted;
1307
17.0k
        is_first = false;
1308
17.0k
      }
1309
87.5k
    }
1310
50.6k
    if (total_compensated_size > 0) {
1311
17.0k
      ret.back().emplace_back(level, nullptr, total_size, total_compensated_size, being_compacted);
1312
17.0k
    }
1313
50.6k
  }
1314
1315
  // If the last sequence is empty, it means that there are no files after a too-large-to-compact file.
1316
  // So just drop this sequence.
1317
15.1k
  if (ret.back().empty())
1318
17
    ret.pop_back();
1319
15.1k
  return ret;
1320
15.1k
}
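As a rough illustration of the run grouping performed by CalculateSortedRuns above, the following standalone sketch (assumed file sizes and threshold; it ignores the delete-after-compaction exception and levels above 0) splits a sequence of L0 file sizes into compaction sequences at oversized files, starting a new sequence only when the current one is non-empty:

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Hypothetical L0 file sizes; 100 is the assumed max file size for compaction.
  const std::vector<uint64_t> l0_sizes = {10, 20, 150, 160, 30, 40};
  const uint64_t max_file_size = 100;

  std::vector<std::vector<uint64_t>> runs(1);
  for (uint64_t size : l0_sizes) {
    if (size <= max_file_size) {
      runs.back().push_back(size);           // eligible file joins the current sequence
    } else if (!runs.back().empty()) {
      runs.emplace_back();                   // oversized file ends the current sequence
    }                                        // consecutive oversized files: no empty sequences
  }
  if (runs.back().empty()) runs.pop_back();  // drop a trailing empty sequence

  for (size_t i = 0; i < runs.size(); ++i) {
    std::cout << "sequence " << i << ": " << runs[i].size() << " files\n";
  }
  return 0;
}

With the assumed sizes this prints two sequences of two files each, split around the 150- and 160-unit files.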
1321
1322
#ifndef NDEBUG
1323
namespace {
1324
// smallest_seqno and largest_seqno are set iff `files` is not empty.
1325
void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
1326
                             SequenceNumber* smallest_seqno,
1327
2.32k
                             SequenceNumber* largest_seqno) {
1328
2.32k
  DCHECK_ONLY_NOTNULL(smallest_seqno);
1329
2.32k
  DCHECK_ONLY_NOTNULL(largest_seqno);
1330
2.32k
  bool is_first = true;
1331
5.41k
  for (FileMetaData* f : files) {
1332
5.41k
    DCHECK_LE(f->smallest.seqno, f->largest.seqno);
1333
5.41k
    if (is_first) {
1334
2.32k
      is_first = false;
1335
2.32k
      *smallest_seqno = f->smallest.seqno;
1336
2.32k
      *largest_seqno = f->largest.seqno;
1337
3.09k
    } else {
1338
3.09k
      if (f->smallest.seqno < *smallest_seqno) {
1339
1
        *smallest_seqno = f->smallest.seqno;
1340
1
      }
1341
3.09k
      if (f->largest.seqno > *largest_seqno) {
1342
1.33k
        *largest_seqno = f->largest.seqno;
1343
1.33k
      }
1344
3.09k
    }
1345
5.41k
  }
1346
2.32k
}
1347
}  // namespace
1348
#endif
1349
1350
// Algorithm that checks to see if there are any overlapping
1351
// files in the input
1352
604
bool CompactionPicker::IsInputNonOverlapping(Compaction* c) {
1353
604
  auto comparator = icmp_->user_comparator();
1354
604
  int first_iter = 1;
1355
1356
604
  InputFileInfo prev, curr, next;
1357
1358
604
  SmallestKeyHeap smallest_key_priority_q =
1359
604
      create_level_heap(c, icmp_->user_comparator());
1360
1361
4.07k
  while (!smallest_key_priority_q.empty()) {
1362
3.47k
    curr = smallest_key_priority_q.top();
1363
3.47k
    smallest_key_priority_q.pop();
1364
1365
3.47k
    if (first_iter) {
1366
604
      prev = curr;
1367
604
      first_iter = 0;
1368
2.87k
    } else {
1369
2.87k
      if (comparator->Compare(prev.f->largest.key.user_key(),
1370
2.87k
                              curr.f->smallest.key.user_key()) >= 0) {
1371
        // found overlapping files, return false
1372
1
        return false;
1373
1
      }
1374
2.87k
      assert(comparator->Compare(curr.f->largest.key.user_key(),
1375
2.87k
                                 prev.f->largest.key.user_key()) > 0);
1376
0
      prev = curr;
1377
2.87k
    }
1378
1379
3.47k
    next.f = nullptr;
1380
1381
3.47k
    if (curr.level != 0 && 
curr.index < c->num_input_files(curr.level) - 11.52k
) {
1382
1.33k
      next.f = c->input(curr.level, curr.index + 1);
1383
1.33k
      next.level = curr.level;
1384
1.33k
      next.index = curr.index + 1;
1385
1.33k
    }
1386
1387
3.47k
    if (next.f) {
1388
1.33k
      smallest_key_priority_q.push(std::move(next));
1389
1.33k
    }
1390
3.47k
  }
1391
603
  return true;
1392
604
}
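A minimal sketch of the overlap test above, using plain integer key ranges in place of FileMetaData and the smallest-key min-heap (toy data, assumed semantics): files are visited in ascending order of smallest key, and the input is overlapping as soon as one file's smallest key does not lie strictly above the previous file's largest key:

#include <algorithm>
#include <iostream>
#include <utility>
#include <vector>

// Each pair is (smallest_key, largest_key) of one input file.
bool IsInputNonOverlapping(std::vector<std::pair<int, int>> files) {
  // Stand-in for the smallest-key min-heap: visit files by ascending smallest key.
  std::sort(files.begin(), files.end());
  for (size_t i = 1; i < files.size(); ++i) {
    // Overlap if the previous file's largest key reaches into this file's range.
    if (files[i - 1].second >= files[i].first) return false;
  }
  return true;
}

int main() {
  std::cout << IsInputNonOverlapping({{1, 5}, {6, 9}, {10, 12}}) << "\n";  // 1: disjoint
  std::cout << IsInputNonOverlapping({{1, 5}, {4, 9}}) << "\n";            // 0: overlapping
  return 0;
}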
1393
1394
// Universal style of compaction. Pick files that are contiguous in
1395
// time-range to compact.
1396
//
1397
std::unique_ptr<Compaction> UniversalCompactionPicker::PickCompaction(
1398
    const std::string& cf_name,
1399
    const MutableCFOptions& mutable_cf_options,
1400
    VersionStorageInfo* vstorage,
1401
15.1k
    LogBuffer* log_buffer) {
1402
15.1k
  std::vector<std::vector<SortedRun>> sorted_runs = CalculateSortedRuns(
1403
15.1k
      *vstorage,
1404
15.1k
      ioptions_,
1405
15.1k
      mutable_cf_options.MaxFileSizeForCompaction());
1406
1407
15.2k
  for (const auto& block : sorted_runs) {
1408
15.2k
    auto result = DoPickCompaction(cf_name, mutable_cf_options, vstorage, log_buffer, block);
1409
15.2k
    if (result != nullptr) {
1410
4.75k
      return result;
1411
4.75k
    }
1412
15.2k
  }
1413
10.4k
  TEST_SYNC_POINT("UniversalCompactionPicker::PickCompaction:SkippingCompaction");
1414
10.4k
  return nullptr;
1415
15.1k
}
1416
1417
std::unique_ptr<Compaction> UniversalCompactionPicker::DoPickCompaction(
1418
    const std::string& cf_name,
1419
    const MutableCFOptions& mutable_cf_options,
1420
    VersionStorageInfo* vstorage,
1421
    LogBuffer* log_buffer,
1422
15.2k
    const std::vector<UniversalCompactionPicker::SortedRun>& sorted_runs) {
1423
15.2k
  const int kLevel0 = 0;
1424
15.2k
  double score = vstorage->CompactionScore(kLevel0);
1425
15.2k
  std::unique_ptr<Compaction> c;
1426
1427
15.2k
  if (sorted_runs.size() == 0) {
1428
0
    RDEBUG(ioptions_.info_log, "[%s] Universal: nothing to do\n", cf_name.c_str());
1429
0
    return nullptr;
1430
0
  }
1431
15.2k
  VersionStorageInfo::LevelSummaryStorage tmp;
1432
15.2k
  RDEBUG(ioptions_.info_log,
1433
15.2k
         "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n",
1434
15.2k
         cf_name.c_str(), sorted_runs.size(),
1435
15.2k
         vstorage->LevelSummary(&tmp));
1436
1437
  // First, if we're using a compaction_file_filter_factory, check if we can directly delete this
1438
  // set of files. This type of compaction may include any number of files, regardless of
1439
  // level0_file_num_compaction_trigger.
1440
15.2k
  if (ioptions_.compaction_file_filter_factory) {
1441
38
    c = PickCompactionUniversalDeletion(
1442
38
        cf_name, mutable_cf_options, vstorage, score, sorted_runs, log_buffer);
1443
38
  }
1444
15.2k
  if (c) {
1445
2
    LOG_TO_BUFFER(log_buffer, "[%s] Universal: compacting for direct deletion\n",
1446
2
                  cf_name.c_str());
1447
15.2k
  } else {
1448
    // Check if the number of files to compact is greater than or equal to
1449
    // level0_file_num_compaction_trigger. If so, consider size amplification and
1450
    // read amplification as compaction reasons.
1451
15.2k
    if (sorted_runs.size() <
1452
15.2k
            (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) {
1453
23
      RDEBUG(ioptions_.info_log, "[%s] Universal: nothing to do\n", cf_name.c_str());
1454
23
      return nullptr;
1455
23
    }
1456
1457
    // Check for size amplification next.
1458
15.2k
    c = PickCompactionUniversalSizeAmp(cf_name, mutable_cf_options, vstorage,
1459
15.2k
                                            score, sorted_runs, log_buffer);
1460
15.2k
    if (c) {
1461
715
      LOG_TO_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n",
1462
715
                  cf_name.c_str());
1463
14.5k
    } else {
1464
      // Size amplification is within limits. Try reducing read
1465
      // amplification while maintaining file size ratios.
1466
14.5k
      unsigned int ratio = ioptions_.compaction_options_universal.size_ratio;
1467
1468
14.5k
      c = PickCompactionUniversalReadAmp(
1469
14.5k
          cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX,
1470
14.5k
          ioptions_.compaction_options_universal.always_include_size_threshold,
1471
14.5k
          sorted_runs, log_buffer);
1472
14.5k
      if (c) {
1473
4.04k
        LOG_TO_BUFFER(log_buffer, "[%s] Universal: compacting for size ratio\n",
1474
4.04k
                    cf_name.c_str());
1475
10.4k
      } else {
1476
        // ENG-1401: We trigger compaction logic when num files exceeds
1477
        // level0_file_num_compaction_trigger. However, we only want to compact based on
1478
        // files being of comparable sizes. This is already checked in the block above.
1479
        // Previously, if we didn't find any such candidates, we were falling through
1481
        // into the block below and compacting files without regard to their relative sizes
1482
        // whenever the number of files exceeded level0_file_num_compaction_trigger.
1482
        // This was causing a lot of read/write amplification.
1483
        //
1484
        // Ideally, we should just remove this block below. For now, putting this
1485
        // under a gflag.
1486
10.4k
        if (FLAGS_aggressive_compaction_for_read_amp) {
1487
          // Size amplification and file size ratios are within configured limits.
1488
          // If max read amplification is exceeding configured limits, then force
1489
          // compaction without looking at filesize ratios and try to reduce
1490
          // the number of files to fewer than level0_file_num_compaction_trigger.
1491
          // This is guaranteed by NeedsCompaction()
1492
0
          assert(sorted_runs.size() >=
1493
0
                static_cast<size_t>(
1494
0
                    mutable_cf_options.level0_file_num_compaction_trigger));
1495
0
          unsigned int num_files =
1496
0
          static_cast<unsigned int>(sorted_runs.size()) -
1497
0
            mutable_cf_options.level0_file_num_compaction_trigger;
1498
0
          if ((c = PickCompactionUniversalReadAmp(
1499
0
                      cf_name, mutable_cf_options, vstorage, score, UINT_MAX, num_files,
1500
0
                      ioptions_.compaction_options_universal.always_include_size_threshold,
1501
0
                      sorted_runs, log_buffer)) != nullptr) {
1502
0
            LOG_TO_BUFFER(log_buffer,
1503
0
                          "[%s] Universal: compacting for file num -- %u\n",
1504
0
                          cf_name.c_str(), num_files);
1505
0
          }
1506
0
        }
1507
10.4k
      }
1508
14.5k
    }
1509
15.2k
  }
1510
15.2k
  if (c == nullptr) {
1511
10.4k
    return nullptr;
1512
10.4k
  }
1513
1514
4.75k
  if (ioptions_.compaction_options_universal.allow_trivial_move == true) {
1515
604
    c->set_is_trivial_move(IsInputNonOverlapping(c.get()));
1516
604
  }
1517
1518
// Validate that all the chosen L0 files are non-overlapping in time.
1519
4.75k
#ifndef NDEBUG
1520
4.75k
  SequenceNumber prev_smallest_seqno = 0U;
1521
4.75k
  bool is_first = true;
1522
1523
4.75k
  size_t level_index = 0U;
1524
4.75k
  if (c->start_level() == 0) {
1525
14.7k
    for (auto f : *c->inputs(0)) {
1526
14.7k
      DCHECK_LE(f->smallest.seqno, f->largest.seqno);
1527
14.7k
      if (is_first) {
1528
4.75k
        is_first = false;
1529
10.0k
      } else {
1530
10.0k
        DCHECK_GT(prev_smallest_seqno, f->largest.seqno);
1531
10.0k
      }
1532
14.7k
      prev_smallest_seqno = f->smallest.seqno;
1533
14.7k
    }
1534
4.75k
    level_index = 1U;
1535
4.75k
  }
1536
21.2k
  for (; level_index < c->num_input_levels(); 
level_index++16.4k
) {
1537
16.4k
    if (c->num_input_files(level_index) != 0) {
1538
2.32k
      SequenceNumber smallest_seqno = 0U;
1539
2.32k
      SequenceNumber largest_seqno = 0U;
1540
2.32k
      GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
1541
2.32k
                              &largest_seqno);
1542
2.32k
      if (is_first) {
1543
3
        is_first = false;
1544
2.32k
      } else if (prev_smallest_seqno > 0) {
1545
        // A level is considered as the bottommost level if there are
1546
        // no files in higher levels or if files in higher levels do
1547
        // not overlap with the files being compacted. Sequence numbers
1548
        // of files in bottommost level can be set to 0 to help
1549
        // compression. As a result, the following assert may not hold
1550
        // if the prev_smallest_seqno is 0.
1551
2.28k
        assert(prev_smallest_seqno > largest_seqno);
1552
2.28k
      }
1553
0
      prev_smallest_seqno = smallest_seqno;
1554
2.32k
    }
1555
16.4k
  }
1556
4.75k
#endif
1557
  // update statistics
1558
4.75k
  MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
1559
4.75k
              c->inputs(0)->size());
1560
1561
4.75k
  level0_compactions_in_progress_.insert(c.get());
1562
1563
4.75k
  return c;
1564
15.2k
}
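The comments inside DoPickCompaction above describe a fixed priority order between the pickers; the following standalone sketch (hypothetical stub pickers, not the real signatures) captures just that cascade: direct deletion first, then size amplification, then size-ratio read amplification, with the aggressive file-count pass only behind the gflag:

#include <functional>
#include <iostream>
#include <memory>
#include <string>

// Hypothetical stand-ins: each picker either returns a reason string or nothing.
using Picker = std::function<std::unique_ptr<std::string>()>;

std::unique_ptr<std::string> DoPick(const Picker& deletion, const Picker& size_amp,
                                    const Picker& read_amp, const Picker& aggressive,
                                    bool aggressive_flag) {
  if (auto c = deletion()) return c;      // direct deletion of filtered files
  if (auto c = size_amp()) return c;      // size amplification over the earliest file
  if (auto c = read_amp()) return c;      // size-ratio based read amplification
  if (aggressive_flag) {
    if (auto c = aggressive()) return c;  // last resort, ignores relative sizes
  }
  return nullptr;
}

int main() {
  auto none = [] { return std::unique_ptr<std::string>(); };
  auto size_amp = [] { return std::make_unique<std::string>("size amp"); };
  auto picked = DoPick(none, size_amp, none, none, false);
  std::cout << (picked ? *picked : "nothing to do") << "\n";  // prints "size amp"
  return 0;
}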
1565
1566
uint32_t UniversalCompactionPicker::GetPathId(
1567
4.75k
    const ImmutableCFOptions& ioptions, uint64_t file_size) {
1568
  // Two conditions need to be satisfied:
1569
  // (1) the target path needs to be able to hold the file's size
1570
  // (2) Total size left in this and previous paths need to be not
1571
  //     smaller than expected future file size before this new file is
1572
  //     compacted, which is estimated based on size_ratio.
1573
  // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
1574
  // we will make sure the target file, probably with size of 16, will be
1575
  // placed in a path so that eventually when new files are generated and
1576
  // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
1577
  // before the path we chose.
1578
  //
1579
  // TODO(sdong): now the case of multiple column families is not
1580
  // considered in this algorithm. So the target size can be violated in
1581
  // that case. We need to improve it.
1582
4.75k
  uint64_t accumulated_size = 0;
1583
4.75k
  uint64_t future_size = file_size *
1584
4.75k
    (100 - ioptions.compaction_options_universal.size_ratio) / 100;
1585
4.75k
  uint32_t p = 0;
1586
4.75k
  assert(!ioptions.db_paths.empty());
1587
4.83k
  for (; p < ioptions.db_paths.size() - 1; 
p++78
) {
1588
126
    uint64_t target_size = ioptions.db_paths[p].target_size;
1589
126
    if (target_size > file_size &&
1590
126
        
accumulated_size + (target_size - file_size) > future_size78
) {
1591
48
      return p;
1592
48
    }
1593
78
    accumulated_size += target_size;
1594
78
  }
1595
4.70k
  return p;
1596
4.75k
}
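As a worked illustration of the path-selection arithmetic in GetPathId above, the following standalone sketch (hypothetical target sizes and a size_ratio of 0; not the RocksDB API) picks the first path whose target size exceeds the file size and whose accumulated slack covers the expected future file:

#include <cstdint>
#include <iostream>
#include <vector>

// Simplified stand-in for GetPathId: db_path target sizes and the size ratio are assumed values.
uint32_t PickPath(const std::vector<uint64_t>& target_sizes, uint64_t file_size,
                  unsigned size_ratio) {
  uint64_t accumulated_size = 0;
  const uint64_t future_size = file_size * (100 - size_ratio) / 100;
  uint32_t p = 0;
  for (; p + 1 < target_sizes.size(); ++p) {
    const uint64_t target_size = target_sizes[p];
    if (target_size > file_size &&
        accumulated_size + (target_size - file_size) > future_size) {
      return p;
    }
    accumulated_size += target_size;
  }
  return p;  // fall back to the last path
}

int main() {
  // Paths with target sizes 10, 40 and 100 (arbitrary units); a 16-unit output file, size_ratio 0.
  std::cout << PickPath({10, 40, 100}, 16, 0) << "\n";  // prints 1: path 0 is too small
  return 0;
}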
1597
1598
//
1599
// Consider compaction files based on their size differences with
1600
// the next file in time order.
1601
//
1602
std::unique_ptr<Compaction> UniversalCompactionPicker::PickCompactionUniversalReadAmp(
1603
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1604
    VersionStorageInfo* vstorage, double score, unsigned int ratio,
1605
    unsigned int max_number_of_files_to_compact, size_t always_include_size_threshold,
1606
14.5k
    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
1607
14.5k
  unsigned int min_merge_width =
1608
14.5k
    ioptions_.compaction_options_universal.min_merge_width;
1609
14.5k
  unsigned int max_merge_width =
1610
14.5k
    ioptions_.compaction_options_universal.max_merge_width;
1611
1612
14.5k
  const SortedRun* sr = nullptr;
1613
14.5k
  bool done = false;
1614
14.5k
  size_t start_index = 0;
1615
14.5k
  unsigned int candidate_count = 0;
1616
1617
14.5k
  unsigned int max_files_to_compact = std::min(max_merge_width,
1618
14.5k
                                       max_number_of_files_to_compact);
1619
14.5k
  min_merge_width = std::max(min_merge_width, 2U);
1620
1621
  // Caller checks the size before executing this function. This invariant is
1622
  // important because otherwise we may have a possible integer underflow when
1623
  // dealing with unsigned types.
1624
14.5k
  assert(sorted_runs.size() > 0);
1625
1626
  // Considers a candidate file only if it is smaller than the
1627
  // total size accumulated so far.
1628
35.1k
  for (size_t loop = 0; loop < sorted_runs.size(); 
loop++20.6k
) {
1629
24.6k
    candidate_count = 0;
1630
1631
    // Skip files that are already being compacted
1632
54.5k
    for (sr = nullptr; loop < sorted_runs.size(); 
loop++29.8k
) {
1633
51.5k
      sr = &sorted_runs[loop];
1634
1635
51.5k
      if (!sr->being_compacted) {
1636
21.6k
        candidate_count = 1;
1637
21.6k
        break;
1638
21.6k
      }
1639
29.8k
      char file_num_buf[kFormatFileNumberBufSize];
1640
29.8k
      sr->Dump(file_num_buf, sizeof(file_num_buf));
1641
29.8k
      LOG_TO_BUFFER(log_buffer,
1642
29.8k
                  "[%s] Universal: %s"
1643
29.8k
                  "[%d] being compacted, skipping",
1644
29.8k
                  cf_name.c_str(), file_num_buf, loop);
1645
1646
29.8k
      sr = nullptr;
1647
29.8k
    }
1648
1649
    // This file is not being compacted. Consider it as the
1650
    // first candidate to be compacted.
1651
24.6k
    uint64_t candidate_size = sr != nullptr ? 
sr->compensated_file_size21.6k
:
02.98k
;
1652
24.6k
    if (sr != nullptr) {
1653
21.6k
      char file_num_buf[kFormatFileNumberBufSize];
1654
21.6k
      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
1655
21.6k
      RDEBUG(ioptions_.info_log, "[%s] Universal: Possible candidate %s[%d].",
1656
21.6k
             cf_name.c_str(), file_num_buf, loop);
1657
21.6k
    }
1658
1659
    // Check if the succeeding files need compaction.
1660
24.6k
    for (size_t i = loop + 1;
1661
35.0k
         candidate_count < max_files_to_compact && i < sorted_runs.size();
1662
24.6k
         
i++10.3k
) {
1663
23.5k
      const SortedRun* succeeding_sr = &sorted_runs[i];
1664
23.5k
      if (succeeding_sr->being_compacted) {
1665
1.58k
        break;
1666
1.58k
      }
1667
      // Pick files if the total/last candidate file size (increased by the specified ratio) is
1668
      // still at least as large as the next candidate file, or if the next candidate file has size no more
1669
      // than always_include_size_threshold.
1670
      // candidate_size is the total size of files picked so far with the default
1671
      // kCompactionStopStyleTotalSize;
1672
      // with kCompactionStopStyleSimilarSize, it's simply the size of the last picked file.
1673
21.9k
      const bool is_include_by_threshold = succeeding_sr->size <= always_include_size_threshold;
1674
21.9k
      double sz = candidate_size * (100.0 + ratio) / 100.0;
1675
21.9k
      if (sz < static_cast<double>(succeeding_sr->size) && 
!is_include_by_threshold12.0k
) {
1676
11.5k
        break;
1677
11.5k
      }
1678
10.3k
      if (ioptions_.compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) {
1679
18
        if (!is_include_by_threshold) {
1680
          // Similar-size stopping rule: also check the last picked file isn't
1681
          // far larger than the next candidate file.
1682
18
          sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
1683
18
          if (sz < static_cast<double>(candidate_size)) {
1684
            // If the small file we've encountered begins a run of similar-size
1685
            // files, we'll pick them up on a future iteration of the outer
1686
            // loop. If it's some lonely straggler, it'll eventually get picked
1687
            // by the last-resort read amp strategy which disregards size ratios.
1688
12
            break;
1689
12
          }
1690
6
          candidate_size = succeeding_sr->compensated_file_size;
1691
6
        }
1692
10.3k
      } else {  // default kCompactionStopStyleTotalSize
1693
10.3k
        candidate_size += succeeding_sr->compensated_file_size;
1694
10.3k
      }
1695
10.3k
      candidate_count++;
1696
10.3k
    }
1697
1698
    // Found a series of consecutive files that need compaction.
1699
24.6k
    if (candidate_count >= (unsigned int)min_merge_width) {
1700
4.04k
      start_index = loop;
1701
4.04k
      done = true;
1702
4.04k
      break;
1703
20.6k
    } else {
1704
20.6k
#ifndef NDEBUG
1705
20.6k
      for (size_t i = loop;
1706
38.6k
           i < loop + candidate_count && 
i < sorted_runs.size()18.0k
;
i++18.0k
) {
1707
18.0k
        const SortedRun* skipping_sr = &sorted_runs[i];
1708
18.0k
        char file_num_buf[kFormatFileSizeInfoBufSize];
1709
18.0k
        skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
1710
18.0k
        RDEBUG(ioptions_.info_log, "[%s] Universal: Skipping %s", cf_name.c_str(), file_num_buf);
1711
18.0k
      }
1712
20.6k
#endif
1713
20.6k
    }
1714
24.6k
  }
1715
14.5k
  if (!done || 
candidate_count <= 14.04k
) {
1716
10.4k
    return nullptr;
1717
10.4k
  }
1718
4.04k
  size_t first_index_after = start_index + candidate_count;
1719
  // Compression is enabled only if the older files (those not picked for this compaction)
1720
  // have not yet reached the configured compression size percentage of the total data.
1721
4.04k
  bool enable_compression = true;
1722
4.04k
  int ratio_to_compress =
1723
4.04k
      ioptions_.compaction_options_universal.compression_size_percent;
1724
4.04k
  if (ratio_to_compress >= 0) {
1725
90
    uint64_t total_size = 0;
1726
306
    for (auto& sorted_run : sorted_runs) {
1727
306
      total_size += sorted_run.compensated_file_size;
1728
306
    }
1729
1730
90
    uint64_t older_file_size = 0;
1731
138
    for (size_t i = sorted_runs.size() - 1; i >= first_index_after; 
i--48
) {
1732
60
      older_file_size += sorted_runs[i].size;
1733
60
      if (older_file_size * 100L >= total_size * static_cast<int64_t>(ratio_to_compress)) {
1734
12
        enable_compression = false;
1735
12
        break;
1736
12
      }
1737
60
    }
1738
90
  }
1739
1740
4.04k
  uint64_t estimated_total_size = 0;
1741
18.5k
  for (unsigned int i = 0; i < first_index_after; 
i++14.5k
) {
1742
14.5k
    estimated_total_size += sorted_runs[i].size;
1743
14.5k
  }
1744
4.04k
  uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
1745
4.04k
  int start_level = sorted_runs[start_index].level;
1746
4.04k
  int output_level;
1747
4.04k
  if (first_index_after == sorted_runs.size()) {
1748
898
    output_level = vstorage->num_levels() - 1;
1749
3.14k
  } else if (sorted_runs[first_index_after].level == 0) {
1750
1.34k
    output_level = 0;
1751
1.80k
  } else {
1752
1.80k
    output_level = sorted_runs[first_index_after].level - 1;
1753
1.80k
  }
1754
1755
4.04k
  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
1756
22.2k
  for (size_t i = 0; i < inputs.size(); 
++i18.1k
) {
1757
18.1k
    inputs[i].level = start_level + static_cast<int>(i);
1758
18.1k
  }
1759
18.0k
  for (size_t i = start_index; i < first_index_after; 
i++13.9k
) {
1760
13.9k
    auto& picking_sr = sorted_runs[i];
1761
13.9k
    if (picking_sr.level == 0) {
1762
12.0k
      FileMetaData* picking_file = picking_sr.file;
1763
12.0k
      inputs[0].files.push_back(picking_file);
1764
12.0k
    } else {
1765
1.90k
      auto& files = inputs[picking_sr.level - start_level].files;
1766
2.55k
      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
1767
2.55k
        files.push_back(f);
1768
2.55k
      }
1769
1.90k
    }
1770
13.9k
    char file_num_buf[kFormatFileSizeInfoBufSize];
1771
13.9k
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
1772
13.9k
    LOG_TO_BUFFER(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(),
1773
13.9k
                file_num_buf);
1774
13.9k
  }
1775
1776
4.04k
  CompactionReason compaction_reason;
1777
4.04k
  if (max_number_of_files_to_compact == UINT_MAX) {
1778
4.04k
    compaction_reason = CompactionReason::kUniversalSortedRunNum;
1779
4.04k
  } else {
1780
0
    compaction_reason = CompactionReason::kUniversalSizeRatio;
1781
0
  }
1782
4.04k
  return Compaction::Create(
1783
4.04k
      vstorage, mutable_cf_options, std::move(inputs), output_level,
1784
4.04k
      mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
1785
4.04k
      GetCompressionType(ioptions_, start_level, 1, enable_compression),
1786
4.04k
      /* grandparents = */ std::vector<FileMetaData*>(), ioptions_.info_log,
1787
4.04k
      /* is_manual = */ false, score,
1788
4.04k
      /* deletion_compaction = */ false, compaction_reason);
1789
14.5k
}
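To make the size-ratio rule in PickCompactionUniversalReadAmp concrete, here is a standalone sketch (assumed run sizes, the default total-size stop style, and no always-include threshold) of how the candidate window grows: each following run is added while the accumulated candidate size, inflated by size_ratio percent, is at least as large as that run:

#include <cstdint>
#include <iostream>
#include <vector>

// Returns how many runs, starting at `start`, would be grouped into one compaction.
size_t CandidateWindow(const std::vector<uint64_t>& run_sizes, size_t start,
                       unsigned ratio_percent) {
  uint64_t candidate_size = run_sizes[start];
  size_t count = 1;
  for (size_t i = start + 1; i < run_sizes.size(); ++i) {
    const double inflated = candidate_size * (100.0 + ratio_percent) / 100.0;
    if (inflated < static_cast<double>(run_sizes[i])) break;  // next run too large: stop
    candidate_size += run_sizes[i];                           // total-size stop style
    ++count;
  }
  return count;
}

int main() {
  // Newest-to-oldest run sizes; ratio of 1 percent (assumed).
  std::cout << CandidateWindow({8, 8, 9, 100}, 0, 1) << "\n";  // 3: stops before the 100-unit run
  return 0;
}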
1790
1791
// Look for files within the sorted runs that are marked for deletion.
1792
// If any are found, we can run a low-cost compaction that just deletes those files.
1793
std::unique_ptr<Compaction> UniversalCompactionPicker::PickCompactionUniversalDeletion(
1794
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1795
    VersionStorageInfo* vstorage, double score,
1796
38
    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
1797
1798
  // Universal deletion compaction is currently only compatible with Level-0
1799
  // universal compactions.
1800
38
  if (vstorage->num_levels() > 1) {
1801
0
    RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
1802
0
        "[%s] Unexpected number of storage levels during universal deletion "
1803
0
        "(expected 1, was %d).",
1804
0
        cf_name.c_str(), vstorage->num_levels());
1805
0
    return nullptr;
1806
0
  }
1807
38
  std::vector<CompactionInputFiles> inputs(1);
1808
38
  auto& input_files = inputs[0];
1809
1810
  // Check each file to see whether it is marked for deletion.
1811
213
  for (size_t loop = 0; loop < sorted_runs.size(); 
loop++175
) {
1812
175
    const auto sr = &sorted_runs[loop];
1813
1814
175
    if (!sr->being_compacted && 
sr->delete_after_compaction()55
) {
1815
23
      FileMetaData* f = sr->file;
1816
23
      input_files.files.push_back(f);
1817
1818
23
      char file_num_buf[kFormatFileSizeInfoBufSize];
1819
23
      sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
1820
23
      LOG_TO_BUFFER(log_buffer, "[%s] Universal: file deletion picking %s",
1821
23
                  cf_name.c_str(), file_num_buf);
1822
152
    } else {
1823
152
      RDEBUG(ioptions_.info_log, "[%s] Universal: skipping %s[%d] compacted %s\n",
1824
152
          cf_name.c_str(), sr->FileNumAsString().c_str(), loop,
1825
152
          sr->delete_after_compaction() ?
1826
152
              "is not marked for deletion." : "is already being compacted.");
1827
152
    }
1828
175
  }
1829
  // If no files are marked for deletion, return nullptr.
1830
38
  if (input_files.size() == 0) {
1831
36
    return nullptr; // no candidate files
1832
36
  }
1833
1834
2
  return Compaction::Create(
1835
2
      vstorage,
1836
2
      mutable_cf_options,
1837
2
      std::move(inputs),
1838
2
      /* output level = */ 0,
1839
2
      mutable_cf_options.MaxFileSizeForLevel(0),
1840
      /* max_grandparent_overlap_bytes = */ LLONG_MAX,
1841
2
      /* path_id = */ 0,
1842
2
      GetCompressionType(ioptions_, 0, 1),
1843
2
      /* grandparents = */ {},
1844
2
      ioptions_.info_log,
1845
2
      /* is manual = */ false,
1846
2
      score,
1847
2
      /* deletion_compaction not currently used, but could be in the future */ false,
1848
2
      CompactionReason::kUniversalDirectDeletion);
1849
38
}
1850
1851
// Look at overall size amplification. If size amplification
1852
// exceeds the configured value, then do a compaction
1853
// of the candidate files all the way up to the earliest
1854
// base file (overrides configured values of file-size ratios,
1855
// min_merge_width and max_merge_width).
1856
//
1857
std::unique_ptr<Compaction> UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
1858
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1859
    VersionStorageInfo* vstorage, double score,
1860
15.2k
    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
1861
  // percentage flexibility while reducing size amplification
1862
15.2k
  uint64_t ratio = ioptions_.compaction_options_universal.
1863
15.2k
                     max_size_amplification_percent;
1864
1865
15.2k
  unsigned int candidate_count = 0;
1866
15.2k
  uint64_t candidate_size = 0;
1867
15.2k
  size_t start_index = 0;
1868
15.2k
  const SortedRun* sr = nullptr;
1869
1870
  // Skip files that are already being compacted
1871
39.6k
  for (size_t loop = 0; loop < sorted_runs.size() - 1; 
loop++24.4k
) {
1872
33.3k
    sr = &sorted_runs[loop];
1873
33.3k
    if (!sr->being_compacted) {
1874
8.98k
      start_index = loop;         // Consider this as the first candidate.
1875
8.98k
      break;
1876
8.98k
    }
1877
24.4k
    RDEBUG(ioptions_.info_log, "[%s] Universal: Possible candidate %s[%d] is already being "
1878
24.4k
        "compacted. Cannot be a candidate to reduce size amp.\n",
1879
24.4k
        cf_name.c_str(), sr->FileNumAsString().c_str(), loop);
1880
24.4k
    sr = nullptr;
1881
24.4k
  }
1882
1883
15.2k
  if (sr == nullptr) {
1884
6.23k
    return nullptr;             // no candidate files
1885
6.23k
  }
1886
8.98k
  {
1887
8.98k
    char file_num_buf[kFormatFileNumberBufSize];
1888
8.98k
    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
1889
8.98k
    RDEBUG(ioptions_.info_log,
1890
8.98k
           "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
1891
8.98k
           cf_name.c_str(), file_num_buf, start_index,
1892
8.98k
           " to reduce size amp.\n");
1893
8.98k
  }
1894
1895
  // keep adding up all the remaining files
1896
34.7k
  for (size_t loop = start_index; loop < sorted_runs.size() - 1; 
loop++25.7k
) {
1897
27.0k
    sr = &sorted_runs[loop];
1898
27.0k
    if (sr->being_compacted) {
1899
1.33k
      RDEBUG(ioptions_.info_log, "[%s] Universal: Possible candidate %s[%d] is already being "
1900
1.33k
          "compacted. No size amp reduction possible.\n",
1901
1.33k
          cf_name.c_str(), sr->FileNumAsString().c_str(), loop);
1902
1.33k
      return nullptr;
1903
1.33k
    }
1904
25.7k
    candidate_size += sr->compensated_file_size;
1905
25.7k
    candidate_count++;
1906
25.7k
  }
1907
7.65k
  if (candidate_count == 0) {
1908
0
    return nullptr;
1909
0
  }
1910
1911
  // Check that the earliest file is not already being compacted.
1912
7.65k
  sr = &sorted_runs.back();
1913
7.65k
  if (sr->being_compacted) {
1914
1
    RDEBUG(ioptions_.info_log, "[%s] Universal: Possible candidate %s[%d] is already being "
1915
1
          "compacted. No size amp reduction possible.\n",
1916
1
          cf_name.c_str(), sr->FileNumAsString().c_str(), sorted_runs.size() - 1);
1917
1
    return nullptr;
1918
1
  }
1919
  // size of earliest file
1920
7.65k
  uint64_t earliest_file_size = sr->size;
1921
1922
  // size amplification = percentage of additional size
1923
7.65k
  if (candidate_size * 100 < ratio * earliest_file_size) {
1924
6.93k
    RDEBUG(ioptions_.info_log,
1925
6.93k
           "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
1926
6.93k
           " earliest-file-size %" PRIu64,
1927
6.93k
           cf_name.c_str(), candidate_size, earliest_file_size);
1928
6.93k
    return nullptr;
1929
6.93k
  } else {
1930
715
    RDEBUG(ioptions_.info_log,
1931
715
           "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
1932
715
           " earliest-file-size %" PRIu64,
1933
715
           cf_name.c_str(), candidate_size, earliest_file_size);
1934
715
  }
1935
715
  assert(start_index < sorted_runs.size() - 1);
1936
1937
  // Estimate total file size
1938
0
  uint64_t estimated_total_size = 0;
1939
3.78k
  for (size_t loop = start_index; loop < sorted_runs.size(); 
loop++3.07k
) {
1940
3.07k
    estimated_total_size += sorted_runs[loop].size;
1941
3.07k
  }
1942
715
  uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
1943
715
  int start_level = sorted_runs[start_index].level;
1944
1945
715
  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
1946
3.78k
  for (size_t i = 0; i < inputs.size(); 
++i3.07k
) {
1947
3.07k
    inputs[i].level = start_level + static_cast<int>(i);
1948
3.07k
  }
1949
  // We always compact all the files, so always compress.
1950
3.78k
  for (size_t loop = start_index; loop < sorted_runs.size(); 
loop++3.07k
) {
1951
3.07k
    auto& picking_sr = sorted_runs[loop];
1952
3.07k
    if (picking_sr.level == 0) {
1953
2.65k
      FileMetaData* f = picking_sr.file;
1954
2.65k
      inputs[0].files.push_back(f);
1955
2.65k
    } else {
1956
422
      auto& files = inputs[picking_sr.level - start_level].files;
1957
2.86k
      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
1958
2.86k
        files.push_back(f);
1959
2.86k
      }
1960
422
    }
1961
3.07k
    char file_num_buf[kFormatFileSizeInfoBufSize];
1962
3.07k
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
1963
3.07k
    LOG_TO_BUFFER(log_buffer, "[%s] Universal: size amp picking %s",
1964
3.07k
                cf_name.c_str(), file_num_buf);
1965
3.07k
  }
1966
1967
715
  return Compaction::Create(
1968
715
      vstorage, mutable_cf_options, std::move(inputs), vstorage->num_levels() - 1,
1969
715
      mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
1970
      /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
1971
715
      GetCompressionType(ioptions_, vstorage->num_levels() - 1, 1),
1972
715
      /* grandparents */ std::vector<FileMetaData*>(), ioptions_.info_log, /* is manual = */ false,
1973
715
      score, false /* deletion_compaction */, CompactionReason::kUniversalSizeAmplification);
1974
7.65k
}
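The size-amplification trigger in PickCompactionUniversalSizeAmp reduces to a single comparison; a minimal sketch with assumed numbers (max_size_amplification_percent = 200, newer runs totalling 50 units, earliest file of 20 units) shows when a full compaction would be scheduled:

#include <cstdint>
#include <iostream>

// True if the newer data exceeds the allowed size amplification over the earliest file.
bool NeedsSizeAmpCompaction(uint64_t newer_files_total_size, uint64_t earliest_file_size,
                            uint64_t max_size_amplification_percent) {
  return newer_files_total_size * 100 >=
         max_size_amplification_percent * earliest_file_size;
}

int main() {
  std::cout << NeedsSizeAmpCompaction(50, 20, 200) << "\n";  // 1: 5000 >= 4000
  std::cout << NeedsSizeAmpCompaction(30, 20, 200) << "\n";  // 0: 3000 <  4000
  return 0;
}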
1975
1976
bool FIFOCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage)
1977
549
    const {
1978
549
  const int kLevel0 = 0;
1979
549
  return vstorage->CompactionScore(kLevel0) >= 1;
1980
549
}
1981
1982
std::unique_ptr<Compaction> FIFOCompactionPicker::PickCompaction(
1983
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1984
27
    VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
1985
27
  assert(vstorage->num_levels() == 1);
1986
0
  const int kLevel0 = 0;
1987
27
  const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
1988
27
  uint64_t total_size = 0;
1989
94
  for (const auto& file : level_files) {
1990
94
    total_size += file->fd.total_file_size;
1991
94
  }
1992
1993
27
  if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size ||
1994
27
      
level_files.size() == 020
) {
1995
    // total size not exceeded
1996
7
    RDEBUG(ioptions_.info_log,
1997
7
           "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
1998
7
           ", max size %" PRIu64 "\n",
1999
7
           cf_name.c_str(), total_size,
2000
7
           ioptions_.compaction_options_fifo.max_table_files_size);
2001
7
    return nullptr;
2002
7
  }
2003
2004
20
  if (!level0_compactions_in_progress_.empty()) {
2005
8
    RDEBUG(ioptions_.info_log,
2006
8
           "[%s] FIFO compaction: Already executing compaction. No need "
2007
8
           "to run parallel compactions since compactions are very fast",
2008
8
           cf_name.c_str());
2009
8
    return nullptr;
2010
8
  }
2011
2012
12
  std::vector<CompactionInputFiles> inputs;
2013
12
  inputs.emplace_back();
2014
12
  inputs[0].level = 0;
2015
  // delete old files (FIFO)
2016
16
  for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); 
++ritr4
) {
2017
16
    auto f = *ritr;
2018
16
    total_size -= f->compensated_file_size;
2019
16
    inputs[0].files.push_back(f);
2020
16
    char tmp_fsize[16];
2021
16
    AppendHumanBytes(f->fd.GetTotalFileSize(), tmp_fsize, sizeof(tmp_fsize));
2022
16
    LOG_TO_BUFFER(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
2023
16
                            " with size %s for deletion",
2024
16
                cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
2025
16
    if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) {
2026
12
      break;
2027
12
    }
2028
16
  }
2029
12
  auto c = Compaction::Create(
2030
12
      vstorage, mutable_cf_options, std::move(inputs), 0 /* output_level */,
2031
12
      0 /* target_file_size */, 0 /* max_grandparent_overlap_bytes */, 0 /* output_path_id */,
2032
12
      kNoCompression, std::vector<FileMetaData*>(), ioptions_.info_log, /* is manual */ false,
2033
12
      vstorage->CompactionScore(0),
2034
12
      /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
2035
12
  if (c) {
2036
12
    level0_compactions_in_progress_.insert(c.get());
2037
12
  }
2038
12
  return c;
2039
20
}
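A standalone sketch of the FIFO pick above with assumed sizes: the oldest files are dropped one by one until the remaining total fits within max_table_files_size:

#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>

int main() {
  // File sizes ordered newest to oldest (so deletion walks from the back); assumed values.
  std::vector<uint64_t> file_sizes = {10, 10, 30, 40};
  const uint64_t max_table_files_size = 50;

  uint64_t total_size = std::accumulate(file_sizes.begin(), file_sizes.end(), uint64_t{0});
  std::vector<uint64_t> picked_for_deletion;
  for (auto it = file_sizes.rbegin(); it != file_sizes.rend(); ++it) {
    if (total_size <= max_table_files_size) break;  // already within budget
    total_size -= *it;
    picked_for_deletion.push_back(*it);             // oldest file goes first
  }
  std::cout << "deleting " << picked_for_deletion.size()
            << " files, remaining size " << total_size << "\n";  // deleting 1 files, remaining size 50
  return 0;
}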
2040
2041
std::unique_ptr<Compaction> FIFOCompactionPicker::CompactRange(
2042
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
2043
    VersionStorageInfo* vstorage, int input_level, int output_level,
2044
    uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
2045
11
    InternalKey** compaction_end, bool* manual_conflict) {
2046
11
  assert(input_level == 0);
2047
0
  assert(output_level == 0);
2048
0
  *compaction_end = nullptr;
2049
11
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
2050
11
  auto c = PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer);
2051
11
  log_buffer.FlushBufferToLog();
2052
11
  return c;
2053
11
}
2054
2055
#endif  // !ROCKSDB_LITE
2056
2057
}  // namespace rocksdb