/Users/deen/code/yugabyte-db/src/yb/tablet/running_transaction.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tablet/running_transaction.h" |
15 | | |
16 | | #include "yb/client/transaction_rpc.h" |
17 | | |
18 | | #include "yb/common/hybrid_time.h" |
19 | | #include "yb/common/pgsql_error.h" |
20 | | |
21 | | #include "yb/tablet/transaction_participant_context.h" |
22 | | |
23 | | #include "yb/tserver/tserver_service.pb.h" |
24 | | |
25 | | #include "yb/util/flag_tags.h" |
26 | | #include "yb/util/logging.h" |
27 | | #include "yb/util/trace.h" |
28 | | #include "yb/util/tsan_util.h" |
29 | | #include "yb/util/yb_pg_errcodes.h" |
30 | | |
using namespace std::placeholders;
using namespace std::literals;

// Test-only knob: artificially delays handling of transaction status replies.
DEFINE_test_flag(uint64, transaction_delay_status_reply_usec_in_tests, 0,
                 "For tests only. Delay handling status reply by specified amount of usec.");

// How often a running transaction re-checks whether it was aborted.
// Scaled by kTimeMultiplier so sanitizer/debug builds get proportionally longer intervals.
DEFINE_int64(transaction_abort_check_interval_ms, 5000 * yb::kTimeMultiplier,
             "Interval to check whether running transaction was aborted.");

// Deadline applied to an in-flight abort-check status request.
DEFINE_int64(transaction_abort_check_timeout_ms, 30000 * yb::kTimeMultiplier,
             "Timeout used when checking for aborted transactions.");
