/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/version_set.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/db/version_set.h" |
25 | | #include <memory> |
26 | | |
27 | | #ifndef __STDC_FORMAT_MACROS |
28 | | #define __STDC_FORMAT_MACROS |
29 | | #endif |
30 | | |
31 | | #include <inttypes.h> |
32 | | #include <stdio.h> |
33 | | #include <algorithm> |
34 | | #include <map> |
35 | | #include <set> |
36 | | #include <climits> |
37 | | #include <unordered_map> |
38 | | #include <vector> |
39 | | #include <string> |
40 | | |
41 | | #include <glog/logging.h> |
42 | | #include <boost/container/small_vector.hpp> |
43 | | |
44 | | #include "yb/gutil/casts.h" |
45 | | |
46 | | #include "yb/util/format.h" |
47 | | #include "yb/util/status_format.h" |
48 | | |
49 | | #include "yb/rocksdb/db/filename.h" |
50 | | #include "yb/rocksdb/db/file_numbers.h" |
51 | | #include "yb/rocksdb/db/internal_stats.h" |
52 | | #include "yb/rocksdb/db/log_reader.h" |
53 | | #include "yb/rocksdb/db/log_writer.h" |
54 | | #include "yb/rocksdb/db/memtable.h" |
55 | | #include "yb/rocksdb/db/merge_context.h" |
56 | | #include "yb/rocksdb/db/table_cache.h" |
57 | | #include "yb/rocksdb/db/compaction.h" |
58 | | #include "yb/rocksdb/db/version_builder.h" |
59 | | #include "yb/rocksdb/db/writebuffer.h" |
60 | | #include "yb/rocksdb/env.h" |
61 | | #include "yb/rocksdb/merge_operator.h" |
62 | | #include "yb/rocksdb/table/internal_iterator.h" |
63 | | #include "yb/rocksdb/table/iterator_wrapper.h" |
64 | | #include "yb/rocksdb/table/table_reader.h" |
65 | | #include "yb/rocksdb/table/merger.h" |
66 | | #include "yb/rocksdb/table/two_level_iterator.h" |
67 | | #include "yb/rocksdb/table/format.h" |
68 | | #include "yb/rocksdb/table/meta_blocks.h" |
69 | | #include "yb/rocksdb/table/get_context.h" |
70 | | |
71 | | #include "yb/rocksdb/util/coding.h" |
72 | | #include "yb/rocksdb/util/file_reader_writer.h" |
73 | | #include "yb/rocksdb/util/logging.h" |
74 | | #include "yb/rocksdb/util/statistics.h" |
75 | | #include "yb/rocksdb/util/stop_watch.h" |
76 | | #include "yb/rocksdb/util/sync_point.h" |
77 | | |
78 | | namespace rocksdb { |
79 | | |
80 | | namespace { |
81 | | |
82 | | // Find File in LevelFilesBrief data structure |
83 | | // Within an index range defined by left and right |
84 | | int FindFileInRange(const InternalKeyComparator& icmp, |
85 | | const LevelFilesBrief& file_level, |
86 | | const Slice& key, |
87 | | uint32_t left, |
88 | 3.02M | uint32_t right) { |
89 | 7.83M | while (left < right) { |
90 | 4.80M | uint32_t mid = (left + right) / 2; |
91 | 4.80M | const FdWithBoundaries& f = file_level.files[mid]; |
92 | 4.80M | if (icmp.InternalKeyComparator::Compare(f.largest.key, key) < 0) { |
93 | | // Key at "mid.largest" is < "target". Therefore all |
94 | | // files at or before "mid" are uninteresting. |
95 | 1.88M | left = mid + 1; |
96 | 2.92M | } else { |
97 | | // Key at "mid.largest" is >= "target". Therefore all files |
98 | | // after "mid" are uninteresting. |
99 | 2.92M | right = mid; |
100 | 2.92M | } |
101 | 4.80M | } |
102 | 3.02M | return right; |
103 | 3.02M | } |
104 | | |
// Class to help choose the next file to search for the particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
class FilePicker {
 public:
  // files:          per-level raw file metadata; used only by the
  //                 NDEBUG sanity checks below.
  // user_key/ikey:  lookup key in user-key and internal-key form.
  // file_levels:    flattened per-level file lists (LevelFilesBrief).
  // file_indexer:   fractional-cascading index used to narrow the
  //                 binary-search bounds on the next level.
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
      // curr_level_ / hit_file_level_ start at (unsigned)-1; the first
      // PrepareNextLevel() increments curr_level_ to 0.
      : num_levels_(num_levels),
        curr_level_(-1),
        hit_file_level_(-1),
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
        files_(files),
#endif
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache miss if possible.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

  // Returns the next candidate file that may contain user_key_, walking
  // L0 files newest-to-oldest and deeper levels via binary search, or
  // nullptr once all levels are exhausted.
  FdWithBoundaries* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithBoundaries* f = &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        // -1 means "largest key not compared yet" for this file.
        int cmp_largest = -1;

        // Do key range filtering of files or/and fractional cascading if:
        // (1) not all the files are in level 0, or
        // (2) there are more than 3 Level 0 files
        // If there are only 3 or less level 0 files in the system, we skip
        // the key range filtering. In this case, more likely, the system is
        // highly tuned to minimize number of tables queried by each query,
        // so it is unlikely that key range filtering is more efficient than
        // querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If search left bound and
          // right bound point to the same find, we are sure key falls in
          // range.
          assert(
              curr_level_ == 0 ||
              curr_index_in_curr_level_ == start_index_in_curr_level_ ||
              user_comparator_->Compare(user_key_, f->smallest.user_key()) <= 0);

          int cmp_smallest = user_comparator_->Compare(user_key_, f->smallest.user_key());
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->Compare(user_key_, f->largest.user_key());
          }

          // Setup file search bound for the next level based on the
          // comparison results
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(curr_level_,
                                             curr_index_in_curr_level_,
                                             cmp_smallest, cmp_largest,
                                             &search_left_bound_,
                                             &search_right_bound_);
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              // L0 files may overlap arbitrarily: try the next L0 file.
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Levels >= 1 are sorted and disjoint: no later file on
              // this level can contain the key. Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest.key, f->smallest.key);
            assert(comp_sign < 0);
          } else {
            // level == 0, the current file cannot be newer than the previous
            // one. Use compressed data structure, has no attribute seqNo
            assert(curr_index_in_curr_level_ > 0);
            assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
                                       files_[0][curr_index_in_curr_level_-1]));
          }
        }
        prev_file_ = f;
#endif
        // Advance the cursor before returning so the next call resumes
        // after this file (or at the next level when this file's range
        // already proves deeper files on this level are irrelevant).
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int hit_file_level_;
  // Bounds for the binary search on the current level, produced by the
  // FileIndexer from comparisons made on the previous level.
  int32_t search_left_bound_;
  int32_t search_right_bound_;
#ifndef NDEBUG
  std::vector<FileMetaData*>* files_;
#endif
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  // Previous file returned on the current level; reset by
  // PrepareNextLevel(). Used only for sort-order sanity checks.
  FdWithBoundaries* prev_file_;
