YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_participant.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/tablet/transaction_participant.h"
17
18
#include <queue>
19
20
#include <boost/multi_index/hashed_index.hpp>
21
#include <boost/multi_index/mem_fun.hpp>
22
#include <boost/multi_index/ordered_index.hpp>
23
24
#include "yb/client/transaction_rpc.h"
25
26
#include "yb/common/pgsql_error.h"
27
#include "yb/common/transaction_error.h"
28
29
#include "yb/consensus/consensus_util.h"
30
31
#include "yb/docdb/docdb_rocksdb_util.h"
32
#include "yb/docdb/transaction_dump.h"
33
34
#include "yb/rpc/poller.h"
35
36
#include "yb/server/clock.h"
37
38
#include "yb/tablet/cleanup_aborts_task.h"
39
#include "yb/tablet/cleanup_intents_task.h"
40
#include "yb/tablet/operations/update_txn_operation.h"
41
#include "yb/tablet/remove_intents_task.h"
42
#include "yb/tablet/running_transaction.h"
43
#include "yb/tablet/running_transaction_context.h"
44
#include "yb/tablet/transaction_loader.h"
45
#include "yb/tablet/transaction_participant_context.h"
46
#include "yb/tablet/transaction_status_resolver.h"
47
48
#include "yb/tserver/tserver_service.pb.h"
49
50
#include "yb/util/countdown_latch.h"
51
#include "yb/util/debug-util.h"
52
#include "yb/util/flag_tags.h"
53
#include "yb/util/format.h"
54
#include "yb/util/logging.h"
55
#include "yb/util/lru_cache.h"
56
#include "yb/util/metrics.h"
57
#include "yb/util/operation_counter.h"
58
#include "yb/util/scope_exit.h"
59
#include "yb/util/status_format.h"
60
#include "yb/util/status_log.h"
61
#include "yb/util/tsan_util.h"
62
63
using namespace std::literals;
64
using namespace std::placeholders;
65
66
DEFINE_uint64(transaction_min_running_check_delay_ms, 50,
67
              "When transaction with minimal start hybrid time is updated at transaction "
68
              "participant, we wait at least this number of milliseconds before checking its "
69
              "status at transaction coordinator. Used for the optimization that deletes "
70
              "provisional records RocksDB SSTable files.");
71
72
DEFINE_uint64(transaction_min_running_check_interval_ms, 250,
73
              "While transaction with minimal start hybrid time remains the same, we will try "
74
              "to check its status at transaction coordinator at regular intervals this "
75
              "long (ms). Used for the optimization that deletes "
76
              "provisional records RocksDB SSTable files.");
77
78
DEFINE_test_flag(double, transaction_ignore_applying_probability, 0,
79
                 "Probability to ignore APPLYING update in tests.");
80
DEFINE_test_flag(bool, fail_in_apply_if_no_metadata, false,
81
                 "Fail when applying intents if metadata is not found.");
82
83
DEFINE_int32(max_transactions_in_status_request, 128,
84
             "Request status for at most specified number of transactions at once. "
85
                 "0 disables load time transaction status resolution.");
86
87
DEFINE_uint64(transactions_cleanup_cache_size, 256, "Transactions cleanup cache size.");
88
89
DEFINE_uint64(transactions_status_poll_interval_ms, 500 * yb::kTimeMultiplier,
90
              "Transactions poll interval.");
91
92
DEFINE_bool(transactions_poll_check_aborted, true, "Check aborted transactions during poll.");
93
94
DECLARE_int64(transaction_abort_check_timeout_ms);
95
96
METRIC_DEFINE_simple_counter(
97
    tablet, transaction_not_found, "Total number of missing transactions during load",
98
    yb::MetricUnit::kTransactions);
99
METRIC_DEFINE_simple_gauge_uint64(
100
    tablet, transactions_running, "Total number of transactions running in participant",
101
    yb::MetricUnit::kTransactions);
102
103
DEFINE_test_flag(int32, txn_participant_inject_latency_on_apply_update_txn_ms, 0,
104
                 "How much latency to inject when a update txn operation is applied.");
105
106
namespace yb {
107
namespace tablet {
108
109
namespace {
110
111
YB_STRONGLY_TYPED_BOOL(PostApplyCleanup);
112
113
} // namespace
114
115
4
std::string TransactionApplyData::ToString() const {
116
4
  return YB_STRUCT_TO_STRING(
117
4
      leader_term, transaction_id, op_id, commit_ht, log_ht, sealed, status_tablet, apply_state);
118
4
}
119
120
class TransactionParticipant::Impl
121
    : public RunningTransactionContext, public TransactionLoaderContext {
122
 public:
123
  Impl(TransactionParticipantContext* context, TransactionIntentApplier* applier,
124
       const scoped_refptr<MetricEntity>& entity)
125
      : RunningTransactionContext(context, applier),
126
        log_prefix_(context->LogPrefix()),
127
        loader_(this, entity),
128
25.8k
        poller_(log_prefix_, std::bind(&Impl::Poll, this)) {
129
25.8k
    LOG_WITH_PREFIX(INFO) << "Create";
130
25.8k
    metric_transactions_running_ = METRIC_transactions_running.Instantiate(entity, 0);
131
25.8k
    metric_transaction_not_found_ = METRIC_transaction_not_found.Instantiate(entity);
132
25.8k
  }
133
134
19.2k
  ~Impl() {
135
19.2k
    if (StartShutdown()) {
136
0
      CompleteShutdown();
137
19.2k
    } else {
138
18.4E
      LOG_IF_WITH_PREFIX(DFATAL, !shutdown_done_.load(std::memory_order_acquire))
139
18.4E
          << "Destroying transaction participant that did not complete shutdown";
140
19.2k
    }
141
19.2k
  }
142
143
38.6k
  bool StartShutdown() {
144
38.6k
    bool expected = false;
145
38.6k
    if (!closing_.compare_exchange_strong(expected, true)) {
146
19.2k
      return false;
147
19.2k
    }
148
149
19.3k
    poller_.Shutdown();
150
151
19.3k
    if (start_latch_.count()) {
152
0
      start_latch_.CountDown();
153
0
    }
154
155
19.3k
    LOG_WITH_PREFIX(INFO) << "Shutdown";
156
19.3k
    return true;
157
19.3k
  }
158
159
19.3k
  void CompleteShutdown() {
160
1
    LOG_IF_WITH_PREFIX(DFATAL, !closing_.load()) << __func__ << " w/o StartShutdown";
161
162
19.3k
    decltype(status_resolvers_) status_resolvers;
163
19.3k
    {
164
19.3k
      MinRunningNotifier min_running_notifier(nullptr /* applier */);
165
19.3k
      std::lock_guard<std::mutex> lock(mutex_);
166
19.3k
      transactions_.clear();
167
19.3k
      TransactionsModifiedUnlocked(&min_running_notifier);
168
19.3k
      status_resolvers.swap(status_resolvers_);
169
19.3k
    }
170
171
19.3k
    rpcs_.Shutdown();
172
19.3k
    loader_.Shutdown();
173
0
    for (auto& resolver : status_resolvers) {
174
0
      resolver.Shutdown();
175
0
    }
176
19.3k
    shutdown_done_.store(true, std::memory_order_release);
177
19.3k
  }
178
179
1
  bool Closing() const override {
180
1
    return closing_.load(std::memory_order_acquire);
181
1
  }
182
183
25.8k
  void Start() {
184
25.8k
    LOG_WITH_PREFIX(INFO) << "Start";
185
25.8k
    start_latch_.CountDown();
186
25.8k
  }
187
188
  // Adds new running transaction.
189
1.02M
  Result<bool> Add(const TransactionMetadata& metadata) {
190
1.02M
    loader_.WaitLoaded(metadata.transaction_id);
191
192
1.02M
    MinRunningNotifier min_running_notifier(&applier_);
193
1.02M
    std::lock_guard<std::mutex> lock(mutex_);
194
1.02M
    auto it = transactions_.find(metadata.transaction_id);
195
1.02M
    if (it != transactions_.end()) {
196
591
      return false;
197
591
    }
198
1.02M
    if (WasTransactionRecentlyRemoved(metadata.transaction_id) ||
199
1.02M
        cleanup_cache_.Erase(metadata.transaction_id) != 0) {
200
60.3k
      auto status = STATUS_EC_FORMAT(
201
60.3k
          TryAgain, PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE),
202
60.3k
          "Transaction was recently aborted: $0", metadata.transaction_id);
203
60.3k
      return status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted));
204
60.3k
    }
205
18.4E
    VLOG_WITH_PREFIX(4) << "Create new transaction: " << metadata.transaction_id;
206
967k
    transactions_.insert(std::make_shared<RunningTransaction>(
207
967k
        metadata, TransactionalBatchData(), OneWayBitmap(), metadata.start_time, this));
208
967k
    TransactionsModifiedUnlocked(&min_running_notifier);
209
967k
    return true;
210
967k
  }
211
212
347k
  HybridTime LocalCommitTime(const TransactionId& id) {
213
347k
    std::lock_guard<std::mutex> lock(mutex_);
214
347k
    auto it = transactions_.find(id);
215
347k
    if (it == transactions_.end()) {
216
11.7k
      return HybridTime::kInvalid;
217
11.7k
    }
218
335k
    return (**it).local_commit_time();
219
335k
  }
220
221
67.4k
  boost::optional<CommitMetadata> LocalCommitData(const TransactionId& id) {
222
67.4k
    std::lock_guard<std::mutex> lock(mutex_);
223
67.4k
    auto it = transactions_.find(id);
224
67.4k
    if (it == transactions_.end()) {
225
1.29k
      return boost::none;
226
1.29k
    }
227
66.1k
    return boost::make_optional<CommitMetadata>({
228
66.1k
      .commit_ht = (**it).local_commit_time(),
229
66.1k
      .aborted_subtxn_set = (**it).local_commit_aborted_subtxn_set(),
230
66.1k
    });
231
66.1k
  }
232
233
0
  std::pair<size_t, size_t> TEST_CountIntents() {
234
0
    {
235
0
      MinRunningNotifier min_running_notifier(&applier_);
236
0
      std::lock_guard<std::mutex> lock(mutex_);
237
0
      ProcessRemoveQueueUnlocked(&min_running_notifier);
238
0
    }
239
240
0
    std::pair<size_t, size_t> result(0, 0);
241
0
    auto iter = docdb::CreateRocksDBIterator(db_.intents,
242
0
                                             key_bounds_,
243
0
                                             docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
244
0
                                             boost::none,
245
0
                                             rocksdb::kDefaultQueryId);
246
0
    for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
247
0
      ++result.first;
248
      // Count number of transaction, by counting metadata records.
249
0
      if (iter.key().size() == TransactionId::StaticSize() + 1) {
250
0
        ++result.second;
251
0
        auto key = iter.key();
252
0
        key.remove_prefix(1);
253
0
        auto id = CHECK_RESULT(FullyDecodeTransactionId(key));
254
0
        LOG_WITH_PREFIX(INFO) << "Stored txn meta: " << id;
255
0
      }
256
0
    }
257
258
0
    return result;
259
0
  }
