YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/rocksdb_writer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/rocksdb_writer.h"
15
16
#include "yb/common/row_mark.h"
17
18
#include "yb/docdb/conflict_resolution.h"
19
#include "yb/docdb/doc_key.h"
20
#include "yb/docdb/doc_kv_util.h"
21
#include "yb/docdb/docdb.pb.h"
22
#include "yb/docdb/docdb_rocksdb_util.h"
23
#include "yb/docdb/intent.h"
24
#include "yb/docdb/value_type.h"
25
26
#include "yb/gutil/walltime.h"
27
28
#include "yb/util/bitmap.h"
29
#include "yb/util/flag_tags.h"
30
#include "yb/util/pb_util.h"
31
32
DEFINE_bool(enable_transaction_sealing, false,
33
            "Whether transaction sealing is enabled.");
34
DEFINE_int32(txn_max_apply_batch_records, 100000,
35
             "Max number of apply records allowed in single RocksDB batch. "
36
             "When a transaction's data in one tablet does not fit into specified number of "
37
             "records, it will be applied using multiple RocksDB write batches.");
38
39
DEFINE_test_flag(bool, docdb_sort_weak_intents, false,
40
                "Sort weak intents to make their order deterministic.");
41
DEFINE_test_flag(bool, fail_on_replicated_batch_idx_set_in_txn_record, false,
42
                 "Fail when a set of replicated batch indexes is found in txn record.");
