/Users/deen/code/yugabyte-db/src/yb/client/transaction.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/client/transaction.h" |
17 | | |
18 | | #include <unordered_set> |
19 | | |
20 | | #include "yb/client/batcher.h" |
21 | | #include "yb/client/client.h" |
22 | | #include "yb/client/in_flight_op.h" |
23 | | #include "yb/client/meta_cache.h" |
24 | | #include "yb/client/transaction_cleanup.h" |
25 | | #include "yb/client/transaction_manager.h" |
26 | | #include "yb/client/transaction_rpc.h" |
27 | | #include "yb/client/yb_op.h" |
28 | | |
29 | | #include "yb/common/common.pb.h" |
30 | | #include "yb/common/transaction.h" |
31 | | #include "yb/common/transaction_error.h" |
32 | | #include "yb/common/ybc_util.h" |
33 | | |
34 | | #include "yb/rpc/messenger.h" |
35 | | #include "yb/rpc/rpc.h" |
36 | | #include "yb/rpc/scheduler.h" |
37 | | |
38 | | #include "yb/tserver/tserver_service.pb.h" |
39 | | |
40 | | #include "yb/util/countdown_latch.h" |
41 | | #include "yb/util/flag_tags.h" |
42 | | #include "yb/util/format.h" |
43 | | #include "yb/util/logging.h" |
44 | | #include "yb/util/random_util.h" |
45 | | #include "yb/util/result.h" |
46 | | #include "yb/util/scope_exit.h" |
47 | | #include "yb/util/status_format.h" |
48 | | #include "yb/util/status_log.h" |
49 | | #include "yb/util/strongly_typed_bool.h" |
50 | | #include "yb/util/trace.h" |
51 | | #include "yb/util/tsan_util.h" |
52 | | #include "yb/util/unique_lock.h" |
53 | | |
54 | | using namespace std::literals; |
55 | | using namespace std::placeholders; |
56 | | |
57 | | DEFINE_int32(txn_print_trace_every_n, 0, |
58 | | "Controls the rate at which txn traces are printed. Setting this to 0 " |
59 | | "disables printing the collected traces."); |
60 | | TAG_FLAG(txn_print_trace_every_n, advanced); |
61 | | TAG_FLAG(txn_print_trace_every_n, runtime); |
62 | | |
63 | | DEFINE_int32(txn_slow_op_threshold_ms, 0, |
64 | | "Controls the rate at which txn traces are printed. Setting this to 0 " |
65 | | "disables printing the collected traces."); |
66 | | TAG_FLAG(txn_slow_op_threshold_ms, advanced); |
67 | | TAG_FLAG(txn_slow_op_threshold_ms, runtime); |
68 | | |
69 | | DEFINE_uint64(transaction_heartbeat_usec, 500000 * yb::kTimeMultiplier, |
70 | | "Interval of transaction heartbeat in usec."); |
71 | | DEFINE_bool(transaction_disable_heartbeat_in_tests, false, "Disable heartbeat during test."); |
72 | | DECLARE_uint64(max_clock_skew_usec); |
73 | | |
74 | | DEFINE_test_flag(int32, transaction_inject_flushed_delay_ms, 0, |
75 | | "Inject delay before processing flushed operations by transaction."); |
76 | | |
77 | | DEFINE_test_flag(bool, disable_proactive_txn_cleanup_on_abort, false, |
78 | | "Disable cleanup of intents in abort path."); |
79 | | |
80 | | namespace yb { |
81 | | namespace client { |
82 | | |
83 | | namespace { |
84 | | |
85 | | YB_STRONGLY_TYPED_BOOL(Child); |
86 | | YB_DEFINE_ENUM(TransactionState, (kRunning)(kAborted)(kCommitted)(kReleased)(kSealed)); |
87 | | |
88 | | } // namespace |
89 | | |
90 | 13.8k | Result<ChildTransactionData> ChildTransactionData::FromPB(const ChildTransactionDataPB& data) { |
91 | 13.8k | ChildTransactionData result; |
92 | 13.8k | auto metadata = TransactionMetadata::FromPB(data.metadata()); |
93 | 13.8k | RETURN_NOT_OK(metadata); |
94 | 13.8k | result.metadata = std::move(*metadata); |
95 | 13.8k | result.read_time = ReadHybridTime::FromReadTimePB(data); |
96 | 13.8k | for (const auto& entry : data.local_limits()) { |
97 | 0 | result.local_limits.emplace(entry.first, HybridTime(entry.second)); |
98 | 0 | } |
99 | 13.8k | return result; |
100 | 13.8k | } |
101 | | |
102 | | YB_DEFINE_ENUM(MetadataState, (kMissing)(kMaybePresent)(kPresent)); |
103 | | |
104 | 61.7k | void YBSubTransaction::SetActiveSubTransaction(SubTransactionId id) { |
105 | 61.7k | sub_txn_.subtransaction_id = id; |
106 | 61.7k | highest_subtransaction_id_ = std::max(highest_subtransaction_id_, id); |
107 | 61.7k | } |
108 | | |
109 | 13.5k | CHECKED_STATUS YBSubTransaction::RollbackSubTransaction(SubTransactionId id) { |
110 | | // We should abort the range [id, sub_txn_.highest_subtransaction_id]. It's possible that we |
111 | | // have created and released savepoints, such that there have been writes with a |
112 | | // subtransaction_id greater than sub_txn_.subtransaction_id, and those should be aborted as |
113 | | // well. |
114 | 13.5k | SCHECK_GE( |
115 | 13.5k | highest_subtransaction_id_, id, |
116 | 13.5k | InternalError, |
117 | 13.5k | "Attempted to rollback to non-existent savepoint."); |
118 | 13.5k | return sub_txn_.aborted.SetRange(id, highest_subtransaction_id_); |
119 | 13.5k | } |
120 | | |
121 | 52.4k | const SubTransactionMetadata& YBSubTransaction::get() { return sub_txn_; } |
122 | | |
123 | | class YBTransaction::Impl final : public internal::TxnBatcherIf { |
124 | | public: |
125 | | Impl(TransactionManager* manager, YBTransaction* transaction, TransactionLocality locality) |
126 | | : trace_(new Trace), |
127 | | start_(CoarseMonoClock::Now()), |
128 | | manager_(manager), |
129 | | transaction_(transaction), |
130 | | read_point_(manager->clock()), |
131 | 411k | child_(Child::kFalse) { |
132 | 411k | metadata_.priority = RandomUniformInt<uint64_t>(); |
133 | 411k | metadata_.locality = locality; |
134 | 411k | CompleteConstruction(); |
135 | 411k | VLOG_WITH_PREFIX0 (2) << "Started, metadata: " << metadata_0 ; |
136 | 411k | } |
137 | | |
138 | | Impl(TransactionManager* manager, YBTransaction* transaction, const TransactionMetadata& metadata) |
139 | | : trace_(new Trace), |
140 | | start_(CoarseMonoClock::Now()), |
141 | | manager_(manager), |
142 | | transaction_(transaction), |
143 | | metadata_(metadata), |
144 | | read_point_(manager->clock()), |
145 | 0 | child_(Child::kFalse) { |
146 | 0 | CompleteConstruction(); |
147 | 0 | VLOG_WITH_PREFIX(2) << "Taken, metadata: " << metadata_; |
148 | 0 | } |
149 | | |
150 | | Impl(TransactionManager* manager, YBTransaction* transaction, ChildTransactionData data) |
151 | | : trace_(new Trace), |
152 | | start_(CoarseMonoClock::Now()), |
153 | | manager_(manager), |
154 | | transaction_(transaction), |
155 | | read_point_(manager->clock()), |
156 | | child_(Child::kTrue), |
157 | 13.8k | child_had_read_time_(data.read_time) { |
158 | | // For serializable isolation we use read intents, so could always read most recent |
159 | | // version of DB. |
160 | | // Otherwise there is possible case when we miss value change that happened after transaction |
161 | | // start. |
162 | 13.8k | if (data.metadata.isolation == IsolationLevel::SNAPSHOT_ISOLATION13.8k ) { |
163 | 13.8k | read_point_.SetReadTime(std::move(data.read_time), std::move(data.local_limits)); |
164 | 13.8k | } |
165 | 13.8k | metadata_ = std::move(data.metadata); |
166 | 13.8k | CompleteConstruction(); |
167 | 13.8k | VLOG_WITH_PREFIX0 (2) << "Started child, metadata: " << metadata_0 ; |
168 | 13.8k | ready_ = true; |
169 | 13.8k | } |
170 | | |
171 | 419k | ~Impl() { |
172 | 419k | manager_->rpcs().Abort({&heartbeat_handle_, &commit_handle_, &abort_handle_}); |
173 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, !waiters_.empty()) << "Non empty waiters"; |
174 | 419k | const auto threshold = GetAtomicFlag(&FLAGS_txn_slow_op_threshold_ms); |
175 | 419k | const auto now = CoarseMonoClock::Now(); |
176 | 419k | if (trace_->must_print() |
177 | 419k | || (threshold > 0 && ToMilliseconds(now - start_) > threshold0 )) { |
178 | 0 | LOG(INFO) << ToString() << " took " << ToMicroseconds(now - start_) << "us. Trace: \n" |
179 | 0 | << trace_->DumpToString(true); |
180 | 419k | } else { |
181 | 419k | YB_LOG_IF_EVERY_N0 (INFO, FLAGS_txn_print_trace_every_n > 0, FLAGS_txn_print_trace_every_n) |
182 | 0 | << ToString() << " took " << ToMicroseconds(now - start_) << "us. Trace: \n" |
183 | 0 | << trace_->DumpToString(true); |
184 | 419k | } |
185 | 419k | } |
186 | | |
187 | 267k | void SetPriority(uint64_t priority) { |
188 | 267k | metadata_.priority = priority; |
189 | 267k | } |
190 | | |
191 | 68.9k | uint64_t GetPriority() const { |
192 | 68.9k | return metadata_.priority; |
193 | 68.9k | } |
194 | | |
195 | 2 | YBTransactionPtr CreateSimilarTransaction() { |
196 | 2 | return std::make_shared<YBTransaction>(manager_); |
197 | 2 | } |
198 | | |
199 | 392k | CHECKED_STATUS Init(IsolationLevel isolation, const ReadHybridTime& read_time) { |
200 | 392k | TRACE_TO(trace_, __func__); |
201 | 18.4E | VLOG_WITH_PREFIX(1) << __func__ << "(" << IsolationLevel_Name(isolation) << ", " |
202 | 18.4E | << read_time << ")"; |
203 | 392k | if (read_point_.GetReadTime().read.is_valid()) { |
204 | 0 | return STATUS_FORMAT(IllegalState, "Read point already specified: $0", |
205 | 0 | read_point_.GetReadTime()); |
206 | 0 | } |
207 | | |
208 | 392k | if (read_time.read.is_valid()) { |
209 | 0 | read_point_.SetReadTime(read_time, ConsistentReadPoint::HybridTimeMap()); |
210 | 0 | } |
211 | 392k | CompleteInit(isolation); |
212 | 392k | return Status::OK(); |
213 | 392k | } |
214 | | |
215 | 13.4k | void InitWithReadPoint(IsolationLevel isolation, ConsistentReadPoint&& read_point) { |
216 | 13.4k | VLOG_WITH_PREFIX0 (1) << __func__ << "(" << IsolationLevel_Name(isolation) << ", " |
217 | 0 | << read_point.GetReadTime() << ")"; |
218 | | |
219 | 13.4k | read_point_.MoveFrom(&read_point); |
220 | 13.4k | CompleteInit(isolation); |
221 | 13.4k | } |
222 | | |
223 | 191k | const IsolationLevel isolation() const { |
224 | 191k | return metadata_.isolation; |
225 | 191k | } |
226 | | |
227 | | // This transaction is a restarted transaction, so we set it up with data from original one. |
228 | 2 | CHECKED_STATUS FillRestartedTransaction(Impl* other) EXCLUDES(mutex_) { |
229 | 2 | VLOG_WITH_PREFIX0 (1) << "Setup restart to " << other->ToString()0 ; |
230 | 2 | auto transaction = transaction_->shared_from_this(); |
231 | 2 | TRACE_TO(trace_, __func__); |
232 | 2 | { |
233 | 2 | std::lock_guard<std::mutex> lock(mutex_); |
234 | 2 | auto state = state_.load(std::memory_order_acquire); |
235 | 2 | if (state != TransactionState::kRunning) { |
236 | 0 | return STATUS_FORMAT( |
237 | 0 | IllegalState, "Restart of completed transaction $0: $1", |
238 | 0 | metadata_.transaction_id, state); |
239 | 0 | } |
240 | 2 | if (!read_point_.IsRestartRequired()) { |
241 | 0 | return STATUS_FORMAT( |
242 | 0 | IllegalState, "Restart of transaction that does not require restart: $0", |
243 | 0 | metadata_.transaction_id); |
244 | 0 | } |
245 | 2 | other->read_point_.MoveFrom(&read_point_); |
246 | 2 | other->read_point_.Restart(); |
247 | 2 | other->metadata_.isolation = metadata_.isolation; |
248 | | // TODO(Piyush): Do we need the below? If yes, prove with a test case and add it. |
249 | | // other->metadata_.priority = metadata_.priority; |
250 | 2 | if (metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION) { |
251 | 0 | other->metadata_.start_time = other->read_point_.GetReadTime().read; |
252 | 2 | } else { |
253 | 2 | other->metadata_.start_time = other->read_point_.Now(); |
254 | 2 | } |
255 | 2 | state_.store(TransactionState::kAborted, std::memory_order_release); |
256 | 2 | } |
257 | 0 | DoAbort(TransactionRpcDeadline(), transaction); |
258 | | |
259 | 2 | return Status::OK(); |
260 | 2 | } |
261 | | |
262 | 1.59M | Trace* trace() { |
263 | 1.59M | return trace_.get(); |
264 | 1.59M | } |
265 | | |
266 | | CHECKED_STATUS CheckTransactionLocality(internal::InFlightOpsGroupsWithMetadata* ops_info) |
267 | 1.16M | REQUIRES(mutex_) { |
268 | 1.16M | if (metadata_.locality != TransactionLocality::LOCAL) { |
269 | 1.16M | return Status::OK(); |
270 | 1.16M | } |
271 | 34 | for (auto& group : ops_info->groups) { |
272 | 31 | auto& first_op = *group.begin; |
273 | 31 | const auto& tablet = first_op.tablet; |
274 | 31 | const auto& tablet_id = tablet->tablet_id(); |
275 | | |
276 | 31 | auto tservers = tablet->GetRemoteTabletServers(internal::IncludeFailedReplicas::kTrue); |
277 | 93 | for (const auto &tserver : tservers) { |
278 | 93 | if (!tserver->IsLocalRegion()) { |
279 | 0 | VLOG_WITH_PREFIX(4) << "Aborting (accessing nonlocal tablet in local transaction)"; |
280 | 0 | return STATUS_FORMAT( |
281 | 0 | IllegalState, "Nonlocal tablet accessed in local transaction: tablet $0", tablet_id); |
282 | 0 | } |
283 | 93 | } |
284 | 31 | } |
285 | 34 | return Status::OK(); |
286 | 34 | } |
287 | | |
288 | | bool Prepare(internal::InFlightOpsGroupsWithMetadata* ops_info, |
289 | | ForceConsistentRead force_consistent_read, |
290 | | CoarseTimePoint deadline, |
291 | | Initial initial, |
292 | 1.56M | Waiter waiter) override EXCLUDES(mutex_) { |
293 | 18.4E | VLOG_WITH_PREFIX(2) << "Prepare(" << force_consistent_read << ", " << initial << ", " |
294 | 18.4E | << AsString(ops_info->groups) << ")"; |
295 | 1.56M | TRACE_TO(trace_, "Preparing $0 ops", AsString(ops_info->groups.size())); |
296 | 1.56M | VTRACE_TO(2, trace_, "Preparing $0 ops", AsString(ops_info->groups)); |
297 | | |
298 | 1.56M | { |
299 | 1.56M | UNIQUE_LOCK(lock, mutex_); |
300 | 1.56M | const bool defer = !ready_; |
301 | | |
302 | 1.56M | if (!defer || initial406k ) { |
303 | 1.16M | Status status = CheckTransactionLocality(ops_info); |
304 | 1.16M | if (!status.ok()) { |
305 | 0 | VLOG_WITH_PREFIX(2) << "Prepare, rejected: " << status; |
306 | 0 | bool abort = false; |
307 | 0 | auto state = state_.load(std::memory_order_acquire); |
308 | 0 | if (state == TransactionState::kRunning) { |
309 | | // State will be changed to aborted in SetErrorUnlocked |
310 | 0 | abort = true; |
311 | 0 | } |
312 | 0 | SetErrorUnlocked(status, "Check transaction locality"); |
313 | 0 | lock.unlock(); |
314 | 0 | if (waiter) { |
315 | 0 | waiter(status); |
316 | 0 | } |
317 | 0 | if (abort) { |
318 | 0 | Abort(TransactionRpcDeadline()); |
319 | 0 | } |
320 | 0 | return false; |
321 | 0 | } |
322 | 1.59M | for (auto& group : ops_info->groups)1.16M { |
323 | 1.59M | auto& first_op = *group.begin; |
324 | 1.59M | const auto should_add_intents = first_op.yb_op->should_add_intents(metadata_.isolation); |
325 | 1.59M | const auto& tablet = first_op.tablet; |
326 | 1.59M | const auto& tablet_id = tablet->tablet_id(); |
327 | | |
328 | 1.59M | bool has_metadata; |
329 | 1.59M | if (initial && should_add_intents1.59M ) { |
330 | 1.00M | auto& tablet_state = tablets_[tablet_id]; |
331 | | // TODO(dtxn) Handle skipped writes, i.e. writes that did not write anything (#3220) |
332 | 1.00M | first_op.batch_idx = tablet_state.num_batches; |
333 | 1.00M | ++tablet_state.num_batches; |
334 | 1.00M | has_metadata = tablet_state.has_metadata; |
335 | 1.00M | } else { |
336 | 587k | const auto it = tablets_.find(tablet_id); |
337 | 587k | has_metadata = it != tablets_.end() && it->second.has_metadata377k ; |
338 | 587k | } |
339 | 1.59M | group.need_metadata = !has_metadata; |
340 | 1.59M | } |
341 | 1.16M | } |
342 | | |
343 | 1.56M | if (defer) { |
344 | 406k | if (waiter) { |
345 | 406k | waiters_.push_back(std::move(waiter)); |
346 | 406k | } |
347 | 406k | lock.unlock(); |
348 | 406k | VLOG_WITH_PREFIX29 (2) << "Prepare, rejected (not ready, requesting status tablet)"29 ; |
349 | 406k | RequestStatusTablet(deadline); |
350 | 406k | return false; |
351 | 406k | } |
352 | | |
353 | | // For serializable isolation we never choose read time, since it always reads latest |
354 | | // snapshot. |
355 | | // For snapshot isolation, if read time was not yet picked, we have to choose it now, if there |
356 | | // multiple tablets that will process first request. |
357 | 1.16M | SetReadTimeIfNeeded(ops_info->groups.size() > 1 || force_consistent_read862k ); |
358 | 1.16M | } |
359 | | |
360 | 0 | ops_info->metadata = { |
361 | 1.16M | .transaction = metadata_, |
362 | 1.16M | .subtransaction = subtransaction_.active() |
363 | 1.16M | ? boost::make_optional(subtransaction_.get())50.8k |
364 | 1.16M | : boost::none1.11M , |
365 | 1.16M | }; |
366 | | |
367 | 1.16M | return true; |
368 | 1.56M | } |
369 | | |
370 | 1.16M | void ExpectOperations(size_t count) EXCLUDES(mutex_) override { |
371 | 1.16M | std::lock_guard<std::mutex> lock(mutex_); |
372 | 1.16M | running_requests_ += count; |
373 | 1.16M | } |
374 | | |
375 | | void Flushed( |
376 | | const internal::InFlightOps& ops, const ReadHybridTime& used_read_time, |
377 | 1.59M | const Status& status) EXCLUDES(mutex_) override { |
378 | 1.59M | TRACE_TO(trace_, "Flushed $0 ops. with Status $1", ops.size(), status.ToString()); |
379 | 18.4E | VLOG_WITH_PREFIX(5) |
380 | 18.4E | << "Flushed: " << yb::ToString(ops) << ", used_read_time: " << used_read_time |
381 | 18.4E | << ", status: " << status; |
382 | 1.59M | if (FLAGS_TEST_transaction_inject_flushed_delay_ms > 0) { |
383 | 40 | std::this_thread::sleep_for(FLAGS_TEST_transaction_inject_flushed_delay_ms * 1ms); |
384 | 40 | } |
385 | | |
386 | 1.59M | boost::optional<Status> notify_commit_status; |
387 | 1.59M | bool abort = false; |
388 | | |
389 | 1.59M | CommitCallback commit_callback; |
390 | 1.59M | { |
391 | 1.59M | std::lock_guard<std::mutex> lock(mutex_); |
392 | 1.59M | running_requests_ -= ops.size(); |
393 | | |
394 | 1.59M | if (status.ok()) { |
395 | 1.41M | if (used_read_time && metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION392k ) { |
396 | 54.0k | const bool read_point_already_set = static_cast<bool>(read_point_.GetReadTime()); |
397 | 54.0k | #ifndef NDEBUG |
398 | 54.0k | if (read_point_already_set) { |
399 | | // Display details of operations before crashing in debug mode. |
400 | 0 | int op_idx = 1; |
401 | 0 | for (const auto& op : ops) { |
402 | 0 | LOG(ERROR) << "Operation " << op_idx << ": " << op.ToString(); |
403 | 0 | op_idx++; |
404 | 0 | } |
405 | 0 | } |
406 | 54.0k | #endif |
407 | 54.0k | LOG_IF_WITH_PREFIX0 (DFATAL, read_point_already_set) |
408 | 0 | << "Read time already picked (" << read_point_.GetReadTime() |
409 | 0 | << ", but server replied with used read time: " << used_read_time; |
410 | 54.0k | read_point_.SetReadTime(used_read_time, ConsistentReadPoint::HybridTimeMap()); |
411 | 54.0k | } |
412 | 1.41M | const std::string* prev_tablet_id = nullptr; |
413 | 9.17M | for (const auto& op : ops) { |
414 | 9.17M | if (op.yb_op->applied() && op.yb_op->should_add_intents(metadata_.isolation)9.16M ) { |
415 | 8.57M | const std::string& tablet_id = op.tablet->tablet_id(); |
416 | 8.57M | if (prev_tablet_id == nullptr || tablet_id != *prev_tablet_id7.76M ) { |
417 | 810k | prev_tablet_id = &tablet_id; |
418 | 810k | tablets_[tablet_id].has_metadata = true; |
419 | 810k | } |
420 | 8.57M | } |
421 | 9.17M | } |
422 | 1.41M | } else { |
423 | 182k | const TransactionError txn_err(status); |
424 | | // We don't abort the txn in case of a kSkipLocking error to make further progress. |
425 | | // READ COMMITTED isolation retries errors of kConflict and kReadRestart by restarting |
426 | | // statements instead of the whole txn and hence should avoid aborting the txn in this case |
427 | | // too. |
428 | 182k | bool avoid_abort = |
429 | 182k | (txn_err.value() == TransactionErrorCode::kSkipLocking) || |
430 | 182k | (metadata_.isolation == IsolationLevel::READ_COMMITTED && |
431 | 182k | (12.0k txn_err.value() == TransactionErrorCode::kReadRestartRequired12.0k || |
432 | 12.0k | txn_err.value() == TransactionErrorCode::kConflict)); |
433 | 182k | if (!avoid_abort) { |
434 | 170k | auto state = state_.load(std::memory_order_acquire); |
435 | 18.4E | VLOG_WITH_PREFIX(4) << "Abort desired, state: " << AsString(state); |
436 | 170k | if (state == TransactionState::kRunning) { |
437 | 122k | abort = true; |
438 | | // State will be changed to aborted in SetError |
439 | 122k | } |
440 | | |
441 | 170k | SetErrorUnlocked(status, "Flush"); |
442 | 170k | } |
443 | 182k | } |
444 | | |
445 | 1.59M | if (running_requests_ == 0 && commit_replicated_1.15M ) { |
446 | 0 | notify_commit_status = status_; |
447 | 0 | commit_callback = std::move(commit_callback_); |
448 | 0 | } |
449 | 1.59M | } |
450 | | |
451 | 1.59M | if (notify_commit_status) { |
452 | 0 | VLOG_WITH_PREFIX(4) << "Sealing done: " << *notify_commit_status; |
453 | 0 | commit_callback(*notify_commit_status); |
454 | 0 | } |
455 | | |
456 | 1.59M | if (abort && !child_122k ) { |
457 | 122k | DoAbort(TransactionRpcDeadline(), transaction_->shared_from_this()); |
458 | 122k | } |
459 | 1.59M | } |
460 | | |
461 | | void Commit(CoarseTimePoint deadline, SealOnly seal_only, CommitCallback callback) |
462 | 268k | EXCLUDES(mutex_) { |
463 | 268k | auto transaction = transaction_->shared_from_this(); |
464 | 268k | TRACE_TO(trace_, __func__); |
465 | 268k | { |
466 | 268k | UNIQUE_LOCK(lock, mutex_); |
467 | 268k | auto status = CheckCouldCommitUnlocked(seal_only); |
468 | 268k | if (!status.ok()) { |
469 | 142 | lock.unlock(); |
470 | 142 | callback(status); |
471 | 142 | return; |
472 | 142 | } |
473 | 268k | state_.store(seal_only ? TransactionState::kSealed0 : TransactionState::kCommitted, |
474 | 268k | std::memory_order_release); |
475 | 268k | commit_callback_ = std::move(callback); |
476 | 268k | if (!ready_) { |
477 | | // If we have not written any intents and do not even have a transaction status tablet, |
478 | | // just report the transaction as committed. |
479 | | // |
480 | | // See https://github.com/yugabyte/yugabyte-db/issues/3105 for details -- we might be able |
481 | | // to remove this special case if it turns out there is a bug elsewhere. |
482 | 23 | if (tablets_.empty() && running_requests_ == 0) { |
483 | 23 | VLOG_WITH_PREFIX0 (4) << "Committed empty transaction"0 ; |
484 | 23 | auto commit_callback = std::move(commit_callback_); |
485 | 23 | lock.unlock(); |
486 | 23 | commit_callback(Status::OK()); |
487 | 23 | return; |
488 | 23 | } |
489 | | |
490 | 0 | waiters_.emplace_back(std::bind( |
491 | 0 | &Impl::DoCommit, this, deadline, seal_only, _1, transaction)); |
492 | 0 | lock.unlock(); |
493 | 0 | RequestStatusTablet(deadline); |
494 | 0 | return; |
495 | 23 | } |
496 | 268k | } |
497 | 268k | DoCommit(deadline, seal_only, Status::OK(), transaction); |
498 | 268k | } |
499 | | |
500 | 137k | void Abort(CoarseTimePoint deadline) EXCLUDES(mutex_) { |
501 | 137k | auto transaction = transaction_->shared_from_this(); |
502 | | |
503 | 18.4E | VLOG_WITH_PREFIX(2) << "Abort"; |
504 | 137k | TRACE_TO(trace_, __func__); |
505 | 137k | { |
506 | 137k | UNIQUE_LOCK(lock, mutex_); |
507 | 137k | auto state = state_.load(std::memory_order_acquire); |
508 | 137k | if (state != TransactionState::kRunning) { |
509 | 122k | if (state != TransactionState::kAborted) { |
510 | 0 | LOG_WITH_PREFIX(DFATAL) |
511 | 0 | << "Abort of committed transaction: " << AsString(state); |
512 | 122k | } else { |
513 | 18.4E | VLOG_WITH_PREFIX(2) << "Already aborted"; |
514 | 122k | } |
515 | 122k | return; |
516 | 122k | } |
517 | 15.6k | if (child_) { |
518 | 0 | LOG_WITH_PREFIX(DFATAL) << "Abort of child transaction"; |
519 | 0 | return; |
520 | 0 | } |
521 | 15.6k | state_.store(TransactionState::kAborted, std::memory_order_release); |
522 | 15.6k | if (!ready_) { |
523 | 54 | std::vector<Waiter> waiters; |
524 | 54 | waiters_.swap(waiters); |
525 | 54 | lock.unlock(); |
526 | 54 | const auto aborted_status = STATUS(Aborted, "Transaction aborted"); |
527 | 54 | for(const auto& waiter : waiters) { |
528 | 54 | waiter(aborted_status); |
529 | 54 | } |
530 | 54 | VLOG_WITH_PREFIX1 (2) << "Aborted transaction not yet ready"1 ; |
531 | 54 | return; |
532 | 54 | } |
533 | 15.6k | } |
534 | 15.6k | DoAbort(deadline, transaction); |
535 | 15.6k | } |
536 | | |
537 | 379k | bool IsRestartRequired() const { |
538 | 379k | return read_point_.IsRestartRequired(); |
539 | 379k | } |
540 | | |
541 | | std::shared_future<Result<TransactionMetadata>> GetMetadata( |
542 | 18.8k | CoarseTimePoint deadline) EXCLUDES(mutex_) { |
543 | 18.8k | UNIQUE_LOCK(lock, mutex_); |
544 | 18.8k | if (metadata_future_.valid()) { |
545 | 0 | return metadata_future_; |
546 | 0 | } |
547 | 18.8k | metadata_future_ = std::shared_future<Result<TransactionMetadata>>( |
548 | 18.8k | metadata_promise_.get_future()); |
549 | 18.8k | if (!ready_) { |
550 | 1.03k | auto transaction = transaction_->shared_from_this(); |
551 | 1.03k | waiters_.push_back([this, transaction](const Status& status) { |
552 | 1.03k | WARN_NOT_OK(status, "Transaction request failed"); |
553 | 1.03k | if (status.ok()) { |
554 | 1.03k | metadata_promise_.set_value(metadata_); |
555 | 1.03k | } else { |
556 | 1 | metadata_promise_.set_value(status); |
557 | 1 | } |
558 | 1.03k | }); |
559 | 1.03k | lock.unlock(); |
560 | 1.03k | RequestStatusTablet(deadline); |
561 | 1.03k | lock.lock(); |
562 | 1.03k | return metadata_future_; |
563 | 1.03k | } |
564 | | |
565 | 17.7k | metadata_promise_.set_value(metadata_); |
566 | 17.7k | return metadata_future_; |
567 | 18.8k | } |
568 | | |
569 | | void PrepareChild( |
570 | | ForceConsistentRead force_consistent_read, CoarseTimePoint deadline, |
571 | 66.8k | PrepareChildCallback callback) { |
572 | 66.8k | auto transaction = transaction_->shared_from_this(); |
573 | 66.8k | TRACE_TO(trace_, __func__); |
574 | 66.8k | UNIQUE_LOCK(lock, mutex_); |
575 | 66.8k | auto status = CheckRunningUnlocked(); |
576 | 66.8k | if (!status.ok()) { |
577 | 0 | lock.unlock(); |
578 | 0 | callback(status); |
579 | 0 | return; |
580 | 0 | } |
581 | 66.8k | if (IsRestartRequired()) { |
582 | 0 | lock.unlock(); |
583 | 0 | callback(STATUS(IllegalState, "Restart required")); |
584 | 0 | return; |
585 | 0 | } |
586 | | |
587 | 66.8k | SetReadTimeIfNeeded(force_consistent_read); |
588 | | |
589 | 66.8k | if (!ready_) { |
590 | 3.23k | waiters_.emplace_back(std::bind( |
591 | 3.23k | &Impl::DoPrepareChild, this, _1, transaction, std::move(callback))); |
592 | 3.23k | lock.unlock(); |
593 | 3.23k | RequestStatusTablet(deadline); |
594 | 3.23k | return; |
595 | 3.23k | } |
596 | | |
597 | 63.6k | ChildTransactionDataPB child_txn_data_pb = PrepareChildTransactionDataUnlocked(transaction); |
598 | 63.6k | lock.unlock(); |
599 | 63.6k | callback(child_txn_data_pb); |
600 | 63.6k | } |
601 | | |
602 | 13.7k | Result<ChildTransactionResultPB> FinishChild() { |
603 | 13.7k | TRACE_TO(trace_, __func__); |
604 | 13.7k | UNIQUE_LOCK(lock, mutex_); |
605 | 13.7k | RETURN_NOT_OK(CheckRunningUnlocked()); |
606 | 13.7k | if (!child_) { |
607 | 0 | return STATUS(IllegalState, "Finish child of non child transaction"); |
608 | 0 | } |
609 | 13.7k | state_.store(TransactionState::kCommitted, std::memory_order_release); |
610 | 13.7k | ChildTransactionResultPB result; |
611 | 13.7k | auto& tablets = *result.mutable_tablets(); |
612 | 13.7k | tablets.Reserve(narrow_cast<int>(tablets_.size())); |
613 | 32.9k | for (const auto& tablet : tablets_) { |
614 | 32.9k | auto& out = *tablets.Add(); |
615 | 32.9k | out.set_tablet_id(tablet.first); |
616 | 32.9k | out.set_num_batches(tablet.second.num_batches); |
617 | 32.9k | out.set_metadata_state( |
618 | 32.9k | tablet.second.has_metadata ? InvolvedTabletMetadataState::EXIST32.6k |
619 | 32.9k | : InvolvedTabletMetadataState::MISSING331 ); |
620 | 32.9k | } |
621 | 13.7k | read_point_.FinishChildTransactionResult(HadReadTime(child_had_read_time_), &result); |
622 | 13.7k | return result; |
623 | 13.7k | } |
624 | | |
625 | 13.4k | Status ApplyChildResult(const ChildTransactionResultPB& result) EXCLUDES(mutex_) { |
626 | 13.4k | TRACE_TO(trace_, __func__); |
627 | 13.4k | std::vector<std::string> cleanup_tablet_ids; |
628 | 13.4k | auto se = ScopeExit([this, &cleanup_tablet_ids] { |
629 | 13.4k | if (cleanup_tablet_ids.empty()) { |
630 | 13.4k | return; |
631 | 13.4k | } |
632 | 0 | CleanupTransaction( |
633 | 0 | manager_->client(), manager_->clock(), metadata_.transaction_id, Sealed::kFalse, |
634 | 0 | CleanupType::kImmediate, cleanup_tablet_ids); |
635 | 0 | }); |
636 | 13.4k | UNIQUE_LOCK(lock, mutex_); |
637 | 13.4k | if (state_.load(std::memory_order_acquire) == TransactionState::kAborted) { |
638 | 0 | cleanup_tablet_ids.reserve(result.tablets().size()); |
639 | 0 | for (const auto& tablet : result.tablets()) { |
640 | 0 | cleanup_tablet_ids.push_back(tablet.tablet_id()); |
641 | 0 | } |
642 | 0 | } |
643 | | |
644 | 13.4k | RETURN_NOT_OK(CheckRunningUnlocked()); |
645 | 13.4k | if (child_) { |
646 | 0 | return STATUS(IllegalState, "Apply child result of child transaction"); |
647 | 0 | } |
648 | | |
649 | 32.6k | for (const auto& tablet : result.tablets())13.4k { |
650 | 32.6k | auto& tablet_state = tablets_[tablet.tablet_id()]; |
651 | 32.6k | tablet_state.num_batches += tablet.num_batches(); |
652 | 32.6k | tablet_state.has_metadata = |
653 | 32.6k | tablet_state.has_metadata || |
654 | 32.6k | tablet.metadata_state() == InvolvedTabletMetadataState::EXIST32.4k ; |
655 | 32.6k | } |
656 | 13.4k | read_point_.ApplyChildTransactionResult(result); |
657 | | |
658 | 13.4k | return Status::OK(); |
659 | 13.4k | } |
660 | | |
661 | 1.18k | const std::string& LogPrefix() { |
662 | 1.18k | return log_prefix_; |
663 | 1.18k | } |
664 | | |
665 | 0 | std::string ToString() EXCLUDES(mutex_) { |
666 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
667 | 0 | return Format("{ metadata: $0 state: $1 }", metadata_, state_.load(std::memory_order_acquire)); |
668 | 0 | } |
669 | | |
670 | 13 | const TransactionId& id() const { |
671 | 13 | return metadata_.transaction_id; |
672 | 13 | } |
673 | | |
674 | 1.88M | ConsistentReadPoint& read_point() { |
675 | 1.88M | return read_point_; |
676 | 1.88M | } |
677 | | |
678 | 0 | Result<TransactionMetadata> Release() { |
679 | 0 | UNIQUE_LOCK(lock, mutex_); |
680 | 0 | auto state = state_.load(std::memory_order_acquire); |
681 | 0 | if (state != TransactionState::kRunning) { |
682 | 0 | return STATUS_FORMAT(IllegalState, "Attempt to release transaction in the wrong state $0: $1", |
683 | 0 | metadata_.transaction_id, AsString(state)); |
684 | 0 | } |
685 | 0 | state_.store(TransactionState::kReleased, std::memory_order_release); |
686 | |
|
687 | 0 | if (!ready_) { |
688 | 0 | CountDownLatch latch(1); |
689 | 0 | Status pick_status; |
690 | 0 | auto transaction = transaction_->shared_from_this(); |
691 | 0 | waiters_.push_back([&latch, &pick_status](const Status& status) { |
692 | 0 | pick_status = status; |
693 | 0 | latch.CountDown(); |
694 | 0 | }); |
695 | 0 | lock.unlock(); |
696 | 0 | RequestStatusTablet(TransactionRpcDeadline()); |
697 | 0 | latch.Wait(); |
698 | 0 | RETURN_NOT_OK(pick_status); |
699 | 0 | lock.lock(); |
700 | 0 | } |
701 | 0 | return metadata_; |
702 | 0 | } |
703 | | |
704 | 0 | void StartHeartbeat() { |
705 | 0 | VLOG_WITH_PREFIX(2) << __PRETTY_FUNCTION__; |
706 | 0 | RequestStatusTablet(TransactionRpcDeadline()); |
707 | 0 | } |
708 | | |
709 | 61.7k | void SetActiveSubTransaction(SubTransactionId id) { |
710 | 61.7k | return subtransaction_.SetActiveSubTransaction(id); |
711 | 61.7k | } |
712 | | |
713 | 13.5k | CHECKED_STATUS RollbackSubTransaction(SubTransactionId id) { |
714 | 13.5k | SCHECK( |
715 | 13.5k | subtransaction_.active(), InternalError, |
716 | 13.5k | "Attempted to rollback to savepoint before creating any savepoints."); |
717 | 13.5k | return subtransaction_.RollbackSubTransaction(id); |
718 | 13.5k | } |
719 | | |
720 | 0 | bool HasSubTransactionState() { |
721 | 0 | return subtransaction_.active(); |
722 | 0 | } |
723 | | |
724 | | private: |
725 | 425k | void CompleteConstruction() { |
726 | 425k | log_prefix_ = Format("$0$1: ", metadata_.transaction_id, child_ ? " (CHILD)"13.8k : ""411k ); |
727 | 425k | heartbeat_handle_ = manager_->rpcs().InvalidHandle(); |
728 | 425k | commit_handle_ = manager_->rpcs().InvalidHandle(); |
729 | 425k | abort_handle_ = manager_->rpcs().InvalidHandle(); |
730 | 425k | } |
731 | | |
732 | 406k | void CompleteInit(IsolationLevel isolation) { |
733 | 406k | metadata_.isolation = isolation; |
734 | | // TODO(Piyush): read_point_ might not represent the correct start time for |
735 | | // a READ COMMITTED txn since it might have been updated several times |
736 | | // before a YBTransaction is created. Fix this. |
737 | 406k | if (read_point_.GetReadTime()) { |
738 | 13.4k | metadata_.start_time = read_point_.GetReadTime().read; |
739 | 392k | } else { |
740 | 392k | metadata_.start_time = read_point_.Now(); |
741 | 392k | } |
742 | 406k | } |
743 | | |
744 | 1.22M | void SetReadTimeIfNeeded(bool do_it) { |
745 | 1.22M | if (!read_point_.GetReadTime() && do_it423k && |
746 | 1.22M | (369k metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION369k || |
747 | 369k | metadata_.isolation == IsolationLevel::READ_COMMITTED204k )) { |
748 | 171k | read_point_.SetCurrentReadTime(); |
749 | 171k | } |
750 | 1.22M | } |
751 | | |
752 | 362k | CHECKED_STATUS CheckRunningUnlocked() REQUIRES(mutex_) { |
753 | 362k | if (state_.load(std::memory_order_acquire) != TransactionState::kRunning) { |
754 | 71 | auto status = status_; |
755 | 71 | if (status.ok()) { |
756 | 0 | status = STATUS(IllegalState, "Transaction already completed"); |
757 | 0 | } |
758 | 71 | return status; |
759 | 71 | } |
760 | 362k | return Status::OK(); |
761 | 362k | } |
762 | | |
763 | | void DoCommit( |
764 | | CoarseTimePoint deadline, SealOnly seal_only, const Status& status, |
765 | 268k | const YBTransactionPtr& transaction) EXCLUDES(mutex_) { |
766 | 268k | VLOG_WITH_PREFIX25 (1) |
767 | 25 | << Format("Commit, seal_only: $0, tablets: $1, status: $2", |
768 | 25 | seal_only, tablets_, status); |
769 | | |
770 | 268k | UNIQUE_LOCK(lock, mutex_); |
771 | 268k | if (!status.ok()) { |
772 | 0 | VLOG_WITH_PREFIX(4) << "Commit failed: " << status; |
773 | 0 | auto commit_callback = std::move(commit_callback_); |
774 | 0 | lock.unlock(); |
775 | 0 | commit_callback(status); |
776 | 0 | return; |
777 | 0 | } |
778 | | |
779 | 268k | tserver::UpdateTransactionRequestPB req; |
780 | 268k | req.set_tablet_id(status_tablet_->tablet_id()); |
781 | 268k | req.set_propagated_hybrid_time(manager_->Now().ToUint64()); |
782 | 268k | auto& state = *req.mutable_state(); |
783 | 268k | state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size()); |
784 | | // TODO(savepoints) -- Attach metadata about aborted subtransactions to commit message. |
785 | 268k | state.set_status(seal_only ? TransactionStatus::SEALED0 : TransactionStatus::COMMITTED); |
786 | 268k | state.mutable_tablets()->Reserve(narrow_cast<int>(tablets_.size())); |
787 | 519k | for (const auto& tablet : tablets_) { |
788 | | // If tablet does not have metadata it should not participate in commit. |
789 | 519k | if (!seal_only519k && !tablet.second.has_metadata) { |
790 | 45 | continue; |
791 | 45 | } |
792 | 519k | state.add_tablets(tablet.first); |
793 | 519k | if (seal_only) { |
794 | 0 | state.add_tablet_batches(tablet.second.num_batches); |
795 | 0 | } |
796 | 519k | } |
797 | | |
798 | | // If we don't have any tablets that have intents written to them, just abort it. |
799 | | // But notify caller that commit was successful, so it is transparent for him. |
800 | 268k | if (state.tablets().empty()) { |
801 | 5.87k | VLOG_WITH_PREFIX0 (4) << "Committed empty"0 ; |
802 | 5.87k | auto status_tablet = status_tablet_; |
803 | 5.87k | auto commit_callback = std::move(commit_callback_); |
804 | 5.87k | lock.unlock(); |
805 | 5.87k | DoAbort(deadline, transaction, status_tablet); |
806 | 5.87k | commit_callback(Status::OK()); |
807 | 5.87k | return; |
808 | 5.87k | } |
809 | | |
810 | 262k | if (subtransaction_.active()) { |
811 | 1.54k | subtransaction_.get().aborted.ToPB(state.mutable_aborted()->mutable_set()); |
812 | 1.54k | } |
813 | | |
814 | 262k | manager_->rpcs().RegisterAndStart( |
815 | 262k | UpdateTransaction( |
816 | 262k | deadline, |
817 | 262k | status_tablet_.get(), |
818 | 262k | manager_->client(), |
819 | 262k | &req, |
820 | 262k | [this, transaction](const auto& status, const auto& req, const auto& resp) { |
821 | 262k | this->CommitDone(status, resp, transaction); |
822 | 262k | }), |
823 | 262k | &commit_handle_); |
824 | 262k | } |
825 | | |
826 | 137k | void DoAbort(CoarseTimePoint deadline, const YBTransactionPtr& transaction) EXCLUDES(mutex_) { |
827 | 137k | decltype(status_tablet_) status_tablet; |
828 | 137k | { |
829 | 137k | std::lock_guard<std::mutex> lock(mutex_); |
830 | 137k | status_tablet = status_tablet_; |
831 | 137k | } |
832 | 137k | DoAbort(deadline, transaction, status_tablet); |
833 | 137k | } |
834 | | |
835 | | void DoAbort( |
836 | | CoarseTimePoint deadline, |
837 | | const YBTransactionPtr& transaction, |
838 | 143k | internal::RemoteTabletPtr status_tablet) EXCLUDES(mutex_) { |
839 | 143k | tserver::AbortTransactionRequestPB req; |
840 | 143k | req.set_tablet_id(status_tablet->tablet_id()); |
841 | 143k | req.set_propagated_hybrid_time(manager_->Now().ToUint64()); |
842 | 143k | req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size()); |
843 | | |
844 | 143k | manager_->rpcs().RegisterAndStart( |
845 | 143k | AbortTransaction( |
846 | 143k | deadline, |
847 | 143k | status_tablet.get(), |
848 | 143k | manager_->client(), |
849 | 143k | &req, |
850 | 143k | std::bind(&Impl::AbortDone, this, _1, _2, transaction)), |
851 | 143k | &abort_handle_); |
852 | | |
853 | 143k | DoAbortCleanup(transaction, CleanupType::kImmediate); |
854 | 143k | } |
855 | | |
856 | | void DoAbortCleanup(const YBTransactionPtr& transaction, CleanupType cleanup_type) |
857 | 208k | EXCLUDES(mutex_) { |
858 | 208k | if (FLAGS_TEST_disable_proactive_txn_cleanup_on_abort) { |
859 | 0 | VLOG_WITH_PREFIX(1) << "TEST: Disabled proactive transaction cleanup on abort"; |
860 | 0 | return; |
861 | 0 | } |
862 | | |
863 | 208k | std::vector<std::string> tablet_ids; |
864 | 208k | { |
865 | 208k | std::lock_guard<std::mutex> lock(mutex_); |
866 | 208k | tablet_ids.reserve(tablets_.size()); |
867 | 309k | for (const auto& tablet : tablets_) { |
868 | | // We don't check has_metadata here, because intents could be written even in case of |
869 | | // failure. For instance in case of conflict on unique index. |
870 | 309k | tablet_ids.push_back(tablet.first); |
871 | 309k | } |
872 | 208k | VLOG_WITH_PREFIX122 (1) << "Cleaning up intents from: " << AsString(tablet_ids)122 ; |
873 | 208k | } |
874 | | |
875 | 208k | CleanupTransaction( |
876 | 208k | manager_->client(), manager_->clock(), metadata_.transaction_id, Sealed::kFalse, |
877 | 208k | cleanup_type, tablet_ids); |
878 | 208k | } |
879 | | |
880 | | void CommitDone(const Status& status, |
881 | | const tserver::UpdateTransactionResponsePB& response, |
882 | 262k | const YBTransactionPtr& transaction) { |
883 | 262k | TRACE_TO(trace_, __func__); |
884 | 18.4E | VLOG_WITH_PREFIX(1) << "Committed: " << status; |
885 | | |
886 | 262k | UpdateClock(response, manager_); |
887 | 262k | manager_->rpcs().Unregister(&commit_handle_); |
888 | | |
889 | 262k | Status actual_status = status.IsAlreadyPresent() ? Status::OK()0 : status; |
890 | 262k | CommitCallback commit_callback; |
891 | 262k | if (state_.load(std::memory_order_acquire) != TransactionState::kCommitted && |
892 | 262k | actual_status.ok()3 ) { |
893 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
894 | 0 | commit_replicated_ = true; |
895 | 0 | if (running_requests_ != 0) { |
896 | 0 | return; |
897 | 0 | } |
898 | 0 | commit_callback = std::move(commit_callback_); |
899 | 262k | } else { |
900 | 262k | std::lock_guard<std::mutex> lock(mutex_); |
901 | 262k | commit_callback = std::move(commit_callback_); |
902 | 262k | } |
903 | 18.4E | VLOG_WITH_PREFIX(4) << "Commit done: " << actual_status; |
904 | 262k | commit_callback(actual_status); |
905 | | |
906 | 262k | if (actual_status.IsExpired()) { |
907 | | // We can't perform immediate cleanup here because the transaction could be committed, |
908 | | // its APPLY records replicated in all participant tablets, and its status record removed |
909 | | // from the status tablet. |
910 | 64.6k | DoAbortCleanup(transaction, CleanupType::kGraceful); |
911 | 64.6k | } |
912 | 262k | } |
913 | | |
914 | | void AbortDone(const Status& status, |
915 | | const tserver::AbortTransactionResponsePB& response, |
916 | 141k | const YBTransactionPtr& transaction) { |
917 | 141k | TRACE_TO(trace_, __func__); |
918 | 141k | VLOG_WITH_PREFIX22 (1) << "Aborted: " << status22 ; |
919 | | |
920 | 141k | if (response.has_propagated_hybrid_time()) { |
921 | 141k | manager_->UpdateClock(HybridTime(response.propagated_hybrid_time())); |
922 | 141k | } |
923 | 141k | manager_->rpcs().Unregister(&abort_handle_); |
924 | 141k | } |
925 | | |
926 | 411k | void RequestStatusTablet(const CoarseTimePoint& deadline) EXCLUDES(mutex_) { |
927 | 411k | TRACE_TO(trace_, __func__); |
928 | 411k | bool expected = false; |
929 | 411k | if (!requested_status_tablet_.compare_exchange_strong( |
930 | 411k | expected, true, std::memory_order_acq_rel)) { |
931 | 0 | return; |
932 | 0 | } |
933 | 18.4E | VLOG_WITH_PREFIX(2) << "RequestStatusTablet()"; |
934 | 411k | auto transaction = transaction_->shared_from_this(); |
935 | 411k | if (metadata_.status_tablet.empty()411k ) { |
936 | 411k | manager_->PickStatusTablet( |
937 | 411k | std::bind(&Impl::StatusTabletPicked, this, _1, deadline, transaction), |
938 | 411k | metadata_.locality); |
939 | 18.4E | } else { |
940 | 18.4E | LookupStatusTablet(metadata_.status_tablet, deadline, transaction); |
941 | 18.4E | } |
942 | 411k | } |
943 | | |
944 | | void StatusTabletPicked(const Result<std::string>& tablet, |
945 | | const CoarseTimePoint& deadline, |
946 | 410k | const YBTransactionPtr& transaction) { |
947 | 410k | TRACE_TO(trace_, __func__); |
948 | 18.4E | VLOG_WITH_PREFIX(2) << "Picked status tablet: " << tablet; |
949 | | |
950 | 410k | if (!tablet.ok()) { |
951 | 0 | NotifyWaiters(tablet.status(), "Pick status tablet"); |
952 | 0 | return; |
953 | 0 | } |
954 | | |
955 | 410k | LookupStatusTablet(*tablet, deadline, transaction); |
956 | 410k | } |
957 | | |
958 | | void LookupStatusTablet(const std::string& tablet_id, |
959 | | const CoarseTimePoint& deadline, |
960 | 410k | const YBTransactionPtr& transaction) { |
961 | 410k | TRACE_TO(trace_, __func__); |
962 | 410k | manager_->client()->LookupTabletById( |
963 | 410k | tablet_id, |
964 | 410k | /* table =*/ nullptr, |
965 | 410k | master::IncludeInactive::kFalse, |
966 | 410k | deadline, |
967 | 410k | std::bind(&Impl::LookupTabletDone, this, _1, transaction), |
968 | 410k | client::UseCache::kTrue); |
969 | 410k | } |
970 | | |
971 | | void LookupTabletDone(const Result<client::internal::RemoteTabletPtr>& result, |
972 | 410k | const YBTransactionPtr& transaction) { |
973 | 410k | TRACE_TO(trace_, __func__); |
974 | 18.4E | VLOG_WITH_PREFIX(1) << "Lookup tablet done: " << yb::ToString(result); |
975 | | |
976 | 410k | if (!result.ok()) { |
977 | 3 | NotifyWaiters(result.status(), "Lookup tablet"); |
978 | 3 | return; |
979 | 3 | } |
980 | | |
981 | 410k | bool precreated; |
982 | 410k | std::vector<Waiter> waiters; |
983 | 410k | { |
984 | 410k | std::lock_guard<std::mutex> lock(mutex_); |
985 | 410k | status_tablet_ = std::move(*result); |
986 | 410k | if (metadata_.status_tablet.empty()410k ) { |
987 | 410k | metadata_.status_tablet = status_tablet_->tablet_id(); |
988 | 410k | precreated = false; |
989 | 18.4E | } else { |
990 | 18.4E | precreated = true; |
991 | 18.4E | ready_ = true; |
992 | 18.4E | waiters_.swap(waiters); |
993 | 18.4E | } |
994 | 410k | } |
995 | 410k | if (precreated) { |
996 | 0 | for (const auto& waiter : waiters) { |
997 | 0 | waiter(Status::OK()); |
998 | 0 | } |
999 | 0 | } |
1000 | 410k | SendHeartbeat(precreated ? TransactionStatus::PENDING0 : TransactionStatus::CREATED, |
1001 | 410k | metadata_.transaction_id, transaction_->shared_from_this()); |
1002 | 410k | } |
1003 | | |
1004 | 409k | void NotifyWaiters(const Status& status, const char* operation) { |
1005 | 409k | std::vector<Waiter> waiters; |
1006 | 409k | { |
1007 | 409k | std::lock_guard<std::mutex> lock(mutex_); |
1008 | 409k | if (status.ok()) { |
1009 | 409k | DCHECK(!ready_); |
1010 | 409k | ready_ = true; |
1011 | 409k | } else { |
1012 | 19 | SetErrorUnlocked(status, operation); |
1013 | 19 | } |
1014 | 409k | waiters_.swap(waiters); |
1015 | 409k | } |
1016 | 409k | for (const auto& waiter : waiters) { |
1017 | 409k | waiter(status); |
1018 | 409k | } |
1019 | 409k | } |
1020 | | |
1021 | | void SendHeartbeat(TransactionStatus status, |
1022 | | const TransactionId& id, |
1023 | 867k | const std::weak_ptr<YBTransaction>& weak_transaction) { |
1024 | 867k | auto transaction = weak_transaction.lock(); |
1025 | 867k | if (!transaction) { |
1026 | | // Cannot use LOG_WITH_PREFIX here, since this was actually destroyed. |
1027 | 387k | VLOG(1) << id << ": Transaction destroyed"179 ; |
1028 | 387k | return; |
1029 | 387k | } |
1030 | | |
1031 | 479k | auto current_state = state_.load(std::memory_order_acquire); |
1032 | | |
1033 | 479k | if (!AllowHeartbeat(current_state, status)) { |
1034 | 15.3k | VLOG_WITH_PREFIX4 (1) << " Send heartbeat cancelled: " << yb::ToString(transaction)4 ; |
1035 | 15.3k | return; |
1036 | 15.3k | } |
1037 | | |
1038 | 18.4E | VLOG_WITH_PREFIX(4) << __func__ << "(" << TransactionStatus_Name(status) << ")"; |
1039 | | |
1040 | 464k | MonoDelta timeout; |
1041 | 464k | if (status != TransactionStatus::CREATED) { |
1042 | 54.3k | if (GetAtomicFlag(&FLAGS_transaction_disable_heartbeat_in_tests)) { |
1043 | 0 | HeartbeatDone(Status::OK(), /* request= */ {}, /* response= */ {}, status, transaction); |
1044 | 0 | return; |
1045 | 0 | } |
1046 | 54.3k | timeout = std::chrono::microseconds(FLAGS_transaction_heartbeat_usec); |
1047 | 410k | } else { |
1048 | 410k | timeout = TransactionRpcTimeout(); |
1049 | 410k | } |
1050 | | |
1051 | 464k | tserver::UpdateTransactionRequestPB req; |
1052 | | |
1053 | 464k | internal::RemoteTabletPtr status_tablet; |
1054 | 464k | { |
1055 | 464k | std::lock_guard<std::mutex> lock(mutex_); |
1056 | 464k | status_tablet = status_tablet_; |
1057 | 464k | } |
1058 | | |
1059 | 464k | req.set_tablet_id(status_tablet->tablet_id()); |
1060 | 464k | req.set_propagated_hybrid_time(manager_->Now().ToUint64()); |
1061 | 464k | auto& state = *req.mutable_state(); |
1062 | | // TODO(savepoints) -- Attach metadata about aborted subtransactions in heartbeat. |
1063 | 464k | state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size()); |
1064 | 464k | state.set_status(status); |
1065 | 464k | manager_->rpcs().RegisterAndStart( |
1066 | 464k | UpdateTransaction( |
1067 | 464k | CoarseMonoClock::now() + timeout, |
1068 | 464k | status_tablet.get(), |
1069 | 464k | manager_->client(), |
1070 | 464k | &req, |
1071 | 464k | std::bind(&Impl::HeartbeatDone, this, _1, _2, _3, status, transaction)), |
1072 | 464k | &heartbeat_handle_); |
1073 | 464k | } |
1074 | | |
1075 | 480k | static bool AllowHeartbeat(TransactionState current_state, TransactionStatus status) { |
1076 | 480k | switch (current_state) { |
1077 | 464k | case TransactionState::kRunning: |
1078 | 464k | return true; |
1079 | 0 | case TransactionState::kReleased: FALLTHROUGH_INTENDED; |
1080 | 0 | case TransactionState::kSealed: |
1081 | 0 | return status == TransactionStatus::CREATED; |
1082 | 15.1k | case TransactionState::kAborted: FALLTHROUGH_INTENDED; |
1083 | 15.3k | case TransactionState::kCommitted: |
1084 | 15.3k | return false; |
1085 | 480k | } |
1086 | 0 | FATAL_INVALID_ENUM_VALUE(TransactionState, current_state); |
1087 | 0 | } |
1088 | | |
1089 | | void HeartbeatDone(Status status, |
1090 | | const tserver::UpdateTransactionRequestPB& request, |
1091 | | const tserver::UpdateTransactionResponsePB& response, |
1092 | | TransactionStatus transaction_status, |
1093 | 463k | const YBTransactionPtr& transaction) { |
1094 | 463k | UpdateClock(response, manager_); |
1095 | 463k | manager_->rpcs().Unregister(&heartbeat_handle_); |
1096 | | |
1097 | 463k | if (status.ok() && transaction_status == TransactionStatus::CREATED462k ) { |
1098 | 409k | auto decode_result = FullyDecodeTransactionId(request.state().transaction_id()); |
1099 | 409k | if (decode_result.ok()) { |
1100 | 409k | metadata_.transaction_id = *decode_result; |
1101 | 409k | auto id_str = AsString(metadata_.transaction_id); |
1102 | | // It is not fully thread safe, since we don't use mutex to access log_prefix_. |
1103 | | // But here we just replace characters inplace. |
1104 | | // It would not crash anyway, and could produce wrong id in the logs. |
1105 | | // It is ok, since one moment before we would output nil id. |
1106 | 409k | log_prefix_.replace(0, id_str.length(), id_str); |
1107 | 409k | } else { |
1108 | 3 | status = decode_result.status(); |
1109 | 3 | } |
1110 | 409k | } |
1111 | | |
1112 | 18.4E | VLOG_WITH_PREFIX(4) << __func__ << "(" << status << ", " |
1113 | 18.4E | << TransactionStatus_Name(transaction_status) << ")"; |
1114 | | |
1115 | 463k | if (status.ok()) { |
1116 | 462k | if (transaction_status == TransactionStatus::CREATED) { |
1117 | 409k | NotifyWaiters(Status::OK(), "Heartbeat"); |
1118 | 409k | } |
1119 | 462k | std::weak_ptr<YBTransaction> weak_transaction(transaction); |
1120 | 462k | manager_->client()->messenger()->scheduler().Schedule( |
1121 | 462k | [this, weak_transaction, id = metadata_.transaction_id](const Status&) { |
1122 | 456k | SendHeartbeat(TransactionStatus::PENDING, id, weak_transaction); |
1123 | 456k | }, |
1124 | 462k | std::chrono::microseconds(FLAGS_transaction_heartbeat_usec)); |
1125 | 462k | } else { |
1126 | 886 | auto state = state_.load(std::memory_order_acquire); |
1127 | 886 | LOG_WITH_PREFIX(WARNING) << "Send heartbeat failed: " << status << ", state: " << state; |
1128 | 1.18k | if (status.IsAborted()886 || status.IsExpired()) { |
1129 | | // IsAborted - Service is shutting down, no reason to retry. |
1130 | | // IsExpired - Transaction expired. |
1131 | 78 | if (transaction_status == TransactionStatus::CREATED) { |
1132 | 0 | NotifyWaiters(status, "Heartbeat"); |
1133 | 78 | } else { |
1134 | 78 | SetError(status, "Heartbeat"); |
1135 | 78 | } |
1136 | | // If state is committed, then we should not cleanup. |
1137 | 78 | if (status.IsExpired() && state == TransactionState::kRunning) { |
1138 | 52 | DoAbortCleanup(transaction, CleanupType::kImmediate); |
1139 | 52 | } |
1140 | 78 | return; |
1141 | 78 | } |
1142 | | // Other errors could have different causes, but we should just retry sending heartbeat |
1143 | | // in this case. |
1144 | 808 | SendHeartbeat(transaction_status, metadata_.transaction_id, transaction); |
1145 | 808 | } |
1146 | 463k | } |
1147 | | |
1148 | 78 | void SetError(const Status& status, const char* operation) EXCLUDES(mutex_) { |
1149 | 78 | std::lock_guard<std::mutex> lock(mutex_); |
1150 | 78 | SetErrorUnlocked(status, operation); |
1151 | 78 | } |
1152 | | |
1153 | 170k | void SetErrorUnlocked(const Status& status, const char* operation) REQUIRES(mutex_) { |
1154 | 18.4E | VLOG_WITH_PREFIX(1) << operation << " failed: " << status; |
1155 | 170k | if (status_.ok()) { |
1156 | 122k | status_ = status.CloneAndPrepend(operation); |
1157 | 122k | state_.store(TransactionState::kAborted, std::memory_order_release); |
1158 | 122k | } |
1159 | 170k | } |
1160 | | |
1161 | | ChildTransactionDataPB PrepareChildTransactionDataUnlocked( |
1162 | 66.7k | const YBTransactionPtr& transaction) REQUIRES(mutex_) { |
1163 | 66.7k | ChildTransactionDataPB data; |
1164 | 66.7k | metadata_.ToPB(data.mutable_metadata()); |
1165 | 66.7k | read_point_.PrepareChildTransactionData(&data); |
1166 | 66.7k | return data; |
1167 | 66.7k | } |
1168 | | |
1169 | | void DoPrepareChild(const Status& status, |
1170 | | const YBTransactionPtr& transaction, |
1171 | 3.16k | PrepareChildCallback callback) EXCLUDES(mutex_) { |
1172 | 3.16k | TRACE_TO(trace_, __func__); |
1173 | 3.16k | if (!status.ok()) { |
1174 | 54 | callback(status); |
1175 | 54 | return; |
1176 | 54 | } |
1177 | | |
1178 | 3.11k | ChildTransactionDataPB child_txn_data_pb; |
1179 | 3.11k | { |
1180 | 3.11k | std::lock_guard<std::mutex> lock(mutex_); |
1181 | 3.11k | child_txn_data_pb = PrepareChildTransactionDataUnlocked(transaction); |
1182 | 3.11k | } |
1183 | | |
1184 | 3.11k | callback(child_txn_data_pb); |
1185 | 3.11k | } |
1186 | | |
1187 | 269k | CHECKED_STATUS CheckCouldCommitUnlocked(SealOnly seal_only) REQUIRES(mutex_) { |
1188 | 269k | RETURN_NOT_OK(CheckRunningUnlocked()); |
1189 | 268k | if (child_) { |
1190 | 0 | return STATUS(IllegalState, "Commit of child transaction is not allowed"); |
1191 | 0 | } |
1192 | 268k | if (IsRestartRequired()) { |
1193 | 71 | return STATUS( |
1194 | 71 | IllegalState, "Commit of transaction that requires restart is not allowed"); |
1195 | 71 | } |
1196 | 268k | if (268k !seal_only268k && running_requests_ > 0) { |
1197 | 0 | return STATUS(IllegalState, "Commit of transaction with running requests"); |
1198 | 0 | } |
1199 | | |
1200 | 268k | return Status::OK(); |
1201 | 268k | } |
1202 | | |
1203 | | // The trace buffer. |
1204 | | scoped_refptr<Trace> trace_; |
1205 | | |
1206 | | const CoarseTimePoint start_; |
1207 | | |
1208 | | // Manager is created once per service. |
1209 | | TransactionManager* const manager_; |
1210 | | |
1211 | | // Transaction related to this impl. |
1212 | | YBTransaction* const transaction_; |
1213 | | |
1214 | | TransactionMetadata metadata_; |
1215 | | ConsistentReadPoint read_point_; |
1216 | | |
1217 | | // Metadata tracking savepoint-related state for the scope of this transaction. |
1218 | | YBSubTransaction subtransaction_; |
1219 | | |
1220 | | std::atomic<bool> requested_status_tablet_{false}; |
1221 | | internal::RemoteTabletPtr status_tablet_ GUARDED_BY(mutex_); |
1222 | | std::atomic<TransactionState> state_{TransactionState::kRunning}; |
1223 | | |
1224 | | // Transaction is successfully initialized and ready to process intents. |
1225 | | const bool child_; |
1226 | | const bool child_had_read_time_ = false; |
1227 | | bool ready_ GUARDED_BY(mutex_) = false; |
1228 | | CommitCallback commit_callback_ GUARDED_BY(mutex_); |
1229 | | Status status_ GUARDED_BY(mutex_); |
1230 | | |
1231 | | // The following fields are initialized in CompleteConstruction() and can be used with no locking. |
1232 | | std::string log_prefix_; |
1233 | | rpc::Rpcs::Handle heartbeat_handle_; |
1234 | | rpc::Rpcs::Handle commit_handle_; |
1235 | | rpc::Rpcs::Handle abort_handle_; |
1236 | | |
1237 | | struct TabletState { |
1238 | | size_t num_batches = 0; |
1239 | | bool has_metadata = false; |
1240 | | |
1241 | 0 | std::string ToString() const { |
1242 | 0 | return Format("{ num_batches: $0 has_metadata: $1 }", num_batches, has_metadata); |
1243 | 0 | } |
1244 | | }; |
1245 | | |
1246 | | typedef std::unordered_map<TabletId, TabletState> TabletStates; |
1247 | | |
1248 | | std::mutex mutex_; |
1249 | | TabletStates tablets_ GUARDED_BY(mutex_); |
1250 | | std::vector<Waiter> waiters_; |
1251 | | std::promise<Result<TransactionMetadata>> metadata_promise_; |
1252 | | std::shared_future<Result<TransactionMetadata>> metadata_future_ GUARDED_BY(mutex_); |
1253 | | // As of 2021-04-05 running_requests_ reflects number of ops in progress within this transaction |
1254 | | // only if no in-transaction operations have failed. |
1255 | | // If in-transaction operation has failed during tablet lookup or it has failed and will be |
1256 | | // retried by YBSession inside the same transaction - Transaction::Flushed is not getting called |
1257 | | // and running_requests_ is not updated. |
1258 | | // For YBSession-level retries Transaction::Flushed will be called when operation is finally |
1259 | | // successfully flushed, if operation fails after retry - Transaction::Flushed is not getting |
1260 | | // called. |
1261 | | // We might need to fix this before turning on transactions sealing. |
1262 | | // https://github.com/yugabyte/yugabyte-db/issues/7984. |
1263 | | size_t running_requests_ GUARDED_BY(mutex_) = 0; |
1264 | | // Set to true after commit record is replicated. Used only during transaction sealing. |
1265 | | bool commit_replicated_ = false; |
1266 | | }; |
1267 | | |
1268 | 473k | CoarseTimePoint AdjustDeadline(CoarseTimePoint deadline) { |
1269 | 473k | if (deadline == CoarseTimePoint()) { |
1270 | 329k | return TransactionRpcDeadline(); |
1271 | 329k | } |
1272 | 143k | return deadline; |
1273 | 473k | } |
1274 | | |
1275 | | YBTransaction::YBTransaction(TransactionManager* manager, TransactionLocality locality) |
1276 | 411k | : impl_(new Impl(manager, this, locality)) { |
1277 | 411k | } |
1278 | | |
1279 | | YBTransaction::YBTransaction( |
1280 | | TransactionManager* manager, const TransactionMetadata& metadata, PrivateOnlyTag) |
1281 | 0 | : impl_(new Impl(manager, this, metadata)) { |
1282 | 0 | } |
1283 | | |
1284 | | YBTransaction::YBTransaction(TransactionManager* manager, ChildTransactionData data) |
1285 | 13.8k | : impl_(new Impl(manager, this, std::move(data))) { |
1286 | 13.8k | } |
1287 | | |
1288 | 418k | YBTransaction::~YBTransaction() { |
1289 | 418k | } |
1290 | | |
1291 | 267k | void YBTransaction::SetPriority(uint64_t priority) { |
1292 | 267k | impl_->SetPriority(priority); |
1293 | 267k | } |
1294 | | |
1295 | 69.0k | uint64_t YBTransaction::GetPriority() const { |
1296 | 69.0k | return impl_->GetPriority(); |
1297 | 69.0k | } |
1298 | | |
1299 | 392k | Status YBTransaction::Init(IsolationLevel isolation, const ReadHybridTime& read_time) { |
1300 | 392k | return impl_->Init(isolation, read_time); |
1301 | 392k | } |
1302 | | |
1303 | | void YBTransaction::InitWithReadPoint( |
1304 | | IsolationLevel isolation, |
1305 | 13.4k | ConsistentReadPoint&& read_point) { |
1306 | 13.4k | return impl_->InitWithReadPoint(isolation, std::move(read_point)); |
1307 | 13.4k | } |
1308 | | |
1309 | 4.32M | internal::TxnBatcherIf& YBTransaction::batcher_if() { |
1310 | 4.32M | return *impl_; |
1311 | 4.32M | } |
1312 | | |
1313 | | void YBTransaction::Commit( |
1314 | 77.2k | CoarseTimePoint deadline, SealOnly seal_only, CommitCallback callback) { |
1315 | 77.2k | impl_->Commit(AdjustDeadline(deadline), seal_only, std::move(callback)); |
1316 | 77.2k | } |
1317 | | |
1318 | 77.2k | void YBTransaction::Commit(CoarseTimePoint deadline, CommitCallback callback) { |
1319 | 77.2k | Commit(deadline, SealOnly::kFalse, callback); |
1320 | 77.2k | } |
1321 | | |
1322 | 0 | void YBTransaction::Commit(CommitCallback callback) { |
1323 | 0 | Commit(CoarseTimePoint(), SealOnly::kFalse, std::move(callback)); |
1324 | 0 | } |
1325 | | |
1326 | 13 | const TransactionId& YBTransaction::id() const { |
1327 | 13 | return impl_->id(); |
1328 | 13 | } |
1329 | | |
1330 | 191k | IsolationLevel YBTransaction::isolation() const { |
1331 | 191k | return impl_->isolation(); |
1332 | 191k | } |
1333 | | |
1334 | 0 | const ConsistentReadPoint& YBTransaction::read_point() const { |
1335 | 0 | return impl_->read_point(); |
1336 | 0 | } |
1337 | | |
1338 | 1.88M | ConsistentReadPoint& YBTransaction::read_point() { |
1339 | 1.88M | return impl_->read_point(); |
1340 | 1.88M | } |
1341 | | |
1342 | | std::future<Status> YBTransaction::CommitFuture( |
1343 | 191k | CoarseTimePoint deadline, SealOnly seal_only) { |
1344 | 191k | return MakeFuture<Status>([this, deadline, seal_only](auto callback) { |
1345 | 191k | impl_->Commit(AdjustDeadline(deadline), seal_only, std::move(callback)); |
1346 | 191k | }); |
1347 | 191k | } |
1348 | | |
1349 | 138k | void YBTransaction::Abort(CoarseTimePoint deadline) { |
1350 | 138k | impl_->Abort(AdjustDeadline(deadline)); |
1351 | 138k | } |
1352 | | |
1353 | 44.1k | bool YBTransaction::IsRestartRequired() const { |
1354 | 44.1k | return impl_->IsRestartRequired(); |
1355 | 44.1k | } |
1356 | | |
1357 | 2 | Result<YBTransactionPtr> YBTransaction::CreateRestartedTransaction() { |
1358 | 2 | auto result = impl_->CreateSimilarTransaction(); |
1359 | 2 | RETURN_NOT_OK(impl_->FillRestartedTransaction(result->impl_.get())); |
1360 | 2 | return result; |
1361 | 2 | } |
1362 | | |
1363 | 0 | Status YBTransaction::FillRestartedTransaction(const YBTransactionPtr& dest) { |
1364 | 0 | return impl_->FillRestartedTransaction(dest->impl_.get()); |
1365 | 0 | } |
1366 | | |
1367 | | void YBTransaction::PrepareChild( |
1368 | | ForceConsistentRead force_consistent_read, CoarseTimePoint deadline, |
1369 | 0 | PrepareChildCallback callback) { |
1370 | 0 | return impl_->PrepareChild(force_consistent_read, deadline, std::move(callback)); |
1371 | 0 | } |
1372 | | |
1373 | | std::future<Result<ChildTransactionDataPB>> YBTransaction::PrepareChildFuture( |
1374 | 66.5k | ForceConsistentRead force_consistent_read, CoarseTimePoint deadline) { |
1375 | 66.5k | return MakeFuture<Result<ChildTransactionDataPB>>( |
1376 | 66.5k | [this, deadline, force_consistent_read](auto callback) { |
1377 | 66.0k | impl_->PrepareChild(force_consistent_read, AdjustDeadline(deadline), std::move(callback)); |
1378 | 66.0k | }); |
1379 | 66.5k | } |
1380 | | |
1381 | 13.7k | Result<ChildTransactionResultPB> YBTransaction::FinishChild() { |
1382 | 13.7k | return impl_->FinishChild(); |
1383 | 13.7k | } |
1384 | | |
1385 | | std::shared_future<Result<TransactionMetadata>> YBTransaction::GetMetadata( |
1386 | 18.8k | CoarseTimePoint deadline) const { |
1387 | 18.8k | return impl_->GetMetadata(deadline); |
1388 | 18.8k | } |
1389 | | |
1390 | 13.4k | Status YBTransaction::ApplyChildResult(const ChildTransactionResultPB& result) { |
1391 | 13.4k | return impl_->ApplyChildResult(result); |
1392 | 13.4k | } |
1393 | | |
1394 | 0 | std::string YBTransaction::ToString() const { |
1395 | 0 | return impl_->ToString(); |
1396 | 0 | } |
1397 | | |
1398 | 0 | Result<TransactionMetadata> YBTransaction::Release() { |
1399 | 0 | return impl_->Release(); |
1400 | 0 | } |
1401 | | |
1402 | 1.59M | Trace* YBTransaction::trace() { |
1403 | 1.59M | return impl_->trace(); |
1404 | 1.59M | } |
1405 | | |
1406 | | YBTransactionPtr YBTransaction::Take( |
1407 | 0 | TransactionManager* manager, const TransactionMetadata& metadata) { |
1408 | 0 | auto result = std::make_shared<YBTransaction>(manager, metadata, PrivateOnlyTag()); |
1409 | 0 | result->impl_->StartHeartbeat(); |
1410 | 0 | return result; |
1411 | 0 | } |
1412 | | |
1413 | 61.7k | void YBTransaction::SetActiveSubTransaction(SubTransactionId id) { |
1414 | 61.7k | return impl_->SetActiveSubTransaction(id); |
1415 | 61.7k | } |
1416 | | |
1417 | 13.5k | Status YBTransaction::RollbackSubTransaction(SubTransactionId id) { |
1418 | 13.5k | return impl_->RollbackSubTransaction(id); |
1419 | 13.5k | } |
1420 | | |
1421 | 0 | bool YBTransaction::HasSubTransactionState() { |
1422 | 0 | return impl_->HasSubTransactionState(); |
1423 | 0 | } |
1424 | | |
1425 | | } // namespace client |
1426 | | } // namespace yb |