260
261
774k
  Result<TransactionMetadata> PrepareMetadata(const TransactionMetadataPB& pb) {
262
774k
    if (pb.has_isolation()) {
263
443k
      auto metadata = VERIFY_RESULT(TransactionMetadata::FromPB(pb));
264
443k
      std::unique_lock<std::mutex> lock(mutex_);
265
443k
      auto it = transactions_.find(metadata.transaction_id);
266
443k
      if (it != transactions_.end()) {
267
306
        RETURN_NOT_OK((**it).CheckAborted());
268
443k
      } else if (WasTransactionRecentlyRemoved(metadata.transaction_id)) {
269
0
        return MakeAbortedStatus(metadata.transaction_id);
270
0
      }
271
443k
      return metadata;
272
443k
    }
273
274
331k
    auto id = VERIFY_RESULT(FullyDecodeTransactionId(pb.transaction_id()));
275
276
    // We are not trying to cleanup intents here because we don't know whether this transaction
277
    // has intents or not.
278
331k
    auto lock_and_iterator = LockAndFind(
279
331k
        id, "metadata"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
280
331k
    if (!lock_and_iterator.found()) {
281
57
      return STATUS(TryAgain,
282
57
                    Format("Unknown transaction, could be recently aborted: $0", id), Slice(),
283
57
                    PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE));
284
57
    }
285
331k
    RETURN_NOT_OK(lock_and_iterator.transaction().CheckAborted());
286
331k
    return lock_and_iterator.transaction().metadata();
287
331k
  }
288
289
  boost::optional<std::pair<IsolationLevel, TransactionalBatchData>> PrepareBatchData(
290
      const TransactionId& id, size_t batch_idx,
291
1.10M
      boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) {
292
    // We are not trying to cleanup intents here because we don't know whether this transaction
293
    // has intents of not.
294
1.10M
    auto lock_and_iterator = LockAndFind(
295
1.10M
        id, "metadata with write id"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
296
1.10M
    if (!lock_and_iterator.found()) {
297
119
      return boost::none;
298
119
    }
299
1.10M
    auto& transaction = lock_and_iterator.transaction();
300
1.10M
    transaction.AddReplicatedBatch(batch_idx, encoded_replicated_batches);
301
1.10M
    return std::make_pair(transaction.metadata().isolation, transaction.last_batch_data());
302
1.10M
  }
303
304
1.26M
  void BatchReplicated(const TransactionId& id, const TransactionalBatchData& data) {
305
1.26M
    std::lock_guard<std::mutex> lock(mutex_);
306
1.26M
    auto it = transactions_.find(id);
307
1.26M
    if (it == transactions_.end()) {
308
0
      LOG_IF_WITH_PREFIX(DFATAL, !WasTransactionRecentlyRemoved(id))
309
0
          << "Update last write id for unknown transaction: " << id;
310
49
      return;
311
49
    }
312
1.26M
    (**it).BatchReplicated(data);
313
1.26M
  }
314
315
260k
  void RequestStatusAt(const StatusRequest& request) {
316
260k
    auto lock_and_iterator = LockAndFind(*request.id, *request.reason, request.flags);
317
260k
    if (!lock_and_iterator.found()) {
318
6.72k
      request.callback(
319
6.72k
          STATUS_FORMAT(NotFound, "Request status of unknown transaction: $0", *request.id));
320
6.72k
      return;
321
6.72k
    }
322
253k
    lock_and_iterator.transaction().RequestStatusAt(request, &lock_and_iterator.lock);
323
253k
  }
324
325
  // Registers a request, giving it a newly allocated id and returning this id.
326
3.36M
  int64_t RegisterRequest() {
327
3.36M
    std::lock_guard<std::mutex> lock(mutex_);
328
3.36M
    auto result = NextRequestIdUnlocked();
329
3.36M
    running_requests_.push_back(result);
330
3.36M
    return result;
331
3.36M
  }
332
333
  // Unregisters a previously registered request.
334
3.37M
  void UnregisterRequest(int64_t request) {
335
3.37M
    MinRunningNotifier min_running_notifier(&applier_);
336
3.37M
    {
337
3.37M
      std::lock_guard<std::mutex> lock(mutex_);
338
3.37M
      DCHECK(!running_requests_.empty());
339
3.37M
      if (running_requests_.front() != request) {
340
1.23M
        complete_requests_.push(request);
341
1.23M
        return;
342
1.23M
      }
343
2.13M
      running_requests_.pop_front();
344
3.36M
      while (!complete_requests_.empty() && complete_requests_.top() == running_requests_.front()) {
345
1.22M
        complete_requests_.pop();
346
1.22M
        running_requests_.pop_front();
347
1.22M
      }
348
349
2.13M
      CleanTransactionsUnlocked(&min_running_notifier);
350
2.13M
    }
351
2.13M
  }
352
353
  // Cleans transactions that are requested and now is safe to clean.
354
  // See RemoveUnlocked for details.
355
2.13M
  void CleanTransactionsUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
356
2.13M
    ProcessRemoveQueueUnlocked(min_running_notifier);
357
358
2.13M
    CleanTransactionsQueue(&immediate_cleanup_queue_, min_running_notifier);
359
2.13M
    CleanTransactionsQueue(&graceful_cleanup_queue_, min_running_notifier);
360
2.13M
  }
361
362
  template <class Queue>
363
  void CleanTransactionsQueue(
364
4.73M
      Queue* queue, MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
365
4.33M
    int64_t min_request = running_requests_.empty() ? std::numeric_limits<int64_t>::max()
366
397k
                                                    : running_requests_.front();
367
4.73M
    HybridTime safe_time;
368
5.02M
    while (!queue->empty()) {
369
418k
      const auto& front = queue->front();
370
418k
      if (front.request_id >= min_request) {
371
62.5k
        break;
372
62.5k
      }
373
355k
      if (!front.Ready(&participant_context_, &safe_time)) {
374
62.7k
        break;
375
62.7k
      }
376
292k
      const auto& id = front.transaction_id;
377
292k
      RemoveIntentsData checkpoint;
378
292k
      auto it = transactions_.find(id);
379
380
292k
      if (it != transactions_.end() && !(**it).ProcessingApply()) {
381
239k
        OpId op_id = (**it).GetOpId();
382
239k
        participant_context_.GetLastCDCedData(&checkpoint);
383
18.4E
        VLOG_WITH_PREFIX(2) << "Cleaning tx opid is " << op_id.ToString()
384
18.4E
                            << " checkpoint opid is " << checkpoint.op_id.ToString();
385
386
239k
        if (checkpoint.op_id < op_id) {
387
190
          break;
388
190
        }
389
238k
        (**it).ScheduleRemoveIntents(*it);
390
238k
        RemoveTransaction(it, front.reason, min_running_notifier);
391
238k
      }
392
18.4E
      VLOG_WITH_PREFIX(2) << "Cleaned from queue: " << id;
393
292k
      queue->pop_front();
394
292k
    }
395
4.73M
  }
_ZN2yb6tablet22TransactionParticipant4Impl22CleanTransactionsQueueINSt3__15dequeINS2_25GracefulCleanupQueueEntryENS4_9allocatorIS6_EEEEEEvPT_PNS0_18MinRunningNotifierE
Line
Count
Source
364
2.78M
      Queue* queue, MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
365
2.57M
    int64_t min_request = running_requests_.empty() ? std::numeric_limits<int64_t>::max()
366
206k
                                                    : running_requests_.front();
367
2.78M
    HybridTime safe_time;
368
2.96M
    while (!queue->empty()) {
369
250k
      const auto& front = queue->front();
370
250k
      if (front.request_id >= min_request) {
371
6.55k
        break;
372
6.55k
      }
373
243k
      if (!front.Ready(&participant_context_, &safe_time)) {
374
62.7k
        break;
375
62.7k
      }
376
180k
      const auto& id = front.transaction_id;
377
180k
      RemoveIntentsData checkpoint;
378
180k
      auto it = transactions_.find(id);
379
380
180k
      if (it != transactions_.end() && !(**it).ProcessingApply()) {
381
130k
        OpId op_id = (**it).GetOpId();
382
130k
        participant_context_.GetLastCDCedData(&checkpoint);
383
18.4E
        VLOG_WITH_PREFIX(2) << "Cleaning tx opid is " << op_id.ToString()
384
18.4E
                            << " checkpoint opid is " << checkpoint.op_id.ToString();
385
386
130k
        if (checkpoint.op_id < op_id) {
387
0
          break;
388
0
        }
389
130k
        (**it).ScheduleRemoveIntents(*it);
390
130k
        RemoveTransaction(it, front.reason, min_running_notifier);
391
130k
      }
392
18.4E
      VLOG_WITH_PREFIX(2) << "Cleaned from queue: " << id;
393
180k
      queue->pop_front();
394
180k
    }
395
2.78M
  }
_ZN2yb6tablet22TransactionParticipant4Impl22CleanTransactionsQueueINSt3__15dequeINS2_26ImmediateCleanupQueueEntryENS4_9allocatorIS6_EEEEEEvPT_PNS0_18MinRunningNotifierE
Line
Count
Source
364
1.94M
      Queue* queue, MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
365
1.75M
    int64_t min_request = running_requests_.empty() ? std::numeric_limits<int64_t>::max()
366
190k
                                                    : running_requests_.front();
367
1.94M
    HybridTime safe_time;
368
2.06M
    while (!queue->empty()) {
369
168k
      const auto& front = queue->front();
370
168k
      if (front.request_id >= min_request) {
371
56.0k
        break;
372
56.0k
      }
373
112k
      if (!front.Ready(&participant_context_, &safe_time)) {
374
0
        break;
375
0
      }
376
112k
      const auto& id = front.transaction_id;
377
112k
      RemoveIntentsData checkpoint;
378
112k
      auto it = transactions_.find(id);
379
380
112k
      if (it != transactions_.end() && !(**it).ProcessingApply()) {
381
108k
        OpId op_id = (**it).GetOpId();
382
108k
        participant_context_.GetLastCDCedData(&checkpoint);
383
18.4E
        VLOG_WITH_PREFIX(2) << "Cleaning tx opid is " << op_id.ToString()
384
18.4E
                            << " checkpoint opid is " << checkpoint.op_id.ToString();
385
386
108k
        if (checkpoint.op_id < op_id) {
387
190
          break;
388
190
        }
389
108k
        (**it).ScheduleRemoveIntents(*it);
390
108k
        RemoveTransaction(it, front.reason, min_running_notifier);
391
108k
      }
392
18.4E
      VLOG_WITH_PREFIX(2) << "Cleaned from queue: " << id;
393
111k
      queue->pop_front();
394
111k
    }
395
1.94M
  }
