YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pggate.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//--------------------------------------------------------------------------------------------------
14
15
#include "yb/yql/pggate/pggate.h"
16
17
#include <boost/optional.hpp>
18
19
#include "yb/client/client_fwd.h"
20
#include "yb/client/client.h"
21
#include "yb/client/client_utils.h"
22
#include "yb/client/tablet_server.h"
23
24
#include "yb/common/partition.h"
25
#include "yb/common/pg_system_attr.h"
26
#include "yb/common/schema.h"
27
28
#include "yb/docdb/doc_key.h"
29
#include "yb/docdb/primitive_value.h"
30
#include "yb/docdb/value_type.h"
31
32
#include "yb/gutil/casts.h"
33
34
#include "yb/rpc/messenger.h"
35
#include "yb/rpc/proxy.h"
36
#include "yb/rpc/secure_stream.h"
37
38
#include "yb/server/secure.h"
39
40
#include "yb/tserver/tserver_forward_service.proxy.h"
41
#include "yb/tserver/tserver_shared_mem.h"
42
43
#include "yb/util/format.h"
44
#include "yb/util/range.h"
45
#include "yb/util/shared_mem.h"
46
#include "yb/util/status_format.h"
47
#include "yb/util/status_log.h"
48
49
#include "yb/yql/pggate/pg_ddl.h"
50
#include "yb/yql/pggate/pg_delete.h"
51
#include "yb/yql/pggate/pg_insert.h"
52
#include "yb/yql/pggate/pg_memctx.h"
53
#include "yb/yql/pggate/pg_sample.h"
54
#include "yb/yql/pggate/pg_select.h"
55
#include "yb/yql/pggate/pg_truncate_colocated.h"
56
#include "yb/yql/pggate/pg_txn_manager.h"
57
#include "yb/yql/pggate/pg_update.h"
58
#include "yb/yql/pggate/pggate_flags.h"
59
#include "yb/yql/pggate/ybc_pggate.h"
60
61
using namespace std::literals;
62
63
DECLARE_string(rpc_bind_addresses);
64
DECLARE_bool(use_node_to_node_encryption);
65
DECLARE_string(certs_dir);
66
DECLARE_bool(node_to_node_encryption_use_client_certificates);
67
DECLARE_bool(ysql_forward_rpcs_to_local_tserver);
68
DECLARE_bool(use_node_hostname_for_local_tserver);
69
DECLARE_int32(backfill_index_client_rpc_timeout_ms);
70
71
namespace yb {
72
namespace pggate {
73
74
using docdb::PrimitiveValue;
75
using docdb::ValueType;
76
77
namespace {
78
79
// Forwards one column definition to `pg_stmt`, deriving the sorting type from the key flags.
//
// A sorting type other than SortingType::kNotSpecified is chosen only for a column that is a range
// key and not a hash key:
//   - is_desc:  kDescending when is_nulls_first, otherwise kDescendingNullsLast.
//   - !is_desc: kAscending when is_nulls_first, otherwise kAscendingNullsLast.
// Returns whatever PgCreateTable::AddColumn returns.
CHECKED_STATUS AddColumn(PgCreateTable* pg_stmt, const char *attr_name, int attr_num,
                         const YBCPgTypeEntity *attr_type, bool is_hash, bool is_range,
                         bool is_desc, bool is_nulls_first) {
  auto sorting_type = SortingType::kNotSpecified;

  const bool is_pure_range_key = is_range && !is_hash;
  if (is_pure_range_key) {
    if (is_desc) {
      sorting_type = is_nulls_first ? SortingType::kDescending
                                    : SortingType::kDescendingNullsLast;
    } else {
      sorting_type = is_nulls_first ? SortingType::kAscending
                                    : SortingType::kAscendingNullsLast;
    }
  }

  return pg_stmt->AddColumn(attr_name, attr_num, attr_type, is_hash, is_range, sorting_type);
}
95
96
// Creates the client messenger used by pggate and bundles it with its (optional) secure context.
//
// A secure context is created from FLAGS_certs_dir only when FLAGS_use_node_to_node_encryption is
// set; otherwise the messenger is created with a null secure context. Any failure from
// CreateSecureContext or CreateClientMessenger is returned to the caller.
Result<PgApiContext::MessengerHolder> BuildMessenger(
    const string& client_name,
    int32_t num_reactors,
    const scoped_refptr<MetricEntity>& metric_entity,
    const std::shared_ptr<MemTracker>& parent_mem_tracker) {
  std::unique_ptr<rpc::SecureContext> secure_context;
  if (FLAGS_use_node_to_node_encryption) {
    const server::UseClientCerts use_client_certs(
        FLAGS_node_to_node_encryption_use_client_certificates);
    secure_context = VERIFY_RESULT(
        server::CreateSecureContext(FLAGS_certs_dir, use_client_certs));
  }

  // The messenger only receives a raw pointer; ownership of the context moves into the holder.
  rpc::SecureContext* const raw_secure_context = secure_context.get();
  auto messenger = VERIFY_RESULT(client::CreateClientMessenger(
      client_name, num_reactors, metric_entity, parent_mem_tracker, raw_secure_context));
  return PgApiContext::MessengerHolder{std::move(secure_context), std::move(messenger)};
}
111
112
6.08k
std::unique_ptr<tserver::TServerSharedObject> InitTServerSharedObject() {
113
6.08k
  LOG(INFO) << __func__ << ": " << YBCIsInitDbModeEnvVarSet() << ", "
114
6.08k
            << FLAGS_TEST_pggate_ignore_tserver_shm << ", " << FLAGS_pggate_tserver_shm_fd;
115
  // Do not use shared memory in initdb or if explicity set to be ignored.
116
6.09k
  if (
FLAGS_TEST_pggate_ignore_tserver_shm6.08k
|| FLAGS_pggate_tserver_shm_fd == -1) {
117
0
    return nullptr;
118
0
  }
119
6.08k
  return std::make_unique<tserver::TServerSharedObject>(CHECK_RESULT(
120
6.08k
      tserver::TServerSharedObject::OpenReadOnly(FLAGS_pggate_tserver_shm_fd)));
121
6.08k
}
122
123
// Reads table (database_id, table_id) by the given `ybctids` and returns the ybctids reported in
// the results.
//
// The read requests a single target, the ybctid system column, and is executed with the
// ROW_MARK_KEYSHARE row mark. Result batches are drained until an empty batch is returned. Any
// error from loading the table or from the read operation is returned to the caller.
Result<std::vector<std::string>> FetchExistingYbctids(PgSession::ScopedRefPtr session,
                                                      PgOid database_id,
                                                      PgOid table_id,
                                                      const std::vector<Slice>& ybctids) {
  auto table_desc = VERIFY_RESULT(session->LoadTable(PgObjectId(database_id, table_id)));
  PgTable target(table_desc);

  auto read_op = std::make_shared<PgsqlReadOp>(*target);
  read_op->read_request().add_targets()->set_column_id(
      to_underlying(PgSystemAttrNum::kYBTupleId));
  auto doc_op = std::make_shared<PgDocReadOp>(session, &target, std::move(read_op));

  // Postgres uses SELECT FOR KEY SHARE query for FK check.
  // Use same lock level.
  PgExecParameters exec_params = doc_op->ExecParameters();
  exec_params.rowmark = ROW_MARK_KEYSHARE;
  RETURN_NOT_OK(doc_op->ExecuteInit(&exec_params));
  RETURN_NOT_OK(doc_op->PopulateDmlByYbctidOps(ybctids));
  RETURN_NOT_OK(doc_op->Execute());

  std::vector<std::string> found_ybctids;
  found_ybctids.reserve(ybctids.size());

  std::list<PgDocResult> batch;
  for (;;) {
    batch.clear();
    RETURN_NOT_OK(doc_op->GetResult(&batch));
    if (batch.empty()) {
      // An empty batch marks the end of the results.
      break;
    }
    for (auto& rowset : batch) {
      RETURN_NOT_OK(rowset.ProcessSystemColumns());
      for (const auto& ybctid : rowset.ybctids()) {
        found_ybctids.push_back(ybctid.ToBuffer());
      }
    }
  }
  return found_ybctids;
}
156
157
} // namespace
158
159
using std::make_shared;
160
using client::YBSession;
161
162
//--------------------------------------------------------------------------------------------------
163
164
6.08k
// Builds the pggate server options from the pggate gflags.
//
// When FLAGS_pggate_proxy_bind_address is empty it is overwritten (the flag itself is modified)
// with the host parsed from FLAGS_rpc_bind_addresses combined with PggateOptions::kDefaultPort.
// Parsing that address and resolving the master addresses are both CHECK_OK'ed.
PggateOptions::PggateOptions() : ServerBaseOptions(kDefaultPort) {
  server_type = "tserver";
  rpc_opts.connection_keepalive_time_ms = FLAGS_pgsql_rpc_keepalive_time_ms;

  if (FLAGS_pggate_proxy_bind_address.empty()) {
    HostPort bind_address;
    CHECK_OK(bind_address.ParseString(FLAGS_rpc_bind_addresses, 0));
    bind_address.set_port(PggateOptions::kDefaultPort);
    FLAGS_pggate_proxy_bind_address = bind_address.ToString();
    LOG(INFO) << "Reset YSQL bind address to " << FLAGS_pggate_proxy_bind_address;
  }
  rpc_opts.rpc_bind_addresses = FLAGS_pggate_proxy_bind_address;
  master_addresses_flag = FLAGS_pggate_master_addresses;

  // TODO: we might have to allow setting master_replication_factor similarly to how it is done
  // in tserver to support master auto-discovery on Kubernetes.
  server::MasterAddresses resolved_masters;
  CHECK_OK(server::DetermineMasterAddresses(
      "pggate_master_addresses", master_addresses_flag, /* master_replication_factor */ 0,
      &resolved_masters, &master_addresses_flag));
  SetMasterAddresses(make_shared<server::MasterAddresses>(std::move(resolved_masters)));
}
186
187
// Takes ownership of both the secure context and the messenger.
PgApiContext::MessengerHolder::MessengerHolder(
    std::unique_ptr<rpc::SecureContext> security_context_,
    std::unique_ptr<rpc::Messenger> messenger_)
    : security_context(std::move(security_context_)),
      messenger(std::move(messenger_)) {}
192
193
// Move constructor: steals the secure context and the messenger from `rhs`.
PgApiContext::MessengerHolder::MessengerHolder(MessengerHolder&& rhs)
    : security_context(std::move(rhs.security_context)), messenger(std::move(rhs.messenger)) {}
197
198
24.3k
// Nothing to do explicitly; the owned members are released by their own destructors.
PgApiContext::MessengerHolder::~MessengerHolder() = default;
200
201
// Creates the process-level objects pggate needs: a metric registry with the "yb.pggate" server
// metric entity, the "PostgreSQL" memory tracker, the "pggate_ybclient" messenger and a proxy
// cache on top of that messenger. A failure to build the messenger goes through CHECK_RESULT.
PgApiContext::PgApiContext()
    : metric_registry(new MetricRegistry()),
      metric_entity(METRIC_ENTITY_server.Instantiate(metric_registry.get(), "yb.pggate")),
      mem_tracker(MemTracker::CreateTracker("PostgreSQL")),
      messenger_holder(CHECK_RESULT(BuildMessenger(
          "pggate_ybclient", FLAGS_pggate_ybclient_reactor_threads, metric_entity, mem_tracker))),
      proxy_cache(std::make_unique<rpc::ProxyCache>(messenger_holder.messenger.get())) {}
211
212
0
// Compiler-generated move constructor, defined out of line in this file.
PgApiContext::PgApiContext(PgApiContext&&) = default;
213
214
6.09k
// Compiler-generated destructor, defined out of line in this file.
PgApiContext::~PgApiContext() = default;
215
216
//--------------------------------------------------------------------------------------------------
217
218
// Builds the API object from a prepared PgApiContext.
//
// The context members are moved into this object, the hybrid clock is created and initialized,
// the type map is filled from the `count` entries of `YBCDataTypeArray` (keyed by type_oid; a
// later entry with the same oid replaces an earlier one), and the PgClient is started.
// NOTE(review): InitTServerSharedObject() can return nullptr, yet the shared object is
// dereferenced below. DCHECK_NOTNULL presumably checks only in debug builds - confirm that
// Start() is never reached with a null shared object.
PgApiImpl::PgApiImpl(
    PgApiContext context, const YBCPgTypeEntity *YBCDataTypeArray, int count,
    YBCPgCallbacks callbacks)
    : metric_registry_(std::move(context.metric_registry)),
      metric_entity_(std::move(context.metric_entity)),
      mem_tracker_(std::move(context.mem_tracker)),
      messenger_holder_(std::move(context.messenger_holder)),
      proxy_cache_(std::move(context.proxy_cache)),
      clock_(new server::HybridClock()),
      tserver_shared_object_(InitTServerSharedObject()),
      pg_callbacks_(callbacks),
      pg_txn_manager_(new PgTxnManager(
          &pg_client_, clock_, tserver_shared_object_.get(), pg_callbacks_)) {
  CHECK_OK(clock_->Init());

  // Setup type mapping.
  for (int i = 0; i < count; ++i) {
    const YBCPgTypeEntity& entity = YBCDataTypeArray[i];
    type_map_[entity.type_oid] = &entity;
  }

  auto* const scheduler = &messenger_holder_.messenger->scheduler();
  CHECK_OK(pg_client_.Start(
      proxy_cache_.get(), scheduler, *DCHECK_NOTNULL(tserver_shared_object_)));
}
244
245
6.06k
// Tears down in a fixed order: the messenger is shut down first, then the transaction manager is
// released, and finally the PgClient is shut down.
PgApiImpl::~PgApiImpl() {
  auto& messenger = *messenger_holder_.messenger;
  messenger.Shutdown();

  pg_txn_manager_.reset();

  pg_client_.Shutdown();
}
250
251
73.1M
// Looks up the type entity registered for `type_oid`; returns nullptr when none is registered.
const YBCPgTypeEntity *PgApiImpl::FindTypeEntity(int type_oid) {
  const auto entry = type_map_.find(type_oid);
  return entry == type_map_.end() ? nullptr : entry->second;
}
258
259
//--------------------------------------------------------------------------------------------------
260
261
0
// Stores the raw pointer held by pg_env_ into `*pg_env`. No object is allocated here, so the
// result may be null if pg_env_ is empty. Always returns OK.
Status PgApiImpl::CreateEnv(PgEnv **pg_env) {
  PgEnv *const current_env = pg_env_.get();
  *pg_env = current_env;
  return Status::OK();
}
265
266
0
// Clears pg_env_. The `pg_env` argument is not consulted. Always returns OK.
Status PgApiImpl::DestroyEnv(PgEnv *pg_env) {
  pg_env_ = nullptr;
  return Status::OK();
}
270
271
//--------------------------------------------------------------------------------------------------
272
273
// Creates the PgSession for this API object and, when `database_name` is non-empty, connects it
// to that database.
//
// Must be called while no session exists yet (CHECK'ed). The new session is installed into
// pg_session_ only after a successful connect; on a connect error the status is returned and
// pg_session_ stays unset. The `pg_env` argument is not used.
Status PgApiImpl::InitSession(const PgEnv *pg_env,
                              const string& database_name) {
  CHECK(!pg_session_);

  auto new_session = make_scoped_refptr<PgSession>(
      &pg_client_, database_name, pg_txn_manager_, clock_, tserver_shared_object_.get(),
      pg_callbacks_);

  const bool has_database = !database_name.empty();
  if (has_database) {
    RETURN_NOT_OK(new_session->ConnectDatabase(database_name));
  }

  pg_session_.swap(new_session);
  return Status::OK();
}
289
290
1.23k
// Asks the session to invalidate its cache of all tables. Always returns OK.
Status PgApiImpl::InvalidateCache() {
  pg_session_->InvalidateAllTablesCache();

  return Status::OK();
}
294
295
147
bool PgApiImpl::GetDisableTransparentCacheRefreshRetry() {
296
147
  return FLAGS_TEST_ysql_disable_transparent_cache_refresh_retry;
297
147
}
298
299
//--------------------------------------------------------------------------------------------------
300
301
676k
// Creates a new YB memory context via PgMemctx::Create().
// Postgres creates a YB Memctx the first time it uses the Memctx to allocate a YugaByte object.
PgMemctx *PgApiImpl::CreateMemctx() {
  PgMemctx *const memctx = PgMemctx::Create();
  return memctx;
}
305
306
668k
// Destroys `memctx` via PgMemctx::Destroy() and returns its status.
// Postgres destroys a YB Memctx by releasing the pointer.
Status PgApiImpl::DestroyMemctx(PgMemctx *memctx) {
  return PgMemctx::Destroy(memctx);
}
310
311
3.95M
// Resets `memctx` via PgMemctx::Reset() and returns its status.
// Postgres resets a YB Memctx when clearing a context's content without clearing its nested
// contexts.
Status PgApiImpl::ResetMemctx(PgMemctx *memctx) {
  return PgMemctx::Reset(memctx);
}
315
316
// TODO(neil) Use Arena in the future.
317
// - PgStatement should have been declared as derived class of "MCBase".
318
// - All objects of PgStatement's derived class should be allocated by YbPgMemctx::Arena.
319
// - We cannot use Arena yet because quite a large number of YugaByte objects are being referenced
320
//   from other layers.  Those added code violated the original design as they assume ScopedPtr
321
//   instead of memory pool is being used. This mess should be cleaned up later.
322
//
323
// For now, statements is allocated as ScopedPtr and cached in the memory context. The statements
324
// would then be destructed when the context is destroyed and all other references are also cleared.
325
// Hands `stmt` over to the current YB memory context and returns its raw pointer in `*handle`.
//
// Ownership leaves the unique_ptr: the released pointer is registered with the memory context
// obtained from the GetCurrentYbMemctx callback. Always returns OK.
Status PgApiImpl::AddToCurrentPgMemctx(std::unique_ptr<PgStatement> stmt,
                                       PgStatement **handle) {
  PgStatement *const raw_stmt = stmt.release();
  *handle = raw_stmt;
  pg_callbacks_.GetCurrentYbMemctx()->Register(raw_stmt);
  return Status::OK();
}
331
332
// TODO(neil) Most like we don't need table_desc. If we do need it, use Arena here.
333
// - PgTableDesc should have been declared as derived class of "MCBase".
334
// - PgTableDesc objects should be allocated by YbPgMemctx::Arena.
335
//
336
// For now, table_desc is allocated as ScopedPtr and cached in the memory context. The table_desc
337
// would then be destructed when the context is destroyed.
338
// Caches `table_desc` under `table_desc_id` in the current YB memory context (obtained from the
// GetCurrentYbMemctx callback). Always returns OK.
Status PgApiImpl::AddToCurrentPgMemctx(size_t table_desc_id,
                                       const PgTableDescPtr &table_desc) {
  auto *const memctx = pg_callbacks_.GetCurrentYbMemctx();
  memctx->Cache(table_desc_id, table_desc);
  return Status::OK();
}
343
344
16.4M
// Queries the current YB memory context's cache for `table_desc_id`, passing `handle` through to
// PgMemctx::GetCache() as the output argument. Always returns OK.
Status PgApiImpl::GetTabledescFromCurrentPgMemctx(size_t table_desc_id, PgTableDesc **handle) {
  auto *const memctx = pg_callbacks_.GetCurrentYbMemctx();
  memctx->GetCache(table_desc_id, handle);
  return Status::OK();
}
348
349
//--------------------------------------------------------------------------------------------------
350
351
0
// Delegates to PgSession::CreateSequencesDataTable() on the current session.
Status PgApiImpl::CreateSequencesDataTable() {
  return pg_session_->CreateSequencesDataTable();
}
354
355
// Delegates to PgSession::InsertSequenceTuple() on the current session, forwarding every argument
// unchanged.
Status PgApiImpl::InsertSequenceTuple(
    int64_t db_oid, int64_t seq_oid, uint64_t ysql_catalog_version, int64_t last_val,
    bool is_called) {
  return pg_session_->InsertSequenceTuple(
      db_oid, seq_oid, ysql_catalog_version, last_val, is_called);
}
363
364
// Updates the sequence tuple (db_oid, seq_oid) to (last_val, is_called), passing
// (expected_last_val, expected_is_called) to PgSession::UpdateSequenceTuple() as the expected
// current values.
//
// `skipped`, when non-null, receives the boolean produced by PgSession::UpdateSequenceTuple().
// A null `skipped` is accepted, matching UpdateSequenceTuple() in this file; previously a null
// pointer was dereferenced here. Any error from the session is returned unchanged.
Status PgApiImpl::UpdateSequenceTupleConditionally(int64_t db_oid,
                                                   int64_t seq_oid,
                                                   uint64_t ysql_catalog_version,
                                                   int64_t last_val,
                                                   bool is_called,
                                                   int64_t expected_last_val,
                                                   bool expected_is_called,
                                                   bool *skipped) {
  const bool was_skipped = VERIFY_RESULT(pg_session_->UpdateSequenceTuple(
      db_oid, seq_oid, ysql_catalog_version, last_val, is_called,
      expected_last_val, expected_is_called));
  if (skipped) {
    *skipped = was_skipped;
  }
  return Status::OK();
}
377
378
// Updates the sequence tuple (db_oid, seq_oid) to (last_val, is_called), passing boost::none for
// both expected values to PgSession::UpdateSequenceTuple().
//
// `skipped` is optional: when non-null it receives the boolean produced by the session call.
// Any error from the session is returned unchanged.
Status PgApiImpl::UpdateSequenceTuple(
    int64_t db_oid, int64_t seq_oid, uint64_t ysql_catalog_version, int64_t last_val,
    bool is_called, bool* skipped) {
  const bool was_skipped = VERIFY_RESULT(pg_session_->UpdateSequenceTuple(
      db_oid, seq_oid, ysql_catalog_version, last_val, is_called, boost::none, boost::none));
  if (skipped != nullptr) {
    *skipped = was_skipped;
  }
  return Status::OK();
}
392
393
// Reads the sequence tuple (db_oid, seq_oid) through the current session.
//
// Both output pointers are optional: `last_val` receives the first element of the returned pair
// and `is_called` the second, each only when non-null. Any error from the session is returned
// unchanged.
Status PgApiImpl::ReadSequenceTuple(
    int64_t db_oid, int64_t seq_oid, uint64_t ysql_catalog_version, int64_t *last_val,
    bool *is_called) {
  const auto tuple =
      VERIFY_RESULT(pg_session_->ReadSequenceTuple(db_oid, seq_oid, ysql_catalog_version));
  if (last_val != nullptr) {
    *last_val = tuple.first;
  }
  if (is_called != nullptr) {
    *is_called = tuple.second;
  }
  return Status::OK();
}
407
408
282
// Delegates to PgSession::DeleteSequenceTuple() on the current session.
Status PgApiImpl::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) {
  return pg_session_->DeleteSequenceTuple(db_oid, seq_oid);
}
411
412
413
//--------------------------------------------------------------------------------------------------
414
415
7.61M
// Destroys the statement behind `handle` via PgMemctx::Destroy(); a null handle is a no-op.
void PgApiImpl::DeleteStatement(PgStatement *handle) {
  if (handle == nullptr) {
    return;
  }
  PgMemctx::Destroy(handle);
}
420
421
//--------------------------------------------------------------------------------------------------
422
423
8
// Delegates to PgSession::ConnectDatabase() on the current session.
Status PgApiImpl::ConnectDatabase(const char *database_name) {
  return pg_session_->ConnectDatabase(database_name);
}
426
427
5.72k
// Delegates to PgSession::IsDatabaseColocated(); `colocated` is passed through as the output
// argument.
Status PgApiImpl::IsDatabaseColocated(const PgOid database_oid, bool *colocated) {
  return pg_session_->IsDatabaseColocated(database_oid, colocated);
}
430
431
// Allocates a PgCreateDatabase statement, registers it with the current memory context and
// returns it in `*handle`. The statement is marked to use a transaction when the transaction
// manager is in DDL mode.
Status PgApiImpl::NewCreateDatabase(const char *database_name,
                                    const PgOid database_oid,
                                    const PgOid source_database_oid,
                                    const PgOid next_oid,
                                    const bool colocated,
                                    PgStatement **handle) {
  auto create_db = std::make_unique<PgCreateDatabase>(
      pg_session_, database_name, database_oid, source_database_oid, next_oid, colocated);
  const bool in_ddl_mode = pg_txn_manager_->IsDdlMode();
  if (in_ddl_mode) {
    create_db->UseTransaction();
  }
  return AddToCurrentPgMemctx(std::move(create_db), handle);
}
445
446
134
// Executes a CREATE DATABASE statement. Returns InvalidArgument when `handle` is not a valid
// STMT_CREATE_DATABASE statement.
Status PgApiImpl::ExecCreateDatabase(PgStatement *handle) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_DATABASE)) {
    return down_cast<PgCreateDatabase*>(handle)->Exec();
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
454
455
// Allocates a PgDropDatabase statement, registers it with the current memory context and returns
// it in `*handle`.
Status PgApiImpl::NewDropDatabase(const char *database_name,
                                  PgOid database_oid,
                                  PgStatement **handle) {
  auto drop_db = std::make_unique<PgDropDatabase>(pg_session_, database_name, database_oid);
  return AddToCurrentPgMemctx(std::move(drop_db), handle);
}
462
463
72
// Executes a DROP DATABASE statement. Returns InvalidArgument when `handle` is not a valid
// STMT_DROP_DATABASE statement.
Status PgApiImpl::ExecDropDatabase(PgStatement *handle) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_DATABASE)) {
    return down_cast<PgDropDatabase*>(handle)->Exec();
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
470
471
// Allocates a PgAlterDatabase statement, registers it with the current memory context and returns
// it in `*handle`.
Status PgApiImpl::NewAlterDatabase(const char *database_name,
                                   PgOid database_oid,
                                   PgStatement **handle) {
  auto alter_db = std::make_unique<PgAlterDatabase>(pg_session_, database_name, database_oid);
  return AddToCurrentPgMemctx(std::move(alter_db), handle);
}
478
479
3
// Records the new database name on an ALTER DATABASE statement. Returns InvalidArgument when
// `handle` is not a valid STMT_ALTER_DATABASE statement, OK otherwise.
Status PgApiImpl::AlterDatabaseRenameDatabase(PgStatement *handle, const char *newname) {
  const bool is_alter_db = PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_DATABASE);
  if (!is_alter_db) {
    // Invalid handle.
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  auto *const alter_db = down_cast<PgAlterDatabase*>(handle);
  alter_db->RenameDatabase(newname);
  return Status::OK();
}
487
488
3
// Executes an ALTER DATABASE statement. Returns InvalidArgument when `handle` is not a valid
// STMT_ALTER_DATABASE statement.
Status PgApiImpl::ExecAlterDatabase(PgStatement *handle) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_DATABASE)) {
    return down_cast<PgAlterDatabase*>(handle)->Exec();
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
495
496
// Reserves OIDs through PgClient::ReserveOids() and reports the returned pair: its first element
// goes to `*begin_oid`, its second to `*end_oid`. Both output pointers must be non-null. Any
// error from the client is returned unchanged.
Status PgApiImpl::ReserveOids(const PgOid database_oid,
                              const PgOid next_oid,
                              const uint32_t count,
                              PgOid *begin_oid,
                              PgOid *end_oid) {
  const auto oid_range = VERIFY_RESULT(pg_client_.ReserveOids(database_oid, next_oid, count));
  *begin_oid = oid_range.first;
  *end_oid = oid_range.second;
  return Status::OK();
}
506
507
22
// Delegates to PgSession::GetCatalogMasterVersion(); `version` is passed through as the output
// argument.
Status PgApiImpl::GetCatalogMasterVersion(uint64_t *version) {
  return pg_session_->GetCatalogMasterVersion(version);
}
510
511
17.4k
// Delegates to PgSession::LoadTable() on the current session and returns its result.
Result<PgTableDescPtr> PgApiImpl::LoadTable(const PgObjectId& table_id) {
  return pg_session_->LoadTable(table_id);
}
514
515
9
void PgApiImpl::InvalidateTableCache(const PgObjectId& table_id) {
516
9
  pg_session_->InvalidateTableCache(table_id, InvalidateOnPgClient::kTrue);
517
9
}
518
519
//--------------------------------------------------------------------------------------------------
520
521
// Allocates a PgCreateTablegroup statement, registers it with the current memory context and
// returns it in `*handle`.
Status PgApiImpl::NewCreateTablegroup(const char *database_name,
                                      const PgOid database_oid,
                                      const PgOid tablegroup_oid,
                                      const PgOid tablespace_oid,
                                      PgStatement **handle) {
  auto create_tablegroup = std::make_unique<PgCreateTablegroup>(
      pg_session_, database_name, database_oid, tablegroup_oid, tablespace_oid);
  return AddToCurrentPgMemctx(std::move(create_tablegroup), handle);
}
531
532
54
// Executes a CREATE TABLEGROUP statement. Returns InvalidArgument when `handle` is not a valid
// STMT_CREATE_TABLEGROUP statement.
Status PgApiImpl::ExecCreateTablegroup(PgStatement *handle) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLEGROUP)) {
    return down_cast<PgCreateTablegroup*>(handle)->Exec();
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
540
541
// Allocates a PgDropTablegroup statement, registers it with the current memory context and
// returns it in `*handle`.
Status PgApiImpl::NewDropTablegroup(const PgOid database_oid,
                                    const PgOid tablegroup_oid,
                                    PgStatement **handle) {
  auto drop_tablegroup =
      std::make_unique<PgDropTablegroup>(pg_session_, database_oid, tablegroup_oid);
  return AddToCurrentPgMemctx(std::move(drop_tablegroup), handle);
}
548
549
550
0
// Executes a DROP TABLEGROUP statement. Returns InvalidArgument when `handle` is not a valid
// STMT_DROP_TABLEGROUP statement.
Status PgApiImpl::ExecDropTablegroup(PgStatement *handle) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_DROP_TABLEGROUP)) {
    return down_cast<PgDropTablegroup*>(handle)->Exec();
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
557
558
559
//--------------------------------------------------------------------------------------------------
560
561
// Allocates a PgCreateTable statement from the given table attributes, registers it with the
// current memory context and returns it in `*handle`. The statement is marked to use a
// transaction when the transaction manager is in DDL mode.
Status PgApiImpl::NewCreateTable(const char *database_name,
                                 const char *schema_name,
                                 const char *table_name,
                                 const PgObjectId& table_id,
                                 bool is_shared_table,
                                 bool if_not_exist,
                                 bool add_primary_key,
                                 const bool colocated,
                                 const PgObjectId& tablegroup_oid,
                                 const ColocationId colocation_id,
                                 const PgObjectId& tablespace_oid,
                                 const PgObjectId& matview_pg_table_oid,
                                 PgStatement **handle) {
  auto create_table = std::make_unique<PgCreateTable>(
      pg_session_, database_name, schema_name, table_name, table_id, is_shared_table,
      if_not_exist, add_primary_key, colocated, tablegroup_oid, colocation_id, tablespace_oid,
      matview_pg_table_oid);
  const bool in_ddl_mode = pg_txn_manager_->IsDdlMode();
  if (in_ddl_mode) {
    create_table->UseTransaction();
  }
  return AddToCurrentPgMemctx(std::move(create_table), handle);
}
584
585
// Adds a column to a CREATE TABLE statement through the file-local AddColumn() helper. Returns
// InvalidArgument when `handle` is not a valid STMT_CREATE_TABLE statement.
Status PgApiImpl::CreateTableAddColumn(PgStatement *handle, const char *attr_name, int attr_num,
                                       const YBCPgTypeEntity *attr_type,
                                       bool is_hash, bool is_range,
                                       bool is_desc, bool is_nulls_first) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) {
    auto *const create_table = down_cast<PgCreateTable*>(handle);
    return AddColumn(create_table, attr_name, attr_num, attr_type,
                     is_hash, is_range, is_desc, is_nulls_first);
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
596
597
340
// Sets the number of tablets on a CREATE TABLE statement. Returns InvalidArgument when `handle`
// is not a valid STMT_CREATE_TABLE statement.
Status PgApiImpl::CreateTableSetNumTablets(PgStatement *handle, int32_t num_tablets) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) {
    return down_cast<PgCreateTable*>(handle)->SetNumTablets(num_tablets);
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
604
605
181
// Adds a split boundary, described by `expr_count` expressions in `exprs`, to a CREATE TABLE or
// CREATE INDEX statement; both kinds are handled as PgCreateTable here. Returns InvalidArgument
// for any other handle.
Status PgApiImpl::AddSplitBoundary(PgStatement *handle, PgExpr **exprs, int expr_count) {
  // Partitioning a TABLE or an INDEX.
  const bool is_table_or_index =
      PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE) ||
      PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX);
  if (!is_table_or_index) {
    // Invalid handle.
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  return down_cast<PgCreateTable*>(handle)->AddSplitBoundary(exprs, expr_count);
}
615
616
4.19k
// Executes a CREATE TABLE statement. Returns InvalidArgument when `handle` is not a valid
// STMT_CREATE_TABLE statement.
Status PgApiImpl::ExecCreateTable(PgStatement *handle) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_TABLE)) {
    return down_cast<PgCreateTable*>(handle)->Exec();
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
623
624
// Allocates a PgAlterTable statement for `table_id`, registers it with the current memory context
// and returns it in `*handle`. The statement is marked to use a transaction when the transaction
// manager is in DDL mode.
Status PgApiImpl::NewAlterTable(const PgObjectId& table_id,
                                PgStatement **handle) {
  auto alter_table = std::make_unique<PgAlterTable>(pg_session_, table_id);
  const bool in_ddl_mode = pg_txn_manager_->IsDdlMode();
  if (in_ddl_mode) {
    alter_table->UseTransaction();
  }
  return AddToCurrentPgMemctx(std::move(alter_table), handle);
}
633
634
// Adds a column to an ALTER TABLE statement. Note the argument order passed on to
// PgAlterTable::AddColumn: (name, attr_type, order). Returns InvalidArgument when `handle` is not
// a valid STMT_ALTER_TABLE statement.
Status PgApiImpl::AlterTableAddColumn(PgStatement *handle, const char *name,
                                      int order, const YBCPgTypeEntity *attr_type) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) {
    return down_cast<PgAlterTable*>(handle)->AddColumn(name, attr_type, order);
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
644
645
// Renames column `oldname` to `newname` on an ALTER TABLE statement. Returns InvalidArgument when
// `handle` is not a valid STMT_ALTER_TABLE statement.
Status PgApiImpl::AlterTableRenameColumn(PgStatement *handle, const char *oldname,
                                         const char *newname) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) {
    return down_cast<PgAlterTable*>(handle)->RenameColumn(oldname, newname);
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
655
656
408
// Drops column `name` on an ALTER TABLE statement. Returns InvalidArgument when `handle` is not a
// valid STMT_ALTER_TABLE statement.
Status PgApiImpl::AlterTableDropColumn(PgStatement *handle, const char *name) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) {
    return down_cast<PgAlterTable*>(handle)->DropColumn(name);
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
665
666
// Records a table rename (`db_name`, `newname`) on an ALTER TABLE statement. Returns
// InvalidArgument when `handle` is not a valid STMT_ALTER_TABLE statement.
Status PgApiImpl::AlterTableRenameTable(PgStatement *handle, const char *db_name,
                                        const char *newname) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) {
    return down_cast<PgAlterTable*>(handle)->RenameTable(db_name, newname);
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
676
677
522
// Executes an ALTER TABLE statement. Returns InvalidArgument when `handle` is not a valid
// STMT_ALTER_TABLE statement.
Status PgApiImpl::ExecAlterTable(PgStatement *handle) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_ALTER_TABLE)) {
    return down_cast<PgAlterTable*>(handle)->Exec();
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
685
686
// Allocates a PgDropTable statement for `table_id`, registers it with the current memory context
// and returns it in `*handle`.
Status PgApiImpl::NewDropTable(const PgObjectId& table_id,
                               bool if_exist,
                               PgStatement **handle) {
  auto drop_table = std::make_unique<PgDropTable>(pg_session_, table_id, if_exist);
  return AddToCurrentPgMemctx(std::move(drop_table), handle);
}
693
694
// Allocates a PgTruncateTable statement for `table_id`, registers it with the current memory
// context and returns it in `*handle`.
Status PgApiImpl::NewTruncateTable(const PgObjectId& table_id,
                                   PgStatement **handle) {
  auto truncate_table = std::make_unique<PgTruncateTable>(pg_session_, table_id);
  return AddToCurrentPgMemctx(std::move(truncate_table), handle);
}
700
701
624
// Executes a TRUNCATE TABLE statement. Returns InvalidArgument when `handle` is not a valid
// STMT_TRUNCATE_TABLE statement.
Status PgApiImpl::ExecTruncateTable(PgStatement *handle) {
  if (PgStatement::IsValidStmt(handle, StmtOp::STMT_TRUNCATE_TABLE)) {
    return down_cast<PgTruncateTable*>(handle)->Exec();
  }

  // Invalid handle.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
708
709
// Returns the descriptor of `table_id` in `*handle`.
//
// The current memory context's cache is consulted first, keyed by hash_value(table_id). When it
// leaves `*handle` null, the table is loaded through the session, cached in the memory context
// under the same key, and its raw pointer is returned. A load failure is returned to the caller.
Status PgApiImpl::GetTableDesc(const PgObjectId& table_id,
                               PgTableDesc **handle) {
  // First read from memory context.
  const size_t cache_key = hash_value(table_id);
  RETURN_NOT_OK(GetTabledescFromCurrentPgMemctx(cache_key, handle));
  if (*handle != nullptr) {
    return Status::OK();
  }

  // Read from environment.
  auto loaded = pg_session_->LoadTable(table_id);
  RETURN_NOT_OK(loaded);
  RETURN_NOT_OK(AddToCurrentPgMemctx(cache_key, *loaded));
  *handle = loaded->get();
  return Status::OK();
}
726
727
// Delegates to GetColumnInfo() on `table_desc` for attribute number `attr_number`.
Result<YBCPgColumnInfo> PgApiImpl::GetColumnInfo(
    YBCPgTableDesc table_desc, int16_t attr_number) {
  return table_desc->GetColumnInfo(attr_number);
}
731
732
33.8k
// Sets `*modifies_row` to true when the statement is an UPDATE or a DELETE and to false for every
// other statement kind. Returns InvalidArgument (leaving `*modifies_row` untouched) when `handle`
// is null, OK otherwise.
Status PgApiImpl::DmlModifiesRow(PgStatement *handle, bool *modifies_row) {
  if (handle == nullptr) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  const auto op = handle->stmt_op();
  *modifies_row = (op == StmtOp::STMT_UPDATE || op == StmtOp::STMT_DELETE);

  return Status::OK();
}
750
751
14
// Flags an UPDATE, DELETE or INSERT statement as a system catalog change. Returns InvalidArgument
// when `handle` is null or is any other kind of statement.
Status PgApiImpl::SetIsSysCatalogVersionChange(PgStatement *handle) {
  if (handle == nullptr) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  const auto op = handle->stmt_op();
  const bool is_dml_write = op == StmtOp::STMT_UPDATE ||
                            op == StmtOp::STMT_DELETE ||
                            op == StmtOp::STMT_INSERT;
  if (!is_dml_write) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  down_cast<PgDmlWrite *>(handle)->SetIsSystemCatalogChange();
  return Status::OK();
}
768
769
7.56M
// Stores `catalog_cache_version` on a SELECT, INSERT, UPDATE or DELETE statement. Returns
// InvalidArgument when `handle` is null or is any other kind of statement.
Status PgApiImpl::SetCatalogCacheVersion(PgStatement *handle, uint64_t catalog_cache_version) {
  if (handle == nullptr) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  const auto op = handle->stmt_op();
  const bool is_dml = op == StmtOp::STMT_SELECT ||
                      op == StmtOp::STMT_INSERT ||
                      op == StmtOp::STMT_UPDATE ||
                      op == StmtOp::STMT_DELETE;
  if (!is_dml) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  down_cast<PgDml *>(handle)->SetCatalogCacheVersion(catalog_cache_version);
  return Status::OK();
}
787
788
//--------------------------------------------------------------------------------------------------
789
790
// Builds a PgCreateTable statement set up as an index over `base_table_id` and hands it to
// AddToCurrentPgMemctx together with `handle`. The statement is switched to transactional mode
// when the transaction manager reports DDL mode.
Status PgApiImpl::NewCreateIndex(const char *database_name,
                                 const char *schema_name,
                                 const char *index_name,
                                 const PgObjectId& index_id,
                                 const PgObjectId& base_table_id,
                                 bool is_shared_index,
                                 bool is_unique_index,
                                 const bool skip_index_backfill,
                                 bool if_not_exist,
                                 const PgObjectId& tablegroup_oid,
                                 const YBCPgOid& colocation_id,
                                 const PgObjectId& tablespace_oid,
                                 PgStatement **handle) {
  // The "colocated" argument is true exactly when no valid tablegroup was supplied.
  const bool colocated = !tablegroup_oid.IsValid();
  auto index_stmt = std::make_unique<PgCreateTable>(
      pg_session_, database_name, schema_name, index_name, index_id, is_shared_index,
      if_not_exist, false /* add_primary_key */, colocated, tablegroup_oid, colocation_id,
      tablespace_oid, PgObjectId() /* matview_pg_table_id */);
  index_stmt->SetupIndex(base_table_id, is_unique_index, skip_index_backfill);
  if (pg_txn_manager_->IsDdlMode()) {
    index_stmt->UseTransaction();
  }
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(index_stmt), handle));
  return Status::OK();
}
815
816
// Adds a column to a CREATE INDEX statement by delegating to AddColumn.
// Returns InvalidArgument when `handle` is not a valid CREATE INDEX statement.
Status PgApiImpl::CreateIndexAddColumn(PgStatement *handle, const char *attr_name, int attr_num,
                                       const YBCPgTypeEntity *attr_type,
                                       bool is_hash, bool is_range,
                                       bool is_desc, bool is_nulls_first) {
  const bool is_create_index = PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX);
  if (!is_create_index) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  auto* create_stmt = down_cast<PgCreateTable*>(handle);
  return AddColumn(create_stmt, attr_name, attr_num, attr_type,
      is_hash, is_range, is_desc, is_nulls_first);
}
828
829
13
// Sets the tablet count on a CREATE INDEX statement.
// Fails with InvalidArgument when `handle` is not a valid CREATE INDEX statement.
Status PgApiImpl::CreateIndexSetNumTablets(PgStatement *handle, int32_t num_tablets) {
  SCHECK(PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX),
         InvalidArgument,
         "Invalid statement handle");
  auto* create_stmt = down_cast<PgCreateTable*>(handle);
  return create_stmt->SetNumTablets(num_tablets);
}
835
836
862
// Executes a CREATE INDEX statement.
// Returns InvalidArgument when `handle` is not a valid CREATE INDEX statement.
Status PgApiImpl::ExecCreateIndex(PgStatement *handle) {
  const bool is_create_index = PgStatement::IsValidStmt(handle, StmtOp::STMT_CREATE_INDEX);
  if (!is_create_index) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* create_stmt = down_cast<PgCreateTable*>(handle);
  return create_stmt->Exec();
}
843
844
// Creates a PgDropIndex statement for `index_id` and hands it to AddToCurrentPgMemctx together
// with `handle`.
Status PgApiImpl::NewDropIndex(const PgObjectId& index_id,
                               bool if_exist,
                               PgStatement **handle) {
  auto drop_stmt = std::make_unique<PgDropIndex>(pg_session_, index_id, if_exist);
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(drop_stmt), handle));
  return Status::OK();
}
851
852
4.18k
// Executes a DROP TABLE, DROP INDEX or DROP TABLEGROUP statement.
// Returns InvalidArgument for a null handle or any other statement kind.
Status PgApiImpl::ExecPostponedDdlStmt(PgStatement *handle) {
  if (handle == nullptr) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  switch (handle->stmt_op()) {
    case StmtOp::STMT_DROP_TABLE:
      return down_cast<PgDropTable*>(handle)->Exec();
    case StmtOp::STMT_DROP_INDEX:
      return down_cast<PgDropIndex*>(handle)->Exec();
    case StmtOp::STMT_DROP_TABLEGROUP:
      return down_cast<PgDropTablegroup*>(handle)->Exec();
    default:
      break;
  }

  // Any other statement kind is not handled by this entry point.
  return STATUS(InvalidArgument, "Invalid statement handle");
}
870
871
540
// Sends a backfill-index request for `table_id` through the session's pg client. The deadline is
// the current coarse monotonic time plus FLAGS_backfill_index_client_rpc_timeout_ms milliseconds.
Status PgApiImpl::BackfillIndex(const PgObjectId& table_id) {
  tserver::PgBackfillIndexRequestPB request;
  table_id.ToPB(request.mutable_table_id());
  const auto deadline =
      CoarseMonoClock::Now() + FLAGS_backfill_index_client_rpc_timeout_ms * 1ms;
  return pg_session_->pg_client().BackfillIndex(&request, deadline);
}
877
878
//--------------------------------------------------------------------------------------------------
879
// DML Statement Support.
880
//--------------------------------------------------------------------------------------------------
881
882
// Binding -----------------------------------------------------------------------------------------
883
884
22.3M
// Appends `target` to the DML statement behind `handle`. The handle is not validated here.
Status PgApiImpl::DmlAppendTarget(PgStatement *handle, PgExpr *target) {
  auto* dml = down_cast<PgDml*>(handle);
  return dml->AppendTarget(target);
}
887
888
22
// Appends `qual` to the DML statement behind `handle`. The handle is not validated here.
Status PgApiImpl::DmlAppendQual(PgStatement *handle, PgExpr *qual) {
  auto* dml = down_cast<PgDml*>(handle);
  return dml->AppendQual(qual);
}
891
892
11.9k
// Appends `colref` to the DML statement behind `handle`. The handle is not validated here.
Status PgApiImpl::DmlAppendColumnRef(PgStatement *handle, PgExpr *colref) {
  auto* dml = down_cast<PgDml*>(handle);
  return dml->AppendColumnRef(colref);
}
895
896
31.1M
// Binds `attr_value` to attribute `attr_num` of the DML statement behind `handle`.
// The handle is not validated here.
Status PgApiImpl::DmlBindColumn(PgStatement *handle, int attr_num, PgExpr *attr_value) {
  auto* dml = down_cast<PgDml*>(handle);
  return dml->BindColumn(attr_num, attr_value);
}
899
900
// Forwards a BETWEEN condition (`attr_value` .. `attr_value_end`) on attribute `attr_num` to the
// read statement behind `handle`. The handle is not validated here.
Status PgApiImpl::DmlBindColumnCondBetween(PgStatement *handle, int attr_num, PgExpr *attr_value,
    PgExpr *attr_value_end) {
  auto* read_stmt = down_cast<PgDmlRead*>(handle);
  return read_stmt->BindColumnCondBetween(attr_num, attr_value, attr_value_end);
}
904
905
// Forwards an IN condition with `n_attr_values` candidate values on attribute `attr_num` to the
// read statement behind `handle`. The handle is not validated here.
Status PgApiImpl::DmlBindColumnCondIn(PgStatement *handle, int attr_num, int n_attr_values,
    PgExpr **attr_values) {
  auto* read_stmt = down_cast<PgDmlRead*>(handle);
  return read_stmt->BindColumnCondIn(attr_num, n_attr_values, attr_values);
}
909
910
// Forwards a row upper bound made of `n_col_values` column values to the read statement behind
// `handle`. The handle itself is also passed along as the first argument and is not validated.
Status PgApiImpl::DmlAddRowUpperBound(YBCPgStatement handle,
    int n_col_values, PgExpr **col_values, bool is_inclusive) {
  auto* read_stmt = down_cast<PgDmlRead*>(handle);
  return read_stmt->AddRowUpperBound(handle, n_col_values, col_values, is_inclusive);
}
917
918
// Forwards a row lower bound made of `n_col_values` column values to the read statement behind
// `handle`. The handle itself is also passed along as the first argument and is not validated.
Status PgApiImpl::DmlAddRowLowerBound(YBCPgStatement handle,
    int n_col_values, PgExpr **col_values, bool is_inclusive) {
  auto* read_stmt = down_cast<PgDmlRead*>(handle);
  return read_stmt->AddRowLowerBound(handle, n_col_values, col_values, is_inclusive);
}
925
926
// Forwards the start/end hash code bounds (validity, inclusiveness and value for each side) to
// the read statement behind `handle`. The handle is not validated here.
Status PgApiImpl::DmlBindHashCode(PgStatement *handle, bool start_valid,
                                    bool start_inclusive,
                                    uint64_t start_hash_val, bool end_valid,
                                    bool end_inclusive, uint64_t end_hash_val) {
  auto* read_stmt = down_cast<PgDmlRead*>(handle);
  return read_stmt->BindHashCode(start_valid, start_inclusive, start_hash_val,
                                 end_valid, end_inclusive, end_hash_val);
}
934
935
90
// Calls BindTable() on the DML statement behind `handle`. The handle is not validated here.
Status PgApiImpl::DmlBindTable(PgStatement *handle) {
  auto* dml = down_cast<PgDml*>(handle);
  return dml->BindTable();
}
938
939
6.19M
// Returns what the DML statement behind `handle` reports for attribute `attr_num`.
// The handle is not validated here.
Result<YBCPgColumnInfo> PgApiImpl::DmlGetColumnInfo(YBCPgStatement handle, int attr_num) {
  auto* dml = down_cast<PgDml*>(handle);
  return dml->GetColumnInfo(attr_num);
}
942
943
983k
// Assigns `attr_value` to attribute `attr_num` of the DML statement behind `handle`.
// The handle is not validated here.
CHECKED_STATUS PgApiImpl::DmlAssignColumn(PgStatement *handle, int attr_num, PgExpr *attr_value) {
  auto* dml = down_cast<PgDml*>(handle);
  return dml->AssignColumn(attr_num, attr_value);
}
946
947
// Forwards a fetch request to the DML statement behind `handle`, passing all output arguments
// (`values`, `isnulls`, `syscols`, `has_data`) through unchanged. The handle is not validated.
Status PgApiImpl::DmlFetch(PgStatement *handle, int32_t natts, uint64_t *values, bool *isnulls,
                           PgSysColumns *syscols, bool *has_data) {
  auto* dml = down_cast<PgDml*>(handle);
  return dml->Fetch(natts, values, isnulls, syscols, has_data);
}
951
952
// Builds the encoded DocKey for the tuple described by `descr` and returns the result of calling
// `processor` on it.
//
// The table is loaded through the session. The table's columns are walked in schema order and
// each one is matched against the attributes in `descr`; matched values are collected as hashed
// or range components. Once the last attribute has been consumed the DocKey is encoded, with a
// hash code decoded from the partition key when there are hashed components.
//
// Errors:
//   - Corruption when the attribute count, a value type, or the resulting component counts do
//     not match the table description, or when not all attributes could be resolved.
//   - InvalidArgument when an attribute refers to a column that is not part of the primary key.
//   - Any error returned by LoadTable, by evaluating a value, or by encoding the partition key.
Status PgApiImpl::ProcessYBTupleId(const YBCPgYBTupleIdDescriptor& descr,
                                   const YBTupleIdProcessor& processor) {
  auto target_desc = VERIFY_RESULT(pg_session_->LoadTable(
      PgObjectId(descr.database_oid, descr.table_oid)));
  SCHECK_EQ(descr.nattrs, target_desc->num_key_columns(), Corruption,
            "Number of key components does not match column description");
  // Scratch expression reused for range components. Hashed components get their own entry in
  // hashed_values instead, because those entries are needed below to encode the partition key.
  PgsqlExpressionPB temp_expr_pb;
  google::protobuf::RepeatedPtrField<PgsqlExpressionPB> hashed_values;
  vector<docdb::PrimitiveValue> hashed_components, range_components;
  hashed_components.reserve(target_desc->num_hash_key_columns());
  range_components.reserve(target_desc->num_key_columns() - target_desc->num_hash_key_columns());
  size_t remain_attr = descr.nattrs;
  // DocDB API requires that partition columns must be listed in their created-order.
  // Order from target_desc should be used as attributes sequence may have different order.
  for (size_t i : Range(target_desc->schema().columns().size())) {
    PgColumn column(target_desc->schema(), i);
    for (auto attr = descr.attrs, end = descr.attrs + descr.nattrs; attr != end; ++attr) {
      if (attr->attr_num != column.attr_num()) {
        continue;
      }
      if (!column.is_primary()) {
        return STATUS_SUBSTITUTE(
            InvalidArgument, "Attribute number $0 not a primary attribute", attr->attr_num);
      }

      // Pick the destination for this component. Both pointers are initialized at declaration
      // so that neither can ever be read while indeterminate.
      const bool is_hashed = column.is_partition();
      auto* const values = is_hashed ? &hashed_components : &range_components;
      PgsqlExpressionPB* const expr_pb = is_hashed ? hashed_values.Add() : &temp_expr_pb;

      if (attr->is_null) {
        values->emplace_back(ValueType::kNullLow);
      } else {
        if (attr->attr_num == to_underlying(PgSystemAttrNum::kYBRowId)) {
          // The row id attribute does not use the supplied datum; a new row id is generated.
          expr_pb->mutable_value()->set_binary_value(pg_session_->GenerateNewRowid());
        } else {
          const YBCPgCollationInfo& collation_info = attr->collation_info;
          PgConstant value(
              attr->type_entity, collation_info.collate_is_valid_non_c,
              collation_info.sortkey, attr->datum, false);
          SCHECK_EQ(column.internal_type(), value.internal_type(), Corruption,
                    "Attribute value type does not match column type");
          RETURN_NOT_OK(value.Eval(expr_pb->mutable_value()));
        }
        values->push_back(PrimitiveValue::FromQLValuePB(expr_pb->value(),
                                                        column.desc().sorting_type()));
      }

      if (--remain_attr == 0) {
        // Every attribute has been consumed: validate the component counts and emit the key.
        SCHECK_EQ(hashed_components.size(), target_desc->num_hash_key_columns(), Corruption,
                  "Number of hashed components does not match column description");
        SCHECK_EQ(range_components.size(),
                  target_desc->num_key_columns() - target_desc->num_hash_key_columns(),
                  Corruption, "Number of range components does not match column description");
        if (hashed_values.empty()) {
          return processor(docdb::DocKey(move(range_components)).Encode());
        }
        string partition_key;
        const PartitionSchema& partition_schema = target_desc->partition_schema();
        RETURN_NOT_OK(partition_schema.EncodeKey(hashed_values, &partition_key));
        const uint16_t hash = PartitionSchema::DecodeMultiColumnHashValue(partition_key);

        return processor(
            docdb::DocKey(hash, move(hashed_components), move(range_components)).Encode());
      }
      // This column is resolved; move on to the next column of the table.
      break;
    }
  }

  return STATUS_FORMAT(Corruption, "Not all attributes ($0) were resolved", remain_attr);
}
1028
1029
667k
// Delegates to PgSession::StartOperationsBuffering and returns its status.
Status PgApiImpl::StartOperationsBuffering() {
  return pg_session_->StartOperationsBuffering();
}
1032
1033
610k
// Delegates to PgSession::StopOperationsBuffering and returns its status.
Status PgApiImpl::StopOperationsBuffering() {
  return pg_session_->StopOperationsBuffering();
}
1036
1037
114k
void PgApiImpl::ResetOperationsBuffering() {
1038
114k
  pg_session_->ResetOperationsBuffering();
1039
114k
}
1040
1041
595k
// Delegates to PgSession::FlushBufferedOperations and returns its status.
Status PgApiImpl::FlushBufferedOperations() {
  return pg_session_->FlushBufferedOperations();
}
1044
1045
7.20M
// Executes an INSERT, UPDATE, DELETE or TRUNCATE statement.
//
// When `rows_affected_count` is non-null the statement is executed with force_non_bufferable set
// and the affected row count is written to `*rows_affected_count` after a successful execution.
// Returns InvalidArgument for a null handle or any other statement kind, and otherwise whatever
// error the execution produced.
Status PgApiImpl::DmlExecWriteOp(PgStatement *handle, int32_t *rows_affected_count) {
  // Reject a null handle up front, as the other switch-based entry points in this file do,
  // instead of dereferencing it below.
  if (!handle) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  switch (handle->stmt_op()) {
    case StmtOp::STMT_INSERT:
    case StmtOp::STMT_UPDATE:
    case StmtOp::STMT_DELETE:
    case StmtOp::STMT_TRUNCATE:
      {
        auto dml_write = down_cast<PgDmlWrite *>(handle);
        RETURN_NOT_OK(dml_write->Exec(rows_affected_count != nullptr /* force_non_bufferable */));
        if (rows_affected_count) {
          *rows_affected_count = dml_write->GetRowsAffectedCount();
        }
        return Status::OK();
      }
    default:
      break;
  }
  return STATUS(InvalidArgument, "Invalid statement handle");
}
1064
1065
// Insert ------------------------------------------------------------------------------------------
1066
1067
// Creates and prepares a PgInsert statement for `table_id`, then hands it to
// AddToCurrentPgMemctx together with `handle`. `*handle` is reset to null first, so it stays
// null when preparation fails.
Status PgApiImpl::NewInsert(const PgObjectId& table_id,
                            const bool is_single_row_txn,
                            PgStatement **handle) {
  *handle = nullptr;
  auto insert_stmt = std::make_unique<PgInsert>(pg_session_, table_id, is_single_row_txn);
  RETURN_NOT_OK(insert_stmt->Prepare());
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(insert_stmt), handle));
  return Status::OK();
}
1076
1077
1.04k
// Executes an INSERT statement.
// Returns InvalidArgument when `handle` is not a valid INSERT statement.
Status PgApiImpl::ExecInsert(PgStatement *handle) {
  const bool is_insert = PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT);
  if (!is_insert) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* insert_stmt = down_cast<PgInsert*>(handle);
  return insert_stmt->Exec();
}
1084
1085
953k
// Calls SetUpsertMode() on an INSERT statement.
// Returns InvalidArgument when `handle` is not a valid INSERT statement.
Status PgApiImpl::InsertStmtSetUpsertMode(PgStatement *handle) {
  const bool is_insert = PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT);
  if (!is_insert) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  auto* insert_stmt = down_cast<PgInsert*>(handle);
  insert_stmt->SetUpsertMode();
  return Status::OK();
}
1094
1095
371k
// Passes `write_time` to an INSERT statement and propagates any error it reports.
// Returns InvalidArgument when `handle` is not a valid INSERT statement.
Status PgApiImpl::InsertStmtSetWriteTime(PgStatement *handle, const HybridTime write_time) {
  const bool is_insert = PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT);
  if (!is_insert) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  auto* insert_stmt = down_cast<PgInsert*>(handle);
  RETURN_NOT_OK(insert_stmt->SetWriteTime(write_time));
  return Status::OK();
}
1103
1104
371k
// Passes the `is_backfill` flag to an INSERT statement.
// Returns InvalidArgument when `handle` is not a valid INSERT statement.
Status PgApiImpl::InsertStmtSetIsBackfill(PgStatement *handle, const bool is_backfill) {
  const bool is_insert = PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT);
  if (!is_insert) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  auto* insert_stmt = down_cast<PgInsert*>(handle);
  insert_stmt->SetIsBackfill(is_backfill);
  return Status::OK();
}
1112
1113
// Update ------------------------------------------------------------------------------------------
1114
1115
// Creates and prepares a PgUpdate statement for `table_id`, then hands it to
// AddToCurrentPgMemctx together with `handle`. `*handle` is reset to null first, so it stays
// null when preparation fails.
Status PgApiImpl::NewUpdate(const PgObjectId& table_id,
                            const bool is_single_row_txn,
                            PgStatement **handle) {
  *handle = nullptr;
  auto update_stmt = std::make_unique<PgUpdate>(pg_session_, table_id, is_single_row_txn);
  RETURN_NOT_OK(update_stmt->Prepare());
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(update_stmt), handle));
  return Status::OK();
}
1124
1125
8
// Executes an UPDATE statement.
// Returns InvalidArgument when `handle` is not a valid UPDATE statement.
Status PgApiImpl::ExecUpdate(PgStatement *handle) {
  const bool is_update = PgStatement::IsValidStmt(handle, StmtOp::STMT_UPDATE);
  if (!is_update) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* update_stmt = down_cast<PgUpdate*>(handle);
  return update_stmt->Exec();
}
1132
1133
// Delete ------------------------------------------------------------------------------------------
1134
1135
// Creates and prepares a PgDelete statement for `table_id`, then hands it to
// AddToCurrentPgMemctx together with `handle`. `*handle` is reset to null first, so it stays
// null when preparation fails.
Status PgApiImpl::NewDelete(const PgObjectId& table_id,
                            const bool is_single_row_txn,
                            PgStatement **handle) {
  *handle = nullptr;
  auto delete_stmt = std::make_unique<PgDelete>(pg_session_, table_id, is_single_row_txn);
  RETURN_NOT_OK(delete_stmt->Prepare());
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(delete_stmt), handle));
  return Status::OK();
}
1144
1145
4
// Executes a DELETE statement.
// Returns InvalidArgument when `handle` is not a valid DELETE statement.
Status PgApiImpl::ExecDelete(PgStatement *handle) {
  const bool is_delete = PgStatement::IsValidStmt(handle, StmtOp::STMT_DELETE);
  if (!is_delete) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* delete_stmt = down_cast<PgDelete*>(handle);
  return delete_stmt->Exec();
}
1152
1153
178
// Creates and prepares a PgSample statement for `table_id` with `targrows`, then hands it to
// AddToCurrentPgMemctx together with `handle`. `*handle` is reset to null first, so it stays
// null when preparation fails.
Status PgApiImpl::NewSample(const PgObjectId& table_id, const int targrows, PgStatement **handle) {
  *handle = nullptr;
  auto sample_stmt = std::make_unique<PgSample>(pg_session_, targrows, table_id);
  RETURN_NOT_OK(sample_stmt->Prepare());
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(sample_stmt), handle));
  return Status::OK();
}
1160
1161
178
// Passes `rstate_w` and `rand_state` to a SAMPLE statement and propagates any error it reports.
// Returns InvalidArgument when `handle` is not a valid SAMPLE statement.
Status PgApiImpl::InitRandomState(PgStatement *handle, double rstate_w, uint64 rand_state) {
  const bool is_sample = PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE);
  if (!is_sample) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* sample_stmt = down_cast<PgSample*>(handle);
  RETURN_NOT_OK(sample_stmt->InitRandomState(rstate_w, rand_state));
  return Status::OK();
}
1169
1170
939
// Calls SampleNextBlock on a SAMPLE statement, passing `has_more` through as its output
// argument, and propagates any error it reports.
// Returns InvalidArgument when `handle` is not a valid SAMPLE statement.
Status PgApiImpl::SampleNextBlock(PgStatement *handle, bool *has_more) {
  const bool is_sample = PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE);
  if (!is_sample) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* sample_stmt = down_cast<PgSample*>(handle);
  RETURN_NOT_OK(sample_stmt->SampleNextBlock(has_more));
  return Status::OK();
}
1178
1179
178
// Executes a SAMPLE statement, passing null to Exec, and propagates any error it reports.
// Returns InvalidArgument when `handle` is not a valid SAMPLE statement.
Status PgApiImpl::ExecSample(PgStatement *handle) {
  const bool is_sample = PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE);
  if (!is_sample) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* sample_stmt = down_cast<PgSample*>(handle);
  RETURN_NOT_OK(sample_stmt->Exec(nullptr));
  return Status::OK();
}
1187
1188
178
// Asks a SAMPLE statement for its estimated row counts, passing `liverows` and `deadrows`
// through as output arguments, and propagates any error it reports.
// Returns InvalidArgument when `handle` is not a valid SAMPLE statement.
Status PgApiImpl::GetEstimatedRowCount(PgStatement *handle, double *liverows, double *deadrows) {
  const bool is_sample = PgStatement::IsValidStmt(handle, StmtOp::STMT_SAMPLE);
  if (!is_sample) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* sample_stmt = down_cast<PgSample*>(handle);
  RETURN_NOT_OK(sample_stmt->GetEstimatedRowCount(liverows, deadrows));
  return Status::OK();
}
1196
1197
5
// Passes the `is_persist_needed` flag to a DELETE statement.
// Returns InvalidArgument when `handle` is not a valid DELETE statement.
Status PgApiImpl::DeleteStmtSetIsPersistNeeded(PgStatement *handle, const bool is_persist_needed) {
  const bool is_delete = PgStatement::IsValidStmt(handle, StmtOp::STMT_DELETE);
  if (!is_delete) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* delete_stmt = down_cast<PgDelete*>(handle);
  delete_stmt->SetIsPersistNeeded(is_persist_needed);
  return Status::OK();
}
1205
1206
// Colocated Truncate ------------------------------------------------------------------------------
1207
1208
// Creates and prepares a PgTruncateColocated statement for `table_id`, then hands it to
// AddToCurrentPgMemctx together with `handle`. `*handle` is reset to null first, so it stays
// null when preparation fails.
Status PgApiImpl::NewTruncateColocated(const PgObjectId& table_id,
                                       const bool is_single_row_txn,
                                       PgStatement **handle) {
  *handle = nullptr;
  auto truncate_stmt =
      std::make_unique<PgTruncateColocated>(pg_session_, table_id, is_single_row_txn);
  RETURN_NOT_OK(truncate_stmt->Prepare());
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(truncate_stmt), handle));
  return Status::OK();
}
1217
1218
0
// Executes a colocated TRUNCATE statement.
// Returns InvalidArgument when `handle` is not a valid TRUNCATE statement.
Status PgApiImpl::ExecTruncateColocated(PgStatement *handle) {
  const bool is_truncate = PgStatement::IsValidStmt(handle, StmtOp::STMT_TRUNCATE);
  if (!is_truncate) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* truncate_stmt = down_cast<PgTruncateColocated*>(handle);
  return truncate_stmt->Exec();
}
1225
1226
// Select ------------------------------------------------------------------------------------------
1227
1228
// Creates and prepares a read statement, then hands it to AddToCurrentPgMemctx together with
// `handle`. `*handle` is reset to null first, so it stays null on an early error.
//
// Scenarios:
// - Sequential Scan: PgSelect to read from table_id.
// - Primary Scan: PgSelect from table_id. YugaByte does not have separate table for primary key.
// - Index-Only-Scan: PgSelectIndex directly from secondary index_id.
// - IndexScan: Use PgSelectIndex to read from index_id and then PgSelect to read from table_id.
//     Note that for SysTable, only one request is sent for both table_id and index_id.
Status PgApiImpl::NewSelect(const PgObjectId& table_id,
                            const PgObjectId& index_id,
                            const PgPrepareParameters *prepare_params,
                            PgStatement **handle) {
  *handle = nullptr;

  // PgSelectIndex is used directly only for an index-only scan on a secondary index.
  const bool index_only_on_secondary = prepare_params != nullptr &&
                                       prepare_params->index_only_scan &&
                                       prepare_params->use_secondary_index;

  std::unique_ptr<PgDmlRead> read_stmt;
  if (index_only_on_secondary) {
    if (!index_id.IsValid()) {
      return STATUS(InvalidArgument, "Cannot run query with invalid index ID");
    }
    read_stmt = std::make_unique<PgSelectIndex>(pg_session_, table_id, index_id, prepare_params);
  } else {
    // For IndexScan PgSelect processing will create subquery PgSelectIndex.
    read_stmt = std::make_unique<PgSelect>(pg_session_, table_id, index_id, prepare_params);
  }

  RETURN_NOT_OK(read_stmt->Prepare());
  RETURN_NOT_OK(AddToCurrentPgMemctx(std::move(read_stmt), handle));
  return Status::OK();
}
1254
1255
1.46M
// Passes the `is_forward_scan` flag to a SELECT statement.
// Returns InvalidArgument when `handle` is not a valid SELECT statement.
Status PgApiImpl::SetForwardScan(PgStatement *handle, bool is_forward_scan) {
  const bool is_select = PgStatement::IsValidStmt(handle, StmtOp::STMT_SELECT);
  if (!is_select) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* read_stmt = down_cast<PgDmlRead*>(handle);
  read_stmt->SetForwardScan(is_forward_scan);
  return Status::OK();
}
1263
1264
1.58M
// Executes a SELECT statement with `exec_params`.
// Returns InvalidArgument when `handle` is not a valid SELECT statement.
Status PgApiImpl::ExecSelect(PgStatement *handle, const PgExecParameters *exec_params) {
  const bool is_select = PgStatement::IsValidStmt(handle, StmtOp::STMT_SELECT);
  if (!is_select) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  auto* read_stmt = down_cast<PgDmlRead*>(handle);
  return read_stmt->Exec(exec_params);
}
1271
1272
//--------------------------------------------------------------------------------------------------
1273
// Expressions.
1274
//--------------------------------------------------------------------------------------------------
1275
1276
// Column references -------------------------------------------------------------------------------
1277
1278
// Creates a PgColumnRef expression, adds it to `stmt` via AddExpr, and returns the raw pointer
// through `expr_handle`. Returns InvalidArgument when `stmt` is null.
Status PgApiImpl::NewColumnRef(
    PgStatement *stmt, int attr_num, const PgTypeEntity *type_entity, bool collate_is_valid_non_c,
    const PgTypeAttrs *type_attrs, PgExpr **expr_handle) {
  if (stmt == nullptr) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  PgColumnRef::SharedPtr column_ref =
    make_shared<PgColumnRef>(attr_num, type_entity, collate_is_valid_non_c, type_attrs);
  stmt->AddExpr(column_ref);
  *expr_handle = column_ref.get();
  return Status::OK();
}
1292
1293
// Constant ----------------------------------------------------------------------------------------
1294
// Creates a PgConstant expression from `datum` / `is_null`, adds it to `stmt` via AddExpr, and
// returns the raw pointer through `expr_handle`. Returns InvalidArgument when `stmt` is null.
Status PgApiImpl::NewConstant(
    YBCPgStatement stmt, const YBCPgTypeEntity *type_entity, bool collate_is_valid_non_c,
    const char *collation_sortkey, uint64_t datum, bool is_null, YBCPgExpr *expr_handle) {
  if (stmt == nullptr) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  PgExpr::SharedPtr constant =
    make_shared<PgConstant>(type_entity, collate_is_valid_non_c, collation_sortkey,
                            datum, is_null);
  stmt->AddExpr(constant);
  *expr_handle = constant.get();
  return Status::OK();
}
1309
1310
// Creates a PgConstant expression from `datum_kind` (with collate_is_valid_non_c fixed to
// false), adds it to `stmt` via AddExpr, and returns the raw pointer through `expr_handle`.
// Returns InvalidArgument when `stmt` is null.
Status PgApiImpl::NewConstantVirtual(
    YBCPgStatement stmt, const YBCPgTypeEntity *type_entity,
    YBCPgDatumKind datum_kind, YBCPgExpr *expr_handle) {
  if (stmt == nullptr) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  PgExpr::SharedPtr constant =
    make_shared<PgConstant>(type_entity, false /* collate_is_valid_non_c */, datum_kind);
  stmt->AddExpr(constant);
  *expr_handle = constant.get();
  return Status::OK();
}
1324
1325
// Creates a PgConstant expression whose opcode is PG_EXPR_GT when `is_gt` is true and
// PG_EXPR_LT otherwise, adds it to `stmt` via AddExpr, and returns the raw pointer through
// `expr_handle`. Returns InvalidArgument when `stmt` is null.
Status PgApiImpl::NewConstantOp(
    YBCPgStatement stmt, const YBCPgTypeEntity *type_entity, bool collate_is_valid_non_c,
    const char *collation_sortkey, uint64_t datum, bool is_null, YBCPgExpr *expr_handle,
    bool is_gt) {
  if (stmt == nullptr) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }

  const auto opcode = is_gt ? PgExpr::Opcode::PG_EXPR_GT : PgExpr::Opcode::PG_EXPR_LT;
  PgExpr::SharedPtr constant =
    make_shared<PgConstant>(type_entity, collate_is_valid_non_c, collation_sortkey,
                            datum, is_null, opcode);
  stmt->AddExpr(constant);
  *expr_handle = constant.get();
  return Status::OK();
}
1341
1342
// Text constant -----------------------------------------------------------------------------------
1343
1344
2.00k
// Updates a constant expression with a text `value` / `is_null`.
// Returns InvalidArgument when `expr` is null or its opcode is not PG_EXPR_CONSTANT.
Status PgApiImpl::UpdateConstant(PgExpr *expr, const char *value, bool is_null) {
  // A null expression is rejected the same way as a non-constant one, rather than being
  // dereferenced.
  if (!expr || expr->opcode() != PgExpr::Opcode::PG_EXPR_CONSTANT) {
    // Invalid handle.
    return STATUS(InvalidArgument, "Invalid expression handle for constant");
  }
  down_cast<PgConstant*>(expr)->UpdateConstant(value, is_null);
  return Status::OK();
}
1352
1353
36
// Updates a constant expression with a binary `value` of `bytes` bytes / `is_null`.
// Returns InvalidArgument when `expr` is null or its opcode is not PG_EXPR_CONSTANT.
Status PgApiImpl::UpdateConstant(PgExpr *expr, const void *value, int64_t bytes, bool is_null) {
  // A null expression is rejected the same way as a non-constant one, rather than being
  // dereferenced.
  if (!expr || expr->opcode() != PgExpr::Opcode::PG_EXPR_CONSTANT) {
    // Invalid handle.
    return STATUS(InvalidArgument, "Invalid expression handle for constant");
  }
  down_cast<PgConstant*>(expr)->UpdateConstant(value, bytes, is_null);
  return Status::OK();
}
1361
1362
// Operators ---------------------------------------------------------------------------------------
1363
1364
// Validates `opname`, creates a PgOperator expression, adds it to `stmt` via AddExpr, and
// returns the raw pointer through `op_handle`. Returns InvalidArgument when `stmt` is null and
// propagates the error from PgExpr::CheckOperatorName otherwise.
Status PgApiImpl::NewOperator(
    PgStatement *stmt, const char *opname, const YBCPgTypeEntity *type_entity,
    bool collate_is_valid_non_c, PgExpr **op_handle) {
  if (stmt == nullptr) {
    return STATUS(InvalidArgument, "Invalid statement handle");
  }
  RETURN_NOT_OK(PgExpr::CheckOperatorName(opname));

  PgExpr::SharedPtr op_expr =
      make_shared<PgOperator>(opname, type_entity, collate_is_valid_non_c);
  stmt->AddExpr(op_expr);
  *op_handle = op_expr.get();
  return Status::OK();
}
1380
1381
22.8k
// Appends `arg` to the operator expression `op_handle`.
// Returns InvalidArgument when either pointer is null.
Status PgApiImpl::OperatorAppendArg(PgExpr *op_handle, PgExpr *arg) {
  const bool both_present = op_handle != nullptr && arg != nullptr;
  if (!both_present) {
    return STATUS(InvalidArgument, "Invalid expression handle");
  }
  auto* op_expr = down_cast<PgOperator*>(op_handle);
  op_expr->AppendArg(arg);
  return Status::OK();
}
1389
1390
2
// Delegates to PgSession::IsInitDbDone and returns its result.
Result<bool> PgApiImpl::IsInitDbDone() {
  return pg_session_->IsInitDbDone();
}
1393
1394
442k
// Delegates to PgSession::GetSharedCatalogVersion and returns its result.
Result<uint64_t> PgApiImpl::GetSharedCatalogVersion() {
  return pg_session_->GetSharedCatalogVersion();
}
1397
1398
1.95k
// Delegates to PgSession::GetSharedAuthKey and returns its result.
Result<uint64_t> PgApiImpl::GetSharedAuthKey() {
  return pg_session_->GetSharedAuthKey();
}
1401
1402
// Transaction Control -----------------------------------------------------------------------------
1403
415k
// Invalidates the session's foreign key reference cache, then asks the transaction manager to
// begin a transaction and returns its status.
Status PgApiImpl::BeginTransaction() {
  pg_session_->InvalidateForeignKeyReferenceCache();
  return pg_txn_manager_->BeginTransaction();
}
1407
1408
69.0k
// Invalidates the session's foreign key reference cache and drops its buffered operations, then
// asks the transaction manager to recreate the transaction and returns its status.
Status PgApiImpl::RecreateTransaction() {
  pg_session_->InvalidateForeignKeyReferenceCache();
  pg_session_->DropBufferedOperations();
  return pg_txn_manager_->RecreateTransaction();
}
1413
1414
510
// Invalidates the session's foreign key reference cache and drops its buffered operations, then
// asks the transaction manager to restart the transaction and returns its status.
Status PgApiImpl::RestartTransaction() {
  pg_session_->InvalidateForeignKeyReferenceCache();
  pg_session_->DropBufferedOperations();
  return pg_txn_manager_->RestartTransaction();
}
1419
1420
28.6k
// Delegates to the transaction manager's ResetTransactionReadPoint and returns its status.
Status PgApiImpl::ResetTransactionReadPoint() {
  return pg_txn_manager_->ResetTransactionReadPoint();
}
1423
1424
163
// Delegates to the transaction manager's RestartReadPoint and returns its status.
Status PgApiImpl::RestartReadPoint() {
  return pg_txn_manager_->RestartReadPoint();
}
1427
1428
381k
// Invalidates the session's foreign key reference cache and flushes its buffered operations.
// If the flush fails, that error is returned and no commit is attempted; otherwise the
// transaction manager's commit status is returned.
Status PgApiImpl::CommitTransaction() {
  pg_session_->InvalidateForeignKeyReferenceCache();
  RETURN_NOT_OK(pg_session_->FlushBufferedOperations());
  return pg_txn_manager_->CommitTransaction();
}
1433
1434
50.5k
// Invalidates the session's foreign key reference cache and drops its buffered operations, then
// asks the transaction manager to abort the transaction and returns its status.
Status PgApiImpl::AbortTransaction() {
  pg_session_->InvalidateForeignKeyReferenceCache();
  pg_session_->DropBufferedOperations();
  return pg_txn_manager_->AbortTransaction();
}
1439
1440
437k
// Passes `isolation` to the transaction manager's SetPgIsolationLevel and returns its status.
Status PgApiImpl::SetTransactionIsolationLevel(int isolation) {
  return pg_txn_manager_->SetPgIsolationLevel(isolation);
}
1443
1444
414k
// Passes `read_only` to the transaction manager's SetReadOnly and returns its status.
Status PgApiImpl::SetTransactionReadOnly(bool read_only) {
  return pg_txn_manager_->SetReadOnly(read_only);
}
1447
1448
414k
// Passes `enable_follower_reads` and `staleness_ms` to the transaction manager's
// EnableFollowerReads and returns its status.
Status PgApiImpl::EnableFollowerReads(bool enable_follower_reads, int32_t staleness_ms) {
  return pg_txn_manager_->EnableFollowerReads(enable_follower_reads, staleness_ms);
}
1451
1452
414k
// Passes `deferrable` to the transaction manager's SetDeferrable and returns its status.
Status PgApiImpl::SetTransactionDeferrable(bool deferrable) {
  return pg_txn_manager_->SetDeferrable(deferrable);
}
1455
1456
20.3k
// Flushes the session's buffered operations and then enters separate DDL transaction mode.
// A flush failure is returned without entering the mode.
Status PgApiImpl::EnterSeparateDdlTxnMode() {
  // The DDL transaction uses its own transaction session, so pending buffered operations are
  // flushed first.
  RETURN_NOT_OK(pg_session_->FlushBufferedOperations());
  return pg_txn_manager_->EnterSeparateDdlTxnMode();
}
1461
1462
18.6k
// Flushes the session's buffered operations, exits separate DDL transaction mode with
// Commit::kTrue, and resets the catalog read time. The first failing step's error is returned
// and the remaining steps are skipped.
Status PgApiImpl::ExitSeparateDdlTxnMode() {
  // The DDL transaction uses its own transaction session, so pending buffered operations are
  // flushed first.
  RETURN_NOT_OK(pg_session_->FlushBufferedOperations());
  RETURN_NOT_OK(pg_txn_manager_->ExitSeparateDdlTxnMode(Commit::kTrue));
  // Subsequent reads from catalog tables have to see the changes made by the DDL transaction.
  ResetCatalogReadTime();
  return Status::OK();
}
1470
1471
1.71k
void PgApiImpl::ClearSeparateDdlTxnMode() {
1472
1.71k
  pg_session_->DropBufferedOperations();
1473
1.71k
  CHECK_OK(pg_txn_manager_->ExitSeparateDdlTxnMode(Commit::kFalse));
1474
1.71k
}
1475
1476
61.7k
// Makes sub-transaction `id` the session's active sub-transaction.
//
// Buffered operations are flushed before the switch; a failed flush is returned to the caller
// and the active sub-transaction is not changed by this call.
Status PgApiImpl::SetActiveSubTransaction(SubTransactionId id) {
  const auto& session = pg_session_;
  RETURN_NOT_OK(session->FlushBufferedOperations());
  return session->SetActiveSubTransaction(id);
}
1480
1481
13.5k
// Rolls back sub-transaction `id` through the session and returns the session's status.
//
// Buffered operations are dropped (not flushed) before the rollback is requested.
Status PgApiImpl::RollbackSubTransaction(SubTransactionId id) {
  const auto& session = pg_session_;
  session->DropBufferedOperations();
  return session->RollbackSubTransaction(id);
}
1485
1486
2.37M
void PgApiImpl::ResetCatalogReadTime() {
1487
2.37M
  pg_session_->ResetCatalogReadPoint();
1488
2.37M
}
1489
1490
// Asks the session whether a foreign key reference exists for (`table_id`, `ybctid`).
//
// The session is given a callback built from FetchExistingYbctids with this object's session
// and `database_id` bound as the first two arguments; the remaining two arguments are supplied
// by the session when it invokes the callback.
Result<bool> PgApiImpl::ForeignKeyReferenceExists(
    PgOid table_id, const Slice& ybctid, PgOid database_id) {
  auto ybctid_reader = std::bind(
      FetchExistingYbctids, pg_session_, database_id,
      std::placeholders::_1, std::placeholders::_2);
  return pg_session_->ForeignKeyReferenceExists(table_id, ybctid, ybctid_reader);
}
1499
1500
245k
// Registers a foreign key reference intent for (`table_id`, `ybctid`) with the session.
void PgApiImpl::AddForeignKeyReferenceIntent(PgOid table_id, const Slice& ybctid) {
  const auto& session = pg_session_;
  session->AddForeignKeyReferenceIntent(table_id, ybctid);
}
1503
1504
724k
// Asks the session to delete the foreign key reference for (`table_id`, `ybctid`).
void PgApiImpl::DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid) {
  const auto& session = pg_session_;
  session->DeleteForeignKeyReference(table_id, ybctid);
}
1507
1508
4.38M
// Asks the session to add a foreign key reference for (`table_id`, `ybctid`).
void PgApiImpl::AddForeignKeyReference(PgOid table_id, const Slice& ybctid) {
  const auto& session = pg_session_;
  session->AddForeignKeyReference(table_id, ybctid);
}
1511
1512
2
void PgApiImpl::SetTimeout(const int timeout_ms) {
1513
2
  pg_session_->SetTimeout(timeout_ms);
1514
2
}
1515
1516
4
// Returns the tablet server information reported by the session, or the session's error.
Result<client::TabletServersInfo> PgApiImpl::ListTabletServers() {
  const auto& session = pg_session_;
  return session->ListTabletServers();
}
1519
1520
1
// Forwards `placement_info` to the session for validation and returns the session's status.
//
// NOTE(review): the pointer is passed on without a null check; confirm callers never pass
// nullptr, or that PgSession::ValidatePlacement tolerates it.
Status PgApiImpl::ValidatePlacement(const char *placement_info) {
  const auto& session = pg_session_;
  return session->ValidatePlacement(placement_info);
}
1523
1524
} // namespace pggate
1525
} // namespace yb