/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/version_edit.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/db/version_edit.h" |
25 | | |
26 | | #include "yb/rocksdb/db/version_edit.pb.h" |
27 | | #include "yb/rocksdb/db/version_set.h" |
28 | | #include "yb/rocksdb/metadata.h" |
29 | | #include "yb/rocksdb/util/coding.h" |
30 | | |
31 | | #include "yb/util/flag_tags.h" |
32 | | #include "yb/util/logging.h" |
33 | | #include "yb/util/slice.h" |
34 | | #include "yb/util/status_format.h" |
35 | | |
// When enabled, VersionEdit::DecodeFrom() folds each file's "largest" user
// frontier into the edit's flushed frontier (see the comment in DecodeFrom).
// Hidden/advanced: off by default until old clusters with stale per-file
// frontier metadata are ruled out.
DEFINE_bool(use_per_file_metadata_for_flushed_frontier, false,
            "Allows taking per-file metadata in version edits into account when computing the "
            "flushed frontier.");
TAG_FLAG(use_per_file_metadata_for_flushed_frontier, hidden);
TAG_FLAG(use_per_file_metadata_for_flushed_frontier, advanced);
41 | | |
42 | | namespace rocksdb { |
43 | | |
44 | 337k | uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) { |
45 | 337k | assert(number <= kFileNumberMask); |
46 | 337k | return number | (path_id * (kFileNumberMask + 1)); |
47 | 337k | } |
48 | | |
// Initializes file metadata with zeroed statistics and an inverted seqno range
// (smallest = kMaxSequenceNumber, largest = 0) so that the min/max updates in
// UpdateBoundariesExceptKey() establish the real bounds on first use.
FileMetaData::FileMetaData()
    : refs(0),
      being_compacted(false),
      table_reader_handle(nullptr),
      compensated_file_size(0),
      num_entries(0),
      num_deletions(0),
      raw_key_size(0),
      raw_value_size(0),
      init_stats_from_file(false),
      marked_for_compaction(false) {
  smallest.seqno = kMaxSequenceNumber;
  largest.seqno = 0;
}
63 | | |
// Extends this file's boundaries with a new key and its boundary values.
// The key unconditionally becomes the new largest key (callers appear to add
// keys in ascending order — confirm against call sites); it also becomes the
// smallest key if none has been recorded yet. Seqno/frontier/user-value
// boundaries are merged from `source` via UpdateBoundariesExceptKey().
void FileMetaData::UpdateBoundaries(InternalKey key, const FileBoundaryValuesBase& source) {
  largest.key = std::move(key);
  if (smallest.key.empty()) {
    smallest.key = largest.key;
  }
  UpdateBoundariesExceptKey(source, UpdateBoundariesType::kAll);
}
71 | | |
72 | 1.07M | bool FileMetaData::Unref(TableCache* table_cache) { |
73 | 1.07M | refs--; |
74 | 1.07M | if (refs <= 0) { |
75 | 104k | if (table_reader_handle) { |
76 | 1.19k | DCHECK_ONLY_NOTNULL(table_cache); |
77 | 1.19k | table_cache->ReleaseHandle(table_reader_handle); |
78 | 1.19k | table_reader_handle = nullptr; |
79 | 1.19k | } |
80 | 104k | return true; |
81 | 967k | } else { |
82 | 967k | return false; |
83 | 967k | } |
84 | 1.07M | } |
85 | | |
// Merges the non-key boundary values (seqno, user frontier, user values) from
// `source` into this file's boundaries. `type` selects which side to update:
// any value except kLargest updates the smallest side, and any value except
// kSmallest updates the largest side (so kAll updates both).
void FileMetaData::UpdateBoundariesExceptKey(const FileBoundaryValuesBase& source,
                                             UpdateBoundariesType type) {
  if (type != UpdateBoundariesType::kLargest) {
    smallest.seqno = std::min(smallest.seqno, source.seqno);
    UserFrontier::Update(
        source.user_frontier.get(), UpdateUserValueType::kSmallest, &smallest.user_frontier);

    for (const auto& user_value : source.user_values) {
      UpdateUserValue(&smallest.user_values, user_value, UpdateUserValueType::kSmallest);
    }
  }
  if (type != UpdateBoundariesType::kSmallest) {
    largest.seqno = std::max(largest.seqno, source.seqno);
    UserFrontier::Update(
        source.user_frontier.get(), UpdateUserValueType::kLargest, &largest.user_frontier);

    for (const auto& user_value : source.user_values) {
      UpdateUserValue(&largest.user_values, user_value, UpdateUserValueType::kLargest);
    }
  }
}
107 | | |
108 | 3.56M | Slice FileMetaData::UserFilter() const { |
109 | 3.42M | return largest.user_frontier ? largest.user_frontier->Filter() : Slice(); |
110 | 3.56M | } |
111 | | |
112 | 47.0k | std::string FileMetaData::FrontiersToString() const { |
113 | 47.0k | return yb::Format("frontiers: { smallest: $0 largest: $1 }", |
114 | 43.8k | smallest.user_frontier ? smallest.user_frontier->ToString() : "none", |
115 | 43.8k | largest.user_frontier ? largest.user_frontier->ToString() : "none"); |
116 | 47.0k | } |
117 | | |
// Renders this file's descriptor fields and key boundaries for logging.
std::string FileMetaData::ToString() const {
  return yb::Format("{ number: $0 total_size: $1 base_size: $2 refs: $3 "
                    "being_compacted: $4 smallest: $5 largest: $6 }",
                    fd.GetNumber(), fd.GetTotalFileSize(), fd.GetBaseFileSize(), refs,
                    being_compacted, smallest, largest);
}
124 | | |
125 | 2.57M | void VersionEdit::Clear() { |
126 | 2.57M | comparator_.reset(); |
127 | 2.57M | max_level_ = 0; |
128 | 2.57M | log_number_.reset(); |
129 | 2.57M | prev_log_number_.reset(); |
130 | 2.57M | last_sequence_.reset(); |
131 | 2.57M | next_file_number_.reset(); |
132 | 2.57M | max_column_family_.reset(); |
133 | 2.57M | deleted_files_.clear(); |
134 | 2.57M | new_files_.clear(); |
135 | 2.57M | column_family_ = 0; |
136 | 2.57M | column_family_name_.reset(); |
137 | 2.57M | is_column_family_drop_ = false; |
138 | 2.57M | flushed_frontier_.reset(); |
139 | 2.57M | } |
140 | | |
141 | 240k | void EncodeBoundaryValues(const FileBoundaryValues<InternalKey>& values, BoundaryValuesPB* out) { |
142 | 240k | auto key = values.key.Encode(); |
143 | 240k | out->set_key(key.data(), key.size()); |
144 | 240k | out->set_seqno(values.seqno); |
145 | 240k | if (values.user_frontier) { |
146 | 27.2k | values.user_frontier->ToPB(out->mutable_user_frontier()); |
147 | 27.2k | } |
148 | | |
149 | 121k | for (const auto& user_value : values.user_values) { |
150 | 121k | auto* value = out->add_user_values(); |
151 | 121k | value->set_tag(user_value->Tag()); |
152 | 121k | auto encoded_user_value = user_value->Encode(); |
153 | 121k | value->set_data(encoded_user_value.data(), encoded_user_value.size()); |
154 | 121k | } |
155 | 240k | } |
156 | | |
// Deserializes boundary values from a protobuf into *out.
// `extractor` may be null (e.g. when running internal RocksDB tools); in that
// case user values are not decoded, and the presence of a user frontier in the
// input is reported as an IllegalState error.
Status DecodeBoundaryValues(BoundaryValuesExtractor* extractor,
                            const BoundaryValuesPB& values,
                            FileBoundaryValues<InternalKey>* out) {
  out->key = InternalKey::DecodeFrom(values.key());
  out->seqno = values.seqno();
  if (extractor != nullptr) {
    if (values.has_user_frontier()) {
      out->user_frontier = extractor->CreateFrontier();
      out->user_frontier->FromPB(values.user_frontier());
    }
    for (const auto &user_value : values.user_values()) {
      UserBoundaryValuePtr decoded;
      auto status = extractor->Decode(user_value.tag(), user_value.data(), &decoded);
      if (!status.ok()) {
        return status;
      }
      // The extractor may leave `decoded` null; such entries are skipped.
      if (decoded) {
        out->user_values.push_back(std::move(decoded));
      }
    }
  } else if (values.has_user_frontier()) {
    return STATUS_FORMAT(
        IllegalState, "Boundary values contains user frontier but extractor is not specified: $0",
        values);
  }
  return Status::OK();
}
184 | | |
185 | 1.15M | bool VersionEdit::AppendEncodedTo(std::string* dst) const { |
186 | 1.15M | VersionEditPB pb; |
187 | 1.15M | auto result = EncodeTo(&pb); |
188 | 1.15M | if (result) { |
189 | 1.15M | pb.AppendToString(dst); |
190 | 1.15M | } |
191 | 1.15M | return result; |
192 | 1.15M | } |
193 | | |
194 | 1.67M | bool VersionEdit::EncodeTo(VersionEditPB* dst) const { |
195 | 1.67M | VersionEditPB& pb = *dst; |
196 | 1.67M | if (comparator_) { |
197 | 518k | pb.set_comparator(*comparator_); |
198 | 518k | } |
199 | 1.67M | if (log_number_) { |
200 | 886k | pb.set_log_number(*log_number_); |
201 | 886k | } |
202 | 1.67M | if (prev_log_number_) { |
203 | 307k | pb.set_prev_log_number(*prev_log_number_); |
204 | 307k | } |
205 | 1.67M | if (next_file_number_) { |
206 | 642k | pb.set_next_file_number(*next_file_number_); |
207 | 642k | } |
208 | 1.67M | if (last_sequence_) { |
209 | 642k | pb.set_last_sequence(*last_sequence_); |
210 | 642k | } |
211 | 1.67M | if (flushed_frontier_) { |
212 | 254k | flushed_frontier_->ToPB(pb.mutable_flushed_frontier()); |
213 | 254k | } |
214 | 1.67M | if (max_column_family_) { |
215 | 4.26k | pb.set_max_column_family(*max_column_family_); |
216 | 4.26k | } |
217 | | |
218 | 60.8k | for (const auto& deleted : deleted_files_) { |
219 | 60.8k | auto& deleted_file = *pb.add_deleted_files(); |
220 | 60.8k | deleted_file.set_level(deleted.first); |
221 | 60.8k | deleted_file.set_file_number(deleted.second); |
222 | 60.8k | } |
223 | | |
224 | 1.79M | for (size_t i = 0; i < new_files_.size(); i++) { |
225 | 120k | const FileMetaData& f = new_files_[i].second; |
226 | 120k | if (!f.smallest.key.Valid() || !f.largest.key.Valid()) { |
227 | 1 | return false; |
228 | 1 | } |
229 | 120k | auto& new_file = *pb.add_new_files(); |
230 | 120k | new_file.set_level(new_files_[i].first); |
231 | 120k | new_file.set_number(f.fd.GetNumber()); |
232 | 120k | new_file.set_total_file_size(f.fd.GetTotalFileSize()); |
233 | 120k | new_file.set_base_file_size(f.fd.GetBaseFileSize()); |
234 | 120k | EncodeBoundaryValues(f.smallest, new_file.mutable_smallest()); |
235 | 120k | EncodeBoundaryValues(f.largest, new_file.mutable_largest()); |
236 | 120k | if (f.fd.GetPathId() != 0) { |
237 | 402 | new_file.set_path_id(f.fd.GetPathId()); |
238 | 402 | } |
239 | 120k | if (f.marked_for_compaction) { |
240 | 19 | new_file.set_marked_for_compaction(true); |
241 | 19 | } |
242 | 120k | if (f.imported) { |
243 | 0 | new_file.set_imported(true); |
244 | 0 | } |
245 | 120k | } |
246 | | |
247 | | // 0 is default and does not need to be explicitly written |
248 | 1.67M | if (column_family_ != 0) { |
249 | 32.7k | pb.set_column_family(column_family_); |
250 | 32.7k | } |
251 | | |
252 | 1.67M | if (column_family_name_) { |
253 | 8.80k | pb.set_column_family_name(*column_family_name_); |
254 | 8.80k | } |
255 | | |
256 | 1.67M | if (is_column_family_drop_) { |
257 | 30 | pb.set_is_column_family_drop(true); |
258 | 30 | } |
259 | | |
260 | 1.67M | return true; |
261 | 1.67M | } |
262 | | |
263 | 376k | Status VersionEdit::DecodeFrom(BoundaryValuesExtractor* extractor, const Slice& src) { |
264 | 376k | Clear(); |
265 | 376k | VersionEditPB pb; |
266 | 376k | if (!pb.ParseFromArray(src.data(), static_cast<int>(src.size()))) { |
267 | 0 | return STATUS(Corruption, "VersionEdit"); |
268 | 0 | } |
269 | | |
270 | 318 | VLOG(1) << "Parsed version edit: " << pb.ShortDebugString(); |
271 | | |
272 | 376k | if (pb.has_comparator()) { |
273 | 11.6k | comparator_ = std::move(*pb.mutable_comparator()); |
274 | 11.6k | } |
275 | 376k | if (pb.has_log_number()) { |
276 | 359k | log_number_ = pb.log_number(); |
277 | 359k | } |
278 | 376k | if (pb.has_prev_log_number()) { |
279 | 20.6k | prev_log_number_ = pb.prev_log_number(); |
280 | 20.6k | } |
281 | 376k | if (pb.has_next_file_number()) { |
282 | 356k | next_file_number_ = pb.next_file_number(); |
283 | 356k | } |
284 | 376k | if (pb.has_last_sequence()) { |
285 | 356k | last_sequence_ = pb.last_sequence(); |
286 | 356k | } |
287 | 376k | if (extractor) { |
288 | | // BoundaryValuesExtractor could be not set when running from internal RocksDB tools. |
289 | 337k | if (pb.has_obsolete_last_op_id()) { |
290 | 0 | flushed_frontier_ = extractor->CreateFrontier(); |
291 | 0 | flushed_frontier_->FromOpIdPBDeprecated(pb.obsolete_last_op_id()); |
292 | 0 | } |
293 | 337k | if (pb.has_flushed_frontier()) { |
294 | 6.13k | flushed_frontier_ = extractor->CreateFrontier(); |
295 | 6.13k | flushed_frontier_->FromPB(pb.flushed_frontier()); |
296 | 6.13k | } |
297 | 337k | } |
298 | 376k | if (pb.has_max_column_family()) { |
299 | 2.60k | max_column_family_ = pb.max_column_family(); |
300 | 2.60k | } |
301 | | |
302 | 15.6k | for (const auto& deleted : pb.deleted_files()) { |
303 | 15.6k | deleted_files_.emplace(deleted.level(), deleted.file_number()); |
304 | 15.6k | } |
305 | | |
306 | 376k | const size_t new_files_size = static_cast<size_t>(pb.new_files_size()); |
307 | 376k | new_files_.resize(new_files_size); |
308 | | |
309 | 412k | for (size_t i = 0; i < new_files_size; ++i) { |
310 | 36.4k | auto& source = pb.new_files(static_cast<int>(i)); |
311 | 36.4k | int level = source.level(); |
312 | 36.4k | new_files_[i].first = level; |
313 | 36.4k | auto& meta = new_files_[i].second; |
314 | 36.4k | meta.fd = FileDescriptor(source.number(), |
315 | 36.4k | source.path_id(), |
316 | 36.4k | source.total_file_size(), |
317 | 36.4k | source.base_file_size()); |
318 | 36.4k | if (source.has_obsolete_last_op_id() && extractor) { |
319 | 0 | meta.largest.user_frontier = extractor->CreateFrontier(); |
320 | 0 | meta.largest.user_frontier->FromOpIdPBDeprecated(source.obsolete_last_op_id()); |
321 | 0 | } |
322 | 36.4k | auto status = DecodeBoundaryValues(extractor, source.smallest(), &meta.smallest); |
323 | 36.4k | if (!status.ok()) { |
324 | 0 | return status; |
325 | 0 | } |
326 | 36.4k | status = DecodeBoundaryValues(extractor, source.largest(), &meta.largest); |
327 | 36.4k | if (!status.ok()) { |
328 | 0 | return status; |
329 | 0 | } |
330 | 36.4k | meta.marked_for_compaction = source.marked_for_compaction(); |
331 | 36.4k | max_level_ = std::max(max_level_, level); |
332 | 36.4k | meta.imported = source.imported(); |
333 | | |
334 | | // Use the relevant fields in the "largest" frontier to update the "flushed" frontier for this |
335 | | // version edit. In practice this will only look at OpId and will discard hybrid time and |
336 | | // history cutoff (which probably won't be there anyway) coming from the boundary values. |
337 | | // |
338 | | // This is enabled only if --use_per_file_metadata_for_flushed_frontier is specified, until we |
339 | | // know that we don't have any clusters with wrong per-file flushed frontier metadata in version |
340 | | // edits, such as that restored from old backups from unrelated clusters. |
341 | 36.4k | if (FLAGS_use_per_file_metadata_for_flushed_frontier && meta.largest.user_frontier) { |
342 | 0 | if (!flushed_frontier_) { |
343 | 0 | LOG(DFATAL) << "Flushed frontier not present but a file's largest user frontier present: " |
344 | 0 | << meta.largest.user_frontier->ToString() |
345 | 0 | << ", version edit protobuf:\n" << pb.DebugString(); |
346 | 0 | } else if (!flushed_frontier_->Dominates(*meta.largest.user_frontier, |
347 | 0 | UpdateUserValueType::kLargest)) { |
348 | | // The flushed frontier of this VersionEdit must already include the information provided |
349 | | // by flushed frontiers of individual files. |
350 | 0 | LOG(DFATAL) << "Flushed frontier is present but has to be updated with data from " |
351 | 0 | << "file boundary: flushed_frontier=" << flushed_frontier_->ToString() |
352 | 0 | << ", a file's larget user frontier: " |
353 | 0 | << meta.largest.user_frontier->ToString() |
354 | 0 | << ", version edit protobuf:\n" << pb.DebugString(); |
355 | 0 | } |
356 | 0 | UpdateUserFrontier( |
357 | 0 | &flushed_frontier_, meta.largest.user_frontier, UpdateUserValueType::kLargest); |
358 | 0 | } |
359 | 36.4k | } |
360 | | |
361 | 376k | column_family_ = pb.column_family(); |
362 | | |
363 | 376k | if (!pb.column_family_name().empty()) { |
364 | 2.95k | column_family_name_ = pb.column_family_name(); |
365 | 2.95k | } |
366 | | |
367 | 376k | is_column_family_drop_ = pb.is_column_family_drop(); |
368 | | |
369 | 376k | return Status(); |
370 | 376k | } |
371 | | |
// Returns a human-readable protobuf rendering of this edit.
// NOTE(review): `hex_key` is unused here — the protobuf DebugString formatting
// is used regardless; confirm whether any caller still expects hex output.
// EncodeTo() may return false (invalid file keys); its result is ignored and
// the partially filled protobuf is printed as-is.
std::string VersionEdit::DebugString(bool hex_key) const {
  VersionEditPB pb;
  EncodeTo(&pb);
  return pb.DebugString();
}
377 | | |
// Prepares this edit to describe a brand-new database: log number and last
// sequence start at zero, the next file number at its initial value, and
// there is no flushed frontier yet.
void VersionEdit::InitNewDB() {
  log_number_ = 0;
  next_file_number_ = VersionSet::kInitialNextFileNumber;
  last_sequence_ = 0;
  flushed_frontier_.reset();
}
384 | | |
// Merges `value` into the current flushed frontier (kUpdate mode), as opposed
// to ModifyFlushedFrontier(..., kForce), which overwrites it.
void VersionEdit::UpdateFlushedFrontier(UserFrontierPtr value) {
  ModifyFlushedFrontier(std::move(value), FrontierModificationMode::kUpdate);
}
388 | | |
389 | 251k | void VersionEdit::ModifyFlushedFrontier(UserFrontierPtr value, FrontierModificationMode mode) { |
390 | 251k | if (mode == FrontierModificationMode::kForce) { |
391 | 862 | flushed_frontier_ = std::move(value); |
392 | 862 | force_flushed_frontier_ = true; |
393 | 250k | } else { |
394 | 250k | UpdateUserFrontier(&flushed_frontier_, std::move(value), UpdateUserValueType::kLargest); |
395 | 250k | } |
396 | 251k | } |
397 | | |
// Renders this descriptor's identity and sizes for logging.
std::string FileDescriptor::ToString() const {
  return yb::Format("{ number: $0 path_id: $1 total_file_size: $2 base_file_size: $3 }",
                    GetNumber(), GetPathId(), total_file_size, base_file_size);
}
402 | | |
403 | | } // namespace rocksdb |