/Users/deen/code/yugabyte-db/src/yb/tserver/pg_client_session.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tserver/pg_client_session.h" |
15 | | |
16 | | #include "yb/client/batcher.h" |
17 | | #include "yb/client/client.h" |
18 | | #include "yb/client/error.h" |
19 | | #include "yb/client/namespace_alterer.h" |
20 | | #include "yb/client/session.h" |
21 | | #include "yb/client/table.h" |
22 | | #include "yb/client/table_alterer.h" |
23 | | #include "yb/client/transaction.h" |
24 | | #include "yb/client/transaction_pool.h" |
25 | | #include "yb/client/yb_op.h" |
26 | | |
27 | | #include "yb/common/ql_type.h" |
28 | | #include "yb/common/pgsql_error.h" |
29 | | #include "yb/common/transaction_error.h" |
30 | | #include "yb/common/schema.h" |
31 | | #include "yb/common/wire_protocol.h" |
32 | | |
33 | | #include "yb/gutil/casts.h" |
34 | | |
35 | | #include "yb/rpc/rpc_context.h" |
36 | | |
37 | | #include "yb/tserver/pg_client.pb.h" |
38 | | #include "yb/tserver/pg_create_table.h" |
39 | | #include "yb/tserver/pg_table_cache.h" |
40 | | |
41 | | #include "yb/util/logging.h" |
42 | | #include "yb/util/result.h" |
43 | | #include "yb/util/scope_exit.h" |
44 | | #include "yb/util/status_format.h" |
45 | | #include "yb/util/string_util.h" |
46 | | #include "yb/util/yb_pg_errcodes.h" |
47 | | |
48 | | #include "yb/yql/pggate/util/pg_doc_data.h" |
49 | | |
50 | | DECLARE_bool(ysql_serializable_isolation_for_ddl_txn); |
51 | | |
52 | | namespace yb { |
53 | | namespace tserver { |
54 | | |
55 | | namespace { |
56 | | |
57 | | constexpr const size_t kPgSequenceLastValueColIdx = 2; |
58 | | constexpr const size_t kPgSequenceIsCalledColIdx = 3; |
59 | | |
60 | 0 | std::string SessionLogPrefix(uint64_t id) { |
61 | 0 | return Format("S $0: ", id); |
62 | 0 | } |
63 | | |
64 | 40.2k | string GetStatusStringSet(const client::CollectedErrors& errors) { |
65 | 40.2k | std::set<string> status_strings; |
66 | 881k | for (const auto& error : errors) { |
67 | 881k | status_strings.insert(error->status().ToString()); |
68 | 881k | } |
69 | 40.2k | return RangeToString(status_strings.begin(), status_strings.end()); |
70 | 40.2k | } |
71 | | |
72 | 93.1k | bool IsHomogeneousErrors(const client::CollectedErrors& errors) { |
73 | 93.1k | if (errors.size() < 2) { |
74 | 52.8k | return true; |
75 | 52.8k | } |
76 | 40.2k | auto i = errors.begin(); |
77 | 40.2k | const auto& status = (**i).status(); |
78 | 40.2k | const auto codes = status.ErrorCodesSlice(); |
79 | 529k | for (++i; i != errors.end(); ++i488k ) { |
80 | 514k | const auto& s = (**i).status(); |
81 | 514k | if (s.code() != status.code() || codes != s.ErrorCodesSlice()514k ) { |
82 | 25.7k | return false; |
83 | 25.7k | } |
84 | 514k | } |
85 | 14.4k | return true; |
86 | 40.2k | } |
87 | | |
88 | 660k | boost::optional<YBPgErrorCode> PsqlErrorCode(const Status& status) { |
89 | 660k | const uint8_t* err_data = status.ErrorData(PgsqlErrorTag::kCategory); |
90 | 660k | if (err_data) { |
91 | 333k | return PgsqlErrorTag::Decode(err_data); |
92 | 333k | } |
93 | 327k | return boost::none; |
94 | 660k | } |
95 | | |
96 | | // Get a common Postgres error code from the status and all errors, and append it to a previous |
97 | | // Status. |
98 | | // If any of those have different conflicting error codes, previous result is returned as-is. |
99 | | CHECKED_STATUS AppendPsqlErrorCode(const Status& status, |
100 | 25.7k | const client::CollectedErrors& errors) { |
101 | 25.7k | boost::optional<YBPgErrorCode> common_psql_error = boost::make_optional(false, YBPgErrorCode()); |
102 | 660k | for(const auto& error : errors) { |
103 | 660k | const auto psql_error = PsqlErrorCode(error->status()); |
104 | 660k | if (!common_psql_error) { |
105 | 173k | common_psql_error = psql_error; |
106 | 487k | } else if (psql_error && common_psql_error != psql_error307k ) { |
107 | 0 | common_psql_error = boost::none; |
108 | 0 | break; |
109 | 0 | } |
110 | 660k | } |
111 | 25.7k | return common_psql_error ? status.CloneAndAddErrorCode(PgsqlError(*common_psql_error)) : status0 ; |
112 | 25.7k | } |
113 | | |
114 | | // Get a common transaction error code for all the errors and append it to the previous Status. |
115 | 25.7k | CHECKED_STATUS AppendTxnErrorCode(const Status& status, const client::CollectedErrors& errors) { |
116 | 25.7k | TransactionErrorCode common_txn_error = TransactionErrorCode::kNone; |
117 | 660k | for (const auto& error : errors) { |
118 | 660k | const TransactionErrorCode txn_error = TransactionError(error->status()).value(); |
119 | 660k | if (txn_error == TransactionErrorCode::kNone || |
120 | 660k | txn_error == common_txn_error660k ) { |
121 | 449k | continue; |
122 | 449k | } |
123 | 211k | if (common_txn_error == TransactionErrorCode::kNone) { |
124 | 25.7k | common_txn_error = txn_error; |
125 | 25.7k | continue; |
126 | 25.7k | } |
127 | | // If we receive a list of errors, with one as kConflict and others as kAborted, we retain the |
128 | | // error as kConflict, since in case of a batched request the first operation would receive the |
129 | | // kConflict and all the others would receive the kAborted error. |
130 | 185k | if ((txn_error == TransactionErrorCode::kConflict && |
131 | 185k | common_txn_error == TransactionErrorCode::kAborted13.2k ) || |
132 | 185k | (172k txn_error == TransactionErrorCode::kAborted172k && |
133 | 185k | common_txn_error == TransactionErrorCode::kConflict172k )) { |
134 | 185k | common_txn_error = TransactionErrorCode::kConflict; |
135 | 185k | continue; |
136 | 185k | } |
137 | | |
138 | | // In all the other cases, reset the common_txn_error to kNone. |
139 | 18.4E | common_txn_error = TransactionErrorCode::kNone; |
140 | 18.4E | break; |
141 | 185k | } |
142 | | |
143 | 25.7k | return (common_txn_error != TransactionErrorCode::kNone) ? |
144 | 18.4E | status.CloneAndAddErrorCode(TransactionError(common_txn_error))25.7k : status; |
145 | 25.7k | } |
146 | | |
147 | | // Given a set of errors from operations, this function attempts to combine them into one status |
148 | | // that is later passed to PostgreSQL and further converted into a more specific error code. |
149 | 2.17M | CHECKED_STATUS CombineErrorsToStatus(const client::CollectedErrors& errors, const Status& status) { |
150 | 2.17M | if (errors.empty()) |
151 | 2.07M | return status; |
152 | | |
153 | 93.4k | if (status.IsIOError() && |
154 | | // TODO: move away from string comparison here and use a more specific status than IOError. |
155 | | // See https://github.com/YugaByte/yugabyte-db/issues/702 |
156 | 93.4k | status.message() == client::internal::Batcher::kErrorReachingOutToTServersMsg93.1k && |
157 | 93.4k | IsHomogeneousErrors(errors)93.1k ) { |
158 | 67.3k | const auto& result = errors.front()->status(); |
159 | 67.3k | if (errors.size() == 1) { |
160 | 52.8k | return result; |
161 | 52.8k | } |
162 | 14.5k | return Status(result.code(), |
163 | 14.5k | __FILE__, |
164 | 14.5k | __LINE__, |
165 | 14.5k | GetStatusStringSet(errors), |
166 | 14.5k | result.ErrorCodesSlice(), |
167 | 14.5k | DupFileName::kFalse); |
168 | 67.3k | } |
169 | | |
170 | 26.0k | Status result = |
171 | 26.0k | status.ok() |
172 | 26.0k | ? STATUS0 (InternalError, GetStatusStringSet(errors)) |
173 | 26.0k | : status.CloneAndAppend(". Errors from tablet servers: " + GetStatusStringSet(errors)); |
174 | | |
175 | 26.0k | return AppendTxnErrorCode(AppendPsqlErrorCode(result, errors), errors); |
176 | 93.4k | } |
177 | | |
178 | 10.3M | Status HandleResponse(const client::YBPgsqlOp& op, PgPerformResponsePB* resp) { |
179 | 10.3M | const auto& response = op.response(); |
180 | 10.3M | if (response.status() == PgsqlResponsePB::PGSQL_STATUS_OK) { |
181 | 10.3M | if (op.read_only() && op.table()->schema().table_properties().is_ysql_catalog_table()3.18M ) { |
182 | 1.25M | const auto& pgsql_op = down_cast<const client::YBPgsqlReadOp&>(op); |
183 | 1.25M | if (pgsql_op.used_read_time()) { |
184 | | // Non empty used_read_time field in catalog read operation means this is the very first |
185 | | // catalog read operation after catalog read time resetting. read_time for the operation |
186 | | // has been chosen by master. All further reads from catalog must use same read point. |
187 | 77.1k | auto catalog_read_time = pgsql_op.used_read_time(); |
188 | | |
189 | | // We set global limit to local limit to avoid read restart errors because they are |
190 | | // disruptive to system catalog reads and it is not always possible to handle them there. |
191 | | // This might lead to reading slightly outdated state of the system catalog if a recently |
192 | | // committed DDL transaction used a transaction status tablet whose leader's clock is skewed |
193 | | // and is in the future compared to the master leader's clock. |
194 | | // TODO(dmitry) This situation will be handled in context of #7964. |
195 | 77.1k | catalog_read_time.global_limit = catalog_read_time.local_limit; |
196 | 77.1k | catalog_read_time.ToPB(resp->mutable_catalog_read_time()); |
197 | 77.1k | } |
198 | 1.25M | } |
199 | 10.3M | return Status::OK(); |
200 | 10.3M | } |
201 | | |
202 | 12.4k | auto status = STATUS( |
203 | 12.4k | QLError, response.error_message(), Slice(), PgsqlRequestStatus(response.status())); |
204 | | |
205 | 12.4k | if (response.has_pg_error_code()) { |
206 | 1.85k | status = status.CloneAndAddErrorCode( |
207 | 1.85k | PgsqlError(static_cast<YBPgErrorCode>(response.pg_error_code()))); |
208 | 1.85k | } |
209 | | |
210 | 12.4k | if (response.has_txn_error_code()) { |
211 | 1.85k | status = status.CloneAndAddErrorCode( |
212 | 1.85k | TransactionError(static_cast<TransactionErrorCode>(response.txn_error_code()))); |
213 | 1.85k | } |
214 | | |
215 | 12.4k | return status; |
216 | 10.3M | } |
217 | | |
218 | 11.5M | CHECKED_STATUS GetTable(const TableId& table_id, PgTableCache* cache, client::YBTablePtr* table) { |
219 | 11.5M | if (*table && (**table).id() == table_id9.39M ) { |
220 | 8.00M | return Status::OK(); |
221 | 8.00M | } |
222 | 3.55M | *table = VERIFY_RESULT3.55M (3.55M cache->Get(table_id)); |
223 | 0 | return Status::OK(); |
224 | 3.55M | } |
225 | | |
226 | | Result<PgClientSessionOperations> PrepareOperations( |
227 | 2.16M | const PgPerformRequestPB& req, client::YBSession* session, PgTableCache* table_cache) { |
228 | 2.16M | auto write_time = HybridTime::FromPB(req.write_time()); |
229 | 2.16M | std::vector<std::shared_ptr<client::YBPgsqlOp>> ops; |
230 | 2.16M | ops.reserve(req.ops().size()); |
231 | 2.16M | client::YBTablePtr table; |
232 | 2.16M | bool finished = false; |
233 | 2.17M | auto se = ScopeExit([&finished, session] { |
234 | 2.17M | if (!finished) { |
235 | 12 | session->Abort(); |
236 | 12 | } |
237 | 2.17M | }); |
238 | 11.5M | for (const auto& op : req.ops()) { |
239 | 11.5M | if (op.has_read()) { |
240 | 4.35M | const auto& read = op.read(); |
241 | 4.35M | RETURN_NOT_OK(GetTable(read.table_id(), table_cache, &table)); |
242 | 4.35M | const auto read_op = std::make_shared<client::YBPgsqlReadOp>( |
243 | 4.35M | table, const_cast<PgsqlReadRequestPB*>(&read)); |
244 | 4.35M | if (op.read_from_followers()) { |
245 | 93 | read_op->set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX); |
246 | 93 | } |
247 | 4.35M | ops.push_back(read_op); |
248 | 4.35M | session->Apply(std::move(read_op)); |
249 | 7.20M | } else { |
250 | 7.20M | const auto& write = op.write(); |
251 | 7.20M | RETURN_NOT_OK(GetTable(write.table_id(), table_cache, &table)); |
252 | 7.20M | const auto write_op = std::make_shared<client::YBPgsqlWriteOp>( |
253 | 7.20M | table, const_cast<PgsqlWriteRequestPB*>(&write)); |
254 | 7.20M | if (write_time) { |
255 | 1.26k | write_op->SetWriteTime(write_time); |
256 | 1.26k | write_time = HybridTime::kInvalid; |
257 | 1.26k | } |
258 | 7.20M | ops.push_back(write_op); |
259 | 7.20M | session->Apply(std::move(write_op)); |
260 | 7.20M | } |
261 | 11.5M | } |
262 | 2.16M | finished = true; |
263 | 2.16M | return ops; |
264 | 2.16M | } |
265 | | |
266 | | struct PerformData { |
267 | | uint64_t session_id; |
268 | | const PgPerformRequestPB* req; |
269 | | PgPerformResponsePB* resp; |
270 | | rpc::RpcContext context; |
271 | | PgClientSessionOperations ops; |
272 | | PgTableCache* table_cache; |
273 | | |
274 | 2.17M | void FlushDone(client::FlushStatus* flush_status) { |
275 | 2.17M | auto status = CombineErrorsToStatus(flush_status->errors, flush_status->status); |
276 | 2.17M | if (status.ok()) { |
277 | 2.07M | status = ProcessResponse(); |
278 | 2.07M | } |
279 | 2.17M | if (!status.ok()) { |
280 | 106k | StatusToPB(status, resp->mutable_status()); |
281 | 106k | } |
282 | 2.17M | context.RespondSuccess(); |
283 | 2.17M | } |
284 | | |
285 | 2.07M | CHECKED_STATUS ProcessResponse() { |
286 | 2.07M | int idx = 0; |
287 | 10.3M | for (const auto& op : ops) { |
288 | 10.3M | const auto status = HandleResponse(*op, resp); |
289 | 10.3M | if (!status.ok()) { |
290 | 13.0k | if (PgsqlRequestStatus(status) == PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH) { |
291 | 79 | table_cache->Invalidate(op->table()->id()); |
292 | 79 | } |
293 | 18.4E | VLOG(2) << SessionLogPrefix(session_id) << "Failed op " << idx << ": " << status; |
294 | 13.0k | return status.CloneAndAddErrorCode(OpIndex(idx)); |
295 | 13.0k | } |
296 | 10.3M | const auto& req_op = req->ops()[idx]; |
297 | 10.3M | if (req_op.has_read() && req_op.read().is_for_backfill()3.18M && |
298 | 10.3M | op->response().is_backfill_batch_done()2.33k ) { |
299 | | // After backfill table schema version is updated, so we reset cache in advance. |
300 | 2.25k | table_cache->Invalidate(op->table()->id()); |
301 | 2.25k | } |
302 | 10.3M | ++idx; |
303 | 10.3M | } |
304 | | |
305 | 2.06M | auto& responses = *resp->mutable_responses(); |
306 | 2.06M | responses.Reserve(narrow_cast<int>(ops.size())); |
307 | 10.3M | for (const auto& op : ops) { |
308 | 10.3M | auto& op_resp = *responses.Add(); |
309 | 10.3M | op_resp.Swap(op->mutable_response()); |
310 | 10.3M | if (op_resp.has_rows_data_sidecar()10.3M ) { |
311 | 10.3M | op_resp.set_rows_data_sidecar(narrow_cast<int>(context.AddRpcSidecar(op->rows_data()))); |
312 | 10.3M | } |
313 | 10.3M | } |
314 | | |
315 | 2.06M | return Status::OK(); |
316 | 2.07M | } |
317 | | }; |
318 | | |
319 | | client::YBSessionPtr CreateSession( |
320 | 12.6k | client::YBClient* client, const scoped_refptr<ClockBase>& clock) { |
321 | 12.6k | auto result = std::make_shared<client::YBSession>(client, clock); |
322 | 12.6k | result->SetForceConsistentRead(client::ForceConsistentRead::kTrue); |
323 | 12.6k | result->set_allow_local_calls_in_curr_thread(false); |
324 | 12.6k | return result; |
325 | 12.6k | } |
326 | | |
327 | | } // namespace |
328 | | |
329 | | PgClientSession::PgClientSession( |
330 | | client::YBClient* client, const scoped_refptr<ClockBase>& clock, |
331 | | std::reference_wrapper<const TransactionPoolProvider> transaction_pool_provider, |
332 | | PgTableCache* table_cache, uint64_t id) |
333 | | : client_(*client), |
334 | | clock_(clock), |
335 | | transaction_pool_provider_(transaction_pool_provider.get()), |
336 | 6.09k | table_cache_(*table_cache), id_(id) { |
337 | 6.09k | } |
338 | | |
339 | 2.49M | uint64_t PgClientSession::id() const { |
340 | 2.49M | return id_; |
341 | 2.49M | } |
342 | | |
343 | | Status PgClientSession::CreateTable( |
344 | 5.05k | const PgCreateTableRequestPB& req, PgCreateTableResponsePB* resp, rpc::RpcContext* context) { |
345 | 5.05k | PgCreateTable helper(req); |
346 | 5.05k | RETURN_NOT_OK(helper.Prepare()); |
347 | 5.05k | const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata( |
348 | 5.05k | req.use_transaction(), context->GetClientDeadline())); |
349 | 5.05k | RETURN_NOT_OK(helper.Exec(&client(), metadata, context->GetClientDeadline())); |
350 | 5.03k | VLOG_WITH_PREFIX2 (1) << __func__ << ": " << req.table_name()2 ; |
351 | 5.03k | const auto& indexed_table_id = helper.indexed_table_id(); |
352 | 5.03k | if (indexed_table_id.IsValid()) { |
353 | 855 | table_cache_.Invalidate(indexed_table_id.GetYbTableId()); |
354 | 855 | } |
355 | 5.03k | return Status::OK(); |
356 | 5.05k | } |
357 | | |
358 | | Status PgClientSession::CreateDatabase( |
359 | | const PgCreateDatabaseRequestPB& req, PgCreateDatabaseResponsePB* resp, |
360 | 134 | rpc::RpcContext* context) { |
361 | 134 | return client().CreateNamespace( |
362 | 134 | req.database_name(), |
363 | 134 | YQL_DATABASE_PGSQL, |
364 | 134 | "" /* creator_role_name */, |
365 | 134 | GetPgsqlNamespaceId(req.database_oid()), |
366 | 134 | req.source_database_oid() != kPgInvalidOid |
367 | 134 | ? GetPgsqlNamespaceId(req.source_database_oid())123 : ""11 , |
368 | 134 | req.next_oid(), |
369 | 134 | VERIFY_RESULT(GetDdlTransactionMetadata(req.use_transaction(), context->GetClientDeadline())), |
370 | 0 | req.colocated(), |
371 | 134 | context->GetClientDeadline()); |
372 | 134 | } |
373 | | |
374 | | Status PgClientSession::DropDatabase( |
375 | 72 | const PgDropDatabaseRequestPB& req, PgDropDatabaseResponsePB* resp, rpc::RpcContext* context) { |
376 | 72 | return client().DeleteNamespace( |
377 | 72 | req.database_name(), |
378 | 72 | YQL_DATABASE_PGSQL, |
379 | 72 | GetPgsqlNamespaceId(req.database_oid()), |
380 | 72 | context->GetClientDeadline()); |
381 | 72 | } |
382 | | |
383 | | Status PgClientSession::DropTable( |
384 | 4.14k | const PgDropTableRequestPB& req, PgDropTableResponsePB* resp, rpc::RpcContext* context) { |
385 | 4.14k | const auto yb_table_id = PgObjectId::GetYbTableIdFromPB(req.table_id()); |
386 | 4.14k | if (req.index()) { |
387 | 669 | client::YBTableName indexed_table; |
388 | 669 | RETURN_NOT_OK(client().DeleteIndexTable( |
389 | 669 | yb_table_id, &indexed_table, true, context->GetClientDeadline())); |
390 | 667 | indexed_table.SetIntoTableIdentifierPB(resp->mutable_indexed_table()); |
391 | 667 | table_cache_.Invalidate(indexed_table.table_id()); |
392 | 667 | table_cache_.Invalidate(yb_table_id); |
393 | 667 | return Status::OK(); |
394 | 669 | } |
395 | | |
396 | 3.48k | RETURN_NOT_OK(client().DeleteTable(yb_table_id, true, context->GetClientDeadline())); |
397 | 3.47k | table_cache_.Invalidate(yb_table_id); |
398 | 3.47k | return Status::OK(); |
399 | 3.48k | } |
400 | | |
401 | | Status PgClientSession::AlterDatabase( |
402 | | const PgAlterDatabaseRequestPB& req, PgAlterDatabaseResponsePB* resp, |
403 | 3 | rpc::RpcContext* context) { |
404 | 3 | const auto alterer = client().NewNamespaceAlterer( |
405 | 3 | req.database_name(), GetPgsqlNamespaceId(req.database_oid())); |
406 | 3 | alterer->SetDatabaseType(YQL_DATABASE_PGSQL); |
407 | 3 | alterer->RenameTo(req.new_name()); |
408 | 3 | return alterer->Alter(context->GetClientDeadline()); |
409 | 3 | } |
410 | | |
411 | | Status PgClientSession::AlterTable( |
412 | 522 | const PgAlterTableRequestPB& req, PgAlterTableResponsePB* resp, rpc::RpcContext* context) { |
413 | 522 | const auto table_id = PgObjectId::GetYbTableIdFromPB(req.table_id()); |
414 | 522 | const auto alterer = client().NewTableAlterer(table_id); |
415 | 522 | const auto txn = VERIFY_RESULT(GetDdlTransactionMetadata( |
416 | 522 | req.use_transaction(), context->GetClientDeadline())); |
417 | 522 | if (txn) { |
418 | 522 | alterer->part_of_transaction(txn); |
419 | 522 | } |
420 | 522 | for (const auto& add_column : req.add_columns()) { |
421 | 234 | const auto yb_type = QLType::Create(static_cast<DataType>(add_column.attr_ybtype())); |
422 | 234 | alterer->AddColumn(add_column.attr_name()) |
423 | 234 | ->Type(yb_type)->Order(add_column.attr_num())->PgTypeOid(add_column.attr_pgoid()); |
424 | | // Do not set 'nullable' attribute as PgCreateTable::AddColumn() does not do it. |
425 | 234 | } |
426 | 522 | for (const auto& rename_column : req.rename_columns()) { |
427 | 15 | alterer->AlterColumn(rename_column.old_name())->RenameTo(rename_column.new_name()); |
428 | 15 | } |
429 | 522 | for (const auto& drop_column : req.drop_columns()) { |
430 | 177 | alterer->DropColumn(drop_column); |
431 | 177 | } |
432 | 522 | if (!req.rename_table().table_name().empty()) { |
433 | 115 | client::YBTableName new_table_name( |
434 | 115 | YQL_DATABASE_PGSQL, req.rename_table().database_name(), req.rename_table().table_name()); |
435 | 115 | alterer->RenameTo(new_table_name); |
436 | 115 | } |
437 | | |
438 | 522 | alterer->timeout(context->GetClientDeadline() - CoarseMonoClock::now()); |
439 | 522 | RETURN_NOT_OK(alterer->Alter()); |
440 | 520 | table_cache_.Invalidate(table_id); |
441 | 520 | return Status::OK(); |
442 | 522 | } |
443 | | |
444 | | Status PgClientSession::TruncateTable( |
445 | | const PgTruncateTableRequestPB& req, PgTruncateTableResponsePB* resp, |
446 | 624 | rpc::RpcContext* context) { |
447 | 624 | return client().TruncateTable(PgObjectId::GetYbTableIdFromPB(req.table_id())); |
448 | 624 | } |
449 | | |
450 | | Status PgClientSession::BackfillIndex( |
451 | | const PgBackfillIndexRequestPB& req, PgBackfillIndexResponsePB* resp, |
452 | 540 | rpc::RpcContext* context) { |
453 | 540 | return client().BackfillIndex( |
454 | 540 | PgObjectId::GetYbTableIdFromPB(req.table_id()), /* wait= */ true, |
455 | 540 | context->GetClientDeadline()); |
456 | 540 | } |
457 | | |
458 | | Status PgClientSession::CreateTablegroup( |
459 | | const PgCreateTablegroupRequestPB& req, PgCreateTablegroupResponsePB* resp, |
460 | 54 | rpc::RpcContext* context) { |
461 | 54 | const auto id = PgObjectId::FromPB(req.tablegroup_id()); |
462 | 54 | auto tablespace_id = PgObjectId::FromPB(req.tablespace_id()); |
463 | 54 | auto s = client().CreateTablegroup( |
464 | 54 | req.database_name(), GetPgsqlNamespaceId(id.database_oid), |
465 | 54 | id.GetYbTablegroupId(), |
466 | 54 | tablespace_id.IsValid() ? tablespace_id.GetYbTablespaceId()10 : ""44 ); |
467 | 54 | if (s.ok()) { |
468 | 52 | return Status::OK(); |
469 | 52 | } |
470 | | |
471 | 2 | if (s.IsAlreadyPresent()) { |
472 | 0 | return STATUS(InvalidArgument, "Duplicate tablegroup"); |
473 | 0 | } |
474 | | |
475 | 2 | if (s.IsNotFound()) { |
476 | 0 | return STATUS(InvalidArgument, "Database not found", req.database_name()); |
477 | 0 | } |
478 | | |
479 | 2 | return STATUS_FORMAT( |
480 | 2 | InvalidArgument, "Invalid table definition: $0", |
481 | 2 | s.ToString(false /* include_file_and_line */, false /* include_code */)); |
482 | 2 | } |
483 | | |
484 | | Status PgClientSession::DropTablegroup( |
485 | | const PgDropTablegroupRequestPB& req, PgDropTablegroupResponsePB* resp, |
486 | 39 | rpc::RpcContext* context) { |
487 | 39 | const auto id = PgObjectId::FromPB(req.tablegroup_id()); |
488 | 39 | const auto status = client().DeleteTablegroup( |
489 | 39 | GetPgsqlNamespaceId(id.database_oid), |
490 | 39 | GetPgsqlTablegroupId(id.database_oid, id.object_oid)); |
491 | 39 | if (status.IsNotFound()) { |
492 | 0 | return Status::OK(); |
493 | 0 | } |
494 | 39 | return status; |
495 | 39 | } |
496 | | |
497 | | Status PgClientSession::RollbackSubTransaction( |
498 | | const PgRollbackSubTransactionRequestPB& req, PgRollbackSubTransactionResponsePB* resp, |
499 | 13.5k | rpc::RpcContext* context) { |
500 | 13.5k | VLOG_WITH_PREFIX_AND_FUNC0 (2) << req.ShortDebugString()0 ; |
501 | 13.5k | SCHECK(Transaction(PgClientSessionKind::kPlain), IllegalState, |
502 | 13.5k | Format("Rollback sub transaction $0, when not transaction is running", |
503 | 13.5k | req.sub_transaction_id())); |
504 | 13.5k | return Transaction(PgClientSessionKind::kPlain)->RollbackSubTransaction(req.sub_transaction_id()); |
505 | 13.5k | } |
506 | | |
507 | | Status PgClientSession::SetActiveSubTransaction( |
508 | | const PgSetActiveSubTransactionRequestPB& req, PgSetActiveSubTransactionResponsePB* resp, |
509 | 61.7k | rpc::RpcContext* context) { |
510 | 61.7k | VLOG_WITH_PREFIX_AND_FUNC23 (2) << req.ShortDebugString()23 ; |
511 | | |
512 | 61.7k | if (req.has_options()) { |
513 | 7.28k | RETURN_NOT_OK(BeginTransactionIfNecessary(req.options(), context->GetClientDeadline())); |
514 | 7.28k | txn_serial_no_ = req.options().txn_serial_no(); |
515 | 7.28k | } |
516 | | |
517 | 61.7k | SCHECK(Transaction(PgClientSessionKind::kPlain), IllegalState, |
518 | 61.7k | Format("Set active sub transaction $0, when not transaction is running", |
519 | 61.7k | req.sub_transaction_id())); |
520 | | |
521 | 61.7k | Transaction(PgClientSessionKind::kPlain)->SetActiveSubTransaction(req.sub_transaction_id()); |
522 | 61.7k | return Status::OK(); |
523 | 61.7k | } |
524 | | |
525 | | Status PgClientSession::FinishTransaction( |
526 | | const PgFinishTransactionRequestPB& req, PgFinishTransactionResponsePB* resp, |
527 | 218k | rpc::RpcContext* context) { |
528 | 218k | saved_priority_ = boost::none; |
529 | 218k | auto kind = req.ddl_mode() ? PgClientSessionKind::kDdl20.3k : PgClientSessionKind::kPlain198k ; |
530 | 218k | auto& txn = Transaction(kind); |
531 | 218k | if (!txn) { |
532 | 1.55k | VLOG_WITH_PREFIX_AND_FUNC0 (2) << "ddl: " << req.ddl_mode() << ", no running transaction"0 ; |
533 | 1.55k | return Status::OK(); |
534 | 1.55k | } |
535 | 216k | const auto txn_value = std::move(txn); |
536 | 216k | Session(kind)->SetTransaction(nullptr); |
537 | | |
538 | 216k | if (req.commit()) { |
539 | 191k | const auto commit_status = txn_value->CommitFuture().get(); |
540 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(2) |
541 | 18.4E | << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id() |
542 | 18.4E | << ", commit: " << commit_status; |
543 | 191k | return commit_status; |
544 | 191k | } |
545 | | |
546 | 25.1k | VLOG_WITH_PREFIX_AND_FUNC12 (2) |
547 | 12 | << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id() << ", abort"; |
548 | 25.1k | txn_value->Abort(); |
549 | 25.1k | return Status::OK(); |
550 | 216k | } |
551 | | |
552 | | Status PgClientSession::Perform( |
553 | 2.17M | const PgPerformRequestPB& req, PgPerformResponsePB* resp, rpc::RpcContext* context) { |
554 | 2.17M | auto session = VERIFY_RESULT2.17M (SetupSession(req, context->GetClientDeadline()));2.17M |
555 | | |
556 | 2.17M | auto ops = VERIFY_RESULT(PrepareOperations(req, session, &table_cache_)); |
557 | 0 | auto data = std::make_shared<PerformData>(PerformData { |
558 | 2.17M | .session_id = id_, |
559 | 2.17M | .req = &req, |
560 | 2.17M | .resp = resp, |
561 | 2.17M | .context = std::move(*context), |
562 | 2.17M | .ops = std::move(ops), |
563 | 2.17M | .table_cache = &table_cache_, |
564 | 2.17M | }); |
565 | 2.17M | session->FlushAsync([data](client::FlushStatus* flush_status) { |
566 | 2.17M | data->FlushDone(flush_status); |
567 | 2.17M | }); |
568 | 2.17M | return Status::OK(); |
569 | 2.17M | } |
570 | | |
571 | 2.17M | void PgClientSession::ProcessReadTimeManipulation(ReadTimeManipulation manipulation) { |
572 | 2.17M | switch (manipulation) { |
573 | 15.6k | case ReadTimeManipulation::RESET: { |
574 | | // If a txn_ has been created, session_->read_point() returns the read point stored in txn_. |
575 | 15.6k | ConsistentReadPoint* rp = Session(PgClientSessionKind::kPlain)->read_point(); |
576 | 15.6k | rp->SetCurrentReadTime(); |
577 | | |
578 | 15.6k | VLOG(1) << "Setting current ht as read point " << rp->GetReadTime()0 ; |
579 | 15.6k | } |
580 | 15.6k | return; |
581 | 163 | case ReadTimeManipulation::RESTART: { |
582 | 163 | ConsistentReadPoint* rp = Session(PgClientSessionKind::kPlain)->read_point(); |
583 | 163 | rp->Restart(); |
584 | | |
585 | 163 | VLOG(1) << "Restarted read point " << rp->GetReadTime()0 ; |
586 | 163 | } |
587 | 163 | return; |
588 | 2.15M | case ReadTimeManipulation::NONE: |
589 | 2.15M | return; |
590 | 0 | case ReadTimeManipulation::ReadTimeManipulation_INT_MIN_SENTINEL_DO_NOT_USE_: |
591 | 0 | case ReadTimeManipulation::ReadTimeManipulation_INT_MAX_SENTINEL_DO_NOT_USE_: |
592 | 0 | break; |
593 | 2.17M | } |
594 | 0 | FATAL_INVALID_ENUM_VALUE(ReadTimeManipulation, manipulation); |
595 | 0 | } |
596 | | |
597 | | Result<client::YBSession*> PgClientSession::SetupSession( |
598 | 2.17M | const PgPerformRequestPB& req, CoarseTimePoint deadline) { |
599 | | |
600 | 2.17M | const auto& options = req.options(); |
601 | 2.17M | PgClientSessionKind kind; |
602 | 2.17M | if (options.use_catalog_session()) { |
603 | 669k | kind = PgClientSessionKind::kCatalog; |
604 | 669k | EnsureSession(kind); |
605 | 1.50M | } else if (options.ddl_mode()) { |
606 | 566k | kind = PgClientSessionKind::kDdl; |
607 | 566k | EnsureSession(kind); |
608 | 566k | RETURN_NOT_OK(GetDdlTransactionMetadata(true, deadline)); |
609 | 934k | } else { |
610 | 934k | kind = PgClientSessionKind::kPlain; |
611 | 934k | RETURN_NOT_OK(BeginTransactionIfNecessary(options, deadline)); |
612 | 934k | } |
613 | | |
614 | 2.17M | client::YBSession* session = Session(kind).get(); |
615 | 2.17M | client::YBTransaction* transaction = Transaction(kind).get(); |
616 | | |
617 | 2.17M | VLOG_WITH_PREFIX483 (4) << __func__ << ": " << options.ShortDebugString()483 ; |
618 | | |
619 | 2.17M | if (options.restart_transaction()) { |
620 | 510 | if(options.ddl_mode()) { |
621 | 0 | return STATUS(NotSupported, "Not supported to restart DDL transaction"); |
622 | 0 | } |
623 | 510 | Transaction(kind) = VERIFY_RESULT(RestartTransaction(session, transaction)); |
624 | 0 | transaction = Transaction(kind).get(); |
625 | 2.16M | } else { |
626 | 2.16M | ProcessReadTimeManipulation(options.read_time_manipulation()); |
627 | 2.16M | if (options.has_read_time() && |
628 | 2.16M | (702k options.read_time().has_read_ht()702k || options.use_catalog_session()77.1k )) { |
629 | 701k | const auto read_time = options.read_time().has_read_ht() |
630 | 701k | ? ReadHybridTime::FromPB(options.read_time())624k : ReadHybridTime()77.1k ; |
631 | 701k | session->SetReadPoint(read_time); |
632 | 701k | if (read_time) { |
633 | 18.4E | VLOG_WITH_PREFIX(3) << "Read time: " << read_time; |
634 | 624k | } else { |
635 | 77.3k | VLOG_WITH_PREFIX155 (3) << "Reset read time: " << session->read_point()->GetReadTime()155 ; |
636 | 77.3k | } |
637 | 1.46M | } else if (!transaction && |
638 | 1.46M | (466k options.ddl_mode()466k || txn_serial_no_ != options.txn_serial_no()466k )) { |
639 | 233k | session->SetReadPoint(client::Restart::kFalse); |
640 | 18.4E | VLOG_WITH_PREFIX(3) << "New read time: " << session->read_point()->GetReadTime(); |
641 | 1.23M | } else { |
642 | 1.23M | VLOG_WITH_PREFIX48 (3) << "Keep read time: " << session->read_point()->GetReadTime()48 ; |
643 | 1.23M | } |
644 | 2.16M | } |
645 | | |
646 | 2.17M | if (options.defer_read_point()) { |
647 | | // This call is idempotent, meaning it has no effect after the first call. |
648 | 80 | session->DeferReadPoint(); |
649 | 80 | } |
650 | | |
651 | 2.17M | if (!options.ddl_mode() && !options.use_catalog_session()1.60M ) { |
652 | 934k | txn_serial_no_ = options.txn_serial_no(); |
653 | | |
654 | 934k | const auto in_txn_limit = HybridTime::FromPB(options.in_txn_limit_ht()); |
655 | 934k | if (in_txn_limit) { |
656 | 18.4E | VLOG_WITH_PREFIX(3) << "In txn limit: " << in_txn_limit; |
657 | 864k | session->SetInTxnLimit(in_txn_limit); |
658 | 864k | } |
659 | 934k | } |
660 | | |
661 | 2.17M | session->SetDeadline(deadline); |
662 | | |
663 | 2.17M | return session; |
664 | 2.17M | } |
665 | | |
666 | 0 | std::string PgClientSession::LogPrefix() { | // Returns the log prefix for this session, produced by SessionLogPrefix() from id_.
667 | 0 | return SessionLogPrefix(id_); |
668 | 0 | } |
669 | | |
670 | | Status PgClientSession::BeginTransactionIfNecessary( | // Aborts a plain-session transaction whose serial number differs from the request and, unless the request is NON_TRANSACTIONAL, makes sure a transaction with the requested isolation is attached to the plain session.
671 | 941k | const PgPerformOptionsPB& options, CoarseTimePoint deadline) { | // options supplies isolation, priority and txn_serial_no; deadline is forwarded to the transaction pool's Take().
672 | 941k | const auto isolation = static_cast<IsolationLevel>(options.isolation()); |
673 | | |
674 | 941k | auto priority = options.priority(); |
675 | 941k | auto& session = EnsureSession(PgClientSessionKind::kPlain); |
676 | 941k | auto& txn = Transaction(PgClientSessionKind::kPlain); | // Bound by reference: assignments to txn below update the value returned by Transaction(kPlain).
677 | 941k | if (txn && txn_serial_no_ != options.txn_serial_no()260k ) { | // An existing transaction is aborted when the stored serial number differs from the request's.
678 | 18.4E | VLOG_WITH_PREFIX(2) |
679 | 18.4E | << "Abort previous transaction, use existing priority: " << options.use_existing_priority() |
680 | 18.4E | << ", new isolation: " << IsolationLevel_Name(isolation); |
681 | | |
682 | 69.0k | if (options.use_existing_priority()68.9k ) { | // Keep the aborted transaction's priority; it is applied to the next started transaction below.
683 | 69.0k | saved_priority_ = txn->GetPriority(); |
684 | 69.0k | } |
685 | 68.9k | txn->Abort(); |
686 | 68.9k | session->SetTransaction(nullptr); |
687 | 68.9k | txn = nullptr; |
688 | 68.9k | } |
689 | | |
690 | 941k | if (isolation == IsolationLevel::NON_TRANSACTIONAL) { | // Non-transactional requests do not start a transaction.
691 | 483k | return Status::OK(); |
692 | 483k | } |
693 | | |
694 | 458k | if (txn) { | // A transaction is already present: OK only if its isolation level equals the requested one, otherwise IllegalState.
695 | 191k | return txn->isolation() != isolation |
696 | 191k | ? STATUS_FORMAT0 ( |
697 | 191k | IllegalState, |
698 | 191k | "Attempt to change isolation level of running transaction from $0 to $1", |
699 | 191k | txn->isolation(), isolation) |
700 | 191k | : Status::OK(); |
701 | 191k | } |
702 | | |
703 | 267k | txn = transaction_pool_provider_()->Take( | // No transaction present: take one from the pool.
704 | 267k | client::ForceGlobalTransaction(options.force_global_transaction()), deadline); |
705 | 267k | if ((isolation == IsolationLevel::SNAPSHOT_ISOLATION || | // Snapshot / read committed with an unchanged serial number: initialize with the session's read point, which is moved out of the session.
706 | 267k | isolation == IsolationLevel::READ_COMMITTED172k ) && |
707 | 267k | txn_serial_no_ == options.txn_serial_no()100k ) { |
708 | 13.4k | txn->InitWithReadPoint(isolation, std::move(*session->read_point())); |
709 | 18.4E | VLOG_WITH_PREFIX(2) << "Start transaction " << IsolationLevel_Name(isolation) |
710 | 18.4E | << ", id: " << txn->id() |
711 | 18.4E | << ", kept read time: " << txn->read_point().GetReadTime(); |
712 | 253k | } else { |
713 | 253k | VLOG_WITH_PREFIX2 (2) << "Start transaction " << IsolationLevel_Name(isolation) |
714 | 2 | << ", id: " << txn->id() |
715 | 2 | << ", new read time"; |
716 | 253k | RETURN_NOT_OK(txn->Init(isolation)); | // NOTE(review): on failure this returns after txn was assigned above but before session->SetTransaction(txn) and before saved_priority_ is consumed - confirm this state is handled by callers.
717 | 253k | } |
718 | 267k | if (saved_priority_) { | // A priority saved from an aborted transaction replaces options.priority() once and is then cleared.
719 | 69.0k | priority = *saved_priority_; |
720 | 69.0k | saved_priority_ = boost::none; |
721 | 69.0k | } |
722 | 267k | txn->SetPriority(priority); |
723 | 267k | session->SetTransaction(txn); |
724 | | |
725 | 267k | return Status::OK(); |
726 | 267k | } |
727 | | |
728 | | Result<const TransactionMetadata*> PgClientSession::GetDdlTransactionMetadata( | // Returns a pointer to the cached DDL transaction metadata, creating the DDL transaction first if none exists; returns nullptr when use_transaction is false.
729 | 572k | bool use_transaction, CoarseTimePoint deadline) { | // deadline is passed to TakeAndInit() and GetMetadata().
730 | 572k | if (!use_transaction) { |
731 | 283 | return nullptr; |
732 | 283 | } |
733 | | |
734 | 571k | auto& txn = Transaction(PgClientSessionKind::kDdl); | // Bound by reference: assigning txn below updates the value returned by Transaction(kDdl).
735 | 571k | if (!txn) { |
736 | 18.8k | const auto isolation = FLAGS_ysql_serializable_isolation_for_ddl_txn | // The flag selects SERIALIZABLE_ISOLATION; otherwise SNAPSHOT_ISOLATION is used.
737 | 18.8k | ? IsolationLevel::SERIALIZABLE_ISOLATION0 : IsolationLevel::SNAPSHOT_ISOLATION; |
738 | 18.8k | txn = VERIFY_RESULT(transaction_pool_provider_()->TakeAndInit(isolation, deadline)); |
739 | 18.8k | ddl_txn_metadata_ = VERIFY_RESULT(Copy(txn->GetMetadata(deadline).get())); | // Stores a copy of the metadata in ddl_txn_metadata_. NOTE(review): if this step fails, txn presumably stays set (VERIFY_RESULT returning early), so a later call would skip this branch and return the un-refreshed member - confirm.
740 | 0 | EnsureSession(PgClientSessionKind::kDdl)->SetTransaction(txn); |
741 | 18.8k | } |
742 | | |
743 | 571k | return &ddl_txn_metadata_; | // The returned pointer refers to the member ddl_txn_metadata_.
744 | 571k | } |
745 | | |
746 | 11.1k | client::YBClient& PgClientSession::client() { | // Accessor: returns client_ by reference.
747 | 11.1k | return client_; |
748 | 11.1k | } |
749 | | |
750 | | Result<client::YBTransactionPtr> PgClientSession::RestartTransaction( | // Restarts either the session's read point (when transaction is null, returning nullptr) or the given transaction (returning the restarted transaction).
751 | 510 | client::YBSession* session, client::YBTransaction* transaction) { | // Fails with IllegalState when the session / transaction does not report that a restart is required.
752 | 510 | if (!transaction) { | // No transaction: only the session read point is restarted.
753 | 508 | SCHECK(session->IsRestartRequired(), IllegalState, |
754 | 508 | "Attempted to restart when session does not require restart"); |
755 | | |
756 | 508 | const auto old_read_time = session->read_point()->GetReadTime(); | // Captured before the restart so it can be compared with the new read time below.
757 | 508 | session->SetReadPoint(client::Restart::kTrue); |
758 | 508 | const auto new_read_time = session->read_point()->GetReadTime(); |
759 | 508 | VLOG_WITH_PREFIX0 (3) << "Restarted read: " << old_read_time << " => " << new_read_time0 ; |
760 | 508 | LOG_IF_WITH_PREFIX0 (DFATAL, old_read_time == new_read_time) | // Logs at DFATAL if the restart left the read time unchanged.
761 | 0 | << "Read time did not change during restart: " << old_read_time << " => " << new_read_time; |
762 | 508 | return nullptr; |
763 | 508 | } |
764 | | |
765 | 2 | if (!transaction->IsRestartRequired()) { |
766 | 0 | return STATUS(IllegalState, "Attempted to restart when transaction does not require restart"); |
767 | 0 | } |
768 | 2 | const auto result = VERIFY_RESULT(transaction->CreateRestartedTransaction()); |
769 | 0 | session->SetTransaction(result); | // The session is switched to the restarted transaction before it is returned.
770 | 2 | VLOG_WITH_PREFIX0 (3) << "Restarted transaction"0 ; |
771 | 2 | return result; |
772 | 2 | } |
773 | | |
774 | | Status PgClientSession::InsertSequenceTuple( | // Builds a pgsql insert for the sequences data table keyed by (db_oid, seq_oid) with last_val and is_called, then applies and flushes it on the sequence session.
775 | | const PgInsertSequenceTupleRequestPB& req, PgInsertSequenceTupleResponsePB* resp, | // resp is not written by this function.
776 | 295 | rpc::RpcContext* context) { | // context provides the client deadline used for table creation and for the session.
777 | 295 | PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); |
778 | 295 | auto result = table_cache_.Get(table_oid.GetYbTableId()); |
779 | 295 | if (!result.ok()) { | // Lookup failed: create the sequences data table, then look it up again.
780 | 92 | RETURN_NOT_OK(CreateSequencesDataTable(&client_, context->GetClientDeadline())); |
781 | | // Try one more time. |
782 | 92 | result = table_cache_.Get(table_oid.GetYbTableId()); |
783 | 92 | } |
784 | 295 | auto table = VERIFY_RESULT(std::move(result)); |
785 | | |
786 | 0 | auto psql_write(client::YBPgsqlWriteOp::NewInsert(table)); |
787 | | |
788 | 295 | auto write_request = psql_write->mutable_request(); |
789 | 295 | write_request->set_ysql_catalog_version(req.ysql_catalog_version()); |
790 | | | // Partition column values are added in order: db_oid, then seq_oid.
791 | 295 | write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid()); |
792 | 295 | write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.seq_oid()); |
793 | | | // Column values: last_val (int64), then is_called (bool).
794 | 295 | PgsqlColumnValuePB* column_value = write_request->add_column_values(); |
795 | 295 | column_value->set_column_id(table->schema().ColumnId(kPgSequenceLastValueColIdx)); |
796 | 295 | column_value->mutable_expr()->mutable_value()->set_int64_value(req.last_val()); |
797 | | |
798 | 295 | column_value = write_request->add_column_values(); |
799 | 295 | column_value->set_column_id(table->schema().ColumnId(kPgSequenceIsCalledColIdx)); |
800 | 295 | column_value->mutable_expr()->mutable_value()->set_bool_value(req.is_called()); |
801 | | |
802 | 295 | auto& session = EnsureSession(PgClientSessionKind::kSequence); |
803 | 295 | session->SetDeadline(context->GetClientDeadline()); |
804 | 295 | return session->ApplyAndFlush(std::move(psql_write)); |
805 | 295 | } |
806 | | |
807 | | Status PgClientSession::UpdateSequenceTuple( | // Builds a conditional pgsql update of last_val / is_called for the sequence row keyed by (db_oid, seq_oid), applies it on the sequence session and reports whether it was skipped.
808 | | const PgUpdateSequenceTupleRequestPB& req, PgUpdateSequenceTupleResponsePB* resp, | // resp->skipped is set from the write operation's response.
809 | 2.97k | rpc::RpcContext* context) { | // context provides the client deadline for the session.
810 | 2.97k | PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); |
811 | 2.97k | auto table = VERIFY_RESULT(table_cache_.Get(table_oid.GetYbTableId())); |
812 | | |
813 | 0 | std::shared_ptr<client::YBPgsqlWriteOp> psql_write(client::YBPgsqlWriteOp::NewUpdate(table)); |
814 | | |
815 | 2.97k | auto write_request = psql_write->mutable_request(); |
816 | 2.97k | write_request->set_ysql_catalog_version(req.ysql_catalog_version()); |
817 | | | // Partition column values are added in order: db_oid, then seq_oid.
818 | 2.97k | write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid()); |
819 | 2.97k | write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.seq_oid()); |
820 | | | // New column values: last_val (int64), then is_called (bool).
821 | 2.97k | PgsqlColumnValuePB* column_value = write_request->add_column_new_values(); |
822 | 2.97k | column_value->set_column_id(table->schema().ColumnId(kPgSequenceLastValueColIdx)); |
823 | 2.97k | column_value->mutable_expr()->mutable_value()->set_int64_value(req.last_val()); |
824 | | |
825 | 2.97k | column_value = write_request->add_column_new_values(); |
826 | 2.97k | column_value->set_column_id(table->schema().ColumnId(kPgSequenceIsCalledColIdx)); |
827 | 2.97k | column_value->mutable_expr()->mutable_value()->set_bool_value(req.is_called()); |
828 | | |
829 | 2.97k | auto where_pb = write_request->mutable_where_expr()->mutable_condition(); |
830 | | |
831 | 2.97k | if (req.has_expected()) { | // With expected values the condition compares both columns; without them it is QL_OP_EXISTS.
832 | | // WHERE clause => WHERE last_val == expected_last_val AND is_called == expected_is_called. |
833 | 2.95k | where_pb->set_op(QL_OP_AND); |
834 | | |
835 | 2.95k | auto cond = where_pb->add_operands()->mutable_condition(); |
836 | 2.95k | cond->set_op(QL_OP_EQUAL); |
837 | 2.95k | cond->add_operands()->set_column_id(table->schema().ColumnId(kPgSequenceLastValueColIdx)); |
838 | 2.95k | cond->add_operands()->mutable_value()->set_int64_value(req.expected_last_val()); |
839 | | |
840 | 2.95k | cond = where_pb->add_operands()->mutable_condition(); |
841 | 2.95k | cond->set_op(QL_OP_EQUAL); |
842 | 2.95k | cond->add_operands()->set_column_id(table->schema().ColumnId(kPgSequenceIsCalledColIdx)); |
843 | 2.95k | cond->add_operands()->mutable_value()->set_bool_value(req.expected_is_called()); |
844 | 2.95k | } else { |
845 | 29 | where_pb->set_op(QL_OP_EXISTS); |
846 | 29 | } |
847 | | |
848 | | // For compatibility set deprecated column_refs |
849 | 2.97k | write_request->mutable_column_refs()->add_ids( |
850 | 2.97k | table->schema().ColumnId(kPgSequenceLastValueColIdx)); |
851 | 2.97k | write_request->mutable_column_refs()->add_ids( |
852 | 2.97k | table->schema().ColumnId(kPgSequenceIsCalledColIdx)); |
853 | | // Same values, to be consumed by current TServers |
854 | 2.97k | write_request->add_col_refs()->set_column_id( |
855 | 2.97k | table->schema().ColumnId(kPgSequenceLastValueColIdx)); |
856 | 2.97k | write_request->add_col_refs()->set_column_id( |
857 | 2.97k | table->schema().ColumnId(kPgSequenceIsCalledColIdx)); |
858 | | |
859 | 2.97k | auto& session = EnsureSession(PgClientSessionKind::kSequence); |
860 | 2.97k | session->SetDeadline(context->GetClientDeadline()); |
861 | 2.97k | RETURN_NOT_OK(session->ApplyAndFlush(psql_write)); | // psql_write is passed without std::move because its response is read on the next line.
862 | 2.97k | resp->set_skipped(psql_write->response().skipped()); |
863 | 2.97k | return Status::OK(); |
864 | 2.97k | } |
865 | | |
866 | | Status PgClientSession::ReadSequenceTuple( | // Reads last_val and is_called for the sequence row keyed by (db_oid, seq_oid) via a synchronous pgsql select and copies them into resp.
867 | | const PgReadSequenceTupleRequestPB& req, PgReadSequenceTupleResponsePB* resp, | // Returns NotFound when no row comes back or either column is null.
868 | 3.23k | rpc::RpcContext* context) { | // context provides the client deadline for the session.
869 | 3.23k | using pggate::PgDocData; |
870 | 3.23k | using pggate::PgWireDataHeader; |
871 | | |
872 | 3.23k | PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); |
873 | 3.23k | auto table = VERIFY_RESULT(table_cache_.Get(table_oid.GetYbTableId())); |
874 | | |
875 | 0 | std::shared_ptr<client::YBPgsqlReadOp> psql_read(client::YBPgsqlReadOp::NewSelect(table)); |
876 | | |
877 | 3.23k | auto read_request = psql_read->mutable_request(); |
878 | 3.23k | read_request->set_ysql_catalog_version(req.ysql_catalog_version()); |
879 | | | // Partition column values are added in order: db_oid, then seq_oid.
880 | 3.23k | read_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid()); |
881 | 3.23k | read_request->add_partition_column_values()->mutable_value()->set_int64_value(req.seq_oid()); |
882 | | | // Targets are added as last_val then is_called; the decoding below reads them in that same order.
883 | 3.23k | read_request->add_targets()->set_column_id( |
884 | 3.23k | table->schema().ColumnId(kPgSequenceLastValueColIdx)); |
885 | 3.23k | read_request->add_targets()->set_column_id( |
886 | 3.23k | table->schema().ColumnId(kPgSequenceIsCalledColIdx)); |
887 | | |
888 | | // For compatibility set deprecated column_refs |
889 | 3.23k | read_request->mutable_column_refs()->add_ids( |
890 | 3.23k | table->schema().ColumnId(kPgSequenceLastValueColIdx)); |
891 | 3.23k | read_request->mutable_column_refs()->add_ids( |
892 | 3.23k | table->schema().ColumnId(kPgSequenceIsCalledColIdx)); |
893 | | // Same values, to be consumed by current TServers |
894 | 3.23k | read_request->add_col_refs()->set_column_id( |
895 | 3.23k | table->schema().ColumnId(kPgSequenceLastValueColIdx)); |
896 | 3.23k | read_request->add_col_refs()->set_column_id( |
897 | 3.23k | table->schema().ColumnId(kPgSequenceIsCalledColIdx)); |
898 | | |
899 | 3.23k | auto& session = EnsureSession(PgClientSessionKind::kSequence); |
900 | 3.23k | session->SetDeadline(context->GetClientDeadline()); |
901 | 3.23k | RETURN_NOT_OK(session->ReadSync(psql_read)); |
902 | | | // Decode the returned rows data: LoadCache fills row_count and positions cursor for the reads below.
903 | 3.23k | Slice cursor; |
904 | 3.23k | int64_t row_count = 0; |
905 | 3.23k | PgDocData::LoadCache(psql_read->rows_data(), &row_count, &cursor); |
906 | 3.23k | if (row_count == 0) { |
907 | 0 | return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", req.seq_oid()); |
908 | 0 | } |
909 | | | // First value: header, then the int64 last_val; the cursor is advanced past the bytes read.
910 | 3.23k | PgWireDataHeader header = PgDocData::ReadDataHeader(&cursor); |
911 | 3.23k | if (header.is_null()) { |
912 | 0 | return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", req.seq_oid()); |
913 | 0 | } |
914 | 3.23k | int64_t last_val = 0; |
915 | 3.23k | size_t read_size = PgDocData::ReadNumber(&cursor, &last_val); |
916 | 3.23k | cursor.remove_prefix(read_size); |
917 | 3.23k | resp->set_last_val(last_val); |
918 | | | // Second value: header, then the bool is_called.
919 | 3.23k | header = PgDocData::ReadDataHeader(&cursor); |
920 | 3.23k | if (header.is_null()) { |
921 | 0 | return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", req.seq_oid()); |
922 | 0 | } |
923 | 3.23k | bool is_called = false; |
924 | 3.23k | read_size = PgDocData::ReadNumber(&cursor, &is_called); | // NOTE(review): read_size is assigned here but not read again; the cursor is not advanced after this last value.
925 | 3.23k | resp->set_is_called(is_called); |
926 | 3.23k | return Status::OK(); |
927 | 3.23k | } |
928 | | |
929 | | Status PgClientSession::DeleteSequenceTuple( | // Builds a pgsql delete for the sequence row keyed by (db_oid, seq_oid), then applies and flushes it on the sequence session.
930 | | const PgDeleteSequenceTupleRequestPB& req, PgDeleteSequenceTupleResponsePB* resp, | // resp is not written by this function.
931 | 282 | rpc::RpcContext* context) { | // context provides the client deadline for the session.
932 | 282 | PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); |
933 | 282 | auto table = VERIFY_RESULT(table_cache_.Get(table_oid.GetYbTableId())); |
934 | | |
935 | 0 | auto psql_delete(client::YBPgsqlWriteOp::NewDelete(table)); |
936 | 282 | auto delete_request = psql_delete->mutable_request(); |
937 | | | // Partition column values are added in order: db_oid, then seq_oid.
938 | 282 | delete_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid()); |
939 | 282 | delete_request->add_partition_column_values()->mutable_value()->set_int64_value(req.seq_oid()); |
940 | | |
941 | 282 | auto& session = EnsureSession(PgClientSessionKind::kSequence); |
942 | 282 | session->SetDeadline(context->GetClientDeadline()); |
943 | 282 | return session->ApplyAndFlush(std::move(psql_delete)); |
944 | 282 | } |
945 | | |
946 | | Status PgClientSession::DeleteDBSequences( | // Builds a pgsql delete on the sequences data table with only db_oid as partition value and applies it on the sequence session; returns OK without doing anything when the table cannot be obtained.
947 | | const PgDeleteDBSequencesRequestPB& req, PgDeleteDBSequencesResponsePB* resp, | // resp is not written by this function.
948 | 71 | rpc::RpcContext* context) { | // context provides the client deadline for the session.
949 | 71 | PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); |
950 | 71 | auto table_res = table_cache_.Get(table_oid.GetYbTableId()); |
951 | 71 | if (!table_res.ok()) { | // Any lookup failure is treated as "table not created yet" and reported as success.
952 | | // Sequence table is not yet created. |
953 | 58 | return Status::OK(); |
954 | 58 | } |
955 | | |
956 | 13 | auto table = std::move(*table_res); |
957 | 13 | if (table == nullptr) { | // A null table is also treated as nothing to delete.
958 | 0 | return Status::OK(); |
959 | 0 | } |
960 | | |
961 | 13 | auto psql_delete(client::YBPgsqlWriteOp::NewDelete(table)); |
962 | 13 | auto delete_request = psql_delete->mutable_request(); |
963 | | | // Only db_oid is supplied here (no seq_oid) - presumably so the delete covers every sequence row of that database; confirm against the tserver handling.
964 | 13 | delete_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid()); |
965 | | |
966 | 13 | auto& session = EnsureSession(PgClientSessionKind::kSequence); |
967 | 13 | session->SetDeadline(context->GetClientDeadline()); |
968 | 13 | return session->ApplyAndFlush(std::move(psql_delete)); |
969 | 13 | } |
970 | | |
971 | 2.20M | client::YBSessionPtr& PgClientSession::EnsureSession(PgClientSessionKind kind) { | // Returns a reference to the session pointer for kind, creating the session with CreateSession(&client_, clock_) if the pointer is empty.
972 | 2.20M | auto& session = Session(kind); | // Bound by reference so the assignment below updates the value returned by Session(kind).
973 | 2.20M | if (!session) { |
974 | 12.6k | session = CreateSession(&client_, clock_); |
975 | 12.6k | } |
976 | 2.20M | return session; |
977 | 2.20M | } |
978 | | |
979 | 4.60M | client::YBSessionPtr& PgClientSession::Session(PgClientSessionKind kind) { | // Returns a reference to the session pointer stored in sessions_ at the index given by kind's underlying value; may be empty (see EnsureSession).
980 | 4.60M | return sessions_[to_underlying(kind)].session; |
981 | 4.60M | } |
982 | | |
983 | 4.05M | client::YBTransactionPtr& PgClientSession::Transaction(PgClientSessionKind kind) { | // Returns a reference to the transaction pointer stored in sessions_ at the index given by kind's underlying value; callers assign through this reference.
984 | 4.05M | return sessions_[to_underlying(kind)].transaction; |
985 | 4.05M | } |
986 | | |
987 | | } // namespace tserver |
988 | | } // namespace yb |