/Users/deen/code/yugabyte-db/src/yb/tserver/pg_client_session.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tserver/pg_client_session.h" |
15 | | |
16 | | #include "yb/client/batcher.h" |
17 | | #include "yb/client/client.h" |
18 | | #include "yb/client/error.h" |
19 | | #include "yb/client/namespace_alterer.h" |
20 | | #include "yb/client/session.h" |
21 | | #include "yb/client/table.h" |
22 | | #include "yb/client/table_alterer.h" |
23 | | #include "yb/client/transaction.h" |
24 | | #include "yb/client/transaction_pool.h" |
25 | | #include "yb/client/yb_op.h" |
26 | | |
27 | | #include "yb/common/ql_type.h" |
28 | | #include "yb/common/pgsql_error.h" |
29 | | #include "yb/common/transaction_error.h" |
30 | | #include "yb/common/schema.h" |
31 | | #include "yb/common/wire_protocol.h" |
32 | | |
33 | | #include "yb/gutil/casts.h" |
34 | | |
35 | | #include "yb/rpc/rpc_context.h" |
36 | | |
37 | | #include "yb/tserver/pg_client.pb.h" |
38 | | #include "yb/tserver/pg_create_table.h" |
39 | | #include "yb/tserver/pg_table_cache.h" |
40 | | |
41 | | #include "yb/util/logging.h" |
42 | | #include "yb/util/result.h" |
43 | | #include "yb/util/scope_exit.h" |
44 | | #include "yb/util/status_format.h" |
45 | | #include "yb/util/string_util.h" |
46 | | #include "yb/util/yb_pg_errcodes.h" |
47 | | |
48 | | DECLARE_bool(ysql_serializable_isolation_for_ddl_txn); |
49 | | |
50 | | namespace yb { |
51 | | namespace tserver { |
52 | | |
53 | | namespace { |
54 | | |
55 | 0 | std::string SessionLogPrefix(uint64_t id) { |
56 | 0 | return Format("S $0: ", id); |
57 | 0 | } |
58 | | |
59 | 18.5k | string GetStatusStringSet(const client::CollectedErrors& errors) { |
60 | 18.5k | std::set<string> status_strings; |
61 | 394k | for (const auto& error : errors) { |
62 | 394k | status_strings.insert(error->status().ToString()); |
63 | 394k | } |
64 | 18.5k | return RangeToString(status_strings.begin(), status_strings.end()); |
65 | 18.5k | } |
66 | | |
67 | 45.7k | bool IsHomogeneousErrors(const client::CollectedErrors& errors) { |
68 | 45.7k | if (errors.size() < 2) { |
69 | 27.2k | return true; |
70 | 27.2k | } |
71 | 18.5k | auto i = errors.begin(); |
72 | 18.5k | const auto& status = (**i).status(); |
73 | 18.5k | const auto codes = status.ErrorCodesSlice(); |
74 | 232k | for (++i; i != errors.end(); ++i) { |
75 | 225k | const auto& s = (**i).status(); |
76 | 225k | if (s.code() != status.code() || codes != s.ErrorCodesSlice()) { |
77 | 11.6k | return false; |
78 | 11.6k | } |
79 | 225k | } |
80 | 6.81k | return true; |
81 | 18.5k | } |
82 | | |
83 | 295k | boost::optional<YBPgErrorCode> PsqlErrorCode(const Status& status) { |
84 | 295k | const uint8_t* err_data = status.ErrorData(PgsqlErrorTag::kCategory); |
85 | 295k | if (err_data) { |
86 | 151k | return PgsqlErrorTag::Decode(err_data); |
87 | 151k | } |
88 | 144k | return boost::none; |
89 | 144k | } |
90 | | |
91 | | // Get a common Postgres error code from the status and all errors, and append it to a previous |
92 | | // Status. |
93 | | // If any of those have different conflicting error codes, previous result is returned as-is. |
94 | | CHECKED_STATUS AppendPsqlErrorCode(const Status& status, |
95 | 11.6k | const client::CollectedErrors& errors) { |
96 | 11.6k | boost::optional<YBPgErrorCode> common_psql_error = boost::make_optional(false, YBPgErrorCode()); |
97 | 295k | for(const auto& error : errors) { |
98 | 295k | const auto psql_error = PsqlErrorCode(error->status()); |
99 | 295k | if (!common_psql_error) { |
100 | 79.3k | common_psql_error = psql_error; |
101 | 216k | } else if (psql_error && common_psql_error != psql_error) { |
102 | 0 | common_psql_error = boost::none; |
103 | 0 | break; |
104 | 0 | } |
105 | 295k | } |
106 | 18.4E | return common_psql_error ? status.CloneAndAddErrorCode(PgsqlError(*common_psql_error)) : status; |
107 | 11.6k | } |
108 | | |
109 | | // Get a common transaction error code for all the errors and append it to the previous Status. |
110 | 11.6k | CHECKED_STATUS AppendTxnErrorCode(const Status& status, const client::CollectedErrors& errors) { |
111 | 11.6k | TransactionErrorCode common_txn_error = TransactionErrorCode::kNone; |
112 | 295k | for (const auto& error : errors) { |
113 | 295k | const TransactionErrorCode txn_error = TransactionError(error->status()).value(); |
114 | 295k | if (txn_error == TransactionErrorCode::kNone || |
115 | 295k | txn_error == common_txn_error) { |
116 | 192k | continue; |
117 | 192k | } |
118 | 103k | if (common_txn_error == TransactionErrorCode::kNone) { |
119 | 11.6k | common_txn_error = txn_error; |
120 | 11.6k | continue; |
121 | 11.6k | } |
122 | | // If we receive a list of errors, with one as kConflict and others as kAborted, we retain the |
123 | | // error as kConflict, since in case of a batched request the first operation would receive the |
124 | | // kConflict and all the others would receive the kAborted error. |
125 | 91.4k | if ((txn_error == TransactionErrorCode::kConflict && |
126 | 5.88k | common_txn_error == TransactionErrorCode::kAborted) || |
127 | 85.5k | (txn_error == TransactionErrorCode::kAborted && |
128 | 91.4k | common_txn_error == TransactionErrorCode::kConflict)) { |
129 | 91.4k | common_txn_error = TransactionErrorCode::kConflict; |
130 | 91.4k | continue; |
131 | 91.4k | } |
132 | | |
133 | | // In all the other cases, reset the common_txn_error to kNone. |
134 | 18.4E | common_txn_error = TransactionErrorCode::kNone; |
135 | 18.4E | break; |
136 | 18.4E | } |
137 | | |
138 | 11.6k | return (common_txn_error != TransactionErrorCode::kNone) ? |
139 | 11.6k | status.CloneAndAddErrorCode(TransactionError(common_txn_error)) : status; |
140 | 11.6k | } |
141 | | |
142 | | // Given a set of errors from operations, this function attempts to combine them into one status |
143 | | // that is later passed to PostgreSQL and further converted into a more specific error code. |
144 | 775k | CHECKED_STATUS CombineErrorsToStatus(const client::CollectedErrors& errors, const Status& status) { |
145 | 775k | if (errors.empty()) |
146 | 729k | return status; |
147 | | |
148 | 45.7k | if (status.IsIOError() && |
149 | | // TODO: move away from string comparison here and use a more specific status than IOError. |
150 | | // See https://github.com/YugaByte/yugabyte-db/issues/702 |
151 | 45.7k | status.message() == client::internal::Batcher::kErrorReachingOutToTServersMsg && |
152 | 45.7k | IsHomogeneousErrors(errors)) { |
153 | 34.0k | const auto& result = errors.front()->status(); |
154 | 34.0k | if (errors.size() == 1) { |
155 | 27.2k | return result; |
156 | 27.2k | } |
157 | 6.81k | return Status(result.code(), |
158 | 6.81k | __FILE__, |
159 | 6.81k | __LINE__, |
160 | 6.81k | GetStatusStringSet(errors), |
161 | 6.81k | result.ErrorCodesSlice(), |
162 | 6.81k | DupFileName::kFalse); |
163 | 6.81k | } |
164 | | |
165 | 11.7k | Status result = |
166 | 11.7k | status.ok() |
167 | 0 | ? STATUS(InternalError, GetStatusStringSet(errors)) |
168 | 11.7k | : status.CloneAndAppend(". Errors from tablet servers: " + GetStatusStringSet(errors)); |
169 | | |
170 | 11.7k | return AppendTxnErrorCode(AppendPsqlErrorCode(result, errors), errors); |
171 | 11.7k | } |
172 | | |
173 | 3.46M | Status HandleResponse(const client::YBPgsqlOp& op, PgPerformResponsePB* resp) { |
174 | 3.46M | const auto& response = op.response(); |
175 | 3.46M | if (response.status() == PgsqlResponsePB::PGSQL_STATUS_OK) { |
176 | 3.46M | if (op.read_only() && op.table()->schema().table_properties().is_ysql_catalog_table()) { |
177 | 380k | const auto& pgsql_op = down_cast<const client::YBPgsqlReadOp&>(op); |
178 | 380k | if (pgsql_op.used_read_time()) { |
179 | | // Non empty used_read_time field in catalog read operation means this is the very first |
180 | | // catalog read operation after catalog read time resetting. read_time for the operation |
181 | | // has been chosen by master. All further reads from catalog must use same read point. |
182 | 18.5k | auto catalog_read_time = pgsql_op.used_read_time(); |
183 | | |
184 | | // We set global limit to local limit to avoid read restart errors because they are |
185 | | // disruptive to system catalog reads and it is not always possible to handle them there. |
186 | | // This might lead to reading slightly outdated state of the system catalog if a recently |
187 | | // committed DDL transaction used a transaction status tablet whose leader's clock is skewed |
188 | | // and is in the future compared to the master leader's clock. |
189 | | // TODO(dmitry) This situation will be handled in context of #7964. |
190 | 18.5k | catalog_read_time.global_limit = catalog_read_time.local_limit; |
191 | 18.5k | catalog_read_time.ToPB(resp->mutable_catalog_read_time()); |
192 | 18.5k | } |
193 | 380k | } |
194 | 3.46M | return Status::OK(); |
195 | 3.46M | } |
196 | | |
197 | 707 | auto status = STATUS( |
198 | 707 | QLError, response.error_message(), Slice(), PgsqlRequestStatus(response.status())); |
199 | | |
200 | 707 | if (response.has_pg_error_code()) { |
201 | 1 | status = status.CloneAndAddErrorCode( |
202 | 1 | PgsqlError(static_cast<YBPgErrorCode>(response.pg_error_code()))); |
203 | 1 | } |
204 | | |
205 | 707 | if (response.has_txn_error_code()) { |
206 | 1 | status = status.CloneAndAddErrorCode( |
207 | 1 | TransactionError(static_cast<TransactionErrorCode>(response.txn_error_code()))); |
208 | 1 | } |
209 | | |
210 | 707 | return status; |
211 | 707 | } |
212 | | |
213 | 4.02M | CHECKED_STATUS GetTable(const TableId& table_id, PgTableCache* cache, client::YBTablePtr* table) { |
214 | 4.02M | if (*table && (**table).id() == table_id) { |
215 | 2.83M | return Status::OK(); |
216 | 2.83M | } |
217 | 1.19M | *table = VERIFY_RESULT(cache->Get(table_id)); |
218 | 1.19M | return Status::OK(); |
219 | 1.19M | } |
220 | | |
221 | | Result<PgClientSessionOperations> PrepareOperations( |
222 | 774k | const PgPerformRequestPB& req, client::YBSession* session, PgTableCache* table_cache) { |
223 | 774k | auto write_time = HybridTime::FromPB(req.write_time()); |
224 | 774k | std::vector<std::shared_ptr<client::YBPgsqlOp>> ops; |
225 | 774k | ops.reserve(req.ops().size()); |
226 | 774k | client::YBTablePtr table; |
227 | 774k | bool finished = false; |
228 | 775k | auto se = ScopeExit([&finished, session] { |
229 | 775k | if (!finished) { |
230 | 6 | session->Abort(); |
231 | 6 | } |
232 | 775k | }); |
233 | 4.02M | for (const auto& op : req.ops()) { |
234 | 4.02M | if (op.has_read()) { |
235 | 1.89M | const auto& read = op.read(); |
236 | 1.89M | RETURN_NOT_OK(GetTable(read.table_id(), table_cache, &table)); |
237 | 1.89M | const auto read_op = std::make_shared<client::YBPgsqlReadOp>( |
238 | 1.89M | table, const_cast<PgsqlReadRequestPB*>(&read)); |
239 | 1.89M | if (op.read_from_followers()) { |
240 | 0 | read_op->set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX); |
241 | 0 | } |
242 | 1.89M | ops.push_back(read_op); |
243 | 1.89M | session->Apply(std::move(read_op)); |
244 | 2.12M | } else { |
245 | 2.12M | const auto& write = op.write(); |
246 | 2.12M | RETURN_NOT_OK(GetTable(write.table_id(), table_cache, &table)); |
247 | 2.12M | const auto write_op = std::make_shared<client::YBPgsqlWriteOp>( |
248 | 2.12M | table, const_cast<PgsqlWriteRequestPB*>(&write)); |
249 | 2.12M | if (write_time) { |
250 | 226 | write_op->SetWriteTime(write_time); |
251 | 226 | write_time = HybridTime::kInvalid; |
252 | 226 | } |
253 | 2.12M | ops.push_back(write_op); |
254 | 2.12M | session->Apply(std::move(write_op)); |
255 | 2.12M | } |
256 | 4.02M | } |
257 | 774k | finished = true; |
258 | 774k | return ops; |
259 | 774k | } |
260 | | |
261 | | struct PerformData { |
262 | | uint64_t session_id; |
263 | | const PgPerformRequestPB* req; |
264 | | PgPerformResponsePB* resp; |
265 | | rpc::RpcContext context; |
266 | | PgClientSessionOperations ops; |
267 | | PgTableCache* table_cache; |
268 | | |
269 | 775k | void FlushDone(client::FlushStatus* flush_status) { |
270 | 775k | auto status = CombineErrorsToStatus(flush_status->errors, flush_status->status); |
271 | 775k | if (status.ok()) { |
272 | 729k | status = ProcessResponse(); |
273 | 729k | } |
274 | 775k | if (!status.ok()) { |
275 | 46.7k | StatusToPB(status, resp->mutable_status()); |
276 | 46.7k | } |
277 | 775k | context.RespondSuccess(); |
278 | 775k | } |
279 | | |
280 | 729k | CHECKED_STATUS ProcessResponse() { |
281 | 729k | int idx = 0; |
282 | 3.46M | for (const auto& op : ops) { |
283 | 3.46M | const auto status = HandleResponse(*op, resp); |
284 | 3.46M | if (!status.ok()) { |
285 | 1.03k | if (PgsqlRequestStatus(status) == PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH) { |
286 | 11 | table_cache->Invalidate(op->table()->id()); |
287 | 11 | } |
288 | 18.4E | VLOG(2) << SessionLogPrefix(session_id) << "Failed op " << idx << ": " << status; |
289 | 1.03k | return status.CloneAndAddErrorCode(OpIndex(idx)); |
290 | 1.03k | } |
291 | 3.45M | const auto& req_op = req->ops()[idx]; |
292 | 3.45M | if (req_op.has_read() && req_op.read().is_for_backfill() && |
293 | 289 | op->response().is_backfill_batch_done()) { |
294 | | // After backfill table schema version is updated, so we reset cache in advance. |
295 | 257 | table_cache->Invalidate(op->table()->id()); |
296 | 257 | } |
297 | 3.45M | ++idx; |
298 | 3.45M | } |
299 | | |
300 | 728k | auto& responses = *resp->mutable_responses(); |
301 | 728k | responses.Reserve(narrow_cast<int>(ops.size())); |
302 | 3.45M | for (const auto& op : ops) { |
303 | 3.45M | auto& op_resp = *responses.Add(); |
304 | 3.45M | op_resp.Swap(op->mutable_response()); |
305 | 3.45M | if (op_resp.has_rows_data_sidecar()) { |
306 | 3.45M | op_resp.set_rows_data_sidecar(narrow_cast<int>(context.AddRpcSidecar(op->rows_data()))); |
307 | 3.45M | } |
308 | 3.45M | } |
309 | | |
310 | 728k | return Status::OK(); |
311 | 729k | } |
312 | | }; |
313 | | |
314 | | client::YBSessionPtr CreateSession( |
315 | 4.95k | client::YBClient* client, const scoped_refptr<ClockBase>& clock) { |
316 | 4.95k | auto result = std::make_shared<client::YBSession>(client, clock); |
317 | 4.95k | result->SetForceConsistentRead(client::ForceConsistentRead::kTrue); |
318 | 4.95k | result->set_allow_local_calls_in_curr_thread(false); |
319 | 4.95k | return result; |
320 | 4.95k | } |
321 | | |
322 | | } // namespace |
323 | | |
324 | | PgClientSession::PgClientSession( |
325 | | client::YBClient* client, const scoped_refptr<ClockBase>& clock, |
326 | | std::reference_wrapper<const TransactionPoolProvider> transaction_pool_provider, |
327 | | PgTableCache* table_cache, uint64_t id) |
328 | | : client_(*client), |
329 | | transaction_pool_provider_(transaction_pool_provider.get()), |
330 | | table_cache_(*table_cache), id_(id), |
331 | | session_(CreateSession(client, clock)), |
332 | | ddl_session_(CreateSession(client, clock)), |
333 | 1.65k | catalog_session_(CreateSession(client, clock)) { |
334 | 1.65k | } |
335 | | |
336 | 936k | uint64_t PgClientSession::id() const { |
337 | 936k | return id_; |
338 | 936k | } |
339 | | |
340 | | Status PgClientSession::CreateTable( |
341 | 1.41k | const PgCreateTableRequestPB& req, PgCreateTableResponsePB* resp, rpc::RpcContext* context) { |
342 | 1.41k | PgCreateTable helper(req); |
343 | 1.41k | RETURN_NOT_OK(helper.Prepare()); |
344 | 1.41k | const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata(req.use_transaction())); |
345 | 1.41k | RETURN_NOT_OK(helper.Exec(&client(), metadata, context->GetClientDeadline())); |
346 | 0 | VLOG_WITH_PREFIX(1) << __func__ << ": " << req.table_name(); |
347 | 1.41k | const auto& indexed_table_id = helper.indexed_table_id(); |
348 | 1.41k | if (indexed_table_id.IsValid()) { |
349 | 150 | table_cache_.Invalidate(indexed_table_id.GetYBTableId()); |
350 | 150 | } |
351 | 1.41k | return Status::OK(); |
352 | 1.41k | } |
353 | | |
354 | | Status PgClientSession::CreateDatabase( |
355 | | const PgCreateDatabaseRequestPB& req, PgCreateDatabaseResponsePB* resp, |
356 | 22 | rpc::RpcContext* context) { |
357 | 22 | return client().CreateNamespace( |
358 | 22 | req.database_name(), |
359 | 22 | YQL_DATABASE_PGSQL, |
360 | 22 | "" /* creator_role_name */, |
361 | 22 | GetPgsqlNamespaceId(req.database_oid()), |
362 | 22 | req.source_database_oid() != kPgInvalidOid |
363 | 22 | ? GetPgsqlNamespaceId(req.source_database_oid()) : "", |
364 | 22 | req.next_oid(), |
365 | 22 | VERIFY_RESULT(GetDdlTransactionMetadata(req.use_transaction())), |
366 | 22 | req.colocated(), |
367 | 22 | context->GetClientDeadline()); |
368 | 22 | } |
369 | | |
370 | | Status PgClientSession::DropDatabase( |
371 | 21 | const PgDropDatabaseRequestPB& req, PgDropDatabaseResponsePB* resp, rpc::RpcContext* context) { |
372 | 21 | return client().DeleteNamespace( |
373 | 21 | req.database_name(), |
374 | 21 | YQL_DATABASE_PGSQL, |
375 | 21 | GetPgsqlNamespaceId(req.database_oid()), |
376 | 21 | context->GetClientDeadline()); |
377 | 21 | } |
378 | | |
379 | | Status PgClientSession::DropTable( |
380 | 1.17k | const PgDropTableRequestPB& req, PgDropTableResponsePB* resp, rpc::RpcContext* context) { |
381 | 1.17k | const auto yb_table_id = PgObjectId::GetYBTableIdFromPB(req.table_id()); |
382 | 1.17k | if (req.index()) { |
383 | 141 | client::YBTableName indexed_table; |
384 | 141 | RETURN_NOT_OK(client().DeleteIndexTable( |
385 | 141 | yb_table_id, &indexed_table, true, context->GetClientDeadline())); |
386 | 140 | indexed_table.SetIntoTableIdentifierPB(resp->mutable_indexed_table()); |
387 | 140 | table_cache_.Invalidate(indexed_table.table_id()); |
388 | 140 | table_cache_.Invalidate(yb_table_id); |
389 | 140 | return Status::OK(); |
390 | 1.03k | } |
391 | | |
392 | 1.03k | RETURN_NOT_OK(client().DeleteTable(yb_table_id, true, context->GetClientDeadline())); |
393 | 1.03k | table_cache_.Invalidate(yb_table_id); |
394 | 1.03k | return Status::OK(); |
395 | 1.03k | } |
396 | | |
397 | | Status PgClientSession::AlterDatabase( |
398 | | const PgAlterDatabaseRequestPB& req, PgAlterDatabaseResponsePB* resp, |
399 | 0 | rpc::RpcContext* context) { |
400 | 0 | const auto alterer = client().NewNamespaceAlterer( |
401 | 0 | req.database_name(), GetPgsqlNamespaceId(req.database_oid())); |
402 | 0 | alterer->SetDatabaseType(YQL_DATABASE_PGSQL); |
403 | 0 | alterer->RenameTo(req.new_name()); |
404 | 0 | return alterer->Alter(context->GetClientDeadline()); |
405 | 0 | } |
406 | | |
407 | | Status PgClientSession::AlterTable( |
408 | 155 | const PgAlterTableRequestPB& req, PgAlterTableResponsePB* resp, rpc::RpcContext* context) { |
409 | 155 | const auto table_id = PgObjectId::GetYBTableIdFromPB(req.table_id()); |
410 | 155 | const auto alterer = client().NewTableAlterer(table_id); |
411 | 155 | const auto txn = VERIFY_RESULT(GetDdlTransactionMetadata(req.use_transaction())); |
412 | 155 | if (txn) { |
413 | 155 | alterer->part_of_transaction(txn); |
414 | 155 | } |
415 | 69 | for (const auto& add_column : req.add_columns()) { |
416 | 69 | const auto yb_type = QLType::Create(static_cast<DataType>(add_column.attr_ybtype())); |
417 | 69 | alterer->AddColumn(add_column.attr_name()) |
418 | 69 | ->Type(yb_type)->Order(add_column.attr_num())->PgTypeOid(add_column.attr_pgoid()); |
419 | | // Do not set 'nullable' attribute as PgCreateTable::AddColumn() does not do it. |
420 | 69 | } |
421 | 1 | for (const auto& rename_column : req.rename_columns()) { |
422 | 1 | alterer->AlterColumn(rename_column.old_name())->RenameTo(rename_column.new_name()); |
423 | 1 | } |
424 | 50 | for (const auto& drop_column : req.drop_columns()) { |
425 | 50 | alterer->DropColumn(drop_column); |
426 | 50 | } |
427 | 155 | if (!req.rename_table().table_name().empty()) { |
428 | 42 | client::YBTableName new_table_name( |
429 | 42 | YQL_DATABASE_PGSQL, req.rename_table().database_name(), req.rename_table().table_name()); |
430 | 42 | alterer->RenameTo(new_table_name); |
431 | 42 | } |
432 | | |
433 | 155 | alterer->timeout(context->GetClientDeadline() - CoarseMonoClock::now()); |
434 | 155 | RETURN_NOT_OK(alterer->Alter()); |
435 | 154 | table_cache_.Invalidate(table_id); |
436 | 154 | return Status::OK(); |
437 | 155 | } |
438 | | |
439 | | Status PgClientSession::TruncateTable( |
440 | | const PgTruncateTableRequestPB& req, PgTruncateTableResponsePB* resp, |
441 | 31 | rpc::RpcContext* context) { |
442 | 31 | return client().TruncateTable(PgObjectId::GetYBTableIdFromPB(req.table_id())); |
443 | 31 | } |
444 | | |
445 | | Status PgClientSession::BackfillIndex( |
446 | | const PgBackfillIndexRequestPB& req, PgBackfillIndexResponsePB* resp, |
447 | 89 | rpc::RpcContext* context) { |
448 | 89 | return client().BackfillIndex( |
449 | 89 | PgObjectId::GetYBTableIdFromPB(req.table_id()), /* wait= */ true, |
450 | 89 | context->GetClientDeadline()); |
451 | 89 | } |
452 | | |
453 | | Status PgClientSession::CreateTablegroup( |
454 | | const PgCreateTablegroupRequestPB& req, PgCreateTablegroupResponsePB* resp, |
455 | 1 | rpc::RpcContext* context) { |
456 | 1 | const auto id = PgObjectId::FromPB(req.tablegroup_id()); |
457 | 1 | auto tablespace_id = PgObjectId::FromPB(req.tablespace_id()); |
458 | 1 | auto s = client().CreateTablegroup( |
459 | 1 | req.database_name(), GetPgsqlNamespaceId(id.database_oid), |
460 | 1 | id.GetYBTablegroupId(), |
461 | 1 | tablespace_id.IsValid() ? tablespace_id.GetYBTablespaceId() : ""); |
462 | 1 | if (s.ok()) { |
463 | 1 | return Status::OK(); |
464 | 1 | } |
465 | | |
466 | 0 | if (s.IsAlreadyPresent()) { |
467 | 0 | return STATUS(InvalidArgument, "Duplicate tablegroup"); |
468 | 0 | } |
469 | | |
470 | 0 | if (s.IsNotFound()) { |
471 | 0 | return STATUS(InvalidArgument, "Database not found", req.database_name()); |
472 | 0 | } |
473 | | |
474 | 0 | return STATUS_FORMAT( |
475 | 0 | InvalidArgument, "Invalid table definition: $0", |
476 | 0 | s.ToString(false /* include_file_and_line */, false /* include_code */)); |
477 | 0 | } |
478 | | |
479 | | Status PgClientSession::DropTablegroup( |
480 | | const PgDropTablegroupRequestPB& req, PgDropTablegroupResponsePB* resp, |
481 | 1 | rpc::RpcContext* context) { |
482 | 1 | const auto id = PgObjectId::FromPB(req.tablegroup_id()); |
483 | 1 | const auto status = client().DeleteTablegroup( |
484 | 1 | GetPgsqlNamespaceId(id.database_oid), |
485 | 1 | GetPgsqlTablegroupId(id.database_oid, id.object_oid)); |
486 | 1 | if (status.IsNotFound()) { |
487 | 0 | return Status::OK(); |
488 | 0 | } |
489 | 1 | return status; |
490 | 1 | } |
491 | | |
492 | | Status PgClientSession::RollbackSubTransaction( |
493 | | const PgRollbackSubTransactionRequestPB& req, PgRollbackSubTransactionResponsePB* resp, |
494 | 23.5k | rpc::RpcContext* context) { |
495 | 0 | VLOG_WITH_PREFIX_AND_FUNC(2) << req.ShortDebugString(); |
496 | 23.5k | SCHECK(txn_, IllegalState, |
497 | 23.5k | Format("Rollback sub transaction $0, when not transaction is running", |
498 | 23.5k | req.sub_transaction_id())); |
499 | 23.5k | return txn_->RollbackSubTransaction(req.sub_transaction_id()); |
500 | 23.5k | } |
501 | | |
502 | | Status PgClientSession::SetActiveSubTransaction( |
503 | | const PgSetActiveSubTransactionRequestPB& req, PgSetActiveSubTransactionResponsePB* resp, |
504 | 48.8k | rpc::RpcContext* context) { |
505 | 0 | VLOG_WITH_PREFIX_AND_FUNC(2) << req.ShortDebugString(); |
506 | | |
507 | 48.8k | if (req.has_options()) { |
508 | 307 | RETURN_NOT_OK(BeginTransactionIfNecessary(req.options())); |
509 | 307 | txn_serial_no_ = req.options().txn_serial_no(); |
510 | 307 | } |
511 | | |
512 | 48.8k | SCHECK(txn_, IllegalState, |
513 | 48.8k | Format("Set active sub transaction $0, when not transaction is running", |
514 | 48.8k | req.sub_transaction_id())); |
515 | | |
516 | 48.8k | txn_->SetActiveSubTransaction(req.sub_transaction_id()); |
517 | 48.8k | return Status::OK(); |
518 | 48.8k | } |
519 | | |
520 | | Status PgClientSession::FinishTransaction( |
521 | | const PgFinishTransactionRequestPB& req, PgFinishTransactionResponsePB* resp, |
522 | 82.4k | rpc::RpcContext* context) { |
523 | 82.4k | saved_priority_ = boost::none; |
524 | 76.1k | auto& txn = req.ddl_mode() ? ddl_txn_ : txn_; |
525 | 82.4k | if (!txn) { |
526 | 0 | VLOG_WITH_PREFIX_AND_FUNC(2) << "ddl: " << req.ddl_mode() << ", no running transaction"; |
527 | 513 | return Status::OK(); |
528 | 513 | } |
529 | 81.9k | const auto txn_value = std::move(txn); |
530 | 76.1k | (req.ddl_mode() ? ddl_session_ : session_)->SetTransaction(nullptr); |
531 | | |
532 | 81.9k | if (req.commit()) { |
533 | 78.6k | const auto commit_status = txn_value->CommitFuture().get(); |
534 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(2) |
535 | 18.4E | << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id() |
536 | 18.4E | << ", commit: " << commit_status; |
537 | 78.6k | return commit_status; |
538 | 78.6k | } |
539 | | |
540 | 3.27k | VLOG_WITH_PREFIX_AND_FUNC(2) |
541 | 17 | << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id() << ", abort"; |
542 | 3.27k | txn_value->Abort(); |
543 | 3.27k | return Status::OK(); |
544 | 3.27k | } |
545 | | |
546 | | Status PgClientSession::Perform( |
547 | 775k | const PgPerformRequestPB& req, PgPerformResponsePB* resp, rpc::RpcContext* context) { |
548 | 775k | auto session = VERIFY_RESULT(SetupSession(req)); |
549 | | |
550 | 775k | session->SetDeadline(context->GetClientDeadline()); |
551 | | |
552 | 775k | auto ops = VERIFY_RESULT(PrepareOperations(req, session, &table_cache_)); |
553 | 775k | auto data = std::make_shared<PerformData>(PerformData { |
554 | 775k | .session_id = id_, |
555 | 775k | .req = &req, |
556 | 775k | .resp = resp, |
557 | 775k | .context = std::move(*context), |
558 | 775k | .ops = std::move(ops), |
559 | 775k | .table_cache = &table_cache_, |
560 | 775k | }); |
561 | 775k | session->FlushAsync([data](client::FlushStatus* flush_status) { |
562 | 775k | data->FlushDone(flush_status); |
563 | 775k | }); |
564 | 775k | return Status::OK(); |
565 | 775k | } |
566 | | |
567 | 775k | void PgClientSession::ProcessReadTimeManipulation(ReadTimeManipulation manipulation) { |
568 | 775k | switch (manipulation) { |
569 | 23.6k | case ReadTimeManipulation::RESET: { |
570 | | // If a txn_ has been created, session_->read_point() returns the read point stored in txn_. |
571 | 23.6k | ConsistentReadPoint* rp = session_->read_point(); |
572 | 23.6k | rp->SetCurrentReadTime(); |
573 | | |
574 | 0 | VLOG(1) << "Setting current ht as read point " << rp->GetReadTime(); |
575 | 23.6k | } |
576 | 23.6k | return; |
577 | 1 | case ReadTimeManipulation::RESTART: { |
578 | 1 | ConsistentReadPoint* rp = session_->read_point(); |
579 | 1 | rp->Restart(); |
580 | | |
581 | 0 | VLOG(1) << "Restarted read point " << rp->GetReadTime(); |
582 | 1 | } |
583 | 1 | return; |
584 | 751k | case ReadTimeManipulation::NONE: |
585 | 751k | return; |
586 | 0 | case ReadTimeManipulation::ReadTimeManipulation_INT_MIN_SENTINEL_DO_NOT_USE_: |
587 | 0 | case ReadTimeManipulation::ReadTimeManipulation_INT_MAX_SENTINEL_DO_NOT_USE_: |
588 | 0 | break; |
589 | 0 | } |
590 | 0 | FATAL_INVALID_ENUM_VALUE(ReadTimeManipulation, manipulation); |
591 | 0 | } |
592 | | |
593 | 775k | Result<client::YBSession*> PgClientSession::SetupSession(const PgPerformRequestPB& req) { |
594 | 775k | client::YBSession* session; |
595 | 775k | client::YBTransaction* transaction; |
596 | | |
597 | 775k | const auto& options = req.options(); |
598 | 775k | if (options.use_catalog_session()) { |
599 | 194k | session = catalog_session_.get(); |
600 | 194k | transaction = nullptr; |
601 | 580k | } else if (options.ddl_mode()) { |
602 | 200k | RETURN_NOT_OK(GetDdlTransactionMetadata(true)); |
603 | 200k | session = ddl_session_.get(); |
604 | 200k | transaction = ddl_txn_.get(); |
605 | 380k | } else { |
606 | 380k | RETURN_NOT_OK(BeginTransactionIfNecessary(options)); |
607 | | |
608 | 380k | session = session_.get(); |
609 | 380k | transaction = txn_.get(); |
610 | 380k | } |
611 | | |
612 | 62 | VLOG_WITH_PREFIX(4) << __func__ << ": " << options.ShortDebugString(); |
613 | | |
614 | 775k | if (options.restart_transaction()) { |
615 | 0 | if(options.ddl_mode()) { |
616 | 0 | return STATUS(NotSupported, "Not supported to restart DDL transaction"); |
617 | 0 | } |
618 | 0 | txn_ = VERIFY_RESULT(RestartTransaction(session, transaction)); |
619 | 0 | transaction = txn_.get(); |
620 | 775k | } else { |
621 | 775k | ProcessReadTimeManipulation(options.read_time_manipulation()); |
622 | 775k | if (options.has_read_time() && |
623 | 211k | (options.read_time().has_read_ht() || options.use_catalog_session())) { |
624 | 211k | const auto read_time = options.read_time().has_read_ht() |
625 | 192k | ? ReadHybridTime::FromPB(options.read_time()) : ReadHybridTime(); |
626 | 211k | session->SetReadPoint(read_time); |
627 | 211k | if (read_time) { |
628 | 18.4E | VLOG_WITH_PREFIX(3) << "Read time: " << read_time; |
629 | 18.5k | } else { |
630 | 18.4E | VLOG_WITH_PREFIX(3) << "Reset read time: " << session->read_point()->GetReadTime(); |
631 | 18.5k | } |
632 | 564k | } else if (!transaction && |
633 | 191k | (options.ddl_mode() || txn_serial_no_ != options.txn_serial_no())) { |
634 | 59.8k | session->SetReadPoint(client::Restart::kFalse); |
635 | 0 | VLOG_WITH_PREFIX(3) << "New read time: " << session->read_point()->GetReadTime(); |
636 | 504k | } else { |
637 | 36 | VLOG_WITH_PREFIX(3) << "Keep read time: " << session->read_point()->GetReadTime(); |
638 | 504k | } |
639 | 775k | } |
640 | | |
641 | 775k | if (options.defer_read_point()) { |
642 | | // This call is idempotent, meaning it has no effect after the first call. |
643 | 0 | session->DeferReadPoint(); |
644 | 0 | } |
645 | | |
646 | 775k | if (!options.ddl_mode() && !options.use_catalog_session()) { |
647 | 380k | txn_serial_no_ = options.txn_serial_no(); |
648 | | |
649 | 380k | const auto in_txn_limit = HybridTime::FromPB(options.in_txn_limit_ht()); |
650 | 380k | if (in_txn_limit) { |
651 | 34 | VLOG_WITH_PREFIX(3) << "In txn limit: " << in_txn_limit; |
652 | 380k | session->SetInTxnLimit(in_txn_limit); |
653 | 380k | } |
654 | 380k | } |
655 | 775k | return session; |
656 | 775k | } |
657 | | |
658 | 0 | std::string PgClientSession::LogPrefix() { |
659 | 0 | return SessionLogPrefix(id_); |
660 | 0 | } |
661 | | |
662 | 380k | Status PgClientSession::BeginTransactionIfNecessary(const PgPerformOptionsPB& options) { |
663 | 380k | const auto isolation = static_cast<IsolationLevel>(options.isolation()); |
664 | | |
665 | 380k | auto priority = options.priority(); |
666 | 380k | if (txn_ && txn_serial_no_ != options.txn_serial_no()) { |
667 | 1 | VLOG_WITH_PREFIX(2) |
668 | 1 | << "Abort previous transaction, use existing priority: " << options.use_existing_priority() |
669 | 1 | << ", new isolation: " << IsolationLevel_Name(isolation); |
670 | | |
671 | 20.2k | if (options.use_existing_priority()) { |
672 | 20.2k | saved_priority_ = txn_->GetPriority(); |
673 | 20.2k | } |
674 | 20.2k | txn_->Abort(); |
675 | 20.2k | session_->SetTransaction(nullptr); |
676 | 20.2k | txn_ = nullptr; |
677 | 20.2k | } |
678 | | |
679 | 380k | if (isolation == IsolationLevel::NON_TRANSACTIONAL) { |
680 | 199k | return Status::OK(); |
681 | 199k | } |
682 | | |
683 | 180k | if (txn_) { |
684 | 84.6k | return txn_->isolation() != isolation |
685 | 0 | ? STATUS_FORMAT( |
686 | 84.6k | IllegalState, |
687 | 84.6k | "Attempt to change isolation level of running transaction from $0 to $1", |
688 | 84.6k | txn_->isolation(), isolation) |
689 | 84.6k | : Status::OK(); |
690 | 84.6k | } |
691 | | |
692 | 96.3k | txn_ = transaction_pool_provider_()->Take( |
693 | 96.3k | client::ForceGlobalTransaction(options.force_global_transaction())); |
694 | 96.3k | if ((isolation == IsolationLevel::SNAPSHOT_ISOLATION || |
695 | 74.6k | isolation == IsolationLevel::READ_COMMITTED) && |
696 | 22.1k | txn_serial_no_ == options.txn_serial_no()) { |
697 | 628 | txn_->InitWithReadPoint(isolation, std::move(*session_->read_point())); |
698 | 0 | VLOG_WITH_PREFIX(2) << "Start transaction " << IsolationLevel_Name(isolation) |
699 | 0 | << ", id: " << txn_->id() |
700 | 0 | << ", kept read time: " << txn_->read_point().GetReadTime(); |
701 | 95.7k | } else { |
702 | 4 | VLOG_WITH_PREFIX(2) << "Start transaction " << IsolationLevel_Name(isolation) |
703 | 4 | << ", id: " << txn_->id() |
704 | 4 | << ", new read time"; |
705 | 95.7k | RETURN_NOT_OK(txn_->Init(isolation)); |
706 | 95.7k | } |
707 | 96.3k | if (saved_priority_) { |
708 | 20.2k | priority = *saved_priority_; |
709 | 20.2k | saved_priority_ = boost::none; |
710 | 20.2k | } |
711 | 96.3k | txn_->SetPriority(priority); |
712 | 96.3k | session_->SetTransaction(txn_); |
713 | | |
714 | 96.3k | return Status::OK(); |
715 | 96.3k | } |
716 | | |
717 | | Result<const TransactionMetadata*> PgClientSession::GetDdlTransactionMetadata( |
718 | 201k | bool use_transaction) { |
719 | 201k | if (!use_transaction) { |
720 | 0 | return nullptr; |
721 | 0 | } |
722 | 201k | if (!ddl_txn_) { |
723 | 5.82k | const auto isolation = FLAGS_ysql_serializable_isolation_for_ddl_txn |
724 | 5.82k | ? IsolationLevel::SERIALIZABLE_ISOLATION : IsolationLevel::SNAPSHOT_ISOLATION; |
725 | 5.82k | ddl_txn_ = VERIFY_RESULT(transaction_pool_provider_()->TakeAndInit(isolation)); |
726 | 5.82k | ddl_txn_metadata_ = VERIFY_RESULT(Copy(ddl_txn_->GetMetadata().get())); |
727 | 5.82k | ddl_session_->SetTransaction(ddl_txn_); |
728 | 5.82k | } |
729 | | |
730 | 201k | return &ddl_txn_metadata_; |
731 | 201k | } |
732 | | |
733 | 2.91k | client::YBClient& PgClientSession::client() { |
734 | 2.91k | return client_; |
735 | 2.91k | } |
736 | | |
737 | | Result<client::YBTransactionPtr> PgClientSession::RestartTransaction( |
738 | 0 | client::YBSession* session, client::YBTransaction* transaction) { |
739 | 0 | if (!transaction) { |
740 | 0 | SCHECK(session->IsRestartRequired(), IllegalState, |
741 | 0 | "Attempted to restart when session does not require restart"); |
742 | |
|
743 | 0 | const auto old_read_time = session->read_point()->GetReadTime(); |
744 | 0 | session->SetReadPoint(client::Restart::kTrue); |
745 | 0 | const auto new_read_time = session->read_point()->GetReadTime(); |
746 | 0 | VLOG_WITH_PREFIX(3) << "Restarted read: " << old_read_time << " => " << new_read_time; |
747 | 0 | LOG_IF_WITH_PREFIX(DFATAL, old_read_time == new_read_time) |
748 | 0 | << "Read time did not change during restart: " << old_read_time << " => " << new_read_time; |
749 | 0 | return nullptr; |
750 | 0 | } |
751 | | |
752 | 0 | if (!transaction->IsRestartRequired()) { |
753 | 0 | return STATUS(IllegalState, "Attempted to restart when transaction does not require restart"); |
754 | 0 | } |
755 | 0 | const auto result = VERIFY_RESULT(transaction->CreateRestartedTransaction()); |
756 | 0 | session->SetTransaction(result); |
757 | 0 | VLOG_WITH_PREFIX(3) << "Restarted transaction"; |
758 | 0 | return result; |
759 | 0 | } |
760 | | |
761 | | } // namespace tserver |
762 | | } // namespace yb |