YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_coordinator.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/tablet/transaction_coordinator.h"
17
18
#include <boost/multi_index/hashed_index.hpp>
19
#include <boost/multi_index/mem_fun.hpp>
20
#include <boost/multi_index/ordered_index.hpp>
21
#include <boost/multi_index_container.hpp>
22
23
#include "yb/client/client.h"
24
#include "yb/client/transaction_rpc.h"
25
26
#include "yb/common/common.pb.h"
27
#include "yb/common/entity_ids.h"
28
#include "yb/common/pgsql_error.h"
29
#include "yb/common/transaction.h"
30
#include "yb/common/transaction_error.h"
31
#include "yb/common/wire_protocol.h"
32
33
#include "yb/consensus/consensus_round.h"
34
#include "yb/consensus/consensus_util.h"
35
36
#include "yb/docdb/transaction_dump.h"
37
38
#include "yb/rpc/messenger.h"
39
#include "yb/rpc/poller.h"
40
#include "yb/rpc/rpc.h"
41
42
#include "yb/server/clock.h"
43
44
#include "yb/tablet/operations/update_txn_operation.h"
45
46
#include "yb/tserver/tserver_service.pb.h"
47
48
#include "yb/util/atomic.h"
49
#include "yb/util/countdown_latch.h"
50
#include "yb/util/enums.h"
51
#include "yb/util/flag_tags.h"
52
#include "yb/util/format.h"
53
#include "yb/util/logging.h"
54
#include "yb/util/metrics.h"
55
#include "yb/util/random_util.h"
56
#include "yb/util/result.h"
57
#include "yb/util/scope_exit.h"
58
#include "yb/util/status_format.h"
59
#include "yb/util/status_log.h"
60
#include "yb/util/tsan_util.h"
61
#include "yb/util/yb_pg_errcodes.h"
62
63
// Heartbeat period flag; only declared here (DECLARE_), not defined in this file.
DECLARE_uint64(transaction_heartbeat_usec);

// Expiration of pending transactions.
DEFINE_double(
    transaction_max_missed_heartbeat_periods, 10.0,
    "Maximum heartbeat periods that a pending transaction can miss before the "
    "transaction coordinator expires the transaction. The total expiration time in "
    "microseconds is transaction_heartbeat_usec times "
    "transaction_max_missed_heartbeat_periods. The value passed to this flag may be "
    "fractional.");

// Periodic work intervals.
DEFINE_uint64(
    transaction_check_interval_usec, 500000,
    "Transaction check interval in usec.");
DEFINE_uint64(
    transaction_resend_applying_interval_usec, 5000000,
    "Transaction resend applying interval in usec.");

// Sealed transactions.
DEFINE_int64(
    avoid_abort_after_sealing_ms, 20,
    "If transaction was only sealed, we will try to abort it not earlier than this "
    "period in milliseconds.");

// Test-only fault injection.
DEFINE_test_flag(
    uint64, inject_txn_get_status_delay_ms, 0,
    "Inject specified delay to transaction get status requests.");
DEFINE_test_flag(
    int64, inject_random_delay_on_txn_status_response_ms, 0,
    "Inject a random amount of delay to the thread processing a "
    "GetTransactionStatusRequest after it has populated it's response. This could "
    "help simulate e.g. out-of-order responses where PENDING is received by client "
    "after a COMMITTED response.");
