/Users/deen/code/yugabyte-db/src/yb/docdb/doc_write_batch.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | #include "yb/docdb/doc_write_batch.h" |
14 | | |
15 | | #include "yb/common/doc_hybrid_time.h" |
16 | | #include "yb/common/ql_value.h" |
17 | | |
18 | | #include "yb/docdb/doc_key.h" |
19 | | #include "yb/docdb/doc_path.h" |
20 | | #include "yb/docdb/doc_ttl_util.h" |
21 | | #include "yb/docdb/docdb-internal.h" |
22 | | #include "yb/docdb/docdb.pb.h" |
23 | | #include "yb/docdb/docdb_fwd.h" |
24 | | #include "yb/docdb/docdb_rocksdb_util.h" |
25 | | #include "yb/docdb/kv_debug.h" |
26 | | #include "yb/docdb/subdocument.h" |
27 | | #include "yb/docdb/value_type.h" |
28 | | #include "yb/rocksdb/db.h" |
29 | | #include "yb/rocksdb/write_batch.h" |
30 | | #include "yb/rocksutil/write_batch_formatter.h" |
31 | | #include "yb/server/hybrid_clock.h" |
32 | | #include "yb/util/bytes_formatter.h" |
33 | | #include "yb/util/enums.h" |
34 | | #include "yb/util/logging.h" |
35 | | #include "yb/util/result.h" |
36 | | #include "yb/util/status_format.h" |
37 | | |
38 | | using yb::BinaryOutputFormat; |
39 | | |
40 | | using yb::server::HybridClock; |
41 | | |
42 | | namespace yb { |
43 | | namespace docdb { |
44 | | |
45 | | DocWriteBatch::DocWriteBatch(const DocDB& doc_db, |
46 | | InitMarkerBehavior init_marker_behavior, |
47 | | std::atomic<int64_t>* monotonic_counter) |
48 | | : doc_db_(doc_db), |
49 | | init_marker_behavior_(init_marker_behavior), |
50 | 3.48M | monotonic_counter_(monotonic_counter) {} |
51 | | |
52 | 545k | Status DocWriteBatch::SeekToKeyPrefix(LazyIterator* iter, bool has_ancestor) { |
53 | 545k | subdoc_exists_ = false; |
54 | 545k | current_entry_.value_type = ValueType::kInvalid; |
55 | | |
56 | | // Check the cache first. |
57 | 545k | boost::optional<DocWriteBatchCache::Entry> cached_entry = |
58 | 545k | cache_.Get(key_prefix_); |
59 | 545k | if (cached_entry) { |
60 | 133k | current_entry_ = *cached_entry; |
61 | 133k | subdoc_exists_ = current_entry_.value_type != ValueType::kTombstone; |
62 | 133k | return Status::OK(); |
63 | 133k | } |
64 | 411k | return SeekToKeyPrefix(iter->Iterator(), has_ancestor); |
65 | 545k | } |
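The cache lookup above is the fast path: a hit means this DocWriteBatch has already read or written key_prefix_, so the RocksDB seek can be skipped. A minimal sketch of that shortcut, using hypothetical MiniCache and MiniEntry types rather than the real DocWriteBatchCache:

#include <optional>
#include <string>
#include <unordered_map>

enum class MiniValueType { kObject, kTombstone };
struct MiniEntry { MiniValueType value_type; };

struct MiniCache {
  std::unordered_map<std::string, MiniEntry> entries;

  // Returns the cached entry for a key prefix, if this batch has seen it before.
  std::optional<MiniEntry> Get(const std::string& key_prefix) const {
    auto it = entries.find(key_prefix);
    if (it == entries.end()) {
      return std::nullopt;
    }
    return it->second;
  }
};

// A cache hit means no RocksDB seek is needed; the subdocument is considered to exist
// unless the cached entry is a tombstone.
bool SubdocExistsFromCache(const MiniCache& cache, const std::string& key_prefix) {
  auto entry = cache.Get(key_prefix);
  return entry && entry->value_type != MiniValueType::kTombstone;
}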
66 | | |
67 | 411k | Status DocWriteBatch::SeekToKeyPrefix(IntentAwareIterator* doc_iter, bool has_ancestor) { |
68 | 411k | const auto prev_subdoc_ht = current_entry_.doc_hybrid_time; |
69 | 411k | const auto prev_key_prefix_exact = current_entry_.found_exact_key_prefix; |
70 | | |
71 | | // Seek the value. |
72 | 411k | doc_iter->Seek(key_prefix_.AsSlice()); |
73 | 411k | if (!doc_iter->valid()) { |
74 | 644 | return Status::OK(); |
75 | 644 | } |
76 | | |
77 | 410k | auto key_data = VERIFY_RESULT(doc_iter->FetchKey()); |
78 | 410k | if (!key_prefix_.IsPrefixOf(key_data.key)) { |
79 | 93.8k | return Status::OK(); |
80 | 93.8k | } |
81 | | |
82 | | // Checking for expiration. |
83 | 317k | Slice recent_value = doc_iter->value(); |
84 | 317k | ValueControlFields control_fields; |
85 | 317k | { |
86 | 317k | auto value_copy = recent_value; |
87 | 317k | control_fields = VERIFY_RESULT(ValueControlFields::Decode(&value_copy)); |
88 | 0 | current_entry_.user_timestamp = control_fields.user_timestamp; |
89 | 317k | current_entry_.value_type = DecodeValueType(value_copy); |
90 | 317k | } |
91 | | |
92 | 317k | if (HasExpiredTTL( |
93 | 317k | key_data.write_time.hybrid_time(), control_fields.ttl, doc_iter->read_time().read)) { |
94 | 2 | current_entry_.value_type = ValueType::kTombstone; |
95 | 2 | current_entry_.doc_hybrid_time = key_data.write_time; |
96 | 2 | cache_.Put(key_prefix_, current_entry_); |
97 | 2 | return Status::OK(); |
98 | 2 | } |
99 | | |
100 | 317k | Slice value; |
101 | 317k | RETURN_NOT_OK(doc_iter->NextFullValue(&key_data.write_time, &value, &key_data.key)); |
102 | | |
103 | 317k | if (!doc_iter->valid()) { |
104 | 0 | return Status::OK(); |
105 | 0 | } |
106 | | |
107 | | // If the first key >= key_prefix_ in RocksDB starts with key_prefix_, then a |
108 | | // document/subdocument pointed to by key_prefix_ exists, or has been recently deleted. |
109 | 317k | if (key_prefix_.IsPrefixOf(key_data.key)) { |
110 | | // No need to decode again if no merge records were encountered. |
111 | 317k | if (value != recent_value) { |
112 | 0 | auto value_copy = value; |
113 | 0 | current_entry_.user_timestamp = VERIFY_RESULT( |
114 | 0 | ValueControlFields::Decode(&value_copy)).user_timestamp; |
115 | 0 | current_entry_.value_type = DecodeValueType(value_copy); |
116 | 0 | } |
117 | 317k | current_entry_.found_exact_key_prefix = key_prefix_ == key_data.key; |
118 | 317k | current_entry_.doc_hybrid_time = key_data.write_time; |
119 | | |
120 | | // TODO: with optional init markers we can find something that is more than one level |
121 | | // deep relative to the current prefix. |
122 | | // Note: this comment was originally placed right before the line decoding the HybridTime, |
123 | | // which has since been refactored away. Not sure what this means, so keeping it for now. |
124 | | |
125 | | // Cache the results of reading from RocksDB so that we don't have to read again in a later |
126 | | // operation in the same DocWriteBatch. |
127 | 317k | DOCDB_DEBUG_LOG("Writing to DocWriteBatchCache: $0", |
128 | 317k | BestEffortDocDBKeyToStr(key_prefix_)); |
129 | | |
130 | 317k | if (has_ancestor && prev_subdoc_ht > current_entry_.doc_hybrid_time &&
131 | 317k | prev_key_prefix_exact) {
132 | | // We already saw an object init marker or a tombstone one level higher with a higher |
133 | | // hybrid_time, so just ignore this key/value pair. This had to be added when we switched |
134 | | // from a format with intermediate hybrid_times to our current format without them. |
135 | | // |
136 | | // Example (from a real test case): |
137 | | // |
138 | | // SubDocKey(DocKey([], ["a"]), [HT(38)]) -> {} |
139 | | // SubDocKey(DocKey([], ["a"]), [HT(37)]) -> DEL |
140 | | // SubDocKey(DocKey([], ["a"]), [HT(36)]) -> false |
141 | | // SubDocKey(DocKey([], ["a"]), [HT(1)]) -> {} |
142 | | // SubDocKey(DocKey([], ["a"]), ["y", HT(35)]) -> "lD\x97\xaf^m\x0a1\xa0\xfc\xc8YM" |
143 | | // |
144 | | // Caveat (04/17/2017): the HybridTime encoding in the above example is outdated. |
145 | | // |
146 | | // In the above layout, if we try to set "a.y.x" to a new value, we first seek to the |
147 | | // document key "a" and find that it exists, but then we seek to "a.y" and find that it |
148 | | // also exists as a primitive value (assuming we don't check the hybrid_time), and |
149 | | // therefore we can't create "a.y.x", which would be incorrect. |
150 | 62.7k | subdoc_exists_ = false; |
151 | 254k | } else { |
152 | 254k | cache_.Put(key_prefix_, current_entry_); |
153 | 254k | subdoc_exists_ = current_entry_.value_type != ValueType::kTombstone; |
154 | 254k | } |
155 | 317k | } |
156 | 317k | return Status::OK(); |
157 | 317k | } |
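A compressed restatement of the ancestor-overwrite check above, with plain integers standing in for DocHybridTime (an assumption for readability, not the real types): the entry found at this level is ignored when the parent prefix matched exactly and the parent's init marker or tombstone is newer.

#include <cstdint>

// Returns true when the key/value pair found at the current level should be ignored
// because a parent object init marker or tombstone was written at a later hybrid time.
bool IgnoreDueToNewerAncestor(bool has_ancestor,
                              bool parent_prefix_matched_exactly,
                              uint64_t parent_write_time,
                              uint64_t child_write_time) {
  return has_ancestor &&
         parent_prefix_matched_exactly &&
         parent_write_time > child_write_time;
}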
158 | | |
159 | | Result<bool> DocWriteBatch::SetPrimitiveInternalHandleUserTimestamp( |
160 | | const ValueControlFields& control_fields, |
161 | 141M | LazyIterator* iter) { |
162 | 141M | if (!control_fields.has_user_timestamp()) { |
163 | 141M | return true; |
164 | 141M | } |
165 | | // Seek for the older version of the key that we're about to write to. This is essentially a |
166 | | // NOOP if we've already performed the seek due to the cache. |
167 | 1.35k | RETURN_NOT_OK(SeekToKeyPrefix(iter)); |
168 | | // We'd like to include tombstones in our timestamp comparisons as well. |
169 | 1.35k | if (!current_entry_.found_exact_key_prefix) { |
170 | 259 | return true; |
171 | 259 | } |
172 | 1.09k | if (!subdoc_exists_ && current_entry_.value_type != ValueType::kTombstone) {
173 | 24 | return true; |
174 | 24 | } |
175 | | |
176 | 1.07k | if (current_entry_.user_timestamp != ValueControlFields::kInvalidUserTimestamp) { |
177 | 101 | return control_fields.user_timestamp >= current_entry_.user_timestamp; |
178 | 101 | } |
179 | | |
180 | | // Look at the hybrid time instead. |
181 | 972 | const DocHybridTime& doc_hybrid_time = current_entry_.doc_hybrid_time; |
182 | 972 | if (!doc_hybrid_time.hybrid_time().is_valid()) { |
183 | 0 | return true; |
184 | 0 | } |
185 | | |
186 | 972 | return control_fields.user_timestamp >= 0 && |
187 | 972 | implicit_cast<size_t>(control_fields.user_timestamp) >= |
188 | 52 | doc_hybrid_time.hybrid_time().GetPhysicalValueMicros(); |
189 | 972 | } |
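The decision above reduces to a simple comparison; here is a minimal sketch with plain integers standing in for the DocDB timestamp types (an assumption, not the real signatures):

#include <cstdint>
#include <optional>

bool ShouldApplyWrite(int64_t new_user_timestamp,
                      std::optional<int64_t> existing_user_timestamp,
                      uint64_t existing_hybrid_time_micros) {
  if (existing_user_timestamp) {
    // Prefer an explicit user timestamp on the existing entry.
    return new_user_timestamp >= *existing_user_timestamp;
  }
  // Otherwise fall back to the physical-time (microseconds) component of the hybrid time.
  return new_user_timestamp >= 0 &&
         static_cast<uint64_t>(new_user_timestamp) >= existing_hybrid_time_micros;
}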
190 | | |
191 | | namespace { |
192 | | |
193 | | Status AppendToKeySafely( |
194 | 70.9M | const PrimitiveValue& subkey, const DocPath& doc_path, KeyBytes* key_bytes) { |
195 | 70.9M | if (subkey.value_type() == ValueType::kTombstone) { |
196 | | // See https://github.com/yugabyte/yugabyte-db/issues/7835. By returning an error we are |
197 | | // avoiding a tablet server crash even if the root cause is not clear. |
198 | 0 | auto status = STATUS_FORMAT( |
199 | 0 | IllegalState, "ValueType::kTombstone not allowed in keys. doc_path: $0", doc_path); |
200 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 5) << status; |
201 | 0 | return status; |
202 | 0 | } |
203 | 70.9M | subkey.AppendToKey(key_bytes); |
204 | 70.9M | return Status::OK(); |
205 | 70.9M | } |
206 | | |
207 | | } // namespace |
208 | | |
209 | | CHECKED_STATUS DocWriteBatch::SetPrimitiveInternal( |
210 | | const DocPath& doc_path, |
211 | | const ValueControlFields& control_fields, |
212 | | const ValueRef& value, |
213 | | LazyIterator* iter, |
214 | | const bool is_deletion, |
215 | 71.6M | const size_t num_subkeys) { |
216 | 71.6M | UpdateMaxValueTtl(control_fields.ttl); |
217 | | |
218 | | // The write_id is always incremented by one for each new element of the write batch. |
219 | 71.6M | if (put_batch_.size() > numeric_limits<IntraTxnWriteId>::max()) { |
220 | 0 | return STATUS_SUBSTITUTE( |
221 | 0 | NotSupported, |
222 | 0 | "Trying to add more than $0 key/value pairs in the same single-shard txn.", |
223 | 0 | numeric_limits<IntraTxnWriteId>::max()); |
224 | 0 | } |
225 | | |
226 | 71.6M | if (control_fields.has_user_timestamp() && !optional_init_markers()) {
227 | 2 | return STATUS(IllegalState, |
228 | 2 | "User Timestamp is only supported for Optional Init Markers"); |
229 | 2 | } |
230 | | |
231 | | // We need the write_id component of DocHybridTime to disambiguate between writes in the same |
232 | | // WriteBatch, as they will have the same HybridTime when committed. E.g. if we insert, delete, |
233 | | // and re-insert the same column in one WriteBatch, we need to know the order of these operations. |
234 | 71.6M | const auto write_id = static_cast<IntraTxnWriteId>(put_batch_.size()); |
235 | 71.6M | const DocHybridTime hybrid_time = DocHybridTime(HybridTime::kMax, write_id); |
236 | | |
237 | 142M | for (size_t subkey_index = 0; subkey_index < num_subkeys; ++subkey_index) {
238 | 70.9M | const PrimitiveValue& subkey = doc_path.subkey(subkey_index); |
239 | | |
240 | | // We don't need to check if intermediate documents already exist if init markers are optional, |
241 | | // or if we already know they exist (either from previous reads or our own writes in the same |
242 | | // single-shard operation.) |
243 | | |
244 | 70.9M | if (optional_init_markers() || subdoc_exists_) {
245 | 70.6M | if (required_init_markers() && !IsObjectType(current_entry_.value_type)) {
246 | | // REDIS |
247 | | // ~~~~~ |
248 | | // We raise this error only if init markers are mandatory. |
249 | 0 | return STATUS_FORMAT(IllegalState, |
250 | 0 | "Cannot set values inside a subdocument of type $0", |
251 | 0 | current_entry_.value_type); |
252 | 0 | } |
253 | 70.6M | if (optional_init_markers()) { |
254 | | // CASSANDRA |
255 | | // ~~~~~~~~~ |
256 | | // In the case where init markers are optional, we don't need to check existence of |
257 | | // the current subdocument. Although if we have a user timestamp specified, we need to |
258 | | // check whether the provided user timestamp is higher than what is already present. If |
259 | | // an intermediate subdocument is found with a higher timestamp, we consider it as an |
260 | | // overwrite and skip the entire write. |
261 | 70.2M | auto should_apply = SetPrimitiveInternalHandleUserTimestamp(control_fields, iter); |
262 | 70.2M | RETURN_NOT_OK(should_apply); |
263 | 70.2M | if (!should_apply.get()) { |
264 | 6 | return Status::OK(); |
265 | 6 | } |
266 | | |
267 | 70.2M | RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_)); |
268 | 70.2M | } else if (subkey_index == num_subkeys - 1 && !is_deletion) {
269 | | // REDIS |
270 | | // ~~~~~ |
271 | | // We don't need to perform a RocksDB read at the last level for upserts, we just overwrite |
272 | | // the value within the last subdocument with what we're trying to write. We still perform |
273 | | // the read for deletions, because we try to avoid writing a new tombstone if the data is |
274 | | // not there anyway. |
275 | 163k | if (!subdoc_exists_) { |
276 | 0 | return STATUS(IllegalState, "Subdocument is supposed to exist."); |
277 | 0 | } |
278 | 163k | if (!IsObjectType(current_entry_.value_type)) { |
279 | 0 | return STATUS(IllegalState, "Expected object subdocument type."); |
280 | 0 | } |
281 | 163k | RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_)); |
282 | 180k | } else { |
283 | | // REDIS |
284 | | // ~~~~~ |
285 | | // We need to check if the subdocument at this subkey exists. |
286 | 180k | if (!subdoc_exists_) { |
287 | 0 | return STATUS(IllegalState, "Subdocument is supposed to exist. $0"); |
288 | 0 | } |
289 | 180k | if (!IsObjectType(current_entry_.value_type)) { |
290 | 0 | return STATUS(IllegalState, "Expected object subdocument type. $0"); |
291 | 0 | } |
292 | 180k | RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_)); |
293 | 180k | RETURN_NOT_OK(SeekToKeyPrefix(iter, true)); |
294 | 180k | if (is_deletion && !subdoc_exists_) {
295 | | // A parent subdocument of the value we're trying to delete, or that value itself, does |
296 | | // not exist, nothing to do. |
297 | | // |
298 | | // TODO: in Redis's HDEL command we need to count the number of fields deleted, so we need |
299 | | // to count the deletes that are actually happening. |
300 | | // See http://redis.io/commands/hdel |
301 | 1.15k | DOCDB_DEBUG_LOG("Subdocument does not exist at subkey level $0 (subkey: $1)", |
302 | 1.15k | subkey_index, subkey.ToString()); |
303 | 1.15k | return Status::OK(); |
304 | 1.15k | } |
305 | 180k | } |
306 | 70.6M | } else { |
307 | | // REDIS |
308 | | // ~~~~~ |
309 | | // The subdocument at the current level does not exist. |
310 | 357k | if (is_deletion) { |
311 | | // A parent subdocument of the subdocument we're trying to delete does not exist, nothing |
312 | | // to do. |
313 | 0 | return Status::OK(); |
314 | 0 | } |
315 | | |
316 | 357k | DCHECK(!control_fields.has_user_timestamp()); |
317 | | |
318 | | // Add the parent key to key/value batch before appending the encoded HybridTime to it. |
319 | | // (We replicate key/value pairs without the HybridTime and only add it before writing to |
320 | | // RocksDB.) |
321 | 357k | put_batch_.emplace_back(key_prefix_.ToStringBuffer(), string(1, ValueTypeAsChar::kObject)); |
322 | | |
323 | | // Update our local cache to record the fact that we're adding this subdocument, so that |
324 | | // future operations in this DocWriteBatch don't have to add it or look for it in RocksDB. |
325 | 357k | cache_.Put(key_prefix_, hybrid_time, ValueType::kObject); |
326 | 357k | RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_)); |
327 | 357k | } |
328 | 70.9M | } |
329 | | |
330 | | // We need to handle the user timestamp if present. |
331 | 71.6M | if (VERIFY_RESULT(SetPrimitiveInternalHandleUserTimestamp(control_fields, iter))) { |
332 | | // The key in the key/value batch does not have an encoded HybridTime. |
333 | 71.6M | put_batch_.emplace_back(key_prefix_.ToStringBuffer(), std::string()); |
334 | 71.6M | auto& encoded_value = put_batch_.back().second; |
335 | 71.6M | control_fields.AppendEncoded(&encoded_value); |
336 | 71.6M | size_t prefix_len = encoded_value.size(); |
337 | | |
338 | 71.6M | AppendEncodedValue(value.value_pb(), value.sorting_type(), &encoded_value); |
339 | 71.6M | if (value.custom_value_type() != ValueType::kLowest) { |
340 | 16.3M | encoded_value[prefix_len] = static_cast<char>(value.custom_value_type()); |
341 | 16.3M | } |
342 | | |
343 | | // The key we use in the DocWriteBatchCache does not have a final hybrid_time, because that's |
344 | | // the key we expect to look up. |
345 | 71.6M | cache_.Put(key_prefix_, hybrid_time, static_cast<ValueType>(encoded_value[prefix_len]), |
346 | 71.6M | control_fields.user_timestamp); |
347 | 71.6M | } |
348 | | |
349 | 0 | return Status::OK(); |
350 | 71.6M | } |
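As the comment above notes, the IntraTxnWriteId is just the running size of put_batch_; a minimal standalone sketch of that ordering rule (types simplified, not the real DocWriteBatch API):

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

using IntraTxnWriteId = uint32_t;

struct MiniWriteBatch {
  std::vector<std::pair<std::string, std::string>> put_batch;

  // Every new element of the batch gets the next write_id, so an insert, delete and
  // re-insert of the same column within one batch keep their relative order even
  // though they will all share the same commit HybridTime.
  IntraTxnWriteId NextWriteId() const {
    return static_cast<IntraTxnWriteId>(put_batch.size());
  }
};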
351 | | |
352 | | Status DocWriteBatch::SetPrimitive( |
353 | | const DocPath& doc_path, |
354 | | const ValueControlFields& control_fields, |
355 | | const ValueRef& value, |
356 | 71.6M | LazyIterator* iter) { |
357 | 71.6M | DOCDB_DEBUG_LOG("Called SetPrimitive with doc_path=$0, value=$1", |
358 | 71.6M | doc_path.ToString(), value.ToString()); |
359 | 71.6M | current_entry_.doc_hybrid_time = DocHybridTime::kMin; |
360 | 71.6M | const auto num_subkeys = doc_path.num_subkeys(); |
361 | 71.6M | const bool is_deletion = value.custom_value_type() == ValueType::kTombstone; |
362 | | |
363 | 71.6M | key_prefix_ = doc_path.encoded_doc_key(); |
364 | | |
365 | | // If we are overwriting an entire document with a primitive value (not deleting it), we don't |
366 | | // need to perform any reads from RocksDB at all. |
367 | | // |
368 | | // Even if we are deleting a document, but we don't need to get any feedback on whether the |
369 | | // deletion was performed or the document was not there to begin with, we could also skip the |
370 | | // read as an optimization. |
371 | 71.6M | if (num_subkeys > 0 || is_deletion) {
372 | 71.5M | if (required_init_markers()) { |
373 | | // Navigate to the root of the document. We don't yet know whether the document exists or when |
374 | | // it was last updated. |
375 | 361k | RETURN_NOT_OK(SeekToKeyPrefix(iter, false)); |
376 | 361k | DOCDB_DEBUG_LOG("Top-level document exists: $0", subdoc_exists_); |
377 | 361k | if (!subdoc_exists_ && is_deletion) {
378 | 232 | DOCDB_DEBUG_LOG("We're performing a deletion, and the document is not present. " |
379 | 232 | "Nothing to do."); |
380 | 232 | return Status::OK(); |
381 | 232 | } |
382 | 361k | } |
383 | 71.5M | } |
384 | 71.6M | return SetPrimitiveInternal( |
385 | 71.6M | doc_path, control_fields, value, iter, is_deletion, num_subkeys); |
386 | 71.6M | } |
387 | | |
388 | | Status DocWriteBatch::SetPrimitive(const DocPath& doc_path, |
389 | | const ValueControlFields& control_fields, |
390 | | const ValueRef& value, |
391 | | const ReadHybridTime& read_ht, |
392 | | CoarseTimePoint deadline, |
393 | 71.6M | rocksdb::QueryId query_id) { |
394 | 71.6M | DOCDB_DEBUG_LOG("Called with doc_path=$0, value=$1", doc_path.ToString(), value.ToString()); |
395 | | |
396 | 71.6M | std::function<std::unique_ptr<IntentAwareIterator>()> createrator = |
397 | 71.6M | [doc_path, query_id, deadline, read_ht, this]() { |
398 | 270k | return yb::docdb::CreateIntentAwareIterator( |
399 | 270k | doc_db_, |
400 | 270k | BloomFilterMode::USE_BLOOM_FILTER, |
401 | 270k | doc_path.encoded_doc_key().AsSlice(), |
402 | 270k | query_id, |
403 | 270k | TransactionOperationContext(), |
404 | 270k | deadline, |
405 | 270k | read_ht); |
406 | 270k | }; |
407 | | |
408 | 71.6M | LazyIterator iter(&createrator); |
409 | | |
410 | 71.6M | return SetPrimitive(doc_path, control_fields, value, &iter); |
411 | 71.6M | } |
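SetPrimitive above hands the iterator factory to a LazyIterator so the (potentially expensive) IntentAwareIterator is only built if a read is actually required. A minimal sketch of that pattern with a hypothetical LazyIter template, not the real LazyIterator class:

#include <functional>
#include <memory>
#include <utility>

template <class Iter>
class LazyIter {
 public:
  explicit LazyIter(std::function<std::unique_ptr<Iter>()> factory)
      : factory_(std::move(factory)) {}

  Iter* get() {
    if (!iter_) {
      iter_ = factory_();  // Pay the creation cost only when a read is actually needed.
    }
    return iter_.get();
  }

 private:
  std::function<std::unique_ptr<Iter>()> factory_;
  std::unique_ptr<Iter> iter_;
};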
412 | | |
413 | | Status DocWriteBatch::ExtendSubDocument( |
414 | | const DocPath& doc_path, |
415 | | const ValueRef& value, |
416 | | const ReadHybridTime& read_ht, |
417 | | const CoarseTimePoint deadline, |
418 | | rocksdb::QueryId query_id, |
419 | | MonoDelta ttl, |
420 | 54.8M | UserTimeMicros user_timestamp) { |
421 | 54.8M | if (value.is_array()) { |
422 | 90 | return ExtendList(doc_path, value, read_ht, deadline, query_id, ttl, user_timestamp); |
423 | 90 | } |
424 | 54.8M | if (value.is_set()) { |
425 | 76 | ValueRef value_ref( |
426 | 76 | value.write_instruction() == bfql::TSOpcode::kSetRemove || |
427 | 76 | value.write_instruction() == bfql::TSOpcode::kMapRemove
428 | 76 | ? ValueType::kTombstone : ValueType::kNullLow);
429 | 125 | for (const auto& key : value.value_pb().set_value().elems()) { |
430 | 125 | DocPath child_doc_path = doc_path; |
431 | 125 | child_doc_path.AddSubKey(PrimitiveValue::FromQLValuePB(key, value.sorting_type())); |
432 | 125 | RETURN_NOT_OK(ExtendSubDocument( |
433 | 125 | child_doc_path, value_ref, read_ht, deadline, query_id, ttl, user_timestamp)); |
434 | 125 | } |
435 | 76 | return Status::OK(); |
436 | 76 | } |
437 | 54.8M | if (value.is_map()) { |
438 | 105k | const auto& map_value = value.value_pb().map_value(); |
439 | 105k | int size = map_value.keys().size(); |
440 | 322k | for (int i = 0; i != size; ++i217k ) { |
441 | 217k | DocPath child_doc_path = doc_path; |
442 | 217k | const auto& key = map_value.keys(i); |
443 | 217k | if (key.value_case() != QLValuePB::kVirtualValue || |
444 | 217k | key.virtual_value() != QLVirtualValuePB::ARRAY) {
445 | 217k | auto sorting_type = |
446 | 217k | value.list_extend_order() == ListExtendOrder::APPEND |
447 | 217k | ? value.sorting_type() : SortingType::kDescending;
448 | 217k | if (value.write_instruction() == bfql::TSOpcode::kListAppend && |
449 | 217k | key.value_case() == QLValuePB::kInt64Value) {
450 | 16 | child_doc_path.AddSubKey(PrimitiveValue::ArrayIndex(key.int64_value())); |
451 | 217k | } else { |
452 | 217k | child_doc_path.AddSubKey(PrimitiveValue::FromQLValuePB(key, sorting_type)); |
453 | 217k | } |
454 | 217k | } |
455 | 217k | RETURN_NOT_OK(ExtendSubDocument( |
456 | 217k | child_doc_path, |
457 | 217k | ValueRef(map_value.values(i), value), |
458 | 217k | read_ht, deadline, query_id, ttl, user_timestamp)); |
459 | 217k | } |
460 | 105k | return Status::OK(); |
461 | 105k | } |
462 | 54.7M | auto control_fields = ValueControlFields{ |
463 | 54.7M | .ttl = ttl, |
464 | 54.7M | .user_timestamp = user_timestamp, |
465 | 54.7M | }; |
466 | 54.7M | return SetPrimitive(doc_path, control_fields, value, read_ht, deadline, query_id); |
467 | 54.8M | } |
468 | | |
469 | | Status DocWriteBatch::InsertSubDocument( |
470 | | const DocPath& doc_path, |
471 | | const ValueRef& value, |
472 | | const ReadHybridTime& read_ht, |
473 | | const CoarseTimePoint deadline, |
474 | | rocksdb::QueryId query_id, |
475 | | MonoDelta ttl, |
476 | | UserTimeMicros user_timestamp, |
477 | 54.5M | bool init_marker_ttl) { |
478 | 54.5M | if (!value.IsTombstoneOrPrimitive()) { |
479 | 21.5k | auto key_ttl = init_marker_ttl ? ttl : ValueControlFields::kMaxTtl;
480 | 21.5k | auto control_fields = ValueControlFields { |
481 | 21.5k | .ttl = key_ttl, |
482 | 21.5k | .user_timestamp = user_timestamp, |
483 | 21.5k | }; |
484 | 21.5k | RETURN_NOT_OK(SetPrimitive( |
485 | 21.5k | doc_path, control_fields, ValueRef(value.ContainerValueType()), read_ht, deadline, |
486 | 21.5k | query_id)); |
487 | 21.5k | } |
488 | 54.5M | return ExtendSubDocument(doc_path, value, read_ht, deadline, query_id, ttl, user_timestamp); |
489 | 54.5M | } |
490 | | |
491 | | Status DocWriteBatch::ExtendList( |
492 | | const DocPath& doc_path, |
493 | | const ValueRef& value, |
494 | | const ReadHybridTime& read_ht, |
495 | | const CoarseTimePoint deadline, |
496 | | rocksdb::QueryId query_id, |
497 | | MonoDelta ttl, |
498 | 108 | UserTimeMicros user_timestamp) { |
499 | 108 | if (monotonic_counter_ == nullptr) { |
500 | 0 | return STATUS(IllegalState, "List cannot be extended if monotonic_counter_ is uninitialized"); |
501 | 0 | } |
502 | 108 | SCHECK(value.is_array(), InvalidArgument, Format("Expecting array value ref, found $0", value)); |
503 | | |
504 | 108 | const auto& array = value.value_pb().list_value().elems(); |
505 | | // It is assumed that there is an exclusive lock on the list key. |
506 | | // The lock ensures that there isn't another thread picking ArrayIndexes for the same list. |
507 | | // No additional lock is required. |
508 | 108 | int64_t index = std::atomic_fetch_add(monotonic_counter_, static_cast<int64_t>(array.size())); |
509 | | // PREPEND - adding in reverse order with negated index |
510 | 108 | if (value.list_extend_order() == ListExtendOrder::PREPEND_BLOCK) { |
511 | 16 | for (auto i = array.size(); i-- > 0;) { |
512 | 9 | DocPath child_doc_path = doc_path; |
513 | 9 | index++; |
514 | 9 | child_doc_path.AddSubKey(PrimitiveValue::ArrayIndex(-index)); |
515 | 9 | RETURN_NOT_OK(ExtendSubDocument( |
516 | 9 | child_doc_path, ValueRef(array.Get(i), value), read_ht, deadline, query_id, |
517 | 9 | ttl, user_timestamp)); |
518 | 9 | } |
519 | 101 | } else { |
520 | 176 | for (const auto& elem : array) { |
521 | 176 | DocPath child_doc_path = doc_path; |
522 | 176 | index++; |
523 | 176 | child_doc_path.AddSubKey(PrimitiveValue::ArrayIndex( |
524 | 176 | value.list_extend_order() == ListExtendOrder::APPEND ? index : -index));
525 | 176 | RETURN_NOT_OK(ExtendSubDocument( |
526 | 176 | child_doc_path, ValueRef(elem, value), read_ht, deadline, query_id, ttl, |
527 | 176 | user_timestamp)); |
528 | 176 | } |
529 | 101 | } |
530 | 108 | return Status::OK(); |
531 | 108 | } |
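The index arithmetic above is easy to miss: PREPEND_BLOCK walks the new elements in reverse and negates each index so the whole block sorts ahead of anything prepended earlier. A minimal sketch, where base stands for the value returned by std::atomic_fetch_add on monotonic_counter_ (an assumption for illustration):

#include <cstdint>
#include <vector>

// Indices handed out while walking the new elements from last to first: the last
// element gets -(base + 1), the first gets -(base + n). Within the block the original
// order is preserved, and the block as a whole sorts before earlier prepends, which
// received less negative indices.
std::vector<int64_t> PrependBlockIndices(int64_t base, int num_elements) {
  std::vector<int64_t> indices;
  for (int i = num_elements; i-- > 0;) {
    ++base;
    indices.push_back(-base);
  }
  return indices;
}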
532 | | |
533 | | Status DocWriteBatch::ReplaceRedisInList( |
534 | | const DocPath &doc_path, |
535 | | int64_t index, |
536 | | const ValueRef& value, |
537 | | const ReadHybridTime& read_ht, |
538 | | const CoarseTimePoint deadline, |
539 | | const rocksdb::QueryId query_id, |
540 | | const Direction dir, |
541 | | const int64_t start_index, |
542 | | std::vector<string>* results, |
543 | | MonoDelta default_ttl, |
544 | 0 | MonoDelta write_ttl) { |
545 | 0 | SubDocKey sub_doc_key; |
546 | 0 | RETURN_NOT_OK(sub_doc_key.FromDocPath(doc_path)); |
547 | 0 | key_prefix_ = sub_doc_key.Encode(); |
548 | |
549 | 0 | auto iter = yb::docdb::CreateIntentAwareIterator( |
550 | 0 | doc_db_, |
551 | 0 | BloomFilterMode::USE_BLOOM_FILTER, |
552 | 0 | key_prefix_.AsSlice(), |
553 | 0 | query_id, |
554 | 0 | TransactionOperationContext(), |
555 | 0 | deadline, |
556 | 0 | read_ht); |
557 | |
558 | 0 | if (dir == Direction::kForward) { |
559 | | // Ensure we seek directly to indices and skip init marker if it exists. |
560 | 0 | key_prefix_.AppendValueType(ValueType::kArrayIndex); |
561 | 0 | RETURN_NOT_OK(SeekToKeyPrefix(iter.get(), false)); |
562 | 0 | } else { |
563 | | // We would like to seek past the entire list and go backwards. |
564 | 0 | key_prefix_.AppendValueType(ValueType::kMaxByte); |
565 | 0 | iter->PrevSubDocKey(key_prefix_); |
566 | 0 | key_prefix_.RemoveValueTypeSuffix(ValueType::kMaxByte); |
567 | 0 | key_prefix_.AppendValueType(ValueType::kArrayIndex); |
568 | 0 | } |
569 | | |
570 | 0 | SubDocKey found_key; |
571 | 0 | FetchKeyResult key_data; |
572 | 0 | for (auto current_index = start_index;;) { |
573 | 0 | if (index <= 0 || !iter->valid() || |
574 | 0 | !(key_data = VERIFY_RESULT(iter->FetchKey())).key.starts_with(key_prefix_)) { |
575 | 0 | return STATUS_SUBSTITUTE(Corruption, |
576 | 0 | "Index Error: $0, reached beginning of list with size $1", |
577 | 0 | index - 1, // YQL layer list index starts from 0, not 1 as in DocDB. |
578 | 0 | current_index); |
579 | 0 | } |
580 | | |
581 | 0 | RETURN_NOT_OK(found_key.FullyDecodeFrom(key_data.key, HybridTimeRequired::kFalse)); |
582 | | |
583 | 0 | if (VERIFY_RESULT(Value::IsTombstoned(iter->value()))) { |
584 | 0 | found_key.KeepPrefix(sub_doc_key.num_subkeys() + 1); |
585 | 0 | if (dir == Direction::kForward) { |
586 | 0 | iter->SeekPastSubKey(key_data.key); |
587 | 0 | } else { |
588 | 0 | iter->PrevSubDocKey(KeyBytes(key_data.key)); |
589 | 0 | } |
590 | 0 | continue; |
591 | 0 | } |
592 | | |
593 | | // TODO (rahul): it may be cleaner to put this in the read path. |
594 | | // The code below is meant specifically for POP functionality in Redis lists. |
595 | 0 | if (results) { |
596 | 0 | Value v; |
597 | 0 | RETURN_NOT_OK(v.Decode(iter->value())); |
598 | 0 | results->push_back(v.primitive_value().GetString()); |
599 | 0 | } |
600 | | |
601 | 0 | if (dir == Direction::kForward) { |
602 | 0 | current_index++; |
603 | 0 | } else { |
604 | 0 | current_index--; |
605 | 0 | } |
606 | | |
607 | | // Should we verify that the subkeys are indeed numbers as list indices should be? |
608 | | // Or just go in order for the index'th largest key in any subdocument? |
609 | 0 | if (current_index == index) { |
610 | | // When inserting, key_prefix_ is modified. |
611 | 0 | KeyBytes array_index_prefix(key_prefix_); |
612 | 0 | DocPath child_doc_path = doc_path; |
613 | 0 | child_doc_path.AddSubKey(found_key.subkeys()[sub_doc_key.num_subkeys()]); |
614 | 0 | return InsertSubDocument(child_doc_path, value, read_ht, deadline, query_id, write_ttl); |
615 | 0 | } |
616 | | |
617 | 0 | if (dir == Direction::kForward) { |
618 | 0 | iter->SeekPastSubKey(key_data.key); |
619 | 0 | } else { |
620 | 0 | iter->PrevSubDocKey(KeyBytes(key_data.key)); |
621 | 0 | } |
622 | 0 | } |
623 | 0 | } |
624 | | |
625 | 71.6M | void DocWriteBatch::UpdateMaxValueTtl(const MonoDelta& ttl) { |
626 | | // Don't update the max value TTL if the value is uninitialized or if it is set to |
627 | | // kMaxTtl (i.e. use table TTL). |
628 | 71.6M | if (!ttl.Initialized() || ttl.Equals(ValueControlFields::kMaxTtl)) {
629 | 71.1M | return; |
630 | 71.1M | } |
631 | 488k | if (!ttl_.Initialized() || ttl > ttl_) {
632 | 121k | ttl_ = ttl; |
633 | 121k | } |
634 | 488k | } |
635 | | |
636 | | Status DocWriteBatch::ReplaceCqlInList( |
637 | | const DocPath& doc_path, |
638 | | const int target_cql_index, |
639 | | const ValueRef& value, |
640 | | const ReadHybridTime& read_ht, |
641 | | const CoarseTimePoint deadline, |
642 | | const rocksdb::QueryId query_id, |
643 | | MonoDelta default_ttl, |
644 | 23 | MonoDelta write_ttl) { |
645 | 23 | SubDocKey sub_doc_key; |
646 | 23 | RETURN_NOT_OK(sub_doc_key.FromDocPath(doc_path)); |
647 | 23 | key_prefix_ = sub_doc_key.Encode(); |
648 | | |
649 | 23 | auto iter = yb::docdb::CreateIntentAwareIterator( |
650 | 23 | doc_db_, |
651 | 23 | BloomFilterMode::USE_BLOOM_FILTER, |
652 | 23 | key_prefix_.AsSlice(), |
653 | 23 | query_id, |
654 | 23 | TransactionOperationContext(), |
655 | 23 | deadline, |
656 | 23 | read_ht); |
657 | | |
658 | 23 | RETURN_NOT_OK(SeekToKeyPrefix(iter.get(), false)); |
659 | | |
660 | 23 | if (!iter->valid()) { |
661 | 2 | return STATUS(QLError, "Unable to replace items in empty list."); |
662 | 2 | } |
663 | | |
664 | 21 | auto current_key = VERIFY_RESULT(iter->FetchKey()); |
665 | | // Note that the only case we should have a collection without an init marker is if the collection |
666 | | // was created with upsert semantics. e.g.: |
667 | | // UPDATE foo SET v = v + [1, 2] WHERE k = 1 |
668 | | // If the value v at row k = 1 did not exist before, then it will be written without an init |
669 | | // marker. In this case, using DocHybridTime::kMin is valid, as it has the effect of treating each |
670 | | // collection item found in DocDB as if there were no higher-level overwrite or invalidation of |
671 | | // it. |
672 | 0 | auto current_key_is_init_marker = current_key.key.compare(key_prefix_) == 0; |
673 | 21 | auto collection_write_time = current_key_is_init_marker |
674 | 21 | ? current_key.write_time : DocHybridTime::kMin;
675 | | |
676 | 21 | Slice value_slice; |
677 | 21 | SubDocKey found_key; |
678 | 21 | int current_cql_index = 0; |
679 | | |
680 | | // Seek past init marker if it exists. |
681 | 21 | key_prefix_.AppendValueType(ValueType::kArrayIndex); |
682 | 21 | RETURN_NOT_OK(SeekToKeyPrefix(iter.get(), false)); |
683 | | |
684 | 21 | FetchKeyResult key_data; |
685 | 55 | while (true) { |
686 | 55 | if (target_cql_index < 0 || !iter->valid() || |
687 | 55 | !(key_data = VERIFY_RESULT(iter->FetchKey())).key.starts_with(key_prefix_)) {
688 | 5 | return STATUS_SUBSTITUTE( |
689 | 5 | QLError, |
690 | 5 | "Unable to replace items into list, expecting index $0, reached end of list with size $1", |
691 | 5 | target_cql_index, |
692 | 5 | current_cql_index); |
693 | 5 | } |
694 | | |
695 | 50 | RETURN_NOT_OK(found_key.FullyDecodeFrom(key_data.key, HybridTimeRequired::kFalse)); |
696 | | |
697 | 50 | value_slice = iter->value(); |
698 | 50 | auto entry_ttl = VERIFY_RESULT(ValueControlFields::Decode(&value_slice)).ttl; |
699 | 0 | auto value_type = DecodeValueType(value_slice); |
700 | | |
701 | 50 | bool has_expired = false; |
702 | 50 | if (value_type == ValueType::kTombstone || key_data.write_time < collection_write_time) {
703 | 8 | has_expired = true; |
704 | 42 | } else { |
705 | 42 | entry_ttl = ComputeTTL(entry_ttl, default_ttl); |
706 | 42 | has_expired = HasExpiredTTL(key_data.write_time.hybrid_time(), entry_ttl, read_ht.read); |
707 | 42 | } |
708 | | |
709 | 50 | if (has_expired) { |
710 | 9 | found_key.KeepPrefix(sub_doc_key.num_subkeys() + 1); |
711 | 9 | iter->SeekPastSubKey(key_data.key); |
712 | 9 | continue; |
713 | 9 | } |
714 | | |
715 | | // Should we verify that the subkeys are indeed numbers as list indices should be? |
716 | | // Or just go in order for the index'th largest key in any subdocument? |
717 | 41 | if (current_cql_index == target_cql_index) { |
718 | | // When inserting, key_prefix_ is modified. |
719 | 16 | KeyBytes array_index_prefix(key_prefix_); |
720 | 16 | DocPath child_doc_path = doc_path; |
721 | 16 | child_doc_path.AddSubKey(found_key.subkeys()[sub_doc_key.num_subkeys()]); |
722 | 16 | return InsertSubDocument(child_doc_path, value, read_ht, deadline, query_id, write_ttl); |
723 | 16 | } |
724 | | |
725 | 25 | current_cql_index++; |
726 | 25 | iter->SeekPastSubKey(key_data.key); |
727 | 25 | } |
728 | 21 | } |
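Only live elements are counted toward target_cql_index; the loop above skips tombstones, entries older than the collection's init marker, and TTL-expired entries. A minimal restatement of that rule with simplified types (not the real DocDB API):

#include <cstdint>

bool ListElementIsLive(bool is_tombstone,
                       uint64_t element_write_time,
                       uint64_t collection_write_time,
                       bool ttl_expired) {
  // An element written before the collection's init marker was invalidated when the
  // whole collection was rewritten, so it is treated the same as a tombstone.
  if (is_tombstone || element_write_time < collection_write_time) {
    return false;
  }
  return !ttl_expired;
}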
729 | | |
730 | | CHECKED_STATUS DocWriteBatch::DeleteSubDoc( |
731 | | const DocPath& doc_path, |
732 | | const ReadHybridTime& read_ht, |
733 | | const CoarseTimePoint deadline, |
734 | | rocksdb::QueryId query_id, |
735 | 936k | UserTimeMicros user_timestamp) { |
736 | 936k | return SetPrimitive( |
737 | 936k | doc_path, ValueRef(ValueType::kTombstone), read_ht, deadline, query_id, user_timestamp); |
738 | 936k | } |
739 | | |
740 | 400k | void DocWriteBatch::Clear() { |
741 | 400k | put_batch_.clear(); |
742 | 400k | cache_.Clear(); |
743 | 400k | } |
744 | | |
745 | 3.11M | void DocWriteBatch::MoveToWriteBatchPB(KeyValueWriteBatchPB *kv_pb) { |
746 | 3.11M | kv_pb->mutable_write_pairs()->Reserve(narrow_cast<int>(put_batch_.size())); |
747 | 70.6M | for (auto& entry : put_batch_) { |
748 | 70.6M | KeyValuePairPB* kv_pair = kv_pb->add_write_pairs(); |
749 | 70.6M | kv_pair->mutable_key()->swap(entry.first); |
750 | 70.6M | kv_pair->mutable_value()->swap(entry.second); |
751 | 70.6M | } |
752 | 3.11M | if (has_ttl()) { |
753 | 204 | kv_pb->set_ttl(ttl_ns()); |
754 | 204 | } |
755 | 3.11M | } |
756 | | |
757 | 266 | void DocWriteBatch::TEST_CopyToWriteBatchPB(KeyValueWriteBatchPB *kv_pb) const { |
758 | 266 | kv_pb->mutable_write_pairs()->Reserve(narrow_cast<int>(put_batch_.size())); |
759 | 548 | for (auto& entry : put_batch_) { |
760 | 548 | KeyValuePairPB* kv_pair = kv_pb->add_write_pairs(); |
761 | 548 | kv_pair->mutable_key()->assign(entry.first); |
762 | 548 | kv_pair->mutable_value()->assign(entry.second); |
763 | 548 | } |
764 | 266 | if (has_ttl()) { |
765 | 242 | kv_pb->set_ttl(ttl_ns()); |
766 | 242 | } |
767 | 266 | } |
768 | | |
769 | | // ------------------------------------------------------------------------------------------------ |
770 | | // Converting a RocksDB write batch to a string. |
771 | | // ------------------------------------------------------------------------------------------------ |
772 | | |
773 | | class DocWriteBatchFormatter : public WriteBatchFormatter { |
774 | | public: |
775 | | DocWriteBatchFormatter( |
776 | | StorageDbType storage_db_type, |
777 | | BinaryOutputFormat binary_output_format, |
778 | | WriteBatchOutputFormat batch_output_format, |
779 | | std::string line_prefix) |
780 | | : WriteBatchFormatter(binary_output_format, batch_output_format, std::move(line_prefix)), |
781 | 0 | storage_db_type_(storage_db_type) {} |
782 | | protected: |
783 | 0 | std::string FormatKey(const Slice& key) override { |
784 | 0 | const auto key_result = DocDBKeyToDebugStr(key, storage_db_type_); |
785 | 0 | if (key_result.ok()) { |
786 | 0 | return *key_result; |
787 | 0 | } |
788 | 0 | return Format( |
789 | 0 | "$0 (error: $1)", |
790 | 0 | WriteBatchFormatter::FormatKey(key), |
791 | 0 | key_result.status()); |
792 | 0 | } |
793 | | |
794 | 0 | std::string FormatValue(const Slice& key, const Slice& value) override { |
795 | 0 | auto key_type = GetKeyType(key, storage_db_type_); |
796 | 0 | const auto value_result = DocDBValueToDebugStr(key_type, key, value); |
797 | 0 | if (value_result.ok()) { |
798 | 0 | return *value_result; |
799 | 0 | } |
800 | 0 | return Format( |
801 | 0 | "$0 (error: $1)", |
802 | 0 | WriteBatchFormatter::FormatValue(key, value), |
803 | 0 | value_result.status()); |
804 | 0 | } |
805 | | |
806 | | private: |
807 | | StorageDbType storage_db_type_; |
808 | | }; |
809 | | |
810 | | Result<std::string> WriteBatchToString( |
811 | | const rocksdb::WriteBatch& write_batch, |
812 | | StorageDbType storage_db_type, |
813 | | BinaryOutputFormat binary_output_format, |
814 | | WriteBatchOutputFormat batch_output_format, |
815 | 0 | const std::string& line_prefix) { |
816 | 0 | DocWriteBatchFormatter formatter( |
817 | 0 | storage_db_type, binary_output_format, batch_output_format, line_prefix); |
818 | 0 | RETURN_NOT_OK(write_batch.Iterate(&formatter)); |
819 | 0 | return formatter.str(); |
820 | 0 | } |
821 | | |
822 | | namespace { |
823 | | |
824 | | const QLValuePB kNullValuePB; |
825 | | |
826 | | } |
827 | | |
828 | 16.4M | ValueRef::ValueRef(ValueType value_type) : value_pb_(&kNullValuePB), value_type_(value_type) { |
829 | 16.4M | } |
830 | | |
831 | 0 | std::string ValueRef::ToString() const { |
832 | 0 | return YB_CLASS_TO_STRING(value_pb, value_type); |
833 | 0 | } |
834 | | |
835 | 54.5M | bool ValueRef::IsTombstoneOrPrimitive() const { |
836 | 54.5M | return !is_array() && !is_map() && !is_set();
837 | 54.5M | } |
838 | | |
839 | 21.5k | ValueType ValueRef::ContainerValueType() const { |
840 | 21.5k | if (value_type_ != ValueType::kLowest) { |
841 | 21.2k | return value_type_; |
842 | 21.2k | } |
843 | 361 | if (is_array()) { |
844 | 68 | return ValueType::kArray; |
845 | 68 | } |
846 | 293 | if (is_map() || is_set()) {
847 | 293 | return ValueType::kObject; |
848 | 293 | } |
849 | 0 | FATAL_INVALID_ENUM_VALUE(QLValuePB::ValueCase, value_pb_->value_case()); |
850 | 0 | return ValueType::kLowest; |
851 | 293 | } |
852 | | |
853 | 109M | bool ValueRef::is_array() const { |
854 | 109M | return value_pb_->value_case() == QLValuePB::kListValue; |
855 | 109M | } |
856 | | |
857 | 109M | bool ValueRef::is_set() const { |
858 | 109M | return value_pb_->value_case() == QLValuePB::kSetValue; |
859 | 109M | } |
860 | | |
861 | 109M | bool ValueRef::is_map() const { |
862 | 109M | return value_pb_->value_case() == QLValuePB::kMapValue; |
863 | 109M | } |
864 | | |
865 | | } // namespace docdb |
866 | | } // namespace yb |