YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/running_transaction.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/running_transaction.h"
15
16
#include "yb/client/transaction_rpc.h"
17
18
#include "yb/common/hybrid_time.h"
19
#include "yb/common/pgsql_error.h"
20
21
#include "yb/tablet/transaction_participant_context.h"
22
23
#include "yb/tserver/tserver_service.pb.h"
24
25
#include "yb/util/flag_tags.h"
26
#include "yb/util/logging.h"
27
#include "yb/util/trace.h"
28
#include "yb/util/tsan_util.h"
29
#include "yb/util/yb_pg_errcodes.h"
30
31
using namespace std::placeholders;
32
using namespace std::literals;
33
34
// Test-only flag read by RunningTransaction::StatusReceived(): when it is greater than zero the
// handling of a status reply (DoStatusReceived) is deferred by that many microseconds.
DEFINE_test_flag(uint64, transaction_delay_status_reply_usec_in_tests, 0,
35
                 "For tests only. Delay handling status reply by specified amount of usec.");
36
37
// Interval, in milliseconds, added to a base hybrid time to compute abort_check_ht_ (used by the
// RunningTransaction constructor and by UpdateAbortCheckHT for modes other than
// kStatusRequestSent). The default is scaled by yb::kTimeMultiplier.
DEFINE_int64(transaction_abort_check_interval_ms, 5000 * yb::kTimeMultiplier,
38
             "Interval to check whether running transaction was aborted.");
39
40
// Delay, in milliseconds, used by UpdateAbortCheckHT in kStatusRequestSent mode to schedule the
// next abort check. The default is scaled by yb::kTimeMultiplier.
DEFINE_int64(transaction_abort_check_timeout_ms, 30000 * yb::kTimeMultiplier,
41
             "Timeout used when checking for aborted transactions.");
