YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_participant.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/tablet/transaction_participant.h"
17
18
#include <queue>
19
20
#include <boost/multi_index/hashed_index.hpp>
21
#include <boost/multi_index/mem_fun.hpp>
22
#include <boost/multi_index/ordered_index.hpp>
23
24
#include "yb/client/transaction_rpc.h"
25
26
#include "yb/common/pgsql_error.h"
27
#include "yb/common/transaction_error.h"
28
29
#include "yb/consensus/consensus_util.h"
30
31
#include "yb/docdb/docdb_rocksdb_util.h"
32
#include "yb/docdb/transaction_dump.h"
33
34
#include "yb/rpc/poller.h"
35
36
#include "yb/server/clock.h"
37
38
#include "yb/tablet/cleanup_aborts_task.h"
39
#include "yb/tablet/cleanup_intents_task.h"
40
#include "yb/tablet/operations/update_txn_operation.h"
41
#include "yb/tablet/remove_intents_task.h"
42
#include "yb/tablet/running_transaction.h"
43
#include "yb/tablet/running_transaction_context.h"
44
#include "yb/tablet/transaction_loader.h"
45
#include "yb/tablet/transaction_participant_context.h"
46
#include "yb/tablet/transaction_status_resolver.h"
47
48
#include "yb/tserver/tserver_service.pb.h"
49
50
#include "yb/util/countdown_latch.h"
51
#include "yb/util/debug-util.h"
52
#include "yb/util/flag_tags.h"
53
#include "yb/util/format.h"
54
#include "yb/util/logging.h"
55
#include "yb/util/lru_cache.h"
56
#include "yb/util/metrics.h"
57
#include "yb/util/operation_counter.h"
58
#include "yb/util/scope_exit.h"
59
#include "yb/util/status_format.h"
60
#include "yb/util/status_log.h"
61
#include "yb/util/tsan_util.h"
62
63
using namespace std::literals;
64
using namespace std::placeholders;
65
66
DEFINE_uint64(transaction_min_running_check_delay_ms, 50,
67
              "When transaction with minimal start hybrid time is updated at transaction "
68
              "participant, we wait at least this number of milliseconds before checking its "
69
              "status at transaction coordinator. Used for the optimization that deletes "
70
              "provisional records RocksDB SSTable files.");
71
72
DEFINE_uint64(transaction_min_running_check_interval_ms, 250,
73
              "While transaction with minimal start hybrid time remains the same, we will try "
74
              "to check its status at transaction coordinator at regular intervals this "
75
              "long (ms). Used for the optimization that deletes "
76
              "provisional records RocksDB SSTable files.");
77
78
DEFINE_test_flag(double, transaction_ignore_applying_probability, 0,
79
                 "Probability to ignore APPLYING update in tests.");
80
DEFINE_test_flag(bool, fail_in_apply_if_no_metadata, false,
81
                 "Fail when applying intents if metadata is not found.");
82
83
DEFINE_int32(max_transactions_in_status_request, 128,
84
             "Request status for at most specified number of transactions at once. "
85
                 "0 disables load time transaction status resolution.");
86
87
DEFINE_uint64(transactions_cleanup_cache_size, 256, "Transactions cleanup cache size.");
88
89
DEFINE_uint64(transactions_status_poll_interval_ms, 500 * yb::kTimeMultiplier,
90
              "Transactions poll interval.");
91
92
DEFINE_bool(transactions_poll_check_aborted, true, "Check aborted transactions during poll.");
93
94
DECLARE_int64(transaction_abort_check_timeout_ms);
95
96
METRIC_DEFINE_simple_counter(
97
    tablet, transaction_not_found, "Total number of missing transactions during load",
98
    yb::MetricUnit::kTransactions);
99
METRIC_DEFINE_simple_gauge_uint64(
100
    tablet, transactions_running, "Total number of transactions running in participant",
101
    yb::MetricUnit::kTransactions);
102
103
DEFINE_test_flag(int32, txn_participant_inject_latency_on_apply_update_txn_ms, 0,
104
                 "How much latency to inject when a update txn operation is applied.");
105
106
namespace yb {
107
namespace tablet {
108
109
namespace {
110
111
YB_STRONGLY_TYPED_BOOL(PostApplyCleanup);
112
113
} // namespace
114
115
93
std::string TransactionApplyData::ToString() const {
116
93
  return YB_STRUCT_TO_STRING(
117
93
      leader_term, transaction_id, op_id, commit_ht, log_ht, sealed, status_tablet, apply_state);
118
93
}
119
120
class TransactionParticipant::Impl
121
    : public RunningTransactionContext, public TransactionLoaderContext {
122
 public:
123
  Impl(TransactionParticipantContext* context, TransactionIntentApplier* applier,
124
       const scoped_refptr<MetricEntity>& entity)
125
      : RunningTransactionContext(context, applier),
126
        log_prefix_(context->LogPrefix()),
127
        loader_(this, entity),
128
56.9k
        poller_(log_prefix_, std::bind(&Impl::Poll, this)) {
129
56.9k
    LOG_WITH_PREFIX(INFO) << "Create";
130
56.9k
    metric_transactions_running_ = METRIC_transactions_running.Instantiate(entity, 0);
131
56.9k
    metric_transaction_not_found_ = METRIC_transaction_not_found.Instantiate(entity);
132
56.9k
  }
133
134
44.5k
  // If shutdown was never started, run the whole two-phase shutdown here.
  // Otherwise the earlier shutdown is expected to have completed (DFATAL if not).
  ~Impl() {
    if (!StartShutdown()) {
      LOG_IF_WITH_PREFIX(DFATAL, !shutdown_done_.load(std::memory_order_acquire))
          << "Destroying transaction participant that did not complete shutdown";
      return;
    }
    CompleteShutdown();
  }
142
143
89.4k
  bool StartShutdown() {
144
89.4k
    bool expected = false;
145
89.4k
    if (!closing_.compare_exchange_strong(expected, true)) {
146
44.6k
      return false;
147
44.6k
    }
148
149
44.8k
    poller_.Shutdown();
150
151
44.8k
    if (start_latch_.count()) {
152
0
      start_latch_.CountDown();
153
0
    }
154
155
44.8k
    LOG_WITH_PREFIX(INFO) << "Shutdown";
156
44.8k
    return true;
157
89.4k
  }
158
159
44.7k
  void CompleteShutdown() {
160
18.4E
    LOG_IF_WITH_PREFIX(DFATAL, !closing_.load()) << __func__ << " w/o StartShutdown";
161
162
44.7k
    decltype(status_resolvers_) status_resolvers;
163
44.7k
    {
164
44.7k
      MinRunningNotifier min_running_notifier(nullptr /* applier */);
165
44.7k
      std::lock_guard<std::mutex> lock(mutex_);
166
44.7k
      transactions_.clear();
167
44.7k
      TransactionsModifiedUnlocked(&min_running_notifier);
168
44.7k
      status_resolvers.swap(status_resolvers_);
169
44.7k
    }
170
171
44.7k
    rpcs_.Shutdown();
172
44.7k
    loader_.Shutdown();
173
44.7k
    for (auto& resolver : status_resolvers) {
174
3
      resolver.Shutdown();
175
3
    }
176
44.7k
    shutdown_done_.store(true, std::memory_order_release);
177
44.7k
  }
178
179
183
  bool Closing() const override {
180
183
    return closing_.load(std::memory_order_acquire);
181
183
  }
182
183
56.9k
  void Start() {
184
56.9k
    LOG_WITH_PREFIX(INFO) << "Start";
185
56.9k
    start_latch_.CountDown();
186
56.9k
  }
187
188
  // Adds new running transaction.
189
1.80M
  Result<bool> Add(const TransactionMetadata& metadata) {
190
1.80M
    loader_.WaitLoaded(metadata.transaction_id);
191
192
1.80M
    MinRunningNotifier min_running_notifier(&applier_);
193
1.80M
    std::lock_guard<std::mutex> lock(mutex_);
194
1.80M
    auto it = transactions_.find(metadata.transaction_id);
195
1.80M
    if (it != transactions_.end()) {
196
586
      return false;
197
586
    }
198
1.80M
    if (WasTransactionRecentlyRemoved(metadata.transaction_id) ||
199
1.80M
        cleanup_cache_.Erase(metadata.transaction_id) != 0) {
200
130k
      auto status = STATUS_EC_FORMAT(
201
130k
          TryAgain, PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE),
202
130k
          "Transaction was recently aborted: $0", metadata.transaction_id);
203
130k
      return status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted));
204
130k
    }
205
18.4E
    VLOG_WITH_PREFIX(4) << "Create new transaction: " << metadata.transaction_id;
206
1.67M
    transactions_.insert(std::make_shared<RunningTransaction>(
207
1.67M
        metadata, TransactionalBatchData(), OneWayBitmap(), metadata.start_time, this));
208
1.67M
    TransactionsModifiedUnlocked(&min_running_notifier);
209
1.67M
    return true;
210
1.80M
  }
211
212
533k
  // Returns the locally recorded commit time of transaction `id`, or HybridTime::kInvalid when
  // the transaction is not tracked by this participant.
  HybridTime LocalCommitTime(const TransactionId& id) {
    std::lock_guard<std::mutex> lock(mutex_);
    const auto it = transactions_.find(id);
    return it == transactions_.end() ? HybridTime::kInvalid : (**it).local_commit_time();
  }
220
221
219k
  // Returns the local commit hybrid time and the aborted sub-transaction set of transaction `id`.
  // Returns boost::none when the transaction is not tracked by this participant.
  boost::optional<CommitMetadata> LocalCommitData(const TransactionId& id) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = transactions_.find(id);
    if (it == transactions_.end()) {
      return boost::none;
    }
    auto& txn = **it;
    return boost::make_optional<CommitMetadata>({
      .commit_ht = txn.local_commit_time(),
      .aborted_subtxn_set = txn.local_commit_aborted_subtxn_set(),
    });
  }
232
233
0
  std::pair<size_t, size_t> TEST_CountIntents() {
234
0
    {
235
0
      MinRunningNotifier min_running_notifier(&applier_);
236
0
      std::lock_guard<std::mutex> lock(mutex_);
237
0
      ProcessRemoveQueueUnlocked(&min_running_notifier);
238
0
    }
239
240
0
    std::pair<size_t, size_t> result(0, 0);
241
0
    auto iter = docdb::CreateRocksDBIterator(db_.intents,
242
0
                                             key_bounds_,
243
0
                                             docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
244
0
                                             boost::none,
245
0
                                             rocksdb::kDefaultQueryId);
246
0
    for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
247
0
      ++result.first;
248
      // Count number of transaction, by counting metadata records.
249
0
      if (iter.key().size() == TransactionId::StaticSize() + 1) {
250
0
        ++result.second;
251
0
        auto key = iter.key();
252
0
        key.remove_prefix(1);
253
0
        auto id = CHECK_RESULT(FullyDecodeTransactionId(key));
254
0
        LOG_WITH_PREFIX(INFO) << "Stored txn meta: " << id;
255
0
      }
256
0
    }
257
258
0
    return result;
259
0
  }
