YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_coordinator.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/tablet/transaction_coordinator.h"
17
18
#include <boost/multi_index/hashed_index.hpp>
19
#include <boost/multi_index/mem_fun.hpp>
20
#include <boost/multi_index/ordered_index.hpp>
21
#include <boost/multi_index_container.hpp>
22
23
#include "yb/client/client.h"
24
#include "yb/client/transaction_rpc.h"
25
26
#include "yb/common/common.pb.h"
27
#include "yb/common/entity_ids.h"
28
#include "yb/common/pgsql_error.h"
29
#include "yb/common/transaction.h"
30
#include "yb/common/transaction_error.h"
31
#include "yb/common/wire_protocol.h"
32
33
#include "yb/consensus/consensus_round.h"
34
#include "yb/consensus/consensus_util.h"
35
36
#include "yb/docdb/transaction_dump.h"
37
38
#include "yb/rpc/messenger.h"
39
#include "yb/rpc/poller.h"
40
#include "yb/rpc/rpc.h"
41
42
#include "yb/server/clock.h"
43
44
#include "yb/tablet/operations/update_txn_operation.h"
45
46
#include "yb/tserver/tserver_service.pb.h"
47
48
#include "yb/util/atomic.h"
49
#include "yb/util/countdown_latch.h"
50
#include "yb/util/enums.h"
51
#include "yb/util/flag_tags.h"
52
#include "yb/util/format.h"
53
#include "yb/util/logging.h"
54
#include "yb/util/metrics.h"
55
#include "yb/util/random_util.h"
56
#include "yb/util/result.h"
57
#include "yb/util/scope_exit.h"
58
#include "yb/util/status_format.h"
59
#include "yb/util/status_log.h"
60
#include "yb/util/tsan_util.h"
61
#include "yb/util/yb_pg_errcodes.h"
62
63
// Heartbeat period of transaction clients; defined elsewhere.
DECLARE_uint64(transaction_heartbeat_usec);

// Expiration of PENDING transactions is measured in multiples of the heartbeat period
// (see GetTransactionTimeout()).
DEFINE_double(transaction_max_missed_heartbeat_periods, 10.0,
              "Maximum heartbeat periods that a pending transaction can miss "
              "before the transaction coordinator expires the transaction. "
              "The total expiration time in microseconds is transaction_heartbeat_usec "
              "times transaction_max_missed_heartbeat_periods. "
              "The value passed to this flag may be fractional.");

// Polling intervals used by the coordinator.
DEFINE_uint64(transaction_check_interval_usec, 500000,
              "Transaction check interval in usec.");
DEFINE_uint64(transaction_resend_applying_interval_usec, 5000000,
              "Transaction resend applying interval in usec.");

// Grace period applied after a SEALED record is replicated (see SealedReplicationFinished).
DEFINE_int64(avoid_abort_after_sealing_ms, 20,
             "If transaction was only sealed, we will try to abort it "
             "not earlier than this period in milliseconds.");

// Test-only fault injection.
DEFINE_test_flag(uint64, inject_txn_get_status_delay_ms, 0,
                 "Inject specified delay to transaction get status requests.");
DEFINE_test_flag(int64, inject_random_delay_on_txn_status_response_ms, 0,
                 "Inject a random amount of delay to the thread processing a "
                 "GetTransactionStatusRequest after it has populated it's response. "
                 "This could help simulate e.g. out-of-order responses where PENDING is "
                 "received by client after a COMMITTED response.");
