YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pggate.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//--------------------------------------------------------------------------------------------------
14
15
#include "yb/yql/pggate/pggate.h"
16
17
#include <boost/optional.hpp>
18
19
#include "yb/client/client_fwd.h"
20
#include "yb/client/client.h"
21
#include "yb/client/client_utils.h"
22
#include "yb/client/tablet_server.h"
23
24
#include "yb/common/partition.h"
25
#include "yb/common/pg_system_attr.h"
26
#include "yb/common/schema.h"
27
28
#include "yb/docdb/doc_key.h"
29
#include "yb/docdb/primitive_value.h"
30
#include "yb/docdb/value_type.h"
31
32
#include "yb/gutil/casts.h"
33
34
#include "yb/rpc/messenger.h"
35
#include "yb/rpc/proxy.h"
36
#include "yb/rpc/secure_stream.h"
37
38
#include "yb/server/secure.h"
39
40
#include "yb/tserver/tserver_forward_service.proxy.h"
41
#include "yb/tserver/tserver_shared_mem.h"
42
43
#include "yb/util/format.h"
44
#include "yb/util/range.h"
45
#include "yb/util/shared_mem.h"
46
#include "yb/util/status_format.h"
47
#include "yb/util/status_log.h"
48
49
#include "yb/yql/pggate/pg_ddl.h"
50
#include "yb/yql/pggate/pg_delete.h"
51
#include "yb/yql/pggate/pg_insert.h"
52
#include "yb/yql/pggate/pg_memctx.h"
53
#include "yb/yql/pggate/pg_sample.h"
54
#include "yb/yql/pggate/pg_select.h"
55
#include "yb/yql/pggate/pg_truncate_colocated.h"
56
#include "yb/yql/pggate/pg_txn_manager.h"
57
#include "yb/yql/pggate/pg_update.h"
58
#include "yb/yql/pggate/pggate_flags.h"
59
#include "yb/yql/pggate/ybc_pggate.h"
60
61
using namespace std::literals;
62
63
DECLARE_string(rpc_bind_addresses);
64
DECLARE_bool(use_node_to_node_encryption);
65
DECLARE_string(certs_dir);
66
DECLARE_bool(node_to_node_encryption_use_client_certificates);
67
DECLARE_bool(ysql_forward_rpcs_to_local_tserver);
68
DECLARE_bool(use_node_hostname_for_local_tserver);
69
DECLARE_int32(backfill_index_client_rpc_timeout_ms);
70
71
namespace yb {
72
namespace pggate {
73
74
using docdb::PrimitiveValue;
75
using docdb::ValueType;
76
77
namespace {
78
79
CHECKED_STATUS AddColumn(PgCreateTable* pg_stmt, const char *attr_name, int attr_num,
80
                         const YBCPgTypeEntity *attr_type, bool is_hash, bool is_range,
81
3.04k
                         bool is_desc, bool is_nulls_first) {
82
3.04k
  using SortingType = SortingType;
83
3.04k
  SortingType sorting_type = SortingType::kNotSpecified;
84
85
3.04k
  if (!is_hash && is_range) {
86
227
    if (is_desc) {
87
16
      sorting_type = is_nulls_first ? SortingType::kDescending : SortingType::kDescendingNullsLast;
88
209
    } else {
89
209
      sorting_type = is_nulls_first ? SortingType::kAscending : SortingType::kAscendingNullsLast;
90
209
    }
91
227
  }
92
93
3.04k
  return pg_stmt->AddColumn(attr_name, attr_num, attr_type, is_hash, is_range, sorting_type);
94
3.04k
}
95
96
Result<PgApiContext::MessengerHolder> BuildMessenger(
97
    const string& client_name,
98
    int32_t num_reactors,
99
    const scoped_refptr<MetricEntity>& metric_entity,
100
1.65k
    const std::shared_ptr<MemTracker>& parent_mem_tracker) {
101
1.65k
  std::unique_ptr<rpc::SecureContext> secure_context;
102
1.65k
  if (FLAGS_use_node_to_node_encryption) {
103
0
    secure_context = VERIFY_RESULT(server::CreateSecureContext(
104
0
        FLAGS_certs_dir,
105
0
        server::UseClientCerts(FLAGS_node_to_node_encryption_use_client_certificates)));
106
0
  }
107
1.65k
  auto messenger = VERIFY_RESULT(client::CreateClientMessenger(
108
1.65k
      client_name, num_reactors, metric_entity, parent_mem_tracker, secure_context.get()));
109
1.65k
  return PgApiContext::MessengerHolder{std::move(secure_context), std::move(messenger)};
110
1.65k
}
111
112
1.65k
std::unique_ptr<tserver::TServerSharedObject> InitTServerSharedObject() {
113
1.65k
  LOG(INFO) << __func__ << ": " << YBCIsInitDbModeEnvVarSet() << ", "
114
1.65k
            << FLAGS_TEST_pggate_ignore_tserver_shm << ", " << FLAGS_pggate_tserver_shm_fd;
115
  // Do not use shared memory in initdb or if explicity set to be ignored.
116
1.65k
  if (FLAGS_TEST_pggate_ignore_tserver_shm || FLAGS_pggate_tserver_shm_fd == -1) {
117
0
    return nullptr;
118
0
  }
119
1.65k
  return std::make_unique<tserver::TServerSharedObject>(CHECK_RESULT(
120
1.65k
      tserver::TServerSharedObject::OpenReadOnly(FLAGS_pggate_tserver_shm_fd)));
121
1.65k
}
122
123
Result<std::vector<std::string>> FetchExistingYbctids(PgSession::ScopedRefPtr session,
124
                                                      PgOid database_id,
125
                                                      PgOid table_id,
126
525
                                                      const std::vector<Slice>& ybctids) {
127
525
  auto desc  = VERIFY_RESULT(session->LoadTable(PgObjectId(database_id, table_id)));
128
525
  PgTable target(desc);
129
525
  auto read_op = std::make_shared<PgsqlReadOp>(*target);
130
525
  PgsqlExpressionPB* expr_pb = read_op->read_request().add_targets();
131
525
  expr_pb->set_column_id(to_underlying(PgSystemAttrNum::kYBTupleId));
132
525
  auto doc_op = std::make_shared<PgDocReadOp>(session, &target, std::move(read_op));
133
134
  // Postgres uses SELECT FOR KEY SHARE query for FK check.
135
  // Use same lock level.
136
525
  PgExecParameters exec_params = doc_op->ExecParameters();
137
525
  exec_params.rowmark = ROW_MARK_KEYSHARE;
138
525
  RETURN_NOT_OK(doc_op->ExecuteInit(&exec_params));
139
525
  RETURN_NOT_OK(doc_op->PopulateDmlByYbctidOps(ybctids));
140
525
  RETURN_NOT_OK(doc_op->Execute());
141
525
  std::vector<std::string> result;
142
525
  result.reserve(ybctids.size());
143
525
  std::list<PgDocResult> rowsets;
144
1.04k
  do {
145
1.04k
    rowsets.clear();
146
1.04k
    RETURN_NOT_OK(doc_op->GetResult(&rowsets));
147
1.03k
    for (auto& row : rowsets) {
148
767
      RETURN_NOT_OK(row.ProcessSystemColumns());
149
100k
      for (const auto& ybctid : row.ybctids()) {
150
100k
        result.push_back(ybctid.ToBuffer());
151
100k
      }
152
767
    }
153
1.03k
  } while (!rowsets.empty());
154
518
  return result;
155
525
}
156
157
} // namespace
158
159
using std::make_shared;
160
using client::YBSession;
161
162
//--------------------------------------------------------------------------------------------------
163
164
1.65k
PggateOptions::PggateOptions() : ServerBaseOptions(kDefaultPort) {
165
1.65k
  server_type = "tserver";
166
1.65k
  rpc_opts.connection_keepalive_time_ms = FLAGS_pgsql_rpc_keepalive_time_ms;
167
168
1.65k
  if (FLAGS_pggate_proxy_bind_address.empty()) {
169
1.65k
    HostPort host_port;
170
1.65k
    CHECK_OK(host_port.ParseString(FLAGS_rpc_bind_addresses, 0));
171
1.65k
    host_port.set_port(PggateOptions::kDefaultPort);
172
1.65k
    FLAGS_pggate_proxy_bind_address = host_port.ToString();
173
1.65k
    LOG(INFO) << "Reset YSQL bind address to " << FLAGS_pggate_proxy_bind_address;
174
1.65k
  }
175
1.65k
  rpc_opts.rpc_bind_addresses = FLAGS_pggate_proxy_bind_address;
176
1.65k
  master_addresses_flag = FLAGS_pggate_master_addresses;
177
178
1.65k
  server::MasterAddresses master_addresses;
179
  // TODO: we might have to allow setting master_replication_factor similarly to how it is done
180
  // in tserver to support master auto-discovery on Kubernetes.
181
1.65k
  CHECK_OK(server::DetermineMasterAddresses(
182
1.65k
      "pggate_master_addresses", master_addresses_flag, /* master_replication_factor */ 0,
183
1.65k
      &master_addresses, &master_addresses_flag));
184
1.65k
  SetMasterAddresses(make_shared<server::MasterAddresses>(std::move(master_addresses)));
185
1.65k
}
186
187
PgApiContext::MessengerHolder::MessengerHolder(
188
    std::unique_ptr<rpc::SecureContext> security_context_,
189
    std::unique_ptr<rpc::Messenger> messenger_)
190
1.65k
    : security_context(std::move(security_context_)), messenger(std::move(messenger_)) {
191
1.65k
}
192
193
PgApiContext::MessengerHolder::MessengerHolder(MessengerHolder&& rhs)
194
    : security_context(std::move(rhs.security_context)),
195
4.95k
      messenger(std::move(rhs.messenger)) {
196
4.95k
}
197
198
6.60k
PgApiContext::MessengerHolder::~MessengerHolder() {
199
6.60k
}
200
201
PgApiContext::PgApiContext()
202
    : metric_registry(new MetricRegistry()),
203
      metric_entity(METRIC_ENTITY_server.Instantiate(metric_registry.get(), "yb.pggate")),
204
      mem_tracker(MemTracker::CreateTracker("PostgreSQL")),
205
      messenger_holder(CHECK_RESULT(BuildMessenger("pggate_ybclient",
206
                                                   FLAGS_pggate_ybclient_reactor_threads,
207
                                                   metric_entity,
208
                                                   mem_tracker))),
209
1.65k
      proxy_cache(std::make_unique<rpc::ProxyCache>(messenger_holder.messenger.get())) {
210
1.65k
}
211
212
0
PgApiContext::PgApiContext(PgApiContext&&) = default;
213
214
1.65k
PgApiContext::~PgApiContext() = default;
215
216
//--------------------------------------------------------------------------------------------------
217
218
PgApiImpl::PgApiImpl(
219
    PgApiContext context, const YBCPgTypeEntity *YBCDataTypeArray, int count,
220
    YBCPgCallbacks callbacks)
221
    : metric_registry_(std::move(context.metric_registry)),
222
      metric_entity_(std::move(context.metric_entity)),
223
      mem_tracker_(std::move(context.mem_tracker)),
224
      messenger_holder_(std::move(context.messenger_holder)),
225
      async_client_init_(messenger_holder_.messenger.get()->name(),
226
                         FLAGS_pggate_ybclient_reactor_threads,
227
                         FLAGS_pggate_rpc_timeout_secs,
228
                         "" /* tserver_uuid */,
229
                         &pggate_options_,
230
                         metric_entity_,
231
                         mem_tracker_,
232
                         messenger_holder_.messenger.get()),
233
      proxy_cache_(std::move(context.proxy_cache)),
234
      clock_(new server::HybridClock()),
235
      tserver_shared_object_(InitTServerSharedObject()),
236
      pg_callbacks_(callbacks),
237
      pg_txn_manager_(
238
          new PgTxnManager(
239
1.65k
              &pg_client_, clock_, tserver_shared_object_.get(), pg_callbacks_)) {
240
1.65k
  CHECK_OK(clock_->Init());
241
242
  // Setup type mapping.
243
269k
  for (int idx = 0; idx < count; idx++) {
244
267k
    const YBCPgTypeEntity *type_entity = &YBCDataTypeArray[idx];
245
267k
    type_map_[type_entity->type_oid] = type_entity;
246
267k
  }
247
1.65k
  if (FLAGS_ysql_forward_rpcs_to_local_tserver) {
248
0
    async_client_init_.AddPostCreateHook([this](client::YBClient *client) {
249
0
      const auto& tserver_shared_data = **tserver_shared_object_;
250
0
      HostPort host_port(tserver_shared_data.endpoint());
251
0
      MonoDelta resolve_cache_timeout;
252
0
      if (FLAGS_use_node_hostname_for_local_tserver) {
253
0
        host_port = HostPort(tserver_shared_data.host().ToBuffer(),
254
0
                             tserver_shared_data.endpoint().port());
255
0
        resolve_cache_timeout = MonoDelta::kMax;
256
0
      }
257
0
      auto proxy = std::make_shared<tserver::TabletServerForwardServiceProxy>(
258
0
          &client->proxy_cache(), host_port, nullptr /* protocol */, resolve_cache_timeout);
259
0
      client->SetNodeLocalForwardProxy(proxy);
260
0
      client->SetNodeLocalTServerHostPort(host_port);
261
0
    });
262
0
  }
263
1.65k
  async_client_init_.Start();
264
265
1.65k
  CHECK_OK(pg_client_.Start(
266
1.65k
      proxy_cache_.get(), &messenger_holder_.messenger->scheduler(),
267
1.65k
      *DCHECK_NOTNULL(tserver_shared_object_)));
268
1.65k
}
269
270
1.64k
PgApiImpl::~PgApiImpl() {
271
1.64k
  messenger_holder_.messenger->Shutdown();
272
1.64k
  async_client_init_.client()->Shutdown();
273
1.64k
  pg_txn_manager_.reset();
274
1.64k
  pg_client_.Shutdown();
275
1.64k
}
276
277
24.5M
const YBCPgTypeEntity *PgApiImpl::FindTypeEntity(int type_oid) {
278
24.5M
  const auto iter = type_map_.find(type_oid);
279
24.5M
  if (iter != type_map_.end()) {
280
24.4M
    return iter->second;
281
24.4M
  }
282
120k
  return nullptr;
283
120k
}
284
285
//--------------------------------------------------------------------------------------------------
286
287
0
Status PgApiImpl::CreateEnv(PgEnv **pg_env) {
288
0
  *pg_env = pg_env_.get();
289
0
  return Status::OK();
290
0
}
291
292
0
Status PgApiImpl::DestroyEnv(PgEnv *pg_env) {
293
0
  pg_env_ = nullptr;
294
0
  return Status::OK();
295
0
}
296
297
//--------------------------------------------------------------------------------------------------
298
299
Status PgApiImpl::InitSession(const PgEnv *pg_env,
300
1.65k
                              const string& database_name) {
301
1.65k
  CHECK(!pg_session_);
302
1.65k
  auto session = make_scoped_refptr<PgSession>(client(),
303
1.65k
                                               &pg_client_,
304
1.65k
                                               database_name,
305
1.65k
                                               pg_txn_manager_,
306
1.65k
                                               clock_,
307
1.65k
                                               tserver_shared_object_.get(),
308
1.65k
                                               pg_callbacks_);
309
1.65k
  if (!database_name.empty()) {
310
1.65k
    RETURN_NOT_OK(session->ConnectDatabase(database_name));
311
1.65k
  }
312
313
1.65k
  pg_session_.swap(session);
314
1.65k
  return Status::OK();
315
1.65k
}
316
317
613
Status PgApiImpl::InvalidateCache() {
318
613
  pg_session_->InvalidateAllTablesCache();
319
613
  return Status::OK();
320
613
}
321
322
55
bool PgApiImpl::GetDisableTransparentCacheRefreshRetry() {
323
55
  return FLAGS_TEST_ysql_disable_transparent_cache_refresh_retry;
324
55
}
325
326
//--------------------------------------------------------------------------------------------------
327
328
217k
PgMemctx *PgApiImpl::CreateMemctx() {
329
  // Postgres will create YB Memctx when it first use the Memctx to allocate YugaByte object.
330
217k
  return PgMemctx::Create();
331
217k
}
332
333
215k
Status PgApiImpl::DestroyMemctx(PgMemctx *memctx) {
334
  // Postgres will destroy YB Memctx by releasing the pointer.
335
215k
  return PgMemctx::Destroy(memctx);
336
215k
}
337
338
1.17M
Status PgApiImpl::ResetMemctx(PgMemctx *memctx) {
339
  // Postgres reset YB Memctx when clearing a context content without clearing its nested context.
340
1.17M
  return PgMemctx::Reset(memctx);
341
1.17M
}
342
343
// TODO(neil) Use Arena in the future.
344
// - PgStatement should have been declared as derived class of "MCBase".
345
// - All objects of PgStatement's derived class should be allocated by YbPgMemctx::Arena.
346
// - We cannot use Arena yet because quite a large number of YugaByte objects are being referenced
347
//   from other layers.  Those added code violated the original design as they assume ScopedPtr
348
//   instead of memory pool is being used. This mess should be cleaned up later.
349
//
350
// For now, statements is allocated as ScopedPtr and cached in the memory context. The statements
351
// would then be destructed when the context is destroyed and all other references are also cleared.
352
Status PgApiImpl::AddToCurrentPgMemctx(std::unique_ptr<PgStatement> stmt,
353
2.69M
                                       PgStatement **handle) {
354
2.69M
  *handle = stmt.get();
355
2.69M
  pg_callbacks_.GetCurrentYbMemctx()->Register(stmt.release());
356
2.69M
  return Status::OK();
357
2.69M
}
358
359
// TODO(neil) Most like we don't need table_desc. If we do need it, use Arena here.
360
// - PgTableDesc should have been declared as derived class of "MCBase".
361
// - PgTableDesc objects should be allocated by YbPgMemctx::Arena.
362
//
363
// For now, table_desc is allocated as ScopedPtr and cached in the memory context. The table_desc
364
// would then be destructed when the context is destroyed.
365
Status PgApiImpl::AddToCurrentPgMemctx(size_t table_desc_id,
366
1.03M
                                       const PgTableDescPtr &table_desc) {
367
1.03M
  pg_callbacks_.GetCurrentYbMemctx()->Cache(table_desc_id, table_desc);
368
1.03M
  return Status::OK();
369
1.03M
}
370
371
5.96M
Status PgApiImpl::GetTabledescFromCurrentPgMemctx(size_t table_desc_id, PgTableDesc **handle) {
372
5.96M
  pg_callbacks_.GetCurrentYbMemctx()->GetCache(table_desc_id, handle);
373
5.96M
  return Status::OK();
374
5.96M
}
375
376
//--------------------------------------------------------------------------------------------------
377
378
0
Status PgApiImpl::CreateSequencesDataTable() {
379
0
  return pg_session_->CreateSequencesDataTable();
380
0
}
381
382
Status PgApiImpl::InsertSequenceTuple(int64_t db_oid,
383
                                      int64_t seq_oid,
384
                                      uint64_t ysql_catalog_version,
385
                                      int64_t last_val,
386
54
                                      bool is_called) {
387
54
  return pg_session_->InsertSequenceTuple(
388
54
      db_oid, seq_oid, ysql_catalog_version, last_val, is_called);
389
54
}
390
391
Status PgApiImpl::UpdateSequenceTupleConditionally(int64_t db_oid,
392
                                                   int64_t seq_oid,
393
                                                   uint64_t ysql_catalog_version,
394
                                                   int64_t last_val,
395
                                                   bool is_called,
396
                                                   int64_t expected_last_val,
397
                                                   bool expected_is_called,
398
31
                                                   bool *skipped) {
399
31
  return pg_session_->UpdateSequenceTuple(
400
31
      db_oid, seq_oid, ysql_catalog_version, last_val, is_called,
401
31
      expected_last_val, expected_is_called, skipped);
402
31
}
403
404
Status PgApiImpl::UpdateSequenceTuple(int64_t db_oid,
405
                                      int64_t seq_oid,
406
                                      uint64_t ysql_catalog_version,
407
                                      int64_t last_val,
408
                                      bool is_called,
409
2
                                      bool* skipped) {
410
2
  return pg_session_->UpdateSequenceTuple(
411
2
      db_oid, seq_oid, ysql_catalog_version, last_val,
412
2
      is_called, boost::none, boost::none, skipped);
413
2
}
414
415
Status PgApiImpl::ReadSequenceTuple(int64_t db_oid,
416
                                    int64_t seq_oid,
417
                                    uint64_t ysql_catalog_version,
418
                                    int64_t *last_val,
419
69
                                    bool *is_called) {
420
69
  return pg_session_->ReadSequenceTuple(db_oid, seq_oid, ysql_catalog_version, last_val, is_called);
421
69
}
422
423
44
Status PgApiImpl::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) {
424
44
  return pg_session_->DeleteSequenceTuple(db_oid, seq_oid);
425
44
}
426
427
428
//--------------------------------------------------------------------------------------------------
429
430
2.30M
void PgApiImpl::DeleteStatement(PgStatement *handle) {
431
2.30M
  if (handle) {
432
2.30M
    PgMemctx::Destroy(handle);
433
2.30M
  }
434
2.30M
}
435
436
//--------------------------------------------------------------------------------------------------
437
438
0
Status PgApiImpl::ConnectDatabase(const char *database_name) {
439
0
  return pg_session_->ConnectDatabase(database_name);
440
0
}
441
442
1.61k
Status PgApiImpl::IsDatabaseColocated(const PgOid database_oid, bool *colocated) {
443
1.61k
  return pg_session_->IsDatabaseColocated(database_oid, colocated);
444
1.61k
}
445
446
Status PgApiImpl::NewCreateDatabase(const char *database_name,
447
                                    const PgOid database_oid,
448
                                    const PgOid source_database_oid,
449
                                    const PgOid next_oid,
450
                                    const bool colocated,
451
22
                                    PgStatement **handle) {
452
22
  auto stmt = std::make_unique<PgCreateDatabase>(pg_session_, database_name, database_oid,
453
22
                                                 source_database_oid, next_oid, colocated);
454
22
  if (pg_txn_manager_->IsDdlMode()) {
455
22
    stmt->UseTransaction();
456
22
  }
457
22
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
458
22
  return Status::OK();
459
22
}
460
461
22
Status PgApiImpl::ExecCreateDatabase(PgStatement *handle) {
462
22
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_DATABASE)) {
463
    // Invalid handle.
464
0
    return STATUS(InvalidArgument, "Invalid statement handle");
465
0
  }
