YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
23
#include "yb/rocksdb/utilities/write_batch_with_index.h"
24
25
#include <limits>
26
#include <memory>
27
28
#include "yb/rocksdb/db/column_family.h"
29
#include "yb/rocksdb/db/merge_context.h"
30
#include "yb/rocksdb/db/merge_helper.h"
31
#include "yb/rocksdb/db/skiplist.h"
32
#include "yb/rocksdb/comparator.h"
33
#include "yb/rocksdb/iterator.h"
34
#include "yb/rocksdb/util/arena.h"
35
#include "yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h"
36
37
namespace rocksdb {
38
39
// Merges a "base" iterator (typically over the DB) with a "delta" iterator
// over the entries indexed in a WriteBatchWithIndex, presenting the batch
// applied on top of the base.  Delete/SingleDelete records in the delta hide
// the corresponding base entries.
//
// when direction == forward
// * current_at_base_ <=> base_iterator > delta_iterator
// when direction == backwards
// * current_at_base_ <=> base_iterator < delta_iterator
// always:
// * equal_keys_ <=> base_iterator == delta_iterator
class BaseDeltaIterator : public Iterator {
 public:
  // Takes ownership of both child iterators; `comparator` is not owned.
  BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
                    const Comparator* comparator)
      : forward_(true),
        current_at_base_(true),
        equal_keys_(false),
        status_(Status::OK()),
        base_iterator_(base_iterator),
        delta_iterator_(delta_iterator),
        comparator_(comparator) {}

  virtual ~BaseDeltaIterator() {}

  // Valid iff whichever child currently "wins" is valid.
  bool Valid() const override {
    return current_at_base_ ? BaseValid() : DeltaValid();
  }

  void SeekToFirst() override {
    forward_ = true;
    base_iterator_->SeekToFirst();
    delta_iterator_->SeekToFirst();
    UpdateCurrent();
  }

  void SeekToLast() override {
    forward_ = false;
    base_iterator_->SeekToLast();
    delta_iterator_->SeekToLast();
    UpdateCurrent();
  }

  void Seek(const Slice& k) override {
    forward_ = true;
    base_iterator_->Seek(k);
    delta_iterator_->Seek(k);
    UpdateCurrent();
  }

  void Next() override {
    if (!Valid()) {
      // Misuse is recorded in status_; note control still falls through to
      // Advance() below.
      status_ = STATUS(NotSupported, "Next() on invalid iterator");
    }

    if (!forward_) {
      // Need to change direction
      // if our direction was backward and we're not equal, we have two states:
      // * both iterators are valid: we're already in a good state (current
      // shows to smaller)
      // * only one iterator is valid: we need to advance that iterator
      forward_ = true;
      equal_keys_ = false;
      if (!BaseValid()) {
        assert(DeltaValid());
        base_iterator_->SeekToFirst();
      } else if (!DeltaValid()) {
        delta_iterator_->SeekToFirst();
      } else if (current_at_base_) {
        // Change delta from larger than base to smaller
        AdvanceDelta();
      } else {
        // Change base from larger than delta to smaller
        AdvanceBase();
      }
      // Re-derive equal_keys_ after the direction switch.
      if (DeltaValid() && BaseValid()) {
        if (comparator_->Equal(delta_iterator_->Entry().key,
                               base_iterator_->key())) {
          equal_keys_ = true;
        }
      }
    }
    Advance();
  }

  void Prev() override {
    if (!Valid()) {
      status_ = STATUS(NotSupported, "Prev() on invalid iterator");
    }

    if (forward_) {
      // Need to change direction
      // if our direction was forward and we're not equal, we have two states:
      // * both iterators are valid: we're already in a good state (current
      // shows to smaller)
      // * only one iterator is valid: we need to advance that iterator
      forward_ = false;
      equal_keys_ = false;
      if (!BaseValid()) {
        assert(DeltaValid());
        base_iterator_->SeekToLast();
      } else if (!DeltaValid()) {
        delta_iterator_->SeekToLast();
      } else if (current_at_base_) {
        // Change delta from less advanced than base to more advanced
        AdvanceDelta();
      } else {
        // Change base from less advanced than delta to more advanced
        AdvanceBase();
      }
      // Re-derive equal_keys_ after the direction switch.
      if (DeltaValid() && BaseValid()) {
        if (comparator_->Equal(delta_iterator_->Entry().key,
                               base_iterator_->key())) {
          equal_keys_ = true;
        }
      }
    }

    Advance();
  }

