YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/conflict_resolution.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include "yb/docdb/conflict_resolution.h"
14
15
#include <map>
16
17
#include "yb/common/hybrid_time.h"
18
#include "yb/common/row_mark.h"
19
#include "yb/common/transaction.h"
20
#include "yb/common/transaction_error.h"
21
#include "yb/common/transaction_priority.h"
22
#include "yb/docdb/doc_key.h"
23
#include "yb/docdb/docdb.h"
24
#include "yb/docdb/docdb.pb.h"
25
#include "yb/docdb/docdb_rocksdb_util.h"
26
#include "yb/docdb/intent.h"
27
#include "yb/docdb/shared_lock_manager.h"
28
#include "yb/docdb/transaction_dump.h"
29
#include "yb/util/logging.h"
30
#include "yb/util/metrics.h"
31
#include "yb/util/scope_exit.h"
32
#include "yb/util/status_format.h"
33
#include "yb/util/trace.h"
34
35
using namespace std::literals;
36
using namespace std::placeholders;
37
38
namespace yb {
39
namespace docdb {
40
41
namespace {
42
43
struct TransactionConflictInfo {
44
  WaitPolicy wait_policy;
45
  bool all_lock_only_conflicts;
46
47
0
  std::string ToString() const {
48
0
    return YB_STRUCT_TO_STRING(wait_policy, all_lock_only_conflicts);
49
0
  }
50
};
51
52
using TransactionConflictInfoMap = std::unordered_map<TransactionId,
53
                                                      TransactionConflictInfo,
54
                                                      TransactionIdHash>;
55
56
struct TransactionData {
57
  TransactionData(TransactionId id_, WaitPolicy wait_policy_, bool all_lock_only_conflicts_)
58
414k
      : id(id_), wait_policy(wait_policy_), all_lock_only_conflicts(all_lock_only_conflicts_) {}
59
60
  TransactionId id;
61
  WaitPolicy wait_policy;
62
  // all_lock_only_conflicts is true if all conflicting intents of this transaction are explicit row
63
  // lock intents and not intents that would result in modifications to data in regular db.
64
  bool all_lock_only_conflicts;
65
  TransactionStatus status;
66
  HybridTime commit_time;
67
  uint64_t priority;
68
  Status failure;
69
70
334k
  void ProcessStatus(const TransactionStatusResult& result) {
71
334k
    status = result.status;
72
334k
    if (status == TransactionStatus::COMMITTED) {
73
94.8k
      LOG_IF(DFATAL, !result.status_time.is_valid())
74
1
          << "Status time not specified for committed transaction: " << id;
75
94.8k
      commit_time = result.status_time;
76
94.8k
    }
77
334k
  }
78
79
0
  std::string ToString() const {
80
0
    return Format("{ id: $0 status: $1 commit_time: $2 priority: $3 failure: $4 }",
81
0
                  id, TransactionStatus_Name(status), commit_time, priority, failure);
82
0
  }
83
};
84
85
// Bumps the conflicts metric and builds the TryAgain status (tagged with
// TransactionErrorCode::kConflict) reported when our transaction loses a conflict against
// other_id. `reason` describes the other transaction and is embedded in the message.
CHECKED_STATUS MakeConflictStatus(const TransactionId& our_id, const TransactionId& other_id,
                                  const char* reason, Counter* conflicts_metric) {
  conflicts_metric->Increment();
  auto message = Format("$0 Conflicts with $1 transaction: $2", our_id, reason, other_id);
  return STATUS(TryAgain, message, Slice(), TransactionError(TransactionErrorCode::kConflict));
}
91
92
class ConflictResolver;
93
94
class ConflictResolverContext {
95
 public:
96
  // Read all conflicts for operation/transaction.
97
  virtual CHECKED_STATUS ReadConflicts(ConflictResolver* resolver) = 0;
98
99
  // Check priority of this one against existing transactions.
100
  virtual CHECKED_STATUS CheckPriority(
101
      ConflictResolver* resolver,
102
      boost::iterator_range<TransactionData*> transactions) = 0;
103
104
  // Check for conflict against committed transaction.
105
  // Returns true if transaction could be removed from list of conflicts.
106
  virtual Result<bool> CheckConflictWithCommitted(
107
      const TransactionData& transaction_data, HybridTime commit_time) = 0;
108
109
  virtual HybridTime GetResolutionHt() = 0;
110
111
  virtual bool IgnoreConflictsWith(const TransactionId& other) = 0;
112
113
  virtual TransactionId transaction_id() const = 0;
114
115
  virtual std::string ToString() const = 0;
116
117
56
  std::string LogPrefix() const {
118
56
    return ToString() + ": ";
119
56
  }
120
121
1.11M
  virtual ~ConflictResolverContext() = default;
122
};
123
124
class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
125
 public:
126
  ConflictResolver(const DocDB& doc_db,
127
                   TransactionStatusManager* status_manager,
128
                   PartialRangeKeyIntents partial_range_key_intents,
129
                   std::unique_ptr<ConflictResolverContext> context,
130
                   ResolutionCallback callback)
131
      : doc_db_(doc_db), status_manager_(*status_manager), request_scope_(status_manager),
132
        partial_range_key_intents_(partial_range_key_intents), context_(std::move(context)),
133
1.10M
        callback_(std::move(callback)) {}
134
135
17.0M
  PartialRangeKeyIntents partial_range_key_intents() {
136
17.0M
    return partial_range_key_intents_;
137
17.0M
  }
138
139
928k
  TransactionStatusManager& status_manager() {
140
928k
    return status_manager_;
141
928k
  }
142
143
8.43M
  const DocDB& doc_db() {
144
8.43M
    return doc_db_;
145
8.43M
  }
146
147
986k
  Result<TransactionMetadata> PrepareMetadata(const TransactionMetadataPB& pb) {
148
986k
    return status_manager_.PrepareMetadata(pb);
149
986k
  }
150
151
  void FillPriorities(
152
146k
      boost::container::small_vector_base<std::pair<TransactionId, uint64_t>>* inout) {
153
146k
    return status_manager_.FillPriorities(inout);
154
146k
  }
155
156
1.10M
  void Resolve() {
157
1.10M
    auto status = context_->ReadConflicts(this);
158
1.10M
    if (!status.ok()) {
159
10.1k
      InvokeCallback(status);
160
10.1k
      return;
161
10.1k
    }
162
163
1.09M
    ResolveConflicts();
164
1.09M
  }
165
166
99.7k
  Result<WaitPolicy> CombineWaitPolicy(WaitPolicy existing_policy, WaitPolicy new_policy) {
167
99.7k
    RSTATUS_DCHECK(
168
99.7k
        existing_policy != WAIT_BLOCK, InternalError, "WAIT_BLOCK isn't support yet.");
169
170
99.7k
    switch(new_policy) {
171
0
      case WAIT_BLOCK:
172
0
        return STATUS(NotSupported, "WAIT_BLOCK isn't support yet.");
173
99.6k
      case WAIT_ERROR:
174
        // Even if some intent had a wait policy of WAIT_SKIP, WAIT_ERROR overrides that policy.
175
99.6k
        return new_policy;
176
14
      case WAIT_SKIP:
177
        // The existing_policy can either be WAIT_ERROR or WAIT_SKIP. In either case, we can leave
178
        // it untouched.
179
14
        return existing_policy;
180
99.7k
    }
181
0
    return STATUS(NotSupported, "Unknown wait policy.");
182
99.7k
  }
183
184
  // Reads conflicts for specified intent from DB.
185
  CHECKED_STATUS ReadIntentConflicts(IntentTypeSet type, KeyBytes* intent_key_prefix,
186
29.0M
                                     WaitPolicy wait_policy) {
187
29.0M
    EnsureIntentIteratorCreated();
188
189
29.0M
    const auto conflicting_intent_types = kIntentTypeSetConflicts[type.ToUIntPtr()];
190
191
29.0M
    KeyBytes upperbound_key(*intent_key_prefix);
192
29.0M
    upperbound_key.AppendValueType(ValueType::kMaxByte);
193
29.0M
    intent_key_upperbound_ = upperbound_key.AsSlice();
194
195
29.0M
    size_t original_size = intent_key_prefix->size();
196
29.0M
    intent_key_prefix->AppendValueType(ValueType::kIntentTypeSet);
197
    // Have only weak intents, so could skip other weak intents.
198
29.0M
    if (!HasStrong(type)) {
199
8.98M
      char value = 1 << kStrongIntentFlag;
200
8.98M
      intent_key_prefix->AppendRawBytes(&value, 1);
201
8.98M
    }
202
29.0M
    auto se = ScopeExit([this, intent_key_prefix, original_size] {
203
29.0M
      intent_key_prefix->Truncate(original_size);
204
29.0M
      intent_key_upperbound_.clear();
205
29.0M
    });
206
29.0M
    Slice prefix_slice(intent_key_prefix->AsSlice().data(), original_size);
207
29.0M
    
VLOG_WITH_PREFIX_AND_FUNC788
(4) << "Check conflicts in intents DB; Seek: "
208
788
                                 << intent_key_prefix->AsSlice().ToDebugHexString() << " for type "
209
788
                                 << ToString(type) << " and wait_policy=" << wait_policy;
210
29.0M
    intent_iter_.Seek(intent_key_prefix->AsSlice());
211
39.4M
    while (intent_iter_.Valid()) {
212
10.6M
      auto existing_key = intent_iter_.key();
213
10.6M
      auto existing_value = intent_iter_.value();
214
10.6M
      if (!existing_key.starts_with(prefix_slice)) {
215
0
        break;
216
0
      }
217
      // Support for obsolete intent type.
218
      // When looking for intent with specific prefix it should start with this prefix, followed
219
      // by ValueType::kIntentTypeSet.
220
      // Previously we were using intent type, so should support its value type also, now it is
221
      // kObsoleteIntentType.
222
      // Actual handling of obsolete intent type is done in ParseIntentKey.
223
10.6M
      if (existing_key.size() <= prefix_slice.size() ||
224
10.6M
          !IntentValueType(existing_key[prefix_slice.size()])) {
225
239k
        break;
226
239k
      }
227
228
10.4M
      auto existing_intent = VERIFY_RESULT(
229
10.4M
          docdb::ParseIntentKey(intent_iter_.key(), existing_value));
230
231
18.4E
      VLOG_WITH_PREFIX_AND_FUNC(4) << "Found: " << existing_value.ToDebugString()
232
18.4E
                                   << " has intent types " << ToString(existing_intent.types);
233
10.4M
      auto decoded_value = VERIFY_RESULT(DecodeIntentValue(
234
10.4M
          existing_value, nullptr /* verify_transaction_id_slice */,
235
10.4M
          HasStrong(existing_intent.types)));
236
0
      const auto intent_mask = kIntentTypeSetMask[existing_intent.types.ToUIntPtr()];
237
10.4M
      if ((conflicting_intent_types & intent_mask) != 0) {
238
766k
        auto transaction_id = decoded_value.transaction_id;
239
766k
        bool lock_only = decoded_value.body.starts_with(ValueTypeAsChar::kRowLock);
240
241
        // TODO(savepoints) - if the intent corresponds to an aborted subtransaction, ignore.
242
766k
        if (!context_->IgnoreConflictsWith(transaction_id)) {
243
514k
          auto p = conflicts_.emplace(transaction_id,
244
514k
                                      TransactionConflictInfo {
245
514k
                                        .wait_policy = wait_policy,
246
514k
                                        .all_lock_only_conflicts = lock_only,
247
514k
                                      });
248
514k
          if (!p.second) {
249
99.7k
            p.first->second.wait_policy = VERIFY_RESULT(
250
0
                CombineWaitPolicy(p.first->second.wait_policy, wait_policy));
251
99.7k
            p.first->second.all_lock_only_conflicts = p.first->second.all_lock_only_conflicts &&
252
99.7k
                                                      
lock_only14
;
253
99.7k
          }
254
514k
        }
255
766k
      }
256
257
10.4M
      intent_iter_.Next();
258
10.4M
    }
259
260
29.0M
    return Status::OK();
261
29.0M
  }
262
263
30.0M
  void EnsureIntentIteratorCreated() {
264
30.0M
    if (!intent_iter_.Initialized()) {
265
1.10M
      intent_iter_ = CreateRocksDBIterator(
266
1.10M
          doc_db_.intents,
267
1.10M
          doc_db_.key_bounds,
268
1.10M
          BloomFilterMode::DONT_USE_BLOOM_FILTER,
269
1.10M
          boost::none /* user_key_for_filter */,
270
1.10M
          rocksdb::kDefaultQueryId,
271
1.10M
          nullptr /* file_filter */,
272
1.10M
          &intent_key_upperbound_);
273
1.10M
    }
274
30.0M
  }
275
276
 private:
277
1.10M
  void InvokeCallback(const Result<HybridTime>& result) {
278
1.10M
    YB_TRANSACTION_DUMP(
279
1.10M
        Conflicts, context_->transaction_id(),
280
1.10M
        result.ok() ? *result : HybridTime::kInvalid,
281
1.10M
        Slice(pointer_cast<const uint8_t*>(transactions_.data()),
282
1.10M
              transactions_.size() * sizeof(transactions_[0])));
283
1.10M
    intent_iter_.Reset();
284
1.10M
    callback_(result);
285
1.10M
  }
286
287
585k
  MUST_USE_RESULT bool CheckResolutionDone(const Result<bool>& result) {
288
585k
    if (!result.ok()) {
289
119k
      TRACE("Abort: $0", result.status().ToString());
290
18.4E
      VLOG_WITH_PREFIX(4) << "Abort: " << result.status();
291
119k
      InvokeCallback(result.status());
292
119k
      return true;
293
119k
    }
294
295
465k
    if (result.get()) {
296
159k
      TRACE("No conflicts.");
297
18.4E
      VLOG_WITH_PREFIX(4) << "No conflicts: " << context_->GetResolutionHt();
298
159k
      InvokeCallback(context_->GetResolutionHt());
299
159k
      return true;
300
159k
    }
301
302
306k
    return false;
303
465k
  }
304
305
1.09M
  void ResolveConflicts() {
306
1.09M
    
VLOG_WITH_PREFIX257
(3) << "Conflicts: " << yb::ToString(conflicts_)257
;
307
1.09M
    if (conflicts_.empty()) {
308
821k
      VTRACE(1, LogPrefix());
309
821k
      TRACE("No conflicts.");
310
821k
      InvokeCallback(context_->GetResolutionHt());
311
821k
      return;
312
821k
    }
313
314
278k
    transactions_.reserve(conflicts_.size());
315
414k
    for (const auto& kv : conflicts_) {
316
414k
      transactions_.emplace_back(kv.first /* id */,
317
414k
                                 kv.second.wait_policy,
318
414k
                                 kv.second.all_lock_only_conflicts);
319
414k
    }
320
278k
    remaining_transactions_ = transactions_.size();
321
322
278k
    DoResolveConflicts();
323
278k
  }
324
325
279k
  void DoResolveConflicts() {
326
279k
    if (CheckResolutionDone(CheckLocalCommits())) {
327
22.4k
      return;
328
22.4k
    }
329
330
256k
    FetchTransactionStatuses();
331
256k
  }
332
333
256k
  void FetchTransactionStatusesDone() {
334
256k
    if (CheckResolutionDone(ContinueResolve())) {
335
208k
      return;
336
208k
    }
337
256k
  }
338
339
256k
  Result<bool> ContinueResolve() {
340
256k
    if (VERIFY_RESULT(Cleanup())) {
341
106k
      return true;
342
106k
    }
343
344
150k
    RETURN_NOT_OK(context_->CheckPriority(this, RemainingTransactions()));
345
346
51.9k
    AbortTransactions();
347
51.9k
    return false;
348
150k
  }
349
350
  // Returns true when there are no conflicts left.
351
279k
  Result<bool> CheckLocalCommits() {
352
415k
    return DoCleanup([this](auto* transaction) -> Result<bool> {
353
415k
      return this->CheckLocalCommit(transaction);
354
415k
    });
355
279k
  }
356
357
  // Check whether specified transaction was locally committed, and store this state if so.
358
  // Returns true if conflict with specified transaction is resolved.
359
415k
  Result<bool> CheckLocalCommit(TransactionData* transaction) {
360
    // TODO(savepoints): Do not conflict with aborted intents.
361
415k
    auto commit_time = status_manager().LocalCommitTime(transaction->id);
362
415k
    if (commit_time.is_valid()) {
363
77.9k
      transaction->commit_time = commit_time;
364
77.9k
      transaction->status = TransactionStatus::COMMITTED;
365
77.9k
    }
366
    // In case of failure status, we stop the resolution process, so `transactions_` content
367
    // does not matter in this case.
368
415k
    if (!(commit_time.is_valid() &&
369
415k
          VERIFY_RESULT(context_->CheckConflictWithCommitted(*transaction, commit_time)))) {
370
338k
      return false;
371
338k
    }
372
18.4E
    VLOG_WITH_PREFIX(4) << "Locally committed: " << transaction->id << ", time: " << commit_time;
373
77.4k
    return true;
374
415k
  }
375
376
  // Apply specified functor to all active (i.e. not resolved) transactions.
377
  // If functor returns true, it means that transaction was resolved.
378
  // So such transaction is moved out of active transactions range.
379
  // Returns true if there are no active transaction left.
380
  template <class F>
381
585k
  Result<bool> DoCleanup(const F& f) {
382
585k
    auto end = transactions_.begin() + remaining_transactions_;
383
1.20M
    for (auto transaction = transactions_.begin(); transaction != end;) {
384
808k
      if (!VERIFY_RESULT(f(&*transaction))) {
385
519k
        ++transaction;
386
519k
        continue;
387
519k
      }
388
288k
      if (--end == transaction) {
389
186k
        break;
390
186k
      }
391
101k
      std::swap(*transaction, *end);
392
101k
    }
393
585k
    remaining_transactions_ = end - transactions_.begin();
394
395
585k
    return remaining_transactions_ == 0;
396
585k
  }
conflict_resolution.cc:yb::Result<bool> yb::docdb::(anonymous namespace)::ConflictResolver::DoCleanup<yb::docdb::(anonymous namespace)::ConflictResolver::CheckLocalCommits()::'lambda'(auto*)>(auto const&)
Line
Count
Source
381
279k
  Result<bool> DoCleanup(const F& f) {
382
279k
    auto end = transactions_.begin() + remaining_transactions_;
383
657k
    for (auto transaction = transactions_.begin(); transaction != end;) {
384
415k
      if (!VERIFY_RESULT(f(&*transaction))) {
385
338k
        ++transaction;
386
338k
        continue;
387
338k
      }
388
77.4k
      if (--end == transaction) {
389
37.4k
        break;
390
37.4k
      }
391
40.0k
      std::swap(*transaction, *end);
392
40.0k
    }
393
279k
    remaining_transactions_ = end - transactions_.begin();
394
395
279k
    return remaining_transactions_ == 0;
396
279k
  }
conflict_resolution.cc:yb::Result<bool> yb::docdb::(anonymous namespace)::ConflictResolver::DoCleanup<yb::docdb::(anonymous namespace)::ConflictResolver::Cleanup()::'lambda'(auto*)>(auto const&)
Line
Count
Source
381
305k
  Result<bool> DoCleanup(const F& f) {
382
305k
    auto end = transactions_.begin() + remaining_transactions_;
383
548k
    for (auto transaction = transactions_.begin(); transaction != end;) {
384
392k
      if (!VERIFY_RESULT(f(&*transaction))) {
385
181k
        ++transaction;
386
181k
        continue;
387
181k
      }
388
211k
      if (--end == transaction) {
389
149k
        break;
390
149k
      }
391
61.9k
      std::swap(*transaction, *end);
392
61.9k
    }
393
305k
    remaining_transactions_ = end - transactions_.begin();
394
395
305k
    return remaining_transactions_ == 0;
396
305k
  }
397
398
  // Removes all transactions that would not conflict with us anymore.
399
  // Returns failure if we conflict with transaction that cannot be aborted.
400
305k
  Result<bool> Cleanup() {
401
392k
    return DoCleanup([this](auto* transaction) -> Result<bool> {
402
392k
      return this->CheckCleanup(transaction);
403
392k
    });
404
305k
  }
405
406
392k
  Result<bool> CheckCleanup(TransactionData* transaction) {
407
392k
    RETURN_NOT_OK(transaction->failure);
408
392k
    auto status = transaction->status;
409
392k
    if (status == TransactionStatus::COMMITTED) {
410
93.6k
      if (VERIFY_RESULT(context_->CheckConflictWithCommitted(
411
93.6k
              *transaction, transaction->commit_time))) {
412
71.9k
        
VLOG_WITH_PREFIX1
(4)
413
1
            << "Committed: " << transaction->id << ", commit time: " << transaction->commit_time;
414
71.9k
        return true;
415
71.9k
      }
416
298k
    } else if (status == TransactionStatus::ABORTED) {
417
118k
      auto commit_time = status_manager().LocalCommitTime(transaction->id);
418
118k
      if (commit_time) {
419
119
        if (VERIFY_RESULT(context_->CheckConflictWithCommitted(*transaction, commit_time))) {
420
114
          
VLOG_WITH_PREFIX0
(4)
421
0
              << "Locally committed: " << transaction->id << "< commit time: " << commit_time;
422
114
          return true;
423
114
        }
424
118k
      } else {
425
118k
        
VLOG_WITH_PREFIX0
(4) << "Aborted: " << transaction->id0
;
426
118k
        return true;
427
118k
      }
428
180k
    } else if (status != TransactionStatus::PENDING && 
status != TransactionStatus::APPLYING0
) {
429
0
      return STATUS_FORMAT(
430
0
          IllegalState, "Unexpected transaction state: $0", TransactionStatus_Name(status));
431
0
    }
432
202k
    return false;
433
392k
  }
434
435
452k
  boost::iterator_range<TransactionData*> RemainingTransactions() {
436
452k
    auto begin = transactions_.data();
437
452k
    return boost::make_iterator_range(begin, begin + remaining_transactions_);
438
452k
  }
439
440
256k
  void FetchTransactionStatuses() {
441
256k
    static const std::string kRequestReason = "conflict resolution"s;
442
256k
    auto self = shared_from_this();
443
256k
    pending_requests_.store(remaining_transactions_);
444
338k
    for (auto& i : RemainingTransactions()) {
445
338k
      auto& transaction = i;
446
338k
      TRACE("FetchingTransactionStatus for $0", yb::ToString(transaction.id));
447
338k
      StatusRequest request = {
448
338k
        &transaction.id,
449
338k
        context_->GetResolutionHt(),
450
338k
        context_->GetResolutionHt(),
451
338k
        0, // serial no. Could use 0 here, because read_ht == global_limit_ht.
452
           // So we cannot accept status with time >= read_ht and < global_limit_ht.
453
338k
        &kRequestReason,
454
338k
        TransactionLoadFlags{TransactionLoadFlag::kCleanup},
455
338k
        [self, &transaction](Result<TransactionStatusResult> result) {
456
338k
          if (result.ok()) {
457
278k
            transaction.ProcessStatus(*result);
458
278k
          } else 
if (59.3k
result.status().IsTryAgain()59.3k
) {
459
            // It is safe to suppose that transaction in PENDING state in case of try again error.
460
48.4k
            transaction.status = TransactionStatus::PENDING;
461
48.4k
          } else 
if (10.9k
result.status().IsNotFound()10.9k
) {
462
10.9k
            transaction.status = TransactionStatus::ABORTED;
463
18.4E
          } else {
464
18.4E
            transaction.failure = result.status();
465
18.4E
          }
466
338k
          if (self->pending_requests_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
467
256k
            self->FetchTransactionStatusesDone();
468
256k
          }
469
338k
        }
470
338k
      };
471
338k
      status_manager().RequestStatusAt(request);
472
338k
    }
473
256k
  }
474
475
48.8k
  void AbortTransactions() {
476
48.8k
    auto self = shared_from_this();
477
48.8k
    pending_requests_.store(remaining_transactions_);
478
55.9k
    for (auto& i : RemainingTransactions()) {
479
55.9k
      auto& transaction = i;
480
55.9k
      TRACE("Aborting $0", yb::ToString(transaction.id));
481
55.9k
      status_manager().Abort(
482
55.9k
          transaction.id,
483
55.9k
          [self, &transaction](Result<TransactionStatusResult> result) {
484
55.9k
        VLOG
(4) << self->LogPrefix() << "Abort received: " << AsString(result)0
;
485
55.9k
        if (result.ok()) {
486
55.9k
          transaction.ProcessStatus(*result);
487
55.9k
        } else 
if (56
result.status().IsRemoteError()56
||
result.status().IsAborted()56
) {
488
          // Non retryable errors. Aborted could be caused by shutdown.
489
0
          transaction.failure = result.status();
490
56
        } else {
491
56
          LOG(INFO) << self->LogPrefix() << "Abort failed, would retry: " << result.status();
492
56
        }
493
55.9k
        if (self->pending_requests_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
494
48.8k
          self->AbortTransactionsDone();
495
48.8k
        }
496
55.9k
      });
497
55.9k
    }
498
48.8k
  }
499
500
48.8k
  void AbortTransactionsDone() {
501
48.8k
    if (CheckResolutionDone(Cleanup())) {
502
47.8k
      return;
503
47.8k
    }
504
505
978
    DoResolveConflicts();
506
978
  }
507
508
56
  std::string LogPrefix() const {
509
56
    return context_->LogPrefix();
510
56
  }
511
512
  DocDB doc_db_;
513
  TransactionStatusManager& status_manager_;
514
  RequestScope request_scope_;
515
  PartialRangeKeyIntents partial_range_key_intents_;
516
  std::unique_ptr<ConflictResolverContext> context_;
517
  ResolutionCallback callback_;
518
519
  BoundedRocksDbIterator intent_iter_;
520
  Slice intent_key_upperbound_;
521
  TransactionConflictInfoMap conflicts_;
522
523
  // Resolution state for all transactions. Resolved transactions are moved to the end of it.
524
  std::vector<TransactionData> transactions_;
525
  // Number of transactions that are not yet resolved. After successful resolution should be 0.
526
  size_t remaining_transactions_;
527
528
  std::atomic<size_t> pending_requests_{0};
529
};
530
531
struct IntentData {
532
  IntentTypeSet types;
533
  bool full_doc_key;
534
535
0
  std::string ToString() const {
536
0
    return YB_STRUCT_TO_STRING(types, full_doc_key);
537
0
  }
538
};
539
540
using IntentTypesContainer = std::map<KeyBuffer, IntentData>;
541
542
class IntentProcessor {
543
 public:
544
  IntentProcessor(IntentTypesContainer* container, const IntentTypeSet& strong_intent_types)
545
      : container_(*container),
546
        strong_intent_types_(strong_intent_types),
547
        weak_intent_types_(StrongToWeak(strong_intent_types_))
548
1.37M
  {}
549
550
57.0M
  void Process(IntentStrength strength, FullDocKey full_doc_key, KeyBytes* intent_key) {
551
57.0M
    const auto is_strong = strength == IntentStrength::kStrong;
552
57.0M
    const auto& intent_type_set = is_strong ? 
strong_intent_types_19.6M
:
weak_intent_types_37.4M
;
553
57.0M
    auto i = container_.find(intent_key->data());
554
57.0M
    if (i == container_.end()) {
555
27.2M
      container_.emplace(intent_key->data(),
556
27.2M
                         IntentData{intent_type_set, full_doc_key});
557
27.2M
      return;
558
27.2M
    }
559
560
29.8M
    i->second.types |= intent_type_set;
561
562
    // In a batch of keys, the computed full_doc_key value might vary based on the key that produced
563
    // a particular intent. E.g. suppose we have a primary key (h, r) and s is a subkey. If we are
564
    // trying to write strong intents on (h) and (h, r, s) in a batch, we end up with the following
565
    // intent types:
566
    //
567
    // (h) -> strong, full_doc_key: true (always true for strong intents)
568
    // (h, r) -> weak, full_doc_key: true (we did not omit any final doc key components)
569
    // (h, r, s) -> strong, full_doc_key: true
570
    //
571
    // Note that full_doc_key is always true for strong intents because we process one key at a time
572
    // and when taking that key by itself, (h) looks like the full doc key (nothing follows it).
573
    // In the above example, the intent (h) is generated both as a strong intent and as a weak
574
    // intent based on keys (h, r) and (h, r, s), and we OR the value of full_doc_key and end up
575
    // with true.
576
    //
577
    // If we are trying to write strong intents on (h, r) and (h, r, s), we get:
578
    //
579
    // (h) -> weak, full_doc_key: false (because we know it is just part of the doc key)
580
    // (h, r) -> strong, full_doc_key: true
581
    // (h, r, s) -> strong, full_doc_key: true
582
    //
583
    // So we effectively end up with three types of intents:
584
    // - Weak intents with full_doc_key=false
585
    // - Weak intents with full_doc_key=true
586
    // - Strong intents with full_doc_key=true.
587
29.8M
    i->second.full_doc_key = i->second.full_doc_key || 
full_doc_key19.8M
;
588
29.8M
  }
589
590
 private:
591
  IntentTypesContainer& container_;
592
  const IntentTypeSet strong_intent_types_;
593
  const IntentTypeSet weak_intent_types_;
594
};
595
596
// Checks intent keys of our transaction against committed records in regular RocksDB. A record
// counts as a conflict when its hybrid time is at or after `read_time`.
// transaction_id, resolver, conflicts_metric and buffer are stored by reference, so the objects
// they refer to must outlive this checker. `buffer` is used as scratch space for building seek
// keys.
class StrongConflictChecker {
 public:
  StrongConflictChecker(const TransactionId& transaction_id,
                        HybridTime read_time,
                        ConflictResolver* resolver,
                        Counter* conflicts_metric,
                        KeyBytes* buffer)
      : transaction_id_(transaction_id),
        read_time_(read_time),
        resolver_(*resolver),
        conflicts_metric_(*conflicts_metric),
        buffer_(*buffer)
  {}