42
43
namespace yb {
44
namespace tablet {
45
46
// Creates the participant-side record for a running transaction.
//
//   metadata            - moved into metadata_.
//   last_batch_data     - copied into last_batch_data_.
//   replicated_batches  - moved into replicated_batches_.
//   base_time_for_abort_check_ht_calculation
//                       - base hybrid time; abort_check_ht_ is set to this time plus
//                         FLAGS_transaction_abort_check_interval_ms milliseconds.
//   context             - dereferenced to initialize context_, so it must not be null.
//
// Both RPC handles start as the invalid handle of context->rpcs_, and the remove-intents and
// apply-intents tasks are wired to the applier owned by the context.
RunningTransaction::RunningTransaction(
    TransactionMetadata metadata,
    const TransactionalBatchData& last_batch_data,
    OneWayBitmap&& replicated_batches,
    HybridTime base_time_for_abort_check_ht_calculation,
    RunningTransactionContext* context)
    : metadata_(std::move(metadata)),
      last_batch_data_(last_batch_data),
      replicated_batches_(std::move(replicated_batches)),
      context_(*context),
      remove_intents_task_(
          &context->applier_, &context->participant_context_, context, metadata_.transaction_id),
      get_status_handle_(context->rpcs_.InvalidHandle()),
      abort_handle_(context->rpcs_.InvalidHandle()),
      apply_intents_task_(&context->applier_, context, &apply_data_),
      abort_check_ht_(base_time_for_abort_check_ht_calculation.AddDelta(
          1ms * FLAGS_transaction_abort_check_interval_ms)) {
}
63
64
1.67M
// Asks the RPC registry to abort whatever is registered under the status and abort handles of
// this transaction.
RunningTransaction::~RunningTransaction() {
  context_.rpcs_.Abort({&get_status_handle_, &abort_handle_});
}
67
68
void RunningTransaction::AddReplicatedBatch(
69
2.31M
  size_t batch_idx, boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) {
70
2.31M
  
VLOG_WITH_PREFIX1.93k
(4) << __func__ << "(" << batch_idx << ")"1.93k
;
71
2.31M
  replicated_batches_.Set(batch_idx);
72
2.31M
  encoded_replicated_batches->push_back(docdb::ValueTypeAsChar::kBitSet);
73
2.31M
  replicated_batches_.EncodeTo(encoded_replicated_batches);
74
2.31M
}
75
76
2.31M
void RunningTransaction::BatchReplicated(const TransactionalBatchData& value) {
77
2.31M
  
VLOG_WITH_PREFIX190
(4) << __func__ << "(" << value.ToString() << ")"190
;
78
2.31M
  last_batch_data_ = value;
79
2.31M
}
80
81
void RunningTransaction::SetLocalCommitData(
82
1.30M
    HybridTime time, const AbortedSubTransactionSet& aborted_subtxn_set) {
83
1.30M
  local_commit_aborted_subtxn_set_ = aborted_subtxn_set;
84
1.30M
  local_commit_time_ = time;
85
1.30M
  last_known_status_hybrid_time_ = local_commit_time_;
86
1.30M
  last_known_status_ = TransactionStatus::COMMITTED;
87
1.30M
}
88
89
0
void RunningTransaction::Aborted() {
90
0
  VLOG_WITH_PREFIX(4) << __func__ << "()";
91
92
0
  last_known_status_ = TransactionStatus::ABORTED;
93
0
  last_known_status_hybrid_time_ = HybridTime::kMax;
94
0
}
95
96
void RunningTransaction::RequestStatusAt(const StatusRequest& request,
97
404k
                                         std::unique_lock<std::mutex>* lock) {
98
404k
  DCHECK_LE(request.global_limit_ht, HybridTime::kMax);
99
404k
  DCHECK_LE(request.read_ht, request.global_limit_ht);
100
101
404k
  if (last_known_status_hybrid_time_ > HybridTime::kMin) {
102
224k
    auto transaction_status =
103
224k
        GetStatusAt(request.global_limit_ht, last_known_status_hybrid_time_, last_known_status_);
104
    // If we don't have status at global_limit_ht, then we should request updated status.
105
224k
    if (transaction_status) {
106
84.2k
      HybridTime last_known_status_hybrid_time = last_known_status_hybrid_time_;
107
84.2k
      AbortedSubTransactionSet local_commit_aborted_subtxn_set;
108
84.2k
      if (transaction_status == TransactionStatus::COMMITTED) {
109
29.9k
        local_commit_aborted_subtxn_set = local_commit_aborted_subtxn_set_;
110
29.9k
      }
111
84.2k
      lock->unlock();
112
84.2k
      request.callback(TransactionStatusResult{
113
84.2k
          *transaction_status, last_known_status_hybrid_time, local_commit_aborted_subtxn_set});
114
84.2k
      return;
115
84.2k
    }
116
224k
  }
117
319k
  bool was_empty = status_waiters_.empty();
118
319k
  status_waiters_.push_back(request);
119
319k
  if (!was_empty) {
120
45.8k
    return;
121
45.8k
  }
122
273k
  auto request_id = context_.NextRequestIdUnlocked();
123
273k
  auto shared_self = shared_from_this();
124
125
18.4E
  VLOG_WITH_PREFIX(4) << Format(
126
18.4E
      "Existing status knowledge ($0, $1) does not satisfy requested: $2, sending: $3",
127
18.4E
      TransactionStatus_Name(last_known_status_), last_known_status_hybrid_time_, request,
128
18.4E
      request_id);
129
130
273k
  lock->unlock();
131
273k
  SendStatusRequest(request_id, shared_self);
132
273k
}
133
134
1.41M
bool RunningTransaction::WasAborted() const {
135
1.41M
  return last_known_status_ == TransactionStatus::ABORTED;
136
1.41M
}
137
138
1.23M
// Returns OK unless the transaction is known to be aborted, in which case the status built by
// MakeAbortedStatus for this transaction id is returned.
CHECKED_STATUS RunningTransaction::CheckAborted() const {
  if (!WasAborted()) {
    return Status::OK();
  }
  return MakeAbortedStatus(id());
}
144
145
void RunningTransaction::Abort(client::YBClient* client,
146
                               TransactionStatusCallback callback,
147
56.1k
                               std::unique_lock<std::mutex>* lock) {
148
56.1k
  if (last_known_status_ == TransactionStatus::ABORTED ||
149
56.1k
      
last_known_status_ == TransactionStatus::COMMITTED55.6k
) {
150
    // Transaction is already in final state, so no reason to send abort request.
151
2.48k
    
VLOG_WITH_PREFIX0
(3) << "Abort shortcut: " << last_known_status_0
;
152
2.48k
    TransactionStatusResult status{last_known_status_, last_known_status_hybrid_time_};
153
2.48k
    lock->unlock();
154
2.48k
    callback(status);
155
2.48k
    return;
156
2.48k
  }
157
53.6k
  bool was_empty = abort_waiters_.empty();
158
53.6k
  abort_waiters_.push_back(std::move(callback));
159
53.6k
  lock->unlock();
160
53.6k
  
VLOG_WITH_PREFIX1
(3) << "Abort request: " << was_empty1
;
161
53.6k
  if (!was_empty) {
162
10.8k
    return;
163
10.8k
  }
164
42.7k
  tserver::AbortTransactionRequestPB req;
165
42.7k
  req.set_tablet_id(metadata_.status_tablet);
166
42.7k
  req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
167
42.7k
  req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64());
168
42.7k
  context_.rpcs_.RegisterAndStart(
169
42.7k
      client::AbortTransaction(
170
42.7k
          TransactionRpcDeadline(),
171
42.7k
          nullptr /* tablet */,
172
42.7k
          client,
173
42.7k
          &req,
174
42.7k
          std::bind(&RunningTransaction::AbortReceived, this, _1, _2, shared_from_this())),
175
42.7k
      &abort_handle_);
176
42.7k
}
177
178
0
std::string RunningTransaction::ToString() const {
179
0
  return Format("{ metadata: $0 last_batch_data: $1 replicated_batches: $2 local_commit_time: $3 "
180
0
                    "last_known_status: $4 last_known_status_hybrid_time: $5 }",
181
0
                metadata_, last_batch_data_, replicated_batches_, local_commit_time_,
182
0
                TransactionStatus_Name(last_known_status_), last_known_status_hybrid_time_);
183
0
}
184
185
1.67M
void RunningTransaction::ScheduleRemoveIntents(const RunningTransactionPtr& shared_self) {
186
1.67M
  if (remove_intents_task_.Prepare(shared_self)) {
187
1.66M
    context_.participant_context_.StrandEnqueue(&remove_intents_task_);
188
18.4E
    VLOG_WITH_PREFIX(1) << "Intents should be removed asynchronously";
189
1.66M
  }
190
1.67M
}
191
192
// Derives the transaction status at `time` from a status known as of
// `last_known_status_hybrid_time`.
//
//   ABORTED   -> ABORTED, regardless of the times.
//   COMMITTED -> PENDING when the known time is after `time`, COMMITTED otherwise.
//   PENDING   -> PENDING when the known time is at or after `time`; boost::none otherwise,
//                i.e. the status at `time` cannot be derived.
//
// Any other enum value is treated as a fatal error.
boost::optional<TransactionStatus> RunningTransaction::GetStatusAt(
    HybridTime time,
    HybridTime last_known_status_hybrid_time,
    TransactionStatus last_known_status) {
  switch (last_known_status) {
    case TransactionStatus::ABORTED:
      return TransactionStatus::ABORTED;
    case TransactionStatus::COMMITTED:
      if (last_known_status_hybrid_time > time) {
        return TransactionStatus::PENDING;
      }
      return TransactionStatus::COMMITTED;
    case TransactionStatus::PENDING:
      if (last_known_status_hybrid_time < time) {
        return boost::none;
      }
      return TransactionStatus::PENDING;
    default:
      FATAL_INVALID_ENUM_VALUE(TransactionStatus, last_known_status);
  }
}
212
213
void RunningTransaction::SendStatusRequest(
214
273k
    int64_t serial_no, const RunningTransactionPtr& shared_self) {
215
273k
  TRACE_FUNC();
216
273k
  VTRACE(1, yb::ToString(metadata_.transaction_id));
217
273k
  tserver::GetTransactionStatusRequestPB req;
218
273k
  req.set_tablet_id(metadata_.status_tablet);
219
273k
  req.add_transaction_id()->assign(
220
273k
      pointer_cast<const char*>(metadata_.transaction_id.data()), metadata_.transaction_id.size());
221
273k
  req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64());
222
273k
  context_.rpcs_.RegisterAndStart(
223
273k
      client::GetTransactionStatus(
224
273k
          TransactionRpcDeadline(),
225
273k
          nullptr /* tablet */,
226
273k
          context_.participant_context_.client_future().get(),
227
273k
          &req,
228
273k
          std::bind(&RunningTransaction::StatusReceived, this, _1, _2, serial_no, shared_self)),
229
273k
      &get_status_handle_);
230
273k
}
231
232
void RunningTransaction::StatusReceived(
233
    const Status& status,
234
    const tserver::GetTransactionStatusResponsePB& response,
235
    int64_t serial_no,
236
273k
    const RunningTransactionPtr& shared_self) {
237
273k
  auto delay_usec = FLAGS_TEST_transaction_delay_status_reply_usec_in_tests;
238
273k
  if (delay_usec > 0) {
239
0
    context_.delayer().Delay(
240
0
        MonoTime::Now() + MonoDelta::FromMicroseconds(delay_usec),
241
0
        std::bind(&RunningTransaction::DoStatusReceived, this, status, response,
242
0
                  serial_no, shared_self));
243
273k
  } else {
244
273k
    DoStatusReceived(status, response, serial_no, shared_self);
245
273k
  }
246
273k
}
247
248
bool RunningTransaction::UpdateStatus(
249
    TransactionStatus transaction_status, HybridTime time_of_status,
250
301k
    HybridTime coordinator_safe_time, AbortedSubTransactionSet aborted_subtxn_set) {
251
  // Check for local_commit_time_ is not required for correctness, but useful for optimization.
252
  // So we could avoid unnecessary actions.
253
301k
  if (local_commit_time_) {
254
10.3k
    return false;
255
10.3k
  }
256
257
291k
  if (transaction_status == TransactionStatus::COMMITTED) {
258
31.5k
    local_commit_aborted_subtxn_set_ = aborted_subtxn_set;
259
31.5k
  }
260
261
291k
  if (transaction_status == TransactionStatus::ABORTED && 
coordinator_safe_time77.8k
) {
262
77.8k
    time_of_status = coordinator_safe_time;
263
77.8k
  }
264
291k
  last_known_status_hybrid_time_ = time_of_status;
265
266
291k
  if (transaction_status == last_known_status_) {
267
104k
    return false;
268
104k
  }
269
270
187k
  last_known_status_ = transaction_status;
271
272
187k
  return transaction_status == TransactionStatus::ABORTED;
273
291k
}
274
275
void RunningTransaction::DoStatusReceived(const Status& status,
276
                                          const tserver::GetTransactionStatusResponsePB& response,
277
                                          int64_t serial_no,
278
273k
                                          const RunningTransactionPtr& shared_self) {
279
273k
  TRACE("$0: $1", __func__, response.ShortDebugString());
280
18.4E
  VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", " << response.ShortDebugString() << ", "
281
18.4E
                      << serial_no << ")";
282
283
273k
  if (response.has_propagated_hybrid_time()) {
284
273k
    context_.participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time()));
