YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/conflict_resolution.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include "yb/docdb/conflict_resolution.h"
14
15
#include <map>
16
17
#include "yb/common/hybrid_time.h"
18
#include "yb/common/row_mark.h"
19
#include "yb/common/transaction.h"
20
#include "yb/common/transaction_error.h"
21
#include "yb/common/transaction_priority.h"
22
#include "yb/docdb/doc_key.h"
23
#include "yb/docdb/docdb.h"
24
#include "yb/docdb/docdb.pb.h"
25
#include "yb/docdb/docdb_rocksdb_util.h"
26
#include "yb/docdb/intent.h"
27
#include "yb/docdb/shared_lock_manager.h"
28
#include "yb/docdb/transaction_dump.h"
29
#include "yb/util/logging.h"
30
#include "yb/util/metrics.h"
31
#include "yb/util/scope_exit.h"
32
#include "yb/util/status_format.h"
33
#include "yb/util/trace.h"
34
35
using namespace std::literals;
36
using namespace std::placeholders;
37
38
namespace yb {
39
namespace docdb {
40
41
namespace {
42
43
struct TransactionConflictInfo {
44
  WaitPolicy wait_policy;
45
  bool all_lock_only_conflicts;
46
47
0
  std::string ToString() const {
48
0
    return YB_STRUCT_TO_STRING(wait_policy, all_lock_only_conflicts);
49
0
  }
50
};
51
52
using TransactionConflictInfoMap = std::unordered_map<TransactionId,
53
                                                      TransactionConflictInfo,
54
                                                      TransactionIdHash>;
55
56
struct TransactionData {
57
  TransactionData(TransactionId id_, WaitPolicy wait_policy_, bool all_lock_only_conflicts_)
58
265k
      : id(id_), wait_policy(wait_policy_), all_lock_only_conflicts(all_lock_only_conflicts_) {}
59
60
  TransactionId id;
61
  WaitPolicy wait_policy;
62
  // all_lock_only_conflicts is true if all conflicting intents of this transaction are explicit row
63
  // lock intents and not intents that would result in modifications to data in regular db.
64
  bool all_lock_only_conflicts;
65
  TransactionStatus status;
66
  HybridTime commit_time;
67
  uint64_t priority;
68
  Status failure;
69
70
214k
  void ProcessStatus(const TransactionStatusResult& result) {
71
214k
    status = result.status;
72
214k
    if (status == TransactionStatus::COMMITTED) {
73
2
      LOG_IF(DFATAL, !result.status_time.is_valid())
74
2
          << "Status time not specified for committed transaction: " << id;
75
58.1k
      commit_time = result.status_time;
76
58.1k
    }
77
214k
  }
78
79
0
  std::string ToString() const {
80
0
    return Format("{ id: $0 status: $1 commit_time: $2 priority: $3 failure: $4 }",
81
0
                  id, TransactionStatus_Name(status), commit_time, priority, failure);
82
0
  }
83
};
84
85
// Produces the TryAgain status (tagged with TransactionErrorCode::kConflict) reported when
// transaction `our_id` conflicts with `other_id`. `reason` is inserted into the message text.
// Increments `conflicts_metric` every time it is called.
CHECKED_STATUS MakeConflictStatus(const TransactionId& our_id, const TransactionId& other_id,
                                  const char* reason, Counter* conflicts_metric) {
  conflicts_metric->Increment();
  const std::string message =
      Format("$0 Conflicts with $1 transaction: $2", our_id, reason, other_id);
  return STATUS(TryAgain, message, Slice(), TransactionError(TransactionErrorCode::kConflict));
}
91
92
class ConflictResolver;
93
94
class ConflictResolverContext {
95
 public:
96
  // Read all conflicts for operation/transaction.
97
  virtual CHECKED_STATUS ReadConflicts(ConflictResolver* resolver) = 0;
98
99
  // Check priority of this one against existing transactions.
100
  virtual CHECKED_STATUS CheckPriority(
101
      ConflictResolver* resolver,
102
      boost::iterator_range<TransactionData*> transactions) = 0;
103
104
  // Check for conflict against committed transaction.
105
  // Returns true if transaction could be removed from list of conflicts.
106
  virtual Result<bool> CheckConflictWithCommitted(
107
      const TransactionData& transaction_data, HybridTime commit_time) = 0;
108
109
  virtual HybridTime GetResolutionHt() = 0;
110
111
  virtual bool IgnoreConflictsWith(const TransactionId& other) = 0;
112
113
  virtual TransactionId transaction_id() const = 0;
114
115
  virtual std::string ToString() const = 0;
116
117
21
  std::string LogPrefix() const {
118
21
    return ToString() + ": ";
119
21
  }
120
121
605k
  virtual ~ConflictResolverContext() = default;
122
};
123
124
class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
125
 public:
126
  ConflictResolver(const DocDB& doc_db,
127
                   TransactionStatusManager* status_manager,
128
                   PartialRangeKeyIntents partial_range_key_intents,
129
                   std::unique_ptr<ConflictResolverContext> context,
130
                   ResolutionCallback callback)
131
      : doc_db_(doc_db), status_manager_(*status_manager), request_scope_(status_manager),
132
        partial_range_key_intents_(partial_range_key_intents), context_(std::move(context)),
133
602k
        callback_(std::move(callback)) {}
134
135
5.35M
  PartialRangeKeyIntents partial_range_key_intents() {
136
5.35M
    return partial_range_key_intents_;
137
5.35M
  }
138
139
611k
  TransactionStatusManager& status_manager() {
140
611k
    return status_manager_;
141
611k
  }
142
143
2.77M
  const DocDB& doc_db() {
144
2.77M
    return doc_db_;
145
2.77M
  }
146
147
543k
  Result<TransactionMetadata> PrepareMetadata(const TransactionMetadataPB& pb) {
148
543k
    return status_manager_.PrepareMetadata(pb);
149
543k
  }
150
151
  void FillPriorities(
152
85.0k
      boost::container::small_vector_base<std::pair<TransactionId, uint64_t>>* inout) {
153
85.0k
    return status_manager_.FillPriorities(inout);
154
85.0k
  }
155
156
604k
  void Resolve() {
157
604k
    auto status = context_->ReadConflicts(this);
158
604k
    if (!status.ok()) {
159
1.86k
      InvokeCallback(status);
160
1.86k
      return;
161
1.86k
    }
162
163
602k
    ResolveConflicts();
164
602k
  }
165
166
314
  Result<WaitPolicy> CombineWaitPolicy(WaitPolicy existing_policy, WaitPolicy new_policy) {
167
314
    RSTATUS_DCHECK(
168
314
        existing_policy != WAIT_BLOCK, InternalError, "WAIT_BLOCK isn't support yet.");
169
170
314
    switch(new_policy) {
171
0
      case WAIT_BLOCK:
172
0
        return STATUS(NotSupported, "WAIT_BLOCK isn't support yet.");
173
286
      case WAIT_ERROR:
174
        // Even if some intent had a wait policy of WAIT_SKIP, WAIT_ERROR overrides that policy.
175
286
        return new_policy;
176
28
      case WAIT_SKIP:
177
        // The existing_policy can either be WAIT_ERROR or WAIT_SKIP. In either case, we can leave
178
        // it untouched.
179
28
        return existing_policy;
180
0
    }
181
0
    return STATUS(NotSupported, "Unknown wait policy.");
182
0
  }
183
184
  // Reads conflicts for specified intent from DB.
185
  CHECKED_STATUS ReadIntentConflicts(IntentTypeSet type, KeyBytes* intent_key_prefix,
186
10.3M
                                     WaitPolicy wait_policy) {
187
10.3M
    EnsureIntentIteratorCreated();
188
189
10.3M
    const auto conflicting_intent_types = kIntentTypeSetConflicts[type.ToUIntPtr()];
190
191
10.3M
    KeyBytes upperbound_key(*intent_key_prefix);
192
10.3M
    upperbound_key.AppendValueType(ValueType::kMaxByte);
193
10.3M
    intent_key_upperbound_ = upperbound_key.AsSlice();
194
195
10.3M
    size_t original_size = intent_key_prefix->size();
196
10.3M
    intent_key_prefix->AppendValueType(ValueType::kIntentTypeSet);
197
    // Have only weak intents, so could skip other weak intents.
198
10.3M
    if (!HasStrong(type)) {
199
3.56M
      char value = 1 << kStrongIntentFlag;
200
3.56M
      intent_key_prefix->AppendRawBytes(&value, 1);
201
3.56M
    }
202
10.3M
    auto se = ScopeExit([this, intent_key_prefix, original_size] {
203
10.3M
      intent_key_prefix->Truncate(original_size);
204
10.3M
      intent_key_upperbound_.clear();
205
10.3M
    });
206
10.3M
    Slice prefix_slice(intent_key_prefix->AsSlice().data(), original_size);
207
4.02k
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Check conflicts in intents DB; Seek: "
208
4.02k
                                 << intent_key_prefix->AsSlice().ToDebugHexString() << " for type "
209
4.02k
                                 << ToString(type) << " and wait_policy=" << wait_policy;
210
10.3M
    intent_iter_.Seek(intent_key_prefix->AsSlice());
211
14.8M
    while (intent_iter_.Valid()) {
212
4.72M
      auto existing_key = intent_iter_.key();
213
4.72M
      auto existing_value = intent_iter_.value();
214
4.72M
      if (!existing_key.starts_with(prefix_slice)) {
215
0
        break;
216
0
      }
217
      // Support for obsolete intent type.
218
      // When looking for intent with specific prefix it should start with this prefix, followed
219
      // by ValueType::kIntentTypeSet.
220
      // Previously we were using intent type, so should support its value type also, now it is
221
      // kObsoleteIntentType.
222
      // Actual handling of obsolete intent type is done in ParseIntentKey.
223
4.72M
      if (existing_key.size() <= prefix_slice.size() ||
224
4.72M
          !IntentValueType(existing_key[prefix_slice.size()])) {
225
131k
        break;
226
131k
      }
227
228
4.58M
      auto existing_intent = VERIFY_RESULT(
229
4.58M
          docdb::ParseIntentKey(intent_iter_.key(), existing_value));
230
231
18.4E
      VLOG_WITH_PREFIX_AND_FUNC(4) << "Found: " << existing_value.ToDebugString()
232
18.4E
                                   << " has intent types " << ToString(existing_intent.types);
233
4.58M
      auto decoded_value = VERIFY_RESULT(DecodeIntentValue(
234
4.58M
          existing_value, nullptr /* verify_transaction_id_slice */,
235
4.58M
          HasStrong(existing_intent.types)));
236
4.58M
      const auto intent_mask = kIntentTypeSetMask[existing_intent.types.ToUIntPtr()];
237
4.58M
      if ((conflicting_intent_types & intent_mask) != 0) {
238
437k
        auto transaction_id = decoded_value.transaction_id;
239
437k
        bool lock_only = decoded_value.body.starts_with(ValueTypeAsChar::kRowLock);
240
241
        // TODO(savepoints) - if the intent corresponds to an aborted subtransaction, ignore.
242
437k
        if (!context_->IgnoreConflictsWith(transaction_id)) {
243
266k
          auto p = conflicts_.emplace(transaction_id,
244
266k
                                      TransactionConflictInfo {
245
266k
                                        .wait_policy = wait_policy,
246
266k
                                        .all_lock_only_conflicts = lock_only,
247
266k
                                      });
248
266k
          if (!p.second) {
249
314
            p.first->second.wait_policy = VERIFY_RESULT(
250
314
                CombineWaitPolicy(p.first->second.wait_policy, wait_policy));
251
314
            p.first->second.all_lock_only_conflicts = p.first->second.all_lock_only_conflicts &&
252
28
                                                      lock_only;
253
314
          }
254
266k
        }
255
437k
      }
256
257
4.58M
      intent_iter_.Next();
258
4.58M
    }
259
260
10.3M
    return Status::OK();
261
10.3M
  }
262
263
10.8M
  void EnsureIntentIteratorCreated() {
264
10.8M
    if (!intent_iter_.Initialized()) {
265
605k
      intent_iter_ = CreateRocksDBIterator(
266
605k
          doc_db_.intents,
267
605k
          doc_db_.key_bounds,
268
605k
          BloomFilterMode::DONT_USE_BLOOM_FILTER,
269
605k
          boost::none /* user_key_for_filter */,
270
605k
          rocksdb::kDefaultQueryId,
271
605k
          nullptr /* file_filter */,
272
605k
          &intent_key_upperbound_);
273
605k
    }
274
10.8M
  }
275
276
 private:
277
605k
  void InvokeCallback(const Result<HybridTime>& result) {
278
605k
    YB_TRANSACTION_DUMP(
279
605k
        Conflicts, context_->transaction_id(),
280
605k
        result.ok() ? *result : HybridTime::kInvalid,
281
605k
        Slice(pointer_cast<const uint8_t*>(transactions_.data()),
282
605k
              transactions_.size() * sizeof(transactions_[0])));
283
605k
    intent_iter_.Reset();
284
605k
    callback_(result);
285
605k
  }
286
287
420k
  MUST_USE_RESULT bool CheckResolutionDone(const Result<bool>& result) {
288
420k
    if (!result.ok()) {
289
98.4k
      TRACE("Abort: $0", result.status().ToString());
290
2
      VLOG_WITH_PREFIX(4) << "Abort: " << result.status();
291
98.4k
      InvokeCallback(result.status());
292
98.4k
      return true;
293
98.4k
    }
294
295
322k
    if (result.get()) {
296
95.0k
      TRACE("No conflicts.");
297
2
      VLOG_WITH_PREFIX(4) << "No conflicts: " << context_->GetResolutionHt();
298
95.0k
      InvokeCallback(context_->GetResolutionHt());
299
95.0k
      return true;
300
95.0k
    }
301
302
227k
    return false;
303
227k
  }
304
305
603k
  void ResolveConflicts() {
306
49
    VLOG_WITH_PREFIX(3) << "Conflicts: " << yb::ToString(conflicts_);
307
603k
    if (conflicts_.empty()) {
308
409k
      VTRACE(1, LogPrefix());
309
409k
      TRACE("No conflicts.");
310
409k
      InvokeCallback(context_->GetResolutionHt());
311
409k
      return;
312
409k
    }
313
314
193k
    transactions_.reserve(conflicts_.size());
315
266k
    for (const auto& kv : conflicts_) {
316
266k
      transactions_.emplace_back(kv.first /* id */,
317
266k
                                 kv.second.wait_policy,
318
266k
                                 kv.second.all_lock_only_conflicts);
319
266k
    }
320
193k
    remaining_transactions_ = transactions_.size();
321
322
193k
    DoResolveConflicts();
323
193k
  }
324
325
197k
  void DoResolveConflicts() {
326
197k
    if (CheckResolutionDone(CheckLocalCommits())) {
327
10.3k
      return;
328
10.3k
    }
329
330
187k
    FetchTransactionStatuses();
331
187k
  }
332
333
187k
  void FetchTransactionStatusesDone() {
334
187k
    if (CheckResolutionDone(ContinueResolve())) {
335
152k
      return;
336
152k
    }
337
187k
  }
338
339
187k
  Result<bool> ContinueResolve() {
340
187k
    if (VERIFY_RESULT(Cleanup())) {
341
72.8k
      return true;
342
72.8k
    }
343
344
114k
    RETURN_NOT_OK(context_->CheckPriority(this, RemainingTransactions()));
345
346
35.9k
    AbortTransactions();
347
35.9k
    return false;
348
114k
  }
349
350
  // Returns true when there are no conflicts left.
351
197k
  Result<bool> CheckLocalCommits() {
352
271k
    return DoCleanup([this](auto* transaction) -> Result<bool> {
353
271k
      return this->CheckLocalCommit(transaction);
354
271k
    });
355
197k
  }
356
357
  // Check whether specified transaction was locally committed, and store this state if so.
358
  // Returns true if conflict with specified transaction is resolved.
359
270k
  Result<bool> CheckLocalCommit(TransactionData* transaction) {
360
    // TODO(savepoints): Do not conflict with aborted intents.
361
270k
    auto commit_time = status_manager().LocalCommitTime(transaction->id);
362
270k
    if (commit_time.is_valid()) {
363
46.0k
      transaction->commit_time = commit_time;
364
46.0k
      transaction->status = TransactionStatus::COMMITTED;
365
46.0k
    }
366
    // In case of failure status, we stop the resolution process, so `transactions_` content
367
    // does not matter in this case.
368
270k
    if (!(commit_time.is_valid() &&
369
270k
          VERIFY_RESULT(context_->CheckConflictWithCommitted(*transaction, commit_time)))) {
370
225k
      return false;
371
225k
    }
372
18.4E
    VLOG_WITH_PREFIX(4) << "Locally committed: " << transaction->id << ", time: " << commit_time;
373
45.7k
    return true;
374
45.7k
  }
375
376
  // Apply specified functor to all active (i.e. not resolved) transactions.
377
  // If functor returns true, it means that transaction was resolved.
378
  // So such transaction is moved out of active transactions range.
379
  // Returns true if there are no active transaction left.
380
  template <class F>
381
420k
  Result<bool> DoCleanup(const F& f) {
382
420k
    auto end = transactions_.begin() + remaining_transactions_;
383
846k
    for (auto transaction = transactions_.begin(); transaction != end;) {
384
534k
      if (!VERIFY_RESULT(f(&*transaction))) {
385
359k
        ++transaction;
386
359k
        continue;
387
359k
      }
388
174k
      if (--end == transaction) {
389
108k
        break;
390
108k
      }
391
66.5k
      std::swap(*transaction, *end);
392
66.5k
    }
393
420k
    remaining_transactions_ = end - transactions_.begin();
394
395
420k
    return remaining_transactions_ == 0;
396
420k
  }
conflict_resolution.cc:_ZN2yb5docdb12_GLOBAL__N_116ConflictResolver9DoCleanupIZNS2_17CheckLocalCommitsEvEUlPT_E_EENS_6ResultIbEERKS4_
Line
Count
Source
381
197k
  Result<bool> DoCleanup(const F& f) {
382
197k
    auto end = transactions_.begin() + remaining_transactions_;
383
450k
    for (auto transaction = transactions_.begin(); transaction != end;) {
384
271k
      if (!VERIFY_RESULT(f(&*transaction))) {
385
225k
        ++transaction;
386
225k
        continue;
387
225k
      }
388
45.8k
      if (--end == transaction) {
389
18.4k
        break;
390
18.4k
      }
391
27.3k
      std::swap(*transaction, *end);
392
27.3k
    }
393
197k
    remaining_transactions_ = end - transactions_.begin();
394
395
197k
    return remaining_transactions_ == 0;
396
197k
  }
conflict_resolution.cc:_ZN2yb5docdb12_GLOBAL__N_116ConflictResolver9DoCleanupIZNS2_7CleanupEvEUlPT_E_EENS_6ResultIbEERKS4_
Line
Count
Source
381
222k
  Result<bool> DoCleanup(const F& f) {
382
222k
    auto end = transactions_.begin() + remaining_transactions_;
383
396k
    for (auto transaction = transactions_.begin(); transaction != end;) {
384
263k
      if (!VERIFY_RESULT(f(&*transaction))) {
385
133k
        ++transaction;
386
133k
        continue;
387
133k
      }
388
129k
      if (--end == transaction) {
389
89.9k
        break;
390
89.9k
      }
391
39.1k
      std::swap(*transaction, *end);
392
39.1k
    }
393
222k
    remaining_transactions_ = end - transactions_.begin();
394
395
222k
    return remaining_transactions_ == 0;
396
222k
  }
397
398
  // Removes all transactions that would not conflict with us anymore.
399
  // Returns failure if we conflict with transaction that cannot be aborted.
400
222k
  Result<bool> Cleanup() {
401
263k
    return DoCleanup([this](auto* transaction) -> Result<bool> {
402
263k
      return this->CheckCleanup(transaction);
403
263k
    });
404
222k
  }
405
406
263k
  Result<bool> CheckCleanup(TransactionData* transaction) {
407
263k
    RETURN_NOT_OK(transaction->failure);
408
263k
    auto status = transaction->status;
409
263k
    if (status == TransactionStatus::COMMITTED) {
410
57.6k
      if (VERIFY_RESULT(context_->CheckConflictWithCommitted(
411
33.1k
              *transaction, transaction->commit_time))) {
412
0
        VLOG_WITH_PREFIX(4)
413
0
            << "Committed: " << transaction->id << ", commit time: " << transaction->commit_time;
414
33.1k
        return true;
415
33.1k
      }
416
205k
    } else if (status == TransactionStatus::ABORTED) {
417
76.3k
      auto commit_time = status_manager().LocalCommitTime(transaction->id);
418
76.3k
      if (commit_time) {
419
21
        if (VERIFY_RESULT(context_->CheckConflictWithCommitted(*transaction, commit_time))) {
420
0
          VLOG_WITH_PREFIX(4)
421
0
              << "Locally committed: " << transaction->id << "< commit time: " << commit_time;
422
21
          return true;
423
21
        }
424
76.2k
      } else {
425
1
        VLOG_WITH_PREFIX(4) << "Aborted: " << transaction->id;
426
76.2k
        return true;
427
76.2k
      }
428
129k
    } else if (status != TransactionStatus::PENDING && status != TransactionStatus::APPLYING) {
429
0
      return STATUS_FORMAT(
430
0
          IllegalState, "Unexpected transaction state: $0", TransactionStatus_Name(status));
431
0
    }
432
153k
    return false;
433
153k
  }
434
435
336k
  boost::iterator_range<TransactionData*> RemainingTransactions() {
436
336k
    auto begin = transactions_.data();
437
336k
    return boost::make_iterator_range(begin, begin + remaining_transactions_);
438
336k
  }
439
440
187k
  void FetchTransactionStatuses() {
441
187k
    static const std::string kRequestReason = "conflict resolution"s;
442
187k
    auto self = shared_from_this();
443
187k
    pending_requests_.store(remaining_transactions_);
444
225k
    for (auto& i : RemainingTransactions()) {
445
225k
      auto& transaction = i;
446
225k
      TRACE("FetchingTransactionStatus for $0", yb::ToString(transaction.id));
447
225k
      StatusRequest request = {
448
225k
        &transaction.id,
449
225k
        context_->GetResolutionHt(),
450
225k
        context_->GetResolutionHt(),
451
225k
        0, // serial no. Could use 0 here, because read_ht == global_limit_ht.
452
           // So we cannot accept status with time >= read_ht and < global_limit_ht.
453
225k
        &kRequestReason,
454
225k
        TransactionLoadFlags{TransactionLoadFlag::kCleanup},
455
225k
        [self, &transaction](Result<TransactionStatusResult> result) {
456
225k
          if (result.ok()) {
457
175k
            transaction.ProcessStatus(*result);
458
49.3k
          } else if (result.status().IsTryAgain()) {
459
            // It is safe to suppose that transaction in PENDING state in case of try again error.
460
43.9k
            transaction.status = TransactionStatus::PENDING;
461
5.47k
          } else if (result.status().IsNotFound()) {
462
5.46k
            transaction.status = TransactionStatus::ABORTED;
463
3
          } else {
464
3
            transaction.failure = result.status();
465
3
          }
466
225k
          if (self->pending_requests_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
467
187k
            self->FetchTransactionStatusesDone();
468
187k
          }
469
225k
        }
470
225k
      };
471
225k
      status_manager().RequestStatusAt(request);
472
225k
    }
473
187k
  }
474
475
35.2k
  void AbortTransactions() {
476
35.2k
    auto self = shared_from_this();
477
35.2k
    pending_requests_.store(remaining_transactions_);
478
38.7k
    for (auto& i : RemainingTransactions()) {
479
38.7k
      auto& transaction = i;
480
38.7k
      TRACE("Aborting $0", yb::ToString(transaction.id));
481
38.7k
      status_manager().Abort(
482
38.7k
          transaction.id,
483
38.7k
          [self, &transaction](Result<TransactionStatusResult> result) {
484
1
        VLOG(4) << self->LogPrefix() << "Abort received: " << AsString(result);
485
38.7k
        if (result.ok()) {
486
38.6k
          transaction.ProcessStatus(*result);
487
20
        } else if (result.status().IsRemoteError() || result.status().IsAborted()) {
488
          // Non retryable errors. Aborted could be caused by shutdown.
489
0
          transaction.failure = result.status();
490
19
        } else {
491
19
          LOG(INFO) << self->LogPrefix() << "Abort failed, would retry: " << result.status();
492
19
        }
493
38.7k
        if (self->pending_requests_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
494
35.2k
          self->AbortTransactionsDone();
495
35.2k
        }
496
38.7k
      });
497
38.7k
    }
498
35.2k
  }
499
500
35.2k
  void AbortTransactionsDone() {
501
35.2k
    if (CheckResolutionDone(Cleanup())) {
502
30.6k
      return;
503
30.6k
    }
504
505
4.53k
    DoResolveConflicts();
506
4.53k
  }
507
508
21
  std::string LogPrefix() const {
509
21
    return context_->LogPrefix();
510
21
  }
511
512
  DocDB doc_db_;
513
  TransactionStatusManager& status_manager_;
514
  RequestScope request_scope_;
515
  PartialRangeKeyIntents partial_range_key_intents_;
516
  std::unique_ptr<ConflictResolverContext> context_;
517
  ResolutionCallback callback_;
518
519
  BoundedRocksDbIterator intent_iter_;
520
  Slice intent_key_upperbound_;
521
  TransactionConflictInfoMap conflicts_;
522
523
  // Resolution state for all transactions. Resolved transactions are moved to the end of it.
524
  std::vector<TransactionData> transactions_;
525
  // Number of transactions that are not yet resolved. After successful resolution should be 0.
526
  size_t remaining_transactions_;
527
528
  std::atomic<size_t> pending_requests_{0};
529
};
530
531
struct IntentData {
532
  IntentTypeSet types;
533
  bool full_doc_key;
534
535
0
  std::string ToString() const {
536
0
    return YB_STRUCT_TO_STRING(types, full_doc_key);
537
0
  }
538
};
539
540
using IntentTypesContainer = std::map<KeyBuffer, IntentData>;
541
542
class IntentProcessor {
543
 public:
544
  IntentProcessor(IntentTypesContainer* container, const IntentTypeSet& strong_intent_types)
545
      : container_(*container),
546
        strong_intent_types_(strong_intent_types),
547
        weak_intent_types_(StrongToWeak(strong_intent_types_))
548
717k
  {}
549
550
19.1M
  void Process(IntentStrength strength, FullDocKey full_doc_key, KeyBytes* intent_key) {
551
19.1M
    const auto is_strong = strength == IntentStrength::kStrong;
552
12.4M
    const auto& intent_type_set = is_strong ? strong_intent_types_ : weak_intent_types_;
553
19.1M
    auto i = container_.find(intent_key->data());
554
19.1M
    if (i == container_.end()) {
555
9.57M
      container_.emplace(intent_key->data(),
556
9.57M
                         IntentData{intent_type_set, full_doc_key});
557
9.57M
      return;
558
9.57M
    }
559
560
9.55M
    i->second.types |= intent_type_set;
561
562
    // In a batch of keys, the computed full_doc_key value might vary based on the key that produced
563
    // a particular intent. E.g. suppose we have a primary key (h, r) and s is a subkey. If we are
564
    // trying to write strong intents on (h) and (h, r, s) in a batch, we end up with the following
565
    // intent types:
566
    //
567
    // (h) -> strong, full_doc_key: true (always true for strong intents)
568
    // (h, r) -> weak, full_doc_key: true (we did not omit any final doc key components)
569
    // (h, r, s) -> strong, full_doc_key: true
570
    //
571
    // Note that full_doc_key is always true for strong intents because we process one key at a time
572
    // and when taking that key by itself, (h) looks like the full doc key (nothing follows it).
573
    // In the above example, the intent (h) is generated both as a strong intent and as a weak
574
    // intent based on keys (h, r) and (h, r, s), and we OR the value of full_doc_key and end up
575
    // with true.
576
    //
577
    // If we are trying to write strong intents on (h, r) and (h, r, s), we get:
578
    //
579
    // (h) -> weak, full_doc_key: false (because we know it is just part of the doc key)
580
    // (h, r) -> strong, full_doc_key: true
581
    // (h, r, s) -> strong, full_doc_key: true
582
    //
583
    // So we effectively end up with three types of intents:
584
    // - Weak intents with full_doc_key=false
585
    // - Weak intents with full_doc_key=true
586
    // - Strong intents with full_doc_key=true.
587
9.55M
    i->second.full_doc_key = i->second.full_doc_key || full_doc_key;
588
9.55M
  }
589
590
 private:
591
  IntentTypesContainer& container_;
592
  const IntentTypeSet strong_intent_types_;
593
  const IntentTypeSet weak_intent_types_;
594
};
595
596
class StrongConflictChecker {
597
 public:
598
  StrongConflictChecker(const TransactionId& transaction_id,
599
                        HybridTime read_time,
600
                        ConflictResolver* resolver,
601
                        Counter* conflicts_metric,
602
                        KeyBytes* buffer)
603
      : transaction_id_(transaction_id),
604
        read_time_(read_time),
605
        resolver_(*resolver),
606
        conflicts_metric_(*conflicts_metric),
607
        buffer_(*buffer)
608
546k
  {}
609
610
6.76M
  CHECKED_STATUS Check(const Slice& intent_key, bool strong, WaitPolicy wait_policy) {
611
6.76M
    const auto hash = VERIFY_RESULT(DecodeDocKeyHash(intent_key));
612
6.76M
    if (PREDICT_FALSE(!value_iter_.Initialized() || hash != value_iter_hash_)) {
613
1.38M
      value_iter_ = CreateRocksDBIterator(
614
1.38M
          resolver_.doc_db().regular,
615
1.38M
          resolver_.doc_db().key_bounds,
616
1.38M
          BloomFilterMode::USE_BLOOM_FILTER,
617
1.38M
          intent_key,
618
1.38M
          rocksdb::kDefaultQueryId);
619
1.38M
      value_iter_hash_ = hash;
620
1.38M
    }
621
6.76M
    value_iter_.Seek(intent_key);
622
743
    VLOG_WITH_PREFIX_AND_FUNC(4)
623
743
        << "Overwrite; Seek: " << intent_key.ToDebugString() << " ("
624
743
        << SubDocKey::DebugSliceToString(intent_key) << "), strong: " << strong << ", wait_policy: "
625
743
        << AsString(wait_policy);
626
    // If we are resolving conflicts for writing a strong intent, look at records in regular RocksDB
627
    // with the same key as the intent's key (not including hybrid time) and any child keys. This is
628
    // because a strong intent indicates deletion or replacement of the entire subdocument tree and
629
    // any element of that tree that has already been committed at a higher hybrid time than the
630
    // read timestamp would be in conflict.
631
    //
632
    // (Note that when writing a strong intent on the entire table, e.g. as part of locking the
633
    // table, there is currently a performance issue and we'll need a better approach:
634
    // https://github.com/yugabyte/yugabyte-db/issues/6055).
635
    //
636
    // If we are resolving conflicts for writing a weak intent, only look at records in regular
637
    // RocksDB with the same key as the intent (not including hybrid time). This is because a weak
638
    // intent indicates that something in the document subtree rooted at that intent's key will
639
    // change, so it is only directly in conflict with a committed record that deletes or replaces
640
    // that entire document subtree (similar to a strong intent), so it would have the same exact
641
    // key as the weak intent (not including hybrid time).
642
7.89M
    while (value_iter_.Valid() &&
643
5.51M
           (intent_key.starts_with(ValueTypeAsChar::kGroupEnd) ||
644
5.51M
            value_iter_.key().starts_with(intent_key))) {
645
1.41M
      auto existing_key = value_iter_.key();
646
1.41M
      auto doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&existing_key));
647
1.41M
      if (existing_key.empty() ||
648
1.41M
          existing_key[existing_key.size() - 1] != ValueTypeAsChar::kHybridTime) {
649
0
        return STATUS_FORMAT(
650
0
            Corruption, "Hybrid time expected at end of key: $0",
651
0
            value_iter_.key().ToDebugString());
652
0
      }
653
1.41M
      if (!strong && existing_key.size() != intent_key.size() + 1) {
654
18.4E
        VLOG_WITH_PREFIX(4)
655
18.4E
            << "Check value overwrite, key: " << intent_key.ToDebugString()
656
18.4E
            << ", out of bound key: " << existing_key.ToDebugString();
657
287k
        break;
658
287k
      }
659
1.12M
      VLOG_WITH_PREFIX(4)
660
45
          << "Check value overwrite, key: " << SubDocKey::DebugSliceToString(intent_key)
661
45
          << ", read time: " << read_time_
662
45
          << ", doc ht: " << doc_ht.hybrid_time()
663
45
          << ", found key: " << SubDocKey::DebugSliceToString(value_iter_.key())
664
45
          << ", after start: " << (doc_ht.hybrid_time() >= read_time_)
665
45
          << ", value: " << value_iter_.value().ToDebugString();
666
1.12M
      if (doc_ht.hybrid_time() >= read_time_) {
667
1.85k
        if (wait_policy == WAIT_SKIP) {
668
3
          return STATUS(InternalError, "Skip locking since entity was modified in regular db",
669
3
                        TransactionError(TransactionErrorCode::kSkipLocking));
670
1.85k
        } else {
671
1.85k
          conflicts_metric_.Increment();
672
1.85k
          return STATUS_EC_FORMAT(TryAgain, TransactionError(TransactionErrorCode::kConflict),
673
1.85k
                                  "Value write after transaction start: $0 >= $1",
674
1.85k
                                  doc_ht.hybrid_time(), read_time_);
675
1.85k
        }
676
1.12M
      }
677
1.12M
      buffer_.Reset(existing_key);
678
      // Already have ValueType::kHybridTime at the end
679
1.12M
      buffer_.AppendHybridTime(DocHybridTime::kMin);
680
1.12M
      ROCKSDB_SEEK(&value_iter_, buffer_.AsSlice());
681
1.12M
    }
682
683
6.76M
    return Status::OK();
684
6.76M
  }
685
686
 private:
687
0
  std::string LogPrefix() const {
688
0
    return Format("$0: ", transaction_id_);
689
0
  }
690
691
  const TransactionId& transaction_id_;
692
  const HybridTime read_time_;
693
  ConflictResolver& resolver_;
694
  Counter& conflicts_metric_;
695
  KeyBytes& buffer_;
696
697
  // RocksDb iterator with bloom filter can be reused in case keys has same hash component.
698
  BoundedRocksDbIterator value_iter_;
699
  boost::optional<DocKeyHash> value_iter_hash_;
700
701
};
702
703
class ConflictResolverContextBase : public ConflictResolverContext {
704
 public:
705
  ConflictResolverContextBase(const DocOperations& doc_ops,
706
                              HybridTime resolution_ht,
707
                              Counter* conflicts_metric)
708
      : doc_ops_(doc_ops),
709
        resolution_ht_(resolution_ht),
710
604k
        conflicts_metric_(conflicts_metric) {
711
604k
  }
712
713
604k
  const DocOperations& doc_ops() {
714
604k
    return doc_ops_;
715
604k
  }
716
717
955k
  HybridTime GetResolutionHt() override {
718
955k
    return resolution_ht_;
719
955k
  }
720
721
29
  void MakeResolutionAtLeast(const HybridTime& resolution_ht) {
722
29
    resolution_ht_.MakeAtLeast(resolution_ht);
723
29
  }
724
725
645k
  Counter* GetConflictsMetric() {
726
645k
    return conflicts_metric_;
727
645k
  }
728
729
 protected:
730
  CHECKED_STATUS CheckPriorityInternal(
731
      ConflictResolver* resolver,
732
      boost::iterator_range<TransactionData*> transactions,
733
      const TransactionId& our_transaction_id,
734
85.0k
      uint64_t our_priority) {
735
736
85.0k
    if (!fetched_metadata_for_transactions_) {
737
85.0k
      boost::container::small_vector<std::pair<TransactionId, uint64_t>, 8> ids_and_priorities;
738
85.0k
      ids_and_priorities.reserve(transactions.size());
739
99.5k
      for (const auto& transaction : transactions) {
740
99.5k
        ids_and_priorities.emplace_back(transaction.id, 0);
741
99.5k
      }
742
85.0k
      resolver->FillPriorities(&ids_and_priorities);
743
184k
      for (size_t i = 0; i != transactions.size(); ++i) {
744
99.6k
        transactions[i].priority = ids_and_priorities[i].second;
745
99.6k
      }
746
85.0k
    }
747
90.2k
    for (const auto& transaction : transactions) {
748
90.2k
      auto their_priority = transaction.priority;
749
90.2k
      if (transaction.wait_policy == WAIT_SKIP) {
750
72
        return STATUS(InternalError, "Skip locking since entity is already locked",
751
72
                      TransactionError(TransactionErrorCode::kSkipLocking));
752
72
      }
753
754
      // READ COMMITTED txns require a guarantee that no txn abort it. They can handle facing a
755
      // kConflict due to another txn's conflicting intent, but can't handle aborts. To ensure
756
      // these guarantees -
757
      //   1. all READ COMMITTED txns are given kHighestPriority and
758
      //   2. a kConflict is raised even if their_priority equals our_priority.
759
90.1k
      if (our_priority <= their_priority) {
760
59.9k
        return MakeConflictStatus(
761
59.9k
            our_transaction_id, transaction.id, "higher priority", GetConflictsMetric());
762
59.9k
      }
763
90.1k
    }
764
25.1k
    fetched_metadata_for_transactions_ = true;
765
766
25.1k
    return Status::OK();
767
85.0k
  }
768
769
 private:
770
  const DocOperations& doc_ops_;
771
772
  // Hybrid time of conflict resolution, used to request transaction status from status tablet.
773
  HybridTime resolution_ht_;
774
775
  bool fetched_metadata_for_transactions_ = false;
776
777
  Counter* conflicts_metric_ = nullptr;
778
};
779
780
// Utility class for ResolveTransactionConflicts implementation.
781
class TransactionConflictResolverContext : public ConflictResolverContextBase {
782
 public:
783
  TransactionConflictResolverContext(const DocOperations& doc_ops,
784
                                     const KeyValueWriteBatchPB& write_batch,
785
                                     HybridTime resolution_ht,
786
                                     HybridTime read_time,
787
                                     Counter* conflicts_metric)
788
      : ConflictResolverContextBase(doc_ops, resolution_ht, conflicts_metric),
789
        write_batch_(write_batch),
790
        read_time_(read_time),
791
        transaction_id_(FullyDecodeTransactionId(write_batch.transaction().transaction_id()))
792
545k
  {}
793
794
547k
  virtual ~TransactionConflictResolverContext() {}
795
796
 private:
797
545k
  CHECKED_STATUS ReadConflicts(ConflictResolver* resolver) override {
798
545k
    RETURN_NOT_OK(transaction_id_);
799
800
4.56k
    VLOG_WITH_PREFIX(3) << "Resolve conflicts";
801
802
545k
    metadata_ = VERIFY_RESULT(resolver->PrepareMetadata(write_batch_.transaction()));
803
804
545k
    boost::container::small_vector<RefCntPrefix, 8> paths;
805
806
545k
    const size_t kKeyBufferInitialSize = 512;
807
545k
    KeyBytes buffer;
808
545k
    buffer.Reserve(kKeyBufferInitialSize);
809
545k
    const auto row_mark = GetRowMarkTypeFromPB(write_batch_);
810
545k
    IntentTypesContainer container;
811
545k
    IntentProcessor write_processor(
812
545k
        &container,
813
545k
        GetStrongIntentTypeSet(metadata_.isolation, docdb::OperationKind::kWrite, row_mark));
814
2.17M
    for (const auto& doc_op : doc_ops()) {
815
2.17M
      paths.clear();
816
2.17M
      IsolationLevel ignored_isolation_level;
817
2.17M
      RETURN_NOT_OK(doc_op->GetDocPaths(
818
2.17M
          GetDocPathsMode::kIntents, &paths, &ignored_isolation_level));
819
820
5.18M
      for (const auto& path : paths) {
821
227
        VLOG_WITH_PREFIX_AND_FUNC(4)
822
227
            << "Doc path: " << SubDocKey::DebugSliceToString(path.as_slice());
823
5.18M
        RETURN_NOT_OK(EnumerateIntents(
824
5.18M
            path.as_slice(),
825
5.18M
            /* intent_value */ Slice(),
826
5.18M
            [&write_processor](
827
5.18M
                auto strength, FullDocKey full_doc_key, auto, auto intent_key, auto) {
828
5.18M
              write_processor.Process(strength, full_doc_key, intent_key);
829
5.18M
              return Status::OK();
830
5.18M
            },
831
5.18M
            &buffer,
832
5.18M
            resolver->partial_range_key_intents()));
833
5.18M
      }
834
2.17M
    }
835
    // Either write_batch_.read_pairs is not empty or doc_ops is non empty. Both can't be non empty
836
    // together. This is because read_pairs is filled only in case of a read operation that has a
837
    // row mark or is part of a serializable txn.
838
    // 1. In case doc_ops are present, we use the default wait policy of WAIT_ERROR.
839
    // 2. In case of a read rpc that has wait_policy, we use that.
840
545k
    auto wait_policy = WAIT_ERROR;
841
545k
    const auto& pairs = write_batch_.read_pairs();
842
545k
    if (!pairs.empty()) {
843
170k
      IntentProcessor read_processor(
844
170k
          &container,
845
170k
          GetStrongIntentTypeSet(metadata_.isolation, docdb::OperationKind::kRead, row_mark));
846
170k
      wait_policy = write_batch_.wait_policy();
847
170k
      RETURN_NOT_OK(EnumerateIntents(
848
170k
          pairs,
849
170k
          [&read_processor] (
850
170k
              auto strength, FullDocKey full_doc_key, auto, auto intent_key, auto) {
851
170k
            read_processor.Process(strength, full_doc_key, intent_key);
852
170k
            return Status::OK();
853
170k
          },
854
170k
          resolver->partial_range_key_intents()));
855
170k
    }
856
857
545k
    if (container.empty()) {
858
0
      return Status::OK();
859
0
    }
860
861
18.4E
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Check txn's conflicts for following intents: "
862
18.4E
                                 << AsString(container);
863
864
545k
    StrongConflictChecker checker(
865
545k
        *transaction_id_, read_time_, resolver, GetConflictsMetric(), &buffer);
866
    // Iterator on intents DB should be created before iterator on regular DB.
867
    // This is to prevent the case when we create an iterator on the regular DB where a
868
    // provisional record has not yet been applied, and then create an iterator the intents
869
    // DB where the provisional record has already been removed.
870
545k
    resolver->EnsureIntentIteratorCreated();
871
872
9.56M
    for (const auto& i : container) {
873
9.56M
      if (read_time_ != HybridTime::kMax) {
874
7.68M
        const Slice intent_key = i.first.AsSlice();
875
7.68M
        bool strong = HasStrong(i.second.types);
876
        // For strong intents or weak intents at a full document key level (i.e. excluding intents
877
        // that omit some final range components of the document key), check for conflicts with
878
        // records in regular RocksDB. We need this because the row might have been deleted
879
        // concurrently by a single-shard transaction or a committed and applied transaction.
880
7.68M
        if (strong || i.second.full_doc_key) {
881
6.76M
          RETURN_NOT_OK(checker.Check(intent_key, strong, wait_policy));
882
6.76M
        }
883
7.68M
      }
884
9.56M
      buffer.Reset(i.first.AsSlice());
885
9.56M
      RETURN_NOT_OK(resolver->ReadIntentConflicts(i.second.types, &buffer, wait_policy));
886
9.56M
    }
887
888
543k
    return Status::OK();
889
545k
  }
890
891
  CHECKED_STATUS CheckPriority(ConflictResolver* resolver,
892
109k
                               boost::iterator_range<TransactionData*> transactions) override {
893
109k
    return CheckPriorityInternal(resolver, transactions, metadata_.transaction_id,
894
109k
                                 metadata_.priority);
895
109k
  }
896
897
  Result<bool> CheckConflictWithCommitted(
898
98.7k
      const TransactionData& transaction_data, HybridTime commit_time) override {
899
98.7k
    RSTATUS_DCHECK(commit_time.is_valid(), Corruption, "Invalid transaction commit time");
900
901
1
    VLOG_WITH_PREFIX(4) << "Committed: " << transaction_data.id << ", commit_time: " << commit_time
902
1
                        << ", read_time: " << read_time_
903
1
                        << ", wait_policy:" << transaction_data.wait_policy
904
1
                        << ", all_lock_only_conflicts" << transaction_data.all_lock_only_conflicts;
905
906
    // If the intents to be written conflict with only "explicit row lock" intents of a committed
907
    // transaction, we can proceed now because a committed transaction implies that the locks are
908
    // released. In other words, only a committed transaction with some conflicting intent that
909
    // results in a modification to data in regular db, can result in a serialization error.
910
    //
911
    // commit_time equals to HybridTime::kMax means that transaction is not actually committed,
912
    // but is being committed. I.e. status tablet is trying to replicate COMMITTED state.
913
    // So we should always conflict with such transaction, because we are not able to read its
914
    // results.
915
    //
916
    // read_time equals to HybridTime::kMax in case of serializable isolation or when
917
    // read time was not yet picked for snapshot isolation.
918
    // So it should conflict only with transactions that are being committed.
919
    //
920
    // In all other cases we have concrete read time and should conflict with transactions
921
    // that were committed after this point.
922
98.7k
    if (!transaction_data.all_lock_only_conflicts && commit_time >= read_time_) {
923
19.5k
      if (transaction_data.wait_policy == WAIT_SKIP) {
924
1
        return STATUS(InternalError, "Skip locking since entity was modified by a recent commit",
925
1
                      TransactionError(TransactionErrorCode::kSkipLocking));
926
19.5k
      } else {
927
19.5k
        return MakeConflictStatus(
928
19.5k
          *transaction_id_, transaction_data.id, "committed", GetConflictsMetric());
929
19.5k
      }
930
79.1k
    }
931
932
79.1k
    return true;
933
79.1k
  }
934
935
437k
  bool IgnoreConflictsWith(const TransactionId& other) override {
936
437k
    return other == *transaction_id_;
937
437k
  }
938
939
0
  TransactionId transaction_id() const override {
940
0
    return *transaction_id_;
941
0
  }
942
943
20
  std::string ToString() const override {
944
20
    return yb::ToString(transaction_id_);
945
20
  }
946
947
  const KeyValueWriteBatchPB& write_batch_;
948
949
  // Read time of the transaction identified by transaction_id_, could be HybridTime::kMax in case
950
  // of serializable isolation or when read time not yet picked for snapshot isolation.
951
  const HybridTime read_time_;
952
953
  // Id of transaction when is writing intents, for which we are resolving conflicts.
954
  Result<TransactionId> transaction_id_;
955
956
  TransactionMetadata metadata_;
957
958
  Status result_ = Status::OK();
959
};
960
961
class OperationConflictResolverContext : public ConflictResolverContextBase {
962
 public:
963
  OperationConflictResolverContext(const DocOperations* doc_ops,
964
                                   HybridTime resolution_ht,
965
                                   Counter* conflicts_metric)
966
58.3k
      : ConflictResolverContextBase(*doc_ops, resolution_ht, conflicts_metric) {
967
58.3k
  }
968
969
58.3k
  virtual ~OperationConflictResolverContext() {}
970
971
  // Reads stored intents that could conflict with our operations.
972
58.2k
  CHECKED_STATUS ReadConflicts(ConflictResolver* resolver) override {
973
58.2k
    boost::container::small_vector<RefCntPrefix, 8> doc_paths;
974
58.2k
    boost::container::small_vector<size_t, 32> key_prefix_lengths;
975
58.2k
    KeyBytes encoded_key_buffer;
976
977
58.2k
    IntentTypeSet strong_intent_types;
978
979
58.2k
    EnumerateIntentsCallback callback = [&strong_intent_types, resolver](
980
58.2k
        IntentStrength intent_strength, FullDocKey full_doc_key, Slice,
981
727k
        KeyBytes* encoded_key_buffer, LastKey) {
982
727k
      return resolver->ReadIntentConflicts(
983
193k
          intent_strength == IntentStrength::kStrong ? strong_intent_types
984
533k
                                                     : StrongToWeak(strong_intent_types),
985
727k
          encoded_key_buffer, WAIT_ERROR);
986
727k
    };
987
988
170k
    for (const auto& doc_op : doc_ops()) {
989
170k
      doc_paths.clear();
990
170k
      IsolationLevel isolation;
991
170k
      RETURN_NOT_OK(doc_op->GetDocPaths(GetDocPathsMode::kIntents, &doc_paths, &isolation));
992
993
170k
      strong_intent_types = GetStrongIntentTypeSet(isolation, OperationKind::kWrite,
994
170k
                                                   RowMarkType::ROW_MARK_ABSENT);
995
996
193k
      for (const auto& doc_path : doc_paths) {
997
20
        VLOG_WITH_PREFIX_AND_FUNC(4)
998
20
            << "Doc path: " << SubDocKey::DebugSliceToString(doc_path.as_slice());
999
193k
        RETURN_NOT_OK(EnumerateIntents(
1000
193k
            doc_path.as_slice(), Slice(), callback, &encoded_key_buffer,
1001
193k
            PartialRangeKeyIntents::kTrue));
1002
193k
      }
1003
170k
    }
1004
1005
58.2k
    return Status::OK();
1006
58.2k
  }
1007
1008
  CHECKED_STATUS CheckPriority(ConflictResolver* resolver,
1009
4.51k
                               boost::iterator_range<TransactionData*> transactions) override {
1010
4.51k
    return CheckPriorityInternal(resolver,
1011
4.51k
                                 transactions,
1012
4.51k
                                 TransactionId::Nil(),
1013
4.51k
                                 kHighPriTxnLowerBound - 1 /* our_priority */);
1014
4.51k
  }
1015
1016
48
  bool IgnoreConflictsWith(const TransactionId& other) override {
1017
48
    return false;
1018
48
  }
1019
1020
0
  TransactionId transaction_id() const override {
1021
0
    return TransactionId::Nil();
1022
0
  }
1023
1024
1
  std::string ToString() const override {
1025
1
    return "Operation Context";
1026
1
  }
1027
1028
  Result<bool> CheckConflictWithCommitted(
1029
4.99k
      const TransactionData& transaction_data, HybridTime commit_time) override {
1030
4.99k
    if (commit_time != HybridTime::kMax) {
1031
29
      MakeResolutionAtLeast(commit_time);
1032
29
      return true;
1033
29
    }
1034
4.96k
    return false;
1035
4.96k
  }
1036
};
1037
1038
} // namespace
1039
1040
void ResolveTransactionConflicts(const DocOperations& doc_ops,
1041
                                 const KeyValueWriteBatchPB& write_batch,
1042
                                 HybridTime hybrid_time,
1043
                                 HybridTime read_time,
1044
                                 const DocDB& doc_db,
1045
                                 PartialRangeKeyIntents partial_range_key_intents,
1046
                                 TransactionStatusManager* status_manager,
1047
                                 Counter* conflicts_metric,
1048
546k
                                 ResolutionCallback callback) {
1049
546k
  DCHECK(hybrid_time.is_valid());
1050
546k
  TRACE("ResolveTransactionConflicts");
1051
546k
  auto context = std::make_unique<TransactionConflictResolverContext>(
1052
546k
      doc_ops, write_batch, hybrid_time, read_time, conflicts_metric);
1053
546k
  auto resolver = std::make_shared<ConflictResolver>(
1054
546k
      doc_db, status_manager, partial_range_key_intents, std::move(context), std::move(callback));
1055
  // Resolve takes a self reference to extend lifetime.
1056
546k
  resolver->Resolve();
1057
546k
  TRACE("resolver->Resolve done");
1058
546k
}
1059
1060
void ResolveOperationConflicts(const DocOperations& doc_ops,
1061
                               HybridTime resolution_ht,
1062
                               const DocDB& doc_db,
1063
                               PartialRangeKeyIntents partial_range_key_intents,
1064
                               TransactionStatusManager* status_manager,
1065
                               Counter* conflicts_metric,
1066
58.3k
                               ResolutionCallback callback) {
1067
58.3k
  TRACE("ResolveOperationConflicts");
1068
58.3k
  auto context = std::make_unique<OperationConflictResolverContext>(&doc_ops, resolution_ht,
1069
58.3k
                                                                    conflicts_metric);
1070
58.3k
  auto resolver = std::make_shared<ConflictResolver>(
1071
58.3k
      doc_db, status_manager, partial_range_key_intents, std::move(context), std::move(callback));
1072
  // Resolve takes a self reference to extend lifetime.
1073
58.3k
  resolver->Resolve();
1074
58.3k
  TRACE("resolver->Resolve done");
1075
58.3k
}
1076
1077
// Expands to SCHECK_<op>(lhs, rhs, Corruption, ...) (op is a comparison suffix such as GE or EQ),
// with a message describing the bad intent key. The expansion refers to `intent_key` and
// `transaction_id_source`, which must be in scope at the point of use.
#define INTENT_KEY_SCHECK(lhs, op, rhs, msg) \
  BOOST_PP_CAT(SCHECK_, op)( \
      lhs, rhs, Corruption, \
      Format("Bad intent key, $0 in $1, transaction from: $2", \
             msg, intent_key.ToDebugHexString(), transaction_id_source.ToDebugHexString()))