260
261
1.60M
  // Resolves the metadata of the transaction referenced by `pb`.
  // If `pb` has no isolation level it carries only a transaction id. The transaction must then
  // already be known here; if it is unknown, TryAgain (serialization failure) is returned.
  // If `pb` has an isolation level, the metadata is parsed from `pb` and checked against local
  // state: an error is returned when the tracked transaction was aborted or recently removed.
  Result<TransactionMetadata> PrepareMetadata(const TransactionMetadataPB& pb) {
    if (!pb.has_isolation()) {
      auto id = VERIFY_RESULT(FullyDecodeTransactionId(pb.transaction_id()));

      // We are not trying to cleanup intents here because we don't know whether this
      // transaction has intents or not.
      auto lock_and_iterator = LockAndFind(
          id, "metadata"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
      if (!lock_and_iterator.found()) {
        return STATUS(TryAgain,
                      Format("Unknown transaction, could be recently aborted: $0", id), Slice(),
                      PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE));
      }
      RETURN_NOT_OK(lock_and_iterator.transaction().CheckAborted());
      return lock_and_iterator.transaction().metadata();
    }

    auto metadata = VERIFY_RESULT(TransactionMetadata::FromPB(pb));
    std::unique_lock<std::mutex> lock(mutex_);
    auto it = transactions_.find(metadata.transaction_id);
    if (it != transactions_.end()) {
      RETURN_NOT_OK((**it).CheckAborted());
    } else if (WasTransactionRecentlyRemoved(metadata.transaction_id)) {
      return MakeAbortedStatus(metadata.transaction_id);
    }
    return metadata;
  }
288
289
  // Records replicated batch `batch_idx` (with `encoded_replicated_batches`) on transaction `id`.
  // Returns the transaction's isolation level and last batch data, or boost::none if the
  // transaction is unknown.
  boost::optional<std::pair<IsolationLevel, TransactionalBatchData>> PrepareBatchData(
      const TransactionId& id, size_t batch_idx,
      boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) {
    // We are not trying to cleanup intents here because we don't know whether this transaction
    // has intents or not.
    auto lock_and_iterator = LockAndFind(
        id, "metadata with write id"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
    if (!lock_and_iterator.found()) {
      return boost::none;
    }

    auto& txn = lock_and_iterator.transaction();
    txn.AddReplicatedBatch(batch_idx, encoded_replicated_batches);
    const auto isolation = txn.metadata().isolation;
    return std::make_pair(isolation, txn.last_batch_data());
  }
303
304
2.31M
  void BatchReplicated(const TransactionId& id, const TransactionalBatchData& data) {
305
2.31M
    std::lock_guard<std::mutex> lock(mutex_);
306
2.31M
    auto it = transactions_.find(id);
307
2.31M
    if (it == transactions_.end()) {
308
99
      
LOG_IF_WITH_PREFIX0
(DFATAL, !WasTransactionRecentlyRemoved(id))
309
0
          << "Update last write id for unknown transaction: " << id;
310
99
      return;
311
99
    }
312
2.31M
    (**it).BatchReplicated(data);
313
2.31M
  }
314
315
388k
  void RequestStatusAt(const StatusRequest& request) {
316
388k
    auto lock_and_iterator = LockAndFind(*request.id, *request.reason, request.flags);
317
388k
    if (!lock_and_iterator.found()) {
318
15.0k
      request.callback(
319
15.0k
          STATUS_FORMAT(NotFound, "Request status of unknown transaction: $0", *request.id));
320
15.0k
      return;
321
15.0k
    }
322
373k
    lock_and_iterator.transaction().RequestStatusAt(request, &lock_and_iterator.lock);
323
373k
  }
324
325
  // Registers a request, giving it a newly allocated id and returning this id.
326
6.70M
  int64_t RegisterRequest() {
327
6.70M
    std::lock_guard<std::mutex> lock(mutex_);
328
6.70M
    auto result = NextRequestIdUnlocked();
329
6.70M
    running_requests_.push_back(result);
330
6.70M
    return result;
331
6.70M
  }
332
333
  // Unregisters a previously registered request.
334
6.71M
  void UnregisterRequest(int64_t request) {
335
6.71M
    MinRunningNotifier min_running_notifier(&applier_);
336
6.71M
    {
337
6.71M
      std::lock_guard<std::mutex> lock(mutex_);
338
6.71M
      DCHECK(!running_requests_.empty());
339
6.71M
      if (running_requests_.front() != request) {
340
2.43M
        complete_requests_.push(request);
341
2.43M
        return;
342
2.43M
      }
343
4.27M
      running_requests_.pop_front();
344
6.71M
      while (!complete_requests_.empty() && 
complete_requests_.top() == running_requests_.front()2.65M
) {
345
2.43M
        complete_requests_.pop();
346
2.43M
        running_requests_.pop_front();
347
2.43M
      }
348
349
4.27M
      CleanTransactionsUnlocked(&min_running_notifier);
350
4.27M
    }
351
4.27M
  }
352
353
  // Cleans transactions that are requested and now is safe to clean.
354
  // See RemoveUnlocked for details.
355
4.27M
  void CleanTransactionsUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
356
4.27M
    ProcessRemoveQueueUnlocked(min_running_notifier);
357
358
4.27M
    CleanTransactionsQueue(&immediate_cleanup_queue_, min_running_notifier);
359
4.27M
    CleanTransactionsQueue(&graceful_cleanup_queue_, min_running_notifier);
360
4.27M
  }
361
362
  template <class Queue>
363
  void CleanTransactionsQueue(
364
18.5M
      Queue* queue, MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
365
18.5M
    int64_t min_request = running_requests_.empty() ? 
std::numeric_limits<int64_t>::max()17.5M
366
18.5M
                                                    : 
running_requests_.front()979k
;
367
18.5M
    HybridTime safe_time;
368
19.0M
    while (!queue->empty()) {
369
748k
      const auto& front = queue->front();
370
748k
      if (front.request_id >= min_request) {
371
142k
        break;
372
142k
      }
373
606k
      if (!front.Ready(&participant_context_, &safe_time)) {
374
117k
        break;
375
117k
      }
376
488k
      const auto& id = front.transaction_id;
377
488k
      RemoveIntentsData checkpoint;
378
488k
      auto it = transactions_.find(id);
379
380
488k
      if (it != transactions_.end() && 
!(**it).ProcessingApply()417k
) {
381
417k
        OpId op_id = (**it).GetOpId();
382
417k
        participant_context_.GetLastCDCedData(&checkpoint);
383
417k
        
VLOG_WITH_PREFIX10
(2) << "Cleaning tx opid is " << op_id.ToString()
384
10
                            << " checkpoint opid is " << checkpoint.op_id.ToString();
385
386
417k
        if (checkpoint.op_id < op_id) {
387
380
          break;
388
380
        }
389
416k
        (**it).ScheduleRemoveIntents(*it);
390
416k
        RemoveTransaction(it, front.reason, min_running_notifier);
391
416k
      }
392
18.4E
      VLOG_WITH_PREFIX(2) << "Cleaned from queue: " << id;
393
488k
      queue->pop_front();
394
488k
    }
395
18.5M
  }
void yb::tablet::TransactionParticipant::Impl::CleanTransactionsQueue<std::__1::deque<yb::tablet::TransactionParticipant::Impl::GracefulCleanupQueueEntry, std::__1::allocator<yb::tablet::TransactionParticipant::Impl::GracefulCleanupQueueEntry> > >(std::__1::deque<yb::tablet::TransactionParticipant::Impl::GracefulCleanupQueueEntry, std::__1::allocator<yb::tablet::TransactionParticipant::Impl::GracefulCleanupQueueEntry> >*, yb::tablet::MinRunningNotifier*)
Line
Count
Source
364
14.2M
      Queue* queue, MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
365
14.2M
    int64_t min_request = running_requests_.empty() ? 
std::numeric_limits<int64_t>::max()13.7M
366
14.2M
                                                    : 
running_requests_.front()522k
;
367
14.2M
    HybridTime safe_time;
368
14.5M
    while (!queue->empty()) {
369
380k
      const auto& front = queue->front();
370
380k
      if (front.request_id >= min_request) {
371
14.6k
        break;
372
14.6k
      }
373
366k
      if (!front.Ready(&participant_context_, &safe_time)) {
374
117k
        break;
375
117k
      }
376
248k
      const auto& id = front.transaction_id;
377
248k
      RemoveIntentsData checkpoint;
378
248k
      auto it = transactions_.find(id);
379
380
248k
      if (it != transactions_.end() && 
!(**it).ProcessingApply()186k
) {
381
186k
        OpId op_id = (**it).GetOpId();
382
186k
        participant_context_.GetLastCDCedData(&checkpoint);
383
186k
        
VLOG_WITH_PREFIX5
(2) << "Cleaning tx opid is " << op_id.ToString()
384
5
                            << " checkpoint opid is " << checkpoint.op_id.ToString();
385
386
186k
        if (checkpoint.op_id < op_id) {
387
0
          break;
388
0
        }
389
186k
        (**it).ScheduleRemoveIntents(*it);
390
186k
        RemoveTransaction(it, front.reason, min_running_notifier);
391
186k
      }
392
18.4E
      VLOG_WITH_PREFIX(2) << "Cleaned from queue: " << id;
393
248k
      queue->pop_front();
394
248k
    }
395
14.2M
  }
void yb::tablet::TransactionParticipant::Impl::CleanTransactionsQueue<std::__1::deque<yb::tablet::TransactionParticipant::Impl::ImmediateCleanupQueueEntry, std::__1::allocator<yb::tablet::TransactionParticipant::Impl::ImmediateCleanupQueueEntry> > >(std::__1::deque<yb::tablet::TransactionParticipant::Impl::ImmediateCleanupQueueEntry, std::__1::allocator<yb::tablet::TransactionParticipant::Impl::ImmediateCleanupQueueEntry> >*, yb::tablet::MinRunningNotifier*)
Line
Count
Source
364
4.27M
      Queue* queue, MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
365
4.27M
    int64_t min_request = running_requests_.empty() ? 
std::numeric_limits<int64_t>::max()3.82M
366
4.27M
                                                    : 
running_requests_.front()457k
;
367
4.27M
    HybridTime safe_time;
368
4.51M
    while (!queue->empty()) {
369
368k
      const auto& front = queue->front();
370
368k
      if (front.request_id >= min_request) {
371
128k
        break;
372
128k
      }
373
239k
      if (!front.Ready(&participant_context_, &safe_time)) {
374
0
        break;
375
0
      }
376
239k
      const auto& id = front.transaction_id;
377
239k
      RemoveIntentsData checkpoint;
378
239k
      auto it = transactions_.find(id);
379
380
239k
      if (it != transactions_.end() && 
!(**it).ProcessingApply()231k
) {
381
231k
        OpId op_id = (**it).GetOpId();
382
231k
        participant_context_.GetLastCDCedData(&checkpoint);
383
231k
        
VLOG_WITH_PREFIX5
(2) << "Cleaning tx opid is " << op_id.ToString()
384
5
                            << " checkpoint opid is " << checkpoint.op_id.ToString();
385
386
231k
        if (checkpoint.op_id < op_id) {
387
380
          break;
388
380
        }
389
230k
        (**it).ScheduleRemoveIntents(*it);
390
230k
        RemoveTransaction(it, front.reason, min_running_notifier);
391
230k
      }
392
18.4E
      VLOG_WITH_PREFIX(2) << "Cleaned from queue: " << id;
393
239k
      queue->pop_front();
394
239k
    }
395
4.27M
  }