285
273k
  }
286
287
273k
  context_.rpcs_.Unregister(&get_status_handle_);
288
273k
  decltype(status_waiters_) status_waiters;
289
273k
  HybridTime time_of_status = HybridTime::kMin;
290
273k
  TransactionStatus transaction_status = TransactionStatus::PENDING;
291
273k
  AbortedSubTransactionSet aborted_subtxn_set;
292
273k
  const bool ok = status.ok();
293
273k
  int64_t new_request_id = -1;
294
273k
  {
295
273k
    MinRunningNotifier min_running_notifier(&context_.applier_);
296
273k
    std::unique_lock<std::mutex> lock(context_.mutex_);
297
273k
    if (!ok) {
298
1
      status_waiters_.swap(status_waiters);
299
1
      lock.unlock();
300
1
      for (const auto& waiter : status_waiters) {
301
1
        waiter.callback(status);
302
1
      }
303
1
      return;
304
1
    }
305
306
273k
    if (response.status_hybrid_time().size() != 1 ||
307
273k
        response.status().size() != 1 ||
308
273k
        (response.aborted_subtxn_set().size() != 0 && 
response.aborted_subtxn_set().size() != 1273k
)) {
309
0
      LOG_WITH_PREFIX(DFATAL)
310
0
          << "Wrong number of status, status hybrid time, or aborted subtxn set entries, "
311
0
          << "exactly one entry expected: "
312
0
          << response.ShortDebugString();
313
273k
    } else if (PREDICT_FALSE(response.aborted_subtxn_set().empty())) {
314
0
      YB_LOG_EVERY_N(WARNING, 1)
315
0
          << "Empty aborted_subtxn_set in transaction status response. "
316
0
          << "This should only happen when nodes are on different versions, e.g. during upgrade.";
317
273k
    } else {
318
273k
      auto aborted_subtxn_set_or_status = AbortedSubTransactionSet::FromPB(
319
273k
          response.aborted_subtxn_set(0).set());
320
273k
      if (
aborted_subtxn_set_or_status.ok()273k
) {
321
273k
        time_of_status = HybridTime(response.status_hybrid_time()[0]);
322
273k
        transaction_status = response.status(0);
323
273k
        aborted_subtxn_set = aborted_subtxn_set_or_status.get();
324
18.4E
      } else {
325
18.4E
        LOG_WITH_PREFIX(DFATAL)
326
18.4E
            << "Could not deserialize AbortedSubTransactionSet: "
327
18.4E
            << "error - " << aborted_subtxn_set_or_status.status().ToString()
328
18.4E
            << " response - " << response.ShortDebugString();
329
18.4E
      }
330
273k
    }
331
332
18.4E
    LOG_IF_WITH_PREFIX(DFATAL, response.coordinator_safe_time().size() > 1)
333
18.4E
        << "Wrong number of coordinator safe time entries, at most one expected: "
334
18.4E
        << response.ShortDebugString();
335
273k
    auto coordinator_safe_time = response.coordinator_safe_time().size() == 1
336
273k
        ? 
HybridTime::FromPB(response.coordinator_safe_time(0))58.1k
:
HybridTime()215k
;
337
273k
    auto did_abort_txn = UpdateStatus(
338
273k
        transaction_status, time_of_status, coordinator_safe_time, aborted_subtxn_set);
339
273k
    if (did_abort_txn) {
340
56.2k
      context_.EnqueueRemoveUnlocked(id(), RemoveReason::kStatusReceived, &min_running_notifier);
341
56.2k
    }
342
343
273k
    time_of_status = last_known_status_hybrid_time_;
344
273k
    transaction_status = last_known_status_;
345
273k
    aborted_subtxn_set = local_commit_aborted_subtxn_set_;
346
347
273k
    status_waiters = ExtractFinishedStatusWaitersUnlocked(
348
273k
        serial_no, time_of_status, transaction_status);
349
273k
    if (!status_waiters_.empty()) {
350
37
      new_request_id = context_.NextRequestIdUnlocked();
351
37
      
VLOG_WITH_PREFIX0
(4) << "Waiters still present, send new status request: " << new_request_id0
;
352
37
    }
353
273k
  }
