YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/docdb.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/docdb.h"
15
16
#include <algorithm>
17
#include <limits>
18
#include <memory>
19
#include <stack>
20
#include <string>
21
#include <unordered_map>
22
#include <vector>
23
24
#include "yb/common/hybrid_time.h"
25
#include "yb/common/row_mark.h"
26
#include "yb/common/transaction.h"
27
28
#include "yb/docdb/conflict_resolution.h"
29
#include "yb/docdb/cql_operation.h"
30
#include "yb/docdb/docdb-internal.h"
31
#include "yb/docdb/docdb.pb.h"
32
#include "yb/docdb/docdb_debug.h"
33
#include "yb/docdb/doc_kv_util.h"
34
#include "yb/docdb/docdb_rocksdb_util.h"
35
#include "yb/docdb/docdb_types.h"
36
#include "yb/docdb/intent.h"
37
#include "yb/docdb/intent_aware_iterator.h"
38
#include "yb/docdb/pgsql_operation.h"
39
#include "yb/docdb/rocksdb_writer.h"
40
#include "yb/docdb/subdocument.h"
41
#include "yb/docdb/value.h"
42
#include "yb/docdb/value_type.h"
43
44
#include "yb/gutil/casts.h"
45
#include "yb/gutil/strings/substitute.h"
46
47
#include "yb/rocksutil/write_batch_formatter.h"
48
49
#include "yb/server/hybrid_clock.h"
50
51
#include "yb/util/bitmap.h"
52
#include "yb/util/bytes_formatter.h"
53
#include "yb/util/enums.h"
54
#include "yb/util/fast_varint.h"
55
#include "yb/util/flag_tags.h"
56
#include "yb/util/logging.h"
57
#include "yb/util/metrics.h"
58
#include "yb/util/pb_util.h"
59
#include "yb/util/status.h"
60
#include "yb/util/status_format.h"
61
#include "yb/util/status_log.h"
62
63
#include "yb/yql/cql/ql/util/errcodes.h"
64
65
using std::endl;
66
using std::list;
67
using std::string;
68
using std::stringstream;
69
using std::unique_ptr;
70
using std::shared_ptr;
71
using std::stack;
72
using std::vector;
73
using std::make_shared;
74
75
using yb::HybridTime;
76
using yb::FormatBytesAsStr;
77
using strings::Substitute;
78
79
using namespace std::placeholders;
80
81
DEFINE_int32(cdc_max_stream_intent_records, 1000,
82
             "Max number of intent records allowed in single cdc batch. ");
