YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/docdb.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/docdb.h"
15
16
#include <algorithm>
17
#include <limits>
18
#include <memory>
19
#include <stack>
20
#include <string>
21
#include <unordered_map>
22
#include <vector>
23
24
#include "yb/common/hybrid_time.h"
25
#include "yb/common/row_mark.h"
26
#include "yb/common/transaction.h"
27
28
#include "yb/docdb/conflict_resolution.h"
29
#include "yb/docdb/cql_operation.h"
30
#include "yb/docdb/docdb-internal.h"
31
#include "yb/docdb/docdb.pb.h"
32
#include "yb/docdb/docdb_debug.h"
33
#include "yb/docdb/doc_kv_util.h"
34
#include "yb/docdb/docdb_rocksdb_util.h"
35
#include "yb/docdb/docdb_types.h"
36
#include "yb/docdb/intent.h"
37
#include "yb/docdb/intent_aware_iterator.h"
38
#include "yb/docdb/pgsql_operation.h"
39
#include "yb/docdb/rocksdb_writer.h"
40
#include "yb/docdb/subdocument.h"
41
#include "yb/docdb/value.h"
42
#include "yb/docdb/value_type.h"
43
44
#include "yb/gutil/casts.h"
45
#include "yb/gutil/strings/substitute.h"
46
47
#include "yb/rocksutil/write_batch_formatter.h"
48
49
#include "yb/server/hybrid_clock.h"
50
51
#include "yb/util/bitmap.h"
52
#include "yb/util/bytes_formatter.h"
53
#include "yb/util/enums.h"
54
#include "yb/util/fast_varint.h"
55
#include "yb/util/flag_tags.h"
56
#include "yb/util/logging.h"
57
#include "yb/util/metrics.h"
58
#include "yb/util/pb_util.h"
59
#include "yb/util/status.h"
60
#include "yb/util/status_format.h"
61
#include "yb/util/status_log.h"
62
63
#include "yb/yql/cql/ql/util/errcodes.h"
64
65
using std::endl;
66
using std::list;
67
using std::string;
68
using std::stringstream;
69
using std::unique_ptr;
70
using std::shared_ptr;
71
using std::stack;
72
using std::vector;
73
using std::make_shared;
74
75
using yb::HybridTime;
76
using yb::FormatBytesAsStr;
77
using strings::Substitute;
78
79
using namespace std::placeholders;
80
81
// Upper bound on the number of intent records placed into a single CDC batch (default 1000).
DEFINE_int32(cdc_max_stream_intent_records,
             1000,
             "Max number of intent records allowed in single cdc batch. ");