396
397
56.1k
  void Abort(const TransactionId& id, TransactionStatusCallback callback) {
398
    // We are not trying to cleanup intents here because we don't know whether this transaction
399
    // has intents of not.
400
56.1k
    auto lock_and_iterator = LockAndFind(
401
56.1k
        id, "abort"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
402
56.1k
    if (!lock_and_iterator.found()) {
403
56
      callback(STATUS_FORMAT(NotFound, "Abort of unknown transaction: $0", id));
404
56
      return;
405
56
    }
406
56.1k
    auto client_result = client();
407
56.1k
    if (!client_result.ok()) {
408
0
      callback(client_result.status());
409
0
      return;
410
0
    }
411
56.1k
    lock_and_iterator.transaction().Abort(
412
56.1k
        *client_result, std::move(callback), &lock_and_iterator.lock);
413
56.1k
  }
414
415
369k
  // Returns the tracked transaction's own CheckAborted() status. An unknown transaction is
  // reported with MakeAbortedStatus(id).
  CHECKED_STATUS CheckAborted(const TransactionId& id) {
    // We are not trying to cleanup intents here because we don't know whether this transaction
    // has intents or not.
    auto lock_and_iterator = LockAndFind(id, "check aborted"s, TransactionLoadFlags{});
    if (lock_and_iterator.found()) {
      return lock_and_iterator.transaction().CheckAborted();
    }
    return MakeAbortedStatus(id);
  }
424
425
  void FillPriorities(
426
146k
      boost::container::small_vector_base<std::pair<TransactionId, uint64_t>>* inout) {
427
    // TODO(dtxn) optimize locking
428
179k
    for (auto& pair : *inout) {
429
179k
      auto lock_and_iterator = LockAndFind(
430
179k
          pair.first, "fill priorities"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
431
179k
      if (!lock_and_iterator.found() || 
lock_and_iterator.transaction().WasAborted()179k
) {
432
981
        pair.second = 0; // Minimal priority for already aborted transactions
433
178k
      } else {
434
178k
        pair.second = lock_and_iterator.transaction().metadata().priority;
435
178k
      }
436
179k
    }
437
146k
  }
438
439
1.36M
  // Dispatches an UpdateTxnOperation on the status it carries:
  //   APPLYING          -> HandleApplying
  //   IMMEDIATE_CLEANUP -> HandleCleanup(kImmediate)
  //   GRACEFUL_CLEANUP  -> HandleCleanup(kGraceful)
  // Any other status is DFATAL-logged and completes the operation with InvalidArgument.
  void Handle(std::unique_ptr<tablet::UpdateTxnOperation> operation, int64_t term) {
    const auto txn_status = operation->request()->status();
    switch (txn_status) {
      case TransactionStatus::APPLYING:
        HandleApplying(std::move(operation), term);
        return;
      case TransactionStatus::IMMEDIATE_CLEANUP:
        HandleCleanup(std::move(operation), term, CleanupType::kImmediate);
        return;
      case TransactionStatus::GRACEFUL_CLEANUP:
        HandleCleanup(std::move(operation), term, CleanupType::kGraceful);
        return;
      default:
        break;
    }

    auto error_status = STATUS_FORMAT(
        InvalidArgument, "Unexpected status in transaction participant Handle: $0", *operation);
    LOG_WITH_PREFIX(DFATAL) << error_status;
    operation->CompleteWithStatus(error_status);
  }
460
461
1.30M
  // Handles a replicated transaction update entry.
  // Decodes the transaction id (a decode failure is returned as-is). APPLYING goes to
  // ReplicatedApplying and ABORTED to ReplicatedAborted. Any other state is DFATAL-logged and
  // returned as InvalidArgument.
  // When the test flag is set, a latency is injected first.
  CHECKED_STATUS ProcessReplicated(const ReplicatedData& data) {
    if (FLAGS_TEST_txn_participant_inject_latency_on_apply_update_txn_ms > 0) {
      SleepFor(1ms * FLAGS_TEST_txn_participant_inject_latency_on_apply_update_txn_ms);
    }

    // VERIFY_RESULT propagates the decode error, matching how PrepareMetadata decodes ids.
    auto id = VERIFY_RESULT(FullyDecodeTransactionId(data.state.transaction_id()));

    const auto txn_status = data.state.status();
    if (txn_status == TransactionStatus::APPLYING) {
      return ReplicatedApplying(id, data);
    }
    if (txn_status == TransactionStatus::ABORTED) {
      return ReplicatedAborted(id, data);
    }

    auto status = STATUS_FORMAT(
        InvalidArgument, "Unexpected status in transaction participant ProcessReplicated: $0, $1",
        data.op_id, data.state);
    LOG_WITH_PREFIX(DFATAL) << status;
    return status;
  }
483
484
0
  // Builds a CleanupAbortsTask for the transactions in `set` and enqueues it on the participant
  // context's strand.
  void Cleanup(TransactionIdSet&& set, TransactionStatusManager* status_manager) {
    auto task = std::make_shared<CleanupAbortsTask>(
        &applier_, std::move(set), &participant_context_, status_manager, LogPrefix());
    // Prepare() receives the task's own shared_ptr.
    task->Prepare(task);
    participant_context_.StrandEnqueue(task.get());
  }
490
491
1.30M
  // Applies the intents of a committed transaction.
  // - If pending_op_counter_ rejects the operation, logs a warning and returns OK.
  // - If the transaction is unknown, sends the "applied" notification anyway and returns OK.
  // - If a local commit time is already recorded, skips the apply (DFATAL if the commit ht
  //   differs).
  // - Otherwise records the commit data, applies the intents, and updates the transaction.
  // NotifyApplied() is called on every path past the operation check.
  CHECKED_STATUS ProcessApply(const TransactionApplyData& data) {
    VLOG_WITH_PREFIX(2) << "Apply: " << data.ToString();

    loader_.WaitLoaded(data.transaction_id);

    ScopedRWOperation operation(pending_op_counter_);
    if (!operation.ok()) {
      LOG_WITH_PREFIX(WARNING) << "Process apply rejected";
      return Status::OK();
    }

    bool already_applied = false;
    {
      // This is the last chance to load missing transaction metadata, because it will be
      // deleted when intents are applied.
      // We are not trying to cleanup intents here because we don't know whether this
      // transaction has intents or not.
      auto lock_and_iterator = LockAndFind(
          data.transaction_id, "pre apply"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
      if (!lock_and_iterator.found()) {
        // This is expected and can happen in two scenarios:
        // 1) The write batch failed, but the originator doesn't know that.
        // 2) We failed to notify the status tablet that we applied the transaction.
        YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1)
            << Format("Apply of unknown transaction: $0", data);
        NotifyApplied(data);
        CHECK(!FLAGS_TEST_fail_in_apply_if_no_metadata);
        return Status::OK();
      }

      auto previous_commit_ht = lock_and_iterator.transaction().local_commit_time();
      if (previous_commit_ht) {
        already_applied = true;
        LOG_WITH_PREFIX(INFO) << "Transaction already applied: " << data.transaction_id;
        LOG_IF_WITH_PREFIX(DFATAL, data.commit_ht != previous_commit_ht)
            << "Transaction was previously applied with another commit ht: " << previous_commit_ht
            << ", new commit ht: " << data.commit_ht;
      } else {
        transactions_.modify(lock_and_iterator.iterator, [&data](auto& txn) {
          txn->SetLocalCommitData(data.commit_ht, data.aborted);
        });

        LOG_IF_WITH_PREFIX(DFATAL, data.log_ht < last_safe_time_)
            << "Apply transaction before last safe time " << data.transaction_id
            << ": " << data.log_ht << " vs " << last_safe_time_;
      }
    }

    if (!already_applied) {
      auto apply_state = CHECK_RESULT(applier_.ApplyIntents(data));

      VLOG_WITH_PREFIX(4) << "TXN: " << data.transaction_id << ": apply state: "
                          << apply_state.ToString();

      UpdateAppliedTransaction(data, apply_state, &operation);
    }

    NotifyApplied(data);
    return Status::OK();
  }
552
553
  void UpdateAppliedTransaction(
554
       const TransactionApplyData& data,
555
       const docdb::ApplyTransactionState& apply_state,
556
1.30M
       ScopedRWOperation* operation) NO_THREAD_SAFETY_ANALYSIS {
557
1.30M
    MinRunningNotifier min_running_notifier(&applier_);
558
    // We are not trying to cleanup intents here because we don't know whether this transaction
559
    // has intents or not.
560
1.30M
    auto lock_and_iterator = LockAndFind(
561
1.30M
        data.transaction_id, "apply"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist});
562
1.30M
    if (lock_and_iterator.found()) {
563
1.30M
      lock_and_iterator.transaction().SetOpId(data.op_id);
564
1.30M
      if (!apply_state.active()) {
565
1.30M
        RemoveUnlocked(lock_and_iterator.iterator, RemoveReason::kApplied, &min_running_notifier);
566
1.30M
      } else {
567
1.08k
        lock_and_iterator.transaction().SetApplyData(apply_state, &data, operation);
568
1.08k
      }
569
1.30M
    }
570
1.30M
  }
571
572
1.30M
  void NotifyApplied(const TransactionApplyData& data) {
573
1.30M
    
VLOG_WITH_PREFIX912
(4) << Format("NotifyApplied($0)", data)912
;
574
575
1.30M
    if (data.leader_term != OpId::kUnknownTerm) {
576
436k
      tserver::UpdateTransactionRequestPB req;
577
436k
      req.set_tablet_id(data.status_tablet);
578
436k
      req.set_propagated_hybrid_time(participant_context_.Now().ToUint64());
579
436k
      auto& state = *req.mutable_state();
580
436k
      state.set_transaction_id(data.transaction_id.data(), data.transaction_id.size());
581
436k
      state.set_status(TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS);
582
436k
      state.add_tablets(participant_context_.tablet_id());
583
436k
      auto client_result = client();
584
436k
      if (!client_result.ok()) {
585
0
        LOG_WITH_PREFIX(WARNING) << "Get client failed: " << client_result.status();
586
0
        return;
587
0
      }
588
589
436k
      auto handle = rpcs_.Prepare();
590
436k
      if (handle != rpcs_.InvalidHandle()) {
591
434k
        *handle = UpdateTransaction(
592
434k
            TransactionRpcDeadline(),
593
434k
            nullptr /* remote_tablet */,
594
434k
            *client_result,
595
434k
            &req,
596
434k
            [this, handle](const Status& status,
597
434k
                           const tserver::UpdateTransactionRequestPB& req,
598
436k
                           const tserver::UpdateTransactionResponsePB& resp) {
599
436k
              client::UpdateClock(resp, &participant_context_);
600
436k
              rpcs_.Unregister(handle);
601
436k
              
LOG_IF_WITH_PREFIX14
(WARNING, !status.ok()) << "Failed to send applied: " << status14
;
602
436k
            });
603
434k
        (**handle).SendRpc();
604
434k
      }
605
436k
    }
606
1.30M
  }
607
608
926k
  // Handles a cleanup request for transaction data.transaction_id. Always returns OK.
  // - Unknown transaction + immediate cleanup: the id goes into the cleanup cache.
  // - Known transaction still processing apply: nothing is done.
  // - Graceful cleanup: an entry is queued in graceful_cleanup_queue_ with the current
  //   request_serial_ and required_safe_time = now.
  // - Otherwise (known transaction, immediate cleanup): RemoveUnlocked with kProcessCleanup.
  CHECKED_STATUS ProcessCleanup(const TransactionApplyData& data, CleanupType cleanup_type) {
    VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(data) << ", " << AsString(cleanup_type);

    loader_.WaitLoaded(data.transaction_id);

    MinRunningNotifier min_running_notifier(&applier_);
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = transactions_.find(data.transaction_id);
    const bool tracked = it != transactions_.end();

    if (!tracked && cleanup_type == CleanupType::kImmediate) {
      cleanup_cache_.Insert(data.transaction_id);
      return Status::OK();
    }
    if (tracked && (**it).ProcessingApply()) {
      VLOG_WITH_PREFIX(2) << "Don't cleanup transaction because it is applying intents: "
                          << data.transaction_id;
      return Status::OK();
    }

    if (cleanup_type == CleanupType::kGraceful) {
      graceful_cleanup_queue_.push_back(GracefulCleanupQueueEntry{
        .request_id = request_serial_,
        .transaction_id = data.transaction_id,
        .reason = RemoveReason::kProcessCleanup,
        .required_safe_time = participant_context_.Now(),
      });
      return Status::OK();
    }

    if (!RemoveUnlocked(it, RemoveReason::kProcessCleanup, &min_running_notifier)) {
      VLOG_WITH_PREFIX(2) << "Have added aborted txn to cleanup queue: "
                          << data.transaction_id;
    }

    return Status::OK();
  }
644
645
  void SetDB(
646
      const docdb::DocDB& db, const docdb::KeyBounds* key_bounds,
647
145k
      RWOperationCounter* pending_op_counter) {
648
145k
    bool had_db = db_.intents != nullptr;
649
145k
    db_ = db;
650
145k
    key_bounds_ = key_bounds;
651
145k
    pending_op_counter_ = pending_op_counter;
652
653
    // We should only load transactions on the initial call to SetDB (when opening the tablet), not
654
    // in case of truncate/restore.
655
145k
    if (!had_db) {
656
56.9k
      loader_.Start(pending_op_counter, db_);
657
56.9k
      return;
658
56.9k
    }
659
660
88.4k
    loader_.WaitAllLoaded();
661
88.4k
    MinRunningNotifier min_running_notifier(&applier_);
662
88.4k
    std::lock_guard<std::mutex> lock(mutex_);
663
88.4k
    transactions_.clear();
664
88.4k
    TransactionsModifiedUnlocked(&min_running_notifier);
665
88.4k
  }
666
667
  void GetStatus(
668
      const TransactionId& transaction_id,
669
      size_t required_num_replicated_batches,
670
      int64_t term,
671
      tserver::GetTransactionStatusAtParticipantResponsePB* response,
672
0
      rpc::RpcContext* context) {
673
0
    std::lock_guard<std::mutex> lock(mutex_);
674
0
    auto it = transactions_.find(transaction_id);
675
0
    if (it == transactions_.end()) {
676
0
      response->set_num_replicated_batches(0);
677
0
      response->set_status_hybrid_time(0);
678
0
    } else {
679
0
      if ((**it).WasAborted()) {
680
0
        response->set_aborted(true);
681
0
        return;
682
0
      }
683
0
      response->set_num_replicated_batches((**it).num_replicated_batches());
684
0
      response->set_status_hybrid_time((**it).last_batch_data().hybrid_time.ToUint64());
685
0
    }
686
0
  }
687
688
48
  // Returns a pointer to the participant context this participant works with.
  TransactionParticipantContext* participant_context() const { return &participant_context_; }
691
692
12.0M
  // Returns the cached start time of the oldest running transaction (min_running_ht_).
  // As a side effect, at most once per FLAGS_transaction_min_running_check_interval_ms (whichever
  // caller wins the CAS on next_check_min_running_), it asks for the status of the oldest running
  // transaction; the response itself is only logged here. If that check finds no running
  // transactions, kMax is returned instead of the cached value.
  HybridTime MinRunningHybridTime() {
    const auto min_running = min_running_ht_.load(std::memory_order_acquire);
    if (min_running == HybridTime::kMax || min_running == HybridTime::kInvalid) {
      return min_running;
    }

    const auto now = CoarseMonoClock::now();
    auto scheduled_check = next_check_min_running_.load(std::memory_order_relaxed);
    if (now < scheduled_check) {
      return min_running;
    }
    const auto following_check = now + 1ms * FLAGS_transaction_min_running_check_interval_ms;
    if (!next_check_min_running_.compare_exchange_strong(
            scheduled_check, following_check, std::memory_order_acq_rel)) {
      // Another caller already took this check slot.
      return min_running;
    }

    std::unique_lock<std::mutex> lock(mutex_);
    if (transactions_.empty()) {
      return HybridTime::kMax;
    }
    auto& oldest_txn = **transactions_.get<StartTimeTag>().begin();
    VLOG_WITH_PREFIX(1) << "Checking status of long running min txn " << oldest_txn.id()
                        << ": " << oldest_txn.WasAborted();
    static const std::string kRequestReason = "min running check"s;
    const auto now_ht = participant_context_.Now();
    StatusRequest status_request = {
        .id = &oldest_txn.id(),
        .read_ht = now_ht,
        .global_limit_ht = now_ht,
        // read_ht == global_limit_ht, so no status with time in [read_ht, global_limit_ht) can
        // be accepted; serial_no 0 is therefore fine.
        .serial_no = 0,
        .reason = &kRequestReason,
        .flags = TransactionLoadFlags{},
        .callback = [this, id = oldest_txn.id()](Result<TransactionStatusResult> result) {
          // An aborted status leads to intents cleanup elsewhere; here it is only logged.
          VLOG_WITH_PREFIX(1) << "Min running status " << id << ": " << result;
        }
    };
    oldest_txn.RequestStatusAt(status_request, &lock);
    return min_running;
  }
733
734
3.53k
  // Registers `ht` as the hybrid time being waited for and immediately checks whether the current
  // minimum running hybrid time already satisfies it.
  void WaitMinRunningHybridTime(HybridTime ht) {
    MinRunningNotifier notifier(&applier_);
    std::lock_guard<std::mutex> guard(mutex_);
    waiting_for_min_running_ht_ = ht;
    CheckMinRunningHybridTimeSatisfiedUnlocked(&notifier);
  }
740
741
15
  // Waits for the clock to reach `resolve_at`, then repeatedly queries status tablets about
  // transactions without a local commit time until none remains unresolved at `resolve_at`:
  //  - ABORTED transactions are enqueued for removal;
  //  - transactions COMMITTED at or before resolve_at are polled (with a short sleep) until they
  //    disappear from transactions_ or obtain a local commit time;
  //  - other (expected PENDING) transactions with status time <= resolve_at are queried again.
  // Returns IllegalState when FLAGS_max_transactions_in_status_request is 0, otherwise the first
  // error from waiting for the clock or from a resolution round.
  CHECKED_STATUS ResolveIntents(HybridTime resolve_at, CoarseTimePoint deadline) {
    RETURN_NOT_OK(WaitUntil(participant_context_.clock_ptr().get(), resolve_at, deadline));

    if (FLAGS_max_transactions_in_status_request == 0) {
      return STATUS(
          IllegalState,
          "Cannot resolve intents when FLAGS_max_transactions_in_status_request is zero");
    }

    // Transactions to query again in the next round.
    std::vector<TransactionId> recheck_ids;
    // Transactions committed at or before resolve_at that may not be applied locally yet.
    std::vector<TransactionId> committed_ids;

    // Sorts a batch of status responses into the vectors above and removes aborted transactions.
    auto handle_statuses = [this, resolve_at, &recheck_ids, &committed_ids](
        const std::vector<TransactionStatusInfo>& status_infos) {
      std::vector<TransactionId> aborted;
      for (const auto& info : status_infos) {
        VLOG_WITH_PREFIX(4) << "Transaction status: " << info.ToString();
        if (info.status == TransactionStatus::ABORTED) {
          aborted.push_back(info.transaction_id);
          continue;
        }
        const bool at_or_before_resolve = info.status_ht <= resolve_at;
        if (info.status == TransactionStatus::COMMITTED) {
          // Committed but possibly not applied yet; committed_ids is filtered before the next
          // round.
          if (at_or_before_resolve) {
            committed_ids.push_back(info.transaction_id);
          }
          continue;
        }
        LOG_IF_WITH_PREFIX(DFATAL, info.status != TransactionStatus::PENDING)
            << "Transaction is in unexpected state: " << info.ToString();
        if (at_or_before_resolve) {
          recheck_ids.push_back(info.transaction_id);
        }
      }
      if (aborted.empty()) {
        return;
      }
      MinRunningNotifier min_running_notifier(&applier_);
      std::lock_guard<std::mutex> lock(mutex_);
      for (const auto& id : aborted) {
        EnqueueRemoveUnlocked(id, RemoveReason::kStatusReceived, &min_running_notifier);
      }
    };

    for (;;) {
      TransactionStatusResolver resolver(
          &participant_context_, &rpcs_, FLAGS_max_transactions_in_status_request,
          handle_statuses);
      auto shutdown_resolver = ScopeExit([&resolver] {
        resolver.Shutdown();
      });

      {
        std::lock_guard<std::mutex> lock(mutex_);
        const bool nothing_tracked = recheck_ids.empty() && committed_ids.empty();
        if (nothing_tracked) {
          // First round: query every transaction that has not been committed locally.
          for (const auto& txn : transactions_) {
            if (!txn->local_commit_time().is_valid()) {
              resolver.Add(txn->metadata().status_tablet, txn->id());
            }
          }
        } else {
          for (const auto& id : recheck_ids) {
            const auto it = transactions_.find(id);
            if (it != transactions_.end() && !(**it).local_commit_time().is_valid()) {
              resolver.Add((**it).metadata().status_tablet, id);
            }
          }
          // Forget committed transactions that are gone or already committed locally.
          auto settled = [this](const TransactionId& id) {
            const auto it = transactions_.find(id);
            return it == transactions_.end() || (**it).local_commit_time().is_valid();
          };
          committed_ids.erase(
              std::remove_if(committed_ids.begin(), committed_ids.end(), settled),
              committed_ids.end());
        }
      }

      recheck_ids.clear();
      resolver.Start(deadline);

      RETURN_NOT_OK(resolver.ResultFuture().get());

      if (!recheck_ids.empty()) {
        continue;
      }
      if (committed_ids.empty()) {
        break;
      }
      // Only committed-but-not-yet-applied transactions remain, so just give them some time.
      std::this_thread::sleep_for(10ms * std::min<size_t>(10, committed_ids.size()));
    }

    return Status::OK();
  }
834
835
0
  size_t TEST_GetNumRunningTransactions() {
836
0
    std::lock_guard<std::mutex> lock(mutex_);
837
0
    auto txn_to_id = [](const RunningTransactionPtr& txn) {
838
0
      return txn->id();
839
0
    };
840
0
    VLOG_WITH_PREFIX(4) << "Transactions: " << AsString(transactions_, txn_to_id)
841
0
                        << ", requests: " << AsString(running_requests_);
842
0
    return transactions_.size();
843
0
  }
844
845
0
  // Test helper: returns a copy of the replicated-batches bitmap of transaction `id`, or an empty
  // bitmap when the transaction is not running.
  OneWayBitmap TEST_TransactionReplicatedBatches(const TransactionId& id) {
    std::lock_guard<std::mutex> guard(mutex_);
    const auto found = transactions_.find(id);
    if (found == transactions_.end()) {
      return OneWayBitmap();
    }
    return (**found).replicated_batches();
  }
850
851
0
  std::string DumpTransactions() {
852
0
    std::string result;
853
0
    std::lock_guard<std::mutex> lock(mutex_);
854
855
0
    result += Format(
856
0
        "{ safe_time_for_participant: $0 remove_queue_size: $1 ",
857
0
        participant_context_.SafeTimeForTransactionParticipant(), remove_queue_.size());
858
0
    if (!remove_queue_.empty()) {
859
0
      result += "remove_queue_front: " + AsString(remove_queue_.front());
860
0
    }
861
0
    if (!running_requests_.empty()) {
862
0
      result += "running_requests_front: " + AsString(running_requests_.front());
863
0
    }
864
0
    result += "}\n";
865
866
0
    for (const auto& txn : transactions_.get<StartTimeTag>()) {
867
0
      result += txn->ToString();
868
0
      result += "\n";
869
0
    }
870
0
    return result;
871
0
  }
872
873
  // Aborts running transactions in start-time order, skipping ones already aborted. The walk stops
  // at the first transaction that started after `cutoff` or that is `*exclude_txn_id` (when
  // non-null). The function then waits until every abort request has completed or `deadline`
  // passes.
  //
  // Returns OK when there was nothing to abort or every request resolved to COMMITTED/ABORTED.
  // Returns the first failure otherwise, or TimedOut if the deadline was reached first.
  CHECKED_STATUS StopActiveTxnsPriorTo(
      HybridTime cutoff, CoarseTimePoint deadline, TransactionId* exclude_txn_id) {
    std::vector<TransactionId> ids_to_abort;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      for (const auto& txn : transactions_.get<StartTimeTag>()) {
        if (txn->start_ht() > cutoff ||
            (exclude_txn_id != nullptr && txn->id() == *exclude_txn_id)) {
          break;
        }
        if (!txn->WasAborted()) {
          ids_to_abort.push_back(txn->id());
        }
      }
    }

    if (ids_to_abort.empty()) {
      return Status::OK();
    }

    // Abort callbacks may run after this function has returned (when the wait below times out).
    // Everything they touch is therefore shared-owned by the callbacks rather than kept on this
    // stack frame, which would otherwise be destroyed underneath them.
    struct AbortState {
      explicit AbortState(size_t num_requests) : latch(num_requests) {}

      CountDownLatch latch;
      std::atomic<bool> failed{false};
      Status return_status = Status::OK();
    };
    auto state = std::make_shared<AbortState>(ids_to_abort.size());

    // It is ok to attempt to abort txns that have committed. We don't care
    // if our request succeeds or not.
    for (const auto& id : ids_to_abort) {
      Abort(id, [this, id, state](Result<TransactionStatusResult> result) {
        VLOG_WITH_PREFIX(2) << "Aborting " << id << " got " << result;
        if (!result ||
            (result->status != TransactionStatus::COMMITTED &&
             result->status != TransactionStatus::ABORTED)) {
          LOG(INFO) << "Could not abort " << id << " got " << result;

          // Only the first failing callback records its status.
          bool expected = false;
          if (state->failed.compare_exchange_strong(expected, true)) {
            if (!result) {
              state->return_status = result.status();
            } else {
              state->return_status =
                  STATUS_FORMAT(IllegalState, "Wrong status after abort: $0", result->status);
            }
          }
        }
        state->latch.CountDown();
      });
    }

    // return_status is only read after every callback has counted down.
    return state->latch.WaitUntil(deadline)
        ? state->return_status
        : STATUS(TimedOut, "TimedOut while aborting old transactions");
  }
