/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_batch.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | // |
24 | | // WriteBatch::rep_ := |
25 | | // sequence: fixed64 |
26 | | // count: fixed32 |
27 | | // data: record[count] |
28 | | // record := |
29 | | // kTypeValue varstring varstring |
30 | | // kTypeDeletion varstring |
31 | | // kTypeSingleDeletion varstring |
32 | | // kTypeMerge varstring varstring |
33 | | // kTypeColumnFamilyValue varint32 varstring varstring |
//    kTypeColumnFamilyDeletion varint32 varstring
//    kTypeColumnFamilySingleDeletion varint32 varstring
36 | | // kTypeColumnFamilyMerge varint32 varstring varstring |
37 | | // varstring := |
38 | | // len: varint32 |
39 | | // data: uint8[len] |
40 | | // |
41 | | // YugaByte-specific extensions stored out-of-band: |
42 | | // user_sequence_numbers_ |
43 | | |
44 | | #include <stack> |
45 | | #include <stdexcept> |
46 | | #include <vector> |
47 | | |
48 | | #include "yb/rocksdb/db/column_family.h" |
49 | | #include "yb/rocksdb/db/db_impl.h" |
50 | | #include "yb/rocksdb/db/dbformat.h" |
51 | | #include "yb/rocksdb/db/flush_scheduler.h" |
52 | | #include "yb/rocksdb/db/memtable.h" |
53 | | #include "yb/rocksdb/db/snapshot_impl.h" |
54 | | #include "yb/rocksdb/db/write_batch_internal.h" |
55 | | #include "yb/rocksdb/merge_operator.h" |
56 | | #include "yb/rocksdb/util/coding.h" |
57 | | #include "yb/rocksdb/util/perf_context_imp.h" |
58 | | #include "yb/rocksdb/util/statistics.h" |
59 | | |
60 | | #include "yb/util/stats/perf_step_timer.h" |
61 | | |
62 | | namespace rocksdb { |
63 | | |
64 | | // anon namespace for file-local types |
65 | | namespace { |
66 | | |
// Bit flags describing which kinds of records a WriteBatch contains.
// Stored in WriteBatch::content_flags_. DEFERRED means the flags have not
// been computed yet (the batch was built from a pre-serialized rep) and must
// be derived by iterating the batch — see WriteBatch::ComputeContentFlags().
enum ContentFlags : uint32_t {
  DEFERRED = 1,
  HAS_PUT = 2,
  HAS_DELETE = 4,
  HAS_SINGLE_DELETE = 8,
  HAS_MERGE = 16,
  HAS_FRONTIERS = 32,
};
75 | | |
// Handler that scans a batch and records which record types it contains.
// Used by WriteBatch::ComputeContentFlags() to lazily reconstruct
// content_flags_ when they are marked DEFERRED.
struct BatchContentClassifier : public WriteBatch::Handler {
  uint32_t content_flags = 0;

  CHECKED_STATUS PutCF(uint32_t, const SliceParts&, const SliceParts&) override {
    content_flags |= ContentFlags::HAS_PUT;
    return Status::OK();
  }

  CHECKED_STATUS DeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE;
    return Status::OK();
  }

  CHECKED_STATUS SingleDeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
    return Status::OK();
  }

  CHECKED_STATUS MergeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_MERGE;
    return Status::OK();
  }

  CHECKED_STATUS Frontiers(const UserFrontiers& range) override {
    content_flags |= ContentFlags::HAS_FRONTIERS;
    return Status::OK();
  }
};
104 | | |
// DirectWriteHandler that applies writes straight to a memtable, bypassing
// the batch's serialized rep_. Entries are first prepared via
// MemTable::PrepareAdd(), then sorted by internal key and applied in one
// shot in Complete().
class DirectWriteHandlerImpl : public DirectWriteHandler {
 public:
  // |seq| is the sequence number assigned to the first entry; subsequent
  // entries get consecutive numbers.
  explicit DirectWriteHandlerImpl(MemTable* mem_table, SequenceNumber seq)
      : mem_table_(mem_table), seq_(seq) {}

  void Put(const SliceParts& key, const SliceParts& value) override {
    Add(ValueType::kTypeValue, key, value);
  }

  void SingleDelete(const Slice& key) override {
    // Fast path: if the key can be erased from this memtable directly, no
    // single-delete record needs to be written at all.
    if (mem_table_->Erase(key)) {
      return;
    }
    Add(ValueType::kTypeSingleDeletion, SliceParts(&key, 1), SliceParts());
  }