  Slice key() const override {
    return current_at_base_ ? base_iterator_->key()
                            : delta_iterator_->Entry().key;
  }

  Slice value() const override {
    return current_at_base_ ? base_iterator_->value()
                            : delta_iterator_->Entry().value;
  }

  // First error wins: own sticky status, then base, then delta.
  Status status() const override {
    if (!status_.ok()) {
      return status_;
    }
    if (!base_iterator_->status().ok()) {
      return base_iterator_->status();
    }
    return delta_iterator_->status();
  }

 private:
  // Debug-only consistency check between the two children and cached flags.
  // NOTE(review): the only call site (end of UpdateCurrent()) is after a
  // `while (true)` whose every exit is a `return`, so this never executes
  // (coverage shows zero hits).
  // NOTE(review): the `assert(current_at_base_ && ...)` forms below do not
  // encode the implications stated in the comments above them (an
  // implication "A -> B" is `assert(!A || B)`), and as written they
  // contradict the preceding asserts when current_at_base_ is true — confirm
  // intent before wiring this check back in.
  void AssertInvariants() {
#ifndef NDEBUG
    if (!Valid()) {
      return;
    }
    if (!BaseValid()) {
      assert(!current_at_base_ && delta_iterator_->Valid());
      return;
    }
    if (!DeltaValid()) {
      assert(current_at_base_ && base_iterator_->Valid());
      return;
    }
    // we don't support those yet
    assert(delta_iterator_->Entry().type != kMergeRecord &&
           delta_iterator_->Entry().type != kLogDataRecord);
    int compare = comparator_->Compare(delta_iterator_->Entry().key,
                                       base_iterator_->key());
    if (forward_) {
      // current_at_base -> compare < 0
      assert(!current_at_base_ || compare < 0);
      // !current_at_base -> compare <= 0
      assert(current_at_base_ && compare >= 0);
    } else {
      // current_at_base -> compare > 0
      assert(!current_at_base_ || compare > 0);
      // !current_at_base -> compare <= 0
      assert(current_at_base_ && compare <= 0);
    }
    // equal_keys_ <=> compare == 0
    assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
#endif
  }

  // Step whichever child(ren) own the current entry, then re-resolve the
  // winner via UpdateCurrent().
  void Advance() {
    if (equal_keys_) {
      // Both children sit on the same key; move both past it.
      assert(BaseValid() && DeltaValid());
      AdvanceBase();
      AdvanceDelta();
    } else {
      if (current_at_base_) {
        assert(BaseValid());
        AdvanceBase();
      } else {
        assert(DeltaValid());
        AdvanceDelta();
      }
    }
    UpdateCurrent();
  }

  // Move the delta child one step in the current direction.
  void AdvanceDelta() {
    if (forward_) {
      delta_iterator_->Next();
    } else {
      delta_iterator_->Prev();
    }
  }
  // Move the base child one step in the current direction.
  void AdvanceBase() {
    if (forward_) {
      base_iterator_->Next();
    } else {
      base_iterator_->Prev();
    }
  }
  bool BaseValid() const { return base_iterator_->Valid(); }
  bool DeltaValid() const { return delta_iterator_->Valid(); }

