/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pggate.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | //-------------------------------------------------------------------------------------------------- |
14 | | |
15 | | #include "yb/yql/pggate/pggate.h" |
16 | | |
17 | | #include <boost/optional.hpp> |
18 | | |
19 | | #include "yb/client/client_fwd.h" |
20 | | #include "yb/client/client.h" |
21 | | #include "yb/client/client_utils.h" |
22 | | #include "yb/client/tablet_server.h" |
23 | | |
24 | | #include "yb/common/partition.h" |
25 | | #include "yb/common/pg_system_attr.h" |
26 | | #include "yb/common/schema.h" |
27 | | |
28 | | #include "yb/docdb/doc_key.h" |
29 | | #include "yb/docdb/primitive_value.h" |
30 | | #include "yb/docdb/value_type.h" |
31 | | |
32 | | #include "yb/gutil/casts.h" |
33 | | |
34 | | #include "yb/rpc/messenger.h" |
35 | | #include "yb/rpc/proxy.h" |
36 | | #include "yb/rpc/secure_stream.h" |
37 | | |
38 | | #include "yb/server/secure.h" |
39 | | |
40 | | #include "yb/tserver/tserver_forward_service.proxy.h" |
41 | | #include "yb/tserver/tserver_shared_mem.h" |
42 | | |
43 | | #include "yb/util/format.h" |
44 | | #include "yb/util/range.h" |
45 | | #include "yb/util/shared_mem.h" |
46 | | #include "yb/util/status_format.h" |
47 | | #include "yb/util/status_log.h" |
48 | | |
49 | | #include "yb/yql/pggate/pg_ddl.h" |
50 | | #include "yb/yql/pggate/pg_delete.h" |
51 | | #include "yb/yql/pggate/pg_insert.h" |
52 | | #include "yb/yql/pggate/pg_memctx.h" |
53 | | #include "yb/yql/pggate/pg_sample.h" |
54 | | #include "yb/yql/pggate/pg_select.h" |
55 | | #include "yb/yql/pggate/pg_truncate_colocated.h" |
56 | | #include "yb/yql/pggate/pg_txn_manager.h" |
57 | | #include "yb/yql/pggate/pg_update.h" |
58 | | #include "yb/yql/pggate/pggate_flags.h" |
59 | | #include "yb/yql/pggate/ybc_pggate.h" |
60 | | |
61 | | using namespace std::literals; |
62 | | |
63 | | DECLARE_string(rpc_bind_addresses); |
64 | | DECLARE_bool(use_node_to_node_encryption); |
65 | | DECLARE_string(certs_dir); |
66 | | DECLARE_bool(node_to_node_encryption_use_client_certificates); |
67 | | DECLARE_bool(ysql_forward_rpcs_to_local_tserver); |
68 | | DECLARE_bool(use_node_hostname_for_local_tserver); |
69 | | DECLARE_int32(backfill_index_client_rpc_timeout_ms); |
70 | | |
71 | | namespace yb { |
72 | | namespace pggate { |
73 | | |
74 | | using docdb::PrimitiveValue; |
75 | | using docdb::ValueType; |
76 | | |
77 | | namespace { |
78 | | |
79 | | CHECKED_STATUS AddColumn(PgCreateTable* pg_stmt, const char *attr_name, int attr_num, |
80 | | const YBCPgTypeEntity *attr_type, bool is_hash, bool is_range, |
81 | 11.7k | bool is_desc, bool is_nulls_first) { |
82 | 11.7k | using SortingType = SortingType; |
83 | 11.7k | SortingType sorting_type = SortingType::kNotSpecified; |
84 | | |
85 | 11.7k | if (!is_hash && is_range9.50k ) { |
86 | 1.28k | if (is_desc) { |
87 | 89 | sorting_type = is_nulls_first ? SortingType::kDescending79 : SortingType::kDescendingNullsLast10 ; |
88 | 1.19k | } else { |
89 | 1.19k | sorting_type = is_nulls_first ? SortingType::kAscending3 : SortingType::kAscendingNullsLast1.19k ; |
90 | 1.19k | } |
91 | 1.28k | } |
92 | | |
93 | 11.7k | return pg_stmt->AddColumn(attr_name, attr_num, attr_type, is_hash, is_range, sorting_type); |
94 | 11.7k | } |
95 | | |
96 | | Result<PgApiContext::MessengerHolder> BuildMessenger( |
97 | | const string& client_name, |
98 | | int32_t num_reactors, |
99 | | const scoped_refptr<MetricEntity>& metric_entity, |
100 | 6.08k | const std::shared_ptr<MemTracker>& parent_mem_tracker) { |
101 | 6.08k | std::unique_ptr<rpc::SecureContext> secure_context; |
102 | 6.08k | if (FLAGS_use_node_to_node_encryption) { |
103 | 14 | secure_context = VERIFY_RESULT(server::CreateSecureContext( |
104 | 14 | FLAGS_certs_dir, |
105 | 14 | server::UseClientCerts(FLAGS_node_to_node_encryption_use_client_certificates))); |
106 | 14 | } |
107 | 6.08k | auto messenger = VERIFY_RESULT(client::CreateClientMessenger( |
108 | 6.08k | client_name, num_reactors, metric_entity, parent_mem_tracker, secure_context.get())); |
109 | 0 | return PgApiContext::MessengerHolder{std::move(secure_context), std::move(messenger)}; |
110 | 6.08k | } |
111 | | |
112 | 6.08k | std::unique_ptr<tserver::TServerSharedObject> InitTServerSharedObject() { |
113 | 6.08k | LOG(INFO) << __func__ << ": " << YBCIsInitDbModeEnvVarSet() << ", " |
114 | 6.08k | << FLAGS_TEST_pggate_ignore_tserver_shm << ", " << FLAGS_pggate_tserver_shm_fd; |
115 | | // Do not use shared memory in initdb or if explicity set to be ignored. |
116 | 6.09k | if (FLAGS_TEST_pggate_ignore_tserver_shm6.08k || FLAGS_pggate_tserver_shm_fd == -1) { |
117 | 0 | return nullptr; |
118 | 0 | } |
119 | 6.08k | return std::make_unique<tserver::TServerSharedObject>(CHECK_RESULT( |
120 | 6.08k | tserver::TServerSharedObject::OpenReadOnly(FLAGS_pggate_tserver_shm_fd))); |
121 | 6.08k | } |
122 | | |
123 | | Result<std::vector<std::string>> FetchExistingYbctids(PgSession::ScopedRefPtr session, |
124 | | PgOid database_id, |
125 | | PgOid table_id, |
126 | 1.97k | const std::vector<Slice>& ybctids) { |
127 | 1.97k | auto desc = VERIFY_RESULT(session->LoadTable(PgObjectId(database_id, table_id))); |
128 | 0 | PgTable target(desc); |
129 | 1.97k | auto read_op = std::make_shared<PgsqlReadOp>(*target); |
130 | 1.97k | PgsqlExpressionPB* expr_pb = read_op->read_request().add_targets(); |
131 | 1.97k | expr_pb->set_column_id(to_underlying(PgSystemAttrNum::kYBTupleId)); |
132 | 1.97k | auto doc_op = std::make_shared<PgDocReadOp>(session, &target, std::move(read_op)); |
133 | | |
134 | | // Postgres uses SELECT FOR KEY SHARE query for FK check. |
135 | | // Use same lock level. |
136 | 1.97k | PgExecParameters exec_params = doc_op->ExecParameters(); |
137 | 1.97k | exec_params.rowmark = ROW_MARK_KEYSHARE; |
138 | 1.97k | RETURN_NOT_OK(doc_op->ExecuteInit(&exec_params)); |
139 | 1.97k | RETURN_NOT_OK(doc_op->PopulateDmlByYbctidOps(ybctids)); |
140 | 1.97k | RETURN_NOT_OK(doc_op->Execute()); |
141 | 1.97k | std::vector<std::string> result; |
142 | 1.97k | result.reserve(ybctids.size()); |
143 | 1.97k | std::list<PgDocResult> rowsets; |
144 | 3.90k | do { |
145 | 3.90k | rowsets.clear(); |
146 | 3.90k | RETURN_NOT_OK(doc_op->GetResult(&rowsets)); |
147 | 3.85k | for (auto& row : rowsets) { |
148 | 2.19k | RETURN_NOT_OK(row.ProcessSystemColumns()); |
149 | 101k | for (const auto& ybctid : row.ybctids())2.19k { |
150 | 101k | result.push_back(ybctid.ToBuffer()); |
151 | 101k | } |
152 | 2.19k | } |
153 | 3.85k | } while (!rowsets.empty()); |
154 | 1.92k | return result; |
155 | 1.97k | } |
156 | | |
157 | | } // namespace |
158 | | |
159 | | using std::make_shared; |
160 | | using client::YBSession; |
161 | | |
162 | | //-------------------------------------------------------------------------------------------------- |
163 | | |
164 | 6.08k | PggateOptions::PggateOptions() : ServerBaseOptions(kDefaultPort) { |
165 | 6.08k | server_type = "tserver"; |
166 | 6.08k | rpc_opts.connection_keepalive_time_ms = FLAGS_pgsql_rpc_keepalive_time_ms; |
167 | | |
168 | 6.08k | if (FLAGS_pggate_proxy_bind_address.empty()) { |
169 | 6.08k | HostPort host_port; |
170 | 6.08k | CHECK_OK(host_port.ParseString(FLAGS_rpc_bind_addresses, 0)); |
171 | 6.08k | host_port.set_port(PggateOptions::kDefaultPort); |
172 | 6.08k | FLAGS_pggate_proxy_bind_address = host_port.ToString(); |
173 | 6.08k | LOG(INFO) << "Reset YSQL bind address to " << FLAGS_pggate_proxy_bind_address; |
174 | 6.08k | } |
175 | 6.08k | rpc_opts.rpc_bind_addresses = FLAGS_pggate_proxy_bind_address; |
176 | 6.08k | master_addresses_flag = FLAGS_pggate_master_addresses; |
177 | | |
178 | 6.08k | server::MasterAddresses master_addresses; |
179 | | // TODO: we might have to allow setting master_replication_factor similarly to how it is done |
180 | | // in tserver to support master auto-discovery on Kubernetes. |
181 | 6.08k | CHECK_OK(server::DetermineMasterAddresses( |
182 | 6.08k | "pggate_master_addresses", master_addresses_flag, /* master_replication_factor */ 0, |
183 | 6.08k | &master_addresses, &master_addresses_flag)); |
184 | 6.08k | SetMasterAddresses(make_shared<server::MasterAddresses>(std::move(master_addresses))); |
185 | 6.08k | } |
186 | | |
187 | | PgApiContext::MessengerHolder::MessengerHolder( |
188 | | std::unique_ptr<rpc::SecureContext> security_context_, |
189 | | std::unique_ptr<rpc::Messenger> messenger_) |
190 | 6.08k | : security_context(std::move(security_context_)), messenger(std::move(messenger_)) { |
191 | 6.08k | } |
192 | | |
193 | | PgApiContext::MessengerHolder::MessengerHolder(MessengerHolder&& rhs) |
194 | | : security_context(std::move(rhs.security_context)), |
195 | 18.2k | messenger(std::move(rhs.messenger)) { |
196 | 18.2k | } |
197 | | |
198 | 24.3k | PgApiContext::MessengerHolder::~MessengerHolder() { |
199 | 24.3k | } |
200 | | |
201 | | PgApiContext::PgApiContext() |
202 | | : metric_registry(new MetricRegistry()), |
203 | | metric_entity(METRIC_ENTITY_server.Instantiate(metric_registry.get(), "yb.pggate")), |
204 | | mem_tracker(MemTracker::CreateTracker("PostgreSQL")), |
205 | | messenger_holder(CHECK_RESULT(BuildMessenger("pggate_ybclient", |
206 | | FLAGS_pggate_ybclient_reactor_threads, |
207 | | metric_entity, |
208 | | mem_tracker))), |
209 | 6.08k | proxy_cache(std::make_unique<rpc::ProxyCache>(messenger_holder.messenger.get())) { |
210 | 6.08k | } |
211 | | |
212 | 0 | PgApiContext::PgApiContext(PgApiContext&&) = default; |
213 | | |
214 | 6.09k | PgApiContext::~PgApiContext() = default; |
215 | | |
216 | | //-------------------------------------------------------------------------------------------------- |
217 | | |
218 | | PgApiImpl::PgApiImpl( |
219 | | PgApiContext context, const YBCPgTypeEntity *YBCDataTypeArray, int count, |
220 | | YBCPgCallbacks callbacks) |
221 | | : metric_registry_(std::move(context.metric_registry)), |
222 | | metric_entity_(std::move(context.metric_entity)), |
223 | | mem_tracker_(std::move(context.mem_tracker)), |
224 | | messenger_holder_(std::move(context.messenger_holder)), |
225 | | proxy_cache_(std::move(context.proxy_cache)), |
226 | | clock_(new server::HybridClock()), |
227 | | tserver_shared_object_(InitTServerSharedObject()), |
228 | | pg_callbacks_(callbacks), |
229 | | pg_txn_manager_( |
230 | | new PgTxnManager( |
231 | 6.09k | &pg_client_, clock_, tserver_shared_object_.get(), pg_callbacks_)) { |
232 | 6.09k | CHECK_OK(clock_->Init()); |
233 | | |
234 | | // Setup type mapping. |
235 | 990k | for (int idx = 0; idx < count; idx++984k ) { |
236 | 984k | const YBCPgTypeEntity *type_entity = &YBCDataTypeArray[idx]; |
237 | 984k | type_map_[type_entity->type_oid] = type_entity; |
238 | 984k | } |
239 | | |
240 | 6.09k | CHECK_OK(pg_client_.Start( |
241 | 6.09k | proxy_cache_.get(), &messenger_holder_.messenger->scheduler(), |
242 | 6.09k | *DCHECK_NOTNULL(tserver_shared_object_))); |
243 | 6.09k | } |
244 | | |
245 | 6.06k | PgApiImpl::~PgApiImpl() { |
246 | 6.06k | messenger_holder_.messenger->Shutdown(); |
247 | 6.06k | pg_txn_manager_.reset(); |
248 | 6.06k | pg_client_.Shutdown(); |
249 | 6.06k | } |
250 | | |
251 | 73.1M | const YBCPgTypeEntity *PgApiImpl::FindTypeEntity(int type_oid) { |
252 | 73.1M | const auto iter = type_map_.find(type_oid); |
253 | 73.1M | if (iter != type_map_.end()) { |
254 | 72.9M | return iter->second; |
255 | 72.9M | } |
256 | 129k | return nullptr; |
257 | 73.1M | } |
258 | | |
259 | | //-------------------------------------------------------------------------------------------------- |
260 | | |
261 | 0 | Status PgApiImpl::CreateEnv(PgEnv **pg_env) { |
262 | 0 | *pg_env = pg_env_.get(); |
263 | 0 | return Status::OK(); |
264 | 0 | } |
265 | | |
266 | 0 | Status PgApiImpl::DestroyEnv(PgEnv *pg_env) { |
267 | 0 | pg_env_ = nullptr; |
268 | 0 | return Status::OK(); |
269 | 0 | } |
270 | | |
271 | | //-------------------------------------------------------------------------------------------------- |
272 | | |
273 | | Status PgApiImpl::InitSession(const PgEnv *pg_env, |
274 | 6.09k | const string& database_name) { |
275 | 6.09k | CHECK(!pg_session_); |
276 | 6.09k | auto session = make_scoped_refptr<PgSession>(&pg_client_, |
277 | 6.09k | database_name, |
278 | 6.09k | pg_txn_manager_, |
279 | 6.09k | clock_, |
280 | 6.09k | tserver_shared_object_.get(), |
281 | 6.09k | pg_callbacks_); |
282 | 6.09k | if (!database_name.empty()) { |
283 | 6.07k | RETURN_NOT_OK(session->ConnectDatabase(database_name)); |
284 | 6.07k | } |
285 | | |
286 | 6.09k | pg_session_.swap(session); |
287 | 6.09k | return Status::OK(); |
288 | 6.09k | } |
289 | | |
290 | 1.23k | Status PgApiImpl::InvalidateCache() { |
291 | 1.23k | pg_session_->InvalidateAllTablesCache(); |
292 | 1.23k | return Status::OK(); |
293 | 1.23k | } |
294 | | |
295 | 147 | bool PgApiImpl::GetDisableTransparentCacheRefreshRetry() { |
296 | 147 | return FLAGS_TEST_ysql_disable_transparent_cache_refresh_retry; |
297 | 147 | } |
298 | | |
299 | | //-------------------------------------------------------------------------------------------------- |
300 | | |
301 | 676k | PgMemctx *PgApiImpl::CreateMemctx() { |
302 | | // Postgres will create YB Memctx when it first use the Memctx to allocate YugaByte object. |
303 | 676k | return PgMemctx::Create(); |
304 | 676k | } |
305 | | |
306 | 668k | Status PgApiImpl::DestroyMemctx(PgMemctx *memctx) { |
307 | | // Postgres will destroy YB Memctx by releasing the pointer. |
308 | 668k | return PgMemctx::Destroy(memctx); |
309 | 668k | } |
310 | | |
311 | 3.95M | Status PgApiImpl::ResetMemctx(PgMemctx *memctx) { |
312 | | // Postgres reset YB Memctx when clearing a context content without clearing its nested context. |
313 | 3.95M | return PgMemctx::Reset(memctx); |
314 | 3.95M | } |
315 | | |
316 | | // TODO(neil) Use Arena in the future. |
317 | | // - PgStatement should have been declared as derived class of "MCBase". |
318 | | // - All objects of PgStatement's derived class should be allocated by YbPgMemctx::Arena. |
319 | | // - We cannot use Arena yet because quite a large number of YugaByte objects are being referenced |
320 | | // from other layers. Those added code violated the original design as they assume ScopedPtr |
321 | | // instead of memory pool is being used. This mess should be cleaned up later. |
322 | | // |
323 | | // For now, statements is allocated as ScopedPtr and cached in the memory context. The statements |
324 | | // would then be destructed when the context is destroyed and all other references are also cleared. |
325 | | Status PgApiImpl::AddToCurrentPgMemctx(std::unique_ptr<PgStatement> stmt, |
326 | 8.80M | PgStatement **handle) { |
327 | 8.80M | *handle = stmt.get(); |
328 | 8.80M | pg_callbacks_.GetCurrentYbMemctx()->Register(stmt.release()); |
329 | 8.80M | return Status::OK(); |
330 | 8.80M | } |
331 | | |
332 | | // TODO(neil) Most like we don't need table_desc. If we do need it, use Arena here. |
333 | | // - PgTableDesc should have been declared as derived class of "MCBase". |
334 | | // - PgTableDesc objects should be allocated by YbPgMemctx::Arena. |
335 | | // |
336 | | // For now, table_desc is allocated as ScopedPtr and cached in the memory context. The table_desc |
337 | | // would then be destructed when the context is destroyed. |
338 | | Status PgApiImpl::AddToCurrentPgMemctx(size_t table_desc_id, |
339 | 2.40M | const PgTableDescPtr &table_desc) { |
340 | 2.40M | pg_callbacks_.GetCurrentYbMemctx()->Cache(table_desc_id, table_desc); |
341 | 2.40M | return Status::OK(); |
342 | 2.40M | } |
343 | | |
344 | 16.4M | Status PgApiImpl::GetTabledescFromCurrentPgMemctx(size_t table_desc_id, PgTableDesc **handle) { |
345 | 16.4M | pg_callbacks_.GetCurrentYbMemctx()->GetCache(table_desc_id, handle); |
346 | 16.4M | return Status::OK(); |
347 | 16.4M | } |
348 | | |
349 | | //-------------------------------------------------------------------------------------------------- |
350 | | |
351 | 0 | Status PgApiImpl::CreateSequencesDataTable() { |
352 | 0 | return pg_session_->CreateSequencesDataTable(); |
353 | 0 | } |
354 | | |
355 | | Status PgApiImpl::InsertSequenceTuple(int64_t db_oid, |
356 | | int64_t seq_oid, |
357 | | uint64_t ysql_catalog_version, |
358 | | int64_t last_val, |
359 | 295 | bool is_called) { |
360 | 295 | return pg_session_->InsertSequenceTuple( |
361 | 295 | db_oid, seq_oid, ysql_catalog_version, last_val, is_called); |
362 | 295 | } |
363 | | |
364 | | Status PgApiImpl::UpdateSequenceTupleConditionally(int64_t db_oid, |
365 | | int64_t seq_oid, |
366 | | uint64_t ysql_catalog_version, |
367 | | int64_t last_val, |
368 | | bool is_called, |
369 | | int64_t expected_last_val, |
370 | | bool expected_is_called, |
371 | 2.95k | bool *skipped) { |
372 | 2.95k | *skipped = VERIFY_RESULT(pg_session_->UpdateSequenceTuple( |
373 | 0 | db_oid, seq_oid, ysql_catalog_version, last_val, is_called, |
374 | 0 | expected_last_val, expected_is_called)); |
375 | 0 | return Status::OK(); |
376 | 2.95k | } |
377 | | |
378 | | Status PgApiImpl::UpdateSequenceTuple(int64_t db_oid, |
379 | | int64_t seq_oid, |
380 | | uint64_t ysql_catalog_version, |
381 | | int64_t last_val, |
382 | | bool is_called, |
383 | 29 | bool* skipped) { |
384 | 29 | bool result = VERIFY_RESULT(pg_session_->UpdateSequenceTuple( |
385 | 29 | db_oid, seq_oid, ysql_catalog_version, last_val, |
386 | 29 | is_called, boost::none, boost::none)); |
387 | 29 | if (skipped) { |
388 | 19 | *skipped = result; |
389 | 19 | } |
390 | 29 | return Status::OK(); |
391 | 29 | } |
392 | | |
393 | | Status PgApiImpl::ReadSequenceTuple(int64_t db_oid, |
394 | | int64_t seq_oid, |
395 | | uint64_t ysql_catalog_version, |
396 | | int64_t *last_val, |
397 | 3.23k | bool *is_called) { |
398 | 3.23k | auto res = VERIFY_RESULT(pg_session_->ReadSequenceTuple(db_oid, seq_oid, ysql_catalog_version)); |
399 | 3.23k | if (last_val) { |
400 | 3.23k | *last_val = res.first; |
401 | 3.23k | } |
402 | 3.23k | if (is_called) { |
403 | 3.23k | *is_called = res.second; |
404 | 3.23k | } |
405 | 3.23k | return Status::OK(); |
406 | 3.23k | } |
407 | | |
408 | 282 | Status PgApiImpl::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) { |
409 | 282 | return pg_session_->DeleteSequenceTuple(db_oid, seq_oid); |
410 | 282 | } |
411 | | |
412 | | |
413 | | //-------------------------------------------------------------------------------------------------- |
414 | | |
415 | 7.61M | void PgApiImpl::DeleteStatement(PgStatement *handle) { |
416 | 7.61M | if (handle7.61M ) { |
417 | 7.61M | PgMemctx::Destroy(handle); |
418 | 7.61M | } |
419 | 7.61M | } |
420 | | |
421 | | //-------------------------------------------------------------------------------------------------- |
422 | | |
423 | 8 | Status PgApiImpl::ConnectDatabase(const char *database_name) { |
424 | 8 | return pg_session_->ConnectDatabase(database_name); |
425 | 8 | } |
426 | | |
427 | 5.72k | Status PgApiImpl::IsDatabaseColocated(const PgOid database_oid, bool *colocated) { |
428 | 5.72k | return pg_session_->IsDatabaseColocated(database_oid, colocated); |
429 | 5.72k | } |
430 | | |
431 | | Status PgApiImpl::NewCreateDatabase(const char *database_name, |
432 | | const PgOid database_oid, |
433 | | const PgOid source_database_oid, |
434 | | const PgOid next_oid, |
435 | | const bool colocated, |
436 | 134 | PgStatement **handle) { |
437 | 134 | auto stmt = std::make_unique<PgCreateDatabase>(pg_session_, database_name, database_oid, |
438 | 134 | source_database_oid, next_oid, colocated); |
439 | 134 | if (pg_txn_manager_->IsDdlMode()) { |
440 | 114 | stmt->UseTransaction(); |
441 | 114 | } |
442 | 134 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
443 | 134 | return Status::OK(); |
444 | 134 | } |
445 | | |
446 | 134 | Status PgApiImpl::ExecCreateDatabase(PgStatement *handle) { |
447 | 134 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_DATABASE)) { |
448 | | // Invalid handle. |
449 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
450 | 0 | } |
451 | | |
452 | 134 | return down_cast<PgCreateDatabase*>(handle)->Exec(); |
453 | 134 | } |
454 | | |
455 | | Status PgApiImpl::NewDropDatabase(const char *database_name, |
456 | | PgOid database_oid, |
457 | 72 | PgStatement **handle) { |
458 | 72 | auto stmt = std::make_unique<PgDropDatabase>(pg_session_, database_name, database_oid); |
459 | 72 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
460 | 72 | return Status::OK(); |
461 | 72 | } |
462 | | |
463 | 72 | Status PgApiImpl::ExecDropDatabase(PgStatement *handle) { |
464 | 72 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_DATABASE)) { |
465 | | // Invalid handle. |
466 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
467 | 0 | } |
468 | 72 | return down_cast<PgDropDatabase*>(handle)->Exec(); |
469 | 72 | } |
470 | | |
471 | | Status PgApiImpl::NewAlterDatabase(const char *database_name, |
472 | | PgOid database_oid, |
473 | 3 | PgStatement **handle) { |
474 | 3 | auto stmt = std::make_unique<PgAlterDatabase>(pg_session_, database_name, database_oid); |
475 | 3 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
476 | 3 | return Status::OK(); |
477 | 3 | } |
478 | | |
479 | 3 | Status PgApiImpl::AlterDatabaseRenameDatabase(PgStatement *handle, const char *newname) { |
480 | 3 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_DATABASE)) { |
481 | | // Invalid handle. |
482 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
483 | 0 | } |
484 | 3 | down_cast<PgAlterDatabase*>(handle)->RenameDatabase(newname); |
485 | 3 | return Status::OK(); |
486 | 3 | } |
487 | | |
488 | 3 | Status PgApiImpl::ExecAlterDatabase(PgStatement *handle) { |
489 | 3 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_DATABASE)) { |
490 | | // Invalid handle. |
491 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
492 | 0 | } |
493 | 3 | return down_cast<PgAlterDatabase*>(handle)->Exec(); |
494 | 3 | } |
495 | | |
496 | | Status PgApiImpl::ReserveOids(const PgOid database_oid, |
497 | | const PgOid next_oid, |
498 | | const uint32_t count, |
499 | | PgOid *begin_oid, |
500 | 805 | PgOid *end_oid) { |
501 | 805 | auto p = VERIFY_RESULT(pg_client_.ReserveOids(database_oid, next_oid, count)); |
502 | 0 | *begin_oid = p.first; |
503 | 805 | *end_oid = p.second; |
504 | 805 | return Status::OK(); |
505 | 805 | } |
506 | | |
507 | 22 | Status PgApiImpl::GetCatalogMasterVersion(uint64_t *version) { |
508 | 22 | return pg_session_->GetCatalogMasterVersion(version); |
509 | 22 | } |
510 | | |
511 | 17.4k | Result<PgTableDescPtr> PgApiImpl::LoadTable(const PgObjectId& table_id) { |
512 | 17.4k | return pg_session_->LoadTable(table_id); |
513 | 17.4k | } |
514 | | |
515 | 9 | void PgApiImpl::InvalidateTableCache(const PgObjectId& table_id) { |
516 | 9 | pg_session_->InvalidateTableCache(table_id, InvalidateOnPgClient::kTrue); |
517 | 9 | } |
518 | | |
519 | | //-------------------------------------------------------------------------------------------------- |
520 | | |
521 | | Status PgApiImpl::NewCreateTablegroup(const char *database_name, |
522 | | const PgOid database_oid, |
523 | | const PgOid tablegroup_oid, |
524 | | const PgOid tablespace_oid, |
525 | 54 | PgStatement **handle) { |
526 | 54 | auto stmt = std::make_unique<PgCreateTablegroup>(pg_session_, database_name, |
527 | 54 | database_oid, tablegroup_oid, tablespace_oid); |
528 | 54 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
529 | 54 | return Status::OK(); |
530 | 54 | } |
531 | | |
532 | 54 | Status PgApiImpl::ExecCreateTablegroup(PgStatement *handle) { |
533 | 54 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLEGROUP)) { |
534 | | // Invalid handle. |
535 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
536 | 0 | } |
537 | | |
538 | 54 | return down_cast<PgCreateTablegroup*>(handle)->Exec(); |
539 | 54 | } |
540 | | |
541 | | Status PgApiImpl::NewDropTablegroup(const PgOid database_oid, |
542 | | const PgOid tablegroup_oid, |
543 | 39 | PgStatement **handle) { |
544 | 39 | auto stmt = std::make_unique<PgDropTablegroup>(pg_session_, database_oid, tablegroup_oid); |
545 | 39 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
546 | 39 | return Status::OK(); |
547 | 39 | } |
548 | | |
549 | | |
550 | 0 | Status PgApiImpl::ExecDropTablegroup(PgStatement *handle) { |
551 | 0 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_TABLEGROUP)) { |
552 | | // Invalid handle. |
553 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
554 | 0 | } |
555 | 0 | return down_cast<PgDropTablegroup*>(handle)->Exec(); |
556 | 0 | } |
557 | | |
558 | | |
559 | | //-------------------------------------------------------------------------------------------------- |
560 | | |
561 | | Status PgApiImpl::NewCreateTable(const char *database_name, |
562 | | const char *schema_name, |
563 | | const char *table_name, |
564 | | const PgObjectId& table_id, |
565 | | bool is_shared_table, |
566 | | bool if_not_exist, |
567 | | bool add_primary_key, |
568 | | const bool colocated, |
569 | | const PgObjectId& tablegroup_oid, |
570 | | const ColocationId colocation_id, |
571 | | const PgObjectId& tablespace_oid, |
572 | | const PgObjectId& matview_pg_table_oid, |
573 | 4.28k | PgStatement **handle) { |
574 | 4.28k | auto stmt = std::make_unique<PgCreateTable>( |
575 | 4.28k | pg_session_, database_name, schema_name, table_name, |
576 | 4.28k | table_id, is_shared_table, if_not_exist, add_primary_key, colocated, tablegroup_oid, |
577 | 4.28k | colocation_id, tablespace_oid, matview_pg_table_oid); |
578 | 4.28k | if (pg_txn_manager_->IsDdlMode()) { |
579 | 4.13k | stmt->UseTransaction(); |
580 | 4.13k | } |
581 | 4.28k | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
582 | 4.28k | return Status::OK(); |
583 | 4.28k | } |
584 | | |
585 | | Status PgApiImpl::CreateTableAddColumn(PgStatement *handle, const char *attr_name, int attr_num, |
586 | | const YBCPgTypeEntity *attr_type, |
587 | | bool is_hash, bool is_range, |
588 | 10.4k | bool is_desc, bool is_nulls_first) { |
589 | 10.4k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) { |
590 | | // Invalid handle. |
591 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
592 | 0 | } |
593 | 10.4k | return AddColumn(down_cast<PgCreateTable*>(handle), attr_name, attr_num, attr_type, |
594 | 10.4k | is_hash, is_range, is_desc, is_nulls_first); |
595 | 10.4k | } |
596 | | |
597 | 340 | Status PgApiImpl::CreateTableSetNumTablets(PgStatement *handle, int32_t num_tablets) { |
598 | 340 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) { |
599 | | // Invalid handle. |
600 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
601 | 0 | } |
602 | 340 | return down_cast<PgCreateTable*>(handle)->SetNumTablets(num_tablets); |
603 | 340 | } |
604 | | |
605 | 181 | Status PgApiImpl::AddSplitBoundary(PgStatement *handle, PgExpr **exprs, int expr_count) { |
606 | | // Partitioning a TABLE or an INDEX. |
607 | 181 | if (PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE) || |
608 | 181 | PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX)28 ) { |
609 | 181 | return down_cast<PgCreateTable*>(handle)->AddSplitBoundary(exprs, expr_count); |
610 | 181 | } |
611 | | |
612 | | // Invalid handle. |
613 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
614 | 181 | } |
615 | | |
616 | 4.19k | Status PgApiImpl::ExecCreateTable(PgStatement *handle) { |
617 | 4.19k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) { |
618 | | // Invalid handle. |
619 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
620 | 0 | } |
621 | 4.19k | return down_cast<PgCreateTable*>(handle)->Exec(); |
622 | 4.19k | } |
623 | | |
624 | | Status PgApiImpl::NewAlterTable(const PgObjectId& table_id, |
625 | 1.71k | PgStatement **handle) { |
626 | 1.71k | auto stmt = std::make_unique<PgAlterTable>(pg_session_, table_id); |
627 | 1.71k | if (pg_txn_manager_->IsDdlMode()) { |
628 | 1.71k | stmt->UseTransaction(); |
629 | 1.71k | } |
630 | 1.71k | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
631 | 1.71k | return Status::OK(); |
632 | 1.71k | } |
633 | | |
634 | | Status PgApiImpl::AlterTableAddColumn(PgStatement *handle, const char *name, |
635 | 238 | int order, const YBCPgTypeEntity *attr_type) { |
636 | 238 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) { |
637 | | // Invalid handle. |
638 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
639 | 0 | } |
640 | | |
641 | 238 | PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle); |
642 | 238 | return pg_stmt->AddColumn(name, attr_type, order); |
643 | 238 | } |
644 | | |
645 | | Status PgApiImpl::AlterTableRenameColumn(PgStatement *handle, const char *oldname, |
646 | 17 | const char *newname) { |
647 | 17 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) { |
648 | | // Invalid handle. |
649 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
650 | 0 | } |
651 | | |
652 | 17 | PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle); |
653 | 17 | return pg_stmt->RenameColumn(oldname, newname); |
654 | 17 | } |
655 | | |
656 | 408 | Status PgApiImpl::AlterTableDropColumn(PgStatement *handle, const char *name) { |
657 | 408 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) { |
658 | | // Invalid handle. |
659 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
660 | 0 | } |
661 | | |
662 | 408 | PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle); |
663 | 408 | return pg_stmt->DropColumn(name); |
664 | 408 | } |
665 | | |
666 | | Status PgApiImpl::AlterTableRenameTable(PgStatement *handle, const char *db_name, |
667 | 115 | const char *newname) { |
668 | 115 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) { |
669 | | // Invalid handle. |
670 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
671 | 0 | } |
672 | | |
673 | 115 | PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle); |
674 | 115 | return pg_stmt->RenameTable(db_name, newname); |
675 | 115 | } |
676 | | |
677 | 522 | Status PgApiImpl::ExecAlterTable(PgStatement *handle) { |
678 | 522 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) { |
679 | | // Invalid handle. |
680 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
681 | 0 | } |
682 | 522 | PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle); |
683 | 522 | return pg_stmt->Exec(); |
684 | 522 | } |
685 | | |
686 | | Status PgApiImpl::NewDropTable(const PgObjectId& table_id, |
687 | | bool if_exist, |
688 | 3.49k | PgStatement **handle) { |
689 | 3.49k | auto stmt = std::make_unique<PgDropTable>(pg_session_, table_id, if_exist); |
690 | 3.49k | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
691 | 3.49k | return Status::OK(); |
692 | 3.49k | } |
693 | | |
694 | | Status PgApiImpl::NewTruncateTable(const PgObjectId& table_id, |
695 | 624 | PgStatement **handle) { |
696 | 624 | auto stmt = std::make_unique<PgTruncateTable>(pg_session_, table_id); |
697 | 624 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
698 | 624 | return Status::OK(); |
699 | 624 | } |
700 | | |
701 | 624 | Status PgApiImpl::ExecTruncateTable(PgStatement *handle) { |
702 | 624 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_TRUNCATE_TABLE)) { |
703 | | // Invalid handle. |
704 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
705 | 0 | } |
706 | 624 | return down_cast<PgTruncateTable*>(handle)->Exec(); |
707 | 624 | } |
708 | | |
709 | | Status PgApiImpl::GetTableDesc(const PgObjectId& table_id, |
710 | 16.4M | PgTableDesc **handle) { |
711 | | // First read from memory context. |
712 | 16.4M | size_t hash_id = hash_value(table_id); |
713 | 16.4M | RETURN_NOT_OK(GetTabledescFromCurrentPgMemctx(hash_id, handle)); |
714 | | |
715 | | // Read from environment. |
716 | 16.4M | if (*handle == nullptr) { |
717 | 2.40M | auto result = pg_session_->LoadTable(table_id); |
718 | 2.40M | RETURN_NOT_OK(result); |
719 | 2.40M | RETURN_NOT_OK(AddToCurrentPgMemctx(hash_id, *result)); |
720 | | |
721 | 2.40M | *handle = result->get(); |
722 | 2.40M | } |
723 | | |
724 | 16.4M | return Status::OK(); |
725 | 16.4M | } |
726 | | |
727 | | Result<YBCPgColumnInfo> PgApiImpl::GetColumnInfo(YBCPgTableDesc table_desc, |
728 | 77.7M | int16_t attr_number) { |
729 | 77.7M | return table_desc->GetColumnInfo(attr_number); |
730 | 77.7M | } |
731 | | |
732 | 33.8k | Status PgApiImpl::DmlModifiesRow(PgStatement *handle, bool *modifies_row) { |
733 | 33.8k | if (!handle) { |
734 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
735 | 0 | } |
736 | | |
737 | 33.8k | *modifies_row = false; |
738 | | |
739 | 33.8k | switch (handle->stmt_op()) { |
740 | 188 | case StmtOp::STMT_UPDATE: |
741 | 188 | case StmtOp::STMT_DELETE: |
742 | 188 | *modifies_row = true; |
743 | 188 | break; |
744 | 33.7k | default: |
745 | 33.7k | break; |
746 | 33.8k | } |
747 | | |
748 | 33.8k | return Status::OK(); |
749 | 33.8k | } |
750 | | |
751 | 14 | Status PgApiImpl::SetIsSysCatalogVersionChange(PgStatement *handle) { |
752 | 14 | if (!handle) { |
753 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
754 | 0 | } |
755 | | |
756 | 14 | switch (handle->stmt_op()) { |
757 | 12 | case StmtOp::STMT_UPDATE: |
758 | 12 | case StmtOp::STMT_DELETE: |
759 | 14 | case StmtOp::STMT_INSERT: |
760 | 14 | down_cast<PgDmlWrite *>(handle)->SetIsSystemCatalogChange(); |
761 | 14 | return Status::OK(); |
762 | 0 | default: |
763 | 0 | break; |
764 | 14 | } |
765 | | |
766 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
767 | 14 | } |
768 | | |
769 | 7.56M | Status PgApiImpl::SetCatalogCacheVersion(PgStatement *handle, uint64_t catalog_cache_version) { |
770 | 7.56M | if (!handle) { |
771 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
772 | 0 | } |
773 | | |
774 | 7.56M | switch (handle->stmt_op()) { |
775 | 368k | case StmtOp::STMT_SELECT: |
776 | 5.96M | case StmtOp::STMT_INSERT: |
777 | 6.65M | case StmtOp::STMT_UPDATE: |
778 | 7.56M | case StmtOp::STMT_DELETE: |
779 | 7.56M | down_cast<PgDml *>(handle)->SetCatalogCacheVersion(catalog_cache_version); |
780 | 7.56M | return Status::OK(); |
781 | 0 | default: |
782 | 0 | break; |
783 | 7.56M | } |
784 | | |
785 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
786 | 7.56M | } |
787 | | |
788 | | //-------------------------------------------------------------------------------------------------- |
789 | | |
790 | | Status PgApiImpl::NewCreateIndex(const char *database_name, |
791 | | const char *schema_name, |
792 | | const char *index_name, |
793 | | const PgObjectId& index_id, |
794 | | const PgObjectId& base_table_id, |
795 | | bool is_shared_index, |
796 | | bool is_unique_index, |
797 | | const bool skip_index_backfill, |
798 | | bool if_not_exist, |
799 | | const PgObjectId& tablegroup_oid, |
800 | | const YBCPgOid& colocation_id, |
801 | | const PgObjectId& tablespace_oid, |
802 | 868 | PgStatement **handle) { |
803 | 868 | auto stmt = std::make_unique<PgCreateTable>( |
804 | 868 | pg_session_, database_name, schema_name, index_name, index_id, is_shared_index, |
805 | 868 | if_not_exist, false /* add_primary_key */, |
806 | 868 | tablegroup_oid.IsValid() ? false21 : true847 /* colocated */, tablegroup_oid, colocation_id, |
807 | 868 | tablespace_oid, PgObjectId() /* matview_pg_table_id */); |
808 | 868 | stmt->SetupIndex(base_table_id, is_unique_index, skip_index_backfill); |
809 | 868 | if (pg_txn_manager_->IsDdlMode()) { |
810 | 756 | stmt->UseTransaction(); |
811 | 756 | } |
812 | 868 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
813 | 868 | return Status::OK(); |
814 | 868 | } |
815 | | |
816 | | Status PgApiImpl::CreateIndexAddColumn(PgStatement *handle, const char *attr_name, int attr_num, |
817 | | const YBCPgTypeEntity *attr_type, |
818 | | bool is_hash, bool is_range, |
819 | 1.24k | bool is_desc, bool is_nulls_first) { |
820 | 1.24k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX)) { |
821 | | // Invalid handle. |
822 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
823 | 0 | } |
824 | | |
825 | 1.24k | return AddColumn(down_cast<PgCreateTable*>(handle), attr_name, attr_num, attr_type, |
826 | 1.24k | is_hash, is_range, is_desc, is_nulls_first); |
827 | 1.24k | } |
828 | | |
829 | 13 | Status PgApiImpl::CreateIndexSetNumTablets(PgStatement *handle, int32_t num_tablets) { |
830 | 13 | SCHECK(PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX), |
831 | 13 | InvalidArgument, |
832 | 13 | "Invalid statement handle"); |
833 | 13 | return down_cast<PgCreateTable*>(handle)->SetNumTablets(num_tablets); |
834 | 13 | } |
835 | | |
836 | 862 | Status PgApiImpl::ExecCreateIndex(PgStatement *handle) { |
837 | 862 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX)) { |
838 | | // Invalid handle. |
839 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
840 | 0 | } |
841 | 862 | return down_cast<PgCreateTable*>(handle)->Exec(); |
842 | 862 | } |
843 | | |
844 | | Status PgApiImpl::NewDropIndex(const PgObjectId& index_id, |
845 | | bool if_exist, |
846 | 672 | PgStatement **handle) { |
847 | 672 | auto stmt = std::make_unique<PgDropIndex>(pg_session_, index_id, if_exist); |
848 | 672 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
849 | 672 | return Status::OK(); |
850 | 672 | } |
851 | | |
852 | 4.18k | Status PgApiImpl::ExecPostponedDdlStmt(PgStatement *handle) { |
853 | 4.18k | if (!handle) { |
854 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
855 | 0 | } |
856 | | |
857 | 4.18k | switch (handle->stmt_op()) { |
858 | 3.48k | case StmtOp::STMT_DROP_TABLE: |
859 | 3.48k | return down_cast<PgDropTable*>(handle)->Exec(); |
860 | 669 | case StmtOp::STMT_DROP_INDEX: |
861 | 669 | return down_cast<PgDropIndex*>(handle)->Exec(); |
862 | 39 | case StmtOp::STMT_DROP_TABLEGROUP: |
863 | 39 | return down_cast<PgDropTablegroup*>(handle)->Exec(); |
864 | | |
865 | 0 | default: |
866 | 0 | break; |
867 | 4.18k | } |
868 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
869 | 4.18k | } |
870 | | |
871 | 540 | Status PgApiImpl::BackfillIndex(const PgObjectId& table_id) { |
872 | 540 | tserver::PgBackfillIndexRequestPB req; |
873 | 540 | table_id.ToPB(req.mutable_table_id()); |
874 | 540 | return pg_session_->pg_client().BackfillIndex( |
875 | 540 | &req, CoarseMonoClock::Now() + FLAGS_backfill_index_client_rpc_timeout_ms * 1ms); |
876 | 540 | } |
877 | | |
878 | | //-------------------------------------------------------------------------------------------------- |
879 | | // DML Statment Support. |
880 | | //-------------------------------------------------------------------------------------------------- |
881 | | |
882 | | // Binding ----------------------------------------------------------------------------------------- |
883 | | |
884 | 22.3M | Status PgApiImpl::DmlAppendTarget(PgStatement *handle, PgExpr *target) { |
885 | 22.3M | return down_cast<PgDml*>(handle)->AppendTarget(target); |
886 | 22.3M | } |
887 | | |
888 | 22 | Status PgApiImpl::DmlAppendQual(PgStatement *handle, PgExpr *qual) { |
889 | 22 | return down_cast<PgDml*>(handle)->AppendQual(qual); |
890 | 22 | } |
891 | | |
892 | 11.9k | Status PgApiImpl::DmlAppendColumnRef(PgStatement *handle, PgExpr *colref) { |
893 | 11.9k | return down_cast<PgDml*>(handle)->AppendColumnRef(colref); |
894 | 11.9k | } |
895 | | |
896 | 31.1M | Status PgApiImpl::DmlBindColumn(PgStatement *handle, int attr_num, PgExpr *attr_value) { |
897 | 31.1M | return down_cast<PgDml*>(handle)->BindColumn(attr_num, attr_value); |
898 | 31.1M | } |
899 | | |
900 | | Status PgApiImpl::DmlBindColumnCondBetween(PgStatement *handle, int attr_num, PgExpr *attr_value, |
901 | 306 | PgExpr *attr_value_end) { |
902 | 306 | return down_cast<PgDmlRead*>(handle)->BindColumnCondBetween(attr_num, attr_value, attr_value_end); |
903 | 306 | } |
904 | | |
905 | | Status PgApiImpl::DmlBindColumnCondIn(PgStatement *handle, int attr_num, int n_attr_values, |
906 | 92.3k | PgExpr **attr_values) { |
907 | 92.3k | return down_cast<PgDmlRead*>(handle)->BindColumnCondIn(attr_num, n_attr_values, attr_values); |
908 | 92.3k | } |
909 | | |
910 | | Status PgApiImpl::DmlAddRowUpperBound(YBCPgStatement handle, |
911 | 90 | int n_col_values, PgExpr **col_values, bool is_inclusive) { |
912 | 90 | return down_cast<PgDmlRead*>(handle)->AddRowUpperBound(handle, |
913 | 90 | n_col_values, |
914 | 90 | col_values, |
915 | 90 | is_inclusive); |
916 | 90 | } |
917 | | |
918 | | Status PgApiImpl::DmlAddRowLowerBound(YBCPgStatement handle, |
919 | 88 | int n_col_values, PgExpr **col_values, bool is_inclusive) { |
920 | 88 | return down_cast<PgDmlRead*>(handle)->AddRowLowerBound(handle, |
921 | 88 | n_col_values, |
922 | 88 | col_values, |
923 | 88 | is_inclusive); |
924 | 88 | } |
925 | | |
926 | | Status PgApiImpl::DmlBindHashCode(PgStatement *handle, bool start_valid, |
927 | | bool start_inclusive, |
928 | | uint64_t start_hash_val, bool end_valid, |
929 | 79 | bool end_inclusive, uint64_t end_hash_val) { |
930 | 79 | return down_cast<PgDmlRead*>(handle) |
931 | 79 | ->BindHashCode(start_valid, start_inclusive, start_hash_val, |
932 | 79 | end_valid, end_inclusive, end_hash_val); |
933 | 79 | } |
934 | | |
935 | 90 | Status PgApiImpl::DmlBindTable(PgStatement *handle) { |
936 | 90 | return down_cast<PgDml*>(handle)->BindTable(); |
937 | 90 | } |
938 | | |
939 | 6.19M | Result<YBCPgColumnInfo> PgApiImpl::DmlGetColumnInfo(YBCPgStatement handle, int attr_num) { |
940 | 6.19M | return down_cast<PgDml*>(handle)->GetColumnInfo(attr_num); |
941 | 6.19M | } |
942 | | |
943 | 983k | CHECKED_STATUS PgApiImpl::DmlAssignColumn(PgStatement *handle, int attr_num, PgExpr *attr_value) { |
944 | 983k | return down_cast<PgDml*>(handle)->AssignColumn(attr_num, attr_value); |
945 | 983k | } |
946 | | |
947 | | Status PgApiImpl::DmlFetch(PgStatement *handle, int32_t natts, uint64_t *values, bool *isnulls, |
948 | 53.1M | PgSysColumns *syscols, bool *has_data) { |
949 | 53.1M | return down_cast<PgDml*>(handle)->Fetch(natts, values, isnulls, syscols, has_data); |
950 | 53.1M | } |
951 | | |
952 | | Status PgApiImpl::ProcessYBTupleId(const YBCPgYBTupleIdDescriptor& descr, |
953 | 5.07M | const YBTupleIdProcessor& processor) { |
954 | 5.07M | auto target_desc = VERIFY_RESULT(pg_session_->LoadTable( |
955 | 5.07M | PgObjectId(descr.database_oid, descr.table_oid))); |
956 | 5.07M | SCHECK_EQ(descr.nattrs, target_desc->num_key_columns(), Corruption, |
957 | 5.07M | "Number of key components does not match column description"); |
958 | 5.07M | vector<PrimitiveValue> *values = nullptr; |
959 | 5.07M | PgsqlExpressionPB *expr_pb; |
960 | 5.07M | PgsqlExpressionPB temp_expr_pb; |
961 | 5.07M | google::protobuf::RepeatedPtrField<PgsqlExpressionPB> hashed_values; |
962 | 5.07M | vector<docdb::PrimitiveValue> hashed_components, range_components; |
963 | 5.07M | hashed_components.reserve(target_desc->num_hash_key_columns()); |
964 | 5.07M | range_components.reserve(target_desc->num_key_columns() - target_desc->num_hash_key_columns()); |
965 | 5.07M | size_t remain_attr = descr.nattrs; |
966 | | // DocDB API requires that partition columns must be listed in their created-order. |
967 | | // Order from target_desc should be used as attributes sequence may have different order. |
968 | 6.48M | for (size_t i : Range(target_desc->schema().columns().size())) { |
969 | 6.48M | PgColumn column(target_desc->schema(), i); |
970 | 8.76M | for (auto attr = descr.attrs, end = descr.attrs + descr.nattrs; attr != end; ++attr2.27M ) { |
971 | 8.76M | if (attr->attr_num == column.attr_num()) { |
972 | 6.48M | if (!column.is_primary()) { |
973 | 0 | return STATUS_SUBSTITUTE( |
974 | 0 | InvalidArgument, "Attribute number $0 not a primary attribute", attr->attr_num); |
975 | 0 | } |
976 | 6.48M | if (column.is_partition()) { |
977 | | // Hashed component. |
978 | 4.48M | values = &hashed_components; |
979 | 4.48M | expr_pb = hashed_values.Add(); |
980 | 4.48M | } else { |
981 | | // Range component. |
982 | 2.00M | values = &range_components; |
983 | 2.00M | expr_pb = &temp_expr_pb; |
984 | 2.00M | } |
985 | | |
986 | 6.48M | if (attr->is_null) { |
987 | 164k | values->emplace_back(ValueType::kNullLow); |
988 | 6.32M | } else { |
989 | 6.32M | if (attr->attr_num == to_underlying(PgSystemAttrNum::kYBRowId)) { |
990 | 1.22M | expr_pb->mutable_value()->set_binary_value(pg_session_->GenerateNewRowid()); |
991 | 5.09M | } else { |
992 | 5.09M | const YBCPgCollationInfo& collation_info = attr->collation_info; |
993 | 5.09M | PgConstant value( |
994 | 5.09M | attr->type_entity, collation_info.collate_is_valid_non_c, |
995 | 5.09M | collation_info.sortkey, attr->datum, false); |
996 | 5.09M | SCHECK_EQ(column.internal_type(), value.internal_type(), Corruption, |
997 | 5.09M | "Attribute value type does not match column type"); |
998 | 5.09M | RETURN_NOT_OK(value.Eval(expr_pb->mutable_value())); |
999 | 5.09M | } |
1000 | 6.32M | values->push_back(PrimitiveValue::FromQLValuePB(expr_pb->value(), |
1001 | 6.32M | column.desc().sorting_type())); |
1002 | 6.32M | } |
1003 | | |
1004 | 6.48M | if (--remain_attr == 0) { |
1005 | 5.07M | SCHECK_EQ(hashed_components.size(), target_desc->num_hash_key_columns(), Corruption, |
1006 | 5.07M | "Number of hashed components does not match column description"); |
1007 | 5.07M | SCHECK_EQ(range_components.size(), |
1008 | 5.07M | target_desc->num_key_columns() - target_desc->num_hash_key_columns(), |
1009 | 5.07M | Corruption, "Number of range components does not match column description"); |
1010 | 5.07M | if (hashed_values.empty()) { |
1011 | 704k | return processor(docdb::DocKey(move(range_components)).Encode()); |
1012 | 704k | } |
1013 | 4.37M | string partition_key; |
1014 | 4.37M | const PartitionSchema& partition_schema = target_desc->partition_schema(); |
1015 | 4.37M | RETURN_NOT_OK(partition_schema.EncodeKey(hashed_values, &partition_key)); |
1016 | 4.37M | const uint16_t hash = PartitionSchema::DecodeMultiColumnHashValue(partition_key); |
1017 | | |
1018 | 4.37M | return processor( |
1019 | 4.37M | docdb::DocKey(hash, move(hashed_components), move(range_components)).Encode()); |
1020 | 4.37M | } |
1021 | 1.41M | break; |
1022 | 6.48M | } |
1023 | 8.76M | } |
1024 | 6.48M | } |
1025 | | |
1026 | 529 | return STATUS_FORMAT(Corruption, "Not all attributes ($0) were resolved", remain_attr); |
1027 | 5.07M | } |
1028 | | |
1029 | 667k | Status PgApiImpl::StartOperationsBuffering() { |
1030 | 667k | return pg_session_->StartOperationsBuffering(); |
1031 | 667k | } |
1032 | | |
1033 | 610k | Status PgApiImpl::StopOperationsBuffering() { |
1034 | 610k | return pg_session_->StopOperationsBuffering(); |
1035 | 610k | } |
1036 | | |
1037 | 114k | void PgApiImpl::ResetOperationsBuffering() { |
1038 | 114k | pg_session_->ResetOperationsBuffering(); |
1039 | 114k | } |
1040 | | |
1041 | 595k | Status PgApiImpl::FlushBufferedOperations() { |
1042 | 595k | return pg_session_->FlushBufferedOperations(); |
1043 | 595k | } |
1044 | | |
1045 | 7.20M | Status PgApiImpl::DmlExecWriteOp(PgStatement *handle, int32_t *rows_affected_count) { |
1046 | 7.20M | switch (handle->stmt_op()) { |
1047 | 5.59M | case StmtOp::STMT_INSERT: |
1048 | 6.29M | case StmtOp::STMT_UPDATE: |
1049 | 7.20M | case StmtOp::STMT_DELETE: |
1050 | 7.20M | case StmtOp::STMT_TRUNCATE: |
1051 | 7.20M | { |
1052 | 7.20M | auto dml_write = down_cast<PgDmlWrite *>(handle); |
1053 | 7.20M | RETURN_NOT_OK(dml_write->Exec(rows_affected_count != nullptr /* force_non_bufferable */)); |
1054 | 7.20M | if (rows_affected_count) { |
1055 | 19.1k | *rows_affected_count = dml_write->GetRowsAffectedCount(); |
1056 | 19.1k | } |
1057 | 7.20M | return Status::OK(); |
1058 | 7.20M | } |
1059 | 0 | default: |
1060 | 0 | break; |
1061 | 7.20M | } |
1062 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1063 | 7.20M | } |
1064 | | |
1065 | | // Insert ------------------------------------------------------------------------------------------ |
1066 | | |
1067 | | Status PgApiImpl::NewInsert(const PgObjectId& table_id, |
1068 | | const bool is_single_row_txn, |
1069 | 5.59M | PgStatement **handle) { |
1070 | 5.59M | *handle = nullptr; |
1071 | 5.59M | auto stmt = std::make_unique<PgInsert>(pg_session_, table_id, is_single_row_txn); |
1072 | 5.59M | RETURN_NOT_OK(stmt->Prepare()); |
1073 | 5.59M | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
1074 | 5.59M | return Status::OK(); |
1075 | 5.59M | } |
1076 | | |
1077 | 1.04k | Status PgApiImpl::ExecInsert(PgStatement *handle) { |
1078 | 1.04k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) { |
1079 | | // Invalid handle. |
1080 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1081 | 0 | } |
1082 | 1.04k | return down_cast<PgInsert*>(handle)->Exec(); |
1083 | 1.04k | } |
1084 | | |
1085 | 953k | Status PgApiImpl::InsertStmtSetUpsertMode(PgStatement *handle) { |
1086 | 953k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) { |
1087 | | // Invalid handle. |
1088 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1089 | 0 | } |
1090 | 953k | down_cast<PgInsert*>(handle)->SetUpsertMode(); |
1091 | | |
1092 | 953k | return Status::OK(); |
1093 | 953k | } |
1094 | | |
1095 | 371k | Status PgApiImpl::InsertStmtSetWriteTime(PgStatement *handle, const HybridTime write_time) { |
1096 | 371k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) { |
1097 | | // Invalid handle. |
1098 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1099 | 0 | } |
1100 | 371k | RETURN_NOT_OK(down_cast<PgInsert*>(handle)->SetWriteTime(write_time)); |
1101 | 371k | return Status::OK(); |
1102 | 371k | } |
1103 | | |
1104 | 371k | Status PgApiImpl::InsertStmtSetIsBackfill(PgStatement *handle, const bool is_backfill) { |
1105 | 371k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) { |
1106 | | // Invalid handle. |
1107 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1108 | 0 | } |
1109 | 371k | down_cast<PgInsert*>(handle)->SetIsBackfill(is_backfill); |
1110 | 371k | return Status::OK(); |
1111 | 371k | } |
1112 | | |
1113 | | // Update ------------------------------------------------------------------------------------------ |
1114 | | |
1115 | | Status PgApiImpl::NewUpdate(const PgObjectId& table_id, |
1116 | | const bool is_single_row_txn, |
1117 | 704k | PgStatement **handle) { |
1118 | 704k | *handle = nullptr; |
1119 | 704k | auto stmt = std::make_unique<PgUpdate>(pg_session_, table_id, is_single_row_txn); |
1120 | 704k | RETURN_NOT_OK(stmt->Prepare()); |
1121 | 704k | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
1122 | 704k | return Status::OK(); |
1123 | 704k | } |
1124 | | |
1125 | 8 | Status PgApiImpl::ExecUpdate(PgStatement *handle) { |
1126 | 8 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_UPDATE)) { |
1127 | | // Invalid handle. |
1128 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1129 | 0 | } |
1130 | 8 | return down_cast<PgUpdate*>(handle)->Exec(); |
1131 | 8 | } |
1132 | | |
1133 | | // Delete ------------------------------------------------------------------------------------------ |
1134 | | |
1135 | | Status PgApiImpl::NewDelete(const PgObjectId& table_id, |
1136 | | const bool is_single_row_txn, |
1137 | 907k | PgStatement **handle) { |
1138 | 907k | *handle = nullptr; |
1139 | 907k | auto stmt = std::make_unique<PgDelete>(pg_session_, table_id, is_single_row_txn); |
1140 | 907k | RETURN_NOT_OK(stmt->Prepare()); |
1141 | 907k | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
1142 | 907k | return Status::OK(); |
1143 | 907k | } |
1144 | | |
1145 | 4 | Status PgApiImpl::ExecDelete(PgStatement *handle) { |
1146 | 4 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DELETE)) { |
1147 | | // Invalid handle. |
1148 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1149 | 0 | } |
1150 | 4 | return down_cast<PgDelete*>(handle)->Exec(); |
1151 | 4 | } |
1152 | | |
1153 | 178 | Status PgApiImpl::NewSample(const PgObjectId& table_id, const int targrows, PgStatement **handle) { |
1154 | 178 | *handle = nullptr; |
1155 | 178 | auto sample = std::make_unique<PgSample>(pg_session_, targrows, table_id); |
1156 | 178 | RETURN_NOT_OK(sample->Prepare()); |
1157 | 178 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(sample), handle)); |
1158 | 178 | return Status::OK(); |
1159 | 178 | } |
1160 | | |
1161 | 178 | Status PgApiImpl::InitRandomState(PgStatement *handle, double rstate_w, uint64 rand_state) { |
1162 | 178 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) { |
1163 | | // Invalid handle. |
1164 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1165 | 0 | } |
1166 | 178 | RETURN_NOT_OK(down_cast<PgSample*>(handle)->InitRandomState(rstate_w, rand_state)); |
1167 | 178 | return Status::OK(); |
1168 | 178 | } |
1169 | | |
1170 | 939 | Status PgApiImpl::SampleNextBlock(PgStatement *handle, bool *has_more) { |
1171 | 939 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) { |
1172 | | // Invalid handle. |
1173 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1174 | 0 | } |
1175 | 939 | RETURN_NOT_OK(down_cast<PgSample*>(handle)->SampleNextBlock(has_more)); |
1176 | 939 | return Status::OK(); |
1177 | 939 | } |
1178 | | |
1179 | 178 | Status PgApiImpl::ExecSample(PgStatement *handle) { |
1180 | 178 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) { |
1181 | | // Invalid handle. |
1182 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1183 | 0 | } |
1184 | 178 | RETURN_NOT_OK(down_cast<PgSample*>(handle)->Exec(nullptr)); |
1185 | 178 | return Status::OK(); |
1186 | 178 | } |
1187 | | |
1188 | 178 | Status PgApiImpl::GetEstimatedRowCount(PgStatement *handle, double *liverows, double *deadrows) { |
1189 | 178 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) { |
1190 | | // Invalid handle. |
1191 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1192 | 0 | } |
1193 | 178 | RETURN_NOT_OK(down_cast<PgSample*>(handle)->GetEstimatedRowCount(liverows, deadrows)); |
1194 | 178 | return Status::OK(); |
1195 | 178 | } |
1196 | | |
1197 | 5 | Status PgApiImpl::DeleteStmtSetIsPersistNeeded(PgStatement *handle, const bool is_persist_needed) { |
1198 | 5 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DELETE)) { |
1199 | | // Invalid handle. |
1200 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1201 | 0 | } |
1202 | 5 | down_cast<PgDelete*>(handle)->SetIsPersistNeeded(is_persist_needed); |
1203 | 5 | return Status::OK(); |
1204 | 5 | } |
1205 | | |
1206 | | // Colocated Truncate ------------------------------------------------------------------------------ |
1207 | | |
1208 | | Status PgApiImpl::NewTruncateColocated(const PgObjectId& table_id, |
1209 | | const bool is_single_row_txn, |
1210 | 90 | PgStatement **handle) { |
1211 | 90 | *handle = nullptr; |
1212 | 90 | auto stmt = std::make_unique<PgTruncateColocated>(pg_session_, table_id, is_single_row_txn); |
1213 | 90 | RETURN_NOT_OK(stmt->Prepare()); |
1214 | 90 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
1215 | 90 | return Status::OK(); |
1216 | 90 | } |
1217 | | |
1218 | 0 | Status PgApiImpl::ExecTruncateColocated(PgStatement *handle) { |
1219 | 0 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_TRUNCATE)) { |
1220 | | // Invalid handle. |
1221 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1222 | 0 | } |
1223 | 0 | return down_cast<PgTruncateColocated*>(handle)->Exec(); |
1224 | 0 | } |
1225 | | |
1226 | | // Select ------------------------------------------------------------------------------------------ |
1227 | | |
1228 | | Status PgApiImpl::NewSelect(const PgObjectId& table_id, |
1229 | | const PgObjectId& index_id, |
1230 | | const PgPrepareParameters *prepare_params, |
1231 | 1.58M | PgStatement **handle) { |
1232 | | // Scenarios: |
1233 | | // - Sequential Scan: PgSelect to read from table_id. |
1234 | | // - Primary Scan: PgSelect from table_id. YugaByte does not have separate table for primary key. |
1235 | | // - Index-Only-Scan: PgSelectIndex directly from secondary index_id. |
1236 | | // - IndexScan: Use PgSelectIndex to read from index_id and then PgSelect to read from table_id. |
1237 | | // Note that for SysTable, only one request is send for both table_id and index_id. |
1238 | 1.58M | *handle = nullptr; |
1239 | 1.58M | std::unique_ptr<PgDmlRead> stmt; |
1240 | 1.58M | if (prepare_params && prepare_params->index_only_scan1.45M && prepare_params->use_secondary_index91.2k ) { |
1241 | 91.3k | if (!index_id.IsValid()) { |
1242 | 0 | return STATUS(InvalidArgument, "Cannot run query with invalid index ID"); |
1243 | 0 | } |
1244 | 91.3k | stmt = std::make_unique<PgSelectIndex>(pg_session_, table_id, index_id, prepare_params); |
1245 | 1.49M | } else { |
1246 | | // For IndexScan PgSelect processing will create subquery PgSelectIndex. |
1247 | 1.49M | stmt = std::make_unique<PgSelect>(pg_session_, table_id, index_id, prepare_params); |
1248 | 1.49M | } |
1249 | | |
1250 | 1.58M | RETURN_NOT_OK(stmt->Prepare()); |
1251 | 1.58M | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
1252 | 1.58M | return Status::OK(); |
1253 | 1.58M | } |
1254 | | |
1255 | 1.46M | Status PgApiImpl::SetForwardScan(PgStatement *handle, bool is_forward_scan) { |
1256 | 1.46M | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SELECT)) { |
1257 | | // Invalid handle. |
1258 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1259 | 0 | } |
1260 | 1.46M | down_cast<PgDmlRead*>(handle)->SetForwardScan(is_forward_scan); |
1261 | 1.46M | return Status::OK(); |
1262 | 1.46M | } |
1263 | | |
1264 | 1.58M | Status PgApiImpl::ExecSelect(PgStatement *handle, const PgExecParameters *exec_params) { |
1265 | 1.58M | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SELECT)) { |
1266 | | // Invalid handle. |
1267 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1268 | 0 | } |
1269 | 1.58M | return down_cast<PgDmlRead*>(handle)->Exec(exec_params); |
1270 | 1.58M | } |
1271 | | |
1272 | | //-------------------------------------------------------------------------------------------------- |
1273 | | // Expressions. |
1274 | | //-------------------------------------------------------------------------------------------------- |
1275 | | |
1276 | | // Column references ------------------------------------------------------------------------------- |
1277 | | |
1278 | | Status PgApiImpl::NewColumnRef( |
1279 | | PgStatement *stmt, int attr_num, const PgTypeEntity *type_entity, bool collate_is_valid_non_c, |
1280 | 22.3M | const PgTypeAttrs *type_attrs, PgExpr **expr_handle) { |
1281 | 22.3M | if (!stmt) { |
1282 | | // Invalid handle. |
1283 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1284 | 0 | } |
1285 | 22.3M | PgColumnRef::SharedPtr colref = |
1286 | 22.3M | make_shared<PgColumnRef>(attr_num, type_entity, collate_is_valid_non_c, type_attrs); |
1287 | 22.3M | stmt->AddExpr(colref); |
1288 | | |
1289 | 22.3M | *expr_handle = colref.get(); |
1290 | 22.3M | return Status::OK(); |
1291 | 22.3M | } |
1292 | | |
1293 | | // Constant ---------------------------------------------------------------------------------------- |
1294 | | Status PgApiImpl::NewConstant( |
1295 | | YBCPgStatement stmt, const YBCPgTypeEntity *type_entity, bool collate_is_valid_non_c, |
1296 | 34.8M | const char *collation_sortkey, uint64_t datum, bool is_null, YBCPgExpr *expr_handle) { |
1297 | 34.8M | if (!stmt) { |
1298 | | // Invalid handle. |
1299 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1300 | 0 | } |
1301 | 34.8M | PgExpr::SharedPtr pg_const = |
1302 | 34.8M | make_shared<PgConstant>(type_entity, collate_is_valid_non_c, collation_sortkey, |
1303 | 34.8M | datum, is_null); |
1304 | 34.8M | stmt->AddExpr(pg_const); |
1305 | | |
1306 | 34.8M | *expr_handle = pg_const.get(); |
1307 | 34.8M | return Status::OK(); |
1308 | 34.8M | } |
1309 | | |
1310 | | Status PgApiImpl::NewConstantVirtual( |
1311 | | YBCPgStatement stmt, const YBCPgTypeEntity *type_entity, |
1312 | 50 | YBCPgDatumKind datum_kind, YBCPgExpr *expr_handle) { |
1313 | 50 | if (!stmt) { |
1314 | | // Invalid handle. |
1315 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1316 | 0 | } |
1317 | 50 | PgExpr::SharedPtr pg_const = |
1318 | 50 | make_shared<PgConstant>(type_entity, false /* collate_is_valid_non_c */, datum_kind); |
1319 | 50 | stmt->AddExpr(pg_const); |
1320 | | |
1321 | 50 | *expr_handle = pg_const.get(); |
1322 | 50 | return Status::OK(); |
1323 | 50 | } |
1324 | | |
1325 | | Status PgApiImpl::NewConstantOp( |
1326 | | YBCPgStatement stmt, const YBCPgTypeEntity *type_entity, bool collate_is_valid_non_c, |
1327 | | const char *collation_sortkey, uint64_t datum, bool is_null, YBCPgExpr *expr_handle, |
1328 | 6 | bool is_gt) { |
1329 | 6 | if (!stmt) { |
1330 | | // Invalid handle. |
1331 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1332 | 0 | } |
1333 | 6 | PgExpr::SharedPtr pg_const = |
1334 | 6 | make_shared<PgConstant>(type_entity, collate_is_valid_non_c, collation_sortkey, |
1335 | 6 | datum, is_null, is_gt ? PgExpr::Opcode::PG_EXPR_GT3 : PgExpr::Opcode::PG_EXPR_LT3 ); |
1336 | 6 | stmt->AddExpr(pg_const); |
1337 | | |
1338 | 6 | *expr_handle = pg_const.get(); |
1339 | 6 | return Status::OK(); |
1340 | 6 | } |
1341 | | |
1342 | | // Text constant ----------------------------------------------------------------------------------- |
1343 | | |
1344 | 2.00k | Status PgApiImpl::UpdateConstant(PgExpr *expr, const char *value, bool is_null) { |
1345 | 2.00k | if (expr->opcode() != PgExpr::Opcode::PG_EXPR_CONSTANT) { |
1346 | | // Invalid handle. |
1347 | 0 | return STATUS(InvalidArgument, "Invalid expression handle for constant"); |
1348 | 0 | } |
1349 | 2.00k | down_cast<PgConstant*>(expr)->UpdateConstant(value, is_null); |
1350 | 2.00k | return Status::OK(); |
1351 | 2.00k | } |
1352 | | |
1353 | 36 | Status PgApiImpl::UpdateConstant(PgExpr *expr, const void *value, int64_t bytes, bool is_null) { |
1354 | 36 | if (expr->opcode() != PgExpr::Opcode::PG_EXPR_CONSTANT) { |
1355 | | // Invalid handle. |
1356 | 0 | return STATUS(InvalidArgument, "Invalid expression handle for constant"); |
1357 | 0 | } |
1358 | 36 | down_cast<PgConstant*>(expr)->UpdateConstant(value, bytes, is_null); |
1359 | 36 | return Status::OK(); |
1360 | 36 | } |
1361 | | |
1362 | | // Text constant ----------------------------------------------------------------------------------- |
1363 | | |
1364 | | Status PgApiImpl::NewOperator( |
1365 | | PgStatement *stmt, const char *opname, const YBCPgTypeEntity *type_entity, |
1366 | 22.8k | bool collate_is_valid_non_c, PgExpr **op_handle) { |
1367 | 22.8k | if (!stmt) { |
1368 | | // Invalid handle. |
1369 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1370 | 0 | } |
1371 | 22.8k | RETURN_NOT_OK(PgExpr::CheckOperatorName(opname)); |
1372 | | |
1373 | | // Create operator. |
1374 | 22.8k | PgExpr::SharedPtr pg_op = make_shared<PgOperator>(opname, type_entity, collate_is_valid_non_c); |
1375 | 22.8k | stmt->AddExpr(pg_op); |
1376 | | |
1377 | 22.8k | *op_handle = pg_op.get(); |
1378 | 22.8k | return Status::OK(); |
1379 | 22.8k | } |
1380 | | |
1381 | 22.8k | Status PgApiImpl::OperatorAppendArg(PgExpr *op_handle, PgExpr *arg) { |
1382 | 22.8k | if (!op_handle || !arg) { |
1383 | | // Invalid handle. |
1384 | 0 | return STATUS(InvalidArgument, "Invalid expression handle"); |
1385 | 0 | } |
1386 | 22.8k | down_cast<PgOperator*>(op_handle)->AppendArg(arg); |
1387 | 22.8k | return Status::OK(); |
1388 | 22.8k | } |
1389 | | |
1390 | 2 | Result<bool> PgApiImpl::IsInitDbDone() { |
1391 | 2 | return pg_session_->IsInitDbDone(); |
1392 | 2 | } |
1393 | | |
1394 | 442k | Result<uint64_t> PgApiImpl::GetSharedCatalogVersion() { |
1395 | 442k | return pg_session_->GetSharedCatalogVersion(); |
1396 | 442k | } |
1397 | | |
1398 | 1.95k | Result<uint64_t> PgApiImpl::GetSharedAuthKey() { |
1399 | 1.95k | return pg_session_->GetSharedAuthKey(); |
1400 | 1.95k | } |
1401 | | |
1402 | | // Transaction Control ----------------------------------------------------------------------------- |
1403 | 415k | Status PgApiImpl::BeginTransaction() { |
1404 | 415k | pg_session_->InvalidateForeignKeyReferenceCache(); |
1405 | 415k | return pg_txn_manager_->BeginTransaction(); |
1406 | 415k | } |
1407 | | |
1408 | 69.0k | Status PgApiImpl::RecreateTransaction() { |
1409 | 69.0k | pg_session_->InvalidateForeignKeyReferenceCache(); |
1410 | 69.0k | pg_session_->DropBufferedOperations(); |
1411 | 69.0k | return pg_txn_manager_->RecreateTransaction(); |
1412 | 69.0k | } |
1413 | | |
1414 | 510 | Status PgApiImpl::RestartTransaction() { |
1415 | 510 | pg_session_->InvalidateForeignKeyReferenceCache(); |
1416 | 510 | pg_session_->DropBufferedOperations(); |
1417 | 510 | return pg_txn_manager_->RestartTransaction(); |
1418 | 510 | } |
1419 | | |
1420 | 28.6k | Status PgApiImpl::ResetTransactionReadPoint() { |
1421 | 28.6k | return pg_txn_manager_->ResetTransactionReadPoint(); |
1422 | 28.6k | } |
1423 | | |
1424 | 163 | Status PgApiImpl::RestartReadPoint() { |
1425 | 163 | return pg_txn_manager_->RestartReadPoint(); |
1426 | 163 | } |
1427 | | |
1428 | 381k | Status PgApiImpl::CommitTransaction() { |
1429 | 381k | pg_session_->InvalidateForeignKeyReferenceCache(); |
1430 | 381k | RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); |
1431 | 381k | return pg_txn_manager_->CommitTransaction(); |
1432 | 381k | } |
1433 | | |
1434 | 50.5k | Status PgApiImpl::AbortTransaction() { |
1435 | 50.5k | pg_session_->InvalidateForeignKeyReferenceCache(); |
1436 | 50.5k | pg_session_->DropBufferedOperations(); |
1437 | 50.5k | return pg_txn_manager_->AbortTransaction(); |
1438 | 50.5k | } |
1439 | | |
1440 | 437k | Status PgApiImpl::SetTransactionIsolationLevel(int isolation) { |
1441 | 437k | return pg_txn_manager_->SetPgIsolationLevel(isolation); |
1442 | 437k | } |
1443 | | |
1444 | 414k | Status PgApiImpl::SetTransactionReadOnly(bool read_only) { |
1445 | 414k | return pg_txn_manager_->SetReadOnly(read_only); |
1446 | 414k | } |
1447 | | |
1448 | 414k | Status PgApiImpl::EnableFollowerReads(bool enable_follower_reads, int32_t staleness_ms) { |
1449 | 414k | return pg_txn_manager_->EnableFollowerReads(enable_follower_reads, staleness_ms); |
1450 | 414k | } |
1451 | | |
1452 | 414k | Status PgApiImpl::SetTransactionDeferrable(bool deferrable) { |
1453 | 414k | return pg_txn_manager_->SetDeferrable(deferrable); |
1454 | 414k | } |
1455 | | |
1456 | 20.3k | Status PgApiImpl::EnterSeparateDdlTxnMode() { |
1457 | | // Flush all buffered operations as ddl txn use its own transaction session. |
1458 | 20.3k | RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); |
1459 | 20.3k | return pg_txn_manager_->EnterSeparateDdlTxnMode(); |
1460 | 20.3k | } |
1461 | | |
1462 | 18.6k | Status PgApiImpl::ExitSeparateDdlTxnMode() { |
1463 | | // Flush all buffered operations as ddl txn use its own transaction session. |
1464 | 18.6k | RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); |
1465 | 18.6k | RETURN_NOT_OK(pg_txn_manager_->ExitSeparateDdlTxnMode(Commit::kTrue)); |
1466 | | // Next reads from catalog tables have to see changes made by the DDL transaction. |
1467 | 18.6k | ResetCatalogReadTime(); |
1468 | 18.6k | return Status::OK(); |
1469 | 18.6k | } |
1470 | | |
1471 | 1.71k | void PgApiImpl::ClearSeparateDdlTxnMode() { |
1472 | 1.71k | pg_session_->DropBufferedOperations(); |
1473 | 1.71k | CHECK_OK(pg_txn_manager_->ExitSeparateDdlTxnMode(Commit::kFalse)); |
1474 | 1.71k | } |
1475 | | |
1476 | 61.7k | Status PgApiImpl::SetActiveSubTransaction(SubTransactionId id) { |
1477 | 61.7k | RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); |
1478 | 61.7k | return pg_session_->SetActiveSubTransaction(id); |
1479 | 61.7k | } |
1480 | | |
1481 | 13.5k | Status PgApiImpl::RollbackSubTransaction(SubTransactionId id) { |
1482 | 13.5k | pg_session_->DropBufferedOperations(); |
1483 | 13.5k | return pg_session_->RollbackSubTransaction(id); |
1484 | 13.5k | } |
1485 | | |
1486 | 2.37M | void PgApiImpl::ResetCatalogReadTime() { |
1487 | 2.37M | pg_session_->ResetCatalogReadPoint(); |
1488 | 2.37M | } |
1489 | | |
1490 | | Result<bool> PgApiImpl::ForeignKeyReferenceExists( |
1491 | 237k | PgOid table_id, const Slice& ybctid, PgOid database_id) { |
1492 | 237k | return pg_session_->ForeignKeyReferenceExists( |
1493 | 237k | table_id, ybctid, std::bind(FetchExistingYbctids, |
1494 | 237k | pg_session_, |
1495 | 237k | database_id, |
1496 | 237k | std::placeholders::_1, |
1497 | 237k | std::placeholders::_2)); |
1498 | 237k | } |
1499 | | |
1500 | 245k | void PgApiImpl::AddForeignKeyReferenceIntent(PgOid table_id, const Slice& ybctid) { |
1501 | 245k | pg_session_->AddForeignKeyReferenceIntent(table_id, ybctid); |
1502 | 245k | } |
1503 | | |
1504 | 724k | void PgApiImpl::DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid) { |
1505 | 724k | pg_session_->DeleteForeignKeyReference(table_id, ybctid); |
1506 | 724k | } |
1507 | | |
1508 | 4.38M | void PgApiImpl::AddForeignKeyReference(PgOid table_id, const Slice& ybctid) { |
1509 | 4.38M | pg_session_->AddForeignKeyReference(table_id, ybctid); |
1510 | 4.38M | } |
1511 | | |
1512 | 2 | void PgApiImpl::SetTimeout(const int timeout_ms) { |
1513 | 2 | pg_session_->SetTimeout(timeout_ms); |
1514 | 2 | } |
1515 | | |
1516 | 4 | Result<client::TabletServersInfo> PgApiImpl::ListTabletServers() { |
1517 | 4 | return pg_session_->ListTabletServers(); |
1518 | 4 | } |
1519 | | |
1520 | 1 | Status PgApiImpl::ValidatePlacement(const char *placement_info) { |
1521 | 1 | return pg_session_->ValidatePlacement(placement_info); |
1522 | 1 | } |
1523 | | |
1524 | | } // namespace pggate |
1525 | | } // namespace yb |