YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_iter.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/db_iter.h"
25
26
#include <deque>
27
#include <limits>
28
29
#include "yb/rocksdb/db/dbformat.h"
30
#include "yb/rocksdb/env.h"
31
#include "yb/rocksdb/iterator.h"
32
#include "yb/rocksdb/merge_operator.h"
33
#include "yb/rocksdb/options.h"
34
#include "yb/rocksdb/table/internal_iterator.h"
35
#include "yb/rocksdb/util/arena.h"
36
#include "yb/rocksdb/util/mutexlock.h"
37
#include "yb/rocksdb/util/perf_context_imp.h"
38
#include "yb/rocksdb/util/statistics.h"
39
#include "yb/rocksdb/util/stop_watch.h"
40
41
#include "yb/util/stats/perf_step_timer.h"
42
#include "yb/util/status_log.h"
43
#include "yb/util/string_util.h"
44
45
namespace rocksdb {
46
47
#if 0
48
static void DumpInternalIter(Iterator* iter) {
49
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
50
    ParsedInternalKey k;
51
    if (!ParseInternalKey(iter->key(), &k)) {
52
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
53
    } else {
54
      fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
55
    }
56
  }
57
}
58
#endif
59
60
// Memtables and sstables that make the DB representation contain
61
// (userkey,seq,type) => uservalue entries.  DBIter
62
// combines multiple entries for the same userkey found in the DB
63
// representation into a single entry while accounting for sequence
64
// numbers, deletion markers, overwrites, etc.
65
class DBIter: public Iterator {
66
 public:
67
  // The following is grossly complicated. TODO: clean it up
68
  // Which direction is the iterator currently moving?
69
  // (1) When moving forward, the internal iterator is positioned at
70
  //     the exact entry that yields this->key(), this->value()
71
  // (2) When moving backwards, the internal iterator is positioned
72
  //     just before all entries whose user key == this->key().
73
  enum Direction {
74
    kForward,
75
    kReverse
76
  };
77
78
  DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
79
         InternalIterator* iter, SequenceNumber s, bool arena_mode,
80
         uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
81
         const Slice* iterate_upper_bound = nullptr,
82
         bool prefix_same_as_start = false)
83
      : arena_mode_(arena_mode),
84
        env_(env),
85
        logger_(ioptions.info_log),
86
        user_comparator_(cmp),
87
        user_merge_operator_(ioptions.merge_operator),
88
        iter_(iter),
89
        sequence_(s),
90
        direction_(kForward),
91
        valid_(false),
92
        current_entry_is_merged_(false),
93
        statistics_(ioptions.statistics),
94
        version_number_(version_number),
95
        iterate_upper_bound_(iterate_upper_bound),
96
        prefix_same_as_start_(prefix_same_as_start),
97
15.3M
        iter_pinned_(false) {
98
15.3M
    RecordTick(statistics_, NO_ITERATORS);
99
15.3M
    prefix_extractor_ = ioptions.prefix_extractor;
100
15.3M
    max_skip_ = max_sequential_skip_in_iterations;
101
15.3M
  }
102
15.3M
  virtual ~DBIter() {
103
15.3M
    RecordTick(statistics_, NO_ITERATORS, -1);
104
15.3M
    if (!arena_mode_) {
105
1.08k
      delete iter_;
106
15.3M
    } else {
107
15.3M
      iter_->~InternalIterator();
108
15.3M
    }
109
15.3M
  }
110
15.3M
  virtual void SetIter(InternalIterator* iter) {
111
15.3M
    assert(iter_ == nullptr);
112
15.3M
    iter_ = iter;
113
15.3M
    if (iter_ && iter_pinned_) {
114
7
      CHECK_OK(iter_->PinData());
115
7
    }
116
15.3M
  }
117
1.90G
  bool Valid() const override { return valid_; }
118
4.86G
  Slice key() const override {
119
4.86G
    assert(valid_);
120
4.86G
    return saved_key_.GetKey();
121
4.86G
  }
122
1.08G
  Slice value() const override {
123
1.08G
    assert(valid_);
124
1.08G
    return (direction_ == kForward && !current_entry_is_merged_) ?
125
1.08G
      iter_->value() : saved_value_;
126
1.08G
  }
127
201k
  Status status() const override {
128
201k
    if (status_.ok()) {
129
201k
      return iter_->status();
130
0
    } else {
131
0
      return status_;
132
0
    }
133
201k
  }
