/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #ifndef ROCKSDB_LITE |
22 | | |
23 | | #include "yb/rocksdb/utilities/write_batch_with_index.h" |
24 | | |
25 | | #include <limits> |
26 | | #include <memory> |
27 | | |
28 | | #include "yb/rocksdb/db/column_family.h" |
29 | | #include "yb/rocksdb/db/merge_context.h" |
30 | | #include "yb/rocksdb/db/merge_helper.h" |
31 | | #include "yb/rocksdb/db/skiplist.h" |
32 | | #include "yb/rocksdb/comparator.h" |
33 | | #include "yb/rocksdb/iterator.h" |
34 | | #include "yb/rocksdb/util/arena.h" |
35 | | #include "yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h" |
36 | | |
37 | | namespace rocksdb { |
38 | | |
39 | | // when direction == forward |
40 | | // * current_at_base_ <=> base_iterator > delta_iterator |
41 | | // when direction == backwards |
42 | | // * current_at_base_ <=> base_iterator < delta_iterator |
43 | | // always: |
44 | | // * equal_keys_ <=> base_iterator == delta_iterator |
45 | | class BaseDeltaIterator : public Iterator { |
46 | | public: |
47 | | BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator, |
48 | | const Comparator* comparator) |
49 | | : forward_(true), |
50 | | current_at_base_(true), |
51 | | equal_keys_(false), |
52 | | status_(Status::OK()), |
53 | | base_iterator_(base_iterator), |
54 | | delta_iterator_(delta_iterator), |
55 | 80 | comparator_(comparator) {} |
56 | | |
57 | 80 | virtual ~BaseDeltaIterator() {} |
58 | | |
59 | 512k | bool Valid() const override { |
60 | 504k | return current_at_base_ ? BaseValid() : DeltaValid(); |
61 | 512k | } |
62 | | |
63 | 1.63k | void SeekToFirst() override { |
64 | 1.63k | forward_ = true; |
65 | 1.63k | base_iterator_->SeekToFirst(); |
66 | 1.63k | delta_iterator_->SeekToFirst(); |
67 | 1.63k | UpdateCurrent(); |
68 | 1.63k | } |
69 | | |
70 | 1.61k | void SeekToLast() override { |
71 | 1.61k | forward_ = false; |
72 | 1.61k | base_iterator_->SeekToLast(); |
73 | 1.61k | delta_iterator_->SeekToLast(); |
74 | 1.61k | UpdateCurrent(); |
75 | 1.61k | } |
76 | | |
77 | 252k | void Seek(const Slice& k) override { |
78 | 252k | forward_ = true; |
79 | 252k | base_iterator_->Seek(k); |
80 | 252k | delta_iterator_->Seek(k); |
81 | 252k | UpdateCurrent(); |
82 | 252k | } |
83 | | |
84 | 120k | void Next() override { |
85 | 120k | if (!Valid()) { |
86 | 0 | status_ = STATUS(NotSupported, "Next() on invalid iterator"); |
87 | 0 | } |
88 | | |
89 | 120k | if (!forward_) { |
90 | | // Need to change direction |
91 | | // if our direction was backward and we're not equal, we have two states: |
92 | | // * both iterators are valid: we're already in a good state (current |
93 | | // shows to smaller) |
94 | | // * only one iterator is valid: we need to advance that iterator |
95 | 29.3k | forward_ = true; |
96 | 29.3k | equal_keys_ = false; |
97 | 29.3k | if (!BaseValid()) { |
98 | 487 | assert(DeltaValid()); |
99 | 487 | base_iterator_->SeekToFirst(); |
100 | 28.8k | } else if (!DeltaValid()) { |
101 | 4 | delta_iterator_->SeekToFirst(); |
102 | 28.8k | } else if (current_at_base_) { |
103 | | // Change delta from larger than base to smaller |
104 | 180 | AdvanceDelta(); |
105 | 28.6k | } else { |
106 | | // Change base from larger than delta to smaller |
107 | 28.6k | AdvanceBase(); |
108 | 28.6k | } |
109 | 29.3k | if (DeltaValid() && BaseValid()) { |
110 | 29.2k | if (comparator_->Equal(delta_iterator_->Entry().key, |
111 | 0 | base_iterator_->key())) { |
112 | 0 | equal_keys_ = true; |
113 | 0 | } |
114 | 29.2k | } |
115 | 29.3k | } |
116 | 120k | Advance(); |
117 | 120k | } |
118 | | |
119 | 120k | void Prev() override { |
120 | 120k | if (!Valid()) { |
121 | 0 | status_ = STATUS(NotSupported, "Prev() on invalid iterator"); |
122 | 0 | } |
123 | | |
124 | 120k | if (forward_) { |
125 | | // Need to change direction |
126 | | // if our direction was backward and we're not equal, we have two states: |
127 | | // * both iterators are valid: we're already in a good state (current |
128 | | // shows to smaller) |
129 | | // * only one iterator is valid: we need to advance that iterator |
130 | 90.9k | forward_ = false; |
131 | 90.9k | equal_keys_ = false; |
132 | 90.9k | if (!BaseValid()) { |
133 | 4 | assert(DeltaValid()); |
134 | 4 | base_iterator_->SeekToLast(); |
135 | 90.9k | } else if (!DeltaValid()) { |
136 | 35 | delta_iterator_->SeekToLast(); |
137 | 90.8k | } else if (current_at_base_) { |
138 | | // Change delta from less advanced than base to more advanced |
139 | 227 | AdvanceDelta(); |
140 | 90.6k | } else { |
141 | | // Change base from less advanced than delta to more advanced |
142 | 90.6k | AdvanceBase(); |
143 | 90.6k | } |
144 | 90.9k | if (DeltaValid() && BaseValid()) { |
145 | 89.0k | if (comparator_->Equal(delta_iterator_->Entry().key, |
146 | 0 | base_iterator_->key())) { |
147 | 0 | equal_keys_ = true; |
148 | 0 | } |
149 | 89.0k | } |
150 | 90.9k | } |
151 | | |
152 | 120k | Advance(); |
153 | 120k | } |
154 | | |
155 | 6.39k | Slice key() const override { |
156 | 2.09k | return current_at_base_ ? base_iterator_->key() |
157 | 4.29k | : delta_iterator_->Entry().key; |
158 | 6.39k | } |
159 | | |
160 | 6.39k | Slice value() const override { |
161 | 2.10k | return current_at_base_ ? base_iterator_->value() |
162 | 4.29k | : delta_iterator_->Entry().value; |
163 | 6.39k | } |
164 | | |
165 | 8.42k | Status status() const override { |
166 | 8.42k | if (!status_.ok()) { |
167 | 0 | return status_; |
168 | 0 | } |
169 | 8.42k | if (!base_iterator_->status().ok()) { |
170 | 0 | return base_iterator_->status(); |
171 | 0 | } |
172 | 8.42k | return delta_iterator_->status(); |
173 | 8.42k | } |
174 | | |
175 | | private: |
176 | 0 | void AssertInvariants() { |
177 | 0 | #ifndef NDEBUG |
178 | 0 | if (!Valid()) { |
179 | 0 | return; |
180 | 0 | } |
181 | 0 | if (!BaseValid()) { |
182 | 0 | assert(!current_at_base_ && delta_iterator_->Valid()); |
183 | 0 | return; |
184 | 0 | } |
185 | 0 | if (!DeltaValid()) { |
186 | 0 | assert(current_at_base_ && base_iterator_->Valid()); |
187 | 0 | return; |
188 | 0 | } |
189 | 0 | // we don't support those yet |
190 | 0 | assert(delta_iterator_->Entry().type != kMergeRecord && |
191 | 0 | delta_iterator_->Entry().type != kLogDataRecord); |
192 | 0 | int compare = comparator_->Compare(delta_iterator_->Entry().key, |
193 | 0 | base_iterator_->key()); |
194 | 0 | if (forward_) { |
195 | 0 | // current_at_base -> compare < 0 |
196 | 0 | assert(!current_at_base_ || compare < 0); |
197 | 0 | // !current_at_base -> compare <= 0 |
198 | 0 | assert(current_at_base_ && compare >= 0); |
199 | 0 | } else { |
200 | 0 | // current_at_base -> compare > 0 |
201 | 0 | assert(!current_at_base_ || compare > 0); |
202 | 0 | // !current_at_base -> compare <= 0 |
203 | 0 | assert(current_at_base_ && compare <= 0); |
204 | 0 | } |
205 | 0 | // equal_keys_ <=> compare == 0 |
206 | 0 | assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0)); |
207 | 0 | #endif |
208 | 0 | } |
209 | | |
210 | 240k | void Advance() { |
211 | 240k | if (equal_keys_) { |
212 | 60.4k | assert(BaseValid() && DeltaValid()); |
213 | 60.4k | AdvanceBase(); |
214 | 60.4k | AdvanceDelta(); |
215 | 180k | } else { |
216 | 180k | if (current_at_base_) { |
217 | 883 | assert(BaseValid()); |
218 | 883 | AdvanceBase(); |
219 | 179k | } else { |
220 | 179k | assert(DeltaValid()); |
221 | 179k | AdvanceDelta(); |
222 | 179k | } |
223 | 180k | } |
224 | 240k | UpdateCurrent(); |
225 | 240k | } |
226 | | |
227 | 804k | void AdvanceDelta() { |
228 | 804k | if (forward_) { |
229 | 500k | delta_iterator_->Next(); |
230 | 303k | } else { |
231 | 303k | delta_iterator_->Prev(); |
232 | 303k | } |
233 | 804k | } |
234 | 466k | void AdvanceBase() { |
235 | 466k | if (forward_) { |
236 | 268k | base_iterator_->Next(); |
237 | 198k | } else { |
238 | 198k | base_iterator_->Prev(); |
239 | 198k | } |
240 | 466k | } |
241 | 1.37M | bool BaseValid() const { return base_iterator_->Valid(); } |
242 | 3.10M | bool DeltaValid() const { return delta_iterator_->Valid(); } |
243 | 495k | void UpdateCurrent() { |
244 | 1.06M | while (true) { |
245 | 1.06M | WriteEntry delta_entry; |
246 | 1.06M | if (DeltaValid()) { |
247 | 1.04M | delta_entry = delta_iterator_->Entry(); |
248 | 1.04M | } |
249 | 1.06M | equal_keys_ = false; |
250 | 1.06M | if (!BaseValid()) { |
251 | | // Base has finished. |
252 | 17.4k | if (!DeltaValid()) { |
253 | | // Finished |
254 | 12.9k | return; |
255 | 12.9k | } |
256 | 4.47k | if (delta_entry.type == kDeleteRecord || |
257 | 2.42k | delta_entry.type == kSingleDeleteRecord) { |
258 | 2.42k | AdvanceDelta(); |
259 | 2.05k | } else { |
260 | 2.05k | current_at_base_ = false; |
261 | 2.05k | return; |
262 | 2.05k | } |
263 | 1.04M | } else if (!DeltaValid()) { |
264 | | // Delta has finished. |
265 | 185 | current_at_base_ = true; |
266 | 185 | return; |
267 | 1.04M | } else { |
268 | 1.04M | int compare = |
269 | 745k | (forward_ ? 1 : -1) * |
270 | 1.04M | comparator_->Compare(delta_entry.key, base_iterator_->key()); |
271 | 1.04M | if (compare <= 0) { // delta bigger or equal |
272 | 1.04M | if (compare == 0) { |
273 | 529k | equal_keys_ = true; |
274 | 529k | } |
275 | 1.04M | if (delta_entry.type != kDeleteRecord && |
276 | 478k | delta_entry.type != kSingleDeleteRecord) { |
277 | 478k | current_at_base_ = false; |
278 | 478k | return; |
279 | 478k | } |
280 | | // Delta is less advanced and is delete. |
281 | 561k | AdvanceDelta(); |
282 | 561k | if (equal_keys_) { |
283 | 286k | AdvanceBase(); |
284 | 286k | } |
285 | 1.95k | } else { |
286 | 1.95k | current_at_base_ = true; |
287 | 1.95k | return; |
288 | 1.95k | } |
289 | 1.04M | } |
290 | 1.06M | } |
291 | | |
292 | 0 | AssertInvariants(); |
293 | 0 | } |
294 | | |
295 | | bool forward_; |
296 | | bool current_at_base_; |
297 | | bool equal_keys_; |
298 | | Status status_; |
299 | | std::unique_ptr<Iterator> base_iterator_; |
300 | | std::unique_ptr<WBWIIterator> delta_iterator_; |
301 | | const Comparator* comparator_; // not owned |
302 | | }; |
303 | | |
// Skip list of index entries ordered by WriteBatchEntryComparator; the nodes
// are allocated from the Arena owned by WriteBatchWithIndex::Rep.
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
    WriteBatchEntrySkipList;