43
44
namespace yb {
45
namespace docdb {
46
47
namespace {
48
49
// Slice parts with the number of slices fixed at compile time.
50
template <int N>
51
struct FixedSliceParts {
52
27.9M
  FixedSliceParts(const std::array<Slice, N>& input) : parts(input.data()) { // NOLINT
53
27.9M
  }
54
55
55.8M
  operator SliceParts() const {
56
55.8M
    return SliceParts(parts, N);
57
55.8M
  }
58
59
  const Slice* parts;
60
};
61
62
// Main intent data::
63
// Prefix + DocPath + IntentType + DocHybridTime -> TxnId + value of the intent
64
// Reverse index by txn id:
65
// Prefix + TxnId + DocHybridTime -> Main intent data key
66
//
67
// Expects that last entry of key is DocHybridTime.
68
template <int N>
69
void AddIntent(
70
    const TransactionId& transaction_id,
71
    const FixedSliceParts<N>& key,
72
    const SliceParts& value,
73
    rocksdb::DirectWriteHandler* handler,
74
27.9M
    Slice reverse_value_prefix = Slice()) {
75
27.9M
  char reverse_key_prefix[1] = { ValueTypeAsChar::kTransactionId };
76
27.9M
  DocHybridTimeWordBuffer doc_ht_buffer;
77
27.9M
  auto doc_ht_slice = InvertEncodedDocHT(key.parts[N - 1], &doc_ht_buffer);
78
79
27.9M
  std::array<Slice, 3> reverse_key = {{
80
27.9M
      Slice(reverse_key_prefix, sizeof(reverse_key_prefix)),
81
27.9M
      transaction_id.AsSlice(),
82
27.9M
      doc_ht_slice,
83
27.9M
  }};
84
27.9M
  handler->Put(key, value);
85
27.9M
  if (reverse_value_prefix.empty()) {
86
27.9M
    handler->Put(reverse_key, key);
87
3.96k
  } else {
88
3.96k
    std::array<Slice, N + 1> reverse_value;
89
3.96k
    reverse_value[0] = reverse_value_prefix;
90
3.96k
    memcpy(&reverse_value[1], key.parts, sizeof(*key.parts) * N);
91
3.96k
    handler->Put(reverse_key, reverse_value);
92
3.96k
  }
93
27.9M
}
94
95
template <size_t N>
96
void PutApplyState(
97
    const Slice& transaction_id_slice, HybridTime commit_ht, IntraTxnWriteId write_id,
98
2
    const std::array<Slice, N>& value_parts, rocksdb::DirectWriteHandler* handler) {
99
2
  char transaction_apply_state_value_type = ValueTypeAsChar::kTransactionApplyState;
100
2
  char group_end_value_type = ValueTypeAsChar::kGroupEnd;
101
2
  char hybrid_time_value_type = ValueTypeAsChar::kHybridTime;
102
2
  DocHybridTime doc_hybrid_time(commit_ht, write_id);
103
2
  char doc_hybrid_time_buffer[kMaxBytesPerEncodedHybridTime];
104
2
  char* doc_hybrid_time_buffer_end = doc_hybrid_time.EncodedInDocDbFormat(
105
2
      doc_hybrid_time_buffer);
106
2
  std::array<Slice, 5> key_parts = {{
107
2
      Slice(&transaction_apply_state_value_type, 1),
108
2
      transaction_id_slice,
109
2
      Slice(&group_end_value_type, 1),
110
2
      Slice(&hybrid_time_value_type, 1),
111
2
      Slice(doc_hybrid_time_buffer, doc_hybrid_time_buffer_end),
112
2
  }};
113
2
  handler->Put(key_parts, value_parts);
114
2
}
rocksdb_writer.cc:_ZN2yb5docdb12_GLOBAL__N_113PutApplyStateILm2EEEvRKNS_5SliceENS_10HybridTimeEjRKNSt3__15arrayIS3_XT_EEEPN7rocksdb18DirectWriteHandlerE
Line
Count
Source
98
1
    const std::array<Slice, N>& value_parts, rocksdb::DirectWriteHandler* handler) {
99
1
  char transaction_apply_state_value_type = ValueTypeAsChar::kTransactionApplyState;
100
1
  char group_end_value_type = ValueTypeAsChar::kGroupEnd;
101
1
  char hybrid_time_value_type = ValueTypeAsChar::kHybridTime;
102
1
  DocHybridTime doc_hybrid_time(commit_ht, write_id);
103
1
  char doc_hybrid_time_buffer[kMaxBytesPerEncodedHybridTime];
104
1
  char* doc_hybrid_time_buffer_end = doc_hybrid_time.EncodedInDocDbFormat(
105
1
      doc_hybrid_time_buffer);
106
1
  std::array<Slice, 5> key_parts = {{
107
1
      Slice(&transaction_apply_state_value_type, 1),
108
1
      transaction_id_slice,
109
1
      Slice(&group_end_value_type, 1),
110
1
      Slice(&hybrid_time_value_type, 1),
111
1
      Slice(doc_hybrid_time_buffer, doc_hybrid_time_buffer_end),
112
1
  }};
113
1
  handler->Put(key_parts, value_parts);
114
1
}
rocksdb_writer.cc:_ZN2yb5docdb12_GLOBAL__N_113PutApplyStateILm1EEEvRKNS_5SliceENS_10HybridTimeEjRKNSt3__15arrayIS3_XT_EEEPN7rocksdb18DirectWriteHandlerE
Line
Count
Source
98
1
    const std::array<Slice, N>& value_parts, rocksdb::DirectWriteHandler* handler) {
99
1
  char transaction_apply_state_value_type = ValueTypeAsChar::kTransactionApplyState;
100
1
  char group_end_value_type = ValueTypeAsChar::kGroupEnd;
101
1
  char hybrid_time_value_type = ValueTypeAsChar::kHybridTime;
102
1
  DocHybridTime doc_hybrid_time(commit_ht, write_id);
103
1
  char doc_hybrid_time_buffer[kMaxBytesPerEncodedHybridTime];
104
1
  char* doc_hybrid_time_buffer_end = doc_hybrid_time.EncodedInDocDbFormat(
105
1
      doc_hybrid_time_buffer);
106
1
  std::array<Slice, 5> key_parts = {{
107
1
      Slice(&transaction_apply_state_value_type, 1),
108
1
      transaction_id_slice,
109
1
      Slice(&group_end_value_type, 1),
110
1
      Slice(&hybrid_time_value_type, 1),
111
1
      Slice(doc_hybrid_time_buffer, doc_hybrid_time_buffer_end),
112
1
  }};
113
1
  handler->Put(key_parts, value_parts);
114
1
}
115
116
} // namespace
117
118
NonTransactionalWriter::NonTransactionalWriter(
119
    std::reference_wrapper<const docdb::KeyValueWriteBatchPB> put_batch, HybridTime hybrid_time)