85
86
using namespace std::literals;
87
using namespace std::placeholders;
88
89
namespace yb {
90
namespace tablet {
91
92
859k
std::chrono::microseconds GetTransactionTimeout() {
93
859k
  const double timeout = GetAtomicFlag(&FLAGS_transaction_max_missed_heartbeat_periods) *
94
859k
                         GetAtomicFlag(&FLAGS_transaction_heartbeat_usec);
95
  // Cast to avoid -Wimplicit-int-float-conversion.
96
859k
  return timeout >= static_cast<double>(std::chrono::microseconds::max().count())
97
859k
      ? 
std::chrono::microseconds::max()0
98
859k
      : std::chrono::microseconds(static_cast<int64_t>(timeout));
99
859k
}
100
101
namespace {
102
103
struct NotifyApplyingData {
104
  TabletId tablet;
105
  TransactionId transaction;
106
  const AbortedSubTransactionSetPB& aborted;
107
  HybridTime commit_time;
108
  bool sealed;
109
110
0
  std::string ToString() const {
111
0
    return Format("{ tablet: $0 transaction: $1 commit_time: $2 sealed: $3}",
112
0
                  tablet, transaction, commit_time, sealed);
113
0
  }
114
};
115
116
struct ExpectedTabletBatches {
117
  TabletId tablet;
118
  size_t batches;
119
120
0
  std::string ToString() const {
121
0
    return Format("{ tablet: $0 batches: $1 }", tablet, batches);
122
0
  }
123
};
124
125
// Context for transaction state. I.e. access to external facilities required by
126
// transaction state to do its job.
127
class TransactionStateContext {
128
 public:
129
  virtual TransactionCoordinatorContext& coordinator_context() = 0;
130
131
  virtual void NotifyApplying(NotifyApplyingData data) = 0;
132
133
  // Submits update transaction to the RAFT log. Returns false if was not able to submit.
134
  virtual MUST_USE_RESULT bool SubmitUpdateTransaction(
135
      std::unique_ptr<UpdateTxnOperation> operation) = 0;
136
137
  virtual void CompleteWithStatus(
138
      std::unique_ptr<UpdateTxnOperation> request, Status status) = 0;
139
140
  virtual void CompleteWithStatus(UpdateTxnOperation* request, Status status) = 0;
141
142
  virtual bool leader() const = 0;
143
144
 protected:
145
432
  ~TransactionStateContext() {}
146
};
147
148
1.22M
std::string BuildLogPrefix(const std::string& parent_log_prefix, const TransactionId& id) {
149
1.22M
  auto id_string = id.ToString();
150
1.22M
  return parent_log_prefix.substr(0, parent_log_prefix.length() - 2) + " ID " + id_string + ": ";
151
1.22M
}
152
153
// TransactionState keeps state of single transaction.
154
// User of this class should guarantee that it does NOT invoke methods concurrently.
155
class TransactionState {
156
 public:
157
  explicit TransactionState(TransactionStateContext* context,
158
                            const TransactionId& id,
159
                            HybridTime last_touch,
160
                            const std::string& parent_log_prefix)
161
      : context_(*context),
162
        id_(id),
163
        log_prefix_(BuildLogPrefix(parent_log_prefix, id)),
164
1.22M
        last_touch_(last_touch) {
165
1.22M
  }
166
167
1.21M
  // Debug-only sanity checks: by the time the state is destroyed there must be no abort
  // waiters, no queued requests and no operation being replicated.
  ~TransactionState() {
    DCHECK(abort_waiters_.empty());
    DCHECK(request_queue_.empty());
    DCHECK(replicating_ == nullptr)
        << Format("Replicating: $0", static_cast<void*>(replicating_));
  }
172
173
  // Id of transaction.
174
16.8M
  // Returns a reference to the stored id; valid as long as this object is alive.
  const TransactionId& id() const { return id_; }
177
178
  // Time when we last heard from transaction. I.e. hybrid time of replicated raft log entry
179
  // that updates status of this transaction.
180
13.5M
  // Returns the stored last touch time by value.
  HybridTime last_touch() const { return last_touch_; }
183
184
  // Status of transaction.
185
1.50k
  // Returns the current (already replicated) status, ignoring operations still in flight.
  TransactionStatus status() const { return status_; }
188
189
  // RAFT index of first RAFT log entry required by this transaction.
190
14.8M
  int64_t first_entry_raft_index() const {
191
14.8M
    return first_entry_raft_index_;
192
14.8M
  }
193
194
1.21M
  std::string ToString() const {
195
1.21M
    return Format("{ id: $0 last_touch: $1 status: $2 involved_tablets: $3 replicating: $4 "
196
1.21M
                      " request_queue: $5 first_entry_raft_index: $6 }",
197
1.21M
                  id_, last_touch_, TransactionStatus_Name(status_),
198
1.21M
                  involved_tablets_, replicating_, request_queue_, first_entry_raft_index_);
199
1.21M
  }
200
201
  // Whether this transaction expired at specified time.
202
868k
  // A transaction that is (or is about to become) committed or sealed never expires.
  // Otherwise it is expired when more than GetTransactionTimeout() of physical time has
  // passed between last_touch_ and now.
  bool ExpiredAt(HybridTime now) const {
    const bool never_expires =
        ShouldBeCommitted() || ShouldBeInStatus(TransactionStatus::SEALED);
    if (never_expires) {
      return false;
    }
    const int64_t elapsed_usec =
        now.GetPhysicalValueMicros() - last_touch_.GetPhysicalValueMicros();
    return std::chrono::microseconds(elapsed_usec) > GetTransactionTimeout();
  }
212
213
  // Whether this transaction has completed.
214
3.19M
  bool Completed() const {
215
3.19M
    return status_ == TransactionStatus::ABORTED ||
216
3.19M
           
status_ == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS2.56M
;
217
3.19M
  }
218
219
  // Applies new state to transaction.
220
3.19M
  // Handles a replicated RAFT entry for this transaction:
  //   1. Checks the entry against the operation we were replicating (if any) and forgets it.
  //   2. Applies the new state via DoProcessReplicated.
  //   3. When data.leader_term is unknown, fails all pending requests with "Leader changed".
  //      Otherwise fails them if the transaction reached a final status, or continues with
  //      the queued requests.
  // Returns the status produced by DoProcessReplicated.
  CHECKED_STATUS ProcessReplicated(const TransactionCoordinator::ReplicatedData& data) {
    VLOG_WITH_PREFIX(4) << Format("ProcessReplicated: $0, replicating: $1", data, replicating_);

    if (replicating_ != nullptr) {
      auto in_flight_op_id = replicating_->consensus_round()->id();
      if (in_flight_op_id.empty()) {
        if (data.leader_term != OpId::kUnknownTerm) {
          LOG_WITH_PREFIX(DFATAL)
              << "Leader replicated operation without op id, replicating: "
              << AsString(replicating_) << ", replicated: " << AsString(data);
        } else {
          LOG_WITH_PREFIX(INFO)
              << "Cancel replicating without id: " << AsString(replicating_)
              << ", because " << AsString(data) << " was replicated";
        }
      } else if (in_flight_op_id != data.op_id) {
        LOG_WITH_PREFIX(DFATAL)
            << "Replicated unexpected operation, replicating: " << AsString(replicating_)
            << ", replicated: " << AsString(data);
      }
      replicating_ = nullptr;
    }

    auto result = DoProcessReplicated(data);

    if (data.leader_term == OpId::kUnknownTerm) {
      ClearRequests(STATUS(IllegalState, "Leader changed"));
      return result;
    }

    switch (status_) {
      case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS:
        ClearRequests(STATUS(AlreadyPresent, "Transaction committed"));
        break;
      case TransactionStatus::ABORTED:
        ClearRequests(
            STATUS(Expired, "Transaction aborted",
                   TransactionError(TransactionErrorCode::kAborted)));
        break;
      // Every non-final status: proceed with the requests that were waiting for replication.
      case TransactionStatus::CREATED:
      case TransactionStatus::PENDING:
      case TransactionStatus::SEALED:
      case TransactionStatus::COMMITTED:
      case TransactionStatus::APPLYING:
      case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS:
      case TransactionStatus::IMMEDIATE_CLEANUP:
      case TransactionStatus::GRACEFUL_CLEANUP:
        ProcessQueue();
        break;
    }

    return result;
  }
272
273
0
  // Handles notification that replication of an operation of this transaction was aborted.
  // Reports (DFATAL) if the aborted operation is not the one we were replicating, forgets the
  // replicating operation and fails all queued requests with "Replication failed".
  void ProcessAborted(const TransactionCoordinator::AbortedData& data) {
    VLOG_WITH_PREFIX(4) << Format("ProcessAborted: $0, replicating: $1", data.state, replicating_);

    // Use the prefixed variant, like every other diagnostic of this class, so the message
    // identifies the transaction it belongs to.
    LOG_IF_WITH_PREFIX(DFATAL,
                       replicating_ != nullptr && !replicating_->op_id().empty() &&
                       replicating_->op_id() != data.op_id)
        << "Aborted wrong operation, expected " << AsString(replicating_) << ", but "
        << AsString(data) << " aborted";

    replicating_ = nullptr;

    // We are not leader, so could abort all queued requests.
    ClearRequests(STATUS(Aborted, "Replication failed"));
  }
287
288
  // Clear requests of this transaction.
289
3.74M
  void ClearRequests(const Status& status) {
290
3.74M
    
VLOG_WITH_PREFIX121
(4) << Format("ClearRequests: $0, replicating: $1", status, replicating_)121
;
291
3.74M
    if (replicating_ != nullptr) {
292
0
      context_.CompleteWithStatus(replicating_, status);
293
0
      replicating_ = nullptr;
294
0
    }
295
296
3.74M
    for (auto& entry : request_queue_) {
297
6.40k
      context_.CompleteWithStatus(std::move(entry), status);
298
6.40k
    }
299
3.74M
    request_queue_.clear();
300
301
3.74M
    NotifyAbortWaiters(status);
302
3.74M
  }
303
304
  // Used only during transaction sealing.
305
0
  void ReplicatedAllBatchesAt(const TabletId& tablet, HybridTime last_time) {
306
0
    auto it = involved_tablets_.find(tablet);
307
    // We could be notified several times, so avoid double handling.
308
0
    if (it == involved_tablets_.end() || it->second.all_batches_replicated) {
309
0
      return;
310
0
    }
311
312
    // If transaction was sealed, then its commit time is max of seal record time and intent
313
    // replication times from all participating tablets.
314
0
    commit_time_ = std::max(commit_time_, last_time);
315
0
    --tablets_with_not_replicated_batches_;
316
0
    it->second.all_batches_replicated = true;
317
318
0
    if (tablets_with_not_replicated_batches_ == 0) {
319
0
      StartApply();
320
0
    }
321
0
  }
322
323
35.2k
  // Returns a reference to the stored set of aborted sub transactions.
  const AbortedSubTransactionSetPB& GetAbortedSubTransactionSetPB() const {
    return aborted_;
  }
324
325
  // Returns the status of this transaction as it should be reported to a requester, together
  // with the hybrid time attached to that status:
  //   COMMITTED / APPLIED_IN_ALL_INVOLVED_TABLETS -> COMMITTED at commit_time_.
  //   SEALED  -> COMMITTED when no tablet has unreplicated batches, otherwise SEALED with
  //              expected_tablet_batches filled by FillExpectedTabletBatches.
  //   ABORTED -> ABORTED with HybridTime::kMax.
  //   PENDING -> PENDING with a time computed as described below.
  // Any other status yields a Corruption error.
  Result<TransactionStatusResult> GetStatus(
      std::vector<ExpectedTabletBatches>* expected_tablet_batches) const {
    switch (status_) {
      case TransactionStatus::COMMITTED: FALLTHROUGH_INTENDED;
      case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS:
        return TransactionStatusResult{TransactionStatus::COMMITTED, commit_time_};

      case TransactionStatus::SEALED: {
        const bool all_batches_replicated = tablets_with_not_replicated_batches_ == 0;
        if (all_batches_replicated) {
          return TransactionStatusResult{TransactionStatus::COMMITTED, commit_time_};
        }
        FillExpectedTabletBatches(expected_tablet_batches);
        return TransactionStatusResult{TransactionStatus::SEALED, commit_time_};
      }

      case TransactionStatus::ABORTED:
        return TransactionStatusResult{TransactionStatus::ABORTED, HybridTime::kMax};

      case TransactionStatus::PENDING: {
        HybridTime status_ht;
        if (replicating_) {
          const auto in_flight_status = replicating_->request()->status();
          const bool final_status_in_flight =
              in_flight_status == TransactionStatus::COMMITTED ||
              in_flight_status == TransactionStatus::ABORTED;
          if (final_status_in_flight) {
            const auto in_flight_ht = replicating_->hybrid_time_even_if_unset();
            // Hybrid time is not yet assigned to replicating, so assign more conservative
            // time, that is guaranteed to be less than replicating time. See GH #9981.
            status_ht = in_flight_ht.is_valid() ? in_flight_ht : replicating_submit_time_;
          }
        }
        // Nothing final is in flight, so the current time is used.
        if (!status_ht) {
          status_ht = context_.coordinator_context().clock().Now();
        }
        // The reported time is capped by the hybrid time lease expiration.
        status_ht = std::min(status_ht, context_.coordinator_context().HtLeaseExpiration());
        return TransactionStatusResult{TransactionStatus::PENDING, status_ht.Decremented()};
      }

      case TransactionStatus::CREATED: FALLTHROUGH_INTENDED;
      case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED;
      case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: FALLTHROUGH_INTENDED;
      case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED;
      case TransactionStatus::GRACEFUL_CLEANUP:
        return STATUS_FORMAT(Corruption, "Transaction has unexpected status: $0",
                             TransactionStatus_Name(status_));
    }
    FATAL_INVALID_ENUM_VALUE(TransactionStatus, status_);
  }
371
372
0
  void Aborted() {
373
0
    status_ = TransactionStatus::ABORTED;
374
0
    NotifyAbortWaiters(TransactionStatusResult::Aborted());
375
0
  }
376
377
183k
  // Handles an external abort request.
  // Returns COMMITTED when the transaction is already committed (with commit_time_) or is
  // about to be committed (with HybridTime::kMax), Aborted when it is already aborted.
  // Otherwise the transaction must be PENDING: the callback is moved into the list of abort
  // waiters, abort is initiated and PENDING is returned.
  TransactionStatusResult Abort(TransactionAbortCallback* callback) {
    const bool already_committed =
        status_ == TransactionStatus::COMMITTED ||
        status_ == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS;
    if (already_committed) {
      return TransactionStatusResult(TransactionStatus::COMMITTED, commit_time_);
    }
    if (ShouldBeCommitted()) {
      // Commit is still in flight, so the commit time is not known yet.
      return TransactionStatusResult(TransactionStatus::COMMITTED, HybridTime::kMax);
    }
    if (status_ == TransactionStatus::ABORTED) {
      return TransactionStatusResult::Aborted();
    }

    VLOG_WITH_PREFIX(1) << "External abort request";
    CHECK_EQ(TransactionStatus::PENDING, status_);
    abort_waiters_.emplace_back(std::move(*callback));
    Abort();
    return TransactionStatusResult(TransactionStatus::PENDING, HybridTime::kMax);
  }
393
394
1.10M
  // Entry point for an update request of this transaction.
  // APPLIED_IN_ONE_OF_INVOLVED_TABLETS is processed and completed immediately.
  // Any other request is handled right away when nothing is being replicated, otherwise it is
  // appended to the request queue.
  void Handle(std::unique_ptr<tablet::UpdateTxnOperation> request) {
    auto& state = *request->request();
    VLOG_WITH_PREFIX(1) << "Handle: " << state.ShortDebugString();

    if (state.status() == TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS) {
      auto applied_status = AppliedInOneOfInvolvedTablets(state);
      context_.CompleteWithStatus(std::move(request), applied_status);
      return;
    }

    if (!replicating_) {
      DoHandle(std::move(request));
      return;
    }
    request_queue_.push_back(std::move(request));
  }
408
409
  // Aborts this transaction.
410
209k
  void Abort() {
411
209k
    if (ShouldBeCommitted()) {
412
0
      LOG_WITH_PREFIX(DFATAL) << "Transaction abort in wrong state: " << status_;
413
0
      return;
414
0
    }
415
209k
    if (ShouldBeAborted()) {
416
861
      return;
417
861
    }
418
208k
    if (status_ != TransactionStatus::PENDING) {
419
0
      LOG_WITH_PREFIX(DFATAL) << "Unexpected status during abort: "
420
0
                              << TransactionStatus_Name(status_);
421
0
      return;
422
0
    }
423
208k
    SubmitUpdateStatus(TransactionStatus::ABORTED);
424
208k
  }
425
426
  // Returns logs prefix for this transaction.
427
4
  const std::string& LogPrefix() {
428
4
    return log_prefix_;
429
4
  }
430
431
  // now_physical is just optimization to avoid querying the current time multiple times.
432
450k
  void Poll(bool leader, MonoTime now_physical) {
433
450k
    if (status_ != TransactionStatus::COMMITTED &&
434
450k
        
(432k
status_ != TransactionStatus::SEALED432k
||
tablets_with_not_replicated_batches_ != 00
)) {
435
432k
      return;
436
432k
    }
437
17.2k
    if (tablets_with_not_applied_intents_ == 0) {
438
1.54k
      if (leader && !ShouldBeInStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS)) {
439
21
        SubmitUpdateStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS);
440
21
      }
441
15.6k
    } else if (now_physical >= resend_applying_time_) {
442
12
      if (leader) {
443
27
        for (auto& tablet : involved_tablets_) {
444
27
          if (!tablet.second.all_intents_applied) {
445
22
            context_.NotifyApplying({
446
22
                .tablet = tablet.first,
447
22
                .transaction = id_,
448
22
                .aborted = aborted_,
449
22
                .commit_time = commit_time_,
450
22
                .sealed = status_ == TransactionStatus::SEALED });
451
22
          }
452
27
        }
453
5
      }
454
12
      resend_applying_time_ = now_physical +
455
12
          std::chrono::microseconds(FLAGS_transaction_resend_applying_interval_usec);
456
12
    }
457
17.2k
  }
458
459
  void AddInvolvedTablets(
460
0
      const TabletId& source_tablet_id, const std::vector<TabletId>& tablet_ids) {
461
0
    auto source_it = involved_tablets_.find(source_tablet_id);
462
0
    if (source_it == involved_tablets_.end()) {
463
0
      LOG(FATAL) << "Unknown involved tablet: " << source_tablet_id;
464
0
      return;
465
0
    }
466
0
    for (const auto& tablet_id : tablet_ids) {
467
0
      if (involved_tablets_.emplace(tablet_id, source_it->second).second) {
468
0
        ++tablets_with_not_applied_intents_;
469
0
      }
470
0
    }
471
0
    if (!source_it->second.all_intents_applied) {
472
      // Mark source tablet as if intents have been applied for it.
473
0
      --tablets_with_not_applied_intents_;
474
0
      source_it->second.all_intents_applied = true;
475
0
    }
476
0
  }
477
478
436k
  // Handles notification that intents of this transaction were applied in a single tablet.
  // state must list exactly one tablet, otherwise InvalidArgument is returned.
  // A notification in a wrong transaction status (DFATAL) or for an unknown tablet (warning)
  // is ignored and OK is returned. When the last involved tablet reports, the
  // APPLIED_IN_ALL_INVOLVED_TABLETS status update is submitted.
  CHECKED_STATUS AppliedInOneOfInvolvedTablets(const TransactionStatePB& state) {
    const bool apply_expected =
        status_ == TransactionStatus::COMMITTED || status_ == TransactionStatus::SEALED;
    if (!apply_expected) {
      // We could ignore this request, because it will be re-send if required.
      LOG_WITH_PREFIX(DFATAL)
          << "AppliedInOneOfInvolvedTablets in wrong state: " << TransactionStatus_Name(status_)
          << ", request: " << state.ShortDebugString();
      return Status::OK();
    }

    if (state.tablets_size() != 1) {
      return STATUS_FORMAT(
          InvalidArgument, "Expected exactly one tablet in $0: $1", __func__, state);
    }

    const auto& applied_tablet = state.tablets(0);
    const auto tablet_it = involved_tablets_.find(applied_tablet);
    if (tablet_it == involved_tablets_.end()) {
      // This can happen when transaction coordinator retried apply to post-split tablets,
      // transaction coordinator moved to new status tablet leader and here new transaction
      // coordinator receives notification about txn is applied in post-split tablet not yet known
      // to new transaction coordinator.
      // It is safe to just log warning and ignore, because new transaction coordinator is sending
      // again apply requests to all involved tablet it knows and will be retrying for ones that
      // will reply have been already split.
      LOG_WITH_PREFIX(WARNING) << "Applied in unknown tablet: " << applied_tablet;
      return Status::OK();
    }

    // Repeated notifications for the same tablet are no-ops.
    if (tablet_it->second.all_intents_applied) {
      return Status::OK();
    }

    --tablets_with_not_applied_intents_;
    tablet_it->second.all_intents_applied = true;
    VLOG_WITH_PREFIX(4) << "Applied to " << applied_tablet << ", left not applied: "
                        << tablets_with_not_applied_intents_;
    if (tablets_with_not_applied_intents_ == 0) {
      SubmitUpdateStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS);
    }
    return Status::OK();
  }
515
516
 private:
517
  // Checks whether we in specified status or going to be in this status when replication is
518
  // finished.
519
3.55M
  // True when the current status equals the specified one, or when an operation is being
  // replicated and either it or one of the queued requests carries the specified status.
  // The queue is consulted only while something is being replicated.
  bool ShouldBeInStatus(TransactionStatus status) const {
    if (status_ == status) {
      return true;
    }
    if (!replicating_) {
      return false;
    }
    if (replicating_->request()->status() == status) {
      return true;
    }
    for (const auto& queued_request : request_queue_) {
      if (queued_request->request()->status() == status) {
        return true;
      }
    }
    return false;
  }
537
538
1.25M
  bool ShouldBeCommitted() const {
539
1.25M
    return ShouldBeInStatus(TransactionStatus::COMMITTED) ||
540
1.25M
           
ShouldBeInStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS)1.22M
;
541
1.25M
  }