466
467
22
  return down_cast<PgCreateDatabase*>(handle)->Exec();
468
22
}
469
470
Status PgApiImpl::NewDropDatabase(const char *database_name,
471
                                  PgOid database_oid,
472
21
                                  PgStatement **handle) {
473
21
  auto stmt = std::make_unique<PgDropDatabase>(pg_session_, database_name, database_oid);
474
21
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
475
21
  return Status::OK();
476
21
}
477
478
21
Status PgApiImpl::ExecDropDatabase(PgStatement *handle) {
479
21
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_DATABASE)) {
480
    // Invalid handle.
481
0
    return STATUS(InvalidArgument, "Invalid statement handle");
482
0
  }
483
21
  return down_cast<PgDropDatabase*>(handle)->Exec();
484
21
}
485
486
Status PgApiImpl::NewAlterDatabase(const char *database_name,
487
                                  PgOid database_oid,
488
0
                                  PgStatement **handle) {
489
0
  auto stmt = std::make_unique<PgAlterDatabase>(pg_session_, database_name, database_oid);
490
0
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
491
0
  return Status::OK();
492
0
}
493
494
0
Status PgApiImpl::AlterDatabaseRenameDatabase(PgStatement *handle, const char *newname) {
495
0
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_DATABASE)) {
496
    // Invalid handle.
497
0
    return STATUS(InvalidArgument, "Invalid statement handle");
498
0
  }