#endif

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since current level is empty, it will need to search all files in
        // the next level
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
        if (search_left_bound_ == search_right_bound_) {
          start_index = search_left_bound_;
        } else if (search_left_bound_ < search_right_bound_) {
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
            // "Unbounded" sentinel: clamp to the last file of this level.
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
          }
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
                              static_cast<uint32_t>(search_right_bound_));
        } else {
          // search_left_bound > search_right_bound, key does not exist in
          // this level. Since no comparison is done in this level, it will
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
328 | | |
329 | 778k | SstFileMetaData::BoundaryValues ConvertBoundaryValues(const FileMetaData::BoundaryValues& source) { |
330 | 778k | SstFileMetaData::BoundaryValues result; |
331 | 778k | result.key = source.key.user_key().ToBuffer(); |
332 | 778k | result.seqno = source.seqno; |
333 | 778k | result.user_frontier = source.user_frontier; |
334 | 778k | result.user_values = source.user_values; |
335 | 778k | return result; |
336 | 778k | } |
337 | | |
338 | | } // anonymous namespace |
339 | | |
// files_ is a heap-allocated array of per-level file vectors; release it here.
VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
341 | | |
342 | 41.0M | uint64_t VersionStorageInfo::NumFiles() const { |
343 | 41.0M | uint64_t result = 0; |
344 | 55.5M | for (int level = num_non_empty_levels_; level-- > 0;) { |
345 | 14.5M | result += files_[level].size(); |
346 | 14.5M | } |
347 | 41.0M | return result; |
348 | 41.0M | } |
349 | | |
Version::~Version() {
  // Must only be destroyed once the last reference is dropped.
  assert(refs_ == 0);

  // Remove from linked list of versions maintained by the VersionSet.
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files. UnrefFile may free the metadata and
  // schedule the file for deletion once its refcount reaches zero.
  // NOTE(review): presumably requires the DB mutex, like the rest of the
  // version lifecycle — confirm at call sites.
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      vset_->UnrefFile(cfd_, f);
    }
  }
}
366 | | |
367 | | int FindFile(const InternalKeyComparator& icmp, |
368 | | const LevelFilesBrief& file_level, |
369 | 173k | const Slice& key) { |
370 | 173k | return FindFileInRange(icmp, file_level, key, 0, |
371 | 173k | static_cast<uint32_t>(file_level.num_files)); |
372 | 173k | } |
373 | | |
374 | | void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level, |
375 | | const std::vector<FileMetaData*>& files, |
376 | 302k | Arena* arena) { |
377 | 302k | assert(file_level); |
378 | 0 | assert(arena); |
379 | | |
380 | 0 | size_t num = files.size(); |
381 | 302k | file_level->num_files = num; |
382 | 302k | char* mem = arena->AllocateAligned(num * sizeof(FdWithBoundaries)); |
383 | 302k | file_level->files = reinterpret_cast<FdWithBoundaries*>(mem); |
384 | | |
385 | 1.33M | for (size_t i = 0; i < num; i++1.03M ) { |
386 | 1.03M | new (file_level->files + i) FdWithBoundaries(arena, *files[i]); |
387 | 1.03M | } |
388 | 302k | } |
389 | | |
390 | | static bool AfterFile(const Comparator* ucmp, |
391 | 14 | const Slice* user_key, const FdWithBoundaries* f) { |
392 | | // nullptr user_key occurs before all keys and is therefore never after *f |
393 | 14 | return (user_key != nullptr && |
394 | 14 | ucmp->Compare(*user_key, f->largest.user_key()) > 0); |
395 | 14 | } |
396 | | |
397 | | static bool BeforeFile(const Comparator* ucmp, |
398 | 7.63k | const Slice* user_key, const FdWithBoundaries* f) { |
399 | | // nullptr user_key occurs after all keys and is therefore never before *f |
400 | 7.63k | return (user_key != nullptr && |
401 | 7.63k | ucmp->Compare(*user_key, f->smallest.user_key()) < 06.62k ); |
402 | 7.63k | } |
403 | | |
404 | | bool SomeFileOverlapsRange( |
405 | | const InternalKeyComparator& icmp, |
406 | | bool disjoint_sorted_files, |
407 | | const LevelFilesBrief& file_level, |
408 | | const Slice* smallest_user_key, |
409 | 24.7k | const Slice* largest_user_key) { |
410 | 24.7k | const Comparator* ucmp = icmp.user_comparator(); |
411 | 24.7k | if (!disjoint_sorted_files) { |
412 | | // Need to check against all files |
413 | 16 | for (size_t i = 0; i < file_level.num_files; i++4 ) { |
414 | 14 | const FdWithBoundaries* f = file_level.files + i; |
415 | 14 | if (AfterFile(ucmp, smallest_user_key, f) || |
416 | 14 | BeforeFile(ucmp, largest_user_key, f)12 ) { |
417 | | // No overlap |
418 | 10 | } else { |
419 | 10 | return true; // Overlap |
420 | 10 | } |
421 | 14 | } |
422 | 2 | return false; |
423 | 12 | } |
424 | | |
425 | | // Binary search over file list |
426 | 24.7k | uint32_t index = 0; |
427 | 24.7k | if (smallest_user_key != nullptr) { |
428 | | // Find the earliest possible internal key for smallest_user_key |
429 | 23.5k | InternalKey small = InternalKey::MaxPossibleForUserKey(*smallest_user_key); |
430 | 23.5k | index = FindFile(icmp, file_level, small.Encode()); |
431 | 23.5k | } |
432 | | |
433 | 24.7k | if (index >= file_level.num_files) { |
434 | | // beginning of range is after all files, so no overlap. |
435 | 17.1k | return false; |
436 | 17.1k | } |
437 | | |
438 | 7.62k | return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]); |
439 | 24.7k | } |
440 | | |
441 | | namespace { |
442 | | |
// An internal iterator. For a given version/level pair, yields
// information about the files in the level. For a given entry, key()
// is the largest key that occurs in the file, and value() is an
// 16-byte value containing the file number and file size, both
// encoded using EncodeFixed64.
class LevelFileNumIterator : public InternalIterator {
 public:
  LevelFileNumIterator(const InternalKeyComparator& icmp,
                       const LevelFilesBrief* flevel)
      : icmp_(icmp),
        flevel_(flevel),
        // index_ == num_files is the "invalid" position.
        index_(static_cast<uint32_t>(flevel->num_files)),
        current_value_(0, 0, 0, 0) {  // Marks as invalid
  }

  bool Valid() const override { return index_ < flevel_->num_files; }

  // Position at the earliest file whose largest key is >= target;
  // becomes invalid if no such file exists.
  void Seek(const Slice& target) override {
    index_ = FindFile(icmp_, *flevel_, target);
  }

  void SeekToFirst() override { index_ = 0; }

  void SeekToLast() override {
    // With zero files, index 0 already equals num_files => invalid.
    index_ = (flevel_->num_files == 0)
                 ? 0
                 : static_cast<uint32_t>(flevel_->num_files) - 1;
  }

  void Next() override {
    assert(Valid());
    index_++;
  }

  void Prev() override {
    assert(Valid());
    if (index_ == 0) {
      index_ = static_cast<uint32_t>(flevel_->num_files);  // Marks as invalid
    } else {
      index_--;
    }
  }

  Slice key() const override {
    assert(Valid());
    return flevel_->files[index_].largest.key;
  }

  Slice value() const override {
    assert(Valid());

    // The returned Slice aliases current_value_, which is why the member
    // is mutable; it stays valid until the next call to value().
    auto file_meta = flevel_->files[index_];
    current_value_ = file_meta.fd;
    return Slice(reinterpret_cast<const char*>(&current_value_),
                 sizeof(FileDescriptor));
  }

  Status status() const override { return Status::OK(); }

 private:
  const InternalKeyComparator icmp_;
  const LevelFilesBrief* flevel_;
  uint32_t index_;
  // Backing storage for value(); mutated from the const accessor.
  mutable FileDescriptor current_value_;
};
508 | | |
// TwoLevelIterator state that opens a table iterator for each file
// descriptor produced by LevelFileNumIterator.
class LevelFileIteratorState : public TwoLevelIteratorState {
 public:
  // @param skip_filters Disables loading/accessing the filter block
  LevelFileIteratorState(TableCache* table_cache,
                         const ReadOptions& read_options,
                         const EnvOptions& env_options,
                         const InternalKeyComparatorPtr& icomparator,
                         HistogramImpl* file_read_hist, bool for_compaction,
                         bool prefix_enabled, bool skip_filters)
      : TwoLevelIteratorState(prefix_enabled),
        table_cache_(table_cache),
        read_options_(read_options),
        env_options_(env_options),
        icomparator_(icomparator),
        file_read_hist_(file_read_hist),
        for_compaction_(for_compaction),
        skip_filters_(skip_filters) {}

  // meta_handle is expected to be the raw bytes of a FileDescriptor, as
  // produced by LevelFileNumIterator::value().
  InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override {
    if (meta_handle.size() != sizeof(FileDescriptor)) {
      return NewErrorInternalIterator(
          STATUS(Corruption, "FileReader invoked with unexpected value"));
    } else {
      const FileDescriptor* fd =
          reinterpret_cast<const FileDescriptor*>(meta_handle.data());
      return table_cache_->NewIterator(
          read_options_, env_options_, icomparator_, *fd, Slice() /* filter */,
          nullptr /* don't need reference to table*/, file_read_hist_,
          for_compaction_, nullptr /* arena */, skip_filters_);
    }
  }

  // Conservative: never filters out a file based on prefix.
  bool PrefixMayMatch(const Slice& internal_key) override {
    return true;
  }

 private:
  TableCache* table_cache_;
  const ReadOptions read_options_;
  const EnvOptions& env_options_;
  InternalKeyComparatorPtr icomparator_;
  HistogramImpl* file_read_hist_;
  bool for_compaction_;
  bool skip_filters_;
};
554 | | |
555 | | // A wrapper of version builder which references the current version in |
556 | | // constructor and unref it in the destructor. |
557 | | // Both of the constructor and destructor need to be called inside DB Mutex. |
558 | | class BaseReferencedVersionBuilder { |
559 | | public: |
560 | | explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd) |
561 | | : version_builder_(new VersionBuilder( |
562 | | cfd->current()->version_set()->env_options(), cfd->table_cache(), |
563 | | cfd->current()->storage_info(), cfd->ioptions()->info_log)), |
564 | 773k | version_(cfd->current()) { |
565 | 773k | version_->Ref(); |
566 | 773k | } |
567 | 774k | ~BaseReferencedVersionBuilder() { |
568 | 774k | delete version_builder_; |
569 | 774k | version_->Unref(); |
570 | 774k | } |
571 | 1.24M | VersionBuilder* version_builder() { return version_builder_; } |
572 | | |
573 | | private: |
574 | | VersionBuilder* version_builder_; |
575 | | Version* version_; |
576 | | }; |
577 | | } // anonymous namespace |
578 | | |
// Loads the TableProperties of one SST file into *tp.
//
// file_meta: descriptor of the file to inspect.
// fname:     optional explicit path; when null the path is derived from
//            db_paths + file number.
//
// Strategy: first ask the table cache with I/O disallowed; if the table
// is not cached (Incomplete), read the properties block directly from
// the file. Any other error is propagated unchanged.
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) const {
  auto table_cache = cfd_->table_cache();
  auto ioptions = cfd_->ioptions();
  // 1. Cheap path: properties from the table cache, no I/O permitted.
  Status s = table_cache->GetTableProperties(
      vset_->env_options_, cfd_->internal_comparator(), file_meta->fd,
      tp, true /* no io */);
  if (s.ok()) {
    return s;
  }

  // We only ignore error type `Incomplete` since it's by design that we
  // disallow table when it's not in table cache.
  if (!s.IsIncomplete()) {
    return s;
  }

  // 2. Table is not present in table cache, we'll read the table properties
  // directly from the properties block in the file.
  std::unique_ptr<RandomAccessFile> file;
  if (fname != nullptr) {
    s = ioptions->env->NewRandomAccessFile(
        *fname, &file, vset_->env_options_);
  } else {
    s = ioptions->env->NewRandomAccessFile(
        TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId()),
        &file, vset_->env_options_);
  }
  if (!s.ok()) {
    return s;
  }

  TableProperties* raw_table_properties;
  // By setting the magic number to kInvalidTableMagicNumber, we can by
  // pass the magic number check in the footer.
  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(std::move(file)));
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetBaseFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, vset_->env_,
      ioptions->info_log, &raw_table_properties);
  if (!s.ok()) {
    return s;
  }
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);

  // Ownership of raw_table_properties transfers to the shared_ptr.
  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
  return s;
}
630 | | |
631 | 159 | Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { |
632 | 159 | Status s; |
633 | 1.26k | for (int level = 0; level < storage_info_.num_levels_; level++1.10k ) { |
634 | 1.10k | s = GetPropertiesOfAllTables(props, level); |
635 | 1.10k | if (!s.ok()) { |
636 | 0 | return s; |
637 | 0 | } |
638 | 1.10k | } |
639 | | |
640 | 159 | return Status::OK(); |
641 | 159 | } |
642 | | |
643 | | Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props, |
644 | 1.80k | int level) { |
645 | 5.83k | for (const auto& file_meta : storage_info_.files_[level]) { |
646 | 5.83k | auto fname = |
647 | 5.83k | TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(), |
648 | 5.83k | file_meta->fd.GetPathId()); |
649 | | // 1. If the table is already present in table cache, load table |
650 | | // properties from there. |
651 | 5.83k | std::shared_ptr<const TableProperties> table_properties; |
652 | 5.83k | Status s = GetTableProperties(&table_properties, file_meta, &fname); |
653 | 5.83k | if (s.ok()) { |
654 | 5.83k | props->insert({fname, table_properties}); |
655 | 5.83k | } else { |
656 | 0 | return s; |
657 | 0 | } |
658 | 5.83k | } |
659 | | |
660 | 1.80k | return Status::OK(); |
661 | 1.80k | } |
662 | | |
// Collects table properties for every file, on any non-empty level, that
// overlaps one of the n user-key ranges. Files already present in *props
// (keyed by file name) are not re-read.
Status Version::GetPropertiesOfTablesInRange(
    const Range* range, std::size_t n, TablePropertiesCollection* props) const {
  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    for (decltype(n) i = 0; i < n; i++) {
      // Convert user_key into a corresponding internal key.
      const InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
      const InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
      std::vector<FileMetaData*> files;
      storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr, false);
      for (const auto& file_meta : files) {
        auto fname =
            TableFileName(vset_->db_options_->db_paths,
                          file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
        // Skip files whose properties were already collected for an
        // earlier range or level.
        if (props->count(fname) == 0) {
          // 1. If the table is already present in table cache, load table
          // properties from there.
          std::shared_ptr<const TableProperties> table_properties;
          Status s = GetTableProperties(&table_properties, file_meta, &fname);
          if (s.ok()) {
            props->insert({fname, table_properties});
          } else {
            return s;
          }
        }
      }
    }
  }

  return Status::OK();
}
693 | | |
694 | | Status Version::GetAggregatedTableProperties( |
695 | 803 | std::shared_ptr<const TableProperties>* tp, int level) { |
696 | 803 | TablePropertiesCollection props; |
697 | 803 | Status s; |
698 | 803 | if (level < 0) { |
699 | 103 | s = GetPropertiesOfAllTables(&props); |
700 | 700 | } else { |
701 | 700 | s = GetPropertiesOfAllTables(&props, level); |
702 | 700 | } |
703 | 803 | if (!s.ok()) { |
704 | 0 | return s; |
705 | 0 | } |
706 | | |
707 | 803 | auto* new_tp = new TableProperties(); |
708 | 5.76k | for (const auto& item : props) { |
709 | 5.76k | new_tp->Add(*item.second); |
710 | 5.76k | } |
711 | 803 | tp->reset(new_tp); |
712 | 803 | return Status::OK(); |
713 | 803 | } |
714 | | |
715 | 152 | size_t Version::GetMemoryUsageByTableReaders() { |
716 | 152 | size_t total_usage = 0; |
717 | 152 | for (auto& file_level : storage_info_.level_files_brief_) { |
718 | 168 | for (size_t i = 0; i < file_level.num_files; i++80 ) { |
719 | 80 | total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader( |
720 | 80 | vset_->env_options_, cfd_->internal_comparator(), |
721 | 80 | file_level.files[i].fd); |
722 | 80 | } |
723 | 88 | } |
724 | 152 | return total_usage; |
725 | 152 | } |
726 | | |
// Fills *cf_meta with per-level and per-file metadata for this column
// family: file names/paths, sizes, boundary keys, and compaction state.
// Any previous contents of cf_meta->levels are discarded.
void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
  assert(cf_meta);
  assert(cfd_);

  cf_meta->name = cfd_->GetName();
  cf_meta->size = 0;
  cf_meta->file_count = 0;
  cf_meta->levels.clear();

  auto* ioptions = cfd_->ioptions();
  auto* vstorage = storage_info();

  for (int level = 0; level < cfd_->NumberLevels(); level++) {
    uint64_t level_size = 0;
    cf_meta->file_count += vstorage->LevelFiles(level).size();
    std::vector<SstFileMetaData> files;
    for (const auto& file : vstorage->LevelFiles(level)) {
      // Resolve the directory this file lives in; fall back to the last
      // configured path if the stored path id is out of range.
      uint32_t path_id = file->fd.GetPathId();
      std::string file_path;
      if (path_id < ioptions->db_paths.size()) {
        file_path = ioptions->db_paths[path_id].path;
      } else {
        assert(!ioptions->db_paths.empty());
        file_path = ioptions->db_paths.back().path;
      }
      files.emplace_back(
          MakeTableFileName("", file->fd.GetNumber()),
          file_path,
          file->fd.GetTotalFileSize(),
          file->fd.GetBaseFileSize(),
          // Uncompressed estimate: base file size plus raw key/value bytes.
          file->fd.GetBaseFileSize() +
              file->raw_key_size + file->raw_value_size,
          ConvertBoundaryValues(file->smallest),
          ConvertBoundaryValues(file->largest),
          file->being_compacted);
      level_size += file->fd.GetTotalFileSize();
    }
    cf_meta->levels.emplace_back(
        level, level_size, std::move(files));
    cf_meta->size += level_size;
  }
}
769 | | |
770 | | |
771 | 16 | uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const { |
772 | | // Estimation will be inaccurate when: |
773 | | // (1) there exist merge keys |
774 | | // (2) keys are directly overwritten |
775 | | // (3) deletion on non-existing keys |
776 | | // (4) low number of samples |
777 | 16 | if (current_num_samples_ == 0) { |
778 | 3 | return 0; |
779 | 3 | } |
780 | | |
781 | 13 | if (current_num_non_deletions_ <= current_num_deletions_) { |
782 | 0 | return 0; |
783 | 0 | } |
784 | | |
785 | 13 | uint64_t est = current_num_non_deletions_ - current_num_deletions_; |
786 | | |
787 | 13 | uint64_t file_count = 0; |
788 | 96 | for (int level = 0; level < num_levels_; ++level83 ) { |
789 | 83 | file_count += files_[level].size(); |
790 | 83 | } |
791 | | |
792 | 13 | if (current_num_samples_ < file_count) { |
793 | | // casting to avoid overflowing |
794 | 1 | return |
795 | 1 | static_cast<uint64_t>( |
796 | 1 | (est * static_cast<double>(file_count) / current_num_samples_) |
797 | 1 | ); |
798 | 12 | } else { |
799 | 12 | return est; |
800 | 12 | } |
801 | 13 | } |
802 | | |
// Adds iterators over all of this version's SST files to
// |merge_iter_builder|: one iterator per level-0 file (L0 files may
// overlap each other) and one lazy concatenating iterator per non-empty
// level > 0. Iterator state is allocated on the builder's arena so its
// lifetime matches the merge iterator.
void Version::AddIterators(const ReadOptions& read_options,
                           const EnvOptions& soptions,
                           MergeIteratorBuilder* merge_iter_builder) {
  assert(storage_info_.finalized_);

  if (storage_info_.num_non_empty_levels() == 0) {
    // No file in the Version.
    return;
  }

  auto* arena = merge_iter_builder->GetArena();

  // Merge all level zero files together since they may overlap
  for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
    const auto& file = storage_info_.LevelFilesBrief(0).files[i];
    // Honor the caller-supplied coarse file filter, if any.
    if (!read_options.file_filter || read_options.file_filter->Filter(file)) {
      InternalIterator *file_iter;
      TableCache::TableReaderWithHandle trwh;
      Status s = cfd_->table_cache()->GetTableReaderForIterator(read_options, soptions,
          cfd_->internal_comparator(), file.fd, &trwh, cfd_->internal_stats()->GetFileReadHist(0),
          false);
      if (s.ok()) {
        // The finer-grained filter needs the table reader itself; a file
        // it rejects contributes no iterator at all.
        if (!read_options.table_aware_file_filter ||
            read_options.table_aware_file_filter->Filter(trwh.table_reader)) {
          file_iter = cfd_->table_cache()->NewIterator(
              read_options, &trwh, storage_info_.LevelFiles(0)[i]->UserFilter(), false, arena);
        } else {
          file_iter = nullptr;
        }
      } else {
        // Surface the table-open failure to the reader via an error iterator.
        file_iter = NewErrorInternalIterator(s, arena);
      }
      if (file_iter) {
        merge_iter_builder->AddIterator(file_iter);
      }
    }
  }

  // For levels > 0, we can use a concatenating iterator that sequentially
  // walks through the non-overlapping files in the level, opening them
  // lazily.
  for (int level = 1; level < storage_info_.num_non_empty_levels(); level++) {
    if (storage_info_.LevelFilesBrief(level).num_files != 0) {
      // Placement-new into the arena: destruction is handled by the arena,
      // not by delete.
      auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState));
      auto* state = new (mem)
          LevelFileIteratorState(cfd_->table_cache(), read_options, soptions,
                                 cfd_->internal_comparator(),
                                 cfd_->internal_stats()->GetFileReadHist(level),
                                 false /* for_compaction */,
                                 cfd_->ioptions()->prefix_extractor != nullptr,
                                 IsFilterSkipped(level));
      mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
      auto* first_level_iter = new (mem) LevelFileNumIterator(
          *cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level));
      merge_iter_builder->AddIterator(NewTwoLevelIterator(state, first_level_iter, arena, false));
    }
  }
}
861 | | |
// Constructs per-version storage bookkeeping. When |ref_vstorage| (the
// predecessor version's storage info) is provided, the accumulated and
// current statistics are carried forward so estimates remain continuous
// across version transitions.
VersionStorageInfo::VersionStorageInfo(
    const InternalKeyComparatorPtr& internal_comparator,
    const Comparator* user_comparator, int levels,
    CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage)
    : internal_comparator_(internal_comparator),
      user_comparator_(user_comparator),
      // cfd is nullptr if Version is dummy
      num_levels_(levels),
      num_non_empty_levels_(0),
      file_indexer_(user_comparator),
      compaction_style_(compaction_style),
      files_(new std::vector<FileMetaData*>[num_levels_]),
      // With only one level there is no base level for L0 to compact into.
      base_level_(num_levels_ == 1 ? -1 : 1),
      files_by_compaction_pri_(num_levels_),
      level0_non_overlapping_(false),
      next_file_to_compact_by_size_(num_levels_),
      compaction_score_(num_levels_),
      compaction_level_(num_levels_),
      l0_delay_trigger_count_(0),
      accumulated_file_size_(0),
      accumulated_raw_key_size_(0),
      accumulated_raw_value_size_(0),
      accumulated_num_non_deletions_(0),
      accumulated_num_deletions_(0),
      current_num_non_deletions_(0),
      current_num_deletions_(0),
      current_num_samples_(0),
      estimated_compaction_needed_bytes_(0),
      finalized_(false) {
  if (ref_vstorage != nullptr) {
    accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
    accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
    accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
    accumulated_num_non_deletions_ =
        ref_vstorage->accumulated_num_non_deletions_;
    accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
    current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
    current_num_deletions_ = ref_vstorage->current_num_deletions_;
    current_num_samples_ = ref_vstorage->current_num_samples_;
  }
}
903 | | |
// Constructs a Version for |column_family_data| (nullptr when building the
// dummy sentinel version): caches frequently-used column family fields and
// links the version into a circular doubly-linked list pointing at itself.
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
                 uint64_t version_number)
    : env_(vset->env_),
      cfd_(column_family_data),
      info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
      db_statistics_((cfd_ == nullptr) ? nullptr
                                       : cfd_->ioptions()->statistics),
      table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
      merge_operator_((cfd_ == nullptr) ? nullptr
                                        : cfd_->ioptions()->merge_operator),
      // storage_info_ inherits current stats from the CF's current version,
      // when one exists.
      storage_info_((cfd_ == nullptr) ? nullptr : cfd_->internal_comparator(),
                    (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
                    cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
                    cfd_ == nullptr ? kCompactionStyleLevel
                                    : cfd_->ioptions()->compaction_style,
                    (cfd_ == nullptr || cfd_->current() == nullptr)
                        ? nullptr
                        : cfd_->current()->storage_info()),
      vset_(vset),
      next_(this),
      prev_(this),
      refs_(0),
      version_number_(version_number) {}
927 | | |
// Looks up key |k| across this version's SST files, searching newest data
// first (all overlapping L0 files, then at most one file per deeper level).
// On return, *status is OK (found or merged), NotFound, or an error;
// |key_exists| (if non-null) is set false when the key is absent; |seq|
// (if non-null) is filled by GetContext with the entry's sequence number.
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                  std::string* value, Status* status,
                  MergeContext* merge_context, bool* value_found,
                  bool* key_exists, SequenceNumber* seq) {
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();

  assert(status->ok() || status->IsMergeInProgress());

  if (key_exists != nullptr) {
    // will falsify below if not found
    *key_exists = true;
  }

  // Start in kMerge state when the caller already collected merge operands
  // (status is MergeInProgress from the memtable lookup).
  GetContext get_context(
      user_comparator(), merge_operator_, info_log_, db_statistics_,
      status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
      value, value_found, merge_context, this->env_, seq);

  FilePicker fp(
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator().get());
  FdWithBoundaries* f = fp.GetNextFile();
  while (f != nullptr) {
    *status = table_cache_->Get(
        read_options, internal_comparator(), f->fd, ikey, &get_context,
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()));
    // TODO: examine the behavior for corrupted key
    if (!status->ok()) {
      return;
    }

    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
      case GetContext::kFound:
        if (fp.GetHitFileLevel() == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (fp.GetHitFileLevel() == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (fp.GetHitFileLevel() >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }
        return;
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = STATUS(NotFound, "");
        return;
      case GetContext::kCorrupt:
        *status = STATUS(Corruption, "corrupted key for ", user_key);
        return;
      case GetContext::kMerge:
        // Keep collecting merge operands from older files.
        break;
    }
    f = fp.GetNextFile();
  }

  // Ran out of files while still in merge state: finish the merge with a
  // null base value.
  if (GetContext::kMerge == get_context.State()) {
    if (!merge_operator_) {
      *status = STATUS(InvalidArgument,
                       "merge_operator is not properly initialized.");
      return;
    }
    // merge_operands are in saver and we hit the beginning of the key history
    // do a final merge of nullptr and operands;
    if (merge_operator_->FullMerge(user_key, nullptr,
                                   merge_context->GetOperands(), value,
                                   info_log_)) {
      *status = Status::OK();
    } else {
      RecordTick(db_statistics_, NUMBER_MERGE_FAILURES);
      *status = STATUS(Corruption, "could not perform end-of-key merge for ",
                       user_key);
    }
  } else {
    if (key_exists != nullptr) {
      *key_exists = false;
    }
    *status = STATUS(NotFound, "");  // Use an empty error message for speed
  }
}
1013 | | |
1014 | 8.27M | bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) { |
1015 | | // Reaching the bottom level implies misses at all upper levels, so we'll |
1016 | | // skip checking the filters when we predict a hit. |
1017 | 8.27M | return cfd_->ioptions()->optimize_filters_for_hits && |
1018 | 8.27M | (561k level > 0561k || is_file_last_in_level224k ) && |
1019 | 8.27M | level == storage_info_.num_non_empty_levels() - 1545k ; |
1020 | 8.27M | } |
1021 | | |
1022 | 773k | void VersionStorageInfo::GenerateLevelFilesBrief() { |
1023 | 773k | level_files_brief_.resize(num_non_empty_levels_); |
1024 | 1.03M | for (int level = 0; level < num_non_empty_levels_; level++256k ) { |
1025 | 256k | DoGenerateLevelFilesBrief( |
1026 | 256k | &level_files_brief_[level], files_[level], &arena_); |
1027 | 256k | } |
1028 | 773k | } |
1029 | | |
// Prepares this version for installation: refreshes accumulated stats
// (optionally sampling table properties when |update_stats| is true) and
// rebuilds every derived structure of storage_info_ that readers and the
// compaction picker rely on. The call order matters: later steps consume
// results of the earlier ones (e.g. LevelFilesBrief needs the updated
// non-empty level count).
void Version::PrepareApply(
    const MutableCFOptions& mutable_cf_options,
    bool update_stats) {
  UpdateAccumulatedStats(update_stats);
  storage_info_.UpdateNumNonEmptyLevels();
  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
  storage_info_.UpdateFilesByCompactionPri(mutable_cf_options);
  storage_info_.GenerateFileIndexer();
  storage_info_.GenerateLevelFilesBrief();
  storage_info_.GenerateLevel0NonOverlapping();
}
1041 | | |
1042 | 961k | bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) { |
1043 | 961k | if (file_meta->init_stats_from_file || |
1044 | 961k | file_meta->compensated_file_size > 0118k ) { |
1045 | 864k | return false; |
1046 | 864k | } |
1047 | 96.8k | std::shared_ptr<const TableProperties> tp; |
1048 | 96.8k | Status s = GetTableProperties(&tp, file_meta); |
1049 | 96.8k | file_meta->init_stats_from_file = true; |
1050 | 96.8k | if (!s.ok()) { |
1051 | 52 | RLOG(InfoLogLevel::ERROR_LEVEL, vset_->db_options_->info_log, |
1052 | 52 | "Unable to load table properties for file %" PRIu64 " --- %s\n", |
1053 | 52 | file_meta->fd.GetNumber(), s.ToString().c_str()); |
1054 | 52 | return false; |
1055 | 52 | } |
1056 | 96.7k | if (tp.get() == nullptr) return false0 ; |
1057 | 96.7k | file_meta->num_entries = tp->num_entries; |
1058 | 96.7k | file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties); |
1059 | 96.7k | file_meta->raw_value_size = tp->raw_value_size; |
1060 | 96.7k | file_meta->raw_key_size = tp->raw_key_size; |
1061 | | |
1062 | 96.7k | return true; |
1063 | 96.7k | } |
1064 | | |
1065 | 96.7k | void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) { |
1066 | 96.7k | assert(file_meta->init_stats_from_file); |
1067 | 0 | accumulated_file_size_ += file_meta->fd.GetTotalFileSize(); |
1068 | 96.7k | accumulated_raw_key_size_ += file_meta->raw_key_size; |
1069 | 96.7k | accumulated_raw_value_size_ += file_meta->raw_value_size; |
1070 | 96.7k | accumulated_num_non_deletions_ += |
1071 | 96.7k | file_meta->num_entries - file_meta->num_deletions; |
1072 | 96.7k | accumulated_num_deletions_ += file_meta->num_deletions; |
1073 | | |
1074 | 96.7k | current_num_non_deletions_ += |
1075 | 96.7k | file_meta->num_entries - file_meta->num_deletions; |
1076 | 96.7k | current_num_deletions_ += file_meta->num_deletions; |
1077 | 96.7k | current_num_samples_++; |
1078 | 96.7k | } |
1079 | | |
1080 | 65.3k | void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) { |
1081 | 65.3k | if (file_meta->init_stats_from_file) { |
1082 | 63.7k | current_num_non_deletions_ -= |
1083 | 63.7k | file_meta->num_entries - file_meta->num_deletions; |
1084 | 63.7k | current_num_deletions_ -= file_meta->num_deletions; |
1085 | 63.7k | current_num_samples_--; |
1086 | 63.7k | } |
1087 | 65.3k | } |
1088 | | |
// Samples table properties for up to kMaxInitCount not-yet-initialized
// files (lower levels first) and folds them into storage_info_'s stats,
// then recomputes compensated file sizes for all files.
void Version::UpdateAccumulatedStats(bool update_stats) {
  if (update_stats) {
    // maximum number of table properties loaded from files.
    const int kMaxInitCount = 20;
    int init_count = 0;
    // here only the first kMaxInitCount files which haven't been
    // initialized from file will be updated with num_deletions.
    // The motivation here is to cap the maximum I/O per Version creation.
    // The reason for choosing files from lower-level instead of higher-level
    // is that such design is able to propagate the initialization from
    // lower-level to higher-level: When the num_deletions of lower-level
    // files are updated, it will make the lower-level files have accurate
    // compensated_file_size, making lower-level to higher-level compaction
    // will be triggered, which creates higher-level files whose num_deletions
    // will be updated here.
    for (int level = 0;
         level < storage_info_.num_levels_ && init_count < kMaxInitCount;
         ++level) {
      for (auto* file_meta : storage_info_.files_[level]) {
        if (MaybeInitializeFileMetaData(file_meta)) {
          // each FileMeta will be initialized only once.
          storage_info_.UpdateAccumulatedStats(file_meta);
          // NOTE: this break only exits the current level's loop; the outer
          // loop's init_count condition stops the remaining levels.
          if (++init_count >= kMaxInitCount) {
            break;
          }
        }
      }
    }
    // In case all sampled-files contain only deletion entries, then we
    // load the table-property of a file in higher-level to initialize
    // that value.
    for (int level = storage_info_.num_levels_ - 1;
         storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
         --level) {
      for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
           storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
        if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
          storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
        }
      }
    }
  }

  // Always recompute compensated sizes, even when stats were not updated.
  storage_info_.ComputeCompensatedSizes();
}
1134 | | |
1135 | 773k | void VersionStorageInfo::ComputeCompensatedSizes() { |
1136 | 773k | static const int kDeletionWeightOnCompaction = 2; |
1137 | 773k | uint64_t average_value_size = GetAverageValueSize(); |
1138 | | |
1139 | | // compute the compensated size |
1140 | 1.96M | for (int level = 0; level < num_levels_; level++1.19M ) { |
1141 | 1.19M | for (auto* file_meta : files_[level]) { |
1142 | | // Here we only compute compensated_file_size for those file_meta |
1143 | | // which compensated_file_size is uninitialized (== 0). This is true only |
1144 | | // for files that have been created right now and no other thread has |
1145 | | // access to them. That's why we can safely mutate compensated_file_size. |
1146 | 966k | if (file_meta->compensated_file_size == 0) { |
1147 | 102k | file_meta->compensated_file_size = file_meta->fd.GetTotalFileSize(); |
1148 | | // Here we only boost the size of deletion entries of a file only |
1149 | | // when the number of deletion entries is greater than the number of |
1150 | | // non-deletion entries in the file. The motivation here is that in |
1151 | | // a stable workload, the number of deletion entries should be roughly |
1152 | | // equal to the number of non-deletion entries. If we compensate the |
1153 | | // size of deletion entries in a stable workload, the deletion |
1154 | | // compensation logic might introduce unwanted effet which changes the |
1155 | | // shape of LSM tree. |
1156 | 102k | if (file_meta->num_deletions * 2 >= file_meta->num_entries) { |
1157 | 11.3k | file_meta->compensated_file_size += |
1158 | 11.3k | (file_meta->num_deletions * 2 - file_meta->num_entries) * |
1159 | 11.3k | average_value_size * kDeletionWeightOnCompaction; |
1160 | 11.3k | } |
1161 | 102k | } |
1162 | 966k | } |
1163 | 1.19M | } |
1164 | 773k | } |
1165 | | |
1166 | 3.96M | int VersionStorageInfo::MaxInputLevel() const { |
1167 | 3.96M | if (compaction_style_ == kCompactionStyleLevel) { |
1168 | 1.67M | return num_levels() - 2; |
1169 | 1.67M | } |
1170 | 2.28M | return 0; |
1171 | 3.96M | } |
1172 | | |
// Estimates the total bytes of compaction work pending before the tree
// satisfies its per-level size targets, storing the result in
// estimated_compaction_needed_bytes_. Only meaningful for level-based
// compaction; other styles report 0.
void VersionStorageInfo::EstimateCompactionBytesNeeded(
    const MutableCFOptions& mutable_cf_options) {
  // Only implemented for level-based compaction
  if (compaction_style_ != kCompactionStyleLevel) {
    estimated_compaction_needed_bytes_ = 0;
    return;
  }

  // Start from Level 0, if level 0 qualifies compaction to level 1,
  // we estimate the size of compaction.
  // Then we move on to the next level and see whether it qualifies compaction
  // to the next level. The size of the level is estimated as the actual size
  // on the level plus the input bytes from the previous level if there is any.
  // If it exceeds, take the exceeded bytes as compaction input and add the size
  // of the compaction size to total size.
  // We keep doing it to Level 2, 3, etc, until the last level and return the
  // accumulated bytes.

  uint64_t bytes_compact_to_next_level = 0;
  // Level 0: triggered when the file count exceeds the configured trigger.
  bool level0_compact_triggered = false;
  if (static_cast<int>(files_[0].size()) >
      mutable_cf_options.level0_file_num_compaction_trigger) {
    level0_compact_triggered = true;
    for (auto* f : files_[0]) {
      bytes_compact_to_next_level += f->fd.GetTotalFileSize();
    }
    estimated_compaction_needed_bytes_ = bytes_compact_to_next_level;
  } else {
    estimated_compaction_needed_bytes_ = 0;
  }

  // Level 1 and up.
  for (int level = base_level(); level <= MaxInputLevel(); level++) {
    uint64_t level_size = 0;
    for (auto* f : files_[level]) {
      level_size += f->fd.GetTotalFileSize();
    }
    if (level == base_level() && level0_compact_triggered) {
      // Add base level size to compaction if level0 compaction triggered.
      estimated_compaction_needed_bytes_ += level_size;
    }
    // Add size added by previous compaction
    level_size += bytes_compact_to_next_level;
    bytes_compact_to_next_level = 0;
    uint64_t level_target = MaxBytesForLevel(level);
    if (level_size > level_target) {
      // The overflow cascades down as input to the next level's compaction.
      bytes_compact_to_next_level = level_size - level_target;
      // Simplify to assume the actual compaction fan-out ratio is always
      // mutable_cf_options.max_bytes_for_level_multiplier.
      estimated_compaction_needed_bytes_ +=
          bytes_compact_to_next_level *
          (1 + mutable_cf_options.max_bytes_for_level_multiplier);
    }
  }
}
1229 | | |
// Computes a compaction score per level (score >= 1 means the level needs
// compaction), sorts levels by score descending into compaction_level_ /
// compaction_score_, then refreshes the marked-files list and the
// pending-compaction-bytes estimate.
void VersionStorageInfo::ComputeCompactionScore(
    const MutableCFOptions& mutable_cf_options,
    const CompactionOptionsFIFO& compaction_options_fifo) {
  for (int level = 0; level <= MaxInputLevel(); level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      int num_sorted_runs = 0;
      uint64_t total_size = 0;
      for (auto* f : files_[level]) {
        if (!f->being_compacted) {
          total_size += f->compensated_file_size;
          num_sorted_runs++;
        }
      }
      if (compaction_style_ == kCompactionStyleUniversal) {
        // For universal compaction, we use level0 score to indicate
        // compaction score for the whole DB. Adding other levels as if
        // they are L0 files.
        for (int i = 1; i < num_levels(); i++) {
          if (!files_[i].empty() && !files_[i][0]->being_compacted) {
            num_sorted_runs++;
          }
        }
      }

      if (compaction_style_ == kCompactionStyleFIFO) {
        // FIFO scores by total bytes against the configured cap.
        score = static_cast<double>(total_size) /
                compaction_options_fifo.max_table_files_size;
      } else {
        score = static_cast<double>(num_sorted_runs) /
                mutable_cf_options.level0_file_num_compaction_trigger;
      }
    } else {
      // Compute the ratio of current size to size limit.
      // Files already being compacted are excluded from the numerator.
      uint64_t level_bytes_no_compacting = 0;
      for (auto f : files_[level]) {
        if (!f->being_compacted) {
          level_bytes_no_compacting += f->compensated_file_size;
        }
      }
      score = static_cast<double>(level_bytes_no_compacting) /
              MaxBytesForLevel(level);
    }
    compaction_level_[level] = level;
    compaction_score_[level] = score;
  }

  // sort all the levels based on their score. Higher scores get listed
  // first. Use bubble sort because the number of entries are small.
  // NOTE(review): the bounds only sort the first num_levels()-1 entries;
  // presumably entries past MaxInputLevel() never matter -- confirm.
  for (int i = 0; i < num_levels() - 2; i++) {
    for (int j = i + 1; j < num_levels() - 1; j++) {
      if (compaction_score_[i] < compaction_score_[j]) {
        double score = compaction_score_[i];
        int level = compaction_level_[i];
        compaction_score_[i] = compaction_score_[j];
        compaction_level_[i] = compaction_level_[j];
        compaction_score_[j] = score;
        compaction_level_[j] = level;
      }
    }
  }
  ComputeFilesMarkedForCompaction();
  EstimateCompactionBytesNeeded(mutable_cf_options);
}
1305 | | |
1306 | 1.23M | void VersionStorageInfo::ComputeFilesMarkedForCompaction() { |
1307 | 1.23M | files_marked_for_compaction_.clear(); |
1308 | 1.23M | int last_qualify_level = 0; |
1309 | | |
1310 | | // Do not include files from the last level with data |
1311 | | // If table properties collector suggests a file on the last level, |
1312 | | // we should not move it to a new level. |
1313 | 1.61M | for (int level = num_levels() - 1; level >= 1; level--376k ) { |
1314 | 442k | if (!files_[level].empty()) { |
1315 | 65.6k | last_qualify_level = level - 1; |
1316 | 65.6k | break; |
1317 | 65.6k | } |
1318 | 442k | } |
1319 | | |
1320 | 2.66M | for (int level = 0; level <= last_qualify_level; level++1.43M ) { |
1321 | 1.43M | for (auto* f : files_[level]) { |
1322 | 696k | if (!f->being_compacted && f->marked_for_compaction590k ) { |
1323 | 38 | files_marked_for_compaction_.emplace_back(level, f); |
1324 | 38 | } |
1325 | 696k | } |
1326 | 1.43M | } |
1327 | 1.23M | } |
1328 | | |
1329 | | namespace { |
1330 | | |
1331 | | // used to sort files by size |
1332 | | struct Fsize { |
1333 | | size_t index; |
1334 | | FileMetaData* file; |
1335 | | }; |
1336 | | |
1337 | | // Compator that is used to sort files based on their size |
1338 | | // In normal mode: descending size |
1339 | 1.71M | bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) { |
1340 | 1.71M | return (first.file->compensated_file_size > |
1341 | 1.71M | second.file->compensated_file_size); |
1342 | 1.71M | } |
1343 | | } // anonymous namespace |
1344 | | |
// Appends |f| to |level| and takes a reference on it. Debug builds verify
// that files appended to a level > 0 remain sorted and non-overlapping
// with the level's current last file.
void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
  auto* level_files = &files_[level];
  // Must not overlap
