YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/doc_write_batch.cc
Count | Source
      | // Copyright (c) YugaByte, Inc.
      | //
      | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
      | // in compliance with the License.  You may obtain a copy of the License at
      | //
      | // http://www.apache.org/licenses/LICENSE-2.0
      | //
      | // Unless required by applicable law or agreed to in writing, software distributed under the License
      | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
      | // or implied.  See the License for the specific language governing permissions and limitations
      | // under the License.
      | //
      | #include "yb/docdb/doc_write_batch.h"
      |
      | #include "yb/common/doc_hybrid_time.h"
      | #include "yb/common/ql_value.h"
      |
      | #include "yb/docdb/doc_key.h"
      | #include "yb/docdb/doc_path.h"
      | #include "yb/docdb/doc_ttl_util.h"
      | #include "yb/docdb/docdb-internal.h"
      | #include "yb/docdb/docdb.pb.h"
      | #include "yb/docdb/docdb_fwd.h"
      | #include "yb/docdb/docdb_rocksdb_util.h"
      | #include "yb/docdb/kv_debug.h"
      | #include "yb/docdb/subdocument.h"
      | #include "yb/docdb/value_type.h"
      | #include "yb/rocksdb/db.h"
      | #include "yb/rocksdb/write_batch.h"
      | #include "yb/rocksutil/write_batch_formatter.h"
      | #include "yb/server/hybrid_clock.h"
      | #include "yb/util/bytes_formatter.h"
      | #include "yb/util/enums.h"
      | #include "yb/util/logging.h"
      | #include "yb/util/result.h"
      | #include "yb/util/status_format.h"
      |
      | using yb::BinaryOutputFormat;
      |
      | using yb::server::HybridClock;
      |
      | namespace yb {
      | namespace docdb {
      |
      | DocWriteBatch::DocWriteBatch(const DocDB& doc_db,
      |                              InitMarkerBehavior init_marker_behavior,
      |                              std::atomic<int64_t>* monotonic_counter)
      |     : doc_db_(doc_db),
      |       init_marker_behavior_(init_marker_behavior),
3.48M |       monotonic_counter_(monotonic_counter) {}
      |
 545k | Status DocWriteBatch::SeekToKeyPrefix(LazyIterator* iter, bool has_ancestor) {
 545k |   subdoc_exists_ = false;
 545k |   current_entry_.value_type = ValueType::kInvalid;
      |
      |   // Check the cache first.
 545k |   boost::optional<DocWriteBatchCache::Entry> cached_entry =
 545k |     cache_.Get(key_prefix_);
 545k |   if (cached_entry) {
 133k |     current_entry_ = *cached_entry;
 133k |     subdoc_exists_ = current_entry_.value_type != ValueType::kTombstone;
 133k |     return Status::OK();
 133k |   }
 411k |   return SeekToKeyPrefix(iter->Iterator(), has_ancestor);
 545k | }
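
The method above consults the DocWriteBatchCache before falling back to a real RocksDB seek; per the counts, 133k of the 545k lookups (roughly a quarter) were answered from the cache. Below is a minimal, self-contained sketch of the same cache-first pattern using only the standard library; CacheEntry, SeekCache, and SeekWithCache are hypothetical stand-ins, not YugabyteDB types.

#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for DocWriteBatchCache::Entry.
struct CacheEntry {
  char value_type = 0;
  uint64_t write_time = 0;
};

// Hypothetical stand-in for DocWriteBatchCache.
class SeekCache {
 public:
  std::optional<CacheEntry> Get(const std::string& key_prefix) const {
    auto it = entries_.find(key_prefix);
    if (it == entries_.end()) return std::nullopt;
    return it->second;
  }
  void Put(const std::string& key_prefix, CacheEntry entry) {
    entries_[key_prefix] = entry;
  }

 private:
  std::unordered_map<std::string, CacheEntry> entries_;
};

// Cache hit: answer without touching storage. Cache miss: run the expensive
// seek, then remember the result for later operations in the same batch.
template <typename SeekFn>
CacheEntry SeekWithCache(SeekCache* cache, const std::string& key_prefix, SeekFn seek) {
  if (auto cached = cache->Get(key_prefix)) {
    return *cached;
  }
  CacheEntry found = seek(key_prefix);
  cache->Put(key_prefix, found);
  return found;
}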
      |
 411k | Status DocWriteBatch::SeekToKeyPrefix(IntentAwareIterator* doc_iter, bool has_ancestor) {
 411k |   const auto prev_subdoc_ht = current_entry_.doc_hybrid_time;
 411k |   const auto prev_key_prefix_exact = current_entry_.found_exact_key_prefix;
      |
      |   // Seek the value.
 411k |   doc_iter->Seek(key_prefix_.AsSlice());
 411k |   if (!doc_iter->valid()) {
  644 |     return Status::OK();
  644 |   }
      |
 410k |   auto key_data = VERIFY_RESULT(doc_iter->FetchKey());
 410k |   if (!key_prefix_.IsPrefixOf(key_data.key)) {
93.8k |     return Status::OK();
93.8k |   }
      |
      |   // Checking for expiration.
 317k |   Slice recent_value = doc_iter->value();
 317k |   ValueControlFields control_fields;
 317k |   {
 317k |     auto value_copy = recent_value;
 317k |     control_fields = VERIFY_RESULT(ValueControlFields::Decode(&value_copy));
    0 |     current_entry_.user_timestamp = control_fields.user_timestamp;
 317k |     current_entry_.value_type = DecodeValueType(value_copy);
 317k |   }
      |
 317k |   if (HasExpiredTTL(
 317k |           key_data.write_time.hybrid_time(), control_fields.ttl, doc_iter->read_time().read)) {
    2 |     current_entry_.value_type = ValueType::kTombstone;
    2 |     current_entry_.doc_hybrid_time = key_data.write_time;
    2 |     cache_.Put(key_prefix_, current_entry_);
    2 |     return Status::OK();
    2 |   }
      |
 317k |   Slice value;
 317k |   RETURN_NOT_OK(doc_iter->NextFullValue(&key_data.write_time, &value, &key_data.key));
      |
 317k |   if (!doc_iter->valid()) {
    0 |     return Status::OK();
    0 |   }
      |
      |   // If the first key >= key_prefix_ in RocksDB starts with key_prefix_, then a
      |   // document/subdocument pointed to by key_prefix_ exists, or has been recently deleted.
 317k |   if (key_prefix_.IsPrefixOf(key_data.key)) {
      |     // No need to decode again if no merge records were encountered.
 317k |     if (value != recent_value) {
    0 |       auto value_copy = value;
    0 |       current_entry_.user_timestamp = VERIFY_RESULT(
    0 |           ValueControlFields::Decode(&value_copy)).user_timestamp;
    0 |       current_entry_.value_type = DecodeValueType(value_copy);
    0 |     }
 317k |     current_entry_.found_exact_key_prefix = key_prefix_ == key_data.key;
 317k |     current_entry_.doc_hybrid_time = key_data.write_time;
      |
      |     // TODO: with optional init markers we can find something that is more than one level
      |     //       deep relative to the current prefix.
      |     // Note: this comment was originally placed right before the line decoding the HybridTime,
      |     // which has since been refactored away. Not sure what this means, so keeping it for now.
      |
      |     // Cache the results of reading from RocksDB so that we don't have to read again in a later
      |     // operation in the same DocWriteBatch.
 317k |     DOCDB_DEBUG_LOG("Writing to DocWriteBatchCache: $0",
 317k |                     BestEffortDocDBKeyToStr(key_prefix_));
      |
 317k |     if (has_ancestor && prev_subdoc_ht > current_entry_.doc_hybrid_time &&
 317k |         prev_key_prefix_exact) {
      |       // We already saw an object init marker or a tombstone one level higher with a higher
      |       // hybrid_time, so just ignore this key/value pair. This had to be added when we switched
      |       // from a format with intermediate hybrid_times to our current format without them.
      |       //
      |       // Example (from a real test case):
      |       //
      |       // SubDocKey(DocKey([], ["a"]), [HT(38)]) -> {}
      |       // SubDocKey(DocKey([], ["a"]), [HT(37)]) -> DEL
      |       // SubDocKey(DocKey([], ["a"]), [HT(36)]) -> false
      |       // SubDocKey(DocKey([], ["a"]), [HT(1)]) -> {}
      |       // SubDocKey(DocKey([], ["a"]), ["y", HT(35)]) -> "lD\x97\xaf^m\x0a1\xa0\xfc\xc8YM"
      |       //
      |       // Caveat (04/17/2017): the HybridTime encoding in the above example is outdated.
      |       //
      |       // In the above layout, if we try to set "a.y.x" to a new value, we first seek to the
      |       // document key "a" and find that it exists, but then we seek to "a.y" and find that it
      |       // also exists as a primitive value (assuming we don't check the hybrid_time), and
      |       // therefore we can't create "a.y.x", which would be incorrect.
62.7k |       subdoc_exists_ = false;
 254k |     } else {
 254k |       cache_.Put(key_prefix_, current_entry_);
 254k |       subdoc_exists_ = current_entry_.value_type != ValueType::kTombstone;
 254k |     }
 317k |   }
 317k |   return Status::OK();
 317k | }
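
The comment above captures the key idiom: because RocksDB iterates keys in sorted order, seeking to the first key >= key_prefix_ and checking whether that key starts with the prefix is enough to decide whether anything exists under the prefix. A self-contained sketch of the idiom over std::map, which is likewise sorted by key (PrefixExists is a hypothetical name):

#include <map>
#include <string>

// Returns true if any key in the sorted map starts with 'prefix'. lower_bound
// yields the first key >= prefix, and every key sharing the prefix sorts at or
// after it, so only that one key has to be inspected.
bool PrefixExists(const std::map<std::string, std::string>& kvs,
                  const std::string& prefix) {
  auto it = kvs.lower_bound(prefix);
  return it != kvs.end() && it->first.compare(0, prefix.size(), prefix) == 0;
}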
      |
      | Result<bool> DocWriteBatch::SetPrimitiveInternalHandleUserTimestamp(
      |     const ValueControlFields& control_fields,
 141M |     LazyIterator* iter) {
 141M |   if (!control_fields.has_user_timestamp()) {
 141M |     return true;
 141M |   }
      |   // Seek for the older version of the key that we're about to write to. This is essentially a
      |   // NOOP if we've already performed the seek due to the cache.
1.35k |   RETURN_NOT_OK(SeekToKeyPrefix(iter));
      |   // We'd like to include tombstones in our timestamp comparisons as well.
1.35k |   if (!current_entry_.found_exact_key_prefix) {
  259 |     return true;
  259 |   }
1.09k |   if (!subdoc_exists_ && current_entry_.value_type != ValueType::kTombstone) {
   24 |     return true;
   24 |   }
      |
1.07k |   if (current_entry_.user_timestamp != ValueControlFields::kInvalidUserTimestamp) {
  101 |     return control_fields.user_timestamp >= current_entry_.user_timestamp;
  101 |   }
      |
      |   // Look at the hybrid time instead.
  972 |   const DocHybridTime& doc_hybrid_time = current_entry_.doc_hybrid_time;
  972 |   if (!doc_hybrid_time.hybrid_time().is_valid()) {
    0 |     return true;
    0 |   }
      |
  972 |   return control_fields.user_timestamp >= 0 &&
  972 |          implicit_cast<size_t>(control_fields.user_timestamp) >=
   52 |              doc_hybrid_time.hybrid_time().GetPhysicalValueMicros();
  972 | }
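
This helper decides whether a write carrying an explicit user timestamp should apply over what is already stored: user timestamps are compared against each other when both sides have one, and against the physical part of the stored hybrid time otherwise. A condensed standalone sketch of that decision, with StoredEntry and ShouldApply as hypothetical names:

#include <cstdint>
#include <optional>

// Hypothetical condensed view of the stored entry consulted above.
struct StoredEntry {
  std::optional<int64_t> user_timestamp;    // explicit timestamp, if one was written
  std::optional<uint64_t> physical_micros;  // physical micros of the hybrid time, if valid
};

// Apply the new write only if it is at least as recent as the stored entry;
// with nothing stored to compare against, always apply.
bool ShouldApply(int64_t new_user_timestamp, const StoredEntry& stored) {
  if (stored.user_timestamp) {
    return new_user_timestamp >= *stored.user_timestamp;
  }
  if (stored.physical_micros) {
    return new_user_timestamp >= 0 &&
           static_cast<uint64_t>(new_user_timestamp) >= *stored.physical_micros;
  }
  return true;
}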
      |
      | namespace {
      |
      | Status AppendToKeySafely(
70.9M |     const PrimitiveValue& subkey, const DocPath& doc_path, KeyBytes* key_bytes) {
70.9M |   if (subkey.value_type() == ValueType::kTombstone) {
      |     // See https://github.com/yugabyte/yugabyte-db/issues/7835. By returning an error we are
      |     // avoiding a tablet server crash even if the root cause is not clear.
    0 |     auto status = STATUS_FORMAT(
    0 |         IllegalState, "ValueType::kTombstone not allowed in keys. doc_path: $0", doc_path);
    0 |     YB_LOG_EVERY_N_SECS(WARNING, 5) << status;
    0 |     return status;
    0 |   }
70.9M |   subkey.AppendToKey(key_bytes);
70.9M |   return Status::OK();
70.9M | }
      |
      | }  // namespace
      |
      | CHECKED_STATUS DocWriteBatch::SetPrimitiveInternal(
      |     const DocPath& doc_path,
      |     const ValueControlFields& control_fields,
      |     const ValueRef& value,
      |     LazyIterator* iter,
      |     const bool is_deletion,
71.6M |     const size_t num_subkeys) {
71.6M |   UpdateMaxValueTtl(control_fields.ttl);
      |
      |   // The write_id is always incremented by one for each new element of the write batch.
71.6M |   if (put_batch_.size() > numeric_limits<IntraTxnWriteId>::max()) {
    0 |     return STATUS_SUBSTITUTE(
    0 |         NotSupported,
    0 |         "Trying to add more than $0 key/value pairs in the same single-shard txn.",
    0 |         numeric_limits<IntraTxnWriteId>::max());
    0 |   }
      |
71.6M |   if (control_fields.has_user_timestamp() && !optional_init_markers()) {
    2 |     return STATUS(IllegalState,
    2 |                   "User Timestamp is only supported for Optional Init Markers");
    2 |   }
      |
      |   // We need the write_id component of DocHybridTime to disambiguate between writes in the same
      |   // WriteBatch, as they will have the same HybridTime when committed. E.g. if we insert, delete,
      |   // and re-insert the same column in one WriteBatch, we need to know the order of these operations.
71.6M |   const auto write_id = static_cast<IntraTxnWriteId>(put_batch_.size());
71.6M |   const DocHybridTime hybrid_time = DocHybridTime(HybridTime::kMax, write_id);
      |
 142M |   for (size_t subkey_index = 0; subkey_index < num_subkeys; ++subkey_index) {
70.9M |     const PrimitiveValue& subkey = doc_path.subkey(subkey_index);
      |
      |     // We don't need to check if intermediate documents already exist if init markers are optional,
      |     // or if we already know they exist (either from previous reads or our own writes in the same
      |     // single-shard operation.)
      |
70.9M |     if (optional_init_markers() || subdoc_exists_) {
70.6M |       if (required_init_markers() && !IsObjectType(current_entry_.value_type)) {
      |         // REDIS
      |         // ~~~~~
      |         // We raise this error only if init markers are mandatory.
    0 |         return STATUS_FORMAT(IllegalState,
    0 |                              "Cannot set values inside a subdocument of type $0",
    0 |                              current_entry_.value_type);
    0 |       }
70.6M |       if (optional_init_markers()) {
      |         // CASSANDRA
      |         // ~~~~~~~~~
      |         // In the case where init markers are optional, we don't need to check existence of
      |         // the current subdocument. Although if we have a user timestamp specified, we need to
      |         // check whether the provided user timestamp is higher than what is already present. If
      |         // an intermediate subdocument is found with a higher timestamp, we consider it as an
      |         // overwrite and skip the entire write.
70.2M |         auto should_apply = SetPrimitiveInternalHandleUserTimestamp(control_fields, iter);
70.2M |         RETURN_NOT_OK(should_apply);
70.2M |         if (!should_apply.get()) {
    6 |           return Status::OK();
    6 |         }
      |
70.2M |         RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_));
 344k |       } else if (subkey_index == num_subkeys - 1 && !is_deletion) {
      |         // REDIS
      |         // ~~~~~
      |         // We don't need to perform a RocksDB read at the last level for upserts, we just overwrite
      |         // the value within the last subdocument with what we're trying to write. We still perform
      |         // the read for deletions, because we try to avoid writing a new tombstone if the data is
      |         // not there anyway.
 163k |         if (!subdoc_exists_) {
    0 |           return STATUS(IllegalState, "Subdocument is supposed to exist.");
    0 |         }
 163k |         if (!IsObjectType(current_entry_.value_type)) {
    0 |           return STATUS(IllegalState, "Expected object subdocument type.");
    0 |         }
 163k |         RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_));
 180k |       } else {
      |         // REDIS
      |         // ~~~~~
      |         // We need to check if the subdocument at this subkey exists.
 180k |         if (!subdoc_exists_) {
    0 |           return STATUS(IllegalState, "Subdocument is supposed to exist. $0");
    0 |         }
 180k |         if (!IsObjectType(current_entry_.value_type)) {
    0 |           return STATUS(IllegalState, "Expected object subdocument type. $0");
    0 |         }
 180k |         RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_));
 180k |         RETURN_NOT_OK(SeekToKeyPrefix(iter, true));
 180k |         if (is_deletion && !subdoc_exists_) {
      |           // A parent subdocument of the value we're trying to delete, or that value itself, does
      |           // not exist, nothing to do.
      |           //
      |           // TODO: in Redis's HDEL command we need to count the number of fields deleted, so we need
      |           // to count the deletes that are actually happening.
      |           // See http://redis.io/commands/hdel
1.15k |           DOCDB_DEBUG_LOG("Subdocument does not exist at subkey level $0 (subkey: $1)",
1.15k |                           subkey_index, subkey.ToString());
1.15k |           return Status::OK();
1.15k |         }
 180k |       }
70.6M |     } else {
      |       // REDIS
      |       // ~~~~~
      |       // The subdocument at the current level does not exist.
 357k |       if (is_deletion) {
      |         // A parent subdocument of the subdocument we're trying to delete does not exist, nothing
      |         // to do.
    0 |         return Status::OK();
    0 |       }
      |
 357k |       DCHECK(!control_fields.has_user_timestamp());
      |
      |       // Add the parent key to key/value batch before appending the encoded HybridTime to it.
      |       // (We replicate key/value pairs without the HybridTime and only add it before writing to
      |       // RocksDB.)
 357k |       put_batch_.emplace_back(key_prefix_.ToStringBuffer(), string(1, ValueTypeAsChar::kObject));
      |
      |       // Update our local cache to record the fact that we're adding this subdocument, so that
      |       // future operations in this DocWriteBatch don't have to add it or look for it in RocksDB.
 357k |       cache_.Put(key_prefix_, hybrid_time, ValueType::kObject);
 357k |       RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_));
 357k |     }
70.9M |   }
      |
      |   // We need to handle the user timestamp if present.
71.6M |   if (VERIFY_RESULT(SetPrimitiveInternalHandleUserTimestamp(control_fields, iter))) {
      |     // The key in the key/value batch does not have an encoded HybridTime.
71.6M |     put_batch_.emplace_back(key_prefix_.ToStringBuffer(), std::string());
71.6M |     auto& encoded_value = put_batch_.back().second;
71.6M |     control_fields.AppendEncoded(&encoded_value);
71.6M |     size_t prefix_len = encoded_value.size();
      |
71.6M |     AppendEncodedValue(value.value_pb(), value.sorting_type(), &encoded_value);
71.6M |     if (value.custom_value_type() != ValueType::kLowest) {
16.3M |       encoded_value[prefix_len] = static_cast<char>(value.custom_value_type());
16.3M |     }
      |
      |     // The key we use in the DocWriteBatchCache does not have a final hybrid_time, because that's
      |     // the key we expect to look up.
71.6M |     cache_.Put(key_prefix_, hybrid_time, static_cast<ValueType>(encoded_value[prefix_len]),
71.6M |                control_fields.user_timestamp);
71.6M |   }
      |
    0 |   return Status::OK();
71.6M | }
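
The write_id scheme described in the comments above is what lets several writes to the same key coexist inside one batch: they all commit at the same HybridTime, so the per-batch sequence number breaks the tie. A tiny standalone illustration (MiniDocHybridTime is a hypothetical simplification of DocHybridTime):

#include <cassert>
#include <cstdint>

// Commit time plus intra-batch write id; the id alone orders writes that
// share a commit time.
struct MiniDocHybridTime {
  uint64_t hybrid_time;
  uint32_t write_id;

  bool operator<(const MiniDocHybridTime& other) const {
    if (hybrid_time != other.hybrid_time) {
      return hybrid_time < other.hybrid_time;
    }
    return write_id < other.write_id;
  }
};

int main() {
  // Insert (id 0), delete (id 1), re-insert (id 2) of one column in a batch
  // committed at time 100: the re-insert must be seen as the newest write.
  MiniDocHybridTime insert{100, 0}, tombstone{100, 1}, reinsert{100, 2};
  assert(insert < tombstone && tombstone < reinsert);
  return 0;
}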
      |
      | Status DocWriteBatch::SetPrimitive(
      |     const DocPath& doc_path,
      |     const ValueControlFields& control_fields,
      |     const ValueRef& value,
71.6M |     LazyIterator* iter) {
71.6M |   DOCDB_DEBUG_LOG("Called SetPrimitive with doc_path=$0, value=$1",
71.6M |                   doc_path.ToString(), value.ToString());
71.6M |   current_entry_.doc_hybrid_time = DocHybridTime::kMin;
71.6M |   const auto num_subkeys = doc_path.num_subkeys();
71.6M |   const bool is_deletion = value.custom_value_type() == ValueType::kTombstone;
      |
71.6M |   key_prefix_ = doc_path.encoded_doc_key();
      |
      |   // If we are overwriting an entire document with a primitive value (not deleting it), we don't
      |   // need to perform any reads from RocksDB at all.
      |   //
      |   // Even if we are deleting a document, but we don't need to get any feedback on whether the
      |   // deletion was performed or the document was not there to begin with, we could also skip the
      |   // read as an optimization.
71.6M |   if (num_subkeys > 0 || is_deletion) {
71.5M |     if (required_init_markers()) {
      |       // Navigate to the root of the document. We don't yet know whether the document exists or when
      |       // it was last updated.
 361k |       RETURN_NOT_OK(SeekToKeyPrefix(iter, false));
 361k |       DOCDB_DEBUG_LOG("Top-level document exists: $0", subdoc_exists_);
 361k |       if (!subdoc_exists_ && is_deletion) {
  232 |         DOCDB_DEBUG_LOG("We're performing a deletion, and the document is not present. "
  232 |                         "Nothing to do.");
  232 |         return Status::OK();
  232 |       }
 361k |     }
71.5M |   }
71.6M |   return SetPrimitiveInternal(
71.6M |       doc_path, control_fields, value, iter, is_deletion, num_subkeys);
71.6M | }
      |
      | Status DocWriteBatch::SetPrimitive(const DocPath& doc_path,
      |                                    const ValueControlFields& control_fields,
      |                                    const ValueRef& value,
      |                                    const ReadHybridTime& read_ht,
      |                                    CoarseTimePoint deadline,
71.6M |                                    rocksdb::QueryId query_id) {
71.6M |   DOCDB_DEBUG_LOG("Called with doc_path=$0, value=$1", doc_path.ToString(), value.ToString());
      |
71.6M |   std::function<std::unique_ptr<IntentAwareIterator>()> createrator =
71.6M |     [doc_path, query_id, deadline, read_ht, this]() {
 270k |       return yb::docdb::CreateIntentAwareIterator(
 270k |           doc_db_,
 270k |           BloomFilterMode::USE_BLOOM_FILTER,
 270k |           doc_path.encoded_doc_key().AsSlice(),
 270k |           query_id,
 270k |           TransactionOperationContext(),
 270k |           deadline,
 270k |           read_ht);
 270k |     };
      |
71.6M |   LazyIterator iter(&createrator);
      |
71.6M |   return SetPrimitive(doc_path, control_fields, value, &iter);
71.6M | }
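
Note the counts on the lambda body: the factory ran only 270k times against 71.6M calls to this overload, so well under 1% of writes ever needed a RocksDB iterator, which is exactly what LazyIterator is for. A minimal sketch of the same defer-until-first-use pattern (Cursor and LazyCursor are hypothetical names):

#include <functional>
#include <memory>

// Hypothetical expensive resource, standing in for IntentAwareIterator.
struct Cursor {};

// Holds a factory and builds the resource at most once, on first use, so
// code paths that never read skip the construction cost entirely.
class LazyCursor {
 public:
  explicit LazyCursor(std::function<std::unique_ptr<Cursor>()> factory)
      : factory_(std::move(factory)) {}

  Cursor* get() {
    if (!cursor_) {
      cursor_ = factory_();
    }
    return cursor_.get();
  }

 private:
  std::function<std::unique_ptr<Cursor>()> factory_;
  std::unique_ptr<Cursor> cursor_;
};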
      |
      | Status DocWriteBatch::ExtendSubDocument(
      |     const DocPath& doc_path,
      |     const ValueRef& value,
      |     const ReadHybridTime& read_ht,
      |     const CoarseTimePoint deadline,
      |     rocksdb::QueryId query_id,
      |     MonoDelta ttl,
54.8M |     UserTimeMicros user_timestamp) {
54.8M |   if (value.is_array()) {
   90 |     return ExtendList(doc_path, value, read_ht, deadline, query_id, ttl, user_timestamp);
   90 |   }
54.8M |   if (value.is_set()) {
   76 |     ValueRef value_ref(
   76 |         value.write_instruction() == bfql::TSOpcode::kSetRemove ||
   76 |         value.write_instruction() == bfql::TSOpcode::kMapRemove
   76 |         ? ValueType::kTombstone : ValueType::kNullLow);
  125 |     for (const auto& key : value.value_pb().set_value().elems()) {
  125 |       DocPath child_doc_path = doc_path;
  125 |       child_doc_path.AddSubKey(PrimitiveValue::FromQLValuePB(key, value.sorting_type()));
  125 |       RETURN_NOT_OK(ExtendSubDocument(
  125 |           child_doc_path, value_ref, read_ht, deadline, query_id, ttl, user_timestamp));
  125 |     }
   76 |     return Status::OK();
   76 |   }
54.8M |   if (value.is_map()) {
 105k |     const auto& map_value = value.value_pb().map_value();
 105k |     int size = map_value.keys().size();
 322k |     for (int i = 0; i != size; ++i) {
 217k |       DocPath child_doc_path = doc_path;
 217k |       const auto& key = map_value.keys(i);
 217k |       if (key.value_case() != QLValuePB::kVirtualValue ||
 217k |           key.virtual_value() != QLVirtualValuePB::ARRAY) {
 217k |         auto sorting_type =
 217k |             value.list_extend_order() == ListExtendOrder::APPEND
 217k |             ? value.sorting_type() : SortingType::kDescending;
 217k |         if (value.write_instruction() == bfql::TSOpcode::kListAppend &&
 217k |             key.value_case() == QLValuePB::kInt64Value) {
   16 |           child_doc_path.AddSubKey(PrimitiveValue::ArrayIndex(key.int64_value()));
 217k |         } else {
 217k |           child_doc_path.AddSubKey(PrimitiveValue::FromQLValuePB(key, sorting_type));
 217k |         }
 217k |       }
 217k |       RETURN_NOT_OK(ExtendSubDocument(
 217k |           child_doc_path,
 217k |           ValueRef(map_value.values(i), value),
 217k |           read_ht, deadline, query_id, ttl, user_timestamp));
 217k |     }
 105k |     return Status::OK();
 105k |   }
54.7M |   auto control_fields = ValueControlFields{
54.7M |     .ttl = ttl,
54.7M |     .user_timestamp = user_timestamp,
54.7M |   };
54.7M |   return SetPrimitive(doc_path, control_fields, value, read_ht, deadline, query_id);
54.8M | }
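
ExtendSubDocument is a recursive descent: containers recurse with a DocPath extended by one subkey per level, and scalars bottom out in SetPrimitive. The shape of that recursion, reduced to a standalone sketch over a toy nested type (Node, NodeList, and Flatten are hypothetical):

#include <string>
#include <utility>
#include <variant>
#include <vector>

struct Node;
using NodeList = std::vector<std::pair<std::string, Node>>;  // subkey -> child

// A value is either a scalar or a list of (subkey, child) pairs.
struct Node {
  std::variant<std::string, NodeList> data;
};

// Flatten a nested value into (path, scalar) pairs, extending the path by one
// subkey per level, as ExtendSubDocument extends the DocPath above.
void Flatten(const std::vector<std::string>& path, const Node& node,
             std::vector<std::pair<std::vector<std::string>, std::string>>* out) {
  if (const auto* scalar = std::get_if<std::string>(&node.data)) {
    out->emplace_back(path, *scalar);  // base case: the SetPrimitive analogue
    return;
  }
  for (const auto& [key, child] : std::get<NodeList>(node.data)) {
    auto child_path = path;       // copy, like child_doc_path above
    child_path.push_back(key);    // the AddSubKey analogue
    Flatten(child_path, child, out);
  }
}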
      |
      | Status DocWriteBatch::InsertSubDocument(
      |     const DocPath& doc_path,
      |     const ValueRef& value,
      |     const ReadHybridTime& read_ht,
      |     const CoarseTimePoint deadline,
      |     rocksdb::QueryId query_id,
      |     MonoDelta ttl,
      |     UserTimeMicros user_timestamp,
54.5M |     bool init_marker_ttl) {
54.5M |   if (!value.IsTombstoneOrPrimitive()) {
21.5k |     auto key_ttl = init_marker_ttl ? ttl : ValueControlFields::kMaxTtl;
21.5k |     auto control_fields = ValueControlFields {
21.5k |       .ttl = key_ttl,
21.5k |       .user_timestamp = user_timestamp,
21.5k |     };
21.5k |     RETURN_NOT_OK(SetPrimitive(
21.5k |         doc_path, control_fields, ValueRef(value.ContainerValueType()), read_ht, deadline,
21.5k |         query_id));
21.5k |   }
54.5M |   return ExtendSubDocument(doc_path, value, read_ht, deadline, query_id, ttl, user_timestamp);
54.5M | }
      |
      | Status DocWriteBatch::ExtendList(
      |     const DocPath& doc_path,
      |     const ValueRef& value,
      |     const ReadHybridTime& read_ht,
      |     const CoarseTimePoint deadline,
      |     rocksdb::QueryId query_id,
      |     MonoDelta ttl,
  108 |     UserTimeMicros user_timestamp) {
  108 |   if (monotonic_counter_ == nullptr) {
    0 |     return STATUS(IllegalState, "List cannot be extended if monotonic_counter_ is uninitialized");
    0 |   }
  108 |   SCHECK(value.is_array(), InvalidArgument, Format("Expecting array value ref, found $0", value));
      |
  108 |   const auto& array = value.value_pb().list_value().elems();
      |   // It is assumed that there is an exclusive lock on the list key.
      |   // The lock ensures that there isn't another thread picking ArrayIndexes for the same list.
      |   // No additional lock is required.
  108 |   int64_t index = std::atomic_fetch_add(monotonic_counter_, static_cast<int64_t>(array.size()));
      |   // PREPEND - adding in reverse order with negated index
  108 |   if (value.list_extend_order() == ListExtendOrder::PREPEND_BLOCK) {
   16 |     for (auto i = array.size(); i-- > 0;) {
    9 |       DocPath child_doc_path = doc_path;
    9 |       index++;
    9 |       child_doc_path.AddSubKey(PrimitiveValue::ArrayIndex(-index));
    9 |       RETURN_NOT_OK(ExtendSubDocument(
    9 |           child_doc_path, ValueRef(array.Get(i), value), read_ht, deadline, query_id,
    9 |           ttl, user_timestamp));
    9 |     }
  101 |   } else {
  176 |     for (const auto& elem : array) {
  176 |       DocPath child_doc_path = doc_path;
  176 |       index++;
  176 |       child_doc_path.AddSubKey(PrimitiveValue::ArrayIndex(
  176 |           value.list_extend_order() == ListExtendOrder::APPEND ? index : -index));
  176 |       RETURN_NOT_OK(ExtendSubDocument(
  176 |           child_doc_path, ValueRef(elem, value), read_ht, deadline, query_id, ttl,
  176 |           user_timestamp));
  176 |     }
  101 |   }
  108 |   return Status::OK();
  108 | }
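
The PREPEND_BLOCK branch above relies on a small trick: appends store elements under positive, increasing array indices, while prepends take indices from the same monotonic counter, iterate the block in reverse, and negate. A later prepend therefore gets more-negative indices than an earlier one, so an ordered scan yields the newest prepended block first, in its original order, followed by older data. A runnable demonstration against a std::map, which, like DocDB, scans keys in sorted order:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main() {
  int64_t counter = 0;                  // per-list monotonic counter
  std::map<int64_t, std::string> list;  // array_index -> element, kept sorted

  // APPEND: positive, increasing indices.
  auto append = [&](const std::vector<std::string>& elems) {
    int64_t index = counter;
    counter += elems.size();
    for (const auto& e : elems) {
      list.emplace(++index, e);
    }
  };

  // PREPEND_BLOCK: reverse iteration with negated indices, as above.
  auto prepend = [&](const std::vector<std::string>& elems) {
    int64_t index = counter;
    counter += elems.size();
    for (auto i = elems.size(); i-- > 0;) {
      ++index;
      list.emplace(-index, elems[i]);
    }
  };

  append({"c", "d"});
  prepend({"a", "b"});
  for (const auto& kv : list) {
    std::cout << kv.second << ' ';  // prints: a b c d
  }
  std::cout << '\n';
  return 0;
}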
      |
      | Status DocWriteBatch::ReplaceRedisInList(
      |     const DocPath &doc_path,
      |     int64_t index,
      |     const ValueRef& value,
      |     const ReadHybridTime& read_ht,
      |     const CoarseTimePoint deadline,
      |     const rocksdb::QueryId query_id,
      |     const Direction dir,
      |     const int64_t start_index,
      |     std::vector<string>* results,
      |     MonoDelta default_ttl,
    0 |     MonoDelta write_ttl) {
    0 |   SubDocKey sub_doc_key;
    0 |   RETURN_NOT_OK(sub_doc_key.FromDocPath(doc_path));
    0 |   key_prefix_ = sub_doc_key.Encode();
      |
    0 |   auto iter = yb::docdb::CreateIntentAwareIterator(
    0 |       doc_db_,
    0 |       BloomFilterMode::USE_BLOOM_FILTER,
    0 |       key_prefix_.AsSlice(),
    0 |       query_id,
    0 |       TransactionOperationContext(),
    0 |       deadline,
    0 |       read_ht);
      |
    0 |   if (dir == Direction::kForward) {
      |     // Ensure we seek directly to indices and skip init marker if it exists.
    0 |     key_prefix_.AppendValueType(ValueType::kArrayIndex);
    0 |     RETURN_NOT_OK(SeekToKeyPrefix(iter.get(), false));
    0 |   } else {
      |     // We would like to seek past the entire list and go backwards.
    0 |     key_prefix_.AppendValueType(ValueType::kMaxByte);
    0 |     iter->PrevSubDocKey(key_prefix_);
    0 |     key_prefix_.RemoveValueTypeSuffix(ValueType::kMaxByte);
    0 |     key_prefix_.AppendValueType(ValueType::kArrayIndex);
    0 |   }
      |
    0 |   SubDocKey found_key;
    0 |   FetchKeyResult key_data;
    0 |   for (auto current_index = start_index;;) {
    0 |     if (index <= 0 || !iter->valid() ||
    0 |         !(key_data = VERIFY_RESULT(iter->FetchKey())).key.starts_with(key_prefix_)) {
    0 |       return STATUS_SUBSTITUTE(Corruption,
    0 |           "Index Error: $0, reached beginning of list with size $1",
    0 |           index - 1, // YQL layer list index starts from 0, not 1 as in DocDB.
    0 |           current_index);
    0 |     }
      |
    0 |     RETURN_NOT_OK(found_key.FullyDecodeFrom(key_data.key, HybridTimeRequired::kFalse));
      |
    0 |     if (VERIFY_RESULT(Value::IsTombstoned(iter->value()))) {
    0 |       found_key.KeepPrefix(sub_doc_key.num_subkeys() + 1);
    0 |       if (dir == Direction::kForward) {
    0 |         iter->SeekPastSubKey(key_data.key);
    0 |       } else {
    0 |         iter->PrevSubDocKey(KeyBytes(key_data.key));
    0 |       }
    0 |       continue;
    0 |     }
      |
      |     // TODO (rahul): it may be cleaner to put this in the read path.
      |     // The code below is meant specifically for POP functionality in Redis lists.
    0 |     if (results) {
    0 |       Value v;
    0 |       RETURN_NOT_OK(v.Decode(iter->value()));
    0 |       results->push_back(v.primitive_value().GetString());
    0 |     }
      |
    0 |     if (dir == Direction::kForward) {
    0 |       current_index++;
    0 |     } else {
    0 |       current_index--;
    0 |     }
      |
      |     // Should we verify that the subkeys are indeed numbers as list indices should be?
      |     // Or just go in order for the index'th largest key in any subdocument?
    0 |     if (current_index == index) {
      |       // When inserting, key_prefix_ is modified.
    0 |       KeyBytes array_index_prefix(key_prefix_);
    0 |       DocPath child_doc_path = doc_path;
    0 |       child_doc_path.AddSubKey(found_key.subkeys()[sub_doc_key.num_subkeys()]);
    0 |       return InsertSubDocument(child_doc_path, value, read_ht, deadline, query_id, write_ttl);
    0 |     }
      |
    0 |     if (dir == Direction::kForward) {
    0 |       iter->SeekPastSubKey(key_data.key);
    0 |     } else {
    0 |       iter->PrevSubDocKey(KeyBytes(key_data.key));
    0 |     }
    0 |   }
    0 | }
      |
71.6M | void DocWriteBatch::UpdateMaxValueTtl(const MonoDelta& ttl) {
      |   // Don't update the max value TTL if the value is uninitialized or if it is set to
      |   // kMaxTtl (i.e. use table TTL).
71.6M |   if (!ttl.Initialized() || ttl.Equals(ValueControlFields::kMaxTtl)) {
71.1M |     return;
71.1M |   }
 488k |   if (!ttl_.Initialized() || ttl > ttl_) {
 121k |     ttl_ = ttl;
 121k |   }
 488k | }
      |
      | Status DocWriteBatch::ReplaceCqlInList(
      |     const DocPath& doc_path,
      |     const int target_cql_index,
      |     const ValueRef& value,
      |     const ReadHybridTime& read_ht,
      |     const CoarseTimePoint deadline,
      |     const rocksdb::QueryId query_id,
      |     MonoDelta default_ttl,
   23 |     MonoDelta write_ttl) {
   23 |   SubDocKey sub_doc_key;
   23 |   RETURN_NOT_OK(sub_doc_key.FromDocPath(doc_path));
   23 |   key_prefix_ = sub_doc_key.Encode();
      |
   23 |   auto iter = yb::docdb::CreateIntentAwareIterator(
   23 |       doc_db_,
   23 |       BloomFilterMode::USE_BLOOM_FILTER,
   23 |       key_prefix_.AsSlice(),
   23 |       query_id,
   23 |       TransactionOperationContext(),
   23 |       deadline,
   23 |       read_ht);
      |
   23 |   RETURN_NOT_OK(SeekToKeyPrefix(iter.get(), false));
      |
   23 |   if (!iter->valid()) {
    2 |     return STATUS(QLError, "Unable to replace items in empty list.");
    2 |   }
      |
   21 |   auto current_key = VERIFY_RESULT(iter->FetchKey());
      |   // Note that the only case we should have a collection without an init marker is if the collection
      |   // was created with upsert semantics. e.g.:
      |   // UPDATE foo SET v = v + [1, 2] WHERE k = 1
      |   // If the value v at row k = 1 did not exist before, then it will be written without an init
      |   // marker. In this case, using DocHybridTime::kMin is valid, as it has the effect of treating each
      |   // collection item found in DocDB as if there were no higher-level overwrite or invalidation of
      |   // it.
    0 |   auto current_key_is_init_marker = current_key.key.compare(key_prefix_) == 0;
   21 |   auto collection_write_time = current_key_is_init_marker
   21 |       ? current_key.write_time : DocHybridTime::kMin;
      |
   21 |   Slice value_slice;
   21 |   SubDocKey found_key;
   21 |   int current_cql_index = 0;
      |
      |   // Seek past init marker if it exists.
   21 |   key_prefix_.AppendValueType(ValueType::kArrayIndex);
   21 |   RETURN_NOT_OK(SeekToKeyPrefix(iter.get(), false));
      |
   21 |   FetchKeyResult key_data;
   55 |   while (true) {
   55 |     if (target_cql_index < 0 || !iter->valid() ||
   52 |         !(key_data = VERIFY_RESULT(iter->FetchKey())).key.starts_with(key_prefix_)) {
    5 |       return STATUS_SUBSTITUTE(
    5 |           QLError,
    5 |           "Unable to replace items into list, expecting index $0, reached end of list with size $1",
    5 |           target_cql_index,
    5 |           current_cql_index);
    5 |     }
      |
   50 |     RETURN_NOT_OK(found_key.FullyDecodeFrom(key_data.key, HybridTimeRequired::kFalse));
      |
   50 |     value_slice = iter->value();
   50 |     auto entry_ttl = VERIFY_RESULT(ValueControlFields::Decode(&value_slice)).ttl;
    0 |     auto value_type = DecodeValueType(value_slice);
      |
   50 |     bool has_expired = false;
   50 |     if (value_type == ValueType::kTombstone || key_data.write_time < collection_write_time) {
    8 |       has_expired = true;
   42 |     } else {
   42 |       entry_ttl = ComputeTTL(entry_ttl, default_ttl);
   42 |       has_expired = HasExpiredTTL(key_data.write_time.hybrid_time(), entry_ttl, read_ht.read);
   42 |     }
      |
   50 |     if (has_expired) {
    9 |       found_key.KeepPrefix(sub_doc_key.num_subkeys() + 1);
    9 |       iter->SeekPastSubKey(key_data.key);
    9 |       continue;
    9 |     }
      |
      |     // Should we verify that the subkeys are indeed numbers as list indices should be?
      |     // Or just go in order for the index'th largest key in any subdocument?
   41 |     if (current_cql_index == target_cql_index) {
      |       // When inserting, key_prefix_ is modified.
   16 |       KeyBytes array_index_prefix(key_prefix_);
   16 |       DocPath child_doc_path = doc_path;
   16 |       child_doc_path.AddSubKey(found_key.subkeys()[sub_doc_key.num_subkeys()]);
   16 |       return InsertSubDocument(child_doc_path, value, read_ht, deadline, query_id, write_ttl);
   16 |     }
      |
   25 |     current_cql_index++;
   25 |     iter->SeekPastSubKey(key_data.key);
   25 |   }
   21 | }
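
The loop above maps a CQL list index onto the physical entries by counting only live items: tombstoned entries, items older than the collection's own write time, and TTL-expired items are skipped without consuming an index, and falling off the end reports the list's live size. The counting logic in isolation (ListEntry and FindStorageIndex are hypothetical names):

#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical flattened view of one stored list entry as the scan sees it.
struct ListEntry {
  bool tombstoned = false;
  bool expired = false;
};

// Returns the storage position of the target CQL index, or nullopt if the
// list's live portion is shorter than target_cql_index + 1.
std::optional<size_t> FindStorageIndex(const std::vector<ListEntry>& entries,
                                       int target_cql_index) {
  if (target_cql_index < 0) {
    return std::nullopt;
  }
  int current_cql_index = 0;
  for (size_t pos = 0; pos < entries.size(); ++pos) {
    if (entries[pos].tombstoned || entries[pos].expired) {
      continue;  // invisible to CQL: does not consume an index
    }
    if (current_cql_index == target_cql_index) {
      return pos;
    }
    ++current_cql_index;
  }
  return std::nullopt;
}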
      |
      | CHECKED_STATUS DocWriteBatch::DeleteSubDoc(
      |     const DocPath& doc_path,
      |     const ReadHybridTime& read_ht,
      |     const CoarseTimePoint deadline,
      |     rocksdb::QueryId query_id,
 936k |     UserTimeMicros user_timestamp) {
 936k |   return SetPrimitive(
 936k |       doc_path, ValueRef(ValueType::kTombstone), read_ht, deadline, query_id, user_timestamp);
 936k | }
      |
 400k | void DocWriteBatch::Clear() {
 400k |   put_batch_.clear();
 400k |   cache_.Clear();
 400k | }
      |
3.11M | void DocWriteBatch::MoveToWriteBatchPB(KeyValueWriteBatchPB *kv_pb) {
3.11M |   kv_pb->mutable_write_pairs()->Reserve(narrow_cast<int>(put_batch_.size()));
70.6M |   for (auto& entry : put_batch_) {
70.6M |     KeyValuePairPB* kv_pair = kv_pb->add_write_pairs();
70.6M |     kv_pair->mutable_key()->swap(entry.first);
70.6M |     kv_pair->mutable_value()->swap(entry.second);
70.6M |   }
3.11M |   if (has_ttl()) {
  204 |     kv_pb->set_ttl(ttl_ns());
  204 |   }
3.11M | }
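
MoveToWriteBatchPB drains the batch destructively: swapping each buffered string into the protobuf pair steals the bytes instead of copying them, which matters at the observed volume of 70.6M pairs. The copying variant below it (TEST_CopyToWriteBatchPB) assigns instead, leaving the batch intact for test inspection. The same move-vs-copy trade-off in a protobuf-free sketch (MovePairs is a hypothetical name):

#include <string>
#include <utility>
#include <vector>

using KVPair = std::pair<std::string, std::string>;

// Destructive transfer: each source string is swapped into the output, so no
// key or value bytes are copied, but 'batch' is left holding empty strings.
void MovePairs(std::vector<KVPair>* batch, std::vector<KVPair>* out) {
  out->reserve(out->size() + batch->size());
  for (auto& entry : *batch) {
    out->emplace_back();
    out->back().first.swap(entry.first);
    out->back().second.swap(entry.second);
  }
}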
      |
  266 | void DocWriteBatch::TEST_CopyToWriteBatchPB(KeyValueWriteBatchPB *kv_pb) const {
  266 |   kv_pb->mutable_write_pairs()->Reserve(narrow_cast<int>(put_batch_.size()));
  548 |   for (auto& entry : put_batch_) {
  548 |     KeyValuePairPB* kv_pair = kv_pb->add_write_pairs();
  548 |     kv_pair->mutable_key()->assign(entry.first);
  548 |     kv_pair->mutable_value()->assign(entry.second);
  548 |   }
  266 |   if (has_ttl()) {
  242 |     kv_pb->set_ttl(ttl_ns());
  242 |   }
  266 | }
      |
      | // ------------------------------------------------------------------------------------------------
      | // Converting a RocksDB write batch to a string.
      | // ------------------------------------------------------------------------------------------------
      |
      | class DocWriteBatchFormatter : public WriteBatchFormatter {
      |  public:
      |   DocWriteBatchFormatter(
      |       StorageDbType storage_db_type,
      |       BinaryOutputFormat binary_output_format,
      |       WriteBatchOutputFormat batch_output_format,
      |       std::string line_prefix)
      |       : WriteBatchFormatter(binary_output_format, batch_output_format, std::move(line_prefix)),
    0 |         storage_db_type_(storage_db_type) {}
      |  protected:
    0 |   std::string FormatKey(const Slice& key) override {
    0 |     const auto key_result = DocDBKeyToDebugStr(key, storage_db_type_);
    0 |     if (key_result.ok()) {
    0 |       return *key_result;
    0 |     }
    0 |     return Format(
    0 |         "$0 (error: $1)",
    0 |         WriteBatchFormatter::FormatKey(key),
    0 |         key_result.status());
    0 |   }
      |
    0 |   std::string FormatValue(const Slice& key, const Slice& value) override {
    0 |     auto key_type = GetKeyType(key, storage_db_type_);
    0 |     const auto value_result = DocDBValueToDebugStr(key_type, key, value);
    0 |     if (value_result.ok()) {
    0 |       return *value_result;
    0 |     }
    0 |     return Format(
    0 |         "$0 (error: $1)",
    0 |         WriteBatchFormatter::FormatValue(key, value),
    0 |         value_result.status());
    0 |   }
      |
      |  private:
      |   StorageDbType storage_db_type_;
      | };
      |
      | Result<std::string> WriteBatchToString(
      |     const rocksdb::WriteBatch& write_batch,
      |     StorageDbType storage_db_type,
      |     BinaryOutputFormat binary_output_format,
      |     WriteBatchOutputFormat batch_output_format,
    0 |     const std::string& line_prefix) {
    0 |   DocWriteBatchFormatter formatter(
    0 |       storage_db_type, binary_output_format, batch_output_format, line_prefix);
    0 |   RETURN_NOT_OK(write_batch.Iterate(&formatter));
    0 |   return formatter.str();
    0 | }
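
DocWriteBatchFormatter customizes its base class purely through the FormatKey and FormatValue hooks: each hook tries the richer DocDB decoding and, on failure, falls back to the base class's raw rendering with the error appended. A reduced sketch of that hook-with-fallback pattern (Formatter and DecodingFormatter are hypothetical names):

#include <string>

// Base renderer: shows raw bytes.
class Formatter {
 public:
  virtual ~Formatter() = default;
  virtual std::string FormatKey(const std::string& key) {
    return "raw:" + key;
  }
};

// Derived renderer: attempt a richer decoding, and fall back to the base
// rendering plus the error when decoding is not possible.
class DecodingFormatter : public Formatter {
 public:
  std::string FormatKey(const std::string& key) override {
    std::string decoded = TryDecode(key);
    if (!decoded.empty()) {
      return decoded;
    }
    return Formatter::FormatKey(key) + " (error: undecodable)";
  }

 private:
  static std::string TryDecode(const std::string& key) {
    // Placeholder for real decoding such as DocDBKeyToDebugStr.
    return key.size() > 1 ? "doc:" + key : std::string();
  }
};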
      |
      | namespace {
      |
      | const QLValuePB kNullValuePB;
      |
      | }
      |
16.4M | ValueRef::ValueRef(ValueType value_type) : value_pb_(&kNullValuePB), value_type_(value_type) {
16.4M | }
      |
    0 | std::string ValueRef::ToString() const {
    0 |   return YB_CLASS_TO_STRING(value_pb, value_type);
    0 | }
      |
54.5M | bool ValueRef::IsTombstoneOrPrimitive() const {
54.5M |   return !is_array() && !is_map() && !is_set();
54.5M | }
      |
21.5k | ValueType ValueRef::ContainerValueType() const {
21.5k |   if (value_type_ != ValueType::kLowest) {
21.2k |     return value_type_;
21.2k |   }
  361 |   if (is_array()) {
   68 |     return ValueType::kArray;
   68 |   }
  293 |   if (is_map() || is_set()) {
  293 |     return ValueType::kObject;
  293 |   }
    0 |   FATAL_INVALID_ENUM_VALUE(QLValuePB::ValueCase, value_pb_->value_case());
    0 |   return ValueType::kLowest;
  293 | }
      |
 109M | bool ValueRef::is_array() const {
 109M |   return value_pb_->value_case() == QLValuePB::kListValue;
 109M | }
      |
 109M | bool ValueRef::is_set() const {
 109M |   return value_pb_->value_case() == QLValuePB::kSetValue;
 109M | }
      |
 109M | bool ValueRef::is_map() const {
 109M |   return value_pb_->value_case() == QLValuePB::kMapValue;
 109M | }
      |
      | }  // namespace docdb
      | }  // namespace yb