499
0
  down_cast<PgAlterDatabase*>(handle)->RenameDatabase(newname);
500
0
  return Status::OK();
501
0
}
502
503
0
Status PgApiImpl::ExecAlterDatabase(PgStatement *handle) {
504
0
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_DATABASE)) {
505
    // Invalid handle.
506
0
    return STATUS(InvalidArgument, "Invalid statement handle");
507
0
  }
508
0
  return down_cast<PgAlterDatabase*>(handle)->Exec();
509
0
}
510
511
Status PgApiImpl::ReserveOids(const PgOid database_oid,
512
                              const PgOid next_oid,
513
                              const uint32_t count,
514
                              PgOid *begin_oid,
515
380
                              PgOid *end_oid) {
516
380
  auto p = VERIFY_RESULT(pg_client_.ReserveOids(database_oid, next_oid, count));
517
380
  *begin_oid = p.first;
518
380
  *end_oid = p.second;
519
380
  return Status::OK();
520
380
}
521
522
0
Status PgApiImpl::GetCatalogMasterVersion(uint64_t *version) {
523
0
  return pg_session_->GetCatalogMasterVersion(version);
524
0
}
525
526
5.15k
Result<PgTableDescPtr> PgApiImpl::LoadTable(const PgObjectId& table_id) {
527
5.15k
  return pg_session_->LoadTable(table_id);
528
5.15k
}
529
530
2
void PgApiImpl::InvalidateTableCache(const PgObjectId& table_id) {
531
2
  pg_session_->InvalidateTableCache(table_id, InvalidateOnPgClient::kTrue);
532
2
}
533
534
//--------------------------------------------------------------------------------------------------
535
536
Status PgApiImpl::NewCreateTablegroup(const char *database_name,
537
                                      const PgOid database_oid,
538
                                      const PgOid tablegroup_oid,
539
                                      const PgOid tablespace_oid,
540
1
                                      PgStatement **handle) {
541
1
  auto stmt = std::make_unique<PgCreateTablegroup>(pg_session_, database_name,
542
1
                                                   database_oid, tablegroup_oid, tablespace_oid);
543
1
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
544
1
  return Status::OK();
545
1
}
546
547
1
Status PgApiImpl::ExecCreateTablegroup(PgStatement *handle) {
548
1
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLEGROUP)) {
549
    // Invalid handle.
550
0
    return STATUS(InvalidArgument, "Invalid statement handle");
551
0
  }
552
553
1
  return down_cast<PgCreateTablegroup*>(handle)->Exec();
554
1
}
555
556
Status PgApiImpl::NewDropTablegroup(const PgOid database_oid,
557
                                    const PgOid tablegroup_oid,
558
1
                                    PgStatement **handle) {
559
1
  auto stmt = std::make_unique<PgDropTablegroup>(pg_session_, database_oid, tablegroup_oid);
560
1
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
561
1
  return Status::OK();
562
1
}
563
564
565
0
Status PgApiImpl::ExecDropTablegroup(PgStatement *handle) {
566
0
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_TABLEGROUP)) {
567
    // Invalid handle.
568
0
    return STATUS(InvalidArgument, "Invalid statement handle");
569
0
  }
570
0
  return down_cast<PgDropTablegroup*>(handle)->Exec();
