YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/compaction_job.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/compaction_job.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
#include <stdint.h>
32
33
#include <algorithm>
34
#include <cmath>
35
#include <functional>
36
#include <list>
37
#include <memory>
38
#include <set>
39
#include <string>
40
#include <thread>
41
#include <utility>
42
#include <vector>
43
44
#include "yb/rocksdb/db/builder.h"
45
#include "yb/rocksdb/db/dbformat.h"
46
#include "yb/rocksdb/db/event_helpers.h"
47
#include "yb/rocksdb/db/filename.h"
48
#include "yb/rocksdb/db/file_numbers.h"
49
#include "yb/rocksdb/db/log_reader.h"
50
#include "yb/rocksdb/db/log_writer.h"
51
#include "yb/rocksdb/db/memtable.h"
52
#include "yb/rocksdb/db/memtable_list.h"
53
#include "yb/rocksdb/db/merge_helper.h"
54
#include "yb/rocksdb/db/version_set.h"
55
#include "yb/rocksdb/port/likely.h"
56
#include "yb/rocksdb/port/port.h"
57
#include "yb/rocksdb/db.h"
58
#include "yb/rocksdb/env.h"
59
#include "yb/rocksdb/statistics.h"
60
#include "yb/rocksdb/status.h"
61
#include "yb/rocksdb/table.h"
62
#include "yb/rocksdb/table/internal_iterator.h"
63
#include "yb/rocksdb/table/table_builder.h"
64
#include "yb/rocksdb/util/coding.h"
65
#include "yb/rocksdb/util/file_reader_writer.h"
66
#include "yb/rocksdb/util/log_buffer.h"
67
#include "yb/rocksdb/util/logging.h"
68
#include "yb/rocksdb/util/sst_file_manager_impl.h"
69
#include "yb/rocksdb/util/mutexlock.h"
70
#include "yb/rocksdb/perf_level.h"
71
#include "yb/rocksdb/util/stop_watch.h"
72
#include "yb/util/stats/perf_step_timer.h"
73
#include "yb/rocksdb/util/sync_point.h"
74
75
#include "yb/util/result.h"
76
#include "yb/util/stats/iostats_context_imp.h"
77
#include "yb/util/string_util.h"
78
79
namespace rocksdb {
80
81
// Maintains state for each sub-compaction
82
struct CompactionJob::SubcompactionState {
83
  Compaction* compaction;
84
  std::unique_ptr<CompactionIterator> c_iter;
85
86
  // The boundaries of the key-range this compaction is interested in. No two
87
  // subcompactions may have overlapping key-ranges.
88
  // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
89
  Slice *start, *end;
90
91
  // The return status of this subcompaction
92
  Status status;
93
94
  // Files produced by this subcompaction
95
  struct Output {
96
    FileMetaData meta;
97
    bool finished;
98
    std::shared_ptr<const TableProperties> table_properties;
99
  };
100
101
  // State kept for output being generated
102
  std::vector<Output> outputs;
103
  std::unique_ptr<WritableFileWriter> base_outfile;
104
  std::unique_ptr<WritableFileWriter> data_outfile;
105
  std::unique_ptr<TableBuilder> builder;
106
66.3M
  Output* current_output() {
107
66.3M
    if (outputs.empty()) {
108
      // This subcompaction's outptut could be empty if compaction was aborted
109
      // before this subcompaction had a chance to generate any output files.
110
      // When subcompactions are executed sequentially this is more likely and
111
      // will be particulalry likely for the later subcompactions to be empty.
112
      // Once they are run in parallel however it should be much rarer.
113
0
      return nullptr;
114
66.3M
    } else {
115
66.3M
      return &outputs.back();
116
66.3M
    }
117
66.3M
  }
118
119
  // State during the subcompaction
120
  uint64_t total_bytes;
121
  uint64_t num_input_records;
122
  uint64_t num_output_records;
123
  CompactionJobStats compaction_job_stats;
124
  uint64_t approx_size;
125
126
  SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
127
                     uint64_t size = 0)
128
      : compaction(c),
129
        start(_start),
130
        end(_end),
131
        base_outfile(nullptr),
132
        data_outfile(nullptr),
133
        builder(nullptr),
134
        total_bytes(0),
135
        num_input_records(0),
136
        num_output_records(0),
137
10.7k
        approx_size(size) {
138
10.7k
    assert(compaction != nullptr);
139
10.7k
  }
140
141
9
  SubcompactionState(SubcompactionState&& o) { *this = std::move(o); }
142
143
9
  SubcompactionState& operator=(SubcompactionState&& o) {
144
9
    compaction = std::move(o.compaction);
145
9
    start = std::move(o.start);
146
9
    end = std::move(o.end);
147
9
    status = std::move(o.status);
148
9
    outputs = std::move(o.outputs);
149
9
    base_outfile = std::move(o.base_outfile);
150
9
    data_outfile = std::move(o.data_outfile);
151
9
    builder = std::move(o.builder);
152
9
    total_bytes = std::move(o.total_bytes);
153
9
    num_input_records = std::move(o.num_input_records);
154
9
    num_output_records = std::move(o.num_output_records);
155
9
    compaction_job_stats = std::move(o.compaction_job_stats);
156
9
    approx_size = std::move(o.approx_size);
157
9
    return *this;
158
9
  }
159
160
  // Because member unique_ptrs do not have these.
161
  SubcompactionState(const SubcompactionState&) = delete;
162
163
  SubcompactionState& operator=(const SubcompactionState&) = delete;
164
};
165
166
// Maintains state for the entire compaction
167
struct CompactionJob::CompactionState {
168
  Compaction* const compaction;
169
170
  // REQUIRED: subcompaction states are stored in order of increasing
171
  // key-range
172
  std::vector<CompactionJob::SubcompactionState> sub_compact_states;
173
  Status status;
174
175
  uint64_t total_bytes;
176
  uint64_t num_input_records;
177
  uint64_t num_output_records;
178
179
  explicit CompactionState(Compaction* c)
180
      : compaction(c),
181
        total_bytes(0),
182
        num_input_records(0),
183
10.6k
        num_output_records(0) {}
184
185
21.3k
  size_t NumOutputFiles() {
186
21.3k
    size_t total = 0;
187
21.3k
    for (auto& s : sub_compact_states) {
188
21.3k
      total += s.outputs.size();
189
21.3k
    }
190
21.3k
    return total;
191
21.3k
  }
192
193
10.4k
  Slice SmallestUserKey() {
194
10.4k
    for (const auto& sub_compact_state : sub_compact_states) {
195
10.4k
      if (!sub_compact_state.outputs.empty() &&
196
10.4k
          sub_compact_state.outputs[0].finished) {
197
10.4k
        return sub_compact_state.outputs[0].meta.smallest.key.user_key();
198
10.4k
      }
199
10.4k
    }
200
    // If there is no finished output, return an empty slice.
201
15
    return Slice();
202
10.4k
  }
203
204
10.4k
  Slice LargestUserKey() {
205
10.5k
    for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
206
10.4k
         ++it) {
207
10.4k
      if (!it->outputs.empty() && it->current_output()->finished) {
208
10.4k
        assert(it->current_output() != nullptr);
209
10.4k
        return it->current_output()->meta.largest.key.user_key();
210
10.4k
      }
211
10.4k
    }
212
    // If there is no finished output, return an empty slice.
213
18
    return Slice();
214
10.4k
  }
