YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/transaction.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/client/transaction.h"
17
18
#include <unordered_set>
19
20
#include "yb/client/batcher.h"
21
#include "yb/client/client.h"
22
#include "yb/client/in_flight_op.h"
23
#include "yb/client/meta_cache.h"
24
#include "yb/client/transaction_cleanup.h"
25
#include "yb/client/transaction_manager.h"
26
#include "yb/client/transaction_rpc.h"
27
#include "yb/client/yb_op.h"
28
29
#include "yb/common/common.pb.h"
30
#include "yb/common/transaction.h"
31
#include "yb/common/transaction_error.h"
32
#include "yb/common/ybc_util.h"
33
34
#include "yb/rpc/messenger.h"
35
#include "yb/rpc/rpc.h"
36
#include "yb/rpc/scheduler.h"
37
38
#include "yb/tserver/tserver_service.pb.h"
39
40
#include "yb/util/countdown_latch.h"
41
#include "yb/util/flag_tags.h"
42
#include "yb/util/format.h"
43
#include "yb/util/logging.h"
44
#include "yb/util/random_util.h"
45
#include "yb/util/result.h"
46
#include "yb/util/scope_exit.h"
47
#include "yb/util/status_format.h"
48
#include "yb/util/status_log.h"
49
#include "yb/util/strongly_typed_bool.h"
50
#include "yb/util/trace.h"
51
#include "yb/util/tsan_util.h"
52
#include "yb/util/unique_lock.h"
53
54
using namespace std::literals;
55
using namespace std::placeholders;
56
57
DEFINE_int32(txn_print_trace_every_n, 0,
58
             "Controls the rate at which txn traces are printed. Setting this to 0 "
59
             "disables printing the collected traces.");
60
TAG_FLAG(txn_print_trace_every_n, advanced);
61
TAG_FLAG(txn_print_trace_every_n, runtime);
62
63
DEFINE_int32(txn_slow_op_threshold_ms, 0,
64
             "Controls the rate at which txn traces are printed. Setting this to 0 "
65
             "disables printing the collected traces.");
66
TAG_FLAG(txn_slow_op_threshold_ms, advanced);
67
TAG_FLAG(txn_slow_op_threshold_ms, runtime);
68
69
DEFINE_uint64(transaction_heartbeat_usec, 500000 * yb::kTimeMultiplier,
70
              "Interval of transaction heartbeat in usec.");
71
DEFINE_bool(transaction_disable_heartbeat_in_tests, false, "Disable heartbeat during test.");
72
DECLARE_uint64(max_clock_skew_usec);
73
74
DEFINE_test_flag(int32, transaction_inject_flushed_delay_ms, 0,
75
                 "Inject delay before processing flushed operations by transaction.");
76
77
DEFINE_test_flag(bool, disable_proactive_txn_cleanup_on_abort, false,
78
                "Disable cleanup of intents in abort path.");
