YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/running_transaction.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/running_transaction.h"
15
16
#include "yb/client/transaction_rpc.h"
17
18
#include "yb/common/hybrid_time.h"
19
#include "yb/common/pgsql_error.h"
20
21
#include "yb/tablet/transaction_participant_context.h"
22
23
#include "yb/tserver/tserver_service.pb.h"
24
25
#include "yb/util/flag_tags.h"
26
#include "yb/util/logging.h"
27
#include "yb/util/trace.h"
28
#include "yb/util/tsan_util.h"
29
#include "yb/util/yb_pg_errcodes.h"
30
31
using namespace std::placeholders;
32
using namespace std::literals;
33
34
DEFINE_test_flag(uint64, transaction_delay_status_reply_usec_in_tests, 0,
35
                 "For tests only. Delay handling status reply by specified amount of usec.");
36
37
DEFINE_int64(transaction_abort_check_interval_ms, 5000 * yb::kTimeMultiplier,
38
             "Interval to check whether running transaction was aborted.");
39
40
DEFINE_int64(transaction_abort_check_timeout_ms, 30000 * yb::kTimeMultiplier,
41
             "Timeout used when checking for aborted transactions.");
42
43
namespace yb {
44
namespace tablet {
45
46
RunningTransaction::RunningTransaction(TransactionMetadata metadata,
47
                                       const TransactionalBatchData& last_batch_data,
48
                                       OneWayBitmap&& replicated_batches,
49
                                       HybridTime base_time_for_abort_check_ht_calculation,
50
                                       RunningTransactionContext* context)
51
    : metadata_(std::move(metadata)),
52
      last_batch_data_(last_batch_data),
53
      replicated_batches_(std::move(replicated_batches)),
54
      context_(*context),
55
      remove_intents_task_(&context->applier_, &context->participant_context_, context,
56
                           metadata_.transaction_id),
57
      get_status_handle_(context->rpcs_.InvalidHandle()),
58
      abort_handle_(context->rpcs_.InvalidHandle()),
59
      apply_intents_task_(&context->applier_, context, &apply_data_),
60
      abort_check_ht_(base_time_for_abort_check_ht_calculation.AddDelta(
61
968k
                          1ms * FLAGS_transaction_abort_check_interval_ms)) {
62
968k
}
63
64
968k
RunningTransaction::~RunningTransaction() {
65
968k
  context_.rpcs_.Abort({&get_status_handle_, &abort_handle_});
66
968k
}
67
68
void RunningTransaction::AddReplicatedBatch(
69
1.10M
  size_t batch_idx, boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) {
70
538
  VLOG_WITH_PREFIX(4) << __func__ << "(" << batch_idx << ")";
71
1.10M
  replicated_batches_.Set(batch_idx);
72
1.10M
  encoded_replicated_batches->push_back(docdb::ValueTypeAsChar::kBitSet);
73
1.10M
  replicated_batches_.EncodeTo(encoded_replicated_batches);
74
1.10M
}
75
76
1.26M
void RunningTransaction::BatchReplicated(const TransactionalBatchData& value) {
77
116
  VLOG_WITH_PREFIX(4) << __func__ << "(" << value.ToString() << ")";
78
1.26M
  last_batch_data_ = value;
79
1.26M
}
80
81
void RunningTransaction::SetLocalCommitData(
82
737k
    HybridTime time, const AbortedSubTransactionSet& aborted_subtxn_set) {
83
737k
  local_commit_aborted_subtxn_set_ = aborted_subtxn_set;
84
737k
  local_commit_time_ = time;
85
737k
  last_known_status_hybrid_time_ = local_commit_time_;
86
737k
  last_known_status_ = TransactionStatus::COMMITTED;
87
737k
}
88
89
0
void RunningTransaction::Aborted() {
90
0
  VLOG_WITH_PREFIX(4) << __func__ << "()";
91
92
0
  last_known_status_ = TransactionStatus::ABORTED;
93
0
  last_known_status_hybrid_time_ = HybridTime::kMax;
94
0
}
95
96
void RunningTransaction::RequestStatusAt(const StatusRequest& request,
97
263k
                                         std::unique_lock<std::mutex>* lock) {
98
263k
  DCHECK_LE(request.global_limit_ht, HybridTime::kMax);
99
263k
  DCHECK_LE(request.read_ht, request.global_limit_ht);
100
101
263k
  if (last_known_status_hybrid_time_ > HybridTime::kMin) {
102
160k
    auto transaction_status =
103
160k
        GetStatusAt(request.global_limit_ht, last_known_status_hybrid_time_, last_known_status_);
104
    // If we don't have status at global_limit_ht, then we should request updated status.
105
160k
    if (transaction_status) {
106
42.7k
      HybridTime last_known_status_hybrid_time = last_known_status_hybrid_time_;
107
42.7k
      AbortedSubTransactionSet local_commit_aborted_subtxn_set;
108
42.7k
      if (transaction_status == TransactionStatus::COMMITTED) {
109
13.8k
        local_commit_aborted_subtxn_set = local_commit_aborted_subtxn_set_;
110
13.8k
      }
111
42.7k
      lock->unlock();
112
42.7k
      request.callback(TransactionStatusResult{
113
42.7k
          *transaction_status, last_known_status_hybrid_time, local_commit_aborted_subtxn_set});
114
42.7k
      return;
115
42.7k
    }
116
220k
  }
117
220k
  bool was_empty = status_waiters_.empty();
118
220k
  status_waiters_.push_back(request);
119
220k
  if (!was_empty) {
120
18.5k
    return;
121
18.5k
  }
122
202k
  auto request_id = context_.NextRequestIdUnlocked();
123
202k
  auto shared_self = shared_from_this();
124
125
2
  VLOG_WITH_PREFIX(4) << Format(
126
2
      "Existing status knowledge ($0, $1) does not satisfy requested: $2, sending: $3",
127
2
      TransactionStatus_Name(last_known_status_), last_known_status_hybrid_time_, request,
128
2
      request_id);
129
130
202k
  lock->unlock();
131
202k
  SendStatusRequest(request_id, shared_self);
132
202k
}
133
134
586k
bool RunningTransaction::WasAborted() const {
135
586k
  return last_known_status_ == TransactionStatus::ABORTED;
136
586k
}
137
138
461k
CHECKED_STATUS RunningTransaction::CheckAborted() const {
139
461k
  if (WasAborted()) {
140
0
    return MakeAbortedStatus(id());
141
0
  }
142
461k
  return Status::OK();
143
461k
}
144
145
void RunningTransaction::Abort(client::YBClient* client,
146
                               TransactionStatusCallback callback,
147
38.7k
                               std::unique_lock<std::mutex>* lock) {
148
38.7k
  if (last_known_status_ == TransactionStatus::ABORTED ||
149
38.5k
      last_known_status_ == TransactionStatus::COMMITTED) {
150
    // Transaction is already in final state, so no reason to send abort request.
151
0
    VLOG_WITH_PREFIX(3) << "Abort shortcut: " << last_known_status_;
152
1.08k
    TransactionStatusResult status{last_known_status_, last_known_status_hybrid_time_};
153
1.08k
    lock->unlock();
154
1.08k
    callback(status);
155
1.08k
    return;
156
1.08k
  }
157
37.6k
  bool was_empty = abort_waiters_.empty();
158
37.6k
  abort_waiters_.push_back(std::move(callback));
159
37.6k
  lock->unlock();
160
0
  VLOG_WITH_PREFIX(3) << "Abort request: " << was_empty;
161
37.6k
  if (!was_empty) {
162
4.22k
    return;
163
4.22k
  }
164
33.4k
  tserver::AbortTransactionRequestPB req;
165
33.4k
  req.set_tablet_id(metadata_.status_tablet);
166
33.4k
  req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
167
33.4k
  req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64());
168
33.4k
  context_.rpcs_.RegisterAndStart(
169
33.4k
      client::AbortTransaction(
170
33.4k
          TransactionRpcDeadline(),
171
33.4k
          nullptr /* tablet */,
172
33.4k
          client,
173
33.4k
          &req,
174
33.4k
          std::bind(&RunningTransaction::AbortReceived, this, _1, _2, shared_from_this())),
175
33.4k
      &abort_handle_);
176
33.4k
}
177
178
0
std::string RunningTransaction::ToString() const {
179
0
  return Format("{ metadata: $0 last_batch_data: $1 replicated_batches: $2 local_commit_time: $3 "
180
0
                    "last_known_status: $4 last_known_status_hybrid_time: $5 }",
181
0
                metadata_, last_batch_data_, replicated_batches_, local_commit_time_,
182
0
                TransactionStatus_Name(last_known_status_), last_known_status_hybrid_time_);
183
0
}
184
185
968k
void RunningTransaction::ScheduleRemoveIntents(const RunningTransactionPtr& shared_self) {
186
968k
  if (remove_intents_task_.Prepare(shared_self)) {
187
967k
    context_.participant_context_.StrandEnqueue(&remove_intents_task_);
188
18.4E
    VLOG_WITH_PREFIX(1) << "Intents should be removed asynchronously";
189
967k
  }
190
968k
}
191
192
boost::optional<TransactionStatus> RunningTransaction::GetStatusAt(
193
    HybridTime time,
194
    HybridTime last_known_status_hybrid_time,
195
381k
    TransactionStatus last_known_status) {
196
381k
  switch (last_known_status) {
197
72.1k
    case TransactionStatus::ABORTED:
198
72.1k
      return TransactionStatus::ABORTED;
199
38.2k
    case TransactionStatus::COMMITTED:
200
38.2k
      return last_known_status_hybrid_time > time
201
1.42k
          ? TransactionStatus::PENDING
202
36.8k
          : TransactionStatus::COMMITTED;
203
271k
    case TransactionStatus::PENDING:
204
271k
      if (last_known_status_hybrid_time >= time) {
205
91.9k
        return TransactionStatus::PENDING;
206
91.9k
      }
207
179k
      return boost::none;
208
0
    default:
209
0
      FATAL_INVALID_ENUM_VALUE(TransactionStatus, last_known_status);
210
381k
  }
211
381k
}
212
213
void RunningTransaction::SendStatusRequest(
214
202k
    int64_t serial_no, const RunningTransactionPtr& shared_self) {
215
202k
  TRACE_FUNC();
216
202k
  VTRACE(1, yb::ToString(metadata_.transaction_id));
217
202k
  tserver::GetTransactionStatusRequestPB req;
218
202k
  req.set_tablet_id(metadata_.status_tablet);
219
202k
  req.add_transaction_id()->assign(
220
202k
      pointer_cast<const char*>(metadata_.transaction_id.data()), metadata_.transaction_id.size());
221
202k
  req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64());
222
202k
  context_.rpcs_.RegisterAndStart(
223
202k
      client::GetTransactionStatus(
224
202k
          TransactionRpcDeadline(),
225
202k
          nullptr /* tablet */,
226
202k
          context_.participant_context_.client_future().get(),
227
202k
          &req,
228
202k
          std::bind(&RunningTransaction::StatusReceived, this, _1, _2, serial_no, shared_self)),
229
202k
      &get_status_handle_);
230
202k
}
231
232
void RunningTransaction::StatusReceived(
233
    const Status& status,
234
    const tserver::GetTransactionStatusResponsePB& response,
235
    int64_t serial_no,
236
202k
    const RunningTransactionPtr& shared_self) {
237
202k
  auto delay_usec = FLAGS_TEST_transaction_delay_status_reply_usec_in_tests;
238
202k
  if (delay_usec > 0) {
239
0
    context_.delayer().Delay(
240
0
        MonoTime::Now() + MonoDelta::FromMicroseconds(delay_usec),
241
0
        std::bind(&RunningTransaction::DoStatusReceived, this, status, response,
242
0
                  serial_no, shared_self));
243
202k
  } else {
244
202k
    DoStatusReceived(status, response, serial_no, shared_self);
245
202k
  }
246
202k
}
247
248
bool RunningTransaction::UpdateStatus(
249
    TransactionStatus transaction_status, HybridTime time_of_status,
250
213k
    HybridTime coordinator_safe_time, AbortedSubTransactionSet aborted_subtxn_set) {
251
  // Check for local_commit_time_ is not required for correctness, but useful for optimization.
252
  // So we could avoid unnecessary actions.
253
213k
  if (local_commit_time_) {
254
5.00k
    return false;
255
5.00k
  }
256
257
208k
  if (transaction_status == TransactionStatus::COMMITTED) {
258
17.1k
    local_commit_aborted_subtxn_set_ = aborted_subtxn_set;
259
17.1k
  }
260
261
208k
  if (transaction_status == TransactionStatus::ABORTED && coordinator_safe_time) {
262
56.1k
    time_of_status = coordinator_safe_time;
263
56.1k
  }
264
208k
  last_known_status_hybrid_time_ = time_of_status;
265
266
208k
  if (transaction_status == last_known_status_) {
267
100k
    return false;
268
100k
  }
269
270
107k
  last_known_status_ = transaction_status;
271
272
107k
  return transaction_status == TransactionStatus::ABORTED;
273
107k
}
274
275
void RunningTransaction::DoStatusReceived(const Status& status,
276
                                          const tserver::GetTransactionStatusResponsePB& response,
277
                                          int64_t serial_no,
278
202k
                                          const RunningTransactionPtr& shared_self) {
279
202k
  TRACE("$0: $1", __func__, response.ShortDebugString());
280
68
  VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", " << response.ShortDebugString() << ", "
281
68
                      << serial_no << ")";
282
283
202k
  if (response.has_propagated_hybrid_time()) {
284
202k
    context_.participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time()));
