/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_coordinator.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/tablet/transaction_coordinator.h" |
17 | | |
18 | | #include <boost/multi_index/hashed_index.hpp> |
19 | | #include <boost/multi_index/mem_fun.hpp> |
20 | | #include <boost/multi_index/ordered_index.hpp> |
21 | | #include <boost/multi_index_container.hpp> |
22 | | |
23 | | #include "yb/client/client.h" |
24 | | #include "yb/client/transaction_rpc.h" |
25 | | |
26 | | #include "yb/common/common.pb.h" |
27 | | #include "yb/common/entity_ids.h" |
28 | | #include "yb/common/pgsql_error.h" |
29 | | #include "yb/common/transaction.h" |
30 | | #include "yb/common/transaction_error.h" |
31 | | #include "yb/common/wire_protocol.h" |
32 | | |
33 | | #include "yb/consensus/consensus_round.h" |
34 | | #include "yb/consensus/consensus_util.h" |
35 | | |
36 | | #include "yb/docdb/transaction_dump.h" |
37 | | |
38 | | #include "yb/rpc/messenger.h" |
39 | | #include "yb/rpc/poller.h" |
40 | | #include "yb/rpc/rpc.h" |
41 | | |
42 | | #include "yb/server/clock.h" |
43 | | |
44 | | #include "yb/tablet/operations/update_txn_operation.h" |
45 | | |
46 | | #include "yb/tserver/tserver_service.pb.h" |
47 | | |
48 | | #include "yb/util/atomic.h" |
49 | | #include "yb/util/countdown_latch.h" |
50 | | #include "yb/util/enums.h" |
51 | | #include "yb/util/flag_tags.h" |
52 | | #include "yb/util/format.h" |
53 | | #include "yb/util/logging.h" |
54 | | #include "yb/util/metrics.h" |
55 | | #include "yb/util/random_util.h" |
56 | | #include "yb/util/result.h" |
57 | | #include "yb/util/scope_exit.h" |
58 | | #include "yb/util/status_format.h" |
59 | | #include "yb/util/status_log.h" |
60 | | #include "yb/util/tsan_util.h" |
61 | | #include "yb/util/yb_pg_errcodes.h" |
62 | | |
63 | | DECLARE_uint64(transaction_heartbeat_usec); |
64 | | DEFINE_double(transaction_max_missed_heartbeat_periods, 10.0, |
65 | | "Maximum heartbeat periods that a pending transaction can miss before the " |
66 | | "transaction coordinator expires the transaction. The total expiration time in " |
67 | | "microseconds is transaction_heartbeat_usec times " |
68 | | "transaction_max_missed_heartbeat_periods. The value passed to this flag may be " |
69 | | "fractional."); |
70 | | DEFINE_uint64(transaction_check_interval_usec, 500000, "Transaction check interval in usec."); |
71 | | DEFINE_uint64(transaction_resend_applying_interval_usec, 5000000, |
72 | | "Transaction resend applying interval in usec."); |
73 | | |
74 | | DEFINE_int64(avoid_abort_after_sealing_ms, 20, |
75 | | "If transaction was only sealed, we will try to abort it not earlier than this " |
76 | | "period in milliseconds."); |
77 | | |
78 | | DEFINE_test_flag(uint64, inject_txn_get_status_delay_ms, 0, |
79 | | "Inject specified delay to transaction get status requests."); |
80 | | DEFINE_test_flag(int64, inject_random_delay_on_txn_status_response_ms, 0, |
81 | | "Inject a random amount of delay to the thread processing a " |
82 | | "GetTransactionStatusRequest after it has populated it's response. This could " |
83 | | "help simulate e.g. out-of-order responses where PENDING is received by client " |
84 | | "after a COMMITTED response."); |
85 | | |
86 | | using namespace std::literals; |
87 | | using namespace std::placeholders; |
88 | | |
89 | | namespace yb { |
90 | | namespace tablet { |
91 | | |
92 | 859k | std::chrono::microseconds GetTransactionTimeout() { |
93 | 859k | const double timeout = GetAtomicFlag(&FLAGS_transaction_max_missed_heartbeat_periods) * |
94 | 859k | GetAtomicFlag(&FLAGS_transaction_heartbeat_usec); |
95 | | // Cast to avoid -Wimplicit-int-float-conversion. |
96 | 859k | return timeout >= static_cast<double>(std::chrono::microseconds::max().count()) |
97 | 859k | ? std::chrono::microseconds::max()0 |
98 | 859k | : std::chrono::microseconds(static_cast<int64_t>(timeout)); |
99 | 859k | } |
100 | | |
101 | | namespace { |
102 | | |
103 | | struct NotifyApplyingData { |
104 | | TabletId tablet; |
105 | | TransactionId transaction; |
106 | | const AbortedSubTransactionSetPB& aborted; |
107 | | HybridTime commit_time; |
108 | | bool sealed; |
109 | | |
110 | 0 | std::string ToString() const { |
111 | 0 | return Format("{ tablet: $0 transaction: $1 commit_time: $2 sealed: $3}", |
112 | 0 | tablet, transaction, commit_time, sealed); |
113 | 0 | } |
114 | | }; |
115 | | |
116 | | struct ExpectedTabletBatches { |
117 | | TabletId tablet; |
118 | | size_t batches; |
119 | | |
120 | 0 | std::string ToString() const { |
121 | 0 | return Format("{ tablet: $0 batches: $1 }", tablet, batches); |
122 | 0 | } |
123 | | }; |
124 | | |
125 | | // Context for transaction state. I.e. access to external facilities required by |
126 | | // transaction state to do its job. |
127 | | class TransactionStateContext { |
128 | | public: |
129 | | virtual TransactionCoordinatorContext& coordinator_context() = 0; |
130 | | |
131 | | virtual void NotifyApplying(NotifyApplyingData data) = 0; |
132 | | |
133 | | // Submits update transaction to the RAFT log. Returns false if was not able to submit. |
134 | | virtual MUST_USE_RESULT bool SubmitUpdateTransaction( |
135 | | std::unique_ptr<UpdateTxnOperation> operation) = 0; |
136 | | |
137 | | virtual void CompleteWithStatus( |
138 | | std::unique_ptr<UpdateTxnOperation> request, Status status) = 0; |
139 | | |
140 | | virtual void CompleteWithStatus(UpdateTxnOperation* request, Status status) = 0; |
141 | | |
142 | | virtual bool leader() const = 0; |
143 | | |
144 | | protected: |
145 | 432 | ~TransactionStateContext() {} |
146 | | }; |
147 | | |
148 | 1.22M | std::string BuildLogPrefix(const std::string& parent_log_prefix, const TransactionId& id) { |
149 | 1.22M | auto id_string = id.ToString(); |
150 | 1.22M | return parent_log_prefix.substr(0, parent_log_prefix.length() - 2) + " ID " + id_string + ": "; |
151 | 1.22M | } |
152 | | |
153 | | // TransactionState keeps state of single transaction. |
154 | | // User of this class should guarantee that it does NOT invoke methods concurrently. |
155 | | class TransactionState { |
156 | | public: |
157 | | explicit TransactionState(TransactionStateContext* context, |
158 | | const TransactionId& id, |
159 | | HybridTime last_touch, |
160 | | const std::string& parent_log_prefix) |
161 | | : context_(*context), |
162 | | id_(id), |
163 | | log_prefix_(BuildLogPrefix(parent_log_prefix, id)), |
164 | 1.22M | last_touch_(last_touch) { |
165 | 1.22M | } |
166 | | |
167 | 1.21M | ~TransactionState() { |
168 | 1.21M | DCHECK(abort_waiters_.empty()); |
169 | 1.21M | DCHECK(request_queue_.empty()); |
170 | 1.21M | DCHECK(replicating_ == nullptr) << Format("Replicating: $0", static_cast<void*>(replicating_))64 ; |
171 | 1.21M | } |
172 | | |
173 | | // Id of transaction. |
174 | 16.8M | const TransactionId& id() const { |
175 | 16.8M | return id_; |
176 | 16.8M | } |
177 | | |
178 | | // Time when we last heard from transaction. I.e. hybrid time of replicated raft log entry |
179 | | // that updates status of this transaction. |
180 | 13.5M | HybridTime last_touch() const { |
181 | 13.5M | return last_touch_; |
182 | 13.5M | } |
183 | | |
184 | | // Status of transaction. |
185 | 1.50k | TransactionStatus status() const { |
186 | 1.50k | return status_; |
187 | 1.50k | } |
188 | | |
189 | | // RAFT index of first RAFT log entry required by this transaction. |
190 | 14.8M | int64_t first_entry_raft_index() const { |
191 | 14.8M | return first_entry_raft_index_; |
192 | 14.8M | } |
193 | | |
194 | 1.21M | std::string ToString() const { |
195 | 1.21M | return Format("{ id: $0 last_touch: $1 status: $2 involved_tablets: $3 replicating: $4 " |
196 | 1.21M | " request_queue: $5 first_entry_raft_index: $6 }", |
197 | 1.21M | id_, last_touch_, TransactionStatus_Name(status_), |
198 | 1.21M | involved_tablets_, replicating_, request_queue_, first_entry_raft_index_); |
199 | 1.21M | } |
200 | | |
201 | | // Whether this transaction expired at specified time. |
202 | 868k | bool ExpiredAt(HybridTime now) const { |
203 | 868k | if (ShouldBeCommitted() || ShouldBeInStatus(TransactionStatus::SEALED)858k ) { |
204 | 9.00k | return false; |
205 | 9.00k | } |
206 | 858k | const int64_t passed = now.GetPhysicalValueMicros() - last_touch_.GetPhysicalValueMicros(); |
207 | 858k | if (std::chrono::microseconds(passed) > GetTransactionTimeout()) { |
208 | 49.9k | return true; |
209 | 49.9k | } |
210 | 809k | return false; |
211 | 858k | } |
212 | | |
213 | | // Whether this transaction has completed. |
214 | 3.19M | bool Completed() const { |
215 | 3.19M | return status_ == TransactionStatus::ABORTED || |
216 | 3.19M | status_ == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS2.56M ; |
217 | 3.19M | } |
218 | | |
219 | | // Applies new state to transaction. |
220 | 3.19M | CHECKED_STATUS ProcessReplicated(const TransactionCoordinator::ReplicatedData& data) { |
221 | 3.19M | VLOG_WITH_PREFIX292 (4) |
222 | 292 | << Format("ProcessReplicated: $0, replicating: $1", data, replicating_); |
223 | | |
224 | 3.19M | if (replicating_ != nullptr) { |
225 | 1.06M | auto replicating_op_id = replicating_->consensus_round()->id(); |
226 | 1.06M | if (!replicating_op_id.empty()1.06M ) { |
227 | 1.06M | if (replicating_op_id != data.op_id) { |
228 | 0 | LOG_WITH_PREFIX(DFATAL) |
229 | 0 | << "Replicated unexpected operation, replicating: " << AsString(replicating_) |
230 | 0 | << ", replicated: " << AsString(data); |
231 | 0 | } |
232 | 18.4E | } else if (data.leader_term != OpId::kUnknownTerm) { |
233 | 0 | LOG_WITH_PREFIX(DFATAL) |
234 | 0 | << "Leader replicated operation without op id, replicating: " << AsString(replicating_) |
235 | 0 | << ", replicated: " << AsString(data); |
236 | 18.4E | } else { |
237 | 18.4E | LOG_WITH_PREFIX(INFO) << "Cancel replicating without id: " << AsString(replicating_) |
238 | 18.4E | << ", because " << AsString(data) << " was replicated"; |
239 | 18.4E | } |
240 | 1.06M | replicating_ = nullptr; |
241 | 1.06M | } |
242 | | |
243 | 3.19M | auto status = DoProcessReplicated(data); |
244 | | |
245 | 3.19M | if (data.leader_term == OpId::kUnknownTerm) { |
246 | 2.12M | ClearRequests(STATUS(IllegalState, "Leader changed")); |
247 | 2.12M | } else { |
248 | 1.06M | switch(status_) { |
249 | 198k | case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS: |
250 | 198k | ClearRequests(STATUS(AlreadyPresent, "Transaction committed")); |
251 | 198k | break; |
252 | 206k | case TransactionStatus::ABORTED: |
253 | 206k | ClearRequests( |
254 | 206k | STATUS(Expired, "Transaction aborted", |
255 | 206k | TransactionError(TransactionErrorCode::kAborted))); |
256 | 206k | break; |
257 | 0 | case TransactionStatus::CREATED: FALLTHROUGH_INTENDED; |
258 | 462k | case TransactionStatus::PENDING: FALLTHROUGH_INTENDED; |
259 | 462k | case TransactionStatus::SEALED: FALLTHROUGH_INTENDED; |
260 | 661k | case TransactionStatus::COMMITTED: FALLTHROUGH_INTENDED; |
261 | 661k | case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED; |
262 | 661k | case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: FALLTHROUGH_INTENDED; |
263 | 661k | case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED; |
264 | 661k | case TransactionStatus::GRACEFUL_CLEANUP: |
265 | 661k | ProcessQueue(); |
266 | 661k | break; |
267 | 1.06M | } |
268 | 1.06M | } |
269 | | |
270 | 3.18M | return status; |
271 | 3.19M | } |
272 | | |
273 | 0 | void ProcessAborted(const TransactionCoordinator::AbortedData& data) { |
274 | 0 | VLOG_WITH_PREFIX(4) << Format("ProcessAborted: $0, replicating: $1", data.state, replicating_); |
275 | |
|
276 | 0 | LOG_IF(DFATAL, |
277 | 0 | replicating_ != nullptr && !replicating_->op_id().empty() && |
278 | 0 | replicating_->op_id() != data.op_id) |
279 | 0 | << "Aborted wrong operation, expected " << AsString(replicating_) << ", but " |
280 | 0 | << AsString(data) << " aborted"; |
281 | |
|
282 | 0 | replicating_ = nullptr; |
283 | | |
284 | | // We are not leader, so could abort all queued requests. |
285 | 0 | ClearRequests(STATUS(Aborted, "Replication failed")); |
286 | 0 | } |
287 | | |
288 | | // Clear requests of this transaction. |
289 | 3.74M | void ClearRequests(const Status& status) { |
290 | 3.74M | VLOG_WITH_PREFIX121 (4) << Format("ClearRequests: $0, replicating: $1", status, replicating_)121 ; |
291 | 3.74M | if (replicating_ != nullptr) { |
292 | 0 | context_.CompleteWithStatus(replicating_, status); |
293 | 0 | replicating_ = nullptr; |
294 | 0 | } |
295 | | |
296 | 3.74M | for (auto& entry : request_queue_) { |
297 | 6.40k | context_.CompleteWithStatus(std::move(entry), status); |
298 | 6.40k | } |
299 | 3.74M | request_queue_.clear(); |
300 | | |
301 | 3.74M | NotifyAbortWaiters(status); |
302 | 3.74M | } |
303 | | |
304 | | // Used only during transaction sealing. |
305 | 0 | void ReplicatedAllBatchesAt(const TabletId& tablet, HybridTime last_time) { |
306 | 0 | auto it = involved_tablets_.find(tablet); |
307 | | // We could be notified several times, so avoid double handling. |
308 | 0 | if (it == involved_tablets_.end() || it->second.all_batches_replicated) { |
309 | 0 | return; |
310 | 0 | } |
311 | | |
312 | | // If transaction was sealed, then its commit time is max of seal record time and intent |
313 | | // replication times from all participating tablets. |
314 | 0 | commit_time_ = std::max(commit_time_, last_time); |
315 | 0 | --tablets_with_not_replicated_batches_; |
316 | 0 | it->second.all_batches_replicated = true; |
317 | |
|
318 | 0 | if (tablets_with_not_replicated_batches_ == 0) { |
319 | 0 | StartApply(); |
320 | 0 | } |
321 | 0 | } |
322 | | |
323 | 35.2k | const AbortedSubTransactionSetPB& GetAbortedSubTransactionSetPB() const { return aborted_; } |
324 | | |
325 | | Result<TransactionStatusResult> GetStatus( |
326 | 233k | std::vector<ExpectedTabletBatches>* expected_tablet_batches) const { |
327 | 233k | switch (status_) { |
328 | 35.2k | case TransactionStatus::COMMITTED: FALLTHROUGH_INTENDED; |
329 | 35.2k | case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS: |
330 | 35.2k | return TransactionStatusResult{TransactionStatus::COMMITTED, commit_time_}; |
331 | 0 | case TransactionStatus::SEALED: |
332 | 0 | if (tablets_with_not_replicated_batches_ == 0) { |
333 | 0 | return TransactionStatusResult{TransactionStatus::COMMITTED, commit_time_}; |
334 | 0 | } |
335 | 0 | FillExpectedTabletBatches(expected_tablet_batches); |
336 | 0 | return TransactionStatusResult{TransactionStatus::SEALED, commit_time_}; |
337 | 0 | case TransactionStatus::ABORTED: |
338 | 0 | return TransactionStatusResult{TransactionStatus::ABORTED, HybridTime::kMax}; |
339 | 197k | case TransactionStatus::PENDING: { |
340 | 197k | HybridTime status_ht; |
341 | 197k | if (replicating_) { |
342 | 49.8k | auto replicating_status = replicating_->request()->status(); |
343 | 49.8k | if (replicating_status == TransactionStatus::COMMITTED || |
344 | 49.8k | replicating_status == TransactionStatus::ABORTED8.31k ) { |
345 | 48.2k | auto replicating_ht = replicating_->hybrid_time_even_if_unset(); |
346 | 48.2k | if (replicating_ht.is_valid()) { |
347 | 42.8k | status_ht = replicating_ht; |
348 | 42.8k | } else { |
349 | | // Hybrid time now yet assigned to replicating, so assign more conservative time, |
350 | | // that is guaranteed to be less then replicating time. See GH #9981. |
351 | 5.41k | status_ht = replicating_submit_time_; |
352 | 5.41k | } |
353 | 48.2k | } |
354 | 49.8k | } |
355 | 197k | if (!status_ht) { |
356 | 149k | status_ht = context_.coordinator_context().clock().Now(); |
357 | 149k | } |
358 | 197k | status_ht = std::min(status_ht, context_.coordinator_context().HtLeaseExpiration()); |
359 | 197k | return TransactionStatusResult{TransactionStatus::PENDING, status_ht.Decremented()}; |
360 | 0 | } |
361 | 0 | case TransactionStatus::CREATED: FALLTHROUGH_INTENDED; |
362 | 0 | case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED; |
363 | 0 | case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: FALLTHROUGH_INTENDED; |
364 | 0 | case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED; |
365 | 0 | case TransactionStatus::GRACEFUL_CLEANUP: |
366 | 0 | return STATUS_FORMAT(Corruption, "Transaction has unexpected status: $0", |
367 | 233k | TransactionStatus_Name(status_)); |
368 | 233k | } |
369 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, status_); |
370 | 0 | } |
371 | | |
372 | 0 | void Aborted() { |
373 | 0 | status_ = TransactionStatus::ABORTED; |
374 | 0 | NotifyAbortWaiters(TransactionStatusResult::Aborted()); |
375 | 0 | } |
376 | | |
377 | 183k | TransactionStatusResult Abort(TransactionAbortCallback* callback) { |
378 | 183k | if (status_ == TransactionStatus::COMMITTED || |
379 | 183k | status_ == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS177k ) { |
380 | 5.88k | return TransactionStatusResult(TransactionStatus::COMMITTED, commit_time_); |
381 | 177k | } else if (ShouldBeCommitted()) { |
382 | 16.6k | return TransactionStatusResult(TransactionStatus::COMMITTED, HybridTime::kMax); |
383 | 160k | } else if (status_ == TransactionStatus::ABORTED) { |
384 | 0 | return TransactionStatusResult::Aborted(); |
385 | 160k | } else { |
386 | 160k | VLOG_WITH_PREFIX25 (1) << "External abort request"25 ; |
387 | 160k | CHECK_EQ(TransactionStatus::PENDING, status_); |
388 | 160k | abort_waiters_.emplace_back(std::move(*callback)); |
389 | 160k | Abort(); |
390 | 160k | return TransactionStatusResult(TransactionStatus::PENDING, HybridTime::kMax); |
391 | 160k | } |
392 | 183k | } |
393 | | |
394 | 1.10M | void Handle(std::unique_ptr<tablet::UpdateTxnOperation> request) { |
395 | 1.10M | auto& state = *request->request(); |
396 | 1.10M | VLOG_WITH_PREFIX70 (1) << "Handle: " << state.ShortDebugString()70 ; |
397 | 1.10M | if (state.status() == TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS) { |
398 | 436k | auto status = AppliedInOneOfInvolvedTablets(state); |
399 | 436k | context_.CompleteWithStatus(std::move(request), status); |
400 | 436k | return; |
401 | 436k | } |
402 | 669k | if (replicating_) { |
403 | 6.85k | request_queue_.push_back(std::move(request)); |
404 | 6.85k | return; |
405 | 6.85k | } |
406 | 662k | DoHandle(std::move(request)); |
407 | 662k | } |
408 | | |
409 | | // Aborts this transaction. |
410 | 209k | void Abort() { |
411 | 209k | if (ShouldBeCommitted()) { |
412 | 0 | LOG_WITH_PREFIX(DFATAL) << "Transaction abort in wrong state: " << status_; |
413 | 0 | return; |
414 | 0 | } |
415 | 209k | if (ShouldBeAborted()) { |
416 | 861 | return; |
417 | 861 | } |
418 | 208k | if (status_ != TransactionStatus::PENDING) { |
419 | 0 | LOG_WITH_PREFIX(DFATAL) << "Unexpected status during abort: " |
420 | 0 | << TransactionStatus_Name(status_); |
421 | 0 | return; |
422 | 0 | } |
423 | 208k | SubmitUpdateStatus(TransactionStatus::ABORTED); |
424 | 208k | } |
425 | | |
426 | | // Returns logs prefix for this transaction. |
427 | 4 | const std::string& LogPrefix() { |
428 | 4 | return log_prefix_; |
429 | 4 | } |
430 | | |
431 | | // now_physical is just optimization to avoid querying the current time multiple times. |
432 | 450k | void Poll(bool leader, MonoTime now_physical) { |
433 | 450k | if (status_ != TransactionStatus::COMMITTED && |
434 | 450k | (432k status_ != TransactionStatus::SEALED432k || tablets_with_not_replicated_batches_ != 00 )) { |
435 | 432k | return; |
436 | 432k | } |
437 | 17.2k | if (tablets_with_not_applied_intents_ == 0) { |
438 | 1.54k | if (leader && !ShouldBeInStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS)) { |
439 | 21 | SubmitUpdateStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS); |
440 | 21 | } |
441 | 15.6k | } else if (now_physical >= resend_applying_time_) { |
442 | 12 | if (leader) { |
443 | 27 | for (auto& tablet : involved_tablets_) { |
444 | 27 | if (!tablet.second.all_intents_applied) { |
445 | 22 | context_.NotifyApplying({ |
446 | 22 | .tablet = tablet.first, |
447 | 22 | .transaction = id_, |
448 | 22 | .aborted = aborted_, |
449 | 22 | .commit_time = commit_time_, |
450 | 22 | .sealed = status_ == TransactionStatus::SEALED }); |
451 | 22 | } |
452 | 27 | } |
453 | 5 | } |
454 | 12 | resend_applying_time_ = now_physical + |
455 | 12 | std::chrono::microseconds(FLAGS_transaction_resend_applying_interval_usec); |
456 | 12 | } |
457 | 17.2k | } |
458 | | |
459 | | void AddInvolvedTablets( |
460 | 0 | const TabletId& source_tablet_id, const std::vector<TabletId>& tablet_ids) { |
461 | 0 | auto source_it = involved_tablets_.find(source_tablet_id); |
462 | 0 | if (source_it == involved_tablets_.end()) { |
463 | 0 | LOG(FATAL) << "Unknown involved tablet: " << source_tablet_id; |
464 | 0 | return; |
465 | 0 | } |
466 | 0 | for (const auto& tablet_id : tablet_ids) { |
467 | 0 | if (involved_tablets_.emplace(tablet_id, source_it->second).second) { |
468 | 0 | ++tablets_with_not_applied_intents_; |
469 | 0 | } |
470 | 0 | } |
471 | 0 | if (!source_it->second.all_intents_applied) { |
472 | | // Mark source tablet as if intents have been applied for it. |
473 | 0 | --tablets_with_not_applied_intents_; |
474 | 0 | source_it->second.all_intents_applied = true; |
475 | 0 | } |
476 | 0 | } |
477 | | |
478 | 436k | CHECKED_STATUS AppliedInOneOfInvolvedTablets(const TransactionStatePB& state) { |
479 | 436k | if (status_ != TransactionStatus::COMMITTED && status_ != TransactionStatus::SEALED0 ) { |
480 | | // We could ignore this request, because it will be re-send if required. |
481 | 0 | LOG_WITH_PREFIX(DFATAL) |
482 | 0 | << "AppliedInOneOfInvolvedTablets in wrong state: " << TransactionStatus_Name(status_) |
483 | 0 | << ", request: " << state.ShortDebugString(); |
484 | 0 | return Status::OK(); |
485 | 0 | } |
486 | | |
487 | 436k | if (state.tablets_size() != 1) { |
488 | 0 | return STATUS_FORMAT( |
489 | 0 | InvalidArgument, "Expected exactly one tablet in $0: $1", __func__, state); |
490 | 0 | } |
491 | | |
492 | 436k | auto it = involved_tablets_.find(state.tablets(0)); |
493 | 436k | if (it == involved_tablets_.end()) { |
494 | | // This can happen when transaction coordinator retried apply to post-split tablets, |
495 | | // transaction coordinator moved to new status tablet leader and here new transaction |
496 | | // coordinator receives notification about txn is applied in post-split tablet not yet known |
497 | | // to new transaction coordinator. |
498 | | // It is safe to just log warning and ignore, because new transaction coordinator is sending |
499 | | // again apply requests to all involved tablet it knows and will be retrying for ones that |
500 | | // will reply have been already split. |
501 | 0 | LOG_WITH_PREFIX(WARNING) << "Applied in unknown tablet: " << state.tablets(0); |
502 | 0 | return Status::OK(); |
503 | 0 | } |
504 | 436k | if (!it->second.all_intents_applied) { |
505 | 436k | --tablets_with_not_applied_intents_; |
506 | 436k | it->second.all_intents_applied = true; |
507 | 436k | VLOG_WITH_PREFIX9 (4) << "Applied to " << state.tablets(0) << ", left not applied: " |
508 | 9 | << tablets_with_not_applied_intents_; |
509 | 436k | if (tablets_with_not_applied_intents_ == 0) { |
510 | 198k | SubmitUpdateStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS); |
511 | 198k | } |
512 | 436k | } |
513 | 436k | return Status::OK(); |
514 | 436k | } |
515 | | |
516 | | private: |
517 | | // Checks whether we in specified status or going to be in this status when replication is |
518 | | // finished. |
519 | 3.55M | bool ShouldBeInStatus(TransactionStatus status) const { |
520 | 3.55M | if (status_ == status) { |
521 | 8.05k | return true; |
522 | 8.05k | } |
523 | 3.54M | if (replicating_) { |
524 | 37.4k | if (replicating_->request()->status() == status) { |
525 | 19.1k | return true; |
526 | 19.1k | } |
527 | | |
528 | 18.3k | for (const auto& entry : request_queue_) { |
529 | 1.57k | if (entry->request()->status() == status) { |
530 | 909 | return true; |
531 | 909 | } |
532 | 1.57k | } |
533 | 18.3k | } |
534 | | |
535 | 3.52M | return false; |
536 | 3.54M | } |
537 | | |
538 | 1.25M | bool ShouldBeCommitted() const { |
539 | 1.25M | return ShouldBeInStatus(TransactionStatus::COMMITTED) || |
540 | 1.25M | ShouldBeInStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS)1.22M ; |
541 | 1.25M | } |
542 | | |
543 | 209k | bool ShouldBeAborted() const { |
544 | 209k | return ShouldBeInStatus(TransactionStatus::ABORTED); |
545 | 209k | } |
546 | | |
547 | | // Process operation that was replicated in RAFT. |
548 | 3.19M | CHECKED_STATUS DoProcessReplicated(const TransactionCoordinator::ReplicatedData& data) { |
549 | 3.19M | switch (data.state.status()) { |
550 | 620k | case TransactionStatus::ABORTED: |
551 | 620k | return AbortedReplicationFinished(data); |
552 | 0 | case TransactionStatus::SEALED: |
553 | 0 | return SealedReplicationFinished(data); |
554 | 592k | case TransactionStatus::COMMITTED: |
555 | 592k | return CommittedReplicationFinished(data); |
556 | 1.22M | case TransactionStatus::CREATED: FALLTHROUGH_INTENDED; |
557 | 1.38M | case TransactionStatus::PENDING: |
558 | 1.38M | return PendingReplicationFinished(data); |
559 | 0 | case TransactionStatus::APPLYING: |
560 | | // APPLYING is handled separately, because it is received for transactions not managed by |
561 | | // this tablet as a transaction status tablet, but tablets that are involved in the data |
562 | | // path (receive write intents) for this transactions |
563 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status()); |
564 | 0 | case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: |
565 | | // APPLIED_IN_ONE_OF_INVOLVED_TABLETS handled w/o use of RAFT log |
566 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status()); |
567 | 592k | case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS: |
568 | 592k | return AppliedInAllInvolvedTabletsReplicationFinished(data); |
569 | 0 | case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED; |
570 | 0 | case TransactionStatus::GRACEFUL_CLEANUP: |
571 | | // CLEANUP is handled separately, because it is received for transactions not managed by |
572 | | // this tablet as a transaction status tablet, but tablets that are involved in the data |
573 | | // path (receive write intents) for this transactions |
574 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status()); |
575 | 3.19M | } |
576 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status()); |
577 | 0 | } |
578 | | |
579 | 664k | void DoHandle(std::unique_ptr<tablet::UpdateTxnOperation> request) { |
580 | 664k | const auto& state = *request->request(); |
581 | | |
582 | 664k | Status status; |
583 | 664k | auto txn_status = state.status(); |
584 | 664k | if (txn_status == TransactionStatus::COMMITTED) { |
585 | 198k | status = HandleCommit(); |
586 | 466k | } else if (txn_status == TransactionStatus::PENDING || |
587 | 466k | txn_status == TransactionStatus::CREATED412k ) { |
588 | | // Handling txn_status of CREATED when the current status (status_) is PENDING is only |
589 | | // allowed for backward compatibility with versions prior to D11210, which could send |
590 | | // transaction creation retries with the same id. |
591 | 464k | if (status_ != TransactionStatus::PENDING) { |
592 | 4 | status = STATUS_FORMAT(IllegalState, |
593 | 4 | "Transaction in wrong state during heartbeat: $0", |
594 | 4 | TransactionStatus_Name(status_)); |
595 | 4 | } |
596 | 464k | } |
597 | | |
598 | 664k | if (!status.ok()) { |
599 | 4 | context_.CompleteWithStatus(std::move(request), std::move(status)); |
600 | 4 | return; |
601 | 4 | } |
602 | | |
603 | 664k | VLOG_WITH_PREFIX491 (4) << Format("DoHandle, replicating = $0", replicating_)491 ; |
604 | 664k | auto submitted = SubmitRequest(std::move(request)); |
605 | | // Should always succeed, since we execute this code only on the leader. |
606 | 664k | CHECK(submitted) << "Status: " << TransactionStatus_Name(txn_status)143 ; |
607 | 664k | } |
608 | | |
609 | 198k | CHECKED_STATUS HandleCommit() { |
610 | 198k | auto hybrid_time = context_.coordinator_context().clock().Now(); |
611 | 198k | if (ExpiredAt(hybrid_time)) { |
612 | 0 | auto status = STATUS(Expired, "Commit of expired transaction"); |
613 | 0 | VLOG_WITH_PREFIX(4) << status; |
614 | 0 | Abort(); |
615 | 0 | return status; |
616 | 0 | } |
617 | 198k | if (status_ != TransactionStatus::PENDING) { |
618 | 0 | return STATUS_FORMAT(IllegalState, |
619 | 0 | "Transaction in wrong state when starting to commit: $0", |
620 | 0 | TransactionStatus_Name(status_)); |
621 | 0 | } |
622 | | |
623 | 198k | return Status::OK(); |
624 | 198k | } |
625 | | |
626 | 406k | void SubmitUpdateStatus(TransactionStatus status) { |
627 | 406k | VLOG_WITH_PREFIX58 (4) << "SubmitUpdateStatus(" << TransactionStatus_Name(status) << ")"58 ; |
628 | | |
629 | 406k | TransactionStatePB state; |
630 | 406k | state.set_transaction_id(id_.data(), id_.size()); |
631 | 406k | state.set_status(status); |
632 | | |
633 | 406k | auto request = context_.coordinator_context().CreateUpdateTransaction(&state); |
634 | 406k | if (replicating_) { |
635 | 1.72k | request_queue_.push_back(std::move(request)); |
636 | 405k | } else { |
637 | 405k | SubmitRequest(std::move(request)); |
638 | 405k | } |
639 | 406k | } |
640 | | |
641 | 1.06M | bool SubmitRequest(std::unique_ptr<tablet::UpdateTxnOperation> request) { |
642 | 1.06M | replicating_ = request.get(); |
643 | 1.06M | replicating_submit_time_ = context_.coordinator_context().clock().Now(); |
644 | 18.4E | VLOG_WITH_PREFIX(4) << Format("SubmitUpdateStatus, replicating = $0", replicating_); |
645 | 1.06M | if (!context_.SubmitUpdateTransaction(std::move(request))) { |
646 | | // Was not able to submit update transaction, for instance we are not leader. |
647 | | // So we are not replicating. |
648 | 21 | replicating_ = nullptr; |
649 | 21 | return false; |
650 | 21 | } |
651 | | |
652 | 1.06M | return true; |
653 | 1.06M | } |
654 | | |
655 | 661k | void ProcessQueue() { |
656 | 663k | while (!replicating_ && !request_queue_.empty()612k ) { |
657 | 1.98k | auto request = std::move(request_queue_.front()); |
658 | 1.98k | request_queue_.pop_front(); |
659 | 1.98k | DoHandle(std::move(request)); |
660 | 1.98k | } |
661 | 661k | } |
662 | | |
663 | 620k | CHECKED_STATUS AbortedReplicationFinished(const TransactionCoordinator::ReplicatedData& data) { |
664 | 620k | if (status_ != TransactionStatus::ABORTED && |
665 | 620k | status_ != TransactionStatus::PENDING620k ) { |
666 | 0 | LOG_WITH_PREFIX(DFATAL) << "Invalid status of aborted transaction: " |
667 | 0 | << TransactionStatus_Name(status_); |
668 | 0 | } |
669 | | |
670 | 620k | status_ = TransactionStatus::ABORTED; |
671 | 620k | first_entry_raft_index_ = data.op_id.index; |
672 | 620k | NotifyAbortWaiters(TransactionStatusResult::Aborted()); |
673 | 620k | return Status::OK(); |
674 | 620k | } |
675 | | |
676 | | CHECKED_STATUS SealedReplicationFinished( |
677 | 0 | const TransactionCoordinator::ReplicatedData& data) { |
678 | 0 | if (status_ != TransactionStatus::PENDING) { |
679 | 0 | auto status = STATUS_FORMAT( |
680 | 0 | IllegalState, |
681 | 0 | "Unexpected status during CommittedReplicationFinished: $0", |
682 | 0 | TransactionStatus_Name(status_)); |
683 | 0 | LOG_WITH_PREFIX(DFATAL) << status; |
684 | 0 | return status; |
685 | 0 | } |
686 | | |
687 | 0 | last_touch_ = data.hybrid_time; |
688 | 0 | commit_time_ = data.hybrid_time; |
689 | | // TODO(dtxn) Not yet implemented |
690 | 0 | next_abort_after_sealing_ = CoarseMonoClock::now() + FLAGS_avoid_abort_after_sealing_ms * 1ms; |
691 | | // TODO(savepoints) Savepoints with sealed transactions is not yet tested |
692 | 0 | aborted_ = data.state.aborted(); |
693 | 0 | VLOG_WITH_PREFIX(4) << "Seal time: " << commit_time_; |
694 | 0 | status_ = TransactionStatus::SEALED; |
695 | |
|
696 | 0 | involved_tablets_.reserve(data.state.tablets().size()); |
697 | 0 | for (int idx = 0; idx != data.state.tablets().size(); ++idx) { |
698 | 0 | auto tablet_batches = data.state.tablet_batches(idx); |
699 | 0 | LOG_IF_WITH_PREFIX(DFATAL, tablet_batches == 0) |
700 | 0 | << "Tablet without batches: " << data.state.ShortDebugString(); |
701 | 0 | ++tablets_with_not_replicated_batches_; |
702 | 0 | InvolvedTabletState state = { |
703 | 0 | .required_replicated_batches = static_cast<size_t>(tablet_batches), |
704 | 0 | .all_batches_replicated = false, |
705 | 0 | .all_intents_applied = false |
706 | 0 | }; |
707 | 0 | involved_tablets_.emplace(data.state.tablets(idx), state); |
708 | 0 | } |
709 | |
|
710 | 0 | first_entry_raft_index_ = data.op_id.index; |
711 | 0 | return Status::OK(); |
712 | 0 | } |
713 | | |
  // Applies a replicated COMMITTED record: transitions PENDING -> COMMITTED,
  // records commit time and involved tablets (all batches already replicated
  // for the non-sealed commit path), then kicks off the apply phase.
  // Returns IllegalState (and logs DFATAL) if the transaction is not PENDING.
  CHECKED_STATUS CommittedReplicationFinished(const TransactionCoordinator::ReplicatedData& data) {
    if (status_ != TransactionStatus::PENDING) {
      auto status = STATUS_FORMAT(
          IllegalState,
          "Unexpected status during CommittedReplicationFinished: $0",
          TransactionStatus_Name(status_));
      LOG_WITH_PREFIX(DFATAL) << status;
      return status;
    }

    YB_TRANSACTION_DUMP(Commit, id_, data.hybrid_time, data.state.tablets().size());

    last_touch_ = data.hybrid_time;
    commit_time_ = data.hybrid_time;
    first_entry_raft_index_ = data.op_id.index;
    aborted_ = data.state.aborted();

    // For a full commit every batch is already replicated, so each tablet only
    // still needs its intents applied.
    involved_tablets_.reserve(data.state.tablets().size());
    for (const auto& tablet : data.state.tablets()) {
      InvolvedTabletState state = {
        .required_replicated_batches = 0,
        .all_batches_replicated = true,
        .all_intents_applied = false
      };
      involved_tablets_.emplace(tablet, state);
    }

    // Status must be set before StartApply, which reads it (e.g. for sealed).
    status_ = TransactionStatus::COMMITTED;
    StartApply();
    return Status::OK();
  }