79
80
namespace yb {
81
namespace client {
82
83
namespace {
84
85
YB_STRONGLY_TYPED_BOOL(Child);
86
YB_DEFINE_ENUM(TransactionState, (kRunning)(kAborted)(kCommitted)(kReleased)(kSealed));
87
88
} // namespace
89
90
13.8k
// Converts the protobuf form of child transaction data into ChildTransactionData.
// Fails if the embedded transaction metadata cannot be parsed.
Result<ChildTransactionData> ChildTransactionData::FromPB(const ChildTransactionDataPB& data) {
  auto parsed_metadata = TransactionMetadata::FromPB(data.metadata());
  RETURN_NOT_OK(parsed_metadata);

  ChildTransactionData child_data;
  child_data.metadata = std::move(*parsed_metadata);
  child_data.read_time = ReadHybridTime::FromReadTimePB(data);
  // Copy the per-entry local limits, wrapping each raw value in a HybridTime.
  for (const auto& limit : data.local_limits()) {
    child_data.local_limits.emplace(limit.first, HybridTime(limit.second));
  }
  return child_data;
}
101
102
// NOTE(review): not referenced in the visible part of this file; presumably used further down.
YB_DEFINE_ENUM(
    MetadataState,
    (kMissing)
    (kMaybePresent)
    (kPresent));
103
104
61.7k
// Makes `id` the active subtransaction. It also tracks the largest id ever activated, so a
// later rollback can cover savepoints that were created and then released.
void YBSubTransaction::SetActiveSubTransaction(SubTransactionId id) {
  if (highest_subtransaction_id_ < id) {
    highest_subtransaction_id_ = id;
  }
  sub_txn_.subtransaction_id = id;
}
108
109
13.5k
// Marks every subtransaction in [id, highest_subtransaction_id_] as aborted.
// Savepoints may have been created and released after `id`, so writes can carry ids larger
// than the active sub_txn_.subtransaction_id; those must be aborted too.
// Returns InternalError if `id` was never activated.
CHECKED_STATUS YBSubTransaction::RollbackSubTransaction(SubTransactionId id) {
  SCHECK_GE(highest_subtransaction_id_, id, InternalError,
            "Attempted to rollback to non-existent savepoint.");
  return sub_txn_.aborted.SetRange(id, highest_subtransaction_id_);
}
120
121
52.4k
// Returns the current subtransaction metadata (active id and the set of aborted ids).
const SubTransactionMetadata& YBSubTransaction::get() {
  return sub_txn_;
}
122
123
class YBTransaction::Impl final : public internal::TxnBatcherIf {
124
 public:
125
  // Creates a brand new (non-child) transaction with the given locality and a random priority.
  Impl(TransactionManager* manager, YBTransaction* transaction, TransactionLocality locality)
      : trace_(new Trace),
        start_(CoarseMonoClock::Now()),
        manager_(manager),
        transaction_(transaction),
        read_point_(manager->clock()),
        child_(Child::kFalse) {
    metadata_.locality = locality;
    metadata_.priority = RandomUniformInt<uint64_t>();
    CompleteConstruction();
    VLOG_WITH_PREFIX(2) << "Started, metadata: " << metadata_;
  }
137
138
  // Wraps already existing transaction metadata instead of generating a new transaction
  // (presumably metadata obtained from Release() elsewhere — confirm with callers).
  Impl(TransactionManager* manager, YBTransaction* transaction, const TransactionMetadata& metadata)
      : trace_(new Trace),
        start_(CoarseMonoClock::Now()),
        manager_(manager),
        transaction_(transaction),
        metadata_(metadata),
        read_point_(manager->clock()),
        child_(Child::kFalse) {
    CompleteConstruction();
    VLOG_WITH_PREFIX(2) << "Taken, metadata: " << metadata_;
  }
149
150
  // Creates a child transaction from data sent by its parent (presumably the
  // ChildTransactionDataPB produced by PrepareChild() on the parent).
  Impl(TransactionManager* manager, YBTransaction* transaction, ChildTransactionData data)
      : trace_(new Trace),
        start_(CoarseMonoClock::Now()),
        manager_(manager),
        transaction_(transaction),
        read_point_(manager->clock()),
        child_(Child::kTrue),
        child_had_read_time_(data.read_time) {
    // Serializable isolation uses read intents and can always read the most recent version of
    // the DB. Snapshot isolation must reuse the parent's read time, otherwise a value changed
    // after the transaction started could be missed.
    const bool inherit_read_time =
        data.metadata.isolation == IsolationLevel::SNAPSHOT_ISOLATION;
    if (inherit_read_time) {
      read_point_.SetReadTime(std::move(data.read_time), std::move(data.local_limits));
    }
    metadata_ = std::move(data.metadata);
    CompleteConstruction();
    VLOG_WITH_PREFIX(2) << "Started child, metadata: " << metadata_;
    // Child transactions are ready right away, so Prepare() never defers them while waiting
    // for a status tablet.
    ready_ = true;
  }
170
171
419k
  // Aborts the outstanding RPCs tracked by the heartbeat/commit/abort handles. Then logs the
  // transaction trace when the trace demands it, when the transaction ran longer than
  // --txn_slow_op_threshold_ms, or every --txn_print_trace_every_n transactions.
  ~Impl() {
    manager_->rpcs().Abort({&heartbeat_handle_, &commit_handle_, &abort_handle_});
    LOG_IF_WITH_PREFIX(DFATAL, !waiters_.empty()) << "Non empty waiters";

    const auto threshold = GetAtomicFlag(&FLAGS_txn_slow_op_threshold_ms);
    const auto now = CoarseMonoClock::Now();
    const auto elapsed = now - start_;
    const bool ran_too_long = threshold > 0 && ToMilliseconds(elapsed) > threshold;
    if (trace_->must_print() || ran_too_long) {
      LOG(INFO) << ToString() << " took " << ToMicroseconds(elapsed) << "us. Trace: \n"
        << trace_->DumpToString(true);
    } else {
      YB_LOG_IF_EVERY_N(INFO, FLAGS_txn_print_trace_every_n > 0, FLAGS_txn_print_trace_every_n)
        << ToString() << " took " << ToMicroseconds(elapsed) << "us. Trace: \n"
        << trace_->DumpToString(true);
    }
  }
186
187
267k
  void SetPriority(uint64_t priority) {
188
267k
    metadata_.priority = priority;
189
267k
  }
190
191
68.9k
  uint64_t GetPriority() const {
192
68.9k
    return metadata_.priority;
193
68.9k
  }
194
195
2
  // Creates a new, independent transaction that uses the same transaction manager.
  YBTransactionPtr CreateSimilarTransaction() {
    auto similar = std::make_shared<YBTransaction>(manager_);
    return similar;
  }
198
199
392k
  // Sets the isolation level and, when `read_time` is valid, pins the read point to it.
  // Returns IllegalState if a read time has already been chosen for this transaction.
  CHECKED_STATUS Init(IsolationLevel isolation, const ReadHybridTime& read_time) {
    TRACE_TO(trace_, __func__);
    VLOG_WITH_PREFIX(1) << __func__ << "(" << IsolationLevel_Name(isolation) << ", "
                        << read_time << ")";

    const bool read_time_already_set = read_point_.GetReadTime().read.is_valid();
    if (read_time_already_set) {
      return STATUS_FORMAT(IllegalState, "Read point already specified: $0",
                           read_point_.GetReadTime());
    }

    const bool caller_supplied_read_time = read_time.read.is_valid();
    if (caller_supplied_read_time) {
      read_point_.SetReadTime(read_time, ConsistentReadPoint::HybridTimeMap());
    }
    CompleteInit(isolation);
    return Status::OK();
  }
214
215
13.4k
  // Sets the isolation level and takes over the state of `read_point` (which is moved from).
  void InitWithReadPoint(IsolationLevel isolation, ConsistentReadPoint&& read_point) {
    VLOG_WITH_PREFIX(1) << __func__ << "(" << IsolationLevel_Name(isolation) << ", "
                        << read_point.GetReadTime() << ")";
    read_point_.MoveFrom(&read_point);
    CompleteInit(isolation);
  }
222
223
191k
  // Isolation level of this transaction: set by Init()/InitWithReadPoint(), or taken from the
  // metadata passed to the constructor.
  // The return type is a plain value: a top-level const on a by-value return is ignored and
  // only produces -Wignored-qualifiers warnings.
  IsolationLevel isolation() const {
    return metadata_.isolation;
  }
226
227
  // This transaction is a restarted transaction, so we set it up with data from original one.
228
2
  CHECKED_STATUS FillRestartedTransaction(Impl* other) EXCLUDES(mutex_) {
229
2
    
VLOG_WITH_PREFIX0
(1) << "Setup restart to " << other->ToString()0
;
230
2
    auto transaction = transaction_->shared_from_this();
231
2
    TRACE_TO(trace_, __func__);
232
2
    {
233
2
      std::lock_guard<std::mutex> lock(mutex_);
234
2
      auto state = state_.load(std::memory_order_acquire);
235
2
      if (state != TransactionState::kRunning) {
236
0
        return STATUS_FORMAT(
237
0
            IllegalState, "Restart of completed transaction $0: $1",
238
0
            metadata_.transaction_id, state);
239
0
      }
240
2
      if (!read_point_.IsRestartRequired()) {
241
0
        return STATUS_FORMAT(
242
0
            IllegalState, "Restart of transaction that does not require restart: $0",
243
0
            metadata_.transaction_id);
244
0
      }
245
2
      other->read_point_.MoveFrom(&read_point_);
246
2
      other->read_point_.Restart();
247
2
      other->metadata_.isolation = metadata_.isolation;
248
      // TODO(Piyush): Do we need the below? If yes, prove with a test case and add it.
249
      // other->metadata_.priority = metadata_.priority;
250
2
      if (metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION) {
251
0
        other->metadata_.start_time = other->read_point_.GetReadTime().read;
252
2
      } else {
253
2
        other->metadata_.start_time = other->read_point_.Now();
254
2
      }
255
2
      state_.store(TransactionState::kAborted, std::memory_order_release);
256
2
    }
257
0
    DoAbort(TransactionRpcDeadline(), transaction);
258
259
2
    return Status::OK();
260
2
  }
261
262
1.59M
  // Raw pointer to the trace object held in trace_.
  Trace* trace() { return trace_.get(); }
265
266
  // For LOCAL transactions, checks the tablet of each group's first operation. Every replica
  // of that tablet (including failed ones) must be in the local region; otherwise IllegalState
  // is returned. Non-local transactions always pass.
  CHECKED_STATUS CheckTransactionLocality(internal::InFlightOpsGroupsWithMetadata* ops_info)
      REQUIRES(mutex_) {
    if (metadata_.locality != TransactionLocality::LOCAL) {
      return Status::OK();
    }
    for (auto& group : ops_info->groups) {
      auto& leading_op = *group.begin;
      const auto& tablet = leading_op.tablet;
      auto replicas = tablet->GetRemoteTabletServers(internal::IncludeFailedReplicas::kTrue);
      for (const auto& replica : replicas) {
        if (replica->IsLocalRegion()) {
          continue;
        }
        VLOG_WITH_PREFIX(4) << "Aborting (accessing nonlocal tablet in local transaction)";
        return STATUS_FORMAT(
            IllegalState, "Nonlocal tablet accessed in local transaction: tablet $0",
            tablet->tablet_id());
      }
    }
    return Status::OK();
  }
287
288
  bool Prepare(internal::InFlightOpsGroupsWithMetadata* ops_info,
289
               ForceConsistentRead force_consistent_read,
290
               CoarseTimePoint deadline,
291
               Initial initial,
292
1.56M
               Waiter waiter) override EXCLUDES(mutex_) {
293
18.4E
    VLOG_WITH_PREFIX(2) << "Prepare(" << force_consistent_read << ", " << initial << ", "
294
18.4E
                        << AsString(ops_info->groups) << ")";
295
1.56M
    TRACE_TO(trace_, "Preparing $0 ops", AsString(ops_info->groups.size()));
296
1.56M
    VTRACE_TO(2, trace_, "Preparing $0 ops", AsString(ops_info->groups));
297
298
1.56M
    {
299
1.56M
      UNIQUE_LOCK(lock, mutex_);
300
1.56M
      const bool defer = !ready_;
301
302
1.56M
      if (!defer || 
initial406k
) {
303
1.16M
        Status status = CheckTransactionLocality(ops_info);
304
1.16M
        if (!status.ok()) {
305
0
          VLOG_WITH_PREFIX(2) << "Prepare, rejected: " << status;
306
0
          bool abort = false;
307
0
          auto state = state_.load(std::memory_order_acquire);
308
0
          if (state == TransactionState::kRunning) {
309
            // State will be changed to aborted in SetErrorUnlocked
310
0
            abort = true;
311
0
          }
312
0
          SetErrorUnlocked(status, "Check transaction locality");
313
0
          lock.unlock();
314
0
          if (waiter) {
315
0
            waiter(status);
316
0
          }
317
0
          if (abort) {
318
0
            Abort(TransactionRpcDeadline());
319
0
          }
320
0
          return false;
321
0
        }
322
1.59M
        
for (auto& group : ops_info->groups)1.16M
{
323
1.59M
          auto& first_op = *group.begin;
324
1.59M
          const auto should_add_intents = first_op.yb_op->should_add_intents(metadata_.isolation);
325
1.59M
          const auto& tablet = first_op.tablet;
326
1.59M
          const auto& tablet_id = tablet->tablet_id();
327
328
1.59M
          bool has_metadata;
329
1.59M
          if (initial && 
should_add_intents1.59M
) {
330
1.00M
            auto& tablet_state = tablets_[tablet_id];
331
            // TODO(dtxn) Handle skipped writes, i.e. writes that did not write anything (#3220)
332
1.00M
            first_op.batch_idx = tablet_state.num_batches;
333
1.00M
            ++tablet_state.num_batches;
334
1.00M
            has_metadata = tablet_state.has_metadata;
335
1.00M
          } else {
336
587k
            const auto it = tablets_.find(tablet_id);
337
587k
            has_metadata = it != tablets_.end() && 
it->second.has_metadata377k
;
338
587k
          }
339
1.59M
          group.need_metadata = !has_metadata;
340
1.59M
        }
341
1.16M
      }
342
343
1.56M
      if (defer) {
344
406k
        if (waiter) {
345
406k
          waiters_.push_back(std::move(waiter));
346
406k
        }
347
406k
        lock.unlock();
348
406k
        
VLOG_WITH_PREFIX29
(2) << "Prepare, rejected (not ready, requesting status tablet)"29
;
349
406k
        RequestStatusTablet(deadline);
350
406k
        return false;
351
406k
      }
352
353
      // For serializable isolation we never choose read time, since it always reads latest
354
      // snapshot.
355
      // For snapshot isolation, if read time was not yet picked, we have to choose it now, if there
356
      // multiple tablets that will process first request.
357
1.16M
      SetReadTimeIfNeeded(ops_info->groups.size() > 1 || 
force_consistent_read862k
);
358
1.16M
    }
359
360
0
    ops_info->metadata = {
361
1.16M
      .transaction = metadata_,
362
1.16M
      .subtransaction = subtransaction_.active()
363
1.16M
          ? 
boost::make_optional(subtransaction_.get())50.8k
364
1.16M
          : 
boost::none1.11M
,
365
1.16M
    };
366
367
1.16M
    return true;
368
1.56M
  }
369
370
1.16M
  // Adds `count` to the number of in-flight requests; Flushed() subtracts them again.
  void ExpectOperations(size_t count) EXCLUDES(mutex_) override {
    std::lock_guard<std::mutex> guard(mutex_);
    running_requests_ += count;
  }
374
375
  void Flushed(
376
      const internal::InFlightOps& ops, const ReadHybridTime& used_read_time,
377
1.59M
      const Status& status) EXCLUDES(mutex_) override {
378
1.59M
    TRACE_TO(trace_, "Flushed $0 ops. with Status $1", ops.size(), status.ToString());
379
18.4E
    VLOG_WITH_PREFIX(5)
380
18.4E
        << "Flushed: " << yb::ToString(ops) << ", used_read_time: " << used_read_time
381
18.4E
        << ", status: " << status;
382
1.59M
    if (FLAGS_TEST_transaction_inject_flushed_delay_ms > 0) {
383
40
      std::this_thread::sleep_for(FLAGS_TEST_transaction_inject_flushed_delay_ms * 1ms);
384
40
    }
385
386
1.59M
    boost::optional<Status> notify_commit_status;
387
1.59M
    bool abort = false;
388
389
1.59M
    CommitCallback commit_callback;
390
1.59M
    {
391
1.59M
      std::lock_guard<std::mutex> lock(mutex_);
392
1.59M
      running_requests_ -= ops.size();
393
394
1.59M
      if (status.ok()) {
395
1.41M
        if (used_read_time && 
metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION392k
) {
396
54.0k
          const bool read_point_already_set = static_cast<bool>(read_point_.GetReadTime());
397
54.0k
#ifndef NDEBUG
398
54.0k
          if (read_point_already_set) {
399
            // Display details of operations before crashing in debug mode.
400
0
            int op_idx = 1;
401
0
            for (const auto& op : ops) {
402
0
              LOG(ERROR) << "Operation " << op_idx << ": " << op.ToString();
403
0
              op_idx++;
404
0
            }
405
0
          }
406
54.0k
#endif
407
54.0k
          
LOG_IF_WITH_PREFIX0
(DFATAL, read_point_already_set)
408
0
              << "Read time already picked (" << read_point_.GetReadTime()
409
0
              << ", but server replied with used read time: " << used_read_time;
410
54.0k
          read_point_.SetReadTime(used_read_time, ConsistentReadPoint::HybridTimeMap());
411
54.0k
        }
412
1.41M
        const std::string* prev_tablet_id = nullptr;
413
9.17M
        for (const auto& op : ops) {
414
9.17M
          if (op.yb_op->applied() && 
op.yb_op->should_add_intents(metadata_.isolation)9.16M
) {
415
8.57M
            const std::string& tablet_id = op.tablet->tablet_id();
416
8.57M
            if (prev_tablet_id == nullptr || 
tablet_id != *prev_tablet_id7.76M
) {
417
810k
              prev_tablet_id = &tablet_id;
418
810k
              tablets_[tablet_id].has_metadata = true;
419
810k
            }
420
8.57M
          }
421
9.17M
        }
422
1.41M
      } else {
423
182k
        const TransactionError txn_err(status);
424
        // We don't abort the txn in case of a kSkipLocking error to make further progress.
425
        // READ COMMITTED isolation retries errors of kConflict and kReadRestart by restarting
426
        // statements instead of the whole txn and hence should avoid aborting the txn in this case
427
        // too.
428
182k
        bool avoid_abort =
429
182k
            (txn_err.value() == TransactionErrorCode::kSkipLocking) ||
430
182k
            (metadata_.isolation == IsolationLevel::READ_COMMITTED &&
431
182k
              
(12.0k
txn_err.value() == TransactionErrorCode::kReadRestartRequired12.0k
||
432
12.0k
                txn_err.value() == TransactionErrorCode::kConflict));
433
182k
        if (!avoid_abort) {
434
170k
          auto state = state_.load(std::memory_order_acquire);
435
18.4E
          VLOG_WITH_PREFIX(4) << "Abort desired, state: " << AsString(state);
436
170k
          if (state == TransactionState::kRunning) {
437
122k
            abort = true;
438
            // State will be changed to aborted in SetError
439
122k
          }
440
441
170k
          SetErrorUnlocked(status, "Flush");
442
170k
        }
443
182k
      }
444
445
1.59M
      if (running_requests_ == 0 && 
commit_replicated_1.15M
) {
446
0
        notify_commit_status = status_;
447
0
        commit_callback = std::move(commit_callback_);
448
0
      }
449
1.59M
    }
450
451
1.59M
    if (notify_commit_status) {
452
0
      VLOG_WITH_PREFIX(4) << "Sealing done: " << *notify_commit_status;
453
0
      commit_callback(*notify_commit_status);
454
0
    }
455
456
1.59M
    if (abort && 
!child_122k
) {
457
122k
      DoAbort(TransactionRpcDeadline(), transaction_->shared_from_this());
458
122k
    }
459
1.59M
  }
460
461
  // Commits the transaction, or only seals it when `seal_only` is set. `callback` is invoked
  // directly when the commit precondition fails or when the transaction is empty and not yet
  // ready. Otherwise it is stored in commit_callback_ and the work continues in DoCommit(),
  // which is deferred until a status tablet is available if necessary.
  void Commit(CoarseTimePoint deadline, SealOnly seal_only, CommitCallback callback)
      EXCLUDES(mutex_) {
    auto transaction = transaction_->shared_from_this();
    TRACE_TO(trace_, __func__);
    {
      UNIQUE_LOCK(lock, mutex_);
      auto precondition = CheckCouldCommitUnlocked(seal_only);
      if (!precondition.ok()) {
        lock.unlock();
        callback(precondition);
        return;
      }
      const auto new_state = seal_only ? TransactionState::kSealed
                                       : TransactionState::kCommitted;
      state_.store(new_state, std::memory_order_release);
      commit_callback_ = std::move(callback);

      if (!ready_) {
        // With no intents written and no status tablet picked yet, just report the transaction
        // as committed. See https://github.com/yugabyte/yugabyte-db/issues/3105 for details --
        // this special case might be removable if the real bug turns out to be elsewhere.
        if (tablets_.empty() && running_requests_ == 0) {
          VLOG_WITH_PREFIX(4) << "Committed empty transaction";
          auto empty_commit_callback = std::move(commit_callback_);
          lock.unlock();
          empty_commit_callback(Status::OK());
          return;
        }

        // Finish the commit once the status tablet request completes.
        waiters_.emplace_back(std::bind(
            &Impl::DoCommit, this, deadline, seal_only, _1, transaction));
        lock.unlock();
        RequestStatusTablet(deadline);
        return;
      }
    }
    DoCommit(deadline, seal_only, Status::OK(), transaction);
  }
499
500
137k
  // Aborts a running, non-child transaction.
  //  - An already aborted transaction is left alone.
  //  - Aborting a transaction in any other non-running state, or a child transaction, is
  //    reported as DFATAL.
  //  - If the transaction is not ready yet, pending waiters are failed with Aborted and
  //    DoAbort() is skipped.
  void Abort(CoarseTimePoint deadline) EXCLUDES(mutex_) {
    auto transaction = transaction_->shared_from_this();

    VLOG_WITH_PREFIX(2) << "Abort";
    TRACE_TO(trace_, __func__);
    {
      UNIQUE_LOCK(lock, mutex_);
      const auto state = state_.load(std::memory_order_acquire);
      if (state == TransactionState::kAborted) {
        VLOG_WITH_PREFIX(2) << "Already aborted";
        return;
      }
      if (state != TransactionState::kRunning) {
        LOG_WITH_PREFIX(DFATAL)
            << "Abort of committed transaction: " << AsString(state);
        return;
      }
      if (child_) {
        LOG_WITH_PREFIX(DFATAL) << "Abort of child transaction";
        return;
      }
      state_.store(TransactionState::kAborted, std::memory_order_release);

      if (!ready_) {
        std::vector<Waiter> pending_waiters;
        pending_waiters.swap(waiters_);
        lock.unlock();
        const auto aborted_status = STATUS(Aborted, "Transaction aborted");
        for (const auto& pending : pending_waiters) {
          pending(aborted_status);
        }
        VLOG_WITH_PREFIX(2) << "Aborted transaction not yet ready";
        return;
      }
    }
    DoAbort(deadline, transaction);
  }
536
537
379k
  bool IsRestartRequired() const {
538
379k
    return read_point_.IsRestartRequired();
539
379k
  }
540
541
  // Returns a future for the transaction metadata. The future is created on the first call
  // and shared by later calls. If the transaction is ready it is fulfilled immediately.
  // Otherwise a waiter fulfils it (with the metadata or the failure status) and a status
  // tablet is requested.
  std::shared_future<Result<TransactionMetadata>> GetMetadata(
      CoarseTimePoint deadline) EXCLUDES(mutex_) {
    UNIQUE_LOCK(lock, mutex_);
    if (metadata_future_.valid()) {
      return metadata_future_;
    }
    metadata_future_ = std::shared_future<Result<TransactionMetadata>>(
        metadata_promise_.get_future());

    if (ready_) {
      metadata_promise_.set_value(metadata_);
      return metadata_future_;
    }

    auto transaction = transaction_->shared_from_this();
    waiters_.push_back([this, transaction](const Status& status) {
      WARN_NOT_OK(status, "Transaction request failed");
      if (!status.ok()) {
        metadata_promise_.set_value(status);
        return;
      }
      metadata_promise_.set_value(metadata_);
    });
    lock.unlock();
    RequestStatusTablet(deadline);
    lock.lock();
    return metadata_future_;
  }
568
569
  void PrepareChild(
570
      ForceConsistentRead force_consistent_read, CoarseTimePoint deadline,
571
66.8k
      PrepareChildCallback callback) {
572
66.8k
    auto transaction = transaction_->shared_from_this();
573
66.8k
    TRACE_TO(trace_, __func__);
574
66.8k
    UNIQUE_LOCK(lock, mutex_);
575
66.8k
    auto status = CheckRunningUnlocked();
576
66.8k
    if (!status.ok()) {
577
0
      lock.unlock();
578
0
      callback(status);
579
0
      return;
580
0
    }
581
66.8k
    if (IsRestartRequired()) {
582
0
      lock.unlock();
583
0
      callback(STATUS(IllegalState, "Restart required"));
584
0
      return;
585
0
    }
586
587
66.8k
    SetReadTimeIfNeeded(force_consistent_read);
588
589
66.8k
    if (!ready_) {
590
3.23k
      waiters_.emplace_back(std::bind(
591
3.23k
          &Impl::DoPrepareChild, this, _1, transaction, std::move(callback)));
592
3.23k
      lock.unlock();
593
3.23k
      RequestStatusTablet(deadline);
594
3.23k
      return;
595
3.23k
    }
596
597
63.6k
    ChildTransactionDataPB child_txn_data_pb = PrepareChildTransactionDataUnlocked(transaction);
598
63.6k
    lock.unlock();
599
63.6k
    callback(child_txn_data_pb);
600
63.6k
  }
601
602
13.7k
  // Completes a child transaction by marking it committed. It returns, for the parent, every
  // tablet it touched (batch count and whether metadata exists there) plus read point info.
  // Fails if the transaction is not running or is not a child.
  Result<ChildTransactionResultPB> FinishChild() {
    TRACE_TO(trace_, __func__);
    UNIQUE_LOCK(lock, mutex_);
    RETURN_NOT_OK(CheckRunningUnlocked());
    if (!child_) {
      return STATUS(IllegalState, "Finish child of non child transaction");
    }
    state_.store(TransactionState::kCommitted, std::memory_order_release);

    ChildTransactionResultPB result;
    auto& involved_tablets = *result.mutable_tablets();
    involved_tablets.Reserve(narrow_cast<int>(tablets_.size()));
    for (const auto& [tablet_id, tablet_state] : tablets_) {
      auto& entry = *involved_tablets.Add();
      entry.set_tablet_id(tablet_id);
      entry.set_num_batches(tablet_state.num_batches);
      entry.set_metadata_state(tablet_state.has_metadata
                                   ? InvolvedTabletMetadataState::EXIST
                                   : InvolvedTabletMetadataState::MISSING);
    }
    read_point_.FinishChildTransactionResult(HadReadTime(child_had_read_time_), &result);
    return result;
  }
624
625
13.4k
  // Merges the result of a finished child transaction into this (parent) transaction. It adds
  // the child's batch counts and metadata flags per tablet and updates the read point.
  // If this transaction was already aborted, the child's tablets are cleaned up immediately
  // after mutex_ is released, and the running-state error is returned.
  Status ApplyChildResult(const ChildTransactionResultPB& result) EXCLUDES(mutex_) {
    TRACE_TO(trace_, __func__);
    std::vector<std::string> cleanup_tablet_ids;
    // Declared before the lock, so the cleanup runs after the lock has been released.
    auto se = ScopeExit([this, &cleanup_tablet_ids] {
      if (!cleanup_tablet_ids.empty()) {
        CleanupTransaction(
            manager_->client(), manager_->clock(), metadata_.transaction_id, Sealed::kFalse,
            CleanupType::kImmediate, cleanup_tablet_ids);
      }
    });
    UNIQUE_LOCK(lock, mutex_);
    if (state_.load(std::memory_order_acquire) == TransactionState::kAborted) {
      cleanup_tablet_ids.reserve(result.tablets().size());
      for (const auto& child_tablet : result.tablets()) {
        cleanup_tablet_ids.push_back(child_tablet.tablet_id());
      }
    }

    RETURN_NOT_OK(CheckRunningUnlocked());
    if (child_) {
      return STATUS(IllegalState, "Apply child result of child transaction");
    }

    for (const auto& child_tablet : result.tablets()) {
      auto& tablet_state = tablets_[child_tablet.tablet_id()];
      tablet_state.num_batches += child_tablet.num_batches();
      if (child_tablet.metadata_state() == InvolvedTabletMetadataState::EXIST) {
        tablet_state.has_metadata = true;
      }
    }
    read_point_.ApplyChildTransactionResult(result);

    return Status::OK();
  }
660
661
1.18k
  const std::string& LogPrefix() {
662
1.18k
    return log_prefix_;
663
1.18k
  }
664
665
0
  // Human-readable description of the metadata and current state; takes mutex_.
  std::string ToString() EXCLUDES(mutex_) {
    std::lock_guard<std::mutex> guard(mutex_);
    const auto current_state = state_.load(std::memory_order_acquire);
    return Format("{ metadata: $0 state: $1 }", metadata_, current_state);
  }
669
670
13
  // Transaction id taken from the metadata.
  const TransactionId& id() const { return metadata_.transaction_id; }
673
674
1.88M
  // Mutable access to this transaction's read point.
  ConsistentReadPoint& read_point() { return read_point_; }
677
678
0
  // Moves a running transaction to kReleased and returns its metadata (presumably so another
  // YBTransaction can adopt it through the metadata constructor — confirm with callers).
  // If no status tablet was picked yet, this blocks until the pick finishes and returns its
  // error on failure.
  Result<TransactionMetadata> Release() {
    UNIQUE_LOCK(lock, mutex_);
    const auto state = state_.load(std::memory_order_acquire);
    if (state != TransactionState::kRunning) {
      return STATUS_FORMAT(IllegalState, "Attempt to release transaction in the wrong state $0: $1",
                           metadata_.transaction_id, AsString(state));
    }
    state_.store(TransactionState::kReleased, std::memory_order_release);

    if (!ready_) {
      CountDownLatch tablet_picked(1);
      Status pick_status;
      // NOTE(review): not referenced below; presumably keeps the YBTransaction alive while this
      // call blocks — confirm before removing.
      auto transaction = transaction_->shared_from_this();
      waiters_.push_back([&tablet_picked, &pick_status](const Status& status) {
        pick_status = status;
        tablet_picked.CountDown();
      });
      lock.unlock();
      RequestStatusTablet(TransactionRpcDeadline());
      tablet_picked.Wait();
      RETURN_NOT_OK(pick_status);
      lock.lock();
    }
    return metadata_;
  }
703
704
0
  void StartHeartbeat() {
705
0
    VLOG_WITH_PREFIX(2) << __PRETTY_FUNCTION__;
706
0
    RequestStatusTablet(TransactionRpcDeadline());
707
0
  }
708
709
61.7k
  // Forwards to the subtransaction tracker.
  void SetActiveSubTransaction(SubTransactionId id) {
    subtransaction_.SetActiveSubTransaction(id);
  }
712
713
13.5k
  // Rolls back to savepoint `id`. Returns InternalError if no savepoint was ever created.
  CHECKED_STATUS RollbackSubTransaction(SubTransactionId id) {
    SCHECK(subtransaction_.active(), InternalError,
           "Attempted to rollback to savepoint before creating any savepoints.");
    return subtransaction_.RollbackSubTransaction(id);
  }
719
720
0
  bool HasSubTransactionState() {
721
0
    return subtransaction_.active();
722
0
  }
723
724
 private:
725
425k
  void CompleteConstruction() {
726
425k
    log_prefix_ = Format("$0$1: ", metadata_.transaction_id, child_ ? 
" (CHILD)"13.8k
:
""411k
);
727
425k
    heartbeat_handle_ = manager_->rpcs().InvalidHandle();
728
425k
    commit_handle_ = manager_->rpcs().InvalidHandle();
729
425k
    abort_handle_ = manager_->rpcs().InvalidHandle();
730
425k
  }
731
732
406k
  // Records the isolation level and sets the start time. The start time is the picked read
  // time if there is one, otherwise the read point's current time.
  void CompleteInit(IsolationLevel isolation) {
    metadata_.isolation = isolation;
    // TODO(Piyush): read_point_ might not represent the correct start time for
    // a READ COMMITTED txn since it might have been updated several times
    // before a YBTransaction is created. Fix this.
    if (!read_point_.GetReadTime()) {
      metadata_.start_time = read_point_.Now();
    } else {
      metadata_.start_time = read_point_.GetReadTime().read;
    }
  }
743
744
1.22M
  void SetReadTimeIfNeeded(bool do_it) {
745
1.22M
    if (!read_point_.GetReadTime() && 
do_it423k
&&
746
1.22M
        
(369k
metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION369k
||
747
369k
         
metadata_.isolation == IsolationLevel::READ_COMMITTED204k
)) {
748
171k
      read_point_.SetCurrentReadTime();
749
171k
    }
750
1.22M
  }
751
752
362k
  // OK while state_ is kRunning. Otherwise returns the recorded error status_, or an
  // IllegalState "already completed" status when no error was recorded.
  CHECKED_STATUS CheckRunningUnlocked() REQUIRES(mutex_) {
    if (state_.load(std::memory_order_acquire) == TransactionState::kRunning) {
      return Status::OK();
    }
    if (!status_.ok()) {
      return status_;
    }
    return STATUS(IllegalState, "Transaction already completed");
  }
762
763
  void DoCommit(
764
      CoarseTimePoint deadline, SealOnly seal_only, const Status& status,
765
268k
      const YBTransactionPtr& transaction) EXCLUDES(mutex_) {
766
268k
    
VLOG_WITH_PREFIX25
(1)
767
25
        << Format("Commit, seal_only: $0, tablets: $1, status: $2",
768
25
                  seal_only, tablets_, status);
769
770
268k
    UNIQUE_LOCK(lock, mutex_);
771
268k
    if (!status.ok()) {
772
0
      VLOG_WITH_PREFIX(4) << "Commit failed: " << status;
773
0
      auto commit_callback = std::move(commit_callback_);
774
0
      lock.unlock();
775
0
      commit_callback(status);
776
0
      return;
777
0
    }
778
779
268k
    tserver::UpdateTransactionRequestPB req;
780
268k
    req.set_tablet_id(status_tablet_->tablet_id());
781
268k
    req.set_propagated_hybrid_time(manager_->Now().ToUint64());
782
268k
    auto& state = *req.mutable_state();
783
268k
    state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
784
    // TODO(savepoints) -- Attach metadata about aborted subtransactions to commit message.
785
268k
    state.set_status(seal_only ? 
TransactionStatus::SEALED0
: TransactionStatus::COMMITTED);
786
268k
    state.mutable_tablets()->Reserve(narrow_cast<int>(tablets_.size()));
787
519k
    for (const auto& tablet : tablets_) {
788
      // If tablet does not have metadata it should not participate in commit.
789
519k
      if (
!seal_only519k
&& !tablet.second.has_metadata) {
790
45
        continue;
791
45
      }
792
519k
      state.add_tablets(tablet.first);
793
519k
      if (seal_only) {
794
0
        state.add_tablet_batches(tablet.second.num_batches);
795
0
      }
796
519k
    }
797
798
    // If we don't have any tablets that have intents written to them, just abort it.
799
    // But notify caller that commit was successful, so it is transparent for him.
800
268k
    if (state.tablets().empty()) {
801
5.87k
      
VLOG_WITH_PREFIX0
(4) << "Committed empty"0
;
802
5.87k
      auto status_tablet = status_tablet_;
803
5.87k
      auto commit_callback = std::move(commit_callback_);
804
5.87k
      lock.unlock();
805
5.87k
      DoAbort(deadline, transaction, status_tablet);
806
5.87k
      commit_callback(Status::OK());
807
5.87k
      return;
808
5.87k
    }
809
810
262k
    if (subtransaction_.active()) {
811
1.54k
      subtransaction_.get().aborted.ToPB(state.mutable_aborted()->mutable_set());
812
1.54k
    }
813
814
262k
    manager_->rpcs().RegisterAndStart(
815
262k
        UpdateTransaction(
816
262k
            deadline,
817
262k
            status_tablet_.get(),
818
262k
            manager_->client(),
819
262k
            &req,
820
262k
            [this, transaction](const auto& status, const auto& req, const auto& resp) {
821
262k
              this->CommitDone(status, resp, transaction);
822
262k
            }),
823
262k
        &commit_handle_);
824
262k
  }
825
826
137k
  // Aborts the transaction, using a copy of status_tablet_ taken under mutex_ so the
  // three-argument overload can run without holding the lock.
  void DoAbort(CoarseTimePoint deadline, const YBTransactionPtr& transaction) EXCLUDES(mutex_) {
    internal::RemoteTabletPtr tablet_snapshot;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      tablet_snapshot = status_tablet_;
    }
    DoAbort(deadline, transaction, std::move(tablet_snapshot));
  }
834
835
  void DoAbort(
836
      CoarseTimePoint deadline,
837
      const YBTransactionPtr& transaction,
838
143k
      internal::RemoteTabletPtr status_tablet) EXCLUDES(mutex_) {
839
143k
    tserver::AbortTransactionRequestPB req;
840
143k
    req.set_tablet_id(status_tablet->tablet_id());
841
143k
    req.set_propagated_hybrid_time(manager_->Now().ToUint64());
842
143k
    req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
843
844
143k
    manager_->rpcs().RegisterAndStart(
845
143k
        AbortTransaction(
846
143k
            deadline,
847
143k
            status_tablet.get(),
848
143k
            manager_->client(),
849
143k
            &req,
850
143k
            std::bind(&Impl::AbortDone, this, _1, _2, transaction)),
851
143k
        &abort_handle_);
852
853
143k
    DoAbortCleanup(transaction, CleanupType::kImmediate);
854
143k
  }
855
856
  void DoAbortCleanup(const YBTransactionPtr& transaction, CleanupType cleanup_type)
857
208k
      EXCLUDES(mutex_) {
858
208k
    if (FLAGS_TEST_disable_proactive_txn_cleanup_on_abort) {
859
0
      VLOG_WITH_PREFIX(1) << "TEST: Disabled proactive transaction cleanup on abort";
860
0
      return;
861
0
    }
862
863
208k
    std::vector<std::string> tablet_ids;
864
208k
    {
865
208k
      std::lock_guard<std::mutex> lock(mutex_);
866
208k
      tablet_ids.reserve(tablets_.size());
867
309k
      for (const auto& tablet : tablets_) {
868
        // We don't check has_metadata here, because intents could be written even in case of
869
        // failure. For instance in case of conflict on unique index.
870
309k
        tablet_ids.push_back(tablet.first);
871
309k
      }
872
208k
      
VLOG_WITH_PREFIX122
(1) << "Cleaning up intents from: " << AsString(tablet_ids)122
;
873
208k
    }
874
875
208k
    CleanupTransaction(
876
208k
        manager_->client(), manager_->clock(), metadata_.transaction_id, Sealed::kFalse,
877
208k
        cleanup_type, tablet_ids);
878
208k
  }
879
880
  void CommitDone(const Status& status,
881
                  const tserver::UpdateTransactionResponsePB& response,
882
262k
                  const YBTransactionPtr& transaction) {
883
262k
    TRACE_TO(trace_, __func__);
884
18.4E
    VLOG_WITH_PREFIX(1) << "Committed: " << status;
885
886
262k
    UpdateClock(response, manager_);
887
262k
    manager_->rpcs().Unregister(&commit_handle_);
888
889
262k
    Status actual_status = status.IsAlreadyPresent() ? 
Status::OK()0
: status;
890
262k
    CommitCallback commit_callback;
891
262k
    if (state_.load(std::memory_order_acquire) != TransactionState::kCommitted &&
892
262k
        
actual_status.ok()3
) {
893
0
      std::lock_guard<std::mutex> lock(mutex_);
894
0
      commit_replicated_ = true;
895
0
      if (running_requests_ != 0) {
896
0
        return;
897
0
      }
898
0
      commit_callback = std::move(commit_callback_);
899
262k
    } else {
900
262k
      std::lock_guard<std::mutex> lock(mutex_);
901
262k
      commit_callback = std::move(commit_callback_);
902
262k
    }
903
18.4E
    VLOG_WITH_PREFIX(4) << "Commit done: " << actual_status;
904
262k
    commit_callback(actual_status);
905
906
262k
    if (actual_status.IsExpired()) {
907
      // We can't perform immediate cleanup here because the transaction could be committed,
908
      // its APPLY records replicated in all participant tablets, and its status record removed
909
      // from the status tablet.
910
64.6k
      DoAbortCleanup(transaction, CleanupType::kGraceful);
911
64.6k
    }
912
262k
  }
913
914
  void AbortDone(const Status& status,
915
                 const tserver::AbortTransactionResponsePB& response,
916
141k
                 const YBTransactionPtr& transaction) {
917
141k
    TRACE_TO(trace_, __func__);
918
141k
    
VLOG_WITH_PREFIX22
(1) << "Aborted: " << status22
;
919
920
141k
    if (response.has_propagated_hybrid_time()) {
921
141k
      manager_->UpdateClock(HybridTime(response.propagated_hybrid_time()));
922
141k
    }
923
141k
    manager_->rpcs().Unregister(&abort_handle_);
924
141k
  }
925
926
411k
  // Resolves the status tablet, at most once per transaction.
  // - If metadata_ already names a status tablet, it is looked up directly.
  // - Otherwise the manager picks one (honoring metadata_.locality) and
  //   StatusTabletPicked() continues the chain.
  void RequestStatusTablet(const CoarseTimePoint& deadline) EXCLUDES(mutex_) {
    TRACE_TO(trace_, __func__);
    // Only the first caller flips the flag from false to true and proceeds.
    if (requested_status_tablet_.exchange(true, std::memory_order_acq_rel)) {
      return;
    }
    VLOG_WITH_PREFIX(2) << "RequestStatusTablet()";
    auto transaction = transaction_->shared_from_this();
    if (!metadata_.status_tablet.empty()) {
      LookupStatusTablet(metadata_.status_tablet, deadline, transaction);
      return;
    }
    manager_->PickStatusTablet(
        std::bind(&Impl::StatusTabletPicked, this, _1, deadline, transaction),
        metadata_.locality);
  }
943
944
  void StatusTabletPicked(const Result<std::string>& tablet,
945
                          const CoarseTimePoint& deadline,
946
410k
                          const YBTransactionPtr& transaction) {
947
410k
    TRACE_TO(trace_, __func__);
948
18.4E
    VLOG_WITH_PREFIX(2) << "Picked status tablet: " << tablet;
949
950
410k
    if (!tablet.ok()) {
951
0
      NotifyWaiters(tablet.status(), "Pick status tablet");
952
0
      return;
953
0
    }
954
955
410k
    LookupStatusTablet(*tablet, deadline, transaction);
956
410k
  }
957
958
  void LookupStatusTablet(const std::string& tablet_id,
959
                          const CoarseTimePoint& deadline,
960
410k
                          const YBTransactionPtr& transaction) {
961
410k
    TRACE_TO(trace_, __func__);
962
410k
    manager_->client()->LookupTabletById(
963
410k
        tablet_id,
964
410k
        /* table =*/ nullptr,
965
410k
        master::IncludeInactive::kFalse,
966
410k
        deadline,
967
410k
        std::bind(&Impl::LookupTabletDone, this, _1, transaction),
968
410k
        client::UseCache::kTrue);
969
410k
  }
970
971
  void LookupTabletDone(const Result<client::internal::RemoteTabletPtr>& result,
972
410k
                        const YBTransactionPtr& transaction) {
973
410k
    TRACE_TO(trace_, __func__);
974
18.4E
    VLOG_WITH_PREFIX(1) << "Lookup tablet done: " << yb::ToString(result);
975
976
410k
    if (!result.ok()) {
977
3
      NotifyWaiters(result.status(), "Lookup tablet");
978
3
      return;
979
3
    }
980
981
410k
    bool precreated;
982
410k
    std::vector<Waiter> waiters;
983
410k
    {
984
410k
      std::lock_guard<std::mutex> lock(mutex_);
985
410k
      status_tablet_ = std::move(*result);
986
410k
      if (
metadata_.status_tablet.empty()410k
) {
987
410k
        metadata_.status_tablet = status_tablet_->tablet_id();
988
410k
        precreated = false;
989
18.4E
      } else {
990
18.4E
        precreated = true;
991
18.4E
        ready_ = true;
992
18.4E
        waiters_.swap(waiters);
993
18.4E
      }
994
410k
    }
995
410k
    if (precreated) {
996
0
      for (const auto& waiter : waiters) {
997
0
        waiter(Status::OK());
998
0
      }
999
0
    }
1000
410k
    SendHeartbeat(precreated ? 
TransactionStatus::PENDING0
: TransactionStatus::CREATED,
1001
410k
                  metadata_.transaction_id, transaction_->shared_from_this());
1002
410k
  }
1003
1004
409k
  void NotifyWaiters(const Status& status, const char* operation) {
1005
409k
    std::vector<Waiter> waiters;
1006
409k
    {
1007
409k
      std::lock_guard<std::mutex> lock(mutex_);
1008
409k
      if (status.ok()) {
1009
409k
        DCHECK(!ready_);
1010
409k
        ready_ = true;
1011
409k
      } else {
1012
19
        SetErrorUnlocked(status, operation);
1013
19
      }
1014
409k
      waiters_.swap(waiters);
1015
409k
    }
1016
409k
    for (const auto& waiter : waiters) {
1017
409k
      waiter(status);
1018
409k
    }
1019
409k
  }
1020
1021
  void SendHeartbeat(TransactionStatus status,
1022
                     const TransactionId& id,
1023
867k
                     const std::weak_ptr<YBTransaction>& weak_transaction) {
1024
867k
    auto transaction = weak_transaction.lock();
1025
867k
    if (!transaction) {
1026
      // Cannot use LOG_WITH_PREFIX here, since this was actually destroyed.
1027
387k
      VLOG
(1) << id << ": Transaction destroyed"179
;
1028
387k
      return;
1029
387k
    }
1030
1031
479k
    auto current_state = state_.load(std::memory_order_acquire);
1032
1033
479k
    if (!AllowHeartbeat(current_state, status)) {
1034
15.3k
      
VLOG_WITH_PREFIX4
(1) << " Send heartbeat cancelled: " << yb::ToString(transaction)4
;
1035
15.3k
      return;
1036
15.3k
    }
1037
1038
18.4E
    VLOG_WITH_PREFIX(4) << __func__ << "(" << TransactionStatus_Name(status) << ")";
1039
1040
464k
    MonoDelta timeout;
1041
464k
    if (status != TransactionStatus::CREATED) {
1042
54.3k
      if (GetAtomicFlag(&FLAGS_transaction_disable_heartbeat_in_tests)) {
1043
0
        HeartbeatDone(Status::OK(), /* request= */ {}, /* response= */ {}, status, transaction);
1044
0
        return;
1045
0
      }
1046
54.3k
      timeout = std::chrono::microseconds(FLAGS_transaction_heartbeat_usec);
1047
410k
    } else {
1048
410k
      timeout = TransactionRpcTimeout();
1049
410k
    }
1050
1051
464k
    tserver::UpdateTransactionRequestPB req;
1052
1053
464k
    internal::RemoteTabletPtr status_tablet;
1054
464k
    {
1055
464k
      std::lock_guard<std::mutex> lock(mutex_);
1056
464k
      status_tablet = status_tablet_;
1057
464k
    }
1058
1059
464k
    req.set_tablet_id(status_tablet->tablet_id());
1060
464k
    req.set_propagated_hybrid_time(manager_->Now().ToUint64());
1061
464k
    auto& state = *req.mutable_state();
1062
    // TODO(savepoints) -- Attach metadata about aborted subtransactions in heartbeat.
1063
464k
    state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
1064
464k
    state.set_status(status);
1065
464k
    manager_->rpcs().RegisterAndStart(
1066
464k
        UpdateTransaction(
1067
464k
            CoarseMonoClock::now() + timeout,
1068
464k
            status_tablet.get(),
1069
464k
            manager_->client(),
1070
464k
            &req,
1071
464k
            std::bind(&Impl::HeartbeatDone, this, _1, _2, _3, status, transaction)),
1072
464k
        &heartbeat_handle_);
1073
464k
  }
1074
1075
480k
  // Whether a heartbeat carrying `status` may be sent in `current_state`:
  // - kRunning: always.
  // - kReleased / kSealed: only the initial CREATED request.
  // - kAborted / kCommitted: never.
  static bool AllowHeartbeat(TransactionState current_state, TransactionStatus status) {
    switch (current_state) {
      case TransactionState::kAborted:
      case TransactionState::kCommitted:
        return false;
      case TransactionState::kReleased:
      case TransactionState::kSealed:
        return status == TransactionStatus::CREATED;
      case TransactionState::kRunning:
        return true;
    }
    FATAL_INVALID_ENUM_VALUE(TransactionState, current_state);
  }
1088
1089
  // Completion handler for a heartbeat UpdateTransaction RPC sent by SendHeartbeat(). It is also
  // invoked directly by SendHeartbeat()'s test hook with an OK status and empty protos.
  //
  // On success:
  // - For the initial CREATED heartbeat, the transaction id is decoded from the request and
  //   stored in metadata_ (the log prefix is patched in place), then waiters are released with OK.
  // - The next PENDING heartbeat is scheduled after FLAGS_transaction_heartbeat_usec.
  //
  // On failure:
  // - Aborted/Expired stop the heartbeat chain and surface the error.
  // - Any other error resends the heartbeat right away.
  void HeartbeatDone(Status status,
                     const tserver::UpdateTransactionRequestPB& request,
                     const tserver::UpdateTransactionResponsePB& response,
                     TransactionStatus transaction_status,
                     const YBTransactionPtr& transaction) {
    UpdateClock(response, manager_);
    manager_->rpcs().Unregister(&heartbeat_handle_);

    if (status.ok() && transaction_status == TransactionStatus::CREATED) {
      // A malformed id in the request turns this otherwise-successful reply into a failure.
      auto decode_result = FullyDecodeTransactionId(request.state().transaction_id());
      if (decode_result.ok()) {
        metadata_.transaction_id = *decode_result;
        auto id_str = AsString(metadata_.transaction_id);
        // It is not fully thread safe, since we don't use mutex to access log_prefix_.
        // But here we just replace characters inplace.
        // It would not crash anyway, and could produce wrong id in the logs.
        // It is ok, since one moment before we would output nil id.
        log_prefix_.replace(0, id_str.length(), id_str);
      } else {
        status = decode_result.status();
      }
    }

    VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", "
                        << TransactionStatus_Name(transaction_status) << ")";

    if (status.ok()) {
      if (transaction_status == TransactionStatus::CREATED) {
        NotifyWaiters(Status::OK(), "Heartbeat");
      }
      // Only a weak reference is held by the scheduled task: if the transaction is destroyed
      // before it fires, SendHeartbeat() fails to lock it and the heartbeat chain ends.
      std::weak_ptr<YBTransaction> weak_transaction(transaction);
      manager_->client()->messenger()->scheduler().Schedule(
          [this, weak_transaction, id = metadata_.transaction_id](const Status&) {
              SendHeartbeat(TransactionStatus::PENDING, id, weak_transaction);
          },
          std::chrono::microseconds(FLAGS_transaction_heartbeat_usec));
    } else {
      auto state = state_.load(std::memory_order_acquire);
      LOG_WITH_PREFIX(WARNING) << "Send heartbeat failed: " << status << ", state: " << state;
      if (status.IsAborted() || status.IsExpired()) {
        // IsAborted - Service is shutting down, no reason to retry.
        // IsExpired - Transaction expired.
        // A failed CREATED heartbeat still has waiters to release; later heartbeats only
        // record the error (which moves state_ to kAborted if no error was recorded yet).
        if (transaction_status == TransactionStatus::CREATED) {
          NotifyWaiters(status, "Heartbeat");
        } else {
          SetError(status, "Heartbeat");
        }
        // If state is committed, then we should not cleanup.
        // `state` was sampled before the error was recorded above.
        if (status.IsExpired() && state == TransactionState::kRunning) {
          DoAbortCleanup(transaction, CleanupType::kImmediate);
        }
        return;
      }
      // Other errors could have different causes, but we should just retry sending heartbeat
      // in this case.
      // NOTE(review): the retry is immediate, with no delay or backoff; errors that come back
      // quickly would be resent in a tight loop. Confirm the RPC layer bounds this.
      SendHeartbeat(transaction_status, metadata_.transaction_id, transaction);
    }
  }
1147
1148
78
  // Locking wrapper around SetErrorUnlocked().
  void SetError(const Status& status, const char* operation) EXCLUDES(mutex_) {
    std::lock_guard<std::mutex> guard(mutex_);
    SetErrorUnlocked(status, operation);
  }
1152
1153
170k
  // Records only the first error. The first failure stores `status` prefixed with `operation` in
  // status_ and moves state_ to kAborted; later failures are logged at VLOG(1) and otherwise
  // ignored.
  void SetErrorUnlocked(const Status& status, const char* operation) REQUIRES(mutex_) {
    VLOG_WITH_PREFIX(1) << operation << " failed: " << status;
    if (!status_.ok()) {
      return;
    }
    status_ = status.CloneAndPrepend(operation);
    state_.store(TransactionState::kAborted, std::memory_order_release);
  }
1160
1161
  // Builds the data a child transaction needs: this transaction's metadata plus the read-point
  // state. `transaction` is not read here.
  ChildTransactionDataPB PrepareChildTransactionDataUnlocked(
      const YBTransactionPtr& transaction) REQUIRES(mutex_) {
    ChildTransactionDataPB child_data;
    metadata_.ToPB(child_data.mutable_metadata());
    read_point_.PrepareChildTransactionData(&child_data);
    return child_data;
  }
1168
1169
  void DoPrepareChild(const Status& status,
1170
                      const YBTransactionPtr& transaction,
1171
3.16k
                      PrepareChildCallback callback) EXCLUDES(mutex_) {
1172
3.16k
    TRACE_TO(trace_, __func__);
1173
3.16k
    if (!status.ok()) {
1174
54
      callback(status);
1175
54
      return;
1176
54
    }
1177
1178
3.11k
    ChildTransactionDataPB child_txn_data_pb;
1179
3.11k
    {
1180
3.11k
      std::lock_guard<std::mutex> lock(mutex_);
1181
3.11k
      child_txn_data_pb = PrepareChildTransactionDataUnlocked(transaction);
1182
3.11k
    }
1183
1184
3.11k
    callback(child_txn_data_pb);
1185
3.11k
  }
1186
1187
269k
  // Validates that a commit (or a seal, when `seal_only`) may start. The transaction must:
  // - still be running;
  // - not be a child;
  // - not require a restart;
  // - for a full commit, have no requests in flight.
  CHECKED_STATUS CheckCouldCommitUnlocked(SealOnly seal_only) REQUIRES(mutex_) {
    RETURN_NOT_OK(CheckRunningUnlocked());

    if (child_) {
      return STATUS(IllegalState, "Commit of child transaction is not allowed");
    }

    if (IsRestartRequired()) {
      return STATUS(
          IllegalState, "Commit of transaction that requires restart is not allowed");
    }

    const bool blocked_by_running_requests = !seal_only && running_requests_ > 0;
    if (blocked_by_running_requests) {
      return STATUS(IllegalState, "Commit of transaction with running requests");
    }

    return Status::OK();
  }
1202
1203
  // The trace buffer.
  scoped_refptr<Trace> trace_;