#ifndef NDEBUG
  if (level > 0 && !level_files->empty() &&
      internal_comparator_->Compare(
          (*level_files)[level_files->size() - 1]->largest.key, f->smallest.key) >= 0) {
    auto* f2 = (*level_files)[level_files->size() - 1];
    if (info_log != nullptr) {
      RERROR(info_log, "Adding new file %" PRIu64
             " range (%s, %s) to level %d but overlapping "
             "with existing file %" PRIu64 " %s %s",
             f->fd.GetNumber(),
             f->smallest.key.DebugString(true).c_str(),
             f->largest.key.DebugString(true).c_str(),
             level,
             f2->fd.GetNumber(),
             f2->smallest.key.DebugString(true).c_str(),
             f2->largest.key.DebugString(true).c_str());
      LogFlush(info_log);
    }
    assert(false);
  }
#endif
  f->refs++;
  level_files->push_back(f);
}
1372 | | |
// Version::PrepareApply() need to be called before calling the function, or
// following functions called:
// 1. UpdateNumNonEmptyLevels();
// 2. CalculateBaseBytes();
// 3. UpdateFilesByCompactionPri();
// 4. GenerateFileIndexer();
// 5. GenerateLevelFilesBrief();
// 6. GenerateLevel0NonOverlapping();
//
// Marks the storage info as finalized; debug builds additionally sanity
// check the level-compaction invariants (base level, level targets,
// per-level file/brief consistency).
void VersionStorageInfo::SetFinalized() {
  finalized_ = true;
#ifndef NDEBUG
  if (compaction_style_ != kCompactionStyleLevel) {
    // Not level based compaction.
    return;
  }
  assert(base_level_ < 0 || num_levels() == 1 ||
         (base_level_ >= 1 && base_level_ < num_levels()));
  // Verify all levels newer than base_level are empty except L0
  for (int level = 1; level < base_level(); level++) {
    assert(NumLevelBytes(level) == 0);
  }
  // Level size targets must be monotonically non-decreasing with depth.
  uint64_t max_bytes_prev_level = 0;
  for (int level = base_level(); level < num_levels() - 1; level++) {
    if (LevelFiles(level).size() == 0) {
      continue;
    }
    assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
    max_bytes_prev_level = MaxBytesForLevel(level);
  }
  // NOTE(review): despite its name, the counter below counts NON-empty
  // levels > 0; it is written but never read here -- confirm intent.
  int num_empty_non_l0_level = 0;
  for (int level = 0; level < num_levels(); level++) {
    assert(LevelFiles(level).size() == 0 ||
           LevelFiles(level).size() == LevelFilesBrief(level).num_files);
    if (level > 0 && NumLevelBytes(level) > 0) {
      num_empty_non_l0_level++;
    }
    if (LevelFiles(level).size() > 0) {
      assert(level < num_non_empty_levels());
    }
  }
  assert(compaction_level_.size() > 0);
  assert(compaction_level_.size() == compaction_score_.size());
#endif
}
1417 | | |
1418 | 773k | void VersionStorageInfo::UpdateNumNonEmptyLevels() { |
1419 | 773k | num_non_empty_levels_ = num_levels_; |
1420 | 1.71M | for (int i = num_levels_ - 1; i >= 0; i--940k ) { |
1421 | 1.01M | if (files_[i].size() != 0) { |
1422 | 75.7k | return; |
1423 | 940k | } else { |
1424 | 940k | num_non_empty_levels_ = i; |
1425 | 940k | } |
1426 | 1.01M | } |
1427 | 773k | } |
1428 | | |
namespace {
// Sort `temp` based on ratio of overlapping size over file size: files whose
// key range drags in the fewest next-level bytes per byte of their own data
// sort first, making them the cheapest compaction candidates.
// Assumes `files` and `next_level_files` are sorted by key so a single
// forward sweep of `next_level_it` suffices.
void SortFileByOverlappingRatio(
    const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
    const std::vector<FileMetaData*>& next_level_files,
    std::vector<Fsize>* temp) {
  // file number -> scaled overlap ratio (bytes_overlapped * 1024 / file size).
  std::unordered_map<uint64_t, uint64_t> file_to_order;
  auto next_level_it = next_level_files.begin();

  for (auto& file : files) {
    uint64_t overlapping_bytes = 0;
    // Skip files in next level that is smaller than current file
    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->largest.key, file->smallest.key) < 0) {
      next_level_it++;
    }