83
84
namespace yb {
85
namespace docdb {
86
87
namespace {
88
89
// key should be valid prefix of doc key, ending with some complete pritimive value or group end.
90
CHECKED_STATUS ApplyIntent(RefCntPrefix key,
91
                           const IntentTypeSet intent_types,
92
20.6M
                           LockBatchEntries *keys_locked) {
93
  // Have to strip kGroupEnd from end of key, because when only hash key is specified, we will
94
  // get two kGroupEnd at end of strong intent.
95
20.6M
  size_t size = key.size();
96
20.6M
  if (size > 0) {
97
18.3M
    if (key.data()[0] == ValueTypeAsChar::kGroupEnd) {
98
1.43M
      if (size != 1) {
99
0
        return STATUS_FORMAT(Corruption, "Key starting with group end: $0",
100
0
            key.as_slice().ToDebugHexString());
101
0
      }
102
1.43M
      size = 0;
103
16.9M
    } else {
104
31.0M
      while (key.data()[size - 1] == ValueTypeAsChar::kGroupEnd) {
105
14.1M
        --size;
106
14.1M
      }
107
16.9M
    }
108
18.3M
  }
109
20.6M
  key.Resize(size);
110
20.6M
  keys_locked->push_back({key, intent_types});
111
20.6M
  return Status::OK();
112
20.6M
}
113
114
struct DetermineKeysToLockResult {
115
  LockBatchEntries lock_batch;
116
  bool need_read_snapshot;
117
118
1
  std::string ToString() const {
119
1
    return YB_STRUCT_TO_STRING(lock_batch, need_read_snapshot);
120
1
  }
121
};
122
123
// Collects the lock entries needed by the given write operations and read pairs.
//
// For every doc path reported by a write operation this adds weak entries for the prefixes
// returned by SubDocKey::DecodePrefixLengths (except the last one, which is the path itself), a
// weak entry for the empty key when the table is transactional and the path is non-empty, and a
// strong entry for the path itself. For read_pairs, entries are produced through EnumerateIntents
// with kStrongRead / kWeakRead intent types.
//
// need_read_snapshot in the result is set when any write operation requires a read snapshot.
Result<DetermineKeysToLockResult> DetermineKeysToLock(
    const std::vector<std::unique_ptr<DocOperation>>& doc_write_ops,
    const google::protobuf::RepeatedPtrField<KeyValuePairPB>& read_pairs,
    const IsolationLevel isolation_level,
    const OperationKind operation_kind,
    const RowMarkType row_mark_type,
    bool transactional_table,
    PartialRangeKeyIntents partial_range_key_intents) {
  DetermineKeysToLockResult result;
  result.need_read_snapshot = false;

  // Scratch buffers reused across operations / paths.
  boost::container::small_vector<RefCntPrefix, 8> paths;
  boost::container::small_vector<size_t, 32> prefix_lengths;

  for (const auto& op : doc_write_ops) {
    paths.clear();
    IsolationLevel op_level;
    RETURN_NOT_OK(op->GetDocPaths(GetDocPathsMode::kLock, &paths, &op_level));
    // An explicitly requested transactional isolation level overrides the one reported by the op.
    if (isolation_level != IsolationLevel::NON_TRANSACTIONAL) {
      op_level = isolation_level;
    }

    IntentTypeSet strong_types = GetStrongIntentTypeSet(op_level, operation_kind, row_mark_type);
    const bool serializable_write =
        isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION &&
        operation_kind == OperationKind::kWrite;
    if (serializable_write && op->RequireReadSnapshot()) {
      strong_types = IntentTypeSet({IntentType::kStrongRead, IntentType::kStrongWrite});
    }

    for (const auto& path : paths) {
      prefix_lengths.clear();
      RETURN_NOT_OK(SubDocKey::DecodePrefixLengths(path.as_slice(), &prefix_lengths));
      // The whole path is expected to be reported as the last prefix, so an empty list is an
      // error.
      if (prefix_lengths.empty()) {
        return STATUS_FORMAT(Corruption, "Unable to decode key prefixes from: $0",
                             path.as_slice().ToDebugHexString());
      }
      // The whole path gets a strong lock below, so drop it from the list of weak locks.
      prefix_lengths.pop_back();

      auto weak_key = path;
      // Weak lock on the empty key for transactional tables, unless the path itself is empty.
      if (transactional_table && path.size() > 0) {
        weak_key.Resize(0);
        RETURN_NOT_OK(ApplyIntent(
            weak_key, StrongToWeak(strong_types), &result.lock_batch));
      }
      for (const size_t length : prefix_lengths) {
        weak_key.Resize(length);
        RETURN_NOT_OK(ApplyIntent(
            weak_key, StrongToWeak(strong_types), &result.lock_batch));
      }

      RETURN_NOT_OK(ApplyIntent(path, strong_types, &result.lock_batch));
    }

    if (op->RequireReadSnapshot()) {
      result.need_read_snapshot = true;
    }
  }

  if (read_pairs.empty()) {
    return result;
  }

  RETURN_NOT_OK(EnumerateIntents(
      read_pairs,
      [&result](IntentStrength strength, FullDocKey, Slice, KeyBytes* key, LastKey) {
        const IntentType read_type = strength == IntentStrength::kStrong
            ? IntentType::kStrongRead
            : IntentType::kWeakRead;
        RefCntPrefix prefix(key->AsSlice());
        return ApplyIntent(prefix, IntentTypeSet({read_type}), &result.lock_batch);
      }, partial_range_key_intents));

  return result;
}
196
197
// Collapse keys_locked into a unique set of keys with intent_types representing the union of
198
// intent_types originally present. In other words, suppose keys_locked is originally the following:
199
// [
200
//   (k1, {kWeakRead, kWeakWrite}),
201
//   (k1, {kStrongRead}),
202
//   (k2, {kWeakRead}),
203
//   (k3, {kStrongRead}),
204
//   (k2, {kStrongWrite}),
205
// ]
206
// Then after calling FilterKeysToLock we will have:
207
// [
208
//   (k1, {kWeakRead, kWeakWrite, kStrongRead}),
209
//   (k2, {kWeakRead}),
210
//   (k3, {kStrongRead, kStrongWrite}),
211
// ]
212
// Note that only keys which appear in order in keys_locked will be collapsed in this manner.
213
1.73M
void FilterKeysToLock(LockBatchEntries *keys_locked) {
214
1.73M
  if (keys_locked->empty()) {
215
0
    return;
216
0
  }
217
218
1.73M
  std::sort(keys_locked->begin(), keys_locked->end(),
219
145M
            [](const auto& lhs, const auto& rhs) {
220
145M
              return lhs.key < rhs.key;
221
145M
            });
222
223
1.73M
  auto w = keys_locked->begin();
224
20.6M
  for (auto it = keys_locked->begin(); ++it != keys_locked->end();) {
225
18.9M
    if (it->key == w->key) {
226
5.99M
      w->intent_types |= it->intent_types;
227
12.9M
    } else {
228
12.9M
      ++w;
229
12.9M
      *w = *it;
230
12.9M
    }
231
18.9M
  }
232
233
1.73M
  ++w;
234
1.73M
  keys_locked->erase(w, keys_locked->end());
235
1.73M
}
236
237
}  // namespace
238
239
// Determines the keys required by the given write operations and read pairs, collapses duplicate
// keys, and builds a LockBatch for them using lock_manager and deadline.
//
// Returns the lock batch together with the need_read_snapshot flag computed by
// DetermineKeysToLock. Fails with Corruption when no keys were produced, and with the lock batch
// status (prefixed with a timeout note) when the lock batch reports an error. When
// write_lock_latency is non-null, the time spent building the lock batch is recorded in it in
// microseconds.
Result<PrepareDocWriteOperationResult> PrepareDocWriteOperation(
    const std::vector<std::unique_ptr<DocOperation>>& doc_write_ops,
    const google::protobuf::RepeatedPtrField<KeyValuePairPB>& read_pairs,
    const scoped_refptr<Histogram>& write_lock_latency,
    const IsolationLevel isolation_level,
    const OperationKind operation_kind,
    const RowMarkType row_mark_type,
    bool transactional_table,
    CoarseTimePoint deadline,
    PartialRangeKeyIntents partial_range_key_intents,
    SharedLockManager *lock_manager) {
  PrepareDocWriteOperationResult result;

  auto keys_to_lock = VERIFY_RESULT(DetermineKeysToLock(
      doc_write_ops, read_pairs, isolation_level, operation_kind, row_mark_type,
      transactional_table, partial_range_key_intents));
  VLOG_WITH_FUNC(4) << "determine_keys_to_lock_result=" << keys_to_lock.ToString();
  if (keys_to_lock.lock_batch.empty()) {
    LOG(ERROR) << "Empty lock batch, doc_write_ops: " << yb::ToString(doc_write_ops)
               << ", read pairs: " << yb::ToString(read_pairs);
    return STATUS(Corruption, "Empty lock batch");
  }
  result.need_read_snapshot = keys_to_lock.need_read_snapshot;

  FilterKeysToLock(&keys_to_lock.lock_batch);
  VLOG_WITH_FUNC(4) << "filtered determine_keys_to_lock_result="
                    << keys_to_lock.ToString();

  // The clock is only read when there is a histogram to report into.
  // NOTE(review): without a histogram start_time is default-constructed, so the "Timeout" value
  // in the error message below is presumably not meaningful in that case -- confirm.
  const bool track_latency = write_lock_latency != nullptr;
  const MonoTime start_time = track_latency ? MonoTime::Now() : MonoTime();
  result.lock_batch = LockBatch(
      lock_manager, std::move(keys_to_lock.lock_batch), deadline);
  RETURN_NOT_OK_PREPEND(
      result.lock_batch.status(), Format("Timeout: $0", deadline - ToCoarse(start_time)));
  if (track_latency) {
    const MonoDelta elapsed_time = MonoTime::Now().GetDeltaSince(start_time);
    write_lock_latency->Increment(elapsed_time.ToMicroseconds());
  }

  return result;
}
278
279
30
// Stores a query-level error in the response object of a QL or PGSQL write operation.
//
// QL write operations get status YQL_STATUS_QUERY_ERROR, PGSQL write operations get
// PGSQL_STATUS_USAGE_ERROR; in both cases err_msg becomes the response error message.
// Returns InternalError for any other operation type.
Status SetDocOpQLErrorResponse(DocOperation* doc_op, string err_msg) {
  switch (doc_op->OpType()) {
    case DocOperation::Type::QL_WRITE_OPERATION: {
      const auto &resp = down_cast<QLWriteOperation *>(doc_op)->response();
      resp->set_status(QLResponsePB::YQL_STATUS_QUERY_ERROR);
      resp->set_error_message(err_msg);
      break;
    }
    case DocOperation::Type::PGSQL_WRITE_OPERATION: {
      const auto &resp = down_cast<PgsqlWriteOperation *>(doc_op)->response();
      resp->set_status(PgsqlResponsePB::PGSQL_STATUS_USAGE_ERROR);
      resp->set_error_message(err_msg);
      break;
    }
    default:
      // STATUS_FORMAT substitutes positional $N placeholders (as everywhere else in this file),
      // not printf-style specifiers.
      return STATUS_FORMAT(InternalError,
                           "Invalid status (QLError) for doc operation $0",
                           doc_op->OpType());
  }
  return Status::OK();
}
300
301
// Applies every operation in doc_write_ops to a fresh DocWriteBatch and moves the outcome into
// *write_batch.
//
// An operation failing with a QL error does not abort the batch: the error is stored in that
// operation's response and processing continues with the next operation. Any other failure is
// returned immediately. restart_read_ht must not be null.
Status AssembleDocWriteBatch(const vector<unique_ptr<DocOperation>>& doc_write_ops,
                                CoarseTimePoint deadline,
                                const ReadHybridTime& read_time,
                                const DocDB& doc_db,
                                KeyValueWriteBatchPB* write_batch,
                                InitMarkerBehavior init_marker_behavior,
                                std::atomic<int64_t>* monotonic_counter,
                                HybridTime* restart_read_ht,
                                const string& table_name) {
  DCHECK_ONLY_NOTNULL(restart_read_ht);
  DocWriteBatch doc_write_batch(doc_db, init_marker_behavior, monotonic_counter);
  DocOperationApplyData data = {&doc_write_batch, deadline, read_time, restart_read_ht};

  for (const auto& op : doc_write_ops) {
    Status s = op->Apply(data);
    if (!s.IsQLError()) {
      RETURN_NOT_OK(s);
      continue;
    }

    // 'table_name' is not available on the lower level (in Apply()), so the message for an
    // unsatisfied condition is generated here.
    const bool condition_failed =
        ql::GetErrorCode(s) == ql::ErrorCode::CONDITION_NOT_SATISFIED;
    string error_msg = condition_failed
        ? Format("Condition on table $0 was not satisfied.", table_name)
        : s.message().ToBuffer();

    // Make sure the response object carries the appropriate error for QL errors.
    RETURN_NOT_OK(SetDocOpQLErrorResponse(op.get(), error_msg));
  }

  doc_write_batch.MoveToWriteBatchPB(write_batch);
  return Status::OK();
}
335
336
namespace {
337
338
0
// Builds the Corruption status reported when an external intents value has fewer bytes left
// (`present`) than an encoded length demands (`required`). `full` is the complete value and is
// included in the message as a hex dump.
CHECKED_STATUS NotEnoughBytes(size_t present, size_t required, const Slice& full) {
  const auto full_hex = full.ToDebugHexString();
  return STATUS_FORMAT(
      Corruption, "Not enough bytes in external intents $0 while $1 expected, full: $2",
      present, required, full_hex);
}
343
344
// Decodes one external intents value and adds the key/value pairs it contains to regular_batch.
//
// The layout read here is: a kUuid byte, kUuidSize bytes holding a UUID, a kExternalIntents byte,
// and then a sequence of (varint key size, key bytes, varint value size, value bytes) entries
// terminated by a key size of zero. Every entry is written under its key followed by the doc
// hybrid time encoded from commit_ht and *write_id; *write_id is incremented after each entry.
//
// Returns Corruption (see NotEnoughBytes) when the value is shorter than its encoded lengths
// require, or the error of the failing decode step.
CHECKED_STATUS PrepareApplyExternalIntentsBatch(
    HybridTime commit_ht,
    const Slice& original_input_value,
    rocksdb::WriteBatch* regular_batch,
    IntraTxnWriteId* write_id) {
  auto input_value = original_input_value;
  DocHybridTimeBuffer doc_ht_buffer;
  RETURN_NOT_OK(input_value.consume_byte(ValueTypeAsChar::kUuid));
  // Make sure the UUID bytes are actually present before slicing them off, the same way key and
  // value lengths are validated below.
  if (input_value.size() < kUuidSize) {
    return NotEnoughBytes(input_value.size(), kUuidSize, original_input_value);
  }
  Uuid status_tablet;
  RETURN_NOT_OK(status_tablet.FromSlice(input_value.Prefix(kUuidSize)));
  input_value.remove_prefix(kUuidSize);
  RETURN_NOT_OK(input_value.consume_byte(ValueTypeAsChar::kExternalIntents));
  for (;;) {
    auto key_size = VERIFY_RESULT(util::FastDecodeUnsignedVarInt(&input_value));
    if (key_size == 0) {
      // A zero key size terminates the list of entries.
      break;
    }
    if (input_value.size() < key_size) {
      return NotEnoughBytes(input_value.size(), key_size, original_input_value);
    }
    auto output_key = input_value.Prefix(key_size);
    input_value.remove_prefix(key_size);
    auto value_size = VERIFY_RESULT(util::FastDecodeUnsignedVarInt(&input_value));
    if (input_value.size() < value_size) {
      return NotEnoughBytes(input_value.size(), value_size, original_input_value);
    }
    auto output_value = input_value.Prefix(value_size);
    input_value.remove_prefix(value_size);
    std::array<Slice, 2> key_parts = {{
        output_key,
        doc_ht_buffer.EncodeWithValueType(commit_ht, *write_id),
    }};
    std::array<Slice, 1> value_parts = {{
        output_value,
    }};
    regular_batch->Put(key_parts, value_parts);
    ++*write_id;
  }

  return Status::OK();
}
385
386
// Reads all stored external intents for provided transactions and prepares batches that will apply
387
// them into regular db and remove from intents db.
388
CHECKED_STATUS PrepareApplyExternalIntents(
389
    ExternalTxnApplyState* apply_external_transactions,
390
    rocksdb::WriteBatch* regular_batch,
391
    rocksdb::DB* intents_db,
392
3.42M
    rocksdb::WriteBatch* intents_batch) {
393
3.42M
  if (apply_external_transactions->empty()) {
394
3.42M
    return Status::OK();
395
3.42M
  }
396
397
153
  KeyBytes key_prefix;
398
153
  KeyBytes key_upperbound;
399
153
  Slice key_upperbound_slice;
400
401
153
  auto iter = CreateRocksDBIterator(
402
153
      intents_db, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER,
403
153
      /* user_key_for_filter= */ boost::none,
404
153
      rocksdb::kDefaultQueryId, /* read_filter= */ nullptr, &key_upperbound_slice);
405
406
0
  for (auto& apply : *apply_external_transactions) {
407
0
    key_prefix.Clear();
408
0
    key_prefix.AppendValueType(ValueType::kExternalTransactionId);
409
0
    key_prefix.AppendRawBytes(apply.first.AsSlice());
410
411
0
    key_upperbound = key_prefix;
412
0
    key_upperbound.AppendValueType(ValueType::kMaxByte);
413
0
    key_upperbound_slice = key_upperbound.AsSlice();
414
415
0
    IntraTxnWriteId& write_id = apply.second.write_id;
416
417
0
    iter.Seek(key_prefix);
418
0
    while (iter.Valid()) {
419
0
      const Slice input_key(iter.key());
420
421
0
      if (!input_key.starts_with(key_prefix.AsSlice())) {
422
0
        break;
423
0
      }
424
425
0
      if (regular_batch) {
426
0
        RETURN_NOT_OK(PrepareApplyExternalIntentsBatch(
427
0
            apply.second.commit_ht, iter.value(), regular_batch, &write_id));
428
0
      }
429
0
      if (intents_batch) {
430
0
        intents_batch->SingleDelete(input_key);
431
0
      }
432
433
0
      iter.Next();
434
0
    }
435
0
  }
436
437
153
  return Status::OK();
438
153
}
439
440
3.42M
// Builds the apply state for the external transactions listed in
// put_batch.apply_external_transactions(): one entry per transaction id, carrying the commit
// hybrid time from the batch. A transaction id that cannot be decoded is a fatal error
// (CHECK_RESULT).
ExternalTxnApplyState ProcessApplyExternalTransactions(const KeyValueWriteBatchPB& put_batch) {
  ExternalTxnApplyState apply_states;
  for (const auto& entry : put_batch.apply_external_transactions()) {
    auto decoded_id = CHECK_RESULT(FullyDecodeTransactionId(entry.transaction_id()));
    auto commit_time = HybridTime(entry.commit_hybrid_time());
    apply_states.emplace(
        decoded_id,
        ExternalTxnApplyStateData{
          .commit_ht = commit_time
        });
  }

  return apply_states;
}
454
455
} // namespace
456
457
bool AddExternalPairToWriteBatch(
458
    const KeyValuePairPB& kv_pair,
459
    HybridTime hybrid_time,
460
    int write_id,
461
    ExternalTxnApplyState* apply_external_transactions,
462
    rocksdb::WriteBatch* regular_write_batch,
463
39.5M
    rocksdb::WriteBatch* intents_write_batch) {
464
39.5M
  DocHybridTimeBuffer doc_ht_buffer;
465
39.5M
  DocHybridTimeWordBuffer inverted_doc_ht_buffer;
466
467
39.5M
  CHECK(!kv_pair.key().empty());
468
39.5M
  CHECK(!kv_pair.value().empty());
469
470
39.5M
  if (kv_pair.key()[0] != ValueTypeAsChar::kExternalTransactionId) {
471
39.5M
    return true;
472
39.5M
  }
473
474
  // We replicate encoded SubDocKeys without a HybridTime at the end, and only append it here.
475
  // The reason for this is that the HybridTime timestamp is only picked at the time of
476
  // appending  an entry to the tablet's Raft log. Also this is a good way to save network
477
  // bandwidth.
478
  //
479
  // "Write id" is the final component of our HybridTime encoding (or, to be more precise,
480
  // DocHybridTime encoding) that helps disambiguate between different updates to the
481
  // same key (row/column) within a transaction. We set it based on the position of the write
482
  // operation in its write batch.
483
484
18.4E
  hybrid_time = kv_pair.has_external_hybrid_time() ?
485
18.4E
      HybridTime(kv_pair.external_hybrid_time()) : hybrid_time;
486
18.4E
  std::array<Slice, 2> key_parts = {{
487
18.4E
      Slice(kv_pair.key()),
488
18.4E
      doc_ht_buffer.EncodeWithValueType(hybrid_time, write_id),
489
18.4E
  }};
490
18.4E
  Slice key_value = kv_pair.value();
491
  // This entry contains external intents.
492
18.4E
  Slice key = kv_pair.key();
493
18.4E
  key.consume_byte();
494
18.4E
  auto txn_id = CHECK_RESULT(DecodeTransactionId(&key));
495
18.4E
  auto it = apply_external_transactions->find(txn_id);
496
18.4E
  if (it != apply_external_transactions->end()) {
497
    // The same write operation could contain external intents and instruct us to apply them.
498
0
    CHECK_OK(PrepareApplyExternalIntentsBatch(
499
0
        it->second.commit_ht, key_value, regular_write_batch, &it->second.write_id));
500
0
    return false;
501
0
  }
502
18.4E
  key_parts[1] = InvertEncodedDocHT(key_parts[1], &inverted_doc_ht_buffer);
503
18.4E
  constexpr size_t kNumValueParts = 1;
504
18.4E
  intents_write_batch->Put(key_parts, { &key_value, kNumValueParts });
505
506
18.4E
  return false;
507
18.4E
}
508
509
// Usually put_batch contains only records that should be applied to regular DB.
510
// So apply_external_transactions will be empty and regular_entry will be true.
511
//
512
// But in general case on consumer side of CDC put_batch could contain various kinds of records,
513
// that should be applied into regular and intents db.
514
// They are:
515
// apply_external_transactions
516
//   The list of external transactions that should be applied.
517
//   For each such transaction we should lookup for existing external intents (stored in intents DB)
518
//   and convert them to Put command in regular_write_batch plus SingleDelete command in
519
//   intents_write_batch.
520
// write_pairs
521
//   Could contain regular entries, that should be stored into regular DB as is.
522
//   Also pair could contain external intents, that should be stored into intents DB.
523
//   But if apply_external_transactions contains transaction for those external intents, then
524
//   those intents will be applied directly to regular DB, avoiding unnecessary write to intents DB.
525
//   This case is very common for short running transactions.
526
bool PrepareExternalWriteBatch(
527
    const KeyValueWriteBatchPB& put_batch,
528
    HybridTime hybrid_time,
529
    rocksdb::DB* intents_db,
530
    rocksdb::WriteBatch* regular_write_batch,
531
3.42M
    rocksdb::WriteBatch* intents_write_batch) {
532
3.42M
  CHECK(put_batch.read_pairs().empty());
533
534
3.42M
  auto apply_external_transactions = ProcessApplyExternalTransactions(put_batch);
535
536
3.42M
  CHECK_OK(PrepareApplyExternalIntents(
537
3.42M
      &apply_external_transactions, regular_write_batch, intents_db, intents_write_batch));
538
539
3.42M
  bool has_non_external_kvs = false;
540
43.0M
  for (int write_id = 0; write_id < put_batch.write_pairs_size(); ++write_id) {
541
39.5M
    has_non_external_kvs = AddExternalPairToWriteBatch(
542
39.5M
        put_batch.write_pairs(write_id), hybrid_time, write_id, &apply_external_transactions,
543
0
        regular_write_batch, intents_write_batch) || has_non_external_kvs;
544
39.5M
  }
545
3.42M
  return has_non_external_kvs;
546
3.42M
}
547
548
namespace {
549
550
// Checks if the given slice points to the part of an encoded SubDocKey past all of the subkeys
551
// (and definitely past all the hash/range keys). The only remaining part could be a hybrid time.
552
35.1M
inline bool IsEndOfSubKeys(const Slice& key) {
553
35.1M
  return key[0] == ValueTypeAsChar::kGroupEnd &&
554
28.7M
         (key.size() == 1 || key[1] == ValueTypeAsChar::kHybridTime);
555
35.1M
}
556
557
// Enumerates weak intent keys generated by considering specified prefixes of the given key and
558
// invoking the provided callback with each combination considered, stored in encoded_key_buffer.
559
// On return, *encoded_key_buffer contains the corresponding strong intent, for which the callback
560
// has not yet been called. It is left to the caller to use the final state of encoded_key_buffer.
561
//
562
// The prefixes of the key considered are as follows:
563
// 1. Up to and including the whole hash key.
564
// 2. Up to and including the whole range key, or if partial_range_key_intents is
565
//    PartialRangeKeyIntents::kTrue, then enumerate the prefix up to the end of each component of
566
//    the range key separately.
567
// 3. Up to and including each subkey component, separately.
568
//
569
// In any case, we stop short of enumerating the last intent key generated based on the above, as
570
// this represents the strong intent key and will be stored in encoded_key_buffer at the end of this
571
// call.
572
//
573
// The beginning of each intent key will also include any cotable_id or PgTableOid, if present.
574
Status EnumerateWeakIntents(
575
    Slice key,
576
    const EnumerateIntentsCallback& functor,
577
    KeyBytes* encoded_key_buffer,
578
28.7M
    PartialRangeKeyIntents partial_range_key_intents) {
579
28.7M
  static const Slice kEmptyIntentValue;
580
581
28.7M
  encoded_key_buffer->Clear();
582
28.7M
  if (key.empty()) {
583
0
    return STATUS(Corruption, "An empty slice is not a valid encoded SubDocKey");
584
0
  }
585
586
28.7M
  const bool has_cotable_id = *key.cdata() == ValueTypeAsChar::kTableId;
587
28.7M
  const bool has_pgtable_id = *key.cdata() == ValueTypeAsChar::kPgTableOid;
588
28.7M
  {
589
28.7M
    bool is_table_root_key = false;
590
28.7M
    if (has_cotable_id) {
591
3.78M
      const auto kMinExpectedSize = kUuidSize + 2;
592
3.78M
      if (key.size() < kMinExpectedSize) {
593
0
        return STATUS_FORMAT(
594
0
            Corruption,
595
0
            "Expected an encoded SubDocKey starting with a cotable id to be at least $0 bytes long",
596
0
            kMinExpectedSize);
597
0
      }
598
3.78M
      encoded_key_buffer->AppendRawBytes(key.cdata(), kUuidSize + 1);
599
3.78M
      is_table_root_key = key[kUuidSize + 1] == ValueTypeAsChar::kGroupEnd;
600
24.9M
    } else if (has_pgtable_id) {
601
457
      const auto kMinExpectedSize = sizeof(PgTableOid) + 2;
602
457
      if (key.size() < kMinExpectedSize) {
603
0
        return STATUS_FORMAT(
604
0
            Corruption,
605
0
            "Expected an encoded SubDocKey starting with a pgtable id to be at least $0 bytes long",
606
0
            kMinExpectedSize);
607
0
      }
608
457
      encoded_key_buffer->AppendRawBytes(key.cdata(), sizeof(PgTableOid) + 1);
609
457
      is_table_root_key = key[sizeof(PgTableOid) + 1] == ValueTypeAsChar::kGroupEnd;
610
24.9M
    } else {
611
24.9M
      is_table_root_key = *key.cdata() == ValueTypeAsChar::kGroupEnd;
612
24.9M
    }
613
614
28.7M
    encoded_key_buffer->AppendValueType(ValueType::kGroupEnd);
615
616
28.7M
    if (is_table_root_key) {
617
      // This must be a "table root" (or "tablet root") key (no hash components, no range
618
      // components, but the cotable might still be there). We are not really considering the case
619
      // of any subkeys under the empty key, so we can return here.
620
3.37k
      return Status::OK();
621
3.37k
    }
622
28.7M
  }
623
624
  // For any non-empty key we already know that the empty key intent is weak.
625
28.7M
  RETURN_NOT_OK(functor(
626
28.7M
      IntentStrength::kWeak, FullDocKey::kFalse, kEmptyIntentValue, encoded_key_buffer,
627
28.7M
      LastKey::kFalse));
628
629
28.7M
  auto hashed_part_size = VERIFY_RESULT(DocKey::EncodedSize(key, DocKeyPart::kUpToHash));
630
631
  // Remove kGroupEnd that we just added to generate a weak intent.
632
28.7M
  encoded_key_buffer->RemoveLastByte();
633
634
28.7M
  if (hashed_part_size != encoded_key_buffer->size()) {
635
    // A hash component is present. Note that if cotable id is present, hashed_part_size would
636
    // also include it, so we only need to append the new bytes.
637
21.5M
    encoded_key_buffer->AppendRawBytes(
638
21.5M
        key.cdata() + encoded_key_buffer->size(), hashed_part_size - encoded_key_buffer->size());
639
21.5M
    key.remove_prefix(hashed_part_size);
640
21.5M
    if (key.empty()) {
641
0
      return STATUS(Corruption, "Range key part missing, expected at least a kGroupEnd");
642
0
    }
643
644
    // Append the kGroupEnd at the end for the empty range part to make this a valid encoded DocKey.
645
21.5M
    encoded_key_buffer->AppendValueType(ValueType::kGroupEnd);
646
21.5M
    if (IsEndOfSubKeys(key)) {
647
      // This means the key ends at the hash component -- no range keys and no subkeys.
648
5.67M
      return Status::OK();
649
5.67M
    }
650
651
    // Generate a weak intent that only includes the hash component.
652
15.8M
    RETURN_NOT_OK(functor(
653
15.8M
        IntentStrength::kWeak, FullDocKey(key[0] == ValueTypeAsChar::kGroupEnd), kEmptyIntentValue,
654
15.8M
        encoded_key_buffer, LastKey::kFalse));
655
656
    // Remove the kGroupEnd we added a bit earlier so we can append some range components.
657
15.8M
    encoded_key_buffer->RemoveLastByte();
658
7.20M
  } else {
659
    // No hash component.
660
7.20M
    key.remove_prefix(hashed_part_size);
661
7.20M
  }
662
663
  // Range components.
664
23.0M
  auto range_key_start = key.cdata();
665
13.6M
  while (VERIFY_RESULT(ConsumePrimitiveValueFromKey(&key))) {
666
    // Append the consumed primitive value to encoded_key_buffer.
667
13.6M
    encoded_key_buffer->AppendRawBytes(range_key_start, key.cdata() - range_key_start);
668
    // We always need kGroupEnd at the end to make this a valid encoded DocKey.
669
13.6M
    encoded_key_buffer->AppendValueType(ValueType::kGroupEnd);
670
13.6M
    if (key.empty()) {
671
0
      return STATUS(Corruption, "Range key part is not terminated with a kGroupEnd");
672
0
    }
673
13.6M
    if (IsEndOfSubKeys(key)) {
674
      // This is the last range key and there are no subkeys.
675
997k
      return Status::OK();
676
997k
    }
677
12.6M
    FullDocKey full_doc_key(key[0] == ValueTypeAsChar::kGroupEnd);
678
12.6M
    if (partial_range_key_intents || full_doc_key) {
679
9.92M
      RETURN_NOT_OK(functor(
680
9.92M
          IntentStrength::kWeak, full_doc_key, kEmptyIntentValue, encoded_key_buffer,
681
9.92M
          LastKey::kFalse));
682
9.92M
    }
683
12.6M
    encoded_key_buffer->RemoveLastByte();
684
12.6M
    range_key_start = key.cdata();
685
12.6M
  }
686
687
  // We still need to append the kGroupEnd byte that closes the range portion to our buffer.
688
  // The corresponding kGroupEnd has already been consumed from the key slice by the last call to
689
  // ConsumePrimitiveValueFromKey, which returned false.
690
22.0M
  encoded_key_buffer->AppendValueType(ValueType::kGroupEnd);
691
692
  // Subkey components.
693
22.0M
  auto subkey_start = key.cdata();
694
22.0M
  while (VERIFY_RESULT(SubDocKey::DecodeSubkey(&key))) {
695
    // Append the consumed value to encoded_key_buffer.
696
22.0M
    encoded_key_buffer->AppendRawBytes(subkey_start, key.cdata() - subkey_start);
697
22.0M
    if (key.empty() || *key.cdata() == ValueTypeAsChar::kHybridTime) {
698
      // This was the last subkey.
699
22.0M
      return Status::OK();
700
22.0M
    }
701
12.6k
    RETURN_NOT_OK(functor(
702
12.6k
        IntentStrength::kWeak, FullDocKey::kTrue, kEmptyIntentValue, encoded_key_buffer,
703
12.6k
        LastKey::kFalse));
704
12.6k
    subkey_start = key.cdata();
705
12.6k
  }
706
707
8.24k
  return STATUS(
708
22.0M
      Corruption,
709
22.0M
      "Expected to reach the end of the key after decoding last valid subkey");
710
22.0M
}
711
712
}  // anonymous namespace
713
714
// Enumerates all intents for a single key: first the weak intents produced by
// EnumerateWeakIntents, then one strong intent for the key left in *encoded_key_buffer, reported
// as a full doc key with intent_value and last_key.
Status EnumerateIntents(
    Slice key, const Slice& intent_value, const EnumerateIntentsCallback& functor,
    KeyBytes* encoded_key_buffer, PartialRangeKeyIntents partial_range_key_intents,
    LastKey last_key) {
  Status weak_status = EnumerateWeakIntents(
      key, functor, encoded_key_buffer, partial_range_key_intents);
  RETURN_NOT_OK(weak_status);

  return functor(
      IntentStrength::kStrong, FullDocKey::kTrue, intent_value, encoded_key_buffer, last_key);
}
723
724
// Enumerates the intents of every pair in kv_pairs, in order, sharing one key buffer between
// them. LastKey is true only for the final pair. Every pair must have a non-empty key and a
// non-empty value. Stops at, and returns, the first error.
Status EnumerateIntents(
    const google::protobuf::RepeatedPtrField<KeyValuePairPB> &kv_pairs,
    const EnumerateIntentsCallback& functor, PartialRangeKeyIntents partial_range_key_intents) {
  KeyBytes encoded_key;

  const int num_pairs = kv_pairs.size();
  for (int index = 0; index != num_pairs; ++index) {
    const auto &kv_pair = kv_pairs.Get(index);
    CHECK(!kv_pair.key().empty());
    CHECK(!kv_pair.value().empty());
    const LastKey last_key(index + 1 == num_pairs);
    RETURN_NOT_OK(EnumerateIntents(
        kv_pair.key(), kv_pair.value(), functor, &encoded_key, partial_range_key_intents,
        last_key));
  }

  return Status::OK();
}
741
742
// ------------------------------------------------------------------------------------------------
743
// Standalone functions
744
// ------------------------------------------------------------------------------------------------
745
746
1.73M
void AppendTransactionKeyPrefix(const TransactionId& transaction_id, KeyBytes* out) {
747
1.73M
  out->AppendValueType(ValueType::kTransactionId);
748
1.73M
  out->AppendRawBytes(transaction_id.AsSlice());
749
1.73M
}
750
751
// Collects one batch of a transaction's strong write intents for CDC streaming.
//
// Iterates over the reverse index records of transaction_id in intents_db. For each record the
// referenced intent is looked up, and every strong write intent that is not a row lock is appended
// to key_value_intents as an IntentKeyValueForCDC (doc path, value body, reverse index key and
// write id).
//
// Parameters:
//   transaction_id    - transaction whose intents are read.
//   key_bounds        - optional (may be null) bounds; reverse index records whose value is not
//                       within bounds are skipped. Also used to bound the intent iterator.
//   stream_state      - optional (may be null) resume position. When it is active and has a
//                       non-zero write_id, iteration restarts at the record following
//                       stream_state->key and write id counting resumes from
//                       stream_state->write_id.
//   intents_db        - RocksDB instance that is read.
//   key_value_intents - output; matching intents are appended.
//
// Returns:
//   - a state holding the current reverse index key and write id when the batch limit
//     (FLAGS_cdc_max_stream_intent_records past the starting write id) is reached;
//   - a default-constructed state when all records were consumed, or when an intent referenced by
//     the reverse index cannot be found (a warning is logged in that case);
//   - an error status when skipping the bitmap prefix, parsing an intent key or decoding an intent
//     value fails.
Result<ApplyTransactionState> GetIntentsBatch(
    const TransactionId& transaction_id,
    const KeyBounds* key_bounds,
    const ApplyTransactionState* stream_state,
    rocksdb::DB* intents_db,
    std::vector<IntentKeyValueForCDC>* key_value_intents) {
  KeyBytes txn_reverse_index_prefix;
  Slice transaction_id_slice = transaction_id.AsSlice();
  AppendTransactionKeyPrefix(transaction_id, &txn_reverse_index_prefix);
  txn_reverse_index_prefix.AppendValueType(ValueType::kMaxByte);
  // key_prefix is the transaction prefix without the trailing kMaxByte; the full buffer
  // (including kMaxByte) serves as the iterator upper bound.
  Slice key_prefix = txn_reverse_index_prefix.AsSlice();
  key_prefix.remove_suffix(1);
  const Slice reverse_index_upperbound = txn_reverse_index_prefix.AsSlice();

  auto reverse_index_iter = CreateRocksDBIterator(
      intents_db, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
      rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound);

  BoundedRocksDbIterator intent_iter = CreateRocksDBIterator(
      intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
      rocksdb::kDefaultQueryId);

  reverse_index_iter.Seek(key_prefix);

  IntraTxnWriteId write_id = 0;
  if (stream_state != nullptr && stream_state->active() && stream_state->write_id != 0) {
    // Resume: position on the key stored in the state and step past it.
    reverse_index_iter.Seek(stream_state->key);
    write_id = stream_state->write_id;
    reverse_index_iter.Next();
  }
  const uint64_t max_records = FLAGS_cdc_max_stream_intent_records;
  const uint64_t write_id_limit = write_id + max_records;

  while (reverse_index_iter.Valid()) {
    const Slice key_slice(reverse_index_iter.key());

    if (!key_slice.starts_with(key_prefix)) {
      break;
    }
    // If the key ends at the transaction id then it is transaction metadata (status tablet,
    // isolation level etc.).
    if (key_slice.size() > txn_reverse_index_prefix.size()) {
      auto reverse_index_value = reverse_index_iter.value();
      if (!reverse_index_value.empty() && reverse_index_value[0] == ValueTypeAsChar::kBitSet) {
        reverse_index_value.remove_prefix(1);
        RETURN_NOT_OK(OneWayBitmap::Skip(&reverse_index_value));
      }
      // Value of reverse index is a key of original intent record, so seek it and check match.
      // NOTE(review): the bounds check uses the raw iterator value, i.e. including any bitmap
      // prefix skipped above - confirm this is intended.
      if ((!key_bounds || key_bounds->IsWithinBounds(reverse_index_iter.value()))) {
        // return when we have reached the batch limit.
        if (write_id >= write_id_limit) {
          return ApplyTransactionState{
              .key = key_slice.ToBuffer(),
              .write_id = write_id,
          };
        }
        {
          intent_iter.Seek(reverse_index_value);
          if (!intent_iter.Valid() || intent_iter.key() != reverse_index_value) {
            LOG(WARNING) << "Unable to find intent: " << reverse_index_value.ToDebugHexString()
                        << " for " << key_slice.ToDebugHexString();
            return ApplyTransactionState{};
          }

          auto intent = VERIFY_RESULT(ParseIntentKey(intent_iter.key(), transaction_id_slice));

          if (intent.types.Test(IntentType::kStrongWrite)) {
            auto decoded_value =
                VERIFY_RESULT(DecodeIntentValue(intent_iter.value(), &transaction_id_slice));
            write_id = decoded_value.write_id;

            if (decoded_value.body.starts_with(ValueTypeAsChar::kRowLock)) {
              // Row locks are not streamed. The iterator must be advanced before `continue`,
              // otherwise the Next() at the bottom of the loop is skipped and the same record
              // is re-read forever.
              reverse_index_iter.Next();
              continue;
            }

            std::array<Slice, 1> key_parts = {{
                intent.doc_path,
            }};
            std::array<Slice, 1> value_parts = {{
                decoded_value.body,
            }};

            IntentKeyValueForCDC intent_metadata;
            intent_metadata.key = Slice(key_parts, &(intent_metadata.key_buf));
            intent_metadata.value = Slice(value_parts, &(intent_metadata.value_buf));
            intent_metadata.reverse_index_key = key_slice.ToBuffer();
            intent_metadata.write_id = write_id;
            // NOTE(review): intent_metadata is copied into the vector; presumably key/value
            // still refer to the buffers of the local object - verify against the definition of
            // IntentKeyValueForCDC.
            (*key_value_intents).push_back(intent_metadata);

            VLOG(4) << "The size of intentKeyValues in GetIntentList "
                      << (*key_value_intents).size();
            ++write_id;
          }
        }
      }
    }
    reverse_index_iter.Next();
  }

  return ApplyTransactionState{};
}
853
854
0
std::string ApplyTransactionState::ToString() const {
855
0
  return Format(
856
0
      "{ key: $0 write_id: $1 aborted: $2 }", Slice(key).ToDebugString(), write_id, aborted);
857
0
}
858
859
void CombineExternalIntents(
860
    const TransactionId& txn_id,
861
0
    ExternalIntentsProvider* provider) {
862
  // External intents are stored in the following format:
863
  // key: kExternalTransactionId, txn_id
864
  // value: size(intent1_key), intent1_key, size(intent1_value), intent1_value, size(intent2_key)...
865
  // where size is encoded as varint.
866
867
0
  docdb::KeyBytes buffer;
868
0
  buffer.AppendValueType(ValueType::kExternalTransactionId);
869
0
  buffer.AppendRawBytes(txn_id.AsSlice());
870
0
  provider->SetKey(buffer.AsSlice());
871
0
  buffer.Clear();
872
0
  buffer.AppendValueType(ValueType::kUuid);
873
0
  buffer.AppendRawBytes(provider->InvolvedTablet().AsSlice());
874
0
  buffer.AppendValueType(ValueType::kExternalIntents);
875
0
  while (auto key_value = provider->Next()) {
876
0
    buffer.AppendUInt64AsVarInt(key_value->first.size());
877
0
    buffer.AppendRawBytes(key_value->first);
878
0
    buffer.AppendUInt64AsVarInt(key_value->second.size());
879
0
    buffer.AppendRawBytes(key_value->second);
880
0
  }
881
0
  buffer.AppendUInt64AsVarInt(0);
882
0
  provider->SetValue(buffer.AsSlice());
883
0
}
884
885
}  // namespace docdb
886
}  // namespace yb