/Users/deen/code/yugabyte-db/src/yb/docdb/rocksdb_writer.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/rocksdb_writer.h" |
15 | | |
16 | | #include "yb/common/row_mark.h" |
17 | | |
18 | | #include "yb/docdb/conflict_resolution.h" |
19 | | #include "yb/docdb/doc_key.h" |
20 | | #include "yb/docdb/doc_kv_util.h" |
21 | | #include "yb/docdb/docdb.pb.h" |
22 | | #include "yb/docdb/docdb_rocksdb_util.h" |
23 | | #include "yb/docdb/intent.h" |
24 | | #include "yb/docdb/value_type.h" |
25 | | |
26 | | #include "yb/gutil/walltime.h" |
27 | | |
28 | | #include "yb/util/bitmap.h" |
29 | | #include "yb/util/flag_tags.h" |
30 | | #include "yb/util/pb_util.h" |
31 | | |
32 | | DEFINE_bool(enable_transaction_sealing, false, |
33 | | "Whether transaction sealing is enabled."); |
34 | | DEFINE_int32(txn_max_apply_batch_records, 100000, |
35 | | "Max number of apply records allowed in single RocksDB batch. " |
36 | | "When a transaction's data in one tablet does not fit into specified number of " |
37 | | "records, it will be applied using multiple RocksDB write batches."); |
38 | | |
39 | | DEFINE_test_flag(bool, docdb_sort_weak_intents, false, |
40 | | "Sort weak intents to make their order deterministic."); |
41 | | DEFINE_test_flag(bool, fail_on_replicated_batch_idx_set_in_txn_record, false, |
42 | | "Fail when a set of replicated batch indexes is found in txn record."); |
43 | | |
44 | | namespace yb { |
45 | | namespace docdb { |
46 | | |
47 | | namespace { |
48 | | |
49 | | // Slice parts with the number of slices fixed at compile time. |
50 | | template <int N> |
51 | | struct FixedSliceParts { |
52 | 27.9M | FixedSliceParts(const std::array<Slice, N>& input) : parts(input.data()) { // NOLINT |
53 | 27.9M | } |
54 | | |
55 | 55.8M | operator SliceParts() const { |
56 | 55.8M | return SliceParts(parts, N); |
57 | 55.8M | } |
58 | | |
59 | | const Slice* parts; |
60 | | }; |
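// Illustrative usage sketch: FixedSliceParts lets a std::array<Slice, N> be handed to an API that
// expects rocksdb::SliceParts while keeping the part count N known at compile time, e.g.
//
//   std::array<Slice, 2> parts = {{ Slice("key"), Slice("suffix") }};
//   FixedSliceParts<2> fixed(parts);   // stores a pointer to the two slices
//   SliceParts generic = fixed;        // implicit conversion supplies the count N = 2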
61 | | |
62 | | // Main intent data: |
63 | | // Prefix + DocPath + IntentType + DocHybridTime -> TxnId + value of the intent |
64 | | // Reverse index by txn id: |
65 | | // Prefix + TxnId + DocHybridTime -> Main intent data key |
66 | | // |
67 | | // Expects that the last entry of the key is the DocHybridTime. |
68 | | template <int N> |
69 | | void AddIntent( |
70 | | const TransactionId& transaction_id, |
71 | | const FixedSliceParts<N>& key, |
72 | | const SliceParts& value, |
73 | | rocksdb::DirectWriteHandler* handler, |
74 | 27.9M | Slice reverse_value_prefix = Slice()) { |
75 | 27.9M | char reverse_key_prefix[1] = { ValueTypeAsChar::kTransactionId }; |
76 | 27.9M | DocHybridTimeWordBuffer doc_ht_buffer; |
77 | 27.9M | auto doc_ht_slice = InvertEncodedDocHT(key.parts[N - 1], &doc_ht_buffer); |
78 | | |
79 | 27.9M | std::array<Slice, 3> reverse_key = {{ |
80 | 27.9M | Slice(reverse_key_prefix, sizeof(reverse_key_prefix)), |
81 | 27.9M | transaction_id.AsSlice(), |
82 | 27.9M | doc_ht_slice, |
83 | 27.9M | }}; |
84 | 27.9M | handler->Put(key, value); |
85 | 27.9M | if (reverse_value_prefix.empty()) { |
86 | 27.9M | handler->Put(reverse_key, key); |
87 | 3.96k | } else { |
88 | 3.96k | std::array<Slice, N + 1> reverse_value; |
89 | 3.96k | reverse_value[0] = reverse_value_prefix; |
90 | 3.96k | memcpy(&reverse_value[1], key.parts, sizeof(*key.parts) * N); |
91 | 3.96k | handler->Put(reverse_key, reverse_value); |
92 | 3.96k | } |
93 | 27.9M | } |
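// Sketch of the two records AddIntent writes (illustrative, for the N == 3 keys used below):
//
//   main intent:    [SubDocKey][IntentTypeSet][DocHybridTime]        -> value
//   reverse index:  [kTransactionId][txn id][inverted DocHybridTime] -> main intent key
//
// When reverse_value_prefix is non-empty (transaction sealing), the reverse-index value is that
// prefix followed by the main intent key parts instead of the key alone.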
94 | | |
95 | | template <size_t N> |
96 | | void PutApplyState( |
97 | | const Slice& transaction_id_slice, HybridTime commit_ht, IntraTxnWriteId write_id, |
98 | 2 | const std::array<Slice, N>& value_parts, rocksdb::DirectWriteHandler* handler) { |
99 | 2 | char transaction_apply_state_value_type = ValueTypeAsChar::kTransactionApplyState; |
100 | 2 | char group_end_value_type = ValueTypeAsChar::kGroupEnd; |
101 | 2 | char hybrid_time_value_type = ValueTypeAsChar::kHybridTime; |
102 | 2 | DocHybridTime doc_hybrid_time(commit_ht, write_id); |
103 | 2 | char doc_hybrid_time_buffer[kMaxBytesPerEncodedHybridTime]; |
104 | 2 | char* doc_hybrid_time_buffer_end = doc_hybrid_time.EncodedInDocDbFormat( |
105 | 2 | doc_hybrid_time_buffer); |
106 | 2 | std::array<Slice, 5> key_parts = {{ |
107 | 2 | Slice(&transaction_apply_state_value_type, 1), |
108 | 2 | transaction_id_slice, |
109 | 2 | Slice(&group_end_value_type, 1), |
110 | 2 | Slice(&hybrid_time_value_type, 1), |
111 | 2 | Slice(doc_hybrid_time_buffer, doc_hybrid_time_buffer_end), |
112 | 2 | }}; |
113 | 2 | handler->Put(key_parts, value_parts); |
114 | 2 | }
115 | | |
116 | | } // namespace |
117 | | |
118 | | NonTransactionalWriter::NonTransactionalWriter( |
119 | | std::reference_wrapper<const docdb::KeyValueWriteBatchPB> put_batch, HybridTime hybrid_time) |
120 | 3.42M | : put_batch_(put_batch), hybrid_time_(hybrid_time) { |
121 | 3.42M | } |
122 | | |
123 | 0 | bool NonTransactionalWriter::Empty() const { |
124 | 0 | return put_batch_.write_pairs().empty(); |
125 | 0 | } |
126 | | |
127 | 3.42M | Status NonTransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { |
128 | 3.42M | DocHybridTimeBuffer doc_ht_buffer; |
129 | | |
130 | 3.42M | int write_id = 0; |
131 | 39.5M | for (const auto& kv_pair : put_batch_.write_pairs()) { |
132 | | |
133 | 39.5M | CHECK(!kv_pair.key().empty()); |
134 | 39.5M | CHECK(!kv_pair.value().empty()); |
135 | | |
136 | 39.5M | if (kv_pair.key()[0] == ValueTypeAsChar::kExternalTransactionId) { |
137 | 0 | continue; |
138 | 0 | } |
139 | | |
140 | 39.5M | #ifndef NDEBUG |
141 | | // Debug-only: ensure all keys we get in Raft replication can be decoded. |
142 | 39.5M | SubDocKey subdoc_key; |
143 | 39.5M | Status s = subdoc_key.FullyDecodeFromKeyWithOptionalHybridTime(kv_pair.key()); |
144 | 18.4E | CHECK(s.ok()) |
145 | 18.4E | << "Failed decoding key: " << s.ToString() << "; " |
146 | 18.4E | << "Problematic key: " << BestEffortDocDBKeyToStr(KeyBytes(kv_pair.key())) << "\n" |
147 | 18.4E | << "value: " << FormatBytesAsStr(kv_pair.value()); |
148 | 39.5M | #endif |
149 | | |
150 | | // We replicate encoded SubDocKeys without a HybridTime at the end, and only append it here. |
151 | | // The reason for this is that the HybridTime timestamp is only picked at the time of |
152 | | // appending an entry to the tablet's Raft log. Also this is a good way to save network |
153 | | // bandwidth. |
154 | | // |
155 | | // "Write id" is the final component of our HybridTime encoding (or, to be more precise, |
156 | | // DocHybridTime encoding) that helps disambiguate between different updates to the |
157 | | // same key (row/column) within a transaction. We set it based on the position of the write |
158 | | // operation in its write batch. |
159 | | |
160 | 39.5M | auto hybrid_time = kv_pair.has_external_hybrid_time() ? |
161 | 39.5M | HybridTime(kv_pair.external_hybrid_time()) : hybrid_time_; |
162 | 39.5M | std::array<Slice, 2> key_parts = {{ |
163 | 39.5M | Slice(kv_pair.key()), |
164 | 39.5M | doc_ht_buffer.EncodeWithValueType(hybrid_time, write_id), |
165 | 39.5M | }}; |
166 | 39.5M | Slice key_value = kv_pair.value(); |
167 | 39.5M | handler->Put(key_parts, SliceParts(&key_value, 1)); |
168 | | |
169 | 39.5M | ++write_id; |
170 | 39.5M | } |
171 | | |
172 | 3.42M | return Status::OK(); |
173 | 3.42M | } |
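// Rough shape of each record written above (illustrative): for the i-th write pair in the batch,
//
//   RocksDB key   = kv_pair.key() + [kHybridTime][DocHybridTime(hybrid_time, write_id = i)]
//   RocksDB value = kv_pair.value()
//
// where hybrid_time is the pair's external hybrid time when present, and the writer's hybrid time
// (picked at Raft replication) otherwise.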
174 | | |
175 | | TransactionalWriter::TransactionalWriter( |
176 | | std::reference_wrapper<const docdb::KeyValueWriteBatchPB> put_batch, |
177 | | HybridTime hybrid_time, |
178 | | const TransactionId& transaction_id, |
179 | | IsolationLevel isolation_level, |
180 | | PartialRangeKeyIntents partial_range_key_intents, |
181 | | const Slice& replicated_batches_state, |
182 | | IntraTxnWriteId intra_txn_write_id) |
183 | | : put_batch_(put_batch), |
184 | | hybrid_time_(hybrid_time), |
185 | | transaction_id_(transaction_id), |
186 | | isolation_level_(isolation_level), |
187 | | partial_range_key_intents_(partial_range_key_intents), |
188 | | replicated_batches_state_(replicated_batches_state), |
189 | 1.26M | intra_txn_write_id_(intra_txn_write_id) { |
190 | 1.26M | } |
191 | | |
192 | | // We have the following distinct types of data in this "intent store": |
193 | | // Main intent data: |
194 | | // Prefix + SubDocKey (no HybridTime) + IntentType + HybridTime -> TxnId + value of the intent |
196 | | // Transaction metadata: |
196 | | // TxnId -> status tablet id + isolation level |
198 | | // Reverse index by txn id: |
198 | | // TxnId + HybridTime -> Main intent data key |
199 | | // |
200 | | // Where the prefix is a single byte. TxnId, IntentType, and HybridTime are all prefixed with |
201 | | // the appropriate value type. |
202 | 1.26M | CHECKED_STATUS TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { |
203 | 3.53k | VLOG(4) << "PrepareTransactionWriteBatch(), write_id = " << write_id_; |
204 | | |
205 | 1.26M | row_mark_ = GetRowMarkTypeFromPB(put_batch_); |
206 | 1.26M | handler_ = handler; |
207 | | |
208 | 1.26M | if (metadata_to_store_) { |
209 | 967k | auto txn_value_type = ValueTypeAsChar::kTransactionId; |
210 | 967k | std::array<Slice, 2> key = { |
211 | 967k | Slice(&txn_value_type, 1), |
212 | 967k | transaction_id_.AsSlice(), |
213 | 967k | }; |
214 | 967k | auto data_copy = *metadata_to_store_; |
215 | | // We use hybrid time here only for backward compatibility; what is actually required is wall time. |
216 | 967k | data_copy.set_metadata_write_time(GetCurrentTimeMicros()); |
217 | 967k | auto value = data_copy.SerializeAsString(); |
218 | 967k | Slice value_slice(value); |
219 | 967k | handler->Put(key, SliceParts(&value_slice, 1)); |
220 | 967k | } |
221 | | |
222 | 1.26M | subtransaction_id_ = put_batch_.has_subtransaction() |
223 | 267 | ? put_batch_.subtransaction().subtransaction_id() |
224 | 1.26M | : kMinSubTransactionId; |
225 | | |
226 | 1.26M | if (!put_batch_.write_pairs().empty()) { |
227 | 980k | if (IsValidRowMarkType(row_mark_)) { |
228 | 0 | LOG(WARNING) << "Performing a write with row lock " << RowMarkType_Name(row_mark_) |
229 | 0 | << " when only reads are expected"; |
230 | 0 | } |
231 | 980k | strong_intent_types_ = GetStrongIntentTypeSet( |
232 | 980k | isolation_level_, OperationKind::kWrite, row_mark_); |
233 | | |
234 | | // We cannot recover from failures here, because a failure means we cannot apply a replicated |
235 | | // operation. |
236 | 980k | RETURN_NOT_OK(EnumerateIntents( |
237 | 980k | put_batch_.write_pairs(), std::ref(*this), partial_range_key_intents_)); |
238 | 980k | } |
239 | | |
240 | 1.26M | if (!put_batch_.read_pairs().empty()) { |
241 | 368k | strong_intent_types_ = GetStrongIntentTypeSet( |
242 | 368k | isolation_level_, OperationKind::kRead, row_mark_); |
243 | 368k | RETURN_NOT_OK(EnumerateIntents( |
244 | 368k | put_batch_.read_pairs(), std::ref(*this), partial_range_key_intents_)); |
245 | 368k | } |
246 | | |
247 | 1.26M | return Finish(); |
248 | 1.26M | } |
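// Rough summary of the flow above: the transaction metadata record
// ([kTransactionId][txn id] -> serialized metadata with a wall-clock write time) is written first,
// then EnumerateIntents() invokes operator() below once per intent derived from the write pairs
// (OperationKind::kWrite) and the read pairs (OperationKind::kRead), and Finish() flushes the weak
// intents accumulated along the way.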
249 | | |
250 | | // Using operator() to pass this object conveniently to EnumerateIntents. |
251 | | CHECKED_STATUS TransactionalWriter::operator()( |
252 | | IntentStrength intent_strength, FullDocKey, Slice value_slice, KeyBytes* key, |
253 | 60.3M | LastKey last_key) { |
254 | 60.3M | if (intent_strength == IntentStrength::kWeak) { |
255 | 39.9M | weak_intents_[key->data()] |= StrongToWeak(strong_intent_types_); |
256 | 39.9M | return Status::OK(); |
257 | 39.9M | } |
258 | | |
259 | 20.4M | const auto transaction_value_type = ValueTypeAsChar::kTransactionId; |
260 | 20.4M | const auto write_id_value_type = ValueTypeAsChar::kWriteId; |
261 | 20.4M | const auto row_lock_value_type = ValueTypeAsChar::kRowLock; |
262 | 20.4M | IntraTxnWriteId big_endian_write_id = BigEndian::FromHost32(intra_txn_write_id_); |
263 | | |
264 | 20.4M | const auto subtransaction_value_type = ValueTypeAsChar::kSubTransactionId; |
265 | 20.4M | SubTransactionId big_endian_subtxn_id; |
266 | 20.4M | Slice subtransaction_marker; |
267 | 20.4M | Slice subtransaction_id; |
268 | 20.4M | if (subtransaction_id_ > kMinSubTransactionId) { |
269 | 1.52k | subtransaction_marker = Slice(&subtransaction_value_type, 1); |
270 | 1.52k | big_endian_subtxn_id = BigEndian::FromHost32(subtransaction_id_); |
271 | 1.52k | subtransaction_id = Slice::FromPod(&big_endian_subtxn_id); |
272 | 20.4M | } else { |
273 | 20.4M | DCHECK_EQ(subtransaction_id_, kMinSubTransactionId); |
274 | 20.4M | } |
275 | | |
276 | 20.4M | std::array<Slice, 7> value = {{ |
277 | 20.4M | Slice(&transaction_value_type, 1), |
278 | 20.4M | transaction_id_.AsSlice(), |
279 | 20.4M | subtransaction_marker, |
280 | 20.4M | subtransaction_id, |
281 | 20.4M | Slice(&write_id_value_type, 1), |
282 | 20.4M | Slice::FromPod(&big_endian_write_id), |
283 | 20.4M | value_slice, |
284 | 20.4M | }}; |
285 | | // Store a row lock indicator rather than data (in value_slice) for row lock intents. |
286 | 20.4M | if (IsValidRowMarkType(row_mark_)) { |
287 | 227k | value.back() = Slice(&row_lock_value_type, 1); |
288 | 227k | } |
289 | | |
290 | 20.4M | ++intra_txn_write_id_; |
291 | | |
292 | 20.4M | char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet, |
293 | 20.4M | static_cast<char>(strong_intent_types_.ToUIntPtr()) }; |
294 | | |
295 | 20.4M | DocHybridTimeBuffer doc_ht_buffer; |
296 | | |
297 | 20.4M | constexpr size_t kNumKeyParts = 3; |
298 | 20.4M | std::array<Slice, kNumKeyParts> key_parts = {{ |
299 | 20.4M | key->AsSlice(), |
300 | 20.4M | Slice(intent_type, 2), |
301 | 20.4M | doc_ht_buffer.EncodeWithValueType(hybrid_time_, write_id_++), |
302 | 20.4M | }}; |
303 | | |
304 | 20.4M | Slice reverse_value_prefix; |
305 | 20.4M | if (last_key && FLAGS_enable_transaction_sealing) { |
306 | 0 | reverse_value_prefix = replicated_batches_state_; |
307 | 0 | } |
308 | 20.4M | AddIntent<kNumKeyParts>(transaction_id_, key_parts, value, handler_, reverse_value_prefix); |
309 | | |
310 | 20.4M | return Status::OK(); |
311 | 20.4M | } |
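// Illustrative layout of the strong intent written above (the subtransaction parts are empty
// slices when subtransaction_id_ == kMinSubTransactionId):
//
//   key:   [SubDocKey][kIntentTypeSet][strong intent set][kHybridTime][DocHybridTime(hybrid_time_, write_id_)]
//   value: [kTransactionId][txn id]([kSubTransactionId][subtxn id])?[kWriteId][intra-txn write id]
//          [value_slice or kRowLock]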
312 | | |
313 | 1.26M | CHECKED_STATUS TransactionalWriter::Finish() { |
314 | 1.26M | char transaction_id_value_type = ValueTypeAsChar::kTransactionId; |
315 | | |
316 | 1.26M | DocHybridTimeBuffer doc_ht_buffer; |
317 | | |
318 | 1.26M | std::array<Slice, 2> value = {{ |
319 | 1.26M | Slice(&transaction_id_value_type, 1), |
320 | 1.26M | transaction_id_.AsSlice(), |
321 | 1.26M | }}; |
322 | | |
323 | 1.26M | if (PREDICT_FALSE(FLAGS_TEST_docdb_sort_weak_intents)) { |
324 | | // This is done in tests when deterministic DocDB state is required. |
325 | 0 | std::vector<std::pair<KeyBuffer, IntentTypeSet>> intents_and_types( |
326 | 0 | weak_intents_.begin(), weak_intents_.end()); |
327 | 0 | sort(intents_and_types.begin(), intents_and_types.end()); |
328 | 0 | for (const auto& intent_and_types : intents_and_types) { |
329 | 0 | RETURN_NOT_OK(AddWeakIntent(intent_and_types, value, &doc_ht_buffer)); |
330 | 0 | } |
331 | 0 | return Status::OK(); |
332 | 1.26M | } |
333 | | |
334 | 7.49M | for (const auto& intent_and_types : weak_intents_) { |
335 | 7.49M | RETURN_NOT_OK(AddWeakIntent(intent_and_types, value, &doc_ht_buffer)); |
336 | 7.49M | } |
337 | | |
338 | 1.26M | return Status::OK(); |
339 | 1.26M | } |
340 | | |
341 | | CHECKED_STATUS TransactionalWriter::AddWeakIntent( |
342 | | const std::pair<KeyBuffer, IntentTypeSet>& intent_and_types, |
343 | | const std::array<Slice, 2>& value, |
344 | 7.49M | DocHybridTimeBuffer* doc_ht_buffer) { |
345 | 7.49M | char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet, |
346 | 7.49M | static_cast<char>(intent_and_types.second.ToUIntPtr()) }; |
347 | 7.49M | constexpr size_t kNumKeyParts = 3; |
348 | 7.49M | std::array<Slice, kNumKeyParts> key = {{ |
349 | 7.49M | intent_and_types.first.AsSlice(), |
350 | 7.49M | Slice(intent_type, 2), |
351 | 7.49M | doc_ht_buffer->EncodeWithValueType(hybrid_time_, write_id_++), |
352 | 7.49M | }}; |
353 | | |
354 | 7.49M | AddIntent<kNumKeyParts>(transaction_id_, key, value, handler_); |
355 | | |
356 | 7.49M | return Status::OK(); |
357 | 7.49M | } |
358 | | |
359 | 92.5M | DocHybridTimeBuffer::DocHybridTimeBuffer() { |
360 | 92.5M | buffer_[0] = ValueTypeAsChar::kHybridTime; |
361 | 92.5M | } |
362 | | |
363 | | IntentsWriterContext::IntentsWriterContext(const TransactionId& transaction_id) |
364 | | : transaction_id_(transaction_id), |
365 | 1.70M | left_records_(FLAGS_txn_max_apply_batch_records) { |
366 | 1.70M | } |
367 | | |
368 | | IntentsWriter::IntentsWriter(const Slice& start_key, |
369 | | rocksdb::DB* intents_db, |
370 | | IntentsWriterContext* context) |
371 | 1.70M | : start_key_(start_key), intents_db_(intents_db), context_(*context) { |
372 | 1.70M | } |
373 | | |
374 | 1.70M | CHECKED_STATUS IntentsWriter::Apply(rocksdb::DirectWriteHandler* handler) { |
375 | 1.70M | KeyBytes txn_reverse_index_prefix; |
376 | 1.70M | AppendTransactionKeyPrefix(context_.transaction_id(), &txn_reverse_index_prefix); |
377 | 1.70M | txn_reverse_index_prefix.AppendValueType(ValueType::kMaxByte); |
378 | 1.70M | Slice key_prefix = txn_reverse_index_prefix.AsSlice(); |
379 | 1.70M | key_prefix.remove_suffix(1); |
380 | 1.70M | const Slice reverse_index_upperbound = txn_reverse_index_prefix.AsSlice(); |
381 | | |
382 | 1.70M | auto reverse_index_iter = CreateRocksDBIterator( |
383 | 1.70M | intents_db_, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none, |
384 | 1.70M | rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound); |
385 | | |
386 | 1.70M | DocHybridTimeBuffer doc_ht_buffer; |
387 | | |
388 | 1.70M | reverse_index_iter.Seek(start_key_.empty() ? key_prefix : start_key_); |
389 | | |
390 | 1.70M | context_.Start( |
391 | 18.4E | reverse_index_iter.Valid() ? boost::make_optional(reverse_index_iter.key()) : boost::none); |
392 | | |
393 | 57.2M | while (reverse_index_iter.Valid()) { |
394 | 55.5M | const Slice key_slice(reverse_index_iter.key()); |
395 | | |
396 | 55.5M | if (!key_slice.starts_with(key_prefix)) { |
397 | 0 | break; |
398 | 0 | } |
399 | | |
400 | 55.5M | auto reverse_index_value = reverse_index_iter.value(); |
401 | | |
402 | 55.5M | bool metadata = key_slice.size() == 1 + TransactionId::StaticSize(); |
403 | | // At this point, txn_reverse_index_prefix is a prefix of key_slice. If key_slice has the same |
404 | | // size as txn_reverse_index_prefix, then they are identical and the iterator is positioned at |
405 | | // the transaction metadata. Otherwise, it is positioned at an intent entry in the index, which we may process. |
406 | 55.5M | if (!metadata) { |
407 | 53.8M | if (!reverse_index_value.empty() && reverse_index_value[0] == ValueTypeAsChar::kBitSet) { |
408 | 0 | CHECK(!FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record); |
409 | 0 | reverse_index_value.remove_prefix(1); |
410 | 0 | RETURN_NOT_OK(OneWayBitmap::Skip(&reverse_index_value)); |
411 | 0 | } |
412 | 53.8M | } |
413 | | |
414 | 55.5M | if (VERIFY_RESULT(context_.Entry(key_slice, reverse_index_value, metadata, handler))) { |
415 | 41 | return Status::OK(); |
416 | 41 | } |
417 | | |
418 | 55.5M | reverse_index_iter.Next(); |
419 | 55.5M | } |
420 | | |
421 | 1.70M | context_.Complete(handler); |
422 | | |
423 | 1.70M | return Status::OK(); |
424 | 1.70M | } |
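// Sketch of the iteration bounds built above (illustrative): with P = [kTransactionId][txn id],
//
//   key_prefix               = P              (every reverse-index key for this transaction)
//   reverse_index_upperbound = P + kMaxByte   (exclusive upper bound for the iterator)
//
// so the loop visits the transaction metadata record (key == P) and every reverse-index entry
// (key == P + DocHybridTime), handing each one to context_.Entry() until it asks to stop.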
425 | | |
426 | | ApplyIntentsContext::ApplyIntentsContext( |
427 | | const TransactionId& transaction_id, |
428 | | const ApplyTransactionState* apply_state, |
429 | | const AbortedSubTransactionSet& aborted, |
430 | | HybridTime commit_ht, |
431 | | HybridTime log_ht, |
432 | | const KeyBounds* key_bounds, |
433 | | rocksdb::DB* intents_db) |
434 | | : IntentsWriterContext(transaction_id), |
435 | | apply_state_(apply_state), |
436 | | // If a non-null apply_state was passed in, its aborted set will have been loaded from the |
437 | | // persisted apply state, and the passed-in aborted set will correspond to the aborted |
438 | | // set at commit time. Rather than copy that set upstream so it is passed in as aborted, we |
439 | | // simply grab a reference to it here, if it is defined, to use in this method. |
440 | | aborted_(apply_state ? apply_state->aborted : aborted), |
441 | | commit_ht_(commit_ht), |
442 | | log_ht_(log_ht), |
443 | | write_id_(apply_state ? apply_state->write_id : 0), |
444 | | key_bounds_(key_bounds), |
445 | | intent_iter_(CreateRocksDBIterator( |
446 | | intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none, |
447 | 737k | rocksdb::kDefaultQueryId)) { |
448 | 737k | } |
449 | | |
450 | | Result<bool> ApplyIntentsContext::StoreApplyState( |
451 | 1 | const Slice& key, rocksdb::DirectWriteHandler* handler) { |
452 | 1 | SetApplyState(key, write_id_, aborted_); |
453 | 1 | ApplyTransactionStatePB pb; |
454 | 1 | apply_state().ToPB(&pb); |
455 | 1 | pb.set_commit_ht(commit_ht_.ToUint64()); |
456 | 1 | faststring encoded_pb; |
457 | 1 | pb_util::SerializeToString(pb, &encoded_pb); |
458 | 1 | char string_value_type = ValueTypeAsChar::kString; |
459 | 1 | std::array<Slice, 2> value_parts = {{ |
460 | 1 | Slice(&string_value_type, 1), |
461 | 1 | Slice(encoded_pb.data(), encoded_pb.size()) |
462 | 1 | }}; |
463 | 1 | PutApplyState(transaction_id().AsSlice(), commit_ht_, write_id_, value_parts, handler); |
464 | 1 | return true; |
465 | 1 | } |
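// Illustrative shape of the intermediate apply-state record written via PutApplyState() above:
//
//   key:   [kTransactionApplyState][txn id][kGroupEnd][kHybridTime][DocHybridTime(commit_ht_, write_id_)]
//   value: [kString][ApplyTransactionStatePB with the resume key, write id, aborted set and commit_ht]
//
// Complete() later writes a tombstone with the same key shape once the transaction is fully applied.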
466 | | |
467 | 737k | void ApplyIntentsContext::Start(const boost::optional<Slice>& first_key) { |
468 | 737k | if (!apply_state_) { |
469 | 737k | return; |
470 | 737k | } |
471 | | // This sanity check is invalid for the remove case, because the .SST file could have been deleted. |
472 | 18.4E | LOG_IF(DFATAL, !first_key || *first_key != apply_state_->key) |
473 | 18.4E | << "Continue from wrong key: " << Slice(apply_state_->key).ToDebugString() << ", txn: " |
474 | 18.4E | << transaction_id() << ", position: " |
475 | 18.4E | << (first_key ? first_key->ToDebugString() : "<INVALID>") |
476 | 18.4E | << ", write id: " << apply_state_->write_id; |
477 | 18.4E | } |
478 | | |
479 | | Result<bool> ApplyIntentsContext::Entry( |
480 | 26.8M | const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler* handler) { |
481 | | // The value of a reverse-index entry is the key of the original intent record, so seek to it and check for a match. |
482 | 26.8M | if (metadata || !IsWithinBounds(key_bounds_, value)) { |
483 | 749k | return false; |
484 | 749k | } |
485 | | |
486 | | // We store the apply state only if there are more intents left, so we do this check here |
487 | | // instead of right after write_id is incremented. |
488 | 26.1M | if (reached_records_limit()) { |
489 | 1 | return StoreApplyState(key, handler); |
490 | 1 | } |
491 | | |
492 | 26.1M | DocHybridTimeBuffer doc_ht_buffer; |
493 | 26.1M | intent_iter_.Seek(value); |
494 | 26.1M | if (!intent_iter_.Valid() || intent_iter_.key() != value) { |
495 | 0 | Slice temp_slice = value; |
496 | 0 | auto value_doc_ht = DocHybridTime::DecodeFromEnd(&temp_slice); |
497 | 0 | temp_slice = key; |
498 | 0 | auto key_doc_ht = DocHybridTime::DecodeFromEnd(&temp_slice); |
499 | 0 | LOG(DFATAL) << "Unable to find intent: " << value.ToDebugHexString() << " (" |
500 | 0 | << value_doc_ht << ") for " << key.ToDebugHexString() << "(" << key_doc_ht << ")"; |
501 | 0 | return false; |
502 | 0 | } |
503 | | |
504 | 26.1M | auto intent = VERIFY_RESULT(ParseIntentKey(value, transaction_id().AsSlice())); |
505 | | |
506 | 26.1M | if (intent.types.Test(IntentType::kStrongWrite)) { |
507 | 16.8M | const Slice transaction_id_slice = transaction_id().AsSlice(); |
508 | 16.8M | auto decoded_value = VERIFY_RESULT(DecodeIntentValue( |
509 | 16.8M | intent_iter_.value(), &transaction_id_slice)); |
510 | | |
511 | | // The write id should match the one that was calculated when the intents were appended. |
512 | | // This is just a sanity check. |
513 | 16.8M | RSTATUS_DCHECK_GE( |
514 | 16.8M | decoded_value.write_id, write_id_, |
515 | 16.8M | Corruption, |
516 | 16.8M | Format("Unexpected write id. Expected: $0, found: $1, raw value: $2", |
517 | 16.8M | write_id_, |
518 | 16.8M | decoded_value.write_id, |
519 | 16.8M | intent_iter_.value().ToDebugHexString())); |
520 | 16.8M | write_id_ = decoded_value.write_id; |
521 | | |
522 | | // Intents for row locks should be ignored (i.e. should not be written as regular records). |
523 | 16.8M | if (decoded_value.body.starts_with(ValueTypeAsChar::kRowLock)) { |
524 | 4.19k | return false; |
525 | 4.19k | } |
526 | | |
527 | | // Intents from aborted subtransactions should not be written as regular records. |
528 | 16.8M | if (aborted_.Test(decoded_value.subtransaction_id)) { |
529 | 615 | return false; |
530 | 615 | } |
531 | | |
532 | | // After stripping the prefix and suffix, the intent key contains just the SubDocKey without a hybrid time. |
533 | | // The time will be added when writing the batch to RocksDB. |
534 | 16.8M | std::array<Slice, 2> key_parts = {{ |
535 | 16.8M | intent.doc_path, |
536 | 16.8M | doc_ht_buffer.EncodeWithValueType(commit_ht_, write_id_), |
537 | 16.8M | }}; |
538 | 16.8M | std::array<Slice, 2> value_parts = {{ |
539 | 16.8M | intent.doc_ht, |
540 | 16.8M | decoded_value.body, |
541 | 16.8M | }}; |
542 | | |
543 | | // Useful when debugging transaction failures. |
544 | | #if defined(DUMP_APPLY) |
545 | | SubDocKey sub_doc_key; |
546 | | CHECK_OK(sub_doc_key.FullyDecodeFrom(intent.doc_path, HybridTimeRequired::kFalse)); |
547 | | if (!sub_doc_key.subkeys().empty()) { |
548 | | auto txn_id = FullyDecodeTransactionId(transaction_id_slice); |
549 | | LOG(INFO) << "Apply: " << sub_doc_key.ToString() |
550 | | << ", time: " << commit_ht << ", write id: " << *write_id << ", txn: " << txn_id |
551 | | << ", value: " << intent_value.ToDebugString(); |
552 | | } |
553 | | #endif |
554 | | |
555 | 16.8M | handler->Put(key_parts, value_parts); |
556 | 16.8M | ++write_id_; |
557 | 16.8M | RegisterRecord(); |
558 | 16.8M | } |
559 | | |
560 | 26.1M | return false; |
561 | 26.1M | } |
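// Illustrative shape of the translation performed above for a strong-write intent:
//
//   intents DB:  [SubDocKey][IntentTypeSet][DocHybridTime] -> [txn id][(subtxn id)][write id][body]
//   regular DB:  [SubDocKey][kHybridTime][DocHybridTime(commit_ht_, write_id_)] -> [intent DocHybridTime][body]
//
// Row-lock intents, intents from aborted subtransactions, and intents outside key_bounds_ are
// skipped; once the per-batch record limit is reached, StoreApplyState() persists the position so
// the apply can resume in a following batch.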
562 | | |
563 | 737k | void ApplyIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) { |
564 | 737k | if (apply_state_) { |
565 | 1 | char tombstone_value_type = ValueTypeAsChar::kTombstone; |
566 | 1 | std::array<Slice, 1> value_parts = {{Slice(&tombstone_value_type, 1)}}; |
567 | 1 | PutApplyState(transaction_id().AsSlice(), commit_ht_, write_id_, value_parts, handler); |
568 | 1 | } |
569 | 737k | } |
570 | | |
571 | | RemoveIntentsContext::RemoveIntentsContext(const TransactionId& transaction_id) |
572 | 968k | : IntentsWriterContext(transaction_id) { |
573 | 968k | } |
574 | | |
575 | | Result<bool> RemoveIntentsContext::Entry( |
576 | 28.7M | const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler* handler) { |
577 | 28.7M | if (reached_records_limit()) { |
578 | 40 | SetApplyState(key, 0, AbortedSubTransactionSet()); |
579 | 40 | return true; |
580 | 40 | } |
581 | | |
582 | 28.7M | handler->SingleDelete(key); |
583 | 28.7M | RegisterRecord(); |
584 | 28.7M | if (!metadata) { |
585 | 27.7M | handler->SingleDelete(value); |
586 | 27.7M | RegisterRecord(); |
587 | 27.7M | } |
588 | 28.7M | return false; |
589 | 28.7M | } |
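// Rough summary of the removal above: each reverse-index entry is deleted by its key, and for
// non-metadata entries the main intent record it points to (the entry's value) is deleted as well;
// when the per-batch limit (FLAGS_txn_max_apply_batch_records) is reached, SetApplyState() records
// the resume position and the batch is cut short.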
590 | | |
591 | 968k | void RemoveIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) { |
592 | 968k | } |
593 | | |
594 | | } // namespace docdb |
595 | | } // namespace yb |