134
7
  virtual Status PinData() {
135
7
    Status s;
136
7
    if (iter_) {
137
0
      s = iter_->PinData();
138
0
    }
139
7
    if (s.ok()) {
140
      // Even if iter_ is nullptr, we set iter_pinned_ to true so that when
141
      // iter_ is updated using SetIter, we Pin it.
142
7
      iter_pinned_ = true;
143
7
    }
144
7
    return s;
145
7
  }
146
0
  virtual Status ReleasePinnedData() {
147
0
    Status s;
148
0
    if (iter_) {
149
0
      s = iter_->ReleasePinnedData();
150
0
    }
151
0
    if (s.ok()) {
152
0
      iter_pinned_ = false;
153
0
    }
154
0
    return s;
155
0
  }
156
157
  virtual Status GetProperty(std::string prop_name,
158
424k
                             std::string* prop) override {
159
424k
    if (prop == nullptr) {
160
0
      return STATUS(InvalidArgument, "prop is nullptr");
161
0
    }
162
424k
    if (prop_name == "rocksdb.iterator.super-version-number") {
163
      // First try to pass the value returned from inner iterator.
164
8
      if (!iter_->GetProperty(prop_name, prop).ok()) {
165
4
        *prop = ToString(version_number_);
166
4
      }
167
8
      return Status::OK();
168
424k
    } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
169
424k
      if (valid_) {
170
424k
        *prop = (iter_pinned_ && saved_key_.IsKeyPinned()) ? "1" : "0";
171
1
      } else {
172
1
        *prop = "Iterator is not valid.";
173
1
      }
174
424k
      return Status::OK();
175
424k
    }
176
1
    return STATUS(InvalidArgument, "Undentified property.");
177
1
  }
178
179
  void Next() override;
180
  void Prev() override;
181
  void Seek(const Slice& target) override;
182
  void SeekToFirst() override;
183
  void SeekToLast() override;
184
185
50.6M
  void RevalidateAfterUpperBoundChange() override {
186
50.6M
    if (iter_->Valid() && direction_ == kForward) {
187
45.8M
      valid_ = true;
188
45.8M
      FindNextUserEntry(/* skipping= */ false);
189
45.8M
    }
190
50.6M
  }
191
192
 private:
193
  void ReverseToBackward();
194
  void PrevInternal();
195
  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
196
  bool FindValueForCurrentKey();
197
  bool FindValueForCurrentKeyUsingSeek();
198
  void FindPrevUserKey();
199
  void FindNextUserKey();
200
  inline void FindNextUserEntry(bool skipping);
201
  void FindNextUserEntryInternal(bool skipping);
202
  bool ParseKey(ParsedInternalKey* key);
203
  void MergeValuesNewToOld();
204
205
60.7M
  inline void ClearSavedValue() {
206
60.7M
    if (saved_value_.capacity() > 1048576) {
207
0
      std::string empty;
208
0
      swap(empty, saved_value_);
209
60.7M
    } else {
210
60.7M
      saved_value_.clear();
211
60.7M
    }
212
60.7M
  }
213
214
  const SliceTransform* prefix_extractor_;
215
  bool arena_mode_;
216
  Env* const env_;
217
  Logger* logger_;
218
  const Comparator* const user_comparator_;
219
  const MergeOperator* const user_merge_operator_;
220
  InternalIterator* iter_;
221
  SequenceNumber const sequence_;
222
223
  Status status_;
224
  IterKey saved_key_;
225
  std::string saved_value_;
226
  Direction direction_;
227
  bool valid_;
228
  bool current_entry_is_merged_;
229
  Statistics* statistics_;
230
  uint64_t max_skip_;
231
  uint64_t version_number_;
232
  const Slice* iterate_upper_bound_;
233
  IterKey prefix_start_;
234
  bool prefix_same_as_start_;
235
  bool iter_pinned_;
236
  // List of operands for merge operator.
237
  std::deque<std::string> merge_operands_;
238
239
  // No copying allowed
240
  DBIter(const DBIter&);
241
  void operator=(const DBIter&);