923
924
26.6k
  // Delegates to participant_context_.WaitForSafeTime() with the same arguments.
  Result<HybridTime> WaitForSafeTime(
      HybridTime safe_time, CoarseTimePoint deadline) {
    auto& context = participant_context_;
    return context.WaitForSafeTime(safe_time, deadline);
  }
927
928
0
  // Raises, and never lowers, the start-time threshold used by LockAndFind(). Transactions that
  // started at or before this threshold are ignored there.
  void IgnoreAllTransactionsStartedBefore(HybridTime limit) {
    std::lock_guard<std::mutex> guard(mutex_);
    if (ignore_all_transactions_started_before_ < limit) {
      ignore_all_transactions_started_before_ = limit;
    }
  }
933
934
 private:
935
  class AbortCheckTimeTag;
936
  class StartTimeTag;
937
938
  typedef boost::multi_index_container<RunningTransactionPtr,
939
      boost::multi_index::indexed_by <
940
          boost::multi_index::hashed_unique <
941
              boost::multi_index::const_mem_fun <
942
                  RunningTransaction, const TransactionId&, &RunningTransaction::id>
943
          >,
944
          boost::multi_index::ordered_non_unique <
945
              boost::multi_index::tag<StartTimeTag>,
946
              boost::multi_index::const_mem_fun <
