YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_iter.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/db_iter.h"
25
26
#include <deque>
27
#include <limits>
28
29
#include "yb/rocksdb/db/dbformat.h"
30
#include "yb/rocksdb/env.h"
31
#include "yb/rocksdb/iterator.h"
32
#include "yb/rocksdb/merge_operator.h"
33
#include "yb/rocksdb/options.h"
34
#include "yb/rocksdb/table/internal_iterator.h"
35
#include "yb/rocksdb/util/arena.h"
36
#include "yb/rocksdb/util/mutexlock.h"
37
#include "yb/rocksdb/util/perf_context_imp.h"
38
#include "yb/rocksdb/util/statistics.h"
39
#include "yb/rocksdb/util/stop_watch.h"
40
41
#include "yb/util/stats/perf_step_timer.h"
42
#include "yb/util/status_log.h"
43
#include "yb/util/string_util.h"
44
45
namespace rocksdb {
46
47
#if 0
// Debugging aid (compiled out): walks `iter` from the first entry to the end
// and prints each internal key to stderr. Keys that fail to parse are reported
// as corrupt using their escaped raw bytes.
static void DumpInternalIter(Iterator* iter) {
  iter->SeekToFirst();
  while (iter->Valid()) {
    ParsedInternalKey parsed;
    if (ParseInternalKey(iter->key(), &parsed)) {
      fprintf(stderr, "@ '%s'\n", parsed.DebugString().c_str());
    } else {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    }
    iter->Next();
  }
}
#endif
59
60
// Memtables and sstables that make the DB representation contain
61
// (userkey,seq,type) => uservalue entries.  DBIter
62
// combines multiple entries for the same userkey found in the DB
63
// representation into a single entry while accounting for sequence
64
// numbers, deletion markers, overwrites, etc.
65
class DBIter: public Iterator {
66
 public:
67
  // The following is grossly complicated. TODO: clean it up
68
  // Which direction is the iterator currently moving?
69
  // (1) When moving forward, the internal iterator is positioned at
70
  //     the exact entry that yields this->key(), this->value()
71
  // (2) When moving backwards, the internal iterator is positioned
72
  //     just before all entries whose user key == this->key().
73
  enum Direction {
74
    kForward,
75
    kReverse
76
  };
77
78
  DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp,
79
         InternalIterator* iter, SequenceNumber s, bool arena_mode,
80
         uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
81
         const Slice* iterate_upper_bound = nullptr,
82
         bool prefix_same_as_start = false)
83
      : arena_mode_(arena_mode),
84
        env_(env),
85
        logger_(ioptions.info_log),
86
        user_comparator_(cmp),
87
        user_merge_operator_(ioptions.merge_operator),
88
        iter_(iter),
89
        sequence_(s),
90
        direction_(kForward),
91
        valid_(false),
92
        current_entry_is_merged_(false),
93
        statistics_(ioptions.statistics),
94
        version_number_(version_number),
95
        iterate_upper_bound_(iterate_upper_bound),
96
        prefix_same_as_start_(prefix_same_as_start),
97
38.1M
        iter_pinned_(false) {
98
38.1M
    RecordTick(statistics_, NO_ITERATORS);
99
38.1M
    prefix_extractor_ = ioptions.prefix_extractor;
100
38.1M
    max_skip_ = max_sequential_skip_in_iterations;
101
38.1M
  }
102
38.1M
  virtual ~DBIter() {
103
38.1M
    RecordTick(statistics_, NO_ITERATORS, -1);
104
38.1M
    if (!arena_mode_) {
105
1.08k
      delete iter_;
106
38.1M
    } else {
107
38.1M
      iter_->~InternalIterator();
108
38.1M
    }
109
38.1M
  }
110
38.1M
  virtual void SetIter(InternalIterator* iter) {
111
38.1M
    assert(iter_ == nullptr);
112
0
    iter_ = iter;
113
38.1M
    if (iter_ && 
iter_pinned_38.1M
) {
114
7
      CHECK_OK(iter_->PinData());
115
7
    }
116
38.1M
  }
117
5.49G
  bool Valid() const override { return valid_; }
118
13.8G
  Slice key() const override {
119
13.8G
    assert(valid_);
120
0
    return saved_key_.GetKey();
121
13.8G
  }
122
3.12G
  Slice value() const override {
123
3.12G
    assert(valid_);
124
3.12G
    return (direction_ == kForward && 
!current_entry_is_merged_3.11G
) ?
125
3.11G
      iter_->value() : 
saved_value_7.84M
;
126
3.12G
  }
127
201k
  Status status() const override {
128
201k
    if (status_.ok()) {
129
201k
      return iter_->status();
130
201k
    } else {
131
0
      return status_;
132
0
    }
133
201k
  }
134
7
  virtual Status PinData() {
135
7
    Status s;
136
7
    if (iter_) {
137
0
      s = iter_->PinData();
138
0
    }
139
7
    if (s.ok()) {
140
      // Even if iter_ is nullptr, we set iter_pinned_ to true so that when
141
      // iter_ is updated using SetIter, we Pin it.
142
7
      iter_pinned_ = true;
143
7
    }
144
7
    return s;
145
7
  }
146
0
  virtual Status ReleasePinnedData() {
147
0
    Status s;
148
0
    if (iter_) {
149
0
      s = iter_->ReleasePinnedData();
150
0
    }
151
0
    if (s.ok()) {
152
0
      iter_pinned_ = false;
153
0
    }
154
0
    return s;
155
0
  }
156
157
  virtual Status GetProperty(std::string prop_name,
158
424k
                             std::string* prop) override {
159
424k
    if (prop == nullptr) {
160
0
      return STATUS(InvalidArgument, "prop is nullptr");
161
0
    }
162
424k
    if (prop_name == "rocksdb.iterator.super-version-number") {
163
      // First try to pass the value returned from inner iterator.
164
8
      if (!iter_->GetProperty(prop_name, prop).ok()) {
165
4
        *prop = ToString(version_number_);
166
4
      }
167
8
      return Status::OK();
168
424k
    } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
169
424k
      if (valid_) {
170
424k
        *prop = (iter_pinned_ && 
saved_key_.IsKeyPinned()424k
) ?
"1"424k
:
"0"1
;
171
424k
      } else {
172
1
        *prop = "Iterator is not valid.";
173
1
      }
174
424k
      return Status::OK();
175
424k
    }
176
1
    return STATUS(InvalidArgument, "Undentified property.");
177
424k
  }