  // Sorts all prepared entries by the memtable's internal key comparator and
  // applies them. Returns the number of entries applied (0 if none).
  size_t Complete() {
    if (keys_.empty()) {
      return 0;
    }
    // KeyHandles point at length-prefixed internal keys; compare the decoded
    // slices with the memtable's internal key comparator.
    auto compare =
        [comparator = &mem_table_->GetInternalKeyComparator()](KeyHandle lhs, KeyHandle rhs) {
      auto lhs_slice = GetLengthPrefixedSlice(static_cast<const char*>(lhs));
      auto rhs_slice = GetLengthPrefixedSlice(static_cast<const char*>(rhs));
      return comparator->Compare(lhs_slice, rhs_slice) < 0;
    };
    std::sort(keys_.begin(), keys_.end(), compare);
    mem_table_->ApplyPreparedAdd(keys_.data(), keys_.size(), prepared_add_, false);
    return keys_.size();
  }

 private:
  // Prepares a single entry with the next sequence number and remembers its
  // key handle for the final sorted apply in Complete().
  void Add(ValueType value_type, const SliceParts& key, const SliceParts& value) {
    keys_.push_back(
        mem_table_->PrepareAdd(seq_++, value_type, key, value, &prepared_add_));
  }

  MemTable* mem_table_;
  SequenceNumber seq_;  // next sequence number to assign
  PreparedAdd prepared_add_;
  boost::container::small_vector<KeyHandle, 128> keys_;
};
147 | | |
148 | | } // anon namespace |
149 | | |
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;

// Snapshot of a WriteBatch's state, captured by SetSavePoint() and restored
// by RollbackToSavePoint().
struct SavePoint {
  size_t size;  // size of rep_
  uint32_t count;  // count of elements in rep_
  uint32_t content_flags;
  const UserFrontiers* frontiers;  // non-owning; tracked out-of-band of rep_
};

// Stack of savepoints — savepoints may nest.
struct SavePoints {
  std::stack<SavePoint> stack;
};
163 | | |
// Constructs an empty batch, pre-allocating at least |reserved_bytes| for
// the serialized representation (never less than the 12-byte header, which
// is always present).
WriteBatch::WriteBatch(size_t reserved_bytes)
    : content_flags_(0) {
  rep_.reserve(std::max(reserved_bytes, kHeader));
  rep_.resize(kHeader);
}
169 | | |
// Constructs a batch from an already-serialized representation. The content
// is unknown at this point, so flags are marked DEFERRED and computed lazily
// by ComputeContentFlags() when first needed.
WriteBatch::WriteBatch(const std::string& rep)
    : content_flags_(ContentFlags::DEFERRED),
      rep_(rep) {}

// Copy constructor: deep-copies the serialized data and the savepoint stack.
// frontiers_ is a non-owning pointer and is copied as-is.
WriteBatch::WriteBatch(const WriteBatch& src)
    : content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      rep_(src.rep_),
      frontiers_(src.frontiers_) {
  if (src.save_points_) {
    save_points_.reset(new SavePoints(*src.save_points_));
  }
}

// Move constructor: steals rep_ and the savepoint stack from |src|.
WriteBatch::WriteBatch(WriteBatch&& src)
    : save_points_(std::move(src.save_points_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      rep_(std::move(src.rep_)),
      frontiers_(std::move(src.frontiers_)) {}
188 | | |
// Copy assignment, implemented as destroy + placement-new copy construction
// so it stays in sync with the copy constructor automatically.
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}

// Move assignment, implemented the same way as copy assignment.
WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}

WriteBatch::~WriteBatch() {}
206 | | |
WriteBatch::Handler::~Handler() { }

// Default LogData implementation: blobs are informational only, so handlers
// that don't override this simply skip them.
void WriteBatch::Handler::LogData(const Slice& blob) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