215
};
216
217
10.6k
void CompactionJob::AggregateStatistics() {
218
10.6k
  for (SubcompactionState& sc : compact_->sub_compact_states) {
219
10.6k
    compact_->total_bytes += sc.total_bytes;
220
10.6k
    compact_->num_input_records += sc.num_input_records;
221
10.6k
    compact_->num_output_records += sc.num_output_records;
222
10.6k
  }
223
10.6k
  if (compaction_job_stats_) {
224
10.6k
    for (SubcompactionState& sc : compact_->sub_compact_states) {
225
10.6k
      compaction_job_stats_->Add(sc.compaction_job_stats);
226
10.6k
    }
227
10.6k
  }
228
10.6k
}
229
230
// Builds a job for the given compaction.
//
// Allocates the job-wide CompactionState, stores the supplied collaborators
// (taking the snapshot list and table cache by move), and reports the start
// of the compaction through ReportStartedCompaction(). log_buffer must not be
// null (asserted).
CompactionJob::CompactionJob(
    int job_id, Compaction* compaction, const DBOptions& db_options,
    const EnvOptions& env_options, VersionSet* versions,
    std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
    Directory* db_directory, Directory* output_directory, Statistics* stats,
    InstrumentedMutex* db_mutex, Status* db_bg_error,
    std::vector<SequenceNumber> existing_snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    FileNumbersProvider* file_numbers_provider,
    std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
    bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
    CompactionJobStats* compaction_job_stats)
    : job_id_(job_id),
      compact_(new CompactionState(compaction)),
      compaction_job_stats_(compaction_job_stats),
      compaction_stats_(1),
      dbname_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      env_(db_options.env),
      versions_(versions),
      shutting_down_(shutting_down),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_directory_(output_directory),
      stats_(stats),
      db_mutex_(db_mutex),
      db_bg_error_(db_bg_error),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      file_numbers_provider_(file_numbers_provider),
      table_cache_(std::move(table_cache)),
      event_logger_(event_logger),
      paranoid_file_checks_(paranoid_file_checks),
      measure_io_stats_(measure_io_stats) {
  assert(log_buffer_ != nullptr);
  ReportStartedCompaction(compaction);
}
269
270
10.6k
// The compaction state is expected to have been released (compact_ reset to
// null) before the job is destroyed; this is only checked in debug builds.
CompactionJob::~CompactionJob() {
  assert(!compact_);
}
273
274
void CompactionJob::ReportStartedCompaction(
275
10.6k
    Compaction* compaction) {
276
10.6k
  const auto* cfd = compact_->compaction->column_family_data();
277
278
  // In the current design, a CompactionJob is always created
279
  // for non-trivial compaction.
280
10.6k
  assert(compaction->IsTrivialMove() == false ||
281
10.6k
         compaction->is_manual_compaction() == true);
282
283
10.6k
  IOSTATS_RESET(bytes_written);
284
10.6k
  IOSTATS_RESET(bytes_read);
285
286
10.6k
  if (compaction_job_stats_) {
287
10.6k
    compaction_job_stats_->is_manual_compaction =
288
10.6k
        compaction->is_manual_compaction();
289
10.6k
  }
290
10.6k
}
291
292
10.6k
void CompactionJob::Prepare() {
293
  // Generate file_levels_ for compaction berfore making Iterator
294
10.6k
  auto* c = compact_->compaction;
295
10.6k
  assert(c->column_family_data() != nullptr);
296
10.6k
  assert(c->column_family_data()->current()->storage_info()
297
10.6k
      ->NumLevelFiles(compact_->compaction->level()) > 0);
298
299
  // Is this compaction producing files at the bottommost level?
300
10.6k
  bottommost_level_ = c->bottommost_level();
301
302
10.6k
  if (c->ShouldFormSubcompactions()) {
303
1.05k
    const uint64_t start_micros = env_->NowMicros();
304
1.05k
    GenSubcompactionBoundaries();
305
1.05k
    assert(sizes_.size() == boundaries_.size() + 1);
306
307
2.11k
    for (size_t i = 0; i <= boundaries_.size(); i++) {
308
1.05k
      Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
309
1.05k
      Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
310
1.05k
      compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
311
1.05k
    }
312
9.64k
  } else {
313
9.64k
    compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
314
9.64k
  }
315
10.6k
}
316
317
struct RangeWithSize {
318
  Range range;
319
  uint64_t size;
320
321
  RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
322
132
      : range(a, b), size(s) {}
323
};
324
325
// Generates a histogram representing potential divisions of key ranges from
326
// the input. It adds the starting and/or ending keys of certain input files
327
// to the working set and then finds the approximate size of data in between
328
// each consecutive pair of slices. Then it divides these ranges into
329
// consecutive groups such that each group has a similar size.
330
1.05k
void CompactionJob::GenSubcompactionBoundaries() {
331
1.05k
  auto* c = compact_->compaction;
332
1.05k
  auto* cfd = c->column_family_data();
333
1.05k
  const Comparator* cfd_comparator = cfd->user_comparator();
334
1.05k
  std::vector<Slice> bounds;
335
1.05k
  int start_lvl = c->start_level();
336
1.05k
  int out_lvl = c->output_level();
337
338
  // Add the starting and/or ending key of certain input files as a potential
339
  // boundary
340
8.07k
  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
341
7.02k
    int lvl = c->level(lvl_idx);
342
7.02k
    if (lvl >= start_lvl && lvl <= out_lvl) {
343
6.28k
      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
344
6.28k
      size_t num_files = flevel->num_files;
345
346
6.28k
      if (num_files == 0) {
347
4.20k
        continue;
348
4.20k
      }
349
350
2.07k
      if (lvl == 0) {
351
        // For level 0 add the starting and ending key of each file since the
352
        // files may have greatly differing key ranges (not range-partitioned)
353
3.43k
        for (size_t i = 0; i < num_files; i++) {
354
2.38k
          bounds.emplace_back(flevel->files[i].smallest.key);
355
2.38k
          bounds.emplace_back(flevel->files[i].largest.key);
356
2.38k
        }
357
1.03k
      } else {
358
        // For all other levels add the smallest/largest key in the level to
359
        // encompass the range covered by that level
360
1.03k
        bounds.emplace_back(flevel->files[0].smallest.key);
361
1.03k
        bounds.emplace_back(flevel->files[num_files - 1].largest.key);
362
1.03k
        if (lvl == out_lvl) {
363
          // For the last level include the starting keys of all files since
364
          // the last level is the largest and probably has the widest key
365
          // range. Since it's range partitioned, the ending key of one file
366
          // and the starting key of the next are very close (or identical).
367
1.00k
          for (size_t i = 1; i < num_files; i++) {
368
21
            bounds.emplace_back(flevel->files[i].smallest.key);
369
21
          }
370
987
        }
371
1.03k
      }
372
2.07k
    }
373
7.02k
  }