542
543
209k
  bool ShouldBeAborted() const {
544
209k
    return ShouldBeInStatus(TransactionStatus::ABORTED);
545
209k
  }
546
547
  // Process operation that was replicated in RAFT.
548
3.19M
  // Dispatches the replicated entry to the handler that matches the status it carries.
  // Statuses that are never replicated through this path are fatal.
  CHECKED_STATUS DoProcessReplicated(const TransactionCoordinator::ReplicatedData& data) {
    switch (data.state.status()) {
      case TransactionStatus::CREATED: FALLTHROUGH_INTENDED;
      case TransactionStatus::PENDING:
        return PendingReplicationFinished(data);
      case TransactionStatus::SEALED:
        return SealedReplicationFinished(data);
      case TransactionStatus::COMMITTED:
        return CommittedReplicationFinished(data);
      case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS:
        return AppliedInAllInvolvedTabletsReplicationFinished(data);
      case TransactionStatus::ABORTED:
        return AbortedReplicationFinished(data);

      // APPLYING and CLEANUP are handled separately, because they are received for
      // transactions not managed by this tablet as a transaction status tablet, but tablets
      // that are involved in the data path (receive write intents) for this transactions.
      case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED;
      case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED;
      case TransactionStatus::GRACEFUL_CLEANUP: FALLTHROUGH_INTENDED;
      // APPLIED_IN_ONE_OF_INVOLVED_TABLETS handled w/o use of RAFT log
      case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS:
        FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status());
    }
    FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status());
  }
578
579
664k
  // Validates the request against the current status and submits it for replication.
  // A request that fails validation is completed with the failure and is not submitted.
  // Only COMMITTED, PENDING and CREATED requests are validated here.
  void DoHandle(std::unique_ptr<tablet::UpdateTxnOperation> request) {
    const auto txn_status = request->request()->status();

    Status validation_status;
    switch (txn_status) {
      case TransactionStatus::COMMITTED:
        validation_status = HandleCommit();
        break;
      case TransactionStatus::PENDING: FALLTHROUGH_INTENDED;
      case TransactionStatus::CREATED:
        // Handling txn_status of CREATED when the current status (status_) is PENDING is only
        // allowed for backward compatibility with versions prior to D11210, which could send
        // transaction creation retries with the same id.
        if (status_ != TransactionStatus::PENDING) {
          validation_status = STATUS_FORMAT(
              IllegalState, "Transaction in wrong state during heartbeat: $0",
              TransactionStatus_Name(status_));
        }
        break;
      default:
        break;
    }

    if (!validation_status.ok()) {
      context_.CompleteWithStatus(std::move(request), std::move(validation_status));
      return;
    }

    VLOG_WITH_PREFIX(4) << Format("DoHandle, replicating = $0", replicating_);
    const bool submitted = SubmitRequest(std::move(request));
    // Should always succeed, since we execute this code only on the leader.
    CHECK(submitted) << "Status: " << TransactionStatus_Name(txn_status);
  }
608
609
198k
  // Checks whether commit could be started now.
  // Returns Expired (and initiates abort) when the transaction is expired at the current
  // time, IllegalState when it is not PENDING, OK otherwise.
  CHECKED_STATUS HandleCommit() {
    const auto now = context_.coordinator_context().clock().Now();
    if (ExpiredAt(now)) {
      auto expired_status = STATUS(Expired, "Commit of expired transaction");
      VLOG_WITH_PREFIX(4) << expired_status;
      Abort();
      return expired_status;
    }

    if (status_ == TransactionStatus::PENDING) {
      return Status::OK();
    }
    return STATUS_FORMAT(IllegalState,
                         "Transaction in wrong state when starting to commit: $0",
                         TransactionStatus_Name(status_));
  }