85
86
using namespace std::literals;
87
using namespace std::placeholders;
88
89
namespace yb {
90
namespace tablet {
91
92
466k
std::chrono::microseconds GetTransactionTimeout() {
93
466k
  const double timeout = GetAtomicFlag(&FLAGS_transaction_max_missed_heartbeat_periods) *
94
466k
                         GetAtomicFlag(&FLAGS_transaction_heartbeat_usec);
95
  // Cast to avoid -Wimplicit-int-float-conversion.
96
466k
  return timeout >= static_cast<double>(std::chrono::microseconds::max().count())
97
0
      ? std::chrono::microseconds::max()
98
466k
      : std::chrono::microseconds(static_cast<int64_t>(timeout));
99
466k
}
100
101
namespace {
102
103
struct NotifyApplyingData {
104
  TabletId tablet;
105
  TransactionId transaction;
106
  const AbortedSubTransactionSetPB& aborted;
107
  HybridTime commit_time;
108
  bool sealed;
109
110
0
  std::string ToString() const {
111
0
    return Format("{ tablet: $0 transaction: $1 commit_time: $2 sealed: $3}",
112
0
                  tablet, transaction, commit_time, sealed);
113
0
  }
114
};
115
116
struct ExpectedTabletBatches {
117
  TabletId tablet;
118
  size_t batches;
119
120
0
  std::string ToString() const {
121
0
    return Format("{ tablet: $0 batches: $1 }", tablet, batches);
122
0
  }
123
};
124
125
// Context for transaction state. I.e. access to external facilities required by
126
// transaction state to do its job.
127
class TransactionStateContext {
128
 public:
129
  virtual TransactionCoordinatorContext& coordinator_context() = 0;
130
131
  virtual void NotifyApplying(NotifyApplyingData data) = 0;
132
133
  // Submits update transaction to the RAFT log. Returns false if was not able to submit.
134
  virtual MUST_USE_RESULT bool SubmitUpdateTransaction(
135
      std::unique_ptr<UpdateTxnOperation> operation) = 0;
136
137
  virtual void CompleteWithStatus(
138
      std::unique_ptr<UpdateTxnOperation> request, Status status) = 0;
139
140
  virtual void CompleteWithStatus(UpdateTxnOperation* request, Status status) = 0;
141
142
  virtual bool leader() const = 0;
143
144
 protected:
145
128
  ~TransactionStateContext() {}
146
};
147
148
727k
std::string BuildLogPrefix(const std::string& parent_log_prefix, const TransactionId& id) {
149
727k
  auto id_string = id.ToString();
150
727k
  return parent_log_prefix.substr(0, parent_log_prefix.length() - 2) + " ID " + id_string + ": ";
151
727k
}
152
153
// TransactionState keeps state of single transaction.
154
// User of this class should guarantee that it does NOT invoke methods concurrently.
155
class TransactionState {
156
 public:
157
  explicit TransactionState(TransactionStateContext* context,
158
                            const TransactionId& id,
159
                            HybridTime last_touch,
160
                            const std::string& parent_log_prefix)
161
      : context_(*context),
162
        id_(id),
163
        log_prefix_(BuildLogPrefix(parent_log_prefix, id)),
164
727k
        last_touch_(last_touch) {
165
727k
  }
166
167
716k
  // Debug builds verify that nothing still refers to this state at destruction: no abort
  // waiters, no queued requests and no in-flight replication.
  ~TransactionState() {
    DCHECK(abort_waiters_.empty());
    DCHECK(request_queue_.empty());
    DCHECK(replicating_ == nullptr)
        << Format("Replicating: $0", static_cast<void*>(replicating_));
  }
172
173
  // Id of transaction.
174
10.1M
  const TransactionId& id() const {
175
10.1M
    return id_;
176
10.1M
  }
177
178
  // Time when we last heard from transaction. I.e. hybrid time of replicated raft log entry
179
  // that updates status of this transaction.
180
9.50M
  HybridTime last_touch() const {
181
9.50M
    return last_touch_;
182
9.50M
  }
183
184
  // Status of transaction.
185
381
  TransactionStatus status() const {
186
381
    return status_;
187
381
  }
188
189
  // RAFT index of first RAFT log entry required by this transaction.
190
10.6M
  int64_t first_entry_raft_index() const {
191
10.6M
    return first_entry_raft_index_;
192
10.6M
  }
193
194
716k
  std::string ToString() const {
195
716k
    return Format("{ id: $0 last_touch: $1 status: $2 involved_tablets: $3 replicating: $4 "
196
716k
                      " request_queue: $5 first_entry_raft_index: $6 }",
197
716k
                  id_, last_touch_, TransactionStatus_Name(status_),
198
716k
                  involved_tablets_, replicating_, request_queue_, first_entry_raft_index_);
199
716k
  }
200
201
  // Whether this transaction expired at specified time.
202
470k
  bool ExpiredAt(HybridTime now) const {
203
470k
    if (ShouldBeCommitted() || ShouldBeInStatus(TransactionStatus::SEALED)) {
204
4.10k
      return false;
205
4.10k
    }
206
465k
    const int64_t passed = now.GetPhysicalValueMicros() - last_touch_.GetPhysicalValueMicros();
207
465k
    if (std::chrono::microseconds(passed) > GetTransactionTimeout()) {
208
45.8k
      return true;
209
45.8k
    }
210
420k
    return false;
211
420k
  }
212
213
  // Whether this transaction has completed.
214
1.87M
  bool Completed() const {
215
1.87M
    return status_ == TransactionStatus::ABORTED ||
216
1.46M
           status_ == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS;
217
1.87M
  }
218
219
  // Applies new state to transaction.
220
1.87M
  CHECKED_STATUS ProcessReplicated(const TransactionCoordinator::ReplicatedData& data) {
221
161
    VLOG_WITH_PREFIX(4)
222
161
        << Format("ProcessReplicated: $0, replicating: $1", data, replicating_);
223
224
1.87M
    if (replicating_ != nullptr) {
225
624k
      auto replicating_op_id = replicating_->consensus_round()->id();
226
624k
      if (!replicating_op_id.empty()) {
227
624k
        if (replicating_op_id != data.op_id) {
228
0
          LOG_WITH_PREFIX(DFATAL)
229
0
              << "Replicated unexpected operation, replicating: " << AsString(replicating_)
230
0
              << ", replicated: " << AsString(data);
231
0
        }
232
18.4E
      } else if (data.leader_term != OpId::kUnknownTerm) {
233
0
        LOG_WITH_PREFIX(DFATAL)
234
0
            << "Leader replicated operation without op id, replicating: " << AsString(replicating_)
235
0
            << ", replicated: " << AsString(data);
236
18.4E
      } else {
237
18.4E
        LOG_WITH_PREFIX(INFO) << "Cancel replicating without id: " << AsString(replicating_)
238
18.4E
                              << ", because " << AsString(data) << " was replicated";
239
18.4E
      }
240
624k
      replicating_ = nullptr;
241
624k
    }
242
243
1.87M
    auto status = DoProcessReplicated(data);
244
245
1.87M
    if (data.leader_term == OpId::kUnknownTerm) {
246
1.24M
      ClearRequests(STATUS(IllegalState, "Leader changed"));
247
624k
    } else {
248
624k
      switch(status_) {
249
104k
        case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS:
250
104k
          ClearRequests(STATUS(AlreadyPresent, "Transaction committed"));
251
104k
          break;
252
134k
        case TransactionStatus::ABORTED:
253
134k
          ClearRequests(
254
134k
              STATUS(Expired, "Transaction aborted",
255
134k
                     TransactionError(TransactionErrorCode::kAborted)));
256
134k
          break;
257
0
        case TransactionStatus::CREATED: FALLTHROUGH_INTENDED;
258
281k
        case TransactionStatus::PENDING: FALLTHROUGH_INTENDED;
259
281k
        case TransactionStatus::SEALED: FALLTHROUGH_INTENDED;
260
385k
        case TransactionStatus::COMMITTED: FALLTHROUGH_INTENDED;
261
385k
        case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED;
262
385k
        case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: FALLTHROUGH_INTENDED;
263
385k
        case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED;
264
385k
        case TransactionStatus::GRACEFUL_CLEANUP:
265
385k
          ProcessQueue();
266
385k
          break;
267
1.87M
      }
268
1.87M
    }
269
270
1.87M
    return status;
271
1.87M
  }
272
273
0
  // Handles an operation of this transaction whose RAFT replication was aborted: drops the
  // in-flight operation pointer and fails every queued request with Aborted.
  void ProcessAborted(const TransactionCoordinator::AbortedData& data) {
    VLOG_WITH_PREFIX(4) << Format("ProcessAborted: $0, replicating: $1", data.state, replicating_);

    // Use the prefixed variant so this diagnostic identifies the transaction, like every other
    // DFATAL message in this class (it previously used plain LOG_IF and lost the prefix).
    LOG_IF_WITH_PREFIX(DFATAL,
                       replicating_ != nullptr && !replicating_->op_id().empty() &&
                       replicating_->op_id() != data.op_id)
        << "Aborted wrong operation, expected " << AsString(replicating_) << ", but "
        << AsString(data) << " aborted";

    replicating_ = nullptr;

    // We are not leader, so could abort all queued requests.
    ClearRequests(STATUS(Aborted, "Replication failed"));
  }
287
288
  // Clear requests of this transaction.
289
2.20M
  void ClearRequests(const Status& status) {
290
142
    VLOG_WITH_PREFIX(4) << Format("ClearRequests: $0, replicating: $1", status, replicating_);
291
2.20M
    if (replicating_ != nullptr) {
292
0
      context_.CompleteWithStatus(replicating_, status);
293
0
      replicating_ = nullptr;
294
0
    }
295
296
3.03k
    for (auto& entry : request_queue_) {
297
3.03k
      context_.CompleteWithStatus(std::move(entry), status);
298
3.03k
    }
299
2.20M
    request_queue_.clear();
300
301
2.20M
    NotifyAbortWaiters(status);
302
2.20M
  }
303
304
  // Used only during transaction sealing.
305
0
  void ReplicatedAllBatchesAt(const TabletId& tablet, HybridTime last_time) {
306
0
    auto it = involved_tablets_.find(tablet);
307
    // We could be notified several times, so avoid double handling.
308
0
    if (it == involved_tablets_.end() || it->second.all_batches_replicated) {
309
0
      return;
310
0
    }
311
312
    // If transaction was sealed, then its commit time is max of seal record time and intent
313
    // replication times from all participating tablets.
314
0
    commit_time_ = std::max(commit_time_, last_time);
315
0
    --tablets_with_not_replicated_batches_;
316
0
    it->second.all_batches_replicated = true;
317
318
0
    if (tablets_with_not_replicated_batches_ == 0) {
319
0
      StartApply();
320
0
    }
321
0
  }
322
323
19.0k
  // Aborted sub-transactions, as taken from the last replicated COMMITTED or SEALED record.
  const AbortedSubTransactionSetPB& GetAbortedSubTransactionSetPB() const {
    return aborted_;
  }
324
325
  // Returns the status to report for this transaction, together with a hybrid time: the commit
  // time when committed, kMax when aborted, and for PENDING a time strictly below any pending
  // COMMITTED/ABORTED update, the clock and HtLeaseExpiration().
  // For SEALED transactions whose batches are not all replicated, the expected batch counts are
  // written to `expected_tablet_batches`. Statuses a coordinator never holds yield Corruption.
  Result<TransactionStatusResult> GetStatus(
      std::vector<ExpectedTabletBatches>* expected_tablet_batches) const {
    switch (status_) {
      case TransactionStatus::COMMITTED: FALLTHROUGH_INTENDED;
      case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS:
        return TransactionStatusResult{TransactionStatus::COMMITTED, commit_time_};

      case TransactionStatus::SEALED:
        if (tablets_with_not_replicated_batches_ != 0) {
          FillExpectedTabletBatches(expected_tablet_batches);
          return TransactionStatusResult{TransactionStatus::SEALED, commit_time_};
        }
        return TransactionStatusResult{TransactionStatus::COMMITTED, commit_time_};

      case TransactionStatus::ABORTED:
        return TransactionStatusResult{TransactionStatus::ABORTED, HybridTime::kMax};

      case TransactionStatus::PENDING: {
        HybridTime status_ht;
        if (replicating_) {
          const auto pending_update = replicating_->request()->status();
          if (pending_update == TransactionStatus::COMMITTED ||
              pending_update == TransactionStatus::ABORTED) {
            const auto replicating_ht = replicating_->hybrid_time_even_if_unset();
            if (!replicating_ht.is_valid()) {
              // Hybrid time not yet assigned to replicating, so use the more conservative submit
              // time, which is guaranteed to be less than the replicating time. See GH #9981.
              status_ht = replicating_submit_time_;
            } else {
              status_ht = replicating_ht;
            }
          }
        }
        if (!status_ht) {
          status_ht = context_.coordinator_context().clock().Now();
        }
        // Cap at the hybrid time lease expiration.
        status_ht = std::min(status_ht, context_.coordinator_context().HtLeaseExpiration());
        return TransactionStatusResult{TransactionStatus::PENDING, status_ht.Decremented()};
      }

      case TransactionStatus::CREATED: FALLTHROUGH_INTENDED;
      case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED;
      case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: FALLTHROUGH_INTENDED;
      case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED;
      case TransactionStatus::GRACEFUL_CLEANUP:
        return STATUS_FORMAT(Corruption, "Transaction has unexpected status: $0",
                             TransactionStatus_Name(status_));
    }
    FATAL_INVALID_ENUM_VALUE(TransactionStatus, status_);
  }
371
372
0
  void Aborted() {
373
0
    status_ = TransactionStatus::ABORTED;
374
0
    NotifyAbortWaiters(TransactionStatusResult::Aborted());
375
0
  }
376
377
115k
  // Handles an external abort request. If the outcome is already known (committed, about to
  // commit, or aborted), it is returned immediately. Otherwise the transaction must be PENDING:
  // `*callback` is moved into the abort waiters, an abort is submitted, and PENDING/kMax is
  // returned.
  TransactionStatusResult Abort(TransactionAbortCallback* callback) {
    if (status_ == TransactionStatus::COMMITTED ||
        status_ == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS) {
      return TransactionStatusResult(TransactionStatus::COMMITTED, commit_time_);
    }
    if (ShouldBeCommitted()) {
      // Commit is in flight, so the commit time is not known yet.
      return TransactionStatusResult(TransactionStatus::COMMITTED, HybridTime::kMax);
    }
    if (status_ == TransactionStatus::ABORTED) {
      return TransactionStatusResult::Aborted();
    }

    VLOG_WITH_PREFIX(1) << "External abort request";
    CHECK_EQ(TransactionStatus::PENDING, status_);
    abort_waiters_.emplace_back(std::move(*callback));
    Abort();
    return TransactionStatusResult(TransactionStatus::PENDING, HybridTime::kMax);
  }
393
394
636k
  // Entry point for an update request for this transaction.
  // APPLIED_IN_ONE_OF_INVOLVED_TABLETS notifications are processed and completed immediately.
  // Any other request is queued while an operation is replicating, and handled right away
  // otherwise.
  void Handle(std::unique_ptr<tablet::UpdateTxnOperation> request) {
    auto& state = *request->request();
    VLOG_WITH_PREFIX(1) << "Handle: " << state.ShortDebugString();

    if (state.status() == TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS) {
      auto status = AppliedInOneOfInvolvedTablets(state);
      context_.CompleteWithStatus(std::move(request), status);
      return;
    }

    if (!replicating_) {
      DoHandle(std::move(request));
      return;
    }
    request_queue_.push_back(std::move(request));
  }
408
409
  // Aborts this transaction.
410
135k
  void Abort() {
411
135k
    if (ShouldBeCommitted()) {
412
0
      LOG_WITH_PREFIX(DFATAL) << "Transaction abort in wrong state: " << status_;
413
0
      return;
414
0
    }
415
135k
    if (ShouldBeAborted()) {
416
314
      return;
417
314
    }
418
135k
    if (status_ != TransactionStatus::PENDING) {
419
0
      LOG_WITH_PREFIX(DFATAL) << "Unexpected status during abort: "
420
0
                              << TransactionStatus_Name(status_);
421
0
      return;
422
0
    }
423
135k
    SubmitUpdateStatus(TransactionStatus::ABORTED);
424
135k
  }
425
426
  // Returns logs prefix for this transaction.
427
0
  const std::string& LogPrefix() {
428
0
    return log_prefix_;
429
0
  }
430
431
  // now_physical is just optimization to avoid querying the current time multiple times.
432
412k
  void Poll(bool leader, MonoTime now_physical) {
433
412k
    if (status_ != TransactionStatus::COMMITTED &&
434
402k
        (status_ != TransactionStatus::SEALED || tablets_with_not_replicated_batches_ != 0)) {
435
402k
      return;
436
402k
    }
437
10.0k
    if (tablets_with_not_applied_intents_ == 0) {
438
888
      if (leader && !ShouldBeInStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS)) {
439
27
        SubmitUpdateStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS);
440
27
      }
441
9.15k
    } else if (now_physical >= resend_applying_time_) {
442
3
      if (leader) {
443
18
        for (auto& tablet : involved_tablets_) {
444
18
          if (!tablet.second.all_intents_applied) {
445
5
            context_.NotifyApplying({
446
5
                .tablet = tablet.first,
447
5
                .transaction = id_,
448
5
                .aborted = aborted_,
449
5
                .commit_time = commit_time_,
450
5
                .sealed = status_ == TransactionStatus::SEALED });
451
5
          }
452
18
        }
453
2
      }
454
3
      resend_applying_time_ = now_physical +
455
3
          std::chrono::microseconds(FLAGS_transaction_resend_applying_interval_usec);
456
3
    }
457
10.0k
  }
458
459
  void AddInvolvedTablets(
460
0
      const TabletId& source_tablet_id, const std::vector<TabletId>& tablet_ids) {
461
0
    auto source_it = involved_tablets_.find(source_tablet_id);
462
0
    if (source_it == involved_tablets_.end()) {
463
0
      LOG(FATAL) << "Unknown involved tablet: " << source_tablet_id;
464
0
      return;
465
0
    }
466
0
    for (const auto& tablet_id : tablet_ids) {
467
0
      if (involved_tablets_.emplace(tablet_id, source_it->second).second) {
468
0
        ++tablets_with_not_applied_intents_;
469
0
      }
470
0
    }
471
0
    if (!source_it->second.all_intents_applied) {
472
      // Mark source tablet as if intents have been applied for it.
473
0
      --tablets_with_not_applied_intents_;
474
0
      source_it->second.all_intents_applied = true;
475
0
    }
476
0
  }
477
478
246k
  // Records that the single tablet named in `state` applied this transaction's intents. When
  // the last tablet does so, APPLIED_IN_ALL_INVOLVED_TABLETS is submitted. Returns
  // InvalidArgument if `state` does not name exactly one tablet. A wrong status or an unknown
  // tablet is logged and ignored.
  CHECKED_STATUS AppliedInOneOfInvolvedTablets(const TransactionStatePB& state) {
    const bool applying =
        status_ == TransactionStatus::COMMITTED || status_ == TransactionStatus::SEALED;
    if (!applying) {
      // We could ignore this request, because it will be re-sent if required.
      LOG_WITH_PREFIX(DFATAL)
          << "AppliedInOneOfInvolvedTablets in wrong state: " << TransactionStatus_Name(status_)
          << ", request: " << state.ShortDebugString();
      return Status::OK();
    }

    if (state.tablets_size() != 1) {
      return STATUS_FORMAT(
          InvalidArgument, "Expected exactly one tablet in $0: $1", __func__, state);
    }

    const auto& applied_tablet = state.tablets(0);
    auto it = involved_tablets_.find(applied_tablet);
    if (it == involved_tablets_.end()) {
      // This can happen when the coordinator retried apply to post-split tablets and then
      // moved to a new status tablet leader. The new coordinator may then hear about a
      // post-split tablet it does not know yet. Logging and ignoring is safe: the new
      // coordinator re-sends apply to every involved tablet it knows, and keeps retrying for
      // those that reply they were split.
      LOG_WITH_PREFIX(WARNING) << "Applied in unknown tablet: " << applied_tablet;
      return Status::OK();
    }

    auto& tablet_state = it->second;
    if (tablet_state.all_intents_applied) {
      return Status::OK();
    }
    --tablets_with_not_applied_intents_;
    tablet_state.all_intents_applied = true;
    VLOG_WITH_PREFIX(4) << "Applied to " << applied_tablet << ", left not applied: "
                        << tablets_with_not_applied_intents_;
    if (tablets_with_not_applied_intents_ == 0) {
      SubmitUpdateStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS);
    }
    return Status::OK();
  }
515
516
 private:
517
  // Checks whether we in specified status or going to be in this status when replication is
518
  // finished.
519
2.01M
  bool ShouldBeInStatus(TransactionStatus status) const {
520
2.01M
    if (status_ == status) {
521
3.64k
      return true;
522
3.64k
    }
523
2.00M
    if (replicating_) {
524
65.2k
      if (replicating_->request()->status() == status) {
525
21.7k
        return true;
526
21.7k
      }
527
528
43.5k
      for (const auto& entry : request_queue_) {
529
4.81k
        if (entry->request()->status() == status) {
530
2.95k
          return true;
531
2.95k
        }
532
4.81k
      }
533
43.5k
    }
534
535
1.98M
    return false;
536
2.00M
  }
537
538
719k
  bool ShouldBeCommitted() const {
539
719k
    return ShouldBeInStatus(TransactionStatus::COMMITTED) ||
540
691k
           ShouldBeInStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS);
541
719k
  }