374
375
1.05k
  std::sort(bounds.begin(), bounds.end(),
376
6.23k
    [cfd_comparator] (const Slice& a, const Slice& b) -> bool {
377
6.23k
      return cfd_comparator->Compare(ExtractUserKey(a), ExtractUserKey(b)) < 0;
378
6.23k
    });
379
  // Remove duplicated entries from bounds
380
1.05k
  bounds.erase(std::unique(bounds.begin(), bounds.end(),
381
5.81k
    [cfd_comparator] (const Slice& a, const Slice& b) -> bool {
382
5.81k
      return cfd_comparator->Compare(ExtractUserKey(a), ExtractUserKey(b)) == 0;
383
5.81k
    }), bounds.end());
384
385
  // Combine consecutive pairs of boundaries into ranges with an approximate
386
  // size of data covered by keys in that range
387
1.05k
  uint64_t sum = 0;
388
1.05k
  std::vector<RangeWithSize> ranges;
389
1.05k
  auto* v = cfd->current();
390
1.18k
  for (auto it = bounds.begin();;) {
391
1.18k
    const Slice a = *it;
392
1.18k
    it++;
393
394
1.18k
    if (it == bounds.end()) {
395
1.05k
      break;
396
1.05k
    }
397
398
132
    const Slice b = *it;
399
132
    uint64_t size = versions_->ApproximateSize(v, a, b, start_lvl, out_lvl + 1);
400
132
    ranges.emplace_back(a, b, size);
401
132
    sum += size;
402
132
  }
403
404
  // Group the ranges into subcompactions
405
1.05k
  const double min_file_fill_percent = 4.0 / 5;
406
1.05k
  uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
407
1.05k
      sum / min_file_fill_percent /
408
1.05k
      cfd->GetCurrentMutableCFOptions()->MaxFileSizeForLevel(out_lvl)));
409
1.05k
  uint64_t subcompactions =
410
1.05k
      std::min({static_cast<uint64_t>(ranges.size()),
411
1.05k
                static_cast<uint64_t>(db_options_.max_subcompactions),
412
1.05k
                max_output_files});
413
414
53
  double mean = subcompactions != 0 ? sum * 1.0 / subcompactions
415
998
                                    : std::numeric_limits<double>::max();
416
417
1.05k
  if (subcompactions > 1) {
418
    // Greedily add ranges to the subcompaction until the sum of the ranges'
419
    // sizes becomes >= the expected mean size of a subcompaction
420
3
    sum = 0;
421
33
    for (size_t i = 0; i < ranges.size() - 1; i++) {
422
30
      sum += ranges[i].size;
423
30
      if (subcompactions == 1) {
424
        // If there's only one left to schedule then it goes to the end so no
425
        // need to put an end boundary
426
0
        continue;
427
0
      }
428
30
      if (sum >= mean) {
429
8
        boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
430
8
        sizes_.emplace_back(sum);
431
8
        subcompactions--;
432
8
        sum = 0;
433
8
      }
434
30
    }
435
3
    sizes_.emplace_back(sum + ranges.back().size);
436
1.04k
  } else {
437
    // Only one range so its size is the total sum of sizes computed above
438
1.04k
    sizes_.emplace_back(sum);
439
1.04k
  }
