YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/rocksdb_writer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/rocksdb_writer.h"
15
16
#include "yb/common/row_mark.h"
17
18
#include "yb/docdb/conflict_resolution.h"
19
#include "yb/docdb/doc_key.h"
20
#include "yb/docdb/doc_kv_util.h"
21
#include "yb/docdb/docdb.pb.h"
22
#include "yb/docdb/docdb_rocksdb_util.h"
23
#include "yb/docdb/intent.h"
24
#include "yb/docdb/value_type.h"
25
26
#include "yb/gutil/walltime.h"
27
28
#include "yb/util/bitmap.h"
29
#include "yb/util/flag_tags.h"
30
#include "yb/util/pb_util.h"
31
32
DEFINE_bool(enable_transaction_sealing, false,
33
            "Whether transaction sealing is enabled.");
34
DEFINE_int32(txn_max_apply_batch_records, 100000,
35
             "Max number of apply records allowed in single RocksDB batch. "
36
             "When a transaction's data in one tablet does not fit into specified number of "
37
             "records, it will be applied using multiple RocksDB write batches.");
38
39
DEFINE_test_flag(bool, docdb_sort_weak_intents, false,
40
                "Sort weak intents to make their order deterministic.");
41
DEFINE_test_flag(bool, fail_on_replicated_batch_idx_set_in_txn_record, false,
42
                 "Fail when a set of replicated batch indexes is found in txn record.");