  // Recompute current_at_base_/equal_keys_ so the iterator exposes the next
  // visible entry in the current direction, skipping delete markers recorded
  // in the delta (and the base entries they hide).
  void UpdateCurrent() {
    while (true) {
      WriteEntry delta_entry;
      if (DeltaValid()) {
        delta_entry = delta_iterator_->Entry();
      }
      equal_keys_ = false;
      if (!BaseValid()) {
        // Base has finished.
        if (!DeltaValid()) {
          // Finished
          return;
        }
        if (delta_entry.type == kDeleteRecord ||
            delta_entry.type == kSingleDeleteRecord) {
          AdvanceDelta();
        } else {
          current_at_base_ = false;
          return;
        }
      } else if (!DeltaValid()) {
        // Delta has finished.
        current_at_base_ = true;
        return;
      } else {
        // Normalize the comparison so "compare <= 0" always means the delta
        // entry is at or before the base entry in iteration order.
        int compare =
            (forward_ ? 1 : -1) *
            comparator_->Compare(delta_entry.key, base_iterator_->key());
        if (compare <= 0) {  // delta bigger or equal
          if (compare == 0) {
            equal_keys_ = true;
          }
          if (delta_entry.type != kDeleteRecord &&
              delta_entry.type != kSingleDeleteRecord) {
            current_at_base_ = false;
            return;
          }
          // Delta is less advanced and is delete.
          AdvanceDelta();
          if (equal_keys_) {
            // The delete hides the base entry with the same key; skip it too.
            AdvanceBase();
          }
        } else {
          current_at_base_ = true;
          return;
        }
      }
    }

    // NOTE(review): unreachable — the loop above only exits via return.
    AssertInvariants();
  }

  bool forward_;          // current iteration direction
  bool current_at_base_;  // true => base_iterator_ supplies key()/value()
  bool equal_keys_;       // true => both children are on the same key
  Status status_;         // sticky error, e.g. Next() on invalid iterator
  std::unique_ptr<Iterator> base_iterator_;
  std::unique_ptr<WBWIIterator> delta_iterator_;
  const Comparator* comparator_;  // not owned
};
303
304
// Skip list of arena-allocated WriteBatchIndexEntry pointers, ordered by
// WriteBatchEntryComparator (passed by reference, so it may carry state).
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
    WriteBatchEntrySkipList;
306
307
class WBWIIteratorImpl : public WBWIIterator {
308
 public:
309
  WBWIIteratorImpl(uint32_t column_family_id,
310
                   WriteBatchEntrySkipList* skip_list,
311
                   const ReadableWriteBatch* write_batch)
312
      : column_family_id_(column_family_id),
313
        skip_list_iter_(skip_list),
314
501k
        write_batch_(write_batch) {}
315
316
501k
  virtual ~WBWIIteratorImpl() {}
317
318
3.60M
  bool Valid() const override {
319
3.60M
    if (!skip_list_iter_.Valid()) {
320
39.9k
      return false;
321
39.9k
    }
322
3.56M
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
323
3.56M
    return (iter_entry != nullptr &&
324
3.56M
            iter_entry->column_family == column_family_id_);
325
3.60M
  }
326
327
1.66k
  void SeekToFirst() override {
328
1.66k
    WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
329
1.66k
                                      column_family_id_);
330
1.66k
    skip_list_iter_.Seek(&search_entry);
331
1.66k
  }
332
333
1.84k
  void SeekToLast() override {
334
1.84k
    WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
335
1.84k
                                      column_family_id_ + 1);
336
1.84k
    skip_list_iter_.Seek(&search_entry);
337
1.84k
    if (!skip_list_iter_.Valid()) {
338
1.41k
      skip_list_iter_.SeekToLast();
339
1.41k
    } else {
340
431
      skip_list_iter_.Prev();
341
431
    }
342
1.84k
  }
343
344
753k
  void Seek(const Slice& key) override {
345
753k
    WriteBatchIndexEntry search_entry(&key, column_family_id_);
346
753k
    skip_list_iter_.Seek(&search_entry);
347
753k
  }
348
349
500k
  void Next() override { skip_list_iter_.Next(); }
350
351
303k
  void Prev() override { skip_list_iter_.Prev(); }