242
};
243
244
747M
// Decodes the internal key under iter_ into *ikey.
//
// Returns true when the key parses. Otherwise a Corruption status is stored in
// status_, the offending key is logged at ERROR level, and false is returned;
// callers decide whether to skip the entry.
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  const bool parsed_ok = ParseInternalKey(iter_->key(), ikey);
  if (parsed_ok) {
    return true;
  }

  status_ = STATUS(Corruption, "corrupted internal key in DBIter");
  RLOG(InfoLogLevel::ERROR_LEVEL,
      logger_, "corrupted internal key in DBIter: %s",
      iter_->key().ToString(true).c_str());
  return false;
}
255
256
346M
// Advances to the next user key that is visible at sequence_. Requires Valid().
//
// If the iterator was moving in reverse, iter_ sits before the entries of the
// current key (see Direction), so FindNextUserKey() first walks forward onto
// them and direction_ becomes kForward; if iter_ is invalid afterwards (e.g.
// reverse iteration had run off the front), it is repositioned with
// SeekToFirst(). FindNextUserEntry(true) then skips the rest of the current
// user key.
//
// When statistics_ is set, NUMBER_DB_NEXT is recorded, plus
// NUMBER_DB_NEXT_FOUND and ITER_BYTES_READ on success. With
// prefix_same_as_start_, the iterator becomes invalid as soon as the new key's
// prefix differs from prefix_start_.
void DBIter::Next() {
  assert(valid_);

  if (direction_ == kReverse) {
    FindNextUserKey();
    direction_ = kForward;
    if (!iter_->Valid()) {
      iter_->SeekToFirst();
    }
  } else if (iter_->Valid() && !current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    iter_->Next();
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
  }

  // Now we point to the next internal position, for both of merge and
  // not merge cases.
  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }
  FindNextUserEntry(true /* skipping the current user key */);
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_NEXT);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_NEXT_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    }
  }
  // Leaving the prefix established by the last seek ends the iteration.
  if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
      prefix_extractor_->Transform(saved_key_.GetKey())
              .compare(prefix_start_.GetKey()) != 0) {
    valid_ = false;
  }
}
295
296
// PRE: saved_key_ has the current user key if skipping
297
// POST: saved_key_ should have the next user key if valid_,
298
//       if the current entry is a result of merge
299
//           current_entry_is_merged_ => true
300
//           saved_value_             => the merged value
301
//
302
// NOTE: In between, saved_key_ can point to a user key that has
303
//       a delete marker
304
451M
inline void DBIter::FindNextUserEntry(bool skipping) {
305
451M
  PERF_TIMER_GUARD(find_next_user_entry_time);
306
451M
  FindNextUserEntryInternal(skipping);
307
451M
}
308
309
// Actual implementation of DBIter::FindNextUserEntry()
//
// Scans forward from the current position of iter_ (which must be valid, with
// direction_ == kForward) and stops at the first entry with
// sequence <= sequence_ that yields a user value:
//  * kTypeValue: saved_key_ = its user key, valid_ = true; iter_ stays on it.
//  * kTypeMerge: saved_key_ = its user key, valid_ = true,
//    current_entry_is_merged_ = true, and MergeValuesNewToOld() computes
//    saved_value_ (leaving iter_ past the merged entries).
// A deletion marker makes the scan skip the remaining entries of its user key.
// Entries with sequence > sequence_ are passed over without being counted.
// If `skipping` is true on entry, entries whose user key is <= saved_key_ are
// skipped too. Unparseable entries are stepped over (ParseKey() records the
// corruption in status_).
// The scan ends with valid_ = false when iter_ is exhausted or, if
// iterate_upper_bound_ is set, when a user key reaches that bound.
void DBIter::FindNextUserEntryInternal(bool skipping) {
  // Loop until we hit an acceptable entry to yield
  assert(iter_->Valid());
  assert(direction_ == kForward);
  current_entry_is_merged_ = false;
  // Number of consecutive entries skipped for the current user key; compared
  // against max_skip_ to decide when to re-seek.
  uint64_t num_skipped = 0;
  do {
    ParsedInternalKey ikey;

    if (ParseKey(&ikey)) {
      if (iterate_upper_bound_ != nullptr &&
          user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
        // Leaves the do/while loop (this break is not inside the switch), so
        // the iterator ends up invalid.
        break;
      }

      if (ikey.sequence <= sequence_) {
        if (skipping &&
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
          num_skipped++;  // skip this entry
          PERF_COUNTER_ADD(internal_key_skipped_count, 1);
        } else {
          switch (ikey.type) {
            case kTypeDeletion:
            case kTypeSingleDeletion:
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
              saved_key_.SetKey(ikey.user_key,
                                !iter_->IsKeyPinned() /* copy */);
              skipping = true;
              num_skipped = 0;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              break;
            case kTypeValue:
              valid_ = true;
              saved_key_.SetKey(ikey.user_key,
                                !iter_->IsKeyPinned() /* copy */);
              return;
            case kTypeMerge:
              // By now, we are sure the current ikey is going to yield a value
              saved_key_.SetKey(ikey.user_key,
                                !iter_->IsKeyPinned() /* copy */);
              current_entry_is_merged_ = true;
              valid_ = true;
              MergeValuesNewToOld();  // Go to a different state machine
              return;
            default:
              assert(false);
              break;
          }
        }
      }
    }
    // If we have sequentially iterated via numerous keys and still not
    // found the next user-key, then it is better to seek so that we can
    // avoid too many key comparisons. We seek to the last occurrence of
    // our current key by looking for sequence number 0 and type deletion
    // (the smallest type).
    if (skipping && num_skipped > max_skip_) {
      num_skipped = 0;
      std::string last_key;
      AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
                                                     kTypeDeletion));
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_->Next();
    }
  } while (iter_->Valid());
  valid_ = false;
}
380
381
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_->key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
//
// Operands are collected with push_front while walking newer -> older, so the
// deque handed to FullMerge() is ordered oldest first. The base value passed to
// FullMerge() is the first older kTypeValue of the same key; it is nullptr if a
// deletion / single deletion is hit first or the key's entries run out.
// If no merge operator is configured, status_ becomes InvalidArgument, valid_
// becomes false, and iter_ is left where it was.
// NOTE(review): whatever FullMerge() returns is discarded here, so a failed
// merge is not surfaced via status_ — confirm this is intended.
void DBIter::MergeValuesNewToOld() {
  if (!user_merge_operator_) {
    RLOG(InfoLogLevel::ERROR_LEVEL,
        logger_, "Options::merge_operator is null.");
    status_ = STATUS(InvalidArgument, "user_merge_operator_ must be set.");
    valid_ = false;
    return;
  }

  // Start the merge process by pushing the first operand
  std::deque<std::string> operands;
  operands.push_front(iter_->value().ToString());

  ParsedInternalKey ikey;
  for (iter_->Next(); iter_->Valid(); iter_->Next()) {
    if (!ParseKey(&ikey)) {
      // skip corrupted key
      continue;
    }

    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
      // hit the next user key, stop right here
      break;
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) {
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_->Next();
      break;
    } else if (kTypeValue == ikey.type) {
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      // ignore corruption if there is any.
      const Slice val = iter_->value();
      {
        StopWatchNano timer(env_, statistics_ != nullptr);
        PERF_TIMER_GUARD(merge_operator_time_nanos);
        user_merge_operator_->FullMerge(ikey.user_key, &val, operands,
                                        &saved_value_, logger_);
        RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
                   timer.ElapsedNanos());
      }
      // iter_ is positioned after put
      iter_->Next();
      return;
    } else if (kTypeMerge == ikey.type) {
      // hit another (older) merge operand for the same key: prepend its value
      // to operands and keep scanning.
      const Slice& val = iter_->value();
      operands.push_front(val.ToString());
    } else {
      assert(false);
    }
  }

  {
    StopWatchNano timer(env_, statistics_ != nullptr);
    PERF_TIMER_GUARD(merge_operator_time_nanos);
    // we either exhausted all internal keys under this user key, or hit
    // a deletion marker.
    // feed null as the existing value to the merge operator, such that
    // client can differentiate this scenario and do things accordingly.
    user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
                                    &saved_value_, logger_);
    RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
  }
}
453
454
937k
void DBIter::Prev() {
455
937k
  assert(valid_);
456
937k
  if (direction_ == kForward) {
457
480k
    ReverseToBackward();
458
480k
  }
459
937k
  PrevInternal();
460
937k
  if (statistics_ != nullptr) {
461
416k
    RecordTick(statistics_, NUMBER_DB_PREV);
462
416k
    if (valid_) {
463
416k
      RecordTick(statistics_, NUMBER_DB_PREV_FOUND);
464
416k
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
465
416k
    }
466
416k
  }
467
937k
  if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
468
0
      prefix_extractor_->Transform(saved_key_.GetKey())
469
0
              .compare(prefix_start_.GetKey()) != 0) {
470
0
    valid_ = false;
471
0
  }