396
397
38.7k
  void Abort(const TransactionId& id, TransactionStatusCallback callback) {
398
    // We are not trying to cleanup intents here because we don't know whether this transaction
399
    // has intents of not.
400
38.7k
    auto lock_and_iterator = LockAndFind(
401
38.7k
        id, "abort"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
402
38.7k
    if (!lock_and_iterator.found()) {
403
20
      callback(STATUS_FORMAT(NotFound, "Abort of unknown transaction: $0", id));
404
20
      return;
405
20
    }
406
38.7k
    auto client_result = client();
407
38.7k
    if (!client_result.ok()) {
408
0
      callback(client_result.status());
409
0
      return;
410
0
    }
411
38.7k
    lock_and_iterator.transaction().Abort(
412
38.7k
        *client_result, std::move(callback), &lock_and_iterator.lock);
413
38.7k
  }
414
415
129k
  CHECKED_STATUS CheckAborted(const TransactionId& id) {
416
    // We are not trying to cleanup intents here because we don't know whether this transaction
417
    // has intents of not.
418
129k
    auto lock_and_iterator = LockAndFind(id, "check aborted"s, TransactionLoadFlags{});
419
129k
    if (!lock_and_iterator.found()) {
420
0
      return MakeAbortedStatus(id);
421
0
    }
422
129k
    return lock_and_iterator.transaction().CheckAborted();
423
129k
  }
424
425
  void FillPriorities(
426
85.0k
      boost::container::small_vector_base<std::pair<TransactionId, uint64_t>>* inout) {
427
    // TODO(dtxn) optimize locking
428
99.5k
    for (auto& pair : *inout) {
429
99.5k
      auto lock_and_iterator = LockAndFind(
430
99.5k
          pair.first, "fill priorities"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
431
99.5k
      if (!lock_and_iterator.found() || lock_and_iterator.transaction().WasAborted()) {
432
351
        pair.second = 0; // Minimal priority for already aborted transactions
433
99.2k
      } else {
434
99.2k
        pair.second = lock_and_iterator.transaction().metadata().priority;
435
99.2k
      }
436
99.5k
    }
437
85.0k
  }
438
439
794k
  void Handle(std::unique_ptr<tablet::UpdateTxnOperation> operation, int64_t term) {
440
794k
    auto txn_status = operation->request()->status();
441
794k
    if (txn_status == TransactionStatus::APPLYING) {
442
245k
      HandleApplying(std::move(operation), term);
443
245k
      return;
444
245k
    }
445
446
548k
    if (txn_status == TransactionStatus::IMMEDIATE_CLEANUP ||
447
548k
        txn_status == TransactionStatus::GRACEFUL_CLEANUP) {
448
548k
      auto cleanup_type = txn_status == TransactionStatus::IMMEDIATE_CLEANUP
449
366k
          ? CleanupType::kImmediate
450
181k
          : CleanupType::kGraceful;
451
548k
      HandleCleanup(std::move(operation), term, cleanup_type);
452
548k
      return;
453
548k
    }
454
455
467
    auto error_status = STATUS_FORMAT(
456
467
        InvalidArgument, "Unexpected status in transaction participant Handle: $0", *operation);
457
467
    LOG_WITH_PREFIX(DFATAL) << error_status;
458
467
    operation->CompleteWithStatus(error_status);
459
467
  }
460
461
737k
  CHECKED_STATUS ProcessReplicated(const ReplicatedData& data) {
462
737k
    if (FLAGS_TEST_txn_participant_inject_latency_on_apply_update_txn_ms > 0) {
463
0
      SleepFor(1ms * FLAGS_TEST_txn_participant_inject_latency_on_apply_update_txn_ms);
464
0
    }
465
466
737k
    auto id = FullyDecodeTransactionId(data.state.transaction_id());
467
737k
    if (!id.ok()) {
468
0
      return id.status();
469
0
    }
470
471
737k
    if (data.state.status() == TransactionStatus::APPLYING) {
472
736k
      return ReplicatedApplying(*id, data);
473
370
    } else if (data.state.status() == TransactionStatus::ABORTED) {
474
0
      return ReplicatedAborted(*id, data);
475
0
    }
476
477
370
    auto status = STATUS_FORMAT(
478
370
        InvalidArgument, "Unexpected status in transaction participant ProcessReplicated: $0, $1",
479
370
        data.op_id, data.state);
480
370
    LOG_WITH_PREFIX(DFATAL) << status;
481
370
    return status;
482
370
  }
483
484
0
  void Cleanup(TransactionIdSet&& set, TransactionStatusManager* status_manager) {
485
0
    auto cleanup_aborts_task = std::make_shared<CleanupAbortsTask>(
486
0
        &applier_, std::move(set), &participant_context_, status_manager, LogPrefix());
487
0
    cleanup_aborts_task->Prepare(cleanup_aborts_task);
488
0
    participant_context_.StrandEnqueue(cleanup_aborts_task.get());
489
0
  }
490
491
736k
  CHECKED_STATUS ProcessApply(const TransactionApplyData& data) {
492
18.4E
    VLOG_WITH_PREFIX(2) << "Apply: " << data.ToString();
493
494
736k
    loader_.WaitLoaded(data.transaction_id);
495
496
736k
    ScopedRWOperation operation(pending_op_counter_);
497
736k
    if (!operation.ok()) {
498
0
      LOG_WITH_PREFIX(WARNING) << "Process apply rejected";
499
0
      return Status::OK();
500
0
    }
501
502
736k
    bool was_applied = false;
503
504
736k
    {
505
      // It is our last chance to load transaction metadata, if missing.
506
      // Because it will be deleted when intents are applied.
507
      // We are not trying to cleanup intents here because we don't know whether this transaction
508
      // has intents of not.
509
736k
      auto lock_and_iterator = LockAndFind(
510
736k
          data.transaction_id, "pre apply"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
511
736k
      if (!lock_and_iterator.found()) {
512
        // This situation is normal and could be caused by 2 scenarios:
513
        // 1) Write batch failed, but originator doesn't know that.
514
        // 2) Failed to notify status tablet that we applied transaction.
515
4
        YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1)
516
4
            << Format("Apply of unknown transaction: $0", data);
517
7
        NotifyApplied(data);
518
7
        CHECK(!FLAGS_TEST_fail_in_apply_if_no_metadata);
519
7
        return Status::OK();
520
7
      }
521
522
736k
      auto existing_commit_ht = lock_and_iterator.transaction().local_commit_time();
523
736k
      if (existing_commit_ht) {
524
0
        was_applied = true;
525
0
        LOG_WITH_PREFIX(INFO) << "Transaction already applied: " << data.transaction_id;
526
0
        LOG_IF_WITH_PREFIX(DFATAL, data.commit_ht != existing_commit_ht)
527
0
            << "Transaction was previously applied with another commit ht: " << existing_commit_ht
528
0
            << ", new commit ht: " << data.commit_ht;
529
736k
      } else {
530
737k
        transactions_.modify(lock_and_iterator.iterator, [&data](auto& txn) {
531
737k
          txn->SetLocalCommitData(data.commit_ht, data.aborted);
532
737k
        });
533
534
18.4E
        LOG_IF_WITH_PREFIX(DFATAL, data.log_ht < last_safe_time_)
535
18.4E
            << "Apply transaction before last safe time " << data.transaction_id
536
18.4E
            << ": " << data.log_ht << " vs " << last_safe_time_;
537
736k
      }
538
736k
    }
539
540
736k
    if (!was_applied) {
541
736k
      auto apply_state = CHECK_RESULT(applier_.ApplyIntents(data));
542
543
18.4E
      VLOG_WITH_PREFIX(4) << "TXN: " << data.transaction_id << ": apply state: "
544
18.4E
                          << apply_state.ToString();
545
546
736k
      UpdateAppliedTransaction(data, apply_state, &operation);
547
736k
    }
548
549
736k
    NotifyApplied(data);
550
736k
    return Status::OK();
551
736k
  }
552
553
  void UpdateAppliedTransaction(
554
       const TransactionApplyData& data,
555
       const docdb::ApplyTransactionState& apply_state,
556
736k
       ScopedRWOperation* operation) NO_THREAD_SAFETY_ANALYSIS {
557
736k
    MinRunningNotifier min_running_notifier(&applier_);
558
    // We are not trying to cleanup intents here because we don't know whether this transaction
559
    // has intents or not.
560
736k
    auto lock_and_iterator = LockAndFind(
561
736k
        data.transaction_id, "apply"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
562
737k
    if (lock_and_iterator.found()) {
563
737k
      lock_and_iterator.transaction().SetOpId(data.op_id);
564
737k
      if (!apply_state.active()) {
565
737k
        RemoveUnlocked(lock_and_iterator.iterator, RemoveReason::kApplied, &min_running_notifier);
566
433
      } else {
567
433
        lock_and_iterator.transaction().SetApplyData(apply_state, &data, operation);
568
433
      }
569
737k
    }
570
736k
  }
571
572
737k
  void NotifyApplied(const TransactionApplyData& data) {
573
973
    VLOG_WITH_PREFIX(4) << Format("NotifyApplied($0)", data);
574
575
737k
    if (data.leader_term != OpId::kUnknownTerm) {
576
246k
      tserver::UpdateTransactionRequestPB req;
577
246k
      req.set_tablet_id(data.status_tablet);
578
246k
      req.set_propagated_hybrid_time(participant_context_.Now().ToUint64());
579
246k
      auto& state = *req.mutable_state();
580
246k
      state.set_transaction_id(data.transaction_id.data(), data.transaction_id.size());
581
246k
      state.set_status(TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS);
582
246k
      state.add_tablets(participant_context_.tablet_id());
583
246k
      auto client_result = client();
584
246k
      if (!client_result.ok()) {
585
0
        LOG_WITH_PREFIX(WARNING) << "Get client failed: " << client_result.status();
586
0
        return;
587
0
      }
588
589
246k
      auto handle = rpcs_.Prepare();
590
246k
      if (handle != rpcs_.InvalidHandle()) {
591
244k
        *handle = UpdateTransaction(
592
244k
            TransactionRpcDeadline(),
593
244k
            nullptr /* remote_tablet */,
594
244k
            *client_result,
595
244k
            &req,
596
244k
            [this, handle](const Status& status,
597
244k
                           const tserver::UpdateTransactionRequestPB& req,
598
246k
                           const tserver::UpdateTransactionResponsePB& resp) {
599
246k
              client::UpdateClock(resp, &participant_context_);
600
246k
              rpcs_.Unregister(handle);
601
34
              LOG_IF_WITH_PREFIX(WARNING, !status.ok()) << "Failed to send applied: " << status;
602
246k
            });
603
244k
        (**handle).SendRpc();
604
244k
      }
605
246k
    }
606
737k
  }
607
608
548k
  // Processes a cleanup request for `data.transaction_id`.
  //  - A transaction that is currently applying its intents is left untouched.
  //  - kImmediate cleanup of a transaction that is not tracked only records its id in
  //    cleanup_cache_.
  //  - kGraceful cleanup is deferred: an entry stamped with the current participant time is
  //    appended to graceful_cleanup_queue_.
  //  - Otherwise RemoveUnlocked either removes the transaction right away or queues it.
  CHECKED_STATUS ProcessCleanup(const TransactionApplyData& data, CleanupType cleanup_type) {
    VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(data) << ", " << AsString(cleanup_type);

    loader_.WaitLoaded(data.transaction_id);

    MinRunningNotifier min_running_notifier(&applier_);
    std::lock_guard<std::mutex> lock(mutex_);
    auto txn_it = transactions_.find(data.transaction_id);
    const bool is_tracked = txn_it != transactions_.end();

    if (!is_tracked && cleanup_type == CleanupType::kImmediate) {
      cleanup_cache_.Insert(data.transaction_id);
      return Status::OK();
    }

    if (is_tracked && (**txn_it).ProcessingApply()) {
      VLOG_WITH_PREFIX(2) << "Don't cleanup transaction because it is applying intents: "
                          << data.transaction_id;
      return Status::OK();
    }

    if (cleanup_type == CleanupType::kGraceful) {
      graceful_cleanup_queue_.push_back(GracefulCleanupQueueEntry{
        .request_id = request_serial_,
        .transaction_id = data.transaction_id,
        .reason = RemoveReason::kProcessCleanup,
        .required_safe_time = participant_context_.Now(),
      });
      return Status::OK();
    }

    const bool removed_now =
        RemoveUnlocked(txn_it, RemoveReason::kProcessCleanup, &min_running_notifier);
    if (!removed_now) {
      VLOG_WITH_PREFIX(2) << "Have added aborted txn to cleanup queue: "
                          << data.transaction_id;
    }

    return Status::OK();
  }
644
645
  void SetDB(
646
      const docdb::DocDB& db, const docdb::KeyBounds* key_bounds,
647
107k
      RWOperationCounter* pending_op_counter) {
648
107k
    bool had_db = db_.intents != nullptr;
649
107k
    db_ = db;
650
107k
    key_bounds_ = key_bounds;
651
107k
    pending_op_counter_ = pending_op_counter;
652
653
    // We should only load transactions on the initial call to SetDB (when opening the tablet), not
654
    // in case of truncate/restore.
655
107k
    if (!had_db) {
656
25.8k
      loader_.Start(pending_op_counter, db_);
657
25.8k
      return;
658
25.8k
    }
659
660
82.1k
    loader_.WaitAllLoaded();
661
82.1k
    MinRunningNotifier min_running_notifier(&applier_);
662
82.1k
    std::lock_guard<std::mutex> lock(mutex_);
663
82.1k
    transactions_.clear();
664
82.1k
    TransactionsModifiedUnlocked(&min_running_notifier);
665
82.1k
  }
666
667
  void GetStatus(
668
      const TransactionId& transaction_id,
669
      size_t required_num_replicated_batches,
670
      int64_t term,
671
      tserver::GetTransactionStatusAtParticipantResponsePB* response,
672
0
      rpc::RpcContext* context) {
673
0
    std::lock_guard<std::mutex> lock(mutex_);
674
0
    auto it = transactions_.find(transaction_id);
675
0
    if (it == transactions_.end()) {
676
0
      response->set_num_replicated_batches(0);
677
0
      response->set_status_hybrid_time(0);
678
0
    } else {
679
0
      if ((**it).WasAborted()) {
680
0
        response->set_aborted(true);
681
0
        return;
682
0
      }
683
0
      response->set_num_replicated_batches((**it).num_replicated_batches());
684
0
      response->set_status_hybrid_time((**it).last_batch_data().hybrid_time.ToUint64());
685
0
    }
686
0
  }
687
688
24
  // Exposes the context this participant operates in (non-owning pointer).
  TransactionParticipantContext* participant_context() const {
    TransactionParticipantContext* context_ptr = &participant_context_;
    return context_ptr;
  }
691
692
4.51M
  // Returns the cached min running hybrid time (start time of the oldest tracked transaction).
  //
  // kMax and kInvalid are returned immediately. Otherwise, at most once per
  // FLAGS_transaction_min_running_check_interval_ms, the thread that wins the CAS on
  // next_check_min_running_ runs an extra step. It asks for the status of the oldest transaction
  // so that a long-running aborted transaction gets cleaned up. If no transactions are tracked at
  // that point, kMax is returned.
  HybridTime MinRunningHybridTime() {
    auto cached = min_running_ht_.load(std::memory_order_acquire);
    if (cached == HybridTime::kMax || cached == HybridTime::kInvalid) {
      return cached;
    }

    auto mono_now = CoarseMonoClock::now();
    auto expected_next_check = next_check_min_running_.load(std::memory_order_relaxed);
    const bool won_check =
        mono_now >= expected_next_check &&
        next_check_min_running_.compare_exchange_strong(
            expected_next_check,
            mono_now + 1ms * FLAGS_transaction_min_running_check_interval_ms,
            std::memory_order_acq_rel);
    if (!won_check) {
      return cached;
    }

    std::unique_lock<std::mutex> lock(mutex_);
    if (transactions_.empty()) {
      return HybridTime::kMax;
    }
    auto& oldest_txn = **transactions_.get<StartTimeTag>().begin();
    VLOG_WITH_PREFIX(1) << "Checking status of long running min txn " << oldest_txn.id()
                        << ": " << oldest_txn.WasAborted();
    static const std::string kRequestReason = "min running check"s;
    // Ask for the transaction status as of the current participant time.
    auto request_ht = participant_context_.Now();
    StatusRequest status_request = {
        .id = &oldest_txn.id(),
        .read_ht = request_ht,
        .global_limit_ht = request_ht,
        // 0 is fine: read_ht == global_limit_ht, so there is no window [read_ht, global_limit_ht)
        // in which a status would need to be accepted.
        .serial_no = 0,
        .reason = &kRequestReason,
        .flags = TransactionLoadFlags{},
        .callback = [this, id = oldest_txn.id()](Result<TransactionStatusResult> result) {
          // Aborted status will result in cleanup of intents.
          VLOG_WITH_PREFIX(1) << "Min running status " << id << ": " << result;
        }
    };
    oldest_txn.RequestStatusAt(status_request, &lock);
    return cached;
  }
733
734
31
  // Sets `ht` as the hybrid time that min running hybrid time is awaited to pass, then checks
  // right away whether it already has.
  void WaitMinRunningHybridTime(HybridTime ht) {
    MinRunningNotifier notifier(&applier_);
    std::lock_guard<std::mutex> guard(mutex_);
    waiting_for_min_running_ht_ = ht;
    CheckMinRunningHybridTimeSatisfiedUnlocked(&notifier);
  }
740
741
0
  // Waits for `resolve_at`, then resolves the state of every tracked transaction that has no local
  // commit time. Status tablets are queried in rounds. Transactions reported ABORTED are enqueued
  // for removal. PENDING transactions with status_ht <= resolve_at are rechecked. COMMITTED
  // transactions with status_ht <= resolve_at are waited for until they get a local commit time
  // or disappear.
  //
  // Returns:
  //  - an error if waiting for `resolve_at` fails or a resolution round fails;
  //  - IllegalState if FLAGS_max_transactions_in_status_request is zero;
  //  - TimedOut if `deadline` passes while still waiting for committed transactions to be applied.
  CHECKED_STATUS ResolveIntents(HybridTime resolve_at, CoarseTimePoint deadline) {
    RETURN_NOT_OK(WaitUntil(participant_context_.clock_ptr().get(), resolve_at, deadline));

    if (FLAGS_max_transactions_in_status_request == 0) {
      return STATUS(
          IllegalState,
          "Cannot resolve intents when FLAGS_max_transactions_in_status_request is zero");
    }

    std::vector<TransactionId> recheck_ids, committed_ids;

    // Maintain a set of transactions, check their statuses, and remove them as they get
    // committed/applied, aborted or we realize that transaction was not committed at
    // resolve_at.
    for (;;) {
      TransactionStatusResolver resolver(
          &participant_context_, &rpcs_, FLAGS_max_transactions_in_status_request,
          [this, resolve_at, &recheck_ids, &committed_ids](
              const std::vector <TransactionStatusInfo>& status_infos) {
            std::vector<TransactionId> aborted;
            for (const auto& info : status_infos) {
              VLOG_WITH_PREFIX(4) << "Transaction status: " << info.ToString();
              if (info.status == TransactionStatus::COMMITTED) {
                if (info.status_ht <= resolve_at) {
                  // Transaction was committed, but not yet applied.
                  // So rely on filtering recheck_ids before next phase.
                  committed_ids.push_back(info.transaction_id);
                }
              } else if (info.status == TransactionStatus::ABORTED) {
                aborted.push_back(info.transaction_id);
              } else {
                LOG_IF_WITH_PREFIX(DFATAL, info.status != TransactionStatus::PENDING)
                    << "Transaction is in unexpected state: " << info.ToString();
                if (info.status_ht <= resolve_at) {
                  recheck_ids.push_back(info.transaction_id);
                }
              }
            }
            if (!aborted.empty()) {
              MinRunningNotifier min_running_notifier(&applier_);
              std::lock_guard<std::mutex> lock(mutex_);
              for (const auto& id : aborted) {
                EnqueueRemoveUnlocked(id, RemoveReason::kStatusReceived, &min_running_notifier);
              }
            }
          });
      auto se = ScopeExit([&resolver] {
        resolver.Shutdown();
      });
      {
        std::lock_guard <std::mutex> lock(mutex_);
        if (recheck_ids.empty() && committed_ids.empty()) {
          // First step, check all transactions.
          for (const auto& transaction : transactions_) {
            if (!transaction->local_commit_time().is_valid()) {
              resolver.Add(transaction->metadata().status_tablet, transaction->id());
            }
          }
        } else {
          for (const auto& id : recheck_ids) {
            auto it = transactions_.find(id);
            if (it == transactions_.end() || (**it).local_commit_time().is_valid()) {
              continue;
            }
            resolver.Add((**it).metadata().status_tablet, id);
          }
          // Committed transactions that got applied (or were removed) need no more waiting.
          auto filter = [this](const TransactionId& id) {
            auto it = transactions_.find(id);
            return it == transactions_.end() || (**it).local_commit_time().is_valid();
          };
          committed_ids.erase(std::remove_if(committed_ids.begin(), committed_ids.end(), filter),
                              committed_ids.end());
        }
      }

      recheck_ids.clear();
      resolver.Start(deadline);

      RETURN_NOT_OK(resolver.ResultFuture().get());

      if (recheck_ids.empty()) {
        if (committed_ids.empty()) {
          break;
        }
        // We are waiting only for committed transactions to be applied.
        // On this path the resolver may have nothing to query, so the deadline is checked
        // explicitly. Otherwise a committed transaction that is never applied locally would
        // keep us looping well past the caller's deadline.
        if (CoarseMonoClock::now() >= deadline) {
          return STATUS_FORMAT(
              TimedOut, "Timed out waiting for $0 committed transactions to be applied",
              committed_ids.size());
        }
        // So just add some delay.
        std::this_thread::sleep_for(10ms * std::min<size_t>(10, committed_ids.size()));
      }
    }

    return Status::OK();
  }
834
835
0
  size_t TEST_GetNumRunningTransactions() {
836
0
    std::lock_guard<std::mutex> lock(mutex_);
837
0
    auto txn_to_id = [](const RunningTransactionPtr& txn) {
838
0
      return txn->id();
839
0
    };
840
0
    VLOG_WITH_PREFIX(4) << "Transactions: " << AsString(transactions_, txn_to_id)
841
0
                        << ", requests: " << AsString(running_requests_);
842
0
    return transactions_.size();
843
0
  }
844
845
0
  // Test helper: returns the replicated batches bitmap of transaction `id`, or an empty bitmap
  // when the transaction is not tracked.
  OneWayBitmap TEST_TransactionReplicatedBatches(const TransactionId& id) {
    std::lock_guard<std::mutex> guard(mutex_);
    auto found = transactions_.find(id);
    if (found == transactions_.end()) {
      return OneWayBitmap();
    }
    return (**found).replicated_batches();
  }
850
851
0
  std::string DumpTransactions() {
852
0
    std::string result;
853
0
    std::lock_guard<std::mutex> lock(mutex_);
854
855
0
    result += Format(
856
0
        "{ safe_time_for_participant: $0 remove_queue_size: $1 ",
857
0
        participant_context_.SafeTimeForTransactionParticipant(), remove_queue_.size());
858
0
    if (!remove_queue_.empty()) {
859
0
      result += "remove_queue_front: " + AsString(remove_queue_.front());
860
0
    }
861
0
    if (!running_requests_.empty()) {
862
0
      result += "running_requests_front: " + AsString(running_requests_.front());
863
0
    }
864
0
    result += "}\n";
865
866
0
    for (const auto& txn : transactions_.get<StartTimeTag>()) {
867
0
      result += txn->ToString();
868
0
      result += "\n";
869
0
    }
870
0
    return result;
871
0
  }
872
873
  // Requests abort of every not-yet-aborted tracked transaction that started at or before `cutoff`.
  // Transactions are walked in start time order. The walk stops at the first transaction that
  // started after `cutoff` or whose id equals `*exclude_txn_id`, so transactions after the
  // excluded one are not aborted either. NOTE(review): confirm that stopping (rather than
  // skipping) at the excluded transaction is intended.
  // Then waits until every abort request has completed or `deadline` passes.
  //
  // Returns OK if every abort finished with COMMITTED or ABORTED. Otherwise returns the first
  // failure observed, or TimedOut if not all requests completed before `deadline`.
  CHECKED_STATUS StopActiveTxnsPriorTo(
      HybridTime cutoff, CoarseTimePoint deadline, TransactionId* exclude_txn_id) {
    vector<TransactionId> ids_to_abort;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      for (const auto& txn : transactions_.get<StartTimeTag>()) {
        if (txn->start_ht() > cutoff ||
            (exclude_txn_id != nullptr && txn->id() == *exclude_txn_id)) {
          break;
        }
        if (!txn->WasAborted()) {
          ids_to_abort.push_back(txn->id());
        }
      }
    }

    if (ids_to_abort.empty()) {
      return Status::OK();
    }

    // State shared with the abort callbacks. It is jointly owned by this function and every
    // callback, because callbacks may still run after this function has returned on timeout.
    // Capturing stack locals by reference there would be a use-after-free.
    struct AbortState {
      explicit AbortState(size_t count) : latch(count) {}

      CountDownLatch latch;
      std::atomic<bool> failed{false};
      // Written at most once (the winner of `failed`), before that callback counts down the latch.
      Status return_status = Status::OK();
    };
    auto state = std::make_shared<AbortState>(ids_to_abort.size());

    // It is ok to attempt to abort txns that have committed. We don't care
    // if our request succeeds or not.
    for (const auto& id : ids_to_abort) {
      Abort(id, [this, id, state](Result<TransactionStatusResult> result) {
        VLOG_WITH_PREFIX(2) << "Aborting " << id << " got " << result;
        if (!result ||
            (result->status != TransactionStatus::COMMITTED && result->status != ABORTED)) {
          LOG(INFO) << "Could not abort " << id << " got " << result;

          bool expected = false;
          if (state->failed.compare_exchange_strong(expected, true)) {
            if (!result) {
              state->return_status = result.status();
            } else {
              state->return_status =
                  STATUS_FORMAT(IllegalState, "Wrong status after abort: $0", result->status);
            }
          }
        }
        state->latch.CountDown();
      });
    }

    if (!state->latch.WaitUntil(deadline)) {
      return STATUS(TimedOut, "TimedOut while aborting old transactions");
    }
    // Every callback has counted down, so return_status is no longer being written.
    return state->return_status;
  }
923
924
10.2k
  // Delegates to the participant context's WaitForSafeTime with the same arguments.
  Result<HybridTime> WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) {
    auto& context = participant_context_;
    return context.WaitForSafeTime(safe_time, deadline);
  }
927
928
0
  // Raises (never lowers) the start time limit at or below which LockAndFind ignores transactions.
  void IgnoreAllTransactionsStartedBefore(HybridTime limit) {
    std::lock_guard<std::mutex> guard(mutex_);
    if (ignore_all_transactions_started_before_ < limit) {
      ignore_all_transactions_started_before_ = limit;
    }
  }
933
934
 private:
935
  class AbortCheckTimeTag;
936
  class StartTimeTag;
937
938
  typedef boost::multi_index_container<RunningTransactionPtr,
939
      boost::multi_index::indexed_by <
940
          boost::multi_index::hashed_unique <
941
              boost::multi_index::const_mem_fun <
942
                  RunningTransaction, const TransactionId&, &RunningTransaction::id>
943
          >,
944
          boost::multi_index::ordered_non_unique <
945
              boost::multi_index::tag<StartTimeTag>,
946
              boost::multi_index::const_mem_fun <
947
                  RunningTransaction, HybridTime, &RunningTransaction::start_ht>
948
          >,
949
          boost::multi_index::ordered_non_unique <
950
              boost::multi_index::tag<AbortCheckTimeTag>,
951
              boost::multi_index::const_mem_fun <
952
                  RunningTransaction, HybridTime, &RunningTransaction::abort_check_ht>
953
          >
954
      >
955
  > Transactions;
956
957
25.8k
  void CompleteLoad(const std::function<void()>& functor) override {
958
25.8k
    MinRunningNotifier min_running_notifier(&applier_);
959
25.8k
    std::lock_guard<std::mutex> lock(mutex_);
960
25.8k
    functor();
961
25.8k
    TransactionsModifiedUnlocked(&min_running_notifier);
962
25.8k
  }
963
964
25.8k
  void LoadFinished(const ApplyStatesMap& pending_applies) override {
965
25.8k
    start_latch_.Wait();
966
25.8k
    std::vector<ScopedRWOperation> operations;
967
25.8k
    operations.reserve(pending_applies.size());
968
25.8k
    for (;;) {
969
25.8k
      if (closing_.load(std::memory_order_acquire)) {
970
0
        LOG_WITH_PREFIX(INFO)
971
0
            << __func__ << ": closing, not starting transaction status resolution";
972
0
        return;
973
0
      }
974
25.8k
      while (operations.size() < pending_applies.size()) {
975
0
        ScopedRWOperation operation(pending_op_counter_);
976
0
        if (!operation.ok()) {
977
0
          break;
978
0
        }
979
0
        operations.push_back(std::move(operation));
980
0
      }
981
25.8k
      if (operations.size() == pending_applies.size()) {
982
25.8k
        break;
983
25.8k
      }
984
2
      operations.clear();
985
0
      YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 5)
986
0
          << __func__ << ": unable to start scoped RW operation";
987
2
      std::this_thread::sleep_for(10ms);
988
2
    }
989
990
25.8k
    if (!pending_applies.empty()) {
991
0
      LOG_WITH_PREFIX(INFO)
992
0
          << __func__ << ": starting " << pending_applies.size() << " pending applies";
993
0
      std::lock_guard<std::mutex> lock(mutex_);
994
0
      size_t idx = 0;
995
0
      for (const auto& p : pending_applies) {
996
0
        auto it = transactions_.find(p.first);
997
0
        if (it == transactions_.end()) {
998
0
          LOG_WITH_PREFIX(INFO) << "Unknown transaction for pending apply: " << AsString(p.first);
999
0
          continue;
1000
0
        }
1001
1002
0
        TransactionApplyData apply_data;
1003
0
        apply_data.transaction_id = p.first;
1004
0
        apply_data.commit_ht = p.second.commit_ht;
1005
0
        (**it).SetApplyData(p.second.state, &apply_data, &operations[idx]);
1006
0
        ++idx;
1007
0
      }
1008
0
    }
1009
1010
25.8k
    {
1011
25.8k
      LOG_WITH_PREFIX(INFO) << __func__ << ": starting transaction status resolution";
1012
25.8k
      std::lock_guard<std::mutex> lock(status_resolvers_mutex_);
1013
4
      for (auto& status_resolver : status_resolvers_) {
1014
4
        status_resolver.Start(CoarseTimePoint::max());
1015
4
      }
1016
25.8k
    }
1017
1018
25.8k
    poller_.Start(
1019
25.8k
        &participant_context_.scheduler(), 1ms * FLAGS_transactions_status_poll_interval_ms);
1020
25.8k
  }
