YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/doc_write_batch.cc
 Line |  Count | Source
------+--------+----------------------------------------------------------------------------------------------------
    1 |        | // Copyright (c) YugaByte, Inc.
    2 |        | //
    3 |        | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
    4 |        | // in compliance with the License.  You may obtain a copy of the License at
    5 |        | //
    6 |        | // http://www.apache.org/licenses/LICENSE-2.0
    7 |        | //
    8 |        | // Unless required by applicable law or agreed to in writing, software distributed under the License
    9 |        | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
   10 |        | // or implied.  See the License for the specific language governing permissions and limitations
   11 |        | // under the License.
   12 |        | //
   13 |        | #include "yb/docdb/doc_write_batch.h"
   14 |        |
   15 |        | #include "yb/common/doc_hybrid_time.h"
   16 |        | #include "yb/docdb/doc_key.h"
   17 |        | #include "yb/docdb/doc_path.h"
   18 |        | #include "yb/docdb/doc_ttl_util.h"
   19 |        | #include "yb/docdb/docdb-internal.h"
   20 |        | #include "yb/docdb/docdb.pb.h"
   21 |        | #include "yb/docdb/docdb_fwd.h"
   22 |        | #include "yb/docdb/docdb_rocksdb_util.h"
   23 |        | #include "yb/docdb/kv_debug.h"
   24 |        | #include "yb/docdb/subdocument.h"
   25 |        | #include "yb/docdb/value_type.h"
   26 |        | #include "yb/rocksdb/db.h"
   27 |        | #include "yb/rocksdb/write_batch.h"
   28 |        | #include "yb/rocksutil/write_batch_formatter.h"
   29 |        | #include "yb/server/hybrid_clock.h"
   30 |        | #include "yb/util/bytes_formatter.h"
   31 |        | #include "yb/util/enums.h"
   32 |        | #include "yb/util/logging.h"
   33 |        | #include "yb/util/result.h"
   34 |        | #include "yb/util/status_format.h"
   35 |        |
   36 |        | using yb::BinaryOutputFormat;
   37 |        |
   38 |        | using yb::server::HybridClock;
   39 |        |
   40 |        | namespace yb {
   41 |        | namespace docdb {
   42 |        |
   43 |        | DocWriteBatch::DocWriteBatch(const DocDB& doc_db,
   44 |        |                              InitMarkerBehavior init_marker_behavior,
   45 |        |                              std::atomic<int64_t>* monotonic_counter)
   46 |        |     : doc_db_(doc_db),
   47 |        |       init_marker_behavior_(init_marker_behavior),
   48 |  1.64M |       monotonic_counter_(monotonic_counter) {}
   49 |        |
   50 |  98.2k | Status DocWriteBatch::SeekToKeyPrefix(LazyIterator* iter, bool has_ancestor) {
   51 |  98.2k |   subdoc_exists_ = false;
   52 |  98.2k |   current_entry_.value_type = ValueType::kInvalid;
   53 |        |
   54 |        |   // Check the cache first.
   55 |  98.2k |   boost::optional<DocWriteBatchCache::Entry> cached_entry =
   56 |  98.2k |     cache_.Get(key_prefix_);
   57 |  98.2k |   if (cached_entry) {
   58 |  66.6k |     current_entry_ = *cached_entry;
   59 |  66.6k |     subdoc_exists_ = current_entry_.value_type != ValueType::kTombstone;
   60 |  66.6k |     return Status::OK();
   61 |  66.6k |   }
   62 |  31.5k |   return SeekToKeyPrefix(iter->Iterator(), has_ancestor);
   63 |  31.5k | }
   64 |        |
   65 |  31.5k | Status DocWriteBatch::SeekToKeyPrefix(IntentAwareIterator* doc_iter, bool has_ancestor) {
   66 |  31.5k |   const auto prev_subdoc_ht = current_entry_.doc_hybrid_time;
   67 |  31.5k |   const auto prev_key_prefix_exact = current_entry_.found_exact_key_prefix;
   68 |        |
   69 |        |   // Seek the value.
   70 |  31.5k |   doc_iter->Seek(key_prefix_.AsSlice());
   71 |  31.5k |   if (!doc_iter->valid()) {
   72 |    226 |     return Status::OK();
   73 |    226 |   }
   74 |        |
   75 |  31.3k |   auto key_data = VERIFY_RESULT(doc_iter->FetchKey());
   76 |  31.3k |   if (!key_prefix_.IsPrefixOf(key_data.key)) {
   77 |  30.8k |     return Status::OK();
   78 |  30.8k |   }
   79 |        |
   80 |        |   // Checking for expiration.
   81 |    527 |   uint64_t merge_flags = 0;
   82 |    527 |   MonoDelta ttl;
   83 |    527 |   Slice recent_value = doc_iter->value();
   84 |    527 |   RETURN_NOT_OK(Value::DecodePrimitiveValueType(
   85 |    527 |       recent_value, &(current_entry_.value_type),
   86 |    527 |       &merge_flags, &ttl, &(current_entry_.user_timestamp)));
   87 |        |
   88 |    527 |   if (HasExpiredTTL(key_data.write_time.hybrid_time(), ttl, doc_iter->read_time().read)) {
   89 |      2 |     current_entry_.value_type = ValueType::kTombstone;
   90 |      2 |     current_entry_.doc_hybrid_time = key_data.write_time;
   91 |      2 |     cache_.Put(key_prefix_, current_entry_);
   92 |      2 |     return Status::OK();
   93 |      2 |   }
   94 |        |
   95 |    525 |   Slice value;
   96 |    525 |   RETURN_NOT_OK(doc_iter->NextFullValue(&key_data.write_time, &value, &key_data.key));
   97 |        |
   98 |    525 |   if (!doc_iter->valid()) {
   99 |      0 |     return Status::OK();
  100 |      0 |   }
  101 |        |
  102 |        |   // If the first key >= key_prefix_ in RocksDB starts with key_prefix_, then a
  103 |        |   // document/subdocument pointed to by key_prefix_ exists, or has been recently deleted.
  104 |    525 |   if (key_prefix_.IsPrefixOf(key_data.key)) {
  105 |        |     // No need to decode again if no merge records were encountered.
  106 |    525 |     if (value != recent_value)
  107 |      0 |       RETURN_NOT_OK(Value::DecodePrimitiveValueType(value, &(current_entry_.value_type),
  108 |    525 |           /* merge flags */ nullptr, /* ttl */ nullptr, &(current_entry_.user_timestamp)));
  109 |    525 |     current_entry_.found_exact_key_prefix = key_prefix_ == key_data.key;
  110 |    525 |     current_entry_.doc_hybrid_time = key_data.write_time;
  111 |        |
  112 |        |     // TODO: with optional init markers we can find something that is more than one level
  113 |        |     //       deep relative to the current prefix.
  114 |        |     // Note: this comment was originally placed right before the line decoding the HybridTime,
  115 |        |     // which has since been refactored away. Not sure what this means, so keeping it for now.
  116 |        |
  117 |        |     // Cache the results of reading from RocksDB so that we don't have to read again in a later
  118 |        |     // operation in the same DocWriteBatch.
  119 |    525 |     DOCDB_DEBUG_LOG("Writing to DocWriteBatchCache: $0",
  120 |    525 |                     BestEffortDocDBKeyToStr(key_prefix_));
  121 |        |
  122 |    525 |     if (has_ancestor && prev_subdoc_ht > current_entry_.doc_hybrid_time &&
  123 |      0 |         prev_key_prefix_exact) {
  124 |        |       // We already saw an object init marker or a tombstone one level higher with a higher
  125 |        |       // hybrid_time, so just ignore this key/value pair. This had to be added when we switched
  126 |        |       // from a format with intermediate hybrid_times to our current format without them.
  127 |        |       //
  128 |        |       // Example (from a real test case):
  129 |        |       //
  130 |        |       // SubDocKey(DocKey([], ["a"]), [HT(38)]) -> {}
  131 |        |       // SubDocKey(DocKey([], ["a"]), [HT(37)]) -> DEL
  132 |        |       // SubDocKey(DocKey([], ["a"]), [HT(36)]) -> false
  133 |        |       // SubDocKey(DocKey([], ["a"]), [HT(1)]) -> {}
  134 |        |       // SubDocKey(DocKey([], ["a"]), ["y", HT(35)]) -> "lD\x97\xaf^m\x0a1\xa0\xfc\xc8YM"
  135 |        |       //
  136 |        |       // Caveat (04/17/2017): the HybridTime encoding in the above example is outdated.
  137 |        |       //
  138 |        |       // In the above layout, if we try to set "a.y.x" to a new value, we first seek to the
  139 |        |       // document key "a" and find that it exists, but then we seek to "a.y" and find that it
  140 |        |       // also exists as a primitive value (assuming we don't check the hybrid_time), and
  141 |        |       // therefore we can't create "a.y.x", which would be incorrect.
  142 |      0 |       subdoc_exists_ = false;
  143 |    525 |     } else {
  144 |    525 |       cache_.Put(key_prefix_, current_entry_);
  145 |    525 |       subdoc_exists_ = current_entry_.value_type != ValueType::kTombstone;
  146 |    525 |     }
  147 |    525 |   }
  148 |    525 |   return Status::OK();
  149 |    525 | }
  150 |        |
  151 |        | Result<bool> DocWriteBatch::SetPrimitiveInternalHandleUserTimestamp(
  152 |        |     const Value &value,
  153 |  42.3M |     LazyIterator* iter) {
  154 |  42.3M |   bool should_apply = true;
  155 |  42.3M |   auto user_timestamp = value.user_timestamp();
  156 |  42.3M |   if (user_timestamp != Value::kInvalidUserTimestamp) {
  157 |        |     // Seek for the older version of the key that we're about to write to. This is essentially a
  158 |        |     // NOOP if we've already performed the seek due to the cache.
  159 |    390 |     RETURN_NOT_OK(SeekToKeyPrefix(iter));
  160 |        |     // We'd like to include tombstones in our timestamp comparisons as well.
  161 |    390 |     if ((subdoc_exists_ || current_entry_.value_type == ValueType::kTombstone) &&
  162 |    282 |         current_entry_.found_exact_key_prefix) {
  163 |    143 |       if (current_entry_.user_timestamp != Value::kInvalidUserTimestamp) {
  164 |     95 |         should_apply = user_timestamp >= current_entry_.user_timestamp;
  165 |     48 |       } else {
  166 |        |         // Look at the hybrid time instead.
  167 |     48 |         const DocHybridTime& doc_hybrid_time = current_entry_.doc_hybrid_time;
  168 |     48 |         if (doc_hybrid_time.hybrid_time().is_valid()) {
  169 |     48 |           should_apply =
  170 |     48 |               user_timestamp >= 0 &&
  171 |     48 |               implicit_cast<size_t>(user_timestamp) >=
  172 |     48 |                   doc_hybrid_time.hybrid_time().GetPhysicalValueMicros();
  173 |     48 |         }
  174 |     48 |       }
  175 |    143 |     }
  176 |    390 |   }
  177 |  42.3M |   return should_apply;
  178 |  42.3M | }
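
Restating the decision rule above as a standalone predicate may make it easier to follow: when the incoming write carries a user timestamp, it wins only if it is at least as new as what is already stored, where "already stored" is the existing user timestamp when present, and otherwise the physical-clock microseconds of the existing entry's hybrid time. A hedged sketch with plain integer stand-ins for the YB types:

    #include <cstdint>
    #include <limits>

    // Stand-in for Value::kInvalidUserTimestamp (illustrative value only).
    constexpr int64_t kInvalidUserTimestamp = std::numeric_limits<int64_t>::min();

    bool ShouldApplyWrite(int64_t new_user_ts,
                          int64_t existing_user_ts,
                          uint64_t existing_physical_micros) {
      if (new_user_ts == kInvalidUserTimestamp) {
        return true;  // no user timestamp: normal hybrid-time MVCC ordering applies
      }
      if (existing_user_ts != kInvalidUserTimestamp) {
        return new_user_ts >= existing_user_ts;  // user timestamps on both sides
      }
      // Existing entry has no user timestamp: compare against the physical part
      // of its hybrid time, as GetPhysicalValueMicros() does above.
      return new_user_ts >= 0 &&
             static_cast<uint64_t>(new_user_ts) >= existing_physical_micros;
    }
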
  179 |        |
  180 |        | namespace {
  181 |        |
  182 |        | Status AppendToKeySafely(
  183 |  21.1M |     const PrimitiveValue& subkey, const DocPath& doc_path, KeyBytes* key_bytes) {
  184 |  21.1M |   if (subkey.value_type() == ValueType::kTombstone) {
  185 |        |     // See https://github.com/yugabyte/yugabyte-db/issues/7835. By returning an error we are
  186 |        |     // avoiding a tablet server crash even if the root cause is not clear.
  187 |      0 |     auto status = STATUS_FORMAT(
  188 |      0 |         IllegalState, "ValueType::kTombstone not allowed in keys. doc_path: $0", doc_path);
  189 |      0 |     YB_LOG_EVERY_N_SECS(WARNING, 5) << status;
  190 |      0 |     return status;
  191 |      0 |   }
  192 |  21.1M |   subkey.AppendToKey(key_bytes);
  193 |  21.1M |   return Status::OK();
  194 |  21.1M | }
  195 |        |
  196 |        | }  // namespace
  197 |        |
  198 |        | CHECKED_STATUS DocWriteBatch::SetPrimitiveInternal(
  199 |        |     const DocPath& doc_path,
  200 |        |     const Value& value,
  201 |        |     LazyIterator* iter,
  202 |        |     const bool is_deletion,
  203 |  21.3M |     const size_t num_subkeys) {
  204 |        |   // The write_id is always incremented by one for each new element of the write batch.
  205 |  21.3M |   if (put_batch_.size() > numeric_limits<IntraTxnWriteId>::max()) {
  206 |      0 |     return STATUS_SUBSTITUTE(
  207 |      0 |         NotSupported,
  208 |      0 |         "Trying to add more than $0 key/value pairs in the same single-shard txn.",
  209 |      0 |         numeric_limits<IntraTxnWriteId>::max());
  210 |      0 |   }
  211 |        |
  212 |  21.3M |   if (value.has_user_timestamp() && !optional_init_markers()) {
  213 |      0 |     return STATUS(IllegalState,
  214 |      0 |                   "User Timestamp is only supported for Optional Init Markers");
  215 |      0 |   }
  216 |        |
  217 |        |   // We need the write_id component of DocHybridTime to disambiguate between writes in the same
  218 |        |   // WriteBatch, as they will have the same HybridTime when committed. E.g. if we insert, delete,
  219 |        |   // and re-insert the same column in one WriteBatch, we need to know the order of these operations.
  220 |  21.3M |   const auto write_id = static_cast<IntraTxnWriteId>(put_batch_.size());
  221 |  21.3M |   const DocHybridTime hybrid_time = DocHybridTime(HybridTime::kMax, write_id);
  222 |        |
  223 |  42.4M |   for (size_t subkey_index = 0; subkey_index < num_subkeys; ++subkey_index) {
  224 |  21.1M |     const PrimitiveValue& subkey = doc_path.subkey(subkey_index);
  225 |        |
  226 |        |     // We don't need to check if intermediate documents already exist if init markers are optional,
  227 |        |     // or if we already know they exist (either from previous reads or our own writes in the same
  228 |        |     // single-shard operation).
  229 |        |
  230 |  21.1M |     if (optional_init_markers() || subdoc_exists_) {
  231 |  21.1M |       if (required_init_markers() && !IsObjectType(current_entry_.value_type)) {
  232 |        |         // REDIS
  233 |        |         // ~~~~~
  234 |        |         // We raise this error only if init markers are mandatory.
  235 |      0 |         return STATUS_FORMAT(IllegalState,
  236 |      0 |                              "Cannot set values inside a subdocument of type $0",
  237 |      0 |                              current_entry_.value_type);
  238 |      0 |       }
  239 |  21.1M |       if (optional_init_markers()) {
  240 |        |         // CASSANDRA
  241 |        |         // ~~~~~~~~~
  242 |        |         // In the case where init markers are optional, we don't need to check existence of
  243 |        |         // the current subdocument. Although if we have a user timestamp specified, we need to
  244 |        |         // check whether the provided user timestamp is higher than what is already present. If
  245 |        |         // an intermediate subdocument is found with a higher timestamp, we consider it as an
  246 |        |         // overwrite and skip the entire write.
  247 |  21.0M |         auto should_apply = SetPrimitiveInternalHandleUserTimestamp(value, iter);
  248 |  21.0M |         RETURN_NOT_OK(should_apply);
  249 |  21.0M |         if (!should_apply.get()) {
  250 |      4 |           return Status::OK();
  251 |      4 |         }
  252 |        |
  253 |  21.0M |         RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_));
  254 |  66.0k |       } else if (subkey_index == num_subkeys - 1 && !is_deletion) {
  255 |        |         // REDIS
  256 |        |         // ~~~~~
  257 |        |         // We don't need to perform a RocksDB read at the last level for upserts, we just overwrite
  258 |        |         // the value within the last subdocument with what we're trying to write. We still perform
  259 |        |         // the read for deletions, because we try to avoid writing a new tombstone if the data is
  260 |        |         // not there anyway.
  261 |  45.1k |         if (!subdoc_exists_) {
  262 |      0 |           return STATUS(IllegalState, "Subdocument is supposed to exist.");
  263 |      0 |         }
  264 |  45.1k |         if (!IsObjectType(current_entry_.value_type)) {
  265 |      0 |           return STATUS(IllegalState, "Expected object subdocument type.");
  266 |      0 |         }
  267 |  45.1k |         RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_));
  268 |  20.8k |       } else {
  269 |        |         // REDIS
  270 |        |         // ~~~~~
  271 |        |         // We need to check if the subdocument at this subkey exists.
  272 |  20.8k |         if (!subdoc_exists_) {
  273 |      0 |           return STATUS(IllegalState, "Subdocument is supposed to exist.");
  274 |      0 |         }
  275 |  20.8k |         if (!IsObjectType(current_entry_.value_type)) {
  276 |      0 |           return STATUS(IllegalState, "Expected object subdocument type.");
  277 |      0 |         }
  278 |  20.8k |         RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_));
  279 |  20.8k |         RETURN_NOT_OK(SeekToKeyPrefix(iter, true));
  280 |  20.8k |         if (is_deletion && !subdoc_exists_) {
  281 |        |           // A parent subdocument of the value we're trying to delete, or that value itself, does
  282 |        |           // not exist, nothing to do.
  283 |        |           //
  284 |        |           // TODO: in Redis's HDEL command we need to count the number of fields deleted, so we need
  285 |        |           // to count the deletes that are actually happening.
  286 |        |           // See http://redis.io/commands/hdel
  287 |      0 |           DOCDB_DEBUG_LOG("Subdocument does not exist at subkey level $0 (subkey: $1)",
  288 |      0 |                           subkey_index, subkey.ToString());
  289 |      0 |           return Status::OK();
  290 |      0 |         }
  291 |  52.8k |       }
  292 |  52.8k |     } else {
  293 |        |       // REDIS
  294 |        |       // ~~~~~
  295 |        |       // The subdocument at the current level does not exist.
  296 |  52.8k |       if (is_deletion) {
  297 |        |         // A parent subdocument of the subdocument we're trying to delete does not exist, nothing
  298 |        |         // to do.
  299 |      0 |         return Status::OK();
  300 |      0 |       }
  301 |        |
  302 |  52.8k |       DCHECK(!value.has_user_timestamp());
  303 |        |
  304 |        |       // Add the parent key to key/value batch before appending the encoded HybridTime to it.
  305 |        |       // (We replicate key/value pairs without the HybridTime and only add it before writing to
  306 |        |       // RocksDB.)
  307 |  52.8k |       put_batch_.emplace_back(key_prefix_.ToStringBuffer(), string(1, ValueTypeAsChar::kObject));
  308 |        |
  309 |        |       // Update our local cache to record the fact that we're adding this subdocument, so that
  310 |        |       // future operations in this DocWriteBatch don't have to add it or look for it in RocksDB.
  311 |  52.8k |       cache_.Put(key_prefix_, hybrid_time, ValueType::kObject);
  312 |  52.8k |       RETURN_NOT_OK(AppendToKeySafely(subkey, doc_path, &key_prefix_));
  313 |  52.8k |     }
  314 |  21.1M |   }
  315 |        |
  316 |        |   // We need to handle the user timestamp if present.
  317 |  21.3M |   auto should_apply = SetPrimitiveInternalHandleUserTimestamp(value, iter);
  318 |  21.3M |   RETURN_NOT_OK(should_apply);
  319 |  21.3M |   if (should_apply.get()) {
  320 |        |     // The key in the key/value batch does not have an encoded HybridTime.
  321 |  21.3M |     put_batch_.emplace_back(key_prefix_.ToStringBuffer(), value.Encode());
  322 |        |
  323 |        |     // The key we use in the DocWriteBatchCache does not have a final hybrid_time, because that's
  324 |        |     // the key we expect to look up.
  325 |  21.3M |     cache_.Put(key_prefix_, hybrid_time, value.primitive_value().value_type(),
  326 |  21.3M |                value.user_timestamp());
  327 |  21.3M |   }
  328 |        |
  329 |  21.3M |   return Status::OK();
  330 |  21.3M | }
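
The write_id mentioned in the comment at source lines 217-219 is what keeps same-batch writes ordered: every entry of a batch commits at one HybridTime, so the intra-batch sequence number serves as the tie-breaker. A small sketch of that ordering with a simplified stand-in for DocHybridTime:

    #include <cassert>
    #include <cstdint>
    #include <tuple>

    // Simplified stand-in for yb::DocHybridTime: commit time plus the position
    // of the entry within its write batch.
    struct MiniDocHybridTime {
      uint64_t hybrid_time;  // identical for all entries of one committed batch
      uint32_t write_id;     // 0, 1, 2, ... in batch order

      bool operator<(const MiniDocHybridTime& rhs) const {
        return std::tie(hybrid_time, write_id) <
               std::tie(rhs.hybrid_time, rhs.write_id);
      }
    };

    int main() {
      // Insert, delete, re-insert of one column in a single batch: all three
      // share hybrid_time, but write_id makes the re-insert the newest version.
      MiniDocHybridTime ins{100, 0}, del{100, 1}, reins{100, 2};
      assert(ins < del && del < reins);  // a version scan picks {100, 2}
    }
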
  331 |        |
  332 |        | Status DocWriteBatch::SetPrimitive(
  333 |        |     const DocPath& doc_path,
  334 |        |     const Value& value,
  335 |  21.3M |     LazyIterator* iter) {
  336 |  21.3M |   DOCDB_DEBUG_LOG("Called SetPrimitive with doc_path=$0, value=$1",
  337 |  21.3M |                   doc_path.ToString(), value.ToString());
  338 |  21.3M |   current_entry_.doc_hybrid_time = DocHybridTime::kMin;
  339 |  21.3M |   const auto num_subkeys = doc_path.num_subkeys();
  340 |  21.3M |   const bool is_deletion = value.primitive_value().value_type() == ValueType::kTombstone;
  341 |        |
  342 |  21.3M |   key_prefix_ = doc_path.encoded_doc_key();
  343 |        |
  344 |        |   // If we are overwriting an entire document with a primitive value (not deleting it), we don't
  345 |        |   // need to perform any reads from RocksDB at all.
  346 |        |   //
  347 |        |   // Even when we are deleting a document, if we don't need any feedback on whether the
  348 |        |   // deletion was performed or the document was not there to begin with, we could also skip the
  349 |        |   // read as an optimization.
  350 |  21.3M |   if (num_subkeys > 0 || is_deletion) {
  351 |  21.2M |     if (required_init_markers()) {
  352 |        |       // Navigate to the root of the document. We don't yet know whether the document exists or when
  353 |        |       // it was last updated.
  354 |  76.4k |       RETURN_NOT_OK(SeekToKeyPrefix(iter, false));
  355 |  76.4k |       DOCDB_DEBUG_LOG("Top-level document exists: $0", subdoc_exists_);
  356 |  76.4k |       if (!subdoc_exists_ && is_deletion) {
  357 |      0 |         DOCDB_DEBUG_LOG("We're performing a deletion, and the document is not present. "
  358 |      0 |                         "Nothing to do.");
  359 |      0 |         return Status::OK();
  360 |      0 |       }
  361 |  21.3M |     }
  362 |  21.2M |   }
  363 |  21.3M |   return SetPrimitiveInternal(doc_path, value, iter, is_deletion, num_subkeys);
  364 |  21.3M | }
  365 |        |
  366 |        | Status DocWriteBatch::SetPrimitive(const DocPath& doc_path,
  367 |        |                                    const Value& value,
  368 |        |                                    const ReadHybridTime& read_ht,
  369 |        |                                    CoarseTimePoint deadline,
  370 |  21.3M |                                    rocksdb::QueryId query_id) {
  371 |  21.3M |   DOCDB_DEBUG_LOG("Called with doc_path=$0, value=$1",
  372 |  21.3M |                   doc_path.ToString(), value.ToString());
  373 |        |
  374 |  21.3M |   std::function<std::unique_ptr<IntentAwareIterator>()> createrator =
  375 |  31.4k |     [doc_path, query_id, deadline, read_ht, this]() {
  376 |  31.4k |       return yb::docdb::CreateIntentAwareIterator(
  377 |  31.4k |           doc_db_,
  378 |  31.4k |           BloomFilterMode::USE_BLOOM_FILTER,
  379 |  31.4k |           doc_path.encoded_doc_key().AsSlice(),
  380 |  31.4k |           query_id,
  381 |  31.4k |           TransactionOperationContext(),
  382 |  31.4k |           deadline,
  383 |  31.4k |           read_ht);
  384 |  31.4k |     };
  385 |        |
  386 |  21.3M |   LazyIterator iter(&createrator);
  387 |        |
  388 |  21.3M |   return SetPrimitive(doc_path, value, &iter);
  389 |  21.3M | }
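
The createrator lambda above is only invoked when a read is actually needed: with 21.3M SetPrimitive calls but just 31.4k lambda executions, almost every call is satisfied by the DocWriteBatchCache and never constructs an iterator. A generic sketch of this lazy-construction idea (illustrative, not the real LazyIterator):

    #include <functional>
    #include <memory>

    template <typename Iter>
    class LazyHolder {
     public:
      explicit LazyHolder(std::function<std::unique_ptr<Iter>()> factory)
          : factory_(std::move(factory)) {}

      // The expensive factory runs at most once, and only if Get() is reached.
      Iter* Get() {
        if (!iter_) {
          iter_ = factory_();
        }
        return iter_.get();
      }

     private:
      std::function<std::unique_ptr<Iter>()> factory_;
      std::unique_ptr<Iter> iter_;
    };
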
  390 |        |
  391 |        | Status DocWriteBatch::ExtendSubDocument(
  392 |        |     const DocPath& doc_path,
  393 |        |     const SubDocument& value,
  394 |        |     const ReadHybridTime& read_ht,
  395 |        |     const CoarseTimePoint deadline,
  396 |        |     rocksdb::QueryId query_id,
  397 |        |     MonoDelta ttl,
  398 |  15.3M |     UserTimeMicros user_timestamp) {
  399 |  15.3M |   if (IsObjectType(value.value_type())) {
  400 |  52.2k |     const auto& map = value.object_container();
  401 |   108k |     for (const auto& ent : map) {
  402 |   108k |       DocPath child_doc_path = doc_path;
  403 |   108k |       if (ent.first.value_type() != ValueType::kArray)
  404 |   108k |           child_doc_path.AddSubKey(ent.first);
  405 |   108k |       RETURN_NOT_OK(ExtendSubDocument(child_doc_path, ent.second,
  406 |   108k |                                       read_ht, deadline, query_id, ttl, user_timestamp));
  407 |   108k |     }
  408 |  15.3M |   } else if (value.value_type() == ValueType::kArray) {
  409 |     64 |     RETURN_NOT_OK(ExtendList(
  410 |     64 |         doc_path, value, read_ht, deadline, query_id, ttl, user_timestamp));
  411 |  15.3M |   } else {
  412 |  15.3M |     if (!value.IsTombstoneOrPrimitive()) {
  413 |      0 |       return STATUS_FORMAT(
  414 |      0 |           InvalidArgument,
  415 |      0 |           "Found unexpected value type $0. Expecting a PrimitiveType or a Tombstone",
  416 |      0 |           value.value_type());
  417 |      0 |     }
  418 |  15.3M |     RETURN_NOT_OK(SetPrimitive(doc_path, Value(value, ttl, user_timestamp),
  419 |  15.3M |                                read_ht, deadline, query_id));
  420 |  15.3M |   }
  421 |  15.3M |   UpdateMaxValueTtl(ttl);
  422 |  15.3M |   return Status::OK();
  423 |  15.3M | }
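
ExtendSubDocument is a straightforward recursive descent: objects fan out into one child DocPath per entry, arrays are delegated to ExtendList, and primitives (or tombstones) bottom out in SetPrimitive. A runnable toy version that flattens a nested object into its leaf writes (simplified stand-in types, hypothetical Flatten helper):

    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    // Simplified stand-in for SubDocument: either an object (non-empty
    // children) or a primitive leaf.
    struct MiniSubDoc {
      std::vector<std::pair<std::string, MiniSubDoc>> children;
      std::string primitive;
    };

    // Toy analogue of ExtendSubDocument's recursion: each object entry extends
    // the path; each leaf becomes one SetPrimitive-style write.
    void Flatten(const std::string& path, const MiniSubDoc& doc) {
      if (!doc.children.empty()) {
        for (const auto& [key, child] : doc.children) {
          Flatten(path + "." + key, child);  // child_doc_path.AddSubKey(key)
        }
      } else {
        std::cout << "SetPrimitive(" << path << " = " << doc.primitive << ")\n";
      }
    }

    int main() {
      MiniSubDoc doc;
      MiniSubDoc a;
      a.children.push_back({"b", MiniSubDoc{{}, "1"}});
      a.children.push_back({"c", MiniSubDoc{{}, "2"}});
      doc.children.push_back({"a", std::move(a)});
      Flatten("row", doc);  // prints row.a.b = 1 and row.a.c = 2
    }
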
  424 |        |
  425 |        | Status DocWriteBatch::InsertSubDocument(
  426 |        |     const DocPath& doc_path,
  427 |        |     const SubDocument& value,
  428 |        |     const ReadHybridTime& read_ht,
  429 |        |     const CoarseTimePoint deadline,
  430 |        |     rocksdb::QueryId query_id,
  431 |        |     MonoDelta ttl,
  432 |        |     UserTimeMicros user_timestamp,
  433 |  15.2M |     bool init_marker_ttl) {
  434 |  15.2M |   if (!value.IsTombstoneOrPrimitive()) {
  435 |  10.7k |     auto key_ttl = init_marker_ttl ? ttl : Value::kMaxTtl;
  436 |  10.8k |     RETURN_NOT_OK(SetPrimitive(
  437 |  10.8k |         doc_path, Value(PrimitiveValue(value.value_type()), key_ttl, user_timestamp),
  438 |  10.8k |         read_ht, deadline, query_id));
  439 |  10.8k |   }
  440 |  15.2M |   return ExtendSubDocument(doc_path, value, read_ht, deadline, query_id, ttl, user_timestamp);
  441 |  15.2M | }
  442 |        |
  443 |        | Status DocWriteBatch::ExtendList(
  444 |        |     const DocPath& doc_path,
  445 |        |     const SubDocument& value,
  446 |        |     const ReadHybridTime& read_ht,
  447 |        |     const CoarseTimePoint deadline,
  448 |        |     rocksdb::QueryId query_id,
  449 |        |     MonoDelta ttl,
  450 |     78 |     UserTimeMicros user_timestamp) {
  451 |     78 |   if (monotonic_counter_ == nullptr) {
  452 |      0 |     return STATUS(IllegalState, "List cannot be extended if monotonic_counter_ is uninitialized");
  453 |      0 |   }
  454 |     78 |   if (value.value_type() != ValueType::kArray) {
  455 |      0 |     return STATUS_FORMAT(
  456 |      0 |         InvalidArgument,
  457 |      0 |         "Expecting Subdocument of type kArray, found $0",
  458 |      0 |         value.value_type());
  459 |      0 |   }
  460 |     78 |   const std::vector<SubDocument>& list = value.array_container();
  461 |        |   // It is assumed that there is an exclusive lock on the list key.
  462 |        |   // The lock ensures that there isn't another thread picking ArrayIndexes for the same list.
  463 |        |   // No additional lock is required.
  464 |     78 |   int64_t index =
  465 |     78 |       std::atomic_fetch_add(monotonic_counter_, static_cast<int64_t>(list.size()));
  466 |        |   // PREPEND - adding in reverse order with negated index
  467 |     78 |   if (value.GetExtendOrder() == ListExtendOrder::PREPEND_BLOCK) {
  468 |     10 |     for (size_t i = list.size(); i > 0; i--) {
  469 |      5 |       DocPath child_doc_path = doc_path;
  470 |      5 |       index++;
  471 |      5 |       child_doc_path.AddSubKey(PrimitiveValue::ArrayIndex(-index));
  472 |      5 |       RETURN_NOT_OK(ExtendSubDocument(child_doc_path, list[i - 1],
  473 |      5 |                                       read_ht, deadline, query_id, ttl, user_timestamp));
  474 |      5 |     }
  475 |     73 |   } else {
  476 |    201 |     for (size_t i = 0; i < list.size(); i++) {
  477 |    128 |       DocPath child_doc_path = doc_path;
  478 |    128 |       index++;
  479 |    128 |       child_doc_path.AddSubKey(PrimitiveValue::ArrayIndex(
  480 |    128 |           value.GetExtendOrder() == ListExtendOrder::APPEND ? index : -index));
  481 |    128 |       RETURN_NOT_OK(ExtendSubDocument(child_doc_path, list[i],
  482 |    128 |                                       read_ht, deadline, query_id, ttl, user_timestamp));
  483 |    128 |     }
  484 |     73 |   }
  485 |     78 |   return Status::OK();
  486 |     78 | }
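
A worked example of the index arithmetic above: one atomic fetch-add reserves a block of array indices, APPEND consumes them in order, and PREPEND_BLOCK walks the input in reverse and negates each index so that earlier prepends sort in front of later ones (and all prepends in front of index 0). The numbers below are hypothetical:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
      int64_t counter = 100;  // stands in for *monotonic_counter_
      std::vector<char> items = {'x', 'y', 'z'};

      // One fetch-add claims indices 101..103 for this call.
      int64_t index = counter;
      counter += static_cast<int64_t>(items.size());

      // APPEND: 'x' -> 101, 'y' -> 102, 'z' -> 103.
      for (size_t i = 0; i < items.size(); i++) {
        std::cout << items[i] << " -> " << ++index << "\n";
      }

      // PREPEND_BLOCK: reverse order, negated: 'z' -> -101, 'y' -> -102,
      // 'x' -> -103. 'x' gets the smallest key, so the block reads x, y, z
      // ahead of everything previously in the list.
      index = 100;
      for (size_t i = items.size(); i > 0; i--) {
        index++;
        std::cout << items[i - 1] << " -> " << -index << "\n";
      }
    }
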
  487 |        |
  488 |        | Status DocWriteBatch::ReplaceRedisInList(
  489 |        |     const DocPath &doc_path,
  490 |        |     const std::vector<int64_t>& indices,
  491 |        |     const std::vector<SubDocument>& values,
  492 |        |     const ReadHybridTime& read_ht,
  493 |        |     const CoarseTimePoint deadline,
  494 |        |     const rocksdb::QueryId query_id,
  495 |        |     const Direction dir,
  496 |        |     const int64_t start_index,
  497 |        |     std::vector<string>* results,
  498 |        |     MonoDelta default_ttl,
  499 |      0 |     MonoDelta write_ttl) {
  500 |      0 |   SubDocKey sub_doc_key;
  501 |      0 |   RETURN_NOT_OK(sub_doc_key.FromDocPath(doc_path));
  502 |      0 |   key_prefix_ = sub_doc_key.Encode();
  503 |        |
  504 |      0 |   auto iter = yb::docdb::CreateIntentAwareIterator(
  505 |      0 |       doc_db_,
  506 |      0 |       BloomFilterMode::USE_BLOOM_FILTER,
  507 |      0 |       key_prefix_.AsSlice(),
  508 |      0 |       query_id,
  509 |      0 |       TransactionOperationContext(),
  510 |      0 |       deadline,
  511 |      0 |       read_ht);
  512 |        |
  513 |      0 |   Slice value_slice;
  514 |      0 |   SubDocKey found_key;
  515 |      0 |   auto current_index = start_index;
  516 |      0 |   size_t replace_index = 0;
  517 |        |
  518 |      0 |   if (dir == Direction::kForward) {
  519 |        |     // Ensure we seek directly to indices and skip init marker if it exists.
  520 |      0 |     key_prefix_.AppendValueType(ValueType::kArrayIndex);
  521 |      0 |     RETURN_NOT_OK(SeekToKeyPrefix(iter.get(), false));
  522 |      0 |   } else {
  523 |        |     // We would like to seek past the entire list and go backwards.
  524 |      0 |     key_prefix_.AppendValueType(ValueType::kMaxByte);
  525 |      0 |     iter->PrevSubDocKey(key_prefix_);
  526 |      0 |     key_prefix_.RemoveValueTypeSuffix(ValueType::kMaxByte);
  527 |      0 |     key_prefix_.AppendValueType(ValueType::kArrayIndex);
  528 |      0 |   }
  529 |        |
  530 |      0 |   FetchKeyResult key_data;
  531 |      0 |   while (true) {
  532 |      0 |     if (indices[replace_index] <= 0 || !iter->valid() ||
  533 |      0 |         !(key_data = VERIFY_RESULT(iter->FetchKey())).key.starts_with(key_prefix_)) {
  534 |      0 |       return STATUS_SUBSTITUTE(Corruption,
  535 |      0 |           "Index Error: $0, reached beginning of list with size $1",
  536 |      0 |           indices[replace_index] - 1, // YQL layer list index starts from 0, not 1 as in DocDB.
  537 |      0 |           current_index);
  538 |      0 |     }
  539 |        |
  540 |      0 |     RETURN_NOT_OK(found_key.FullyDecodeFrom(key_data.key, HybridTimeRequired::kFalse));
  541 |        |
  542 |      0 |     MonoDelta entry_ttl;
  543 |      0 |     ValueType value_type;
  544 |      0 |     value_slice = iter->value();
  545 |      0 |     RETURN_NOT_OK(Value::DecodePrimitiveValueType(value_slice, &value_type, nullptr, &entry_ttl));
  546 |        |
  547 |      0 |     if (value_type == ValueType::kTombstone) {
  548 |      0 |       found_key.KeepPrefix(sub_doc_key.num_subkeys() + 1);
  549 |      0 |       if (dir == Direction::kForward) {
  550 |      0 |         iter->SeekPastSubKey(key_data.key);
  551 |      0 |       } else {
  552 |      0 |         iter->PrevSubDocKey(KeyBytes(key_data.key));
  553 |      0 |       }
  554 |      0 |       continue;
  555 |      0 |     }
  556 |        |
  557 |        |     // TODO (rahul): it may be cleaner to put this in the read path.
  558 |        |     // The code below is meant specifically for POP functionality in Redis lists.
  559 |      0 |     if (results) {
  560 |      0 |       Value v;
  561 |      0 |       RETURN_NOT_OK(v.Decode(iter->value()));
  562 |      0 |       results->push_back(v.primitive_value().GetString());
  563 |      0 |     }
  564 |        |
  565 |      0 |     if (dir == Direction::kForward)
  566 |      0 |       current_index++;
  567 |      0 |     else
  568 |      0 |       current_index--;
  569 |        |
  570 |        |     // Should we verify that the subkeys are indeed numbers as list indices should be?
  571 |        |     // Or just go in order for the index'th largest key in any subdocument?
  572 |      0 |     if (current_index == indices[replace_index]) {
  573 |        |       // When inserting, key_prefix_ is modified.
  574 |      0 |       KeyBytes array_index_prefix(key_prefix_);
  575 |      0 |       DocPath child_doc_path = doc_path;
  576 |      0 |       child_doc_path.AddSubKey(found_key.subkeys()[sub_doc_key.num_subkeys()]);
  577 |      0 |       RETURN_NOT_OK(InsertSubDocument(child_doc_path, values[replace_index],
  578 |      0 |                                       read_ht, deadline, query_id, write_ttl));
  579 |      0 |       replace_index++;
  580 |      0 |       if (replace_index == indices.size()) {
  581 |      0 |         return Status::OK();
  582 |      0 |       }
  583 |      0 |       key_prefix_ = array_index_prefix;
  584 |      0 |     }
  585 |        |
  586 |      0 |     if (dir == Direction::kForward) {
  587 |      0 |       iter->SeekPastSubKey(key_data.key);
  588 |      0 |     } else {
  589 |      0 |       iter->PrevSubDocKey(KeyBytes(key_data.key));
  590 |      0 |     }
  591 |      0 |   }
  592 |      0 | }
  593 |        |
  594 |  15.3M | void DocWriteBatch::UpdateMaxValueTtl(const MonoDelta& ttl) {
  595 |        |   // Don't update the max value TTL if the value is uninitialized or if it is set to
  596 |        |   // kMaxTtl (i.e. use table TTL).
  597 |  15.3M |   if (!ttl.Initialized() || ttl.Equals(Value::kMaxTtl)) {
  598 |  15.3M |     return;
  599 |  15.3M |   }
  600 |  2.71k |   if (!ttl_.Initialized() || ttl > ttl_) {
  601 |     92 |     ttl_ = ttl;
  602 |     92 |   }
  603 |  2.71k | }
  604 |        |
  605 |        | Status DocWriteBatch::ReplaceCqlInList(
  606 |        |     const DocPath& doc_path,
  607 |        |     const int target_cql_index,
  608 |        |     const SubDocument& value,
  609 |        |     const ReadHybridTime& read_ht,
  610 |        |     const CoarseTimePoint deadline,
  611 |        |     const rocksdb::QueryId query_id,
  612 |        |     MonoDelta default_ttl,
  613 |     15 |     MonoDelta write_ttl) {
  614 |     15 |   SubDocKey sub_doc_key;
  615 |     15 |   RETURN_NOT_OK(sub_doc_key.FromDocPath(doc_path));
  616 |     15 |   key_prefix_ = sub_doc_key.Encode();
  617 |        |
  618 |     15 |   auto iter = yb::docdb::CreateIntentAwareIterator(
  619 |     15 |       doc_db_,
  620 |     15 |       BloomFilterMode::USE_BLOOM_FILTER,
  621 |     15 |       key_prefix_.AsSlice(),
  622 |     15 |       query_id,
  623 |     15 |       TransactionOperationContext(),
  624 |     15 |       deadline,
  625 |     15 |       read_ht);
  626 |        |
  627 |     15 |   RETURN_NOT_OK(SeekToKeyPrefix(iter.get(), false));
  628 |        |
  629 |     15 |   if (!iter->valid()) {
  630 |      2 |     return STATUS(QLError, "Unable to replace items in empty list.");
  631 |      2 |   }
  632 |        |
  633 |     13 |   auto current_key = VERIFY_RESULT(iter->FetchKey());
  634 |        |   // Note that the only case we should have a collection without an init marker is if the collection
  635 |        |   // was created with upsert semantics. e.g.:
  636 |        |   // UPDATE foo SET v = v + [1, 2] WHERE k = 1
  637 |        |   // If the value v at row k = 1 did not exist before, then it will be written without an init
  638 |        |   // marker. In this case, using DocHybridTime::kMin is valid, as it has the effect of treating each
  639 |        |   // collection item found in DocDB as if there were no higher-level overwrite or invalidation of
  640 |        |   // it.
  641 |     13 |   auto current_key_is_init_marker = current_key.key.compare(key_prefix_) == 0;
  642 |     13 |   auto collection_write_time = current_key_is_init_marker
  643 |     13 |       ? current_key.write_time : DocHybridTime::kMin;
  644 |        |
  645 |     13 |   Slice value_slice;
  646 |     13 |   SubDocKey found_key;
  647 |     13 |   int current_cql_index = 0;
  648 |        |
  649 |        |   // Seek past init marker if it exists.
  650 |     13 |   key_prefix_.AppendValueType(ValueType::kArrayIndex);
  651 |     13 |   RETURN_NOT_OK(SeekToKeyPrefix(iter.get(), false));
  652 |        |
  653 |     13 |   FetchKeyResult key_data;
  654 |     27 |   while (true) {
  655 |     27 |     if (target_cql_index < 0 || !iter->valid() ||
  656 |     26 |         !(key_data = VERIFY_RESULT(iter->FetchKey())).key.starts_with(key_prefix_)) {
  657 |      3 |       return STATUS_SUBSTITUTE(
  658 |      3 |           QLError,
  659 |      3 |           "Unable to replace items into list, expecting index $0, reached end of list with size $1",
  660 |      3 |           target_cql_index,
  661 |      3 |           current_cql_index);
  662 |      3 |     }
  663 |        |
  664 |     24 |     RETURN_NOT_OK(found_key.FullyDecodeFrom(key_data.key, HybridTimeRequired::kFalse));
  665 |        |
  666 |     24 |     MonoDelta entry_ttl;
  667 |     24 |     ValueType value_type;
  668 |     24 |     value_slice = iter->value();
  669 |     24 |     RETURN_NOT_OK(Value::DecodePrimitiveValueType(value_slice, &value_type, nullptr, &entry_ttl));
  670 |        |
  671 |     24 |     bool has_expired = false;
  672 |     24 |     if (value_type == ValueType::kTombstone || key_data.write_time < collection_write_time) {
  673 |      2 |       has_expired = true;
  674 |     22 |     } else {
  675 |     22 |       entry_ttl = ComputeTTL(entry_ttl, default_ttl);
  676 |     22 |       has_expired = HasExpiredTTL(key_data.write_time.hybrid_time(), entry_ttl, read_ht.read);
  677 |     22 |     }
  678 |        |
  679 |     24 |     if (has_expired) {
  680 |      3 |       found_key.KeepPrefix(sub_doc_key.num_subkeys() + 1);
  681 |      3 |       iter->SeekPastSubKey(key_data.key);
  682 |      3 |       continue;
  683 |      3 |     }
  684 |        |
  685 |        |     // Should we verify that the subkeys are indeed numbers as list indices should be?
  686 |        |     // Or just go in order for the index'th largest key in any subdocument?
  687 |     21 |     if (current_cql_index == target_cql_index) {
  688 |        |       // When inserting, key_prefix_ is modified.
  689 |     10 |       KeyBytes array_index_prefix(key_prefix_);
  690 |     10 |       DocPath child_doc_path = doc_path;
  691 |     10 |       child_doc_path.AddSubKey(found_key.subkeys()[sub_doc_key.num_subkeys()]);
  692 |     10 |       RETURN_NOT_OK(
  693 |     10 |           InsertSubDocument(child_doc_path, value, read_ht, deadline, query_id, write_ttl));
  694 |     10 |       return Status::OK();
  695 |     11 |     }
  696 |        |
  697 |     11 |     current_cql_index++;
  698 |     11 |     iter->SeekPastSubKey(key_data.key);
  699 |     11 |   }
  700 |     13 | }
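
The expiry test above folds three conditions into has_expired; the write-time comparison is the subtle one, since an element written before the collection's init marker belongs to an overwritten incarnation of the list and must be skipped even though its bytes are still in RocksDB. A condensed restatement (plain integers stand in for DocHybridTime; TTL evaluation is abstracted away):

    #include <cstdint>

    // An element is visible only if it is not a tombstone, was written at or
    // after the collection's init marker (0 plays the role of
    // DocHybridTime::kMin when there is no marker), and its TTL has not lapsed.
    bool ElementVisible(bool is_tombstone,
                        uint64_t element_write_time,
                        uint64_t collection_write_time,
                        bool ttl_expired) {
      if (is_tombstone) return false;
      if (element_write_time < collection_write_time) return false;  // overwritten
      return !ttl_expired;
    }
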
  701 |        |
  702 |      0 | void DocWriteBatch::Clear() {
  703 |      0 |   put_batch_.clear();
  704 |      0 |   cache_.Clear();
  705 |      0 | }
  706 |        |
  707 |  1.64M | void DocWriteBatch::MoveToWriteBatchPB(KeyValueWriteBatchPB *kv_pb) {
  708 |  1.64M |   kv_pb->mutable_write_pairs()->Reserve(narrow_cast<int>(put_batch_.size()));
  709 |  21.3M |   for (auto& entry : put_batch_) {
  710 |  21.3M |     KeyValuePairPB* kv_pair = kv_pb->add_write_pairs();
  711 |  21.3M |     kv_pair->mutable_key()->swap(entry.first);
  712 |  21.3M |     kv_pair->mutable_value()->swap(entry.second);
  713 |  21.3M |   }
  714 |  1.64M |   if (has_ttl()) {
  715 |     92 |     kv_pb->set_ttl(ttl_ns());
  716 |     92 |   }
  717 |  1.64M | }
  718 |        |
  719 |      0 | void DocWriteBatch::TEST_CopyToWriteBatchPB(KeyValueWriteBatchPB *kv_pb) const {
  720 |      0 |   kv_pb->mutable_write_pairs()->Reserve(narrow_cast<int>(put_batch_.size()));
  721 |      0 |   for (auto& entry : put_batch_) {
  722 |      0 |     KeyValuePairPB* kv_pair = kv_pb->add_write_pairs();
  723 |      0 |     kv_pair->mutable_key()->assign(entry.first);
  724 |      0 |     kv_pair->mutable_value()->assign(entry.second);
  725 |      0 |   }
  726 |      0 |   if (has_ttl()) {
  727 |      0 |     kv_pb->set_ttl(ttl_ns());
  728 |      0 |   }
  729 |      0 | }
  730 |        |
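
MoveToWriteBatchPB and TEST_CopyToWriteBatchPB differ only in swap versus assign: the production path donates each key/value buffer to the protobuf in O(1) and leaves put_batch_ gutted, while the test variant copies so the batch stays inspectable. The same pattern on plain strings (hypothetical destination type, not the YB protobufs):

    #include <string>
    #include <utility>
    #include <vector>

    using KVPair = std::pair<std::string, std::string>;

    // Move-style export, as in MoveToWriteBatchPB: constant-time buffer swaps,
    // source entries are emptied.
    void MoveOut(std::vector<KVPair>* batch, std::vector<KVPair>* out) {
      out->reserve(out->size() + batch->size());
      for (auto& entry : *batch) {
        out->emplace_back();
        out->back().first.swap(entry.first);
        out->back().second.swap(entry.second);
      }
    }

    // Copy-style export, as in TEST_CopyToWriteBatchPB: the batch is preserved.
    void CopyOut(const std::vector<KVPair>& batch, std::vector<KVPair>* out) {
      out->reserve(out->size() + batch.size());
      for (const auto& entry : batch) {
        out->push_back(entry);
      }
    }
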
  731 |        | // ------------------------------------------------------------------------------------------------
  732 |        | // Converting a RocksDB write batch to a string.
  733 |        | // ------------------------------------------------------------------------------------------------
  734 |        |
  735 |        | class DocWriteBatchFormatter : public WriteBatchFormatter {
  736 |        |  public:
  737 |        |   DocWriteBatchFormatter(
  738 |        |       StorageDbType storage_db_type,
  739 |        |       BinaryOutputFormat binary_output_format,
  740 |        |       WriteBatchOutputFormat batch_output_format,
  741 |        |       std::string line_prefix)
  742 |        |       : WriteBatchFormatter(binary_output_format, batch_output_format, std::move(line_prefix)),
  743 |      0 |         storage_db_type_(storage_db_type) {}
  744 |        |  protected:
  745 |      0 |   std::string FormatKey(const Slice& key) override {
  746 |      0 |     const auto key_result = DocDBKeyToDebugStr(key, storage_db_type_);
  747 |      0 |     if (key_result.ok()) {
  748 |      0 |       return *key_result;
  749 |      0 |     }
  750 |      0 |     return Format(
  751 |      0 |         "$0 (error: $1)",
  752 |      0 |         WriteBatchFormatter::FormatKey(key),
  753 |      0 |         key_result.status());
  754 |      0 |   }
  755 |        |
  756 |      0 |   std::string FormatValue(const Slice& key, const Slice& value) override {
  757 |      0 |     auto key_type = GetKeyType(key, storage_db_type_);
  758 |      0 |     const auto value_result = DocDBValueToDebugStr(key_type, key, value);
  759 |      0 |     if (value_result.ok()) {
  760 |      0 |       return *value_result;
  761 |      0 |     }
  762 |      0 |     return Format(
  763 |      0 |         "$0 (error: $1)",
  764 |      0 |         WriteBatchFormatter::FormatValue(key, value),
  765 |      0 |         value_result.status());
  766 |      0 |   }
  767 |        |
  768 |        |  private:
  769 |        |   StorageDbType storage_db_type_;
  770 |        | };
  771 |        |
  772 |        | Result<std::string> WriteBatchToString(
  773 |        |     const rocksdb::WriteBatch& write_batch,
  774 |        |     StorageDbType storage_db_type,
  775 |        |     BinaryOutputFormat binary_output_format,
  776 |        |     WriteBatchOutputFormat batch_output_format,
  777 |      0 |     const std::string& line_prefix) {
  778 |      0 |   DocWriteBatchFormatter formatter(
  779 |      0 |       storage_db_type, binary_output_format, batch_output_format, line_prefix);
  780 |      0 |   RETURN_NOT_OK(write_batch.Iterate(&formatter));
  781 |      0 |   return formatter.str();
  782 |      0 | }
  783 |        |
  784 |        | }  // namespace docdb
  785 |        | }  // namespace yb
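
DocWriteBatchFormatter's FormatKey/FormatValue follow a decode-with-fallback pattern: try the DocDB-aware decoder first, and on failure fall back to the base class's raw formatting with the decode error appended, so one corrupt pair cannot sink the whole dump. A self-contained sketch of that pattern (simplified Result stand-in):

    #include <iostream>
    #include <string>

    // Simplified stand-in for Result<std::string>.
    struct DecodeResult {
      bool ok;
      std::string value_or_error;  // decoded text on success, error text on failure
    };

    std::string FormatWithFallback(const std::string& raw_formatted,
                                   const DecodeResult& decoded) {
      if (decoded.ok) {
        return decoded.value_or_error;
      }
      // Same shape as Format("$0 (error: $1)", ...) above.
      return raw_formatted + " (error: " + decoded.value_or_error + ")";
    }

    int main() {
      std::cout << FormatWithFallback(
                       "\\x47abc", {true, "SubDocKey(DocKey([], [\"a\"]), [HT(38)])"})
                << "\n";
      std::cout << FormatWithFallback(
                       "\\xff", {false, "Corruption: invalid value type"})
                << "\n";
    }
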