472
937k
}
473
474
480k
// Switches a forward-moving iterator to reverse direction.
//
// On return iter_ is positioned before all entries of saved_key_ (the current
// user key), as Direction::kReverse requires, and direction_ == kReverse.
void DBIter::ReverseToBackward() {
  if (current_entry_is_merged_) {
    // Not placed in the same key. Need to call Prev() until finding the
    // previous key.
    // MergeValuesNewToOld() leaves iter_ past the merged entries (possibly
    // invalid), so first get back to an entry whose user key is <= saved_key_.
    if (!iter_->Valid()) {
      iter_->SeekToLast();
    }
    ParsedInternalKey ikey;
    FindParseableKey(&ikey, kReverse);
    while (iter_->Valid() &&
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) > 0) {
      iter_->Prev();
      FindParseableKey(&ikey, kReverse);
    }
  }
#ifndef NDEBUG
  // Debug-only check: iter_ must not be positioned past the current user key.
  if (iter_->Valid()) {
    ParsedInternalKey ikey;
    assert(ParseKey(&ikey));
    assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0);
  }
#endif

  FindPrevUserKey();
  direction_ = kReverse;
}
500
501
1.07M
// Moves to the previous user key that yields a value, walking iter_ backward
// from its current position.
//
// Each candidate user key (the one under iter_) is resolved by
// FindValueForCurrentKey(); candidates whose newest visible entry is a deletion
// are skipped. On success valid_ is true, saved_key_/saved_value_ hold the
// result and iter_ is left before all entries of that key (or invalid). If the
// internal iterator runs out first, valid_ becomes false.
void DBIter::PrevInternal() {
  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }

  ParsedInternalKey ikey;

  while (iter_->Valid()) {
    // Take the user key under iter_ as the next candidate.
    saved_key_.SetKey(ExtractUserKey(iter_->key()),
                      !iter_->IsKeyPinned() /* copy */);
    if (FindValueForCurrentKey()) {
      valid_ = true;
      if (!iter_->Valid()) {
        return;
      }
      FindParseableKey(&ikey, kReverse);
      // iter_ may still be on (newer, invisible) entries of this key; move it
      // before all of them to restore the kReverse positioning invariant.
      if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
        FindPrevUserKey();
      }
      return;
    }
    // The candidate is deleted at this snapshot: move past its remaining
    // entries and try the next older user key.
    if (!iter_->Valid()) {
      break;
    }
    FindParseableKey(&ikey, kReverse);
    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
      FindPrevUserKey();
    }
  }
  // We haven't found any key - iterator is not valid
  assert(!iter_->Valid());
  valid_ = false;
}
535
536
// This function checks, if the entry with biggest sequence_number <= sequence_
537
// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
538
// saved_value_
539
1.09M
bool DBIter::FindValueForCurrentKey() {
540
1.09M
  assert(iter_->Valid());
541
1.09M
  merge_operands_.clear();
542
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
543
  // kTypeValue)
544
1.09M
  ValueType last_not_merge_type = kTypeDeletion;
545
1.09M
  ValueType last_key_entry_type = kTypeDeletion;
546
547
1.09M
  ParsedInternalKey ikey;
548
1.09M
  FindParseableKey(&ikey, kReverse);
549
550
1.09M
  size_t num_skipped = 0;
551
2.28M
  while (iter_->Valid() && ikey.sequence <= sequence_ &&
552
2.24M
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
553
    // We iterate too much: let's use Seek() to avoid too much key comparisons
554
1.18M
    if (num_skipped >= max_skip_) {
555
970
      return FindValueForCurrentKeyUsingSeek();
556
970
    }
557
558
1.18M
    last_key_entry_type = ikey.type;
559
1.18M
    switch (last_key_entry_type) {
560
1.09M
      case kTypeValue:
561
1.09M
        merge_operands_.clear();
562
1.09M
        saved_value_ = iter_->value().ToString();
563
1.09M
        last_not_merge_type = kTypeValue;
564
1.09M
        break;
565
66.1k
      case kTypeDeletion:
566
66.1k
      case kTypeSingleDeletion:
567
66.1k
        merge_operands_.clear();
568
66.1k
        last_not_merge_type = last_key_entry_type;
569
66.1k
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
570
66.1k
        break;
571
20.3k
      case kTypeMerge:
572
20.3k
        assert(user_merge_operator_ != nullptr);
573
20.3k
        merge_operands_.push_back(iter_->value().ToString());
574
20.3k
        break;
575
0
      default:
576
0
        assert(false);
577
1.18M
    }
578
579
1.18M
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
580
1.18M
    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()));