306 | | |
307 | | class WBWIIteratorImpl : public WBWIIterator { |
308 | | public: |
309 | | WBWIIteratorImpl(uint32_t column_family_id, |
310 | | WriteBatchEntrySkipList* skip_list, |
311 | | const ReadableWriteBatch* write_batch) |
312 | | : column_family_id_(column_family_id), |
313 | | skip_list_iter_(skip_list), |
314 | 501k | write_batch_(write_batch) {} |
315 | | |
316 | 501k | virtual ~WBWIIteratorImpl() {} |
317 | | |
318 | 3.60M | bool Valid() const override { |
319 | 3.60M | if (!skip_list_iter_.Valid()) { |
320 | 39.9k | return false; |
321 | 39.9k | } |
322 | 3.56M | const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); |
323 | 3.56M | return (iter_entry != nullptr && |
324 | 3.56M | iter_entry->column_family == column_family_id_); |
325 | 3.56M | } |
326 | | |
327 | 1.66k | void SeekToFirst() override { |
328 | 1.66k | WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, |
329 | 1.66k | column_family_id_); |
330 | 1.66k | skip_list_iter_.Seek(&search_entry); |
331 | 1.66k | } |
332 | | |
333 | 1.84k | void SeekToLast() override { |
334 | 1.84k | WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, |
335 | 1.84k | column_family_id_ + 1); |
336 | 1.84k | skip_list_iter_.Seek(&search_entry); |
337 | 1.84k | if (!skip_list_iter_.Valid()) { |
338 | 1.41k | skip_list_iter_.SeekToLast(); |
339 | 431 | } else { |
340 | 431 | skip_list_iter_.Prev(); |
341 | 431 | } |
342 | 1.84k | } |
343 | | |
344 | 753k | void Seek(const Slice& key) override { |
345 | 753k | WriteBatchIndexEntry search_entry(&key, column_family_id_); |
346 | 753k | skip_list_iter_.Seek(&search_entry); |
347 | 753k | } |
348 | | |
349 | 500k | void Next() override { skip_list_iter_.Next(); } |
350 | | |
351 | 303k | void Prev() override { skip_list_iter_.Prev(); } |
352 | | |
353 | 1.67M | WriteEntry Entry() const override { |
354 | 1.67M | WriteEntry ret; |
355 | 1.67M | Slice blob; |
356 | 1.67M | const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); |
357 | | // this is guaranteed with Valid() |
358 | 1.67M | assert(iter_entry != nullptr && |
359 | 1.67M | iter_entry->column_family == column_family_id_); |
360 | 1.67M | auto s = write_batch_->GetEntryFromDataOffset(iter_entry->offset, &ret.type, |
361 | 1.67M | &ret.key, &ret.value, &blob); |
362 | 1.67M | assert(s.ok()); |
363 | 1.67M | assert(ret.type == kPutRecord || ret.type == kDeleteRecord || |
364 | 1.67M | ret.type == kSingleDeleteRecord || ret.type == kMergeRecord); |
365 | 1.67M | return ret; |
366 | 1.67M | } |
367 | | |
368 | 8.61k | Status status() const override { |
369 | | // this is in-memory data structure, so the only way status can be non-ok is |
370 | | // through memory corruption |
371 | 8.61k | return Status::OK(); |
372 | 8.61k | } |
373 | | |
374 | 499k | const WriteBatchIndexEntry* GetRawEntry() const { |
375 | 499k | return skip_list_iter_.key(); |
376 | 499k | } |
377 | | |
378 | | private: |
379 | | uint32_t column_family_id_; |
380 | | WriteBatchEntrySkipList::Iterator skip_list_iter_; |
381 | | const ReadableWriteBatch* write_batch_; |
382 | | }; |
383 | | |
// Internal state of WriteBatchWithIndex: the write batch itself plus a
// skip-list index over its records.
struct WriteBatchWithIndex::Rep {
  // `index_comparator` orders keys in the index (per-CF comparators may be
  // registered later); `reserved_bytes` pre-sizes the batch buffer;
  // `_overwrite_key` selects one-index-entry-per-key semantics.
  Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
      bool _overwrite_key = false)
      : write_batch(reserved_bytes),
        comparator(index_comparator, &write_batch),
        skip_list(comparator, &arena),
        overwrite_key(_overwrite_key),
        last_entry_offset(0) {}
  ReadableWriteBatch write_batch;        // the actual record storage
  WriteBatchEntryComparator comparator;  // orders index entries by CF + key
  Arena arena;                           // backs the skip-list nodes/entries
  WriteBatchEntrySkipList skip_list;     // index over write_batch records
  bool overwrite_key;                    // true: reuse one entry per key
  size_t last_entry_offset;              // offset of the most recent record

  // Remember current offset of internal write batch, which is used as
  // the starting offset of the next record.
  void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }

  // In overwrite mode, find the existing entry for the same key and update it
  // to point to the current entry.
  // Return true if the key is found and updated.
  bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
  bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);

  // Add the recent entry to the update.
  // In overwrite mode, if key already exists in the index, update it.
  void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
  void AddOrUpdateIndex(const Slice& key);

  // Allocate an index entry pointing to the last entry in the write batch and
  // put it to skip list.
  void AddNewEntry(uint32_t column_family_id);

  // Clear all updates buffered in this batch.
  void Clear();
  // Reset only the index (skip list + arena); the batch data is untouched.
  void ClearIndex();

  // Rebuild index by reading all records from the batch.
  // Returns non-ok status on corruption.
  Status ReBuildIndex();
};
426 | | |
427 | | bool WriteBatchWithIndex::Rep::UpdateExistingEntry( |
428 | 7.35M | ColumnFamilyHandle* column_family, const Slice& key) { |
429 | 7.35M | uint32_t cf_id = GetColumnFamilyID(column_family); |
430 | 7.35M | return UpdateExistingEntryWithCfId(cf_id, key); |
431 | 7.35M | } |
432 | | |
433 | | bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( |
434 | 7.85M | uint32_t column_family_id, const Slice& key) { |
435 | 7.85M | if (!overwrite_key) { |
436 | 7.35M | return false; |
437 | 7.35M | } |
438 | | |
439 | 500k | WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch); |
440 | 500k | iter.Seek(key); |
441 | 500k | if (!iter.Valid()) { |
442 | 778 | return false; |
443 | 778 | } |
444 | 500k | if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) { |
445 | 59 | return false; |
446 | 59 | } |
447 | 500k | WriteBatchIndexEntry* non_const_entry = |
448 | 500k | const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry()); |
449 | 500k | non_const_entry->offset = last_entry_offset; |
450 | 500k | return true; |
451 | 500k | } |
452 | | |
453 | | void WriteBatchWithIndex::Rep::AddOrUpdateIndex( |
454 | 7.35M | ColumnFamilyHandle* column_family, const Slice& key) { |
455 | 7.35M | if (!UpdateExistingEntry(column_family, key)) { |
456 | 7.35M | uint32_t cf_id = GetColumnFamilyID(column_family); |
457 | 7.35M | const auto* cf_cmp = GetColumnFamilyUserComparator(column_family); |
458 | 7.35M | if (cf_cmp != nullptr) { |
459 | 7.35M | comparator.SetComparatorForCF(cf_id, cf_cmp); |
460 | 7.35M | } |
461 | 7.35M | AddNewEntry(cf_id); |
462 | 7.35M | } |
463 | 7.35M | } |
464 | | |
465 | 500k | void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) { |
466 | 500k | if (!UpdateExistingEntryWithCfId(0, key)) { |
467 | 176 | AddNewEntry(0); |
468 | 176 | } |
469 | 500k | } |
470 | | |
471 | 7.36M | void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { |
472 | 7.36M | auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry)); |
473 | 7.36M | auto* index_entry = |
474 | 7.36M | new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id); |
475 | 7.36M | skip_list.Insert(index_entry); |
476 | 7.36M | } |
477 | | |
// Drop the buffered batch contents, then rebuild an empty index over them.
void WriteBatchWithIndex::Rep::Clear() {
  write_batch.Clear();
  ClearIndex();
}
482 | | |
// Destroy and re-create the skip list and arena in place. Order matters:
// the skip list (whose nodes live in the arena) is torn down before the
// arena, and re-created only after a fresh arena exists for it to use.
void WriteBatchWithIndex::Rep::ClearIndex() {
  skip_list.~WriteBatchEntrySkipList();
  arena.~Arena();
  new (&arena) Arena();
  new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
  // Next record appended will be indexed from a clean slate.
  last_entry_offset = 0;
}
490 | | |
// Re-create the index by replaying every record currently in the batch
// (used after a rollback truncates the batch). Returns non-OK if a record
// cannot be parsed or the record count does not match the batch header.
Status WriteBatchWithIndex::Rep::ReBuildIndex() {
  Status s;

  ClearIndex();

  if (write_batch.Count() == 0) {
    // Nothing to re-index
    return s;
  }

  // Skip the batch header; records start at this offset.
  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);

  Slice input(write_batch.Data());
  input.remove_prefix(offset);

  // Loop through all entries in Rep and add each one to the index
  size_t found = 0;
  while (s.ok() && !input.empty()) {
    Slice key, value, blob;
    uint32_t column_family_id = 0;  // default
    char tag = 0;

    // set offset of current entry for call to AddNewEntry()
    last_entry_offset = input.cdata() - write_batch.Data().data();

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
                                 &value, &blob);
    if (!s.ok()) {
      break;
    }

    switch (tag) {
      // All keyed record types are indexed identically.
      case kTypeColumnFamilyValue:
      case kTypeValue:
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        found++;
        if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
          AddNewEntry(column_family_id);
        }
        break;
      case kTypeLogData:
        // Log data carries no key and is never indexed.
        break;
      default:
        return STATUS(Corruption, "unknown WriteBatch tag");
    }
  }

  // Every keyed record must have been seen exactly once.
  if (s.ok() && found != write_batch.Count()) {
    s = STATUS(Corruption, "WriteBatch has wrong count");
  }

  return s;
}
549 | | |
// `default_index_comparator` orders index keys for column families without a
// registered comparator; `overwrite_key` enables one-entry-per-key semantics
// (required by NewIteratorWithBase, which asserts it).
WriteBatchWithIndex::WriteBatchWithIndex(
    const Comparator* default_index_comparator, size_t reserved_bytes,
    bool overwrite_key)
    : rep(new Rep(default_index_comparator, reserved_bytes, overwrite_key)) {}
