YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_session.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//--------------------------------------------------------------------------------------------------
15
16
#include "yb/yql/pggate/pg_session.h"
17
18
#include <memory>
19
20
#include <boost/optional.hpp>
21
22
#include "yb/client/batcher.h"
23
#include "yb/client/error.h"
24
#include "yb/client/schema.h"
25
#include "yb/client/session.h"
26
#include "yb/client/table.h"
27
#include "yb/client/tablet_server.h"
28
#include "yb/client/transaction.h"
29
#include "yb/client/yb_op.h"
30
#include "yb/client/yb_table_name.h"
31
32
#include "yb/common/pg_types.h"
33
#include "yb/common/pgsql_error.h"
34
#include "yb/common/placement_info.h"
35
#include "yb/common/ql_expr.h"
36
#include "yb/common/ql_value.h"
37
#include "yb/common/row_mark.h"
38
#include "yb/common/schema.h"
39
#include "yb/common/transaction_error.h"
40
41
#include "yb/docdb/doc_key.h"
42
#include "yb/docdb/primitive_value.h"
43
#include "yb/docdb/value_type.h"
44
45
#include "yb/gutil/casts.h"
46
47
#include "yb/tserver/pg_client.pb.h"
48
#include "yb/tserver/tserver_shared_mem.h"
49
50
#include "yb/util/flag_tags.h"
51
#include "yb/util/format.h"
52
#include "yb/util/result.h"
53
#include "yb/util/shared_mem.h"
54
#include "yb/util/status_format.h"
55
#include "yb/util/string_util.h"
56
57
#include "yb/yql/pggate/pg_client.h"
58
#include "yb/yql/pggate/pg_expr.h"
59
#include "yb/yql/pggate/pg_op.h"
60
#include "yb/yql/pggate/pg_txn_manager.h"
61
#include "yb/yql/pggate/pggate_flags.h"
62
#include "yb/yql/pggate/ybc_pggate.h"
63
64
using namespace std::literals;
65
66
DEFINE_int32(ysql_wait_until_index_permissions_timeout_ms, 60 * 60 * 1000, // 60 min.
67
             "DEPRECATED: use backfill_index_client_rpc_timeout_ms instead.");