440
1.05k
}
441
442
10.6k
// Executes every subcompaction prepared for this job and merges the results.
//
// Subcompaction 0 runs on the calling thread; each remaining subcompaction
// gets its own std::thread, all of which are joined before any result is
// inspected. Returns the holder of the file numbers reserved for the outputs,
// or the error produced by syncing the output directory. The first
// subcompaction failure (in storage order) is stored in compact_->status
// rather than returned.
Result<FileNumbersHolder> CompactionJob::Run() {
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  LogCompaction();

  auto& sub_states = compact_->sub_compact_states;
  const size_t num_threads = sub_states.size();
  assert(num_threads > 0);
  const uint64_t start_micros = env_->NowMicros();

  // Launch a thread for each of subcompactions 1...num_threads-1
  std::vector<std::thread> workers;
  workers.reserve(num_threads - 1);
  FileNumbersHolder file_numbers_holder(file_numbers_provider_->CreateHolder());
  file_numbers_holder.Reserve(num_threads);
  for (size_t idx = 1; idx < num_threads; ++idx) {
    workers.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this, &file_numbers_holder,
                         &sub_states[idx]);
  }

  // Always schedule the first subcompaction (whether or not there are also
  // others) in the current thread to be efficient with resources
  ProcessKeyValueCompaction(&file_numbers_holder, &sub_states[0]);

  // Wait for all other threads (if there are any) to finish execution
  for (auto& worker : workers) {
    worker.join();
  }

  if (output_directory_ && !db_options_.disableDataSync) {
    RETURN_NOT_OK(output_directory_->Fsync());
  }

  compaction_stats_.micros = env_->NowMicros() - start_micros;
  MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);

  // Check if any thread encountered an error during execution; the first
  // failure in storage order wins.
  Status status;
  const auto first_failed = std::find_if(
      sub_states.begin(), sub_states.end(),
      [](const auto& sub) { return !sub.status.ok(); });
  if (first_failed != sub_states.end()) {
    status = first_failed->status;
  }

  // Collect the table properties of every output file, keyed by file name.
  TablePropertiesCollection tp;
  for (const auto& sub : sub_states) {
    for (const auto& output : sub.outputs) {
      auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(),
                              output.meta.fd.GetPathId());
      tp[fn] = output.table_properties;
    }
  }
  compact_->compaction->SetOutputTableProperties(std::move(tp));

  // Finish up all book-keeping to unify the subcompaction results
  AggregateStatistics();
  UpdateCompactionStats();
  RecordCompactionIOStats();
  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");

  compact_->status = status;
  return file_numbers_holder;
}
506
507
10.6k
// Publishes the outcome of Run(): adds the compaction stats to the column
// family's internal stats, installs the compaction results if the job
// succeeded, logs a human-readable summary and a structured
// "compaction_finished" event, and finally calls CleanupCompaction().
//
// The DB mutex must be held by the caller (asserted). Returns the job status,
// replaced by the result of InstallCompactionResults() when the job itself
// was successful.
Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
  db_mutex_->AssertHeld();
  Status status = compact_->status;
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  cfd->internal_stats()->AddCompactionStats(
      compact_->compaction->output_level(), compaction_stats_);

  if (status.ok()) {
    status = InstallCompactionResults(mutable_cf_options);
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  auto vstorage = cfd->current()->storage_info();
  const auto& stats = compaction_stats_;
  // Both divisors are clamped to at least 1 so that a zero elapsed time or
  // zero bytes read cannot turn the reported rates/ratios into inf or NaN.
  const auto micros = static_cast<double>(std::max<uint64_t>(stats.micros, 1));
  const auto bytes_read_non_output_levels = static_cast<double>(
      std::max<uint64_t>(stats.bytes_read_non_output_levels, 1));
  LOG_TO_BUFFER(
      log_buffer_,
      "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
      "files in(%d, %d) out(%d) "
      "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
      "write-amplify(%.1f) %s, records in: %llu, records dropped: %llu\n",
      cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
      // Bytes per microsecond is reported under the "MB/sec" label.
      (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
          micros,
      stats.bytes_written / micros,
      compact_->compaction->output_level(),
      stats.num_input_files_in_non_output_levels,
      stats.num_input_files_in_output_level,
      stats.num_output_files,
      stats.bytes_read_non_output_levels / 1048576.0,
      stats.bytes_read_output_level / 1048576.0,
      stats.bytes_written / 1048576.0,
      (stats.bytes_written + stats.bytes_read_output_level + stats.bytes_read_non_output_levels) /
          bytes_read_non_output_levels,
      stats.bytes_written / bytes_read_non_output_levels,
      status.ToString().c_str(),
      // Explicit casts keep the arguments in sync with the %llu specifiers.
      static_cast<unsigned long long>(stats.num_input_records),
      static_cast<unsigned long long>(stats.num_dropped_records));

  UpdateCompactionJobStats(stats);

  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_id_
         << "event" << "compaction_finished"
         << "compaction_time_micros" << compaction_stats_.micros
         << "output_level" << compact_->compaction->output_level()
         << "num_output_files" << compact_->NumOutputFiles()
         << "total_output_size" << compact_->total_bytes
         << "num_input_records" << compact_->num_input_records
         << "num_output_records" << compact_->num_output_records
         << "num_subcompactions" << compact_->sub_compact_states.size();

  if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
    stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
    stream << "file_range_sync_nanos"
           << compaction_job_stats_->file_range_sync_nanos;
    stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
    stream << "file_prepare_write_nanos"
           << compaction_job_stats_->file_prepare_write_nanos;
  }

  // Number of files at every level after the compaction.
  stream << "lsm_state";
  stream.StartArray();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  CleanupCompaction();
  return status;
}
578
579
void CompactionJob::ProcessKeyValueCompaction(
580
10.7k
    FileNumbersHolder* holder, SubcompactionState* sub_compact) {
581
10.7k
  assert(sub_compact != nullptr);
582
10.7k
  std::unique_ptr<InternalIterator> input(
583
10.7k
      versions_->MakeInputIterator(sub_compact->compaction));
584
585
  // I/O measurement variables
586
10.7k
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
587
10.7k
  const uint64_t kRecordStatsEvery = 1000;
588
10.7k
  uint64_t prev_write_nanos = 0;
589
10.7k
  uint64_t prev_fsync_nanos = 0;
590
10.7k
  uint64_t prev_range_sync_nanos = 0;
591
10.7k
  uint64_t prev_prepare_write_nanos = 0;
592
10.7k
  if (measure_io_stats_) {
593
48
    prev_perf_level = GetPerfLevel();
594
48
    SetPerfLevel(PerfLevel::kEnableTime);
595
48
    prev_write_nanos = IOSTATS(write_nanos);
596
48
    prev_fsync_nanos = IOSTATS(fsync_nanos);
597
48
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
598
48
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
599
48
  }
600
601
10.7k
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
602
10.7k
  auto compaction_filter = cfd->ioptions()->compaction_filter;
603
10.7k
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
604
10.7k
  if (compaction_filter == nullptr) {
605
10.6k
    compaction_filter_from_factory =
606
10.6k
        sub_compact->compaction->CreateCompactionFilter();
607
10.6k
    compaction_filter = compaction_filter_from_factory.get();
608
10.6k
  }
609
610
10.7k
  if (compaction_filter) {
611
    // This is used to persist the history cutoff hybrid time chosen for the DocDB compaction
612
    // filter.
613
2.43k
    largest_user_frontier_ = compaction_filter->GetLargestUserFrontier();
614
2.43k
  }
615
616
10.7k
  MergeHelper merge(
617
10.7k
      env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
618
10.7k
      compaction_filter, db_options_.info_log.get(),
619
10.7k
      cfd->ioptions()->min_partial_merge_operands,
620
10.7k
      false /* internal key corruption is expected */,
621
10.3k
      existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
622
10.7k
      compact_->compaction->level(), db_options_.statistics.get());
623
624
10.7k
  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
625
626
10.7k
  Slice* start = sub_compact->start;
627
10.7k
  Slice* end = sub_compact->end;
628
10.7k
  if (start != nullptr) {
629
8
    IterKey start_iter;
630
8
    start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
631
8
    input->Seek(start_iter.GetKey());
632
10.6k
  } else {
633
10.6k
    input->SeekToFirst();
634
10.6k
  }
635
636
10.7k
  Status status;
637
10.7k
  sub_compact->c_iter.reset(new CompactionIterator(
638
10.7k
      input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
639
10.7k
      &existing_snapshots_, earliest_write_conflict_snapshot_, false,
640
10.7k
      sub_compact->compaction, compaction_filter));
641
10.7k
  auto c_iter = sub_compact->c_iter.get();
642
10.7k
  c_iter->SeekToFirst();
643
10.7k
  const auto& c_iter_stats = c_iter->iter_stats();
644
  // TODO(noetzli): check whether we could check !shutting_down_->... only
645
  // only occasionally (see diff D42687)
646
33.2M
  while (status.ok() && !shutting_down_->load(std::memory_order_acquire) &&
647
33.2M
         !cfd->IsDropped() && c_iter->Valid()) {
648
    // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
649
    // returns true.
650
33.1M
    const Slice& key = c_iter->key();
651
33.1M
    const Slice& value = c_iter->value();
652
653
    // If an end key (exclusive) is specified, check if the current key is
654
    // >= than it and exit if it is because the iterator is out of its range
655
33.1M
    if (end != nullptr &&
656
158k
        cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
657
8
      break;
658
33.1M
    } else if (sub_compact->compaction->ShouldStopBefore(key) &&
659
1.07k
               sub_compact->builder != nullptr) {
660
1.03k
      status = FinishCompactionOutputFile(input->status(), sub_compact);
661
1.03k
      if (!status.ok()) {
662
0
        break;
663
0
      }
664
33.1M
    }
665
666
33.1M
    if (c_iter_stats.num_input_records % kRecordStatsEvery ==
667
31.0k
        kRecordStatsEvery - 1) {
668
31.0k
      RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
669
31.0k
      c_iter->ResetRecordCounts();
670
31.0k
      RecordCompactionIOStats();
671
31.0k
    }
672
673
    // Open output file if necessary
674
33.1M
    if (sub_compact->builder == nullptr) {
675
22.1k
      status = OpenCompactionOutputFile(holder, sub_compact);
676
22.1k
      if (!status.ok()) {
677
15
        break;
678
15
      }
679
33.1M
    }
680
33.1M
    assert(sub_compact->builder != nullptr);
681
33.1M
    assert(sub_compact->current_output() != nullptr);
682
33.1M
    sub_compact->builder->Add(key, value);
683
33.1M
    auto boundaries = MakeFileBoundaryValues(db_options_.boundary_extractor.get(),
684
33.1M
                                             key,
685
33.1M
                                             value);
686
33.1M
    if (!boundaries) {
687
0
      status = std::move(boundaries.status());
688
0
      break;
689
0
    }
690
33.1M
    auto& boundary_values = *boundaries;
691
33.1M
    sub_compact->current_output()->meta.UpdateBoundaries(std::move(boundary_values.key),
692
33.1M
                                                         boundary_values);
693
33.1M
    sub_compact->num_output_records++;
694
695
    // Close output file if it is big enough
696
    // TODO(aekmekji): determine if file should be closed earlier than this
697
    // during subcompactions (i.e. if output size, estimated by input size, is
698
    // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
699
    // and 0.6MB instead of 1MB and 0.2MB)
700
33.1M
    if (sub_compact->builder->TotalFileSize() >=
701
11.3k
        sub_compact->compaction->max_output_file_size()) {
702
11.3k
      status = FinishCompactionOutputFile(input->status(), sub_compact);
703
11.3k
    }
704
705
33.1M
    c_iter->Next();
706
33.1M
  }
707
708
10.7k
  sub_compact->num_input_records = c_iter_stats.num_input_records;
709
10.7k
  sub_compact->compaction_job_stats.num_input_deletion_records =
710
10.7k
      c_iter_stats.num_input_deletion_records;
711
10.7k
  sub_compact->compaction_job_stats.num_corrupt_keys =
712
10.7k
      c_iter_stats.num_input_corrupt_records;
713
10.7k
  sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
714
10.7k
      c_iter_stats.total_input_raw_key_bytes;
715
10.7k
  sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
716
10.7k
      c_iter_stats.total_input_raw_value_bytes;
717
718
10.7k
  RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
719
10.7k
  RecordCompactionIOStats();
720
721
10.7k
  if (status.ok() &&
722
10.6k
      (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
723
46
    status = STATUS(ShutdownInProgress,
724
46
        "Database shutdown or Column family drop during compaction");
725
46
  }
726
10.7k
  if (status.ok() && sub_compact->builder != nullptr) {
727
9.67k
    status = FinishCompactionOutputFile(input->status(), sub_compact);
728
9.67k
  }
729
10.7k
  if (status.ok()) {
730
10.6k
    status = input->status();
731
10.6k
  }
732
733
10.7k
  if (measure_io_stats_) {
734
48
    sub_compact->compaction_job_stats.file_write_nanos +=
735
48
        IOSTATS(write_nanos) - prev_write_nanos;
736
48
    sub_compact->compaction_job_stats.file_fsync_nanos +=
737
48
        IOSTATS(fsync_nanos) - prev_fsync_nanos;
738
48
    sub_compact->compaction_job_stats.file_range_sync_nanos +=
739
48
        IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
740
48
    sub_compact->compaction_job_stats.file_prepare_write_nanos +=
741
48
        IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
742
48
    if (prev_perf_level != PerfLevel::kEnableTime) {
743
48
      SetPerfLevel(prev_perf_level);
744
48
    }
745
48
  }
746
747
10.7k
  sub_compact->c_iter.reset();
748
10.7k
  input.reset();
749
10.7k
  sub_compact->status = status;
750
10.7k
  if (compaction_filter) {
751
2.43k
    compaction_filter->CompactionFinished();
752
2.43k
  }
753
10.7k
}
754
755
void CompactionJob::RecordDroppedKeys(
756
    const CompactionIteratorStats& c_iter_stats,
757
41.7k
    CompactionJobStats* compaction_job_stats) {
758
41.7k
  if (c_iter_stats.num_record_drop_user > 0) {
759
39
    RecordTick(stats_, COMPACTION_KEY_DROP_USER,
760
39
               c_iter_stats.num_record_drop_user);
761
39
  }
762
41.7k
  if (c_iter_stats.num_record_drop_hidden > 0) {
763
10.5k
    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
764
10.5k
               c_iter_stats.num_record_drop_hidden);
765
10.5k
    if (compaction_job_stats) {
766
10.5k
      compaction_job_stats->num_records_replaced +=
767
10.5k
          c_iter_stats.num_record_drop_hidden;
768
10.5k
    }
769
10.5k
  }
770
41.7k
  if (c_iter_stats.num_record_drop_obsolete > 0) {
771
3.04k
    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
772
3.04k
               c_iter_stats.num_record_drop_obsolete);
773
3.04k
    if (compaction_job_stats) {
774
3.04k
      compaction_job_stats->num_expired_deletion_records +=
775
3.04k
          c_iter_stats.num_record_drop_obsolete;
776
3.04k
    }
777
3.04k
  }
