YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/memtable.h
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef YB_ROCKSDB_DB_MEMTABLE_H
#define YB_ROCKSDB_DB_MEMTABLE_H

#pragma once

#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "yb/rocksdb/db.h"
#include "yb/rocksdb/db/dbformat.h"
#include "yb/rocksdb/db/file_numbers.h"
#include "yb/rocksdb/db/memtable_allocator.h"
#include "yb/rocksdb/db/version_edit.h"
#include "yb/rocksdb/env.h"
#include "yb/rocksdb/immutable_options.h"
#include "yb/rocksdb/memtablerep.h"
#include "yb/rocksdb/util/concurrent_arena.h"
#include "yb/rocksdb/util/dynamic_bloom.h"
#include "yb/rocksdb/util/mutable_cf_options.h"

namespace yb {

class MemTracker;

}  // namespace yb

namespace rocksdb {

class Mutex;
class MemTableIterator;
class MergeContext;
class WriteBuffer;
class InternalIterator;

struct MemTableOptions {
  explicit MemTableOptions(
      const ImmutableCFOptions& ioptions,
      const MutableCFOptions& mutable_cf_options);
  size_t write_buffer_size;
  size_t arena_block_size;
  uint32_t memtable_prefix_bloom_bits;
  uint32_t memtable_prefix_bloom_probes;
  size_t memtable_prefix_bloom_huge_page_tlb_size;
  bool inplace_update_support;
  size_t inplace_update_num_locks;
  UpdateStatus (*inplace_callback)(char* existing_value,
                                   uint32_t* existing_value_size,
                                   Slice delta_value,
                                   std::string* merged_value);
  size_t max_successive_merges;
  bool filter_deletes;
  Statistics* statistics;
  MergeOperator* merge_operator;
  Logger* info_log;
  std::shared_ptr<yb::MemTracker> mem_tracker;
};
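
// A minimal sketch (not part of this header) of a function matching the
// inplace_callback signature above; the name AppendDelta and the use of the
// UPDATED enumerator follow the upstream RocksDB UpdateStatus convention and
// are illustrative assumptions:
//
//   UpdateStatus AppendDelta(char* existing_value,
//                            uint32_t* existing_value_size,
//                            Slice delta_value,
//                            std::string* merged_value) {
//     // Cannot grow the existing slot in place, so emit a merged value and
//     // let the memtable add it as a new entry.
//     merged_value->assign(existing_value, *existing_value_size);
//     merged_value->append(delta_value.data(), delta_value.size());
//     return UpdateStatus::UPDATED;
//   }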

YB_DEFINE_ENUM(FlushState, (kNotRequested)(kRequested)(kScheduled));

struct PreparedAdd {
  SequenceNumber min_seq_no = 0;
  size_t total_encoded_len = 0;
  size_t num_deletes = 0;
};

// Note:  Many of the methods in this class have comments indicating that
// external synchronization is required as these methods are not thread-safe.
// It is up to higher layers of code to decide how to prevent concurrent
// invocation of these methods.  This is usually done by acquiring either
// the db mutex or the single writer thread.
//
// Some of these methods are documented to only require external
// synchronization if this memtable is immutable.  Calling MarkImmutable() is
// not sufficient to guarantee immutability.  It is up to higher layers of
// code to determine if this MemTable can still be modified by other threads.
// E.g.: the Superversion stores a pointer to the current MemTable (that can
// be modified) and a separate list of the MemTables that can no longer be
// written to (aka the 'immutable memtables').
class MemTable {
 public:
  struct KeyComparator : public MemTableRep::KeyComparator {
    const InternalKeyComparator comparator;
    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
    virtual int operator()(const char* prefix_len_key1,
                           const char* prefix_len_key2) const override;
    virtual int operator()(const char* prefix_len_key,
                           const Slice& key) const override;
  };

  // MemTables are reference counted.  The initial reference count
  // is zero and the caller must call Ref() at least once.
  //
  // earliest_seq should be the current SequenceNumber in the db such that any
  // key inserted into this memtable will have an equal or larger seq number.
  // (When a db is first created, the earliest sequence number will be 0.)
  // If the earliest sequence number is not known, kMaxSequenceNumber may be
  // used, but this may prevent some transactions from succeeding until the
  // first key is inserted into the memtable.
  explicit MemTable(const InternalKeyComparator& comparator,
                    const ImmutableCFOptions& ioptions,
                    const MutableCFOptions& mutable_cf_options,
                    WriteBuffer* write_buffer, SequenceNumber earliest_seq);

  // Do not delete this MemTable unless Unref() indicates it is not in use.
  ~MemTable();

  // Increase the reference count.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void Ref() { ++refs_; }

  // Drop the reference count.
  // If the refcount goes to zero, return this memtable; otherwise return null.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  MemTable* Unref() {
    --refs_;
    DCHECK_GE(refs_, 0);
    if (refs_ <= 0) {
      return this;
    }
    return nullptr;
  }
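
  // A minimal usage sketch (illustrative, not part of this header): callers
  // serialize Ref()/Unref() externally (e.g. under the db mutex) and delete
  // the memtable only when Unref() hands it back:
  //
  //   mem->Ref();                        // take a reference
  //   ...
  //   MemTable* to_delete = mem->Unref();
  //   if (to_delete != nullptr) {
  //     delete to_delete;                // refcount reached zero
  //   }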

  // Returns an estimate of the number of bytes of data in use by this
  // data structure.
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this MemTable is immutable).
  size_t ApproximateMemoryUsage();

  // This method heuristically determines if the memtable should continue to
  // host more data.
  bool ShouldScheduleFlush() const {
    return flush_state_.load(std::memory_order_relaxed) == FlushState::kRequested;
  }

  // Returns true if a flush should be scheduled and the caller should
  // be the one to schedule it.
  bool MarkFlushScheduled() {
    auto before = FlushState::kRequested;
    return flush_state_.compare_exchange_strong(before,
                                                FlushState::kScheduled,
                                                std::memory_order_relaxed,
                                                std::memory_order_relaxed);
  }
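
  // A minimal sketch (illustrative, not from this header) of the intended
  // handshake: callers observe kRequested via ShouldScheduleFlush() and then
  // use the MarkFlushScheduled() compare-and-swap so that exactly one caller
  // wins the right to enqueue the flush. ScheduleFlushJob is a hypothetical
  // scheduling hook:
  //
  //   if (mem->ShouldScheduleFlush() && mem->MarkFlushScheduled()) {
  //     ScheduleFlushJob(mem);  // only one thread reaches this point
  //   }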

  // Return an iterator that yields the contents of the memtable.
  //
  // The caller must ensure that the underlying MemTable remains live
  // while the returned iterator is live.  The keys returned by this
  // iterator are internal keys encoded by AppendInternalKey in the
  // db/dbformat.{h,cc} module.
  //
  // By default, it returns an iterator for prefix seek if prefix_extractor
  // is configured in Options.
  // arena: if not null, the arena needs to be used to allocate the Iterator.
  //        Calling ~Iterator of the iterator will destroy all the states but
  //        those allocated in arena.
  InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);
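
  // A minimal usage sketch (illustrative, not part of this header): when an
  // arena is supplied, the iterator's storage belongs to the arena, so the
  // destructor is invoked explicitly instead of calling delete:
  //
  //   Arena arena;
  //   InternalIterator* iter = mem->NewIterator(ReadOptions(), &arena);
  //   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  //     // iter->key() is an internal key; iter->value() is the user value.
  //   }
  //   iter->~InternalIterator();  // the arena reclaims the memory itself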

  // Add an entry into the memtable that maps key to value at the
  // specified sequence number and with the specified type.
  // Typically value will be empty if type == kTypeDeletion.
  //
  // REQUIRES: if allow_concurrent = false, external synchronization to prevent
  // simultaneous operations on the same MemTable.
  void Add(SequenceNumber seq, ValueType type, const SliceParts& key,
           const SliceParts& value, bool allow_concurrent = false);

  KeyHandle PrepareAdd(
      SequenceNumber s, ValueType type, const SliceParts& key, const SliceParts& value,
      PreparedAdd* prepared_add);

  void ApplyPreparedAdd(
      const KeyHandle* handle, size_t count, const PreparedAdd& prepared_add,
      bool allow_concurrent);
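
  // A minimal usage sketch (illustrative, not part of this header):
  // SliceParts wraps an array of Slices, so a contiguous key or value is
  // passed as a single part; seq here stands for a caller-supplied sequence
  // number:
  //
  //   Slice key("user_key");
  //   Slice value("user_value");
  //   mem->Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1));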

  // If the memtable contains a value for key, store it in *value and return true.
  // If the memtable contains a deletion for key, store a NotFound() error
  // in *s and return true.
  // If the memtable contains a Merge operation as the most recent entry for a
  //   key, and the merge process does not stop (not reaching a value or
  //   delete), prepend the current merge operand to *operands,
  //   store MergeInProgress in *s, and return false.
  // Else, return false.
  // If any operation was found, its most recent sequence number
  // will be stored in *seq on success (regardless of whether true/false is
  // returned).  Otherwise, *seq will be set to kMaxSequenceNumber.
  // On success, *s may be set to OK, NotFound, or MergeInProgress.  Any other
  // status returned indicates a corruption or other unexpected error.
  bool Get(const LookupKey& key, std::string* value, Status* s,
           MergeContext* merge_context, SequenceNumber* seq);

  bool Get(const LookupKey& key, std::string* value, Status* s,
           MergeContext* merge_context) {
    SequenceNumber seq;
    return Get(key, value, s, merge_context, &seq);
  }
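
  // A minimal usage sketch (illustrative, not part of this header): a point
  // lookup at a snapshot sequence number; user_key and snapshot_seq are
  // caller-supplied:
  //
  //   std::string value;
  //   Status s;
  //   MergeContext merge_context;
  //   LookupKey lkey(user_key, snapshot_seq);
  //   if (mem->Get(lkey, &value, &s, &merge_context)) {
  //     // Found either a value (s.ok()) or a deletion (s.IsNotFound()).
  //   }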

  // Attempts to update the new_value in place; else does a normal Add.
  // Pseudocode:
  //   if key exists in current memtable && prev_value is of type kTypeValue
  //     if sizeof(new_value) <= sizeof(prev_value)
  //       update inplace
  //     else add(key, new_value)
  //   else add(key, new_value)
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void Update(SequenceNumber seq,
              const Slice& key,
              const Slice& value);

  bool Erase(const Slice& key);

  // If prev_value for key exists, attempts to update it in place;
  // else returns false.
  // Pseudocode:
  //   if key exists in current memtable && prev_value is of type kTypeValue
  //     new_value = delta(prev_value)
  //     if sizeof(new_value) <= sizeof(prev_value)
  //       update inplace
  //     else add(key, new_value)
  //   else return false
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  bool UpdateCallback(SequenceNumber seq,
                      const Slice& key,
                      const Slice& delta);
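
  // A minimal usage sketch (illustrative, not part of this header): apply a
  // delta through the configured inplace_callback, falling back to a full
  // write when the key has no previous value. The fallback shown here is an
  // assumption about caller behavior, and full_value is hypothetical:
  //
  //   if (!mem->UpdateCallback(seq, key, delta)) {
  //     mem->Update(seq, key, full_value);  // key absent: write it whole
  //   }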

  // Returns the number of successive merge entries starting from the newest
  // entry for the key up to the last non-merge entry or last entry for the
  // key in the memtable.
  size_t CountSuccessiveMergeEntries(const LookupKey& key);

  // Get the total number of entries in the memtable.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this MemTable is immutable).
  uint64_t num_entries() const {
    return num_entries_.load(std::memory_order_relaxed);
  }

  // Get the total number of deletes in the memtable.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this MemTable is immutable).
  uint64_t num_deletes() const {
    return num_deletes_.load(std::memory_order_relaxed);
  }

  // Returns the edits area that is needed for flushing the memtable.
  VersionEdit* GetEdits() { return &edit_; }

  // Returns true if no entry has been inserted into the memtable.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this MemTable is immutable).
  bool IsEmpty() const { return first_seqno_ == 0; }

  // Returns the sequence number of the first element that was inserted
  // into the memtable.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this MemTable is immutable).
  SequenceNumber GetFirstSequenceNumber() const {
    return first_seqno_.load(std::memory_order_relaxed);
  }

  // Returns the sequence number that is guaranteed to be smaller than or equal
  // to the sequence number of any key that could be inserted into this
  // memtable. It can then be assumed that any write with a larger (or equal)
  // sequence number will be present in this memtable or a later memtable.
  //
  // If the earliest sequence number could not be determined,
  // kMaxSequenceNumber will be returned.
  SequenceNumber GetEarliestSequenceNumber() const {
    return earliest_seqno_.load(std::memory_order_relaxed);
  }

  // Returns the next active logfile number when this memtable is about to
  // be flushed to storage.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  uint64_t GetNextLogNumber() const { return mem_next_logfile_number_; }

  // Sets the next active logfile number when this memtable is about to
  // be flushed to storage.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }

  void SetFlushStartTime(std::chrono::steady_clock::time_point value) {
    flush_start_time_ = value;
  }

  std::chrono::steady_clock::time_point FlushStartTime() const {
    return flush_start_time_;
  }

  // Notify the underlying storage that no more items will be added.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  // After MarkImmutable() is called, you should not attempt to
  // write anything to this MemTable (i.e. do not call Add() or Update()).
  void MarkImmutable() {
    table_->MarkReadOnly();
    allocator_.DoneAllocating();
  }
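
  // A minimal sketch (illustrative, not from this header) of the hand-off a
  // flush performs, under external synchronization, before writing this
  // memtable out; new_log_number is a caller-supplied value:
  //
  //   mem->MarkImmutable();                    // no further writes
  //   mem->SetNextLogNumber(new_log_number);   // logs below this can go
  //   mem->SetFlushStartTime(std::chrono::steady_clock::now());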

  // Return true if the current MemTableRep supports the merge operator.
  bool IsMergeOperatorSupported() const {
    return table_->IsMergeOperatorSupported();
  }

  // Return true if the current MemTableRep supports snapshots.
  // In-place updates prevent snapshots.
  bool IsSnapshotSupported() const {
    return table_->IsSnapshotSupported() && !moptions_.inplace_update_support;
  }

  uint64_t ApproximateSize(const Slice& start_ikey, const Slice& end_ikey);

  // Get the lock associated with the key.
  port::RWMutex* GetLock(const Slice& key);

  const InternalKeyComparator& GetInternalKeyComparator() const {
    return comparator_.comparator;
  }

  const MemTableOptions* GetMemTableOptions() const { return &moptions_; }

  void UpdateFrontiers(const UserFrontiers& value) {
    std::lock_guard<SpinMutex> l(frontiers_mutex_);
    if (frontiers_) {
      frontiers_->MergeFrontiers(value);
    } else {
      frontiers_ = value.Clone();
    }
  }

  UserFrontierPtr GetFrontier(UpdateUserValueType type) const;

  const UserFrontiers* Frontiers() const { return frontiers_.get(); }

  std::string ToString() const;

  bool FullyErased() const {
    return num_entries_.load(std::memory_order_acquire) ==
           num_erased_.load(std::memory_order_acquire);
  }

 private:

  friend class MemTableIterator;
  friend class MemTableBackwardIterator;
  friend class MemTableList;

  KeyComparator comparator_;
  const MemTableOptions moptions_;
  int refs_;
  const size_t kArenaBlockSize;
  ConcurrentArena arena_;
  MemTableAllocator allocator_;
  std::unique_ptr<MemTableRep> table_;

  // Total data size of all data inserted.
  std::atomic<uint64_t> data_size_;
  std::atomic<uint64_t> num_entries_;
  std::atomic<uint64_t> num_deletes_;
  std::atomic<uint64_t> num_erased_{0};

  // These are used to manage memtable flushes to storage.
  bool flush_in_progress_;        // started the flush
  bool flush_completed_;          // finished the flush
  uint64_t file_number_;          // filled up after flush is complete
  // Filled up after flush is complete to prevent the file from being deleted
  // until it is added into the VersionSet.
  FileNumbersHolder file_number_holder_;

  // The updates to be applied to the transaction log when this
  // memtable is flushed to storage.
  VersionEdit edit_;

  // The sequence number of the kv that was inserted first.
  std::atomic<SequenceNumber> first_seqno_;

  // The db sequence number at the time of creation, or kMaxSequenceNumber
  // if not set.
  std::atomic<SequenceNumber> earliest_seqno_;

  // The log files earlier than this number can be deleted.
  uint64_t mem_next_logfile_number_;

  std::chrono::steady_clock::time_point flush_start_time_;

  // rw locks for in-place updates
  std::vector<port::RWMutex> locks_;

  const SliceTransform* const prefix_extractor_;
  std::unique_ptr<DynamicBloom> prefix_bloom_;

  std::atomic<FlushState> flush_state_;

  Env* env_;

  mutable SpinMutex frontiers_mutex_;
  std::unique_ptr<UserFrontiers> frontiers_;

  // Returns a heuristic flush decision.
  bool ShouldFlushNow() const;

  // Updates flush_state_ using ShouldFlushNow().
  void UpdateFlushState();

  std::vector<char> erase_key_buffer_;

  // No copying allowed.
  MemTable(const MemTable&) = delete;
  MemTable& operator=(const MemTable&) = delete;
};

extern const char* EncodeKey(std::string* scratch, const Slice& target);

}  // namespace rocksdb

#endif // YB_ROCKSDB_DB_MEMTABLE_H