YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/version_set.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/version_set.h"
25
#include <memory>
26
27
#ifndef __STDC_FORMAT_MACROS
28
#define __STDC_FORMAT_MACROS
29
#endif
30
31
#include <inttypes.h>
32
#include <stdio.h>
33
#include <algorithm>
34
#include <map>
35
#include <set>
36
#include <climits>
37
#include <unordered_map>
38
#include <vector>
39
#include <string>
40
41
#include <glog/logging.h>
42
#include <boost/container/small_vector.hpp>
43
44
#include "yb/gutil/casts.h"
45
46
#include "yb/util/format.h"
47
#include "yb/util/status_format.h"
48
49
#include "yb/rocksdb/db/filename.h"
50
#include "yb/rocksdb/db/file_numbers.h"
51
#include "yb/rocksdb/db/internal_stats.h"
52
#include "yb/rocksdb/db/log_reader.h"
53
#include "yb/rocksdb/db/log_writer.h"
54
#include "yb/rocksdb/db/memtable.h"
55
#include "yb/rocksdb/db/merge_context.h"
56
#include "yb/rocksdb/db/table_cache.h"
57
#include "yb/rocksdb/db/compaction.h"
58
#include "yb/rocksdb/db/version_builder.h"
59
#include "yb/rocksdb/db/writebuffer.h"
60
#include "yb/rocksdb/env.h"
61
#include "yb/rocksdb/merge_operator.h"
62
#include "yb/rocksdb/table/internal_iterator.h"
63
#include "yb/rocksdb/table/iterator_wrapper.h"
64
#include "yb/rocksdb/table/table_reader.h"
65
#include "yb/rocksdb/table/merger.h"
66
#include "yb/rocksdb/table/two_level_iterator.h"
67
#include "yb/rocksdb/table/format.h"
68
#include "yb/rocksdb/table/meta_blocks.h"
69
#include "yb/rocksdb/table/get_context.h"
70
71
#include "yb/rocksdb/util/coding.h"
72
#include "yb/rocksdb/util/file_reader_writer.h"
73
#include "yb/rocksdb/util/logging.h"
74
#include "yb/rocksdb/util/statistics.h"
75
#include "yb/rocksdb/util/stop_watch.h"
76
#include "yb/rocksdb/util/sync_point.h"
77
78
namespace rocksdb {
79
80
namespace {
81
82
// Find File in LevelFilesBrief data structure
83
// Within an index range defined by left and right
84
int FindFileInRange(const InternalKeyComparator& icmp,
85
    const LevelFilesBrief& file_level,
86
    const Slice& key,
87
    uint32_t left,
88
3.02M
    uint32_t right) {
89
7.83M
  while (left < right) {
90
4.80M
    uint32_t mid = (left + right) / 2;
91
4.80M
    const FdWithBoundaries& f = file_level.files[mid];
92
4.80M
    if (icmp.InternalKeyComparator::Compare(f.largest.key, key) < 0) {
93
      // Key at "mid.largest" is < "target".  Therefore all
94
      // files at or before "mid" are uninteresting.
95
1.88M
      left = mid + 1;
96
2.92M
    } else {
97
      // Key at "mid.largest" is >= "target".  Therefore all files
98
      // after "mid" are uninteresting.
99
2.92M
      right = mid;
100
2.92M
    }
101
4.80M
  }
102
3.02M
  return right;
103
3.02M
}
104
105
// Class to help choose the next file to search for the particular key.
106
// Searches and returns files level by level.
107
// We can search level-by-level since entries never hop across
108
// levels. Therefore we are guaranteed that if we find data
109
// in a smaller level, later levels are irrelevant (unless we
110
// are MergeInProgress).
111
class FilePicker {
112
 public:
113
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
114
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
115
             unsigned int num_levels, FileIndexer* file_indexer,
116
             const Comparator* user_comparator,
117
             const InternalKeyComparator* internal_comparator)
118
      : num_levels_(num_levels),
119
        curr_level_(-1),
120
        hit_file_level_(-1),
121
        search_left_bound_(0),
122
        search_right_bound_(FileIndexer::kLevelMaxIndex),
123
#ifndef NDEBUG
124
        files_(files),
125
#endif
126
        level_files_brief_(file_levels),
127
        is_hit_file_last_in_level_(false),
128
        user_key_(user_key),
129
        ikey_(ikey),
130
        file_indexer_(file_indexer),
131
        user_comparator_(user_comparator),
132
7.30M
        internal_comparator_(internal_comparator) {
133
    // Setup member variables to search first level.
134
7.30M
    search_ended_ = !PrepareNextLevel();
135
7.30M
    if (!search_ended_) {
136
      // Prefetch Level 0 table data to avoid cache miss if possible.
137
16.0M
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; 
++i9.10M
) {
138
9.10M
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
139
9.10M
        if (r) {
140
71.4k
          r->Prepare(ikey);
141
71.4k
        }
142
9.10M
      }
143
6.98M
    }
144
7.30M
  }
145
146
9.08M
  FdWithBoundaries* GetNextFile() {
147
10.9M
    while (!search_ended_) {  // Loops over different levels.
148
12.4M
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
149
        // Loops over all files in current level.
150
10.8M
        FdWithBoundaries* f = &curr_file_level_->files[curr_index_in_curr_level_];
151
10.8M
        hit_file_level_ = curr_level_;
152
10.8M
        is_hit_file_last_in_level_ =
153
10.8M
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
154
10.8M
        int cmp_largest = -1;
155
156
        // Do key range filtering of files or/and fractional cascading if:
157
        // (1) not all the files are in level 0, or
158
        // (2) there are more than 3 Level 0 files
159
        // If there are only 3 or less level 0 files in the system, we skip
160
        // the key range filtering. In this case, more likely, the system is
161
        // highly tuned to minimize number of tables queried by each query,
162
        // so it is unlikely that key range filtering is more efficient than
163
        // querying the files.
164
10.8M
        if (num_levels_ > 1 || 
curr_file_level_->num_files > 35.26M
) {
165
          // Check if key is within a file's range. If search left bound and
166
          // right bound point to the same find, we are sure key falls in
167
          // range.
168
5.65M
          assert(
169
5.65M
              curr_level_ == 0 ||
170
5.65M
              curr_index_in_curr_level_ == start_index_in_curr_level_ ||
171
5.65M
              user_comparator_->Compare(user_key_, f->smallest.user_key()) <= 0);
172
173
0
          int cmp_smallest = user_comparator_->Compare(user_key_, f->smallest.user_key());
174
5.65M
          if (cmp_smallest >= 0) {
175
3.57M
            cmp_largest = user_comparator_->Compare(user_key_, f->largest.user_key());
176
3.57M
          }
177
178
          // Setup file search bound for the next level based on the
179
          // comparison results
180
5.65M
          if (curr_level_ > 0) {
181
2.64M
            file_indexer_->GetNextLevelIndex(curr_level_,
182
2.64M
                                            curr_index_in_curr_level_,
183
2.64M
                                            cmp_smallest, cmp_largest,
184
2.64M
                                            &search_left_bound_,
185
2.64M
                                            &search_right_bound_);
186
2.64M
          }
187
          // Key falls out of current file's range
188
5.65M
          if (cmp_smallest < 0 || 
cmp_largest > 03.57M
) {
189
2.58M
            if (curr_level_ == 0) {
190
2.31M
              ++curr_index_in_curr_level_;
191
2.31M
              continue;
192
2.31M
            } else {
193
              // Search next level.
194
275k
              break;
195
275k
            }
196
2.58M
          }
197
5.65M
        }
198
8.25M
#ifndef NDEBUG
199
        // Sanity check to make sure that the files are correctly sorted
200
8.25M
        if (prev_file_) {
201
702k
          if (curr_level_ != 0) {
202
0
            int comp_sign = internal_comparator_->Compare(
203
0
                prev_file_->largest.key, f->smallest.key);
204
0
            assert(comp_sign < 0);
205
702k
          } else {
206
            // level == 0, the current file cannot be newer than the previous
207
            // one. Use compressed data structure, has no attribute seqNo
208
702k
            assert(curr_index_in_curr_level_ > 0);
209
0
            assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
210
702k
                  files_[0][curr_index_in_curr_level_-1]));
211
702k
          }
212
702k
        }
213
0
        prev_file_ = f;
214
8.25M
#endif
215
8.25M
        if (curr_level_ > 0 && 
cmp_largest < 02.36M
) {
216
          // No more files to search in this level.
217
2.36M
          search_ended_ = !PrepareNextLevel();
218
5.89M
        } else {
219
5.89M
          ++curr_index_in_curr_level_;
220
5.89M
        }
221
8.25M
        return f;
222
10.8M
      }
223
      // Start searching next level.
224
1.85M
      search_ended_ = !PrepareNextLevel();
225
1.85M
    }
226
    // Search ended.
227
827k
    return nullptr;
228
9.08M
  }
229
230
  // getter for current file level
231
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
232
25.9M
  unsigned int GetHitFileLevel() { return hit_file_level_; }
233
234
  // Returns true if the most recent "hit file" (i.e., one returned by
235
  // GetNextFile()) is at the last index in its level.
236
8.25M
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
237
238
 private:
239
  unsigned int num_levels_;
240
  unsigned int curr_level_;
241
  unsigned int hit_file_level_;
242
  int32_t search_left_bound_;
243
  int32_t search_right_bound_;
244
#ifndef NDEBUG
245
  std::vector<FileMetaData*>* files_;
246
#endif
247
  autovector<LevelFilesBrief>* level_files_brief_;
248
  bool search_ended_;
249
  bool is_hit_file_last_in_level_;
250
  LevelFilesBrief* curr_file_level_;
251
  unsigned int curr_index_in_curr_level_;
252
  unsigned int start_index_in_curr_level_;
253
  Slice user_key_;
254
  Slice ikey_;
255
  FileIndexer* file_indexer_;
256
  const Comparator* user_comparator_;
257
  const InternalKeyComparator* internal_comparator_;
258
#ifndef NDEBUG
259
  FdWithBoundaries* prev_file_;
260
#endif
261
262
  // Setup local variables to search next level.
263
  // Returns false if there are no more levels to search.
264
11.5M
  bool PrepareNextLevel() {
265
11.5M
    curr_level_++;
266
18.2M
    while (curr_level_ < num_levels_) {
267
15.7M
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
268
15.7M
      if (curr_file_level_->num_files == 0) {
269
        // When current level is empty, the search bound generated from upper
270
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
271
        // also empty.
272
6.74M
        assert(search_left_bound_ == 0);
273
0
        assert(search_right_bound_ == -1 ||
274
6.74M
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
275
        // Since current level is empty, it will need to search all files in
276
        // the next level
277
0
        search_left_bound_ = 0;
278
6.74M
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
279
6.74M
        curr_level_++;
280
6.74M
        continue;
281
6.74M
      }
282
283
      // Some files may overlap each other. We find
284
      // all files that overlap user_key and process them in order from
285
      // newest to oldest. In the context of merge-operator, this can occur at
286
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
287
      // are always compacted into a single entry).
288
8.95M
      int32_t start_index;
289
8.95M
      if (curr_level_ == 0) {
290
        // On Level-0, we read through all files to check for overlap.
291
5.98M
        start_index = 0;
292
5.98M
      } else {
293
        // On Level-n (n>=1), files are sorted. Binary search to find the
294
        // earliest file whose largest key >= ikey. Search left bound and
295
        // right bound are used to narrow the range.
296
2.96M
        if (search_left_bound_ == search_right_bound_) {
297
101k
          start_index = search_left_bound_;
298
2.86M
        } else if (search_left_bound_ < search_right_bound_) {
299
2.84M
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
300
2.23M
            search_right_bound_ =
301
2.23M
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
302
2.23M
          }
303
2.84M
          start_index =
304
2.84M
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
305
2.84M
                              static_cast<uint32_t>(search_left_bound_),
306
2.84M
                              static_cast<uint32_t>(search_right_bound_));
307
2.84M
        } else {
308
          // search_left_bound > search_right_bound, key does not exist in
309
          // this level. Since no comparison is done in this level, it will
310
          // need to search all files in the next level.
311
25.2k
          search_left_bound_ = 0;
312
25.2k
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
313
25.2k
          curr_level_++;
314
25.2k
          continue;
315
25.2k
        }
316
2.96M
      }
317
8.93M
      start_index_in_curr_level_ = start_index;
318
8.93M
      curr_index_in_curr_level_ = start_index;
319
8.93M
#ifndef NDEBUG
320
8.93M
      prev_file_ = nullptr;
321
8.93M
#endif
322
8.93M
      return true;
323
8.95M
    }
324
    // curr_level_ = num_levels_. So, no more levels to search.
325
2.58M
    return false;
326
11.5M
  }
327
};
328
329
778k
SstFileMetaData::BoundaryValues ConvertBoundaryValues(const FileMetaData::BoundaryValues& source) {
330
778k
  SstFileMetaData::BoundaryValues result;
331
778k
  result.key = source.key.user_key().ToBuffer();
332
778k
  result.seqno = source.seqno;
333
778k
  result.user_frontier = source.user_frontier;
334
778k
  result.user_values = source.user_values;
335
778k
  return result;
336
778k
}
337
338
}  // anonymous namespace
339
340
1.57M
VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
341
342
41.0M
uint64_t VersionStorageInfo::NumFiles() const {
343
41.0M
  uint64_t result = 0;
344
55.5M
  for (int level = num_non_empty_levels_; level-- > 0;) {
345
14.5M
    result += files_[level].size();
346
14.5M
  }
347
41.0M
  return result;
348
41.0M
}
349
350
1.57M
Version::~Version() {
351
1.57M
  assert(refs_ == 0);
352
353
  // Remove from linked list
354
0
  prev_->next_ = next_;
355
1.57M
  next_->prev_ = prev_;
356
357
  // Drop references to files
358
3.27M
  for (int level = 0; level < storage_info_.num_levels_; 
level++1.69M
) {
359
2.66M
    for (size_t i = 0; i < storage_info_.files_[level].size(); 
i++962k
) {
360
962k
      FileMetaData* f = storage_info_.files_[level][i];
361
962k
      assert(f->refs > 0);
362
0
      vset_->UnrefFile(cfd_, f);
363
962k
    }
364
1.69M
  }
365
1.57M
}
366
367
int FindFile(const InternalKeyComparator& icmp,
368
             const LevelFilesBrief& file_level,
369
173k
             const Slice& key) {
370
173k
  return FindFileInRange(icmp, file_level, key, 0,
371
173k
                         static_cast<uint32_t>(file_level.num_files));
372
173k
}
373
374
void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
375
        const std::vector<FileMetaData*>& files,
376
302k
        Arena* arena) {
377
302k
  assert(file_level);
378
0
  assert(arena);
379
380
0
  size_t num = files.size();
381
302k
  file_level->num_files = num;
382
302k
  char* mem = arena->AllocateAligned(num * sizeof(FdWithBoundaries));
383
302k
  file_level->files = reinterpret_cast<FdWithBoundaries*>(mem);
384
385
1.33M
  for (size_t i = 0; i < num; 
i++1.03M
) {
386
1.03M
    new (file_level->files + i) FdWithBoundaries(arena, *files[i]);
387
1.03M
  }
388
302k
}
389
390
static bool AfterFile(const Comparator* ucmp,
391
14
                      const Slice* user_key, const FdWithBoundaries* f) {
392
  // nullptr user_key occurs before all keys and is therefore never after *f
393
14
  return (user_key != nullptr &&
394
14
          ucmp->Compare(*user_key, f->largest.user_key()) > 0);
395
14
}
396
397
static bool BeforeFile(const Comparator* ucmp,
398
7.63k
                       const Slice* user_key, const FdWithBoundaries* f) {
399
  // nullptr user_key occurs after all keys and is therefore never before *f
400
7.63k
  return (user_key != nullptr &&
401
7.63k
          
ucmp->Compare(*user_key, f->smallest.user_key()) < 06.62k
);
402
7.63k
}
403
404
bool SomeFileOverlapsRange(
405
    const InternalKeyComparator& icmp,
406
    bool disjoint_sorted_files,
407
    const LevelFilesBrief& file_level,
408
    const Slice* smallest_user_key,
409
24.7k
    const Slice* largest_user_key) {
410
24.7k
  const Comparator* ucmp = icmp.user_comparator();
411
24.7k
  if (!disjoint_sorted_files) {
412
    // Need to check against all files
413
16
    for (size_t i = 0; i < file_level.num_files; 
i++4
) {
414
14
      const FdWithBoundaries* f = file_level.files + i;
415
14
      if (AfterFile(ucmp, smallest_user_key, f) ||
416
14
          
BeforeFile(ucmp, largest_user_key, f)12
) {
417
        // No overlap
418
10
      } else {
419
10
        return true;  // Overlap
420
10
      }
421
14
    }
422
2
    return false;
423
12
  }
424
425
  // Binary search over file list
426
24.7k
  uint32_t index = 0;
427
24.7k
  if (smallest_user_key != nullptr) {
428
    // Find the earliest possible internal key for smallest_user_key
429
23.5k
    InternalKey small = InternalKey::MaxPossibleForUserKey(*smallest_user_key);
430
23.5k
    index = FindFile(icmp, file_level, small.Encode());
431
23.5k
  }
432
433
24.7k
  if (index >= file_level.num_files) {
434
    // beginning of range is after all files, so no overlap.
435
17.1k
    return false;
436
17.1k
  }
437
438
7.62k
  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
439
24.7k
}
440
441
namespace {
442
443
// An internal iterator.  For a given version/level pair, yields
444
// information about the files in the level.  For a given entry, key()
445
// is the largest key that occurs in the file, and value() is an
446
// 16-byte value containing the file number and file size, both
447
// encoded using EncodeFixed64.
448
class LevelFileNumIterator : public InternalIterator {
449
 public:
450
  LevelFileNumIterator(const InternalKeyComparator& icmp,
451
                       const LevelFilesBrief* flevel)
452
      : icmp_(icmp),
453
        flevel_(flevel),
454
        index_(static_cast<uint32_t>(flevel->num_files)),
455
27.8k
        current_value_(0, 0, 0, 0) {  // Marks as invalid
456
27.8k
  }
457
458
596k
  bool Valid() const override { return index_ < flevel_->num_files; }
459
460
148k
  void Seek(const Slice& target) override {
461
148k
    index_ = FindFile(icmp_, *flevel_, target);
462
148k
  }
463
464
17.4k
  void SeekToFirst() override { index_ = 0; }
465
466
55
  void SeekToLast() override {
467
55
    index_ = (flevel_->num_files == 0)
468
55
                 ? 
00
469
55
                 : static_cast<uint32_t>(flevel_->num_files) - 1;
470
55
  }
471
472
27.6k
  void Next() override {
473
27.6k
    assert(Valid());
474
0
    index_++;
475
27.6k
  }
476
477
144
  void Prev() override {
478
144
    assert(Valid());
479
144
    if (index_ == 0) {
480
39
      index_ = static_cast<uint32_t>(flevel_->num_files);  // Marks as invalid
481
105
    } else {
482
105
      index_--;
483
105
    }
484
144
  }
485
486
173k
  Slice key() const override {
487
173k
    assert(Valid());
488
0
    return flevel_->files[index_].largest.key;
489
173k
  }
490
491
173k
  Slice value() const override {
492
173k
    assert(Valid());
493
494
0
    auto file_meta = flevel_->files[index_];
495
173k
    current_value_ = file_meta.fd;
496
173k
    return Slice(reinterpret_cast<const char*>(&current_value_),
497
173k
                 sizeof(FileDescriptor));
498
173k
  }
499
500
54.9k
  Status status() const override { return Status::OK(); }
501
502
 private:
503
  const InternalKeyComparator icmp_;
504
  const LevelFilesBrief* flevel_;
505
  uint32_t index_;
506
  mutable FileDescriptor current_value_;
507
};
508
509
class LevelFileIteratorState : public TwoLevelIteratorState {
510
 public:
511
  // @param skip_filters Disables loading/accessing the filter block
512
  LevelFileIteratorState(TableCache* table_cache,
513
                         const ReadOptions& read_options,
514
                         const EnvOptions& env_options,
515
                         const InternalKeyComparatorPtr& icomparator,
516
                         HistogramImpl* file_read_hist, bool for_compaction,
517
                         bool prefix_enabled, bool skip_filters)
518
      : TwoLevelIteratorState(prefix_enabled),
519
        table_cache_(table_cache),
520
        read_options_(read_options),
521
        env_options_(env_options),
522
        icomparator_(icomparator),
523
        file_read_hist_(file_read_hist),
524
        for_compaction_(for_compaction),
525
27.8k
        skip_filters_(skip_filters) {}
526
527
154k
  InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override {
528
154k
    if (meta_handle.size() != sizeof(FileDescriptor)) {
529
0
      return NewErrorInternalIterator(
530
0
          STATUS(Corruption, "FileReader invoked with unexpected value"));
531
154k
    } else {
532
154k
      const FileDescriptor* fd =
533
154k
          reinterpret_cast<const FileDescriptor*>(meta_handle.data());
534
154k
      return table_cache_->NewIterator(
535
154k
          read_options_, env_options_, icomparator_, *fd, Slice() /* filter */,
536
154k
          nullptr /* don't need reference to table*/, file_read_hist_,
537
154k
          for_compaction_, nullptr /* arena */, skip_filters_);
538
154k
    }
539
154k
  }
540
541
1.84k
  bool PrefixMayMatch(const Slice& internal_key) override {
542
1.84k
    return true;
543
1.84k
  }
544
545
 private:
546
  TableCache* table_cache_;
547
  const ReadOptions read_options_;
548
  const EnvOptions& env_options_;
549
  InternalKeyComparatorPtr icomparator_;
550
  HistogramImpl* file_read_hist_;
551
  bool for_compaction_;
552
  bool skip_filters_;
553
};
554
555
// A wrapper of version builder which references the current version in
556
// constructor and unref it in the destructor.
557
// Both of the constructor and destructor need to be called inside DB Mutex.
558
class BaseReferencedVersionBuilder {
559
 public:
560
  explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
561
      : version_builder_(new VersionBuilder(
562
            cfd->current()->version_set()->env_options(), cfd->table_cache(),
563
            cfd->current()->storage_info(), cfd->ioptions()->info_log)),
564
773k
        version_(cfd->current()) {
565
773k
    version_->Ref();
566
773k
  }
567
774k
  ~BaseReferencedVersionBuilder() {
568
774k
    delete version_builder_;
569
774k
    version_->Unref();
570
774k
  }
571
1.24M
  VersionBuilder* version_builder() { return version_builder_; }
572
573
 private:
574
  VersionBuilder* version_builder_;
575
  Version* version_;
576
};
577
}  // anonymous namespace
578
579
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
580
                                   const FileMetaData* file_meta,