778
41.7k
}
779
780
43.8k
// Syncs (unless data sync is disabled) and closes `*writer`, then releases it.
//
// `*status` is both input and output: Sync and Close are only attempted while
// it is still OK, and each step overwrites it with its own result. The writer
// is reset unconditionally.
void CompactionJob::CloseFile(Status* status, std::unique_ptr<WritableFileWriter>* writer) {
  const bool sync_enabled = !db_options_.disableDataSync;
  if (status->ok() && sync_enabled) {
    *status = (*writer)->Sync(db_options_.use_fsync);
  }

  if (status->ok()) {
    *status = (*writer)->Close();
  }

  writer->reset();
}
790
791
// Completes the output table currently being built by `sub_compact` and closes
// its file(s).
//
// Steps:
//  1. If `input_status` is OK the builder is finished, otherwise it is
//     abandoned.
//  2. The resulting file sizes are stored in the output's FileMetaData, the
//     output is marked finished and the sub-compaction's byte total is updated.
//  3. The data file (when present) and the base file are closed via CloseFile.
//  4. If everything succeeded and the table has at least one entry, the table
//     is opened through the table cache to verify that it is usable (and fully
//     iterated when paranoid_file_checks_ is set); on success the table
//     properties are stored and the table-file-creation event is logged.
//  5. If an SstFileManager is configured and the output is in path 0, the new
//     file(s) are reported to it.
//
// The builder is released on every exit path, including OnAddFile failures:
// it has already been finished or abandoned in step 1, and CleanupCompaction
// calls Abandon() on any builder that is still set.
//
// Returns the resulting status. A failure reported by OnAddFile is returned as
// is.
Status CompactionJob::FinishCompactionOutputFile(
    const Status& input_status, SubcompactionState* sub_compact) {
  assert(sub_compact != nullptr);
  assert(sub_compact->base_outfile);
  const bool is_split_sst = sub_compact->compaction->column_family_data()->ioptions()
      ->table_factory->IsSplitSstForWriteSupported();
  assert((sub_compact->data_outfile != nullptr) == is_split_sst);
  assert(sub_compact->builder != nullptr);
  assert(sub_compact->current_output() != nullptr);

  uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input_status;
  auto meta = &sub_compact->current_output()->meta;
  const uint64_t current_entries = sub_compact->builder->NumEntries();
  meta->marked_for_compaction = sub_compact->builder->NeedCompact();
  if (s.ok()) {
    s = sub_compact->builder->Finish();
  } else {
    sub_compact->builder->Abandon();
  }

  const uint64_t current_total_bytes = sub_compact->builder->TotalFileSize();
  meta->fd.total_file_size = current_total_bytes;
  meta->fd.base_file_size = sub_compact->builder->BaseFileSize();
  sub_compact->current_output()->finished = true;
  sub_compact->total_bytes += current_total_bytes;

  // Finish and check for file errors
  if (sub_compact->data_outfile) {
    CloseFile(&s, &sub_compact->data_outfile);
  }
  CloseFile(&s, &sub_compact->base_outfile);

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
    std::unique_ptr<InternalIterator> iter(cfd->table_cache()->NewIterator(
        ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd, meta->UserFilter(),
        nullptr, cfd->internal_stats()->GetFileReadHist(
                     compact_->compaction->output_level()),
        false));
    s = iter->status();

    if (s.ok() && paranoid_file_checks_) {
      // Walk the whole table so that any read error surfaces in the status.
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
      s = iter->status();
    }

    // Release the verification iterator before logging/notifying.
    iter.reset();
    if (s.ok()) {
      auto tp = sub_compact->builder->GetTableProperties();
      sub_compact->current_output()->table_properties =
          std::make_shared<TableProperties>(tp);
      TableFileCreationInfo info(std::move(tp));
      info.db_name = dbname_;
      info.cf_name = cfd->GetName();
      info.file_path =
          TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
                        meta->fd.GetPathId());
      info.file_size = meta->fd.GetTotalFileSize();
      info.job_id = job_id_;
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
          " keys, %" PRIu64 " bytes%s %s",
          cfd->GetName().c_str(), job_id_, output_number, current_entries,
          current_total_bytes,
          meta->marked_for_compaction ? " (need compaction)" : "",
          meta->FrontiersToString().c_str());
      EventHelpers::LogAndNotifyTableFileCreation(
          event_logger_, cfd->ioptions()->listeners, meta->fd, info);
    }
  }

  // Report new file to SstFileManagerImpl
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
  if (sfm && meta->fd.GetPathId() == 0) {
    ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
    auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
                            meta->fd.GetPathId());
    Status add_file_status = sfm->OnAddFile(fn);
    if (add_file_status.ok() && is_split_sst) {
      add_file_status = sfm->OnAddFile(TableBaseToDataFileName(fn));
    }
    if (!add_file_status.ok()) {
      // The builder was already finished/abandoned above; drop it so that it is
      // not left set for CleanupCompaction to abandon a second time.
      sub_compact->builder.reset();
      return add_file_status;
    }
    if (sfm->IsMaxAllowedSpaceReached()) {
      InstrumentedMutexLock l(db_mutex_);
      if (db_bg_error_->ok()) {
        s = STATUS(IOError, "Max allowed space was reached");
        *db_bg_error_ = s;
        TEST_SYNC_POINT(
            "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
      }
    }
  }

  sub_compact->builder.reset();
  return s;
}
893
894
// Records the outcome of the compaction in the compaction's version edit and
// applies it through the version set.
//
// Must be called with the DB mutex held. Returns Corruption if the input files
// no longer match the current version; otherwise returns the result of
// LogAndApply.
Status CompactionJob::InstallCompactionResults(
    const MutableCFOptions& mutable_cf_options) {
  db_mutex_->AssertHeld();

  auto* compaction = compact_->compaction;
  Compaction::InputLevelSummaryBuffer inputs_summary;

  // Paranoia: make sure the files we started with are still present in the
  // current version, at their original level. This guards against a
  // concurrent compaction having erroneously picked the same inputs.
  const bool inputs_consistent = versions_->VerifyCompactionFileConsistency(compaction);
  if (!inputs_consistent) {
    RLOG(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compaction %s aborted",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary));
    return STATUS(Corruption, "Compaction input files inconsistent");
  }

  RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
      "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
      compaction->column_family_data()->GetName().c_str(), job_id_,
      compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);

  // Build the edit: delete the inputs, then add every output file of every
  // sub-compaction at the output level.
  auto* const edit = compaction->edit();
  compaction->AddInputDeletions(edit);

  const auto target_level = compaction->output_level();
  for (const auto& sub_state : compact_->sub_compact_states) {
    for (const auto& output : sub_state.outputs) {
      edit->AddFile(target_level, output.meta);
    }
  }

  if (largest_user_frontier_) {
    edit->UpdateFlushedFrontier(largest_user_frontier_);
  }

  return versions_->LogAndApply(compaction->column_family_data(),
                                mutable_cf_options, edit,
                                db_mutex_, db_directory_);
}
936
937
52.4k
void CompactionJob::RecordCompactionIOStats() {
938
52.4k
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
939
52.4k
  IOSTATS_RESET(bytes_read);
940
52.4k
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
941
52.4k
  IOSTATS_RESET(bytes_written);
942
52.4k
}
943
944
// Creates the writable file `fname` and stores it in `*writable_file`.
//
// On failure an error is logged (tagged with `table_name`, the job id,
// `file_number` and `file_type_label`) and the info log is flushed.
// Returns the status of NewWritableFile.
Status CompactionJob::OpenFile(const std::string table_name, uint64_t file_number,
    const std::string file_type_label, const std::string fname,
    std::unique_ptr<WritableFile>* writable_file) {
  Status open_status = NewWritableFile(env_, fname, writable_file, env_options_);
  if (open_status.ok()) {
    return open_status;
  }

  RLOG(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
      "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
      " fails at NewWritableFile for %s file with status %s", table_name.c_str(),
      job_id_, file_number, file_type_label.c_str(), open_status.ToString().c_str());
  LogFlush(db_options_.info_log);
  return open_status;
}
957
958
// Starts a new output table for `sub_compact`.
//
// Obtains a new file number (registered in `holder`), opens the base and data
// files, appends a new unfinished Output to `sub_compact->outputs`, wraps the
// opened file(s) in WritableFileWriters and creates the table builder.
//
// Returns the OpenFile error if either file cannot be created, OK otherwise.
Status CompactionJob::OpenCompactionOutputFile(
    FileNumbersHolder* holder, SubcompactionState* sub_compact) {
  assert(sub_compact != nullptr);
  assert(sub_compact->builder == nullptr);
  FileNumber file_number = file_numbers_provider_->NewFileNumber(holder);

  auto* const compaction = sub_compact->compaction;
  ColumnFamilyData* cfd = compaction->column_family_data();

  // Make the output file
  // NOTE(review): the data file is opened even when the table factory does not
  // write split SSTs; in that case data_writable_file is never handed to a
  // writer below and is simply destroyed when this function returns. Confirm
  // that this is intended.
  std::unique_ptr<WritableFile> base_writable_file;
  std::unique_ptr<WritableFile> data_writable_file;
  const std::string base_fname =
      TableFileName(db_options_.db_paths, file_number, compaction->output_path_id());
  const std::string data_fname = TableBaseToDataFileName(base_fname);
  const std::string table_name = cfd->GetName();
  RETURN_NOT_OK(OpenFile(table_name, file_number, "base", base_fname, &base_writable_file));
  RETURN_NOT_OK(OpenFile(table_name, file_number, "data", data_fname, &data_writable_file));

  SubcompactionState::Output out;
  out.meta.fd = FileDescriptor(file_number, compaction->output_path_id(), 0, 0);
  // Update sequence number boundaries for out.
  const size_t input_level_count = compact_->compaction->num_input_levels();
  for (size_t level_idx = 0; level_idx != input_level_count; ++level_idx) {
    for (FileMetaData* fmd : *compact_->compaction->inputs(level_idx)) {
      out.meta.UpdateBoundariesExceptKey(fmd->smallest, UpdateBoundariesType::kSmallest);
      out.meta.UpdateBoundariesExceptKey(fmd->largest, UpdateBoundariesType::kLargest);
    }
  }
  out.finished = false;

  sub_compact->outputs.push_back(out);

  const bool is_split_sst = cfd->ioptions()->table_factory->IsSplitSstForWriteSupported();
  const size_t preallocation_data_block_size =
      static_cast<size_t>(compaction->OutputFilePreallocationSize());
  // If we don't have a separate data file, the preallocation size applies to
  // the base file instead.
  const size_t preallocation_base_block_size =
      is_split_sst ? size_t{0} : preallocation_data_block_size;

  // Applies IO priority / preallocation to the file and moves it into a writer.
  auto setup_outfile = [this, compaction](
      size_t preallocation_block_size, std::unique_ptr<WritableFile>* writable_file,
      std::unique_ptr<WritableFileWriter>* writer) {
    (*writable_file)->SetIOPriority(Env::IO_LOW);
    if (preallocation_block_size > 0) {
      (*writable_file)->SetPreallocationBlockSize(preallocation_block_size);
    }
    writer->reset(new WritableFileWriter(
        std::move(*writable_file), env_options_, compaction->suspender()));
  };

  setup_outfile(
      preallocation_base_block_size, &base_writable_file, &sub_compact->base_outfile);
  if (is_split_sst) {
    setup_outfile(
        preallocation_data_block_size, &data_writable_file, &sub_compact->data_outfile);
  }

  // If the Column family flag is to only optimize filters for hits,
  // we can skip creating filters if this is the bottommost_level where
  // data is going to be found
  const bool skip_filters =
      cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
  sub_compact->builder.reset(NewTableBuilder(
      *cfd->ioptions(), cfd->internal_comparator(),
      cfd->int_tbl_prop_collector_factories(), cfd->GetID(),
      sub_compact->base_outfile.get(), sub_compact->data_outfile.get(),
      compaction->output_compression(), cfd->ioptions()->compression_opts,
      skip_filters));
  LogFlush(db_options_.info_log);
  return Status::OK();
}
1028
1029
10.6k
void CompactionJob::CleanupCompaction() {
1030
10.6k
  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
1031
10.6k
    const auto& sub_status = sub_compact.status;
1032
1033
10.6k
    if (sub_compact.builder != nullptr) {
1034
      // May happen if we get a shutdown call in the middle of compaction
1035
18
      sub_compact.builder->Abandon();
1036
18
      sub_compact.builder.reset();
1037
10.6k
    } else if (sub_status.ok() &&
1038
10.6k
        (sub_compact.base_outfile != nullptr || sub_compact.data_outfile != nullptr)) {
1039
0
      std::string log_message;
1040
0
      log_message.append("sub_status.ok(), but: sub_compact.base_outfile ");
1041
0
      log_message.append(sub_compact.base_outfile == nullptr ? "==" : "!=");
1042
0
      log_message.append(" nullptr, sub_compact.data_outfile ");
1043
0
      log_message.append(sub_compact.data_outfile == nullptr ? "==" : "!=");
1044
0
      log_message.append(" nullptr");
1045
0
      RLOG(InfoLogLevel::FATAL_LEVEL, db_options_.info_log, log_message.c_str());
1046
0
      assert(!"If sub_status is OK, sub_compact.*_outfile should be nullptr");
1047
0
    }
1048
22.0k
    for (const auto& out : sub_compact.outputs) {
1049
      // If this file was inserted into the table cache then remove
1050
      // them here because this compaction was not committed.
1051
22.0k
      if (!sub_status.ok()) {
1052
198
        TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
1053
198
      }
1054
22.0k
    }
1055
10.6k
  }
