/Users/deen/code/yugabyte-db/src/yb/tablet/running_transaction.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tablet/running_transaction.h" |
15 | | |
16 | | #include "yb/client/transaction_rpc.h" |
17 | | |
18 | | #include "yb/common/hybrid_time.h" |
19 | | #include "yb/common/pgsql_error.h" |
20 | | |
21 | | #include "yb/tablet/transaction_participant_context.h" |
22 | | |
23 | | #include "yb/tserver/tserver_service.pb.h" |
24 | | |
25 | | #include "yb/util/flag_tags.h" |
26 | | #include "yb/util/logging.h" |
27 | | #include "yb/util/trace.h" |
28 | | #include "yb/util/tsan_util.h" |
29 | | #include "yb/util/yb_pg_errcodes.h" |
30 | | |
31 | | using namespace std::placeholders; |
32 | | using namespace std::literals; |
33 | | |
34 | | DEFINE_test_flag(uint64, transaction_delay_status_reply_usec_in_tests, 0, |
35 | | "For tests only. Delay handling status reply by specified amount of usec."); |
36 | | |
37 | | DEFINE_int64(transaction_abort_check_interval_ms, 5000 * yb::kTimeMultiplier, |
38 | | "Interval to check whether running transaction was aborted."); |
39 | | |
40 | | DEFINE_int64(transaction_abort_check_timeout_ms, 30000 * yb::kTimeMultiplier, |
41 | | "Timeout used when checking for aborted transactions."); |
42 | | |
43 | | namespace yb { |
44 | | namespace tablet { |
45 | | |
// Constructs tracking state for a transaction that has written intents on this tablet.
//
// metadata                                 - transaction metadata (id, status tablet, ...);
//                                            moved into this object.
// last_batch_data                          - data of the latest replicated write batch.
// replicated_batches                       - bitmap of batch indexes already replicated.
// base_time_for_abort_check_ht_calculation - base hybrid time from which the first abort
//                                            check is scheduled.
// context                                  - owning participant context; must outlive this
//                                            object (held by reference).
RunningTransaction::RunningTransaction(TransactionMetadata metadata,
                                       const TransactionalBatchData& last_batch_data,
                                       OneWayBitmap&& replicated_batches,
                                       HybridTime base_time_for_abort_check_ht_calculation,
                                       RunningTransactionContext* context)
    : metadata_(std::move(metadata)),
      last_batch_data_(last_batch_data),
      replicated_batches_(std::move(replicated_batches)),
      context_(*context),
      remove_intents_task_(&context->applier_, &context->participant_context_, context,
                           metadata_.transaction_id),
      get_status_handle_(context->rpcs_.InvalidHandle()),
      abort_handle_(context->rpcs_.InvalidHandle()),
      apply_intents_task_(&context->applier_, context, &apply_data_),
      // First abort check happens one check interval after the provided base time.
      abort_check_ht_(base_time_for_abort_check_ht_calculation.AddDelta(
          1ms * FLAGS_transaction_abort_check_interval_ms)) {
}
63 | | |
RunningTransaction::~RunningTransaction() {
  // Abort any in-flight status/abort RPCs so their callbacks cannot fire after destruction.
  context_.rpcs_.Abort({&get_status_handle_, &abort_handle_});
}
67 | | |
// Marks batch_idx as replicated and appends the updated bitmap encoding to
// encoded_replicated_batches (a kBitSet marker byte followed by the bitmap bytes).
void RunningTransaction::AddReplicatedBatch(
    size_t batch_idx, boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) {
  VLOG_WITH_PREFIX(4) << __func__ << "(" << batch_idx << ")";
  replicated_batches_.Set(batch_idx);
  // Marker byte must precede the bitmap so readers can recognize the encoding.
  encoded_replicated_batches->push_back(docdb::ValueTypeAsChar::kBitSet);
  replicated_batches_.EncodeTo(encoded_replicated_batches);
}
75 | | |
// Records the data of the most recently replicated write batch for this transaction.
void RunningTransaction::BatchReplicated(const TransactionalBatchData& value) {
  VLOG_WITH_PREFIX(4) << __func__ << "(" << value.ToString() << ")";
  last_batch_data_ = value;
}
80 | | |
// Records that this transaction committed locally at `time` with the given set of aborted
// subtransactions, and updates the cached status to COMMITTED so later status requests can
// be answered without an RPC.
void RunningTransaction::SetLocalCommitData(
    HybridTime time, const AbortedSubTransactionSet& aborted_subtxn_set) {
  local_commit_aborted_subtxn_set_ = aborted_subtxn_set;
  local_commit_time_ = time;
  last_known_status_hybrid_time_ = local_commit_time_;
  last_known_status_ = TransactionStatus::COMMITTED;
}
88 | | |
// Marks this transaction as aborted.
void RunningTransaction::Aborted() {
  VLOG_WITH_PREFIX(4) << __func__ << "()";

  last_known_status_ = TransactionStatus::ABORTED;
  // kMax means the ABORTED verdict is valid at any read time (abort is final).
  last_known_status_hybrid_time_ = HybridTime::kMax;
}
95 | | |
// Resolves the transaction status as of request.global_limit_ht.
//
// If the cached status is sufficient, invokes request.callback immediately (after releasing
// *lock). Otherwise queues the request; only the first waiter triggers an actual status RPC —
// subsequent waiters piggyback on the in-flight request.
//
// Precondition: *lock holds context_.mutex_. The lock is released before invoking callbacks
// or sending RPCs to avoid re-entrancy/deadlock.
void RunningTransaction::RequestStatusAt(const StatusRequest& request,
                                         std::unique_lock<std::mutex>* lock) {
  DCHECK_LE(request.global_limit_ht, HybridTime::kMax);
  DCHECK_LE(request.read_ht, request.global_limit_ht);

  // kMin means we have no status knowledge at all yet.
  if (last_known_status_hybrid_time_ > HybridTime::kMin) {
    auto transaction_status =
        GetStatusAt(request.global_limit_ht, last_known_status_hybrid_time_, last_known_status_);
    // If we don't have status at global_limit_ht, then we should request updated status.
    if (transaction_status) {
      // Copy fields before unlocking: they may change once the mutex is released.
      HybridTime last_known_status_hybrid_time = last_known_status_hybrid_time_;
      AbortedSubTransactionSet local_commit_aborted_subtxn_set;
      if (transaction_status == TransactionStatus::COMMITTED) {
        local_commit_aborted_subtxn_set = local_commit_aborted_subtxn_set_;
      }
      lock->unlock();
      request.callback(TransactionStatusResult{
          *transaction_status, last_known_status_hybrid_time, local_commit_aborted_subtxn_set});
      return;
    }
  }
  bool was_empty = status_waiters_.empty();
  status_waiters_.push_back(request);
  if (!was_empty) {
    // A status request is already in flight; this waiter will be notified with its result.
    return;
  }
  auto request_id = context_.NextRequestIdUnlocked();
  // Keep the transaction alive while the RPC is outstanding.
  auto shared_self = shared_from_this();

  VLOG_WITH_PREFIX(4) << Format(
      "Existing status knowledge ($0, $1) does not satisfy requested: $2, sending: $3",
      TransactionStatus_Name(last_known_status_), last_known_status_hybrid_time_, request,
      request_id);

  lock->unlock();
  SendStatusRequest(request_id, shared_self);
}
133 | | |
134 | 586k | bool RunningTransaction::WasAborted() const { |
135 | 586k | return last_known_status_ == TransactionStatus::ABORTED; |
136 | 586k | } |
137 | | |
138 | 461k | CHECKED_STATUS RunningTransaction::CheckAborted() const { |
139 | 461k | if (WasAborted()) { |
140 | 0 | return MakeAbortedStatus(id()); |
141 | 0 | } |
142 | 461k | return Status::OK(); |
143 | 461k | } |
144 | | |
// Requests that the status tablet abort this transaction, invoking callback with the result.
//
// If the transaction is already in a final state, the callback is invoked immediately with
// the cached status. Otherwise the callback is queued; only the first waiter sends the
// AbortTransaction RPC — later waiters reuse its response.
//
// Precondition: *lock holds context_.mutex_; it is released before invoking the callback or
// starting the RPC.
void RunningTransaction::Abort(client::YBClient* client,
                               TransactionStatusCallback callback,
                               std::unique_lock<std::mutex>* lock) {
  if (last_known_status_ == TransactionStatus::ABORTED ||
      last_known_status_ == TransactionStatus::COMMITTED) {
    // Transaction is already in final state, so no reason to send abort request.
    VLOG_WITH_PREFIX(3) << "Abort shortcut: " << last_known_status_;
    // Copy the result before unlocking, since fields may change afterwards.
    TransactionStatusResult status{last_known_status_, last_known_status_hybrid_time_};
    lock->unlock();
    callback(status);
    return;
  }
  bool was_empty = abort_waiters_.empty();
  abort_waiters_.push_back(std::move(callback));
  lock->unlock();
  VLOG_WITH_PREFIX(3) << "Abort request: " << was_empty;
  if (!was_empty) {
    // An abort RPC is already in flight; this waiter will be notified with its result.
    return;
  }
  tserver::AbortTransactionRequestPB req;
  req.set_tablet_id(metadata_.status_tablet);
  req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
  req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64());
  // shared_from_this keeps this object alive until AbortReceived runs.
  context_.rpcs_.RegisterAndStart(
      client::AbortTransaction(
          TransactionRpcDeadline(),
          nullptr /* tablet */,
          client,
          &req,
          std::bind(&RunningTransaction::AbortReceived, this, _1, _2, shared_from_this())),
      &abort_handle_);
}
177 | | |
178 | 0 | std::string RunningTransaction::ToString() const { |
179 | 0 | return Format("{ metadata: $0 last_batch_data: $1 replicated_batches: $2 local_commit_time: $3 " |
180 | 0 | "last_known_status: $4 last_known_status_hybrid_time: $5 }", |
181 | 0 | metadata_, last_batch_data_, replicated_batches_, local_commit_time_, |
182 | 0 | TransactionStatus_Name(last_known_status_), last_known_status_hybrid_time_); |
183 | 0 | } |
184 | | |
// Schedules asynchronous removal of this transaction's intents.
// Prepare() returns false when the task is already in use, in which case nothing is enqueued.
void RunningTransaction::ScheduleRemoveIntents(const RunningTransactionPtr& shared_self) {
  if (remove_intents_task_.Prepare(shared_self)) {
    context_.participant_context_.StrandEnqueue(&remove_intents_task_);
    VLOG_WITH_PREFIX(1) << "Intents should be removed asynchronously";
  }
}
191 | | |
192 | | boost::optional<TransactionStatus> RunningTransaction::GetStatusAt( |
193 | | HybridTime time, |
194 | | HybridTime last_known_status_hybrid_time, |
195 | 381k | TransactionStatus last_known_status) { |
196 | 381k | switch (last_known_status) { |
197 | 72.1k | case TransactionStatus::ABORTED: |
198 | 72.1k | return TransactionStatus::ABORTED; |
199 | 38.2k | case TransactionStatus::COMMITTED: |
200 | 38.2k | return last_known_status_hybrid_time > time |
201 | 1.42k | ? TransactionStatus::PENDING |
202 | 36.8k | : TransactionStatus::COMMITTED; |
203 | 271k | case TransactionStatus::PENDING: |
204 | 271k | if (last_known_status_hybrid_time >= time) { |
205 | 91.9k | return TransactionStatus::PENDING; |
206 | 91.9k | } |
207 | 179k | return boost::none; |
208 | 0 | default: |
209 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, last_known_status); |
210 | 381k | } |
211 | 381k | } |
212 | | |
// Sends a GetTransactionStatus RPC to the status tablet for this transaction.
// serial_no identifies this request so stale responses can be correlated with waiters;
// shared_self keeps the transaction alive until StatusReceived runs.
void RunningTransaction::SendStatusRequest(
    int64_t serial_no, const RunningTransactionPtr& shared_self) {
  TRACE_FUNC();
  VTRACE(1, yb::ToString(metadata_.transaction_id));
  tserver::GetTransactionStatusRequestPB req;
  req.set_tablet_id(metadata_.status_tablet);
  req.add_transaction_id()->assign(
      pointer_cast<const char*>(metadata_.transaction_id.data()), metadata_.transaction_id.size());
  req.set_propagated_hybrid_time(context_.participant_context_.Now().ToUint64());
  context_.rpcs_.RegisterAndStart(
      client::GetTransactionStatus(
          TransactionRpcDeadline(),
          nullptr /* tablet */,
          context_.participant_context_.client_future().get(),
          &req,
          std::bind(&RunningTransaction::StatusReceived, this, _1, _2, serial_no, shared_self)),
      &get_status_handle_);
}
231 | | |
// RPC callback for GetTransactionStatus. Normally forwards directly to DoStatusReceived;
// the test-only flag can inject an artificial delay before processing the reply.
void RunningTransaction::StatusReceived(
    const Status& status,
    const tserver::GetTransactionStatusResponsePB& response,
    int64_t serial_no,
    const RunningTransactionPtr& shared_self) {
  auto delay_usec = FLAGS_TEST_transaction_delay_status_reply_usec_in_tests;
  if (delay_usec > 0) {
    context_.delayer().Delay(
        MonoTime::Now() + MonoDelta::FromMicroseconds(delay_usec),
        std::bind(&RunningTransaction::DoStatusReceived, this, status, response,
                  serial_no, shared_self));
  } else {
    DoStatusReceived(status, response, serial_no, shared_self);
  }
}
247 | | |
// Applies freshly-received status information to the cached state.
//
// Returns true iff this call transitioned the transaction into the ABORTED state (callers
// use this to schedule cleanup). Returns false when the transaction already committed
// locally, or when the status did not change to ABORTED.
//
// Must be called with context_.mutex_ held (mutates shared state).
bool RunningTransaction::UpdateStatus(
    TransactionStatus transaction_status, HybridTime time_of_status,
    HybridTime coordinator_safe_time, AbortedSubTransactionSet aborted_subtxn_set) {
  // Check for local_commit_time_ is not required for correctness, but useful for optimization.
  // So we could avoid unnecessary actions.
  if (local_commit_time_) {
    return false;
  }

  if (transaction_status == TransactionStatus::COMMITTED) {
    local_commit_aborted_subtxn_set_ = aborted_subtxn_set;
  }

  // For an abort, the coordinator safe time (when valid) is a stronger bound on when the
  // abort is known, so prefer it over the response's status time.
  if (transaction_status == TransactionStatus::ABORTED && coordinator_safe_time) {
    time_of_status = coordinator_safe_time;
  }
  last_known_status_hybrid_time_ = time_of_status;

  if (transaction_status == last_known_status_) {
    return false;
  }

  last_known_status_ = transaction_status;

  return transaction_status == TransactionStatus::ABORTED;
}
274 | | |
// Processes a GetTransactionStatus response: updates cached status, schedules cleanup if
// the transaction was aborted, extracts the waiters that can now be answered, optionally
// re-sends a status request for waiters that still need fresher information, and finally
// notifies the satisfied waiters outside the lock.
void RunningTransaction::DoStatusReceived(const Status& status,
                                          const tserver::GetTransactionStatusResponsePB& response,
                                          int64_t serial_no,
                                          const RunningTransactionPtr& shared_self) {
  TRACE("$0: $1", __func__, response.ShortDebugString());
  VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", " << response.ShortDebugString() << ", "
                      << serial_no << ")";

  if (response.has_propagated_hybrid_time()) {
    context_.participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time()));
  }

  context_.rpcs_.Unregister(&get_status_handle_);
  decltype(status_waiters_) status_waiters;
  HybridTime time_of_status = HybridTime::kMin;
  TransactionStatus transaction_status = TransactionStatus::PENDING;
  AbortedSubTransactionSet aborted_subtxn_set;
  const bool ok = status.ok();
  int64_t new_request_id = -1;
  {
    MinRunningNotifier min_running_notifier(&context_.applier_);
    std::unique_lock<std::mutex> lock(context_.mutex_);
    if (!ok) {
      // RPC failed: fail all current waiters with the error (callbacks run unlocked).
      status_waiters_.swap(status_waiters);
      lock.unlock();
      for (const auto& waiter : status_waiters) {
        waiter.callback(status);
      }
      return;
    }

    // We asked about exactly one transaction, so expect exactly one entry in each
    // repeated field of the response.
    if (response.status_hybrid_time().size() == 1 &&
        response.status().size() == 1 &&
        response.aborted_subtxn_set().size() == 1) {
      auto aborted_subtxn_set_or_status = AbortedSubTransactionSet::FromPB(
          response.aborted_subtxn_set(0).set());
      if (aborted_subtxn_set_or_status.ok()) {
        time_of_status = HybridTime(response.status_hybrid_time()[0]);
        transaction_status = response.status(0);
        aborted_subtxn_set = aborted_subtxn_set_or_status.get();
      } else {
        LOG_WITH_PREFIX(DFATAL)
            << "Could not deserialize AbortedSubTransactionSet: "
            << "error - " << aborted_subtxn_set_or_status.status().ToString()
            << " response - " << response.ShortDebugString();
      }
    } else {
      LOG_WITH_PREFIX(DFATAL)
          << "Wrong number of status, status hybrid time, or aborted subtxn set entries, "
          << "exactly one entry expected: "
          << response.ShortDebugString();
    }

    LOG_IF_WITH_PREFIX(DFATAL, response.coordinator_safe_time().size() > 1)
        << "Wrong number of coordinator safe time entries, at most one expected: "
        << response.ShortDebugString();
    auto coordinator_safe_time = response.coordinator_safe_time().size() == 1
        ? HybridTime::FromPB(response.coordinator_safe_time(0)) : HybridTime();
    auto did_abort_txn = UpdateStatus(
        transaction_status, time_of_status, coordinator_safe_time, aborted_subtxn_set);
    if (did_abort_txn) {
      context_.EnqueueRemoveUnlocked(id(), RemoveReason::kStatusReceived, &min_running_notifier);
    }

    // Re-read the (possibly adjusted) cached state so waiters are notified with what
    // UpdateStatus actually recorded, not the raw response values.
    time_of_status = last_known_status_hybrid_time_;
    transaction_status = last_known_status_;
    aborted_subtxn_set = local_commit_aborted_subtxn_set_;

    status_waiters = ExtractFinishedStatusWaitersUnlocked(
        serial_no, time_of_status, transaction_status);
    if (!status_waiters_.empty()) {
      // Some waiters still need fresher status; send another request after unlocking.
      new_request_id = context_.NextRequestIdUnlocked();
      VLOG_WITH_PREFIX(4) << "Waiters still present, send new status request: " << new_request_id;
    }
  }
  if (new_request_id >= 0) {
    SendStatusRequest(new_request_id, shared_self);
  }
  NotifyWaiters(serial_no, time_of_status, transaction_status, aborted_subtxn_set, status_waiters);
}
355 | | |
// Removes and returns the waiters that can be answered with the given status knowledge,
// leaving the rest queued in status_waiters_ (they need a fresher status request).
//
// A waiter is answerable when (a) its request predates or matches this response
// (serial_no), (b) the knowledge resolves its global_limit_ht, or (c) the status time is
// before its read_ht (it will get TryAgain). On ABORTED every waiter is answerable.
//
// Must be called with context_.mutex_ held.
std::vector<StatusRequest> RunningTransaction::ExtractFinishedStatusWaitersUnlocked(
    int64_t serial_no, HybridTime time_of_status, TransactionStatus transaction_status) {
  if (transaction_status == TransactionStatus::ABORTED) {
    // Abort is final, so every waiter can be answered now.
    return std::move(status_waiters_);
  }
  std::vector<StatusRequest> result;
  result.reserve(status_waiters_.size());
  // Stable in-place partition: `w` tracks where the next kept (unanswerable) waiter goes.
  auto w = status_waiters_.begin();
  for (auto it = status_waiters_.begin(); it != status_waiters_.end(); ++it) {
    if (it->serial_no <= serial_no ||
        GetStatusAt(it->global_limit_ht, time_of_status, transaction_status) ||
        time_of_status < it->read_ht) {
      result.push_back(std::move(*it));
    } else {
      if (w != it) {
        *w = std::move(*it);
      }
      ++w;
    }
  }
  status_waiters_.erase(w, status_waiters_.end());
  return result;
}
379 | | |
// Invokes the callbacks of the given waiters with a result derived from the status
// knowledge (time_of_status / transaction_status / aborted_subtxn_set).
// Must be called WITHOUT holding context_.mutex_ — callbacks may re-enter the participant.
void RunningTransaction::NotifyWaiters(int64_t serial_no, HybridTime time_of_status,
                                       TransactionStatus transaction_status,
                                       const AbortedSubTransactionSet& aborted_subtxn_set,
                                       const std::vector<StatusRequest>& status_waiters) {
  for (const auto& waiter : status_waiters) {
    auto status_for_waiter = GetStatusAt(
        waiter.global_limit_ht, time_of_status, transaction_status);
    if (status_for_waiter) {
      // We know status at global_limit_ht, so could notify waiter.
      auto result = TransactionStatusResult{*status_for_waiter, time_of_status};
      if (result.status == TransactionStatus::COMMITTED) {
        result.aborted_subtxn_set = aborted_subtxn_set;
      }
      waiter.callback(std::move(result));
    } else if (time_of_status >= waiter.read_ht) {
      // It means that between read_ht and global_limit_ht transaction was pending.
      // It implies that transaction was not committed before request was sent.
      // We could safely respond PENDING to caller.
      LOG_IF_WITH_PREFIX(DFATAL, waiter.serial_no > serial_no)
          << "Notify waiter with request id greater than id of status request: "
          << waiter.serial_no << " vs " << serial_no;
      waiter.callback(TransactionStatusResult{TransactionStatus::PENDING, time_of_status});
    } else {
      // Knowledge is too stale to decide; the caller must retry (serialization failure
      // code so PostgreSQL clients treat it as a retryable conflict).
      waiter.callback(STATUS(TryAgain,
          Format("Cannot determine transaction status with read_ht $0, and global_limit_ht $1, "
                 "last known: $2 at $3", waiter.read_ht, waiter.global_limit_ht,
                 TransactionStatus_Name(transaction_status), time_of_status), Slice(),
          PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE) ));
    }
  }
}
411 | | |
412 | | Result<TransactionStatusResult> RunningTransaction::MakeAbortResult( |
413 | | const Status& status, |
414 | 33.4k | const tserver::AbortTransactionResponsePB& response) { |
415 | 33.4k | if (!status.ok()) { |
416 | 0 | return status; |
417 | 0 | } |
418 | | |
419 | 33.4k | HybridTime status_time = response.has_status_hybrid_time() |
420 | 25.7k | ? HybridTime(response.status_hybrid_time()) |
421 | 7.71k | : HybridTime::kInvalid; |
422 | 33.4k | return TransactionStatusResult{response.status(), status_time, AbortedSubTransactionSet()}; |
423 | 33.4k | } |
424 | | |
// RPC callback for AbortTransaction: updates the cached status (when safe), schedules
// cleanup if the transaction became aborted, and notifies all queued abort waiters with
// the result outside the lock.
void RunningTransaction::AbortReceived(const Status& status,
                                       const tserver::AbortTransactionResponsePB& response,
                                       const RunningTransactionPtr& shared_self) {
  if (response.has_propagated_hybrid_time()) {
    context_.participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time()));
  }

  decltype(abort_waiters_) abort_waiters;
  auto result = MakeAbortResult(status, response);

  VLOG_WITH_PREFIX(3) << "AbortReceived: " << yb::ToString(result);

  {
    MinRunningNotifier min_running_notifier(&context_.applier_);
    std::lock_guard<std::mutex> lock(context_.mutex_);
    context_.rpcs_.Unregister(&abort_handle_);
    // Take the waiter list under the lock; callbacks run after it is released.
    abort_waiters_.swap(abort_waiters);
    // kMax status_time means that this status is not yet replicated and could be rejected.
    // So we could use it as reply to Abort, but cannot store it as transaction status.
    if (result.ok() && result->status_time != HybridTime::kMax) {
      auto coordinator_safe_time = HybridTime::FromPB(response.coordinator_safe_time());
      if (UpdateStatus(
          result->status, result->status_time, coordinator_safe_time, result->aborted_subtxn_set)) {
        // Transitioned to ABORTED: schedule removal of this transaction.
        context_.EnqueueRemoveUnlocked(id(), RemoveReason::kAbortReceived, &min_running_notifier);
      }
    }
  }
  for (const auto& waiter : abort_waiters) {
    waiter(result);
  }
}
456 | | |
457 | 0 | std::string RunningTransaction::LogPrefix() const { |
458 | 0 | return Format( |
459 | 0 | "$0 ID $1: ", context_.LogPrefix().substr(0, context_.LogPrefix().length() - 2), id()); |
460 | 0 | } |
461 | | |
462 | 0 | Status MakeAbortedStatus(const TransactionId& id) { |
463 | 0 | return STATUS( |
464 | 0 | TryAgain, Format("Transaction aborted: $0", id), Slice(), |
465 | 0 | PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)); |
466 | 0 | } |
467 | | |
// Updates the large-apply state of this transaction.
//
// When `data` is provided, a (re)start of intent application is requested: the apply task
// is prepared and enqueued. When apply_state becomes inactive, application has finished
// and the transaction is removed from the participant.
//
// processing_apply_ is set (release) before work starts and cleared after it completes,
// pairing with the acquire load in ProcessingApply().
void RunningTransaction::SetApplyData(const docdb::ApplyTransactionState& apply_state,
                                      const TransactionApplyData* data,
                                      ScopedRWOperation* operation) {
  // TODO(savepoints): Add test to ensure that apply_state.aborted is properly set here.
  apply_state_ = apply_state;
  bool active = apply_state_.active();
  if (active) {
    // We are trying to assign set processing apply before starting actual process, and unset
    // after we complete processing.
    processing_apply_.store(true, std::memory_order_release);
  }

  if (data) {
    if (!active) {
      LOG_WITH_PREFIX(DFATAL)
          << "Starting processing apply, but provided data in inactive state: " << data->ToString();
      return;
    }

    apply_data_ = *data;
    // The task reads apply_state through apply_data_, so point it at our stored copy.
    apply_data_.apply_state = &apply_state_;

    LOG_IF_WITH_PREFIX(DFATAL, local_commit_time_ != data->commit_ht)
        << "Commit time does not match: " << local_commit_time_ << " vs " << data->commit_ht;

    if (apply_intents_task_.Prepare(shared_from_this(), operation)) {
      context_.participant_context_.StrandEnqueue(&apply_intents_task_);
    } else {
      LOG_WITH_PREFIX(DFATAL) << "Unable to prepare apply intents task";
    }
  }

  if (!active) {
    // Apply finished: clear the in-progress flag and remove the transaction.
    processing_apply_.store(false, std::memory_order_release);

    VLOG_WITH_PREFIX(3) << "Finished applying intents";

    MinRunningNotifier min_running_notifier(&context_.applier_);
    std::lock_guard<std::mutex> lock(context_.mutex_);
    context_.RemoveUnlocked(id(), RemoveReason::kLargeApplied, &min_running_notifier);
  }
}
510 | | |
511 | 642k | void RunningTransaction::SetOpId(const OpId& id) { |
512 | 642k | opId.index = id.index; |
513 | 642k | opId.term = id.term; |
514 | 642k | } |
515 | | |
516 | 1.41M | bool RunningTransaction::ProcessingApply() const { |
517 | 1.41M | return processing_apply_.load(std::memory_order_acquire); |
518 | 1.41M | } |
519 | | |
520 | 862 | void RunningTransaction::UpdateAbortCheckHT(HybridTime now, UpdateAbortCheckHTMode mode) { |
521 | 862 | if (last_known_status_ == TransactionStatus::ABORTED || |
522 | 855 | last_known_status_ == TransactionStatus::COMMITTED) { |
523 | 84 | abort_check_ht_ = HybridTime::kMax; |
524 | 84 | return; |
525 | 84 | } |
526 | | // When we send a status request, we schedule the transaction status to be re-checked around the |
527 | | // same time the request is supposed to time out. When we get a status response (normal case, no |
528 | | // timeout), we go back to the normal interval of re-checking the status of this transaction. |
529 | 778 | auto delta_ms = mode == UpdateAbortCheckHTMode::kStatusRequestSent |
530 | 382 | ? FLAGS_transaction_abort_check_timeout_ms |
531 | 396 | : FLAGS_transaction_abort_check_interval_ms; |
532 | 778 | abort_check_ht_ = now.AddDelta(1ms * delta_ms); |
533 | 778 | } |
534 | | |
535 | | } // namespace tablet |
536 | | } // namespace yb |