68
TAG_FLAG(ysql_wait_until_index_permissions_timeout_ms, advanced);
69
DECLARE_int32(TEST_user_ddl_operation_timeout_sec);
70
71
DEFINE_bool(ysql_log_failed_docdb_requests, false, "Log failed docdb requests.");
72
73
namespace yb {
74
namespace pggate {
75
76
using std::make_shared;
77
using std::unique_ptr;
78
using std::shared_ptr;
79
using std::string;
80
81
using client::YBClient;
82
using client::YBSession;
83
using client::YBMetaDataCache;
84
using client::YBSchema;
85
using client::YBOperation;
86
using client::YBTable;
87
using client::YBTableName;
88
using client::YBTableType;
89
90
using yb::master::GetNamespaceInfoResponsePB;
91
92
using yb::tserver::TServerSharedObject;
93
94
namespace {
95
96
// Schema column index passed to schema().column_id() to locate the sequence's "last value"
// column in the sequences data table.
static constexpr const size_t kPgSequenceLastValueColIdx = 2;
97
// Schema column index passed to schema().column_id() to locate the sequence's "is called"
// column in the sequences data table.
static constexpr const size_t kPgSequenceIsCalledColIdx = 3;
98
99
69.2k
// Builds the DocDB primitive value that represents NULL for a key column.
//
// Columns sorted with "nulls last" (ascending or descending) use kNullHigh; every other sorting
// type uses kNullLow.
docdb::PrimitiveValue NullValue(SortingType sorting) {
  const bool nulls_last = sorting == SortingType::kAscendingNullsLast ||
                          sorting == SortingType::kDescendingNullsLast;
  return docdb::PrimitiveValue(
      nulls_last ? docdb::ValueType::kNullHigh : docdb::ValueType::kNullLow);
}
107
108
void InitKeyColumnPrimitiveValues(
109
    const google::protobuf::RepeatedPtrField<PgsqlExpressionPB> &column_values,
110
    const Schema &schema,
111
    size_t start_idx,
112
703k
    vector<docdb::PrimitiveValue> *components) {
113
703k
  size_t column_idx = start_idx;
114
1.03M
  for (const auto& column_value : column_values) {
115
1.03M
    const auto sorting_type = schema.column(column_idx).sorting_type();
116
1.03M
    if (column_value.has_value()) {
117
1.03M
      const auto& value = column_value.value();
118
1.03M
      components->push_back(
119
1.03M
          IsNull(value)
120
69.2k
          ? NullValue(sorting_type)
121
963k
          : docdb::PrimitiveValue::FromQLValuePB(value, sorting_type));
122
27
    } else {
123
      // TODO(neil) The current setup only works for CQL as it assumes primary key value must not
124
      // be dependent on any column values. This needs to be fixed as PostgreSQL expression might
125
      // require a read from a table.
126
      //
127
      // Use regular executor for now.
128
27
      QLExprExecutor executor;
129
27
      QLExprResult result;
130
27
      auto s = executor.EvalExpr(column_value, nullptr, result.Writer());
131
132
27
      components->push_back(docdb::PrimitiveValue::FromQLValuePB(result.Value(), sorting_type));
133
27
    }
134
1.03M
    ++column_idx;
135
1.03M
  }
136
703k
}
137
138
302k
bool IsTableUsedByRequest(const PgsqlReadRequestPB& request, const string& table_id) {
139
302k
  return request.table_id() == table_id ||
140
296k
      (request.has_index_request() && IsTableUsedByRequest(request.index_request(), table_id));
141
302k
}
142
143
122
bool IsTableUsedByRequest(const PgsqlWriteRequestPB& request, const string& table_id) {
144
122
  return request.table_id() == table_id;
145
122
}
146
147
192k
bool IsTableUsedByOperation(const PgsqlOp& op, const string& table_id) {
148
192k
  if (op.is_read()) {
149
192k
    return IsTableUsedByRequest(down_cast<const PgsqlReadOp&>(op).read_request(), table_id);
150
122
  } else {
151
122
    return IsTableUsedByRequest(down_cast<const PgsqlWriteOp&>(op).write_request(), table_id);
152
122
  }
153
192k
}
154
155
// Lookup key built by Find() from a table oid and a ybctid slice, so that a container can be
// searched without constructing one of its stored elements.
struct PgForeignKeyReferenceLightweight {
156
  // Oid of the referenced table.
  PgOid table_id;
157
  // Row identifier bytes.
  // NOTE(review): presumably Slice does not own the bytes, so the source buffer must outlive
  // this key - confirm.
  Slice ybctid;
158
};
159
160
4.69M
// Hashes a foreign key reference given as a table oid plus the byte range [begin, end) of its
// ybctid. The table oid is mixed in first, followed by the bytes.
size_t ForeignKeyReferenceHash(PgOid table_id, const char* begin, const char* end) {
  size_t seed = 0;
  boost::hash_combine(seed, table_id);
  boost::hash_range(seed, begin, end);
  return seed;
}
166
167
template<class Container>
168
2.05M
auto Find(const Container& container, PgOid table_id, const Slice& ybctid) {
169
2.05M
  return container.find(PgForeignKeyReferenceLightweight{table_id, ybctid},
170
2.05M
      [](const auto& k) {
171
2.05M
        return ForeignKeyReferenceHash(k.table_id, k.ybctid.cdata(), k.ybctid.cend()); },
172
1.63M
      [](const auto& l, const auto& r) {
173
1.63M
        return l.table_id == r.table_id && l.ybctid == r.ybctid; });
174
2.05M
}
175
176
template<class Container>
177
63.5k
bool Erase(Container* container, PgOid table_id, const Slice& ybctid) {
178
63.5k
  const auto it = Find(*container, table_id, ybctid);
179
63.5k
  if (it != container->end()) {
180
1.70k
    container->erase(it);
181
1.70k
    return true;
182
1.70k
  }
183
61.8k
  return false;
184
61.8k
}
185
186
} // namespace
187
188
189
// Wraps the future of a running perform call together with the session that issued it.
// The contents of `*relations` are moved into the new object.
PerformFuture::PerformFuture(
    std::future<PerformResult> future, PgSession* session, PgObjectIds* relations)
    : future_(std::move(future)),
      session_(session),
      relations_(std::move(*relations)) {
}
192
193
10.8M
bool PerformFuture::Valid() const {
194
10.8M
  return session_ != nullptr;
195
10.8M
}
196
197
774k
// Waits for the perform result, clears the stored session pointer (so Valid() returns false
// afterwards), passes the catalog read time to the session and returns the session-patched
// status.
CHECKED_STATUS PerformFuture::Get() {
  auto perform_result = future_.get();
  const auto owner = session_;
  session_ = nullptr;
  owner->TrySetCatalogReadPoint(perform_result.catalog_read_time);
  return owner->PatchStatus(perform_result.status, relations_);
}
204
205
//--------------------------------------------------------------------------------------------------
206
// Class PgSession::RunHelper
207
//--------------------------------------------------------------------------------------------------
208
209
// Binds the helper to a relation and a session. The buffer reference is chosen once, at
// construction: the session's transactional buffer for transactional helpers, the plain one
// otherwise.
PgSession::RunHelper::RunHelper(const PgObjectId& relation_id,
                                PgSession* pg_session,
                                IsTransactionalSession transactional)
    : relation_id_(relation_id),
      pg_session_(*pg_session),
      transactional_(transactional),
      buffer_(transactional_ ? pg_session_.buffered_txn_ops_ : pg_session_.buffered_ops_) {
}
218
219
// Applies `op` through this helper: the operation is either added to the session buffer or
// queued in `operations_` to be sent by the next Flush().
//
// schema               - schema of the table the operation targets.
// op                   - operation to apply.
// read_time            - optional preset read time; a non-zero value forces a full flush of
//                        buffered operations in the transactional case.
// force_non_bufferable - when true the operation is never buffered.
//
// Returns a non-OK status if flushing buffered operations or calculating the transaction
// isolation fails.
Status PgSession::RunHelper::Apply(
    const Schema& schema, const PgsqlOpPtr& op, uint64_t* read_time, bool force_non_bufferable) {
  auto& buffered_keys = pg_session_.buffered_keys_;
  // Try buffering this operation if it is a write operation, buffering is enabled and no
  // operations have been already applied to current session (yb session does not exist).
  if (operations_.empty() && pg_session_.buffering_enabled_ &&
      !force_non_bufferable && op->is_write()) {
    const auto& wop = down_cast<PgsqlWriteOp&>(*op).write_request();
    // Check for buffered operation related to same row.
    // If multiple operations are performed in context of single RPC second operation will not
    // see the results of first operation on DocDB side.
    // Multiple operations on same row must be performed in context of different RPC.
    // Flush is required in this case.
    RowIdentifier row_id(schema, wop);
    if (PREDICT_FALSE(!buffered_keys.insert(row_id).second)) {
      RETURN_NOT_OK(pg_session_.FlushBufferedOperations());
      // The flush cleared the key set, so the row has to be registered again.
      buffered_keys.insert(row_id);
    }
    if (PREDICT_FALSE(yb_debug_log_docdb_requests)) {
      LOG(INFO) << "Buffering operation: " << wop.ShortDebugString();
    }
    buffer_.Add(op, relation_id_);
    // Flush buffers in case limit of operations in single RPC exceeded.
    return PREDICT_TRUE(buffered_keys.size() < FLAGS_ysql_session_max_batch_size)
        ? Status::OK()
        : pg_session_.FlushBufferedOperations();
  }
  bool read_only = op->is_read();
  // Flush all buffered operations (if any) before performing non-bufferable operation
  if (!buffered_keys.empty()) {
    SCHECK(operations_.empty(),
           IllegalState,
           "Buffered operations must be flushed before applying first non-bufferable operation");
    // Buffered operations can't be combined within single RPC with non bufferable operation
    // in case non bufferable operation has preset read_time.
    // Buffered operations must be flushed independently in this case.
    bool full_flush_required = transactional_ && read_time && *read_time;
    // Check for buffered operation that affected same table as current operation.
    for (auto i = buffered_keys.begin(); !full_flush_required && i != buffered_keys.end(); ++i) {
      full_flush_required = IsTableUsedByOperation(*op, i->table_id());
    }
    if (full_flush_required) {
      RETURN_NOT_OK(pg_session_.FlushBufferedOperations());
    } else {
      RETURN_NOT_OK(pg_session_.FlushBufferedOperationsImpl(
          [this](auto ops, auto transactional) -> Status {
            if (transactional == transactional_) {
              // Save buffered operations for further applying before non-buffered operation.
              operations_.Swap(&ops);
              return Status::OK();
            }
            return pg_session_.FlushOperations(std::move(ops), transactional);
          }));
      // Buffered operations taken over above make the batch non read-only.
      read_only = read_only && operations_.empty();
    }
  }

  TxnPriorityRequirement txn_priority_requirement = kLowerPriorityRange;
  if (op->is_read()) {
    const PgsqlReadRequestPB& read_req = down_cast<PgsqlReadOp&>(*op).read_request();
    auto row_mark_type = GetRowMarkTypeFromPB(read_req);
    // A read with a valid row mark is not treated as read-only.
    read_only = read_only && !IsValidRowMarkType(row_mark_type);
    if (RowMarkNeedsHigherPriority(static_cast<RowMarkType>(row_mark_type))) {
      txn_priority_requirement = kHigherPriorityRange;
    }
  }

  // READ COMMITTED isolation overrides the priority chosen above.
  if (pg_session_.GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED) {
    txn_priority_requirement = kHighestPriority;
  }

  if (!transactional_ && read_only && schema.table_properties().is_ysql_catalog_table() &&
      !YBCIsInitDbModeEnvVarSet()) {
    pg_session_.use_catalog_session_ = true;
  }

  if (transactional_) {
    pg_session_.UpdateInTxnLimit(read_time);
  }

  if (PREDICT_FALSE(yb_debug_log_docdb_requests)) {
#ifdef PG_CLIENT
    LOG(INFO) << "Applying operation: " << op->ShortDebugString();
#endif
  }

  operations_.Add(op, relation_id_);

  if (transactional_) {
    RETURN_NOT_OK(pg_session_.pg_txn_manager_->CalculateIsolation(
        read_only, txn_priority_requirement));
  }

  return Status::OK();
}
314
315
2.74M
// Sends the queued operations through the session.
// Returns a default-constructed PerformFuture when there is nothing queued (everything was
// buffered); otherwise a future that is fulfilled by the perform callback.
Result<PerformFuture> PgSession::RunHelper::Flush() {
  const bool nothing_to_send = operations_.empty();
  if (nothing_to_send) {
    // All operations were buffered, no need to flush.
    return PerformFuture();
  }

  // The promise is shared with the callback so it stays alive until the result arrives.
  auto result_promise = std::make_shared<std::promise<PerformResult>>();

  pg_session_.Perform(&operations_.operations, [result_promise](PerformResult result) {
    result_promise->set_value(result);
  });
  return PerformFuture(result_promise->get_future(), &pg_session_, &operations_.relations);
}
328
329
//--------------------------------------------------------------------------------------------------
330
// Class PgForeignKeyReference
331
//--------------------------------------------------------------------------------------------------
332
333
// Stores the table oid and takes ownership of the ybctid string.
PgForeignKeyReference::PgForeignKeyReference(PgOid tid, std::string yid)
    : table_id(tid),
      ybctid(std::move(yid)) {
}
336
337
1.50M
// Two foreign key references are equal when both the table oid and the ybctid match.
bool operator==(const PgForeignKeyReference& k1, const PgForeignKeyReference& k2) {
  if (k1.table_id != k2.table_id) {
    return false;
  }
  return k1.ybctid == k2.ybctid;
}
340
341
2.64M
size_t hash_value(const PgForeignKeyReference& key) {
342
2.64M
  return ForeignKeyReferenceHash(
343
2.64M
      key.table_id, key.ybctid.c_str(), key.ybctid.c_str() + key.ybctid.length());
344
2.64M
}
345
346
//--------------------------------------------------------------------------------------------------
347
// Class RowIdentifier
348
//--------------------------------------------------------------------------------------------------
349
350
// Identifies the row a write request targets.
// When the request carries an explicit ybctid, a pointer to it is kept. Otherwise the key is
// encoded from the partition and range column values into ybctid_holder_, and ybctid_ is null.
RowIdentifier::RowIdentifier(const Schema& schema, const PgsqlWriteRequestPB& request)
    : table_id_(&request.table_id()) {
  if (request.has_ybctid_column_value()) {
    ybctid_ = &request.ybctid_column_value().value().binary_value();
    return;
  }

  vector<docdb::PrimitiveValue> hash_parts;
  vector<docdb::PrimitiveValue> range_parts;
  InitKeyColumnPrimitiveValues(
      request.partition_column_values(), schema, 0 /* start_idx */, &hash_parts);
  // Range columns follow the hash columns in the schema.
  InitKeyColumnPrimitiveValues(
      request.range_column_values(), schema, schema.num_hash_key_columns(), &range_parts);

  if (!hash_parts.empty()) {
    ybctid_holder_ = docdb::DocKey(request.hash_code(),
                                   std::move(hash_parts),
                                   std::move(range_parts)).Encode().ToStringBuffer();
  } else {
    ybctid_holder_ = docdb::DocKey(std::move(range_parts)).Encode().ToStringBuffer();
  }
  ybctid_ = nullptr;
}
375
376
3.19M
// Returns the externally owned ybctid when one was supplied, otherwise the locally encoded one.
const string& RowIdentifier::ybctid() const {
  if (ybctid_) {
    return *ybctid_;
  }
  return ybctid_holder_;
}
379
380
3.54M
// Returns the table id string this identifier points to (taken from the request at
// construction).
const string& RowIdentifier::table_id() const {
  const string& id = *table_id_;
  return id;
}
383
384
603k
// Row identifiers are equal when they refer to the same table and the same ybctid.
bool operator==(const RowIdentifier& k1, const RowIdentifier& k2) {
  if (k1.table_id() != k2.table_id()) {
    return false;
  }
  return k1.ybctid() == k2.ybctid();
}
387
388
2.14M
size_t hash_value(const RowIdentifier& key) {
389
2.14M
  size_t hash = 0;
390
2.14M
  boost::hash_combine(hash, key.table_id());
391
2.14M
  boost::hash_combine(hash, key.ybctid());
392
2.14M
  return hash;
393
2.14M
}
394
395
//--------------------------------------------------------------------------------------------------
396
// Class PgSession
397
//--------------------------------------------------------------------------------------------------
398
399
// Creates a session: builds the underlying client session and stores the collaborators passed
// in. `database_name` is not used by this constructor.
PgSession::PgSession(
    client::YBClient* client,
    PgClient* pg_client,
    const string& database_name,
    scoped_refptr<PgTxnManager> pg_txn_manager,
    scoped_refptr<server::HybridClock> clock,
    const tserver::TServerSharedObject* tserver_shared_object,
    const YBCPgCallbacks& pg_callbacks)
    : session_(BuildSession(client)),
      pg_client_(*pg_client),
      pg_txn_manager_(std::move(pg_txn_manager)),
      clock_(std::move(clock)),
      tserver_shared_object_(tserver_shared_object),
      pg_callbacks_(pg_callbacks) {}
414
415
1.65k
// Nothing to release explicitly; members clean up after themselves.
PgSession::~PgSession() {}
417
418
//--------------------------------------------------------------------------------------------------
419
420
1.65k
// Records `database_name` as the connected database. Always returns OK.
Status PgSession::ConnectDatabase(const string& database_name) {
  connected_database_ = database_name;

  return Status::OK();
}
424
425
1.61k
// Fetches the database info through the pg client and stores its colocated flag in
// `*colocated`. Returns the error of the lookup if it fails.
Status PgSession::IsDatabaseColocated(const PgOid database_oid, bool *colocated) {
  const auto database_info = VERIFY_RESULT(pg_client_.GetDatabaseInfo(database_oid));
  *colocated = database_info.colocated();
  return Status::OK();
}
430
431
//--------------------------------------------------------------------------------------------------
432
433
21
// Drops the database through the pg client and, if that succeeds, deletes the sequence rows
// that belong to it.
Status PgSession::DropDatabase(const string& database_name, PgOid database_oid) {
  tserver::PgDropDatabaseRequestPB drop_request;
  drop_request.set_database_name(database_name);
  drop_request.set_database_oid(database_oid);

  RETURN_NOT_OK(pg_client_.DropDatabase(&drop_request, CoarseTimePoint()));
  return DeleteDBSequences(database_oid);
}
442
443
0
// Stores the catalog version reported by the pg client in `*version`.
Status PgSession::GetCatalogMasterVersion(uint64_t *version) {
  const auto master_version = VERIFY_RESULT(pg_client_.GetCatalogMasterVersion());
  *version = master_version;
  return Status::OK();
}
447
448
21
// Forwards to the pg client to create the sequences data table.
Status PgSession::CreateSequencesDataTable() {
  auto status = pg_client_.CreateSequencesDataTable();
  return status;
}
451
452
// Inserts the row for sequence (db_oid, seq_oid) into the sequences data table, creating the
// table first when it cannot be loaded.
//
// ysql_catalog_version - catalog version stamped on the write request.
// last_val, is_called  - initial column values for the sequence row.
Status PgSession::InsertSequenceTuple(int64_t db_oid,
                                      int64_t seq_oid,
                                      uint64_t ysql_catalog_version,
                                      int64_t last_val,
                                      bool is_called) {
  const PgObjectId sequences_table_id(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
  auto load_result = LoadTable(sequences_table_id);
  if (!load_result.ok()) {
    RETURN_NOT_OK(CreateSequencesDataTable());
    // Try one more time.
    load_result = LoadTable(sequences_table_id);
  }
  auto table = VERIFY_RESULT(std::move(load_result));

  auto insert_op(table->NewPgsqlInsert());
  auto* request = insert_op->mutable_request();
  request->set_ysql_catalog_version(ysql_catalog_version);

  // Key: (db_oid, seq_oid).
  request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid);
  request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid);