120
3.42M
    : put_batch_(put_batch), hybrid_time_(hybrid_time) {
121
3.42M
}
122
123
0
bool NonTransactionalWriter::Empty() const {
124
0
  return put_batch_.write_pairs().empty();
125
0
}
126
127
3.42M
Status NonTransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) {
128
3.42M
  DocHybridTimeBuffer doc_ht_buffer;
129
130
3.42M
  int write_id = 0;
131
39.5M
  for (const auto& kv_pair : put_batch_.write_pairs()) {
132
133
39.5M
    CHECK(!kv_pair.key().empty());
134
39.5M
    CHECK(!kv_pair.value().empty());
135
136
39.5M
    if (kv_pair.key()[0] == ValueTypeAsChar::kExternalTransactionId) {
137
0
      continue;
138
0
    }
139
140
39.5M
#ifndef NDEBUG
141
    // Debug-only: ensure all keys we get in Raft replication can be decoded.
142
39.5M
    SubDocKey subdoc_key;
143
39.5M
    Status s = subdoc_key.FullyDecodeFromKeyWithOptionalHybridTime(kv_pair.key());
144
18.4E
    CHECK(s.ok())
145
18.4E
        << "Failed decoding key: " << s.ToString() << "; "
146
18.4E
        << "Problematic key: " << BestEffortDocDBKeyToStr(KeyBytes(kv_pair.key())) << "\n"
147
18.4E
        << "value: " << FormatBytesAsStr(kv_pair.value());
148
39.5M
#endif
149
150
    // We replicate encoded SubDocKeys without a HybridTime at the end, and only append it here.
151
    // The reason for this is that the HybridTime timestamp is only picked at the time of
152
    // appending  an entry to the tablet's Raft log. Also this is a good way to save network
153
    // bandwidth.
154
    //
155
    // "Write id" is the final component of our HybridTime encoding (or, to be more precise,
156
    // DocHybridTime encoding) that helps disambiguate between different updates to the
157
    // same key (row/column) within a transaction. We set it based on the position of the write
158
    // operation in its write batch.
159
160
39.5M
    auto hybrid_time = kv_pair.has_external_hybrid_time() ?
161
39.5M
        HybridTime(kv_pair.external_hybrid_time()) : hybrid_time_;
162
39.5M
    std::array<Slice, 2> key_parts = {{
163
39.5M
        Slice(kv_pair.key()),
164
39.5M
        doc_ht_buffer.EncodeWithValueType(hybrid_time, write_id),
165
39.5M
    }};
166
39.5M
    Slice key_value = kv_pair.value();
167
39.5M
    handler->Put(key_parts, SliceParts(&key_value, 1));
168
169
39.5M
    ++write_id;
170
39.5M
  }
171
172
3.42M
  return Status::OK();
173
3.42M
}
174
175
TransactionalWriter::TransactionalWriter(
176
    std::reference_wrapper<const docdb::KeyValueWriteBatchPB> put_batch,
177
    HybridTime hybrid_time,
178
    const TransactionId& transaction_id,
179
    IsolationLevel isolation_level,
180
    PartialRangeKeyIntents partial_range_key_intents,
181
    const Slice& replicated_batches_state,
182
    IntraTxnWriteId intra_txn_write_id)
183
    : put_batch_(put_batch),
184
      hybrid_time_(hybrid_time),
185
      transaction_id_(transaction_id),
186
      isolation_level_(isolation_level),
187
      partial_range_key_intents_(partial_range_key_intents),
188
      replicated_batches_state_(replicated_batches_state),