178
179
  void Next() override;
180
  void Prev() override;
181
  void Seek(const Slice& target) override;
182
  void SeekToFirst() override;
183
  void SeekToLast() override;
184
185
149M
  void RevalidateAfterUpperBoundChange() override {
186
149M
    if (iter_->Valid() && 
direction_ == kForward136M
) {
187
136M
      valid_ = true;
188
136M
      FindNextUserEntry(/* skipping= */ false);
189
136M
    }
190
149M
  }
191
192
 private:
193
  void ReverseToBackward();
194
  void PrevInternal();
195
  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
196
  bool FindValueForCurrentKey();
197
  bool FindValueForCurrentKeyUsingSeek();
198
  void FindPrevUserKey();
199
  void FindNextUserKey();
200
  inline void FindNextUserEntry(bool skipping);
201
  void FindNextUserEntryInternal(bool skipping);
202
  bool ParseKey(ParsedInternalKey* key);
203
  void MergeValuesNewToOld();
204
205
176M
  inline void ClearSavedValue() {
206
176M
    if (saved_value_.capacity() > 1048576) {
207
0
      std::string empty;
208
0
      swap(empty, saved_value_);
209
176M
    } else {
210
176M
      saved_value_.clear();
211
176M
    }
212
176M
  }
213
214
  const SliceTransform* prefix_extractor_;
215
  bool arena_mode_;
216
  Env* const env_;
217
  Logger* logger_;
218
  const Comparator* const user_comparator_;
219
  const MergeOperator* const user_merge_operator_;
220
  InternalIterator* iter_;
221
  SequenceNumber const sequence_;
222
223
  Status status_;
224
  IterKey saved_key_;
225
  std::string saved_value_;
226
  Direction direction_;
227
  bool valid_;
228
  bool current_entry_is_merged_;
229
  Statistics* statistics_;
230
  uint64_t max_skip_;
231
  uint64_t version_number_;
232
  const Slice* iterate_upper_bound_;
233
  IterKey prefix_start_;
234
  bool prefix_same_as_start_;
235
  bool iter_pinned_;
236
  // List of operands for merge operator.
237
  std::deque<std::string> merge_operands_;
238
239
  // No copying allowed
240
  DBIter(const DBIter&);
241
  void operator=(const DBIter&);
