/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_status_resolver.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tablet/transaction_status_resolver.h" |
15 | | |
16 | | #include "yb/client/transaction_rpc.h" |
17 | | |
18 | | #include "yb/common/wire_protocol.h" |
19 | | |
20 | | #include "yb/rpc/rpc.h" |
21 | | |
22 | | #include "yb/tablet/transaction_participant_context.h" |
23 | | |
24 | | #include "yb/tserver/tserver_service.pb.h" |
25 | | |
26 | | #include "yb/util/atomic.h" |
27 | | #include "yb/util/countdown_latch.h" |
28 | | #include "yb/util/flag_tags.h" |
29 | | #include "yb/util/logging.h" |
30 | | #include "yb/util/result.h" |
31 | | #include "yb/util/status_format.h" |
32 | | |
// Test-only flag: milliseconds to sleep (via AtomicFlagSleepMs) right before the resolver launches
// a GetTransactionStatus RPC.
DEFINE_test_flag(int32, inject_status_resolver_delay_ms, 0,
                 "Inject delay before launching transaction status resolver RPC.");

// Test-only flag: milliseconds to sleep (via AtomicFlagSleepMs) after the result promise has been
// fulfilled and before the run latch is counted down.
DEFINE_test_flag(int32, inject_status_resolver_complete_delay_ms, 0,
                 "Inject delay before counting down latch in transaction status resolver "
                 "complete.");