285
202k
  }
286
287
202k
  context_.rpcs_.Unregister(&get_status_handle_);
288
202k
  decltype(status_waiters_) status_waiters;
289
202k
  HybridTime time_of_status = HybridTime::kMin;
290
202k
  TransactionStatus transaction_status = TransactionStatus::PENDING;
291
202k
  AbortedSubTransactionSet aborted_subtxn_set;
292
202k
  const bool ok = status.ok();
293
202k
  int64_t new_request_id = -1;
294
202k
  {
295
202k
    MinRunningNotifier min_running_notifier(&context_.applier_);
296
202k
    std::unique_lock<std::mutex> lock(context_.mutex_);
297
202k
    if (!ok) {
298
0
      status_waiters_.swap(status_waiters);
299
0
      lock.unlock();
300
0
      for (const auto& waiter : status_waiters) {
301
0
        waiter.callback(status);
302
0
      }
303
0
      return;
304
0
    }
305
306
202k
    if (response.status_hybrid_time().size() == 1 &&
307
202k
        response.status().size() == 1 &&
308
202k
        response.aborted_subtxn_set().size() == 1) {
309
202k
      auto aborted_subtxn_set_or_status = AbortedSubTransactionSet::FromPB(
310
202k
          response.aborted_subtxn_set(0).set());
311
202k
      if (aborted_subtxn_set_or_status.ok()) {
312
202k
        time_of_status = HybridTime(response.status_hybrid_time()[0]);
313
202k
        transaction_status = response.status(0);
314
202k
        aborted_subtxn_set = aborted_subtxn_set_or_status.get();
315
18.4E
      } else {
316
18.4E
        LOG_WITH_PREFIX(DFATAL)
317
18.4E
            << "Could not deserialize AbortedSubTransactionSet: "
318
18.4E
            << "error - " << aborted_subtxn_set_or_status.status().ToString()
319
18.4E
            << " response - " << response.ShortDebugString();
320
18.4E
      }
321
18.4E
    } else {
322
18.4E
      LOG_WITH_PREFIX(DFATAL)
323
18.4E
          << "Wrong number of status, status hybrid time, or aborted subtxn set entries, "
324
18.4E
          << "exactly one entry expected: "
325
18.4E
          << response.ShortDebugString();
326
18.4E
    }
327
328
18.4E
    LOG_IF_WITH_PREFIX(DFATAL, response.coordinator_safe_time().size() > 1)
329
18.4E
        << "Wrong number of coordinator safe time entries, at most one expected: "
330
18.4E
        << response.ShortDebugString();
331
202k
    auto coordinator_safe_time = response.coordinator_safe_time().size() == 1
332
153k
        ? HybridTime::FromPB(response.coordinator_safe_time(0)) : HybridTime();
333
202k
    auto did_abort_txn = UpdateStatus(
334
202k
        transaction_status, time_of_status, coordinator_safe_time, aborted_subtxn_set);
335
202k
    if (did_abort_txn) {
336
47.7k
      context_.EnqueueRemoveUnlocked(id(), RemoveReason::kStatusReceived, &min_running_notifier);
337
47.7k
    }
338
339
202k
    time_of_status = last_known_status_hybrid_time_;
340
202k
    transaction_status = last_known_status_;
341
202k
    aborted_subtxn_set = local_commit_aborted_subtxn_set_;
342
343
202k
    status_waiters = ExtractFinishedStatusWaitersUnlocked(
344
202k
        serial_no, time_of_status, transaction_status);
345
202k
    if (!status_waiters_.empty()) {
346
1
      new_request_id = context_.NextRequestIdUnlocked();
347
0
      VLOG_WITH_PREFIX(4) << "Waiters still present, send new status request: " << new_request_id;
348
1
    }
349
202k
  }