  PgsqlColumnValuePB* last_val_column = request->add_column_values();
  last_val_column->set_column_id(table->schema().column_id(kPgSequenceLastValueColIdx));
  last_val_column->mutable_expr()->mutable_value()->set_int64_value(last_val);

  PgsqlColumnValuePB* is_called_column = request->add_column_values();
  is_called_column->set_column_id(table->schema().column_id(kPgSequenceIsCalledColIdx));
  is_called_column->mutable_expr()->mutable_value()->set_bool_value(is_called);

  return session_->ApplyAndFlush(std::move(insert_op));
}
484
485
// Updates the row of sequence (db_oid, seq_oid) in the sequences data table.
//
// When both `expected_last_val` and `expected_is_called` are set, the update is conditional on
// the current column values matching them; otherwise it only requires the row to exist.
// If `skipped` is non-null it receives the `skipped` flag of the write response.
Status PgSession::UpdateSequenceTuple(int64_t db_oid,
                                      int64_t seq_oid,
                                      uint64_t ysql_catalog_version,
                                      int64_t last_val,
                                      bool is_called,
                                      boost::optional<int64_t> expected_last_val,
                                      boost::optional<bool> expected_is_called,
                                      bool* skipped) {
  const PgObjectId sequences_table_id(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
  auto table = VERIFY_RESULT(LoadTable(sequences_table_id));

  const auto last_value_col_id = table->schema().column_id(kPgSequenceLastValueColIdx);
  const auto is_called_col_id = table->schema().column_id(kPgSequenceIsCalledColIdx);

  std::shared_ptr<client::YBPgsqlWriteOp> update_op(table->NewPgsqlUpdate());

  auto* request = update_op->mutable_request();
  request->set_ysql_catalog_version(ysql_catalog_version);

  // Key: (db_oid, seq_oid).
  request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid);
  request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid);