947
                  RunningTransaction, HybridTime, &RunningTransaction::start_ht>
948
          >,
949
          boost::multi_index::ordered_non_unique <
950
              boost::multi_index::tag<AbortCheckTimeTag>,
951
              boost::multi_index::const_mem_fun <
952
                  RunningTransaction, HybridTime, &RunningTransaction::abort_check_ht>
953
          >
954
      >
955
  > Transactions;
956
957
56.9k
  void CompleteLoad(const std::function<void()>& functor) override {
958
56.9k
    MinRunningNotifier min_running_notifier(&applier_);
959
56.9k
    std::lock_guard<std::mutex> lock(mutex_);
960
56.9k
    functor();
961
56.9k
    TransactionsModifiedUnlocked(&min_running_notifier);
962
56.9k
  }
963
964
56.9k
  void LoadFinished(const ApplyStatesMap& pending_applies) override {
965
56.9k
    start_latch_.Wait();
966
56.9k
    std::vector<ScopedRWOperation> operations;
967
56.9k
    operations.reserve(pending_applies.size());
968
56.9k
    for (;;) {
969
56.9k
      if (closing_.load(std::memory_order_acquire)) {
970
0
        LOG_WITH_PREFIX(INFO)
971
0
            << __func__ << ": closing, not starting transaction status resolution";
972
0
        return;
973
0
      }
974
56.9k
      while (operations.size() < pending_applies.size()) {
975
0
        ScopedRWOperation operation(pending_op_counter_);
976
0
        if (!operation.ok()) {
977
0
          break;
978
0
        }
979
0
        operations.push_back(std::move(operation));
980
0
      }
981
56.9k
      if (operations.size() == pending_applies.size()) {
982
56.9k
        break;
983
56.9k
      }
984
6
      operations.clear();
985
6
      
YB_LOG_WITH_PREFIX_EVERY_N_SECS0
(INFO, 5)
986
0
          << __func__ << ": unable to start scoped RW operation";
987
6
      std::this_thread::sleep_for(10ms);
988
6
    }
989
990
56.9k
    if (!pending_applies.empty()) {
991
0
      LOG_WITH_PREFIX(INFO)
992
0
          << __func__ << ": starting " << pending_applies.size() << " pending applies";
993
0
      std::lock_guard<std::mutex> lock(mutex_);
994
0
      size_t idx = 0;
995
0
      for (const auto& p : pending_applies) {
996
0
        auto it = transactions_.find(p.first);
997
0
        if (it == transactions_.end()) {
998
0
          LOG_WITH_PREFIX(INFO) << "Unknown transaction for pending apply: " << AsString(p.first);
999
0
          continue;
1000
0
        }
1001
1002
0
        TransactionApplyData apply_data;
1003
0
        apply_data.transaction_id = p.first;
1004
0
        apply_data.commit_ht = p.second.commit_ht;
1005
0
        (**it).SetApplyData(p.second.state, &apply_data, &operations[idx]);
1006
0
        ++idx;
1007
0
      }
1008
0
    }
1009
1010
56.9k
    {
1011
56.9k
      LOG_WITH_PREFIX(INFO) << __func__ << ": starting transaction status resolution";
1012
56.9k
      std::lock_guard<std::mutex> lock(status_resolvers_mutex_);
1013
56.9k
      for (auto& status_resolver : status_resolvers_) {
1014
4
        status_resolver.Start(CoarseTimePoint::max());
1015
4
      }
1016
56.9k
    }
1017
1018
56.9k
    poller_.Start(
1019
56.9k
        &participant_context_.scheduler(), 1ms * FLAGS_transactions_status_poll_interval_ms);
1020
56.9k
  }
1021
1022
3.53M
  // Must be called after transactions_ changes. It always updates the running-transactions metric.
  // Once the loader is complete, it recomputes min_running_ht_: kMax when there are no
  // transactions, otherwise the start time of the oldest one. Whenever the oldest start time
  // changes, the next long-running check is pushed out by
  // FLAGS_transaction_min_running_check_delay_ms. In both recompute cases it then re-evaluates the
  // pending WaitMinRunningHybridTime() target.
  void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
    metric_transactions_running_->set_value(transactions_.size());
    if (!loader_.complete()) {
      return;
    }

    if (transactions_.empty()) {
      min_running_ht_.store(HybridTime::kMax, std::memory_order_release);
    } else {
      const auto oldest_start = (**transactions_.get<StartTimeTag>().begin()).start_ht();
      if (oldest_start == min_running_ht_.load(std::memory_order_relaxed)) {
        return;
      }
      min_running_ht_.store(oldest_start, std::memory_order_release);
      next_check_min_running_.store(
          CoarseMonoClock::now() + 1ms * FLAGS_transaction_min_running_check_delay_ms,
          std::memory_order_release);
    }
    CheckMinRunningHybridTimeSatisfiedUnlocked(min_running_notifier);
  }
1044
1045
  void EnqueueRemoveUnlocked(
1046
      const TransactionId& id, RemoveReason reason,
1047
76.0k
      MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) override {
1048
76.0k
    auto now = participant_context_.Now();
1049
76.0k
    
VLOG_WITH_PREFIX_AND_FUNC0
(4) << id << " at " << now << ", reason: " << AsString(reason)0
;
1050
76.0k
    remove_queue_.emplace_back(RemoveQueueEntry{
1051
76.0k
      .id = id,
1052
76.0k
      .time = now,
1053
76.0k
      .reason = reason,
1054
76.0k
    });
1055
76.0k
    ProcessRemoveQueueUnlocked(min_running_notifier);
1056
76.0k
  }
