/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_session.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | //-------------------------------------------------------------------------------------------------- |
15 | | |
16 | | #include "yb/yql/pggate/pg_session.h" |
17 | | |
18 | | #include <memory> |
19 | | |
20 | | #include <boost/optional.hpp> |
21 | | |
22 | | #include "yb/client/batcher.h" |
23 | | #include "yb/client/error.h" |
24 | | #include "yb/client/schema.h" |
25 | | #include "yb/client/session.h" |
26 | | #include "yb/client/table.h" |
27 | | #include "yb/client/tablet_server.h" |
28 | | #include "yb/client/transaction.h" |
29 | | #include "yb/client/yb_op.h" |
30 | | #include "yb/client/yb_table_name.h" |
31 | | |
32 | | #include "yb/common/pg_types.h" |
33 | | #include "yb/common/pgsql_error.h" |
34 | | #include "yb/common/placement_info.h" |
35 | | #include "yb/common/ql_expr.h" |
36 | | #include "yb/common/ql_value.h" |
37 | | #include "yb/common/row_mark.h" |
38 | | #include "yb/common/schema.h" |
39 | | #include "yb/common/transaction_error.h" |
40 | | |
41 | | #include "yb/docdb/doc_key.h" |
42 | | #include "yb/docdb/primitive_value.h" |
43 | | #include "yb/docdb/value_type.h" |
44 | | |
45 | | #include "yb/gutil/casts.h" |
46 | | |
47 | | #include "yb/tserver/pg_client.pb.h" |
48 | | #include "yb/tserver/tserver_shared_mem.h" |
49 | | |
50 | | #include "yb/util/flag_tags.h" |
51 | | #include "yb/util/format.h" |
52 | | #include "yb/util/result.h" |
53 | | #include "yb/util/shared_mem.h" |
54 | | #include "yb/util/status_format.h" |
55 | | #include "yb/util/string_util.h" |
56 | | |
57 | | #include "yb/yql/pggate/pg_client.h" |
58 | | #include "yb/yql/pggate/pg_expr.h" |
59 | | #include "yb/yql/pggate/pg_op.h" |
60 | | #include "yb/yql/pggate/pg_txn_manager.h" |
61 | | #include "yb/yql/pggate/pggate_flags.h" |
62 | | #include "yb/yql/pggate/ybc_pggate.h" |
63 | | |
64 | | using namespace std::literals; |
65 | | |
66 | | DEFINE_int32(ysql_wait_until_index_permissions_timeout_ms, 60 * 60 * 1000, // 60 min. |
67 | | "DEPRECATED: use backfill_index_client_rpc_timeout_ms instead."); |
68 | | TAG_FLAG(ysql_wait_until_index_permissions_timeout_ms, advanced); |
69 | | DECLARE_int32(TEST_user_ddl_operation_timeout_sec); |
70 | | |
71 | | DEFINE_bool(ysql_log_failed_docdb_requests, false, "Log failed docdb requests."); |
72 | | |
73 | | namespace yb { |
74 | | namespace pggate { |
75 | | |
76 | | using std::make_shared; |
77 | | using std::unique_ptr; |
78 | | using std::shared_ptr; |
79 | | using std::string; |
80 | | |
81 | | using client::YBSession; |
82 | | using client::YBMetaDataCache; |
83 | | using client::YBSchema; |
84 | | using client::YBOperation; |
85 | | using client::YBTable; |
86 | | using client::YBTableName; |
87 | | using client::YBTableType; |
88 | | |
89 | | using yb::master::GetNamespaceInfoResponsePB; |
90 | | |
91 | | using yb::tserver::TServerSharedObject; |
92 | | |
93 | | namespace { |
94 | | |
95 | 189k | docdb::PrimitiveValue NullValue(SortingType sorting) { |
96 | 189k | using SortingType = SortingType; |
97 | | |
98 | 189k | return docdb::PrimitiveValue( |
99 | 189k | sorting == SortingType::kAscendingNullsLast || sorting == SortingType::kDescendingNullsLast188k |
100 | 189k | ? docdb::ValueType::kNullHigh553 |
101 | 189k | : docdb::ValueType::kNullLow188k ); |
102 | 189k | } |
103 | | |
104 | | void InitKeyColumnPrimitiveValues( |
105 | | const google::protobuf::RepeatedPtrField<PgsqlExpressionPB> &column_values, |
106 | | const Schema &schema, |
107 | | size_t start_idx, |
108 | 2.64M | vector<docdb::PrimitiveValue> *components) { |
109 | 2.64M | size_t column_idx = start_idx; |
110 | 4.05M | for (const auto& column_value : column_values) { |
111 | 4.05M | const auto sorting_type = schema.column(column_idx).sorting_type(); |
112 | 4.05M | if (column_value.has_value()) { |
113 | 4.05M | const auto& value = column_value.value(); |
114 | 4.05M | components->push_back( |
115 | 4.05M | IsNull(value) |
116 | 4.05M | ? NullValue(sorting_type)189k |
117 | 4.05M | : docdb::PrimitiveValue::FromQLValuePB(value, sorting_type)3.86M ); |
118 | 4.05M | } else { |
119 | | // TODO(neil) The current setup only works for CQL as it assumes primary key value must not |
120 | | // be dependent on any column values. This needs to be fixed as PostgreSQL expression might |
121 | | // require a read from a table. |
122 | | // |
123 | | // Use regular executor for now. |
124 | 127 | QLExprExecutor executor; |
125 | 127 | QLExprResult result; |
126 | 127 | auto s = executor.EvalExpr(column_value, nullptr, result.Writer()); |
127 | | |
128 | 127 | components->push_back(docdb::PrimitiveValue::FromQLValuePB(result.Value(), sorting_type)); |
129 | 127 | } |
130 | 4.05M | ++column_idx; |
131 | 4.05M | } |
132 | 2.64M | } |
133 | | |
134 | 892k | bool IsTableUsedByRequest(const PgsqlReadRequestPB& request, const string& table_id) { |
135 | 892k | return request.table_id() == table_id || |
136 | 892k | (864k request.has_index_request()864k && IsTableUsedByRequest(request.index_request(), table_id)316k ); |
137 | 892k | } |
138 | | |
139 | 601 | bool IsTableUsedByRequest(const PgsqlWriteRequestPB& request, const string& table_id) { |
140 | 601 | return request.table_id() == table_id; |
141 | 601 | } |
142 | | |
143 | 576k | bool IsTableUsedByOperation(const PgsqlOp& op, const string& table_id) { |
144 | 576k | if (op.is_read()) { |
145 | 575k | return IsTableUsedByRequest(down_cast<const PgsqlReadOp&>(op).read_request(), table_id); |
146 | 575k | } else { |
147 | 605 | return IsTableUsedByRequest(down_cast<const PgsqlWriteOp&>(op).write_request(), table_id); |
148 | 605 | } |
149 | 576k | } |
150 | | |
151 | | struct PgForeignKeyReferenceLightweight { |
152 | | PgOid table_id; |
153 | | Slice ybctid; |
154 | | }; |
155 | | |
156 | 13.2M | size_t ForeignKeyReferenceHash(PgOid table_id, const char* begin, const char* end) { |
157 | 13.2M | size_t hash = 0; |
158 | 13.2M | boost::hash_combine(hash, table_id); |
159 | 13.2M | boost::hash_range(hash, begin, end); |
160 | 13.2M | return hash; |
161 | 13.2M | } |
162 | | |
163 | | template<class Container> |
164 | 5.59M | auto Find(const Container& container, PgOid table_id, const Slice& ybctid) { |
165 | 5.59M | return container.find(PgForeignKeyReferenceLightweight{table_id, ybctid}, |
166 | 5.59M | [](const auto& k) { |
167 | 5.59M | return ForeignKeyReferenceHash(k.table_id, k.ybctid.cdata(), k.ybctid.cend()); }, |
168 | 5.59M | [](const auto& l, const auto& r) { |
169 | 4.32M | return l.table_id == r.table_id && l.ybctid == r.ybctid3.84M ; }); |
170 | 5.59M | } |
171 | | |
172 | | template<class Container> |
173 | 726k | bool Erase(Container* container, PgOid table_id, const Slice& ybctid) { |
174 | 726k | const auto it = Find(*container, table_id, ybctid); |
175 | 726k | if (it != container->end()) { |
176 | 4.18k | container->erase(it); |
177 | 4.18k | return true; |
178 | 4.18k | } |
179 | 722k | return false; |
180 | 726k | } |
181 | | |
182 | | YB_DEFINE_ENUM(SessionType, (kRegular)(kTransactional)(kCatalog)); |
183 | | |
184 | | Result<bool> ShouldHandleTransactionally( |
185 | 20.4M | const PgTxnManager& txn_manager, const PgTableDesc& table, const PgsqlOp& op) { |
186 | 20.4M | if (!table.schema().table_properties().is_transactional() || |
187 | 20.4M | !op.need_transaction()20.4M || |
188 | 20.4M | YBCIsInitDbModeEnvVarSet()19.5M ) { |
189 | 1.23M | return false; |
190 | 1.23M | } |
191 | 19.2M | const auto has_non_ddl_txn = txn_manager.IsTxnInProgress(); |
192 | 19.2M | if (!table.schema().table_properties().is_ysql_catalog_table()) { |
193 | 15.4M | SCHECK(has_non_ddl_txn, IllegalState, "Transactional operation requires transaction"); |
194 | 15.4M | return true; |
195 | 15.4M | } |
196 | | // Previously, yb_non_ddl_txn_for_sys_tables_allowed flag caused CREATE VIEW to fail with |
197 | | // read restart error because subsequent cache refresh used an outdated txn to read from the |
198 | | // system catalog, |
199 | | // As a quick fix, we prevent yb_non_ddl_txn_for_sys_tables_allowed from affecting reads. |
200 | 3.75M | if (txn_manager.IsDdlMode() || (1.45M yb_non_ddl_txn_for_sys_tables_allowed1.45M && has_non_ddl_txn119k )) { |
201 | 2.41M | return true; |
202 | 2.41M | } |
203 | 1.34M | if (op.is_write()) { |
204 | | // For consistent read from catalog tables all write operations must be done in transaction. |
205 | 2 | return STATUS_FORMAT(IllegalState, |
206 | 2 | "Transaction for catalog table write operation '$0' not found", |
207 | 2 | table.table_name().table_name()); |
208 | 2 | } |
209 | 1.34M | return false; |
210 | 1.34M | } |
211 | | |
212 | | Result<SessionType> GetRequiredSessionType( |
213 | 20.4M | const PgTxnManager& txn_manager, const PgTableDesc& table, const PgsqlOp& op) { |
214 | 20.4M | if (VERIFY_RESULT(ShouldHandleTransactionally(txn_manager, table, op))) { |
215 | 17.9M | return SessionType::kTransactional; |
216 | 17.9M | } |
217 | | |
218 | 2.57M | return !YBCIsInitDbModeEnvVarSet() && table.schema().table_properties().is_ysql_catalog_table()2.31M |
219 | 2.57M | ? SessionType::kCatalog1.33M |
220 | 2.57M | : SessionType::kRegular1.23M ; |
221 | 20.4M | } |
222 | | |
223 | | } // namespace |
224 | | |
225 | | |
226 | | PerformFuture::PerformFuture( |
227 | | std::future<PerformResult> future, PgSession* session, PgObjectIds* relations) |
228 | 2.16M | : future_(std::move(future)), session_(session), relations_(std::move(*relations)) {} |
229 | | |
230 | 35.5M | bool PerformFuture::Valid() const { |
231 | 35.5M | return session_ != nullptr; |
232 | 35.5M | } |
233 | | |
234 | 2.17M | CHECKED_STATUS PerformFuture::Get() { |
235 | 2.17M | auto result = future_.get(); |
236 | 2.17M | auto session = session_; |
237 | 2.17M | session_ = nullptr; |
238 | 2.17M | session->TrySetCatalogReadPoint(result.catalog_read_time); |
239 | 2.17M | return session->PatchStatus(result.status, relations_); |
240 | 2.17M | } |
241 | | |
242 | | //-------------------------------------------------------------------------------------------------- |
243 | | // Class PgSession::RunHelper |
244 | | //-------------------------------------------------------------------------------------------------- |
245 | | |
246 | | class PgSession::RunHelper { |
247 | | public: |
248 | | RunHelper(PgSession* pg_session, SessionType session_type) |
249 | | : pg_session_(*pg_session), |
250 | | session_type_(session_type), |
251 | | buffer_(IsTransactional(session_type) ? pg_session_.buffered_txn_ops_ |
252 | 8.91M | : pg_session_.buffered_ops_) { |
253 | 8.91M | } |
254 | | |
255 | | CHECKED_STATUS Apply(const PgTableDesc& table, |
256 | | const PgsqlOpPtr& op, |
257 | | uint64_t* read_time, |
258 | 11.5M | bool force_non_bufferable) { |
259 | 11.5M | auto& buffered_keys = pg_session_.buffered_keys_; |
260 | | // Try buffering this operation if it is a write operation, buffering is enabled and no |
261 | | // operations have been already applied to current session (yb session does not exist). |
262 | 11.5M | if (operations_.empty() && pg_session_.buffering_enabled_8.92M && |
263 | 11.5M | !force_non_bufferable8.37M && op->is_write()8.25M ) { |
264 | 7.15M | const auto& wop = down_cast<PgsqlWriteOp&>(*op).write_request(); |
265 | | // Check for buffered operation related to same row. |
266 | | // If multiple operations are performed in context of single RPC second operation will not |
267 | | // see the results of first operation on DocDB side. |
268 | | // Multiple operations on same row must be performed in context of different RPC. |
269 | | // Flush is required in this case. |
270 | 7.15M | RowIdentifier row_id(table.schema(), wop); |
271 | 7.15M | if (PREDICT_FALSE(!buffered_keys.insert(row_id).second)) { |
272 | 22.8k | RETURN_NOT_OK(pg_session_.FlushBufferedOperations()); |
273 | 22.8k | buffered_keys.insert(row_id); |
274 | 22.8k | } |
275 | 7.15M | if (PREDICT_FALSE(yb_debug_log_docdb_requests)) { |
276 | 0 | LOG(INFO) << "Buffering operation: " << wop.ShortDebugString(); |
277 | 0 | } |
278 | 7.15M | buffer_.Add(op, table.id()); |
279 | | // Flush buffers in case limit of operations in single RPC exceeded. |
280 | 7.15M | return PREDICT_TRUE(buffered_keys.size() < FLAGS_ysql_session_max_batch_size) |
281 | 7.15M | ? Status::OK()7.13M |
282 | 7.15M | : pg_session_.FlushBufferedOperations()10.2k ; |
283 | 7.15M | } |
284 | 4.41M | bool read_only = op->is_read(); |
285 | | // Flush all buffered operations (if any) before performing non-bufferable operation |
286 | 4.41M | if (!buffered_keys.empty()) { |
287 | 126k | SCHECK(operations_.empty(), |
288 | 126k | IllegalState, |
289 | 126k | "Buffered operations must be flushed before applying first non-bufferable operation"); |
290 | | // Buffered operations can't be combined within single RPC with non bufferable operation |
291 | | // in case non bufferable operation has preset read_time. |
292 | | // Buffered operations must be flushed independently in this case. |
293 | | // Also operations for catalog session can be combined with buffered operations |
294 | | // as catalog session is used for read-only operations. |
295 | 126k | bool full_flush_required = (IsTransactional() && read_time117k && *read_time117k ) || IsCatalog()123k ; |
296 | | // Check for buffered operation that affected same table as current operation. |
297 | 703k | for (auto i = buffered_keys.begin(); !full_flush_required && i != buffered_keys.end()670k ; ++i576k ) { |
298 | 576k | full_flush_required = IsTableUsedByOperation(*op, i->table_id()); |
299 | 576k | } |
300 | 126k | if (full_flush_required) { |
301 | 33.2k | RETURN_NOT_OK(pg_session_.FlushBufferedOperations()); |
302 | 93.7k | } else { |
303 | 93.7k | RETURN_NOT_OK(pg_session_.FlushBufferedOperationsImpl( |
304 | 93.7k | [this](auto ops, auto transactional) -> Status { |
305 | 93.7k | if (transactional == IsTransactional()) { |
306 | | // Save buffered operations for further applying before non-buffered operation. |
307 | 93.7k | operations_.Swap(&ops); |
308 | 93.7k | return Status::OK(); |
309 | 93.7k | } |
310 | 93.7k | return pg_session_.FlushOperations(std::move(ops), transactional); |
311 | 93.7k | })); |
312 | 93.7k | read_only = read_only && operations_.empty()93.1k ; |
313 | 93.7k | } |
314 | 126k | } |
315 | | |
316 | 4.41M | if (PREDICT_FALSE(yb_debug_log_docdb_requests)) { |
317 | 0 | LOG(INFO) << "Applying operation: " << op->ToString(); |
318 | 0 | } |
319 | | |
320 | 4.41M | operations_.Add(op, table.id()); |
321 | | |
322 | 4.41M | if (!IsTransactional()) { |
323 | 734k | return Status::OK(); |
324 | 734k | } |
325 | | |
326 | 3.67M | TxnPriorityRequirement txn_priority_requirement = kLowerPriorityRange; |
327 | 3.67M | if (pg_session_.GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED) { |
328 | 55.9k | txn_priority_requirement = kHighestPriority; |
329 | 3.62M | } else if (op->is_read()) { |
330 | 3.60M | const auto& read_req = down_cast<PgsqlReadOp&>(*op).read_request(); |
331 | 3.60M | auto row_mark_type = GetRowMarkTypeFromPB(read_req); |
332 | 3.60M | read_only = read_only && !IsValidRowMarkType(row_mark_type)3.51M ; |
333 | 3.60M | if (RowMarkNeedsHigherPriority((RowMarkType) row_mark_type)) { |
334 | 5.77k | txn_priority_requirement = kHigherPriorityRange; |
335 | 5.77k | } |
336 | 3.60M | } |
337 | 3.67M | pg_session_.UpdateInTxnLimit(read_time); |
338 | 3.67M | return pg_session_.pg_txn_manager_->CalculateIsolation(read_only, txn_priority_requirement); |
339 | 4.41M | } |
340 | | |
341 | 8.92M | Result<PerformFuture> Flush() { |
342 | 8.92M | if (operations_.empty()) { |
343 | | // All operations were buffered, no need to flush. |
344 | 7.15M | return PerformFuture(); |
345 | 7.15M | } |
346 | | |
347 | 1.76M | auto promise = std::make_shared<std::promise<PerformResult>>(); |
348 | | |
349 | 1.76M | pg_session_.Perform(&operations_.operations, IsCatalog(), [promise](PerformResult result) { |
350 | 1.76M | promise->set_value(result); |
351 | 1.76M | }); |
352 | 1.76M | return PerformFuture(promise->get_future(), &pg_session_, &operations_.relations); |
353 | 8.92M | } |
354 | | |
355 | | private: |
356 | 13.5M | static bool IsTransactional(SessionType type) { |
357 | 13.5M | return type == SessionType::kTransactional; |
358 | 13.5M | } |
359 | | |
360 | 4.63M | bool IsTransactional() const { |
361 | 4.63M | return IsTransactional(session_type_); |
362 | 4.63M | } |
363 | | |
364 | 1.89M | bool IsCatalog() const { |
365 | 1.89M | return session_type_ == SessionType::kCatalog; |
366 | 1.89M | } |
367 | | |
368 | | PgSession& pg_session_; |
369 | | const SessionType session_type_; |
370 | | BufferableOperations& buffer_; |
371 | | BufferableOperations operations_; |
372 | | }; |
373 | | |
374 | | //-------------------------------------------------------------------------------------------------- |
375 | | // Class PgForeignKeyReference |
376 | | //-------------------------------------------------------------------------------------------------- |
377 | | |
378 | | PgForeignKeyReference::PgForeignKeyReference(PgOid tid, std::string yid) : |
379 | 4.72M | table_id(tid), ybctid(std::move(yid)) { |
380 | 4.72M | } |
381 | | |
382 | 4.17M | bool operator==(const PgForeignKeyReference& k1, const PgForeignKeyReference& k2) { |
383 | 4.17M | return k1.table_id == k2.table_id && k1.ybctid == k2.ybctid3.85M ; |
384 | 4.17M | } |
385 | | |
386 | 7.65M | size_t hash_value(const PgForeignKeyReference& key) { |
387 | 7.65M | return ForeignKeyReferenceHash( |
388 | 7.65M | key.table_id, key.ybctid.c_str(), key.ybctid.c_str() + key.ybctid.length()); |
389 | 7.65M | } |
390 | | |
391 | | //-------------------------------------------------------------------------------------------------- |
392 | | // Class RowIdentifier |
393 | | //-------------------------------------------------------------------------------------------------- |
394 | | |
395 | | RowIdentifier::RowIdentifier(const Schema& schema, const PgsqlWriteRequestPB& request) |
396 | 7.15M | : table_id_(&request.table_id()) { |
397 | 7.15M | if (request.has_ybctid_column_value()) { |
398 | 5.82M | ybctid_ = &request.ybctid_column_value().value().binary_value(); |
399 | 5.82M | } else { |
400 | 1.32M | vector<docdb::PrimitiveValue> hashed_components; |
401 | 1.32M | vector<docdb::PrimitiveValue> range_components; |
402 | 1.32M | InitKeyColumnPrimitiveValues(request.partition_column_values(), |
403 | 1.32M | schema, |
404 | 1.32M | 0 /* start_idx */, |
405 | 1.32M | &hashed_components); |
406 | 1.32M | InitKeyColumnPrimitiveValues(request.range_column_values(), |
407 | 1.32M | schema, |
408 | 1.32M | schema.num_hash_key_columns(), |
409 | 1.32M | &range_components); |
410 | 1.32M | if (hashed_components.empty()) { |
411 | 813k | ybctid_holder_ = docdb::DocKey(std::move(range_components)).Encode().ToStringBuffer(); |
412 | 813k | } else { |
413 | 511k | ybctid_holder_ = docdb::DocKey(request.hash_code(), |
414 | 511k | std::move(hashed_components), |
415 | 511k | std::move(range_components)).Encode().ToStringBuffer(); |
416 | 511k | } |
417 | 1.32M | ybctid_ = nullptr; |
418 | 1.32M | } |
419 | 7.15M | } |
420 | | |
421 | 10.7M | const string& RowIdentifier::ybctid() const { |
422 | 10.7M | return ybctid_ ? *ybctid_8.83M : ybctid_holder_1.87M ; |
423 | 10.7M | } |
424 | | |
425 | 11.8M | const string& RowIdentifier::table_id() const { |
426 | 11.8M | return *table_id_; |
427 | 11.8M | } |
428 | | |
429 | 2.03M | bool operator==(const RowIdentifier& k1, const RowIdentifier& k2) { |
430 | 2.03M | return k1.table_id() == k2.table_id() && k1.ybctid() == k2.ybctid()1.76M ; |
431 | 2.03M | } |
432 | | |
433 | 7.17M | size_t hash_value(const RowIdentifier& key) { |
434 | 7.17M | size_t hash = 0; |
435 | 7.17M | boost::hash_combine(hash, key.table_id()); |
436 | 7.17M | boost::hash_combine(hash, key.ybctid()); |
437 | 7.17M | return hash; |
438 | 7.17M | } |
439 | | |
440 | | //-------------------------------------------------------------------------------------------------- |
441 | | // Class PgSession |
442 | | //-------------------------------------------------------------------------------------------------- |
443 | | |
444 | | PgSession::PgSession( |
445 | | PgClient* pg_client, |
446 | | const string& database_name, |
447 | | scoped_refptr<PgTxnManager> pg_txn_manager, |
448 | | scoped_refptr<server::HybridClock> clock, |
449 | | const tserver::TServerSharedObject* tserver_shared_object, |
450 | | const YBCPgCallbacks& pg_callbacks) |
451 | | : pg_client_(*pg_client), |
452 | | pg_txn_manager_(std::move(pg_txn_manager)), |
453 | | clock_(std::move(clock)), |
454 | | tserver_shared_object_(tserver_shared_object), |
455 | 6.09k | pg_callbacks_(pg_callbacks) { |
456 | 6.09k | } |
457 | | |
458 | 6.06k | PgSession::~PgSession() { |
459 | 6.06k | } |
460 | | |
461 | | //-------------------------------------------------------------------------------------------------- |
462 | | |
463 | 6.08k | Status PgSession::ConnectDatabase(const string& database_name) { |
464 | 6.08k | connected_database_ = database_name; |
465 | 6.08k | return Status::OK(); |
466 | 6.08k | } |
467 | | |
468 | 5.72k | Status PgSession::IsDatabaseColocated(const PgOid database_oid, bool *colocated) { |
469 | 5.72k | auto resp = VERIFY_RESULT(pg_client_.GetDatabaseInfo(database_oid)); |
470 | 0 | *colocated = resp.colocated(); |
471 | 5.72k | return Status::OK(); |
472 | 5.72k | } |
473 | | |
474 | | //-------------------------------------------------------------------------------------------------- |
475 | | |
476 | 72 | Status PgSession::DropDatabase(const string& database_name, PgOid database_oid) { |
477 | 72 | tserver::PgDropDatabaseRequestPB req; |
478 | 72 | req.set_database_name(database_name); |
479 | 72 | req.set_database_oid(database_oid); |
480 | | |
481 | 72 | RETURN_NOT_OK(pg_client_.DropDatabase(&req, CoarseTimePoint())); |
482 | 71 | RETURN_NOT_OK(DeleteDBSequences(database_oid)); |
483 | 71 | return Status::OK(); |
484 | 71 | } |
485 | | |
486 | 22 | Status PgSession::GetCatalogMasterVersion(uint64_t *version) { |
487 | 22 | *version = VERIFY_RESULT(pg_client_.GetCatalogMasterVersion()); |
488 | 0 | return Status::OK(); |
489 | 22 | } |
490 | | |
491 | 0 | Status PgSession::CreateSequencesDataTable() { |
492 | 0 | return pg_client_.CreateSequencesDataTable(); |
493 | 0 | } |
494 | | |
495 | | Status PgSession::InsertSequenceTuple(int64_t db_oid, |
496 | | int64_t seq_oid, |
497 | | uint64_t ysql_catalog_version, |
498 | | int64_t last_val, |
499 | 295 | bool is_called) { |
500 | 295 | return pg_client_.InsertSequenceTuple( |
501 | 295 | db_oid, seq_oid, ysql_catalog_version, last_val, is_called); |
502 | 295 | } |
503 | | |
504 | | Result<bool> PgSession::UpdateSequenceTuple(int64_t db_oid, |
505 | | int64_t seq_oid, |
506 | | uint64_t ysql_catalog_version, |
507 | | int64_t last_val, |
508 | | bool is_called, |
509 | | boost::optional<int64_t> expected_last_val, |
510 | 2.97k | boost::optional<bool> expected_is_called) { |
511 | 2.97k | return pg_client_.UpdateSequenceTuple( |
512 | 2.97k | db_oid, seq_oid, ysql_catalog_version, last_val, is_called, expected_last_val, |
513 | 2.97k | expected_is_called); |
514 | 2.97k | } |
515 | | |
516 | | Result<std::pair<int64_t, bool>> PgSession::ReadSequenceTuple(int64_t db_oid, |
517 | | int64_t seq_oid, |
518 | 3.23k | uint64_t ysql_catalog_version) { |
519 | 3.23k | return pg_client_.ReadSequenceTuple(db_oid, seq_oid, ysql_catalog_version); |
520 | 3.23k | } |
521 | | |
522 | 282 | Status PgSession::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) { |
523 | 282 | return pg_client_.DeleteSequenceTuple(db_oid, seq_oid); |
524 | 282 | } |
525 | | |
526 | 71 | Status PgSession::DeleteDBSequences(int64_t db_oid) { |
527 | 71 | return pg_client_.DeleteDBSequences(db_oid); |
528 | 71 | } |
529 | | |
530 | | //-------------------------------------------------------------------------------------------------- |
531 | | |
532 | 3.48k | Status PgSession::DropTable(const PgObjectId& table_id) { |
533 | 3.48k | tserver::PgDropTableRequestPB req; |
534 | 3.48k | table_id.ToPB(req.mutable_table_id()); |
535 | 3.48k | return ResultToStatus(pg_client_.DropTable(&req, CoarseTimePoint())); |
536 | 3.48k | } |
537 | | |
538 | | Status PgSession::DropIndex( |
539 | | const PgObjectId& index_id, |
540 | 669 | client::YBTableName* indexed_table_name) { |
541 | 669 | tserver::PgDropTableRequestPB req; |
542 | 669 | index_id.ToPB(req.mutable_table_id()); |
543 | 669 | req.set_index(true); |
544 | 669 | auto result = VERIFY_RESULT667 (pg_client_.DropTable(&req, CoarseTimePoint()));667 |
545 | 667 | if (indexed_table_name) { |
546 | 667 | *indexed_table_name = std::move(result); |
547 | 667 | } |
548 | 667 | return Status::OK(); |
549 | 669 | } |
550 | | |
551 | | Status PgSession::DropTablegroup(const PgOid database_oid, |
552 | 0 | PgOid tablegroup_oid) { |
553 | 0 | tserver::PgDropTablegroupRequestPB req; |
554 | 0 | PgObjectId tablegroup_id(database_oid, tablegroup_oid); |
555 | 0 | tablegroup_id.ToPB(req.mutable_tablegroup_id()); |
556 | 0 | Status s = pg_client_.DropTablegroup(&req, CoarseTimePoint()); |
557 | 0 | InvalidateTableCache(PgObjectId(database_oid, tablegroup_oid), InvalidateOnPgClient::kFalse); |
558 | 0 | return s; |
559 | 0 | } |
560 | | |
561 | | //-------------------------------------------------------------------------------------------------- |
562 | | |
563 | 16.7M | Result<PgTableDescPtr> PgSession::LoadTable(const PgObjectId& table_id) { |
564 | 16.7M | VLOG(3) << "Loading table descriptor for " << table_id1.66k ; |
565 | | |
566 | 16.7M | auto cached_table_it = table_cache_.find(table_id); |
567 | 16.7M | bool exists = cached_table_it != table_cache_.end(); |
568 | 16.7M | if (exists && cached_table_it->second16.5M ) { |
569 | 16.5M | return cached_table_it->second; |
570 | 16.5M | } |
571 | | |
572 | 18.4E | VLOG(4) << "Table cache MISS: " << table_id; |
573 | 191k | auto table = VERIFY_RESULT190k (pg_client_.OpenTable(table_id, exists, invalidate_table_cache_time_));190k |
574 | 0 | invalidate_table_cache_time_ = CoarseTimePoint(); |
575 | 190k | if (exists) { |
576 | 9 | cached_table_it->second = table; |
577 | 190k | } else { |
578 | 190k | table_cache_.emplace(table_id, table); |
579 | 190k | } |
580 | 190k | return table; |
581 | 191k | } |
582 | | |
583 | | void PgSession::InvalidateTableCache( |
584 | 6.19k | const PgObjectId& table_id, InvalidateOnPgClient invalidate_on_pg_client) { |
585 | 6.19k | if (invalidate_on_pg_client) { |
586 | | // Keep special record about this table_id, so when we would open this table again, |
587 | | // reopen flag will be sent to pg client service. |
588 | | // This flag means that pg client service should remove table from his cache and fetch |
589 | | // new data from master. |
590 | | // It is optional optimization, but some tests fails w/o it, since they expect that |
591 | | // local table information is updated after alter table operation. |
592 | 9 | table_cache_[table_id] = nullptr; |
593 | 6.18k | } else { |
594 | 6.18k | auto it = table_cache_.find(table_id); |
595 | 6.18k | if (it != table_cache_.end() && it->second2.92k ) { |
596 | 2.92k | table_cache_.erase(it); |
597 | 2.92k | } |
598 | 6.18k | } |
599 | 6.19k | } |
600 | | |
601 | 1.23k | void PgSession::InvalidateAllTablesCache() { |
602 | 1.23k | invalidate_table_cache_time_ = CoarseMonoClock::now(); |
603 | 1.23k | table_cache_.clear(); |
604 | 1.23k | } |
605 | | |
606 | 667k | Status PgSession::StartOperationsBuffering() { |
607 | 667k | SCHECK(!buffering_enabled_, IllegalState, "Buffering has been already started"); |
608 | 667k | if (PREDICT_FALSE(!buffered_keys_.empty())) { |
609 | 0 | LOG(DFATAL) << "Buffering hasn't been started yet but " |
610 | 0 | << buffered_keys_.size() |
611 | 0 | << " buffered operations found"; |
612 | 0 | } |
613 | 667k | buffering_enabled_ = true; |
614 | 667k | return Status::OK(); |
615 | 667k | } |
616 | | |
617 | 610k | Status PgSession::StopOperationsBuffering() { |
618 | 610k | SCHECK(buffering_enabled_, IllegalState, "Buffering hasn't been started"); |
619 | 610k | buffering_enabled_ = false; |
620 | 610k | return FlushBufferedOperations(); |
621 | 610k | } |
622 | | |
623 | 114k | void PgSession::ResetOperationsBuffering() { |
624 | 114k | DropBufferedOperations(); |
625 | 114k | buffering_enabled_ = false; |
626 | 114k | } |
627 | | |
628 | 1.82M | Status PgSession::FlushBufferedOperations() { |
629 | 1.82M | return FlushBufferedOperationsImpl([this](auto ops, auto txn) { |
630 | 400k | return this->FlushOperations(std::move(ops), txn); |
631 | 400k | }); |
632 | 1.82M | } |
633 | | |
634 | 249k | void PgSession::DropBufferedOperations() { |
635 | 249k | VLOG_IF(1, !buffered_keys_.empty()) |
636 | 193 | << "Dropping " << buffered_keys_.size() << " pending operations"; |
637 | 249k | buffered_keys_.clear(); |
638 | 249k | buffered_ops_.Clear(); |
639 | 249k | buffered_txn_ops_.Clear(); |
640 | 249k | } |
641 | | |
642 | 3.95M | PgIsolationLevel PgSession::GetIsolationLevel() { |
643 | 3.95M | return pg_txn_manager_->GetPgIsolationLevel(); |
644 | 3.95M | } |
645 | | |
646 | 1.92M | Status PgSession::FlushBufferedOperationsImpl(const Flusher& flusher) { |
647 | 1.92M | auto ops = std::move(buffered_ops_); |
648 | 1.92M | auto txn_ops = std::move(buffered_txn_ops_); |
649 | 1.92M | buffered_keys_.clear(); |
650 | 1.92M | buffered_ops_.Clear(); |
651 | 1.92M | buffered_txn_ops_.Clear(); |
652 | 1.92M | if (!ops.empty()) { |
653 | 127k | RETURN_NOT_OK(flusher(std::move(ops), IsTransactionalSession::kFalse)); |
654 | 127k | } |
655 | 1.92M | if (!txn_ops.empty()) { |
656 | 366k | SCHECK(!YBCIsInitDbModeEnvVarSet(), |
657 | 366k | IllegalState, |
658 | 366k | "No transactional operations are expected in the initdb mode"); |
659 | 366k | RETURN_NOT_OK(flusher(std::move(txn_ops), IsTransactionalSession::kTrue)); |
660 | 366k | } |
661 | 1.86M | return Status::OK(); |
662 | 1.92M | } |
663 | | |
664 | 2 | Result<bool> PgSession::IsInitDbDone() { |
665 | 2 | return pg_client_.IsInitDbDone(); |
666 | 2 | } |
667 | | |
// Flushes one batch of buffered operations through the PgClient.
// 'transactional' selects whether the batch runs under the session's
// distributed transaction or via the plain (non-transactional) path.
// NOTE: this blocks the calling thread on the async Perform() through a
// promise/future pair, so the flush is effectively synchronous.
Status PgSession::FlushOperations(BufferableOperations ops, IsTransactionalSession transactional) {
  DCHECK(ops.size() > 0 && ops.size() <= FLAGS_ysql_session_max_batch_size);

  if (PREDICT_FALSE(yb_debug_log_docdb_requests)) {
    LOG(INFO) << "Flushing buffered operations, using "
              << (transactional ? "transactional" : "non-transactional")
              << " session (num ops: " << ops.size() << ")";
  }

  if (transactional) {
    // READ COMMITTED gets the highest priority — presumably so its statements
    // win conflicts and rely on statement-level retries; TODO(review) confirm.
    TxnPriorityRequirement txn_priority_requirement = kLowerPriorityRange;
    if (GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED) {
      txn_priority_requirement = kHighestPriority;
    }

    RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation(false, txn_priority_requirement));
    // Advance the in-txn limit to "now" so later reads in this transaction
    // can observe the rows written by this flush.
    in_txn_limit_ = clock_->Now();
  }

  // Bridge the callback-style Perform() into a synchronous wait: the callback
  // fulfills the promise, and PerformFuture::Get() blocks on it.
  std::promise<PerformResult> promise;
  Perform(
      &ops.operations, /* use_catalog_session */ false, [&promise](const PerformResult& result) {
        promise.set_value(result);
      });
  PerformFuture future(promise.get_future(), this, &ops.relations);
  return future.Get();
}
695 | | |
// Sends 'operations' to the tserver asynchronously. 'callback' is invoked
// with the PerformResult when the RPC completes. Catalog-session requests
// carry the pinned catalog read time instead of transaction options.
void PgSession::Perform(
    PgsqlOps* operations, bool use_catalog_session, const PerformCallback& callback) {
  tserver::PgPerformOptionsPB options;

  if (use_catalog_session) {
    if (catalog_read_time_) {
      if (*catalog_read_time_) {
        // A valid catalog read time: pin the read to it.
        catalog_read_time_->ToPB(options.mutable_read_time());
      } else {
        // Present-but-invalid read time: send an empty read_time field —
        // presumably telling the server to pick one; TODO(review) confirm
        // against the tserver-side handling.
        options.mutable_read_time();
      }
    }
    options.set_use_catalog_session(true);
  } else {
    pg_txn_manager_->SetupPerformOptions(&options);

    // Propagate the in-txn limit only while a transaction is in progress.
    if (in_txn_limit_ && pg_txn_manager_->IsTxnInProgress()) {
      options.set_in_txn_limit_ht(in_txn_limit_.ToUint64());
    }
  }
  options.set_force_global_transaction(yb_force_global_transaction);

  pg_client_.PerformAsync(&options, operations, callback);
}
720 | | |
721 | 442k | Result<uint64_t> PgSession::GetSharedCatalogVersion() { |
722 | 442k | if (tserver_shared_object_) { |
723 | 442k | return (**tserver_shared_object_).ysql_catalog_version(); |
724 | 442k | } else { |
725 | 91 | return STATUS(NotSupported, "Tablet server shared memory has not been opened"); |
726 | 91 | } |
727 | 442k | } |
728 | | |
729 | 1.95k | Result<uint64_t> PgSession::GetSharedAuthKey() { |
730 | 1.95k | if (tserver_shared_object_) { |
731 | 1.95k | return (**tserver_shared_object_).postgres_auth_key(); |
732 | 1.95k | } else { |
733 | 0 | return STATUS(NotSupported, "Tablet server shared memory has not been opened"); |
734 | 0 | } |
735 | 1.95k | } |
736 | | |
// Checks existence of the row identified by (table_id, ybctid) for foreign key
// constraint purposes. Answers from fk_reference_cache_ when possible;
// otherwise consumes the pending intent for this key and batches its lookup
// together with other intents on the same table through 'reader', caching all
// keys the reader confirms to exist.
Result<bool> PgSession::ForeignKeyReferenceExists(PgOid table_id,
                                                  const Slice& ybctid,
                                                  const YbctidReader& reader) {
  if (Find(fk_reference_cache_, table_id, ybctid) != fk_reference_cache_.end()) {
    return true;  // Already confirmed to exist by an earlier lookup.
  }

  // Check existence of required FK intent.
  // Absence means the key was checked by previous batched request and was not found.
  if (!Erase(&fk_reference_intent_, table_id, ybctid)) {
    return false;
  }
  std::vector<Slice> ybctids;
  // +1 accounts for the requested ybctid itself (its intent was just erased).
  const auto reserved_size = std::min<size_t>(FLAGS_ysql_session_max_batch_size,
                                              fk_reference_intent_.size() + 1);
  ybctids.reserve(reserved_size);
  ybctids.push_back(ybctid);
  // TODO(dmitry): In case number of keys for same table > FLAGS_ysql_session_max_batch_size
  // two strategy are possible:
  // 1. select keys belonging to same tablet to reduce number of simultaneous RPC
  // 2. select keys belonging to different tablets to distribute reads among different nodes
  const auto intent_match = [table_id](const auto& key) { return key.table_id == table_id; };
  // Gather up to one batch worth of pending intents for the same table so a
  // single reader call resolves them all.
  for (auto it = fk_reference_intent_.begin();
       it != fk_reference_intent_.end() && ybctids.size() < FLAGS_ysql_session_max_batch_size;
       ++it) {
    if (intent_match(*it)) {
      ybctids.push_back(it->ybctid);
    }
  }
  // Every ybctid the reader returns is a row that exists: move it into the cache.
  for (auto& r : VERIFY_RESULT(reader(table_id, ybctids))) {
    fk_reference_cache_.emplace(table_id, std::move(r));
  }
  // Remove used intents.
  auto intent_count_for_remove = ybctids.size() - 1;
  if (intent_count_for_remove == fk_reference_intent_.size()) {
    // Fast path: every remaining intent was consumed by this batch.
    fk_reference_intent_.clear();
  } else {
    // Erase exactly the intents that were batched (matching this table),
    // stopping once the counted number has been removed.
    for (auto it = fk_reference_intent_.begin();
         it != fk_reference_intent_.end() && intent_count_for_remove > 0;) {
      if (intent_match(*it)) {
        it = fk_reference_intent_.erase(it);
        --intent_count_for_remove;
      } else {
        ++it;
      }
    }
  }
  // The key exists iff the reader confirmed it (i.e. it is now in the cache).
  return Find(fk_reference_cache_, table_id, ybctid) != fk_reference_cache_.end();
}
786 | | |
787 | 245k | void PgSession::AddForeignKeyReferenceIntent(PgOid table_id, const Slice& ybctid) { |
788 | 245k | if (Find(fk_reference_cache_, table_id, ybctid) == fk_reference_cache_.end()) { |
789 | 241k | fk_reference_intent_.emplace(table_id, ybctid.ToBuffer()); |
790 | 241k | } |
791 | 245k | } |
792 | | |
793 | 4.38M | void PgSession::AddForeignKeyReference(PgOid table_id, const Slice& ybctid) { |
794 | 4.38M | if (Find(fk_reference_cache_, table_id, ybctid) == fk_reference_cache_.end()) { |
795 | 4.38M | fk_reference_cache_.emplace(table_id, ybctid.ToBuffer()); |
796 | 4.38M | } |
797 | 4.38M | } |
798 | | |
// Removes the (table_id, ybctid) entry from the FK reference cache, if present.
void PgSession::DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid) {
  Erase(&fk_reference_cache_, table_id, ybctid);
}
802 | | |
803 | 2.17M | Status PgSession::PatchStatus(const Status& status, const PgObjectIds& relations) { |
804 | 2.17M | if (PgsqlRequestStatus(status) == PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR) { |
805 | 11.1k | auto op_index = OpIndex::ValueFromStatus(status); |
806 | 11.1k | if (op_index && *op_index < relations.size()) { |
807 | 11.1k | char constraint_name[0xFF]; |
808 | 11.1k | constraint_name[sizeof(constraint_name) - 1] = 0; |
809 | 11.1k | pg_callbacks_.FetchUniqueConstraintName(relations[*op_index].object_oid, |
810 | 11.1k | constraint_name, |
811 | 11.1k | sizeof(constraint_name) - 1); |
812 | 11.1k | return STATUS( |
813 | 11.1k | AlreadyPresent, |
814 | 11.1k | Format("duplicate key value violates unique constraint \"$0\"", Slice(constraint_name)), |
815 | 11.1k | Slice(), |
816 | 11.1k | PgsqlError(YBPgErrorCode::YB_PG_UNIQUE_VIOLATION)); |
817 | 11.1k | } |
818 | 11.1k | } |
819 | 2.15M | return status; |
820 | 2.17M | } |
821 | | |
// Returns the tablet server count via the PgClient proxy; 'primary_only'
// restricts the count (semantics defined by PgClient::TabletServerCount).
Result<int> PgSession::TabletServerCount(bool primary_only) {
  return pg_client_.TabletServerCount(primary_only);
}
825 | | |
// Lists live tablet servers. The hard-coded 'false' argument is forwarded to
// PgClient::ListLiveTabletServers — presumably primary_only=false; confirm
// against the PgClient signature.
Result<client::TabletServersInfo> PgSession::ListTabletServers() {
  return pg_client_.ListLiveTabletServers(false);
}
829 | | |
// Whether reads may be served from followers; delegated to the txn manager.
bool PgSession::ShouldUseFollowerReads() const {
  return pg_txn_manager_->ShouldUseFollowerReads();
}
833 | | |
// Sets the PgClient RPC timeout. 'timeout_ms * 1ms' converts the plain int
// into a std::chrono duration.
void PgSession::SetTimeout(const int timeout_ms) {
  pg_client_.SetTimeout(timeout_ms * 1ms);
}
837 | | |
// Resets the catalog read point by assigning a default-constructed (invalid)
// ReadHybridTime.
void PgSession::ResetCatalogReadPoint() {
  catalog_read_time_ = ReadHybridTime();
}
841 | | |
// Updates the catalog read point, but only when 'read_ht' is valid; an
// invalid read time leaves the current catalog read point untouched.
void PgSession::TrySetCatalogReadPoint(const ReadHybridTime& read_ht) {
  if (read_ht) {
    catalog_read_time_ = read_ht;
  }
}
847 | | |
// Makes subtransaction 'id' the active one on the tserver side.
Status PgSession::SetActiveSubTransaction(SubTransactionId id) {
  // It's required that we flush all buffered operations before changing the SubTransactionMetadata
  // used by the underlying batcher and RPC logic, as this will snapshot the current
  // SubTransactionMetadata for use in construction of RPCs for already-queued operations, thereby
  // ensuring that previous operations use previous SubTransactionMetadata. If we do not flush here,
  // already queued operations may incorrectly use this newly modified SubTransactionMetadata when
  // they are eventually sent to DocDB.
  RETURN_NOT_OK(FlushBufferedOperations());
  tserver::PgPerformOptionsPB* options_ptr = nullptr;
  tserver::PgPerformOptionsPB options;
  // If no distributed transaction has been started yet, start one now so the
  // subtransaction id has a transaction to attach to, and send the freshly
  // built perform options along with the RPC.
  if (pg_txn_manager_->GetIsolationLevel() == IsolationLevel::NON_TRANSACTIONAL) {
    auto txn_priority_requirement = kLowerPriorityRange;
    // READ COMMITTED uses the highest priority, matching FlushOperations.
    if (pg_txn_manager_->GetPgIsolationLevel() == PgIsolationLevel::READ_COMMITTED) {
      txn_priority_requirement = kHighestPriority;
    }
    RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation(
        IsReadOnlyOperation::kFalse, txn_priority_requirement));
    options_ptr = &options;
    pg_txn_manager_->SetupPerformOptions(&options);
  }
  return pg_client_.SetActiveSubTransaction(id, options_ptr);
}
870 | | |
// Rolls back subtransaction 'id' on the tserver side.
Status PgSession::RollbackSubTransaction(SubTransactionId id) {
  // TODO(savepoints) -- send async RPC to transaction status tablet, or rely on heartbeater to
  // eventually send this metadata.
  // See comment in SetActiveSubTransaction -- we must flush buffered operations before updating any
  // SubTransactionMetadata.
  RETURN_NOT_OK(FlushBufferedOperations());
  return pg_client_.RollbackSubTransaction(id);
}
879 | | |
880 | 3.67M | void PgSession::UpdateInTxnLimit(uint64_t* read_time) { |
881 | 3.67M | if (!read_time) { |
882 | 0 | return; |
883 | 0 | } |
884 | | |
885 | 3.67M | if (!*read_time) { |
886 | 748k | *read_time = clock_->Now().ToUint64(); |
887 | 748k | } |
888 | 3.67M | in_txn_limit_ = HybridTime(*read_time); |
889 | 3.67M | } |
890 | | |
891 | 1 | Status PgSession::ValidatePlacement(const string& placement_info) { |
892 | 1 | tserver::PgValidatePlacementRequestPB req; |
893 | | |
894 | 1 | Result<PlacementInfoConverter::Placement> result = |
895 | 1 | PlacementInfoConverter::FromString(placement_info); |
896 | | |
897 | | // For validation, if there is no replica_placement option, we default to the |
898 | | // cluster configuration which the user is responsible for maintaining |
899 | 1 | if (!result.ok() && result.status().IsInvalidArgument()0 ) { |
900 | 0 | return Status::OK(); |
901 | 0 | } |
902 | | |
903 | 1 | RETURN_NOT_OK(result); |
904 | | |
905 | 1 | PlacementInfoConverter::Placement placement = result.get(); |
906 | 1 | for (const auto& block : placement.placement_infos) { |
907 | 1 | auto pb = req.add_placement_infos(); |
908 | 1 | pb->set_cloud(block.cloud); |
909 | 1 | pb->set_region(block.region); |
910 | 1 | pb->set_zone(block.zone); |
911 | 1 | pb->set_min_num_replicas(block.min_num_replicas); |
912 | 1 | } |
913 | 1 | req.set_num_replicas(placement.num_replicas); |
914 | | |
915 | 1 | return pg_client_.ValidatePlacement(&req); |
916 | 1 | } |
917 | | |
// Runs all operations produced by 'generator' as one group and returns a
// future for the whole group. Every operation must require the same session
// type as the first one; mixing session types is rejected.
Result<PerformFuture> PgSession::RunAsync(
    const OperationGenerator& generator, uint64_t* read_time, bool force_non_bufferable) {
  auto table_op = generator();
  SCHECK(table_op.operation, IllegalState, "Operation list must not be empty");
  const auto* table = table_op.table;
  const auto* op = table_op.operation;
  // The first operation fixes the session type for the entire group.
  const auto group_session_type = VERIFY_RESULT(GetRequiredSessionType(
      *pg_txn_manager_, *table, **op));
  RunHelper runner(this, group_session_type);
  // Note: the first operation is applied inside the loop too — the loop body
  // starts with the table_op obtained above.
  for (; table_op.operation; table_op = generator()) {
    table = table_op.table;
    op = table_op.operation;
    const auto op_session_type = VERIFY_RESULT(GetRequiredSessionType(
        *pg_txn_manager_, *table, **op));
    SCHECK_EQ(op_session_type,
              group_session_type,
              IllegalState,
              "Operations on different sessions can't be mixed");
    RETURN_NOT_OK(runner.Apply(*table, *op, read_time, force_non_bufferable));
  }
  return runner.Flush();
}
940 | | |
941 | | } // namespace pggate |
942 | | } // namespace yb |