  // Checks a single intent key.
  //   intent_key  - encoded intent key (without hybrid time).
  //   strong      - whether the intent being written is strong (see the comment in the body).
  //   wait_policy - WAIT_SKIP turns a conflict into a kSkipLocking error instead of kConflict.
  // Returns:
  //   OK if no committed record at or after read_time_ is found.
  //   InternalError tagged TransactionErrorCode::kSkipLocking for WAIT_SKIP on conflict.
  //   TryAgain tagged TransactionErrorCode::kConflict otherwise (also increments the conflicts
  //   metric).
  //   Corruption if a regular DB key does not end with a hybrid time.
  CHECKED_STATUS Check(const Slice& intent_key, bool strong, WaitPolicy wait_policy) {
    const auto hash = VERIFY_RESULT(DecodeDocKeyHash(intent_key));
    // The iterator is created with intent_key as its bloom filter key, so it is reused only
    // while consecutive keys share the same doc key hash; otherwise a new one is created.
    if (PREDICT_FALSE(!value_iter_.Initialized() || hash != value_iter_hash_)) {
      value_iter_ = CreateRocksDBIterator(
          resolver_.doc_db().regular,
          resolver_.doc_db().key_bounds,
          BloomFilterMode::USE_BLOOM_FILTER,
          intent_key,
          rocksdb::kDefaultQueryId);
      value_iter_hash_ = hash;
    }
    value_iter_.Seek(intent_key);
    VLOG_WITH_PREFIX_AND_FUNC(4)
        << "Overwrite; Seek: " << intent_key.ToDebugString() << " ("
        << SubDocKey::DebugSliceToString(intent_key) << "), strong: " << strong << ", wait_policy: "
        << AsString(wait_policy);
    // If we are resolving conflicts for writing a strong intent, look at records in regular RocksDB
    // with the same key as the intent's key (not including hybrid time) and any child keys. This is
    // because a strong intent indicates deletion or replacement of the entire subdocument tree and
    // any element of that tree that has already been committed at a higher hybrid time than the
    // read timestamp would be in conflict.
    //
    // (Note that when writing a strong intent on the entire table, e.g. as part of locking the
    // table, there is currently a performance issue and we'll need a better approach:
    // https://github.com/yugabyte/yugabyte-db/issues/6055).
    //
    // If we are resolving conflicts for writing a weak intent, only look at records in regular
    // RocksDB with the same key as the intent (not including hybrid time). This is because a weak
    // intent indicates that something in the document subtree rooted at that intent's key will
    // change, so it is only directly in conflict with a committed record that deletes or replaces
    // that entire document subtree (similar to a strong intent), so it would have the same exact
    // key as the weak intent (not including hybrid time).
    //
    // An intent key starting with kGroupEnd is not limited by the prefix check below; every
    // remaining record is examined.
    while (value_iter_.Valid() &&
           (intent_key.starts_with(ValueTypeAsChar::kGroupEnd) ||
            value_iter_.key().starts_with(intent_key))) {
      auto existing_key = value_iter_.key();
      // Strips the encoded DocHybridTime from the end of existing_key; what remains must end
      // with the kHybridTime marker byte.
      auto doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&existing_key));
      if (existing_key.empty() ||
          existing_key[existing_key.size() - 1] != ValueTypeAsChar::kHybridTime) {
        return STATUS_FORMAT(
            Corruption, "Hybrid time expected at end of key: $0",
            value_iter_.key().ToDebugString());
      }
      // For a weak intent only the exact key matters: intent_key followed by the kHybridTime
      // marker. Anything longer is a child key, which ends the scan.
      if (!strong && existing_key.size() != intent_key.size() + 1) {
        VLOG_WITH_PREFIX(4)
            << "Check value overwrite, key: " << intent_key.ToDebugString()
            << ", out of bound key: " << existing_key.ToDebugString();
        break;
      }
      VLOG_WITH_PREFIX(4)
          << "Check value overwrite, key: " << SubDocKey::DebugSliceToString(intent_key)
          << ", read time: " << read_time_
          << ", doc ht: " << doc_ht.hybrid_time()
          << ", found key: " << SubDocKey::DebugSliceToString(value_iter_.key())
          << ", after start: " << (doc_ht.hybrid_time() >= read_time_)
          << ", value: " << value_iter_.value().ToDebugString();
      if (doc_ht.hybrid_time() >= read_time_) {
        if (wait_policy == WAIT_SKIP) {
          return STATUS(InternalError, "Skip locking since entity was modified in regular db",
                        TransactionError(TransactionErrorCode::kSkipLocking));
        } else {
          conflicts_metric_.Increment();
          return STATUS_EC_FORMAT(TryAgain, TransactionError(TransactionErrorCode::kConflict),
                                  "Value write after transaction start: $0 >= $1",
                                  doc_ht.hybrid_time(), read_time_);
        }
      }
      // Seek to the current key with DocHybridTime::kMin appended. Presumably this positions the
      // iterator past all remaining versions of this key (TODO confirm against the DocHybridTime
      // encoding order).
      buffer_.Reset(existing_key);
      // Already have ValueType::kHybridTime at the end
      buffer_.AppendHybridTime(DocHybridTime::kMin);
      ROCKSDB_SEEK(&value_iter_, buffer_.AsSlice());
    }

