/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_coordinator.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/tablet/transaction_coordinator.h" |
17 | | |
18 | | #include <boost/multi_index/hashed_index.hpp> |
19 | | #include <boost/multi_index/mem_fun.hpp> |
20 | | #include <boost/multi_index/ordered_index.hpp> |
21 | | #include <boost/multi_index_container.hpp> |
22 | | |
23 | | #include "yb/client/client.h" |
24 | | #include "yb/client/transaction_rpc.h" |
25 | | |
26 | | #include "yb/common/common.pb.h" |
27 | | #include "yb/common/entity_ids.h" |
28 | | #include "yb/common/pgsql_error.h" |
29 | | #include "yb/common/transaction.h" |
30 | | #include "yb/common/transaction_error.h" |
31 | | #include "yb/common/wire_protocol.h" |
32 | | |
33 | | #include "yb/consensus/consensus_round.h" |
34 | | #include "yb/consensus/consensus_util.h" |
35 | | |
36 | | #include "yb/docdb/transaction_dump.h" |
37 | | |
38 | | #include "yb/rpc/messenger.h" |
39 | | #include "yb/rpc/poller.h" |
40 | | #include "yb/rpc/rpc.h" |
41 | | |
42 | | #include "yb/server/clock.h" |
43 | | |
44 | | #include "yb/tablet/operations/update_txn_operation.h" |
45 | | |
46 | | #include "yb/tserver/tserver_service.pb.h" |
47 | | |
48 | | #include "yb/util/atomic.h" |
49 | | #include "yb/util/countdown_latch.h" |
50 | | #include "yb/util/enums.h" |
51 | | #include "yb/util/flag_tags.h" |
52 | | #include "yb/util/format.h" |
53 | | #include "yb/util/logging.h" |
54 | | #include "yb/util/metrics.h" |
55 | | #include "yb/util/random_util.h" |
56 | | #include "yb/util/result.h" |
57 | | #include "yb/util/scope_exit.h" |
58 | | #include "yb/util/status_format.h" |
59 | | #include "yb/util/status_log.h" |
60 | | #include "yb/util/tsan_util.h" |
61 | | #include "yb/util/yb_pg_errcodes.h" |
62 | | |
63 | | DECLARE_uint64(transaction_heartbeat_usec); |
64 | | DEFINE_double(transaction_max_missed_heartbeat_periods, 10.0, |
65 | | "Maximum heartbeat periods that a pending transaction can miss before the " |
66 | | "transaction coordinator expires the transaction. The total expiration time in " |
67 | | "microseconds is transaction_heartbeat_usec times " |
68 | | "transaction_max_missed_heartbeat_periods. The value passed to this flag may be " |
69 | | "fractional."); |
70 | | DEFINE_uint64(transaction_check_interval_usec, 500000, "Transaction check interval in usec."); |
71 | | DEFINE_uint64(transaction_resend_applying_interval_usec, 5000000, |
72 | | "Transaction resend applying interval in usec."); |
73 | | |
74 | | DEFINE_int64(avoid_abort_after_sealing_ms, 20, |
75 | | "If transaction was only sealed, we will try to abort it not earlier than this " |
76 | | "period in milliseconds."); |
77 | | |
78 | | DEFINE_test_flag(uint64, inject_txn_get_status_delay_ms, 0, |
79 | | "Inject specified delay to transaction get status requests."); |
80 | | DEFINE_test_flag(int64, inject_random_delay_on_txn_status_response_ms, 0, |
81 | | "Inject a random amount of delay to the thread processing a " |
82 | | "GetTransactionStatusRequest after it has populated it's response. This could " |
83 | | "help simulate e.g. out-of-order responses where PENDING is received by client " |
84 | | "after a COMMITTED response."); |
85 | | |
86 | | using namespace std::literals; |
87 | | using namespace std::placeholders; |
88 | | |
89 | | namespace yb { |
90 | | namespace tablet { |
91 | | |
92 | 466k | std::chrono::microseconds GetTransactionTimeout() { |
93 | 466k | const double timeout = GetAtomicFlag(&FLAGS_transaction_max_missed_heartbeat_periods) * |
94 | 466k | GetAtomicFlag(&FLAGS_transaction_heartbeat_usec); |
95 | | // Cast to avoid -Wimplicit-int-float-conversion. |
96 | 466k | return timeout >= static_cast<double>(std::chrono::microseconds::max().count()) |
97 | 0 | ? std::chrono::microseconds::max() |
98 | 466k | : std::chrono::microseconds(static_cast<int64_t>(timeout)); |
99 | 466k | } |
100 | | |
101 | | namespace { |
102 | | |
103 | | struct NotifyApplyingData { |
104 | | TabletId tablet; |
105 | | TransactionId transaction; |
106 | | const AbortedSubTransactionSetPB& aborted; |
107 | | HybridTime commit_time; |
108 | | bool sealed; |
109 | | |
110 | 0 | std::string ToString() const { |
111 | 0 | return Format("{ tablet: $0 transaction: $1 commit_time: $2 sealed: $3}", |
112 | 0 | tablet, transaction, commit_time, sealed); |
113 | 0 | } |
114 | | }; |
115 | | |
116 | | struct ExpectedTabletBatches { |
117 | | TabletId tablet; |
118 | | size_t batches; |
119 | | |
120 | 0 | std::string ToString() const { |
121 | 0 | return Format("{ tablet: $0 batches: $1 }", tablet, batches); |
122 | 0 | } |
123 | | }; |
124 | | |
125 | | // Context for transaction state. I.e. access to external facilities required by |
126 | | // transaction state to do its job. |
127 | | class TransactionStateContext { |
128 | | public: |
129 | | virtual TransactionCoordinatorContext& coordinator_context() = 0; |
130 | | |
131 | | virtual void NotifyApplying(NotifyApplyingData data) = 0; |
132 | | |
133 | | // Submits update transaction to the RAFT log. Returns false if was not able to submit. |
134 | | virtual MUST_USE_RESULT bool SubmitUpdateTransaction( |
135 | | std::unique_ptr<UpdateTxnOperation> operation) = 0; |
136 | | |
137 | | virtual void CompleteWithStatus( |
138 | | std::unique_ptr<UpdateTxnOperation> request, Status status) = 0; |
139 | | |
140 | | virtual void CompleteWithStatus(UpdateTxnOperation* request, Status status) = 0; |
141 | | |
142 | | virtual bool leader() const = 0; |
143 | | |
144 | | protected: |
145 | 128 | ~TransactionStateContext() {} |
146 | | }; |
147 | | |
148 | 727k | std::string BuildLogPrefix(const std::string& parent_log_prefix, const TransactionId& id) { |
149 | 727k | auto id_string = id.ToString(); |
150 | 727k | return parent_log_prefix.substr(0, parent_log_prefix.length() - 2) + " ID " + id_string + ": "; |
151 | 727k | } |
152 | | |
153 | | // TransactionState keeps state of single transaction. |
154 | | // User of this class should guarantee that it does NOT invoke methods concurrently. |
155 | | class TransactionState { |
156 | | public: |
157 | | explicit TransactionState(TransactionStateContext* context, |
158 | | const TransactionId& id, |
159 | | HybridTime last_touch, |
160 | | const std::string& parent_log_prefix) |
161 | | : context_(*context), |
162 | | id_(id), |
163 | | log_prefix_(BuildLogPrefix(parent_log_prefix, id)), |
164 | 727k | last_touch_(last_touch) { |
165 | 727k | } |
166 | | |
167 | 716k | ~TransactionState() { |
168 | 716k | DCHECK(abort_waiters_.empty()); |
169 | 716k | DCHECK(request_queue_.empty()); |
170 | 74 | DCHECK(replicating_ == nullptr) << Format("Replicating: $0", static_cast<void*>(replicating_)); |
171 | 716k | } |
172 | | |
173 | | // Id of transaction. |
174 | 10.1M | const TransactionId& id() const { |
175 | 10.1M | return id_; |
176 | 10.1M | } |
177 | | |
178 | | // Time when we last heard from transaction. I.e. hybrid time of replicated raft log entry |
179 | | // that updates status of this transaction. |
180 | 9.50M | HybridTime last_touch() const { |
181 | 9.50M | return last_touch_; |
182 | 9.50M | } |
183 | | |
184 | | // Status of transaction. |
185 | 381 | TransactionStatus status() const { |
186 | 381 | return status_; |
187 | 381 | } |
188 | | |
189 | | // RAFT index of first RAFT log entry required by this transaction. |
190 | 10.6M | int64_t first_entry_raft_index() const { |
191 | 10.6M | return first_entry_raft_index_; |
192 | 10.6M | } |
193 | | |
194 | 716k | std::string ToString() const { |
195 | 716k | return Format("{ id: $0 last_touch: $1 status: $2 involved_tablets: $3 replicating: $4 " |
196 | 716k | " request_queue: $5 first_entry_raft_index: $6 }", |
197 | 716k | id_, last_touch_, TransactionStatus_Name(status_), |
198 | 716k | involved_tablets_, replicating_, request_queue_, first_entry_raft_index_); |
199 | 716k | } |
200 | | |
201 | | // Whether this transaction expired at specified time. |
202 | 470k | bool ExpiredAt(HybridTime now) const { |
203 | 470k | if (ShouldBeCommitted() || ShouldBeInStatus(TransactionStatus::SEALED)) { |
204 | 4.10k | return false; |
205 | 4.10k | } |
206 | 465k | const int64_t passed = now.GetPhysicalValueMicros() - last_touch_.GetPhysicalValueMicros(); |
207 | 465k | if (std::chrono::microseconds(passed) > GetTransactionTimeout()) { |
208 | 45.8k | return true; |
209 | 45.8k | } |
210 | 420k | return false; |
211 | 420k | } |
212 | | |
213 | | // Whether this transaction has completed. |
214 | 1.87M | bool Completed() const { |
215 | 1.87M | return status_ == TransactionStatus::ABORTED || |
216 | 1.46M | status_ == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS; |
217 | 1.87M | } |
218 | | |
219 | | // Applies new state to transaction. |
220 | 1.87M | CHECKED_STATUS ProcessReplicated(const TransactionCoordinator::ReplicatedData& data) { |
221 | 161 | VLOG_WITH_PREFIX(4) |
222 | 161 | << Format("ProcessReplicated: $0, replicating: $1", data, replicating_); |
223 | | |
224 | 1.87M | if (replicating_ != nullptr) { |
225 | 624k | auto replicating_op_id = replicating_->consensus_round()->id(); |
226 | 624k | if (!replicating_op_id.empty()) { |
227 | 624k | if (replicating_op_id != data.op_id) { |
228 | 0 | LOG_WITH_PREFIX(DFATAL) |
229 | 0 | << "Replicated unexpected operation, replicating: " << AsString(replicating_) |
230 | 0 | << ", replicated: " << AsString(data); |
231 | 0 | } |
232 | 18.4E | } else if (data.leader_term != OpId::kUnknownTerm) { |
233 | 0 | LOG_WITH_PREFIX(DFATAL) |
234 | 0 | << "Leader replicated operation without op id, replicating: " << AsString(replicating_) |
235 | 0 | << ", replicated: " << AsString(data); |
236 | 18.4E | } else { |
237 | 18.4E | LOG_WITH_PREFIX(INFO) << "Cancel replicating without id: " << AsString(replicating_) |
238 | 18.4E | << ", because " << AsString(data) << " was replicated"; |
239 | 18.4E | } |
240 | 624k | replicating_ = nullptr; |
241 | 624k | } |
242 | | |
243 | 1.87M | auto status = DoProcessReplicated(data); |
244 | | |
245 | 1.87M | if (data.leader_term == OpId::kUnknownTerm) { |
246 | 1.24M | ClearRequests(STATUS(IllegalState, "Leader changed")); |
247 | 624k | } else { |
248 | 624k | switch(status_) { |
249 | 104k | case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS: |
250 | 104k | ClearRequests(STATUS(AlreadyPresent, "Transaction committed")); |
251 | 104k | break; |
252 | 134k | case TransactionStatus::ABORTED: |
253 | 134k | ClearRequests( |
254 | 134k | STATUS(Expired, "Transaction aborted", |
255 | 134k | TransactionError(TransactionErrorCode::kAborted))); |
256 | 134k | break; |
257 | 0 | case TransactionStatus::CREATED: FALLTHROUGH_INTENDED; |
258 | 281k | case TransactionStatus::PENDING: FALLTHROUGH_INTENDED; |
259 | 281k | case TransactionStatus::SEALED: FALLTHROUGH_INTENDED; |
260 | 385k | case TransactionStatus::COMMITTED: FALLTHROUGH_INTENDED; |
261 | 385k | case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED; |
262 | 385k | case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: FALLTHROUGH_INTENDED; |
263 | 385k | case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED; |
264 | 385k | case TransactionStatus::GRACEFUL_CLEANUP: |
265 | 385k | ProcessQueue(); |
266 | 385k | break; |
267 | 1.87M | } |
268 | 1.87M | } |
269 | | |
270 | 1.87M | return status; |
271 | 1.87M | } |
272 | | |
273 | 0 | void ProcessAborted(const TransactionCoordinator::AbortedData& data) { |
274 | 0 | VLOG_WITH_PREFIX(4) << Format("ProcessAborted: $0, replicating: $1", data.state, replicating_); |
275 | |
|
276 | 0 | LOG_IF(DFATAL, |
277 | 0 | replicating_ != nullptr && !replicating_->op_id().empty() && |
278 | 0 | replicating_->op_id() != data.op_id) |
279 | 0 | << "Aborted wrong operation, expected " << AsString(replicating_) << ", but " |
280 | 0 | << AsString(data) << " aborted"; |
281 | |
|
282 | 0 | replicating_ = nullptr; |
283 | | |
284 | | // We are not leader, so could abort all queued requests. |
285 | 0 | ClearRequests(STATUS(Aborted, "Replication failed")); |
286 | 0 | } |
287 | | |
288 | | // Clear requests of this transaction. |
289 | 2.20M | void ClearRequests(const Status& status) { |
290 | 142 | VLOG_WITH_PREFIX(4) << Format("ClearRequests: $0, replicating: $1", status, replicating_); |
291 | 2.20M | if (replicating_ != nullptr) { |
292 | 0 | context_.CompleteWithStatus(replicating_, status); |
293 | 0 | replicating_ = nullptr; |
294 | 0 | } |
295 | | |
296 | 3.03k | for (auto& entry : request_queue_) { |
297 | 3.03k | context_.CompleteWithStatus(std::move(entry), status); |
298 | 3.03k | } |
299 | 2.20M | request_queue_.clear(); |
300 | | |
301 | 2.20M | NotifyAbortWaiters(status); |
302 | 2.20M | } |
303 | | |
304 | | // Used only during transaction sealing. |
305 | 0 | void ReplicatedAllBatchesAt(const TabletId& tablet, HybridTime last_time) { |
306 | 0 | auto it = involved_tablets_.find(tablet); |
307 | | // We could be notified several times, so avoid double handling. |
308 | 0 | if (it == involved_tablets_.end() || it->second.all_batches_replicated) { |
309 | 0 | return; |
310 | 0 | } |
311 | | |
312 | | // If transaction was sealed, then its commit time is max of seal record time and intent |
313 | | // replication times from all participating tablets. |
314 | 0 | commit_time_ = std::max(commit_time_, last_time); |
315 | 0 | --tablets_with_not_replicated_batches_; |
316 | 0 | it->second.all_batches_replicated = true; |
317 | |
|
318 | 0 | if (tablets_with_not_replicated_batches_ == 0) { |
319 | 0 | StartApply(); |
320 | 0 | } |
321 | 0 | } |
322 | | |
323 | 19.0k | const AbortedSubTransactionSetPB& GetAbortedSubTransactionSetPB() const { return aborted_; } |
324 | | |
325 | | Result<TransactionStatusResult> GetStatus( |
326 | 158k | std::vector<ExpectedTabletBatches>* expected_tablet_batches) const { |
327 | 158k | switch (status_) { |
328 | 19.0k | case TransactionStatus::COMMITTED: FALLTHROUGH_INTENDED; |
329 | 19.0k | case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS: |
330 | 19.0k | return TransactionStatusResult{TransactionStatus::COMMITTED, commit_time_}; |
331 | 0 | case TransactionStatus::SEALED: |
332 | 0 | if (tablets_with_not_replicated_batches_ == 0) { |
333 | 0 | return TransactionStatusResult{TransactionStatus::COMMITTED, commit_time_}; |
334 | 0 | } |
335 | 0 | FillExpectedTabletBatches(expected_tablet_batches); |
336 | 0 | return TransactionStatusResult{TransactionStatus::SEALED, commit_time_}; |
337 | 0 | case TransactionStatus::ABORTED: |
338 | 0 | return TransactionStatusResult{TransactionStatus::ABORTED, HybridTime::kMax}; |
339 | 139k | case TransactionStatus::PENDING: { |
340 | 139k | HybridTime status_ht; |
341 | 139k | if (replicating_) { |
342 | 56.2k | auto replicating_status = replicating_->request()->status(); |
343 | 56.2k | if (replicating_status == TransactionStatus::COMMITTED || |
344 | 45.3k | replicating_status == TransactionStatus::ABORTED) { |
345 | 45.3k | auto replicating_ht = replicating_->hybrid_time_even_if_unset(); |
346 | 45.3k | if (replicating_ht.is_valid()) { |
347 | 39.9k | status_ht = replicating_ht; |
348 | 5.42k | } else { |
349 | | // Hybrid time now yet assigned to replicating, so assign more conservative time, |
350 | | // that is guaranteed to be less then replicating time. See GH #9981. |
351 | 5.42k | status_ht = replicating_submit_time_; |
352 | 5.42k | } |
353 | 45.3k | } |
354 | 56.2k | } |
355 | 139k | if (!status_ht) { |
356 | 93.8k | status_ht = context_.coordinator_context().clock().Now(); |
357 | 93.8k | } |
358 | 139k | status_ht = std::min(status_ht, context_.coordinator_context().HtLeaseExpiration()); |
359 | 139k | return TransactionStatusResult{TransactionStatus::PENDING, status_ht.Decremented()}; |
360 | 0 | } |
361 | 0 | case TransactionStatus::CREATED: FALLTHROUGH_INTENDED; |
362 | 0 | case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED; |
363 | 0 | case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: FALLTHROUGH_INTENDED; |
364 | 0 | case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED; |
365 | 0 | case TransactionStatus::GRACEFUL_CLEANUP: |
366 | 0 | return STATUS_FORMAT(Corruption, "Transaction has unexpected status: $0", |
367 | 0 | TransactionStatus_Name(status_)); |
368 | 0 | } |
369 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, status_); |
370 | 0 | } |
371 | | |
372 | 0 | void Aborted() { |
373 | 0 | status_ = TransactionStatus::ABORTED; |
374 | 0 | NotifyAbortWaiters(TransactionStatusResult::Aborted()); |
375 | 0 | } |
376 | | |
377 | 115k | TransactionStatusResult Abort(TransactionAbortCallback* callback) { |
378 | 115k | if (status_ == TransactionStatus::COMMITTED || |
379 | 113k | status_ == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS) { |
380 | 2.68k | return TransactionStatusResult(TransactionStatus::COMMITTED, commit_time_); |
381 | 113k | } else if (ShouldBeCommitted()) { |
382 | 23.0k | return TransactionStatusResult(TransactionStatus::COMMITTED, HybridTime::kMax); |
383 | 90.1k | } else if (status_ == TransactionStatus::ABORTED) { |
384 | 0 | return TransactionStatusResult::Aborted(); |
385 | 90.1k | } else { |
386 | 33 | VLOG_WITH_PREFIX(1) << "External abort request"; |
387 | 90.1k | CHECK_EQ(TransactionStatus::PENDING, status_); |
388 | 90.1k | abort_waiters_.emplace_back(std::move(*callback)); |
389 | 90.1k | Abort(); |
390 | 90.1k | return TransactionStatusResult(TransactionStatus::PENDING, HybridTime::kMax); |
391 | 90.1k | } |
392 | 115k | } |
393 | | |
394 | 636k | void Handle(std::unique_ptr<tablet::UpdateTxnOperation> request) { |
395 | 636k | auto& state = *request->request(); |
396 | 18.4E | VLOG_WITH_PREFIX(1) << "Handle: " << state.ShortDebugString(); |
397 | 636k | if (state.status() == TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS) { |
398 | 246k | auto status = AppliedInOneOfInvolvedTablets(state); |
399 | 246k | context_.CompleteWithStatus(std::move(request), status); |
400 | 246k | return; |
401 | 246k | } |
402 | 390k | if (replicating_) { |
403 | 3.83k | request_queue_.push_back(std::move(request)); |
404 | 3.83k | return; |
405 | 3.83k | } |
406 | 386k | DoHandle(std::move(request)); |
407 | 386k | } |
408 | | |
409 | | // Aborts this transaction. |
410 | 135k | void Abort() { |
411 | 135k | if (ShouldBeCommitted()) { |
412 | 0 | LOG_WITH_PREFIX(DFATAL) << "Transaction abort in wrong state: " << status_; |
413 | 0 | return; |
414 | 0 | } |
415 | 135k | if (ShouldBeAborted()) { |
416 | 314 | return; |
417 | 314 | } |
418 | 135k | if (status_ != TransactionStatus::PENDING) { |
419 | 0 | LOG_WITH_PREFIX(DFATAL) << "Unexpected status during abort: " |
420 | 0 | << TransactionStatus_Name(status_); |
421 | 0 | return; |
422 | 0 | } |
423 | 135k | SubmitUpdateStatus(TransactionStatus::ABORTED); |
424 | 135k | } |
425 | | |
426 | | // Returns logs prefix for this transaction. |
427 | 0 | const std::string& LogPrefix() { |
428 | 0 | return log_prefix_; |
429 | 0 | } |
430 | | |
431 | | // now_physical is just optimization to avoid querying the current time multiple times. |
432 | 412k | void Poll(bool leader, MonoTime now_physical) { |
433 | 412k | if (status_ != TransactionStatus::COMMITTED && |
434 | 402k | (status_ != TransactionStatus::SEALED || tablets_with_not_replicated_batches_ != 0)) { |
435 | 402k | return; |
436 | 402k | } |
437 | 10.0k | if (tablets_with_not_applied_intents_ == 0) { |
438 | 888 | if (leader && !ShouldBeInStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS)) { |
439 | 27 | SubmitUpdateStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS); |
440 | 27 | } |
441 | 9.15k | } else if (now_physical >= resend_applying_time_) { |
442 | 3 | if (leader) { |
443 | 18 | for (auto& tablet : involved_tablets_) { |
444 | 18 | if (!tablet.second.all_intents_applied) { |
445 | 5 | context_.NotifyApplying({ |
446 | 5 | .tablet = tablet.first, |
447 | 5 | .transaction = id_, |
448 | 5 | .aborted = aborted_, |
449 | 5 | .commit_time = commit_time_, |
450 | 5 | .sealed = status_ == TransactionStatus::SEALED }); |
451 | 5 | } |
452 | 18 | } |
453 | 2 | } |
454 | 3 | resend_applying_time_ = now_physical + |
455 | 3 | std::chrono::microseconds(FLAGS_transaction_resend_applying_interval_usec); |
456 | 3 | } |
457 | 10.0k | } |
458 | | |
459 | | void AddInvolvedTablets( |
460 | 0 | const TabletId& source_tablet_id, const std::vector<TabletId>& tablet_ids) { |
461 | 0 | auto source_it = involved_tablets_.find(source_tablet_id); |
462 | 0 | if (source_it == involved_tablets_.end()) { |
463 | 0 | LOG(FATAL) << "Unknown involved tablet: " << source_tablet_id; |
464 | 0 | return; |
465 | 0 | } |
466 | 0 | for (const auto& tablet_id : tablet_ids) { |
467 | 0 | if (involved_tablets_.emplace(tablet_id, source_it->second).second) { |
468 | 0 | ++tablets_with_not_applied_intents_; |
469 | 0 | } |
470 | 0 | } |
471 | 0 | if (!source_it->second.all_intents_applied) { |
472 | | // Mark source tablet as if intents have been applied for it. |
473 | 0 | --tablets_with_not_applied_intents_; |
474 | 0 | source_it->second.all_intents_applied = true; |
475 | 0 | } |
476 | 0 | } |
477 | | |
478 | 246k | CHECKED_STATUS AppliedInOneOfInvolvedTablets(const TransactionStatePB& state) { |
479 | 246k | if (status_ != TransactionStatus::COMMITTED && status_ != TransactionStatus::SEALED) { |
480 | | // We could ignore this request, because it will be re-send if required. |
481 | 0 | LOG_WITH_PREFIX(DFATAL) |
482 | 0 | << "AppliedInOneOfInvolvedTablets in wrong state: " << TransactionStatus_Name(status_) |
483 | 0 | << ", request: " << state.ShortDebugString(); |
484 | 0 | return Status::OK(); |
485 | 0 | } |
486 | | |
487 | 246k | if (state.tablets_size() != 1) { |
488 | 0 | return STATUS_FORMAT( |
489 | 0 | InvalidArgument, "Expected exactly one tablet in $0: $1", __func__, state); |
490 | 0 | } |
491 | | |
492 | 246k | auto it = involved_tablets_.find(state.tablets(0)); |
493 | 246k | if (it == involved_tablets_.end()) { |
494 | | // This can happen when transaction coordinator retried apply to post-split tablets, |
495 | | // transaction coordinator moved to new status tablet leader and here new transaction |
496 | | // coordinator receives notification about txn is applied in post-split tablet not yet known |
497 | | // to new transaction coordinator. |
498 | | // It is safe to just log warning and ignore, because new transaction coordinator is sending |
499 | | // again apply requests to all involved tablet it knows and will be retrying for ones that |
500 | | // will reply have been already split. |
501 | 0 | LOG_WITH_PREFIX(WARNING) << "Applied in unknown tablet: " << state.tablets(0); |
502 | 0 | return Status::OK(); |
503 | 0 | } |
504 | 246k | if (!it->second.all_intents_applied) { |
505 | 246k | --tablets_with_not_applied_intents_; |
506 | 246k | it->second.all_intents_applied = true; |
507 | 18.4E | VLOG_WITH_PREFIX(4) << "Applied to " << state.tablets(0) << ", left not applied: " |
508 | 18.4E | << tablets_with_not_applied_intents_; |
509 | 246k | if (tablets_with_not_applied_intents_ == 0) { |
510 | 104k | SubmitUpdateStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS); |
511 | 104k | } |
512 | 246k | } |
513 | 246k | return Status::OK(); |
514 | 246k | } |
515 | | |
516 | | private: |
517 | | // Checks whether we in specified status or going to be in this status when replication is |
518 | | // finished. |
519 | 2.01M | bool ShouldBeInStatus(TransactionStatus status) const { |
520 | 2.01M | if (status_ == status) { |
521 | 3.64k | return true; |
522 | 3.64k | } |
523 | 2.00M | if (replicating_) { |
524 | 65.2k | if (replicating_->request()->status() == status) { |
525 | 21.7k | return true; |
526 | 21.7k | } |
527 | | |
528 | 43.5k | for (const auto& entry : request_queue_) { |
529 | 4.81k | if (entry->request()->status() == status) { |
530 | 2.95k | return true; |
531 | 2.95k | } |
532 | 4.81k | } |
533 | 43.5k | } |
534 | | |
535 | 1.98M | return false; |
536 | 2.00M | } |
537 | | |
538 | 719k | bool ShouldBeCommitted() const { |
539 | 719k | return ShouldBeInStatus(TransactionStatus::COMMITTED) || |
540 | 691k | ShouldBeInStatus(TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS); |
541 | 719k | } |
542 | | |
543 | 135k | bool ShouldBeAborted() const { |
544 | 135k | return ShouldBeInStatus(TransactionStatus::ABORTED); |
545 | 135k | } |
546 | | |
547 | | // Process operation that was replicated in RAFT. |
548 | 1.87M | CHECKED_STATUS DoProcessReplicated(const TransactionCoordinator::ReplicatedData& data) { |
549 | 1.87M | switch (data.state.status()) { |
550 | 404k | case TransactionStatus::ABORTED: |
551 | 404k | return AbortedReplicationFinished(data); |
552 | 0 | case TransactionStatus::SEALED: |
553 | 0 | return SealedReplicationFinished(data); |
554 | 311k | case TransactionStatus::COMMITTED: |
555 | 311k | return CommittedReplicationFinished(data); |
556 | 727k | case TransactionStatus::CREATED: FALLTHROUGH_INTENDED; |
557 | 843k | case TransactionStatus::PENDING: |
558 | 843k | return PendingReplicationFinished(data); |
559 | 0 | case TransactionStatus::APPLYING: |
560 | | // APPLYING is handled separately, because it is received for transactions not managed by |
561 | | // this tablet as a transaction status tablet, but tablets that are involved in the data |
562 | | // path (receive write intents) for this transactions |
563 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status()); |
564 | 0 | case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: |
565 | | // APPLIED_IN_ONE_OF_INVOLVED_TABLETS handled w/o use of RAFT log |
566 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status()); |
567 | 311k | case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS: |
568 | 311k | return AppliedInAllInvolvedTabletsReplicationFinished(data); |
569 | 0 | case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED; |
570 | 0 | case TransactionStatus::GRACEFUL_CLEANUP: |
571 | | // CLEANUP is handled separately, because it is received for transactions not managed by |
572 | | // this tablet as a transaction status tablet, but tablets that are involved in the data |
573 | | // path (receive write intents) for this transactions |
574 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status()); |
575 | 1.87M | } |
576 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status()); |
577 | 0 | } |
578 | | |
579 | 394k | void DoHandle(std::unique_ptr<tablet::UpdateTxnOperation> request) { |
580 | 394k | const auto& state = *request->request(); |
581 | | |
582 | 394k | Status status; |
583 | 394k | auto txn_status = state.status(); |
584 | 394k | if (txn_status == TransactionStatus::COMMITTED) { |
585 | 104k | status = HandleCommit(); |
586 | 289k | } else if (txn_status == TransactionStatus::PENDING || |
587 | 282k | txn_status == TransactionStatus::CREATED) { |
588 | | // Handling txn_status of CREATED when the current status (status_) is PENDING is only |
589 | | // allowed for backward compatibility with versions prior to D11210, which could send |
590 | | // transaction creation retries with the same id. |
591 | 282k | if (status_ != TransactionStatus::PENDING) { |
592 | 4 | status = STATUS_FORMAT(IllegalState, |
593 | 4 | "Transaction in wrong state during heartbeat: $0", |
594 | 4 | TransactionStatus_Name(status_)); |
595 | 4 | } |
596 | 282k | } |
597 | | |
598 | 394k | if (!status.ok()) { |
599 | 4 | context_.CompleteWithStatus(std::move(request), std::move(status)); |
600 | 4 | return; |
601 | 4 | } |
602 | | |
603 | 394k | VLOG_WITH_PREFIX(4) << Format("DoHandle, replicating = $0", replicating_); |
604 | 394k | auto submitted = SubmitRequest(std::move(request)); |
605 | | // Should always succeed, since we execute this code only on the leader. |
606 | 103 | CHECK(submitted) << "Status: " << TransactionStatus_Name(txn_status); |
607 | 394k | } |
608 | | |
609 | 104k | CHECKED_STATUS HandleCommit() { |
610 | 104k | auto hybrid_time = context_.coordinator_context().clock().Now(); |
611 | 104k | if (ExpiredAt(hybrid_time)) { |
612 | 0 | auto status = STATUS(Expired, "Commit of expired transaction"); |
613 | 0 | VLOG_WITH_PREFIX(4) << status; |
614 | 0 | Abort(); |
615 | 0 | return status; |
616 | 0 | } |
617 | 104k | if (status_ != TransactionStatus::PENDING) { |
618 | 0 | return STATUS_FORMAT(IllegalState, |
619 | 0 | "Transaction in wrong state when starting to commit: $0", |
620 | 0 | TransactionStatus_Name(status_)); |
621 | 0 | } |
622 | | |
623 | 104k | return Status::OK(); |
624 | 104k | } |
625 | | |
626 | 239k | void SubmitUpdateStatus(TransactionStatus status) { |
627 | 18.4E | VLOG_WITH_PREFIX(4) << "SubmitUpdateStatus(" << TransactionStatus_Name(status) << ")"; |
628 | | |
629 | 239k | TransactionStatePB state; |
630 | 239k | state.set_transaction_id(id_.data(), id_.size()); |
631 | 239k | state.set_status(status); |
632 | | |
633 | 239k | auto request = context_.coordinator_context().CreateUpdateTransaction(&state); |
634 | 239k | if (replicating_) { |
635 | 7.37k | request_queue_.push_back(std::move(request)); |
636 | 232k | } else { |
637 | 232k | SubmitRequest(std::move(request)); |
638 | 232k | } |
639 | 239k | } |
640 | | |
641 | 626k | bool SubmitRequest(std::unique_ptr<tablet::UpdateTxnOperation> request) { |
642 | 626k | replicating_ = request.get(); |
643 | 626k | replicating_submit_time_ = context_.coordinator_context().clock().Now(); |
644 | 18.4E | VLOG_WITH_PREFIX(4) << Format("SubmitUpdateStatus, replicating = $0", replicating_); |
645 | 626k | if (!context_.SubmitUpdateTransaction(std::move(request))) { |
646 | | // Was not able to submit update transaction, for instance we are not leader. |
647 | | // So we are not replicating. |
648 | 27 | replicating_ = nullptr; |
649 | 27 | return false; |
650 | 27 | } |
651 | | |
652 | 626k | return true; |
653 | 626k | } |
654 | | |
655 | 385k | void ProcessQueue() { |
656 | 393k | while (!replicating_ && !request_queue_.empty()) { |
657 | 7.62k | auto request = std::move(request_queue_.front()); |
658 | 7.62k | request_queue_.pop_front(); |
659 | 7.62k | DoHandle(std::move(request)); |
660 | 7.62k | } |
661 | 385k | } |
662 | | |
663 | 404k | CHECKED_STATUS AbortedReplicationFinished(const TransactionCoordinator::ReplicatedData& data) { |
664 | 404k | if (status_ != TransactionStatus::ABORTED && |
665 | 404k | status_ != TransactionStatus::PENDING) { |
666 | 0 | LOG_WITH_PREFIX(DFATAL) << "Invalid status of aborted transaction: " |
667 | 0 | << TransactionStatus_Name(status_); |
668 | 0 | } |
669 | | |
670 | 404k | status_ = TransactionStatus::ABORTED; |
671 | 404k | first_entry_raft_index_ = data.op_id.index; |
672 | 404k | NotifyAbortWaiters(TransactionStatusResult::Aborted()); |
673 | 404k | return Status::OK(); |
674 | 404k | } |
675 | | |
676 | | CHECKED_STATUS SealedReplicationFinished( |
677 | 0 | const TransactionCoordinator::ReplicatedData& data) { |
678 | 0 | if (status_ != TransactionStatus::PENDING) { |
679 | 0 | auto status = STATUS_FORMAT( |
680 | 0 | IllegalState, |
681 | 0 | "Unexpected status during CommittedReplicationFinished: $0", |
682 | 0 | TransactionStatus_Name(status_)); |
683 | 0 | LOG_WITH_PREFIX(DFATAL) << status; |
684 | 0 | return status; |
685 | 0 | } |
686 | | |
687 | 0 | last_touch_ = data.hybrid_time; |
688 | 0 | commit_time_ = data.hybrid_time; |
689 | | // TODO(dtxn) Not yet implemented |
690 | 0 | next_abort_after_sealing_ = CoarseMonoClock::now() + FLAGS_avoid_abort_after_sealing_ms * 1ms; |
691 | | // TODO(savepoints) Savepoints with sealed transactions is not yet tested |
692 | 0 | aborted_ = data.state.aborted(); |
693 | 0 | VLOG_WITH_PREFIX(4) << "Seal time: " << commit_time_; |
694 | 0 | status_ = TransactionStatus::SEALED; |
695 | |
|
696 | 0 | involved_tablets_.reserve(data.state.tablets().size()); |
697 | 0 | for (int idx = 0; idx != data.state.tablets().size(); ++idx) { |
698 | 0 | auto tablet_batches = data.state.tablet_batches(idx); |
699 | 0 | LOG_IF_WITH_PREFIX(DFATAL, tablet_batches == 0) |
700 | 0 | << "Tablet without batches: " << data.state.ShortDebugString(); |
701 | 0 | ++tablets_with_not_replicated_batches_; |
702 | 0 | InvolvedTabletState state = { |
703 | 0 | .required_replicated_batches = static_cast<size_t>(tablet_batches), |
704 | 0 | .all_batches_replicated = false, |
705 | 0 | .all_intents_applied = false |
706 | 0 | }; |
707 | 0 | involved_tablets_.emplace(data.state.tablets(idx), state); |
708 | 0 | } |
709 | |
|
710 | 0 | first_entry_raft_index_ = data.op_id.index; |
711 | 0 | return Status::OK(); |
712 | 0 | } |
713 | | |
714 | 311k | CHECKED_STATUS CommittedReplicationFinished(const TransactionCoordinator::ReplicatedData& data) { |
715 | 311k | if (status_ != TransactionStatus::PENDING) { |
716 | 0 | auto status = STATUS_FORMAT( |
717 | 0 | IllegalState, |
718 | 0 | "Unexpected status during CommittedReplicationFinished: $0", |
719 | 0 | TransactionStatus_Name(status_)); |
720 | 0 | LOG_WITH_PREFIX(DFATAL) << status; |
721 | 0 | return status; |
722 | 0 | } |
723 | | |
724 | 311k | YB_TRANSACTION_DUMP(Commit, id_, data.hybrid_time, data.state.tablets().size()); |
725 | | |
726 | 311k | last_touch_ = data.hybrid_time; |
727 | 311k | commit_time_ = data.hybrid_time; |
728 | 311k | first_entry_raft_index_ = data.op_id.index; |
729 | 311k | aborted_ = data.state.aborted(); |
730 | | |
731 | 311k | involved_tablets_.reserve(data.state.tablets().size()); |
732 | 738k | for (const auto& tablet : data.state.tablets()) { |
733 | 738k | InvolvedTabletState state = { |
734 | 738k | .required_replicated_batches = 0, |
735 | 738k | .all_batches_replicated = true, |
736 | 738k | .all_intents_applied = false |
737 | 738k | }; |
738 | 738k | involved_tablets_.emplace(tablet, state); |
739 | 738k | } |
740 | | |
741 | 311k | status_ = TransactionStatus::COMMITTED; |
742 | 311k | StartApply(); |
743 | 311k | return Status::OK(); |
744 | 311k | } |
745 | | |
746 | | CHECKED_STATUS AppliedInAllInvolvedTabletsReplicationFinished( |
747 | 311k | const TransactionCoordinator::ReplicatedData& data) { |
748 | 311k | if (status_ != TransactionStatus::COMMITTED && status_ != TransactionStatus::SEALED) { |
749 | | // That could happen in old version, because we could drop all entries before |
750 | | // APPLIED_IN_ALL_INVOLVED_TABLETS. |
751 | 0 | LOG_WITH_PREFIX(DFATAL) |
752 | 0 | << "AppliedInAllInvolvedTabletsReplicationFinished in wrong state: " |
753 | 0 | << TransactionStatus_Name(status_) << ", request: " << data.state.ShortDebugString(); |
754 | 0 | CHECK_EQ(status_, TransactionStatus::PENDING); |
755 | 0 | } |
756 | 16 | VLOG_WITH_PREFIX(4) << __func__ << ", status: " << TransactionStatus_Name(status_) |
757 | 16 | << ", leader: " << context_.leader(); |
758 | 311k | last_touch_ = data.hybrid_time; |
759 | 311k | status_ = TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS; |
760 | | |
761 | 311k | YB_TRANSACTION_DUMP(Applied, id_, data.hybrid_time); |
762 | | |
763 | 311k | return Status::OK(); |
764 | 311k | } |
765 | | |
766 | | // Used for PENDING and CREATED records. Because when we apply replicated operations they have |
767 | | // the same meaning. |
768 | 843k | CHECKED_STATUS PendingReplicationFinished(const TransactionCoordinator::ReplicatedData& data) { |
769 | 843k | if (context_.leader() && ExpiredAt(data.hybrid_time)) { |
770 | 0 | VLOG_WITH_PREFIX(4) << "Expired during replication of PENDING or CREATED operations."; |
771 | 45.4k | Abort(); |
772 | 45.4k | return Status::OK(); |
773 | 45.4k | } |
774 | 798k | if (status_ != TransactionStatus::PENDING) { |
775 | 0 | LOG_WITH_PREFIX(DFATAL) << "Bad status during " << __func__ << "(" << data.ToString() |
776 | 0 | << "): " << ToString(); |
777 | 0 | return Status::OK(); |
778 | 0 | } |
779 | 798k | last_touch_ = data.hybrid_time; |
780 | 798k | first_entry_raft_index_ = data.op_id.index; |
781 | 798k | return Status::OK(); |
782 | 798k | } |
783 | | |
784 | 2.91M | void NotifyAbortWaiters(const Result<TransactionStatusResult>& result) { |
785 | 89.2k | for (auto& waiter : abort_waiters_) { |
786 | 89.2k | waiter(result); |
787 | 89.2k | } |
788 | 2.91M | abort_waiters_.clear(); |
789 | 2.91M | } |
790 | | |
791 | 312k | void StartApply() { |
792 | 14 | VLOG_WITH_PREFIX(4) << __func__ << ", commit time: " << commit_time_ << ", involved tablets: " |
793 | 14 | << AsString(involved_tablets_); |
794 | 312k | resend_applying_time_ = MonoTime::Now() + |
795 | 312k | std::chrono::microseconds(FLAGS_transaction_resend_applying_interval_usec); |
796 | 312k | tablets_with_not_applied_intents_ = involved_tablets_.size(); |
797 | 312k | if (context_.leader()) { |
798 | 246k | for (const auto& tablet : involved_tablets_) { |
799 | 246k | context_.NotifyApplying({ |
800 | 246k | .tablet = tablet.first, |
801 | 246k | .transaction = id_, |
802 | 246k | .aborted = aborted_, |
803 | 246k | .commit_time = commit_time_, |
804 | 246k | .sealed = status_ == TransactionStatus::SEALED}); |
805 | 246k | } |
806 | 104k | } |
807 | 312k | NotifyAbortWaiters(TransactionStatusResult(TransactionStatus::COMMITTED, commit_time_)); |
808 | 312k | } |
809 | | |
810 | | void FillExpectedTabletBatches( |
811 | 0 | std::vector<ExpectedTabletBatches>* expected_tablet_batches) const { |
812 | 0 | if (!expected_tablet_batches) { |
813 | 0 | return; |
814 | 0 | } |
815 | | |
816 | 0 | for (const auto& tablet_id_and_state : involved_tablets_) { |
817 | 0 | if (!tablet_id_and_state.second.all_batches_replicated) { |
818 | 0 | expected_tablet_batches->push_back(ExpectedTabletBatches{ |
819 | 0 | .tablet = tablet_id_and_state.first, |
820 | 0 | .batches = tablet_id_and_state.second.required_replicated_batches |
821 | 0 | }); |
822 | 0 | } |
823 | 0 | } |
824 | 0 | } |
825 | | |
// Owning coordinator: queried for leadership and used to queue APPLYING notifications and
// submit update operations.
TransactionStateContext& context_;
// Id of the transaction tracked by this state.
const TransactionId id_;
const std::string log_prefix_;
// Current status; every record starts as PENDING.
TransactionStatus status_ = TransactionStatus::PENDING;
// Hybrid time of the last replicated record that touched this transaction.
HybridTime last_touch_;
// It should match last_touch_, but it is possible that because of some code errors it
// would not be so. To add stability we introduce a separate field for it.
HybridTime commit_time_;

