YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/version_builder.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/version_builder.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
#include <algorithm>
32
#include <atomic>
33
#include <set>
34
#include <thread>
35
#include <unordered_map>
36
#include <unordered_set>
37
#include <utility>
38
#include <vector>
39
40
#include "yb/rocksdb/db/dbformat.h"
41
#include "yb/rocksdb/db/internal_stats.h"
42
#include "yb/rocksdb/db/table_cache.h"
43
#include "yb/rocksdb/db/version_set.h"
44
#include "yb/rocksdb/table/table_reader.h"
45
46
#include "yb/util/format.h"
47
#include "yb/util/logging.h"
48
#include "yb/util/status_log.h"
49
50
namespace rocksdb {
51
52
1.30M
// Strict ordering used for level-0 files: returns true when `a` should be
// placed before `b`. Files are compared by smallest seqno (larger first),
// then by largest seqno (larger first), then by file number (larger first).
bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
  const auto a_smallest_seqno = a->smallest.seqno;
  const auto b_smallest_seqno = b->smallest.seqno;
  if (a_smallest_seqno == b_smallest_seqno) {
    const auto a_largest_seqno = a->largest.seqno;
    const auto b_largest_seqno = b->largest.seqno;
    if (a_largest_seqno == b_largest_seqno) {
      // Both seqno bounds are equal: fall back to the file number.
      return a->fd.GetNumber() > b->fd.GetNumber();
    }
    return a_largest_seqno > b_largest_seqno;
  }
  return a_smallest_seqno > b_smallest_seqno;
}
62
63
namespace {
64
bool BySmallestKey(FileMetaData* a, FileMetaData* b,
65
2.00M
                   const InternalKeyComparator* cmp) {
66
2.00M
  int r = cmp->Compare(a->smallest.key, b->smallest.key);
67
2.00M
  if (r != 0) {
68
1.99M
    return (r < 0);
69
1.99M
  }
70
  // Break ties by file number
71
5.88k
  return (a->fd.GetNumber() < b->fd.GetNumber());
72
2.00M
}
73
}  // namespace
74
75
// Implementation of VersionBuilder. Accumulates, per level, the files added
// and deleted by a sequence of VersionEdits (see Apply) relative to a base
// VersionStorageInfo, and writes the merged result into another
// VersionStorageInfo (see SaveTo).
//
// Every FileMetaData stored in LevelState::added_files holds one reference
// taken in Apply(); the reference is dropped either when a later edit deletes
// the file again or in the destructor.
class VersionBuilder::Rep {
 private:
  // Helper to sort files_ in v
  // kLevel0 -- NewestFirstBySeqNo
  // kLevelNon0 -- BySmallestKey
  struct FileComparator {
    enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
    // Only consulted when sort_method == kLevelNon0.
    InternalKeyComparatorPtr internal_comparator;

    bool operator()(FileMetaData* f1, FileMetaData* f2) const {
      switch (sort_method) {
        case kLevel0:
          return NewestFirstBySeqNo(f1, f2);
        case kLevelNon0:
          return BySmallestKey(f1, f2, internal_comparator.get());
      }
      // Unreachable for valid SortMethod values.
      assert(false);
      return false;
    }
  };

  // Pending changes for a single level.
  struct LevelState {
    // Numbers of the files deleted from this level.
    std::unordered_set<uint64_t> deleted_files;
    // Map from file number to file meta data.
    std::unordered_map<uint64_t, FileMetaData*> added_files;
  };

  const EnvOptions& env_options_;
  Logger* info_log_;
  TableCache* table_cache_;
  VersionStorageInfo* base_vstorage_;
  // One entry per level of base_vstorage_. Owned by value so that the storage
  // is released automatically.
  std::vector<LevelState> levels_;
  FileComparator level_zero_cmp_;
  FileComparator level_nonzero_cmp_;

 public:
  Rep(const EnvOptions& env_options, Logger* info_log, TableCache* table_cache,
      VersionStorageInfo* base_vstorage)
      : env_options_(env_options),
        info_log_(info_log),
        table_cache_(table_cache),
        base_vstorage_(base_vstorage),
        levels_(static_cast<size_t>(base_vstorage->num_levels())) {
    level_zero_cmp_.sort_method = FileComparator::kLevel0;
    level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
    level_nonzero_cmp_.internal_comparator =
        base_vstorage_->InternalComparator();
  }

  // Copying would duplicate the FileMetaData pointers held in added_files and
  // drop their references twice on destruction.
  Rep(const Rep&) = delete;
  Rep& operator=(const Rep&) = delete;

  // Drops the reference held on every file that is still pending addition.
  ~Rep() {
    for (const auto& level_state : levels_) {
      for (const auto& pair : level_state.added_files) {
        UnrefFile(pair.second);
      }
    }
  }

