YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/compacted_db_impl.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
#include "yb/rocksdb/db/compacted_db_impl.h"
23
#include "yb/rocksdb/db/db_impl.h"
24
#include "yb/rocksdb/db/version_set.h"
25
26
#include "yb/rocksdb/table/get_context.h"
27
#include "yb/rocksdb/table/table_reader.h"
28
29
namespace rocksdb {
30
31
extern void MarkKeyMayExist(void* arg);
32
extern bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
33
                      const Slice& v, bool hit_and_return);
34
35
// Forwards both arguments unchanged to the DBImpl base class. No other state is set up here;
// cfd_, version_, user_comparator_ and files_ are assigned later by Init().
CompactedDBImpl::CompactedDBImpl(const DBOptions& options, const std::string& dbname)
    : DBImpl(options, dbname) {}
39
40
4
// Nothing to release explicitly; member and base-class destructors do all the work.
CompactedDBImpl::~CompactedDBImpl() = default;
42
43
18
size_t CompactedDBImpl::FindFile(const Slice& key) {
44
18
  size_t left = 0;
45
18
  size_t right = files_.num_files - 1;
46
49
  while (left < right) {
47
31
    size_t mid = (left + right) >> 1;
48
31
    const FdWithBoundaries& f = files_.files[mid];
49
31
    if (user_comparator_->Compare(ExtractUserKey(f.largest.key), key) < 0) {
50
      // Key at "mid.largest" is < "target".  Therefore all
51
      // files at or before "mid" are uninteresting.
52
9
      left = mid + 1;
53
22
    } else {
54
      // Key at "mid.largest" is >= "target".  Therefore all files
55
      // after "mid" are uninteresting.
56
22
      right = mid;
57
22
    }
58
31
  }
59
18
  return right;
60
18
}
61
62
// Point lookup of `key`. The column family argument is ignored. The single candidate file is
// selected with FindFile() and queried through its table reader at kMaxSequenceNumber.
//
// Returns:
//   - the table reader's error status if the read itself fails,
//   - OK if the lookup context ends in the kFound state (the context was handed `value` as its
//     output buffer),
//   - NotFound (with an empty message) otherwise.
Status CompactedDBImpl::Get(const ReadOptions& options,
                            ColumnFamilyHandle*,
                            const Slice& key,
                            std::string* value) {
  GetContext lookup_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound,
                            key, value, nullptr, nullptr, nullptr);
  LookupKey lookup_key(key, kMaxSequenceNumber);

  TableReader* const reader = files_.files[FindFile(key)].fd.table_reader;
  RETURN_NOT_OK(reader->Get(options, lookup_key.internal_key(), &lookup_context));

  if (lookup_context.State() != GetContext::kFound) {
    return STATUS(NotFound, "");
  }
  return Status::OK();
}
75
76
std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
77
    const std::vector<ColumnFamilyHandle*>&,
78
1
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
79
1
  autovector<TableReader*, 16> reader_list;
80
6
  for (const auto& key : keys) {
81
6
    const FdWithBoundaries& f = files_.files[FindFile(key)];
82
6
    if (user_comparator_->Compare(key, ExtractUserKey(f.smallest.key)) < 0) {
83
0
      reader_list.push_back(nullptr);
84
6
    } else {
85
6
      LookupKey lkey(key, kMaxSequenceNumber);
86
6
      f.fd.table_reader->Prepare(lkey.internal_key());
87
6
      reader_list.push_back(f.fd.table_reader);
88
6
    }
89
6
  }
90
1
  std::vector<Status> statuses(keys.size(), STATUS(NotFound, ""));
91
1
  values->resize(keys.size());
92
1
  int idx = 0;
93
6
  for (auto* r : reader_list) {
94
6
    if (r != nullptr) {
95
6
      GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
96
6
                             GetContext::kNotFound, keys[idx], &(*values)[idx],
97
6
                             nullptr, nullptr, nullptr);
98
6
      LookupKey lkey(keys[idx], kMaxSequenceNumber);
99
6
      auto status = r->Get(options, lkey.internal_key(), &get_context);
100
6
      if (!status.ok()) {
101
0
        statuses[idx] = status;
102
6
      } else if (get_context.State() == GetContext::kFound) {
103
3
        statuses[idx] = Status::OK();
104
3
      }
105
6
    }