  const CoarseTimePoint start_;

  // Manager is created once per service.
  TransactionManager* const manager_;

  // Transaction related to this impl.
  YBTransaction* const transaction_;

  // NOTE(review): transaction_id is rewritten by HeartbeatDone() without holding mutex_.
  TransactionMetadata metadata_;
  ConsistentReadPoint read_point_;

  // Metadata tracking savepoint-related state for the scope of this transaction.
  YBSubTransaction subtransaction_;

  // Set by the first RequestStatusTablet() call; later calls return early.
  std::atomic<bool> requested_status_tablet_{false};
  internal::RemoteTabletPtr status_tablet_ GUARDED_BY(mutex_);
  std::atomic<TransactionState> state_{TransactionState::kRunning};

  // Whether this Impl belongs to a child transaction (adds " (CHILD)" to the log prefix;
  // committing a child is rejected).
  const bool child_;
  const bool child_had_read_time_ = false;
  // Transaction is successfully initialized and ready to process intents.
  bool ready_ GUARDED_BY(mutex_) = false;
  CommitCallback commit_callback_ GUARDED_BY(mutex_);
  // First recorded error; see SetErrorUnlocked().
  Status status_ GUARDED_BY(mutex_);

  // The following fields are initialized in CompleteConstruction() and can be used with no locking.
  // Exception: HeartbeatDone() patches the id portion of log_prefix_ in place after the CREATED
  // heartbeat succeeds.
  std::string log_prefix_;
  rpc::Rpcs::Handle heartbeat_handle_;
  rpc::Rpcs::Handle commit_handle_;
  rpc::Rpcs::Handle abort_handle_;