745 | | |
  // Applies a replicated APPLIED_IN_ALL_INVOLVED_TABLETS record: the terminal
  // transition after COMMITTED/SEALED, meaning every involved tablet applied
  // the transaction's intents.
  CHECKED_STATUS AppliedInAllInvolvedTabletsReplicationFinished(
      const TransactionCoordinator::ReplicatedData& data) {
    if (status_ != TransactionStatus::COMMITTED && status_ != TransactionStatus::SEALED) {
      // That could happen in old version, because we could drop all entries before
      // APPLIED_IN_ALL_INVOLVED_TABLETS.
      LOG_WITH_PREFIX(DFATAL)
          << "AppliedInAllInvolvedTabletsReplicationFinished in wrong state: "
          << TransactionStatus_Name(status_) << ", request: " << data.state.ShortDebugString();
      CHECK_EQ(status_, TransactionStatus::PENDING);
    }
    VLOG_WITH_PREFIX(4) << __func__ << ", status: " << TransactionStatus_Name(status_)
                        << ", leader: " << context_.leader();
    last_touch_ = data.hybrid_time;
    status_ = TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS;

    YB_TRANSACTION_DUMP(Applied, id_, data.hybrid_time);

    return Status::OK();
  }
765 | | |
  // Used for PENDING and CREATED records. Because when we apply replicated operations they have
  // the same meaning.
  //
  // On the leader, an already-expired transaction is aborted instead of being
  // refreshed. Otherwise just bumps last_touch_ and remembers the Raft index.
  CHECKED_STATUS PendingReplicationFinished(const TransactionCoordinator::ReplicatedData& data) {
    if (context_.leader() && ExpiredAt(data.hybrid_time)) {
      VLOG_WITH_PREFIX(4) << "Expired during replication of PENDING or CREATED operations.";
      Abort();
      return Status::OK();
    }
    if (status_ != TransactionStatus::PENDING) {
      // Unexpected state transition; log loudly but do not fail replication.
      LOG_WITH_PREFIX(DFATAL) << "Bad status during " << __func__ << "(" << data.ToString()
                              << "): " << ToString();
      return Status::OK();
    }
    last_touch_ = data.hybrid_time;
    first_entry_raft_index_ = data.op_id.index;
    return Status::OK();
  }