350
202k
  if (new_request_id >= 0) {
351
1
    SendStatusRequest(new_request_id, shared_self);
352
1
  }
353
202k
  NotifyWaiters(serial_no, time_of_status, transaction_status, aborted_subtxn_set, status_waiters);
354
202k
}
355
356
std::vector<StatusRequest> RunningTransaction::ExtractFinishedStatusWaitersUnlocked(
357
202k
    int64_t serial_no, HybridTime time_of_status, TransactionStatus transaction_status) {
358
202k
  if (transaction_status == TransactionStatus::ABORTED) {
359
48.4k
    return std::move(status_waiters_);
360
48.4k
  }
361
153k
  std::vector<StatusRequest> result;
362
153k
  result.reserve(status_waiters_.size());
363
153k
  auto w = status_waiters_.begin();
364
325k
  for (auto it = status_waiters_.begin(); it != status_waiters_.end(); ++it) {
365
171k
    if (it->serial_no <= serial_no ||
366
5
        GetStatusAt(it->global_limit_ht, time_of_status, transaction_status) ||
367
171k
        time_of_status < it->read_ht) {
368
171k
      result.push_back(std::move(*it));
369
25
    } else {
370
25
      if (w != it) {
371
4
        *w = std::move(*it);
372
4
      }
373
25
      ++w;
374
25
    }
375
171k
  }
376
153k
  status_waiters_.erase(w, status_waiters_.end());
377
153k
  return result;
378
153k
}
379
380
void RunningTransaction::NotifyWaiters(int64_t serial_no, HybridTime time_of_status,
381
                                       TransactionStatus transaction_status,
382
                                       const AbortedSubTransactionSet& aborted_subtxn_set,
383
202k
                                       const std::vector<StatusRequest>& status_waiters) {
384
220k
  for (const auto& waiter : status_waiters) {
385
220k
    auto status_for_waiter = GetStatusAt(
386
220k
        waiter.global_limit_ht, time_of_status, transaction_status);
387
220k
    if (status_for_waiter) {
388
      // We know status at global_limit_ht, so could notify waiter.
389
159k
      auto result = TransactionStatusResult{*status_for_waiter, time_of_status};
390
159k
      if (result.status == TransactionStatus::COMMITTED) {
391
23.0k
        result.aborted_subtxn_set = aborted_subtxn_set;
392
23.0k
      }
393
159k
      waiter.callback(std::move(result));
394
61.2k
    } else if (time_of_status >= waiter.read_ht) {
395
      // It means that between read_ht and global_limit_ht transaction was pending.
396
      // It implies that transaction was not committed before request was sent.
397
      // We could safely respond PENDING to caller.
398
0
      LOG_IF_WITH_PREFIX(DFATAL, waiter.serial_no > serial_no)
399
0
          << "Notify waiter with request id greater than id of status request: "
400
0
          << waiter.serial_no << " vs " << serial_no;
401
16.9k
      waiter.callback(TransactionStatusResult{TransactionStatus::PENDING, time_of_status});
402
44.2k
    } else {
403
44.2k
      waiter.callback(STATUS(TryAgain,
404
44.2k
          Format("Cannot determine transaction status with read_ht $0, and global_limit_ht $1, "
405
44.2k
                 "last known: $2 at $3", waiter.read_ht, waiter.global_limit_ht,
406
44.2k
                 TransactionStatus_Name(transaction_status), time_of_status), Slice(),
407
44.2k
          PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE) ));
408
44.2k
    }
409
220k
  }
