/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_session.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | //-------------------------------------------------------------------------------------------------- |
15 | | |
16 | | #include "yb/yql/pggate/pg_session.h" |
17 | | |
18 | | #include <memory> |
19 | | |
20 | | #include <boost/optional.hpp> |
21 | | |
22 | | #include "yb/client/batcher.h" |
23 | | #include "yb/client/error.h" |
24 | | #include "yb/client/schema.h" |
25 | | #include "yb/client/session.h" |
26 | | #include "yb/client/table.h" |
27 | | #include "yb/client/tablet_server.h" |
28 | | #include "yb/client/transaction.h" |
29 | | #include "yb/client/yb_op.h" |
30 | | #include "yb/client/yb_table_name.h" |
31 | | |
32 | | #include "yb/common/pg_types.h" |
33 | | #include "yb/common/pgsql_error.h" |
34 | | #include "yb/common/placement_info.h" |
35 | | #include "yb/common/ql_expr.h" |
36 | | #include "yb/common/ql_value.h" |
37 | | #include "yb/common/row_mark.h" |
38 | | #include "yb/common/schema.h" |
39 | | #include "yb/common/transaction_error.h" |
40 | | |
41 | | #include "yb/docdb/doc_key.h" |
42 | | #include "yb/docdb/primitive_value.h" |
43 | | #include "yb/docdb/value_type.h" |
44 | | |
45 | | #include "yb/gutil/casts.h" |
46 | | |
47 | | #include "yb/tserver/pg_client.pb.h" |
48 | | #include "yb/tserver/tserver_shared_mem.h" |
49 | | |
50 | | #include "yb/util/flag_tags.h" |
51 | | #include "yb/util/format.h" |
52 | | #include "yb/util/result.h" |
53 | | #include "yb/util/shared_mem.h" |
54 | | #include "yb/util/status_format.h" |
55 | | #include "yb/util/string_util.h" |
56 | | |
57 | | #include "yb/yql/pggate/pg_client.h" |
58 | | #include "yb/yql/pggate/pg_expr.h" |
59 | | #include "yb/yql/pggate/pg_op.h" |
60 | | #include "yb/yql/pggate/pg_txn_manager.h" |
61 | | #include "yb/yql/pggate/pggate_flags.h" |
62 | | #include "yb/yql/pggate/ybc_pggate.h" |
63 | | |
using namespace std::literals;

// Deprecated flag retained so existing deployments that still set it do not
// fail on startup; the active knob is backfill_index_client_rpc_timeout_ms.
DEFINE_int32(ysql_wait_until_index_permissions_timeout_ms, 60 * 60 * 1000, // 60 min.
             "DEPRECATED: use backfill_index_client_rpc_timeout_ms instead.");
TAG_FLAG(ysql_wait_until_index_permissions_timeout_ms, advanced);
// Defined in another translation unit; declared here so this file can read it.
DECLARE_int32(TEST_user_ddl_operation_timeout_sec);

