YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/apply_intents_task.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/apply_intents_task.h"
15
16
#include "yb/docdb/docdb.h"
17
18
#include "yb/gutil/dynamic_annotations.h"
19
#include "yb/tablet/running_transaction.h"
20
21
#include "yb/util/flag_tags.h"
22
#include "yb/util/logging.h"
23
#include "yb/util/monotime.h"
24
#include "yb/util/status_log.h"
25
26
using namespace std::literals;
27
28
DEFINE_int64(apply_intents_task_injected_delay_ms, 0,
29
             "Inject such delay before applying intents for large transactions. "
30
             "Could be used to throttle the apply speed.");
31
32
DEFINE_test_flag(int32, pause_and_skip_apply_intents_task_loop_ms, 0,
33
                 "If set to a value greater than zero, each loop of the apply intents task will "
34
                 "sleep for the specified duration and continue without doing apply work.");
35
36
namespace yb {
37
namespace tablet {
38
39
ApplyIntentsTask::ApplyIntentsTask(TransactionIntentApplier* applier,
40
                                   RunningTransactionContext* running_transaction_context,
41
                                   const TransactionApplyData* apply_data)
42
    : applier_(*applier), running_transaction_context_(*running_transaction_context),
43
1.67M
      apply_data_(*apply_data) {}
44
45
69
bool ApplyIntentsTask::Prepare(RunningTransactionPtr transaction, ScopedRWOperation* operation) {
46
69
  bool expected = false;
47
69
  if (!used_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
48
0
    return false;
49
0
  }
50
51
69
  transaction_ = std::move(transaction);
52
69
  operation_ = std::move(*DCHECK_NOTNULL(operation));
53
69
  return true;
54
69
}
55
56
69
void ApplyIntentsTask::Run() {
57
69
  
VLOG_WITH_PREFIX0
(4) << __func__0
;
58
59
183
  for (;;) {
60
183
    AtomicFlagSleepMs(&FLAGS_apply_intents_task_injected_delay_ms);
61
62
183
    if (running_transaction_context_.Closing()) {
63
9
      
VLOG_WITH_PREFIX0
(1) << "Abort because of shutdown"0
;
64
9
      break;
65
9
    }
66
67
174
    const auto pause_and_skip_ms = ANNOTATE_UNPROTECTED_READ(
68
174
        FLAGS_TEST_pause_and_skip_apply_intents_task_loop_ms);
69
174
    if (PREDICT_FALSE(pause_and_skip_ms > 0)) {
70
0
      SleepFor(pause_and_skip_ms * 1ms);
71
0
      continue;
72
0
    }
73
74
174
    auto result = applier_.ApplyIntents(apply_data_);
75
174
    if (!result.ok()) {
76
0
      LOG_WITH_PREFIX(DFATAL)
77
0
          << "Failed to apply intents " << apply_data_.ToString() << ": " << result.status();
78
0
      break;
79
0
    }
80
81
174
    transaction_->SetApplyData(*result);
82
174
    
VLOG_WITH_PREFIX0
(2) << "Performed next apply step: " << result->ToString()0
;
83
84
174
    if (!result->active()) {
85
60
      break;
86
60
    }
87
174
  }
88
69
}
89
90
69
void ApplyIntentsTask::Done(const Status& status) {
91
69
  WARN_NOT_OK(status, "Apply intents task failed");
92
69
  operation_.Reset();
93
69
  transaction_.reset();
94
69
}
95
96
0
std::string ApplyIntentsTask::LogPrefix() const {
97
0
  return transaction_ ? transaction_->LogPrefix() : running_transaction_context_.LogPrefix();
98
0
}
99
100
} // namespace tablet
101
} // namespace yb