YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/transaction_status_cache.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include "yb/docdb/transaction_status_cache.h"
14
15
#include <future>
16
17
#include <boost/optional/optional.hpp>
18
19
#include "yb/common/hybrid_time.h"
20
#include "yb/docdb/transaction_dump.h"
21
#include "yb/util/backoff_waiter.h"
22
#include "yb/util/result.h"
23
#include "yb/util/status_format.h"
24
#include "yb/util/tsan_util.h"
25
26
using namespace std::literals;
27
28
DEFINE_bool(TEST_transaction_allow_rerequest_status, true,
29
            "Allow rerequest transaction status when TryAgain is received.");
30
31
namespace yb {
32
namespace docdb {
33
34
namespace {
35
36
0
// Builds the TimedOut status that is returned when waiting for the status of the given
// transaction did not finish in time.
CHECKED_STATUS StatusWaitTimedOut(const TransactionId& transaction_id) {
  auto timed_out = STATUS_FORMAT(
      TimedOut, "Timed out waiting for transaction status: $0", transaction_id);
  return timed_out;
}
40
41
// Identifies which code path produced the commit data stored in GetCommitDataResult.
YB_DEFINE_ENUM(CommitTimeSource,
42
               ((kLocalBefore, 0)) // Transaction was committed locally before the remote check.
43
               ((kNoMetadata, 1)) // Transaction metadata is not present.
44
               ((kLocalAfter, 2)) // Transaction was committed locally after the remote check.
45
               ((kRemoteAborted, 3)) // Coordinator responded that transaction was aborted.
46
               ((kRemoteCommitted, 4)) // Coordinator responded that transaction was committed.
47
               ((kRemotePending, 5))); // Coordinator responded that transaction is pending.
48
49
} // namespace
50
51
// Result of DoGetCommitData: the commit metadata itself plus the details that GetCommitData
// passes to the transaction dump.
struct TransactionStatusCache::GetCommitDataResult {
  // Commit metadata that GetCommitData caches and returns to the caller.
  CommitMetadata commit_data;

  // Which code path produced commit_data.
  CommitTimeSource source{};

  // Status time taken from the status response; left default-constructed when not set.
  HybridTime status_time;