1057
1058
14.3M
  // Drains remove_queue_ as far as the participant safe time allows.
  //
  // A transaction lands in this queue, together with the hybrid time at which it was enqueued,
  // when the coordinator reports it as aborted. An entry is removed only once the safe time has
  // passed that hybrid time. This is correct from the participant's point of view because:
  //  (1) after the status response arrives, hybrid-time propagation on the RPC response
  //      guarantees our local hybrid time is at least the coordinator's hybrid time at the moment
  //      it dropped the transaction;
  //  (2) once our safe time exceeds that time, any provisional records of a committed
  //      transaction have already been applied here.
  void ProcessRemoveQueueUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
    if (remove_queue_.empty()) {
      return;
    }

    const auto safe_time = participant_context_.SafeTimeForTransactionParticipant();
    if (!safe_time.is_valid()) {
      VLOG_WITH_PREFIX(3) << "Unable to obtain safe time to check remove queue";
      return;
    }
    VLOG_WITH_PREFIX(3) << "Checking remove queue: " << safe_time << ", "
                        << remove_queue_.front().ToString();
    LOG_IF_WITH_PREFIX(DFATAL, safe_time < last_safe_time_)
        << "Safe time decreased: " << safe_time << " vs " << last_safe_time_;
    last_safe_time_ = safe_time;

    while (!remove_queue_.empty()) {
      const auto& entry = remove_queue_.front();
      const auto txn_it = transactions_.find(entry.id);
      const bool needs_intent_removal =
          txn_it != transactions_.end() && !(**txn_it).local_commit_time().is_valid();
      if (!needs_intent_removal) {
        // Normal case: the coordinator answers ABORTED for transactions it already applied,
        // while this tablet may not have applied them yet; such entries end up here. This is
        // also why we wait for safe time before removing intents.
        VLOG_WITH_PREFIX(4) << "Evicting txn from remove queue, w/o removing intents: "
                            << entry.ToString();
        remove_queue_.pop_front();
        continue;
      }
      if (safe_time <= entry.time) {
        break;
      }
      VLOG_WITH_PREFIX(4) << "Removing from remove queue: " << entry.ToString();
      RemoveUnlocked(entry.id, entry.reason, min_running_notifier);
      remove_queue_.pop_front();
    }
  }
1108
1109
  // Tries to remove transaction with specified id.
1110
  // Returns true if transaction is not exists after call to this method, otherwise returns false.
1111
  // Which means that transaction will be removed later.
1112
  bool RemoveUnlocked(
1113
      const TransactionId& id, RemoveReason reason,
1114
73.6k
      MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) override {
1115
73.6k
    auto it = transactions_.find(id);
1116
73.6k
    if (it == transactions_.end()) {
1117
0
      return true;
1118
0
    }
1119
73.6k
    return RemoveUnlocked(it, reason, min_running_notifier);
1120
73.6k
  }
1121
1122
  bool RemoveUnlocked(
1123
      const Transactions::iterator& it, RemoveReason reason,
1124
1.49M
      MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
1125
1.49M
    TransactionId txn_id = (**it).id();
1126
1.49M
    RemoveIntentsData checkpoint;
1127
1.49M
    auto itr = transactions_.find(txn_id);
1128
1.49M
    OpId op_id = (**itr).GetOpId();
1129
1.49M
    participant_context_.GetLastCDCedData(&checkpoint);
1130
1131
1.49M
    
VLOG_WITH_PREFIX818
(2) << "Cleaning tx, data opid is " << op_id.ToString()
1132
818
              << " checkpoint opid is " << checkpoint.op_id.ToString();
1133
1134
1.49M
    if (running_requests_.empty() &&
1135
1.49M
        
(op_id < checkpoint.op_id)1.25M
) {
1136
1.25M
      (**it).ScheduleRemoveIntents(*it);
1137
1.25M
      RemoveTransaction(it, reason, min_running_notifier);
1138
18.4E
      VLOG_WITH_PREFIX(2) << "Cleaned transaction: " << txn_id << ", reason: " << reason
1139
18.4E
                          << ", left: " << transactions_.size();
1140
1.25M
      return true;
1141
1.25M
    }
1142
1143
    // We cannot remove the transaction at this point, because there are running requests
1144
    // that are reading the provisional DB and could request status of this transaction.
1145
    // So we store transaction in a queue and wait when all requests that we launched before our
1146
    // attempt to remove this transaction are completed.
1147
    // Since we try to remove the transaction after all its records are removed from the provisional
1148
    // DB, it is safe to complete removal at this point, because it means that there will be no more
1149
    // queries to status of this transactions.
1150
241k
    immediate_cleanup_queue_.push_back(ImmediateCleanupQueueEntry{
1151
241k
      .request_id = request_serial_,
1152
241k
      .transaction_id = (**it).id(),
1153
241k
      .reason = reason,
1154
241k
    });
1155
241k
    
VLOG_WITH_PREFIX1.55k
(2) << "Queued for cleanup: " << (**it).id() << ", reason: " << reason1.55k
;
1156
241k
    return false;
1157
1.49M
  }
1158
1159
  struct LockAndFindResult {
1160
18.5k
    static Transactions::const_iterator UninitializedIterator() {
1161
18.5k
      static const Transactions empty_transactions;
1162
18.5k
      return empty_transactions.end();
1163
18.5k
    }
1164
1165
    std::unique_lock<std::mutex> lock;
1166
    Transactions::const_iterator iterator = UninitializedIterator();
1167
    bool recently_removed = false;
1168
1169
6.78M
    bool found() const {
1170
6.78M
      return lock.owns_lock();
1171
6.78M
    }
1172
1173
7.80M
    RunningTransaction& transaction() const {
1174
7.80M
      return **iterator;
1175
7.80M
    }
1176
  };
1177
1178
  // Looks up transaction `id`, first waiting until the loader has loaded it.
  //  - Found, and started after ignore_all_transactions_started_before_: returns a result that
  //    keeps mutex_ locked and points at the transaction.
  //  - Found, but started at or before that limit: logs (rate limited) and returns an empty
  //    result.
  //  - Not found but recently removed: returns a result with recently_removed == true.
  //  - Otherwise: increments metric_transaction_not_found_ and logs (WARNING with kMustExist,
  //    INFO otherwise). With kCleanup it also schedules a CleanupIntentsTask for `id`. Returns an
  //    empty result.
  LockAndFindResult LockAndFind(
      const TransactionId& id, const std::string& reason, TransactionLoadFlags flags) {
    loader_.WaitLoaded(id);
    bool recently_removed = false;
    {
      std::unique_lock<std::mutex> lock(mutex_);
      const auto it = transactions_.find(id);
      if (it == transactions_.end()) {
        recently_removed = WasTransactionRecentlyRemoved(id);
      } else if ((**it).start_ht() <= ignore_all_transactions_started_before_) {
        YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 1)
            << "Ignore transaction for '" << reason << "' because of limit: "
            << ignore_all_transactions_started_before_ << ", txn: " << AsString(**it);
        return LockAndFindResult{};
      } else {
        // Ownership of the lock moves into the result.
        return LockAndFindResult{ std::move(lock), it };
      }
    }

    if (recently_removed) {
      VLOG_WITH_PREFIX(1)
          << "Attempt to load recently removed transaction: " << id << ", for: " << reason;
      LockAndFindResult removed;
      removed.recently_removed = true;
      return removed;
    }

    metric_transaction_not_found_->Increment();
    if (flags.Test(TransactionLoadFlag::kMustExist)) {
      YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1)
          << "Transaction not found: " << id << ", for: " << reason;
    } else {
      YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 1)
          << "Transaction not found: " << id << ", for: " << reason;
    }

    if (flags.Test(TransactionLoadFlag::kCleanup)) {
      VLOG_WITH_PREFIX(2) << "Schedule cleanup for: " << id;
      auto task = std::make_shared<CleanupIntentsTask>(&participant_context_, &applier_, id);
      task->Prepare(task);
      participant_context_.StrandEnqueue(task.get());
    }
    return LockAndFindResult{};
  }
1220
1221
  void LoadTransaction(
1222
      TransactionMetadata&& metadata,
1223
      TransactionalBatchData&& last_batch_data,
1224
      OneWayBitmap&& replicated_batches,
1225
4
      const ApplyStateWithCommitHt* pending_apply) override {
1226
4
    MinRunningNotifier min_running_notifier(&applier_);
1227
4
    std::lock_guard<std::mutex> lock(mutex_);
1228
4
    auto txn = std::make_shared<RunningTransaction>(
1229
4
        std::move(metadata), std::move(last_batch_data), std::move(replicated_batches),
1230
4
        participant_context_.Now().AddDelta(1ms * FLAGS_transaction_abort_check_timeout_ms), this);
1231
4
    if (pending_apply) {
1232
0
      VLOG_WITH_PREFIX(4) << "Apply state found for " << txn->id() << ": "
1233
0
                          << pending_apply->ToString();
1234
0
      txn->SetLocalCommitData(pending_apply->commit_ht, pending_apply->state.aborted);
1235
0
      txn->SetApplyData(pending_apply->state);
1236
0
    }
1237
4
    transactions_.insert(txn);
1238
4
    TransactionsModifiedUnlocked(&min_running_notifier);
1239
4
  }
1240
1241
492k
  // Returns the YBClient. After the first successful fetch the value is cached in client_cache_.
  // Until then it waits up to TransactionRpcTimeout() on the context's client future and returns
  // TimedOut if the future is still not ready.
  Result<client::YBClient*> client() const {
    if (auto cached = client_cache_.load(std::memory_order_acquire); cached != nullptr) {
      return cached;
    }

    const auto wait_result = participant_context_.client_future().wait_for(
        TransactionRpcTimeout().ToSteadyDuration());
    if (wait_result != std::future_status::ready) {
      return STATUS(TimedOut, "Client not ready");
    }

    auto fetched = participant_context_.client_future().get();
    client_cache_.store(fetched, std::memory_order_release);
    return fetched;
  }
1255
1256
329k
  const std::string& LogPrefix() const override {
1257
329k
    return log_prefix_;
1258
329k
  }
1259
1260
  void RemoveTransaction(Transactions::iterator it, RemoveReason reason,
1261
                         MinRunningNotifier* min_running_notifier)
1262
1.67M
      REQUIRES(mutex_) {
1263
1.67M
    auto now = CoarseMonoClock::now();
1264
1.67M
    CleanupRecentlyRemovedTransactions(now);
1265
1.67M
    auto& transaction = **it;
1266
1.67M
    YB_TRANSACTION_DUMP(
1267
1.67M
        Remove, participant_context_.tablet_id(), transaction.id(), participant_context_.Now(),
1268
1.67M
        static_cast<uint8_t>(reason));
1269
1.67M
    recently_removed_transactions_cleanup_queue_.push_back({transaction.id(), now + 15s});
1270
1.67M
    
LOG_IF_WITH_PREFIX473
(DFATAL, !recently_removed_transactions_.insert(transaction.id()).second)
1271
473
        << "Transaction removed twice: " << transaction.id();
1272
1.67M
    
VLOG_WITH_PREFIX984
(4) << "Remove transaction: " << transaction.id()984
;
1273
1.67M
    transactions_.erase(it);
1274
1.67M
    TransactionsModifiedUnlocked(min_running_notifier);
1275
1.67M
  }
1276
1277
4.23M
  // Forgets "recently removed" ids whose retention time is at or before `now`. Queue entries are
  // appended in time order, so the scan stops at the first entry that has not expired.
  void CleanupRecentlyRemovedTransactions(CoarseTimePoint now) {
    auto& expiry_queue = recently_removed_transactions_cleanup_queue_;
    for (; !expiry_queue.empty(); expiry_queue.pop_front()) {
      const auto& entry = expiry_queue.front();
      if (entry.time > now) {
        break;
      }
      recently_removed_transactions_.erase(entry.id);
    }
  }
1284
1285
2.56M
  bool WasTransactionRecentlyRemoved(const TransactionId& id) {
1286
2.56M
    CleanupRecentlyRemovedTransactions(CoarseMonoClock::now());
1287
2.56M
    return recently_removed_transactions_.count(id) != 0;
1288
2.56M
  }