1021
1022
2.06M
  // Must be called after transactions_ changes.
  // It updates the running transactions metric. Once loading is complete, it also recomputes
  // min_running_ht_: kMax when nothing is tracked, otherwise the start time of the oldest
  // transaction. When that value changes, the next long-running check is scheduled
  // FLAGS_transaction_min_running_check_delay_ms from now. Waiters are re-checked whenever
  // min_running_ht_ is stored.
  void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
    metric_transactions_running_->set_value(transactions_.size());
    if (!loader_.complete()) {
      return;
    }

    if (transactions_.empty()) {
      min_running_ht_.store(HybridTime::kMax, std::memory_order_release);
      CheckMinRunningHybridTimeSatisfiedUnlocked(min_running_notifier);
      return;
    }

    auto oldest_start_ht = (**transactions_.get<StartTimeTag>().begin()).start_ht();
    if (oldest_start_ht == min_running_ht_.load(std::memory_order_relaxed)) {
      return;
    }

    min_running_ht_.store(oldest_start_ht, std::memory_order_release);
    next_check_min_running_.store(
        CoarseMonoClock::now() + 1ms * FLAGS_transaction_min_running_check_delay_ms,
        std::memory_order_release);
    CheckMinRunningHybridTimeSatisfiedUnlocked(min_running_notifier);
  }