354
273k
  if (new_request_id >= 0) {
355
37
    SendStatusRequest(new_request_id, shared_self);
356
37
  }
357
273k
  NotifyWaiters(serial_no, time_of_status, transaction_status, aborted_subtxn_set, status_waiters);
358
273k
}
359
360
std::vector<StatusRequest> RunningTransaction::ExtractFinishedStatusWaitersUnlocked(
361
274k
    int64_t serial_no, HybridTime time_of_status, TransactionStatus transaction_status) {
362
274k
  if (transaction_status == TransactionStatus::ABORTED) {
363
57.6k
    return std::move(status_waiters_);
364
57.6k
  }
365
216k
  std::vector<StatusRequest> result;
366
216k
  result.reserve(status_waiters_.size());
367
216k
  auto w = status_waiters_.begin();
368
477k
  for (auto it = status_waiters_.begin(); it != status_waiters_.end(); 
++it261k
) {
369
261k
    if (it->serial_no <= serial_no ||
370
261k
        
GetStatusAt(it->global_limit_ht, time_of_status, transaction_status)78
||
371
261k
        
time_of_status < it->read_ht51
) {
372
261k
      result.push_back(std::move(*it));
373
261k
    } else {
374
48
      if (w != it) {
375
37
        *w = std::move(*it);
376
37
      }
377
48
      ++w;
378
48
    }
379
261k
  }
380
216k
  status_waiters_.erase(w, status_waiters_.end());
381
216k
  return result;
382
274k
}
383
384
void RunningTransaction::NotifyWaiters(int64_t serial_no, HybridTime time_of_status,
385
                                       TransactionStatus transaction_status,
386
                                       const AbortedSubTransactionSet& aborted_subtxn_set,
387
273k
                                       const std::vector<StatusRequest>& status_waiters) {
388
319k
  for (const auto& waiter : status_waiters) {
389
319k
    auto status_for_waiter = GetStatusAt(
390
319k
        waiter.global_limit_ht, time_of_status, transaction_status);
391
319k
    if (status_for_waiter) {
392
      // We know status at global_limit_ht, so could notify waiter.
393
259k
      auto result = TransactionStatusResult{*status_for_waiter, time_of_status};
394
259k
      if (result.status == TransactionStatus::COMMITTED) {
395
46.2k
        result.aborted_subtxn_set = aborted_subtxn_set;
396
46.2k
      }
397
259k
      waiter.callback(std::move(result));
398
259k
    } else 
if (59.9k
time_of_status >= waiter.read_ht59.9k
) {
399
      // It means that between read_ht and global_limit_ht transaction was pending.
400
      // It implies that transaction was not committed before request was sent.
401
      // We could safely respond PENDING to caller.
402
11.0k
      
LOG_IF_WITH_PREFIX0
(DFATAL, waiter.serial_no > serial_no)
403
0
          << "Notify waiter with request id greater than id of status request: "
404
0
          << waiter.serial_no << " vs " << serial_no;
405
11.0k
      waiter.callback(TransactionStatusResult{TransactionStatus::PENDING, time_of_status});
406
48.9k
    } else {
407
48.9k
      waiter.callback(STATUS(TryAgain,
408
48.9k
          Format("Cannot determine transaction status with read_ht $0, and global_limit_ht $1, "
409
48.9k
                 "last known: $2 at $3", waiter.read_ht, waiter.global_limit_ht,
410
48.9k
                 TransactionStatus_Name(transaction_status), time_of_status), Slice(),
411
48.9k
          PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE) ));
412
48.9k
    }
413
319k
  }