581
1.18M
    iter_->Prev();
582
1.18M
    ++num_skipped;
583
1.18M
    FindParseableKey(&ikey, kReverse);
584
1.18M
  }
585
586
1.09M
  switch (last_key_entry_type) {
587
55.4k
    case kTypeDeletion:
588
55.4k
    case kTypeSingleDeletion:
589
55.4k
      valid_ = false;
590
55.4k
      return false;
591
15.4k
    case kTypeMerge:
592
15.4k
      if (last_not_merge_type == kTypeDeletion) {
593
8.99k
        StopWatchNano timer(env_, statistics_ != nullptr);
594
8.99k
        PERF_TIMER_GUARD(merge_operator_time_nanos);
595
8.99k
        user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
596
8.99k
                                        merge_operands_, &saved_value_,
597
8.99k
                                        logger_);
598
8.99k
        RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
599
8.99k
                   timer.ElapsedNanos());
600
6.50k
      } else {
601
6.50k
        assert(last_not_merge_type == kTypeValue);
602
6.50k
        std::string last_put_value = saved_value_;
603
6.50k
        Slice temp_slice(last_put_value);
604
6.50k
        {
605
6.50k
          StopWatchNano timer(env_, statistics_ != nullptr);
606
6.50k
          PERF_TIMER_GUARD(merge_operator_time_nanos);
607
6.50k
          user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice,
608
6.50k
                                          merge_operands_, &saved_value_,
609
6.50k
                                          logger_);
610
6.50k
          RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
611
6.50k
                     timer.ElapsedNanos());
