YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/apply_intents_task.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/apply_intents_task.h"
15
16
#include "yb/docdb/docdb.h"
17
18
#include "yb/gutil/dynamic_annotations.h"
19
#include "yb/tablet/running_transaction.h"
20
21
#include "yb/util/flag_tags.h"
22
#include "yb/util/logging.h"
23
#include "yb/util/monotime.h"
24
#include "yb/util/status_log.h"
25
26
using namespace std::literals;
27
28
// Delay, in milliseconds, that ApplyIntentsTask::Run() sleeps at the top of every loop iteration.
DEFINE_int64(
    apply_intents_task_injected_delay_ms, 0,
    "Inject such delay before applying intents for large transactions. Could be used to "
    "throttle the apply speed.");
31
32
// Test-only knob: while positive, ApplyIntentsTask::Run() sleeps this many milliseconds per loop
// iteration and skips the apply step.
DEFINE_test_flag(
    int32, pause_and_skip_apply_intents_task_loop_ms, 0,
    "If set to a value greater than zero, each loop of the apply intents task will sleep for "
    "the specified duration and continue without doing apply work.");
35
36
namespace yb {
37
namespace tablet {
38
39
// Creates a task that applies the intents described by |apply_data| through |applier|.
//
// Every argument is dereferenced right here to initialize the corresponding member, so none of
// them may be null.
// NOTE(review): if the members are references, the pointees must outlive this task - confirm
// against the declarations in apply_intents_task.h.
ApplyIntentsTask::ApplyIntentsTask(TransactionIntentApplier* applier,
                                   RunningTransactionContext* running_transaction_context,
                                   const TransactionApplyData* apply_data)
    : applier_(*applier),
      running_transaction_context_(*running_transaction_context),
      apply_data_(*apply_data) {
}
44
45
1
// Claims this task and stores the state that Run() and Done() rely on.
//
// transaction - running transaction whose apply data gets updated by Run(); moved into the task.
// operation   - must not be null; its contents are moved into the task, leaving the caller's
//               object in a moved-from state.
//
// Returns true when the task was claimed by this call. Returns false, without touching
// |transaction| or |operation|, when the task had already been claimed.
bool ApplyIntentsTask::Prepare(RunningTransactionPtr transaction, ScopedRWOperation* operation) {
  // exchange() hands back the previous flag value, so only the first caller observes false.
  const bool already_claimed = used_.exchange(true, std::memory_order_acq_rel);
  if (already_claimed) {
    return false;
  }

  transaction_ = std::move(transaction);
  operation_ = std::move(*DCHECK_NOTNULL(operation));
  return true;
}
55
56
1
void ApplyIntentsTask::Run() {
57
0
  VLOG_WITH_PREFIX(4) << __func__;
58
59
1
  for (;;) {
60
1
    AtomicFlagSleepMs(&FLAGS_apply_intents_task_injected_delay_ms);
61
62
1
    if (running_transaction_context_.Closing()) {
63
0
      VLOG_WITH_PREFIX(1) << "Abort because of shutdown";
64
0
      break;
65
0
    }
66
67
1
    const auto pause_and_skip_ms = ANNOTATE_UNPROTECTED_READ(
68
1
        FLAGS_TEST_pause_and_skip_apply_intents_task_loop_ms);
69
1
    if (PREDICT_FALSE(pause_and_skip_ms > 0)) {
70
0
      SleepFor(pause_and_skip_ms * 1ms);
71
0
      continue;
72
0
    }
73
74
1
    auto result = applier_.ApplyIntents(apply_data_);
75
1
    if (!result.ok()) {
76
0
      LOG_WITH_PREFIX(DFATAL)
77
0
          << "Failed to apply intents " << apply_data_.ToString() << ": " << result.status();
78
0
      break;
79
0
    }
80
81
1
    transaction_->SetApplyData(*result);
82
0
    VLOG_WITH_PREFIX(2) << "Performed next apply step: " << result->ToString();
83
84
1
    if (!result->active()) {
85
1
      break;
86
1
    }
87
1
  }
88
1
}
89
90
1
void ApplyIntentsTask::Done(const Status& status) {
91
1
  WARN_NOT_OK(status, "Apply intents task failed");
92
1
  operation_.Reset();
93
1
  transaction_.reset();
94
1
}
95
96
0
std::string ApplyIntentsTask::LogPrefix() const {
97
0
  return transaction_ ? transaction_->LogPrefix() : running_transaction_context_.LogPrefix();
98
0
}
99
100
} // namespace tablet
101
} // namespace yb