542
543
135k
  bool ShouldBeAborted() const {
544
135k
    return ShouldBeInStatus(TransactionStatus::ABORTED);
545
135k
  }
546
547
  // Process operation that was replicated in RAFT.
548
1.87M
  CHECKED_STATUS DoProcessReplicated(const TransactionCoordinator::ReplicatedData& data) {
549
1.87M
    switch (data.state.status()) {
550
404k
      case TransactionStatus::ABORTED:
551
404k
        return AbortedReplicationFinished(data);
552
0
      case TransactionStatus::SEALED:
553
0
        return SealedReplicationFinished(data);
554
311k
      case TransactionStatus::COMMITTED:
555
311k
        return CommittedReplicationFinished(data);
556
727k
      case TransactionStatus::CREATED: FALLTHROUGH_INTENDED;
557
843k
      case TransactionStatus::PENDING:
558
843k
        return PendingReplicationFinished(data);
559
0
      case TransactionStatus::APPLYING:
560
        // APPLYING is handled separately, because it is received for transactions not managed by
561
        // this tablet as a transaction status tablet, but tablets that are involved in the data
562
        // path (receive write intents) for this transactions
563
0
        FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status());
564
0
      case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS:
565
        // APPLIED_IN_ONE_OF_INVOLVED_TABLETS handled w/o use of RAFT log
566
0
        FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status());
567
311k
      case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS:
568
311k
        return AppliedInAllInvolvedTabletsReplicationFinished(data);
569
0
      case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED;
570
0
      case TransactionStatus::GRACEFUL_CLEANUP:
571
        // CLEANUP is handled separately, because it is received for transactions not managed by
572
        // this tablet as a transaction status tablet, but tablets that are involved in the data
573
        // path (receive write intents) for this transactions
574
0
        FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status());
575
1.87M
    }
576
0
    FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status());
577
0
  }
578
579
394k
  // Validates `request` against the current status and submits it for replication. Invalid
  // requests are completed with an error instead. Must only run while nothing is replicating,
  // and on the leader, since submission failure is fatal.
  void DoHandle(std::unique_ptr<tablet::UpdateTxnOperation> request) {
    const auto& state = *request->request();
    const auto txn_status = state.status();

    Status status;
    if (txn_status == TransactionStatus::COMMITTED) {
      status = HandleCommit();
    } else if ((txn_status == TransactionStatus::PENDING ||
                txn_status == TransactionStatus::CREATED) &&
               status_ != TransactionStatus::PENDING) {
      // Heartbeats (and CREATED, which is accepted while PENDING only for backward
      // compatibility with versions prior to D11210 that could retry creation with the same
      // id) are only valid for a PENDING transaction.
      status = STATUS_FORMAT(IllegalState,
          "Transaction in wrong state during heartbeat: $0",
          TransactionStatus_Name(status_));
    }

    if (!status.ok()) {
      context_.CompleteWithStatus(std::move(request), std::move(status));
      return;
    }

    VLOG_WITH_PREFIX(4) << Format("DoHandle, replicating = $0", replicating_);
    auto submitted = SubmitRequest(std::move(request));
    // Should always succeed, since we execute this code only on the leader.
    CHECK(submitted) << "Status: " << TransactionStatus_Name(txn_status);
  }
608
609
104k
  // Checks that a commit request may proceed. An expired transaction is aborted and Expired is
  // returned. A transaction that is not PENDING yields IllegalState.
  CHECKED_STATUS HandleCommit() {
    const auto now = context_.coordinator_context().clock().Now();
    if (ExpiredAt(now)) {
      auto status = STATUS(Expired, "Commit of expired transaction");
      VLOG_WITH_PREFIX(4) << status;
      Abort();
      return status;
    }
    if (status_ == TransactionStatus::PENDING) {
      return Status::OK();
    }
    return STATUS_FORMAT(IllegalState,
                         "Transaction in wrong state when starting to commit: $0",
                         TransactionStatus_Name(status_));
  }