612
6.50k
        }
613
6.50k
      }
614
15.4k
      break;
615
1.02M
    case kTypeValue:
616
      // do nothing - we've already has value in saved_value_
617
1.02M
      break;
618
0
    default:
619
0
      assert(false);
620
0
      break;
621
1.04M
  }
622
1.04M
  valid_ = true;
623
1.04M
  return true;
624
1.04M
}
625
626
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
//
// Fallback once FindValueForCurrentKey() has stepped over max_skip_ entries.
// It seeks to the internal key (saved_key_, sequence_, kValueTypeForSeek),
// which should land on the newest entry of saved_key_ visible at sequence_,
// then resolves the value by scanning forward (newer to older) from there.
//
// Returns false (valid_ = false) if that entry is a deletion or single
// deletion. Otherwise stores the visible value in saved_value_, merging
// operands if needed, and returns true with valid_ = true. In the merge case
// iter_ is left on an entry of saved_key_: re-seeking to last_key if the scan
// left the key, or staying on the terminating deletion/value entry.
bool DBIter::FindValueForCurrentKeyUsingSeek() {
  std::string last_key;
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_,
                                                 kValueTypeForSeek));
  iter_->Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  // assume there is at least one parseable key for this user key
  // NOTE(review): if iter_ were invalid here, FindParseableKey() would leave
  // ikey unfilled and ikey.type below would be read anyway.
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);

  if (ikey.type == kTypeValue || ikey.type == kTypeDeletion ||
      ikey.type == kTypeSingleDeletion) {
    if (ikey.type == kTypeValue) {
      saved_value_ = iter_->value().ToString();
      valid_ = true;
      return true;
    }
    valid_ = false;
    return false;
  }

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
  // (push_front while moving newer -> older keeps the deque oldest first).
  std::deque<std::string> operands;
  // TODO: we dont need rocksdb level merge records and only use RocksDB level tombstones in
  // intentsdb, so maybe we can be more efficient here.
  while (iter_->Valid() &&
         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
         ikey.type == kTypeMerge) {
    operands.push_front(iter_->value().ToString());
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }

  // No base value: the key's entries ended or a tombstone was reached.
  if (!iter_->Valid() ||
      !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
    {
      StopWatchNano timer(env_, statistics_ != nullptr);
      PERF_TIMER_GUARD(merge_operator_time_nanos);
      user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
                                      &saved_value_, logger_);
      RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
    }
    // Make iter_ valid and point to saved_key_
    if (!iter_->Valid() ||
        !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    }
    valid_ = true;
    return true;
  }

  // The operands sit on top of a kTypeValue of the same key: use it as base.
  const Slice& val = iter_->value();
  {
    StopWatchNano timer(env_, statistics_ != nullptr);
    PERF_TIMER_GUARD(merge_operator_time_nanos);
    user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands,
                                    &saved_value_, logger_);
    RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
  }
  valid_ = true;
  return true;
}
694
695
// Used in Next to change directions
696
// Go to next user key
697
// Don't use Seek(),
698
// because next user key will be very close
699
42.3k
void DBIter::FindNextUserKey() {
700
42.3k
  if (!iter_->Valid()) {
701
3.68k
    return;
702
3.68k
  }
703
38.6k
  ParsedInternalKey ikey;
704
38.6k
  FindParseableKey(&ikey, kForward);
705
77.3k
  while (iter_->Valid() &&
706
77.3k
         !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
707
38.6k
    iter_->Next();
708
38.6k
    FindParseableKey(&ikey, kForward);
709
38.6k
  }
710
38.6k
}
711
712
// Go to previous user_key
//
// Moves iter_ backward until it is before all entries of saved_key_. It steps
// back over entries whose user key equals saved_key_, and also over entries
// with a greater user key whose sequence number is above sequence_ (not
// visible at this snapshot). An iter_ that is already invalid is left as is.
//
// After max_skip_ consecutive steps on saved_key_, it re-seeks to the internal
// key (saved_key_, kMaxSequenceNumber, kValueTypeForSeek), presumably the
// position of saved_key_'s newest entry, so the Prev() that follows skips the
// remaining entries in one go.
void DBIter::FindPrevUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  size_t num_skipped = 0;
  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kReverse);
  // Assigned inside the loop condition; only read after iter_->Valid() held.
  int cmp;
  while (iter_->Valid() && ((cmp = user_comparator_->Compare(
                                 ikey.user_key, saved_key_.GetKey())) == 0 ||
                            (cmp > 0 && ikey.sequence > sequence_))) {
    if (cmp == 0) {
      if (num_skipped >= max_skip_) {
        num_skipped = 0;
        IterKey last_key;
        last_key.SetInternalKey(ParsedInternalKey(
            saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
        iter_->Seek(last_key.GetKey());
        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      } else {
        ++num_skipped;
      }
    }
    iter_->Prev();
    FindParseableKey(&ikey, kReverse);
  }
}
740
741
// Skip all unparseable keys
742
4.44M
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
743
4.44M
  while (iter_->Valid() && !ParseKey(ikey)) {
744
0
    if (direction == kReverse) {
745
0
      iter_->Prev();
746
0
    } else {
747
0
      iter_->Next();
748
0
    }
749
0
  }
750
4.44M
}
751
752
67.8M
void DBIter::Seek(const Slice& target) {
753
67.8M
  saved_key_.Clear();
754
  // now savved_key is used to store internal key.
755
67.8M
  saved_key_.SetInternalKey(target, sequence_);
756
757
67.8M
  {
758
67.8M
    PERF_TIMER_GUARD(seek_internal_seek_time);
759
67.8M
    iter_->Seek(saved_key_.GetKey());
760
67.8M
  }
761
762
67.8M
  RecordTick(statistics_, NUMBER_DB_SEEK);
763
67.8M
  if (iter_->Valid()) {
764
60.4M
    direction_ = kForward;
765
60.4M
    ClearSavedValue();
766
60.4M
    FindNextUserEntry(false /* not skipping */);
767
60.4M
    if (statistics_ != nullptr) {
768
59.7M
      if (valid_) {
769
52.7M
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
770
52.7M
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
771
52.7M
      }
772
59.7M
    }
773
7.39M
  } else {
774
7.39M
    valid_ = false;
775
7.39M
  }
776
67.8M
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
777
8
    prefix_start_.SetKey(prefix_extractor_->Transform(target));
778
8
  }