581
105k
                                   const std::string* fname) const {
582
105k
  auto table_cache = cfd_->table_cache();
583
105k
  auto ioptions = cfd_->ioptions();
584
105k
  Status s = table_cache->GetTableProperties(
585
105k
      vset_->env_options_, cfd_->internal_comparator(), file_meta->fd,
586
105k
      tp, true /* no io */);
587
105k
  if (s.ok()) {
588
74.2k
    return s;
589
74.2k
  }
590
591
  // We only ignore error type `Incomplete` since it's by design that we
592
  // disallow table when it's not in table cache.
593
31.0k
  if (!s.IsIncomplete()) {
594
0
    return s;
595
0
  }
596
597
  // 2. Table is not present in table cache, we'll read the table properties
598
  // directly from the properties block in the file.
599
31.0k
  std::unique_ptr<RandomAccessFile> file;
600
31.0k
  if (fname != nullptr) {
601
26
    s = ioptions->env->NewRandomAccessFile(
602
26
        *fname, &file, vset_->env_options_);
603
31.0k
  } else {
604
31.0k
    s = ioptions->env->NewRandomAccessFile(
605
31.0k
        TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
606
31.0k
                      file_meta->fd.GetPathId()),
607
31.0k
        &file, vset_->env_options_);
608
31.0k
  }
609
31.0k
  if (!s.ok()) {
610
7
    return s;
611
7
  }
612
613
31.0k
  TableProperties* raw_table_properties;
614
  // By setting the magic number to kInvalidTableMagicNumber, we can by
615
  // pass the magic number check in the footer.
616
31.0k
  std::unique_ptr<RandomAccessFileReader> file_reader(
617
31.0k
      new RandomAccessFileReader(std::move(file)));
618
31.0k
  s = ReadTableProperties(
619
31.0k
      file_reader.get(), file_meta->fd.GetBaseFileSize(),
620
31.0k
      Footer::kInvalidTableMagicNumber /* table's magic number */, vset_->env_,
621
31.0k
      ioptions->info_log, &raw_table_properties);
622
31.0k
  if (!s.ok()) {
623
45
    return s;
624
45
  }
625
30.9k
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
626
627
30.9k
  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
628
30.9k
  return s;
629
31.0k
}
630
631
159
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
632
159
  Status s;
633
1.26k
  for (int level = 0; level < storage_info_.num_levels_; 
level++1.10k
) {
634
1.10k
    s = GetPropertiesOfAllTables(props, level);
635
1.10k
    if (!s.ok()) {
636
0
      return s;
637
0
    }
638
1.10k
  }
639
640
159
  return Status::OK();
641
159
}
642
643
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
644
1.80k
                                         int level) {
645
5.83k
  for (const auto& file_meta : storage_info_.files_[level]) {
646
5.83k
    auto fname =
647
5.83k
        TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
648
5.83k
                      file_meta->fd.GetPathId());
649
    // 1. If the table is already present in table cache, load table
650
    // properties from there.
651
5.83k
    std::shared_ptr<const TableProperties> table_properties;
652
5.83k
    Status s = GetTableProperties(&table_properties, file_meta, &fname);
653
5.83k
    if (s.ok()) {
654
5.83k
      props->insert({fname, table_properties});
655
5.83k
    } else {
656
0
      return s;
657
0
    }
658
5.83k
  }
659
660
1.80k
  return Status::OK();
661
1.80k
}
662
663
Status Version::GetPropertiesOfTablesInRange(
664
0
    const Range* range, std::size_t n, TablePropertiesCollection* props) const {
665
0
  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
666
0
    for (decltype(n) i = 0; i < n; i++) {
667
      // Convert user_key into a corresponding internal key.
668
0
      const InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
669
0
      const InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
670
0
      std::vector<FileMetaData*> files;
671
0
      storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr, false);
672
0
      for (const auto& file_meta : files) {
673
0
        auto fname =
674
0
            TableFileName(vset_->db_options_->db_paths,
675
0
                          file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
676
0
        if (props->count(fname) == 0) {
677
          // 1. If the table is already present in table cache, load table
678
          // properties from there.
679
0
          std::shared_ptr<const TableProperties> table_properties;
680
0
          Status s = GetTableProperties(&table_properties, file_meta, &fname);
681
0
          if (s.ok()) {
682
0
            props->insert({fname, table_properties});
683
0
          } else {
684
0
            return s;
685
0
          }
686
0
        }
687
0
      }
688
0
    }
689
0
  }
690
691
0
  return Status::OK();
692
0
}
693
694
Status Version::GetAggregatedTableProperties(
695
803
    std::shared_ptr<const TableProperties>* tp, int level) {
696
803
  TablePropertiesCollection props;
697
803
  Status s;
698
803
  if (level < 0) {
699
103
    s = GetPropertiesOfAllTables(&props);
700
700
  } else {
701
700
    s = GetPropertiesOfAllTables(&props, level);
702
700
  }
703
803
  if (!s.ok()) {
704
0
    return s;
705
0
  }
706
707
803
  auto* new_tp = new TableProperties();
708
5.76k
  for (const auto& item : props) {
709
5.76k
    new_tp->Add(*item.second);
710
5.76k
  }
711
803
  tp->reset(new_tp);
712
803
  return Status::OK();
713
803
}
714
715
152
size_t Version::GetMemoryUsageByTableReaders() {
716
152
  size_t total_usage = 0;
717
152
  for (auto& file_level : storage_info_.level_files_brief_) {
718
168
    for (size_t i = 0; i < file_level.num_files; 
i++80
) {
719
80
      total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
720
80
          vset_->env_options_, cfd_->internal_comparator(),
721
80
          file_level.files[i].fd);
722
80
    }
723
88
  }
724
152
  return total_usage;
725
152
}
726
727
655
void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
728
655
  assert(cf_meta);
729
0
  assert(cfd_);
730
731
0
  cf_meta->name = cfd_->GetName();
732
655
  cf_meta->size = 0;
733
655
  cf_meta->file_count = 0;
734
655
  cf_meta->levels.clear();
735
736
655
  auto* ioptions = cfd_->ioptions();
737
655
  auto* vstorage = storage_info();
738
739
2.86k
  for (int level = 0; level < cfd_->NumberLevels(); 
level++2.20k
) {
740
2.20k
    uint64_t level_size = 0;
741
2.20k
    cf_meta->file_count += vstorage->LevelFiles(level).size();
742
2.20k
    std::vector<SstFileMetaData> files;
743
5.14k
    for (const auto& file : vstorage->LevelFiles(level)) {
744
5.14k
      uint32_t path_id = file->fd.GetPathId();
745
5.14k
      std::string file_path;
746
5.14k
      if (path_id < ioptions->db_paths.size()) {
747
5.14k
        file_path = ioptions->db_paths[path_id].path;
748
5.14k
      } else {
749
0
        assert(!ioptions->db_paths.empty());
750
0
        file_path = ioptions->db_paths.back().path;
751
0
      }
752
0
      files.emplace_back(
753
5.14k
          MakeTableFileName("", file->fd.GetNumber()),
754
5.14k
          file_path,
755
5.14k
          file->fd.GetTotalFileSize(),
756
5.14k
          file->fd.GetBaseFileSize(),
757
5.14k
          file->fd.GetBaseFileSize() +
758
5.14k
              file->raw_key_size + file->raw_value_size,
759
5.14k
          ConvertBoundaryValues(file->smallest),
760
5.14k
          ConvertBoundaryValues(file->largest),
761
5.14k
          file->being_compacted);
762
5.14k
      level_size += file->fd.GetTotalFileSize();
763
5.14k
    }
764
2.20k
    cf_meta->levels.emplace_back(
765
2.20k
        level, level_size, std::move(files));
766
2.20k
    cf_meta->size += level_size;
767
2.20k
  }
768
655
}
769
770
771
16
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
772
  // Estimation will be inaccurate when:
773
  // (1) there exist merge keys
774
  // (2) keys are directly overwritten
775
  // (3) deletion on non-existing keys
776
  // (4) low number of samples
777
16
  if (current_num_samples_ == 0) {
778
3
    return 0;
779
3
  }
780
781
13
  if (current_num_non_deletions_ <= current_num_deletions_) {
782
0
    return 0;
783
0
  }
784
785
13
  uint64_t est = current_num_non_deletions_ - current_num_deletions_;
786
787
13
  uint64_t file_count = 0;
788
96
  for (int level = 0; level < num_levels_; 
++level83
) {
789
83
    file_count += files_[level].size();
790
83
  }
791
792
13
  if (current_num_samples_ < file_count) {
793
    // casting to avoid overflowing
794
1
    return
795
1
      static_cast<uint64_t>(
796
1
        (est * static_cast<double>(file_count) / current_num_samples_)
797
1
      );
798
12
  } else {
799
12
    return est;
800
12
  }
801
13
}
802
803
void Version::AddIterators(const ReadOptions& read_options,
804
                           const EnvOptions& soptions,
805
38.1M
                           MergeIteratorBuilder* merge_iter_builder) {
806
38.1M
  assert(storage_info_.finalized_);
807
808
38.1M
  if (storage_info_.num_non_empty_levels() == 0) {
809
    // No file in the Version.
810
27.3M
    return;
811
27.3M
  }
812
813
10.7M
  auto* arena = merge_iter_builder->GetArena();
814
815
  // Merge all level zero files together since they may overlap
816
31.3M
  for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; 
i++20.5M
) {
817
20.5M
    const auto& file = storage_info_.LevelFilesBrief(0).files[i];
818
20.5M
    if (!read_options.file_filter || 
read_options.file_filter->Filter(file)14.8M
) {
819
20.5M
      InternalIterator *file_iter;
820
20.5M
      TableCache::TableReaderWithHandle trwh;
821
20.5M
      Status s = cfd_->table_cache()->GetTableReaderForIterator(read_options, soptions,
822
20.5M
          cfd_->internal_comparator(), file.fd, &trwh, cfd_->internal_stats()->GetFileReadHist(0),
823
20.5M
          false);
824
20.5M
      if (
s.ok()20.5M
) {
825
20.5M
        if (!read_options.table_aware_file_filter ||
826
20.5M
            
read_options.table_aware_file_filter->Filter(trwh.table_reader)12.9M
) {
827
14.3M
          file_iter = cfd_->table_cache()->NewIterator(
828
14.3M
              read_options, &trwh, storage_info_.LevelFiles(0)[i]->UserFilter(), false, arena);
829
14.3M
        } else {
830
6.13M
          file_iter = nullptr;
831
6.13M
        }
832
18.4E
      } else {
833
18.4E
        file_iter = NewErrorInternalIterator(s, arena);
834
18.4E
      }
835
20.5M
      if (file_iter) {
836
14.3M
        merge_iter_builder->AddIterator(file_iter);
837
14.3M
      }
838
20.5M
    }
839
20.5M
  }
840
841
  // For levels > 0, we can use a concatenating iterator that sequentially
842
  // walks through the non-overlapping files in the level, opening them
843
  // lazily.
844
10.8M
  for (int level = 1; level < storage_info_.num_non_empty_levels(); 
level++19.9k
) {
845
19.9k
    if (storage_info_.LevelFilesBrief(level).num_files != 0) {
846
18.0k
      auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState));
847
18.0k
      auto* state = new (mem)
848
18.0k
          LevelFileIteratorState(cfd_->table_cache(), read_options, soptions,
849
18.0k
                                 cfd_->internal_comparator(),
850
18.0k
                                 cfd_->internal_stats()->GetFileReadHist(level),
851
18.0k
                                 false /* for_compaction */,
852
18.0k
                                 cfd_->ioptions()->prefix_extractor != nullptr,
853
18.0k
                                 IsFilterSkipped(level));
854
18.0k
      mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
855
18.0k
      auto* first_level_iter = new (mem) LevelFileNumIterator(
856
18.0k
          *cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level));
857
18.0k
      merge_iter_builder->AddIterator(NewTwoLevelIterator(state, first_level_iter, arena, false));
858
18.0k
    }
859
19.9k
  }
860
10.7M
}
861
862
VersionStorageInfo::VersionStorageInfo(
863
    const InternalKeyComparatorPtr& internal_comparator,
864
    const Comparator* user_comparator, int levels,
865
    CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage)
866
    : internal_comparator_(internal_comparator),
867
      user_comparator_(user_comparator),
868
      // cfd is nullptr if Version is dummy
869
      num_levels_(levels),
870
      num_non_empty_levels_(0),
871
      file_indexer_(user_comparator),
872
      compaction_style_(compaction_style),
873
      files_(new std::vector<FileMetaData*>[num_levels_]),
874
      base_level_(num_levels_ == 1 ? -1 : 1),
875
      files_by_compaction_pri_(num_levels_),
876
      level0_non_overlapping_(false),
877
      next_file_to_compact_by_size_(num_levels_),
878
      compaction_score_(num_levels_),
879
      compaction_level_(num_levels_),
880
      l0_delay_trigger_count_(0),
881
      accumulated_file_size_(0),
882
      accumulated_raw_key_size_(0),
883
      accumulated_raw_value_size_(0),
884
      accumulated_num_non_deletions_(0),
885
      accumulated_num_deletions_(0),
886
      current_num_non_deletions_(0),
887
      current_num_deletions_(0),
888
      current_num_samples_(0),
889
      estimated_compaction_needed_bytes_(0),
890
1.65M
      finalized_(false) {
891
1.65M
  if (ref_vstorage != nullptr) {
892
774k
    accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
893
774k
    accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
894
774k
    accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
895
774k
    accumulated_num_non_deletions_ =
896
774k
        ref_vstorage->accumulated_num_non_deletions_;
897
774k
    accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
898
774k
    current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
899
774k
    current_num_deletions_ = ref_vstorage->current_num_deletions_;
900
774k
    current_num_samples_ = ref_vstorage->current_num_samples_;
901
774k
  }
902
1.65M
}
903
904
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
905
                 uint64_t version_number)
906
    : env_(vset->env_),
907
      cfd_(column_family_data),
908
      info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
909
      db_statistics_((cfd_ == nullptr) ? nullptr
910
                                       : cfd_->ioptions()->statistics),
911
      table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
912
      merge_operator_((cfd_ == nullptr) ? nullptr
913
                                        : cfd_->ioptions()->merge_operator),
914
      storage_info_((cfd_ == nullptr) ? nullptr : cfd_->internal_comparator(),
915
                    (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
916
                    cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
917
                    cfd_ == nullptr ? kCompactionStyleLevel
918
                                    : cfd_->ioptions()->compaction_style,
919
                    (cfd_ == nullptr || cfd_->current() == nullptr)
920
                        ? nullptr
921
                        : cfd_->current()->storage_info()),
922
      vset_(vset),
923
      next_(this),
924
      prev_(this),
925
      refs_(0),
926
1.65M
      version_number_(version_number) {}
927
928
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
929
                  std::string* value, Status* status,
930
                  MergeContext* merge_context, bool* value_found,
931
7.30M
                  bool* key_exists, SequenceNumber* seq) {
932
7.30M
  Slice ikey = k.internal_key();
933
7.30M
  Slice user_key = k.user_key();
934
935
7.30M
  assert(status->ok() || status->IsMergeInProgress());
936
937
7.30M
  if (key_exists != nullptr) {
938
    // will falsify below if not found
939
21
    *key_exists = true;
940
21
  }
941
942
7.30M
  GetContext get_context(
943
7.30M
      user_comparator(), merge_operator_, info_log_, db_statistics_,
944
7.30M
      status->ok() ? 
GetContext::kNotFound7.30M
:
GetContext::kMerge1.84k
, user_key,
945
7.30M
      value, value_found, merge_context, this->env_, seq);
946
947
7.30M
  FilePicker fp(
948
7.30M
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
949
7.30M
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
950
7.30M
      user_comparator(), internal_comparator().get());
951
7.30M
  FdWithBoundaries* f = fp.GetNextFile();
952
9.08M
  while (f != nullptr) {
953
8.25M
    *status = table_cache_->Get(
954
8.25M
        read_options, internal_comparator(), f->fd, ikey, &get_context,
955
8.25M
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
956
8.25M
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
957
8.25M
                        fp.IsHitFileLastInLevel()));
958
    // TODO: examine the behavior for corrupted key
959
8.25M
    if (!status->ok()) {
960
524
      return;
961
524
    }
962
963
8.25M
    switch (get_context.State()) {
964
1.78M
      case GetContext::kNotFound:
965
        // Keep searching in other files
966
1.78M
        break;
967
6.47M
      case GetContext::kFound:
968
6.47M
        if (fp.GetHitFileLevel() == 0) {
969
4.40M
          RecordTick(db_statistics_, GET_HIT_L0);
970
4.40M
        } else 
if (2.06M
fp.GetHitFileLevel() == 12.06M
) {
971
1.20M
          RecordTick(db_statistics_, GET_HIT_L1);
972
1.20M
        } else 
if (853k
fp.GetHitFileLevel() >= 2853k
) {
973
853k
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
974
853k
        }
975
6.47M
        return;
976
4.91k
      case GetContext::kDeleted:
977
        // Use empty error message for speed
978
4.91k
        *status = STATUS(NotFound, "");
979
4.91k
        return;
980
0
      case GetContext::kCorrupt:
981
0
        *status = STATUS(Corruption, "corrupted key for ", user_key);
982
0
        return;
983
567
      case GetContext::kMerge:
984
567
        break;
985
8.25M
    }
986
1.78M
    f = fp.GetNextFile();
987
1.78M
  }
988
989
827k
  if (GetContext::kMerge == get_context.State()) {
990
1.51k
    if (!merge_operator_) {
991
0
      *status =  STATUS(InvalidArgument,
992
0
          "merge_operator is not properly initialized.");
993
0
      return;
994
0
    }
995
    // merge_operands are in saver and we hit the beginning of the key history
996
    // do a final merge of nullptr and operands;
997
1.51k
    if (merge_operator_->FullMerge(user_key, nullptr,
998
1.51k
                                   merge_context->GetOperands(), value,
999
1.51k
                                   info_log_)) {
1000
1.51k
      *status = Status::OK();
1001
1.51k
    } else {
1002
0
      RecordTick(db_statistics_, NUMBER_MERGE_FAILURES);
1003
0
      *status = STATUS(Corruption, "could not perform end-of-key merge for ",
1004
0
                                   user_key);
1005
0
    }
1006
825k
  } else {
1007
825k
    if (key_exists != nullptr) {
1008
3
      *key_exists = false;
1009
3
    }
1010
825k
    *status = STATUS(NotFound, ""); // Use an empty error message for speed
1011
825k
  }
1012
827k
}
1013
1014
8.27M
bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
1015
  // Reaching the bottom level implies misses at all upper levels, so we'll
1016
  // skip checking the filters when we predict a hit.
1017
8.27M
  return cfd_->ioptions()->optimize_filters_for_hits &&
1018
8.27M
         
(561k
level > 0561k
||
is_file_last_in_level224k
) &&
1019
8.27M
         
level == storage_info_.num_non_empty_levels() - 1545k
;
1020
8.27M
}
1021
1022
773k
void VersionStorageInfo::GenerateLevelFilesBrief() {
1023
773k
  level_files_brief_.resize(num_non_empty_levels_);
1024
1.03M
  for (int level = 0; level < num_non_empty_levels_; 
level++256k
) {
1025
256k
    DoGenerateLevelFilesBrief(
1026
256k
        &level_files_brief_[level], files_[level], &arena_);
1027
256k
  }
1028
773k
}
1029
1030
void Version::PrepareApply(
1031
    const MutableCFOptions& mutable_cf_options,
1032
773k
    bool update_stats) {
1033
773k
  UpdateAccumulatedStats(update_stats);
1034
773k
  storage_info_.UpdateNumNonEmptyLevels();
1035
773k
  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
1036
773k
  storage_info_.UpdateFilesByCompactionPri(mutable_cf_options);
1037
773k
  storage_info_.GenerateFileIndexer();
1038
773k
  storage_info_.GenerateLevelFilesBrief();
1039
773k
  storage_info_.GenerateLevel0NonOverlapping();
1040
773k
}
1041
1042
961k
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
1043
961k
  if (file_meta->init_stats_from_file ||
1044
961k
      
file_meta->compensated_file_size > 0118k
) {
1045
864k
    return false;
1046
864k
  }
1047
96.8k
  std::shared_ptr<const TableProperties> tp;
1048
96.8k
  Status s = GetTableProperties(&tp, file_meta);
1049
96.8k
  file_meta->init_stats_from_file = true;