  PgsqlColumnValuePB* new_last_val = request->add_column_new_values();
  new_last_val->set_column_id(last_value_col_id);
  new_last_val->mutable_expr()->mutable_value()->set_int64_value(last_val);

  PgsqlColumnValuePB* new_is_called = request->add_column_new_values();
  new_is_called->set_column_id(is_called_col_id);
  new_is_called->mutable_expr()->mutable_value()->set_bool_value(is_called);

  auto* where_condition = request->mutable_where_expr()->mutable_condition();

  const bool conditional = expected_last_val && expected_is_called;
  if (!conditional) {
    where_condition->set_op(QL_OP_EXISTS);
  } else {
    // WHERE clause => WHERE last_val == expected_last_val AND is_called == expected_is_called.
    where_condition->set_op(QL_OP_AND);

    auto* last_val_matches = where_condition->add_operands()->mutable_condition();
    last_val_matches->set_op(QL_OP_EQUAL);
    last_val_matches->add_operands()->set_column_id(last_value_col_id);
    last_val_matches->add_operands()->mutable_value()->set_int64_value(*expected_last_val);

    auto* is_called_matches = where_condition->add_operands()->mutable_condition();
    is_called_matches->set_op(QL_OP_EQUAL);
    is_called_matches->add_operands()->set_column_id(is_called_col_id);
    is_called_matches->add_operands()->mutable_value()->set_bool_value(*expected_is_called);
  }

  // For compatibility set deprecated column_refs
  request->mutable_column_refs()->add_ids(last_value_col_id);
  request->mutable_column_refs()->add_ids(is_called_col_id);
  // Same values, to be consumed by current TServers
  request->add_col_refs()->set_column_id(last_value_col_id);
  request->add_col_refs()->set_column_id(is_called_col_id);