571
0
}
572
573
574
//--------------------------------------------------------------------------------------------------
575
576
Status PgApiImpl::NewCreateTable(const char *database_name,
577
                                 const char *schema_name,
578
                                 const char *table_name,
579
                                 const PgObjectId& table_id,
580
                                 bool is_shared_table,
581
                                 bool if_not_exist,
582
                                 bool add_primary_key,
583
                                 const bool colocated,
584
                                 const PgObjectId& tablegroup_oid,
585
                                 const PgObjectId& tablespace_oid,
586
                                 const PgObjectId& matview_pg_table_oid,
587
1.27k
                                 PgStatement **handle) {
588
1.27k
  auto stmt = std::make_unique<PgCreateTable>(
589
1.27k
      pg_session_, database_name, schema_name, table_name,
590
1.27k
      table_id, is_shared_table, if_not_exist, add_primary_key, colocated, tablegroup_oid,
591
1.27k
      tablespace_oid, matview_pg_table_oid);
592
1.27k
  if (pg_txn_manager_->IsDdlMode()) {
593
1.27k
    stmt->UseTransaction();
594
1.27k
  }
595
1.27k
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
596
1.27k
  return Status::OK();
597
1.27k
}
598
599
Status PgApiImpl::CreateTableAddColumn(PgStatement *handle, const char *attr_name, int attr_num,
600
                                       const YBCPgTypeEntity *attr_type,
601
                                       bool is_hash, bool is_range,
602
2.84k
                                       bool is_desc, bool is_nulls_first) {
603
2.84k
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) {
604
    // Invalid handle.
605
0
    return STATUS(InvalidArgument, "Invalid statement handle");
606
0
  }
607
2.84k
  return AddColumn(down_cast<PgCreateTable*>(handle), attr_name, attr_num, attr_type,
608
2.84k
      is_hash, is_range, is_desc, is_nulls_first);
609
2.84k
}
610
611
162
Status PgApiImpl::CreateTableSetNumTablets(PgStatement *handle, int32_t num_tablets) {
612
162
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) {
613
    // Invalid handle.
614
0
    return STATUS(InvalidArgument, "Invalid statement handle");
615
0
  }
616
162
  return down_cast<PgCreateTable*>(handle)->SetNumTablets(num_tablets);
617
162
}
618
619
15
Status PgApiImpl::AddSplitBoundary(PgStatement *handle, PgExpr **exprs, int expr_count) {
620
  // Partitioning a TABLE or an INDEX.
621
15
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE) ||
622
15
      PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX)) {
623
15
    return down_cast<PgCreateTable*>(handle)->AddSplitBoundary(exprs, expr_count);
624
15
  }
625
626
  // Invalid handle.
627
0
  return STATUS(InvalidArgument, "Invalid statement handle");
628
0
}
629
630
1.26k
Status PgApiImpl::ExecCreateTable(PgStatement *handle) {
631
1.26k
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) {
632
    // Invalid handle.
633
0
    return STATUS(InvalidArgument, "Invalid statement handle");
634
0
  }
635
1.26k
  return down_cast<PgCreateTable*>(handle)->Exec();
636
1.26k
}
637
638
Status PgApiImpl::NewAlterTable(const PgObjectId& table_id,
639
661
                                PgStatement **handle) {
640
661
  auto stmt = std::make_unique<PgAlterTable>(pg_session_, table_id);
641
661
  if (pg_txn_manager_->IsDdlMode()) {
642
661
    stmt->UseTransaction();
643
661
  }
644
661
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
645
661
  return Status::OK();
646
661
}
647
648
Status PgApiImpl::AlterTableAddColumn(PgStatement *handle, const char *name,
649
69
                                      int order, const YBCPgTypeEntity *attr_type) {
650
69
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) {
651
    // Invalid handle.
652
0
    return STATUS(InvalidArgument, "Invalid statement handle");
653
0
  }
654
655
69
  PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle);
656
69
  return pg_stmt->AddColumn(name, attr_type, order);
657
69
}
658
659
Status PgApiImpl::AlterTableRenameColumn(PgStatement *handle, const char *oldname,
660
1
                                         const char *newname) {
661
1
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) {
662
    // Invalid handle.
663
0
    return STATUS(InvalidArgument, "Invalid statement handle");
664
0
  }
665
666
1
  PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle);
667
1
  return pg_stmt->RenameColumn(oldname, newname);
668
1
}
669
670
116
Status PgApiImpl::AlterTableDropColumn(PgStatement *handle, const char *name) {
671
116
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) {
672
    // Invalid handle.
673
0
    return STATUS(InvalidArgument, "Invalid statement handle");
674
0
  }
675
676
116
  PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle);
677
116
  return pg_stmt->DropColumn(name);
678
116
}
679
680
Status PgApiImpl::AlterTableRenameTable(PgStatement *handle, const char *db_name,
681
42
                                        const char *newname) {
682
42
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) {
683
    // Invalid handle.
684
0
    return STATUS(InvalidArgument, "Invalid statement handle");
685
0
  }
686
687
42
  PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle);
688
42
  return pg_stmt->RenameTable(db_name, newname);
689
42
}
690
691
155
Status PgApiImpl::ExecAlterTable(PgStatement *handle) {
692
155
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) {
693
    // Invalid handle.
694
0
    return STATUS(InvalidArgument, "Invalid statement handle");
695
0
  }
696
155
  PgAlterTable *pg_stmt = down_cast<PgAlterTable*>(handle);
697
155
  return pg_stmt->Exec();
698
155
}
699
700
Status PgApiImpl::NewDropTable(const PgObjectId& table_id,
701
                               bool if_exist,
702
1.03k
                               PgStatement **handle) {
703
1.03k
  auto stmt = std::make_unique<PgDropTable>(pg_session_, table_id, if_exist);
704
1.03k
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
705
1.03k
  return Status::OK();
706
1.03k
}
707
708
Status PgApiImpl::NewTruncateTable(const PgObjectId& table_id,
709
31
                                   PgStatement **handle) {
710
31
  auto stmt = std::make_unique<PgTruncateTable>(pg_session_, table_id);
711
31
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
712
31
  return Status::OK();
713
31
}
714
715
31
Status PgApiImpl::ExecTruncateTable(PgStatement *handle) {
716
31
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_TRUNCATE_TABLE)) {
717
    // Invalid handle.
718
0
    return STATUS(InvalidArgument, "Invalid statement handle");
719
0
  }
720
31
  return down_cast<PgTruncateTable*>(handle)->Exec();
721
31
}
722
723
Status PgApiImpl::GetTableDesc(const PgObjectId& table_id,
724
5.96M
                               PgTableDesc **handle) {
725
  // First read from memory context.
726
5.96M
  size_t hash_id = hash_value(table_id);
727
5.96M
  RETURN_NOT_OK(GetTabledescFromCurrentPgMemctx(hash_id, handle));
728
729
  // Read from environment.
730
5.96M
  if (*handle == nullptr) {
731
1.03M
    auto result = pg_session_->LoadTable(table_id);
732
1.03M
    RETURN_NOT_OK(result);
733
1.03M
    RETURN_NOT_OK(AddToCurrentPgMemctx(hash_id, *result));
734
735
1.03M
    *handle = result->get();
736
1.03M
  }
737
738
5.96M
  return Status::OK();
739
5.96M
}
740
741
Result<YBCPgColumnInfo> PgApiImpl::GetColumnInfo(YBCPgTableDesc table_desc,
742
26.2M
                                                 int16_t attr_number) {
743
26.2M
  return table_desc->GetColumnInfo(attr_number);
744
26.2M
}
745
746
0
Status PgApiImpl::DmlModifiesRow(PgStatement *handle, bool *modifies_row) {
747
0
  if (!handle) {
748
0
    return STATUS(InvalidArgument, "Invalid statement handle");
749
0
  }
750
751
0
  *modifies_row = false;
752
753
0
  switch (handle->stmt_op()) {
754
0
    case StmtOp::STMT_UPDATE:
755
0
    case StmtOp::STMT_DELETE:
756
0
      *modifies_row = true;
757
0
      break;
758
0
    default:
759
0
      break;
760
0
  }
761
762
0
  return Status::OK();
763
0
}
764
765
0
Status PgApiImpl::SetIsSysCatalogVersionChange(PgStatement *handle) {
766
0
  if (!handle) {
767
0
    return STATUS(InvalidArgument, "Invalid statement handle");
768
0
  }
769
770
0
  switch (handle->stmt_op()) {
771
0
    case StmtOp::STMT_UPDATE:
772
0
    case StmtOp::STMT_DELETE:
773
0
    case StmtOp::STMT_INSERT:
774
0
      down_cast<PgDmlWrite *>(handle)->SetIsSystemCatalogChange();
775
0
      return Status::OK();
776
0
    default:
777
0
      break;
778
0
  }
779
780
0
  return STATUS(InvalidArgument, "Invalid statement handle");
781
0
}
782
783
2.31M
Status PgApiImpl::SetCatalogCacheVersion(PgStatement *handle, uint64_t catalog_cache_version) {
784
2.31M
  if (!handle) {
785
0
    return STATUS(InvalidArgument, "Invalid statement handle");
786
0
  }
787
788
2.31M
  switch (handle->stmt_op()) {
789
191k
    case StmtOp::STMT_SELECT:
790
2.03M
    case StmtOp::STMT_INSERT:
791
2.20M
    case StmtOp::STMT_UPDATE:
792
2.31M
    case StmtOp::STMT_DELETE:
793
2.31M
      down_cast<PgDml *>(handle)->SetCatalogCacheVersion(catalog_cache_version);
794
2.31M
      return Status::OK();
795
0
    default:
796
0
      break;
797
0
  }
798
799
0
  return STATUS(InvalidArgument, "Invalid statement handle");
800
0
}
801
802
//--------------------------------------------------------------------------------------------------
803
804
Status PgApiImpl::NewCreateIndex(const char *database_name,
805
                                 const char *schema_name,
806
                                 const char *index_name,
807
                                 const PgObjectId& index_id,
808
                                 const PgObjectId& base_table_id,
809
                                 bool is_shared_index,
810
                                 bool is_unique_index,
811
                                 const bool skip_index_backfill,
812
                                 bool if_not_exist,
813
                                 const PgObjectId& tablegroup_oid,
814
                                 const PgObjectId& tablespace_oid,
815
151
                                 PgStatement **handle) {
816
151
  auto stmt = std::make_unique<PgCreateTable>(
817
151
      pg_session_, database_name, schema_name, index_name, index_id, is_shared_index,
818
151
      if_not_exist, false /* add_primary_key */,
819
149
      tablegroup_oid.IsValid() ? false : true /* colocated */, tablegroup_oid, tablespace_oid,
820
151
      PgObjectId() /* matview_pg_table_id */);
821
151
  stmt->SetupIndex(base_table_id, is_unique_index, skip_index_backfill);
822
151
  if (pg_txn_manager_->IsDdlMode()) {
823
151
      stmt->UseTransaction();
824
151
  }
825
151
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
826
151
  return Status::OK();
827
151
}
828
829
Status PgApiImpl::CreateIndexAddColumn(PgStatement *handle, const char *attr_name, int attr_num,
830
                                       const YBCPgTypeEntity *attr_type,
831
                                       bool is_hash, bool is_range,
832
194
                                       bool is_desc, bool is_nulls_first) {
833
194
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX)) {
834
    // Invalid handle.
835
0
    return STATUS(InvalidArgument, "Invalid statement handle");
836
0
  }