414
273k
}
415
416
// Converts the outcome of an AbortTransaction RPC into a TransactionStatusResult.
//
// A failed `status` is returned unchanged. Otherwise the result carries the status from the
// response, the response's status hybrid time (HybridTime::kInvalid when the field is absent)
// and an empty aborted sub-transaction set.
Result<TransactionStatusResult> RunningTransaction::MakeAbortResult(
    const Status& status,
    const tserver::AbortTransactionResponsePB& response) {
  if (!status.ok()) {
    return status;
  }

  HybridTime status_time = HybridTime::kInvalid;
  if (response.has_status_hybrid_time()) {
    status_time = HybridTime(response.status_hybrid_time());
  }
  return TransactionStatusResult{response.status(), status_time, AbortedSubTransactionSet()};
}
428
429
void RunningTransaction::AbortReceived(const Status& status,
430
                                       const tserver::AbortTransactionResponsePB& response,
431
42.7k
                                       const RunningTransactionPtr& shared_self) {
432
42.7k
  if (response.has_propagated_hybrid_time()) {
433
42.7k
    context_.participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time()));
434
42.7k
  }
435
436
42.7k
  decltype(abort_waiters_) abort_waiters;
437
42.7k
  auto result = MakeAbortResult(status, response);
438
439
42.7k
  