1044
1045
  void EnqueueRemoveUnlocked(
1046
      const TransactionId& id, RemoveReason reason,
1047
55.1k
      MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) override {
1048
55.1k
    auto now = participant_context_.Now();
1049
0
    VLOG_WITH_PREFIX_AND_FUNC(4) << id << " at " << now << ", reason: " << AsString(reason);
1050
55.1k
    remove_queue_.emplace_back(RemoveQueueEntry{
1051
55.1k
      .id = id,
1052
55.1k
      .time = now,
1053
55.1k
      .reason = reason,
1054
55.1k
    });
1055
55.1k
    ProcessRemoveQueueUnlocked(min_running_notifier);
1056
55.1k
  }
1057
1058
3.11M
  // Drains remove_queue_ from the front while it is safe to do so (see the explanation below).
  void ProcessRemoveQueueUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
    if (remove_queue_.empty()) {
      return;
    }

    // When a transaction participant receives an "aborted" response from the coordinator,
    // it puts this transaction into a "remove queue", also storing the current hybrid
    // time. Then queue entries where time is less than current safe time are removed.
    //
    // This is correct because, from a transaction participant's point of view:
    //
    // (1) After we receive a response for a transaction status request, and
    // learn that the transaction is unknown to the coordinator, our local
    // hybrid time is at least as high as the local hybrid time on the
    // transaction status coordinator at the time the transaction was deleted
    // from the coordinator, due to hybrid time propagation on RPC response.
    //
    // (2) If our safe time is greater than the hybrid time when the
    // transaction was deleted from the coordinator, then we have already
    // applied this transaction's provisional records if the transaction was
    // committed.
    auto safe_time = participant_context_.SafeTimeForTransactionParticipant();
    if (!safe_time.is_valid()) {
      VLOG_WITH_PREFIX(3) << "Unable to obtain safe time to check remove queue";
      return;
    }
    VLOG_WITH_PREFIX(3) << "Checking remove queue: " << safe_time << ", "
                        << remove_queue_.front().ToString();
    LOG_IF_WITH_PREFIX(DFATAL, safe_time < last_safe_time_)
        << "Safe time decreased: " << safe_time << " vs " << last_safe_time_;
    last_safe_time_ = safe_time;

    while (!remove_queue_.empty()) {
      auto& head = remove_queue_.front();
      auto txn_it = transactions_.find(head.id);
      const bool evict_without_removing_intents =
          txn_it == transactions_.end() || (**txn_it).local_commit_time().is_valid();
      if (evict_without_removing_intents) {
        // It is regular case, since the coordinator returns ABORTED for already applied
        // transaction. But this particular tablet could not yet apply it, so
        // it would add such transaction to remove queue.
        // And it is the main reason why we are waiting for safe time, before removing intents.
        VLOG_WITH_PREFIX(4) << "Evicting txn from remove queue, w/o removing intents: "
                            << head.ToString();
        remove_queue_.pop_front();
        continue;
      }
      if (safe_time <= head.time) {
        // Entries are not yet safe to remove; the rest of the queue waits too.
        break;
      }
      VLOG_WITH_PREFIX(4) << "Removing from remove queue: " << head.ToString();
      RemoveUnlocked(head.id, head.reason, min_running_notifier);
      remove_queue_.pop_front();
    }
  }