625
626
239k
  // Creates an update operation that moves this transaction to `status`. It is queued behind
  // the in-flight operation if there is one, and submitted immediately otherwise.
  void SubmitUpdateStatus(TransactionStatus status) {
    VLOG_WITH_PREFIX(4) << "SubmitUpdateStatus(" << TransactionStatus_Name(status) << ")";

    TransactionStatePB state;
    state.set_transaction_id(id_.data(), id_.size());
    state.set_status(status);

    auto request = context_.coordinator_context().CreateUpdateTransaction(&state);
    if (!replicating_) {
      SubmitRequest(std::move(request));
      return;
    }
    request_queue_.push_back(std::move(request));
  }
640
641
626k
  // Makes `request` the in-flight operation, records its submit time (used by GetStatus while
  // no hybrid time is assigned yet), and hands it to the context for RAFT replication.
  // Returns false and clears replicating_ if the context refused it (for instance, not leader).
  bool SubmitRequest(std::unique_ptr<tablet::UpdateTxnOperation> request) {
    replicating_ = request.get();
    replicating_submit_time_ = context_.coordinator_context().clock().Now();
    // The label names this function; it previously said "SubmitUpdateStatus", a different
    // method, which made verbose traces misleading.
    VLOG_WITH_PREFIX(4) << Format("SubmitRequest, replicating = $0", replicating_);
    if (!context_.SubmitUpdateTransaction(std::move(request))) {
      // Was not able to submit update transaction, for instance we are not leader.
      // So we are not replicating.
      replicating_ = nullptr;
      return false;
    }

    return true;
  }
654
655
385k
  void ProcessQueue() {
656
393k
    while (!replicating_ && !request_queue_.empty()) {
657
7.62k
      auto request = std::move(request_queue_.front());
658
7.62k
      request_queue_.pop_front();
659
7.62k
      DoHandle(std::move(request));
660
7.62k
    }
661
385k
  }
662
663
404k
  // Handles a replicated ABORTED record: marks the transaction aborted, remembers the entry's
  // RAFT index, and resolves abort waiters. Reports DFATAL when coming from a status other
  // than ABORTED or PENDING.
  CHECKED_STATUS AbortedReplicationFinished(const TransactionCoordinator::ReplicatedData& data) {
    const bool expected_status =
        status_ == TransactionStatus::ABORTED || status_ == TransactionStatus::PENDING;
    if (!expected_status) {
      LOG_WITH_PREFIX(DFATAL) << "Invalid status of aborted transaction: "
                              << TransactionStatus_Name(status_);
    }

    status_ = TransactionStatus::ABORTED;
    first_entry_raft_index_ = data.op_id.index;
    NotifyAbortWaiters(TransactionStatusResult::Aborted());
    return Status::OK();
  }
675
676
  // Handles a replicated SEALED record for a PENDING transaction. The record's hybrid time
  // becomes the tentative commit time, the aborted sub-transactions are taken from the record,
  // and each sealed tablet is registered with its required batch count.
  // Returns IllegalState if the transaction is not PENDING, and Corruption if the record lacks
  // a batch count for some tablet. State is not modified in either error case.
  CHECKED_STATUS SealedReplicationFinished(
      const TransactionCoordinator::ReplicatedData& data) {
    if (status_ != TransactionStatus::PENDING) {
      // The message names this function (it used to say CommittedReplicationFinished).
      auto status = STATUS_FORMAT(
          IllegalState,
          "Unexpected status during SealedReplicationFinished: $0",
          TransactionStatus_Name(status_));
      LOG_WITH_PREFIX(DFATAL) << status;
      return status;
    }

    // tablet_batches(idx) is read for every tablet below; validate before mutating anything so
    // a malformed record cannot cause an out-of-range read or leave the state half-sealed.
    if (data.state.tablet_batches_size() < data.state.tablets_size()) {
      auto status = STATUS_FORMAT(
          Corruption, "Sealed record has $0 tablets but only $1 batch counts: $2",
          data.state.tablets_size(), data.state.tablet_batches_size(),
          data.state.ShortDebugString());
      LOG_WITH_PREFIX(DFATAL) << status;
      return status;
    }

    last_touch_ = data.hybrid_time;
    commit_time_ = data.hybrid_time;
    // TODO(dtxn) Not yet implemented
    next_abort_after_sealing_ = CoarseMonoClock::now() + FLAGS_avoid_abort_after_sealing_ms * 1ms;
    // TODO(savepoints) Savepoints with sealed transactions is not yet tested
    aborted_ = data.state.aborted();
    VLOG_WITH_PREFIX(4) << "Seal time: " << commit_time_;
    status_ = TransactionStatus::SEALED;

    involved_tablets_.reserve(data.state.tablets().size());
    for (int idx = 0; idx != data.state.tablets().size(); ++idx) {
      auto tablet_batches = data.state.tablet_batches(idx);
      LOG_IF_WITH_PREFIX(DFATAL, tablet_batches == 0)
          << "Tablet without batches: " << data.state.ShortDebugString();
      InvolvedTabletState state = {
        .required_replicated_batches = static_cast<size_t>(tablet_batches),
        .all_batches_replicated = false,
        .all_intents_applied = false
      };
      // Count only tablets actually inserted. A duplicate tablet id would otherwise bump the
      // counter without a matching entry, and ReplicatedAllBatchesAt (which decrements once
      // per entry) could never bring it back to zero to start apply.
      if (involved_tablets_.emplace(data.state.tablets(idx), state).second) {
        ++tablets_with_not_replicated_batches_;
      }
    }

    first_entry_raft_index_ = data.op_id.index;
    return Status::OK();
  }
713
714
311k
  CHECKED_STATUS CommittedReplicationFinished(const TransactionCoordinator::ReplicatedData& data) {
715
311k
    if (status_ != TransactionStatus::PENDING) {
716
0
      auto status = STATUS_FORMAT(
717
0
          IllegalState,
718
0
          "Unexpected status during CommittedReplicationFinished: $0",
719
0
          TransactionStatus_Name(status_));
720
0
      LOG_WITH_PREFIX(DFATAL) << status;
721
0
      return status;
722
0
    }
723
724
311k
    YB_TRANSACTION_DUMP(Commit, id_, data.hybrid_time, data.state.tablets().size());
725
726
311k
    last_touch_ = data.hybrid_time;
727
311k
    commit_time_ = data.hybrid_time;
728
311k
    first_entry_raft_index_ = data.op_id.index;
729
311k
    aborted_ = data.state.aborted();
730
731
311k
    involved_tablets_.reserve(data.state.tablets().size());
732
738k
    for (const auto& tablet : data.state.tablets()) {
733
738k
      InvolvedTabletState state = {
734
738k
        .required_replicated_batches = 0,
735
738k
        .all_batches_replicated = true,
736
738k
        .all_intents_applied = false
737
738k
      };
738
738k
      involved_tablets_.emplace(tablet, state);
739
738k
    }
740
741
311k
    status_ = TransactionStatus::COMMITTED;
742
311k
    StartApply();
743
311k
    return Status::OK();
744
311k
  }
745
746
  CHECKED_STATUS AppliedInAllInvolvedTabletsReplicationFinished(
747
311k
      const TransactionCoordinator::ReplicatedData& data) {
748
311k
    if (status_ != TransactionStatus::COMMITTED && status_ != TransactionStatus::SEALED) {
749
      // That could happen in old version, because we could drop all entries before
750
      // APPLIED_IN_ALL_INVOLVED_TABLETS.
751
0
      LOG_WITH_PREFIX(DFATAL)
752
0
          << "AppliedInAllInvolvedTabletsReplicationFinished in wrong state: "
753
0
          << TransactionStatus_Name(status_) << ", request: " << data.state.ShortDebugString();
754
0
      CHECK_EQ(status_, TransactionStatus::PENDING);
755
0
    }
756
16
    VLOG_WITH_PREFIX(4) << __func__ << ", status: " << TransactionStatus_Name(status_)
757
16
                        << ", leader: " << context_.leader();
758
311k
    last_touch_ = data.hybrid_time;
759
311k
    status_ = TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS;
760
761
311k
    YB_TRANSACTION_DUMP(Applied, id_, data.hybrid_time);
762
763
311k
    return Status::OK();
764
311k
  }
765
766
  // Used for PENDING and CREATED records. Because when we apply replicated operations they have
767
  // the same meaning.
768
843k
  CHECKED_STATUS PendingReplicationFinished(const TransactionCoordinator::ReplicatedData& data) {
769
843k
    if (context_.leader() && ExpiredAt(data.hybrid_time)) {
770
0
      VLOG_WITH_PREFIX(4) << "Expired during replication of PENDING or CREATED operations.";
771
45.4k
      Abort();
772
45.4k
      return Status::OK();
773
45.4k
    }
774
798k
    if (status_ != TransactionStatus::PENDING) {
775
0
      LOG_WITH_PREFIX(DFATAL) << "Bad status during " << __func__ << "(" << data.ToString()
776
0
                              << "): " << ToString();
777
0
      return Status::OK();
778
0
    }
779
798k
    last_touch_ = data.hybrid_time;
780
798k
    first_entry_raft_index_ = data.op_id.index;
781
798k
    return Status::OK();
782
798k
  }
783
784
2.91M
  void NotifyAbortWaiters(const Result<TransactionStatusResult>& result) {
785
89.2k
    for (auto& waiter : abort_waiters_) {
786
89.2k
      waiter(result);
787
89.2k
    }
788
2.91M
    abort_waiters_.clear();
789
2.91M
  }