783 | | |
784 | 4.95M | void NotifyAbortWaiters(const Result<TransactionStatusResult>& result) { |
785 | 4.95M | for (auto& waiter : abort_waiters_) { |
786 | 158k | waiter(result); |
787 | 158k | } |
788 | 4.95M | abort_waiters_.clear(); |
789 | 4.95M | } |
790 | | |
  // Starts the apply phase: schedules when APPLYING should be resent, counts
  // the tablets whose intents are not yet applied and — on the leader only —
  // queues a NotifyApplying for every involved tablet. Finally resolves abort
  // waiters with COMMITTED at commit_time_.
  void StartApply() {
    VLOG_WITH_PREFIX(4) << __func__ << ", commit time: " << commit_time_ << ", involved tablets: "
                        << AsString(involved_tablets_);
    resend_applying_time_ = MonoTime::Now() +
        std::chrono::microseconds(FLAGS_transaction_resend_applying_interval_usec);
    tablets_with_not_applied_intents_ = involved_tablets_.size();
    if (context_.leader()) {
      for (const auto& tablet : involved_tablets_) {
        // tablet.first is the tablet id (see involved_tablets_ declaration).
        context_.NotifyApplying({
            .tablet = tablet.first,
            .transaction = id_,
            .aborted = aborted_,
            .commit_time = commit_time_,
            .sealed = status_ == TransactionStatus::SEALED});
      }
    }
    NotifyAbortWaiters(TransactionStatusResult(TransactionStatus::COMMITTED, commit_time_));
  }
