YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/table/merger.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/table/merger.h"
25
26
#include <vector>
27
28
#include "yb/rocksdb/comparator.h"
29
#include "yb/rocksdb/iterator.h"
30
#include "yb/rocksdb/table/internal_iterator.h"
31
#include "yb/rocksdb/table/iter_heap.h"
32
#include "yb/rocksdb/table/iterator_wrapper.h"
33
#include "yb/rocksdb/util/arena.h"
34
#include "yb/rocksdb/util/autovector.h"
35
#include "yb/rocksdb/util/heap.h"
36
#include "yb/rocksdb/util/perf_context_imp.h"
37
#include "yb/rocksdb/util/sync_point.h"
38
39
#include "yb/util/stats/perf_step_timer.h"
40
#include "yb/util/status_log.h"
41
42
namespace rocksdb {
43
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
44
namespace {
45
typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
46
typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
47
}  // namespace
48
49
// Second template argument of the autovector that stores MergingIterator's child iterators
// (presumably the number of elements autovector keeps in its inline storage -- confirm against
// autovector.h).
const size_t kNumIterReserve = 4;
50
51
class MergingIterator : public InternalIterator {
52
 public:
53
  MergingIterator(const Comparator* comparator, InternalIterator** children,
54
                  int n, bool is_arena_mode)
55
      : data_pinned_(false),
56
        is_arena_mode_(is_arena_mode),
57
        comparator_(comparator),
58
        current_(nullptr),
59
        direction_(kForward),
60
38.1M
        minHeap_(comparator_) {
61
38.1M
    children_.resize(n);
62
38.2M
    for (int i = 0; i < n; 
i++43.9k
) {
63
43.9k
      children_[i].Set(children[i]);
64
43.9k
    }
65
38.1M
    for (auto& child : children_) {
66
43.9k
      if (child.Valid()) {
67
20
        minHeap_.push(&child);
68
20
      }
69
43.9k
    }
70
38.1M
    current_ = CurrentForward();
71
38.1M
  }
72
73
24.2M
  virtual void AddIterator(InternalIterator* iter) {
74
24.2M
    assert(direction_ == kForward);
75
0
    children_.emplace_back(iter);
76
24.2M
    if (data_pinned_) {
77
0
      Status s = iter->PinData();
78
0
      assert(s.ok());
79
0
    }
80
0
    auto new_wrapper = children_.back();
81
24.2M
    if (new_wrapper.Valid()) {
82
0
      minHeap_.push(&new_wrapper);
83
0
      current_ = CurrentForward();
84
0
    }
85
24.2M
  }
86
87
9.46M
  virtual ~MergingIterator() {
88
24.2M
    for (auto& child : children_) {
89
24.2M
      child.DeleteIter(is_arena_mode_);
90
24.2M
    }
91
9.46M
  }
92
93
7.22G
  bool Valid() const override { return (current_ != nullptr); }
94
95
53.2k
  void SeekToFirst() override {
96
53.2k
    ClearHeaps();
97
203k
    for (auto& child : children_) {
98
203k
      child.SeekToFirst();
99
203k
      if (child.Valid()) {
100
198k
        minHeap_.push(&child);
101
198k
      }
102
203k
    }
103
53.2k
    direction_ = kForward;
104
53.2k
    current_ = CurrentForward();
105
53.2k
  }
106
107
604k
  void SeekToLast() override {
108
604k
    ClearHeaps();
109
604k
    InitMaxHeap();
110
1.78M
    for (auto& child : children_) {
111
1.78M
      child.SeekToLast();
112
1.78M
      if (child.Valid()) {
113
1.56M
        maxHeap_->push(&child);
114
1.56M
      }
115
1.78M
    }
116
604k
    direction_ = kReverse;
117
604k
    current_ = CurrentReverse();
118
604k
  }
119
120
54.3M
  void Seek(const Slice& target) override {
121
54.3M
    if (direction_ == kForward && 
current_53.2M
&&
current_->Valid()44.3M
) {
122
44.3M
      int key_vs_target = comparator_->Compare(current_->key(), target);
123
44.3M
      if (key_vs_target == 0) {
124
        // We're already at the right key.
125
0
        return;
126
0
      }
127
44.3M
      if (key_vs_target < 0) {
128
        // This is a "seek forward" operation, and the current key is less than the target. Keep
129
        // doing a seek on the top iterator and re-adding it to the min heap, until the top iterator
130
        // gives is a key >= target.
131
57.1M
        while (key_vs_target < 0) {
132
          // For the heap modifications below to be correct, current_ must be the current top of the
133
          // heap.
134
33.3M
          DCHECK_EQ(current_, CurrentForward());
135
33.3M
          current_->Seek(target);
136
33.3M
          UpdateHeapAfterCurrentAdvancement();
137
33.3M
          if (current_ == nullptr || 
!current_->Valid()33.3M
)
138
36.3k
            return;  // Reached the end.
139
33.3M
          key_vs_target = comparator_->Compare(current_->key(), target);
140
33.3M
        }
141
142
        // The current key is >= target, this is what we're looking for.
143
23.7M
        return;
144
23.8M
      }
145
146
      // The current key is already greater than the target, so this is not a forward seek.
147
      // Fall back to a full rebuild of the heap.
148
44.3M
    }
149
150
30.4M
    ClearHeaps();
151
89.6M
    for (auto& child : children_) {
152
89.6M
      {
153
89.6M
        PERF_TIMER_GUARD(seek_child_seek_time);
154
89.6M
        child.Seek(target);
155
89.6M
      }
156
89.6M
      PERF_COUNTER_ADD(seek_child_seek_count, 1);
157
158
89.6M
      if (child.Valid()) {
159
81.0M
        PERF_TIMER_GUARD(seek_min_heap_time);
160
81.0M
        minHeap_.push(&child);
161
81.0M
      }
162
89.6M
    }
163
30.4M
    direction_ = kForward;
164
30.4M
    {
165
30.4M
      PERF_TIMER_GUARD(seek_min_heap_time);
166
30.4M
      current_ = CurrentForward();
167
30.4M
    }
168
30.4M
  }
169
170
798M
  void Next() override {
171
798M
    assert(Valid());
172
173
    // Ensure that all children are positioned after key().
174
    // If we are moving in the forward direction, it is already
175
    // true for all of the non-current children since current_ is
176
    // the smallest child and key() == current_->key().
177
798M
    if (direction_ != kForward) {
178
      // Otherwise, advance the non-current children.  We advance current_
179
      // just after the if-block.
180
9.06k
      ClearHeaps();
181
791k
      for (auto& child : children_) {
182
791k
        if (&child != current_) {
183
782k
          child.Seek(key());
184
782k
          if (child.Valid() && 
comparator_->Equal(key(), child.key())712k
) {
185
0
            child.Next();
186
0
          }
187
782k
        }
188
791k
        if (child.Valid()) {
189
721k
          minHeap_.push(&child);
190
721k
        }
191
791k
      }
192
9.06k
      direction_ = kForward;
193
      // The loop advanced all non-current children to be > key() so current_
194
      // should still be strictly the smallest key.
195
9.06k
      assert(current_ == CurrentForward());
196
9.06k
    }
197
198
    // For the heap modifications below to be correct, current_ must be the current top of the heap.
199
0
    assert(current_ == CurrentForward());
200
201
    // As current_ points to the current record, move the iterator forward.
202
0
    current_->Next();
203
798M
    UpdateHeapAfterCurrentAdvancement();
204
798M
  }
205
206
7.19M
  void Prev() override {
207
7.19M
    assert(Valid());
208
    // Ensure that all children are positioned before key().
209
    // If we are moving in the reverse direction, it is already
210
    // true for all of the non-current children since current_ is
211
    // the largest child and key() == current_->key().
212
7.19M
    if (direction_ != kReverse) {
213
      // Otherwise, retreat the non-current children.  We retreat current_
214
      // just after the if-block.
215
452k
      ClearHeaps();
216
452k
      InitMaxHeap();
217
2.35M
      for (auto& child : children_) {
218
2.35M
        if (&child != current_) {
219
1.90M
          child.Seek(key());
220
1.90M
          if (child.Valid()) {
221
            // Child is at first entry >= key().  Step back one to be < key()
222
1.16M
            TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
223
1.16M
            child.Prev();
224
1.16M
          } else {
225
            // Child has no entries >= key().  Position at last entry.
226
737k
            TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast");
227
737k
            child.SeekToLast();
228
737k
          }
229
1.90M
        }
230
2.35M
        if (child.Valid()) {
231
1.70M
          maxHeap_->push(&child);
232
1.70M
        }
233
2.35M
      }
234
452k
      direction_ = kReverse;
235
      // Note that we don't do assert(current_ == CurrentReverse()) here
236
      // because it is possible to have some keys larger than the seek-key
237
      // inserted between Seek() and SeekToLast(), which makes current_ not
238
      // equal to CurrentReverse().
239
452k
      current_ = CurrentReverse();
240
452k
    }
241
242
    // For the heap modifications below to be correct, current_ must be the
243
    // current top of the heap.
244
7.19M
    assert(current_ == CurrentReverse());
245
246
0
    current_->Prev();
247
7.19M
    if (current_->Valid()) {
248
      // current is still valid after the Prev() call above.  Call
249
      // replace_top() to restore the heap property.  When the same child
250
      // iterator yields a sequence of keys, this is cheap.
251
7.13M
      maxHeap_->replace_top(current_);
252
7.13M
    } else {
253
      // current stopped being valid, remove it from the heap.
254
59.1k
      maxHeap_->pop();
255
59.1k
    }
256
7.19M
    current_ = CurrentReverse();
257
7.19M
  }
258
259
883M
  Slice key() const override {
260
883M
    assert(Valid());
261
0
    return current_->key();
262
883M
  }
263
264
2.37G
  Slice value() const override {
265
2.37G
    assert(Valid());
266
0
    return current_->value();
267
2.37G
  }
268
269
236k
  Status status() const override {
270
236k
    Status s;
271
3.10M
    for (auto& child : children_) {
272
3.10M
      s = child.status();
273
3.10M
      if (!s.ok()) {
274
47
        break;
275
47
      }
276
3.10M
    }
277
236k
    return s;
278
236k
  }
279
280
6
  Status PinData() override {
281
6
    Status s;
282
6
    if (data_pinned_) {
283
0
      return s;
284
0
    }
285
286
49
    
for (size_t i = 0; 6
i < children_.size();
i++43
) {
287
43
      s = children_[i].PinData();
288
43
      if (!s.ok()) {
289
        // We failed to pin an iterator, clean up
290
0
        for (size_t j = 0; j < i; j++) {
291
0
          WARN_NOT_OK(children_[j].ReleasePinnedData(), "Failed to release pinned data");
292
0
        }
293
0
        break;
294
0
      }
295
43
    }
296
6
    data_pinned_ = s.ok();
297
6
    return s;
298
6
  }
299
300
0
  Status ReleasePinnedData() override {
301
0
    Status s;
302
0
    if (!data_pinned_) {
303
0
      return s;
304
0
    }
305
306
0
    for (auto& child : children_) {
307
0
      Status release_status = child.ReleasePinnedData();
308
0
      if (s.ok() && !release_status.ok()) {
309
0
        s = release_status;
310
0
      }
311
0
    }
312
0
    data_pinned_ = false;
313
314
0
    return s;
315
0
  }
316
317
792M
  bool IsKeyPinned() const override {
318
792M
    assert(Valid());
319
0
    return current_->IsKeyPinned();
320
792M
  }
321
322
 private:
323
  bool data_pinned_;
324
  // Clears heaps for both directions, used when changing direction or seeking
325
  void ClearHeaps();
326
  // Ensures that maxHeap_ is initialized when starting to go in the reverse
327
  // direction
328
  void InitMaxHeap();
329
330
  bool is_arena_mode_;
331
  const Comparator* comparator_;
332
  autovector<IteratorWrapper, kNumIterReserve> children_;
333
334
  // Cached pointer to child iterator with the current key, or nullptr if no
335
  // child iterators are valid.  This is the top of minHeap_ or maxHeap_
336
  // depending on the direction.
337
  IteratorWrapper* current_;
338
  // Which direction is the iterator moving?
339
  enum Direction {
340
    kForward,
341
    kReverse
342
  };
343
  Direction direction_;
344
  MergerMinIterHeap minHeap_;
345
  // Max heap is used for reverse iteration, which is way less common than
346
  // forward.  Lazily initialize it to save memory.
347
  std::unique_ptr<MergerMaxIterHeap> maxHeap_;
348
349
1.73G
  IteratorWrapper* CurrentForward() const {
350
1.73G
    assert(direction_ == kForward);
351
1.73G
    return !minHeap_.empty() ? 
minHeap_.top()1.69G
:
nullptr39.2M
;
352
1.73G
  }
353
354
15.4M
  IteratorWrapper* CurrentReverse() const {
355
15.4M
    assert(direction_ == kReverse);
356
0
    assert(maxHeap_);
357
15.4M
    return !maxHeap_->empty() ? 
maxHeap_->top()15.4M
:
nullptr5.50k
;
358
15.4M
  }
359
360
  // This should be called after calling Next() or a forward seek on the top element.
361
832M
  void UpdateHeapAfterCurrentAdvancement() {
362
832M
    if (current_->Valid()) {
363
      // current_ is still valid after the previous Next() / forward Seek() call.  Call
364
      // replace_top() to restore the heap property.  When the same child iterator yields a sequence
365
      // of keys, this is cheap.
366
831M
      minHeap_.replace_top(current_);
367
831M
    } else {
368
      // current_ stopped being valid, remove it from the heap.
369
427k
      minHeap_.pop();
370
427k
    }
371
832M
    current_ = CurrentForward();
372
832M
  }
373
374
};
375
376
31.6M
void MergingIterator::ClearHeaps() {
377
31.6M
  minHeap_.clear();
378
31.6M
  if (maxHeap_) {
379
4.15M
    maxHeap_->clear();
380
4.15M
  }
381
31.6M
}
382
383
1.05M
void MergingIterator::InitMaxHeap() {
384
1.05M
  if (!maxHeap_) {
385
598k
    maxHeap_.reset(new MergerMaxIterHeap(comparator_));
386
598k
  }
387
1.05M
}
388
389
// Returns an iterator over the n iterators in list:
//   - n == 0: an empty iterator obtained from NewEmptyInternalIterator(arena);
//   - n == 1: list[0] itself, with no merging wrapper;
//   - otherwise: a MergingIterator, placed in arena when one is supplied (arena mode) and
//     allocated with plain new when arena is null.
InternalIterator* NewMergingIterator(const Comparator* cmp,
                                     InternalIterator** list, int n,
                                     Arena* arena) {
  assert(n >= 0);
  if (n == 0) {
    return NewEmptyInternalIterator(arena);
  }
  if (n == 1) {
    return list[0];
  }
  if (arena != nullptr) {
    auto storage = arena->AllocateAligned(sizeof(MergingIterator));
    return new (storage) MergingIterator(cmp, list, n, true);
  }
  return new MergingIterator(cmp, list, n, false);
}
406
407
// Starts a builder with no iterators added yet and eagerly constructs an empty, arena-mode
// MergingIterator inside arena a, so that it is ready if more than one iterator is added.
MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator,
                                           Arena* a)
    : first_iter(nullptr), use_merging_iter(false), arena(a) {
  auto storage = arena->AllocateAligned(sizeof(MergingIterator));
  merge_iter = new (storage) MergingIterator(comparator, nullptr, 0, true);
}
414
415
52.8M
// Registers iter with the builder. The first iterator is only remembered in first_iter; once a
// second one arrives, the remembered iterator and every later one go into merge_iter.
void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
  if (use_merging_iter) {
    // Already merging: just add another child.
    merge_iter->AddIterator(iter);
    return;
  }
  if (first_iter == nullptr) {
    // Nothing remembered yet: hold on to it without involving the merging iterator.
    first_iter = iter;
    return;
  }
  // Second iterator: switch to merging, moving the remembered iterator in ahead of the new one.
  merge_iter->AddIterator(first_iter);
  use_merging_iter = true;
  merge_iter->AddIterator(iter);
}
426
427
38.1M
// Returns the built iterator: first_iter when the merging iterator was never needed (nullptr if
// nothing was added), otherwise the merging iterator, which the builder then stops referencing.
InternalIterator* MergeIteratorBuilder::Finish() {
  if (!use_merging_iter) {
    return first_iter;
  }
  auto merged = merge_iter;
  merge_iter = nullptr;
  return merged;
}
436
437
}  // namespace rocksdb