    return Status::OK();
  }

 private:
  std::string LogPrefix() const {
    return Format("$0: ", transaction_id_);
  }

  const TransactionId& transaction_id_;
  // Records at or after this hybrid time are treated as conflicting writes.
  const HybridTime read_time_;
  ConflictResolver& resolver_;
  Counter& conflicts_metric_;
  KeyBytes& buffer_;

  // RocksDb iterator with bloom filter can be reused while consecutive keys have the same hash
  // component.
  BoundedRocksDbIterator value_iter_;
  // Hash of the key value_iter_ was created for; empty until the first Check call.
  boost::optional<DocKeyHash> value_iter_hash_;

};
702
703
class ConflictResolverContextBase : public ConflictResolverContext {
704
 public:
705
  ConflictResolverContextBase(const DocOperations& doc_ops,
706
                              HybridTime resolution_ht,
707
                              Counter* conflicts_metric)
708
      : doc_ops_(doc_ops),
709
        resolution_ht_(resolution_ht),
710
1.10M
        conflicts_metric_(conflicts_metric) {
711
1.10M
  }
712
713
1.10M
  const DocOperations& doc_ops() {
714
1.10M
    return doc_ops_;
715
1.10M
  }
716
717
1.65M
  HybridTime GetResolutionHt() override {
718
1.65M
    return resolution_ht_;
719
1.65M
  }
720
721
62
  void MakeResolutionAtLeast(const HybridTime& resolution_ht) {
722
62
    resolution_ht_.MakeAtLeast(resolution_ht);
723
62
  }
724
725
1.10M
  Counter* GetConflictsMetric() {
726
1.10M
    return conflicts_metric_;
727
1.10M
  }
728
729
 protected:
730
  CHECKED_STATUS CheckPriorityInternal(
731
      ConflictResolver* resolver,
732
      boost::iterator_range<TransactionData*> transactions,
733
      const TransactionId& our_transaction_id,
734
147k
      uint64_t our_priority) {
735
736
147k
    if (!fetched_metadata_for_transactions_) {
737
146k
      boost::container::small_vector<std::pair<TransactionId, uint64_t>, 8> ids_and_priorities;
738
146k
      ids_and_priorities.reserve(transactions.size());
739
179k
      for (const auto& transaction : transactions) {
740
179k
        ids_and_priorities.emplace_back(transaction.id, 0);
741
179k
      }
742
146k
      resolver->FillPriorities(&ids_and_priorities);
743
325k
      for (size_t i = 0; i != transactions.size(); 
++i179k
) {
744
179k
        transactions[i].priority = ids_and_priorities[i].second;
745
179k
      }
746
146k
    }
747
159k
    for (const auto& transaction : transactions) {
748
159k
      auto their_priority = transaction.priority;
749
159k
      if (transaction.wait_policy == WAIT_SKIP) {
750
36
        return STATUS(InternalError, "Skip locking since entity is already locked",
751
36
                      TransactionError(TransactionErrorCode::kSkipLocking));
752
36
      }
753
754
      // READ COMMITTED txns require a guarantee that no txn abort it. They can handle facing a
755
      // kConflict due to another txn's conflicting intent, but can't handle aborts. To ensure
756
      // these guarantees -
757
      //   1. all READ COMMITTED txns are given kHighestPriority and
758
      //   2. a kConflict is raised even if their_priority equals our_priority.
759
159k
      if (our_priority <= their_priority) {
760
98.3k
        return MakeConflictStatus(
761
98.3k
            our_transaction_id, transaction.id, "higher priority", GetConflictsMetric());
762
98.3k
      }
763
159k
    }
764
48.8k
    fetched_metadata_for_transactions_ = true;
765
766
48.8k
    return Status::OK();
767
147k
  }
768
769
 private:
770
  const DocOperations& doc_ops_;
771
772
  // Hybrid time of conflict resolution, used to request transaction status from status tablet.
773
  HybridTime resolution_ht_;
774
775
  bool fetched_metadata_for_transactions_ = false;
776
777
  Counter* conflicts_metric_ = nullptr;
778
};
779
780
// Utility class for ResolveTransactionConflicts implementation.
781
class TransactionConflictResolverContext : public ConflictResolverContextBase {
782
 public:
783
  TransactionConflictResolverContext(const DocOperations& doc_ops,
784
                                     const KeyValueWriteBatchPB& write_batch,
785
                                     HybridTime resolution_ht,
786
                                     HybridTime read_time,
787
                                     Counter* conflicts_metric)
788
      : ConflictResolverContextBase(doc_ops, resolution_ht, conflicts_metric),
789
        write_batch_(write_batch),
790
        read_time_(read_time),
791
        transaction_id_(FullyDecodeTransactionId(write_batch.transaction().transaction_id()))
792
985k
  {}
793
794
987k
  virtual ~TransactionConflictResolverContext() {}
795
796
 private:
797
987k
  CHECKED_STATUS ReadConflicts(ConflictResolver* resolver) override {
798
987k
    RETURN_NOT_OK(transaction_id_);
799
800
987k
    
VLOG_WITH_PREFIX5.02k
(3) << "Resolve conflicts"5.02k
;
801
802
987k
    metadata_ = 
VERIFY_RESULT987k
(987k
resolver->PrepareMetadata(write_batch_.transaction()));
803
804
0
    boost::container::small_vector<RefCntPrefix, 8> paths;
805
806
987k
    const size_t kKeyBufferInitialSize = 512;
807
987k
    KeyBytes buffer;
808
987k
    buffer.Reserve(kKeyBufferInitialSize);
809
987k
    const auto row_mark = GetRowMarkTypeFromPB(write_batch_);
810
987k
    IntentTypesContainer container;
811
987k
    IntentProcessor write_processor(
812
987k
        &container,
813
987k
        GetStrongIntentTypeSet(metadata_.isolation, docdb::OperationKind::kWrite, row_mark));
814
6.78M
    for (const auto& doc_op : doc_ops()) {
815
6.78M
      paths.clear();
816
6.78M
      IsolationLevel ignored_isolation_level;
817
6.78M
      RETURN_NOT_OK(doc_op->GetDocPaths(
818
6.78M
          GetDocPathsMode::kIntents, &paths, &ignored_isolation_level));
819
820
16.7M
      
for (const auto& path : paths)6.78M
{
821
16.7M
        
VLOG_WITH_PREFIX_AND_FUNC306
(4)
822
306
            << "Doc path: " << SubDocKey::DebugSliceToString(path.as_slice());
823
16.7M
        RETURN_NOT_OK(EnumerateIntents(
824
16.7M
            path.as_slice(),
825
16.7M
            /* intent_value */ Slice(),
826
16.7M
            [&write_processor](
827
16.7M
                auto strength, FullDocKey full_doc_key, auto, auto intent_key, auto) {
828
16.7M
              write_processor.Process(strength, full_doc_key, intent_key);
829
16.7M
              return Status::OK();
830
16.7M
            },
831
16.7M
            &buffer,
832
16.7M
            resolver->partial_range_key_intents()));
833
16.7M
      }
834
6.78M
    }
835
    // Either write_batch_.read_pairs is not empty or doc_ops is non empty. Both can't be non empty
836
    // together. This is because read_pairs is filled only in case of a read operation that has a
837
    // row mark or is part of a serializable txn.
838
    // 1. In case doc_ops are present, we use the default wait policy of WAIT_ERROR.
839
    // 2. In case of a read rpc that has wait_policy, we use that.
840
987k
    auto wait_policy = WAIT_ERROR;
841
987k
    const auto& pairs = write_batch_.read_pairs();
842
987k
    if (!pairs.empty()) {
843
386k
      IntentProcessor read_processor(
844
386k
          &container,
845
386k
          GetStrongIntentTypeSet(metadata_.isolation, docdb::OperationKind::kRead, row_mark));
846
386k
      wait_policy = write_batch_.wait_policy();
847
386k
      RETURN_NOT_OK(EnumerateIntents(
848
386k
          pairs,
849
386k
          [&read_processor] (
850
386k
              auto strength, FullDocKey full_doc_key, auto, auto intent_key, auto) {
851
386k
            read_processor.Process(strength, full_doc_key, intent_key);
852
386k
            return Status::OK();
853
386k
          },
854
386k
          resolver->partial_range_key_intents()));
855
386k
    }
856
857
987k
    if (container.empty()) {
858
0
      return Status::OK();
859
0
    }
860
861
987k
    
VLOG_WITH_PREFIX_AND_FUNC549
(4) << "Check txn's conflicts for following intents: "
862
549
                                 << AsString(container);
863
864
987k
    StrongConflictChecker checker(
865
987k
        *transaction_id_, read_time_, resolver, GetConflictsMetric(), &buffer);
866
    // Iterator on intents DB should be created before iterator on regular DB.
867
    // This is to prevent the case when we create an iterator on the regular DB where a
868
    // provisional record has not yet been applied, and then create an iterator the intents
869
    // DB where the provisional record has already been removed.
870
987k
    resolver->EnsureIntentIteratorCreated();
871
872
27.2M
    for (const auto& i : container) {
873
27.2M
      if (read_time_ != HybridTime::kMax) {
874
23.4M
        const Slice intent_key = i.first.AsSlice();
875
23.4M
        bool strong = HasStrong(i.second.types);
876
        // For strong intents or weak intents at a full document key level (i.e. excluding intents
877
        // that omit some final range components of the document key), check for conflicts with
878
        // records in regular RocksDB. We need this because the row might have been deleted
879
        // concurrently by a single-shard transaction or a committed and applied transaction.
880
23.4M
        if (strong || 
i.second.full_doc_key6.96M
) {
881
21.5M
          RETURN_NOT_OK(checker.Check(intent_key, strong, wait_policy));
882
21.5M
        }
883
23.4M
      }
884
27.2M
      buffer.Reset(i.first.AsSlice());
885
27.2M
      RETURN_NOT_OK(resolver->ReadIntentConflicts(i.second.types, &buffer, wait_policy));
886
27.2M
    }
887
888
977k
    return Status::OK();
889
987k
  }
890
891
  CHECKED_STATUS CheckPriority(ConflictResolver* resolver,
892
145k
                               boost::iterator_range<TransactionData*> transactions) override {
893
145k
    return CheckPriorityInternal(resolver, transactions, metadata_.transaction_id,
894
145k
                                 metadata_.priority);
895
145k
  }
896
897
  Result<bool> CheckConflictWithCommitted(
898
170k
      const TransactionData& transaction_data, HybridTime commit_time) override {
899
170k
    RSTATUS_DCHECK(commit_time.is_valid(), Corruption, "Invalid transaction commit time");
900
901
18.4E
    VLOG_WITH_PREFIX(4) << "Committed: " << transaction_data.id << ", commit_time: " << commit_time
902
18.4E
                        << ", read_time: " << read_time_
903
18.4E
                        << ", wait_policy:" << transaction_data.wait_policy
904
18.4E
                        << ", all_lock_only_conflicts" << transaction_data.all_lock_only_conflicts;
905
906
    // If the intents to be written conflict with only "explicit row lock" intents of a committed
907
    // transaction, we can proceed now because a committed transaction implies that the locks are
908
    // released. In other words, only a committed transaction with some conflicting intent that
909
    // results in a modification to data in regular db, can result in a serialization error.
910
    //
911
    // commit_time equals to HybridTime::kMax means that transaction is not actually committed,
912
    // but is being committed. I.e. status tablet is trying to replicate COMMITTED state.
913
    // So we should always conflict with such transaction, because we are not able to read its
914
    // results.
915
    //
916
    // read_time equals to HybridTime::kMax in case of serializable isolation or when
917
    // read time was not yet picked for snapshot isolation.
918
    // So it should conflict only with transactions that are being committed.
919
    //
920
    // In all other cases we have concrete read time and should conflict with transactions
921
    // that were committed after this point.
922
170k
    if (!transaction_data.all_lock_only_conflicts && 
commit_time >= read_time_170k
) {
923
20.8k
      if (transaction_data.wait_policy == WAIT_SKIP) {
924
0
        return STATUS(InternalError, "Skip locking since entity was modified by a recent commit",
925
0
                      TransactionError(TransactionErrorCode::kSkipLocking));
926
20.8k
      } else {
927
20.8k
        return MakeConflictStatus(
928
20.8k
          *transaction_id_, transaction_data.id, "committed", GetConflictsMetric());
929
20.8k
      }
930
20.8k
    }
931
932
149k
    return true;
933
170k
  }
934
935
765k
  bool IgnoreConflictsWith(const TransactionId& other) override {
936
765k
    return other == *transaction_id_;
937
765k
  }
938
939
0
  TransactionId transaction_id() const override {
940
0
    return *transaction_id_;
941
0
  }
942
943
56
  std::string ToString() const override {
944
56
    return yb::ToString(transaction_id_);
945
56
  }
946
947
  const KeyValueWriteBatchPB& write_batch_;
948
949
  // Read time of the transaction identified by transaction_id_, could be HybridTime::kMax in case
950
  // of serializable isolation or when read time not yet picked for snapshot isolation.
951
  const HybridTime read_time_;
952
953
  // Id of transaction when is writing intents, for which we are resolving conflicts.
954
  Result<TransactionId> transaction_id_;
955
956
  TransactionMetadata metadata_;
957
958
  Status result_ = Status::OK();
959
};
960
961
class OperationConflictResolverContext : public ConflictResolverContextBase {
962
 public:
963
  OperationConflictResolverContext(const DocOperations* doc_ops,
964
                                   HybridTime resolution_ht,
965
                                   Counter* conflicts_metric)
966
122k
      : ConflictResolverContextBase(*doc_ops, resolution_ht, conflicts_metric) {
967
122k
  }
968
969
122k
  virtual ~OperationConflictResolverContext() {}
970
971
  // Reads stored intents that could conflict with our operations.
972
122k
  CHECKED_STATUS ReadConflicts(ConflictResolver* resolver) override {
973
122k
    boost::container::small_vector<RefCntPrefix, 8> doc_paths;
974
122k
    boost::container::small_vector<size_t, 32> key_prefix_lengths;
975
122k
    KeyBytes encoded_key_buffer;
976
977
122k
    IntentTypeSet strong_intent_types;
978
979
122k
    EnumerateIntentsCallback callback = [&strong_intent_types, resolver](
980
122k
        IntentStrength intent_strength, FullDocKey full_doc_key, Slice,
981
1.82M
        KeyBytes* encoded_key_buffer, LastKey) {
982
1.82M
      return resolver->ReadIntentConflicts(
983
1.82M
          intent_strength == IntentStrength::kStrong ? 
strong_intent_types543k
984
1.82M
                                                     : 
StrongToWeak(strong_intent_types)1.28M
,
985
1.82M
          encoded_key_buffer, WAIT_ERROR);
986
1.82M
    };
987
988
498k
    for (const auto& doc_op : doc_ops()) {
989
498k
      doc_paths.clear();
990
498k
      IsolationLevel isolation;
991
498k
      RETURN_NOT_OK(doc_op->GetDocPaths(GetDocPathsMode::kIntents, &doc_paths, &isolation));
992
993
498k
      strong_intent_types = GetStrongIntentTypeSet(isolation, OperationKind::kWrite,
994
498k
                                                   RowMarkType::ROW_MARK_ABSENT);
995
996
543k
      for (const auto& doc_path : doc_paths) {
997
543k
        
VLOG_WITH_PREFIX_AND_FUNC11
(4)
998
11
            << "Doc path: " << SubDocKey::DebugSliceToString(doc_path.as_slice());
999
543k
        RETURN_NOT_OK(EnumerateIntents(
1000
543k
            doc_path.as_slice(), Slice(), callback, &encoded_key_buffer,
1001
543k
            PartialRangeKeyIntents::kTrue));
1002
543k
      }
1003
498k
    }
1004
1005
122k
    return Status::OK();
1006
122k
  }
1007
1008
  CHECKED_STATUS CheckPriority(ConflictResolver* resolver,
1009
1.29k
                               boost::iterator_range<TransactionData*> transactions) override {
1010
1.29k
    return CheckPriorityInternal(resolver,
1011
1.29k
                                 transactions,
1012
1.29k
                                 TransactionId::Nil(),
1013
1.29k
                                 kHighPriTxnLowerBound - 1 /* our_priority */);
1014
1.29k
  }
1015
1016
1.31k
  bool IgnoreConflictsWith(const TransactionId& other) override {
1017
1.31k
    return false;
1018
1.31k
  }
1019
1020
0
  TransactionId transaction_id() const override {
1021
0
    return TransactionId::Nil();
1022
0
  }
1023
1024
0
  std::string ToString() const override {
1025
0
    return "Operation Context";
1026
0
  }
1027
1028
  Result<bool> CheckConflictWithCommitted(
1029
1.01k
      const TransactionData& transaction_data, HybridTime commit_time) override {
1030
1.01k
    if (commit_time != HybridTime::kMax) {
1031
62
      MakeResolutionAtLeast(commit_time);
1032
62
      return true;
1033
62
    }
1034
951
    return false;
1035
1.01k
  }
1036
};
1037
1038
} // namespace
1039
1040
void ResolveTransactionConflicts(const DocOperations& doc_ops,
1041
                                 const KeyValueWriteBatchPB& write_batch,
1042
                                 HybridTime hybrid_time,
1043
                                 HybridTime read_time,
1044
                                 const DocDB& doc_db,
1045
                                 PartialRangeKeyIntents partial_range_key_intents,
1046
                                 TransactionStatusManager* status_manager,
1047
                                 Counter* conflicts_metric,
1048
987k
                                 ResolutionCallback callback) {
1049
987k
  DCHECK(hybrid_time.is_valid());
1050
987k
  TRACE("ResolveTransactionConflicts");
1051
987k
  auto context = std::make_unique<TransactionConflictResolverContext>(
1052
987k
      doc_ops, write_batch, hybrid_time, read_time, conflicts_metric);
1053
987k
  auto resolver = std::make_shared<ConflictResolver>(
1054
987k
      doc_db, status_manager, partial_range_key_intents, std::move(context), std::move(callback));
1055
  // Resolve takes a self reference to extend lifetime.
1056
987k
  resolver->Resolve();
1057
987k
  TRACE("resolver->Resolve done");
1058
987k
}
1059
1060
void ResolveOperationConflicts(const DocOperations& doc_ops,
1061
                               HybridTime resolution_ht,
1062
                               const DocDB& doc_db,
1063
                               PartialRangeKeyIntents partial_range_key_intents,
1064
                               TransactionStatusManager* status_manager,
1065
                               Counter* conflicts_metric,
1066
122k
                               ResolutionCallback callback) {
1067
122k
  TRACE("ResolveOperationConflicts");
1068
122k
  auto context = std::make_unique<OperationConflictResolverContext>(&doc_ops, resolution_ht,
1069
122k
                                                                    conflicts_metric);
1070
122k
  auto resolver = std::make_shared<ConflictResolver>(
1071
122k
      doc_db, status_manager, partial_range_key_intents, std::move(context), std::move(callback));
1072
  // Resolve takes a self reference to extend lifetime.
1073
122k
  resolver->Resolve();
1074
122k
  TRACE("resolver->Resolve done");
1075
122k
}
1076
1077
// Expands to SCHECK_<op>(lhs, rhs, Corruption, <message>). The message includes `msg`, the hex
// dump of `intent_key`, and the hex dump of `transaction_id_source`, so both of those names must
// be in scope where the macro is used.
#define INTENT_KEY_SCHECK(lhs, op, rhs, msg) \
  BOOST_PP_CAT(SCHECK_, op)( \
      lhs, rhs, Corruption, \
      Format("Bad intent key, $0 in $1, transaction from: $2", \
             msg, intent_key.ToDebugHexString(), transaction_id_source.ToDebugHexString()))