837
838
194
  return AddColumn(down_cast<PgCreateTable*>(handle), attr_name, attr_num, attr_type,
839
194
      is_hash, is_range, is_desc, is_nulls_first);
840
194
}
841
842
0
Status PgApiImpl::CreateIndexSetNumTablets(PgStatement *handle, int32_t num_tablets) {
843
0
  SCHECK(PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX),
844
0
         InvalidArgument,
845
0
         "Invalid statement handle");
846
0
  return down_cast<PgCreateTable*>(handle)->SetNumTablets(num_tablets);
847
0
}
848
849
151
Status PgApiImpl::ExecCreateIndex(PgStatement *handle) {
850
151
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX)) {
851
    // Invalid handle.
852
0
    return STATUS(InvalidArgument, "Invalid statement handle");
853
0
  }
854
151
  return down_cast<PgCreateTable*>(handle)->Exec();
855
151
}
856
857
Status PgApiImpl::NewDropIndex(const PgObjectId& index_id,
858
                               bool if_exist,
859
141
                               PgStatement **handle) {
860
141
  auto stmt = std::make_unique<PgDropIndex>(pg_session_, index_id, if_exist);
861
141
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
862
141
  return Status::OK();
863
141
}
864
865
1.17k
Status PgApiImpl::ExecPostponedDdlStmt(PgStatement *handle) {
866
1.17k
  if (!handle) {
867
0
    return STATUS(InvalidArgument, "Invalid statement handle");
868
0
  }
869
870
1.17k
  switch (handle->stmt_op()) {
871
1.03k
    case StmtOp::STMT_DROP_TABLE:
872
1.03k
      return down_cast<PgDropTable*>(handle)->Exec();
873
141
    case StmtOp::STMT_DROP_INDEX:
874
141
      return down_cast<PgDropIndex*>(handle)->Exec();
875
1
    case StmtOp::STMT_DROP_TABLEGROUP:
876
1
      return down_cast<PgDropTablegroup*>(handle)->Exec();
877
878
0
    default:
879
0
      break;
880
0
  }
881
0
  return STATUS(InvalidArgument, "Invalid statement handle");
882
0
}
883
884
89
Status PgApiImpl::BackfillIndex(const PgObjectId& table_id) {
885
89
  tserver::PgBackfillIndexRequestPB req;
886
89
  table_id.ToPB(req.mutable_table_id());
887
89
  return pg_session_->pg_client().BackfillIndex(
888
89
      &req, CoarseMonoClock::Now() + FLAGS_backfill_index_client_rpc_timeout_ms * 1ms);
889
89
}
890
891
//--------------------------------------------------------------------------------------------------
892
// DML Statment Support.
893
//--------------------------------------------------------------------------------------------------
894
895
// Binding -----------------------------------------------------------------------------------------
896
897
7.54M
Status PgApiImpl::DmlAppendTarget(PgStatement *handle, PgExpr *target) {
898
7.54M
  return down_cast<PgDml*>(handle)->AppendTarget(target);
899
7.54M
}
900
901
22
Status PgApiImpl::DmlAppendQual(PgStatement *handle, PgExpr *qual) {
902
22
  return down_cast<PgDml*>(handle)->AppendQual(qual);
903
22
}
904
905
3.94k
Status PgApiImpl::DmlAppendColumnRef(PgStatement *handle, PgExpr *colref) {
906
3.94k
  return down_cast<PgDml*>(handle)->AppendColumnRef(colref);
907
3.94k
}
908
909
9.75M
Status PgApiImpl::DmlBindColumn(PgStatement *handle, int attr_num, PgExpr *attr_value) {
910
9.75M
  return down_cast<PgDml*>(handle)->BindColumn(attr_num, attr_value);
911
9.75M
}
912
913
Status PgApiImpl::DmlBindColumnCondBetween(PgStatement *handle, int attr_num, PgExpr *attr_value,
914
134
    PgExpr *attr_value_end) {
915
134
  return down_cast<PgDmlRead*>(handle)->BindColumnCondBetween(attr_num, attr_value, attr_value_end);
916
134
}
917
918
Status PgApiImpl::DmlBindColumnCondIn(PgStatement *handle, int attr_num, int n_attr_values,
919
45.5k
    PgExpr **attr_values) {
920
45.5k
  return down_cast<PgDmlRead*>(handle)->BindColumnCondIn(attr_num, n_attr_values, attr_values);
921
45.5k
}
922
923
Status PgApiImpl::DmlBindHashCode(PgStatement *handle, bool start_valid,
924
                                    bool start_inclusive,
925
                                    uint64_t start_hash_val, bool end_valid,
926
0
                                    bool end_inclusive, uint64_t end_hash_val) {
927
0
  return down_cast<PgDmlRead*>(handle)
928
0
                  ->BindHashCode(start_valid, start_inclusive, start_hash_val,
929
0
                                  end_valid, end_inclusive, end_hash_val);
930
0
}
931
932
19
Status PgApiImpl::DmlBindTable(PgStatement *handle) {
933
19
  return down_cast<PgDml*>(handle)->BindTable();
934
19
}
935
936
1.16M
Result<YBCPgColumnInfo> PgApiImpl::DmlGetColumnInfo(YBCPgStatement handle, int attr_num) {
937
1.16M
  return down_cast<PgDml*>(handle)->GetColumnInfo(attr_num);
938
1.16M
}
939
940
259k
CHECKED_STATUS PgApiImpl::DmlAssignColumn(PgStatement *handle, int attr_num, PgExpr *attr_value) {
941
259k
  return down_cast<PgDml*>(handle)->AssignColumn(attr_num, attr_value);
942
259k
}
943
944
Status PgApiImpl::DmlFetch(PgStatement *handle, int32_t natts, uint64_t *values, bool *isnulls,
945
20.5M
                           PgSysColumns *syscols, bool *has_data) {
946
20.5M
  return down_cast<PgDml*>(handle)->Fetch(natts, values, isnulls, syscols, has_data);
947
20.5M
}
948
949
Status PgApiImpl::ProcessYBTupleId(const YBCPgYBTupleIdDescriptor& descr,
950
2.07M
                                   const YBTupleIdProcessor& processor) {
951
2.07M
  auto target_desc = VERIFY_RESULT(pg_session_->LoadTable(
952
2.07M
      PgObjectId(descr.database_oid, descr.table_oid)));
953
2.07M
  SCHECK_EQ(descr.nattrs, target_desc->num_key_columns(), Corruption,
954
2.07M
            "Number of key components does not match column description");
955
2.07M
  vector<PrimitiveValue> *values = nullptr;
956
2.07M
  PgsqlExpressionPB *expr_pb;
957
2.07M
  PgsqlExpressionPB temp_expr_pb;
958
2.07M
  google::protobuf::RepeatedPtrField<PgsqlExpressionPB> hashed_values;
959
2.07M
  vector<docdb::PrimitiveValue> hashed_components, range_components;
960
2.07M
  hashed_components.reserve(target_desc->num_hash_key_columns());
961
2.07M
  range_components.reserve(target_desc->num_key_columns() - target_desc->num_hash_key_columns());
962
2.07M
  size_t remain_attr = descr.nattrs;
963
  // DocDB API requires that partition columns must be listed in their created-order.
964
  // Order from target_desc should be used as attributes sequence may have different order.
965
2.91M
  for (size_t i : Range(target_desc->schema().columns().size())) {
966
2.91M
    PgColumn column(target_desc->schema(), i);
967
4.15M
    for (auto attr = descr.attrs, end = descr.attrs + descr.nattrs; attr != end; ++attr) {
968
4.15M
      if (attr->attr_num == column.attr_num()) {
969
2.91M
        if (!column.is_primary()) {
970
0
          return STATUS_SUBSTITUTE(
971
0
              InvalidArgument, "Attribute number $0 not a primary attribute", attr->attr_num);
972
0
        }
973
2.91M
        if (column.is_partition()) {
974
          // Hashed component.
975
1.66M
          values = &hashed_components;
976
1.66M
          expr_pb = hashed_values.Add();
977
1.24M
        } else {
978
          // Range component.
979
1.24M
          values = &range_components;
980
1.24M
          expr_pb = &temp_expr_pb;
981
1.24M
        }
982
983
2.91M
        if (attr->is_null) {
984
117k
          values->emplace_back(ValueType::kNullLow);
985
2.79M
        } else {
986
2.79M
          if (attr->attr_num == to_underlying(PgSystemAttrNum::kYBRowId)) {
987
483k
            expr_pb->mutable_value()->set_binary_value(pg_session_->GenerateNewRowid());
988
2.30M
          } else {
989
2.30M
            const YBCPgCollationInfo& collation_info = attr->collation_info;
990
2.30M
            PgConstant value(
991
2.30M
                attr->type_entity, collation_info.collate_is_valid_non_c,
992
2.30M
                collation_info.sortkey, attr->datum, false);
993
2.30M
            SCHECK_EQ(column.internal_type(), value.internal_type(), Corruption,
994
2.30M
                      "Attribute value type does not match column type");
995
2.30M
            RETURN_NOT_OK(value.Eval(expr_pb->mutable_value()));
996
2.30M
          }
997
2.79M
          values->push_back(PrimitiveValue::FromQLValuePB(expr_pb->value(),
998
2.79M
                                                          column.desc().sorting_type()));
999
2.79M
        }
1000
1001
2.91M
        if (--remain_attr == 0) {
1002
2.07M
          SCHECK_EQ(hashed_components.size(), target_desc->num_hash_key_columns(), Corruption,
1003
2.07M
                    "Number of hashed components does not match column description");
1004
2.07M
          SCHECK_EQ(range_components.size(),
1005
2.07M
                    target_desc->num_key_columns() - target_desc->num_hash_key_columns(),
1006
2.07M
                    Corruption, "Number of range components does not match column description");
1007
2.07M
          if (hashed_values.empty()) {
1008
498k
            return processor(docdb::DocKey(move(range_components)).Encode());
1009
498k
          }
1010
1.57M
          string partition_key;
1011
1.57M
          const PartitionSchema& partition_schema = target_desc->partition_schema();
1012
1.57M
          RETURN_NOT_OK(partition_schema.EncodeKey(hashed_values, &partition_key));
1013
1.57M
          const uint16_t hash = PartitionSchema::DecodeMultiColumnHashValue(partition_key);
1014
1015
1.57M
          return processor(
1016
1.57M
              docdb::DocKey(hash, move(hashed_components), move(range_components)).Encode());
1017
839k
        }
1018
839k
        break;
1019
839k
      }
1020
4.15M
    }
1021
2.91M
  }
1022
1023
56
  return STATUS_FORMAT(Corruption, "Not all attributes ($0) were resolved", remain_attr);
1024
2.07M
}
1025
1026
201k
Status PgApiImpl::StartOperationsBuffering() {
1027
201k
  return pg_session_->StartOperationsBuffering();
1028
201k
}
1029
1030
174k
Status PgApiImpl::StopOperationsBuffering() {
1031
174k
  return pg_session_->StopOperationsBuffering();
1032
174k
}
1033
1034
48.0k
void PgApiImpl::ResetOperationsBuffering() {
1035
48.0k
  pg_session_->ResetOperationsBuffering();
1036
48.0k
}
1037
1038
185k
Status PgApiImpl::FlushBufferedOperations() {
1039
185k
  return pg_session_->FlushBufferedOperations();
1040
185k
}
1041
1042
2.12M
Status PgApiImpl::DmlExecWriteOp(PgStatement *handle, int32_t *rows_affected_count) {
1043
2.12M
  switch (handle->stmt_op()) {
1044
1.84M
    case StmtOp::STMT_INSERT:
1045
2.01M
    case StmtOp::STMT_UPDATE:
1046
2.12M
    case StmtOp::STMT_DELETE:
1047
2.12M
    case StmtOp::STMT_TRUNCATE:
1048
2.12M
      {
1049
2.12M
        auto dml_write = down_cast<PgDmlWrite *>(handle);
1050
2.12M
        RETURN_NOT_OK(dml_write->Exec(rows_affected_count != nullptr /* force_non_bufferable */));
1051
2.12M
        if (rows_affected_count) {
1052
4.08k
          *rows_affected_count = dml_write->GetRowsAffectedCount();
1053
4.08k
        }
1054
2.12M
        return Status::OK();
1055
2.12M
      }
1056
0
    default:
1057
0
      break;
1058
0
  }
1059
0
  return STATUS(InvalidArgument, "Invalid statement handle");
1060
0
}
1061
1062
// Insert ------------------------------------------------------------------------------------------
1063
1064
Status PgApiImpl::NewInsert(const PgObjectId& table_id,
1065
                            const bool is_single_row_txn,
1066
1.84M
                            PgStatement **handle) {
1067
1.84M
  *handle = nullptr;
1068
1.84M
  auto stmt = std::make_unique<PgInsert>(pg_session_, table_id, is_single_row_txn);
1069
1.84M
  RETURN_NOT_OK(stmt->Prepare());
1070
1.84M
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
1071
1.84M
  return Status::OK();
1072
1.84M
}
1073
1074
0
Status PgApiImpl::ExecInsert(PgStatement *handle) {
1075
0
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) {
1076
    // Invalid handle.
1077
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1078
0
  }