242
};
243
244
1.32G
// Decodes the internal key under iter_ into *ikey.
// On failure, records a Corruption status (visible through status()), logs
// the raw key in hex, and returns false.
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  if (ParseInternalKey(iter_->key(), ikey)) {
    return true;
  }
  status_ = STATUS(Corruption, "corrupted internal key in DBIter");
  RLOG(InfoLogLevel::ERROR_LEVEL, logger_, "corrupted internal key in DBIter: %s",
       iter_->key().ToString(true).c_str());
  return false;
}
255
256
986M
void DBIter::Next() {
257
986M
  assert(valid_);
258
259
986M
  if (direction_ == kReverse) {
260
48.2k
    FindNextUserKey();
261
48.2k
    direction_ = kForward;
262
48.2k
    if (!iter_->Valid()) {
263
3.68k
      iter_->SeekToFirst();
264
3.68k
    }
265
986M
  } else if (iter_->Valid() && 
!current_entry_is_merged_985M
) {
266
    // If the current value is not a merge, the iter position is the
267
    // current key, which is already returned. We can safely issue a
268
    // Next() without checking the current key.
269
    // If the current key is a merge, very likely iter already points
270
    // to the next internal position.
271
985M
    iter_->Next();
272
985M
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
273
985M
  }
274
275
  // Now we point to the next internal position, for both of merge and
276
  // not merge cases.
277
986M
  if (!iter_->Valid()) {
278
2.02M
    valid_ = false;
279
2.02M
    return;
280
2.02M
  }
281
984M
  FindNextUserEntry(true /* skipping the current user key */);
282
984M
  if (statistics_ != nullptr) {
283
969M
    RecordTick(statistics_, NUMBER_DB_NEXT);
284
969M
    if (valid_) {
285
969M
      RecordTick(statistics_, NUMBER_DB_NEXT_FOUND);
286
969M
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
287
969M
    }
288
969M
  }
289
984M
  if (valid_ && 
prefix_extractor_982M
&&
prefix_same_as_start_2.17M
&&
290
984M
      prefix_extractor_->Transform(saved_key_.GetKey())
291
32
              .compare(prefix_start_.GetKey()) != 0) {
292
8
    valid_ = false;
293
8
  }
294
984M
}
295
296
// PRE: saved_key_ has the current user key if skipping
297
// POST: saved_key_ should have the next user key if valid_,
298
//       if the current entry is a result of merge
299
//           current_entry_is_merged_ => true
300
//           saved_value_             => the merged value
301
//
302
// NOTE: In between, saved_key_ can point to a user key that has
303
//       a delete marker
304
1.29G
inline void DBIter::FindNextUserEntry(bool skipping) {
305
1.29G
  PERF_TIMER_GUARD(find_next_user_entry_time);
306
1.29G
  FindNextUserEntryInternal(skipping);
307
1.29G
}
308
309
// Actual implementation of DBIter::FindNextUserEntry()
310
1.29G
void DBIter::FindNextUserEntryInternal(bool skipping) {
311
  // Loop until we hit an acceptable entry to yield
312
1.29G
  assert(iter_->Valid());
313
0
  assert(direction_ == kForward);
314
0
  current_entry_is_merged_ = false;
315
1.29G
  uint64_t num_skipped = 0;
316
1.30G
  do {
317
1.30G
    ParsedInternalKey ikey;
318
319
1.30G
    if (
ParseKey(&ikey)1.30G
) {
320
1.30G
      if (iterate_upper_bound_ != nullptr &&
321
1.30G
          
user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0336M
) {
322
157M
        break;
323
157M
      }
324
325
1.14G
      if (ikey.sequence <= sequence_) {
326
1.14G
        if (skipping &&
327
1.14G
           
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0981M
) {
328
1.14M
          num_skipped++;  // skip this entry
329
1.14M
          PERF_COUNTER_ADD(internal_key_skipped_count, 1);
330
1.13G
        } else {
331
1.13G
          switch (ikey.type) {
332
1.20M
            case kTypeDeletion:
333
3.18M
            case kTypeSingleDeletion:
334
              // Arrange to skip all upcoming entries for this key since
335
              // they are hidden by this deletion.
336
3.18M
              saved_key_.SetKey(ikey.user_key,
337
3.18M
                                !iter_->IsKeyPinned() /* copy */);
338
3.18M
              skipping = true;
339
3.18M
              num_skipped = 0;
340
3.18M
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
341
3.18M
              break;
342
1.13G
            case kTypeValue:
343
1.13G
              valid_ = true;
344
1.13G
              saved_key_.SetKey(ikey.user_key,
345
1.13G
                                !iter_->IsKeyPinned() /* copy */);
346
1.13G
              return;
347
21.4k
            case kTypeMerge:
348
              // By now, we are sure the current ikey is going to yield a value
349
21.4k
              saved_key_.SetKey(ikey.user_key,
350
21.4k
                                !iter_->IsKeyPinned() /* copy */);
351
21.4k
              current_entry_is_merged_ = true;
352
21.4k
              valid_ = true;
353
21.4k
              MergeValuesNewToOld();  // Go to a different state machine
354
21.4k
              return;
355
0
            default:
356
0
              assert(false);
357
0
              break;
358
1.13G
          }
359
1.13G
        }
360
1.14G
      }
361
1.14G
    }
362
    // If we have sequentially iterated via numerous keys and still not
363
    // found the next user-key, then it is better to seek so that we can
364
    // avoid too many key comparisons. We seek to the last occurrence of
365
    // our current key by looking for sequence number 0 and type deletion
366
    // (the smallest type).
367
13.0M
    
if (11.0M
skipping11.0M
&& num_skipped > max_skip_) {
368
9.09k
      num_skipped = 0;
369
9.09k
      std::string last_key;
370
9.09k
      AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
371
9.09k
                                                     kTypeDeletion));
372
9.09k
      iter_->Seek(last_key);
373
9.09k
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
374
11.0M
    } else {
375
11.0M
      iter_->Next();
376
11.0M
    }