1050
96.8k
  if (!s.ok()) {
1051
52
    RLOG(InfoLogLevel::ERROR_LEVEL, vset_->db_options_->info_log,
1052
52
        "Unable to load table properties for file %" PRIu64 " --- %s\n",
1053
52
        file_meta->fd.GetNumber(), s.ToString().c_str());
1054
52
    return false;
1055
52
  }
1056
96.7k
  if (tp.get() == nullptr) 
return false0
;
1057
96.7k
  file_meta->num_entries = tp->num_entries;
1058
96.7k
  file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties);
1059
96.7k
  file_meta->raw_value_size = tp->raw_value_size;
1060
96.7k
  file_meta->raw_key_size = tp->raw_key_size;
1061
1062
96.7k
  return true;
1063
96.7k
}
1064
1065
96.7k
void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
1066
96.7k
  assert(file_meta->init_stats_from_file);
1067
0
  accumulated_file_size_ += file_meta->fd.GetTotalFileSize();
1068
96.7k
  accumulated_raw_key_size_ += file_meta->raw_key_size;
1069
96.7k
  accumulated_raw_value_size_ += file_meta->raw_value_size;
1070
96.7k
  accumulated_num_non_deletions_ +=
1071
96.7k
      file_meta->num_entries - file_meta->num_deletions;
1072
96.7k
  accumulated_num_deletions_ += file_meta->num_deletions;
1073
1074
96.7k
  current_num_non_deletions_ +=
1075
96.7k
      file_meta->num_entries - file_meta->num_deletions;
1076
96.7k
  current_num_deletions_ += file_meta->num_deletions;
1077
96.7k
  current_num_samples_++;
1078
96.7k
}
1079
1080
65.3k
void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
1081
65.3k
  if (file_meta->init_stats_from_file) {
1082
63.7k
    current_num_non_deletions_ -=
1083
63.7k
        file_meta->num_entries - file_meta->num_deletions;
1084
63.7k
    current_num_deletions_ -= file_meta->num_deletions;
1085
63.7k
    current_num_samples_--;
1086
63.7k
  }
1087
65.3k
}
1088
1089
773k
void Version::UpdateAccumulatedStats(bool update_stats) {
1090
773k
  if (
update_stats773k
) {
1091
    // maximum number of table properties loaded from files.
1092
773k
    const int kMaxInitCount = 20;
1093
773k
    int init_count = 0;
1094
    // here only the first kMaxInitCount files which haven't been
1095
    // initialized from file will be updated with num_deletions.
1096
    // The motivation here is to cap the maximum I/O per Version creation.
1097
    // The reason for choosing files from lower-level instead of higher-level
1098
    // is that such design is able to propagate the initialization from
1099
    // lower-level to higher-level:  When the num_deletions of lower-level
1100
    // files are updated, it will make the lower-level files have accurate
1101
    // compensated_file_size, making lower-level to higher-level compaction
1102
    // will be triggered, which creates higher-level files whose num_deletions
1103
    // will be updated here.
1104
773k
    for (int level = 0;
1105
1.96M
         level < storage_info_.num_levels_ && 
init_count < kMaxInitCount1.19M
;
1106
1.19M
         ++level) {
1107
1.19M
      for (auto* file_meta : storage_info_.files_[level]) {
1108
960k
        if (MaybeInitializeFileMetaData(file_meta)) {
1109
          // each FileMeta will be initialized only once.
1110
96.7k
          storage_info_.UpdateAccumulatedStats(file_meta);
1111
96.7k
          if (++init_count >= kMaxInitCount) {
1112
269
            break;
1113
269
          }
1114
96.7k
        }
1115
960k
      }
1116
1.19M
    }
1117
    // In case all sampled-files contain only deletion entries, then we
1118
    // load the table-property of a file in higher-level to initialize
1119
    // that value.
1120
773k
    for (int level = storage_info_.num_levels_ - 1;
1121
1.56M
         storage_info_.accumulated_raw_value_size_ == 0 && 
level >= 01.48M
;
1122
787k
         --level) {
1123
787k
      for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
1124
788k
           storage_info_.accumulated_raw_value_size_ == 0 && 
i >= 0788k
;
--i1.26k
) {
1125
1.26k
        if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
1126
0
          storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
1127
0
        }
1128
1.26k
      }
1129
787k
    }
1130
773k
  }
1131
1132
773k
  storage_info_.ComputeCompensatedSizes();
1133
773k
}
1134
1135
773k
void VersionStorageInfo::ComputeCompensatedSizes() {
1136
773k
  static const int kDeletionWeightOnCompaction = 2;
1137
773k
  uint64_t average_value_size = GetAverageValueSize();
1138
1139
  // compute the compensated size
1140
1.96M
  for (int level = 0; level < num_levels_; 
level++1.19M
) {
1141
1.19M
    for (auto* file_meta : files_[level]) {
1142
      // Here we only compute compensated_file_size for those file_meta
1143
      // which compensated_file_size is uninitialized (== 0). This is true only
1144
      // for files that have been created right now and no other thread has
1145
      // access to them. That's why we can safely mutate compensated_file_size.
1146
966k
      if (file_meta->compensated_file_size == 0) {
1147
102k
        file_meta->compensated_file_size = file_meta->fd.GetTotalFileSize();
1148
        // Here we only boost the size of deletion entries of a file only
1149
        // when the number of deletion entries is greater than the number of
1150
        // non-deletion entries in the file.  The motivation here is that in
1151
        // a stable workload, the number of deletion entries should be roughly
1152
        // equal to the number of non-deletion entries.  If we compensate the
1153
        // size of deletion entries in a stable workload, the deletion
1154
        // compensation logic might introduce unwanted effet which changes the
1155
        // shape of LSM tree.
1156
102k
        if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
1157
11.3k
          file_meta->compensated_file_size +=
1158
11.3k
              (file_meta->num_deletions * 2 - file_meta->num_entries) *
1159
11.3k
              average_value_size * kDeletionWeightOnCompaction;
1160
11.3k
        }
1161
102k
      }
1162
966k
    }
1163
1.19M
  }
1164
773k
}
1165
1166
3.96M
int VersionStorageInfo::MaxInputLevel() const {
1167
3.96M
  if (compaction_style_ == kCompactionStyleLevel) {
1168
1.67M
    return num_levels() - 2;
1169
1.67M
  }
1170
2.28M
  return 0;
1171
3.96M
}
1172
1173
void VersionStorageInfo::EstimateCompactionBytesNeeded(
1174
1.23M
    const MutableCFOptions& mutable_cf_options) {
1175
  // Only implemented for level-based compaction
1176
1.23M
  if (compaction_style_ != kCompactionStyleLevel) {
1177
1.14M
    estimated_compaction_needed_bytes_ = 0;
1178
1.14M
    return;
1179
1.14M
  }
1180
1181
  // Start from Level 0, if level 0 qualifies compaction to level 1,
1182
  // we estimate the size of compaction.
1183
  // Then we move on to the next level and see whether it qualifies compaction
1184
  // to the next level. The size of the level is estimated as the actual size
1185
  // on the level plus the input bytes from the previous level if there is any.
1186
  // If it exceeds, take the exceeded bytes as compaction input and add the size
1187
  // of the compaction size to tatal size.
1188
  // We keep doing it to Level 2, 3, etc, until the last level and return the
1189
  // accumulated bytes.
1190
1191
92.0k
  uint64_t bytes_compact_to_next_level = 0;
1192
  // Level 0
1193
92.0k
  bool level0_compact_triggered = false;
1194
92.0k
  if (static_cast<int>(files_[0].size()) >
1195
92.0k
      mutable_cf_options.level0_file_num_compaction_trigger) {
1196
8.88k
    level0_compact_triggered = true;
1197
79.6k
    for (auto* f : files_[0]) {
1198
79.6k
      bytes_compact_to_next_level += f->fd.GetTotalFileSize();
1199
79.6k
    }
1200
8.88k
    estimated_compaction_needed_bytes_ = bytes_compact_to_next_level;
1201
83.1k
  } else {
1202
83.1k
    estimated_compaction_needed_bytes_ = 0;
1203
83.1k
  }
1204
1205
  // Level 1 and up.
1206
542k
  for (int level = base_level(); level <= MaxInputLevel(); 
level++450k
) {
1207
450k
    uint64_t level_size = 0;
1208
600k
    for (auto* f : files_[level]) {
1209
600k
      level_size += f->fd.GetTotalFileSize();
1210
600k
    }
1211
450k
    if (level == base_level() && 
level0_compact_triggered91.8k
) {
1212
      // Add base level size to compaction if level0 compaction triggered.
1213
8.87k
      estimated_compaction_needed_bytes_ += level_size;
1214
8.87k
    }
1215
    // Add size added by previous compaction
1216
450k
    level_size += bytes_compact_to_next_level;
1217
450k
    bytes_compact_to_next_level = 0;
1218
450k
    uint64_t level_target = MaxBytesForLevel(level);
1219
450k
    if (level_size > level_target) {
1220
42.5k
      bytes_compact_to_next_level = level_size - level_target;
1221
      // Simplify to assume the actual compaction fan-out ratio is always
1222
      // mutable_cf_options.max_bytes_for_level_multiplier.
1223
42.5k
      estimated_compaction_needed_bytes_ +=
1224
42.5k
          bytes_compact_to_next_level *
1225
42.5k
          (1 + mutable_cf_options.max_bytes_for_level_multiplier);
1226
42.5k
    }
1227
450k
  }
1228
92.0k
}
1229
1230
void VersionStorageInfo::ComputeCompactionScore(
1231
    const MutableCFOptions& mutable_cf_options,
1232
1.23M
    const CompactionOptionsFIFO& compaction_options_fifo) {
1233
2.93M
  for (int level = 0; level <= MaxInputLevel(); 
level++1.70M
) {
1234
1.70M
    double score;
1235
1.70M
    if (level == 0) {
1236
      // We treat level-0 specially by bounding the number of files
1237
      // instead of number of bytes for two reasons:
1238
      //
1239
      // (1) With larger write-buffer sizes, it is nice not to do too
1240
      // many level-0 compactions.
1241
      //
1242
      // (2) The files in level-0 are merged on every read and
1243
      // therefore we wish to avoid too many files when the individual
1244
      // file size is small (perhaps because of a small write-buffer
1245
      // setting, or very high compression ratios, or lots of
1246
      // overwrites/deletions).
1247
1.23M
      int num_sorted_runs = 0;
1248
1.23M
      uint64_t total_size = 0;
1249
1.23M
      for (auto* f : files_[level]) {
1250
268k
        if (!f->being_compacted) {
1251
190k
          total_size += f->compensated_file_size;
1252
190k
          num_sorted_runs++;
1253
190k
        }
1254
268k
      }
1255
1.23M
      if (compaction_style_ == kCompactionStyleUniversal) {
1256
        // For universal compaction, we use level0 score to indicate
1257
        // compaction score for the whole DB. Adding other levels as if
1258
        // they are L0 files.
1259
1.21M
        for (int i = 1; i < num_levels(); 
i++79.5k
) {
1260
79.5k
          if (!files_[i].empty() && 
!files_[i][0]->being_compacted17.1k
) {
1261
15.2k
            num_sorted_runs++;
1262
15.2k
          }
1263
79.5k
        }
1264
1.13M
      }
1265
1266
1.23M
      if (compaction_style_ == kCompactionStyleFIFO) {
1267
900
        score = static_cast<double>(total_size) /
1268
900
                compaction_options_fifo.max_table_files_size;
1269
1.23M
      } else {
1270
1.23M
        score = static_cast<double>(num_sorted_runs) /
1271
1.23M
                mutable_cf_options.level0_file_num_compaction_trigger;
1272
1.23M
      }
1273
1.23M
    } else {
1274
      // Compute the ratio of current size to size limit.
1275
468k
      uint64_t level_bytes_no_compacting = 0;
1276
600k
      for (auto f : files_[level]) {
1277
600k
        if (!f->being_compacted) {
1278
556k
          level_bytes_no_compacting += f->compensated_file_size;
1279
556k
        }
1280
600k
      }
1281
468k
      score = static_cast<double>(level_bytes_no_compacting) /
1282
468k
              MaxBytesForLevel(level);
1283
468k
    }
1284
1.70M
    compaction_level_[level] = level;
1285
1.70M
    compaction_score_[level] = score;
1286
1.70M
  }
1287
1288
  // sort all the levels based on their score. Higher scores get listed
1289
  // first. Use bubble sort because the number of entries are small.
1290
1.76M
  for (int i = 0; i < num_levels() - 2; 
i++534k
) {
1291
2.53M
    for (int j = i + 1; j < num_levels() - 1; 
j++2.00M
) {
1292
2.00M
      if (compaction_score_[i] < compaction_score_[j]) {
1293
172k
        double score = compaction_score_[i];
1294
172k
        int level = compaction_level_[i];
1295
172k
        compaction_score_[i] = compaction_score_[j];
1296
172k
        compaction_level_[i] = compaction_level_[j];
1297
172k
        compaction_score_[j] = score;
1298
172k
        compaction_level_[j] = level;
1299
172k
      }
1300
2.00M
    }
1301
534k
  }
1302
1.23M
  ComputeFilesMarkedForCompaction();
1303
1.23M
  EstimateCompactionBytesNeeded(mutable_cf_options);
1304
1.23M
}
1305
1306
1.23M
void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
1307
1.23M
  files_marked_for_compaction_.clear();
1308
1.23M
  int last_qualify_level = 0;
1309
1310
  // Do not include files from the last level with data
1311
  // If table properties collector suggests a file on the last level,
1312
  // we should not move it to a new level.
1313
1.61M
  for (int level = num_levels() - 1; level >= 1; 
level--376k
) {
1314
442k
    if (!files_[level].empty()) {
1315
65.6k
      last_qualify_level = level - 1;
1316
65.6k
      break;
1317
65.6k
    }
1318
442k
  }
1319
1320
2.66M
  for (int level = 0; level <= last_qualify_level; 
level++1.43M
) {
1321
1.43M
    for (auto* f : files_[level]) {
1322
696k
      if (!f->being_compacted && 
f->marked_for_compaction590k
) {
1323
38
        files_marked_for_compaction_.emplace_back(level, f);
1324
38
      }
1325
696k
    }
1326
1.43M
  }
1327
1.23M
}
1328
1329
namespace {
1330
1331
// used to sort files by size
1332
struct Fsize {
1333
  size_t index;
1334
  FileMetaData* file;
1335
};
1336
1337
// Compator that is used to sort files based on their size
1338
// In normal mode: descending size
1339
1.71M
bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
1340
1.71M
  return (first.file->compensated_file_size >
1341
1.71M
      second.file->compensated_file_size);
1342
1.71M
}
1343
} // anonymous namespace
1344
1345
967k
void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
1346
967k
  auto* level_files = &files_[level];
1347
  // Must not overlap
1348
967k
#ifndef NDEBUG
1349
967k
  if (level > 0 && 
!level_files->empty()733k
&&
1350
967k
      internal_comparator_->Compare(
1351
628k
          (*level_files)[level_files->size() - 1]->largest.key, f->smallest.key) >= 0) {
1352
0
    auto* f2 = (*level_files)[level_files->size() - 1];
1353
0
    if (info_log != nullptr) {
1354
0
      RERROR(info_log, "Adding new file %" PRIu64
1355
0
                      " range (%s, %s) to level %d but overlapping "
1356
0
                      "with existing file %" PRIu64 " %s %s",
1357
0
            f->fd.GetNumber(),
1358
0
            f->smallest.key.DebugString(true).c_str(),
1359
0
            f->largest.key.DebugString(true).c_str(),
1360
0
            level,
1361
0
            f2->fd.GetNumber(),
1362
0
            f2->smallest.key.DebugString(true).c_str(),
1363
0
            f2->largest.key.DebugString(true).c_str());
1364
0
      LogFlush(info_log);
1365
0
    }
1366
0
    assert(false);
1367
0
  }
1368
0
#endif
1369
0
  f->refs++;
1370
967k
  level_files->push_back(f);
1371
967k
}
1372
1373
// Version::PrepareApply() need to be called before calling the function, or
1374
// following functions called:
1375
// 1. UpdateNumNonEmptyLevels();
1376
// 2. CalculateBaseBytes();
1377
// 3. UpdateFilesByCompactionPri();
1378
// 4. GenerateFileIndexer();
1379
// 5. GenerateLevelFilesBrief();
1380
// 6. GenerateLevel0NonOverlapping();
1381
1.21M
void VersionStorageInfo::SetFinalized() {
1382
1.21M
  finalized_ = true;
1383
1.21M
#ifndef NDEBUG
1384
1.21M
  if (compaction_style_ != kCompactionStyleLevel) {
1385
    // Not level based compaction.
1386
1.14M
    return;
1387
1.14M
  }
1388
73.7k
  assert(base_level_ < 0 || num_levels() == 1 ||
1389
73.7k
         (base_level_ >= 1 && base_level_ < num_levels()));
1390
  // Verify all levels newer than base_level are empty except L0
1391
84.0k
  for (int level = 1; level < base_level(); 
level++10.3k
) {
1392
10.3k
    assert(NumLevelBytes(level) == 0);
1393
10.3k
  }
1394
73.7k
  uint64_t max_bytes_prev_level = 0;
1395
433k
  for (int level = base_level(); level < num_levels() - 1; 
level++360k
) {
1396
360k
    if (LevelFiles(level).size() == 0) {
1397
285k
      continue;
1398
285k
    }
1399
74.9k
    assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
1400
0
    max_bytes_prev_level = MaxBytesForLevel(level);
1401
74.9k
  }
1402
73.7k
  int num_empty_non_l0_level = 0;
1403
591k
  for (int level = 0; level < num_levels(); 
level++517k
) {
1404
517k
    assert(LevelFiles(level).size() == 0 ||
1405
517k
           LevelFiles(level).size() == LevelFilesBrief(level).num_files);
1406
517k
    if (level > 0 && 
NumLevelBytes(level) > 0444k
) {
1407
87.8k
      num_empty_non_l0_level++;
1408
87.8k
    }
1409
517k
    if (LevelFiles(level).size() > 0) {
1410
117k
      assert(level < num_non_empty_levels());
1411
117k
    }
1412
517k
  }
1413
73.7k
  assert(compaction_level_.size() > 0);
1414
0
  assert(compaction_level_.size() == compaction_score_.size());
1415
73.7k
#endif
1416
73.7k
}
1417
1418
773k
void VersionStorageInfo::UpdateNumNonEmptyLevels() {
1419
773k
  num_non_empty_levels_ = num_levels_;
1420
1.71M
  for (int i = num_levels_ - 1; i >= 0; 
i--940k
) {
1421
1.01M
    if (files_[i].size() != 0) {
1422
75.7k
      return;
1423
940k
    } else {
1424
940k
      num_non_empty_levels_ = i;
1425
940k
    }
1426
1.01M
  }
1427
773k
}
1428
1429
namespace {
1430
// Sort `temp` based on ratio of overlapping size over file size
1431
void SortFileByOverlappingRatio(
1432
    const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
1433
    const std::vector<FileMetaData*>& next_level_files,
1434
111
    std::vector<Fsize>* temp) {
1435
111
  std::unordered_map<uint64_t, uint64_t> file_to_order;
1436
111
  auto next_level_it = next_level_files.begin();
1437
1438
111
  for (auto& file : files) {
1439
64
    uint64_t overlapping_bytes = 0;
1440
    // Skip files in next level that is smaller than current file
1441
66
    while (next_level_it != next_level_files.end() &&
1442
66
           
icmp.Compare((*next_level_it)->largest.key, file->smallest.key) < 016
) {
1443
2
      next_level_it++;
1444
2
    }
1445
1446
74
    while (next_level_it != next_level_files.end() &&
1447
74
           
icmp.Compare((*next_level_it)->smallest.key, file->largest.key) < 023
) {
1448
18
      overlapping_bytes += (*next_level_it)->fd.total_file_size;
1449
1450
18
      if (icmp.Compare((*next_level_it)->largest.key, file->largest.key) > 0) {
1451
        // next level file cross large boundary of current file.
1452
8
        break;
1453
8
      }
1454
10
      next_level_it++;
1455
10
    }
1456
1457
64
    assert(file->fd.total_file_size != 0);
1458
0
    file_to_order[file->fd.GetNumber()] =
1459
64
        overlapping_bytes * 1024u / file->fd.total_file_size;
1460
64
  }
1461
1462
111
  std::sort(temp->begin(), temp->end(),
1463
111
            [&](const Fsize& f1, const Fsize& f2) -> bool {
1464
34
              return file_to_order[f1.file->fd.GetNumber()] <
1465
34
                     file_to_order[f2.file->fd.GetNumber()];
1466
34
            });
1467
111
}
1468
}  // namespace
1469
1470
void VersionStorageInfo::UpdateFilesByCompactionPri(
1471
773k
    const MutableCFOptions& mutable_cf_options) {
1472
773k
  if (compaction_style_ == kCompactionStyleFIFO ||
1473
773k
      
compaction_style_ == kCompactionStyleUniversal773k
) {
1474
    // don't need this
1475
714k
    return;
1476
714k
  }
1477
  // No need to sort the highest level because it is never compacted.
1478
410k
  
for (int level = 0; 59.3k
level < num_levels() - 1;
level++350k
) {
1479
350k
    const std::vector<FileMetaData*>& files = files_[level];
1480
350k
    auto& files_by_compaction_pri = files_by_compaction_pri_[level];
1481
350k
    assert(files_by_compaction_pri.size() == 0);
1482
1483
    // populate a temp vector for sorting based on size
1484
0
    std::vector<Fsize> temp(files.size());
1485
832k
    for (size_t i = 0; i < files.size(); 
i++481k
) {
1486
481k
      temp[i].index = i;
1487
481k
      temp[i].file = files[i];
1488
481k
    }
1489
1490
    // sort the top number_of_files_to_sort_ based on file size
1491
350k
    size_t num = VersionStorageInfo::kNumberFilesToSort;
1492
350k
    if (num > temp.size()) {
1493
350k
      num = temp.size();
1494
350k
    }
1495
350k
    switch (mutable_cf_options.compaction_pri) {
1496
350k
      case kByCompensatedSize:
1497
350k
        std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
1498
350k
                          CompareCompensatedSizeDescending);
1499
350k
        break;
1500
96
      case kOldestLargestSeqFirst:
1501
96
        std::sort(temp.begin(), temp.end(),
1502
96
                  [](const Fsize& f1, const Fsize& f2) -> bool {
1503
17
                    return f1.file->largest.seqno < f2.file->largest.seqno;
1504
17
                  });
1505
96
        break;
1506
96
      case kOldestSmallestSeqFirst:
1507
96
        std::sort(temp.begin(), temp.end(),
1508
96
                  [](const Fsize& f1, const Fsize& f2) -> bool {
1509
17
                    return f1.file->smallest.seqno < f2.file->smallest.seqno;
1510
17
                  });
1511
96
        break;
1512
111
      case kMinOverlappingRatio:
1513
111
        SortFileByOverlappingRatio(*internal_comparator_, files_[level],
1514
111
                                   files_[level + 1], &temp);
1515
111
        break;
1516
0
      default:
1517
0
        assert(false);
1518
350k
    }
1519
350k
    assert(temp.size() == files.size());
1520
1521
    // initialize files_by_compaction_pri_
1522
832k
    for (size_t i = 0; i < temp.size(); 
i++481k
) {
1523
481k
      files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
1524
481k
    }
1525
350k
    next_file_to_compact_by_size_[level] = 0;
1526
350k
    assert(files_[level].size() == files_by_compaction_pri_[level].size());
1527
350k
  }