625
626
406k
  // Creates an update operation that moves this transaction to the specified status.
  // The operation is submitted immediately when nothing is being replicated, otherwise it is
  // queued. The outcome of the submission is not checked here.
  void SubmitUpdateStatus(TransactionStatus status) {
    VLOG_WITH_PREFIX(4) << "SubmitUpdateStatus(" << TransactionStatus_Name(status) << ")";

    TransactionStatePB new_state;
    new_state.set_transaction_id(id_.data(), id_.size());
    new_state.set_status(status);
    auto update_request = context_.coordinator_context().CreateUpdateTransaction(&new_state);

    if (!replicating_) {
      SubmitRequest(std::move(update_request));
      return;
    }
    request_queue_.push_back(std::move(update_request));
  }
640
641
1.06M
  // Remembers the request as the one being replicated, together with the submit time, and
  // hands it over to the context. Returns whether the context accepted the request; on
  // failure nothing is considered to be replicating.
  bool SubmitRequest(std::unique_ptr<tablet::UpdateTxnOperation> request) {
    replicating_ = request.get();
    replicating_submit_time_ = context_.coordinator_context().clock().Now();
    VLOG_WITH_PREFIX(4) << Format("SubmitUpdateStatus, replicating = $0", replicating_);

    const bool accepted = context_.SubmitUpdateTransaction(std::move(request));
    if (!accepted) {
      // Was not able to submit update transaction, for instance we are not leader.
      // So we are not replicating.
      replicating_ = nullptr;
    }
    return accepted;
  }
654
655
661k
  void ProcessQueue() {
656
663k
    while (!replicating_ && 
!request_queue_.empty()612k
) {
657
1.98k
      auto request = std::move(request_queue_.front());
658
1.98k
      request_queue_.pop_front();
659
1.98k
      DoHandle(std::move(request));
660
1.98k
    }
661
661k
  }
662
663
620k
  // Applies a replicated ABORTED entry: the transaction becomes ABORTED, the RAFT index of
  // the entry is remembered and abort waiters are notified. A previous status other than
  // ABORTED or PENDING is reported (DFATAL) but does not prevent the transition.
  // Always returns OK.
  CHECKED_STATUS AbortedReplicationFinished(const TransactionCoordinator::ReplicatedData& data) {
    const bool expected_status =
        status_ == TransactionStatus::ABORTED || status_ == TransactionStatus::PENDING;
    if (!expected_status) {
      LOG_WITH_PREFIX(DFATAL) << "Invalid status of aborted transaction: "
                              << TransactionStatus_Name(status_);
    }

    first_entry_raft_index_ = data.op_id.index;
    status_ = TransactionStatus::ABORTED;
    NotifyAbortWaiters(TransactionStatusResult::Aborted());
    return Status::OK();
  }
675
676
  // Applies a replicated SEALED entry.
  // The transaction must be PENDING, otherwise IllegalState is reported (DFATAL) and returned.
  // On success the transaction becomes SEALED, the entry time becomes both last touch and
  // commit (seal) time, and every tablet listed in the entry is registered as involved with
  // its required number of batches, none of them replicated or applied yet.
  CHECKED_STATUS SealedReplicationFinished(
      const TransactionCoordinator::ReplicatedData& data) {
    if (status_ != TransactionStatus::PENDING) {
      // The message names this function, so it cannot be confused with the failure reported
      // by CommittedReplicationFinished.
      auto status = STATUS_FORMAT(
          IllegalState,
          "Unexpected status during SealedReplicationFinished: $0",
          TransactionStatus_Name(status_));
      LOG_WITH_PREFIX(DFATAL) << status;
      return status;
    }

    last_touch_ = data.hybrid_time;
    commit_time_ = data.hybrid_time;
    // TODO(dtxn) Not yet implemented
    // The flag is read atomically, the same way GetTransactionTimeout reads its flags.
    next_abort_after_sealing_ =
        CoarseMonoClock::now() + GetAtomicFlag(&FLAGS_avoid_abort_after_sealing_ms) * 1ms;
    // TODO(savepoints) Savepoints with sealed transactions is not yet tested
    aborted_ = data.state.aborted();
    VLOG_WITH_PREFIX(4) << "Seal time: " << commit_time_;
    status_ = TransactionStatus::SEALED;

    involved_tablets_.reserve(data.state.tablets().size());
    // tablets and tablet_batches are parallel lists: the same index addresses the tablet and
    // the number of batches expected for it.
    for (int idx = 0; idx != data.state.tablets().size(); ++idx) {
      auto tablet_batches = data.state.tablet_batches(idx);
      LOG_IF_WITH_PREFIX(DFATAL, tablet_batches == 0)
          << "Tablet without batches: " << data.state.ShortDebugString();
      ++tablets_with_not_replicated_batches_;
      InvolvedTabletState state = {
        .required_replicated_batches = static_cast<size_t>(tablet_batches),
        .all_batches_replicated = false,
        .all_intents_applied = false
      };
      involved_tablets_.emplace(data.state.tablets(idx), state);
    }

    first_entry_raft_index_ = data.op_id.index;
    return Status::OK();
  }
713
714
592k
  // Applies a replicated COMMITTED record to this transaction.
  //
  // Only valid while the transaction is PENDING; otherwise an IllegalState status is logged
  // (DFATAL) and returned. On success the commit time and aborted sub-transaction set are
  // recorded, every tablet from the record is registered as "all batches replicated, intents
  // not yet applied", the status becomes COMMITTED and StartApply() is invoked.
  CHECKED_STATUS CommittedReplicationFinished(const TransactionCoordinator::ReplicatedData& data) {
    if (status_ != TransactionStatus::PENDING) {
      auto bad_status = STATUS_FORMAT(
          IllegalState,
          "Unexpected status during CommittedReplicationFinished: $0",
          TransactionStatus_Name(status_));
      LOG_WITH_PREFIX(DFATAL) << bad_status;
      return bad_status;
    }

    YB_TRANSACTION_DUMP(Commit, id_, data.hybrid_time, data.state.tablets().size());

    last_touch_ = data.hybrid_time;
    commit_time_ = data.hybrid_time;
    first_entry_raft_index_ = data.op_id.index;
    aborted_ = data.state.aborted();

    const auto& committed_tablets = data.state.tablets();
    involved_tablets_.reserve(committed_tablets.size());
    for (const auto& tablet_id : committed_tablets) {
      // A committed record implies nothing is left to replicate; only apply is outstanding.
      InvolvedTabletState tablet_state;
      tablet_state.required_replicated_batches = 0;
      tablet_state.all_batches_replicated = true;
      tablet_state.all_intents_applied = false;
      involved_tablets_.emplace(tablet_id, tablet_state);
    }

    status_ = TransactionStatus::COMMITTED;
    StartApply();
    return Status::OK();
  }
745
746
  // Applies a replicated APPLIED_IN_ALL_INVOLVED_TABLETS record to this transaction.
  //
  // The transaction is expected to be COMMITTED or SEALED. Any other status is reported via
  // DFATAL and must be PENDING (enforced with CHECK_EQ). The status is then switched to
  // APPLIED_IN_ALL_INVOLVED_TABLETS and OK is returned.
  CHECKED_STATUS AppliedInAllInvolvedTabletsReplicationFinished(
      const TransactionCoordinator::ReplicatedData& data) {
    const bool committed_or_sealed =
        status_ == TransactionStatus::COMMITTED || status_ == TransactionStatus::SEALED;
    if (!committed_or_sealed) {
      // That could happen in old version, because we could drop all entries before
      // APPLIED_IN_ALL_INVOLVED_TABLETS.
      LOG_WITH_PREFIX(DFATAL)
          << "AppliedInAllInvolvedTabletsReplicationFinished in wrong state: "
          << TransactionStatus_Name(status_) << ", request: " << data.state.ShortDebugString();
      CHECK_EQ(status_, TransactionStatus::PENDING);
    }

    VLOG_WITH_PREFIX(4) << __func__ << ", status: " << TransactionStatus_Name(status_)
                        << ", leader: " << context_.leader();

    last_touch_ = data.hybrid_time;
    status_ = TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS;

    YB_TRANSACTION_DUMP(Applied, id_, data.hybrid_time);

    return Status::OK();
  }
765
766
  // Used for PENDING and CREATED records. Because when we apply replicated operations they have
767
  // the same meaning.
768
1.38M
  CHECKED_STATUS PendingReplicationFinished(const TransactionCoordinator::ReplicatedData& data) {
769
1.38M
    if (context_.leader() && 
ExpiredAt(data.hybrid_time)462k
) {
770
48.4k
      
VLOG_WITH_PREFIX0
(4) << "Expired during replication of PENDING or CREATED operations."0
;
771
48.4k
      Abort();
772
48.4k
      return Status::OK();
773
48.4k
    }
774
1.33M
    if (status_ != TransactionStatus::PENDING) {
775
0
      LOG_WITH_PREFIX(DFATAL) << "Bad status during " << __func__ << "(" << data.ToString()
776
0
                              << "): " << ToString();
777
0
      return Status::OK();
778
0
    }
779
1.33M
    last_touch_ = data.hybrid_time;
780
1.33M
    first_entry_raft_index_ = data.op_id.index;
781
1.33M
    return Status::OK();
782
1.33M
  }
783
784
4.95M
  void NotifyAbortWaiters(const Result<TransactionStatusResult>& result) {
785
4.95M
    for (auto& waiter : abort_waiters_) {
786
158k
      waiter(result);
787
158k
    }
788
4.95M
    abort_waiters_.clear();
789
4.95M
  }