377
11.0M
  } while (iter_->Valid());
378
154M
  valid_ = false;
379
154M
}
380
381
// Merge values of the same user key starting from the current iter_ position
382
// Scan from the newer entries to older entries.
383
// PRE: iter_->key() points to the first merge type entry
384
//      saved_key_ stores the user key
385
// POST: saved_value_ has the merged value for the user key
386
//       iter_ points to the next entry (or invalid)
387
21.4k
void DBIter::MergeValuesNewToOld() {
388
21.4k
  if (!user_merge_operator_) {
389
0
    RLOG(InfoLogLevel::ERROR_LEVEL,
390
0
        logger_, "Options::merge_operator is null.");
391
0
    status_ = STATUS(InvalidArgument, "user_merge_operator_ must be set.");
392
0
    valid_ = false;
393
0
    return;
394
0
  }
395
396
  // Start the merge process by pushing the first operand
397
21.4k
  std::deque<std::string> operands;
398
21.4k
  operands.push_front(iter_->value().ToString());
399
400
21.4k
  ParsedInternalKey ikey;
401
22.9k
  for (iter_->Next(); iter_->Valid(); 
iter_->Next()1.42k
) {
402
22.8k
    if (!ParseKey(&ikey)) {
403
      // skip corrupted key
404
0
      continue;
405
0
    }
406
407
22.8k
    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
408
      // hit the next user key, stop right here
409
9.90k
      break;
410
12.9k
    } else if (kTypeDeletion == ikey.type || 
kTypeSingleDeletion == ikey.type12.1k
) {
411
      // hit a delete with the same user key, stop right here
412
      // iter_ is positioned after delete
413
868
      iter_->Next();
414
868
      break;
415
12.1k
    } else if (kTypeValue == ikey.type) {
416
      // hit a put, merge the put value with operands and store the
417
      // final result in saved_value_. We are done!
418
      // ignore corruption if there is any.
419
10.6k
      const Slice val = iter_->value();
420
10.6k
      {
421
10.6k
        StopWatchNano timer(env_, statistics_ != nullptr);
422
10.6k
        PERF_TIMER_GUARD(merge_operator_time_nanos);
423
10.6k
        user_merge_operator_->FullMerge(ikey.user_key, &val, operands,
424
10.6k
                                        &saved_value_, logger_);
425
10.6k
        RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
426
10.6k
                   timer.ElapsedNanos());
427
10.6k
      }
428
      // iter_ is positioned after put
429
10.6k
      iter_->Next();
430
10.6k
      return;
431
10.6k
    } else 
if (1.42k
kTypeMerge == ikey.type1.42k
) {
432
      // hit a merge, add the value as an operand and run associative merge.
433
      // when complete, add result to operands and continue.
434
1.42k
      const Slice& val = iter_->value();
435
1.42k
      operands.push_front(val.ToString());
436
1.42k
    } else {
437
0
      assert(false);
438
0
    }
439
22.8k
  }
440
441
10.7k
  {
442
10.7k
    StopWatchNano timer(env_, statistics_ != nullptr);
443
10.7k
    PERF_TIMER_GUARD(merge_operator_time_nanos);
444
    // we either exhausted all internal keys under this user key, or hit
445
    // a deletion marker.
446
    // feed null as the existing value to the merge operator, such that
447
    // client can differentiate this scenario and do things accordingly.
448
10.7k
    user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
449
10.7k
                                    &saved_value_, logger_);
450
10.7k
    RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
451
10.7k
  }
