/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pggate.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | //-------------------------------------------------------------------------------------------------- |
14 | | |
15 | | #include "yb/yql/pggate/pggate.h" |
16 | | |
17 | | #include <boost/optional.hpp> |
18 | | |
19 | | #include "yb/client/client_fwd.h" |
20 | | #include "yb/client/client.h" |
21 | | #include "yb/client/client_utils.h" |
22 | | #include "yb/client/tablet_server.h" |
23 | | |
24 | | #include "yb/common/partition.h" |
25 | | #include "yb/common/pg_system_attr.h" |
26 | | #include "yb/common/schema.h" |
27 | | |
28 | | #include "yb/docdb/doc_key.h" |
29 | | #include "yb/docdb/primitive_value.h" |
30 | | #include "yb/docdb/value_type.h" |
31 | | |
32 | | #include "yb/gutil/casts.h" |
33 | | |
34 | | #include "yb/rpc/messenger.h" |
35 | | #include "yb/rpc/proxy.h" |
36 | | #include "yb/rpc/secure_stream.h" |
37 | | |
38 | | #include "yb/server/secure.h" |
39 | | |
40 | | #include "yb/tserver/tserver_forward_service.proxy.h" |
41 | | #include "yb/tserver/tserver_shared_mem.h" |
42 | | |
43 | | #include "yb/util/format.h" |
44 | | #include "yb/util/range.h" |
45 | | #include "yb/util/shared_mem.h" |
46 | | #include "yb/util/status_format.h" |
47 | | #include "yb/util/status_log.h" |
48 | | |
49 | | #include "yb/yql/pggate/pg_ddl.h" |
50 | | #include "yb/yql/pggate/pg_delete.h" |
51 | | #include "yb/yql/pggate/pg_insert.h" |
52 | | #include "yb/yql/pggate/pg_memctx.h" |
53 | | #include "yb/yql/pggate/pg_sample.h" |
54 | | #include "yb/yql/pggate/pg_select.h" |
55 | | #include "yb/yql/pggate/pg_truncate_colocated.h" |
56 | | #include "yb/yql/pggate/pg_txn_manager.h" |
57 | | #include "yb/yql/pggate/pg_update.h" |
58 | | #include "yb/yql/pggate/pggate_flags.h" |
59 | | #include "yb/yql/pggate/ybc_pggate.h" |
60 | | |
61 | | using namespace std::literals; |
62 | | |
63 | | DECLARE_string(rpc_bind_addresses); |
64 | | DECLARE_bool(use_node_to_node_encryption); |
65 | | DECLARE_string(certs_dir); |
66 | | DECLARE_bool(node_to_node_encryption_use_client_certificates); |
67 | | DECLARE_bool(ysql_forward_rpcs_to_local_tserver); |
68 | | DECLARE_bool(use_node_hostname_for_local_tserver); |
69 | | DECLARE_int32(backfill_index_client_rpc_timeout_ms); |
70 | | |
71 | | namespace yb { |
72 | | namespace pggate { |
73 | | |
74 | | using docdb::PrimitiveValue; |
75 | | using docdb::ValueType; |
76 | | |
77 | | namespace { |
78 | | |
79 | | CHECKED_STATUS AddColumn(PgCreateTable* pg_stmt, const char *attr_name, int attr_num, |
80 | | const YBCPgTypeEntity *attr_type, bool is_hash, bool is_range, |
81 | 3.04k | bool is_desc, bool is_nulls_first) { |
82 | 3.04k | using SortingType = SortingType; |
83 | 3.04k | SortingType sorting_type = SortingType::kNotSpecified; |
84 | | |
85 | 3.04k | if (!is_hash && is_range) { |
86 | 227 | if (is_desc) { |
87 | 16 | sorting_type = is_nulls_first ? SortingType::kDescending : SortingType::kDescendingNullsLast; |
88 | 209 | } else { |
89 | 209 | sorting_type = is_nulls_first ? SortingType::kAscending : SortingType::kAscendingNullsLast; |
90 | 209 | } |
91 | 227 | } |
92 | | |
93 | 3.04k | return pg_stmt->AddColumn(attr_name, attr_num, attr_type, is_hash, is_range, sorting_type); |
94 | 3.04k | } |
95 | | |
96 | | Result<PgApiContext::MessengerHolder> BuildMessenger( |
97 | | const string& client_name, |
98 | | int32_t num_reactors, |
99 | | const scoped_refptr<MetricEntity>& metric_entity, |
100 | 1.65k | const std::shared_ptr<MemTracker>& parent_mem_tracker) { |
101 | 1.65k | std::unique_ptr<rpc::SecureContext> secure_context; |
102 | 1.65k | if (FLAGS_use_node_to_node_encryption) { |
103 | 0 | secure_context = VERIFY_RESULT(server::CreateSecureContext( |
104 | 0 | FLAGS_certs_dir, |
105 | 0 | server::UseClientCerts(FLAGS_node_to_node_encryption_use_client_certificates))); |
106 | 0 | } |
107 | 1.65k | auto messenger = VERIFY_RESULT(client::CreateClientMessenger( |
108 | 1.65k | client_name, num_reactors, metric_entity, parent_mem_tracker, secure_context.get())); |
109 | 1.65k | return PgApiContext::MessengerHolder{std::move(secure_context), std::move(messenger)}; |
110 | 1.65k | } |
111 | | |
112 | 1.65k | std::unique_ptr<tserver::TServerSharedObject> InitTServerSharedObject() { |
113 | 1.65k | LOG(INFO) << __func__ << ": " << YBCIsInitDbModeEnvVarSet() << ", " |
114 | 1.65k | << FLAGS_TEST_pggate_ignore_tserver_shm << ", " << FLAGS_pggate_tserver_shm_fd; |
115 | | // Do not use shared memory in initdb or if explicity set to be ignored. |
116 | 1.65k | if (FLAGS_TEST_pggate_ignore_tserver_shm || FLAGS_pggate_tserver_shm_fd == -1) { |
117 | 0 | return nullptr; |
118 | 0 | } |
119 | 1.65k | return std::make_unique<tserver::TServerSharedObject>(CHECK_RESULT( |
120 | 1.65k | tserver::TServerSharedObject::OpenReadOnly(FLAGS_pggate_tserver_shm_fd))); |
121 | 1.65k | } |
122 | | |
123 | | Result<std::vector<std::string>> FetchExistingYbctids(PgSession::ScopedRefPtr session, |
124 | | PgOid database_id, |
125 | | PgOid table_id, |
126 | 525 | const std::vector<Slice>& ybctids) { |
127 | 525 | auto desc = VERIFY_RESULT(session->LoadTable(PgObjectId(database_id, table_id))); |
128 | 525 | PgTable target(desc); |
129 | 525 | auto read_op = std::make_shared<PgsqlReadOp>(*target); |
130 | 525 | PgsqlExpressionPB* expr_pb = read_op->read_request().add_targets(); |
131 | 525 | expr_pb->set_column_id(to_underlying(PgSystemAttrNum::kYBTupleId)); |
132 | 525 | auto doc_op = std::make_shared<PgDocReadOp>(session, &target, std::move(read_op)); |
133 | | |
134 | | // Postgres uses SELECT FOR KEY SHARE query for FK check. |
135 | | // Use same lock level. |
136 | 525 | PgExecParameters exec_params = doc_op->ExecParameters(); |
137 | 525 | exec_params.rowmark = ROW_MARK_KEYSHARE; |
138 | 525 | RETURN_NOT_OK(doc_op->ExecuteInit(&exec_params)); |
139 | 525 | RETURN_NOT_OK(doc_op->PopulateDmlByYbctidOps(ybctids)); |
140 | 525 | RETURN_NOT_OK(doc_op->Execute()); |
141 | 525 | std::vector<std::string> result; |
142 | 525 | result.reserve(ybctids.size()); |
143 | 525 | std::list<PgDocResult> rowsets; |
144 | 1.04k | do { |
145 | 1.04k | rowsets.clear(); |
146 | 1.04k | RETURN_NOT_OK(doc_op->GetResult(&rowsets)); |
147 | 1.03k | for (auto& row : rowsets) { |
148 | 767 | RETURN_NOT_OK(row.ProcessSystemColumns()); |
149 | 100k | for (const auto& ybctid : row.ybctids()) { |
150 | 100k | result.push_back(ybctid.ToBuffer()); |
151 | 100k | } |
152 | 767 | } |
153 | 1.03k | } while (!rowsets.empty()); |
154 | 518 | return result; |
155 | 525 | } |
156 | | |
157 | | } // namespace |
158 | | |
159 | | using std::make_shared; |
160 | | using client::YBSession; |
161 | | |
162 | | //-------------------------------------------------------------------------------------------------- |
163 | | |
164 | 1.65k | PggateOptions::PggateOptions() : ServerBaseOptions(kDefaultPort) { |
165 | 1.65k | server_type = "tserver"; |
166 | 1.65k | rpc_opts.connection_keepalive_time_ms = FLAGS_pgsql_rpc_keepalive_time_ms; |
167 | | |
168 | 1.65k | if (FLAGS_pggate_proxy_bind_address.empty()) { |
169 | 1.65k | HostPort host_port; |
170 | 1.65k | CHECK_OK(host_port.ParseString(FLAGS_rpc_bind_addresses, 0)); |
171 | 1.65k | host_port.set_port(PggateOptions::kDefaultPort); |
172 | 1.65k | FLAGS_pggate_proxy_bind_address = host_port.ToString(); |
173 | 1.65k | LOG(INFO) << "Reset YSQL bind address to " << FLAGS_pggate_proxy_bind_address; |
174 | 1.65k | } |
175 | 1.65k | rpc_opts.rpc_bind_addresses = FLAGS_pggate_proxy_bind_address; |
176 | 1.65k | master_addresses_flag = FLAGS_pggate_master_addresses; |
177 | | |
178 | 1.65k | server::MasterAddresses master_addresses; |
179 | | // TODO: we might have to allow setting master_replication_factor similarly to how it is done |
180 | | // in tserver to support master auto-discovery on Kubernetes. |
181 | 1.65k | CHECK_OK(server::DetermineMasterAddresses( |
182 | 1.65k | "pggate_master_addresses", master_addresses_flag, /* master_replication_factor */ 0, |
183 | 1.65k | &master_addresses, &master_addresses_flag)); |
184 | 1.65k | SetMasterAddresses(make_shared<server::MasterAddresses>(std::move(master_addresses))); |
185 | 1.65k | } |
186 | | |
187 | | PgApiContext::MessengerHolder::MessengerHolder( |
188 | | std::unique_ptr<rpc::SecureContext> security_context_, |
189 | | std::unique_ptr<rpc::Messenger> messenger_) |
190 | 1.65k | : security_context(std::move(security_context_)), messenger(std::move(messenger_)) { |
191 | 1.65k | } |
192 | | |
193 | | PgApiContext::MessengerHolder::MessengerHolder(MessengerHolder&& rhs) |
194 | | : security_context(std::move(rhs.security_context)), |
195 | 4.95k | messenger(std::move(rhs.messenger)) { |
196 | 4.95k | } |
197 | | |
198 | 6.60k | PgApiContext::MessengerHolder::~MessengerHolder() { |
199 | 6.60k | } |
200 | | |
201 | | PgApiContext::PgApiContext() |
202 | | : metric_registry(new MetricRegistry()), |
203 | | metric_entity(METRIC_ENTITY_server.Instantiate(metric_registry.get(), "yb.pggate")), |
204 | | mem_tracker(MemTracker::CreateTracker("PostgreSQL")), |
205 | | messenger_holder(CHECK_RESULT(BuildMessenger("pggate_ybclient", |
206 | | FLAGS_pggate_ybclient_reactor_threads, |
207 | | metric_entity, |
208 | | mem_tracker))), |
209 | 1.65k | proxy_cache(std::make_unique<rpc::ProxyCache>(messenger_holder.messenger.get())) { |
210 | 1.65k | } |
211 | | |
212 | 0 | PgApiContext::PgApiContext(PgApiContext&&) = default; |
213 | | |
214 | 1.65k | PgApiContext::~PgApiContext() = default; |
215 | | |
216 | | //-------------------------------------------------------------------------------------------------- |
217 | | |
218 | | PgApiImpl::PgApiImpl( |
219 | | PgApiContext context, const YBCPgTypeEntity *YBCDataTypeArray, int count, |
220 | | YBCPgCallbacks callbacks) |
221 | | : metric_registry_(std::move(context.metric_registry)), |
222 | | metric_entity_(std::move(context.metric_entity)), |
223 | | mem_tracker_(std::move(context.mem_tracker)), |
224 | | messenger_holder_(std::move(context.messenger_holder)), |
225 | | async_client_init_(messenger_holder_.messenger.get()->name(), |
226 | | FLAGS_pggate_ybclient_reactor_threads, |
227 | | FLAGS_pggate_rpc_timeout_secs, |
228 | | "" /* tserver_uuid */, |
229 | | &pggate_options_, |
230 | | metric_entity_, |
231 | | mem_tracker_, |
232 | | messenger_holder_.messenger.get()), |
233 | | proxy_cache_(std::move(context.proxy_cache)), |
234 | | clock_(new server::HybridClock()), |
235 | | tserver_shared_object_(InitTServerSharedObject()), |
236 | | pg_callbacks_(callbacks), |
237 | | pg_txn_manager_( |
238 | | new PgTxnManager( |
239 | 1.65k | &pg_client_, clock_, tserver_shared_object_.get(), pg_callbacks_)) { |
240 | 1.65k | CHECK_OK(clock_->Init()); |
241 | | |
242 | | // Setup type mapping. |
243 | 269k | for (int idx = 0; idx < count; idx++) { |
244 | 267k | const YBCPgTypeEntity *type_entity = &YBCDataTypeArray[idx]; |
245 | 267k | type_map_[type_entity->type_oid] = type_entity; |
246 | 267k | } |
247 | 1.65k | if (FLAGS_ysql_forward_rpcs_to_local_tserver) { |
248 | 0 | async_client_init_.AddPostCreateHook([this](client::YBClient *client) { |
249 | 0 | const auto& tserver_shared_data = **tserver_shared_object_; |
250 | 0 | HostPort host_port(tserver_shared_data.endpoint()); |
251 | 0 | MonoDelta resolve_cache_timeout; |
252 | 0 | if (FLAGS_use_node_hostname_for_local_tserver) { |
253 | 0 | host_port = HostPort(tserver_shared_data.host().ToBuffer(), |
254 | 0 | tserver_shared_data.endpoint().port()); |
255 | 0 | resolve_cache_timeout = MonoDelta::kMax; |
256 | 0 | } |
257 | 0 | auto proxy = std::make_shared<tserver::TabletServerForwardServiceProxy>( |
258 | 0 | &client->proxy_cache(), host_port, nullptr /* protocol */, resolve_cache_timeout); |
259 | 0 | client->SetNodeLocalForwardProxy(proxy); |
260 | 0 | client->SetNodeLocalTServerHostPort(host_port); |
261 | 0 | }); |
262 | 0 | } |
263 | 1.65k | async_client_init_.Start(); |
264 | | |
265 | 1.65k | CHECK_OK(pg_client_.Start( |
266 | 1.65k | proxy_cache_.get(), &messenger_holder_.messenger->scheduler(), |
267 | 1.65k | *DCHECK_NOTNULL(tserver_shared_object_))); |
268 | 1.65k | } |
269 | | |
270 | 1.64k | PgApiImpl::~PgApiImpl() { |
271 | 1.64k | messenger_holder_.messenger->Shutdown(); |
272 | 1.64k | async_client_init_.client()->Shutdown(); |
273 | 1.64k | pg_txn_manager_.reset(); |
274 | 1.64k | pg_client_.Shutdown(); |
275 | 1.64k | } |
276 | | |
277 | 24.5M | const YBCPgTypeEntity *PgApiImpl::FindTypeEntity(int type_oid) { |
278 | 24.5M | const auto iter = type_map_.find(type_oid); |
279 | 24.5M | if (iter != type_map_.end()) { |
280 | 24.4M | return iter->second; |
281 | 24.4M | } |
282 | 120k | return nullptr; |
283 | 120k | } |
284 | | |
285 | | //-------------------------------------------------------------------------------------------------- |
286 | | |
287 | 0 | Status PgApiImpl::CreateEnv(PgEnv **pg_env) { |
288 | 0 | *pg_env = pg_env_.get(); |
289 | 0 | return Status::OK(); |
290 | 0 | } |
291 | | |
292 | 0 | Status PgApiImpl::DestroyEnv(PgEnv *pg_env) { |
293 | 0 | pg_env_ = nullptr; |
294 | 0 | return Status::OK(); |
295 | 0 | } |
296 | | |
297 | | //-------------------------------------------------------------------------------------------------- |
298 | | |
299 | | Status PgApiImpl::InitSession(const PgEnv *pg_env, |
300 | 1.65k | const string& database_name) { |
301 | 1.65k | CHECK(!pg_session_); |
302 | 1.65k | auto session = make_scoped_refptr<PgSession>(client(), |
303 | 1.65k | &pg_client_, |
304 | 1.65k | database_name, |
305 | 1.65k | pg_txn_manager_, |
306 | 1.65k | clock_, |
307 | 1.65k | tserver_shared_object_.get(), |
308 | 1.65k | pg_callbacks_); |
309 | 1.65k | if (!database_name.empty()) { |
310 | 1.65k | RETURN_NOT_OK(session->ConnectDatabase(database_name)); |
311 | 1.65k | } |
312 | | |
313 | 1.65k | pg_session_.swap(session); |
314 | 1.65k | return Status::OK(); |
315 | 1.65k | } |
316 | | |
317 | 613 | Status PgApiImpl::InvalidateCache() { |
318 | 613 | pg_session_->InvalidateAllTablesCache(); |
319 | 613 | return Status::OK(); |
320 | 613 | } |
321 | | |
322 | 55 | bool PgApiImpl::GetDisableTransparentCacheRefreshRetry() { |
323 | 55 | return FLAGS_TEST_ysql_disable_transparent_cache_refresh_retry; |
324 | 55 | } |
325 | | |
326 | | //-------------------------------------------------------------------------------------------------- |
327 | | |
328 | 217k | PgMemctx *PgApiImpl::CreateMemctx() { |
329 | | // Postgres will create YB Memctx when it first use the Memctx to allocate YugaByte object. |
330 | 217k | return PgMemctx::Create(); |
331 | 217k | } |
332 | | |
333 | 215k | Status PgApiImpl::DestroyMemctx(PgMemctx *memctx) { |
334 | | // Postgres will destroy YB Memctx by releasing the pointer. |
335 | 215k | return PgMemctx::Destroy(memctx); |
336 | 215k | } |
337 | | |
338 | 1.17M | Status PgApiImpl::ResetMemctx(PgMemctx *memctx) { |
339 | | // Postgres reset YB Memctx when clearing a context content without clearing its nested context. |
340 | 1.17M | return PgMemctx::Reset(memctx); |
341 | 1.17M | } |
342 | | |
343 | | // TODO(neil) Use Arena in the future. |
344 | | // - PgStatement should have been declared as derived class of "MCBase". |
345 | | // - All objects of PgStatement's derived class should be allocated by YbPgMemctx::Arena. |
346 | | // - We cannot use Arena yet because quite a large number of YugaByte objects are being referenced |
347 | | // from other layers. Those added code violated the original design as they assume ScopedPtr |
348 | | // instead of memory pool is being used. This mess should be cleaned up later. |
349 | | // |
350 | | // For now, statements is allocated as ScopedPtr and cached in the memory context. The statements |
351 | | // would then be destructed when the context is destroyed and all other references are also cleared. |
352 | | Status PgApiImpl::AddToCurrentPgMemctx(std::unique_ptr<PgStatement> stmt, |
353 | 2.69M | PgStatement **handle) { |
354 | 2.69M | *handle = stmt.get(); |
355 | 2.69M | pg_callbacks_.GetCurrentYbMemctx()->Register(stmt.release()); |
356 | 2.69M | return Status::OK(); |
357 | 2.69M | } |
358 | | |
359 | | // TODO(neil) Most like we don't need table_desc. If we do need it, use Arena here. |
360 | | // - PgTableDesc should have been declared as derived class of "MCBase". |
361 | | // - PgTableDesc objects should be allocated by YbPgMemctx::Arena. |
362 | | // |
363 | | // For now, table_desc is allocated as ScopedPtr and cached in the memory context. The table_desc |
364 | | // would then be destructed when the context is destroyed. |
365 | | Status PgApiImpl::AddToCurrentPgMemctx(size_t table_desc_id, |
366 | 1.03M | const PgTableDescPtr &table_desc) { |
367 | 1.03M | pg_callbacks_.GetCurrentYbMemctx()->Cache(table_desc_id, table_desc); |
368 | 1.03M | return Status::OK(); |
369 | 1.03M | } |
370 | | |
371 | 5.96M | Status PgApiImpl::GetTabledescFromCurrentPgMemctx(size_t table_desc_id, PgTableDesc **handle) { |
372 | 5.96M | pg_callbacks_.GetCurrentYbMemctx()->GetCache(table_desc_id, handle); |
373 | 5.96M | return Status::OK(); |
374 | 5.96M | } |
375 | | |
376 | | //-------------------------------------------------------------------------------------------------- |
377 | | |
378 | 0 | Status PgApiImpl::CreateSequencesDataTable() { |
379 | 0 | return pg_session_->CreateSequencesDataTable(); |
380 | 0 | } |
381 | | |
382 | | Status PgApiImpl::InsertSequenceTuple(int64_t db_oid, |
383 | | int64_t seq_oid, |
384 | | uint64_t ysql_catalog_version, |
385 | | int64_t last_val, |
386 | 54 | bool is_called) { |
387 | 54 | return pg_session_->InsertSequenceTuple( |
388 | 54 | db_oid, seq_oid, ysql_catalog_version, last_val, is_called); |
389 | 54 | } |
390 | | |
391 | | Status PgApiImpl::UpdateSequenceTupleConditionally(int64_t db_oid, |
392 | | int64_t seq_oid, |
393 | | uint64_t ysql_catalog_version, |
394 | | int64_t last_val, |
395 | | bool is_called, |
396 | | int64_t expected_last_val, |
397 | | bool expected_is_called, |
398 | 31 | bool *skipped) { |
399 | 31 | return pg_session_->UpdateSequenceTuple( |
400 | 31 | db_oid, seq_oid, ysql_catalog_version, last_val, is_called, |
401 | 31 | expected_last_val, expected_is_called, skipped); |
402 | 31 | } |
403 | | |
404 | | Status PgApiImpl::UpdateSequenceTuple(int64_t db_oid, |
405 | | int64_t seq_oid, |
406 | | uint64_t ysql_catalog_version, |
407 | | int64_t last_val, |
408 | | bool is_called, |
409 | 2 | bool* skipped) { |
410 | 2 | return pg_session_->UpdateSequenceTuple( |
411 | 2 | db_oid, seq_oid, ysql_catalog_version, last_val, |
412 | 2 | is_called, boost::none, boost::none, skipped); |
413 | 2 | } |
414 | | |
415 | | Status PgApiImpl::ReadSequenceTuple(int64_t db_oid, |
416 | | int64_t seq_oid, |
417 | | uint64_t ysql_catalog_version, |
418 | | int64_t *last_val, |
419 | 69 | bool *is_called) { |
420 | 69 | return pg_session_->ReadSequenceTuple(db_oid, seq_oid, ysql_catalog_version, last_val, is_called); |
421 | 69 | } |
422 | | |
423 | 44 | Status PgApiImpl::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) { |
424 | 44 | return pg_session_->DeleteSequenceTuple(db_oid, seq_oid); |
425 | 44 | } |
426 | | |
427 | | |
428 | | //-------------------------------------------------------------------------------------------------- |
429 | | |
430 | 2.30M | void PgApiImpl::DeleteStatement(PgStatement *handle) { |
431 | 2.30M | if (handle) { |
432 | 2.30M | PgMemctx::Destroy(handle); |
433 | 2.30M | } |
434 | 2.30M | } |
435 | | |
436 | | //-------------------------------------------------------------------------------------------------- |
437 | | |
438 | 0 | Status PgApiImpl::ConnectDatabase(const char *database_name) { |
439 | 0 | return pg_session_->ConnectDatabase(database_name); |
440 | 0 | } |
441 | | |
442 | 1.61k | Status PgApiImpl::IsDatabaseColocated(const PgOid database_oid, bool *colocated) { |
443 | 1.61k | return pg_session_->IsDatabaseColocated(database_oid, colocated); |
444 | 1.61k | } |
445 | | |
446 | | Status PgApiImpl::NewCreateDatabase(const char *database_name, |
447 | | const PgOid database_oid, |
448 | | const PgOid source_database_oid, |
449 | | const PgOid next_oid, |
450 | | const bool colocated, |
451 | 22 | PgStatement **handle) { |
452 | 22 | auto stmt = std::make_unique<PgCreateDatabase>(pg_session_, database_name, database_oid, |
453 | 22 | source_database_oid, next_oid, colocated); |
454 | 22 | if (pg_txn_manager_->IsDdlMode()) { |
455 | 22 | stmt->UseTransaction(); |
456 | 22 | } |
457 | 22 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
458 | 22 | return Status::OK(); |
459 | 22 | } |
460 | | |
461 | 22 | Status PgApiImpl::ExecCreateDatabase(PgStatement *handle) { |
462 | 22 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_DATABASE)) { |
463 | | // Invalid handle. |
464 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
465 | 0 | } |
466 | | |
467 | 22 | return down_cast<PgCreateDatabase*>(handle)->Exec(); |
468 | 22 | } |
469 | | |
470 | | Status PgApiImpl::NewDropDatabase(const char *database_name, |
471 | | PgOid database_oid, |
472 | 21 | PgStatement **handle) { |
473 | 21 | auto stmt = std::make_unique<PgDropDatabase>(pg_session_, database_name, database_oid); |
474 | 21 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
475 | 21 | return Status::OK(); |
476 | 21 | } |
477 | | |
478 | 21 | Status PgApiImpl::ExecDropDatabase(PgStatement *handle) { |
479 | 21 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_DATABASE)) { |
480 | | // Invalid handle. |
481 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
482 | 0 | } |
483 | 21 | return down_cast<PgDropDatabase*>(handle)->Exec(); |
484 | 21 | } |
485 | | |
486 | | Status PgApiImpl::NewAlterDatabase(const char *database_name, |
487 | | PgOid database_oid, |
488 | 0 | PgStatement **handle) { |
489 | 0 | auto stmt = std::make_unique<PgAlterDatabase>(pg_session_, database_name, database_oid); |
490 | 0 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
491 | 0 | return Status::OK(); |
492 | 0 | } |
493 | | |
494 | 0 | Status PgApiImpl::AlterDatabaseRenameDatabase(PgStatement *handle, const char *newname) { |
495 | 0 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_DATABASE)) { |
496 | | // Invalid handle. |
497 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
498 | 0 | } |
499 | 0 | down_cast<PgAlterDatabase*>(handle)->RenameDatabase(newname); |
500 | 0 | return Status::OK(); |
501 | 0 | } |
502 | | |
503 | 0 | Status PgApiImpl::ExecAlterDatabase(PgStatement *handle) { |
504 | 0 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_DATABASE)) { |
505 | | // Invalid handle. |
506 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
507 | 0 | } |
508 | 0 | return down_cast<PgAlterDatabase*>(handle)->Exec(); |
509 | 0 | } |
510 | | |
511 | | Status PgApiImpl::ReserveOids(const PgOid database_oid, |
512 | | const PgOid next_oid, |
513 | | const uint32_t count, |
514 | | PgOid *begin_oid, |
515 | 380 | PgOid *end_oid) { |
516 | 380 | auto p = VERIFY_RESULT(pg_client_.ReserveOids(database_oid, next_oid, count)); |
517 | 380 | *begin_oid = p.first; |
518 | 380 | *end_oid = p.second; |
519 | 380 | return Status::OK(); |
520 | 380 | } |
521 | | |
522 | 0 | Status PgApiImpl::GetCatalogMasterVersion(uint64_t *version) { |
523 | 0 | return pg_session_->GetCatalogMasterVersion(version); |
524 | 0 | } |
525 | | |
526 | 5.15k | Result<PgTableDescPtr> PgApiImpl::LoadTable(const PgObjectId& table_id) { |
527 | 5.15k | return pg_session_->LoadTable(table_id); |
528 | 5.15k | } |
529 | | |
530 | 2 | void PgApiImpl::InvalidateTableCache(const PgObjectId& table_id) { |
531 | 2 | pg_session_->InvalidateTableCache(table_id, InvalidateOnPgClient::kTrue); |
532 | 2 | } |
533 | | |
534 | | //-------------------------------------------------------------------------------------------------- |
535 | | |
536 | | Status PgApiImpl::NewCreateTablegroup(const char *database_name, |
537 | | const PgOid database_oid, |
538 | | const PgOid tablegroup_oid, |
539 | | const PgOid tablespace_oid, |
540 | 1 | PgStatement **handle) { |
541 | 1 | auto stmt = std::make_unique<PgCreateTablegroup>(pg_session_, database_name, |
542 | 1 | database_oid, tablegroup_oid, tablespace_oid); |
543 | 1 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
544 | 1 | return Status::OK(); |
545 | 1 | } |
546 | | |
547 | 1 | Status PgApiImpl::ExecCreateTablegroup(PgStatement *handle) { |
548 | 1 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLEGROUP)) { |
549 | | // Invalid handle. |
550 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
551 | 0 | } |
552 | | |
553 | 1 | return down_cast<PgCreateTablegroup*>(handle)->Exec(); |
554 | 1 | } |
555 | | |
556 | | Status PgApiImpl::NewDropTablegroup(const PgOid database_oid, |
557 | | const PgOid tablegroup_oid, |
558 | 1 | PgStatement **handle) { |
559 | 1 | auto stmt = std::make_unique<PgDropTablegroup>(pg_session_, database_oid, tablegroup_oid); |
560 | 1 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
561 | 1 | return Status::OK(); |
562 | 1 | } |
563 | | |
564 | | |
565 | 0 | Status PgApiImpl::ExecDropTablegroup(PgStatement *handle) { |
566 | 0 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_TABLEGROUP)) { |
567 | | // Invalid handle. |
568 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
569 | 0 | } |
570 | 0 | return down_cast<PgDropTablegroup*>(handle)->Exec(); |
571 | 0 | } |
572 | | |
573 | | |
574 | | //-------------------------------------------------------------------------------------------------- |
575 | | |
576 | | Status PgApiImpl::NewCreateTable(const char *database_name, |
577 | | const char *schema_name, |
578 | | const char *table_name, |
579 | | const PgObjectId& table_id, |
580 | | bool is_shared_table, |
581 | | bool if_not_exist, |
582 | | bool add_primary_key, |
583 | | const bool colocated, |
584 | | const PgObjectId& tablegroup_oid, |
585 | | const PgObjectId& tablespace_oid, |
586 | | const PgObjectId& matview_pg_table_oid, |
587 | 1.27k | PgStatement **handle) { |
588 | 1.27k | auto stmt = std::make_unique<PgCreateTable>( |
589 | 1.27k | pg_session_, database_name, schema_name, table_name, |
590 | 1.27k | table_id, is_shared_table, if_not_exist, add_primary_key, colocated, tablegroup_oid, |
591 | 1.27k | tablespace_oid, matview_pg_table_oid); |
592 | 1.27k | if (pg_txn_manager_->IsDdlMode()) { |
593 | 1.27k | stmt->UseTransaction(); |
594 | 1.27k | } |
595 | 1.27k | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
596 | 1.27k | return Status::OK(); |
597 | 1.27k | } |
598 | | |
599 | | Status PgApiImpl::CreateTableAddColumn(PgStatement *handle, const char *attr_name, int attr_num, |
600 | | const YBCPgTypeEntity *attr_type, |
601 | | bool is_hash, bool is_range, |
602 | 2.84k | bool is_desc, bool is_nulls_first) { |
603 | 2.84k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) { |
604 | | // Invalid handle. |
605 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
606 | 0 | } |
607 | 2.84k | return AddColumn(down_cast<PgCreateTable*>(handle), attr_name, attr_num, attr_type, |
608 | 2.84k | is_hash, is_range, is_desc, is_nulls_first); |
609 | 2.84k | } |
610 | | |
611 | 162 | Status PgApiImpl::CreateTableSetNumTablets(PgStatement *handle, int32_t num_tablets) { |
612 | 162 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) { |
613 | | // Invalid handle. |
614 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
615 | 0 | } |
616 | 162 | return down_cast<PgCreateTable*>(handle)->SetNumTablets(num_tablets); |
617 | 162 | } |
618 | | |
619 | 15 | Status PgApiImpl::AddSplitBoundary(PgStatement *handle, PgExpr **exprs, int expr_count) { |
620 | | // Partitioning a TABLE or an INDEX. |
621 | 15 | if (PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE) || |
622 | 15 | PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX)) { |
623 | 15 | return down_cast<PgCreateTable*>(handle)->AddSplitBoundary(exprs, expr_count); |
624 | 15 | } |
625 | | |
626 | | // Invalid handle. |
627 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
628 | 0 | } |
629 | | |
630 | 1.26k | Status PgApiImpl::ExecCreateTable(PgStatement *handle) { |
631 | 1.26k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) { |
632 | | // Invalid handle. |
633 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
634 | 0 | } |
635 | 1.26k | return down_cast<PgCreateTable*>(handle)->Exec(); |
636 | 1.26k | } |
637 | | |
638 | | Status PgApiImpl::NewAlterTable(const PgObjectId& table_id, |
639 | 661 | PgStatement **handle) { |
640 | 661 | auto stmt = std::make_unique<PgAlterTable>(pg_session_, table_id); |
641 | 661 | if (pg_txn_manager_->IsDdlMode()) { |
642 | 661 | stmt->UseTransaction(); |
643 | 661 | } |
644 | 661 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
645 | 661 | return Status::OK(); |
646 | 661 | } |
647 | | |
648 | | Status PgApiImpl::AlterTableAddColumn(PgStatement *handle, const char *name, |
649 | 69 | int order, const YBCPgTypeEntity *attr_type) { |
650 | 69 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) { |
651 | | // Invalid handle. |
652 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
653 | 0 | } |
654 | | |
655 | 69 | PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle); |
656 | 69 | return pg_stmt->AddColumn(name, attr_type, order); |
657 | 69 | } |
658 | | |
659 | | Status PgApiImpl::AlterTableRenameColumn(PgStatement *handle, const char *oldname, |
660 | 1 | const char *newname) { |
661 | 1 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) { |
662 | | // Invalid handle. |
663 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
664 | 0 | } |
665 | | |
666 | 1 | PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle); |
667 | 1 | return pg_stmt->RenameColumn(oldname, newname); |
668 | 1 | } |
669 | | |
670 | 116 | Status PgApiImpl::AlterTableDropColumn(PgStatement *handle, const char *name) { |
671 | 116 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) { |
672 | | // Invalid handle. |
673 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
674 | 0 | } |
675 | | |
676 | 116 | PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle); |
677 | 116 | return pg_stmt->DropColumn(name); |
678 | 116 | } |
679 | | |
680 | | Status PgApiImpl::AlterTableRenameTable(PgStatement *handle, const char *db_name, |
681 | 42 | const char *newname) { |
682 | 42 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) { |
683 | | // Invalid handle. |
684 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
685 | 0 | } |
686 | | |
687 | 42 | PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle); |
688 | 42 | return pg_stmt->RenameTable(db_name, newname); |
689 | 42 | } |
690 | | |
691 | 155 | Status PgApiImpl::ExecAlterTable(PgStatement *handle) { |
692 | 155 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) { |
693 | | // Invalid handle. |
694 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
695 | 0 | } |
696 | 155 | PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle); |
697 | 155 | return pg_stmt->Exec(); |
698 | 155 | } |
699 | | |
700 | | Status PgApiImpl::NewDropTable(const PgObjectId& table_id, |
701 | | bool if_exist, |
702 | 1.03k | PgStatement **handle) { |
703 | 1.03k | auto stmt = std::make_unique<PgDropTable>(pg_session_, table_id, if_exist); |
704 | 1.03k | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
705 | 1.03k | return Status::OK(); |
706 | 1.03k | } |
707 | | |
708 | | Status PgApiImpl::NewTruncateTable(const PgObjectId& table_id, |
709 | 31 | PgStatement **handle) { |
710 | 31 | auto stmt = std::make_unique<PgTruncateTable>(pg_session_, table_id); |
711 | 31 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
712 | 31 | return Status::OK(); |
713 | 31 | } |
714 | | |
715 | 31 | Status PgApiImpl::ExecTruncateTable(PgStatement *handle) { |
716 | 31 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_TRUNCATE_TABLE)) { |
717 | | // Invalid handle. |
718 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
719 | 0 | } |
720 | 31 | return down_cast<PgTruncateTable*>(handle)->Exec(); |
721 | 31 | } |
722 | | |
723 | | Status PgApiImpl::GetTableDesc(const PgObjectId& table_id, |
724 | 5.96M | PgTableDesc **handle) { |
725 | | // First read from memory context. |
726 | 5.96M | size_t hash_id = hash_value(table_id); |
727 | 5.96M | RETURN_NOT_OK(GetTabledescFromCurrentPgMemctx(hash_id, handle)); |
728 | | |
729 | | // Read from environment. |
730 | 5.96M | if (*handle == nullptr) { |
731 | 1.03M | auto result = pg_session_->LoadTable(table_id); |
732 | 1.03M | RETURN_NOT_OK(result); |
733 | 1.03M | RETURN_NOT_OK(AddToCurrentPgMemctx(hash_id, *result)); |
734 | | |
735 | 1.03M | *handle = result->get(); |
736 | 1.03M | } |
737 | | |
738 | 5.96M | return Status::OK(); |
739 | 5.96M | } |
740 | | |
741 | | Result<YBCPgColumnInfo> PgApiImpl::GetColumnInfo(YBCPgTableDesc table_desc, |
742 | 26.2M | int16_t attr_number) { |
743 | 26.2M | return table_desc->GetColumnInfo(attr_number); |
744 | 26.2M | } |
745 | | |
746 | 0 | Status PgApiImpl::DmlModifiesRow(PgStatement *handle, bool *modifies_row) { |
747 | 0 | if (!handle) { |
748 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
749 | 0 | } |
750 | | |
751 | 0 | *modifies_row = false; |
752 | |
|
753 | 0 | switch (handle->stmt_op()) { |
754 | 0 | case StmtOp::STMT_UPDATE: |
755 | 0 | case StmtOp::STMT_DELETE: |
756 | 0 | *modifies_row = true; |
757 | 0 | break; |
758 | 0 | default: |
759 | 0 | break; |
760 | 0 | } |
761 | | |
762 | 0 | return Status::OK(); |
763 | 0 | } |
764 | | |
765 | 0 | Status PgApiImpl::SetIsSysCatalogVersionChange(PgStatement *handle) { |
766 | 0 | if (!handle) { |
767 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
768 | 0 | } |
769 | | |
770 | 0 | switch (handle->stmt_op()) { |
771 | 0 | case StmtOp::STMT_UPDATE: |
772 | 0 | case StmtOp::STMT_DELETE: |
773 | 0 | case StmtOp::STMT_INSERT: |
774 | 0 | down_cast<PgDmlWrite *>(handle)->SetIsSystemCatalogChange(); |
775 | 0 | return Status::OK(); |
776 | 0 | default: |
777 | 0 | break; |
778 | 0 | } |
779 | | |
780 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
781 | 0 | } |
782 | | |
783 | 2.31M | Status PgApiImpl::SetCatalogCacheVersion(PgStatement *handle, uint64_t catalog_cache_version) { |
784 | 2.31M | if (!handle) { |
785 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
786 | 0 | } |
787 | | |
788 | 2.31M | switch (handle->stmt_op()) { |
789 | 191k | case StmtOp::STMT_SELECT: |
790 | 2.03M | case StmtOp::STMT_INSERT: |
791 | 2.20M | case StmtOp::STMT_UPDATE: |
792 | 2.31M | case StmtOp::STMT_DELETE: |
793 | 2.31M | down_cast<PgDml *>(handle)->SetCatalogCacheVersion(catalog_cache_version); |
794 | 2.31M | return Status::OK(); |
795 | 0 | default: |
796 | 0 | break; |
797 | 0 | } |
798 | | |
799 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
800 | 0 | } |
801 | | |
802 | | //-------------------------------------------------------------------------------------------------- |
803 | | |
804 | | Status PgApiImpl::NewCreateIndex(const char *database_name, |
805 | | const char *schema_name, |
806 | | const char *index_name, |
807 | | const PgObjectId& index_id, |
808 | | const PgObjectId& base_table_id, |
809 | | bool is_shared_index, |
810 | | bool is_unique_index, |
811 | | const bool skip_index_backfill, |
812 | | bool if_not_exist, |
813 | | const PgObjectId& tablegroup_oid, |
814 | | const PgObjectId& tablespace_oid, |
815 | 151 | PgStatement **handle) { |
816 | 151 | auto stmt = std::make_unique<PgCreateTable>( |
817 | 151 | pg_session_, database_name, schema_name, index_name, index_id, is_shared_index, |
818 | 151 | if_not_exist, false /* add_primary_key */, |
819 | 149 | tablegroup_oid.IsValid() ? false : true /* colocated */, tablegroup_oid, tablespace_oid, |
820 | 151 | PgObjectId() /* matview_pg_table_id */); |
821 | 151 | stmt->SetupIndex(base_table_id, is_unique_index, skip_index_backfill); |
822 | 151 | if (pg_txn_manager_->IsDdlMode()) { |
823 | 151 | stmt->UseTransaction(); |
824 | 151 | } |
825 | 151 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
826 | 151 | return Status::OK(); |
827 | 151 | } |
828 | | |
829 | | Status PgApiImpl::CreateIndexAddColumn(PgStatement *handle, const char *attr_name, int attr_num, |
830 | | const YBCPgTypeEntity *attr_type, |
831 | | bool is_hash, bool is_range, |
832 | 194 | bool is_desc, bool is_nulls_first) { |
833 | 194 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX)) { |
834 | | // Invalid handle. |
835 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
836 | 0 | } |
837 | | |
838 | 194 | return AddColumn(down_cast<PgCreateTable*>(handle), attr_name, attr_num, attr_type, |
839 | 194 | is_hash, is_range, is_desc, is_nulls_first); |
840 | 194 | } |
841 | | |
842 | 0 | Status PgApiImpl::CreateIndexSetNumTablets(PgStatement *handle, int32_t num_tablets) { |
843 | 0 | SCHECK(PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX), |
844 | 0 | InvalidArgument, |
845 | 0 | "Invalid statement handle"); |
846 | 0 | return down_cast<PgCreateTable*>(handle)->SetNumTablets(num_tablets); |
847 | 0 | } |
848 | | |
849 | 151 | Status PgApiImpl::ExecCreateIndex(PgStatement *handle) { |
850 | 151 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX)) { |
851 | | // Invalid handle. |
852 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
853 | 0 | } |
854 | 151 | return down_cast<PgCreateTable*>(handle)->Exec(); |
855 | 151 | } |
856 | | |
857 | | Status PgApiImpl::NewDropIndex(const PgObjectId& index_id, |
858 | | bool if_exist, |
859 | 141 | PgStatement **handle) { |
860 | 141 | auto stmt = std::make_unique<PgDropIndex>(pg_session_, index_id, if_exist); |
861 | 141 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
862 | 141 | return Status::OK(); |
863 | 141 | } |
864 | | |
865 | 1.17k | Status PgApiImpl::ExecPostponedDdlStmt(PgStatement *handle) { |
866 | 1.17k | if (!handle) { |
867 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
868 | 0 | } |
869 | | |
870 | 1.17k | switch (handle->stmt_op()) { |
871 | 1.03k | case StmtOp::STMT_DROP_TABLE: |
872 | 1.03k | return down_cast<PgDropTable*>(handle)->Exec(); |
873 | 141 | case StmtOp::STMT_DROP_INDEX: |
874 | 141 | return down_cast<PgDropIndex*>(handle)->Exec(); |
875 | 1 | case StmtOp::STMT_DROP_TABLEGROUP: |
876 | 1 | return down_cast<PgDropTablegroup*>(handle)->Exec(); |
877 | | |
878 | 0 | default: |
879 | 0 | break; |
880 | 0 | } |
881 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
882 | 0 | } |
883 | | |
884 | 89 | Status PgApiImpl::BackfillIndex(const PgObjectId& table_id) { |
885 | 89 | tserver::PgBackfillIndexRequestPB req; |
886 | 89 | table_id.ToPB(req.mutable_table_id()); |
887 | 89 | return pg_session_->pg_client().BackfillIndex( |
888 | 89 | &req, CoarseMonoClock::Now() + FLAGS_backfill_index_client_rpc_timeout_ms * 1ms); |
889 | 89 | } |
890 | | |
891 | | //-------------------------------------------------------------------------------------------------- |
892 | | // DML Statment Support. |
893 | | //-------------------------------------------------------------------------------------------------- |
894 | | |
895 | | // Binding ----------------------------------------------------------------------------------------- |
896 | | |
897 | 7.54M | Status PgApiImpl::DmlAppendTarget(PgStatement *handle, PgExpr *target) { |
898 | 7.54M | return down_cast<PgDml*>(handle)->AppendTarget(target); |
899 | 7.54M | } |
900 | | |
901 | 22 | Status PgApiImpl::DmlAppendQual(PgStatement *handle, PgExpr *qual) { |
902 | 22 | return down_cast<PgDml*>(handle)->AppendQual(qual); |
903 | 22 | } |
904 | | |
905 | 3.94k | Status PgApiImpl::DmlAppendColumnRef(PgStatement *handle, PgExpr *colref) { |
906 | 3.94k | return down_cast<PgDml*>(handle)->AppendColumnRef(colref); |
907 | 3.94k | } |
908 | | |
909 | 9.75M | Status PgApiImpl::DmlBindColumn(PgStatement *handle, int attr_num, PgExpr *attr_value) { |
910 | 9.75M | return down_cast<PgDml*>(handle)->BindColumn(attr_num, attr_value); |
911 | 9.75M | } |
912 | | |
913 | | Status PgApiImpl::DmlBindColumnCondBetween(PgStatement *handle, int attr_num, PgExpr *attr_value, |
914 | 134 | PgExpr *attr_value_end) { |
915 | 134 | return down_cast<PgDmlRead*>(handle)->BindColumnCondBetween(attr_num, attr_value, attr_value_end); |
916 | 134 | } |
917 | | |
918 | | Status PgApiImpl::DmlBindColumnCondIn(PgStatement *handle, int attr_num, int n_attr_values, |
919 | 45.5k | PgExpr **attr_values) { |
920 | 45.5k | return down_cast<PgDmlRead*>(handle)->BindColumnCondIn(attr_num, n_attr_values, attr_values); |
921 | 45.5k | } |
922 | | |
923 | | Status PgApiImpl::DmlBindHashCode(PgStatement *handle, bool start_valid, |
924 | | bool start_inclusive, |
925 | | uint64_t start_hash_val, bool end_valid, |
926 | 0 | bool end_inclusive, uint64_t end_hash_val) { |
927 | 0 | return down_cast<PgDmlRead*>(handle) |
928 | 0 | ->BindHashCode(start_valid, start_inclusive, start_hash_val, |
929 | 0 | end_valid, end_inclusive, end_hash_val); |
930 | 0 | } |
931 | | |
932 | 19 | Status PgApiImpl::DmlBindTable(PgStatement *handle) { |
933 | 19 | return down_cast<PgDml*>(handle)->BindTable(); |
934 | 19 | } |
935 | | |
936 | 1.16M | Result<YBCPgColumnInfo> PgApiImpl::DmlGetColumnInfo(YBCPgStatement handle, int attr_num) { |
937 | 1.16M | return down_cast<PgDml*>(handle)->GetColumnInfo(attr_num); |
938 | 1.16M | } |
939 | | |
940 | 259k | CHECKED_STATUS PgApiImpl::DmlAssignColumn(PgStatement *handle, int attr_num, PgExpr *attr_value) { |
941 | 259k | return down_cast<PgDml*>(handle)->AssignColumn(attr_num, attr_value); |
942 | 259k | } |
943 | | |
944 | | Status PgApiImpl::DmlFetch(PgStatement *handle, int32_t natts, uint64_t *values, bool *isnulls, |
945 | 20.5M | PgSysColumns *syscols, bool *has_data) { |
946 | 20.5M | return down_cast<PgDml*>(handle)->Fetch(natts, values, isnulls, syscols, has_data); |
947 | 20.5M | } |
948 | | |
949 | | Status PgApiImpl::ProcessYBTupleId(const YBCPgYBTupleIdDescriptor& descr, |
950 | 2.07M | const YBTupleIdProcessor& processor) { |
951 | 2.07M | auto target_desc = VERIFY_RESULT(pg_session_->LoadTable( |
952 | 2.07M | PgObjectId(descr.database_oid, descr.table_oid))); |
953 | 2.07M | SCHECK_EQ(descr.nattrs, target_desc->num_key_columns(), Corruption, |
954 | 2.07M | "Number of key components does not match column description"); |
955 | 2.07M | vector<PrimitiveValue> *values = nullptr; |
956 | 2.07M | PgsqlExpressionPB *expr_pb; |
957 | 2.07M | PgsqlExpressionPB temp_expr_pb; |
958 | 2.07M | google::protobuf::RepeatedPtrField<PgsqlExpressionPB> hashed_values; |
959 | 2.07M | vector<docdb::PrimitiveValue> hashed_components, range_components; |
960 | 2.07M | hashed_components.reserve(target_desc->num_hash_key_columns()); |
961 | 2.07M | range_components.reserve(target_desc->num_key_columns() - target_desc->num_hash_key_columns()); |
962 | 2.07M | size_t remain_attr = descr.nattrs; |
963 | | // DocDB API requires that partition columns must be listed in their created-order. |
964 | | // Order from target_desc should be used as attributes sequence may have different order. |
965 | 2.91M | for (size_t i : Range(target_desc->schema().columns().size())) { |
966 | 2.91M | PgColumn column(target_desc->schema(), i); |
967 | 4.15M | for (auto attr = descr.attrs, end = descr.attrs + descr.nattrs; attr != end; ++attr) { |
968 | 4.15M | if (attr->attr_num == column.attr_num()) { |
969 | 2.91M | if (!column.is_primary()) { |
970 | 0 | return STATUS_SUBSTITUTE( |
971 | 0 | InvalidArgument, "Attribute number $0 not a primary attribute", attr->attr_num); |
972 | 0 | } |
973 | 2.91M | if (column.is_partition()) { |
974 | | // Hashed component. |
975 | 1.66M | values = &hashed_components; |
976 | 1.66M | expr_pb = hashed_values.Add(); |
977 | 1.24M | } else { |
978 | | // Range component. |
979 | 1.24M | values = &range_components; |
980 | 1.24M | expr_pb = &temp_expr_pb; |
981 | 1.24M | } |
982 | | |
983 | 2.91M | if (attr->is_null) { |
984 | 117k | values->emplace_back(ValueType::kNullLow); |
985 | 2.79M | } else { |
986 | 2.79M | if (attr->attr_num == to_underlying(PgSystemAttrNum::kYBRowId)) { |
987 | 483k | expr_pb->mutable_value()->set_binary_value(pg_session_->GenerateNewRowid()); |
988 | 2.30M | } else { |
989 | 2.30M | const YBCPgCollationInfo& collation_info = attr->collation_info; |
990 | 2.30M | PgConstant value( |
991 | 2.30M | attr->type_entity, collation_info.collate_is_valid_non_c, |
992 | 2.30M | collation_info.sortkey, attr->datum, false); |
993 | 2.30M | SCHECK_EQ(column.internal_type(), value.internal_type(), Corruption, |
994 | 2.30M | "Attribute value type does not match column type"); |
995 | 2.30M | RETURN_NOT_OK(value.Eval(expr_pb->mutable_value())); |
996 | 2.30M | } |
997 | 2.79M | values->push_back(PrimitiveValue::FromQLValuePB(expr_pb->value(), |
998 | 2.79M | column.desc().sorting_type())); |
999 | 2.79M | } |
1000 | | |
1001 | 2.91M | if (--remain_attr == 0) { |
1002 | 2.07M | SCHECK_EQ(hashed_components.size(), target_desc->num_hash_key_columns(), Corruption, |
1003 | 2.07M | "Number of hashed components does not match column description"); |
1004 | 2.07M | SCHECK_EQ(range_components.size(), |
1005 | 2.07M | target_desc->num_key_columns() - target_desc->num_hash_key_columns(), |
1006 | 2.07M | Corruption, "Number of range components does not match column description"); |
1007 | 2.07M | if (hashed_values.empty()) { |
1008 | 498k | return processor(docdb::DocKey(move(range_components)).Encode()); |
1009 | 498k | } |
1010 | 1.57M | string partition_key; |
1011 | 1.57M | const PartitionSchema& partition_schema = target_desc->partition_schema(); |
1012 | 1.57M | RETURN_NOT_OK(partition_schema.EncodeKey(hashed_values, &partition_key)); |
1013 | 1.57M | const uint16_t hash = PartitionSchema::DecodeMultiColumnHashValue(partition_key); |
1014 | | |
1015 | 1.57M | return processor( |
1016 | 1.57M | docdb::DocKey(hash, move(hashed_components), move(range_components)).Encode()); |
1017 | 839k | } |
1018 | 839k | break; |
1019 | 839k | } |
1020 | 4.15M | } |
1021 | 2.91M | } |
1022 | | |
1023 | 56 | return STATUS_FORMAT(Corruption, "Not all attributes ($0) were resolved", remain_attr); |
1024 | 2.07M | } |
1025 | | |
1026 | 201k | Status PgApiImpl::StartOperationsBuffering() { |
1027 | 201k | return pg_session_->StartOperationsBuffering(); |
1028 | 201k | } |
1029 | | |
1030 | 174k | Status PgApiImpl::StopOperationsBuffering() { |
1031 | 174k | return pg_session_->StopOperationsBuffering(); |
1032 | 174k | } |
1033 | | |
1034 | 48.0k | void PgApiImpl::ResetOperationsBuffering() { |
1035 | 48.0k | pg_session_->ResetOperationsBuffering(); |
1036 | 48.0k | } |
1037 | | |
1038 | 185k | Status PgApiImpl::FlushBufferedOperations() { |
1039 | 185k | return pg_session_->FlushBufferedOperations(); |
1040 | 185k | } |
1041 | | |
1042 | 2.12M | Status PgApiImpl::DmlExecWriteOp(PgStatement *handle, int32_t *rows_affected_count) { |
1043 | 2.12M | switch (handle->stmt_op()) { |
1044 | 1.84M | case StmtOp::STMT_INSERT: |
1045 | 2.01M | case StmtOp::STMT_UPDATE: |
1046 | 2.12M | case StmtOp::STMT_DELETE: |
1047 | 2.12M | case StmtOp::STMT_TRUNCATE: |
1048 | 2.12M | { |
1049 | 2.12M | auto dml_write = down_cast<PgDmlWrite *>(handle); |
1050 | 2.12M | RETURN_NOT_OK(dml_write->Exec(rows_affected_count != nullptr /* force_non_bufferable */)); |
1051 | 2.12M | if (rows_affected_count) { |
1052 | 4.08k | *rows_affected_count = dml_write->GetRowsAffectedCount(); |
1053 | 4.08k | } |
1054 | 2.12M | return Status::OK(); |
1055 | 2.12M | } |
1056 | 0 | default: |
1057 | 0 | break; |
1058 | 0 | } |
1059 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1060 | 0 | } |
1061 | | |
1062 | | // Insert ------------------------------------------------------------------------------------------ |
1063 | | |
1064 | | Status PgApiImpl::NewInsert(const PgObjectId& table_id, |
1065 | | const bool is_single_row_txn, |
1066 | 1.84M | PgStatement **handle) { |
1067 | 1.84M | *handle = nullptr; |
1068 | 1.84M | auto stmt = std::make_unique<PgInsert>(pg_session_, table_id, is_single_row_txn); |
1069 | 1.84M | RETURN_NOT_OK(stmt->Prepare()); |
1070 | 1.84M | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
1071 | 1.84M | return Status::OK(); |
1072 | 1.84M | } |
1073 | | |
1074 | 0 | Status PgApiImpl::ExecInsert(PgStatement *handle) { |
1075 | 0 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) { |
1076 | | // Invalid handle. |
1077 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1078 | 0 | } |
1079 | 0 | return down_cast<PgInsert*>(handle)->Exec(); |
1080 | 0 | } |
1081 | | |
1082 | 228k | Status PgApiImpl::InsertStmtSetUpsertMode(PgStatement *handle) { |
1083 | 228k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) { |
1084 | | // Invalid handle. |
1085 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1086 | 0 | } |
1087 | 228k | down_cast<PgInsert*>(handle)->SetUpsertMode(); |
1088 | | |
1089 | 228k | return Status::OK(); |
1090 | 228k | } |
1091 | | |
1092 | 103k | Status PgApiImpl::InsertStmtSetWriteTime(PgStatement *handle, const HybridTime write_time) { |
1093 | 103k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) { |
1094 | | // Invalid handle. |
1095 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1096 | 0 | } |
1097 | 103k | RETURN_NOT_OK(down_cast<PgInsert*>(handle)->SetWriteTime(write_time)); |
1098 | 103k | return Status::OK(); |
1099 | 103k | } |
1100 | | |
1101 | 103k | Status PgApiImpl::InsertStmtSetIsBackfill(PgStatement *handle, const bool is_backfill) { |
1102 | 103k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) { |
1103 | | // Invalid handle. |
1104 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1105 | 0 | } |
1106 | 103k | down_cast<PgInsert*>(handle)->SetIsBackfill(is_backfill); |
1107 | 103k | return Status::OK(); |
1108 | 103k | } |
1109 | | |
1110 | | // Update ------------------------------------------------------------------------------------------ |
1111 | | |
1112 | | Status PgApiImpl::NewUpdate(const PgObjectId& table_id, |
1113 | | const bool is_single_row_txn, |
1114 | 166k | PgStatement **handle) { |
1115 | 166k | *handle = nullptr; |
1116 | 166k | auto stmt = std::make_unique<PgUpdate>(pg_session_, table_id, is_single_row_txn); |
1117 | 166k | RETURN_NOT_OK(stmt->Prepare()); |
1118 | 166k | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
1119 | 166k | return Status::OK(); |
1120 | 166k | } |
1121 | | |
1122 | 0 | Status PgApiImpl::ExecUpdate(PgStatement *handle) { |
1123 | 0 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_UPDATE)) { |
1124 | | // Invalid handle. |
1125 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1126 | 0 | } |
1127 | 0 | return down_cast<PgUpdate*>(handle)->Exec(); |
1128 | 0 | } |
1129 | | |
1130 | | // Delete ------------------------------------------------------------------------------------------ |
1131 | | |
1132 | | Status PgApiImpl::NewDelete(const PgObjectId& table_id, |
1133 | | const bool is_single_row_txn, |
1134 | 116k | PgStatement **handle) { |
1135 | 116k | *handle = nullptr; |
1136 | 116k | auto stmt = std::make_unique<PgDelete>(pg_session_, table_id, is_single_row_txn); |
1137 | 116k | RETURN_NOT_OK(stmt->Prepare()); |
1138 | 116k | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
1139 | 116k | return Status::OK(); |
1140 | 116k | } |
1141 | | |
1142 | 0 | Status PgApiImpl::ExecDelete(PgStatement *handle) { |
1143 | 0 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DELETE)) { |
1144 | | // Invalid handle. |
1145 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1146 | 0 | } |
1147 | 0 | return down_cast<PgDelete*>(handle)->Exec(); |
1148 | 0 | } |
1149 | | |
1150 | 79 | Status PgApiImpl::NewSample(const PgObjectId& table_id, const int targrows, PgStatement **handle) { |
1151 | 79 | *handle = nullptr; |
1152 | 79 | auto sample = std::make_unique<PgSample>(pg_session_, targrows, table_id); |
1153 | 79 | RETURN_NOT_OK(sample->Prepare()); |
1154 | 79 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(sample), handle)); |
1155 | 79 | return Status::OK(); |
1156 | 79 | } |
1157 | | |
1158 | 79 | Status PgApiImpl::InitRandomState(PgStatement *handle, double rstate_w, uint64 rand_state) { |
1159 | 79 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) { |
1160 | | // Invalid handle. |
1161 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1162 | 0 | } |
1163 | 79 | RETURN_NOT_OK(down_cast<PgSample*>(handle)->InitRandomState(rstate_w, rand_state)); |
1164 | 79 | return Status::OK(); |
1165 | 79 | } |
1166 | | |
1167 | 270 | Status PgApiImpl::SampleNextBlock(PgStatement *handle, bool *has_more) { |
1168 | 270 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) { |
1169 | | // Invalid handle. |
1170 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1171 | 0 | } |
1172 | 270 | RETURN_NOT_OK(down_cast<PgSample*>(handle)->SampleNextBlock(has_more)); |
1173 | 270 | return Status::OK(); |
1174 | 270 | } |
1175 | | |
1176 | 79 | Status PgApiImpl::ExecSample(PgStatement *handle) { |
1177 | 79 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) { |
1178 | | // Invalid handle. |
1179 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1180 | 0 | } |
1181 | 79 | RETURN_NOT_OK(down_cast<PgSample*>(handle)->Exec(nullptr)); |
1182 | 79 | return Status::OK(); |
1183 | 79 | } |
1184 | | |
1185 | 79 | Status PgApiImpl::GetEstimatedRowCount(PgStatement *handle, double *liverows, double *deadrows) { |
1186 | 79 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) { |
1187 | | // Invalid handle. |
1188 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1189 | 0 | } |
1190 | 79 | RETURN_NOT_OK(down_cast<PgSample*>(handle)->GetEstimatedRowCount(liverows, deadrows)); |
1191 | 79 | return Status::OK(); |
1192 | 79 | } |
1193 | | |
1194 | 0 | Status PgApiImpl::DeleteStmtSetIsPersistNeeded(PgStatement *handle, const bool is_persist_needed) { |
1195 | 0 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DELETE)) { |
1196 | | // Invalid handle. |
1197 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1198 | 0 | } |
1199 | 0 | down_cast<PgDelete*>(handle)->SetIsPersistNeeded(is_persist_needed); |
1200 | 0 | return Status::OK(); |
1201 | 0 | } |
1202 | | |
1203 | | // Colocated Truncate ------------------------------------------------------------------------------ |
1204 | | |
1205 | | Status PgApiImpl::NewTruncateColocated(const PgObjectId& table_id, |
1206 | | const bool is_single_row_txn, |
1207 | 19 | PgStatement **handle) { |
1208 | 19 | *handle = nullptr; |
1209 | 19 | auto stmt = std::make_unique<PgTruncateColocated>(pg_session_, table_id, is_single_row_txn); |
1210 | 19 | RETURN_NOT_OK(stmt->Prepare()); |
1211 | 19 | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
1212 | 19 | return Status::OK(); |
1213 | 19 | } |
1214 | | |
1215 | 0 | Status PgApiImpl::ExecTruncateColocated(PgStatement *handle) { |
1216 | 0 | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_TRUNCATE)) { |
1217 | | // Invalid handle. |
1218 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1219 | 0 | } |
1220 | 0 | return down_cast<PgTruncateColocated*>(handle)->Exec(); |
1221 | 0 | } |
1222 | | |
1223 | | // Select ------------------------------------------------------------------------------------------ |
1224 | | |
1225 | | Status PgApiImpl::NewSelect(const PgObjectId& table_id, |
1226 | | const PgObjectId& index_id, |
1227 | | const PgPrepareParameters *prepare_params, |
1228 | 561k | PgStatement **handle) { |
1229 | | // Scenarios: |
1230 | | // - Sequential Scan: PgSelect to read from table_id. |
1231 | | // - Primary Scan: PgSelect from table_id. YugaByte does not have separate table for primary key. |
1232 | | // - Index-Only-Scan: PgSelectIndex directly from secondary index_id. |
1233 | | // - IndexScan: Use PgSelectIndex to read from index_id and then PgSelect to read from table_id. |
1234 | | // Note that for SysTable, only one request is send for both table_id and index_id. |
1235 | 561k | *handle = nullptr; |
1236 | 561k | std::unique_ptr<PgDmlRead> stmt; |
1237 | 561k | if (prepare_params && prepare_params->index_only_scan && prepare_params->use_secondary_index) { |
1238 | 44.0k | if (!index_id.IsValid()) { |
1239 | 0 | return STATUS(InvalidArgument, "Cannot run query with invalid index ID"); |
1240 | 0 | } |
1241 | 44.0k | stmt = std::make_unique<PgSelectIndex>(pg_session_, table_id, index_id, prepare_params); |
1242 | 517k | } else { |
1243 | | // For IndexScan PgSelect processing will create subquery PgSelectIndex. |
1244 | 517k | stmt = std::make_unique<PgSelect>(pg_session_, table_id, index_id, prepare_params); |
1245 | 517k | } |
1246 | | |
1247 | 561k | RETURN_NOT_OK(stmt->Prepare()); |
1248 | 561k | RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle)); |
1249 | 561k | return Status::OK(); |
1250 | 561k | } |
1251 | | |
1252 | 526k | Status PgApiImpl::SetForwardScan(PgStatement *handle, bool is_forward_scan) { |
1253 | 526k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SELECT)) { |
1254 | | // Invalid handle. |
1255 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1256 | 0 | } |
1257 | 526k | down_cast<PgDmlRead*>(handle)->SetForwardScan(is_forward_scan); |
1258 | 526k | return Status::OK(); |
1259 | 526k | } |
1260 | | |
1261 | 560k | Status PgApiImpl::ExecSelect(PgStatement *handle, const PgExecParameters *exec_params) { |
1262 | 560k | if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SELECT)) { |
1263 | | // Invalid handle. |
1264 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1265 | 0 | } |
1266 | 560k | return down_cast<PgDmlRead*>(handle)->Exec(exec_params); |
1267 | 560k | } |
1268 | | |
1269 | | //-------------------------------------------------------------------------------------------------- |
1270 | | // Expressions. |
1271 | | //-------------------------------------------------------------------------------------------------- |
1272 | | |
1273 | | // Column references ------------------------------------------------------------------------------- |
1274 | | |
1275 | | Status PgApiImpl::NewColumnRef( |
1276 | | PgStatement *stmt, int attr_num, const PgTypeEntity *type_entity, bool collate_is_valid_non_c, |
1277 | 7.55M | const PgTypeAttrs *type_attrs, PgExpr **expr_handle) { |
1278 | 7.55M | if (!stmt) { |
1279 | | // Invalid handle. |
1280 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1281 | 0 | } |
1282 | 7.55M | PgColumnRef::SharedPtr colref = |
1283 | 7.55M | make_shared<PgColumnRef>(attr_num, type_entity, collate_is_valid_non_c, type_attrs); |
1284 | 7.55M | stmt->AddExpr(colref); |
1285 | | |
1286 | 7.55M | *expr_handle = colref.get(); |
1287 | 7.55M | return Status::OK(); |
1288 | 7.55M | } |
1289 | | |
1290 | | // Constant ---------------------------------------------------------------------------------------- |
1291 | | Status PgApiImpl::NewConstant( |
1292 | | YBCPgStatement stmt, const YBCPgTypeEntity *type_entity, bool collate_is_valid_non_c, |
1293 | 11.3M | const char *collation_sortkey, uint64_t datum, bool is_null, YBCPgExpr *expr_handle) { |
1294 | 11.3M | if (!stmt) { |
1295 | | // Invalid handle. |
1296 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1297 | 0 | } |
1298 | 11.3M | PgExpr::SharedPtr pg_const = |
1299 | 11.3M | make_shared<PgConstant>(type_entity, collate_is_valid_non_c, collation_sortkey, |
1300 | 11.3M | datum, is_null); |
1301 | 11.3M | stmt->AddExpr(pg_const); |
1302 | | |
1303 | 11.3M | *expr_handle = pg_const.get(); |
1304 | 11.3M | return Status::OK(); |
1305 | 11.3M | } |
1306 | | |
1307 | | Status PgApiImpl::NewConstantVirtual( |
1308 | | YBCPgStatement stmt, const YBCPgTypeEntity *type_entity, |
1309 | 8 | YBCPgDatumKind datum_kind, YBCPgExpr *expr_handle) { |
1310 | 8 | if (!stmt) { |
1311 | | // Invalid handle. |
1312 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1313 | 0 | } |
1314 | 8 | PgExpr::SharedPtr pg_const = |
1315 | 8 | make_shared<PgConstant>(type_entity, false /* collate_is_valid_non_c */, datum_kind); |
1316 | 8 | stmt->AddExpr(pg_const); |
1317 | | |
1318 | 8 | *expr_handle = pg_const.get(); |
1319 | 8 | return Status::OK(); |
1320 | 8 | } |
1321 | | |
1322 | | Status PgApiImpl::NewConstantOp( |
1323 | | YBCPgStatement stmt, const YBCPgTypeEntity *type_entity, bool collate_is_valid_non_c, |
1324 | | const char *collation_sortkey, uint64_t datum, bool is_null, YBCPgExpr *expr_handle, |
1325 | 0 | bool is_gt) { |
1326 | 0 | if (!stmt) { |
1327 | | // Invalid handle. |
1328 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1329 | 0 | } |
1330 | 0 | PgExpr::SharedPtr pg_const = |
1331 | 0 | make_shared<PgConstant>(type_entity, collate_is_valid_non_c, collation_sortkey, |
1332 | 0 | datum, is_null, is_gt ? PgExpr::Opcode::PG_EXPR_GT : PgExpr::Opcode::PG_EXPR_LT); |
1333 | 0 | stmt->AddExpr(pg_const); |
1334 | |
|
1335 | 0 | *expr_handle = pg_const.get(); |
1336 | 0 | return Status::OK(); |
1337 | 0 | } |
1338 | | |
1339 | | // Text constant ----------------------------------------------------------------------------------- |
1340 | | |
1341 | 0 | Status PgApiImpl::UpdateConstant(PgExpr *expr, const char *value, bool is_null) { |
1342 | 0 | if (expr->opcode() != PgExpr::Opcode::PG_EXPR_CONSTANT) { |
1343 | | // Invalid handle. |
1344 | 0 | return STATUS(InvalidArgument, "Invalid expression handle for constant"); |
1345 | 0 | } |
1346 | 0 | down_cast<PgConstant*>(expr)->UpdateConstant(value, is_null); |
1347 | 0 | return Status::OK(); |
1348 | 0 | } |
1349 | | |
1350 | 0 | Status PgApiImpl::UpdateConstant(PgExpr *expr, const void *value, int64_t bytes, bool is_null) { |
1351 | 0 | if (expr->opcode() != PgExpr::Opcode::PG_EXPR_CONSTANT) { |
1352 | | // Invalid handle. |
1353 | 0 | return STATUS(InvalidArgument, "Invalid expression handle for constant"); |
1354 | 0 | } |
1355 | 0 | down_cast<PgConstant*>(expr)->UpdateConstant(value, bytes, is_null); |
1356 | 0 | return Status::OK(); |
1357 | 0 | } |
1358 | | |
1359 | | // Text constant ----------------------------------------------------------------------------------- |
1360 | | |
1361 | | Status PgApiImpl::NewOperator( |
1362 | | PgStatement *stmt, const char *opname, const YBCPgTypeEntity *type_entity, |
1363 | 5.98k | bool collate_is_valid_non_c, PgExpr **op_handle) { |
1364 | 5.98k | if (!stmt) { |
1365 | | // Invalid handle. |
1366 | 0 | return STATUS(InvalidArgument, "Invalid statement handle"); |
1367 | 0 | } |
1368 | 5.98k | RETURN_NOT_OK(PgExpr::CheckOperatorName(opname)); |
1369 | | |
1370 | | // Create operator. |
1371 | 5.98k | PgExpr::SharedPtr pg_op = make_shared<PgOperator>(opname, type_entity, collate_is_valid_non_c); |
1372 | 5.98k | stmt->AddExpr(pg_op); |
1373 | | |
1374 | 5.98k | *op_handle = pg_op.get(); |
1375 | 5.98k | return Status::OK(); |
1376 | 5.98k | } |
1377 | | |
1378 | 5.98k | Status PgApiImpl::OperatorAppendArg(PgExpr *op_handle, PgExpr *arg) { |
1379 | 5.98k | if (!op_handle || !arg) { |
1380 | | // Invalid handle. |
1381 | 0 | return STATUS(InvalidArgument, "Invalid expression handle"); |
1382 | 0 | } |
1383 | 5.98k | down_cast<PgOperator*>(op_handle)->AppendArg(arg); |
1384 | 5.98k | return Status::OK(); |
1385 | 5.98k | } |
1386 | | |
1387 | 0 | Result<bool> PgApiImpl::IsInitDbDone() { |
1388 | 0 | return pg_session_->IsInitDbDone(); |
1389 | 0 | } |
1390 | | |
1391 | 158k | Result<uint64_t> PgApiImpl::GetSharedCatalogVersion() { |
1392 | 158k | return pg_session_->GetSharedCatalogVersion(); |
1393 | 158k | } |
1394 | | |
1395 | 235 | Result<uint64_t> PgApiImpl::GetSharedAuthKey() { |
1396 | 235 | return pg_session_->GetSharedAuthKey(); |
1397 | 235 | } |
1398 | | |
1399 | | // Transaction Control ----------------------------------------------------------------------------- |
1400 | 160k | Status PgApiImpl::BeginTransaction() { |
1401 | 160k | pg_session_->InvalidateForeignKeyReferenceCache(); |
1402 | 160k | return pg_txn_manager_->BeginTransaction(); |
1403 | 160k | } |
1404 | | |
1405 | 20.2k | Status PgApiImpl::RecreateTransaction() { |
1406 | 20.2k | pg_session_->InvalidateForeignKeyReferenceCache(); |
1407 | 20.2k | pg_session_->DropBufferedOperations(); |
1408 | 20.2k | return pg_txn_manager_->RecreateTransaction(); |
1409 | 20.2k | } |
1410 | | |
1411 | 0 | Status PgApiImpl::RestartTransaction() { |
1412 | 0 | pg_session_->InvalidateForeignKeyReferenceCache(); |
1413 | 0 | pg_session_->DropBufferedOperations(); |
1414 | 0 | return pg_txn_manager_->RestartTransaction(); |
1415 | 0 | } |
1416 | | |
1417 | 24.3k | Status PgApiImpl::ResetTransactionReadPoint() { |
1418 | 24.3k | return pg_txn_manager_->ResetTransactionReadPoint(); |
1419 | 24.3k | } |
1420 | | |
1421 | 1 | Status PgApiImpl::RestartReadPoint() { |
1422 | 1 | return pg_txn_manager_->RestartReadPoint(); |
1423 | 1 | } |
1424 | | |
1425 | 155k | Status PgApiImpl::CommitTransaction() { |
1426 | 155k | pg_session_->InvalidateForeignKeyReferenceCache(); |
1427 | 155k | RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); |
1428 | 155k | return pg_txn_manager_->CommitTransaction(); |
1429 | 155k | } |
1430 | | |
1431 | 11.6k | Status PgApiImpl::AbortTransaction() { |
1432 | 11.6k | pg_session_->InvalidateForeignKeyReferenceCache(); |
1433 | 11.6k | pg_session_->DropBufferedOperations(); |
1434 | 11.6k | return pg_txn_manager_->AbortTransaction(); |
1435 | 11.6k | } |
1436 | | |
1437 | 161k | Status PgApiImpl::SetTransactionIsolationLevel(int isolation) { |
1438 | 161k | return pg_txn_manager_->SetPgIsolationLevel(isolation); |
1439 | 161k | } |
1440 | | |
1441 | 160k | Status PgApiImpl::SetTransactionReadOnly(bool read_only) { |
1442 | 160k | return pg_txn_manager_->SetReadOnly(read_only); |
1443 | 160k | } |
1444 | | |
1445 | 160k | Status PgApiImpl::EnableFollowerReads(bool enable_follower_reads, int32_t staleness_ms) { |
1446 | 160k | return pg_txn_manager_->EnableFollowerReads(enable_follower_reads, staleness_ms); |
1447 | 160k | } |
1448 | | |
1449 | 160k | Status PgApiImpl::SetTransactionDeferrable(bool deferrable) { |
1450 | 160k | return pg_txn_manager_->SetDeferrable(deferrable); |
1451 | 160k | } |
1452 | | |
1453 | 6.33k | Status PgApiImpl::EnterSeparateDdlTxnMode() { |
1454 | | // Flush all buffered operations as ddl txn use its own transaction session. |
1455 | 6.33k | RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); |
1456 | 6.33k | return pg_txn_manager_->EnterSeparateDdlTxnMode(); |
1457 | 6.33k | } |
1458 | | |
1459 | 5.94k | Status PgApiImpl::ExitSeparateDdlTxnMode() { |
1460 | | // Flush all buffered operations as ddl txn use its own transaction session. |
1461 | 5.94k | RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); |
1462 | 5.94k | RETURN_NOT_OK(pg_txn_manager_->ExitSeparateDdlTxnMode(Commit::kTrue)); |
1463 | | // Next reads from catalog tables have to see changes made by the DDL transaction. |
1464 | 5.94k | ResetCatalogReadTime(); |
1465 | 5.94k | return Status::OK(); |
1466 | 5.94k | } |
1467 | | |
1468 | 421 | void PgApiImpl::ClearSeparateDdlTxnMode() { |
1469 | 421 | pg_session_->DropBufferedOperations(); |
1470 | 421 | CHECK_OK(pg_txn_manager_->ExitSeparateDdlTxnMode(Commit::kFalse)); |
1471 | 421 | } |
1472 | | |
1473 | 48.8k | Status PgApiImpl::SetActiveSubTransaction(SubTransactionId id) { |
1474 | 48.8k | RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); |
1475 | 48.8k | return pg_session_->SetActiveSubTransaction(id); |
1476 | 48.8k | } |
1477 | | |
1478 | 23.5k | Status PgApiImpl::RollbackSubTransaction(SubTransactionId id) { |
1479 | 23.5k | pg_session_->DropBufferedOperations(); |
1480 | 23.5k | return pg_session_->RollbackSubTransaction(id); |
1481 | 23.5k | } |
1482 | | |
1483 | 765k | void PgApiImpl::ResetCatalogReadTime() { |
1484 | 765k | pg_session_->ResetCatalogReadPoint(); |
1485 | 765k | } |
1486 | | |
1487 | | Result<bool> PgApiImpl::ForeignKeyReferenceExists( |
1488 | 232k | PgOid table_id, const Slice& ybctid, PgOid database_id) { |
1489 | 232k | return pg_session_->ForeignKeyReferenceExists( |
1490 | 232k | table_id, ybctid, std::bind(FetchExistingYbctids, |
1491 | 232k | pg_session_, |
1492 | 232k | database_id, |
1493 | 232k | std::placeholders::_1, |
1494 | 232k | std::placeholders::_2)); |
1495 | 232k | } |
1496 | | |
1497 | 240k | void PgApiImpl::AddForeignKeyReferenceIntent(PgOid table_id, const Slice& ybctid) { |
1498 | 240k | pg_session_->AddForeignKeyReferenceIntent(table_id, ybctid); |
1499 | 240k | } |
1500 | | |
1501 | 62.9k | void PgApiImpl::DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid) { |
1502 | 62.9k | pg_session_->DeleteForeignKeyReference(table_id, ybctid); |
1503 | 62.9k | } |
1504 | | |
1505 | 1.51M | void PgApiImpl::AddForeignKeyReference(PgOid table_id, const Slice& ybctid) { |
1506 | 1.51M | pg_session_->AddForeignKeyReference(table_id, ybctid); |
1507 | 1.51M | } |
1508 | | |
1509 | 0 | void PgApiImpl::SetTimeout(const int timeout_ms) { |
1510 | 0 | pg_session_->SetTimeout(timeout_ms); |
1511 | 0 | } |
1512 | | |
1513 | 2 | Result<client::TabletServersInfo> PgApiImpl::ListTabletServers() { |
1514 | 2 | return pg_session_->ListTabletServers(); |
1515 | 2 | } |
1516 | | |
1517 | 0 | Status PgApiImpl::ValidatePlacement(const char *placement_info) { |
1518 | 0 | return pg_session_->ValidatePlacement(placement_info); |
1519 | 0 | } |
1520 | | |
1521 | | } // namespace pggate |
1522 | | } // namespace yb |