1108
1109
  // Tries to remove transaction with specified id.
1110
  // Returns true if transaction is not exists after call to this method, otherwise returns false.
1111
  // Which means that transaction will be removed later.
1112
  bool RemoveUnlocked(
1113
      const TransactionId& id, RemoveReason reason,
1114
54.1k
      MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) override {
1115
54.1k
    auto it = transactions_.find(id);
1116
54.1k
    if (it == transactions_.end()) {
1117
0
      return true;
1118
0
    }
1119
54.1k
    return RemoveUnlocked(it, reason, min_running_notifier);
1120
54.1k
  }
1121
1122
  bool RemoveUnlocked(
1123
      const Transactions::iterator& it, RemoveReason reason,
1124
745k
      MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
1125
745k
    TransactionId txn_id = (**it).id();
1126
745k
    RemoveIntentsData checkpoint;
1127
745k
    auto itr = transactions_.find(txn_id);
1128
745k
    OpId op_id = (**itr).GetOpId();
1129
745k
    participant_context_.GetLastCDCedData(&checkpoint);
1130
1131
325
    VLOG_WITH_PREFIX(2) << "Cleaning tx, data opid is " << op_id.ToString()
1132
325
              << " checkpoint opid is " << checkpoint.op_id.ToString();
1133
1134
745k
    if (running_requests_.empty() &&
1135
633k
        (op_id < checkpoint.op_id)) {
1136
633k
      (**it).ScheduleRemoveIntents(*it);
1137
633k
      RemoveTransaction(it, reason, min_running_notifier);
1138
18.4E
      VLOG_WITH_PREFIX(2) << "Cleaned transaction: " << txn_id << ", reason: " << reason
1139
18.4E
                          << ", left: " << transactions_.size();
1140
633k
      return true;
1141
633k
    }
1142
1143
    // We cannot remove the transaction at this point, because there are running requests
1144
    // that are reading the provisional DB and could request status of this transaction.
1145
    // So we store transaction in a queue and wait when all requests that we launched before our
1146
    // attempt to remove this transaction are completed.
1147
    // Since we try to remove the transaction after all its records are removed from the provisional
1148
    // DB, it is safe to complete removal at this point, because it means that there will be no more
1149
    // queries to status of this transactions.
1150
112k
    immediate_cleanup_queue_.push_back(ImmediateCleanupQueueEntry{
1151
112k
      .request_id = request_serial_,
1152
112k
      .transaction_id = (**it).id(),
1153
112k
      .reason = reason,
1154
112k
    });
1155
628
    VLOG_WITH_PREFIX(2) << "Queued for cleanup: " << (**it).id() << ", reason: " << reason;
1156
112k
    return false;
1157
112k
  }
1158
1159
  struct LockAndFindResult {
1160
6.99k
    static Transactions::const_iterator UninitializedIterator() {
1161
6.99k
      static const Transactions empty_transactions;
1162
6.99k
      return empty_transactions.end();
1163
6.99k
    }
1164
1165
    std::unique_lock<std::mutex> lock;
1166
    Transactions::const_iterator iterator = UninitializedIterator();
1167
    bool recently_removed = false;
1168
1169
3.62M
    bool found() const {
1170
3.62M
      return lock.owns_lock();
1171
3.62M
    }
1172
1173
3.97M
    RunningTransaction& transaction() const {
1174
3.97M
      return **iterator;
1175
3.97M
    }
1176
  };
1177
1178
  // Waits until `id` is loaded, then looks it up under mutex_.
  //  - Found and started after ignore_all_transactions_started_before_: returns a result holding
  //    the lock and pointing at the transaction.
  //  - Found but started at or before that limit: logged and reported as not found.
  //  - Recently removed: returns a result with recently_removed set.
  //  - Otherwise: counted in metric_transaction_not_found_, logged (WARNING with kMustExist,
  //    INFO otherwise) and, with kCleanup, a CleanupIntentsTask is enqueued for it.
  LockAndFindResult LockAndFind(
      const TransactionId& id, const std::string& reason, TransactionLoadFlags flags) {
    loader_.WaitLoaded(id);
    bool removed_recently = false;
    {
      std::unique_lock<std::mutex> lock(mutex_);
      auto txn_it = transactions_.find(id);
      if (txn_it == transactions_.end()) {
        removed_recently = WasTransactionRecentlyRemoved(id);
      } else if ((**txn_it).start_ht() <= ignore_all_transactions_started_before_) {
        YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 1)
            << "Ignore transaction for '" << reason << "' because of limit: "
            << ignore_all_transactions_started_before_ << ", txn: " << AsString(**txn_it);
        return LockAndFindResult{};
      } else {
        return LockAndFindResult{ std::move(lock), txn_it };
      }
    }

    if (removed_recently) {
      VLOG_WITH_PREFIX(1)
          << "Attempt to load recently removed transaction: " << id << ", for: " << reason;
      LockAndFindResult removed_result;
      removed_result.recently_removed = true;
      return removed_result;
    }

    metric_transaction_not_found_->Increment();
    if (flags.Test(TransactionLoadFlag::kMustExist)) {
      YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1)
          << "Transaction not found: " << id << ", for: " << reason;
    } else {
      YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 1)
          << "Transaction not found: " << id << ", for: " << reason;
    }

    if (flags.Test(TransactionLoadFlag::kCleanup)) {
      VLOG_WITH_PREFIX(2) << "Schedule cleanup for: " << id;
      auto task = std::make_shared<CleanupIntentsTask>(&participant_context_, &applier_, id);
      task->Prepare(task);
      participant_context_.StrandEnqueue(task.get());
    }
    return LockAndFindResult{};
  }
1220
1221
  void LoadTransaction(
1222
      TransactionMetadata&& metadata,
1223
      TransactionalBatchData&& last_batch_data,
1224
      OneWayBitmap&& replicated_batches,
1225
4
      const ApplyStateWithCommitHt* pending_apply) override {
1226
4
    MinRunningNotifier min_running_notifier(&applier_);
1227
4
    std::lock_guard<std::mutex> lock(mutex_);
1228
4
    auto txn = std::make_shared<RunningTransaction>(
1229
4
        std::move(metadata), std::move(last_batch_data), std::move(replicated_batches),
1230
4
        participant_context_.Now().AddDelta(1ms * FLAGS_transaction_abort_check_timeout_ms), this);
1231
4
    if (pending_apply) {
1232
0
      VLOG_WITH_PREFIX(4) << "Apply state found for " << txn->id() << ": "
1233
0
                          << pending_apply->ToString();
1234
0
      txn->SetLocalCommitData(pending_apply->commit_ht, pending_apply->state.aborted);
1235
0
      txn->SetApplyData(pending_apply->state);
1236
0
    }
1237
4
    transactions_.insert(txn);
1238
4
    TransactionsModifiedUnlocked(&min_running_notifier);
1239
4
  }
1240
1241
284k
  // Returns the YBClient. After the first successful retrieval it is served from client_cache_.
  // Otherwise waits up to TransactionRpcTimeout() for the context's client future and returns
  // TimedOut if it is not ready by then.
  Result<client::YBClient*> client() const {
    if (auto cached_client = client_cache_.load(std::memory_order_acquire)) {
      return cached_client;
    }

    const auto wait_result = participant_context_.client_future().wait_for(
        TransactionRpcTimeout().ToSteadyDuration());
    if (wait_result != std::future_status::ready) {
      return STATUS(TimedOut, "Client not ready");
    }

    auto loaded_client = participant_context_.client_future().get();
    client_cache_.store(loaded_client, std::memory_order_release);
    return loaded_client;
  }
1255
1256
148k
  const std::string& LogPrefix() const override {
1257
148k
    return log_prefix_;
1258
148k
  }
1259
1260
  void RemoveTransaction(Transactions::iterator it, RemoveReason reason,
1261
                         MinRunningNotifier* min_running_notifier)
1262
872k
      REQUIRES(mutex_) {
1263
872k
    auto now = CoarseMonoClock::now();
1264
872k
    CleanupRecentlyRemovedTransactions(now);
1265
872k
    auto& transaction = **it;
1266
872k
    YB_TRANSACTION_DUMP(
1267
872k
        Remove, participant_context_.tablet_id(), transaction.id(), participant_context_.Now(),
1268
872k
        static_cast<uint8_t>(reason));
1269
872k
    recently_removed_transactions_cleanup_queue_.push_back({transaction.id(), now + 15s});
1270
229
    LOG_IF_WITH_PREFIX(DFATAL, !recently_removed_transactions_.insert(transaction.id()).second)
1271
229
        << "Transaction removed twice: " << transaction.id();
1272
391
    VLOG_WITH_PREFIX(4) << "Remove transaction: " << transaction.id();
1273
872k
    transactions_.erase(it);
1274
872k
    TransactionsModifiedUnlocked(min_running_notifier);
1275
872k
  }
1276
1277
2.44M
  // Forgets recently removed transactions whose retention time is at or before `now`.
  void CleanupRecentlyRemovedTransactions(CoarseTimePoint now) {
    auto& expiry_queue = recently_removed_transactions_cleanup_queue_;
    for (; !expiry_queue.empty() && expiry_queue.front().time <= now; expiry_queue.pop_front()) {
      recently_removed_transactions_.erase(expiry_queue.front().id);
    }
  }
1284
1285
1.48M
  bool WasTransactionRecentlyRemoved(const TransactionId& id) {
1286
1.48M
    CleanupRecentlyRemovedTransactions(CoarseMonoClock::now());
1287
1.48M
    return recently_removed_transactions_.count(id) != 0;
1288
1.48M
  }
1289
1290
  void CheckMinRunningHybridTimeSatisfiedUnlocked(
1291
1.25M
      MinRunningNotifier* min_running_notifier) {
1292
1.25M
    if (min_running_ht_.load(std::memory_order_acquire) <= waiting_for_min_running_ht_) {
1293
1.25M
      return;
1294
1.25M
    }
1295
18.4E
    waiting_for_min_running_ht_ = HybridTime::kMax;
1296
18.4E
    min_running_notifier->Satisfied();
1297
18.4E
  }
