YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/version_edit.cc
Line | Count | Source
   1 |       | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
   2 |       | //  This source code is licensed under the BSD-style license found in the
   3 |       | //  LICENSE file in the root directory of this source tree. An additional grant
   4 |       | //  of patent rights can be found in the PATENTS file in the same directory.
   5 |       | //
   6 |       | // The following only applies to changes made to this file as part of YugaByte development.
   7 |       | //
   8 |       | // Portions Copyright (c) YugaByte, Inc.
   9 |       | //
  10 |       | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  11 |       | // in compliance with the License.  You may obtain a copy of the License at
  12 |       | //
  13 |       | // http://www.apache.org/licenses/LICENSE-2.0
  14 |       | //
  15 |       | // Unless required by applicable law or agreed to in writing, software distributed under the License
  16 |       | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  17 |       | // or implied.  See the License for the specific language governing permissions and limitations
  18 |       | // under the License.
  19 |       | //
  20 |       | // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  21 |       | // Use of this source code is governed by a BSD-style license that can be
  22 |       | // found in the LICENSE file. See the AUTHORS file for names of contributors.
  23 |       |
  24 |       | #include "yb/rocksdb/db/version_edit.h"
  25 |       |
  26 |       | #include "yb/rocksdb/db/version_edit.pb.h"
  27 |       | #include "yb/rocksdb/db/version_set.h"
  28 |       | #include "yb/rocksdb/metadata.h"
  29 |       | #include "yb/rocksdb/util/coding.h"
  30 |       |
  31 |       | #include "yb/util/flag_tags.h"
  32 |       | #include "yb/util/logging.h"
  33 |       | #include "yb/util/slice.h"
  34 |       | #include "yb/util/status_format.h"
  35 |       |
  36 |       | DEFINE_bool(use_per_file_metadata_for_flushed_frontier, false,
  37 |       |             "Allows taking per-file metadata in version edits into account when computing the "
  38 |       |             "flushed frontier.");
  39 |       | TAG_FLAG(use_per_file_metadata_for_flushed_frontier, hidden);
  40 |       | TAG_FLAG(use_per_file_metadata_for_flushed_frontier, advanced);
  41 |       |
  42 |       | namespace rocksdb {
  43 |       |
  44 |  337k | uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
  45 |  337k |   assert(number <= kFileNumberMask);
  46 |  337k |   return number | (path_id * (kFileNumberMask + 1));
  47 |  337k | }
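
Note: PackFileNumberAndPathId stores the path id in the bits above kFileNumberMask, so both values can later be recovered from the single packed word. The standalone sketch below illustrates the arithmetic; the mask constant is an assumption chosen to mirror kFileNumberMask (low bits reserved for the file number), not a quote of the real definition in version_edit.h.

    // Standalone illustration of the packing scheme above (hypothetical mask value).
    #include <cassert>
    #include <cstdint>

    constexpr uint64_t kMaskForExample = (1ULL << 62) - 1;  // assumed analogue of kFileNumberMask

    uint64_t PackForExample(uint64_t number, uint64_t path_id) {
      assert(number <= kMaskForExample);
      return number | (path_id * (kMaskForExample + 1));  // same arithmetic as PackFileNumberAndPathId
    }

    int main() {
      const uint64_t packed = PackForExample(/* number = */ 12345, /* path_id = */ 2);
      assert((packed & kMaskForExample) == 12345);   // low bits give back the file number
      assert(packed / (kMaskForExample + 1) == 2);   // high bits give back the path id
      return 0;
    }
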
  48 |       |
  49 |       | FileMetaData::FileMetaData()
  50 |       |     : refs(0),
  51 |       |     being_compacted(false),
  52 |       |     table_reader_handle(nullptr),
  53 |       |     compensated_file_size(0),
  54 |       |     num_entries(0),
  55 |       |     num_deletions(0),
  56 |       |     raw_key_size(0),
  57 |       |     raw_value_size(0),
  58 |       |     init_stats_from_file(false),
  59 |  210k |     marked_for_compaction(false) {
  60 |  210k |   smallest.seqno = kMaxSequenceNumber;
  61 |  210k |   largest.seqno = 0;
  62 |  210k | }
  63 |       |
  64 | 70.9M | void FileMetaData::UpdateBoundaries(InternalKey key, const FileBoundaryValuesBase& source) {
  65 | 70.9M |   largest.key = std::move(key);
  66 | 70.9M |   if (smallest.key.empty()) {
  67 | 49.9k |     smallest.key = largest.key;
  68 | 49.9k |   }
  69 | 70.9M |   UpdateBoundariesExceptKey(source, UpdateBoundariesType::kAll);
  70 | 70.9M | }
  71 |       |
  72 | 1.07M | bool FileMetaData::Unref(TableCache* table_cache) {
  73 | 1.07M |   refs--;
  74 | 1.07M |   if (refs <= 0) {
  75 |  104k |     if (table_reader_handle) {
  76 | 1.19k |       DCHECK_ONLY_NOTNULL(table_cache);
  77 | 1.19k |       table_cache->ReleaseHandle(table_reader_handle);
  78 | 1.19k |       table_reader_handle = nullptr;
  79 | 1.19k |     }
  80 |  104k |     return true;
  81 |  967k |   } else {
  82 |  967k |     return false;
  83 |  967k |   }
  84 | 1.07M | }
  85 |       |
  86 |       | void FileMetaData::UpdateBoundariesExceptKey(const FileBoundaryValuesBase& source,
  87 | 71.2M |                                              UpdateBoundariesType type) {
  88 | 71.2M |   if (type != UpdateBoundariesType::kLargest) {
  89 | 71.0M |     smallest.seqno = std::min(smallest.seqno, source.seqno);
  90 | 71.0M |     UserFrontier::Update(
  91 | 71.0M |         source.user_frontier.get(), UpdateUserValueType::kSmallest, &smallest.user_frontier);
  92 |       |
  93 | 41.2M |     for (const auto& user_value : source.user_values) {
  94 | 41.2M |       UpdateUserValue(&smallest.user_values, user_value, UpdateUserValueType::kSmallest);
  95 | 41.2M |     }
  96 | 71.0M |   }
  97 | 71.2M |   if (type != UpdateBoundariesType::kSmallest) {
  98 | 71.1M |     largest.seqno = std::max(largest.seqno, source.seqno);
  99 | 71.1M |     UserFrontier::Update(
 100 | 71.1M |         source.user_frontier.get(), UpdateUserValueType::kLargest, &largest.user_frontier);
 101 |       |
 102 | 41.5M |     for (const auto& user_value : source.user_values) {
 103 | 41.5M |       UpdateUserValue(&largest.user_values, user_value, UpdateUserValueType::kLargest);
 104 | 41.5M |     }
 105 | 71.1M |   }
 106 | 71.2M | }
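
Note: UpdateBoundaries and UpdateBoundariesExceptKey implement a simple merge rule: the smallest boundary keeps the minimum sequence number seen (and the "smallest" variant of each user value), while the largest boundary keeps the maximum. The self-contained sketch below models that rule with plain integers; it deliberately does not use the real FileMetaData or FileBoundaryValues types.

    // Simplified model of the smallest/largest merge above (not the real API).
    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <limits>

    struct Boundaries {
      // Mirrors FileMetaData's constructor: smallest starts at the maximum seqno, largest at 0.
      uint64_t smallest_seqno = std::numeric_limits<uint64_t>::max();
      uint64_t largest_seqno = 0;
    };

    void UpdateBoundaries(Boundaries* b, uint64_t seqno) {
      b->smallest_seqno = std::min(b->smallest_seqno, seqno);
      b->largest_seqno = std::max(b->largest_seqno, seqno);
    }

    int main() {
      Boundaries b;
      for (uint64_t seqno : {42ULL, 7ULL, 19ULL}) {
        UpdateBoundaries(&b, seqno);
      }
      assert(b.smallest_seqno == 7 && b.largest_seqno == 42);
      return 0;
    }
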
 107 |       |
 108 | 3.56M | Slice FileMetaData::UserFilter() const {
 109 | 3.42M |   return largest.user_frontier ? largest.user_frontier->Filter() : Slice();
 110 | 3.56M | }
 111 |       |
 112 | 47.0k | std::string FileMetaData::FrontiersToString() const {
 113 | 47.0k |   return yb::Format("frontiers: { smallest: $0 largest: $1 }",
 114 | 43.8k |       smallest.user_frontier ? smallest.user_frontier->ToString() : "none",
 115 | 43.8k |       largest.user_frontier ? largest.user_frontier->ToString() : "none");
 116 | 47.0k | }
 117 |       |
 118 |   205 | std::string FileMetaData::ToString() const {
 119 |   205 |   return yb::Format("{ number: $0 total_size: $1 base_size: $2 refs: $3 "
 120 |   205 |                     "being_compacted: $4 smallest: $5 largest: $6 }",
 121 |   205 |                     fd.GetNumber(), fd.GetTotalFileSize(), fd.GetBaseFileSize(), refs,
 122 |   205 |                     being_compacted, smallest, largest);
 123 |   205 | }
 124 |       |
 125 | 2.57M | void VersionEdit::Clear() {
 126 | 2.57M |   comparator_.reset();
 127 | 2.57M |   max_level_ = 0;
 128 | 2.57M |   log_number_.reset();
 129 | 2.57M |   prev_log_number_.reset();
 130 | 2.57M |   last_sequence_.reset();
 131 | 2.57M |   next_file_number_.reset();
 132 | 2.57M |   max_column_family_.reset();
 133 | 2.57M |   deleted_files_.clear();
 134 | 2.57M |   new_files_.clear();
 135 | 2.57M |   column_family_ = 0;
 136 | 2.57M |   column_family_name_.reset();
 137 | 2.57M |   is_column_family_drop_ = false;
 138 | 2.57M |   flushed_frontier_.reset();
 139 | 2.57M | }
 140 |       |
 141 |  240k | void EncodeBoundaryValues(const FileBoundaryValues<InternalKey>& values, BoundaryValuesPB* out) {
 142 |  240k |   auto key = values.key.Encode();
 143 |  240k |   out->set_key(key.data(), key.size());
 144 |  240k |   out->set_seqno(values.seqno);
 145 |  240k |   if (values.user_frontier) {
 146 | 27.2k |     values.user_frontier->ToPB(out->mutable_user_frontier());
 147 | 27.2k |   }
 148 |       |
 149 |  121k |   for (const auto& user_value : values.user_values) {
 150 |  121k |     auto* value = out->add_user_values();
 151 |  121k |     value->set_tag(user_value->Tag());
 152 |  121k |     auto encoded_user_value = user_value->Encode();
 153 |  121k |     value->set_data(encoded_user_value.data(), encoded_user_value.size());
 154 |  121k |   }
 155 |  240k | }
 156 |       |
 157 |       | Status DecodeBoundaryValues(BoundaryValuesExtractor* extractor,
 158 |       |                             const BoundaryValuesPB& values,
 159 | 72.8k |                             FileBoundaryValues<InternalKey>* out) {
 160 | 72.8k |   out->key = InternalKey::DecodeFrom(values.key());
 161 | 72.8k |   out->seqno = values.seqno();
 162 | 72.8k |   if (extractor != nullptr) {
 163 | 12.5k |     if (values.has_user_frontier()) {
 164 | 12.5k |       out->user_frontier = extractor->CreateFrontier();
 165 | 12.5k |       out->user_frontier->FromPB(values.user_frontier());
 166 | 12.5k |     }
 167 | 51.9k |     for (const auto &user_value : values.user_values()) {
 168 | 51.9k |       UserBoundaryValuePtr decoded;
 169 | 51.9k |       auto status = extractor->Decode(user_value.tag(), user_value.data(), &decoded);
 170 | 51.9k |       if (!status.ok()) {
 171 |     0 |         return status;
 172 |     0 |       }
 173 | 51.9k |       if (decoded) {
 174 | 51.9k |         out->user_values.push_back(std::move(decoded));
 175 | 51.9k |       }
 176 | 51.9k |     }
 177 | 60.2k |   } else if (values.has_user_frontier()) {
 178 |     0 |     return STATUS_FORMAT(
 179 |     0 |         IllegalState, "Boundary values contains user frontier but extractor is not specified: $0",
 180 |     0 |         values);
 181 |     0 |   }
 182 | 72.8k |   return Status::OK();
 183 | 72.8k | }
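
Note: EncodeBoundaryValues and DecodeBoundaryValues are intended to be inverses: the encoder writes the key, seqno, user frontier, and user values into a BoundaryValuesPB, and the decoder reads them back, rejecting a user frontier when no extractor is supplied. A hedged round-trip sketch follows; it assumes these helpers and the usual RocksDB InternalKey(user_key, seqno, type) constructor are visible through the headers this file already includes.

    // Hypothetical round-trip check for the helpers above; not part of this file.
    rocksdb::Status BoundaryValuesRoundTrip() {
      rocksdb::FileBoundaryValues<rocksdb::InternalKey> in;
      in.key = rocksdb::InternalKey("user_key", /* seqno = */ 42, rocksdb::kTypeValue);  // assumed ctor
      in.seqno = 42;

      rocksdb::BoundaryValuesPB pb;
      rocksdb::EncodeBoundaryValues(in, &pb);

      rocksdb::FileBoundaryValues<rocksdb::InternalKey> out;
      // A null extractor is accepted as long as the protobuf carries no user frontier.
      return rocksdb::DecodeBoundaryValues(/* extractor = */ nullptr, pb, &out);
    }
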
 184 |       |
 185 | 1.15M | bool VersionEdit::AppendEncodedTo(std::string* dst) const {
 186 | 1.15M |   VersionEditPB pb;
 187 | 1.15M |   auto result = EncodeTo(&pb);
 188 | 1.15M |   if (result) {
 189 | 1.15M |     pb.AppendToString(dst);
 190 | 1.15M |   }
 191 | 1.15M |   return result;
 192 | 1.15M | }
 193 |       |
 194 | 1.67M | bool VersionEdit::EncodeTo(VersionEditPB* dst) const {
 195 | 1.67M |   VersionEditPB& pb = *dst;
 196 | 1.67M |   if (comparator_) {
 197 |  518k |     pb.set_comparator(*comparator_);
 198 |  518k |   }
 199 | 1.67M |   if (log_number_) {
 200 |  886k |     pb.set_log_number(*log_number_);
 201 |  886k |   }
 202 | 1.67M |   if (prev_log_number_) {
 203 |  307k |     pb.set_prev_log_number(*prev_log_number_);
 204 |  307k |   }
 205 | 1.67M |   if (next_file_number_) {
 206 |  642k |     pb.set_next_file_number(*next_file_number_);
 207 |  642k |   }
 208 | 1.67M |   if (last_sequence_) {
 209 |  642k |     pb.set_last_sequence(*last_sequence_);
 210 |  642k |   }
 211 | 1.67M |   if (flushed_frontier_) {
 212 |  254k |     flushed_frontier_->ToPB(pb.mutable_flushed_frontier());
 213 |  254k |   }
 214 | 1.67M |   if (max_column_family_) {
 215 | 4.26k |     pb.set_max_column_family(*max_column_family_);
 216 | 4.26k |   }
 217 |       |
 218 | 60.8k |   for (const auto& deleted : deleted_files_) {
 219 | 60.8k |     auto& deleted_file = *pb.add_deleted_files();
 220 | 60.8k |     deleted_file.set_level(deleted.first);
 221 | 60.8k |     deleted_file.set_file_number(deleted.second);
 222 | 60.8k |   }
 223 |       |
 224 | 1.79M |   for (size_t i = 0; i < new_files_.size(); i++) {
 225 |  120k |     const FileMetaData& f = new_files_[i].second;
 226 |  120k |     if (!f.smallest.key.Valid() || !f.largest.key.Valid()) {
 227 |     1 |       return false;
 228 |     1 |     }
 229 |  120k |     auto& new_file = *pb.add_new_files();
 230 |  120k |     new_file.set_level(new_files_[i].first);
 231 |  120k |     new_file.set_number(f.fd.GetNumber());
 232 |  120k |     new_file.set_total_file_size(f.fd.GetTotalFileSize());
 233 |  120k |     new_file.set_base_file_size(f.fd.GetBaseFileSize());
 234 |  120k |     EncodeBoundaryValues(f.smallest, new_file.mutable_smallest());
 235 |  120k |     EncodeBoundaryValues(f.largest, new_file.mutable_largest());
 236 |  120k |     if (f.fd.GetPathId() != 0) {
 237 |   402 |       new_file.set_path_id(f.fd.GetPathId());
 238 |   402 |     }
 239 |  120k |     if (f.marked_for_compaction) {
 240 |    19 |       new_file.set_marked_for_compaction(true);
 241 |    19 |     }
 242 |  120k |     if (f.imported) {
 243 |     0 |       new_file.set_imported(true);
 244 |     0 |     }
 245 |  120k |   }
 246 |       |
 247 |       |   // 0 is default and does not need to be explicitly written
 248 | 1.67M |   if (column_family_ != 0) {
 249 | 32.7k |     pb.set_column_family(column_family_);
 250 | 32.7k |   }
 251 |       |
 252 | 1.67M |   if (column_family_name_) {
 253 | 8.80k |     pb.set_column_family_name(*column_family_name_);
 254 | 8.80k |   }
 255 |       |
 256 | 1.67M |   if (is_column_family_drop_) {
 257 |    30 |     pb.set_is_column_family_drop(true);
 258 |    30 |   }
 259 |       |
 260 | 1.67M |   return true;
 261 | 1.67M | }
 262 |       |
 263 |  376k | Status VersionEdit::DecodeFrom(BoundaryValuesExtractor* extractor, const Slice& src) {
 264 |  376k |   Clear();
 265 |  376k |   VersionEditPB pb;
 266 |  376k |   if (!pb.ParseFromArray(src.data(), static_cast<int>(src.size()))) {
 267 |     0 |     return STATUS(Corruption, "VersionEdit");
 268 |     0 |   }
 269 |       |
 270 |   318 |   VLOG(1) << "Parsed version edit: " << pb.ShortDebugString();
 271 |       |
 272 |  376k |   if (pb.has_comparator()) {
 273 | 11.6k |     comparator_ = std::move(*pb.mutable_comparator());
 274 | 11.6k |   }
 275 |  376k |   if (pb.has_log_number()) {
 276 |  359k |     log_number_ = pb.log_number();
 277 |  359k |   }
 278 |  376k |   if (pb.has_prev_log_number()) {
 279 | 20.6k |     prev_log_number_ = pb.prev_log_number();
 280 | 20.6k |   }
 281 |  376k |   if (pb.has_next_file_number()) {
 282 |  356k |     next_file_number_ = pb.next_file_number();
 283 |  356k |   }
 284 |  376k |   if (pb.has_last_sequence()) {
 285 |  356k |     last_sequence_ = pb.last_sequence();
 286 |  356k |   }
 287 |  376k |   if (extractor) {
 288 |       |     // BoundaryValuesExtractor might not be set when running from internal RocksDB tools.
 289 |  337k |     if (pb.has_obsolete_last_op_id()) {
 290 |     0 |       flushed_frontier_ = extractor->CreateFrontier();
 291 |     0 |       flushed_frontier_->FromOpIdPBDeprecated(pb.obsolete_last_op_id());
 292 |     0 |     }
 293 |  337k |     if (pb.has_flushed_frontier()) {
 294 | 6.13k |       flushed_frontier_ = extractor->CreateFrontier();
 295 | 6.13k |       flushed_frontier_->FromPB(pb.flushed_frontier());
 296 | 6.13k |     }
 297 |  337k |   }
 298 |  376k |   if (pb.has_max_column_family()) {
 299 | 2.60k |     max_column_family_ = pb.max_column_family();
 300 | 2.60k |   }
 301 |       |
 302 | 15.6k |   for (const auto& deleted : pb.deleted_files()) {
 303 | 15.6k |     deleted_files_.emplace(deleted.level(), deleted.file_number());
 304 | 15.6k |   }
 305 |       |
 306 |  376k |   const size_t new_files_size = static_cast<size_t>(pb.new_files_size());
 307 |  376k |   new_files_.resize(new_files_size);
 308 |       |
 309 |  412k |   for (size_t i = 0; i < new_files_size; ++i) {
 310 | 36.4k |     auto& source = pb.new_files(static_cast<int>(i));
 311 | 36.4k |     int level = source.level();
 312 | 36.4k |     new_files_[i].first = level;
 313 | 36.4k |     auto& meta = new_files_[i].second;
 314 | 36.4k |     meta.fd = FileDescriptor(source.number(),
 315 | 36.4k |                              source.path_id(),
 316 | 36.4k |                              source.total_file_size(),
 317 | 36.4k |                              source.base_file_size());
 318 | 36.4k |     if (source.has_obsolete_last_op_id() && extractor) {
 319 |     0 |       meta.largest.user_frontier = extractor->CreateFrontier();
 320 |     0 |       meta.largest.user_frontier->FromOpIdPBDeprecated(source.obsolete_last_op_id());
 321 |     0 |     }
 322 | 36.4k |     auto status = DecodeBoundaryValues(extractor, source.smallest(), &meta.smallest);
 323 | 36.4k |     if (!status.ok()) {
 324 |     0 |       return status;
 325 |     0 |     }
 326 | 36.4k |     status = DecodeBoundaryValues(extractor, source.largest(), &meta.largest);
 327 | 36.4k |     if (!status.ok()) {
 328 |     0 |       return status;
 329 |     0 |     }
 330 | 36.4k |     meta.marked_for_compaction = source.marked_for_compaction();
 331 | 36.4k |     max_level_ = std::max(max_level_, level);
 332 | 36.4k |     meta.imported = source.imported();
 333 |       |
 334 |       |     // Use the relevant fields in the "largest" frontier to update the "flushed" frontier for this
 335 |       |     // version edit. In practice this will only look at OpId and will discard hybrid time and
 336 |       |     // history cutoff (which probably won't be there anyway) coming from the boundary values.
 337 |       |     //
 338 |       |     // This is enabled only if --use_per_file_metadata_for_flushed_frontier is specified, until we
 339 |       |     // know that we don't have any clusters with wrong per-file flushed frontier metadata in version
 340 |       |     // edits, such as metadata restored from old backups taken on unrelated clusters.
 341 | 36.4k |     if (FLAGS_use_per_file_metadata_for_flushed_frontier && meta.largest.user_frontier) {
 342 |     0 |       if (!flushed_frontier_) {
 343 |     0 |         LOG(DFATAL) << "Flushed frontier not present but a file's largest user frontier present: "
 344 |     0 |                     << meta.largest.user_frontier->ToString()
 345 |     0 |                     << ", version edit protobuf:\n" << pb.DebugString();
 346 |     0 |       } else if (!flushed_frontier_->Dominates(*meta.largest.user_frontier,
 347 |     0 |                                                UpdateUserValueType::kLargest)) {
 348 |       |         // The flushed frontier of this VersionEdit must already include the information provided
 349 |       |         // by flushed frontiers of individual files.
 350 |     0 |         LOG(DFATAL) << "Flushed frontier is present but has to be updated with data from "
 351 |     0 |                     << "file boundary: flushed_frontier=" << flushed_frontier_->ToString()
 352 |     0 |                     << ", a file's largest user frontier: "
 353 |     0 |                     << meta.largest.user_frontier->ToString()
 354 |     0 |                     << ", version edit protobuf:\n" << pb.DebugString();
 355 |     0 |       }
 356 |     0 |       UpdateUserFrontier(
 357 |     0 |           &flushed_frontier_, meta.largest.user_frontier, UpdateUserValueType::kLargest);
 358 |     0 |     }
 359 | 36.4k |   }
 360 |       |
 361 |  376k |   column_family_ = pb.column_family();
 362 |       |
 363 |  376k |   if (!pb.column_family_name().empty()) {
 364 | 2.95k |     column_family_name_ = pb.column_family_name();
 365 | 2.95k |   }
 366 |       |
 367 |  376k |   is_column_family_drop_ = pb.is_column_family_drop();
 368 |       |
 369 |  376k |   return Status();
 370 |  376k | }
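
Note: AppendEncodedTo and DecodeFrom are the write and read halves of the MANIFEST record handling: an edit is serialized through VersionEditPB and parsed back during recovery. Below is a usage sketch under the assumption that the setters declared in version_edit.h follow the usual RocksDB names (SetLogNumber, SetLastSequence); it is an illustration, not code from this file.

    // Hypothetical encode/decode round trip for a VersionEdit; not part of this file.
    rocksdb::Status VersionEditRoundTrip() {
      rocksdb::VersionEdit edit;
      edit.SetLogNumber(7);        // assumed setter from version_edit.h
      edit.SetLastSequence(1000);  // assumed setter from version_edit.h

      std::string encoded;
      if (!edit.AppendEncodedTo(&encoded)) {
        return STATUS(Corruption, "Unable to encode VersionEdit");
      }

      rocksdb::VersionEdit decoded;
      // Internal RocksDB tools pass a null extractor; user frontiers are then not decoded.
      return decoded.DecodeFrom(/* extractor = */ nullptr, rocksdb::Slice(encoded));
    }
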
 371 |       |
 372 |  517k | std::string VersionEdit::DebugString(bool hex_key) const {
 373 |  517k |   VersionEditPB pb;
 374 |  517k |   EncodeTo(&pb);
 375 |  517k |   return pb.DebugString();
 376 |  517k | }
 377 |       |
 378 |  333k | void VersionEdit::InitNewDB() {
 379 |  333k |   log_number_ = 0;
 380 |  333k |   next_file_number_ = VersionSet::kInitialNextFileNumber;
 381 |  333k |   last_sequence_ = 0;
 382 |  333k |   flushed_frontier_.reset();
 383 |  333k | }
 384 |       |
 385 | 7.65k | void VersionEdit::UpdateFlushedFrontier(UserFrontierPtr value) {
 386 | 7.65k |   ModifyFlushedFrontier(std::move(value), FrontierModificationMode::kUpdate);
 387 | 7.65k | }
 388 |       |
 389 |  251k | void VersionEdit::ModifyFlushedFrontier(UserFrontierPtr value, FrontierModificationMode mode) {
 390 |  251k |   if (mode == FrontierModificationMode::kForce) {
 391 |   862 |     flushed_frontier_ = std::move(value);
 392 |   862 |     force_flushed_frontier_ = true;
 393 |  250k |   } else {
 394 |  250k |     UpdateUserFrontier(&flushed_frontier_, std::move(value), UpdateUserValueType::kLargest);
 395 |  250k |   }
 396 |  251k | }
 397 |       |
 398 |     0 | std::string FileDescriptor::ToString() const {
 399 |     0 |   return yb::Format("{ number: $0 path_id: $1 total_file_size: $2 base_file_size: $3 }",
 400 |     0 |                     GetNumber(), GetPathId(), total_file_size, base_file_size);
 401 |     0 | }
 402 |       |
 403 |       | }  // namespace rocksdb