/Users/deen/code/yugabyte-db/src/yb/docdb/intent_aware_iterator.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/intent_aware_iterator.h" |
15 | | |
16 | | #include <future> |
17 | | |
18 | | #include "yb/common/doc_hybrid_time.h" |
19 | | #include "yb/common/hybrid_time.h" |
20 | | #include "yb/common/transaction.h" |
21 | | |
22 | | #include "yb/docdb/docdb_fwd.h" |
23 | | #include "yb/docdb/shared_lock_manager_fwd.h" |
24 | | #include "yb/docdb/conflict_resolution.h" |
25 | | #include "yb/docdb/doc_key.h" |
26 | | #include "yb/docdb/doc_kv_util.h" |
27 | | #include "yb/docdb/docdb-internal.h" |
28 | | #include "yb/docdb/docdb_rocksdb_util.h" |
29 | | #include "yb/docdb/intent.h" |
30 | | #include "yb/docdb/key_bounds.h" |
31 | | #include "yb/docdb/transaction_dump.h" |
32 | | #include "yb/docdb/value.h" |
33 | | #include "yb/docdb/value_type.h" |
34 | | |
35 | | #include "yb/util/bytes_formatter.h" |
36 | | #include "yb/util/debug-util.h" |
37 | | #include "yb/util/result.h" |
38 | | #include "yb/util/status_format.h" |
39 | | #include "yb/util/trace.h" |
40 | | |
41 | | using namespace std::literals; |
42 | | |
43 | | namespace yb { |
44 | | namespace docdb { |
45 | | |
46 | | namespace { |
47 | | |
48 | 25.2M | void GetIntentPrefixForKeyWithoutHt(const Slice& key, KeyBytes* out) { |
49 | 25.2M | out->Clear(); |
50 | | // Since the caller guarantees that key doesn't have a hybrid time, we can simply use it |
51 | | // to get prefix for all related intents. |
52 | 25.2M | out->AppendRawBytes(key); |
53 | 25.2M | } |
54 | | |
55 | 13.0k | KeyBytes GetIntentPrefixForKeyWithoutHt(const Slice& key) { |
56 | 13.0k | KeyBytes result; |
57 | 13.0k | GetIntentPrefixForKeyWithoutHt(key, &result); |
58 | 13.0k | return result; |
59 | 13.0k | } |
60 | | |
61 | 886M | void AppendEncodedDocHt(const Slice& encoded_doc_ht, KeyBytes* key_bytes) { |
62 | 886M | key_bytes->AppendValueType(ValueType::kHybridTime); |
63 | 886M | key_bytes->AppendRawBytes(encoded_doc_ht); |
64 | 886M | } |
65 | | |
66 | | const char kStrongWriteTail[] = { |
67 | | ValueTypeAsChar::kIntentTypeSet, |
68 | | static_cast<char>(IntentTypeSet({IntentType::kStrongWrite}).ToUIntPtr()) }; |
69 | | |
70 | | const Slice kStrongWriteTailSlice = Slice(kStrongWriteTail, sizeof(kStrongWriteTail)); |
71 | | |
72 | | char kEmptyKeyStrongWriteTail[] = { |
73 | | ValueTypeAsChar::kGroupEnd, |
74 | | ValueTypeAsChar::kIntentTypeSet, |
75 | | static_cast<char>(IntentTypeSet({IntentType::kStrongWrite}).ToUIntPtr()) }; |
76 | | |
77 | | const Slice kEmptyKeyStrongWriteTailSlice = |
78 | | Slice(kEmptyKeyStrongWriteTail, sizeof(kEmptyKeyStrongWriteTail)); |
79 | | |
80 | 131M | Slice StrongWriteSuffix(const KeyBytes& key) { |
81 | 131M | return key.empty() ? kEmptyKeyStrongWriteTailSlice14 : kStrongWriteTailSlice131M ; |
82 | 131M | } |
83 | | |
84 | | // We are not interested in weak and read intents here. |
85 | | // So we can just skip them. |
86 | 8.00M | void AppendStrongWrite(KeyBytes* out) { |
87 | 8.00M | out->AppendRawBytes(StrongWriteSuffix(*out)); |
88 | 8.00M | } |
89 | | |
90 | | } // namespace |
91 | | |
92 | | namespace { |
93 | | |
94 | | struct DecodeStrongWriteIntentResult { |
95 | | Slice intent_prefix; |
96 | | Slice intent_value; |
97 | | DocHybridTime intent_time; |
98 | | DocHybridTime value_time; |
99 | | IntentTypeSet intent_types; |
100 | | |
101 | | // Whether this intent is from the same transaction as the one specified in the context. |
102 | | bool same_transaction = false; |
103 | | |
104 | 0 | std::string ToString() const { |
105 | 0 | return Format("{ intent_prefix: $0 intent_value: $1 intent_time: $2 value_time: $3 " |
106 | 0 | "same_transaction: $4 intent_types: $5 }", |
107 | 0 | intent_prefix.ToDebugHexString(), intent_value.ToDebugHexString(), intent_time, |
108 | 0 | value_time, same_transaction, intent_types); |
109 | 0 | } |
110 | | |
111 | | // Returns the upper limit for the "value time" of an intent in order for the intent to be visible |
112 | | // in the read results. The "value time" is defined as follows: |
113 | | // - For uncommitted transactions, the "value time" is the time when the intent was written. |
114 | | // Note that same_transaction or in_txn_limit could only be set for uncommitted transactions. |
115 | | // - For committed transactions, the "value time" is the commit time. |
116 | | // |
117 | | // The logic here is as follows: |
118 | | // - When a transaction is reading its own intents, the in_txn_limit allows a statement to |
119 | | // avoid seeing its own partial results. This is necessary for statements such as INSERT ... |
120 | | // SELECT to avoid reading rows that the same statement generated and going into an infinite |
121 | | // loop. |
122 | | // - If an intent's hybrid time is greater than the tablet's local limit, then this intent |
123 | | // cannot lead to a read restart and we only need to see it if its commit time is less than or |
124 | | // equal to read_time. |
125 | | // - If an intent's hybrid time is <= the tablet's local limit, then we cannot claim that |
126 | | // the intent was written after the read transaction began based on the local limit, and we |
127 | | // must compare the intent's commit time with global_limit and potentially perform a read |
128 | | // restart, because the transaction that wrote the intent might have been committed before our |
129 | | // read transaction began. |
130 | 4.93M | HybridTime MaxAllowedValueTime(const ReadHybridTime& read_time) const { |
131 | 4.93M | if (same_transaction) { |
132 | 4.34M | return read_time.in_txn_limit; |
133 | 4.34M | } |
134 | 596k | return intent_time.hybrid_time() > read_time.local_limit |
135 | 596k | ? read_time.read692 : read_time.global_limit595k ; |
136 | 4.93M | } |
137 | | }; |
138 | | |
139 | 0 | std::ostream& operator<<(std::ostream& out, const DecodeStrongWriteIntentResult& result) { |
140 | 0 | return out << result.ToString(); |
141 | 0 | } |
142 | | |
143 | | // Decodes intent based on intent_iterator and its transaction commit time if intent is a strong |
144 | | // write intent, intent is not for row locking, and transaction is already committed at specified |
145 | | // time or is current transaction. |
146 | | // Returns HybridTime::kMin as value_time otherwise. |
147 | | // For current transaction returns intent record hybrid time as value_time. |
148 | | // Consumes intent from value_slice leaving only value itself. |
149 | | Result<DecodeStrongWriteIntentResult> DecodeStrongWriteIntent( |
150 | | HybridTime global_limit, |
151 | | const TransactionOperationContext& txn_op_context, |
152 | | rocksdb::Iterator* intent_iter, |
153 | 6.14M | TransactionStatusCache* transaction_status_cache) { |
154 | 6.14M | DecodeStrongWriteIntentResult result; |
155 | 6.14M | auto decoded_intent_key = VERIFY_RESULT(DecodeIntentKey(intent_iter->key())); |
156 | 0 | result.intent_prefix = decoded_intent_key.intent_prefix; |
157 | 6.14M | result.intent_types = decoded_intent_key.intent_types; |
158 | 6.14M | if (result.intent_types.Test(IntentType::kStrongWrite)) { |
159 | 5.57M | auto intent_value = intent_iter->value(); |
160 | 5.57M | auto decoded_intent_value = VERIFY_RESULT(DecodeIntentValue(intent_value)); |
161 | | |
162 | 0 | auto decoded_txn_id = decoded_intent_value.transaction_id; |
163 | 5.57M | auto decoded_subtxn_id = decoded_intent_value.subtransaction_id; |
164 | | |
165 | 5.57M | result.intent_value = decoded_intent_value.body; |
166 | 5.57M | result.intent_time = decoded_intent_key.doc_ht; |
167 | 5.57M | result.same_transaction = decoded_txn_id == txn_op_context.transaction_id; |
168 | | |
169 | | // By setting the value time to kMin, we ensure the caller ignores this intent. This is true |
170 | | // because the caller is skipping all intents written before or at the same time as |
171 | | // intent_dht_from_same_txn_ or resolved_intent_txn_dht_, which of course are greater than or |
172 | | // equal to DocHybridTime::kMin. |
173 | 5.57M | if (result.intent_value.starts_with(ValueTypeAsChar::kRowLock)) { |
174 | 125k | result.value_time = DocHybridTime::kMin; |
175 | 5.44M | } else if (result.same_transaction) { |
176 | 4.62M | if (txn_op_context.subtransaction.aborted.Test(decoded_subtxn_id)) { |
177 | | // If this intent is from the same transaction, we can check the aborted set from this |
178 | | // txn_op_context to see whether the intent is still live. If not, mask it from the caller. |
179 | 752 | result.value_time = DocHybridTime::kMin; |
180 | 4.62M | } else { |
181 | 4.62M | result.value_time = decoded_intent_key.doc_ht; |
182 | 4.62M | } |
183 | 4.62M | } else if (817k result.intent_time.hybrid_time() > global_limit817k ) { |
184 | 108k | VTRACE(1, "Ignoring intent from a different txn written after read.global_limit"); |
185 | 108k | result.value_time = DocHybridTime::kMin; |
186 | 709k | } else { |
187 | 709k | auto commit_data = VERIFY_RESULT(transaction_status_cache->GetCommitData(decoded_txn_id)); |
188 | 0 | auto commit_ht = commit_data.commit_ht; |
189 | 709k | auto aborted_subtxn_set = commit_data.aborted_subtxn_set; |
190 | 709k | auto is_aborted_subtxn = aborted_subtxn_set.Test(decoded_subtxn_id); |
191 | 709k | result.value_time = commit_ht == HybridTime::kMin || is_aborted_subtxn629k |
192 | 709k | ? DocHybridTime::kMin79.6k |
193 | 709k | : DocHybridTime(commit_ht, decoded_intent_value.write_id)629k ; |
194 | 709k | VLOG(4) << "Transaction id: " << decoded_txn_id |
195 | 68 | << ", subtransaction id: " << decoded_subtxn_id |
196 | 68 | << ", value time: " << result.value_time |
197 | 68 | << ", value: " << result.intent_value.ToDebugHexString() |
198 | 68 | << ", aborted subtxn set: " << aborted_subtxn_set.ToString(); |
199 | 709k | } |
200 | 5.57M | } else { |
201 | 569k | result.value_time = DocHybridTime::kMin; |
202 | 569k | } |
203 | 6.14M | return result; |
204 | 6.14M | } |
205 | | |
206 | | // Given that key is well-formed DocDB encoded key, checks if it is an intent key for the same key |
207 | | // as intent_prefix. If key is not well-formed DocDB encoded key, result could be true or false. |
208 | 4.61M | bool IsIntentForTheSameKey(const Slice& key, const Slice& intent_prefix) { |
209 | 4.61M | return key.starts_with(intent_prefix) && |
210 | 4.61M | key.size() > intent_prefix.size()346k && |
211 | 4.61M | IntentValueType(key[intent_prefix.size()])346k ; |
212 | 4.61M | } |
213 | | |
214 | 0 | std::string DebugDumpKeyToStr(const Slice &key) { |
215 | 0 | return key.ToDebugString() + " (" + SubDocKey::DebugSliceToString(key) + ")"; |
216 | 0 | } |
217 | | |
218 | 0 | std::string DebugDumpKeyToStr(const KeyBytes &key) { |
219 | 0 | return DebugDumpKeyToStr(key.AsSlice()); |
220 | 0 | } |
221 | | |
222 | 84.6M | bool DebugHasHybridTime(const Slice& subdoc_key_encoded) { |
223 | 84.6M | SubDocKey subdoc_key; |
224 | 84.6M | CHECK(subdoc_key.FullyDecodeFromKeyWithOptionalHybridTime(subdoc_key_encoded).ok()); |
225 | 84.6M | return subdoc_key.has_hybrid_time(); |
226 | 84.6M | } |
227 | | |
228 | 62.2M | std::string EncodeHybridTime(HybridTime value) { |
229 | 62.2M | return DocHybridTime(value, kMaxWriteId).EncodedInDocDbFormat(); |
230 | 62.2M | } |
231 | | |
232 | | } // namespace |
233 | | |
234 | | IntentAwareIterator::IntentAwareIterator( |
235 | | const DocDB& doc_db, |
236 | | const rocksdb::ReadOptions& read_opts, |
237 | | CoarseTimePoint deadline, |
238 | | const ReadHybridTime& read_time, |
239 | | const TransactionOperationContext& txn_op_context) |
240 | | : read_time_(read_time), |
241 | | encoded_read_time_read_(EncodeHybridTime(read_time_.read)), |
242 | | encoded_read_time_local_limit_(EncodeHybridTime(read_time_.local_limit)), |
243 | | encoded_read_time_global_limit_(EncodeHybridTime(read_time_.global_limit)), |
244 | | encoded_read_time_regular_limit_( |
245 | | read_time_.local_limit > read_time_.read ? Slice(encoded_read_time_local_limit_) |
246 | | : Slice(encoded_read_time_read_)), |
247 | | txn_op_context_(txn_op_context), |
248 | 20.7M | transaction_status_cache_(txn_op_context_, read_time, deadline) { |
249 | 20.7M | VTRACE(1, __func__); |
250 | 18.4E | VLOG(4) << "IntentAwareIterator, read_time: " << read_time |
251 | 18.4E | << ", txn_op_context: " << txn_op_context_; |
252 | | |
253 | 20.7M | if (txn_op_context) { |
254 | 12.3M | VTRACE(1, "Checking MinRunningTime"); |
255 | 12.3M | const auto min_running_ht = txn_op_context.txn_status_manager->MinRunningHybridTime(); |
256 | 12.3M | if (min_running_ht != HybridTime::kMax && min_running_ht < read_time.global_limit7.62M ) { |
257 | 7.61M | intent_iter_ = docdb::CreateRocksDBIterator(doc_db.intents, |
258 | 7.61M | doc_db.key_bounds, |
259 | 7.61M | docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER, |
260 | 7.61M | boost::none, |
261 | 7.61M | rocksdb::kDefaultQueryId, |
262 | 7.61M | nullptr /* file_filter */, |
263 | 7.61M | &intent_upperbound_); |
264 | 7.61M | } else { |
265 | 4.76M | VLOG(4) << "No relevant transactions running: " |
266 | 16.8k | << "min_running_ht=" << min_running_ht << ", " |
267 | 16.8k | << "global_limit=" << read_time.global_limit; |
268 | 4.76M | } |
269 | 12.3M | } |
270 | 20.7M | VTRACE(2, "Done Checking MinRunningTime"); |
271 | | // WARNING: It is important for the regular DB iterator to be created after the intents DB iterator, |
272 | | // otherwise consistency could break, for example in following scenario: |
273 | | // 1) Transaction T1 is committed with value v1 for k1, but not yet applied to regular DB. |
274 | | // 2) Client reads v1 for k1. |
275 | | // 3) Regular DB iterator is created on a regular DB snapshot containing no values for k1. |
276 | | // 4) Transaction T1 is applied, k1->v1 is written into regular DB, intent k1->v1 is deleted. |
277 | | // 5) Intents DB iterator is created on an intents DB snapshot containing no intents for k1. |
278 | | // 6) Client reads no values for k1. |
279 | 20.7M | iter_ = BoundedRocksDbIterator(doc_db.regular, read_opts, doc_db.key_bounds); |
280 | 20.7M | VTRACE(2, "Created iterator"); |
281 | 20.7M | } |
282 | | |
283 | 3.10k | void IntentAwareIterator::Seek(const DocKey &doc_key) { |
284 | 3.10k | Seek(doc_key.Encode()); |
285 | 3.10k | } |
286 | | |
287 | 35.1M | void IntentAwareIterator::Seek(const Slice& key) { |
288 | 18.4E | VLOG(4) << "Seek(" << SubDocKey::DebugSliceToString(key) << ")"; |
289 | 35.1M | DOCDB_DEBUG_SCOPE_LOG( |
290 | 0 | key.ToDebugString(), |
291 | 0 | std::bind(&IntentAwareIterator::DebugDump, this)); |
292 | 35.1M | if (!status_.ok()) { |
293 | 0 | return; |
294 | 0 | } |
295 | | |
296 | 35.1M | ROCKSDB_SEEK(&iter_, key); |
297 | 35.1M | skip_future_records_needed_ = true; |
298 | | |
299 | 35.1M | if (intent_iter_.Initialized()) { |
300 | 8.01M | seek_intent_iter_needed_ = SeekIntentIterNeeded::kSeek; |
301 | 8.01M | GetIntentPrefixForKeyWithoutHt(key, &seek_key_buffer_); |
302 | 8.01M | AppendStrongWrite(&seek_key_buffer_); |
303 | 8.01M | } |
304 | 35.1M | } |
305 | | |
306 | 83.6M | void IntentAwareIterator::SeekForward(const Slice& key) { |
307 | 83.6M | KeyBytes key_bytes; |
308 | | // Reserve space for key plus kMaxBytesPerEncodedHybridTime + 1 bytes for SeekForward() below to |
309 | | // avoid extra realloc while appending the read time. |
310 | 83.6M | key_bytes.Reserve(key.size() + kMaxBytesPerEncodedHybridTime + 1); |
311 | 83.6M | key_bytes.AppendRawBytes(key); |
312 | 83.6M | SeekForward(&key_bytes); |
313 | 83.6M | } |
314 | | |
315 | 803M | void IntentAwareIterator::SeekForward(KeyBytes* key_bytes) { |
316 | 18.4E | VLOG(4) << "SeekForward(" << SubDocKey::DebugSliceToString(*key_bytes) << ")"; |
317 | 803M | DOCDB_DEBUG_SCOPE_LOG( |
318 | 0 | SubDocKey::DebugSliceToString(*key_bytes), |
319 | 0 | std::bind(&IntentAwareIterator::DebugDump, this)); |
320 | 803M | if (!status_.ok()) { |
321 | 0 | return; |
322 | 0 | } |
323 | | |
324 | 803M | const size_t key_size = key_bytes->size(); |
325 | 803M | AppendEncodedDocHt(encoded_read_time_global_limit_, key_bytes); |
326 | 803M | SeekForwardRegular(*key_bytes); |
327 | 803M | key_bytes->Truncate(key_size); |
328 | 803M | if (intent_iter_.Initialized() && status_.ok()123M ) { |
329 | 123M | UpdatePlannedIntentSeekForward( |
330 | 123M | *key_bytes, StrongWriteSuffix(*key_bytes), /* use_suffix_for_prefix= */ false); |
331 | 123M | } |
332 | 803M | } |
333 | | |
334 | | void IntentAwareIterator::UpdatePlannedIntentSeekForward(const Slice& key, |
335 | | const Slice& suffix, |
336 | 143M | bool use_suffix_for_prefix) { |
337 | 143M | if (seek_intent_iter_needed_ != SeekIntentIterNeeded::kNoNeed && |
338 | 143M | seek_key_buffer_.AsSlice().GreaterOrEqual(key, suffix)0 ) { |
339 | 0 | return; |
340 | 0 | } |
341 | 143M | seek_key_buffer_.Clear(); |
342 | 143M | seek_key_buffer_.AppendRawBytes(key); |
343 | 143M | seek_key_buffer_.AppendRawBytes(suffix); |
344 | 143M | if (seek_intent_iter_needed_ == SeekIntentIterNeeded::kNoNeed143M ) { |
345 | 143M | seek_intent_iter_needed_ = SeekIntentIterNeeded::kSeekForward; |
346 | 143M | } |
347 | 143M | seek_key_prefix_ = seek_key_buffer_.AsSlice(); |
348 | 143M | if (!use_suffix_for_prefix) { |
349 | 123M | seek_key_prefix_.remove_suffix(suffix.size()); |
350 | 123M | } |
351 | 143M | } |
352 | | |
353 | | // TODO: If TTL rows are ever supported on subkeys, this may need to change appropriately. |
354 | | // Otherwise, this function might seek past the TTL merge record, but not the original |
355 | | // record for the actual subkey. |
356 | 98.3M | void IntentAwareIterator::SeekPastSubKey(const Slice& key) { |
357 | 18.4E | VLOG(4) << "SeekPastSubKey(" << SubDocKey::DebugSliceToString(key) << ")"; |
358 | 98.3M | if (!status_.ok()) { |
359 | 0 | return; |
360 | 0 | } |
361 | | |
362 | 98.3M | docdb::SeekPastSubKey(key, &iter_); |
363 | 98.3M | skip_future_records_needed_ = true; |
364 | 98.3M | if (intent_iter_.Initialized() && status_.ok()2.29M ) { |
365 | | // Skip all intents for subdoc_key. |
366 | 2.29M | char kSuffix = ValueTypeAsChar::kGreaterThanIntentType; |
367 | 2.29M | UpdatePlannedIntentSeekForward(key, Slice(&kSuffix, 1)); |
368 | 2.29M | } |
369 | 98.3M | } |
370 | | |
371 | 135M | void IntentAwareIterator::SeekOutOfSubDoc(KeyBytes* key_bytes) { |
372 | 18.4E | VLOG(4) << "SeekOutOfSubDoc(" << SubDocKey::DebugSliceToString(*key_bytes) << ")"; |
373 | 135M | if (!status_.ok()) { |
374 | 0 | return; |
375 | 0 | } |
376 | | |
377 | 135M | docdb::SeekOutOfSubKey(key_bytes, &iter_); |
378 | 135M | skip_future_records_needed_ = true; |
379 | 135M | if (intent_iter_.Initialized() && status_.ok()17.8M ) { |
380 | | // See comment for SubDocKey::AdvanceOutOfSubDoc. |
381 | 17.8M | const char kSuffix = ValueTypeAsChar::kMaxByte; |
382 | 17.8M | UpdatePlannedIntentSeekForward(*key_bytes, Slice(&kSuffix, 1)); |
383 | 17.8M | } |
384 | 135M | } |
385 | | |
386 | 133M | void IntentAwareIterator::SeekOutOfSubDoc(const Slice& key) { |
387 | 133M | KeyBytes key_bytes; |
388 | | // Reserve space for key + 1 byte for docdb::SeekOutOfSubKey() above to avoid extra realloc while |
389 | | // appending kMaxByte. |
390 | 133M | key_bytes.Reserve(key.size() + 1); |
391 | 133M | key_bytes.AppendRawBytes(key); |
392 | 133M | SeekOutOfSubDoc(&key_bytes); |
393 | 133M | } |
394 | | |
395 | 1.23G | bool IntentAwareIterator::HasCurrentEntry() { |
396 | 1.23G | return iter_valid_ || resolved_intent_state_ == ResolvedIntentState::kValid139M ; |
397 | 1.23G | } |
398 | | |
399 | 716k | void IntentAwareIterator::SeekToLastDocKey() { |
400 | 716k | iter_.SeekToLast(); |
401 | 716k | SkipFutureRecords(Direction::kBackward); |
402 | 716k | if (intent_iter_.Initialized()) { |
403 | 0 | ResetIntentUpperbound(); |
404 | 0 | intent_iter_.SeekToLast(); |
405 | 0 | SeekToSuitableIntent<Direction::kBackward>(); |
406 | 0 | seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed; |
407 | 0 | skip_future_intents_needed_ = false; |
408 | 0 | } |
409 | 716k | if (HasCurrentEntry()) { |
410 | 714k | SeekToLatestDocKeyInternal(); |
411 | 714k | } |
412 | 716k | } |
413 | | |
414 | | template <class T> |
415 | 634k | void Assign(const T& value, T* out) { |
416 | 634k | if (out) { |
417 | 634k | *out = value; |
418 | 634k | } |
419 | 634k | } void yb::docdb::Assign<yb::Slice>(yb::Slice const&, yb::Slice*) Line | Count | Source | 415 | 317k | void Assign(const T& value, T* out) { | 416 | 317k | if (out) { | 417 | 317k | *out = value; | 418 | 317k | } | 419 | 317k | } |
void yb::docdb::Assign<yb::DocHybridTime>(yb::DocHybridTime const&, yb::DocHybridTime*) Line | Count | Source | 415 | 317k | void Assign(const T& value, T* out) { | 416 | 317k | if (out) { | 417 | 317k | *out = value; | 418 | 317k | } | 419 | 317k | } |
|
420 | | |
421 | | // If we reach a different key, stop seeking. |
422 | | Status IntentAwareIterator::NextFullValue( |
423 | | DocHybridTime* latest_record_ht, |
424 | | Slice* result_value, |
425 | 317k | Slice* final_key) { |
426 | 317k | if (!latest_record_ht || !result_value) |
427 | 0 | return STATUS(Corruption, "The arguments latest_record_ht and " |
428 | 317k | "result_value cannot be null pointers."); |
429 | 317k | RETURN_NOT_OK(status_); |
430 | 317k | Slice v; |
431 | 317k | if (!valid() || !IsMergeRecord(v = value())) { |
432 | 317k | auto key_data = VERIFY_RESULT(FetchKey()); |
433 | 0 | Assign(key_data.key, final_key); |
434 | 317k | Assign(key_data.write_time, latest_record_ht); |
435 | 317k | *result_value = v; |
436 | 317k | return status_; |
437 | 317k | } |
438 | | |
439 | 180 | *latest_record_ht = DocHybridTime::kMin; |
440 | 180 | const auto key_data = VERIFY_RESULT(FetchKey()); |
441 | 0 | auto key = key_data.key; |
442 | 180 | const size_t key_size = key.size(); |
443 | 180 | bool found_record = false; |
444 | | |
445 | 390 | while ((found_record = iter_.Valid()) && // as long as we're pointing to a record |
446 | 390 | (key = iter_.key()).starts_with(key_data.key) && // with the same key we started with |
447 | 390 | key[key_size] == ValueTypeAsChar::kHybridTime && // whose key ends with a HT |
448 | 390 | IsMergeRecord(v = iter_.value())) { // and whose value is a merge record |
449 | 210 | iter_.Next(); // advance the iterator |
450 | 210 | } |
451 | | |
452 | 180 | if (found_record) { |
453 | 180 | *result_value = v; |
454 | 180 | *latest_record_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key)); |
455 | 0 | Assign(key, final_key); |
456 | 180 | } |
457 | | |
458 | 180 | found_record = false; |
459 | 180 | if (intent_iter_.Initialized()) { |
460 | 0 | while ((found_record = IsIntentForTheSameKey(intent_iter_.key(), key_data.key)) && |
461 | 0 | IsMergeRecord(v = intent_iter_.value())) { |
462 | 0 | intent_iter_.Next(); |
463 | 0 | } |
464 | 0 | DocHybridTime doc_ht; |
465 | 0 | if (found_record && !(key = intent_iter_.key()).empty() && |
466 | 0 | (doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key))) >= *latest_record_ht) { |
467 | 0 | *latest_record_ht = doc_ht; |
468 | 0 | *result_value = v; |
469 | 0 | Assign(key, final_key); |
470 | 0 | } |
471 | 0 | } |
472 | | |
473 | 180 | if (*latest_record_ht == DocHybridTime::kMin) { |
474 | 0 | iter_valid_ = false; |
475 | 0 | } |
476 | 180 | return status_; |
477 | 180 | } |
478 | | |
479 | 454k | bool IntentAwareIterator::PreparePrev(const Slice& key) { |
480 | 18.4E | VLOG(4) << __func__ << "(" << SubDocKey::DebugSliceToString(key) << ")"; |
481 | | |
482 | 454k | ROCKSDB_SEEK(&iter_, key); |
483 | | |
484 | 454k | if (iter_.Valid()) { |
485 | 451k | iter_.Prev(); |
486 | 451k | } else { |
487 | 2.89k | iter_.SeekToLast(); |
488 | 2.89k | } |
489 | 454k | SkipFutureRecords(Direction::kBackward); |
490 | | |
491 | 454k | if (intent_iter_.Initialized()) { |
492 | 13.0k | ResetIntentUpperbound(); |
493 | 13.0k | ROCKSDB_SEEK(&intent_iter_, GetIntentPrefixForKeyWithoutHt(key)); |
494 | 13.0k | if (intent_iter_.Valid()) { |
495 | 13.0k | intent_iter_.Prev(); |
496 | 13.0k | } else { |
497 | 2 | intent_iter_.SeekToLast(); |
498 | 2 | } |
499 | 13.0k | SeekToSuitableIntent<Direction::kBackward>(); |
500 | 13.0k | seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed; |
501 | 13.0k | skip_future_intents_needed_ = false; |
502 | 13.0k | } |
503 | | |
504 | 454k | return HasCurrentEntry(); |
505 | 454k | } |
506 | | |
507 | 0 | void IntentAwareIterator::PrevSubDocKey(const KeyBytes& key_bytes) { |
508 | 0 | if (PreparePrev(key_bytes)) { |
509 | 0 | SeekToLatestSubDocKeyInternal(); |
510 | 0 | } |
511 | 0 | } |
512 | | |
513 | 0 | void IntentAwareIterator::PrevDocKey(const DocKey& doc_key) { |
514 | 0 | PrevDocKey(doc_key.Encode().AsSlice()); |
515 | 0 | } |
516 | | |
517 | 454k | void IntentAwareIterator::PrevDocKey(const Slice& encoded_doc_key) { |
518 | 454k | if (PreparePrev(encoded_doc_key)) { |
519 | 454k | SeekToLatestDocKeyInternal(); |
520 | 454k | } |
521 | 454k | } |
522 | | |
523 | 1.16M | Slice IntentAwareIterator::LatestSubDocKey() { |
524 | 1.16M | DCHECK(HasCurrentEntry()) |
525 | 31 | << "Expected iter_valid(" << iter_valid_ << ") || resolved_intent_state_(" |
526 | 31 | << resolved_intent_state_ << ") == ResolvedIntentState::kValid"; |
527 | 1.16M | return IsEntryRegular(/* descending */ true) ? iter_.key()1.16M |
528 | 1.16M | : resolved_intent_key_prefix_.AsSlice()3.97k ; |
529 | 1.16M | } |
530 | | |
531 | 0 | void IntentAwareIterator::SeekToLatestSubDocKeyInternal() { |
532 | 0 | auto subdockey_slice = LatestSubDocKey(); |
533 | | |
534 | | // Strip the hybrid time and seek the slice. |
535 | 0 | auto doc_ht = DocHybridTime::DecodeFromEnd(&subdockey_slice); |
536 | 0 | if (!doc_ht.ok()) { |
537 | 0 | status_ = doc_ht.status(); |
538 | 0 | return; |
539 | 0 | } |
540 | 0 | subdockey_slice.remove_suffix(1); |
541 | 0 | Seek(subdockey_slice); |
542 | 0 | } |
543 | | |
544 | 1.16M | void IntentAwareIterator::SeekToLatestDocKeyInternal() { |
545 | 1.16M | auto subdockey_slice = LatestSubDocKey(); |
546 | | |
547 | | // Seek to the first key for row containing found subdockey. |
548 | 1.16M | auto dockey_size = DocKey::EncodedSize(subdockey_slice, DocKeyPart::kWholeDocKey); |
549 | 1.16M | if (!dockey_size.ok()) { |
550 | 0 | status_ = dockey_size.status(); |
551 | 0 | return; |
552 | 0 | } |
553 | 1.16M | Seek(Slice(subdockey_slice.data(), *dockey_size)); |
554 | 1.16M | } |
555 | | |
556 | 1.23G | void IntentAwareIterator::SeekIntentIterIfNeeded() { |
557 | 1.23G | if (seek_intent_iter_needed_ == SeekIntentIterNeeded::kNoNeed || !status_.ok()149M ) { |
558 | 1.08G | return; |
559 | 1.08G | } |
560 | 149M | status_ = SetIntentUpperbound(); |
561 | 149M | if (!status_.ok()) { |
562 | 0 | return; |
563 | 0 | } |
564 | 149M | switch (seek_intent_iter_needed_) { |
565 | 0 | case SeekIntentIterNeeded::kNoNeed: |
566 | 0 | break; |
567 | 7.87M | case SeekIntentIterNeeded::kSeek: |
568 | 7.87M | VLOG(4) << __func__ << ", seek: " << SubDocKey::DebugSliceToString(seek_key_buffer_)242 ; |
569 | 7.87M | ROCKSDB_SEEK(&intent_iter_, seek_key_buffer_); |
570 | 7.87M | SeekToSuitableIntent<Direction::kForward>(); |
571 | 7.87M | seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed; |
572 | 7.87M | return; |
573 | 141M | case SeekIntentIterNeeded::kSeekForward: |
574 | 141M | SeekForwardToSuitableIntent(); |
575 | 141M | seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed; |
576 | 141M | return; |
577 | 149M | } |
578 | 0 | FATAL_INVALID_ENUM_VALUE(SeekIntentIterNeeded, seek_intent_iter_needed_); |
579 | 0 | } |
580 | | |
581 | 1.23G | bool IntentAwareIterator::valid() { |
582 | 1.23G | if (skip_future_records_needed_) { |
583 | 1.14G | SkipFutureRecords(Direction::kForward); |
584 | 1.14G | } |
585 | 1.23G | SeekIntentIterIfNeeded(); |
586 | 1.23G | if (skip_future_intents_needed_) { |
587 | 1.03G | SkipFutureIntents(); |
588 | 1.03G | } |
589 | 1.23G | return !status_.ok()1.23G || HasCurrentEntry(); |
590 | 1.23G | } |
591 | | |
592 | 1.60G | bool IntentAwareIterator::IsEntryRegular(bool descending) { |
593 | 1.60G | if (PREDICT_FALSE(!iter_valid_)) { |
594 | 8.87M | return false; |
595 | 8.87M | } |
596 | 1.60G | if (resolved_intent_state_ == ResolvedIntentState::kValid) { |
597 | 1.44M | return (iter_.key().compare(resolved_intent_sub_doc_key_encoded_) < 0) != descending; |
598 | 1.44M | } |
599 | 1.59G | return true; |
600 | 1.60G | } |
601 | | |
602 | 849M | Result<FetchKeyResult> IntentAwareIterator::FetchKey() { |
603 | 849M | RETURN_NOT_OK(status_); |
604 | 849M | FetchKeyResult result; |
605 | 849M | if (IsEntryRegular()) { |
606 | 843M | result.key = iter_.key(); |
607 | 843M | result.write_time = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&result.key)); |
608 | 18.4E | DCHECK(result.key.ends_with(ValueTypeAsChar::kHybridTime)) << result.key.ToDebugString(); |
609 | 843M | result.key.remove_suffix(1); |
610 | 843M | result.same_transaction = false; |
611 | 843M | max_seen_ht_.MakeAtLeast(result.write_time.hybrid_time()); |
612 | 843M | } else { |
613 | 6.61M | DCHECK_EQ(ResolvedIntentState::kValid, resolved_intent_state_); |
614 | 6.61M | result.key = resolved_intent_key_prefix_.AsSlice(); |
615 | 6.61M | result.write_time = GetIntentDocHybridTime(); |
616 | 6.61M | result.same_transaction = ResolvedIntentFromSameTransaction(); |
617 | 6.61M | max_seen_ht_.MakeAtLeast(resolved_intent_txn_dht_.hybrid_time()); |
618 | 6.61M | } |
619 | 849M | VLOG(4) << "Fetched key " << SubDocKey::DebugSliceToString(result.key) |
620 | 97.2k | << ", regular: " << IsEntryRegular() |
621 | 97.2k | << ", with time: " << result.write_time |
622 | 97.2k | << ", while read bounds are: " << read_time_; |
623 | | |
624 | 849M | YB_TRANSACTION_DUMP( |
625 | 849M | Read, txn_op_context_ ? txn_op_context_.txn_status_manager->tablet_id() : TabletId(), |
626 | 849M | txn_op_context_ ? txn_op_context_.transaction_id : TransactionId::Nil(), |
627 | 849M | read_time_, result.write_time, result.same_transaction, |
628 | 849M | result.key.size(), result.key, value().size(), value()); |
629 | | |
630 | 849M | return result; |
631 | 849M | } |
632 | | |
633 | 763M | Slice IntentAwareIterator::value() { |
634 | 763M | if (IsEntryRegular()) { |
635 | 18.4E | VLOG(4) << "IntentAwareIterator::value() returning iter_.value(): " |
636 | 18.4E | << iter_.value().ToDebugHexString() << " or " << FormatSliceAsStr(iter_.value()); |
637 | 758M | return iter_.value(); |
638 | 758M | } else { |
639 | 4.37M | DCHECK_EQ(ResolvedIntentState::kValid, resolved_intent_state_); |
640 | 18.4E | VLOG(4) << "IntentAwareIterator::value() returning resolved_intent_value_: " |
641 | 18.4E | << resolved_intent_value_.AsSlice().ToDebugHexString(); |
642 | 4.37M | return resolved_intent_value_; |
643 | 4.37M | } |
644 | 763M | } |
645 | | |
646 | 886M | void IntentAwareIterator::SeekForwardRegular(const Slice& slice) { |
647 | 18.4E | VLOG(4) << "SeekForwardRegular(" << SubDocKey::DebugSliceToString(slice) << ")"; |
648 | 886M | docdb::SeekForward(slice, &iter_); |
649 | 886M | skip_future_records_needed_ = true; |
650 | 886M | } |
651 | | |
652 | 1.04G | bool IntentAwareIterator::SatisfyBounds(const Slice& slice) { |
653 | 1.04G | return upperbound_.empty() || slice.compare(upperbound_) <= 0911M ; |
654 | 1.04G | } |
655 | | |
656 | 6.14M | void IntentAwareIterator::ProcessIntent() { |
657 | 6.14M | auto decode_result = DecodeStrongWriteIntent( |
658 | 6.14M | read_time_.global_limit, txn_op_context_, &intent_iter_, &transaction_status_cache_); |
659 | 6.14M | if (!decode_result.ok()) { |
660 | 0 | status_ = decode_result.status(); |
661 | 0 | return; |
662 | 0 | } |
663 | 6.14M | VLOG(4) << "Intent decode: " << DebugIntentKeyToString(intent_iter_.key()) |
664 | 3 | << " => " << intent_iter_.value().ToDebugHexString() << ", result: " << *decode_result; |
665 | 6.14M | DOCDB_DEBUG_LOG( |
666 | 6.14M | "resolved_intent_txn_dht_: $0 value_time: $1 read_time: $2", |
667 | 6.14M | resolved_intent_txn_dht_.ToString(), |
668 | 6.14M | decode_result->value_time.ToString(), |
669 | 6.14M | read_time_.ToString()); |
670 | 6.14M | auto resolved_intent_time = decode_result->same_transaction ? intent_dht_from_same_txn_4.75M |
671 | 6.14M | : resolved_intent_txn_dht_1.38M ; |
672 | | // If we already resolved intent that is newer than this one, we should ignore current
673 | | // intent because we are interested in the most recent intent only. |
674 | 6.14M | if (decode_result->value_time <= resolved_intent_time) { |
675 | 1.20M | return; |
676 | 1.20M | } |
677 | | |
678 | | // Ignore intent past read limit. |
679 | 4.93M | if (decode_result->value_time.hybrid_time() > decode_result->MaxAllowedValueTime(read_time_)) { |
680 | 1.55k | return; |
681 | 1.55k | } |
682 | | |
683 | 4.93M | if (resolved_intent_state_ == ResolvedIntentState::kNoIntent) { |
684 | 4.92M | resolved_intent_key_prefix_.Reset(decode_result->intent_prefix); |
685 | 4.92M | auto prefix = CurrentPrefix(); |
686 | 4.92M | if (!decode_result->intent_prefix.starts_with(prefix)) { |
687 | 0 | resolved_intent_state_ = ResolvedIntentState::kInvalidPrefix; |
688 | 4.92M | } else if (!SatisfyBounds(decode_result->intent_prefix)) { |
689 | 0 | resolved_intent_state_ = ResolvedIntentState::kNoIntent; |
690 | 4.92M | } else { |
691 | 4.92M | resolved_intent_state_ = ResolvedIntentState::kValid; |
692 | 4.92M | } |
693 | 4.92M | } |
694 | 4.93M | if (decode_result->same_transaction) { |
695 | 4.34M | intent_dht_from_same_txn_ = decode_result->value_time; |
696 | | // We set resolved_intent_txn_dht_ to maximum possible time (time higher than read_time_.read |
697 | | // will cause read restart or will be ignored if higher than read_time_.global_limit) in |
698 | | // order to ignore intents/values from other transactions. But we save the original intent time into
699 | | // intent_dht_from_same_txn_, so we can compare time of intents for the same key from the same |
700 | | // transaction and select the latest one. |
701 | 4.34M | resolved_intent_txn_dht_ = DocHybridTime(read_time_.read, kMaxWriteId); |
702 | 4.34M | } else { |
703 | 595k | resolved_intent_txn_dht_ = decode_result->value_time; |
704 | 595k | } |
705 | 4.93M | resolved_intent_value_.Reset(decode_result->intent_value); |
706 | 4.93M | } |
707 | | |
708 | 4.92M | void IntentAwareIterator::UpdateResolvedIntentSubDocKeyEncoded() { |
709 | 4.92M | resolved_intent_sub_doc_key_encoded_.Reset(resolved_intent_key_prefix_.AsSlice()); |
710 | 4.92M | resolved_intent_sub_doc_key_encoded_.AppendValueType(ValueType::kHybridTime); |
711 | 4.92M | resolved_intent_sub_doc_key_encoded_.AppendHybridTime(resolved_intent_txn_dht_); |
712 | 4.92M | VLOG(4) << "Resolved intent SubDocKey: " |
713 | 5 | << DebugDumpKeyToStr(resolved_intent_sub_doc_key_encoded_); |
714 | 4.92M | } |
715 | | |
716 | 158M | void IntentAwareIterator::SeekForwardToSuitableIntent() { |
717 | 18.4E | VLOG(4) << __func__ << "(" << DebugDumpKeyToStr(seek_key_buffer_) << ")"; |
718 | | |
719 | 158M | DOCDB_DEBUG_SCOPE_LOG(seek_key_buffer_.ToString(), |
720 | 0 | std::bind(&IntentAwareIterator::DebugDump, this)); |
721 | 158M | if (resolved_intent_state_ != ResolvedIntentState::kNoIntent && |
722 | 158M | resolved_intent_key_prefix_.CompareTo(seek_key_prefix_) >= 06.32M ) { |
723 | 18.4E | VLOG(4) << __func__ << ", has suitable " << AsString(resolved_intent_state_) << " intent: " |
724 | 18.4E | << DebugDumpKeyToStr(resolved_intent_key_prefix_); |
725 | 1.54M | return; |
726 | 1.54M | } |
727 | | |
728 | 157M | if (VLOG_IS_ON(4)) { |
729 | 0 | if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) { |
730 | 0 | VLOG(4) << __func__ << ", has NOT suitable " << AsString(resolved_intent_state_) |
731 | 0 | << " intent: " << DebugDumpKeyToStr(resolved_intent_key_prefix_); |
732 | 0 | } |
733 | |
|
734 | 0 | if (intent_iter_.Valid()) { |
735 | 0 | VLOG(4) << __func__ << ", current position: " << DebugDumpKeyToStr(intent_iter_.key()); |
736 | 0 | } else { |
737 | 0 | VLOG(4) << __func__ << ", iterator invalid"; |
738 | 0 | } |
739 | 0 | } |
740 | | |
741 | 157M | docdb::SeekForward(seek_key_buffer_.AsSlice(), &intent_iter_); |
742 | 157M | SeekToSuitableIntent<Direction::kForward>(); |
743 | 157M | } |
744 | | |
745 | | template<Direction direction> |
746 | 300M | void IntentAwareIterator::SeekToSuitableIntent() { |
747 | 300M | DOCDB_DEBUG_SCOPE_LOG(/* msg */ "", std::bind(&IntentAwareIterator::DebugDump, this));0 |
748 | 300M | resolved_intent_state_ = ResolvedIntentState::kNoIntent; |
749 | 300M | resolved_intent_txn_dht_ = DocHybridTime::kMin; |
750 | 300M | intent_dht_from_same_txn_ = DocHybridTime::kMin; |
751 | 300M | auto prefix = CurrentPrefix(); |
752 | | |
753 | | // Find latest suitable intent for the first SubDocKey having suitable intents. |
754 | 307M | while (intent_iter_.Valid()) { |
755 | 14.7M | auto intent_key = intent_iter_.key(); |
756 | 14.7M | if (intent_key[0] == ValueTypeAsChar::kTransactionId) { |
757 | | // If the intent iterator ever enters the transaction metadata and reverse index region, skip |
758 | | // past it. |
759 | 538k | switch (direction) { |
760 | 538k | case Direction::kForward: { |
761 | 538k | static const std::array<char, 1> kAfterTransactionId{ValueTypeAsChar::kTransactionId + 1}; |
762 | 538k | static const Slice kAfterTxnRegion(kAfterTransactionId); |
763 | 538k | intent_iter_.Seek(kAfterTxnRegion); |
764 | 538k | break; |
765 | 0 | } |
766 | 2 | case Direction::kBackward: |
767 | 2 | intent_upperbound_keybytes_.Clear(); |
768 | 2 | intent_upperbound_keybytes_.AppendValueType(ValueType::kTransactionId); |
769 | 2 | intent_upperbound_ = intent_upperbound_keybytes_.AsSlice(); |
770 | 2 | intent_iter_.SeekToLast(); |
771 | 2 | break; |
772 | 538k | } |
773 | 538k | continue; |
774 | 538k | } |
775 | 14.2M | VLOG(4) << "Intent found: " << DebugIntentKeyToString(intent_key) |
776 | 251 | << ", resolved state: " << yb::ToString(resolved_intent_state_); |
777 | 14.2M | if (resolved_intent_state_ != ResolvedIntentState::kNoIntent && |
778 | | // Only scan intents for the first SubDocKey having suitable intents. |
779 | 14.2M | !IsIntentForTheSameKey(intent_key, resolved_intent_key_prefix_)4.61M ) { |
780 | 4.27M | break; |
781 | 4.27M | } |
782 | 9.97M | if (!intent_key.starts_with(prefix) || !SatisfyBounds(intent_key)8.74M ) { |
783 | 3.83M | break; |
784 | 3.83M | } |
785 | 6.14M | ProcessIntent(); |
786 | 6.14M | if (!status_.ok()) { |
787 | 0 | return; |
788 | 0 | } |
789 | 6.14M | switch (direction) { |
790 | 5.97M | case Direction::kForward: |
791 | 5.97M | intent_iter_.Next(); |
792 | 5.97M | break; |
793 | 170k | case Direction::kBackward: |
794 | 170k | intent_iter_.Prev(); |
795 | 170k | break; |
796 | 6.14M | } |
797 | 6.14M | } |
798 | 300M | if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) { |
799 | 4.92M | UpdateResolvedIntentSubDocKeyEncoded(); |
800 | 4.92M | } |
801 | 300M | } void yb::docdb::IntentAwareIterator::SeekToSuitableIntent<(yb::docdb::Direction)1>() Line | Count | Source | 746 | 13.0k | void IntentAwareIterator::SeekToSuitableIntent() { | 747 | 13.0k | DOCDB_DEBUG_SCOPE_LOG(/* msg */ "", std::bind(&IntentAwareIterator::DebugDump, this));0 | 748 | 13.0k | resolved_intent_state_ = ResolvedIntentState::kNoIntent; | 749 | 13.0k | resolved_intent_txn_dht_ = DocHybridTime::kMin; | 750 | 13.0k | intent_dht_from_same_txn_ = DocHybridTime::kMin; | 751 | 13.0k | auto prefix = CurrentPrefix(); | 752 | | | 753 | | // Find latest suitable intent for the first SubDocKey having suitable intents. | 754 | 183k | while (intent_iter_.Valid()) { | 755 | 182k | auto intent_key = intent_iter_.key(); | 756 | 182k | if (intent_key[0] == ValueTypeAsChar::kTransactionId) { | 757 | | // If the intent iterator ever enters the transaction metadata and reverse index region, skip | 758 | | // past it. | 759 | 2 | switch (direction) { | 760 | 0 | case Direction::kForward: { | 761 | 0 | static const std::array<char, 1> kAfterTransactionId{ValueTypeAsChar::kTransactionId + 1}; | 762 | 0 | static const Slice kAfterTxnRegion(kAfterTransactionId); | 763 | 0 | intent_iter_.Seek(kAfterTxnRegion); | 764 | 0 | break; | 765 | 0 | } | 766 | 2 | case Direction::kBackward: | 767 | 2 | intent_upperbound_keybytes_.Clear(); | 768 | 2 | intent_upperbound_keybytes_.AppendValueType(ValueType::kTransactionId); | 769 | 2 | intent_upperbound_ = intent_upperbound_keybytes_.AsSlice(); | 770 | 2 | intent_iter_.SeekToLast(); | 771 | 2 | break; | 772 | 2 | } | 773 | 2 | continue; | 774 | 2 | } | 775 | 182k | VLOG(4) << "Intent found: " << DebugIntentKeyToString(intent_key) | 776 | 0 | << ", resolved state: " << yb::ToString(resolved_intent_state_); | 777 | 182k | if (resolved_intent_state_ != ResolvedIntentState::kNoIntent && | 778 | | // Only scan intents for the first SubDocKey having suitable intents. 
| 779 | 182k | !IsIntentForTheSameKey(intent_key, resolved_intent_key_prefix_)30.3k ) { | 780 | 12.0k | break; | 781 | 12.0k | } | 782 | 170k | if (!intent_key.starts_with(prefix) || !SatisfyBounds(intent_key)) { | 783 | 0 | break; | 784 | 0 | } | 785 | 170k | ProcessIntent(); | 786 | 170k | if (!status_.ok()) { | 787 | 0 | return; | 788 | 0 | } | 789 | 170k | switch (direction) { | 790 | 0 | case Direction::kForward: | 791 | 0 | intent_iter_.Next(); | 792 | 0 | break; | 793 | 170k | case Direction::kBackward: | 794 | 170k | intent_iter_.Prev(); | 795 | 170k | break; | 796 | 170k | } | 797 | 170k | } | 798 | 13.0k | if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) { | 799 | 12.0k | UpdateResolvedIntentSubDocKeyEncoded(); | 800 | 12.0k | } | 801 | 13.0k | } |
void yb::docdb::IntentAwareIterator::SeekToSuitableIntent<(yb::docdb::Direction)0>() Line | Count | Source | 746 | 300M | void IntentAwareIterator::SeekToSuitableIntent() { | 747 | 300M | DOCDB_DEBUG_SCOPE_LOG(/* msg */ "", std::bind(&IntentAwareIterator::DebugDump, this));0 | 748 | 300M | resolved_intent_state_ = ResolvedIntentState::kNoIntent; | 749 | 300M | resolved_intent_txn_dht_ = DocHybridTime::kMin; | 750 | 300M | intent_dht_from_same_txn_ = DocHybridTime::kMin; | 751 | 300M | auto prefix = CurrentPrefix(); | 752 | | | 753 | | // Find latest suitable intent for the first SubDocKey having suitable intents. | 754 | 306M | while (intent_iter_.Valid()) { | 755 | 14.6M | auto intent_key = intent_iter_.key(); | 756 | 14.6M | if (intent_key[0] == ValueTypeAsChar::kTransactionId) { | 757 | | // If the intent iterator ever enters the transaction metadata and reverse index region, skip | 758 | | // past it. | 759 | 538k | switch (direction) { | 760 | 538k | case Direction::kForward: { | 761 | 538k | static const std::array<char, 1> kAfterTransactionId{ValueTypeAsChar::kTransactionId + 1}; | 762 | 538k | static const Slice kAfterTxnRegion(kAfterTransactionId); | 763 | 538k | intent_iter_.Seek(kAfterTxnRegion); | 764 | 538k | break; | 765 | 0 | } | 766 | 0 | case Direction::kBackward: | 767 | 0 | intent_upperbound_keybytes_.Clear(); | 768 | 0 | intent_upperbound_keybytes_.AppendValueType(ValueType::kTransactionId); | 769 | 0 | intent_upperbound_ = intent_upperbound_keybytes_.AsSlice(); | 770 | 0 | intent_iter_.SeekToLast(); | 771 | 0 | break; | 772 | 538k | } | 773 | 538k | continue; | 774 | 538k | } | 775 | 14.0M | VLOG(4) << "Intent found: " << DebugIntentKeyToString(intent_key) | 776 | 251 | << ", resolved state: " << yb::ToString(resolved_intent_state_); | 777 | 14.0M | if (resolved_intent_state_ != ResolvedIntentState::kNoIntent && | 778 | | // Only scan intents for the first SubDocKey having suitable intents. 
| 779 | 14.0M | !IsIntentForTheSameKey(intent_key, resolved_intent_key_prefix_)4.58M ) { | 780 | 4.26M | break; | 781 | 4.26M | } | 782 | 9.80M | if (!intent_key.starts_with(prefix) || !SatisfyBounds(intent_key)8.57M ) { | 783 | 3.83M | break; | 784 | 3.83M | } | 785 | 5.97M | ProcessIntent(); | 786 | 5.97M | if (!status_.ok()) { | 787 | 0 | return; | 788 | 0 | } | 789 | 5.97M | switch (direction) { | 790 | 5.97M | case Direction::kForward: | 791 | 5.97M | intent_iter_.Next(); | 792 | 5.97M | break; | 793 | 0 | case Direction::kBackward: | 794 | 0 | intent_iter_.Prev(); | 795 | 0 | break; | 796 | 5.97M | } | 797 | 5.97M | } | 798 | 300M | if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) { | 799 | 4.91M | UpdateResolvedIntentSubDocKeyEncoded(); | 800 | 4.91M | } | 801 | 300M | } |
|
802 | | |
803 | 0 | void IntentAwareIterator::DebugDump() { |
804 | 0 | bool is_valid = valid(); |
805 | 0 | LOG(INFO) << ">> IntentAwareIterator dump"; |
806 | 0 | LOG(INFO) << "iter_.Valid(): " << iter_.Valid(); |
807 | 0 | if (iter_.Valid()) { |
808 | 0 | LOG(INFO) << "iter_.key(): " << DebugDumpKeyToStr(iter_.key()); |
809 | 0 | } |
810 | 0 | if (intent_iter_.Initialized()) { |
811 | 0 | LOG(INFO) << "intent_iter_.Valid(): " << intent_iter_.Valid(); |
812 | 0 | if (intent_iter_.Valid()) { |
813 | 0 | LOG(INFO) << "intent_iter_.key(): " << intent_iter_.key().ToDebugHexString(); |
814 | 0 | } |
815 | 0 | } |
816 | 0 | LOG(INFO) << "resolved_intent_state_: " << yb::ToString(resolved_intent_state_); |
817 | 0 | if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) { |
818 | 0 | LOG(INFO) << "resolved_intent_sub_doc_key_encoded_: " |
819 | 0 | << DebugDumpKeyToStr(resolved_intent_sub_doc_key_encoded_); |
820 | 0 | } |
821 | 0 | LOG(INFO) << "valid(): " << is_valid; |
822 | 0 | if (valid()) { |
823 | 0 | auto key_data = FetchKey(); |
824 | 0 | if (key_data.ok()) { |
825 | 0 | LOG(INFO) << "key(): " << DebugDumpKeyToStr(key_data->key) |
826 | 0 | << ", doc_ht: " << key_data->write_time; |
827 | 0 | } else { |
828 | 0 | LOG(INFO) << "key(): fetch failed: " << key_data.status(); |
829 | 0 | } |
830 | 0 | } |
831 | 0 | LOG(INFO) << "<< IntentAwareIterator dump"; |
832 | 0 | } |
833 | | |
834 | | Result<DocHybridTime> |
835 | 17.1M | IntentAwareIterator::FindMatchingIntentRecordDocHybridTime(const Slice& key_without_ht) { |
836 | 17.1M | VLOG(4) << __func__ << "(" << SubDocKey::DebugSliceToString(key_without_ht) << ")"1.58k ; |
837 | 17.1M | GetIntentPrefixForKeyWithoutHt(key_without_ht, &seek_key_buffer_); |
838 | 17.1M | seek_key_prefix_ = seek_key_buffer_.AsSlice(); |
839 | | |
840 | 17.1M | SeekForwardToSuitableIntent(); |
841 | 17.1M | RETURN_NOT_OK(status_); |
842 | | |
843 | 17.1M | if (resolved_intent_state_ != ResolvedIntentState::kValid) { |
844 | 16.6M | return DocHybridTime::kInvalid; |
845 | 16.6M | } |
846 | | |
847 | 521k | if (resolved_intent_key_prefix_.CompareTo(seek_key_buffer_) == 0) { |
848 | 19.2k | max_seen_ht_.MakeAtLeast(resolved_intent_txn_dht_.hybrid_time()); |
849 | 19.2k | return GetIntentDocHybridTime(); |
850 | 19.2k | } |
851 | 502k | return DocHybridTime::kInvalid; |
852 | 521k | } |
853 | | |
854 | | Result<DocHybridTime> |
855 | | IntentAwareIterator::GetMatchingRegularRecordDocHybridTime( |
856 | 83.3M | const Slice& key_without_ht) { |
857 | 83.3M | size_t other_encoded_ht_size = 0; |
858 | 83.3M | RETURN_NOT_OK(CheckHybridTimeSizeAndValueType(iter_.key(), &other_encoded_ht_size)); |
859 | 83.3M | Slice iter_key_without_ht = iter_.key(); |
860 | 83.3M | iter_key_without_ht.remove_suffix(1 + other_encoded_ht_size); |
861 | 83.3M | if (key_without_ht == iter_key_without_ht) { |
862 | 1.44M | DocHybridTime doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(iter_.key())); |
863 | 0 | max_seen_ht_.MakeAtLeast(doc_ht.hybrid_time()); |
864 | 1.44M | return doc_ht; |
865 | 1.44M | } |
866 | 81.9M | return DocHybridTime::kInvalid; |
867 | 83.3M | } |
868 | | |
869 | | Result<HybridTime> IntentAwareIterator::FindOldestRecord( |
870 | 2.00k | const Slice& key_without_ht, HybridTime min_hybrid_time) { |
871 | 2.00k | VLOG(4) << "FindOldestRecord(" |
872 | 0 | << SubDocKey::DebugSliceToString(key_without_ht) << " = " |
873 | 0 | << key_without_ht.ToDebugHexString() << " , " << min_hybrid_time |
874 | 0 | << ")"; |
875 | 2.00k | #define DOCDB_DEBUG |
876 | 2.00k | DOCDB_DEBUG_SCOPE_LOG(SubDocKey::DebugSliceToString(key_without_ht) + ", " + |
877 | 0 | yb::ToString(min_hybrid_time), |
878 | 0 | std::bind(&IntentAwareIterator::DebugDump, this)); |
879 | 2.00k | #undef DOCDB_DEBUG |
880 | 2.00k | DCHECK(!DebugHasHybridTime(key_without_ht)); |
881 | | |
882 | 2.00k | RETURN_NOT_OK(status_); |
883 | 2.00k | if (!valid()) { |
884 | 0 | VLOG(4) << "Returning kInvalid"; |
885 | 0 | return HybridTime::kInvalid; |
886 | 0 | } |
887 | | |
888 | 2.00k | HybridTime result; |
889 | 2.00k | if (intent_iter_.Initialized()) { |
890 | 565 | auto intent_dht = VERIFY_RESULT(FindMatchingIntentRecordDocHybridTime(key_without_ht)); |
891 | 0 | VLOG(4) << "Looking for Intent Record found ? = " |
892 | 0 | << (intent_dht != DocHybridTime::kInvalid); |
893 | 565 | if (intent_dht != DocHybridTime::kInvalid && |
894 | 565 | intent_dht.hybrid_time() > min_hybrid_time9 ) { |
895 | 5 | result = intent_dht.hybrid_time(); |
896 | 5 | VLOG(4) << " oldest_record_ht is now " << result0 ; |
897 | 5 | } |
898 | 1.43k | } else { |
899 | 1.43k | VLOG(4) << "intent_iter_ not Initialized"1 ; |
900 | 1.43k | } |
901 | | |
902 | 2.00k | seek_key_buffer_.Reserve(key_without_ht.size() + |
903 | 2.00k | kMaxBytesPerEncodedHybridTime); |
904 | 2.00k | seek_key_buffer_.Reset(key_without_ht); |
905 | 2.00k | seek_key_buffer_.AppendValueType(ValueType::kHybridTime); |
906 | 2.00k | seek_key_buffer_.AppendHybridTime( |
907 | 2.00k | DocHybridTime(min_hybrid_time, kMaxWriteId)); |
908 | 2.00k | SeekForwardRegular(seek_key_buffer_); |
909 | 2.00k | RETURN_NOT_OK(status_); |
910 | 2.00k | if (iter_.Valid()) { |
911 | 1.99k | iter_.Prev(); |
912 | 1.99k | } else { |
913 | 6 | iter_.SeekToLast(); |
914 | 6 | } |
915 | 2.00k | SkipFutureRecords(Direction::kForward); |
916 | | |
917 | 2.00k | if (iter_valid_) { |
918 | 1.86k | DocHybridTime regular_dht = |
919 | 1.86k | VERIFY_RESULT(GetMatchingRegularRecordDocHybridTime(key_without_ht)); |
920 | 0 | VLOG(4) << "Looking for Matching Regular Record found = " << regular_dht; |
921 | 1.86k | if (regular_dht != DocHybridTime::kInvalid && |
922 | 1.86k | regular_dht.hybrid_time() > min_hybrid_time38 ) { |
923 | 38 | result.MakeAtMost(regular_dht.hybrid_time()); |
924 | 38 | } |
925 | 1.86k | } else { |
926 | 18.4E | VLOG(4) << "iter_valid_ is false"; |
927 | 141 | } |
928 | 18.4E | VLOG(4) << "Returning " << result; |
929 | 2.00k | return result; |
930 | 2.00k | } |
931 | | |
932 | | Status IntentAwareIterator::FindLatestRecord( |
933 | | const Slice& key_without_ht, |
934 | | DocHybridTime* latest_record_ht, |
935 | 84.6M | Slice* result_value) { |
936 | 84.6M | if (!latest_record_ht) { |
937 | 0 | return STATUS(Corruption, "latest_record_ht should not be a null pointer"); |
938 | 0 | } |
939 | 18.4E | VLOG(4) << __func__ << "(" << SubDocKey::DebugSliceToString(key_without_ht) << ", " |
940 | 18.4E | << *latest_record_ht << ")"; |
941 | 84.6M | DOCDB_DEBUG_SCOPE_LOG( |
942 | 0 | SubDocKey::DebugSliceToString(key_without_ht) + ", " + yb::ToString(latest_record_ht) + ", " |
943 | 0 | + yb::ToString(result_value), |
944 | 0 | std::bind(&IntentAwareIterator::DebugDump, this)); |
945 | 18.4E | DCHECK(!DebugHasHybridTime(key_without_ht)) << SubDocKey::DebugSliceToString(key_without_ht); |
946 | | |
947 | 84.6M | RETURN_NOT_OK(status_); |
948 | 84.6M | if (!valid()) { |
949 | 1.05M | return Status::OK(); |
950 | 1.05M | } |
951 | | |
952 | 83.5M | bool found_later_intent_result = false; |
953 | 83.5M | if (intent_iter_.Initialized()) { |
954 | 17.1M | DocHybridTime dht = VERIFY_RESULT(FindMatchingIntentRecordDocHybridTime(key_without_ht)); |
955 | 17.1M | if (dht != DocHybridTime::kInvalid && dht > *latest_record_ht19.2k ) { |
956 | 19.2k | *latest_record_ht = dht; |
957 | 19.2k | found_later_intent_result = true; |
958 | 19.2k | } |
959 | 17.1M | } |
960 | | |
961 | 83.5M | seek_key_buffer_.Reserve(key_without_ht.size() + encoded_read_time_global_limit_.size() + 1); |
962 | 83.5M | seek_key_buffer_.Reset(key_without_ht); |
963 | 83.5M | AppendEncodedDocHt(encoded_read_time_global_limit_, &seek_key_buffer_); |
964 | | |
965 | 83.5M | SeekForwardRegular(seek_key_buffer_); |
966 | 83.5M | RETURN_NOT_OK(status_); |
967 | | // After SeekForwardRegular(), we need to call valid() to skip future records and see if the |
968 | | // current key still matches the pushed prefix if any. If it does not, we are done. |
969 | 83.5M | if (!valid()) { |
970 | 65 | return Status::OK(); |
971 | 65 | } |
972 | | |
973 | 83.5M | bool found_later_regular_result = false; |
974 | 83.5M | if (iter_valid_) { |
975 | 83.3M | DocHybridTime dht = VERIFY_RESULT(GetMatchingRegularRecordDocHybridTime(key_without_ht)); |
976 | 83.3M | if (dht != DocHybridTime::kInvalid && dht > *latest_record_ht1.44M ) { |
977 | 1.43M | *latest_record_ht = dht; |
978 | 1.43M | found_later_regular_result = true; |
979 | 1.43M | } |
980 | 83.3M | } |
981 | | |
982 | 83.5M | if (83.5M result_value83.5M ) { |
983 | 83.5M | if (found_later_regular_result) { |
984 | 1.43M | *result_value = iter_.value(); |
985 | 82.1M | } else if (found_later_intent_result) { |
986 | 19.2k | *result_value = resolved_intent_value_; |
987 | 19.2k | } |
988 | 83.5M | } |
989 | 83.5M | return Status::OK(); |
990 | 83.5M | } |
991 | | |
992 | 1.54G | void IntentAwareIterator::PushPrefix(const Slice& prefix) { |
993 | 18.4E | VLOG(4) << "PushPrefix: " << SubDocKey::DebugSliceToString(prefix); |
994 | 1.54G | prefix_stack_.push_back(prefix); |
995 | 1.54G | skip_future_records_needed_ = true; |
996 | 1.54G | skip_future_intents_needed_ = true; |
997 | 1.54G | } |
998 | | |
999 | 1.55G | void IntentAwareIterator::PopPrefix() { |
1000 | 1.55G | prefix_stack_.pop_back(); |
1001 | 1.55G | skip_future_records_needed_ = true; |
1002 | 1.55G | skip_future_intents_needed_ = true; |
1003 | 18.4E | VLOG(4) << "PopPrefix: " |
1004 | 18.4E | << (prefix_stack_.empty() ? std::string()1 |
1005 | 18.4E | : SubDocKey::DebugSliceToString(prefix_stack_.back())18.4E ); |
1006 | 1.55G | } |
1007 | | |
1008 | 1.59G | Slice IntentAwareIterator::CurrentPrefix() const { |
1009 | 1.59G | return prefix_stack_.empty() ? Slice()164M : prefix_stack_.back()1.42G ; |
1010 | 1.59G | } |
1011 | | |
1012 | 1.14G | void IntentAwareIterator::SkipFutureRecords(const Direction direction) { |
1013 | 1.14G | skip_future_records_needed_ = false; |
1014 | 1.14G | if (!status_.ok()) { |
1015 | 0 | return; |
1016 | 0 | } |
1017 | 1.14G | auto prefix = CurrentPrefix(); |
1018 | 1.15G | while (iter_.Valid()1.14G ) { |
1019 | 1.15G | if (!iter_.key().starts_with(prefix)) { |
1020 | 126M | VLOG(4) << "Unmatched prefix: " << SubDocKey::DebugSliceToString(iter_.key()) |
1021 | 23.9k | << ", prefix: " << SubDocKey::DebugSliceToString(prefix); |
1022 | 126M | iter_valid_ = false; |
1023 | 126M | return; |
1024 | 126M | } |
1025 | 1.02G | if (!SatisfyBounds(iter_.key())) { |
1026 | 18.4E | VLOG(4) << "Out of bounds: " << SubDocKey::DebugSliceToString(iter_.key()) |
1027 | 18.4E | << ", upperbound: " << SubDocKey::DebugSliceToString(upperbound_); |
1028 | 8.51M | iter_valid_ = false; |
1029 | 8.51M | return; |
1030 | 8.51M | } |
1031 | 1.01G | Slice encoded_doc_ht = iter_.key(); |
1032 | 1.01G | if (encoded_doc_ht.TryConsumeByte(ValueTypeAsChar::kTransactionApplyState)) { |
1033 | 0 | if (!NextRegular(direction)) { |
1034 | 0 | return; |
1035 | 0 | } |
1036 | 0 | continue; |
1037 | 0 | } |
1038 | 1.01G | size_t doc_ht_size = 0; |
1039 | 1.01G | auto decode_status = DocHybridTime::CheckAndGetEncodedSize(encoded_doc_ht, &doc_ht_size); |
1040 | 1.01G | if (!decode_status.ok()) { |
1041 | 0 | LOG(ERROR) << "Decode doc ht from key failed: " << decode_status |
1042 | 0 | << ", key: " << iter_.key().ToDebugHexString(); |
1043 | 0 | status_ = std::move(decode_status); |
1044 | 0 | return; |
1045 | 0 | } |
1046 | 1.01G | encoded_doc_ht.remove_prefix(encoded_doc_ht.size() - doc_ht_size); |
1047 | 1.01G | auto value = iter_.value(); |
1048 | 1.01G | auto value_type = DecodeValueType(value); |
1049 | 18.4E | VLOG(4) << "Checking for skip, type " << value_type << ", encoded_doc_ht: " |
1050 | 18.4E | << DocHybridTime::DebugSliceToString(encoded_doc_ht) |
1051 | 18.4E | << " value: " << value.ToDebugHexString(); |
1052 | 1.01G | if (value_type == ValueType::kHybridTime) { |
1053 | | // The value came from a transaction, so we can try to filter it by the original intent time.
1054 | 199M | Slice encoded_intent_doc_ht = value; |
1055 | 199M | encoded_intent_doc_ht.consume_byte(); |
1056 | | // The logic here replicates part of the logic in |
1057 | | // DecodeStrongWriteIntentResult::MaxAllowedValueTime for intents that have been committed
1058 | | // and applied to regular RocksDB only. Note that here we are comparing encoded hybrid times, |
1059 | | // so comparisons are reversed vs. the un-encoded case. If a value is found "invalid", it |
1060 | | // can't cause a read restart. If it is found "valid", it will cause a read restart if it is |
1061 | | // greater than read_time.read. That last comparison is done outside this function. |
1062 | 199M | Slice max_allowed = encoded_intent_doc_ht.compare(encoded_read_time_local_limit_) > 0 |
1063 | 199M | ? Slice(encoded_read_time_global_limit_)199M |
1064 | 199M | : Slice(encoded_read_time_read_)67.0k ; |
1065 | 199M | if (encoded_doc_ht.compare(max_allowed) > 0) { |
1066 | 199M | iter_valid_ = true; |
1067 | 199M | return; |
1068 | 199M | } |
1069 | 815M | } else if (encoded_doc_ht.compare(encoded_read_time_regular_limit_) > 0) { |
1070 | | // If a value does not contain the hybrid time of the intent that wrote the original |
1071 | | // transaction, then either (a) it originated from a single-shard transaction or (b) the
1072 | | // intent hybrid time has already been garbage-collected during a compaction because the |
1073 | | // corresponding transaction's commit time (stored in the key) became lower than the history |
1074 | | // cutoff. See the following commit for the details of this intent hybrid time GC. |
1075 | | // |
1076 | | // https://github.com/yugabyte/yugabyte-db/commit/26260e0143e521e219d93f4aba6310fcc030a628 |
1077 | | // |
1078 | | // encoded_read_time_regular_limit_ is simply the encoded value of max(read_ht, local_limit). |
1079 | | // The above condition |
1080 | | // |
1081 | | // encoded_doc_ht.compare(encoded_read_time_regular_limit_) > 0
1082 | | // |
1083 | | // corresponds to the following in terms of decoded hybrid times (order is reversed): |
1084 | | // |
1085 | | // commit_ht <= max(read_ht, local_limit) |
1086 | | // |
1087 | | // and the inverse of that can be written as |
1088 | | // |
1089 | | // commit_ht > read_ht && commit_ht > local_limit |
1090 | | // |
1091 | | // The reason this is correct here is that in case (a) the event of writing a single-shard |
1092 | | // record to the tablet would certainly be after our read transaction's start time in case |
1093 | | // commit_ht > local_limit, so it can never cause a read restart. In case (b) we know that |
1094 | | // commit_ht < history_cutoff and read_ht >= history_cutoff (by definition of history cutoff) |
1095 | | // so commit_ht < read_ht, and in this case read restart is impossible regardless of the |
1096 | | // value of local_limit. |
1097 | 814M | iter_valid_ = true; |
1098 | 814M | return; |
1099 | 814M | } |
1100 | 18.4E | VLOG(4) << "Skipping because of time: " << SubDocKey::DebugSliceToString(iter_.key()) |
1101 | 18.4E | << ", read time: " << read_time_; |
1102 | 1.49M | if (!NextRegular(direction)) { |
1103 | 0 | return; |
1104 | 0 | } |
1105 | 1.49M | } |
1106 | 18.4E | iter_valid_ = false; |
1107 | 18.4E | } |
1108 | | |
1109 | 8.02M | bool IntentAwareIterator::NextRegular(Direction direction) { |
1110 | 8.02M | switch (direction) { |
1111 | 3.21M | case Direction::kForward: |
1112 | 3.21M | iter_.Next(); // TODO(dtxn) use seek with the same key, but read limit as doc hybrid time. |
1113 | 3.21M | return true; |
1114 | 4.80M | case Direction::kBackward: |
1115 | 4.80M | iter_.Prev(); |
1116 | 4.80M | return true; |
1117 | 8.02M | } |
1118 | | |
1119 | 0 | status_ = STATUS_FORMAT(Corruption, "Unexpected direction: $0", direction); |
1120 | 0 | LOG(ERROR) << status_; |
1121 | 0 | iter_valid_ = false; |
1122 | 0 | return false; |
1123 | 8.02M | } |
1124 | | |
1125 | 1.03G | void IntentAwareIterator::SkipFutureIntents() { |
1126 | 1.03G | skip_future_intents_needed_ = false; |
1127 | 1.03G | if (!intent_iter_.Initialized() || !status_.ok()141M ) { |
1128 | 888M | return; |
1129 | 888M | } |
1130 | 141M | auto prefix = CurrentPrefix(); |
1131 | 141M | if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) { |
1132 | 5.66M | auto compare_result = resolved_intent_key_prefix_.AsSlice().compare_prefix(prefix); |
1133 | 5.66M | VLOG(4) << "Checking resolved intent subdockey: " |
1134 | 1 | << DebugDumpKeyToStr(resolved_intent_key_prefix_) |
1135 | 1 | << ", against new prefix: " << DebugDumpKeyToStr(prefix) << ": " |
1136 | 1 | << compare_result; |
1137 | 5.66M | if (compare_result == 0) { |
1138 | 5.66M | if (!SatisfyBounds(resolved_intent_key_prefix_.AsSlice())) { |
1139 | 0 | resolved_intent_state_ = ResolvedIntentState::kNoIntent; |
1140 | 5.66M | } else { |
1141 | 5.66M | resolved_intent_state_ = ResolvedIntentState::kValid; |
1142 | 5.66M | } |
1143 | 5.66M | return; |
1144 | 5.66M | } else if (319 compare_result > 0319 ) { |
1145 | 326 | resolved_intent_state_ = ResolvedIntentState::kInvalidPrefix; |
1146 | 326 | return; |
1147 | 326 | } |
1148 | 5.66M | } |
1149 | 136M | SeekToSuitableIntent<Direction::kForward>(); |
1150 | 136M | } |
1151 | | |
1152 | 149M | Status IntentAwareIterator::SetIntentUpperbound() { |
1153 | 149M | if (iter_.Valid()) { |
1154 | 146M | intent_upperbound_keybytes_.Clear(); |
1155 | | // Strip ValueType::kHybridTime + DocHybridTime at the end of SubDocKey in iter_ and append |
1156 | | // to upperbound with 0xff. |
1157 | 146M | Slice subdoc_key = iter_.key(); |
1158 | 146M | size_t doc_ht_size = 0; |
1159 | 146M | RETURN_NOT_OK(DocHybridTime::CheckAndGetEncodedSize(subdoc_key, &doc_ht_size)); |
1160 | 146M | subdoc_key.remove_suffix(1 + doc_ht_size); |
1161 | 146M | intent_upperbound_keybytes_.AppendRawBytes(subdoc_key); |
1162 | 18.4E | VLOG(4) << "SetIntentUpperbound = " |
1163 | 18.4E | << SubDocKey::DebugSliceToString(intent_upperbound_keybytes_.AsSlice()); |
1164 | 146M | intent_upperbound_keybytes_.AppendValueType(ValueType::kMaxByte); |
1165 | 146M | intent_upperbound_ = intent_upperbound_keybytes_.AsSlice(); |
1166 | 146M | intent_iter_.RevalidateAfterUpperBoundChange(); |
1167 | 146M | } else { |
1168 | | // In case the current position of the regular iterator is invalid, set the exclusive intent |
1169 | | // upperbound high to be able to find all intents higher than the last regular record. |
1170 | 2.95M | ResetIntentUpperbound(); |
1171 | 2.95M | } |
1172 | 149M | return Status::OK(); |
1173 | 149M | } |
1174 | | |
1175 | 2.96M | void IntentAwareIterator::ResetIntentUpperbound() { |
1176 | 2.96M | intent_upperbound_keybytes_.Clear(); |
1177 | 2.96M | intent_upperbound_keybytes_.AppendValueType(ValueType::kHighest); |
1178 | 2.96M | intent_upperbound_ = intent_upperbound_keybytes_.AsSlice(); |
1179 | 2.96M | intent_iter_.RevalidateAfterUpperBoundChange(); |
1180 | 18.4E | VLOG(4) << "ResetIntentUpperbound = " << intent_upperbound_.ToDebugString(); |
1181 | 2.96M | } |
1182 | | |
1183 | | } // namespace docdb |
1184 | | } // namespace yb |