  // Releases one reference on `f` and deletes it when Unref() reports that it
  // should be destroyed.
  void UnrefFile(FileMetaData* f) {
    if (f->Unref(table_cache_)) {
      delete f;
    }
  }

  // Debug-only sanity check of the file ordering inside `vstorage`. Compiled
  // out entirely when NDEBUG is defined.
  void CheckConsistency(VersionStorageInfo* vstorage) {
#ifndef NDEBUG
    // make sure the files are sorted correctly
    for (int level = 0; level < vstorage->num_levels(); level++) {
      auto& level_files = vstorage->LevelFiles(level);
      for (size_t i = 1; i < level_files.size(); i++) {
        auto f1 = level_files[i - 1];
        auto f2 = level_files[i];
        if (level == 0) {
          assert(level_zero_cmp_(f1, f2));
          assert(f1->largest.seqno > f2->largest.seqno ||
                 // We can have multiple files with seqno = 0 as a result of
                 // using DB::AddFile()
                 (f1->largest.seqno == 0 && f2->largest.seqno == 0));
        } else {
          assert(level_nonzero_cmp_(f1, f2));

          // Make sure there is no overlap in levels > 0
          if (vstorage->InternalComparator()->Compare(f1->largest.key,
                                                      f2->smallest.key) >= 0) {
            fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
                    f1->largest.key.DebugString().c_str(),
                    f2->smallest.key.DebugString().c_str());
            abort();
          }
        }
      }
    }
#endif
  }

  // Debug-only check that the file `number`, which an edit deletes from
  // `level`, is known: either present in the base version, or added by a
  // previously applied edit. Logs at DFATAL when it is not found. Compiled out
  // when NDEBUG is defined. `edit` is currently unused.
  void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
                                  int level) {
#ifndef NDEBUG
    // a file to be deleted better exist in the previous version
    bool found = false;
    for (int l = 0; !found && l < base_vstorage_->num_levels(); l++) {
      for (FileMetaData* f : base_vstorage_->LevelFiles(l)) {
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
      }
    }
    // if the file did not exist in the previous version, then it
    // is possibly moved from lower level to higher level in current
    // version
    for (int l = level + 1; !found && l < base_vstorage_->num_levels(); l++) {
      found = levels_[l].added_files.count(number) > 0;
    }

    // maybe this file was added in a previous edit that was Applied
    if (!found) {
      found = levels_[level].added_files.count(number) > 0;
    }
    LOG_IF(DFATAL, !found) << yb::Format(
        "$0SST file not found: $1", info_log_ ? info_log_->Prefix() : "", number);
#endif
  }

  // Apply all of the edits in *edit to the current state.
  void Apply(VersionEdit* edit) {
    CheckConsistency(base_vstorage_);

    // Delete files
    const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles();
    for (const auto& del_file : del) {
      const auto level = del_file.first;
      const auto number = del_file.second;
      auto& level_state = levels_[level];
      level_state.deleted_files.insert(number);
      CheckConsistencyForDeletes(edit, number, level);

      // If an earlier edit added this file, drop our reference and forget it.
      auto existing = level_state.added_files.find(number);
      if (existing != level_state.added_files.end()) {
        UnrefFile(existing->second);
        level_state.added_files.erase(existing);
      }
    }

    // Add new files
    for (const auto& new_file : edit->GetNewFiles()) {
      const int level = new_file.first;
      auto& level_state = levels_[level];
      FileMetaData* f = new FileMetaData(new_file.second);
      f->refs = 1;

      assert(level_state.added_files.find(f->fd.GetNumber()) ==
             level_state.added_files.end());
      // A re-added file must no longer be treated as deleted.
      level_state.deleted_files.erase(f->fd.GetNumber());
      level_state.added_files[f->fd.GetNumber()] = f;
    }
  }

  // Save the current state in *v.
  void SaveTo(VersionStorageInfo* vstorage) {
    CheckConsistency(base_vstorage_);
    CheckConsistency(vstorage);

    for (int level = 0; level < base_vstorage_->num_levels(); level++) {
      const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
      // Merge the set of added files with the set of pre-existing files.
      // Drop any deleted files.  Store the result in *v.
      const auto& base_files = base_vstorage_->LevelFiles(level);
      auto base_iter = base_files.begin();
      auto base_end = base_files.end();
      const auto& unordered_added_files = levels_[level].added_files;
      vstorage->Reserve(level,
                        base_files.size() + unordered_added_files.size());

      // Sort added files for the level.
      std::vector<FileMetaData*> added_files;
      added_files.reserve(unordered_added_files.size());
      for (const auto& pair : unordered_added_files) {
        added_files.push_back(pair.second);
      }
      std::sort(added_files.begin(), added_files.end(), cmp);

#ifndef NDEBUG
      FileMetaData* prev_file = nullptr;
#endif

      for (const auto& added : added_files) {
#ifndef NDEBUG
        if (level > 0 && prev_file != nullptr) {
          assert(base_vstorage_->InternalComparator()->Compare(
                     prev_file->smallest.key, added->smallest.key) <= 0);
        }
        prev_file = added;
#endif

        // Add all smaller files listed in base_
        for (auto bpos = std::upper_bound(base_iter, base_end, added, cmp);
             base_iter != bpos; ++base_iter) {
          MaybeAddFile(vstorage, level, *base_iter);
        }

        MaybeAddFile(vstorage, level, added);
      }

      // Add remaining base files
      for (; base_iter != base_end; ++base_iter) {
        MaybeAddFile(vstorage, level, *base_iter);
      }
    }

    CheckConsistency(vstorage);
  }

  // Opens a table reader for every file pending addition and stores the
  // resulting handle (and reader) in its FileMetaData. When `max_threads` is
  // greater than 1 the work is spread over worker threads; no more threads are
  // started than there are files to load. A FindTable failure is fatal
  // (CHECK_OK).
  void LoadTableHandlers(InternalStats* internal_stats, int max_threads) {
    assert(table_cache_ != nullptr);
    // <file metadata, level>
    std::vector<std::pair<FileMetaData*, int>> files_meta;
    for (int level = 0; level < base_vstorage_->num_levels(); level++) {
      for (auto& file_meta_pair : levels_[level].added_files) {
        auto* file_meta = file_meta_pair.second;
        assert(!file_meta->table_reader_handle);
        files_meta.emplace_back(file_meta, level);
      }
    }

    // Workers claim the next unprocessed index until the list is exhausted.
    std::atomic<size_t> next_file_meta_idx(0);
    auto load_handlers_func = [&]() {
      while (true) {
        size_t file_idx = next_file_meta_idx.fetch_add(1);
        if (file_idx >= files_meta.size()) {
          break;
        }

        auto* file_meta = files_meta[file_idx].first;
        int level = files_meta[file_idx].second;
        CHECK_OK(table_cache_->FindTable(env_options_,
                                         base_vstorage_->InternalComparator(),
                                         file_meta->fd, &file_meta->table_reader_handle,
                                         kDefaultQueryId,
                                         false /*no_io */, true /* record_read_stats */,
                                         internal_stats->GetFileReadHist(level)));
        if (file_meta->table_reader_handle != nullptr) {
          // Load table_reader
          file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
              file_meta->table_reader_handle);
        }
      }
    };

    // Nothing to parallelize: do the work on the calling thread.
    if (max_threads <= 1 || files_meta.size() <= 1) {
      load_handlers_func();
      return;
    }

    const size_t num_threads =
        std::min(static_cast<size_t>(max_threads), files_meta.size());
    std::vector<std::thread> threads;
    threads.reserve(num_threads);
    for (size_t i = 0; i < num_threads; i++) {
      threads.emplace_back(load_handlers_func);
    }

    for (auto& t : threads) {
      t.join();
    }
  }

  // Adds `f` to `level` of `vstorage` unless it has been marked deleted for
  // that level, in which case its stats are removed from `vstorage` instead.
  void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
    if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
      // f is to-be-deleted table file
      vstorage->RemoveCurrentStats(f);
    } else {
      vstorage->AddFile(level, f, info_log_);
    }
  }
};
359
360
// Constructs the builder by allocating its Rep. Note the argument order
// differs between this constructor and Rep's (info_log moves to second place).
VersionBuilder::VersionBuilder(const EnvOptions& env_options,
                               TableCache* table_cache,
                               VersionStorageInfo* base_vstorage,
                               Logger* info_log)
    : rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {
}
365
774k
// Destroys the Rep allocated by the constructor.
VersionBuilder::~VersionBuilder() {
  delete rep_;
}
366
0
// Forwards to Rep::CheckConsistency.
void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
  Rep& impl = *rep_;
  impl.CheckConsistency(vstorage);
}
369
void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
370
0
                                                uint64_t number, int level) {
371
0
  rep_->CheckConsistencyForDeletes(edit, number, level);
372
0
}
373
808k
// Forwards to Rep::Apply.
void VersionBuilder::Apply(VersionEdit* edit) {
  Rep& impl = *rep_;
  impl.Apply(edit);
}
374
774k
// Forwards to Rep::SaveTo.
void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
  Rep& impl = *rep_;
  impl.SaveTo(vstorage);
}
377
void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats,
378
1.21k
                                       int max_threads) {
379
1.21k
  rep_->LoadTableHandlers(internal_stats, max_threads);
380
1.21k
}
381
void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
382
0
                                  FileMetaData* f) {
383
0
  rep_->MaybeAddFile(vstorage, level, f);
384
0
}
385
386
}  // namespace rocksdb