1289
1290
  void CheckMinRunningHybridTimeSatisfiedUnlocked(
1291
2.05M
      MinRunningNotifier* min_running_notifier) {
1292
2.05M
    if (
min_running_ht_.load(std::memory_order_acquire) <= waiting_for_min_running_ht_2.05M
) {
1293
2.05M
      return;
1294
2.05M
    }
1295
18.4E
    waiting_for_min_running_ht_ = HybridTime::kMax;
1296
18.4E
    min_running_notifier->Satisfied();
1297
18.4E
  }
1298
1299
  void TransactionsStatus(
1300
1.70k
      const std::vector<TransactionStatusInfo>& status_infos) {
1301
1.70k
    MinRunningNotifier min_running_notifier(&applier_);
1302
1.70k
    std::lock_guard<std::mutex> lock(mutex_);
1303
1.70k
    HybridTime now = participant_context_.Now();
1304
1.74k
    for (const auto& info : status_infos) {
1305
1.74k
      auto it = transactions_.find(info.transaction_id);
1306
1.74k
      if (it == transactions_.end()) {
1307
8
        continue;
1308
8
      }
1309
1.73k
      if ((**it).UpdateStatus(
1310
1.73k
          info.status, info.status_ht, info.coordinator_safe_time, info.aborted_subtxn_set)) {
1311
58
        EnqueueRemoveUnlocked(
1312
58
            info.transaction_id, RemoveReason::kStatusReceived, &min_running_notifier);
1313
1.67k
      } else {
1314
1.67k
        transactions_.modify(it, [now](const auto& txn) {
1315
1.67k
          txn->UpdateAbortCheckHT(now, UpdateAbortCheckHTMode::kStatusResponseReceived);
1316
1.67k
        });
1317
1.67k
      }
1318
1.73k
    }
1319
1.70k
  }
1320
1321
435k
  // Submits an APPLYING update for replication. For testing,
  // FLAGS_TEST_transaction_ignore_applying_probability may instead drop the request: in that
  // case the operation is completed with Status::OK() and nothing is submitted.
  void HandleApplying(std::unique_ptr<tablet::UpdateTxnOperation> operation, int64_t term) {
    if (!RandomActWithProbability(GetAtomicFlag(
        &FLAGS_TEST_transaction_ignore_applying_probability))) {
      participant_context_.SubmitUpdateTransaction(std::move(operation), term);
      return;
    }
    VLOG_WITH_PREFIX(2)
        << "TEST: Rejected apply: "
        << FullyDecodeTransactionId(operation->request()->transaction_id());
    operation->CompleteWithStatus(Status::OK());
  }
1332
1333
  void HandleCleanup(
1334
      std::unique_ptr<tablet::UpdateTxnOperation> operation, int64_t term,
1335
926k
      CleanupType cleanup_type) {
1336
926k
    
VLOG_WITH_PREFIX330
(3) << "Cleanup"330
;
1337
926k
    auto id = FullyDecodeTransactionId(operation->request()->transaction_id());
1338
926k
    if (!id.ok()) {
1339
0
      operation->CompleteWithStatus(id.status());
1340
0
      return;
1341
0
    }
1342
1343
926k
    TransactionApplyData data = {
1344
926k
        .leader_term = term,
1345
926k
        .transaction_id = *id,
1346
926k
        .aborted = AbortedSubTransactionSet(),
1347
926k
        .op_id = OpId(),
1348
926k
        .commit_ht = HybridTime(),
1349
926k
        .log_ht = HybridTime(),
1350
926k
        .sealed = operation->request()->sealed(),
1351
926k
        .status_tablet = std::string()
1352
926k
    };
1353
926k
    WARN_NOT_OK(ProcessCleanup(data, cleanup_type), "Process cleanup failed");
1354
926k
    operation->CompleteWithStatus(Status::OK());
1355
926k
  }
1356
1357
1.30M
  // Processes a replicated APPLYING record for transaction `id`.
  //
  // Returns InvalidArgument unless data.state lists exactly one tablet (the status tablet).
  // When data.already_applied_to_regular_db is false, the apply is processed via ProcessApply().
  // Otherwise, an unsealed record triggers an immediate cleanup, and a sealed one is a no-op.
  CHECKED_STATUS ReplicatedApplying(const TransactionId& id, const ReplicatedData& data) {
    // data.state.tablets contains only the status tablet.
    if (data.state.tablets_size() != 1) {
      return STATUS_FORMAT(InvalidArgument,
                           "Expected only one tablet during APPLYING, state received: $0",
                           data.state);
    }
    HybridTime commit_time(data.state.commit_hybrid_time());
    TransactionApplyData apply_data = {
        .leader_term = data.leader_term,
        .transaction_id = id,
        .aborted = VERIFY_RESULT(AbortedSubTransactionSet::FromPB(data.state.aborted().set())),
        .op_id = data.op_id,
        .commit_ht = commit_time,
        .log_ht = data.hybrid_time,
        .sealed = data.sealed,
        .status_tablet = data.state.tablets(0)
      };
    if (!data.already_applied_to_regular_db) {
      return ProcessApply(apply_data);
    }
    if (!data.sealed) {
      return ProcessCleanup(apply_data, CleanupType::kImmediate);
    }
    return Status::OK();
  }
1383
1384
0
  // Marks transaction `id` as aborted after an ABORTED record was replicated.
  //
  // If the transaction is unknown, a placeholder RunningTransaction is inserted first, with
  // NON_TRANSACTIONAL isolation, an empty status tablet, priority 0 and HybridTime::kMax. The
  // change is then reported via TransactionsModifiedUnlocked(). `data` is not used.
  CHECKED_STATUS ReplicatedAborted(const TransactionId& id, const ReplicatedData& data) {
    MinRunningNotifier min_running_notifier(&applier_);
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = transactions_.find(id);
    if (it == transactions_.end()) {
      TransactionMetadata placeholder = {
        .transaction_id = id,
        .isolation = IsolationLevel::NON_TRANSACTIONAL,
        .status_tablet = TabletId(),
        .priority = 0
      };
      auto running = std::make_shared<RunningTransaction>(
          placeholder, TransactionalBatchData(), OneWayBitmap(), HybridTime::kMax, this);
      it = transactions_.insert(running).first;
      TransactionsModifiedUnlocked(&min_running_notifier);
    }

    // TODO(dtxn) store this fact to rocksdb.
    (**it).Aborted();
    return Status::OK();
  }
1405
1406
10.0M
  void Poll() {
1407
10.0M
    {
1408
10.0M
      MinRunningNotifier min_running_notifier(&applier_);
1409
10.0M
      std::lock_guard<std::mutex> lock(mutex_);
1410
1411
10.0M
      ProcessRemoveQueueUnlocked(&min_running_notifier);
1412
10.0M
      if (ANNOTATE_UNPROTECTED_READ(FLAGS_transactions_poll_check_aborted)) {
1413
9.94M
        CheckForAbortedTransactions();
1414
9.94M
      }
1415
10.0M
      CleanTransactionsQueue(&graceful_cleanup_queue_, &min_running_notifier);
1416
10.0M
    }
1417
10.0M
    CleanupStatusResolvers();
1418
10.0M
  }
1419
1420
9.97M
  // Requests the status of every transaction whose abort check time is not after now.
  //
  // The AbortCheckTimeTag index is walked from begin() until the first entry whose abort check
  // time is later than now. Each due transaction is added to a single status resolver, which is
  // created lazily. Its abort check time is then advanced (kStatusRequestSent), and modify()
  // re-positions it in the index. The resolver, if any, is started with a deadline
  // FLAGS_transaction_abort_check_timeout_ms from now.
  void CheckForAbortedTransactions() REQUIRES(mutex_) {
    if (transactions_.empty()) {
      return;
    }
    auto now = participant_context_.Now();
    auto& by_check_time = transactions_.get<AbortCheckTimeTag>();
    TransactionStatusResolver* resolver = nullptr;
    while (true) {
      auto& candidate = **by_check_time.begin();
      if (candidate.abort_check_ht() > now) {
        break;
      }
      if (resolver == nullptr) {
        resolver = &AddStatusResolver();
      }
      const auto& metadata = candidate.metadata();
      VLOG_WITH_PREFIX(4)
          << "Check aborted: " << metadata.status_tablet << ", " << metadata.transaction_id;
      resolver->Add(metadata.status_tablet, metadata.transaction_id);
      by_check_time.modify(by_check_time.begin(), [now](const auto& entry) {
        entry->UpdateAbortCheckHT(now, UpdateAbortCheckHTMode::kStatusRequestSent);
      });
    }

    // No limit is placed on the number of status resolutions here, because transaction
    // throughput cannot be predicted. Multiple concurrent resolutions for a single transaction
    // are avoided because the abort check hybrid time is set to the same value as the status
    // resolution deadline.
    if (resolver != nullptr) {
      resolver->Start(CoarseMonoClock::now() + 1ms * FLAGS_transaction_abort_check_timeout_ms);
    }
  }
1452
1453
9.94M
  // Shuts down and drops finished status resolvers from the front of status_resolvers_,
  // stopping at the first one that is still running.
  void CleanupStatusResolvers() EXCLUDES(status_resolvers_mutex_) {
    std::lock_guard<std::mutex> guard(status_resolvers_mutex_);
    while (!status_resolvers_.empty()) {
      auto& oldest = status_resolvers_.front();
      if (oldest.Running()) {
        break;
      }
      oldest.Shutdown();
      status_resolvers_.pop_front();
    }
  }
1460
1461
1.70k
  // Creates a new status resolver at the back of status_resolvers_ and returns it. Resolved
  // statuses are delivered to TransactionsStatus(). At most
  // FLAGS_max_transactions_in_status_request transactions are sent per request.
  TransactionStatusResolver& AddStatusResolver() override EXCLUDES(status_resolvers_mutex_) {
    std::lock_guard<std::mutex> guard(status_resolvers_mutex_);
    auto on_statuses = std::bind(&Impl::TransactionsStatus, this, _1);
    status_resolvers_.emplace_back(
        &participant_context_, &rpcs_, FLAGS_max_transactions_in_status_request, on_statuses);
    return status_resolvers_.back();
  }
1468
1469
  struct ImmediateCleanupQueueEntry {
1470
    int64_t request_id;
1471
    TransactionId transaction_id;
1472
    RemoveReason reason;
1473
1474
239k
    bool Ready(TransactionParticipantContext* participant_context, HybridTime* safe_time) const {
1475
239k
      return true;
1476
239k
    }
1477
  };
1478
1479
  struct GracefulCleanupQueueEntry {
1480
    int64_t request_id;
1481
    TransactionId transaction_id;
1482
    RemoveReason reason;
1483
    HybridTime required_safe_time;
1484
1485
366k
    bool Ready(TransactionParticipantContext* participant_context, HybridTime* safe_time) const {
1486
366k
      if (!*safe_time) {
1487
323k
        *safe_time = participant_context->SafeTimeForTransactionParticipant();
1488
323k
      }
1489
366k
      return *safe_time >= required_safe_time;
1490
366k
    }
1491
  };
1492
1493
  std::string log_prefix_;
1494
1495
  docdb::DocDB db_;
1496
  const docdb::KeyBounds* key_bounds_;
