/Users/deen/code/yugabyte-db/src/yb/docdb/transaction_status_cache.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | #include "yb/docdb/transaction_status_cache.h" |
14 | | |
15 | | #include <future> |
16 | | |
17 | | #include <boost/optional/optional.hpp> |
18 | | |
19 | | #include "yb/common/hybrid_time.h" |
20 | | #include "yb/docdb/transaction_dump.h" |
21 | | #include "yb/util/backoff_waiter.h" |
22 | | #include "yb/util/result.h" |
23 | | #include "yb/util/status_format.h" |
24 | | #include "yb/util/tsan_util.h" |
25 | | |
26 | | using namespace std::literals; |
27 | | |
28 | | DEFINE_bool(TEST_transaction_allow_rerequest_status, true, |
29 | | "Allow rerequest transaction status when TryAgain is received."); |
30 | | |
31 | | namespace yb { |
32 | | namespace docdb { |
33 | | |
34 | | namespace { |
35 | | |
36 | 0 | CHECKED_STATUS StatusWaitTimedOut(const TransactionId& transaction_id) { |
37 | 0 | return STATUS_FORMAT( |
38 | 0 | TimedOut, "Timed out waiting for transaction status: $0", transaction_id); |
39 | 0 | } |
40 | | |
41 | | YB_DEFINE_ENUM(CommitTimeSource, |
42 | | ((kLocalBefore, 0)) // Transaction was committed locally before the remote check. |
43 | | ((kNoMetadata, 1)) // Transaction metadata not present |
44 | | ((kLocalAfter, 2)) // Transaction was committed locally after the remote check. |
45 | | ((kRemoteAborted, 3)) // Coordinator responded that transaction was aborted. |
46 | | ((kRemoteCommitted, 4)) // Coordinator responded that transaction was committed. |
47 | | ((kRemotePending, 5))); // Coordinator responded that transaction is pending. |
48 | | |
49 | | } // namespace |
50 | | |
51 | | struct TransactionStatusCache::GetCommitDataResult { |
52 | | CommitMetadata commit_data; |
53 | | CommitTimeSource source = CommitTimeSource(); |
54 | | HybridTime status_time; |
55 | | HybridTime safe_time; |
56 | | }; |
57 | | |
58 | | // For locally committed transactions returns the commit data; its commit time is replaced with |
59 | | // HybridTime::kMin when it is past read_time_.global_limit. Otherwise returns boost::none. |
60 | | boost::optional<CommitMetadata> TransactionStatusCache::GetLocalCommitData( |
61 | 67.5k | const TransactionId& transaction_id) { |
62 | 67.5k | auto local_commit_data_opt = txn_context_opt_.txn_status_manager->LocalCommitData(transaction_id); |
63 | 67.5k | if (local_commit_data_opt == boost::none || !local_commit_data_opt->commit_ht.is_valid()) { |
64 | 44.7k | return boost::none; |
65 | 44.7k | } |
66 | | |
67 | 22.7k | if (local_commit_data_opt->commit_ht > read_time_.global_limit) { |
68 | 18 | local_commit_data_opt->commit_ht = HybridTime::kMin; |
69 | 18 | } |
70 | | |
71 | 22.7k | return local_commit_data_opt; |
72 | 22.7k | } |
73 | | |
74 | 117k | Result<CommitMetadata> TransactionStatusCache::GetCommitData(const TransactionId& transaction_id) { |
75 | 117k | auto it = cache_.find(transaction_id); |
76 | 117k | if (it != cache_.end()) { |
77 | 60.5k | return it->second; |
78 | 60.5k | } |
79 | | |
80 | 57.2k | auto result = VERIFY_RESULT(DoGetCommitData(transaction_id)); |
81 | 57.2k | YB_TRANSACTION_DUMP( |
82 | 57.2k | Status, txn_context_opt_ ? txn_context_opt_.transaction_id : TransactionId::Nil(), |
83 | 57.2k | read_time_, transaction_id, result.commit_data.commit_ht, static_cast<uint8_t>(result.source), |
84 | 57.2k | result.status_time, result.safe_time, result.commit_data.aborted_subtxn_set.ToString()); |
85 | 57.2k | cache_.emplace(transaction_id, result.commit_data); |
86 | 57.2k | return result.commit_data; |
87 | 57.2k | } |
88 | | |
89 | | Result<TransactionStatusCache::GetCommitDataResult> TransactionStatusCache::DoGetCommitData( |
90 | 57.1k | const TransactionId& transaction_id) { |
91 | 57.1k | auto local_commit_data_opt = GetLocalCommitData(transaction_id); |
92 | 57.1k | if (local_commit_data_opt != boost::none) { |
93 | 22.7k | return GetCommitDataResult { |
94 | 22.7k | .commit_data = std::move(*local_commit_data_opt), |
95 | 22.7k | .source = CommitTimeSource::kLocalBefore, |
96 | 22.7k | }; |
97 | 22.7k | } |
98 | | |
99 | | // TransactionStatusResult has no default ctor, so initialize it with a placeholder value. |
100 | 34.4k | TransactionStatusResult txn_status(TransactionStatus::ABORTED, HybridTime()); |
101 | 34.4k | const auto kMaxWait = 50ms * kTimeMultiplier; |
102 | 34.4k | const auto kRequestTimeout = kMaxWait; |
103 | 34.4k | bool TEST_retry_allowed = FLAGS_TEST_transaction_allow_rerequest_status; |
104 | 34.4k | CoarseBackoffWaiter waiter(deadline_, kMaxWait); |
105 | 34.4k | static const std::string kRequestReason = "get commit time"s; |
106 | 34.8k | for(;;) { |
107 | 34.8k | auto txn_status_promise = std::make_shared<std::promise<Result<TransactionStatusResult>>>(); |
108 | 34.8k | auto future = txn_status_promise->get_future(); |
109 | 34.8k | auto callback = [txn_status_promise](Result<TransactionStatusResult> result) { |
110 | 34.8k | txn_status_promise->set_value(std::move(result)); |
111 | 34.8k | }; |
112 | 34.8k | txn_context_opt_.txn_status_manager->RequestStatusAt( |
113 | 34.8k | {&transaction_id, read_time_.read, read_time_.global_limit, read_time_.serial_no, |
114 | 34.8k | &kRequestReason, |
115 | 34.8k | TransactionLoadFlags{TransactionLoadFlag::kCleanup}, |
116 | 34.8k | callback}); |
117 | 34.8k | auto wait_start = CoarseMonoClock::now(); |
118 | 34.8k | auto future_status = future.wait_until( |
119 | 34.8k | TEST_retry_allowed ? wait_start + kRequestTimeout : deadline_); |
120 | 34.8k | if (future_status == std::future_status::ready) { |
121 | 34.6k | auto txn_status_result = future.get(); |
122 | 34.6k | if (txn_status_result.ok()) { |
123 | 33.2k | txn_status = *txn_status_result; |
124 | 33.2k | break; |
125 | 33.2k | } |
126 | 1.39k | if (txn_status_result.status().IsNotFound()) { |
127 | | // We have an intent w/o metadata, which means that the transaction was already cleaned up. |
128 | 1.25k | LOG(WARNING) << "Intent for transaction w/o metadata: " << transaction_id; |
129 | 1.25k | return GetCommitDataResult { |
130 | 1.25k | .commit_data = CommitMetadata {HybridTime::kMin}, |
131 | 1.25k | .source = CommitTimeSource::kNoMetadata, |
132 | 1.25k | }; |
133 | 1.25k | } |
134 | 140 | LOG(WARNING) |
135 | 140 | << "Failed to request transaction " << transaction_id << " status: " |
136 | 140 | << txn_status_result.status(); |
137 | 140 | if (!txn_status_result.status().IsTryAgain()) { |
138 | 0 | return std::move(txn_status_result.status()); |
139 | 0 | } |
140 | 140 | if (!waiter.Wait()) { |
141 | 0 | return StatusWaitTimedOut(transaction_id); |
142 | 0 | } |
143 | 179 | } else { |
144 | 179 | LOG(INFO) << "TXN: " << transaction_id << ": Timed out waiting txn status, waited: " |
145 | 179 | << MonoDelta(CoarseMonoClock::now() - wait_start) |
146 | 179 | << ", future status: " << to_underlying(future_status) |
147 | 179 | << ", left to deadline: " << MonoDelta(deadline_ - CoarseMonoClock::now()); |
148 | 179 | if (waiter.ExpiredNow()) { |
149 | 0 | return StatusWaitTimedOut(transaction_id); |
150 | 0 | } |
151 | 179 | waiter.NextAttempt(); |
152 | 179 | } |
153 | 319 | DCHECK(TEST_retry_allowed); |
154 | 319 | } |
155 | 18.4E | VLOG(4) << "Transaction_id " << transaction_id << " at " << read_time_ |
156 | 18.4E | << ": status: " << TransactionStatus_Name(txn_status.status) |
157 | 18.4E | << ", status_time: " << txn_status.status_time; |
158 | | // The transaction could have been committed and applied after the previous call to |
159 | | // GetLocalCommitData. In this case the coordinator does not know the transaction and responds |
160 | | // with ABORTED status, so we recheck whether it was committed locally. |
161 | 33.1k | if (txn_status.status == TransactionStatus::ABORTED) { |
162 | 10.2k | HybridTime safe_time; |
163 | 10.2k | if (txn_status.status_time && txn_status.status_time != HybridTime::kMax) { |
164 | | // It is possible that this node has not yet received APPLY, so we might not have |
165 | | // the local commit time even for a committed transaction. |
166 | | // Wait for safe time to be sure that APPLY was processed, if present. |
167 | | // See https://github.com/YugaByte/yugabyte-db/issues/7729 for details. |
168 | 10.2k | safe_time = VERIFY_RESULT(txn_context_opt_.txn_status_manager->WaitForSafeTime( |
169 | 10.2k | txn_status.status_time, deadline_)); |
170 | 10.2k | } |
171 | 10.2k | local_commit_data_opt = GetLocalCommitData(transaction_id); |
172 | 10.2k | if (local_commit_data_opt != boost::none) { |
173 | 0 | return GetCommitDataResult { |
174 | 0 | .commit_data = std::move(*local_commit_data_opt), |
175 | 0 | .source = CommitTimeSource::kLocalAfter, |
176 | 0 | .status_time = txn_status.status_time, |
177 | 0 | .safe_time = safe_time, |
178 | 0 | }; |
179 | 0 | } |
180 | | |
181 | 10.2k | return GetCommitDataResult { |
182 | 10.2k | .commit_data = CommitMetadata {HybridTime::kMin}, |
183 | 10.2k | .source = CommitTimeSource::kRemoteAborted, |
184 | 10.2k | .status_time = txn_status.status_time, |
185 | 10.2k | .safe_time = safe_time, |
186 | 10.2k | }; |
187 | 10.2k | } |
188 | | |
189 | 22.9k | if (txn_status.status == TransactionStatus::COMMITTED) { |
190 | 5.91k | return GetCommitDataResult { |
191 | 5.91k | .commit_data = CommitMetadata {txn_status.status_time, txn_status.aborted_subtxn_set}, |
192 | 5.91k | .source = CommitTimeSource::kRemoteCommitted, |
193 | 5.91k | }; |
194 | 5.91k | } |
195 | | |
196 | 17.0k | return GetCommitDataResult { |
197 | | // TODO(savepoints) - surface aborted subtxn data for pending transactions. |
198 | 17.0k | .commit_data = CommitMetadata {HybridTime::kMin}, |
199 | 17.0k | .source = CommitTimeSource::kRemotePending, |
200 | 17.0k | }; |
201 | 17.0k | } |
202 | | |
203 | | } // namespace docdb |
204 | | } // namespace yb |