790
791
592k
  void StartApply() {
792
592k
    
VLOG_WITH_PREFIX36
(4) << __func__ << ", commit time: " << commit_time_ << ", involved tablets: "
793
36
                        << AsString(involved_tablets_);
794
592k
    resend_applying_time_ = MonoTime::Now() +
795
592k
        std::chrono::microseconds(FLAGS_transaction_resend_applying_interval_usec);
796
592k
    tablets_with_not_applied_intents_ = involved_tablets_.size();
797
592k
    if (context_.leader()) {
798
436k
      for (const auto& tablet : involved_tablets_) {
799
436k
        context_.NotifyApplying({
800
436k
            .tablet = tablet.first,
801
436k
            .transaction = id_,
802
436k
            .aborted = aborted_,
803
436k
            .commit_time = commit_time_,
804
436k
            .sealed = status_ == TransactionStatus::SEALED});
805
436k
      }
806
198k
    }
807
592k
    NotifyAbortWaiters(TransactionStatusResult(TransactionStatus::COMMITTED, commit_time_));
808
592k
  }
809
810
  void FillExpectedTabletBatches(
811
0
      std::vector<ExpectedTabletBatches>* expected_tablet_batches) const {
812
0
    if (!expected_tablet_batches) {
813
0
      return;
814
0
    }
815
816
0
    for (const auto& tablet_id_and_state : involved_tablets_) {
817
0
      if (!tablet_id_and_state.second.all_batches_replicated) {
818
0
        expected_tablet_batches->push_back(ExpectedTabletBatches{
819
0
          .tablet = tablet_id_and_state.first,
820
0
          .batches = tablet_id_and_state.second.required_replicated_batches
821
0
        });
822
0
      }
823
0
    }
824
0
  }
825
826
  TransactionStateContext& context_;
827
  const TransactionId id_;
828
  const std::string log_prefix_;
829
  TransactionStatus status_ = TransactionStatus::PENDING;
830
  HybridTime last_touch_;
831
  // It should match last_touch_, but it is possible that because of some code errors it
832
  // would not be so. To add stability we introduce a separate field for it.
833
  HybridTime commit_time_;
834
835
  // If transaction was only sealed, we will try to abort it not earlier than this time.
836
  CoarseTimePoint next_abort_after_sealing_;
837
838
  struct InvolvedTabletState {
839
    // How many batches should be replicated at this tablet.
840
    size_t required_replicated_batches = 0;
841
842
    // True if this tablet already replicated all batches.
843
    bool all_batches_replicated = false;
844
845
    // True if this tablet already applied all intents.
846
    bool all_intents_applied = false;
847
848
1.30M
    std::string ToString() const {
849
1.30M
      return Format("{ required_replicated_batches: $0 all_batches_replicated: $1 "
850
1.30M
                        "all_intents_applied: $2 }",
851
1.30M
                    required_replicated_batches, all_batches_replicated, all_intents_applied);
852
1.30M
    }
853
  };
854
855
  // Tablets participating in this transaction.
856
  std::unordered_map<TabletId, InvolvedTabletState> involved_tablets_;
857
  // Number of tablets that have not yet replicated all batches.
858
  size_t tablets_with_not_replicated_batches_ = 0;
859
  // Number of tablets that have not yet applied intents.
860
  size_t tablets_with_not_applied_intents_ = 0;
861
  // Don't resend applying until this time.
862
  MonoTime resend_applying_time_;
863
  int64_t first_entry_raft_index_ = std::numeric_limits<int64_t>::max();
864
865
  // Metadata tracking aborted subtransaction IDs in this transaction.
866
  AbortedSubTransactionSetPB aborted_;
867
868
  // The operation that we are currently replicating in RAFT.
869
  // It is owned by TransactionDriver (that will be renamed to OperationDriver).
870
  tablet::UpdateTxnOperation* replicating_ = nullptr;
871
  // Hybrid time before submitting replicating operation.
872
  // It is guaranteed to be less than the actual operation hybrid time.
873
  HybridTime replicating_submit_time_;
874
  std::deque<std::unique_ptr<tablet::UpdateTxnOperation>> request_queue_;
875
876
  std::vector<TransactionAbortCallback> abort_waiters_;
877
};
878
879
struct CompleteWithStatusEntry {
880
  std::unique_ptr<UpdateTxnOperation> holder;
881
  UpdateTxnOperation* request;
882
  Status status;
883
};
884
885
// Contains actions that should be executed after lock in transaction coordinator is released.
886
struct PostponedLeaderActions {
887
  int64_t leader_term = OpId::kUnknownTerm;
888
  // List of tablets with transaction id, that should be notified that this transaction
889
  // is applying.
890
  std::vector<NotifyApplyingData> notify_applying;
891
  // List of update transaction records, that should be replicated via RAFT.
892
  std::vector<std::unique_ptr<UpdateTxnOperation>> updates;
893
894
  std::vector<CompleteWithStatusEntry> complete_with_status;
895
896
10.0M
  void Swap(PostponedLeaderActions* other) {
897
10.0M
    std::swap(leader_term, other->leader_term);
898
10.0M
    notify_applying.swap(other->notify_applying);
899
10.0M
    updates.swap(other->updates);
900
10.0M
    complete_with_status.swap(other->complete_with_status);
901
10.0M
  }
902
903
13.5M
  bool leader() const {
904
13.5M
    return leader_term != OpId::kUnknownTerm;
905
13.5M
  }
906
};
907
908
} // namespace
909
910
0
// Returns a string rendering of the `state` and `op_id` fields, produced by the
// YB_STRUCT_TO_STRING helper macro.
std::string TransactionCoordinator::AbortedData::ToString() const {
911
0
  return YB_STRUCT_TO_STRING(state, op_id);
912
0
}
913
914
// Real implementation of transaction coordinator, as in PImpl idiom.
915
class TransactionCoordinator::Impl : public TransactionStateContext {
916
 public:
917
  Impl(const std::string& permanent_uuid,
918
       TransactionCoordinatorContext* context,
919
       Counter* expired_metric)
920
      : context_(*context),
921
        expired_metric_(*expired_metric),
922
        log_prefix_(consensus::MakeTabletLogPrefix(context->tablet_id(), permanent_uuid)),
923
48.0k
        poller_(log_prefix_, std::bind(&Impl::Poll, this)) {
924
48.0k
  }
925
926
432
  // Calls Shutdown() so the poller and outstanding RPCs are shut down before members are
  // destroyed.
  virtual ~Impl() {
927
432
    Shutdown();
928
432
  }
929
930
864
  // Shuts down the poller first and then the RPC registry.
  // It is also invoked from the destructor, so it may run more than once.
  void Shutdown() {
931
864
    poller_.Shutdown();
932
864
    rpcs_.Shutdown();
933
864
  }
934
935
  // Fills `response` with the status of every transaction listed in `transaction_ids`.
  //
  // For each id, in order:
  //   - a transaction unknown to this coordinator is reported as ABORTED at HybridTime::kMax,
  //     together with the leader safe time (picked once per call, under managed_mutex_);
  //   - a SEALED transaction is resolved through ResolveSealedStatus();
  //   - a COMMITTED transaction that is still managed also gets its aborted sub-transaction set.
  //
  // Returns a non-OK status as soon as an id cannot be decoded or a lookup step fails.
  // Postponed leader actions collected while the mutex was held are executed after it is
  // released. `deadline` is not used by this function.
  CHECKED_STATUS GetStatus(const google::protobuf::RepeatedPtrField<std::string>& transaction_ids,
                           CoarseTimePoint deadline,
                           tserver::GetTransactionStatusResponsePB* response) {
    AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms);
    auto leader_term = context_.LeaderTerm();
    PostponedLeaderActions postponed_leader_actions;
    {
      std::unique_lock<std::mutex> lock(managed_mutex_);
      HybridTime leader_safe_time;
      postponed_leader_actions_.leader_term = leader_term;
      for (const auto& transaction_id : transaction_ids) {
        auto id = VERIFY_RESULT(FullyDecodeTransactionId(transaction_id));

        auto it = managed_transactions_.find(id);
        std::vector<ExpectedTabletBatches> expected_tablet_batches;
        bool known_txn = it != managed_transactions_.end();
        auto txn_status_with_ht = known_txn
            ? VERIFY_RESULT(it->GetStatus(&expected_tablet_batches))
            : TransactionStatusResult(TransactionStatus::ABORTED, HybridTime::kMax);
        // `it` is the end iterator for unknown transactions and must not be dereferenced, so
        // a default-constructed hybrid time is logged in that case.
        VLOG_WITH_PREFIX(4) << __func__ << ": " << id << " => " << txn_status_with_ht
                            << ", last touch: "
                            << (known_txn ? it->last_touch() : HybridTime());
        if (txn_status_with_ht.status == TransactionStatus::SEALED) {
          // TODO(dtxn) Avoid concurrent resolve
          txn_status_with_ht = VERIFY_RESULT(ResolveSealedStatus(
              id, txn_status_with_ht.status_time, expected_tablet_batches,
              /* abort_if_not_replicated = */ false, &lock));
          // ResolveSealedStatus releases managed_mutex_ while it waits for participants, so
          // the transaction may have been removed meanwhile. Look it up again instead of
          // trusting the iterator obtained before the lock was dropped.
          it = managed_transactions_.find(id);
        }
        if (!known_txn) {
          if (!leader_safe_time) {
            // We should pick leader safe time only after managed_mutex_ is locked.
            // Otherwise applied transaction could be removed after this safe time.
            leader_safe_time = VERIFY_RESULT(context_.LeaderSafeTime());
          }
          // Please note that for known transactions we send 0, that means invalid hybrid time.
          // We would wait for safe time only for case when transaction is unknown to coordinator.
          // Since it is only case when transaction could be actually committed.
          response->mutable_coordinator_safe_time()->Resize(response->status().size(), 0);
          response->add_coordinator_safe_time(leader_safe_time.ToUint64());
        }
        response->add_status(txn_status_with_ht.status);
        response->add_status_hybrid_time(txn_status_with_ht.status_time.ToUint64());

        // An (initially empty) aborted set is added for every transaction so that the
        // repeated fields of the response stay index-aligned.
        auto mutable_aborted_set_pb = response->add_aborted_subtxn_set();
        if (txn_status_with_ht.status == TransactionStatus::COMMITTED &&
            it != managed_transactions_.end()) {
          *mutable_aborted_set_pb = it->GetAbortedSubTransactionSetPB();
        }
      }
      postponed_leader_actions.Swap(&postponed_leader_actions_);
    }