1528
59.3k
}
1529
1530
773k
void VersionStorageInfo::GenerateLevel0NonOverlapping() {
1531
773k
  assert(!finalized_);
1532
0
  level0_non_overlapping_ = true;
1533
773k
  if (level_files_brief_.size() == 0) {
1534
696k
    return;
1535
696k
  }
1536
1537
  // A copy of L0 files sorted by smallest key
1538
76.2k
  std::vector<FdWithBoundaries> level0_sorted_file(
1539
76.2k
      level_files_brief_[0].files,
1540
76.2k
      level_files_brief_[0].files + level_files_brief_[0].num_files);
1541
76.2k
  sort(level0_sorted_file.begin(), level0_sorted_file.end(),
1542
508k
       [this](const FdWithBoundaries& f1, const FdWithBoundaries& f2)->bool {
1543
508k
    return (internal_comparator_->Compare(f1.smallest.key, f2.smallest.key) < 0);
1544
508k
  });
1545
1546
152k
  for (size_t i = 1; i < level0_sorted_file.size(); 
++i76.7k
) {
1547
102k
    auto& f = level0_sorted_file[i];
1548
102k
    auto& prev = level0_sorted_file[i - 1];
1549
102k
    if (internal_comparator_->Compare(prev.largest.key, f.smallest.key) >= 0) {
1550
25.7k
      level0_non_overlapping_ = false;
1551
25.7k
      break;
1552
25.7k
    }
1553
102k
  }
1554
76.2k
}
1555
1556
3.71M
void Version::Ref() {
1557
3.71M
  ++refs_;
1558
3.71M
}
1559
1560
3.59M
bool Version::Unref() {
1561
3.59M
  assert(refs_ >= 1);
1562
0
  --refs_;
1563
3.59M
  if (refs_ == 0) {
1564
1.57M
    delete this;
1565
1.57M
    return true;
1566
1.57M
  }
1567
2.01M
  return false;
1568
3.59M
}
1569
1570
bool VersionStorageInfo::OverlapInLevel(int level,
1571
                                        const Slice* smallest_user_key,
1572
24.7k
                                        const Slice* largest_user_key) {
1573
24.7k
  if (level >= num_non_empty_levels_) {
1574
    // empty level, no overlap
1575
0
    return false;
1576
0
  }
1577
24.7k
  return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
1578
24.7k
                               level_files_brief_[level], smallest_user_key,
1579
24.7k
                               largest_user_key);
1580
24.7k
}
1581
1582
// Store in "*inputs" all files in "level" that overlap [begin,end]
1583
// If hint_index is specified, then it points to a file in the
1584
// overlapping range.
1585
// The file_index returns a pointer to any file in an overlapping range.
1586
void VersionStorageInfo::GetOverlappingInputs(
1587
    int level, const InternalKey* begin, const InternalKey* end,
1588
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
1589
85.3k
    bool expand_range) const {
1590
85.3k
  if (level >= num_non_empty_levels_) {
1591
    // this level is empty, no overlapping inputs
1592
8.61k
    return;
1593
8.61k
  }
1594
1595
76.7k
  inputs->clear();
1596
76.7k
  Slice user_begin, user_end;
1597
76.7k
  if (begin != nullptr) {
1598
72.1k
    user_begin = begin->user_key();
1599
72.1k
  }
1600
76.7k
  if (end != nullptr) {
1601
72.0k
    user_end = end->user_key();
1602
72.0k
  }
1603
76.7k
  if (file_index) {
1604
48.8k
    *file_index = -1;
1605
48.8k
  }
1606
76.7k
  const Comparator* user_cmp = user_comparator_;
1607
76.7k
  if (begin != nullptr && 
end != nullptr72.1k
&&
level > 072.0k
) {
1608
61.6k
    GetOverlappingInputsBinarySearch(level, user_begin, user_end, inputs,
1609
61.6k
      hint_index, file_index);
1610
61.6k
    return;
1611
61.6k
  }
1612
70.2k
  
for (size_t i = 0; 15.1k
i < level_files_brief_[level].num_files; ) {
1613
55.1k
    auto* f = &(level_files_brief_[level].files[i++]);
1614
55.1k
    const Slice file_start = f->smallest.user_key();
1615
55.1k
    const Slice file_limit = f->largest.user_key();
1616
55.1k
    if (begin != nullptr && 
user_cmp->Compare(file_limit, user_begin) < 048.9k
) {
1617
      // "f" is completely before specified range; skip it
1618
45.5k
    } else if (end != nullptr && 
user_cmp->Compare(file_start, user_end) > 037.1k
) {
1619
      // "f" is completely after specified range; skip it
1620
38.8k
    } else {
1621
38.8k
      inputs->push_back(files_[level][i-1]);
1622
38.8k
      if (level == 0 && 
expand_range34.0k
) {
1623
        // Level-0 files may overlap each other.  So check if the newly
1624
        // added file has expanded the range.  If so, restart search.
1625
34.0k
        if (begin != nullptr && 
user_cmp->Compare(file_start, user_begin) < 030.3k
) {
1626
991
          user_begin = file_start;
1627
991
          inputs->clear();
1628
991
          i = 0;
1629
33.0k
        } else if (end != nullptr
1630
33.0k
            && 
user_cmp->Compare(file_limit, user_end) > 029.4k
) {
1631
1.79k
          user_end = file_limit;
1632
1.79k
          inputs->clear();
1633
1.79k
          i = 0;
1634
1.79k
        }
1635
34.0k
      } else 
if (4.77k
file_index4.77k
) {
1636
0
        *file_index = static_cast<int>(i) - 1;
1637
0
      }
1638
38.8k
    }
1639
55.1k
  }
1640
15.1k
}
1641
1642
// Store in "*inputs" all files in "level" that overlap [begin,end]
1643
// Employ binary search to find at least one file that overlaps the
1644
// specified range. From that file, iterate backwards and
1645
// forwards to find all overlapping files.
1646
void VersionStorageInfo::GetOverlappingInputsBinarySearch(
1647
    int level, const Slice& user_begin, const Slice& user_end,
1648
61.6k
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
1649
61.6k
  assert(level > 0);
1650
0
  int min = 0;
1651
61.6k
  int mid = 0;
1652
61.6k
  int max = static_cast<int>(files_[level].size()) - 1;
1653
61.6k
  bool foundOverlap = false;
1654
61.6k
  const Comparator* user_cmp = user_comparator_;
1655
1656
  // if the caller already knows the index of a file that has overlap,
1657
  // then we can skip the binary search.
1658
61.6k
  if (hint_index != -1) {
1659
9.36k
    mid = hint_index;
1660
9.36k
    foundOverlap = true;
1661
9.36k
  }
1662
1663
140k
  while (!foundOverlap && 
min <= max131k
) {
1664
99.7k
    mid = (min + max)/2;
1665
99.7k
    FdWithBoundaries* f = level_files_brief_[level].files + mid;
1666
99.7k
    const Slice file_start = f->smallest.user_key();
1667
99.7k
    const Slice file_limit = f->largest.user_key();
1668
99.7k
    if (user_cmp->Compare(file_limit, user_begin) < 0) {
1669
63.4k
      min = mid + 1;
1670
63.4k
    } else 
if (36.3k
user_cmp->Compare(user_end, file_start) < 036.3k
) {
1671
15.3k
      max = mid - 1;
1672
20.9k
    } else {
1673
20.9k
      foundOverlap = true;
1674
20.9k
      break;
1675
20.9k
    }
1676
99.7k
  }
1677
1678
  // If there were no overlapping files, return immediately.
1679
61.6k
  if (!foundOverlap) {
1680
31.3k
    return;
1681
31.3k
  }
1682
  // returns the index where an overlap is found
1683
30.3k
  if (file_index) {
1684
24.9k
    *file_index = mid;
1685
24.9k
  }
1686
30.3k
  ExtendOverlappingInputs(level, user_begin, user_end, inputs, mid);
1687
30.3k
}
1688
1689
// Store in "*inputs" all files in "level" that overlap [begin,end]
1690
// The midIndex specifies the index of at least one file that
1691
// overlaps the specified range. From that file, iterate backward
1692
// and forward to find all overlapping files.
1693
// Use FileLevel in searching, make it faster
1694
void VersionStorageInfo::ExtendOverlappingInputs(
1695
    int level, const Slice& user_begin, const Slice& user_end,
1696
30.3k
    std::vector<FileMetaData*>* inputs, unsigned int midIndex) const {
1697
30.3k
  const Comparator* user_cmp = user_comparator_;
1698
30.3k
  const FdWithBoundaries* files = level_files_brief_[level].files;
1699
30.3k
#ifndef NDEBUG
1700
30.3k
  {
1701
    // assert that the file at midIndex overlaps with the range
1702
30.3k
    assert(midIndex < level_files_brief_[level].num_files);
1703
0
    const auto* f = &files[midIndex];
1704
30.3k
    const Slice fstart = f->smallest.user_key();
1705
30.3k
    const Slice flimit = f->largest.user_key();
1706
30.3k
    if (user_cmp->Compare(fstart, user_begin) >= 0) {
1707
24.5k
      assert(user_cmp->Compare(fstart, user_end) <= 0);
1708
24.5k
    } else {
1709
5.80k
      assert(user_cmp->Compare(flimit, user_begin) >= 0);
1710
5.80k
    }
1711
30.3k
  }
1712
0
#endif
1713
0
  int startIndex = midIndex + 1;
1714
30.3k
  int endIndex = midIndex;
1715
30.3k
  int count __attribute__((unused)) = 0;
1716
1717
  // check backwards from 'mid' to lower indices
1718
77.9k
  for (int i = midIndex; i >= 0 ; 
i--47.6k
) {
1719
59.4k
    const auto* f = &files[i];
1720
59.4k
    const Slice file_limit = f->largest.user_key();
1721
59.4k
    if (user_cmp->Compare(file_limit, user_begin) >= 0) {
1722
47.6k
      startIndex = i;
1723
47.6k
      assert((count++, true));
1724
47.6k
    } else {
1725
11.7k
      break;
1726
11.7k
    }
1727
59.4k
  }
1728
  // check forward from 'mid+1' to higher indices
1729
30.3k
  for (unsigned int i = midIndex+1;
1730
49.9k
       i < level_files_brief_[level].num_files; 
i++19.5k
) {
1731
30.7k
    const auto* f = &files[i];
1732
30.7k
    const Slice file_start = f->smallest.user_key();
1733
30.7k
    if (user_cmp->Compare(file_start, user_end) <= 0) {
1734
19.5k
      assert((count++, true));
1735
0
      endIndex = i;
1736
19.5k
    } else {
1737
11.2k
      break;
1738
11.2k
    }
1739
30.7k
  }
1740
30.3k
  assert(count == endIndex - startIndex + 1);
1741
1742
  // insert overlapping files into vector
1743
97.5k
  for (int i = startIndex; i <= endIndex; 
i++67.2k
) {
1744
67.2k
    FileMetaData* f = files_[level][i];
1745
67.2k
    inputs->push_back(f);
1746
67.2k
  }
1747
30.3k
}
1748
1749
// Returns true iff the first or last file in inputs contains
1750
// an overlapping user key to the file "just outside" of it (i.e.
1751
// just after the last file, or just before the first file)
1752
// REQUIRES: "*inputs" is a sorted list of non-overlapping files
1753
bool VersionStorageInfo::HasOverlappingUserKey(
1754
1.65k
    const std::vector<FileMetaData*>* inputs, int level) {
1755
1756
  // If inputs empty, there is no overlap.
1757
  // If level == 0, it is assumed that all needed files were already included.
1758
1.65k
  if (inputs->empty() || level == 0) {
1759
1.10k
    return false;
1760
1.10k
  }
1761
1762
554
  const Comparator* user_cmp = user_comparator_;
1763
554
  const rocksdb::LevelFilesBrief& file_level = level_files_brief_[level];
1764
554
  const FdWithBoundaries* files = level_files_brief_[level].files;
1765
554
  const size_t kNumFiles = file_level.num_files;
1766
1767
  // Check the last file in inputs against the file after it
1768
554
  size_t last_file = FindFile(*internal_comparator_, file_level,
1769
554
                              inputs->back()->largest.key.Encode());
1770
554
  assert(last_file < kNumFiles);  // File should exist!
1771
554
  if (last_file < kNumFiles-1) {                    // If not the last file
1772
478
    const Slice last_key_in_input = files[last_file].largest.user_key();
1773
478
    const Slice first_key_after = files[last_file+1].smallest.user_key();
1774
478
    if (user_cmp->Equal(last_key_in_input, first_key_after)) {
1775
      // The last user key in input overlaps with the next file's first key
1776
0
      return true;
1777
0
    }
1778
478
  }
1779
1780
  // Check the first file in inputs against the file just before it
1781
554
  size_t first_file = FindFile(*internal_comparator_, file_level,
1782
554
                               inputs->front()->smallest.key.Encode());
1783
554
  assert(first_file <= last_file);   // File should exist!
1784
554
  if (first_file > 0) {                                 // If not first file
1785
350
    const Slice& first_key_in_input = files[first_file].smallest.user_key();
1786
350
    const Slice& last_key_before = files[first_file-1].largest.user_key();
1787
350
    if (user_cmp->Equal(first_key_in_input, last_key_before)) {
1788
      // The first user key in input overlaps with the previous file's last key
1789
0
      return true;
1790
0
    }
1791
350
  }
1792
1793
554
  return false;
1794
554
}
1795
1796
454k
uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
1797
454k
  assert(level >= 0);
1798
0
  assert(level < num_levels());
1799
0
  return TotalFileSize(files_[level]);
1800
454k
}
1801
1802
const char* VersionStorageInfo::LevelSummary(
1803
42.2k
    LevelSummaryStorage* scratch) const {
1804
42.2k
  int len = 0;
1805
42.2k
  if (compaction_style_ == kCompactionStyleLevel && 
num_levels() > 118.9k
) {
1806
18.9k
    assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
1807
0
    len = snprintf(scratch->buffer, sizeof(scratch->buffer),
1808
18.9k
                   "base level %d max bytes base %" PRIu64 " ", base_level_,
1809
18.9k
                   level_max_bytes_[base_level_]);
1810
18.9k
  }
1811
0
  len +=
1812
42.2k
      snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
1813
273k
  for (int i = 0; i < num_levels(); 
i++230k
) {
1814
230k
    int sz = sizeof(scratch->buffer) - len;
1815
230k
    int ret = snprintf(scratch->buffer + len, sz, "%zd ", files_[i].size());
1816
230k
    if (ret < 0 || ret >= sz) 
break0
;
1817
230k
    len += ret;
1818
230k
  }
1819
42.2k
  if (len > 0) {
1820
    // overwrite the last space
1821
42.2k
    --len;
1822
42.2k
  }
1823
42.2k
  len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
1824
42.2k
                  "] max score %.2f", compaction_score_[0]);
1825
1826
42.2k
  if (!files_marked_for_compaction_.empty()) {
1827
7
    snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
1828
7
             " (%" ROCKSDB_PRIszt " files need compaction)",
1829
7
             files_marked_for_compaction_.size());
1830
7
  }
1831
1832
42.2k
  return scratch->buffer;
1833
42.2k
}
1834
1835
const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
1836
0
                                                 int level) const {
1837
0
  int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
1838
0
  for (const auto& f : files_[level]) {
1839
0
    int sz = sizeof(scratch->buffer) - len;
1840
0
    char sztxt[16];
1841
0
    AppendHumanBytes(f->fd.GetTotalFileSize(), sztxt, sizeof(sztxt));
1842
0
    int ret = snprintf(scratch->buffer + len, sz,
1843
0
                       "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
1844
0
                       f->fd.GetNumber(), f->smallest.seqno, sztxt,
1845
0
                       static_cast<int>(f->being_compacted));
1846
0
    if (ret < 0 || ret >= sz)
1847
0
      break;
1848
0
    len += ret;
1849
0
  }
1850
  // overwrite the last space (only if files_[level].size() is non-zero)
1851
0
  if (files_[level].size() && len > 0) {
1852
0
    --len;
1853
0
  }
1854
0
  snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
1855
0
  return scratch->buffer;
1856
0
}
1857
1858
15
int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
1859
15
  uint64_t result = 0;
1860
15
  std::vector<FileMetaData*> overlaps;
1861
81
  for (int level = 1; level < num_levels() - 1; 
level++66
) {
1862
214
    for (const auto& f : files_[level]) {
1863
214
      GetOverlappingInputs(level + 1, &f->smallest.key, &f->largest.key, &overlaps);
1864
214
      const uint64_t sum = TotalFileSize(overlaps);
1865
214
      if (sum > result) {
1866
6
        result = sum;
1867
6
      }
1868
214
    }
1869
66
  }
1870
15
  return result;
1871
15
}
1872
1873
1.06M
uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
1874
  // Note: the result for level zero is not really used since we set
1875
  // the level-0 compaction threshold based on number of files.
1876
1.06M
  assert(level >= 0);
1877
0
  assert(level < static_cast<int>(level_max_bytes_.size()));
1878
0
  return level_max_bytes_[level];