83
84
namespace yb {
85
namespace docdb {
86
87
namespace {
88
89
// key should be valid prefix of doc key, ending with some complete pritimive value or group end.
90
CHECKED_STATUS ApplyIntent(RefCntPrefix key,
91
                           const IntentTypeSet intent_types,
92
66.2M
                           LockBatchEntries *keys_locked) {
93
  // Have to strip kGroupEnd from end of key, because when only hash key is specified, we will
94
  // get two kGroupEnd at end of strong intent.
95
66.2M
  size_t size = key.size();
96
66.2M
  if (size > 0) {
97
58.8M
    if (key.data()[0] == ValueTypeAsChar::kGroupEnd) {
98
2.86M
      if (size != 1) {
99
0
        return STATUS_FORMAT(Corruption, "Key starting with group end: $0",
100
0
            key.as_slice().ToDebugHexString());
101
0
      }
102
2.86M
      size = 0;
103
55.9M
    } else {
104
89.1M
      while (key.data()[size - 1] == ValueTypeAsChar::kGroupEnd) {
105
33.1M
        --size;
106
33.1M
      }
107
55.9M
    }
108
58.8M
  }
109
66.2M
  key.Resize(size);
110
66.2M
  keys_locked->push_back({key, intent_types});
111
66.2M
  return Status::OK();
112
66.2M
}
113
114
struct DetermineKeysToLockResult {
115
  LockBatchEntries lock_batch;
116
  bool need_read_snapshot;
117
118
0
  std::string ToString() const {
119
0
    return YB_STRUCT_TO_STRING(lock_batch, need_read_snapshot);
120
0
  }
121
};
122
123
// Computes the lock requests needed to execute `doc_write_ops` and to read `read_pairs`.
//
// For every doc path an operation reports (GetDocPathsMode::kLock) this requests:
//   * a weak lock on the empty key, for transactional tables and only if the path is non-empty;
//   * a weak lock on each proper prefix of the path, as decoded by SubDocKey::DecodePrefixLengths;
//   * a strong lock on the entire path.
// Strong intent types come from GetStrongIntentTypeSet(), using `isolation_level` unless it is
// NON_TRANSACTIONAL (then the level reported by the operation is used). For SERIALIZABLE writes
// whose operation requires a read snapshot, the strong set is {kStrongRead, kStrongWrite}. Weak
// intent types are derived from the strong ones with StrongToWeak().
// Every read pair contributes kStrongRead / kWeakRead intents through EnumerateIntents.
// `need_read_snapshot` is set if any operation requires a read snapshot.
//
// Returns Corruption if the prefix lengths of a doc path cannot be decoded.
Result<DetermineKeysToLockResult> DetermineKeysToLock(
    const std::vector<std::unique_ptr<DocOperation>>& doc_write_ops,
    const google::protobuf::RepeatedPtrField<KeyValuePairPB>& read_pairs,
    const IsolationLevel isolation_level,
    const OperationKind operation_kind,
    const RowMarkType row_mark_type,
    bool transactional_table,
    PartialRangeKeyIntents partial_range_key_intents) {
  DetermineKeysToLockResult result;
  boost::container::small_vector<RefCntPrefix, 8> doc_paths;
  boost::container::small_vector<size_t, 32> key_prefix_lengths;
  result.need_read_snapshot = false;
  for (const unique_ptr<DocOperation>& doc_op : doc_write_ops) {
    doc_paths.clear();
    // Initialized defensively in case a GetDocPaths implementation leaves it untouched.
    IsolationLevel level = IsolationLevel::NON_TRANSACTIONAL;
    RETURN_NOT_OK(doc_op->GetDocPaths(GetDocPathsMode::kLock, &doc_paths, &level));
    if (isolation_level != IsolationLevel::NON_TRANSACTIONAL) {
      level = isolation_level;
    }
    IntentTypeSet strong_intent_types = GetStrongIntentTypeSet(level, operation_kind,
                                                               row_mark_type);
    if (isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION &&
        operation_kind == OperationKind::kWrite &&
        doc_op->RequireReadSnapshot()) {
      strong_intent_types = IntentTypeSet({IntentType::kStrongRead, IntentType::kStrongWrite});
    }
    // The weak counterpart is the same for every prefix of every path of this operation.
    const auto weak_intent_types = StrongToWeak(strong_intent_types);

    for (const auto& doc_path : doc_paths) {
      key_prefix_lengths.clear();
      RETURN_NOT_OK(SubDocKey::DecodePrefixLengths(doc_path.as_slice(), &key_prefix_lengths));
      // At least the entire doc_path should be returned, so an empty result is an error.
      if (key_prefix_lengths.empty()) {
        return STATUS_FORMAT(Corruption, "Unable to decode key prefixes from: $0",
                             doc_path.as_slice().ToDebugHexString());
      }
      // The entire doc_path gets a strong lock, so drop it from the list of weak locks.
      key_prefix_lengths.pop_back();
      auto partial_key = doc_path;
      // Weak lock on the empty key for transactional tables, unless the key is already empty.
      if (doc_path.size() > 0 && transactional_table) {
        partial_key.Resize(0);
        RETURN_NOT_OK(ApplyIntent(partial_key, weak_intent_types, &result.lock_batch));
      }
      for (size_t prefix_length : key_prefix_lengths) {
        partial_key.Resize(prefix_length);
        RETURN_NOT_OK(ApplyIntent(partial_key, weak_intent_types, &result.lock_batch));
      }

      RETURN_NOT_OK(ApplyIntent(doc_path, strong_intent_types, &result.lock_batch));
    }

    if (doc_op->RequireReadSnapshot()) {
      result.need_read_snapshot = true;
    }
  }

  if (!read_pairs.empty()) {
    RETURN_NOT_OK(EnumerateIntents(
        read_pairs,
        [&result](IntentStrength strength, FullDocKey, Slice value, KeyBytes* key, LastKey) {
          RefCntPrefix prefix(key->AsSlice());
          auto intent_types = strength == IntentStrength::kStrong
              ? IntentTypeSet({IntentType::kStrongRead})
              : IntentTypeSet({IntentType::kWeakRead});
          return ApplyIntent(prefix, intent_types, &result.lock_batch);
        }, partial_range_key_intents));
  }

  return result;
}
196
197
// Collapse keys_locked into a unique set of keys with intent_types representing the union of
198
// intent_types originally present. In other words, suppose keys_locked is originally the following:
199
// [
200
//   (k1, {kWeakRead, kWeakWrite}),
201
//   (k1, {kStrongRead}),
202
//   (k2, {kWeakRead}),
203
//   (k3, {kStrongRead}),
204
//   (k2, {kStrongWrite}),
205
// ]
206
// Then after calling FilterKeysToLock we will have:
207
// [
208
//   (k1, {kWeakRead, kWeakWrite, kStrongRead}),
209
//   (k2, {kWeakRead}),
210
//   (k3, {kStrongRead, kStrongWrite}),
211
// ]
212
// Note that only keys which appear in order in keys_locked will be collapsed in this manner.
213
3.24M
void FilterKeysToLock(LockBatchEntries *keys_locked) {
214
3.24M
  if (keys_locked->empty()) {
215
0
    return;
216
0
  }
217
218
3.24M
  std::sort(keys_locked->begin(), keys_locked->end(),
219
560M
            [](const auto& lhs, const auto& rhs) {
220
560M
              return lhs.key < rhs.key;
221
560M
            });
222
223
3.24M
  auto w = keys_locked->begin();
224
66.2M
  for (auto it = keys_locked->begin(); ++it != keys_locked->end();) {
225
62.9M
    if (it->key == w->key) {
226
22.0M
      w->intent_types |= it->intent_types;
227
40.9M
    } else {
228
40.9M
      ++w;
229
40.9M
      *w = *it;
230
40.9M
    }
231
62.9M
  }
232
233
3.24M
  ++w;
234
3.24M
  keys_locked->erase(w, keys_locked->end());
235
3.24M
}
236
237
}  // namespace
238
239
// Determines the keys to lock for `doc_write_ops` and `read_pairs`, merges duplicate lock
// requests, and acquires the locks from `lock_manager`, waiting no later than `deadline`.
// When `write_lock_latency` is set, the time spent acquiring the locks is recorded in it, in
// microseconds.
//
// Returns one of:
//   * Corruption if no lock entries were produced;
//   * the lock batch's failure status, prefixed with the wait budget (deadline minus start
//     time), if the locks could not be acquired.
Result<PrepareDocWriteOperationResult> PrepareDocWriteOperation(
    const std::vector<std::unique_ptr<DocOperation>>& doc_write_ops,
    const google::protobuf::RepeatedPtrField<KeyValuePairPB>& read_pairs,
    const scoped_refptr<Histogram>& write_lock_latency,
    const IsolationLevel isolation_level,
    const OperationKind operation_kind,
    const RowMarkType row_mark_type,
    bool transactional_table,
    CoarseTimePoint deadline,
    PartialRangeKeyIntents partial_range_key_intents,
    SharedLockManager *lock_manager) {
  PrepareDocWriteOperationResult result;

  auto determine_keys_to_lock_result = VERIFY_RESULT(DetermineKeysToLock(
      doc_write_ops, read_pairs, isolation_level, operation_kind, row_mark_type,
      transactional_table, partial_range_key_intents));
  VLOG_WITH_FUNC(4) << "determine_keys_to_lock_result="
                    << determine_keys_to_lock_result.ToString();
  if (determine_keys_to_lock_result.lock_batch.empty()) {
    LOG(ERROR) << "Empty lock batch, doc_write_ops: " << yb::ToString(doc_write_ops)
               << ", read pairs: " << yb::ToString(read_pairs);
    return STATUS(Corruption, "Empty lock batch");
  }
  result.need_read_snapshot = determine_keys_to_lock_result.need_read_snapshot;

  FilterKeysToLock(&determine_keys_to_lock_result.lock_batch);
  VLOG_WITH_FUNC(4) << "filtered determine_keys_to_lock_result="
                    << determine_keys_to_lock_result.ToString();
  // Always take a real start time. The error message below needs it even when no latency
  // histogram was supplied; a default-constructed MonoTime would make the reported wait budget
  // meaningless.
  const MonoTime start_time = MonoTime::Now();
  result.lock_batch = LockBatch(
      lock_manager, std::move(determine_keys_to_lock_result.lock_batch), deadline);
  RETURN_NOT_OK_PREPEND(
      result.lock_batch.status(), Format("Timeout: $0", deadline - ToCoarse(start_time)));
  if (write_lock_latency != nullptr) {
    const MonoDelta elapsed_time = MonoTime::Now().GetDeltaSince(start_time);
    write_lock_latency->Increment(elapsed_time.ToMicroseconds());
  }

  return result;
}
278
279
30
// Records a QL error with message `err_msg` in the response of `doc_op`:
//   * QL write operations get status YQL_STATUS_QUERY_ERROR;
//   * PGSQL write operations get status PGSQL_STATUS_USAGE_ERROR.
// Returns InternalError for any other operation type.
Status SetDocOpQLErrorResponse(DocOperation* doc_op, string err_msg) {
  switch (doc_op->OpType()) {
    case DocOperation::Type::QL_WRITE_OPERATION: {
      const auto &resp = down_cast<QLWriteOperation *>(doc_op)->response();
      resp->set_status(QLResponsePB::YQL_STATUS_QUERY_ERROR);
      resp->set_error_message(std::move(err_msg));
      break;
    }
    case DocOperation::Type::PGSQL_WRITE_OPERATION: {
      const auto &resp = down_cast<PgsqlWriteOperation *>(doc_op)->response();
      resp->set_status(PgsqlResponsePB::PGSQL_STATUS_USAGE_ERROR);
      resp->set_error_message(std::move(err_msg));
      break;
    }
    default:
      // STATUS_FORMAT uses $N placeholders (not printf-style), so the op type is substituted.
      return STATUS_FORMAT(InternalError,
                           "Invalid status (QLError) for doc operation $0",
                           doc_op->OpType());
  }
  return Status::OK();
}
300
301
// Applies every operation in `doc_write_ops` to a fresh DocWriteBatch and, if all of them
// succeed, moves the result into `write_batch`.
//
// A QL error from an operation does not abort the batch. Instead it is reported through that
// operation's response, and processing continues with the next operation. For
// CONDITION_NOT_SATISFIED the message mentions `table_name`, which DocOperation::Apply() does not
// have. Any other failure is returned immediately, leaving `write_batch` untouched.
Status AssembleDocWriteBatch(const vector<unique_ptr<DocOperation>>& doc_write_ops,
                             CoarseTimePoint deadline,
                             const ReadHybridTime& read_time,
                             const DocDB& doc_db,
                             KeyValueWriteBatchPB* write_batch,
                             InitMarkerBehavior init_marker_behavior,
                             std::atomic<int64_t>* monotonic_counter,
                             HybridTime* restart_read_ht,
                             const string& table_name) {
  DCHECK_ONLY_NOTNULL(restart_read_ht);
  DocWriteBatch doc_write_batch(doc_db, init_marker_behavior, monotonic_counter);
  DocOperationApplyData apply_data = {&doc_write_batch, deadline, read_time, restart_read_ht};

  for (const auto& op : doc_write_ops) {
    Status apply_status = op->Apply(apply_data);
    if (!apply_status.IsQLError()) {
      RETURN_NOT_OK(apply_status);
      continue;
    }

    const bool condition_failed =
        ql::GetErrorCode(apply_status) == ql::ErrorCode::CONDITION_NOT_SATISFIED;
    const string error_msg = condition_failed
        ? Format("Condition on table $0 was not satisfied.", table_name)
        : apply_status.message().ToBuffer();
    RETURN_NOT_OK(SetDocOpQLErrorResponse(op.get(), error_msg));
  }

  doc_write_batch.MoveToWriteBatchPB(write_batch);
  return Status::OK();
}
335
336
namespace {
337
338
0
// Builds the Corruption status for a truncated external-intents value. `present` bytes remain
// but `required` are needed; `full` is the complete value, rendered in hex for diagnostics.
CHECKED_STATUS NotEnoughBytes(size_t present, size_t required, const Slice& full) {
  const auto full_value_hex = full.ToDebugHexString();
  return STATUS_FORMAT(
      Corruption, "Not enough bytes in external intents $0 while $1 expected, full: $2",
      present, required, full_value_hex);
}
343
344
// Decodes an external-intents value and adds one Put per contained intent to `regular_batch`.
//
// Expected layout of `original_input_value`:
//   kUuid, status tablet id (kUuidSize bytes), kExternalIntents,
//   then repeated <varint key size><key><varint value size><value>, terminated by a zero key size.
// Each Put's key is the intent key followed by the DocHybridTime encoded from `commit_ht` and
// `*write_id`. `*write_id` is incremented after every Put.
//
// Returns a non-OK status on malformed or truncated input; NotEnoughBytes reports truncation.
CHECKED_STATUS PrepareApplyExternalIntentsBatch(
    HybridTime commit_ht,
    const Slice& original_input_value,
    rocksdb::WriteBatch* regular_batch,
    IntraTxnWriteId* write_id) {
  auto input_value = original_input_value;
  DocHybridTimeBuffer doc_ht_buffer;
  RETURN_NOT_OK(input_value.consume_byte(ValueTypeAsChar::kUuid));
  // Guard against a truncated value before slicing out the status tablet id.
  if (input_value.size() < kUuidSize) {
    return NotEnoughBytes(input_value.size(), kUuidSize, original_input_value);
  }
  Uuid status_tablet;
  RETURN_NOT_OK(status_tablet.FromSlice(input_value.Prefix(kUuidSize)));
  input_value.remove_prefix(kUuidSize);
  RETURN_NOT_OK(input_value.consume_byte(ValueTypeAsChar::kExternalIntents));
  for (;;) {
    auto key_size = VERIFY_RESULT(util::FastDecodeUnsignedVarInt(&input_value));
    if (key_size == 0) {
      break;
    }
    if (input_value.size() < key_size) {
      return NotEnoughBytes(input_value.size(), key_size, original_input_value);
    }
    auto output_key = input_value.Prefix(key_size);
    input_value.remove_prefix(key_size);
    auto value_size = VERIFY_RESULT(util::FastDecodeUnsignedVarInt(&input_value));
    if (input_value.size() < value_size) {
      return NotEnoughBytes(input_value.size(), value_size, original_input_value);
    }
    auto output_value = input_value.Prefix(value_size);
    input_value.remove_prefix(value_size);
    std::array<Slice, 2> key_parts = {{
        output_key,
        doc_ht_buffer.EncodeWithValueType(commit_ht, *write_id),
    }};
    std::array<Slice, 1> value_parts = {{
        output_value,
    }};
    regular_batch->Put(key_parts, value_parts);
    ++*write_id;
  }

  return Status::OK();
}
385
386
// Reads all stored external intents for provided transactions and prepares batches that will apply
387
// them into regular db and remove from intents db.
388
CHECKED_STATUS PrepareApplyExternalIntents(
389
    ExternalTxnApplyState* apply_external_transactions,
390
    rocksdb::WriteBatch* regular_batch,
391
    rocksdb::DB* intents_db,
392
6.34M
    rocksdb::WriteBatch* intents_batch) {
393
6.34M
  if (apply_external_transactions->empty()) {
394
6.34M
    return Status::OK();
395
6.34M
  }
396
397
501
  KeyBytes key_prefix;
398
501
  KeyBytes key_upperbound;
399
501
  Slice key_upperbound_slice;
400
401
501
  auto iter = CreateRocksDBIterator(
402
501
      intents_db, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER,
403
501
      /* user_key_for_filter= */ boost::none,
404
501
      rocksdb::kDefaultQueryId, /* read_filter= */ nullptr, &key_upperbound_slice);
405
406
501
  for (auto& apply : *apply_external_transactions) {
407
0
    key_prefix.Clear();
408
0
    key_prefix.AppendValueType(ValueType::kExternalTransactionId);
409
0
    key_prefix.AppendRawBytes(apply.first.AsSlice());
410
411
0
    key_upperbound = key_prefix;
412
0
    key_upperbound.AppendValueType(ValueType::kMaxByte);
413
0
    key_upperbound_slice = key_upperbound.AsSlice();
414
415
0
    IntraTxnWriteId& write_id = apply.second.write_id;
416
417
0
    iter.Seek(key_prefix);
418
0
    while (iter.Valid()) {
419
0
      const Slice input_key(iter.key());
420
421
0
      if (!input_key.starts_with(key_prefix.AsSlice())) {
422
0
        break;
423
0
      }
424
425
0
      if (regular_batch) {
426
0
        RETURN_NOT_OK(PrepareApplyExternalIntentsBatch(
427
0
            apply.second.commit_ht, iter.value(), regular_batch, &write_id));
428
0
      }
429
0
      if (intents_batch) {
430
0
        intents_batch->SingleDelete(input_key);
431
0
      }
432
433
0
      iter.Next();
434
0
    }
435
0
  }
436
437
501
  return Status::OK();
438
501
}
439
440
6.34M
// Collects the external transactions that `put_batch` asks to apply, keyed by transaction id,
// each with its commit hybrid time. Entries are inserted with emplace, so a repeated id does not
// overwrite an earlier one (assuming map-like emplace semantics). CHECK-fails on an undecodable
// transaction id.
ExternalTxnApplyState ProcessApplyExternalTransactions(const KeyValueWriteBatchPB& put_batch) {
  ExternalTxnApplyState pending;
  for (const auto& apply_pb : put_batch.apply_external_transactions()) {
    auto decoded_txn_id = CHECK_RESULT(FullyDecodeTransactionId(apply_pb.transaction_id()));
    ExternalTxnApplyStateData state{
      .commit_ht = HybridTime(apply_pb.commit_hybrid_time())
    };
    pending.emplace(decoded_txn_id, std::move(state));
  }
  return pending;
}
454
455
} // namespace
456
457
bool AddExternalPairToWriteBatch(
458
    const KeyValuePairPB& kv_pair,
459
    HybridTime hybrid_time,
460
    int write_id,
461
    ExternalTxnApplyState* apply_external_transactions,
462
    rocksdb::WriteBatch* regular_write_batch,
463
125M
    rocksdb::WriteBatch* intents_write_batch) {
464
125M
  DocHybridTimeBuffer doc_ht_buffer;
465
125M
  DocHybridTimeWordBuffer inverted_doc_ht_buffer;
466
467
125M
  CHECK(!kv_pair.key().empty());
468
125M
  CHECK(!kv_pair.value().empty());
469
470
125M
  if (kv_pair.key()[0] != ValueTypeAsChar::kExternalTransactionId) {
471
125M
    return true;
472
125M
  }
473
474
  // We replicate encoded SubDocKeys without a HybridTime at the end, and only append it here.
475
  // The reason for this is that the HybridTime timestamp is only picked at the time of
476
  // appending  an entry to the tablet's Raft log. Also this is a good way to save network
477
  // bandwidth.
478
  //
479
  // "Write id" is the final component of our HybridTime encoding (or, to be more precise,
480
  // DocHybridTime encoding) that helps disambiguate between different updates to the
481
  // same key (row/column) within a transaction. We set it based on the position of the write
482
  // operation in its write batch.
483
484
786
  hybrid_time = kv_pair.has_external_hybrid_time() ?
485
786
      
HybridTime(kv_pair.external_hybrid_time())0
: hybrid_time;
486
786
  std::array<Slice, 2> key_parts = {{
487
786
      Slice(kv_pair.key()),
488
786
      doc_ht_buffer.EncodeWithValueType(hybrid_time, write_id),
489
786
  }};
490
786
  Slice key_value = kv_pair.value();
491
  // This entry contains external intents.
492
786
  Slice key = kv_pair.key();
493
786
  key.consume_byte();
494
786
  auto txn_id = CHECK_RESULT(DecodeTransactionId(&key));
495
786
  auto it = apply_external_transactions->find(txn_id);
496
786
  if (it != apply_external_transactions->end()) {
497
    // The same write operation could contain external intents and instruct us to apply them.
498
0
    CHECK_OK(PrepareApplyExternalIntentsBatch(
499
0
        it->second.commit_ht, key_value, regular_write_batch, &it->second.write_id));
500
0
    return false;
501
0
  }
502
786
  key_parts[1] = InvertEncodedDocHT(key_parts[1], &inverted_doc_ht_buffer);
503
786
  constexpr size_t kNumValueParts = 1;
504
786
  intents_write_batch->Put(key_parts, { &key_value, kNumValueParts });
505
506
786
  return false;
507
786
}
508
509
// Usually put_batch contains only records that should be applied to regular DB.
510
// So apply_external_transactions will be empty and regular_entry will be true.
511
//
512
// But in general case on consumer side of CDC put_batch could contain various kinds of records,
513
// that should be applied into regular and intents db.
514
// They are:
515
// apply_external_transactions
516
//   The list of external transactions that should be applied.
517
//   For each such transaction we should lookup for existing external intents (stored in intents DB)
518
//   and convert them to Put command in regular_write_batch plus SingleDelete command in
519
//   intents_write_batch.
520
// write_pairs
521
//   Could contain regular entries, that should be stored into regular DB as is.
522
//   Also pair could contain external intents, that should be stored into intents DB.
523
//   But if apply_external_transactions contains transaction for those external intents, then
524
//   those intents will be applied directly to regular DB, avoiding unnecessary write to intents DB.
525
//   This case is very common for short running transactions.
526
bool PrepareExternalWriteBatch(
527
    const KeyValueWriteBatchPB& put_batch,
528
    HybridTime hybrid_time,
529
    rocksdb::DB* intents_db,
530
    rocksdb::WriteBatch* regular_write_batch,
531
6.34M
    rocksdb::WriteBatch* intents_write_batch) {
532
6.34M
  CHECK(put_batch.read_pairs().empty());
533
534
6.34M
  auto apply_external_transactions = ProcessApplyExternalTransactions(put_batch);
535
536
6.34M
  CHECK_OK(PrepareApplyExternalIntents(
537
6.34M
      &apply_external_transactions, regular_write_batch, intents_db, intents_write_batch));
538
539
6.34M
  bool has_non_external_kvs = false;
540
132M
  for (int write_id = 0; write_id < put_batch.write_pairs_size(); 
++write_id125M
) {
541
125M
    has_non_external_kvs = AddExternalPairToWriteBatch(
542
125M
        put_batch.write_pairs(write_id), hybrid_time, write_id, &apply_external_transactions,
543
125M
        regular_write_batch, intents_write_batch) || 
has_non_external_kvs0
;
544
125M
  }
545
6.34M
  return has_non_external_kvs;
546
6.34M
}
547
548
namespace {
549
550
// Checks if the given slice points to the part of an encoded SubDocKey past all of the subkeys
551
// (and definitely past all the hash/range keys). The only remaining part could be a hybrid time.
552
108M
inline bool IsEndOfSubKeys(const Slice& key) {
553
108M
  return key[0] == ValueTypeAsChar::kGroupEnd &&
554
108M
         
(87.5M
key.size() == 187.5M
||
key[1] == ValueTypeAsChar::kHybridTime71.5M
);
555
108M
}
556
557
// Enumerates weak intent keys generated by considering specified prefixes of the given key and
558
// invoking the provided callback with each combination considered, stored in encoded_key_buffer.
559
// On return, *encoded_key_buffer contains the corresponding strong intent, for which the callback
560
// has not yet been called. It is left to the caller to use the final state of encoded_key_buffer.
561
//
562
// The prefixes of the key considered are as follows:
563
// 1. Up to and including the whole hash key.
564
// 2. Up to and including the whole range key, or if partial_range_key_intents is
565
//    PartialRangeKeyIntents::kTrue, then enumerate the prefix up to the end of each component of
566
//    the range key separately.
567
// 3. Up to and including each subkey component, separately.
568
//
569
// In any case, we stop short of enumerating the last intent key generated based on the above, as
570
// this represents the strong intent key and will be stored in encoded_key_buffer at the end of this
571
// call.
572
//
573
// The beginning of each intent key will also include any cotable_id or colocation_id,
574
// if present.
575
Status EnumerateWeakIntents(
576
    Slice key,
577
    const EnumerateIntentsCallback& functor,
578
    KeyBytes* encoded_key_buffer,
579
87.6M
    PartialRangeKeyIntents partial_range_key_intents) {
580
87.6M
  static const Slice kEmptyIntentValue;
581
582
87.6M
  encoded_key_buffer->Clear();
583
87.6M
  if (key.empty()) {
584
0
    return STATUS(Corruption, "An empty slice is not a valid encoded SubDocKey");
585
0
  }
586
587
87.6M
  const bool has_cotable_id    = *key.cdata() == ValueTypeAsChar::kTableId;
588
87.6M
  const bool has_colocation_id = *key.cdata() == ValueTypeAsChar::kColocationId;
589
87.6M
  {
590
87.6M
    bool is_table_root_key = false;
591
87.6M
    if (has_cotable_id) {
592
12.2M
      const auto kMinExpectedSize = kUuidSize + 2;
593
12.2M
      if (key.size() < kMinExpectedSize) {
594
0
        return STATUS_FORMAT(
595
0
            Corruption,
596
0
            "Expected an encoded SubDocKey starting with a cotable id to be at least $0 bytes long",
597
0
            kMinExpectedSize);
598
0
      }
599
12.2M
      encoded_key_buffer->AppendRawBytes(key.cdata(), kUuidSize + 1);
600
12.2M
      is_table_root_key = key[kUuidSize + 1] == ValueTypeAsChar::kGroupEnd;
601
75.4M
    } else if (has_colocation_id) {
602
1.66k
      const auto kMinExpectedSize = sizeof(ColocationId) + 2;
603
1.66k
      if (key.size() < kMinExpectedSize) {
604
0
        return STATUS_FORMAT(
605
0
            Corruption,
606
0
            "Expected an encoded SubDocKey starting with a colocation id to be"
607
0
            " at least $0 bytes long",
608
0
            kMinExpectedSize);
609
0
      }
610
1.66k
      encoded_key_buffer->AppendRawBytes(key.cdata(), sizeof(ColocationId) + 1);
611
1.66k
      is_table_root_key = key[sizeof(ColocationId) + 1] == ValueTypeAsChar::kGroupEnd;
612
75.4M
    } else {
613
75.4M
      is_table_root_key = *key.cdata() == ValueTypeAsChar::kGroupEnd;
614
75.4M
    }
615
616
87.6M
    encoded_key_buffer->AppendValueType(ValueType::kGroupEnd);
617
618
87.6M
    if (is_table_root_key) {
619
      // This must be a "table root" (or "tablet root") key (no hash components, no range
620
      // components, but the cotable might still be there). We are not really considering the case
621
      // of any subkeys under the empty key, so we can return here.
622
95.7k
      return Status::OK();
623
95.7k
    }
624
87.6M
  }
625
626
  // For any non-empty key we already know that the empty key intent is weak.
627
87.5M
  RETURN_NOT_OK(functor(
628
87.5M
      IntentStrength::kWeak, FullDocKey::kFalse, kEmptyIntentValue, encoded_key_buffer,
629
87.5M
      LastKey::kFalse));
630
631
87.5M
  auto hashed_part_size = VERIFY_RESULT(DocKey::EncodedSize(key, DocKeyPart::kUpToHash));
632
633
  // Remove kGroupEnd that we just added to generate a weak intent.
634
0
  encoded_key_buffer->RemoveLastByte();
635
636
87.5M
  if (hashed_part_size != encoded_key_buffer->size()) {
637
    // A hash component is present. Note that if cotable id is present, hashed_part_size would
638
    // also include it, so we only need to append the new bytes.
639
70.8M
    encoded_key_buffer->AppendRawBytes(
640
70.8M
        key.cdata() + encoded_key_buffer->size(), hashed_part_size - encoded_key_buffer->size());
641
70.8M
    key.remove_prefix(hashed_part_size);
642
70.8M
    if (key.empty()) {
643
0
      return STATUS(Corruption, "Range key part missing, expected at least a kGroupEnd");
644
0
    }
645
646
    // Append the kGroupEnd at the end for the empty range part to make this a valid encoded DocKey.
647
70.8M
    encoded_key_buffer->AppendValueType(ValueType::kGroupEnd);
648
70.8M
    if (IsEndOfSubKeys(key)) {
649
      // This means the key ends at the hash component -- no range keys and no subkeys.
650
13.3M
      return Status::OK();
651
13.3M
    }
652
653
    // Generate a weak intent that only includes the hash component.
654
57.5M
    RETURN_NOT_OK(functor(
655
57.5M
        IntentStrength::kWeak, FullDocKey(key[0] == ValueTypeAsChar::kGroupEnd), kEmptyIntentValue,
656
57.5M
        encoded_key_buffer, LastKey::kFalse));
657
658
    // Remove the kGroupEnd we added a bit earlier so we can append some range components.
659
57.5M
    encoded_key_buffer->RemoveLastByte();
660
57.5M
  } else {
661
    // No hash component.
662
16.6M
    key.remove_prefix(hashed_part_size);
663
16.6M
  }
664
665
  // Range components.
666
74.1M
  auto range_key_start = key.cdata();
667
74.1M
  while (VERIFY_RESULT(ConsumePrimitiveValueFromKey(&key))) {
668
    // Append the consumed primitive value to encoded_key_buffer.
669
37.4M
    encoded_key_buffer->AppendRawBytes(range_key_start, key.cdata() - range_key_start);
670
    // We always need kGroupEnd at the end to make this a valid encoded DocKey.
671
37.4M
    encoded_key_buffer->AppendValueType(ValueType::kGroupEnd);
672
37.4M
    if (key.empty()) {
673
0
      return STATUS(Corruption, "Range key part is not terminated with a kGroupEnd");
674
0
    }
675
37.4M
    if (IsEndOfSubKeys(key)) {
676
      // This is the last range key and there are no subkeys.
677
2.60M
      return Status::OK();
678
2.60M
    }
679
34.8M
    FullDocKey full_doc_key(key[0] == ValueTypeAsChar::kGroupEnd);
680
34.8M
    if (partial_range_key_intents || 
full_doc_key19.9M
) {
681
26.0M
      RETURN_NOT_OK(functor(
682
26.0M
          IntentStrength::kWeak, full_doc_key, kEmptyIntentValue, encoded_key_buffer,
683
26.0M
          LastKey::kFalse));
684
26.0M
    }
685
34.8M
    encoded_key_buffer->RemoveLastByte();
686
34.8M
    range_key_start = key.cdata();
687
34.8M
  }
688
689
  // We still need to append the kGroupEnd byte that closes the range portion to our buffer.
690
  // The corresponding kGroupEnd has already been consumed from the key slice by the last call to
691
  // ConsumePrimitiveValueFromKey, which returned false.
692
71.5M
  encoded_key_buffer->AppendValueType(ValueType::kGroupEnd);
693
694
  // Subkey components.
695
71.5M
  auto subkey_start = key.cdata();
696
71.5M
  while (VERIFY_RESULT(SubDocKey::DecodeSubkey(&key))) {
697
    // Append the consumed value to encoded_key_buffer.
698
71.5M
    encoded_key_buffer->AppendRawBytes(subkey_start, key.cdata() - subkey_start);
699
71.5M
    if (key.empty() || 
*key.cdata() == ValueTypeAsChar::kHybridTime840
) {
700
      // This was the last subkey.
701
71.5M
      return Status::OK();
702
71.5M
    }
703
48.0k
    RETURN_NOT_OK(functor(
704
48.0k
        IntentStrength::kWeak, FullDocKey::kTrue, kEmptyIntentValue, encoded_key_buffer,
705
48.0k
        LastKey::kFalse));
706
48.0k
    subkey_start = key.cdata();
707
48.0k
  }
708
709
37.5k
  return STATUS(
710
71.5M
      Corruption,
711
71.5M
      "Expected to reach the end of the key after decoding last valid subkey");
712
71.5M
}
713
714
}  // anonymous namespace
715
716
// Reports every intent implied by the encoded SubDocKey `key` to `functor`: first all weak
// intents (see EnumerateWeakIntents), then the strong intent for the full key. The strong intent
// carries `intent_value` and `last_key`. `encoded_key_buffer` is used as scratch space for the
// intent keys.
Status EnumerateIntents(
    Slice key, const Slice& intent_value, const EnumerateIntentsCallback& functor,
    KeyBytes* encoded_key_buffer, PartialRangeKeyIntents partial_range_key_intents,
    LastKey last_key) {
  RETURN_NOT_OK(EnumerateWeakIntents(key, functor, encoded_key_buffer, partial_range_key_intents));
  // EnumerateWeakIntents leaves the strong intent key in encoded_key_buffer.
  const auto strong = IntentStrength::kStrong;
  return functor(strong, FullDocKey::kTrue, intent_value, encoded_key_buffer, last_key);
}
725
726
// Enumerates the intents of every key/value pair in `kv_pairs`, in order, sharing one scratch
// key buffer. Only the final pair is flagged with LastKey(true). CHECK-fails on a pair with an
// empty key or value; returns the first non-OK status produced while enumerating.
Status EnumerateIntents(
    const google::protobuf::RepeatedPtrField<KeyValuePairPB> &kv_pairs,
    const EnumerateIntentsCallback& functor, PartialRangeKeyIntents partial_range_key_intents) {
  KeyBytes scratch_key;
  const int pair_count = kv_pairs.size();

  for (int i = 0; i < pair_count; ++i) {
    const auto& kv_pair = kv_pairs.Get(i);
    CHECK(!kv_pair.key().empty());
    CHECK(!kv_pair.value().empty());
    const bool is_last_pair = (i + 1 == pair_count);
    RETURN_NOT_OK(EnumerateIntents(
        kv_pair.key(), kv_pair.value(), functor, &scratch_key, partial_range_key_intents,
        LastKey(is_last_pair)));
  }

  return Status::OK();
}
743
744
// ------------------------------------------------------------------------------------------------
745
// Standalone functions
746
// ------------------------------------------------------------------------------------------------
747
748
3.03M
void AppendTransactionKeyPrefix(const TransactionId& transaction_id, KeyBytes* out) {
749
3.03M
  out->AppendValueType(ValueType::kTransactionId);
750
3.03M
  out->AppendRawBytes(transaction_id.AsSlice());
751
3.03M
}
752
753
// Walks the reverse index of `transaction_id` in `intents_db`. For every reverse-index entry
// whose intent key lies within `key_bounds` (nullptr = no bounds), it looks up that intent.
// It appends one IntentKeyValueForCDC per strong-write intent that is not a row lock.
//
// Params:
//   transaction_id    - transaction whose reverse index is scanned.
//   key_bounds        - optional bounds applied to the intent keys.
//   stream_state      - when non-null, active() and write_id != 0, the scan restarts just after
//                       stream_state->key and continues from stream_state->write_id.
//   intents_db        - RocksDB instance holding intents and the reverse index.
//   key_value_intents - output vector; entries are appended.
// Returns:
//   ApplyTransactionState{key, write_id} when the running write id reaches
//   (starting write_id + FLAGS_cdc_max_stream_intent_records) before the reverse index is
//   exhausted. An empty ApplyTransactionState is returned when the scan completes, or when a
//   reverse-index entry points at an intent that cannot be found (logged as a warning).
//   Errors from bitmap skipping, intent-key parsing or intent-value decoding are propagated.
Result<ApplyTransactionState> GetIntentsBatch(
    const TransactionId& transaction_id,
    const KeyBounds* key_bounds,
    const ApplyTransactionState* stream_state,
    rocksdb::DB* intents_db,
    std::vector<IntentKeyValueForCDC>* key_value_intents) {
  KeyBytes txn_reverse_index_prefix;
  Slice transaction_id_slice = transaction_id.AsSlice();
  AppendTransactionKeyPrefix(transaction_id, &txn_reverse_index_prefix);
  txn_reverse_index_prefix.AppendValueType(ValueType::kMaxByte);
  // `key_prefix` is the transaction prefix without the trailing kMaxByte.
  // The whole buffer (prefix + kMaxByte) is the upper bound of the reverse-index iterator.
  Slice key_prefix = txn_reverse_index_prefix.AsSlice();
  key_prefix.remove_suffix(1);
  const Slice reverse_index_upperbound = txn_reverse_index_prefix.AsSlice();

  auto reverse_index_iter = CreateRocksDBIterator(
      intents_db, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
      rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound);

  BoundedRocksDbIterator intent_iter = CreateRocksDBIterator(
      intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
      rocksdb::kDefaultQueryId);

  reverse_index_iter.Seek(key_prefix);

  IntraTxnWriteId write_id = 0;
  if (stream_state != nullptr && stream_state->active() && stream_state->write_id != 0) {
    // Resume after the reverse-index entry recorded in `stream_state`.
    reverse_index_iter.Seek(stream_state->key);
    write_id = stream_state->write_id;
    reverse_index_iter.Next();
  }
  const uint64_t max_records = FLAGS_cdc_max_stream_intent_records;
  const uint64_t write_id_limit = write_id + max_records;

  // The iterator is advanced in the loop header, so every `continue` below moves on to the
  // next reverse-index entry instead of re-processing the current one.
  for (; reverse_index_iter.Valid(); reverse_index_iter.Next()) {
    const Slice key_slice(reverse_index_iter.key());

    if (!key_slice.starts_with(key_prefix)) {
      break;
    }
    // If the key ends at the transaction id then it is transaction metadata (status tablet,
    // isolation level etc.).
    if (key_slice.size() <= txn_reverse_index_prefix.size()) {
      continue;
    }

    auto reverse_index_value = reverse_index_iter.value();
    if (!reverse_index_value.empty() && reverse_index_value[0] == ValueTypeAsChar::kBitSet) {
      reverse_index_value.remove_prefix(1);
      RETURN_NOT_OK(OneWayBitmap::Skip(&reverse_index_value));
    }
    // Value of reverse index is a key of original intent record, so seek it and check match.
    // Bounds are checked on the intent key itself, i.e. after the optional bitmap prefix has
    // been stripped, which is the same key we Seek to below.
    if (key_bounds && !key_bounds->IsWithinBounds(reverse_index_value)) {
      continue;
    }

    // Return when we have reached the batch limit.
    // NOTE(review): the returned key is this entry, which has NOT been processed, while the
    // resume path above Seeks to stream_state->key and then calls Next(). Confirm callers resume
    // from the last emitted intent's reverse_index_key; otherwise this entry would be skipped.
    if (write_id >= write_id_limit) {
      return ApplyTransactionState{
          .key = key_slice.ToBuffer(),
          .write_id = write_id,
      };
    }

    intent_iter.Seek(reverse_index_value);
    if (!intent_iter.Valid() || intent_iter.key() != reverse_index_value) {
      LOG(WARNING) << "Unable to find intent: " << reverse_index_value.ToDebugHexString()
                   << " for " << key_slice.ToDebugHexString();
      return ApplyTransactionState{};
    }

    auto intent = VERIFY_RESULT(ParseIntentKey(intent_iter.key(), transaction_id_slice));
    if (!intent.types.Test(IntentType::kStrongWrite)) {
      continue;
    }

    auto decoded_value =
        VERIFY_RESULT(DecodeIntentValue(intent_iter.value(), &transaction_id_slice));
    write_id = decoded_value.write_id;

    // Row locks are not emitted. The loop header advances the iterator, so this cannot spin
    // on the same entry.
    if (decoded_value.body.starts_with(ValueTypeAsChar::kRowLock)) {
      continue;
    }

    std::array<Slice, 1> key_parts = {{
        intent.doc_path,
    }};
    std::array<Slice, 1> value_parts = {{
        decoded_value.body,
    }};

    IntentKeyValueForCDC intent_metadata;
    intent_metadata.key = Slice(key_parts, &(intent_metadata.key_buf));
    intent_metadata.value = Slice(value_parts, &(intent_metadata.value_buf));
    intent_metadata.reverse_index_key = key_slice.ToBuffer();
    intent_metadata.write_id = write_id;
    // NOTE(review): `key`/`value` are Slices built over this local's key_buf/value_buf. After
    // the object is moved into the vector they are not guaranteed to refer to the stored
    // element's buffers (e.g. short strings), so consumers should presumably read
    // key_buf/value_buf. Confirm.
    key_value_intents->push_back(std::move(intent_metadata));

    VLOG(4) << "The size of intentKeyValues in GetIntentList "
            << key_value_intents->size();
    ++write_id;
  }

  return ApplyTransactionState{};
}
855
856
0
std::string ApplyTransactionState::ToString() const {
857
0
  return Format(
858
0
      "{ key: $0 write_id: $1 aborted: $2 }", Slice(key).ToDebugString(), write_id, aborted);
859
0
}
860
861
void CombineExternalIntents(
862
    const TransactionId& txn_id,
863
2
    ExternalIntentsProvider* provider) {
864
  // External intents are stored in the following format:
865
  // key: kExternalTransactionId, txn_id
866
  // value: size(intent1_key), intent1_key, size(intent1_value), intent1_value, size(intent2_key)...
867
  // where size is encoded as varint.
868
869
2
  docdb::KeyBytes buffer;
870
2
  buffer.AppendValueType(ValueType::kExternalTransactionId);
871
2
  buffer.AppendRawBytes(txn_id.AsSlice());
872
2
  provider->SetKey(buffer.AsSlice());
873
2
  buffer.Clear();
874
2
  buffer.AppendValueType(ValueType::kUuid);
875
2
  buffer.AppendRawBytes(provider->InvolvedTablet().AsSlice());
876
2
  buffer.AppendValueType(ValueType::kExternalIntents);
877
6
  while (auto key_value = provider->Next()) {
878
4
    buffer.AppendUInt64AsVarInt(key_value->first.size());
879
4
    buffer.AppendRawBytes(key_value->first);
880
4
    buffer.AppendUInt64AsVarInt(key_value->second.size());
881
4
    buffer.AppendRawBytes(key_value->second);
882
4
  }
883
2
  buffer.AppendUInt64AsVarInt(0);
884
2
  provider->SetValue(buffer.AsSlice());
885
2
}
886
887
}  // namespace docdb
888
}  // namespace yb