1079
0
  return down_cast<PgInsert*>(handle)->Exec();
1080
0
}
1081
1082
228k
Status PgApiImpl::InsertStmtSetUpsertMode(PgStatement *handle) {
1083
228k
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) {
1084
    // Invalid handle.
1085
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1086
0
  }
1087
228k
  down_cast<PgInsert*>(handle)->SetUpsertMode();
1088
1089
228k
  return Status::OK();
1090
228k
}
1091
1092
103k
Status PgApiImpl::InsertStmtSetWriteTime(PgStatement *handle, const HybridTime write_time) {
1093
103k
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) {
1094
    // Invalid handle.
1095
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1096
0
  }
1097
103k
  RETURN_NOT_OK(down_cast<PgInsert*>(handle)->SetWriteTime(write_time));
1098
103k
  return Status::OK();
1099
103k
}
1100
1101
103k
Status PgApiImpl::InsertStmtSetIsBackfill(PgStatement *handle, const bool is_backfill) {
1102
103k
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) {
1103
    // Invalid handle.
1104
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1105
0
  }
1106
103k
  down_cast<PgInsert*>(handle)->SetIsBackfill(is_backfill);
1107
103k
  return Status::OK();
1108
103k
}
1109
1110
// Update ------------------------------------------------------------------------------------------
1111
1112
Status PgApiImpl::NewUpdate(const PgObjectId& table_id,
1113
                            const bool is_single_row_txn,
1114
166k
                            PgStatement **handle) {
1115
166k
  *handle = nullptr;
1116
166k
  auto stmt = std::make_unique<PgUpdate>(pg_session_, table_id, is_single_row_txn);
1117
166k
  RETURN_NOT_OK(stmt->Prepare());
1118
166k
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
1119
166k
  return Status::OK();
1120
166k
}
1121
1122
0
Status PgApiImpl::ExecUpdate(PgStatement *handle) {
1123
0
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_UPDATE)) {
1124
    // Invalid handle.
1125
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1126
0
  }
1127
0
  return down_cast<PgUpdate*>(handle)->Exec();
1128
0
}
1129
1130
// Delete ------------------------------------------------------------------------------------------
1131
1132
Status PgApiImpl::NewDelete(const PgObjectId& table_id,
1133
                            const bool is_single_row_txn,
1134
116k
                            PgStatement **handle) {
1135
116k
  *handle = nullptr;
1136
116k
  auto stmt = std::make_unique<PgDelete>(pg_session_, table_id, is_single_row_txn);
1137
116k
  RETURN_NOT_OK(stmt->Prepare());
1138
116k
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
1139
116k
  return Status::OK();
1140
116k
}
1141
1142
0
Status PgApiImpl::ExecDelete(PgStatement *handle) {
1143
0
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DELETE)) {
1144
    // Invalid handle.
1145
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1146
0
  }
1147
0
  return down_cast<PgDelete*>(handle)->Exec();
1148
0
}
1149
1150
79
Status PgApiImpl::NewSample(const PgObjectId& table_id, const int targrows, PgStatement **handle) {
1151
79
  *handle = nullptr;
1152
79
  auto sample = std::make_unique<PgSample>(pg_session_, targrows, table_id);
1153
79
  RETURN_NOT_OK(sample->Prepare());
1154
79
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(sample), handle));
1155
79
  return Status::OK();
