YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_impl_experimental.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/db_impl.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
#include <vector>
32
33
#include "yb/rocksdb/db/column_family.h"
34
#include "yb/rocksdb/db/job_context.h"
35
#include "yb/rocksdb/db/version_set.h"
36
#include "yb/rocksdb/status.h"
37
38
namespace rocksdb {
39
40
#ifndef ROCKSDB_LITE
41
Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
42
6
                                   const Slice* begin, const Slice* end) {
43
6
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
44
6
  auto cfd = cfh->cfd();
45
6
  InternalKey start_key, end_key;
46
6
  if (begin != nullptr) {
47
2
    start_key = InternalKey::MaxPossibleForUserKey(*begin);
48
2
  }
49
6
  if (end != nullptr) {
50
2
    end_key = InternalKey::MinPossibleForUserKey(*end);
51
2
  }
52
6
  {
53
6
    InstrumentedMutexLock l(&mutex_);
54
6
    auto vstorage = cfd->current()->storage_info();
55
18
    for (int level = 0; level < vstorage->num_non_empty_levels() - 1; ++level) {
56
12
      std::vector<FileMetaData*> inputs;
57
12
      vstorage->GetOverlappingInputs(
58
8
          level, begin == nullptr ? nullptr : &start_key,
59
8
          end == nullptr ? nullptr : &end_key, &inputs);
60
12
      for (auto f : inputs) {
61
12
        f->marked_for_compaction = true;
62
12
      }
63
12
    }
64
    // Since we have some more files to compact, we should also recompute
65
    // compaction score
66
6
    vstorage->ComputeCompactionScore(*cfd->GetLatestMutableCFOptions(),
67
6
                                     CompactionOptionsFIFO());
68
6
    SchedulePendingCompaction(cfd);
69
6
    MaybeScheduleFlushOrCompaction();
70
6
  }
71
6
  return Status::OK();
72
6
}
73
74
3
Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
75
3
  assert(column_family);
76
77
3
  if (target_level < 1) {
78
0
    RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
79
0
        "PromoteL0 FAILED. Invalid target level %d\n", target_level);
80
0
    return STATUS(InvalidArgument, "Invalid target level");
81
0
  }
82
83
3
  Status status;
84
3
  VersionEdit edit;
85
3
  JobContext job_context(next_job_id_.fetch_add(1), true);
86
3
  {
87
3
    InstrumentedMutexLock l(&mutex_);
88
3
    auto* cfd = static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
89
3
    const auto* vstorage = cfd->current()->storage_info();
90
91
3
    if (target_level >= vstorage->num_levels()) {
92
0
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
93
0
          "PromoteL0 FAILED. Target level %d does not exist\n", target_level);
94
0
      job_context.Clean();
95
0
      return STATUS(InvalidArgument, "Target level does not exist");
96
0
    }
97
98
    // Sort L0 files by range.
99
3
    const InternalKeyComparator* icmp = cfd->internal_comparator().get();
100
3
    auto l0_files = vstorage->LevelFiles(0);
101
3
    std::sort(l0_files.begin(), l0_files.end(),
102
6
              [icmp](FileMetaData* f1, FileMetaData* f2) {
103
6
                return icmp->Compare(f1->largest.key, f2->largest.key) < 0;
104
6
              });
105
106
    // Check that no L0 file is being compacted and that they have
107
    // non-overlapping ranges.
108
9
    for (size_t i = 0; i < l0_files.size(); ++i) {
109
7
      auto f = l0_files[i];
110
7
      if (f->being_compacted) {
111
0
        RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
112
0
            "PromoteL0 FAILED. File %" PRIu64 " being compacted\n",
113
0
            f->fd.GetNumber());
114
0
        job_context.Clean();
115
0
        return STATUS(InvalidArgument, "PromoteL0 called during L0 compaction");
116
0
      }
117
118
7
      if (i == 0) continue;
119
4
      auto prev_f = l0_files[i - 1];
120
4
      if (icmp->Compare(prev_f->largest.key, f->smallest.key) >= 0) {
121
1
        RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
122
1
            "PromoteL0 FAILED. Files %" PRIu64 " and %" PRIu64
123
1
            " have overlapping ranges\n",
124
1
            prev_f->fd.GetNumber(), f->fd.GetNumber());
125
1
        job_context.Clean();
126
1
        return STATUS(InvalidArgument, "L0 has overlapping files");
127
1
      }
128
4
    }
129
130
    // Check that all levels up to target_level are empty.
131
4
    for (int level = 1; level <= target_level; ++level) {
132
3
      if (vstorage->NumLevelFiles(level) > 0) {
133
1
        RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
134
1
            "PromoteL0 FAILED. Level %d not empty\n", level);
135
1
        job_context.Clean();
136
1
        return STATUS(InvalidArgument,
137
1
            "All levels up to target_level "
138
1
            "must be empty");
139
1
      }
140
3
    }
141
142
1
    edit.SetColumnFamily(cfd->GetID());
143
4
    for (const auto& f : l0_files) {
144
4
      edit.DeleteFile(0, f->fd.GetNumber());
145
4
      edit.AddCleanedFile(target_level, *f);
146
4
    }
147
148
1
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
149
1
                                    &edit, &mutex_, directories_.GetDbDir());
150
1
    if (status.ok()) {
151
1
      InstallSuperVersionAndScheduleWorkWrapper(
152
1
          cfd, &job_context, *cfd->GetLatestMutableCFOptions());
153
1
    }
154
1
  }  // lock released here
155
1
  LogFlush(db_options_.info_log);
156
1
  job_context.Clean();
157
158
1
  return status;
159
2
}
160
#endif  // ROCKSDB_LITE
161
162
}  // namespace rocksdb