/Users/deen/code/yugabyte-db/src/yb/client/transaction.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/client/transaction.h" |
17 | | |
18 | | #include <unordered_set> |
19 | | |
20 | | #include "yb/client/batcher.h" |
21 | | #include "yb/client/client.h" |
22 | | #include "yb/client/in_flight_op.h" |
23 | | #include "yb/client/meta_cache.h" |
24 | | #include "yb/client/transaction_cleanup.h" |
25 | | #include "yb/client/transaction_manager.h" |
26 | | #include "yb/client/transaction_rpc.h" |
27 | | #include "yb/client/yb_op.h" |
28 | | |
29 | | #include "yb/common/common.pb.h" |
30 | | #include "yb/common/transaction.h" |
31 | | #include "yb/common/transaction_error.h" |
32 | | #include "yb/common/ybc_util.h" |
33 | | |
34 | | #include "yb/rpc/messenger.h" |
35 | | #include "yb/rpc/rpc.h" |
36 | | #include "yb/rpc/scheduler.h" |
37 | | |
38 | | #include "yb/tserver/tserver_service.pb.h" |
39 | | |
40 | | #include "yb/util/countdown_latch.h" |
41 | | #include "yb/util/flag_tags.h" |
42 | | #include "yb/util/format.h" |
43 | | #include "yb/util/logging.h" |
44 | | #include "yb/util/random_util.h" |
45 | | #include "yb/util/result.h" |
46 | | #include "yb/util/scope_exit.h" |
47 | | #include "yb/util/status_format.h" |
48 | | #include "yb/util/status_log.h" |
49 | | #include "yb/util/strongly_typed_bool.h" |
50 | | #include "yb/util/trace.h" |
51 | | #include "yb/util/tsan_util.h" |
52 | | #include "yb/util/unique_lock.h" |
53 | | |
54 | | using namespace std::literals; |
55 | | using namespace std::placeholders; |
56 | | |
// Trace sampling knob read by YBTransaction::Impl's destructor: when positive, the destructor
// logs the collected trace through YB_LOG_IF_EVERY_N with this value as N.
57 | | DEFINE_int32(txn_print_trace_every_n, 0, |
58 | | "Controls the rate at which txn traces are printed. Setting this to 0 " |
59 | | "disables printing the collected traces."); |
60 | | TAG_FLAG(txn_print_trace_every_n, advanced); |
61 | | TAG_FLAG(txn_print_trace_every_n, runtime); |
62 | | |
// Latency threshold (milliseconds) read by YBTransaction::Impl's destructor: a transaction whose
// lifetime exceeds a positive threshold gets its trace logged. The help text used to be a
// copy of the txn_print_trace_every_n description, which describes a rate, not a threshold.
63 | | DEFINE_int32(txn_slow_op_threshold_ms, 0, |
64 | | "Transactions that take longer than this threshold (in milliseconds) have their " |
65 | | "collected traces printed. Setting this to 0 disables the slow transaction check."); |
66 | | TAG_FLAG(txn_slow_op_threshold_ms, advanced); |
67 | | TAG_FLAG(txn_slow_op_threshold_ms, runtime); |
68 | | |
// Heartbeat interval, scaled by yb::kTimeMultiplier, plus a switch to turn heartbeats off in tests.
69 | | DEFINE_uint64(transaction_heartbeat_usec, 500000 * yb::kTimeMultiplier, |
70 | | "Interval of transaction heartbeat in usec."); |
71 | | DEFINE_bool(transaction_disable_heartbeat_in_tests, false, "Disable heartbeat during test."); |
// Defined in another translation unit; not referenced in the portion of the file shown here.
72 | | DECLARE_uint64(max_clock_skew_usec); |
73 | | 
// Test-only: when positive, Flushed() sleeps for this many milliseconds before doing its work.
74 | | DEFINE_test_flag(int32, transaction_inject_flushed_delay_ms, 0, |
75 | | "Inject delay before processing flushed operations by transaction."); |
76 | | 
77 | | DEFINE_test_flag(bool, disable_proactive_txn_cleanup_on_abort, false, |
78 | | "Disable cleanup of intents in abort path."); |
79 | | 
// Cloud/region of this process; CheckTransactionLocality() compares tablet replicas against them.
80 | | DECLARE_string(placement_cloud); |
81 | | DECLARE_string(placement_region); |
82 | | |
83 | | namespace yb { |
84 | | namespace client { |
85 | | |
// File-local helper types.
86 | | namespace { |
87 | | 
// Strongly typed flag distinguishing child transactions (Child::kTrue) from regular ones.
88 | | YB_STRONGLY_TYPED_BOOL(Child); |
// Lifecycle states of a client-side transaction; kRunning is the only state in which
// CheckRunningUnlocked() succeeds.
89 | | YB_DEFINE_ENUM(TransactionState, (kRunning)(kAborted)(kCommitted)(kReleased)(kSealed)); |
90 | | 
91 | | } // namespace |
92 | | |
// Deserializes a ChildTransactionData from its protobuf representation.
// Returns the error produced by TransactionMetadata::FromPB when the embedded metadata cannot be
// parsed; otherwise returns the metadata, the read time and the local limits copied from `data`.
93 | 14.6k | Result<ChildTransactionData> ChildTransactionData::FromPB(const ChildTransactionDataPB& data) { |
94 | 14.6k | ChildTransactionData result; |
95 | 14.6k | auto metadata = TransactionMetadata::FromPB(data.metadata()); |
96 | 14.6k | RETURN_NOT_OK(metadata); |
97 | 14.6k | result.metadata = std::move(*metadata); |
98 | 14.6k | result.read_time = ReadHybridTime::FromReadTimePB(data); |
// Each local limit is stored in the protobuf as a raw value and is wrapped into a HybridTime here.
99 | 0 | for (const auto& entry : data.local_limits()) { |
100 | 0 | result.local_limits.emplace(entry.first, HybridTime(entry.second)); |
101 | 0 | } |
102 | 14.6k | return result; |
103 | 14.6k | } |
104 | | |
// Three-valued metadata presence indicator. NOTE(review): no use of this enum is visible in this
// part of the file - confirm where it is consumed.
105 | | YB_DEFINE_ENUM(MetadataState, (kMissing)(kMaybePresent)(kPresent)); |
106 | | |
// Makes `id` the current subtransaction id and raises highest_subtransaction_id_ if `id`
// exceeds every id seen so far.
107 | 48.8k | void YBSubTransaction::SetActiveSubTransaction(SubTransactionId id) { |
108 | 48.8k | sub_txn_.subtransaction_id = id; |
109 | 48.8k | highest_subtransaction_id_ = std::max(highest_subtransaction_id_, id); |
110 | 48.8k | } |
111 | | |
// Marks the subtransaction id range [id, highest_subtransaction_id_] as aborted.
// Returns InternalError when `id` is greater than the highest id seen so far; otherwise returns
// whatever sub_txn_.aborted.SetRange reports.
112 | 23.5k | CHECKED_STATUS YBSubTransaction::RollbackSubTransaction(SubTransactionId id) { |
113 | | // We should abort the range [id, sub_txn_.highest_subtransaction_id]. It's possible that we |
114 | | // have created and released savepoints, such that there have been writes with a |
115 | | // subtransaction_id greater than sub_txn_.subtransaction_id, and those should be aborted as |
116 | | // well. |
117 | 23.5k | SCHECK_GE( |
118 | 23.5k | highest_subtransaction_id_, id, |
119 | 23.5k | InternalError, |
120 | 23.5k | "Attempted to rollback to non-existent savepoint."); |
121 | 23.5k | return sub_txn_.aborted.SetRange(id, highest_subtransaction_id_); |
122 | 23.5k | } |
123 | | |
// Read-only access to the accumulated subtransaction metadata (active id and aborted set).
124 | 63.5k | const SubTransactionMetadata& YBSubTransaction::get() { return sub_txn_; } |
125 | | |
126 | | class YBTransaction::Impl final : public internal::TxnBatcherIf { |
127 | | public: |
// Creates the state for a brand new (non-child) transaction with the requested locality.
// The priority is drawn uniformly at random; it can be overridden later via SetPriority().
128 | | Impl(TransactionManager* manager, YBTransaction* transaction, TransactionLocality locality) |
129 | | : trace_(new Trace), |
130 | | start_(CoarseMonoClock::Now()), |
131 | | manager_(manager), |
132 | | transaction_(transaction), |
133 | | read_point_(manager->clock()), |
134 | 243k | child_(Child::kFalse) { |
135 | 243k | metadata_.priority = RandomUniformInt<uint64_t>(); |
136 | 243k | metadata_.locality = locality; |
137 | 243k | CompleteConstruction(); |
138 | 0 | VLOG_WITH_PREFIX(2) << "Started, metadata: " << metadata_; |
139 | 243k | } |
140 | | |
// Creates the state for a non-child transaction that adopts already existing metadata
// (logged as "Taken"). The metadata is copied as is; no new priority is generated.
141 | | Impl(TransactionManager* manager, YBTransaction* transaction, const TransactionMetadata& metadata) |
142 | | : trace_(new Trace), |
143 | | start_(CoarseMonoClock::Now()), |
144 | | manager_(manager), |
145 | | transaction_(transaction), |
146 | | metadata_(metadata), |
147 | | read_point_(manager->clock()), |
148 | 0 | child_(Child::kFalse) { |
149 | 0 | CompleteConstruction(); |
150 | 0 | VLOG_WITH_PREFIX(2) << "Taken, metadata: " << metadata_; |
151 | 0 | } |
152 | | |
// Creates the state for a child transaction from data prepared by its parent.
// Remembers whether the parent supplied a read time (child_had_read_time_) and marks the
// transaction ready immediately, so no waiter is ever deferred for it in Prepare().
153 | | Impl(TransactionManager* manager, YBTransaction* transaction, ChildTransactionData data) |
154 | | : trace_(new Trace), |
155 | | start_(CoarseMonoClock::Now()), |
156 | | manager_(manager), |
157 | | transaction_(transaction), |
158 | | read_point_(manager->clock()), |
159 | | child_(Child::kTrue), |
160 | 14.6k | child_had_read_time_(data.read_time) { |
161 | | // For serializable isolation we use read intents, so could always read most recent |
162 | | // version of DB. |
163 | | // Otherwise there is possible case when we miss value change that happened after transaction |
164 | | // start. |
165 | 14.6k | if (data.metadata.isolation == IsolationLevel::SNAPSHOT_ISOLATION) { |
166 | 14.6k | read_point_.SetReadTime(std::move(data.read_time), std::move(data.local_limits)); |
167 | 14.6k | } |
// data.metadata is moved only after its isolation level has been inspected above.
168 | 14.6k | metadata_ = std::move(data.metadata); |
169 | 14.6k | CompleteConstruction(); |
170 | 2 | VLOG_WITH_PREFIX(2) << "Started child, metadata: " << metadata_; |
171 | 14.6k | ready_ = true; |
172 | 14.6k | } |
173 | | |
// Aborts any outstanding heartbeat/commit/abort RPCs and optionally dumps the collected trace.
// The trace is logged unconditionally when it is flagged must_print() or when the transaction
// lived longer than a positive txn_slow_op_threshold_ms; otherwise it is logged at the sampled
// rate given by txn_print_trace_every_n (when that flag is positive).
174 | 253k | ~Impl() { |
175 | 253k | manager_->rpcs().Abort({&heartbeat_handle_, &commit_handle_, &abort_handle_}); |
// Waiters still queued at destruction time are reported as a DFATAL.
176 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, !waiters_.empty()) << "Non empty waiters"; |
177 | 253k | const auto threshold = GetAtomicFlag(&FLAGS_txn_slow_op_threshold_ms); |
178 | 253k | const auto now = CoarseMonoClock::Now(); |
179 | 253k | if (trace_->must_print() |
180 | 253k | || (threshold > 0 && ToMilliseconds(now - start_) > threshold)) { |
181 | 0 | LOG(INFO) << ToString() << " took " << ToMicroseconds(now - start_) << "us. Trace: \n" |
182 | 0 | << trace_->DumpToString(true); |
183 | 253k | } else { |
184 | 0 | YB_LOG_IF_EVERY_N(INFO, FLAGS_txn_print_trace_every_n > 0, FLAGS_txn_print_trace_every_n) |
185 | 0 | << ToString() << " took " << ToMicroseconds(now - start_) << "us. Trace: \n" |
186 | 0 | << trace_->DumpToString(true); |
187 | 253k | } |
188 | 253k | } |
189 | | |
// Overrides the transaction priority stored in the metadata. Does not take mutex_.
190 | 96.3k | void SetPriority(uint64_t priority) { |
191 | 96.3k | metadata_.priority = priority; |
192 | 96.3k | } |
193 | | |
// Returns the transaction priority currently stored in the metadata.
194 | 20.2k | uint64_t GetPriority() const { |
195 | 20.2k | return metadata_.priority; |
196 | 20.2k | } |
197 | | |
// Creates a fresh YBTransaction bound to the same TransactionManager. Only the manager is passed
// on; nothing else from this transaction is copied here.
198 | 0 | YBTransactionPtr CreateSimilarTransaction() { |
199 | 0 | return std::make_shared<YBTransaction>(manager_); |
200 | 0 | } |
201 | | |
// Initializes the transaction with an isolation level and an optional explicit read time.
// Returns IllegalState if the read point already has a valid read time. A valid `read_time` is
// installed with an empty local-limits map; an invalid one leaves the read point untouched.
202 | 238k | CHECKED_STATUS Init(IsolationLevel isolation, const ReadHybridTime& read_time) { |
203 | 238k | TRACE_TO(trace_, __func__); |
204 | 18.4E | VLOG_WITH_PREFIX(1) << __func__ << "(" << IsolationLevel_Name(isolation) << ", " |
205 | 18.4E | << read_time << ")"; |
206 | 238k | if (read_point_.GetReadTime().read.is_valid()) { |
207 | 0 | return STATUS_FORMAT(IllegalState, "Read point already specified: $0", |
208 | 0 | read_point_.GetReadTime()); |
209 | 0 | } |
210 | | 
211 | 238k | if (read_time.read.is_valid()) { |
212 | 0 | read_point_.SetReadTime(read_time, ConsistentReadPoint::HybridTimeMap()); |
213 | 0 | } |
214 | 238k | CompleteInit(isolation); |
215 | 238k | return Status::OK(); |
216 | 238k | } |
217 | | |
// Initializes the transaction by taking over the state of an existing read point.
// `read_point` is handed to ConsistentReadPoint::MoveFrom and should not be relied on afterwards.
218 | 628 | void InitWithReadPoint(IsolationLevel isolation, ConsistentReadPoint&& read_point) { |
219 | 0 | VLOG_WITH_PREFIX(1) << __func__ << "(" << IsolationLevel_Name(isolation) << ", " |
220 | 0 | << read_point.GetReadTime() << ")"; |
221 | | 
222 | 628 | read_point_.MoveFrom(&read_point); |
223 | 628 | CompleteInit(isolation); |
224 | 628 | } |
225 | | |
// Returns the isolation level recorded in the transaction metadata.
// The return type is a plain value: a top-level `const` on a by-value return is ignored by the
// language and only produces -Wignored-qualifiers noise, so it has been dropped.
226 | 84.6k | IsolationLevel isolation() const { |
227 | 84.6k | return metadata_.isolation; |
228 | 84.6k | } |
229 | | |
// Sets up `other` as the restart of this transaction and aborts this one.
// Under mutex_: fails with IllegalState unless this transaction is still kRunning and its read
// point requires a restart; otherwise moves the read point into `other`, restarts it there,
// copies the isolation level, derives other's start time and flips this transaction to kAborted.
// The abort RPC (DoAbort) is issued after the lock has been released.
// NOTE(review): `other`'s members are written without taking other->mutex_ - presumably `other`
// is not yet shared with any other thread; confirm against callers.
231 | 0 | CHECKED_STATUS FillRestartedTransaction(Impl* other) EXCLUDES(mutex_) { |
232 | 0 | VLOG_WITH_PREFIX(1) << "Setup restart to " << other->ToString(); |
// Taken up front and passed to DoAbort below.
233 | 0 | auto transaction = transaction_->shared_from_this(); |
234 | 0 | TRACE_TO(trace_, __func__); |
235 | 0 | { |
236 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
237 | 0 | auto state = state_.load(std::memory_order_acquire); |
238 | 0 | if (state != TransactionState::kRunning) { |
239 | 0 | return STATUS_FORMAT( |
240 | 0 | IllegalState, "Restart of completed transaction $0: $1", |
241 | 0 | metadata_.transaction_id, state); |
242 | 0 | } |
243 | 0 | if (!read_point_.IsRestartRequired()) { |
244 | 0 | return STATUS_FORMAT( |
245 | 0 | IllegalState, "Restart of transaction that does not require restart: $0", |
246 | 0 | metadata_.transaction_id); |
247 | 0 | } |
248 | 0 | other->read_point_.MoveFrom(&read_point_); |
249 | 0 | other->read_point_.Restart(); |
250 | 0 | other->metadata_.isolation = metadata_.isolation; |
251 | | // TODO(Piyush): Do we need the below? If yes, prove with a test case and add it. |
252 | | // other->metadata_.priority = metadata_.priority; |
// Snapshot isolation starts at the (restarted) read time; other levels start at "now".
253 | 0 | if (metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION) { |
254 | 0 | other->metadata_.start_time = other->read_point_.GetReadTime().read; |
255 | 0 | } else { |
256 | 0 | other->metadata_.start_time = other->read_point_.Now(); |
257 | 0 | } |
258 | 0 | state_.store(TransactionState::kAborted, std::memory_order_release); |
259 | 0 | } |
260 | 0 | DoAbort(TransactionRpcDeadline(), transaction); |
261 |

262 | 0 | return Status::OK(); |
263 | 0 | } |
264 | | |
// Returns a non-owning pointer to the trace object held by trace_.
265 | 782k | Trace* trace() { |
266 | 782k | return trace_.get(); |
267 | 782k | } |
268 | | |
// Verifies that a LOCAL transaction only touches tablets whose replicas all live in this
// process's placement cloud and region (FLAGS_placement_cloud / FLAGS_placement_region).
// Returns OK immediately for transactions whose locality is not LOCAL. For LOCAL transactions,
// returns IllegalState naming the first tablet that has a replica (failed replicas included)
// outside the local cloud/region. Must be called with mutex_ held.
269 | | CHECKED_STATUS CheckTransactionLocality(internal::InFlightOpsGroupsWithMetadata* ops_info) |
270 | 568k | REQUIRES(mutex_) { |
271 | 568k | if (metadata_.locality != TransactionLocality::LOCAL) { |
272 | 568k | return Status::OK(); |
273 | 568k | } |
274 | 18.4E | for (auto& group : ops_info->groups) { |
// Only the first op of a group is inspected here.
// NOTE(review): presumably all ops of a group share one tablet - confirm.
275 | 0 | auto& first_op = *group.begin; |
276 | 0 | const auto& tablet = first_op.tablet; |
277 | 0 | const auto& tablet_id = tablet->tablet_id(); |
278 |

279 | 0 | auto tservers = tablet->GetRemoteTabletServers(internal::IncludeFailedReplicas::kTrue); |
280 | 0 | for (const auto &tserver : tservers) { |
// Bind by const reference: avoids copying the cloud info for every replica, and is still
// valid if cloud_info() returns by value (the temporary's lifetime is extended).
281 | 0 | const auto& pb = tserver->cloud_info(); |
282 | 0 | if (pb.placement_cloud() != FLAGS_placement_cloud || |
283 | 0 | pb.placement_region() != FLAGS_placement_region) { |
284 | 0 | VLOG_WITH_PREFIX(4) << "Aborting (accessing nonlocal tablet in local transaction)"; |
285 | 0 | return STATUS_FORMAT( |
286 | 0 | IllegalState, "Nonlocal tablet accessed in local transaction: tablet $0", tablet_id); |
287 | 0 | } |
288 | 0 | } |
289 | 0 | } |
290 | 18.4E | return Status::OK(); |
291 | 18.4E | } |
292 | | |
// Prepares a batch of operation groups for sending within this transaction.
//
// Returns true when the batch may proceed; in that case ops_info->metadata has been filled with
// the transaction metadata (and the subtransaction metadata when a subtransaction is active) and
// every group's need_metadata flag has been set.
// Returns false when
//  - the locality check fails: the error is recorded, `waiter` (if any) is invoked with it and
//    the transaction is aborted if it was still running; or
//  - the transaction is not ready yet: `waiter` (if any) is queued in waiters_ and
//    RequestStatusTablet(deadline) is called.
//
// `initial` marks the first Prepare attempt for a batch; per-tablet batch indexes are only
// assigned on that attempt. All callbacks are invoked after mutex_ has been released.
293 | | bool Prepare(internal::InFlightOpsGroupsWithMetadata* ops_info, |
294 | | ForceConsistentRead force_consistent_read, |
295 | | CoarseTimePoint deadline, |
296 | | Initial initial, |
297 | 808k | Waiter waiter) override EXCLUDES(mutex_) { |
298 | 42 | VLOG_WITH_PREFIX(2) << "Prepare(" << force_consistent_read << ", " << initial << ", " |
299 | 42 | << AsString(ops_info->groups) << ")"; |
300 | 808k | TRACE_TO(trace_, "Preparing $0 ops", AsString(ops_info->groups.size())); |
301 | 808k | VTRACE_TO(2, trace_, "Preparing $0 ops", AsString(ops_info->groups)); |
302 | | 
303 | 808k | { |
304 | 808k | UNIQUE_LOCK(lock, mutex_); |
305 | 808k | const bool defer = !ready_; |
306 | | 
// The locality check and per-tablet bookkeeping run when the transaction is ready, and also on
// the initial attempt of a batch even when it is about to be deferred.
307 | 808k | if (!defer || initial) { |
308 | 568k | Status status = CheckTransactionLocality(ops_info); |
309 | 568k | if (!status.ok()) { |
310 | 0 | VLOG_WITH_PREFIX(2) << "Prepare, rejected: " << status; |
311 | 0 | bool abort = false; |
312 | 0 | auto state = state_.load(std::memory_order_acquire); |
313 | 0 | if (state == TransactionState::kRunning) { |
314 | | // State will be changed to aborted in SetErrorUnlocked |
315 | 0 | abort = true; |
316 | 0 | } |
317 | 0 | SetErrorUnlocked(status, "Check transaction locality"); |
318 | 0 | lock.unlock(); |
319 | 0 | if (waiter) { |
320 | 0 | waiter(status); |
321 | 0 | } |
322 | 0 | if (abort) { |
323 | 0 | Abort(TransactionRpcDeadline()); |
324 | 0 | } |
325 | 0 | return false; |
326 | 0 | } |
327 | 782k | for (auto& group : ops_info->groups) { |
328 | 782k | auto& first_op = *group.begin; |
329 | 782k | const auto should_add_intents = first_op.yb_op->should_add_intents(metadata_.isolation); |
330 | 782k | const auto& tablet = first_op.tablet; |
331 | 782k | const auto& tablet_id = tablet->tablet_id(); |
332 | | 
333 | 782k | bool has_metadata; |
// Intent-adding groups get a batch index on the initial attempt (this creates the tablet entry
// if needed); all other cases only look the tablet up and never insert.
334 | 782k | if (initial && should_add_intents) { |
335 | 554k | auto& tablet_state = tablets_[tablet_id]; |
336 | | // TODO(dtxn) Handle skipped writes, i.e. writes that did not write anything (#3220) |
337 | 554k | first_op.batch_idx = tablet_state.num_batches; |
338 | 554k | ++tablet_state.num_batches; |
339 | 554k | has_metadata = tablet_state.has_metadata; |
340 | 227k | } else { |
341 | 227k | const auto it = tablets_.find(tablet_id); |
342 | 227k | has_metadata = it != tablets_.end() && it->second.has_metadata; |
343 | 227k | } |
344 | 782k | group.need_metadata = !has_metadata; |
345 | 782k | } |
346 | 568k | } |
347 | | 
348 | 808k | if (defer) { |
349 | 239k | if (waiter) { |
350 | 239k | waiters_.push_back(std::move(waiter)); |
351 | 239k | } |
352 | 239k | lock.unlock(); |
353 | 11 | VLOG_WITH_PREFIX(2) << "Prepare, rejected (not ready, requesting status tablet)"; |
354 | 239k | RequestStatusTablet(deadline); |
355 | 239k | return false; |
356 | 239k | } |
357 | | 
358 | | // For serializable isolation we never choose read time, since it always reads latest |
359 | | // snapshot. |
360 | | // For snapshot isolation, if read time was not yet picked, we have to choose it now, if there |
361 | | // are multiple tablets that will process the first request. |
362 | 568k | SetReadTimeIfNeeded(ops_info->groups.size() > 1 || force_consistent_read); |
363 | 568k | } |
364 | | 
365 | 568k | ops_info->metadata = { |
366 | 568k | .transaction = metadata_, |
367 | 568k | .subtransaction = subtransaction_.active() |
368 | 63.4k | ? boost::make_optional(subtransaction_.get()) |
369 | 505k | : boost::none, |
370 | 568k | }; |
371 | | 
372 | 568k | return true; |
373 | 568k | } |
374 | | |
// Registers `count` operations as in flight; Flushed() subtracts them again once they complete.
375 | 568k | void ExpectOperations(size_t count) EXCLUDES(mutex_) override { |
376 | 568k | std::lock_guard<std::mutex> lock(mutex_); |
377 | 568k | running_requests_ += count; |
378 | 568k | } |
379 | | |
// Called when a batch of operations of this transaction has been flushed.
//
// `ops` are the flushed operations, `used_read_time` is the read time reported for them (may be
// unset) and `status` is the overall result of the flush.
//
// Under mutex_:
//  - subtracts ops.size() from running_requests_;
//  - on success, under snapshot isolation, adopts `used_read_time` as the read time (finding a
//    read time already picked is reported as DFATAL), and marks every tablet that had an applied,
//    intent-adding operation as having metadata;
//  - on failure, records the error and requests an abort, unless the error is kSkipLocking or,
//    under READ COMMITTED, kReadRestartRequired/kConflict;
//  - if no requests remain in flight and commit_replicated_ is set, takes the commit callback.
// After the lock is released the commit callback is invoked (if taken) and, for non-child
// transactions, the abort is issued.
//
// Fixes relative to the previous revision: the DFATAL message now closes the parenthesis it
// opens, and the inner comment names SetErrorUnlocked, which is the function actually called.
380 | | void Flushed( |
381 | | const internal::InFlightOps& ops, const ReadHybridTime& used_read_time, |
382 | 692k | const Status& status) EXCLUDES(mutex_) override { |
383 | 692k | TRACE_TO(trace_, "Flushed $0 ops. with Status $1", ops.size(), status.ToString()); |
384 | 36 | VLOG_WITH_PREFIX(5) |
385 | 36 | << "Flushed: " << yb::ToString(ops) << ", used_read_time: " << used_read_time |
386 | 36 | << ", status: " << status; |
// Test-only delay injection.
387 | 692k | if (FLAGS_TEST_transaction_inject_flushed_delay_ms > 0) { |
388 | 0 | std::this_thread::sleep_for(FLAGS_TEST_transaction_inject_flushed_delay_ms * 1ms); |
389 | 0 | } |
390 | | 
391 | 692k | boost::optional<Status> notify_commit_status; |
392 | 692k | bool abort = false; |
393 | | 
394 | 692k | CommitCallback commit_callback; |
395 | 692k | { |
396 | 692k | std::lock_guard<std::mutex> lock(mutex_); |
397 | 692k | running_requests_ -= ops.size(); |
398 | | 
399 | 692k | if (status.ok()) { |
400 | 602k | if (used_read_time && metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION) { |
401 | 54.8k | const bool read_point_already_set = static_cast<bool>(read_point_.GetReadTime()); |
402 | 54.8k | #ifndef NDEBUG |
403 | 54.8k | if (read_point_already_set) { |
404 | | // Display details of operations before crashing in debug mode. |
405 | 0 | int op_idx = 1; |
406 | 0 | for (const auto& op : ops) { |
407 | 0 | LOG(ERROR) << "Operation " << op_idx << ": " << op.ToString(); |
408 | 0 | op_idx++; |
409 | 0 | } |
410 | 0 | } |
411 | 54.8k | #endif |
412 | 0 | LOG_IF_WITH_PREFIX(DFATAL, read_point_already_set) |
413 | 0 | << "Read time already picked (" << read_point_.GetReadTime() |
414 | 0 | << "), but server replied with used read time: " << used_read_time; |
415 | 54.8k | read_point_.SetReadTime(used_read_time, ConsistentReadPoint::HybridTimeMap()); |
416 | 54.8k | } |
// Consecutive ops for the same tablet are collapsed via prev_tablet_id so the tablets_ map is
// touched once per run of ops rather than once per op.
417 | 602k | const std::string* prev_tablet_id = nullptr; |
418 | 3.20M | for (const auto& op : ops) { |
419 | 3.20M | if (op.yb_op->applied() && op.yb_op->should_add_intents(metadata_.isolation)) { |
420 | 2.97M | const std::string& tablet_id = op.tablet->tablet_id(); |
421 | 2.97M | if (prev_tablet_id == nullptr || tablet_id != *prev_tablet_id) { |
422 | 374k | prev_tablet_id = &tablet_id; |
423 | 374k | tablets_[tablet_id].has_metadata = true; |
424 | 374k | } |
425 | 2.97M | } |
426 | 3.20M | } |
427 | 89.6k | } else { |
428 | 89.6k | const TransactionError txn_err(status); |
429 | | // We don't abort the txn in case of a kSkipLocking error to make further progress. |
430 | | // READ COMMITTED isolation retries errors of kConflict and kReadRestart by restarting |
431 | | // statements instead of the whole txn and hence should avoid aborting the txn in this case |
432 | | // too. |
433 | 89.6k | bool avoid_abort = |
434 | 89.6k | (txn_err.value() == TransactionErrorCode::kSkipLocking) || |
435 | 89.8k | (metadata_.isolation == IsolationLevel::READ_COMMITTED && |
436 | 23.5k | (txn_err.value() == TransactionErrorCode::kReadRestartRequired || |
437 | 23.5k | txn_err.value() == TransactionErrorCode::kConflict)); |
438 | 89.6k | if (!avoid_abort) { |
439 | 66.3k | auto state = state_.load(std::memory_order_acquire); |
440 | 18.4E | VLOG_WITH_PREFIX(4) << "Abort desired, state: " << AsString(state); |
441 | 66.3k | if (state == TransactionState::kRunning) { |
442 | 45.1k | abort = true; |
443 | | // State will be changed to aborted in SetErrorUnlocked |
444 | 45.1k | } |
445 | | 
446 | 66.3k | SetErrorUnlocked(status, "Flush"); |
447 | 66.3k | } |
448 | 89.6k | } |
449 | | 
450 | 692k | if (running_requests_ == 0 && commit_replicated_) { |
451 | 0 | notify_commit_status = status_; |
452 | 0 | commit_callback = std::move(commit_callback_); |
453 | 0 | } |
454 | 692k | } |
455 | | 
// Callbacks and the abort RPC are issued only after mutex_ has been released.
456 | 692k | if (notify_commit_status) { |
457 | 0 | VLOG_WITH_PREFIX(4) << "Sealing done: " << *notify_commit_status; |
458 | 0 | commit_callback(*notify_commit_status); |
459 | 0 | } |
460 | | 
461 | 692k | if (abort && !child_) { |
462 | 45.0k | DoAbort(TransactionRpcDeadline(), transaction_->shared_from_this()); |
463 | 45.0k | } |
464 | 692k | } |
465 | | |
// Starts committing (or, with `seal_only`, sealing) the transaction; `callback` receives the
// outcome.
//  - If CheckCouldCommitUnlocked fails, `callback` is invoked with that status right away.
//  - If the transaction is not ready and has neither involved tablets nor running requests, it is
//    reported as committed with Status::OK() without any further work.
//  - If it is not ready otherwise, DoCommit is queued as a waiter and RequestStatusTablet is
//    called.
//  - Otherwise DoCommit runs immediately.
// The state is switched to kSealed/kCommitted before any of the last three paths is taken, and
// `callback` is never invoked while mutex_ is held.
466 | | void Commit(CoarseTimePoint deadline, SealOnly seal_only, CommitCallback callback) |
467 | 157k | EXCLUDES(mutex_) { |
// Shared pointer to the owning YBTransaction, handed on to DoCommit.
468 | 157k | auto transaction = transaction_->shared_from_this(); |
469 | 157k | TRACE_TO(trace_, __func__); |
470 | 157k | { |
471 | 157k | UNIQUE_LOCK(lock, mutex_); |
472 | 157k | auto status = CheckCouldCommitUnlocked(seal_only); |
473 | 157k | if (!status.ok()) { |
474 | 7 | lock.unlock(); |
475 | 7 | callback(status); |
476 | 7 | return; |
477 | 7 | } |
478 | 157k | state_.store(seal_only ? TransactionState::kSealed : TransactionState::kCommitted, |
479 | 157k | std::memory_order_release); |
480 | 157k | commit_callback_ = std::move(callback); |
481 | 157k | if (!ready_) { |
482 | | // If we have not written any intents and do not even have a transaction status tablet, |
483 | | // just report the transaction as committed. |
484 | | // |
485 | | // See https://github.com/yugabyte/yugabyte-db/issues/3105 for details -- we might be able |
486 | | // to remove this special case if it turns out there is a bug elsewhere. |
487 | 5 | if (tablets_.empty() && running_requests_ == 0) { |
488 | 0 | VLOG_WITH_PREFIX(4) << "Committed empty transaction"; |
489 | 5 | auto commit_callback = std::move(commit_callback_); |
490 | 5 | lock.unlock(); |
491 | 5 | commit_callback(Status::OK()); |
492 | 5 | return; |
493 | 5 | } |
494 | | 
495 | 0 | waiters_.emplace_back(std::bind( |
496 | 0 | &Impl::DoCommit, this, deadline, seal_only, _1, transaction)); |
497 | 0 | lock.unlock(); |
498 | 0 | RequestStatusTablet(deadline); |
499 | 0 | return; |
500 | 0 | } |
501 | 157k | } |
502 | 157k | DoCommit(deadline, seal_only, Status::OK(), transaction); |
503 | 157k | } |
504 | | |
// Aborts the transaction.
//  - Does nothing if the transaction is already aborted; aborting a transaction in any other
//    non-running state is reported as DFATAL.
//  - Aborting a child transaction is reported as DFATAL and otherwise ignored.
//  - If the transaction is not ready, all queued waiters are invoked with an Aborted status
//    (outside the lock) and no abort RPC is sent.
//  - Otherwise the state becomes kAborted and DoAbort is called outside the lock.
505 | 82.9k | void Abort(CoarseTimePoint deadline) EXCLUDES(mutex_) { |
506 | 82.9k | auto transaction = transaction_->shared_from_this(); |
507 | | 
508 | 18.4E | VLOG_WITH_PREFIX(2) << "Abort"; |
509 | 82.9k | TRACE_TO(trace_, __func__); |
510 | 82.9k | { |
511 | 82.9k | UNIQUE_LOCK(lock, mutex_); |
512 | 82.9k | auto state = state_.load(std::memory_order_acquire); |
513 | 82.9k | if (state != TransactionState::kRunning) { |
514 | 78.9k | if (state != TransactionState::kAborted) { |
515 | 0 | LOG_WITH_PREFIX(DFATAL) |
516 | 0 | << "Abort of committed transaction: " << AsString(state); |
517 | 78.9k | } else { |
518 | 18.4E | VLOG_WITH_PREFIX(2) << "Already aborted"; |
519 | 78.9k | } |
520 | 78.9k | return; |
521 | 78.9k | } |
522 | 3.98k | if (child_) { |
523 | 0 | LOG_WITH_PREFIX(DFATAL) << "Abort of child transaction"; |
524 | 0 | return; |
525 | 0 | } |
526 | 3.98k | state_.store(TransactionState::kAborted, std::memory_order_release); |
527 | 3.98k | if (!ready_) { |
// Waiters are swapped out under the lock and notified after it has been released.
528 | 92 | std::vector<Waiter> waiters; |
529 | 92 | waiters_.swap(waiters); |
530 | 92 | lock.unlock(); |
531 | 92 | const auto aborted_status = STATUS(Aborted, "Transaction aborted"); |
532 | 40 | for(const auto& waiter : waiters) { |
533 | 40 | waiter(aborted_status); |
534 | 40 | } |
535 | 0 | VLOG_WITH_PREFIX(2) << "Aborted transaction not yet ready"; |
536 | 92 | return; |
537 | 92 | } |
538 | 3.88k | } |
539 | 3.88k | DoAbort(deadline, transaction); |
540 | 3.88k | } |
541 | | |
// Forwards to the read point: true when it reports that a restart is required.
542 | 298k | bool IsRestartRequired() const { |
543 | 298k | return read_point_.IsRestartRequired(); |
544 | 298k | } |
545 | | |
// Returns a shared future that yields the transaction metadata.
// The future is created on the first call and reused afterwards. If the transaction is ready the
// promise is fulfilled immediately; otherwise a waiter is queued that fulfils it with the
// metadata, or with the failure status it is given, and RequestStatusTablet is called.
546 | 5.82k | std::shared_future<Result<TransactionMetadata>> GetMetadata() EXCLUDES(mutex_) { |
547 | 5.82k | UNIQUE_LOCK(lock, mutex_); |
548 | 5.82k | if (metadata_future_.valid()) { |
549 | 0 | return metadata_future_; |
550 | 0 | } |
551 | 5.82k | metadata_future_ = std::shared_future<Result<TransactionMetadata>>( |
552 | 5.82k | metadata_promise_.get_future()); |
553 | 5.82k | if (!ready_) { |
// The waiter captures a shared pointer to the transaction in addition to `this`.
554 | 424 | auto transaction = transaction_->shared_from_this(); |
555 | 424 | waiters_.push_back([this, transaction](const Status& status) { |
556 | 424 | WARN_NOT_OK(status, "Transaction request failed"); |
557 | 424 | if (status.ok()) { |
558 | 424 | metadata_promise_.set_value(metadata_); |
559 | 0 | } else { |
560 | 0 | metadata_promise_.set_value(status); |
561 | 0 | } |
562 | 424 | }); |
// RequestStatusTablet is called without the lock; the lock is re-taken before reading
// metadata_future_ for the return value.
563 | 424 | lock.unlock(); |
564 | 424 | RequestStatusTablet(TransactionRpcDeadline()); |
565 | 424 | lock.lock(); |
566 | 424 | return metadata_future_; |
567 | 424 | } |
568 | | 
569 | 5.40k | metadata_promise_.set_value(metadata_); |
570 | 5.40k | return metadata_future_; |
571 | 5.40k | } |
572 | | |
// Produces the data needed to start a child transaction and passes it to `callback`.
// `callback` receives an error status when the transaction is not running or when its read point
// requires a restart. If the transaction is not ready, DoPrepareChild is queued as a waiter and
// RequestStatusTablet is called; otherwise the child data is built under the lock and delivered
// after the lock has been released.
573 | | void PrepareChild( |
574 | | ForceConsistentRead force_consistent_read, CoarseTimePoint deadline, |
575 | 82.5k | PrepareChildCallback callback) { |
576 | 82.5k | auto transaction = transaction_->shared_from_this(); |
577 | 82.5k | TRACE_TO(trace_, __func__); |
578 | 82.5k | UNIQUE_LOCK(lock, mutex_); |
579 | 82.5k | auto status = CheckRunningUnlocked(); |
580 | 82.5k | if (!status.ok()) { |
581 | 0 | lock.unlock(); |
582 | 0 | callback(status); |
583 | 0 | return; |
584 | 0 | } |
585 | 82.5k | if (IsRestartRequired()) { |
586 | 0 | lock.unlock(); |
587 | 0 | callback(STATUS(IllegalState, "Restart required")); |
588 | 0 | return; |
589 | 0 | } |
590 | | 
// Picks the current read time first when requested (see SetReadTimeIfNeeded for the conditions).
591 | 82.5k | SetReadTimeIfNeeded(force_consistent_read); |
592 | | 
593 | 82.5k | if (!ready_) { |
594 | 3.49k | waiters_.emplace_back(std::bind( |
595 | 3.49k | &Impl::DoPrepareChild, this, _1, transaction, std::move(callback))); |
596 | 3.49k | lock.unlock(); |
597 | 3.49k | RequestStatusTablet(deadline); |
598 | 3.49k | return; |
599 | 3.49k | } |
600 | | 
601 | 79.0k | ChildTransactionDataPB child_txn_data_pb = PrepareChildTransactionDataUnlocked(transaction); |
602 | 79.0k | lock.unlock(); |
603 | 79.0k | callback(child_txn_data_pb); |
604 | 79.0k | } |
605 | | |
// Finishes a child transaction and returns the result to hand back to the parent.
// Fails if the transaction is not running or is not a child. On success the state becomes
// kCommitted and the result lists every involved tablet with its batch count and whether its
// metadata exists, plus the read point information added by FinishChildTransactionResult.
606 | 14.5k | Result<ChildTransactionResultPB> FinishChild() { |
607 | 14.5k | TRACE_TO(trace_, __func__); |
608 | 14.5k | UNIQUE_LOCK(lock, mutex_); |
609 | 14.5k | RETURN_NOT_OK(CheckRunningUnlocked()); |
610 | 14.5k | if (!child_) { |
611 | 0 | return STATUS(IllegalState, "Finish child of non child transaction"); |
612 | 0 | } |
613 | 14.5k | state_.store(TransactionState::kCommitted, std::memory_order_release); |
614 | 14.5k | ChildTransactionResultPB result; |
615 | 14.5k | auto& tablets = *result.mutable_tablets(); |
616 | 14.5k | tablets.Reserve(narrow_cast<int>(tablets_.size())); |
617 | 35.1k | for (const auto& tablet : tablets_) { |
618 | 35.1k | auto& out = *tablets.Add(); |
619 | 35.1k | out.set_tablet_id(tablet.first); |
620 | 35.1k | out.set_num_batches(tablet.second.num_batches); |
621 | 35.1k | out.set_metadata_state( |
622 | 34.8k | tablet.second.has_metadata ? InvolvedTabletMetadataState::EXIST |
623 | 332 | : InvolvedTabletMetadataState::MISSING); |
624 | 35.1k | } |
625 | 14.5k | read_point_.FinishChildTransactionResult(HadReadTime(child_had_read_time_), &result); |
626 | 14.5k | return result; |
627 | 14.5k | } |
628 | | |
// Merges the result of a finished child transaction into this (parent) transaction.
// Fails if this transaction is not running or is itself a child. If it has already been aborted,
// the tablets named in `result` are collected and handed to CleanupTransaction when the function
// exits. On success, batch counts are added per tablet, metadata presence is OR-ed in, and the
// child's read point result is applied.
629 | 14.1k | Status ApplyChildResult(const ChildTransactionResultPB& result) EXCLUDES(mutex_) { |
630 | 14.1k | TRACE_TO(trace_, __func__); |
631 | 14.1k | std::vector<std::string> cleanup_tablet_ids; |
// Declared before the lock below.
// NOTE(review): presumably this makes the cleanup run after mutex_ is released - this depends on
// UNIQUE_LOCK declaring an ordinary scoped lock object; confirm.
632 | 14.1k | auto se = ScopeExit([this, &cleanup_tablet_ids] { |
633 | 14.1k | if (cleanup_tablet_ids.empty()) { |
634 | 14.1k | return; |
635 | 14.1k | } |
636 | 0 | CleanupTransaction( |
637 | 0 | manager_->client(), manager_->clock(), metadata_.transaction_id, Sealed::kFalse, |
638 | 0 | CleanupType::kImmediate, cleanup_tablet_ids); |
639 | 0 | }); |
640 | 14.1k | UNIQUE_LOCK(lock, mutex_); |
641 | 14.1k | if (state_.load(std::memory_order_acquire) == TransactionState::kAborted) { |
642 | 0 | cleanup_tablet_ids.reserve(result.tablets().size()); |
643 | 0 | for (const auto& tablet : result.tablets()) { |
644 | 0 | cleanup_tablet_ids.push_back(tablet.tablet_id()); |
645 | 0 | } |
646 | 0 | } |
647 | | 
// In the aborted case this returns the error, after which the scope-exit cleanup above runs.
648 | 14.1k | RETURN_NOT_OK(CheckRunningUnlocked()); |
649 | 14.1k | if (child_) { |
650 | 0 | return STATUS(IllegalState, "Apply child result of child transaction"); |
651 | 0 | } |
652 | | 
653 | 34.8k | for (const auto& tablet : result.tablets()) { |
654 | 34.8k | auto& tablet_state = tablets_[tablet.tablet_id()]; |
655 | 34.8k | tablet_state.num_batches += tablet.num_batches(); |
656 | 34.8k | tablet_state.has_metadata = |
657 | 34.8k | tablet_state.has_metadata || |
658 | 34.6k | tablet.metadata_state() == InvolvedTabletMetadataState::EXIST; |
659 | 34.8k | } |
660 | 14.1k | read_point_.ApplyChildTransactionResult(result); |
661 | | 
662 | 14.1k | return Status::OK(); |
663 | 14.1k | } |
664 | | |
// Returns the log prefix built in CompleteConstruction() (transaction id plus a child marker).
// NOTE(review): presumably consumed by the *_WITH_PREFIX logging macros - confirm.
665 | 2.42k | const std::string& LogPrefix() { |
666 | 2.42k | return log_prefix_; |
667 | 2.42k | } |
668 | | |
// Renders the metadata and current state for logging. Takes mutex_, so it must not be called
// while this object's mutex_ is already held.
669 | 0 | std::string ToString() EXCLUDES(mutex_) { |
670 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
671 | 0 | return Format("{ metadata: $0 state: $1 }", metadata_, state_.load(std::memory_order_acquire)); |
672 | 0 | } |
673 | | |
// Returns the transaction id stored in the metadata.
674 | 15 | const TransactionId& id() const { |
675 | 15 | return metadata_.transaction_id; |
676 | 15 | } |
677 | | |
// Gives mutable access to the transaction's read point. No locking is performed here.
678 | 1.07M | ConsistentReadPoint& read_point() { |
679 | 1.07M | return read_point_; |
680 | 1.07M | } |
681 | | |
// Marks the transaction as kReleased and returns its metadata.
// Fails with IllegalState if the transaction is not running. If the transaction is not ready, this
// call blocks on a latch until the waiter queued here is invoked, and returns the status passed
// to that waiter if it is an error.
// NOTE(review): the state is switched to kReleased before waiting, so it stays kReleased even
// when the wait ends with an error - confirm this is intended.
682 | 0 | Result<TransactionMetadata> Release() { |
683 | 0 | UNIQUE_LOCK(lock, mutex_); |
684 | 0 | auto state = state_.load(std::memory_order_acquire); |
685 | 0 | if (state != TransactionState::kRunning) { |
686 | 0 | return STATUS_FORMAT(IllegalState, "Attempt to release transaction in the wrong state $0: $1", |
687 | 0 | metadata_.transaction_id, AsString(state)); |
688 | 0 | } |
689 | 0 | state_.store(TransactionState::kReleased, std::memory_order_release); |
690 |

691 | 0 | if (!ready_) { |
// latch and pick_status are captured by reference; this is safe only because this function
// waits on the latch before they go out of scope.
692 | 0 | CountDownLatch latch(1); |
693 | 0 | Status pick_status; |
694 | 0 | auto transaction = transaction_->shared_from_this(); |
695 | 0 | waiters_.push_back([&latch, &pick_status](const Status& status) { |
696 | 0 | pick_status = status; |
697 | 0 | latch.CountDown(); |
698 | 0 | }); |
699 | 0 | lock.unlock(); |
700 | 0 | RequestStatusTablet(TransactionRpcDeadline()); |
701 | 0 | latch.Wait(); |
702 | 0 | RETURN_NOT_OK(pick_status); |
703 | 0 | lock.lock(); |
704 | 0 | } |
705 | 0 | return metadata_; |
706 | 0 | } |
707 | | |
// Calls RequestStatusTablet with the default transaction RPC deadline.
// NOTE(review): presumably heartbeating begins once the status tablet is known - that logic is
// outside the visible part of the file; confirm.
708 | 0 | void StartHeartbeat() { |
709 | 0 | VLOG_WITH_PREFIX(2) << __PRETTY_FUNCTION__; |
710 | 0 | RequestStatusTablet(TransactionRpcDeadline()); |
711 | 0 | } |
712 | | |
// Forwards to YBSubTransaction::SetActiveSubTransaction for this transaction's subtransaction
// state. Does not take mutex_.
713 | 48.8k | void SetActiveSubTransaction(SubTransactionId id) { |
714 | 48.8k | return subtransaction_.SetActiveSubTransaction(id); |
715 | 48.8k | } |
716 | | |
// Rolls back to subtransaction `id`.
// Returns InternalError when subtransaction_.active() is false; otherwise returns the result of
// YBSubTransaction::RollbackSubTransaction.
717 | 23.5k | CHECKED_STATUS RollbackSubTransaction(SubTransactionId id) { |
718 | 23.5k | SCHECK( |
719 | 23.5k | subtransaction_.active(), InternalError, |
720 | 23.5k | "Attempted to rollback to savepoint before creating any savepoints."); |
721 | 23.5k | return subtransaction_.RollbackSubTransaction(id); |
722 | 23.5k | } |
723 | | |
// True when the subtransaction state reports itself as active.
724 | 0 | bool HasSubTransactionState() { |
725 | 0 | return subtransaction_.active(); |
726 | 0 | } |
727 | | |
728 | | private: |
729 | 257k | void CompleteConstruction() { |
730 | 243k | log_prefix_ = Format("$0$1: ", metadata_.transaction_id, child_ ? " (CHILD)" : ""); |
731 | 257k | heartbeat_handle_ = manager_->rpcs().InvalidHandle(); |
732 | 257k | commit_handle_ = manager_->rpcs().InvalidHandle(); |
733 | 257k | abort_handle_ = manager_->rpcs().InvalidHandle(); |
734 | 257k | } |
735 | | |
736 | 239k | void CompleteInit(IsolationLevel isolation) { |
737 | 239k | metadata_.isolation = isolation; |
738 | | // TODO(Piyush): read_point_ might not represent the correct start time for |
739 | | // a READ COMMITTED txn since it might have been updated several times |
740 | | // before a YBTransaction is created. Fix this. |
741 | 239k | if (read_point_.GetReadTime()) { |
742 | 628 | metadata_.start_time = read_point_.GetReadTime().read; |
743 | 238k | } else { |
744 | 238k | metadata_.start_time = read_point_.Now(); |
745 | 238k | } |
746 | 239k | } |
747 | | |
748 | 536k | void SetReadTimeIfNeeded(bool do_it) { |
749 | 536k | if (!read_point_.GetReadTime() && do_it && |
750 | 141k | (metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION || |
751 | 75.4k | metadata_.isolation == IsolationLevel::READ_COMMITTED)) { |
752 | 66.2k | read_point_.SetCurrentReadTime(); |
753 | 66.2k | } |
754 | 536k | } |
755 | | |
756 | 267k | CHECKED_STATUS CheckRunningUnlocked() REQUIRES(mutex_) { |
757 | 267k | if (state_.load(std::memory_order_acquire) != TransactionState::kRunning) { |
758 | 7 | auto status = status_; |
759 | 7 | if (status.ok()) { |
760 | 0 | status = STATUS(IllegalState, "Transaction already completed"); |
761 | 0 | } |
762 | 7 | return status; |
763 | 7 | } |
764 | 267k | return Status::OK(); |
765 | 267k | } |
766 | | |
767 | | void DoCommit( |
768 | | CoarseTimePoint deadline, SealOnly seal_only, const Status& status, |
769 | 157k | const YBTransactionPtr& transaction) EXCLUDES(mutex_) { |
770 | 18.4E | VLOG_WITH_PREFIX(1) |
771 | 18.4E | << Format("Commit, seal_only: $0, tablets: $1, status: $2", |
772 | 18.4E | seal_only, tablets_, status); |
773 | | |
774 | 157k | UNIQUE_LOCK(lock, mutex_); |
775 | 157k | if (!status.ok()) { |
776 | 0 | VLOG_WITH_PREFIX(4) << "Commit failed: " << status; |
777 | 0 | auto commit_callback = std::move(commit_callback_); |
778 | 0 | lock.unlock(); |
779 | 0 | commit_callback(status); |
780 | 0 | return; |
781 | 0 | } |
782 | | |
783 | 157k | tserver::UpdateTransactionRequestPB req; |
784 | 157k | req.set_tablet_id(status_tablet_->tablet_id()); |
785 | 157k | req.set_propagated_hybrid_time(manager_->Now().ToUint64()); |
786 | 157k | auto& state = *req.mutable_state(); |
787 | 157k | state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size()); |
788 | | // TODO(savepoints) -- Attach metadata about aborted subtransactions to commit message. |
789 | 157k | state.set_status(seal_only ? TransactionStatus::SEALED : TransactionStatus::COMMITTED); |
790 | 157k | state.mutable_tablets()->Reserve(narrow_cast<int>(tablets_.size())); |
791 | 306k | for (const auto& tablet : tablets_) { |
792 | | // If tablet does not have metadata it should not participate in commit. |
793 | 307k | if (!seal_only && !tablet.second.has_metadata) { |
794 | 51 | continue; |
795 | 51 | } |
796 | 306k | state.add_tablets(tablet.first); |
797 | 306k | if (seal_only) { |
798 | 0 | state.add_tablet_batches(tablet.second.num_batches); |
799 | 0 | } |
800 | 306k | } |
801 | | |
802 | | // If we don't have any tablets that have intents written to them, just abort it. |
803 | | // But notify caller that commit was successful, so it is transparent for him. |
804 | 157k | if (state.tablets().empty()) { |
805 | 0 | VLOG_WITH_PREFIX(4) << "Committed empty"; |
806 | 216 | auto status_tablet = status_tablet_; |
807 | 216 | auto commit_callback = std::move(commit_callback_); |
808 | 216 | lock.unlock(); |
809 | 216 | DoAbort(deadline, transaction, status_tablet); |
810 | 216 | commit_callback(Status::OK()); |
811 | 216 | return; |
812 | 216 | } |
813 | | |
814 | 156k | if (subtransaction_.active()) { |
815 | 154 | subtransaction_.get().aborted.ToPB(state.mutable_aborted()->mutable_set()); |
816 | 154 | } |
817 | | |
818 | 156k | manager_->rpcs().RegisterAndStart( |
819 | 156k | UpdateTransaction( |
820 | 156k | deadline, |
821 | 156k | status_tablet_.get(), |
822 | 156k | manager_->client(), |
823 | 156k | &req, |
824 | 156k | [this, transaction](const auto& status, const auto& req, const auto& resp) { |
825 | 156k | this->CommitDone(status, resp, transaction); |
826 | 156k | }), |
827 | 156k | &commit_handle_); |
828 | 156k | } |
829 | | |
830 | 82.7k | void DoAbort(CoarseTimePoint deadline, const YBTransactionPtr& transaction) EXCLUDES(mutex_) { |
831 | 82.7k | decltype(status_tablet_) status_tablet; |
832 | 82.7k | { |
833 | 82.7k | std::lock_guard<std::mutex> lock(mutex_); |
834 | 82.7k | status_tablet = status_tablet_; |
835 | 82.7k | } |
836 | 82.7k | DoAbort(deadline, transaction, status_tablet); |
837 | 82.7k | } |
838 | | |
839 | | void DoAbort( |
840 | | CoarseTimePoint deadline, |
841 | | const YBTransactionPtr& transaction, |
842 | 83.0k | internal::RemoteTabletPtr status_tablet) EXCLUDES(mutex_) { |
843 | 83.0k | tserver::AbortTransactionRequestPB req; |
844 | 83.0k | req.set_tablet_id(status_tablet->tablet_id()); |
845 | 83.0k | req.set_propagated_hybrid_time(manager_->Now().ToUint64()); |
846 | 83.0k | req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size()); |
847 | | |
848 | 83.0k | manager_->rpcs().RegisterAndStart( |
849 | 83.0k | AbortTransaction( |
850 | 83.0k | deadline, |
851 | 83.0k | status_tablet.get(), |
852 | 83.0k | manager_->client(), |
853 | 83.0k | &req, |
854 | 83.0k | std::bind(&Impl::AbortDone, this, _1, _2, transaction)), |
855 | 83.0k | &abort_handle_); |
856 | | |
857 | 83.0k | DoAbortCleanup(transaction, CleanupType::kImmediate); |
858 | 83.0k | } |
859 | | |
860 | | void DoAbortCleanup(const YBTransactionPtr& transaction, CleanupType cleanup_type) |
861 | 135k | EXCLUDES(mutex_) { |
862 | 135k | if (FLAGS_TEST_disable_proactive_txn_cleanup_on_abort) { |
863 | 0 | VLOG_WITH_PREFIX(1) << "TEST: Disabled proactive transaction cleanup on abort"; |
864 | 0 | return; |
865 | 0 | } |
866 | | |
867 | 135k | std::vector<std::string> tablet_ids; |
868 | 135k | { |
869 | 135k | std::lock_guard<std::mutex> lock(mutex_); |
870 | 135k | tablet_ids.reserve(tablets_.size()); |
871 | 182k | for (const auto& tablet : tablets_) { |
872 | | // We don't check has_metadata here, because intents could be written even in case of |
873 | | // failure. For instance in case of conflict on unique index. |
874 | 182k | tablet_ids.push_back(tablet.first); |
875 | 182k | } |
876 | 80 | VLOG_WITH_PREFIX(1) << "Cleaning up intents from: " << AsString(tablet_ids); |
877 | 135k | } |
878 | | |
879 | 135k | CleanupTransaction( |
880 | 135k | manager_->client(), manager_->clock(), metadata_.transaction_id, Sealed::kFalse, |
881 | 135k | cleanup_type, tablet_ids); |
882 | 135k | } |
883 | | |
884 | | void CommitDone(const Status& status, |
885 | | const tserver::UpdateTransactionResponsePB& response, |
886 | 156k | const YBTransactionPtr& transaction) { |
887 | 156k | TRACE_TO(trace_, __func__); |
888 | 0 | VLOG_WITH_PREFIX(1) << "Committed: " << status; |
889 | | |
890 | 156k | UpdateClock(response, manager_); |
891 | 156k | manager_->rpcs().Unregister(&commit_handle_); |
892 | | |
893 | 156k | Status actual_status = status.IsAlreadyPresent() ? Status::OK() : status; |
894 | 156k | CommitCallback commit_callback; |
895 | 156k | if (state_.load(std::memory_order_acquire) != TransactionState::kCommitted && |
896 | 0 | actual_status.ok()) { |
897 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
898 | 0 | commit_replicated_ = true; |
899 | 0 | if (running_requests_ != 0) { |
900 | 0 | return; |
901 | 0 | } |
902 | 0 | commit_callback = std::move(commit_callback_); |
903 | 156k | } else { |
904 | 156k | std::lock_guard<std::mutex> lock(mutex_); |
905 | 156k | commit_callback = std::move(commit_callback_); |
906 | 156k | } |
907 | 18.4E | VLOG_WITH_PREFIX(4) << "Commit done: " << actual_status; |
908 | 156k | commit_callback(actual_status); |
909 | | |
910 | 156k | if (actual_status.IsExpired()) { |
911 | | // We can't perform immediate cleanup here because the transaction could be committed, |
912 | | // its APPLY records replicated in all participant tablets, and its status record removed |
913 | | // from the status tablet. |
914 | 52.4k | DoAbortCleanup(transaction, CleanupType::kGraceful); |
915 | 52.4k | } |
916 | 156k | } |
917 | | |
918 | | void AbortDone(const Status& status, |
919 | | const tserver::AbortTransactionResponsePB& response, |
920 | 81.9k | const YBTransactionPtr& transaction) { |
921 | 81.9k | TRACE_TO(trace_, __func__); |
922 | 18.4E | VLOG_WITH_PREFIX(1) << "Aborted: " << status; |
923 | | |
924 | 81.9k | if (response.has_propagated_hybrid_time()) { |
925 | 81.9k | manager_->UpdateClock(HybridTime(response.propagated_hybrid_time())); |
926 | 81.9k | } |
927 | 81.9k | manager_->rpcs().Unregister(&abort_handle_); |
928 | 81.9k | } |
929 | | |
930 | 243k | void RequestStatusTablet(const CoarseTimePoint& deadline) EXCLUDES(mutex_) { |
931 | 243k | TRACE_TO(trace_, __func__); |
932 | 243k | bool expected = false; |
933 | 243k | if (!requested_status_tablet_.compare_exchange_strong( |
934 | 0 | expected, true, std::memory_order_acq_rel)) { |
935 | 0 | return; |
936 | 0 | } |
937 | 18.4E | VLOG_WITH_PREFIX(2) << "RequestStatusTablet()"; |
938 | 243k | auto transaction = transaction_->shared_from_this(); |
939 | 243k | if (metadata_.status_tablet.empty()) { |
940 | 243k | manager_->PickStatusTablet( |
941 | 243k | std::bind(&Impl::StatusTabletPicked, this, _1, deadline, transaction), |
942 | 243k | metadata_.locality); |
943 | 18.4E | } else { |
944 | 18.4E | LookupStatusTablet(metadata_.status_tablet, deadline, transaction); |
945 | 18.4E | } |
946 | 243k | } |
947 | | |
948 | | void StatusTabletPicked(const Result<std::string>& tablet, |
949 | | const CoarseTimePoint& deadline, |
950 | 243k | const YBTransactionPtr& transaction) { |
951 | 243k | TRACE_TO(trace_, __func__); |
952 | 18.4E | VLOG_WITH_PREFIX(2) << "Picked status tablet: " << tablet; |
953 | | |
954 | 243k | if (!tablet.ok()) { |
955 | 0 | NotifyWaiters(tablet.status(), "Pick status tablet"); |
956 | 0 | return; |
957 | 0 | } |
958 | | |
959 | 243k | LookupStatusTablet(*tablet, deadline, transaction); |
960 | 243k | } |
961 | | |
962 | | void LookupStatusTablet(const std::string& tablet_id, |
963 | | const CoarseTimePoint& deadline, |
964 | 243k | const YBTransactionPtr& transaction) { |
965 | 243k | TRACE_TO(trace_, __func__); |
966 | 243k | manager_->client()->LookupTabletById( |
967 | 243k | tablet_id, |
968 | 243k | /* table =*/ nullptr, |
969 | 243k | master::IncludeInactive::kFalse, |
970 | 243k | deadline, |
971 | 243k | std::bind(&Impl::LookupTabletDone, this, _1, transaction), |
972 | 243k | client::UseCache::kTrue); |
973 | 243k | } |
974 | | |
975 | | void LookupTabletDone(const Result<client::internal::RemoteTabletPtr>& result, |
976 | 243k | const YBTransactionPtr& transaction) { |
977 | 243k | TRACE_TO(trace_, __func__); |
978 | 18.4E | VLOG_WITH_PREFIX(1) << "Lookup tablet done: " << yb::ToString(result); |
979 | | |
980 | 243k | if (!result.ok()) { |
981 | 0 | NotifyWaiters(result.status(), "Lookup tablet"); |
982 | 0 | return; |
983 | 0 | } |
984 | | |
985 | 243k | bool precreated; |
986 | 243k | std::vector<Waiter> waiters; |
987 | 243k | { |
988 | 243k | std::lock_guard<std::mutex> lock(mutex_); |
989 | 243k | status_tablet_ = std::move(*result); |
990 | 243k | if (metadata_.status_tablet.empty()) { |
991 | 243k | metadata_.status_tablet = status_tablet_->tablet_id(); |
992 | 243k | precreated = false; |
993 | 18.4E | } else { |
994 | 18.4E | precreated = true; |
995 | 18.4E | ready_ = true; |
996 | 18.4E | waiters_.swap(waiters); |
997 | 18.4E | } |
998 | 243k | } |
999 | 243k | if (precreated) { |
1000 | 0 | for (const auto& waiter : waiters) { |
1001 | 0 | waiter(Status::OK()); |
1002 | 0 | } |
1003 | 0 | } |
1004 | 243k | SendHeartbeat(precreated ? TransactionStatus::PENDING : TransactionStatus::CREATED, |
1005 | 243k | metadata_.transaction_id, transaction_->shared_from_this()); |
1006 | 243k | } |
1007 | | |
1008 | 242k | void NotifyWaiters(const Status& status, const char* operation) { |
1009 | 242k | std::vector<Waiter> waiters; |
1010 | 242k | { |
1011 | 242k | std::lock_guard<std::mutex> lock(mutex_); |
1012 | 242k | if (status.ok()) { |
1013 | 242k | DCHECK(!ready_); |
1014 | 242k | ready_ = true; |
1015 | 3 | } else { |
1016 | 3 | SetErrorUnlocked(status, operation); |
1017 | 3 | } |
1018 | 242k | waiters_.swap(waiters); |
1019 | 242k | } |
1020 | 242k | for (const auto& waiter : waiters) { |
1021 | 242k | waiter(status); |
1022 | 242k | } |
1023 | 242k | } |
1024 | | |
1025 | | void SendHeartbeat(TransactionStatus status, |
1026 | | const TransactionId& id, |
1027 | 520k | const std::weak_ptr<YBTransaction>& weak_transaction) { |
1028 | 520k | auto transaction = weak_transaction.lock(); |
1029 | 520k | if (!transaction) { |
1030 | | // Cannot use LOG_WITH_PREFIX here, since this was actually destroyed. |
1031 | 105 | VLOG(1) << id << ": Transaction destroyed"; |
1032 | 218k | return; |
1033 | 218k | } |
1034 | | |
1035 | 302k | auto current_state = state_.load(std::memory_order_acquire); |
1036 | | |
1037 | 302k | if (!AllowHeartbeat(current_state, status)) { |
1038 | 10 | VLOG_WITH_PREFIX(1) << " Send heartbeat cancelled: " << yb::ToString(transaction); |
1039 | 19.6k | return; |
1040 | 19.6k | } |
1041 | | |
1042 | 18.4E | VLOG_WITH_PREFIX(4) << __func__ << "(" << TransactionStatus_Name(status) << ")"; |
1043 | | |
1044 | 283k | MonoDelta timeout; |
1045 | 283k | if (status != TransactionStatus::CREATED) { |
1046 | 40.1k | if (GetAtomicFlag(&FLAGS_transaction_disable_heartbeat_in_tests)) { |
1047 | 0 | HeartbeatDone(Status::OK(), /* request= */ {}, /* response= */ {}, status, transaction); |
1048 | 0 | return; |
1049 | 0 | } |
1050 | 40.1k | timeout = std::chrono::microseconds(FLAGS_transaction_heartbeat_usec); |
1051 | 242k | } else { |
1052 | 242k | timeout = TransactionRpcTimeout(); |
1053 | 242k | } |
1054 | | |
1055 | 283k | tserver::UpdateTransactionRequestPB req; |
1056 | | |
1057 | 283k | internal::RemoteTabletPtr status_tablet; |
1058 | 283k | { |
1059 | 283k | std::lock_guard<std::mutex> lock(mutex_); |
1060 | 283k | status_tablet = status_tablet_; |
1061 | 283k | } |
1062 | | |
1063 | 283k | req.set_tablet_id(status_tablet->tablet_id()); |
1064 | 283k | req.set_propagated_hybrid_time(manager_->Now().ToUint64()); |
1065 | 283k | auto& state = *req.mutable_state(); |
1066 | | // TODO(savepoints) -- Attach metadata about aborted subtransactions in heartbeat. |
1067 | 283k | state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size()); |
1068 | 283k | state.set_status(status); |
1069 | 283k | manager_->rpcs().RegisterAndStart( |
1070 | 283k | UpdateTransaction( |
1071 | 283k | CoarseMonoClock::now() + timeout, |
1072 | 283k | status_tablet.get(), |
1073 | 283k | manager_->client(), |
1074 | 283k | &req, |
1075 | 283k | std::bind(&Impl::HeartbeatDone, this, _1, _2, _3, status, transaction)), |
1076 | 283k | &heartbeat_handle_); |
1077 | 283k | } |
1078 | | |
1079 | 302k | static bool AllowHeartbeat(TransactionState current_state, TransactionStatus status) { |
1080 | 302k | switch (current_state) { |
1081 | 283k | case TransactionState::kRunning: |
1082 | 283k | return true; |
1083 | 0 | case TransactionState::kReleased: FALLTHROUGH_INTENDED; |
1084 | 0 | case TransactionState::kSealed: |
1085 | 0 | return status == TransactionStatus::CREATED; |
1086 | 19.4k | case TransactionState::kAborted: FALLTHROUGH_INTENDED; |
1087 | 19.6k | case TransactionState::kCommitted: |
1088 | 19.6k | return false; |
1089 | 0 | } |
1090 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionState, current_state); |
1091 | 0 | } |
1092 | | |
1093 | | void HeartbeatDone(Status status, |
1094 | | const tserver::UpdateTransactionRequestPB& request, |
1095 | | const tserver::UpdateTransactionResponsePB& response, |
1096 | | TransactionStatus transaction_status, |
1097 | 281k | const YBTransactionPtr& transaction) { |
1098 | 281k | UpdateClock(response, manager_); |
1099 | 281k | manager_->rpcs().Unregister(&heartbeat_handle_); |
1100 | | |
1101 | 281k | if (status.ok() && transaction_status == TransactionStatus::CREATED) { |
1102 | 242k | auto decode_result = FullyDecodeTransactionId(request.state().transaction_id()); |
1103 | 242k | if (decode_result.ok()) { |
1104 | 242k | metadata_.transaction_id = *decode_result; |
1105 | 242k | auto id_str = AsString(metadata_.transaction_id); |
1106 | | // It is not fully thread safe, since we don't use mutex to access log_prefix_. |
1107 | | // But here we just replace characters inplace. |
1108 | | // It would not crash anyway, and could produce wrong id in the logs. |
1109 | | // It is ok, since one moment before we would output nil id. |
1110 | 242k | log_prefix_.replace(0, id_str.length(), id_str); |
1111 | 1 | } else { |
1112 | 1 | status = decode_result.status(); |
1113 | 1 | } |
1114 | 242k | } |
1115 | | |
1116 | 18.4E | VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", " |
1117 | 18.4E | << TransactionStatus_Name(transaction_status) << ")"; |
1118 | | |
1119 | 281k | if (status.ok()) { |
1120 | 279k | if (transaction_status == TransactionStatus::CREATED) { |
1121 | 242k | NotifyWaiters(Status::OK(), "Heartbeat"); |
1122 | 242k | } |
1123 | 279k | std::weak_ptr<YBTransaction> weak_transaction(transaction); |
1124 | 279k | manager_->client()->messenger()->scheduler().Schedule( |
1125 | 275k | [this, weak_transaction, id = metadata_.transaction_id](const Status&) { |
1126 | 275k | SendHeartbeat(TransactionStatus::PENDING, id, weak_transaction); |
1127 | 275k | }, |
1128 | 279k | std::chrono::microseconds(FLAGS_transaction_heartbeat_usec)); |
1129 | 2.21k | } else { |
1130 | 2.21k | auto state = state_.load(std::memory_order_acquire); |
1131 | 2.21k | LOG_WITH_PREFIX(WARNING) << "Send heartbeat failed: " << status << ", state: " << state; |
1132 | 2.42k | if (status.IsAborted() || status.IsExpired()) { |
1133 | | // IsAborted - Service is shutting down, no reason to retry. |
1134 | | // IsExpired - Transaction expired. |
1135 | 55 | if (transaction_status == TransactionStatus::CREATED) { |
1136 | 0 | NotifyWaiters(status, "Heartbeat"); |
1137 | 55 | } else { |
1138 | 55 | SetError(status, "Heartbeat"); |
1139 | 55 | } |
1140 | | // If state is committed, then we should not cleanup. |
1141 | 55 | if (status.IsExpired() && state == TransactionState::kRunning) { |
1142 | 7 | DoAbortCleanup(transaction, CleanupType::kImmediate); |
1143 | 7 | } |
1144 | 55 | return; |
1145 | 55 | } |
1146 | | // Other errors could have different causes, but we should just retry sending heartbeat |
1147 | | // in this case. |
1148 | 2.15k | SendHeartbeat(transaction_status, metadata_.transaction_id, transaction); |
1149 | 2.15k | } |
1150 | 281k | } |
1151 | | |
1152 | 55 | void SetError(const Status& status, const char* operation) EXCLUDES(mutex_) { |
1153 | 55 | std::lock_guard<std::mutex> lock(mutex_); |
1154 | 55 | SetErrorUnlocked(status, operation); |
1155 | 55 | } |
1156 | | |
1157 | 100k | void SetErrorUnlocked(const Status& status, const char* operation) REQUIRES(mutex_) { |
1158 | 18.4E | VLOG_WITH_PREFIX(1) << operation << " failed: " << status; |
1159 | 100k | if (status_.ok()) { |
1160 | 78.7k | status_ = status.CloneAndPrepend(operation); |
1161 | 78.7k | state_.store(TransactionState::kAborted, std::memory_order_release); |
1162 | 78.7k | } |
1163 | 100k | } |
1164 | | |
1165 | | ChildTransactionDataPB PrepareChildTransactionDataUnlocked( |
1166 | 82.5k | const YBTransactionPtr& transaction) REQUIRES(mutex_) { |
1167 | 82.5k | ChildTransactionDataPB data; |
1168 | 82.5k | metadata_.ToPB(data.mutable_metadata()); |
1169 | 82.5k | read_point_.PrepareChildTransactionData(&data); |
1170 | 82.5k | return data; |
1171 | 82.5k | } |
1172 | | |
1173 | | void DoPrepareChild(const Status& status, |
1174 | | const YBTransactionPtr& transaction, |
1175 | 3.49k | PrepareChildCallback callback) EXCLUDES(mutex_) { |
1176 | 3.49k | TRACE_TO(trace_, __func__); |
1177 | 3.49k | if (!status.ok()) { |
1178 | 40 | callback(status); |
1179 | 40 | return; |
1180 | 40 | } |
1181 | | |
1182 | 3.45k | ChildTransactionDataPB child_txn_data_pb; |
1183 | 3.45k | { |
1184 | 3.45k | std::lock_guard<std::mutex> lock(mutex_); |
1185 | 3.45k | child_txn_data_pb = PrepareChildTransactionDataUnlocked(transaction); |
1186 | 3.45k | } |
1187 | | |
1188 | 3.45k | callback(child_txn_data_pb); |
1189 | 3.45k | } |
1190 | | |
1191 | 157k | CHECKED_STATUS CheckCouldCommitUnlocked(SealOnly seal_only) REQUIRES(mutex_) { |
1192 | 157k | RETURN_NOT_OK(CheckRunningUnlocked()); |
1193 | 157k | if (child_) { |
1194 | 0 | return STATUS(IllegalState, "Commit of child transaction is not allowed"); |
1195 | 0 | } |
1196 | 157k | if (IsRestartRequired()) { |
1197 | 0 | return STATUS( |
1198 | 0 | IllegalState, "Commit of transaction that requires restart is not allowed"); |
1199 | 0 | } |
1200 | 157k | if (!seal_only && running_requests_ > 0) { |
1201 | 0 | return STATUS(IllegalState, "Commit of transaction with running requests"); |
1202 | 0 | } |
1203 | | |
1204 | 157k | return Status::OK(); |
1205 | 157k | } |
1206 | | |
1207 | | // The trace buffer. |
1208 | | scoped_refptr<Trace> trace_; |
1209 | | |
1210 | | const CoarseTimePoint start_; |
1211 | | |
1212 | | // Manager is created once per service. |
1213 | | TransactionManager* const manager_; |
1214 | | |
1215 | | // Transaction related to this impl. |
1216 | | YBTransaction* const transaction_; |
1217 | | |
1218 | | TransactionMetadata metadata_; |
1219 | | ConsistentReadPoint read_point_; |
1220 | | |
1221 | | // Metadata tracking savepoint-related state for the scope of this transaction. |
1222 | | YBSubTransaction subtransaction_; |
1223 | | |
1224 | | std::atomic<bool> requested_status_tablet_{false}; |
1225 | | internal::RemoteTabletPtr status_tablet_ GUARDED_BY(mutex_); |
1226 | | std::atomic<TransactionState> state_{TransactionState::kRunning}; |
1227 | | |
1228 | | // Transaction is successfully initialized and ready to process intents. |
1229 | | const bool child_; |
1230 | | const bool child_had_read_time_ = false; |
1231 | | bool ready_ GUARDED_BY(mutex_) = false; |
1232 | | CommitCallback commit_callback_ GUARDED_BY(mutex_); |
1233 | | Status status_ GUARDED_BY(mutex_); |
1234 | | |
1235 | | // The following fields are initialized in CompleteConstruction() and can be used with no locking. |
1236 | | std::string log_prefix_; |
1237 | | rpc::Rpcs::Handle heartbeat_handle_; |
1238 | | rpc::Rpcs::Handle commit_handle_; |
1239 | | rpc::Rpcs::Handle abort_handle_; |
1240 | | |
1241 | | struct TabletState { |
1242 | | size_t num_batches = 0; |
1243 | | bool has_metadata = false; |
1244 | | |
1245 | 0 | std::string ToString() const { |
1246 | 0 | return Format("{ num_batches: $0 has_metadata: $1 }", num_batches, has_metadata); |
1247 | 0 | } |
1248 | | }; |
1249 | | |
1250 | | typedef std::unordered_map<TabletId, TabletState> TabletStates; |
1251 | | |
1252 | | std::mutex mutex_; |
1253 | | TabletStates tablets_ GUARDED_BY(mutex_); |
1254 | | std::vector<Waiter> waiters_; |
1255 | | std::promise<Result<TransactionMetadata>> metadata_promise_; |
1256 | | std::shared_future<Result<TransactionMetadata>> metadata_future_ GUARDED_BY(mutex_); |
1257 | | // As of 2021-04-05 running_requests_ reflects number of ops in progress within this transaction |
1258 | | // only if no in-transaction operations have failed. |
1259 | | // If in-transaction operation has failed during tablet lookup or it has failed and will be |
1260 | | // retried by YBSession inside the same transaction - Transaction::Flushed is not getting called |
1261 | | // and running_requests_ is not updated. |
1262 | | // For YBSession-level retries Transaction::Flushed will be called when operation is finally |
1263 | | // successfully flushed, if operation fails after retry - Transaction::Flushed is not getting |
1264 | | // called. |
1265 | | // We might need to fix this before turning on transactions sealing. |
1266 | | // https://github.com/yugabyte/yugabyte-db/issues/7984. |
1267 | | size_t running_requests_ GUARDED_BY(mutex_) = 0; |
1268 | | // Set to true after commit record is replicated. Used only during transaction sealing. |
1269 | | bool commit_replicated_ = false; |
1270 | | }; |
1271 | | |
1272 | 322k | CoarseTimePoint AdjustDeadline(CoarseTimePoint deadline) { |
1273 | 322k | if (deadline == CoarseTimePoint()) { |
1274 | 161k | return TransactionRpcDeadline(); |
1275 | 161k | } |
1276 | 160k | return deadline; |
1277 | 160k | } |
1278 | | |
1279 | | YBTransaction::YBTransaction(TransactionManager* manager, TransactionLocality locality) |
1280 | 243k | : impl_(new Impl(manager, this, locality)) { |
1281 | 243k | } |
1282 | | |
1283 | | YBTransaction::YBTransaction( |
1284 | | TransactionManager* manager, const TransactionMetadata& metadata, PrivateOnlyTag) |
1285 | 0 | : impl_(new Impl(manager, this, metadata)) { |
1286 | 0 | } |
1287 | | |
1288 | | YBTransaction::YBTransaction(TransactionManager* manager, ChildTransactionData data) |
1289 | 14.6k | : impl_(new Impl(manager, this, std::move(data))) { |
1290 | 14.6k | } |
1291 | | |
1292 | 253k | YBTransaction::~YBTransaction() { |
1293 | 253k | } |
1294 | | |
1295 | 96.3k | void YBTransaction::SetPriority(uint64_t priority) { |
1296 | 96.3k | impl_->SetPriority(priority); |
1297 | 96.3k | } |
1298 | | |
1299 | 20.2k | uint64_t YBTransaction::GetPriority() const { |
1300 | 20.2k | return impl_->GetPriority(); |
1301 | 20.2k | } |
1302 | | |
1303 | 238k | Status YBTransaction::Init(IsolationLevel isolation, const ReadHybridTime& read_time) { |
1304 | 238k | return impl_->Init(isolation, read_time); |
1305 | 238k | } |
1306 | | |
1307 | | void YBTransaction::InitWithReadPoint( |
1308 | | IsolationLevel isolation, |
1309 | 628 | ConsistentReadPoint&& read_point) { |
1310 | 628 | return impl_->InitWithReadPoint(isolation, std::move(read_point)); |
1311 | 628 | } |
1312 | | |
1313 | 2.15M | internal::TxnBatcherIf& YBTransaction::batcher_if() { |
1314 | 2.15M | return *impl_; |
1315 | 2.15M | } |
1316 | | |
1317 | | void YBTransaction::Commit( |
1318 | 78.5k | CoarseTimePoint deadline, SealOnly seal_only, CommitCallback callback) { |
1319 | 78.5k | impl_->Commit(AdjustDeadline(deadline), seal_only, std::move(callback)); |
1320 | 78.5k | } |
1321 | | |
1322 | 78.5k | void YBTransaction::Commit(CoarseTimePoint deadline, CommitCallback callback) { |
1323 | 78.5k | Commit(deadline, SealOnly::kFalse, callback); |
1324 | 78.5k | } |
1325 | | |
1326 | 0 | void YBTransaction::Commit(CommitCallback callback) { |
1327 | 0 | Commit(CoarseTimePoint(), SealOnly::kFalse, std::move(callback)); |
1328 | 0 | } |
1329 | | |
1330 | 15 | const TransactionId& YBTransaction::id() const { |
1331 | 15 | return impl_->id(); |
1332 | 15 | } |
1333 | | |
1334 | 84.6k | IsolationLevel YBTransaction::isolation() const { |
1335 | 84.6k | return impl_->isolation(); |
1336 | 84.6k | } |
1337 | | |
1338 | 0 | const ConsistentReadPoint& YBTransaction::read_point() const { |
1339 | 0 | return impl_->read_point(); |
1340 | 0 | } |
1341 | | |
1342 | 1.07M | ConsistentReadPoint& YBTransaction::read_point() { |
1343 | 1.07M | return impl_->read_point(); |
1344 | 1.07M | } |
1345 | | |
1346 | | std::future<Status> YBTransaction::CommitFuture( |
1347 | 78.6k | CoarseTimePoint deadline, SealOnly seal_only) { |
1348 | 78.6k | return MakeFuture<Status>([this, deadline, seal_only](auto callback) { |
1349 | 78.6k | impl_->Commit(AdjustDeadline(deadline), seal_only, std::move(callback)); |
1350 | 78.6k | }); |
1351 | 78.6k | } |
1352 | | |
1353 | 82.9k | void YBTransaction::Abort(CoarseTimePoint deadline) { |
1354 | 82.9k | impl_->Abort(AdjustDeadline(deadline)); |
1355 | 82.9k | } |
1356 | | |
1357 | 59.5k | bool YBTransaction::IsRestartRequired() const { |
1358 | 59.5k | return impl_->IsRestartRequired(); |
1359 | 59.5k | } |
1360 | | |
1361 | 0 | Result<YBTransactionPtr> YBTransaction::CreateRestartedTransaction() { |
1362 | 0 | auto result = impl_->CreateSimilarTransaction(); |
1363 | 0 | RETURN_NOT_OK(impl_->FillRestartedTransaction(result->impl_.get())); |
1364 | 0 | return result; |
1365 | 0 | } |
1366 | | |
1367 | 0 | Status YBTransaction::FillRestartedTransaction(const YBTransactionPtr& dest) { |
1368 | 0 | return impl_->FillRestartedTransaction(dest->impl_.get()); |
1369 | 0 | } |
1370 | | |
1371 | | void YBTransaction::PrepareChild( |
1372 | | ForceConsistentRead force_consistent_read, CoarseTimePoint deadline, |
1373 | 0 | PrepareChildCallback callback) { |
1374 | 0 | return impl_->PrepareChild(force_consistent_read, deadline, std::move(callback)); |
1375 | 0 | } |
1376 | | |
1377 | | std::future<Result<ChildTransactionDataPB>> YBTransaction::PrepareChildFuture( |
1378 | 82.3k | ForceConsistentRead force_consistent_read, CoarseTimePoint deadline) { |
1379 | 82.3k | return MakeFuture<Result<ChildTransactionDataPB>>( |
1380 | 81.8k | [this, deadline, force_consistent_read](auto callback) { |
1381 | 81.8k | impl_->PrepareChild(force_consistent_read, AdjustDeadline(deadline), std::move(callback)); |
1382 | 81.8k | }); |
1383 | 82.3k | } |
1384 | | |
1385 | 14.5k | Result<ChildTransactionResultPB> YBTransaction::FinishChild() { |
1386 | 14.5k | return impl_->FinishChild(); |
1387 | 14.5k | } |
1388 | | |
1389 | 5.82k | std::shared_future<Result<TransactionMetadata>> YBTransaction::GetMetadata() const { |
1390 | 5.82k | return impl_->GetMetadata(); |
1391 | 5.82k | } |
1392 | | |
1393 | 14.1k | Status YBTransaction::ApplyChildResult(const ChildTransactionResultPB& result) { |
1394 | 14.1k | return impl_->ApplyChildResult(result); |
1395 | 14.1k | } |
1396 | | |
1397 | 0 | std::string YBTransaction::ToString() const { |
1398 | 0 | return impl_->ToString(); |
1399 | 0 | } |
1400 | | |
1401 | 0 | Result<TransactionMetadata> YBTransaction::Release() { |
1402 | 0 | return impl_->Release(); |
1403 | 0 | } |
1404 | | |
1405 | 782k | Trace* YBTransaction::trace() { |
1406 | 782k | return impl_->trace(); |
1407 | 782k | } |
1408 | | |
1409 | | YBTransactionPtr YBTransaction::Take( |
1410 | 0 | TransactionManager* manager, const TransactionMetadata& metadata) { |
1411 | 0 | auto result = std::make_shared<YBTransaction>(manager, metadata, PrivateOnlyTag()); |
1412 | 0 | result->impl_->StartHeartbeat(); |
1413 | 0 | return result; |
1414 | 0 | } |
1415 | | |
1416 | 48.8k | void YBTransaction::SetActiveSubTransaction(SubTransactionId id) { |
1417 | 48.8k | return impl_->SetActiveSubTransaction(id); |
1418 | 48.8k | } |
1419 | | |
1420 | 23.5k | Status YBTransaction::RollbackSubTransaction(SubTransactionId id) { |
1421 | 23.5k | return impl_->RollbackSubTransaction(id); |
1422 | 23.5k | } |
1423 | | |
1424 | 0 | bool YBTransaction::HasSubTransactionState() { |
1425 | 0 | return impl_->HasSubTransactionState(); |
1426 | 0 | } |
1427 | | |
1428 | | } // namespace client |
1429 | | } // namespace yb |