554 | | |
555 | 741k | WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; } |
556 | | |
557 | 743k | WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; } |
558 | | |
// Iterator over this batch's entries for the default column family (id 0).
// Caller owns the returned iterator.
WBWIIterator* WriteBatchWithIndex::NewIterator() {
  return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}
562 | | |
// Iterator over this batch's entries for the given column family.
// Caller owns the returned iterator.
WBWIIterator* WriteBatchWithIndex::NewIterator(
    ColumnFamilyHandle* column_family) {
  return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
                              &(rep->skip_list), &rep->write_batch);
}
568 | | |
569 | | Iterator* WriteBatchWithIndex::NewIteratorWithBase( |
570 | 71 | ColumnFamilyHandle* column_family, Iterator* base_iterator) { |
571 | 71 | if (rep->overwrite_key == false) { |
572 | 0 | assert(false); |
573 | 0 | return nullptr; |
574 | 0 | } |
575 | 71 | return new BaseDeltaIterator(base_iterator, NewIterator(column_family), |
576 | 71 | GetColumnFamilyUserComparator(column_family)); |
577 | 71 | } |
578 | | |
579 | 9 | Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) { |
580 | 9 | if (rep->overwrite_key == false) { |
581 | 0 | assert(false); |
582 | 0 | return nullptr; |
583 | 0 | } |
584 | | // default column family's comparator |
585 | 9 | return new BaseDeltaIterator(base_iterator, NewIterator(), |
586 | 9 | rep->comparator.default_comparator()); |
587 | 9 | } |
588 | | |
// Append a Put for `column_family` to the batch and index it. The offset is
// captured before appending because the index entry stores it.
void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
                              const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  rep->write_batch.Put(column_family, key, value);
  rep->AddOrUpdateIndex(column_family, key);
}
595 | | |
// Append a Put for the default column family to the batch and index it.
void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  rep->write_batch.Put(key, value);
  rep->AddOrUpdateIndex(key);
}
601 | | |
// Append a Delete for `column_family` to the batch and index it.
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
                                 const Slice& key) {
  rep->SetLastEntryOffset();
  rep->write_batch.Delete(column_family, key);
  rep->AddOrUpdateIndex(column_family, key);
}
608 | | |
// Append a Delete for the default column family to the batch and index it.
void WriteBatchWithIndex::Delete(const Slice& key) {
  rep->SetLastEntryOffset();
  rep->write_batch.Delete(key);
  rep->AddOrUpdateIndex(key);
}
614 | | |
// Append a SingleDelete for `column_family` to the batch and index it.
void WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
                                       const Slice& key) {
  rep->SetLastEntryOffset();
  rep->write_batch.SingleDelete(column_family, key);
  rep->AddOrUpdateIndex(column_family, key);
}
621 | | |
// Append a SingleDelete for the default column family and index it.
void WriteBatchWithIndex::SingleDelete(const Slice& key) {
  rep->SetLastEntryOffset();
  rep->write_batch.SingleDelete(key);
  rep->AddOrUpdateIndex(key);
}
627 | | |
// Append a Merge for `column_family` to the batch and index it.
void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
                                const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  rep->write_batch.Merge(column_family, key, value);
  rep->AddOrUpdateIndex(column_family, key);
}
634 | | |
// Append a Merge for the default column family to the batch and index it.
void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
  rep->SetLastEntryOffset();
  rep->write_batch.Merge(key, value);
  rep->AddOrUpdateIndex(key);
}
640 | | |
// Log data carries no key, so it is appended to the batch but never indexed
// (ReBuildIndex likewise skips kTypeLogData records).
void WriteBatchWithIndex::PutLogData(const Slice& blob) {
  rep->write_batch.PutLogData(blob);
}
644 | | |
645 | 3.22k | void WriteBatchWithIndex::Clear() { rep->Clear(); } |
646 | | |
647 | | Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, |
648 | | const DBOptions& options, |
649 | 46 | const Slice& key, std::string* value) { |
650 | 46 | Status s; |
651 | 46 | MergeContext merge_context; |
652 | | |
653 | 46 | WriteBatchWithIndexInternal::Result result = |
654 | 46 | WriteBatchWithIndexInternal::GetFromBatch( |
655 | 46 | options, this, column_family, key, &merge_context, &rep->comparator, |
656 | 46 | value, rep->overwrite_key, &s); |
657 | | |
658 | 46 | switch (result) { |
659 | 21 | case WriteBatchWithIndexInternal::Result::kFound: |
660 | 21 | case WriteBatchWithIndexInternal::Result::kError: |
661 | | // use returned status |
662 | 21 | break; |
663 | 9 | case WriteBatchWithIndexInternal::Result::kDeleted: |
664 | 15 | case WriteBatchWithIndexInternal::Result::kNotFound: |
665 | 15 | s = STATUS(NotFound, ""); |
666 | 15 | break; |
667 | 10 | case WriteBatchWithIndexInternal::Result::kMergeInProgress: |
668 | 10 | s = STATUS(MergeInProgress, ""); |
669 | 10 | break; |
670 | 0 | default: |
671 | 0 | assert(false); |
672 | 46 | } |
673 | | |
674 | 46 | return s; |
675 | 46 | } |
676 | | |
// Convenience overload targeting the default column family.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              const Slice& key,
                                              std::string* value) {
  return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
                           value);
}
684 | | |
// Look up `key` first in this batch; if the batch alone cannot answer (key
// absent, or only merge operands buffered), fall back to `db` and, when
// needed, merge the DB result with the batch's merge operands.
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
                                              const ReadOptions& read_options,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key,
                                              std::string* value) {
  Status s;
  MergeContext merge_context;
  const DBOptions& options = db->GetDBOptions();

  std::string batch_value;
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::GetFromBatch(
          options, this, column_family, key, &merge_context, &rep->comparator,
          &batch_value, rep->overwrite_key, &s);

  if (result == WriteBatchWithIndexInternal::Result::kFound) {
    // Batch fully answers the lookup; no DB access needed.
    value->assign(batch_value.data(), batch_value.size());
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
    return STATUS(NotFound, "");
  }
  if (result == WriteBatchWithIndexInternal::Result::kError) {
    return s;
  }
  if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
      rep->overwrite_key == true) {
    // Since we've overwritten keys, we do not know what other operations are
    // in this batch for this key, so we cannot do a Merge to compute the
    // result. Instead, we will simply return MergeInProgress.
    return STATUS(MergeInProgress, "");
  }

  assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
         result == WriteBatchWithIndexInternal::Result::kNotFound);

  // Did not find key in batch OR could not resolve Merges. Try DB.
  s = db->Get(read_options, column_family, key, value);

  if (s.ok() || s.IsNotFound()) {  // DB Get Succeeded
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
      // Merge result from DB with merges in Batch
      auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
      const MergeOperator* merge_operator =
          cfh->cfd()->ioptions()->merge_operator;
      Statistics* statistics = options.statistics.get();
      Env* env = options.env;
      Logger* logger = options.info_log.get();

      Slice db_slice(*value);
      Slice* merge_data;
      if (s.ok()) {
        merge_data = &db_slice;
      } else {  // Key not present in db (s.IsNotFound())
        merge_data = nullptr;
      }

      // Folds the batch's merge operands on top of the DB value (or onto
      // nothing, if the key was absent from the DB).
      s = MergeHelper::TimedFullMerge(
          key, merge_data, merge_context.GetOperands(), merge_operator,
          statistics, env, logger, value);
    }
  }

  return s;
}
750 | | |
751 | 26 | void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); } |
752 | | |
753 | 29 | Status WriteBatchWithIndex::RollbackToSavePoint() { |
754 | 29 | Status s = rep->write_batch.RollbackToSavePoint(); |
755 | | |
756 | 29 | if (s.ok()) { |
757 | 21 | s = rep->ReBuildIndex(); |
758 | 21 | } |
759 | | |
760 | 29 | return s; |
761 | 29 | } |
762 | | |
763 | | } // namespace rocksdb |
764 | | #endif // !ROCKSDB_LITE |