1879
1.06M
}
1880
1881
void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
1882
1.21M
                                            const MutableCFOptions& options) {
1883
  // Special logic to set number of sorted runs.
1884
  // It is to match the previous behavior when all files are in L0.
1885
1.21M
  int num_l0_count = 0;
1886
1.21M
  if (options.MaxFileSizeForCompaction() == std::numeric_limits<uint64_t>::max()) {
1887
1.21M
    num_l0_count = static_cast<int>(files_[0].size());
1888
1.21M
  } else {
1889
556
    for (const auto& file : files_[0]) {
1890
281
      if (file->fd.GetTotalFileSize() <= options.MaxFileSizeForCompaction()) {
1891
225
        ++num_l0_count;
1892
225
      }
1893
281
    }
1894
556
  }
1895
1.21M
  if (compaction_style_ == kCompactionStyleUniversal) {
1896
    // For universal compaction, we use level0 score to indicate
1897
    // compaction score for the whole DB. Adding other levels as if
1898
    // they are L0 files.
1899
1.21M
    for (int i = 1; i < num_levels(); 
i++79.1k
) {
1900
79.1k
      if (!files_[i].empty()) {
1901
17.1k
        num_l0_count++;
1902
17.1k
      }
1903
79.1k
    }
1904
1.13M
  }
1905
1.21M
  set_l0_delay_trigger_count(num_l0_count);
1906
1907
1.21M
  level_max_bytes_.resize(ioptions.num_levels);
1908
1.21M
  if (!ioptions.level_compaction_dynamic_level_bytes) {
1909
1.21M
    base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 
169.3k
:
-11.14M
;
1910
1911
    // Calculate for static bytes base case
1912
2.91M
    for (int i = 0; i < ioptions.num_levels; 
++i1.70M
) {
1913
1.70M
      if (i == 0 && 
ioptions.compaction_style == kCompactionStyleUniversal1.21M
) {
1914
1.13M
        level_max_bytes_[i] = options.max_bytes_for_level_base;
1915
1.13M
      } else 
if (567k
i > 1567k
) {
1916
413k
        level_max_bytes_[i] = MultiplyCheckOverflow(
1917
413k
            MultiplyCheckOverflow(level_max_bytes_[i - 1],
1918
413k
                                  options.max_bytes_for_level_multiplier),
1919
413k
            options.MaxBytesMultiplerAdditional(i - 1));
1920
413k
      } else {
1921
154k
        level_max_bytes_[i] = options.max_bytes_for_level_base;
1922
154k
      }
1923
1.70M
    }
1924
1.21M
  } else {
1925
4.11k
    uint64_t max_level_size = 0;
1926
1927
4.11k
    int first_non_empty_level = -1;
1928
    // Find size of non-L0 level of most data.
1929
    // Cannot use the size of the last level because it can be empty or less
1930
    // than previous levels after compaction.
1931
33.3k
    for (int i = 1; i < num_levels_; 
i++29.2k
) {
1932
29.2k
      uint64_t total_size = 0;
1933
240k
      for (const auto& f : files_[i]) {
1934
240k
        total_size += f->fd.GetTotalFileSize();
1935
240k
      }
1936
29.2k
      if (total_size > 0 && 
first_non_empty_level == -116.4k
) {
1937
4.54k
        first_non_empty_level = i;
1938
4.54k
      }
1939
29.2k
      if (total_size > max_level_size) {
1940
16.0k
        max_level_size = total_size;
1941
16.0k
      }
1942
29.2k
    }
1943
1944
    // Prefill every level's max bytes to disallow compaction from there.
1945
38.0k
    for (int i = 0; i < num_levels_; 
i++33.8k
) {
1946
33.8k
      level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
1947
33.8k
    }
1948
1949
4.11k
    if (max_level_size == 0) {
1950
      // No data for L1 and up. L0 compacts to last level directly.
1951
      // No compaction from L1+ needs to be scheduled.
1952
94
      base_level_ = num_levels_ - 1;
1953
4.02k
    } else {
1954
4.02k
      uint64_t base_bytes_max = options.max_bytes_for_level_base;
1955
4.02k
      uint64_t base_bytes_min =
1956
4.02k
          base_bytes_max / options.max_bytes_for_level_multiplier;
1957
1958
      // Try whether we can make last level's target size to be max_level_size
1959
4.02k
      uint64_t cur_level_size = max_level_size;
1960
16.3k
      for (int i = num_levels_ - 2; i >= first_non_empty_level; 
i--12.2k
) {
1961
        // Round up after dividing
1962
12.2k
        cur_level_size /= options.max_bytes_for_level_multiplier;
1963
12.2k
      }
1964
1965
      // Calculate base level and its size.
1966
4.02k
      uint64_t base_level_size;
1967
4.02k
      if (cur_level_size <= base_bytes_min) {
1968
        // Case 1. If we make target size of last level to be max_level_size,
1969
        // target size of the first non-empty level would be smaller than
1970
        // base_bytes_min. We set it be base_bytes_min.
1971
170
        base_level_size = base_bytes_min + 1U;
1972
170
        base_level_ = first_non_empty_level;
1973
170
        RWARN(ioptions.info_log,
1974
170
            "More existing levels in DB than needed. "
1975
170
                "max_bytes_for_level_multiplier may not be guaranteed.");
1976
3.85k
      } else {
1977
        // Find base level (where L0 data is compacted to).
1978
3.85k
        base_level_ = first_non_empty_level;
1979
5.80k
        while (base_level_ > 1 && 
cur_level_size > base_bytes_max5.03k
) {
1980
1.95k
          --base_level_;
1981
1.95k
          cur_level_size =
1982
1.95k
              cur_level_size / options.max_bytes_for_level_multiplier;
1983
1.95k
        }
1984
3.85k
        if (cur_level_size > base_bytes_max) {
1985
          // Even L1 will be too large
1986
1
          assert(base_level_ == 1);
1987
0
          base_level_size = base_bytes_max;
1988
3.84k
        } else {
1989
3.84k
          base_level_size = cur_level_size;
1990
3.84k
        }
1991
3.85k
      }
1992
1993
0
      uint64_t level_size = base_level_size;
1994
22.8k
      for (int i = base_level_; i < num_levels_; 
i++18.7k
) {
1995
18.7k
        if (i > base_level_) {
1996
14.2k
          level_size = MultiplyCheckOverflow(
1997
14.2k
              level_size, options.max_bytes_for_level_multiplier);
1998
14.2k
        }
1999
18.7k
        level_max_bytes_[i] = level_size;
2000
18.7k
      }
2001
4.02k
    }
2002
4.11k
  }
2003
1.21M
}
2004
2005
3
uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
2006
  // Estimate the live data size by adding up the size of the last level for all
2007
  // key ranges. Note: Estimate depends on the ordering of files in level 0
2008
  // because files in level 0 can be overlapping.
2009
3
  uint64_t size = 0;
2010
2011
21
  auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
2012
21
    return internal_comparator_->Compare(*x, *y) < 0;
2013
21
  };
2014
  // (Ordered) map of largest keys in non-overlapping files
2015
3
  std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
2016
2017
22
  for (int l = num_levels_ - 1; l >= 0; 
l--19
) {
2018
19
    bool found_end = false;
2019
19
    for (auto file : files_[l]) {
2020
      // Find the first file where the largest key is larger than the smallest
2021
      // key of the current file. If this file does not overlap with the
2022
      // current file, none of the files in the map does. If there is
2023
      // no potential overlap, we can safely insert the rest of this level
2024
      // (if the level is not 0) into the map without checking again because
2025
      // the elements in the level are sorted and non-overlapping.
2026
14
      auto lb = (found_end && 
l != 01
) ?
2027
14
        
ranges.end()0
: ranges.lower_bound(&file->smallest.key);
2028
14
      found_end = (lb == ranges.end());
2029
14
      if (found_end || internal_comparator_->Compare(
2030
11
            file->largest.key, (*lb).second->smallest.key) < 0) {
2031
5
          ranges.emplace_hint(lb, &file->largest.key, file);
2032
5
          size += file->fd.total_file_size;
2033
5
      }
2034
14
    }
2035
19
  }
2036
3
  return size;
2037
3
}
2038
2039
2040
2.16M
void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
2041
7.28M
  for (int level = 0; level < storage_info_.num_levels(); 
level++5.12M
) {
2042
5.12M
    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
2043
5.12M
    for (const auto& file : files) {
2044
2.52M
      live->push_back(file->fd);
2045
2.52M
    }
2046
5.12M
  }
2047
2.16M
}
2048
2049
78
std::string Version::DebugString(bool hex) const {
2050
78
  std::string r;
2051
546
  for (int level = 0; level < storage_info_.num_levels_; 
level++468
) {
2052
    // E.g.,
2053
    //   --- level 1 ---
2054
    //   17:123['a' .. 'd']
2055
    //   20:43['e' .. 'g']
2056
468
    r.append("--- level ");
2057
468
    AppendNumberTo(&r, level);
2058
468
    r.append(" --- version# ");
2059
468
    AppendNumberTo(&r, version_number_);
2060
468
    r.append(" ---\n");
2061
468
    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
2062
668
    for (size_t i = 0; i < files.size(); 
i++200
) {
2063
200
      r.append(files[i]->ToString());
2064
200
    }
2065
468
  }
2066
78
  return r;
2067
78
}
2068
2069
144
Result<std::string> Version::GetMiddleKey() {
2070
  // Largest files are at lowest level.
2071
144
  const auto level = storage_info_.num_levels_ - 1;
2072
144
  const FileMetaData* largest_sst_meta = nullptr;
2073
474
  for (const auto* file : storage_info_.files_[level]) {
2074
474
    if (!largest_sst_meta ||
2075
474
        
file->fd.GetTotalFileSize() > largest_sst_meta->fd.GetTotalFileSize()331
) {
2076
351
      largest_sst_meta = file;
2077
351
    }
2078
474
  }
2079
144
  if (!largest_sst_meta) {
2080
1
    return STATUS(Incomplete, "No SST files.");
2081
1
  }
2082
2083
143
  const auto trwh = VERIFY_RESULT(table_cache_->GetTableReader(
2084
143
      vset_->env_options_, cfd_->internal_comparator(), largest_sst_meta->fd, kDefaultQueryId,
2085
143
      /* no_io =*/ false, cfd_->internal_stats()->GetFileReadHist(level),
2086
143
      IsFilterSkipped(level, /* is_file_last_in_level =*/ true)));
2087
0
  return trwh.table_reader->GetMiddleKey();
2088
143
}
2089
2090
// this is used to batch writes to the manifest file
2091
struct VersionSet::ManifestWriter {
2092
  Status status;
2093
  bool done;
2094
  InstrumentedCondVar cv;
2095
  ColumnFamilyData* cfd;
2096
  VersionEdit* edit;
2097
2098
  explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
2099
                          VersionEdit* e)
2100
335k
      : done(false), cv(mu), cfd(_cfd), edit(e) {}
2101
};
2102
2103
constexpr uint64_t VersionSet::kInitialNextFileNumber;
2104
2105
VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options,
2106
                       const EnvOptions& storage_options, Cache* table_cache,
2107
                       WriteBuffer* write_buffer,
2108
                       WriteController* write_controller)
2109
    : column_family_set_(new ColumnFamilySet(
2110
          dbname, db_options, storage_options, table_cache,
2111
          write_buffer, write_controller)),
2112
      env_(db_options->env),
2113
      dbname_(dbname),
2114
      db_options_(db_options),
2115
      env_options_(storage_options),
2116
437k
      env_options_compactions_(env_options_) {}
2117
2118
397k
VersionSet::~VersionSet() {
2119
  // we need to delete column_family_set_ because its destructor depends on
2120
  // VersionSet
2121
397k
  column_family_set_.reset();
2122
397k
  for (auto file : obsolete_files_) {
2123
36.4k
    delete file;
2124
36.4k
  }
2125
397k
  obsolete_files_.clear();
2126
397k
}
2127
2128
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
2129
1.21M
                               Version* v) {
2130
  // compute new compaction score
2131
1.21M
  v->storage_info()->ComputeCompactionScore(
2132
1.21M
      *column_family_data->GetLatestMutableCFOptions(),
2133
1.21M
      column_family_data->ioptions()->compaction_options_fifo);
2134
2135
  // Mark v finalized
2136
1.21M
  v->storage_info_.SetFinalized();
2137
2138
  // Make "v" current
2139
1.21M
  assert(v->refs_ == 0);
2140
0
  Version* current = column_family_data->current();
2141
1.21M
  assert(v != current);
2142
1.21M
  if (current != nullptr) {
2143
773k
    assert(current->refs_ > 0);
2144
0
    current->Unref();
2145
773k
  }
2146
0
  column_family_data->SetCurrent(v);
2147
1.21M
  v->Ref();
2148
2149
  // Append to linked list
2150
1.21M
  v->prev_ = column_family_data->dummy_versions()->prev_;
2151
1.21M
  v->next_ = column_family_data->dummy_versions();
2152
1.21M
  v->prev_->next_ = v;
2153
1.21M
  v->next_->prev_ = v;
2154
1.21M
}
2155
2156
Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
2157
                               const MutableCFOptions& mutable_cf_options,
2158
                               VersionEdit* edit, InstrumentedMutex* mu,
2159
                               Directory* db_directory, bool new_descriptor_log,
2160
335k
                               const ColumnFamilyOptions* new_cf_options) {
2161
335k
  mu->AssertHeld();
2162
2163
  // column_family_data can be nullptr only if this is column_family_add.
2164
  // in that case, we also need to specify ColumnFamilyOptions
2165
335k
  if (column_family_data == nullptr) {
2166
1.71k
    assert(edit->column_family_name_);
2167
0
    assert(new_cf_options != nullptr);
2168
1.71k
  }
2169
2170
  // queue our request
2171
0
  ManifestWriter w(mu, column_family_data, edit);
2172
335k
  manifest_writers_.push_back(&w);
2173
336k
  while (!w.done && 
&w != manifest_writers_.front()336k
) {
2174
1.31k
    w.cv.Wait();
2175
1.31k
  }
2176
335k
  if (w.done) {
2177
22
    return w.status;
2178
22
  }
2179
335k
  if (column_family_data != nullptr && 
column_family_data->IsDropped()333k
) {
2180
    // if column family is dropped by the time we get here, no need to write
2181
    // anything to the manifest
2182
1
    manifest_writers_.pop_front();
2183
    // Notify new head of write queue
2184
1
    if (!manifest_writers_.empty()) {
2185
0
      manifest_writers_.front()->cv.Signal();
2186
0
    }
2187
    // we steal this code to also inform about cf-drop
2188
1
    return STATUS(ShutdownInProgress, "");
2189
1
  }
2190
2191
335k
  std::vector<VersionEdit*> batch_edits;
2192
335k
  Version* v = nullptr;
2193
335k
  std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(nullptr);
2194
2195
  // Process all requests in the queue.
2196
335k
  ManifestWriter* last_writer = &w;
2197
335k
  assert(!manifest_writers_.empty());
2198
0
  assert(manifest_writers_.front() == &w);
2199
2200
0
  UserFrontierPtr flushed_frontier_override;
2201
335k
  if (edit->IsColumnFamilyManipulation()) {
2202
    // No group commits for column family add or drop.
2203
1.74k
    LogAndApplyCFHelper(edit);
2204
1.74k
    batch_edits.push_back(edit);
2205
333k
  } else {
2206
333k
    v = new Version(column_family_data, this, current_version_number_++);
2207
333k
    builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data));
2208
333k
    auto* builder = builder_guard->version_builder();
2209
333k
    for (const auto& writer : manifest_writers_) {
2210
333k
      if (writer->edit->IsColumnFamilyManipulation() ||
2211
333k
          
writer->cfd->GetID() != column_family_data->GetID()333k
) {
2212
        // No group commits for column family add or drop.
2213
        // Also, group commits across column families are not supported.
2214
1
        break;
2215
1
      }
2216
333k
      FrontierModificationMode frontier_mode = FrontierModificationMode::kUpdate;
2217
333k
      const bool force_flushed_frontier = writer->edit->force_flushed_frontier_;
2218
333k
      if (force_flushed_frontier) {
2219
1.80k
        if (writer != &w) {
2220
          // No group commit for edits that force a particular value of flushed frontier, either.
2221
          // (Also see the logic at the end of the for loop body.)
2222
0
          break;
2223
0
        }
2224
1.80k
        new_descriptor_log = true;
2225
1.80k
        flushed_frontier_override = edit->flushed_frontier_;
2226
1.80k
      }
2227
333k
      last_writer = writer;
2228
333k
      LogAndApplyHelper(column_family_data, builder, last_writer->edit, mu);
2229
333k
      batch_edits.push_back(last_writer->edit);
2230
2231
333k
      if (force_flushed_frontier) {
2232
        // This is also needed to disable group commit for flushed-frontier-forcing edits.
2233
1.80k
        break;
2234
1.80k
      }
2235
333k
    }
2236
2237
333k
    builder->SaveTo(v->storage_info());
2238
333k
  }
2239
2240
  // Initialize new descriptor log file if necessary by creating
2241
  // a temporary file that contains a snapshot of the current version.
2242
335k
  uint64_t new_manifest_file_size = 0;
2243
335k
  Status s;
2244
2245
335k
  assert(pending_manifest_file_number_ == 0);
2246
335k
  if (!descriptor_log_ ||
2247
335k
      
manifest_file_size_ > db_options_->max_manifest_file_size60.4k
) {
2248
275k
    pending_manifest_file_number_ = NewFileNumber();
2249
275k
    batch_edits.back()->SetNextFile(next_file_number_.load());
2250
275k
    new_descriptor_log = true;
2251
275k
  } else {
2252
59.9k
    pending_manifest_file_number_ = manifest_file_number_;
2253
59.9k
  }
2254
2255
335k
  if (new_descriptor_log) {
2256
    // If we're writing out new snapshot make sure to persist max column family.
2257
277k
    if (column_family_set_->GetMaxColumnFamily() > 0) {
2258
4.23k
      edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
2259
4.23k
    }
2260
277k
  }
2261
2262
  // Unlock during expensive operations. New writes cannot get here
2263
  // because &w is ensuring that all new writes get queued.
2264
335k
  {
2265
2266
335k
    mu->Unlock();
2267
2268
335k
    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
2269
335k
    if (!edit->IsColumnFamilyManipulation() &&
2270
335k
        
db_options_->max_open_files == -1333k
) {
2271
      // unlimited table cache. Pre-load table handle now.
2272
      // Need to do it out of the mutex.
2273
897
      builder_guard->version_builder()->LoadTableHandlers(
2274
897
          column_family_data->internal_stats(),
2275
897
          column_family_data->ioptions()->optimize_filters_for_hits);
2276
897
    }
2277
2278
    // This is fine because everything inside of this block is serialized --
2279
    // only one thread can be here at the same time.
2280
335k
    if (new_descriptor_log) {
2281
      // Create a new manifest file.
2282
277k
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
2283
277k
          "Creating manifest %" PRIu64 "\n", pending_manifest_file_number_);
2284
277k
      unique_ptr<WritableFile> descriptor_file;
2285
277k
      EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
2286
277k
      descriptor_log_file_name_ = DescriptorFileName(dbname_, pending_manifest_file_number_);
2287
277k
      s = NewWritableFile(
2288
277k
          env_, descriptor_log_file_name_,
2289
277k
          &descriptor_file, opt_env_opts);
2290
277k
      if (s.ok()) {
2291
277k
        descriptor_file->SetPreallocationBlockSize(
2292
277k
            db_options_->manifest_preallocation_size);
2293
2294
277k
        unique_ptr<WritableFileWriter> file_writer(
2295
277k
            new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
2296
277k
        descriptor_log_.reset(new log::Writer(
2297
277k
            std::move(file_writer), /* log_number */ 0, /* recycle_log_files */ false));
2298
        // This will write a snapshot containing metadata for all files in this DB. If we are
2299
        // forcing a particular value of the flushed frontier, we need to set it in this snapshot
2300
        // version edit as well.
2301
277k
        s = WriteSnapshot(descriptor_log_.get(), flushed_frontier_override);
2302
277k
      } else {
2303
24
        descriptor_log_file_name_ = "";
2304
24
      }
2305
277k
    }
2306
2307
335k
    if (!edit->IsColumnFamilyManipulation()) {
2308
      // This is cpu-heavy operations, which should be called outside mutex.
2309
333k
      v->PrepareApply(mutable_cf_options, true);
2310
333k
    }
2311
2312
    // Write new records to MANIFEST log.
2313
335k
    if (s.ok()) {
2314
335k
      for (auto& e : batch_edits) {
2315
335k
        std::string record;
2316
335k
        if (!e->AppendEncodedTo(&record)) {
2317
0
          s = STATUS(Corruption,
2318
0
              "Unable to Encode VersionEdit:" + e->DebugString(true));
2319
0
          break;
2320
0
        }
2321
335k
        TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
2322
335k
                         rocksdb_kill_odds * REDUCE_ODDS2);
2323
335k
        s = descriptor_log_->AddRecord(record);
2324
335k
        if (!s.ok()) {
2325
2
          break;
2326
2
        }
2327
335k
      }
2328
335k
      if (s.ok()) {
2329
335k
        s = SyncManifest(env_, db_options_, descriptor_log_->file());
2330
335k
      }
2331
335k
      if (!s.ok()) {
2332
3
        RLOG(InfoLogLevel::ERROR_LEVEL, db_options_->info_log,
2333
3
            "MANIFEST write: %s\n", s.ToString().c_str());
2334
3
      }
2335
335k
    }
2336
2337
335k
    std::string obsolete_manifest;
2338
    // If we just created a new descriptor file, install it by writing a
2339
    // new CURRENT file that points to it.
2340
335k
    if (s.ok() && 
new_descriptor_log335k
) {
2341
277k
      s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, db_directory,
2342
277k
        db_options_->disableDataSync);
2343
      // Leave the old file behind since PurgeObsoleteFiles will take care of it
2344
      // later. It's unsafe to delete now since file deletion may be disabled.
2345
277k
      obsolete_manifest = DescriptorFileName("", manifest_file_number_);
2346
277k
    }
2347
2348
335k
    if (
s.ok()335k
) {
2349
      // find offset in manifest file where this version is stored.
2350
335k
      s = db_options_->get_checkpoint_env()->GetFileSize(
2351
335k
          descriptor_log_file_name_, &new_manifest_file_size);
2352
335k
    }
2353
2354
335k
    if (edit->is_column_family_drop_) {
2355
26
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
2356
26
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
2357
26
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
2358
26
    }
2359
2360
335k
    LogFlush(db_options_->info_log);
2361
335k
    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
2362
335k
    mu->Lock();
2363
2364
335k
    if (!obsolete_manifest.empty()) {
2365
277k
      obsolete_manifests_.push_back(std::move(obsolete_manifest));
2366
277k
    }
2367
335k
  }
2368
2369
  // Install the new version
2370
335k
  if (s.ok()) {
2371
335k
    if (edit->column_family_name_) {
2372
      // no group commit on column family add
2373
1.71k
      assert(batch_edits.size() == 1);
2374
0
      assert(new_cf_options != nullptr);
2375
0
      CreateColumnFamily(*new_cf_options, edit);
2376
333k
    } else if (edit->is_column_family_drop_) {
2377
26
      assert(batch_edits.size() == 1);
2378
0
      column_family_data->SetDropped();
2379
26
      if (column_family_data->Unref()) {
2380
0
        delete column_family_data;
2381
0
      }
2382
333k
    } else {
2383
333k
      uint64_t max_log_number_in_batch  = 0;
2384
333k
      for (auto& e : batch_edits) {
2385
333k
        if (e->log_number_) {
2386
38.3k
          max_log_number_in_batch =
2387
38.3k
              std::max(max_log_number_in_batch, *e->log_number_);
2388
38.3k
        }
2389
333k
      }
2390
333k
      if (max_log_number_in_batch != 0) {
2391
38.3k
        assert(column_family_data->GetLogNumber() <= max_log_number_in_batch);
2392
0
        column_family_data->SetLogNumber(max_log_number_in_batch);
2393
38.3k
      }
2394
0
      AppendVersion(column_family_data, v);
2395
333k
    }
2396
2397
0
    manifest_file_number_ = pending_manifest_file_number_;
2398
335k
    manifest_file_size_ = new_manifest_file_size;
2399
335k
    prev_log_number_ = edit->prev_log_number_.get_value_or(0);
2400
335k
    if (flushed_frontier_override) {
2401
1.80k
      flushed_frontier_ = flushed_frontier_override;
2402
333k
    } else if (edit->flushed_frontier_) {
2403
271k
      UpdateFlushedFrontier(edit->flushed_frontier_);
2404
271k
    }
2405
335k
  } else {
2406
7
    RLOG(InfoLogLevel::ERROR_LEVEL, db_options_->info_log,
2407
7
        "Error in committing version %" PRIu64 " to [%s]",
2408
7
        v->GetVersionNumber(),
2409
7
        column_family_data ? column_family_data->GetName().c_str()
2410
7
                           : "<null>");
2411
7
    delete v;
2412
7
    if (new_descriptor_log) {
2413
2
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
2414
2
        "Deleting manifest %" PRIu64 " current manifest %" PRIu64 "\n",
2415
2
        manifest_file_number_, pending_manifest_file_number_);
2416
2
      descriptor_log_.reset();
2417
2
      env_->CleanupFile(
2418
2
          DescriptorFileName(dbname_, pending_manifest_file_number_));
2419
2
    }