    // Accumulate sizes of next-level files whose range intersects this file.
    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->smallest.key, file->largest.key) < 0) {
      overlapping_bytes += (*next_level_it)->fd.total_file_size;

      if (icmp.Compare((*next_level_it)->largest.key, file->largest.key) > 0) {
        // next level file cross large boundary of current file. Keep the
        // iterator here: the same file may overlap the next `file` too.
        break;
      }
      next_level_it++;
    }

    assert(file->fd.total_file_size != 0);
    // Scale by 1024 to keep precision in the integer ratio.
    file_to_order[file->fd.GetNumber()] =
        overlapping_bytes * 1024u / file->fd.total_file_size;
  }

  std::sort(temp->begin(), temp->end(),
            [&](const Fsize& f1, const Fsize& f2) -> bool {
              return file_to_order[f1.file->fd.GetNumber()] <
                     file_to_order[f2.file->fd.GetNumber()];
            });
}
}  // namespace
1469 | | |
// Rebuilds files_by_compaction_pri_[level] for every level except the last:
// the indices of that level's files ordered by the configured compaction
// priority, so the compaction picker can scan candidates in priority order.
void VersionStorageInfo::UpdateFilesByCompactionPri(
    const MutableCFOptions& mutable_cf_options) {
  if (compaction_style_ == kCompactionStyleFIFO ||
      compaction_style_ == kCompactionStyleUniversal) {
    // don't need this
    return;
  }
  // No need to sort the highest level because it is never compacted.
  for (int level = 0; level < num_levels() - 1; level++) {
    const std::vector<FileMetaData*>& files = files_[level];
    auto& files_by_compaction_pri = files_by_compaction_pri_[level];
    // Caller is expected to have cleared the list (freshly built version).
    assert(files_by_compaction_pri.size() == 0);

    // populate a temp vector for sorting based on size
    std::vector<Fsize> temp(files.size());
    for (size_t i = 0; i < files.size(); i++) {
      temp[i].index = i;
      temp[i].file = files[i];
    }

    // sort the top number_of_files_to_sort_ based on file size
    size_t num = VersionStorageInfo::kNumberFilesToSort;
    if (num > temp.size()) {
      num = temp.size();
    }
    switch (mutable_cf_options.compaction_pri) {
      case kByCompensatedSize:
        // Only the first `num` entries need to be in order for this policy.
        std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
                          CompareCompensatedSizeDescending);
        break;
      case kOldestLargestSeqFirst:
        std::sort(temp.begin(), temp.end(),
                  [](const Fsize& f1, const Fsize& f2) -> bool {
                    return f1.file->largest.seqno < f2.file->largest.seqno;
                  });
        break;
      case kOldestSmallestSeqFirst:
        std::sort(temp.begin(), temp.end(),
                  [](const Fsize& f1, const Fsize& f2) -> bool {
                    return f1.file->smallest.seqno < f2.file->smallest.seqno;
                  });
        break;
      case kMinOverlappingRatio:
        // Prefer files with the least next-level overlap per byte; see
        // SortFileByOverlappingRatio above.
        SortFileByOverlappingRatio(*internal_comparator_, files_[level],
                                   files_[level + 1], &temp);
        break;
      default:
        assert(false);
    }
    assert(temp.size() == files.size());

    // initialize files_by_compaction_pri_
    for (size_t i = 0; i < temp.size(); i++) {
      files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
    }
    next_file_to_compact_by_size_[level] = 0;
    assert(files_[level].size() == files_by_compaction_pri_[level].size());
  }
}
1529 | | |
1530 | 773k | void VersionStorageInfo::GenerateLevel0NonOverlapping() { |
1531 | 773k | assert(!finalized_); |
1532 | 0 | level0_non_overlapping_ = true; |
1533 | 773k | if (level_files_brief_.size() == 0) { |
1534 | 696k | return; |
1535 | 696k | } |
1536 | | |
1537 | | // A copy of L0 files sorted by smallest key |
1538 | 76.2k | std::vector<FdWithBoundaries> level0_sorted_file( |
1539 | 76.2k | level_files_brief_[0].files, |
1540 | 76.2k | level_files_brief_[0].files + level_files_brief_[0].num_files); |
1541 | 76.2k | sort(level0_sorted_file.begin(), level0_sorted_file.end(), |
1542 | 508k | [this](const FdWithBoundaries& f1, const FdWithBoundaries& f2)->bool { |
1543 | 508k | return (internal_comparator_->Compare(f1.smallest.key, f2.smallest.key) < 0); |
1544 | 508k | }); |
1545 | | |
1546 | 152k | for (size_t i = 1; i < level0_sorted_file.size(); ++i76.7k ) { |
1547 | 102k | auto& f = level0_sorted_file[i]; |
1548 | 102k | auto& prev = level0_sorted_file[i - 1]; |
1549 | 102k | if (internal_comparator_->Compare(prev.largest.key, f.smallest.key) >= 0) { |
1550 | 25.7k | level0_non_overlapping_ = false; |
1551 | 25.7k | break; |
1552 | 25.7k | } |
1553 | 102k | } |
1554 | 76.2k | } |
1555 | | |
1556 | 3.71M | void Version::Ref() { |
1557 | 3.71M | ++refs_; |
1558 | 3.71M | } |
1559 | | |
1560 | 3.59M | bool Version::Unref() { |
1561 | 3.59M | assert(refs_ >= 1); |
1562 | 0 | --refs_; |
1563 | 3.59M | if (refs_ == 0) { |
1564 | 1.57M | delete this; |
1565 | 1.57M | return true; |
1566 | 1.57M | } |
1567 | 2.01M | return false; |
1568 | 3.59M | } |
1569 | | |
1570 | | bool VersionStorageInfo::OverlapInLevel(int level, |
1571 | | const Slice* smallest_user_key, |
1572 | 24.7k | const Slice* largest_user_key) { |
1573 | 24.7k | if (level >= num_non_empty_levels_) { |
1574 | | // empty level, no overlap |
1575 | 0 | return false; |
1576 | 0 | } |
1577 | 24.7k | return SomeFileOverlapsRange(*internal_comparator_, (level > 0), |
1578 | 24.7k | level_files_brief_[level], smallest_user_key, |
1579 | 24.7k | largest_user_key); |
1580 | 24.7k | } |
1581 | | |
// Store in "*inputs" all files in "level" that overlap [begin,end].
// If hint_index is specified, then it points to a file in the
// overlapping range.
// The file_index returns a pointer to any file in an overlapping range.
// A null begin/end leaves the range unbounded on that side. For L0 with
// expand_range, the range grows to cover any partially-overlapping file and
// the scan restarts, so the result is transitively closed under overlap.
void VersionStorageInfo::GetOverlappingInputs(
    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
    bool expand_range) const {
  if (level >= num_non_empty_levels_) {
    // this level is empty, no overlapping inputs
    return;
  }

  inputs->clear();
  Slice user_begin, user_end;
  if (begin != nullptr) {
    user_begin = begin->user_key();
  }
  if (end != nullptr) {
    user_end = end->user_key();
  }
  if (file_index) {
    *file_index = -1;
  }
  const Comparator* user_cmp = user_comparator_;
  if (begin != nullptr && end != nullptr && level > 0) {
    // Levels > 0 hold disjoint sorted files, so binary search applies.
    GetOverlappingInputsBinarySearch(level, user_begin, user_end, inputs,
                                     hint_index, file_index);
    return;
  }
  // Linear scan: level 0, or a range unbounded on one side.
  // Note: i is incremented on fetch, so files_[level][i-1] below is the
  // file currently examined.
  for (size_t i = 0; i < level_files_brief_[level].num_files; ) {
    auto* f = &(level_files_brief_[level].files[i++]);
    const Slice file_start = f->smallest.user_key();
    const Slice file_limit = f->largest.user_key();
    if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
      // "f" is completely before specified range; skip it
    } else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
      // "f" is completely after specified range; skip it
    } else {
      inputs->push_back(files_[level][i-1]);
      if (level == 0 && expand_range) {
        // Level-0 files may overlap each other. So check if the newly
        // added file has expanded the range. If so, restart search.
        if (begin != nullptr && user_cmp->Compare(file_start, user_begin) < 0) {
          user_begin = file_start;
          inputs->clear();
          i = 0;
        } else if (end != nullptr
                   && user_cmp->Compare(file_limit, user_end) > 0) {
          user_end = file_limit;
          inputs->clear();
          i = 0;
        }
      } else if (file_index) {
        // Report the index of the last overlapping file found.
        *file_index = static_cast<int>(i) - 1;
      }
    }
  }
}
1641 | | |
// Store in "*inputs" all files in "level" that overlap [begin,end]
// Employ binary search to find at least one file that overlaps the
// specified range. From that file, iterate backwards and
// forwards to find all overlapping files.
// Requires level > 0: files must be sorted and non-overlapping.
void VersionStorageInfo::GetOverlappingInputsBinarySearch(
    int level, const Slice& user_begin, const Slice& user_end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
  assert(level > 0);
  int min = 0;
  int mid = 0;
  int max = static_cast<int>(files_[level].size()) - 1;
  bool foundOverlap = false;
  const Comparator* user_cmp = user_comparator_;

  // if the caller already knows the index of a file that has overlap,
  // then we can skip the binary search.
  if (hint_index != -1) {
    mid = hint_index;
    foundOverlap = true;
  }

  while (!foundOverlap && min <= max) {
    mid = (min + max)/2;
    FdWithBoundaries* f = level_files_brief_[level].files + mid;
    const Slice file_start = f->smallest.user_key();
    const Slice file_limit = f->largest.user_key();
    if (user_cmp->Compare(file_limit, user_begin) < 0) {
      // File entirely before the range: search the upper half.
      min = mid + 1;
    } else if (user_cmp->Compare(user_end, file_start) < 0) {
      // File entirely after the range: search the lower half.
      max = mid - 1;
    } else {
      foundOverlap = true;
      break;
    }
  }

  // If there were no overlapping files, return immediately.
  if (!foundOverlap) {
    return;
  }
  // returns the index where an overlap is found
  if (file_index) {
    *file_index = mid;
  }
  // Expand outwards from `mid` to collect the full contiguous overlap set.
  ExtendOverlappingInputs(level, user_begin, user_end, inputs, mid);
}
1688 | | |
// Store in "*inputs" all files in "level" that overlap [begin,end]
// The midIndex specifies the index of at least one file that
// overlaps the specified range. From that file, iterate backward
// and forward to find all overlapping files.
// Use FileLevel in searching, make it faster
void VersionStorageInfo::ExtendOverlappingInputs(
    int level, const Slice& user_begin, const Slice& user_end,
    std::vector<FileMetaData*>* inputs, unsigned int midIndex) const {
  const Comparator* user_cmp = user_comparator_;
  const FdWithBoundaries* files = level_files_brief_[level].files;
#ifndef NDEBUG
  {
    // assert that the file at midIndex overlaps with the range
    assert(midIndex < level_files_brief_[level].num_files);
    const auto* f = &files[midIndex];
    const Slice fstart = f->smallest.user_key();
    const Slice flimit = f->largest.user_key();
    if (user_cmp->Compare(fstart, user_begin) >= 0) {
      assert(user_cmp->Compare(fstart, user_end) <= 0);
    } else {
      assert(user_cmp->Compare(flimit, user_begin) >= 0);
    }
  }
#endif
  // Initialized so that an empty result yields startIndex > endIndex and the
  // copy loop below does nothing.
  int startIndex = midIndex + 1;
  int endIndex = midIndex;
  // Debug-only overlap counter; the (count++, true) idiom keeps it out of
  // release builds entirely.
  int count __attribute__((unused)) = 0;

  // check backwards from 'mid' to lower indices
  for (int i = midIndex; i >= 0 ; i--) {
    const auto* f = &files[i];
    const Slice file_limit = f->largest.user_key();
    if (user_cmp->Compare(file_limit, user_begin) >= 0) {
      startIndex = i;
      assert((count++, true));
    } else {
      // Files are sorted and disjoint, so no earlier file can overlap.
      break;
    }
  }
  // check forward from 'mid+1' to higher indices
  for (unsigned int i = midIndex+1;
       i < level_files_brief_[level].num_files; i++) {
    const auto* f = &files[i];
    const Slice file_start = f->smallest.user_key();
    if (user_cmp->Compare(file_start, user_end) <= 0) {
      assert((count++, true));
      endIndex = i;
    } else {
      break;
    }
  }
  assert(count == endIndex - startIndex + 1);

  // insert overlapping files into vector
  for (int i = startIndex; i <= endIndex; i++) {
    FileMetaData* f = files_[level][i];
    inputs->push_back(f);
  }
}
1748 | | |
// Returns true iff the first or last file in inputs contains
// an overlapping user key to the file "just outside" of it (i.e.
// just after the last file, or just before the first file)
// REQUIRES: "*inputs" is a sorted list of non-overlapping files
// Used to decide whether a compaction input set can be taken as-is or a
// boundary file shares a user key with its neighbor.
bool VersionStorageInfo::HasOverlappingUserKey(
    const std::vector<FileMetaData*>* inputs, int level) {

  // If inputs empty, there is no overlap.
  // If level == 0, it is assumed that all needed files were already included.
  if (inputs->empty() || level == 0) {
    return false;
  }

  const Comparator* user_cmp = user_comparator_;
  const rocksdb::LevelFilesBrief& file_level = level_files_brief_[level];
  const FdWithBoundaries* files = level_files_brief_[level].files;
  const size_t kNumFiles = file_level.num_files;

  // Check the last file in inputs against the file after it
  size_t last_file = FindFile(*internal_comparator_, file_level,
                              inputs->back()->largest.key.Encode());
  assert(last_file < kNumFiles);  // File should exist!
  if (last_file < kNumFiles-1) {  // If not the last file
    const Slice last_key_in_input = files[last_file].largest.user_key();
    const Slice first_key_after = files[last_file+1].smallest.user_key();
    if (user_cmp->Equal(last_key_in_input, first_key_after)) {
      // The last user key in input overlaps with the next file's first key
      return true;
    }
  }

  // Check the first file in inputs against the file just before it
  size_t first_file = FindFile(*internal_comparator_, file_level,
                               inputs->front()->smallest.key.Encode());
  assert(first_file <= last_file);  // File should exist!
  if (first_file > 0) {  // If not first file
    const Slice& first_key_in_input = files[first_file].smallest.user_key();
    const Slice& last_key_before = files[first_file-1].largest.user_key();
    if (user_cmp->Equal(first_key_in_input, last_key_before)) {
      // The first user key in input overlaps with the previous file's last key
      return true;
    }
  }

  return false;
}
1795 | | |
// Returns the total size in bytes of all files stored at `level`.
uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
  assert(level >= 0);
  assert(level < num_levels());
  return TotalFileSize(files_[level]);
}
1801 | | |
// Formats a one-line human-readable summary of the LSM shape into `scratch`
// (base level, per-level file counts, top compaction score) and returns the
// buffer. Output is silently truncated to the scratch buffer size.
const char* VersionStorageInfo::LevelSummary(
    LevelSummaryStorage* scratch) const {
  int len = 0;
  if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
    assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
    len = snprintf(scratch->buffer, sizeof(scratch->buffer),
                   "base level %d max bytes base %" PRIu64 " ", base_level_,
                   level_max_bytes_[base_level_]);
  }
  len +=
      snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
  for (int i = 0; i < num_levels(); i++) {
    int sz = sizeof(scratch->buffer) - len;
    int ret = snprintf(scratch->buffer + len, sz, "%zd ", files_[i].size());
    // Stop on error or when the buffer is full (snprintf reports the length
    // it would have written).
    if (ret < 0 || ret >= sz) break;
    len += ret;
  }
  if (len > 0) {
    // overwrite the last space
    --len;
  }
  len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
                  "] max score %.2f", compaction_score_[0]);

  if (!files_marked_for_compaction_.empty()) {
    snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
             " (%" ROCKSDB_PRIszt " files need compaction)",
             files_marked_for_compaction_.size());
  }

  return scratch->buffer;
}
1834 | | |
// Formats a summary of every file at `level` into `scratch` as
// "files_size[#num(seq=...,sz=...,being_compacted) ...]" and returns the
// buffer. Output stops early if the buffer fills up.
const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
                                                 int level) const {
  int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
  for (const auto& f : files_[level]) {
    int sz = sizeof(scratch->buffer) - len;
    char sztxt[16];
    AppendHumanBytes(f->fd.GetTotalFileSize(), sztxt, sizeof(sztxt));
    int ret = snprintf(scratch->buffer + len, sz,
                       "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
                       f->fd.GetNumber(), f->smallest.seqno, sztxt,
                       static_cast<int>(f->being_compacted));
    // Bail out on formatting error or truncation.
    if (ret < 0 || ret >= sz)
      break;
    len += ret;
  }
  // overwrite the last space (only if files_[level].size() is non-zero)
  if (files_[level].size() && len > 0) {
    --len;
  }
  snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
  return scratch->buffer;
}
1857 | | |
1858 | 15 | int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() { |
1859 | 15 | uint64_t result = 0; |
1860 | 15 | std::vector<FileMetaData*> overlaps; |
1861 | 81 | for (int level = 1; level < num_levels() - 1; level++66 ) { |
1862 | 214 | for (const auto& f : files_[level]) { |
1863 | 214 | GetOverlappingInputs(level + 1, &f->smallest.key, &f->largest.key, &overlaps); |
1864 | 214 | const uint64_t sum = TotalFileSize(overlaps); |
1865 | 214 | if (sum > result) { |
1866 | 6 | result = sum; |
1867 | 6 | } |
1868 | 214 | } |
1869 | 66 | } |
1870 | 15 | return result; |
1871 | 15 | } |
1872 | | |
// Returns the target size (in bytes) computed by CalculateBaseBytes() for
// `level`.
uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
  // Note: the result for level zero is not really used since we set
  // the level-0 compaction threshold based on number of files.
  assert(level >= 0);
  assert(level < static_cast<int>(level_max_bytes_.size()));
  return level_max_bytes_[level];
}
1880 | | |
// Computes the per-level target sizes (level_max_bytes_), the base level for
// dynamic leveling, and the L0 slowdown trigger count.
void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
                                            const MutableCFOptions& options) {
  // Special logic to set number of sorted runs.
  // It is to match the previous behavior when all files are in L0.
  int num_l0_count = 0;
  if (options.MaxFileSizeForCompaction() == std::numeric_limits<uint64_t>::max()) {
    num_l0_count = static_cast<int>(files_[0].size());
  } else {
    // Only count L0 files small enough to still be compaction candidates.
    for (const auto& file : files_[0]) {
      if (file->fd.GetTotalFileSize() <= options.MaxFileSizeForCompaction()) {
        ++num_l0_count;
      }
    }
  }
  if (compaction_style_ == kCompactionStyleUniversal) {
    // For universal compaction, we use level0 score to indicate
    // compaction score for the whole DB. Adding other levels as if
    // they are L0 files.
    for (int i = 1; i < num_levels(); i++) {
      if (!files_[i].empty()) {
        num_l0_count++;
      }
    }
  }
  set_l0_delay_trigger_count(num_l0_count);

  level_max_bytes_.resize(ioptions.num_levels);
  if (!ioptions.level_compaction_dynamic_level_bytes) {
    // Static sizing: base_level_ is fixed at 1 for level compaction, unused
    // (-1) otherwise.
    base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;

    // Calculate for static bytes base case
    for (int i = 0; i < ioptions.num_levels; ++i) {
      if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      } else if (i > 1) {
        // Each level is the previous level's target times the (possibly
        // per-level adjusted) multiplier, saturating on overflow.
        level_max_bytes_[i] = MultiplyCheckOverflow(
            MultiplyCheckOverflow(level_max_bytes_[i - 1],
                                  options.max_bytes_for_level_multiplier),
            options.MaxBytesMultiplerAdditional(i - 1));
      } else {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      }
    }
  } else {
    uint64_t max_level_size = 0;

    int first_non_empty_level = -1;
    // Find size of non-L0 level of most data.
    // Cannot use the size of the last level because it can be empty or less
    // than previous levels after compaction.
    for (int i = 1; i < num_levels_; i++) {
      uint64_t total_size = 0;
      for (const auto& f : files_[i]) {
        total_size += f->fd.GetTotalFileSize();
      }
      if (total_size > 0 && first_non_empty_level == -1) {
        first_non_empty_level = i;
      }
      if (total_size > max_level_size) {
        max_level_size = total_size;
      }
    }

    // Prefill every level's max bytes to disallow compaction from there.
    for (int i = 0; i < num_levels_; i++) {
      level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
    }

    if (max_level_size == 0) {
      // No data for L1 and up. L0 compacts to last level directly.
      // No compaction from L1+ needs to be scheduled.
      base_level_ = num_levels_ - 1;
    } else {
      uint64_t base_bytes_max = options.max_bytes_for_level_base;
      uint64_t base_bytes_min =
          base_bytes_max / options.max_bytes_for_level_multiplier;

      // Try whether we can make last level's target size to be max_level_size
      uint64_t cur_level_size = max_level_size;
      for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
        // Divide by the per-level multiplier (integer division, rounds down).
        cur_level_size /= options.max_bytes_for_level_multiplier;
      }

      // Calculate base level and its size.
      uint64_t base_level_size;
      if (cur_level_size <= base_bytes_min) {
        // Case 1. If we make target size of last level to be max_level_size,
        // target size of the first non-empty level would be smaller than
        // base_bytes_min. We set it be base_bytes_min.
        base_level_size = base_bytes_min + 1U;
        base_level_ = first_non_empty_level;
        RWARN(ioptions.info_log,
              "More existing levels in DB than needed. "
              "max_bytes_for_level_multiplier may not be guaranteed.");
      } else {
        // Find base level (where L0 data is compacted to).
        base_level_ = first_non_empty_level;
        while (base_level_ > 1 && cur_level_size > base_bytes_max) {
          --base_level_;
          cur_level_size =
              cur_level_size / options.max_bytes_for_level_multiplier;
        }
        if (cur_level_size > base_bytes_max) {
          // Even L1 will be too large
          assert(base_level_ == 1);
          base_level_size = base_bytes_max;
        } else {
          base_level_size = cur_level_size;
        }
      }

      // Fill targets from the base level upward, growing geometrically.
      uint64_t level_size = base_level_size;
      for (int i = base_level_; i < num_levels_; i++) {
        if (i > base_level_) {
          level_size = MultiplyCheckOverflow(
              level_size, options.max_bytes_for_level_multiplier);
        }
        level_max_bytes_[i] = level_size;
      }
    }
  }
}
2004 | | |
uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
  // Estimate the live data size by adding up the size of the last level for all
  // key ranges. Note: Estimate depends on the ordering of files in level 0
  // because files in level 0 can be overlapping.
  // Walks levels bottom-up; a file only contributes if its key range is not
  // already covered by a lower (older-data-wins? confirm semantics) level's
  // recorded ranges.
  uint64_t size = 0;

  // Orders InternalKey pointers by key contents, not address.
  auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
    return internal_comparator_->Compare(*x, *y) < 0;
  };
  // (Ordered) map of largest keys in non-overlapping files
  std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);

  for (int l = num_levels_ - 1; l >= 0; l--) {
    bool found_end = false;
    for (auto file : files_[l]) {
      // Find the first file where the largest key is larger than the smallest
      // key of the current file. If this file does not overlap with the
      // current file, none of the files in the map does. If there is
      // no potential overlap, we can safely insert the rest of this level
      // (if the level is not 0) into the map without checking again because
      // the elements in the level are sorted and non-overlapping.
      auto lb = (found_end && l != 0) ?
        ranges.end() : ranges.lower_bound(&file->smallest.key);
      found_end = (lb == ranges.end());
      if (found_end || internal_comparator_->Compare(
            file->largest.key, (*lb).second->smallest.key) < 0) {
        ranges.emplace_hint(lb, &file->largest.key, file);
        size += file->fd.total_file_size;
      }
    }
  }
  return size;
}
2038 | | |
2039 | | |
2040 | 2.16M | void Version::AddLiveFiles(std::vector<FileDescriptor>* live) { |
2041 | 7.28M | for (int level = 0; level < storage_info_.num_levels(); level++5.12M ) { |
2042 | 5.12M | const std::vector<FileMetaData*>& files = storage_info_.files_[level]; |
2043 | 5.12M | for (const auto& file : files) { |
2044 | 2.52M | live->push_back(file->fd); |
2045 | 2.52M | } |
2046 | 5.12M | } |
2047 | 2.16M | } |
2048 | | |
// Renders every level's file list as human-readable text for debugging.
// NOTE(review): the `hex` parameter is unused in this body; presumably
// formatting is delegated to FileMetaData::ToString — confirm.
std::string Version::DebugString(bool hex) const {
  std::string r;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    // E.g.,
    //   --- level 1 ---
    //   17:123['a' .. 'd']
    //   20:43['e' .. 'g']
    r.append("--- level ");
    AppendNumberTo(&r, level);
    r.append(" --- version# ");
    AppendNumberTo(&r, version_number_);
    r.append(" ---\n");
    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
    for (size_t i = 0; i < files.size(); i++) {
      r.append(files[i]->ToString());
    }
  }
  return r;
}
2068 | | |
// Returns the approximate middle key of the largest SST file on the lowest
// level, or an Incomplete status when that level has no files. Used as a
// split-point heuristic (assumption — confirm with callers).
Result<std::string> Version::GetMiddleKey() {
  // Largest files are at lowest level.
  const auto level = storage_info_.num_levels_ - 1;
  const FileMetaData* largest_sst_meta = nullptr;
  for (const auto* file : storage_info_.files_[level]) {
    if (!largest_sst_meta ||
        file->fd.GetTotalFileSize() > largest_sst_meta->fd.GetTotalFileSize()) {
      largest_sst_meta = file;
    }
  }
  if (!largest_sst_meta) {
    return STATUS(Incomplete, "No SST files.");
  }

  // Open (or fetch from cache) a reader for that file and ask it directly.
  const auto trwh = VERIFY_RESULT(table_cache_->GetTableReader(
      vset_->env_options_, cfd_->internal_comparator(), largest_sst_meta->fd, kDefaultQueryId,
      /* no_io =*/ false, cfd_->internal_stats()->GetFileReadHist(level),
      IsFilterSkipped(level, /* is_file_last_in_level =*/ true)));
  return trwh.table_reader->GetMiddleKey();
}
2089 | | |
// this is used to batch writes to the manifest file
struct VersionSet::ManifestWriter {
  Status status;           // outcome of this writer's manifest write
  bool done;               // true once this writer's edit has been processed
  InstrumentedCondVar cv;  // tied to the DB mutex passed to the ctor;
                           // presumably used to wait for batch completion —
                           // confirm at use sites
  ColumnFamilyData* cfd;   // column family the edit applies to (not owned)
  VersionEdit* edit;       // the edit to write (not owned)

  explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
                          VersionEdit* e)
      : done(false), cv(mu), cfd(_cfd), edit(e) {}
};
2102 | | |
2103 | | constexpr uint64_t VersionSet::kInitialNextFileNumber; |
2104 | | |
// Builds a VersionSet for the database at `dbname`. All pointer parameters
// are borrowed, not owned; the ColumnFamilySet is created here and owned by
// this object.
VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options,
                       const EnvOptions& storage_options, Cache* table_cache,
                       WriteBuffer* write_buffer,
                       WriteController* write_controller)
    : column_family_set_(new ColumnFamilySet(
          dbname, db_options, storage_options, table_cache,
          write_buffer, write_controller)),
      env_(db_options->env),
      dbname_(dbname),
      db_options_(db_options),
      env_options_(storage_options),
      // Compactions reuse the regular env options.
      env_options_compactions_(env_options_) {}
2117 | | |
2118 | 397k | VersionSet::~VersionSet() { |
2119 | | // we need to delete column_family_set_ because its destructor depends on |
2120 | | // VersionSet |
2121 | 397k | column_family_set_.reset(); |
2122 | 397k | for (auto file : obsolete_files_) { |
2123 | 36.4k | delete file; |
2124 | 36.4k | } |
2125 | 397k | obsolete_files_.clear(); |
2126 | 397k | } |
2127 | | |
// Installs "v" as the new current version of the column family: recomputes
// compaction scores, marks the version finalized, swaps the CURRENT pointer
// (unref'ing the old current), and links "v" into the CF's version list.
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
                               Version* v) {
  // compute new compaction score
  v->storage_info()->ComputeCompactionScore(
      *column_family_data->GetLatestMutableCFOptions(),
      column_family_data->ioptions()->compaction_options_fifo);

  // Mark v finalized
  v->storage_info_.SetFinalized();

  // Make "v" current
  assert(v->refs_ == 0);  // caller must not have published v yet
  Version* current = column_family_data->current();
  assert(v != current);
  if (current != nullptr) {
    assert(current->refs_ > 0);
    current->Unref();
  }
  column_family_data->SetCurrent(v);
  v->Ref();

  // Append to linked list.
  // dummy_versions() is the sentinel of a doubly-linked circular list;
  // the new version is spliced in just before the sentinel (at the tail).
  v->prev_ = column_family_data->dummy_versions()->prev_;
  v->next_ = column_family_data->dummy_versions();
  v->prev_->next_ = v;
  v->next_->prev_ = v;
}
2155 | | |
// Applies `edit` to the in-memory state and persists it to the MANIFEST.
// Uses a writer queue (ManifestWriter) so that concurrent callers are
// serialized and, where possible, their edits are group-committed in one
// manifest write. Must be called with `mu` held; the mutex is released
// around the expensive file I/O and re-acquired afterwards.
Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
                               const MutableCFOptions& mutable_cf_options,
                               VersionEdit* edit, InstrumentedMutex* mu,
                               Directory* db_directory, bool new_descriptor_log,
                               const ColumnFamilyOptions* new_cf_options) {
  mu->AssertHeld();

  // column_family_data can be nullptr only if this is column_family_add.
  // in that case, we also need to specify ColumnFamilyOptions
  if (column_family_data == nullptr) {
    assert(edit->column_family_name_);
    assert(new_cf_options != nullptr);
  }

  // queue our request
  ManifestWriter w(mu, column_family_data, edit);
  manifest_writers_.push_back(&w);
  // Wait until we are at the queue head, or another writer group-committed
  // our edit for us (w.done).
  while (!w.done && &w != manifest_writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }
  if (column_family_data != nullptr && column_family_data->IsDropped()) {
    // if column family is dropped by the time we get here, no need to write
    // anything to the manifest
    manifest_writers_.pop_front();
    // Notify new head of write queue
    if (!manifest_writers_.empty()) {
      manifest_writers_.front()->cv.Signal();
    }
    // we steal this code to also inform about cf-drop
    return STATUS(ShutdownInProgress, "");
  }

  std::vector<VersionEdit*> batch_edits;
  Version* v = nullptr;
  std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(nullptr);

  // Process all requests in the queue.
  ManifestWriter* last_writer = &w;
  assert(!manifest_writers_.empty());
  assert(manifest_writers_.front() == &w);

  UserFrontierPtr flushed_frontier_override;
  if (edit->IsColumnFamilyManipulation()) {
    // No group commits for column family add or drop.
    LogAndApplyCFHelper(edit);
    batch_edits.push_back(edit);
  } else {
    v = new Version(column_family_data, this, current_version_number_++);
    builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data));
    auto* builder = builder_guard->version_builder();
    // Gather as many queued edits as can be committed together with ours.
    for (const auto& writer : manifest_writers_) {
      if (writer->edit->IsColumnFamilyManipulation() ||
          writer->cfd->GetID() != column_family_data->GetID()) {
        // No group commits for column family add or drop.
        // Also, group commits across column families are not supported.
        break;
      }
      // NOTE(review): frontier_mode is declared but never used in this loop.
      FrontierModificationMode frontier_mode = FrontierModificationMode::kUpdate;
      const bool force_flushed_frontier = writer->edit->force_flushed_frontier_;
      if (force_flushed_frontier) {
        if (writer != &w) {
          // No group commit for edits that force a particular value of flushed frontier, either.
          // (Also see the logic at the end of the for loop body.)
          break;
        }
        // Forcing the frontier requires rewriting a full snapshot into a
        // fresh MANIFEST so the forced value is authoritative.
        new_descriptor_log = true;
        flushed_frontier_override = edit->flushed_frontier_;
      }
      last_writer = writer;
      LogAndApplyHelper(column_family_data, builder, last_writer->edit, mu);
      batch_edits.push_back(last_writer->edit);

      if (force_flushed_frontier) {
        // This is also needed to disable group commit for flushed-frontier-forcing edits.
        break;
      }
    }

    builder->SaveTo(v->storage_info());
  }

  // Initialize new descriptor log file if necessary by creating
  // a temporary file that contains a snapshot of the current version.
  uint64_t new_manifest_file_size = 0;
  Status s;

  assert(pending_manifest_file_number_ == 0);
  // Roll to a new MANIFEST when none is open yet or the current one grew
  // past the configured size limit.
  if (!descriptor_log_ ||
      manifest_file_size_ > db_options_->max_manifest_file_size) {
    pending_manifest_file_number_ = NewFileNumber();
    batch_edits.back()->SetNextFile(next_file_number_.load());
    new_descriptor_log = true;
  } else {
    pending_manifest_file_number_ = manifest_file_number_;
  }

  if (new_descriptor_log) {
    // If we're writing out new snapshot make sure to persist max column family.
    if (column_family_set_->GetMaxColumnFamily() > 0) {
      edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
    }
  }

  // Unlock during expensive operations. New writes cannot get here
  // because &w is ensuring that all new writes get queued.
  {
    mu->Unlock();

    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
    if (!edit->IsColumnFamilyManipulation() &&
        db_options_->max_open_files == -1) {
      // unlimited table cache. Pre-load table handle now.
      // Need to do it out of the mutex.
      builder_guard->version_builder()->LoadTableHandlers(
          column_family_data->internal_stats(),
          column_family_data->ioptions()->optimize_filters_for_hits);
    }

    // This is fine because everything inside of this block is serialized --
    // only one thread can be here at the same time.
    if (new_descriptor_log) {
      // Create a new manifest file.
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
          "Creating manifest %" PRIu64 "\n", pending_manifest_file_number_);
      unique_ptr<WritableFile> descriptor_file;
      EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
      descriptor_log_file_name_ = DescriptorFileName(dbname_, pending_manifest_file_number_);
      s = NewWritableFile(
          env_, descriptor_log_file_name_,
          &descriptor_file, opt_env_opts);
      if (s.ok()) {
        descriptor_file->SetPreallocationBlockSize(
            db_options_->manifest_preallocation_size);

        unique_ptr<WritableFileWriter> file_writer(
            new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
        descriptor_log_.reset(new log::Writer(
            std::move(file_writer), /* log_number */ 0, /* recycle_log_files */ false));
        // This will write a snapshot containing metadata for all files in this DB. If we are
        // forcing a particular value of the flushed frontier, we need to set it in this snapshot
        // version edit as well.
        s = WriteSnapshot(descriptor_log_.get(), flushed_frontier_override);
      } else {
        descriptor_log_file_name_ = "";
      }
    }

    if (!edit->IsColumnFamilyManipulation()) {
      // This is cpu-heavy operations, which should be called outside mutex.
      v->PrepareApply(mutable_cf_options, true);
    }

    // Write new records to MANIFEST log.
    if (s.ok()) {
      for (auto& e : batch_edits) {
        std::string record;
        if (!e->AppendEncodedTo(&record)) {
          s = STATUS(Corruption,
              "Unable to Encode VersionEdit:" + e->DebugString(true));
          break;
        }
        TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
                         rocksdb_kill_odds * REDUCE_ODDS2);
        s = descriptor_log_->AddRecord(record);
        if (!s.ok()) {
          break;
        }
      }
      if (s.ok()) {
        // Make the records durable before publishing them via CURRENT.
        s = SyncManifest(env_, db_options_, descriptor_log_->file());
      }
      if (!s.ok()) {
        RLOG(InfoLogLevel::ERROR_LEVEL, db_options_->info_log,
            "MANIFEST write: %s\n", s.ToString().c_str());
      }
    }

    std::string obsolete_manifest;
    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && new_descriptor_log) {
      s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, db_directory,
                         db_options_->disableDataSync);
      // Leave the old file behind since PurgeObsoleteFiles will take care of it
      // later. It's unsafe to delete now since file deletion may be disabled.
      obsolete_manifest = DescriptorFileName("", manifest_file_number_);
    }

    if (s.ok()) {
      // find offset in manifest file where this version is stored.
      s = db_options_->get_checkpoint_env()->GetFileSize(
          descriptor_log_file_name_, &new_manifest_file_size);
    }

    if (edit->is_column_family_drop_) {
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
    }

    LogFlush(db_options_->info_log);
    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
    mu->Lock();

    if (!obsolete_manifest.empty()) {
      obsolete_manifests_.push_back(std::move(obsolete_manifest));
    }
  }

  // Install the new version
  if (s.ok()) {
    if (edit->column_family_name_) {
      // no group commit on column family add
      assert(batch_edits.size() == 1);
      assert(new_cf_options != nullptr);
      CreateColumnFamily(*new_cf_options, edit);
    } else if (edit->is_column_family_drop_) {
      assert(batch_edits.size() == 1);
      column_family_data->SetDropped();
      if (column_family_data->Unref()) {
        delete column_family_data;
      }
    } else {
      // Advance the CF's log number to the largest one carried by the batch.
      uint64_t max_log_number_in_batch = 0;
      for (auto& e : batch_edits) {
        if (e->log_number_) {
          max_log_number_in_batch =
              std::max(max_log_number_in_batch, *e->log_number_);
        }
      }
      if (max_log_number_in_batch != 0) {
        assert(column_family_data->GetLogNumber() <= max_log_number_in_batch);
        column_family_data->SetLogNumber(max_log_number_in_batch);
      }
      AppendVersion(column_family_data, v);
    }

    manifest_file_number_ = pending_manifest_file_number_;
    manifest_file_size_ = new_manifest_file_size;
    prev_log_number_ = edit->prev_log_number_.get_value_or(0);
    if (flushed_frontier_override) {
      flushed_frontier_ = flushed_frontier_override;
    } else if (edit->flushed_frontier_) {
      UpdateFlushedFrontier(edit->flushed_frontier_);
    }
  } else {
    RLOG(InfoLogLevel::ERROR_LEVEL, db_options_->info_log,
        "Error in committing version %" PRIu64 " to [%s]",
        v->GetVersionNumber(),
        column_family_data ? column_family_data->GetName().c_str()
                           : "<null>");
    delete v;
    if (new_descriptor_log) {
      // The new MANIFEST was never installed via CURRENT; drop it.
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
        "Deleting manifest %" PRIu64 " current manifest %" PRIu64 "\n",
        manifest_file_number_, pending_manifest_file_number_);
      descriptor_log_.reset();
      env_->CleanupFile(
          DescriptorFileName(dbname_, pending_manifest_file_number_));
    }
  }
  pending_manifest_file_number_ = 0;

  // wake up all the waiting writers
  while (true) {
    ManifestWriter* ready = manifest_writers_.front();
    manifest_writers_.pop_front();
    if (ready != &w) {
      // This writer's edit was committed as part of our batch.
      ready->status = s;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }
  // Notify new head of write queue
  if (!manifest_writers_.empty()) {
    manifest_writers_.front()->cv.Signal();
  }
  return s;
}
2440 | | |
// Prepares a column-family add/drop edit for the MANIFEST: stamps it with
// the current next-file number and last sequence number.
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
  assert(edit->IsColumnFamilyManipulation());
  edit->SetNextFile(next_file_number_.load());
  edit->SetLastSequence(LastSequence());
  if (edit->is_column_family_drop_) {
    // if we drop column family, we have to make sure to save max column family,
    // so that we don't reuse existing ID
    edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
  }
}
2451 | | |
// Stamps a regular (non-CF-manipulation) edit with prev-log/next-file/last-
// sequence metadata and applies it to `builder`. Must hold the DB mutex.
void VersionSet::LogAndApplyHelper(
    ColumnFamilyData* cfd,
    VersionBuilder* builder,
    VersionEdit* edit,
    InstrumentedMutex* mu) {
  mu->AssertHeld();
  assert(!edit->IsColumnFamilyManipulation());

  if (edit->log_number_) {
    // Log numbers must be monotonic per CF and below the next file number.
    assert(edit->log_number_ >= cfd->GetLogNumber());
    assert(edit->log_number_ < next_file_number_.load());
  }

  if (!edit->prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }
  edit->SetNextFile(next_file_number_.load());
  edit->SetLastSequence(LastSequence());

  // Propagate the current flushed frontier into the edit, unless the edit
  // forces its own flushed frontier value.
  if (flushed_frontier_ && !edit->force_flushed_frontier_) {
    edit->UpdateFlushedFrontier(flushed_frontier_);
  }

  builder->Apply(edit);
}
2477 | | |
namespace {

// Forwards log-reader corruption reports into a caller-owned Status,
// keeping only the first error seen.
struct LogReporter : public log::Reader::Reporter {
  Status* status;
  virtual void Corruption(size_t bytes, const Status& s) override {
    if (this->status->ok()) *this->status = s;
  }
};

// Locates the live MANIFEST via the CURRENT file, opens it, and iterates
// over its VersionEdit records (OpenManifest() then repeated Next()).
class ManifestReader {
 public:
  ManifestReader(Env* env, Env* checkpoint_env, const EnvOptions& env_options,
                 BoundaryValuesExtractor* extractor, const std::string& dbname)
      : env_(env), checkpoint_env_(checkpoint_env), env_options_(env_options),
        extractor_(extractor), dbname_(dbname) {}

  // Resolves CURRENT -> manifest file name, opens it for sequential reads
  // and records its file number and size. Must succeed before Next().
  Status OpenManifest() {
    auto status = ReadManifestFilename();
    if (!status.ok()) {
      return status;
    }
    FileType type;
    bool parse_ok = ParseFileName(manifest_filename_, &manifest_file_number_, &type);
    if (!parse_ok || type != kDescriptorFile) {
      return STATUS(Corruption, "CURRENT file corrupted");
    }

    manifest_filename_ = dbname_ + "/" + manifest_filename_;
    std::unique_ptr<SequentialFileReader> manifest_file_reader;
    {
      std::unique_ptr<SequentialFile> manifest_file;
      status = env_->NewSequentialFile(manifest_filename_, &manifest_file, env_options_);
      if (!status.ok()) {
        return status;
      }
      manifest_file_reader.reset(new SequentialFileReader(std::move(manifest_file)));
    }
    // Size is taken through the checkpoint env so encrypted files report
    // their on-disk (still-encrypted) size.
    status = checkpoint_env_->GetFileSize(manifest_filename_, &current_manifest_file_size_);
    if (!status.ok()) {
      return status;
    }

    reader_.emplace(nullptr, std::move(manifest_file_reader), &reporter_, true /*checksum*/,
                    0 /*initial_offset*/, 0);
    reporter_.status = &status_;

    return Status::OK();
  }

  // Decodes the next VersionEdit into the internal edit_ (accessible via
  // operator*). Returns an EndOfFile status when the manifest is exhausted.
  CHECKED_STATUS Next() {
    Slice record;
    if (!reader_->ReadRecord(&record, &scratch_)) {
      return STATUS(EndOfFile, "");
    }
    if (!status_.ok()) {
      return status_;
    }
    return edit_.DecodeFrom(extractor_, record);
  }

  // The most recently decoded edit; valid after a successful Next().
  VersionEdit& operator*() {
    return edit_;
  }

  uint64_t manifest_file_number() const { return manifest_file_number_; }
  uint64_t current_manifest_file_size() const { return current_manifest_file_size_; }
  const std::string& manifest_filename() const { return manifest_filename_; }
 private:
  // Reads CURRENT and strips its trailing newline to get the manifest name.
  CHECKED_STATUS ReadManifestFilename() {
    // Read "CURRENT" file, which contains a pointer to the current manifest file
    Status s = ReadFileToString(env_, CurrentFileName(dbname_), &manifest_filename_);
    if (!s.ok()) {
      return s;
    }
    if (manifest_filename_.empty() || manifest_filename_.back() != '\n') {
      return STATUS(Corruption, "CURRENT file does not end with newline");
    }
    // remove the trailing '\n'
    manifest_filename_.resize(manifest_filename_.size() - 1);
    return Status::OK();
  }