779
67.8M
}
780
781
211k
void DBIter::SeekToFirst() {
782
  // Don't use iter_::Seek() if we set a prefix extractor
783
  // because prefix seek will be used.
784
211k
  if (prefix_extractor_ != nullptr) {
785
1.41k
    max_skip_ = std::numeric_limits<uint64_t>::max();
786
1.41k
  }
787
211k
  direction_ = kForward;
788
211k
  ClearSavedValue();
789
790
211k
  {
791
211k
    PERF_TIMER_GUARD(seek_internal_seek_time);
792
211k
    iter_->SeekToFirst();
793
211k
  }
794
795
211k
  RecordTick(statistics_, NUMBER_DB_SEEK);
796
211k
  if (iter_->Valid()) {
797
172k
    FindNextUserEntry(false /* not skipping */);
798
172k
    if (statistics_ != nullptr) {
799
31.0k
      if (valid_) {
800
30.8k
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
801
30.8k
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
802
30.8k
      }
803
31.0k
    }
804
38.4k
  } else {
805
38.4k
    valid_ = false;
806
38.4k
  }
807
211k
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
808
0
    prefix_start_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
809
0
  }
810
211k
}
811
812
140k
void DBIter::SeekToLast() {
813
  // Don't use iter_::Seek() if we set a prefix extractor
814
  // because prefix seek will be used.
815
140k
  if (prefix_extractor_ != nullptr) {
816
36
    max_skip_ = std::numeric_limits<uint64_t>::max();
817
36
  }
818
140k
  direction_ = kReverse;
819
140k
  ClearSavedValue();
820
821
140k
  {
822
140k
    PERF_TIMER_GUARD(seek_internal_seek_time);
823
140k
    iter_->SeekToLast();
824
140k
  }
825
  // When the iterate_upper_bound is set to a value,
826
  // it will seek to the last key before the
827
  // ReadOptions.iterate_upper_bound
828
140k
  if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
829
12
    saved_key_.SetKey(*iterate_upper_bound_, false /* copy */);
830
12
    std::string last_key;
831
12
    AppendInternalKey(&last_key,
832
12
                      ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber,
833
12
                                        kValueTypeForSeek));