452
10.7k
}
453
454
5.97M
void DBIter::Prev() {
455
5.97M
  assert(valid_);
456
5.97M
  if (direction_ == kForward) {
457
532k
    ReverseToBackward();
458
532k
  }
459
5.97M
  PrevInternal();
460
5.97M
  if (statistics_ != nullptr) {
461
5.44M
    RecordTick(statistics_, NUMBER_DB_PREV);
462
5.44M
    if (valid_) {
463
5.44M
      RecordTick(statistics_, NUMBER_DB_PREV_FOUND);
464
5.44M
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
465
5.44M
    }
466
5.44M
  }
467
5.97M
  if (valid_ && 
prefix_extractor_5.93M
&&
prefix_same_as_start_15
&&
468
5.97M
      prefix_extractor_->Transform(saved_key_.GetKey())
469
0
              .compare(prefix_start_.GetKey()) != 0) {
470
0
    valid_ = false;
471
0
  }
472
5.97M
}
473
474
532k
void DBIter::ReverseToBackward() {
475
532k
  if (current_entry_is_merged_) {
476
    // Not placed in the same key. Need to call Prev() until finding the
477
    // previous key.
478
3
    if (!iter_->Valid()) {
479
1
      iter_->SeekToLast();
480
1
    }
481
3
    ParsedInternalKey ikey;
482
3
    FindParseableKey(&ikey, kReverse);
483
5
    while (iter_->Valid() &&
484
5
           user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) > 0) {
485
2
      iter_->Prev();
486
2
      FindParseableKey(&ikey, kReverse);
487
2
    }
488
3
  }
489
532k
#ifndef NDEBUG
490
532k
  if (iter_->Valid()) {
491
532k
    ParsedInternalKey ikey;
492
532k
    assert(ParseKey(&ikey));
493
0
    assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0);
494
532k
  }
495
0
#endif
496
497
0
  FindPrevUserKey();
498
532k
  direction_ = kReverse;
499
532k
}
500
501
6.83M
void DBIter::PrevInternal() {
502
6.83M
  if (!iter_->Valid()) {
503
40.8k
    valid_ = false;
504
40.8k
    return;
505
40.8k
  }
506
507
6.79M
  ParsedInternalKey ikey;
508
509
6.84M
  while (iter_->Valid()) {
510
6.84M
    saved_key_.SetKey(ExtractUserKey(iter_->key()),
511
6.84M
                      !iter_->IsKeyPinned() /* copy */);
512
6.84M
    if (FindValueForCurrentKey()) {
513
6.78M
      valid_ = true;
514
6.78M
      if (!iter_->Valid()) {
515
25.1k
        return;
516
25.1k
      }
517
6.76M
      FindParseableKey(&ikey, kReverse);
518
6.76M
      if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
519
651
        FindPrevUserKey();
520
651
      }
521
6.76M
      return;
522
6.78M
    }
523
54.1k
    if (!iter_->Valid()) {
524
1.41k
      break;
525
1.41k
    }
526
52.7k
    FindParseableKey(&ikey, kReverse);
527
52.7k
    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
528
10.6k
      FindPrevUserKey();
529
10.6k
    }
530
52.7k
  }
531
  // We haven't found any key - iterator is not valid
532
1.79k
  assert(!iter_->Valid());
533
0
  valid_ = false;
534
1.79k
}
535
536
// This function checks, if the entry with biggest sequence_number <= sequence_
537
// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
538
// saved_value_
539
6.84M
bool DBIter::FindValueForCurrentKey() {
540
6.84M
  assert(iter_->Valid());
541
0
  merge_operands_.clear();
542
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
543
  // kTypeValue)
544
6.84M
  ValueType last_not_merge_type = kTypeDeletion;
545
6.84M
  ValueType last_key_entry_type = kTypeDeletion;
546
547
6.84M
  ParsedInternalKey ikey;
548
6.84M
  FindParseableKey(&ikey, kReverse);
549
550
6.84M
  size_t num_skipped = 0;
551
13.7M
  while (iter_->Valid() && 
ikey.sequence <= sequence_13.7M
&&
552
13.7M
         
user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())13.7M
) {
553
    // We iterate too much: let's use Seek() to avoid too much key comparisons
554
6.93M
    if (num_skipped >= max_skip_) {
555
970
      return FindValueForCurrentKeyUsingSeek();
556
970
    }
557
558
6.93M
    last_key_entry_type = ikey.type;
559
6.93M
    switch (last_key_entry_type) {
560
6.84M
      case kTypeValue:
561
6.84M
        merge_operands_.clear();
562
6.84M
        saved_value_ = iter_->value().ToString();
563
6.84M
        last_not_merge_type = kTypeValue;
564
6.84M
        break;
565
66.1k
      case kTypeDeletion:
566
66.1k
      case kTypeSingleDeletion:
567
66.1k
        merge_operands_.clear();
568
66.1k
        last_not_merge_type = last_key_entry_type;
569
66.1k
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
570
66.1k
        break;
571
20.1k
      case kTypeMerge:
572
20.1k
        assert(user_merge_operator_ != nullptr);
573
0
        merge_operands_.push_back(iter_->value().ToString());
574
20.1k
        break;
575
0
      default:
576
0
        assert(false);
577
6.93M
    }
578
579
6.93M
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
580
6.93M
    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()));
581
0
    iter_->Prev();
582
6.93M
    ++num_skipped;
583
6.93M
    FindParseableKey(&ikey, kReverse);