  // Per-participant-tablet bookkeeping used when building commit/seal requests.
  struct TabletState {
    size_t num_batches = 0;
    bool has_metadata = false;

    std::string ToString() const {
      return Format("{ num_batches: $0 has_metadata: $1 }", num_batches, has_metadata);
    }
  };

  typedef std::unordered_map<TabletId, TabletState> TabletStates;

  std::mutex mutex_;
  TabletStates tablets_ GUARDED_BY(mutex_);
  // Callbacks waiting for the transaction to become ready. Accessed under mutex_ in the code
  // visible here, but not annotated GUARDED_BY.
  std::vector<Waiter> waiters_;
  std::promise<Result<TransactionMetadata>> metadata_promise_;
  std::shared_future<Result<TransactionMetadata>> metadata_future_ GUARDED_BY(mutex_);
  // As of 2021-04-05 running_requests_ reflects number of ops in progress within this transaction
  // only if no in-transaction operations have failed.
  // If in-transaction operation has failed during tablet lookup or it has failed and will be
  // retried by YBSession inside the same transaction - Transaction::Flushed is not getting called
  // and running_requests_ is not updated.
  // For YBSession-level retries Transaction::Flushed will be called when operation is finally
  // successfully flushed, if operation fails after retry - Transaction::Flushed is not getting
  // called.
  // We might need to fix this before turning on transactions sealing.
  // https://github.com/yugabyte/yugabyte-db/issues/7984.
  size_t running_requests_ GUARDED_BY(mutex_) = 0;
  // Set to true after commit record is replicated. Used only during transaction sealing.
  // Written under mutex_ (in CommitDone) even though it is not annotated GUARDED_BY.
  bool commit_replicated_ = false;
1266
};
1267
1268
473k
// Normalizes a caller-supplied deadline: a default-constructed (unset) CoarseTimePoint is
// replaced by the default transaction RPC deadline; any other value is returned unchanged.
CoarseTimePoint AdjustDeadline(CoarseTimePoint deadline) {
  return deadline != CoarseTimePoint() ? deadline : TransactionRpcDeadline();
}
1274
1275
// New top-level transaction whose status tablet will be picked according to `locality`.
YBTransaction::YBTransaction(TransactionManager* manager, TransactionLocality locality)
    : impl_(new Impl(manager, this, locality)) {}

// Transaction adopted from existing metadata (used by Take()).
YBTransaction::YBTransaction(
    TransactionManager* manager, const TransactionMetadata& metadata, PrivateOnlyTag)
    : impl_(new Impl(manager, this, metadata)) {}

// Child transaction built from data prepared by a parent.
YBTransaction::YBTransaction(TransactionManager* manager, ChildTransactionData data)
    : impl_(new Impl(manager, this, std::move(data))) {}

// Defined out of line, where Impl is a complete type.
YBTransaction::~YBTransaction() = default;
1290
1291
267k
void YBTransaction::SetPriority(uint64_t priority) {
1292
267k
  impl_->SetPriority(priority);
1293
267k
}
1294
1295
69.0k
uint64_t YBTransaction::GetPriority() const {
1296
69.0k
  return impl_->GetPriority();
1297
69.0k
}
1298
1299
392k
Status YBTransaction::Init(IsolationLevel isolation, const ReadHybridTime& read_time) {
1300
392k
  return impl_->Init(isolation, read_time);
1301
392k
}
1302
1303
void YBTransaction::InitWithReadPoint(
1304
    IsolationLevel isolation,
1305
13.4k
    ConsistentReadPoint&& read_point) {
1306
13.4k
  return impl_->InitWithReadPoint(isolation, std::move(read_point));
1307
13.4k
}
1308
1309
4.32M
internal::TxnBatcherIf& YBTransaction::batcher_if() {
1310
4.32M
  return *impl_;
1311
4.32M
}
1312
1313
void YBTransaction::Commit(
1314
77.2k
    CoarseTimePoint deadline, SealOnly seal_only, CommitCallback callback) {
1315
77.2k
  impl_->Commit(AdjustDeadline(deadline), seal_only, std::move(callback));
1316
77.2k
}
1317
1318
77.2k
void YBTransaction::Commit(CoarseTimePoint deadline, CommitCallback callback) {
1319
77.2k
  Commit(deadline, SealOnly::kFalse, callback);
1320
77.2k
}
1321
1322
0
void YBTransaction::Commit(CommitCallback callback) {
1323
0
  Commit(CoarseTimePoint(), SealOnly::kFalse, std::move(callback));
1324
0
}
1325
1326
13
const TransactionId& YBTransaction::id() const {
1327
13
  return impl_->id();
1328
13
}
1329
1330
191k
IsolationLevel YBTransaction::isolation() const {
1331
191k
  return impl_->isolation();
1332
191k
}
1333
1334
0
const ConsistentReadPoint& YBTransaction::read_point() const {
1335
0
  return impl_->read_point();
1336
0
}
1337
1338
1.88M
ConsistentReadPoint& YBTransaction::read_point() {
1339
1.88M
  return impl_->read_point();
1340
1.88M
}
1341
1342
std::future<Status> YBTransaction::CommitFuture(
1343
191k
    CoarseTimePoint deadline, SealOnly seal_only) {
1344
191k
  return MakeFuture<Status>([this, deadline, seal_only](auto callback) {
1345
191k
    impl_->Commit(AdjustDeadline(deadline), seal_only, std::move(callback));
1346
191k
  });
1347
191k
}
1348
1349
138k
void YBTransaction::Abort(CoarseTimePoint deadline) {
1350
138k
  impl_->Abort(AdjustDeadline(deadline));
1351
138k
}
1352
1353
44.1k
bool YBTransaction::IsRestartRequired() const {
1354
44.1k
  return impl_->IsRestartRequired();
1355
44.1k
}
1356
1357
2
Result<YBTransactionPtr> YBTransaction::CreateRestartedTransaction() {
1358
2
  auto result = impl_->CreateSimilarTransaction();
1359
2
  RETURN_NOT_OK(impl_->FillRestartedTransaction(result->impl_.get()));
1360
2
  return result;
1361
2
}
1362
1363
0
Status YBTransaction::FillRestartedTransaction(const YBTransactionPtr& dest) {
1364
0
  return impl_->FillRestartedTransaction(dest->impl_.get());
1365
0
}
1366
1367
void YBTransaction::PrepareChild(
1368
    ForceConsistentRead force_consistent_read, CoarseTimePoint deadline,
1369
0
    PrepareChildCallback callback) {
1370
0
  return impl_->PrepareChild(force_consistent_read, deadline, std::move(callback));
1371
0
}
1372
1373
std::future<Result<ChildTransactionDataPB>> YBTransaction::PrepareChildFuture(
1374
66.5k
    ForceConsistentRead force_consistent_read, CoarseTimePoint deadline) {
1375
66.5k
  return MakeFuture<Result<ChildTransactionDataPB>>(
1376
66.5k
      [this, deadline, force_consistent_read](auto callback) {
1377
66.0k
    impl_->PrepareChild(force_consistent_read, AdjustDeadline(deadline), std::move(callback));
1378
66.0k
  });
1379
66.5k
}
1380
1381
13.7k
Result<ChildTransactionResultPB> YBTransaction::FinishChild() {
1382
13.7k
  return impl_->FinishChild();
1383
13.7k
}
1384
1385
std::shared_future<Result<TransactionMetadata>> YBTransaction::GetMetadata(
1386
18.8k
    CoarseTimePoint deadline) const {
1387
18.8k
  return impl_->GetMetadata(deadline);
1388
18.8k
}
1389
1390
13.4k
Status YBTransaction::ApplyChildResult(const ChildTransactionResultPB& result) {
1391
13.4k
  return impl_->ApplyChildResult(result);
1392
13.4k
}
1393
1394
0
std::string YBTransaction::ToString() const {
1395
0
  return impl_->ToString();
1396
0
}
1397
1398
0
Result<TransactionMetadata> YBTransaction::Release() {
1399
0
  return impl_->Release();
1400
0
}
1401
1402
1.59M
Trace* YBTransaction::trace() {
1403
1.59M
  return impl_->trace();
1404
1.59M
}
1405
1406
YBTransactionPtr YBTransaction::Take(
1407
0
    TransactionManager* manager, const TransactionMetadata& metadata) {
1408
0
  auto result = std::make_shared<YBTransaction>(manager, metadata, PrivateOnlyTag());
1409
0
  result->impl_->StartHeartbeat();
1410
0
  return result;
1411
0
}
1412
1413
61.7k
void YBTransaction::SetActiveSubTransaction(SubTransactionId id) {
1414
61.7k
  return impl_->SetActiveSubTransaction(id);
1415
61.7k
}
1416
1417
13.5k
Status YBTransaction::RollbackSubTransaction(SubTransactionId id) {
1418
13.5k
  return impl_->RollbackSubTransaction(id);
1419
13.5k
}
1420
1421
0
bool YBTransaction::HasSubTransactionState() {
1422
0
  return impl_->HasSubTransactionState();
1423
0
}
1424
1425
} // namespace client
1426
} // namespace yb