2420
7
  }
2421
0
  pending_manifest_file_number_ = 0;
2422
2423
  // wake up all the waiting writers
2424
335k
  while (true) {
2425
335k
    ManifestWriter* ready = manifest_writers_.front();
2426
335k
    manifest_writers_.pop_front();
2427
335k
    if (ready != &w) {
2428
22
      ready->status = s;
2429
22
      ready->done = true;
2430
22
      ready->cv.Signal();
2431
22
    }
2432
335k
    if (ready == last_writer) 
break335k
;
2433
335k
  }
2434
  // Notify new head of write queue
2435
335k
  if (!manifest_writers_.empty()) {
2436
1.28k
    manifest_writers_.front()->cv.Signal();
2437
1.28k
  }
2438
335k
  return s;
2439
335k
}
2440
2441
1.74k
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
2442
1.74k
  assert(edit->IsColumnFamilyManipulation());
2443
0
  edit->SetNextFile(next_file_number_.load());
2444
1.74k
  edit->SetLastSequence(LastSequence());
2445
1.74k
  if (edit->is_column_family_drop_) {
2446
    // if we drop column family, we have to make sure to save max column family,
2447
    // so that we don't reuse existing ID
2448
26
    edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
2449
26
  }
2450
1.74k
}
2451
2452
void VersionSet::LogAndApplyHelper(
2453
    ColumnFamilyData* cfd,
2454
    VersionBuilder* builder,
2455
    VersionEdit* edit,
2456
333k
    InstrumentedMutex* mu) {
2457
333k
  mu->AssertHeld();
2458
333k
  assert(!edit->IsColumnFamilyManipulation());
2459
2460
333k
  if (edit->log_number_) {
2461
38.3k
    assert(edit->log_number_ >= cfd->GetLogNumber());
2462
0
    assert(edit->log_number_ < next_file_number_.load());
2463
38.3k
  }
2464
2465
333k
  if (!edit->prev_log_number_) {
2466
305k
    edit->SetPrevLogNumber(prev_log_number_);
2467
305k
  }
2468
333k
  edit->SetNextFile(next_file_number_.load());
2469
333k
  edit->SetLastSequence(LastSequence());
2470
2471
333k
  if (flushed_frontier_ && 
!edit->force_flushed_frontier_11.3k
) {
2472
9.59k
    edit->UpdateFlushedFrontier(flushed_frontier_);
2473
9.59k
  }
2474
2475
333k
  builder->Apply(edit);
2476
333k
}
2477
2478
namespace {
2479
2480
struct LogReporter : public log::Reader::Reporter {
2481
  Status* status;
2482
0
  virtual void Corruption(size_t bytes, const Status& s) override {
2483
0
    if (this->status->ok()) *this->status = s;
2484
0
  }
2485
};
2486
2487
class ManifestReader {
2488
 public:
2489
  ManifestReader(Env* env, Env* checkpoint_env, const EnvOptions& env_options,
2490
                 BoundaryValuesExtractor* extractor, const std::string& dbname)
2491
      : env_(env), checkpoint_env_(checkpoint_env), env_options_(env_options),
2492
436k
        extractor_(extractor), dbname_(dbname) {}
2493
2494
437k
  Status OpenManifest() {
2495
437k
    auto status = ReadManifestFilename();
2496
437k
    if (!status.ok()) {
2497
2
      return status;
2498
2
    }
2499
437k
    FileType type;
2500
437k
    bool parse_ok = ParseFileName(manifest_filename_, &manifest_file_number_, &type);
2501
437k
    if (
!parse_ok437k
|| type != kDescriptorFile) {
2502
0
      return STATUS(Corruption, "CURRENT file corrupted");
2503
0
    }
2504
2505
437k
    manifest_filename_ = dbname_ + "/" + manifest_filename_;
2506
437k
    std::unique_ptr<SequentialFileReader> manifest_file_reader;
2507
437k
    {
2508
437k
      std::unique_ptr<SequentialFile> manifest_file;
2509
437k
      status = env_->NewSequentialFile(manifest_filename_, &manifest_file, env_options_);
2510
437k
      if (!status.ok()) {
2511
0
        return status;
2512
0
      }
2513
437k
      manifest_file_reader.reset(new SequentialFileReader(std::move(manifest_file)));
2514
437k
    }
2515
0
    status = checkpoint_env_->GetFileSize(manifest_filename_, &current_manifest_file_size_);
2516
437k
    if (!status.ok()) {
2517
0
      return status;
2518
0
    }
2519
2520
437k
    reader_.emplace(nullptr, std::move(manifest_file_reader), &reporter_, true /*checksum*/,
2521
437k
                    0 /*initial_offset*/, 0);
2522
437k
    reporter_.status = &status_;
2523
2524
437k
    return Status::OK();
2525
437k
  }
2526
2527
914k
  CHECKED_STATUS Next() {
2528
914k
    Slice record;
2529
914k
    if (!reader_->ReadRecord(&record, &scratch_)) {
2530
436k
      return STATUS(EndOfFile, "");
2531
436k
    }
2532
477k
    if (!status_.ok()) {
2533
0
      return status_;
2534
0
    }
2535
477k
    return edit_.DecodeFrom(extractor_, record);
2536
477k
  }
2537
2538
477k
  VersionEdit& operator*() {
2539
477k
    return edit_;
2540
477k
  }
2541
2542
437k
  uint64_t manifest_file_number() const { return manifest_file_number_; }
2543
437k
  uint64_t current_manifest_file_size() const { return current_manifest_file_size_; }
2544
437k
  const std::string& manifest_filename() const { return manifest_filename_; }
2545
 private:
2546
437k
  CHECKED_STATUS ReadManifestFilename() {
2547
    // Read "CURRENT" file, which contains a pointer to the current manifest file
2548
437k
    Status s = ReadFileToString(env_, CurrentFileName(dbname_), &manifest_filename_);
2549
437k
    if (!s.ok()) {
2550
2
      return s;
2551
2
    }
2552
437k
    if (manifest_filename_.empty() || 
manifest_filename_.back() != '\n'436k
) {
2553
0
      return STATUS(Corruption, "CURRENT file does not end with newline");
2554
0
    }
2555
    // remove the trailing '\n'
2556
437k
    manifest_filename_.resize(manifest_filename_.size() - 1);
2557
437k
    return Status::OK();
2558
437k
  }
2559
2560
  // In plaintext cluster, this is a default env, but in encrypted cluster, this encrypts on write
2561
  // and decrypts on read.
2562
  Env* const env_;
2563
  // Default env used to checkpoint files. In encrypted cluster, we don't want to decrypt
2564
  // checkpointed files, so using the default env preserves file encryption.
2565
  Env* const checkpoint_env_;
2566
  const EnvOptions& env_options_;
2567
  BoundaryValuesExtractor* extractor_;
2568
  std::string dbname_;
2569
  std::string manifest_filename_;
2570
  uint64_t manifest_file_number_ = 0;
2571
  uint64_t current_manifest_file_size_ = 0;
2572
  LogReporter reporter_;
2573
  boost::optional<log::Reader> reader_;
2574
  std::string scratch_;
2575
  Status status_;
2576
  VersionEdit edit_;
2577
};
2578
2579
} // namespace
2580
2581
Status VersionSet::Recover(
2582
    const std::vector<ColumnFamilyDescriptor>& column_families,
2583
437k
    bool read_only) {
2584
437k
  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
2585
440k
  for (auto cf : column_families) {
2586
440k
    cf_name_to_options.insert({cf.name, cf.options});
2587
440k
  }
2588
  // keeps track of column families in manifest that were not found in
2589
  // column families parameters. if those column families are not dropped
2590
  // by subsequent manifest records, Recover() will return failure status
2591
437k
  std::unordered_map<int, std::string> column_families_not_found;
2592
2593
437k
  bool have_log_number = false;
2594
437k
  bool have_prev_log_number = false;
2595
437k
  bool have_next_file = false;
2596
437k
  bool have_last_sequence = false;
2597
437k
  UserFrontierPtr flushed_frontier;
2598
437k
  uint64_t next_file = 0;
2599
437k
  uint64_t last_sequence = 0;
2600
437k
  uint64_t log_number = 0;
2601
437k
  uint64_t previous_log_number = 0;
2602
437k
  uint32_t max_column_family = 0;
2603
437k
  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> builders;
2604
2605
  // add default column family
2606
437k
  auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
2607
437k
  if (default_cf_iter == cf_name_to_options.end()) {
2608
2
    return STATUS(InvalidArgument, "Default column family not specified");
2609
2
  }
2610
437k
  VersionEdit default_cf_edit;
2611
437k
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
2612
437k
  default_cf_edit.SetColumnFamily(0);
2613
437k
  ColumnFamilyData* default_cfd =
2614
437k
      CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
2615
437k
  builders.emplace(0, std::make_unique<BaseReferencedVersionBuilder>(default_cfd));
2616
2617
437k
  Status s;
2618
437k
  uint64_t current_manifest_file_size;
2619
437k
  std::string current_manifest_filename;
2620
437k
  {
2621
437k
    ManifestReader manifest_reader(env_, db_options_->get_checkpoint_env(), env_options_,
2622
437k
                                   db_options_->boundary_extractor.get(), dbname_);
2623
437k
    auto status = manifest_reader.OpenManifest();
2624
437k
    if (!status.ok()) {
2625
1
      return status;
2626
1
    }
2627
437k
    current_manifest_file_size = manifest_reader.current_manifest_file_size();
2628
437k
    current_manifest_filename = manifest_reader.manifest_filename();
2629
437k
    manifest_file_number_ = manifest_reader.manifest_file_number();
2630
2631
913k
    for (;;) {
2632
913k
      s = manifest_reader.Next();
2633
913k
      if (!s.ok()) {
2634
436k
        break;
2635
436k
      }
2636
476k
      auto& edit = *manifest_reader;
2637
      // Not found means that user didn't supply that column
2638
      // family option AND we encountered column family add
2639
      // record. Once we encounter column family drop record,
2640
      // we will delete the column family from
2641
      // column_families_not_found.
2642
476k
      bool cf_in_not_found =
2643
476k
          column_families_not_found.find(edit.column_family_) !=
2644
476k
          column_families_not_found.end();
2645
      // in builders means that user supplied that column family
2646
      // option AND that we encountered column family add record
2647
476k
      bool cf_in_builders =
2648
476k
          builders.find(edit.column_family_) != builders.end();
2649
2650
      // they can't both be true
2651
476k
      assert(!(cf_in_not_found && cf_in_builders));
2652
2653
0
      ColumnFamilyData* cfd = nullptr;
2654
2655
476k
      if (edit.column_family_name_) {
2656
2.95k
        if (cf_in_builders || cf_in_not_found) {
2657
0
          s = STATUS(Corruption,
2658
0
              "Manifest adding the same column family twice");
2659
0
          break;
2660
0
        }
2661
2.95k
        auto cf_options = cf_name_to_options.find(*edit.column_family_name_);
2662
2.95k
        if (cf_options == cf_name_to_options.end()) {
2663
14
          column_families_not_found.emplace(edit.column_family_, *edit.column_family_name_);
2664
2.94k
        } else {
2665
2.94k
          cfd = CreateColumnFamily(cf_options->second, &edit);
2666
2.94k
          builders.emplace(edit.column_family_,
2667
2.94k
                           std::make_unique<BaseReferencedVersionBuilder>(cfd));
2668
2.94k
        }
2669
473k
      } else if (edit.is_column_family_drop_) {
2670
10
        if (cf_in_builders) {
2671
2
          auto builder = builders.find(edit.column_family_);
2672
2
          assert(builder != builders.end());
2673
0
          builders.erase(builder);
2674
2
          cfd = column_family_set_->GetColumnFamily(edit.column_family_);
2675
2
          if (cfd->Unref()) {
2676
2
            delete cfd;
2677
2
            cfd = nullptr;
2678
2
          } else {
2679
            // who else can have reference to cfd!?
2680
0
            assert(false);
2681
0
          }
2682
8
        } else if (cf_in_not_found) {
2683
8
          column_families_not_found.erase(edit.column_family_);
2684
8
        } else {
2685
0
          s = STATUS(Corruption,
2686
0
              "Manifest - dropping non-existing column family");
2687
0
          break;
2688
0
        }
2689
474k
      } else 
if (473k
!cf_in_not_found473k
) {
2690
474k
        if (!cf_in_builders) {
2691
0
          s = STATUS(Corruption,
2692
0
              "Manifest record referencing unknown column family");
2693
0
          break;
2694
0
        }
2695
2696
474k
        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
2697
        // this should never happen since cf_in_builders is true
2698
474k
        assert(cfd != nullptr);
2699
474k
        if (edit.max_level_ >= cfd->current()->storage_info()->num_levels()) {
2700
6
          s = STATUS(InvalidArgument,
2701
6
              "db has more levels than options.num_levels");
2702
6
          break;
2703
6
        }
2704
2705
        // if it is not column family add or column family drop,
2706
        // then it's a file add/delete, which should be forwarded
2707
        // to builder
2708
474k
        auto builder = builders.find(edit.column_family_);
2709
474k
        assert(builder != builders.end());
2710
0
        builder->second->version_builder()->Apply(&edit);
2711
474k
      }
2712
2713
477k
      
if (476k
cfd != nullptr476k
) {
2714
477k
        if (edit.log_number_) {
2715
456k
          if (cfd->GetLogNumber() > edit.log_number_) {
2716
0
            RLOG(InfoLogLevel::WARN_LEVEL, db_options_->info_log,
2717
0
                "MANIFEST corruption detected, but ignored - Log numbers in "
2718
0
                "records NOT monotonically increasing");
2719
456k
          } else {
2720
456k
            cfd->SetLogNumber(*edit.log_number_);
2721
456k
            have_log_number = true;
2722
456k
          }
2723
456k
        }
2724
477k
        if (edit.comparator_ &&
2725
477k
            
*edit.comparator_ != cfd->user_comparator()->Name()14.2k
) {
2726
5
          s = STATUS(InvalidArgument,
2727
5
              cfd->user_comparator()->Name(),
2728
5
              "does not match existing comparator " + *edit.comparator_);
2729
5
          break;
2730
5
        }
2731
477k
      }
2732
2733
476k
      if (edit.prev_log_number_) {
2734
24.8k
        previous_log_number = *edit.prev_log_number_;
2735
24.8k
        have_prev_log_number = true;
2736
24.8k
      }
2737
2738
476k
      if (edit.next_file_number_) {
2739
451k
        next_file = *edit.next_file_number_;
2740
451k
        have_next_file = true;
2741
451k
      }
2742
2743
476k
      if (edit.max_column_family_) {
2744
2.60k
        max_column_family = *edit.max_column_family_;
2745
2.60k
      }
2746
2747
476k
      if (edit.last_sequence_) {
2748
451k
        last_sequence = *edit.last_sequence_;
2749
451k
        have_last_sequence = true;
2750
451k
      }
2751
2752
476k
      if (edit.flushed_frontier_) {
2753
11.4k
        UpdateUserFrontier(
2754
11.4k
            &flushed_frontier, edit.flushed_frontier_, UpdateUserValueType::kLargest);
2755
11.4k
        VLOG(1) << "Updating flushed frontier with that from edit: "
2756
7
                << edit.flushed_frontier_->ToString()
2757
7
                << ", new flushed froniter: " << flushed_frontier->ToString();
2758
465k
      } else {
2759
18.4E
        VLOG(1) << "No flushed frontier found in edit";
2760
465k
      }
2761
476k
    }
2762
437k
    if (s.IsEndOfFile()) {
2763
436k
      s = Status::OK();
2764
436k
    }
2765
437k
  }
2766
2767
437k
  if (s.ok()) {
2768
436k
    if (!have_next_file) {
2769
1
      s = STATUS(Corruption, "no meta-nextfile entry in descriptor");
2770
436k
    } else if (!have_log_number) {
2771
0
      s = STATUS(Corruption, "no meta-lognumber entry in descriptor");
2772
436k
    } else if (!have_last_sequence) {
2773
0
      s = STATUS(Corruption, "no last-sequence-number entry in descriptor");
2774
0
    }
2775
2776
436k
    if (!have_prev_log_number) {
2777
426k
      previous_log_number = 0;
2778
426k
    }
2779
2780
436k
    column_family_set_->UpdateMaxColumnFamily(max_column_family);
2781
2782
436k
    MarkFileNumberUsedDuringRecovery(previous_log_number);
2783
436k
    MarkFileNumberUsedDuringRecovery(log_number);
2784
436k
  }
2785
2786
  // there were some column families in the MANIFEST that weren't specified
2787
  // in the argument. This is OK in read_only mode
2788
437k
  if (read_only == false && 
!column_families_not_found.empty()436k
) {
2789
1
    std::string list_of_not_found;
2790
3
    for (const auto& cf : column_families_not_found) {
2791
3
      list_of_not_found += ", " + cf.second;
2792
3
    }
2793
1
    list_of_not_found = list_of_not_found.substr(2);
2794
1
    s = STATUS(InvalidArgument,
2795
1
        "You have to open all column families. Column families not opened: " +
2796
1
        list_of_not_found);
2797
1
  }
2798
2799
437k
  if (s.ok()) {
2800
439k
    for (auto cfd : *column_family_set_) {
2801
439k
      if (cfd->IsDropped()) {
2802
0
        continue;
2803
0
      }
2804
439k
      auto builders_iter = builders.find(cfd->GetID());
2805
439k
      assert(builders_iter != builders.end());
2806
0
      auto* builder = builders_iter->second->version_builder();
2807
2808
439k
      if (db_options_->max_open_files == -1) {
2809
        // unlimited table cache. Pre-load table handle now.
2810
        // Need to do it out of the mutex.
2811
318
        builder->LoadTableHandlers(cfd->internal_stats(),
2812
318
                                   db_options_->max_file_opening_threads);
2813
318
      }
2814
2815
439k
      Version* v = new Version(cfd, this, current_version_number_++);
2816
439k
      builder->SaveTo(v->storage_info());
2817
2818
      // Install recovered version
2819
439k
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
2820
439k
          !(db_options_->skip_stats_update_on_db_open));
2821
439k
      AppendVersion(cfd, v);
2822
439k
    }
2823
2824
436k
    manifest_file_size_ = current_manifest_file_size;
2825
436k
    next_file_number_.store(next_file + 1);
2826
436k
    SetLastSequenceNoSanityChecking(last_sequence);
2827
436k
    prev_log_number_ = previous_log_number;
2828
436k
    if (flushed_frontier) {
2829
5.26k
      UpdateFlushedFrontierNoSanityChecking(std::move(flushed_frontier));
2830
5.26k
    }
2831
2832
436k
    RLOG(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
2833
436k
        "Recovered from manifest file:%s succeeded,"
2834
436k
        "manifest_file_number is %" PRIu64 ", next_file_number is %lu, "
2835
436k
        "last_sequence is %" PRIu64 ", log_number is %" PRIu64 ","
2836
436k
        "prev_log_number is %" PRIu64 ","
2837
436k
        "max_column_family is %u, flushed_values is %s\n",
2838
436k
        current_manifest_filename.c_str(), manifest_file_number_,
2839
436k
        next_file_number_.load(), LastSequence(),
2840
436k
        log_number, prev_log_number_,
2841
436k
        column_family_set_->GetMaxColumnFamily(),
2842
436k
        yb::ToString(flushed_frontier_).c_str());
2843
2844
440k
    for (auto cfd : *column_family_set_) {
2845
440k
      if (cfd->IsDropped()) {
2846
0
        continue;
2847
0
      }
2848
440k
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
2849
440k
          "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
2850
440k
          cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
2851
440k
    }
2852
436k
  }
2853
2854
437k
  return s;
