/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/compaction_iterator.cc
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/rocksdb/db/compaction_iterator.h"

#include <iterator>

#include "yb/rocksdb/table/internal_iterator.h"

#include "yb/util/status_log.h"

namespace rocksdb {

CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    bool expect_valid_internal_key, Compaction* compaction,
    CompactionFilter* compaction_filter, LogBuffer* log_buffer)
    : input_(input),
      cmp_(cmp),
      merge_helper_(merge_helper),
      snapshots_(snapshots),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      expect_valid_internal_key_(expect_valid_internal_key),
      compaction_(compaction),
      compaction_filter_(compaction_filter),
      log_buffer_(log_buffer),
      merge_out_iter_(merge_helper_) {
  assert(compaction_filter_ == nullptr || compaction_ != nullptr);
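  // Note: a compaction filter requires a compaction context; Filter() below
  // is invoked with compaction_->level(), which assumes compaction_ != nullptr.
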
  bottommost_level_ =
      compaction_ == nullptr ? false : compaction_->bottommost_level();
  if (compaction_ != nullptr) {
    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
  }

  if (snapshots_->size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip_ = last_sequence;
    earliest_snapshot_ = visible_at_tip_;
    latest_snapshot_ = 0;
  } else {
    visible_at_tip_ = 0;
    earliest_snapshot_ = snapshots_->at(0);
    latest_snapshot_ = snapshots_->back();
  }
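  // Illustrative example (not from the original source): with snapshots
  // {10, 20, 30}, these become visible_at_tip_ = 0, earliest_snapshot_ = 10,
  // latest_snapshot_ = 30; with no snapshots and last_sequence = 50, they
  // become visible_at_tip_ = 50, earliest_snapshot_ = 50, latest_snapshot_ = 0.
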
  if (compaction_filter_ != nullptr && compaction_filter_->IgnoreSnapshots()) {
    ignore_snapshots_ = true;
  } else {
    ignore_snapshots_ = false;
  }

  if (compaction_filter_) {
    auto ranges = compaction_filter_->GetLiveRanges();
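    // Push the ranges in reverse so that the first (lowest) range ends up on
    // top of the stack (the back of the vector); the DCHECKs below verify
    // that each range is non-empty and that the ranges do not overlap.
    // Illustrative example (not from the original source): for live ranges
    // {(a, b), (c, d)} with b <= c, the stack becomes {(c, d), (a, b)}.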
    while (!ranges.empty()) {
      auto range = ranges.back();
      ranges.pop_back();
      DCHECK(range.first.Less(range.second));
      if (!live_key_ranges_stack_.empty()) {
        DCHECK(live_key_ranges_stack_.back().first.GreaterOrEqual(range.second));
      }
      auto user_key_pair = std::make_pair(range.first, range.second);
      live_key_ranges_stack_.push_back(user_key_pair);
    }
  }
}

void CompactionIterator::ResetRecordCounts() {
  iter_stats_.num_record_drop_user = 0;
  iter_stats_.num_record_drop_hidden = 0;
  iter_stats_.num_record_drop_obsolete = 0;
}

void CompactionIterator::SeekToFirst() {
  NextFromInput();
  PrepareOutput();
}

void CompactionIterator::Next() {
  // If there is a merge output, return it before continuing to process the
  // input.
  if (merge_out_iter_.Valid()) {
    merge_out_iter_.Next();

    // Check if we returned all records of the merge output.
    if (merge_out_iter_.Valid()) {
      key_ = merge_out_iter_.key();
      value_ = merge_out_iter_.value();
      bool valid_key __attribute__((__unused__)) =
          ParseInternalKey(key_, &ikey_);
      // MergeUntil stops when it encounters a corrupt key and does not
      // include them in the result, so we expect the keys here to be valid.
      assert(valid_key);
      // Keep current_key_ in sync.
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetKey();
      ikey_.user_key = current_key_.GetUserKey();
      valid_ = true;
    } else {
      // MergeHelper moves the iterator to the first record after the merged
      // records, so even though we reached the end of the merge output, we do
      // not want to advance the iterator.
      NextFromInput();
    }
  } else {
    // Only advance the input iterator if there is no merge output and the
    // iterator is not already at the next record.
    if (!at_next_) {
      input_->Next();
    }
    NextFromInput();
  }

  if (valid_) {
    // Record that we've outputted a record for the current key.
    has_outputted_key_ = true;
  }

  PrepareOutput();
}

void CompactionIterator::NextFromInput() {
  at_next_ = false;
  valid_ = false;

  while (!valid_ && input_->Valid()) {
    key_ = input_->key();
    value_ = input_->value();
    iter_stats_.num_input_records++;

    if (!ParseInternalKey(key_, &ikey_)) {
      // If `expect_valid_internal_key_` is false, return the corrupted key
      // and let the caller decide what to do with it.
      // TODO(noetzli): We should have a more elegant solution for this.
      if (expect_valid_internal_key_) {
        assert(!"Corrupted internal key not expected.");
        status_ = STATUS(Corruption, "Corrupted internal key not expected.");
        break;
      }
      key_ = current_key_.SetKey(key_);
      has_current_user_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      iter_stats_.num_input_corrupt_records++;
      valid_ = true;
      break;
    }

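    // Skip keys that fall outside the remaining live key ranges. Illustrative
    // walkthrough (not from the original source): with the stack holding
    // {(c, d), (a, b)} (top last) and the iterator positioned past b, (a, b)
    // is popped; if the current key is still before c, we seek straight to c
    // at kMaxSequenceNumber so that no live keys are skipped.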
    {
      auto updated_live_range = false;
      while (!live_key_ranges_stack_.empty() &&
             !live_key_ranges_stack_.back().second.empty() &&
             live_key_ranges_stack_.back().second.Less(ikey_.user_key)) {
        // As long as the active range ends before the compaction iterator's
        // current progress, pop to the next active range.
        live_key_ranges_stack_.pop_back();
        updated_live_range = true;
      }
      if (updated_live_range) {
        if (live_key_ranges_stack_.empty()) {
          // If we've iterated past the last active range, we're done.
          valid_ = false;
          return;
        }

        auto next_range_start = live_key_ranges_stack_.back().first;
        if (ikey_.user_key.Less(next_range_start)) {
          // If the next active range starts after the current key, then seek
          // to it and continue.
          IterKey iter_key;
          iter_key.SetInternalKey(next_range_start, kMaxSequenceNumber, kValueTypeForSeek);
          input_->Seek(iter_key.GetKey());
          continue;
        }
      }
    }

    // Update input statistics.
    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
      iter_stats_.num_input_deletion_records++;
    }
    iter_stats_.total_input_raw_key_bytes += key_.size();
    iter_stats_.total_input_raw_value_bytes += value_.size();

    // Check whether the user key changed. After this if statement, current_key_
    // is a copy of the current input key (possibly converted to a delete by the
    // compaction filter), and ikey_.user_key points into that copy.
    if (!has_current_user_key_ ||
        !cmp_->Equal(ikey_.user_key, current_user_key_)) {
      // First occurrence of this user key.
      key_ = current_key_.SetKey(key_, &ikey_);
      current_user_key_ = ikey_.user_key;
      has_current_user_key_ = true;
      has_outputted_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;

      // Apply the compaction filter to the first occurrence of the user key.
      if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
          (visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
           ignore_snapshots_)) {
        // If the user has specified a compaction filter and the sequence
        // number is greater than any external snapshot, then invoke the
        // filter. If the filter decides not to keep the entry, replace it
        // with a deletion marker.
        bool value_changed = false;
        bool to_delete = false;
        compaction_filter_value_.clear();
        to_delete = compaction_filter_->Filter(
            compaction_->level(), ikey_.user_key, value_,
            &compaction_filter_value_, &value_changed) != FilterDecision::kKeep;
        if (to_delete) {
          // Convert the current key to a delete.
          ikey_.type = kTypeDeletion;
          current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
          // No value is associated with the delete.
          value_.clear();
          iter_stats_.num_record_drop_user++;
        } else if (value_changed) {
          value_ = compaction_filter_value_;
        }
      }
    } else {
      // Update the current key to reflect the new sequence number/type without
      // copying the user key.
      // TODO(rven): The compaction filter does not process keys in this path.
      // It needs to process multiple versions of a key if there are versions
      // on both sides of a snapshot.
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetKey();
      ikey_.user_key = current_key_.GetUserKey();
    }

    // If there are no snapshots, then this kv affects visibility at tip.
    // Otherwise, search through all existing snapshots to find the earliest
    // snapshot that is affected by this kv.
    SequenceNumber last_sequence __attribute__((__unused__)) =
        current_user_key_sequence_;
    current_user_key_sequence_ = ikey_.sequence;
    SequenceNumber last_snapshot = current_user_key_snapshot_;
    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
    current_user_key_snapshot_ =
        visible_at_tip_
            ? visible_at_tip_
            : FindEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);

    if (clear_and_output_next_key_) {
      // In the previous iteration we encountered a single delete that we could
      // not compact out. We will keep this Put, but can drop its data.
      // (See Optimization 3, below.)
      assert(ikey_.type == kTypeValue);
      assert(current_user_key_snapshot_ == last_snapshot);

      value_.clear();
      valid_ = true;
      clear_and_output_next_key_ = false;
    } else if (ikey_.type == kTypeSingleDeletion) {
      // We can compact out a SingleDelete if:
      // 1) We encounter the corresponding PUT -OR- we know that this key
      //    doesn't appear past this output level
      // =AND=
      // 2) We've already returned a record in this snapshot -OR- there is no
      //    write-conflict snapshot earlier than this SingleDelete (i.e. its
      //    sequence number is at or below earliest_write_conflict_snapshot).
      //
      // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
      // allow Transactions to do write-conflict checking (if we compacted away
      // all keys, then we wouldn't know that a write happened in this
      // snapshot). If there is no earlier snapshot, then we know that there
      // are no active transactions that need to know about any writes.
      //
      // Optimization 3:
      // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
      // true, then we must output a SingleDelete. In this case, we will decide
      // to also output the PUT. While we are compacting less by outputting the
      // PUT now, hopefully this will lead to better compaction in the future
      // when Rule 2 is later true (i.e., we are hoping we can later compact
      // out both the SingleDelete and the Put, which we couldn't do if we only
      // outputted the SingleDelete now).
      // In this case, we can save space by removing the PUT's value, as it
      // will never be read.
      //
      // Deletes and Merges are not supported on the same key that has a
      // SingleDelete, as it is not possible to correctly do any partial
      // compaction of such a combination of operations. The result of mixing
      // those operations for a given key is documented as being undefined, so
      // we can choose how to handle such combinations of operations. We will
      // try to compact out as much as we can in these cases.
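      //
      // Illustrative example (not from the original source): suppose the input
      // holds SingleDelete(k)@10 immediately followed by Put(k)@5, the only
      // snapshot is 3 (so both records are in the same snapshot stripe), and
      // no record for k has been outputted yet. With
      // earliest_write_conflict_snapshot = 3, Rule 2 fails: we output the
      // SingleDelete and, via Optimization 3, the Put with its value cleared.
      // With earliest_write_conflict_snapshot = kMaxSequenceNumber, both
      // records would be dropped.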

      // The easiest way to process a SingleDelete during iteration is to peek
      // ahead at the next key.
      ParsedInternalKey next_ikey;
      input_->Next();

      // Check whether the next key exists, is not corrupt, and is the same key
      // as the single delete.
      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
        // Check whether the next key belongs to the same snapshot as the
        // SingleDelete.
        if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot) {
          if (next_ikey.type == kTypeSingleDeletion) {
            // We encountered two SingleDeletes in a row. This could be due to
            // unexpected user input. Skip the first SingleDelete and let the
            // next iteration decide how to handle the second SingleDelete.

            // The first SingleDelete has already been skipped, since we
            // already called input_->Next().
            ++iter_stats_.num_record_drop_obsolete;
          } else if ((ikey_.sequence <= earliest_write_conflict_snapshot_) ||
                     has_outputted_key_) {
            // Found a matching value; we can drop the single delete and the
            // value. It is safe to drop both records since we've already
            // outputted a key in this snapshot, or there is no earlier
            // snapshot (Rule 2 above).

            // Note: it doesn't matter whether the second key is a Put or an
            // unexpected Merge or Delete. We will compact it out either way.
            ++iter_stats_.num_record_drop_hidden;
            ++iter_stats_.num_record_drop_obsolete;
            // Already called input_->Next() once. Call it a second time to
            // skip past the second key.
            input_->Next();
          } else {
            // Found a matching value, but we cannot drop both keys, since
            // there is an earlier snapshot and we need to leave behind a
            // record to know that a write happened in this snapshot (Rule 2
            // above). Clear the value and output the SingleDelete. (The value
            // will be outputted on the next iteration.)
            ++iter_stats_.num_record_drop_hidden;

            // Setting valid_ to true will output the current SingleDelete.
            valid_ = true;

            // Set up the Put to be outputted in the next iteration
            // (Optimization 3).
            clear_and_output_next_key_ = true;
          }
        } else {
          // We hit the next snapshot without hitting a put, so the iterator
          // returns the single delete.
          valid_ = true;
        }
      } else {
        // We are at the end of the input, could not parse the next key, or hit
        // the next key. The iterator returns the single delete if the key
        // possibly exists beyond the current output level. We set
        // has_current_user_key_ to false so that if the iterator is at the
        // next key, we do not compare it again against the previous key at the
        // next iteration. If the next key is corrupt, we return before the
        // comparison, so the value of has_current_user_key_ does not matter.
        has_current_user_key_ = false;
        if (compaction_ != nullptr && ikey_.sequence <= earliest_snapshot_ &&
            compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                       &level_ptrs_)) {
          // The key doesn't exist outside of this range, so we can compact
          // out this SingleDelete.
          ++iter_stats_.num_record_drop_obsolete;
        } else {
          // Output the SingleDelete.
          valid_ = true;
        }
      }

      if (valid_) {
        at_next_ = true;
      }
    } else if (last_snapshot == current_user_key_snapshot_) {
      // If the earliest snapshot in which this key is visible is the same as
      // the earliest snapshot in which a previous instance of the same key is
      // visible, then this kv is not visible in any snapshot: it is hidden by
      // a newer entry for the same user key.
      // TODO: why not > ?
      //
      // Note: Dropping this key will not affect TransactionDB write-conflict
      // checking since there has already been a record returned for this key
      // in this snapshot.
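      //
      // Illustrative example (not from the original source): Put(k)@8 followed
      // by Put(k)@5 with no snapshot taken between sequence numbers 5 and 8
      // means both map to the same earliest visible snapshot, so @5 can never
      // be read and is dropped here.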
      assert(last_sequence >= current_user_key_sequence_);
      ++iter_stats_.num_record_drop_hidden;  // (A)
      input_->Next();
    } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
               ikey_.sequence <= earliest_snapshot_ &&
               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                          &level_ptrs_)) {
      // TODO(noetzli): This is the only place where we use compaction_
      // (besides the constructor). We should probably get rid of this
      // dependency and find a way to do similar filtering during flushes.
      //
      // For this user key:
      // (1) there is no data in higher levels
      // (2) data in lower levels will have larger sequence numbers
      // (3) data in layers that are being compacted here and have
      //     smaller sequence numbers will be dropped in the next
      //     few iterations of this loop (by rule (A) above).
      // Therefore this deletion marker is obsolete and can be dropped.
      //
      // Note: Dropping this Delete will not affect TransactionDB
      // write-conflict checking since it is earlier than any snapshot.
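      //
      // Illustrative example (not from the original source): a Delete(k)@4 in
      // a bottommost compaction with earliest_snapshot_ = 9, where no file
      // beyond the output level can contain k. Nothing below can resurrect k,
      // and any older Puts for k in this compaction will be dropped by (A),
      // so the tombstone itself can be dropped here.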
      ++iter_stats_.num_record_drop_obsolete;
      input_->Next();
    } else if (ikey_.type == kTypeMerge) {
      if (!merge_helper_->HasOperator()) {
        LOG_TO_BUFFER(log_buffer_, "Options::merge_operator is null.");
        status_ = STATUS(InvalidArgument,
                         "merge_operator is not properly initialized.");
        return;
      }

      // We know the merge type entry is not hidden, otherwise we would have
      // hit (A). We encapsulate the merge-related state machine in a
      // different object to minimize changes to the existing flow.
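      // Per the comment in Next() above, MergeUntil leaves input_ positioned
      // on the first record after the merge operands it consumed, so
      // NextFromInput() must not advance the iterator again once the merge
      // output is exhausted.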
      WARN_NOT_OK(merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_),
                  "Merge until failed");
      merge_out_iter_.SeekToFirst();

      if (merge_out_iter_.Valid()) {
        // NOTE: key, value, and ikey_ refer to old entries.
        // These will be correctly set below.
        key_ = merge_out_iter_.key();
        value_ = merge_out_iter_.value();
        bool valid_key __attribute__((__unused__)) =
            ParseInternalKey(key_, &ikey_);
        // MergeUntil stops when it encounters a corrupt key and does not
        // include them in the result, so we expect the keys here to be valid.
        assert(valid_key);
        // Keep current_key_ in sync.
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
        key_ = current_key_.GetKey();
        ikey_.user_key = current_key_.GetUserKey();
        valid_ = true;
      } else {
        // All merge operands were filtered out. Reset the user key, since the
        // batch consumed by the merge operator should not shadow any keys
        // coming after the merges.
        has_current_user_key_ = false;
      }
    } else {
      valid_ = true;
    }
  }
}

void CompactionIterator::PrepareOutput() {
  // Zeroing out the sequence number leads to better compression. If this is
  // the bottommost level (no files in lower levels), the earliest snapshot is
  // larger than this seqno, and the user key differs from the last user key
  // in the compaction, then we can squash the seqno to zero.

  // This is safe for TransactionDB write-conflict checking since transactions
  // only care about sequence numbers larger than any active snapshots.
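
  // Illustrative example (not from the original source): at the bottommost
  // level with earliest_snapshot_ = 100, a Put at sequence 42 is rewritten
  // with sequence 0; since many keys then share sequence 0, the key blocks
  // compress better.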
  if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ &&
      ikey_.type != kTypeMerge &&
      !cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) {
    assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
    ikey_.sequence = 0;
    current_key_.UpdateInternalKey(0, ikey_.type);
  }
}

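// Returns the earliest snapshot at or above `in` (kMaxSequenceNumber if none
// exists) and stores the next-lower snapshot (0 if none) in *prev_snapshot.
// Illustrative example (not from the original source): with snapshots
// {10, 20, 30}, FindEarliestVisibleSnapshot(15) returns 20 with
// *prev_snapshot = 10, and FindEarliestVisibleSnapshot(35) returns
// kMaxSequenceNumber with *prev_snapshot = 30.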
inline SequenceNumber CompactionIterator::FindEarliestVisibleSnapshot(
    SequenceNumber in, SequenceNumber* prev_snapshot) {
  assert(snapshots_->size());
  SequenceNumber prev __attribute__((unused)) = 0;
  for (const auto cur : *snapshots_) {
    assert(prev <= cur);
    if (cur >= in) {
      *prev_snapshot = prev;
      return cur;
    }
    prev = cur;
    assert(prev);
  }
  *prev_snapshot = prev;
  return kMaxSequenceNumber;
}

} // namespace rocksdb