YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/version_edit.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/version_edit.h"
25
26
#include "yb/rocksdb/db/version_edit.pb.h"
27
#include "yb/rocksdb/db/version_set.h"
28
#include "yb/rocksdb/metadata.h"
29
#include "yb/rocksdb/util/coding.h"
30
31
#include "yb/util/flag_tags.h"
32
#include "yb/util/logging.h"
33
#include "yb/util/slice.h"
34
#include "yb/util/status_format.h"
35
36
// Boolean flag, default false, tagged both "hidden" and "advanced". It is read in
// VersionEdit::DecodeFrom() to decide whether a decoded file's largest user frontier is also
// folded into the version edit's flushed frontier.
DEFINE_bool(use_per_file_metadata_for_flushed_frontier, false,
37
            "Allows taking per-file metadata in version edits into account when computing the "
38
            "flushed frontier.");
39
TAG_FLAG(use_per_file_metadata_for_flushed_frontier, hidden);
40
TAG_FLAG(use_per_file_metadata_for_flushed_frontier, advanced);
41
42
namespace rocksdb {
43
44
418k
// Combines a file number and a path id into a single 64-bit value.
//
// The file number is kept as-is and OR-ed with path_id multiplied by (kFileNumberMask + 1).
// The assert (active only when asserts are compiled in) requires `number` not to exceed
// kFileNumberMask.
// NOTE(review): kFileNumberMask is defined elsewhere; presumably it is a contiguous low-bit mask,
// so that the multiplication places path_id in the bits above the file number - confirm.
uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
45
418k
  assert(number <= kFileNumberMask);
46
0
  return number | (path_id * (kFileNumberMask + 1));
47
418k
}
48
49
// Default constructor: zeroes the reference count and statistics counters, clears the boolean
// flags listed below and sets the table reader handle to null.
//
// The sequence number boundaries start "inverted" (smallest = kMaxSequenceNumber, largest = 0) so
// that the std::min / std::max merging done in UpdateBoundariesExceptKey() replaces them with the
// first real value it sees.
FileMetaData::FileMetaData()
50
    : refs(0),
51
    being_compacted(false),
52
    table_reader_handle(nullptr),
53
    compensated_file_size(0),
54
    num_entries(0),
55
    num_deletions(0),
56
    raw_key_size(0),
57
    raw_value_size(0),
58
    init_stats_from_file(false),