584
6.93M
  }
585
586
6.84M
  switch (last_key_entry_type) {
587
53.7k
    case kTypeDeletion:
588
53.7k
    case kTypeSingleDeletion:
589
53.7k
      valid_ = false;
590
53.7k
      return false;
591
15.2k
    case kTypeMerge:
592
15.2k
      if (last_not_merge_type == kTypeDeletion) {
593
8.63k
        StopWatchNano timer(env_, statistics_ != nullptr);
594
8.63k
        PERF_TIMER_GUARD(merge_operator_time_nanos);
595
8.63k
        user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
596
8.63k
                                        merge_operands_, &saved_value_,
597
8.63k
                                        logger_);
598
8.63k
        RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
599
8.63k
                   timer.ElapsedNanos());
600
8.63k
      } else {
601
6.59k
        assert(last_not_merge_type == kTypeValue);
602
0
        std::string last_put_value = saved_value_;
603
6.59k
        Slice temp_slice(last_put_value);
604
6.59k
        {
605
6.59k
          StopWatchNano timer(env_, statistics_ != nullptr);
606
6.59k
          PERF_TIMER_GUARD(merge_operator_time_nanos);
607
6.59k
          user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice,
608
6.59k
                                          merge_operands_, &saved_value_,
609
6.59k
                                          logger_);
610
6.59k
          RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
611
6.59k
                     timer.ElapsedNanos());
612
6.59k
        }
613
6.59k
      }
614
0
      break;
615
6.77M
    case kTypeValue:
616
      // do nothing - we've already has value in saved_value_
617
6.77M
      break;
618
0
    default:
619
0
      assert(false);
620
0
      break;
621
6.84M
  }
622
6.78M
  valid_ = true;
623
6.78M
  return true;
624
6.84M
}
625
626
// This function is used in FindValueForCurrentKey.
627
// We use Seek() function instead of Prev() to find necessary value
628
970
bool DBIter::FindValueForCurrentKeyUsingSeek() {
629
970
  std::string last_key;
630
970
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_,
631
970
                                                 kValueTypeForSeek));
632
970
  iter_->Seek(last_key);
633
970
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
634
635
  // assume there is at least one parseable key for this user key
636
970
  ParsedInternalKey ikey;
637
970
  FindParseableKey(&ikey, kForward);
638
639
970
  if (ikey.type == kTypeValue || 
ikey.type == kTypeDeletion579
||
640
970
      
ikey.type == kTypeSingleDeletion195
) {
641
776
    if (ikey.type == kTypeValue) {
642
391
      saved_value_ = iter_->value().ToString();
643
391
      valid_ = true;
644
391
      return true;
645
391
    }
646
385
    valid_ = false;
647
385
    return false;
648
776
  }
649
650
  // kTypeMerge. We need to collect all kTypeMerge values and save them
651
  // in operands
652
194
  std::deque<std::string> operands;
653
  // TODO: we dont need rocksdb level merge records and only use RocksDB level tombstones in
654
  // intentsdb, so maybe we can be more efficient here.
655
20.2k
  while (iter_->Valid() &&
656
20.2k
         
user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())20.0k
&&
657
20.2k
         
ikey.type == kTypeMerge20.0k
) {
658
20.0k
    operands.push_front(iter_->value().ToString());
659
20.0k
    iter_->Next();
660
20.0k
    FindParseableKey(&ikey, kForward);
661
20.0k
  }
662
663
194
  if (!iter_->Valid() ||
664
194
      
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())2
||
665
194
      
ikey.type == kTypeDeletion2
||
ikey.type == kTypeSingleDeletion0
) {
666
194
    {
667
194
      StopWatchNano timer(env_, statistics_ != nullptr);
668
194
      PERF_TIMER_GUARD(merge_operator_time_nanos);
669
194
      user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
670
194
                                      &saved_value_, logger_);
671
194
      RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
672
194
    }
673
    // Make iter_ valid and point to saved_key_
674
194
    if (!iter_->Valid() ||
675
194
        
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())2
) {
676
192
      iter_->Seek(last_key);
677
192
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
678
192
    }
679
194
    valid_ = true;
680
194
    return true;
681
194
  }
682
683
0
  const Slice& val = iter_->value();
684
0
  {
685
0
    StopWatchNano timer(env_, statistics_ != nullptr);
686
0
    PERF_TIMER_GUARD(merge_operator_time_nanos);
687
0
    user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands,
688
0
                                    &saved_value_, logger_);
689
0
    RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
690
0
  }
691
0
  valid_ = true;
692
0
  return true;