809 | | |
810 | | void FillExpectedTabletBatches( |
811 | 0 | std::vector<ExpectedTabletBatches>* expected_tablet_batches) const { |
812 | 0 | if (!expected_tablet_batches) { |
813 | 0 | return; |
814 | 0 | } |
815 | | |
816 | 0 | for (const auto& tablet_id_and_state : involved_tablets_) { |
817 | 0 | if (!tablet_id_and_state.second.all_batches_replicated) { |
818 | 0 | expected_tablet_batches->push_back(ExpectedTabletBatches{ |
819 | 0 | .tablet = tablet_id_and_state.first, |
820 | 0 | .batches = tablet_id_and_state.second.required_replicated_batches |
821 | 0 | }); |
822 | 0 | } |
823 | 0 | } |
824 | 0 | } |
825 | | |
  // Callbacks/context back into the owning coordinator; outlives this state.
  TransactionStateContext& context_;
  // This transaction's id; immutable for the lifetime of the state.
  const TransactionId id_;
  const std::string log_prefix_;
  // Current position in the transaction status state machine.
  TransactionStatus status_ = TransactionStatus::PENDING;
  // Hybrid time of the last replicated record that touched this transaction.
  HybridTime last_touch_;
  // It should match last_touch_, but it is possible that because of some code errors it
  // would not be so. To add stability we introduce a separate field for it.
  HybridTime commit_time_;

  // If transaction was only sealed, we will try to abort it not earlier than this time.
  CoarseTimePoint next_abort_after_sealing_;

  // Replication/apply progress for a single tablet involved in this transaction.
  struct InvolvedTabletState {
    // How many batches should be replicated at this tablet.
    size_t required_replicated_batches = 0;

    // True if this tablet already replicated all batches.
    bool all_batches_replicated = false;

    // True if this tablet already applied all intents.
    bool all_intents_applied = false;

    std::string ToString() const {
      return Format("{ required_replicated_batches: $0 all_batches_replicated: $1 "
                        "all_intents_applied: $2 }",
                    required_replicated_batches, all_batches_replicated, all_intents_applied);
    }
  };

  // Tablets participating in this transaction.
  std::unordered_map<TabletId, InvolvedTabletState> involved_tablets_;
  // Number of tablets that have not yet replicated all batches.
  size_t tablets_with_not_replicated_batches_ = 0;
  // Number of tablets that have not yet applied intents.
  size_t tablets_with_not_applied_intents_ = 0;
  // Don't resend applying until this time.
  MonoTime resend_applying_time_;
  // Raft index of the first log entry for this transaction; used for log GC.
  int64_t first_entry_raft_index_ = std::numeric_limits<int64_t>::max();

  // Metadata tracking aborted subtransaction IDs in this transaction.
  AbortedSubTransactionSetPB aborted_;

  // The operation that we are currently replicating in RAFT.
  // It is owned by TransactionDriver (that will be renamed to OperationDriver).
  tablet::UpdateTxnOperation* replicating_ = nullptr;
  // Hybrid time before submitting replicating operation.
  // It is guaranteed to be less than actual operation hybrid time.
  HybridTime replicating_submit_time_;
  // Requests waiting for the current replication to finish; drained by ProcessQueue.
  std::deque<std::unique_ptr<tablet::UpdateTxnOperation>> request_queue_;

  // Callbacks resolved once the transaction reaches a terminal decision.
  std::vector<TransactionAbortCallback> abort_waiters_;
};
878 | | |
// An update operation together with the status it should be completed with
// once the coordinator lock has been released (see PostponedLeaderActions).
struct CompleteWithStatusEntry {
  // Optional ownership of the operation; presumably points at the same object
  // as `request` when set — TODO confirm at the call sites.
  std::unique_ptr<UpdateTxnOperation> holder;
  // Operation to finish via request->CompleteWithStatus(status).
  UpdateTxnOperation* request;
  // Status to complete the request with.
  Status status;
};
884 | | |
// Contains actions that should be executed after lock in transaction coordinator is released.
// Accumulated under managed_mutex_, then swapped out and executed lock-free,
// so RPCs and Raft submissions never run while the mutex is held.
struct PostponedLeaderActions {
  // Term this batch of actions was collected in; kUnknownTerm means we were
  // not the leader and leader-only actions must be skipped.
  int64_t leader_term = OpId::kUnknownTerm;
  // List of tablets with transaction id, that should be notified that this transaction
  // is applying.
  std::vector<NotifyApplyingData> notify_applying;
  // List of update transaction records, that should be replicated via RAFT.
  std::vector<std::unique_ptr<UpdateTxnOperation>> updates;