  // Value returned by WaitForSafeTime; left default-constructed when no wait was done.
  HybridTime safe_time;
};
57
58
// For locally committed transactions returns commit time if committed at specified time or
59
// HybridTime::kMin otherwise. For other transactions returns boost::none.
60
boost::optional<CommitMetadata> TransactionStatusCache::GetLocalCommitData(
61
67.5k
    const TransactionId& transaction_id) {
62
67.5k
  auto local_commit_data_opt = txn_context_opt_.txn_status_manager->LocalCommitData(transaction_id);
63
67.5k
  if (local_commit_data_opt == boost::none || !local_commit_data_opt->commit_ht.is_valid()) {
64
44.7k
    return boost::none;
65
44.7k
  }
66
67
22.7k
  if (local_commit_data_opt->commit_ht > read_time_.global_limit) {
68
18
    local_commit_data_opt->commit_ht = HybridTime::kMin;
69
18
  }
70
71
22.7k
  return local_commit_data_opt;
72
22.7k
}
73
74
117k
// Returns the commit metadata for the given transaction.
// A value already stored in cache_ is returned directly; otherwise the data is obtained via
// DoGetCommitData, reported through YB_TRANSACTION_DUMP and stored in cache_.
// Errors from DoGetCommitData are propagated and nothing is cached in that case.
Result<CommitMetadata> TransactionStatusCache::GetCommitData(const TransactionId& transaction_id) {
  const auto cached = cache_.find(transaction_id);
  if (cached != cache_.end()) {
    return cached->second;
  }

  auto fetched = VERIFY_RESULT(DoGetCommitData(transaction_id));

  YB_TRANSACTION_DUMP(
      Status, txn_context_opt_ ? txn_context_opt_.transaction_id : TransactionId::Nil(),
      read_time_, transaction_id, fetched.commit_data.commit_ht,
      static_cast<uint8_t>(fetched.source), fetched.status_time, fetched.safe_time,
      fetched.commit_data.aborted_subtxn_set.ToString());

  cache_.emplace(transaction_id, fetched.commit_data);
  return fetched.commit_data;
}
88
89
// Resolves commit data for the given transaction without consulting cache_.
//
// Steps, in order:
//  1. If GetLocalCommitData has data, return it (source kLocalBefore).
//  2. Otherwise request the status through the status manager (RequestStatusAt) and wait for the
//     reply. A TryAgain reply or an attempt that timed out leads to another attempt as long as
//     the backoff waiter (constructed from deadline_) permits; otherwise a TimedOut status is
//     returned. A NotFound reply yields HybridTime::kMin with source kNoMetadata. Any other error
//     is returned to the caller.
//  3. ABORTED: optionally wait for safe time, recheck local commit data (source kLocalAfter), and
//     fall back to HybridTime::kMin (source kRemoteAborted).
//  4. COMMITTED: return the reported status time and aborted subtransaction set
//     (source kRemoteCommitted).
//  5. Any other status: return HybridTime::kMin (source kRemotePending).
Result<TransactionStatusCache::GetCommitDataResult> TransactionStatusCache::DoGetCommitData(
90
57.1k
    const TransactionId& transaction_id) {
91
57.1k
  auto local_commit_data_opt = GetLocalCommitData(transaction_id);
92
57.1k
  if (local_commit_data_opt != boost::none) {
93
22.7k
    return GetCommitDataResult {
94
22.7k
      .commit_data = std::move(*local_commit_data_opt),
95
22.7k
      .source = CommitTimeSource::kLocalBefore,
96
22.7k
    };
97
22.7k
  }
98
99
  // TransactionStatusResult does not have a default ctor, so initialize it with a placeholder.
  // The loop below is left via break only after this value has been overwritten.
100
34.4k
  TransactionStatusResult txn_status(TransactionStatus::ABORTED, HybridTime());
101
34.4k
  const auto kMaxWait = 50ms * kTimeMultiplier;
102
34.4k
  const auto kRequestTimeout = kMaxWait;
103
34.4k
  bool TEST_retry_allowed = FLAGS_TEST_transaction_allow_rerequest_status;
104
34.4k
  CoarseBackoffWaiter waiter(deadline_, kMaxWait);
105
34.4k
  static const std::string kRequestReason = "get commit time"s;
106
34.8k
  for(;;) {
107
    // The promise is shared with the callback, so it stays alive even if this function stops
    // waiting for the reply before the callback is invoked.
34.8k
    auto txn_status_promise = std::make_shared<std::promise<Result<TransactionStatusResult>>>();
108
34.8k
    auto future = txn_status_promise->get_future();
109
34.8k
    auto callback = [txn_status_promise](Result<TransactionStatusResult> result) {
110
34.8k
      txn_status_promise->set_value(std::move(result));
111
34.8k
    };
112
34.8k
    txn_context_opt_.txn_status_manager->RequestStatusAt(
113
34.8k
        {&transaction_id, read_time_.read, read_time_.global_limit, read_time_.serial_no,
114
34.8k
              &kRequestReason,
115
34.8k
              TransactionLoadFlags{TransactionLoadFlag::kCleanup},
116
34.8k
              callback});
117
34.8k
    auto wait_start = CoarseMonoClock::now();
118
    // When retries are allowed, a single attempt waits at most kRequestTimeout.
    // Otherwise the only attempt waits until deadline_.
34.8k
    auto future_status = future.wait_until(
119
34.8k
        TEST_retry_allowed ? wait_start + kRequestTimeout : deadline_);
120
34.8k
    if (future_status == std::future_status::ready) {
121
34.6k
      auto txn_status_result = future.get();
122
34.6k
      if (txn_status_result.ok()) {
123
33.2k
        txn_status = *txn_status_result;
124
33.2k
        break;
125
33.2k
      }
126
1.39k
      if (txn_status_result.status().IsNotFound()) {
127
        // We have an intent w/o metadata, which means that the transaction was already cleaned up.
128
1.25k
        LOG(WARNING) << "Intent for transaction w/o metadata: " << transaction_id;
129
1.25k
        return GetCommitDataResult {
130
1.25k
          .commit_data = CommitMetadata {HybridTime::kMin},
131
1.25k
          .source = CommitTimeSource::kNoMetadata,
132
1.25k
        };
133
1.25k
      }
134
140
      LOG(WARNING)
135
140
          << "Failed to request transaction " << transaction_id << " status: "
136
140
          <<  txn_status_result.status();
137
      // Only TryAgain leads to another attempt; every other error is returned to the caller.
140
      if (!txn_status_result.status().IsTryAgain()) {
138
0
        return std::move(txn_status_result.status());
139
0
      }
140
140
      if (!waiter.Wait()) {
141
0
        return StatusWaitTimedOut(transaction_id);
142
0
      }
143
179
    } else {
144
179
      LOG(INFO) << "TXN: " << transaction_id << ": Timed out waiting txn status, waited: "
145
179
                << MonoDelta(CoarseMonoClock::now() - wait_start)
146
179
                << ", future status: " << to_underlying(future_status)
147
179
                << ", left to deadline: " << MonoDelta(deadline_ - CoarseMonoClock::now());
148
179
      if (waiter.ExpiredNow()) {
149
0
        return StatusWaitTimedOut(transaction_id);
150
0
      }
151
179
      waiter.NextAttempt();
152
179
    }
153
    // Reaching this point means that another attempt is about to be made.
319
    DCHECK(TEST_retry_allowed);
154
319
  }
155
18.4E
  VLOG(4) << "Transaction_id " << transaction_id << " at " << read_time_
156
18.4E
          << ": status: " << TransactionStatus_Name(txn_status.status)
157
18.4E
          << ", status_time: " << txn_status.status_time;
158
  // The transaction could have been committed and applied after the earlier call to
159
  // GetLocalCommitData. In this case the coordinator does not know the transaction and responds
160
  // with ABORTED status. So we recheck whether it was committed locally.
161
33.1k
  if (txn_status.status == TransactionStatus::ABORTED) {
162
10.2k
    HybridTime safe_time;
163
10.2k
    if (txn_status.status_time && txn_status.status_time != HybridTime::kMax) {
164
      // It is possible that this node has not yet received APPLY, so it is possible that
165
      // we would not have a local commit time even for a committed transaction.
166
      // Wait for safe time to be sure that APPLY was processed, if present.
167
      // See https://github.com/YugaByte/yugabyte-db/issues/7729 for details.
168
10.2k
      safe_time = VERIFY_RESULT(txn_context_opt_.txn_status_manager->WaitForSafeTime(
169
10.2k
          txn_status.status_time, deadline_));
170
10.2k
    }
171
10.2k
    local_commit_data_opt = GetLocalCommitData(transaction_id);
172
10.2k
    if (local_commit_data_opt != boost::none) {
173
0
      return GetCommitDataResult {
174
0
        .commit_data = std::move(*local_commit_data_opt),
175
0
        .source = CommitTimeSource::kLocalAfter,
176
0
        .status_time = txn_status.status_time,
177
0
        .safe_time = safe_time,
178
0
      };
179
0
    }
180
181
10.2k
    return GetCommitDataResult {
182
10.2k
      .commit_data = CommitMetadata {HybridTime::kMin},
183
10.2k
      .source = CommitTimeSource::kRemoteAborted,
184
10.2k
      .status_time = txn_status.status_time,
185
10.2k
      .safe_time = safe_time,
186
10.2k
    };
187
10.2k
  }
188
189
22.9k
  if (txn_status.status == TransactionStatus::COMMITTED) {
190
5.91k
    return GetCommitDataResult {
191
5.91k
      .commit_data = CommitMetadata {txn_status.status_time, txn_status.aborted_subtxn_set},
192
5.91k
      .source = CommitTimeSource::kRemoteCommitted,
193
5.91k
    };
194
5.91k
  }
195
196
  // Neither ABORTED nor COMMITTED: reported as pending with the minimal commit time.
17.0k
  return GetCommitDataResult {
197
    // TODO(savepoints) - surface aborted subtxn data for pending transactions.
198
17.0k
    .commit_data = CommitMetadata {HybridTime::kMin},
199
17.0k
    .source = CommitTimeSource::kRemotePending,
200
17.0k
  };
201
17.0k
}
202
203
} // namespace docdb
204
} // namespace yb