790
791
312k
  void StartApply() {
792
14
    VLOG_WITH_PREFIX(4) << __func__ << ", commit time: " << commit_time_ << ", involved tablets: "
793
14
                        << AsString(involved_tablets_);
794
312k
    resend_applying_time_ = MonoTime::Now() +
795
312k
        std::chrono::microseconds(FLAGS_transaction_resend_applying_interval_usec);
796
312k
    tablets_with_not_applied_intents_ = involved_tablets_.size();
797
312k
    if (context_.leader()) {
798
246k
      for (const auto& tablet : involved_tablets_) {
799
246k
        context_.NotifyApplying({
800
246k
            .tablet = tablet.first,
801
246k
            .transaction = id_,
802
246k
            .aborted = aborted_,
803
246k
            .commit_time = commit_time_,
804
246k
            .sealed = status_ == TransactionStatus::SEALED});
805
246k
      }
806
104k
    }
807
312k
    NotifyAbortWaiters(TransactionStatusResult(TransactionStatus::COMMITTED, commit_time_));
808
312k
  }
809
810
  void FillExpectedTabletBatches(
811
0
      std::vector<ExpectedTabletBatches>* expected_tablet_batches) const {
812
0
    if (!expected_tablet_batches) {
813
0
      return;
814
0
    }
815
816
0
    for (const auto& tablet_id_and_state : involved_tablets_) {
817
0
      if (!tablet_id_and_state.second.all_batches_replicated) {
818
0
        expected_tablet_batches->push_back(ExpectedTabletBatches{
819
0
          .tablet = tablet_id_and_state.first,
820
0
          .batches = tablet_id_and_state.second.required_replicated_batches
821
0
        });
822
0
      }
823
0
    }
824
0
  }
825
826
  TransactionStateContext& context_;
827
  const TransactionId id_;
828
  const std::string log_prefix_;
829
  TransactionStatus status_ = TransactionStatus::PENDING;
830
  HybridTime last_touch_;
831
  // It should match last_touch_, but it is possible that because of some code errors it
832
  // would not be so. To add stability we introduce a separate field for it.
833
  HybridTime commit_time_;
834
835
  // If transaction was only sealed, we will try to abort it not earlier than this time.
836
  CoarseTimePoint next_abort_after_sealing_;
837
838
  struct InvolvedTabletState {
839
    // How many batches should be replicated at this tablet.
840
    size_t required_replicated_batches = 0;
841
842
    // True if this tablet already replicated all batches.
843
    bool all_batches_replicated = false;
844
845
    // True if this tablet already applied all intents.
846
    bool all_intents_applied = false;
847
848
737k
    std::string ToString() const {
849
737k
      return Format("{ required_replicated_batches: $0 all_batches_replicated: $1 "
850
737k
                        "all_intents_applied: $2 }",
851
737k
                    required_replicated_batches, all_batches_replicated, all_intents_applied);
852
737k
    }
853
  };
854
855
  // Tablets participating in this transaction.
856
  std::unordered_map<TabletId, InvolvedTabletState> involved_tablets_;
857
  // Number of tablets that have not yet replicated all batches.
858
  size_t tablets_with_not_replicated_batches_ = 0;
859
  // Number of tablets that have not yet applied intents.
860
  size_t tablets_with_not_applied_intents_ = 0;
861
  // Don't resend applying until this time.
862
  MonoTime resend_applying_time_;
863
  int64_t first_entry_raft_index_ = std::numeric_limits<int64_t>::max();
864
865
  // Metadata tracking aborted subtransaction IDs in this transaction.
866
  AbortedSubTransactionSetPB aborted_;
867
868
  // The operation that we a currently replicating in RAFT.
869
  // It is owned by TransactionDriver (that will be renamed to OperationDriver).
870
  tablet::UpdateTxnOperation* replicating_ = nullptr;
871
  // Hybrid time before submitting replicating operation.
872
  // It is guaranteed to be less then actual operation hybrid time.
873
  HybridTime replicating_submit_time_;
874
  std::deque<std::unique_ptr<tablet::UpdateTxnOperation>> request_queue_;
875
876
  std::vector<TransactionAbortCallback> abort_waiters_;
877
};
878
879
struct CompleteWithStatusEntry {
880
  std::unique_ptr<UpdateTxnOperation> holder;
881
  UpdateTxnOperation* request;
882
  Status status;
883
};
884
885
// Contains actions that should be executed after lock in transaction coordinator is released.
886
struct PostponedLeaderActions {
887
  int64_t leader_term = OpId::kUnknownTerm;
888
  // List of tablets with transaction id, that should be notified that this transaction
889
  // is applying.
890
  std::vector<NotifyApplyingData> notify_applying;
891
  // List of update transaction records, that should be replicated via RAFT.
892
  std::vector<std::unique_ptr<UpdateTxnOperation>> updates;
893
894
  std::vector<CompleteWithStatusEntry> complete_with_status;
895
896
4.21M
  void Swap(PostponedLeaderActions* other) {
897
4.21M
    std::swap(leader_term, other->leader_term);
898
4.21M
    notify_applying.swap(other->notify_applying);
899
4.21M
    updates.swap(other->updates);
900
4.21M
    complete_with_status.swap(other->complete_with_status);
901
4.21M
  }
902
903
6.23M
  bool leader() const {
904
6.23M
    return leader_term != OpId::kUnknownTerm;
905
6.23M
  }
906
};
907
908
} // namespace
909
910
0
std::string TransactionCoordinator::AbortedData::ToString() const {
911
0
  return YB_STRUCT_TO_STRING(state, op_id);
912
0
}
913
914
// Real implementation of transaction coordinator, as in PImpl idiom.
915
class TransactionCoordinator::Impl : public TransactionStateContext {
916
 public:
917
  Impl(const std::string& permanent_uuid,
918
       TransactionCoordinatorContext* context,
919
       Counter* expired_metric)
920
      : context_(*context),
921
        expired_metric_(*expired_metric),
922
        log_prefix_(consensus::MakeTabletLogPrefix(context->tablet_id(), permanent_uuid)),
923
29.1k
        poller_(log_prefix_, std::bind(&Impl::Poll, this)) {
924
29.1k
  }
925
926
128
  virtual ~Impl() {
927
128
    Shutdown();
928
128
  }
929
930
255
  void Shutdown() {
931
255
    poller_.Shutdown();
932
255
    rpcs_.Shutdown();
933
255
  }
934
935
  CHECKED_STATUS GetStatus(const google::protobuf::RepeatedPtrField<std::string>& transaction_ids,
936
                           CoarseTimePoint deadline,
937
208k
                           tserver::GetTransactionStatusResponsePB* response) {
938
208k
    AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms);
939
208k
    auto leader_term = context_.LeaderTerm();
940
208k
    PostponedLeaderActions postponed_leader_actions;
941
208k
    {
942
208k
      std::unique_lock<std::mutex> lock(managed_mutex_);
943
208k
      HybridTime leader_safe_time;
944
208k
      postponed_leader_actions_.leader_term = leader_term;
945
208k
      for (const auto& transaction_id : transaction_ids) {
946
208k
        auto id = VERIFY_RESULT(FullyDecodeTransactionId(transaction_id));
947
948
208k
        auto it = managed_transactions_.find(id);
949
208k
        std::vector<ExpectedTabletBatches> expected_tablet_batches;
950
208k
        bool known_txn = it != managed_transactions_.end();
951
208k
        auto txn_status_with_ht = known_txn
952
208k
            ? VERIFY_RESULT(it->GetStatus(&expected_tablet_batches))
953
50.1k
            : TransactionStatusResult(TransactionStatus::ABORTED, HybridTime::kMax);
954
18.4E
        VLOG_WITH_PREFIX(4) << __func__ << ": " << id << " => " << txn_status_with_ht
955
18.4E
                            << ", last touch: " << it->last_touch();
956
208k
        if (txn_status_with_ht.status == TransactionStatus::SEALED) {
957
          // TODO(dtxn) Avoid concurrent resolve
958
0
          txn_status_with_ht = VERIFY_RESULT(ResolveSealedStatus(
959
0
              id, txn_status_with_ht.status_time, expected_tablet_batches,
960
0
              /* abort_if_not_replicated = */ false, &lock));
961
0
        }
962
208k
        if (!known_txn) {
963
50.1k
          if (!leader_safe_time) {
964
            // We should pick leader safe time only after managed_mutex_ is locked.
965
            // Otherwise applied transaction could be removed after this safe time.
966
50.1k
            leader_safe_time = VERIFY_RESULT(context_.LeaderSafeTime());
967
50.1k
          }
968
          // Please note that for known transactions we send 0, that means invalid hybrid time.
969
          // We would wait for safe time only for case when transaction is unknown to coordinator.
970
          // Since it is only case when transaction could be actually committed.
971
50.1k
          response->mutable_coordinator_safe_time()->Resize(response->status().size(), 0);
972
50.1k
          response->add_coordinator_safe_time(leader_safe_time.ToUint64());
973
50.1k
        }
974
208k
        response->add_status(txn_status_with_ht.status);
975
208k
        response->add_status_hybrid_time(txn_status_with_ht.status_time.ToUint64());
976
977
208k
        auto mutable_aborted_set_pb = response->add_aborted_subtxn_set();
978
208k
        if (txn_status_with_ht.status == TransactionStatus::COMMITTED &&
979
19.0k
            it != managed_transactions_.end()) {
980
19.0k
          *mutable_aborted_set_pb = it->GetAbortedSubTransactionSetPB();
981
19.0k
        }
982
208k
      }
983
208k
      postponed_leader_actions.Swap(&postponed_leader_actions_);
984
208k
    }
985
986
208k
    ExecutePostponedLeaderActions(&postponed_leader_actions);
987
208k
    if (GetAtomicFlag(&FLAGS_TEST_inject_random_delay_on_txn_status_response_ms)) {
988
0
      if (response->status().size() > 0 && response->status(0) == TransactionStatus::PENDING) {
989
0
        AtomicFlagRandomSleepMs(&FLAGS_TEST_inject_random_delay_on_txn_status_response_ms);
990
0
      }
991
0
    }
992
208k
    return Status::OK();
993
208k
  }