834
835
12
    iter_->Seek(last_key);
836
837
12
    if (!iter_->Valid()) {
838
2
      iter_->SeekToLast();
839
10
    } else {
840
10
      iter_->Prev();
841
10
      if (!iter_->Valid()) {
842
2
        valid_ = false;
843
2
        return;
844
2
      }
845
140k
    }
846
12
  }
847
140k
  PrevInternal();
848
140k
  if (statistics_ != nullptr) {
849
1.74k
    RecordTick(statistics_, NUMBER_DB_SEEK);
850
1.74k
    if (valid_) {
851
1.48k
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
852
1.48k
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
853
1.48k
    }
854
1.74k
  }
855
140k
  if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
856
0
    prefix_start_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
857
0
  }
858
140k
}
859
860
Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
861
                        const Comparator* user_key_comparator,
862
                        InternalIterator* internal_iter,
863
                        const SequenceNumber& sequence,
864
                        uint64_t max_sequential_skip_in_iterations,
865
                        uint64_t version_number,
866
                        const Slice* iterate_upper_bound,
867
1.08k
                        bool prefix_same_as_start, bool pin_data) {
868
1.08k
  DBIter* db_iter =
869
1.08k
      new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
870
1.08k
                 false, max_sequential_skip_in_iterations, version_number,
871
1.08k
                 iterate_upper_bound, prefix_same_as_start);
872
1.08k
  if (pin_data) {
873
0
    CHECK_OK(db_iter->PinData());
874
0
  }
875
1.08k
  return db_iter;
876
1.08k
}
877
878
15.3M
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
879
880
15.3M
void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; }
881
882
15.3M
void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
883
15.3M
  static_cast<DBIter*>(db_iter_)->SetIter(iter);
884
15.3M
}
885
886
1.90G
inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
887
210k
inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
888
139k
inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
889
67.7M
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
890
67.7M
  db_iter_->Seek(target);
891
67.7M
}
892
346M
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
893
935k
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
894
4.49G
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
895
702M
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
896
201k
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
897
7
inline Status ArenaWrappedDBIter::PinData() { return db_iter_->PinData(); }
898
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
899
424k
                                              std::string* prop) {
900
424k
  return db_iter_->GetProperty(prop_name, prop);
901
424k
}
902
0
inline Status ArenaWrappedDBIter::ReleasePinnedData() {
903
0
  return db_iter_->ReleasePinnedData();
904
0
}
905
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
906
0
                                         void* arg2) {
907
0
  db_iter_->RegisterCleanup(function, arg1, arg2);
908
0
}
909
910
50.6M
void ArenaWrappedDBIter::RevalidateAfterUpperBoundChange() {
911
50.6M
  db_iter_->RevalidateAfterUpperBoundChange();
912
50.6M
}
913
914
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
915
    Env* env, const ImmutableCFOptions& ioptions,
916
    const Comparator* user_key_comparator, const SequenceNumber& sequence,
917
    uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
918
    const Slice* iterate_upper_bound, bool prefix_same_as_start,
919
15.3M
    bool pin_data) {
920
15.3M
  ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
921
15.3M
  Arena* arena = iter->GetArena();
922
15.3M
  auto mem = arena->AllocateAligned(sizeof(DBIter));
923
15.3M
  DBIter* db_iter =
924
15.3M
      new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence,
925
15.3M
                       true, max_sequential_skip_in_iterations, version_number,
926
15.3M
                       iterate_upper_bound, prefix_same_as_start);
927
928
15.3M
  iter->SetDBIter(db_iter);
929
15.3M
  if (pin_data) {
930
7
    CHECK_OK(iter->PinData());
931
7
  }
932
933
15.3M
  return iter;
934
15.3M
}
935
936
}  // namespace rocksdb