352
353
1.67M
  WriteEntry Entry() const override {
354
1.67M
    WriteEntry ret;
355
1.67M
    Slice blob;
356
1.67M
    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
357
    // this is guaranteed with Valid()
358
1.67M
    assert(iter_entry != nullptr &&
359
1.67M
           iter_entry->column_family == column_family_id_);
360
0
    auto s = write_batch_->GetEntryFromDataOffset(iter_entry->offset, &ret.type,
361
1.67M
                                                  &ret.key, &ret.value, &blob);
362
1.67M
    assert(s.ok());
363
0
    assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
364
1.67M
           ret.type == kSingleDeleteRecord || ret.type == kMergeRecord);
365
0
    return ret;
366
1.67M
  }
367
368
8.61k
  Status status() const override {
369
    // this is in-memory data structure, so the only way status can be non-ok is
370
    // through memory corruption
371
8.61k
    return Status::OK();
372
8.61k
  }
373
374
499k
  const WriteBatchIndexEntry* GetRawEntry() const {
375
499k
    return skip_list_iter_.key();
376
499k
  }
377
378
 private:
379
  uint32_t column_family_id_;
380
  WriteBatchEntrySkipList::Iterator skip_list_iter_;
381
  const ReadableWriteBatch* write_batch_;
382
};
383
384
// Shared state behind WriteBatchWithIndex: the underlying write batch plus a
// skip-list index (with arena-allocated nodes) over its records.
// Note: member order matters — `comparator` is initialized from
// `write_batch`, and `skip_list` from `comparator` and `arena`.
struct WriteBatchWithIndex::Rep {
  Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
      bool _overwrite_key = false)
      : write_batch(reserved_bytes),
        comparator(index_comparator, &write_batch),
        skip_list(comparator, &arena),
        overwrite_key(_overwrite_key),
        last_entry_offset(0) {}
  ReadableWriteBatch write_batch;        // actual record storage
  WriteBatchEntryComparator comparator;  // orders index entries
  Arena arena;                           // backing memory for index nodes
  WriteBatchEntrySkipList skip_list;     // index over write_batch records
  bool overwrite_key;        // if true, a rewrite of a key updates its entry
  size_t last_entry_offset;  // offset of the most recently appended record

  // Remember current offset of internal write batch, which is used as
  // the starting offset of the next record.
  void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }

  // In overwrite mode, find the existing entry for the same key and update it
  // to point to the current entry.
  // Return true if the key is found and updated.
  bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
  bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);

  // Add the recent entry to the update.
  // In overwrite mode, if key already exists in the index, update it.
  void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
  void AddOrUpdateIndex(const Slice& key);

  // Allocate an index entry pointing to the last entry in the write batch and
  // put it to skip list.
  void AddNewEntry(uint32_t column_family_id);

  // Clear all updates buffered in this batch.
  void Clear();
  void ClearIndex();

  // Rebuild index by reading all records from the batch.
  // Returns non-ok status on corruption.
  Status ReBuildIndex();
};
426
427
bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
428
7.27M
    ColumnFamilyHandle* column_family, const Slice& key) {
429
7.27M
  uint32_t cf_id = GetColumnFamilyID(column_family);
430
7.27M
  return UpdateExistingEntryWithCfId(cf_id, key);
431
7.27M
}
432
433
bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
434
7.77M
    uint32_t column_family_id, const Slice& key) {
435
7.77M
  if (!overwrite_key) {
436
7.28M
    return false;
437
7.28M
  }
438
439
493k
  WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
440
493k
  iter.Seek(key);
441
493k
  if (!iter.Valid()) {
442
778
    return false;
443
778
  }
444
492k
  if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
445
59
    return false;
446
59
  }
447
492k
  WriteBatchIndexEntry* non_const_entry =
448
492k
      const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
449
492k
  non_const_entry->offset = last_entry_offset;
450
492k
  return true;