1156
79
}
1157
1158
79
Status PgApiImpl::InitRandomState(PgStatement *handle, double rstate_w, uint64 rand_state) {
1159
79
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) {
1160
    // Invalid handle.
1161
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1162
0
  }
1163
79
  RETURN_NOT_OK(down_cast<PgSample*>(handle)->InitRandomState(rstate_w, rand_state));
1164
79
  return Status::OK();
1165
79
}
1166
1167
270
Status PgApiImpl::SampleNextBlock(PgStatement *handle, bool *has_more) {
1168
270
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) {
1169
    // Invalid handle.
1170
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1171
0
  }
1172
270
  RETURN_NOT_OK(down_cast<PgSample*>(handle)->SampleNextBlock(has_more));
1173
270
  return Status::OK();
1174
270
}
1175
1176
79
Status PgApiImpl::ExecSample(PgStatement *handle) {
1177
79
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) {
1178
    // Invalid handle.
1179
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1180
0
  }
1181
79
  RETURN_NOT_OK(down_cast<PgSample*>(handle)->Exec(nullptr));
1182
79
  return Status::OK();
1183
79
}
1184
1185
79
Status PgApiImpl::GetEstimatedRowCount(PgStatement *handle, double *liverows, double *deadrows) {
1186
79
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE)) {
1187
    // Invalid handle.
1188
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1189
0
  }
1190
79
  RETURN_NOT_OK(down_cast<PgSample*>(handle)->GetEstimatedRowCount(liverows, deadrows));
1191
79
  return Status::OK();
1192
79
}
1193
1194
0
Status PgApiImpl::DeleteStmtSetIsPersistNeeded(PgStatement *handle, const bool is_persist_needed) {
1195
0
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_DELETE)) {
1196
    // Invalid handle.
1197
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1198
0
  }
1199
0
  down_cast<PgDelete*>(handle)->SetIsPersistNeeded(is_persist_needed);
1200
0
  return Status::OK();
1201
0
}
1202
1203
// Colocated Truncate ------------------------------------------------------------------------------
1204
1205
Status PgApiImpl::NewTruncateColocated(const PgObjectId& table_id,
1206
                                       const bool is_single_row_txn,
1207
19
                                       PgStatement **handle) {
1208
19
  *handle = nullptr;
1209
19
  auto stmt = std::make_unique<PgTruncateColocated>(pg_session_, table_id, is_single_row_txn);
1210
19
  RETURN_NOT_OK(stmt->Prepare());
1211
19
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
1212
19
  return Status::OK();
1213
19
}
1214
1215
0
Status PgApiImpl::ExecTruncateColocated(PgStatement *handle) {
1216
0
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_TRUNCATE)) {
1217
    // Invalid handle.
1218
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1219
0
  }
1220
0
  return down_cast<PgTruncateColocated*>(handle)->Exec();
1221
0
}
1222
1223
// Select ------------------------------------------------------------------------------------------
1224
1225
Status PgApiImpl::NewSelect(const PgObjectId& table_id,
1226
                            const PgObjectId& index_id,
1227
                            const PgPrepareParameters *prepare_params,
1228
561k
                            PgStatement **handle) {
1229
  // Scenarios:
1230
  // - Sequential Scan: PgSelect to read from table_id.
1231
  // - Primary Scan: PgSelect from table_id. YugaByte does not have separate table for primary key.
1232
  // - Index-Only-Scan: PgSelectIndex directly from secondary index_id.
1233
  // - IndexScan: Use PgSelectIndex to read from index_id and then PgSelect to read from table_id.
1234
  //     Note that for SysTable, only one request is send for both table_id and index_id.
1235
561k
  *handle = nullptr;
1236
561k
  std::unique_ptr<PgDmlRead> stmt;
1237
561k
  if (prepare_params && prepare_params->index_only_scan && prepare_params->use_secondary_index) {
1238
44.0k
    if (!index_id.IsValid()) {
1239
0
      return STATUS(InvalidArgument, "Cannot run query with invalid index ID");
1240
0
    }
1241
44.0k
    stmt = std::make_unique<PgSelectIndex>(pg_session_, table_id, index_id, prepare_params);
1242
517k
  } else {
1243
    // For IndexScan PgSelect processing will create subquery PgSelectIndex.
1244
517k
    stmt = std::make_unique<PgSelect>(pg_session_, table_id, index_id, prepare_params);
1245
517k
  }
1246
1247
561k
  RETURN_NOT_OK(stmt->Prepare());
1248
561k
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(stmt), handle));
1249
561k
  return Status::OK();
1250
561k
}
1251
1252
526k
Status PgApiImpl::SetForwardScan(PgStatement *handle, bool is_forward_scan) {
1253
526k
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SELECT)) {
1254
    // Invalid handle.
1255
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1256
0
  }
1257
526k
  down_cast<PgDmlRead*>(handle)->SetForwardScan(is_forward_scan);
1258
526k
  return Status::OK();
1259
526k
}
1260
1261
560k
Status PgApiImpl::ExecSelect(PgStatement *handle, const PgExecParameters *exec_params) {
1262
560k
  if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_SELECT)) {
1263
    // Invalid handle.
1264
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1265
0
  }
1266
560k
  return down_cast<PgDmlRead*>(handle)->Exec(exec_params);
