YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/doc_write_batch.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_DOCDB_DOC_WRITE_BATCH_H
15
#define YB_DOCDB_DOC_WRITE_BATCH_H
16
17
#include "yb/common/hybrid_time.h"
18
#include "yb/common/read_hybrid_time.h"
19
20
#include "yb/docdb/doc_write_batch_cache.h"
21
#include "yb/docdb/docdb_types.h"
22
#include "yb/docdb/intent_aware_iterator.h"
23
#include "yb/docdb/key_bounds.h"
24
#include "yb/docdb/value.h"
25
26
#include "yb/rocksdb/cache.h"
27
28
#include "yb/rocksutil/write_batch_formatter.h"
29
30
#include "yb/util/enums.h"
31
#include "yb/util/monotime.h"
32
33
namespace rocksdb {
34
class DB;
35
}
36
37
namespace yb {
38
namespace docdb {
39
40
class KeyValueWriteBatchPB;
41
class IntentAwareIterator;
42
43
struct LazyIterator {
44
  std::function<std::unique_ptr<IntentAwareIterator>()>* creator;
45
  std::unique_ptr<IntentAwareIterator> iterator;
46
47
  explicit LazyIterator(std::function<std::unique_ptr<IntentAwareIterator>()>* c)
48
21.3M
    : iterator(nullptr) {
49
21.3M
    creator = c;
50
21.3M
  }
51
52
0
  explicit LazyIterator(std::unique_ptr<IntentAwareIterator> i) {
53
0
    iterator = std::move(i);
54
0
  }
55
56
21.3M
  ~LazyIterator() {}
57
58
31.5k
  IntentAwareIterator* Iterator() {
59
31.5k
    if (!iterator)
60
31.4k
      iterator = (*creator)();
61
31.5k
    return iterator.get();
62
31.5k
  }
63
};
64
65
// This controls whether "init markers" are required at all intermediate levels.
66
YB_DEFINE_ENUM(InitMarkerBehavior,
67
               // This is used in Redis. We need to keep track of document types such as strings,
68
               // hashes, sets, because there is no schema and due to Redis's error checking.
69
               (kRequired)
70
71
               // This is used in CQL. Existence of "a.b.c" implies existence of "a" and "a.b",
72
               // unless there are delete markers / TTL expiration involved.
73
               (kOptional));
74
75
// The DocWriteBatch class is used to build a RocksDB write batch for a DocDB batch of operations
76
// that may include a mix of write (set) or delete operations. It may read from RocksDB while
77
// writing, and builds up an internal rocksdb::WriteBatch while handling the operations.
78
// When all the operations are applied, the rocksdb::WriteBatch should be taken as output.
79
// Take ownership of it using std::move if it needs to live longer than this DocWriteBatch.
80
class DocWriteBatch {
81
 public:
82
  explicit DocWriteBatch(const DocDB& doc_db,
83
                         InitMarkerBehavior init_marker_behavior,
84
                         std::atomic<int64_t>* monotonic_counter = nullptr);
85
86
  Status SeekToKeyPrefix(LazyIterator* doc_iter, bool has_ancestor = false);
87
  Status SeekToKeyPrefix(IntentAwareIterator* doc_iter, bool has_ancestor);
88
89
  // Set the primitive at the given path to the given value. Intermediate subdocuments are created
90
  // if necessary and possible.
91
  CHECKED_STATUS SetPrimitive(
92
      const DocPath& doc_path,
93
      const Value& value,
94
      LazyIterator* doc_iter);
95
96
  CHECKED_STATUS SetPrimitive(
97
      const DocPath& doc_path,
98
      const Value& value,
99
      const ReadHybridTime& read_ht = ReadHybridTime::Max(),
100
      const CoarseTimePoint deadline = CoarseTimePoint::max(),
101
      rocksdb::QueryId query_id = rocksdb::kDefaultQueryId);
102
103
  CHECKED_STATUS SetPrimitive(
104
      const DocPath& doc_path,
105
      const Value& value,
106
0
      std::unique_ptr<IntentAwareIterator> intent_iter) {
107
0
    LazyIterator iter(std::move(intent_iter));
108
0
    return SetPrimitive(doc_path, value, &iter);
109
0
  }
110
111
112
  CHECKED_STATUS SetPrimitive(
113
      const DocPath& doc_path,
114
      const PrimitiveValue& value,
115
      const ReadHybridTime& read_ht = ReadHybridTime::Max(),
116
      const CoarseTimePoint deadline = CoarseTimePoint::max(),
117
      rocksdb::QueryId query_id = rocksdb::kDefaultQueryId,
118
122k
      UserTimeMicros user_timestamp = Value::kInvalidUserTimestamp) {
119
122k
    return SetPrimitive(doc_path, Value(value, Value::kMaxTtl, user_timestamp),
120
122k
                        read_ht, deadline, query_id);
121
122k
  }
122
123
  // Extend the SubDocument in the given key. We'll support List with Append and Prepend mode later.
124
  // TODO(akashnil): 03/20/17 ENG-1107
125
  // In each SetPrimitive call, some common work is repeated. It may be made more
126
  // efficient by not calling SetPrimitive internally.
127
  CHECKED_STATUS ExtendSubDocument(
128
      const DocPath& doc_path,
129
      const SubDocument& value,
130
      const ReadHybridTime& read_ht = ReadHybridTime::Max(),
131
      const CoarseTimePoint deadline = CoarseTimePoint::max(),
132
      rocksdb::QueryId query_id = rocksdb::kDefaultQueryId,
133
      MonoDelta ttl = Value::kMaxTtl,
134
      UserTimeMicros user_timestamp = Value::kInvalidUserTimestamp);
135
136
  CHECKED_STATUS InsertSubDocument(
137
      const DocPath& doc_path,
138
      const SubDocument& value,
139
      const ReadHybridTime& read_ht = ReadHybridTime::Max(),
140
      const CoarseTimePoint deadline = CoarseTimePoint::max(),
141
      rocksdb::QueryId query_id = rocksdb::kDefaultQueryId,
142
      MonoDelta ttl = Value::kMaxTtl,
143
      UserTimeMicros user_timestamp = Value::kInvalidUserTimestamp,
144
      bool init_marker_ttl = true);
145
146
  CHECKED_STATUS ExtendList(
147
      const DocPath& doc_path,
148
      const SubDocument& value,
149
      const ReadHybridTime& read_ht = ReadHybridTime::Max(),
150
      const CoarseTimePoint deadline = CoarseTimePoint::max(),
151
      rocksdb::QueryId query_id = rocksdb::kDefaultQueryId,
152
      MonoDelta ttl = Value::kMaxTtl,
153
      UserTimeMicros user_timestamp = Value::kInvalidUserTimestamp);
154
155
  // 'indices' must be sorted. List indexes are not zero indexed, the first element is list[1].
156
  CHECKED_STATUS ReplaceRedisInList(
157
      const DocPath &doc_path,
158
      const std::vector<int64_t>& indices,
159
      const std::vector<SubDocument>& values,
160
      const ReadHybridTime& read_ht,
161
      const CoarseTimePoint deadline,
162
      const rocksdb::QueryId query_id,
163
      const Direction dir = Direction::kForward,
164
      const int64_t start_index = 0,
165
      std::vector<string>* results = nullptr,
166
      MonoDelta default_ttl = Value::kMaxTtl,
167
      MonoDelta write_ttl = Value::kMaxTtl);
168
169
  CHECKED_STATUS ReplaceCqlInList(
170
      const DocPath &doc_path,
171
      const int index,
172
      const SubDocument& value,
173
      const ReadHybridTime& read_ht,
174
      const CoarseTimePoint deadline,
175
      const rocksdb::QueryId query_id,
176
      MonoDelta default_ttl = Value::kMaxTtl,
177
      MonoDelta write_ttl = Value::kMaxTtl);
178
179
  CHECKED_STATUS DeleteSubDoc(
180
      const DocPath& doc_path,
181
      const ReadHybridTime& read_ht = ReadHybridTime::Max(),
182
      const CoarseTimePoint deadline = CoarseTimePoint::max(),
183
      rocksdb::QueryId query_id = rocksdb::kDefaultQueryId,
184
122k
      UserTimeMicros user_timestamp = Value::kInvalidUserTimestamp) {
185
122k
    return SetPrimitive(doc_path, PrimitiveValue::kTombstone,
186
122k
                        read_ht, deadline, query_id, user_timestamp);
187
122k
  }
188
189
  void Clear();
190
0
  bool IsEmpty() const { return put_batch_.empty(); }
191
192
0
  size_t size() const { return put_batch_.size(); }
193
194
0
  const std::vector<std::pair<std::string, std::string>>& key_value_pairs() const {
195
0
    return put_batch_;
196
0
  }
197
198
  void MoveToWriteBatchPB(KeyValueWriteBatchPB *kv_pb);
199
200
  // This method has worse performance comparing to MoveToWriteBatchPB and intented to be used in
201
  // testing. Consider using MoveToWriteBatchPB in production code.
202
  void TEST_CopyToWriteBatchPB(KeyValueWriteBatchPB *kv_pb) const;
203
204
  // This is used in tests when measuring the number of seeks that a given update to this batch
205
  // performs. The internal seek count is reset.
206
  int GetAndResetNumRocksDBSeeks();
207
208
1.94M
  const DocDB& doc_db() { return doc_db_; }
209
210
20.5k
  boost::optional<DocWriteBatchCache::Entry> LookupCache(const KeyBytes& encoded_key_prefix) {
211
20.5k
    return cache_.Get(encoded_key_prefix);
212
20.5k
  }
213
214
0
  std::pair<std::string, std::string>& AddRaw() {
215
0
    put_batch_.emplace_back();
216
0
    return put_batch_.back();
217
0
  }
218
219
  void UpdateMaxValueTtl(const MonoDelta& ttl);
220
221
92
  int64_t ttl_ns() const {
222
92
    return ttl_.ToNanoseconds();
223
92
  }
224
225
1.64M
  bool has_ttl() const {
226
1.64M
    return ttl_.Initialized();
227
1.64M
  }
228
229
 private:
230
  // This member function performs the necessary operations to set a primitive value for a given
231
  // docpath assuming the appropriate operations have been taken care of for subkeys with index <
232
  // subkey_index. This method assumes responsibility of ensuring the proper DocDB structure
233
  // (e.g: init markers) is maintained for subdocuments starting at the given subkey_index.
234
  CHECKED_STATUS SetPrimitiveInternal(
235
      const DocPath& doc_path,
236
      const Value& value,
237
      LazyIterator* doc_iter,
238
      bool is_deletion,
239
      size_t num_subkeys);
240
241
  // Handle the user provided timestamp during writes.
242
  Result<bool> SetPrimitiveInternalHandleUserTimestamp(const Value &value,
243
                                                       LazyIterator* doc_iter);
244
245
42.3M
  bool required_init_markers() {
246
42.3M
    return init_marker_behavior_ == InitMarkerBehavior::kRequired;
247
42.3M
  }
248
249
42.2M
  bool optional_init_markers() {
250
42.2M
    return init_marker_behavior_ == InitMarkerBehavior::kOptional;
251
42.2M
  }
252
253
  DocWriteBatchCache cache_;
254
255
  DocDB doc_db_;
256
257
  InitMarkerBehavior init_marker_behavior_;
258
  std::atomic<int64_t>* monotonic_counter_;
259
  std::vector<std::pair<std::string, std::string>> put_batch_;
260
261
  // Taken from internal_doc_iterator
262
  KeyBytes key_prefix_;
263
  bool subdoc_exists_ = true;
264
  DocWriteBatchCache::Entry current_entry_;
265
266
  MonoDelta ttl_;
267
};
268
269
// Converts a RocksDB WriteBatch to a string.
270
// line_prefix is the prefix to be added to each line of the result. Could be used for indentation.
271
Result<std::string> WriteBatchToString(
272
    const rocksdb::WriteBatch& write_batch,
273
    StorageDbType storage_db_type,
274
    BinaryOutputFormat binary_output_format,
275
    WriteBatchOutputFormat batch_output_format,
276
    const std::string& line_prefix);
277
278
}  // namespace docdb
279
}  // namespace yb
280
281
#endif // YB_DOCDB_DOC_WRITE_BATCH_H