// If transaction was only sealed, we will try to abort it not earlier than this time.
CoarseTimePoint next_abort_after_sealing_;

struct InvolvedTabletState {
  // How many batches should be replicated at this tablet.
  size_t required_replicated_batches = 0;

  // True if this tablet already replicated all batches.
  bool all_batches_replicated = false;

  // True if this tablet already applied all intents.
  bool all_intents_applied = false;

  std::string ToString() const {
    return Format("{ required_replicated_batches: $0 all_batches_replicated: $1 "
                  "all_intents_applied: $2 }",
                  required_replicated_batches, all_batches_replicated, all_intents_applied);
  }
};

// Tablets participating in this transaction.
std::unordered_map<TabletId, InvolvedTabletState> involved_tablets_;
// Number of tablets that have not yet replicated all batches.
size_t tablets_with_not_replicated_batches_ = 0;
// Number of tablets that have not yet applied intents.
size_t tablets_with_not_applied_intents_ = 0;
// Don't resend applying until this time.
MonoTime resend_applying_time_;
// Raft index of the replicated entry that last (re)set this state; max() while unset.
// The coordinator orders transactions by this value when computing the GC boundary.
int64_t first_entry_raft_index_ = std::numeric_limits<int64_t>::max();

// Metadata tracking aborted subtransaction IDs in this transaction.
AbortedSubTransactionSetPB aborted_;