994
995
  Result<TransactionStatusResult> ResolveSealedStatus(
996
      const TransactionId& transaction_id,
997
      HybridTime commit_time,
998
      const std::vector<ExpectedTabletBatches>& expected_tablet_batches,
999
      bool abort_if_not_replicated,
1000
0
      std::unique_lock<std::mutex>* lock) {
1001
0
    VLOG_WITH_PREFIX(4)
1002
0
        << __func__ << ", txn: " << transaction_id << ", commit time: " << commit_time
1003
0
        << ", expected tablet batches: " << AsString(expected_tablet_batches)
1004
0
        << ", abort if not replicated: " << abort_if_not_replicated;
1005
1006
0
    auto deadline = TransactionRpcDeadline();
1007
0
    auto now_ht = context_.clock().Now();
1008
0
    CountDownLatch latch(expected_tablet_batches.size());
1009
0
    std::vector<HybridTime> write_hybrid_times(expected_tablet_batches.size());
1010
0
    {
1011
0
      lock->unlock();
1012
0
      auto scope_exit = ScopeExit([lock] {
1013
0
        if (lock) {
1014
0
          lock->lock();
1015
0
        }
1016
0
      });
1017
0
      size_t idx = 0;
1018
0
      for (const auto& p : expected_tablet_batches) {
1019
0
        tserver::GetTransactionStatusAtParticipantRequestPB req;
1020
0
        req.set_tablet_id(p.tablet);
1021
0
        req.set_transaction_id(
1022
0
            pointer_cast<const char*>(transaction_id.data()), transaction_id.size());
1023
0
        req.set_propagated_hybrid_time(now_ht.ToUint64());
1024
0
        if (abort_if_not_replicated) {
1025
0
          req.set_required_num_replicated_batches(p.batches);
1026
0
        }
1027
1028
0
        auto handle = rpcs_.Prepare();
1029
0
        if (handle != rpcs_.InvalidHandle()) {
1030
0
          *handle = GetTransactionStatusAtParticipant(
1031
0
              deadline,
1032
0
              nullptr /* remote_tablet */,
1033
0
              context_.client_future().get(),
1034
0
              &req,
1035
0
              [this, handle, idx, &write_hybrid_times, &expected_tablet_batches, &latch,
1036
0
               &transaction_id, &p](
1037
0
                  const Status& status,
1038
0
                  const tserver::GetTransactionStatusAtParticipantResponsePB& resp) {
1039
0
                client::UpdateClock(resp, &context_);
1040
0
                rpcs_.Unregister(handle);
1041
1042
0
                VLOG_WITH_PREFIX(4)
1043
0
                    << "TXN: " << transaction_id << " batch status at " << p.tablet << ": "
1044
0
                    << "idx: " << idx << ", resp: " << resp.ShortDebugString() << ", expected: "
1045
0
                    << expected_tablet_batches[idx].batches;
1046
0
                if (status.ok()) {
1047
0
                  if (resp.aborted()) {
1048
0
                    write_hybrid_times[idx] = HybridTime::kMin;
1049
0
                  } else if (implicit_cast<size_t>(resp.num_replicated_batches()) ==
1050
0
                                 expected_tablet_batches[idx].batches) {
1051
0
                    write_hybrid_times[idx] = HybridTime(resp.status_hybrid_time());
1052
0
                    LOG_IF_WITH_PREFIX(DFATAL, !write_hybrid_times[idx].is_valid())
1053
0
                        << "Received invalid hybrid time when all batches were replicated: "
1054
0
                        << resp.ShortDebugString();
1055
0
                  }
1056
0
                }
1057
0
                latch.CountDown();
1058
0
              });
1059
0
          (**handle).SendRpc();
1060
0
        } else {
1061
0
          latch.CountDown();
1062
0
        }
1063
0
        ++idx;
1064
0
      }
1065
0
      latch.Wait();
1066
0
    }
1067
1068
0
    auto txn_it = managed_transactions_.find(transaction_id);
1069
0
    if (txn_it == managed_transactions_.end()) {
1070
      // Transaction was completed (aborted/committed) during this procedure.
1071
0
      return TransactionStatusResult{TransactionStatus::PENDING, commit_time.Decremented()};
1072
0
    }
1073
1074
0
    for (size_t idx = 0; idx != expected_tablet_batches.size(); ++idx) {
1075
0
      if (write_hybrid_times[idx] == HybridTime::kMin) {
1076
0
        managed_transactions_.modify(txn_it, [](TransactionState& state) {
1077
0
          state.Aborted();
1078
0
        });
1079
0
      } else if (write_hybrid_times[idx].is_valid()) {
1080
0
        managed_transactions_.modify(
1081
0
            txn_it, [idx, &expected_tablet_batches, &write_hybrid_times](TransactionState& state) {
1082
0
          state.ReplicatedAllBatchesAt(
1083
0
              expected_tablet_batches[idx].tablet, write_hybrid_times[idx]);
1084
0
        });
1085
0
      }
1086
0
    }
1087
0
    auto result = VERIFY_RESULT(txn_it->GetStatus(/* expected_tablet_batches = */ nullptr));
1088
0
    if (result.status != TransactionStatus::SEALED) {
1089
0
      VLOG_WITH_PREFIX(4) << "TXN: " << transaction_id << " status resolved: "
1090
0
                          << TransactionStatus_Name(result.status);
1091
0
      return result;
1092
0
    }
1093
1094
0
    VLOG_WITH_PREFIX(4) << "TXN: " << transaction_id << " status NOT resolved";
1095
0
    return TransactionStatusResult{TransactionStatus::PENDING, result.status_time.Decremented()};
1096
0
  }
1097
1098
116k
  void Abort(const std::string& transaction_id, int64_t term, TransactionAbortCallback callback) {
1099
116k
    AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms);
1100
1101
116k
    auto id = FullyDecodeTransactionId(transaction_id);
1102
116k
    if (!id.ok()) {
1103
0
      callback(id.status());
1104
0
      return;
1105
0
    }
1106
1107
116k
    PostponedLeaderActions actions;
1108
116k
    {
1109
116k
      std::unique_lock<std::mutex> lock(managed_mutex_);
1110
116k
      auto it = managed_transactions_.find(*id);
1111
116k
      if (it == managed_transactions_.end()) {
1112
441
        lock.unlock();
1113
441
        callback(TransactionStatusResult::Aborted());
1114
441
        return;
1115
441
      }
1116
115k
      postponed_leader_actions_.leader_term = term;
1117
115k
      boost::optional<TransactionStatusResult> status;
1118
115k
      managed_transactions_.modify(it, [&status, &callback](TransactionState& state) {
1119
115k
        status = state.Abort(&callback);
1120
115k
      });
1121
115k
      if (callback) {
1122
25.7k
        lock.unlock();
1123
25.7k
        callback(*status);
1124
25.7k
        return;
1125
25.7k
      }
1126
90.0k
      actions.Swap(&postponed_leader_actions_);
1127
90.0k
    }
1128
1129
90.0k
    ExecutePostponedLeaderActions(&actions);
1130
90.0k
  }
1131
1132
0
  size_t test_count_transactions() {
1133
0
    std::lock_guard<std::mutex> lock(managed_mutex_);
1134
0
    return managed_transactions_.size();
1135
0
  }
1136
1137
1.87M
  CHECKED_STATUS ProcessReplicated(const ReplicatedData& data) {
1138
1.87M
    auto id = FullyDecodeTransactionId(data.state.transaction_id());
1139
1.87M
    if (!id.ok()) {
1140
0
      return std::move(id.status());
1141
0
    }
1142
1143
1.87M
    PostponedLeaderActions actions;
1144
1.87M
    Status result;
1145
1.87M
    {
1146
1.87M
      std::lock_guard<std::mutex> lock(managed_mutex_);
1147
1.87M
      postponed_leader_actions_.leader_term = data.leader_term;
1148
1.87M
      auto it = GetTransaction(*id, data.state.status(), data.hybrid_time);
1149
1.87M
      if (it == managed_transactions_.end()) {
1150
0
        return Status::OK();
1151
0
      }
1152
1.87M
      managed_transactions_.modify(it, [&result, &data](TransactionState& state) {
1153
1.87M
        result = state.ProcessReplicated(data);
1154
1.87M
      });
1155
1.87M
      CheckCompleted(it);
1156
1.87M
      actions.Swap(&postponed_leader_actions_);
1157
1.87M
    }
1158
1.87M
    ExecutePostponedLeaderActions(&actions);
1159
1160
18.4E
    VLOG_WITH_PREFIX(1) << "Processed: " << data.ToString();
1161
1.87M
    return result;
1162
1.87M
  }