    ExecutePostponedLeaderActions(&postponed_leader_actions);
    if (GetAtomicFlag(&FLAGS_TEST_inject_random_delay_on_txn_status_response_ms)) {
      // Test-only: delay responses whose first status is PENDING.
      if (response->status().size() > 0 && response->status(0) == TransactionStatus::PENDING) {
        AtomicFlagRandomSleepMs(&FLAGS_TEST_inject_random_delay_on_txn_status_response_ms);
      }
    }
    return Status::OK();
  }
994
995
  // Tries to resolve the status of a sealed transaction by asking every tablet in
  // `expected_tablet_batches` for the transaction status at that participant.
  //
  // `lock` must be held on entry. It is released while the RPCs are sent and awaited and is
  // re-acquired before this function touches managed_transactions_ again.
  //
  // Per tablet response: an aborted participant marks the transaction aborted; a participant
  // that reports exactly the expected number of replicated batches marks that tablet as fully
  // replicated at the reported hybrid time. Failed or skipped RPCs leave the tablet untouched.
  //
  // Returns the resolved status when the transaction is no longer SEALED. Otherwise, or when
  // the transaction disappeared in the meantime, returns PENDING with a decremented time.
  Result<TransactionStatusResult> ResolveSealedStatus(
      const TransactionId& transaction_id,
      HybridTime commit_time,
      const std::vector<ExpectedTabletBatches>& expected_tablet_batches,
      bool abort_if_not_replicated,
      std::unique_lock<std::mutex>* lock) {
    VLOG_WITH_PREFIX(4)
        << __func__ << ", txn: " << transaction_id << ", commit time: " << commit_time
        << ", expected tablet batches: " << AsString(expected_tablet_batches)
        << ", abort if not replicated: " << abort_if_not_replicated;

    auto deadline = TransactionRpcDeadline();
    auto now_ht = context_.clock().Now();
    const size_t num_tablets = expected_tablet_batches.size();
    CountDownLatch latch(num_tablets);
    // One slot per tablet: stays invalid when unresolved, kMin when the participant aborted.
    std::vector<HybridTime> write_hybrid_times(num_tablets);
    {
      lock->unlock();
      auto relock_on_exit = ScopeExit([lock] {
        if (lock) {
          lock->lock();
        }
      });
      for (size_t idx = 0; idx != num_tablets; ++idx) {
        const auto& expected = expected_tablet_batches[idx];

        tserver::GetTransactionStatusAtParticipantRequestPB req;
        req.set_tablet_id(expected.tablet);
        req.set_transaction_id(
            pointer_cast<const char*>(transaction_id.data()), transaction_id.size());
        req.set_propagated_hybrid_time(now_ht.ToUint64());
        if (abort_if_not_replicated) {
          req.set_required_num_replicated_batches(expected.batches);
        }

        auto handle = rpcs_.Prepare();
        if (handle == rpcs_.InvalidHandle()) {
          // No RPC will be sent for this tablet, so account for it right away.
          latch.CountDown();
          continue;
        }

        // The callback captures locals by reference; latch.Wait() below keeps them alive
        // until every callback has run.
        *handle = GetTransactionStatusAtParticipant(
            deadline,
            nullptr /* remote_tablet */,
            context_.client_future().get(),
            &req,
            [this, handle, idx, &write_hybrid_times, &latch, &transaction_id, &expected](
                const Status& status,
                const tserver::GetTransactionStatusAtParticipantResponsePB& resp) {
              client::UpdateClock(resp, &context_);
              rpcs_.Unregister(handle);

              VLOG_WITH_PREFIX(4)
                  << "TXN: " << transaction_id << " batch status at " << expected.tablet << ": "
                  << "idx: " << idx << ", resp: " << resp.ShortDebugString() << ", expected: "
                  << expected.batches;
              if (status.ok()) {
                if (resp.aborted()) {
                  write_hybrid_times[idx] = HybridTime::kMin;
                } else if (implicit_cast<size_t>(resp.num_replicated_batches()) ==
                               expected.batches) {
                  write_hybrid_times[idx] = HybridTime(resp.status_hybrid_time());
                  LOG_IF_WITH_PREFIX(DFATAL, !write_hybrid_times[idx].is_valid())
                      << "Received invalid hybrid time when all batches were replicated: "
                      << resp.ShortDebugString();
                }
              }
              latch.CountDown();
            });
        (**handle).SendRpc();
      }
      latch.Wait();
    }

    auto txn_it = managed_transactions_.find(transaction_id);
    if (txn_it == managed_transactions_.end()) {
      // Transaction was completed (aborted/committed) during this procedure.
      return TransactionStatusResult{TransactionStatus::PENDING, commit_time.Decremented()};
    }

    for (size_t idx = 0; idx != num_tablets; ++idx) {
      const HybridTime write_ht = write_hybrid_times[idx];
      if (write_ht == HybridTime::kMin) {
        managed_transactions_.modify(txn_it, [](TransactionState& state) {
          state.Aborted();
        });
      } else if (write_ht.is_valid()) {
        const auto& tablet_id = expected_tablet_batches[idx].tablet;
        managed_transactions_.modify(
            txn_it, [&tablet_id, write_ht](TransactionState& state) {
          state.ReplicatedAllBatchesAt(tablet_id, write_ht);
        });
      }
    }

    auto result = VERIFY_RESULT(txn_it->GetStatus(/* expected_tablet_batches = */ nullptr));
    if (result.status == TransactionStatus::SEALED) {
      VLOG_WITH_PREFIX(4) << "TXN: " << transaction_id << " status NOT resolved";
      return TransactionStatusResult{TransactionStatus::PENDING, result.status_time.Decremented()};
    }

    VLOG_WITH_PREFIX(4) << "TXN: " << transaction_id << " status resolved: "
                        << TransactionStatus_Name(result.status);
    return result;
  }
1097
1098
186k
  void Abort(const std::string& transaction_id, int64_t term, TransactionAbortCallback callback) {
1099
186k
    AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms);
1100
1101
186k
    auto id = FullyDecodeTransactionId(transaction_id);
1102
186k
    if (!id.ok()) {
1103
0
      callback(id.status());
1104
0
      return;
1105
0
    }
1106
1107
186k
    PostponedLeaderActions actions;
1108
186k
    {
1109
186k
      std::unique_lock<std::mutex> lock(managed_mutex_);
1110
186k
      auto it = managed_transactions_.find(*id);
1111
186k
      if (it == managed_transactions_.end()) {
1112
3.14k
        lock.unlock();
1113
3.14k
        callback(TransactionStatusResult::Aborted());
1114
3.14k
        return;
1115
3.14k
      }
1116
183k
      postponed_leader_actions_.leader_term = term;
1117
183k
      boost::optional<TransactionStatusResult> status;
1118
183k
      managed_transactions_.modify(it, [&status, &callback](TransactionState& state) {
1119
183k
        status = state.Abort(&callback);
1120
183k
      });
1121
183k
      if (callback) {
1122
22.5k
        lock.unlock();
1123
22.5k
        callback(*status);
1124
22.5k
        return;
1125
22.5k
      }
1126
160k
      actions.Swap(&postponed_leader_actions_);
1127
160k
    }
1128
1129
0
    ExecutePostponedLeaderActions(&actions);
1130
160k
  }
1131
1132
0
  // Returns the number of transactions currently held in managed_transactions_.
  // Takes managed_mutex_ for the duration of the call.
  size_t test_count_transactions() {
1133
0
    std::lock_guard<std::mutex> lock(managed_mutex_);
1134
0
    return managed_transactions_.size();
1135
0
  }
1136
1137
3.18M
  // Applies a replicated transaction record to the matching transaction state.
  //
  // Returns the decode error when the transaction id is malformed, OK when GetTransaction()
  // yields no transaction for the record, and otherwise the status produced by
  // TransactionState::ProcessReplicated(). Postponed leader actions are executed after
  // managed_mutex_ has been released.
  CHECKED_STATUS ProcessReplicated(const ReplicatedData& data) {
    auto id = FullyDecodeTransactionId(data.state.transaction_id());
    if (!id.ok()) {
      return std::move(id.status());
    }

    Status process_status;
    PostponedLeaderActions actions;
    {
      std::lock_guard<std::mutex> lock(managed_mutex_);
      postponed_leader_actions_.leader_term = data.leader_term;

      auto txn_it = GetTransaction(*id, data.state.status(), data.hybrid_time);
      if (txn_it == managed_transactions_.end()) {
        return Status::OK();
      }

      managed_transactions_.modify(txn_it, [&process_status, &data](TransactionState& state) {
        process_status = state.ProcessReplicated(data);
      });
      CheckCompleted(txn_it);
      actions.Swap(&postponed_leader_actions_);
    }
    ExecutePostponedLeaderActions(&actions);

    VLOG_WITH_PREFIX(1) << "Processed: " << data.ToString();
    return process_status;
  }
1163
1164
0
  void ProcessAborted(const AbortedData& data) {
1165
0
    auto id = FullyDecodeTransactionId(data.state.transaction_id());
1166
0
    if (!id.ok()) {
1167
0
      LOG_WITH_PREFIX(DFATAL) << "Abort of transaction with bad id "
1168
0
                              << data.state.ShortDebugString() << ": " << id.status();
1169
0
      return;
1170
0
    }
1171
1172
0
    PostponedLeaderActions actions;
1173
0
    {
1174
0
      std::lock_guard<std::mutex> lock(managed_mutex_);
1175
0
      postponed_leader_actions_.leader_term = OpId::kUnknownTerm;
1176
0
      auto it = managed_transactions_.find(*id);
1177
0
      if (it == managed_transactions_.end()) {
1178
0
        LOG_WITH_PREFIX(WARNING) << "Aborted operation for unknown transaction: " << *id;
1179
0
        return;
1180
0
      }
1181
0
      managed_transactions_.modify(
1182
0
          it, [&](TransactionState& ts) {
1183
0
            ts.ProcessAborted(data);
1184
0
          });
1185
0
      CheckCompleted(it);
1186
0
      actions.Swap(&postponed_leader_actions_);
1187
0
    }
1188
0
    ExecutePostponedLeaderActions(&actions);
1189
1190
0
    VLOG_WITH_PREFIX(1) << "Aborted, state: " << data.state.ShortDebugString()
1191
0
                        << ", op id: " << data.op_id;
1192
0
  }
