YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/write_batch.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
// This source code is licensed under the BSD-style license found in the
3
// LICENSE file in the root directory of this source tree. An additional grant
4
// of patent rights can be found in the PATENTS file in the same directory.
5
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
6
// Use of this source code is governed by a BSD-style license that can be
7
// found in the LICENSE file. See the AUTHORS file for names of contributors.
8
//
9
// The following only applies to changes made to this file as part of YugaByte development.
10
//
11
// Portions Copyright (c) YugaByte, Inc.
12
//
13
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
14
// in compliance with the License.  You may obtain a copy of the License at
15
//
16
// http://www.apache.org/licenses/LICENSE-2.0
17
//
18
// Unless required by applicable law or agreed to in writing, software distributed under the License
19
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
20
// or implied.  See the License for the specific language governing permissions and limitations
21
// under the License.
22
//
23
// WriteBatch holds a collection of updates to apply atomically to a DB.
24
//
25
// The updates are applied in the order in which they are added
26
// to the WriteBatch.  For example, the value of "key" will be "v3"
27
// after the following batch is written:
28
//
29
//    batch.Put("key", "v1");
30
//    batch.Delete("key");
31
//    batch.Put("key", "v2");
32
//    batch.Put("key", "v3");
33
//
34
// Multiple threads can invoke const methods on a WriteBatch without
35
// external synchronization, but if any of the threads may call a
36
// non-const method, all threads accessing the same WriteBatch must use
37
// external synchronization.
38
39
#ifndef YB_ROCKSDB_WRITE_BATCH_H
40
#define YB_ROCKSDB_WRITE_BATCH_H
41
42
#include <stdint.h>
43
44
#include <algorithm>
45
#include <atomic>
46
#include <string>
47
#include <vector>
48
49
#include "yb/rocksdb/status.h"
50
#include "yb/rocksdb/write_batch_base.h"
51
52
#include "yb/util/slice.h"
53
54
namespace rocksdb {
55
56
class ColumnFamilyHandle;
57
struct SavePoints;
58
class UserFrontiers;
59
60
class DirectWriteHandler {
61
 public:
62
  virtual void Put(const SliceParts& key, const SliceParts& value) = 0;
63
  virtual void SingleDelete(const Slice& key) = 0;
64
65
6.40M
  virtual ~DirectWriteHandler() = default;
66
};
67
68
// DirectWriter could be attached to WriteBatch, in this case when write batch is applied to
69
// rocksdb, it calls direct writer passing DirectWriteHandler, that could be used to add
70
// entries directly to mem table.
71
class DirectWriter {
72
 public:
73
  virtual CHECKED_STATUS Apply(DirectWriteHandler* handler) = 0;
74
75
6.39M
  virtual ~DirectWriter() = default;
76
};
77
78
class WriteBatch : public WriteBatchBase {
79
 public:
80
  explicit WriteBatch(size_t reserved_bytes = 0);
81
  ~WriteBatch();
82
83
  using WriteBatchBase::Put;
84
  // Store the mapping "key->value" in the database.
85
  void Put(ColumnFamilyHandle* column_family, const Slice& key,
86
           const Slice& value) override;
87
6.48M
  void Put(const Slice& key, const Slice& value) override {
88
6.48M
    Put(nullptr, key, value);
89
6.48M
  }
90
91
  // Variant of Put() that gathers output like writev(2).  The key and value
92
  // that will be written to the database are concatentations of arrays of
93
  // slices.
94
  void Put(ColumnFamilyHandle* column_family, const SliceParts& key,
95
           const SliceParts& value) override;
96
2
  void Put(const SliceParts& key, const SliceParts& value) override {
97
2
    Put(nullptr, key, value);
98
2
  }
99
100
  using WriteBatchBase::Delete;
101
  // If the database contains a mapping for "key", erase it.  Else do nothing.
102
  void Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
103
1.39M
  void Delete(const Slice& key) override { Delete(nullptr, key); }
104
105
  // variant that takes SliceParts
106
  void Delete(ColumnFamilyHandle* column_family,
107
              const SliceParts& key) override;
108
0
  void Delete(const SliceParts& key) override { Delete(nullptr, key); }
109
110
  using WriteBatchBase::SingleDelete;
111
  // WriteBatch implementation of DB::SingleDelete().  See db.h.
112
  void SingleDelete(ColumnFamilyHandle* column_family,
113
                    const Slice& key) override;
114
19
  void SingleDelete(const Slice& key) override { SingleDelete(nullptr, key); }
115
116
  // variant that takes SliceParts
117
  void SingleDelete(ColumnFamilyHandle* column_family,
118
                    const SliceParts& key) override;
119
0
  void SingleDelete(const SliceParts& key) override {
120
0
    SingleDelete(nullptr, key);
121
0
  }
122
123
  using WriteBatchBase::Merge;
124
  // Merge "value" with the existing value of "key" in the database.
125
  // "key->merge(existing, value)"
126
  void Merge(ColumnFamilyHandle* column_family, const Slice& key,
127
             const Slice& value) override;
128
39
  void Merge(const Slice& key, const Slice& value) override {
129
39
    Merge(nullptr, key, value);
130
39
  }
131
132
  // variant that takes SliceParts
133
  void Merge(ColumnFamilyHandle* column_family, const SliceParts& key,
134
             const SliceParts& value) override;
135
0
  void Merge(const SliceParts& key, const SliceParts& value) override {
136
0
    Merge(nullptr, key, value);
137
0
  }
138
139
  using WriteBatchBase::PutLogData;
140
  // Append a blob of arbitrary size to the records in this batch. The blob will
141
  // be stored in the transaction log but not in any other file. In particular,
142
  // it will not be persisted to the SST files. When iterating over this
143
  // WriteBatch, WriteBatch::Handler::LogData will be called with the contents
144
  // of the blob as it is encountered. Blobs, puts, deletes, and merges will be
145
  // encountered in the same order in thich they were inserted. The blob will
146
  // NOT consume sequence number(s) and will NOT increase the count of the batch
147
  //
148
  // Example application: add timestamps to the transaction log for use in
149
  // replication.
150
  void PutLogData(const Slice& blob) override;
151
152
  using WriteBatchBase::Clear;
153
  // Clear all updates buffered in this batch.
154
  void Clear() override;
155
156
  // Records the state of the batch for future calls to RollbackToSavePoint().
157
  // May be called multiple times to set multiple save points.
158
  void SetSavePoint() override;
159
160
  // Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the
161
  // most recent call to SetSavePoint() and removes the most recent save point.
162
  // If there is no previous call to SetSavePoint(), STATUS(NotFound, "")
163
  // will be returned.
164
  // Oterwise returns Status::OK().
165
  Status RollbackToSavePoint() override;
166
167
  // Support for iterating over the contents of a batch.
168
  class Handler {
169
   public:
170
    virtual ~Handler();
171
    // default implementation will just call Put without column family for
172
    // backwards compatibility. If the column family is not default,
173
    // the function is noop
174
    virtual CHECKED_STATUS PutCF(uint32_t column_family_id, const SliceParts& key,
175
175k
                                 const SliceParts& value) {
176
175k
      if (column_family_id == 0) {
177
        // Put() historically doesn't return status. We didn't want to be
178
        // backwards incompatible so we didn't change the return status
179
        // (this is a public API). We do an ordinary get and return Status::OK()
180
175k
        Put(key.TheOnlyPart(), value.TheOnlyPart());
181
175k
        return Status::OK();
182
175k
      }
183
0
      return STATUS(InvalidArgument,
184
0
          "non-default column family and PutCF not implemented");
185
0
    }
186
1
    virtual void Put(const Slice& /*key*/, const Slice& /*value*/) {}
187
188
173k
    virtual CHECKED_STATUS DeleteCF(uint32_t column_family_id, const Slice& key) {
189
173k
      if (column_family_id == 0) {
190
173k
        Delete(key);
191
173k
        return Status::OK();
192
173k
      }
193
0
      return STATUS(InvalidArgument,
194
0
          "non-default column family and DeleteCF not implemented");
195
0
    }
196
1
    virtual void Delete(const Slice& /*key*/) {}
197
198
1
    virtual CHECKED_STATUS SingleDeleteCF(uint32_t column_family_id, const Slice& key) {
199
1
      if (column_family_id == 0) {
200
1
        SingleDelete(key);
201
1
        return Status::OK();
202
1
      }
203
0
      return STATUS(InvalidArgument,
204
0
          "non-default column family and SingleDeleteCF not implemented");
205
0
    }
206
1
    virtual void SingleDelete(const Slice& /*key*/) {}
207
208
    // Merge and LogData are not pure virtual. Otherwise, we would break
209
    // existing clients of Handler on a source code level. The default
210
    // implementation of Merge does nothing.
211
    virtual CHECKED_STATUS MergeCF(uint32_t column_family_id, const Slice& key,
212
1
                           const Slice& value) {
213
1
      if (column_family_id == 0) {
214
1
        Merge(key, value);
215
1
        return Status::OK();
216
1
      }
217
0
      return STATUS(InvalidArgument,
218
0
          "non-default column family and MergeCF not implemented");
219
0
    }
220
1
    virtual void Merge(const Slice& /*key*/, const Slice& /*value*/) {}
221
222
0
    virtual CHECKED_STATUS Frontiers(const UserFrontiers&) {
223
0
      return STATUS(NotSupported, "UserFrontiers not implemented");
224
0
    }
Unexecuted instantiation: _ZN7rocksdb10WriteBatch7Handler9FrontiersERKNS_13UserFrontiersE
Unexecuted instantiation: _ZN7rocksdb10WriteBatch7Handler9FrontiersERKNS_13UserFrontiersE
225
226
    // The default implementation of LogData does nothing.
227
    virtual void LogData(const Slice& blob);
228
229
    // Continue is called by WriteBatch::Iterate. If it returns false,
230
    // iteration is halted. Otherwise, it continues iterating. The default
231
    // implementation always returns true.
232
    virtual bool Continue();
233
  };
234
  CHECKED_STATUS Iterate(Handler* handler) const;
235
236
  // Retrieve the serialized version of this batch.
237
111
  const std::string& Data() const { return rep_; }
238
239
  // Retrieve data size of the batch.
240
83.5M
  size_t GetDataSize() const { return rep_.size(); }
241
242
  // Returns the number of updates in the batch
243
  uint32_t Count() const;
244
245
  // Returns true if PutCF will be called during Iterate
246
  bool HasPut() const;
247
248
  // Returns true if DeleteCF will be called during Iterate
249
  bool HasDelete() const;
250
251
  // Returns true if SingleDeleteCF will be called during Iterate
252
  bool HasSingleDelete() const;
253
254
  // Returns trie if MergeCF will be called during Iterate
255
  bool HasMerge() const;
256
257
  using WriteBatchBase::GetWriteBatch;
258
0
  WriteBatch* GetWriteBatch() override { return this; }
259
260
  // Constructor with a serialized string object
261
  explicit WriteBatch(const std::string& rep);
262
263
  WriteBatch(const WriteBatch& src);
264
  WriteBatch(WriteBatch&& src);
265
  WriteBatch& operator=(const WriteBatch& src);
266
  WriteBatch& operator=(WriteBatch&& src);
267
268
6.39M
  void SetFrontiers(const UserFrontiers* value) { frontiers_ = value; }
269
10
  const UserFrontiers* Frontiers() const { return frontiers_; }
270
271
6.39M
  void SetDirectWriter(DirectWriter* direct_writer) {
272
6.39M
    direct_writer_ = direct_writer;
273
6.39M
  }
274
275
3.42M
  bool HasDirectWriter() const {
276
3.42M
    return direct_writer_ != nullptr;
277
3.42M
  }
278
279
23.1M
  size_t DirectEntries() const {
280
23.1M
    return direct_entries_;
281
23.1M
  }
282
283
 private:
284
  friend class WriteBatchInternal;
285
  std::unique_ptr<SavePoints> save_points_;
286
287
  // For HasXYZ.  Mutable to allow lazy computation of results
288
  mutable std::atomic<uint32_t> content_flags_;
289
290
  // Performs deferred computation of content_flags if necessary
291
  uint32_t ComputeContentFlags() const;
292
293
 protected:
294
  std::string rep_;  // See comment in write_batch.cc for the format of rep_
295
  const UserFrontiers* frontiers_ = nullptr;
296
  DirectWriter* direct_writer_ = nullptr;
297
  mutable size_t direct_entries_ = 0;
298
299
  // Intentionally copyable
300
};
301
302
}  // namespace rocksdb
303
304
#endif // YB_ROCKSDB_WRITE_BATCH_H