1298
1299
  void TransactionsStatus(
1300
432
      const std::vector<TransactionStatusInfo>& status_infos) {
1301
432
    MinRunningNotifier min_running_notifier(&applier_);
1302
432
    std::lock_guard<std::mutex> lock(mutex_);
1303
432
    HybridTime now = participant_context_.Now();
1304
435
    for (const auto& info : status_infos) {
1305
435
      auto it = transactions_.find(info.transaction_id);
1306
435
      if (it == transactions_.end()) {
1307
2
        continue;
1308
2
      }
1309
433
      if ((**it).UpdateStatus(
1310
0
          info.status, info.status_ht, info.coordinator_safe_time, info.aborted_subtxn_set)) {
1311
0
        EnqueueRemoveUnlocked(
1312
0
            info.transaction_id, RemoveReason::kStatusReceived, &min_running_notifier);
1313
433
      } else {
1314
428
        transactions_.modify(it, [now](const auto& txn) {
1315
428
          txn->UpdateAbortCheckHT(now, UpdateAbortCheckHTMode::kStatusResponseReceived);
1316
428
        });
1317
433
      }
1318
433
    }
1319
432
  }
1320
1321
245k
  // Submits an APPLYING update operation to the participant context together with `term`.
  // Test-only fault injection: with probability FLAGS_TEST_transaction_ignore_applying_probability
  // the operation is completed with Status::OK() without being submitted.
  void HandleApplying(std::unique_ptr<tablet::UpdateTxnOperation> operation, int64_t term) {
    const auto reject_for_test = RandomActWithProbability(
        GetAtomicFlag(&FLAGS_TEST_transaction_ignore_applying_probability));
    if (!reject_for_test) {
      participant_context_.SubmitUpdateTransaction(std::move(operation), term);
      return;
    }
    VLOG_WITH_PREFIX(2)
        << "TEST: Rejected apply: "
        << FullyDecodeTransactionId(operation->request()->transaction_id());
    operation->CompleteWithStatus(Status::OK());
  }
1332
1333
  void HandleCleanup(
1334
      std::unique_ptr<tablet::UpdateTxnOperation> operation, int64_t term,
1335
548k
      CleanupType cleanup_type) {
1336
256
    VLOG_WITH_PREFIX(3) << "Cleanup";
1337
548k
    auto id = FullyDecodeTransactionId(operation->request()->transaction_id());
1338
548k
    if (!id.ok()) {
1339
0
      operation->CompleteWithStatus(id.status());
1340
0
      return;
1341
0
    }
1342
1343
548k
    TransactionApplyData data = {
1344
548k
        .leader_term = term,
1345
548k
        .transaction_id = *id,
1346
548k
        .aborted = AbortedSubTransactionSet(),
1347
548k
        .op_id = OpId(),
1348
548k
        .commit_ht = HybridTime(),
1349
548k
        .log_ht = HybridTime(),
1350
548k
        .sealed = operation->request()->sealed(),
1351
548k
        .status_tablet = std::string()
1352
548k
    };
1353
548k
    WARN_NOT_OK(ProcessCleanup(data, cleanup_type), "Process cleanup failed");
1354
548k
    operation->CompleteWithStatus(Status::OK());
1355
548k
  }
1356
1357
737k
  // Handles a replicated APPLYING record for transaction `id`.
  // The record's state must list exactly one tablet (the status tablet); otherwise
  // InvalidArgument is returned. Then:
  //  - if the intents were not yet applied to the regular DB, ProcessApply() is run;
  //  - else, if the record is not sealed, an immediate cleanup is processed;
  //  - else nothing more is done.
  CHECKED_STATUS ReplicatedApplying(const TransactionId& id, const ReplicatedData& data) {
    // data.state.tablets contains only status tablet.
    if (data.state.tablets_size() != 1) {
      // The check is on the number of tablets, so report "tablet" (previously said "table").
      return STATUS_FORMAT(InvalidArgument,
                           "Expected only one tablet during APPLYING, state received: $0",
                           data.state);
    }
    HybridTime commit_time(data.state.commit_hybrid_time());
    TransactionApplyData apply_data = {
        .leader_term = data.leader_term,
        .transaction_id = id,
        .aborted = VERIFY_RESULT(AbortedSubTransactionSet::FromPB(data.state.aborted().set())),
        .op_id = data.op_id,
        .commit_ht = commit_time,
        .log_ht = data.hybrid_time,
        .sealed = data.sealed,
        .status_tablet = data.state.tablets(0)
      };
    if (!data.already_applied_to_regular_db) {
      return ProcessApply(apply_data);
    }
    if (!data.sealed) {
      return ProcessCleanup(apply_data, CleanupType::kImmediate);
    }
    return Status::OK();
  }
1383
1384
0
  // Marks transaction `id` as aborted after a replicated ABORTED record.
  // If the transaction is not tracked yet, a placeholder RunningTransaction is registered first
  // (NON_TRANSACTIONAL isolation, empty status tablet, priority 0, HybridTime::kMax as the last
  // argument) and TransactionsModifiedUnlocked() is notified. `data` is not used.
  CHECKED_STATUS ReplicatedAborted(const TransactionId& id, const ReplicatedData& data) {
    MinRunningNotifier min_running_notifier(&applier_);
    std::lock_guard<std::mutex> lock(mutex_);
    auto txn_it = transactions_.find(id);
    if (txn_it == transactions_.end()) {
      TransactionMetadata placeholder_metadata = {
        .transaction_id = id,
        .isolation = IsolationLevel::NON_TRANSACTIONAL,
        .status_tablet = TabletId(),
        .priority = 0
      };
      auto placeholder = std::make_shared<RunningTransaction>(
          placeholder_metadata, TransactionalBatchData(), OneWayBitmap(), HybridTime::kMax, this);
      txn_it = transactions_.insert(std::move(placeholder)).first;
      TransactionsModifiedUnlocked(&min_running_notifier);
    }

    // TODO(dtxn) store this fact to rocksdb.
    (**txn_it).Aborted();

    return Status::OK();
  }
1405
1406
928k
  void Poll() {
1407
928k
    {
1408
928k
      MinRunningNotifier min_running_notifier(&applier_);
1409
928k
      std::lock_guard<std::mutex> lock(mutex_);
1410
1411
928k
      ProcessRemoveQueueUnlocked(&min_running_notifier);
1412
928k
      if (ANNOTATE_UNPROTECTED_READ(FLAGS_transactions_poll_check_aborted)) {
1413
914k
        CheckForAbortedTransactions();
1414
914k
      }
1415
928k
      CleanTransactionsQueue(&graceful_cleanup_queue_, &min_running_notifier);
1416
928k
    }
1417
928k
    CleanupStatusResolvers();
1418
928k
  }
1419
1420
920k
  // Walks transactions in abort-check-time order. For every transaction whose abort check time
  // is not after `now`:
  //  - adds (status_tablet, transaction_id) to a lazily created TransactionStatusResolver;
  //  - updates its abort check time in kStatusRequestSent mode, which re-keys it in the index.
  // The loop stops at the first transaction whose check time is still in the future. If any
  // request was added, the resolver is started with a deadline of
  // FLAGS_transaction_abort_check_timeout_ms from now.
  void CheckForAbortedTransactions() REQUIRES(mutex_) {
    if (transactions_.empty()) {
      return;
    }
    auto now = participant_context_.Now();
    auto& by_abort_check_time = transactions_.get<AbortCheckTimeTag>();
    TransactionStatusResolver* resolver = nullptr;
    while (true) {
      // transactions_ is non-empty (checked above), and this loop only re-keys entries.
      auto& earliest = **by_abort_check_time.begin();
      if (earliest.abort_check_ht() > now) {
        break;
      }
      if (resolver == nullptr) {
        resolver = &AddStatusResolver();
      }
      const auto& earliest_metadata = earliest.metadata();
      VLOG_WITH_PREFIX(4)
          << "Check aborted: " << earliest_metadata.status_tablet << ", "
          << earliest_metadata.transaction_id;
      resolver->Add(earliest_metadata.status_tablet, earliest_metadata.transaction_id);
      by_abort_check_time.modify(by_abort_check_time.begin(), [now](const auto& txn) {
        txn->UpdateAbortCheckHT(now, UpdateAbortCheckHTMode::kStatusRequestSent);
      });
    }

    // No limit is placed on the number of status resolutions, because transaction throughput
    // cannot be predicted. Multiple resolutions for the same transaction are not started,
    // because the abort check hybrid time is set to the same value as the status resolution
    // deadline.
    if (resolver != nullptr) {
      resolver->Start(CoarseMonoClock::now() + 1ms * FLAGS_transaction_abort_check_timeout_ms);
    }
  }
1452
1453
910k
  // Shuts down and removes resolvers from the front of status_resolvers_ while they report
  // !Running(). The scan stops at the first resolver that is still running, so finished resolvers
  // behind it are reaped on a later call.
  void CleanupStatusResolvers() EXCLUDES(status_resolvers_mutex_) {
    std::lock_guard<std::mutex> guard(status_resolvers_mutex_);
    for (;;) {
      if (status_resolvers_.empty()) {
        break;
      }
      auto& oldest = status_resolvers_.front();
      if (oldest.Running()) {
        break;
      }
      oldest.Shutdown();
      status_resolvers_.pop_front();
    }
  }
1460
1461
432
  // Appends a new TransactionStatusResolver whose results are delivered to TransactionsStatus(),
  // and returns a reference to it. Appending to a std::deque does not invalidate references to
  // existing elements. The returned reference therefore stays valid until
  // CleanupStatusResolvers() pops that resolver.
  TransactionStatusResolver& AddStatusResolver() override EXCLUDES(status_resolvers_mutex_) {
    std::lock_guard<std::mutex> guard(status_resolvers_mutex_);
    auto on_status = std::bind(&Impl::TransactionsStatus, this, _1);
    status_resolvers_.emplace_back(
        &participant_context_, &rpcs_, FLAGS_max_transactions_in_status_request,
        std::move(on_status));
    auto& added = status_resolvers_.back();
    return added;
  }
1468
1469
  struct ImmediateCleanupQueueEntry {
1470
    int64_t request_id;
1471
    TransactionId transaction_id;
1472
    RemoveReason reason;
1473
1474
114k
    bool Ready(TransactionParticipantContext* participant_context, HybridTime* safe_time) const {
1475
114k
      return true;
1476
114k
    }
1477
  };
1478
1479
  struct GracefulCleanupQueueEntry {
1480
    int64_t request_id;
1481
    TransactionId transaction_id;
1482
    RemoveReason reason;
1483
    HybridTime required_safe_time;
1484
1485
244k
    bool Ready(TransactionParticipantContext* participant_context, HybridTime* safe_time) const {
1486
244k
      if (!*safe_time) {
1487
210k
        *safe_time = participant_context->SafeTimeForTransactionParticipant();
1488
210k
      }
1489
244k
      return *safe_time >= required_safe_time;
1490
244k
    }
1491
  };
1492
1493
  std::string log_prefix_;
1494
1495
  docdb::DocDB db_;
1496
  const docdb::KeyBounds* key_bounds_;