189
1.26M
      intra_txn_write_id_(intra_txn_write_id) {
190
1.26M
}
191
192
// We have the following distinct types of data in this "intent store":
193
// Main intent data:
194
//   Prefix + SubDocKey (no HybridTime) + IntentType + HybridTime -> TxnId + value of the intent
195
// Transaction metadata
196
//   TxnId -> status tablet id + isolation level
197
// Reverse index by txn id
198
//   TxnId + HybridTime -> Main intent data key
199
//
200
// Where prefix is just a single byte prefix. TxnId, IntentType, HybridTime all prefixed with
201
// appropriate value type.
202
1.26M
CHECKED_STATUS TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) {
203
3.53k
  VLOG(4) << "PrepareTransactionWriteBatch(), write_id = " << write_id_;
204
205
1.26M
  row_mark_ = GetRowMarkTypeFromPB(put_batch_);
206
1.26M
  handler_ = handler;
207
208
1.26M
  if (metadata_to_store_) {
209
967k
    auto txn_value_type = ValueTypeAsChar::kTransactionId;
210
967k
    std::array<Slice, 2> key = {
211
967k
      Slice(&txn_value_type, 1),
212
967k
      transaction_id_.AsSlice(),
213
967k
    };
214
967k
    auto data_copy = *metadata_to_store_;
215
    // We use hybrid time only for backward compatibility, actually wall time is required.
216
967k
    data_copy.set_metadata_write_time(GetCurrentTimeMicros());
217
967k
    auto value = data_copy.SerializeAsString();
218
967k
    Slice value_slice(value);
219
967k
    handler->Put(key, SliceParts(&value_slice, 1));
220
967k
  }
221
222
1.26M
  subtransaction_id_ = put_batch_.has_subtransaction()
223
267
      ? put_batch_.subtransaction().subtransaction_id()
224
1.26M
      : kMinSubTransactionId;
225
226
1.26M
  if (!put_batch_.write_pairs().empty()) {
227
980k
    if (IsValidRowMarkType(row_mark_)) {
228
0
      LOG(WARNING) << "Performing a write with row lock " << RowMarkType_Name(row_mark_)
229
0
                   << " when only reads are expected";
230
0
    }
231
980k
    strong_intent_types_ = GetStrongIntentTypeSet(
232
980k
        isolation_level_, OperationKind::kWrite, row_mark_);
233
234
    // We cannot recover from failures here, because it means that we cannot apply replicated
235
    // operation.
236
980k
    RETURN_NOT_OK(EnumerateIntents(
237
980k
        put_batch_.write_pairs(), std::ref(*this), partial_range_key_intents_));
238
980k
  }
239
240
1.26M
  if (!put_batch_.read_pairs().empty()) {
241
368k
    strong_intent_types_ = GetStrongIntentTypeSet(
242
368k
        isolation_level_, OperationKind::kRead, row_mark_);
243
368k
    RETURN_NOT_OK(EnumerateIntents(
244
368k
        put_batch_.read_pairs(), std::ref(*this), partial_range_key_intents_));
245
368k
  }
246
247
1.26M
  return Finish();
248
1.26M
}
249
250
// Using operator() to pass this object conveniently to EnumerateIntents.
251
CHECKED_STATUS TransactionalWriter::operator()(
252
    IntentStrength intent_strength, FullDocKey, Slice value_slice, KeyBytes* key,
253
60.3M
    LastKey last_key) {
254
60.3M
  if (intent_strength == IntentStrength::kWeak) {
255
39.9M
    weak_intents_[key->data()] |= StrongToWeak(strong_intent_types_);
256
39.9M
    return Status::OK();
257
39.9M
  }
258
259
20.4M
  const auto transaction_value_type = ValueTypeAsChar::kTransactionId;
260
20.4M
  const auto write_id_value_type = ValueTypeAsChar::kWriteId;
261
20.4M
  const auto row_lock_value_type = ValueTypeAsChar::kRowLock;
262
20.4M
  IntraTxnWriteId big_endian_write_id = BigEndian::FromHost32(intra_txn_write_id_);
263
264
20.4M
  const auto subtransaction_value_type = ValueTypeAsChar::kSubTransactionId;
265
20.4M
  SubTransactionId big_endian_subtxn_id;
266
20.4M
  Slice subtransaction_marker;
267
20.4M
  Slice subtransaction_id;
268
20.4M
  if (subtransaction_id_ > kMinSubTransactionId) {
269
1.52k
    subtransaction_marker = Slice(&subtransaction_value_type, 1);
270
1.52k
    big_endian_subtxn_id = BigEndian::FromHost32(subtransaction_id_);
271
1.52k
    subtransaction_id = Slice::FromPod(&big_endian_subtxn_id);
272
20.4M
  } else {
273
20.4M
    DCHECK_EQ(subtransaction_id_, kMinSubTransactionId);
274
20.4M
  }
275
276
20.4M
  std::array<Slice, 7> value = {{
277
20.4M
      Slice(&transaction_value_type, 1),
278
20.4M
      transaction_id_.AsSlice(),
279
20.4M
      subtransaction_marker,
280
20.4M
      subtransaction_id,
281
20.4M
      Slice(&write_id_value_type, 1),
282
20.4M
      Slice::FromPod(&big_endian_write_id),
283
20.4M
      value_slice,
284
20.4M
  }};
285
  // Store a row lock indicator rather than data (in value_slice) for row lock intents.
286
20.4M
  if (IsValidRowMarkType(row_mark_)) {
287
227k
    value.back() = Slice(&row_lock_value_type, 1);
288
227k
  }
289
290
20.4M
  ++intra_txn_write_id_;
291
292
20.4M
  char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet,
293
20.4M
                          static_cast<char>(strong_intent_types_.ToUIntPtr()) };
