YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/compaction_iterator.cc
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/rocksdb/db/compaction_iterator.h"
#include <iterator>

#include "yb/rocksdb/table/internal_iterator.h"

#include "yb/util/status_log.h"

namespace rocksdb {

CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    bool expect_valid_internal_key, Compaction* compaction,
    CompactionFilter* compaction_filter, LogBuffer* log_buffer)
    : input_(input),
      cmp_(cmp),
      merge_helper_(merge_helper),
      snapshots_(snapshots),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      expect_valid_internal_key_(expect_valid_internal_key),
      compaction_(compaction),
      compaction_filter_(compaction_filter),
      log_buffer_(log_buffer),
      merge_out_iter_(merge_helper_) {
  assert(compaction_filter_ == nullptr || compaction_ != nullptr);
  bottommost_level_ =
      compaction_ == nullptr ? false : compaction_->bottommost_level();
  if (compaction_ != nullptr) {
    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
  }

  if (snapshots_->size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip_ = last_sequence;
    earliest_snapshot_ = visible_at_tip_;
    latest_snapshot_ = 0;
  } else {
    visible_at_tip_ = 0;
    earliest_snapshot_ = snapshots_->at(0);
    latest_snapshot_ = snapshots_->back();
  }
  if (compaction_filter_ != nullptr && compaction_filter_->IgnoreSnapshots()) {
    ignore_snapshots_ = true;
  } else {
    ignore_snapshots_ = false;
  }

  if (compaction_filter_) {
    auto ranges = compaction_filter_->GetLiveRanges();
    while (!ranges.empty()) {
      auto range = ranges.back();
      ranges.pop_back();
      DCHECK(range.first.Less(range.second));
      if (!live_key_ranges_stack_.empty()) {
        DCHECK(live_key_ranges_stack_.back().first.GreaterOrEqual(range.second));
      }
      auto user_key_pair = std::make_pair(range.first, range.second);
      live_key_ranges_stack_.push_back(user_key_pair);
    }
  }
}
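
// For example (hypothetical ranges): if GetLiveRanges() returns
// {["a","c"), ["e","g")} in ascending order, the loop above pushes them in
// reverse, so live_key_ranges_stack_.back() is ["a","c").  NextFromInput()
// then consumes ranges from the back of the stack, lowest range first.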

void CompactionIterator::ResetRecordCounts() {
  iter_stats_.num_record_drop_user = 0;
  iter_stats_.num_record_drop_hidden = 0;
  iter_stats_.num_record_drop_obsolete = 0;
}

void CompactionIterator::SeekToFirst() {
  NextFromInput();
  PrepareOutput();
}

void CompactionIterator::Next() {
  // If there is a merge output, return it before continuing to process the
  // input.
  if (merge_out_iter_.Valid()) {
    merge_out_iter_.Next();

    // Check if we returned all records of the merge output.
    if (merge_out_iter_.Valid()) {
      key_ = merge_out_iter_.key();
      value_ = merge_out_iter_.value();
      bool valid_key __attribute__((__unused__)) =
          ParseInternalKey(key_, &ikey_);
      // MergeUntil stops when it encounters a corrupt key and does not
      // include it in the result, so we expect the keys here to be valid.
      assert(valid_key);
      // Keep current_key_ in sync.
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetKey();
      ikey_.user_key = current_key_.GetUserKey();
      valid_ = true;
    } else {
      // MergeHelper moves the iterator to the first record after the merged
      // records, so even though we reached the end of the merge output, we do
      // not want to advance the iterator.
      NextFromInput();
    }
  } else {
    // Only advance the input iterator if there is no merge output and the
    // iterator is not already at the next record.
    if (!at_next_) {
      input_->Next();
    }
    NextFromInput();
  }

  if (valid_) {
    // Record that we've outputted a record for the current key.
    has_outputted_key_ = true;
  }

  PrepareOutput();
}

void CompactionIterator::NextFromInput() {
  at_next_ = false;
  valid_ = false;

  while (!valid_ && input_->Valid()) {
    key_ = input_->key();
    value_ = input_->value();
    iter_stats_.num_input_records++;

    if (!ParseInternalKey(key_, &ikey_)) {
      // If `expect_valid_internal_key_` is false, return the corrupted key
      // and let the caller decide what to do with it.
      // TODO(noetzli): We should have a more elegant solution for this.
      if (expect_valid_internal_key_) {
        assert(!"Corrupted internal key not expected.");
        status_ = STATUS(Corruption, "Corrupted internal key not expected.");
        break;
      }
      key_ = current_key_.SetKey(key_);
      has_current_user_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      iter_stats_.num_input_corrupt_records++;
      valid_ = true;
      break;
    }

    {
      auto updated_live_range = false;
      while (!live_key_ranges_stack_.empty() &&
             !live_key_ranges_stack_.back().second.empty() &&
             live_key_ranges_stack_.back().second.Less(ikey_.user_key)) {
        // As long as the active range is before the compaction iterator's current progress, pop to
        // the next active range.
        live_key_ranges_stack_.pop_back();
        updated_live_range = true;
      }
      if (updated_live_range) {
        if (live_key_ranges_stack_.empty()) {
          // If we've iterated past the last active range, we're done.
          valid_ = false;
          return;
        }

        auto next_range_start = live_key_ranges_stack_.back().first;
        if (ikey_.user_key.Less(next_range_start)) {
          // If the next active range starts after the current key, then seek to it and continue.
          IterKey iter_key;
          iter_key.SetInternalKey(next_range_start, kMaxSequenceNumber, kValueTypeForSeek);
          input_->Seek(iter_key.GetKey());
          continue;
        }
      }
    }

    // Update input statistics
    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
      iter_stats_.num_input_deletion_records++;
    }
    iter_stats_.total_input_raw_key_bytes += key_.size();
    iter_stats_.total_input_raw_value_bytes += value_.size();

    // Check whether the user key changed. After this if statement current_key_
    // is a copy of the current input key (maybe converted to a delete by the
    // compaction filter). ikey_.user_key is pointing to the copy.
    if (!has_current_user_key_ ||
        !cmp_->Equal(ikey_.user_key, current_user_key_)) {
      // First occurrence of this user key
      key_ = current_key_.SetKey(key_, &ikey_);
      current_user_key_ = ikey_.user_key;
      has_current_user_key_ = true;
      has_outputted_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;

      // apply the compaction filter to the first occurrence of the user key
      if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
          (visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
           ignore_snapshots_)) {
        // If the user has specified a compaction filter and the sequence
        // number is greater than any external snapshot, then invoke the
        // filter. If the return value of the compaction filter is true,
        // replace the entry with a deletion marker.
        bool value_changed = false;
        bool to_delete = false;
        compaction_filter_value_.clear();
        to_delete = compaction_filter_->Filter(
            compaction_->level(), ikey_.user_key, value_,
            &compaction_filter_value_, &value_changed) != FilterDecision::kKeep;
        if (to_delete) {
          // convert the current key to a delete
          ikey_.type = kTypeDeletion;
          current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
          // no value associated with delete
          value_.clear();
          iter_stats_.num_record_drop_user++;
        } else if (value_changed) {
          value_ = compaction_filter_value_;
        }
      }
    } else {
      // Update the current key to reflect the new sequence number/type without
      // copying the user key.
      // TODO(rven): Compaction filter does not process keys in this path
      // Need to have the compaction filter process multiple versions
      // if we have versions on both sides of a snapshot
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetKey();
      ikey_.user_key = current_key_.GetUserKey();
    }

    // If there are no snapshots, then this kv affects visibility at tip.
    // Otherwise, search through all existing snapshots to find the earliest
    // snapshot that is affected by this kv.
    SequenceNumber last_sequence __attribute__((__unused__)) =
        current_user_key_sequence_;
    current_user_key_sequence_ = ikey_.sequence;
    SequenceNumber last_snapshot = current_user_key_snapshot_;
    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
    current_user_key_snapshot_ =
        visible_at_tip_ ? visible_at_tip_
                        : FindEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);

    if (clear_and_output_next_key_) {
      // In the previous iteration we encountered a single delete that we could
      // not compact out.  We will keep this Put, but can drop its data.
      // (See Optimization 3, below.)
      assert(ikey_.type == kTypeValue);
      assert(current_user_key_snapshot_ == last_snapshot);

      value_.clear();
      valid_ = true;
      clear_and_output_next_key_ = false;
    } else if (ikey_.type == kTypeSingleDeletion) {
      // We can compact out a SingleDelete if:
      // 1) We encounter the corresponding PUT -OR- we know that this key
      //    doesn't appear past this output level
      // =AND=
      // 2) We've already returned a record in this snapshot -OR-
      //    there is no earlier earliest_write_conflict_snapshot.
      //
      // Rule 1 is needed for SingleDelete correctness.  Rule 2 is needed to
      // allow Transactions to do write-conflict checking (if we compacted away
      // all keys, then we wouldn't know that a write happened in this
      // snapshot).  If there is no earlier snapshot, then we know that there
      // are no active transactions that need to know about any writes.
      //
      // Optimization 3:
      // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
      // true, then we must output a SingleDelete.  In this case, we will decide
      // to also output the PUT.  While we are compacting less by outputting the
      // PUT now, hopefully this will lead to better compaction in the future
      // when Rule 2 is later true (i.e., we are hoping we can later compact out
      // both the SingleDelete and the Put, while we couldn't if we only
      // outputted the SingleDelete now).
      // In this case, we can save space by removing the PUT's value as it will
      // never be read.
      //
      // Deletes and Merges are not supported on the same key that has a
      // SingleDelete as it is not possible to correctly do any partial
      // compaction of such a combination of operations.  The result of mixing
      // those operations for a given key is documented as being undefined.  So
      // we can choose how to handle such combinations of operations.  We will
      // try to compact out as much as we can in these cases.
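      //
      // Worked example (hypothetical sequence numbers; one snapshot at seq
      // 100, earliest_write_conflict_snapshot == 100): for a
      // SingleDelete(key, seq=205) followed by Put(key, seq=200), both records
      // sit in the same snapshot stripe, but 205 > 100 and no key has been
      // outputted yet, so Rule 2 fails: the SingleDelete is outputted and the
      // Put is kept with a cleared value (Optimization 3).  With no snapshots,
      // earliest_write_conflict_snapshot is typically kMaxSequenceNumber, so
      // the same pair is dropped entirely.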

      // The easiest way to process a SingleDelete during iteration is to peek
      // ahead at the next key.
      ParsedInternalKey next_ikey;
      input_->Next();

      // Check whether the next key exists, is not corrupt, and is the same key
      // as the single delete.
      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
        // Check whether the next key belongs to the same snapshot as the
        // SingleDelete.
        if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot) {
          if (next_ikey.type == kTypeSingleDeletion) {
            // We encountered two SingleDeletes in a row.  This could be due to
            // unexpected user input.
            // Skip the first SingleDelete and let the next iteration decide how
            // to handle the second SingleDelete

            // First SingleDelete has been skipped since we already called
            // input_->Next().
            ++iter_stats_.num_record_drop_obsolete;
          } else if ((ikey_.sequence <= earliest_write_conflict_snapshot_) ||
                     has_outputted_key_) {
            // Found a matching value, we can drop the single delete and the
            // value.  It is safe to drop both records since we've already
            // outputted a key in this snapshot, or there is no earlier
            // snapshot (Rule 2 above).

            // Note: it doesn't matter whether the second key is a Put or if it
            // is an unexpected Merge or Delete.  We will compact it out
            // either way.
            ++iter_stats_.num_record_drop_hidden;
            ++iter_stats_.num_record_drop_obsolete;
            // Already called input_->Next() once.  Call it a second time to
            // skip past the second key.
            input_->Next();
          } else {
            // Found a matching value, but we cannot drop both keys since
            // there is an earlier snapshot and we need to leave behind a record
            // to know that a write happened in this snapshot (Rule 2 above).
            // Clear the value and output the SingleDelete. (The value will be
            // outputted on the next iteration.)
            ++iter_stats_.num_record_drop_hidden;

            // Setting valid_ to true will output the current SingleDelete
            valid_ = true;

            // Set up the Put to be outputted in the next iteration.
            // (Optimization 3).
            clear_and_output_next_key_ = true;
          }
        } else {
          // We hit the next snapshot without hitting a put, so the iterator
          // returns the single delete.
          valid_ = true;
        }
      } else {
        // We are at the end of the input, could not parse the next key, or hit
        // the next key. The iterator returns the single delete if the key
        // possibly exists beyond the current output level.  We set
        // has_current_user_key to false so that if the iterator is at the next
        // key, we do not compare it again against the previous key at the next
        // iteration. If the next key is corrupt, we return before the
        // comparison, so the value of has_current_user_key does not matter.
        has_current_user_key_ = false;
        if (compaction_ != nullptr && ikey_.sequence <= earliest_snapshot_ &&
            compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                       &level_ptrs_)) {
          // Key doesn't exist outside of this range.
          // Can compact out this SingleDelete.
          ++iter_stats_.num_record_drop_obsolete;
        } else {
          // Output SingleDelete
          valid_ = true;
        }
      }

      if (valid_) {
        at_next_ = true;
      }
    } else if (last_snapshot == current_user_key_snapshot_) {
      // If the earliest snapshot in which this key is visible is the same as
      // the earliest snapshot in which a previous instance of the same key is
      // visible, then this kv is not visible in any snapshot: it is hidden by
      // a newer entry for the same user key.
      // TODO: why not > ?
      //
      // Note: Dropping this key will not affect TransactionDB write-conflict
      // checking since there has already been a record returned for this key
      // in this snapshot.
      assert(last_sequence >= current_user_key_sequence_);
      ++iter_stats_.num_record_drop_hidden;  // (A)
      input_->Next();
    } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
               ikey_.sequence <= earliest_snapshot_ &&
               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                          &level_ptrs_)) {
      // TODO(noetzli): This is the only place where we use compaction_
      // (besides the constructor). We should probably get rid of this
      // dependency and find a way to do similar filtering during flushes.
      //
      // For this user key:
      // (1) there is no data in higher levels
      // (2) data in lower levels will have larger sequence numbers
      // (3) data in layers that are being compacted here and have
      //     smaller sequence numbers will be dropped in the next
      //     few iterations of this loop (by rule (A) above).
      // Therefore this deletion marker is obsolete and can be dropped.
      //
      // Note:  Dropping this Delete will not affect TransactionDB
      // write-conflict checking since it is earlier than any snapshot.
      ++iter_stats_.num_record_drop_obsolete;
      input_->Next();
    } else if (ikey_.type == kTypeMerge) {
      if (!merge_helper_->HasOperator()) {
        LOG_TO_BUFFER(log_buffer_, "Options::merge_operator is null.");
        status_ = STATUS(InvalidArgument,
            "merge_operator is not properly initialized.");
        return;
      }

      // We know the merge type entry is not hidden, otherwise we would
      // have hit (A)
      // We encapsulate the merge related state machine in a different
      // object to minimize change to the existing flow.
      WARN_NOT_OK(merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_),
                  "Merge until failed");
      merge_out_iter_.SeekToFirst();

      if (merge_out_iter_.Valid()) {
        // NOTE: key, value, and ikey_ refer to old entries.
        //       These will be correctly set below.
        key_ = merge_out_iter_.key();
        value_ = merge_out_iter_.value();
        bool valid_key __attribute__((__unused__)) =
            ParseInternalKey(key_, &ikey_);
        // MergeUntil stops when it encounters a corrupt key and does not
        // include it in the result, so we expect the keys here to be valid.
        assert(valid_key);
        // Keep current_key_ in sync.
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
        key_ = current_key_.GetKey();
        ikey_.user_key = current_key_.GetUserKey();
        valid_ = true;
      } else {
        // All merge operands were filtered out. Reset the user key, since the
        // batch consumed by the merge operator should not shadow any keys
        // coming after the merges.
        has_current_user_key_ = false;
      }
    } else {
      valid_ = true;
    }
  }
}

void CompactionIterator::PrepareOutput() {
  // Zeroing out the sequence number leads to better compression.
  // If this is the bottommost level (no files in lower levels)
  // and the earliest snapshot is larger than this seqno
  // and the userkey differs from the last userkey in compaction
  // then we can squash the seqno to zero.

  // This is safe for TransactionDB write-conflict checking since transactions
  // only care about sequence numbers larger than any active snapshot.
  if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ &&
      ikey_.type != kTypeMerge &&
      !cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) {
    assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
    ikey_.sequence = 0;
    current_key_.UpdateInternalKey(0, ikey_.type);
  }
}
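
// For instance, in PrepareOutput() above: at the bottommost level with
// earliest_snapshot_ == 50, a Put with sequence 42 is rewritten with
// sequence 0 (which compresses better), unless its user key equals the
// compaction's largest user key or the entry is a Merge.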

inline SequenceNumber CompactionIterator::FindEarliestVisibleSnapshot(
    SequenceNumber in, SequenceNumber* prev_snapshot) {
  assert(snapshots_->size());
  SequenceNumber prev __attribute__((unused)) = 0;
  for (const auto cur : *snapshots_) {
    assert(prev <= cur);
    if (cur >= in) {
      *prev_snapshot = prev;
      return cur;
    }
    prev = cur;
    assert(prev);
  }
  *prev_snapshot = prev;
  return kMaxSequenceNumber;
}
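
// For example, with *snapshots_ == {10, 20, 30} (ascending):
//   FindEarliestVisibleSnapshot(15, &prev) returns 20 and sets prev to 10;
//   FindEarliestVisibleSnapshot(35, &prev) returns kMaxSequenceNumber and
//   sets prev to 30 (the key is newer than every snapshot).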
}  // namespace rocksdb
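
For reference, a minimal sketch of how a caller is expected to drive this
class (loosely modeled on the compaction loop in compaction_job.cc; the
Valid()/key()/value()/status() accessors are assumed from the header, which
is not part of this report):

    #include "yb/rocksdb/db/compaction_iterator.h"

    // Hypothetical driver loop, for illustration only.
    void DrainCompactionIterator(rocksdb::CompactionIterator* c_iter) {
      c_iter->SeekToFirst();  // Position on the first record that survives compaction.
      while (c_iter->Valid()) {
        // key() is the full internal key (user key + sequence + type);
        // PrepareOutput() may have zeroed its sequence number at the bottommost level.
        // WriteToOutputFile(c_iter->key(), c_iter->value());  // hypothetical sink
        c_iter->Next();  // Returns buffered merge output before advancing the input.
      }
      // Corruption or a missing merge operator surfaces through status().
      // CHECK_OK(c_iter->status());
    }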