/Users/deen/code/yugabyte-db/src/yb/master/ysql_transaction_ddl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/ysql_transaction_ddl.h" |
15 | | |
16 | | #include "yb/client/transaction_rpc.h" |
17 | | |
18 | | #include "yb/common/ql_expr.h" |
19 | | #include "yb/common/wire_protocol.h" |
20 | | |
21 | | #include "yb/docdb/doc_rowwise_iterator.h" |
22 | | |
23 | | #include "yb/gutil/casts.h" |
24 | | |
25 | | #include "yb/master/sys_catalog.h" |
26 | | |
27 | | #include "yb/tablet/tablet.h" |
28 | | #include "yb/tablet/tablet_metadata.h" |
29 | | #include "yb/tablet/tablet_peer.h" |
30 | | |
31 | | #include "yb/tserver/tserver_service.pb.h" |
32 | | |
33 | | #include "yb/util/logging.h" |
34 | | #include "yb/util/monotime.h" |
35 | | #include "yb/util/net/net_fwd.h" |
36 | | #include "yb/util/status_log.h" |
37 | | |
38 | | DEFINE_int32(ysql_transaction_bg_task_wait_ms, 200, |
39 | | "Amount of time the catalog manager background task thread waits " |
40 | | "between runs"); |
41 | | |
42 | | namespace yb { |
43 | | namespace master { |
44 | | |
45 | 85 | YsqlTransactionDdl::~YsqlTransactionDdl() { |
46 | | // Shutdown any outstanding RPCs. |
47 | 85 | rpcs_.Shutdown(); |
48 | 85 | } |
49 | | |
50 | | void YsqlTransactionDdl::VerifyTransaction( |
51 | | const TransactionMetadata& transaction_metadata, |
52 | 5.64k | std::function<Status(bool)> complete_callback) { |
53 | 5.64k | SleepFor(MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_bg_task_wait_ms)); |
54 | | |
55 | 5.64k | YB_LOG_EVERY_N_SECS(INFO, 1) << "Verifying Transaction " << transaction_metadata; |
56 | | |
57 | 5.64k | tserver::GetTransactionStatusRequestPB req; |
58 | 5.64k | req.set_tablet_id(transaction_metadata.status_tablet); |
59 | 5.64k | req.add_transaction_id()->assign( |
60 | 5.64k | pointer_cast<const char*>(transaction_metadata.transaction_id.data()), |
61 | 5.64k | transaction_metadata.transaction_id.size()); |
62 | | |
63 | 5.64k | auto rpc_handle = rpcs_.Prepare(); |
64 | 5.64k | if (rpc_handle == rpcs_.InvalidHandle()) { |
65 | 0 | LOG(WARNING) << "Shutting down. Cannot send GetTransactionStatus: " << transaction_metadata; |
66 | 0 | return; |
67 | 0 | } |
68 | 5.64k | auto client = client_future_.get(); |
69 | 5.64k | if (!client) { |
70 | 0 | LOG(WARNING) << "Shutting down. Cannot get GetTransactionStatus: " << transaction_metadata; |
71 | 0 | return; |
72 | 0 | } |
73 | | // We need to query the TransactionCoordinator here. Can't use TransactionStatusResolver in |
74 | | // TransactionParticipant since this TransactionMetadata may not have any actual data flushed yet. |
75 | 5.64k | *rpc_handle = client::GetTransactionStatus( |
76 | 5.64k | TransactionRpcDeadline(), |
77 | 5.64k | nullptr /* tablet */, |
78 | 5.64k | client, |
79 | 5.64k | &req, |
80 | 5.64k | [this, rpc_handle, transaction_metadata, complete_callback] |
81 | 5.63k | (Status status, const tserver::GetTransactionStatusResponsePB& resp) { |
82 | 5.63k | auto retained = rpcs_.Unregister(rpc_handle); |
83 | 5.63k | TransactionReceived(transaction_metadata, complete_callback, std::move(status), resp); |
84 | 5.63k | }); |
85 | 5.64k | (**rpc_handle).SendRpc(); |
86 | 5.64k | } |
87 | | |
88 | | void YsqlTransactionDdl::TransactionReceived( |
89 | | const TransactionMetadata& transaction, |
90 | | std::function<Status(bool)> complete_callback, |
91 | 5.64k | Status txn_status, const tserver::GetTransactionStatusResponsePB& resp) { |
92 | 5.64k | if (!txn_status.ok()) { |
93 | 0 | LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString() |
94 | 0 | << ") failed with status " << txn_status; |
95 | 0 | WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { |
96 | 0 | WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure"); |
97 | 0 | }), "Failed to enqueue callback"); |
98 | | // #5981: Improve failure handling to retry transient errors or recognize transaction complete. |
99 | 5.64k | } else if (resp.has_error()) { |
100 | 0 | const Status s = StatusFromPB(resp.error().status()); |
101 | 0 | const tserver::TabletServerErrorPB::Code code = resp.error().code(); |
102 | 0 | LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString() |
103 | 0 | << ") failed with error code " << tserver::TabletServerErrorPB::Code_Name(code) |
104 | 0 | << ": " << s; |
105 | 0 | WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { |
106 | 0 | WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure"); |
107 | 0 | }), "Failed to enqueue callback"); |
108 | | // #5981: Maybe have the same heuristic as above? |
109 | 5.64k | } else { |
110 | 5.64k | YB_LOG_EVERY_N_SECS(INFO, 1) << "Got Response for " << transaction.ToString() |
111 | 1.49k | << ", resp: " << resp.ShortDebugString(); |
112 | 5.64k | bool is_pending = (resp.status_size() == 0); |
113 | 11.2k | for (int i = 0; i < resp.status_size() && !is_pending; ++i) { |
114 | | // NOTE: COMMITTED state is also "pending" because we need APPLIED. |
115 | 5.64k | is_pending = resp.status(i) == TransactionStatus::PENDING || |
116 | 1.49k | resp.status(i) == TransactionStatus::COMMITTED; |
117 | 5.64k | } |
118 | 5.64k | if (is_pending) { |
119 | | // Re-enqueue if transaction is still pending. |
120 | 4.20k | WARN_NOT_OK(thread_pool_->SubmitFunc( |
121 | 4.20k | std::bind(&YsqlTransactionDdl::VerifyTransaction, this, transaction, complete_callback)), |
122 | 4.20k | "Could not submit VerifyTransaction to thread pool"); |
123 | 1.43k | } else { |
124 | | // If this transaction isn't pending, then the transaction is in a terminal state. |
125 | | // Note: We ignore the resp.status() now, because it could be ABORT'd but actually a SUCCESS. |
126 | 1.43k | WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () { |
127 | 1.43k | WARN_NOT_OK(complete_callback(true /* txn_rpc_success */), "Callback failure"); |
128 | 1.43k | }), "Failed to enqueue callback"); |
129 | 1.43k | } |
130 | 5.64k | } |
131 | 5.64k | } |
132 | | |
133 | 1.43k | Result<bool> YsqlTransactionDdl::PgEntryExists(TableId pg_table_id, Result<uint32_t> entry_oid) { |
134 | 1.43k | auto tablet_peer = sys_catalog_->tablet_peer(); |
135 | 1.43k | if (!tablet_peer || !tablet_peer->tablet()) { |
136 | 0 | return STATUS(ServiceUnavailable, "SysCatalog unavailable"); |
137 | 0 | } |
138 | 1.43k | const tablet::Tablet* catalog_tablet = tablet_peer->tablet(); |
139 | 1.43k | const Schema& pg_database_schema = |
140 | 1.43k | *VERIFY_RESULT(catalog_tablet->metadata()->GetTableInfo(pg_table_id))->schema; |
141 | | |
142 | | // Use Scan to query the 'pg_database' table, filtering by our 'oid'. |
143 | 1.43k | Schema projection; |
144 | 1.43k | RETURN_NOT_OK(pg_database_schema.CreateProjectionByNames({"oid"}, &projection, |
145 | 1.43k | pg_database_schema.num_key_columns())); |
146 | 1.43k | const auto oid_col_id = VERIFY_RESULT(projection.ColumnIdByName("oid")).rep(); |
147 | 1.43k | auto iter = VERIFY_RESULT(catalog_tablet->NewRowIterator( |
148 | 1.43k | projection.CopyWithoutColumnIds(), {} /* read_hybrid_time */, pg_table_id)); |
149 | 1.43k | auto e_oid_val = VERIFY_RESULT(std::move(entry_oid)); |
150 | 1.43k | { |
151 | 1.43k | auto doc_iter = down_cast<docdb::DocRowwiseIterator*>(iter.get()); |
152 | 1.43k | PgsqlConditionPB cond; |
153 | 1.43k | cond.add_operands()->set_column_id(oid_col_id); |
154 | 1.43k | cond.set_op(QL_OP_EQUAL); |
155 | 1.43k | cond.add_operands()->mutable_value()->set_uint32_value(e_oid_val); |
156 | 1.43k | const std::vector<docdb::PrimitiveValue> empty_key_components; |
157 | 1.43k | docdb::DocPgsqlScanSpec spec( |
158 | 1.43k | projection, rocksdb::kDefaultQueryId, empty_key_components, empty_key_components, |
159 | 1.43k | &cond, boost::none /* hash_code */, boost::none /* max_hash_code */, nullptr /* where */); |
160 | 1.43k | RETURN_NOT_OK(doc_iter->Init(spec)); |
161 | 1.43k | } |
162 | | |
163 | | // Expect exactly one row, which means the transaction was a success. |
164 | 1.43k | QLTableRow row; |
165 | 1.43k | if (VERIFY_RESULT(iter->HasNext())) { |
166 | 1.39k | RETURN_NOT_OK(iter->NextRow(&row)); |
167 | 1.39k | return true; |
168 | 39 | } |
169 | 39 | return false; |
170 | 39 | } |
171 | | |
172 | | } // namespace master |
173 | | } // namespace yb |