2855
437k
}
2856
2857
Status VersionSet::Import(const std::string& source_dir,
2858
                          SequenceNumber seqno,
2859
0
                          VersionEdit* edit) {
2860
0
  ManifestReader manifest_reader(env_, db_options_->get_checkpoint_env(), env_options_,
2861
0
                                 db_options_->boundary_extractor.get(), source_dir);
2862
0
  auto status = manifest_reader.OpenManifest();
2863
0
  if (!status.ok()) {
2864
0
    return status;
2865
0
  }
2866
0
  std::vector<FileMetaData> files;
2867
0
  std::vector<std::pair<SequenceNumber, SequenceNumber>> segments;
2868
0
  for (;;) {
2869
0
    status = manifest_reader.Next();
2870
0
    if (!status.ok()) {
2871
0
      break;
2872
0
    }
2873
0
    auto& current = *manifest_reader;
2874
0
    if (!current.GetDeletedFiles().empty()) {
2875
0
      return STATUS(Corruption, "Deleted files should be empty");
2876
0
    }
2877
0
    for (const auto& file : current.GetNewFiles()) {
2878
0
      auto filemeta = file.second;
2879
0
      filemeta.largest.user_frontier.reset();
2880
0
      filemeta.smallest.user_frontier.reset();
2881
0
      filemeta.imported = true;
2882
0
      if (filemeta.largest.seqno >= seqno) {
2883
0
        return STATUS_FORMAT(InvalidArgument,
2884
0
                             "Imported DB contains seqno ($0) greater than active seqno ($1)",
2885
0
                             filemeta.largest.seqno,
2886
0
                             seqno);
2887
0
      }
2888
0
      files.push_back(filemeta);
2889
0
      segments.emplace_back(filemeta.smallest.seqno, filemeta.largest.seqno);
2890
0
    }
2891
0
  }
2892
0
  if (!status.IsEndOfFile()) {
2893
0
    return status;
2894
0
  }
2895
2896
0
  if (files.empty()) {
2897
0
    return STATUS_FORMAT(NotFound, "Imported DB is empty: $0", source_dir);
2898
0
  }
2899
2900
0
  std::vector<LiveFileMetaData> live_files;
2901
0
  GetLiveFilesMetaData(&live_files);
2902
0
  for (const auto& file : live_files) {
2903
0
    segments.emplace_back(file.smallest.seqno, file.largest.seqno);
2904
0
  }
2905
2906
0
  std::sort(segments.begin(), segments.end(), [](const auto& lhs, const auto& rhs) {
2907
0
    return lhs.first < rhs.first;
2908
0
  });
2909
0
  auto prev = segments.front();
2910
0
  for (size_t i = 1; i != segments.size(); ++i) {
2911
0
    const auto& segment = segments[i];
2912
0
    if (segment.first <= prev.second) {
2913
0
      return STATUS_FORMAT(Corruption,
2914
0
                           "Overlapping seqno ranges: [$0, $1] and [$2, $3]",
2915
0
                           prev.first,
2916
0
                           prev.second,
2917
0
                           segment.first,
2918
0
                           segment.second);
2919
0
    }
2920
0
    prev = segment;
2921
0
  }
2922
2923
0
  std::vector<std::string> revert_list;
2924
0
  for (auto file : files) {
2925
0
    auto source_base = MakeTableFileName(source_dir, file.fd.GetNumber());
2926
0
    auto source_data = TableBaseToDataFileName(source_base);
2927
0
    auto new_number = NewFileNumber();
2928
0
    auto dest_base = MakeTableFileName(dbname_, new_number);
2929
0
    auto dest_data = TableBaseToDataFileName(dest_base);
2930
0
    LOG(INFO) << "Importing: " << source_base << " => " << dest_base;
2931
0
    status = env_->LinkFile(source_base, dest_base);
2932
0
    if (!status.ok()) {
2933
0
      break;
2934
0
    }
2935
0
    revert_list.push_back(dest_base);
2936
0
    status = env_->LinkFile(source_data, dest_data);
2937
0
    if (!status.ok()) {
2938
0
      break;
2939
0
    }
2940
0
    revert_list.push_back(dest_data);
2941
0
    file.fd.packed_number_and_path_id = new_number; // path is 0
2942
0
    file.marked_for_compaction = false;
2943
0
    edit->AddCleanedFile(0, file);
2944
0
  }
2945
2946
0
  if (!status.ok()) {
2947
0
    for (const auto& file : revert_list) {
2948
0
      auto delete_status = env_->DeleteFile(file);
2949
0
      LOG(ERROR) << "Failed to delete file: " << file << ", status: " << delete_status.ToString();
2950
0
    }
2951
0
    return status;
2952
0
  }
2953
2954
0
  return Status::OK();
2955
0
}
2956
2957
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
2958
                                      const std::string& dbname,
2959
                                      BoundaryValuesExtractor* extractor,
2960
8
                                      Env* env) {
2961
  // these are just for performance reasons, not correctness,
2962
  // so we're fine using the defaults
2963
8
  EnvOptions soptions;
2964
  // Read "CURRENT" file, which contains a pointer to the current manifest file
2965
8
  std::string current;
2966
8
  Status s = ReadFileToString(env, CurrentFileName(dbname), &current);
2967
8
  if (!s.ok()) {
2968
0
    return s;
2969
0
  }
2970
8
  if (current.empty() || current[current.size()-1] != '\n') {
2971
1
    return STATUS(Corruption, "CURRENT file does not end with newline");
2972
1
  }
2973
7
  current.resize(current.size() - 1);
2974
2975
7
  std::string dscname = dbname + "/" + current;
2976
2977
7
  unique_ptr<SequentialFileReader> file_reader;
2978
7
  {
2979
7
    unique_ptr<SequentialFile> file;
2980
7
    s = env->NewSequentialFile(dscname, &file, soptions);
2981
7
    if (!s.ok()) {
2982
0
      return s;
2983
0
    }
2984
7
    file_reader.reset(new SequentialFileReader(std::move(file)));
2985
7
  }
2986
2987
0
  std::map<uint32_t, std::string> column_family_names;
2988
  // default column family is always implicitly there
2989
7
  column_family_names.insert({0, kDefaultColumnFamilyName});
2990
7
  LogReporter reporter;
2991
7
  reporter.status = &s;
2992
7
  log::Reader reader(NULL, std::move(file_reader), &reporter, true /*checksum*/,
2993
7
                     0 /*initial_offset*/, 0);
2994
7
  Slice record;
2995
7
  std::string scratch;
2996
39
  while (reader.ReadRecord(&record, &scratch) && 
s.ok()32
) {
2997
32
    VersionEdit edit;
2998
32
    s = edit.DecodeFrom(extractor, record);
2999
32
    if (!s.ok()) {
3000
0
      break;
3001
0
    }
3002
32
    if (edit.column_family_name_) {
3003
2
      if (column_family_names.find(edit.column_family_) !=
3004
2
          column_family_names.end()) {
3005
0
        s = STATUS(Corruption, "Manifest adding the same column family twice");
3006
0
        break;
3007
0
      }
3008
2
      column_family_names.emplace(edit.column_family_, *edit.column_family_name_);
3009
30
    } else if (edit.is_column_family_drop_) {
3010
0
      if (column_family_names.find(edit.column_family_) ==
3011
0
          column_family_names.end()) {
3012
0
        s = STATUS(Corruption,
3013
0
            "Manifest - dropping non-existing column family");
3014
0
        break;
3015
0
      }
3016
0
      column_family_names.erase(edit.column_family_);
3017
0
    }
3018
32
  }
3019
3020
7
  column_families->clear();
3021
7
  if (s.ok()) {
3022
9
    for (const auto& iter : column_family_names) {
3023
9
      column_families->push_back(iter.second);
3024
9
    }
3025
7
  }
3026
3027
7
  return s;
3028
7
}
3029
3030
#ifndef ROCKSDB_LITE
3031
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
3032
                                        const Options* options,
3033
                                        const EnvOptions& env_options,
3034
5
                                        int new_levels) {
3035
5
  if (new_levels <= 1) {
3036
0
    return STATUS(InvalidArgument,
3037
0
        "Number of levels needs to be bigger than 1");
3038
0
  }
3039
3040
5
  ColumnFamilyOptions cf_options(*options);
3041
5
  std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
3042
5
                                        options->table_cache_numshardbits));
3043
5
  WriteController wc(options->delayed_write_rate);
3044
5
  WriteBuffer wb(options->db_write_buffer_size);
3045
5
  VersionSet versions(dbname, options, env_options, tc.get(), &wb, &wc);
3046
5
  Status status;
3047
3048
5
  std::vector<ColumnFamilyDescriptor> dummy;
3049
5
  ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
3050
5
                                          ColumnFamilyOptions(*options));
3051
5
  dummy.push_back(dummy_descriptor);
3052
5
  status = versions.Recover(dummy);
3053
5
  if (!status.ok()) {
3054
0
    return status;
3055
0
  }
3056
3057
5
  Version* current_version =
3058
5
      versions.GetColumnFamilySet()->GetDefault()->current();
3059
5
  auto* vstorage = current_version->storage_info();
3060
5
  int current_levels = vstorage->num_levels();
3061
3062
5
  if (current_levels <= new_levels) {
3063
0
    return Status::OK();
3064
0
  }
3065
3066
  // Make sure there are file only on one level from
3067
  // (new_levels-1) to (current_levels-1)
3068
5
  int first_nonempty_level = -1;
3069
5
  int first_nonempty_level_filenum = 0;
3070
636
  for (int i = new_levels - 1; i < current_levels; 
i++631
) {
3071
631
    int file_num = vstorage->NumLevelFiles(i);
3072
631
    if (file_num != 0) {
3073
5
      if (first_nonempty_level < 0) {
3074
5
        first_nonempty_level = i;
3075
5
        first_nonempty_level_filenum = file_num;
3076
5
      } else {
3077
0
        char msg[255];
3078
0
        snprintf(msg, sizeof(msg),
3079
0
                 "Found at least two levels containing files: "
3080
0
                 "[%d:%d],[%d:%d].\n",
3081
0
                 first_nonempty_level, first_nonempty_level_filenum, i,
3082
0
                 file_num);
3083
0
        return STATUS(InvalidArgument, msg);
3084
0
      }
3085
5
    }
3086
631
  }
3087
3088
  // we need to allocate an array with the old number of levels size to
3089
  // avoid SIGSEGV in WriteSnapshot()
3090
  // however, all levels bigger or equal to new_levels will be empty
3091
5
  std::vector<FileMetaData*>* new_files_list =
3092
5
      new std::vector<FileMetaData*>[current_levels];
3093
14
  for (int i = 0; i < new_levels - 1; 
i++9
) {
3094
9
    new_files_list[i] = vstorage->LevelFiles(i);
3095
9
  }
3096
3097
5
  if (first_nonempty_level > 0) {
3098
5
    new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
3099
5
  }
3100
3101
5
  delete[] vstorage -> files_;
3102
5
  vstorage->files_ = new_files_list;
3103
5
  vstorage->num_levels_ = new_levels;
3104
3105
5
  MutableCFOptions mutable_cf_options(*options, ImmutableCFOptions(*options));
3106
5
  VersionEdit ve;
3107
5
  InstrumentedMutex dummy_mutex;
3108
5
  InstrumentedMutexLock l(&dummy_mutex);
3109
5
  return versions.LogAndApply(
3110
5
      versions.GetColumnFamilySet()->GetDefault(),
3111
5
      mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
3112
5
}
3113
3114
Status VersionSet::DumpManifest(const Options& options, const std::string& dscname,
3115
0
                                bool verbose, bool hex) {
3116
  // Open the specified manifest file.
3117
0
  unique_ptr<SequentialFileReader> file_reader;
3118
0
  Status s;
3119
0
  {
3120
0
    unique_ptr<SequentialFile> file;
3121
0
    s = options.env->NewSequentialFile(dscname, &file, env_options_);
3122
0
    if (!s.ok()) {
3123
0
      return s;
3124
0
    }
3125
0
    file_reader.reset(new SequentialFileReader(std::move(file)));
3126
0
  }
3127
3128
0
  bool have_prev_log_number = false;
3129
0
  bool have_next_file = false;
3130
0
  bool have_last_sequence = false;
3131
0
  uint64_t next_file = 0;
3132
0
  uint64_t last_sequence = 0;
3133
0
  uint64_t previous_log_number = 0;
3134
0
  UserFrontier* flushed_frontier = nullptr;
3135
0
  int count = 0;
3136
0
  std::unordered_map<uint32_t, std::string> comparators;
3137
0
  std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
3138
3139
  // add default column family
3140
0
  VersionEdit default_cf_edit;
3141
0
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
3142
0
  default_cf_edit.SetColumnFamily(0);
3143
0
  ColumnFamilyData* default_cfd =
3144
0
      CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
3145
0
  builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
3146
3147
0
  {
3148
0
    LogReporter reporter;
3149
0
    reporter.status = &s;
3150
0
    log::Reader reader(NULL, std::move(file_reader), &reporter,
3151
0
                       true /*checksum*/, 0 /*initial_offset*/, 0);
3152
0
    Slice record;
3153
0
    std::string scratch;
3154
0
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
3155
0
      VersionEdit edit;
3156
0
      s = edit.DecodeFrom(db_options_->boundary_extractor.get(), record);
3157
0
      if (!s.ok()) {
3158
0
        break;
3159
0
      }
3160
3161
      // Write out each individual edit
3162
0
      if (verbose) {
3163
0
        printf("%s\n", edit.DebugString(hex).c_str());
3164
0
      }
3165
0
      count++;
3166
3167
0
      bool cf_in_builders =
3168
0
          builders.find(edit.column_family_) != builders.end();
3169
3170
0
      if (edit.comparator_) {
3171
0
        comparators.emplace(edit.column_family_, *edit.comparator_);
3172
0
      }
3173
3174
0
      ColumnFamilyData* cfd = nullptr;
3175
3176
0
      if (edit.column_family_name_) {
3177
0
        if (cf_in_builders) {
3178
0
          s = STATUS(Corruption,
3179
0
              "Manifest adding the same column family twice");
3180
0
          break;
3181
0
        }
3182
0
        cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
3183
0
        builders.insert(
3184
0
            {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
3185
0
      } else if (edit.is_column_family_drop_) {
3186
0
        if (!cf_in_builders) {
3187
0
          s = STATUS(Corruption,
3188
0
              "Manifest - dropping non-existing column family");
3189
0
          break;
3190
0
        }
3191
0
        auto builder_iter = builders.find(edit.column_family_);
3192
0
        delete builder_iter->second;
3193
0
        builders.erase(builder_iter);
3194
0
        comparators.erase(edit.column_family_);
3195
0
        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
3196
0
        assert(cfd != nullptr);
3197
0
        cfd->Unref();
3198
0
        delete cfd;
3199
0
        cfd = nullptr;
3200
0
      } else {
3201
0
        if (!cf_in_builders) {
3202
0
          s = STATUS(Corruption,
3203
0
              "Manifest record referencing unknown column family");
3204
0
          break;
3205
0
        }
3206
3207
0
        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
3208
        // this should never happen since cf_in_builders is true
3209
0
        assert(cfd != nullptr);
3210
3211
        // if it is not column family add or column family drop,
3212
        // then it's a file add/delete, which should be forwarded
3213
        // to builder
3214
0
        auto builder = builders.find(edit.column_family_);
3215
0
        assert(builder != builders.end());
3216
0
        builder->second->version_builder()->Apply(&edit);
3217
0
      }
3218
3219
0
      if (cfd != nullptr && edit.log_number_) {
3220
0
        cfd->SetLogNumber(*edit.log_number_);
3221
0
      }
3222
3223
0
      if (edit.prev_log_number_) {
3224
0
        previous_log_number = *edit.prev_log_number_;
3225
0
        have_prev_log_number = true;
3226
0
      }
3227
3228
0
      if (edit.next_file_number_) {
3229
0
        next_file = *edit.next_file_number_;
3230
0
        have_next_file = true;
3231
0
      }
3232
3233
0
      if (edit.last_sequence_) {
3234
0
        last_sequence = *edit.last_sequence_;
3235
0
        have_last_sequence = true;
3236
0
      }
3237
3238
0
      if (edit.flushed_frontier_) {
3239
0
        flushed_frontier = edit.flushed_frontier_.get();
3240
0
      }
3241
3242
0
      if (edit.max_column_family_) {
3243
0
        column_family_set_->UpdateMaxColumnFamily(*edit.max_column_family_);
3244
0
      }
3245
0
    }
3246
0
  }
3247
0
  file_reader.reset();
3248
3249
0
  if (s.ok()) {
3250
0
    if (!have_next_file) {
3251
0
      s = STATUS(Corruption, "no meta-nextfile entry in descriptor");
3252
0
      printf("no meta-nextfile entry in descriptor");
3253
0
    } else if (!have_last_sequence) {
3254
0
      printf("no last-sequence-number entry in descriptor");
3255
0
      s = STATUS(Corruption, "no last-sequence-number entry in descriptor");
3256
0
    }
3257
3258
0
    if (!have_prev_log_number) {
3259
0
      previous_log_number = 0;
3260
0
    }
3261
0
  }
3262
3263
0
  if (s.ok()) {
3264
0
    for (auto cfd : *column_family_set_) {
3265
0
      if (cfd->IsDropped()) {
3266
0
        continue;
3267
0
      }
3268
0
      auto builders_iter = builders.find(cfd->GetID());
3269
0
      assert(builders_iter != builders.end());
3270
0
      auto builder = builders_iter->second->version_builder();
3271
3272
0
      Version* v = new Version(cfd, this, current_version_number_++);
3273
0
      builder->SaveTo(v->storage_info());
3274
0
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
3275
3276
0
      printf("--------------- Column family \"%s\"  (ID %u) --------------\n",
3277
0
             cfd->GetName().c_str(), (unsigned int)cfd->GetID());
3278
0
      printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
3279
0
      auto comparator = comparators.find(cfd->GetID());
3280
0
      if (comparator != comparators.end()) {
3281
0
        printf("comparator: %s\n", comparator->second.c_str());
3282
0
      } else {
3283
0
        printf("comparator: <NO COMPARATOR>\n");
3284
0
      }
3285
0
      printf("%s \n", v->DebugString(hex).c_str());
3286
0
      delete v;
3287
0
    }
3288
3289
    // Free builders
3290
0
    for (auto& builder : builders) {
3291
0
      delete builder.second;
3292
0
    }
3293
3294
0
    next_file_number_.store(next_file + 1);
3295
0
    SetLastSequenceNoSanityChecking(last_sequence);
3296
0
    if (flushed_frontier) {
3297
0
      DCHECK_EQ(*flushed_frontier, *FlushedFrontier());
3298
0
    }
3299
0
    prev_log_number_ = previous_log_number;
3300
3301
0
    printf(
3302
0
        "next_file_number %" PRIu64 " last_sequence "
3303
0
        "%" PRIu64 " prev_log_number %" PRIu64 " max_column_family %u flushed_values %s\n",
3304
0
        next_file_number_.load(), last_sequence, previous_log_number,
3305
0
        column_family_set_->GetMaxColumnFamily(),
3306
0
        yb::ToString(flushed_frontier).c_str());
3307
0
  }
3308
3309
0
  return s;
3310
0
}
3311
#endif  // ROCKSDB_LITE
3312
3313
// Set the last sequence number to s.
3314
31.3M
void VersionSet::SetLastSequence(SequenceNumber s) {
3315
31.3M
#ifndef NDEBUG
3316
31.3M
  EnsureNonDecreasingLastSequence(LastSequence(), s);
3317
31.3M
#endif
3318
31.3M
  SetLastSequenceNoSanityChecking(s);
3319
31.3M
}
3320
3321
// Set last sequence number without verifying that it always keeps increasing.
3322
31.7M
void VersionSet::SetLastSequenceNoSanityChecking(SequenceNumber s) {
3323
31.7M
  last_sequence_.store(s, std::memory_order_release);
3324
31.7M
}
3325
3326
// Set the last flushed op id / hybrid time / history cutoff to the specified set of values.
3327
271k
void VersionSet::UpdateFlushedFrontier(UserFrontierPtr values) {
3328
271k
  EnsureNonDecreasingFlushedFrontier(FlushedFrontier(), *values);
3329
271k
  UpdateFlushedFrontierNoSanityChecking(std::move(values));
3330
271k
}
3331
3332
892k
void VersionSet::MarkFileNumberUsedDuringRecovery(uint64_t number) {
3333
  // only called during recovery which is single threaded, so this works because
3334
  // there can't be concurrent calls
3335
892k
  if (next_file_number_.load(std::memory_order_relaxed) <= number) {
3336
5.04k
    next_file_number_.store(number + 1, std::memory_order_relaxed);
3337
5.04k
  }
3338
892k
}
3339
3340
namespace {
3341
3342
562k
CHECKED_STATUS AddEdit(const VersionEdit& edit, const DBOptions* db_options, log::Writer* log) {
3343
562k
  std::string record;
3344
562k
  if (!edit.AppendEncodedTo(&record)) {
3345
0
    return STATUS(Corruption,
3346
0
        "Unable to Encode VersionEdit:" + edit.DebugString(true));
3347
0
  }
3348
562k
  RLOG(InfoLogLevel::INFO_LEVEL, db_options->info_log,
3349
562k
      "Writing version edit: %s\n", edit.DebugString().c_str());
3350
562k
  return log->AddRecord(record);
3351
562k
}
3352
3353
} // namespace
3354
3355
277k
Status VersionSet::WriteSnapshot(log::Writer* log, UserFrontierPtr flushed_frontier_override) {
3356
  // TODO: Break up into multiple records to reduce memory usage on recovery?
3357
3358
  // WARNING: This method doesn't hold a mutex!
3359
3360
  // This is done without DB mutex lock held, but only within single-threaded
3361
  // LogAndApply. Column family manipulations can only happen within LogAndApply
3362
  // (the same single thread), so we're safe to iterate.
3363
281k
  for (auto cfd : *column_family_set_) {
3364
281k
    if (cfd->IsDropped()) {
3365
0
      continue;
3366
0
    }
3367
281k
    {
3368
      // Store column family info
3369
281k
      VersionEdit edit;
3370
281k
      if (cfd->GetID() != 0) {
3371
        // default column family is always there,
3372
        // no need to explicitly write it
3373
3.54k
        edit.AddColumnFamily(cfd->GetName());
3374
3.54k
        edit.SetColumnFamily(cfd->GetID());
3375
3.54k
      }
3376
281k
      edit.SetComparatorName(
3377
281k
          cfd->internal_comparator()->user_comparator()->Name());
3378
281k
      RETURN_NOT_OK(AddEdit(edit, db_options_, log));
3379
281k
    }
3380
3381
281k
    {
3382
      // Save files
3383
281k
      VersionEdit edit;
3384
281k
      edit.SetColumnFamily(cfd->GetID());
3385
3386
631k
      for (int level = 0; level < cfd->NumberLevels(); 
level++350k
) {
3387
350k
        for (const auto& f : cfd->current()->storage_info()->LevelFiles(level)) {
3388
28.9k
          edit.AddCleanedFile(level, *f);
3389
28.9k
        }
3390
350k
      }
3391
281k
      edit.SetLogNumber(cfd->GetLogNumber());
3392
281k
      if (flushed_frontier_override) {
3393
1.80k
        edit.flushed_frontier_ = flushed_frontier_override;
3394
279k
      } else {
3395
279k
        edit.flushed_frontier_ = flushed_frontier_;
3396
279k
      }
3397
281k
      RETURN_NOT_OK(AddEdit(edit, db_options_, log));
3398
281k
    }
3399
281k
  }
3400
3401
277k
  return Status::OK();
3402
277k
}
3403
3404
// TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
3405
// function is called repeatedly with consecutive pairs of slices. For example
3406
// if the slice list is [a, b, c, d] this function is called with arguments
3407
// (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
3408
// we avoid doing binary search for the keys b and c twice and instead somehow
3409
// maintain state of where they first appear in the files.
3410
uint64_t VersionSet::ApproximateSize(Version* v, const Slice& start,
3411
                                     const Slice& end, int start_level,
3412
14.1k
                                     int end_level) {
3413
  // pre-condition
3414
14.1k
  assert(v->cfd_->internal_comparator()->Compare(start, end) <= 0);
3415
3416
0
  uint64_t size = 0;
3417
14.1k
  const auto* vstorage = v->storage_info();
3418
14.1k
  end_level = end_level == -1
3419
14.1k
                  ? 
vstorage->num_non_empty_levels()14.0k
3420
14.1k
                  : 
std::min(end_level, vstorage->num_non_empty_levels())129
;
3421
3422
14.1k
  assert(start_level <= end_level);
3423
3424
42.2k
  for (int level = start_level; level < end_level; 
level++28.0k
) {
3425
28.0k
    const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
3426
28.0k
    if (!files_brief.num_files) {
3427
      // empty level, skip exploration
3428
13.4k
      continue;
3429
13.4k
    }
3430
3431
14.5k
    if (!level) {
3432
      // level 0 data is sorted order, handle the use case explicitly
3433
1.04k
      size += ApproximateSizeLevel0(v, files_brief, start, end);
3434
1.04k
      continue;
3435
1.04k
    }
3436
3437
13.5k
    assert(level > 0);
3438
0
    assert(files_brief.num_files > 0);
3439
3440
    // identify the file position for starting key
3441
0
    const uint64_t idx_start = FindFileInRange(
3442
13.5k
        *v->cfd_->internal_comparator(), files_brief, start,
3443
13.5k
        /*start=*/0, static_cast<uint32_t>(files_brief.num_files - 1));
3444
13.5k
    assert(idx_start < files_brief.num_files);
3445
3446
    // scan all files from the starting position until the ending position
3447
    // inferred from the sorted order
3448
27.1k
    for (uint64_t i = idx_start; i < files_brief.num_files; 
i++13.6k
) {
3449
14.5k
      uint64_t val;
3450
14.5k
      val = ApproximateSize(v, files_brief.files[i], end);
3451
14.5k
      if (!val) {
3452
        // the files after this will not have the range
3453
814
        break;
3454
814
      }
3455
3456
13.6k
      size += val;
3457
3458
13.6k
      if (i == idx_start) {
3459
        // subtract the bytes needed to be scanned to get to the starting
3460
        // key
3461
12.7k
        val = ApproximateSize(v, files_brief.files[i], start);
3462
12.7k
        assert(size >= val);
3463
0
        size -= val;
3464
12.7k
      }
3465
13.6k
    }
3466
13.5k
  }
3467
3468
14.1k
  return size;
3469
14.1k
}
3470
3471
uint64_t VersionSet::ApproximateSizeLevel0(Version* v,
3472
                                           const LevelFilesBrief& files_brief,
3473
                                           const Slice& key_start,
3474
1.04k
                                           const Slice& key_end) {
3475
  // level 0 files are not in sorted order, we need to iterate through
3476
  // the list to compute the total bytes that require scanning
3477
1.04k
  uint64_t size = 0;
3478
2.34k
  for (size_t i = 0; i < files_brief.num_files; 
i++1.29k
) {
3479
1.29k
    const uint64_t start = ApproximateSize(v, files_brief.files[i], key_start);
3480
1.29k
    const uint64_t end = ApproximateSize(v, files_brief.files[i], key_end);
3481
1.29k
    assert(end >= start);
3482
0
    size += end - start;
3483
1.29k
  }
3484
1.04k
  return size;
3485
1.04k
}
3486
3487
29.8k
uint64_t VersionSet::ApproximateSize(Version* v, const FdWithBoundaries& f, const Slice& key) {
3488
  // pre-condition
3489
29.8k
  assert(v);
3490
3491
0
  uint64_t result = 0;
3492
29.8k
  if (v->cfd_->internal_comparator()->Compare(f.largest.key, key) <= 0) {
3493
    // Entire file is before "key", so just add the file size
3494
2.05k
    result = f.fd.GetTotalFileSize();
3495
27.7k
  } else if (v->cfd_->internal_comparator()->Compare(f.smallest.key, key) > 0) {
3496
    // Entire file is after "key", so ignore
3497
10.8k
    result = 0;
3498
16.8k
  } else {
3499
    // "key" falls in the range for this table.  Add the
3500
    // approximate offset of "key" within the table.
3501
16.8k
    TableReader* table_reader_ptr;
3502
16.8k
    InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
3503
16.8k
        ReadOptions(), env_options_, v->cfd_->internal_comparator(), f.fd, Slice() /* filter */,
3504
16.8k
        &table_reader_ptr);
3505
16.8k
    if (table_reader_ptr != nullptr) {
3506
16.8k
      result = table_reader_ptr->ApproximateOffsetOf(key);
3507
16.8k
    }
3508
16.8k
    delete iter;
3509
16.8k
  }
3510
29.8k
  return result;
3511
29.8k
}
3512
3513
1.85M
void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
3514
  // pre-calculate space requirement