1163
1164
0
  void ProcessAborted(const AbortedData& data) {
1165
0
    auto id = FullyDecodeTransactionId(data.state.transaction_id());
1166
0
    if (!id.ok()) {
1167
0
      LOG_WITH_PREFIX(DFATAL) << "Abort of transaction with bad id "
1168
0
                              << data.state.ShortDebugString() << ": " << id.status();
1169
0
      return;
1170
0
    }
1171
1172
0
    PostponedLeaderActions actions;
1173
0
    {
1174
0
      std::lock_guard<std::mutex> lock(managed_mutex_);
1175
0
      postponed_leader_actions_.leader_term = OpId::kUnknownTerm;
1176
0
      auto it = managed_transactions_.find(*id);
1177
0
      if (it == managed_transactions_.end()) {
1178
0
        LOG_WITH_PREFIX(WARNING) << "Aborted operation for unknown transaction: " << *id;
1179
0
        return;
1180
0
      }
1181
0
      managed_transactions_.modify(
1182
0
          it, [&](TransactionState& ts) {
1183
0
            ts.ProcessAborted(data);
1184
0
          });
1185
0
      CheckCompleted(it);
1186
0
      actions.Swap(&postponed_leader_actions_);
1187
0
    }
1188
0
    ExecutePostponedLeaderActions(&actions);
1189
1190
0
    VLOG_WITH_PREFIX(1) << "Aborted, state: " << data.state.ShortDebugString()
1191
0
                        << ", op id: " << data.op_id;
1192
0
  }
1193
1194
29.1k
  void Start() {
1195
29.1k
    poller_.Start(
1196
29.1k
        &context_.client_future().get()->messenger()->scheduler(),
1197
29.1k
        std::chrono::microseconds(kTimeMultiplier * FLAGS_transaction_check_interval_usec));
1198
29.1k
  }
1199
1200
685k
  void Handle(std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) {
1201
685k
    auto& state = *request->request();
1202
685k
    auto id = FullyDecodeTransactionId(state.transaction_id());
1203
685k
    if (!id.ok()) {
1204
0
      LOG(WARNING) << "Failed to decode id from " << state.ShortDebugString() << ": " << id;
1205
0
      request->CompleteWithStatus(id.status());
1206
0
      return;
1207
0
    }
1208
1209
685k
    PostponedLeaderActions actions;
1210
685k
    {
1211
685k
      std::unique_lock<std::mutex> lock(managed_mutex_);
1212
685k
      postponed_leader_actions_.leader_term = term;
1213
685k
      auto it = managed_transactions_.find(*id);
1214
685k
      if (it == managed_transactions_.end()) {
1215
292k
        if (state.status() == TransactionStatus::CREATED) {
1216
243k
          it = managed_transactions_.emplace(
1217
243k
              this, *id, context_.clock().Now(), log_prefix_).first;
1218
49.6k
        } else {
1219
49.6k
          lock.unlock();
1220
49.6k
          YB_LOG_HIGHER_SEVERITY_WHEN_TOO_MANY(INFO, WARNING, 1s, 50)
1221
49.6k
              << LogPrefix() << "Request to unknown transaction " << id << ": "
1222
49.6k
              << state.ShortDebugString();
1223
49.6k
          auto status = STATUS_EC_FORMAT(
1224
49.6k
              Expired, PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE),
1225
49.6k
              "Transaction $0 expired or aborted by a conflict", *id);
1226
49.6k
          status = status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted));
1227
49.6k
          request->CompleteWithStatus(status);
1228
49.6k
          return;
1229
49.6k
        }
1230
635k
      }
1231
1232
637k
      managed_transactions_.modify(it, [&request](TransactionState& state) {
1233
637k
        state.Handle(std::move(request));
1234
637k
      });
1235
635k
      postponed_leader_actions_.Swap(&actions);
1236
635k
    }
1237
1238
635k
    ExecutePostponedLeaderActions(&actions);
1239
635k
  }
1240
1241
2.68M
  int64_t PrepareGC(std::string* details) {
1242
2.68M
    std::lock_guard<std::mutex> lock(managed_mutex_);
1243
2.68M
    if (!managed_transactions_.empty()) {
1244
161k
      auto& txn = *managed_transactions_.get<FirstEntryIndexTag>().begin();
1245
161k
      if (details) {
1246
0
        *details += Format("Transaction coordinator: $0\n", txn);
1247
0
      }
1248
161k
      return txn.first_entry_raft_index();
1249
161k
    }
1250
2.52M
    return std::numeric_limits<int64_t>::max();
1251
2.52M
  }
1252
1253
  // Returns logs prefix for this transaction coordinator.
1254
49.7k
  const std::string& LogPrefix() {
1255
49.7k
    return log_prefix_;
1256
49.7k
  }
1257
1258
0
  std::string DumpTransactions() {
1259
0
    std::string result;
1260
0
    std::lock_guard<std::mutex> lock(managed_mutex_);
1261
0
    for (const auto& txn : managed_transactions_) {
1262
0
      result += txn.ToString();
1263
0
      result += "\n";
1264
0
    }
1265
0
    return result;
1266
0
  }
1267
1268
 private:
1269
  class LastTouchTag;
1270
  class FirstEntryIndexTag;
1271
1272
  typedef boost::multi_index_container<TransactionState,
1273
      boost::multi_index::indexed_by <
1274
          boost::multi_index::hashed_unique <
1275
              boost::multi_index::const_mem_fun<TransactionState,
1276
                                                const TransactionId&,
1277
                                                &TransactionState::id>
1278
          >,
1279
          boost::multi_index::ordered_non_unique <
1280
              boost::multi_index::tag<LastTouchTag>,
1281
              boost::multi_index::const_mem_fun<TransactionState,
1282
                                                HybridTime,
1283
                                                &TransactionState::last_touch>
1284
          >,
1285
          boost::multi_index::ordered_non_unique <
1286
              boost::multi_index::tag<FirstEntryIndexTag>,
1287
              boost::multi_index::const_mem_fun<TransactionState,
1288
                                                int64_t,
1289
                                                &TransactionState::first_entry_raft_index>
1290
          >
1291
      >
1292
  > ManagedTransactions;
1293
1294
  void SendUpdateTransactionRequest(
1295
      const NotifyApplyingData& action, HybridTime now,
1296
246k
      const CoarseTimePoint& deadline) {
1297
18.4E
    VLOG_WITH_PREFIX(3) << "Notify applying: " << action.ToString();
1298
1299
246k
    tserver::UpdateTransactionRequestPB req;
1300
246k
    req.set_tablet_id(action.tablet);
1301
246k
    req.set_propagated_hybrid_time(now.ToUint64());
1302
246k
    auto& state = *req.mutable_state();
1303
246k
    state.set_transaction_id(action.transaction.data(), action.transaction.size());
1304
246k
    state.set_status(TransactionStatus::APPLYING);
1305
246k
    state.add_tablets(context_.tablet_id());
1306
246k
    state.set_commit_hybrid_time(action.commit_time.ToUint64());
1307
246k
    state.set_sealed(action.sealed);
1308
246k
    *state.mutable_aborted() = action.aborted;
1309
1310
246k
    auto handle = rpcs_.Prepare();
1311
246k
    if (handle != rpcs_.InvalidHandle()) {
1312
246k
      *handle = UpdateTransaction(
1313
246k
          deadline,
1314
246k
          nullptr /* remote_tablet */,
1315
246k
          context_.client_future().get(),
1316
246k
          &req,
1317
246k
          [this, handle, action]
1318
246k
              (const Status& status,
1319
246k
               const tserver::UpdateTransactionRequestPB& req,
1320
245k
               const tserver::UpdateTransactionResponsePB& resp) {
1321
245k
            client::UpdateClock(resp, &context_);
1322
245k
            rpcs_.Unregister(handle);
1323
246k
            if (status.ok()) {
1324
246k
              return;
1325
246k
            }
1326
18.4E
            LOG_WITH_PREFIX(WARNING)
1327
18.4E
                << "Failed to send apply for transaction: " << action.transaction << ": "
1328
18.4E
                << status;
1329
18.4E
            const auto split_child_tablet_ids = SplitChildTabletIdsData(status).value();
1330
18.4E
            const bool tablet_has_been_split = !split_child_tablet_ids.empty();
1331
18.4E
            if (status.IsNotFound() || tablet_has_been_split) {
1332
30
              std::lock_guard<std::mutex> lock(managed_mutex_);
1333
30
              auto it = managed_transactions_.find(action.transaction);
1334
30
              if (it == managed_transactions_.end()) {
1335
0
                return;
1336
0
              }
1337
30
              managed_transactions_.modify(
1338
30
                  it, [this, &action, &split_child_tablet_ids,
1339
30
                       tablet_has_been_split](TransactionState& state) {
1340
30
                    if (tablet_has_been_split) {
1341
                      // We need to update involved tablets map.
1342
0
                      LOG_WITH_PREFIX(INFO) << Format(
1343
0
                          "Tablet $0 has been split into: $1", action.tablet,
1344
0
                          split_child_tablet_ids);
1345
0
                      state.AddInvolvedTablets(action.tablet, split_child_tablet_ids);
1346
30
                    } else {
1347
                      // Tablet has been deleted (not split), so we should mark it as applied to
1348
                      // be able to cleanup the transaction.
1349
30
                      TransactionStatePB transaction_state;
1350
30
                      transaction_state.add_tablets(action.tablet);
1351
30
                      WARN_NOT_OK(
1352
30
                          state.AppliedInOneOfInvolvedTablets(transaction_state),
1353
30
                          "AppliedInOneOfInvolvedTablets for removed tabled failed: ");
1354
30
                    }
1355
30
                  });
1356
30
              if (tablet_has_been_split) {
1357
0
                const auto new_deadline = TransactionRpcDeadline();
1358
0
                NotifyApplyingData new_action = action;
1359
0
                for (const auto& split_child_tablet_id : split_child_tablet_ids) {
1360
0
                  new_action.tablet = split_child_tablet_id;
1361
0
                  SendUpdateTransactionRequest(new_action, context_.clock().Now(), new_deadline);
1362
0
                }
1363
0
              }
1364
30
            }
1365
18.4E
          });
1366
246k
      (**handle).SendRpc();
1367
246k
    }
1368
246k
  }
