/Users/deen/code/yugabyte-db/src/yb/docdb/rocksdb_writer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/rocksdb_writer.h" |
15 | | |
16 | | #include "yb/common/row_mark.h" |
17 | | |
18 | | #include "yb/docdb/conflict_resolution.h" |
19 | | #include "yb/docdb/doc_key.h" |
20 | | #include "yb/docdb/doc_kv_util.h" |
21 | | #include "yb/docdb/docdb.pb.h" |
22 | | #include "yb/docdb/docdb_rocksdb_util.h" |
23 | | #include "yb/docdb/intent.h" |
24 | | #include "yb/docdb/value_type.h" |
25 | | |
26 | | #include "yb/gutil/walltime.h" |
27 | | |
28 | | #include "yb/util/bitmap.h" |
29 | | #include "yb/util/flag_tags.h" |
30 | | #include "yb/util/pb_util.h" |
31 | | |
32 | | DEFINE_bool(enable_transaction_sealing, false, |
33 | | "Whether transaction sealing is enabled."); |
34 | | DEFINE_int32(txn_max_apply_batch_records, 100000, |
35 | | "Max number of apply records allowed in single RocksDB batch. " |
36 | | "When a transaction's data in one tablet does not fit into specified number of " |
37 | | "records, it will be applied using multiple RocksDB write batches."); |
38 | | |
39 | | DEFINE_test_flag(bool, docdb_sort_weak_intents, false, |
40 | | "Sort weak intents to make their order deterministic."); |
41 | | DEFINE_test_flag(bool, fail_on_replicated_batch_idx_set_in_txn_record, false, |
42 | | "Fail when a set of replicated batch indexes is found in txn record."); |
43 | | |
44 | | namespace yb { |
45 | | namespace docdb { |
46 | | |
47 | | namespace { |
48 | | |
// Slice parts with the number of slices fixed at compile time.
// Thin non-owning view: only a pointer to the caller's array is stored, so the
// array passed to the constructor must outlive this object.
template <int N>
struct FixedSliceParts {
  FixedSliceParts(const std::array<Slice, N>& input) : parts(input.data()) { // NOLINT
  }

  // Implicit conversion to rocksdb's SliceParts, supplying the compile-time count.
  operator SliceParts() const {
    return SliceParts(parts, N);
  }

  const Slice* parts;
};
61 | | |
62 | | // Main intent data:: |
63 | | // Prefix + DocPath + IntentType + DocHybridTime -> TxnId + value of the intent |
64 | | // Reverse index by txn id: |
65 | | // Prefix + TxnId + DocHybridTime -> Main intent data key |
66 | | // |
67 | | // Expects that last entry of key is DocHybridTime. |
68 | | template <int N> |
69 | | void AddIntent( |
70 | | const TransactionId& transaction_id, |
71 | | const FixedSliceParts<N>& key, |
72 | | const SliceParts& value, |
73 | | rocksdb::DirectWriteHandler* handler, |
74 | 86.6M | Slice reverse_value_prefix = Slice()) { |
75 | 86.6M | char reverse_key_prefix[1] = { ValueTypeAsChar::kTransactionId }; |
76 | 86.6M | DocHybridTimeWordBuffer doc_ht_buffer; |
77 | 86.6M | auto doc_ht_slice = InvertEncodedDocHT(key.parts[N - 1], &doc_ht_buffer); |
78 | | |
79 | 86.6M | std::array<Slice, 3> reverse_key = {{ |
80 | 86.6M | Slice(reverse_key_prefix, sizeof(reverse_key_prefix)), |
81 | 86.6M | transaction_id.AsSlice(), |
82 | 86.6M | doc_ht_slice, |
83 | 86.6M | }}; |
84 | 86.6M | handler->Put(key, value); |
85 | 86.6M | if (reverse_value_prefix.empty()) { |
86 | 86.6M | handler->Put(reverse_key, key); |
87 | 86.6M | } else { |
88 | 38.2k | std::array<Slice, N + 1> reverse_value; |
89 | 38.2k | reverse_value[0] = reverse_value_prefix; |
90 | 38.2k | memcpy(&reverse_value[1], key.parts, sizeof(*key.parts) * N); |
91 | 38.2k | handler->Put(reverse_key, reverse_value); |
92 | 38.2k | } |
93 | 86.6M | } |
94 | | |
// Writes a "transaction apply state" record, used to resume applying a large
// transaction across multiple RocksDB write batches.
// Key layout: kTransactionApplyState + txn id + kGroupEnd + kHybridTime +
// DocHybridTime encoded from (commit_ht, write_id). The value parts are
// supplied by the caller (serialized state PB, or a tombstone on completion).
template <size_t N>
void PutApplyState(
    const Slice& transaction_id_slice, HybridTime commit_ht, IntraTxnWriteId write_id,
    const std::array<Slice, N>& value_parts, rocksdb::DirectWriteHandler* handler) {
  char transaction_apply_state_value_type = ValueTypeAsChar::kTransactionApplyState;
  char group_end_value_type = ValueTypeAsChar::kGroupEnd;
  char hybrid_time_value_type = ValueTypeAsChar::kHybridTime;
  DocHybridTime doc_hybrid_time(commit_ht, write_id);
  char doc_hybrid_time_buffer[kMaxBytesPerEncodedHybridTime];
  // EncodedInDocDbFormat returns a pointer one past the last written byte.
  char* doc_hybrid_time_buffer_end = doc_hybrid_time.EncodedInDocDbFormat(
      doc_hybrid_time_buffer);
  std::array<Slice, 5> key_parts = {{
      Slice(&transaction_apply_state_value_type, 1),
      transaction_id_slice,
      Slice(&group_end_value_type, 1),
      Slice(&hybrid_time_value_type, 1),
      Slice(doc_hybrid_time_buffer, doc_hybrid_time_buffer_end),
  }};
  handler->Put(key_parts, value_parts);
}
115 | | |
116 | | } // namespace |
117 | | |
// Writer for batches outside of a distributed transaction: pairs are written
// directly to the regular RocksDB with the given hybrid time.
NonTransactionalWriter::NonTransactionalWriter(
    std::reference_wrapper<const docdb::KeyValueWriteBatchPB> put_batch, HybridTime hybrid_time)
    : put_batch_(put_batch), hybrid_time_(hybrid_time) {
}
122 | | |
123 | 0 | bool NonTransactionalWriter::Empty() const { |
124 | 0 | return put_batch_.write_pairs().empty(); |
125 | 0 | } |
126 | | |
// Appends every pair of the batch to the regular RocksDB, suffixing each key
// with a DocHybridTime built from the batch hybrid time (or the pair's
// external hybrid time, when present) and a per-pair write id.
Status NonTransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) {
  DocHybridTimeBuffer doc_ht_buffer;

  int write_id = 0;
  for (const auto& kv_pair : put_batch_.write_pairs()) {

    CHECK(!kv_pair.key().empty());
    CHECK(!kv_pair.value().empty());

    // External transaction records are not written here; skip them.
    if (kv_pair.key()[0] == ValueTypeAsChar::kExternalTransactionId) {
      continue;
    }

#ifndef NDEBUG
    // Debug-only: ensure all keys we get in Raft replication can be decoded.
    SubDocKey subdoc_key;
    Status s = subdoc_key.FullyDecodeFromKeyWithOptionalHybridTime(kv_pair.key());
    CHECK(s.ok())
        << "Failed decoding key: " << s.ToString() << "; "
        << "Problematic key: " << BestEffortDocDBKeyToStr(KeyBytes(kv_pair.key())) << "\n"
        << "value: " << FormatBytesAsStr(kv_pair.value());
#endif

    // We replicate encoded SubDocKeys without a HybridTime at the end, and only append it here.
    // The reason for this is that the HybridTime timestamp is only picked at the time of
    // appending an entry to the tablet's Raft log. Also this is a good way to save network
    // bandwidth.
    //
    // "Write id" is the final component of our HybridTime encoding (or, to be more precise,
    // DocHybridTime encoding) that helps disambiguate between different updates to the
    // same key (row/column) within a transaction. We set it based on the position of the write
    // operation in its write batch.

    auto hybrid_time = kv_pair.has_external_hybrid_time() ?
        HybridTime(kv_pair.external_hybrid_time()) : hybrid_time_;
    std::array<Slice, 2> key_parts = {{
        Slice(kv_pair.key()),
        doc_ht_buffer.EncodeWithValueType(hybrid_time, write_id),
    }};
    Slice key_value = kv_pair.value();
    handler->Put(key_parts, SliceParts(&key_value, 1));

    // Note: write_id advances even for pairs using an external hybrid time.
    ++write_id;
  }

  return Status::OK();
}
174 | | |
// Writer that stores a batch as provisional records (intents) in the intents
// RocksDB, to be turned into regular records once the transaction commits.
// intra_txn_write_id continues the write-id sequence from earlier batches of
// the same transaction.
TransactionalWriter::TransactionalWriter(
    std::reference_wrapper<const docdb::KeyValueWriteBatchPB> put_batch,
    HybridTime hybrid_time,
    const TransactionId& transaction_id,
    IsolationLevel isolation_level,
    PartialRangeKeyIntents partial_range_key_intents,
    const Slice& replicated_batches_state,
    IntraTxnWriteId intra_txn_write_id)
    : put_batch_(put_batch),
      hybrid_time_(hybrid_time),
      transaction_id_(transaction_id),
      isolation_level_(isolation_level),
      partial_range_key_intents_(partial_range_key_intents),
      replicated_batches_state_(replicated_batches_state),
      intra_txn_write_id_(intra_txn_write_id) {
}
191 | | |
192 | | // We have the following distinct types of data in this "intent store": |
193 | | // Main intent data: |
194 | | // Prefix + SubDocKey (no HybridTime) + IntentType + HybridTime -> TxnId + value of the intent |
195 | | // Transaction metadata |
196 | | // TxnId -> status tablet id + isolation level |
197 | | // Reverse index by txn id |
198 | | // TxnId + HybridTime -> Main intent data key |
199 | | // |
200 | | // Where prefix is just a single byte prefix. TxnId, IntentType, HybridTime all prefixed with |
201 | | // appropriate value type. |
CHECKED_STATUS TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) {
  VLOG(4) << "PrepareTransactionWriteBatch(), write_id = " << write_id_;

  row_mark_ = GetRowMarkTypeFromPB(put_batch_);
  handler_ = handler;

  if (metadata_to_store_) {
    // Store transaction metadata under key: kTransactionId + txn id.
    auto txn_value_type = ValueTypeAsChar::kTransactionId;
    std::array<Slice, 2> key = {
      Slice(&txn_value_type, 1),
      transaction_id_.AsSlice(),
    };
    auto data_copy = *metadata_to_store_;
    // We use hybrid time only for backward compatibility, actually wall time is required.
    data_copy.set_metadata_write_time(GetCurrentTimeMicros());
    auto value = data_copy.SerializeAsString();
    Slice value_slice(value);
    handler->Put(key, SliceParts(&value_slice, 1));
  }

  subtransaction_id_ = put_batch_.has_subtransaction()
      ? put_batch_.subtransaction().subtransaction_id()
      : kMinSubTransactionId;

  // Write intents for written pairs, then for read pairs; strong_intent_types_
  // is switched between the two passes since operator() reads it.
  if (!put_batch_.write_pairs().empty()) {
    if (IsValidRowMarkType(row_mark_)) {
      LOG(WARNING) << "Performing a write with row lock " << RowMarkType_Name(row_mark_)
                   << " when only reads are expected";
    }
    strong_intent_types_ = GetStrongIntentTypeSet(
        isolation_level_, OperationKind::kWrite, row_mark_);

    // We cannot recover from failures here, because it means that we cannot apply replicated
    // operation.
    RETURN_NOT_OK(EnumerateIntents(
        put_batch_.write_pairs(), std::ref(*this), partial_range_key_intents_));
  }

  if (!put_batch_.read_pairs().empty()) {
    strong_intent_types_ = GetStrongIntentTypeSet(
        isolation_level_, OperationKind::kRead, row_mark_);
    RETURN_NOT_OK(EnumerateIntents(
        put_batch_.read_pairs(), std::ref(*this), partial_range_key_intents_));
  }

  // Flush the weak intents accumulated by operator() during enumeration.
  return Finish();
}
249 | | |
// Using operator() to pass this object conveniently to EnumerateIntents.
// Strong intents are written immediately; weak intents are deduplicated into
// weak_intents_ (merging intent type sets per key) and written in Finish().
CHECKED_STATUS TransactionalWriter::operator()(
    IntentStrength intent_strength, FullDocKey, Slice value_slice, KeyBytes* key,
    LastKey last_key) {
  if (intent_strength == IntentStrength::kWeak) {
    weak_intents_[key->data()] |= StrongToWeak(strong_intent_types_);
    return Status::OK();
  }

  const auto transaction_value_type = ValueTypeAsChar::kTransactionId;
  const auto write_id_value_type = ValueTypeAsChar::kWriteId;
  const auto row_lock_value_type = ValueTypeAsChar::kRowLock;
  IntraTxnWriteId big_endian_write_id = BigEndian::FromHost32(intra_txn_write_id_);

  // Subtransaction id is only encoded when it differs from the minimum, so
  // the common (no-subtransaction) case stays compact: the marker and id
  // slices remain empty and contribute nothing to the value below.
  const auto subtransaction_value_type = ValueTypeAsChar::kSubTransactionId;
  SubTransactionId big_endian_subtxn_id;
  Slice subtransaction_marker;
  Slice subtransaction_id;
  if (subtransaction_id_ > kMinSubTransactionId) {
    subtransaction_marker = Slice(&subtransaction_value_type, 1);
    big_endian_subtxn_id = BigEndian::FromHost32(subtransaction_id_);
    subtransaction_id = Slice::FromPod(&big_endian_subtxn_id);
  } else {
    DCHECK_EQ(subtransaction_id_, kMinSubTransactionId);
  }

  // Intent value: kTransactionId + txn id [+ kSubTransactionId + subtxn id]
  //               + kWriteId + intra-txn write id + provisional value.
  std::array<Slice, 7> value = {{
      Slice(&transaction_value_type, 1),
      transaction_id_.AsSlice(),
      subtransaction_marker,
      subtransaction_id,
      Slice(&write_id_value_type, 1),
      Slice::FromPod(&big_endian_write_id),
      value_slice,
  }};
  // Store a row lock indicator rather than data (in value_slice) for row lock intents.
  if (IsValidRowMarkType(row_mark_)) {
    value.back() = Slice(&row_lock_value_type, 1);
  }

  ++intra_txn_write_id_;

  char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet,
                          static_cast<char>(strong_intent_types_.ToUIntPtr()) };

  DocHybridTimeBuffer doc_ht_buffer;

  // Intent key: doc key + intent type set + DocHybridTime(hybrid_time, write_id).
  constexpr size_t kNumKeyParts = 3;
  std::array<Slice, kNumKeyParts> key_parts = {{
      key->AsSlice(),
      Slice(intent_type, 2),
      doc_ht_buffer.EncodeWithValueType(hybrid_time_, write_id_++),
  }};

  // For the last intent of a sealed transaction, piggyback the replicated
  // batches state onto the reverse-index value.
  Slice reverse_value_prefix;
  if (last_key && FLAGS_enable_transaction_sealing) {
    reverse_value_prefix = replicated_batches_state_;
  }
  AddIntent<kNumKeyParts>(transaction_id_, key_parts, value, handler_, reverse_value_prefix);

  return Status::OK();
}
312 | | |
// Writes out the weak intents accumulated in weak_intents_. All weak intents
// share the same value: kTransactionId + txn id.
CHECKED_STATUS TransactionalWriter::Finish() {
  char transaction_id_value_type = ValueTypeAsChar::kTransactionId;

  DocHybridTimeBuffer doc_ht_buffer;

  std::array<Slice, 2> value = {{
      Slice(&transaction_id_value_type, 1),
      transaction_id_.AsSlice(),
  }};

  if (PREDICT_FALSE(FLAGS_TEST_docdb_sort_weak_intents)) {
    // This is done in tests when deterministic DocDB state is required.
    // Copy the map contents and emit them in sorted key order.
    std::vector<std::pair<KeyBuffer, IntentTypeSet>> intents_and_types(
        weak_intents_.begin(), weak_intents_.end());
    sort(intents_and_types.begin(), intents_and_types.end());
    for (const auto& intent_and_types : intents_and_types) {
      RETURN_NOT_OK(AddWeakIntent(intent_and_types, value, &doc_ht_buffer));
    }
    return Status::OK();
  }

  // Production path: emit weak intents in the map's own iteration order.
  for (const auto& intent_and_types : weak_intents_) {
    RETURN_NOT_OK(AddWeakIntent(intent_and_types, value, &doc_ht_buffer));
  }

  return Status::OK();
}
340 | | |
// Writes a single weak intent: key is the stored doc key + merged intent type
// set + DocHybridTime(hybrid_time, next write id); value is the shared
// kTransactionId + txn id pair supplied by Finish().
CHECKED_STATUS TransactionalWriter::AddWeakIntent(
    const std::pair<KeyBuffer, IntentTypeSet>& intent_and_types,
    const std::array<Slice, 2>& value,
    DocHybridTimeBuffer* doc_ht_buffer) {
  char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet,
                          static_cast<char>(intent_and_types.second.ToUIntPtr()) };
  constexpr size_t kNumKeyParts = 3;
  std::array<Slice, kNumKeyParts> key = {{
      intent_and_types.first.AsSlice(),
      Slice(intent_type, 2),
      doc_ht_buffer->EncodeWithValueType(hybrid_time_, write_id_++),
  }};

  AddIntent<kNumKeyParts>(transaction_id_, key, value, handler_);

  return Status::OK();
}
358 | | |
// The buffer always starts with the kHybridTime value-type byte; the encoded
// DocHybridTime is written after it by EncodeWithValueType.
DocHybridTimeBuffer::DocHybridTimeBuffer() {
  buffer_[0] = ValueTypeAsChar::kHybridTime;
}
362 | | |
// Base context for intent-processing writers; left_records_ bounds how many
// records one RocksDB batch may hold (--txn_max_apply_batch_records).
IntentsWriterContext::IntentsWriterContext(const TransactionId& transaction_id)
    : transaction_id_(transaction_id),
      left_records_(FLAGS_txn_max_apply_batch_records) {
}
367 | | |
// Iterates the reverse index of one transaction's intents, starting at
// start_key (or at the beginning of the transaction's range when start_key is
// empty), feeding each entry to the supplied context.
IntentsWriter::IntentsWriter(const Slice& start_key,
                             rocksdb::DB* intents_db,
                             IntentsWriterContext* context)
    : start_key_(start_key), intents_db_(intents_db), context_(*context) {
  AppendTransactionKeyPrefix(context_.transaction_id(), &txn_reverse_index_prefix_);
  // Appending kMaxByte turns the prefix into an exclusive upper bound for this
  // transaction's reverse-index entries, used to bound the iterator.
  txn_reverse_index_prefix_.AppendValueType(ValueType::kMaxByte);
  reverse_index_upperbound_ = txn_reverse_index_prefix_.AsSlice();
  reverse_index_iter_ = CreateRocksDBIterator(
      intents_db_, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
      rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound_);
}
379 | | |
// Walks the transaction's reverse-index entries in order, handing each to
// context_.Entry(); stops early when Entry() returns true, otherwise calls
// context_.Complete() after the last entry.
CHECKED_STATUS IntentsWriter::Apply(rocksdb::DirectWriteHandler* handler) {
  // Drop the trailing kMaxByte to recover the actual key prefix.
  Slice key_prefix = txn_reverse_index_prefix_.AsSlice();
  key_prefix.remove_suffix(1);

  DocHybridTimeBuffer doc_ht_buffer;

  reverse_index_iter_.Seek(start_key_.empty() ? key_prefix : start_key_);

  context_.Start(
      reverse_index_iter_.Valid() ? boost::make_optional(reverse_index_iter_.key()) : boost::none);

  while (reverse_index_iter_.Valid()) {
    const Slice key_slice(reverse_index_iter_.key());

    if (!key_slice.starts_with(key_prefix)) {
      break;
    }

    auto reverse_index_value = reverse_index_iter_.value();

    bool metadata = key_slice.size() == 1 + TransactionId::StaticSize();
    // At this point, txn_reverse_index_prefix is a prefix of key_slice. If key_slice is equal to
    // txn_reverse_index_prefix in size, then they are identical, and we are seeked to transaction
    // metadata. Otherwise, we're seeked to an intent entry in the index which we may process.
    if (!metadata) {
      // Skip a leading replicated-batches bitmap, if present, so the context
      // sees only the intent key that follows it.
      if (!reverse_index_value.empty() && reverse_index_value[0] == ValueTypeAsChar::kBitSet) {
        CHECK(!FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record);
        reverse_index_value.remove_prefix(1);
        RETURN_NOT_OK(OneWayBitmap::Skip(&reverse_index_value));
      }
    }

    // Entry() returning true means "stop this batch now" (e.g. record limit
    // reached and resume state persisted); Complete() is intentionally skipped.
    if (VERIFY_RESULT(context_.Entry(key_slice, reverse_index_value, metadata, handler))) {
      return Status::OK();
    }

    reverse_index_iter_.Next();
  }

  context_.Complete(handler);

  return Status::OK();
}
423 | | |
// Context that converts a committed transaction's intents into regular
// records at commit_ht, resuming from apply_state when a previous batch
// stopped at the record limit.
ApplyIntentsContext::ApplyIntentsContext(
    const TransactionId& transaction_id,
    const ApplyTransactionState* apply_state,
    const AbortedSubTransactionSet& aborted,
    HybridTime commit_ht,
    HybridTime log_ht,
    const KeyBounds* key_bounds,
    rocksdb::DB* intents_db)
    : IntentsWriterContext(transaction_id),
      apply_state_(apply_state),
      // In case we have passed in a non-null apply_state, its aborted set will have been loaded
      // from persisted apply state, and the passed in aborted set will correspond to the aborted
      // set at commit time. Rather than copy that set upstream so it is passed in as aborted, we
      // simply grab a reference to it here, if it is defined, to use in this method.
      aborted_(apply_state ? apply_state->aborted : aborted),
      commit_ht_(commit_ht),
      log_ht_(log_ht),
      // Resume the write-id sequence where the previous batch left off.
      write_id_(apply_state ? apply_state->write_id : 0),
      key_bounds_(key_bounds),
      intent_iter_(CreateRocksDBIterator(
          intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
          rocksdb::kDefaultQueryId)) {
}
447 | | |
// Persists intermediate apply state (resume key, write id, aborted set,
// commit hybrid time) so a later batch can continue the apply from here.
// Always returns true, signalling the caller to stop the current batch.
Result<bool> ApplyIntentsContext::StoreApplyState(
    const Slice& key, rocksdb::DirectWriteHandler* handler) {
  SetApplyState(key, write_id_, aborted_);
  ApplyTransactionStatePB pb;
  apply_state().ToPB(&pb);
  pb.set_commit_ht(commit_ht_.ToUint64());
  faststring encoded_pb;
  pb_util::SerializeToString(pb, &encoded_pb);
  // Value: kString marker followed by the serialized ApplyTransactionStatePB.
  char string_value_type = ValueTypeAsChar::kString;
  std::array<Slice, 2> value_parts = {{
      Slice(&string_value_type, 1),
      Slice(encoded_pb.data(), encoded_pb.size())
  }};
  PutApplyState(transaction_id().AsSlice(), commit_ht_, write_id_, value_parts, handler);
  return true;
}
464 | | |
// When resuming (apply_state_ set), verifies that iteration starts exactly at
// the stored resume key; a mismatch indicates a bug and is logged as DFATAL.
void ApplyIntentsContext::Start(const boost::optional<Slice>& first_key) {
  if (!apply_state_) {
    return;
  }
  // This sanity check is invalid for remove case, because .SST file could be deleted.
  LOG_IF(DFATAL, !first_key || *first_key != apply_state_->key)
      << "Continue from wrong key: " << Slice(apply_state_->key).ToDebugString() << ", txn: "
      << transaction_id() << ", position: "
      << (first_key ? first_key->ToDebugString() : "<INVALID>")
      << ", write id: " << apply_state_->write_id;
}
476 | | |
// Processes one reverse-index entry: looks up the intent it points at and, for
// strong write intents, writes the corresponding regular record at commit_ht.
// Returns true only when the batch should stop (apply state was persisted).
Result<bool> ApplyIntentsContext::Entry(
    const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler* handler) {
  // Value of reverse index is a key of original intent record, so seek it and check match.
  if (metadata || !IsWithinBounds(key_bounds_, value)) {
    return false;
  }

  // We store apply state only if there are some more intents left.
  // So doing this check here, instead of right after write_id was incremented.
  if (reached_records_limit()) {
    return StoreApplyState(key, handler);
  }

  DocHybridTimeBuffer doc_ht_buffer;
  intent_iter_.Seek(value);
  if (!intent_iter_.Valid() || intent_iter_.key() != value) {
    // Reverse index points at a missing intent record: log diagnostics (with
    // the DocHybridTimes decoded from both keys) but continue, so one bad
    // entry does not block the whole apply.
    Slice temp_slice = value;
    auto value_doc_ht = DocHybridTime::DecodeFromEnd(&temp_slice);
    temp_slice = key;
    auto key_doc_ht = DocHybridTime::DecodeFromEnd(&temp_slice);
    LOG(DFATAL) << "Unable to find intent: " << value.ToDebugHexString() << " ("
                << value_doc_ht << ") for " << key.ToDebugHexString() << "(" << key_doc_ht << ")";
    return false;
  }

  auto intent = VERIFY_RESULT(ParseIntentKey(value, transaction_id().AsSlice()));

  // Only strong write intents produce regular records; all others are skipped.
  if (intent.types.Test(IntentType::kStrongWrite)) {
    const Slice transaction_id_slice = transaction_id().AsSlice();
    auto decoded_value = VERIFY_RESULT(DecodeIntentValue(
        intent_iter_.value(), &transaction_id_slice));

    // Write id should match to one that were calculated during append of intents.
    // Doing it just for sanity check.
    RSTATUS_DCHECK_GE(
        decoded_value.write_id, write_id_,
        Corruption,
        Format("Unexpected write id. Expected: $0, found: $1, raw value: $2",
               write_id_,
               decoded_value.write_id,
               intent_iter_.value().ToDebugHexString()));
    write_id_ = decoded_value.write_id;

    // Intents for row locks should be ignored (i.e. should not be written as regular records).
    if (decoded_value.body.starts_with(ValueTypeAsChar::kRowLock)) {
      return false;
    }

    // Intents from aborted subtransactions should not be written as regular records.
    if (aborted_.Test(decoded_value.subtransaction_id)) {
      return false;
    }

    // After strip of prefix and suffix intent_key contains just SubDocKey w/o a hybrid time.
    // Time will be added when writing batch to RocksDB.
    std::array<Slice, 2> key_parts = {{
        intent.doc_path,
        doc_ht_buffer.EncodeWithValueType(commit_ht_, write_id_),
    }};
    std::array<Slice, 2> value_parts = {{
        intent.doc_ht,
        decoded_value.body,
    }};

    // Useful when debugging transaction failure.
#if defined(DUMP_APPLY)
    SubDocKey sub_doc_key;
    CHECK_OK(sub_doc_key.FullyDecodeFrom(intent.doc_path, HybridTimeRequired::kFalse));
    if (!sub_doc_key.subkeys().empty()) {
      auto txn_id = FullyDecodeTransactionId(transaction_id_slice);
      LOG(INFO) << "Apply: " << sub_doc_key.ToString()
                << ", time: " << commit_ht << ", write id: " << *write_id << ", txn: " << txn_id
                << ", value: " << intent_value.ToDebugString();
    }
#endif

    handler->Put(key_parts, value_parts);
    ++write_id_;
    RegisterRecord();
  }

  return false;
}
560 | | |
561 | 1.30M | void ApplyIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) { |
562 | 1.30M | if (apply_state_) { |
563 | 60 | char tombstone_value_type = ValueTypeAsChar::kTombstone; |
564 | 60 | std::array<Slice, 1> value_parts = {{Slice(&tombstone_value_type, 1)}}; |
565 | 60 | PutApplyState(transaction_id().AsSlice(), commit_ht_, write_id_, value_parts, handler); |
566 | 60 | } |
567 | 1.30M | } |
568 | | |
// Context that deletes a transaction's intents together with their
// reverse-index entries.
RemoveIntentsContext::RemoveIntentsContext(const TransactionId& transaction_id)
    : IntentsWriterContext(transaction_id) {
}
572 | | |
573 | | Result<bool> RemoveIntentsContext::Entry( |
574 | 72.9M | const Slice& key, const Slice& value, bool metadata, rocksdb::DirectWriteHandler* handler) { |
575 | 72.9M | if (reached_records_limit()) { |
576 | 637 | SetApplyState(key, 0, AbortedSubTransactionSet()); |
577 | 637 | return true; |
578 | 637 | } |
579 | | |
580 | 72.9M | handler->SingleDelete(key); |
581 | 72.9M | RegisterRecord(); |
582 | 72.9M | if (!metadata) { |
583 | 71.3M | handler->SingleDelete(value); |
584 | 71.3M | RegisterRecord(); |
585 | 71.3M | } |
586 | 72.9M | return false; |
587 | 72.9M | } |
588 | | |
// Nothing to finalize: all deletions were issued directly from Entry(), and
// removal keeps no persistent completion marker.
void RemoveIntentsContext::Complete(rocksdb::DirectWriteHandler* handler) {
}
591 | | |
592 | | } // namespace docdb |
593 | | } // namespace yb |