410
202k
}
411
412
Result<TransactionStatusResult> RunningTransaction::MakeAbortResult(
413
    const Status& status,
414
33.4k
    const tserver::AbortTransactionResponsePB& response) {
415
33.4k
  if (!status.ok()) {
416
0
    return status;
417
0
  }
418
419
33.4k
  HybridTime status_time = response.has_status_hybrid_time()
420
25.7k
       ? HybridTime(response.status_hybrid_time())
421
7.71k
       : HybridTime::kInvalid;
422
33.4k
  return TransactionStatusResult{response.status(), status_time, AbortedSubTransactionSet()};
423
33.4k
}
424
425
void RunningTransaction::AbortReceived(const Status& status,
426
                                       const tserver::AbortTransactionResponsePB& response,
427
33.4k
                                       const RunningTransactionPtr& shared_self) {
428
33.4k
  if (response.has_propagated_hybrid_time()) {
429
33.4k
    context_.participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time()));
430
33.4k
  }
431
432
33.4k
  decltype(abort_waiters_) abort_waiters;
433
33.4k
  auto result = MakeAbortResult(status, response);
434
435
18.4E
  VLOG_WITH_PREFIX(3) << "AbortReceived: " << yb::ToString(result);
436
437
33.4k
  {
438
33.4k
    MinRunningNotifier min_running_notifier(&context_.applier_);
439
33.4k
    std::lock_guard<std::mutex> lock(context_.mutex_);
440
33.4k
    context_.rpcs_.Unregister(&abort_handle_);
441
33.4k
    abort_waiters_.swap(abort_waiters);
442
    // kMax status_time means that this status is not yet replicated and could be rejected.
443
    // So we could use it as reply to Abort, but cannot store it as transaction status.
444
33.4k
    if (result.ok() && result->status_time != HybridTime::kMax) {
445
10.3k
      auto coordinator_safe_time = HybridTime::FromPB(response.coordinator_safe_time());
446
10.3k
      if (UpdateStatus(
447
7.49k
          result->status, result->status_time, coordinator_safe_time, result->aborted_subtxn_set)) {
448
7.49k
        context_.EnqueueRemoveUnlocked(id(), RemoveReason::kAbortReceived, &min_running_notifier);
449
7.49k
      }
450
10.3k
    }
451
33.4k
  }