1085
1086
// transaction_id_source is used by INTENT_KEY_SCHECK.
1087
86.9M
// Splits an encoded intent key into its parts.
//
// An intent key has the layout:
//   doc path | intent-type marker | intent type | ValueType::kHybridTime | encoded DocHybridTime
// The intent-type marker is ValueTypeAsChar::kIntentTypeSet. The obsolete markers
// kObsoleteIntentType and kObsoleteIntentTypeSet are also accepted and converted with
// ObsoleteIntentTypeToSet / ObsoleteIntentTypeSetToNew.
//
// intent_key: the full encoded intent key.
// transaction_id_source: used only in error messages, through INTENT_KEY_SCHECK.
//
// Returns a ParsedIntent with these fields:
//   - doc_path: intent_key with the 3 marker bytes and the encoded hybrid time removed.
//   - types: decoded from the intent type byte.
//   - doc_ht: covers the kHybridTime marker byte plus the encoded hybrid time.
// Returns Corruption if the key is too short or a marker byte is unexpected. Also propagates
// any error from DocHybridTime::CheckAndGetEncodedSize.
Result<ParsedIntent> ParseIntentKey(Slice intent_key, Slice transaction_id_source) {
  ParsedIntent result;
  size_t doc_ht_size = 0;
  result.doc_path = intent_key;
  // Intent is encoded as "DocPath + IntentType + DocHybridTime".
  RETURN_NOT_OK(DocHybridTime::CheckAndGetEncodedSize(result.doc_path, &doc_ht_size));
  // 3 comes from (intent-type marker byte, the actual intent type, ValueType::kHybridTime).
  INTENT_KEY_SCHECK(result.doc_path.size(), GE, doc_ht_size + 3, "key too short");
  result.doc_path.remove_suffix(doc_ht_size + 3);
  // doc_path now stops right before the 3 marker bytes; they are read past its end, and they
  // are still inside intent_key.
  auto intent_type_and_doc_ht = result.doc_path.end();
  if (intent_type_and_doc_ht[0] == ValueTypeAsChar::kObsoleteIntentType) {
    result.types = ObsoleteIntentTypeToSet(intent_type_and_doc_ht[1]);
  } else if (intent_type_and_doc_ht[0] == ValueTypeAsChar::kObsoleteIntentTypeSet) {
    result.types = ObsoleteIntentTypeSetToNew(intent_type_and_doc_ht[1]);
  } else {
    INTENT_KEY_SCHECK(intent_type_and_doc_ht[0], EQ, ValueTypeAsChar::kIntentTypeSet,
        "intent type set type expected");
    result.types = IntentTypeSet(intent_type_and_doc_ht[1]);
  }
  INTENT_KEY_SCHECK(intent_type_and_doc_ht[2], EQ, ValueTypeAsChar::kHybridTime,
                    "hybrid time value type expected");
  // Starts at the kHybridTime marker (offset 2) and spans it plus the doc_ht_size encoded bytes.
  result.doc_ht = Slice(result.doc_path.end() + 2, doc_ht_size + 1);
  return result;
}
1111
1112
0
std::string DebugIntentKeyToString(Slice intent_key) {
1113
0
  auto parsed = ParseIntentKey(intent_key, Slice());
1114
0
  if (!parsed.ok()) {
1115
0
    LOG(WARNING) << "Failed to parse: " << intent_key.ToDebugHexString() << ": " << parsed.status();
1116
0
    return intent_key.ToDebugHexString();
1117
0
  }
1118
0
  auto doc_ht = DocHybridTime::DecodeFromEnd(parsed->doc_ht);
1119
0
  if (!doc_ht.ok()) {
1120
0
    LOG(WARNING) << "Failed to decode doc ht: " << intent_key.ToDebugHexString() << ": "
1121
0
                 << doc_ht.status();
1122
0
    return intent_key.ToDebugHexString();
1123
0
  }
1124
0
  return Format("$0 (key: $1 type: $2 doc_ht: $3)",
1125
0
                intent_key.ToDebugHexString(), SubDocKey::DebugSliceToString(parsed->doc_path),
1126
0
                parsed->types, *doc_ht);
1127
0
}
1128
1129
} // namespace docdb
1130
} // namespace yb