// Default: keep iterating over all records in the batch. Overriding this to
// return false lets a handler stop Iterate() early.
bool WriteBatch::Handler::Continue() {
  return true;
}
217 | | |
218 | 692k | void WriteBatch::Clear() { |
219 | 692k | rep_.clear(); |
220 | 692k | rep_.resize(kHeader); |
221 | | |
222 | 692k | content_flags_.store(0, std::memory_order_relaxed); |
223 | | |
224 | 692k | if (save_points_ != nullptr) { |
225 | 16 | while (!save_points_->stack.empty()) { |
226 | 3 | save_points_->stack.pop(); |
227 | 3 | } |
228 | 13 | } |
229 | | |
230 | 692k | frontiers_ = nullptr; |
231 | 692k | } |
232 | | |
// Returns the number of records in this batch, decoded from the rep_ header.
uint32_t WriteBatch::Count() const {
  return WriteBatchInternal::Count(this);
}
236 | | |
// Returns the content flags, computing them lazily if they are still marked
// DEFERRED (the batch was constructed from a pre-serialized rep): the batch
// is iterated with a BatchContentClassifier and the result is cached back
// into content_flags_.
uint32_t WriteBatch::ComputeContentFlags() const {
  auto rv = content_flags_.load(std::memory_order_relaxed);
  if ((rv & ContentFlags::DEFERRED) != 0) {
    BatchContentClassifier classifier;
    auto status = Iterate(&classifier);
    LOG_IF(ERROR, !status.ok()) << "Iterate failed during ComputeContentFlags: " << status;
    rv = classifier.content_flags;

    // this method is conceptually const, because it is performing a lazy
    // computation that doesn't affect the abstract state of the batch.
    // content_flags_ is marked mutable so that we can perform the
    // following assignment
    content_flags_.store(rv, std::memory_order_relaxed);
  }
  return rv;
}
253 | | |
// Accessors for the (possibly lazily computed) content flags — each reports
// whether the batch contains at least one record of the given type.
bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}
269 | | |
270 | | Status ReadRecordFromWriteBatch(Slice* input, char* tag, |
271 | | uint32_t* column_family, Slice* key, |
272 | 74.7M | Slice* value, Slice* blob) { |
273 | 74.7M | assert(key != nullptr && value != nullptr); |
274 | 74.7M | *tag = (*input)[0]; |
275 | 74.7M | input->remove_prefix(1); |
276 | 74.7M | *column_family = 0; // default |
277 | 74.7M | switch (*tag) { |
278 | 22.0M | case kTypeColumnFamilyValue: |
279 | 22.0M | if (!GetVarint32(input, column_family)) { |
280 | 0 | return STATUS(Corruption, "bad WriteBatch Put"); |
281 | 0 | } |
282 | 22.0M | FALLTHROUGH_INTENDED; |
283 | 52.8M | case kTypeValue: |
284 | 52.8M | if (!GetLengthPrefixedSlice(input, key) || |
285 | 52.8M | !GetLengthPrefixedSlice(input, value)) { |
286 | 0 | return STATUS(Corruption, "bad WriteBatch Put"); |
287 | 0 | } |
288 | 52.8M | break; |
289 | 79.8k | case kTypeColumnFamilyDeletion: |
290 | 80.1k | case kTypeColumnFamilySingleDeletion: |
291 | 80.1k | if (!GetVarint32(input, column_family)) { |
292 | 0 | return STATUS(Corruption, "bad WriteBatch Delete"); |
293 | 0 | } |
294 | 80.1k | FALLTHROUGH_INTENDED; |
295 | 21.8M | case kTypeDeletion: |
296 | 21.8M | case kTypeSingleDeletion: |
297 | 21.8M | if (!GetLengthPrefixedSlice(input, key)) { |
298 | 2 | return STATUS(Corruption, "bad WriteBatch Delete"); |
299 | 2 | } |
300 | 21.8M | break; |
301 | 433 | case kTypeColumnFamilyMerge: |
302 | 433 | if (!GetVarint32(input, column_family)) { |
303 | 0 | return STATUS(Corruption, "bad WriteBatch Merge"); |
304 | 0 | } |
305 | 433 | FALLTHROUGH_INTENDED; |
306 | 96.4k | case kTypeMerge: |
307 | 96.4k | if (!GetLengthPrefixedSlice(input, key) || |
308 | 96.4k | !GetLengthPrefixedSlice(input, value)) { |
309 | 0 | return STATUS(Corruption, "bad WriteBatch Merge"); |
310 | 0 | } |
311 | 96.4k | break; |
312 | 11 | case kTypeLogData: |
313 | 11 | assert(blob != nullptr); |
314 | 11 | if (!GetLengthPrefixedSlice(input, blob)) { |
315 | 0 | return STATUS(Corruption, "bad WriteBatch Blob"); |
316 | 0 | } |
317 | 11 | break; |
318 | 0 | default: |
319 | 0 | return STATUS(Corruption, "unknown WriteBatch tag"); |
320 | 74.7M | } |
321 | 74.7M | return Status::OK(); |
322 | 74.7M | } |
323 | | |
324 | | Result<size_t> DirectInsert(WriteBatch::Handler* handler, DirectWriter* writer); |
325 | | |
// Replays every record in the batch, in insertion order, through |handler|.
// Out-of-band state is reported first: frontiers (if any), then — when a
// direct writer is attached — the direct insert is performed before the
// serialized rep_ is scanned. Stops early if the handler returns a non-OK
// status or Continue() returns false. Returns Corruption if rep_ is
// malformed or the number of decoded records does not match the count in
// the header.
Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
  if (input.size() < kHeader) {
    return STATUS(Corruption, "malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);
  Slice key, value, blob;
  size_t found = 0;  // records decoded from rep_; must equal header count
  Status s;

  // Frontiers live out-of-band (not serialized into rep_).
  if (frontiers_) {
    s = handler->Frontiers(*frontiers_);
  }
  // Direct writes also bypass rep_; apply them and record how many entries
  // were produced.
  if (s.ok() && direct_writer_) {
    auto result = DirectInsert(handler, direct_writer_);
    if (result.ok()) {
      direct_entries_ = *result;
    } else {
      s = result.status();
    }
  }
  while (s.ok() && !input.empty() && handler->Continue()) {
    char tag = 0;
    uint32_t column_family = 0;  // default

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                 &blob);
    if (!s.ok()) {
      return s;
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, SliceParts(&key, 1), SliceParts(&value, 1));
        found++;
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        found++;
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        found++;
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        found++;
        break;
      case kTypeLogData:
        // Log-data blobs are not counted as records.
        handler->LogData(blob);
        break;
      default:
        return STATUS(Corruption, "unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (found != WriteBatchInternal::Count(this)) {
    return STATUS(Corruption, "WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}
403 | | |
// rep_ header layout helpers: bytes [0, 8) hold the starting sequence
// number, bytes [8, 12) hold the record count (see kHeader above).
uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  EncodeFixed32(&b->rep_[8], n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

// Records always start immediately after the fixed-size header.
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { return kHeader; }
421 | | |
// Appends a Put record for (key, value) to |b|'s rep_ and bumps the header
// count. The default column family (id 0) uses the compact kTypeValue tag;
// any other column family uses kTypeColumnFamilyValue followed by the
// varint-encoded id. Also sets HAS_PUT in the content flags.
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                             const Slice& key, const Slice& value) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
}

void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value) {
  WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
}

// Same encoding as above, but key and value are supplied as multi-part
// slices (concatenated on serialization).
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                             const SliceParts& key, const SliceParts& value) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
}