DEFINE_bool(ysql_log_failed_docdb_requests, false, "Log failed docdb requests.");
73 | | namespace yb { |
74 | | namespace pggate { |
75 | | |
76 | | using std::make_shared; |
77 | | using std::unique_ptr; |
78 | | using std::shared_ptr; |
79 | | using std::string; |
80 | | |
81 | | using client::YBClient; |
82 | | using client::YBSession; |
83 | | using client::YBMetaDataCache; |
84 | | using client::YBSchema; |
85 | | using client::YBOperation; |
86 | | using client::YBTable; |
87 | | using client::YBTableName; |
88 | | using client::YBTableType; |
89 | | |
90 | | using yb::master::GetNamespaceInfoResponsePB; |
91 | | |
92 | | using yb::tserver::TServerSharedObject; |
93 | | |
94 | | namespace { |
95 | | |
// Column indexes of the last_val / is_called columns in the sequences data
// table; the two partition key columns (db_oid, seq_oid) occupy indexes 0-1
// (see InsertSequenceTuple below, which writes them in that order).
static constexpr const size_t kPgSequenceLastValueColIdx = 2;
static constexpr const size_t kPgSequenceIsCalledColIdx = 3;
98 | | |
99 | 69.2k | docdb::PrimitiveValue NullValue(SortingType sorting) { |
100 | 69.2k | using SortingType = SortingType; |
101 | | |
102 | 69.2k | return docdb::PrimitiveValue( |
103 | 69.2k | sorting == SortingType::kAscendingNullsLast || sorting == SortingType::kDescendingNullsLast |
104 | 0 | ? docdb::ValueType::kNullHigh |
105 | 69.2k | : docdb::ValueType::kNullLow); |
106 | 69.2k | } |
107 | | |
// Converts the key-column expressions of a request into docdb primitive
// values appended to *components. start_idx offsets into the schema so range
// columns can be processed after the hash columns (see RowIdentifier ctor).
void InitKeyColumnPrimitiveValues(
    const google::protobuf::RepeatedPtrField<PgsqlExpressionPB> &column_values,
    const Schema &schema,
    size_t start_idx,
    vector<docdb::PrimitiveValue> *components) {
  size_t column_idx = start_idx;
  for (const auto& column_value : column_values) {
    const auto sorting_type = schema.column(column_idx).sorting_type();
    if (column_value.has_value()) {
      // Literal value: encode directly, mapping NULL per the sorting type.
      const auto& value = column_value.value();
      components->push_back(
          IsNull(value)
          ? NullValue(sorting_type)
          : docdb::PrimitiveValue::FromQLValuePB(value, sorting_type));
    } else {
      // TODO(neil) The current setup only works for CQL as it assumes primary key value must not
      // be dependent on any column values. This needs to be fixed as PostgreSQL expression might
      // require a read from a table.
      //
      // Use regular executor for now.
      QLExprExecutor executor;
      QLExprResult result;
      auto s = executor.EvalExpr(column_value, nullptr, result.Writer());

      components->push_back(docdb::PrimitiveValue::FromQLValuePB(result.Value(), sorting_type));
    }
    ++column_idx;
  }
}
137 | | |
138 | 302k | bool IsTableUsedByRequest(const PgsqlReadRequestPB& request, const string& table_id) { |
139 | 302k | return request.table_id() == table_id || |
140 | 296k | (request.has_index_request() && IsTableUsedByRequest(request.index_request(), table_id)); |
141 | 302k | } |
142 | | |
143 | 122 | bool IsTableUsedByRequest(const PgsqlWriteRequestPB& request, const string& table_id) { |
144 | 122 | return request.table_id() == table_id; |
145 | 122 | } |
146 | | |
147 | 192k | bool IsTableUsedByOperation(const PgsqlOp& op, const string& table_id) { |
148 | 192k | if (op.is_read()) { |
149 | 192k | return IsTableUsedByRequest(down_cast<const PgsqlReadOp&>(op).read_request(), table_id); |
150 | 122 | } else { |
151 | 122 | return IsTableUsedByRequest(down_cast<const PgsqlWriteOp&>(op).write_request(), table_id); |
152 | 122 | } |
153 | 192k | } |
154 | | |
// Non-owning key for foreign-key reference lookups: the ybctid is only a
// Slice into caller-owned bytes, so no copy is made for the lookup
// (contrast with PgForeignKeyReference, which owns its ybctid string).
struct PgForeignKeyReferenceLightweight {
  PgOid table_id;
  Slice ybctid;
};
159 | | |
160 | 4.69M | size_t ForeignKeyReferenceHash(PgOid table_id, const char* begin, const char* end) { |
161 | 4.69M | size_t hash = 0; |
162 | 4.69M | boost::hash_combine(hash, table_id); |
163 | 4.69M | boost::hash_range(hash, begin, end); |
164 | 4.69M | return hash; |
165 | 4.69M | } |
166 | | |
// Heterogeneous lookup of (table_id, ybctid) in a hash container of
// PgForeignKeyReference: the lightweight key references the ybctid bytes in
// place, avoiding a std::string copy per probe. The custom hash must agree
// with hash_value(PgForeignKeyReference), hence the shared
// ForeignKeyReferenceHash helper.
template<class Container>
auto Find(const Container& container, PgOid table_id, const Slice& ybctid) {
  return container.find(PgForeignKeyReferenceLightweight{table_id, ybctid},
      [](const auto& k) {
        return ForeignKeyReferenceHash(k.table_id, k.ybctid.cdata(), k.ybctid.cend()); },
      [](const auto& l, const auto& r) {
        return l.table_id == r.table_id && l.ybctid == r.ybctid; });
}
175 | | |
176 | | template<class Container> |
177 | 63.5k | bool Erase(Container* container, PgOid table_id, const Slice& ybctid) { |
178 | 63.5k | const auto it = Find(*container, table_id, ybctid); |
179 | 63.5k | if (it != container->end()) { |
180 | 1.70k | container->erase(it); |
181 | 1.70k | return true; |
182 | 1.70k | } |
183 | 61.8k | return false; |
184 | 61.8k | } |
185 | | |
186 | | } // namespace |
187 | | |
188 | | |
// Wraps an in-flight Perform RPC. Takes ownership of *relations (the caller's
// vector is left moved-from); session must outlive this future.
PerformFuture::PerformFuture(
    std::future<PerformResult> future, PgSession* session, PgObjectIds* relations)
    : future_(std::move(future)), session_(session), relations_(std::move(*relations)) {}
192 | | |
193 | 10.8M | bool PerformFuture::Valid() const { |
194 | 10.8M | return session_ != nullptr; |
195 | 10.8M | } |
196 | | |
// Blocks until the Perform RPC completes, marks this future consumed,
// propagates the catalog read time back to the session, and rewrites the raw
// status into one attributed to the involved relations.
CHECKED_STATUS PerformFuture::Get() {
  auto result = future_.get();
  auto session = session_;
  session_ = nullptr;  // Consume first: Valid() must be false from here on.
  session->TrySetCatalogReadPoint(result.catalog_read_time);
  return session->PatchStatus(result.status, relations_);
}
204 | | |
205 | | //-------------------------------------------------------------------------------------------------- |
206 | | // Class PgSession::RunHelper |
207 | | //-------------------------------------------------------------------------------------------------- |
208 | | |
// Binds the helper to its session and selects the destination buffer:
// transactional writes accumulate separately from non-transactional ones so
// they can be flushed with the right session kind.
PgSession::RunHelper::RunHelper(const PgObjectId& relation_id,
                                PgSession* pg_session,
                                IsTransactionalSession transactional)
    : relation_id_(relation_id),
      pg_session_(*pg_session),
      transactional_(transactional),
      buffer_(transactional_ ? pg_session_.buffered_txn_ops_
                             : pg_session_.buffered_ops_) {
}
218 | | |
219 | | Status PgSession::RunHelper::Apply( |
220 | 4.03M | const Schema& schema, const PgsqlOpPtr& op, uint64_t* read_time, bool force_non_bufferable) { |
221 | 4.03M | auto& buffered_keys = pg_session_.buffered_keys_; |
222 | | // Try buffering this operation if it is a write operation, buffering is enabled and no |
223 | | // operations have been already applied to current session (yb session does not exist). |
224 | 4.03M | if (operations_.empty() && pg_session_.buffering_enabled_ && |
225 | 2.59M | !force_non_bufferable && op->is_write()) { |
226 | 2.12M | const auto& wop = down_cast<PgsqlWriteOp&>(*op).write_request(); |
227 | | // Check for buffered operation related to same row. |
228 | | // If multiple operations are performed in context of single RPC second operation will not |
229 | | // see the results of first operation on DocDB side. |
230 | | // Multiple operations on same row must be performed in context of different RPC. |
231 | | // Flush is required in this case. |
232 | 2.12M | RowIdentifier row_id(schema, wop); |
233 | 2.12M | if (PREDICT_FALSE(!buffered_keys.insert(row_id).second)) { |
234 | 17.6k | RETURN_NOT_OK(pg_session_.FlushBufferedOperations()); |
235 | 17.6k | buffered_keys.insert(row_id); |
236 | 17.6k | } |
237 | 2.12M | if (PREDICT_FALSE(yb_debug_log_docdb_requests)) { |
238 | 0 | LOG(INFO) << "Buffering operation: " << wop.ShortDebugString(); |
239 | 0 | } |
240 | 2.12M | buffer_.Add(op, relation_id_); |
241 | | // Flush buffers in case limit of operations in single RPC exceeded. |
242 | 2.12M | return PREDICT_TRUE(buffered_keys.size() < FLAGS_ysql_session_max_batch_size) |
243 | 2.12M | ? Status::OK() |
244 | 2.31k | : pg_session_.FlushBufferedOperations(); |
245 | 1.90M | } |
246 | 1.90M | bool read_only = op->is_read(); |
247 | | // Flush all buffered operations (if any) before performing non-bufferable operation |
248 | 1.90M | if (!buffered_keys.empty()) { |
249 | 38.3k | SCHECK(operations_.empty(), |
250 | 38.3k | IllegalState, |
251 | 38.3k | "Buffered operations must be flushed before applying first non-bufferable operation"); |
252 | | // Buffered operations can't be combined within single RPC with non bufferable operation |
253 | | // in case non bufferable operation has preset read_time. |
254 | | // Buffered operations must be flushed independently in this case. |
255 | 38.3k | bool full_flush_required = transactional_ && read_time && *read_time; |
256 | | // Check for buffered operation that affected same table as current operation. |
257 | 231k | for (auto i = buffered_keys.begin(); !full_flush_required && i != buffered_keys.end(); ++i) { |
258 | 192k | full_flush_required = IsTableUsedByOperation(*op, i->table_id()); |
259 | 192k | } |
260 | 38.3k | if (full_flush_required) { |
261 | 7.14k | RETURN_NOT_OK(pg_session_.FlushBufferedOperations()); |
262 | 31.2k | } else { |
263 | 31.2k | RETURN_NOT_OK(pg_session_.FlushBufferedOperationsImpl( |
264 | 31.2k | [this](auto ops, auto transactional) -> Status { |
265 | 31.2k | if (transactional == transactional_) { |
266 | | // Save buffered operations for further applying before non-buffered operation. |
267 | 31.2k | operations_.Swap(&ops); |
268 | 31.2k | return Status::OK(); |
269 | 31.2k | } |
270 | 31.2k | return pg_session_.FlushOperations(std::move(ops), transactional); |
271 | 31.2k | })); |
272 | 31.2k | read_only = read_only && operations_.empty(); |
273 | 31.2k | } |
274 | 38.3k | } |
275 | | |
276 | 1.90M | TxnPriorityRequirement txn_priority_requirement = kLowerPriorityRange; |
277 | 1.90M | if (op->is_read()) { |
278 | 1.89M | const PgsqlReadRequestPB& read_req = down_cast<PgsqlReadOp&>(*op).read_request(); |
279 | 1.89M | auto row_mark_type = GetRowMarkTypeFromPB(read_req); |
280 | 1.89M | read_only = read_only && !IsValidRowMarkType(row_mark_type); |
281 | 1.89M | if (RowMarkNeedsHigherPriority((RowMarkType) row_mark_type)) { |
282 | 6.89k | txn_priority_requirement = kHigherPriorityRange; |
283 | 6.89k | } |
284 | 1.89M | } |
285 | | |
286 | 1.90M | if (pg_session_.GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED) { |
287 | 75.5k | txn_priority_requirement = kHighestPriority; |
288 | 75.5k | } |
289 | | |
290 | 1.90M | if (!transactional_ && read_only && schema.table_properties().is_ysql_catalog_table() && |
291 | 194k | !YBCIsInitDbModeEnvVarSet()) { |
292 | 194k | pg_session_.use_catalog_session_ = true; |
293 | 194k | } |
294 | | |
295 | 1.90M | if (transactional_) { |
296 | 1.71M | pg_session_.UpdateInTxnLimit(read_time); |
297 | 1.71M | } |
298 | | |
299 | 1.90M | if (PREDICT_FALSE(yb_debug_log_docdb_requests)) { |
300 | | #ifdef PG_CLIENT |
301 | | LOG(INFO) << "Applying operation: " << op->ShortDebugString(); |
302 | | #endif |
303 | 0 | } |
304 | | |
305 | 1.90M | operations_.Add(op, relation_id_); |
306 | | |
307 | 1.90M | if (transactional_) { |
308 | 1.71M | RETURN_NOT_OK(pg_session_.pg_txn_manager_->CalculateIsolation( |
309 | 1.71M | read_only, txn_priority_requirement)); |
310 | 1.71M | } |
311 | | |
312 | 1.90M | return Status::OK(); |
313 | 1.90M | } |
314 | | |
// Sends all collected non-buffered operations in a single Perform call.
// Returns a default (invalid) PerformFuture when every operation was
// buffered and there is nothing to send.
Result<PerformFuture> PgSession::RunHelper::Flush() {
  if (operations_.empty()) {
    // All operations were buffered, no need to flush.
    return PerformFuture();
  }

  // The shared_ptr keeps the promise alive inside the callback until the
  // result arrives; the matching future is handed to the caller.
  auto promise = std::make_shared<std::promise<PerformResult>>();

  pg_session_.Perform(&operations_.operations, [promise](PerformResult result) {
    promise->set_value(result);
  });
  return PerformFuture(promise->get_future(), &pg_session_, &operations_.relations);
}
328 | | |
329 | | //-------------------------------------------------------------------------------------------------- |
330 | | // Class PgForeignKeyReference |
331 | | //-------------------------------------------------------------------------------------------------- |
332 | | |
// Owning foreign-key reference key: copies/moves the ybctid bytes into a
// std::string (unlike the file-local lightweight variant, which only holds a
// Slice).
PgForeignKeyReference::PgForeignKeyReference(PgOid tid, std::string yid) :
    table_id(tid), ybctid(std::move(yid)) {
}
336 | | |
337 | 1.50M | bool operator==(const PgForeignKeyReference& k1, const PgForeignKeyReference& k2) { |
338 | 1.50M | return k1.table_id == k2.table_id && k1.ybctid == k2.ybctid; |
339 | 1.50M | } |
340 | | |
341 | 2.64M | size_t hash_value(const PgForeignKeyReference& key) { |
342 | 2.64M | return ForeignKeyReferenceHash( |
343 | 2.64M | key.table_id, key.ybctid.c_str(), key.ybctid.c_str() + key.ybctid.length()); |
344 | 2.64M | } |
345 | | |
346 | | //-------------------------------------------------------------------------------------------------- |
347 | | // Class RowIdentifier |
348 | | //-------------------------------------------------------------------------------------------------- |
349 | | |
// Builds a (table_id, ybctid) identity key for a write request. table_id_
// points into the request, so the request must outlive this object. When the
// request carries an explicit ybctid it is referenced in place; otherwise the
// key is reconstructed by encoding the hashed/range primary-key column values
// into a DocKey, stored in ybctid_holder_.
RowIdentifier::RowIdentifier(const Schema& schema, const PgsqlWriteRequestPB& request)
    : table_id_(&request.table_id()) {
  if (request.has_ybctid_column_value()) {
    ybctid_ = &request.ybctid_column_value().value().binary_value();
  } else {
    vector<docdb::PrimitiveValue> hashed_components;
    vector<docdb::PrimitiveValue> range_components;
    InitKeyColumnPrimitiveValues(request.partition_column_values(),
                                 schema,
                                 0 /* start_idx */,
                                 &hashed_components);
    // Range columns start after the hash columns in the schema.
    InitKeyColumnPrimitiveValues(request.range_column_values(),
                                 schema,
                                 schema.num_hash_key_columns(),
                                 &range_components);
    if (hashed_components.empty()) {
      // No hash part: encode a range-only DocKey.
      ybctid_holder_ = docdb::DocKey(std::move(range_components)).Encode().ToStringBuffer();
    } else {
      ybctid_holder_ = docdb::DocKey(request.hash_code(),
                                     std::move(hashed_components),
                                     std::move(range_components)).Encode().ToStringBuffer();
    }
    ybctid_ = nullptr;  // ybctid() will fall back to ybctid_holder_.
  }
}
375 | | |
376 | 3.19M | const string& RowIdentifier::ybctid() const { |
377 | 2.68M | return ybctid_ ? *ybctid_ : ybctid_holder_; |
378 | 3.19M | } |
379 | | |
// Returns the owning table's id. table_id_ points into the write request the
// constructor received, so the request must outlive this RowIdentifier.
const string& RowIdentifier::table_id() const {
  return *table_id_;
}
383 | | |
384 | 603k | bool operator==(const RowIdentifier& k1, const RowIdentifier& k2) { |
385 | 603k | return k1.table_id() == k2.table_id() && k1.ybctid() == k2.ybctid(); |
386 | 603k | } |
387 | | |
388 | 2.14M | size_t hash_value(const RowIdentifier& key) { |
389 | 2.14M | size_t hash = 0; |
390 | 2.14M | boost::hash_combine(hash, key.table_id()); |
391 | 2.14M | boost::hash_combine(hash, key.ybctid()); |
392 | 2.14M | return hash; |
393 | 2.14M | } |
394 | | |
395 | | //-------------------------------------------------------------------------------------------------- |
396 | | // Class PgSession |
397 | | //-------------------------------------------------------------------------------------------------- |
398 | | |
// Wires the session to its client-side collaborators. Note: database_name is
// not stored here — ConnectDatabase() records it into connected_database_.
PgSession::PgSession(
    client::YBClient* client,
    PgClient* pg_client,
    const string& database_name,
    scoped_refptr<PgTxnManager> pg_txn_manager,
    scoped_refptr<server::HybridClock> clock,
    const tserver::TServerSharedObject* tserver_shared_object,
    const YBCPgCallbacks& pg_callbacks)
    : session_(BuildSession(client)),
      pg_client_(*pg_client),
      pg_txn_manager_(std::move(pg_txn_manager)),
      clock_(std::move(clock)),
      tserver_shared_object_(tserver_shared_object),
      pg_callbacks_(pg_callbacks) {
}
414 | | |
415 | 1.65k | PgSession::~PgSession() { |
416 | 1.65k | } |
417 | | |
418 | | //-------------------------------------------------------------------------------------------------- |
419 | | |
420 | 1.65k | Status PgSession::ConnectDatabase(const string& database_name) { |
421 | 1.65k | connected_database_ = database_name; |
422 | 1.65k | return Status::OK(); |
423 | 1.65k | } |
424 | | |
// Queries the PG client for the database's metadata and reports whether the
// database was created as colocated via *colocated.
Status PgSession::IsDatabaseColocated(const PgOid database_oid, bool *colocated) {
  auto resp = VERIFY_RESULT(pg_client_.GetDatabaseInfo(database_oid));
  *colocated = resp.colocated();
  return Status::OK();
}
430 | | |
431 | | //-------------------------------------------------------------------------------------------------- |
432 | | |
// Drops the database through the PG client, then removes the sequence rows
// that belonged to it. Sequence cleanup only runs after a successful drop.
Status PgSession::DropDatabase(const string& database_name, PgOid database_oid) {
  tserver::PgDropDatabaseRequestPB req;
  req.set_database_name(database_name);
  req.set_database_oid(database_oid);

  RETURN_NOT_OK(pg_client_.DropDatabase(&req, CoarseTimePoint()));
  RETURN_NOT_OK(DeleteDBSequences(database_oid));
  return Status::OK();
}
442 | | |
// Fetches the current catalog version from the master via the PG client.
Status PgSession::GetCatalogMasterVersion(uint64_t *version) {
  *version = VERIFY_RESULT(pg_client_.GetCatalogMasterVersion());
  return Status::OK();
}
447 | | |
448 | 21 | Status PgSession::CreateSequencesDataTable() { |
449 | 21 | return pg_client_.CreateSequencesDataTable(); |
450 | 21 | } |
451 | | |
// Inserts a row for a new sequence into the shared sequences data table,
// keyed by (db_oid, seq_oid), storing its last_val / is_called state. If the
// sequences table does not exist yet it is created and the load is retried
// once.
Status PgSession::InsertSequenceTuple(int64_t db_oid,
                                      int64_t seq_oid,
                                      uint64_t ysql_catalog_version,
                                      int64_t last_val,
                                      bool is_called) {
  PgObjectId oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
  auto result = LoadTable(oid);
  if (!result.ok()) {
    RETURN_NOT_OK(CreateSequencesDataTable());
    // Try one more time.
    result = LoadTable(oid);
  }
  auto t = VERIFY_RESULT(std::move(result));

  auto psql_write(t->NewPgsqlInsert());

  auto write_request = psql_write->mutable_request();
  write_request->set_ysql_catalog_version(ysql_catalog_version);

  // Partition key: (db_oid, seq_oid).
  write_request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid);
  write_request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid);

  PgsqlColumnValuePB* column_value = write_request->add_column_values();
  column_value->set_column_id(t->schema().column_id(kPgSequenceLastValueColIdx));
  column_value->mutable_expr()->mutable_value()->set_int64_value(last_val);

  column_value = write_request->add_column_values();
  column_value->set_column_id(t->schema().column_id(kPgSequenceIsCalledColIdx));
  column_value->mutable_expr()->mutable_value()->set_bool_value(is_called);

  return session_->ApplyAndFlush(std::move(psql_write));
}
484 | | |
// Updates a sequence's (last_val, is_called) row, optionally as a
// compare-and-swap: when both expected_* values are given, the update only
// applies if the stored values still match, and *skipped (when non-null)
// reports whether the conditional write was skipped.
Status PgSession::UpdateSequenceTuple(int64_t db_oid,
                                      int64_t seq_oid,
                                      uint64_t ysql_catalog_version,
                                      int64_t last_val,
                                      bool is_called,
                                      boost::optional<int64_t> expected_last_val,
                                      boost::optional<bool> expected_is_called,
                                      bool* skipped) {
  PgObjectId oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
  auto t = VERIFY_RESULT(LoadTable(oid));

  std::shared_ptr<client::YBPgsqlWriteOp> psql_write(t->NewPgsqlUpdate());

  auto write_request = psql_write->mutable_request();
  write_request->set_ysql_catalog_version(ysql_catalog_version);

  // Partition key: (db_oid, seq_oid).
  write_request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid);
  write_request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid);

  PgsqlColumnValuePB* column_value = write_request->add_column_new_values();
  column_value->set_column_id(t->schema().column_id(kPgSequenceLastValueColIdx));
  column_value->mutable_expr()->mutable_value()->set_int64_value(last_val);

  column_value = write_request->add_column_new_values();
  column_value->set_column_id(t->schema().column_id(kPgSequenceIsCalledColIdx));
  column_value->mutable_expr()->mutable_value()->set_bool_value(is_called);

  auto where_pb = write_request->mutable_where_expr()->mutable_condition();

  if (expected_last_val && expected_is_called) {
    // WHERE clause => WHERE last_val == expected_last_val AND is_called == expected_is_called.
    where_pb->set_op(QL_OP_AND);

    auto cond = where_pb->add_operands()->mutable_condition();
    cond->set_op(QL_OP_EQUAL);
    cond->add_operands()->set_column_id(t->schema().column_id(kPgSequenceLastValueColIdx));
    cond->add_operands()->mutable_value()->set_int64_value(*expected_last_val);

    cond = where_pb->add_operands()->mutable_condition();
    cond->set_op(QL_OP_EQUAL);
    cond->add_operands()->set_column_id(t->schema().column_id(kPgSequenceIsCalledColIdx));
    cond->add_operands()->mutable_value()->set_bool_value(*expected_is_called);
  } else {
    // Unconditional update: only require that the row exists.
    where_pb->set_op(QL_OP_EXISTS);
  }

  // For compatibility set deprecated column_refs
  write_request->mutable_column_refs()->add_ids(
      t->schema().column_id(kPgSequenceLastValueColIdx));
  write_request->mutable_column_refs()->add_ids(
      t->schema().column_id(kPgSequenceIsCalledColIdx));
  // Same values, to be consumed by current TServers
  write_request->add_col_refs()->set_column_id(
      t->schema().column_id(kPgSequenceLastValueColIdx));
  write_request->add_col_refs()->set_column_id(
      t->schema().column_id(kPgSequenceIsCalledColIdx));

  RETURN_NOT_OK(session_->ApplyAndFlush(psql_write));
  if (skipped) {
    *skipped = psql_write->response().skipped();
  }
  return Status::OK();
}
548 | | |
// Reads a sequence's (last_val, is_called) pair from the sequences data table
// by (db_oid, seq_oid). Returns NotFound if the row does not exist or either
// column is NULL.
Status PgSession::ReadSequenceTuple(int64_t db_oid,
                                    int64_t seq_oid,
                                    uint64_t ysql_catalog_version,
                                    int64_t *last_val,
                                    bool *is_called) {
  PgObjectId oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
  PgTableDescPtr t = VERIFY_RESULT(LoadTable(oid));

  std::shared_ptr<client::YBPgsqlReadOp> psql_read(t->NewPgsqlSelect());

  auto read_request = psql_read->mutable_request();
  read_request->set_ysql_catalog_version(ysql_catalog_version);

  // Partition key: (db_oid, seq_oid).
  read_request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid);
  read_request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid);

  // Select targets: last_val then is_called, in that order.
  read_request->add_targets()->set_column_id(
      t->schema().column_id(kPgSequenceLastValueColIdx));
  read_request->add_targets()->set_column_id(
      t->schema().column_id(kPgSequenceIsCalledColIdx));

  // For compatibility set deprecated column_refs
  read_request->mutable_column_refs()->add_ids(
      t->schema().column_id(kPgSequenceLastValueColIdx));
  read_request->mutable_column_refs()->add_ids(
      t->schema().column_id(kPgSequenceIsCalledColIdx));
  // Same values, to be consumed by current TServers
  read_request->add_col_refs()->set_column_id(
      t->schema().column_id(kPgSequenceLastValueColIdx));
  read_request->add_col_refs()->set_column_id(
      t->schema().column_id(kPgSequenceIsCalledColIdx));

  RETURN_NOT_OK(session_->ReadSync(psql_read));

  Slice cursor;
  int64_t row_count = 0;
  PgDocData::LoadCache(psql_read->rows_data(), &row_count, &cursor);
  if (row_count == 0) {
    return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", seq_oid);
  }

  // Decode the two target columns in select order: last_val, then is_called.
  PgWireDataHeader header = PgDocData::ReadDataHeader(&cursor);
  if (header.is_null()) {
    return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", seq_oid);
  }
  size_t read_size = PgDocData::ReadNumber(&cursor, last_val);
  cursor.remove_prefix(read_size);

  header = PgDocData::ReadDataHeader(&cursor);
  if (header.is_null()) {
    return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", seq_oid);
  }
  read_size = PgDocData::ReadNumber(&cursor, is_called);
  return Status::OK();
}
604 | | |
605 | 44 | Status PgSession::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) { |
606 | 44 | PgObjectId oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); |
607 | 44 | PgTableDescPtr t = VERIFY_RESULT(LoadTable(oid)); |
608 | | |
609 | 44 | auto psql_delete(t->NewPgsqlDelete()); |
610 | 44 | auto delete_request = psql_delete->mutable_request(); |
611 | | |
612 | 44 | delete_request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid); |
613 | 44 | delete_request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid); |
614 | | |
615 | 44 | return session_->ApplyAndFlush(std::move(psql_delete)); |
616 | 44 | } |
617 | | |
// Deletes every sequence row belonging to the given database (range delete on
// the db_oid partition column). A missing sequences table is not an error —
// it simply means no sequences were ever created.
Status PgSession::DeleteDBSequences(int64_t db_oid) {
  PgObjectId oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
  Result<PgTableDescPtr> r = LoadTable(oid);
  if (!r.ok()) {
    // Sequence table is not yet created.
    return Status::OK();
  }

  auto t = std::move(*r);
  if (t == nullptr) {
    return Status::OK();
  }

  auto psql_delete(t->NewPgsqlDelete());
  auto delete_request = psql_delete->mutable_request();

  // Only db_oid is set, so all rows of this database are removed.
  delete_request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid);
  return session_->ApplyAndFlush(std::move(psql_delete));
}
637 | | |
638 | | //-------------------------------------------------------------------------------------------------- |
639 | | |
640 | 1.03k | Status PgSession::DropTable(const PgObjectId& table_id) { |
641 | 1.03k | tserver::PgDropTableRequestPB req; |
642 | 1.03k | table_id.ToPB(req.mutable_table_id()); |
643 | 1.03k | return ResultToStatus(pg_client_.DropTable(&req, CoarseTimePoint())); |
644 | 1.03k | } |
645 | | |
// Drops an index. On success, the name of the table the index belonged to is
// reported through indexed_table_name when the caller provides it.
Status PgSession::DropIndex(
    const PgObjectId& index_id,
    client::YBTableName* indexed_table_name) {
  tserver::PgDropTableRequestPB req;
  index_id.ToPB(req.mutable_table_id());
  req.set_index(true);
  auto result = VERIFY_RESULT(pg_client_.DropTable(&req, CoarseTimePoint()));
  if (indexed_table_name) {
    *indexed_table_name = std::move(result);
  }
  return Status::OK();
}
658 | | |
// Drops a tablegroup. The local table-cache entry is invalidated regardless
// of the RPC's outcome, and the RPC status is returned afterwards.
Status PgSession::DropTablegroup(const PgOid database_oid,
                                 PgOid tablegroup_oid) {
  tserver::PgDropTablegroupRequestPB req;
  PgObjectId tablegroup_id(database_oid, tablegroup_oid);
  tablegroup_id.ToPB(req.mutable_tablegroup_id());
  Status s = pg_client_.DropTablegroup(&req, CoarseTimePoint());
  InvalidateTableCache(PgObjectId(database_oid, tablegroup_oid), InvalidateOnPgClient::kFalse);
  return s;
}
668 | | |
669 | | //-------------------------------------------------------------------------------------------------- |
670 | | |
// Returns the table descriptor, serving it from the local cache when present.
// A cache entry holding nullptr (planted by InvalidateTableCache with
// kTrue) still counts as "exists" so OpenTable is told to reopen the table.
Result<PgTableDescPtr> PgSession::LoadTable(const PgObjectId& table_id) {
  VLOG(3) << "Loading table descriptor for " << table_id;

  auto cached_table_it = table_cache_.find(table_id);
  bool exists = cached_table_it != table_cache_.end();
  if (exists && cached_table_it->second) {
    return cached_table_it->second;
  }

  VLOG(4) << "Table cache MISS: " << table_id;
  // invalidate_table_cache_time_ tells the pg client service how stale our
  // cache may be; it is consumed (reset) after the first open.
  auto table = VERIFY_RESULT(pg_client_.OpenTable(table_id, exists, invalidate_table_cache_time_));
  invalidate_table_cache_time_ = CoarseTimePoint();
  if (exists) {
    cached_table_it->second = table;
  } else {
    table_cache_.emplace(table_id, table);
  }
  return table;
}
690 | | |
// Invalidates the locally cached descriptor for |table_id|. Depending on
// |invalidate_on_pg_client|, also arranges for the pg client service to
// refresh its own cache on the next open of this table.
void PgSession::InvalidateTableCache(
    const PgObjectId& table_id, InvalidateOnPgClient invalidate_on_pg_client) {
  if (invalidate_on_pg_client) {
    // Keep special record about this table_id, so when we would open this table again,
    // reopen flag will be sent to pg client service.
    // This flag means that pg client service should remove table from his cache and fetch
    // new data from master.
    // It is optional optimization, but some tests fails w/o it, since they expect that
    // local table information is updated after alter table operation.
    table_cache_[table_id] = nullptr;
  } else {
    // Local-only invalidation: erase a real cached descriptor, but preserve a
    // null entry — that is the reopen sentinel described above and must survive.
    auto it = table_cache_.find(table_id);
    if (it != table_cache_.end() && it->second) {
      table_cache_.erase(it);
    }
  }
}
708 | | |
// Drops every cached table descriptor and records the invalidation time, which
// LoadTable forwards to the pg client service on the next OpenTable call.
void PgSession::InvalidateAllTablesCache() {
  invalidate_table_cache_time_ = CoarseMonoClock::now();
  table_cache_.clear();
}
713 | | |
// Enables operations buffering. Fails if buffering is already active.
Status PgSession::StartOperationsBuffering() {
  SCHECK(!buffering_enabled_, IllegalState, "Buffering has been already started");
  // A non-empty buffer at this point means a previous buffering cycle leaked
  // operations; LOG(DFATAL) crashes in debug builds but only logs in release.
  if (PREDICT_FALSE(!buffered_keys_.empty())) {
    LOG(DFATAL) << "Buffering hasn't been started yet but "
                << buffered_keys_.size()
                << " buffered operations found";
  }
  buffering_enabled_ = true;
  return Status::OK();
}
724 | | |
// Disables buffering and flushes everything accumulated so far.
Status PgSession::StopOperationsBuffering() {
  SCHECK(buffering_enabled_, IllegalState, "Buffering hasn't been started");
  buffering_enabled_ = false;
  return FlushBufferedOperations();
}
730 | | |
// Discards all buffered operations without flushing them and disables buffering.
void PgSession::ResetOperationsBuffering() {
  DropBufferedOperations();
  buffering_enabled_ = false;
}
735 | | |
736 | 675k | Status PgSession::FlushBufferedOperations() { |
737 | 150k | return FlushBufferedOperationsImpl([this](auto ops, auto txn) { |
738 | 150k | return this->FlushOperations(std::move(ops), txn); |
739 | 150k | }); |
740 | 675k | } |
741 | | |
742 | 103k | void PgSession::DropBufferedOperations() { |
743 | 30 | VLOG_IF(1, !buffered_keys_.empty()) |
744 | 30 | << "Dropping " << buffered_keys_.size() << " pending operations"; |
745 | 103k | buffered_keys_.clear(); |
746 | 103k | buffered_ops_.Clear(); |
747 | 103k | buffered_txn_ops_.Clear(); |
748 | 103k | } |
749 | | |
// Returns the PG-level isolation of the current transaction, as tracked by the
// transaction manager.
PgIsolationLevel PgSession::GetIsolationLevel() {
  return pg_txn_manager_->GetPgIsolationLevel();
}
753 | | |
// Flushes all buffered operations through |flusher|: first the plain ops, then
// the transactional ones. The buffers are moved out and reset before invoking
// the flusher, so the member buffers are empty and reusable for the duration of
// the flush.
Status PgSession::FlushBufferedOperationsImpl(const Flusher& flusher) {
  auto ops = std::move(buffered_ops_);
  auto txn_ops = std::move(buffered_txn_ops_);
  buffered_keys_.clear();
  // Moved-from containers are in an unspecified state; Clear() restores them.
  buffered_ops_.Clear();
  buffered_txn_ops_.Clear();
  if (!ops.empty()) {
    RETURN_NOT_OK(flusher(std::move(ops), IsTransactionalSession::kFalse));
  }
  if (!txn_ops.empty()) {
    // Transactional operations must never appear while running initdb.
    SCHECK(!YBCIsInitDbModeEnvVarSet(),
           IllegalState,
           "No transactional operations are expected in the initdb mode");
    RETURN_NOT_OK(flusher(std::move(txn_ops), IsTransactionalSession::kTrue));
  }
  return Status::OK();
}
771 | | |
// Decides whether |op| on |table| must be executed under a distributed
// transaction. Returns an error for a catalog-table write with no transaction.
Result<bool> PgSession::ShouldHandleTransactionally(const PgTableDesc& table, const PgsqlOp& op) {
  // Non-transactional tables, operations that declare they need no transaction,
  // and initdb mode never use a transaction.
  if (!table.schema().table_properties().is_transactional() ||
      !op.need_transaction() ||
      YBCIsInitDbModeEnvVarSet()) {
    return false;
  }
  const auto has_non_ddl_txn = pg_txn_manager_->IsTxnInProgress();
  // Regular (non-catalog) transactional tables require an active transaction.
  if (!table.schema().table_properties().is_ysql_catalog_table()) {
    SCHECK(has_non_ddl_txn, IllegalState, "Transactional operation requires transaction");
    return true;
  }
  // Previously, yb_non_ddl_txn_for_sys_tables_allowed flag caused CREATE VIEW to fail with
  // read restart error because subsequent cache refresh used an outdated txn to read from the
  // system catalog,
  // As a quick fix, we prevent yb_non_ddl_txn_for_sys_tables_allowed from affecting reads.
  if (pg_txn_manager_->IsDdlMode() || (yb_non_ddl_txn_for_sys_tables_allowed && has_non_ddl_txn)) {
    return true;
  }
  if (op.is_write()) {
    // For consistent read from catalog tables all write operations must be done in transaction.
    return STATUS_FORMAT(IllegalState,
                         "Transaction for catalog table write operation '$0' not found",
                         table.table_name().table_name());
  }
  return false;
}
798 | | |
// Asks the pg client service whether initdb has completed.
Result<bool> PgSession::IsInitDbDone() {
  return pg_client_.IsInitDbDone();
}
802 | | |
// Sends one batch of previously buffered operations and synchronously waits
// for the batch to complete.
Status PgSession::FlushOperations(BufferableOperations ops, IsTransactionalSession transactional) {
  DCHECK(ops.size() > 0 && ops.size() <= FLAGS_ysql_session_max_batch_size);

  if (PREDICT_FALSE(yb_debug_log_docdb_requests)) {
    LOG(INFO) << "Flushing buffered operations, using "
              << (transactional ? "transactional" : "non-transactional")
              << " session (num ops: " << ops.size() << ")";
  }

  if (transactional) {
    // READ COMMITTED transactions are given the highest priority here —
    // NOTE(review): presumably so they win conflict resolution; confirm.
    TxnPriorityRequirement txn_priority_requirement = kLowerPriorityRange;
    if (GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED) {
      txn_priority_requirement = kHighestPriority;
    }

    RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation(false, txn_priority_requirement));
    // Advance the in-txn limit to the current hybrid time before sending writes.
    in_txn_limit_ = clock_->Now();
  }

  // Perform() is asynchronous (see PerformAsync below); block on a promise so
  // this flush returns only after the whole batch has been processed.
  std::promise<PerformResult> promise;
  Perform(&ops.operations, [&promise](const PerformResult& result) {
    promise.set_value(result);
  });
  PerformFuture future(promise.get_future(), this, &ops.relations);
  return future.Get();
}
829 | | |
// Sends |operations| asynchronously through the pg client service; |callback|
// is invoked with the result. Builds the per-call PgPerformOptionsPB from
// either the one-shot catalog-session state or the transaction manager.
void PgSession::Perform(PgsqlOps* operations, const PerformCallback& callback) {
  tserver::PgPerformOptionsPB options;

  if (use_catalog_session_) {
    if (catalog_read_time_) {
      if (*catalog_read_time_) {
        // A valid stored catalog read time is sent as the read time.
        catalog_read_time_->ToPB(options.mutable_read_time());
      } else {
        // Stored but invalid read time: create the read_time field but leave it
        // empty — NOTE(review): presumably signals "pick a fresh time"; confirm.
        options.mutable_read_time();
      }
    }
    options.set_use_catalog_session(true);
    // The catalog-session flag applies to this single Perform call only.
    use_catalog_session_ = false;
  } else {
    pg_txn_manager_->SetupPerformOptions(&options);

    // Only attach the in-txn limit when a transaction is actually in progress.
    if (in_txn_limit_ && pg_txn_manager_->IsTxnInProgress()) {
      options.set_in_txn_limit_ht(in_txn_limit_.ToUint64());
    }
  }
  options.set_force_global_transaction(yb_force_global_transaction);

  pg_client_.PerformAsync(&options, operations, callback);
}
854 | | |
855 | 158k | Result<uint64_t> PgSession::GetSharedCatalogVersion() { |
856 | 158k | if (tserver_shared_object_) { |
857 | 158k | return (**tserver_shared_object_).ysql_catalog_version(); |
858 | 17 | } else { |
859 | 17 | return STATUS(NotSupported, "Tablet server shared memory has not been opened"); |
860 | 17 | } |
861 | 158k | } |
862 | | |
863 | 235 | Result<uint64_t> PgSession::GetSharedAuthKey() { |
864 | 235 | if (tserver_shared_object_) { |
865 | 235 | return (**tserver_shared_object_).postgres_auth_key(); |
866 | 0 | } else { |
867 | 0 | return STATUS(NotSupported, "Tablet server shared memory has not been opened"); |
868 | 0 | } |
869 | 235 | } |
870 | | |
// Checks whether the row identified by (table_id, ybctid) exists for foreign
// key purposes. Confirmed keys are cached in fk_reference_cache_; keys whose
// existence is not yet known sit in fk_reference_intent_ and are resolved in
// batches through |reader|.
Result<bool> PgSession::ForeignKeyReferenceExists(PgOid table_id,
                                                  const Slice& ybctid,
                                                  const YbctidReader& reader) {
  // Fast path: the key was already confirmed to exist.
  if (Find(fk_reference_cache_, table_id, ybctid) != fk_reference_cache_.end()) {
    return true;
  }

  // Check existence of required FK intent.
  // Absence means the key was checked by previous batched request and was not found.
  if (!Erase(&fk_reference_intent_, table_id, ybctid)) {
    return false;
  }
  // Batch this key with other pending intents for the same table, up to the
  // session batch-size limit, so one read resolves many intents.
  std::vector<Slice> ybctids;
  const auto reserved_size = std::min<size_t>(FLAGS_ysql_session_max_batch_size,
                                              fk_reference_intent_.size() + 1);
  ybctids.reserve(reserved_size);
  ybctids.push_back(ybctid);
  // TODO(dmitry): In case number of keys for same table > FLAGS_ysql_session_max_batch_size
  // two strategy are possible:
  // 1. select keys belonging to same tablet to reduce number of simultaneous RPC
  // 2. select keys belonging to different tablets to distribute reads among different nodes
  const auto intent_match = [table_id](const auto& key) { return key.table_id == table_id; };
  for (auto it = fk_reference_intent_.begin();
       it != fk_reference_intent_.end() && ybctids.size() < FLAGS_ysql_session_max_batch_size;
       ++it) {
    if (intent_match(*it)) {
      ybctids.push_back(it->ybctid);
    }
  }
  // Every ybctid the reader returns is a confirmed, existing reference.
  for (auto& r : VERIFY_RESULT(reader(table_id, ybctids))) {
    fk_reference_cache_.emplace(table_id, std::move(r));
  }
  // Remove used intents.
  auto intent_count_for_remove = ybctids.size() - 1;
  if (intent_count_for_remove == fk_reference_intent_.size()) {
    fk_reference_intent_.clear();
  } else {
    // Erase exactly the intents that were included in the batch above (the
    // first |intent_count_for_remove| matches for this table).
    for (auto it = fk_reference_intent_.begin();
         it != fk_reference_intent_.end() && intent_count_for_remove > 0;) {
      if (intent_match(*it)) {
        it = fk_reference_intent_.erase(it);
        --intent_count_for_remove;
      } else {
        ++it;
      }
    }
  }
  // The key exists iff the batched read brought it into the cache.
  return Find(fk_reference_cache_, table_id, ybctid) != fk_reference_cache_.end();
}
920 | | |
921 | 240k | void PgSession::AddForeignKeyReferenceIntent(PgOid table_id, const Slice& ybctid) { |
922 | 240k | if (Find(fk_reference_cache_, table_id, ybctid) == fk_reference_cache_.end()) { |
923 | 240k | fk_reference_intent_.emplace(table_id, ybctid.ToBuffer()); |
924 | 240k | } |
925 | 240k | } |
926 | | |
927 | 1.51M | void PgSession::AddForeignKeyReference(PgOid table_id, const Slice& ybctid) { |
928 | 1.51M | if (Find(fk_reference_cache_, table_id, ybctid) == fk_reference_cache_.end()) { |
929 | 1.50M | fk_reference_cache_.emplace(table_id, ybctid.ToBuffer()); |
930 | 1.50M | } |
931 | 1.51M | } |
932 | | |
// Forgets a previously confirmed foreign key reference.
void PgSession::DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid) {
  Erase(&fk_reference_cache_, table_id, ybctid);
}
936 | | |
937 | 775k | Status PgSession::PatchStatus(const Status& status, const PgObjectIds& relations) { |
938 | 775k | if (PgsqlRequestStatus(status) == PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR) { |
939 | 1.02k | auto op_index = OpIndex::ValueFromStatus(status); |
940 | 1.02k | if (op_index && *op_index < relations.size()) { |
941 | 1.02k | char constraint_name[0xFF]; |
942 | 1.02k | constraint_name[sizeof(constraint_name) - 1] = 0; |
943 | 1.02k | pg_callbacks_.FetchUniqueConstraintName(relations[*op_index].object_oid, |
944 | 1.02k | constraint_name, |
945 | 1.02k | sizeof(constraint_name) - 1); |
946 | 1.02k | return STATUS( |
947 | 1.02k | AlreadyPresent, |
948 | 1.02k | Format("duplicate key value violates unique constraint \"$0\"", Slice(constraint_name)), |
949 | 1.02k | Slice(), |
950 | 1.02k | PgsqlError(YBPgErrorCode::YB_PG_UNIQUE_VIOLATION)); |
951 | 1.02k | } |
952 | 774k | } |
953 | 774k | return status; |
954 | 774k | } |
955 | | |
// Returns the tablet server count from the pg client; |primary_only| is
// forwarded as-is to restrict the count.
Result<int> PgSession::TabletServerCount(bool primary_only) {
  return pg_client_.TabletServerCount(primary_only);
}
959 | | |
// Lists live tablet servers via the pg client.
// NOTE(review): the hard-coded `false` argument's meaning is defined by
// ListLiveTabletServers — confirm against PgClient before changing.
Result<client::TabletServersInfo> PgSession::ListTabletServers() {
  return pg_client_.ListLiveTabletServers(false);
}
963 | | |
// Whether reads may be served by followers, per the transaction manager's
// current settings.
bool PgSession::ShouldUseFollowerReads() const {
  return pg_txn_manager_->ShouldUseFollowerReads();
}
967 | | |
// Applies |timeout_ms| to both the YBClient session and the pg client proxy.
void PgSession::SetTimeout(const int timeout_ms) {
  session_->SetTimeout(MonoDelta::FromMilliseconds(timeout_ms));
  pg_client_.SetTimeout(timeout_ms * 1ms);
}
972 | | |
// Clears the stored catalog read time back to a default-constructed (invalid)
// ReadHybridTime; Perform treats that state specially for catalog sessions.
void PgSession::ResetCatalogReadPoint() {
  catalog_read_time_ = ReadHybridTime();
}
976 | | |
977 | 775k | void PgSession::TrySetCatalogReadPoint(const ReadHybridTime& read_ht) { |
978 | 775k | if (read_ht) { |
979 | 18.5k | catalog_read_time_ = read_ht; |
980 | 18.5k | } |
981 | 775k | } |
982 | | |
// Makes sub-transaction |id| (a savepoint) the active one on the pg client.
Status PgSession::SetActiveSubTransaction(SubTransactionId id) {
  // It's required that we flush all buffered operations before changing the SubTransactionMetadata
  // used by the underlying batcher and RPC logic, as this will snapshot the current
  // SubTransactionMetadata for use in construction of RPCs for already-queued operations, thereby
  // ensuring that previous operations use previous SubTransactionMetadata. If we do not flush here,
  // already queued operations may incorrectly use this newly modified SubTransactionMetadata when
  // they are eventually sent to DocDB.
  RETURN_NOT_OK(FlushBufferedOperations());
  tserver::PgPerformOptionsPB* options_ptr = nullptr;
  tserver::PgPerformOptionsPB options;
  // NOTE(review): when no isolation has been established yet, isolation is
  // calculated and perform options are attached to the RPC — presumably so a
  // distributed transaction exists for the savepoint to attach to; confirm.
  if (pg_txn_manager_->GetIsolationLevel() == IsolationLevel::NON_TRANSACTIONAL) {
    auto txn_priority_requirement = kLowerPriorityRange;
    // READ COMMITTED transactions request the highest priority (matches
    // FlushOperations).
    if (pg_txn_manager_->GetPgIsolationLevel() == PgIsolationLevel::READ_COMMITTED) {
      txn_priority_requirement = kHighestPriority;
    }
    RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation(
        IsReadOnlyOperation::kFalse, txn_priority_requirement));
    options_ptr = &options;
    pg_txn_manager_->SetupPerformOptions(&options);
  }
  return pg_client_.SetActiveSubTransaction(id, options_ptr);
}
1005 | | |
// Rolls back sub-transaction |id| (a savepoint) via the pg client.
Status PgSession::RollbackSubTransaction(SubTransactionId id) {
  // TODO(savepoints) -- send async RPC to transaction status tablet, or rely on heartbeater to
  // eventually send this metadata.
  // See comment in SetActiveSubTransaction -- we must flush buffered operations before updating any
  // SubTransactionMetadata.
  RETURN_NOT_OK(FlushBufferedOperations());
  return pg_client_.RollbackSubTransaction(id);
}
1014 | | |
1015 | 1.71M | void PgSession::UpdateInTxnLimit(uint64_t* read_time) { |
1016 | 1.71M | if (!read_time) { |
1017 | 0 | return; |
1018 | 0 | } |
1019 | | |
1020 | 1.71M | if (!*read_time) { |
1021 | 252k | *read_time = clock_->Now().ToUint64(); |
1022 | 252k | } |
1023 | 1.71M | in_txn_limit_ = HybridTime(*read_time); |
1024 | 1.71M | } |
1025 | | |
1026 | 0 | Status PgSession::ValidatePlacement(const string& placement_info) { |
1027 | 0 | tserver::PgValidatePlacementRequestPB req; |
1028 | |
|
1029 | 0 | Result<PlacementInfoConverter::Placement> result = |
1030 | 0 | PlacementInfoConverter::FromString(placement_info); |
1031 | | |
1032 | | // For validation, if there is no replica_placement option, we default to the |
1033 | | // cluster configuration which the user is responsible for maintaining |
1034 | 0 | if (!result.ok() && result.status().IsInvalidArgument()) { |
1035 | 0 | return Status::OK(); |
1036 | 0 | } |
1037 | | |
1038 | 0 | RETURN_NOT_OK(result); |
1039 | |
|
1040 | 0 | PlacementInfoConverter::Placement placement = result.get(); |
1041 | 0 | for (const auto& block : placement.placement_infos) { |
1042 | 0 | auto pb = req.add_placement_infos(); |
1043 | 0 | pb->set_cloud(block.cloud); |
1044 | 0 | pb->set_region(block.region); |
1045 | 0 | pb->set_zone(block.zone); |
1046 | 0 | pb->set_min_num_replicas(block.min_num_replicas); |
1047 | 0 | } |
1048 | 0 | req.set_num_replicas(placement.num_replicas); |
1049 | |
|
1050 | 0 | return pg_client_.ValidatePlacement(&req); |
1051 | 0 | } |
1052 | | |
1053 | | } // namespace pggate |
1054 | | } // namespace yb |