693
194
}
694
695
// Used in Next to change directions
696
// Go to next user key
697
// Don't use Seek(),
698
// because next user key will be very close
699
48.2k
void DBIter::FindNextUserKey() {
700
48.2k
  if (!iter_->Valid()) {
701
3.68k
    return;
702
3.68k
  }
703
44.5k
  ParsedInternalKey ikey;
704
44.5k
  FindParseableKey(&ikey, kForward);
705
89.1k
  while (iter_->Valid() &&
706
89.1k
         !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
707
44.5k
    iter_->Next();
708
44.5k
    FindParseableKey(&ikey, kForward);
709
44.5k
  }
710
44.5k
}
711
712
// Go to previous user_key
713
544k
void DBIter::FindPrevUserKey() {
714
544k
  if (!iter_->Valid()) {
715
0
    return;
716
0
  }
717
544k
  size_t num_skipped = 0;
718
544k
  ParsedInternalKey ikey;
719
544k
  FindParseableKey(&ikey, kReverse);
720
544k
  int cmp;
721
1.09M
  while (iter_->Valid() && 
(1.06M
(cmp = user_comparator_->Compare(
722
1.06M
                                 ikey.user_key, saved_key_.GetKey())) == 0 ||
723
1.06M
                            
(515k
cmp > 0515k
&&
ikey.sequence > sequence_18
))) {
724
553k
    if (
cmp == 0553k
) {
725
553k
      if (num_skipped >= max_skip_) {
726
1.17k
        num_skipped = 0;
727
1.17k
        IterKey last_key;
728
1.17k
        last_key.SetInternalKey(ParsedInternalKey(
729
1.17k
            saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
730
1.17k
        iter_->Seek(last_key.GetKey());
731
1.17k
        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
732
552k
      } else {
733
552k
        ++num_skipped;
734
552k
      }
735
553k
    }
736
553k
    iter_->Prev();
737
553k
    FindParseableKey(&ikey, kReverse);
738
553k
  }
739
544k
}
740
741
// Skip all unparseable keys
742
21.7M
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
743
21.7M
  while (iter_->Valid() && 
!ParseKey(ikey)21.7M
) {
744
0
    if (direction == kReverse) {
745
0
      iter_->Prev();
746
0
    } else {
747
0
      iter_->Next();
748
0
    }
749
0
  }
750
21.7M
}
751
752
195M
void DBIter::Seek(const Slice& target) {
753
195M
  saved_key_.Clear();
754
  // now savved_key is used to store internal key.
755
195M
  saved_key_.SetInternalKey(target, sequence_);
756
757
195M
  {
758
195M
    PERF_TIMER_GUARD(seek_internal_seek_time);
759
195M
    iter_->Seek(saved_key_.GetKey());
760
195M
  }
761
762
195M
  RecordTick(statistics_, NUMBER_DB_SEEK);
763
195M
  if (iter_->Valid()) {
764
175M
    direction_ = kForward;
765
175M
    ClearSavedValue();
766
175M
    FindNextUserEntry(false /* not skipping */);
767
175M
    if (statistics_ != nullptr) {
768
174M
      if (valid_) {
769
152M
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
770
152M
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
771
152M
      }
772
174M
    }
773
175M
  } else {
774
20.6M
    valid_ = false;
775
20.6M
  }
776
195M
  if (valid_ && 
prefix_extractor_152M
&&
prefix_same_as_start_400k
) {
777
8
    prefix_start_.SetKey(prefix_extractor_->Transform(target));
778
8
  }
779
195M
}
780
781
249k
void DBIter::SeekToFirst() {
782
  // Don't use iter_::Seek() if we set a prefix extractor
783
  // because prefix seek will be used.
784
249k
  if (prefix_extractor_ != nullptr) {
785
1.41k
    max_skip_ = std::numeric_limits<uint64_t>::max();
786
1.41k
  }
787
249k
  direction_ = kForward;
788
249k
  ClearSavedValue();
789
790
249k
  {
791
249k
    PERF_TIMER_GUARD(seek_internal_seek_time);
792
249k
    iter_->SeekToFirst();
793
249k
  }
794
795
249k
  RecordTick(statistics_, NUMBER_DB_SEEK);
796
249k
  if (iter_->Valid()) {
797
195k
    FindNextUserEntry(false /* not skipping */);
798
195k
    if (statistics_ != nullptr) {
799
53.4k
      if (valid_) {
800
53.2k
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
801
53.2k
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
802
53.2k
      }
803
53.4k
    }
804
195k
  } else {
805
54.2k
    valid_ = false;
806
54.2k
  }
807
249k
  if (valid_ && 
prefix_extractor_194k
&&
prefix_same_as_start_1.41k
) {
808
0
    prefix_start_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
809
0
  }
810
249k
}
811
812
860k
void DBIter::SeekToLast() {
813
  // Don't use iter_::Seek() if we set a prefix extractor
814
  // because prefix seek will be used.
815
860k
  if (prefix_extractor_ != nullptr) {
816
36
    max_skip_ = std::numeric_limits<uint64_t>::max();
817
36
  }
818
860k
  direction_ = kReverse;
819
860k
  ClearSavedValue();
820
821
860k
  {
822
860k
    PERF_TIMER_GUARD(seek_internal_seek_time);
823
860k
    iter_->SeekToLast();
824
860k
  }
825
  // When the iterate_upper_bound is set to a value,
826
  // it will seek to the last key before the
827
  // ReadOptions.iterate_upper_bound
828
860k
  if (iter_->Valid() && 
iterate_upper_bound_ != nullptr858k
) {
829
16
    saved_key_.SetKey(*iterate_upper_bound_, false /* copy */);
830
16
    std::string last_key;
831
16
    AppendInternalKey(&last_key,
832
16
                      ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber,
833
16
                                        kValueTypeForSeek));
834
835
16
    iter_->Seek(last_key);
836
837
16
    if (!iter_->Valid()) {
838
4
      iter_->SeekToLast();
839
12
    } else {
840
12
      iter_->Prev();
841
12
      if (!iter_->Valid()) {
842
2
        valid_ = false;
843
2
        return;
844
2
      }
845
12
    }
846
16
  }
847
860k
  PrevInternal();
848
860k
  if (statistics_ != nullptr) {
849
721k
    RecordTick(statistics_, NUMBER_DB_SEEK);
850
721k
    if (valid_) {
851
721k
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
852
721k
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
853
721k
    }
854
721k
  }
855
860k
  if (valid_ && 
prefix_extractor_858k
&&
prefix_same_as_start_30
) {
856
0
    prefix_start_.SetKey(prefix_extractor_->Transform(saved_key_.GetKey()));
857
0
  }
858
860k
}
859
860
Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
861
                        const Comparator* user_key_comparator,