1193
1194
48.0k
  void Start() {
1195
48.0k
    poller_.Start(
1196
48.0k
        &context_.client_future().get()->messenger()->scheduler(),
1197
48.0k
        std::chrono::microseconds(kTimeMultiplier * FLAGS_transaction_check_interval_usec));
1198
48.0k
  }
1199
1200
1.16M
  // Routes an update transaction request to the state of the transaction it refers to.
  //
  // A request with a malformed id is completed with the decode error. A request for a
  // transaction that is not managed creates the transaction when its status is CREATED, and is
  // otherwise completed with an Expired/kAborted status. In every other case the request is
  // handed over to TransactionState::Handle() and postponed leader actions are executed once
  // managed_mutex_ has been released.
  void Handle(std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) {
    auto& state = *request->request();
    auto id = FullyDecodeTransactionId(state.transaction_id());
    if (!id.ok()) {
      LOG(WARNING) << "Failed to decode id from " << state.ShortDebugString() << ": " << id;
      request->CompleteWithStatus(id.status());
      return;
    }

    PostponedLeaderActions actions;
    {
      std::unique_lock<std::mutex> lock(managed_mutex_);
      postponed_leader_actions_.leader_term = term;

      auto txn_it = managed_transactions_.find(*id);
      if (txn_it == managed_transactions_.end()) {
        if (state.status() != TransactionStatus::CREATED) {
          // Only a CREATED record may introduce a new transaction; reject everything else.
          lock.unlock();
          YB_LOG_HIGHER_SEVERITY_WHEN_TOO_MANY(INFO, WARNING, 1s, 50)
              << LogPrefix() << "Request to unknown transaction " << id << ": "
              << state.ShortDebugString();
          auto status = STATUS_EC_FORMAT(
              Expired, PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE),
              "Transaction $0 expired or aborted by a conflict", *id);
          status = status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted));
          request->CompleteWithStatus(status);
          return;
        }
        txn_it = managed_transactions_.emplace(
            this, *id, context_.clock().Now(), log_prefix_).first;
      }

      managed_transactions_.modify(txn_it, [&request](TransactionState& txn_state) {
        txn_state.Handle(std::move(request));
      });
      postponed_leader_actions_.Swap(&actions);
    }

    ExecutePostponedLeaderActions(&actions);
  }
1240
1241
10.4M
  int64_t PrepareGC(std::string* details) {
1242
10.4M
    std::lock_guard<std::mutex> lock(managed_mutex_);
1243
10.4M
    if (!managed_transactions_.empty()) {
1244
396k
      auto& txn = *managed_transactions_.get<FirstEntryIndexTag>().begin();
1245
396k
      if (details) {
1246
0
        *details += Format("Transaction coordinator: $0\n", txn);
1247
0
      }
1248
396k
      return txn.first_entry_raft_index();
1249
396k
    }
1250
10.0M
    return std::numeric_limits<int64_t>::max();
1251
10.4M
  }
1252
1253
  // Returns the log prefix of this transaction coordinator. The returned reference points at
  // the log_prefix_ member.
1254
58.5k
  const std::string& LogPrefix() {
1255
58.5k
    return log_prefix_;
1256
58.5k
  }
1257
1258
0
  std::string DumpTransactions() {
1259
0
    std::string result;
1260
0
    std::lock_guard<std::mutex> lock(managed_mutex_);
1261
0
    for (const auto& txn : managed_transactions_) {
1262
0
      result += txn.ToString();
1263
0
      result += "\n";
1264
0
    }
1265
0
    return result;
1266
0
  }
1267
1268
 private:
1269
  class LastTouchTag;
1270
  class FirstEntryIndexTag;
1271
1272
  // Container of all transactions managed by this coordinator, with three indexes:
  //   1. hashed, unique       - by transaction id (the default index);
  //   2. ordered, non-unique  - by last touch time (LastTouchTag);
  //   3. ordered, non-unique  - by first entry raft index (FirstEntryIndexTag).
  using ManagedTransactions = boost::multi_index_container<
      TransactionState,
      boost::multi_index::indexed_by<
          boost::multi_index::hashed_unique<
              boost::multi_index::const_mem_fun<
                  TransactionState, const TransactionId&, &TransactionState::id>
          >,
          boost::multi_index::ordered_non_unique<
              boost::multi_index::tag<LastTouchTag>,
              boost::multi_index::const_mem_fun<
                  TransactionState, HybridTime, &TransactionState::last_touch>
          >,
          boost::multi_index::ordered_non_unique<
              boost::multi_index::tag<FirstEntryIndexTag>,
              boost::multi_index::const_mem_fun<
                  TransactionState, int64_t, &TransactionState::first_entry_raft_index>
          >
      >
  >;
1293
1294
  void SendUpdateTransactionRequest(
1295
      const NotifyApplyingData& action, HybridTime now,
1296
436k
      const CoarseTimePoint& deadline) {
1297
18.4E
    VLOG_WITH_PREFIX(3) << "Notify applying: " << action.ToString();
1298
1299
436k
    tserver::UpdateTransactionRequestPB req;
1300
436k
    req.set_tablet_id(action.tablet);
1301
436k
    req.set_propagated_hybrid_time(now.ToUint64());
1302
436k
    auto& state = *req.mutable_state();
1303
436k
    state.set_transaction_id(action.transaction.data(), action.transaction.size());
1304
436k
    state.set_status(TransactionStatus::APPLYING);
1305
436k
    state.add_tablets(context_.tablet_id());
1306
436k
    state.set_commit_hybrid_time(action.commit_time.ToUint64());
1307
436k
    state.set_sealed(action.sealed);
1308
436k
    *state.mutable_aborted() = action.aborted;
1309
1310
436k
    auto handle = rpcs_.Prepare();
1311
436k
    if (handle != rpcs_.InvalidHandle()) {
1312
436k
      *handle = UpdateTransaction(
1313
436k
          deadline,
1314
436k
          nullptr /* remote_tablet */,
1315
436k
          context_.client_future().get(),
1316
436k
          &req,
1317
436k
          [this, handle, action]
1318
436k
              (const Status& status,
1319
436k
               const tserver::UpdateTransactionRequestPB& req,
1320
436k
               const tserver::UpdateTransactionResponsePB& resp) {
1321
436k
            client::UpdateClock(resp, &context_);
1322
436k
            rpcs_.Unregister(handle);
1323
436k
            if (
status.ok()436k
) {
1324
436k
              return;
1325
436k
            }
1326
18.4E
            LOG_WITH_PREFIX(WARNING)
1327
18.4E
                << "Failed to send apply for transaction: " << action.transaction << ": "
1328
18.4E
                << status;
1329
18.4E
            const auto split_child_tablet_ids = SplitChildTabletIdsData(status).value();
1330
18.4E
            const bool tablet_has_been_split = !split_child_tablet_ids.empty();
1331
18.4E
            if (status.IsNotFound() || 
tablet_has_been_split5
) {
1332
27
              std::lock_guard<std::mutex> lock(managed_mutex_);
1333
27
              auto it = managed_transactions_.find(action.transaction);
1334
27
              if (it == managed_transactions_.end()) {
1335
0
                return;
1336
0
              }
1337
27
              managed_transactions_.modify(
1338
27
                  it, [this, &action, &split_child_tablet_ids,
1339
27
                       tablet_has_been_split](TransactionState& state) {
1340
27
                    if (tablet_has_been_split) {
1341
                      // We need to update involved tablets map.
1342
0
                      LOG_WITH_PREFIX(INFO) << Format(
1343
0
                          "Tablet $0 has been split into: $1", action.tablet,
1344
0
                          split_child_tablet_ids);
1345
0
                      state.AddInvolvedTablets(action.tablet, split_child_tablet_ids);
1346
27
                    } else {
1347
                      // Tablet has been deleted (not split), so we should mark it as applied to
1348
                      // be able to cleanup the transaction.
1349
27
                      TransactionStatePB transaction_state;
1350
27
                      transaction_state.add_tablets(action.tablet);
1351
27
                      WARN_NOT_OK(
1352
27
                          state.AppliedInOneOfInvolvedTablets(transaction_state),
1353
27
                          "AppliedInOneOfInvolvedTablets for removed tabled failed: ");
1354
27
                    }
1355
27
                  });
1356
27
              if (tablet_has_been_split) {
1357
0
                const auto new_deadline = TransactionRpcDeadline();
1358
0
                NotifyApplyingData new_action = action;
1359
0
                for (const auto& split_child_tablet_id : split_child_tablet_ids) {
1360
0
                  new_action.tablet = split_child_tablet_id;
1361
0
                  SendUpdateTransactionRequest(new_action, context_.clock().Now(), new_deadline);
1362
0
                }
1363
0
              }
1364
27
            }
1365
18.4E
          });
1366
436k
      (**handle).SendRpc();
1367
436k
    }
1368
436k
  }