// The operation that we are currently replicating in RAFT.
// It is owned by TransactionDriver (that will be renamed to OperationDriver).
tablet::UpdateTxnOperation* replicating_ = nullptr;
// Hybrid time before submitting replicating operation.
// It is guaranteed to be less than actual operation hybrid time.
HybridTime replicating_submit_time_;
// Requests waiting for the current replication to finish; drained while replicating_ is null.
std::deque<std::unique_ptr<tablet::UpdateTxnOperation>> request_queue_;

// Callbacks waiting for the outcome of an abort request; invoked and cleared together.
std::vector<TransactionAbortCallback> abort_waiters_;
877 | | }; |
878 | | |
879 | | struct CompleteWithStatusEntry { |
880 | | std::unique_ptr<UpdateTxnOperation> holder; |
881 | | UpdateTxnOperation* request; |
882 | | Status status; |
883 | | }; |
884 | | |
885 | | // Contains actions that should be executed after lock in transaction coordinator is released. |
886 | | struct PostponedLeaderActions { |
887 | | int64_t leader_term = OpId::kUnknownTerm; |
888 | | // List of tablets with transaction id, that should be notified that this transaction |
889 | | // is applying. |
890 | | std::vector<NotifyApplyingData> notify_applying; |
891 | | // List of update transaction records, that should be replicated via RAFT. |
892 | | std::vector<std::unique_ptr<UpdateTxnOperation>> updates; |
893 | | |
894 | | std::vector<CompleteWithStatusEntry> complete_with_status; |
895 | | |
896 | 4.21M | void Swap(PostponedLeaderActions* other) { |
897 | 4.21M | std::swap(leader_term, other->leader_term); |
898 | 4.21M | notify_applying.swap(other->notify_applying); |
899 | 4.21M | updates.swap(other->updates); |
900 | 4.21M | complete_with_status.swap(other->complete_with_status); |
901 | 4.21M | } |
902 | | |
903 | 6.23M | bool leader() const { |
904 | 6.23M | return leader_term != OpId::kUnknownTerm; |
905 | 6.23M | } |
906 | | }; |
907 | | |
908 | | } // namespace |
909 | | |
910 | 0 | std::string TransactionCoordinator::AbortedData::ToString() const { |
911 | 0 | return YB_STRUCT_TO_STRING(state, op_id); |
912 | 0 | } |
913 | | |
914 | | // Real implementation of transaction coordinator, as in PImpl idiom. |
915 | | class TransactionCoordinator::Impl : public TransactionStateContext { |
916 | | public: |
917 | | Impl(const std::string& permanent_uuid, |
918 | | TransactionCoordinatorContext* context, |
919 | | Counter* expired_metric) |
920 | | : context_(*context), |
921 | | expired_metric_(*expired_metric), |
922 | | log_prefix_(consensus::MakeTabletLogPrefix(context->tablet_id(), permanent_uuid)), |
923 | 29.1k | poller_(log_prefix_, std::bind(&Impl::Poll, this)) { |
924 | 29.1k | } |
925 | | |
926 | 128 | virtual ~Impl() { |
927 | 128 | Shutdown(); |
928 | 128 | } |
929 | | |
930 | 255 | void Shutdown() { |
931 | 255 | poller_.Shutdown(); |
932 | 255 | rpcs_.Shutdown(); |
933 | 255 | } |
934 | | |
935 | | CHECKED_STATUS GetStatus(const google::protobuf::RepeatedPtrField<std::string>& transaction_ids, |
936 | | CoarseTimePoint deadline, |
937 | 208k | tserver::GetTransactionStatusResponsePB* response) { |
938 | 208k | AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms); |
939 | 208k | auto leader_term = context_.LeaderTerm(); |
940 | 208k | PostponedLeaderActions postponed_leader_actions; |
941 | 208k | { |
942 | 208k | std::unique_lock<std::mutex> lock(managed_mutex_); |
943 | 208k | HybridTime leader_safe_time; |
944 | 208k | postponed_leader_actions_.leader_term = leader_term; |
945 | 208k | for (const auto& transaction_id : transaction_ids) { |
946 | 208k | auto id = VERIFY_RESULT(FullyDecodeTransactionId(transaction_id)); |
947 | | |
948 | 208k | auto it = managed_transactions_.find(id); |
949 | 208k | std::vector<ExpectedTabletBatches> expected_tablet_batches; |
950 | 208k | bool known_txn = it != managed_transactions_.end(); |
951 | 208k | auto txn_status_with_ht = known_txn |
952 | 208k | ? VERIFY_RESULT(it->GetStatus(&expected_tablet_batches)) |
953 | 50.1k | : TransactionStatusResult(TransactionStatus::ABORTED, HybridTime::kMax); |
954 | 18.4E | VLOG_WITH_PREFIX(4) << __func__ << ": " << id << " => " << txn_status_with_ht |
955 | 18.4E | << ", last touch: " << it->last_touch(); |
956 | 208k | if (txn_status_with_ht.status == TransactionStatus::SEALED) { |
957 | | // TODO(dtxn) Avoid concurrent resolve |
958 | 0 | txn_status_with_ht = VERIFY_RESULT(ResolveSealedStatus( |
959 | 0 | id, txn_status_with_ht.status_time, expected_tablet_batches, |
960 | 0 | /* abort_if_not_replicated = */ false, &lock)); |
961 | 0 | } |
962 | 208k | if (!known_txn) { |
963 | 50.1k | if (!leader_safe_time) { |
964 | | // We should pick leader safe time only after managed_mutex_ is locked. |
965 | | // Otherwise applied transaction could be removed after this safe time. |
966 | 50.1k | leader_safe_time = VERIFY_RESULT(context_.LeaderSafeTime()); |
967 | 50.1k | } |
968 | | // Please note that for known transactions we send 0, that means invalid hybrid time. |
969 | | // We would wait for safe time only for case when transaction is unknown to coordinator. |
970 | | // Since it is only case when transaction could be actually committed. |
971 | 50.1k | response->mutable_coordinator_safe_time()->Resize(response->status().size(), 0); |
972 | 50.1k | response->add_coordinator_safe_time(leader_safe_time.ToUint64()); |
973 | 50.1k | } |
974 | 208k | response->add_status(txn_status_with_ht.status); |
975 | 208k | response->add_status_hybrid_time(txn_status_with_ht.status_time.ToUint64()); |
976 | | |
977 | 208k | auto mutable_aborted_set_pb = response->add_aborted_subtxn_set(); |
978 | 208k | if (txn_status_with_ht.status == TransactionStatus::COMMITTED && |
979 | 19.0k | it != managed_transactions_.end()) { |
980 | 19.0k | *mutable_aborted_set_pb = it->GetAbortedSubTransactionSetPB(); |
981 | 19.0k | } |
982 | 208k | } |
983 | 208k | postponed_leader_actions.Swap(&postponed_leader_actions_); |
984 | 208k | } |
985 | | |
986 | 208k | ExecutePostponedLeaderActions(&postponed_leader_actions); |
987 | 208k | if (GetAtomicFlag(&FLAGS_TEST_inject_random_delay_on_txn_status_response_ms)) { |
988 | 0 | if (response->status().size() > 0 && response->status(0) == TransactionStatus::PENDING) { |
989 | 0 | AtomicFlagRandomSleepMs(&FLAGS_TEST_inject_random_delay_on_txn_status_response_ms); |
990 | 0 | } |
991 | 0 | } |
992 | 208k | return Status::OK(); |
993 | 208k | } |
994 | | |
// Tries to turn a SEALED transaction into a definite status by asking every tablet in
// `expected_tablet_batches` how many batches it has replicated.
// `*lock` (held on entry) is released while the participant RPCs are in flight and re-acquired
// before this function returns. When `abort_if_not_replicated` is set, the required number of
// batches is passed to each participant.
// Returns the resolved status. When the transaction disappeared meanwhile or is still SEALED,
// it returns PENDING at one tick before the relevant time.
Result<TransactionStatusResult> ResolveSealedStatus(
    const TransactionId& transaction_id,
    HybridTime commit_time,
    const std::vector<ExpectedTabletBatches>& expected_tablet_batches,
    bool abort_if_not_replicated,
    std::unique_lock<std::mutex>* lock) {
  VLOG_WITH_PREFIX(4)
      << __func__ << ", txn: " << transaction_id << ", commit time: " << commit_time
      << ", expected tablet batches: " << AsString(expected_tablet_batches)
      << ", abort if not replicated: " << abort_if_not_replicated;

  auto deadline = TransactionRpcDeadline();
  auto now_ht = context_.clock().Now();
  // One count per participant; every RPC callback (or failed Prepare) counts down once.
  CountDownLatch latch(expected_tablet_batches.size());
  // Per-participant result: kMin = participant reported aborted, a valid time = all expected
  // batches replicated at that time, default-constructed (presumably invalid) = unresolved.
  std::vector<HybridTime> write_hybrid_times(expected_tablet_batches.size());
  {
    // Do not hold the coordinator lock while waiting on remote tablets.
    lock->unlock();
    auto scope_exit = ScopeExit([lock] {
      if (lock) {
        lock->lock();
      }
    });
    size_t idx = 0;
    for (const auto& p : expected_tablet_batches) {
      tserver::GetTransactionStatusAtParticipantRequestPB req;
      req.set_tablet_id(p.tablet);
      req.set_transaction_id(
          pointer_cast<const char*>(transaction_id.data()), transaction_id.size());
      req.set_propagated_hybrid_time(now_ht.ToUint64());
      if (abort_if_not_replicated) {
        req.set_required_num_replicated_batches(p.batches);
      }

      auto handle = rpcs_.Prepare();
      if (handle != rpcs_.InvalidHandle()) {
        // The callback captures stack locals by reference; that is safe only because
        // latch.Wait() below blocks until every callback has counted down.
        *handle = GetTransactionStatusAtParticipant(
            deadline,
            nullptr /* remote_tablet */,
            context_.client_future().get(),
            &req,
            [this, handle, idx, &write_hybrid_times, &expected_tablet_batches, &latch,
             &transaction_id, &p](
                const Status& status,
                const tserver::GetTransactionStatusAtParticipantResponsePB& resp) {
          client::UpdateClock(resp, &context_);
          rpcs_.Unregister(handle);

          VLOG_WITH_PREFIX(4)
              << "TXN: " << transaction_id << " batch status at " << p.tablet << ": "
              << "idx: " << idx << ", resp: " << resp.ShortDebugString() << ", expected: "
              << expected_tablet_batches[idx].batches;
          if (status.ok()) {
            if (resp.aborted()) {
              write_hybrid_times[idx] = HybridTime::kMin;
            } else if (implicit_cast<size_t>(resp.num_replicated_batches()) ==
                           expected_tablet_batches[idx].batches) {
              write_hybrid_times[idx] = HybridTime(resp.status_hybrid_time());
              LOG_IF_WITH_PREFIX(DFATAL, !write_hybrid_times[idx].is_valid())
                  << "Received invalid hybrid time when all batches were replicated: "
                  << resp.ShortDebugString();
            }
          }
          latch.CountDown();
        });
        (**handle).SendRpc();
      } else {
        // No RPC will be sent for this participant; keep the latch balanced.
        latch.CountDown();
      }
      ++idx;
    }
    latch.Wait();
  }
  // managed_mutex_ is held again from here on (re-acquired by scope_exit).

  auto txn_it = managed_transactions_.find(transaction_id);
  if (txn_it == managed_transactions_.end()) {
    // Transaction was completed (aborted/committed) during this procedure.
    return TransactionStatusResult{TransactionStatus::PENDING, commit_time.Decremented()};
  }

  // Feed participant answers back into the transaction state.
  for (size_t idx = 0; idx != expected_tablet_batches.size(); ++idx) {
    if (write_hybrid_times[idx] == HybridTime::kMin) {
      managed_transactions_.modify(txn_it, [](TransactionState& state) {
        state.Aborted();
      });
    } else if (write_hybrid_times[idx].is_valid()) {
      managed_transactions_.modify(
          txn_it, [idx, &expected_tablet_batches, &write_hybrid_times](TransactionState& state) {
        state.ReplicatedAllBatchesAt(
            expected_tablet_batches[idx].tablet, write_hybrid_times[idx]);
      });
    }
  }
  auto result = VERIFY_RESULT(txn_it->GetStatus(/* expected_tablet_batches = */ nullptr));
  if (result.status != TransactionStatus::SEALED) {
    VLOG_WITH_PREFIX(4) << "TXN: " << transaction_id << " status resolved: "
                        << TransactionStatus_Name(result.status);
    return result;
  }

  VLOG_WITH_PREFIX(4) << "TXN: " << transaction_id << " status NOT resolved";
  return TransactionStatusResult{TransactionStatus::PENDING, result.status_time.Decremented()};
}
1097 | | |
1098 | 116k | void Abort(const std::string& transaction_id, int64_t term, TransactionAbortCallback callback) { |
1099 | 116k | AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms); |
1100 | | |
1101 | 116k | auto id = FullyDecodeTransactionId(transaction_id); |
1102 | 116k | if (!id.ok()) { |
1103 | 0 | callback(id.status()); |
1104 | 0 | return; |
1105 | 0 | } |
1106 | | |
1107 | 116k | PostponedLeaderActions actions; |
1108 | 116k | { |
1109 | 116k | std::unique_lock<std::mutex> lock(managed_mutex_); |
1110 | 116k | auto it = managed_transactions_.find(*id); |
1111 | 116k | if (it == managed_transactions_.end()) { |
1112 | 441 | lock.unlock(); |
1113 | 441 | callback(TransactionStatusResult::Aborted()); |
1114 | 441 | return; |
1115 | 441 | } |
1116 | 115k | postponed_leader_actions_.leader_term = term; |
1117 | 115k | boost::optional<TransactionStatusResult> status; |
1118 | 115k | managed_transactions_.modify(it, [&status, &callback](TransactionState& state) { |
1119 | 115k | status = state.Abort(&callback); |
1120 | 115k | }); |
1121 | 115k | if (callback) { |
1122 | 25.7k | lock.unlock(); |
1123 | 25.7k | callback(*status); |
1124 | 25.7k | return; |
1125 | 25.7k | } |
1126 | 90.0k | actions.Swap(&postponed_leader_actions_); |
1127 | 90.0k | } |
1128 | | |
1129 | 90.0k | ExecutePostponedLeaderActions(&actions); |
1130 | 90.0k | } |
1131 | | |
1132 | 0 | size_t test_count_transactions() { |
1133 | 0 | std::lock_guard<std::mutex> lock(managed_mutex_); |
1134 | 0 | return managed_transactions_.size(); |
1135 | 0 | } |
1136 | | |
1137 | 1.87M | CHECKED_STATUS ProcessReplicated(const ReplicatedData& data) { |
1138 | 1.87M | auto id = FullyDecodeTransactionId(data.state.transaction_id()); |
1139 | 1.87M | if (!id.ok()) { |
1140 | 0 | return std::move(id.status()); |
1141 | 0 | } |
1142 | | |
1143 | 1.87M | PostponedLeaderActions actions; |
1144 | 1.87M | Status result; |
1145 | 1.87M | { |
1146 | 1.87M | std::lock_guard<std::mutex> lock(managed_mutex_); |
1147 | 1.87M | postponed_leader_actions_.leader_term = data.leader_term; |
1148 | 1.87M | auto it = GetTransaction(*id, data.state.status(), data.hybrid_time); |
1149 | 1.87M | if (it == managed_transactions_.end()) { |
1150 | 0 | return Status::OK(); |
1151 | 0 | } |
1152 | 1.87M | managed_transactions_.modify(it, [&result, &data](TransactionState& state) { |
1153 | 1.87M | result = state.ProcessReplicated(data); |
1154 | 1.87M | }); |
1155 | 1.87M | CheckCompleted(it); |
1156 | 1.87M | actions.Swap(&postponed_leader_actions_); |
1157 | 1.87M | } |
1158 | 1.87M | ExecutePostponedLeaderActions(&actions); |
1159 | | |
1160 | 18.4E | VLOG_WITH_PREFIX(1) << "Processed: " << data.ToString(); |
1161 | 1.87M | return result; |
1162 | 1.87M | } |
1163 | | |
1164 | 0 | void ProcessAborted(const AbortedData& data) { |
1165 | 0 | auto id = FullyDecodeTransactionId(data.state.transaction_id()); |
1166 | 0 | if (!id.ok()) { |
1167 | 0 | LOG_WITH_PREFIX(DFATAL) << "Abort of transaction with bad id " |
1168 | 0 | << data.state.ShortDebugString() << ": " << id.status(); |
1169 | 0 | return; |
1170 | 0 | } |
1171 | | |
1172 | 0 | PostponedLeaderActions actions; |
1173 | 0 | { |
1174 | 0 | std::lock_guard<std::mutex> lock(managed_mutex_); |
1175 | 0 | postponed_leader_actions_.leader_term = OpId::kUnknownTerm; |
1176 | 0 | auto it = managed_transactions_.find(*id); |
1177 | 0 | if (it == managed_transactions_.end()) { |
1178 | 0 | LOG_WITH_PREFIX(WARNING) << "Aborted operation for unknown transaction: " << *id; |
1179 | 0 | return; |
1180 | 0 | } |
1181 | 0 | managed_transactions_.modify( |
1182 | 0 | it, [&](TransactionState& ts) { |
1183 | 0 | ts.ProcessAborted(data); |
1184 | 0 | }); |
1185 | 0 | CheckCompleted(it); |
1186 | 0 | actions.Swap(&postponed_leader_actions_); |
1187 | 0 | } |
1188 | 0 | ExecutePostponedLeaderActions(&actions); |
1189 | |
|
1190 | 0 | VLOG_WITH_PREFIX(1) << "Aborted, state: " << data.state.ShortDebugString() |
1191 | 0 | << ", op id: " << data.op_id; |
1192 | 0 | } |
1193 | | |
1194 | 29.1k | void Start() { |
1195 | 29.1k | poller_.Start( |
1196 | 29.1k | &context_.client_future().get()->messenger()->scheduler(), |
1197 | 29.1k | std::chrono::microseconds(kTimeMultiplier * FLAGS_transaction_check_interval_usec)); |
1198 | 29.1k | } |
1199 | | |
1200 | 685k | void Handle(std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) { |
1201 | 685k | auto& state = *request->request(); |
1202 | 685k | auto id = FullyDecodeTransactionId(state.transaction_id()); |
1203 | 685k | if (!id.ok()) { |
1204 | 0 | LOG(WARNING) << "Failed to decode id from " << state.ShortDebugString() << ": " << id; |
1205 | 0 | request->CompleteWithStatus(id.status()); |
1206 | 0 | return; |
1207 | 0 | } |
1208 | | |
1209 | 685k | PostponedLeaderActions actions; |
1210 | 685k | { |
1211 | 685k | std::unique_lock<std::mutex> lock(managed_mutex_); |
1212 | 685k | postponed_leader_actions_.leader_term = term; |
1213 | 685k | auto it = managed_transactions_.find(*id); |
1214 | 685k | if (it == managed_transactions_.end()) { |
1215 | 292k | if (state.status() == TransactionStatus::CREATED) { |
1216 | 243k | it = managed_transactions_.emplace( |
1217 | 243k | this, *id, context_.clock().Now(), log_prefix_).first; |
1218 | 49.6k | } else { |
1219 | 49.6k | lock.unlock(); |
1220 | 49.6k | YB_LOG_HIGHER_SEVERITY_WHEN_TOO_MANY(INFO, WARNING, 1s, 50) |
1221 | 49.6k | << LogPrefix() << "Request to unknown transaction " << id << ": " |
1222 | 49.6k | << state.ShortDebugString(); |
1223 | 49.6k | auto status = STATUS_EC_FORMAT( |
1224 | 49.6k | Expired, PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE), |
1225 | 49.6k | "Transaction $0 expired or aborted by a conflict", *id); |
1226 | 49.6k | status = status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted)); |
1227 | 49.6k | request->CompleteWithStatus(status); |
1228 | 49.6k | return; |
1229 | 49.6k | } |
1230 | 635k | } |
1231 | | |
1232 | 637k | managed_transactions_.modify(it, [&request](TransactionState& state) { |
1233 | 637k | state.Handle(std::move(request)); |
1234 | 637k | }); |
1235 | 635k | postponed_leader_actions_.Swap(&actions); |
1236 | 635k | } |
1237 | | |
1238 | 635k | ExecutePostponedLeaderActions(&actions); |
1239 | 635k | } |
1240 | | |
1241 | 2.68M | int64_t PrepareGC(std::string* details) { |
1242 | 2.68M | std::lock_guard<std::mutex> lock(managed_mutex_); |
1243 | 2.68M | if (!managed_transactions_.empty()) { |
1244 | 161k | auto& txn = *managed_transactions_.get<FirstEntryIndexTag>().begin(); |
1245 | 161k | if (details) { |
1246 | 0 | *details += Format("Transaction coordinator: $0\n", txn); |
1247 | 0 | } |
1248 | 161k | return txn.first_entry_raft_index(); |
1249 | 161k | } |
1250 | 2.52M | return std::numeric_limits<int64_t>::max(); |
1251 | 2.52M | } |
1252 | | |
1253 | | // Returns logs prefix for this transaction coordinator. |
1254 | 49.7k | const std::string& LogPrefix() { |
1255 | 49.7k | return log_prefix_; |
1256 | 49.7k | } |
1257 | | |
1258 | 0 | std::string DumpTransactions() { |
1259 | 0 | std::string result; |
1260 | 0 | std::lock_guard<std::mutex> lock(managed_mutex_); |
1261 | 0 | for (const auto& txn : managed_transactions_) { |
1262 | 0 | result += txn.ToString(); |
1263 | 0 | result += "\n"; |
1264 | 0 | } |
1265 | 0 | return result; |
1266 | 0 | } |
1267 | | |
1268 | | private: |
1269 | | class LastTouchTag; |
1270 | | class FirstEntryIndexTag; |
1271 | | |
1272 | | typedef boost::multi_index_container<TransactionState, |
1273 | | boost::multi_index::indexed_by < |
1274 | | boost::multi_index::hashed_unique < |
1275 | | boost::multi_index::const_mem_fun<TransactionState, |
1276 | | const TransactionId&, |
1277 | | &TransactionState::id> |
1278 | | >, |
1279 | | boost::multi_index::ordered_non_unique < |
1280 | | boost::multi_index::tag<LastTouchTag>, |
1281 | | boost::multi_index::const_mem_fun<TransactionState, |
1282 | | HybridTime, |
1283 | | &TransactionState::last_touch> |
1284 | | >, |
1285 | | boost::multi_index::ordered_non_unique < |
1286 | | boost::multi_index::tag<FirstEntryIndexTag>, |
1287 | | boost::multi_index::const_mem_fun<TransactionState, |
1288 | | int64_t, |
1289 | | &TransactionState::first_entry_raft_index> |
1290 | | > |
1291 | | > |
1292 | | > ManagedTransactions; |
1293 | | |
1294 | | void SendUpdateTransactionRequest( |
1295 | | const NotifyApplyingData& action, HybridTime now, |
1296 | 246k | const CoarseTimePoint& deadline) { |
1297 | 18.4E | VLOG_WITH_PREFIX(3) << "Notify applying: " << action.ToString(); |
1298 | | |
1299 | 246k | tserver::UpdateTransactionRequestPB req; |
1300 | 246k | req.set_tablet_id(action.tablet); |
1301 | 246k | req.set_propagated_hybrid_time(now.ToUint64()); |
1302 | 246k | auto& state = *req.mutable_state(); |
1303 | 246k | state.set_transaction_id(action.transaction.data(), action.transaction.size()); |
1304 | 246k | state.set_status(TransactionStatus::APPLYING); |
1305 | 246k | state.add_tablets(context_.tablet_id()); |
1306 | 246k | state.set_commit_hybrid_time(action.commit_time.ToUint64()); |
1307 | 246k | state.set_sealed(action.sealed); |
1308 | 246k | *state.mutable_aborted() = action.aborted; |
1309 | | |
1310 | 246k | auto handle = rpcs_.Prepare(); |
1311 | 246k | if (handle != rpcs_.InvalidHandle()) { |
1312 | 246k | *handle = UpdateTransaction( |
1313 | 246k | deadline, |
1314 | 246k | nullptr /* remote_tablet */, |
1315 | 246k | context_.client_future().get(), |
1316 | 246k | &req, |
1317 | 246k | [this, handle, action] |
1318 | 246k | (const Status& status, |
1319 | 246k | const tserver::UpdateTransactionRequestPB& req, |
1320 | 245k | const tserver::UpdateTransactionResponsePB& resp) { |
1321 | 245k | client::UpdateClock(resp, &context_); |
1322 | 245k | rpcs_.Unregister(handle); |
1323 | 246k | if (status.ok()) { |
1324 | 246k | return; |
1325 | 246k | } |
1326 | 18.4E | LOG_WITH_PREFIX(WARNING) |
1327 | 18.4E | << "Failed to send apply for transaction: " << action.transaction << ": " |
1328 | 18.4E | << status; |
1329 | 18.4E | const auto split_child_tablet_ids = SplitChildTabletIdsData(status).value(); |
1330 | 18.4E | const bool tablet_has_been_split = !split_child_tablet_ids.empty(); |
1331 | 18.4E | if (status.IsNotFound() || tablet_has_been_split) { |
1332 | 30 | std::lock_guard<std::mutex> lock(managed_mutex_); |
1333 | 30 | auto it = managed_transactions_.find(action.transaction); |
1334 | 30 | if (it == managed_transactions_.end()) { |
1335 | 0 | return; |
1336 | 0 | } |
1337 | 30 | managed_transactions_.modify( |
1338 | 30 | it, [this, &action, &split_child_tablet_ids, |
1339 | 30 | tablet_has_been_split](TransactionState& state) { |
1340 | 30 | if (tablet_has_been_split) { |
1341 | | // We need to update involved tablets map. |
1342 | 0 | LOG_WITH_PREFIX(INFO) << Format( |
1343 | 0 | "Tablet $0 has been split into: $1", action.tablet, |
1344 | 0 | split_child_tablet_ids); |
1345 | 0 | state.AddInvolvedTablets(action.tablet, split_child_tablet_ids); |
1346 | 30 | } else { |
1347 | | // Tablet has been deleted (not split), so we should mark it as applied to |
1348 | | // be able to cleanup the transaction. |
1349 | 30 | TransactionStatePB transaction_state; |
1350 | 30 | transaction_state.add_tablets(action.tablet); |
1351 | 30 | WARN_NOT_OK( |
1352 | 30 | state.AppliedInOneOfInvolvedTablets(transaction_state), |
1353 | 30 | "AppliedInOneOfInvolvedTablets for removed tabled failed: "); |
1354 | 30 | } |
1355 | 30 | }); |
1356 | 30 | if (tablet_has_been_split) { |
1357 | 0 | const auto new_deadline = TransactionRpcDeadline(); |
1358 | 0 | NotifyApplyingData new_action = action; |
1359 | 0 | for (const auto& split_child_tablet_id : split_child_tablet_ids) { |
1360 | 0 | new_action.tablet = split_child_tablet_id; |
1361 | 0 | SendUpdateTransactionRequest(new_action, context_.clock().Now(), new_deadline); |
1362 | 0 | } |
1363 | 0 | } |
1364 | 30 | } |
1365 | 18.4E | }); |
1366 | 246k | (**handle).SendRpc(); |
1367 | 246k | } |
1368 | 246k | } |
1369 | | |
1370 | 4.16M | void ExecutePostponedLeaderActions(PostponedLeaderActions* actions) { |
1371 | 249k | for (const auto& p : actions->complete_with_status) { |
1372 | 249k | p.request->CompleteWithStatus(p.status); |
1373 | 249k | } |
1374 | | |
1375 | 4.16M | if (!actions->leader()) { |
1376 | 2.17M | return; |
1377 | 2.17M | } |
1378 | | |
1379 | 1.99M | if (!actions->notify_applying.empty()) { |
1380 | 104k | auto now = context_.clock().Now(); |
1381 | 104k | auto deadline = TransactionRpcDeadline(); |
1382 | 246k | for (const auto& action : actions->notify_applying) { |
1383 | 246k | SendUpdateTransactionRequest(action, now, deadline); |
1384 | 246k | } |
1385 | 104k | } |
1386 | | |
1387 | 626k | for (auto& update : actions->updates) { |
1388 | 626k | context_.SubmitUpdateTransaction(std::move(update), actions->leader_term); |
1389 | 626k | } |
1390 | 1.99M | } |
1391 | | |
1392 | | ManagedTransactions::iterator GetTransaction(const TransactionId& id, |
1393 | | TransactionStatus status, |
1394 | 1.87M | HybridTime hybrid_time) { |
1395 | 1.87M | auto it = managed_transactions_.find(id); |
1396 | 1.87M | if (it == managed_transactions_.end()) { |
1397 | 484k | if (status != TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS) { |
1398 | 484k | it = managed_transactions_.emplace(this, id, hybrid_time, log_prefix_).first; |
1399 | 44 | VLOG_WITH_PREFIX(1) << Format("Added: $0", *it); |
1400 | 484k | } |
1401 | 484k | } |
1402 | 1.87M | return it; |
1403 | 1.87M | } |
1404 | | |
1405 | 1.20M | TransactionCoordinatorContext& coordinator_context() override { |
1406 | 1.20M | return context_; |
1407 | 1.20M | } |
1408 | | |
1409 | 246k | void NotifyApplying(NotifyApplyingData data) override { |
1410 | 246k | if (!leader()) { |
1411 | 0 | LOG_WITH_PREFIX(WARNING) << __func__ << " at non leader: " << data.ToString(); |
1412 | 0 | return; |
1413 | 0 | } |
1414 | 246k | postponed_leader_actions_.notify_applying.push_back(std::move(data)); |
1415 | 246k | } |
1416 | | |
1417 | | MUST_USE_RESULT bool SubmitUpdateTransaction( |
1418 | 627k | std::unique_ptr<UpdateTxnOperation> operation) override { |
1419 | 627k | if (!postponed_leader_actions_.leader()) { |
1420 | 27 | auto status = STATUS(IllegalState, "Submit update transaction on non leader"); |
1421 | 0 | VLOG_WITH_PREFIX(1) << status; |
1422 | 27 | operation->CompleteWithStatus(status); |
1423 | 27 | return false; |
1424 | 27 | } |
1425 | | |
1426 | 627k | postponed_leader_actions_.updates.push_back(std::move(operation)); |
1427 | 627k | return true; |
1428 | 627k | } |
1429 | | |
1430 | | void CompleteWithStatus( |
1431 | 249k | std::unique_ptr<UpdateTxnOperation> request, Status status) override { |
1432 | 249k | auto ptr = request.get(); |
1433 | 249k | postponed_leader_actions_.complete_with_status.push_back({ |
1434 | 249k | std::move(request), ptr, std::move(status)}); |
1435 | 249k | } |
1436 | | |
1437 | 0 | void CompleteWithStatus(UpdateTxnOperation* request, Status status) override { |
1438 | 0 | postponed_leader_actions_.complete_with_status.push_back({ |
1439 | 0 | nullptr /* holder */, request, std::move(status)}); |
1440 | 0 | } |
1441 | | |
1442 | 1.40M | bool leader() const override { |
1443 | 1.40M | return postponed_leader_actions_.leader(); |
1444 | 1.40M | } |
1445 | | |
1446 | 1.42M | void Poll() { |
1447 | 1.42M | auto now = context_.clock().Now(); |
1448 | | |
1449 | 1.42M | auto leader_term = context_.LeaderTerm(); |
1450 | 1.42M | bool leader = leader_term != OpId::kUnknownTerm; |
1451 | 1.42M | PostponedLeaderActions actions; |
1452 | 1.42M | { |
1453 | 1.42M | std::lock_guard<std::mutex> lock(managed_mutex_); |
1454 | 1.42M | postponed_leader_actions_.leader_term = leader_term; |
1455 | | |
1456 | 1.42M | auto& index = managed_transactions_.get<LastTouchTag>(); |
1457 | | |
1458 | 1.42M | if (VLOG_IS_ON(4) && leader && !index.empty()) { |
1459 | 0 | const auto& txn = *index.begin(); |
1460 | 0 | LOG_WITH_PREFIX(INFO) |
1461 | 0 | << __func__ << ", now: " << now << ", first: " << txn.ToString() |
1462 | 0 | << ", expired: " << txn.ExpiredAt(now) << ", timeout: " |
1463 | 0 | << MonoDelta(GetTransactionTimeout()) << ", passed: " |
1464 | 0 | << MonoDelta::FromMicroseconds( |
1465 | 0 | now.GetPhysicalValueMicros() - txn.last_touch().GetPhysicalValueMicros()); |
1466 | 0 | } |
1467 | | |
1468 | 1.42M | for (auto it = index.begin(); it != index.end() && it->ExpiredAt(now);) { |
1469 | 381 | if (it->status() == TransactionStatus::ABORTED) { |
1470 | 0 | it = index.erase(it); |
1471 | 381 | } else { |
1472 | 381 | if (leader) { |
1473 | 190 | expired_metric_.Increment(); |
1474 | 190 | bool modified = index.modify(it, [](TransactionState& state) { |
1475 | 0 | VLOG(4) << state.LogPrefix() << "Cleanup expired transaction"; |
1476 | 190 | state.Abort(); |
1477 | 190 | }); |
1478 | 190 | DCHECK(modified); |
1479 | 190 | } |
1480 | 381 | ++it; |
1481 | 381 | } |
1482 | 381 | } |
1483 | 1.42M | auto now_physical = MonoTime::Now(); |
1484 | 411k | for (auto& transaction : managed_transactions_) { |
1485 | 411k | const_cast<TransactionState&>(transaction).Poll(leader, now_physical); |
1486 | 411k | } |
1487 | 1.42M | postponed_leader_actions_.Swap(&actions); |
1488 | 1.42M | } |
1489 | 1.42M | ExecutePostponedLeaderActions(&actions); |
1490 | 1.42M | } |
1491 | | |
1492 | 1.52M | void CheckCompleted(ManagedTransactions::iterator it) { |
1493 | 1.52M | if (it->Completed()) { |
1494 | 589k | auto status = STATUS_FORMAT(Expired, "Transaction completed: $0", *it); |
1495 | 4 | VLOG_WITH_PREFIX(1) << status; |
1496 | 589k | managed_transactions_.modify(it, [&status](TransactionState& state) { |
1497 | 589k | state.ClearRequests(status); |
1498 | 589k | }); |
1499 | 589k | managed_transactions_.erase(it); |
1500 | 589k | } |
1501 | 1.52M | } |
1502 | | |
1503 | | TransactionCoordinatorContext& context_; |
1504 | | Counter& expired_metric_; |
1505 | | const std::string log_prefix_; |
1506 | | |
1507 | | std::mutex managed_mutex_; |
1508 | | ManagedTransactions managed_transactions_; |
1509 | | |
1510 | | // Actions that should be executed after mutex is unlocked. |
1511 | | PostponedLeaderActions postponed_leader_actions_; |
1512 | | |
1513 | | rpc::Poller poller_; |
1514 | | rpc::Rpcs rpcs_; |
1515 | | }; |
1516 | | |
1517 | | TransactionCoordinator::TransactionCoordinator(const std::string& permanent_uuid, |
1518 | | TransactionCoordinatorContext* context, |
1519 | | Counter* expired_metric) |
1520 | 29.1k | : impl_(new Impl(permanent_uuid, context, expired_metric)) { |
1521 | 29.1k | } |
1522 | | |
1523 | 128 | TransactionCoordinator::~TransactionCoordinator() { |
1524 | 128 | } |
1525 | | |
1526 | 1.87M | Status TransactionCoordinator::ProcessReplicated(const ReplicatedData& data) { |
1527 | 1.87M | return impl_->ProcessReplicated(data); |
1528 | 1.87M | } |
1529 | | |
1530 | 0 | void TransactionCoordinator::ProcessAborted(const AbortedData& data) { |
1531 | 0 | impl_->ProcessAborted(data); |
1532 | 0 | } |
1533 | | |
1534 | 2.68M | int64_t TransactionCoordinator::PrepareGC(std::string* details) { |
1535 | 2.68M | return impl_->PrepareGC(details); |
1536 | 2.68M | } |
1537 | | |
1538 | 0 | size_t TransactionCoordinator::test_count_transactions() const { |
1539 | 0 | return impl_->test_count_transactions(); |
1540 | 0 | } |
1541 | | |
1542 | | void TransactionCoordinator::Handle( |
1543 | 686k | std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) { |
1544 | 686k | impl_->Handle(std::move(request), term); |
1545 | 686k | } |
1546 | | |
1547 | 29.1k | void TransactionCoordinator::Start() { |
1548 | 29.1k | impl_->Start(); |
1549 | 29.1k | } |
1550 | | |
1551 | 128 | void TransactionCoordinator::Shutdown() { |
1552 | 128 | impl_->Shutdown(); |
1553 | 128 | } |
1554 | | |
1555 | | Status TransactionCoordinator::GetStatus( |
1556 | | const google::protobuf::RepeatedPtrField<std::string>& transaction_ids, |
1557 | | CoarseTimePoint deadline, |
1558 | 208k | tserver::GetTransactionStatusResponsePB* response) { |
1559 | 208k | return impl_->GetStatus(transaction_ids, deadline, response); |
1560 | 208k | } |
1561 | | |
1562 | | void TransactionCoordinator::Abort(const std::string& transaction_id, |
1563 | | int64_t term, |
1564 | 116k | TransactionAbortCallback callback) { |
1565 | 116k | impl_->Abort(transaction_id, term, std::move(callback)); |
1566 | 116k | } |
1567 | | |
1568 | 0 | std::string TransactionCoordinator::DumpTransactions() { |
1569 | 0 | return impl_->DumpTransactions(); |
1570 | 0 | } |
1571 | | |
1572 | 1 | std::string TransactionCoordinator::ReplicatedData::ToString() const { |
1573 | 1 | return Format("{ leader_term: $0 state: $1 op_id: $2 hybrid_time: $3 }", |
1574 | 1 | leader_term, state, op_id, hybrid_time); |
1575 | 1 | } |
1576 | | |
1577 | | } // namespace tablet |
1578 | | } // namespace yb |