1369
1370
4.16M
  void ExecutePostponedLeaderActions(PostponedLeaderActions* actions) {
1371
249k
    for (const auto& p : actions->complete_with_status) {
1372
249k
      p.request->CompleteWithStatus(p.status);
1373
249k
    }
1374
1375
4.16M
    if (!actions->leader()) {
1376
2.17M
      return;
1377
2.17M
    }
1378
1379
1.99M
    if (!actions->notify_applying.empty()) {
1380
104k
      auto now = context_.clock().Now();
1381
104k
      auto deadline = TransactionRpcDeadline();
1382
246k
      for (const auto& action : actions->notify_applying) {
1383
246k
        SendUpdateTransactionRequest(action, now, deadline);
1384
246k
      }
1385
104k
    }
1386
1387
626k
    for (auto& update : actions->updates) {
1388
626k
      context_.SubmitUpdateTransaction(std::move(update), actions->leader_term);
1389
626k
    }
1390
1.99M
  }
1391
1392
  ManagedTransactions::iterator GetTransaction(const TransactionId& id,
1393
                                               TransactionStatus status,
1394
1.87M
                                               HybridTime hybrid_time) {
1395
1.87M
    auto it = managed_transactions_.find(id);
1396
1.87M
    if (it == managed_transactions_.end()) {
1397
484k
      if (status != TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS) {
1398
484k
        it = managed_transactions_.emplace(this, id, hybrid_time, log_prefix_).first;
1399
44
        VLOG_WITH_PREFIX(1) << Format("Added: $0", *it);
1400
484k
      }
1401
484k
    }
1402
1.87M
    return it;
1403
1.87M
  }
1404
1405
1.20M
  TransactionCoordinatorContext& coordinator_context() override {
1406
1.20M
    return context_;
1407
1.20M
  }
1408
1409
246k
  void NotifyApplying(NotifyApplyingData data) override {
1410
246k
    if (!leader()) {
1411
0
      LOG_WITH_PREFIX(WARNING) << __func__ << " at non leader: " << data.ToString();
1412
0
      return;
1413
0
    }
1414
246k
    postponed_leader_actions_.notify_applying.push_back(std::move(data));
1415
246k
  }
1416
1417
  MUST_USE_RESULT bool SubmitUpdateTransaction(
1418
627k
      std::unique_ptr<UpdateTxnOperation> operation) override {
1419
627k
    if (!postponed_leader_actions_.leader()) {
1420
27
      auto status = STATUS(IllegalState, "Submit update transaction on non leader");
1421
0
      VLOG_WITH_PREFIX(1) << status;
1422
27
      operation->CompleteWithStatus(status);
1423
27
      return false;
1424
27
    }
1425
1426
627k
    postponed_leader_actions_.updates.push_back(std::move(operation));
1427
627k
    return true;
1428
627k
  }
1429
1430
  void CompleteWithStatus(
1431
249k
      std::unique_ptr<UpdateTxnOperation> request, Status status) override {
1432
249k
    auto ptr = request.get();
1433
249k
    postponed_leader_actions_.complete_with_status.push_back({
1434
249k
        std::move(request), ptr, std::move(status)});
1435
249k
  }
1436
1437
0
  void CompleteWithStatus(UpdateTxnOperation* request, Status status) override {
1438
0
    postponed_leader_actions_.complete_with_status.push_back({
1439
0
        nullptr /* holder */, request, std::move(status)});
1440
0
  }
1441
1442
1.40M
  bool leader() const override {
1443
1.40M
    return postponed_leader_actions_.leader();
1444
1.40M
  }
1445
1446
1.42M
  void Poll() {
1447
1.42M
    auto now = context_.clock().Now();
1448
1449
1.42M
    auto leader_term = context_.LeaderTerm();
1450
1.42M
    bool leader = leader_term != OpId::kUnknownTerm;
1451
1.42M
    PostponedLeaderActions actions;
1452
1.42M
    {
1453
1.42M
      std::lock_guard<std::mutex> lock(managed_mutex_);
1454
1.42M
      postponed_leader_actions_.leader_term = leader_term;
1455
1456
1.42M
      auto& index = managed_transactions_.get<LastTouchTag>();
1457
1458
1.42M
      if (VLOG_IS_ON(4) && leader && !index.empty()) {
1459
0
        const auto& txn = *index.begin();
1460
0
        LOG_WITH_PREFIX(INFO)
1461
0
            << __func__ << ", now: " << now << ", first: " << txn.ToString()
1462
0
            << ", expired: " << txn.ExpiredAt(now) << ", timeout: "
1463
0
            << MonoDelta(GetTransactionTimeout()) << ", passed: "
1464
0
            << MonoDelta::FromMicroseconds(
1465
0
                   now.GetPhysicalValueMicros() - txn.last_touch().GetPhysicalValueMicros());
1466
0
      }
1467
1468
1.42M
      for (auto it = index.begin(); it != index.end() && it->ExpiredAt(now);) {
1469
381
        if (it->status() == TransactionStatus::ABORTED) {
1470
0
          it = index.erase(it);
1471
381
        } else {
1472
381
          if (leader) {
1473
190
            expired_metric_.Increment();
1474
190
            bool modified = index.modify(it, [](TransactionState& state) {
1475
0
              VLOG(4) << state.LogPrefix() << "Cleanup expired transaction";
1476
190
              state.Abort();
1477
190
            });
1478
190
            DCHECK(modified);
1479
190
          }
1480
381
          ++it;
1481
381
        }
1482
381
      }
1483
1.42M
      auto now_physical = MonoTime::Now();
1484
411k
      for (auto& transaction : managed_transactions_) {
1485
411k
        const_cast<TransactionState&>(transaction).Poll(leader, now_physical);
1486
411k
      }
1487
1.42M
      postponed_leader_actions_.Swap(&actions);
1488
1.42M
    }
1489
1.42M
    ExecutePostponedLeaderActions(&actions);
1490
1.42M
  }
1491
1492
1.52M
  void CheckCompleted(ManagedTransactions::iterator it) {
1493
1.52M
    if (it->Completed()) {
1494
589k
      auto status = STATUS_FORMAT(Expired, "Transaction completed: $0", *it);
1495
4
      VLOG_WITH_PREFIX(1) << status;
1496
589k
      managed_transactions_.modify(it, [&status](TransactionState& state) {
1497
589k
        state.ClearRequests(status);
1498
589k
      });
1499
589k
      managed_transactions_.erase(it);
1500
589k
    }
1501
1.52M
  }
1502
1503
  TransactionCoordinatorContext& context_;
1504
  Counter& expired_metric_;
1505
  const std::string log_prefix_;
1506
1507
  std::mutex managed_mutex_;
1508
  ManagedTransactions managed_transactions_;
1509
1510
  // Actions that should be executed after mutex is unlocked.
1511
  PostponedLeaderActions postponed_leader_actions_;
1512
1513
  rpc::Poller poller_;
1514
  rpc::Rpcs rpcs_;
1515
};
1516
1517
TransactionCoordinator::TransactionCoordinator(const std::string& permanent_uuid,
1518
                                               TransactionCoordinatorContext* context,
1519
                                               Counter* expired_metric)
1520
29.1k
    : impl_(new Impl(permanent_uuid, context, expired_metric)) {
1521
29.1k
}
1522
1523
128
TransactionCoordinator::~TransactionCoordinator() {
1524
128
}
1525
1526
1.87M
Status TransactionCoordinator::ProcessReplicated(const ReplicatedData& data) {
1527
1.87M
  return impl_->ProcessReplicated(data);
1528
1.87M
}
1529
1530
0
void TransactionCoordinator::ProcessAborted(const AbortedData& data) {
1531
0
  impl_->ProcessAborted(data);
1532
0
}
1533
1534
2.68M
int64_t TransactionCoordinator::PrepareGC(std::string* details) {
1535
2.68M
  return impl_->PrepareGC(details);
1536
2.68M
}
1537
1538
0
size_t TransactionCoordinator::test_count_transactions() const {
1539
0
  return impl_->test_count_transactions();
1540
0
}
1541
1542
void TransactionCoordinator::Handle(
1543
686k
    std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) {
1544
686k
  impl_->Handle(std::move(request), term);
1545
686k
}
1546
1547
29.1k
void TransactionCoordinator::Start() {
1548
29.1k
  impl_->Start();
1549
29.1k
}
1550
1551
128
void TransactionCoordinator::Shutdown() {
1552
128
  impl_->Shutdown();
1553
128
}
1554
1555
Status TransactionCoordinator::GetStatus(
1556
    const google::protobuf::RepeatedPtrField<std::string>& transaction_ids,
1557
    CoarseTimePoint deadline,
1558
208k
    tserver::GetTransactionStatusResponsePB* response) {
1559
208k
  return impl_->GetStatus(transaction_ids, deadline, response);
1560
208k
}
1561
1562
void TransactionCoordinator::Abort(const std::string& transaction_id,
1563
                                   int64_t term,
1564
116k
                                   TransactionAbortCallback callback) {
1565
116k
  impl_->Abort(transaction_id, term, std::move(callback));
1566
116k
}
1567
1568
0
std::string TransactionCoordinator::DumpTransactions() {
1569
0
  return impl_->DumpTransactions();
1570
0
}
1571
1572
1
std::string TransactionCoordinator::ReplicatedData::ToString() const {
1573
1
  return Format("{ leader_term: $0 state: $1 op_id: $2 hybrid_time: $3 }",
1574
1
                leader_term, state, op_id, hybrid_time);
1575
1
}
1576
1577
} // namespace tablet
1578
} // namespace yb