294
295
20.4M
  DocHybridTimeBuffer doc_ht_buffer;
296
297
20.4M
  constexpr size_t kNumKeyParts = 3;
298
20.4M
  std::array<Slice, kNumKeyParts> key_parts = {{
299
20.4M
      key->AsSlice(),
300
20.4M
      Slice(intent_type, 2),
301
20.4M
      doc_ht_buffer.EncodeWithValueType(hybrid_time_, write_id_++),
302
20.4M
  }};
303
304
20.4M
  Slice reverse_value_prefix;
305
20.4M
  if (last_key && FLAGS_enable_transaction_sealing) {
306
0
    reverse_value_prefix = replicated_batches_state_;
307
0
  }
308
20.4M
  AddIntent<kNumKeyParts>(transaction_id_, key_parts, value, handler_, reverse_value_prefix);
309
310
20.4M
  return Status::OK();
311
20.4M
}
312
313
1.26M
CHECKED_STATUS TransactionalWriter::Finish() {
314
1.26M
  char transaction_id_value_type = ValueTypeAsChar::kTransactionId;
315
316
1.26M
  DocHybridTimeBuffer doc_ht_buffer;
317
318
1.26M
  std::array<Slice, 2> value = {{
319
1.26M
      Slice(&transaction_id_value_type, 1),
320
1.26M
      transaction_id_.AsSlice(),
321
1.26M
  }};
322
323
1.26M
  if (PREDICT_FALSE(FLAGS_TEST_docdb_sort_weak_intents)) {
324
    // This is done in tests when deterministic DocDB state is required.
325
0
    std::vector<std::pair<KeyBuffer, IntentTypeSet>> intents_and_types(
326
0
        weak_intents_.begin(), weak_intents_.end());
327
0
    sort(intents_and_types.begin(), intents_and_types.end());
328
0
    for (const auto& intent_and_types : intents_and_types) {
329
0
      RETURN_NOT_OK(AddWeakIntent(intent_and_types, value, &doc_ht_buffer));
330
0
    }
331
0
    return Status::OK();
332
1.26M
  }
333
334
7.49M
  for (const auto& intent_and_types : weak_intents_) {
335
7.49M
    RETURN_NOT_OK(AddWeakIntent(intent_and_types, value, &doc_ht_buffer));
336
7.49M
  }
337
338
1.26M
  return Status::OK();
339
1.26M
}
340
341
CHECKED_STATUS TransactionalWriter::AddWeakIntent(
342
    const std::pair<KeyBuffer, IntentTypeSet>& intent_and_types,
343
    const std::array<Slice, 2>& value,
344
7.49M
    DocHybridTimeBuffer* doc_ht_buffer) {
345
7.49M
  char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet,
346
7.49M
                          static_cast<char>(intent_and_types.second.ToUIntPtr()) };
347
7.49M
  constexpr size_t kNumKeyParts = 3;