1369
1370
10.0M
  // Runs the actions that were collected while managed_mutex_ was held.
  //
  // Pending request completions are always delivered. "Applying" notifications and update
  // submissions are performed only when the actions carry a leader term.
  void ExecutePostponedLeaderActions(PostponedLeaderActions* actions) {
    for (const auto& completion : actions->complete_with_status) {
      completion.request->CompleteWithStatus(completion.status);
    }

    if (!actions->leader()) {
      return;
    }

    const auto& notifications = actions->notify_applying;
    if (!notifications.empty()) {
      // A single timestamp and deadline are shared by the whole batch.
      auto now = context_.clock().Now();
      auto deadline = TransactionRpcDeadline();
      for (const auto& notification : notifications) {
        SendUpdateTransactionRequest(notification, now, deadline);
      }
    }

    for (auto& pending_update : actions->updates) {
      context_.SubmitUpdateTransaction(std::move(pending_update), actions->leader_term);
    }
  }
1391
1392
  // Looks up the managed transaction with the given id.
  //
  // If there is no such transaction, a new entry is created from `id`, `hybrid_time` and the
  // log prefix, unless `status` is APPLIED_IN_ALL_INVOLVED_TABLETS. In that case nothing is
  // inserted and managed_transactions_.end() is returned.
  ManagedTransactions::iterator GetTransaction(const TransactionId& id,
                                               TransactionStatus status,
                                               HybridTime hybrid_time) {
    const auto existing = managed_transactions_.find(id);
    if (existing != managed_transactions_.end()) {
      return existing;
    }

    if (status == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS) {
      // Not found and must not be created: hand back end().
      return existing;
    }

    auto inserted = managed_transactions_.emplace(this, id, hybrid_time, log_prefix_).first;
    VLOG_WITH_PREFIX(1) << Format("Added: $0", *inserted);
    return inserted;
  }
1404
1405
2.02M
  // Returns the coordinator context referenced by context_.
  TransactionCoordinatorContext& coordinator_context() override { return context_; }
1408
1409
436k
  // Queues `data` in the postponed leader actions.
  // When leader() is false, nothing is queued and a warning is logged instead.
  void NotifyApplying(NotifyApplyingData data) override {
    if (leader()) {
      postponed_leader_actions_.notify_applying.push_back(std::move(data));
      return;
    }

    LOG_WITH_PREFIX(WARNING) << __func__ << " at non leader: " << data.ToString();
  }
1416
1417
  // Queues `operation` in the postponed leader actions and returns true.
  //
  // If the postponed actions do not report leadership, the operation is completed immediately
  // with an IllegalState status and false is returned.
  MUST_USE_RESULT bool SubmitUpdateTransaction(
      std::unique_ptr<UpdateTxnOperation> operation) override {
    if (postponed_leader_actions_.leader()) {
      postponed_leader_actions_.updates.push_back(std::move(operation));
      return true;
    }

    const auto failure = STATUS(IllegalState, "Submit update transaction on non leader");
    VLOG_WITH_PREFIX(1) << failure;
    operation->CompleteWithStatus(failure);
    return false;
  }
1429
1430
  void CompleteWithStatus(
1431
443k
      std::unique_ptr<UpdateTxnOperation> request, Status status) override {
1432
443k
    auto ptr = request.get();
1433
443k
    postponed_leader_actions_.complete_with_status.push_back({
1434
443k
        std::move(request), ptr, std::move(status)});
1435
443k
  }
1436
1437
0
  // Queues completion of a non-owned `request` with `status`.
  // The holder slot of the queued entry is left null.
  void CompleteWithStatus(UpdateTxnOperation* request, Status status) override {
    postponed_leader_actions_.complete_with_status.push_back(
        {nullptr /* holder */, request, std::move(status)});
  }
1441
1442
2.41M
  bool leader() const override {
1443
2.41M
    return postponed_leader_actions_.leader();
1444
2.41M
  }
1445
1446
5.52M
  void Poll() {
1447
5.52M
    auto now = context_.clock().Now();
1448
1449
5.52M
    auto leader_term = context_.LeaderTerm();
1450
5.52M
    bool leader = leader_term != OpId::kUnknownTerm;
1451
5.52M
    PostponedLeaderActions actions;
1452
5.52M
    {
1453
5.52M
      std::lock_guard<std::mutex> lock(managed_mutex_);
1454
5.52M
      postponed_leader_actions_.leader_term = leader_term;
1455
1456
5.52M
      auto& index = managed_transactions_.get<LastTouchTag>();
1457
1458
5.52M
      if (VLOG_IS_ON(4) && 
leader3
&&
!index.empty()2
) {
1459
0
        const auto& txn = *index.begin();
1460
0
        LOG_WITH_PREFIX(INFO)
1461
0
            << __func__ << ", now: " << now << ", first: " << txn.ToString()
1462
0
            << ", expired: " << txn.ExpiredAt(now) << ", timeout: "
1463
0
            << MonoDelta(GetTransactionTimeout()) << ", passed: "
1464
0
            << MonoDelta::FromMicroseconds(
1465
0
                   now.GetPhysicalValueMicros() - txn.last_touch().GetPhysicalValueMicros());
1466
0
      }
1467
1468
5.52M
      for (auto it = index.begin(); it != index.end() && 
it->ExpiredAt(now)206k
;) {
1469
1.50k
        if (it->status() == TransactionStatus::ABORTED) {
1470
0
          it = index.erase(it);
1471
1.50k
        } else {
1472
1.50k
          if (leader) {
1473
248
            expired_metric_.Increment();
1474
248
            bool modified = index.modify(it, [](TransactionState& state) {
1475
248
              VLOG
(4) << state.LogPrefix() << "Cleanup expired transaction"0
;
1476
248
              state.Abort();
1477
248
            });
1478
248
            DCHECK(modified);
1479
248
          }
1480
1.50k
          ++it;
1481
1.50k
        }
1482
1.50k
      }
1483
5.52M
      auto now_physical = MonoTime::Now();
1484
5.52M
      for (auto& transaction : managed_transactions_) {
1485
450k
        const_cast<TransactionState&>(transaction).Poll(leader, now_physical);
1486
450k
      }
1487
5.52M
      postponed_leader_actions_.Swap(&actions);
1488
5.52M
    }
1489
5.52M
    ExecutePostponedLeaderActions(&actions);
1490
5.52M
  }
1491
1492
3.19M
  // Removes the transaction referenced by `it` if it reports itself as completed.
  // Before erasing, its requests are cleared with an Expired status describing the transaction.
  void CheckCompleted(ManagedTransactions::iterator it) {
    if (!it->Completed()) {
      return;
    }

    auto completed_status = STATUS_FORMAT(Expired, "Transaction completed: $0", *it);
    VLOG_WITH_PREFIX(1) << completed_status;
    managed_transactions_.modify(it, [&completed_status](TransactionState& txn_state) {
      txn_state.ClearRequests(completed_status);
    });
    managed_transactions_.erase(it);
  }
1502
1503
  TransactionCoordinatorContext& context_;
1504
  Counter& expired_metric_;
1505
  const std::string log_prefix_;
1506
1507
  std::mutex managed_mutex_;
1508
  ManagedTransactions managed_transactions_;
1509
1510
  // Actions that should be executed after mutex is unlocked.
1511
  PostponedLeaderActions postponed_leader_actions_;
1512
1513
  rpc::Poller poller_;
1514
  rpc::Rpcs rpcs_;
1515
};
1516
1517
// Creates the implementation object; the public methods below forward to it.
TransactionCoordinator::TransactionCoordinator(
    const std::string& permanent_uuid,
    TransactionCoordinatorContext* context,
    Counter* expired_metric)
    : impl_(new Impl(permanent_uuid, context, expired_metric)) {}
1522
1523
432
// Defined in this translation unit, where Impl is defined.
TransactionCoordinator::~TransactionCoordinator() = default;
1525
1526
3.18M
// Forwards replicated data to the implementation and returns its status.
Status TransactionCoordinator::ProcessReplicated(const ReplicatedData& data) {
  auto result = impl_->ProcessReplicated(data);
  return result;
}
1529
1530
0
void TransactionCoordinator::ProcessAborted(const AbortedData& data) {
1531
0
  impl_->ProcessAborted(data);
1532
0
}
1533
1534
10.4M
int64_t TransactionCoordinator::PrepareGC(std::string* details) {
1535
10.4M
  return impl_->PrepareGC(details);
1536
10.4M
}
1537
1538
0
size_t TransactionCoordinator::test_count_transactions() const {
1539
0
  return impl_->test_count_transactions();
1540
0
}
1541
1542
void TransactionCoordinator::Handle(
1543
1.16M
    std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) {
1544
1.16M
  impl_->Handle(std::move(request), term);
1545
1.16M
}
1546
1547
47.9k
void TransactionCoordinator::Start() {
1548
47.9k
  impl_->Start();
1549
47.9k
}
1550
1551
432
void TransactionCoordinator::Shutdown() {
1552
432
  impl_->Shutdown();
1553
432
}
1554
1555
// Forwards the status query to the implementation, passing all arguments through unchanged,
// and returns the resulting status.
Status TransactionCoordinator::GetStatus(
    const google::protobuf::RepeatedPtrField<std::string>& transaction_ids,
    CoarseTimePoint deadline,
    tserver::GetTransactionStatusResponsePB* response) {
  auto result = impl_->GetStatus(transaction_ids, deadline, response);
  return result;
}
1561
1562
void TransactionCoordinator::Abort(const std::string& transaction_id,
1563
                                   int64_t term,
1564
186k
                                   TransactionAbortCallback callback) {
1565
186k
  impl_->Abort(transaction_id, term, std::move(callback));
1566
186k
}
1567
1568
0
std::string TransactionCoordinator::DumpTransactions() {
1569
0
  return impl_->DumpTransactions();
1570
0
}
1571
1572
1
std::string TransactionCoordinator::ReplicatedData::ToString() const {
1573
1
  return Format("{ leader_term: $0 state: $1 op_id: $2 hybrid_time: $3 }",
1574
1
                leader_term, state, op_id, hybrid_time);
1575
1
}
1576
1577
} // namespace tablet
1578
} // namespace yb