void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                     const SliceParts& value) {
  WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
}
463 | | |
// Appends a Delete record for |key| and bumps the header count. Default
// column family uses kTypeDeletion; others use kTypeColumnFamilyDeletion
// plus the varint-encoded id. Also sets HAS_DELETE in the content flags.
void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                const Slice& key) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
}

void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key);
}

// Same as above, but the key is supplied as a multi-part slice.
void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                const SliceParts& key) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
}

void WriteBatch::Delete(ColumnFamilyHandle* column_family,
                        const SliceParts& key) {
  WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key);
}
502 | | |
// Appends a SingleDelete record for |key| and bumps the header count.
// Default column family uses kTypeSingleDeletion; others use
// kTypeColumnFamilySingleDeletion plus the varint-encoded id. Also sets
// HAS_SINGLE_DELETE in the content flags.
void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
                                      const Slice& key) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
}

void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                              const Slice& key) {
  WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key);
}

// Same as above, but the key is supplied as a multi-part slice.
void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
                                      const SliceParts& key) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
}

void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                              const SliceParts& key) {
  WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key);
}
542 | | |
// Appends a Merge record for (key, value) and bumps the header count.
// Default column family uses kTypeMerge; others use kTypeColumnFamilyMerge
// plus the varint-encoded id. Also sets HAS_MERGE in the content flags.
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
}

void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, value);
}

// Same as above, but key and value are supplied as multi-part slices.
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key,
                               const SliceParts& value) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
}

