/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_status_resolver.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tablet/transaction_status_resolver.h" |
15 | | |
16 | | #include "yb/client/transaction_rpc.h" |
17 | | |
18 | | #include "yb/common/wire_protocol.h" |
19 | | |
20 | | #include "yb/rpc/rpc.h" |
21 | | |
22 | | #include "yb/tablet/transaction_participant_context.h" |
23 | | |
24 | | #include "yb/tserver/tserver_service.pb.h" |
25 | | |
26 | | #include "yb/util/atomic.h" |
27 | | #include "yb/util/countdown_latch.h" |
28 | | #include "yb/util/flag_tags.h" |
29 | | #include "yb/util/logging.h" |
30 | | #include "yb/util/result.h" |
31 | | #include "yb/util/status_format.h" |
32 | | |
33 | | DEFINE_test_flag(int32, inject_status_resolver_delay_ms, 0, |
34 | | "Inject delay before launching transaction status resolver RPC."); |
35 | | |
36 | | DEFINE_test_flag(int32, inject_status_resolver_complete_delay_ms, 0, |
37 | | "Inject delay before counting down latch in transaction status resolver " |
38 | | "complete."); |
39 | | |
40 | | using namespace std::literals; |
41 | | using namespace std::placeholders; |
42 | | |
43 | | namespace yb { |
44 | | namespace tablet { |
45 | | |
46 | | class TransactionStatusResolver::Impl { |
47 | | public: |
48 | | Impl(TransactionParticipantContext* participant_context, rpc::Rpcs* rpcs, |
49 | | int max_transactions_per_request, TransactionStatusResolverCallback callback) |
50 | | : participant_context_(*participant_context), rpcs_(*rpcs), |
51 | | max_transactions_per_request_(max_transactions_per_request), callback_(std::move(callback)), |
52 | 428 | log_prefix_(participant_context->LogPrefix()), handle_(rpcs_.InvalidHandle()) {} |
53 | | |
54 | 428 | ~Impl() { |
55 | 12 | LOG_IF_WITH_PREFIX(DFATAL, !closing_.load(std::memory_order_acquire)) |
56 | 12 | << "Destroy resolver without Shutdown"; |
57 | 428 | } |
58 | | |
59 | 427 | void Shutdown() { |
60 | 427 | closing_.store(true, std::memory_order_release); |
61 | 419 | for (;;) { |
62 | 425 | if (run_latch_.WaitFor(10s)) { |
63 | 425 | break; |
64 | 425 | } |
65 | 18.4E | LOG_WITH_PREFIX(DFATAL) << "Long wait for transaction status resolver to shutdown"; |
66 | 18.4E | } |
67 | 427 | } |
68 | | |
69 | 423 | void Start(CoarseTimePoint deadline) { |
70 | 8 | VLOG_WITH_PREFIX(2) << "Start, queues: " << queues_.size(); |
71 | | |
72 | 423 | deadline_ = deadline; |
73 | 423 | run_latch_.Reset(1); |
74 | 423 | Execute(); |
75 | 423 | } |
76 | | |
77 | 0 | std::future<Status> ResultFuture() { |
78 | 0 | return result_promise_.get_future(); |
79 | 0 | } |
80 | | |
81 | 849 | bool Running() { |
82 | 849 | return run_latch_.count() != 0; |
83 | 849 | } |
84 | | |
85 | 423 | void Add(const TabletId& status_tablet, const TransactionId& transaction_id) { |
86 | 18.4E | LOG_IF(DFATAL, run_latch_.count()) << "Add while running"; |
87 | 423 | queues_[status_tablet].push_back(transaction_id); |
88 | 423 | } |
89 | | |
90 | | private: |
91 | 857 | void Execute() { |
92 | 4 | LOG_IF(DFATAL, !run_latch_.count()) << "Execute while running is false"; |
93 | | |
94 | 857 | if (CoarseMonoClock::now() >= deadline_) { |
95 | 0 | Complete(STATUS(TimedOut, "Timed out to resolve transaction statuses")); |
96 | 0 | return; |
97 | 0 | } |
98 | 857 | if (closing_.load(std::memory_order_acquire)) { |
99 | 0 | Complete(STATUS(Aborted, "Aborted because of shutdown")); |
100 | 0 | return; |
101 | 0 | } |
102 | 857 | if (queues_.empty() || max_transactions_per_request_ <= 0) { |
103 | 431 | Complete(Status::OK()); |
104 | 431 | return; |
105 | 431 | } |
106 | | |
107 | | // We access queues_ only while adding transaction and after that while resolving |
108 | | // transaction statuses, which is NOT concurrent. |
109 | | // So we could avoid doing synchronization here. |
110 | 426 | auto& tablet_id_and_queue = *queues_.begin(); |
111 | 426 | tserver::GetTransactionStatusRequestPB req; |
112 | 426 | req.set_tablet_id(tablet_id_and_queue.first); |
113 | 426 | req.set_propagated_hybrid_time(participant_context_.Now().ToUint64()); |
114 | 426 | const auto& tablet_queue = tablet_id_and_queue.second; |
115 | 426 | auto request_size = std::min<size_t>(max_transactions_per_request_, tablet_queue.size()); |
116 | 859 | for (size_t i = 0; i != request_size; ++i) { |
117 | 433 | const auto& txn_id = tablet_queue[i]; |
118 | 18.4E | VLOG_WITH_PREFIX(4) << "Checking txn status: " << txn_id; |
119 | 433 | req.add_transaction_id()->assign(pointer_cast<const char*>(txn_id.data()), txn_id.size()); |
120 | 433 | } |
121 | | |
122 | 426 | AtomicFlagSleepMs(&FLAGS_TEST_inject_status_resolver_delay_ms); |
123 | | |
124 | 426 | auto client = participant_context_.client_future().get(); |
125 | 432 | if (!client || !rpcs_.RegisterAndStart( |
126 | 432 | client::GetTransactionStatus( |
127 | 432 | std::min(deadline_, TransactionRpcDeadline()), |
128 | 432 | nullptr /* tablet */, |
129 | 432 | client, |
130 | 432 | &req, |
131 | 432 | std::bind(&Impl::StatusReceived, this, _1, _2, request_size)), |
132 | 0 | &handle_)) { |
133 | 0 | Complete(STATUS(Aborted, "Aborted because cannot start RPC")); |
134 | 0 | } |
135 | 426 | } |
136 | | |
137 | 0 | const std::string& LogPrefix() const { |
138 | 0 | return log_prefix_; |
139 | 0 | } |
140 | | |
141 | | void StatusReceived(Status status, |
142 | | const tserver::GetTransactionStatusResponsePB& response, |
143 | 432 | int request_size) { |
144 | 1 | VLOG_WITH_PREFIX(2) << "Received statuses: " << status << ", " << response.ShortDebugString(); |
145 | | |
146 | 432 | rpcs_.Unregister(&handle_); |
147 | | |
148 | 432 | if (status.ok() && response.has_error()) { |
149 | 0 | status = StatusFromPB(response.error().status()); |
150 | 0 | } |
151 | | |
152 | 432 | if (!status.ok()) { |
153 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to request transaction statuses: " << status; |
154 | 0 | if (status.IsAborted()) { |
155 | 0 | Complete(status); |
156 | 0 | } else { |
157 | 0 | Execute(); |
158 | 0 | } |
159 | 0 | return; |
160 | 0 | } |
161 | | |
162 | 432 | if (response.has_propagated_hybrid_time()) { |
163 | 428 | participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time())); |
164 | 428 | } |
165 | | |
166 | 432 | if ((response.status().size() != 1 && response.status().size() != request_size) || |
167 | 429 | (response.aborted_subtxn_set().size() != 1 && |
168 | 2 | response.aborted_subtxn_set().size() != request_size)) { |
169 | | // Node with old software version would always return 1 status. |
170 | 0 | LOG_WITH_PREFIX(DFATAL) |
171 | 0 | << "Bad response size, expected " << request_size << " entries, but found: " |
172 | 0 | << response.ShortDebugString() << ", queue: " << AsString(queues_); |
173 | 0 | Execute(); |
174 | 0 | return; |
175 | 0 | } |
176 | | |
177 | 432 | status_infos_.clear(); |
178 | 432 | status_infos_.resize(response.status().size()); |
179 | 432 | auto it = queues_.begin(); |
180 | 432 | auto& queue = it->second; |
181 | 866 | for (int i = 0; i != response.status().size(); ++i) { |
182 | 433 | auto& status_info = status_infos_[i]; |
183 | 433 | status_info.transaction_id = queue.front(); |
184 | 433 | status_info.status = response.status(i); |
185 | | |
186 | 433 | auto aborted_subtxn_set_or_status = AbortedSubTransactionSet::FromPB( |
187 | 433 | response.aborted_subtxn_set(i).set()); |
188 | 433 | if (!aborted_subtxn_set_or_status.ok()) { |
189 | 0 | Complete(STATUS_FORMAT( |
190 | 0 | IllegalState, "Cannot deserialize AbortedSubTransactionSet: $0", |
191 | 0 | response.aborted_subtxn_set(i).DebugString())); |
192 | 0 | return; |
193 | 0 | } |
194 | 433 | status_info.aborted_subtxn_set = aborted_subtxn_set_or_status.get(); |
195 | | |
196 | 434 | if (i < response.status_hybrid_time().size()) { |
197 | 434 | status_info.status_ht = HybridTime(response.status_hybrid_time(i)); |
198 | | // Could happen only when coordinator has an old version. |
199 | 18.4E | } else if (status_info.status == TransactionStatus::ABORTED) { |
200 | 0 | status_info.status_ht = HybridTime::kMax; |
201 | 18.4E | } else { |
202 | 18.4E | Complete(STATUS_FORMAT( |
203 | 18.4E | IllegalState, "Missing status hybrid time for transaction status: $0", |
204 | 18.4E | TransactionStatus_Name(status_info.status))); |
205 | 18.4E | return; |
206 | 18.4E | } |
207 | 434 | status_info.coordinator_safe_time = i < response.coordinator_safe_time().size() |
208 | 417 | ? HybridTime::FromPB(response.coordinator_safe_time(i)) : HybridTime(); |
209 | 18.4E | VLOG_WITH_PREFIX(4) << "Status: " << status_info.ToString(); |
210 | 434 | queue.pop_front(); |
211 | 434 | } |
212 | 433 | if (queue.empty()) { |
213 | 18.4E | VLOG_WITH_PREFIX(2) << "Processed queue for: " << it->first; |
214 | 429 | queues_.erase(it); |
215 | 429 | } |
216 | | |
217 | 433 | callback_(status_infos_); |
218 | | |
219 | 433 | Execute(); |
220 | 433 | } |
221 | | |
222 | 428 | void Complete(const Status& status) { |
223 | 18.4E | VLOG_WITH_PREFIX(2) << "Complete: " << status; |
224 | 428 | result_promise_.set_value(status); |
225 | 428 | AtomicFlagSleepMs(&FLAGS_TEST_inject_status_resolver_complete_delay_ms); |
226 | 428 | run_latch_.CountDown(); |
227 | 428 | } |
228 | | |
229 | | TransactionParticipantContext& participant_context_; |
230 | | rpc::Rpcs& rpcs_; |
231 | | const int max_transactions_per_request_; |
232 | | TransactionStatusResolverCallback callback_; |
233 | | |
234 | | const std::string log_prefix_; |
235 | | rpc::Rpcs::Handle handle_; |
236 | | |
237 | | std::atomic<bool> closing_{false}; |
238 | | CountDownLatch run_latch_{0}; |
239 | | CoarseTimePoint deadline_; |
240 | | std::unordered_map<TabletId, std::deque<TransactionId>> queues_; |
241 | | std::vector<TransactionStatusInfo> status_infos_; |
242 | | std::promise<Status> result_promise_; |
243 | | }; |
244 | | |
245 | | TransactionStatusResolver::TransactionStatusResolver( |
246 | | TransactionParticipantContext* participant_context, rpc::Rpcs* rpcs, |
247 | | int max_transactions_per_request, TransactionStatusResolverCallback callback) |
248 | | : impl_(new Impl( |
249 | 429 | participant_context, rpcs, max_transactions_per_request, std::move(callback))) { |
250 | 429 | } |
251 | | |
252 | 428 | TransactionStatusResolver::~TransactionStatusResolver() {} |
253 | | |
254 | 426 | void TransactionStatusResolver::Shutdown() { |
255 | 426 | impl_->Shutdown(); |
256 | 426 | } |
257 | | |
258 | | void TransactionStatusResolver::Add( |
259 | 432 | const TabletId& status_tablet, const TransactionId& transaction_id) { |
260 | 432 | impl_->Add(status_tablet, transaction_id); |
261 | 432 | } |
262 | | |
263 | 424 | void TransactionStatusResolver::Start(CoarseTimePoint deadline) { |
264 | 424 | impl_->Start(deadline); |
265 | 424 | } |
266 | | |
267 | 0 | std::future<Status> TransactionStatusResolver::ResultFuture() { |
268 | 0 | return impl_->ResultFuture(); |
269 | 0 | } |
270 | | |
271 | 856 | bool TransactionStatusResolver::Running() const { |
272 | 856 | return impl_->Running(); |
273 | 856 | } |
274 | | |
275 | | } // namespace tablet |
276 | | } // namespace yb |