  RETURN_NOT_OK(session_->ApplyAndFlush(update_op));
  if (skipped != nullptr) {
    *skipped = update_op->response().skipped();
  }
  return Status::OK();
}
548
549
// Reads the row of sequence (db_oid, seq_oid) from the sequences data table.
//
// On success `*last_val` and `*is_called` receive the two column values. Returns NotFound when
// no row comes back or when either column is null.
Status PgSession::ReadSequenceTuple(int64_t db_oid,
                                    int64_t seq_oid,
                                    uint64_t ysql_catalog_version,
                                    int64_t *last_val,
                                    bool *is_called) {
  const PgObjectId sequences_table_id(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
  PgTableDescPtr table = VERIFY_RESULT(LoadTable(sequences_table_id));

  const auto last_value_col_id = table->schema().column_id(kPgSequenceLastValueColIdx);
  const auto is_called_col_id = table->schema().column_id(kPgSequenceIsCalledColIdx);

  std::shared_ptr<client::YBPgsqlReadOp> read_op(table->NewPgsqlSelect());

  auto* request = read_op->mutable_request();
  request->set_ysql_catalog_version(ysql_catalog_version);

  // Key: (db_oid, seq_oid).
  request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid);
  request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid);

  request->add_targets()->set_column_id(last_value_col_id);
  request->add_targets()->set_column_id(is_called_col_id);

  // For compatibility set deprecated column_refs
  request->mutable_column_refs()->add_ids(last_value_col_id);
  request->mutable_column_refs()->add_ids(is_called_col_id);
  // Same values, to be consumed by current TServers
  request->add_col_refs()->set_column_id(last_value_col_id);
  request->add_col_refs()->set_column_id(is_called_col_id);

  RETURN_NOT_OK(session_->ReadSync(read_op));

  Slice cursor;
  int64_t row_count = 0;
  PgDocData::LoadCache(read_op->rows_data(), &row_count, &cursor);
  if (row_count == 0) {
    return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", seq_oid);
  }

  // First target: last value.
  PgWireDataHeader last_val_header = PgDocData::ReadDataHeader(&cursor);
  if (last_val_header.is_null()) {
    return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", seq_oid);
  }
  const size_t last_val_size = PgDocData::ReadNumber(&cursor, last_val);
  cursor.remove_prefix(last_val_size);

  // Second target: is_called flag.
  PgWireDataHeader is_called_header = PgDocData::ReadDataHeader(&cursor);
  if (is_called_header.is_null()) {
    return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", seq_oid);
  }
  // The number of bytes consumed is not needed after the last column.
  static_cast<void>(PgDocData::ReadNumber(&cursor, is_called));
  return Status::OK();
}
604
605
44
// Deletes the row of sequence (db_oid, seq_oid) from the sequences data table.
Status PgSession::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) {
  const PgObjectId sequences_table_id(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
  PgTableDescPtr table = VERIFY_RESULT(LoadTable(sequences_table_id));

  auto delete_op(table->NewPgsqlDelete());
  auto* request = delete_op->mutable_request();

  // Key: (db_oid, seq_oid).
  request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid);
  request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid);

  return session_->ApplyAndFlush(std::move(delete_op));
}
617
618
20
// Issues a delete against the sequences data table keyed only by `db_oid`.
// Returns OK without doing anything when the sequences table cannot be loaded or is null.
Status PgSession::DeleteDBSequences(int64_t db_oid) {
  const PgObjectId sequences_table_id(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
  Result<PgTableDescPtr> load_result = LoadTable(sequences_table_id);
  if (!load_result.ok()) {
    // Sequence table is not yet created.
    return Status::OK();
  }

  auto table = std::move(*load_result);
  if (table == nullptr) {
    return Status::OK();
  }

  auto delete_op(table->NewPgsqlDelete());
  auto* request = delete_op->mutable_request();
  request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid);

  return session_->ApplyAndFlush(std::move(delete_op));
}
637
638
//--------------------------------------------------------------------------------------------------
639
640
1.03k
// Drops the table identified by `table_id` through the pg client; only the status of the call
// is returned.
Status PgSession::DropTable(const PgObjectId& table_id) {
  tserver::PgDropTableRequestPB drop_request;
  table_id.ToPB(drop_request.mutable_table_id());

  return ResultToStatus(pg_client_.DropTable(&drop_request, CoarseTimePoint()));
}
645
646
// Drops the index identified by `index_id` through the pg client.
// When `indexed_table_name` is non-null it receives the table name returned by the drop call.
Status PgSession::DropIndex(
    const PgObjectId& index_id,
    client::YBTableName* indexed_table_name) {
  tserver::PgDropTableRequestPB drop_request;
  index_id.ToPB(drop_request.mutable_table_id());
  drop_request.set_index(true);

  auto dropped_from = VERIFY_RESULT(pg_client_.DropTable(&drop_request, CoarseTimePoint()));
  if (indexed_table_name != nullptr) {
    *indexed_table_name = std::move(dropped_from);
  }
  return Status::OK();
}
658
659
// Drops the tablegroup through the pg client and then removes its entry from the local table
// cache, whether or not the drop succeeded. Returns the status of the drop call.
Status PgSession::DropTablegroup(const PgOid database_oid,
                                 PgOid tablegroup_oid) {
  const PgObjectId tablegroup_id(database_oid, tablegroup_oid);

  tserver::PgDropTablegroupRequestPB drop_request;
  tablegroup_id.ToPB(drop_request.mutable_tablegroup_id());

  Status drop_status = pg_client_.DropTablegroup(&drop_request, CoarseTimePoint());
  InvalidateTableCache(tablegroup_id, InvalidateOnPgClient::kFalse);
  return drop_status;
}
668
669
//--------------------------------------------------------------------------------------------------
670
671
5.95M
// Returns the descriptor of `table_id`, serving it from the local cache when possible.
//
// A cache entry holding a null descriptor marks a table invalidated on the pg client side; in
// that case the table is reopened with the reopen flag set and the entry is refreshed.
Result<PgTableDescPtr> PgSession::LoadTable(const PgObjectId& table_id) {
  VLOG(3) << "Loading table descriptor for " << table_id;

  const auto cache_entry = table_cache_.find(table_id);
  const bool has_entry = (cache_entry != table_cache_.end());
  if (has_entry && cache_entry->second) {
    return cache_entry->second;
  }

  VLOG(4) << "Table cache MISS: " << table_id;
  auto opened_table = VERIFY_RESULT(
      pg_client_.OpenTable(table_id, has_entry, invalidate_table_cache_time_));
  // The invalidation time has been handed to the pg client; reset it.
  invalidate_table_cache_time_ = CoarseTimePoint();

  if (!has_entry) {
    table_cache_.emplace(table_id, opened_table);
  } else {
    cache_entry->second = opened_table;
  }
  return opened_table;
}
690
691
void PgSession::InvalidateTableCache(
692
1.61k
    const PgObjectId& table_id, InvalidateOnPgClient invalidate_on_pg_client) {
693
1.61k
  if (invalidate_on_pg_client) {
694
    // Keep special record about this table_id, so when we would open this table again,
695
    // reopen flag will be sent to pg client service.
696
    // This flag means that pg client service should remove table from his cache and fetch
697
    // new data from master.
698
    // It is optional optimization, but some tests fails w/o it, since they expect that
699
    // local table information is updated after alter table operation.
700
2
    table_cache_[table_id] = nullptr;
701
1.61k
  } else {
702
1.61k
    auto it = table_cache_.find(table_id);
703
1.61k
    if (it != table_cache_.end() && it->second) {
704
850
      table_cache_.erase(it);
705
850
    }
706
1.61k
  }
707
1.61k
}
708
709
613
void PgSession::InvalidateAllTablesCache() {
710
613
  invalidate_table_cache_time_ = CoarseMonoClock::now();
711
613
  table_cache_.clear();
712
613
}
713
714
201k
// Turns operation buffering on.
// Returns IllegalState if buffering is already enabled. Leftover buffered keys found while
// buffering was off are reported through LOG(DFATAL).
Status PgSession::StartOperationsBuffering() {
  SCHECK(!buffering_enabled_, IllegalState, "Buffering has been already started");

  const auto stale_key_count = buffered_keys_.size();
  if (PREDICT_FALSE(stale_key_count != 0)) {
    LOG(DFATAL) << "Buffering hasn't been started yet but "
                << stale_key_count
                << " buffered operations found";
  }

  buffering_enabled_ = true;
  return Status::OK();
}
724
725
174k
// Turns operation buffering off and flushes whatever has been buffered so far.
// Returns IllegalState if buffering is not enabled, otherwise the status of the flush.
Status PgSession::StopOperationsBuffering() {
  SCHECK(buffering_enabled_, IllegalState, "Buffering hasn't been started");

  // The flag is cleared before flushing, so it stays cleared even if the flush fails.
  buffering_enabled_ = false;
  auto flush_status = FlushBufferedOperations();
  return flush_status;
}
730
731
48.0k
void PgSession::ResetOperationsBuffering() {
732
48.0k
  DropBufferedOperations();
733
48.0k
  buffering_enabled_ = false;
734
48.0k
}
735
736
675k
// Flushes all buffered operations, sending each batch through FlushOperations().
Status PgSession::FlushBufferedOperations() {
  const auto flush_batch = [this](auto ops, auto txn) {
    return this->FlushOperations(std::move(ops), txn);
  };
  return FlushBufferedOperationsImpl(flush_batch);
}
741
742
103k
void PgSession::DropBufferedOperations() {
743
30
  VLOG_IF(1, !buffered_keys_.empty())
744
30
          << "Dropping " << buffered_keys_.size() << " pending operations";
745
103k
  buffered_keys_.clear();
746
103k
  buffered_ops_.Clear();
747
103k
  buffered_txn_ops_.Clear();
748
103k
}
749
750
2.00M
// Returns the PostgreSQL isolation level as reported by the transaction manager.
PgIsolationLevel PgSession::GetIsolationLevel() {
  const auto pg_isolation_level = pg_txn_manager_->GetPgIsolationLevel();
  return pg_isolation_level;
}
753
754
707k
// Takes ownership of the buffered operations, resets the buffers and passes the operations to
// flusher: the non-transactional batch first, then the transactional one. Empty batches are
// skipped. Stops at the first failure and returns its status.
// Returns IllegalState if transactional operations are present in initdb mode.
Status PgSession::FlushBufferedOperationsImpl(const Flusher& flusher) {
  auto plain_ops = std::move(buffered_ops_);
  auto transactional_ops = std::move(buffered_txn_ops_);

  // Bring the buffers back to a clean state before anything is sent.
  buffered_keys_.clear();
  buffered_ops_.Clear();
  buffered_txn_ops_.Clear();

  if (!plain_ops.empty()) {
    RETURN_NOT_OK(flusher(std::move(plain_ops), IsTransactionalSession::kFalse));
  }

  if (transactional_ops.empty()) {
    return Status::OK();
  }

  SCHECK(!YBCIsInitDbModeEnvVarSet(),
         IllegalState,
         "No transactional operations are expected in the initdb mode");
  return flusher(std::move(transactional_ops), IsTransactionalSession::kTrue);
}
771
772
2.74M
// Decides whether op on table has to be executed through the transactional path.
//
// Returns false when the table is not transactional, the operation does not need a transaction,
// or initdb mode is active. For non-catalog tables returns true, or IllegalState if no
// transaction is in progress. For YSQL catalog tables returns true in DDL mode (or when
// yb_non_ddl_txn_for_sys_tables_allowed is set and a transaction is in progress); otherwise
// writes fail with IllegalState and reads return false.
Result<bool> PgSession::ShouldHandleTransactionally(const PgTableDesc& table, const PgsqlOp& op) {
  if (!table.schema().table_properties().is_transactional()) {
    return false;
  }
  if (!op.need_transaction()) {
    return false;
  }
  if (YBCIsInitDbModeEnvVarSet()) {
    return false;
  }

  const auto txn_in_progress = pg_txn_manager_->IsTxnInProgress();
  const bool is_catalog_table = table.schema().table_properties().is_ysql_catalog_table();
  if (!is_catalog_table) {
    SCHECK(txn_in_progress, IllegalState, "Transactional operation requires transaction");
    return true;
  }

  // Previously, the yb_non_ddl_txn_for_sys_tables_allowed flag caused CREATE VIEW to fail with a
  // read restart error, because the subsequent cache refresh used an outdated txn to read from
  // the system catalog.
  // As a quick fix, we prevent yb_non_ddl_txn_for_sys_tables_allowed from affecting reads.
  if (pg_txn_manager_->IsDdlMode() ||
      (yb_non_ddl_txn_for_sys_tables_allowed && txn_in_progress)) {
    return true;
  }

  if (!op.is_write()) {
    return false;
  }

  // For consistent reads from catalog tables, all write operations must be done in a
  // transaction.
  return STATUS_FORMAT(IllegalState,
                       "Transaction for catalog table write operation '$0' not found",
                       table.table_name().table_name());
}
798
799
0
// Forwards to pg_client_.IsInitDbDone() and returns its result.
Result<bool> PgSession::IsInitDbDone() {
  auto init_db_done = pg_client_.IsInitDbDone();
  return init_db_done;
}
802
803
150k
// Sends the given batch of buffered operations and blocks until the result is available.
//
// For a transactional batch, the transaction isolation is calculated first (with the highest
// priority under READ COMMITTED, the lower priority range otherwise) and in_txn_limit_ is set to
// the current clock time. Returns the status produced by the perform future.
Status PgSession::FlushOperations(BufferableOperations ops, IsTransactionalSession transactional) {
  DCHECK(ops.size() > 0 && ops.size() <= FLAGS_ysql_session_max_batch_size);

  if (PREDICT_FALSE(yb_debug_log_docdb_requests)) {
    LOG(INFO) << "Flushing buffered operations, using "
              << (transactional ? "transactional" : "non-transactional")
              << " session (num ops: " << ops.size() << ")";
  }

  if (transactional) {
    const TxnPriorityRequirement priority =
        GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED ? kHighestPriority
                                                                : kLowerPriorityRange;
    RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation(false, priority));
    in_txn_limit_ = clock_->Now();
  }

  // The callback fulfils the promise; the future below is waited on before this frame exits.
  std::promise<PerformResult> result_promise;
  const auto on_performed = [&result_promise](const PerformResult& result) {
    result_promise.set_value(result);
  };
  Perform(&ops.operations, on_performed);

  PerformFuture perform_future(result_promise.get_future(), this, &ops.relations);
  return perform_future.Get();
}
829
830
774k
// Builds the perform options for the given operations and hands them to
// pg_client_.PerformAsync() together with callback.
//
// When use_catalog_session_ is set, the catalog read time (if any) is written into the options,
// the catalog-session option is enabled and the flag is cleared. Otherwise the options are
// filled in by the transaction manager, plus in_txn_limit_ when it is set and a transaction is
// in progress.
void PgSession::Perform(PgsqlOps* operations, const PerformCallback& callback) {
  tserver::PgPerformOptionsPB options;

  if (!use_catalog_session_) {
    pg_txn_manager_->SetupPerformOptions(&options);

    if (in_txn_limit_ && pg_txn_manager_->IsTxnInProgress()) {
      options.set_in_txn_limit_ht(in_txn_limit_.ToUint64());
    }
  } else {
    if (catalog_read_time_) {
      // The read_time field is materialized even when the stored read time is empty.
      auto* read_time_pb = options.mutable_read_time();
      if (*catalog_read_time_) {
        catalog_read_time_->ToPB(read_time_pb);
      }
    }
    options.set_use_catalog_session(true);
    // The catalog-session request applies to a single Perform call only.
    use_catalog_session_ = false;
  }

  options.set_force_global_transaction(yb_force_global_transaction);

  pg_client_.PerformAsync(&options, operations, callback);
}
854
855
158k
// Returns the YSQL catalog version read from the tablet server shared object.
// Returns NotSupported when tserver_shared_object_ is not set.
Result<uint64_t> PgSession::GetSharedCatalogVersion() {
  if (!tserver_shared_object_) {
    return STATUS(NotSupported, "Tablet server shared memory has not been opened");
  }
  return (**tserver_shared_object_).ysql_catalog_version();
}
862
863
235
// Returns the postgres auth key read from the tablet server shared object.
// Returns NotSupported when tserver_shared_object_ is not set.
Result<uint64_t> PgSession::GetSharedAuthKey() {
  if (!tserver_shared_object_) {
    return STATUS(NotSupported, "Tablet server shared memory has not been opened");
  }
  return (**tserver_shared_object_).postgres_auth_key();
}
870
871
// Checks whether the row identified by (table_id, ybctid) exists, for foreign key validation.
//
// A hit in fk_reference_cache_ answers immediately. Otherwise the key must have a registered
// intent; it is read through reader in one batch together with other pending intents of the
// same table (at most FLAGS_ysql_session_max_batch_size keys in total). Every key the reader
// returns is added to the cache and the intents that were sent are removed.
// Propagates the reader's error on failure.
Result<bool> PgSession::ForeignKeyReferenceExists(PgOid table_id,
                                                  const Slice& ybctid,
                                                  const YbctidReader& reader) {
  const auto is_cached = [this, table_id, &ybctid] {
    return Find(fk_reference_cache_, table_id, ybctid) != fk_reference_cache_.end();
  };

  if (is_cached()) {
    return true;
  }

  // Check existence of the required FK intent.
  // Absence means the key was checked by a previous batched request and was not found.
  if (!Erase(&fk_reference_intent_, table_id, ybctid)) {
    return false;
  }

  std::vector<Slice> batch;
  batch.reserve(std::min<size_t>(FLAGS_ysql_session_max_batch_size,
                                 fk_reference_intent_.size() + 1));
  batch.push_back(ybctid);

  // TODO(dmitry): In case the number of keys for the same table > FLAGS_ysql_session_max_batch_size
  // two strategies are possible:
  // 1. select keys belonging to the same tablet to reduce the number of simultaneous RPCs
  // 2. select keys belonging to different tablets to distribute reads among different nodes
  const auto same_table = [table_id](const auto& key) { return key.table_id == table_id; };
  for (const auto& intent : fk_reference_intent_) {
    if (batch.size() >= FLAGS_ysql_session_max_batch_size) {
      break;
    }
    if (same_table(intent)) {
      batch.push_back(intent.ybctid);
    }
  }

  for (auto& found_key : VERIFY_RESULT(reader(table_id, batch))) {
    fk_reference_cache_.emplace(table_id, std::move(found_key));
  }

  // Remove used intents. The requested key itself has already been erased above, hence the -1.
  auto intents_to_remove = batch.size() - 1;
  if (intents_to_remove == fk_reference_intent_.size()) {
    fk_reference_intent_.clear();
  } else {
    auto intent_it = fk_reference_intent_.begin();
    while (intent_it != fk_reference_intent_.end() && intents_to_remove > 0) {
      if (!same_table(*intent_it)) {
        ++intent_it;
        continue;
      }
      intent_it = fk_reference_intent_.erase(intent_it);
      --intents_to_remove;
    }
  }

  return is_cached();
}
920
921
240k
// Registers an intent to check (table_id, ybctid) later, unless the key is already present in
// fk_reference_cache_.
void PgSession::AddForeignKeyReferenceIntent(PgOid table_id, const Slice& ybctid) {
  const bool already_cached =
      Find(fk_reference_cache_, table_id, ybctid) != fk_reference_cache_.end();
  if (already_cached) {
    return;
  }
  fk_reference_intent_.emplace(table_id, ybctid.ToBuffer());
}
926
927
1.51M
// Adds (table_id, ybctid) to fk_reference_cache_ unless it is already there.
void PgSession::AddForeignKeyReference(PgOid table_id, const Slice& ybctid) {
  const bool already_cached =
      Find(fk_reference_cache_, table_id, ybctid) != fk_reference_cache_.end();
  if (already_cached) {
    return;
  }
  fk_reference_cache_.emplace(table_id, ybctid.ToBuffer());
}
932
933
62.9k
// Removes (table_id, ybctid) from fk_reference_cache_; the result of the erase is ignored.
void PgSession::DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid) {
  auto* const cache = &fk_reference_cache_;
  Erase(cache, table_id, ybctid);
}
936
937
775k
// Rewrites a duplicate-key error into a PostgreSQL-style unique violation.
//
// If status carries PGSQL_STATUS_DUPLICATE_KEY_ERROR and an operation index that is valid for
// relations, the unique constraint name of the corresponding relation is fetched through
// pg_callbacks_ and an AlreadyPresent status with the YB_PG_UNIQUE_VIOLATION error code is
// returned. In every other case status is returned unchanged.
Status PgSession::PatchStatus(const Status& status, const PgObjectIds& relations) {
  if (PgsqlRequestStatus(status) != PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR) {
    return status;
  }

  const auto op_index = OpIndex::ValueFromStatus(status);
  if (!op_index || *op_index >= relations.size()) {
    return status;
  }

  // Zero-initialize the whole buffer: the callback is only allowed to write the first
  // sizeof - 1 bytes, so the name is always NUL-terminated, and it reads as an empty string
  // (rather than indeterminate stack bytes) should the callback write nothing.
  char constraint_name[0xFF] = {};
  pg_callbacks_.FetchUniqueConstraintName(relations[*op_index].object_oid,
                                          constraint_name,
                                          sizeof(constraint_name) - 1);
  return STATUS(
      AlreadyPresent,
      Format("duplicate key value violates unique constraint \"$0\"", Slice(constraint_name)),
      Slice(),
      PgsqlError(YBPgErrorCode::YB_PG_UNIQUE_VIOLATION));
}
955
956
178
// Forwards to pg_client_.TabletServerCount(primary_only) and returns its result.
Result<int> PgSession::TabletServerCount(bool primary_only) {
  auto server_count = pg_client_.TabletServerCount(primary_only);
  return server_count;
}
959
960
2
// Returns the live tablet servers reported by pg_client_.ListLiveTabletServers(false).
Result<client::TabletServersInfo> PgSession::ListTabletServers() {
  auto live_servers = pg_client_.ListLiveTabletServers(false);
  return live_servers;
}
963
964
562k
bool PgSession::ShouldUseFollowerReads() const {
965
562k
  return pg_txn_manager_->ShouldUseFollowerReads();
966
562k
}
967
968
0
void PgSession::SetTimeout(const int timeout_ms) {
969
0
  session_->SetTimeout(MonoDelta::FromMilliseconds(timeout_ms));
970
0
  pg_client_.SetTimeout(timeout_ms * 1ms);
971
0
}
972
973
765k
void PgSession::ResetCatalogReadPoint() {
974
765k
  catalog_read_time_ = ReadHybridTime();
975
765k
}
976
977
775k
void PgSession::TrySetCatalogReadPoint(const ReadHybridTime& read_ht) {
978
775k
  if (read_ht) {
979
18.5k
    catalog_read_time_ = read_ht;
980
18.5k
  }
981
775k
}
982
983
48.8k
// Makes id the active sub-transaction on the pg client.
//
// Buffered operations are flushed first. If the transaction manager still reports a
// NON_TRANSACTIONAL isolation level, the isolation is calculated for a non-read-only operation
// and the resulting perform options are passed along; otherwise no options are passed.
Status PgSession::SetActiveSubTransaction(SubTransactionId id) {
  // All buffered operations must be flushed before the SubTransactionMetadata used by the
  // underlying batcher and RPC logic is changed. Flushing snapshots the current
  // SubTransactionMetadata for the RPCs built for already-queued operations, which guarantees
  // that earlier operations use the earlier SubTransactionMetadata. Without the flush,
  // already-queued operations could incorrectly pick up the newly modified
  // SubTransactionMetadata when they are eventually sent to DocDB.
  RETURN_NOT_OK(FlushBufferedOperations());

  tserver::PgPerformOptionsPB options;
  tserver::PgPerformOptionsPB* options_ptr = nullptr;

  const bool no_txn_yet =
      pg_txn_manager_->GetIsolationLevel() == IsolationLevel::NON_TRANSACTIONAL;
  if (no_txn_yet) {
    const TxnPriorityRequirement priority =
        pg_txn_manager_->GetPgIsolationLevel() == PgIsolationLevel::READ_COMMITTED
            ? kHighestPriority
            : kLowerPriorityRange;
    RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation(IsReadOnlyOperation::kFalse, priority));
    options_ptr = &options;
    pg_txn_manager_->SetupPerformOptions(&options);
  }

  return pg_client_.SetActiveSubTransaction(id, options_ptr);
}
1005
1006
23.5k
// Flushes buffered operations and then asks the pg client to roll back sub-transaction id.
// Returns the flush error if flushing fails, otherwise the status of the rollback request.
Status PgSession::RollbackSubTransaction(SubTransactionId id) {
  // TODO(savepoints) -- send async RPC to transaction status tablet, or rely on heartbeater to
  // eventually send this metadata.
  // As in SetActiveSubTransaction, buffered operations must be flushed before any
  // SubTransactionMetadata is updated.
  RETURN_NOT_OK(FlushBufferedOperations());

  auto rollback_status = pg_client_.RollbackSubTransaction(id);
  return rollback_status;
}
1014
1015
1.71M
void PgSession::UpdateInTxnLimit(uint64_t* read_time) {
1016
1.71M
  if (!read_time) {
1017
0
    return;
1018
0
  }
1019
1020
1.71M
  if (!*read_time) {
1021
252k
    *read_time = clock_->Now().ToUint64();
1022
252k
  }
1023
1.71M
  in_txn_limit_ = HybridTime(*read_time);
1024
1.71M
}
1025
1026
0
// Parses placement_info and asks the pg client to validate the resulting placement.
//
// Returns OK without contacting the pg client when parsing fails with InvalidArgument; any other
// parse error is returned as is. Otherwise returns the status of the validation request.
Status PgSession::ValidatePlacement(const string& placement_info) {
  Result<PlacementInfoConverter::Placement> parsed =
      PlacementInfoConverter::FromString(placement_info);

  // For validation, if there is no replica_placement option, we default to the
  // cluster configuration, which the user is responsible for maintaining.
  if (!parsed.ok() && parsed.status().IsInvalidArgument()) {
    return Status::OK();
  }

  RETURN_NOT_OK(parsed);

  const PlacementInfoConverter::Placement& placement = parsed.get();

  tserver::PgValidatePlacementRequestPB req;
  for (const auto& block : placement.placement_infos) {
    auto* block_pb = req.add_placement_infos();
    block_pb->set_cloud(block.cloud);
    block_pb->set_region(block.region);
    block_pb->set_zone(block.zone);
    block_pb->set_min_num_replicas(block.min_num_replicas);
  }
  req.set_num_replicas(placement.num_replicas);

  return pg_client_.ValidatePlacement(&req);
}
1052
1053
}  // namespace pggate
1054
}  // namespace yb