void WriteBatch::Merge(ColumnFamilyHandle* column_family,
                       const SliceParts& key,
                       const SliceParts& value) {
  WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family),
                            key, value);
}
587 | | |
// Appends an opaque blob to the batch. Blobs are not applied to the
// database and do not bump the record count; they are only passed back to
// Handler::LogData() during iteration.
void WriteBatch::PutLogData(const Slice& blob) {
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
}
592 | | |
// Captures the current batch state (rep_ size, record count, content flags,
// frontiers) so a later RollbackToSavePoint() can revert to it. Savepoints
// nest: each call pushes a new entry onto the stack.
void WriteBatch::SetSavePoint() {
  if (save_points_ == nullptr) {
    save_points_.reset(new SavePoints());
  }
  // Record length and count of current batch of writes.
  save_points_->stack.push(
      {GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed), frontiers_});
}
601 | | |
// Reverts the batch to the state captured by the most recent SetSavePoint()
// and pops that savepoint. Returns NotFound if no savepoint is active.
Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return STATUS(NotFound, "");
  }

  // Pop the most recent savepoint off the stack
  SavePoint savepoint = save_points_->stack.top();
  save_points_->stack.pop();

  // A savepoint can only describe a prefix of the current contents.
  DCHECK_LE(savepoint.size, rep_.size());
  DCHECK_LE(savepoint.count, Count());

  if (savepoint.size == rep_.size()) {
    // No changes to rollback
  } else if (savepoint.size == 0) {
    // Rollback everything
    Clear();
  } else {
    rep_.resize(savepoint.size);
    WriteBatchInternal::SetCount(this, savepoint.count);
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  }
  // Frontiers are tracked out-of-band, so restore them unconditionally.
  frontiers_ = savepoint.frontiers;

  return Status::OK();
}
628 | | |
629 | | namespace { |
630 | | |
631 | | YB_STRONGLY_TYPED_BOOL(InMemoryErase); |
632 | | |
633 | | class MemTableInserter : public WriteBatch::Handler { |
634 | | public: |
635 | | SequenceNumber sequence_; |
636 | | ColumnFamilyMemTables* const cf_mems_; |
637 | | FlushScheduler* const flush_scheduler_; |
638 | | const bool ignore_missing_column_families_; |
639 | | const uint64_t log_number_; |
640 | | DBImpl* db_; |
641 | | const InsertFlags insert_flags_; |
642 | | |
  // cf_mems should not be shared with concurrent inserters.
  // |sequence| is the sequence number for the first record; |log_number| is
  // non-zero only during recovery (used to skip already-flushed updates).
  MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   bool ignore_missing_column_families, uint64_t log_number,
                   DB* db, InsertFlags insert_flags)
      : sequence_(sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        log_number_(log_number),
        db_(reinterpret_cast<DBImpl*>(db)),
        insert_flags_(insert_flags) {
    assert(cf_mems_);
    // Filtering deletes requires point lookups, which need a DB handle.
    if (insert_flags_.Test(InsertFlag::kFilterDeletes)) {
      assert(db_);
    }
  }
660 | | |
  // Positions cf_mems_ at |column_family_id| and decides whether the current
  // record should be applied. Returns true if the caller should proceed with
  // the memtable update; returns false (with *s set accordingly) when the
  // column family is missing or, during recovery, has already been flushed
  // past log_number_.
  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    // If we are in a concurrent mode, it is the caller's responsibility
    // to clone the original ColumnFamilyMemTables so that each thread
    // has its own instance.  Otherwise, it must be guaranteed that there
    // is no concurrent access
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        *s = Status::OK();
      } else {
        *s = STATUS(InvalidArgument,
                    "Invalid column family specified in write batch");
      }
      return false;
    }
    if (log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber()) {
      // This is true only in recovery environment (log_number_ is always 0 in
      // non-recovery, regular write code-path)
      // * If log_number_ < cf_mems_->GetLogNumber(), this means that column
      // family already contains updates from this log. We can't apply updates
      // twice because of update-in-place or merge workloads -- ignore the
      // update
      *s = Status::OK();
      return false;
    }
    return true;
  }