348
7.49M
  std::array<Slice, kNumKeyParts> key = {{
349
7.49M
      intent_and_types.first.AsSlice(),
350
7.49M
      Slice(intent_type, 2),
351
7.49M
      doc_ht_buffer->EncodeWithValueType(hybrid_time_, write_id_++),
352
7.49M
  }};
353
354
7.49M
  AddIntent<kNumKeyParts>(transaction_id_, key, value, handler_);
355
356
7.49M
  return Status::OK();
357
7.49M
}
358
359
92.5M
DocHybridTimeBuffer::DocHybridTimeBuffer() {
360
92.5M
  buffer_[0] = ValueTypeAsChar::kHybridTime;
361
92.5M
}
362
363
IntentsWriterContext::IntentsWriterContext(const TransactionId& transaction_id)
364
    : transaction_id_(transaction_id),
365
1.70M
      left_records_(FLAGS_txn_max_apply_batch_records) {
366
1.70M
}
367
368
IntentsWriter::IntentsWriter(const Slice& start_key,
369
                             rocksdb::DB* intents_db,
370
                             IntentsWriterContext* context)
371
1.70M
    : start_key_(start_key), intents_db_(intents_db), context_(*context) {
372
1.70M
}
373
374
1.70M
CHECKED_STATUS IntentsWriter::Apply(rocksdb::DirectWriteHandler* handler) {
375
1.70M
  KeyBytes txn_reverse_index_prefix;
376
1.70M
  AppendTransactionKeyPrefix(context_.transaction_id(), &txn_reverse_index_prefix);
377
1.70M
  txn_reverse_index_prefix.AppendValueType(ValueType::kMaxByte);
378
1.70M
  Slice key_prefix = txn_reverse_index_prefix.AsSlice();
379
1.70M
  key_prefix.remove_suffix(1);
380
1.70M
  const Slice reverse_index_upperbound = txn_reverse_index_prefix.AsSlice();
381
382
1.70M
  auto reverse_index_iter = CreateRocksDBIterator(
383
1.70M
      intents_db_, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
384
1.70M
      rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound);
385
386
1.70M
  DocHybridTimeBuffer doc_ht_buffer;
387
388
1.70M
  reverse_index_iter.Seek(start_key_.empty() ? key_prefix : start_key_);
389
390
1.70M
  context_.Start(
391
18.4E
      reverse_index_iter.Valid() ? boost::make_optional(reverse_index_iter.key()) : boost::none);
392
393
57.2M
  while (reverse_index_iter.Valid()) {
394
55.5M
    const Slice key_slice(reverse_index_iter.key());
395
396
55.5M
    if (!key_slice.starts_with(key_prefix)) {
397
0
      break;
398
0
    }
399
400
55.5M
    auto reverse_index_value = reverse_index_iter.value();
401
402
55.5M
    bool metadata = key_slice.size() == 1 + TransactionId::StaticSize();
403
    // At this point, txn_reverse_index_prefix is a prefix of key_slice. If key_slice is equal to
404
    // txn_reverse_index_prefix in size, then they are identical, and we are seeked to transaction
405
    // metadata. Otherwise, we're seeked to an intent entry in the index which we may process.
406
55.5M
    if (!metadata) {
407
53.8M
      if (!reverse_index_value.empty() && reverse_index_value[0] == ValueTypeAsChar::kBitSet) {
408
0
        CHECK(!FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record);
409
0
        reverse_index_value.remove_prefix(1);
410
0
        RETURN_NOT_OK(OneWayBitmap::Skip(&reverse_index_value));
411
0
      }
412
53.8M
    }
413
414
55.5M
    if (VERIFY_RESULT(context_.Entry(key_slice, reverse_index_value, metadata, handler))) {
415
41
      return Status::OK();
416
41
    }
417
418
55.5M
    reverse_index_iter.Next();
419
55.5M
  }
420
421
1.70M
  context_.Complete(handler);
422
423
1.70M
  return Status::OK();
424
1.70M
}
425
426
ApplyIntentsContext::ApplyIntentsContext(
427
    const TransactionId& transaction_id,
428
    const ApplyTransactionState* apply_state,
429
    const AbortedSubTransactionSet& aborted,
430
    HybridTime commit_ht,
431
    HybridTime log_ht,
432
    const KeyBounds* key_bounds,
433
    rocksdb::DB* intents_db)
