YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/memtable.cc
Line | Count | Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/memtable.h"
25
26
#include <algorithm>
27
#include <limits>
28
29
#include "yb/rocksdb/db/dbformat.h"
30
#include "yb/rocksdb/db/merge_context.h"
31
#include "yb/rocksdb/env.h"
32
#include "yb/rocksdb/iterator.h"
33
#include "yb/rocksdb/merge_operator.h"
34
#include "yb/rocksdb/slice_transform.h"
35
#include "yb/rocksdb/table/internal_iterator.h"
36
#include "yb/rocksdb/util/arena.h"
37
#include "yb/rocksdb/util/coding.h"
38
#include "yb/rocksdb/util/murmurhash.h"
39
#include "yb/rocksdb/util/mutexlock.h"
40
#include "yb/rocksdb/util/perf_context_imp.h"
41
#include "yb/rocksdb/util/statistics.h"
42
#include "yb/rocksdb/util/stop_watch.h"
43
44
#include "yb/util/mem_tracker.h"
45
#include "yb/util/stats/perf_step_timer.h"
46
47
using std::ostringstream;
48
49
namespace rocksdb {
50
51
MemTableOptions::MemTableOptions(
52
    const ImmutableCFOptions& ioptions,
53
    const MutableCFOptions& mutable_cf_options)
54
  : write_buffer_size(mutable_cf_options.write_buffer_size),
55
    arena_block_size(mutable_cf_options.arena_block_size),
56
    memtable_prefix_bloom_bits(mutable_cf_options.memtable_prefix_bloom_bits),
57
    memtable_prefix_bloom_probes(
58
        mutable_cf_options.memtable_prefix_bloom_probes),
59
    memtable_prefix_bloom_huge_page_tlb_size(
60
        mutable_cf_options.memtable_prefix_bloom_huge_page_tlb_size),
61
    inplace_update_support(ioptions.inplace_update_support),
62
    inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
63
    inplace_callback(ioptions.inplace_callback),
64
    max_successive_merges(mutable_cf_options.max_successive_merges),
65
    filter_deletes(mutable_cf_options.filter_deletes),
66
    statistics(ioptions.statistics),
67
    merge_operator(ioptions.merge_operator),
68
379k
    info_log(ioptions.info_log) {
69
379k
  if (ioptions.mem_tracker) {
70
331k
    mem_tracker = yb::MemTracker::FindOrCreateTracker("MemTable", ioptions.mem_tracker);
71
331k
  }
72
379k
}
73
74
MemTable::MemTable(const InternalKeyComparator& cmp,
75
                   const ImmutableCFOptions& ioptions,
76
                   const MutableCFOptions& mutable_cf_options,
77
                   WriteBuffer* write_buffer, SequenceNumber earliest_seq)
78
    : comparator_(cmp),
79
      moptions_(ioptions, mutable_cf_options),
80
      refs_(0),
81
      kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
82
      arena_(moptions_.arena_block_size, 0),
83
      allocator_(&arena_, write_buffer),
84
      table_(ioptions.memtable_factory->CreateMemTableRep(
85
          comparator_, &allocator_, ioptions.prefix_extractor,
86
          ioptions.info_log)),
87
      data_size_(0),
88
      num_entries_(0),
89
      num_deletes_(0),
90
      flush_in_progress_(false),
91
      flush_completed_(false),
92
      file_number_(0),
93
      first_seqno_(0),
94
      earliest_seqno_(earliest_seq),
95
      mem_next_logfile_number_(0),
96
      locks_(moptions_.inplace_update_support
97
                 ? moptions_.inplace_update_num_locks
98
                 : 0),
99
      prefix_extractor_(ioptions.prefix_extractor),
100
      flush_state_(FlushState::kNotRequested),
101
380k
      env_(ioptions.env) {
102
380k
  UpdateFlushState();
103
  // something went wrong if we need to flush before inserting anything
104
380k
  assert(!ShouldScheduleFlush());
105
106
380k
  if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
107
48
    prefix_bloom_.reset(new DynamicBloom(
108
48
        &allocator_,
109
48
        moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality,
110
48
        moptions_.memtable_prefix_bloom_probes, nullptr,
111
48
        moptions_.memtable_prefix_bloom_huge_page_tlb_size,
112
48
        ioptions.info_log));
113
48
  }
114
115
380k
  if (moptions_.mem_tracker) {
116
331k
    arena_.SetMemTracker(moptions_.mem_tracker);
117
331k
  }
118
380k
}
119
120
361k
MemTable::~MemTable() { DCHECK_EQ(refs_, 0); }
121
122
128k
size_t MemTable::ApproximateMemoryUsage() {
123
128k
  size_t arena_usage = arena_.ApproximateMemoryUsage();
124
128k
  size_t table_usage = table_->ApproximateMemoryUsage();
125
  // let MAX_USAGE =  std::numeric_limits<size_t>::max()
126
  // then if arena_usage + table_usage >= MAX_USAGE, return MAX_USAGE.
127
  // the following variation is to avoid numeric overflow.
128
128k
  if (arena_usage >= std::numeric_limits<size_t>::max() - table_usage) {
129
0
    return std::numeric_limits<size_t>::max();
130
0
  }
131
  // otherwise, return the actual usage
132
128k
  return arena_usage + table_usage;
133
128k
}
134
135
94.7M
bool MemTable::ShouldFlushNow() const {
136
  // Often we cannot allocate arena blocks that exactly match the
137
  // buffer size. Thus we have to decide if we should over-allocate or
138
  // under-allocate.
139
  // This constant variable can be interpreted as: if we still have more than
140
  // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over
141
  // allocate one more block.
142
94.7M
  const double kAllowOverAllocationRatio = 0.6;
143
144
  // If the arena still has room for a new block allocation, we can safely say it
145
  // shouldn't flush.
146
94.7M
  auto allocated_memory =
147
94.7M
      table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes();
148
149
  // if we can still allocate one more block without exceeding the
150
  // over-allocation ratio, then we should not flush.
151
94.7M
  if (allocated_memory + kArenaBlockSize <
152
94.7M
      moptions_.write_buffer_size +
153
93.9M
      kArenaBlockSize * kAllowOverAllocationRatio) {
154
93.9M
    return false;
155
93.9M
  }
156
157
  // if user keeps adding entries that exceeds moptions.write_buffer_size,
158
  // we need to flush earlier even though we still have much available
159
  // memory left.
160
829k
  if (allocated_memory > moptions_.write_buffer_size +
161
3.44k
      kArenaBlockSize * kAllowOverAllocationRatio) {
162
3.44k
    return true;
163
3.44k
  }
164
165
  // In this code path, Arena has already allocated its "last block", which
166
  // means the total allocated memory size is either:
167
  //  (1) "moderately" over allocated the memory (no more than `0.6 * arena
168
  // block size`). Or,
169
  //  (2) the allocated memory is less than write buffer size, but we'll stop
170
  // here since if we allocate a new arena block, we'll over allocate too much
171
  // more (half of the arena block size) memory.
172
  //
173
  // In either case, to avoid over-allocate, the last block will stop allocation
174
  // when its usage reaches a certain ratio, which we carefully choose "0.75
175
  // full" as the stop condition because it addresses the following issue with
176
  // great simplicity: What if the next inserted entry's size is
177
  // bigger than AllocatedAndUnused()?
178
  //
179
  // The answer is: if the entry size is also bigger than 0.25 *
180
  // kArenaBlockSize, a dedicated block will be allocated for it; otherwise
181
  // arena will anyway skip the AllocatedAndUnused() and allocate a new, empty
182
  // and regular block. In either case, we *overly* over-allocated.
183
  //
184
  // Therefore, setting the last block to be at most "0.75 full" avoids both
185
  // cases.
186
  //
187
  // NOTE: the average percentage of wasted space with this approach can be estimated
188
  // as: "arena block size * 0.25 / write buffer size". User who specify a small
189
  // write buffer size and/or big arena block size may suffer.
190
825k
  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
191
825k
}
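To make the three thresholds above concrete (the numbers here are hypothetical, not defaults): with write_buffer_size = 64 MB and kArenaBlockSize = 8 MB, no flush is needed while allocated_memory + 8 MB < 64 MB + 0.6 * 8 MB, i.e. while allocated_memory < 60.8 MB; a flush is requested as soon as allocated_memory exceeds 64 MB + 4.8 MB = 68.8 MB; and in the band between those two values the memtable flushes only once the last arena block has less than 8 MB / 4 = 2 MB of unused space.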
192
193
97.8M
void MemTable::UpdateFlushState() {
194
97.8M
  auto state = flush_state_.load(std::memory_order_relaxed);
195
97.8M
  if (state == FlushState::kNotRequested && ShouldFlushNow()) {
196
    // ignore CAS failure, because that means somebody else requested
197
    // a flush
198
11.8k
    flush_state_.compare_exchange_strong(state, FlushState::kRequested,
199
11.8k
                                         std::memory_order_relaxed,
200
11.8k
                                         std::memory_order_relaxed);
201
11.8k
  }
202
97.8M
}
203
204
int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
205
8.46G
                                        const char* prefix_len_key2) const {
206
  // Internal keys are encoded as length-prefixed strings.
207
8.46G
  Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
208
8.46G
  Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
209
8.46G
  return comparator.Compare(k1, k2);
210
8.46G
}
211
212
int MemTable::KeyComparator::operator()(const char* prefix_len_key,
213
                                        const Slice& key)
214
757k
    const {
215
  // Internal keys are encoded as length-prefixed strings.
216
757k
  Slice a = GetLengthPrefixedSlice(prefix_len_key);
217
757k
  return comparator.Compare(a, key);
218
757k
}
219
220
484k
Slice MemTableRep::UserKey(const char* key) const {
221
484k
  Slice slice = GetLengthPrefixedSlice(key);
222
484k
  return Slice(slice.data(), slice.size() - 8);
223
484k
}
224
225
285k
KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
226
285k
  *buf = allocator_->Allocate(len);
227
285k
  return static_cast<KeyHandle>(*buf);
228
285k
}
229
230
// Encode a suitable internal key target for "target" and return it.
231
// Uses *scratch as scratch space, and the returned pointer will point
232
// into this scratch space.
233
67.2M
const char* EncodeKey(std::string* scratch, const Slice& target) {
234
67.2M
  scratch->clear();
235
67.2M
  PutVarint32(scratch, static_cast<uint32_t>(target.size()));
236
67.2M
  scratch->append(target.cdata(), target.size());
237
67.2M
  return scratch->data();
238
67.2M
}
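As a small illustration (hypothetical target), calling EncodeKey with target = "abc" leaves scratch holding the byte 0x03 followed by 'a' 'b' 'c': a varint32 length prefix and then the key bytes, which is the length-prefixed layout that MemTable::KeyComparator decodes with GetLengthPrefixedSlice.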
239
240
class MemTableIterator : public InternalIterator {
241
 public:
242
  MemTableIterator(
243
      const MemTable& mem, const ReadOptions& read_options, Arena* arena)
244
      : bloom_(nullptr),
245
        prefix_extractor_(mem.prefix_extractor_),
246
        valid_(false),
247
15.4M
        arena_mode_(arena != nullptr) {
248
15.4M
    if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
249
1.65k
      bloom_ = mem.prefix_bloom_.get();
250
1.65k
      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
251
15.4M
    } else {
252
15.4M
      iter_ = mem.table_->GetIterator(arena);
253
15.4M
    }
254
15.4M
  }
255
256
15.5M
  ~MemTableIterator() {
257
15.5M
    if (arena_mode_) {
258
15.5M
      iter_->~Iterator();
259
4
    } else {
260
4
      delete iter_;
261
4
    }
262
15.5M
  }
263
264
2.42G
  bool Valid() const override { return valid_; }
265
67.5M
  void Seek(const Slice& k) override {
266
67.5M
    PERF_TIMER_GUARD(seek_on_memtable_time);
267
67.5M
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
268
67.5M
    if (bloom_ != nullptr) {
269
440k
      if (!bloom_->MayContain(
270
40.0k
              prefix_extractor_->Transform(ExtractUserKey(k)))) {
271
40.0k
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
272
40.0k
        valid_ = false;
273
40.0k
        return;
274
400k
      } else {
275
400k
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
276
400k
      }
277
440k
    }
278
67.5M
    iter_->Seek(k, nullptr);
279
67.5M
    valid_ = iter_->Valid();
280
67.5M
  }
281
367k
  void SeekToFirst() override {
282
367k
    iter_->SeekToFirst();
283
367k
    valid_ = iter_->Valid();
284
367k
  }
285
718k
  void SeekToLast() override {
286
718k
    iter_->SeekToLast();
287
718k
    valid_ = iter_->Valid();
288
718k
  }
289
473M
  void Next() override {
290
473M
    assert(Valid());
291
473M
    iter_->Next();
292
473M
    valid_ = iter_->Valid();
293
473M
  }
294
1.17M
  void Prev() override {
295
1.17M
    assert(Valid());
296
1.17M
    iter_->Prev();
297
1.17M
    valid_ = iter_->Valid();
298
1.17M
  }
299
578M
  Slice key() const override {
300
578M
    assert(Valid());
301
578M
    return GetLengthPrefixedSlice(iter_->key());
302
578M
  }
303
462M
  Slice value() const override {
304
462M
    assert(Valid());
305
462M
    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
306
462M
    return GetLengthPrefixedSlice(key_slice.cdata() + key_slice.size());
307
462M
  }
308
309
361k
  Status status() const override { return Status::OK(); }
310
311
8
  Status PinData() override {
312
    // memtable data is always pinned
313
8
    return Status::OK();
314
8
  }
315
316
0
  Status ReleasePinnedData() override {
317
    // memtable data is always pinned
318
0
    return Status::OK();
319
0
  }
320
321
437M
  bool IsKeyPinned() const override {
322
    // memtable data is always pinned
323
437M
    return true;
324
437M
  }
325
326
 private:
327
  DynamicBloom* bloom_;
328
  const SliceTransform* const prefix_extractor_;
329
  MemTableRep::Iterator* iter_;
330
  bool valid_;
331
  bool arena_mode_;
332
333
  // No copying allowed
334
  MemTableIterator(const MemTableIterator&);
335
  void operator=(const MemTableIterator&);
336
};
337
338
InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
339
15.4M
                                        Arena* arena) {
340
15.4M
  assert(arena != nullptr);
341
15.4M
  auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
342
15.4M
  return new (mem) MemTableIterator(*this, read_options, arena);
343
15.4M
}
344
345
4.53k
port::RWMutex* MemTable::GetLock(const Slice& key) {
346
4.53k
  static murmur_hash hash;
347
4.53k
  return &locks_[hash(key) % locks_.size()];
348
4.53k
}
349
350
0
std::string MemTable::ToString() const {
351
0
  ostringstream ss;
352
0
  auto* frontiers = Frontiers();
353
0
  ss << "MemTable {"
354
0
     << " num_entries: " << num_entries()
355
0
     << " num_deletes: " << num_deletes()
356
0
     << " IsEmpty: " << IsEmpty()
357
0
     << " flush_state: " << flush_state_
358
0
     << " first_seqno: " << GetFirstSequenceNumber()
359
0
     << " eariest_seqno: " << GetEarliestSequenceNumber()
360
0
     << " frontiers: ";
361
0
  if (frontiers) {
362
0
    ss << frontiers->ToString();
363
0
  } else {
364
0
    ss << "N/A";
365
0
  }
366
0
  ss << " }";
367
0
  return ss.str();
368
0
}
369
370
uint64_t MemTable::ApproximateSize(const Slice& start_ikey,
371
22
                                   const Slice& end_ikey) {
372
22
  uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
373
22
  if (entry_count == 0) {
374
13
    return 0;
375
13
  }
376
9
  uint64_t n = num_entries_.load(std::memory_order_relaxed);
377
9
  if (n == 0) {
378
0
    return 0;
379
0
  }
380
9
  if (entry_count > n) {
381
    // table_->ApproximateNumEntries() is just an estimate so it can be larger
382
    // than actual entries we have. Cap it to entries we have to limit the
383
    // inaccuracy.
384
0
    entry_count = n;
385
0
  }
386
9
  uint64_t data_size = data_size_.load(std::memory_order_relaxed);
387
9
  return entry_count * (data_size / n);
388
9
}
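Put differently, the estimate is the rep's entry count for the range (capped at the memtable's real entry count) multiplied by the average encoded entry size. With hypothetical totals of data_size = 1,000,000 bytes over n = 10,000 entries and ApproximateNumEntries() reporting 250 entries in the range, the function returns 250 * (1,000,000 / 10,000) = 25,000 bytes.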
389
390
void MemTable::Add(SequenceNumber seq, ValueType type, const SliceParts& key,
391
36.8M
                   const SliceParts& value, bool allow_concurrent) {
392
36.8M
  PreparedAdd prepared_add;
393
36.8M
  auto handle = PrepareAdd(seq, type, key, value, &prepared_add);
394
36.8M
  ApplyPreparedAdd(&handle, 1, prepared_add, allow_concurrent);
395
36.8M
}
396
397
KeyHandle MemTable::PrepareAdd(SequenceNumber s, ValueType type,
398
                               const SliceParts& key,
399
                               const SliceParts& value,
400
150M
                               PreparedAdd* prepared_add) {
401
  // Format of an entry is concatenation of:
402
  //  key_size     : varint32 of internal_key.size()
403
  //  key bytes    : char[internal_key.size()]
404
  //  value_size   : varint32 of value.size()
405
  //  value bytes  : char[value.size()]
406
150M
  uint32_t key_size = static_cast<uint32_t>(key.SumSizes());
407
150M
  uint32_t val_size = static_cast<uint32_t>(value.SumSizes());
408
150M
  uint32_t internal_key_size = key_size + 8;
409
150M
  const uint32_t encoded_len = VarintLength(internal_key_size) +
410
150M
                               internal_key_size + VarintLength(val_size) +
411
150M
                               val_size;
412
150M
  char* buf = nullptr;
413
150M
  KeyHandle handle = table_->Allocate(encoded_len, &buf);
414
415
150M
  char* p = EncodeVarint32(buf, internal_key_size);
416
150M
  p = key.CopyAllTo(p);
417
150M
  uint64_t packed = PackSequenceAndType(s, type);
418
150M
  EncodeFixed64(p, packed);
419
150M
  p += 8;
420
150M
  p = EncodeVarint32(p, val_size);
421
150M
  p = value.CopyAllTo(p);
422
150M
  assert((unsigned)(p - buf) == (unsigned)encoded_len);
423
424
150M
  if (prefix_bloom_) {
425
400k
    assert(prefix_extractor_);
426
400k
    prefix_bloom_->Add(prefix_extractor_->Transform(key.TheOnlyPart()));
427
400k
  }
428
429
150M
  if (!prepared_add->min_seq_no) {
430
42.0M
    prepared_add->min_seq_no = s;
431
42.0M
  }
432
150M
  prepared_add->total_encoded_len += encoded_len;
433
150M
  if (type == ValueType::kTypeDeletion) {
434
1.87M
    ++prepared_add->num_deletes;
435
1.87M
  }
436
150M
  return handle;
437
150M
}
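A worked example of the encoding above, using a hypothetical entry: for a user key of 3 bytes ("abc") and a value of 5 bytes ("hello"), internal_key_size = 3 + 8 = 11 and encoded_len = VarintLength(11) + 11 + VarintLength(5) + 5 = 1 + 11 + 1 + 5 = 18 bytes, laid out as the varint byte 0x0B, the three key bytes, the 8-byte packed (sequence, type) tag, the varint byte 0x05, and the five value bytes.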
438
439
void MemTable::ApplyPreparedAdd(
440
42.0M
    const KeyHandle* handle, size_t count, const PreparedAdd& prepared_add, bool allow_concurrent) {
441
42.0M
  if (!allow_concurrent) {
442
192M
    for (const auto* end = handle + count; handle != end; ++handle) {
443
150M
      table_->Insert(*handle);
444
150M
    }
445
446
    // this is a bit ugly, but is the way to avoid locked instructions
447
    // when incrementing an atomic
448
41.5M
    num_entries_.store(num_entries_.load(std::memory_order_relaxed) + count,
449
41.5M
                       std::memory_order_relaxed);
450
41.5M
    data_size_.store(data_size_.load(std::memory_order_relaxed) + prepared_add.total_encoded_len,
451
41.5M
                     std::memory_order_relaxed);
452
41.5M
    if (prepared_add.num_deletes) {
453
1.87M
      num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + prepared_add.num_deletes,
454
1.87M
                         std::memory_order_relaxed);
455
1.87M
    }
456
457
    // The first sequence number inserted into the memtable.
458
    // Multiple occurences of the same sequence number in the write batch are allowed
459
    // as long as they touch different keys.
460
18.4E
    DCHECK(first_seqno_ == 0 || prepared_add.min_seq_no >= first_seqno_)
461
18.4E
        << "first_seqno_: " << first_seqno_ << ", prepared_add.min_seq_no: "
462
18.4E
        << prepared_add.min_seq_no;
463
464
41.5M
    if (first_seqno_ == 0) {
465
88.5k
      first_seqno_.store(prepared_add.min_seq_no, std::memory_order_relaxed);
466
467
88.5k
      if (earliest_seqno_ == kMaxSequenceNumber) {
468
3.05k
        earliest_seqno_.store(GetFirstSequenceNumber(),
469
3.05k
                              std::memory_order_relaxed);
470
3.05k
      }
471
88.5k
      DCHECK_GE(first_seqno_.load(), earliest_seqno_.load());
472
88.5k
    }
473
522k
  } else {
474
1.04M
    for (const auto* end = handle + count; handle != end; ++handle) {
475
523k
      table_->InsertConcurrently(*handle);
476
523k
    }
477
478
522k
    num_entries_.fetch_add(count, std::memory_order_relaxed);
479
522k
    data_size_.fetch_add(prepared_add.total_encoded_len, std::memory_order_relaxed);
480
522k
    if (prepared_add.num_deletes) {
481
0
      num_deletes_.fetch_add(prepared_add.num_deletes, std::memory_order_relaxed);
482
0
    }
483
484
    // atomically update first_seqno_ and earliest_seqno_.
485
522k
    uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
486
522k
    while ((cur_seq_num == 0 || prepared_add.min_seq_no < cur_seq_num) &&
487
88
           !first_seqno_.compare_exchange_weak(cur_seq_num, prepared_add.min_seq_no)) {
488
0
    }
489
522k
    uint64_t cur_earliest_seqno =
490
522k
        earliest_seqno_.load(std::memory_order_relaxed);
491
522k
    while (
492
522k
        (cur_earliest_seqno == kMaxSequenceNumber ||
493
519k
             prepared_add.min_seq_no < cur_earliest_seqno) &&
494
0
        !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, prepared_add.min_seq_no)) {
495
0
    }
496
522k
  }
497
498
42.0M
  UpdateFlushState();
499
42.0M
}
500
501
// This comparator is used for deciding whether to erase a found key from a memtable instead of
502
// writing a deletion mark. This is exactly what we need for erasing records in memory
503
// (without writing new deletion marks). It expects a special key consisting of the user key being
504
// erased followed by 8 0xff bytes as the first argument, and a key from a memtable as the second
505
// argument (with the usual user_key + value_type + seqno format).
506
// It returns zero if the user key parts of both arguments match and the second argument's value
507
// type is not a deletion.
508
//
509
// Note: this comparator's return value cannot be used to establish order,
510
// only to test for "equality" as defined above.
511
class EraseHelperKeyComparator : public MemTableRep::KeyComparator {
512
 public:
513
  explicit EraseHelperKeyComparator(const Comparator* user_comparator, bool* had_delete)
514
56.4M
      : user_comparator_(user_comparator), had_delete_(had_delete) {}
515
516
56.0M
  int operator()(const char* prefix_len_key1, const char* prefix_len_key2) const override {
517
    // Internal keys are encoded as length-prefixed strings.
518
56.0M
    Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
519
56.0M
    Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
520
56.0M
    return Compare(k1, k2);
521
56.0M
  }
522
523
0
  int operator()(const char* prefix_len_key, const Slice& key) const override {
524
    // Internal keys are encoded as length-prefixed strings.
525
0
    Slice a = GetLengthPrefixedSlice(prefix_len_key);
526
0
    return Compare(a, key);
527
0
  }
528
529
56.1M
  int Compare(const Slice& a, const Slice& b) const {
530
56.1M
    auto user_b = ExtractUserKey(b);
531
56.1M
    auto result = user_comparator_->Compare(ExtractUserKey(a), user_b);
532
56.1M
    if (result == 0) {
533
      // This comparator is used only to check whether we should delete the entry we found.
534
      // So any non zero result should satisfy our needs.
535
      // `b` is a value stored in mem table, so we check only it.
536
      // `a` is key that we created for erase and user key is always followed by eight 0xff.
537
55.4M
      auto value_type = static_cast<ValueType>(b[user_b.size()]);
538
55.4M
      DCHECK_LE(value_type, ValueType::kTypeColumnFamilySingleDeletion);
539
55.4M
      if (value_type == ValueType::kTypeSingleDeletion ||
540
55.4M
          value_type == ValueType::kTypeColumnFamilySingleDeletion) {
541
0
        *had_delete_ = true;
542
0
        return -1;
543
0
      }
544
56.1M
    }
545
56.1M
    return result;
546
56.1M
  }
547
548
 private:
549
  const Comparator* user_comparator_;
550
  bool* had_delete_;
551
};
552
553
56.4M
bool MemTable::Erase(const Slice& user_key) {
554
56.4M
  uint32_t user_key_size = static_cast<uint32_t>(user_key.size());
555
56.4M
  uint32_t internal_key_size = user_key_size + 8;
556
56.4M
  const uint32_t encoded_len = VarintLength(internal_key_size) + internal_key_size;
557
558
56.4M
  if (erase_key_buffer_.size() < encoded_len) {
559
61.5k
    erase_key_buffer_.resize(encoded_len);
560
61.5k
  }
561
56.4M
  char* buf = erase_key_buffer_.data();
562
56.4M
  char* p = EncodeVarint32(buf, internal_key_size);
563
56.4M
  memcpy(p, user_key.data(), user_key_size);
564
56.4M
  p += user_key_size;
565
  // Fill the key tail with 0xffffffffffffffff so it will be less than the actual user key.
566
  // Please note descending order is used for key tail.
567
56.4M
  EncodeFixed64(p, -1LL);
568
56.4M
  bool had_delete = false;
569
56.4M
  EraseHelperKeyComparator only_user_key_comparator(
570
56.4M
      comparator_.comparator.user_comparator(), &had_delete);
571
56.4M
  if (table_->Erase(buf, only_user_key_comparator)) {
572
    // this is a bit ugly, but is the way to avoid locked instructions
573
    // when incrementing an atomic
574
55.4M
    num_erased_.store(num_erased_.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
575
576
55.4M
    UpdateFlushState();
577
55.4M
    return true;
578
1.00M
  } else if (had_delete) { // Do nothing in case when we already had delete.
579
0
    return true;
580
0
  }
581
582
1.00M
  return false;
583
1.00M
}
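To make the lookup key above concrete (hypothetical user key): erasing "abc" fills erase_key_buffer_ with the varint byte 0x0B, the three key bytes, and eight 0xff bytes, i.e. a length-prefixed internal key whose tail is all ones. EraseHelperKeyComparator then ignores that tail and compares only the user-key portions, reporting a match (and thus allowing the erase) only when the stored entry's value type is not a single deletion.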
584
585
// Callback from MemTable::Get()
586
namespace {
587
588
struct Saver {
589
  Status* status;
590
  const LookupKey* key;
591
  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
592
  bool* merge_in_progress;
593
  std::string* value;
594
  SequenceNumber seq;
595
  const MergeOperator* merge_operator;
596
  // the merge operations encountered;
597
  MergeContext* merge_context;
598
  MemTable* mem;
599
  Logger* logger;
600
  Statistics* statistics;
601
  bool inplace_update_support;
602
  Env* env_;
603
};
604
}  // namespace
605
606
16.0M
static bool SaveValue(void* arg, const char* entry) {
607
16.0M
  Saver* s = reinterpret_cast<Saver*>(arg);
608
16.0M
  MergeContext* merge_context = s->merge_context;
609
16.0M
  const MergeOperator* merge_operator = s->merge_operator;
610
611
16.0M
  assert(s != nullptr && merge_context != nullptr);
612
613
  // entry format is:
614
  //    klength  varint32
615
  //    userkey  char[klength-8]
616
  //    tag      uint64
617
  //    vlength  varint32
618
  //    value    char[vlength]
619
  // Check that it belongs to same user key.  We do not check the
620
  // sequence number since the Seek() call above should have skipped
621
  // all entries with overly large sequence numbers.
622
16.0M
  uint32_t key_length;
623
16.0M
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
624
16.0M
  if (s->mem->GetInternalKeyComparator().user_comparator()->Equal(
625
14.0M
          Slice(key_ptr, key_length - 8), s->key->user_key())) {
626
    // Correct user key
627
14.0M
    auto seq_and_type = UnPackSequenceAndTypeFromEnd(key_ptr + key_length);
628
14.0M
    s->seq = seq_and_type.sequence;
629
630
14.0M
    switch (seq_and_type.type) {
631
14.0M
      case kTypeValue: {
632
14.0M
        if (s->inplace_update_support) {
633
1.53k
          s->mem->GetLock(s->key->user_key())->ReadLock();
634
1.53k
        }
635
14.0M
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
636
14.0M
        *(s->status) = Status::OK();
637
14.0M
        if (*(s->merge_in_progress)) {
638
34
          assert(merge_operator);
639
34
          bool merge_success = false;
640
34
          {
641
34
            StopWatchNano timer(s->env_, s->statistics != nullptr);
642
34
            PERF_TIMER_GUARD(merge_operator_time_nanos);
643
34
            merge_success = merge_operator->FullMerge(
644
34
                s->key->user_key(), &v, merge_context->GetOperands(), s->value,
645
34
                s->logger);
646
34
            RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME,
647
34
                       timer.ElapsedNanos());
648
34
          }
649
34
          if (!merge_success) {
650
0
            RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
651
0
            *(s->status) =
652
0
                STATUS(Corruption, "Error: Could not perform merge.");
653
0
          }
654
14.0M
        } else if (s->value != nullptr) {
655
14.0M
          s->value->assign(v.cdata(), v.size());
656
14.0M
        }
657
14.0M
        if (s->inplace_update_support) {
658
1.53k
          s->mem->GetLock(s->key->user_key())->ReadUnlock();
659
1.53k
        }
660
14.0M
        *(s->found_final_value) = true;
661
14.0M
        return false;
662
0
      }
663
1.06k
      case kTypeDeletion:
664
1.12k
      case kTypeSingleDeletion: {
665
1.12k
        if (*(s->merge_in_progress)) {
666
16
          assert(merge_operator != nullptr);
667
16
          *(s->status) = Status::OK();
668
16
          bool merge_success = false;
669
16
          {
670
16
            StopWatchNano timer(s->env_, s->statistics != nullptr);
671
16
            PERF_TIMER_GUARD(merge_operator_time_nanos);
672
16
            merge_success = merge_operator->FullMerge(
673
16
                s->key->user_key(), nullptr, merge_context->GetOperands(),
674
16
                s->value, s->logger);
675
16
            RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME,
676
16
                       timer.ElapsedNanos());
677
16
          }
678
16
          if (!merge_success) {
679
0
            RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
680
0
            *(s->status) =
681
0
                STATUS(Corruption, "Error: Could not perform merge.");
682
0
          }
683
1.11k
        } else {
684
1.11k
          *(s->status) = STATUS(NotFound, "");
685
1.11k
        }
686
1.12k
        *(s->found_final_value) = true;
687
1.12k
        return false;
688
1.06k
      }
689
44.3k
      case kTypeMerge: {
690
44.3k
        if (!merge_operator) {
691
0
          *(s->status) = STATUS(InvalidArgument,
692
0
              "merge_operator is not properly initialized.");
693
          // Normally we continue the loop (return true) when we see a merge
694
          // operand.  But in case of an error, we should stop the loop
695
          // immediately and pretend we have found the value to stop further
696
          // seek.  Otherwise, the later call will override this error status.
697
0
          *(s->found_final_value) = true;
698
0
          return false;
699
0
        }
700
44.3k
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
701
44.3k
        *(s->merge_in_progress) = true;
702
44.3k
        merge_context->PushOperand(v);
703
44.3k
        return true;
704
44.3k
      }
705
0
      default:
706
0
        assert(false);
707
0
        return true;
708
2.04M
    }
709
2.04M
  }
710
711
  // s->status could be Corruption, MergeInProgress, or NotFound here.
712
2.04M
  return false;
713
2.04M
}
714
715
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
716
21.6M
                   MergeContext* merge_context, SequenceNumber* seq) {
717
  // The sequence number is updated synchronously in version_set.h
718
21.6M
  if (IsEmpty()) {
719
    // Avoid recording stats for speed.
720
5.28M
    return false;
721
5.28M
  }
722
16.3M
  PERF_TIMER_GUARD(get_from_memtable_time);
723
724
16.3M
  Slice user_key = key.user_key();
725
16.3M
  bool found_final_value = false;
726
16.3M
  bool merge_in_progress = s->IsMergeInProgress();
727
16.3M
  bool const may_contain =
728
16.3M
      nullptr == prefix_bloom_
729
16.3M
          ? false
730
47.6k
          : prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key));
731
16.3M
  if (prefix_bloom_ && !may_contain) {
732
    // iter is null if prefix bloom says the key does not exist
733
19
    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
734
19
    *seq = kMaxSequenceNumber;
735
16.3M
  } else {
736
16.3M
    if (prefix_bloom_) {
737
110
      PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
738
110
    }
739
16.3M
    Saver saver;
740
16.3M
    saver.status = s;
741
16.3M
    saver.found_final_value = &found_final_value;
742
16.3M
    saver.merge_in_progress = &merge_in_progress;
743
16.3M
    saver.key = &key;
744
16.3M
    saver.value = value;
745
16.3M
    saver.seq = kMaxSequenceNumber;
746
16.3M
    saver.mem = this;
747
16.3M
    saver.merge_context = merge_context;
748
16.3M
    saver.merge_operator = moptions_.merge_operator;
749
16.3M
    saver.logger = moptions_.info_log;
750
16.3M
    saver.inplace_update_support = moptions_.inplace_update_support;
751
16.3M
    saver.statistics = moptions_.statistics;
752
16.3M
    saver.env_ = env_;
753
16.3M
    table_->Get(key, &saver, SaveValue);
754
755
16.3M
    *seq = saver.seq;
756
16.3M
  }
757
758
  // No change to value, since we have not yet found a Put/Delete
759
16.3M
  if (!found_final_value && merge_in_progress) {
760
1.55k
    *s = STATUS(MergeInProgress, "");
761
1.55k
  }
762
16.3M
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
763
16.3M
  return found_final_value;
764
16.3M
}
765
766
void MemTable::Update(SequenceNumber seq,
767
                      const Slice& key,
768
100
                      const Slice& value) {
769
100
  LookupKey lkey(key, seq);
770
100
  Slice mem_key = lkey.memtable_key();
771
772
100
  std::unique_ptr<MemTableRep::Iterator> iter(
773
100
      table_->GetDynamicPrefixIterator());
774
100
  iter->Seek(lkey.internal_key(), mem_key.cdata());
775
776
100
  if (iter->Valid()) {
777
    // entry format is:
778
    //    key_length  varint32
779
    //    userkey  char[klength-8]
780
    //    tag      uint64
781
    //    vlength  varint32
782
    //    value    char[vlength]
783
    // Check that it belongs to same user key.  We do not check the
784
    // sequence number since the Seek() call above should have skipped
785
    // all entries with overly large sequence numbers.
786
90
    const char* entry = iter->key();
787
90
    uint32_t key_length = 0;
788
90
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
789
90
    if (comparator_.comparator.user_comparator()->Equal(
790
90
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
791
      // Correct user key
792
90
      auto seq_and_type = UnPackSequenceAndTypeFromEnd(key_ptr + key_length);
793
90
      switch (seq_and_type.type) {
794
90
        case kTypeValue: {
795
90
          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
796
90
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
797
90
          uint32_t new_size = static_cast<uint32_t>(value.size());
798
799
          // Update the value in place if new value size <= previous value size
800
90
          if (new_size <= prev_size ) {
801
45
            char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
802
45
                                     new_size);
803
45
            WriteLock wl(GetLock(lkey.user_key()));
804
45
            memcpy(p, value.data(), value.size());
805
45
            assert((unsigned)((p + value.size()) - entry) ==
806
45
                   (unsigned)(VarintLength(key_length) + key_length +
807
45
                              VarintLength(value.size()) + value.size()));
808
45
            return;
809
45
          }
810
          // TODO (YugaByte): verify this is not a bug. The behavior for kTypeValue in case there
811
          // is not enough room for an in-place update is to fall through and add a new entry.
812
45
          FALLTHROUGH_INTENDED;
813
45
        }
814
45
        default:
815
          // If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData
816
          // or we don't have enough space for an in-place update, add a new entry.
817
45
            Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1));
818
45
            return;
819
10
      }
820
10
    }
821
90
  }
822
823
  // key doesn't exist
824
10
  Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1));
825
10
}
826
827
bool MemTable::UpdateCallback(SequenceNumber seq,
828
                              const Slice& key,
829
1.44k
                              const Slice& delta) {
830
1.44k
  LookupKey lkey(key, seq);
831
1.44k
  Slice memkey = lkey.memtable_key();
832
833
1.44k
  std::unique_ptr<MemTableRep::Iterator> iter(
834
1.44k
      table_->GetDynamicPrefixIterator());
835
1.44k
  iter->Seek(lkey.internal_key(), memkey.cdata());
836
837
1.44k
  if (iter->Valid()) {
838
    // entry format is:
839
    //    key_length  varint32
840
    //    userkey  char[klength-8]
841
    //    tag      uint64
842
    //    vlength  varint32
843
    //    value    char[vlength]
844
    // Check that it belongs to same user key.  We do not check the
845
    // sequence number since the Seek() call above should have skipped
846
    // all entries with overly large sequence numbers.
847
1.42k
    const char* entry = iter->key();
848
1.42k
    uint32_t key_length = 0;
849
1.42k
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
850
1.42k
    if (comparator_.comparator.user_comparator()->Equal(
851
1.42k
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
852
      // Correct user key
853
1.42k
      switch (UnPackSequenceAndTypeFromEnd(key_ptr + key_length).type) {
854
1.42k
        case kTypeValue: {
855
1.42k
          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
856
1.42k
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
857
858
1.42k
          char* prev_buffer = const_cast<char*>(prev_value.cdata());
859
1.42k
          uint32_t new_prev_size = prev_size;
860
861
1.42k
          std::string str_value;
862
1.42k
          WriteLock wl(GetLock(lkey.user_key()));
863
1.42k
          auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
864
1.42k
                                                   delta, &str_value);
865
1.42k
          if (status == UpdateStatus::UPDATED_INPLACE) {
866
            // Value already updated by callback.
867
1.37k
            assert(new_prev_size <= prev_size);
868
1.37k
            if (new_prev_size < prev_size) {
869
              // re-encode the size prefix with the new prev_size
870
55
              char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
871
55
                                       new_prev_size);
872
55
              if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
873
                // shift the value buffer as well.
874
5
                memcpy(p, prev_buffer, new_prev_size);
875
5
              }
876
55
            }
877
1.37k
            RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
878
1.37k
            UpdateFlushState();
879
1.37k
            return true;
880
45
          } else if (status == UpdateStatus::UPDATED) {
881
45
            Slice value(str_value);
882
45
            Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1));
883
45
            RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
884
45
            UpdateFlushState();
885
45
            return true;
886
0
          } else if (status == UpdateStatus::UPDATE_FAILED) {
887
            // No action required. Return.
888
0
            UpdateFlushState();
889
0
            return true;
890
0
          }
891
0
          FALLTHROUGH_INTENDED;
892
0
        }
893
0
        default:
894
0
          break;
895
20
      }
896
20
    }
897
1.42k
  }
898
  // If the latest value is not kTypeValue
899
  // or key doesn't exist
900
20
  return false;
901
20
}
902
903
134
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
904
134
  Slice memkey = key.memtable_key();
905
906
  // A total ordered iterator is costly for some memtablerep (prefix aware
907
  // reps). By passing in the user key, we allow efficient iterator creation.
908
  // The iterator only needs to be ordered within the same user key.
909
134
  std::unique_ptr<MemTableRep::Iterator> iter(
910
134
      table_->GetDynamicPrefixIterator());
911
134
  iter->Seek(key.internal_key(), memkey.cdata());
912
913
134
  size_t num_successive_merges = 0;
914
915
446
  for (; iter->Valid(); iter->Next()) {
916
446
    const char* entry = iter->key();
917
446
    uint32_t key_length = 0;
918
446
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
919
446
    if (!comparator_.comparator.user_comparator()->Equal(
920
12
            Slice(iter_key_ptr, key_length - 8), key.user_key())) {
921
12
      break;
922
12
    }
923
924
434
    if (UnPackSequenceAndTypeFromEnd(iter_key_ptr + key_length).type != kTypeMerge) {
925
122
      break;
926
122
    }
927
928
312
    ++num_successive_merges;
929
312
  }
930
931
134
  return num_successive_merges;
932
134
}
933
934
427k
UserFrontierPtr MemTable::GetFrontier(UpdateUserValueType type) const {
935
427k
  std::lock_guard<SpinMutex> l(frontiers_mutex_);
936
427k
  if (!frontiers_) {
937
0
    return nullptr;
938
0
  }
939
940
427k
  switch (type) {
941
820
    case UpdateUserValueType::kSmallest:
942
820
      return frontiers_->Smallest().Clone();
943
426k
    case UpdateUserValueType::kLargest:
944
426k
      return frontiers_->Largest().Clone();
945
0
  }
946
947
0
  FATAL_INVALID_ENUM_VALUE(UpdateUserValueType, type);
948
0
}
949
950
void MemTableRep::Get(const LookupKey& k, void* callback_args,
951
0
                      bool (*callback_func)(void* arg, const char* entry)) {
952
0
  auto iter = GetDynamicPrefixIterator();
953
0
  for (iter->Seek(k.internal_key(), k.memtable_key().cdata());
954
0
       iter->Valid() && callback_func(callback_args, iter->key());
955
0
       iter->Next()) {
956
0
  }
957
0
}
958
959
}  // namespace rocksdb