YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/transaction.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/client/transaction.h"
17
18
#include <unordered_set>
19
20
#include "yb/client/batcher.h"
21
#include "yb/client/client.h"
22
#include "yb/client/in_flight_op.h"
23
#include "yb/client/meta_cache.h"
24
#include "yb/client/transaction_cleanup.h"
25
#include "yb/client/transaction_manager.h"
26
#include "yb/client/transaction_rpc.h"
27
#include "yb/client/yb_op.h"
28
29
#include "yb/common/common.pb.h"
30
#include "yb/common/transaction.h"
31
#include "yb/common/transaction_error.h"
32
#include "yb/common/ybc_util.h"
33
34
#include "yb/rpc/messenger.h"
35
#include "yb/rpc/rpc.h"
36
#include "yb/rpc/scheduler.h"
37
38
#include "yb/tserver/tserver_service.pb.h"
39
40
#include "yb/util/countdown_latch.h"
41
#include "yb/util/flag_tags.h"
42
#include "yb/util/format.h"
43
#include "yb/util/logging.h"
44
#include "yb/util/random_util.h"
45
#include "yb/util/result.h"
46
#include "yb/util/scope_exit.h"
47
#include "yb/util/status_format.h"
48
#include "yb/util/status_log.h"
49
#include "yb/util/strongly_typed_bool.h"
50
#include "yb/util/trace.h"
51
#include "yb/util/tsan_util.h"
52
#include "yb/util/unique_lock.h"
53
54
using namespace std::literals;
55
using namespace std::placeholders;
56
57
DEFINE_int32(txn_print_trace_every_n, 0,
58
             "Controls the rate at which txn traces are printed. Setting this to 0 "
59
             "disables printing the collected traces.");
60
TAG_FLAG(txn_print_trace_every_n, advanced);
61
TAG_FLAG(txn_print_trace_every_n, runtime);
62
63
DEFINE_int32(txn_slow_op_threshold_ms, 0,
64
             "Controls the rate at which txn traces are printed. Setting this to 0 "
65
             "disables printing the collected traces.");
66
TAG_FLAG(txn_slow_op_threshold_ms, advanced);
67
TAG_FLAG(txn_slow_op_threshold_ms, runtime);
68
69
DEFINE_uint64(transaction_heartbeat_usec, 500000 * yb::kTimeMultiplier,
70
              "Interval of transaction heartbeat in usec.");
71
DEFINE_bool(transaction_disable_heartbeat_in_tests, false, "Disable heartbeat during test.");
72
DECLARE_uint64(max_clock_skew_usec);
73
74
DEFINE_test_flag(int32, transaction_inject_flushed_delay_ms, 0,
75
                 "Inject delay before processing flushed operations by transaction.");
76
77
DEFINE_test_flag(bool, disable_proactive_txn_cleanup_on_abort, false,
78
                "Disable cleanup of intents in abort path.");