1056
10.6k
  delete compact_;
1057
10.6k
  compact_ = nullptr;
1058
10.6k
}
1059
1060
#ifndef ROCKSDB_LITE
1061
namespace {
1062
void CopyPrefix(
1063
20.9k
    const Slice& src, size_t prefix_length, std::string* dst) {
1064
20.9k
  assert(prefix_length > 0);
1065
12.0k
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
1066
20.9k
  dst->assign(src.cdata(), length);
1067
20.9k
}
1068
}  // namespace
1069
1070
#endif  // !ROCKSDB_LITE
1071
1072
10.6k
void CompactionJob::UpdateCompactionStats() {
1073
10.6k
  Compaction* compaction = compact_->compaction;
1074
10.6k
  compaction_stats_.num_input_files_in_non_output_levels = 0;
1075
10.6k
  compaction_stats_.num_input_files_in_output_level = 0;
1076
10.6k
  for (int input_level = 0;
1077
41.8k
       input_level < static_cast<int>(compaction->num_input_levels());
1078
31.1k
       ++input_level) {
1079
31.1k
    if (compaction->start_level() + input_level
1080
21.4k
        != compaction->output_level()) {
1081
21.4k
      UpdateCompactionInputStatsHelper(
1082
21.4k
          &compaction_stats_.num_input_files_in_non_output_levels,
1083
21.4k
          &compaction_stats_.bytes_read_non_output_levels,
1084
21.4k
          input_level);
1085
9.70k
    } else {
1086
9.70k
      UpdateCompactionInputStatsHelper(
1087
9.70k
          &compaction_stats_.num_input_files_in_output_level,
1088
9.70k
          &compaction_stats_.bytes_read_output_level,
1089
9.70k
          input_level);
1090
9.70k
    }
1091
31.1k
  }
1092
1093
10.6k
  for (const auto& sub_compact : compact_->sub_compact_states) {
1094
10.6k
    size_t num_output_files = sub_compact.outputs.size();
1095
10.6k
    if (sub_compact.builder != nullptr) {
1096
      // An error occurred so ignore the last output.
1097
18
      assert(num_output_files > 0);
1098
18
      --num_output_files;
1099
18
    }
1100
10.6k
    compaction_stats_.num_output_files += static_cast<int>(num_output_files);
1101
1102
22.0k
    for (const auto& out : sub_compact.outputs) {
1103
22.0k
      compaction_stats_.bytes_written += out.meta.fd.total_file_size;
1104
22.0k
    }
1105
10.6k
    if (sub_compact.num_input_records > sub_compact.num_output_records) {
1106
4.79k
      compaction_stats_.num_dropped_records +=
1107
4.79k
          sub_compact.num_input_records - sub_compact.num_output_records;
1108
4.79k
    }
1109
10.6k
  }
1110
10.6k
}
1111
1112
void CompactionJob::UpdateCompactionInputStatsHelper(
1113
31.1k
    int* num_files, uint64_t* bytes_read, int input_level) {
1114
31.1k
  const Compaction* compaction = compact_->compaction;
1115
31.1k
  auto num_input_files = compaction->num_input_files(input_level);
1116
31.1k
  *num_files += static_cast<int>(num_input_files);
1117
1118
76.6k
  for (size_t i = 0; i < num_input_files; ++i) {
1119
45.5k
    const auto* file_meta = compaction->input(input_level, i);
1120
45.5k
    *bytes_read += file_meta->fd.GetTotalFileSize();
1121
45.5k
    compaction_stats_.num_input_records +=
1122
45.5k
        static_cast<uint64_t>(file_meta->num_entries);
1123
45.5k
  }
1124
31.1k
}
1125
1126
void CompactionJob::UpdateCompactionJobStats(
1127
10.6k
    const InternalStats::CompactionStats& stats) const {
1128
10.6k
#ifndef ROCKSDB_LITE
1129
10.6k
  if (compaction_job_stats_) {
1130
10.6k
    compaction_job_stats_->elapsed_micros = stats.micros;
1131
1132
    // input information
1133
10.6k
    compaction_job_stats_->total_input_bytes =
1134
10.6k
        stats.bytes_read_non_output_levels +
1135
10.6k
        stats.bytes_read_output_level;
1136
10.6k
    compaction_job_stats_->num_input_records =
1137
10.6k
        compact_->num_input_records;
1138
10.6k
    compaction_job_stats_->num_input_files =
1139
10.6k
        stats.num_input_files_in_non_output_levels +
1140
10.6k
        stats.num_input_files_in_output_level;
1141
10.6k
    compaction_job_stats_->num_input_files_at_output_level =
1142
10.6k
        stats.num_input_files_in_output_level;
1143
1144
    // output information
1145
10.6k
    compaction_job_stats_->total_output_bytes = stats.bytes_written;
1146
10.6k
    compaction_job_stats_->num_output_records =
1147
10.6k
        compact_->num_output_records;
1148
10.6k
    compaction_job_stats_->num_output_files = stats.num_output_files;
1149
1150
10.6k
    if (compact_->NumOutputFiles() > 0U) {
1151
10.4k
      CopyPrefix(
1152
10.4k
          compact_->SmallestUserKey(),
1153
10.4k
          CompactionJobStats::kMaxPrefixLength,
1154
10.4k
          &compaction_job_stats_->smallest_output_key_prefix);
1155
10.4k
      CopyPrefix(
1156
10.4k
          compact_->LargestUserKey(),
1157
10.4k
          CompactionJobStats::kMaxPrefixLength,
1158
10.4k
          &compaction_job_stats_->largest_output_key_prefix);
1159
10.4k
    }
1160
10.6k
  }
1161
10.6k
#endif  // !ROCKSDB_LITE
1162
10.6k
}
1163
1164
10.6k
void CompactionJob::LogCompaction() {
1165
10.6k
  Compaction* compaction = compact_->compaction;
1166
10.6k
  ColumnFamilyData* cfd = compaction->column_family_data();
1167
1168
  // Let's check if anything will get logged. Don't prepare all the info if
1169
  // we're not logging
1170
10.6k
  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
1171
10.6k
    Compaction::InputLevelSummaryBuffer inputs_summary;
1172
10.6k
    RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
1173
10.6k
        "[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
1174
10.6k
        job_id_, compaction->InputLevelSummary(&inputs_summary),
1175
10.6k
        compaction->score());
1176
10.6k
    char scratch[2345];
1177
10.6k
    compaction->Summary(scratch, sizeof(scratch));
1178
10.6k
    RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
1179
10.6k
        "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
1180
    // build event logger report
1181
10.6k
    auto stream = event_logger_->Log();
1182
10.6k
    stream << "job" << job_id_ << "event"
1183
10.6k
           << "compaction_started";
1184
41.8k
    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
1185
31.1k
      stream << ("files_L" + ToString(compaction->level(i)));
1186
31.1k
      stream.StartArray();
1187
45.6k
      for (auto f : *compaction->inputs(i)) {
1188
45.6k
        stream << f->fd.GetNumber();
1189
45.6k
      }
1190
31.1k
      stream.EndArray();
1191
31.1k
    }
1192
10.6k
    stream << "score" << compaction->score() << "input_data_size"
1193
10.6k
           << compaction->CalculateTotalInputSize();
1194
10.6k
  }
1195
10.6k
}
1196
1197
}  // namespace rocksdb