YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/version_builder.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/version_builder.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
#include <algorithm>
32
#include <atomic>
33
#include <set>
34
#include <thread>
35
#include <unordered_map>
36
#include <unordered_set>
37
#include <utility>
38
#include <vector>
39
40
#include "yb/rocksdb/db/dbformat.h"
41
#include "yb/rocksdb/db/internal_stats.h"
42
#include "yb/rocksdb/db/table_cache.h"
43
#include "yb/rocksdb/db/version_set.h"
44
#include "yb/rocksdb/table/table_reader.h"
45
46
#include "yb/util/format.h"
47
#include "yb/util/logging.h"
48
#include "yb/util/status_log.h"
49
50
namespace rocksdb {
51
52
1.28M
// Ordering predicate that places "newer" files first.
//
// Files are ranked by descending smallest sequence number; when those are
// equal, by descending largest sequence number; and when both are equal, by
// descending file number.
bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
  if (a->smallest.seqno == b->smallest.seqno) {
    if (a->largest.seqno == b->largest.seqno) {
      // Both sequence number bounds match: fall back to the file number.
      return a->fd.GetNumber() > b->fd.GetNumber();
    }
    return a->largest.seqno > b->largest.seqno;
  }
  return a->smallest.seqno > b->smallest.seqno;
}
62
63
namespace {
64
bool BySmallestKey(FileMetaData* a, FileMetaData* b,
65
1.99M
                   const InternalKeyComparator* cmp) {
66
1.99M
  int r = cmp->Compare(a->smallest.key, b->smallest.key);
67
1.99M
  if (r != 0) {
68
1.98M
    return (r < 0);
69
1.98M
  }
70
  // Break ties by file number
71
5.85k
  return (a->fd.GetNumber() < b->fd.GetNumber());
72
5.85k
}
73
}  // namespace
74
75
// Implementation state behind VersionBuilder.
//
// Accumulates, per level, the files added and deleted by the VersionEdits
// passed to Apply(), on top of a base VersionStorageInfo. SaveTo() merges the
// accumulated state with the base files into a target VersionStorageInfo.
//
// Every FileMetaData stored in LevelState::added_files is created in Apply()
// with refs == 1 and is released through UnrefFile() when it is removed from
// the map or when this object is destroyed.
class VersionBuilder::Rep {
 private:
  // Helper to sort files_ in v
  // kLevel0 -- NewestFirstBySeqNo
  // kLevelNon0 -- BySmallestKey
  struct FileComparator {
    enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
    // Only consulted when sort_method == kLevelNon0.
    InternalKeyComparatorPtr internal_comparator;

    bool operator()(FileMetaData* f1, FileMetaData* f2) const {
      switch (sort_method) {
        case kLevel0:
          return NewestFirstBySeqNo(f1, f2);
        case kLevelNon0:
          return BySmallestKey(f1, f2, internal_comparator.get());
      }
      assert(false);
      return false;
    }
  };

  struct LevelState {
    // File numbers recorded as deleted for this level by Apply().
    std::unordered_set<uint64_t> deleted_files;
    // Map from file number to file meta data.
    std::unordered_map<uint64_t, FileMetaData*> added_files;
  };

  const EnvOptions& env_options_;
  Logger* info_log_;
  TableCache* table_cache_;
  VersionStorageInfo* base_vstorage_;
  // One entry per level of base_vstorage_, sized at construction time.
  std::vector<LevelState> levels_;
  FileComparator level_zero_cmp_;
  FileComparator level_nonzero_cmp_;

 public:
  Rep(const EnvOptions& env_options, Logger* info_log, TableCache* table_cache,
      VersionStorageInfo* base_vstorage)
      : env_options_(env_options),
        info_log_(info_log),
        table_cache_(table_cache),
        base_vstorage_(base_vstorage),
        levels_(static_cast<size_t>(base_vstorage->num_levels())) {
    level_zero_cmp_.sort_method = FileComparator::kLevel0;
    level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
    level_nonzero_cmp_.internal_comparator =
        base_vstorage_->InternalComparator();
  }

  // The added_files maps hold references that are released exactly once in the
  // destructor, so copying this object would release them twice.
  Rep(const Rep&) = delete;
  Rep& operator=(const Rep&) = delete;

  ~Rep() {
    // Iterate the container we own rather than re-querying base_vstorage_.
    for (auto& level_state : levels_) {
      for (auto& pair : level_state.added_files) {
        UnrefFile(pair.second);
      }
    }
  }

  // Drops one reference to `f` and deletes it when Unref() reports that it
  // should be destroyed.
  void UnrefFile(FileMetaData* f) {
    if (f->Unref(table_cache_)) {
      delete f;
    }
  }