79
80
DECLARE_string(placement_cloud);
81
DECLARE_string(placement_region);
82
83
namespace yb {
84
namespace client {
85
86
namespace {
87
88
YB_STRONGLY_TYPED_BOOL(Child);
89
YB_DEFINE_ENUM(TransactionState, (kRunning)(kAborted)(kCommitted)(kReleased)(kSealed));
90
91
} // namespace
92
93
14.6k
Result<ChildTransactionData> ChildTransactionData::FromPB(const ChildTransactionDataPB& data) {
94
14.6k
  ChildTransactionData result;
95
14.6k
  auto metadata = TransactionMetadata::FromPB(data.metadata());
96
14.6k
  RETURN_NOT_OK(metadata);
97
14.6k
  result.metadata = std::move(*metadata);
98
14.6k
  result.read_time = ReadHybridTime::FromReadTimePB(data);
99
0
  for (const auto& entry : data.local_limits()) {
100
0
    result.local_limits.emplace(entry.first, HybridTime(entry.second));
101
0
  }
102
14.6k
  return result;
103
14.6k
}
104
105
YB_DEFINE_ENUM(MetadataState, (kMissing)(kMaybePresent)(kPresent));
106
107
48.8k
void YBSubTransaction::SetActiveSubTransaction(SubTransactionId id) {
108
48.8k
  sub_txn_.subtransaction_id = id;
109
48.8k
  highest_subtransaction_id_ = std::max(highest_subtransaction_id_, id);
110
48.8k
}
111
112
23.5k
CHECKED_STATUS YBSubTransaction::RollbackSubTransaction(SubTransactionId id) {
113
  // We should abort the range [id, sub_txn_.highest_subtransaction_id]. It's possible that we
114
  // have created and released savepoints, such that there have been writes with a
115
  // subtransaction_id greater than sub_txn_.subtransaction_id, and those should be aborted as
116
  // well.
117
23.5k
  SCHECK_GE(
118
23.5k
    highest_subtransaction_id_, id,
119
23.5k
    InternalError,
120
23.5k
    "Attempted to rollback to non-existent savepoint.");
121
23.5k
  return sub_txn_.aborted.SetRange(id, highest_subtransaction_id_);
122
23.5k
}
123
124
63.5k
const SubTransactionMetadata& YBSubTransaction::get() { return sub_txn_; }
125
126
class YBTransaction::Impl final : public internal::TxnBatcherIf {
127
 public:
128
  Impl(TransactionManager* manager, YBTransaction* transaction, TransactionLocality locality)
129
      : trace_(new Trace),
130
        start_(CoarseMonoClock::Now()),
131
        manager_(manager),
132
        transaction_(transaction),
133
        read_point_(manager->clock()),
134
243k
        child_(Child::kFalse) {
135
243k
    metadata_.priority = RandomUniformInt<uint64_t>();
136
243k
    metadata_.locality = locality;
137
243k
    CompleteConstruction();
138
0
    VLOG_WITH_PREFIX(2) << "Started, metadata: " << metadata_;
139
243k
  }
140
141
  Impl(TransactionManager* manager, YBTransaction* transaction, const TransactionMetadata& metadata)
142
      : trace_(new Trace),
143
        start_(CoarseMonoClock::Now()),
144
        manager_(manager),
145
        transaction_(transaction),
146
        metadata_(metadata),
147
        read_point_(manager->clock()),
148
0
        child_(Child::kFalse) {
149
0
    CompleteConstruction();
150
0
    VLOG_WITH_PREFIX(2) << "Taken, metadata: " << metadata_;
151
0
  }
152
153
  Impl(TransactionManager* manager, YBTransaction* transaction, ChildTransactionData data)
154
      : trace_(new Trace),
155
        start_(CoarseMonoClock::Now()),
156
        manager_(manager),
157
        transaction_(transaction),
158
        read_point_(manager->clock()),
159
        child_(Child::kTrue),
160
14.6k
        child_had_read_time_(data.read_time) {
161
    // For serializable isolation we use read intents, so could always read most recent
162
    // version of DB.
163
    // Otherwise there is possible case when we miss value change that happened after transaction
164
    // start.
165
14.6k
    if (data.metadata.isolation == IsolationLevel::SNAPSHOT_ISOLATION) {
166
14.6k
      read_point_.SetReadTime(std::move(data.read_time), std::move(data.local_limits));
167
14.6k
    }
168
14.6k
    metadata_ = std::move(data.metadata);
169
14.6k
    CompleteConstruction();
170
2
    VLOG_WITH_PREFIX(2) << "Started child, metadata: " << metadata_;
171
14.6k
    ready_ = true;
172
14.6k
  }
173
174
253k
  ~Impl() {
175
253k
    manager_->rpcs().Abort({&heartbeat_handle_, &commit_handle_, &abort_handle_});
176
18.4E
    LOG_IF_WITH_PREFIX(DFATAL, !waiters_.empty()) << "Non empty waiters";
177
253k
    const auto threshold = GetAtomicFlag(&FLAGS_txn_slow_op_threshold_ms);
178
253k
    const auto now = CoarseMonoClock::Now();
179
253k
    if (trace_->must_print()
180
253k
           || (threshold > 0 && ToMilliseconds(now - start_) > threshold)) {
181
0
      LOG(INFO) << ToString() << " took " << ToMicroseconds(now - start_) << "us. Trace: \n"
182
0
        << trace_->DumpToString(true);
183
253k
    } else {
184
0
      YB_LOG_IF_EVERY_N(INFO, FLAGS_txn_print_trace_every_n > 0, FLAGS_txn_print_trace_every_n)
185
0
        << ToString() << " took " << ToMicroseconds(now - start_) << "us. Trace: \n"
186
0
        << trace_->DumpToString(true);
187
253k
    }
188
253k
  }
189
190
96.3k
  void SetPriority(uint64_t priority) {
191
96.3k
    metadata_.priority = priority;
192
96.3k
  }
193
194
20.2k
  uint64_t GetPriority() const {
195
20.2k
    return metadata_.priority;
196
20.2k
  }
197
198
0
  YBTransactionPtr CreateSimilarTransaction() {
199
0
    return std::make_shared<YBTransaction>(manager_);
200
0
  }
201
202
238k
  CHECKED_STATUS Init(IsolationLevel isolation, const ReadHybridTime& read_time) {
203
238k
    TRACE_TO(trace_, __func__);
204
18.4E
    VLOG_WITH_PREFIX(1) << __func__ << "(" << IsolationLevel_Name(isolation) << ", "
205
18.4E
                        << read_time << ")";
206
238k
    if (read_point_.GetReadTime().read.is_valid()) {
207
0
      return STATUS_FORMAT(IllegalState, "Read point already specified: $0",
208
0
                           read_point_.GetReadTime());
209
0
    }
210
211
238k
    if (read_time.read.is_valid()) {
212
0
      read_point_.SetReadTime(read_time, ConsistentReadPoint::HybridTimeMap());
213
0
    }
214
238k
    CompleteInit(isolation);
215
238k
    return Status::OK();
216
238k
  }
217
218
628
  void InitWithReadPoint(IsolationLevel isolation, ConsistentReadPoint&& read_point) {
219
0
    VLOG_WITH_PREFIX(1) << __func__ << "(" << IsolationLevel_Name(isolation) << ", "
220
0
                        << read_point.GetReadTime() << ")";
221
222
628
    read_point_.MoveFrom(&read_point);
223
628
    CompleteInit(isolation);
224
628
  }
225
226
84.6k
  const IsolationLevel isolation() const {
227
84.6k
    return metadata_.isolation;
228
84.6k
  }
229
230
  // This transaction is a restarted transaction, so we set it up with data from original one.
231
0
  CHECKED_STATUS FillRestartedTransaction(Impl* other) EXCLUDES(mutex_) {
232
0
    VLOG_WITH_PREFIX(1) << "Setup restart to " << other->ToString();
233
0
    auto transaction = transaction_->shared_from_this();
234
0
    TRACE_TO(trace_, __func__);
235
0
    {
236
0
      std::lock_guard<std::mutex> lock(mutex_);
237
0
      auto state = state_.load(std::memory_order_acquire);
238
0
      if (state != TransactionState::kRunning) {
239
0
        return STATUS_FORMAT(
240
0
            IllegalState, "Restart of completed transaction $0: $1",
241
0
            metadata_.transaction_id, state);
242
0
      }
243
0
      if (!read_point_.IsRestartRequired()) {
244
0
        return STATUS_FORMAT(
245
0
            IllegalState, "Restart of transaction that does not require restart: $0",
246
0
            metadata_.transaction_id);
247
0
      }
248
0
      other->read_point_.MoveFrom(&read_point_);
249
0
      other->read_point_.Restart();
250
0
      other->metadata_.isolation = metadata_.isolation;
251
      // TODO(Piyush): Do we need the below? If yes, prove with a test case and add it.
252
      // other->metadata_.priority = metadata_.priority;
253
0
      if (metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION) {
254
0
        other->metadata_.start_time = other->read_point_.GetReadTime().read;
255
0
      } else {
256
0
        other->metadata_.start_time = other->read_point_.Now();
257
0
      }
258
0
      state_.store(TransactionState::kAborted, std::memory_order_release);
259
0
    }
260
0
    DoAbort(TransactionRpcDeadline(), transaction);
261
262
0
    return Status::OK();
263
0
  }
264
265
782k
  Trace* trace() {
266
782k
    return trace_.get();
267
782k
  }
268
269
  CHECKED_STATUS CheckTransactionLocality(internal::InFlightOpsGroupsWithMetadata* ops_info)
270
568k
      REQUIRES(mutex_) {
271
568k
    if (metadata_.locality != TransactionLocality::LOCAL) {
272
568k
      return Status::OK();
273
568k
    }
274
18.4E
    for (auto& group : ops_info->groups) {
275
0
      auto& first_op = *group.begin;
276
0
      const auto& tablet = first_op.tablet;
277
0
      const auto& tablet_id = tablet->tablet_id();
278
279
0
      auto tservers = tablet->GetRemoteTabletServers(internal::IncludeFailedReplicas::kTrue);
280
0
      for (const auto &tserver : tservers) {
281
0
        auto pb = tserver->cloud_info();
282
0
        if (pb.placement_cloud() != FLAGS_placement_cloud ||
283
0
            pb.placement_region() != FLAGS_placement_region) {
284
0
          VLOG_WITH_PREFIX(4) << "Aborting (accessing nonlocal tablet in local transaction)";
285
0
          return STATUS_FORMAT(
286
0
              IllegalState, "Nonlocal tablet accessed in local transaction: tablet $0", tablet_id);
287
0
        }
288
0
      }
289
0
    }
290
18.4E
    return Status::OK();
291
18.4E
  }
292
293
  bool Prepare(internal::InFlightOpsGroupsWithMetadata* ops_info,
294
               ForceConsistentRead force_consistent_read,
295
               CoarseTimePoint deadline,
296
               Initial initial,
297
808k
               Waiter waiter) override EXCLUDES(mutex_) {
298
42
    VLOG_WITH_PREFIX(2) << "Prepare(" << force_consistent_read << ", " << initial << ", "
299
42
                        << AsString(ops_info->groups) << ")";
300
808k
    TRACE_TO(trace_, "Preparing $0 ops", AsString(ops_info->groups.size()));
301
808k
    VTRACE_TO(2, trace_, "Preparing $0 ops", AsString(ops_info->groups));
302
303
808k
    {
304
808k
      UNIQUE_LOCK(lock, mutex_);
305
808k
      const bool defer = !ready_;
306
307
808k
      if (!defer || initial) {
308
568k
        Status status = CheckTransactionLocality(ops_info);
309
568k
        if (!status.ok()) {
310
0
          VLOG_WITH_PREFIX(2) << "Prepare, rejected: " << status;
311
0
          bool abort = false;
312
0
          auto state = state_.load(std::memory_order_acquire);
313
0
          if (state == TransactionState::kRunning) {
314
            // State will be changed to aborted in SetErrorUnlocked
315
0
            abort = true;
316
0
          }
317
0
          SetErrorUnlocked(status, "Check transaction locality");
318
0
          lock.unlock();
319
0
          if (waiter) {
320
0
            waiter(status);
321
0
          }
322
0
          if (abort) {
323
0
            Abort(TransactionRpcDeadline());
324
0
          }
325
0
          return false;
326
0
        }
327
782k
        for (auto& group : ops_info->groups) {
328
782k
          auto& first_op = *group.begin;
329
782k
          const auto should_add_intents = first_op.yb_op->should_add_intents(metadata_.isolation);
330
782k
          const auto& tablet = first_op.tablet;
331
782k
          const auto& tablet_id = tablet->tablet_id();
332
333
782k
          bool has_metadata;
334
782k
          if (initial && should_add_intents) {
335
554k
            auto& tablet_state = tablets_[tablet_id];
336
            // TODO(dtxn) Handle skipped writes, i.e. writes that did not write anything (#3220)
337
554k
            first_op.batch_idx = tablet_state.num_batches;
338
554k
            ++tablet_state.num_batches;
339
554k
            has_metadata = tablet_state.has_metadata;
340
227k
          } else {
341
227k
            const auto it = tablets_.find(tablet_id);
342
227k
            has_metadata = it != tablets_.end() && it->second.has_metadata;
343
227k
          }
344
782k
          group.need_metadata = !has_metadata;
345
782k
        }
346
568k
      }
347
348
808k
      if (defer) {
349
239k
        if (waiter) {
350
239k
          waiters_.push_back(std::move(waiter));
351
239k
        }
352
239k
        lock.unlock();
353
11
        VLOG_WITH_PREFIX(2) << "Prepare, rejected (not ready, requesting status tablet)";
354
239k
        RequestStatusTablet(deadline);
355
239k
        return false;
356
239k
      }
357
358
      // For serializable isolation we never choose read time, since it always reads latest
359
      // snapshot.
360
      // For snapshot isolation, if read time was not yet picked, we have to choose it now, if there
361
      // multiple tablets that will process first request.
362
568k
      SetReadTimeIfNeeded(ops_info->groups.size() > 1 || force_consistent_read);
363
568k
    }
364
365
568k
    ops_info->metadata = {
366
568k
      .transaction = metadata_,
367
568k
      .subtransaction = subtransaction_.active()
368
63.4k
          ? boost::make_optional(subtransaction_.get())
369
505k
          : boost::none,
370
568k
    };
371
372
568k
    return true;
373
568k
  }
374
375
568k
  void ExpectOperations(size_t count) EXCLUDES(mutex_) override {
376
568k
    std::lock_guard<std::mutex> lock(mutex_);
377
568k
    running_requests_ += count;
378
568k
  }
379
380
  void Flushed(
381
      const internal::InFlightOps& ops, const ReadHybridTime& used_read_time,
382
692k
      const Status& status) EXCLUDES(mutex_) override {
383
692k
    TRACE_TO(trace_, "Flushed $0 ops. with Status $1", ops.size(), status.ToString());
384
36
    VLOG_WITH_PREFIX(5)
385
36
        << "Flushed: " << yb::ToString(ops) << ", used_read_time: " << used_read_time
386
36
        << ", status: " << status;
387
692k
    if (FLAGS_TEST_transaction_inject_flushed_delay_ms > 0) {
388
0
      std::this_thread::sleep_for(FLAGS_TEST_transaction_inject_flushed_delay_ms * 1ms);
389
0
    }
390
391
692k
    boost::optional<Status> notify_commit_status;
392
692k
    bool abort = false;
393
394
692k
    CommitCallback commit_callback;
395
692k
    {
396
692k
      std::lock_guard<std::mutex> lock(mutex_);
397
692k
      running_requests_ -= ops.size();
398
399
692k
      if (status.ok()) {
400
602k
        if (used_read_time && metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION) {
401
54.8k
          const bool read_point_already_set = static_cast<bool>(read_point_.GetReadTime());
402
54.8k
#ifndef NDEBUG
403
54.8k
          if (read_point_already_set) {
404
            // Display details of operations before crashing in debug mode.
405
0
            int op_idx = 1;
406
0
            for (const auto& op : ops) {
407
0
              LOG(ERROR) << "Operation " << op_idx << ": " << op.ToString();
408
0
              op_idx++;
409
0
            }
410
0
          }
411
54.8k
#endif
412
0
          LOG_IF_WITH_PREFIX(DFATAL, read_point_already_set)
413
0
              << "Read time already picked (" << read_point_.GetReadTime()
414
0
              << ", but server replied with used read time: " << used_read_time;
415
54.8k
          read_point_.SetReadTime(used_read_time, ConsistentReadPoint::HybridTimeMap());
416
54.8k
        }
417
602k
        const std::string* prev_tablet_id = nullptr;
418
3.20M
        for (const auto& op : ops) {
419
3.20M
          if (op.yb_op->applied() && op.yb_op->should_add_intents(metadata_.isolation)) {
420
2.97M
            const std::string& tablet_id = op.tablet->tablet_id();
421
2.97M
            if (prev_tablet_id == nullptr || tablet_id != *prev_tablet_id) {
422
374k
              prev_tablet_id = &tablet_id;
423
374k
              tablets_[tablet_id].has_metadata = true;
424
374k
            }
425
2.97M
          }
426
3.20M
        }
427
89.6k
      } else {
428
89.6k
        const TransactionError txn_err(status);
429
        // We don't abort the txn in case of a kSkipLocking error to make further progress.
430
        // READ COMMITTED isolation retries errors of kConflict and kReadRestart by restarting
431
        // statements instead of the whole txn and hence should avoid aborting the txn in this case
432
        // too.
433
89.6k
        bool avoid_abort =
434
89.6k
            (txn_err.value() == TransactionErrorCode::kSkipLocking) ||
435
89.8k
            (metadata_.isolation == IsolationLevel::READ_COMMITTED &&
436
23.5k
              (txn_err.value() == TransactionErrorCode::kReadRestartRequired ||
437
23.5k
                txn_err.value() == TransactionErrorCode::kConflict));
438
89.6k
        if (!avoid_abort) {
439
66.3k
          auto state = state_.load(std::memory_order_acquire);
440
18.4E
          VLOG_WITH_PREFIX(4) << "Abort desired, state: " << AsString(state);
441
66.3k
          if (state == TransactionState::kRunning) {
442
45.1k
            abort = true;
443
            // State will be changed to aborted in SetError
444
45.1k
          }
445
446
66.3k
          SetErrorUnlocked(status, "Flush");
447
66.3k
        }
448
89.6k
      }
449
450
692k
      if (running_requests_ == 0 && commit_replicated_) {
451
0
        notify_commit_status = status_;
452
0
        commit_callback = std::move(commit_callback_);
453
0
      }
454
692k
    }
455
456
692k
    if (notify_commit_status) {
457
0
      VLOG_WITH_PREFIX(4) << "Sealing done: " << *notify_commit_status;
458
0
      commit_callback(*notify_commit_status);
459
0
    }
460
461
692k
    if (abort && !child_) {
462
45.0k
      DoAbort(TransactionRpcDeadline(), transaction_->shared_from_this());
463
45.0k
    }
464
692k
  }
465
466
  void Commit(CoarseTimePoint deadline, SealOnly seal_only, CommitCallback callback)
467
157k
      EXCLUDES(mutex_) {
468
157k
    auto transaction = transaction_->shared_from_this();
469
157k
    TRACE_TO(trace_, __func__);
470
157k
    {
471
157k
      UNIQUE_LOCK(lock, mutex_);
472
157k
      auto status = CheckCouldCommitUnlocked(seal_only);
473
157k
      if (!status.ok()) {
474
7
        lock.unlock();
475
7
        callback(status);
476
7
        return;
477
7
      }
478
157k
      state_.store(seal_only ? TransactionState::kSealed : TransactionState::kCommitted,
479
157k
                   std::memory_order_release);
480
157k
      commit_callback_ = std::move(callback);
481
157k
      if (!ready_) {
482
        // If we have not written any intents and do not even have a transaction status tablet,
483
        // just report the transaction as committed.
484
        //
485
        // See https://github.com/yugabyte/yugabyte-db/issues/3105 for details -- we might be able
486
        // to remove this special case if it turns out there is a bug elsewhere.
487
5
        if (tablets_.empty() && running_requests_ == 0) {
488
0
          VLOG_WITH_PREFIX(4) << "Committed empty transaction";
489
5
          auto commit_callback = std::move(commit_callback_);
490
5
          lock.unlock();
491
5
          commit_callback(Status::OK());
492
5
          return;
493
5
        }
494
495
0
        waiters_.emplace_back(std::bind(
496
0
            &Impl::DoCommit, this, deadline, seal_only, _1, transaction));
497
0
        lock.unlock();
498
0
        RequestStatusTablet(deadline);
499
0
        return;
500
0
      }
501
157k
    }
502
157k
    DoCommit(deadline, seal_only, Status::OK(), transaction);
503
157k
  }
504
505
82.9k
  void Abort(CoarseTimePoint deadline) EXCLUDES(mutex_) {
506
82.9k
    auto transaction = transaction_->shared_from_this();
507
508
18.4E
    VLOG_WITH_PREFIX(2) << "Abort";
509
82.9k
    TRACE_TO(trace_, __func__);
510
82.9k
    {
511
82.9k
      UNIQUE_LOCK(lock, mutex_);
512
82.9k
      auto state = state_.load(std::memory_order_acquire);
513
82.9k
      if (state != TransactionState::kRunning) {
514
78.9k
        if (state != TransactionState::kAborted) {
515
0
          LOG_WITH_PREFIX(DFATAL)
516
0
              << "Abort of committed transaction: " << AsString(state);
517
78.9k
        } else {
518
18.4E
          VLOG_WITH_PREFIX(2) << "Already aborted";
519
78.9k
        }
520
78.9k
        return;
521
78.9k
      }
522
3.98k
      if (child_) {
523
0
        LOG_WITH_PREFIX(DFATAL) << "Abort of child transaction";
524
0
        return;
525
0
      }
526
3.98k
      state_.store(TransactionState::kAborted, std::memory_order_release);
527
3.98k
      if (!ready_) {
528
92
        std::vector<Waiter> waiters;
529
92
        waiters_.swap(waiters);
530
92
        lock.unlock();
531
92
        const auto aborted_status = STATUS(Aborted, "Transaction aborted");
532
40
        for(const auto& waiter : waiters) {
533
40
          waiter(aborted_status);
534
40
        }
535
0
        VLOG_WITH_PREFIX(2) << "Aborted transaction not yet ready";
536
92
        return;
537
92
      }
538
3.88k
    }
539
3.88k
    DoAbort(deadline, transaction);
540
3.88k
  }
541
542
298k
  bool IsRestartRequired() const {
543
298k
    return read_point_.IsRestartRequired();
544
298k
  }
545
546
5.82k
  std::shared_future<Result<TransactionMetadata>> GetMetadata() EXCLUDES(mutex_) {
547
5.82k
    UNIQUE_LOCK(lock, mutex_);
548
5.82k
    if (metadata_future_.valid()) {
549
0
      return metadata_future_;
550
0
    }
551
5.82k
    metadata_future_ = std::shared_future<Result<TransactionMetadata>>(
552
5.82k
        metadata_promise_.get_future());
553
5.82k
    if (!ready_) {
554
424
      auto transaction = transaction_->shared_from_this();
555
424
      waiters_.push_back([this, transaction](const Status& status) {
556
424
        WARN_NOT_OK(status, "Transaction request failed");
557
424
        if (status.ok()) {
558
424
          metadata_promise_.set_value(metadata_);
559
0
        } else {
560
0
          metadata_promise_.set_value(status);
561
0
        }
562
424
      });
563
424
      lock.unlock();
564
424
      RequestStatusTablet(TransactionRpcDeadline());
565
424
      lock.lock();
566
424
      return metadata_future_;
567
424
    }
568
569
5.40k
    metadata_promise_.set_value(metadata_);
570
5.40k
    return metadata_future_;
571
5.40k
  }
572
573
  void PrepareChild(
574
      ForceConsistentRead force_consistent_read, CoarseTimePoint deadline,
575
82.5k
      PrepareChildCallback callback) {
576
82.5k
    auto transaction = transaction_->shared_from_this();
577
82.5k
    TRACE_TO(trace_, __func__);
578
82.5k
    UNIQUE_LOCK(lock, mutex_);
579
82.5k
    auto status = CheckRunningUnlocked();
580
82.5k
    if (!status.ok()) {
581
0
      lock.unlock();
582
0
      callback(status);
583
0
      return;
584
0
    }
585
82.5k
    if (IsRestartRequired()) {
586
0
      lock.unlock();
587
0
      callback(STATUS(IllegalState, "Restart required"));
588
0
      return;
589
0
    }
590
591
82.5k
    SetReadTimeIfNeeded(force_consistent_read);
592
593
82.5k
    if (!ready_) {
594
3.49k
      waiters_.emplace_back(std::bind(
595
3.49k
          &Impl::DoPrepareChild, this, _1, transaction, std::move(callback)));
596
3.49k
      lock.unlock();
597
3.49k
      RequestStatusTablet(deadline);
598
3.49k
      return;
599
3.49k
    }
600
601
79.0k
    ChildTransactionDataPB child_txn_data_pb = PrepareChildTransactionDataUnlocked(transaction);
602
79.0k
    lock.unlock();
603
79.0k
    callback(child_txn_data_pb);
604
79.0k
  }
605
606
14.5k
  Result<ChildTransactionResultPB> FinishChild() {
607
14.5k
    TRACE_TO(trace_, __func__);
608
14.5k
    UNIQUE_LOCK(lock, mutex_);
609
14.5k
    RETURN_NOT_OK(CheckRunningUnlocked());
610
14.5k
    if (!child_) {
611
0
      return STATUS(IllegalState, "Finish child of non child transaction");
612
0
    }
613
14.5k
    state_.store(TransactionState::kCommitted, std::memory_order_release);
614
14.5k
    ChildTransactionResultPB result;
615
14.5k
    auto& tablets = *result.mutable_tablets();
616
14.5k
    tablets.Reserve(narrow_cast<int>(tablets_.size()));
617
35.1k
    for (const auto& tablet : tablets_) {
618
35.1k
      auto& out = *tablets.Add();
619
35.1k
      out.set_tablet_id(tablet.first);
620
35.1k
      out.set_num_batches(tablet.second.num_batches);
621
35.1k
      out.set_metadata_state(
622
34.8k
          tablet.second.has_metadata ? InvolvedTabletMetadataState::EXIST
623
332
                                     : InvolvedTabletMetadataState::MISSING);
624
35.1k
    }
625
14.5k
    read_point_.FinishChildTransactionResult(HadReadTime(child_had_read_time_), &result);
626
14.5k
    return result;
627
14.5k
  }
628
629
14.1k
  Status ApplyChildResult(const ChildTransactionResultPB& result) EXCLUDES(mutex_) {
630
14.1k
    TRACE_TO(trace_, __func__);
631
14.1k
    std::vector<std::string> cleanup_tablet_ids;
632
14.1k
    auto se = ScopeExit([this, &cleanup_tablet_ids] {
633
14.1k
      if (cleanup_tablet_ids.empty()) {
634
14.1k
        return;
635
14.1k
      }
636
0
      CleanupTransaction(
637
0
          manager_->client(), manager_->clock(), metadata_.transaction_id, Sealed::kFalse,
638
0
          CleanupType::kImmediate, cleanup_tablet_ids);
639
0
    });
640
14.1k
    UNIQUE_LOCK(lock, mutex_);
641
14.1k
    if (state_.load(std::memory_order_acquire) == TransactionState::kAborted) {
642
0
      cleanup_tablet_ids.reserve(result.tablets().size());
643
0
      for (const auto& tablet : result.tablets()) {
644
0
        cleanup_tablet_ids.push_back(tablet.tablet_id());
645
0
      }
646
0
    }
647
648
14.1k
    RETURN_NOT_OK(CheckRunningUnlocked());
649
14.1k
    if (child_) {
650
0
      return STATUS(IllegalState, "Apply child result of child transaction");
651
0
    }
652
653
34.8k
    for (const auto& tablet : result.tablets()) {
654
34.8k
      auto& tablet_state = tablets_[tablet.tablet_id()];
655
34.8k
      tablet_state.num_batches += tablet.num_batches();
656
34.8k
      tablet_state.has_metadata =
657
34.8k
          tablet_state.has_metadata ||
658
34.6k
          tablet.metadata_state() == InvolvedTabletMetadataState::EXIST;
659
34.8k
    }
660
14.1k
    read_point_.ApplyChildTransactionResult(result);
661
662
14.1k
    return Status::OK();
663
14.1k
  }
664
665
2.42k
  const std::string& LogPrefix() {
666
2.42k
    return log_prefix_;
667
2.42k
  }
668
669
0
  std::string ToString() EXCLUDES(mutex_) {
670
0
    std::lock_guard<std::mutex> lock(mutex_);
671
0
    return Format("{ metadata: $0 state: $1 }", metadata_, state_.load(std::memory_order_acquire));
672
0
  }
673
674
15
  const TransactionId& id() const {
675
15
    return metadata_.transaction_id;
676
15
  }
677
678
1.07M
  ConsistentReadPoint& read_point() {
679
1.07M
    return read_point_;
680
1.07M
  }
681
682
0
  Result<TransactionMetadata> Release() {
683
0
    UNIQUE_LOCK(lock, mutex_);
684
0
    auto state = state_.load(std::memory_order_acquire);
685
0
    if (state != TransactionState::kRunning) {
686
0
      return STATUS_FORMAT(IllegalState, "Attempt to release transaction in the wrong state $0: $1",
687
0
                           metadata_.transaction_id, AsString(state));
688
0
    }
689
0
    state_.store(TransactionState::kReleased, std::memory_order_release);
690
691
0
    if (!ready_) {
692
0
      CountDownLatch latch(1);
693
0
      Status pick_status;
694
0
      auto transaction = transaction_->shared_from_this();
695
0
      waiters_.push_back([&latch, &pick_status](const Status& status) {
696
0
        pick_status = status;
697
0
        latch.CountDown();
698
0
      });
699
0
      lock.unlock();
700
0
      RequestStatusTablet(TransactionRpcDeadline());
701
0
      latch.Wait();
702
0
      RETURN_NOT_OK(pick_status);
703
0
      lock.lock();
704
0
    }
705
0
    return metadata_;
706
0
  }
707
708
0
  void StartHeartbeat() {
709
0
    VLOG_WITH_PREFIX(2) << __PRETTY_FUNCTION__;
710
0
    RequestStatusTablet(TransactionRpcDeadline());
711
0
  }
712
713
48.8k
  void SetActiveSubTransaction(SubTransactionId id) {
714
48.8k
    return subtransaction_.SetActiveSubTransaction(id);
715
48.8k
  }
716
717
23.5k
  CHECKED_STATUS RollbackSubTransaction(SubTransactionId id) {
718
23.5k
    SCHECK(
719
23.5k
        subtransaction_.active(), InternalError,
720
23.5k
        "Attempted to rollback to savepoint before creating any savepoints.");
721
23.5k
    return subtransaction_.RollbackSubTransaction(id);
722
23.5k
  }
723
724
0
  bool HasSubTransactionState() {
725
0
    return subtransaction_.active();
726
0
  }
727
728
 private:
729
257k
  void CompleteConstruction() {
730
243k
    log_prefix_ = Format("$0$1: ", metadata_.transaction_id, child_ ? " (CHILD)" : "");
731
257k
    heartbeat_handle_ = manager_->rpcs().InvalidHandle();
732
257k
    commit_handle_ = manager_->rpcs().InvalidHandle();
733
257k
    abort_handle_ = manager_->rpcs().InvalidHandle();
734
257k
  }
735
736
239k
  // Records the isolation level and picks the transaction start time. The start time is the
  // read point's read time when one is already set, and the read point's current time otherwise.
  void CompleteInit(IsolationLevel isolation) {
    metadata_.isolation = isolation;
    // TODO(Piyush): read_point_ might not represent the correct start time for
    // a READ COMMITTED txn since it might have been updated several times
    // before a YBTransaction is created. Fix this.
    const bool has_read_time = static_cast<bool>(read_point_.GetReadTime());
    if (!has_read_time) {
      metadata_.start_time = read_point_.Now();
      return;
    }
    metadata_.start_time = read_point_.GetReadTime().read;
  }
747
748
536k
  void SetReadTimeIfNeeded(bool do_it) {
749
536k
    if (!read_point_.GetReadTime() && do_it &&
750
141k
        (metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION ||
751
75.4k
         metadata_.isolation == IsolationLevel::READ_COMMITTED)) {
752
66.2k
      read_point_.SetCurrentReadTime();
753
66.2k
    }
754
536k
  }
755
756
267k
  // Returns OK while the transaction is in the kRunning state. Otherwise it returns the stored
  // error, or IllegalState when no error was recorded.
  CHECKED_STATUS CheckRunningUnlocked() REQUIRES(mutex_) {
    if (state_.load(std::memory_order_acquire) == TransactionState::kRunning) {
      return Status::OK();
    }
    if (!status_.ok()) {
      return status_;
    }
    return STATUS(IllegalState, "Transaction already completed");
  }
766
767
  void DoCommit(
768
      CoarseTimePoint deadline, SealOnly seal_only, const Status& status,
769
157k
      const YBTransactionPtr& transaction) EXCLUDES(mutex_) {
770
18.4E
    VLOG_WITH_PREFIX(1)
771
18.4E
        << Format("Commit, seal_only: $0, tablets: $1, status: $2",
772
18.4E
                  seal_only, tablets_, status);
773
774
157k
    UNIQUE_LOCK(lock, mutex_);
775
157k
    if (!status.ok()) {
776
0
      VLOG_WITH_PREFIX(4) << "Commit failed: " << status;
777
0
      auto commit_callback = std::move(commit_callback_);
778
0
      lock.unlock();
779
0
      commit_callback(status);
780
0
      return;
781
0
    }
782
783
157k
    tserver::UpdateTransactionRequestPB req;
784
157k
    req.set_tablet_id(status_tablet_->tablet_id());
785
157k
    req.set_propagated_hybrid_time(manager_->Now().ToUint64());
786
157k
    auto& state = *req.mutable_state();
787
157k
    state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
788
    // TODO(savepoints) -- Attach metadata about aborted subtransactions to commit message.
789
157k
    state.set_status(seal_only ? TransactionStatus::SEALED : TransactionStatus::COMMITTED);
790
157k
    state.mutable_tablets()->Reserve(narrow_cast<int>(tablets_.size()));
791
306k
    for (const auto& tablet : tablets_) {
792
      // If tablet does not have metadata it should not participate in commit.
793
307k
      if (!seal_only && !tablet.second.has_metadata) {
794
51
        continue;
795
51
      }
796
306k
      state.add_tablets(tablet.first);
797
306k
      if (seal_only) {
798
0
        state.add_tablet_batches(tablet.second.num_batches);
799
0
      }
800
306k
    }
801
802
    // If we don't have any tablets that have intents written to them, just abort it.
803
    // But notify caller that commit was successful, so it is transparent for him.
804
157k
    if (state.tablets().empty()) {
805
0
      VLOG_WITH_PREFIX(4) << "Committed empty";
806
216
      auto status_tablet = status_tablet_;
807
216
      auto commit_callback = std::move(commit_callback_);
808
216
      lock.unlock();
809
216
      DoAbort(deadline, transaction, status_tablet);
810
216
      commit_callback(Status::OK());
811
216
      return;
812
216
    }
813
814
156k
    if (subtransaction_.active()) {
815
154
      subtransaction_.get().aborted.ToPB(state.mutable_aborted()->mutable_set());
816
154
    }
817
818
156k
    manager_->rpcs().RegisterAndStart(
819
156k
        UpdateTransaction(
820
156k
            deadline,
821
156k
            status_tablet_.get(),
822
156k
            manager_->client(),
823
156k
            &req,
824
156k
            [this, transaction](const auto& status, const auto& req, const auto& resp) {
825
156k
              this->CommitDone(status, resp, transaction);
826
156k
            }),
827
156k
        &commit_handle_);
828
156k
  }
829
830
82.7k
  // Aborts the transaction against a snapshot of the status tablet taken under mutex_.
  void DoAbort(CoarseTimePoint deadline, const YBTransactionPtr& transaction) EXCLUDES(mutex_) {
    internal::RemoteTabletPtr tablet_snapshot;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      tablet_snapshot = status_tablet_;
    }
    DoAbort(deadline, transaction, std::move(tablet_snapshot));
  }
838
839
  void DoAbort(
840
      CoarseTimePoint deadline,
841
      const YBTransactionPtr& transaction,
842
83.0k
      internal::RemoteTabletPtr status_tablet) EXCLUDES(mutex_) {
843
83.0k
    tserver::AbortTransactionRequestPB req;
844
83.0k
    req.set_tablet_id(status_tablet->tablet_id());
845
83.0k
    req.set_propagated_hybrid_time(manager_->Now().ToUint64());
846
83.0k
    req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
847
848
83.0k
    manager_->rpcs().RegisterAndStart(
849
83.0k
        AbortTransaction(
850
83.0k
            deadline,
851
83.0k
            status_tablet.get(),
852
83.0k
            manager_->client(),
853
83.0k
            &req,
854
83.0k
            std::bind(&Impl::AbortDone, this, _1, _2, transaction)),
855
83.0k
        &abort_handle_);
856
857
83.0k
    DoAbortCleanup(transaction, CleanupType::kImmediate);
858
83.0k
  }
859
860
  void DoAbortCleanup(const YBTransactionPtr& transaction, CleanupType cleanup_type)
861
135k
      EXCLUDES(mutex_) {
862
135k
    if (FLAGS_TEST_disable_proactive_txn_cleanup_on_abort) {
863
0
      VLOG_WITH_PREFIX(1) << "TEST: Disabled proactive transaction cleanup on abort";
864
0
      return;
865
0
    }
866
867
135k
    std::vector<std::string> tablet_ids;
868
135k
    {
869
135k
      std::lock_guard<std::mutex> lock(mutex_);
870
135k
      tablet_ids.reserve(tablets_.size());
871
182k
      for (const auto& tablet : tablets_) {
872
        // We don't check has_metadata here, because intents could be written even in case of
873
        // failure. For instance in case of conflict on unique index.
874
182k
        tablet_ids.push_back(tablet.first);
875
182k
      }
876
80
      VLOG_WITH_PREFIX(1) << "Cleaning up intents from: " << AsString(tablet_ids);
877
135k
    }
878
879
135k
    CleanupTransaction(
880
135k
        manager_->client(), manager_->clock(), metadata_.transaction_id, Sealed::kFalse,
881
135k
        cleanup_type, tablet_ids);
882
135k
  }
883
884
  void CommitDone(const Status& status,
885
                  const tserver::UpdateTransactionResponsePB& response,
886
156k
                  const YBTransactionPtr& transaction) {
887
156k
    TRACE_TO(trace_, __func__);
888
0
    VLOG_WITH_PREFIX(1) << "Committed: " << status;
889
890
156k
    UpdateClock(response, manager_);
891
156k
    manager_->rpcs().Unregister(&commit_handle_);
892
893
156k
    Status actual_status = status.IsAlreadyPresent() ? Status::OK() : status;
894
156k
    CommitCallback commit_callback;
895
156k
    if (state_.load(std::memory_order_acquire) != TransactionState::kCommitted &&
896
0
        actual_status.ok()) {
897
0
      std::lock_guard<std::mutex> lock(mutex_);
898
0
      commit_replicated_ = true;
899
0
      if (running_requests_ != 0) {
900
0
        return;
901
0
      }
902
0
      commit_callback = std::move(commit_callback_);
903
156k
    } else {
904
156k
      std::lock_guard<std::mutex> lock(mutex_);
905
156k
      commit_callback = std::move(commit_callback_);
906
156k
    }
907
18.4E
    VLOG_WITH_PREFIX(4) << "Commit done: " << actual_status;
908
156k
    commit_callback(actual_status);
909
910
156k
    if (actual_status.IsExpired()) {
911
      // We can't perform immediate cleanup here because the transaction could be committed,
912
      // its APPLY records replicated in all participant tablets, and its status record removed
913
      // from the status tablet.
914
52.4k
      DoAbortCleanup(transaction, CleanupType::kGraceful);
915
52.4k
    }
916
156k
  }
917
918
  void AbortDone(const Status& status,
919
                 const tserver::AbortTransactionResponsePB& response,
920
81.9k
                 const YBTransactionPtr& transaction) {
921
81.9k
    TRACE_TO(trace_, __func__);
922
18.4E
    VLOG_WITH_PREFIX(1) << "Aborted: " << status;
923
924
81.9k
    if (response.has_propagated_hybrid_time()) {
925
81.9k
      manager_->UpdateClock(HybridTime(response.propagated_hybrid_time()));
926
81.9k
    }
927
81.9k
    manager_->rpcs().Unregister(&abort_handle_);
928
81.9k
  }
929
930
243k
  // Resolves the status tablet at most once per transaction, guarded by
  // requested_status_tablet_. If the metadata already names a status tablet, that tablet is
  // looked up directly. Otherwise the manager is asked to pick one for the transaction's
  // locality.
  void RequestStatusTablet(const CoarseTimePoint& deadline) EXCLUDES(mutex_) {
    TRACE_TO(trace_, __func__);
    bool already_requested = false;
    const bool first_request = requested_status_tablet_.compare_exchange_strong(
        already_requested, true, std::memory_order_acq_rel);
    if (!first_request) {
      return;
    }
    VLOG_WITH_PREFIX(2) << "RequestStatusTablet()";

    auto transaction = transaction_->shared_from_this();
    if (!metadata_.status_tablet.empty()) {
      LookupStatusTablet(metadata_.status_tablet, deadline, transaction);
      return;
    }
    manager_->PickStatusTablet(
        std::bind(&Impl::StatusTabletPicked, this, _1, deadline, transaction),
        metadata_.locality);
  }
947
948
  void StatusTabletPicked(const Result<std::string>& tablet,
949
                          const CoarseTimePoint& deadline,
950
243k
                          const YBTransactionPtr& transaction) {
951
243k
    TRACE_TO(trace_, __func__);
952
18.4E
    VLOG_WITH_PREFIX(2) << "Picked status tablet: " << tablet;
953
954
243k
    if (!tablet.ok()) {
955
0
      NotifyWaiters(tablet.status(), "Pick status tablet");
956
0
      return;
957
0
    }
958
959
243k
    LookupStatusTablet(*tablet, deadline, transaction);
960
243k
  }
961
962
  void LookupStatusTablet(const std::string& tablet_id,
963
                          const CoarseTimePoint& deadline,
964
243k
                          const YBTransactionPtr& transaction) {
965
243k
    TRACE_TO(trace_, __func__);
966
243k
    manager_->client()->LookupTabletById(
967
243k
        tablet_id,
968
243k
        /* table =*/ nullptr,
969
243k
        master::IncludeInactive::kFalse,
970
243k
        deadline,
971
243k
        std::bind(&Impl::LookupTabletDone, this, _1, transaction),
972
243k
        client::UseCache::kTrue);
973
243k
  }
974
975
  void LookupTabletDone(const Result<client::internal::RemoteTabletPtr>& result,
976
243k
                        const YBTransactionPtr& transaction) {
977
243k
    TRACE_TO(trace_, __func__);
978
18.4E
    VLOG_WITH_PREFIX(1) << "Lookup tablet done: " << yb::ToString(result);
979
980
243k
    if (!result.ok()) {
981
0
      NotifyWaiters(result.status(), "Lookup tablet");
982
0
      return;
983
0
    }
984
985
243k
    bool precreated;
986
243k
    std::vector<Waiter> waiters;
987
243k
    {
988
243k
      std::lock_guard<std::mutex> lock(mutex_);
989
243k
      status_tablet_ = std::move(*result);
990
243k
      if (metadata_.status_tablet.empty()) {
991
243k
        metadata_.status_tablet = status_tablet_->tablet_id();
992
243k
        precreated = false;
993
18.4E
      } else {
994
18.4E
        precreated = true;
995
18.4E
        ready_ = true;
996
18.4E
        waiters_.swap(waiters);
997
18.4E
      }
998
243k
    }
999
243k
    if (precreated) {
1000
0
      for (const auto& waiter : waiters) {
1001
0
        waiter(Status::OK());
1002
0
      }
1003
0
    }
1004
243k
    SendHeartbeat(precreated ? TransactionStatus::PENDING : TransactionStatus::CREATED,
1005
243k
                  metadata_.transaction_id, transaction_->shared_from_this());
1006
243k
  }
1007
1008
242k
  void NotifyWaiters(const Status& status, const char* operation) {
1009
242k
    std::vector<Waiter> waiters;
1010
242k
    {
1011
242k
      std::lock_guard<std::mutex> lock(mutex_);
1012
242k
      if (status.ok()) {
1013
242k
        DCHECK(!ready_);
1014
242k
        ready_ = true;
1015
3
      } else {
1016
3
        SetErrorUnlocked(status, operation);
1017
3
      }
1018
242k
      waiters_.swap(waiters);
1019
242k
    }
1020
242k
    for (const auto& waiter : waiters) {
1021
242k
      waiter(status);
1022
242k
    }
1023
242k
  }
1024
1025
  void SendHeartbeat(TransactionStatus status,
1026
                     const TransactionId& id,
1027
520k
                     const std::weak_ptr<YBTransaction>& weak_transaction) {
1028
520k
    auto transaction = weak_transaction.lock();
1029
520k
    if (!transaction) {
1030
      // Cannot use LOG_WITH_PREFIX here, since this was actually destroyed.
1031
105
      VLOG(1) << id << ": Transaction destroyed";
1032
218k
      return;
1033
218k
    }
1034
1035
302k
    auto current_state = state_.load(std::memory_order_acquire);
1036
1037
302k
    if (!AllowHeartbeat(current_state, status)) {
1038
10
      VLOG_WITH_PREFIX(1) << " Send heartbeat cancelled: " << yb::ToString(transaction);
1039
19.6k
      return;
1040
19.6k
    }
1041
1042
18.4E
    VLOG_WITH_PREFIX(4) << __func__ << "(" << TransactionStatus_Name(status) << ")";
1043
1044
283k
    MonoDelta timeout;
1045
283k
    if (status != TransactionStatus::CREATED) {
1046
40.1k
      if (GetAtomicFlag(&FLAGS_transaction_disable_heartbeat_in_tests)) {
1047
0
        HeartbeatDone(Status::OK(), /* request= */ {}, /* response= */ {}, status, transaction);
1048
0
        return;
1049
0
      }
1050
40.1k
      timeout = std::chrono::microseconds(FLAGS_transaction_heartbeat_usec);
1051
242k
    } else {
1052
242k
      timeout = TransactionRpcTimeout();
1053
242k
    }
1054
1055
283k
    tserver::UpdateTransactionRequestPB req;
1056
1057
283k
    internal::RemoteTabletPtr status_tablet;
1058
283k
    {
1059
283k
      std::lock_guard<std::mutex> lock(mutex_);
1060
283k
      status_tablet = status_tablet_;
1061
283k
    }
1062
1063
283k
    req.set_tablet_id(status_tablet->tablet_id());
1064
283k
    req.set_propagated_hybrid_time(manager_->Now().ToUint64());
1065
283k
    auto& state = *req.mutable_state();
1066
    // TODO(savepoints) -- Attach metadata about aborted subtransactions in heartbeat.
1067
283k
    state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
1068
283k
    state.set_status(status);
1069
283k
    manager_->rpcs().RegisterAndStart(
1070
283k
        UpdateTransaction(
1071
283k
            CoarseMonoClock::now() + timeout,
1072
283k
            status_tablet.get(),
1073
283k
            manager_->client(),
1074
283k
            &req,
1075
283k
            std::bind(&Impl::HeartbeatDone, this, _1, _2, _3, status, transaction)),
1076
283k
        &heartbeat_handle_);
1077
283k
  }
1078
1079
302k
  // Decides whether a heartbeat carrying `status` may be sent in `current_state`.
  // - kAborted / kCommitted: never.
  // - kReleased / kSealed: only the CREATED heartbeat.
  // - kRunning: always.
  static bool AllowHeartbeat(TransactionState current_state, TransactionStatus status) {
    switch (current_state) {
      case TransactionState::kAborted: FALLTHROUGH_INTENDED;
      case TransactionState::kCommitted:
        return false;
      case TransactionState::kReleased: FALLTHROUGH_INTENDED;
      case TransactionState::kSealed:
        return status == TransactionStatus::CREATED;
      case TransactionState::kRunning:
        return true;
    }
    FATAL_INVALID_ENUM_VALUE(TransactionState, current_state);
  }
1092
1093
  void HeartbeatDone(Status status,
1094
                     const tserver::UpdateTransactionRequestPB& request,
1095
                     const tserver::UpdateTransactionResponsePB& response,
1096
                     TransactionStatus transaction_status,
1097
281k
                     const YBTransactionPtr& transaction) {
1098
281k
    UpdateClock(response, manager_);
1099
281k
    manager_->rpcs().Unregister(&heartbeat_handle_);
1100
1101
281k
    if (status.ok() && transaction_status == TransactionStatus::CREATED) {
1102
242k
      auto decode_result = FullyDecodeTransactionId(request.state().transaction_id());
1103
242k
      if (decode_result.ok()) {
1104
242k
        metadata_.transaction_id = *decode_result;
1105
242k
        auto id_str = AsString(metadata_.transaction_id);
1106
        // It is not fully thread safe, since we don't use mutex to access log_prefix_.
1107
        // But here we just replace characters inplace.
1108
        // It would not crash anyway, and could produce wrong id in the logs.
1109
        // It is ok, since one moment before we would output nil id.
1110
242k
        log_prefix_.replace(0, id_str.length(), id_str);
1111
1
      } else {
1112
1
        status = decode_result.status();
1113
1
      }
1114
242k
    }
1115
1116
18.4E
    VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", "
1117
18.4E
                        << TransactionStatus_Name(transaction_status) << ")";
1118
1119
281k
    if (status.ok()) {
1120
279k
      if (transaction_status == TransactionStatus::CREATED) {
1121
242k
        NotifyWaiters(Status::OK(), "Heartbeat");
1122
242k
      }
1123
279k
      std::weak_ptr<YBTransaction> weak_transaction(transaction);
1124
279k
      manager_->client()->messenger()->scheduler().Schedule(
1125
275k
          [this, weak_transaction, id = metadata_.transaction_id](const Status&) {
1126
275k
              SendHeartbeat(TransactionStatus::PENDING, id, weak_transaction);
1127
275k
          },
1128
279k
          std::chrono::microseconds(FLAGS_transaction_heartbeat_usec));
1129
2.21k
    } else {
1130
2.21k
      auto state = state_.load(std::memory_order_acquire);
1131
2.21k
      LOG_WITH_PREFIX(WARNING) << "Send heartbeat failed: " << status << ", state: " << state;
1132
2.42k
      if (status.IsAborted() || status.IsExpired()) {
1133
        // IsAborted - Service is shutting down, no reason to retry.
1134
        // IsExpired - Transaction expired.
1135
55
        if (transaction_status == TransactionStatus::CREATED) {
1136
0
          NotifyWaiters(status, "Heartbeat");
1137
55
        } else {
1138
55
          SetError(status, "Heartbeat");
1139
55
        }
1140
        // If state is committed, then we should not cleanup.
1141
55
        if (status.IsExpired() && state == TransactionState::kRunning) {
1142
7
          DoAbortCleanup(transaction, CleanupType::kImmediate);
1143
7
        }
1144
55
        return;
1145
55
      }
1146
      // Other errors could have different causes, but we should just retry sending heartbeat
1147
      // in this case.
1148
2.15k
      SendHeartbeat(transaction_status, metadata_.transaction_id, transaction);
1149
2.15k
    }
1150
281k
  }
1151
1152
55
  // Locking wrapper around SetErrorUnlocked().
  void SetError(const Status& status, const char* operation) EXCLUDES(mutex_) {
    std::lock_guard<std::mutex> guard(mutex_);
    SetErrorUnlocked(status, operation);
  }
1156
1157
100k
  // Records the first error only (later ones are just logged), prefixing it with
  // `operation`, and moves the transaction into kAborted.
  void SetErrorUnlocked(const Status& status, const char* operation) REQUIRES(mutex_) {
    VLOG_WITH_PREFIX(1) << operation << " failed: " << status;
    if (!status_.ok()) {
      return;
    }
    status_ = status.CloneAndPrepend(operation);
    state_.store(TransactionState::kAborted, std::memory_order_release);
  }
1164
1165
  // Builds the data a child transaction needs: this transaction's metadata plus the read
  // point's child data. `transaction` is currently unused.
  ChildTransactionDataPB PrepareChildTransactionDataUnlocked(
      const YBTransactionPtr& transaction) REQUIRES(mutex_) {
    ChildTransactionDataPB result;
    auto* metadata_pb = result.mutable_metadata();
    metadata_.ToPB(metadata_pb);
    read_point_.PrepareChildTransactionData(&result);
    return result;
  }
1172
1173
  void DoPrepareChild(const Status& status,
1174
                      const YBTransactionPtr& transaction,
1175
3.49k
                      PrepareChildCallback callback) EXCLUDES(mutex_) {
1176
3.49k
    TRACE_TO(trace_, __func__);
1177
3.49k
    if (!status.ok()) {
1178
40
      callback(status);
1179
40
      return;
1180
40
    }
1181
1182
3.45k
    ChildTransactionDataPB child_txn_data_pb;
1183
3.45k
    {
1184
3.45k
      std::lock_guard<std::mutex> lock(mutex_);
1185
3.45k
      child_txn_data_pb = PrepareChildTransactionDataUnlocked(transaction);
1186
3.45k
    }
1187
1188
3.45k
    callback(child_txn_data_pb);
1189
3.45k
  }
1190
1191
157k
  // Validates that commit (or seal, when `seal_only`) may proceed. The checks, in order:
  // - the transaction must still be running;
  // - it must not be a child transaction;
  // - it must not require a restart;
  // - for a full commit, there must be no running requests.
  CHECKED_STATUS CheckCouldCommitUnlocked(SealOnly seal_only) REQUIRES(mutex_) {
    RETURN_NOT_OK(CheckRunningUnlocked());

    if (child_) {
      return STATUS(IllegalState, "Commit of child transaction is not allowed");
    }

    if (IsRestartRequired()) {
      return STATUS(
          IllegalState, "Commit of transaction that requires restart is not allowed");
    }

    // Sealing is allowed with requests still in flight; a full commit is not.
    const bool has_running_requests = running_requests_ > 0;
    if (has_running_requests && !seal_only) {
      return STATUS(IllegalState, "Commit of transaction with running requests");
    }

    return Status::OK();
  }
1206
1207
  // The trace buffer.
1208
  scoped_refptr<Trace> trace_;
1209
1210
  const CoarseTimePoint start_;
1211
1212
  // Manager is created once per service.
1213
  TransactionManager* const manager_;
1214
1215
  // Transaction related to this impl.
1216
  YBTransaction* const transaction_;
1217
1218
  TransactionMetadata metadata_;
1219
  ConsistentReadPoint read_point_;
1220
1221
  // Metadata tracking savepoint-related state for the scope of this transaction.
1222
  YBSubTransaction subtransaction_;
1223
1224
  std::atomic<bool> requested_status_tablet_{false};
1225
  internal::RemoteTabletPtr status_tablet_ GUARDED_BY(mutex_);
1226
  std::atomic<TransactionState> state_{TransactionState::kRunning};
1227
1228
  // Transaction is successfully initialized and ready to process intents.
1229
  const bool child_;
1230
  const bool child_had_read_time_ = false;
1231
  bool ready_ GUARDED_BY(mutex_) = false;
1232
  CommitCallback commit_callback_ GUARDED_BY(mutex_);
1233
  Status status_ GUARDED_BY(mutex_);
1234
1235
  // The following fields are initialized in CompleteConstruction() and can be used with no locking.
1236
  std::string log_prefix_;
1237
  rpc::Rpcs::Handle heartbeat_handle_;
1238
  rpc::Rpcs::Handle commit_handle_;
1239
  rpc::Rpcs::Handle abort_handle_;
1240
1241
  struct TabletState {
1242
    size_t num_batches = 0;
1243
    bool has_metadata = false;
1244
1245
0
    std::string ToString() const {
1246
0
      return Format("{ num_batches: $0 has_metadata: $1 }", num_batches, has_metadata);
1247
0
    }
1248
  };
1249
1250
  typedef std::unordered_map<TabletId, TabletState> TabletStates;
1251
1252
  std::mutex mutex_;
1253
  TabletStates tablets_ GUARDED_BY(mutex_);
1254
  std::vector<Waiter> waiters_;
1255
  std::promise<Result<TransactionMetadata>> metadata_promise_;
1256
  std::shared_future<Result<TransactionMetadata>> metadata_future_ GUARDED_BY(mutex_);
1257
  // As of 2021-04-05 running_requests_ reflects number of ops in progress within this transaction
1258
  // only if no in-transaction operations have failed.
1259
  // If in-transaction operation has failed during tablet lookup or it has failed and will be
1260
  // retried by YBSession inside the same transaction - Transaction::Flushed is not getting called
1261
  // and running_requests_ is not updated.
1262
  // For YBSession-level retries Transaction::Flushed will be called when operation is finally
1263
  // successfully flushed, if operation fails after retry - Transaction::Flushed is not getting
1264
  // called.
1265
  // We might need to fix this before turning on transactions sealing.
1266
  // https://github.com/yugabyte/yugabyte-db/issues/7984.
1267
  size_t running_requests_ GUARDED_BY(mutex_) = 0;
1268
  // Set to true after commit record is replicated. Used only during transaction sealing.
1269
  bool commit_replicated_ = false;
1270
};
1271
1272
322k
// Returns `deadline` unchanged, or the default transaction RPC deadline when `deadline` is
// a default-constructed (unset) time point.
CoarseTimePoint AdjustDeadline(CoarseTimePoint deadline) {
  if (deadline != CoarseTimePoint()) {
    return deadline;
  }
  return TransactionRpcDeadline();
}
1278
1279
// Creates a new transaction with the given locality.
YBTransaction::YBTransaction(TransactionManager* manager, TransactionLocality locality)
    : impl_(new Impl(manager, this, locality)) {
}

// Creates a transaction from existing metadata; used by Take().
YBTransaction::YBTransaction(
    TransactionManager* manager, const TransactionMetadata& metadata, PrivateOnlyTag)
    : impl_(new Impl(manager, this, metadata)) {
}

// Creates a child transaction from data prepared by a parent's PrepareChild().
YBTransaction::YBTransaction(TransactionManager* manager, ChildTransactionData data)
    : impl_(new Impl(manager, this, std::move(data))) {
}

// Defined out of line so that Impl is a complete type where impl_ is destroyed.
YBTransaction::~YBTransaction() = default;
1294
1295
96.3k
void YBTransaction::SetPriority(uint64_t priority) {
1296
96.3k
  impl_->SetPriority(priority);
1297
96.3k
}
1298
1299
20.2k
uint64_t YBTransaction::GetPriority() const {
1300
20.2k
  return impl_->GetPriority();
1301
20.2k
}
1302
1303
238k
Status YBTransaction::Init(IsolationLevel isolation, const ReadHybridTime& read_time) {
1304
238k
  return impl_->Init(isolation, read_time);
1305
238k
}
1306
1307
void YBTransaction::InitWithReadPoint(
1308
    IsolationLevel isolation,
1309
628
    ConsistentReadPoint&& read_point) {
1310
628
  return impl_->InitWithReadPoint(isolation, std::move(read_point));
1311
628
}
1312
1313
2.15M
internal::TxnBatcherIf& YBTransaction::batcher_if() {
1314
2.15M
  return *impl_;
1315
2.15M
}
1316
1317
void YBTransaction::Commit(
1318
78.5k
    CoarseTimePoint deadline, SealOnly seal_only, CommitCallback callback) {
1319
78.5k
  impl_->Commit(AdjustDeadline(deadline), seal_only, std::move(callback));
1320
78.5k
}
1321
1322
78.5k
void YBTransaction::Commit(CoarseTimePoint deadline, CommitCallback callback) {
1323
78.5k
  Commit(deadline, SealOnly::kFalse, callback);
1324
78.5k
}
1325
1326
0
void YBTransaction::Commit(CommitCallback callback) {
1327
0
  Commit(CoarseTimePoint(), SealOnly::kFalse, std::move(callback));
1328
0
}
1329
1330
15
const TransactionId& YBTransaction::id() const {
1331
15
  return impl_->id();
1332
15
}
1333
1334
84.6k
IsolationLevel YBTransaction::isolation() const {
1335
84.6k
  return impl_->isolation();
1336
84.6k
}
1337
1338
0
const ConsistentReadPoint& YBTransaction::read_point() const {
1339
0
  return impl_->read_point();
1340
0
}
1341
1342
1.07M
ConsistentReadPoint& YBTransaction::read_point() {
1343
1.07M
  return impl_->read_point();
1344
1.07M
}
1345
1346
std::future<Status> YBTransaction::CommitFuture(
1347
78.6k
    CoarseTimePoint deadline, SealOnly seal_only) {
1348
78.6k
  return MakeFuture<Status>([this, deadline, seal_only](auto callback) {
1349
78.6k
    impl_->Commit(AdjustDeadline(deadline), seal_only, std::move(callback));
1350
78.6k
  });
1351
78.6k
}
1352
1353
82.9k
void YBTransaction::Abort(CoarseTimePoint deadline) {
1354
82.9k
  impl_->Abort(AdjustDeadline(deadline));
1355
82.9k
}
1356
1357
59.5k
bool YBTransaction::IsRestartRequired() const {
1358
59.5k
  return impl_->IsRestartRequired();
1359
59.5k
}
1360
1361
0
Result<YBTransactionPtr> YBTransaction::CreateRestartedTransaction() {
1362
0
  auto result = impl_->CreateSimilarTransaction();
1363
0
  RETURN_NOT_OK(impl_->FillRestartedTransaction(result->impl_.get()));
1364
0
  return result;
1365
0
}
1366
1367
0
Status YBTransaction::FillRestartedTransaction(const YBTransactionPtr& dest) {
1368
0
  return impl_->FillRestartedTransaction(dest->impl_.get());
1369
0
}
1370
1371
void YBTransaction::PrepareChild(
1372
    ForceConsistentRead force_consistent_read, CoarseTimePoint deadline,
1373
0
    PrepareChildCallback callback) {
1374
0
  return impl_->PrepareChild(force_consistent_read, deadline, std::move(callback));
1375
0
}
1376
1377
std::future<Result<ChildTransactionDataPB>> YBTransaction::PrepareChildFuture(
1378
82.3k
    ForceConsistentRead force_consistent_read, CoarseTimePoint deadline) {
1379
82.3k
  return MakeFuture<Result<ChildTransactionDataPB>>(
1380
81.8k
      [this, deadline, force_consistent_read](auto callback) {
1381
81.8k
    impl_->PrepareChild(force_consistent_read, AdjustDeadline(deadline), std::move(callback));
1382
81.8k
  });
1383
82.3k
}
1384
1385
14.5k
Result<ChildTransactionResultPB> YBTransaction::FinishChild() {
1386
14.5k
  return impl_->FinishChild();
1387
14.5k
}
1388
1389
5.82k
std::shared_future<Result<TransactionMetadata>> YBTransaction::GetMetadata() const {
1390
5.82k
  return impl_->GetMetadata();
1391
5.82k
}
1392
1393
14.1k
Status YBTransaction::ApplyChildResult(const ChildTransactionResultPB& result) {
1394
14.1k
  return impl_->ApplyChildResult(result);
1395
14.1k
}
1396
1397
0
std::string YBTransaction::ToString() const {
1398
0
  return impl_->ToString();
1399
0
}
1400
1401
0
Result<TransactionMetadata> YBTransaction::Release() {
1402
0
  return impl_->Release();
1403
0
}
1404
1405
782k
Trace* YBTransaction::trace() {
1406
782k
  return impl_->trace();
1407
782k
}
1408
1409
YBTransactionPtr YBTransaction::Take(
1410
0
    TransactionManager* manager, const TransactionMetadata& metadata) {
1411
0
  auto result = std::make_shared<YBTransaction>(manager, metadata, PrivateOnlyTag());
1412
0
  result->impl_->StartHeartbeat();
1413
0
  return result;
1414
0
}
1415
1416
48.8k
void YBTransaction::SetActiveSubTransaction(SubTransactionId id) {
1417
48.8k
  return impl_->SetActiveSubTransaction(id);
1418
48.8k
}
1419
1420
23.5k
Status YBTransaction::RollbackSubTransaction(SubTransactionId id) {
1421
23.5k
  return impl_->RollbackSubTransaction(id);
1422
23.5k
}
1423
1424
0
bool YBTransaction::HasSubTransactionState() {
1425
0
  return impl_->HasSubTransactionState();
1426
0
}
1427
1428
} // namespace client
1429
} // namespace yb