1497
  // Owned externally, should be guaranteed that would not be destroyed before this.
1498
  RWOperationCounter* pending_op_counter_ = nullptr;
1499
1500
  Transactions transactions_;
1501
  // Ids of running requests, stored in increasing order.
1502
  std::deque<int64_t> running_requests_;
1503
  // Ids of complete requests, minimal request is on top.
1504
  // Contains only ids greater than first running request id, otherwise entry is removed
1505
  // from both collections.
1506
  std::priority_queue<int64_t, std::vector<int64_t>, std::greater<void>> complete_requests_;
1507
1508
  // Queues of transaction ids that should be cleaned, paired with request that should be completed
1509
  // in order to be able to do clean.
1510
  // Immediate cleanup is performed as soon as possible.
1511
  // Graceful cleanup is performed after safe time becomes greater than cleanup request hybrid time.
1512
  std::deque<ImmediateCleanupQueueEntry> immediate_cleanup_queue_ GUARDED_BY(mutex_);
1513
  std::deque<GracefulCleanupQueueEntry> graceful_cleanup_queue_ GUARDED_BY(mutex_);
1514
1515
  // Remove queue maintains transactions that could be cleaned when safe time for follower reaches
1516
  // appropriate time for an entry.
1517
  // Since we add entries with increasing time, this queue is ordered by time.
1518
  struct RemoveQueueEntry {
1519
    TransactionId id;
1520
    HybridTime time;
1521
    RemoveReason reason;
1522
1523
0
    std::string ToString() const {
1524
0
      return YB_STRUCT_TO_STRING(id, time, reason);
1525
0
    }
1526
  };
1527
1528
  // Guarded by RunningTransactionContext::mutex_
1529
  std::deque<RemoveQueueEntry> remove_queue_;
1530
1531
  // Guarded by RunningTransactionContext::mutex_
1532
  HybridTime last_safe_time_ = HybridTime::kMin;
1533
1534
  HybridTime ignore_all_transactions_started_before_ GUARDED_BY(mutex_) = HybridTime::kMin;
1535
1536
  std::unordered_set<TransactionId, TransactionIdHash> recently_removed_transactions_;
1537
  struct RecentlyRemovedTransaction {
1538
    TransactionId id;
1539
    CoarseTimePoint time;
1540
  };
1541
  std::deque<RecentlyRemovedTransaction> recently_removed_transactions_cleanup_queue_;
1542
1543
  std::mutex status_resolvers_mutex_;
1544
  std::deque<TransactionStatusResolver> status_resolvers_ GUARDED_BY(status_resolvers_mutex_);
1545
1546
  scoped_refptr<AtomicGauge<uint64_t>> metric_transactions_running_;
1547
  scoped_refptr<Counter> metric_transaction_not_found_;
1548
1549
  TransactionLoader loader_;
1550
  std::atomic<bool> closing_{false};
1551
  CountDownLatch start_latch_{1};
1552
1553
  std::atomic<HybridTime> min_running_ht_{HybridTime::kInvalid};
1554
  std::atomic<CoarseTimePoint> next_check_min_running_{CoarseTimePoint()};
1555
  HybridTime waiting_for_min_running_ht_ = HybridTime::kMax;
1556
  std::atomic<bool> shutdown_done_{false};
1557
1558
  mutable std::atomic<client::YBClient*> client_cache_{nullptr};
1559
1560
  LRUCache<TransactionId> cleanup_cache_{FLAGS_transactions_cleanup_cache_size};
1561
1562
  rpc::Poller poller_;
1563
};
1564
1565
TransactionParticipant::TransactionParticipant(
1566
    TransactionParticipantContext* context, TransactionIntentApplier* applier,
1567
    const scoped_refptr<MetricEntity>& entity)
1568
25.8k
    : impl_(new Impl(context, applier, entity)) {
1569
25.8k
}
1570
1571
19.2k
TransactionParticipant::~TransactionParticipant() {
1572
19.2k
}
1573
1574
25.8k
void TransactionParticipant::Start() {
1575
25.8k
  impl_->Start();
1576
25.8k
}
1577
1578
1.02M
Result<bool> TransactionParticipant::Add(const TransactionMetadata& metadata) {
1579
1.02M
  return impl_->Add(metadata);
1580
1.02M
}
1581
1582
Result<TransactionMetadata> TransactionParticipant::PrepareMetadata(
1583
776k
    const TransactionMetadataPB& pb) {
1584
776k
  return impl_->PrepareMetadata(pb);
1585
776k
}
1586
1587
boost::optional<std::pair<IsolationLevel, TransactionalBatchData>>
1588
    TransactionParticipant::PrepareBatchData(
1589
    const TransactionId& id, size_t batch_idx,
1590
1.10M
    boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) {
1591
1.10M
  return impl_->PrepareBatchData(id, batch_idx, encoded_replicated_batches);
1592
1.10M
}
1593
1594
void TransactionParticipant::BatchReplicated(
1595
1.26M
    const TransactionId& id, const TransactionalBatchData& data) {
1596
1.26M
  return impl_->BatchReplicated(id, data);
1597
1.26M
}
1598
1599
347k
HybridTime TransactionParticipant::LocalCommitTime(const TransactionId& id) {
1600
347k
  return impl_->LocalCommitTime(id);
1601
347k
}
1602
1603
67.5k
boost::optional<CommitMetadata> TransactionParticipant::LocalCommitData(const TransactionId& id) {
1604
67.5k
  return impl_->LocalCommitData(id);
1605
67.5k
}
1606
1607
0
std::pair<size_t, size_t> TransactionParticipant::TEST_CountIntents() const {
1608
0
  return impl_->TEST_CountIntents();
1609
0
}
1610
1611
260k
void TransactionParticipant::RequestStatusAt(const StatusRequest& request) {
1612
260k
  return impl_->RequestStatusAt(request);
1613
260k
}
1614
1615
3.36M
int64_t TransactionParticipant::RegisterRequest() {
1616
3.36M
  return impl_->RegisterRequest();
1617
3.36M
}
1618
1619
3.37M
void TransactionParticipant::UnregisterRequest(int64_t request) {
1620
3.37M
  impl_->UnregisterRequest(request);
1621
3.37M
}
1622
1623
void TransactionParticipant::Abort(const TransactionId& id,
1624
38.7k
                                   TransactionStatusCallback callback) {
1625
38.7k
  return impl_->Abort(id, std::move(callback));
1626
38.7k
}
1627
1628
void TransactionParticipant::Handle(
1629
795k
    std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) {
1630
795k
  impl_->Handle(std::move(request), term);
1631
795k
}
1632
1633
0
void TransactionParticipant::Cleanup(TransactionIdSet&& set) {
1634
0
  return impl_->Cleanup(std::move(set), this);
1635
0
}
1636
1637
737k
Status TransactionParticipant::ProcessReplicated(const ReplicatedData& data) {
1638
737k
  return impl_->ProcessReplicated(data);
1639
737k
}
1640
1641
129k
Status TransactionParticipant::CheckAborted(const TransactionId& id) {
1642
129k
  return impl_->CheckAborted(id);
1643
129k
}
1644
1645
void TransactionParticipant::FillPriorities(
1646
85.0k
    boost::container::small_vector_base<std::pair<TransactionId, uint64_t>>* inout) {
1647
85.0k
  return impl_->FillPriorities(inout);
1648
85.0k
}
1649
1650
void TransactionParticipant::SetDB(
1651
    const docdb::DocDB& db, const docdb::KeyBounds* key_bounds,
1652
107k
    RWOperationCounter* pending_op_counter) {
1653
107k
  impl_->SetDB(db, key_bounds, pending_op_counter);
1654
107k
}
1655
1656
void TransactionParticipant::GetStatus(
1657
    const TransactionId& transaction_id,
1658
    size_t required_num_replicated_batches,
1659
    int64_t term,
1660
    tserver::GetTransactionStatusAtParticipantResponsePB* response,
1661
0
    rpc::RpcContext* context) {
1662
0
  impl_->GetStatus(transaction_id, required_num_replicated_batches, term, response, context);
1663
0
}
1664
1665
24
TransactionParticipantContext* TransactionParticipant::context() const {
1666
24
  return impl_->participant_context();
1667
24
}
1668
1669
4.51M
HybridTime TransactionParticipant::MinRunningHybridTime() const {
1670
4.51M
  return impl_->MinRunningHybridTime();
1671
4.51M
}
1672
1673
31
void TransactionParticipant::WaitMinRunningHybridTime(HybridTime ht) {
1674
31
  impl_->WaitMinRunningHybridTime(ht);
1675
31
}
1676
1677
0
Status TransactionParticipant::ResolveIntents(HybridTime resolve_at, CoarseTimePoint deadline) {
1678
0
  return impl_->ResolveIntents(resolve_at, deadline);
1679
0
}
1680
1681
0
size_t TransactionParticipant::TEST_GetNumRunningTransactions() const {
1682
0
  return impl_->TEST_GetNumRunningTransactions();
1683
0
}
1684
1685
OneWayBitmap TransactionParticipant::TEST_TransactionReplicatedBatches(
1686
0
    const TransactionId& id) const {
1687
0
  return impl_->TEST_TransactionReplicatedBatches(id);
1688
0
}
1689
1690
0
std::string TransactionParticipant::ReplicatedData::ToString() const {
1691
0
  return YB_STRUCT_TO_STRING(leader_term, state, op_id, hybrid_time, already_applied_to_regular_db);
1692
0
}
1693
1694
19.3k
void TransactionParticipant::StartShutdown() {
1695
19.3k
  impl_->StartShutdown();
1696
19.3k
}
1697
1698
19.3k
void TransactionParticipant::CompleteShutdown() {
1699
19.3k
  impl_->CompleteShutdown();
1700
19.3k
}
1701
1702
0
std::string TransactionParticipant::DumpTransactions() const {
1703
0
  return impl_->DumpTransactions();
1704
0
}
1705
1706
Status TransactionParticipant::StopActiveTxnsPriorTo(
1707
10.2k
    HybridTime cutoff, CoarseTimePoint deadline, TransactionId* exclude_txn_id) {
1708
10.2k
  return impl_->StopActiveTxnsPriorTo(cutoff, deadline, exclude_txn_id);
1709
10.2k
}
1710
1711
Result<HybridTime> TransactionParticipant::WaitForSafeTime(
1712
10.2k
    HybridTime safe_time, CoarseTimePoint deadline) {
1713
10.2k
  return impl_->WaitForSafeTime(safe_time, deadline);
1714
10.2k
}
1715
1716
0
void TransactionParticipant::IgnoreAllTransactionsStartedBefore(HybridTime limit) {
1717
0
  impl_->IgnoreAllTransactionsStartedBefore(limit);
1718
0
}
1719
1720
0
const TabletId& TransactionParticipant::tablet_id() const {
1721
0
  return impl_->participant_context()->tablet_id();
1722
0
}
1723
1724
26.2k
std::string TransactionParticipantContext::LogPrefix() const {
1725
26.2k
  return consensus::MakeTabletLogPrefix(tablet_id(), permanent_uuid());
1726
26.2k
}
1727
1728
756k
HybridTime TransactionParticipantContext::Now() {
1729
756k
  return clock_ptr()->Now();
1730
756k
}
1731
1732
} // namespace tablet
1733
} // namespace yb