59
282k
    marked_for_compaction(false) {
60
282k
  smallest.seqno = kMaxSequenceNumber;
61
282k
  largest.seqno = 0;
62
282k
}
63
64
158M
// Records `key` as the new largest key of this file and merges the non-key boundary values
// (seqno, user frontier, user values) from `source` into both the smallest and largest boundary.
//
// If no smallest key has been recorded yet, the same key also becomes the smallest key.
// NOTE(review): no key comparison is made here, so this presumably relies on callers supplying
// keys in ascending order - confirm against callers.
void FileMetaData::UpdateBoundaries(InternalKey key, const FileBoundaryValuesBase& source) {
65
158M
  largest.key = std::move(key);
66
158M
  if (smallest.key.empty()) {
67
54.1k
    smallest.key = largest.key;
68
54.1k
  }
69
158M
  UpdateBoundariesExceptKey(source, UpdateBoundariesType::kAll);
70
158M
}
71
72
1.10M
// Drops one reference to this file metadata.
//
// When the reference count reaches zero (or below), the table reader handle - if one is held - is
// released through `table_cache` and reset to null, and the function returns true. Otherwise it
// returns false and nothing else changes.
//
// `table_cache` is only dereferenced when a table reader handle is held; in that case it must not
// be null (checked by DCHECK_ONLY_NOTNULL).
bool FileMetaData::Unref(TableCache* table_cache) {
73
1.10M
  refs--;
74
1.10M
  if (refs <= 0) {
75
114k
    if (table_reader_handle) {
76
1.17k
      DCHECK_ONLY_NOTNULL(table_cache);
77
1.17k
      table_cache->ReleaseHandle(table_reader_handle);
78
1.17k
      table_reader_handle = nullptr;
79
1.17k
    }
80
114k
    return true;
81
988k
  } else {
82
988k
    return false;
83
988k
  }
84
1.10M
}
85
86
// Merges the non-key boundary values of `source` (seqno, user frontier, user values) into this
// file's boundaries. Keys are never touched here.
//
// `type` selects which side is updated:
//   - kLargest:  only the largest boundary;
//   - kSmallest: only the smallest boundary;
//   - any other value (e.g. kAll, as passed by UpdateBoundaries()): both boundaries.
void FileMetaData::UpdateBoundariesExceptKey(const FileBoundaryValuesBase& source,
87
159M
                                             UpdateBoundariesType type) {
88
159M
  if (type != UpdateBoundariesType::kLargest) {
89
158M
    smallest.seqno = std::min(smallest.seqno, source.seqno);
90
158M
    UserFrontier::Update(
91
158M
        source.user_frontier.get(), UpdateUserValueType::kSmallest, &smallest.user_frontier);
92
93
158M
    for (const auto& user_value : source.user_values) {
94
150M
      UpdateUserValue(&smallest.user_values, user_value, UpdateUserValueType::kSmallest);
95
150M
    }
96
158M
  }
97
159M
  if (type != UpdateBoundariesType::kSmallest) {
98
158M
    largest.seqno = std::max(largest.seqno, source.seqno);
99
158M
    UserFrontier::Update(
100
158M
        source.user_frontier.get(), UpdateUserValueType::kLargest, &largest.user_frontier);
101
102
158M
    for (const auto& user_value : source.user_values) {
103
151M
      UpdateUserValue(&largest.user_values, user_value, UpdateUserValueType::kLargest);
104
151M
    }
105
158M
  }
106
159M
}
107
108
14.4M
// Returns the filter reported by the largest user frontier, or a default-constructed Slice when
// this file has no largest user frontier.
Slice FileMetaData::UserFilter() const {
109
14.4M
  return largest.user_frontier ? 
largest.user_frontier->Filter()12.4M
:
Slice()1.95M
;
110
14.4M
}
111
112
51.4k
// Returns a human-readable description of the smallest and largest user frontiers in the form
// "frontiers: { smallest: ... largest: ... }". A missing frontier is printed as "none".
std::string FileMetaData::FrontiersToString() const {
113
51.4k
  return yb::Format("frontiers: { smallest: $0 largest: $1 }",
114
51.4k
      smallest.user_frontier ? 
smallest.user_frontier->ToString()4.93k
:
"none"46.4k
,
115
51.4k
      largest.user_frontier ? 
largest.user_frontier->ToString()4.93k
:
"none"46.4k
);
116
51.4k
}
117
118
228
// Returns a human-readable summary of this file: its number, total and base file sizes, the
// being_compacted flag and both boundary value sets (smallest, largest).
std::string FileMetaData::ToString() const {
119
228
  return yb::Format("{ number: $0 total_size: $1 base_size: $2 "
120
228
                    "being_compacted: $3 smallest: $4 largest: $5 }",
121
228
                    fd.GetNumber(), fd.GetTotalFileSize(), fd.GetBaseFileSize(),
122
228
                    being_compacted, smallest, largest);
123
228
}
124
125
3.12M
// Resets this version edit to an empty state: all optional fields are unset, the deleted/new file
// collections are emptied, the column family id and max level go back to 0 and the column family
// drop flag is cleared.
//
// NOTE(review): force_flushed_frontier_ (set by ModifyFlushedFrontier() in kForce mode) is not
// reset here - confirm that this is intentional.
void VersionEdit::Clear() {
126
3.12M
  comparator_.reset();
127
3.12M
  max_level_ = 0;
128
3.12M
  log_number_.reset();
129
3.12M
  prev_log_number_.reset();
130
3.12M
  last_sequence_.reset();
131
3.12M
  next_file_number_.reset();
132
3.12M
  max_column_family_.reset();
133
3.12M
  deleted_files_.clear();
134
3.12M
  new_files_.clear();
135
3.12M
  column_family_ = 0;
136
3.12M
  column_family_name_.reset();
137
3.12M
  is_column_family_drop_ = false;
138
3.12M
  flushed_frontier_.reset();
139
3.12M
}
140
141
269k
// Serializes one set of file boundary values into its protobuf representation.
//
// Writes the encoded internal key and the sequence number, the user frontier if one is present,
// and one (tag, data) entry per user value.
//
// values: boundary values to serialize.
// out:    destination protobuf message; fields are set/appended on it.
void EncodeBoundaryValues(const FileBoundaryValues<InternalKey>& values, BoundaryValuesPB* out) {
142
269k
  auto key = values.key.Encode();
143
269k
  out->set_key(key.data(), key.size());
144
269k
  out->set_seqno(values.seqno);
145
269k
  if (values.user_frontier) {
146
50.7k
    values.user_frontier->ToPB(out->mutable_user_frontier());
147
50.7k
  }
148
149
269k
  for (const auto& user_value : values.user_values) {
150
267k
    auto* value = out->add_user_values();
151
267k
    value->set_tag(user_value->Tag());
152
267k
    auto encoded_user_value = user_value->Encode();
153
267k
    value->set_data(encoded_user_value.data(), encoded_user_value.size());
154
267k
  }
155
269k
}
156
157
// Fills `out` from the protobuf representation of one set of file boundary values.
//
// The key and sequence number are always decoded. The user-defined parts depend on `extractor`:
//   - extractor != nullptr: the user frontier (if present) is created by the extractor and loaded
//     from the protobuf; every user value is decoded through the extractor and appended to
//     out->user_values (values for which the extractor produces a null pointer are skipped).
//   - extractor == nullptr: user values are ignored, but a protobuf that carries a user frontier
//     is rejected.
//
// Returns OK on success, the extractor's status if decoding a user value fails, or IllegalState
// if a user frontier is present without an extractor. On failure `out` may already be partially
// filled.
Status DecodeBoundaryValues(BoundaryValuesExtractor* extractor,
158
                            const BoundaryValuesPB& values,
159
83.5k
                            FileBoundaryValues<InternalKey>* out) {
160
83.5k
  out->key = InternalKey::DecodeFrom(values.key());
161
83.5k
  out->seqno = values.seqno();
162
83.5k
  if (extractor != nullptr) {
163
23.1k
    if (values.has_user_frontier()) {
164
22.8k
      out->user_frontier = extractor->CreateFrontier();
165
22.8k
      out->user_frontier->FromPB(values.user_frontier());
166
22.8k
    }
167
101k
    for (const auto &user_value : values.user_values()) {
168
101k
      UserBoundaryValuePtr decoded;
169
101k
      auto status = extractor->Decode(user_value.tag(), user_value.data(), &decoded);
170
101k
      if (!status.ok()) {
171
0
        return status;
172
0
      }
173
101k
      if (decoded) {
174
101k
        out->user_values.push_back(std::move(decoded));
175
101k
      }
176
101k
    }
177
60.4k
  } else if (values.has_user_frontier()) {
178
0
    return STATUS_FORMAT(
179
0
        IllegalState, "Boundary values contains user frontier but extractor is not specified: $0",
180
0
        values);
181
0
  }
182
83.5k
  return Status::OK();
183
83.5k
}
184
185
1.32M
// Encodes this version edit into a VersionEditPB and appends its serialized form to `*dst`.
//
// Returns the result of EncodeTo(); when that is false, nothing is appended to `*dst`.
bool VersionEdit::AppendEncodedTo(std::string* dst) const {
186
1.32M
  VersionEditPB pb;
187
1.32M
  auto result = EncodeTo(&pb);
188
1.32M
  if (result) {
189
1.32M
    pb.AppendToString(dst);
190
1.32M
  }
191
1.32M
  return result;
192
1.32M
}
193
194
1.88M
// Copies the contents of this version edit into the protobuf message `*dst`.
//
// Optional fields are written only when they are set; the column family id is written only when
// it is non-zero, and the column family drop flag only when it is true. Every deleted file is
// written as a (level, file number) pair and every new file with its level, number, sizes and
// both boundary value sets; path id, marked_for_compaction and imported are written only when
// they differ from 0 / false.
//
// Returns true on success. Returns false as soon as a new file with an invalid smallest or
// largest key is encountered; in that case `*dst` keeps whatever was written before the failure.
// The function only sets/appends fields on `*dst`; it does not clear it first.
bool VersionEdit::EncodeTo(VersionEditPB* dst) const {
195
1.88M
  VersionEditPB& pb = *dst;
196
1.88M
  if (comparator_) {
197
563k
    pb.set_comparator(*comparator_);
198
563k
  }
199
1.88M
  if (log_number_) {
200
1.02M
    pb.set_log_number(*log_number_);
201
1.02M
  }
202
1.88M
  if (prev_log_number_) {
203
333k
    pb.set_prev_log_number(*prev_log_number_);
204
333k
  }
205
1.88M
  if (next_file_number_) {
206
759k
    pb.set_next_file_number(*next_file_number_);
207
759k
  }
208
1.88M
  if (last_sequence_) {
209
759k
    pb.set_last_sequence(*last_sequence_);
210
759k
  }
211
1.88M
  if (flushed_frontier_) {
212
285k
    flushed_frontier_->ToPB(pb.mutable_flushed_frontier());
213
285k
  }
214
1.88M
  if (max_column_family_) {
215
4.26k
    pb.set_max_column_family(*max_column_family_);
216
4.26k
  }
217
218
1.88M
  for (const auto& deleted : deleted_files_) {
219
65.4k
    auto& deleted_file = *pb.add_deleted_files();
220
65.4k
    deleted_file.set_level(deleted.first);
221
65.4k
    deleted_file.set_file_number(deleted.second);
222
65.4k
  }
223
224
2.01M
  for (size_t i = 0; i < new_files_.size(); 
i++134k
) {
225
134k
    const FileMetaData& f = new_files_[i].second;
226
134k
    if (!f.smallest.key.Valid() || 
!f.largest.key.Valid()134k
) {
227
1
      return false;
228
1
    }
229
134k
    auto& new_file = *pb.add_new_files();
230
134k
    new_file.set_level(new_files_[i].first);
231
134k
    new_file.set_number(f.fd.GetNumber());
232
134k
    new_file.set_total_file_size(f.fd.GetTotalFileSize());
233
134k
    new_file.set_base_file_size(f.fd.GetBaseFileSize());
234
134k
    EncodeBoundaryValues(f.smallest, new_file.mutable_smallest());
235
134k
    EncodeBoundaryValues(f.largest, new_file.mutable_largest());
236
134k
    if (f.fd.GetPathId() != 0) {
237
402
      new_file.set_path_id(f.fd.GetPathId());
238
402
    }
239
134k
    if (f.marked_for_compaction) {
240
19
      new_file.set_marked_for_compaction(true);
241
19
    }
242
134k
    if (f.imported) {
243
0
      new_file.set_imported(true);
244
0
    }
245
134k
  }
246
247
  // 0 is default and does not need to be explicitly written
248
1.88M
  if (column_family_ != 0) {
249
32.6k
    pb.set_column_family(column_family_);
250
32.6k
  }
251
252
1.88M
  if (column_family_name_) {
253
8.80k
    pb.set_column_family_name(*column_family_name_);
254
8.80k
  }
255
256
1.88M
  if (is_column_family_drop_) {
257
30
    pb.set_is_column_family_drop(true);
258
30
  }
259
260
1.88M
  return true;
261
1.88M
}
262
263
477k
// Replaces the contents of this version edit with the edit serialized in `src`.
//
// The edit is cleared first, then `src` is parsed as a VersionEditPB and its fields are copied
// over. Frontier-related data (the edit-level flushed frontier and the per-file obsolete last op
// id) is only decoded when `extractor` is non-null. max_level_ is raised to the highest level
// seen among the new files.
//
// Returns:
//   - Corruption if `src` cannot be parsed as a VersionEditPB;
//   - the failing status of DecodeBoundaryValues() for any new file's smallest/largest values;
//   - OK otherwise.
// On a boundary-value failure the function returns immediately, leaving this edit partially
// populated (new_files_ already resized, later fields such as the column family not yet set).
Status VersionEdit::DecodeFrom(BoundaryValuesExtractor* extractor, const Slice& src) {
264
477k
  Clear();
265
477k
  VersionEditPB pb;
266
477k
  if (!pb.ParseFromArray(src.data(), static_cast<int>(src.size()))) {
267
0
    return STATUS(Corruption, "VersionEdit");
268
0
  }
269
270
477k
  VLOG
(1) << "Parsed version edit: " << pb.ShortDebugString()177
;
271
272
477k
  if (pb.has_comparator()) {
273
14.3k
    comparator_ = std::move(*pb.mutable_comparator());
274
14.3k
  }
275
477k
  if (pb.has_log_number()) {
276
456k
    log_number_ = pb.log_number();
277
456k
  }
278
477k
  if (pb.has_prev_log_number()) {
279
24.8k
    prev_log_number_ = pb.prev_log_number();
280
24.8k
  }
281
477k
  if (pb.has_next_file_number()) {
282
452k
    next_file_number_ = pb.next_file_number();
283
452k
  }
284
477k
  if (pb.has_last_sequence()) {
285
452k
    last_sequence_ = pb.last_sequence();
286
452k
  }
287
477k
  if (extractor) {
288
    // BoundaryValuesExtractor could be not set when running from internal RocksDB tools.
289
438k
    if (pb.has_obsolete_last_op_id()) {
290
0
      flushed_frontier_ = extractor->CreateFrontier();
291
0
      flushed_frontier_->FromOpIdPBDeprecated(pb.obsolete_last_op_id());
292
0
    }
293
438k
    // If both the obsolete op id and the flushed frontier are present, the frontier created here
    // replaces the one built from the obsolete op id above.
    if (pb.has_flushed_frontier()) {
294
11.4k
      flushed_frontier_ = extractor->CreateFrontier();
295
11.4k
      flushed_frontier_->FromPB(pb.flushed_frontier());
296
11.4k
    }
297
438k
  }
298
477k
  if (pb.has_max_column_family()) {
299
2.60k
    max_column_family_ = pb.max_column_family();
300
2.60k
  }
301
302
477k
  for (const auto& deleted : pb.deleted_files()) {
303
15.8k
    deleted_files_.emplace(deleted.level(), deleted.file_number());
304
15.8k
  }
305
306
477k
  const size_t new_files_size = static_cast<size_t>(pb.new_files_size());
307
477k
  new_files_.resize(new_files_size);
308
309
519k
  for (size_t i = 0; i < new_files_size; 
++i41.7k
) {
310
41.7k
    auto& source = pb.new_files(static_cast<int>(i));
311
41.7k
    int level = source.level();
312
41.7k
    new_files_[i].first = level;
313
41.7k
    auto& meta = new_files_[i].second;
314
41.7k
    meta.fd = FileDescriptor(source.number(),
315
41.7k
                             source.path_id(),
316
41.7k
                             source.total_file_size(),
317
41.7k
                             source.base_file_size());
318
41.7k
    // The frontier built from the obsolete op id may be replaced below when source.largest()
    // carries its own user frontier (see DecodeBoundaryValues()).
    if (source.has_obsolete_last_op_id() && 
extractor0
) {
319
0
      meta.largest.user_frontier = extractor->CreateFrontier();
320
0
      meta.largest.user_frontier->FromOpIdPBDeprecated(source.obsolete_last_op_id());
321
0
    }
322
41.7k
    auto status = DecodeBoundaryValues(extractor, source.smallest(), &meta.smallest);
323
41.7k
    if (!status.ok()) {
324
0
      return status;
325
0
    }
326
41.7k
    status = DecodeBoundaryValues(extractor, source.largest(), &meta.largest);
327
41.7k
    if (!status.ok()) {
328
0
      return status;
329
0
    }
330
41.7k
    meta.marked_for_compaction = source.marked_for_compaction();
331
41.7k
    max_level_ = std::max(max_level_, level);
332
41.7k
    meta.imported = source.imported();
333
334
    // Use the relevant fields in the "largest" frontier to update the "flushed" frontier for this
335
    // version edit. In practice this will only look at OpId and will discard hybrid time and
336
    // history cutoff (which probably won't be there anyway) coming from the boundary values.
337
    //
338
    // This is enabled only if --use_per_file_metadata_for_flushed_frontier is specified, until we
339
    // know that we don't have any clusters with wrong per-file flushed frontier metadata in version
340
    // edits, such as that restored from old backups from unrelated clusters.
341
41.7k
    if (FLAGS_use_per_file_metadata_for_flushed_frontier && 
meta.largest.user_frontier0
) {
342
0
      if (!flushed_frontier_) {
343
0
        LOG(DFATAL) << "Flushed frontier not present but a file's largest user frontier present: "
344
0
                    << meta.largest.user_frontier->ToString()
345
0
                    << ", version edit protobuf:\n" << pb.DebugString();
346
0
      } else if (!flushed_frontier_->Dominates(*meta.largest.user_frontier,
347
0
                                               UpdateUserValueType::kLargest)) {
348
        // The flushed frontier of this VersionEdit must already include the information provided
349
        // by flushed frontiers of individual files.
350
0
        LOG(DFATAL) << "Flushed frontier is present but has to be updated with data from "
351
0
                    << "file boundary: flushed_frontier=" << flushed_frontier_->ToString()
352
0
                    << ", a file's larget user frontier: "
353
0
                    << meta.largest.user_frontier->ToString()
354
0
                    << ", version edit protobuf:\n" << pb.DebugString();
355
0
      }
356
0
      // Reached in all three cases above (missing, non-dominating, or dominating flushed
      // frontier): the file's largest frontier is merged into the flushed frontier.
      UpdateUserFrontier(
357
0
          &flushed_frontier_, meta.largest.user_frontier, UpdateUserValueType::kLargest);
358
0
    }
359
41.7k
  }
360
361
477k
  column_family_ = pb.column_family();
362
363
477k
  // An empty column family name in the protobuf leaves column_family_name_ unset.
  if (!pb.column_family_name().empty()) {
364
2.95k
    column_family_name_ = pb.column_family_name();
365
2.95k
  }
366
367
477k
  is_column_family_drop_ = pb.is_column_family_drop();
368
369
477k
  return Status();
370
477k
}
371
372
562k
// Returns the protobuf debug string of this version edit.
//
// `hex_key` is not used by this implementation. The result of EncodeTo() is ignored, so when
// encoding stops early (a new file with an invalid boundary key) the returned string only
// describes the fields written before that point.
std::string VersionEdit::DebugString(bool hex_key) const {
373
562k
  VersionEditPB pb;
374
562k
  EncodeTo(&pb);
375
562k
  return pb.DebugString();
376
562k
}
377
378
424k
// Sets the fields used to describe a brand new database: log number 0, next file number
// VersionSet::kInitialNextFileNumber, last sequence 0 and no flushed frontier.
// Other fields of the edit are left untouched.
void VersionEdit::InitNewDB() {
379
424k
  log_number_ = 0;
380
424k
  next_file_number_ = VersionSet::kInitialNextFileNumber;
381
424k
  last_sequence_ = 0;
382
424k
  flushed_frontier_.reset();
383
424k
}
384
385
// Appends a new file entry built from individual pieces (judging by the name, intended for
// tests).
//
// A fresh FileMetaData is created with the given descriptor (its table_reader pointer is reset to
// null), boundary values and marked_for_compaction flag; all other fields keep the values set by
// the FileMetaData constructor. Debug builds check that smallest.seqno <= largest.seqno.
void VersionEdit::AddTestFile(int level,
386
                              const FileDescriptor& fd,
387
                              const FileMetaData::BoundaryValues& smallest,
388
                              const FileMetaData::BoundaryValues& largest,
389
69
                              bool marked_for_compaction) {
390
69
  DCHECK_LE(smallest.seqno, largest.seqno);
391
69
  FileMetaData f;
392
69
  f.fd = fd;
393
69
  f.fd.table_reader = nullptr;
394
69
  f.smallest = smallest;
395
69
  f.largest = largest;
396
69
  f.marked_for_compaction = marked_for_compaction;
397
69
  new_files_.emplace_back(level, f);
398
69
}
399
400
22.7k
// Appends a copy of `f` as a new file at `level`. The metadata is copied as-is (unlike
// AddCleanedFile(), nothing is reset). Debug builds check that the smallest sequence number does
// not exceed the largest one.
void VersionEdit::AddFile(int level, const FileMetaData& f) {
401
22.7k
  DCHECK_LE(f.smallest.seqno, f.largest.seqno);
402
22.7k
  new_files_.emplace_back(level, f);
403
22.7k
}
404
405
82.8k
// Appends a "cleaned" copy of `f` as a new file at `level`.
//
// Only the file descriptor (with its table_reader pointer reset to null), both boundary value
// sets and the marked_for_compaction / imported flags are carried over; every other field of the
// new entry keeps the value set by the FileMetaData constructor. Debug builds check that the
// smallest sequence number does not exceed the largest one.
void VersionEdit::AddCleanedFile(int level, const FileMetaData& f) {
406
82.8k
  DCHECK_LE(f.smallest.seqno, f.largest.seqno);
407
82.8k
  FileMetaData nf;
408
82.8k
  nf.fd = f.fd;
409
82.8k
  nf.fd.table_reader = nullptr;
410
82.8k
  nf.smallest = f.smallest;
411
82.8k
  nf.largest = f.largest;
412
82.8k
  nf.marked_for_compaction = f.marked_for_compaction;
413
82.8k
  nf.imported = f.imported;
414
82.8k
  new_files_.emplace_back(level, std::move(nf));
415
82.8k
}
416
417
15.3k
// Merges `value` into the flushed frontier; shorthand for ModifyFlushedFrontier() in kUpdate
// mode.
void VersionEdit::UpdateFlushedFrontier(UserFrontierPtr value) {
418
15.3k
  ModifyFlushedFrontier(std::move(value), FrontierModificationMode::kUpdate);
419
15.3k
}
420
421
279k
// Changes the flushed frontier of this version edit.
//
//   - kForce: the flushed frontier is replaced by `value` unconditionally and
//     force_flushed_frontier_ is set.
//   - any other mode: `value` is merged into the current flushed frontier through
//     UpdateUserFrontier() with UpdateUserValueType::kLargest.
void VersionEdit::ModifyFlushedFrontier(UserFrontierPtr value, FrontierModificationMode mode) {
422
279k
  if (mode == FrontierModificationMode::kForce) {
423
1.80k
    flushed_frontier_ = std::move(value);
424
1.80k
    force_flushed_frontier_ = true;
425
277k
  } else {
426
277k
    UpdateUserFrontier(&flushed_frontier_, std::move(value), UpdateUserValueType::kLargest);
427
277k
  }
428
279k
}
429
430
0
// Returns a human-readable description of this file descriptor: file number, path id, total file
// size and base file size.
std::string FileDescriptor::ToString() const {
431
0
  return yb::Format("{ number: $0 path_id: $1 total_file_size: $2 base_file_size: $3 }",
432
0
                    GetNumber(), GetPathId(), total_file_size, base_file_size);
433
0
}
434
435
}  // namespace rocksdb