3515
1.85M
  int64_t total_files = 0;
3516
2.15M
  for (auto cfd : *column_family_set_) {
3517
2.15M
    Version* dummy_versions = cfd->dummy_versions();
3518
4.31M
    for (Version* v = dummy_versions->next_; v != dummy_versions;
3519
2.16M
         v = v->next_) {
3520
2.16M
      const auto* vstorage = v->storage_info();
3521
7.28M
      for (int level = 0; level < vstorage->num_levels(); 
level++5.12M
) {
3522
5.12M
        total_files += vstorage->LevelFiles(level).size();
3523
5.12M
      }
3524
2.16M
    }
3525
2.15M
  }
3526
3527
  // just one time extension to the right size
3528
1.85M
  live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
3529
3530
2.15M
  for (auto cfd : *column_family_set_) {
3531
2.15M
    auto* current = cfd->current();
3532
2.15M
    bool found_current = false;
3533
2.15M
    Version* dummy_versions = cfd->dummy_versions();
3534
4.31M
    for (Version* v = dummy_versions->next_; v != dummy_versions;
3535
2.16M
         v = v->next_) {
3536
2.16M
      v->AddLiveFiles(live_list);
3537
2.16M
      if (v == current) {
3538
2.15M
        found_current = true;
3539
2.15M
      }
3540
2.16M
    }
3541
2.15M
    if (!found_current && 
current != nullptr0
) {
3542
      // Should never happen unless it is a bug.
3543
0
      assert(false);
3544
0
      current->AddLiveFiles(live_list);
3545
0
    }
3546
2.15M
  }
3547
1.85M
}
3548
3549
11.4k
InternalIterator* VersionSet::MakeInputIterator(Compaction* c) {
3550
11.4k
  auto cfd = c->column_family_data();
3551
11.4k
  ReadOptions read_options;
3552
11.4k
  read_options.verify_checksums =
3553
11.4k
    c->mutable_cf_options()->verify_checksums_in_compaction;
3554
11.4k
  read_options.fill_cache = false;
3555
11.4k
  if (c->ShouldFormSubcompactions()) {
3556
1.07k
    read_options.total_order_seek = true;
3557
1.07k
  }
3558
3559
  // Level-0 files have to be merged together.  For other levels,
3560
  // we will make a concatenating iterator per level.
3561
  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
3562
11.4k
  const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
3563
9.33k
                                              c->num_input_levels() - 1
3564
11.4k
                                        : 
c->num_input_levels()2.09k
);
3565
11.4k
  InternalIterator** list = new InternalIterator* [space];
3566
11.4k
  size_t num = 0;
3567
43.2k
  for (size_t which = 0; which < c->num_input_levels(); 
which++31.8k
) {
3568
31.8k
    if (c->input_levels(which)->num_files != 0) {
3569
19.0k
      if (c->level(which) == 0) {
3570
9.33k
        const LevelFilesBrief* flevel = c->input_levels(which);
3571
37.8k
        for (size_t i = 0; i < flevel->num_files; 
i++28.4k
) {
3572
28.4k
          FileMetaData* fmd = c->input(which, i);
3573
28.4k
          if (c->input(which, i)->delete_after_compaction()) {
3574
23
            RLOG(
3575
23
                InfoLogLevel::INFO_LEVEL, db_options_->info_log,
3576
23
                yb::Format(
3577
23
                    "[$0] File marked for deletion, will be removed after compaction. file: $1",
3578
23
                    c->column_family_data()->GetName(), fmd->ToString()).c_str());
3579
23
            RecordTick(cfd->ioptions()->statistics, COMPACTION_FILES_FILTERED);
3580
23
            continue;
3581
23
          }
3582
28.4k
          RecordTick(cfd->ioptions()->statistics, COMPACTION_FILES_NOT_FILTERED);
3583
28.4k
          list[num++] = cfd->table_cache()->NewIterator(
3584
28.4k
              read_options, env_options_compactions_,
3585
28.4k
              cfd->internal_comparator(), flevel->files[i].fd, flevel->files[i].user_filter_data,
3586
28.4k
              nullptr, nullptr /* no per level latency histogram*/,
3587
28.4k
              true /* for compaction */);
3588
28.4k
        }
3589
9.70k
      } else {
3590
        // Create concatenating iterator for the files from this level
3591
9.70k
        list[num++] = NewTwoLevelIterator(
3592
9.70k
            new LevelFileIteratorState(
3593
9.70k
                cfd->table_cache(), read_options, env_options_,
3594
9.70k
                cfd->internal_comparator(),
3595
9.70k
                nullptr /* no per level latency histogram */,
3596
9.70k
                true /* for_compaction */, false /* prefix enabled */,
3597
9.70k
                false /* skip_filters */),
3598
9.70k
            new LevelFileNumIterator(*cfd->internal_comparator(),
3599
9.70k
                                     c->input_levels(which)));
3600
9.70k
      }
3601
19.0k
    }
3602
31.8k
  }
3603
11.4k
  assert(num <= space);
3604
0
  InternalIterator* result =
3605
11.4k
      NewMergingIterator(c->column_family_data()->internal_comparator().get(), list,
3606
11.4k
                         static_cast<int>(num));
3607
11.4k
  delete[] list;
3608
11.4k
  return result;
3609
11.4k
}
3610
3611
// verify that the files listed in this compaction are present
3612
// in the current version
3613
11.2k
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
3614
11.2k
#ifndef NDEBUG
3615
11.2k
  Version* version = c->column_family_data()->current();
3616
11.2k
  const VersionStorageInfo* vstorage = version->storage_info();
3617
11.2k
  if (c->input_version_number() != version->GetVersionNumber()) {
3618
4.35k
    RLOG(
3619
4.35k
        InfoLogLevel::INFO_LEVEL, db_options_->info_log,
3620
4.35k
        yb::Format(
3621
4.35k
            "[$0] compaction output being applied to a different base version ($1) from input "
3622
4.35k
            "version ($2)",
3623
4.35k
            c->column_family_data()->GetName(), version->GetVersionNumber(),
3624
4.35k
            c->input_version_number())
3625
4.35k
            .c_str());
3626
3627
4.35k
    if (vstorage->compaction_style_ == kCompactionStyleLevel &&
3628
4.35k
        
c->start_level() == 01.94k
&&
c->num_input_levels() > 2U1.42k
) {
3629
      // We are doing a L0->base_level compaction. The assumption is if
3630
      // base level is not L1, levels from L1 to base_level - 1 is empty.
3631
      // This is ensured by having one compaction from L0 going on at the
3632
      // same time in level-based compaction. So that during the time, no
3633
      // compaction/flush can put files to those levels.
3634
0
      for (int l = c->start_level() + 1; l < c->output_level(); l++) {
3635
0
        if (vstorage->NumLevelFiles(l) != 0) {
3636
0
          return false;
3637
0
        }
3638
0
      }
3639
0
    }
3640
4.35k
  }
3641
3642
42.8k
  
for (size_t input = 0; 11.2k
input < c->num_input_levels();
++input31.5k
) {
3643
31.5k
    int level = c->level(input);
3644
79.1k
    for (size_t i = 0; i < c->num_input_files(input); 
++i47.6k
) {
3645
47.6k
      const auto& fd = c->input(input, i)->fd;
3646
47.6k
      uint64_t number = fd.GetNumber();
3647
47.6k
      bool found = false;
3648
268k
      for (size_t j = 0; j < vstorage->files_[level].size(); 
j++220k
) {
3649
268k
        FileMetaData* f = vstorage->files_[level][j];
3650
268k
        if (f->fd.GetNumber() == number) {
3651
47.6k
          found = true;
3652
47.6k
          break;
3653
47.6k
        }
3654
268k
      }
3655
47.6k
      if (!found) {
3656
0
        RLOG(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
3657
0
            yb::Format("[$0] compaction input file $1 not found in current version",
3658
0
            c->column_family_data()->GetName(), fd).c_str());
3659
0
        return false;  // input files non existent in current version
3660
0
      }
3661
47.6k
    }
3662
31.5k
  }
3663
11.2k
#endif
3664
11.2k
  return true;     // everything good
3665
11.2k
}
3666
3667
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
3668
                                      FileMetaData** meta,
3669
295
                                      ColumnFamilyData** cfd) {
3670
297
  for (auto cfd_iter : *column_family_set_) {
3671
297
    Version* version = cfd_iter->current();
3672
297
    const auto* vstorage = version->storage_info();
3673
519
    for (int level = 0; level < vstorage->num_levels(); 
level++222
) {
3674
1.03k
      for (const auto& file : vstorage->LevelFiles(level)) {
3675
1.03k
        if (file->fd.GetNumber() == number) {
3676
294
          *meta = file;
3677
294
          *filelevel = level;
3678
294
          *cfd = cfd_iter;
3679
294
          return Status::OK();
3680
294
        }
3681
1.03k
      }
3682
516
    }
3683
297
  }
3684
1
  return STATUS(NotFound, "File not present in any level");
3685
295
}
3686
3687
21.1M
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
3688
21.1M
  for (auto cfd : *column_family_set_) {
3689
21.1M
    if (cfd->IsDropped()) {
3690
0
      continue;
3691
0
    }
3692
42.4M
    
for (int level = 0; 21.1M
level < cfd->NumberLevels();
level++21.3M
) {
3693
21.3M
      for (const auto& file :
3694
21.3M
           cfd->current()->storage_info()->LevelFiles(level)) {
3695
384k
        LiveFileMetaData filemetadata;
3696
384k
        filemetadata.column_family_name = cfd->GetName();
3697
384k
        uint32_t path_id = file->fd.GetPathId();
3698
384k
        if (
path_id < db_options_->db_paths.size()384k
) {
3699
384k
          filemetadata.db_path = db_options_->db_paths[path_id].path;
3700
18.4E
        } else {
3701
18.4E
          assert(!db_options_->db_paths.empty());
3702
0
          filemetadata.db_path = db_options_->db_paths.back().path;
3703
18.4E
        }
3704
0
        filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
3705
384k
        filemetadata.level = level;
3706
384k
        filemetadata.total_size = file->fd.GetTotalFileSize();
3707
384k
        filemetadata.base_size = file->fd.GetBaseFileSize();
3708
        // TODO: replace base_size with an accurate metadata size for
3709
        // uncompressed data. Look into: BlockBasedTableBuilder
3710
384k
        filemetadata.uncompressed_size = filemetadata.base_size +
3711
384k
            file->raw_key_size + file->raw_value_size;
3712
384k
        filemetadata.smallest = ConvertBoundaryValues(file->smallest);
3713
384k
        filemetadata.largest = ConvertBoundaryValues(file->largest);
3714
384k
        filemetadata.imported = file->imported;
3715
384k
        metadata->push_back(filemetadata);
3716
384k
      }
3717
21.3M
    }
3718
21.1M
  }
3719
21.1M
}
3720
3721
void VersionSet::GetObsoleteFiles(const FileNumbersProvider& pending_outputs,
3722
                                  std::vector<FileMetaData*>* files,
3723
1.85M
                                  std::vector<std::string>* manifest_filenames) {
3724
1.85M
  assert(manifest_filenames->empty());
3725
0
  obsolete_manifests_.swap(*manifest_filenames);
3726
1.85M
  std::vector<FileMetaData*> pending_files;
3727
1.85M
  for (auto f : obsolete_files_) {
3728
61.9k
    if (!pending_outputs.HasFileNumber(f->fd.GetNumber())) {
3729
61.7k
      files->push_back(f);
3730
61.7k
    } else {
3731
204
      pending_files.push_back(f);
3732
204
    }
3733
61.9k
  }
3734
1.85M
  obsolete_files_.swap(pending_files);
3735
1.85M
}
3736
3737
ColumnFamilyData* VersionSet::CreateColumnFamily(
3738
441k
    const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
3739
441k
  assert(edit->column_family_name_);
3740
3741
0
  Version* dummy_versions = new Version(nullptr, this);
3742
  // Ref() dummy version once so that later we can call Unref() to delete it
3743
  // by avoiding calling "delete" explicitly (~Version is private)
3744
441k
  dummy_versions->Ref();
3745
441k
  auto new_cfd = column_family_set_->CreateColumnFamily(
3746
441k
      *edit->column_family_name_,
3747
441k
      edit->column_family_,
3748
441k
      dummy_versions,
3749
441k
      cf_options);
3750
3751
441k
  Version* v = new Version(new_cfd, this, current_version_number_++);
3752
3753
  // Fill level target base information.
3754
441k
  v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
3755
441k
                                        *new_cfd->GetLatestMutableCFOptions());
3756
441k
  AppendVersion(new_cfd, v);
3757
  // GetLatestMutableCFOptions() is safe here without mutex since the
3758
  // cfd is not available to client
3759
441k
  new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
3760
441k
                             LastSequence());
3761
441k
  new_cfd->SetLogNumber(edit->log_number_.get_value_or(0));
3762
441k
  return new_cfd;
3763
441k
}
3764
3765
983k
void VersionSet::UnrefFile(ColumnFamilyData* cfd, FileMetaData* f) {
3766
983k
  if (f->Unref(cfd->table_cache())) {
3767
98.1k
    obsolete_files_.push_back(f);
3768
98.1k
  }
3769
983k
}
3770
3771
5
uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
3772
5
  uint64_t count = 0;
3773
14
  for (Version* v = dummy_versions->next_; v != dummy_versions; 
v = v->next_9
) {
3774
9
    count++;
3775
9
  }
3776
5
  return count;
3777
5
}
3778
3779
18
uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
3780
18
  std::unordered_set<uint64_t> unique_files;
3781
18
  uint64_t total_files_size = 0;
3782
50
  for (Version* v = dummy_versions->next_; v != dummy_versions; 
v = v->next_32
) {
3783
32
    VersionStorageInfo* storage_info = v->storage_info();
3784
256
    for (int level = 0; level < storage_info->num_levels_; 
level++224
) {
3785
224
      for (const auto& file_meta : storage_info->LevelFiles(level)) {
3786
86
        if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
3787
86
            unique_files.end()) {
3788
66
          unique_files.insert(file_meta->fd.packed_number_and_path_id);
3789
66
          total_files_size += file_meta->fd.GetTotalFileSize();
3790
66
        }
3791
86
      }
3792
224
    }
3793
32
  }
3794
18
  return total_files_size;
3795
18
}
3796
3797
void VersionSet::EnsureNonDecreasingLastSequence(
3798
    SequenceNumber prev_last_seq,
3799
31.3M
    SequenceNumber new_last_seq) {
3800
31.3M
  if (new_last_seq < prev_last_seq) {
3801
0
    LOG(DFATAL) << "New last sequence id " << new_last_seq << " is lower than "
3802
0
                << "the previous last sequence " << prev_last_seq;
3803
0
  }
3804
31.3M
}
3805
3806
void VersionSet::EnsureNonDecreasingFlushedFrontier(
3807
    const UserFrontier* prev_value,
3808
271k
    const UserFrontier& new_value) {
3809
271k
  if (!prev_value) {
3810
262k
    return;
3811
262k
  }
3812
9.60k
  if (!prev_value->IsUpdateValid(new_value, UpdateUserValueType::kLargest)) {
3813
0
    LOG(DFATAL) << "Attempt to decrease flushed frontier " << prev_value->ToString() << " to "
3814
0
                << new_value.ToString();
3815
0
  }
3816
9.60k
}
3817
3818
277k
void VersionSet::UpdateFlushedFrontierNoSanityChecking(UserFrontierPtr value) {
3819
277k
  UpdateUserFrontier(&flushed_frontier_, std::move(value), UpdateUserValueType::kLargest);
3820
277k
}
3821
3822
}  // namespace rocksdb