451
492k
}
452
453
void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
454
7.27M
    ColumnFamilyHandle* column_family, const Slice& key) {
455
7.27M
  if (!UpdateExistingEntry(column_family, key)) {
456
7.27M
    uint32_t cf_id = GetColumnFamilyID(column_family);
457
7.27M
    const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
458
7.27M
    if (
cf_cmp != nullptr7.27M
) {
459
7.27M
      comparator.SetComparatorForCF(cf_id, cf_cmp);
460
7.27M
    }
461
7.27M
    AddNewEntry(cf_id);
462
7.27M
  }
463
7.27M
}
464
465
500k
void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
466
500k
  if (!UpdateExistingEntryWithCfId(0, key)) {
467
176
    AddNewEntry(0);
468
176
  }
469
500k
}
470
471
7.28M
// Append a fresh index entry pointing at the last record written to the
// batch.  Entries are placement-new constructed in the arena and reclaimed
// in bulk when the arena is reset by ClearIndex().
void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
  void* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
  skip_list.Insert(new (mem)
                       WriteBatchIndexEntry(last_entry_offset, column_family_id));
}
477
478
3.22k
  void WriteBatchWithIndex::Rep::Clear() {
479
3.22k
    write_batch.Clear();
480
3.22k
    ClearIndex();
481
3.22k
  }
482
483
3.24k
  // Reset the index to empty.  Neither the skip list nor the arena supports
  // in-place clearing, so both are destroyed and reconstructed in the same
  // storage.  Order is critical: the skip list's nodes live in the arena, so
  // it is torn down first, and the new skip list is built against the new
  // arena.
  void WriteBatchWithIndex::Rep::ClearIndex() {
    skip_list.~WriteBatchEntrySkipList();
    arena.~Arena();
    new (&arena) Arena();
    new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
    last_entry_offset = 0;
  }
490
491
21
  // Rebuild the skip-list index from scratch by replaying every record still
  // present in the underlying write batch (used after a savepoint rollback
  // truncates the batch).  Returns non-ok status on corruption.
  Status WriteBatchWithIndex::Rep::ReBuildIndex() {
    Status s;

    ClearIndex();

    if (write_batch.Count() == 0) {
      // Nothing to re-index
      return s;
    }

    size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);

    Slice input(write_batch.Data());
    input.remove_prefix(offset);

    // Loop through all entries in Rep and add each one to the index
    size_t found = 0;
    while (s.ok() && !input.empty()) {
      Slice key, value, blob;
      uint32_t column_family_id = 0;  // default
      char tag = 0;

      // set offset of current entry for call to AddNewEntry()
      // (must be computed before ReadRecordFromWriteBatch consumes `input`)
      last_entry_offset = input.cdata() - write_batch.Data().data();

      s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
                                   &value, &blob);
      if (!s.ok()) {
        break;
      }

      switch (tag) {
        // Every keyed record type is indexed the same way.
        case kTypeColumnFamilyValue:
        case kTypeValue:
        case kTypeColumnFamilyDeletion:
        case kTypeDeletion:
        case kTypeColumnFamilySingleDeletion:
        case kTypeSingleDeletion:
        case kTypeColumnFamilyMerge:
        case kTypeMerge:
          found++;
          if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
            AddNewEntry(column_family_id);
          }
          break;
        case kTypeLogData:
          // Log data carries no key and is never indexed.
          break;
        default:
          return STATUS(Corruption, "unknown WriteBatch tag");
      }
    }

    // Cross-check the number of keyed records against the batch's own count.
    if (s.ok() && found != write_batch.Count()) {
      s = STATUS(Corruption, "WriteBatch has wrong count");
    }

    return s;
  }
549
550
// Keys are ordered by `default_index_comparator` unless a column family
// registers its own comparator via AddOrUpdateIndex.  `overwrite_key`
// selects whether a rewrite of a key replaces its index entry.
WriteBatchWithIndex::WriteBatchWithIndex(
    const Comparator* default_index_comparator, size_t reserved_bytes,
    bool overwrite_key)
    : rep(new Rep(default_index_comparator, reserved_bytes, overwrite_key)) {}