  std::vector<CompleteWithStatusEntry> complete_with_status;

  // Exchanges contents with *other; used to move pending actions out from
  // under the coordinator lock.
  void Swap(PostponedLeaderActions* other) {
    std::swap(leader_term, other->leader_term);
    notify_applying.swap(other->notify_applying);
    updates.swap(other->updates);
    complete_with_status.swap(other->complete_with_status);
  }

  // True iff these actions were collected while we were the leader.
  bool leader() const {
    return leader_term != OpId::kUnknownTerm;
  }
};
907 | | |
908 | | } // namespace |
909 | | |
// Debug representation of an aborted-operation record (state + op id).
std::string TransactionCoordinator::AbortedData::ToString() const {
  return YB_STRUCT_TO_STRING(state, op_id);
}
913 | | |
914 | | // Real implementation of transaction coordinator, as in PImpl idiom. |
915 | | class TransactionCoordinator::Impl : public TransactionStateContext { |
916 | | public: |
  // Constructs the coordinator implementation for one status tablet.
  // `context` and `expired_metric` must outlive this object (stored by
  // reference). The poller is created here but only started by Start().
  Impl(const std::string& permanent_uuid,
       TransactionCoordinatorContext* context,
       Counter* expired_metric)
      : context_(*context),
        expired_metric_(*expired_metric),
        log_prefix_(consensus::MakeTabletLogPrefix(context->tablet_id(), permanent_uuid)),
        poller_(log_prefix_, std::bind(&Impl::Poll, this)) {
  }
925 | | |
  // Ensures background activity is stopped even if Shutdown() was not called
  // explicitly (Shutdown is safe to invoke more than once).
  virtual ~Impl() {
    Shutdown();
  }
929 | | |
  // Stops the poller before shutting down outstanding RPCs, so no new RPCs
  // are scheduled while rpcs_ is draining.
  void Shutdown() {
    poller_.Shutdown();
    rpcs_.Shutdown();
  }
934 | | |
  // Reports the status of each requested transaction into *response.
  // A transaction unknown to this coordinator is reported as ABORTED with
  // HybridTime::kMax, accompanied by the leader safe time so the caller can
  // distinguish "already applied" from "never existed". SEALED transactions
  // are resolved by querying participants (may drop/retake the lock).
  // NOTE(review): `deadline` is not referenced in this body — confirm whether
  // it is intentionally unused here.
  CHECKED_STATUS GetStatus(const google::protobuf::RepeatedPtrField<std::string>& transaction_ids,
                           CoarseTimePoint deadline,
                           tserver::GetTransactionStatusResponsePB* response) {
    AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms);
    auto leader_term = context_.LeaderTerm();
    PostponedLeaderActions postponed_leader_actions;
    {
      std::unique_lock<std::mutex> lock(managed_mutex_);
      HybridTime leader_safe_time;
      postponed_leader_actions_.leader_term = leader_term;
      for (const auto& transaction_id : transaction_ids) {
        auto id = VERIFY_RESULT(FullyDecodeTransactionId(transaction_id));

        auto it = managed_transactions_.find(id);
        std::vector<ExpectedTabletBatches> expected_tablet_batches;
        bool known_txn = it != managed_transactions_.end();
        auto txn_status_with_ht = known_txn
            ? VERIFY_RESULT(it->GetStatus(&expected_tablet_batches))
            : TransactionStatusResult(TransactionStatus::ABORTED, HybridTime::kMax);
        // NOTE(review): it->last_touch() is evaluated here even when
        // it == end() (unknown txn); only reached when VLOG(4) is enabled,
        // but looks like an end-iterator dereference — confirm.
        VLOG_WITH_PREFIX(4) << __func__ << ": " << id << " => " << txn_status_with_ht
                            << ", last touch: " << it->last_touch();
        if (txn_status_with_ht.status == TransactionStatus::SEALED) {
          // TODO(dtxn) Avoid concurrent resolve
          txn_status_with_ht = VERIFY_RESULT(ResolveSealedStatus(
              id, txn_status_with_ht.status_time, expected_tablet_batches,
              /* abort_if_not_replicated = */ false, &lock));
        }
        if (!known_txn) {
          if (!leader_safe_time) {
            // We should pick leader safe time only after managed_mutex_ is locked.
            // Otherwise applied transaction could be removed after this safe time.
            leader_safe_time = VERIFY_RESULT(context_.LeaderSafeTime());
          }
          // Please note that for known transactions we send 0, that means invalid hybrid time.
          // We would wait for safe time only for case when transaction is unknown to coordinator.
          // Since it is only case when transaction could be actually committed.
          response->mutable_coordinator_safe_time()->Resize(response->status().size(), 0);
          response->add_coordinator_safe_time(leader_safe_time.ToUint64());
        }
        response->add_status(txn_status_with_ht.status);
        response->add_status_hybrid_time(txn_status_with_ht.status_time.ToUint64());

        // Aborted subtxn info is only meaningful for a committed, known txn.
        auto mutable_aborted_set_pb = response->add_aborted_subtxn_set();
        if (txn_status_with_ht.status == TransactionStatus::COMMITTED &&
            it != managed_transactions_.end()) {
          *mutable_aborted_set_pb = it->GetAbortedSubTransactionSetPB();
        }
      }
      postponed_leader_actions.Swap(&postponed_leader_actions_);
    }

    // Execute accumulated actions only after the lock has been released.
    ExecutePostponedLeaderActions(&postponed_leader_actions);
    if (GetAtomicFlag(&FLAGS_TEST_inject_random_delay_on_txn_status_response_ms)) {
      if (response->status().size() > 0 && response->status(0) == TransactionStatus::PENDING) {
        AtomicFlagRandomSleepMs(&FLAGS_TEST_inject_random_delay_on_txn_status_response_ms);
      }
    }
    return Status::OK();
  }