688 | | |
  // Applies a Put record to the target column family's memtable. Three modes:
  // plain Add (the common case), in-place Update, and update-via-callback
  // (which may fall back to a read-modify-write through the DB). sequence_
  // is always advanced, even when the update is skipped or fails.
  CHECKED_STATUS PutCF(
      uint32_t column_family_id, const SliceParts& key, const SliceParts& value) override {
    Status seek_status;
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
      // Still consume a sequence number so later records stay aligned.
      ++sequence_;
      return seek_status;
    }
    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetMemTableOptions();
    if (!moptions->inplace_update_support) {
      mem->Add(CurrentSequenceNumber(), kTypeValue, key, value,
               insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
    } else if (moptions->inplace_callback == nullptr) {
      // In-place update without a callback; not valid with concurrent writes.
      assert(!insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
      mem->Update(CurrentSequenceNumber(), key.TheOnlyPart(), value.TheOnlyPart());
      RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
    } else {
      assert(!insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
      SequenceNumber current_seq = CurrentSequenceNumber();
      if (mem->UpdateCallback(current_seq, key.TheOnlyPart(), value.TheOnlyPart())) {
      } else {
        // key not found in memtable. Do sst get, update, add
        SnapshotImpl read_from_snapshot;
        read_from_snapshot.number_ = current_seq;
        ReadOptions ropts;
        ropts.snapshot = &read_from_snapshot;

        std::string prev_value;
        std::string merged_value;

        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
        if (cf_handle == nullptr) {
          cf_handle = db_->DefaultColumnFamily();
        }
        Status s = db_->Get(ropts, cf_handle, key.TheOnlyPart(), &prev_value);

        char* prev_buffer = const_cast<char*>(prev_value.c_str());
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
        // If Get() failed, the key does not exist: pass nullptrs so the
        // callback knows there is no previous value.
        auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
                                                 s.ok() ? &prev_size : nullptr,
                                                 value.TheOnlyPart(), &merged_value);
        if (status == UpdateStatus::UPDATED_INPLACE) {
          // prev_value is updated in-place with final value.
          Slice new_value(prev_buffer, prev_size);
          mem->Add(current_seq, kTypeValue, key, SliceParts(&new_value, 1));
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
        } else if (status == UpdateStatus::UPDATED) {
          // merged_value contains the final value.
          Slice new_value(merged_value);
          mem->Add(current_seq, kTypeValue, key, SliceParts(&new_value, 1));
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
        }
      }
    }
    // Since all Puts are logged in transaction logs (if enabled), always bump
    // sequence number. Even if the update eventually fails and does not result
    // in memtable add/update.
    sequence_++;
    CheckMemtableFull();
    return Status::OK();
  }
750 | | |
751 | | CHECKED_STATUS DeleteImpl(uint32_t column_family_id, const Slice& key, |
752 | 1.87M | ValueType delete_type) { |
753 | 1.87M | Status seek_status; |
754 | 1.87M | if (!SeekToColumnFamily(column_family_id, &seek_status)) { |
755 | 0 | ++sequence_; |
756 | 0 | return seek_status; |
757 | 0 | } |
758 | 1.87M | MemTable* mem = cf_mems_->GetMemTable(); |
759 | 1.87M | if ((delete_type == ValueType::kTypeSingleDeletion || |
760 | 1.87M | delete_type == ValueType::kTypeColumnFamilySingleDeletion) && |
761 | 194 | mem->Erase(key)) { |
762 | 0 | return Status::OK(); |
763 | 0 | } |
764 | 1.87M | auto* moptions = mem->GetMemTableOptions(); |
765 | 1.87M | if (insert_flags_.Test(InsertFlag::kFilterDeletes) && moptions->filter_deletes) { |
766 | 145 | assert(!insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites)); |
767 | 145 | SnapshotImpl read_from_snapshot; |
768 | 145 | read_from_snapshot.number_ = sequence_; |
769 | 145 | ReadOptions ropts; |
770 | 145 | ropts.snapshot = &read_from_snapshot; |
771 | 145 | std::string value; |
772 | 145 | auto cf_handle = cf_mems_->GetColumnFamilyHandle(); |
773 | 145 | if (cf_handle == nullptr) { |
774 | 0 | cf_handle = db_->DefaultColumnFamily(); |
775 | 0 | } |
776 | 145 | if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) { |
777 | 11 | RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES); |
778 | 11 | return Status::OK(); |
779 | 11 | } |
780 | 1.87M | } |
781 | 1.87M | mem->Add(CurrentSequenceNumber(), delete_type, SliceParts(&key, 1), SliceParts(), |
782 | 1.87M | insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites)); |
783 | 1.87M | sequence_++; |
784 | 1.87M | CheckMemtableFull(); |
785 | 1.87M | return Status::OK(); |
786 | 1.87M | } |
787 | | |
788 | | virtual CHECKED_STATUS DeleteCF(uint32_t column_family_id, |
789 | 1.87M | const Slice& key) override { |
790 | 1.87M | return DeleteImpl(column_family_id, key, kTypeDeletion); |
791 | 1.87M | } |
792 | | |
793 | | virtual CHECKED_STATUS SingleDeleteCF(uint32_t column_family_id, |
794 | 194 | const Slice& key) override { |
795 | 194 | return DeleteImpl(column_family_id, key, kTypeSingleDeletion); |
796 | 194 | } |
797 | | |
// Handler hook for a Merge record. Normally appends a kTypeMerge entry to
// the memtable; however, once the number of successive merge entries for
// the key reaches max_successive_merges, the accumulated merges are
// collapsed: the current value is read back from the DB, FullMerge is
// applied, and the result is stored as a plain kTypeValue entry.
// Always consumes exactly one sequence number.
virtual CHECKED_STATUS MergeCF(uint32_t column_family_id, const Slice& key,
                               const Slice& value) override {
  // In-handler merging reads back through db_, which is not safe under
  // concurrent memtable writes.
  assert(!insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
  Status seek_status;
  if (!SeekToColumnFamily(column_family_id, &seek_status)) {
    // Keep sequence numbering consistent even though the record is skipped.
    ++sequence_;
    return seek_status;
  }
  MemTable* mem = cf_mems_->GetMemTable();
  auto* moptions = mem->GetMemTableOptions();
  bool perform_merge = false;

  SequenceNumber current_seq = CurrentSequenceNumber();
  if (moptions->max_successive_merges > 0 && db_ != nullptr) {
    LookupKey lkey(key, current_seq);

    // Count the number of successive merges at the head
    // of the key in the memtable
    size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

    if (num_merges >= moptions->max_successive_merges) {
      perform_merge = true;
    }
  }

  if (perform_merge) {
    // 1) Get the existing value
    std::string get_value;

    // Pass in the sequence number so that we also include previous merge
    // operations in the same batch.
    SnapshotImpl read_from_snapshot;
    read_from_snapshot.number_ = current_seq;
    ReadOptions read_options;
    read_options.snapshot = &read_from_snapshot;

    auto cf_handle = cf_mems_->GetColumnFamilyHandle();
    if (cf_handle == nullptr) {
      cf_handle = db_->DefaultColumnFamily();
    }
    RETURN_NOT_OK(db_->Get(read_options, cf_handle, key, &get_value));
    Slice get_value_slice = Slice(get_value);

    // 2) Apply this merge
    auto merge_operator = moptions->merge_operator;
    assert(merge_operator);

    // The operand deque carries only the value being merged now; previous
    // batch-local merges were already visible through the snapshot Get above.
    std::deque<std::string> operands;
    operands.push_front(value.ToString());
    std::string new_value;
    bool merge_success = false;
    {
      // Scope block: the stopwatch and perf timer must cover exactly the
      // FullMerge call, and ElapsedNanos is read before the timer dies.
      StopWatchNano timer(Env::Default(), moptions->statistics != nullptr);
      PERF_TIMER_GUARD(merge_operator_time_nanos);
      merge_success = merge_operator->FullMerge(
          key, &get_value_slice, operands, &new_value, moptions->info_log);
      RecordTick(moptions->statistics, MERGE_OPERATION_TOTAL_TIME,
                 timer.ElapsedNanos());
    }

    if (!merge_success) {
      // Failed to merge!
      RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES);

      // Store the delta in memtable
      perform_merge = false;
    } else {
      // 3) Add value to memtable
      Slice value_slice(new_value);
      mem->Add(current_seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value_slice, 1));
    }
  }

  if (!perform_merge) {
    // Add merge operator to memtable
    mem->Add(current_seq, kTypeMerge, SliceParts(&key, 1), SliceParts(&value, 1));
  }

  sequence_++;
  CheckMemtableFull();
  return Status::OK();
}
880 | | |
881 | 6.39M | CHECKED_STATUS Frontiers(const UserFrontiers& frontiers) override { |
882 | 6.39M | Status seek_status; |
883 | 6.39M | if (!SeekToColumnFamily(0, &seek_status)) { |
884 | 0 | return seek_status; |
885 | 0 | } |
886 | 6.39M | cf_mems_->GetMemTable()->UpdateFrontiers(frontiers); |
887 | 6.39M | return Status::OK(); |
888 | 6.39M | } |
889 | | |
890 | 42.9M | void CheckMemtableFull() { |
891 | 42.9M | if (flush_scheduler_ != nullptr) { |
892 | 42.9M | auto* cfd = cf_mems_->current(); |
893 | 42.9M | assert(cfd != nullptr); |
894 | 42.9M | if (cfd->mem()->ShouldScheduleFlush() && |
895 | 11.8k | cfd->mem()->MarkFlushScheduled()) { |
896 | | // MarkFlushScheduled only returns true if we are the one that |
897 | | // should take action, so no need to dedup further |
898 | 11.8k | flush_scheduler_->ScheduleFlush(cfd); |
899 | 11.8k | } |
900 | 42.9M | } |
901 | 42.9M | } |
902 | | |
903 | | private: |
// Sequence number that will be assigned to the next record this inserter
// applies; the insert paths advance sequence_ after each logged operation.
SequenceNumber CurrentSequenceNumber() {
  return sequence_;
}
907 | | }; |
908 | | |
909 | | } // namespace |
910 | | |
911 | | // This function can only be called in these conditions: |
912 | | // 1) During Recovery() |
913 | | // 2) During Write(), in a single-threaded write thread |
914 | | // 3) During Write(), in a concurrent context where memtables has been cloned |
915 | | // The reason is that it calls memtables->Seek(), which has a stateful cache |
916 | | Status WriteBatchInternal::InsertInto( |
917 | | const autovector<WriteThread::Writer*>& writers, SequenceNumber sequence, |
918 | | ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, |
919 | | bool ignore_missing_column_families, uint64_t log_number, DB* db, |
920 | 21.9M | InsertFlags insert_flags) { |
921 | 21.9M | MemTableInserter inserter(sequence, memtables, flush_scheduler, |
922 | 21.9M | ignore_missing_column_families, log_number, db, |
923 | 21.9M | insert_flags); |
924 | | |
925 | 45.0M | for (size_t i = 0; i < writers.size(); i++) { |
926 | 23.1M | if (!writers[i]->CallbackFailed()) { |
927 | 23.1M | writers[i]->status = writers[i]->batch->Iterate(&inserter); |
928 | 23.1M | if (!writers[i]->status.ok()) { |
929 | 1 | return writers[i]->status; |
930 | 1 | } |
931 | 23.1M | } |
932 | 23.1M | } |
933 | 21.9M | return Status::OK(); |
934 | 21.9M | } |
935 | | |
936 | | Status WriteBatchInternal::InsertInto(const WriteBatch* batch, |
937 | | ColumnFamilyMemTables* memtables, |
938 | | FlushScheduler* flush_scheduler, |
939 | | bool ignore_missing_column_families, |
940 | | uint64_t log_number, DB* db, |
941 | 3.00M | InsertFlags insert_flags) { |
942 | 3.00M | MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables, |
943 | 3.00M | flush_scheduler, ignore_missing_column_families, |
944 | 3.00M | log_number, db, insert_flags); |
945 | 3.00M | return batch->Iterate(&inserter); |
946 | 3.00M | } |
947 | | |
948 | 2.96M | void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { |
949 | 2.96M | DCHECK_GE(contents.size(), kHeader); |
950 | 2.96M | b->rep_.assign(contents.cdata(), contents.size()); |
951 | 2.96M | b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); |
952 | 2.96M | } |
953 | | |
954 | 1.62M | void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { |
955 | 1.62M | SetCount(dst, Count(dst) + Count(src)); |
956 | 1.62M | DCHECK_GE(src->rep_.size(), kHeader); |
957 | 1.62M | dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); |
958 | 1.62M | dst->content_flags_.store( |
959 | 1.62M | dst->content_flags_.load(std::memory_order_relaxed) | |
960 | 1.62M | src->content_flags_.load(std::memory_order_relaxed), |
961 | 1.62M | std::memory_order_relaxed); |
962 | 1.62M | } |
963 | | |
964 | | size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize, |
965 | 23.2M | size_t rightByteSize) { |
966 | 23.2M | if (leftByteSize == 0 || rightByteSize == 0) { |
967 | 21.9M | return leftByteSize + rightByteSize; |
968 | 1.28M | } else { |
969 | 1.28M | return leftByteSize + rightByteSize - kHeader; |
970 | 1.28M | } |
971 | 23.2M | } |
972 | | |
973 | 6.39M | Result<size_t> DirectInsert(WriteBatch::Handler* handler, DirectWriter* writer) { |
974 | 6.39M | auto mem_table_inserter = down_cast<MemTableInserter*>(handler); |
975 | 6.39M | auto* mems = mem_table_inserter->cf_mems_; |
976 | 6.39M | auto current = mems->current(); |
977 | 6.39M | if (!current) { |
978 | 0 | mems->Seek(0); |
979 | 0 | current = mems->current(); |
980 | 0 | } |
981 | 6.39M | DirectWriteHandlerImpl direct_write_handler( |
982 | 6.39M | current->mem(), mem_table_inserter->sequence_); |
983 | 6.39M | RETURN_NOT_OK(writer->Apply(&direct_write_handler)); |
984 | 6.39M | auto result = direct_write_handler.Complete(); |
985 | 6.39M | mem_table_inserter->CheckMemtableFull(); |
986 | 6.39M | return result; |
987 | 6.39M | } |
988 | | |
989 | | } // namespace rocksdb |