1085
1086
// transaction_id_source is used by INTENT_KEY_SCHECK in error messages.
1087
30.7M
// Splits an intent key into its doc path, intent type set and encoded doc hybrid time.
// transaction_id_source only appears in the Corruption messages built by INTENT_KEY_SCHECK.
// The returned doc_path and doc_ht slices point into intent_key's memory.
Result<ParsedIntent> ParseIntentKey(Slice intent_key, Slice transaction_id_source) {
  ParsedIntent result;
  size_t doc_ht_size = 0;
  result.doc_path = intent_key;
  // Intent is encoded as "DocPath + IntentType + DocHybridTime".
  RETURN_NOT_OK(DocHybridTime::CheckAndGetEncodedSize(result.doc_path, &doc_ht_size));
  // 3 comes from (ValueType::kIntentType, the actual intent type, ValueType::kHybridTime).
  INTENT_KEY_SCHECK(result.doc_path.size(), GE, doc_ht_size + 3, "key too short");
  result.doc_path.remove_suffix(doc_ht_size + 3);
  // First byte after the doc path: [0] intent value type, [1] intent type(s), [2] kHybridTime.
  auto intent_type_and_doc_ht = result.doc_path.end();
  if (intent_type_and_doc_ht[0] == ValueTypeAsChar::kObsoleteIntentType) {
    // Keys written with the obsolete encodings are converted to the current IntentTypeSet.
    result.types = ObsoleteIntentTypeToSet(intent_type_and_doc_ht[1]);
  } else if (intent_type_and_doc_ht[0] == ValueTypeAsChar::kObsoleteIntentTypeSet) {
    result.types = ObsoleteIntentTypeSetToNew(intent_type_and_doc_ht[1]);
  } else {
    INTENT_KEY_SCHECK(intent_type_and_doc_ht[0], EQ, ValueTypeAsChar::kIntentTypeSet,
        "intent type set type expected");
    result.types = IntentTypeSet(intent_type_and_doc_ht[1]);
  }
  INTENT_KEY_SCHECK(intent_type_and_doc_ht[2], EQ, ValueTypeAsChar::kHybridTime,
                    "hybrid time value type expected");
  // doc_ht spans the kHybridTime byte plus the doc_ht_size encoded bytes that follow it.
  result.doc_ht = Slice(result.doc_path.end() + 2, doc_ht_size + 1);
  return result;
}
1111
1112
0
// Renders an intent key for debugging: the raw hex dump followed by the decoded doc path, intent
// types and doc hybrid time. If the key cannot be parsed or its hybrid time cannot be decoded, a
// warning is logged and only the hex dump is returned.
std::string DebugIntentKeyToString(Slice intent_key) {
  const std::string raw_hex = intent_key.ToDebugHexString();

  auto parsed_intent = ParseIntentKey(intent_key, Slice());
  if (!parsed_intent.ok()) {
    LOG(WARNING) << "Failed to parse: " << raw_hex << ": " << parsed_intent.status();
    return raw_hex;
  }

  DocHybridTime intent_ht;
  auto decode_status = intent_ht.DecodeFromEnd(parsed_intent->doc_ht);
  if (!decode_status.ok()) {
    LOG(WARNING) << "Failed to decode doc ht: " << raw_hex << ": " << decode_status;
    return raw_hex;
  }

  return Format("$0 (key: $1 type: $2 doc_ht: $3 )",
                raw_hex,
                SubDocKey::DebugSliceToString(parsed_intent->doc_path),
                parsed_intent->types,
                intent_ht.ToString());
}
1130
1131
} // namespace docdb
1132
} // namespace yb