YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_batch.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
//
24
// WriteBatch::rep_ :=
25
//    sequence: fixed64
26
//    count: fixed32
27
//    data: record[count]
28
// record :=
29
//    kTypeValue varstring varstring
30
//    kTypeDeletion varstring
31
//    kTypeSingleDeletion varstring
32
//    kTypeMerge varstring varstring
33
//    kTypeColumnFamilyValue varint32 varstring varstring
34
//    kTypeColumnFamilyDeletion varint32 varstring varstring
35
//    kTypeColumnFamilySingleDeletion varint32 varstring varstring
36
//    kTypeColumnFamilyMerge varint32 varstring varstring
37
// varstring :=
38
//    len: varint32
39
//    data: uint8[len]
40
//
41
// YugaByte-specific extensions stored out-of-band:
42
//   user_sequence_numbers_
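//
// Illustrative example (not part of the original header comment): under the
// encoding described above, a batch holding a single Put("key", "val") in the
// default column family would lay out rep_ as
//   <sequence: fixed64> <count = 1: fixed32>
//   <kTypeValue: 1 byte> <varint32 len = 3> "key" <varint32 len = 3> "val"
// A put to a non-default column family would instead use kTypeColumnFamilyValue
// followed by the varint32 column family id before the two varstrings.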
43
44
#include <stack>
45
#include <stdexcept>
46
#include <vector>
47
48
#include "yb/rocksdb/db/column_family.h"
49
#include "yb/rocksdb/db/db_impl.h"
50
#include "yb/rocksdb/db/dbformat.h"
51
#include "yb/rocksdb/db/flush_scheduler.h"
52
#include "yb/rocksdb/db/memtable.h"
53
#include "yb/rocksdb/db/snapshot_impl.h"
54
#include "yb/rocksdb/db/write_batch_internal.h"
55
#include "yb/rocksdb/merge_operator.h"
56
#include "yb/rocksdb/util/coding.h"
57
#include "yb/rocksdb/util/perf_context_imp.h"
58
#include "yb/rocksdb/util/statistics.h"
59
60
#include "yb/util/stats/perf_step_timer.h"
61
62
namespace rocksdb {
63
64
// anon namespace for file-local types
65
namespace {
66
67
enum ContentFlags : uint32_t {
68
  DEFERRED = 1,
69
  HAS_PUT = 2,
70
  HAS_DELETE = 4,
71
  HAS_SINGLE_DELETE = 8,
72
  HAS_MERGE = 16,
73
  HAS_FRONTIERS = 32,
74
};
75
76
struct BatchContentClassifier : public WriteBatch::Handler {
77
  uint32_t content_flags = 0;
78
79
1
  CHECKED_STATUS PutCF(uint32_t, const SliceParts&, const SliceParts&) override {
80
1
    content_flags |= ContentFlags::HAS_PUT;
81
1
    return Status::OK();
82
1
  }
83
84
0
  CHECKED_STATUS DeleteCF(uint32_t, const Slice&) override {
85
0
    content_flags |= ContentFlags::HAS_DELETE;
86
0
    return Status::OK();
87
0
  }
88
89
0
  CHECKED_STATUS SingleDeleteCF(uint32_t, const Slice&) override {
90
0
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
91
0
    return Status::OK();
92
0
  }
93
94
0
  CHECKED_STATUS MergeCF(uint32_t, const Slice&, const Slice&) override {
95
0
    content_flags |= ContentFlags::HAS_MERGE;
96
0
    return Status::OK();
97
0
  }
98
99
0
  CHECKED_STATUS Frontiers(const UserFrontiers& range) override {
100
0
    content_flags |= ContentFlags::HAS_FRONTIERS;
101
0
    return Status::OK();
102
0
  }
103
};
104
105
class DirectWriteHandlerImpl : public DirectWriteHandler {
106
 public:
107
  explicit DirectWriteHandlerImpl(MemTable* mem_table, SequenceNumber seq)
108
11.6M
      : mem_table_(mem_table), seq_(seq) {}
109
110
353M
  void Put(const SliceParts& key, const SliceParts& value) override {
111
353M
    Add(ValueType::kTypeValue, key, value);
112
353M
  }
113
114
144M
  void SingleDelete(const Slice& key) override {
115
144M
    if (mem_table_->Erase(key)) {
116
136M
      return;
117
136M
    }
118
7.95M
    Add(ValueType::kTypeSingleDeletion, SliceParts(&key, 1), SliceParts());
119
7.95M
  }
120
121
11.6M
  size_t Complete() {
122
11.6M
    if (keys_.empty()) {
123
2.16M
      return 0;
124
2.16M
    }
125
9.47M
    auto compare =
126
3.55G
        [comparator = &mem_table_->GetInternalKeyComparator()](KeyHandle lhs, KeyHandle rhs) {
127
3.55G
      auto lhs_slice = GetLengthPrefixedSlice(static_cast<const char*>(lhs));
128
3.55G
      auto rhs_slice = GetLengthPrefixedSlice(static_cast<const char*>(rhs));
129
3.55G
      return comparator->Compare(lhs_slice, rhs_slice) < 0;
130
3.55G
    };
131
9.47M
    std::sort(keys_.begin(), keys_.end(), compare);
132
9.47M
    mem_table_->ApplyPreparedAdd(keys_.data(), keys_.size(), prepared_add_, false);
133
9.47M
    return keys_.size();
134
11.6M
  }
135
136
 private:
137
361M
  void Add(ValueType value_type, const SliceParts& key, const SliceParts& value) {
138
361M
    keys_.push_back(
139
361M
        mem_table_->PrepareAdd(seq_++, value_type, key, value, &prepared_add_));
140
361M
  }
141
142
  MemTable* mem_table_;
143
  SequenceNumber seq_;
144
  PreparedAdd prepared_add_;
145
  boost::container::small_vector<KeyHandle, 128> keys_;
146
};
147
148
}  // anon namespace
149
150
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
151
static const size_t kHeader = 12;
152
153
struct SavePoint {
154
  size_t size;  // size of rep_
155
  uint32_t count;    // count of elements in rep_
156
  uint32_t content_flags;
157
  const UserFrontiers* frontiers;
158
};
159
160
struct SavePoints {
161
  std::stack<SavePoint> stack;
162
};
163
164
WriteBatch::WriteBatch(size_t reserved_bytes)
165
40.1M
    : content_flags_(0) {
166
40.1M
  rep_.reserve(std::max(reserved_bytes, kHeader));
167
40.1M
  rep_.resize(kHeader);
168
40.1M
}
169
170
WriteBatch::WriteBatch(const std::string& rep)
171
    : content_flags_(ContentFlags::DEFERRED),
172
0
      rep_(rep) {}
173
174
WriteBatch::WriteBatch(const WriteBatch& src)
175
    : content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
176
      rep_(src.rep_),
177
79
      frontiers_(src.frontiers_) {
178
79
  if (src.save_points_) {
179
0
    save_points_.reset(new SavePoints(*src.save_points_));
180
0
  }
181
79
}
182
183
WriteBatch::WriteBatch(WriteBatch&& src)
184
    : save_points_(std::move(src.save_points_)),
185
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
186
      rep_(std::move(src.rep_)),
187
3
      frontiers_(std::move(src.frontiers_)) {}
188
189
3
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
190
3
  if (&src != this) {
191
3
    this->~WriteBatch();
192
3
    new (this) WriteBatch(src);
193
3
  }
194
3
  return *this;
195
3
}
196
197
1
WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
198
1
  if (&src != this) {
199
1
    this->~WriteBatch();
200
1
    new (this) WriteBatch(std::move(src));
201
1
  }
202
1
  return *this;
203
1
}
204
205
40.0M
WriteBatch::~WriteBatch() {}
206
207
31.2M
WriteBatch::Handler::~Handler() { }
208
209
6
void WriteBatch::Handler::LogData(const Slice& blob) {
210
  // If the user has not specified something to do with blobs, then we ignore
211
  // them.
212
6
}
213
214
38.1M
bool WriteBatch::Handler::Continue() {
215
38.1M
  return true;
216
38.1M
}
217
218
686k
void WriteBatch::Clear() {
219
686k
  rep_.clear();
220
686k
  rep_.resize(kHeader);
221
222
686k
  content_flags_.store(0, std::memory_order_relaxed);
223
224
686k
  if (save_points_ != nullptr) {
225
16
    while (!save_points_->stack.empty()) {
226
3
      save_points_->stack.pop();
227
3
    }
228
13
  }
229
230
686k
  frontiers_ = nullptr;
231
686k
}
232
233
12.6M
uint32_t WriteBatch::Count() const {
234
12.6M
  return WriteBatchInternal::Count(this);
235
12.6M
}
236
237
56.1k
uint32_t WriteBatch::ComputeContentFlags() const {
238
56.1k
  auto rv = content_flags_.load(std::memory_order_relaxed);
239
56.1k
  if ((rv & ContentFlags::DEFERRED) != 0) {
240
1
    BatchContentClassifier classifier;
241
1
    auto status = Iterate(&classifier);
242
1
    LOG_IF(ERROR, !status.ok()) << "Iterate failed during ComputeContentFlags: " << status;
243
1
    rv = classifier.content_flags;
244
245
    // this method is conceptually const, because it is performing a lazy
246
    // computation that doesn't affect the abstract state of the batch.
247
    // content_flags_ is marked mutable so that we can perform the
248
    // following assignment
249
1
    content_flags_.store(rv, std::memory_order_relaxed);
250
1
  }
251
56.1k
  return rv;
252
56.1k
}
253
254
30
bool WriteBatch::HasPut() const {
255
30
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
256
30
}
257
258
30
bool WriteBatch::HasDelete() const {
259
30
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
260
30
}
261
262
30
bool WriteBatch::HasSingleDelete() const {
263
30
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
264
30
}
265
266
56.0k
bool WriteBatch::HasMerge() const {
267
56.0k
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
268
56.0k
}
269
270
Status ReadRecordFromWriteBatch(Slice* input, char* tag,
271
                                uint32_t* column_family, Slice* key,
272
84.1M
                                Slice* value, Slice* blob) {
273
84.1M
  assert(key != nullptr && value != nullptr);
274
0
  *tag = (*input)[0];
275
84.1M
  input->remove_prefix(1);
276
84.1M
  *column_family = 0;  // default
277
84.1M
  switch (*tag) {
278
21.9M
    case kTypeColumnFamilyValue:
279
21.9M
      if (!GetVarint32(input, column_family)) {
280
0
        return STATUS(Corruption, "bad WriteBatch Put");
281
0
      }
282
21.9M
      FALLTHROUGH_INTENDED;
283
58.1M
    case kTypeValue:
284
58.1M
      if (!GetLengthPrefixedSlice(input, key) ||
285
58.1M
          !GetLengthPrefixedSlice(input, value)) {
286
0
        return STATUS(Corruption, "bad WriteBatch Put");
287
0
      }
288
58.1M
      break;
289
58.1M
    case kTypeColumnFamilyDeletion:
290
82.7k
    case kTypeColumnFamilySingleDeletion:
291
82.7k
      if (!GetVarint32(input, column_family)) {
292
0
        return STATUS(Corruption, "bad WriteBatch Delete");
293
0
      }
294
82.7k
      FALLTHROUGH_INTENDED;
295
25.9M
    case kTypeDeletion:
296
25.9M
    case kTypeSingleDeletion:
297
25.9M
      if (!GetLengthPrefixedSlice(input, key)) {
298
2
        return STATUS(Corruption, "bad WriteBatch Delete");
299
2
      }
300
25.9M
      break;
301
25.9M
    case kTypeColumnFamilyMerge:
302
488
      if (!GetVarint32(input, column_family)) {
303
0
        return STATUS(Corruption, "bad WriteBatch Merge");
304
0
      }
305
488
      FALLTHROUGH_INTENDED;
306
96.1k
    case kTypeMerge:
307
96.1k
      if (!GetLengthPrefixedSlice(input, key) ||
308
96.1k
          !GetLengthPrefixedSlice(input, value)) {
309
0
        return STATUS(Corruption, "bad WriteBatch Merge");
310
0
      }
311
96.1k
      break;
312
96.1k
    case kTypeLogData:
313
11
      assert(blob != nullptr);
314
11
      if (!GetLengthPrefixedSlice(input, blob)) {
315
0
        return STATUS(Corruption, "bad WriteBatch Blob");
316
0
      }
317
11
      break;
318
11
    default:
319
0
      return STATUS(Corruption, "unknown WriteBatch tag");
320
84.1M
  }
321
84.1M
  return Status::OK();
322
84.1M
}
323
324
Result<size_t> DirectInsert(WriteBatch::Handler* handler, DirectWriter* writer);
325
326
32.4M
Status WriteBatch::Iterate(Handler* handler) const {
327
32.4M
  Slice input(rep_);
328
32.4M
  if (input.size() < kHeader) {
329
0
    return STATUS(Corruption, "malformed WriteBatch (too small)");
330
0
  }
331
332
32.4M
  input.remove_prefix(kHeader);
333
32.4M
  Slice key, value, blob;
334
32.4M
  size_t found = 0;
335
32.4M
  Status s;
336
337
32.4M
  if (frontiers_) {
338
11.6M
    s = handler->Frontiers(*frontiers_);
339
11.6M
  }
340
32.4M
  if (s.ok() && direct_writer_) {
341
11.6M
    auto result = DirectInsert(handler, direct_writer_);
342
11.6M
    if (result.ok()) {
343
11.6M
      direct_entries_ = *result;
344
18.4E
    } else {
345
18.4E
      s = result.status();
346
18.4E
    }
347
11.6M
  }
348
70.6M
  while (s.ok() && !input.empty() && handler->Continue()) {
349
38.1M
    char tag = 0;
350
38.1M
    uint32_t column_family = 0;  // default
351
352
38.1M
    s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
353
38.1M
                                 &blob);
354
38.1M
    if (!s.ok()) {
355
2
      return s;
356
2
    }
357
358
38.1M
    switch (tag) {
359
21.8M
      case kTypeColumnFamilyValue:
360
36.0M
      case kTypeValue:
361
36.0M
        assert(content_flags_.load(std::memory_order_relaxed) &
362
36.0M
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
363
0
        s = handler->PutCF(column_family, SliceParts(&key, 1), SliceParts(&value, 1));
364
36.0M
        found++;
365
36.0M
        break;
366
777
      case kTypeColumnFamilyDeletion:
367
2.04M
      case kTypeDeletion:
368
2.04M
        assert(content_flags_.load(std::memory_order_relaxed) &
369
2.04M
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
370
0
        s = handler->DeleteCF(column_family, key);
371
2.04M
        found++;
372
2.04M
        break;
373
185
      case kTypeColumnFamilySingleDeletion:
374
199
      case kTypeSingleDeletion:
375
199
        assert(content_flags_.load(std::memory_order_relaxed) &
376
199
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
377
0
        s = handler->SingleDeleteCF(column_family, key);
378
199
        found++;
379
199
        break;
380
342
      case kTypeColumnFamilyMerge:
381
94.2k
      case kTypeMerge:
382
94.2k
        assert(content_flags_.load(std::memory_order_relaxed) &
383
94.2k
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
384
0
        s = handler->MergeCF(column_family, key, value);
385
94.2k
        found++;
386
94.2k
        break;
387
11
      case kTypeLogData:
388
11
        handler->LogData(blob);
389
11
        break;
390
0
      default:
391
0
        return STATUS(Corruption, "unknown WriteBatch tag");
392
38.1M
    }
393
38.1M
  }
394
32.4M
  if (!s.ok()) {
395
1
    return s;
396
1
  }
397
32.4M
  if (found != WriteBatchInternal::Count(this)) {
398
1
    return STATUS(Corruption, "WriteBatch has wrong count");
399
32.4M
  } else {
400
32.4M
    return Status::OK();
401
32.4M
  }
402
32.4M
}
403
404
135M
uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
405
135M
  return DecodeFixed32(b->rep_.data() + 8);
406
135M
}
407
408
41.1M
void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
409
41.1M
  EncodeFixed32(&b->rep_[8], n);
410
41.1M
}
411
412
5.97M
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
413
5.97M
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
414
5.97M
}
415
416
18.7M
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
417
18.7M
  EncodeFixed64(&b->rep_[0], seq);
418
18.7M
}
419
420
16
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { return kHeader; }
421
422
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
423
37.5M
                             const Slice& key, const Slice& value) {
424
37.5M
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
425
37.5M
  if (column_family_id == 0) {
426
15.7M
    b->rep_.push_back(static_cast<char>(kTypeValue));
427
21.7M
  } else {
428
21.7M
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
429
21.7M
    PutVarint32(&b->rep_, column_family_id);
430
21.7M
  }
431
37.5M
  PutLengthPrefixedSlice(&b->rep_, key);
432
37.5M
  PutLengthPrefixedSlice(&b->rep_, value);
433
37.5M
  b->content_flags_.store(
434
37.5M
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
435
37.5M
      std::memory_order_relaxed);
436
37.5M
}
437
438
void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
439
37.5M
                     const Slice& value) {
440
37.5M
  WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
441
37.5M
}
442
443
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
444
2.54k
                             const SliceParts& key, const SliceParts& value) {
445
2.54k
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
446
2.54k
  if (column_family_id == 0) {
447
2.49k
    b->rep_.push_back(static_cast<char>(kTypeValue));
448
2.49k
  } else {
449
49
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
450
49
    PutVarint32(&b->rep_, column_family_id);
451
49
  }
452
2.54k
  PutLengthPrefixedSliceParts(&b->rep_, key);
453
2.54k
  PutLengthPrefixedSliceParts(&b->rep_, value);
454
2.54k
  b->content_flags_.store(
455
2.54k
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
456
2.54k
      std::memory_order_relaxed);
457
2.54k
}
458
459
void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
460
2.54k
                     const SliceParts& value) {
461
2.54k
  WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
462
2.54k
}
463
464
void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
465
2.06M
                                const Slice& key) {
466
2.06M
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
467
2.06M
  if (column_family_id == 0) {
468
2.06M
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
469
2.06M
  } else {
470
985
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
471
985
    PutVarint32(&b->rep_, column_family_id);
472
985
  }
473
2.06M
  PutLengthPrefixedSlice(&b->rep_, key);
474
2.06M
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
475
2.06M
                              ContentFlags::HAS_DELETE,
476
2.06M
                          std::memory_order_relaxed);
477
2.06M
}
478
479
2.06M
void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
480
2.06M
  WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key);
481
2.06M
}
482
483
void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
484
21
                                const SliceParts& key) {
485
21
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
486
21
  if (column_family_id == 0) {
487
0
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
488
21
  } else {
489
21
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
490
21
    PutVarint32(&b->rep_, column_family_id);
491
21
  }
492
21
  PutLengthPrefixedSliceParts(&b->rep_, key);
493
21
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
494
21
                              ContentFlags::HAS_DELETE,
495
21
                          std::memory_order_relaxed);
496
21
}
497
498
void WriteBatch::Delete(ColumnFamilyHandle* column_family,
499
21
                        const SliceParts& key) {
500
21
  WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key);
501
21
}
502
503
void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
504
214
                                      const Slice& key) {
505
214
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
506
214
  if (column_family_id == 0) {
507
28
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
508
186
  } else {
509
186
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
510
186
    PutVarint32(&b->rep_, column_family_id);
511
186
  }
512
214
  PutLengthPrefixedSlice(&b->rep_, key);
513
214
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
514
214
                              ContentFlags::HAS_SINGLE_DELETE,
515
214
                          std::memory_order_relaxed);
516
214
}
517
518
void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
519
214
                              const Slice& key) {
520
214
  WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key);
521
214
}
522
523
void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
524
0
                                      const SliceParts& key) {
525
0
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
526
0
  if (column_family_id == 0) {
527
0
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
528
0
  } else {
529
0
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
530
0
    PutVarint32(&b->rep_, column_family_id);
531
0
  }
532
0
  PutLengthPrefixedSliceParts(&b->rep_, key);
533
0
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
534
0
                              ContentFlags::HAS_SINGLE_DELETE,
535
0
                          std::memory_order_relaxed);
536
0
}
537
538
void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
539
0
                              const SliceParts& key) {
540
0
  WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key);
541
0
}
542
543
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
544
91.3k
                               const Slice& key, const Slice& value) {
545
91.3k
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
546
91.3k
  if (column_family_id == 0) {
547
91.1k
    b->rep_.push_back(static_cast<char>(kTypeMerge));
548
91.1k
  } else {
549
236
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
550
236
    PutVarint32(&b->rep_, column_family_id);
551
236
  }
552
91.3k
  PutLengthPrefixedSlice(&b->rep_, key);
553
91.3k
  PutLengthPrefixedSlice(&b->rep_, value);
554
91.3k
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
555
91.3k
                              ContentFlags::HAS_MERGE,
556
91.3k
                          std::memory_order_relaxed);
557
91.3k
}
558
559
void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
560
90.6k
                       const Slice& value) {
561
90.6k
  WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, value);
562
90.6k
}
563
564
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
565
                               const SliceParts& key,
566
0
                               const SliceParts& value) {
567
0
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
568
0
  if (column_family_id == 0) {
569
0
    b->rep_.push_back(static_cast<char>(kTypeMerge));
570
0
  } else {
571
0
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
572
0
    PutVarint32(&b->rep_, column_family_id);
573
0
  }
574
0
  PutLengthPrefixedSliceParts(&b->rep_, key);
575
0
  PutLengthPrefixedSliceParts(&b->rep_, value);
576
0
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
577
0
                              ContentFlags::HAS_MERGE,
578
0
                          std::memory_order_relaxed);
579
0
}
580
581
void WriteBatch::Merge(ColumnFamilyHandle* column_family,
582
                       const SliceParts& key,
583
0
                       const SliceParts& value) {
584
0
  WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family),
585
0
                            key, value);
586
0
}
587
588
6
void WriteBatch::PutLogData(const Slice& blob) {
589
6
  rep_.push_back(static_cast<char>(kTypeLogData));
590
6
  PutLengthPrefixedSlice(&rep_, blob);
591
6
}
592
593
44
void WriteBatch::SetSavePoint() {
594
44
  if (save_points_ == nullptr) {
595
15
    save_points_.reset(new SavePoints());
596
15
  }
597
  // Record length and count of current batch of writes.
598
44
  save_points_->stack.push(
599
44
     {GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed), frontiers_});
600
44
}
601
602
55
Status WriteBatch::RollbackToSavePoint() {
603
55
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
604
16
    return STATUS(NotFound, "");
605
16
  }
606
607
  // Pop the most recent savepoint off the stack
608
39
  SavePoint savepoint = save_points_->stack.top();
609
39
  save_points_->stack.pop();
610
611
39
  DCHECK_LE(savepoint.size, rep_.size());
612
39
  DCHECK_LE(savepoint.count, Count());
613
614
39
  if (savepoint.size == rep_.size()) {
615
    // No changes to rollback
616
25
  } else if (savepoint.size == 0) {
617
    // Rollback everything
618
0
    Clear();
619
25
  } else {
620
25
    rep_.resize(savepoint.size);
621
25
    WriteBatchInternal::SetCount(this, savepoint.count);
622
25
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
623
25
  }
624
39
  frontiers_ = savepoint.frontiers;
625
626
39
  return Status::OK();
627
55
}
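// Usage sketch (illustrative, not taken from the original file): savepoints let a
// caller stage writes and discard the most recent group on failure, e.g.
//   WriteBatch batch;
//   batch.Put(cf, "a", "1");         // cf is a ColumnFamilyHandle* owned by the caller
//   batch.SetSavePoint();
//   batch.Put(cf, "b", "2");
//   Status s = batch.RollbackToSavePoint();  // drops the put of "b"; "a" remains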
628
629
namespace {
630
631
YB_STRONGLY_TYPED_BOOL(InMemoryErase);
632
633
class MemTableInserter : public WriteBatch::Handler {
634
 public:
635
  SequenceNumber sequence_;
636
  ColumnFamilyMemTables* const cf_mems_;
637
  FlushScheduler* const flush_scheduler_;
638
  const bool ignore_missing_column_families_;
639
  const uint64_t log_number_;
640
  DBImpl* db_;
641
  const InsertFlags insert_flags_;
642
643
  // cf_mems should not be shared with concurrent inserters
644
  MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
645
                   FlushScheduler* flush_scheduler,
646
                   bool ignore_missing_column_families, uint64_t log_number,
647
                   DB* db, InsertFlags insert_flags)
648
      : sequence_(sequence),
649
        cf_mems_(cf_mems),
650
        flush_scheduler_(flush_scheduler),
651
        ignore_missing_column_families_(ignore_missing_column_families),
652
        log_number_(log_number),
653
        db_(reinterpret_cast<DBImpl*>(db)),
654
30.9M
        insert_flags_(insert_flags) {
655
30.9M
    assert(cf_mems_);
656
30.9M
    if (insert_flags_.Test(InsertFlag::kFilterDeletes)) {
657
27.9M
      assert(db_);
658
27.9M
    }
659
30.9M
  }
660
661
49.4M
  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
662
    // If we are in a concurrent mode, it is the caller's responsibility
663
    // to clone the original ColumnFamilyMemTables so that each thread
664
    // has its own instance.  Otherwise, it must be guaranteed that there
665
    // is no concurrent access
666
49.4M
    bool found = cf_mems_->Seek(column_family_id);
667
49.4M
    if (!found) {
668
7
      if (ignore_missing_column_families_) {
669
6
        *s = Status::OK();
670
6
      } else {
671
1
        *s = STATUS(InvalidArgument,
672
1
            "Invalid column family specified in write batch");
673
1
      }
674
7
      return false;
675
7
    }
676
49.4M
    if (log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber()) {
677
      // This is true only in recovery environment (log_number_ is always 0 in
678
      // non-recovery, regular write code-path)
679
      // * If log_number_ < cf_mems_->GetLogNumber(), this means that column
680
      // family already contains updates from this log. We can't apply updates
681
      // twice because of update-in-place or merge workloads -- ignore the
682
      // update
683
19.0k
      *s = Status::OK();
684
19.0k
      return false;
685
19.0k
    }
686
49.4M
    return true;
687
49.4M
  }
688
689
  CHECKED_STATUS PutCF(
690
35.8M
      uint32_t column_family_id, const SliceParts& key, const SliceParts& value) override {
691
35.8M
    Status seek_status;
692
35.8M
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
693
19.0k
      ++sequence_;
694
19.0k
      return seek_status;
695
19.0k
    }
696
35.8M
    MemTable* mem = cf_mems_->GetMemTable();
697
35.8M
    auto* moptions = mem->GetMemTableOptions();
698
35.8M
    if (!moptions->inplace_update_support) {
699
35.8M
      mem->Add(CurrentSequenceNumber(), kTypeValue, key, value,
700
35.8M
               insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
701
35.8M
    } else if (moptions->inplace_callback == nullptr) {
702
100
      assert(!insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
703
0
      mem->Update(CurrentSequenceNumber(), key.TheOnlyPart(), value.TheOnlyPart());
704
100
      RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
705
2.03k
    } else {
706
2.03k
      assert(!insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
707
0
      SequenceNumber current_seq = CurrentSequenceNumber();
708
2.03k
      if (mem->UpdateCallback(current_seq, key.TheOnlyPart(), value.TheOnlyPart())) {
709
1.42k
      } else {
710
        // key not found in memtable. Do sst get, update, add
711
619
        SnapshotImpl read_from_snapshot;
712
619
        read_from_snapshot.number_ = current_seq;
713
619
        ReadOptions ropts;
714
619
        ropts.snapshot = &read_from_snapshot;
715
716
619
        std::string prev_value;
717
619
        std::string merged_value;
718
719
619
        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
720
619
        if (cf_handle == nullptr) {
721
0
          cf_handle = db_->DefaultColumnFamily();
722
0
        }
723
619
        Status s = db_->Get(ropts, cf_handle, key.TheOnlyPart(), &prev_value);
724
725
619
        char* prev_buffer = const_cast<char*>(prev_value.c_str());
726
619
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
727
619
        auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
728
619
                                                 s.ok() ? &prev_size : nullptr,
729
619
                                                 value.TheOnlyPart(), &merged_value);
730
619
        if (status == UpdateStatus::UPDATED_INPLACE) {
731
          // prev_value is updated in-place with final value.
732
0
          Slice new_value(prev_buffer, prev_size);
733
0
          mem->Add(current_seq, kTypeValue, key, SliceParts(&new_value, 1));
734
0
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
735
619
        } else if (status == UpdateStatus::UPDATED) {
736
          // merged_value contains the final value.
737
15
          Slice new_value(merged_value);
738
15
          mem->Add(current_seq, kTypeValue, key, SliceParts(&new_value, 1));
739
15
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
740
15
        }
741
619
      }
742
2.03k
    }
743
    // Since all Puts are logged in transaction logs (if enabled), always bump
744
    // sequence number. Even if the update eventually fails and does not result
745
    // in memtable add/update.
746
0
    sequence_++;
747
35.8M
    CheckMemtableFull();
748
35.8M
    return Status::OK();
749
35.8M
  }
750
751
  CHECKED_STATUS DeleteImpl(uint32_t column_family_id, const Slice& key,
752
1.87M
                            ValueType delete_type) {
753
1.87M
    Status seek_status;
754
1.87M
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
755
0
      ++sequence_;
756
0
      return seek_status;
757
0
    }
758
1.87M
    MemTable* mem = cf_mems_->GetMemTable();
759
1.87M
    if ((delete_type == ValueType::kTypeSingleDeletion ||
760
1.87M
         delete_type == ValueType::kTypeColumnFamilySingleDeletion) &&
761
1.87M
        mem->Erase(key)) {
762
0
      return Status::OK();
763
0
    }
764
1.87M
    auto* moptions = mem->GetMemTableOptions();
765
1.87M
    if (insert_flags_.Test(InsertFlag::kFilterDeletes) && moptions->filter_deletes) {
766
145
      assert(!insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
767
0
      SnapshotImpl read_from_snapshot;
768
145
      read_from_snapshot.number_ = sequence_;
769
145
      ReadOptions ropts;
770
145
      ropts.snapshot = &read_from_snapshot;
771
145
      std::string value;
772
145
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
773
145
      if (cf_handle == nullptr) {
774
0
        cf_handle = db_->DefaultColumnFamily();
775
0
      }
776
145
      if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
777
11
        RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES);
778
11
        return Status::OK();
779
11
      }
780
145
    }
781
1.87M
    mem->Add(CurrentSequenceNumber(), delete_type, SliceParts(&key, 1), SliceParts(),
782
1.87M
             insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
783
1.87M
    sequence_++;
784
1.87M
    CheckMemtableFull();
785
1.87M
    return Status::OK();
786
1.87M
  }
787
788
  virtual CHECKED_STATUS DeleteCF(uint32_t column_family_id,
789
1.87M
                                  const Slice& key) override {
790
1.87M
    return DeleteImpl(column_family_id, key, kTypeDeletion);
791
1.87M
  }
792
793
  virtual CHECKED_STATUS SingleDeleteCF(uint32_t column_family_id,
794
194
                                        const Slice& key) override {
795
194
    return DeleteImpl(column_family_id, key, kTypeSingleDeletion);
796
194
  }
797
798
  virtual CHECKED_STATUS MergeCF(uint32_t column_family_id, const Slice& key,
799
93.5k
                                 const Slice& value) override {
800
93.5k
    assert(!insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
801
0
    Status seek_status;
802
93.5k
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
803
0
      ++sequence_;
804
0
      return seek_status;
805
0
    }
806
93.5k
    MemTable* mem = cf_mems_->GetMemTable();
807
93.5k
    auto* moptions = mem->GetMemTableOptions();
808
93.5k
    bool perform_merge = false;
809
810
93.5k
    SequenceNumber current_seq = CurrentSequenceNumber();
811
93.5k
    if (moptions->max_successive_merges > 0 && db_ != nullptr) {
812
134
      LookupKey lkey(key, current_seq);
813
814
      // Count the number of successive merges at the head
815
      // of the key in the memtable
816
134
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
817
818
134
      if (num_merges >= moptions->max_successive_merges) {
819
20
        perform_merge = true;
820
20
      }
821
134
    }
822
823
93.5k
    if (perform_merge) {
824
      // 1) Get the existing value
825
20
      std::string get_value;
826
827
      // Pass in the sequence number so that we also include previous merge
828
      // operations in the same batch.
829
20
      SnapshotImpl read_from_snapshot;
830
20
      read_from_snapshot.number_ = current_seq;
831
20
      ReadOptions read_options;
832
20
      read_options.snapshot = &read_from_snapshot;
833
834
20
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
835
20
      if (cf_handle == nullptr) {
836
0
        cf_handle = db_->DefaultColumnFamily();
837
0
      }
838
20
      RETURN_NOT_OK(db_->Get(read_options, cf_handle, key, &get_value));
839
20
      Slice get_value_slice = Slice(get_value);
840
841
      // 2) Apply this merge
842
20
      auto merge_operator = moptions->merge_operator;
843
20
      assert(merge_operator);
844
845
0
      std::deque<std::string> operands;
846
20
      operands.push_front(value.ToString());
847
20
      std::string new_value;
848
20
      bool merge_success = false;
849
20
      {
850
20
        StopWatchNano timer(Env::Default(), moptions->statistics != nullptr);
851
20
        PERF_TIMER_GUARD(merge_operator_time_nanos);
852
20
        merge_success = merge_operator->FullMerge(
853
20
            key, &get_value_slice, operands, &new_value, moptions->info_log);
854
20
        RecordTick(moptions->statistics, MERGE_OPERATION_TOTAL_TIME,
855
20
                   timer.ElapsedNanos());
856
20
      }
857
858
20
      if (!merge_success) {
859
          // Failed to merge!
860
0
        RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES);
861
862
        // Store the delta in memtable
863
0
        perform_merge = false;
864
20
      } else {
865
        // 3) Add value to memtable
866
20
        Slice value_slice(new_value);
867
20
        mem->Add(current_seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value_slice, 1));
868
20
      }
869
20
    }
870
871
93.5k
    if (!perform_merge) {
872
      // Add merge operator to memtable
873
93.5k
      mem->Add(current_seq, kTypeMerge, SliceParts(&key, 1), SliceParts(&value, 1));
874
93.5k
    }
875
876
93.5k
    sequence_++;
877
93.5k
    CheckMemtableFull();
878
93.5k
    return Status::OK();
879
93.5k
  }
880
881
11.6M
  CHECKED_STATUS Frontiers(const UserFrontiers& frontiers) override {
882
11.6M
    Status seek_status;
883
11.6M
    if (!SeekToColumnFamily(0, &seek_status)) {
884
0
      return seek_status;
885
0
    }
886
11.6M
    cf_mems_->GetMemTable()->UpdateFrontiers(frontiers);
887
11.6M
    return Status::OK();
888
11.6M
  }
889
890
49.4M
  void CheckMemtableFull() {
891
49.4M
    if (flush_scheduler_ != nullptr) {
892
49.4M
      auto* cfd = cf_mems_->current();
893
49.4M
      assert(cfd != nullptr);
894
49.4M
      if (cfd->mem()->ShouldScheduleFlush() &&
895
49.4M
          cfd->mem()->MarkFlushScheduled()) {
896
        // MarkFlushScheduled only returns true if we are the one that
897
        // should take action, so no need to dedup further
898
12.1k
        flush_scheduler_->ScheduleFlush(cfd);
899
12.1k
      }
900
49.4M
    }
901
49.4M
  }
902
903
 private:
904
37.8M
  SequenceNumber CurrentSequenceNumber() {
905
37.8M
    return sequence_;
906
37.8M
  }
907
};
908
909
}  // namespace
910
911
// This function can only be called in these conditions:
912
// 1) During Recovery()
913
// 2) During Write(), in a single-threaded write thread
914
// 3) During Write(), in a concurrent context where memtables has been cloned
915
// The reason is that it calls memtables->Seek(), which has a stateful cache
916
Status WriteBatchInternal::InsertInto(
917
    const autovector<WriteThread::Writer*>& writers, SequenceNumber sequence,
918
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
919
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
920
27.9M
    InsertFlags insert_flags) {
921
27.9M
  MemTableInserter inserter(sequence, memtables, flush_scheduler,
922
27.9M
                            ignore_missing_column_families, log_number, db,
923
27.9M
                            insert_flags);
924
925
57.0M
  for (size_t i = 0; i < writers.size(); i++) {
926
29.1M
    if (!writers[i]->CallbackFailed()) {
927
29.1M
      writers[i]->status = writers[i]->batch->Iterate(&inserter);
928
29.1M
      if (!writers[i]->status.ok()) {
929
1
        return writers[i]->status;
930
1
      }
931
29.1M
    }
932
29.1M
  }
933
27.9M
  return Status::OK();
934
27.9M
}
935
936
Status WriteBatchInternal::InsertInto(const WriteBatch* batch,
937
                                      ColumnFamilyMemTables* memtables,
938
                                      FlushScheduler* flush_scheduler,
939
                                      bool ignore_missing_column_families,
940
                                      uint64_t log_number, DB* db,
941
3.01M
                                      InsertFlags insert_flags) {
942
3.01M
  MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables,
943
3.01M
                            flush_scheduler, ignore_missing_column_families,
944
3.01M
                            log_number, db, insert_flags);
945
3.01M
  return batch->Iterate(&inserter);
946
3.01M
}
947
948
2.96M
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
949
2.96M
  DCHECK_GE(contents.size(), kHeader);
950
2.96M
  b->rep_.assign(contents.cdata(), contents.size());
951
2.96M
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
952
2.96M
}
953
954
1.59M
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
955
1.59M
  SetCount(dst, Count(dst) + Count(src));
956
1.59M
  DCHECK_GE(src->rep_.size(), kHeader);
957
1.59M
  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
958
1.59M
  dst->content_flags_.store(
959
1.59M
      dst->content_flags_.load(std::memory_order_relaxed) |
960
1.59M
          src->content_flags_.load(std::memory_order_relaxed),
961
1.59M
      std::memory_order_relaxed);
962
1.59M
}
963
964
size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
965
29.2M
                                            size_t rightByteSize) {
966
29.2M
  if (leftByteSize == 0 || rightByteSize == 0) {
967
27.9M
    return leftByteSize + rightByteSize;
968
27.9M
  } else {
969
1.26M
    return leftByteSize + rightByteSize - kHeader;
970
1.26M
  }
971
29.2M
}
972
973
11.6M
Result<size_t> DirectInsert(WriteBatch::Handler* handler, DirectWriter* writer) {
974
11.6M
  auto mem_table_inserter = down_cast<MemTableInserter*>(handler);
975
11.6M
  auto* mems = mem_table_inserter->cf_mems_;
976
11.6M
  auto current = mems->current();
977
11.6M
  if (!current) {
978
0
    mems->Seek(0);
979
0
    current = mems->current();
980
0
  }
981
11.6M
  DirectWriteHandlerImpl direct_write_handler(
982
11.6M
      current->mem(), mem_table_inserter->sequence_);
983
11.6M
  RETURN_NOT_OK(writer->Apply(&direct_write_handler));
984
11.6M
  auto result = direct_write_handler.Complete();
985
11.6M
  mem_table_inserter->CheckMemtableFull();
986
11.6M
  return result;
987
11.6M
}
988
989
}  // namespace rocksdb