43
44
namespace yb {
45
namespace docdb {
46
47
namespace {
48
49
// Slice parts with the number of slices fixed at compile time.
50
template <int N>
51
struct FixedSliceParts {
52
86.6M
  FixedSliceParts(const std::array<Slice, N>& input) : parts(input.data()) { // NOLINT
53
86.6M
  }
54
55
173M
  operator SliceParts() const {
56
173M
    return SliceParts(parts, N);
57
173M
  }
58
59
  const Slice* parts;
60
};
61
62
// Main intent data:
63
// Prefix + DocPath + IntentType + DocHybridTime -> TxnId + value of the intent
64
// Reverse index by txn id:
65
// Prefix + TxnId + DocHybridTime -> Main intent data key
66
//
67
// Expects that the last entry of key is DocHybridTime.
68
template <int N>
69
void AddIntent(
70
    const TransactionId& transaction_id,
71
    const FixedSliceParts<N>& key,
72
    const SliceParts& value,
73
    rocksdb::DirectWriteHandler* handler,
74
86.6M
    Slice reverse_value_prefix = Slice()) {
75
86.6M
  char reverse_key_prefix[1] = { ValueTypeAsChar::kTransactionId };
76
86.6M
  DocHybridTimeWordBuffer doc_ht_buffer;
77
86.6M
  auto doc_ht_slice = InvertEncodedDocHT(key.parts[N - 1], &doc_ht_buffer);
78
79
86.6M
  std::array<Slice, 3> reverse_key = {{
80
86.6M
      Slice(reverse_key_prefix, sizeof(reverse_key_prefix)),
81
86.6M
      transaction_id.AsSlice(),
82
86.6M
      doc_ht_slice,
83
86.6M
  }};
84
86.6M
  handler->Put(key, value);
85
86.6M
  if (reverse_value_prefix.empty()) {
86
86.6M
    handler->Put(reverse_key, key);
87
86.6M
  } else {
88
38.2k
    std::array<Slice, N + 1> reverse_value;
89
38.2k
    reverse_value[0] = reverse_value_prefix;
90
38.2k
    memcpy(&reverse_value[1], key.parts, sizeof(*key.parts) * N);
91
38.2k
    handler->Put(reverse_key, reverse_value);
92
38.2k
  }
93
86.6M
}
94
95
template <size_t N>
96
void PutApplyState(
97
    const Slice& transaction_id_slice, HybridTime commit_ht, IntraTxnWriteId write_id,
98
243
    const std::array<Slice, N>& value_parts, rocksdb::DirectWriteHandler* handler) {
99
243
  char transaction_apply_state_value_type = ValueTypeAsChar::kTransactionApplyState;
100
243
  char group_end_value_type = ValueTypeAsChar::kGroupEnd;
101
243
  char hybrid_time_value_type = ValueTypeAsChar::kHybridTime;
102
243
  DocHybridTime doc_hybrid_time(commit_ht, write_id);
103
243
  char doc_hybrid_time_buffer[kMaxBytesPerEncodedHybridTime];
104
243
  char* doc_hybrid_time_buffer_end = doc_hybrid_time.EncodedInDocDbFormat(
105
243
      doc_hybrid_time_buffer);
106
243
  std::array<Slice, 5> key_parts = {{
107
243
      Slice(&transaction_apply_state_value_type, 1),
108
243
      transaction_id_slice,
109
243
      Slice(&group_end_value_type, 1),
110
243
      Slice(&hybrid_time_value_type, 1),
111
243
      Slice(doc_hybrid_time_buffer, doc_hybrid_time_buffer_end),
112
243
  }};
113
243
  handler->Put(key_parts, value_parts);
114
243
}
rocksdb_writer.cc:void yb::docdb::(anonymous namespace)::PutApplyState<2ul>(yb::Slice const&, yb::HybridTime, unsigned int, std::__1::array<yb::Slice, 2ul> const&, rocksdb::DirectWriteHandler*)
Line
Count
Source
98
183
    const std::array<Slice, N>& value_parts, rocksdb::DirectWriteHandler* handler) {
99
183
  char transaction_apply_state_value_type = ValueTypeAsChar::kTransactionApplyState;
100
183
  char group_end_value_type = ValueTypeAsChar::kGroupEnd;
101
183
  char hybrid_time_value_type = ValueTypeAsChar::kHybridTime;
102
183
  DocHybridTime doc_hybrid_time(commit_ht, write_id);
103
183
  char doc_hybrid_time_buffer[kMaxBytesPerEncodedHybridTime];
104
183
  char* doc_hybrid_time_buffer_end = doc_hybrid_time.EncodedInDocDbFormat(
105
183
      doc_hybrid_time_buffer);
106
183
  std::array<Slice, 5> key_parts = {{
107
183
      Slice(&transaction_apply_state_value_type, 1),
108
183
      transaction_id_slice,
109
183
      Slice(&group_end_value_type, 1),
110
183
      Slice(&hybrid_time_value_type, 1),
111
183
      Slice(doc_hybrid_time_buffer, doc_hybrid_time_buffer_end),
112
183
  }};
113
183
  handler->Put(key_parts, value_parts);
114
183
}
rocksdb_writer.cc:void yb::docdb::(anonymous namespace)::PutApplyState<1ul>(yb::Slice const&, yb::HybridTime, unsigned int, std::__1::array<yb::Slice, 1ul> const&, rocksdb::DirectWriteHandler*)
Line
Count
Source
98
60
    const std::array<Slice, N>& value_parts, rocksdb::DirectWriteHandler* handler) {
99
60
  char transaction_apply_state_value_type = ValueTypeAsChar::kTransactionApplyState;
100
60
  char group_end_value_type = ValueTypeAsChar::kGroupEnd;
101
60
  char hybrid_time_value_type = ValueTypeAsChar::kHybridTime;
102
60
  DocHybridTime doc_hybrid_time(commit_ht, write_id);
103
60
  char doc_hybrid_time_buffer[kMaxBytesPerEncodedHybridTime];
104
60
  char* doc_hybrid_time_buffer_end = doc_hybrid_time.EncodedInDocDbFormat(
105
60
      doc_hybrid_time_buffer);
106
60
  std::array<Slice, 5> key_parts = {{
107
60
      Slice(&transaction_apply_state_value_type, 1),
108
60
      transaction_id_slice,
109
60
      Slice(&group_end_value_type, 1),
110
60
      Slice(&hybrid_time_value_type, 1),
111
60
      Slice(doc_hybrid_time_buffer, doc_hybrid_time_buffer_end),
112
60
  }};
113
60
  handler->Put(key_parts, value_parts);
114
60
}
115
116
} // namespace
117
118
NonTransactionalWriter::NonTransactionalWriter(
119
    std::reference_wrapper<const docdb::KeyValueWriteBatchPB> put_batch, HybridTime hybrid_time)