452
37.6k
  for (const auto& waiter : abort_waiters) {
453
37.6k
    waiter(result);
454
37.6k
  }
455
33.4k
}
456
457
0
std::string RunningTransaction::LogPrefix() const {
458
0
  return Format(
459
0
      "$0 ID $1: ", context_.LogPrefix().substr(0, context_.LogPrefix().length() - 2), id());
460
0
}
461
462
0
Status MakeAbortedStatus(const TransactionId& id) {
463
0
  return STATUS(
464
0
      TryAgain, Format("Transaction aborted: $0", id), Slice(),
465
0
      PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE));
466
0
}
467
468
void RunningTransaction::SetApplyData(const docdb::ApplyTransactionState& apply_state,
469
                                      const TransactionApplyData* data,
470
2
                                      ScopedRWOperation* operation) {
471
  // TODO(savepoints): Add test to ensure that apply_state.aborted is properly set here.
472
2
  apply_state_ = apply_state;
473
2
  bool active = apply_state_.active();
474
2
  if (active) {
475
    // We are trying to assign set processing apply before starting actual process, and unset
476
    // after we complete processing.
477
1
    processing_apply_.store(true, std::memory_order_release);
478
1
  }
479
480
2
  if (data) {
481
1
    if (!active) {
482
0
      LOG_WITH_PREFIX(DFATAL)
483
0
          << "Starting processing apply, but provided data in inactive state: " << data->ToString();
484
0
      return;
485
0
    }
486
487
1
    apply_data_ = *data;
488
1
    apply_data_.apply_state = &apply_state_;
489
490
0
    LOG_IF_WITH_PREFIX(DFATAL, local_commit_time_ != data->commit_ht)
491
0
        << "Commit time does not match: " << local_commit_time_ << " vs " << data->commit_ht;
492
493
1
    if (apply_intents_task_.Prepare(shared_from_this(), operation)) {
494
1
      context_.participant_context_.StrandEnqueue(&apply_intents_task_);
495
0
    } else {
496
0
      LOG_WITH_PREFIX(DFATAL) << "Unable to prepare apply intents task";
497
0
    }
498
1
  }
499
500
2
  if (!active) {
501
1
    processing_apply_.store(false, std::memory_order_release);
502
503
0
    VLOG_WITH_PREFIX(3) << "Finished applying intents";
504
505
1
    MinRunningNotifier min_running_notifier(&context_.applier_);
506
1
    std::lock_guard<std::mutex> lock(context_.mutex_);
507
1
    context_.RemoveUnlocked(id(), RemoveReason::kLargeApplied, &min_running_notifier);
508
1
  }
509
2
}
510
511
642k
void RunningTransaction::SetOpId(const OpId& id) {
512
642k
  opId.index = id.index;
513
642k
  opId.term = id.term;
514
642k
}
515
516
1.41M
bool RunningTransaction::ProcessingApply() const {
517
1.41M
  return processing_apply_.load(std::memory_order_acquire);
518
1.41M
}
519
520
862
void RunningTransaction::UpdateAbortCheckHT(HybridTime now, UpdateAbortCheckHTMode mode) {
521
862
  if (last_known_status_ == TransactionStatus::ABORTED ||
522
855
      last_known_status_ == TransactionStatus::COMMITTED) {
523
84
    abort_check_ht_ = HybridTime::kMax;
524
84
    return;
525
84
  }
526
  // When we send a status request, we schedule the transaction status to be re-checked around the
527
  // same time the request is supposed to time out. When we get a status response (normal case, no
528
  // timeout), we go back to the normal interval of re-checking the status of this transaction.
529
778
  auto delta_ms = mode == UpdateAbortCheckHTMode::kStatusRequestSent
530
382
      ? FLAGS_transaction_abort_check_timeout_ms
531
396
      : FLAGS_transaction_abort_check_interval_ms;
532
778
  abort_check_ht_ = now.AddDelta(1ms * delta_ms);
533
778
}
534
535
} // namespace tablet
536
} // namespace yb