106
6
    ++idx;
107
6
  }
108
1
  return statuses;
109
1
}
110
111
4
// Recovers the default column family in read-only mode and picks the single level whose files
// will serve all reads (stored in files_).
//
// Accepted layouts:
//   - exactly one file in L0 and no other non-empty level, or
//   - no files in L0 and files only in the last non-empty level.
//
// Returns the Recover() error if recovery fails, NotSupported for any other layout (including
// a database with no files at all), and OK once files_ has been assigned.
Status CompactedDBImpl::Init(const Options& options) {
  {
    // Declared before the lock so that, by reverse destruction order, the previous
    // SuperVersion is destroyed after the lock object goes out of scope.
    std::unique_ptr<SuperVersion> previous_superversion;
    InstrumentedMutexLock guard(&mutex_);
    ColumnFamilyDescriptor default_cf(kDefaultColumnFamilyName,
                                      ColumnFamilyOptions(options));
    RETURN_NOT_OK(Recover({ default_cf }, true /* read only */, false));
    cfd_ = down_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
    previous_superversion = cfd_->InstallSuperVersion(new SuperVersion(), &mutex_);
  }

  version_ = cfd_->GetSuperVersion()->current;
  user_comparator_ = cfd_->user_comparator();

  auto* storage = version_->storage_info();
  if (storage->num_non_empty_levels() == 0) {
    return STATUS(NotSupported, "no file exists");
  }

  // L0 may hold at most one file, and only when it is the sole non-empty level.
  const LevelFilesBrief& level0 = storage->LevelFilesBrief(0);
  if (level0.num_files > 1) {
    return STATUS(NotSupported, "L0 contain more than 1 file");
  }
  if (level0.num_files == 1) {
    if (storage->num_non_empty_levels() > 1) {
      return STATUS(NotSupported, "Both L0 and other level contain files");
    }
    files_ = level0;
    return Status::OK();
  }

  // L0 is empty: every level strictly between L0 and the last non-empty level must be empty.
  for (int level = 1; level < storage->num_non_empty_levels() - 1; ++level) {
    if (storage->LevelFilesBrief(level).num_files > 0) {
      return STATUS(NotSupported, "Other levels also contain files");
    }
  }

  const int last_level = storage->num_non_empty_levels() - 1;
  const LevelFilesBrief& bottom = storage->LevelFilesBrief(last_level);
  if (bottom.num_files > 0) {
    files_ = bottom;
    return Status::OK();
  }
  return STATUS(NotSupported, "no file exists");
}
153
154
// Opens `dbname` as a CompactedDBImpl.
//
// `*dbptr` is set to nullptr up front and is only overwritten when the open succeeds, in which
// case ownership of the new instance passes to the caller.
//
// Returns:
//   - InvalidArgument if options.max_open_files is not -1 or a merge operator is configured,
//   - the status from Init() if initialization fails (the partially built instance is
//     destroyed),
//   - OK on success.
Status CompactedDBImpl::Open(const Options& options,
                             const std::string& dbname,
                             DB** dbptr) {
  *dbptr = nullptr;

  if (options.max_open_files != -1) {
    return STATUS(InvalidArgument, "require max_open_files = -1");
  }
  if (options.merge_operator.get() != nullptr) {
    return STATUS(InvalidArgument, "merge operator is not supported");
  }

  DBOptions db_options(options);
  std::unique_ptr<CompactedDBImpl> compacted_db(new CompactedDBImpl(db_options, dbname));

  Status init_status = compacted_db->Init(options);
  if (!init_status.ok()) {
    // compacted_db goes out of scope here and deletes the instance.
    return init_status;
  }

  RLOG(INFO_LEVEL, compacted_db->db_options_.info_log,
       "Opened the db as fully compacted mode");
  LogFlush(compacted_db->db_options_.info_log);
  *dbptr = compacted_db.release();
  return init_status;
}
175
176
}   // namespace rocksdb
177
#endif  // ROCKSDB_LITE