/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/memtable.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/db/memtable.h" |
25 | | |
26 | | #include <algorithm> |
27 | | #include <limits> |
28 | | |
29 | | #include "yb/rocksdb/db/dbformat.h" |
30 | | #include "yb/rocksdb/db/merge_context.h" |
31 | | #include "yb/rocksdb/env.h" |
32 | | #include "yb/rocksdb/iterator.h" |
33 | | #include "yb/rocksdb/merge_operator.h" |
34 | | #include "yb/rocksdb/slice_transform.h" |
35 | | #include "yb/rocksdb/table/internal_iterator.h" |
36 | | #include "yb/rocksdb/util/arena.h" |
37 | | #include "yb/rocksdb/util/coding.h" |
38 | | #include "yb/rocksdb/util/murmurhash.h" |
39 | | #include "yb/rocksdb/util/mutexlock.h" |
40 | | #include "yb/rocksdb/util/perf_context_imp.h" |
41 | | #include "yb/rocksdb/util/statistics.h" |
42 | | #include "yb/rocksdb/util/stop_watch.h" |
43 | | |
44 | | #include "yb/util/mem_tracker.h" |
45 | | #include "yb/util/stats/perf_step_timer.h" |
46 | | |
47 | | using std::ostringstream; |
48 | | |
49 | | namespace rocksdb { |
50 | | |
51 | | MemTableOptions::MemTableOptions( |
52 | | const ImmutableCFOptions& ioptions, |
53 | | const MutableCFOptions& mutable_cf_options) |
54 | | : write_buffer_size(mutable_cf_options.write_buffer_size), |
55 | | arena_block_size(mutable_cf_options.arena_block_size), |
56 | | memtable_prefix_bloom_bits(mutable_cf_options.memtable_prefix_bloom_bits), |
57 | | memtable_prefix_bloom_probes( |
58 | | mutable_cf_options.memtable_prefix_bloom_probes), |
59 | | memtable_prefix_bloom_huge_page_tlb_size( |
60 | | mutable_cf_options.memtable_prefix_bloom_huge_page_tlb_size), |
61 | | inplace_update_support(ioptions.inplace_update_support), |
62 | | inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks), |
63 | | inplace_callback(ioptions.inplace_callback), |
64 | | max_successive_merges(mutable_cf_options.max_successive_merges), |
65 | | filter_deletes(mutable_cf_options.filter_deletes), |
66 | | statistics(ioptions.statistics), |
67 | | merge_operator(ioptions.merge_operator), |
68 | 477k | info_log(ioptions.info_log) { |
69 | 477k | if (ioptions.mem_tracker) { |
70 | 425k | mem_tracker = yb::MemTracker::FindOrCreateTracker("MemTable", ioptions.mem_tracker); |
71 | 425k | } |
72 | 477k | } |
73 | | |
74 | | MemTable::MemTable(const InternalKeyComparator& cmp, |
75 | | const ImmutableCFOptions& ioptions, |
76 | | const MutableCFOptions& mutable_cf_options, |
77 | | WriteBuffer* write_buffer, SequenceNumber earliest_seq) |
78 | | : comparator_(cmp), |
79 | | moptions_(ioptions, mutable_cf_options), |
80 | | refs_(0), |
81 | | kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)), |
82 | | arena_(moptions_.arena_block_size, 0), |
83 | | allocator_(&arena_, write_buffer), |
84 | | table_(ioptions.memtable_factory->CreateMemTableRep( |
85 | | comparator_, &allocator_, ioptions.prefix_extractor, |
86 | | ioptions.info_log)), |
87 | | data_size_(0), |
88 | | num_entries_(0), |
89 | | num_deletes_(0), |
90 | | flush_in_progress_(false), |
91 | | flush_completed_(false), |
92 | | file_number_(0), |
93 | | first_seqno_(0), |
94 | | earliest_seqno_(earliest_seq), |
95 | | mem_next_logfile_number_(0), |
96 | | locks_(moptions_.inplace_update_support |
97 | | ? moptions_.inplace_update_num_locks |
98 | | : 0), |
99 | | prefix_extractor_(ioptions.prefix_extractor), |
100 | | flush_state_(FlushState::kNotRequested), |
101 | 478k | env_(ioptions.env) { |
102 | 478k | UpdateFlushState(); |
103 | | // something went wrong if we need to flush before inserting anything |
104 | 478k | assert(!ShouldScheduleFlush()); |
105 | | |
106 | 478k | if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
107 | 48 | prefix_bloom_.reset(new DynamicBloom( |
108 | 48 | &allocator_, |
109 | 48 | moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality, |
110 | 48 | moptions_.memtable_prefix_bloom_probes, nullptr, |
111 | 48 | moptions_.memtable_prefix_bloom_huge_page_tlb_size, |
112 | 48 | ioptions.info_log)); |
113 | 48 | } |
114 | | |
115 | 478k | if (moptions_.mem_tracker) { |
116 | 425k | arena_.SetMemTracker(moptions_.mem_tracker); |
117 | 425k | } |
118 | 478k | } |
119 | | |
120 | 439k | MemTable::~MemTable() { DCHECK_EQ(refs_, 0); } |
121 | | |
122 | 117k | size_t MemTable::ApproximateMemoryUsage() { |
123 | 117k | size_t arena_usage = arena_.ApproximateMemoryUsage(); |
124 | 117k | size_t table_usage = table_->ApproximateMemoryUsage(); |
125 | | // let MAX_USAGE = std::numeric_limits<size_t>::max() |
126 | | // then if arena_usage + table_usage >= MAX_USAGE, return MAX_USAGE.
127 | | // the following variation is to avoid numeric overflow. |
128 | 117k | if (arena_usage >= std::numeric_limits<size_t>::max() - table_usage) { |
129 | 0 | return std::numeric_limits<size_t>::max(); |
130 | 0 | } |
131 | | // otherwise, return the actual usage |
132 | 117k | return arena_usage + table_usage; |
133 | 117k | } |
134 | | |
135 | 181M | bool MemTable::ShouldFlushNow() const { |
136 | | // Often we cannot allocate arena blocks that exactly match the buffer
137 | | // size, so we have to decide whether to over-allocate or
138 | | // under-allocate.
139 | | // This constant can be interpreted as: if we still have more than
140 | | // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to
141 | | // over-allocate one more block.
142 | 181M | const double kAllowOverAllocationRatio = 0.6; |
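| | // For example, with write_buffer_size = 64MB and kArenaBlockSize = 8MB, the
| | // checks below keep allocating while allocated_memory < 60.8MB (64 - 0.4 * 8),
| | // force a flush once allocated_memory > 68.8MB (64 + 0.6 * 8), and between
| | // those bounds fall through to the "last block" rule at the end.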
143 | | |
144 | | // If the arena still has room for a new block allocation, we can safely
145 | | // say it shouldn't flush.
146 | 181M | auto allocated_memory = |
147 | 181M | table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes(); |
148 | | |
149 | | // if we can still allocate one more block without exceeding the |
150 | | // over-allocation ratio, then we should not flush. |
151 | 181M | if (allocated_memory + kArenaBlockSize < |
152 | 181M | moptions_.write_buffer_size + |
153 | 181M | kArenaBlockSize * kAllowOverAllocationRatio) { |
154 | 180M | return false; |
155 | 180M | } |
156 | | |
157 | | // If the user keeps adding entries whose total size exceeds
158 | | // moptions.write_buffer_size, we need to flush earlier even though we
159 | | // still have much available memory left.
160 | 770k | if (allocated_memory > moptions_.write_buffer_size + |
161 | 770k | kArenaBlockSize * kAllowOverAllocationRatio) { |
162 | 3.58k | return true; |
163 | 3.58k | } |
164 | | |
165 | | // In this code path, Arena has already allocated its "last block", which
166 | | // means the total allocated memory size is either:
167 | | // (1) "moderately" over-allocated (by no more than `0.6 * arena
168 | | // block size`). Or,
169 | | // (2) less than the write buffer size, but we'll stop here since
170 | | // allocating a new arena block would over-allocate much more
171 | | // (half of the arena block size) memory.
172 | | // |
173 | | // In either case, to avoid over-allocating, the last block will stop allocation
174 | | // when its usage reaches a certain ratio, which we carefully choose "0.75 |
175 | | // full" as the stop condition because it addresses the following issue with |
176 | | // great simplicity: What if the next inserted entry's size is |
177 | | // bigger than AllocatedAndUnused()? |
178 | | // |
179 | | // The answer is: if the entry size is also bigger than 0.25 * |
180 | | // kArenaBlockSize, a dedicated block will be allocated for it; otherwise |
181 | | // the arena will skip the AllocatedAndUnused() space and allocate a new, empty
182 | | // and regular block. In either case, we *overly* over-allocated. |
183 | | // |
184 | | // Therefore, setting the last block to be at most "0.75 full" avoids both |
185 | | // cases. |
186 | | // |
187 | | // NOTE: the average fraction of wasted space with this approach can be
188 | | // estimated as: "arena block size * 0.25 / write buffer size". Users who
189 | | // specify a small write buffer size and/or a big arena block size may suffer.
190 | 767k | return arena_.AllocatedAndUnused() < kArenaBlockSize / 4; |
191 | 770k | } |
192 | | |
193 | 184M | void MemTable::UpdateFlushState() { |
194 | 184M | auto state = flush_state_.load(std::memory_order_relaxed); |
195 | 184M | if (state == FlushState::kNotRequested && ShouldFlushNow()) {
196 | | // ignore CAS failure, because that means somebody else requested |
197 | | // a flush |
198 | 12.1k | flush_state_.compare_exchange_strong(state, FlushState::kRequested, |
199 | 12.1k | std::memory_order_relaxed, |
200 | 12.1k | std::memory_order_relaxed); |
201 | 12.1k | } |
202 | 184M | } |
203 | | |
204 | | int MemTable::KeyComparator::operator()(const char* prefix_len_key1, |
205 | 20.0G | const char* prefix_len_key2) const { |
206 | | // Internal keys are encoded as length-prefixed strings. |
207 | 20.0G | Slice k1 = GetLengthPrefixedSlice(prefix_len_key1); |
208 | 20.0G | Slice k2 = GetLengthPrefixedSlice(prefix_len_key2); |
209 | 20.0G | return comparator.Compare(k1, k2); |
210 | 20.0G | } |
211 | | |
212 | | int MemTable::KeyComparator::operator()(const char* prefix_len_key, |
213 | | const Slice& key) |
214 | 757k | const { |
215 | | // Internal keys are encoded as length-prefixed strings. |
216 | 757k | Slice a = GetLengthPrefixedSlice(prefix_len_key); |
217 | 757k | return comparator.Compare(a, key); |
218 | 757k | } |
219 | | |
220 | 494k | Slice MemTableRep::UserKey(const char* key) const { |
221 | 494k | Slice slice = GetLengthPrefixedSlice(key); |
222 | 494k | return Slice(slice.data(), slice.size() - 8); |
223 | 494k | } |
224 | | |
225 | 291k | KeyHandle MemTableRep::Allocate(const size_t len, char** buf) { |
226 | 291k | *buf = allocator_->Allocate(len); |
227 | 291k | return static_cast<KeyHandle>(*buf); |
228 | 291k | } |
229 | | |
230 | | // Encode a suitable internal key target for "target" and return it. |
231 | | // Uses *scratch as scratch space, and the returned pointer will point |
232 | | // into this scratch space. |
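| | // For example, an 11-byte internal key is encoded as the byte 0x0B followed
| | // by the 11 key bytes; the result is valid only until *scratch is modified.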
233 | 184M | const char* EncodeKey(std::string* scratch, const Slice& target) { |
234 | 184M | scratch->clear(); |
235 | 184M | PutVarint32(scratch, static_cast<uint32_t>(target.size())); |
236 | 184M | scratch->append(target.cdata(), target.size()); |
237 | 184M | return scratch->data(); |
238 | 184M | } |
239 | | |
240 | | class MemTableIterator : public InternalIterator { |
241 | | public: |
242 | | MemTableIterator( |
243 | | const MemTable& mem, const ReadOptions& read_options, Arena* arena) |
244 | | : bloom_(nullptr), |
245 | | prefix_extractor_(mem.prefix_extractor_), |
246 | | valid_(false), |
247 | 38.5M | arena_mode_(arena != nullptr) { |
248 | 38.5M | if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
249 | 1.65k | bloom_ = mem.prefix_bloom_.get(); |
250 | 1.65k | iter_ = mem.table_->GetDynamicPrefixIterator(arena); |
251 | 38.5M | } else { |
252 | 38.5M | iter_ = mem.table_->GetIterator(arena); |
253 | 38.5M | } |
254 | 38.5M | } |
255 | | |
256 | 38.5M | ~MemTableIterator() { |
257 | 38.5M | if (arena_mode_) {
258 | 38.5M | iter_->~Iterator(); |
259 | 18.4E | } else { |
260 | 18.4E | delete iter_; |
261 | 18.4E | } |
262 | 38.5M | } |
263 | | |
264 | 4.00G | bool Valid() const override { return valid_; } |
265 | 184M | void Seek(const Slice& k) override { |
266 | 184M | PERF_TIMER_GUARD(seek_on_memtable_time); |
267 | 184M | PERF_COUNTER_ADD(seek_on_memtable_count, 1); |
268 | 184M | if (bloom_ != nullptr) { |
269 | 440k | if (!bloom_->MayContain( |
270 | 440k | prefix_extractor_->Transform(ExtractUserKey(k)))) { |
271 | 40.0k | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
272 | 40.0k | valid_ = false; |
273 | 40.0k | return; |
274 | 400k | } else { |
275 | 400k | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
276 | 400k | } |
277 | 440k | } |
278 | 184M | iter_->Seek(k, nullptr); |
279 | 184M | valid_ = iter_->Valid(); |
280 | 184M | } |
281 | 409k | void SeekToFirst() override { |
282 | 409k | iter_->SeekToFirst(); |
283 | 409k | valid_ = iter_->Valid(); |
284 | 409k | } |
285 | 1.52M | void SeekToLast() override { |
286 | 1.52M | iter_->SeekToLast(); |
287 | 1.52M | valid_ = iter_->Valid(); |
288 | 1.52M | } |
289 | 473M | void Next() override { |
290 | 473M | assert(Valid()); |
291 | 0 | iter_->Next(); |
292 | 473M | valid_ = iter_->Valid(); |
293 | 473M | } |
294 | 2.09M | void Prev() override { |
295 | 2.09M | assert(Valid()); |
296 | 0 | iter_->Prev(); |
297 | 2.09M | valid_ = iter_->Valid(); |
298 | 2.09M | } |
299 | 770M | Slice key() const override { |
300 | 770M | assert(Valid()); |
301 | 0 | return GetLengthPrefixedSlice(iter_->key()); |
302 | 770M | } |
303 | 1.17G | Slice value() const override { |
304 | 1.17G | assert(Valid()); |
305 | 0 | Slice key_slice = GetLengthPrefixedSlice(iter_->key()); |
306 | 1.17G | return GetLengthPrefixedSlice(key_slice.cdata() + key_slice.size()); |
307 | 1.17G | } |
308 | | |
309 | 364k | Status status() const override { return Status::OK(); } |
310 | | |
311 | 8 | Status PinData() override { |
312 | | // memtable data is always pinned |
313 | 8 | return Status::OK(); |
314 | 8 | } |
315 | | |
316 | 0 | Status ReleasePinnedData() override { |
317 | | // memtable data is always pinned |
318 | 0 | return Status::OK(); |
319 | 0 | } |
320 | | |
321 | 356M | bool IsKeyPinned() const override { |
322 | | // memtable data is always pinned |
323 | 356M | return true; |
324 | 356M | } |
325 | | |
326 | | private: |
327 | | DynamicBloom* bloom_; |
328 | | const SliceTransform* const prefix_extractor_; |
329 | | MemTableRep::Iterator* iter_; |
330 | | bool valid_; |
331 | | bool arena_mode_; |
332 | | |
333 | | // No copying allowed |
334 | | MemTableIterator(const MemTableIterator&); |
335 | | void operator=(const MemTableIterator&); |
336 | | }; |
337 | | |
338 | | InternalIterator* MemTable::NewIterator(const ReadOptions& read_options, |
339 | 38.5M | Arena* arena) { |
340 | 38.5M | assert(arena != nullptr); |
341 | 0 | auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); |
342 | 38.5M | return new (mem) MemTableIterator(*this, read_options, arena); |
343 | 38.5M | } |
344 | | |
345 | 4.53k | port::RWMutex* MemTable::GetLock(const Slice& key) { |
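| | // Striped locking: the key hashes onto one of inplace_update_num_locks
| | // mutexes (sized in the constructor), so in-place updates of different keys
| | // rarely contend.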
346 | 4.53k | static murmur_hash hash; |
347 | 4.53k | return &locks_[hash(key) % locks_.size()]; |
348 | 4.53k | } |
349 | | |
350 | 0 | std::string MemTable::ToString() const { |
351 | 0 | ostringstream ss; |
352 | 0 | auto* frontiers = Frontiers(); |
353 | 0 | ss << "MemTable {" |
354 | 0 | << " num_entries: " << num_entries() |
355 | 0 | << " num_deletes: " << num_deletes() |
356 | 0 | << " IsEmpty: " << IsEmpty() |
357 | 0 | << " flush_state: " << flush_state_ |
358 | 0 | << " first_seqno: " << GetFirstSequenceNumber() |
359 | 0 | << " earliest_seqno: " << GetEarliestSequenceNumber()
360 | 0 | << " frontiers: "; |
361 | 0 | if (frontiers) { |
362 | 0 | ss << frontiers->ToString(); |
363 | 0 | } else { |
364 | 0 | ss << "N/A"; |
365 | 0 | } |
366 | 0 | ss << " }"; |
367 | 0 | return ss.str(); |
368 | 0 | } |
369 | | |
370 | | uint64_t MemTable::ApproximateSize(const Slice& start_ikey, |
371 | 22 | const Slice& end_ikey) { |
372 | 22 | uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey); |
373 | 22 | if (entry_count == 0) { |
374 | 13 | return 0; |
375 | 13 | } |
376 | 9 | uint64_t n = num_entries_.load(std::memory_order_relaxed); |
377 | 9 | if (n == 0) { |
378 | 0 | return 0; |
379 | 0 | } |
380 | 9 | if (entry_count > n) { |
381 | | // table_->ApproximateNumEntries() is just an estimate so it can be larger |
382 | | // than actual entries we have. Cap it to entries we have to limit the |
383 | | // inaccuracy. |
384 | 0 | entry_count = n; |
385 | 0 | } |
386 | 9 | uint64_t data_size = data_size_.load(std::memory_order_relaxed); |
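| | // Scale the average entry size by the estimated entry count, e.g. 100
| | // entries in range out of 1000 total covering 65536 bytes yields
| | // 100 * (65536 / 1000) = 6500 bytes (integer division).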
387 | 9 | return entry_count * (data_size / n); |
388 | 9 | } |
389 | | |
390 | | void MemTable::Add(SequenceNumber seq, ValueType type, const SliceParts& key, |
391 | 38.0M | const SliceParts& value, bool allow_concurrent) { |
392 | 38.0M | PreparedAdd prepared_add; |
393 | 38.0M | auto handle = PrepareAdd(seq, type, key, value, &prepared_add); |
394 | 38.0M | ApplyPreparedAdd(&handle, 1, prepared_add, allow_concurrent); |
395 | 38.0M | } |
396 | | |
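| | // PrepareAdd encodes a single entry into the memtable and returns its
| | // KeyHandle; ApplyPreparedAdd then inserts a batch of prepared handles and
| | // updates the shared counters once for the whole batch.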
397 | | KeyHandle MemTable::PrepareAdd(SequenceNumber s, ValueType type, |
398 | | const SliceParts& key, |
399 | | const SliceParts& value, |
400 | 399M | PreparedAdd* prepared_add) { |
401 | | // Format of an entry is concatenation of: |
402 | | // key_size : varint32 of internal_key.size() |
403 | | // key bytes : char[internal_key.size()] |
404 | | // value_size : varint32 of value.size() |
405 | | // value bytes : char[value.size()] |
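| | // For example, user key "abc" with sequence number 5, type kTypeValue (0x1)
| | // and value "v" gives internal_key_size = 3 + 8 = 11 and encoded_len = 14:
| | // 0x0B 'a' 'b' 'c' 0x01 0x05 0x00 0x00 0x00 0x00 0x00 0x00 0x01 'v'
| | // (varint key size, user key, little-endian PackSequenceAndType(5, kTypeValue),
| | // varint value size, value).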
406 | 399M | uint32_t key_size = static_cast<uint32_t>(key.SumSizes()); |
407 | 399M | uint32_t val_size = static_cast<uint32_t>(value.SumSizes()); |
408 | 399M | uint32_t internal_key_size = key_size + 8; |
409 | 399M | const uint32_t encoded_len = VarintLength(internal_key_size) + |
410 | 399M | internal_key_size + VarintLength(val_size) + |
411 | 399M | val_size; |
412 | 399M | char* buf = nullptr; |
413 | 399M | KeyHandle handle = table_->Allocate(encoded_len, &buf); |
414 | | |
415 | 399M | char* p = EncodeVarint32(buf, internal_key_size); |
416 | 399M | p = key.CopyAllTo(p); |
417 | 399M | uint64_t packed = PackSequenceAndType(s, type); |
418 | 399M | EncodeFixed64(p, packed); |
419 | 399M | p += 8; |
420 | 399M | p = EncodeVarint32(p, val_size); |
421 | 399M | p = value.CopyAllTo(p); |
422 | 399M | assert((unsigned)(p - buf) == (unsigned)encoded_len); |
423 | | |
424 | 399M | if (prefix_bloom_) { |
425 | 400k | assert(prefix_extractor_); |
426 | 0 | prefix_bloom_->Add(prefix_extractor_->Transform(key.TheOnlyPart())); |
427 | 400k | } |
428 | | |
429 | 399M | if (!prepared_add->min_seq_no) { |
430 | 47.5M | prepared_add->min_seq_no = s; |
431 | 47.5M | } |
432 | 399M | prepared_add->total_encoded_len += encoded_len; |
433 | 399M | if (type == ValueType::kTypeDeletion) { |
434 | 1.87M | ++prepared_add->num_deletes; |
435 | 1.87M | } |
436 | 399M | return handle; |
437 | 399M | } |
438 | | |
439 | | void MemTable::ApplyPreparedAdd( |
440 | 47.5M | const KeyHandle* handle, size_t count, const PreparedAdd& prepared_add, bool allow_concurrent) { |
441 | 47.5M | if (!allow_concurrent) { |
442 | 445M | for (const auto* end = handle + count; handle != end; ++handle) {
443 | 398M | table_->Insert(*handle); |
444 | 398M | } |
445 | | |
446 | | // this is a bit ugly, but is the way to avoid locked instructions |
447 | | // when incrementing an atomic |
448 | 47.0M | num_entries_.store(num_entries_.load(std::memory_order_relaxed) + count, |
449 | 47.0M | std::memory_order_relaxed); |
450 | 47.0M | data_size_.store(data_size_.load(std::memory_order_relaxed) + prepared_add.total_encoded_len, |
451 | 47.0M | std::memory_order_relaxed); |
452 | 47.0M | if (prepared_add.num_deletes) { |
453 | 1.87M | num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + prepared_add.num_deletes, |
454 | 1.87M | std::memory_order_relaxed); |
455 | 1.87M | } |
456 | | |
457 | | // The first sequence number inserted into the memtable. |
458 | | // Multiple occurrences of the same sequence number in the write batch are allowed
459 | | // as long as they touch different keys. |
460 | 18.4E | DCHECK(first_seqno_ == 0 || prepared_add.min_seq_no >= first_seqno_) |
461 | 18.4E | << "first_seqno_: " << first_seqno_ << ", prepared_add.min_seq_no: " |
462 | 18.4E | << prepared_add.min_seq_no; |
463 | | |
464 | 47.0M | if (first_seqno_ == 0) { |
465 | 124k | first_seqno_.store(prepared_add.min_seq_no, std::memory_order_relaxed); |
466 | | |
467 | 124k | if (earliest_seqno_ == kMaxSequenceNumber) { |
468 | 3.05k | earliest_seqno_.store(GetFirstSequenceNumber(), |
469 | 3.05k | std::memory_order_relaxed); |
470 | 3.05k | } |
471 | 124k | DCHECK_GE(first_seqno_.load(), earliest_seqno_.load()); |
472 | 124k | } |
473 | 47.0M | } else { |
474 | 1.05M | for (const auto* end = handle + count; handle != end; ++handle) {
475 | 525k | table_->InsertConcurrently(*handle); |
476 | 525k | } |
477 | | |
478 | 525k | num_entries_.fetch_add(count, std::memory_order_relaxed); |
479 | 525k | data_size_.fetch_add(prepared_add.total_encoded_len, std::memory_order_relaxed); |
480 | 525k | if (prepared_add.num_deletes) { |
481 | 0 | num_deletes_.fetch_add(prepared_add.num_deletes, std::memory_order_relaxed); |
482 | 0 | } |
483 | | |
484 | | // atomically update first_seqno_ and earliest_seqno_. |
485 | 525k | uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed); |
486 | 525k | while ((cur_seq_num == 0 || prepared_add.min_seq_no < cur_seq_num) &&
487 | 525k | !first_seqno_.compare_exchange_weak(cur_seq_num, prepared_add.min_seq_no)) {
488 | 0 | } |
489 | 525k | uint64_t cur_earliest_seqno = |
490 | 525k | earliest_seqno_.load(std::memory_order_relaxed); |
491 | 525k | while ( |
492 | 525k | (cur_earliest_seqno == kMaxSequenceNumber || |
493 | 525k | prepared_add.min_seq_no < cur_earliest_seqno) &&
494 | 525k | !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, prepared_add.min_seq_no)) {
495 | 0 | } |
496 | 525k | } |
497 | | |
498 | 47.5M | UpdateFlushState(); |
499 | 47.5M | } |
500 | | |
501 | | // This comparator is used for deciding whether to erase a found key from a memtable instead of |
502 | | // writing a deletion mark. This is exactly what we need for erasing records in memory |
503 | | // (without writing new deletion marks). It expects a special key consisting of the user key being |
504 | | // erased followed by 8 0xff bytes as the first argument, and a key from a memtable as the second |
505 | | // argument (with the usual user_key + value_type + seqno format). |
506 | | // It returns zero if the user key parts of both arguments match and the second argument's value |
507 | | // type is not a deletion. |
508 | | // |
509 | | // Note: this comparator's return value cannot be used to establish order, |
510 | | // only to test for "equality" as defined above. |
511 | | class EraseHelperKeyComparator : public MemTableRep::KeyComparator { |
512 | | public: |
513 | | explicit EraseHelperKeyComparator(const Comparator* user_comparator, bool* had_delete) |
514 | 144M | : user_comparator_(user_comparator), had_delete_(had_delete) {} |
515 | | |
516 | 141M | int operator()(const char* prefix_len_key1, const char* prefix_len_key2) const override { |
517 | | // Internal keys are encoded as length-prefixed strings. |
518 | 141M | Slice k1 = GetLengthPrefixedSlice(prefix_len_key1); |
519 | 141M | Slice k2 = GetLengthPrefixedSlice(prefix_len_key2); |
520 | 141M | return Compare(k1, k2); |
521 | 141M | } |
522 | | |
523 | 0 | int operator()(const char* prefix_len_key, const Slice& key) const override { |
524 | | // Internal keys are encoded as length-prefixed strings. |
525 | 0 | Slice a = GetLengthPrefixedSlice(prefix_len_key); |
526 | 0 | return Compare(a, key); |
527 | 0 | } |
528 | | |
529 | 141M | int Compare(const Slice& a, const Slice& b) const { |
530 | 141M | auto user_b = ExtractUserKey(b); |
531 | 141M | auto result = user_comparator_->Compare(ExtractUserKey(a), user_b); |
532 | 141M | if (result == 0) { |
533 | | // This comparator is used only to check whether we should delete the entry we found. |
534 | | // So any non zero result should satisfy our needs. |
535 | | // `b` is a value stored in mem table, so we check only it. |
536 | | // `a` is key that we created for erase and user key is always followed by eight 0xff. |
537 | 136M | auto value_type = static_cast<ValueType>(b[user_b.size()]); |
538 | 136M | DCHECK_LE(value_type, ValueType::kTypeColumnFamilySingleDeletion); |
539 | 136M | if (value_type == ValueType::kTypeSingleDeletion || |
540 | 136M | value_type == ValueType::kTypeColumnFamilySingleDeletion) { |
541 | 0 | *had_delete_ = true; |
542 | 0 | return -1; |
543 | 0 | } |
544 | 136M | } |
545 | 141M | return result; |
546 | 141M | } |
547 | | |
548 | | private: |
549 | | const Comparator* user_comparator_; |
550 | | bool* had_delete_; |
551 | | }; |
552 | | |
553 | 144M | bool MemTable::Erase(const Slice& user_key) { |
554 | 144M | uint32_t user_key_size = static_cast<uint32_t>(user_key.size()); |
555 | 144M | uint32_t internal_key_size = user_key_size + 8; |
556 | 144M | const uint32_t encoded_len = VarintLength(internal_key_size) + internal_key_size; |
557 | | |
558 | 144M | if (erase_key_buffer_.size() < encoded_len) { |
559 | 112k | erase_key_buffer_.resize(encoded_len); |
560 | 112k | } |
561 | 144M | char* buf = erase_key_buffer_.data(); |
562 | 144M | char* p = EncodeVarint32(buf, internal_key_size); |
563 | 144M | memcpy(p, user_key.data(), user_key_size); |
564 | 144M | p += user_key_size; |
565 | | // Fill key tail with 0xffffffffffffffff so it will be less than any actual key with this user key.
566 | | // Please note descending order is used for key tail. |
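| | // For example, erasing user key "k1" builds the probe key 0x0A 'k' '1'
| | // followed by eight 0xFF bytes (varint32 of internal_key_size 10 = 2 + 8).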
567 | 144M | EncodeFixed64(p, -1LL); |
568 | 144M | bool had_delete = false; |
569 | 144M | EraseHelperKeyComparator only_user_key_comparator( |
570 | 144M | comparator_.comparator.user_comparator(), &had_delete); |
571 | 144M | if (table_->Erase(buf, only_user_key_comparator)) { |
572 | | // this is a bit ugly, but is the way to avoid locked instructions |
573 | | // when incrementing an atomic |
574 | 136M | num_erased_.store(num_erased_.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed); |
575 | | |
576 | 136M | UpdateFlushState(); |
577 | 136M | return true; |
578 | 136M | } else if (had_delete) { // Do nothing if we have already seen a delete.
579 | 0 | return true; |
580 | 0 | } |
581 | | |
582 | 8.03M | return false; |
583 | 144M | } |
584 | | |
585 | | // Callback from MemTable::Get() |
586 | | namespace { |
587 | | |
588 | | struct Saver { |
589 | | Status* status; |
590 | | const LookupKey* key; |
591 | | bool* found_final_value; // Is value set correctly? Used by KeyMayExist |
592 | | bool* merge_in_progress; |
593 | | std::string* value; |
594 | | SequenceNumber seq; |
595 | | const MergeOperator* merge_operator; |
596 | | // the merge operations encountered; |
597 | | MergeContext* merge_context; |
598 | | MemTable* mem; |
599 | | Logger* logger; |
600 | | Statistics* statistics; |
601 | | bool inplace_update_support; |
602 | | Env* env_; |
603 | | }; |
604 | | } // namespace |
605 | | |
606 | 15.9M | static bool SaveValue(void* arg, const char* entry) { |
607 | 15.9M | Saver* s = reinterpret_cast<Saver*>(arg); |
608 | 15.9M | MergeContext* merge_context = s->merge_context; |
609 | 15.9M | const MergeOperator* merge_operator = s->merge_operator; |
610 | | |
611 | 15.9M | assert(s != nullptr && merge_context != nullptr); |
612 | | |
613 | | // entry format is: |
614 | | // klength varint32 |
615 | | // userkey char[klength-8] |
616 | | // tag uint64 |
617 | | // vlength varint32 |
618 | | // value char[vlength] |
619 | | // Check that it belongs to same user key. We do not check the |
620 | | // sequence number since the Seek() call above should have skipped |
621 | | // all entries with overly large sequence numbers. |
622 | 0 | uint32_t key_length; |
623 | 15.9M | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
624 | 15.9M | if (s->mem->GetInternalKeyComparator().user_comparator()->Equal( |
625 | 15.9M | Slice(key_ptr, key_length - 8), s->key->user_key())) { |
626 | | // Correct user key |
627 | 13.9M | auto seq_and_type = UnPackSequenceAndTypeFromEnd(key_ptr + key_length); |
628 | 13.9M | s->seq = seq_and_type.sequence; |
629 | | |
630 | 13.9M | switch (seq_and_type.type) { |
631 | 13.8M | case kTypeValue: { |
632 | 13.8M | if (s->inplace_update_support) { |
633 | 1.53k | s->mem->GetLock(s->key->user_key())->ReadLock(); |
634 | 1.53k | } |
635 | 13.8M | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
636 | 13.8M | *(s->status) = Status::OK(); |
637 | 13.8M | if (*(s->merge_in_progress)) { |
638 | 34 | assert(merge_operator); |
639 | 0 | bool merge_success = false; |
640 | 34 | { |
641 | 34 | StopWatchNano timer(s->env_, s->statistics != nullptr); |
642 | 34 | PERF_TIMER_GUARD(merge_operator_time_nanos); |
643 | 34 | merge_success = merge_operator->FullMerge( |
644 | 34 | s->key->user_key(), &v, merge_context->GetOperands(), s->value, |
645 | 34 | s->logger); |
646 | 34 | RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME, |
647 | 34 | timer.ElapsedNanos()); |
648 | 34 | } |
649 | 34 | if (!merge_success) { |
650 | 0 | RecordTick(s->statistics, NUMBER_MERGE_FAILURES); |
651 | 0 | *(s->status) = |
652 | 0 | STATUS(Corruption, "Error: Could not perform merge."); |
653 | 0 | } |
654 | 13.8M | } else if (s->value != nullptr) {
655 | 13.8M | s->value->assign(v.cdata(), v.size()); |
656 | 13.8M | } |
657 | 13.8M | if (s->inplace_update_support) { |
658 | 1.53k | s->mem->GetLock(s->key->user_key())->ReadUnlock(); |
659 | 1.53k | } |
660 | 13.8M | *(s->found_final_value) = true; |
661 | 13.8M | return false; |
662 | 0 | } |
663 | 1.06k | case kTypeDeletion: |
664 | 1.12k | case kTypeSingleDeletion: { |
665 | 1.12k | if (*(s->merge_in_progress)) { |
666 | 16 | assert(merge_operator != nullptr); |
667 | 0 | *(s->status) = Status::OK(); |
668 | 16 | bool merge_success = false; |
669 | 16 | { |
670 | 16 | StopWatchNano timer(s->env_, s->statistics != nullptr); |
671 | 16 | PERF_TIMER_GUARD(merge_operator_time_nanos); |
672 | 16 | merge_success = merge_operator->FullMerge( |
673 | 16 | s->key->user_key(), nullptr, merge_context->GetOperands(), |
674 | 16 | s->value, s->logger); |
675 | 16 | RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME, |
676 | 16 | timer.ElapsedNanos()); |
677 | 16 | } |
678 | 16 | if (!merge_success) { |
679 | 0 | RecordTick(s->statistics, NUMBER_MERGE_FAILURES); |
680 | 0 | *(s->status) = |
681 | 0 | STATUS(Corruption, "Error: Could not perform merge."); |
682 | 0 | } |
683 | 1.11k | } else { |
684 | 1.11k | *(s->status) = STATUS(NotFound, ""); |
685 | 1.11k | } |
686 | 0 | *(s->found_final_value) = true; |
687 | 1.12k | return false; |
688 | 1.06k | } |
689 | 44.3k | case kTypeMerge: { |
690 | 44.3k | if (!merge_operator) { |
691 | 0 | *(s->status) = STATUS(InvalidArgument, |
692 | 0 | "merge_operator is not properly initialized."); |
693 | | // Normally we continue the loop (return true) when we see a merge |
694 | | // operand. But in case of an error, we should stop the loop |
695 | | // immediately and pretend we have found the value to stop further |
696 | | // seek. Otherwise, the later call will override this error status. |
697 | 0 | *(s->found_final_value) = true; |
698 | 0 | return false; |
699 | 0 | } |
700 | 44.3k | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
701 | 44.3k | *(s->merge_in_progress) = true; |
702 | 44.3k | merge_context->PushOperand(v); |
703 | 44.3k | return true; |
704 | 44.3k | } |
705 | 0 | default: |
706 | 0 | assert(false); |
707 | 0 | return true; |
708 | 13.9M | } |
709 | 13.9M | } |
710 | | |
711 | | // s->status could be Corruption, MergeInProgress or NotFound.
712 | 1.97M | return false; |
713 | 15.9M | } |
714 | | |
715 | | bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, |
716 | 21.4M | MergeContext* merge_context, SequenceNumber* seq) { |
717 | | // The sequence number is updated synchronously in version_set.h |
718 | 21.4M | if (IsEmpty()) { |
719 | | // Avoiding recording stats for speed. |
720 | | // Avoid recording stats, for speed.
721 | 5.29M | } |
722 | 16.1M | PERF_TIMER_GUARD(get_from_memtable_time); |
723 | | |
724 | 16.1M | Slice user_key = key.user_key(); |
725 | 16.1M | bool found_final_value = false; |
726 | 16.1M | bool merge_in_progress = s->IsMergeInProgress(); |
727 | 16.1M | bool const may_contain = |
728 | 16.1M | nullptr == prefix_bloom_ |
729 | 16.1M | ? false
730 | 16.1M | : prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key));
731 | 16.1M | if (prefix_bloom_ && !may_contain) {
732 | | // Skip the lookup entirely: the prefix bloom filter says the key does not exist.
733 | 19 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
734 | 19 | *seq = kMaxSequenceNumber; |
735 | 16.1M | } else { |
736 | 16.1M | if (prefix_bloom_) { |
737 | 110 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
738 | 110 | } |
739 | 16.1M | Saver saver; |
740 | 16.1M | saver.status = s; |
741 | 16.1M | saver.found_final_value = &found_final_value; |
742 | 16.1M | saver.merge_in_progress = &merge_in_progress; |
743 | 16.1M | saver.key = &key; |
744 | 16.1M | saver.value = value; |
745 | 16.1M | saver.seq = kMaxSequenceNumber; |
746 | 16.1M | saver.mem = this; |
747 | 16.1M | saver.merge_context = merge_context; |
748 | 16.1M | saver.merge_operator = moptions_.merge_operator; |
749 | 16.1M | saver.logger = moptions_.info_log; |
750 | 16.1M | saver.inplace_update_support = moptions_.inplace_update_support; |
751 | 16.1M | saver.statistics = moptions_.statistics; |
752 | 16.1M | saver.env_ = env_; |
753 | 16.1M | table_->Get(key, &saver, SaveValue); |
754 | | |
755 | 16.1M | *seq = saver.seq; |
756 | 16.1M | } |
757 | | |
758 | | // No change to value, since we have not yet found a Put/Delete |
759 | 16.1M | if (!found_final_value && merge_in_progress) {
760 | 1.55k | *s = STATUS(MergeInProgress, ""); |
761 | 1.55k | } |
762 | 16.1M | PERF_COUNTER_ADD(get_from_memtable_count, 1); |
763 | 16.1M | return found_final_value; |
764 | 21.4M | } |
765 | | |
766 | | void MemTable::Update(SequenceNumber seq, |
767 | | const Slice& key, |
768 | 100 | const Slice& value) { |
769 | 100 | LookupKey lkey(key, seq); |
770 | 100 | Slice mem_key = lkey.memtable_key(); |
771 | | |
772 | 100 | std::unique_ptr<MemTableRep::Iterator> iter( |
773 | 100 | table_->GetDynamicPrefixIterator()); |
774 | 100 | iter->Seek(lkey.internal_key(), mem_key.cdata()); |
775 | | |
776 | 100 | if (iter->Valid()) { |
777 | | // entry format is: |
778 | | // key_length varint32 |
779 | | // userkey char[klength-8] |
780 | | // tag uint64 |
781 | | // vlength varint32 |
782 | | // value char[vlength] |
783 | | // Check that it belongs to same user key. We do not check the |
784 | | // sequence number since the Seek() call above should have skipped |
785 | | // all entries with overly large sequence numbers. |
786 | 90 | const char* entry = iter->key(); |
787 | 90 | uint32_t key_length = 0; |
788 | 90 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
789 | 90 | if (comparator_.comparator.user_comparator()->Equal( |
790 | 90 | Slice(key_ptr, key_length - 8), lkey.user_key())) { |
791 | | // Correct user key |
792 | 90 | auto seq_and_type = UnPackSequenceAndTypeFromEnd(key_ptr + key_length); |
793 | 90 | switch (seq_and_type.type) { |
794 | 90 | case kTypeValue: { |
795 | 90 | Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); |
796 | 90 | uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); |
797 | 90 | uint32_t new_size = static_cast<uint32_t>(value.size()); |
798 | | |
799 | | // Update value, if new value size <= previous value size |
800 | 90 | if (new_size <= prev_size) {
801 | 45 | char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length, |
802 | 45 | new_size); |
803 | 45 | WriteLock wl(GetLock(lkey.user_key())); |
804 | 45 | memcpy(p, value.data(), value.size()); |
805 | 45 | assert((unsigned)((p + value.size()) - entry) == |
806 | 45 | (unsigned)(VarintLength(key_length) + key_length + |
807 | 45 | VarintLength(value.size()) + value.size())); |
808 | 0 | return; |
809 | 45 | } |
810 | | // TODO (YugaByte): verify this is not a bug. The behavior for kTypeValue in case there
811 | | // is not enough room for an in-place update is to fall through and add a new entry.
812 | 90 | FALLTHROUGH_INTENDED;
813 | 45 | } |
814 | 45 | default: |
815 | | // If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData |
816 | | // we don't have enough space for update inplace |
817 | 45 | Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1)); |
818 | 45 | return; |
819 | 90 | } |
820 | 90 | } |
821 | 90 | } |
822 | | |
823 | | // key doesn't exist |
824 | 10 | Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1)); |
825 | 10 | } |
826 | | |
827 | | bool MemTable::UpdateCallback(SequenceNumber seq, |
828 | | const Slice& key, |
829 | 1.44k | const Slice& delta) { |
830 | 1.44k | LookupKey lkey(key, seq); |
831 | 1.44k | Slice memkey = lkey.memtable_key(); |
832 | | |
833 | 1.44k | std::unique_ptr<MemTableRep::Iterator> iter( |
834 | 1.44k | table_->GetDynamicPrefixIterator()); |
835 | 1.44k | iter->Seek(lkey.internal_key(), memkey.cdata()); |
836 | | |
837 | 1.44k | if (iter->Valid()) { |
838 | | // entry format is: |
839 | | // key_length varint32 |
840 | | // userkey char[klength-8] |
841 | | // tag uint64 |
842 | | // vlength varint32 |
843 | | // value char[vlength] |
844 | | // Check that it belongs to same user key. We do not check the |
845 | | // sequence number since the Seek() call above should have skipped |
846 | | // all entries with overly large sequence numbers. |
847 | 1.42k | const char* entry = iter->key(); |
848 | 1.42k | uint32_t key_length = 0; |
849 | 1.42k | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
850 | 1.42k | if (comparator_.comparator.user_comparator()->Equal( |
851 | 1.42k | Slice(key_ptr, key_length - 8), lkey.user_key())) { |
852 | | // Correct user key |
853 | 1.42k | switch (UnPackSequenceAndTypeFromEnd(key_ptr + key_length).type) { |
854 | 1.42k | case kTypeValue: { |
855 | 1.42k | Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); |
856 | 1.42k | uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); |
857 | | |
858 | 1.42k | char* prev_buffer = const_cast<char*>(prev_value.cdata()); |
859 | 1.42k | uint32_t new_prev_size = prev_size; |
860 | | |
861 | 1.42k | std::string str_value; |
862 | 1.42k | WriteLock wl(GetLock(lkey.user_key())); |
863 | 1.42k | auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size, |
864 | 1.42k | delta, &str_value); |
865 | 1.42k | if (status == UpdateStatus::UPDATED_INPLACE) { |
866 | | // Value already updated by callback. |
867 | 1.37k | assert(new_prev_size <= prev_size); |
868 | 1.37k | if (new_prev_size < prev_size) { |
869 | | // overwrite the new prev_size |
870 | 55 | char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length, |
871 | 55 | new_prev_size); |
872 | 55 | if (VarintLength(new_prev_size) < VarintLength(prev_size)) { |
873 | | // shift the value buffer as well. |
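| | // e.g. a 200-byte value (2-byte varint prefix) shrunk to 100 bytes (1-byte
| | // prefix) must have its bytes moved left to stay adjacent to the new prefix.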
874 | 5 | memcpy(p, prev_buffer, new_prev_size); |
875 | 5 | } |
876 | 55 | } |
877 | 1.37k | RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); |
878 | 1.37k | UpdateFlushState(); |
879 | 1.37k | return true; |
880 | 1.37k | } else if (status == UpdateStatus::UPDATED) {
881 | 45 | Slice value(str_value); |
882 | 45 | Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1)); |
883 | 45 | RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); |
884 | 45 | UpdateFlushState(); |
885 | 45 | return true; |
886 | 45 | } else if (status == UpdateStatus::UPDATE_FAILED) {
887 | | // No action required. Return. |
888 | 0 | UpdateFlushState(); |
889 | 0 | return true; |
890 | 0 | } |
891 | 1.42k | FALLTHROUGH_INTENDED;
892 | 0 | } |
893 | 0 | default: |
894 | 0 | break; |
895 | 1.42k | } |
896 | 1.42k | } |
897 | 1.42k | } |
898 | | // If the latest value is not kTypeValue |
899 | | // or key doesn't exist |
900 | 20 | return false; |
901 | 1.44k | } |
902 | | |
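| | // Counts the run of consecutive merge entries for this key starting at the
| | // lookup sequence number; callers use this to enforce max_successive_merges
| | // (see MemTableOptions above).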
903 | 134 | size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) { |
904 | 134 | Slice memkey = key.memtable_key(); |
905 | | |
906 | | // A total ordered iterator is costly for some memtablerep (prefix aware |
907 | | // reps). By passing in the user key, we allow efficient iterator creation. |
908 | | // The iterator only needs to be ordered within the same user key. |
909 | 134 | std::unique_ptr<MemTableRep::Iterator> iter( |
910 | 134 | table_->GetDynamicPrefixIterator()); |
911 | 134 | iter->Seek(key.internal_key(), memkey.cdata()); |
912 | | |
913 | 134 | size_t num_successive_merges = 0; |
914 | | |
915 | 446 | for (; iter->Valid(); iter->Next()) {
916 | 446 | const char* entry = iter->key(); |
917 | 446 | uint32_t key_length = 0; |
918 | 446 | const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
919 | 446 | if (!comparator_.comparator.user_comparator()->Equal( |
920 | 446 | Slice(iter_key_ptr, key_length - 8), key.user_key())) { |
921 | 12 | break; |
922 | 12 | } |
923 | | |
924 | 434 | if (UnPackSequenceAndTypeFromEnd(iter_key_ptr + key_length).type != kTypeMerge) { |
925 | 122 | break; |
926 | 122 | } |
927 | | |
928 | 312 | ++num_successive_merges; |
929 | 312 | } |
930 | | |
931 | 134 | return num_successive_merges; |
932 | 134 | } |
933 | | |
934 | 2.46M | UserFrontierPtr MemTable::GetFrontier(UpdateUserValueType type) const { |
935 | 2.46M | std::lock_guard<SpinMutex> l(frontiers_mutex_); |
936 | 2.46M | if (!frontiers_) { |
937 | 0 | return nullptr; |
938 | 0 | } |
939 | | |
940 | 2.46M | switch (type) { |
941 | 7.15k | case UpdateUserValueType::kSmallest: |
942 | 7.15k | return frontiers_->Smallest().Clone(); |
943 | 2.45M | case UpdateUserValueType::kLargest: |
944 | 2.45M | return frontiers_->Largest().Clone(); |
945 | 2.46M | } |
946 | | |
947 | 0 | FATAL_INVALID_ENUM_VALUE(UpdateUserValueType, type); |
948 | 0 | } |
949 | | |
950 | | void MemTableRep::Get(const LookupKey& k, void* callback_args, |
951 | 0 | bool (*callback_func)(void* arg, const char* entry)) { |
952 | 0 | auto iter = GetDynamicPrefixIterator(); |
953 | 0 | for (iter->Seek(k.internal_key(), k.memtable_key().cdata()); |
954 | 0 | iter->Valid() && callback_func(callback_args, iter->key()); |
955 | 0 | iter->Next()) { |
956 | 0 | } |
957 | 0 | } |
958 | | |
959 | | } // namespace rocksdb |