120
6.34M
    : put_batch_(put_batch), hybrid_time_(hybrid_time) {
121
6.34M
}
122
123
0
bool NonTransactionalWriter::Empty() const {
124
0
  return put_batch_.write_pairs().empty();
125
0
}
126
127
6.34M
Status NonTransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) {
128
6.34M
  DocHybridTimeBuffer doc_ht_buffer;
129
130
6.34M
  int write_id = 0;
131
125M
  for (const auto& kv_pair : put_batch_.write_pairs()) {
132
133
125M
    CHECK(!kv_pair.key().empty());
134
125M
    CHECK(!kv_pair.value().empty());
135
136
125M
    if (kv_pair.key()[0] == ValueTypeAsChar::kExternalTransactionId) {
137
0
      continue;
138
0
    }
139
140
125M
#ifndef NDEBUG
141
    // Debug-only: ensure all keys we get in Raft replication can be decoded.
142
125M
    SubDocKey subdoc_key;
143
125M
    Status s = subdoc_key.FullyDecodeFromKeyWithOptionalHybridTime(kv_pair.key());
144
18.4E
    CHECK(s.ok())
145
18.4E
        << "Failed decoding key: " << s.ToString() << "; "
146
18.4E
        << "Problematic key: " << BestEffortDocDBKeyToStr(KeyBytes(kv_pair.key())) << "\n"
147
18.4E
        << "value: " << FormatBytesAsStr(kv_pair.value());
148
125M
#endif
149
150
    // We replicate encoded SubDocKeys without a HybridTime at the end, and only append it here.
151
    // The reason for this is that the HybridTime timestamp is only picked at the time of
152
    // appending an entry to the tablet's Raft log. Also this is a good way to save network
153
    // bandwidth.
154
    //
155
    // "Write id" is the final component of our HybridTime encoding (or, to be more precise,
156
    // DocHybridTime encoding) that helps disambiguate between different updates to the
157
    // same key (row/column) within a transaction. We set it based on the position of the write
158
    // operation in its write batch.
159
160
125M
    auto hybrid_time = kv_pair.has_external_hybrid_time() ?
161
125M
        
HybridTime(kv_pair.external_hybrid_time())0
: hybrid_time_;
162
125M
    std::array<Slice, 2> key_parts = {{
163
125M
        Slice(kv_pair.key()),
164
125M
        doc_ht_buffer.EncodeWithValueType(hybrid_time, write_id),
165
125M
    }};
166
125M
    Slice key_value = kv_pair.value();
167
125M
    handler->Put(key_parts, SliceParts(&key_value, 1));
168
169
125M
    ++write_id;
170
125M
  }
171
172
6.34M
  return Status::OK();
173
6.34M
}
174
175
TransactionalWriter::TransactionalWriter(
176
    std::reference_wrapper<const docdb::KeyValueWriteBatchPB> put_batch,
177
    HybridTime hybrid_time,
178
    const TransactionId& transaction_id,
179
    IsolationLevel isolation_level,
180
    PartialRangeKeyIntents partial_range_key_intents,
181
    const Slice& replicated_batches_state,
182
    IntraTxnWriteId intra_txn_write_id)
183
    : put_batch_(put_batch),
184
      hybrid_time_(hybrid_time),
185
      transaction_id_(transaction_id),
186
      isolation_level_(isolation_level),
187
      partial_range_key_intents_(partial_range_key_intents),
188
      replicated_batches_state_(replicated_batches_state),