862
                        InternalIterator* internal_iter,
863
                        const SequenceNumber& sequence,
864
                        uint64_t max_sequential_skip_in_iterations,
865
                        uint64_t version_number,
866
                        const Slice* iterate_upper_bound,
867
1.08k
                        bool prefix_same_as_start, bool pin_data) {
868
1.08k
  DBIter* db_iter =
869
1.08k
      new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
870
1.08k
                 false, max_sequential_skip_in_iterations, version_number,
871
1.08k
                 iterate_upper_bound, prefix_same_as_start);
872
1.08k
  if (pin_data) {
873
0
    CHECK_OK(db_iter->PinData());
874
0
  }
875
1.08k
  return db_iter;
876
1.08k
}
877
878
38.1M
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
879
880
38.1M
void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; }
881
882
38.1M
void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
883
38.1M
  static_cast<DBIter*>(db_iter_)->SetIter(iter);
884
38.1M
}
885
886
5.49G
inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
887
249k
inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
888
859k
inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
889
195M
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
890
195M
  db_iter_->Seek(target);
891
195M
}
892
987M
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
893
5.96M
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
894
12.8G
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
895
2.00G
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
896
201k
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
897
7
inline Status ArenaWrappedDBIter::PinData() { return db_iter_->PinData(); }
898
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
899
424k
                                              std::string* prop) {
900
424k
  return db_iter_->GetProperty(prop_name, prop);
901
424k
}
902
0
inline Status ArenaWrappedDBIter::ReleasePinnedData() {
903
0
  return db_iter_->ReleasePinnedData();
904
0
}
905
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
906
0
                                         void* arg2) {
907
0
  db_iter_->RegisterCleanup(function, arg1, arg2);
908
0
}
909
910
149M
void ArenaWrappedDBIter::RevalidateAfterUpperBoundChange() {
911
149M
  db_iter_->RevalidateAfterUpperBoundChange();
912
149M
}
913
914
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
915
    Env* env, const ImmutableCFOptions& ioptions,
916
    const Comparator* user_key_comparator, const SequenceNumber& sequence,
917
    uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
918
    const Slice* iterate_upper_bound, bool prefix_same_as_start,
919
38.1M
    bool pin_data) {
920
38.1M
  ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
921
38.1M
  Arena* arena = iter->GetArena();
922
38.1M
  auto mem = arena->AllocateAligned(sizeof(DBIter));
923
38.1M
  DBIter* db_iter =
924
38.1M
      new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence,
925
38.1M
                       true, max_sequential_skip_in_iterations, version_number,
926
38.1M
                       iterate_upper_bound, prefix_same_as_start);
927
928
38.1M
  iter->SetDBIter(db_iter);
929
38.1M
  if (pin_data) {
930
7
    CHECK_OK(iter->PinData());
931
7
  }
932
933
38.1M
  return iter;
934
38.1M
}
935
936
}  // namespace rocksdb