VLOG_WITH_PREFIX3
(3) << "AbortReceived: " << yb::ToString(result)3
;
440
441
42.7k
  {
442
42.7k
    MinRunningNotifier min_running_notifier(&context_.applier_);
443
42.7k
    std::lock_guard<std::mutex> lock(context_.mutex_);
444
42.7k
    context_.rpcs_.Unregister(&abort_handle_);
445
42.7k
    abort_waiters_.swap(abort_waiters);
446
    // kMax status_time means that this status is not yet replicated and could be rejected.
447
    // So we could use it as reply to Abort, but cannot store it as transaction status.
448
42.7k
    if (
result.ok()42.7k
&& result->status_time != HybridTime::kMax) {
449
26.0k
      auto coordinator_safe_time = HybridTime::FromPB(response.coordinator_safe_time());
450
26.0k
      if (UpdateStatus(
451
26.0k
          result->status, result->status_time, coordinator_safe_time, result->aborted_subtxn_set)) {
452
19.7k
        context_.EnqueueRemoveUnlocked(id(), RemoveReason::kAbortReceived, &min_running_notifier);
453
19.7k
      }
454
26.0k
    }
455
42.7k
  }
456
53.6k
  for (const auto& waiter : abort_waiters) {
457
53.6k
    waiter(result);
458
53.6k
  }