189
2.31M
      intra_txn_write_id_(intra_txn_write_id) {
190
2.31M
}
191
192
// We have the following distinct types of data in this "intent store":
193
// Main intent data:
194
//   Prefix + SubDocKey (no HybridTime) + IntentType + HybridTime -> TxnId + value of the intent
195
// Transaction metadata:
196
//   TxnId -> status tablet id + isolation level
197
// Reverse index by txn id:
198
//   TxnId + HybridTime -> Main intent data key
199
//
200
// Where prefix is just a single byte prefix. TxnId, IntentType, HybridTime all prefixed with
201
// appropriate value type.
202
2.30M
CHECKED_STATUS TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) {
203
2.30M
  VLOG
(4) << "PrepareTransactionWriteBatch(), write_id = " << write_id_4.30k
;
204
205
2.30M
  row_mark_ = GetRowMarkTypeFromPB(put_batch_);
206
2.30M
  handler_ = handler;
207
208
2.30M
  if (metadata_to_store_) {
209
1.66M
    auto txn_value_type = ValueTypeAsChar::kTransactionId;
210
1.66M
    std::array<Slice, 2> key = {
211
1.66M
      Slice(&txn_value_type, 1),
212
1.66M
      transaction_id_.AsSlice(),
213
1.66M
    };
214
1.66M
    auto data_copy = *metadata_to_store_;
215
    // We use hybrid time only for backward compatibility; actually wall time is required.
216
1.66M
    data_copy.set_metadata_write_time(GetCurrentTimeMicros());
217
1.66M
    auto value = data_copy.SerializeAsString();
218
1.66M
    Slice value_slice(value);
219
1.66M
    handler->Put(key, SliceParts(&value_slice, 1));
220
1.66M
  }
221
222
2.30M
  subtransaction_id_ = put_batch_.has_subtransaction()
223
2.30M
      ? 
put_batch_.subtransaction().subtransaction_id()9.27k
224
2.30M
      : 
kMinSubTransactionId2.30M
;
225
226
2.30M
  if (!put_batch_.write_pairs().empty()) {
227
1.66M
    if (IsValidRowMarkType(row_mark_)) {
228
0
      LOG(WARNING) << "Performing a write with row lock " << RowMarkType_Name(row_mark_)
229
0
                   << " when only reads are expected";
230
0
    }
231
1.66M
    strong_intent_types_ = GetStrongIntentTypeSet(
232
1.66M
        isolation_level_, OperationKind::kWrite, row_mark_);
233
234
    // We cannot recover from failures here, because it means that we cannot apply a replicated
235
    // operation.
236
1.66M
    RETURN_NOT_OK(EnumerateIntents(
237
1.66M
        put_batch_.write_pairs(), std::ref(*this), partial_range_key_intents_));
238
1.66M
  }
239
240
2.30M
  if (!put_batch_.read_pairs().empty()) {
241
852k
    strong_intent_types_ = GetStrongIntentTypeSet(
242
852k
        isolation_level_, OperationKind::kRead, row_mark_);
243
852k
    RETURN_NOT_OK(EnumerateIntents(
244
852k
        put_batch_.read_pairs(), std::ref(*this), partial_range_key_intents_));
245
852k
  }
246
247
2.30M
  return Finish();
248
2.30M
}
249
250
// Using operator() to pass this object conveniently to EnumerateIntents.
251
CHECKED_STATUS TransactionalWriter::operator()(
252
    IntentStrength intent_strength, FullDocKey, Slice value_slice, KeyBytes* key,
253
193M
    LastKey last_key) {
254
193M
  if (intent_strength == IntentStrength::kWeak) {
255
129M
    weak_intents_[key->data()] |= StrongToWeak(strong_intent_types_);
256
129M
    return Status::OK();
257
129M
  }
258
259
64.4M
  const auto transaction_value_type = ValueTypeAsChar::kTransactionId;
260
64.4M
  const auto write_id_value_type = ValueTypeAsChar::kWriteId;
261
64.4M
  const auto row_lock_value_type = ValueTypeAsChar::kRowLock;
262
64.4M
  IntraTxnWriteId big_endian_write_id = BigEndian::FromHost32(intra_txn_write_id_);
263
264
64.4M
  const auto subtransaction_value_type = ValueTypeAsChar::kSubTransactionId;
265
64.4M
  SubTransactionId big_endian_subtxn_id;
266
64.4M
  Slice subtransaction_marker;
267
64.4M
  Slice subtransaction_id;
268
64.4M
  if (subtransaction_id_ > kMinSubTransactionId) {
269
29.8k
    subtransaction_marker = Slice(&subtransaction_value_type, 1);
270
29.8k
    big_endian_subtxn_id = BigEndian::FromHost32(subtransaction_id_);
271
29.8k
    subtransaction_id = Slice::FromPod(&big_endian_subtxn_id);
272
64.4M
  } else {
273
64.4M
    DCHECK_EQ(subtransaction_id_, kMinSubTransactionId);
274
64.4M
  }
275
276
64.4M
  std::array<Slice, 7> value = {{
277
64.4M
      Slice(&transaction_value_type, 1),
278
64.4M
      transaction_id_.AsSlice(),
279
64.4M
      subtransaction_marker,
280
64.4M
      subtransaction_id,
281
64.4M
      Slice(&write_id_value_type, 1),
282
64.4M
      Slice::FromPod(&big_endian_write_id),
283
64.4M
      value_slice,
284
64.4M
  }};
285
  // Store a row lock indicator rather than data (in value_slice) for row lock intents.
286
64.4M
  if (IsValidRowMarkType(row_mark_)) {
287
242k
    value.back() = Slice(&row_lock_value_type, 1);
288
242k
  }
289
290
64.4M
  ++intra_txn_write_id_;
291
292
64.4M
  char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet,
293
64.4M
                          static_cast<char>(strong_intent_types_.ToUIntPtr()) };
