/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/compaction_job.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/db/compaction_job.h" |
25 | | |
26 | | #ifndef __STDC_FORMAT_MACROS |
27 | | #define __STDC_FORMAT_MACROS |
28 | | #endif |
29 | | |
30 | | #include <inttypes.h> |
31 | | #include <stdint.h> |
32 | | |
33 | | #include <algorithm> |
34 | | #include <cmath> |
35 | | #include <functional> |
36 | | #include <list> |
37 | | #include <memory> |
38 | | #include <set> |
39 | | #include <string> |
40 | | #include <thread> |
41 | | #include <utility> |
42 | | #include <vector> |
43 | | |
44 | | #include "yb/rocksdb/db/builder.h" |
45 | | #include "yb/rocksdb/db/dbformat.h" |
46 | | #include "yb/rocksdb/db/event_helpers.h" |
47 | | #include "yb/rocksdb/db/filename.h" |
48 | | #include "yb/rocksdb/db/file_numbers.h" |
49 | | #include "yb/rocksdb/db/log_reader.h" |
50 | | #include "yb/rocksdb/db/log_writer.h" |
51 | | #include "yb/rocksdb/db/memtable.h" |
52 | | #include "yb/rocksdb/db/memtable_list.h" |
53 | | #include "yb/rocksdb/db/merge_helper.h" |
54 | | #include "yb/rocksdb/db/version_set.h" |
55 | | #include "yb/rocksdb/port/likely.h" |
56 | | #include "yb/rocksdb/port/port.h" |
57 | | #include "yb/rocksdb/db.h" |
58 | | #include "yb/rocksdb/env.h" |
59 | | #include "yb/rocksdb/statistics.h" |
60 | | #include "yb/rocksdb/status.h" |
61 | | #include "yb/rocksdb/table.h" |
62 | | #include "yb/rocksdb/table/internal_iterator.h" |
63 | | #include "yb/rocksdb/table/table_builder.h" |
64 | | #include "yb/rocksdb/util/coding.h" |
65 | | #include "yb/rocksdb/util/file_reader_writer.h" |
66 | | #include "yb/rocksdb/util/log_buffer.h" |
67 | | #include "yb/rocksdb/util/logging.h" |
68 | | #include "yb/rocksdb/util/sst_file_manager_impl.h" |
69 | | #include "yb/rocksdb/util/mutexlock.h" |
70 | | #include "yb/rocksdb/perf_level.h" |
71 | | #include "yb/rocksdb/util/stop_watch.h" |
72 | | #include "yb/util/stats/perf_step_timer.h" |
73 | | #include "yb/rocksdb/util/sync_point.h" |
74 | | |
75 | | #include "yb/util/result.h" |
76 | | #include "yb/util/stats/iostats_context_imp.h" |
77 | | #include "yb/util/string_util.h" |
78 | | |
79 | | namespace rocksdb { |
80 | | |
81 | | // Maintains state for each sub-compaction |
82 | | struct CompactionJob::SubcompactionState { |
83 | | Compaction* compaction; |
84 | | std::unique_ptr<CompactionIterator> c_iter; |
85 | | |
86 | | // The boundaries of the key-range this compaction is interested in. No two |
87 | | // subcompactions may have overlapping key-ranges. |
88 | | // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded |
89 | | Slice *start, *end; |
90 | | |
91 | | // The return status of this subcompaction |
92 | | Status status; |
93 | | |
94 | | // Files produced by this subcompaction |
95 | | struct Output { |
96 | | FileMetaData meta; |
97 | | bool finished; |
98 | | std::shared_ptr<const TableProperties> table_properties; |
99 | | }; |
100 | | |
101 | | // State kept for output being generated |
102 | | std::vector<Output> outputs; |
103 | | std::unique_ptr<WritableFileWriter> base_outfile; |
104 | | std::unique_ptr<WritableFileWriter> data_outfile; |
105 | | std::unique_ptr<TableBuilder> builder; |
106 | 94.4M | Output* current_output() { |
107 | 94.4M | if (outputs.empty()) { |
108 | | // This subcompaction's outptut could be empty if compaction was aborted |
109 | | // before this subcompaction had a chance to generate any output files. |
110 | | // When subcompactions are executed sequentially this is more likely and |
111 | | // will be particulalry likely for the later subcompactions to be empty. |
112 | | // Once they are run in parallel however it should be much rarer. |
113 | 0 | return nullptr; |
114 | 94.4M | } else { |
115 | 94.4M | return &outputs.back(); |
116 | 94.4M | } |
117 | 94.4M | } |
118 | | |
119 | | // State during the subcompaction |
120 | | uint64_t total_bytes; |
121 | | uint64_t num_input_records; |
122 | | uint64_t num_output_records; |
123 | | CompactionJobStats compaction_job_stats; |
124 | | uint64_t approx_size; |
125 | | |
126 | | SubcompactionState(Compaction* c, Slice* _start, Slice* _end, |
127 | | uint64_t size = 0) |
128 | | : compaction(c), |
129 | | start(_start), |
130 | | end(_end), |
131 | | base_outfile(nullptr), |
132 | | data_outfile(nullptr), |
133 | | builder(nullptr), |
134 | | total_bytes(0), |
135 | | num_input_records(0), |
136 | | num_output_records(0), |
137 | 11.4k | approx_size(size) { |
138 | 11.4k | assert(compaction != nullptr); |
139 | 11.4k | } |
140 | | |
141 | 9 | SubcompactionState(SubcompactionState&& o) { *this = std::move(o); } |
142 | | |
143 | 9 | SubcompactionState& operator=(SubcompactionState&& o) { |
144 | 9 | compaction = std::move(o.compaction); |
145 | 9 | start = std::move(o.start); |
146 | 9 | end = std::move(o.end); |
147 | 9 | status = std::move(o.status); |
148 | 9 | outputs = std::move(o.outputs); |
149 | 9 | base_outfile = std::move(o.base_outfile); |
150 | 9 | data_outfile = std::move(o.data_outfile); |
151 | 9 | builder = std::move(o.builder); |
152 | 9 | total_bytes = std::move(o.total_bytes); |
153 | 9 | num_input_records = std::move(o.num_input_records); |
154 | 9 | num_output_records = std::move(o.num_output_records); |
155 | 9 | compaction_job_stats = std::move(o.compaction_job_stats); |
156 | 9 | approx_size = std::move(o.approx_size); |
157 | 9 | return *this; |
158 | 9 | } |
159 | | |
160 | | // Because member unique_ptrs do not have these. |
161 | | SubcompactionState(const SubcompactionState&) = delete; |
162 | | |
163 | | SubcompactionState& operator=(const SubcompactionState&) = delete; |
164 | | }; |
165 | | |
166 | | // Maintains state for the entire compaction |
167 | | struct CompactionJob::CompactionState { |
168 | | Compaction* const compaction; |
169 | | |
170 | | // REQUIRED: subcompaction states are stored in order of increasing |
171 | | // key-range |
172 | | std::vector<CompactionJob::SubcompactionState> sub_compact_states; |
173 | | Status status; |
174 | | |
175 | | uint64_t total_bytes; |
176 | | uint64_t num_input_records; |
177 | | uint64_t num_output_records; |
178 | | |
179 | | explicit CompactionState(Compaction* c) |
180 | | : compaction(c), |
181 | | total_bytes(0), |
182 | | num_input_records(0), |
183 | 11.4k | num_output_records(0) {} |
184 | | |
185 | 22.7k | size_t NumOutputFiles() { |
186 | 22.7k | size_t total = 0; |
187 | 22.7k | for (auto& s : sub_compact_states) { |
188 | 22.7k | total += s.outputs.size(); |
189 | 22.7k | } |
190 | 22.7k | return total; |
191 | 22.7k | } |
192 | | |
193 | 11.1k | Slice SmallestUserKey() { |
194 | 11.1k | for (const auto& sub_compact_state : sub_compact_states) { |
195 | 11.1k | if (!sub_compact_state.outputs.empty() && |
196 | 11.1k | sub_compact_state.outputs[0].finished) { |
197 | 11.1k | return sub_compact_state.outputs[0].meta.smallest.key.user_key(); |
198 | 11.1k | } |
199 | 11.1k | } |
200 | | // If there is no finished output, return an empty slice. |
201 | 27 | return Slice(); |
202 | 11.1k | } |
203 | | |
204 | 11.1k | Slice LargestUserKey() { |
205 | 11.2k | for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend(); |
206 | 11.1k | ++it32 ) { |
207 | 11.1k | if (!it->outputs.empty() && it->current_output()->finished) { |
208 | 11.1k | assert(it->current_output() != nullptr); |
209 | 0 | return it->current_output()->meta.largest.key.user_key(); |
210 | 11.1k | } |
211 | 11.1k | } |
212 | | // If there is no finished output, return an empty slice. |
213 | 32 | return Slice(); |
214 | 11.1k | } |
215 | | }; |
216 | | |
217 | 11.4k | void CompactionJob::AggregateStatistics() { |
218 | 11.4k | for (SubcompactionState& sc : compact_->sub_compact_states) { |
219 | 11.4k | compact_->total_bytes += sc.total_bytes; |
220 | 11.4k | compact_->num_input_records += sc.num_input_records; |
221 | 11.4k | compact_->num_output_records += sc.num_output_records; |
222 | 11.4k | } |
223 | 11.4k | if (compaction_job_stats_) { |
224 | 11.3k | for (SubcompactionState& sc : compact_->sub_compact_states) { |
225 | 11.3k | compaction_job_stats_->Add(sc.compaction_job_stats); |
226 | 11.3k | } |
227 | 11.3k | } |
228 | 11.4k | } |
229 | | |
230 | | CompactionJob::CompactionJob( |
231 | | int job_id, Compaction* compaction, const DBOptions& db_options, |
232 | | const EnvOptions& env_options, VersionSet* versions, |
233 | | std::atomic<bool>* shutting_down, LogBuffer* log_buffer, |
234 | | Directory* db_directory, Directory* output_directory, Statistics* stats, |
235 | | InstrumentedMutex* db_mutex, Status* db_bg_error, |
236 | | std::vector<SequenceNumber> existing_snapshots, |
237 | | SequenceNumber earliest_write_conflict_snapshot, |
238 | | FileNumbersProvider* file_numbers_provider, |
239 | | std::shared_ptr<Cache> table_cache, EventLogger* event_logger, |
240 | | bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, |
241 | | CompactionJobStats* compaction_job_stats) |
242 | | : job_id_(job_id), |
243 | | compact_(new CompactionState(compaction)), |
244 | | compaction_job_stats_(compaction_job_stats), |
245 | | compaction_stats_(1), |
246 | | dbname_(dbname), |
247 | | db_options_(db_options), |
248 | | env_options_(env_options), |
249 | | env_(db_options.env), |
250 | | versions_(versions), |
251 | | shutting_down_(shutting_down), |
252 | | log_buffer_(log_buffer), |
253 | | db_directory_(db_directory), |
254 | | output_directory_(output_directory), |
255 | | stats_(stats), |
256 | | db_mutex_(db_mutex), |
257 | | db_bg_error_(db_bg_error), |
258 | | existing_snapshots_(std::move(existing_snapshots)), |
259 | | earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), |
260 | | file_numbers_provider_(file_numbers_provider), |
261 | | table_cache_(std::move(table_cache)), |
262 | | event_logger_(event_logger), |
263 | | paranoid_file_checks_(paranoid_file_checks), |
264 | 11.4k | measure_io_stats_(measure_io_stats) { |
265 | 11.4k | assert(log_buffer_ != nullptr); |
266 | 0 | const auto* cfd = compact_->compaction->column_family_data(); |
267 | 11.4k | ReportStartedCompaction(compaction); |
268 | 11.4k | } |
269 | | |
CompactionJob::~CompactionJob() {
  // compact_ must already have been released before destruction —
  // presumably by CleanupCompaction(), which Install() invokes; verify
  // against CleanupCompaction's definition.
  assert(compact_ == nullptr);
}
273 | | |
274 | | void CompactionJob::ReportStartedCompaction( |
275 | 11.4k | Compaction* compaction) { |
276 | 11.4k | const auto* cfd = compact_->compaction->column_family_data(); |
277 | | |
278 | | // In the current design, a CompactionJob is always created |
279 | | // for non-trivial compaction. |
280 | 11.4k | assert(compaction->IsTrivialMove() == false || |
281 | 11.4k | compaction->is_manual_compaction() == true); |
282 | | |
283 | 11.4k | IOSTATS_RESET(bytes_written); |
284 | 11.4k | IOSTATS_RESET(bytes_read); |
285 | | |
286 | 11.4k | if (compaction_job_stats_) { |
287 | 11.3k | compaction_job_stats_->is_manual_compaction = |
288 | 11.3k | compaction->is_manual_compaction(); |
289 | 11.3k | } |
290 | 11.4k | } |
291 | | |
292 | 11.4k | void CompactionJob::Prepare() { |
293 | | // Generate file_levels_ for compaction berfore making Iterator |
294 | 11.4k | auto* c = compact_->compaction; |
295 | 11.4k | assert(c->column_family_data() != nullptr); |
296 | 0 | assert(c->column_family_data()->current()->storage_info() |
297 | 11.4k | ->NumLevelFiles(compact_->compaction->level()) > 0); |
298 | | |
299 | | // Is this compaction producing files at the bottommost level? |
300 | 0 | bottommost_level_ = c->bottommost_level(); |
301 | | |
302 | 11.4k | if (c->ShouldFormSubcompactions()) { |
303 | 1.06k | const uint64_t start_micros = env_->NowMicros(); |
304 | 1.06k | GenSubcompactionBoundaries(); |
305 | 1.06k | assert(sizes_.size() == boundaries_.size() + 1); |
306 | | |
307 | 2.14k | for (size_t i = 0; i <= boundaries_.size(); i++1.07k ) { |
308 | 1.07k | Slice* start = i == 0 ? nullptr1.06k : &boundaries_[i - 1]8 ; |
309 | 1.07k | Slice* end = i == boundaries_.size() ? nullptr1.06k : &boundaries_[i]8 ; |
310 | 1.07k | compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]); |
311 | 1.07k | } |
312 | 10.3k | } else { |
313 | 10.3k | compact_->sub_compact_states.emplace_back(c, nullptr, nullptr); |
314 | 10.3k | } |
315 | 11.4k | } |
316 | | |
317 | | struct RangeWithSize { |
318 | | Range range; |
319 | | uint64_t size; |
320 | | |
321 | | RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0) |
322 | 129 | : range(a, b), size(s) {} |
323 | | }; |
324 | | |
325 | | // Generates a histogram representing potential divisions of key ranges from |
326 | | // the input. It adds the starting and/or ending keys of certain input files |
327 | | // to the working set and then finds the approximate size of data in between |
328 | | // each consecutive pair of slices. Then it divides these ranges into |
329 | | // consecutive groups such that each group has a similar size. |
330 | 1.06k | void CompactionJob::GenSubcompactionBoundaries() { |
331 | 1.06k | auto* c = compact_->compaction; |
332 | 1.06k | auto* cfd = c->column_family_data(); |
333 | 1.06k | const Comparator* cfd_comparator = cfd->user_comparator(); |
334 | 1.06k | std::vector<Slice> bounds; |
335 | 1.06k | int start_lvl = c->start_level(); |
336 | 1.06k | int out_lvl = c->output_level(); |
337 | | |
338 | | // Add the starting and/or ending key of certain input files as a potential |
339 | | // boundary |
340 | 8.22k | for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++7.15k ) { |
341 | 7.15k | int lvl = c->level(lvl_idx); |
342 | 7.15k | if (lvl >= start_lvl && lvl <= out_lvl) { |
343 | 6.41k | const LevelFilesBrief* flevel = c->input_levels(lvl_idx); |
344 | 6.41k | size_t num_files = flevel->num_files; |
345 | | |
346 | 6.41k | if (num_files == 0) { |
347 | 4.30k | continue; |
348 | 4.30k | } |
349 | | |
350 | 2.11k | if (lvl == 0) { |
351 | | // For level 0 add the starting and ending key of each file since the |
352 | | // files may have greatly differing key ranges (not range-partitioned) |
353 | 3.48k | for (size_t i = 0; i < num_files; i++2.42k ) { |
354 | 2.42k | bounds.emplace_back(flevel->files[i].smallest.key); |
355 | 2.42k | bounds.emplace_back(flevel->files[i].largest.key); |
356 | 2.42k | } |
357 | 1.06k | } else { |
358 | | // For all other levels add the smallest/largest key in the level to |
359 | | // encompass the range covered by that level |
360 | 1.04k | bounds.emplace_back(flevel->files[0].smallest.key); |
361 | 1.04k | bounds.emplace_back(flevel->files[num_files - 1].largest.key); |
362 | 1.04k | if (lvl == out_lvl) { |
363 | | // For the last level include the starting keys of all files since |
364 | | // the last level is the largest and probably has the widest key |
365 | | // range. Since it's range partitioned, the ending key of one file |
366 | | // and the starting key of the next are very close (or identical). |
367 | 1.02k | for (size_t i = 1; i < num_files; i++21 ) { |
368 | 21 | bounds.emplace_back(flevel->files[i].smallest.key); |
369 | 21 | } |
370 | 1.00k | } |
371 | 1.04k | } |
372 | 2.11k | } |
373 | 7.15k | } |
374 | | |
375 | 1.06k | std::sort(bounds.begin(), bounds.end(), |
376 | 6.26k | [cfd_comparator] (const Slice& a, const Slice& b) -> bool { |
377 | 6.26k | return cfd_comparator->Compare(ExtractUserKey(a), ExtractUserKey(b)) < 0; |
378 | 6.26k | }); |
379 | | // Remove duplicated entries from bounds |
380 | 1.06k | bounds.erase(std::unique(bounds.begin(), bounds.end(), |
381 | 5.89k | [cfd_comparator] (const Slice& a, const Slice& b) -> bool { |
382 | 5.89k | return cfd_comparator->Compare(ExtractUserKey(a), ExtractUserKey(b)) == 0; |
383 | 5.89k | }), bounds.end()); |
384 | | |
385 | | // Combine consecutive pairs of boundaries into ranges with an approximate |
386 | | // size of data covered by keys in that range |
387 | 1.06k | uint64_t sum = 0; |
388 | 1.06k | std::vector<RangeWithSize> ranges; |
389 | 1.06k | auto* v = cfd->current(); |
390 | 1.19k | for (auto it = bounds.begin();;) { |
391 | 1.19k | const Slice a = *it; |
392 | 1.19k | it++; |
393 | | |
394 | 1.19k | if (it == bounds.end()) { |
395 | 1.06k | break; |
396 | 1.06k | } |
397 | | |
398 | 129 | const Slice b = *it; |
399 | 129 | uint64_t size = versions_->ApproximateSize(v, a, b, start_lvl, out_lvl + 1); |
400 | 129 | ranges.emplace_back(a, b, size); |
401 | 129 | sum += size; |
402 | 129 | } |
403 | | |
404 | | // Group the ranges into subcompactions |
405 | 1.06k | const double min_file_fill_percent = 4.0 / 5; |
406 | 1.06k | uint64_t max_output_files = static_cast<uint64_t>(std::ceil( |
407 | 1.06k | sum / min_file_fill_percent / |
408 | 1.06k | cfd->GetCurrentMutableCFOptions()->MaxFileSizeForLevel(out_lvl))); |
409 | 1.06k | uint64_t subcompactions = |
410 | 1.06k | std::min({static_cast<uint64_t>(ranges.size()), |
411 | 1.06k | static_cast<uint64_t>(db_options_.max_subcompactions), |
412 | 1.06k | max_output_files}); |
413 | | |
414 | 1.06k | double mean = subcompactions != 0 ? sum * 1.0 / subcompactions52 |
415 | 1.06k | : std::numeric_limits<double>::max()1.01k ; |
416 | | |
417 | 1.06k | if (subcompactions > 1) { |
418 | | // Greedily add ranges to the subcompaction until the sum of the ranges' |
419 | | // sizes becomes >= the expected mean size of a subcompaction |
420 | 3 | sum = 0; |
421 | 33 | for (size_t i = 0; i < ranges.size() - 1; i++30 ) { |
422 | 30 | sum += ranges[i].size; |
423 | 30 | if (subcompactions == 1) { |
424 | | // If there's only one left to schedule then it goes to the end so no |
425 | | // need to put an end boundary |
426 | 0 | continue; |
427 | 0 | } |
428 | 30 | if (sum >= mean) { |
429 | 8 | boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit)); |
430 | 8 | sizes_.emplace_back(sum); |
431 | 8 | subcompactions--; |
432 | 8 | sum = 0; |
433 | 8 | } |
434 | 30 | } |
435 | 3 | sizes_.emplace_back(sum + ranges.back().size); |
436 | 1.06k | } else { |
437 | | // Only one range so its size is the total sum of sizes computed above |
438 | 1.06k | sizes_.emplace_back(sum); |
439 | 1.06k | } |
440 | 1.06k | } |
441 | | |
// Runs all subcompactions (one per SubcompactionState), in parallel when
// there is more than one, then merges their results: fsyncs the output
// directory, collects the first failing subcompaction status, gathers output
// table properties, and aggregates statistics. Returns the holder that keeps
// the produced file numbers alive; the overall status is stored in
// compact_->status for Install() to consume.
Result<FileNumbersHolder> CompactionJob::Run() {
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  LogCompaction();

  const size_t num_threads = compact_->sub_compact_states.size();
  assert(num_threads > 0);
  const uint64_t start_micros = env_->NowMicros();

  // Launch a thread for each of subcompactions 1...num_threads-1
  std::vector<std::thread> thread_pool;
  thread_pool.reserve(num_threads - 1);
  FileNumbersHolder file_numbers_holder(file_numbers_provider_->CreateHolder());
  file_numbers_holder.Reserve(num_threads);
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this, &file_numbers_holder,
                             &compact_->sub_compact_states[i]);
  }

  // Always schedule the first subcompaction (whether or not there are also
  // others) in the current thread to be efficient with resources
  ProcessKeyValueCompaction(&file_numbers_holder, &compact_->sub_compact_states[0]);

  // Wait for all other threads (if there are any) to finish execution
  for (auto& thread : thread_pool) {
    thread.join();
  }

  // Make the new output files durable before they are published to the
  // VersionSet (unless data sync is disabled).
  if (output_directory_ && !db_options_.disableDataSync) {
    RETURN_NOT_OK(output_directory_->Fsync());
  }

  compaction_stats_.micros = env_->NowMicros() - start_micros;
  MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);

  // Check if any thread encountered an error during execution
  Status status;
  for (const auto& state : compact_->sub_compact_states) {
    if (!state.status.ok()) {
      // Keep only the first error found; subcompactions are ordered by key
      // range, so this is deterministic.
      status = state.status;
      break;
    }
  }

  // Map every output file name to its table properties so the Compaction
  // object can expose them to listeners.
  TablePropertiesCollection tp;
  for (const auto& state : compact_->sub_compact_states) {
    for (const auto& output : state.outputs) {
      auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(),
                              output.meta.fd.GetPathId());
      tp[fn] = output.table_properties;
    }
  }
  compact_->compaction->SetOutputTableProperties(std::move(tp));

  // Finish up all book-keeping to unify the subcompaction results
  AggregateStatistics();
  UpdateCompactionStats();
  RecordCompactionIOStats();
  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");

  compact_->status = status;
  return file_numbers_holder;
}
506 | | |
// Publishes the compaction result while holding the DB mutex: records
// per-level compaction stats, installs the output files into the version
// (only if Run() succeeded), emits a human-readable summary to the log
// buffer and a structured "compaction_finished" event, and finally releases
// the CompactionState via CleanupCompaction(). Returns the overall
// compaction status.
Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
  db_mutex_->AssertHeld();
  Status status = compact_->status;
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  cfd->internal_stats()->AddCompactionStats(
      compact_->compaction->output_level(), compaction_stats_);

  if (status.ok()) {
    status = InstallCompactionResults(mutable_cf_options);
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  auto vstorage = cfd->current()->storage_info();
  const auto& stats = compaction_stats_;
  // Clamp to 1 to avoid division by zero in the rate computations below.
  const auto micros = static_cast<double>(std::max<uint64_t>(stats.micros, 1));
  const auto bytes_read_non_output_levels = static_cast<double>(
      std::max<uint64_t>(stats.bytes_read_non_output_levels, 1));
  LOG_TO_BUFFER(
      log_buffer_,
      "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
      "files in(%d, %d) out(%d) "
      "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
      "write-amplify(%.1f) %s, records in: %llu, records dropped: %llu\n",
      cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
      (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
          static_cast<double>(stats.micros),
      stats.bytes_written / static_cast<double>(stats.micros),
      compact_->compaction->output_level(),
      stats.num_input_files_in_non_output_levels,
      stats.num_input_files_in_output_level,
      stats.num_output_files,
      stats.bytes_read_non_output_levels / 1048576.0,
      stats.bytes_read_output_level / 1048576.0,
      stats.bytes_written / 1048576.0,
      (stats.bytes_written + stats.bytes_read_output_level + stats.bytes_read_non_output_levels) /
          bytes_read_non_output_levels,
      stats.bytes_written / bytes_read_non_output_levels,
      status.ToString().c_str(), stats.num_input_records,
      stats.num_dropped_records);

  UpdateCompactionJobStats(stats);

  // Structured event log entry mirroring the text summary above.
  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_id_
         << "event" << "compaction_finished"
         << "compaction_time_micros" << compaction_stats_.micros
         << "output_level" << compact_->compaction->output_level()
         << "num_output_files" << compact_->NumOutputFiles()
         << "total_output_size" << compact_->total_bytes
         << "num_input_records" << compact_->num_input_records
         << "num_output_records" << compact_->num_output_records
         << "num_subcompactions" << compact_->sub_compact_states.size();

  // File I/O timings are only collected when measure_io_stats_ was enabled
  // (see ProcessKeyValueCompaction, which snapshots the IOSTATS counters).
  if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
    stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
    stream << "file_range_sync_nanos"
           << compaction_job_stats_->file_range_sync_nanos;
    stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
    stream << "file_prepare_write_nanos"
           << compaction_job_stats_->file_prepare_write_nanos;
  }

  // Snapshot of the LSM shape (file count per level) after the compaction.
  stream << "lsm_state";
  stream.StartArray();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  CleanupCompaction();
  return status;
}
578 | | |
579 | | void CompactionJob::ProcessKeyValueCompaction( |
580 | 11.4k | FileNumbersHolder* holder, SubcompactionState* sub_compact) { |
581 | 11.4k | assert(sub_compact != nullptr); |
582 | 0 | std::unique_ptr<InternalIterator> input( |
583 | 11.4k | versions_->MakeInputIterator(sub_compact->compaction)); |
584 | | |
585 | | // I/O measurement variables |
586 | 11.4k | PerfLevel prev_perf_level = PerfLevel::kEnableTime; |
587 | 11.4k | const uint64_t kRecordStatsEvery = 1000; |
588 | 11.4k | uint64_t prev_write_nanos = 0; |
589 | 11.4k | uint64_t prev_fsync_nanos = 0; |
590 | 11.4k | uint64_t prev_range_sync_nanos = 0; |
591 | 11.4k | uint64_t prev_prepare_write_nanos = 0; |
592 | 11.4k | if (measure_io_stats_) { |
593 | 48 | prev_perf_level = GetPerfLevel(); |
594 | 48 | SetPerfLevel(PerfLevel::kEnableTime); |
595 | 48 | prev_write_nanos = IOSTATS(write_nanos); |
596 | 48 | prev_fsync_nanos = IOSTATS(fsync_nanos); |
597 | 48 | prev_range_sync_nanos = IOSTATS(range_sync_nanos); |
598 | 48 | prev_prepare_write_nanos = IOSTATS(prepare_write_nanos); |
599 | 48 | } |
600 | | |
601 | 11.4k | ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); |
602 | 11.4k | auto compaction_filter = cfd->ioptions()->compaction_filter; |
603 | 11.4k | std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr; |
604 | 11.4k | if (compaction_filter == nullptr) { |
605 | 11.4k | compaction_filter_from_factory = |
606 | 11.4k | sub_compact->compaction->CreateCompactionFilter(); |
607 | 11.4k | compaction_filter = compaction_filter_from_factory.get(); |
608 | 11.4k | } |
609 | | |
610 | 11.4k | if (compaction_filter) { |
611 | | // This is used to persist the history cutoff hybrid time chosen for the DocDB compaction |
612 | | // filter. |
613 | 3.15k | largest_user_frontier_ = compaction_filter->GetLargestUserFrontier(); |
614 | 3.15k | } |
615 | | |
616 | 11.4k | MergeHelper merge( |
617 | 11.4k | env_, cfd->user_comparator(), cfd->ioptions()->merge_operator, |
618 | 11.4k | compaction_filter, db_options_.info_log.get(), |
619 | 11.4k | cfd->ioptions()->min_partial_merge_operands, |
620 | 11.4k | false /* internal key corruption is expected */, |
621 | 11.4k | existing_snapshots_.empty() ? 011.0k : existing_snapshots_.back()361 , |
622 | 11.4k | compact_->compaction->level(), db_options_.statistics.get()); |
623 | | |
624 | 11.4k | TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); |
625 | | |
626 | 11.4k | Slice* start = sub_compact->start; |
627 | 11.4k | Slice* end = sub_compact->end; |
628 | 11.4k | if (start != nullptr) { |
629 | 8 | IterKey start_iter; |
630 | 8 | start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); |
631 | 8 | input->Seek(start_iter.GetKey()); |
632 | 11.4k | } else { |
633 | 11.4k | input->SeekToFirst(); |
634 | 11.4k | } |
635 | | |
636 | 11.4k | Status status; |
637 | 11.4k | sub_compact->c_iter.reset(new CompactionIterator( |
638 | 11.4k | input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), |
639 | 11.4k | &existing_snapshots_, earliest_write_conflict_snapshot_, false, |
640 | 11.4k | sub_compact->compaction, compaction_filter)); |
641 | 11.4k | auto c_iter = sub_compact->c_iter.get(); |
642 | 11.4k | c_iter->SeekToFirst(); |
643 | 11.4k | const auto& c_iter_stats = c_iter->iter_stats(); |
644 | | // TODO(noetzli): check whether we could check !shutting_down_->... only |
645 | | // only occasionally (see diff D42687) |
646 | 47.3M | while (status.ok()47.3M && !shutting_down_->load(std::memory_order_acquire) && |
647 | 47.3M | !cfd->IsDropped() && c_iter->Valid()47.3M ) { |
648 | | // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() |
649 | | // returns true. |
650 | 47.3M | const Slice& key = c_iter->key(); |
651 | 47.3M | const Slice& value = c_iter->value(); |
652 | | |
653 | | // If an end key (exclusive) is specified, check if the current key is |
654 | | // >= than it and exit if it is because the iterator is out of its range |
655 | 47.3M | if (end != nullptr && |
656 | 47.3M | cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0157k ) { |
657 | 8 | break; |
658 | 47.3M | } else if (sub_compact->compaction->ShouldStopBefore(key) && |
659 | 47.3M | sub_compact->builder != nullptr1.07k ) { |
660 | 1.03k | status = FinishCompactionOutputFile(input->status(), sub_compact); |
661 | 1.03k | if (!status.ok()) { |
662 | 0 | break; |
663 | 0 | } |
664 | 1.03k | } |
665 | | |
666 | 47.3M | if (c_iter_stats.num_input_records % kRecordStatsEvery == |
667 | 47.3M | kRecordStatsEvery - 1) { |
668 | 44.8k | RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); |
669 | 44.8k | c_iter->ResetRecordCounts(); |
670 | 44.8k | RecordCompactionIOStats(); |
671 | 44.8k | } |
672 | | |
673 | | // Open output file if necessary |
674 | 47.3M | if (sub_compact->builder == nullptr) { |
675 | 22.9k | status = OpenCompactionOutputFile(holder, sub_compact); |
676 | 22.9k | if (!status.ok()) { |
677 | 14 | break; |
678 | 14 | } |
679 | 22.9k | } |
680 | 47.3M | assert(sub_compact->builder != nullptr); |
681 | 0 | assert(sub_compact->current_output() != nullptr); |
682 | 0 | sub_compact->builder->Add(key, value); |
683 | 47.3M | auto boundaries = MakeFileBoundaryValues(db_options_.boundary_extractor.get(), |
684 | 47.3M | key, |
685 | 47.3M | value); |
686 | 47.3M | if (!boundaries) { |
687 | 0 | status = std::move(boundaries.status()); |
688 | 0 | break; |
689 | 0 | } |
690 | 47.3M | auto& boundary_values = *boundaries; |
691 | 47.3M | sub_compact->current_output()->meta.UpdateBoundaries(std::move(boundary_values.key), |
692 | 47.3M | boundary_values); |
693 | 47.3M | sub_compact->num_output_records++; |
694 | | |
695 | | // Close output file if it is big enough |
696 | | // TODO(aekmekji): determine if file should be closed earlier than this |
697 | | // during subcompactions (i.e. if output size, estimated by input size, is |
698 | | // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB |
699 | | // and 0.6MB instead of 1MB and 0.2MB) |
700 | 47.3M | if (sub_compact->builder->TotalFileSize() >= |
701 | 47.3M | sub_compact->compaction->max_output_file_size()) { |
702 | 11.5k | status = FinishCompactionOutputFile(input->status(), sub_compact); |
703 | 11.5k | } |
704 | | |
705 | 47.3M | c_iter->Next(); |
706 | 47.3M | } |
707 | | |
708 | 11.4k | sub_compact->num_input_records = c_iter_stats.num_input_records; |
709 | 11.4k | sub_compact->compaction_job_stats.num_input_deletion_records = |
710 | 11.4k | c_iter_stats.num_input_deletion_records; |
711 | 11.4k | sub_compact->compaction_job_stats.num_corrupt_keys = |
712 | 11.4k | c_iter_stats.num_input_corrupt_records; |
713 | 11.4k | sub_compact->compaction_job_stats.total_input_raw_key_bytes += |
714 | 11.4k | c_iter_stats.total_input_raw_key_bytes; |
715 | 11.4k | sub_compact->compaction_job_stats.total_input_raw_value_bytes += |
716 | 11.4k | c_iter_stats.total_input_raw_value_bytes; |
717 | | |
718 | 11.4k | RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); |
719 | 11.4k | RecordCompactionIOStats(); |
720 | | |
721 | 11.4k | if (status.ok() && |
722 | 11.4k | (11.3k shutting_down_->load(std::memory_order_acquire)11.3k || cfd->IsDropped()11.3k )) { |
723 | 54 | status = STATUS(ShutdownInProgress, |
724 | 54 | "Database shutdown or Column family drop during compaction"); |
725 | 54 | } |
726 | 11.4k | if (status.ok() && sub_compact->builder != nullptr11.3k ) { |
727 | 10.3k | status = FinishCompactionOutputFile(input->status(), sub_compact); |
728 | 10.3k | } |
729 | 11.4k | if (status.ok()) { |
730 | 11.3k | status = input->status(); |
731 | 11.3k | } |
732 | | |
733 | 11.4k | if (measure_io_stats_) { |
734 | 48 | sub_compact->compaction_job_stats.file_write_nanos += |
735 | 48 | IOSTATS(write_nanos) - prev_write_nanos; |
736 | 48 | sub_compact->compaction_job_stats.file_fsync_nanos += |
737 | 48 | IOSTATS(fsync_nanos) - prev_fsync_nanos; |
738 | 48 | sub_compact->compaction_job_stats.file_range_sync_nanos += |
739 | 48 | IOSTATS(range_sync_nanos) - prev_range_sync_nanos; |
740 | 48 | sub_compact->compaction_job_stats.file_prepare_write_nanos += |
741 | 48 | IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos; |
742 | 48 | if (prev_perf_level != PerfLevel::kEnableTime) { |
743 | 48 | SetPerfLevel(prev_perf_level); |
744 | 48 | } |
745 | 48 | } |
746 | | |
747 | 11.4k | sub_compact->c_iter.reset(); |
748 | 11.4k | input.reset(); |
749 | 11.4k | sub_compact->status = status; |
750 | 11.4k | if (compaction_filter) { |
751 | 3.14k | compaction_filter->CompactionFinished(); |
752 | 3.14k | } |
753 | 11.4k | } |
754 | | |
755 | | void CompactionJob::RecordDroppedKeys( |
756 | | const CompactionIteratorStats& c_iter_stats, |
757 | 56.3k | CompactionJobStats* compaction_job_stats) { |
758 | 56.3k | if (c_iter_stats.num_record_drop_user > 0) { |
759 | 599 | RecordTick(stats_, COMPACTION_KEY_DROP_USER, |
760 | 599 | c_iter_stats.num_record_drop_user); |
761 | 599 | } |
762 | 56.3k | if (c_iter_stats.num_record_drop_hidden > 0) { |
763 | 10.5k | RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, |
764 | 10.5k | c_iter_stats.num_record_drop_hidden); |
765 | 10.5k | if (compaction_job_stats) { |
766 | 10.5k | compaction_job_stats->num_records_replaced += |
767 | 10.5k | c_iter_stats.num_record_drop_hidden; |
768 | 10.5k | } |
769 | 10.5k | } |
770 | 56.3k | if (c_iter_stats.num_record_drop_obsolete > 0) { |
771 | 3.57k | RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, |
772 | 3.57k | c_iter_stats.num_record_drop_obsolete); |
773 | 3.57k | if (compaction_job_stats) { |
774 | 3.57k | compaction_job_stats->num_expired_deletion_records += |
775 | 3.57k | c_iter_stats.num_record_drop_obsolete; |
776 | 3.57k | } |
777 | 3.57k | } |
778 | 56.3k | } |
779 | | |
780 | 45.5k | void CompactionJob::CloseFile(Status* status, std::unique_ptr<WritableFileWriter>* writer) { |
781 | 45.5k | if (status->ok() && !db_options_.disableDataSync45.5k ) { |
782 | 43.2k | *status = (*writer)->Sync(db_options_.use_fsync); |
783 | 43.2k | } |
784 | 45.5k | if (status->ok()) { |
785 | 45.5k | *status = (*writer)->Close(); |
786 | 45.5k | } |
787 | 45.5k | writer->reset(); |
788 | | |
789 | 45.5k | } |
790 | | |
// Finalizes the current output table of |sub_compact|: finishes (or abandons)
// the table builder, records file sizes in the output's metadata, syncs and
// closes the output file(s), sanity-checks the produced table by re-opening
// it through the table cache, and reports the new file to the
// SstFileManager. Returns the first error encountered, starting from
// |input_status| (the status of the compaction input iterator).
Status CompactionJob::FinishCompactionOutputFile(
    const Status& input_status, SubcompactionState* sub_compact) {
  assert(sub_compact != nullptr);
  assert(sub_compact->base_outfile);
  const bool is_split_sst = sub_compact->compaction->column_family_data()->ioptions()
      ->table_factory->IsSplitSstForWriteSupported();
  // A separate data outfile must be present exactly when the table factory
  // writes split SSTs (base + data file).
  assert((sub_compact->data_outfile != nullptr) == is_split_sst);
  assert(sub_compact->builder != nullptr);
  assert(sub_compact->current_output() != nullptr);

  uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
  assert(output_number != 0);

  TableProperties table_properties;
  // Check for iterator errors
  Status s = input_status;
  auto meta = &sub_compact->current_output()->meta;
  const uint64_t current_entries = sub_compact->builder->NumEntries();
  meta->marked_for_compaction = sub_compact->builder->NeedCompact();
  if (s.ok()) {
    s = sub_compact->builder->Finish();
  } else {
    // The input already failed; drop the partially built table.
    sub_compact->builder->Abandon();
  }

  const uint64_t current_total_bytes = sub_compact->builder->TotalFileSize();
  meta->fd.total_file_size = current_total_bytes;
  meta->fd.base_file_size = sub_compact->builder->BaseFileSize();
  sub_compact->current_output()->finished = true;
  sub_compact->total_bytes += current_total_bytes;

  // Finish and check for file errors
  if (sub_compact->data_outfile) {
    CloseFile(&s, &sub_compact->data_outfile);
  }
  CloseFile(&s, &sub_compact->base_outfile);

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
    InternalIterator* iter = cfd->table_cache()->NewIterator(
        ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd, meta->UserFilter(),
        nullptr, cfd->internal_stats()->GetFileReadHist(
                     compact_->compaction->output_level()),
        false);
    s = iter->status();

    if (s.ok() && paranoid_file_checks_) {
      // Read every key back to surface corruption early (expensive; opt-in
      // via paranoid_file_checks).
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
      s = iter->status();
    }

    delete iter;
    if (s.ok()) {
      auto tp = sub_compact->builder->GetTableProperties();
      sub_compact->current_output()->table_properties =
          std::make_shared<TableProperties>(tp);
      TableFileCreationInfo info(std::move(tp));
      info.db_name = dbname_;
      info.cf_name = cfd->GetName();
      info.file_path =
          TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
                        meta->fd.GetPathId());
      info.file_size = meta->fd.GetTotalFileSize();
      info.job_id = job_id_;
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
          " keys, %" PRIu64 " bytes%s %s",
          cfd->GetName().c_str(), job_id_, output_number, current_entries,
          current_total_bytes,
          meta->marked_for_compaction ? " (need compaction)" : "",
          meta->FrontiersToString().c_str());
      EventHelpers::LogAndNotifyTableFileCreation(
          event_logger_, cfd->ioptions()->listeners, meta->fd, info);
    }
  }

  // Report new file to SstFileManagerImpl
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
  if (sfm && meta->fd.GetPathId() == 0) {
    ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
    auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
                            meta->fd.GetPathId());
    RETURN_NOT_OK(sfm->OnAddFile(fn));
    if (is_split_sst) {
      RETURN_NOT_OK(sfm->OnAddFile(TableBaseToDataFileName(fn)));
    }
    if (sfm->IsMaxAllowedSpaceReached()) {
      // Exceeding the space budget is surfaced as a background error so the
      // DB stops further background work.
      InstrumentedMutexLock l(db_mutex_);
      if (db_bg_error_->ok()) {
        s = STATUS(IOError, "Max allowed space was reached");
        *db_bg_error_ = s;
        TEST_SYNC_POINT(
            "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
      }
    }
  }

  sub_compact->builder.reset();
  return s;
}
893 | | |
// Applies the results of a finished compaction to the current version:
// verifies input-file consistency, records input deletions and output file
// additions in the compaction's VersionEdit, and commits the edit via
// VersionSet::LogAndApply. Caller must hold db_mutex_.
Status CompactionJob::InstallCompactionResults(
    const MutableCFOptions& mutable_cf_options) {
  db_mutex_->AssertHeld();

  auto* compaction = compact_->compaction;
  // paranoia: verify that the files that we started with
  // still exist in the current version and in the same original level.
  // This ensures that a concurrent compaction did not erroneously
  // pick the same files to compact_.
  if (!versions_->VerifyCompactionFileConsistency(compaction)) {
    Compaction::InputLevelSummaryBuffer inputs_summary;

    RLOG(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compaction %s aborted",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary));
    return STATUS(Corruption, "Compaction input files inconsistent");
  }

  {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
  }

  // Add compaction outputs
  compaction->AddInputDeletions(compaction->edit());

  for (const auto& sub_compact : compact_->sub_compact_states) {
    for (const auto& out : sub_compact.outputs) {
      compaction->edit()->AddFile(compaction->output_level(), out.meta);
    }
  }
  if (largest_user_frontier_) {
    // Persist the largest user frontier observed during this compaction
    // together with the version change.
    compaction->edit()->UpdateFlushedFrontier(largest_user_frontier_);
  }
  return versions_->LogAndApply(compaction->column_family_data(),
                                mutable_cf_options, compaction->edit(),
                                db_mutex_, db_directory_);
}
936 | | |
// Moves the thread-local compaction IO byte counters into the global tick
// counters and resets them, so each call records only the delta since the
// previous one.
void CompactionJob::RecordCompactionIOStats() {
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
  IOSTATS_RESET(bytes_read);
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}
943 | | |
944 | | Status CompactionJob::OpenFile(const std::string table_name, uint64_t file_number, |
945 | | const std::string file_type_label, const std::string fname, |
946 | 45.9k | std::unique_ptr<WritableFile>* writable_file) { |
947 | 45.9k | Status s = NewWritableFile(env_, fname, writable_file, env_options_); |
948 | 45.9k | if (!s.ok()) { |
949 | 14 | RLOG(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, |
950 | 14 | "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64 |
951 | 14 | " fails at NewWritableFile for %s file with status %s", table_name.c_str(), |
952 | 14 | job_id_, file_number, file_type_label.c_str(), s.ToString().c_str()); |
953 | 14 | LogFlush(db_options_.info_log); |
954 | 14 | } |
955 | 45.9k | return s; |
956 | 45.9k | } |
957 | | |
// Opens the next output file (or base+data file pair for split SSTs) for
// |sub_compact|: allocates a file number from |holder|, opens the writable
// file(s), seeds the new output's metadata with boundaries taken from all
// compaction input files, wires up buffered writers, and installs a fresh
// TableBuilder on the subcompaction. Returns non-OK if a file cannot be
// opened.
Status CompactionJob::OpenCompactionOutputFile(
    FileNumbersHolder* holder, SubcompactionState* sub_compact) {
  assert(sub_compact != nullptr);
  assert(sub_compact->builder == nullptr);
  FileNumber file_number = file_numbers_provider_->NewFileNumber(holder);

  // Make the output file
  unique_ptr<WritableFile> base_writable_file;
  unique_ptr<WritableFile> data_writable_file;
  const std::string base_fname = TableFileName(db_options_.db_paths, file_number,
                                               sub_compact->compaction->output_path_id());
  const std::string data_fname = TableBaseToDataFileName(base_fname);
  const std::string table_name = sub_compact->compaction->column_family_data()->GetName();
  RETURN_NOT_OK(OpenFile(table_name, file_number, "base", base_fname, &base_writable_file));
  RETURN_NOT_OK(OpenFile(table_name, file_number, "data", data_fname, &data_writable_file));

  SubcompactionState::Output out;
  out.meta.fd =
      FileDescriptor(file_number, sub_compact->compaction->output_path_id(), 0, 0);
  // Update sequence number boundaries for out.
  for (size_t level_idx = 0; level_idx < compact_->compaction->num_input_levels(); level_idx++) {
    for (FileMetaData* fmd : *compact_->compaction->inputs(level_idx)) {
      out.meta.UpdateBoundariesExceptKey(fmd->smallest, UpdateBoundariesType::kSmallest);
      out.meta.UpdateBoundariesExceptKey(fmd->largest, UpdateBoundariesType::kLargest);
    }
  }
  out.finished = false;

  sub_compact->outputs.push_back(out);

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();

  {
    // Shared setup for the base and data writers: low IO priority, optional
    // preallocation, and a WritableFileWriter that takes ownership of the
    // raw file and honors the compaction's suspender.
    auto setup_outfile = [this, sub_compact] (
        size_t preallocation_block_size, std::unique_ptr<WritableFile>* writable_file,
        std::unique_ptr<WritableFileWriter>* writer) {
      (*writable_file)->SetIOPriority(Env::IO_LOW);
      if (preallocation_block_size > 0) {
        (*writable_file)->SetPreallocationBlockSize(preallocation_block_size);
      }
      writer->reset(new WritableFileWriter(
          std::move(*writable_file), env_options_, sub_compact->compaction->suspender()));
    };

    const bool is_split_sst = cfd->ioptions()->table_factory->IsSplitSstForWriteSupported();
    const size_t preallocation_data_block_size = static_cast<size_t>(
        sub_compact->compaction->OutputFilePreallocationSize());
    // if we don't have separate data file - preallocate size for base file
    setup_outfile(
        is_split_sst ? 0 : preallocation_data_block_size, &base_writable_file,
        &sub_compact->base_outfile);
    if (is_split_sst) {
      setup_outfile(preallocation_data_block_size, &data_writable_file, &sub_compact->data_outfile);
    }
  }

  // If the Column family flag is to only optimize filters for hits,
  // we can skip creating filters if this is the bottommost_level where
  // data is going to be found
  bool skip_filters =
      cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
  sub_compact->builder.reset(NewTableBuilder(
      *cfd->ioptions(), cfd->internal_comparator(),
      cfd->int_tbl_prop_collector_factories(), cfd->GetID(),
      sub_compact->base_outfile.get(), sub_compact->data_outfile.get(),
      sub_compact->compaction->output_compression(), cfd->ioptions()->compression_opts,
      skip_filters));
  LogFlush(db_options_.info_log);
  return Status::OK();
}
1028 | | |
// Releases all state held by compact_ after a compaction attempt. Abandons
// any table builder still open (e.g. after a shutdown mid-compaction),
// sanity-checks that successful subcompactions released their output files,
// evicts uncommitted outputs from the table cache, and deletes compact_.
void CompactionJob::CleanupCompaction() {
  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
    const auto& sub_status = sub_compact.status;

    if (sub_compact.builder != nullptr) {
      // May happen if we get a shutdown call in the middle of compaction
      sub_compact.builder->Abandon();
      sub_compact.builder.reset();
    } else if (sub_status.ok() &&
               (sub_compact.base_outfile != nullptr || sub_compact.data_outfile != nullptr)) {
      // A successful subcompaction must have closed and released its output
      // files; log at FATAL level (and assert in debug builds) if it did not.
      std::string log_message;
      log_message.append("sub_status.ok(), but: sub_compact.base_outfile ");
      log_message.append(sub_compact.base_outfile == nullptr ? "==" : "!=");
      log_message.append(" nullptr, sub_compact.data_outfile ");
      log_message.append(sub_compact.data_outfile == nullptr ? "==" : "!=");
      log_message.append(" nullptr");
      RLOG(InfoLogLevel::FATAL_LEVEL, db_options_.info_log, log_message.c_str());
      assert(!"If sub_status is OK, sub_compact.*_outfile should be nullptr");
    }
    for (const auto& out : sub_compact.outputs) {
      // If this file was inserted into the table cache then remove
      // them here because this compaction was not committed.
      if (!sub_status.ok()) {
        TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
      }
    }
  }
  delete compact_;
  compact_ = nullptr;
}
1059 | | |
1060 | | #ifndef ROCKSDB_LITE |
1061 | | namespace { |
1062 | | void CopyPrefix( |
1063 | 22.3k | const Slice& src, size_t prefix_length, std::string* dst) { |
1064 | 22.3k | assert(prefix_length > 0); |
1065 | 22.3k | size_t length = src.size() > prefix_length ? prefix_length13.4k : src.size()8.92k ; |
1066 | 22.3k | dst->assign(src.cdata(), length); |
1067 | 22.3k | } |
1068 | | } // namespace |
1069 | | |
1070 | | #endif // !ROCKSDB_LITE |
1071 | | |
1072 | 11.3k | void CompactionJob::UpdateCompactionStats() { |
1073 | 11.3k | Compaction* compaction = compact_->compaction; |
1074 | 11.3k | compaction_stats_.num_input_files_in_non_output_levels = 0; |
1075 | 11.3k | compaction_stats_.num_input_files_in_output_level = 0; |
1076 | 11.3k | for (int input_level = 0; |
1077 | 43.1k | input_level < static_cast<int>(compaction->num_input_levels()); |
1078 | 31.7k | ++input_level) { |
1079 | 31.7k | if (compaction->start_level() + input_level |
1080 | 31.7k | != compaction->output_level()) { |
1081 | 21.3k | UpdateCompactionInputStatsHelper( |
1082 | 21.3k | &compaction_stats_.num_input_files_in_non_output_levels, |
1083 | 21.3k | &compaction_stats_.bytes_read_non_output_levels, |
1084 | 21.3k | input_level); |
1085 | 21.3k | } else { |
1086 | 10.4k | UpdateCompactionInputStatsHelper( |
1087 | 10.4k | &compaction_stats_.num_input_files_in_output_level, |
1088 | 10.4k | &compaction_stats_.bytes_read_output_level, |
1089 | 10.4k | input_level); |
1090 | 10.4k | } |
1091 | 31.7k | } |
1092 | | |
1093 | 11.4k | for (const auto& sub_compact : compact_->sub_compact_states) { |
1094 | 11.4k | size_t num_output_files = sub_compact.outputs.size(); |
1095 | 11.4k | if (sub_compact.builder != nullptr) { |
1096 | | // An error occurred so ignore the last output. |
1097 | 31 | assert(num_output_files > 0); |
1098 | 0 | --num_output_files; |
1099 | 31 | } |
1100 | 0 | compaction_stats_.num_output_files += static_cast<int>(num_output_files); |
1101 | | |
1102 | 22.9k | for (const auto& out : sub_compact.outputs) { |
1103 | 22.9k | compaction_stats_.bytes_written += out.meta.fd.total_file_size; |
1104 | 22.9k | } |
1105 | 11.4k | if (sub_compact.num_input_records > sub_compact.num_output_records) { |
1106 | 5.07k | compaction_stats_.num_dropped_records += |
1107 | 5.07k | sub_compact.num_input_records - sub_compact.num_output_records; |
1108 | 5.07k | } |
1109 | 11.4k | } |
1110 | 11.3k | } |
1111 | | |
1112 | | void CompactionJob::UpdateCompactionInputStatsHelper( |
1113 | 31.7k | int* num_files, uint64_t* bytes_read, int input_level) { |
1114 | 31.7k | const Compaction* compaction = compact_->compaction; |
1115 | 31.7k | auto num_input_files = compaction->num_input_files(input_level); |
1116 | 31.7k | *num_files += static_cast<int>(num_input_files); |
1117 | | |
1118 | 79.9k | for (size_t i = 0; i < num_input_files; ++i48.1k ) { |
1119 | 48.1k | const auto* file_meta = compaction->input(input_level, i); |
1120 | 48.1k | *bytes_read += file_meta->fd.GetTotalFileSize(); |
1121 | 48.1k | compaction_stats_.num_input_records += |
1122 | 48.1k | static_cast<uint64_t>(file_meta->num_entries); |
1123 | 48.1k | } |
1124 | 31.7k | } |
1125 | | |
// Copies the aggregated internal compaction stats into the externally
// visible CompactionJobStats object (when one was supplied): elapsed time,
// input/output byte, record and file counts, and truncated prefixes of the
// smallest/largest output keys. No-op in ROCKSDB_LITE builds.
void CompactionJob::UpdateCompactionJobStats(
    const InternalStats::CompactionStats& stats) const {
#ifndef ROCKSDB_LITE
  if (compaction_job_stats_) {
    compaction_job_stats_->elapsed_micros = stats.micros;

    // input information
    compaction_job_stats_->total_input_bytes =
        stats.bytes_read_non_output_levels +
        stats.bytes_read_output_level;
    compaction_job_stats_->num_input_records =
        compact_->num_input_records;
    compaction_job_stats_->num_input_files =
        stats.num_input_files_in_non_output_levels +
        stats.num_input_files_in_output_level;
    compaction_job_stats_->num_input_files_at_output_level =
        stats.num_input_files_in_output_level;

    // output information
    compaction_job_stats_->total_output_bytes = stats.bytes_written;
    compaction_job_stats_->num_output_records =
        compact_->num_output_records;
    compaction_job_stats_->num_output_files = stats.num_output_files;

    if (compact_->NumOutputFiles() > 0U) {
      // Record truncated prefixes of the output key range for reporting.
      CopyPrefix(
          compact_->SmallestUserKey(),
          CompactionJobStats::kMaxPrefixLength,
          &compaction_job_stats_->smallest_output_key_prefix);
      CopyPrefix(
          compact_->LargestUserKey(),
          CompactionJobStats::kMaxPrefixLength,
          &compaction_job_stats_->largest_output_key_prefix);
    }
  }
#endif  // !ROCKSDB_LITE
}
1163 | | |
// Logs the start of a compaction: an INFO-level summary of the input levels
// and score, plus a structured "compaction_started" event (input file
// numbers per level, score, total input size) to the event logger.
void CompactionJob::LogCompaction() {
  Compaction* compaction = compact_->compaction;
  ColumnFamilyData* cfd = compaction->column_family_data();

  // Let's check if anything will get logged. Don't prepare all the info if
  // we're not logging
  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
        job_id_, compaction->InputLevelSummary(&inputs_summary),
        compaction->score());
    char scratch[2345];
    compaction->Summary(scratch, sizeof(scratch));
    RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
    // build event logger report
    auto stream = event_logger_->Log();
    stream << "job" << job_id_ << "event"
           << "compaction_started";
    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
      // One JSON array of input file numbers per level, keyed "files_L<n>".
      stream << ("files_L" + ToString(compaction->level(i)));
      stream.StartArray();
      for (auto f : *compaction->inputs(i)) {
        stream << f->fd.GetNumber();
      }
      stream.EndArray();
    }
    stream << "score" << compaction->score() << "input_data_size"
           << compaction->CalculateTotalInputSize();
  }
}
1196 | | |
1197 | | } // namespace rocksdb |