42 | | |
43 | | namespace yb { |
44 | | namespace tablet { |
45 | | |
// Constructs the participant-side state for a transaction that has intents on this tablet.
// Takes ownership of `metadata` and `replicated_batches`; `context` must outlive this object
// (it owns the RPC registry, applier and participant context referenced below).
RunningTransaction::RunningTransaction(TransactionMetadata metadata,
                                       const TransactionalBatchData& last_batch_data,
                                       OneWayBitmap&& replicated_batches,
                                       HybridTime base_time_for_abort_check_ht_calculation,
                                       RunningTransactionContext* context)
    : metadata_(std::move(metadata)),
      last_batch_data_(last_batch_data),
      replicated_batches_(std::move(replicated_batches)),
      context_(*context),
      remove_intents_task_(&context->applier_, &context->participant_context_, context,
                           metadata_.transaction_id),
      get_status_handle_(context->rpcs_.InvalidHandle()),
      abort_handle_(context->rpcs_.InvalidHandle()),
      apply_intents_task_(&context->applier_, context, &apply_data_),
      // Schedule the first abort check one full check interval after the supplied base time.
      abort_check_ht_(base_time_for_abort_check_ht_calculation.AddDelta(
          1ms * FLAGS_transaction_abort_check_interval_ms)) {
}
63 | | |
RunningTransaction::~RunningTransaction() {
  // Cancel any in-flight status/abort RPCs so their callbacks cannot run against
  // a destroyed object.
  context_.rpcs_.Abort({&get_status_handle_, &abort_handle_});
}
67 | | |
68 | | void RunningTransaction::AddReplicatedBatch( |
69 | 2.31M | size_t batch_idx, boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) { |
70 | 2.31M | VLOG_WITH_PREFIX1.93k (4) << __func__ << "(" << batch_idx << ")"1.93k ; |
71 | 2.31M | replicated_batches_.Set(batch_idx); |
72 | 2.31M | encoded_replicated_batches->push_back(docdb::ValueTypeAsChar::kBitSet); |
73 | 2.31M | replicated_batches_.EncodeTo(encoded_replicated_batches); |
74 | 2.31M | } |
75 | | |
76 | 2.31M | void RunningTransaction::BatchReplicated(const TransactionalBatchData& value) { |
77 | 2.31M | VLOG_WITH_PREFIX190 (4) << __func__ << "(" << value.ToString() << ")"190 ; |
78 | 2.31M | last_batch_data_ = value; |
79 | 2.31M | } |
80 | | |
81 | | void RunningTransaction::SetLocalCommitData( |
82 | 1.30M | HybridTime time, const AbortedSubTransactionSet& aborted_subtxn_set) { |
83 | 1.30M | local_commit_aborted_subtxn_set_ = aborted_subtxn_set; |
84 | 1.30M | local_commit_time_ = time; |
85 | 1.30M | last_known_status_hybrid_time_ = local_commit_time_; |
86 | 1.30M | last_known_status_ = TransactionStatus::COMMITTED; |
87 | 1.30M | } |
88 | | |
89 | 0 | void RunningTransaction::Aborted() { |
90 | 0 | VLOG_WITH_PREFIX(4) << __func__ << "()"; |
91 | |
|
92 | 0 | last_known_status_ = TransactionStatus::ABORTED; |
93 | 0 | last_known_status_hybrid_time_ = HybridTime::kMax; |
94 | 0 | } |
95 | | |
96 | | void RunningTransaction::RequestStatusAt(const StatusRequest& request, |
97 | 404k | std::unique_lock<std::mutex>* lock) { |
98 | 404k | DCHECK_LE(request.global_limit_ht, HybridTime::kMax); |
99 | 404k | DCHECK_LE(request.read_ht, request.global_limit_ht); |
100 | | |
101 | 404k | if (last_known_status_hybrid_time_ > HybridTime::kMin) { |
102 | 224k | auto transaction_status = |
103 | 224k | GetStatusAt(request.global_limit_ht, last_known_status_hybrid_time_, last_known_status_); |
104 | | // If we don't have status at global_limit_ht, then we should request updated status. |
105 | 224k | if (transaction_status) { |
106 | 84.2k | HybridTime last_known_status_hybrid_time = last_known_status_hybrid_time_; |
107 | 84.2k | AbortedSubTransactionSet local_commit_aborted_subtxn_set; |
108 | 84.2k | if (transaction_status == TransactionStatus::COMMITTED) { |
109 | 29.9k | local_commit_aborted_subtxn_set = local_commit_aborted_subtxn_set_; |
110 | 29.9k | } |
111 | 84.2k | lock->unlock(); |
112 | 84.2k | request.callback(TransactionStatusResult{ |
113 | 84.2k | *transaction_status, last_known_status_hybrid_time, local_commit_aborted_subtxn_set}); |
114 | 84.2k | return; |
115 | 84.2k | } |
116 | 224k | } |
117 | 319k | bool was_empty = status_waiters_.empty(); |
118 | 319k | status_waiters_.push_back(request); |
119 | 319k | if (!was_empty) { |
120 | 45.8k | return; |
121 | 45.8k | } |
122 | 273k | auto request_id = context_.NextRequestIdUnlocked(); |
123 | 273k | auto shared_self = shared_from_this(); |
124 | | |
125 | 18.4E | VLOG_WITH_PREFIX(4) << Format( |
126 | 18.4E | "Existing status knowledge ($0, $1) does not satisfy requested: $2, sending: $3", |
127 | 18.4E | TransactionStatus_Name(last_known_status_), last_known_status_hybrid_time_, request, |
128 | 18.4E | request_id); |
129 | | |
130 | 273k | lock->unlock(); |
131 | 273k | SendStatusRequest(request_id, shared_self); |
132 | 273k | } |
133 | | |
134 | 1.41M | bool RunningTransaction::WasAborted() const { |
135 | 1.41M | return last_known_status_ == TransactionStatus::ABORTED; |
136 | 1.41M | } |
137 | | |
138 | 1.23M | CHECKED_STATUS RunningTransaction::CheckAborted() const { |
139 | 1.23M | if (WasAborted()) { |
140 | 855 | return MakeAbortedStatus(id()); |
141 | 855 | } |
142 | 1.23M | return Status::OK(); |
143 | 1.23M | } |
144 | | |
145 | | void RunningTransaction::Abort(client::YBClient* client, |
146 | | TransactionStatusCallback callback, |
147 | 56.1k | std::unique_lock<std::mutex>* lock) { |
148 | 56.1k | if (last_known_status_ == TransactionStatus::ABORTED || |
149 | 56.1k | last_known_status_ == TransactionStatus::COMMITTED55.6k ) { |
150 | | // Transaction is already in final state, so no reason to send abort request. |
151 | 2.48k | VLOG_WITH_PREFIX0 (3) << "Abort shortcut: " << last_known_status_0 ; |
152 | 2.48k | TransactionStatusResult status{last_known_status_, last_known_status_hybrid_time_}; |
153 | 2.48k | lock->unlock(); |
154 | 2.48k | callback(status); |
155 | 2.48k | return; |
156 | 2.48k | } |
157 | 53.6k | bool was_empty = abort_waiters_.empty(); |
158 | 53.6k | abort_waiters_.push_back(std::move(callback)); |
159 | 53.6k | lock->unlock(); |
160 | 53.6k | VLOG_WITH_PREFIX1 (3) << "Abort request: " << was_empty1 ; |
161 | 53.6k | if (!was_empty) { |
162 | 10.8k | return; |
163 | 10.8k | } |
164 | 42.7k | tserver::AbortTransactionRequestPB req; |
165 | 42.7k | req.set_tablet_id(metadata_.status_tablet); |
166 | 42.7k | req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size()); |
167 | 42.7k | req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64()); |
168 | 42.7k | context_.rpcs_.RegisterAndStart( |
169 | 42.7k | client::AbortTransaction( |
170 | 42.7k | TransactionRpcDeadline(), |
171 | 42.7k | nullptr /* tablet */, |
172 | 42.7k | client, |
173 | 42.7k | &req, |
174 | 42.7k | std::bind(&RunningTransaction::AbortReceived, this, _1, _2, shared_from_this())), |
175 | 42.7k | &abort_handle_); |
176 | 42.7k | } |
177 | | |
178 | 0 | std::string RunningTransaction::ToString() const { |
179 | 0 | return Format("{ metadata: $0 last_batch_data: $1 replicated_batches: $2 local_commit_time: $3 " |
180 | 0 | "last_known_status: $4 last_known_status_hybrid_time: $5 }", |
181 | 0 | metadata_, last_batch_data_, replicated_batches_, local_commit_time_, |
182 | 0 | TransactionStatus_Name(last_known_status_), last_known_status_hybrid_time_); |
183 | 0 | } |
184 | | |
185 | 1.67M | void RunningTransaction::ScheduleRemoveIntents(const RunningTransactionPtr& shared_self) { |
186 | 1.67M | if (remove_intents_task_.Prepare(shared_self)) { |
187 | 1.66M | context_.participant_context_.StrandEnqueue(&remove_intents_task_); |
188 | 18.4E | VLOG_WITH_PREFIX(1) << "Intents should be removed asynchronously"; |
189 | 1.66M | } |
190 | 1.67M | } |
191 | | |
192 | | boost::optional<TransactionStatus> RunningTransaction::GetStatusAt( |
193 | | HybridTime time, |
194 | | HybridTime last_known_status_hybrid_time, |
195 | 544k | TransactionStatus last_known_status) { |
196 | 544k | switch (last_known_status) { |
197 | 110k | case TransactionStatus::ABORTED: |
198 | 110k | return TransactionStatus::ABORTED; |
199 | 79.1k | case TransactionStatus::COMMITTED: |
200 | 79.1k | return last_known_status_hybrid_time > time |
201 | 79.1k | ? TransactionStatus::PENDING2.88k |
202 | 79.1k | : TransactionStatus::COMMITTED76.2k ; |
203 | 355k | case TransactionStatus::PENDING: |
204 | 355k | if (last_known_status_hybrid_time >= time) { |
205 | 154k | return TransactionStatus::PENDING; |
206 | 154k | } |
207 | 200k | return boost::none; |
208 | 0 | default: |
209 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, last_known_status); |
210 | 544k | } |
211 | 544k | } |
212 | | |
213 | | void RunningTransaction::SendStatusRequest( |
214 | 273k | int64_t serial_no, const RunningTransactionPtr& shared_self) { |
215 | 273k | TRACE_FUNC(); |
216 | 273k | VTRACE(1, yb::ToString(metadata_.transaction_id)); |
217 | 273k | tserver::GetTransactionStatusRequestPB req; |
218 | 273k | req.set_tablet_id(metadata_.status_tablet); |
219 | 273k | req.add_transaction_id()->assign( |
220 | 273k | pointer_cast<const char*>(metadata_.transaction_id.data()), metadata_.transaction_id.size()); |
221 | 273k | req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64()); |
222 | 273k | context_.rpcs_.RegisterAndStart( |
223 | 273k | client::GetTransactionStatus( |
224 | 273k | TransactionRpcDeadline(), |
225 | 273k | nullptr /* tablet */, |
226 | 273k | context_.participant_context_.client_future().get(), |
227 | 273k | &req, |
228 | 273k | std::bind(&RunningTransaction::StatusReceived, this, _1, _2, serial_no, shared_self)), |
229 | 273k | &get_status_handle_); |
230 | 273k | } |
231 | | |
232 | | void RunningTransaction::StatusReceived( |
233 | | const Status& status, |
234 | | const tserver::GetTransactionStatusResponsePB& response, |
235 | | int64_t serial_no, |
236 | 273k | const RunningTransactionPtr& shared_self) { |
237 | 273k | auto delay_usec = FLAGS_TEST_transaction_delay_status_reply_usec_in_tests; |
238 | 273k | if (delay_usec > 0) { |
239 | 0 | context_.delayer().Delay( |
240 | 0 | MonoTime::Now() + MonoDelta::FromMicroseconds(delay_usec), |
241 | 0 | std::bind(&RunningTransaction::DoStatusReceived, this, status, response, |
242 | 0 | serial_no, shared_self)); |
243 | 273k | } else { |
244 | 273k | DoStatusReceived(status, response, serial_no, shared_self); |
245 | 273k | } |
246 | 273k | } |
247 | | |
248 | | bool RunningTransaction::UpdateStatus( |
249 | | TransactionStatus transaction_status, HybridTime time_of_status, |
250 | 301k | HybridTime coordinator_safe_time, AbortedSubTransactionSet aborted_subtxn_set) { |
251 | | // Check for local_commit_time_ is not required for correctness, but useful for optimization. |
252 | | // So we could avoid unnecessary actions. |
253 | 301k | if (local_commit_time_) { |
254 | 10.3k | return false; |
255 | 10.3k | } |
256 | | |
257 | 291k | if (transaction_status == TransactionStatus::COMMITTED) { |
258 | 31.5k | local_commit_aborted_subtxn_set_ = aborted_subtxn_set; |
259 | 31.5k | } |
260 | | |
261 | 291k | if (transaction_status == TransactionStatus::ABORTED && coordinator_safe_time77.8k ) { |
262 | 77.8k | time_of_status = coordinator_safe_time; |
263 | 77.8k | } |
264 | 291k | last_known_status_hybrid_time_ = time_of_status; |
265 | | |
266 | 291k | if (transaction_status == last_known_status_) { |
267 | 104k | return false; |
268 | 104k | } |
269 | | |
270 | 187k | last_known_status_ = transaction_status; |
271 | | |
272 | 187k | return transaction_status == TransactionStatus::ABORTED; |
273 | 291k | } |
274 | | |
// Processes a GetTransactionStatus reply: updates the hybrid clock, validates and
// decodes the response, merges it into cached state (under context_.mutex_), extracts
// the waiters that can now be answered, and notifies them outside the lock. If waiters
// remain unanswered, a follow-up status request is sent.
void RunningTransaction::DoStatusReceived(const Status& status,
                                          const tserver::GetTransactionStatusResponsePB& response,
                                          int64_t serial_no,
                                          const RunningTransactionPtr& shared_self) {
  TRACE("$0: $1", __func__, response.ShortDebugString());
  VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", " << response.ShortDebugString() << ", "
                      << serial_no << ")";

  if (response.has_propagated_hybrid_time()) {
    context_.participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time()));
  }

  context_.rpcs_.Unregister(&get_status_handle_);
  decltype(status_waiters_) status_waiters;
  HybridTime time_of_status = HybridTime::kMin;
  TransactionStatus transaction_status = TransactionStatus::PENDING;
  AbortedSubTransactionSet aborted_subtxn_set;
  const bool ok = status.ok();
  int64_t new_request_id = -1;
  {
    MinRunningNotifier min_running_notifier(&context_.applier_);
    std::unique_lock<std::mutex> lock(context_.mutex_);
    if (!ok) {
      // RPC failed: fail every waiter with the RPC status (callbacks run unlocked).
      status_waiters_.swap(status_waiters);
      lock.unlock();
      for (const auto& waiter : status_waiters) {
        waiter.callback(status);
      }
      return;
    }

    // Validate response shape: exactly one status + status time; the aborted subtxn
    // set may be absent only when talking to an older-version node.
    if (response.status_hybrid_time().size() != 1 ||
        response.status().size() != 1 ||
        (response.aborted_subtxn_set().size() != 0 && response.aborted_subtxn_set().size() != 1)) {
      LOG_WITH_PREFIX(DFATAL)
          << "Wrong number of status, status hybrid time, or aborted subtxn set entries, "
          << "exactly one entry expected: "
          << response.ShortDebugString();
    } else if (PREDICT_FALSE(response.aborted_subtxn_set().empty())) {
      YB_LOG_EVERY_N(WARNING, 1)
          << "Empty aborted_subtxn_set in transaction status response. "
          << "This should only happen when nodes are on different versions, e.g. during upgrade.";
    } else {
      auto aborted_subtxn_set_or_status = AbortedSubTransactionSet::FromPB(
          response.aborted_subtxn_set(0).set());
      if (aborted_subtxn_set_or_status.ok()) {
        time_of_status = HybridTime(response.status_hybrid_time()[0]);
        transaction_status = response.status(0);
        aborted_subtxn_set = aborted_subtxn_set_or_status.get();
      } else {
        LOG_WITH_PREFIX(DFATAL)
            << "Could not deserialize AbortedSubTransactionSet: "
            << "error - " << aborted_subtxn_set_or_status.status().ToString()
            << " response - " << response.ShortDebugString();
      }
    }

    LOG_IF_WITH_PREFIX(DFATAL, response.coordinator_safe_time().size() > 1)
        << "Wrong number of coordinator safe time entries, at most one expected: "
        << response.ShortDebugString();
    auto coordinator_safe_time = response.coordinator_safe_time().size() == 1
        ? HybridTime::FromPB(response.coordinator_safe_time(0)) : HybridTime();
    auto did_abort_txn = UpdateStatus(
        transaction_status, time_of_status, coordinator_safe_time, aborted_subtxn_set);
    if (did_abort_txn) {
      // Transaction just became aborted: schedule its removal from the participant.
      context_.EnqueueRemoveUnlocked(id(), RemoveReason::kStatusReceived, &min_running_notifier);
    }

    // Re-read the merged state: UpdateStatus may have refined what we pass to waiters.
    time_of_status = last_known_status_hybrid_time_;
    transaction_status = last_known_status_;
    aborted_subtxn_set = local_commit_aborted_subtxn_set_;

    status_waiters = ExtractFinishedStatusWaitersUnlocked(
        serial_no, time_of_status, transaction_status);
    if (!status_waiters_.empty()) {
      // Some waiters still need a newer status: send another request after unlocking.
      new_request_id = context_.NextRequestIdUnlocked();
      VLOG_WITH_PREFIX(4) << "Waiters still present, send new status request: " << new_request_id;
    }
  }
  if (new_request_id >= 0) {
    SendStatusRequest(new_request_id, shared_self);
  }
  NotifyWaiters(serial_no, time_of_status, transaction_status, aborted_subtxn_set, status_waiters);
}
359 | | |
360 | | std::vector<StatusRequest> RunningTransaction::ExtractFinishedStatusWaitersUnlocked( |
361 | 274k | int64_t serial_no, HybridTime time_of_status, TransactionStatus transaction_status) { |
362 | 274k | if (transaction_status == TransactionStatus::ABORTED) { |
363 | 57.6k | return std::move(status_waiters_); |
364 | 57.6k | } |
365 | 216k | std::vector<StatusRequest> result; |
366 | 216k | result.reserve(status_waiters_.size()); |
367 | 216k | auto w = status_waiters_.begin(); |
368 | 477k | for (auto it = status_waiters_.begin(); it != status_waiters_.end(); ++it261k ) { |
369 | 261k | if (it->serial_no <= serial_no || |
370 | 261k | GetStatusAt(it->global_limit_ht, time_of_status, transaction_status)78 || |
371 | 261k | time_of_status < it->read_ht51 ) { |
372 | 261k | result.push_back(std::move(*it)); |
373 | 261k | } else { |
374 | 48 | if (w != it) { |
375 | 37 | *w = std::move(*it); |
376 | 37 | } |
377 | 48 | ++w; |
378 | 48 | } |
379 | 261k | } |
380 | 216k | status_waiters_.erase(w, status_waiters_.end()); |
381 | 216k | return result; |
382 | 274k | } |
383 | | |
384 | | void RunningTransaction::NotifyWaiters(int64_t serial_no, HybridTime time_of_status, |
385 | | TransactionStatus transaction_status, |
386 | | const AbortedSubTransactionSet& aborted_subtxn_set, |
387 | 273k | const std::vector<StatusRequest>& status_waiters) { |
388 | 319k | for (const auto& waiter : status_waiters) { |
389 | 319k | auto status_for_waiter = GetStatusAt( |
390 | 319k | waiter.global_limit_ht, time_of_status, transaction_status); |
391 | 319k | if (status_for_waiter) { |
392 | | // We know status at global_limit_ht, so could notify waiter. |
393 | 259k | auto result = TransactionStatusResult{*status_for_waiter, time_of_status}; |
394 | 259k | if (result.status == TransactionStatus::COMMITTED) { |
395 | 46.2k | result.aborted_subtxn_set = aborted_subtxn_set; |
396 | 46.2k | } |
397 | 259k | waiter.callback(std::move(result)); |
398 | 259k | } else if (59.9k time_of_status >= waiter.read_ht59.9k ) { |
399 | | // It means that between read_ht and global_limit_ht transaction was pending. |
400 | | // It implies that transaction was not committed before request was sent. |
401 | | // We could safely respond PENDING to caller. |
402 | 11.0k | LOG_IF_WITH_PREFIX0 (DFATAL, waiter.serial_no > serial_no) |
403 | 0 | << "Notify waiter with request id greater than id of status request: " |
404 | 0 | << waiter.serial_no << " vs " << serial_no; |
405 | 11.0k | waiter.callback(TransactionStatusResult{TransactionStatus::PENDING, time_of_status}); |
406 | 48.9k | } else { |
407 | 48.9k | waiter.callback(STATUS(TryAgain, |
408 | 48.9k | Format("Cannot determine transaction status with read_ht $0, and global_limit_ht $1, " |
409 | 48.9k | "last known: $2 at $3", waiter.read_ht, waiter.global_limit_ht, |
410 | 48.9k | TransactionStatus_Name(transaction_status), time_of_status), Slice(), |
411 | 48.9k | PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE) )); |
412 | 48.9k | } |
413 | 319k | } |
414 | 273k | } |
415 | | |
416 | | Result<TransactionStatusResult> RunningTransaction::MakeAbortResult( |
417 | | const Status& status, |
418 | 42.7k | const tserver::AbortTransactionResponsePB& response) { |
419 | 42.7k | if (!status.ok()) { |
420 | 0 | return status; |
421 | 0 | } |
422 | | |
423 | 42.7k | HybridTime status_time = response.has_status_hybrid_time() |
424 | 42.7k | ? HybridTime(response.status_hybrid_time())22.5k |
425 | 42.7k | : HybridTime::kInvalid20.1k ; |
426 | 42.7k | return TransactionStatusResult{response.status(), status_time, AbortedSubTransactionSet()}; |
427 | 42.7k | } |
428 | | |
429 | | void RunningTransaction::AbortReceived(const Status& status, |
430 | | const tserver::AbortTransactionResponsePB& response, |
431 | 42.7k | const RunningTransactionPtr& shared_self) { |
432 | 42.7k | if (response.has_propagated_hybrid_time()) { |
433 | 42.7k | context_.participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time())); |
434 | 42.7k | } |
435 | | |
436 | 42.7k | decltype(abort_waiters_) abort_waiters; |
437 | 42.7k | auto result = MakeAbortResult(status, response); |
438 | | |
439 | 42.7k | VLOG_WITH_PREFIX3 (3) << "AbortReceived: " << yb::ToString(result)3 ; |
440 | | |
441 | 42.7k | { |
442 | 42.7k | MinRunningNotifier min_running_notifier(&context_.applier_); |
443 | 42.7k | std::lock_guard<std::mutex> lock(context_.mutex_); |
444 | 42.7k | context_.rpcs_.Unregister(&abort_handle_); |
445 | 42.7k | abort_waiters_.swap(abort_waiters); |
446 | | // kMax status_time means that this status is not yet replicated and could be rejected. |
447 | | // So we could use it as reply to Abort, but cannot store it as transaction status. |
448 | 42.7k | if (result.ok()42.7k && result->status_time != HybridTime::kMax) { |
449 | 26.0k | auto coordinator_safe_time = HybridTime::FromPB(response.coordinator_safe_time()); |
450 | 26.0k | if (UpdateStatus( |
451 | 26.0k | result->status, result->status_time, coordinator_safe_time, result->aborted_subtxn_set)) { |
452 | 19.7k | context_.EnqueueRemoveUnlocked(id(), RemoveReason::kAbortReceived, &min_running_notifier); |
453 | 19.7k | } |
454 | 26.0k | } |
455 | 42.7k | } |
456 | 53.6k | for (const auto& waiter : abort_waiters) { |
457 | 53.6k | waiter(result); |
458 | 53.6k | } |
459 | 42.7k | } |
460 | | |
461 | 0 | std::string RunningTransaction::LogPrefix() const { |
462 | 0 | return Format( |
463 | 0 | "$0 ID $1: ", context_.LogPrefix().substr(0, context_.LogPrefix().length() - 2), id()); |
464 | 0 | } |
465 | | |
466 | 859 | Status MakeAbortedStatus(const TransactionId& id) { |
467 | 859 | return STATUS( |
468 | 859 | TryAgain, Format("Transaction aborted: $0", id), Slice(), |
469 | 859 | PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)); |
470 | 859 | } |
471 | | |
// Updates the large-transaction apply checkpoint. While the apply state is active the
// processing_apply_ flag is raised (release ordering pairs with ProcessingApply's
// acquire load). When `data` is provided, an asynchronous apply-intents task is
// scheduled; when the state becomes inactive, the transaction is removed.
void RunningTransaction::SetApplyData(const docdb::ApplyTransactionState& apply_state,
                                      const TransactionApplyData* data,
                                      ScopedRWOperation* operation) {
  // TODO(savepoints): Add test to ensure that apply_state.aborted is properly set here.
  apply_state_ = apply_state;
  bool active = apply_state_.active();
  if (active) {
    // We are trying to assign set processing apply before starting actual process, and unset
    // after we complete processing.
    processing_apply_.store(true, std::memory_order_release);
  }

  if (data) {
    if (!active) {
      // Inconsistent input: apply data supplied although there is nothing left to apply.
      LOG_WITH_PREFIX(DFATAL)
          << "Starting processing apply, but provided data in inactive state: " << data->ToString();
      return;
    }

    apply_data_ = *data;
    // Point the task's data at our own (just updated) apply state.
    apply_data_.apply_state = &apply_state_;

    LOG_IF_WITH_PREFIX(DFATAL, local_commit_time_ != data->commit_ht)
        << "Commit time does not match: " << local_commit_time_ << " vs " << data->commit_ht;

    if (apply_intents_task_.Prepare(shared_from_this(), operation)) {
      context_.participant_context_.StrandEnqueue(&apply_intents_task_);
    } else {
      LOG_WITH_PREFIX(DFATAL) << "Unable to prepare apply intents task";
    }
  }

  if (!active) {
    // Apply finished: drop the flag, then remove this transaction from the participant.
    processing_apply_.store(false, std::memory_order_release);

    VLOG_WITH_PREFIX(3) << "Finished applying intents";

    MinRunningNotifier min_running_notifier(&context_.applier_);
    std::lock_guard<std::mutex> lock(context_.mutex_);
    context_.RemoveUnlocked(id(), RemoveReason::kLargeApplied, &min_running_notifier);
  }
}
514 | | |
515 | 1.30M | void RunningTransaction::SetOpId(const OpId& id) { |
516 | 1.30M | opId.index = id.index; |
517 | 1.30M | opId.term = id.term; |
518 | 1.30M | } |
519 | | |
520 | 2.41M | bool RunningTransaction::ProcessingApply() const { |
521 | 2.41M | return processing_apply_.load(std::memory_order_acquire); |
522 | 2.41M | } |
523 | | |
524 | 3.39k | void RunningTransaction::UpdateAbortCheckHT(HybridTime now, UpdateAbortCheckHTMode mode) { |
525 | 3.39k | if (last_known_status_ == TransactionStatus::ABORTED || |
526 | 3.39k | last_known_status_ == TransactionStatus::COMMITTED3.35k ) { |
527 | 199 | abort_check_ht_ = HybridTime::kMax; |
528 | 199 | return; |
529 | 199 | } |
530 | | // When we send a status request, we schedule the transaction status to be re-checked around the |
531 | | // same time the request is supposed to time out. When we get a status response (normal case, no |
532 | | // timeout), we go back to the normal interval of re-checking the status of this transaction. |
533 | 3.19k | auto delta_ms = mode == UpdateAbortCheckHTMode::kStatusRequestSent |
534 | 3.19k | ? FLAGS_transaction_abort_check_timeout_ms1.59k |
535 | 3.19k | : FLAGS_transaction_abort_check_interval_ms1.59k ; |
536 | 3.19k | abort_check_ht_ = now.AddDelta(1ms * delta_ms); |
537 | 3.19k | } |
538 | | |
539 | | } // namespace tablet |
540 | | } // namespace yb |