294
295
64.4M
  DocHybridTimeBuffer doc_ht_buffer;
296
297
64.4M
  constexpr size_t kNumKeyParts = 3;
298
64.4M
  std::array<Slice, kNumKeyParts> key_parts = {{
299
64.4M
      key->AsSlice(),
300
64.4M
      Slice(intent_type, 2),
301
64.4M
      doc_ht_buffer.EncodeWithValueType(hybrid_time_, write_id_++),
302
64.4M
  }};
303
304
64.4M
  Slice reverse_value_prefix;
305
64.4M
  if (last_key && 
FLAGS_enable_transaction_sealing2.51M
) {
306
0
    reverse_value_prefix = replicated_batches_state_;
307
0
  }
308
64.4M
  AddIntent<kNumKeyParts>(transaction_id_, key_parts, value, handler_, reverse_value_prefix);
309
310
64.4M
  return Status::OK();
311
193M
}
312
313
2.31M
CHECKED_STATUS TransactionalWriter::Finish() {
314
2.31M
  char transaction_id_value_type = ValueTypeAsChar::kTransactionId;
315
316
2.31M
  DocHybridTimeBuffer doc_ht_buffer;
317
318
2.31M
  std::array<Slice, 2> value = {{
319
2.31M
      Slice(&transaction_id_value_type, 1),
320
2.31M
      transaction_id_.AsSlice(),
321
2.31M
  }};
322
323
2.31M
  if (PREDICT_FALSE(FLAGS_TEST_docdb_sort_weak_intents)) {
324
    // This is done in tests when deterministic DocDB state is required.
325
20
    std::vector<std::pair<KeyBuffer, IntentTypeSet>> intents_and_types(
326
20
        weak_intents_.begin(), weak_intents_.end());
327
20
    sort(intents_and_types.begin(), intents_and_types.end());
328
54
    for (const auto& intent_and_types : intents_and_types) {
329
54
      RETURN_NOT_OK(AddWeakIntent(intent_and_types, value, &doc_ht_buffer));
330
54
    }
331
20
    return Status::OK();
332
20
  }
333
334
22.0M
  
for (const auto& intent_and_types : weak_intents_)2.31M
{
335
22.0M
    RETURN_NOT_OK(AddWeakIntent(intent_and_types, value, &doc_ht_buffer));
336
22.0M
  }
337
338
2.31M
  return Status::OK();
339
2.31M
}
340
341
CHECKED_STATUS TransactionalWriter::AddWeakIntent(
342
    const std::pair<KeyBuffer, IntentTypeSet>& intent_and_types,
343
    const std::array<Slice, 2>& value,
344
22.0M
    DocHybridTimeBuffer* doc_ht_buffer) {
345
22.0M
  char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet,
346
22.0M
                          static_cast<char>(intent_and_types.second.ToUIntPtr()) };
347
22.0M
  constexpr size_t kNumKeyParts = 3;