994 | | |
  // Resolves the real status of a SEALED transaction by asking each involved
  // tablet how many of its batches were replicated. Releases *lock for the
  // duration of the RPC fan-out (waits on a latch) and re-acquires it before
  // returning; callers must not rely on state observed before the call.
  // Returns the resolved status, or PENDING with a decremented time when the
  // transaction disappeared or still cannot be resolved.
  Result<TransactionStatusResult> ResolveSealedStatus(
      const TransactionId& transaction_id,
      HybridTime commit_time,
      const std::vector<ExpectedTabletBatches>& expected_tablet_batches,
      bool abort_if_not_replicated,
      std::unique_lock<std::mutex>* lock) {
    VLOG_WITH_PREFIX(4)
        << __func__ << ", txn: " << transaction_id << ", commit time: " << commit_time
        << ", expected tablet batches: " << AsString(expected_tablet_batches)
        << ", abort if not replicated: " << abort_if_not_replicated;

    auto deadline = TransactionRpcDeadline();
    auto now_ht = context_.clock().Now();
    // One latch count per queried tablet; write_hybrid_times collects each
    // tablet's answer (kMin = aborted, invalid = not yet resolved).
    CountDownLatch latch(expected_tablet_batches.size());
    std::vector<HybridTime> write_hybrid_times(expected_tablet_batches.size());
    {
      lock->unlock();
      auto scope_exit = ScopeExit([lock] {
        if (lock) {
          lock->lock();
        }
      });
      size_t idx = 0;
      for (const auto& p : expected_tablet_batches) {
        tserver::GetTransactionStatusAtParticipantRequestPB req;
        req.set_tablet_id(p.tablet);
        req.set_transaction_id(
            pointer_cast<const char*>(transaction_id.data()), transaction_id.size());
        req.set_propagated_hybrid_time(now_ht.ToUint64());
        if (abort_if_not_replicated) {
          req.set_required_num_replicated_batches(p.batches);
        }

        auto handle = rpcs_.Prepare();
        if (handle != rpcs_.InvalidHandle()) {
          // Captures by reference are safe: latch.Wait() below keeps this
          // stack frame alive until every callback has run.
          *handle = GetTransactionStatusAtParticipant(
              deadline,
              nullptr /* remote_tablet */,
              context_.client_future().get(),
              &req,
              [this, handle, idx, &write_hybrid_times, &expected_tablet_batches, &latch,
               &transaction_id, &p](
                  const Status& status,
                  const tserver::GetTransactionStatusAtParticipantResponsePB& resp) {
                client::UpdateClock(resp, &context_);
                rpcs_.Unregister(handle);

                VLOG_WITH_PREFIX(4)
                    << "TXN: " << transaction_id << " batch status at " << p.tablet << ": "
                    << "idx: " << idx << ", resp: " << resp.ShortDebugString() << ", expected: "
                    << expected_tablet_batches[idx].batches;
                if (status.ok()) {
                  if (resp.aborted()) {
                    write_hybrid_times[idx] = HybridTime::kMin;
                  } else if (implicit_cast<size_t>(resp.num_replicated_batches()) ==
                                 expected_tablet_batches[idx].batches) {
                    write_hybrid_times[idx] = HybridTime(resp.status_hybrid_time());
                    LOG_IF_WITH_PREFIX(DFATAL, !write_hybrid_times[idx].is_valid())
                        << "Received invalid hybrid time when all batches were replicated: "
                        << resp.ShortDebugString();
                  }
                }
                latch.CountDown();
              });
          (**handle).SendRpc();
        } else {
          latch.CountDown();
        }
        ++idx;
      }
      latch.Wait();
    }

    auto txn_it = managed_transactions_.find(transaction_id);
    if (txn_it == managed_transactions_.end()) {
      // Transaction was completed (aborted/committed) during this procedure.
      return TransactionStatusResult{TransactionStatus::PENDING, commit_time.Decremented()};
    }

    // Fold each tablet's answer back into the transaction state.
    for (size_t idx = 0; idx != expected_tablet_batches.size(); ++idx) {
      if (write_hybrid_times[idx] == HybridTime::kMin) {
        managed_transactions_.modify(txn_it, [](TransactionState& state) {
          state.Aborted();
        });
      } else if (write_hybrid_times[idx].is_valid()) {
        managed_transactions_.modify(
            txn_it, [idx, &expected_tablet_batches, &write_hybrid_times](TransactionState& state) {
              state.ReplicatedAllBatchesAt(
                  expected_tablet_batches[idx].tablet, write_hybrid_times[idx]);
            });
      }
    }
    auto result = VERIFY_RESULT(txn_it->GetStatus(/* expected_tablet_batches = */ nullptr));
    if (result.status != TransactionStatus::SEALED) {
      VLOG_WITH_PREFIX(4) << "TXN: " << transaction_id << " status resolved: "
                          << TransactionStatus_Name(result.status);
      return result;
    }

    VLOG_WITH_PREFIX(4) << "TXN: " << transaction_id << " status NOT resolved";
    return TransactionStatusResult{TransactionStatus::PENDING, result.status_time.Decremented()};
  }
1097 | | |
  // Requests abort of the given transaction. The callback is invoked exactly
  // once: immediately with Aborted() for unknown transactions, immediately
  // with the current result when TransactionState::Abort resolves it in
  // place (callback left non-empty), or later once the abort is replicated
  // (callback consumed by the state). The callback is always invoked with the
  // coordinator lock released.
  void Abort(const std::string& transaction_id, int64_t term, TransactionAbortCallback callback) {
    AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms);

    auto id = FullyDecodeTransactionId(transaction_id);
    if (!id.ok()) {
      callback(id.status());
      return;
    }

    PostponedLeaderActions actions;
    {
      std::unique_lock<std::mutex> lock(managed_mutex_);
      auto it = managed_transactions_.find(*id);
      if (it == managed_transactions_.end()) {
        // Unknown transaction is considered already aborted.
        lock.unlock();
        callback(TransactionStatusResult::Aborted());
        return;
      }
      postponed_leader_actions_.leader_term = term;
      boost::optional<TransactionStatusResult> status;
      managed_transactions_.modify(it, [&status, &callback](TransactionState& state) {
        status = state.Abort(&callback);
      });
      if (callback) {
        // State did not consume the callback, so the result is already known.
        lock.unlock();
        callback(*status);
        return;
      }
      actions.Swap(&postponed_leader_actions_);
    }

    ExecutePostponedLeaderActions(&actions);
  }
1131 | | |
1132 | 0 | size_t test_count_transactions() { |
1133 | 0 | std::lock_guard<std::mutex> lock(managed_mutex_); |
1134 | 0 | return managed_transactions_.size(); |
1135 | 0 | } |
1136 | | |
  // Applies a replicated transaction-update record to the corresponding
  // TransactionState (creating it if needed — see GetTransaction), under the
  // coordinator lock, then executes any postponed leader actions outside it.
  CHECKED_STATUS ProcessReplicated(const ReplicatedData& data) {
    auto id = FullyDecodeTransactionId(data.state.transaction_id());
    if (!id.ok()) {
      return std::move(id.status());
    }

    PostponedLeaderActions actions;
    Status result;
    {
      std::lock_guard<std::mutex> lock(managed_mutex_);
      postponed_leader_actions_.leader_term = data.leader_term;
      auto it = GetTransaction(*id, data.state.status(), data.hybrid_time);
      if (it == managed_transactions_.end()) {
        // Not tracked and not worth creating (terminal status) — nothing to do.
        return Status::OK();
      }
      managed_transactions_.modify(it, [&result, &data](TransactionState& state) {
        result = state.ProcessReplicated(data);
      });
      // May erase the entry if the transaction reached its terminal state.
      CheckCompleted(it);
      actions.Swap(&postponed_leader_actions_);
    }
    ExecutePostponedLeaderActions(&actions);

    VLOG_WITH_PREFIX(1) << "Processed: " << data.ToString();
    return result;
  }
