/Users/deen/code/yugabyte-db/src/yb/docdb/conflict_resolution.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | #include "yb/docdb/conflict_resolution.h" |
14 | | |
15 | | #include <map> |
16 | | |
17 | | #include "yb/common/hybrid_time.h" |
18 | | #include "yb/common/row_mark.h" |
19 | | #include "yb/common/transaction.h" |
20 | | #include "yb/common/transaction_error.h" |
21 | | #include "yb/common/transaction_priority.h" |
22 | | #include "yb/docdb/doc_key.h" |
23 | | #include "yb/docdb/docdb.h" |
24 | | #include "yb/docdb/docdb.pb.h" |
25 | | #include "yb/docdb/docdb_rocksdb_util.h" |
26 | | #include "yb/docdb/intent.h" |
27 | | #include "yb/docdb/shared_lock_manager.h" |
28 | | #include "yb/docdb/transaction_dump.h" |
29 | | #include "yb/util/logging.h" |
30 | | #include "yb/util/metrics.h" |
31 | | #include "yb/util/scope_exit.h" |
32 | | #include "yb/util/status_format.h" |
33 | | #include "yb/util/trace.h" |
34 | | |
35 | | using namespace std::literals; |
36 | | using namespace std::placeholders; |
37 | | |
38 | | namespace yb { |
39 | | namespace docdb { |
40 | | |
41 | | namespace { |
42 | | |
43 | | struct TransactionConflictInfo { |
44 | | WaitPolicy wait_policy; |
45 | | bool all_lock_only_conflicts; |
46 | | |
47 | 0 | std::string ToString() const { |
48 | 0 | return YB_STRUCT_TO_STRING(wait_policy, all_lock_only_conflicts); |
49 | 0 | } |
50 | | }; |
51 | | |
52 | | using TransactionConflictInfoMap = std::unordered_map<TransactionId, |
53 | | TransactionConflictInfo, |
54 | | TransactionIdHash>; |
55 | | |
56 | | struct TransactionData { |
57 | | TransactionData(TransactionId id_, WaitPolicy wait_policy_, bool all_lock_only_conflicts_) |
58 | 414k | : id(id_), wait_policy(wait_policy_), all_lock_only_conflicts(all_lock_only_conflicts_) {} |
59 | | |
60 | | TransactionId id; |
61 | | WaitPolicy wait_policy; |
62 | | // all_lock_only_conflicts is true if all conflicting intents of this transaction are explicit row |
63 | | // lock intents and not intents that would result in modifications to data in regular db. |
64 | | bool all_lock_only_conflicts; |
65 | | TransactionStatus status; |
66 | | HybridTime commit_time; |
67 | | uint64_t priority; |
68 | | Status failure; |
69 | | |
70 | 334k | void ProcessStatus(const TransactionStatusResult& result) { |
71 | 334k | status = result.status; |
72 | 334k | if (status == TransactionStatus::COMMITTED) { |
73 | 94.8k | LOG_IF(DFATAL, !result.status_time.is_valid()) |
74 | 1 | << "Status time not specified for committed transaction: " << id; |
75 | 94.8k | commit_time = result.status_time; |
76 | 94.8k | } |
77 | 334k | } |
78 | | |
79 | 0 | std::string ToString() const { |
80 | 0 | return Format("{ id: $0 status: $1 commit_time: $2 priority: $3 failure: $4 }", |
81 | 0 | id, TransactionStatus_Name(status), commit_time, priority, failure); |
82 | 0 | } |
83 | | }; |
84 | | |
85 | | CHECKED_STATUS MakeConflictStatus(const TransactionId& our_id, const TransactionId& other_id, |
86 | 119k | const char* reason, Counter* conflicts_metric) { |
87 | 119k | conflicts_metric->Increment(); |
88 | 119k | return (STATUS(TryAgain, Format("$0 Conflicts with $1 transaction: $2", our_id, reason, other_id), |
89 | 119k | Slice(), TransactionError(TransactionErrorCode::kConflict))); |
90 | 119k | } |
91 | | |
92 | | class ConflictResolver; |
93 | | |
94 | | class ConflictResolverContext { |
95 | | public: |
96 | | // Read all conflicts for operation/transaction. |
97 | | virtual CHECKED_STATUS ReadConflicts(ConflictResolver* resolver) = 0; |
98 | | |
99 | | // Check priority of this one against existing transactions. |
100 | | virtual CHECKED_STATUS CheckPriority( |
101 | | ConflictResolver* resolver, |
102 | | boost::iterator_range<TransactionData*> transactions) = 0; |
103 | | |
104 | | // Check for conflict against committed transaction. |
105 | | // Returns true if transaction could be removed from list of conflicts. |
106 | | virtual Result<bool> CheckConflictWithCommitted( |
107 | | const TransactionData& transaction_data, HybridTime commit_time) = 0; |
108 | | |
109 | | virtual HybridTime GetResolutionHt() = 0; |
110 | | |
111 | | virtual bool IgnoreConflictsWith(const TransactionId& other) = 0; |
112 | | |
113 | | virtual TransactionId transaction_id() const = 0; |
114 | | |
115 | | virtual std::string ToString() const = 0; |
116 | | |
117 | 56 | std::string LogPrefix() const { |
118 | 56 | return ToString() + ": "; |
119 | 56 | } |
120 | | |
121 | 1.11M | virtual ~ConflictResolverContext() = default; |
122 | | }; |
123 | | |
124 | | class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> { |
125 | | public: |
126 | | ConflictResolver(const DocDB& doc_db, |
127 | | TransactionStatusManager* status_manager, |
128 | | PartialRangeKeyIntents partial_range_key_intents, |
129 | | std::unique_ptr<ConflictResolverContext> context, |
130 | | ResolutionCallback callback) |
131 | | : doc_db_(doc_db), status_manager_(*status_manager), request_scope_(status_manager), |
132 | | partial_range_key_intents_(partial_range_key_intents), context_(std::move(context)), |
133 | 1.10M | callback_(std::move(callback)) {} |
134 | | |
135 | 17.0M | PartialRangeKeyIntents partial_range_key_intents() { |
136 | 17.0M | return partial_range_key_intents_; |
137 | 17.0M | } |
138 | | |
139 | 928k | TransactionStatusManager& status_manager() { |
140 | 928k | return status_manager_; |
141 | 928k | } |
142 | | |
143 | 8.43M | const DocDB& doc_db() { |
144 | 8.43M | return doc_db_; |
145 | 8.43M | } |
146 | | |
147 | 986k | Result<TransactionMetadata> PrepareMetadata(const TransactionMetadataPB& pb) { |
148 | 986k | return status_manager_.PrepareMetadata(pb); |
149 | 986k | } |
150 | | |
151 | | void FillPriorities( |
152 | 146k | boost::container::small_vector_base<std::pair<TransactionId, uint64_t>>* inout) { |
153 | 146k | return status_manager_.FillPriorities(inout); |
154 | 146k | } |
155 | | |
156 | 1.10M | void Resolve() { |
157 | 1.10M | auto status = context_->ReadConflicts(this); |
158 | 1.10M | if (!status.ok()) { |
159 | 10.1k | InvokeCallback(status); |
160 | 10.1k | return; |
161 | 10.1k | } |
162 | | |
163 | 1.09M | ResolveConflicts(); |
164 | 1.09M | } |
165 | | |
166 | 99.7k | Result<WaitPolicy> CombineWaitPolicy(WaitPolicy existing_policy, WaitPolicy new_policy) { |
167 | 99.7k | RSTATUS_DCHECK( |
168 | 99.7k | existing_policy != WAIT_BLOCK, InternalError, "WAIT_BLOCK isn't support yet."); |
169 | | |
170 | 99.7k | switch(new_policy) { |
171 | 0 | case WAIT_BLOCK: |
172 | 0 | return STATUS(NotSupported, "WAIT_BLOCK isn't support yet."); |
173 | 99.6k | case WAIT_ERROR: |
174 | | // Even if some intent had a wait policy of WAIT_SKIP, WAIT_ERROR overrides that policy. |
175 | 99.6k | return new_policy; |
176 | 14 | case WAIT_SKIP: |
177 | | // The existing_policy can either be WAIT_ERROR or WAIT_SKIP. In either case, we can leave |
178 | | // it untouched. |
179 | 14 | return existing_policy; |
180 | 99.7k | } |
181 | 0 | return STATUS(NotSupported, "Unknown wait policy."); |
182 | 99.7k | } |
183 | | |
184 | | // Reads conflicts for specified intent from DB. |
185 | | CHECKED_STATUS ReadIntentConflicts(IntentTypeSet type, KeyBytes* intent_key_prefix, |
186 | 29.0M | WaitPolicy wait_policy) { |
187 | 29.0M | EnsureIntentIteratorCreated(); |
188 | | |
189 | 29.0M | const auto conflicting_intent_types = kIntentTypeSetConflicts[type.ToUIntPtr()]; |
190 | | |
191 | 29.0M | KeyBytes upperbound_key(*intent_key_prefix); |
192 | 29.0M | upperbound_key.AppendValueType(ValueType::kMaxByte); |
193 | 29.0M | intent_key_upperbound_ = upperbound_key.AsSlice(); |
194 | | |
195 | 29.0M | size_t original_size = intent_key_prefix->size(); |
196 | 29.0M | intent_key_prefix->AppendValueType(ValueType::kIntentTypeSet); |
197 | | // Have only weak intents, so could skip other weak intents. |
198 | 29.0M | if (!HasStrong(type)) { |
199 | 8.98M | char value = 1 << kStrongIntentFlag; |
200 | 8.98M | intent_key_prefix->AppendRawBytes(&value, 1); |
201 | 8.98M | } |
202 | 29.0M | auto se = ScopeExit([this, intent_key_prefix, original_size] { |
203 | 29.0M | intent_key_prefix->Truncate(original_size); |
204 | 29.0M | intent_key_upperbound_.clear(); |
205 | 29.0M | }); |
206 | 29.0M | Slice prefix_slice(intent_key_prefix->AsSlice().data(), original_size); |
207 | 29.0M | VLOG_WITH_PREFIX_AND_FUNC788 (4) << "Check conflicts in intents DB; Seek: " |
208 | 788 | << intent_key_prefix->AsSlice().ToDebugHexString() << " for type " |
209 | 788 | << ToString(type) << " and wait_policy=" << wait_policy; |
210 | 29.0M | intent_iter_.Seek(intent_key_prefix->AsSlice()); |
211 | 39.4M | while (intent_iter_.Valid()) { |
212 | 10.6M | auto existing_key = intent_iter_.key(); |
213 | 10.6M | auto existing_value = intent_iter_.value(); |
214 | 10.6M | if (!existing_key.starts_with(prefix_slice)) { |
215 | 0 | break; |
216 | 0 | } |
217 | | // Support for obsolete intent type. |
218 | | // When looking for intent with specific prefix it should start with this prefix, followed |
219 | | // by ValueType::kIntentTypeSet. |
220 | | // Previously we were using intent type, so should support its value type also, now it is |
221 | | // kObsoleteIntentType. |
222 | | // Actual handling of obsolete intent type is done in ParseIntentKey. |
223 | 10.6M | if (existing_key.size() <= prefix_slice.size() || |
224 | 10.6M | !IntentValueType(existing_key[prefix_slice.size()])) { |
225 | 239k | break; |
226 | 239k | } |
227 | | |
228 | 10.4M | auto existing_intent = VERIFY_RESULT( |
229 | 10.4M | docdb::ParseIntentKey(intent_iter_.key(), existing_value)); |
230 | | |
231 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) << "Found: " << existing_value.ToDebugString() |
232 | 18.4E | << " has intent types " << ToString(existing_intent.types); |
233 | 10.4M | auto decoded_value = VERIFY_RESULT(DecodeIntentValue( |
234 | 10.4M | existing_value, nullptr /* verify_transaction_id_slice */, |
235 | 10.4M | HasStrong(existing_intent.types))); |
236 | 0 | const auto intent_mask = kIntentTypeSetMask[existing_intent.types.ToUIntPtr()]; |
237 | 10.4M | if ((conflicting_intent_types & intent_mask) != 0) { |
238 | 766k | auto transaction_id = decoded_value.transaction_id; |
239 | 766k | bool lock_only = decoded_value.body.starts_with(ValueTypeAsChar::kRowLock); |
240 | | |
241 | | // TODO(savepoints) - if the intent corresponds to an aborted subtransaction, ignore. |
242 | 766k | if (!context_->IgnoreConflictsWith(transaction_id)) { |
243 | 514k | auto p = conflicts_.emplace(transaction_id, |
244 | 514k | TransactionConflictInfo { |
245 | 514k | .wait_policy = wait_policy, |
246 | 514k | .all_lock_only_conflicts = lock_only, |
247 | 514k | }); |
248 | 514k | if (!p.second) { |
249 | 99.7k | p.first->second.wait_policy = VERIFY_RESULT( |
250 | 0 | CombineWaitPolicy(p.first->second.wait_policy, wait_policy)); |
251 | 99.7k | p.first->second.all_lock_only_conflicts = p.first->second.all_lock_only_conflicts && |
252 | 99.7k | lock_only14 ; |
253 | 99.7k | } |
254 | 514k | } |
255 | 766k | } |
256 | | |
257 | 10.4M | intent_iter_.Next(); |
258 | 10.4M | } |
259 | | |
260 | 29.0M | return Status::OK(); |
261 | 29.0M | } |
262 | | |
263 | 30.0M | void EnsureIntentIteratorCreated() { |
264 | 30.0M | if (!intent_iter_.Initialized()) { |
265 | 1.10M | intent_iter_ = CreateRocksDBIterator( |
266 | 1.10M | doc_db_.intents, |
267 | 1.10M | doc_db_.key_bounds, |
268 | 1.10M | BloomFilterMode::DONT_USE_BLOOM_FILTER, |
269 | 1.10M | boost::none /* user_key_for_filter */, |
270 | 1.10M | rocksdb::kDefaultQueryId, |
271 | 1.10M | nullptr /* file_filter */, |
272 | 1.10M | &intent_key_upperbound_); |
273 | 1.10M | } |
274 | 30.0M | } |
275 | | |
276 | | private: |
277 | 1.10M | void InvokeCallback(const Result<HybridTime>& result) { |
278 | 1.10M | YB_TRANSACTION_DUMP( |
279 | 1.10M | Conflicts, context_->transaction_id(), |
280 | 1.10M | result.ok() ? *result : HybridTime::kInvalid, |
281 | 1.10M | Slice(pointer_cast<const uint8_t*>(transactions_.data()), |
282 | 1.10M | transactions_.size() * sizeof(transactions_[0]))); |
283 | 1.10M | intent_iter_.Reset(); |
284 | 1.10M | callback_(result); |
285 | 1.10M | } |
286 | | |
287 | 585k | MUST_USE_RESULT bool CheckResolutionDone(const Result<bool>& result) { |
288 | 585k | if (!result.ok()) { |
289 | 119k | TRACE("Abort: $0", result.status().ToString()); |
290 | 18.4E | VLOG_WITH_PREFIX(4) << "Abort: " << result.status(); |
291 | 119k | InvokeCallback(result.status()); |
292 | 119k | return true; |
293 | 119k | } |
294 | | |
295 | 465k | if (result.get()) { |
296 | 159k | TRACE("No conflicts."); |
297 | 18.4E | VLOG_WITH_PREFIX(4) << "No conflicts: " << context_->GetResolutionHt(); |
298 | 159k | InvokeCallback(context_->GetResolutionHt()); |
299 | 159k | return true; |
300 | 159k | } |
301 | | |
302 | 306k | return false; |
303 | 465k | } |
304 | | |
305 | 1.09M | void ResolveConflicts() { |
306 | 1.09M | VLOG_WITH_PREFIX257 (3) << "Conflicts: " << yb::ToString(conflicts_)257 ; |
307 | 1.09M | if (conflicts_.empty()) { |
308 | 821k | VTRACE(1, LogPrefix()); |
309 | 821k | TRACE("No conflicts."); |
310 | 821k | InvokeCallback(context_->GetResolutionHt()); |
311 | 821k | return; |
312 | 821k | } |
313 | | |
314 | 278k | transactions_.reserve(conflicts_.size()); |
315 | 414k | for (const auto& kv : conflicts_) { |
316 | 414k | transactions_.emplace_back(kv.first /* id */, |
317 | 414k | kv.second.wait_policy, |
318 | 414k | kv.second.all_lock_only_conflicts); |
319 | 414k | } |
320 | 278k | remaining_transactions_ = transactions_.size(); |
321 | | |
322 | 278k | DoResolveConflicts(); |
323 | 278k | } |
324 | | |
325 | 279k | void DoResolveConflicts() { |
326 | 279k | if (CheckResolutionDone(CheckLocalCommits())) { |
327 | 22.4k | return; |
328 | 22.4k | } |
329 | | |
330 | 256k | FetchTransactionStatuses(); |
331 | 256k | } |
332 | | |
333 | 256k | void FetchTransactionStatusesDone() { |
334 | 256k | if (CheckResolutionDone(ContinueResolve())) { |
335 | 208k | return; |
336 | 208k | } |
337 | 256k | } |
338 | | |
339 | 256k | Result<bool> ContinueResolve() { |
340 | 256k | if (VERIFY_RESULT(Cleanup())) { |
341 | 106k | return true; |
342 | 106k | } |
343 | | |
344 | 150k | RETURN_NOT_OK(context_->CheckPriority(this, RemainingTransactions())); |
345 | | |
346 | 51.9k | AbortTransactions(); |
347 | 51.9k | return false; |
348 | 150k | } |
349 | | |
350 | | // Returns true when there are no conflicts left. |
351 | 279k | Result<bool> CheckLocalCommits() { |
352 | 415k | return DoCleanup([this](auto* transaction) -> Result<bool> { |
353 | 415k | return this->CheckLocalCommit(transaction); |
354 | 415k | }); |
355 | 279k | } |
356 | | |
357 | | // Check whether specified transaction was locally committed, and store this state if so. |
358 | | // Returns true if conflict with specified transaction is resolved. |
359 | 415k | Result<bool> CheckLocalCommit(TransactionData* transaction) { |
360 | | // TODO(savepoints): Do not conflict with aborted intents. |
361 | 415k | auto commit_time = status_manager().LocalCommitTime(transaction->id); |
362 | 415k | if (commit_time.is_valid()) { |
363 | 77.9k | transaction->commit_time = commit_time; |
364 | 77.9k | transaction->status = TransactionStatus::COMMITTED; |
365 | 77.9k | } |
366 | | // In case of failure status, we stop the resolution process, so `transactions_` content |
367 | | // does not matter in this case. |
368 | 415k | if (!(commit_time.is_valid() && |
369 | 415k | VERIFY_RESULT(context_->CheckConflictWithCommitted(*transaction, commit_time)))) { |
370 | 338k | return false; |
371 | 338k | } |
372 | 18.4E | VLOG_WITH_PREFIX(4) << "Locally committed: " << transaction->id << ", time: " << commit_time; |
373 | 77.4k | return true; |
374 | 415k | } |
375 | | |
376 | | // Apply specified functor to all active (i.e. not resolved) transactions. |
377 | | // If functor returns true, it means that transaction was resolved. |
378 | | // So such transaction is moved out of active transactions range. |
379 | | // Returns true if there are no active transaction left. |
380 | | template <class F> |
381 | 585k | Result<bool> DoCleanup(const F& f) { |
382 | 585k | auto end = transactions_.begin() + remaining_transactions_; |
383 | 1.20M | for (auto transaction = transactions_.begin(); transaction != end;) { |
384 | 808k | if (!VERIFY_RESULT(f(&*transaction))) { |
385 | 519k | ++transaction; |
386 | 519k | continue; |
387 | 519k | } |
388 | 288k | if (--end == transaction) { |
389 | 186k | break; |
390 | 186k | } |
391 | 101k | std::swap(*transaction, *end); |
392 | 101k | } |
393 | 585k | remaining_transactions_ = end - transactions_.begin(); |
394 | | |
395 | 585k | return remaining_transactions_ == 0; |
396 | 585k | } conflict_resolution.cc:yb::Result<bool> yb::docdb::(anonymous namespace)::ConflictResolver::DoCleanup<yb::docdb::(anonymous namespace)::ConflictResolver::CheckLocalCommits()::'lambda'(auto*)>(auto const&) Line | Count | Source | 381 | 279k | Result<bool> DoCleanup(const F& f) { | 382 | 279k | auto end = transactions_.begin() + remaining_transactions_; | 383 | 657k | for (auto transaction = transactions_.begin(); transaction != end;) { | 384 | 415k | if (!VERIFY_RESULT(f(&*transaction))) { | 385 | 338k | ++transaction; | 386 | 338k | continue; | 387 | 338k | } | 388 | 77.4k | if (--end == transaction) { | 389 | 37.4k | break; | 390 | 37.4k | } | 391 | 40.0k | std::swap(*transaction, *end); | 392 | 40.0k | } | 393 | 279k | remaining_transactions_ = end - transactions_.begin(); | 394 | | | 395 | 279k | return remaining_transactions_ == 0; | 396 | 279k | } |
conflict_resolution.cc:yb::Result<bool> yb::docdb::(anonymous namespace)::ConflictResolver::DoCleanup<yb::docdb::(anonymous namespace)::ConflictResolver::Cleanup()::'lambda'(auto*)>(auto const&) Line | Count | Source | 381 | 305k | Result<bool> DoCleanup(const F& f) { | 382 | 305k | auto end = transactions_.begin() + remaining_transactions_; | 383 | 548k | for (auto transaction = transactions_.begin(); transaction != end;) { | 384 | 392k | if (!VERIFY_RESULT(f(&*transaction))) { | 385 | 181k | ++transaction; | 386 | 181k | continue; | 387 | 181k | } | 388 | 211k | if (--end == transaction) { | 389 | 149k | break; | 390 | 149k | } | 391 | 61.9k | std::swap(*transaction, *end); | 392 | 61.9k | } | 393 | 305k | remaining_transactions_ = end - transactions_.begin(); | 394 | | | 395 | 305k | return remaining_transactions_ == 0; | 396 | 305k | } |
|
397 | | |
398 | | // Removes all transactions that would not conflict with us anymore. |
399 | | // Returns failure if we conflict with transaction that cannot be aborted. |
400 | 305k | Result<bool> Cleanup() { |
401 | 392k | return DoCleanup([this](auto* transaction) -> Result<bool> { |
402 | 392k | return this->CheckCleanup(transaction); |
403 | 392k | }); |
404 | 305k | } |
405 | | |
406 | 392k | Result<bool> CheckCleanup(TransactionData* transaction) { |
407 | 392k | RETURN_NOT_OK(transaction->failure); |
408 | 392k | auto status = transaction->status; |
409 | 392k | if (status == TransactionStatus::COMMITTED) { |
410 | 93.6k | if (VERIFY_RESULT(context_->CheckConflictWithCommitted( |
411 | 93.6k | *transaction, transaction->commit_time))) { |
412 | 71.9k | VLOG_WITH_PREFIX1 (4) |
413 | 1 | << "Committed: " << transaction->id << ", commit time: " << transaction->commit_time; |
414 | 71.9k | return true; |
415 | 71.9k | } |
416 | 298k | } else if (status == TransactionStatus::ABORTED) { |
417 | 118k | auto commit_time = status_manager().LocalCommitTime(transaction->id); |
418 | 118k | if (commit_time) { |
419 | 119 | if (VERIFY_RESULT(context_->CheckConflictWithCommitted(*transaction, commit_time))) { |
420 | 114 | VLOG_WITH_PREFIX0 (4) |
421 | 0 | << "Locally committed: " << transaction->id << "< commit time: " << commit_time; |
422 | 114 | return true; |
423 | 114 | } |
424 | 118k | } else { |
425 | 118k | VLOG_WITH_PREFIX0 (4) << "Aborted: " << transaction->id0 ; |
426 | 118k | return true; |
427 | 118k | } |
428 | 180k | } else if (status != TransactionStatus::PENDING && status != TransactionStatus::APPLYING0 ) { |
429 | 0 | return STATUS_FORMAT( |
430 | 0 | IllegalState, "Unexpected transaction state: $0", TransactionStatus_Name(status)); |
431 | 0 | } |
432 | 202k | return false; |
433 | 392k | } |
434 | | |
435 | 452k | boost::iterator_range<TransactionData*> RemainingTransactions() { |
436 | 452k | auto begin = transactions_.data(); |
437 | 452k | return boost::make_iterator_range(begin, begin + remaining_transactions_); |
438 | 452k | } |
439 | | |
440 | 256k | void FetchTransactionStatuses() { |
441 | 256k | static const std::string kRequestReason = "conflict resolution"s; |
442 | 256k | auto self = shared_from_this(); |
443 | 256k | pending_requests_.store(remaining_transactions_); |
444 | 338k | for (auto& i : RemainingTransactions()) { |
445 | 338k | auto& transaction = i; |
446 | 338k | TRACE("FetchingTransactionStatus for $0", yb::ToString(transaction.id)); |
447 | 338k | StatusRequest request = { |
448 | 338k | &transaction.id, |
449 | 338k | context_->GetResolutionHt(), |
450 | 338k | context_->GetResolutionHt(), |
451 | 338k | 0, // serial no. Could use 0 here, because read_ht == global_limit_ht. |
452 | | // So we cannot accept status with time >= read_ht and < global_limit_ht. |
453 | 338k | &kRequestReason, |
454 | 338k | TransactionLoadFlags{TransactionLoadFlag::kCleanup}, |
455 | 338k | [self, &transaction](Result<TransactionStatusResult> result) { |
456 | 338k | if (result.ok()) { |
457 | 278k | transaction.ProcessStatus(*result); |
458 | 278k | } else if (59.3k result.status().IsTryAgain()59.3k ) { |
459 | | // It is safe to suppose that transaction in PENDING state in case of try again error. |
460 | 48.4k | transaction.status = TransactionStatus::PENDING; |
461 | 48.4k | } else if (10.9k result.status().IsNotFound()10.9k ) { |
462 | 10.9k | transaction.status = TransactionStatus::ABORTED; |
463 | 18.4E | } else { |
464 | 18.4E | transaction.failure = result.status(); |
465 | 18.4E | } |
466 | 338k | if (self->pending_requests_.fetch_sub(1, std::memory_order_acq_rel) == 1) { |
467 | 256k | self->FetchTransactionStatusesDone(); |
468 | 256k | } |
469 | 338k | } |
470 | 338k | }; |
471 | 338k | status_manager().RequestStatusAt(request); |
472 | 338k | } |
473 | 256k | } |
474 | | |
475 | 48.8k | void AbortTransactions() { |
476 | 48.8k | auto self = shared_from_this(); |
477 | 48.8k | pending_requests_.store(remaining_transactions_); |
478 | 55.9k | for (auto& i : RemainingTransactions()) { |
479 | 55.9k | auto& transaction = i; |
480 | 55.9k | TRACE("Aborting $0", yb::ToString(transaction.id)); |
481 | 55.9k | status_manager().Abort( |
482 | 55.9k | transaction.id, |
483 | 55.9k | [self, &transaction](Result<TransactionStatusResult> result) { |
484 | 55.9k | VLOG(4) << self->LogPrefix() << "Abort received: " << AsString(result)0 ; |
485 | 55.9k | if (result.ok()) { |
486 | 55.9k | transaction.ProcessStatus(*result); |
487 | 55.9k | } else if (56 result.status().IsRemoteError()56 || result.status().IsAborted()56 ) { |
488 | | // Non retryable errors. Aborted could be caused by shutdown. |
489 | 0 | transaction.failure = result.status(); |
490 | 56 | } else { |
491 | 56 | LOG(INFO) << self->LogPrefix() << "Abort failed, would retry: " << result.status(); |
492 | 56 | } |
493 | 55.9k | if (self->pending_requests_.fetch_sub(1, std::memory_order_acq_rel) == 1) { |
494 | 48.8k | self->AbortTransactionsDone(); |
495 | 48.8k | } |
496 | 55.9k | }); |
497 | 55.9k | } |
498 | 48.8k | } |
499 | | |
500 | 48.8k | void AbortTransactionsDone() { |
501 | 48.8k | if (CheckResolutionDone(Cleanup())) { |
502 | 47.8k | return; |
503 | 47.8k | } |
504 | | |
505 | 978 | DoResolveConflicts(); |
506 | 978 | } |
507 | | |
508 | 56 | std::string LogPrefix() const { |
509 | 56 | return context_->LogPrefix(); |
510 | 56 | } |
511 | | |
512 | | DocDB doc_db_; |
513 | | TransactionStatusManager& status_manager_; |
514 | | RequestScope request_scope_; |
515 | | PartialRangeKeyIntents partial_range_key_intents_; |
516 | | std::unique_ptr<ConflictResolverContext> context_; |
517 | | ResolutionCallback callback_; |
518 | | |
519 | | BoundedRocksDbIterator intent_iter_; |
520 | | Slice intent_key_upperbound_; |
521 | | TransactionConflictInfoMap conflicts_; |
522 | | |
523 | | // Resolution state for all transactions. Resolved transactions are moved to the end of it. |
524 | | std::vector<TransactionData> transactions_; |
525 | | // Number of transactions that are not yet resolved. After successful resolution should be 0. |
526 | | size_t remaining_transactions_; |
527 | | |
528 | | std::atomic<size_t> pending_requests_{0}; |
529 | | }; |
530 | | |
531 | | struct IntentData { |
532 | | IntentTypeSet types; |
533 | | bool full_doc_key; |
534 | | |
535 | 0 | std::string ToString() const { |
536 | 0 | return YB_STRUCT_TO_STRING(types, full_doc_key); |
537 | 0 | } |
538 | | }; |
539 | | |
540 | | using IntentTypesContainer = std::map<KeyBuffer, IntentData>; |
541 | | |
542 | | class IntentProcessor { |
543 | | public: |
544 | | IntentProcessor(IntentTypesContainer* container, const IntentTypeSet& strong_intent_types) |
545 | | : container_(*container), |
546 | | strong_intent_types_(strong_intent_types), |
547 | | weak_intent_types_(StrongToWeak(strong_intent_types_)) |
548 | 1.37M | {} |
549 | | |
550 | 57.0M | void Process(IntentStrength strength, FullDocKey full_doc_key, KeyBytes* intent_key) { |
551 | 57.0M | const auto is_strong = strength == IntentStrength::kStrong; |
552 | 57.0M | const auto& intent_type_set = is_strong ? strong_intent_types_19.6M : weak_intent_types_37.4M ; |
553 | 57.0M | auto i = container_.find(intent_key->data()); |
554 | 57.0M | if (i == container_.end()) { |
555 | 27.2M | container_.emplace(intent_key->data(), |
556 | 27.2M | IntentData{intent_type_set, full_doc_key}); |
557 | 27.2M | return; |
558 | 27.2M | } |
559 | | |
560 | 29.8M | i->second.types |= intent_type_set; |
561 | | |
562 | | // In a batch of keys, the computed full_doc_key value might vary based on the key that produced |
563 | | // a particular intent. E.g. suppose we have a primary key (h, r) and s is a subkey. If we are |
564 | | // trying to write strong intents on (h) and (h, r, s) in a batch, we end up with the following |
565 | | // intent types: |
566 | | // |
567 | | // (h) -> strong, full_doc_key: true (always true for strong intents) |
568 | | // (h, r) -> weak, full_doc_key: true (we did not omit any final doc key components) |
569 | | // (h, r, s) -> strong, full_doc_key: true |
570 | | // |
571 | | // Note that full_doc_key is always true for strong intents because we process one key at a time |
572 | | // and when taking that key by itself, (h) looks like the full doc key (nothing follows it). |
573 | | // In the above example, the intent (h) is generated both as a strong intent and as a weak |
574 | | // intent based on keys (h, r) and (h, r, s), and we OR the value of full_doc_key and end up |
575 | | // with true. |
576 | | // |
577 | | // If we are trying to write strong intents on (h, r) and (h, r, s), we get: |
578 | | // |
579 | | // (h) -> weak, full_doc_key: false (because we know it is just part of the doc key) |
580 | | // (h, r) -> strong, full_doc_key: true |
581 | | // (h, r, s) -> strong, full_doc_key: true |
582 | | // |
583 | | // So we effectively end up with three types of intents: |
584 | | // - Weak intents with full_doc_key=false |
585 | | // - Weak intents with full_doc_key=true |
586 | | // - Strong intents with full_doc_key=true. |
587 | 29.8M | i->second.full_doc_key = i->second.full_doc_key || full_doc_key19.8M ; |
588 | 29.8M | } |
589 | | |
590 | | private: |
591 | | IntentTypesContainer& container_; |
592 | | const IntentTypeSet strong_intent_types_; |
593 | | const IntentTypeSet weak_intent_types_; |
594 | | }; |
595 | | |
596 | | class StrongConflictChecker { |
597 | | public: |
598 | | StrongConflictChecker(const TransactionId& transaction_id, |
599 | | HybridTime read_time, |
600 | | ConflictResolver* resolver, |
601 | | Counter* conflicts_metric, |
602 | | KeyBytes* buffer) |
603 | | : transaction_id_(transaction_id), |
604 | | read_time_(read_time), |
605 | | resolver_(*resolver), |
606 | | conflicts_metric_(*conflicts_metric), |
607 | | buffer_(*buffer) |
608 | 986k | {} |
609 | | |
610 | 21.5M | CHECKED_STATUS Check(const Slice& intent_key, bool strong, WaitPolicy wait_policy) { |
611 | 21.5M | const auto hash = VERIFY_RESULT(DecodeDocKeyHash(intent_key)); |
612 | 21.5M | if (PREDICT_FALSE(!value_iter_.Initialized() || hash != value_iter_hash_)) { |
613 | 4.21M | value_iter_ = CreateRocksDBIterator( |
614 | 4.21M | resolver_.doc_db().regular, |
615 | 4.21M | resolver_.doc_db().key_bounds, |
616 | 4.21M | BloomFilterMode::USE_BLOOM_FILTER, |
617 | 4.21M | intent_key, |
618 | 4.21M | rocksdb::kDefaultQueryId); |
619 | 4.21M | value_iter_hash_ = hash; |
620 | 4.21M | } |
621 | 21.5M | value_iter_.Seek(intent_key); |
622 | 21.5M | VLOG_WITH_PREFIX_AND_FUNC2.13k (4) |
623 | 2.13k | << "Overwrite; Seek: " << intent_key.ToDebugString() << " (" |
624 | 2.13k | << SubDocKey::DebugSliceToString(intent_key) << "), strong: " << strong << ", wait_policy: " |
625 | 2.13k | << AsString(wait_policy); |
626 | | // If we are resolving conflicts for writing a strong intent, look at records in regular RocksDB |
627 | | // with the same key as the intent's key (not including hybrid time) and any child keys. This is |
628 | | // because a strong intent indicates deletion or replacement of the entire subdocument tree and |
629 | | // any element of that tree that has already been committed at a higher hybrid time than the |
630 | | // read timestamp would be in conflict. |
631 | | // |
632 | | // (Note that when writing a strong intent on the entire table, e.g. as part of locking the |
633 | | // table, there is currently a performance issue and we'll need a better approach: |
634 | | // https://github.com/yugabyte/yugabyte-db/issues/6055). |
635 | | // |
636 | | // If we are resolving conflicts for writing a weak intent, only look at records in regular |
637 | | // RocksDB with the same key as the intent (not including hybrid time). This is because a weak |
638 | | // intent indicates that something in the document subtree rooted at that intent's key will |
639 | | // change, so it is only directly in conflict with a committed record that deletes or replaces |
640 | | // that entire document subtree (similar to a strong intent), so it would have the same exact |
641 | | // key as the weak intent (not including hybrid time). |
642 | 26.2M | while (value_iter_.Valid() && |
643 | 26.2M | (17.6M intent_key.starts_with(ValueTypeAsChar::kGroupEnd)17.6M || |
644 | 17.6M | value_iter_.key().starts_with(intent_key)17.6M )) { |
645 | 5.42M | auto existing_key = value_iter_.key(); |
646 | 5.42M | auto doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&existing_key)); |
647 | 5.42M | if (existing_key.empty() || |
648 | 5.42M | existing_key[existing_key.size() - 1] != ValueTypeAsChar::kHybridTime5.42M ) { |
649 | 0 | return STATUS_FORMAT( |
650 | 0 | Corruption, "Hybrid time expected at end of key: $0", |
651 | 0 | value_iter_.key().ToDebugString()); |
652 | 0 | } |
653 | 5.42M | if (!strong && existing_key.size() != intent_key.size() + 1684k ) { |
654 | 669k | VLOG_WITH_PREFIX12 (4) |
655 | 12 | << "Check value overwrite, key: " << intent_key.ToDebugString() |
656 | 12 | << ", out of bound key: " << existing_key.ToDebugString(); |
657 | 669k | break; |
658 | 669k | } |
659 | 18.4E | VLOG_WITH_PREFIX(4) |
660 | 18.4E | << "Check value overwrite, key: " << SubDocKey::DebugSliceToString(intent_key) |
661 | 18.4E | << ", read time: " << read_time_ |
662 | 18.4E | << ", doc ht: " << doc_ht.hybrid_time() |
663 | 18.4E | << ", found key: " << SubDocKey::DebugSliceToString(value_iter_.key()) |
664 | 18.4E | << ", after start: " << (doc_ht.hybrid_time() >= read_time_) |
665 | 18.4E | << ", value: " << value_iter_.value().ToDebugString(); |
666 | 4.75M | if (doc_ht.hybrid_time() >= read_time_) { |
667 | 9.35k | if (wait_policy == WAIT_SKIP) { |
668 | 2 | return STATUS(InternalError, "Skip locking since entity was modified in regular db", |
669 | 2 | TransactionError(TransactionErrorCode::kSkipLocking)); |
670 | 9.34k | } else { |
671 | 9.34k | conflicts_metric_.Increment(); |
672 | 9.34k | return STATUS_EC_FORMAT(TryAgain, TransactionError(TransactionErrorCode::kConflict), |
673 | 9.34k | "Value write after transaction start: $0 >= $1", |
674 | 9.34k | doc_ht.hybrid_time(), read_time_); |
675 | 9.34k | } |
676 | 9.35k | } |
677 | 4.75M | buffer_.Reset(existing_key); |
678 | | // Already have ValueType::kHybridTime at the end |
679 | 4.75M | buffer_.AppendHybridTime(DocHybridTime::kMin); |
680 | 4.75M | ROCKSDB_SEEK(&value_iter_, buffer_.AsSlice()); |
681 | 4.75M | } |
682 | | |
683 | 21.5M | return Status::OK(); |
684 | 21.5M | } |
685 | | |
686 | | private: |
687 | 0 | std::string LogPrefix() const { |
688 | 0 | return Format("$0: ", transaction_id_); |
689 | 0 | } |
690 | | |
691 | | const TransactionId& transaction_id_; |
692 | | const HybridTime read_time_; |
693 | | ConflictResolver& resolver_; |
694 | | Counter& conflicts_metric_; |
695 | | KeyBytes& buffer_; |
696 | | |
697 | | // RocksDb iterator with bloom filter can be reused in case keys has same hash component. |
698 | | BoundedRocksDbIterator value_iter_; |
699 | | boost::optional<DocKeyHash> value_iter_hash_; |
700 | | |
701 | | }; |
702 | | |
703 | | class ConflictResolverContextBase : public ConflictResolverContext { |
704 | | public: |
705 | | ConflictResolverContextBase(const DocOperations& doc_ops, |
706 | | HybridTime resolution_ht, |
707 | | Counter* conflicts_metric) |
708 | | : doc_ops_(doc_ops), |
709 | | resolution_ht_(resolution_ht), |
710 | 1.10M | conflicts_metric_(conflicts_metric) { |
711 | 1.10M | } |
712 | | |
713 | 1.10M | const DocOperations& doc_ops() { |
714 | 1.10M | return doc_ops_; |
715 | 1.10M | } |
716 | | |
717 | 1.65M | HybridTime GetResolutionHt() override { |
718 | 1.65M | return resolution_ht_; |
719 | 1.65M | } |
720 | | |
721 | 62 | void MakeResolutionAtLeast(const HybridTime& resolution_ht) { |
722 | 62 | resolution_ht_.MakeAtLeast(resolution_ht); |
723 | 62 | } |
724 | | |
725 | 1.10M | Counter* GetConflictsMetric() { |
726 | 1.10M | return conflicts_metric_; |
727 | 1.10M | } |
728 | | |
729 | | protected: |
730 | | CHECKED_STATUS CheckPriorityInternal( |
731 | | ConflictResolver* resolver, |
732 | | boost::iterator_range<TransactionData*> transactions, |
733 | | const TransactionId& our_transaction_id, |
734 | 147k | uint64_t our_priority) { |
735 | | |
736 | 147k | if (!fetched_metadata_for_transactions_) { |
737 | 146k | boost::container::small_vector<std::pair<TransactionId, uint64_t>, 8> ids_and_priorities; |
738 | 146k | ids_and_priorities.reserve(transactions.size()); |
739 | 179k | for (const auto& transaction : transactions) { |
740 | 179k | ids_and_priorities.emplace_back(transaction.id, 0); |
741 | 179k | } |
742 | 146k | resolver->FillPriorities(&ids_and_priorities); |
743 | 325k | for (size_t i = 0; i != transactions.size(); ++i179k ) { |
744 | 179k | transactions[i].priority = ids_and_priorities[i].second; |
745 | 179k | } |
746 | 146k | } |
747 | 159k | for (const auto& transaction : transactions) { |
748 | 159k | auto their_priority = transaction.priority; |
749 | 159k | if (transaction.wait_policy == WAIT_SKIP) { |
750 | 36 | return STATUS(InternalError, "Skip locking since entity is already locked", |
751 | 36 | TransactionError(TransactionErrorCode::kSkipLocking)); |
752 | 36 | } |
753 | | |
754 | | // READ COMMITTED txns require a guarantee that no txn abort it. They can handle facing a |
755 | | // kConflict due to another txn's conflicting intent, but can't handle aborts. To ensure |
756 | | // these guarantees - |
757 | | // 1. all READ COMMITTED txns are given kHighestPriority and |
758 | | // 2. a kConflict is raised even if their_priority equals our_priority. |
759 | 159k | if (our_priority <= their_priority) { |
760 | 98.3k | return MakeConflictStatus( |
761 | 98.3k | our_transaction_id, transaction.id, "higher priority", GetConflictsMetric()); |
762 | 98.3k | } |
763 | 159k | } |
764 | 48.8k | fetched_metadata_for_transactions_ = true; |
765 | | |
766 | 48.8k | return Status::OK(); |
767 | 147k | } |
768 | | |
769 | | private: |
770 | | const DocOperations& doc_ops_; |
771 | | |
772 | | // Hybrid time of conflict resolution, used to request transaction status from status tablet. |
773 | | HybridTime resolution_ht_; |
774 | | |
775 | | bool fetched_metadata_for_transactions_ = false; |
776 | | |
777 | | Counter* conflicts_metric_ = nullptr; |
778 | | }; |
779 | | |
780 | | // Utility class for ResolveTransactionConflicts implementation. |
781 | | class TransactionConflictResolverContext : public ConflictResolverContextBase { |
782 | | public: |
783 | | TransactionConflictResolverContext(const DocOperations& doc_ops, |
784 | | const KeyValueWriteBatchPB& write_batch, |
785 | | HybridTime resolution_ht, |
786 | | HybridTime read_time, |
787 | | Counter* conflicts_metric) |
788 | | : ConflictResolverContextBase(doc_ops, resolution_ht, conflicts_metric), |
789 | | write_batch_(write_batch), |
790 | | read_time_(read_time), |
791 | | transaction_id_(FullyDecodeTransactionId(write_batch.transaction().transaction_id())) |
792 | 985k | {} |
793 | | |
794 | 987k | virtual ~TransactionConflictResolverContext() {} |
795 | | |
796 | | private: |
797 | 987k | CHECKED_STATUS ReadConflicts(ConflictResolver* resolver) override { |
798 | 987k | RETURN_NOT_OK(transaction_id_); |
799 | | |
800 | 987k | VLOG_WITH_PREFIX5.02k (3) << "Resolve conflicts"5.02k ; |
801 | | |
802 | 987k | metadata_ = VERIFY_RESULT987k (987k resolver->PrepareMetadata(write_batch_.transaction())); |
803 | | |
804 | 0 | boost::container::small_vector<RefCntPrefix, 8> paths; |
805 | | |
806 | 987k | const size_t kKeyBufferInitialSize = 512; |
807 | 987k | KeyBytes buffer; |
808 | 987k | buffer.Reserve(kKeyBufferInitialSize); |
809 | 987k | const auto row_mark = GetRowMarkTypeFromPB(write_batch_); |
810 | 987k | IntentTypesContainer container; |
811 | 987k | IntentProcessor write_processor( |
812 | 987k | &container, |
813 | 987k | GetStrongIntentTypeSet(metadata_.isolation, docdb::OperationKind::kWrite, row_mark)); |
814 | 6.78M | for (const auto& doc_op : doc_ops()) { |
815 | 6.78M | paths.clear(); |
816 | 6.78M | IsolationLevel ignored_isolation_level; |
817 | 6.78M | RETURN_NOT_OK(doc_op->GetDocPaths( |
818 | 6.78M | GetDocPathsMode::kIntents, &paths, &ignored_isolation_level)); |
819 | | |
820 | 16.7M | for (const auto& path : paths)6.78M { |
821 | 16.7M | VLOG_WITH_PREFIX_AND_FUNC306 (4) |
822 | 306 | << "Doc path: " << SubDocKey::DebugSliceToString(path.as_slice()); |
823 | 16.7M | RETURN_NOT_OK(EnumerateIntents( |
824 | 16.7M | path.as_slice(), |
825 | 16.7M | /* intent_value */ Slice(), |
826 | 16.7M | [&write_processor]( |
827 | 16.7M | auto strength, FullDocKey full_doc_key, auto, auto intent_key, auto) { |
828 | 16.7M | write_processor.Process(strength, full_doc_key, intent_key); |
829 | 16.7M | return Status::OK(); |
830 | 16.7M | }, |
831 | 16.7M | &buffer, |
832 | 16.7M | resolver->partial_range_key_intents())); |
833 | 16.7M | } |
834 | 6.78M | } |
835 | | // Either write_batch_.read_pairs is not empty or doc_ops is non empty. Both can't be non empty |
836 | | // together. This is because read_pairs is filled only in case of a read operation that has a |
837 | | // row mark or is part of a serializable txn. |
838 | | // 1. In case doc_ops are present, we use the default wait policy of WAIT_ERROR. |
839 | | // 2. In case of a read rpc that has wait_policy, we use that. |
840 | 987k | auto wait_policy = WAIT_ERROR; |
841 | 987k | const auto& pairs = write_batch_.read_pairs(); |
842 | 987k | if (!pairs.empty()) { |
843 | 386k | IntentProcessor read_processor( |
844 | 386k | &container, |
845 | 386k | GetStrongIntentTypeSet(metadata_.isolation, docdb::OperationKind::kRead, row_mark)); |
846 | 386k | wait_policy = write_batch_.wait_policy(); |
847 | 386k | RETURN_NOT_OK(EnumerateIntents( |
848 | 386k | pairs, |
849 | 386k | [&read_processor] ( |
850 | 386k | auto strength, FullDocKey full_doc_key, auto, auto intent_key, auto) { |
851 | 386k | read_processor.Process(strength, full_doc_key, intent_key); |
852 | 386k | return Status::OK(); |
853 | 386k | }, |
854 | 386k | resolver->partial_range_key_intents())); |
855 | 386k | } |
856 | | |
857 | 987k | if (container.empty()) { |
858 | 0 | return Status::OK(); |
859 | 0 | } |
860 | | |
861 | 987k | VLOG_WITH_PREFIX_AND_FUNC549 (4) << "Check txn's conflicts for following intents: " |
862 | 549 | << AsString(container); |
863 | | |
864 | 987k | StrongConflictChecker checker( |
865 | 987k | *transaction_id_, read_time_, resolver, GetConflictsMetric(), &buffer); |
866 | | // Iterator on intents DB should be created before iterator on regular DB. |
867 | | // This is to prevent the case when we create an iterator on the regular DB where a |
868 | | // provisional record has not yet been applied, and then create an iterator the intents |
869 | | // DB where the provisional record has already been removed. |
870 | 987k | resolver->EnsureIntentIteratorCreated(); |
871 | | |
872 | 27.2M | for (const auto& i : container) { |
873 | 27.2M | if (read_time_ != HybridTime::kMax) { |
874 | 23.4M | const Slice intent_key = i.first.AsSlice(); |
875 | 23.4M | bool strong = HasStrong(i.second.types); |
876 | | // For strong intents or weak intents at a full document key level (i.e. excluding intents |
877 | | // that omit some final range components of the document key), check for conflicts with |
878 | | // records in regular RocksDB. We need this because the row might have been deleted |
879 | | // concurrently by a single-shard transaction or a committed and applied transaction. |
880 | 23.4M | if (strong || i.second.full_doc_key6.96M ) { |
881 | 21.5M | RETURN_NOT_OK(checker.Check(intent_key, strong, wait_policy)); |
882 | 21.5M | } |
883 | 23.4M | } |
884 | 27.2M | buffer.Reset(i.first.AsSlice()); |
885 | 27.2M | RETURN_NOT_OK(resolver->ReadIntentConflicts(i.second.types, &buffer, wait_policy)); |
886 | 27.2M | } |
887 | | |
888 | 977k | return Status::OK(); |
889 | 987k | } |
890 | | |
891 | | CHECKED_STATUS CheckPriority(ConflictResolver* resolver, |
892 | 145k | boost::iterator_range<TransactionData*> transactions) override { |
893 | 145k | return CheckPriorityInternal(resolver, transactions, metadata_.transaction_id, |
894 | 145k | metadata_.priority); |
895 | 145k | } |
896 | | |
897 | | Result<bool> CheckConflictWithCommitted( |
898 | 170k | const TransactionData& transaction_data, HybridTime commit_time) override { |
899 | 170k | RSTATUS_DCHECK(commit_time.is_valid(), Corruption, "Invalid transaction commit time"); |
900 | | |
901 | 18.4E | VLOG_WITH_PREFIX(4) << "Committed: " << transaction_data.id << ", commit_time: " << commit_time |
902 | 18.4E | << ", read_time: " << read_time_ |
903 | 18.4E | << ", wait_policy:" << transaction_data.wait_policy |
904 | 18.4E | << ", all_lock_only_conflicts" << transaction_data.all_lock_only_conflicts; |
905 | | |
906 | | // If the intents to be written conflict with only "explicit row lock" intents of a committed |
907 | | // transaction, we can proceed now because a committed transaction implies that the locks are |
908 | | // released. In other words, only a committed transaction with some conflicting intent that |
909 | | // results in a modification to data in regular db, can result in a serialization error. |
910 | | // |
911 | | // commit_time equals to HybridTime::kMax means that transaction is not actually committed, |
912 | | // but is being committed. I.e. status tablet is trying to replicate COMMITTED state. |
913 | | // So we should always conflict with such transaction, because we are not able to read its |
914 | | // results. |
915 | | // |
916 | | // read_time equals to HybridTime::kMax in case of serializable isolation or when |
917 | | // read time was not yet picked for snapshot isolation. |
918 | | // So it should conflict only with transactions that are being committed. |
919 | | // |
920 | | // In all other cases we have concrete read time and should conflict with transactions |
921 | | // that were committed after this point. |
922 | 170k | if (!transaction_data.all_lock_only_conflicts && commit_time >= read_time_170k ) { |
923 | 20.8k | if (transaction_data.wait_policy == WAIT_SKIP) { |
924 | 0 | return STATUS(InternalError, "Skip locking since entity was modified by a recent commit", |
925 | 0 | TransactionError(TransactionErrorCode::kSkipLocking)); |
926 | 20.8k | } else { |
927 | 20.8k | return MakeConflictStatus( |
928 | 20.8k | *transaction_id_, transaction_data.id, "committed", GetConflictsMetric()); |
929 | 20.8k | } |
930 | 20.8k | } |
931 | | |
932 | 149k | return true; |
933 | 170k | } |
934 | | |
935 | 765k | bool IgnoreConflictsWith(const TransactionId& other) override { |
936 | 765k | return other == *transaction_id_; |
937 | 765k | } |
938 | | |
939 | 0 | TransactionId transaction_id() const override { |
940 | 0 | return *transaction_id_; |
941 | 0 | } |
942 | | |
943 | 56 | std::string ToString() const override { |
944 | 56 | return yb::ToString(transaction_id_); |
945 | 56 | } |
946 | | |
947 | | const KeyValueWriteBatchPB& write_batch_; |
948 | | |
949 | | // Read time of the transaction identified by transaction_id_, could be HybridTime::kMax in case |
950 | | // of serializable isolation or when read time not yet picked for snapshot isolation. |
951 | | const HybridTime read_time_; |
952 | | |
953 | | // Id of transaction when is writing intents, for which we are resolving conflicts. |
954 | | Result<TransactionId> transaction_id_; |
955 | | |
956 | | TransactionMetadata metadata_; |
957 | | |
958 | | Status result_ = Status::OK(); |
959 | | }; |
960 | | |
961 | | class OperationConflictResolverContext : public ConflictResolverContextBase { |
962 | | public: |
963 | | OperationConflictResolverContext(const DocOperations* doc_ops, |
964 | | HybridTime resolution_ht, |
965 | | Counter* conflicts_metric) |
966 | 122k | : ConflictResolverContextBase(*doc_ops, resolution_ht, conflicts_metric) { |
967 | 122k | } |
968 | | |
969 | 122k | virtual ~OperationConflictResolverContext() {} |
970 | | |
971 | | // Reads stored intents that could conflict with our operations. |
972 | 122k | CHECKED_STATUS ReadConflicts(ConflictResolver* resolver) override { |
973 | 122k | boost::container::small_vector<RefCntPrefix, 8> doc_paths; |
974 | 122k | boost::container::small_vector<size_t, 32> key_prefix_lengths; |
975 | 122k | KeyBytes encoded_key_buffer; |
976 | | |
977 | 122k | IntentTypeSet strong_intent_types; |
978 | | |
979 | 122k | EnumerateIntentsCallback callback = [&strong_intent_types, resolver]( |
980 | 122k | IntentStrength intent_strength, FullDocKey full_doc_key, Slice, |
981 | 1.82M | KeyBytes* encoded_key_buffer, LastKey) { |
982 | 1.82M | return resolver->ReadIntentConflicts( |
983 | 1.82M | intent_strength == IntentStrength::kStrong ? strong_intent_types543k |
984 | 1.82M | : StrongToWeak(strong_intent_types)1.28M , |
985 | 1.82M | encoded_key_buffer, WAIT_ERROR); |
986 | 1.82M | }; |
987 | | |
988 | 498k | for (const auto& doc_op : doc_ops()) { |
989 | 498k | doc_paths.clear(); |
990 | 498k | IsolationLevel isolation; |
991 | 498k | RETURN_NOT_OK(doc_op->GetDocPaths(GetDocPathsMode::kIntents, &doc_paths, &isolation)); |
992 | | |
993 | 498k | strong_intent_types = GetStrongIntentTypeSet(isolation, OperationKind::kWrite, |
994 | 498k | RowMarkType::ROW_MARK_ABSENT); |
995 | | |
996 | 543k | for (const auto& doc_path : doc_paths) { |
997 | 543k | VLOG_WITH_PREFIX_AND_FUNC11 (4) |
998 | 11 | << "Doc path: " << SubDocKey::DebugSliceToString(doc_path.as_slice()); |
999 | 543k | RETURN_NOT_OK(EnumerateIntents( |
1000 | 543k | doc_path.as_slice(), Slice(), callback, &encoded_key_buffer, |
1001 | 543k | PartialRangeKeyIntents::kTrue)); |
1002 | 543k | } |
1003 | 498k | } |
1004 | | |
1005 | 122k | return Status::OK(); |
1006 | 122k | } |
1007 | | |
1008 | | CHECKED_STATUS CheckPriority(ConflictResolver* resolver, |
1009 | 1.29k | boost::iterator_range<TransactionData*> transactions) override { |
1010 | 1.29k | return CheckPriorityInternal(resolver, |
1011 | 1.29k | transactions, |
1012 | 1.29k | TransactionId::Nil(), |
1013 | 1.29k | kHighPriTxnLowerBound - 1 /* our_priority */); |
1014 | 1.29k | } |
1015 | | |
1016 | 1.31k | bool IgnoreConflictsWith(const TransactionId& other) override { |
1017 | 1.31k | return false; |
1018 | 1.31k | } |
1019 | | |
1020 | 0 | TransactionId transaction_id() const override { |
1021 | 0 | return TransactionId::Nil(); |
1022 | 0 | } |
1023 | | |
1024 | 0 | std::string ToString() const override { |
1025 | 0 | return "Operation Context"; |
1026 | 0 | } |
1027 | | |
1028 | | Result<bool> CheckConflictWithCommitted( |
1029 | 1.01k | const TransactionData& transaction_data, HybridTime commit_time) override { |
1030 | 1.01k | if (commit_time != HybridTime::kMax) { |
1031 | 62 | MakeResolutionAtLeast(commit_time); |
1032 | 62 | return true; |
1033 | 62 | } |
1034 | 951 | return false; |
1035 | 1.01k | } |
1036 | | }; |
1037 | | |
1038 | | } // namespace |
1039 | | |
1040 | | void ResolveTransactionConflicts(const DocOperations& doc_ops, |
1041 | | const KeyValueWriteBatchPB& write_batch, |
1042 | | HybridTime hybrid_time, |
1043 | | HybridTime read_time, |
1044 | | const DocDB& doc_db, |
1045 | | PartialRangeKeyIntents partial_range_key_intents, |
1046 | | TransactionStatusManager* status_manager, |
1047 | | Counter* conflicts_metric, |
1048 | 987k | ResolutionCallback callback) { |
1049 | 987k | DCHECK(hybrid_time.is_valid()); |
1050 | 987k | TRACE("ResolveTransactionConflicts"); |
1051 | 987k | auto context = std::make_unique<TransactionConflictResolverContext>( |
1052 | 987k | doc_ops, write_batch, hybrid_time, read_time, conflicts_metric); |
1053 | 987k | auto resolver = std::make_shared<ConflictResolver>( |
1054 | 987k | doc_db, status_manager, partial_range_key_intents, std::move(context), std::move(callback)); |
1055 | | // Resolve takes a self reference to extend lifetime. |
1056 | 987k | resolver->Resolve(); |
1057 | 987k | TRACE("resolver->Resolve done"); |
1058 | 987k | } |
1059 | | |
1060 | | void ResolveOperationConflicts(const DocOperations& doc_ops, |
1061 | | HybridTime resolution_ht, |
1062 | | const DocDB& doc_db, |
1063 | | PartialRangeKeyIntents partial_range_key_intents, |
1064 | | TransactionStatusManager* status_manager, |
1065 | | Counter* conflicts_metric, |
1066 | 122k | ResolutionCallback callback) { |
1067 | 122k | TRACE("ResolveOperationConflicts"); |
1068 | 122k | auto context = std::make_unique<OperationConflictResolverContext>(&doc_ops, resolution_ht, |
1069 | 122k | conflicts_metric); |
1070 | 122k | auto resolver = std::make_shared<ConflictResolver>( |
1071 | 122k | doc_db, status_manager, partial_range_key_intents, std::move(context), std::move(callback)); |
1072 | | // Resolve takes a self reference to extend lifetime. |
1073 | 122k | resolver->Resolve(); |
1074 | 122k | TRACE("resolver->Resolve done"); |
1075 | 122k | } |
1076 | | |
1077 | | #define INTENT_KEY_SCHECK(lhs, op, rhs, msg) \ |
1078 | 260M | BOOST_PP_CAT(SCHECK_, op)(lhs, \ |
1079 | 260M | rhs, \ |
1080 | 260M | Corruption, \ |
1081 | 260M | Format("Bad intent key, $0 in $1, transaction from: $2", \ |
1082 | 260M | msg, \ |
1083 | 260M | intent_key.ToDebugHexString(), \ |
1084 | 260M | transaction_id_source.ToDebugHexString())) |
1085 | | |
1086 | | // transaction_id_slice used in INTENT_KEY_SCHECK |
1087 | 86.9M | Result<ParsedIntent> ParseIntentKey(Slice intent_key, Slice transaction_id_source) { |
1088 | 86.9M | ParsedIntent result; |
1089 | 86.9M | size_t doc_ht_size = 0; |
1090 | 86.9M | result.doc_path = intent_key; |
1091 | | // Intent is encoded as "DocPath + IntentType + DocHybridTime". |
1092 | 86.9M | RETURN_NOT_OK(DocHybridTime::CheckAndGetEncodedSize(result.doc_path, &doc_ht_size)); |
1093 | | // 3 comes from (ValueType::kIntentType, the actual intent type, ValueType::kHybridTime). |
1094 | 86.9M | INTENT_KEY_SCHECK(result.doc_path.size(), GE, doc_ht_size + 3, "key too short"); |
1095 | 86.9M | result.doc_path.remove_suffix(doc_ht_size + 3); |
1096 | 86.9M | auto intent_type_and_doc_ht = result.doc_path.end(); |
1097 | 86.9M | if (intent_type_and_doc_ht[0] == ValueTypeAsChar::kObsoleteIntentType) { |
1098 | 0 | result.types = ObsoleteIntentTypeToSet(intent_type_and_doc_ht[1]); |
1099 | 86.9M | } else if (intent_type_and_doc_ht[0] == ValueTypeAsChar::kObsoleteIntentTypeSet) { |
1100 | 0 | result.types = ObsoleteIntentTypeSetToNew(intent_type_and_doc_ht[1]); |
1101 | 86.9M | } else { |
1102 | 86.9M | INTENT_KEY_SCHECK(intent_type_and_doc_ht[0], EQ, ValueTypeAsChar::kIntentTypeSet, |
1103 | 86.9M | "intent type set type expected"); |
1104 | 86.9M | result.types = IntentTypeSet(intent_type_and_doc_ht[1]); |
1105 | 86.9M | } |
1106 | 86.9M | INTENT_KEY_SCHECK(intent_type_and_doc_ht[2], EQ, ValueTypeAsChar::kHybridTime, |
1107 | 86.9M | "hybrid time value type expected"); |
1108 | 86.9M | result.doc_ht = Slice(result.doc_path.end() + 2, doc_ht_size + 1); |
1109 | 86.9M | return result; |
1110 | 86.9M | } |
1111 | | |
1112 | 0 | std::string DebugIntentKeyToString(Slice intent_key) { |
1113 | 0 | auto parsed = ParseIntentKey(intent_key, Slice()); |
1114 | 0 | if (!parsed.ok()) { |
1115 | 0 | LOG(WARNING) << "Failed to parse: " << intent_key.ToDebugHexString() << ": " << parsed.status(); |
1116 | 0 | return intent_key.ToDebugHexString(); |
1117 | 0 | } |
1118 | 0 | auto doc_ht = DocHybridTime::DecodeFromEnd(parsed->doc_ht); |
1119 | 0 | if (!doc_ht.ok()) { |
1120 | 0 | LOG(WARNING) << "Failed to decode doc ht: " << intent_key.ToDebugHexString() << ": " |
1121 | 0 | << doc_ht.status(); |
1122 | 0 | return intent_key.ToDebugHexString(); |
1123 | 0 | } |
1124 | 0 | return Format("$0 (key: $1 type: $2 doc_ht: $3)", |
1125 | 0 | intent_key.ToDebugHexString(), SubDocKey::DebugSliceToString(parsed->doc_path), |
1126 | 0 | parsed->types, *doc_ht); |
1127 | 0 | } |
1128 | | |
1129 | | } // namespace docdb |
1130 | | } // namespace yb |