1267
560k
}
1268
1269
//--------------------------------------------------------------------------------------------------
1270
// Expressions.
1271
//--------------------------------------------------------------------------------------------------
1272
1273
// Column references -------------------------------------------------------------------------------
1274
1275
Status PgApiImpl::NewColumnRef(
1276
    PgStatement *stmt, int attr_num, const PgTypeEntity *type_entity, bool collate_is_valid_non_c,
1277
7.55M
    const PgTypeAttrs *type_attrs, PgExpr **expr_handle) {
1278
7.55M
  if (!stmt) {
1279
    // Invalid handle.
1280
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1281
0
  }
1282
7.55M
  PgColumnRef::SharedPtr colref =
1283
7.55M
    make_shared<PgColumnRef>(attr_num, type_entity, collate_is_valid_non_c, type_attrs);
1284
7.55M
  stmt->AddExpr(colref);
1285
1286
7.55M
  *expr_handle = colref.get();
1287
7.55M
  return Status::OK();
1288
7.55M
}
1289
1290
// Constant ----------------------------------------------------------------------------------------
1291
Status PgApiImpl::NewConstant(
1292
    YBCPgStatement stmt, const YBCPgTypeEntity *type_entity, bool collate_is_valid_non_c,
1293
11.3M
    const char *collation_sortkey, uint64_t datum, bool is_null, YBCPgExpr *expr_handle) {
1294
11.3M
  if (!stmt) {
1295
    // Invalid handle.
1296
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1297
0
  }
1298
11.3M
  PgExpr::SharedPtr pg_const =
1299
11.3M
    make_shared<PgConstant>(type_entity, collate_is_valid_non_c, collation_sortkey,
1300
11.3M
                            datum, is_null);
1301
11.3M
  stmt->AddExpr(pg_const);
1302
1303
11.3M
  *expr_handle = pg_const.get();
1304
11.3M
  return Status::OK();
1305
11.3M
}
1306
1307
Status PgApiImpl::NewConstantVirtual(
1308
    YBCPgStatement stmt, const YBCPgTypeEntity *type_entity,
1309
8
    YBCPgDatumKind datum_kind, YBCPgExpr *expr_handle) {
1310
8
  if (!stmt) {
1311
    // Invalid handle.
1312
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1313
0
  }
1314
8
  PgExpr::SharedPtr pg_const =
1315
8
    make_shared<PgConstant>(type_entity, false /* collate_is_valid_non_c */, datum_kind);
1316
8
  stmt->AddExpr(pg_const);
1317
1318
8
  *expr_handle = pg_const.get();
1319
8
  return Status::OK();
1320
8
}
1321
1322
Status PgApiImpl::NewConstantOp(
1323
    YBCPgStatement stmt, const YBCPgTypeEntity *type_entity, bool collate_is_valid_non_c,
1324
    const char *collation_sortkey, uint64_t datum, bool is_null, YBCPgExpr *expr_handle,
1325
0
    bool is_gt) {
1326
0
  if (!stmt) {
1327
    // Invalid handle.
1328
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1329
0
  }
1330
0
  PgExpr::SharedPtr pg_const =
1331
0
    make_shared<PgConstant>(type_entity, collate_is_valid_non_c, collation_sortkey,
1332
0
      datum, is_null, is_gt ? PgExpr::Opcode::PG_EXPR_GT : PgExpr::Opcode::PG_EXPR_LT);
1333
0
  stmt->AddExpr(pg_const);
1334
1335
0
  *expr_handle = pg_const.get();
1336
0
  return Status::OK();
1337
0
}
1338
1339
// Text constant -----------------------------------------------------------------------------------
1340
1341
0
Status PgApiImpl::UpdateConstant(PgExpr *expr, const char *value, bool is_null) {
1342
0
  if (expr->opcode() != PgExpr::Opcode::PG_EXPR_CONSTANT) {
1343
    // Invalid handle.
1344
0
    return STATUS(InvalidArgument, "Invalid expression handle for constant");
1345
0
  }
1346
0
  down_cast<PgConstant*>(expr)->UpdateConstant(value, is_null);
1347
0
  return Status::OK();
1348
0
}
1349
1350
0
Status PgApiImpl::UpdateConstant(PgExpr *expr, const void *value, int64_t bytes, bool is_null) {
1351
0
  if (expr->opcode() != PgExpr::Opcode::PG_EXPR_CONSTANT) {
1352
    // Invalid handle.
1353
0
    return STATUS(InvalidArgument, "Invalid expression handle for constant");
1354
0
  }
1355
0
  down_cast<PgConstant*>(expr)->UpdateConstant(value, bytes, is_null);
1356
0
  return Status::OK();
1357
0
}
1358
1359
// Text constant -----------------------------------------------------------------------------------
1360
1361
Status PgApiImpl::NewOperator(
1362
    PgStatement *stmt, const char *opname, const YBCPgTypeEntity *type_entity,
1363
5.98k
    bool collate_is_valid_non_c, PgExpr **op_handle) {
1364
5.98k
  if (!stmt) {
1365
    // Invalid handle.
1366
0
    return STATUS(InvalidArgument, "Invalid statement handle");
1367
0
  }
1368
5.98k
  RETURN_NOT_OK(PgExpr::CheckOperatorName(opname));
1369
1370
  // Create operator.
1371
5.98k
  PgExpr::SharedPtr pg_op = make_shared<PgOperator>(opname, type_entity, collate_is_valid_non_c);
1372
5.98k
  stmt->AddExpr(pg_op);
1373
1374
5.98k
  *op_handle = pg_op.get();
1375
5.98k
  return Status::OK();
1376
5.98k
}
1377
1378
5.98k
Status PgApiImpl::OperatorAppendArg(PgExpr *op_handle, PgExpr *arg) {
1379
5.98k
  if (!op_handle || !arg) {
1380
    // Invalid handle.
1381
0
    return STATUS(InvalidArgument, "Invalid expression handle");
1382
0
  }
1383
5.98k
  down_cast<PgOperator*>(op_handle)->AppendArg(arg);
1384
5.98k
  return Status::OK();
1385
5.98k
}
1386
1387
0
Result<bool> PgApiImpl::IsInitDbDone() {
1388
0
  return pg_session_->IsInitDbDone();
1389
0
}
1390
1391
158k
Result<uint64_t> PgApiImpl::GetSharedCatalogVersion() {
1392
158k
  return pg_session_->GetSharedCatalogVersion();
1393
158k
}
1394
1395
235
Result<uint64_t> PgApiImpl::GetSharedAuthKey() {
1396
235
  return pg_session_->GetSharedAuthKey();
1397
235
}
1398
1399
// Transaction Control -----------------------------------------------------------------------------
1400
160k
Status PgApiImpl::BeginTransaction() {
1401
160k
  pg_session_->InvalidateForeignKeyReferenceCache();
1402
160k
  return pg_txn_manager_->BeginTransaction();
1403
160k
}
1404
1405
20.2k
Status PgApiImpl::RecreateTransaction() {
1406
20.2k
  pg_session_->InvalidateForeignKeyReferenceCache();
1407
20.2k
  pg_session_->DropBufferedOperations();
1408
20.2k
  return pg_txn_manager_->RecreateTransaction();
1409
20.2k
}
1410
1411
0
Status PgApiImpl::RestartTransaction() {
1412
0
  pg_session_->InvalidateForeignKeyReferenceCache();
1413
0
  pg_session_->DropBufferedOperations();
1414
0
  return pg_txn_manager_->RestartTransaction();
1415
0
}
1416
1417
24.3k
Status PgApiImpl::ResetTransactionReadPoint() {
1418
24.3k
  return pg_txn_manager_->ResetTransactionReadPoint();
1419
24.3k
}
1420
1421
1
Status PgApiImpl::RestartReadPoint() {
1422
1
  return pg_txn_manager_->RestartReadPoint();
1423
1
}
1424
1425
155k
Status PgApiImpl::CommitTransaction() {
1426
155k
  pg_session_->InvalidateForeignKeyReferenceCache();
1427
155k
  RETURN_NOT_OK(pg_session_->FlushBufferedOperations());
1428
155k
  return pg_txn_manager_->CommitTransaction();
1429
155k
}
1430
1431
11.6k
Status PgApiImpl::AbortTransaction() {
1432
11.6k
  pg_session_->InvalidateForeignKeyReferenceCache();
1433
11.6k
  pg_session_->DropBufferedOperations();
1434
11.6k
  return pg_txn_manager_->AbortTransaction();
1435
11.6k
}
1436
1437
161k
Status PgApiImpl::SetTransactionIsolationLevel(int isolation) {
1438
161k
  return pg_txn_manager_->SetPgIsolationLevel(isolation);
1439
161k
}
1440
1441
160k
Status PgApiImpl::SetTransactionReadOnly(bool read_only) {
1442
160k
  return pg_txn_manager_->SetReadOnly(read_only);
1443
160k
}
1444
1445
160k
Status PgApiImpl::EnableFollowerReads(bool enable_follower_reads, int32_t staleness_ms) {
1446
160k
  return pg_txn_manager_->EnableFollowerReads(enable_follower_reads, staleness_ms);
1447
160k
}
1448
1449
160k
Status PgApiImpl::SetTransactionDeferrable(bool deferrable) {
1450
160k
  return pg_txn_manager_->SetDeferrable(deferrable);
1451
160k
}
1452
1453
6.33k
Status PgApiImpl::EnterSeparateDdlTxnMode() {
1454
  // Flush all buffered operations as ddl txn use its own transaction session.
1455
6.33k
  RETURN_NOT_OK(pg_session_->FlushBufferedOperations());
1456
6.33k
  return pg_txn_manager_->EnterSeparateDdlTxnMode();
1457
6.33k
}
1458
1459
5.94k
Status PgApiImpl::ExitSeparateDdlTxnMode() {
1460
  // Flush all buffered operations as ddl txn use its own transaction session.
1461
5.94k
  RETURN_NOT_OK(pg_session_->FlushBufferedOperations());
1462
5.94k
  RETURN_NOT_OK(pg_txn_manager_->ExitSeparateDdlTxnMode(Commit::kTrue));
1463
  // Next reads from catalog tables have to see changes made by the DDL transaction.
1464
5.94k
  ResetCatalogReadTime();
1465
5.94k
  return Status::OK();
1466
5.94k
}
1467
1468
421
void PgApiImpl::ClearSeparateDdlTxnMode() {
1469
421
  pg_session_->DropBufferedOperations();
1470
421
  CHECK_OK(pg_txn_manager_->ExitSeparateDdlTxnMode(Commit::kFalse));
1471
421
}
1472
1473
48.8k
Status PgApiImpl::SetActiveSubTransaction(SubTransactionId id) {
1474
48.8k
  RETURN_NOT_OK(pg_session_->FlushBufferedOperations());
1475
48.8k
  return pg_session_->SetActiveSubTransaction(id);
1476
48.8k
}
1477
1478
23.5k
Status PgApiImpl::RollbackSubTransaction(SubTransactionId id) {
1479
23.5k
  pg_session_->DropBufferedOperations();
1480
23.5k
  return pg_session_->RollbackSubTransaction(id);
1481
23.5k
}
1482
1483
765k
void PgApiImpl::ResetCatalogReadTime() {
1484
765k
  pg_session_->ResetCatalogReadPoint();
1485
765k
}
1486
1487
Result<bool> PgApiImpl::ForeignKeyReferenceExists(
1488
232k
    PgOid table_id, const Slice& ybctid, PgOid database_id) {
1489
232k
  return pg_session_->ForeignKeyReferenceExists(
1490
232k
      table_id, ybctid, std::bind(FetchExistingYbctids,
1491
232k
                                  pg_session_,
1492
232k
                                  database_id,
1493
232k
                                  std::placeholders::_1,
1494
232k
                                  std::placeholders::_2));
1495
232k
}
1496
1497
240k
void PgApiImpl::AddForeignKeyReferenceIntent(PgOid table_id, const Slice& ybctid) {
1498
240k
  pg_session_->AddForeignKeyReferenceIntent(table_id, ybctid);
1499
240k
}
1500
1501
62.9k
void PgApiImpl::DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid) {
1502
62.9k
  pg_session_->DeleteForeignKeyReference(table_id, ybctid);
1503
62.9k
}
1504
1505
1.51M
void PgApiImpl::AddForeignKeyReference(PgOid table_id, const Slice& ybctid) {
1506
1.51M
  pg_session_->AddForeignKeyReference(table_id, ybctid);
1507
1.51M
}
1508
1509
0
void PgApiImpl::SetTimeout(const int timeout_ms) {
1510
0
  pg_session_->SetTimeout(timeout_ms);
1511
0
}
1512
1513
2
Result<client::TabletServersInfo> PgApiImpl::ListTabletServers() {
1514
2
  return pg_session_->ListTabletServers();
1515
2
}
1516
1517
0
Status PgApiImpl::ValidatePlacement(const char *placement_info) {
1518
0
  return pg_session_->ValidatePlacement(placement_info);
1519
0
}
1520
1521
} // namespace pggate
1522
} // namespace yb