// Needed for chrono literals such as `10s`.
using namespace std::literals;
// Needed for the `_1`, `_2` placeholders passed to std::bind.
using namespace std::placeholders;
42 | | |
43 | | namespace yb { |
44 | | namespace tablet { |
45 | | |
46 | | class TransactionStatusResolver::Impl { |
47 | | public: |
48 | | Impl(TransactionParticipantContext* participant_context, rpc::Rpcs* rpcs, |
49 | | int max_transactions_per_request, TransactionStatusResolverCallback callback) |
50 | | : participant_context_(*participant_context), rpcs_(*rpcs), |
51 | | max_transactions_per_request_(max_transactions_per_request), callback_(std::move(callback)), |
52 | 1.69k | log_prefix_(participant_context->LogPrefix()), handle_(rpcs_.InvalidHandle()) {} |
53 | | |
54 | 1.65k | ~Impl() { |
55 | 1.65k | LOG_IF_WITH_PREFIX22 (DFATAL, !closing_.load(std::memory_order_acquire)) |
56 | 22 | << "Destroy resolver without Shutdown"; |
57 | 1.65k | } |
58 | | |
59 | 1.68k | void Shutdown() { |
60 | 1.68k | closing_.store(true, std::memory_order_release); |
61 | 1.68k | for (;;) { |
62 | 1.66k | if (run_latch_.WaitFor(10s)1.65k ) { |
63 | 1.66k | break; |
64 | 1.66k | } |
65 | 18.4E | LOG_WITH_PREFIX(DFATAL) << "Long wait for transaction status resolver to shutdown"; |
66 | 18.4E | } |
67 | 1.68k | } |
68 | | |
69 | 1.66k | void Start(CoarseTimePoint deadline) { |
70 | 1.66k | VLOG_WITH_PREFIX4 (2) << "Start, queues: " << queues_.size()4 ; |
71 | | |
72 | 1.66k | deadline_ = deadline; |
73 | 1.66k | run_latch_.Reset(1); |
74 | 1.66k | Execute(); |
75 | 1.66k | } |
76 | | |
77 | 15 | std::future<Status> ResultFuture() { |
78 | 15 | return result_promise_.get_future(); |
79 | 15 | } |
80 | | |
81 | 3.41k | bool Running() { |
82 | 3.41k | return run_latch_.count() != 0; |
83 | 3.41k | } |
84 | | |
85 | 1.73k | void Add(const TabletId& status_tablet, const TransactionId& transaction_id) { |
86 | 1.73k | LOG_IF(DFATAL, run_latch_.count()) << "Add while running"33 ; |
87 | 1.73k | queues_[status_tablet].push_back(transaction_id); |
88 | 1.73k | } |
89 | | |
90 | | private: |
91 | 3.41k | void Execute() { |
92 | 3.41k | LOG_IF(DFATAL, !run_latch_.count()) << "Execute while running is false"26 ; |
93 | | |
94 | 3.41k | if (CoarseMonoClock::now() >= deadline_) { |
95 | 0 | Complete(STATUS(TimedOut, "Timed out to resolve transaction statuses")); |
96 | 0 | return; |
97 | 0 | } |
98 | 3.41k | if (closing_.load(std::memory_order_acquire)) { |
99 | 0 | Complete(STATUS(Aborted, "Aborted because of shutdown")); |
100 | 0 | return; |
101 | 0 | } |
102 | 3.41k | if (queues_.empty() || max_transactions_per_request_ <= 01.67k ) { |
103 | 1.71k | Complete(Status::OK()); |
104 | 1.71k | return; |
105 | 1.71k | } |
106 | | |
107 | | // We access queues_ only while adding transaction and after that while resolving |
108 | | // transaction statuses, which is NOT concurrent. |
109 | | // So we could avoid doing synchronization here. |
110 | 1.69k | auto& tablet_id_and_queue = *queues_.begin(); |
111 | 1.69k | tserver::GetTransactionStatusRequestPB req; |
112 | 1.69k | req.set_tablet_id(tablet_id_and_queue.first); |
113 | 1.69k | req.set_propagated_hybrid_time(participant_context_.Now().ToUint64()); |
114 | 1.69k | const auto& tablet_queue = tablet_id_and_queue.second; |
115 | 1.69k | auto request_size = std::min<size_t>(max_transactions_per_request_, tablet_queue.size()); |
116 | 3.45k | for (size_t i = 0; i != request_size; ++i1.75k ) { |
117 | 1.75k | const auto& txn_id = tablet_queue[i]; |
118 | 1.75k | VLOG_WITH_PREFIX1 (4) << "Checking txn status: " << txn_id1 ; |
119 | 1.75k | req.add_transaction_id()->assign(pointer_cast<const char*>(txn_id.data()), txn_id.size()); |
120 | 1.75k | } |
121 | | |
122 | 1.69k | AtomicFlagSleepMs(&FLAGS_TEST_inject_status_resolver_delay_ms); |
123 | | |
124 | 1.69k | auto client = participant_context_.client_future().get(); |
125 | 1.72k | if (!client1.69k || !rpcs_.RegisterAndStart( |
126 | 1.72k | client::GetTransactionStatus( |
127 | 1.72k | std::min(deadline_, TransactionRpcDeadline()), |
128 | 1.72k | nullptr /* tablet */, |
129 | 1.72k | client, |
130 | 1.72k | &req, |
131 | 1.72k | std::bind(&Impl::StatusReceived, this, _1, _2, request_size)), |
132 | 1.72k | &handle_)) { |
133 | 0 | Complete(STATUS(Aborted, "Aborted because cannot start RPC")); |
134 | 0 | } |
135 | 1.69k | } |
136 | | |
137 | 7 | const std::string& LogPrefix() const { |
138 | 7 | return log_prefix_; |
139 | 7 | } |
140 | | |
141 | | void StatusReceived(Status status, |
142 | | const tserver::GetTransactionStatusResponsePB& response, |
143 | 1.71k | int request_size) { |
144 | 18.4E | VLOG_WITH_PREFIX(2) << "Received statuses: " << status << ", " << response.ShortDebugString(); |
145 | | |
146 | 1.71k | rpcs_.Unregister(&handle_); |
147 | | |
148 | 1.71k | if (status.ok() && response.has_error()1.71k ) { |
149 | 0 | status = StatusFromPB(response.error().status()); |
150 | 0 | } |
151 | | |
152 | 1.71k | if (!status.ok()) { |
153 | 7 | LOG_WITH_PREFIX(WARNING) << "Failed to request transaction statuses: " << status; |
154 | 7 | if (status.IsAborted()) { |
155 | 0 | Complete(status); |
156 | 7 | } else { |
157 | 7 | Execute(); |
158 | 7 | } |
159 | 7 | return; |
160 | 7 | } |
161 | | |
162 | 1.70k | if (response.has_propagated_hybrid_time()) { |
163 | 1.70k | participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time())); |
164 | 1.70k | } |
165 | | |
166 | 1.70k | if ((response.status().size() != 1 && response.status().size() != request_size16 ) || |
167 | 1.70k | (1.70k response.aborted_subtxn_set().size() != 01.70k && // Old node may not populate these. |
168 | 1.70k | response.aborted_subtxn_set().size() != request_size)) { |
169 | | // Node with old software version would always return 1 status. |
170 | 0 | LOG_WITH_PREFIX(DFATAL) |
171 | 0 | << "Bad response size, expected " << request_size << " entries, but found: " |
172 | 0 | << response.ShortDebugString() << ", queue: " << AsString(queues_); |
173 | 0 | Execute(); |
174 | 0 | return; |
175 | 0 | } |
176 | | |
177 | 1.70k | status_infos_.clear(); |
178 | 1.70k | status_infos_.resize(response.status().size()); |
179 | 1.70k | auto it = queues_.begin(); |
180 | 1.70k | auto& queue = it->second; |
181 | 3.43k | for (int i = 0; i != response.status().size(); ++i1.72k ) { |
182 | 1.73k | auto& status_info = status_infos_[i]; |
183 | 1.73k | status_info.transaction_id = queue.front(); |
184 | 1.73k | status_info.status = response.status(i); |
185 | | |
186 | 1.73k | if (PREDICT_FALSE(response.aborted_subtxn_set().empty())) { |
187 | 0 | YB_LOG_EVERY_N(WARNING, 1) |
188 | 0 | << "Empty aborted_subtxn_set in transaction status response. " |
189 | 0 | << "This should only happen when nodes are on different versions, e.g. during upgrade."; |
190 | 1.73k | } else { |
191 | 1.73k | auto aborted_subtxn_set_or_status = AbortedSubTransactionSet::FromPB( |
192 | 1.73k | response.aborted_subtxn_set(i).set()); |
193 | 1.73k | if (!aborted_subtxn_set_or_status.ok()) { |
194 | 0 | Complete(STATUS_FORMAT( |
195 | 0 | IllegalState, "Cannot deserialize AbortedSubTransactionSet: $0", |
196 | 0 | response.aborted_subtxn_set(i).DebugString())); |
197 | 0 | return; |
198 | 0 | } |
199 | 1.73k | status_info.aborted_subtxn_set = aborted_subtxn_set_or_status.get(); |
200 | 1.73k | } |
201 | | |
202 | 1.73k | if (i < response.status_hybrid_time().size()) { |
203 | 1.72k | status_info.status_ht = HybridTime(response.status_hybrid_time(i)); |
204 | | // Could happen only when coordinator has an old version. |
205 | 1.72k | } else if (2 status_info.status == TransactionStatus::ABORTED2 ) { |
206 | 0 | status_info.status_ht = HybridTime::kMax; |
207 | 2 | } else { |
208 | 2 | Complete(STATUS_FORMAT( |
209 | 2 | IllegalState, "Missing status hybrid time for transaction status: $0", |
210 | 2 | TransactionStatus_Name(status_info.status))); |
211 | 2 | return; |
212 | 2 | } |
213 | 1.72k | status_info.coordinator_safe_time = i < response.coordinator_safe_time().size() |
214 | 1.72k | ? HybridTime::FromPB(response.coordinator_safe_time(i))79 : HybridTime()1.65k ; |
215 | 18.4E | VLOG_WITH_PREFIX(4) << "Status: " << status_info.ToString(); |
216 | 1.72k | queue.pop_front(); |
217 | 1.72k | } |
218 | 1.70k | if (queue.empty()) { |
219 | 1.70k | VLOG_WITH_PREFIX0 (2) << "Processed queue for: " << it->first0 ; |
220 | 1.70k | queues_.erase(it); |
221 | 1.70k | } |
222 | | |
223 | 1.70k | callback_(status_infos_); |
224 | | |
225 | 1.70k | Execute(); |
226 | 1.70k | } |
227 | | |
228 | 1.70k | void Complete(const Status& status) { |
229 | 18.4E | VLOG_WITH_PREFIX(2) << "Complete: " << status; |
230 | 1.70k | result_promise_.set_value(status); |
231 | 1.70k | AtomicFlagSleepMs(&FLAGS_TEST_inject_status_resolver_complete_delay_ms); |
232 | 1.70k | run_latch_.CountDown(); |
233 | 1.70k | } |
234 | | |
235 | | TransactionParticipantContext& participant_context_; |
236 | | rpc::Rpcs& rpcs_; |
237 | | const int max_transactions_per_request_; |
238 | | TransactionStatusResolverCallback callback_; |
239 | | |
240 | | const std::string log_prefix_; |
241 | | rpc::Rpcs::Handle handle_; |
242 | | |
243 | | std::atomic<bool> closing_{false}; |
244 | | CountDownLatch run_latch_{0}; |
245 | | CoarseTimePoint deadline_; |
246 | | std::unordered_map<TabletId, std::deque<TransactionId>> queues_; |
247 | | std::vector<TransactionStatusInfo> status_infos_; |
248 | | std::promise<Status> result_promise_; |
249 | | }; |
250 | | |
// Creates the resolver implementation.
//   participant_context, rpcs - dereferenced and kept by reference by Impl, so both must be
//       non-null and must outlive this object.
//   max_transactions_per_request - upper bound on transaction ids sent in one status request.
//   callback - invoked with the statuses of each successfully processed response.
TransactionStatusResolver::TransactionStatusResolver(
    TransactionParticipantContext* participant_context, rpc::Rpcs* rpcs,
    int max_transactions_per_request, TransactionStatusResolverCallback callback)
    : impl_(new Impl(
        participant_context, rpcs, max_transactions_per_request, std::move(callback))) {
}
257 | | |
258 | 1.67k | TransactionStatusResolver::~TransactionStatusResolver() {} |
259 | | |
// Requests shutdown and blocks until the run in progress (if any) has completed.
// Impl's destructor reports DFATAL if this was not called first.
void TransactionStatusResolver::Shutdown() {
  impl_->Shutdown();
}
263 | | |
// Queues transaction_id to be resolved against status_tablet.
// Impl::Add reports DFATAL when called while a run is in progress.
void TransactionStatusResolver::Add(
    const TabletId& status_tablet, const TransactionId& transaction_id) {
  impl_->Add(status_tablet, transaction_id);
}
268 | | |
// Starts resolving the queued transactions. The run completes with TimedOut once `deadline`
// has passed.
void TransactionStatusResolver::Start(CoarseTimePoint deadline) {
  impl_->Start(deadline);
}
272 | | |
// Returns the future that receives the final status of the run.
// Backed by a std::promise, whose future can be obtained only once.
std::future<Status> TransactionStatusResolver::ResultFuture() {
  return impl_->ResultFuture();
}
276 | | |
// Returns true while a run started by Start() has not completed yet.
bool TransactionStatusResolver::Running() const {
  return impl_->Running();
}
280 | | |
281 | | } // namespace tablet |
282 | | } // namespace yb |