459
42.7k
}
460
461
0
std::string RunningTransaction::LogPrefix() const {
462
0
  return Format(
463
0
      "$0 ID $1: ", context_.LogPrefix().substr(0, context_.LogPrefix().length() - 2), id());
464
0
}
465
466
859
// Builds the TryAgain status reported for an aborted transaction. It carries the transaction
// id in the message and a PostgreSQL serialization-failure error code.
Status MakeAbortedStatus(const TransactionId& id) {
  return STATUS(
      TryAgain,
      Format("Transaction aborted: $0", id),
      Slice(),
      PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE));
}
471
472
// Stores the apply state of this transaction and starts or finishes apply processing.
//
//   apply_state - copied into apply_state_; its active() flag drives the rest of the function.
//   data        - optional. When given (only valid with an active state) it is copied into
//                 apply_data_ and the apply-intents task is prepared and enqueued.
//   operation   - forwarded to the apply-intents task's Prepare().
//
// With an inactive state and no data, processing is marked finished and the transaction is
// removed from the context under context_.mutex_.
void RunningTransaction::SetApplyData(const docdb::ApplyTransactionState& apply_state,
                                      const TransactionApplyData* data,
                                      ScopedRWOperation* operation) {
  // TODO(savepoints): Add test to ensure that apply_state.aborted is properly set here.
  apply_state_ = apply_state;
  const bool is_active = apply_state_.active();
  if (is_active) {
    // We are trying to assign set processing apply before starting actual process, and unset
    // after we complete processing.
    processing_apply_.store(true, std::memory_order_release);
  }

  if (data) {
    if (!is_active) {
      LOG_WITH_PREFIX(DFATAL)
          << "Starting processing apply, but provided data in inactive state: " << data->ToString();
      return;
    }

    apply_data_ = *data;
    // Point the stored copy at our own apply state rather than at the one carried by `data`.
    apply_data_.apply_state = &apply_state_;

    LOG_IF_WITH_PREFIX(DFATAL, local_commit_time_ != data->commit_ht)
        << "Commit time does not match: " << local_commit_time_ << " vs " << data->commit_ht;

    if (!apply_intents_task_.Prepare(shared_from_this(), operation)) {
      LOG_WITH_PREFIX(DFATAL) << "Unable to prepare apply intents task";
    } else {
      context_.participant_context_.StrandEnqueue(&apply_intents_task_);
    }
  }

  if (!is_active) {
    processing_apply_.store(false, std::memory_order_release);

    VLOG_WITH_PREFIX(3) << "Finished applying intents";

    // Declared before the lock so that it is destroyed after the lock has been released.
    MinRunningNotifier min_running_notifier(&context_.applier_);
    std::lock_guard<std::mutex> lock(context_.mutex_);
    context_.RemoveUnlocked(id(), RemoveReason::kLargeApplied, &min_running_notifier);
  }
}
514
515
1.30M
void RunningTransaction::SetOpId(const OpId& id) {
516
1.30M
  opId.index = id.index;
517
1.30M
  opId.term = id.term;
518
1.30M
}
519
520
2.41M
bool RunningTransaction::ProcessingApply() const {
521
2.41M
  return processing_apply_.load(std::memory_order_acquire);
522
2.41M
}
523
524
3.39k
// Recomputes abort_check_ht_, the hybrid time of the next abort check, relative to `now`.
//
// A transaction whose last known status is ABORTED or COMMITTED gets HybridTime::kMax.
// Otherwise the delay depends on `mode`: the abort-check timeout flag for kStatusRequestSent,
// the abort-check interval flag for every other mode.
void RunningTransaction::UpdateAbortCheckHT(HybridTime now, UpdateAbortCheckHTMode mode) {
  const bool in_final_state = last_known_status_ == TransactionStatus::ABORTED ||
                              last_known_status_ == TransactionStatus::COMMITTED;
  if (in_final_state) {
    abort_check_ht_ = HybridTime::kMax;
    return;
  }

  // When we send a status request, we schedule the transaction status to be re-checked around the
  // same time the request is supposed to time out. When we get a status response (normal case, no
  // timeout), we go back to the normal interval of re-checking the status of this transaction.
  if (mode == UpdateAbortCheckHTMode::kStatusRequestSent) {
    abort_check_ht_ = now.AddDelta(1ms * FLAGS_transaction_abort_check_timeout_ms);
  } else {
    abort_check_ht_ = now.AddDelta(1ms * FLAGS_transaction_abort_check_interval_ms);
  }
}
538
539
} // namespace tablet
540
} // namespace yb