554
555
733k
// `rep` is an owning raw pointer to the Rep defined above.
WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; }
556
557
735k
// Returns the underlying batch; this object retains ownership.
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
558
559
24
// Iterator over the batch's entries for the default column family (id 0).
// The caller owns the returned iterator.
WBWIIterator* WriteBatchWithIndex::NewIterator() {
  return new WBWIIteratorImpl(0, &rep->skip_list, &rep->write_batch);
}
562
563
// Iterator over the batch's entries for the given column family.  The caller
// owns the returned iterator.
WBWIIterator* WriteBatchWithIndex::NewIterator(
    ColumnFamilyHandle* column_family) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  return new WBWIIteratorImpl(cf_id, &rep->skip_list, &rep->write_batch);
}
568
569
// Merged view of `base_iterator` with this batch's entries for
// `column_family`.  Takes ownership of `base_iterator`; caller owns the
// result.  Only supported in overwrite mode.
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
    ColumnFamilyHandle* column_family, Iterator* base_iterator) {
  if (!rep->overwrite_key) {
    // The merged view requires at most one batch entry per key.
    assert(false);
    return nullptr;
  }
  return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
                               GetColumnFamilyUserComparator(column_family));
}
578
579
9
// Merged view of `base_iterator` with this batch's default-column-family
// entries.  Takes ownership of `base_iterator`; only valid in overwrite mode.
Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
  if (!rep->overwrite_key) {
    // The merged view requires at most one batch entry per key.
    assert(false);
    return nullptr;
  }
  // default column family's comparator
  return new BaseDeltaIterator(base_iterator, NewIterator(),
                               rep->comparator.default_comparator());
}
588
589
// Buffer a Put for `column_family` and index it.  The offset must be
// captured via SetLastEntryOffset() before the record is appended so the
// index entry can point at it.
void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
                              const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  rep->write_batch.Put(column_family, key, value);
  rep->AddOrUpdateIndex(column_family, key);
}
595
596
250k
// Buffer a Put for the default column family and index it (offset capture
// must precede the append; see the column-family overload).
void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  rep->write_batch.Put(key, value);
  rep->AddOrUpdateIndex(key);
}
601
602
// Buffer a Delete for `column_family` and index it so reads through the
// batch observe the tombstone.
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
                                 const Slice& key) {
  rep->SetLastEntryOffset();
  rep->write_batch.Delete(column_family, key);
  rep->AddOrUpdateIndex(column_family, key);
}
608
609
250k
// Buffer a Delete for the default column family and index it.
void WriteBatchWithIndex::Delete(const Slice& key) {
  rep->SetLastEntryOffset();
  rep->write_batch.Delete(key);
  rep->AddOrUpdateIndex(key);
}
614
615
// Buffer a SingleDelete for `column_family` and index it.
void WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
                                       const Slice& key) {
  rep->SetLastEntryOffset();
  rep->write_batch.SingleDelete(column_family, key);
  rep->AddOrUpdateIndex(column_family, key);
}
621
622
15
// Buffer a SingleDelete for the default column family and index it.
void WriteBatchWithIndex::SingleDelete(const Slice& key) {
  rep->SetLastEntryOffset();
  rep->write_batch.SingleDelete(key);
  rep->AddOrUpdateIndex(key);
}
627
628
// Buffer a Merge for `column_family` and index it.
void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
                                const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  rep->write_batch.Merge(column_family, key, value);
  rep->AddOrUpdateIndex(column_family, key);
}
634
635
19
// Buffer a Merge for the default column family and index it.
void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  rep->write_batch.Merge(key, value);
  rep->AddOrUpdateIndex(key);
}
640
641
0
// Attach a log-data blob to the batch.  Log data has no key, so it is not
// added to the index (see the kTypeLogData case in ReBuildIndex).
void WriteBatchWithIndex::PutLogData(const Slice& blob) {
  rep->write_batch.PutLogData(blob);
}
644
645
3.22k
// Discard all buffered updates and the index over them.
void WriteBatchWithIndex::Clear() { rep->Clear(); }
646
647
// Look up `key` using only this batch's contents (the DB is never
// consulted).  Translates the internal lookup result into a Status:
// NotFound for missing/deleted keys, MergeInProgress when the batch holds
// merge operands that cannot be resolved without a base value.
Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
                                         const DBOptions& options,
                                         const Slice& key, std::string* value) {
  Status s;
  MergeContext merge_context;

  using Result = WriteBatchWithIndexInternal::Result;
  const Result result = WriteBatchWithIndexInternal::GetFromBatch(
      options, this, column_family, key, &merge_context, &rep->comparator,
      value, rep->overwrite_key, &s);

  if (result == Result::kFound || result == Result::kError) {
    // `s` already carries the outcome; use it as-is.
  } else if (result == Result::kDeleted || result == Result::kNotFound) {
    s = STATUS(NotFound, "");
  } else if (result == Result::kMergeInProgress) {
    s = STATUS(MergeInProgress, "");
  } else {
    assert(false);
  }

  return s;
}
676
677
// Convenience overload: batch-then-DB lookup against the default column
// family.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              std::string* value) {
  ColumnFamilyHandle* default_cf = db->DefaultColumnFamily();
  return GetFromBatchAndDB(db, read_options, default_cf, key, value);
}
684
685
// Look up `key` first in this batch, then — if the batch alone cannot decide
// — in the DB, combining the batch's pending merge operands with the DB
// value where necessary.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              std::string* value) {
  Status s;
  MergeContext merge_context;
  const DBOptions& options = db->GetDBOptions();

  // Batch-only lookup first; kFound/kDeleted are fully decided by the batch.
  std::string batch_value;
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          options, this, column_family, key, &merge_context, &rep->comparator,
          &batch_value, rep->overwrite_key, &s);

  if (result == WriteBatchWithIndexInternal::Result::kFound) {
    value->assign(batch_value.data(), batch_value.size());
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
    return STATUS(NotFound, "");
  }
  if (result == WriteBatchWithIndexInternal::Result::kError) {
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
      rep->overwrite_key == true) {
    // Since we've overwritten keys, we do not know what other operations are
    // in this batch for this key, so we cannot do a Merge to compute the
    // result.  Instead, we will simply return MergeInProgress.
    return STATUS(MergeInProgress, "");
  }

  assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
         result == WriteBatchWithIndexInternal::Result::kNotFound);

  // Did not find key in batch OR could not resolve Merges.  Try DB.
  s = db->Get(read_options, column_family, key, value);

  if (s.ok() || s.IsNotFound()) {  // DB Get Succeeded
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
      // Merge result from DB with merges in Batch
      auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
      const MergeOperator* merge_operator =
          cfh->cfd()->ioptions()->merge_operator;
      Statistics* statistics = options.statistics.get();
      Env* env = options.env;
      Logger* logger = options.info_log.get();

      // Base value for the merge: the DB value when present, otherwise
      // nothing (key absent from the DB).
      Slice db_slice(*value);
      Slice* merge_data;
      if (s.ok()) {
        merge_data = &db_slice;
      } else {  // Key not present in db (s.IsNotFound())
        merge_data = nullptr;
      }

      // Fold the batch's pending merge operands on top of the base value;
      // the merged result replaces `value` and `s` reports the outcome.
      s = MergeHelper::TimedFullMerge(
          key, merge_data, merge_context.GetOperands(), merge_operator,
          statistics, env, logger, value);
    }
  }

  return s;
}
750
751
26
// Savepoints are tracked by the underlying batch; the index is only rebuilt
// on rollback (see RollbackToSavePoint()).
void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }
752
753
29
// Roll the underlying batch back to the last savepoint, then rebuild the
// index, which may still reference records that were just truncated away.
Status WriteBatchWithIndex::RollbackToSavePoint() {
  Status s = rep->write_batch.RollbackToSavePoint();
  if (s.ok()) {
    s = rep->ReBuildIndex();
  }
  return s;
}
762
763
}  // namespace rocksdb
764
#endif  // !ROCKSDB_LITE