1163 | | |
  // Handles an update operation whose replication was aborted (e.g. leader
  // change): forwards the abort to the transaction's state and executes any
  // resulting postponed actions outside the lock.
  void ProcessAborted(const AbortedData& data) {
    auto id = FullyDecodeTransactionId(data.state.transaction_id());
    if (!id.ok()) {
      LOG_WITH_PREFIX(DFATAL) << "Abort of transaction with bad id "
                              << data.state.ShortDebugString() << ": " << id.status();
      return;
    }

    PostponedLeaderActions actions;
    {
      std::lock_guard<std::mutex> lock(managed_mutex_);
      // Replication aborted means we are not acting as leader here.
      postponed_leader_actions_.leader_term = OpId::kUnknownTerm;
      auto it = managed_transactions_.find(*id);
      if (it == managed_transactions_.end()) {
        LOG_WITH_PREFIX(WARNING) << "Aborted operation for unknown transaction: " << *id;
        return;
      }
      managed_transactions_.modify(
          it, [&](TransactionState& ts) {
            ts.ProcessAborted(data);
          });
      CheckCompleted(it);
      actions.Swap(&postponed_leader_actions_);
    }
    ExecutePostponedLeaderActions(&actions);

    VLOG_WITH_PREFIX(1) << "Aborted, state: " << data.state.ShortDebugString()
                        << ", op id: " << data.op_id;
  }
1193 | | |
1194 | 48.0k | void Start() { |
1195 | 48.0k | poller_.Start( |
1196 | 48.0k | &context_.client_future().get()->messenger()->scheduler(), |
1197 | 48.0k | std::chrono::microseconds(kTimeMultiplier * FLAGS_transaction_check_interval_usec)); |
1198 | 48.0k | } |
1199 | | |
  // Entry point for an incoming update-transaction operation. Routes it to
  // the matching TransactionState, creating one only for CREATED requests;
  // requests for unknown, non-CREATED transactions are completed with an
  // Expired/kAborted status. Postponed actions run after the lock is dropped.
  void Handle(std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) {
    auto& state = *request->request();
    auto id = FullyDecodeTransactionId(state.transaction_id());
    if (!id.ok()) {
      LOG(WARNING) << "Failed to decode id from " << state.ShortDebugString() << ": " << id;
      request->CompleteWithStatus(id.status());
      return;
    }

    PostponedLeaderActions actions;
    {
      std::unique_lock<std::mutex> lock(managed_mutex_);
      postponed_leader_actions_.leader_term = term;
      auto it = managed_transactions_.find(*id);
      if (it == managed_transactions_.end()) {
        if (state.status() == TransactionStatus::CREATED) {
          it = managed_transactions_.emplace(
              this, *id, context_.clock().Now(), log_prefix_).first;
        } else {
          // Complete the request outside the lock; rate-limit the log line.
          lock.unlock();
          YB_LOG_HIGHER_SEVERITY_WHEN_TOO_MANY(INFO, WARNING, 1s, 50)
              << LogPrefix() << "Request to unknown transaction " << id << ": "
              << state.ShortDebugString();
          auto status = STATUS_EC_FORMAT(
              Expired, PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE),
              "Transaction $0 expired or aborted by a conflict", *id);
          status = status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted));
          request->CompleteWithStatus(status);
          return;
        }
      }

      managed_transactions_.modify(it, [&request](TransactionState& state) {
        state.Handle(std::move(request));
      });
      postponed_leader_actions_.Swap(&actions);
    }

    ExecutePostponedLeaderActions(&actions);
  }
1240 | | |
1241 | 10.4M | int64_t PrepareGC(std::string* details) { |
1242 | 10.4M | std::lock_guard<std::mutex> lock(managed_mutex_); |
1243 | 10.4M | if (!managed_transactions_.empty()) { |
1244 | 396k | auto& txn = *managed_transactions_.get<FirstEntryIndexTag>().begin(); |
1245 | 396k | if (details) { |
1246 | 0 | *details += Format("Transaction coordinator: $0\n", txn); |
1247 | 0 | } |
1248 | 396k | return txn.first_entry_raft_index(); |
1249 | 396k | } |
1250 | 10.0M | return std::numeric_limits<int64_t>::max(); |
1251 | 10.4M | } |
1252 | | |
  // Returns logs prefix for this transaction coordinator.
  // The prefix (tablet id + permanent uuid) is fixed at construction time.
  const std::string& LogPrefix() {
    return log_prefix_;
  }
1257 | | |
1258 | 0 | std::string DumpTransactions() { |
1259 | 0 | std::string result; |
1260 | 0 | std::lock_guard<std::mutex> lock(managed_mutex_); |
1261 | 0 | for (const auto& txn : managed_transactions_) { |
1262 | 0 | result += txn.ToString(); |
1263 | 0 | result += "\n"; |
1264 | 0 | } |
1265 | 0 | return result; |
1266 | 0 | } |
1267 | | |
 private:
  // Tags for the secondary indexes of ManagedTransactions below.
  class LastTouchTag;
  class FirstEntryIndexTag;

  // Transactions are stored in a multi-index container:
  //  - hashed unique by transaction id (primary lookup),
  //  - ordered by last_touch (expiration scans),
  //  - ordered by first_entry_raft_index (log GC, see PrepareGC).
  typedef boost::multi_index_container<TransactionState,
      boost::multi_index::indexed_by <
          boost::multi_index::hashed_unique <
              boost::multi_index::const_mem_fun<TransactionState,
                                                const TransactionId&,
                                                &TransactionState::id>
          >,
          boost::multi_index::ordered_non_unique <
              boost::multi_index::tag<LastTouchTag>,
              boost::multi_index::const_mem_fun<TransactionState,
                                                HybridTime,
                                                &TransactionState::last_touch>
          >,
          boost::multi_index::ordered_non_unique <
              boost::multi_index::tag<FirstEntryIndexTag>,
              boost::multi_index::const_mem_fun<TransactionState,
                                                int64_t,
                                                &TransactionState::first_entry_raft_index>
          >
      >
  > ManagedTransactions;
1293 | | |
  // Sends an APPLYING notification to one involved tablet. On failure:
  //  - if the tablet was split, involved tablets are updated and the request
  //    is re-sent to each child tablet;
  //  - if the tablet is gone (NotFound, not split), the tablet is marked as
  //    applied so the transaction can still be cleaned up.
  // Other errors are logged and left to the periodic resend (see
  // resend_applying_time_ handling elsewhere — TODO confirm).
  void SendUpdateTransactionRequest(
      const NotifyApplyingData& action, HybridTime now,
      const CoarseTimePoint& deadline) {
    VLOG_WITH_PREFIX(3) << "Notify applying: " << action.ToString();

    tserver::UpdateTransactionRequestPB req;
    req.set_tablet_id(action.tablet);
    req.set_propagated_hybrid_time(now.ToUint64());
    auto& state = *req.mutable_state();
    state.set_transaction_id(action.transaction.data(), action.transaction.size());
    state.set_status(TransactionStatus::APPLYING);
    state.add_tablets(context_.tablet_id());
    state.set_commit_hybrid_time(action.commit_time.ToUint64());
    state.set_sealed(action.sealed);
    *state.mutable_aborted() = action.aborted;

    auto handle = rpcs_.Prepare();
    if (handle != rpcs_.InvalidHandle()) {
      // `action` is captured by value: the callback may outlive this frame.
      *handle = UpdateTransaction(
          deadline,
          nullptr /* remote_tablet */,
          context_.client_future().get(),
          &req,
          [this, handle, action]
              (const Status& status,
               const tserver::UpdateTransactionRequestPB& req,
               const tserver::UpdateTransactionResponsePB& resp) {
            client::UpdateClock(resp, &context_);
            rpcs_.Unregister(handle);
            if (status.ok()) {
              return;
            }
            LOG_WITH_PREFIX(WARNING)
                << "Failed to send apply for transaction: " << action.transaction << ": "
                << status;
            const auto split_child_tablet_ids = SplitChildTabletIdsData(status).value();
            const bool tablet_has_been_split = !split_child_tablet_ids.empty();
            if (status.IsNotFound() || tablet_has_been_split) {
              std::lock_guard<std::mutex> lock(managed_mutex_);
              auto it = managed_transactions_.find(action.transaction);
              if (it == managed_transactions_.end()) {
                return;
              }
              managed_transactions_.modify(
                  it, [this, &action, &split_child_tablet_ids,
                       tablet_has_been_split](TransactionState& state) {
                    if (tablet_has_been_split) {
                      // We need to update involved tablets map.
                      LOG_WITH_PREFIX(INFO) << Format(
                          "Tablet $0 has been split into: $1", action.tablet,
                          split_child_tablet_ids);
                      state.AddInvolvedTablets(action.tablet, split_child_tablet_ids);
                    } else {
                      // Tablet has been deleted (not split), so we should mark it as applied to
                      // be able to cleanup the transaction.
                      TransactionStatePB transaction_state;
                      transaction_state.add_tablets(action.tablet);
                      WARN_NOT_OK(
                          state.AppliedInOneOfInvolvedTablets(transaction_state),
                          "AppliedInOneOfInvolvedTablets for removed tabled failed: ");
                    }
                  });
              if (tablet_has_been_split) {
                // Recursively notify each child tablet of the split.
                const auto new_deadline = TransactionRpcDeadline();
                NotifyApplyingData new_action = action;
                for (const auto& split_child_tablet_id : split_child_tablet_ids) {
                  new_action.tablet = split_child_tablet_id;
                  SendUpdateTransactionRequest(new_action, context_.clock().Now(), new_deadline);
                }
              }
            }
          });
      (**handle).SendRpc();
    }
  }
