/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_participant.cc
Line | Count | Source |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/tablet/transaction_participant.h" |
17 | | |
18 | | #include <queue> |
19 | | |
20 | | #include <boost/multi_index/hashed_index.hpp> |
21 | | #include <boost/multi_index/mem_fun.hpp> |
22 | | #include <boost/multi_index/ordered_index.hpp> |
23 | | |
24 | | #include "yb/client/transaction_rpc.h" |
25 | | |
26 | | #include "yb/common/pgsql_error.h" |
27 | | #include "yb/common/transaction_error.h" |
28 | | |
29 | | #include "yb/consensus/consensus_util.h" |
30 | | |
31 | | #include "yb/docdb/docdb_rocksdb_util.h" |
32 | | #include "yb/docdb/transaction_dump.h" |
33 | | |
34 | | #include "yb/rpc/poller.h" |
35 | | |
36 | | #include "yb/server/clock.h" |
37 | | |
38 | | #include "yb/tablet/cleanup_aborts_task.h" |
39 | | #include "yb/tablet/cleanup_intents_task.h" |
40 | | #include "yb/tablet/operations/update_txn_operation.h" |
41 | | #include "yb/tablet/remove_intents_task.h" |
42 | | #include "yb/tablet/running_transaction.h" |
43 | | #include "yb/tablet/running_transaction_context.h" |
44 | | #include "yb/tablet/transaction_loader.h" |
45 | | #include "yb/tablet/transaction_participant_context.h" |
46 | | #include "yb/tablet/transaction_status_resolver.h" |
47 | | |
48 | | #include "yb/tserver/tserver_service.pb.h" |
49 | | |
50 | | #include "yb/util/countdown_latch.h" |
51 | | #include "yb/util/debug-util.h" |
52 | | #include "yb/util/flag_tags.h" |
53 | | #include "yb/util/format.h" |
54 | | #include "yb/util/logging.h" |
55 | | #include "yb/util/lru_cache.h" |
56 | | #include "yb/util/metrics.h" |
57 | | #include "yb/util/operation_counter.h" |
58 | | #include "yb/util/scope_exit.h" |
59 | | #include "yb/util/status_format.h" |
60 | | #include "yb/util/status_log.h" |
61 | | #include "yb/util/tsan_util.h" |
62 | | |
63 | | using namespace std::literals; |
64 | | using namespace std::placeholders; |
65 | | |
66 | | DEFINE_uint64(transaction_min_running_check_delay_ms, 50, |
67 | | "When the transaction with the minimal start hybrid time is updated at the transaction "
68 | | "participant, we wait at least this number of milliseconds before checking its "
69 | | "status at the transaction coordinator. Used for the optimization that deletes "
70 | | "RocksDB SSTable files containing provisional records.");
71 | | |
72 | | DEFINE_uint64(transaction_min_running_check_interval_ms, 250, |
73 | | "While the transaction with the minimal start hybrid time remains the same, we will try "
74 | | "to check its status at the transaction coordinator at regular intervals this "
75 | | "long (ms). Used for the optimization that deletes "
76 | | "RocksDB SSTable files containing provisional records.");
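
The two flags above act as a simple delay/interval throttle around the min-running status check (see MinRunningHybridTime further down, which gates the check on next_check_min_running_). A minimal standalone sketch of that pattern, with hypothetical names, assuming a steady clock and a single atomic timestamp:

#include <atomic>
#include <chrono>

// Hypothetical stand-in for next_check_min_running_; zero-initialized at static storage.
std::atomic<std::chrono::steady_clock::time_point> next_check;

// Returns true for at most one caller per interval; that caller performs the status check.
bool ShouldCheckNow(std::chrono::milliseconds interval) {
  auto now = std::chrono::steady_clock::now();
  auto current = next_check.load(std::memory_order_relaxed);
  return now >= current &&
         next_check.compare_exchange_strong(current, now + interval, std::memory_order_acq_rel);
}
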
77 | | |
78 | | DEFINE_test_flag(double, transaction_ignore_applying_probability, 0, |
79 | | "Probability to ignore APPLYING update in tests."); |
80 | | DEFINE_test_flag(bool, fail_in_apply_if_no_metadata, false, |
81 | | "Fail when applying intents if metadata is not found."); |
82 | | |
83 | | DEFINE_int32(max_transactions_in_status_request, 128, |
84 | | "Request the status of at most this many transactions at once. "
85 | | "0 disables load-time transaction status resolution.");
86 | | |
87 | | DEFINE_uint64(transactions_cleanup_cache_size, 256, "Transactions cleanup cache size."); |
88 | | |
89 | | DEFINE_uint64(transactions_status_poll_interval_ms, 500 * yb::kTimeMultiplier, |
90 | | "Transactions poll interval."); |
91 | | |
92 | | DEFINE_bool(transactions_poll_check_aborted, true, "Check aborted transactions during poll."); |
93 | | |
94 | | DECLARE_int64(transaction_abort_check_timeout_ms); |
95 | | |
96 | | METRIC_DEFINE_simple_counter( |
97 | | tablet, transaction_not_found, "Total number of missing transactions during load", |
98 | | yb::MetricUnit::kTransactions); |
99 | | METRIC_DEFINE_simple_gauge_uint64( |
100 | | tablet, transactions_running, "Total number of transactions running in participant", |
101 | | yb::MetricUnit::kTransactions); |
102 | | |
103 | | DEFINE_test_flag(int32, txn_participant_inject_latency_on_apply_update_txn_ms, 0, |
104 | | "How much latency to inject when an update txn operation is applied.");
105 | | |
106 | | namespace yb { |
107 | | namespace tablet { |
108 | | |
109 | | namespace { |
110 | | |
111 | | YB_STRONGLY_TYPED_BOOL(PostApplyCleanup); |
112 | | |
113 | | } // namespace |
114 | | |
115 | 93 | std::string TransactionApplyData::ToString() const { |
116 | 93 | return YB_STRUCT_TO_STRING( |
117 | 93 | leader_term, transaction_id, op_id, commit_ht, log_ht, sealed, status_tablet, apply_state); |
118 | 93 | } |
119 | | |
120 | | class TransactionParticipant::Impl |
121 | | : public RunningTransactionContext, public TransactionLoaderContext { |
122 | | public: |
123 | | Impl(TransactionParticipantContext* context, TransactionIntentApplier* applier, |
124 | | const scoped_refptr<MetricEntity>& entity) |
125 | | : RunningTransactionContext(context, applier), |
126 | | log_prefix_(context->LogPrefix()), |
127 | | loader_(this, entity), |
128 | 56.9k | poller_(log_prefix_, std::bind(&Impl::Poll, this)) { |
129 | 56.9k | LOG_WITH_PREFIX(INFO) << "Create"; |
130 | 56.9k | metric_transactions_running_ = METRIC_transactions_running.Instantiate(entity, 0); |
131 | 56.9k | metric_transaction_not_found_ = METRIC_transaction_not_found.Instantiate(entity); |
132 | 56.9k | } |
133 | | |
134 | 44.5k | ~Impl() { |
135 | 44.5k | if (StartShutdown()) { |
136 | 0 | CompleteShutdown(); |
137 | 44.5k | } else { |
138 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, !shutdown_done_.load(std::memory_order_acquire)) |
139 | 18.4E | << "Destroying transaction participant that did not complete shutdown"; |
140 | 44.5k | } |
141 | 44.5k | } |
142 | | |
143 | 89.4k | bool StartShutdown() { |
144 | 89.4k | bool expected = false; |
145 | 89.4k | if (!closing_.compare_exchange_strong(expected, true)) { |
146 | 44.6k | return false; |
147 | 44.6k | } |
148 | | |
149 | 44.8k | poller_.Shutdown(); |
150 | | |
151 | 44.8k | if (start_latch_.count()) { |
152 | 0 | start_latch_.CountDown(); |
153 | 0 | } |
154 | | |
155 | 44.8k | LOG_WITH_PREFIX(INFO) << "Shutdown"; |
156 | 44.8k | return true; |
157 | 89.4k | } |
158 | | |
159 | 44.7k | void CompleteShutdown() { |
160 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, !closing_.load()) << __func__ << " w/o StartShutdown"; |
161 | | |
162 | 44.7k | decltype(status_resolvers_) status_resolvers; |
163 | 44.7k | { |
164 | 44.7k | MinRunningNotifier min_running_notifier(nullptr /* applier */); |
165 | 44.7k | std::lock_guard<std::mutex> lock(mutex_); |
166 | 44.7k | transactions_.clear(); |
167 | 44.7k | TransactionsModifiedUnlocked(&min_running_notifier); |
168 | 44.7k | status_resolvers.swap(status_resolvers_); |
169 | 44.7k | } |
170 | | |
171 | 44.7k | rpcs_.Shutdown(); |
172 | 44.7k | loader_.Shutdown(); |
173 | 44.7k | for (auto& resolver : status_resolvers) { |
174 | 3 | resolver.Shutdown(); |
175 | 3 | } |
176 | 44.7k | shutdown_done_.store(true, std::memory_order_release); |
177 | 44.7k | } |
178 | | |
179 | 183 | bool Closing() const override { |
180 | 183 | return closing_.load(std::memory_order_acquire); |
181 | 183 | } |
182 | | |
183 | 56.9k | void Start() { |
184 | 56.9k | LOG_WITH_PREFIX(INFO) << "Start"; |
185 | 56.9k | start_latch_.CountDown(); |
186 | 56.9k | } |
187 | | |
188 | | // Adds new running transaction. |
189 | 1.80M | Result<bool> Add(const TransactionMetadata& metadata) { |
190 | 1.80M | loader_.WaitLoaded(metadata.transaction_id); |
191 | | |
192 | 1.80M | MinRunningNotifier min_running_notifier(&applier_); |
193 | 1.80M | std::lock_guard<std::mutex> lock(mutex_); |
194 | 1.80M | auto it = transactions_.find(metadata.transaction_id); |
195 | 1.80M | if (it != transactions_.end()) { |
196 | 586 | return false; |
197 | 586 | } |
198 | 1.80M | if (WasTransactionRecentlyRemoved(metadata.transaction_id) || |
199 | 1.80M | cleanup_cache_.Erase(metadata.transaction_id) != 0) { |
200 | 130k | auto status = STATUS_EC_FORMAT( |
201 | 130k | TryAgain, PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE), |
202 | 130k | "Transaction was recently aborted: $0", metadata.transaction_id); |
203 | 130k | return status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted)); |
204 | 130k | } |
205 | 18.4E | VLOG_WITH_PREFIX(4) << "Create new transaction: " << metadata.transaction_id; |
206 | 1.67M | transactions_.insert(std::make_shared<RunningTransaction>( |
207 | 1.67M | metadata, TransactionalBatchData(), OneWayBitmap(), metadata.start_time, this)); |
208 | 1.67M | TransactionsModifiedUnlocked(&min_running_notifier); |
209 | 1.67M | return true; |
210 | 1.80M | } |
211 | | |
212 | 533k | HybridTime LocalCommitTime(const TransactionId& id) { |
213 | 533k | std::lock_guard<std::mutex> lock(mutex_); |
214 | 533k | auto it = transactions_.find(id); |
215 | 533k | if (it == transactions_.end()) { |
216 | 23.9k | return HybridTime::kInvalid; |
217 | 23.9k | } |
218 | 509k | return (**it).local_commit_time(); |
219 | 533k | } |
220 | | |
221 | 219k | boost::optional<CommitMetadata> LocalCommitData(const TransactionId& id) { |
222 | 219k | std::lock_guard<std::mutex> lock(mutex_); |
223 | 219k | auto it = transactions_.find(id); |
224 | 219k | if (it == transactions_.end()) { |
225 | 4.12k | return boost::none; |
226 | 4.12k | } |
227 | 215k | return boost::make_optional<CommitMetadata>({ |
228 | 215k | .commit_ht = (**it).local_commit_time(), |
229 | 215k | .aborted_subtxn_set = (**it).local_commit_aborted_subtxn_set(), |
230 | 215k | }); |
231 | 219k | } |
232 | | |
233 | 0 | std::pair<size_t, size_t> TEST_CountIntents() { |
234 | 0 | { |
235 | 0 | MinRunningNotifier min_running_notifier(&applier_); |
236 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
237 | 0 | ProcessRemoveQueueUnlocked(&min_running_notifier); |
238 | 0 | } |
239 | |
240 | 0 | std::pair<size_t, size_t> result(0, 0); |
241 | 0 | auto iter = docdb::CreateRocksDBIterator(db_.intents, |
242 | 0 | key_bounds_, |
243 | 0 | docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER, |
244 | 0 | boost::none, |
245 | 0 | rocksdb::kDefaultQueryId); |
246 | 0 | for (iter.SeekToFirst(); iter.Valid(); iter.Next()) { |
247 | 0 | ++result.first; |
248 | | // Count the number of transactions by counting metadata records.
249 | 0 | if (iter.key().size() == TransactionId::StaticSize() + 1) { |
250 | 0 | ++result.second; |
251 | 0 | auto key = iter.key(); |
252 | 0 | key.remove_prefix(1); |
253 | 0 | auto id = CHECK_RESULT(FullyDecodeTransactionId(key)); |
254 | 0 | LOG_WITH_PREFIX(INFO) << "Stored txn meta: " << id; |
255 | 0 | } |
256 | 0 | } |
257 | |
258 | 0 | return result; |
259 | 0 | } |
260 | | |
261 | 1.60M | Result<TransactionMetadata> PrepareMetadata(const TransactionMetadataPB& pb) { |
262 | 1.60M | if (pb.has_isolation()) { |
263 | 737k | auto metadata = VERIFY_RESULT(TransactionMetadata::FromPB(pb)); |
264 | 0 | std::unique_lock<std::mutex> lock(mutex_); |
265 | 737k | auto it = transactions_.find(metadata.transaction_id); |
266 | 737k | if (it != transactions_.end()) { |
267 | 305 | RETURN_NOT_OK((**it).CheckAborted()); |
268 | 737k | } else if (WasTransactionRecentlyRemoved(metadata.transaction_id)) { |
269 | 0 | return MakeAbortedStatus(metadata.transaction_id); |
270 | 0 | } |
271 | 737k | return metadata; |
272 | 737k | } |
273 | | |
274 | 862k | auto id = VERIFY_RESULT(FullyDecodeTransactionId(pb.transaction_id())); |
275 | | |
276 | | // We are not trying to cleanup intents here because we don't know whether this transaction |
277 | | // has intents or not. |
278 | 0 | auto lock_and_iterator = LockAndFind( |
279 | 862k | id, "metadata"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist}); |
280 | 862k | if (!lock_and_iterator.found()) { |
281 | 1.22k | return STATUS(TryAgain, |
282 | 1.22k | Format("Unknown transaction, could be recently aborted: $0", id), Slice(), |
283 | 1.22k | PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)); |
284 | 1.22k | } |
285 | 861k | RETURN_NOT_OK(lock_and_iterator.transaction().CheckAborted()); |
286 | 860k | return lock_and_iterator.transaction().metadata(); |
287 | 861k | } |
288 | | |
289 | | boost::optional<std::pair<IsolationLevel, TransactionalBatchData>> PrepareBatchData( |
290 | | const TransactionId& id, size_t batch_idx, |
291 | 2.30M | boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) { |
292 | | // We are not trying to cleanup intents here because we don't know whether this transaction |
293 | | // has intents or not.
294 | 2.30M | auto lock_and_iterator = LockAndFind( |
295 | 2.30M | id, "metadata with write id"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist}); |
296 | 2.30M | if (!lock_and_iterator.found()) { |
297 | 481 | return boost::none; |
298 | 481 | } |
299 | 2.30M | auto& transaction = lock_and_iterator.transaction(); |
300 | 2.30M | transaction.AddReplicatedBatch(batch_idx, encoded_replicated_batches); |
301 | 2.30M | return std::make_pair(transaction.metadata().isolation, transaction.last_batch_data()); |
302 | 2.30M | } |
303 | | |
304 | 2.31M | void BatchReplicated(const TransactionId& id, const TransactionalBatchData& data) { |
305 | 2.31M | std::lock_guard<std::mutex> lock(mutex_); |
306 | 2.31M | auto it = transactions_.find(id); |
307 | 2.31M | if (it == transactions_.end()) { |
308 | 99 | LOG_IF_WITH_PREFIX(DFATAL, !WasTransactionRecentlyRemoved(id))
309 | 0 | << "Update last write id for unknown transaction: " << id; |
310 | 99 | return; |
311 | 99 | } |
312 | 2.31M | (**it).BatchReplicated(data); |
313 | 2.31M | } |
314 | | |
315 | 388k | void RequestStatusAt(const StatusRequest& request) { |
316 | 388k | auto lock_and_iterator = LockAndFind(*request.id, *request.reason, request.flags); |
317 | 388k | if (!lock_and_iterator.found()) { |
318 | 15.0k | request.callback( |
319 | 15.0k | STATUS_FORMAT(NotFound, "Request status of unknown transaction: $0", *request.id)); |
320 | 15.0k | return; |
321 | 15.0k | } |
322 | 373k | lock_and_iterator.transaction().RequestStatusAt(request, &lock_and_iterator.lock); |
323 | 373k | } |
324 | | |
325 | | // Registers a request, giving it a newly allocated id and returning this id. |
326 | 6.70M | int64_t RegisterRequest() { |
327 | 6.70M | std::lock_guard<std::mutex> lock(mutex_); |
328 | 6.70M | auto result = NextRequestIdUnlocked(); |
329 | 6.70M | running_requests_.push_back(result); |
330 | 6.70M | return result; |
331 | 6.70M | } |
332 | | |
333 | | // Unregisters a previously registered request. |
334 | 6.71M | void UnregisterRequest(int64_t request) { |
335 | 6.71M | MinRunningNotifier min_running_notifier(&applier_); |
336 | 6.71M | { |
337 | 6.71M | std::lock_guard<std::mutex> lock(mutex_); |
338 | 6.71M | DCHECK(!running_requests_.empty()); |
339 | 6.71M | if (running_requests_.front() != request) { |
340 | 2.43M | complete_requests_.push(request); |
341 | 2.43M | return; |
342 | 2.43M | } |
343 | 4.27M | running_requests_.pop_front(); |
344 | 6.71M | while (!complete_requests_.empty() && complete_requests_.top() == running_requests_.front()) {
345 | 2.43M | complete_requests_.pop(); |
346 | 2.43M | running_requests_.pop_front(); |
347 | 2.43M | } |
348 | | |
349 | 4.27M | CleanTransactionsUnlocked(&min_running_notifier); |
350 | 4.27M | } |
351 | 4.27M | } |
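
RegisterRequest/UnregisterRequest above track the smallest request id that is still running: ids are handed out in increasing order, out-of-order completions are parked in a min-heap, and matching entries are drained once they reach the front of the deque. A minimal standalone sketch of this bookkeeping, with hypothetical names and without the locking or cleanup hooks:

#include <cstdint>
#include <deque>
#include <functional>
#include <limits>
#include <queue>
#include <vector>

class RequestTracker {
 public:
  int64_t Register() {
    int64_t id = next_id_++;
    running_.push_back(id);
    return id;
  }

  void Unregister(int64_t id) {
    if (running_.front() != id) {
      complete_.push(id);  // Finished out of order; resolve once it reaches the front.
      return;
    }
    running_.pop_front();
    while (!complete_.empty() && !running_.empty() && complete_.top() == running_.front()) {
      complete_.pop();
      running_.pop_front();
    }
  }

  // Smallest id still running; queued cleanup entries older than this are safe to process.
  int64_t MinRunning() const {
    return running_.empty() ? std::numeric_limits<int64_t>::max() : running_.front();
  }

 private:
  int64_t next_id_ = 1;
  std::deque<int64_t> running_;
  std::priority_queue<int64_t, std::vector<int64_t>, std::greater<int64_t>> complete_;
};
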
352 | | |
353 | | // Cleans transactions whose removal was requested and is now safe to perform.
354 | | // See RemoveUnlocked for details. |
355 | 4.27M | void CleanTransactionsUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) { |
356 | 4.27M | ProcessRemoveQueueUnlocked(min_running_notifier); |
357 | | |
358 | 4.27M | CleanTransactionsQueue(&immediate_cleanup_queue_, min_running_notifier); |
359 | 4.27M | CleanTransactionsQueue(&graceful_cleanup_queue_, min_running_notifier); |
360 | 4.27M | } |
361 | | |
362 | | template <class Queue> |
363 | | void CleanTransactionsQueue( |
364 | 18.5M | Queue* queue, MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) { |
365 | 18.5M | int64_t min_request = running_requests_.empty() ? std::numeric_limits<int64_t>::max()
366 | 18.5M | : running_requests_.front();
367 | 18.5M | HybridTime safe_time; |
368 | 19.0M | while (!queue->empty()) { |
369 | 748k | const auto& front = queue->front(); |
370 | 748k | if (front.request_id >= min_request) { |
371 | 142k | break; |
372 | 142k | } |
373 | 606k | if (!front.Ready(&participant_context_, &safe_time)) { |
374 | 117k | break; |
375 | 117k | } |
376 | 488k | const auto& id = front.transaction_id; |
377 | 488k | RemoveIntentsData checkpoint; |
378 | 488k | auto it = transactions_.find(id); |
379 | | |
380 | 488k | if (it != transactions_.end() && !(**it).ProcessingApply()) {
381 | 417k | OpId op_id = (**it).GetOpId(); |
382 | 417k | participant_context_.GetLastCDCedData(&checkpoint); |
383 | 417k | VLOG_WITH_PREFIX(2) << "Cleaning tx opid is " << op_id.ToString()
384 | 10 | << " checkpoint opid is " << checkpoint.op_id.ToString(); |
385 | | |
386 | 417k | if (checkpoint.op_id < op_id) { |
387 | 380 | break; |
388 | 380 | } |
389 | 416k | (**it).ScheduleRemoveIntents(*it); |
390 | 416k | RemoveTransaction(it, front.reason, min_running_notifier); |
391 | 416k | } |
392 | 18.4E | VLOG_WITH_PREFIX(2) << "Cleaned from queue: " << id; |
393 | 488k | queue->pop_front(); |
394 | 488k | } |
395 | 18.5M | }
Instantiations of CleanTransactionsQueue (lines 364-395):
  CleanTransactionsQueue<std::__1::deque<GracefulCleanupQueueEntry>>: 14.2M executions
  CleanTransactionsQueue<std::__1::deque<ImmediateCleanupQueueEntry>>: 4.27M executions
396 | | |
397 | 56.1k | void Abort(const TransactionId& id, TransactionStatusCallback callback) { |
398 | | // We are not trying to cleanup intents here because we don't know whether this transaction |
399 | | // has intents or not.
400 | 56.1k | auto lock_and_iterator = LockAndFind( |
401 | 56.1k | id, "abort"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist}); |
402 | 56.1k | if (!lock_and_iterator.found()) { |
403 | 56 | callback(STATUS_FORMAT(NotFound, "Abort of unknown transaction: $0", id)); |
404 | 56 | return; |
405 | 56 | } |
406 | 56.1k | auto client_result = client(); |
407 | 56.1k | if (!client_result.ok()) { |
408 | 0 | callback(client_result.status()); |
409 | 0 | return; |
410 | 0 | } |
411 | 56.1k | lock_and_iterator.transaction().Abort( |
412 | 56.1k | *client_result, std::move(callback), &lock_and_iterator.lock); |
413 | 56.1k | } |
414 | | |
415 | 369k | CHECKED_STATUS CheckAborted(const TransactionId& id) { |
416 | | // We are not trying to cleanup intents here because we don't know whether this transaction |
417 | | // has intents or not.
418 | 369k | auto lock_and_iterator = LockAndFind(id, "check aborted"s, TransactionLoadFlags{}); |
419 | 369k | if (!lock_and_iterator.found()) { |
420 | 4 | return MakeAbortedStatus(id); |
421 | 4 | } |
422 | 369k | return lock_and_iterator.transaction().CheckAborted(); |
423 | 369k | } |
424 | | |
425 | | void FillPriorities( |
426 | 146k | boost::container::small_vector_base<std::pair<TransactionId, uint64_t>>* inout) { |
427 | | // TODO(dtxn) optimize locking |
428 | 179k | for (auto& pair : *inout) { |
429 | 179k | auto lock_and_iterator = LockAndFind( |
430 | 179k | pair.first, "fill priorities"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist}); |
431 | 179k | if (!lock_and_iterator.found() || lock_and_iterator.transaction().WasAborted()) {
432 | 981 | pair.second = 0; // Minimal priority for already aborted transactions |
433 | 178k | } else { |
434 | 178k | pair.second = lock_and_iterator.transaction().metadata().priority; |
435 | 178k | } |
436 | 179k | } |
437 | 146k | } |
438 | | |
439 | 1.36M | void Handle(std::unique_ptr<tablet::UpdateTxnOperation> operation, int64_t term) { |
440 | 1.36M | auto txn_status = operation->request()->status(); |
441 | 1.36M | if (txn_status == TransactionStatus::APPLYING) { |
442 | 436k | HandleApplying(std::move(operation), term); |
443 | 436k | return; |
444 | 436k | } |
445 | | |
446 | 926k | if (txn_status == TransactionStatus::IMMEDIATE_CLEANUP || |
447 | 926k | txn_status == TransactionStatus::GRACEFUL_CLEANUP) {
448 | 926k | auto cleanup_type = txn_status == TransactionStatus::IMMEDIATE_CLEANUP
449 | 926k | ? CleanupType::kImmediate
450 | 926k | : CleanupType::kGraceful;
451 | 926k | HandleCleanup(std::move(operation), term, cleanup_type); |
452 | 926k | return; |
453 | 926k | } |
454 | | |
455 | 583 | auto error_status = STATUS_FORMAT( |
456 | 583 | InvalidArgument, "Unexpected status in transaction participant Handle: $0", *operation); |
457 | 583 | LOG_WITH_PREFIX(DFATAL) << error_status; |
458 | 583 | operation->CompleteWithStatus(error_status); |
459 | 583 | } |
460 | | |
461 | 1.30M | CHECKED_STATUS ProcessReplicated(const ReplicatedData& data) { |
462 | 1.30M | if (FLAGS_TEST_txn_participant_inject_latency_on_apply_update_txn_ms > 0) { |
463 | 0 | SleepFor(1ms * FLAGS_TEST_txn_participant_inject_latency_on_apply_update_txn_ms); |
464 | 0 | } |
465 | | |
466 | 1.30M | auto id = FullyDecodeTransactionId(data.state.transaction_id()); |
467 | 1.30M | if (!id.ok()) { |
468 | 0 | return id.status(); |
469 | 0 | } |
470 | | |
471 | 1.30M | if (data.state.status() == TransactionStatus::APPLYING) { |
472 | 1.30M | return ReplicatedApplying(*id, data); |
473 | 1.30M | } else if (data.state.status() == TransactionStatus::ABORTED) {
474 | 0 | return ReplicatedAborted(*id, data); |
475 | 0 | } |
476 | | |
477 | 716 | auto status = STATUS_FORMAT( |
478 | 716 | InvalidArgument, "Unexpected status in transaction participant ProcessReplicated: $0, $1", |
479 | 716 | data.op_id, data.state); |
480 | 716 | LOG_WITH_PREFIX(DFATAL) << status; |
481 | 716 | return status; |
482 | 1.30M | } |
483 | | |
484 | 0 | void Cleanup(TransactionIdSet&& set, TransactionStatusManager* status_manager) { |
485 | 0 | auto cleanup_aborts_task = std::make_shared<CleanupAbortsTask>( |
486 | 0 | &applier_, std::move(set), &participant_context_, status_manager, LogPrefix()); |
487 | 0 | cleanup_aborts_task->Prepare(cleanup_aborts_task); |
488 | 0 | participant_context_.StrandEnqueue(cleanup_aborts_task.get()); |
489 | 0 | } |
490 | | |
491 | 1.30M | CHECKED_STATUS ProcessApply(const TransactionApplyData& data) { |
492 | 18.4E | VLOG_WITH_PREFIX(2) << "Apply: " << data.ToString(); |
493 | | |
494 | 1.30M | loader_.WaitLoaded(data.transaction_id); |
495 | | |
496 | 1.30M | ScopedRWOperation operation(pending_op_counter_); |
497 | 1.30M | if (!operation.ok()) { |
498 | 0 | LOG_WITH_PREFIX(WARNING) << "Process apply rejected"; |
499 | 0 | return Status::OK(); |
500 | 0 | } |
501 | | |
502 | 1.30M | bool was_applied = false; |
503 | | |
504 | 1.30M | { |
505 | | // It is our last chance to load transaction metadata, if missing. |
506 | | // Because it will be deleted when intents are applied. |
507 | | // We are not trying to cleanup intents here because we don't know whether this transaction |
508 | | // has intents of not. |
509 | 1.30M | auto lock_and_iterator = LockAndFind( |
510 | 1.30M | data.transaction_id, "pre apply"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist}); |
511 | 1.30M | if (!lock_and_iterator.found()) { |
512 | | // This situation is normal and could be caused by 2 scenarios: |
513 | | // 1) Write batch failed, but originator doesn't know that. |
514 | | // 2) Failed to notify status tablet that we applied transaction. |
515 | 1.57k | YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1)
516 | 92 | << Format("Apply of unknown transaction: $0", data); |
517 | 1.57k | NotifyApplied(data); |
518 | 1.57k | CHECK(!FLAGS_TEST_fail_in_apply_if_no_metadata); |
519 | 1.57k | return Status::OK(); |
520 | 1.57k | } |
521 | | |
522 | 1.30M | auto existing_commit_ht = lock_and_iterator.transaction().local_commit_time(); |
523 | 1.30M | if (existing_commit_ht) { |
524 | 18 | was_applied = true; |
525 | 18 | LOG_WITH_PREFIX(INFO) << "Transaction already applied: " << data.transaction_id; |
526 | 18 | LOG_IF_WITH_PREFIX(DFATAL, data.commit_ht != existing_commit_ht)
527 | 0 | << "Transaction was previously applied with another commit ht: " << existing_commit_ht |
528 | 0 | << ", new commit ht: " << data.commit_ht; |
529 | 1.30M | } else { |
530 | 1.30M | transactions_.modify(lock_and_iterator.iterator, [&data](auto& txn) { |
531 | 1.30M | txn->SetLocalCommitData(data.commit_ht, data.aborted); |
532 | 1.30M | }); |
533 | | |
534 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, data.log_ht < last_safe_time_) |
535 | 18.4E | << "Apply transaction before last safe time " << data.transaction_id |
536 | 18.4E | << ": " << data.log_ht << " vs " << last_safe_time_; |
537 | 1.30M | } |
538 | 1.30M | } |
539 | | |
540 | 1.30M | if (!was_applied) {
541 | 1.30M | auto apply_state = CHECK_RESULT(applier_.ApplyIntents(data)); |
542 | | |
543 | 18.4E | VLOG_WITH_PREFIX(4) << "TXN: " << data.transaction_id << ": apply state: " |
544 | 18.4E | << apply_state.ToString(); |
545 | | |
546 | 1.30M | UpdateAppliedTransaction(data, apply_state, &operation); |
547 | 1.30M | } |
548 | | |
549 | 1.30M | NotifyApplied(data); |
550 | 1.30M | return Status::OK(); |
551 | 1.30M | } |
552 | | |
553 | | void UpdateAppliedTransaction( |
554 | | const TransactionApplyData& data, |
555 | | const docdb::ApplyTransactionState& apply_state, |
556 | 1.30M | ScopedRWOperation* operation) NO_THREAD_SAFETY_ANALYSIS { |
557 | 1.30M | MinRunningNotifier min_running_notifier(&applier_); |
558 | | // We are not trying to cleanup intents here because we don't know whether this transaction |
559 | | // has intents or not. |
560 | 1.30M | auto lock_and_iterator = LockAndFind( |
561 | 1.30M | data.transaction_id, "apply"s, TransactionLoadFlags{TransactionLoadFlag::kMustExist}); |
562 | 1.30M | if (lock_and_iterator.found()) { |
563 | 1.30M | lock_and_iterator.transaction().SetOpId(data.op_id); |
564 | 1.30M | if (!apply_state.active()) { |
565 | 1.30M | RemoveUnlocked(lock_and_iterator.iterator, RemoveReason::kApplied, &min_running_notifier); |
566 | 1.30M | } else { |
567 | 1.08k | lock_and_iterator.transaction().SetApplyData(apply_state, &data, operation); |
568 | 1.08k | } |
569 | 1.30M | } |
570 | 1.30M | } |
571 | | |
572 | 1.30M | void NotifyApplied(const TransactionApplyData& data) { |
573 | 1.30M | VLOG_WITH_PREFIX(4) << Format("NotifyApplied($0)", data);
574 | | |
575 | 1.30M | if (data.leader_term != OpId::kUnknownTerm) { |
576 | 436k | tserver::UpdateTransactionRequestPB req; |
577 | 436k | req.set_tablet_id(data.status_tablet); |
578 | 436k | req.set_propagated_hybrid_time(participant_context_.Now().ToUint64()); |
579 | 436k | auto& state = *req.mutable_state(); |
580 | 436k | state.set_transaction_id(data.transaction_id.data(), data.transaction_id.size()); |
581 | 436k | state.set_status(TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS); |
582 | 436k | state.add_tablets(participant_context_.tablet_id()); |
583 | 436k | auto client_result = client(); |
584 | 436k | if (!client_result.ok()) { |
585 | 0 | LOG_WITH_PREFIX(WARNING) << "Get client failed: " << client_result.status(); |
586 | 0 | return; |
587 | 0 | } |
588 | | |
589 | 436k | auto handle = rpcs_.Prepare(); |
590 | 436k | if (handle != rpcs_.InvalidHandle()) { |
591 | 434k | *handle = UpdateTransaction( |
592 | 434k | TransactionRpcDeadline(), |
593 | 434k | nullptr /* remote_tablet */, |
594 | 434k | *client_result, |
595 | 434k | &req, |
596 | 434k | [this, handle](const Status& status, |
597 | 434k | const tserver::UpdateTransactionRequestPB& req, |
598 | 436k | const tserver::UpdateTransactionResponsePB& resp) { |
599 | 436k | client::UpdateClock(resp, &participant_context_); |
600 | 436k | rpcs_.Unregister(handle); |
601 | 436k | LOG_IF_WITH_PREFIX(WARNING, !status.ok()) << "Failed to send applied: " << status;
602 | 436k | }); |
603 | 434k | (**handle).SendRpc(); |
604 | 434k | } |
605 | 436k | } |
606 | 1.30M | } |
607 | | |
608 | 926k | CHECKED_STATUS ProcessCleanup(const TransactionApplyData& data, CleanupType cleanup_type) { |
609 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(data) << ", " << AsString(cleanup_type); |
610 | | |
611 | 926k | loader_.WaitLoaded(data.transaction_id); |
612 | | |
613 | 926k | MinRunningNotifier min_running_notifier(&applier_); |
614 | 926k | std::lock_guard<std::mutex> lock(mutex_); |
615 | 926k | auto it = transactions_.find(data.transaction_id); |
616 | 926k | if (it == transactions_.end()) { |
621 | 594k | } else if ((**it).ProcessingApply()) {
618 | 563k | cleanup_cache_.Insert(data.transaction_id); |
619 | 563k | return Status::OK(); |
620 | 563k | } |
621 | 594k | } else if (332k (**it).ProcessingApply()332k ) { |
622 | 0 | VLOG_WITH_PREFIX(2) << "Don't cleanup transaction because it is applying intents: " |
623 | 0 | << data.transaction_id; |
624 | 0 | return Status::OK(); |
625 | 0 | } |
626 | | |
627 | 363k | if (cleanup_type == CleanupType::kGraceful) { |
628 | 248k | graceful_cleanup_queue_.push_back(GracefulCleanupQueueEntry{ |
629 | 248k | .request_id = request_serial_, |
630 | 248k | .transaction_id = data.transaction_id, |
631 | 248k | .reason = RemoveReason::kProcessCleanup, |
638 | 31.1k | VLOG_WITH_PREFIX(2) << "Have added aborted txn to cleanup queue: "
633 | 248k | }); |
634 | 248k | return Status::OK(); |
635 | 248k | } |
636 | | |
637 | 114k | if (!RemoveUnlocked(it, RemoveReason::kProcessCleanup, &min_running_notifier)) { |
638 | 31.1k | VLOG_WITH_PREFIX1 (2) << "Have added aborted txn to cleanup queue: " |
639 | 1 | << data.transaction_id; |
640 | 31.1k | } |
641 | | |
642 | 114k | return Status::OK(); |
643 | 363k | } |
644 | | |
645 | | void SetDB( |
646 | | const docdb::DocDB& db, const docdb::KeyBounds* key_bounds, |
647 | 145k | RWOperationCounter* pending_op_counter) { |
648 | 145k | bool had_db = db_.intents != nullptr; |
649 | 145k | db_ = db; |
650 | 145k | key_bounds_ = key_bounds; |
651 | 145k | pending_op_counter_ = pending_op_counter; |
652 | | |
653 | | // We should only load transactions on the initial call to SetDB (when opening the tablet), not |
654 | | // in case of truncate/restore. |
655 | 145k | if (!had_db) { |
656 | 56.9k | loader_.Start(pending_op_counter, db_); |
657 | 56.9k | return; |
658 | 56.9k | } |
659 | | |
660 | 88.4k | loader_.WaitAllLoaded(); |
661 | 88.4k | MinRunningNotifier min_running_notifier(&applier_); |
662 | 88.4k | std::lock_guard<std::mutex> lock(mutex_); |
663 | 88.4k | transactions_.clear(); |
664 | 88.4k | TransactionsModifiedUnlocked(&min_running_notifier); |
665 | 88.4k | } |
666 | | |
667 | | void GetStatus( |
668 | | const TransactionId& transaction_id, |
669 | | size_t required_num_replicated_batches, |
670 | | int64_t term, |
671 | | tserver::GetTransactionStatusAtParticipantResponsePB* response, |
672 | 0 | rpc::RpcContext* context) { |
673 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
674 | 0 | auto it = transactions_.find(transaction_id); |
675 | 0 | if (it == transactions_.end()) { |
676 | 0 | response->set_num_replicated_batches(0); |
677 | 0 | response->set_status_hybrid_time(0); |
678 | 0 | } else { |
679 | 0 | if ((**it).WasAborted()) { |
680 | 0 | response->set_aborted(true); |
681 | 0 | return; |
682 | 0 | } |
683 | 0 | response->set_num_replicated_batches((**it).num_replicated_batches()); |
684 | 0 | response->set_status_hybrid_time((**it).last_batch_data().hybrid_time.ToUint64()); |
685 | 0 | } |
686 | 0 | } |
687 | | |
688 | 48 | TransactionParticipantContext* participant_context() const { |
689 | 48 | return &participant_context_; |
690 | 48 | } |
691 | | |
692 | 12.0M | HybridTime MinRunningHybridTime() { |
693 | 12.0M | auto result = min_running_ht_.load(std::memory_order_acquire); |
694 | 12.0M | if (result == HybridTime::kMax || result == HybridTime::kInvalid) {
695 | 4.43M | return result; |
696 | 4.43M | } |
697 | 7.62M | auto now = CoarseMonoClock::now(); |
698 | 7.62M | auto current_next_check_min_running = next_check_min_running_.load(std::memory_order_relaxed); |
699 | 7.62M | if (now >= current_next_check_min_running) { |
700 | 30.8k | if (next_check_min_running_.compare_exchange_strong( |
701 | 30.8k | current_next_check_min_running, |
702 | 30.8k | now + 1ms * FLAGS_transaction_min_running_check_interval_ms, |
703 | 30.8k | std::memory_order_acq_rel)) { |
704 | 30.7k | std::unique_lock<std::mutex> lock(mutex_); |
705 | 30.7k | if (transactions_.empty()) { |
706 | 1 | return HybridTime::kMax; |
707 | 1 | } |
708 | 30.7k | auto& first_txn = **transactions_.get<StartTimeTag>().begin(); |
709 | 30.7k | VLOG_WITH_PREFIX(1) << "Checking status of long running min txn " << first_txn.id()
710 | 6 | << ": " << first_txn.WasAborted(); |
711 | 30.7k | static const std::string kRequestReason = "min running check"s; |
712 | | // Get transaction status |
713 | 30.7k | auto now_ht = participant_context_.Now(); |
714 | 30.7k | StatusRequest status_request = { |
715 | 30.7k | .id = &first_txn.id(), |
716 | 30.7k | .read_ht = now_ht, |
717 | 30.7k | .global_limit_ht = now_ht, |
718 | | // Could use 0 here, because read_ht == global_limit_ht. |
719 | | // So we cannot accept status with time >= read_ht and < global_limit_ht. |
720 | 30.7k | .serial_no = 0, |
721 | 30.7k | .reason = &kRequestReason, |
722 | 30.7k | .flags = TransactionLoadFlags{}, |
723 | 30.7k | .callback = [this, id = first_txn.id()](Result<TransactionStatusResult> result) { |
724 | | // Aborted status will result in cleanup of intents. |
725 | 30.7k | VLOG_WITH_PREFIX(1) << "Min running status " << id << ": " << result;
726 | 30.7k | } |
727 | 30.7k | }; |
728 | 30.7k | first_txn.RequestStatusAt(status_request, &lock); |
729 | 30.7k | } |
730 | 30.8k | } |
731 | 7.62M | return result; |
732 | 7.62M | } |
733 | | |
734 | 3.53k | void WaitMinRunningHybridTime(HybridTime ht) { |
735 | 3.53k | MinRunningNotifier min_running_notifier(&applier_); |
736 | 3.53k | std::unique_lock<std::mutex> lock(mutex_); |
737 | 3.53k | waiting_for_min_running_ht_ = ht; |
738 | 3.53k | CheckMinRunningHybridTimeSatisfiedUnlocked(&min_running_notifier); |
739 | 3.53k | } |
740 | | |
741 | 15 | CHECKED_STATUS ResolveIntents(HybridTime resolve_at, CoarseTimePoint deadline) { |
742 | 15 | RETURN_NOT_OK(WaitUntil(participant_context_.clock_ptr().get(), resolve_at, deadline)); |
743 | | |
744 | 15 | if (FLAGS_max_transactions_in_status_request == 0) { |
745 | 0 | return STATUS( |
746 | 0 | IllegalState, |
747 | 0 | "Cannot resolve intents when FLAGS_max_transactions_in_status_request is zero"); |
748 | 0 | } |
749 | | |
750 | 15 | std::vector<TransactionId> recheck_ids, committed_ids; |
751 | | |
752 | | // Maintain a set of transactions, check their statuses, and remove them as they get |
753 | | // committed/applied or aborted, or we realize that the transaction was not committed at
754 | | // resolve_at. |
755 | 15 | for (;;) { |
756 | 15 | TransactionStatusResolver resolver( |
757 | 15 | &participant_context_, &rpcs_, FLAGS_max_transactions_in_status_request, |
758 | 15 | [this, resolve_at, &recheck_ids, &committed_ids]( |
759 | 15 | const std::vector <TransactionStatusInfo>& status_infos) { |
760 | 0 | std::vector<TransactionId> aborted; |
761 | 0 | for (const auto& info : status_infos) { |
762 | 0 | VLOG_WITH_PREFIX(4) << "Transaction status: " << info.ToString(); |
763 | 0 | if (info.status == TransactionStatus::COMMITTED) { |
764 | 0 | if (info.status_ht <= resolve_at) { |
765 | | // Transaction was committed, but not yet applied. |
766 | | // So rely on filtering recheck_ids before next phase. |
767 | 0 | committed_ids.push_back(info.transaction_id); |
768 | 0 | } |
769 | 0 | } else if (info.status == TransactionStatus::ABORTED) { |
770 | 0 | aborted.push_back(info.transaction_id); |
771 | 0 | } else { |
772 | 0 | LOG_IF_WITH_PREFIX(DFATAL, info.status != TransactionStatus::PENDING) |
773 | 0 | << "Transaction is in unexpected state: " << info.ToString(); |
774 | 0 | if (info.status_ht <= resolve_at) { |
775 | 0 | recheck_ids.push_back(info.transaction_id); |
776 | 0 | } |
777 | 0 | } |
778 | 0 | } |
779 | 0 | if (!aborted.empty()) { |
780 | 0 | MinRunningNotifier min_running_notifier(&applier_); |
781 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
782 | 0 | for (const auto& id : aborted) { |
783 | 0 | EnqueueRemoveUnlocked(id, RemoveReason::kStatusReceived, &min_running_notifier); |
784 | 0 | } |
785 | 0 | } |
786 | 0 | }); |
787 | 15 | auto se = ScopeExit([&resolver] { |
788 | 15 | resolver.Shutdown(); |
789 | 15 | }); |
790 | 15 | { |
791 | 15 | std::lock_guard <std::mutex> lock(mutex_); |
792 | 15 | if (recheck_ids.empty() && committed_ids.empty()) { |
793 | | // First step, check all transactions. |
794 | 15 | for (const auto& transaction : transactions_) { |
795 | 0 | if (!transaction->local_commit_time().is_valid()) { |
796 | 0 | resolver.Add(transaction->metadata().status_tablet, transaction->id()); |
797 | 0 | } |
798 | 0 | } |
799 | 15 | } else { |
800 | 0 | for (const auto& id : recheck_ids) { |
801 | 0 | auto it = transactions_.find(id); |
802 | 0 | if (it == transactions_.end() || (**it).local_commit_time().is_valid()) { |
803 | 0 | continue; |
804 | 0 | } |
805 | 0 | resolver.Add((**it).metadata().status_tablet, id); |
806 | 0 | } |
807 | 0 | auto filter = [this](const TransactionId& id) { |
808 | 0 | auto it = transactions_.find(id); |
809 | 0 | return it == transactions_.end() || (**it).local_commit_time().is_valid(); |
810 | 0 | }; |
811 | 0 | committed_ids.erase(std::remove_if(committed_ids.begin(), committed_ids.end(), filter), |
812 | 0 | committed_ids.end()); |
813 | 0 | } |
814 | 15 | } |
815 | | |
816 | 15 | recheck_ids.clear(); |
817 | 15 | resolver.Start(deadline); |
818 | | |
819 | 15 | RETURN_NOT_OK(resolver.ResultFuture().get()); |
820 | | |
821 | 15 | if (recheck_ids.empty()) { |
822 | 15 | if (committed_ids.empty()) { |
823 | 15 | break; |
824 | 15 | } else { |
825 | | // We are waiting only for committed transactions to be applied. |
826 | | // So just add some delay. |
827 | 0 | std::this_thread::sleep_for(10ms * std::min<size_t>(10, committed_ids.size())); |
828 | 0 | } |
829 | 15 | } |
830 | 15 | } |
831 | | |
832 | 15 | return Status::OK(); |
833 | 15 | } |
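
ResolveIntents above loops until every transaction that might have committed before resolve_at is either applied locally or known to be aborted. A minimal sketch of that retry shape, using hypothetical stand-in types instead of TransactionStatusResolver:

#include <vector>

enum class TxnStatus { kPending, kCommitted, kAborted };

struct TxnInfo {
  int id;
  TxnStatus status;
};

// Repeatedly query statuses and keep only the still-pending ids for the next round.
void ResolveLoop(std::vector<int> pending,
                 std::vector<TxnInfo> (*check_statuses)(const std::vector<int>&)) {
  while (!pending.empty()) {
    std::vector<int> recheck;
    for (const TxnInfo& info : check_statuses(pending)) {
      if (info.status == TxnStatus::kPending) {
        recheck.push_back(info.id);  // Still undecided; ask again in the next round.
      }
      // Committed (and applied) or aborted transactions drop out of the working set.
    }
    pending.swap(recheck);
  }
}
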
834 | | |
835 | 0 | size_t TEST_GetNumRunningTransactions() { |
836 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
837 | 0 | auto txn_to_id = [](const RunningTransactionPtr& txn) { |
838 | 0 | return txn->id(); |
839 | 0 | }; |
840 | 0 | VLOG_WITH_PREFIX(4) << "Transactions: " << AsString(transactions_, txn_to_id) |
841 | 0 | << ", requests: " << AsString(running_requests_); |
842 | 0 | return transactions_.size(); |
843 | 0 | } |
844 | | |
845 | 0 | OneWayBitmap TEST_TransactionReplicatedBatches(const TransactionId& id) { |
846 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
847 | 0 | auto it = transactions_.find(id); |
848 | 0 | return it != transactions_.end() ? (**it).replicated_batches() : OneWayBitmap(); |
849 | 0 | } |
850 | | |
851 | 0 | std::string DumpTransactions() { |
852 | 0 | std::string result; |
853 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
854 | |
855 | 0 | result += Format( |
856 | 0 | "{ safe_time_for_participant: $0 remove_queue_size: $1 ", |
857 | 0 | participant_context_.SafeTimeForTransactionParticipant(), remove_queue_.size()); |
858 | 0 | if (!remove_queue_.empty()) { |
859 | 0 | result += "remove_queue_front: " + AsString(remove_queue_.front()); |
860 | 0 | } |
861 | 0 | if (!running_requests_.empty()) { |
862 | 0 | result += "running_requests_front: " + AsString(running_requests_.front()); |
863 | 0 | } |
864 | 0 | result += "}\n"; |
865 | |
866 | 0 | for (const auto& txn : transactions_.get<StartTimeTag>()) { |
867 | 0 | result += txn->ToString(); |
868 | 0 | result += "\n"; |
869 | 0 | } |
870 | 0 | return result; |
871 | 0 | } |
872 | | |
873 | | CHECKED_STATUS StopActiveTxnsPriorTo( |
874 | 36.4k | HybridTime cutoff, CoarseTimePoint deadline, TransactionId* exclude_txn_id) { |
875 | 36.4k | vector<TransactionId> ids_to_abort; |
876 | 36.4k | { |
877 | 36.4k | std::lock_guard<std::mutex> lock(mutex_); |
878 | 36.4k | for (const auto& txn : transactions_.get<StartTimeTag>()) { |
879 | 210 | if (txn->start_ht() > cutoff || |
880 | 210 | (exclude_txn_id != nullptr && txn->id() == *exclude_txn_id)) {
881 | 0 | break; |
882 | 0 | } |
883 | 210 | if (!txn->WasAborted()) { |
884 | 205 | ids_to_abort.push_back(txn->id()); |
885 | 205 | } |
886 | 210 | } |
887 | 36.4k | } |
888 | | |
889 | 36.4k | if (ids_to_abort.empty()) { |
890 | 36.2k | return Status::OK(); |
891 | 36.2k | } |
892 | | |
893 | | // It is ok to attempt to abort txns that have committed. We don't care |
894 | | // if our request succeeds or not. |
895 | 181 | CountDownLatch latch(ids_to_abort.size()); |
896 | 181 | std::atomic<bool> failed{false}; |
897 | 181 | Status return_status = Status::OK(); |
898 | 205 | for (const auto& id : ids_to_abort) { |
899 | 205 | Abort( |
900 | 205 | id, [this, id, &failed, &return_status, &latch](Result<TransactionStatusResult> result) { |
901 | 205 | VLOG_WITH_PREFIX(2) << "Aborting " << id << " got " << result;
902 | 205 | if (!result || |
903 | 205 | (result->status != TransactionStatus::COMMITTED && result->status != ABORTED)) {
904 | 0 | LOG(INFO) << "Could not abort " << id << " got " << result; |
905 | |
906 | 0 | bool expected = false; |
907 | 0 | if (failed.compare_exchange_strong(expected, true)) { |
908 | 0 | if (!result) { |
909 | 0 | return_status = result.status(); |
910 | 0 | } else { |
911 | 0 | return_status = |
912 | 0 | STATUS_FORMAT(IllegalState, "Wrong status after abort: $0", result->status); |
913 | 0 | } |
914 | 0 | } |
915 | 0 | } |
916 | 205 | latch.CountDown(); |
917 | 205 | }); |
918 | 205 | } |
919 | | |
920 | 181 | return latch.WaitUntil(deadline) ? return_status
921 | 181 | : STATUS(TimedOut, "TimedOut while aborting old transactions");
922 | 36.4k | } |
923 | | |
924 | 26.6k | Result<HybridTime> WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) { |
925 | 26.6k | return participant_context_.WaitForSafeTime(safe_time, deadline); |
926 | 26.6k | } |
927 | | |
928 | 0 | void IgnoreAllTransactionsStartedBefore(HybridTime limit) { |
929 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
930 | 0 | ignore_all_transactions_started_before_ = |
931 | 0 | std::max(ignore_all_transactions_started_before_, limit); |
932 | 0 | } |
933 | | |
934 | | private: |
935 | | class AbortCheckTimeTag; |
936 | | class StartTimeTag; |
937 | | |
938 | | typedef boost::multi_index_container<RunningTransactionPtr, |
939 | | boost::multi_index::indexed_by < |
940 | | boost::multi_index::hashed_unique < |
941 | | boost::multi_index::const_mem_fun < |
942 | | RunningTransaction, const TransactionId&, &RunningTransaction::id> |
943 | | >, |
944 | | boost::multi_index::ordered_non_unique < |
945 | | boost::multi_index::tag<StartTimeTag>, |
946 | | boost::multi_index::const_mem_fun < |
947 | | RunningTransaction, HybridTime, &RunningTransaction::start_ht> |
948 | | >, |
949 | | boost::multi_index::ordered_non_unique < |
950 | | boost::multi_index::tag<AbortCheckTimeTag>, |
951 | | boost::multi_index::const_mem_fun < |
952 | | RunningTransaction, HybridTime, &RunningTransaction::abort_check_ht> |
953 | | > |
954 | | > |
955 | | > Transactions; |
956 | | |
957 | 56.9k | void CompleteLoad(const std::function<void()>& functor) override { |
958 | 56.9k | MinRunningNotifier min_running_notifier(&applier_); |
959 | 56.9k | std::lock_guard<std::mutex> lock(mutex_); |
960 | 56.9k | functor(); |
961 | 56.9k | TransactionsModifiedUnlocked(&min_running_notifier); |
962 | 56.9k | } |
963 | | |
964 | 56.9k | void LoadFinished(const ApplyStatesMap& pending_applies) override { |
965 | 56.9k | start_latch_.Wait(); |
966 | 56.9k | std::vector<ScopedRWOperation> operations; |
967 | 56.9k | operations.reserve(pending_applies.size()); |
968 | 56.9k | for (;;) { |
969 | 56.9k | if (closing_.load(std::memory_order_acquire)) { |
970 | 0 | LOG_WITH_PREFIX(INFO) |
971 | 0 | << __func__ << ": closing, not starting transaction status resolution"; |
972 | 0 | return; |
973 | 0 | } |
974 | 56.9k | while (operations.size() < pending_applies.size()) { |
975 | 0 | ScopedRWOperation operation(pending_op_counter_); |
976 | 0 | if (!operation.ok()) { |
977 | 0 | break; |
978 | 0 | } |
979 | 0 | operations.push_back(std::move(operation)); |
980 | 0 | } |
981 | 56.9k | if (operations.size() == pending_applies.size()) { |
982 | 56.9k | break; |
983 | 56.9k | } |
984 | 6 | operations.clear(); |
985 | 6 | YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 5)
986 | 0 | << __func__ << ": unable to start scoped RW operation"; |
987 | 6 | std::this_thread::sleep_for(10ms); |
988 | 6 | } |
989 | | |
990 | 56.9k | if (!pending_applies.empty()) { |
991 | 0 | LOG_WITH_PREFIX(INFO) |
992 | 0 | << __func__ << ": starting " << pending_applies.size() << " pending applies"; |
993 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
994 | 0 | size_t idx = 0; |
995 | 0 | for (const auto& p : pending_applies) { |
996 | 0 | auto it = transactions_.find(p.first); |
997 | 0 | if (it == transactions_.end()) { |
998 | 0 | LOG_WITH_PREFIX(INFO) << "Unknown transaction for pending apply: " << AsString(p.first); |
999 | 0 | continue; |
1000 | 0 | } |
1001 | | |
1002 | 0 | TransactionApplyData apply_data; |
1003 | 0 | apply_data.transaction_id = p.first; |
1004 | 0 | apply_data.commit_ht = p.second.commit_ht; |
1005 | 0 | (**it).SetApplyData(p.second.state, &apply_data, &operations[idx]); |
1006 | 0 | ++idx; |
1007 | 0 | } |
1008 | 0 | } |
1009 | | |
1010 | 56.9k | { |
1011 | 56.9k | LOG_WITH_PREFIX(INFO) << __func__ << ": starting transaction status resolution"; |
1012 | 56.9k | std::lock_guard<std::mutex> lock(status_resolvers_mutex_); |
1013 | 56.9k | for (auto& status_resolver : status_resolvers_) { |
1014 | 4 | status_resolver.Start(CoarseTimePoint::max()); |
1015 | 4 | } |
1016 | 56.9k | } |
1017 | | |
1018 | 56.9k | poller_.Start( |
1019 | 56.9k | &participant_context_.scheduler(), 1ms * FLAGS_transactions_status_poll_interval_ms); |
1020 | 56.9k | } |
1021 | | |
1022 | 3.53M | void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) { |
1023 | 3.53M | metric_transactions_running_->set_value(transactions_.size()); |
1024 | 3.53M | if (!loader_.complete()) { |
1025 | 4 | return; |
1026 | 4 | } |
1027 | | |
1028 | 3.53M | if (transactions_.empty()) { |
1029 | 825k | min_running_ht_.store(HybridTime::kMax, std::memory_order_release); |
1030 | 825k | CheckMinRunningHybridTimeSatisfiedUnlocked(min_running_notifier); |
1031 | 825k | return; |
1032 | 825k | } |
1033 | | |
1034 | 2.70M | auto& first_txn = **transactions_.get<StartTimeTag>().begin(); |
1035 | 2.70M | if (first_txn.start_ht() != min_running_ht_.load(std::memory_order_relaxed)) { |
1036 | 1.22M | min_running_ht_.store(first_txn.start_ht(), std::memory_order_release); |
1037 | 1.22M | next_check_min_running_.store( |
1038 | 1.22M | CoarseMonoClock::now() + 1ms * FLAGS_transaction_min_running_check_delay_ms, |
1039 | 1.22M | std::memory_order_release); |
1040 | 1.22M | CheckMinRunningHybridTimeSatisfiedUnlocked(min_running_notifier); |
1041 | 1.22M | return; |
1042 | 1.22M | } |
1043 | 2.70M | } |
1044 | | |
1045 | | void EnqueueRemoveUnlocked( |
1046 | | const TransactionId& id, RemoveReason reason, |
1047 | 76.0k | MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) override { |
1048 | 76.0k | auto now = participant_context_.Now(); |
1049 | 76.0k | VLOG_WITH_PREFIX_AND_FUNC(4) << id << " at " << now << ", reason: " << AsString(reason);
1050 | 76.0k | remove_queue_.emplace_back(RemoveQueueEntry{ |
1051 | 76.0k | .id = id, |
1052 | 76.0k | .time = now, |
1053 | 76.0k | .reason = reason, |
1054 | 76.0k | }); |
1055 | 76.0k | ProcessRemoveQueueUnlocked(min_running_notifier); |
1056 | 76.0k | } |
1057 | | |
1058 | 14.3M | void ProcessRemoveQueueUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) { |
1059 | 14.3M | if (!remove_queue_.empty()) { |
1060 | | // When a transaction participant receives an "aborted" response from the coordinator, |
1061 | | // it puts this transaction into a "remove queue", also storing the current hybrid |
1062 | | // time. Then queue entries where time is less than current safe time are removed. |
1063 | | // |
1064 | | // This is correct because, from a transaction participant's point of view: |
1065 | | // |
1066 | | // (1) After we receive a response for a transaction status request, and |
1067 | | // learn that the transaction is unknown to the coordinator, our local |
1068 | | // hybrid time is at least as high as the local hybrid time on the |
1069 | | // transaction status coordinator at the time the transaction was deleted |
1070 | | // from the coordinator, due to hybrid time propagation on RPC response. |
1071 | | // |
1072 | | // (2) If our safe time is greater than the hybrid time when the |
1073 | | // transaction was deleted from the coordinator, then we have already |
1074 | | // applied this transaction's provisional records if the transaction was |
1075 | | // committed. |
1076 | 153k | auto safe_time = participant_context_.SafeTimeForTransactionParticipant(); |
1077 | 153k | if (!safe_time.is_valid()) { |
1078 | 0 | VLOG_WITH_PREFIX(3) << "Unable to obtain safe time to check remove queue"; |
1079 | 0 | return; |
1080 | 0 | } |
1081 | 153k | VLOG_WITH_PREFIX(3) << "Checking remove queue: " << safe_time << ", "
1082 | 2 | << remove_queue_.front().ToString(); |
1083 | 153k | LOG_IF_WITH_PREFIX(DFATAL, safe_time < last_safe_time_)
1084 | 4 | << "Safe time decreased: " << safe_time << " vs " << last_safe_time_; |
1085 | 153k | last_safe_time_ = safe_time; |
1086 | 229k | while (!remove_queue_.empty()) { |
1087 | 156k | auto& front = remove_queue_.front(); |
1088 | 156k | auto it = transactions_.find(front.id); |
1089 | 156k | if (it == transactions_.end() || (**it).local_commit_time().is_valid()) {
1090 | | // This is a regular case, since the coordinator returns ABORTED for an already applied
1091 | | // transaction. But this particular tablet might not have applied it yet, so
1092 | | // it would add such a transaction to the remove queue.
1093 | | // That is the main reason why we wait for safe time before removing intents.
1094 | 2.29k | VLOG_WITH_PREFIX(4) << "Evicting txn from remove queue, w/o removing intents: "
1095 | 0 | << front.ToString(); |
1096 | 2.29k | remove_queue_.pop_front(); |
1097 | 2.29k | continue; |
1098 | 2.29k | } |
1099 | 154k | if (safe_time <= front.time) { |
1100 | 80.6k | break; |
1101 | 80.6k | } |
1102 | 18.4E | VLOG_WITH_PREFIX(4) << "Removing from remove queue: " << front.ToString(); |
1103 | 73.5k | RemoveUnlocked(front.id, front.reason, min_running_notifier); |
1104 | 73.5k | remove_queue_.pop_front(); |
1105 | 73.5k | } |
1106 | 153k | } |
1107 | 14.3M | } |
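
The remove-queue rule explained in the comment block above reduces to: record the hybrid time at which the aborted status was observed, and remove intents only once the participant's safe time has moved strictly past that time, so any committed intents have already been applied. A minimal sketch with simplified stand-in types (plain integers instead of HybridTime and the queue entry):

#include <cstdint>
#include <deque>

struct RemoveEntry {
  uint64_t observed_ht;  // Hybrid time recorded when the ABORTED status was received.
  uint64_t txn_id;
};

// Remove intents only for entries whose observation time is strictly below the safe time.
void ProcessRemoveQueue(std::deque<RemoveEntry>* queue, uint64_t safe_time,
                        void (*remove_intents)(uint64_t txn_id)) {
  while (!queue->empty()) {
    const RemoveEntry& front = queue->front();
    if (safe_time <= front.observed_ht) {
      break;  // Safe time has not passed the observation time yet; retry later.
    }
    remove_intents(front.txn_id);
    queue->pop_front();
  }
}
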
1108 | | |
1109 | | // Tries to remove transaction with specified id. |
1110 | | // Tries to remove the transaction with the specified id.
1111 | | // Returns true if the transaction no longer exists after this call; otherwise returns false,
1112 | | // which means the transaction will be removed later.
1113 | | const TransactionId& id, RemoveReason reason, |
1114 | 73.6k | MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) override { |
1115 | 73.6k | auto it = transactions_.find(id); |
1116 | 73.6k | if (it == transactions_.end()) { |
1117 | 0 | return true; |
1118 | 0 | } |
1119 | 73.6k | return RemoveUnlocked(it, reason, min_running_notifier); |
1120 | 73.6k | } |
1121 | | |
1122 | | bool RemoveUnlocked( |
1123 | | const Transactions::iterator& it, RemoveReason reason, |
1124 | 1.49M | MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) { |
1125 | 1.49M | TransactionId txn_id = (**it).id(); |
1126 | 1.49M | RemoveIntentsData checkpoint; |
1127 | 1.49M | auto itr = transactions_.find(txn_id); |
1128 | 1.49M | OpId op_id = (**itr).GetOpId(); |
1129 | 1.49M | participant_context_.GetLastCDCedData(&checkpoint); |
1130 | | |
1131 | 1.49M | VLOG_WITH_PREFIX(2) << "Cleaning tx, data opid is " << op_id.ToString()
1132 | 818 | << " checkpoint opid is " << checkpoint.op_id.ToString(); |
1133 | | |
1134 | 1.49M | if (running_requests_.empty() && |
1135 | 1.49M | (op_id < checkpoint.op_id)) {
1136 | 1.25M | (**it).ScheduleRemoveIntents(*it); |
1137 | 1.25M | RemoveTransaction(it, reason, min_running_notifier); |
1138 | 18.4E | VLOG_WITH_PREFIX(2) << "Cleaned transaction: " << txn_id << ", reason: " << reason |
1139 | 18.4E | << ", left: " << transactions_.size(); |
1140 | 1.25M | return true; |
1141 | 1.25M | } |
1142 | | |
1143 | | // We cannot remove the transaction at this point, because there are running requests
1144 | | // that are reading the provisional DB and could request the status of this transaction.
1145 | | // So we store the transaction in a queue and wait until all requests that were launched
1146 | | // before our attempt to remove this transaction have completed.
1147 | | // Since we only try to remove the transaction after all of its records are removed from the
1148 | | // provisional DB, it is safe to complete the removal at that point: there will be no more
1149 | | // queries for the status of this transaction.
1150 | 241k | immediate_cleanup_queue_.push_back(ImmediateCleanupQueueEntry{ |
1151 | 241k | .request_id = request_serial_, |
1152 | 241k | .transaction_id = (**it).id(), |
1153 | 241k | .reason = reason, |
1154 | 241k | }); |
1155 | 241k | VLOG_WITH_PREFIX(2) << "Queued for cleanup: " << (**it).id() << ", reason: " << reason;
1156 | 241k | return false; |
1157 | 1.49M | } |
1158 | | |
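RemoveUnlocked above either removes the transaction immediately (when no registered requests are running) or defers it to the immediate cleanup queue, keyed by the current request serial. Below is a simplified sketch of that decision; CleanupGate, RemoveNow, and the omitted draining of the queue on request completion are hypothetical placeholders, not the real members.

#include <cstdint>
#include <deque>

struct CleanupEntry {
  int64_t request_id;  // requests launched before this id must finish first
  int64_t txn_id;
};

class CleanupGate {
 public:
  // Returns true if the transaction was removed immediately.
  bool RemoveOrDefer(int64_t txn_id) {
    if (running_requests_.empty()) {
      // Safe: no in-flight reader can still ask for this transaction's status.
      RemoveNow(txn_id);
      return true;
    }
    // Defer until all requests with id <= request_serial_ have completed.
    cleanup_queue_.push_back({request_serial_, txn_id});
    return false;
  }

 private:
  void RemoveNow(int64_t /*txn_id*/) {
    // Drop in-memory state and schedule intent removal (omitted in this sketch).
  }

  std::deque<int64_t> running_requests_;     // ids of currently running requests
  std::deque<CleanupEntry> cleanup_queue_;   // deferred cleanups
  int64_t request_serial_ = 0;               // incremented when requests are registered
};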
1159 | | struct LockAndFindResult { |
1160 | 18.5k | static Transactions::const_iterator UninitializedIterator() { |
1161 | 18.5k | static const Transactions empty_transactions; |
1162 | 18.5k | return empty_transactions.end(); |
1163 | 18.5k | } |
1164 | | |
1165 | | std::unique_lock<std::mutex> lock; |
1166 | | Transactions::const_iterator iterator = UninitializedIterator(); |
1167 | | bool recently_removed = false; |
1168 | | |
1169 | 6.78M | bool found() const { |
1170 | 6.78M | return lock.owns_lock(); |
1171 | 6.78M | } |
1172 | | |
1173 | 7.80M | RunningTransaction& transaction() const { |
1174 | 7.80M | return **iterator; |
1175 | 7.80M | } |
1176 | | }; |
1177 | | |
1178 | | LockAndFindResult LockAndFind( |
1179 | 6.77M | const TransactionId& id, const std::string& reason, TransactionLoadFlags flags) { |
1180 | 6.77M | loader_.WaitLoaded(id); |
1181 | 6.77M | bool recently_removed; |
1182 | 6.77M | { |
1183 | 6.77M | std::unique_lock<std::mutex> lock(mutex_); |
1184 | 6.77M | auto it = transactions_.find(id); |
1185 | 6.77M | if (it != transactions_.end()) { |
1186 | 6.76M | if ((**it).start_ht() <= ignore_all_transactions_started_before_) { |
1187 | 0 | YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 1) |
1188 | 0 | << "Ignore transaction for '" << reason << "' because of limit: " |
1189 | 0 | << ignore_all_transactions_started_before_ << ", txn: " << AsString(**it); |
1190 | 0 | return LockAndFindResult{}; |
1191 | 0 | } |
1192 | 6.76M | return LockAndFindResult{ std::move(lock), it }; |
1193 | 6.76M | } |
1194 | 15.8k | recently_removed = WasTransactionRecentlyRemoved(id); |
1195 | 15.8k | } |
1196 | 16.9k | if (recently_removed) {
1197 | 16.9k | VLOG_WITH_PREFIX(1)
1198 | 0 | << "Attempt to load recently removed transaction: " << id << ", for: " << reason; |
1199 | 16.9k | LockAndFindResult result; |
1200 | 16.9k | result.recently_removed = true; |
1201 | 16.9k | return result; |
1202 | 16.9k | } |
1203 | 18.4E | metric_transaction_not_found_->Increment(); |
1204 | 18.4E | if (flags.Test(TransactionLoadFlag::kMustExist)) { |
1205 | 1.59k | YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1)
1206 | 107 | << "Transaction not found: " << id << ", for: " << reason; |
1207 | 18.4E | } else { |
1208 | 18.4E | YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 1)
1209 | 0 | << "Transaction not found: " << id << ", for: " << reason; |
1210 | 18.4E | } |
1211 | 18.4E | if (flags.Test(TransactionLoadFlag::kCleanup)) { |
1212 | 0 | VLOG_WITH_PREFIX(2) << "Schedule cleanup for: " << id; |
1213 | 0 | auto cleanup_task = std::make_shared<CleanupIntentsTask>( |
1214 | 0 | &participant_context_, &applier_, id); |
1215 | 0 | cleanup_task->Prepare(cleanup_task); |
1216 | 0 | participant_context_.StrandEnqueue(cleanup_task.get()); |
1217 | 0 | } |
1218 | 18.4E | return LockAndFindResult{}; |
1219 | 15.8k | } |
1220 | | |
1221 | | void LoadTransaction( |
1222 | | TransactionMetadata&& metadata, |
1223 | | TransactionalBatchData&& last_batch_data, |
1224 | | OneWayBitmap&& replicated_batches, |
1225 | 4 | const ApplyStateWithCommitHt* pending_apply) override { |
1226 | 4 | MinRunningNotifier min_running_notifier(&applier_); |
1227 | 4 | std::lock_guard<std::mutex> lock(mutex_); |
1228 | 4 | auto txn = std::make_shared<RunningTransaction>( |
1229 | 4 | std::move(metadata), std::move(last_batch_data), std::move(replicated_batches), |
1230 | 4 | participant_context_.Now().AddDelta(1ms * FLAGS_transaction_abort_check_timeout_ms), this); |
1231 | 4 | if (pending_apply) { |
1232 | 0 | VLOG_WITH_PREFIX(4) << "Apply state found for " << txn->id() << ": " |
1233 | 0 | << pending_apply->ToString(); |
1234 | 0 | txn->SetLocalCommitData(pending_apply->commit_ht, pending_apply->state.aborted); |
1235 | 0 | txn->SetApplyData(pending_apply->state); |
1236 | 0 | } |
1237 | 4 | transactions_.insert(txn); |
1238 | 4 | TransactionsModifiedUnlocked(&min_running_notifier); |
1239 | 4 | } |
1240 | | |
1241 | 492k | Result<client::YBClient*> client() const { |
1242 | 492k | auto cached_value = client_cache_.load(std::memory_order_acquire); |
1243 | 492k | if (cached_value != nullptr) { |
1244 | 486k | return cached_value; |
1245 | 486k | } |
1246 | 5.67k | auto future_status = participant_context_.client_future().wait_for( |
1247 | 5.67k | TransactionRpcTimeout().ToSteadyDuration()); |
1248 | 5.67k | if (future_status != std::future_status::ready) { |
1249 | 0 | return STATUS(TimedOut, "Client not ready"); |
1250 | 0 | } |
1251 | 5.67k | auto result = participant_context_.client_future().get(); |
1252 | 5.67k | client_cache_.store(result, std::memory_order_release); |
1253 | 5.67k | return result; |
1254 | 5.67k | } |
1255 | | |
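client() above caches the resolved client pointer in an atomic, so only the first caller pays for waiting on the client future and later callers take a lock-free fast path. Below is a self-contained sketch of the same pattern, assuming a hypothetical Client type and a shared_future supplied by the owner.

#include <atomic>
#include <chrono>
#include <future>
#include <utility>

struct Client {};  // stand-in for client::YBClient

class ClientCache {
 public:
  explicit ClientCache(std::shared_future<Client*> future) : future_(std::move(future)) {}

  // Returns nullptr if the client is not ready within the timeout.
  Client* Get(std::chrono::milliseconds timeout) {
    auto* cached = cache_.load(std::memory_order_acquire);
    if (cached != nullptr) {
      return cached;  // fast path: already resolved
    }
    if (future_.wait_for(timeout) != std::future_status::ready) {
      return nullptr;  // analogous to the TimedOut status above
    }
    auto* client = future_.get();
    cache_.store(client, std::memory_order_release);
    return client;
  }

 private:
  std::shared_future<Client*> future_;
  std::atomic<Client*> cache_{nullptr};
};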
1256 | 329k | const std::string& LogPrefix() const override { |
1257 | 329k | return log_prefix_; |
1258 | 329k | } |
1259 | | |
1260 | | void RemoveTransaction(Transactions::iterator it, RemoveReason reason, |
1261 | | MinRunningNotifier* min_running_notifier) |
1262 | 1.67M | REQUIRES(mutex_) { |
1263 | 1.67M | auto now = CoarseMonoClock::now(); |
1264 | 1.67M | CleanupRecentlyRemovedTransactions(now); |
1265 | 1.67M | auto& transaction = **it; |
1266 | 1.67M | YB_TRANSACTION_DUMP( |
1267 | 1.67M | Remove, participant_context_.tablet_id(), transaction.id(), participant_context_.Now(), |
1268 | 1.67M | static_cast<uint8_t>(reason)); |
1269 | 1.67M | recently_removed_transactions_cleanup_queue_.push_back({transaction.id(), now + 15s}); |
1270 | 1.67M | LOG_IF_WITH_PREFIX(DFATAL, !recently_removed_transactions_.insert(transaction.id()).second)
1271 | 473 | << "Transaction removed twice: " << transaction.id(); |
1272 | 1.67M | VLOG_WITH_PREFIX(4) << "Remove transaction: " << transaction.id();
1273 | 1.67M | transactions_.erase(it); |
1274 | 1.67M | TransactionsModifiedUnlocked(min_running_notifier); |
1275 | 1.67M | } |
1276 | | |
1277 | 4.23M | void CleanupRecentlyRemovedTransactions(CoarseTimePoint now) { |
1278 | 5.38M | while (!recently_removed_transactions_cleanup_queue_.empty() && |
1279 | 5.38M | recently_removed_transactions_cleanup_queue_.front().time <= now) {
1280 | 1.14M | recently_removed_transactions_.erase(recently_removed_transactions_cleanup_queue_.front().id); |
1281 | 1.14M | recently_removed_transactions_cleanup_queue_.pop_front(); |
1282 | 1.14M | } |
1283 | 4.23M | } |
1284 | | |
1285 | 2.56M | bool WasTransactionRecentlyRemoved(const TransactionId& id) { |
1286 | 2.56M | CleanupRecentlyRemovedTransactions(CoarseMonoClock::now()); |
1287 | 2.56M | return recently_removed_transactions_.count(id) != 0; |
1288 | 2.56M | } |
1289 | | |
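The pair recently_removed_transactions_ / recently_removed_transactions_cleanup_queue_ implements a set with a fixed retention window: a removed id stays visible for 15 seconds and is expired lazily on each lookup. Below is a compact sketch of that idea with hypothetical names; the real code keys on TransactionId and CoarseTimePoint.

#include <chrono>
#include <cstdint>
#include <deque>
#include <unordered_set>

class RecentlyRemoved {
  using Clock = std::chrono::steady_clock;

 public:
  void Add(int64_t id, Clock::time_point now) {
    if (ids_.insert(id).second) {
      // Remember the id for a fixed retention window (15s in the code above).
      expiry_queue_.push_back({id, now + std::chrono::seconds(15)});
    }
  }

  bool Contains(int64_t id, Clock::time_point now) {
    Cleanup(now);
    return ids_.count(id) != 0;
  }

 private:
  void Cleanup(Clock::time_point now) {
    // The queue is ordered by expiry time, so expired entries sit at the front.
    while (!expiry_queue_.empty() && expiry_queue_.front().expires_at <= now) {
      ids_.erase(expiry_queue_.front().id);
      expiry_queue_.pop_front();
    }
  }

  struct Entry {
    int64_t id;
    Clock::time_point expires_at;
  };
  std::unordered_set<int64_t> ids_;   // O(1) membership check
  std::deque<Entry> expiry_queue_;    // time-ordered expiry
};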
1290 | | void CheckMinRunningHybridTimeSatisfiedUnlocked( |
1291 | 2.05M | MinRunningNotifier* min_running_notifier) { |
1292 | 2.05M | if (min_running_ht_.load(std::memory_order_acquire) <= waiting_for_min_running_ht_) {
1293 | 2.05M | return; |
1294 | 2.05M | } |
1295 | 18.4E | waiting_for_min_running_ht_ = HybridTime::kMax; |
1296 | 18.4E | min_running_notifier->Satisfied(); |
1297 | 18.4E | } |
1298 | | |
1299 | | void TransactionsStatus( |
1300 | 1.70k | const std::vector<TransactionStatusInfo>& status_infos) { |
1301 | 1.70k | MinRunningNotifier min_running_notifier(&applier_); |
1302 | 1.70k | std::lock_guard<std::mutex> lock(mutex_); |
1303 | 1.70k | HybridTime now = participant_context_.Now(); |
1304 | 1.74k | for (const auto& info : status_infos) { |
1305 | 1.74k | auto it = transactions_.find(info.transaction_id); |
1306 | 1.74k | if (it == transactions_.end()) { |
1307 | 8 | continue; |
1308 | 8 | } |
1309 | 1.73k | if ((**it).UpdateStatus( |
1310 | 1.73k | info.status, info.status_ht, info.coordinator_safe_time, info.aborted_subtxn_set)) { |
1311 | 58 | EnqueueRemoveUnlocked( |
1312 | 58 | info.transaction_id, RemoveReason::kStatusReceived, &min_running_notifier); |
1313 | 1.67k | } else { |
1314 | 1.67k | transactions_.modify(it, [now](const auto& txn) { |
1315 | 1.67k | txn->UpdateAbortCheckHT(now, UpdateAbortCheckHTMode::kStatusResponseReceived); |
1316 | 1.67k | }); |
1317 | 1.67k | } |
1318 | 1.73k | } |
1319 | 1.70k | } |
1320 | | |
1321 | 435k | void HandleApplying(std::unique_ptr<tablet::UpdateTxnOperation> operation, int64_t term) { |
1322 | 435k | if (RandomActWithProbability(GetAtomicFlag( |
1323 | 435k | &FLAGS_TEST_transaction_ignore_applying_probability))) { |
1324 | 0 | VLOG_WITH_PREFIX(2) |
1325 | 0 | << "TEST: Rejected apply: " |
1326 | 0 | << FullyDecodeTransactionId(operation->request()->transaction_id()); |
1327 | 0 | operation->CompleteWithStatus(Status::OK()); |
1328 | 0 | return; |
1329 | 0 | } |
1330 | 435k | participant_context_.SubmitUpdateTransaction(std::move(operation), term); |
1331 | 435k | } |
1332 | | |
1333 | | void HandleCleanup( |
1334 | | std::unique_ptr<tablet::UpdateTxnOperation> operation, int64_t term, |
1335 | 926k | CleanupType cleanup_type) { |
1336 | 926k | VLOG_WITH_PREFIX(3) << "Cleanup";
1337 | 926k | auto id = FullyDecodeTransactionId(operation->request()->transaction_id()); |
1338 | 926k | if (!id.ok()) { |
1339 | 0 | operation->CompleteWithStatus(id.status()); |
1340 | 0 | return; |
1341 | 0 | } |
1342 | | |
1343 | 926k | TransactionApplyData data = { |
1344 | 926k | .leader_term = term, |
1345 | 926k | .transaction_id = *id, |
1346 | 926k | .aborted = AbortedSubTransactionSet(), |
1347 | 926k | .op_id = OpId(), |
1348 | 926k | .commit_ht = HybridTime(), |
1349 | 926k | .log_ht = HybridTime(), |
1350 | 926k | .sealed = operation->request()->sealed(), |
1351 | 926k | .status_tablet = std::string() |
1352 | 926k | }; |
1353 | 926k | WARN_NOT_OK(ProcessCleanup(data, cleanup_type), "Process cleanup failed"); |
1354 | 926k | operation->CompleteWithStatus(Status::OK()); |
1355 | 926k | } |
1356 | | |
1357 | 1.30M | CHECKED_STATUS ReplicatedApplying(const TransactionId& id, const ReplicatedData& data) { |
1358 | | // data.state.tablets contains only the status tablet.
1359 | 1.30M | if (data.state.tablets_size() != 1) { |
1360 | 0 | return STATUS_FORMAT(InvalidArgument, |
1361 | 0 | "Expected only one table during APPLYING, state received: $0", |
1362 | 0 | data.state); |
1363 | 0 | } |
1364 | 1.30M | HybridTime commit_time(data.state.commit_hybrid_time()); |
1365 | 1.30M | TransactionApplyData apply_data = { |
1366 | 1.30M | .leader_term = data.leader_term, |
1367 | 1.30M | .transaction_id = id, |
1368 | 1.30M | .aborted = VERIFY_RESULT(AbortedSubTransactionSet::FromPB(data.state.aborted().set())), |
1369 | 0 | .op_id = data.op_id, |
1370 | 1.30M | .commit_ht = commit_time, |
1371 | 1.30M | .log_ht = data.hybrid_time, |
1372 | 1.30M | .sealed = data.sealed, |
1373 | 1.30M | .status_tablet = data.state.tablets(0) |
1374 | 1.30M | }; |
1375 | 1.30M | if (!data.already_applied_to_regular_db) { |
1376 | 1.30M | return ProcessApply(apply_data); |
1377 | 1.30M | } |
1378 | 1.21k | if (!data.sealed) { |
1379 | 0 | return ProcessCleanup(apply_data, CleanupType::kImmediate); |
1380 | 0 | } |
1381 | 1.21k | return Status::OK(); |
1382 | 1.21k | } |
1383 | | |
1384 | 0 | CHECKED_STATUS ReplicatedAborted(const TransactionId& id, const ReplicatedData& data) { |
1385 | 0 | MinRunningNotifier min_running_notifier(&applier_); |
1386 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
1387 | 0 | auto it = transactions_.find(id); |
1388 | 0 | if (it == transactions_.end()) { |
1389 | 0 | TransactionMetadata metadata = { |
1390 | 0 | .transaction_id = id, |
1391 | 0 | .isolation = IsolationLevel::NON_TRANSACTIONAL, |
1392 | 0 | .status_tablet = TabletId(), |
1393 | 0 | .priority = 0 |
1394 | 0 | }; |
1395 | 0 | it = transactions_.insert(std::make_shared<RunningTransaction>( |
1396 | 0 | metadata, TransactionalBatchData(), OneWayBitmap(), HybridTime::kMax, this)).first; |
1397 | 0 | TransactionsModifiedUnlocked(&min_running_notifier); |
1398 | 0 | } |
1399 | | |
1400 | | // TODO(dtxn) store this fact to rocksdb. |
1401 | 0 | (**it).Aborted(); |
1402 | |
1403 | 0 | return Status::OK(); |
1404 | 0 | } |
1405 | | |
1406 | 10.0M | void Poll() { |
1407 | 10.0M | { |
1408 | 10.0M | MinRunningNotifier min_running_notifier(&applier_); |
1409 | 10.0M | std::lock_guard<std::mutex> lock(mutex_); |
1410 | | |
1411 | 10.0M | ProcessRemoveQueueUnlocked(&min_running_notifier); |
1412 | 10.0M | if (ANNOTATE_UNPROTECTED_READ(FLAGS_transactions_poll_check_aborted)) { |
1413 | 9.94M | CheckForAbortedTransactions(); |
1414 | 9.94M | } |
1415 | 10.0M | CleanTransactionsQueue(&graceful_cleanup_queue_, &min_running_notifier); |
1416 | 10.0M | } |
1417 | 10.0M | CleanupStatusResolvers(); |
1418 | 10.0M | } |
1419 | | |
1420 | 9.97M | void CheckForAbortedTransactions() REQUIRES(mutex_) { |
1421 | 9.97M | if (transactions_.empty()) { |
1422 | 9.92M | return; |
1423 | 9.92M | } |
1424 | 51.7k | auto now = participant_context_.Now(); |
1425 | 51.7k | auto& index = transactions_.get<AbortCheckTimeTag>(); |
1426 | 51.7k | TransactionStatusResolver* resolver = nullptr; |
1427 | 82.8k | for (;;) { |
1428 | 82.8k | auto& txn = **index.begin(); |
1429 | 82.8k | if (txn.abort_check_ht() > now) { |
1430 | 80.1k | break; |
1431 | 80.1k | } |
1432 | 2.66k | if (!resolver) { |
1433 | 1.68k | resolver = &AddStatusResolver(); |
1434 | 1.68k | } |
1435 | 2.66k | const auto& metadata = txn.metadata(); |
1436 | 2.66k | VLOG_WITH_PREFIX(4)
1437 | 947 | << "Check aborted: " << metadata.status_tablet << ", " << metadata.transaction_id; |
1438 | 2.66k | resolver->Add(metadata.status_tablet, metadata.transaction_id); |
1439 | 2.66k | index.modify(index.begin(), [now](const auto& txn) { |
1440 | 1.73k | txn->UpdateAbortCheckHT(now, UpdateAbortCheckHTMode::kStatusRequestSent); |
1441 | 1.73k | }); |
1442 | 2.66k | } |
1443 | | |
1444 | | // We don't impose a limit on the number of status resolutions here, because we cannot
1445 | | // predict transaction throughput. Instead we rely on the fact that multiple resolutions
1446 | | // cannot be started for a single transaction, because the abort check hybrid time is set
1447 | | // to the same value as the status resolution deadline.
1448 | 51.7k | if (resolver) { |
1449 | 1.64k | resolver->Start(CoarseMonoClock::now() + 1ms * FLAGS_transaction_abort_check_timeout_ms); |
1450 | 1.64k | } |
1451 | 51.7k | } |
1452 | | |
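CheckForAbortedTransactions above scans an index ordered by abort-check time, batches all due transactions into one status resolver, and pushes each checked transaction's next check time to the resolution deadline, so the same transaction is never resolved twice concurrently. Below is a rough sketch under those assumptions, with an ordered multimap standing in for the multi-index container.

#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

using HybridTime = uint64_t;  // stand-in for yb::HybridTime

struct StatusBatch {
  std::vector<int64_t> txn_ids;  // transactions whose status will be resolved together
};

void CheckForAborted(std::multimap<HybridTime, int64_t>* by_check_time,
                     HybridTime now, HybridTime next_check,
                     StatusBatch* batch) {
  assert(next_check > now);  // resolution deadline is in the future, so the loop terminates
  while (!by_check_time->empty()) {
    auto it = by_check_time->begin();
    if (it->first > now) {
      break;  // The earliest pending check is still in the future.
    }
    batch->txn_ids.push_back(it->second);
    // Re-insert with the resolution deadline, so no second resolution starts
    // for this transaction before the first one finishes.
    int64_t txn_id = it->second;
    by_check_time->erase(it);
    by_check_time->emplace(next_check, txn_id);
  }
}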
1453 | 9.94M | void CleanupStatusResolvers() EXCLUDES(status_resolvers_mutex_) { |
1454 | 9.94M | std::lock_guard<std::mutex> lock(status_resolvers_mutex_); |
1455 | 9.94M | while (!status_resolvers_.empty() && !status_resolvers_.front().Running()) {
1456 | 1.66k | status_resolvers_.front().Shutdown(); |
1457 | 1.66k | status_resolvers_.pop_front(); |
1458 | 1.66k | } |
1459 | 9.94M | } |
1460 | | |
1461 | 1.70k | TransactionStatusResolver& AddStatusResolver() override EXCLUDES(status_resolvers_mutex_) { |
1462 | 1.70k | std::lock_guard<std::mutex> lock(status_resolvers_mutex_); |
1463 | 1.70k | status_resolvers_.emplace_back( |
1464 | 1.70k | &participant_context_, &rpcs_, FLAGS_max_transactions_in_status_request, |
1465 | 1.70k | std::bind(&Impl::TransactionsStatus, this, _1)); |
1466 | 1.70k | return status_resolvers_.back(); |
1467 | 1.70k | } |
1468 | | |
1469 | | struct ImmediateCleanupQueueEntry { |
1470 | | int64_t request_id; |
1471 | | TransactionId transaction_id; |
1472 | | RemoveReason reason; |
1473 | | |
1474 | 239k | bool Ready(TransactionParticipantContext* participant_context, HybridTime* safe_time) const { |
1475 | 239k | return true; |
1476 | 239k | } |
1477 | | }; |
1478 | | |
1479 | | struct GracefulCleanupQueueEntry { |
1480 | | int64_t request_id; |
1481 | | TransactionId transaction_id; |
1482 | | RemoveReason reason; |
1483 | | HybridTime required_safe_time; |
1484 | | |
1485 | 366k | bool Ready(TransactionParticipantContext* participant_context, HybridTime* safe_time) const { |
1486 | 366k | if (!*safe_time) { |
1487 | 323k | *safe_time = participant_context->SafeTimeForTransactionParticipant(); |
1488 | 323k | } |
1489 | 366k | return *safe_time >= required_safe_time; |
1490 | 366k | } |
1491 | | }; |
1492 | | |
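GracefulCleanupQueueEntry::Ready above fetches the participant safe time lazily, at most once per cleanup pass, and the graceful queue is processed in order of required safe time. Below is a sketch of such a pass with hypothetical names (GracefulEntry, fetch_safe_time, cleanup_fn); the running-request check from the real code is omitted.

#include <cstdint>
#include <deque>
#include <functional>
#include <optional>

using HybridTime = uint64_t;  // stand-in for yb::HybridTime

struct GracefulEntry {
  int64_t txn_id;
  HybridTime required_safe_time;
};

void CleanGracefulQueue(std::deque<GracefulEntry>* queue,
                        const std::function<HybridTime()>& fetch_safe_time,
                        const std::function<void(int64_t)>& cleanup_fn) {
  std::optional<HybridTime> safe_time;  // computed lazily, at most once per pass
  while (!queue->empty()) {
    const auto& front = queue->front();
    if (!safe_time) {
      safe_time = fetch_safe_time();
    }
    if (*safe_time < front.required_safe_time) {
      break;  // Queue is ordered by required time; nothing later is ready either.
    }
    cleanup_fn(front.txn_id);
    queue->pop_front();
  }
}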
1493 | | std::string log_prefix_; |
1494 | | |
1495 | | docdb::DocDB db_; |
1496 | | const docdb::KeyBounds* key_bounds_; |
1497 | | // Owned externally; the owner must guarantee it is not destroyed before this object.
1498 | | RWOperationCounter* pending_op_counter_ = nullptr; |
1499 | | |
1500 | | Transactions transactions_; |
1501 | | // Ids of running requests, stored in increasing order. |
1502 | | std::deque<int64_t> running_requests_; |
1503 | | // Ids of completed requests; the minimal request id is on top.
1504 | | // Contains only ids greater than the first running request id; otherwise the entry is
1505 | | // removed from both collections.
1506 | | std::priority_queue<int64_t, std::vector<int64_t>, std::greater<void>> complete_requests_; |
1507 | | |
1508 | | // Queues of transaction ids that should be cleaned, each paired with the request that must
1509 | | // complete before the cleanup can be performed.
1510 | | // Immediate cleanup is performed as soon as possible.
1511 | | // Graceful cleanup is performed after safe time exceeds the cleanup request hybrid time.
1512 | | std::deque<ImmediateCleanupQueueEntry> immediate_cleanup_queue_ GUARDED_BY(mutex_); |
1513 | | std::deque<GracefulCleanupQueueEntry> graceful_cleanup_queue_ GUARDED_BY(mutex_); |
1514 | | |
1515 | | // The remove queue maintains transactions that can be cleaned once the follower's safe time
1516 | | // reaches the time recorded for an entry.
1517 | | // Since entries are added with increasing time, this queue is ordered by time.
1518 | | struct RemoveQueueEntry { |
1519 | | TransactionId id; |
1520 | | HybridTime time; |
1521 | | RemoveReason reason; |
1522 | | |
1523 | 0 | std::string ToString() const { |
1524 | 0 | return YB_STRUCT_TO_STRING(id, time, reason); |
1525 | 0 | } |
1526 | | }; |
1527 | | |
1528 | | // Guarded by RunningTransactionContext::mutex_ |
1529 | | std::deque<RemoveQueueEntry> remove_queue_; |
1530 | | |
1531 | | // Guarded by RunningTransactionContext::mutex_ |
1532 | | HybridTime last_safe_time_ = HybridTime::kMin; |
1533 | | |
1534 | | HybridTime ignore_all_transactions_started_before_ GUARDED_BY(mutex_) = HybridTime::kMin; |
1535 | | |
1536 | | std::unordered_set<TransactionId, TransactionIdHash> recently_removed_transactions_; |
1537 | | struct RecentlyRemovedTransaction { |
1538 | | TransactionId id; |
1539 | | CoarseTimePoint time; |
1540 | | }; |
1541 | | std::deque<RecentlyRemovedTransaction> recently_removed_transactions_cleanup_queue_; |
1542 | | |
1543 | | std::mutex status_resolvers_mutex_; |
1544 | | std::deque<TransactionStatusResolver> status_resolvers_ GUARDED_BY(status_resolvers_mutex_); |
1545 | | |
1546 | | scoped_refptr<AtomicGauge<uint64_t>> metric_transactions_running_; |
1547 | | scoped_refptr<Counter> metric_transaction_not_found_; |
1548 | | |
1549 | | TransactionLoader loader_; |
1550 | | std::atomic<bool> closing_{false}; |
1551 | | CountDownLatch start_latch_{1}; |
1552 | | |
1553 | | std::atomic<HybridTime> min_running_ht_{HybridTime::kInvalid}; |
1554 | | std::atomic<CoarseTimePoint> next_check_min_running_{CoarseTimePoint()}; |
1555 | | HybridTime waiting_for_min_running_ht_ = HybridTime::kMax; |
1556 | | std::atomic<bool> shutdown_done_{false}; |
1557 | | |
1558 | | mutable std::atomic<client::YBClient*> client_cache_{nullptr}; |
1559 | | |
1560 | | LRUCache<TransactionId> cleanup_cache_{FLAGS_transactions_cleanup_cache_size}; |
1561 | | |
1562 | | rpc::Poller poller_; |
1563 | | }; |
1564 | | |
1565 | | TransactionParticipant::TransactionParticipant( |
1566 | | TransactionParticipantContext* context, TransactionIntentApplier* applier, |
1567 | | const scoped_refptr<MetricEntity>& entity) |
1568 | 56.9k | : impl_(new Impl(context, applier, entity)) { |
1569 | 56.9k | } |
1570 | | |
1571 | 44.6k | TransactionParticipant::~TransactionParticipant() { |
1572 | 44.6k | } |
1573 | | |
1574 | 56.9k | void TransactionParticipant::Start() { |
1575 | 56.9k | impl_->Start(); |
1576 | 56.9k | } |
1577 | | |
1578 | 1.80M | Result<bool> TransactionParticipant::Add(const TransactionMetadata& metadata) { |
1579 | 1.80M | return impl_->Add(metadata); |
1580 | 1.80M | } |
1581 | | |
1582 | | Result<TransactionMetadata> TransactionParticipant::PrepareMetadata( |
1583 | 1.60M | const TransactionMetadataPB& pb) { |
1584 | 1.60M | return impl_->PrepareMetadata(pb); |
1585 | 1.60M | } |
1586 | | |
1587 | | boost::optional<std::pair<IsolationLevel, TransactionalBatchData>> |
1588 | | TransactionParticipant::PrepareBatchData( |
1589 | | const TransactionId& id, size_t batch_idx, |
1590 | 2.31M | boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) { |
1591 | 2.31M | return impl_->PrepareBatchData(id, batch_idx, encoded_replicated_batches); |
1592 | 2.31M | } |
1593 | | |
1594 | | void TransactionParticipant::BatchReplicated( |
1595 | 2.31M | const TransactionId& id, const TransactionalBatchData& data) { |
1596 | 2.31M | return impl_->BatchReplicated(id, data); |
1597 | 2.31M | } |
1598 | | |
1599 | 534k | HybridTime TransactionParticipant::LocalCommitTime(const TransactionId& id) { |
1600 | 534k | return impl_->LocalCommitTime(id); |
1601 | 534k | } |
1602 | | |
1603 | 219k | boost::optional<CommitMetadata> TransactionParticipant::LocalCommitData(const TransactionId& id) { |
1604 | 219k | return impl_->LocalCommitData(id); |
1605 | 219k | } |
1606 | | |
1607 | 0 | std::pair<size_t, size_t> TransactionParticipant::TEST_CountIntents() const { |
1608 | 0 | return impl_->TEST_CountIntents(); |
1609 | 0 | } |
1610 | | |
1611 | 388k | void TransactionParticipant::RequestStatusAt(const StatusRequest& request) { |
1612 | 388k | return impl_->RequestStatusAt(request); |
1613 | 388k | } |
1614 | | |
1615 | 6.71M | int64_t TransactionParticipant::RegisterRequest() { |
1616 | 6.71M | return impl_->RegisterRequest(); |
1617 | 6.71M | } |
1618 | | |
1619 | 6.71M | void TransactionParticipant::UnregisterRequest(int64_t request) { |
1620 | 6.71M | impl_->UnregisterRequest(request); |
1621 | 6.71M | } |
1622 | | |
1623 | | void TransactionParticipant::Abort(const TransactionId& id, |
1624 | 55.9k | TransactionStatusCallback callback) { |
1625 | 55.9k | return impl_->Abort(id, std::move(callback)); |
1626 | 55.9k | } |
1627 | | |
1628 | | void TransactionParticipant::Handle( |
1629 | 1.36M | std::unique_ptr<tablet::UpdateTxnOperation> request, int64_t term) { |
1630 | 1.36M | impl_->Handle(std::move(request), term); |
1631 | 1.36M | } |
1632 | | |
1633 | 0 | void TransactionParticipant::Cleanup(TransactionIdSet&& set) { |
1634 | 0 | return impl_->Cleanup(std::move(set), this); |
1635 | 0 | } |
1636 | | |
1637 | 1.30M | Status TransactionParticipant::ProcessReplicated(const ReplicatedData& data) { |
1638 | 1.30M | return impl_->ProcessReplicated(data); |
1639 | 1.30M | } |
1640 | | |
1641 | 369k | Status TransactionParticipant::CheckAborted(const TransactionId& id) { |
1642 | 369k | return impl_->CheckAborted(id); |
1643 | 369k | } |
1644 | | |
1645 | | void TransactionParticipant::FillPriorities( |
1646 | 146k | boost::container::small_vector_base<std::pair<TransactionId, uint64_t>>* inout) { |
1647 | 146k | return impl_->FillPriorities(inout); |
1648 | 146k | } |
1649 | | |
1650 | | void TransactionParticipant::SetDB( |
1651 | | const docdb::DocDB& db, const docdb::KeyBounds* key_bounds, |
1652 | 145k | RWOperationCounter* pending_op_counter) { |
1653 | 145k | impl_->SetDB(db, key_bounds, pending_op_counter); |
1654 | 145k | } |
1655 | | |
1656 | | void TransactionParticipant::GetStatus( |
1657 | | const TransactionId& transaction_id, |
1658 | | size_t required_num_replicated_batches, |
1659 | | int64_t term, |
1660 | | tserver::GetTransactionStatusAtParticipantResponsePB* response, |
1661 | 0 | rpc::RpcContext* context) { |
1662 | 0 | impl_->GetStatus(transaction_id, required_num_replicated_batches, term, response, context); |
1663 | 0 | } |
1664 | | |
1665 | 48 | TransactionParticipantContext* TransactionParticipant::context() const { |
1666 | 48 | return impl_->participant_context(); |
1667 | 48 | } |
1668 | | |
1669 | 12.0M | HybridTime TransactionParticipant::MinRunningHybridTime() const { |
1670 | 12.0M | return impl_->MinRunningHybridTime(); |
1671 | 12.0M | } |
1672 | | |
1673 | 3.53k | void TransactionParticipant::WaitMinRunningHybridTime(HybridTime ht) { |
1674 | 3.53k | impl_->WaitMinRunningHybridTime(ht); |
1675 | 3.53k | } |
1676 | | |
1677 | 15 | Status TransactionParticipant::ResolveIntents(HybridTime resolve_at, CoarseTimePoint deadline) { |
1678 | 15 | return impl_->ResolveIntents(resolve_at, deadline); |
1679 | 15 | } |
1680 | | |
1681 | 0 | size_t TransactionParticipant::TEST_GetNumRunningTransactions() const { |
1682 | 0 | return impl_->TEST_GetNumRunningTransactions(); |
1683 | 0 | } |
1684 | | |
1685 | | OneWayBitmap TransactionParticipant::TEST_TransactionReplicatedBatches( |
1686 | 0 | const TransactionId& id) const { |
1687 | 0 | return impl_->TEST_TransactionReplicatedBatches(id); |
1688 | 0 | } |
1689 | | |
1690 | 0 | std::string TransactionParticipant::ReplicatedData::ToString() const { |
1691 | 0 | return YB_STRUCT_TO_STRING(leader_term, state, op_id, hybrid_time, already_applied_to_regular_db); |
1692 | 0 | } |
1693 | | |
1694 | 44.7k | void TransactionParticipant::StartShutdown() { |
1695 | 44.7k | impl_->StartShutdown(); |
1696 | 44.7k | } |
1697 | | |
1698 | 44.7k | void TransactionParticipant::CompleteShutdown() { |
1699 | 44.7k | impl_->CompleteShutdown(); |
1700 | 44.7k | } |
1701 | | |
1702 | 0 | std::string TransactionParticipant::DumpTransactions() const { |
1703 | 0 | return impl_->DumpTransactions(); |
1704 | 0 | } |
1705 | | |
1706 | | Status TransactionParticipant::StopActiveTxnsPriorTo( |
1707 | 36.4k | HybridTime cutoff, CoarseTimePoint deadline, TransactionId* exclude_txn_id) { |
1708 | 36.4k | return impl_->StopActiveTxnsPriorTo(cutoff, deadline, exclude_txn_id); |
1709 | 36.4k | } |
1710 | | |
1711 | | Result<HybridTime> TransactionParticipant::WaitForSafeTime( |
1712 | 26.6k | HybridTime safe_time, CoarseTimePoint deadline) { |
1713 | 26.6k | return impl_->WaitForSafeTime(safe_time, deadline); |
1714 | 26.6k | } |
1715 | | |
1716 | 0 | void TransactionParticipant::IgnoreAllTransactionsStartedBefore(HybridTime limit) { |
1717 | 0 | impl_->IgnoreAllTransactionsStartedBefore(limit); |
1718 | 0 | } |
1719 | | |
1720 | 0 | const TabletId& TransactionParticipant::tablet_id() const { |
1721 | 0 | return impl_->participant_context()->tablet_id(); |
1722 | 0 | } |
1723 | | |
1724 | 58.6k | std::string TransactionParticipantContext::LogPrefix() const { |
1725 | 58.6k | return consensus::MakeTabletLogPrefix(tablet_id(), permanent_uuid()); |
1726 | 58.6k | } |
1727 | | |
1728 | 1.19M | HybridTime TransactionParticipantContext::Now() { |
1729 | 1.19M | return clock_ptr()->Now(); |
1730 | 1.19M | } |
1731 | | |
1732 | | } // namespace tablet |
1733 | | } // namespace yb |