348
22.0M
  std::array<Slice, kNumKeyParts> key = {{
349
22.0M
      intent_and_types.first.AsSlice(),
350
22.0M
      Slice(intent_type, 2),
351
22.0M
      doc_ht_buffer->EncodeWithValueType(hybrid_time_, write_id_++),
352
22.0M
  }};
353
354
22.0M
  AddIntent<kNumKeyParts>(transaction_id_, key, value, handler_);
355
356
22.0M
  return Status::OK();
357
22.0M
}
358
359
278M
DocHybridTimeBuffer::DocHybridTimeBuffer() {
360
278M
  buffer_[0] = ValueTypeAsChar::kHybridTime;
361
278M
}
362
363
IntentsWriterContext::IntentsWriterContext(const TransactionId& transaction_id)
364
    : transaction_id_(transaction_id),
365
2.97M
      left_records_(FLAGS_txn_max_apply_batch_records) {
366
2.97M
}
367
368
IntentsWriter::IntentsWriter(const Slice& start_key,
369
                             rocksdb::DB* intents_db,
370
                             IntentsWriterContext* context)
371
2.97M
    : start_key_(start_key), intents_db_(intents_db), context_(*context) {
372
2.97M
  AppendTransactionKeyPrefix(context_.transaction_id(), &txn_reverse_index_prefix_);
373
2.97M
  txn_reverse_index_prefix_.AppendValueType(ValueType::kMaxByte);
374
2.97M
  reverse_index_upperbound_ = txn_reverse_index_prefix_.AsSlice();
375
2.97M
  reverse_index_iter_ = CreateRocksDBIterator(
376
2.97M
      intents_db_, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
377
2.97M
      rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound_);
378
2.97M
}
379
380
2.97M
CHECKED_STATUS IntentsWriter::Apply(rocksdb::DirectWriteHandler* handler) {
381
2.97M
  Slice key_prefix = txn_reverse_index_prefix_.AsSlice();
382
2.97M
  key_prefix.remove_suffix(1);
383
384
2.97M
  DocHybridTimeBuffer doc_ht_buffer;
385
386
2.97M
  reverse_index_iter_.Seek(start_key_.empty() ? 
key_prefix2.96M
:
start_key_5.65k
);
387
388
2.97M
  context_.Start(
389
18.4E
      
reverse_index_iter_.Valid()2.97M
?
boost::make_optional(reverse_index_iter_.key())2.97M
: boost::none);
390
391
153M
  while (reverse_index_iter_.Valid()) {
392
150M
    const Slice key_slice(reverse_index_iter_.key());
393
394
150M
    if (!key_slice.starts_with(key_prefix)) {
395
0
      break;
396
0
    }
397
398
150M
    auto reverse_index_value = reverse_index_iter_.value();
399
400
150M
    bool metadata = key_slice.size() == 1 + TransactionId::StaticSize();
401
    // At this point, key_prefix is a prefix of key_slice. If key_slice is equal to key_prefix in
402
    // size, then they are identical, and we are positioned at the transaction metadata.
403
    // Otherwise, we're positioned at an intent entry in the index which we may process.
404
150M
    if (!metadata) {
405
147M
      if (!reverse_index_value.empty() && 
reverse_index_value[0] == ValueTypeAsChar::kBitSet147M
) {
406
0
        CHECK(!FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record);
407
0
        reverse_index_value.remove_prefix(1);
408
0
        RETURN_NOT_OK(OneWayBitmap::Skip(&reverse_index_value));
409
0
      }
410
147M
    }
411
412
150M
    if (VERIFY_RESULT(context_.Entry(key_slice, reverse_index_value, metadata, handler))) {
413
820
      return Status::OK();
414
820
    }
415
416
150M
    reverse_index_iter_.Next();
417
150M
  }
418
419
2.97M
  context_.Complete(handler);
420
421
2.97M
  return Status::OK();
422
2.97M
}
423
424
ApplyIntentsContext::ApplyIntentsContext(
425
    const TransactionId& transaction_id,
426
    const ApplyTransactionState* apply_state,
427
    const AbortedSubTransactionSet& aborted,
428
    HybridTime commit_ht,
429
    HybridTime log_ht,
430
    const KeyBounds* key_bounds,
431
    rocksdb::DB* intents_db)
