/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/version_builder.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/db/version_builder.h" |
25 | | |
26 | | #ifndef __STDC_FORMAT_MACROS |
27 | | #define __STDC_FORMAT_MACROS |
28 | | #endif |
29 | | |
30 | | #include <inttypes.h> |
31 | | #include <algorithm> |
32 | | #include <atomic> |
33 | | #include <set> |
34 | | #include <thread> |
35 | | #include <unordered_map> |
36 | | #include <unordered_set> |
37 | | #include <utility> |
38 | | #include <vector> |
39 | | |
40 | | #include "yb/rocksdb/db/dbformat.h" |
41 | | #include "yb/rocksdb/db/internal_stats.h" |
42 | | #include "yb/rocksdb/db/table_cache.h" |
43 | | #include "yb/rocksdb/db/version_set.h" |
44 | | #include "yb/rocksdb/table/table_reader.h" |
45 | | |
46 | | #include "yb/util/format.h" |
47 | | #include "yb/util/logging.h" |
48 | | #include "yb/util/status_log.h" |
49 | | |
50 | | namespace rocksdb { |
51 | | |
52 | 1.28M | bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { |
53 | 1.28M | if (a->smallest.seqno != b->smallest.seqno) { |
54 | 1.13M | return a->smallest.seqno > b->smallest.seqno; |
55 | 1.13M | } |
56 | 145k | if (a->largest.seqno != b->largest.seqno) { |
57 | 1.98k | return a->largest.seqno > b->largest.seqno; |
58 | 1.98k | } |
59 | | // Break ties by file number |
60 | 143k | return a->fd.GetNumber() > b->fd.GetNumber(); |
61 | 143k | } |
62 | | |
63 | | namespace { |
64 | | bool BySmallestKey(FileMetaData* a, FileMetaData* b, |
65 | 1.99M | const InternalKeyComparator* cmp) { |
66 | 1.99M | int r = cmp->Compare(a->smallest.key, b->smallest.key); |
67 | 1.99M | if (r != 0) { |
68 | 1.98M | return (r < 0); |
69 | 1.98M | } |
70 | | // Break ties by file number |
71 | 5.85k | return (a->fd.GetNumber() < b->fd.GetNumber()); |
72 | 5.85k | } |
73 | | } // namespace |
74 | | |
75 | | class VersionBuilder::Rep { |
76 | | private: |
77 | | // Helper to sort files_ in v |
78 | | // kLevel0 -- NewestFirstBySeqNo |
79 | | // kLevelNon0 -- BySmallestKey |
80 | | struct FileComparator { |
81 | | enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method; |
82 | | InternalKeyComparatorPtr internal_comparator; |
83 | | |
84 | 2.57M | bool operator()(FileMetaData* f1, FileMetaData* f2) const { |
85 | 2.57M | switch (sort_method) { |
86 | 581k | case kLevel0: |
87 | 581k | return NewestFirstBySeqNo(f1, f2); |
88 | 1.99M | case kLevelNon0: |
89 | 1.99M | return BySmallestKey(f1, f2, internal_comparator.get()); |
90 | 0 | } |
91 | 0 | assert(false); |
92 | 0 | return false; |
93 | 0 | } |
94 | | }; |
95 | | |
96 | | struct LevelState { |
97 | | std::unordered_set<uint64_t> deleted_files; |
98 | | // Map from file number to file meta data. |
99 | | std::unordered_map<uint64_t, FileMetaData*> added_files; |
100 | | }; |
101 | | |
102 | | const EnvOptions& env_options_; |
103 | | Logger* info_log_; |
104 | | TableCache* table_cache_; |
105 | | VersionStorageInfo* base_vstorage_; |
106 | | LevelState* levels_; |
107 | | FileComparator level_zero_cmp_; |
108 | | FileComparator level_nonzero_cmp_; |
109 | | |
110 | | public: |
111 | | Rep(const EnvOptions& env_options, Logger* info_log, TableCache* table_cache, |
112 | | VersionStorageInfo* base_vstorage) |
113 | | : env_options_(env_options), |
114 | | info_log_(info_log), |
115 | | table_cache_(table_cache), |
116 | 653k | base_vstorage_(base_vstorage) { |
117 | 653k | levels_ = new LevelState[base_vstorage_->num_levels()]; |
118 | 653k | level_zero_cmp_.sort_method = FileComparator::kLevel0; |
119 | 653k | level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0; |
120 | 653k | level_nonzero_cmp_.internal_comparator = |
121 | 653k | base_vstorage_->InternalComparator(); |
122 | 653k | } |
123 | | |
124 | 653k | ~Rep() { |
125 | 1.72M | for (int level = 0; level < base_vstorage_->num_levels(); level++) { |
126 | 1.07M | const auto& added = levels_[level].added_files; |
127 | 91.4k | for (auto& pair : added) { |
128 | 91.4k | UnrefFile(pair.second); |
129 | 91.4k | } |
130 | 1.07M | } |
131 | | |
132 | 653k | delete[] levels_; |
133 | 653k | } |
134 | | |
135 | 107k | void UnrefFile(FileMetaData* f) { |
136 | 107k | if (f->Unref(table_cache_)) { |
137 | 15.6k | delete f; |
138 | 15.6k | } |
139 | 107k | } |
140 | | |
141 | 2.64M | void CheckConsistency(VersionStorageInfo* vstorage) { |
142 | 2.64M | #ifndef NDEBUG |
143 | | // make sure the files are sorted correctly |
144 | 7.07M | for (int level = 0; level < vstorage->num_levels(); level++) { |
145 | 4.43M | auto& level_files = vstorage->LevelFiles(level); |
146 | 6.78M | for (size_t i = 1; i < level_files.size(); i++) { |
147 | 2.34M | auto f1 = level_files[i - 1]; |
148 | 2.34M | auto f2 = level_files[i]; |
149 | 2.34M | if (level == 0) { |
150 | 477k | assert(level_zero_cmp_(f1, f2)); |
151 | 477k | assert(f1->largest.seqno > f2->largest.seqno || |
152 | | // We can have multiple files with seqno = 0 as a result of |
153 | | // using DB::AddFile() |
154 | 477k | (f1->largest.seqno == 0 && f2->largest.seqno == 0)); |
155 | 1.86M | } else { |
156 | 1.86M | assert(level_nonzero_cmp_(f1, f2)); |
157 | | |
158 | | // Make sure there is no overlap in levels > 0 |
159 | 1.86M | if (vstorage->InternalComparator()->Compare(f1->largest.key, |
160 | 0 | f2->smallest.key) >= 0) { |
161 | 0 | fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", |
162 | 0 | f1->largest.key.DebugString().c_str(), |
163 | 0 | f2->smallest.key.DebugString().c_str()); |
164 | 0 | abort(); |
165 | 0 | } |
166 | 1.86M | } |
167 | 2.34M | } |
168 | 4.43M | } |
169 | 2.64M | #endif |
170 | 2.64M | } |
171 | | |
172 | | void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number, |
173 | 76.3k | int level) { |
174 | 76.3k | #ifndef NDEBUG |
175 | | // a file to be deleted better exist in the previous version |
176 | 76.3k | bool found = false; |
177 | 312k | for (int l = 0; !found && l < base_vstorage_->num_levels(); l++) { |
178 | 236k | const std::vector<FileMetaData*>& base_files = |
179 | 236k | base_vstorage_->LevelFiles(l); |
180 | 604k | for (size_t i = 0; i < base_files.size(); i++) { |
181 | 429k | FileMetaData* f = base_files[i]; |
182 | 429k | if (f->fd.GetNumber() == number) { |
183 | 60.7k | found = true; |
184 | 60.7k | break; |
185 | 60.7k | } |
186 | 429k | } |
187 | 236k | } |
188 | | // if the file did not exist in the previous version, then it |
189 | | // is possibly moved from lower level to higher level in current |
190 | | // version |
191 | 134k | for (int l = level + 1; !found && l < base_vstorage_->num_levels(); l++) { |
192 | 57.8k | auto& level_added = levels_[l].added_files; |
193 | 57.8k | auto got = level_added.find(number); |
194 | 57.8k | if (got != level_added.end()) { |
195 | 0 | found = true; |
196 | 0 | break; |
197 | 0 | } |
198 | 57.8k | } |
199 | | |
200 | | // maybe this file was added in a previous edit that was Applied |
201 | 76.3k | if (!found) { |
202 | 15.6k | auto& level_added = levels_[level].added_files; |
203 | 15.6k | auto got = level_added.find(number); |
204 | 15.6k | if (got != level_added.end()) { |
205 | 15.6k | found = true; |
206 | 15.6k | } |
207 | 15.6k | } |
208 | 1 | LOG_IF(DFATAL, !found) << yb::Format( |
209 | 1 | "$0SST file not found: $1", info_log_ ? info_log_->Prefix() : "", number); |
210 | 76.3k | #endif |
211 | 76.3k | } |
212 | | |
213 | | // Apply all of the edits in *edit to the current state. |
214 | 680k | void Apply(VersionEdit* edit) { |
215 | 680k | CheckConsistency(base_vstorage_); |
216 | | |
217 | | // Delete files |
218 | 680k | const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles(); |
219 | 76.3k | for (const auto& del_file : del) { |
220 | 76.3k | const auto level = del_file.first; |
221 | 76.3k | const auto number = del_file.second; |
222 | 76.3k | levels_[level].deleted_files.insert(number); |
223 | 76.3k | CheckConsistencyForDeletes(edit, number, level); |
224 | | |
225 | 76.3k | auto exising = levels_[level].added_files.find(number); |
226 | 76.3k | if (exising != levels_[level].added_files.end()) { |
227 | 15.6k | UnrefFile(exising->second); |
228 | 15.6k | levels_[level].added_files.erase(number); |
229 | 15.6k | } |
230 | 76.3k | } |
231 | | |
232 | | // Add new files |
233 | 107k | for (const auto& new_file : edit->GetNewFiles()) { |
234 | 107k | const int level = new_file.first; |
235 | 107k | FileMetaData* f = new FileMetaData(new_file.second); |
236 | 107k | f->refs = 1; |
237 | | |
238 | 107k | assert(levels_[level].added_files.find(f->fd.GetNumber()) == |
239 | 107k | levels_[level].added_files.end()); |
240 | 107k | levels_[level].deleted_files.erase(f->fd.GetNumber()); |
241 | 107k | levels_[level].added_files[f->fd.GetNumber()] = f; |
242 | 107k | } |
243 | 680k | } |
244 | | |
245 | | // Save the current state in *v. |
246 | 653k | void SaveTo(VersionStorageInfo* vstorage) { |
247 | 653k | CheckConsistency(base_vstorage_); |
248 | 653k | CheckConsistency(vstorage); |
249 | | |
250 | 1.72M | for (int level = 0; level < base_vstorage_->num_levels(); level++) { |
251 | 653k | const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_; |
252 | | // Merge the set of added files with the set of pre-existing files. |
253 | | // Drop any deleted files. Store the result in *v. |
254 | 1.07M | const auto& base_files = base_vstorage_->LevelFiles(level); |
255 | 1.07M | auto base_iter = base_files.begin(); |
256 | 1.07M | auto base_end = base_files.end(); |
257 | 1.07M | const auto& unordered_added_files = levels_[level].added_files; |
258 | 1.07M | vstorage->Reserve(level, |
259 | 1.07M | base_files.size() + unordered_added_files.size()); |
260 | | |
261 | | // Sort added files for the level. |
262 | 1.07M | std::vector<FileMetaData*> added_files; |
263 | 1.07M | added_files.reserve(unordered_added_files.size()); |
264 | 91.3k | for (const auto& pair : unordered_added_files) { |
265 | 91.3k | added_files.push_back(pair.second); |
266 | 91.3k | } |
267 | 1.07M | std::sort(added_files.begin(), added_files.end(), cmp); |
268 | | |
269 | 1.07M | #ifndef NDEBUG |
270 | 1.07M | FileMetaData* prev_file = nullptr; |
271 | 1.07M | #endif |
272 | | |
273 | 91.4k | for (const auto& added : added_files) { |
274 | 91.4k | #ifndef NDEBUG |
275 | 91.4k | if (level > 0 && prev_file != nullptr) { |
276 | 14.2k | assert(base_vstorage_->InternalComparator()->Compare( |
277 | 14.2k | prev_file->smallest.key, added->smallest.key) <= 0); |
278 | 14.2k | } |
279 | 91.4k | prev_file = added; |
280 | 91.4k | #endif |
281 | | |
282 | | // Add all smaller files listed in base_ |
283 | 91.4k | for (auto bpos = std::upper_bound(base_iter, base_end, added, cmp); |
284 | 186k | base_iter != bpos; ++base_iter) { |
285 | 95.4k | MaybeAddFile(vstorage, level, *base_iter); |
286 | 95.4k | } |
287 | | |
288 | 91.4k | MaybeAddFile(vstorage, level, added); |
289 | 91.4k | } |
290 | | |
291 | | // Add remaining base files |
292 | 1.89M | for (; base_iter != base_end; ++base_iter) { |
293 | 822k | MaybeAddFile(vstorage, level, *base_iter); |
294 | 822k | } |
295 | 1.07M | } |
296 | | |
297 | 653k | CheckConsistency(vstorage); |
298 | 653k | } |
299 | | |
300 | 1.23k | void LoadTableHandlers(InternalStats* internal_stats, int max_threads) { |
301 | 1.23k | assert(table_cache_ != nullptr); |
302 | | // <file metadata, level> |
303 | 1.23k | std::vector<std::pair<FileMetaData*, int>> files_meta; |
304 | 9.80k | for (int level = 0; level < base_vstorage_->num_levels(); level++) { |
305 | 1.19k | for (auto& file_meta_pair : levels_[level].added_files) { |
306 | 1.19k | auto* file_meta = file_meta_pair.second; |
307 | 1.19k | assert(!file_meta->table_reader_handle); |
308 | 1.19k | files_meta.emplace_back(file_meta, level); |
309 | 1.19k | } |
310 | 8.57k | } |
311 | | |
312 | 1.23k | std::atomic<size_t> next_file_meta_idx(0); |
313 | 1.24k | std::function<void()> load_handlers_func = [&]() { |
314 | 2.43k | while (true) { |
315 | 2.43k | size_t file_idx = next_file_meta_idx.fetch_add(1); |
316 | 2.43k | if (file_idx >= files_meta.size()) { |
317 | 1.24k | break; |
318 | 1.24k | } |
319 | | |
320 | 1.19k | auto* file_meta = files_meta[file_idx].first; |
321 | 1.19k | int level = files_meta[file_idx].second; |
322 | 1.19k | CHECK_OK(table_cache_->FindTable(env_options_, |
323 | 1.19k | base_vstorage_->InternalComparator(), |
324 | 1.19k | file_meta->fd, &file_meta->table_reader_handle, |
325 | 1.19k | kDefaultQueryId, |
326 | 1.19k | false /*no_io */, true /* record_read_stats */, |
327 | 1.19k | internal_stats->GetFileReadHist(level))); |
328 | 1.19k | if (file_meta->table_reader_handle != nullptr) { |
329 | | // Load table_reader |
330 | 1.19k | file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle( |
331 | 1.19k | file_meta->table_reader_handle); |
332 | 1.19k | } |
333 | 1.19k | } |
334 | 1.24k | }; |
335 | | |
336 | 1.23k | if (max_threads <= 1) { |
337 | 1.23k | load_handlers_func(); |
338 | 2 | } else { |
339 | 2 | std::vector<std::thread> threads; |
340 | 12 | for (int i = 0; i < max_threads; i++) { |
341 | 10 | threads.emplace_back(load_handlers_func); |
342 | 10 | } |
343 | | |
344 | 10 | for (auto& t : threads) { |
345 | 10 | t.join(); |
346 | 10 | } |
347 | 2 | } |
348 | 1.23k | } |
349 | | |
350 | 1.00M | void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) { |
351 | 1.00M | if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) { |
352 | | // f is to-be-delected table file |
353 | 60.7k | vstorage->RemoveCurrentStats(f); |
354 | 948k | } else { |
355 | 948k | vstorage->AddFile(level, f, info_log_); |
356 | 948k | } |
357 | 1.00M | } |
358 | | }; |
359 | | |
360 | | VersionBuilder::VersionBuilder(const EnvOptions& env_options, |
361 | | TableCache* table_cache, |
362 | | VersionStorageInfo* base_vstorage, |
363 | | Logger* info_log) |
364 | 653k | : rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {} |
365 | 653k | VersionBuilder::~VersionBuilder() { delete rep_; } |
366 | 0 | void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) { |
367 | 0 | rep_->CheckConsistency(vstorage); |
368 | 0 | } |
369 | | void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit, |
370 | 0 | uint64_t number, int level) { |
371 | 0 | rep_->CheckConsistencyForDeletes(edit, number, level); |
372 | 0 | } |
373 | 680k | void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); } |
374 | 653k | void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { |
375 | 653k | rep_->SaveTo(vstorage); |
376 | 653k | } |
377 | | void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats, |
378 | 1.23k | int max_threads) { |
379 | 1.23k | rep_->LoadTableHandlers(internal_stats, max_threads); |
380 | 1.23k | } |
381 | | void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, |
382 | 0 | FileMetaData* f) { |
383 | 0 | rep_->MaybeAddFile(vstorage, level, f); |
384 | 0 | } |
385 | | |
386 | | } // namespace rocksdb |