1497
  // Owned externally, should be guaranteed that would not be destroyed before this.
1498
  RWOperationCounter* pending_op_counter_ = nullptr;
1499
1500
  Transactions transactions_;
1501
  // Ids of running requests, stored in increasing order.
1502
  std::deque<int64_t> running_requests_;
1503
  // Ids of complete requests, minimal request is on top.
1504
  // Contains only ids greater than first running request id, otherwise entry is removed
1505
  // from both collections.
1506
  std::priority_queue<int64_t, std::vector<int64_t>, std::greater<void>> complete_requests_;
1507
1508
  // Queues of transaction ids that should be cleaned, paired with request that should be completed
1509
  // in order to be able to do clean.
1510
  // Immediate cleanup is performed as soon as possible.
1511
  // Graceful cleanup is performed after safe time becomes greater than cleanup request hybrid time.
1512
  std::deque<ImmediateCleanupQueueEntry> immediate_cleanup_queue_ GUARDED_BY(mutex_);
1513
  std::deque<GracefulCleanupQueueEntry> graceful_cleanup_queue_ GUARDED_BY(mutex_);
1514
1515
  // Remove queue maintains transactions that could be cleaned when safe time for follower reaches
1516
  // appropriate time for an entry.
1517
  // Since we add entries with increasing time, this queue is ordered by time.
1518
  struct RemoveQueueEntry {
1519
    TransactionId id;
1520
    HybridTime time;
1521
    RemoveReason reason;
1522
1523
0
    std::string ToString() const {
1524
0
      return YB_STRUCT_TO_STRING(id, time, reason);
1525
0
    }
1526
  };
1527
1528
  // Guarded by RunningTransactionContext::mutex_
1529
  std::deque<RemoveQueueEntry> remove_queue_;
1530
1531
  // Guarded by RunningTransactionContext::mutex_
1532
  HybridTime last_safe_time_ = HybridTime::kMin;
1533
1534
  HybridTime ignore_all_transactions_started_before_ GUARDED_BY(mutex_) = HybridTime::kMin;
1535
1536
  std::unordered_set<TransactionId, TransactionIdHash> recently_removed_transactions_;
1537
  struct RecentlyRemovedTransaction {
1538
    TransactionId id;
1539
    CoarseTimePoint time;
1540
  };
1541
  std::deque<RecentlyRemovedTransaction> recently_removed_transactions_cleanup_queue_;
1542
1543
  std::mutex status_resolvers_mutex_;
1544
  std::deque<TransactionStatusResolver> status_resolvers_ GUARDED_BY(status_resolvers_mutex_);
1545
1546
  scoped_refptr<AtomicGauge<uint64_t>> metric_transactions_running_;
1547
  scoped_refptr<Counter> metric_transaction_not_found_;
1548
1549
  TransactionLoader loader_;
1550
  std::atomic<bool> closing_{false};
1551
  CountDownLatch start_latch_{1};
1552
1553
  std::atomic<HybridTime> min_running_ht_{HybridTime::kInvalid};
1554
  std::atomic<CoarseTimePoint> next_check_min_running_{CoarseTimePoint()};
1555
  HybridTime waiting_for_min_running_ht_ = HybridTime::kMax;
1556
  std::atomic<bool> shutdown_done_{false};
1557
1558
  mutable std::atomic<client::YBClient*> client_cache_{nullptr};
1559
1560
  LRUCache<TransactionId> cleanup_cache_{FLAGS_transactions_cleanup_cache_size};
1561
1562
  rpc::Poller poller_;
1563
};
1564
1565
TransactionParticipant::TransactionParticipant(
1566
    TransactionParticipantContext* context, TransactionIntentApplier* applier,
1567
    const scoped_refptr<MetricEntity>& entity)
1568
56.9k
    : impl_(new Impl(context, applier, entity)) {
1569
56.9k
}
1570
1571
44.6k
TransactionParticipant::~TransactionParticipant() {
1572
44.6k
}
1573
1574
56.9k
void TransactionParticipant::Start() {
1575
56.9k
  impl_->Start();
1576
56.9k
}
1577
1578
1.80M
Result<bool> TransactionParticipant::Add(const TransactionMetadata& metadata) {
1579
1.80M
  return impl_->Add(metadata);
1580
1.80M
}
1581
1582
Result<TransactionMetadata> TransactionParticipant::PrepareMetadata(
1583
1.60M
    const TransactionMetadataPB& pb) {
1584
1.60M
  return impl_->PrepareMetadata(pb);
1585
1.60M
}
1586
1587
boost::optional<std::pair<IsolationLevel, TransactionalBatchData>>
1588
    TransactionParticipant::PrepareBatchData(
1589
    const TransactionId& id, size_t batch_idx,
1590
2.31M
    boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) {
1591
2.31M
  return impl_->PrepareBatchData(id, batch_idx, encoded_replicated_batches);
1592
2.31M
}
1593
1594
void TransactionParticipant::BatchReplicated(
1595
2.31M
    const TransactionId& id, const TransactionalBatchData& data) {
1596
2.31M
  return impl_->BatchReplicated(id, data);
1597
2.31M
}
1598
1599
534k
HybridTime TransactionParticipant::LocalCommitTime(const TransactionId& id) {
1600
534k
  return impl_->LocalCommitTime(id);
1601
534k
}
1602
1603
219k
boost::optional<CommitMetadata> TransactionParticipant::LocalCommitData(const TransactionId& id) {
1604
219k
  return impl_->LocalCommitData(id);
1605
219k
}
1606
1607
0
std::pair<size_t, size_t> TransactionParticipant::TEST_CountIntents() const {
1608
0
  return impl_->TEST_CountIntents();
1609
0
}
1610
1611
388k
void TransactionParticipant::RequestStatusAt(const StatusRequest& request) {
1612
388k
  return impl_->RequestStatusAt(request);
1613
388k
}
1614
1615
6.71M
int64_t TransactionParticipant::RegisterRequest() {
1616
6.71M
  return impl_->RegisterRequest();
1617
6.71M
}
1618
1619
6.71M
void TransactionParticipant::UnregisterRequest(int64_t request) {
1620
6.71M
  impl_->UnregisterRequest(request);
1621
6.71M
}
1622
1623
void TransactionParticipant::Abort(const TransactionId& id,
1624
55.9k
                                   TransactionStatusCallback callback) {
1625
55.9k
  return impl_->Abort(id, std::move(callback));
1626
55.9k
}
1627
1628
void TransactionParticipant::Handle(
1629
1.36M
    std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) {
1630
1.36M
  impl_->Handle(std::move(request), term);
1631
1.36M
}
1632
1633
0
void TransactionParticipant::Cleanup(TransactionIdSet&& set) {
1634
0
  return impl_->Cleanup(std::move(set), this);
1635
0
}
1636
1637
1.30M
Status TransactionParticipant::ProcessReplicated(const ReplicatedData& data) {
1638
1.30M
  return impl_->ProcessReplicated(data);
1639
1.30M
}
1640
1641
369k
Status TransactionParticipant::CheckAborted(const TransactionId& id) {
1642
369k
  return impl_->CheckAborted(id);
1643
369k
}
1644
1645
void TransactionParticipant::FillPriorities(
1646
146k
    boost::container::small_vector_base<std::pair<TransactionId, uint64_t>>* inout) {
1647
146k
  return impl_->FillPriorities(inout);
1648
146k
}
1649
1650
void TransactionParticipant::SetDB(
1651
    const docdb::DocDB& db, const docdb::KeyBounds* key_bounds,
1652
145k
    RWOperationCounter* pending_op_counter) {
1653
145k
  impl_->SetDB(db, key_bounds, pending_op_counter);
1654
145k
}
1655
1656
void TransactionParticipant::GetStatus(
1657
    const TransactionId& transaction_id,
1658
    size_t required_num_replicated_batches,
1659
    int64_t term,
1660
    tserver::GetTransactionStatusAtParticipantResponsePB* response,
1661
0
    rpc::RpcContext* context) {
1662
0
  impl_->GetStatus(transaction_id, required_num_replicated_batches, term, response, context);
1663
0
}
1664
1665
48
TransactionParticipantContext* TransactionParticipant::context() const {
1666
48
  return impl_->participant_context();
1667
48
}
1668
1669
12.0M
HybridTime TransactionParticipant::MinRunningHybridTime() const {
1670
12.0M
  return impl_->MinRunningHybridTime();
1671
12.0M
}
1672
1673
3.53k
void TransactionParticipant::WaitMinRunningHybridTime(HybridTime ht) {
1674
3.53k
  impl_->WaitMinRunningHybridTime(ht);
1675
3.53k
}
1676
1677
15
Status TransactionParticipant::ResolveIntents(HybridTime resolve_at, CoarseTimePoint deadline) {
1678
15
  return impl_->ResolveIntents(resolve_at, deadline);
1679
15
}
1680
1681
0
size_t TransactionParticipant::TEST_GetNumRunningTransactions() const {
1682
0
  return impl_->TEST_GetNumRunningTransactions();
1683
0
}
1684
1685
OneWayBitmap TransactionParticipant::TEST_TransactionReplicatedBatches(
1686
0
    const TransactionId& id) const {
1687
0
  return impl_->TEST_TransactionReplicatedBatches(id);
1688
0
}
1689
1690
0
std::string TransactionParticipant::ReplicatedData::ToString() const {
1691
0
  return YB_STRUCT_TO_STRING(leader_term, state, op_id, hybrid_time, already_applied_to_regular_db);
1692
0
}
1693
1694
44.7k
void TransactionParticipant::StartShutdown() {
1695
44.7k
  impl_->StartShutdown();
1696
44.7k
}
1697
1698
44.7k
void TransactionParticipant::CompleteShutdown() {
1699
44.7k
  impl_->CompleteShutdown();
1700
44.7k
}
1701
1702
0
std::string TransactionParticipant::DumpTransactions() const {
1703
0
  return impl_->DumpTransactions();
1704
0
}
1705
1706
Status TransactionParticipant::StopActiveTxnsPriorTo(
1707
36.4k
    HybridTime cutoff, CoarseTimePoint deadline, TransactionId* exclude_txn_id) {
1708
36.4k
  return impl_->StopActiveTxnsPriorTo(cutoff, deadline, exclude_txn_id);
1709
36.4k
}
1710
1711
Result<HybridTime> TransactionParticipant::WaitForSafeTime(
1712
26.6k
    HybridTime safe_time, CoarseTimePoint deadline) {
1713
26.6k
  return impl_->WaitForSafeTime(safe_time, deadline);
1714
26.6k
}
1715
1716
0
void TransactionParticipant::IgnoreAllTransactionsStartedBefore(HybridTime limit) {
1717
0
  impl_->IgnoreAllTransactionsStartedBefore(limit);
1718
0
}
1719
1720
0
const TabletId& TransactionParticipant::tablet_id() const {
1721
0
  return impl_->participant_context()->tablet_id();
1722
0
}
1723
1724
58.6k
std::string TransactionParticipantContext::LogPrefix() const {
1725
58.6k
  return consensus::MakeTabletLogPrefix(tablet_id(), permanent_uuid());
1726
58.6k
}
1727
1728
1.19M
HybridTime TransactionParticipantContext::Now() {
1729
1.19M
  return clock_ptr()->Now();
1730
1.19M
}
1731
1732
} // namespace tablet
1733
} // namespace yb