  // Debug-only validation of `vstorage`: every level must be sorted by the
  // comparator for that level, and levels above 0 must not contain files with
  // overlapping key ranges. Compiles to a no-op when NDEBUG is defined.
  void CheckConsistency(VersionStorageInfo* vstorage) {
#ifndef NDEBUG
    // make sure the files are sorted correctly
    for (int level = 0; level < vstorage->num_levels(); level++) {
      auto& level_files = vstorage->LevelFiles(level);
      for (size_t i = 1; i < level_files.size(); i++) {
        auto f1 = level_files[i - 1];
        auto f2 = level_files[i];
        if (level == 0) {
          assert(level_zero_cmp_(f1, f2));
          assert(f1->largest.seqno > f2->largest.seqno ||
                 // We can have multiple files with seqno = 0 as a result of
                 // using DB::AddFile()
                 (f1->largest.seqno == 0 && f2->largest.seqno == 0));
        } else {
          assert(level_nonzero_cmp_(f1, f2));

          // Make sure there is no overlap in levels > 0
          if (vstorage->InternalComparator()->Compare(f1->largest.key,
                                                      f2->smallest.key) >= 0) {
            fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
                    f1->largest.key.DebugString().c_str(),
                    f2->smallest.key.DebugString().c_str());
            abort();
          }
        }
      }
    }
#endif
  }

  // Debug-only check that file `number`, which is being deleted from `level`,
  // is known: either present in some level of the base version, or previously
  // added to this builder at `level` or a higher-numbered level. Logs at
  // DFATAL when it is not found. Compiles to a no-op when NDEBUG is defined.
  void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
                                  int level) {
#ifndef NDEBUG
    // a file to be deleted better exist in the previous version
    bool found = false;
    for (int l = 0; !found && l < base_vstorage_->num_levels(); l++) {
      const std::vector<FileMetaData*>& base_files =
          base_vstorage_->LevelFiles(l);
      for (FileMetaData* f : base_files) {
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
      }
    }
    // if the file did not exist in the previous version, then it
    // is possibly moved from lower level to higher level in current
    // version
    for (int l = level + 1; !found && l < base_vstorage_->num_levels(); l++) {
      auto& level_added = levels_[l].added_files;
      if (level_added.find(number) != level_added.end()) {
        found = true;
        break;
      }
    }

    // maybe this file was added in a previous edit that was Applied
    if (!found) {
      auto& level_added = levels_[level].added_files;
      if (level_added.find(number) != level_added.end()) {
        found = true;
      }
    }
    LOG_IF(DFATAL, !found) << yb::Format(
        "$0SST file not found: $1", info_log_ ? info_log_->Prefix() : "", number);
#endif
  }

  // Apply all of the edits in *edit to the current state.
  void Apply(VersionEdit* edit) {
    CheckConsistency(base_vstorage_);

    // Delete files
    for (const auto& del_file : edit->GetDeletedFiles()) {
      const auto level = del_file.first;
      const auto number = del_file.second;
      auto& level_state = levels_[level];
      level_state.deleted_files.insert(number);
      CheckConsistencyForDeletes(edit, number, level);

      // A file added by an earlier edit and deleted now must no longer be
      // tracked as added; erase through the iterator to avoid a second lookup.
      auto existing = level_state.added_files.find(number);
      if (existing != level_state.added_files.end()) {
        UnrefFile(existing->second);
        level_state.added_files.erase(existing);
      }
    }

    // Add new files
    for (const auto& new_file : edit->GetNewFiles()) {
      const int level = new_file.first;
      FileMetaData* f = new FileMetaData(new_file.second);
      f->refs = 1;

      auto& level_state = levels_[level];
      const auto file_number = f->fd.GetNumber();
      assert(level_state.added_files.find(file_number) ==
             level_state.added_files.end());
      level_state.deleted_files.erase(file_number);

      auto inserted = level_state.added_files.emplace(file_number, f);
      if (!inserted.second) {
        // Only reachable when asserts are compiled out: release the entry
        // being replaced instead of leaking it.
        UnrefFile(inserted.first->second);
        inserted.first->second = f;
      }
    }
  }

  // Save the current state in *v.
  void SaveTo(VersionStorageInfo* vstorage) {
    CheckConsistency(base_vstorage_);
    CheckConsistency(vstorage);

    for (int level = 0; level < base_vstorage_->num_levels(); level++) {
      const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
      // Merge the set of added files with the set of pre-existing files.
      // Drop any deleted files.  Store the result in *v.
      const auto& base_files = base_vstorage_->LevelFiles(level);
      auto base_iter = base_files.begin();
      auto base_end = base_files.end();
      const auto& unordered_added_files = levels_[level].added_files;
      vstorage->Reserve(level,
                        base_files.size() + unordered_added_files.size());

      // Sort added files for the level.
      std::vector<FileMetaData*> added_files;
      added_files.reserve(unordered_added_files.size());
      for (const auto& pair : unordered_added_files) {
        added_files.push_back(pair.second);
      }
      std::sort(added_files.begin(), added_files.end(), cmp);

#ifndef NDEBUG
      FileMetaData* prev_file = nullptr;
#endif

      for (const auto& added : added_files) {
#ifndef NDEBUG
        if (level > 0 && prev_file != nullptr) {
          assert(base_vstorage_->InternalComparator()->Compare(
                     prev_file->smallest.key, added->smallest.key) <= 0);
        }
        prev_file = added;
#endif

        // Add all smaller files listed in base_
        for (auto bpos = std::upper_bound(base_iter, base_end, added, cmp);
             base_iter != bpos; ++base_iter) {
          MaybeAddFile(vstorage, level, *base_iter);
        }

        MaybeAddFile(vstorage, level, added);
      }

      // Add remaining base files
      for (; base_iter != base_end; ++base_iter) {
        MaybeAddFile(vstorage, level, *base_iter);
      }
    }

    CheckConsistency(vstorage);
  }

  // Looks up a table cache handle (and the table reader behind it) for every
  // file currently tracked as added. With max_threads <= 1 the work runs on
  // the calling thread; otherwise it is spread over worker threads that pull
  // file indexes from a shared atomic counter.
  void LoadTableHandlers(InternalStats* internal_stats, int max_threads) {
    assert(table_cache_ != nullptr);
    // <file metadata, level>
    std::vector<std::pair<FileMetaData*, int>> files_meta;
    for (int level = 0; level < base_vstorage_->num_levels(); level++) {
      for (auto& file_meta_pair : levels_[level].added_files) {
        auto* file_meta = file_meta_pair.second;
        assert(!file_meta->table_reader_handle);
        files_meta.emplace_back(file_meta, level);
      }
    }
    if (files_meta.empty()) {
      return;
    }

    std::atomic<size_t> next_file_meta_idx(0);
    const auto load_handlers_func = [&]() {
      while (true) {
        const size_t file_idx = next_file_meta_idx.fetch_add(1);
        if (file_idx >= files_meta.size()) {
          break;
        }

        auto* file_meta = files_meta[file_idx].first;
        int level = files_meta[file_idx].second;
        CHECK_OK(table_cache_->FindTable(env_options_,
                                         base_vstorage_->InternalComparator(),
                                         file_meta->fd, &file_meta->table_reader_handle,
                                         kDefaultQueryId,
                                         false /*no_io */, true /* record_read_stats */,
                                         internal_stats->GetFileReadHist(level)));
        if (file_meta->table_reader_handle != nullptr) {
          // Load table_reader
          file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
              file_meta->table_reader_handle);
        }
      }
    };

    // Never start more workers than there are files to load.
    const size_t num_threads =
        max_threads <= 1
            ? 1
            : std::min(files_meta.size(), static_cast<size_t>(max_threads));
    if (num_threads <= 1) {
      load_handlers_func();
      return;
    }

    std::vector<std::thread> threads;
    threads.reserve(num_threads);
    for (size_t i = 0; i < num_threads; i++) {
      threads.emplace_back(load_handlers_func);
    }

    for (auto& t : threads) {
      t.join();
    }
  }

  // Adds `f` to `level` of `vstorage` unless its file number is recorded as
  // deleted for that level, in which case its stats are removed instead.
  void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
    if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
      // f is to-be-deleted table file
      vstorage->RemoveCurrentStats(f);
    } else {
      vstorage->AddFile(level, f, info_log_);
    }
  }
};
359
360
// Creates the builder by heap-allocating its Rep. Note that Rep's constructor
// takes info_log as its second argument, so the arguments are forwarded in a
// different order than they are declared here.
VersionBuilder::VersionBuilder(const EnvOptions& env_options,
361
                               TableCache* table_cache,
362
                               VersionStorageInfo* base_vstorage,
363
                               Logger* info_log)