  // In plaintext cluster, this is a default env, but in encrypted cluster, this encrypts on write
  // and decrypts on read.
  Env* const env_;
  // Default env used to checkpoint files. In encrypted cluster, we don't want to decrypt
  // checkpointed files, so using the default env preserves file encryption.
  Env* const checkpoint_env_;
  const EnvOptions& env_options_;
  BoundaryValuesExtractor* extractor_;
  std::string dbname_;
  std::string manifest_filename_;
  uint64_t manifest_file_number_ = 0;
  uint64_t current_manifest_file_size_ = 0;
  LogReporter reporter_;
  boost::optional<log::Reader> reader_;
  std::string scratch_;
  Status status_;
  VersionEdit edit_;
};

}  // namespace
2580 | | |
// Rebuilds the in-memory VersionSet state by replaying the live MANIFEST.
// Every supplied column family must appear in the manifest (and vice versa,
// unless read_only). On success, installs a recovered Version per CF and
// restores next-file/last-sequence/log-number bookkeeping.
Status VersionSet::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    bool read_only) {
  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
  for (auto cf : column_families) {
    cf_name_to_options.insert({cf.name, cf.options});
  }
  // keeps track of column families in manifest that were not found in
  // column families parameters. if those column families are not dropped
  // by subsequent manifest records, Recover() will return failure status
  std::unordered_map<int, std::string> column_families_not_found;

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  UserFrontierPtr flushed_frontier;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  // NOTE(review): log_number is never assigned in this function; it stays 0
  // and is only passed to MarkFileNumberUsedDuringRecovery and logging.
  uint64_t log_number = 0;
  uint64_t previous_log_number = 0;
  uint32_t max_column_family = 0;
  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> builders;

  // add default column family
  auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
  if (default_cf_iter == cf_name_to_options.end()) {
    return STATUS(InvalidArgument, "Default column family not specified");
  }
  VersionEdit default_cf_edit;
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
  builders.emplace(0, std::make_unique<BaseReferencedVersionBuilder>(default_cfd));

  Status s;
  uint64_t current_manifest_file_size;
  std::string current_manifest_filename;
  {
    ManifestReader manifest_reader(env_, db_options_->get_checkpoint_env(), env_options_,
                                   db_options_->boundary_extractor.get(), dbname_);
    auto status = manifest_reader.OpenManifest();
    if (!status.ok()) {
      return status;
    }
    current_manifest_file_size = manifest_reader.current_manifest_file_size();
    current_manifest_filename = manifest_reader.manifest_filename();
    manifest_file_number_ = manifest_reader.manifest_file_number();

    // Replay every edit record in the manifest.
    for (;;) {
      s = manifest_reader.Next();
      if (!s.ok()) {
        break;
      }
      auto& edit = *manifest_reader;
      // Not found means that user didn't supply that column
      // family option AND we encountered column family add
      // record. Once we encounter column family drop record,
      // we will delete the column family from
      // column_families_not_found.
      bool cf_in_not_found =
          column_families_not_found.find(edit.column_family_) !=
          column_families_not_found.end();
      // in builders means that user supplied that column family
      // option AND that we encountered column family add record
      bool cf_in_builders =
          builders.find(edit.column_family_) != builders.end();

      // they can't both be true
      assert(!(cf_in_not_found && cf_in_builders));

      ColumnFamilyData* cfd = nullptr;

      if (edit.column_family_name_) {
        // Column family add record.
        if (cf_in_builders || cf_in_not_found) {
          s = STATUS(Corruption,
              "Manifest adding the same column family twice");
          break;
        }
        auto cf_options = cf_name_to_options.find(*edit.column_family_name_);
        if (cf_options == cf_name_to_options.end()) {
          column_families_not_found.emplace(edit.column_family_, *edit.column_family_name_);
        } else {
          cfd = CreateColumnFamily(cf_options->second, &edit);
          builders.emplace(edit.column_family_,
                           std::make_unique<BaseReferencedVersionBuilder>(cfd));
        }
      } else if (edit.is_column_family_drop_) {
        // Column family drop record.
        if (cf_in_builders) {
          auto builder = builders.find(edit.column_family_);
          assert(builder != builders.end());
          builders.erase(builder);
          cfd = column_family_set_->GetColumnFamily(edit.column_family_);
          if (cfd->Unref()) {
            delete cfd;
            cfd = nullptr;
          } else {
            // who else can have reference to cfd!?
            assert(false);
          }
        } else if (cf_in_not_found) {
          column_families_not_found.erase(edit.column_family_);
        } else {
          s = STATUS(Corruption,
              "Manifest - dropping non-existing column family");
          break;
        }
      } else if (!cf_in_not_found) {
        if (!cf_in_builders) {
          s = STATUS(Corruption,
              "Manifest record referencing unknown column family");
          break;
        }

        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
        // this should never happen since cf_in_builders is true
        assert(cfd != nullptr);
        if (edit.max_level_ >= cfd->current()->storage_info()->num_levels()) {
          s = STATUS(InvalidArgument,
              "db has more levels than options.num_levels");
          break;
        }

        // if it is not column family add or column family drop,
        // then it's a file add/delete, which should be forwarded
        // to builder
        auto builder = builders.find(edit.column_family_);
        assert(builder != builders.end());
        builder->second->version_builder()->Apply(&edit);
      }

      if (cfd != nullptr) {
        if (edit.log_number_) {
          if (cfd->GetLogNumber() > edit.log_number_) {
            RLOG(InfoLogLevel::WARN_LEVEL, db_options_->info_log,
                "MANIFEST corruption detected, but ignored - Log numbers in "
                "records NOT monotonically increasing");
          } else {
            cfd->SetLogNumber(*edit.log_number_);
            have_log_number = true;
          }
        }
        if (edit.comparator_ &&
            *edit.comparator_ != cfd->user_comparator()->Name()) {
          s = STATUS(InvalidArgument,
              cfd->user_comparator()->Name(),
              "does not match existing comparator " + *edit.comparator_);
          break;
        }
      }

      // Accumulate file-level bookkeeping carried by the edit.
      if (edit.prev_log_number_) {
        previous_log_number = *edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.next_file_number_) {
        next_file = *edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.max_column_family_) {
        max_column_family = *edit.max_column_family_;
      }

      if (edit.last_sequence_) {
        last_sequence = *edit.last_sequence_;
        have_last_sequence = true;
      }

      if (edit.flushed_frontier_) {
        // Keep the largest flushed frontier seen across all edits.
        UpdateUserFrontier(
            &flushed_frontier, edit.flushed_frontier_, UpdateUserValueType::kLargest);
        VLOG(1) << "Updating flushed frontier with that from edit: "
                << edit.flushed_frontier_->ToString()
                << ", new flushed froniter: " << flushed_frontier->ToString();
      } else {
        VLOG(1) << "No flushed frontier found in edit";
      }
    }
    // EndOfFile is the normal loop exit, not an error.
    if (s.IsEndOfFile()) {
      s = Status::OK();
    }
  }

  if (s.ok()) {
    if (!have_next_file) {
      s = STATUS(Corruption, "no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = STATUS(Corruption, "no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = STATUS(Corruption, "no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
      previous_log_number = 0;
    }

    column_family_set_->UpdateMaxColumnFamily(max_column_family);

    MarkFileNumberUsedDuringRecovery(previous_log_number);
    MarkFileNumberUsedDuringRecovery(log_number);
  }

  // there were some column families in the MANIFEST that weren't specified
  // in the argument. This is OK in read_only mode
  if (read_only == false && !column_families_not_found.empty()) {
    std::string list_of_not_found;
    for (const auto& cf : column_families_not_found) {
      list_of_not_found += ", " + cf.second;
    }
    list_of_not_found = list_of_not_found.substr(2);
    s = STATUS(InvalidArgument,
        "You have to open all column families. Column families not opened: " +
        list_of_not_found);
  }

  if (s.ok()) {
    // Materialize and install one recovered Version per surviving CF.
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto* builder = builders_iter->second->version_builder();

      if (db_options_->max_open_files == -1) {
        // unlimited table cache. Pre-load table handle now.
        // Need to do it out of the mutex.
        builder->LoadTableHandlers(cfd->internal_stats(),
                                   db_options_->max_file_opening_threads);
      }

      Version* v = new Version(cfd, this, current_version_number_++);
      builder->SaveTo(v->storage_info());

      // Install recovered version
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
                      !(db_options_->skip_stats_update_on_db_open));
      AppendVersion(cfd, v);
    }

    manifest_file_size_ = current_manifest_file_size;
    next_file_number_.store(next_file + 1);
    SetLastSequenceNoSanityChecking(last_sequence);
    prev_log_number_ = previous_log_number;
    if (flushed_frontier) {
      UpdateFlushedFrontierNoSanityChecking(std::move(flushed_frontier));
    }

    RLOG(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
        "Recovered from manifest file:%s succeeded,"
        "manifest_file_number is %" PRIu64 ", next_file_number is %lu, "
        "last_sequence is %" PRIu64 ", log_number is %" PRIu64 ","
        "prev_log_number is %" PRIu64 ","
        "max_column_family is %u, flushed_values is %s\n",
        current_manifest_filename.c_str(), manifest_file_number_,
        next_file_number_.load(), LastSequence(),
        log_number, prev_log_number_,
        column_family_set_->GetMaxColumnFamily(),
        yb::ToString(flushed_frontier_).c_str());

    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
          "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
          cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
    }
  }

  return s;
}
2856 | | |
2857 | | Status VersionSet::Import(const std::string& source_dir, |
2858 | | SequenceNumber seqno, |
2859 | 0 | VersionEdit* edit) { |
2860 | 0 | ManifestReader manifest_reader(env_, db_options_->get_checkpoint_env(), env_options_, |
2861 | 0 | db_options_->boundary_extractor.get(), source_dir); |
2862 | 0 | auto status = manifest_reader.OpenManifest(); |
2863 | 0 | if (!status.ok()) { |
2864 | 0 | return status; |
2865 | 0 | } |
2866 | 0 | std::vector<FileMetaData> files; |
2867 | 0 | std::vector<std::pair<SequenceNumber, SequenceNumber>> segments; |
2868 | 0 | for (;;) { |
2869 | 0 | status = manifest_reader.Next(); |
2870 | 0 | if (!status.ok()) { |
2871 | 0 | break; |
2872 | 0 | } |
2873 | 0 | auto& current = *manifest_reader; |
2874 | 0 | if (!current.GetDeletedFiles().empty()) { |
2875 | 0 | return STATUS(Corruption, "Deleted files should be empty"); |
2876 | 0 | } |
2877 | 0 | for (const auto& file : current.GetNewFiles()) { |
2878 | 0 | auto filemeta = file.second; |
2879 | 0 | filemeta.largest.user_frontier.reset(); |
2880 | 0 | filemeta.smallest.user_frontier.reset(); |
2881 | 0 | filemeta.imported = true; |
2882 | 0 | if (filemeta.largest.seqno >= seqno) { |
2883 | 0 | return STATUS_FORMAT(InvalidArgument, |
2884 | 0 | "Imported DB contains seqno ($0) greater than active seqno ($1)", |
2885 | 0 | filemeta.largest.seqno, |
2886 | 0 | seqno); |
2887 | 0 | } |
2888 | 0 | files.push_back(filemeta); |
2889 | 0 | segments.emplace_back(filemeta.smallest.seqno, filemeta.largest.seqno); |
2890 | 0 | } |
2891 | 0 | } |
2892 | 0 | if (!status.IsEndOfFile()) { |
2893 | 0 | return status; |
2894 | 0 | } |
2895 | | |
2896 | 0 | if (files.empty()) { |
2897 | 0 | return STATUS_FORMAT(NotFound, "Imported DB is empty: $0", source_dir); |
2898 | 0 | } |
2899 | | |
2900 | 0 | std::vector<LiveFileMetaData> live_files; |
2901 | 0 | GetLiveFilesMetaData(&live_files); |
2902 | 0 | for (const auto& file : live_files) { |
2903 | 0 | segments.emplace_back(file.smallest.seqno, file.largest.seqno); |
2904 | 0 | } |
2905 | |
|
2906 | 0 | std::sort(segments.begin(), segments.end(), [](const auto& lhs, const auto& rhs) { |
2907 | 0 | return lhs.first < rhs.first; |
2908 | 0 | }); |
2909 | 0 | auto prev = segments.front(); |
2910 | 0 | for (size_t i = 1; i != segments.size(); ++i) { |
2911 | 0 | const auto& segment = segments[i]; |
2912 | 0 | if (segment.first <= prev.second) { |
2913 | 0 | return STATUS_FORMAT(Corruption, |
2914 | 0 | "Overlapping seqno ranges: [$0, $1] and [$2, $3]", |
2915 | 0 | prev.first, |
2916 | 0 | prev.second, |
2917 | 0 | segment.first, |
2918 | 0 | segment.second); |
2919 | 0 | } |
2920 | 0 | prev = segment; |
2921 | 0 | } |
2922 | | |
2923 | 0 | std::vector<std::string> revert_list; |
2924 | 0 | for (auto file : files) { |
2925 | 0 | auto source_base = MakeTableFileName(source_dir, file.fd.GetNumber()); |
2926 | 0 | auto source_data = TableBaseToDataFileName(source_base); |
2927 | 0 | auto new_number = NewFileNumber(); |
2928 | 0 | auto dest_base = MakeTableFileName(dbname_, new_number); |
2929 | 0 | auto dest_data = TableBaseToDataFileName(dest_base); |
2930 | 0 | LOG(INFO) << "Importing: " << source_base << " => " << dest_base; |
2931 | 0 | status = env_->LinkFile(source_base, dest_base); |
2932 | 0 | if (!status.ok()) { |
2933 | 0 | break; |
2934 | 0 | } |
2935 | 0 | revert_list.push_back(dest_base); |
2936 | 0 | status = env_->LinkFile(source_data, dest_data); |
2937 | 0 | if (!status.ok()) { |
2938 | 0 | break; |
2939 | 0 | } |
2940 | 0 | revert_list.push_back(dest_data); |
2941 | 0 | file.fd.packed_number_and_path_id = new_number; // path is 0 |
2942 | 0 | file.marked_for_compaction = false; |
2943 | 0 | edit->AddCleanedFile(0, file); |
2944 | 0 | } |
2945 | |
|
2946 | 0 | if (!status.ok()) { |
2947 | 0 | for (const auto& file : revert_list) { |
2948 | 0 | auto delete_status = env_->DeleteFile(file); |
2949 | 0 | LOG(ERROR) << "Failed to delete file: " << file << ", status: " << delete_status.ToString(); |
2950 | 0 | } |
2951 | 0 | return status; |
2952 | 0 | } |
2953 | | |
2954 | 0 | return Status::OK(); |
2955 | 0 | } |
2956 | | |
2957 | | Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families, |
2958 | | const std::string& dbname, |
2959 | | BoundaryValuesExtractor* extractor, |
2960 | 8 | Env* env) { |
2961 | | // these are just for performance reasons, not correctness, |
2962 | | // so we're fine using the defaults |
2963 | 8 | EnvOptions soptions; |
2964 | | // Read "CURRENT" file, which contains a pointer to the current manifest file |
2965 | 8 | std::string current; |
2966 | 8 | Status s = ReadFileToString(env, CurrentFileName(dbname), ¤t); |
2967 | 8 | if (!s.ok()) { |
2968 | 0 | return s; |
2969 | 0 | } |
2970 | 8 | if (current.empty() || current[current.size()-1] != '\n') { |
2971 | 1 | return STATUS(Corruption, "CURRENT file does not end with newline"); |
2972 | 1 | } |
2973 | 7 | current.resize(current.size() - 1); |
2974 | | |
2975 | 7 | std::string dscname = dbname + "/" + current; |
2976 | | |
2977 | 7 | unique_ptr<SequentialFileReader> file_reader; |
2978 | 7 | { |
2979 | 7 | unique_ptr<SequentialFile> file; |
2980 | 7 | s = env->NewSequentialFile(dscname, &file, soptions); |
2981 | 7 | if (!s.ok()) { |
2982 | 0 | return s; |
2983 | 0 | } |
2984 | 7 | file_reader.reset(new SequentialFileReader(std::move(file))); |
2985 | 7 | } |
2986 | | |
2987 | 0 | std::map<uint32_t, std::string> column_family_names; |
2988 | | // default column family is always implicitly there |
2989 | 7 | column_family_names.insert({0, kDefaultColumnFamilyName}); |
2990 | 7 | LogReporter reporter; |
2991 | 7 | reporter.status = &s; |
2992 | 7 | log::Reader reader(NULL, std::move(file_reader), &reporter, true /*checksum*/, |
2993 | 7 | 0 /*initial_offset*/, 0); |
2994 | 7 | Slice record; |
2995 | 7 | std::string scratch; |
2996 | 39 | while (reader.ReadRecord(&record, &scratch) && s.ok()32 ) { |
2997 | 32 | VersionEdit edit; |
2998 | 32 | s = edit.DecodeFrom(extractor, record); |
2999 | 32 | if (!s.ok()) { |
3000 | 0 | break; |
3001 | 0 | } |
3002 | 32 | if (edit.column_family_name_) { |
3003 | 2 | if (column_family_names.find(edit.column_family_) != |
3004 | 2 | column_family_names.end()) { |
3005 | 0 | s = STATUS(Corruption, "Manifest adding the same column family twice"); |
3006 | 0 | break; |
3007 | 0 | } |
3008 | 2 | column_family_names.emplace(edit.column_family_, *edit.column_family_name_); |
3009 | 30 | } else if (edit.is_column_family_drop_) { |
3010 | 0 | if (column_family_names.find(edit.column_family_) == |
3011 | 0 | column_family_names.end()) { |
3012 | 0 | s = STATUS(Corruption, |
3013 | 0 | "Manifest - dropping non-existing column family"); |
3014 | 0 | break; |
3015 | 0 | } |
3016 | 0 | column_family_names.erase(edit.column_family_); |
3017 | 0 | } |
3018 | 32 | } |
3019 | | |
3020 | 7 | column_families->clear(); |
3021 | 7 | if (s.ok()) { |
3022 | 9 | for (const auto& iter : column_family_names) { |
3023 | 9 | column_families->push_back(iter.second); |
3024 | 9 | } |
3025 | 7 | } |
3026 | | |
3027 | 7 | return s; |
3028 | 7 | } |
3029 | | |
3030 | | #ifndef ROCKSDB_LITE |
// Rewrites the DB's MANIFEST so that it uses new_levels levels instead of its
// current, larger count. Precondition (checked below): at most one of the
// levels in [new_levels-1, current_levels-1] may contain files; that level's
// files become the new bottom level. The change is persisted via LogAndApply.
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
                                        const Options* options,
                                        const EnvOptions& env_options,
                                        int new_levels) {
  if (new_levels <= 1) {
    return STATUS(InvalidArgument,
                  "Number of levels needs to be bigger than 1");
  }

  // Build a standalone VersionSet (own cache / write controller / write
  // buffer) just to recover the DB's current state from the MANIFEST.
  ColumnFamilyOptions cf_options(*options);  // NOTE(review): appears unused below — confirm.
  std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
                                        options->table_cache_numshardbits));
  WriteController wc(options->delayed_write_rate);
  WriteBuffer wb(options->db_write_buffer_size);
  VersionSet versions(dbname, options, env_options, tc.get(), &wb, &wc);
  Status status;

  // Recover using only the default column family.
  std::vector<ColumnFamilyDescriptor> dummy;
  ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
                                          ColumnFamilyOptions(*options));
  dummy.push_back(dummy_descriptor);
  status = versions.Recover(dummy);
  if (!status.ok()) {
    return status;
  }

  Version* current_version =
      versions.GetColumnFamilySet()->GetDefault()->current();
  auto* vstorage = current_version->storage_info();
  int current_levels = vstorage->num_levels();

  // Already at or below the requested level count: nothing to do.
  if (current_levels <= new_levels) {
    return Status::OK();
  }

  // Make sure there are file only on one level from
  // (new_levels-1) to (current_levels-1)
  int first_nonempty_level = -1;
  int first_nonempty_level_filenum = 0;
  for (int i = new_levels - 1; i < current_levels; i++) {
    int file_num = vstorage->NumLevelFiles(i);
    if (file_num != 0) {
      if (first_nonempty_level < 0) {
        first_nonempty_level = i;
        first_nonempty_level_filenum = file_num;
      } else {
        // A second populated level in the collapse range cannot be merged here.
        char msg[255];
        snprintf(msg, sizeof(msg),
                 "Found at least two levels containing files: "
                 "[%d:%d],[%d:%d].\n",
                 first_nonempty_level, first_nonempty_level_filenum, i,
                 file_num);
        return STATUS(InvalidArgument, msg);
      }
    }
  }

  // we need to allocate an array with the old number of levels size to
  // avoid SIGSEGV in WriteSnapshot()
  // however, all levels bigger or equal to new_levels will be empty
  std::vector<FileMetaData*>* new_files_list =
      new std::vector<FileMetaData*>[current_levels];
  for (int i = 0; i < new_levels - 1; i++) {
    new_files_list[i] = vstorage->LevelFiles(i);
  }

  // The single populated deep level becomes the new last level.
  if (first_nonempty_level > 0) {
    new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
  }

  // Swap the level array in place; vstorage now owns new_files_list.
  delete[] vstorage->files_;
  vstorage->files_ = new_files_list;
  vstorage->num_levels_ = new_levels;

  // Persist the collapsed shape by writing a snapshot through LogAndApply
  // with an otherwise-empty edit.
  MutableCFOptions mutable_cf_options(*options, ImmutableCFOptions(*options));
  VersionEdit ve;
  InstrumentedMutex dummy_mutex;
  InstrumentedMutexLock l(&dummy_mutex);
  return versions.LogAndApply(
      versions.GetColumnFamilySet()->GetDefault(),
      mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
}
3113 | | |
3114 | | Status VersionSet::DumpManifest(const Options& options, const std::string& dscname, |
3115 | 0 | bool verbose, bool hex) { |
3116 | | // Open the specified manifest file. |
3117 | 0 | unique_ptr<SequentialFileReader> file_reader; |
3118 | 0 | Status s; |
3119 | 0 | { |
3120 | 0 | unique_ptr<SequentialFile> file; |
3121 | 0 | s = options.env->NewSequentialFile(dscname, &file, env_options_); |
3122 | 0 | if (!s.ok()) { |
3123 | 0 | return s; |
3124 | 0 | } |
3125 | 0 | file_reader.reset(new SequentialFileReader(std::move(file))); |
3126 | 0 | } |
3127 | | |
3128 | 0 | bool have_prev_log_number = false; |
3129 | 0 | bool have_next_file = false; |
3130 | 0 | bool have_last_sequence = false; |
3131 | 0 | uint64_t next_file = 0; |
3132 | 0 | uint64_t last_sequence = 0; |
3133 | 0 | uint64_t previous_log_number = 0; |
3134 | 0 | UserFrontier* flushed_frontier = nullptr; |
3135 | 0 | int count = 0; |
3136 | 0 | std::unordered_map<uint32_t, std::string> comparators; |
3137 | 0 | std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders; |
3138 | | |
3139 | | // add default column family |
3140 | 0 | VersionEdit default_cf_edit; |
3141 | 0 | default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); |
3142 | 0 | default_cf_edit.SetColumnFamily(0); |
3143 | 0 | ColumnFamilyData* default_cfd = |
3144 | 0 | CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit); |
3145 | 0 | builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)}); |
3146 | |
|
3147 | 0 | { |
3148 | 0 | LogReporter reporter; |
3149 | 0 | reporter.status = &s; |
3150 | 0 | log::Reader reader(NULL, std::move(file_reader), &reporter, |
3151 | 0 | true /*checksum*/, 0 /*initial_offset*/, 0); |
3152 | 0 | Slice record; |
3153 | 0 | std::string scratch; |
3154 | 0 | while (reader.ReadRecord(&record, &scratch) && s.ok()) { |
3155 | 0 | VersionEdit edit; |
3156 | 0 | s = edit.DecodeFrom(db_options_->boundary_extractor.get(), record); |
3157 | 0 | if (!s.ok()) { |
3158 | 0 | break; |
3159 | 0 | } |
3160 | | |
3161 | | // Write out each individual edit |
3162 | 0 | if (verbose) { |
3163 | 0 | printf("%s\n", edit.DebugString(hex).c_str()); |
3164 | 0 | } |
3165 | 0 | count++; |
3166 | |
|
3167 | 0 | bool cf_in_builders = |
3168 | 0 | builders.find(edit.column_family_) != builders.end(); |
3169 | |
|
3170 | 0 | if (edit.comparator_) { |
3171 | 0 | comparators.emplace(edit.column_family_, *edit.comparator_); |
3172 | 0 | } |
3173 | |
|
3174 | 0 | ColumnFamilyData* cfd = nullptr; |
3175 | |
|
3176 | 0 | if (edit.column_family_name_) { |
3177 | 0 | if (cf_in_builders) { |
3178 | 0 | s = STATUS(Corruption, |
3179 | 0 | "Manifest adding the same column family twice"); |
3180 | 0 | break; |
3181 | 0 | } |
3182 | 0 | cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit); |
3183 | 0 | builders.insert( |
3184 | 0 | {edit.column_family_, new BaseReferencedVersionBuilder(cfd)}); |
3185 | 0 | } else if (edit.is_column_family_drop_) { |
3186 | 0 | if (!cf_in_builders) { |
3187 | 0 | s = STATUS(Corruption, |
3188 | 0 | "Manifest - dropping non-existing column family"); |
3189 | 0 | break; |
3190 | 0 | } |
3191 | 0 | auto builder_iter = builders.find(edit.column_family_); |
3192 | 0 | delete builder_iter->second; |
3193 | 0 | builders.erase(builder_iter); |
3194 | 0 | comparators.erase(edit.column_family_); |
3195 | 0 | cfd = column_family_set_->GetColumnFamily(edit.column_family_); |
3196 | 0 | assert(cfd != nullptr); |
3197 | 0 | cfd->Unref(); |
3198 | 0 | delete cfd; |
3199 | 0 | cfd = nullptr; |
3200 | 0 | } else { |
3201 | 0 | if (!cf_in_builders) { |
3202 | 0 | s = STATUS(Corruption, |
3203 | 0 | "Manifest record referencing unknown column family"); |
3204 | 0 | break; |
3205 | 0 | } |
3206 | | |
3207 | 0 | cfd = column_family_set_->GetColumnFamily(edit.column_family_); |
3208 | | // this should never happen since cf_in_builders is true |
3209 | 0 | assert(cfd != nullptr); |
3210 | | |
3211 | | // if it is not column family add or column family drop, |
3212 | | // then it's a file add/delete, which should be forwarded |
3213 | | // to builder |
3214 | 0 | auto builder = builders.find(edit.column_family_); |
3215 | 0 | assert(builder != builders.end()); |
3216 | 0 | builder->second->version_builder()->Apply(&edit); |
3217 | 0 | } |
3218 | | |
3219 | 0 | if (cfd != nullptr && edit.log_number_) { |
3220 | 0 | cfd->SetLogNumber(*edit.log_number_); |
3221 | 0 | } |
3222 | |
|
3223 | 0 | if (edit.prev_log_number_) { |
3224 | 0 | previous_log_number = *edit.prev_log_number_; |
3225 | 0 | have_prev_log_number = true; |
3226 | 0 | } |
3227 | |
|
3228 | 0 | if (edit.next_file_number_) { |
3229 | 0 | next_file = *edit.next_file_number_; |
3230 | 0 | have_next_file = true; |
3231 | 0 | } |
3232 | |
|
3233 | 0 | if (edit.last_sequence_) { |
3234 | 0 | last_sequence = *edit.last_sequence_; |
3235 | 0 | have_last_sequence = true; |
3236 | 0 | } |
3237 | |
|
3238 | 0 | if (edit.flushed_frontier_) { |
3239 | 0 | flushed_frontier = edit.flushed_frontier_.get(); |
3240 | 0 | } |
3241 | |
|
3242 | 0 | if (edit.max_column_family_) { |
3243 | 0 | column_family_set_->UpdateMaxColumnFamily(*edit.max_column_family_); |
3244 | 0 | } |
3245 | 0 | } |
3246 | 0 | } |
3247 | 0 | file_reader.reset(); |
3248 | |
|
3249 | 0 | if (s.ok()) { |
3250 | 0 | if (!have_next_file) { |
3251 | 0 | s = STATUS(Corruption, "no meta-nextfile entry in descriptor"); |
3252 | 0 | printf("no meta-nextfile entry in descriptor"); |
3253 | 0 | } else if (!have_last_sequence) { |
3254 | 0 | printf("no last-sequence-number entry in descriptor"); |
3255 | 0 | s = STATUS(Corruption, "no last-sequence-number entry in descriptor"); |
3256 | 0 | } |
3257 | |
|
3258 | 0 | if (!have_prev_log_number) { |
3259 | 0 | previous_log_number = 0; |
3260 | 0 | } |
3261 | 0 | } |
3262 | |
|
3263 | 0 | if (s.ok()) { |
3264 | 0 | for (auto cfd : *column_family_set_) { |
3265 | 0 | if (cfd->IsDropped()) { |
3266 | 0 | continue; |
3267 | 0 | } |
3268 | 0 | auto builders_iter = builders.find(cfd->GetID()); |
3269 | 0 | assert(builders_iter != builders.end()); |
3270 | 0 | auto builder = builders_iter->second->version_builder(); |
3271 | |
|
3272 | 0 | Version* v = new Version(cfd, this, current_version_number_++); |
3273 | 0 | builder->SaveTo(v->storage_info()); |
3274 | 0 | v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false); |
3275 | |
|
3276 | 0 | printf("--------------- Column family \"%s\" (ID %u) --------------\n", |
3277 | 0 | cfd->GetName().c_str(), (unsigned int)cfd->GetID()); |
3278 | 0 | printf("log number: %" PRIu64 "\n", cfd->GetLogNumber()); |
3279 | 0 | auto comparator = comparators.find(cfd->GetID()); |
3280 | 0 | if (comparator != comparators.end()) { |
3281 | 0 | printf("comparator: %s\n", comparator->second.c_str()); |
3282 | 0 | } else { |
3283 | 0 | printf("comparator: <NO COMPARATOR>\n"); |
3284 | 0 | } |
3285 | 0 | printf("%s \n", v->DebugString(hex).c_str()); |
3286 | 0 | delete v; |
3287 | 0 | } |
3288 | | |
3289 | | // Free builders |
3290 | 0 | for (auto& builder : builders) { |
3291 | 0 | delete builder.second; |
3292 | 0 | } |
3293 | |
|
3294 | 0 | next_file_number_.store(next_file + 1); |
3295 | 0 | SetLastSequenceNoSanityChecking(last_sequence); |
3296 | 0 | if (flushed_frontier) { |
3297 | 0 | DCHECK_EQ(*flushed_frontier, *FlushedFrontier()); |
3298 | 0 | } |
3299 | 0 | prev_log_number_ = previous_log_number; |
3300 | |
|
3301 | 0 | printf( |
3302 | 0 | "next_file_number %" PRIu64 " last_sequence " |
3303 | 0 | "%" PRIu64 " prev_log_number %" PRIu64 " max_column_family %u flushed_values %s\n", |
3304 | 0 | next_file_number_.load(), last_sequence, previous_log_number, |
3305 | 0 | column_family_set_->GetMaxColumnFamily(), |
3306 | 0 | yb::ToString(flushed_frontier).c_str()); |
3307 | 0 | } |
3308 | |
|
3309 | 0 | return s; |
3310 | 0 | } |
3311 | | #endif // ROCKSDB_LITE |
3312 | | |
// Set the last sequence number to s.
void VersionSet::SetLastSequence(SequenceNumber s) {
#ifndef NDEBUG
  // Debug builds verify that the last sequence number never moves backwards.
  EnsureNonDecreasingLastSequence(LastSequence(), s);
#endif
  SetLastSequenceNoSanityChecking(s);
}
3320 | | |
// Set last sequence number without verifying that it always keeps increasing.
void VersionSet::SetLastSequenceNoSanityChecking(SequenceNumber s) {
  // Release store — presumably paired with an acquire load on the reader
  // side (LastSequence()); pairing not visible in this file section.
  last_sequence_.store(s, std::memory_order_release);
}
3325 | | |
// Set the last flushed op id / hybrid time / history cutoff to the specified set of values.
void VersionSet::UpdateFlushedFrontier(UserFrontierPtr values) {
  // Sanity check first: the flushed frontier must never move backwards.
  EnsureNonDecreasingFlushedFrontier(FlushedFrontier(), *values);
  UpdateFlushedFrontierNoSanityChecking(std::move(values));
}
3331 | | |
3332 | 892k | void VersionSet::MarkFileNumberUsedDuringRecovery(uint64_t number) { |
3333 | | // only called during recovery which is single threaded, so this works because |
3334 | | // there can't be concurrent calls |
3335 | 892k | if (next_file_number_.load(std::memory_order_relaxed) <= number) { |
3336 | 5.04k | next_file_number_.store(number + 1, std::memory_order_relaxed); |
3337 | 5.04k | } |
3338 | 892k | } |
3339 | | |
3340 | | namespace { |
3341 | | |
3342 | 562k | CHECKED_STATUS AddEdit(const VersionEdit& edit, const DBOptions* db_options, log::Writer* log) { |
3343 | 562k | std::string record; |
3344 | 562k | if (!edit.AppendEncodedTo(&record)) { |
3345 | 0 | return STATUS(Corruption, |
3346 | 0 | "Unable to Encode VersionEdit:" + edit.DebugString(true)); |
3347 | 0 | } |
3348 | 562k | RLOG(InfoLogLevel::INFO_LEVEL, db_options->info_log, |
3349 | 562k | "Writing version edit: %s\n", edit.DebugString().c_str()); |
3350 | 562k | return log->AddRecord(record); |
3351 | 562k | } |
3352 | | |
3353 | | } // namespace |
3354 | | |
3355 | 277k | Status VersionSet::WriteSnapshot(log::Writer* log, UserFrontierPtr flushed_frontier_override) { |
3356 | | // TODO: Break up into multiple records to reduce memory usage on recovery? |
3357 | | |
3358 | | // WARNING: This method doesn't hold a mutex! |
3359 | | |
3360 | | // This is done without DB mutex lock held, but only within single-threaded |
3361 | | // LogAndApply. Column family manipulations can only happen within LogAndApply |
3362 | | // (the same single thread), so we're safe to iterate. |
3363 | 281k | for (auto cfd : *column_family_set_) { |
3364 | 281k | if (cfd->IsDropped()) { |
3365 | 0 | continue; |
3366 | 0 | } |
3367 | 281k | { |
3368 | | // Store column family info |
3369 | 281k | VersionEdit edit; |
3370 | 281k | if (cfd->GetID() != 0) { |
3371 | | // default column family is always there, |
3372 | | // no need to explicitly write it |
3373 | 3.54k | edit.AddColumnFamily(cfd->GetName()); |
3374 | 3.54k | edit.SetColumnFamily(cfd->GetID()); |
3375 | 3.54k | } |
3376 | 281k | edit.SetComparatorName( |
3377 | 281k | cfd->internal_comparator()->user_comparator()->Name()); |
3378 | 281k | RETURN_NOT_OK(AddEdit(edit, db_options_, log)); |
3379 | 281k | } |
3380 | | |
3381 | 281k | { |
3382 | | // Save files |
3383 | 281k | VersionEdit edit; |
3384 | 281k | edit.SetColumnFamily(cfd->GetID()); |
3385 | | |
3386 | 631k | for (int level = 0; level < cfd->NumberLevels(); level++350k ) { |
3387 | 350k | for (const auto& f : cfd->current()->storage_info()->LevelFiles(level)) { |
3388 | 28.9k | edit.AddCleanedFile(level, *f); |
3389 | 28.9k | } |
3390 | 350k | } |
3391 | 281k | edit.SetLogNumber(cfd->GetLogNumber()); |
3392 | 281k | if (flushed_frontier_override) { |
3393 | 1.80k | edit.flushed_frontier_ = flushed_frontier_override; |
3394 | 279k | } else { |
3395 | 279k | edit.flushed_frontier_ = flushed_frontier_; |
3396 | 279k | } |
3397 | 281k | RETURN_NOT_OK(AddEdit(edit, db_options_, log)); |
3398 | 281k | } |
3399 | 281k | } |
3400 | | |
3401 | 277k | return Status::OK(); |
3402 | 277k | } |
3403 | | |
3404 | | // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this |
3405 | | // function is called repeatedly with consecutive pairs of slices. For example |
3406 | | // if the slice list is [a, b, c, d] this function is called with arguments |
3407 | | // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where |
3408 | | // we avoid doing binary search for the keys b and c twice and instead somehow |
3409 | | // maintain state of where they first appear in the files. |
3410 | | uint64_t VersionSet::ApproximateSize(Version* v, const Slice& start, |
3411 | | const Slice& end, int start_level, |
3412 | 14.1k | int end_level) { |
3413 | | // pre-condition |
3414 | 14.1k | assert(v->cfd_->internal_comparator()->Compare(start, end) <= 0); |
3415 | | |
3416 | 0 | uint64_t size = 0; |
3417 | 14.1k | const auto* vstorage = v->storage_info(); |
3418 | 14.1k | end_level = end_level == -1 |
3419 | 14.1k | ? vstorage->num_non_empty_levels()14.0k |
3420 | 14.1k | : std::min(end_level, vstorage->num_non_empty_levels())129 ; |
3421 | | |
3422 | 14.1k | assert(start_level <= end_level); |
3423 | | |
3424 | 42.2k | for (int level = start_level; level < end_level; level++28.0k ) { |
3425 | 28.0k | const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level); |
3426 | 28.0k | if (!files_brief.num_files) { |
3427 | | // empty level, skip exploration |
3428 | 13.4k | continue; |
3429 | 13.4k | } |
3430 | | |
3431 | 14.5k | if (!level) { |
3432 | | // level 0 data is sorted order, handle the use case explicitly |
3433 | 1.04k | size += ApproximateSizeLevel0(v, files_brief, start, end); |
3434 | 1.04k | continue; |
3435 | 1.04k | } |
3436 | | |
3437 | 13.5k | assert(level > 0); |
3438 | 0 | assert(files_brief.num_files > 0); |
3439 | | |
3440 | | // identify the file position for starting key |
3441 | 0 | const uint64_t idx_start = FindFileInRange( |
3442 | 13.5k | *v->cfd_->internal_comparator(), files_brief, start, |
3443 | 13.5k | /*start=*/0, static_cast<uint32_t>(files_brief.num_files - 1)); |
3444 | 13.5k | assert(idx_start < files_brief.num_files); |
3445 | | |
3446 | | // scan all files from the starting position until the ending position |
3447 | | // inferred from the sorted order |
3448 | 27.1k | for (uint64_t i = idx_start; i < files_brief.num_files; i++13.6k ) { |
3449 | 14.5k | uint64_t val; |
3450 | 14.5k | val = ApproximateSize(v, files_brief.files[i], end); |
3451 | 14.5k | if (!val) { |
3452 | | // the files after this will not have the range |
3453 | 814 | break; |
3454 | 814 | } |
3455 | | |
3456 | 13.6k | size += val; |
3457 | | |
3458 | 13.6k | if (i == idx_start) { |
3459 | | // subtract the bytes needed to be scanned to get to the starting |
3460 | | // key |
3461 | 12.7k | val = ApproximateSize(v, files_brief.files[i], start); |
3462 | 12.7k | assert(size >= val); |
3463 | 0 | size -= val; |
3464 | 12.7k | } |
3465 | 13.6k | } |
3466 | 13.5k | } |
3467 | | |
3468 | 14.1k | return size; |
3469 | 14.1k | } |
3470 | | |
3471 | | uint64_t VersionSet::ApproximateSizeLevel0(Version* v, |
3472 | | const LevelFilesBrief& files_brief, |
3473 | | const Slice& key_start, |
3474 | 1.04k | const Slice& key_end) { |
3475 | | // level 0 files are not in sorted order, we need to iterate through |
3476 | | // the list to compute the total bytes that require scanning |
3477 | 1.04k | uint64_t size = 0; |
3478 | 2.34k | for (size_t i = 0; i < files_brief.num_files; i++1.29k ) { |
3479 | 1.29k | const uint64_t start = ApproximateSize(v, files_brief.files[i], key_start); |
3480 | 1.29k | const uint64_t end = ApproximateSize(v, files_brief.files[i], key_end); |
3481 | 1.29k | assert(end >= start); |
3482 | 0 | size += end - start; |
3483 | 1.29k | } |
3484 | 1.04k | return size; |
3485 | 1.04k | } |
3486 | | |
3487 | 29.8k | uint64_t VersionSet::ApproximateSize(Version* v, const FdWithBoundaries& f, const Slice& key) { |
3488 | | // pre-condition |
3489 | 29.8k | assert(v); |
3490 | | |
3491 | 0 | uint64_t result = 0; |
3492 | 29.8k | if (v->cfd_->internal_comparator()->Compare(f.largest.key, key) <= 0) { |
3493 | | // Entire file is before "key", so just add the file size |
3494 | 2.05k | result = f.fd.GetTotalFileSize(); |
3495 | 27.7k | } else if (v->cfd_->internal_comparator()->Compare(f.smallest.key, key) > 0) { |
3496 | | // Entire file is after "key", so ignore |
3497 | 10.8k | result = 0; |
3498 | 16.8k | } else { |
3499 | | // "key" falls in the range for this table. Add the |
3500 | | // approximate offset of "key" within the table. |
3501 | 16.8k | TableReader* table_reader_ptr; |
3502 | 16.8k | InternalIterator* iter = v->cfd_->table_cache()->NewIterator( |
3503 | 16.8k | ReadOptions(), env_options_, v->cfd_->internal_comparator(), f.fd, Slice() /* filter */, |
3504 | 16.8k | &table_reader_ptr); |
3505 | 16.8k | if (table_reader_ptr != nullptr) { |
3506 | 16.8k | result = table_reader_ptr->ApproximateOffsetOf(key); |
3507 | 16.8k | } |
3508 | 16.8k | delete iter; |
3509 | 16.8k | } |
3510 | 29.8k | return result; |
3511 | 29.8k | } |
3512 | | |
3513 | 1.85M | void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) { |
3514 | | // pre-calculate space requirement |
3515 | 1.85M | int64_t total_files = 0; |
3516 | 2.15M | for (auto cfd : *column_family_set_) { |
3517 | 2.15M | Version* dummy_versions = cfd->dummy_versions(); |
3518 | 4.31M | for (Version* v = dummy_versions->next_; v != dummy_versions; |
3519 | 2.16M | v = v->next_) { |
3520 | 2.16M | const auto* vstorage = v->storage_info(); |
3521 | 7.28M | for (int level = 0; level < vstorage->num_levels(); level++5.12M ) { |
3522 | 5.12M | total_files += vstorage->LevelFiles(level).size(); |
3523 | 5.12M | } |
3524 | 2.16M | } |
3525 | 2.15M | } |
3526 | | |
3527 | | // just one time extension to the right size |
3528 | 1.85M | live_list->reserve(live_list->size() + static_cast<size_t>(total_files)); |
3529 | | |
3530 | 2.15M | for (auto cfd : *column_family_set_) { |
3531 | 2.15M | auto* current = cfd->current(); |
3532 | 2.15M | bool found_current = false; |
3533 | 2.15M | Version* dummy_versions = cfd->dummy_versions(); |
3534 | 4.31M | for (Version* v = dummy_versions->next_; v != dummy_versions; |
3535 | 2.16M | v = v->next_) { |
3536 | 2.16M | v->AddLiveFiles(live_list); |
3537 | 2.16M | if (v == current) { |
3538 | 2.15M | found_current = true; |
3539 | 2.15M | } |
3540 | 2.16M | } |
3541 | 2.15M | if (!found_current && current != nullptr0 ) { |
3542 | | // Should never happen unless it is a bug. |
3543 | 0 | assert(false); |
3544 | 0 | current->AddLiveFiles(live_list); |
3545 | 0 | } |
3546 | 2.15M | } |
3547 | 1.85M | } |
3548 | | |
3549 | 11.4k | InternalIterator* VersionSet::MakeInputIterator(Compaction* c) { |
3550 | 11.4k | auto cfd = c->column_family_data(); |
3551 | 11.4k | ReadOptions read_options; |
3552 | 11.4k | read_options.verify_checksums = |
3553 | 11.4k | c->mutable_cf_options()->verify_checksums_in_compaction; |
3554 | 11.4k | read_options.fill_cache = false; |
3555 | 11.4k | if (c->ShouldFormSubcompactions()) { |
3556 | 1.07k | read_options.total_order_seek = true; |
3557 | 1.07k | } |
3558 | | |
3559 | | // Level-0 files have to be merged together. For other levels, |
3560 | | // we will make a concatenating iterator per level. |
3561 | | // TODO(opt): use concatenating iterator for level-0 if there is no overlap |
3562 | 11.4k | const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files + |
3563 | 9.33k | c->num_input_levels() - 1 |
3564 | 11.4k | : c->num_input_levels()2.09k ); |
3565 | 11.4k | InternalIterator** list = new InternalIterator* [space]; |
3566 | 11.4k | size_t num = 0; |
3567 | 43.2k | for (size_t which = 0; which < c->num_input_levels(); which++31.8k ) { |
3568 | 31.8k | if (c->input_levels(which)->num_files != 0) { |
3569 | 19.0k | if (c->level(which) == 0) { |
3570 | 9.33k | const LevelFilesBrief* flevel = c->input_levels(which); |
3571 | 37.8k | for (size_t i = 0; i < flevel->num_files; i++28.4k ) { |
3572 | 28.4k | FileMetaData* fmd = c->input(which, i); |
3573 | 28.4k | if (c->input(which, i)->delete_after_compaction()) { |
3574 | 23 | RLOG( |
3575 | 23 | InfoLogLevel::INFO_LEVEL, db_options_->info_log, |
3576 | 23 | yb::Format( |
3577 | 23 | "[$0] File marked for deletion, will be removed after compaction. file: $1", |
3578 | 23 | c->column_family_data()->GetName(), fmd->ToString()).c_str()); |
3579 | 23 | RecordTick(cfd->ioptions()->statistics, COMPACTION_FILES_FILTERED); |
3580 | 23 | continue; |
3581 | 23 | } |
3582 | 28.4k | RecordTick(cfd->ioptions()->statistics, COMPACTION_FILES_NOT_FILTERED); |
3583 | 28.4k | list[num++] = cfd->table_cache()->NewIterator( |
3584 | 28.4k | read_options, env_options_compactions_, |
3585 | 28.4k | cfd->internal_comparator(), flevel->files[i].fd, flevel->files[i].user_filter_data, |
3586 | 28.4k | nullptr, nullptr /* no per level latency histogram*/, |
3587 | 28.4k | true /* for compaction */); |
3588 | 28.4k | } |
3589 | 9.70k | } else { |
3590 | | // Create concatenating iterator for the files from this level |
3591 | 9.70k | list[num++] = NewTwoLevelIterator( |
3592 | 9.70k | new LevelFileIteratorState( |
3593 | 9.70k | cfd->table_cache(), read_options, env_options_, |
3594 | 9.70k | cfd->internal_comparator(), |
3595 | 9.70k | nullptr /* no per level latency histogram */, |
3596 | 9.70k | true /* for_compaction */, false /* prefix enabled */, |
3597 | 9.70k | false /* skip_filters */), |
3598 | 9.70k | new LevelFileNumIterator(*cfd->internal_comparator(), |
3599 | 9.70k | c->input_levels(which))); |
3600 | 9.70k | } |
3601 | 19.0k | } |
3602 | 31.8k | } |
3603 | 11.4k | assert(num <= space); |
3604 | 0 | InternalIterator* result = |
3605 | 11.4k | NewMergingIterator(c->column_family_data()->internal_comparator().get(), list, |
3606 | 11.4k | static_cast<int>(num)); |
3607 | 11.4k | delete[] list; |
3608 | 11.4k | return result; |
3609 | 11.4k | } |
3610 | | |
3611 | | // verify that the files listed in this compaction are present |
3612 | | // in the current version |
3613 | 11.2k | bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { |
3614 | 11.2k | #ifndef NDEBUG |
3615 | 11.2k | Version* version = c->column_family_data()->current(); |
3616 | 11.2k | const VersionStorageInfo* vstorage = version->storage_info(); |
3617 | 11.2k | if (c->input_version_number() != version->GetVersionNumber()) { |
3618 | 4.35k | RLOG( |
3619 | 4.35k | InfoLogLevel::INFO_LEVEL, db_options_->info_log, |
3620 | 4.35k | yb::Format( |
3621 | 4.35k | "[$0] compaction output being applied to a different base version ($1) from input " |
3622 | 4.35k | "version ($2)", |
3623 | 4.35k | c->column_family_data()->GetName(), version->GetVersionNumber(), |
3624 | 4.35k | c->input_version_number()) |
3625 | 4.35k | .c_str()); |
3626 | | |
3627 | 4.35k | if (vstorage->compaction_style_ == kCompactionStyleLevel && |
3628 | 4.35k | c->start_level() == 01.94k && c->num_input_levels() > 2U1.42k ) { |
3629 | | // We are doing a L0->base_level compaction. The assumption is if |
3630 | | // base level is not L1, levels from L1 to base_level - 1 is empty. |
3631 | | // This is ensured by having one compaction from L0 going on at the |
3632 | | // same time in level-based compaction. So that during the time, no |
3633 | | // compaction/flush can put files to those levels. |
3634 | 0 | for (int l = c->start_level() + 1; l < c->output_level(); l++) { |
3635 | 0 | if (vstorage->NumLevelFiles(l) != 0) { |
3636 | 0 | return false; |
3637 | 0 | } |
3638 | 0 | } |
3639 | 0 | } |
3640 | 4.35k | } |
3641 | | |
3642 | 42.8k | for (size_t input = 0; 11.2k input < c->num_input_levels(); ++input31.5k ) { |
3643 | 31.5k | int level = c->level(input); |
3644 | 79.1k | for (size_t i = 0; i < c->num_input_files(input); ++i47.6k ) { |
3645 | 47.6k | const auto& fd = c->input(input, i)->fd; |
3646 | 47.6k | uint64_t number = fd.GetNumber(); |
3647 | 47.6k | bool found = false; |
3648 | 268k | for (size_t j = 0; j < vstorage->files_[level].size(); j++220k ) { |
3649 | 268k | FileMetaData* f = vstorage->files_[level][j]; |
3650 | 268k | if (f->fd.GetNumber() == number) { |
3651 | 47.6k | found = true; |
3652 | 47.6k | break; |
3653 | 47.6k | } |
3654 | 268k | } |
3655 | 47.6k | if (!found) { |
3656 | 0 | RLOG(InfoLogLevel::INFO_LEVEL, db_options_->info_log, |
3657 | 0 | yb::Format("[$0] compaction input file $1 not found in current version", |
3658 | 0 | c->column_family_data()->GetName(), fd).c_str()); |
3659 | 0 | return false; // input files non existent in current version |
3660 | 0 | } |
3661 | 47.6k | } |
3662 | 31.5k | } |
3663 | 11.2k | #endif |
3664 | 11.2k | return true; // everything good |
3665 | 11.2k | } |
3666 | | |
3667 | | Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, |
3668 | | FileMetaData** meta, |
3669 | 295 | ColumnFamilyData** cfd) { |
3670 | 297 | for (auto cfd_iter : *column_family_set_) { |
3671 | 297 | Version* version = cfd_iter->current(); |
3672 | 297 | const auto* vstorage = version->storage_info(); |
3673 | 519 | for (int level = 0; level < vstorage->num_levels(); level++222 ) { |
3674 | 1.03k | for (const auto& file : vstorage->LevelFiles(level)) { |
3675 | 1.03k | if (file->fd.GetNumber() == number) { |
3676 | 294 | *meta = file; |
3677 | 294 | *filelevel = level; |
3678 | 294 | *cfd = cfd_iter; |
3679 | 294 | return Status::OK(); |
3680 | 294 | } |
3681 | 1.03k | } |
3682 | 516 | } |
3683 | 297 | } |
3684 | 1 | return STATUS(NotFound, "File not present in any level"); |
3685 | 295 | } |
3686 | | |
3687 | 21.1M | void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) { |
3688 | 21.1M | for (auto cfd : *column_family_set_) { |
3689 | 21.1M | if (cfd->IsDropped()) { |
3690 | 0 | continue; |
3691 | 0 | } |
3692 | 42.4M | for (int level = 0; 21.1M level < cfd->NumberLevels(); level++21.3M ) { |
3693 | 21.3M | for (const auto& file : |
3694 | 21.3M | cfd->current()->storage_info()->LevelFiles(level)) { |
3695 | 384k | LiveFileMetaData filemetadata; |
3696 | 384k | filemetadata.column_family_name = cfd->GetName(); |
3697 | 384k | uint32_t path_id = file->fd.GetPathId(); |
3698 | 384k | if (path_id < db_options_->db_paths.size()384k ) { |
3699 | 384k | filemetadata.db_path = db_options_->db_paths[path_id].path; |
3700 | 18.4E | } else { |
3701 | 18.4E | assert(!db_options_->db_paths.empty()); |
3702 | 0 | filemetadata.db_path = db_options_->db_paths.back().path; |
3703 | 18.4E | } |
3704 | 0 | filemetadata.name = MakeTableFileName("", file->fd.GetNumber()); |
3705 | 384k | filemetadata.level = level; |
3706 | 384k | filemetadata.total_size = file->fd.GetTotalFileSize(); |
3707 | 384k | filemetadata.base_size = file->fd.GetBaseFileSize(); |
3708 | | // TODO: replace base_size with an accurate metadata size for |
3709 | | // uncompressed data. Look into: BlockBasedTableBuilder |
3710 | 384k | filemetadata.uncompressed_size = filemetadata.base_size + |
3711 | 384k | file->raw_key_size + file->raw_value_size; |
3712 | 384k | filemetadata.smallest = ConvertBoundaryValues(file->smallest); |
3713 | 384k | filemetadata.largest = ConvertBoundaryValues(file->largest); |
3714 | 384k | filemetadata.imported = file->imported; |
3715 | 384k | metadata->push_back(filemetadata); |
3716 | 384k | } |
3717 | 21.3M | } |
3718 | 21.1M | } |
3719 | 21.1M | } |
3720 | | |
3721 | | void VersionSet::GetObsoleteFiles(const FileNumbersProvider& pending_outputs, |
3722 | | std::vector<FileMetaData*>* files, |
3723 | 1.85M | std::vector<std::string>* manifest_filenames) { |
3724 | 1.85M | assert(manifest_filenames->empty()); |
3725 | 0 | obsolete_manifests_.swap(*manifest_filenames); |
3726 | 1.85M | std::vector<FileMetaData*> pending_files; |
3727 | 1.85M | for (auto f : obsolete_files_) { |
3728 | 61.9k | if (!pending_outputs.HasFileNumber(f->fd.GetNumber())) { |
3729 | 61.7k | files->push_back(f); |
3730 | 61.7k | } else { |
3731 | 204 | pending_files.push_back(f); |
3732 | 204 | } |
3733 | 61.9k | } |
3734 | 1.85M | obsolete_files_.swap(pending_files); |
3735 | 1.85M | } |
3736 | | |
// Creates the column family described by |edit| and registers it with this
// VersionSet. The statement order below is load-bearing:
//   1. a dummy Version anchoring the CF's circular version list is created
//      and Ref()'ed,
//   2. the ColumnFamilyData is created around it,
//   3. the first real Version gets its level target sizes computed and is
//      appended as the CF's current version,
//   4. the initial memtable is created and the CF's log number is set.
// Returns the newly created ColumnFamilyData (owned by column_family_set_).
ColumnFamilyData* VersionSet::CreateColumnFamily(
    const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
  assert(edit->column_family_name_);

  Version* dummy_versions = new Version(nullptr, this);
  // Ref() dummy version once so that later we can call Unref() to delete it
  // by avoiding calling "delete" explicitly (~Version is private)
  dummy_versions->Ref();
  auto new_cfd = column_family_set_->CreateColumnFamily(
      *edit->column_family_name_,
      edit->column_family_,
      dummy_versions,
      cf_options);

  Version* v = new Version(new_cfd, this, current_version_number_++);

  // Fill level target base information.
  v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
                                        *new_cfd->GetLatestMutableCFOptions());
  AppendVersion(new_cfd, v);
  // GetLatestMutableCFOptions() is safe here without mutex since the
  // cfd is not available to client
  new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
                             LastSequence());
  // edit->log_number_ may be unset for a brand-new CF; default to 0.
  new_cfd->SetLogNumber(edit->log_number_.get_value_or(0));
  return new_cfd;
}
3764 | | |
3765 | 983k | void VersionSet::UnrefFile(ColumnFamilyData* cfd, FileMetaData* f) { |
3766 | 983k | if (f->Unref(cfd->table_cache())) { |
3767 | 98.1k | obsolete_files_.push_back(f); |
3768 | 98.1k | } |
3769 | 983k | } |
3770 | | |
3771 | 5 | uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) { |
3772 | 5 | uint64_t count = 0; |
3773 | 14 | for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_9 ) { |
3774 | 9 | count++; |
3775 | 9 | } |
3776 | 5 | return count; |
3777 | 5 | } |
3778 | | |
3779 | 18 | uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) { |
3780 | 18 | std::unordered_set<uint64_t> unique_files; |
3781 | 18 | uint64_t total_files_size = 0; |
3782 | 50 | for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_32 ) { |
3783 | 32 | VersionStorageInfo* storage_info = v->storage_info(); |
3784 | 256 | for (int level = 0; level < storage_info->num_levels_; level++224 ) { |
3785 | 224 | for (const auto& file_meta : storage_info->LevelFiles(level)) { |
3786 | 86 | if (unique_files.find(file_meta->fd.packed_number_and_path_id) == |
3787 | 86 | unique_files.end()) { |
3788 | 66 | unique_files.insert(file_meta->fd.packed_number_and_path_id); |
3789 | 66 | total_files_size += file_meta->fd.GetTotalFileSize(); |
3790 | 66 | } |
3791 | 86 | } |
3792 | 224 | } |
3793 | 32 | } |
3794 | 18 | return total_files_size; |
3795 | 18 | } |
3796 | | |
3797 | | void VersionSet::EnsureNonDecreasingLastSequence( |
3798 | | SequenceNumber prev_last_seq, |
3799 | 31.3M | SequenceNumber new_last_seq) { |
3800 | 31.3M | if (new_last_seq < prev_last_seq) { |
3801 | 0 | LOG(DFATAL) << "New last sequence id " << new_last_seq << " is lower than " |
3802 | 0 | << "the previous last sequence " << prev_last_seq; |
3803 | 0 | } |
3804 | 31.3M | } |
3805 | | |
3806 | | void VersionSet::EnsureNonDecreasingFlushedFrontier( |
3807 | | const UserFrontier* prev_value, |
3808 | 271k | const UserFrontier& new_value) { |
3809 | 271k | if (!prev_value) { |
3810 | 262k | return; |
3811 | 262k | } |
3812 | 9.60k | if (!prev_value->IsUpdateValid(new_value, UpdateUserValueType::kLargest)) { |
3813 | 0 | LOG(DFATAL) << "Attempt to decrease flushed frontier " << prev_value->ToString() << " to " |
3814 | 0 | << new_value.ToString(); |
3815 | 0 | } |
3816 | 9.60k | } |
3817 | | |
// Merges |value| into flushed_frontier_ taking the larger frontier,
// bypassing the monotonicity check performed by
// EnsureNonDecreasingFlushedFrontier().
void VersionSet::UpdateFlushedFrontierNoSanityChecking(UserFrontierPtr value) {
  UpdateUserFrontier(&flushed_frontier_, std::move(value), UpdateUserValueType::kLargest);
}
3821 | | |
3822 | | } // namespace rocksdb |