432
    : IntentsWriterContext(transaction_id),
433
      apply_state_(apply_state),
434
      // In case we have passed in a non-null apply_state, its aborted set will have been loaded
435
      // from persisted apply state, and the passed in aborted set will correspond to the aborted
436
      // set at commit time. Rather than copy that set upstream so it is passed in as aborted, we
437
      // simply grab a reference to it here, if it is defined, to use in this method.
438
      aborted_(apply_state ? apply_state->aborted : aborted),
439
      commit_ht_(commit_ht),
440
      log_ht_(log_ht),
441
      write_id_(apply_state ? apply_state->write_id : 0),
442
      key_bounds_(key_bounds),
443
      intent_iter_(CreateRocksDBIterator(
444
          intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
445
1.30M
          rocksdb::kDefaultQueryId)) {
446
1.30M
}
447
448
Result<bool> ApplyIntentsContext::StoreApplyState(
449
183
    const Slice& key, rocksdb::DirectWriteHandler* handler) {
450
183
  SetApplyState(key, write_id_, aborted_);
451
183
  ApplyTransactionStatePB pb;
452
183
  apply_state().ToPB(&pb);
453
183
  pb.set_commit_ht(commit_ht_.ToUint64());
454
183
  faststring encoded_pb;
455
183
  pb_util::SerializeToString(pb, &encoded_pb);
456
183
  char string_value_type = ValueTypeAsChar::kString;
457
183
  std::array<Slice, 2> value_parts = {{
458
183
    Slice(&string_value_type, 1),
459
183
    Slice(encoded_pb.data(), encoded_pb.size())
460
183
  }};
461
183
  PutApplyState(transaction_id().AsSlice(), commit_ht_, write_id_, value_parts, handler);
462
183
  return true;
463
183
}
464
465
1.30M
void ApplyIntentsContext::Start(const boost::optional<Slice>& first_key) {
466
1.30M
  if (
!apply_state_1.30M
) {
467
1.30M
    return;
468
1.30M
  }
469
  // This sanity check is invalid for the remove case, because the .SST file could be deleted.
470
18.4E
  LOG_IF(DFATAL, !first_key || *first_key != apply_state_->key)
471
18.4E
      << "Continue from wrong key: " << Slice(apply_state_->key).ToDebugString() << ", txn: "
472
18.4E
      << transaction_id() << ", position: "
473
18.4E
      << (first_key ? 
first_key->ToDebugString()0
: "<INVALID>")
474
18.4E
      << ", write id: " << apply_state_->write_id;
475
18.4E
}
476
477
Result<bool> ApplyIntentsContext::Entry(
478
77.7M
    const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler* handler) {
479
  // Value of reverse index is a key of original intent record, so seek it and check match.
480
77.7M
  if (metadata || 
!IsWithinBounds(key_bounds_, value)76.4M
) {
481
1.30M
    return false;
482
1.30M
  }
483
484
  // We store apply state only if there are some more intents left.
485
  // So doing this check here, instead of right after write_id was incremented.
486
76.4M
  if (reached_records_limit()) {
487
183
    return StoreApplyState(key, handler);
488
183
  }
489
490
76.4M
  DocHybridTimeBuffer doc_ht_buffer;
491
76.4M
  intent_iter_.Seek(value);
492
76.5M
  if (
!intent_iter_.Valid()76.4M
|| intent_iter_.key() != value) {
493
0
    Slice temp_slice = value;
494
0
    auto value_doc_ht = DocHybridTime::DecodeFromEnd(&temp_slice);
495
0
    temp_slice = key;
496
0
    auto key_doc_ht = DocHybridTime::DecodeFromEnd(&temp_slice);
497
0
    LOG(DFATAL) << "Unable to find intent: " << value.ToDebugHexString() << " ("
498
0
                << value_doc_ht << ") for " << key.ToDebugHexString() << "(" << key_doc_ht << ")";
499
0
    return false;
500
0
  }
501
502
76.4M
  auto intent = VERIFY_RESULT(ParseIntentKey(value, transaction_id().AsSlice()));
503
504
76.4M
  if (intent.types.Test(IntentType::kStrongWrite)) {
505
52.7M
    const Slice transaction_id_slice = transaction_id().AsSlice();
506
52.7M
    auto decoded_value = VERIFY_RESULT(DecodeIntentValue(
507
52.7M
        intent_iter_.value(), &transaction_id_slice));
508
509
    // Write id should match the one that was calculated during append of intents.
510
    // Doing it just as a sanity check.
511
52.7M
    RSTATUS_DCHECK_GE(
512
52.7M
        decoded_value.write_id, write_id_,
513
52.7M
        Corruption,
514
52.7M
        Format("Unexpected write id. Expected: $0, found: $1, raw value: $2",
515
52.7M
               write_id_,
516
52.7M
               decoded_value.write_id,
517
52.7M
               intent_iter_.value().ToDebugHexString()));
518
52.7M
    write_id_ = decoded_value.write_id;
519
520
    // Intents for row locks should be ignored (i.e. should not be written as regular records).
521
52.7M
    if (decoded_value.body.starts_with(ValueTypeAsChar::kRowLock)) {
522
13.2k
      return false;
523
13.2k
    }
524
525
    // Intents from aborted subtransactions should not be written as regular records.
526
52.7M
    if (aborted_.Test(decoded_value.subtransaction_id)) {
527
2.83k
      return false;
528
2.83k
    }
529
530
    // With prefix and suffix stripped, intent.doc_path is just a SubDocKey w/o a hybrid time.
531
    // Time will be added when writing batch to RocksDB.
532
52.7M
    std::array<Slice, 2> key_parts = {{
533
52.7M
        intent.doc_path,
534
52.7M
        doc_ht_buffer.EncodeWithValueType(commit_ht_, write_id_),
535
52.7M
    }};
536
52.7M
    std::array<Slice, 2> value_parts = {{
537
52.7M
        intent.doc_ht,
538
52.7M
        decoded_value.body,
539
52.7M
    }};
540
541
    // Useful when debugging transaction failure.
542
#if defined(DUMP_APPLY)
543
    SubDocKey sub_doc_key;
544
    CHECK_OK(sub_doc_key.FullyDecodeFrom(intent.doc_path, HybridTimeRequired::kFalse));
545
    if (!sub_doc_key.subkeys().empty()) {
546
      auto txn_id = FullyDecodeTransactionId(transaction_id_slice);
547
      LOG(INFO) << "Apply: " << sub_doc_key.ToString()
548
                << ", time: " << commit_ht << ", write id: " << *write_id << ", txn: " << txn_id
549
                << ", value: " << intent_value.ToDebugString();
550
    }
551
#endif
552
553
52.7M
    handler->Put(key_parts, value_parts);
554
52.7M
    ++write_id_;
555
52.7M
    RegisterRecord();
556
52.7M
  }
557
558
76.4M
  return false;
559
76.4M
}
560
561
1.30M
void ApplyIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) {
562
1.30M
  if (apply_state_) {
563
60
    char tombstone_value_type = ValueTypeAsChar::kTombstone;
564
60
    std::array<Slice, 1> value_parts = {{Slice(&tombstone_value_type, 1)}};
565
60
    PutApplyState(transaction_id().AsSlice(), commit_ht_, write_id_, value_parts, handler);
566
60
  }
567
1.30M
}
568
569
RemoveIntentsContext::RemoveIntentsContext(const TransactionId& transaction_id)
570
1.67M
    : IntentsWriterContext(transaction_id) {
571
1.67M
}
572
573
Result<bool> RemoveIntentsContext::Entry(
574
72.9M
    const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler* handler) {
575
72.9M
  if (reached_records_limit()) {
576
637
    SetApplyState(key, 0, AbortedSubTransactionSet());
577
637
    return true;
578
637
  }
579
580
72.9M
  handler->SingleDelete(key);
581
72.9M
  RegisterRecord();
582
72.9M
  if (!metadata) {
583
71.3M
    handler->SingleDelete(value);
584
71.3M
    RegisterRecord();
585
71.3M
  }
586
72.9M
  return false;
587
72.9M
}
588
589
1.67M
void RemoveIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) {
590
1.67M
}
591
592
} // namespace docdb
593
} // namespace yb