434
    : IntentsWriterContext(transaction_id),
435
      apply_state_(apply_state),
436
      // In case we have passed in a non-null apply_state, it's aborted set will have been loaded
437
      // from persisted apply state, and the passed in aborted set will correspond to the aborted
438
      // set at commit time. Rather then copy that set upstream so it is passed in as aborted, we
439
      // simply grab a reference to it here, if it is defined, to use in this method.
440
      aborted_(apply_state ? apply_state->aborted : aborted),
441
      commit_ht_(commit_ht),
442
      log_ht_(log_ht),
443
      write_id_(apply_state ? apply_state->write_id : 0),
444
      key_bounds_(key_bounds),
445
      intent_iter_(CreateRocksDBIterator(
446
          intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
447
737k
          rocksdb::kDefaultQueryId)) {
448
737k
}
449
450
Result<bool> ApplyIntentsContext::StoreApplyState(
451
1
    const Slice& key, rocksdb::DirectWriteHandler* handler) {
452
1
  SetApplyState(key, write_id_, aborted_);
453
1
  ApplyTransactionStatePB pb;
454
1
  apply_state().ToPB(&pb);
455
1
  pb.set_commit_ht(commit_ht_.ToUint64());
456
1
  faststring encoded_pb;
457
1
  pb_util::SerializeToString(pb, &encoded_pb);
458
1
  char string_value_type = ValueTypeAsChar::kString;
459
1
  std::array<Slice, 2> value_parts = {{
460
1
    Slice(&string_value_type, 1),
461
1
    Slice(encoded_pb.data(), encoded_pb.size())
462
1
  }};
463
1
  PutApplyState(transaction_id().AsSlice(), commit_ht_, write_id_, value_parts, handler);
464
1
  return true;
465
1
}
466
467
737k
void ApplyIntentsContext::Start(const boost::optional<Slice>& first_key) {
468
737k
  if (!apply_state_) {
469
737k
    return;
470
737k
  }
471
  // This sanity check is invalid for remove case, because .SST file could be deleted.
472
18.4E
  LOG_IF(DFATAL, !first_key || *first_key != apply_state_->key)
473
18.4E
      << "Continue from wrong key: " << Slice(apply_state_->key).ToDebugString() << ", txn: "
474
18.4E
      << transaction_id() << ", position: "
475
18.4E
      << (first_key ? first_key->ToDebugString() : "<INVALID>")
476
18.4E
      << ", write id: " << apply_state_->write_id;
477
18.4E
}
478
479
Result<bool> ApplyIntentsContext::Entry(
480
26.8M
    const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler* handler) {
481
  // Value of reverse index is a key of original intent record, so seek it and check match.
482
26.8M
  if (metadata || !IsWithinBounds(key_bounds_, value)) {
483
749k
    return false;
484
749k
  }
485
486
  // We store apply state only if there are some more intents left.
487
  // So doing this check here, instead of right after write_id was incremented.
488
26.1M
  if (reached_records_limit()) {
489
1
    return StoreApplyState(key, handler);
490
1
  }
491
492
26.1M
  DocHybridTimeBuffer doc_ht_buffer;
493
26.1M
  intent_iter_.Seek(value);
494
26.1M
  if (!intent_iter_.Valid() || intent_iter_.key() != value) {
495
0
    Slice temp_slice = value;
496
0
    auto value_doc_ht = DocHybridTime::DecodeFromEnd(&temp_slice);
497
0
    temp_slice = key;
498
0
    auto key_doc_ht = DocHybridTime::DecodeFromEnd(&temp_slice);
499
0
    LOG(DFATAL) << "Unable to find intent: " << value.ToDebugHexString() << " ("
500
0
                << value_doc_ht << ") for " << key.ToDebugHexString() << "(" << key_doc_ht << ")";
501
0
    return false;
502
0
  }
503
504
26.1M
  auto intent = VERIFY_RESULT(ParseIntentKey(value, transaction_id().AsSlice()));
505
506
26.1M
  if (intent.types.Test(IntentType::kStrongWrite)) {
507
16.8M
    const Slice transaction_id_slice = transaction_id().AsSlice();
508
16.8M
    auto decoded_value = VERIFY_RESULT(DecodeIntentValue(
509
16.8M
        intent_iter_.value(), &transaction_id_slice));
510
511
    // Write id should match to one that were calculated during append of intents.
512
    // Doing it just for sanity check.
513
16.8M
    RSTATUS_DCHECK_GE(
514
16.8M
        decoded_value.write_id, write_id_,
515
16.8M
        Corruption,
516
16.8M
        Format("Unexpected write id. Expected: $0, found: $1, raw value: $2",
517
16.8M
               write_id_,
518
16.8M
               decoded_value.write_id,
519
16.8M
               intent_iter_.value().ToDebugHexString()));
520
16.8M
    write_id_ = decoded_value.write_id;
521
522
    // Intents for row locks should be ignored (i.e. should not be written as regular records).
523
16.8M
    if (decoded_value.body.starts_with(ValueTypeAsChar::kRowLock)) {
524
4.19k
      return false;
525
4.19k
    }
526
527
    // Intents from aborted subtransactions should not be written as regular records.
528
16.8M
    if (aborted_.Test(decoded_value.subtransaction_id)) {
529
615
      return false;
530
615
    }
531
532
    // After strip of prefix and suffix intent_key contains just SubDocKey w/o a hybrid time.
533
    // Time will be added when writing batch to RocksDB.
534
16.8M
    std::array<Slice, 2> key_parts = {{
535
16.8M
        intent.doc_path,
536
16.8M
        doc_ht_buffer.EncodeWithValueType(commit_ht_, write_id_),
537
16.8M
    }};
538
16.8M
    std::array<Slice, 2> value_parts = {{
539
16.8M
        intent.doc_ht,
540
16.8M
        decoded_value.body,
541
16.8M
    }};
542
543
    // Useful when debugging transaction failure.
544
#if defined(DUMP_APPLY)
545
    SubDocKey sub_doc_key;
546
    CHECK_OK(sub_doc_key.FullyDecodeFrom(intent.doc_path, HybridTimeRequired::kFalse));
547
    if (!sub_doc_key.subkeys().empty()) {
548
      auto txn_id = FullyDecodeTransactionId(transaction_id_slice);
549
      LOG(INFO) << "Apply: " << sub_doc_key.ToString()
550
                << ", time: " << commit_ht << ", write id: " << *write_id << ", txn: " << txn_id
551
                << ", value: " << intent_value.ToDebugString();
552
    }
553
#endif
554
555
16.8M
    handler->Put(key_parts, value_parts);
556
16.8M
    ++write_id_;
557
16.8M
    RegisterRecord();
558
16.8M
  }
559
560
26.1M
  return false;
561
26.1M
}
562
563
737k
void ApplyIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) {
564
737k
  if (apply_state_) {
565
1
    char tombstone_value_type = ValueTypeAsChar::kTombstone;
566
1
    std::array<Slice, 1> value_parts = {{Slice(&tombstone_value_type, 1)}};
567
1
    PutApplyState(transaction_id().AsSlice(), commit_ht_, write_id_, value_parts, handler);
568
1
  }
569
737k
}
570
571
RemoveIntentsContext::RemoveIntentsContext(const TransactionId& transaction_id)
572
968k
    : IntentsWriterContext(transaction_id) {
573
968k
}
574
575
Result<bool> RemoveIntentsContext::Entry(
576
28.7M
    const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler* handler) {
577
28.7M
  if (reached_records_limit()) {
578
40
    SetApplyState(key, 0, AbortedSubTransactionSet());
579
40
    return true;
580
40
  }
581
582
28.7M
  handler->SingleDelete(key);
583
28.7M
  RegisterRecord();
584
28.7M
  if (!metadata) {
585
27.7M
    handler->SingleDelete(value);
586
27.7M
    RegisterRecord();
587
27.7M
  }
588
28.7M
  return false;
589
28.7M
}
590
591
968k
void RemoveIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) {
592
968k
}
593
594
} // namespace docdb
595
} // namespace yb