364
653k
    : rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {}
365
653k
// Destroys the Rep held in rep_.
VersionBuilder::~VersionBuilder() { delete rep_; }
366
0
// Forwards to Rep::CheckConsistency for `vstorage`.
void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
367
0
  rep_->CheckConsistency(vstorage);
368
0
}
369
// Forwards to Rep::CheckConsistencyForDeletes with the same arguments.
void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
370
0
                                                uint64_t number, int level) {
371
0
  rep_->CheckConsistencyForDeletes(edit, number, level);
372
0
}
373
680k
// Forwards to Rep::Apply, which records the edit's deleted and new files.
void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); }
374
653k
// Forwards to Rep::SaveTo, which writes the accumulated state into `vstorage`.
void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
375
653k
  rep_->SaveTo(vstorage);
376
653k
}
377
// Forwards to Rep::LoadTableHandlers with the same arguments.
void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats,
378
1.23k
                                       int max_threads) {
379
1.23k
  rep_->LoadTableHandlers(internal_stats, max_threads);
380
1.23k
}
381
// Forwards to Rep::MaybeAddFile with the same arguments.
void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
382
0
                                  FileMetaData* f) {
383
0
  rep_->MaybeAddFile(vstorage, level, f);
384
0
}
385
386
}  // namespace rocksdb