1369 | | |
  // Executes actions accumulated under the coordinator lock, after it has
  // been released: completes requests with their statuses, and — only if the
  // actions were collected while leader — sends APPLYING notifications and
  // submits pending update operations for replication.
  void ExecutePostponedLeaderActions(PostponedLeaderActions* actions) {
    // Request completions happen regardless of leadership.
    for (const auto& p : actions->complete_with_status) {
      p.request->CompleteWithStatus(p.status);
    }

    if (!actions->leader()) {
      return;
    }

    if (!actions->notify_applying.empty()) {
      // One timestamp/deadline shared by the whole notification batch.
      auto now = context_.clock().Now();
      auto deadline = TransactionRpcDeadline();
      for (const auto& action : actions->notify_applying) {
        SendUpdateTransactionRequest(action, now, deadline);
      }
    }

    for (auto& update : actions->updates) {
      context_.SubmitUpdateTransaction(std::move(update), actions->leader_term);
    }
  }
1391 | | |
1392 | | ManagedTransactions::iterator GetTransaction(const TransactionId& id, |
1393 | | TransactionStatus status, |
1394 | 3.18M | HybridTime hybrid_time) { |
1395 | 3.18M | auto it = managed_transactions_.find(id); |
1396 | 3.18M | if (it == managed_transactions_.end()) { |
1397 | 816k | if (status != TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS816k ) { |
1398 | 816k | it = managed_transactions_.emplace(this, id, hybrid_time, log_prefix_).first; |
1399 | 18.4E | VLOG_WITH_PREFIX(1) << Format("Added: $0", *it); |
1400 | 816k | } |
1401 | 816k | } |
1402 | 3.18M | return it; |
1403 | 3.18M | } |
1404 | | |
1405 | 2.02M | TransactionCoordinatorContext& coordinator_context() override { |
1406 | 2.02M | return context_; |
1407 | 2.02M | } |
1408 | | |
1409 | 436k | void NotifyApplying(NotifyApplyingData data) override { |
1410 | 436k | if (!leader()) { |
1411 | 0 | LOG_WITH_PREFIX(WARNING) << __func__ << " at non leader: " << data.ToString(); |
1412 | 0 | return; |
1413 | 0 | } |
1414 | 436k | postponed_leader_actions_.notify_applying.push_back(std::move(data)); |
1415 | 436k | } |
1416 | | |
1417 | | MUST_USE_RESULT bool SubmitUpdateTransaction( |
1418 | 1.07M | std::unique_ptr<UpdateTxnOperation> operation) override { |
1419 | 1.07M | if (!postponed_leader_actions_.leader()) { |
1420 | 21 | auto status = STATUS(IllegalState, "Submit update transaction on non leader"); |
1421 | 21 | VLOG_WITH_PREFIX0 (1) << status0 ; |
1422 | 21 | operation->CompleteWithStatus(status); |
1423 | 21 | return false; |
1424 | 21 | } |
1425 | | |
1426 | 1.07M | postponed_leader_actions_.updates.push_back(std::move(operation)); |
1427 | 1.07M | return true; |
1428 | 1.07M | } |
1429 | | |
1430 | | void CompleteWithStatus( |
1431 | 443k | std::unique_ptr<UpdateTxnOperation> request, Status status) override { |
1432 | 443k | auto ptr = request.get(); |
1433 | 443k | postponed_leader_actions_.complete_with_status.push_back({ |
1434 | 443k | std::move(request), ptr, std::move(status)}); |
1435 | 443k | } |
1436 | | |
  // Defers completion of a request owned elsewhere (no ownership transfer,
  // hence the null holder).
  void CompleteWithStatus(UpdateTxnOperation* request, Status status) override {
    postponed_leader_actions_.complete_with_status.push_back({
        nullptr /* holder */, request, std::move(status)});
  }
1441 | | |
1442 | 2.41M | bool leader() const override { |
1443 | 2.41M | return postponed_leader_actions_.leader(); |
1444 | 2.41M | } |
1445 | | |
1446 | 5.52M | void Poll() { |
1447 | 5.52M | auto now = context_.clock().Now(); |
1448 | | |
1449 | 5.52M | auto leader_term = context_.LeaderTerm(); |
1450 | 5.52M | bool leader = leader_term != OpId::kUnknownTerm; |
1451 | 5.52M | PostponedLeaderActions actions; |
1452 | 5.52M | { |
1453 | 5.52M | std::lock_guard<std::mutex> lock(managed_mutex_); |
1454 | 5.52M | postponed_leader_actions_.leader_term = leader_term; |
1455 | | |
1456 | 5.52M | auto& index = managed_transactions_.get<LastTouchTag>(); |
1457 | | |
1458 | 5.52M | if (VLOG_IS_ON(4) && leader3 && !index.empty()2 ) { |
1459 | 0 | const auto& txn = *index.begin(); |
1460 | 0 | LOG_WITH_PREFIX(INFO) |
1461 | 0 | << __func__ << ", now: " << now << ", first: " << txn.ToString() |
1462 | 0 | << ", expired: " << txn.ExpiredAt(now) << ", timeout: " |
1463 | 0 | << MonoDelta(GetTransactionTimeout()) << ", passed: " |
1464 | 0 | << MonoDelta::FromMicroseconds( |
1465 | 0 | now.GetPhysicalValueMicros() - txn.last_touch().GetPhysicalValueMicros()); |
1466 | 0 | } |
1467 | | |
1468 | 5.52M | for (auto it = index.begin(); it != index.end() && it->ExpiredAt(now)206k ;) { |
1469 | 1.50k | if (it->status() == TransactionStatus::ABORTED) { |
1470 | 0 | it = index.erase(it); |
1471 | 1.50k | } else { |
1472 | 1.50k | if (leader) { |
1473 | 248 | expired_metric_.Increment(); |
1474 | 248 | bool modified = index.modify(it, [](TransactionState& state) { |
1475 | 248 | VLOG(4) << state.LogPrefix() << "Cleanup expired transaction"0 ; |
1476 | 248 | state.Abort(); |
1477 | 248 | }); |
1478 | 248 | DCHECK(modified); |
1479 | 248 | } |
1480 | 1.50k | ++it; |
1481 | 1.50k | } |
1482 | 1.50k | } |
1483 | 5.52M | auto now_physical = MonoTime::Now(); |
1484 | 5.52M | for (auto& transaction : managed_transactions_) { |
1485 | 450k | const_cast<TransactionState&>(transaction).Poll(leader, now_physical); |
1486 | 450k | } |
1487 | 5.52M | postponed_leader_actions_.Swap(&actions); |
1488 | 5.52M | } |
1489 | 5.52M | ExecutePostponedLeaderActions(&actions); |
1490 | 5.52M | } |
1491 | | |
1492 | 3.19M | void CheckCompleted(ManagedTransactions::iterator it) { |
1493 | 3.19M | if (it->Completed()) { |
1494 | 1.21M | auto status = STATUS_FORMAT(Expired, "Transaction completed: $0", *it); |
1495 | 18.4E | VLOG_WITH_PREFIX(1) << status; |
1496 | 1.21M | managed_transactions_.modify(it, [&status](TransactionState& state) { |
1497 | 1.21M | state.ClearRequests(status); |
1498 | 1.21M | }); |
1499 | 1.21M | managed_transactions_.erase(it); |
1500 | 1.21M | } |
1501 | 3.19M | } |
1502 | | |
  TransactionCoordinatorContext& context_;
  // Incremented when a leader aborts an expired transaction (see Poll()).
  Counter& expired_metric_;
  const std::string log_prefix_;

  // Guards managed_transactions_ and postponed_leader_actions_.
  std::mutex managed_mutex_;
  ManagedTransactions managed_transactions_;

  // Actions that should be executed after mutex is unlocked.
  PostponedLeaderActions postponed_leader_actions_;

  // Presumably drives periodic Poll() invocations - started outside this
  // visible chunk; confirm against Start().
  rpc::Poller poller_;
  rpc::Rpcs rpcs_;
1515 | | }; |
1516 | | |
// The public TransactionCoordinator API below is a thin pimpl wrapper: every
// method forwards to the corresponding Impl method.

TransactionCoordinator::TransactionCoordinator(const std::string& permanent_uuid,
                                               TransactionCoordinatorContext* context,
                                               Counter* expired_metric)
    : impl_(new Impl(permanent_uuid, context, expired_metric)) {
}

TransactionCoordinator::~TransactionCoordinator() {
}

Status TransactionCoordinator::ProcessReplicated(const ReplicatedData& data) {
  return impl_->ProcessReplicated(data);
}

void TransactionCoordinator::ProcessAborted(const AbortedData& data) {
  impl_->ProcessAborted(data);
}

int64_t TransactionCoordinator::PrepareGC(std::string* details) {
  return impl_->PrepareGC(details);
}

// Test-only accessor.
size_t TransactionCoordinator::test_count_transactions() const {
  return impl_->test_count_transactions();
}

void TransactionCoordinator::Handle(
    std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) {
  impl_->Handle(std::move(request), term);
}

void TransactionCoordinator::Start() {
  impl_->Start();
}

void TransactionCoordinator::Shutdown() {
  impl_->Shutdown();
}

Status TransactionCoordinator::GetStatus(
    const google::protobuf::RepeatedPtrField<std::string>& transaction_ids,
    CoarseTimePoint deadline,
    tserver::GetTransactionStatusResponsePB* response) {
  return impl_->GetStatus(transaction_ids, deadline, response);
}

void TransactionCoordinator::Abort(const std::string& transaction_id,
                                   int64_t term,
                                   TransactionAbortCallback callback) {
  impl_->Abort(transaction_id, term, std::move(callback));
}

std::string TransactionCoordinator::DumpTransactions() {
  return impl_->DumpTransactions();
}

// Human-readable representation of one replicated coordinator record.
std::string TransactionCoordinator::ReplicatedData::ToString() const {
  return Format("{ leader_term: $0 state: $1 op_id: $2 hybrid_time: $3 }",
                leader_term, state, op_id, hybrid_time);
}
1576 | | |
1577 | | } // namespace tablet |
1578 | | } // namespace yb |