YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/pg_client_session.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tserver/pg_client_session.h"
15
16
#include "yb/client/batcher.h"
17
#include "yb/client/client.h"
18
#include "yb/client/error.h"
19
#include "yb/client/namespace_alterer.h"
20
#include "yb/client/session.h"
21
#include "yb/client/table.h"
22
#include "yb/client/table_alterer.h"
23
#include "yb/client/transaction.h"
24
#include "yb/client/transaction_pool.h"
25
#include "yb/client/yb_op.h"
26
27
#include "yb/common/ql_type.h"
28
#include "yb/common/pgsql_error.h"
29
#include "yb/common/transaction_error.h"
30
#include "yb/common/schema.h"
31
#include "yb/common/wire_protocol.h"
32
33
#include "yb/gutil/casts.h"
34
35
#include "yb/rpc/rpc_context.h"
36
37
#include "yb/tserver/pg_client.pb.h"
38
#include "yb/tserver/pg_create_table.h"
39
#include "yb/tserver/pg_table_cache.h"
40
41
#include "yb/util/logging.h"
42
#include "yb/util/result.h"
43
#include "yb/util/scope_exit.h"
44
#include "yb/util/status_format.h"
45
#include "yb/util/string_util.h"
46
#include "yb/util/yb_pg_errcodes.h"
47
48
#include "yb/yql/pggate/util/pg_doc_data.h"
49
50
DECLARE_bool(ysql_serializable_isolation_for_ddl_txn);
51
52
namespace yb {
53
namespace tserver {
54
55
namespace {
56
57
constexpr const size_t kPgSequenceLastValueColIdx = 2;
58
constexpr const size_t kPgSequenceIsCalledColIdx = 3;
59
60
0
std::string SessionLogPrefix(uint64_t id) {
61
0
  return Format("S $0: ", id);
62
0
}
63
64
40.2k
string GetStatusStringSet(const client::CollectedErrors& errors) {
65
40.2k
  std::set<string> status_strings;
66
881k
  for (const auto& error : errors) {
67
881k
    status_strings.insert(error->status().ToString());
68
881k
  }
69
40.2k
  return RangeToString(status_strings.begin(), status_strings.end());
70
40.2k
}
71
72
93.1k
bool IsHomogeneousErrors(const client::CollectedErrors& errors) {
73
93.1k
  if (errors.size() < 2) {
74
52.8k
    return true;
75
52.8k
  }
76
40.2k
  auto i = errors.begin();
77
40.2k
  const auto& status = (**i).status();
78
40.2k
  const auto codes = status.ErrorCodesSlice();
79
529k
  for (++i; i != errors.end(); 
++i488k
) {
80
514k
    const auto& s = (**i).status();
81
514k
    if (s.code() != status.code() || 
codes != s.ErrorCodesSlice()514k
) {
82
25.7k
      return false;
83
25.7k
    }
84
514k
  }
85
14.4k
  return true;
86
40.2k
}
87
88
660k
boost::optional<YBPgErrorCode> PsqlErrorCode(const Status& status) {
89
660k
  const uint8_t* err_data = status.ErrorData(PgsqlErrorTag::kCategory);
90
660k
  if (err_data) {
91
333k
    return PgsqlErrorTag::Decode(err_data);
92
333k
  }
93
327k
  return boost::none;
94
660k
}
95
96
// Get a common Postgres error code from the status and all errors, and append it to a previous
97
// Status.
98
// If any of those have different conflicting error codes, previous result is returned as-is.
99
CHECKED_STATUS AppendPsqlErrorCode(const Status& status,
100
25.7k
                                   const client::CollectedErrors& errors) {
101
25.7k
  boost::optional<YBPgErrorCode> common_psql_error =  boost::make_optional(false, YBPgErrorCode());
102
660k
  for(const auto& error : errors) {
103
660k
    const auto psql_error = PsqlErrorCode(error->status());
104
660k
    if (!common_psql_error) {
105
173k
      common_psql_error = psql_error;
106
487k
    } else if (psql_error && 
common_psql_error != psql_error307k
) {
107
0
      common_psql_error = boost::none;
108
0
      break;
109
0
    }
110
660k
  }
111
25.7k
  return common_psql_error ? status.CloneAndAddErrorCode(PgsqlError(*common_psql_error)) : 
status0
;
112
25.7k
}
113
114
// Get a common transaction error code for all the errors and append it to the previous Status.
115
25.7k
CHECKED_STATUS AppendTxnErrorCode(const Status& status, const client::CollectedErrors& errors) {
116
25.7k
  TransactionErrorCode common_txn_error = TransactionErrorCode::kNone;
117
660k
  for (const auto& error : errors) {
118
660k
    const TransactionErrorCode txn_error = TransactionError(error->status()).value();
119
660k
    if (txn_error == TransactionErrorCode::kNone ||
120
660k
        
txn_error == common_txn_error660k
) {
121
449k
      continue;
122
449k
    }
123
211k
    if (common_txn_error == TransactionErrorCode::kNone) {
124
25.7k
      common_txn_error = txn_error;
125
25.7k
      continue;
126
25.7k
    }
127
    // If we receive a list of errors, with one as kConflict and others as kAborted, we retain the
128
    // error as kConflict, since in case of a batched request the first operation would receive the
129
    // kConflict and all the others would receive the kAborted error.
130
185k
    if ((txn_error == TransactionErrorCode::kConflict &&
131
185k
         
common_txn_error == TransactionErrorCode::kAborted13.2k
) ||
132
185k
        
(172k
txn_error == TransactionErrorCode::kAborted172k
&&
133
185k
         
common_txn_error == TransactionErrorCode::kConflict172k
)) {
134
185k
      common_txn_error = TransactionErrorCode::kConflict;
135
185k
      continue;
136
185k
    }
137
138
    // In all the other cases, reset the common_txn_error to kNone.
139
18.4E
    common_txn_error = TransactionErrorCode::kNone;
140
18.4E
    break;
141
185k
  }
142
143
25.7k
  return (common_txn_error != TransactionErrorCode::kNone) ?
144
18.4E
    
status.CloneAndAddErrorCode(TransactionError(common_txn_error))25.7k
: status;
145
25.7k
}
146
147
// Given a set of errors from operations, this function attempts to combine them into one status
148
// that is later passed to PostgreSQL and further converted into a more specific error code.
149
2.17M
CHECKED_STATUS CombineErrorsToStatus(const client::CollectedErrors& errors, const Status& status) {
150
2.17M
  if (errors.empty())
151
2.07M
    return status;
152
153
93.4k
  if (status.IsIOError() &&
154
      // TODO: move away from string comparison here and use a more specific status than IOError.
155
      // See https://github.com/YugaByte/yugabyte-db/issues/702
156
93.4k
      
status.message() == client::internal::Batcher::kErrorReachingOutToTServersMsg93.1k
&&
157
93.4k
      
IsHomogeneousErrors(errors)93.1k
) {
158
67.3k
    const auto& result = errors.front()->status();
159
67.3k
    if (errors.size() == 1) {
160
52.8k
      return result;
161
52.8k
    }
162
14.5k
    return Status(result.code(),
163
14.5k
                  __FILE__,
164
14.5k
                  __LINE__,
165
14.5k
                  GetStatusStringSet(errors),
166
14.5k
                  result.ErrorCodesSlice(),
167
14.5k
                  DupFileName::kFalse);
168
67.3k
  }
169
170
26.0k
  Status result =
171
26.0k
    status.ok()
172
26.0k
    ? 
STATUS0
(InternalError, GetStatusStringSet(errors))
173
26.0k
    : status.CloneAndAppend(". Errors from tablet servers: " + GetStatusStringSet(errors));
174
175
26.0k
  return AppendTxnErrorCode(AppendPsqlErrorCode(result, errors), errors);
176
93.4k
}
177
178
10.3M
Status HandleResponse(const client::YBPgsqlOp& op, PgPerformResponsePB* resp) {
179
10.3M
  const auto& response = op.response();
180
10.3M
  if (response.status() == PgsqlResponsePB::PGSQL_STATUS_OK) {
181
10.3M
    if (op.read_only() && 
op.table()->schema().table_properties().is_ysql_catalog_table()3.18M
) {
182
1.25M
      const auto& pgsql_op = down_cast<const client::YBPgsqlReadOp&>(op);
183
1.25M
      if (pgsql_op.used_read_time()) {
184
        // Non empty used_read_time field in catalog read operation means this is the very first
185
        // catalog read operation after catalog read time resetting. read_time for the operation
186
        // has been chosen by master. All further reads from catalog must use same read point.
187
77.1k
        auto catalog_read_time = pgsql_op.used_read_time();
188
189
        // We set global limit to local limit to avoid read restart errors because they are
190
        // disruptive to system catalog reads and it is not always possible to handle them there.
191
        // This might lead to reading slightly outdated state of the system catalog if a recently
192
        // committed DDL transaction used a transaction status tablet whose leader's clock is skewed
193
        // and is in the future compared to the master leader's clock.
194
        // TODO(dmitry) This situation will be handled in context of #7964.
195
77.1k
        catalog_read_time.global_limit = catalog_read_time.local_limit;
196
77.1k
        catalog_read_time.ToPB(resp->mutable_catalog_read_time());
197
77.1k
      }
198
1.25M
    }
199
10.3M
    return Status::OK();
200
10.3M
  }
201
202
12.4k
  auto status = STATUS(
203
12.4k
      QLError, response.error_message(), Slice(), PgsqlRequestStatus(response.status()));
204
205
12.4k
  if (response.has_pg_error_code()) {
206
1.85k
    status = status.CloneAndAddErrorCode(
207
1.85k
        PgsqlError(static_cast<YBPgErrorCode>(response.pg_error_code())));
208
1.85k
  }
209
210
12.4k
  if (response.has_txn_error_code()) {
211
1.85k
    status = status.CloneAndAddErrorCode(
212
1.85k
        TransactionError(static_cast<TransactionErrorCode>(response.txn_error_code())));
213
1.85k
  }
214
215
12.4k
  return status;
216
10.3M
}
217
218
11.5M
CHECKED_STATUS GetTable(const TableId& table_id, PgTableCache* cache, client::YBTablePtr* table) {
219
11.5M
  if (*table && 
(**table).id() == table_id9.39M
) {
220
8.00M
    return Status::OK();
221
8.00M
  }
222
3.55M
  *table = 
VERIFY_RESULT3.55M
(3.55M
cache->Get(table_id));
223
0
  return Status::OK();
224
3.55M
}
225
226
Result<PgClientSessionOperations> PrepareOperations(
227
2.16M
    const PgPerformRequestPB& req, client::YBSession* session, PgTableCache* table_cache) {
228
2.16M
  auto write_time = HybridTime::FromPB(req.write_time());
229
2.16M
  std::vector<std::shared_ptr<client::YBPgsqlOp>> ops;
230
2.16M
  ops.reserve(req.ops().size());
231
2.16M
  client::YBTablePtr table;
232
2.16M
  bool finished = false;
233
2.17M
  auto se = ScopeExit([&finished, session] {
234
2.17M
    if (!finished) {
235
12
      session->Abort();
236
12
    }
237
2.17M
  });
238
11.5M
  for (const auto& op : req.ops()) {
239
11.5M
    if (op.has_read()) {
240
4.35M
      const auto& read = op.read();
241
4.35M
      RETURN_NOT_OK(GetTable(read.table_id(), table_cache, &table));
242
4.35M
      const auto read_op = std::make_shared<client::YBPgsqlReadOp>(
243
4.35M
          table, const_cast<PgsqlReadRequestPB*>(&read));
244
4.35M
      if (op.read_from_followers()) {
245
93
        read_op->set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX);
246
93
      }
247
4.35M
      ops.push_back(read_op);
248
4.35M
      session->Apply(std::move(read_op));
249
7.20M
    } else {
250
7.20M
      const auto& write = op.write();
251
7.20M
      RETURN_NOT_OK(GetTable(write.table_id(), table_cache, &table));
252
7.20M
      const auto write_op = std::make_shared<client::YBPgsqlWriteOp>(
253
7.20M
          table, const_cast<PgsqlWriteRequestPB*>(&write));
254
7.20M
      if (write_time) {
255
1.26k
        write_op->SetWriteTime(write_time);
256
1.26k
        write_time = HybridTime::kInvalid;
257
1.26k
      }
258
7.20M
      ops.push_back(write_op);
259
7.20M
      session->Apply(std::move(write_op));
260
7.20M
    }
261
11.5M
  }
262
2.16M
  finished = true;
263
2.16M
  return ops;
264
2.16M
}
265
266
struct PerformData {
267
  uint64_t session_id;
268
  const PgPerformRequestPB* req;
269
  PgPerformResponsePB* resp;
270
  rpc::RpcContext context;
271
  PgClientSessionOperations ops;
272
  PgTableCache* table_cache;
273
274
2.17M
  void FlushDone(client::FlushStatus* flush_status) {
275
2.17M
    auto status = CombineErrorsToStatus(flush_status->errors, flush_status->status);
276
2.17M
    if (status.ok()) {
277
2.07M
      status = ProcessResponse();
278
2.07M
    }
279
2.17M
    if (!status.ok()) {
280
106k
      StatusToPB(status, resp->mutable_status());
281
106k
    }
282
2.17M
    context.RespondSuccess();
283
2.17M
  }
284
285
2.07M
  CHECKED_STATUS ProcessResponse() {
286
2.07M
    int idx = 0;
287
10.3M
    for (const auto& op : ops) {
288
10.3M
      const auto status = HandleResponse(*op, resp);
289
10.3M
      if (!status.ok()) {
290
13.0k
        if (PgsqlRequestStatus(status) == PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH) {
291
79
          table_cache->Invalidate(op->table()->id());
292
79
        }
293
18.4E
        VLOG(2) << SessionLogPrefix(session_id) << "Failed op " << idx << ": " << status;
294
13.0k
        return status.CloneAndAddErrorCode(OpIndex(idx));
295
13.0k
      }
296
10.3M
      const auto& req_op = req->ops()[idx];
297
10.3M
      if (req_op.has_read() && 
req_op.read().is_for_backfill()3.18M
&&
298
10.3M
          
op->response().is_backfill_batch_done()2.33k
) {
299
        // After backfill table schema version is updated, so we reset cache in advance.
300
2.25k
        table_cache->Invalidate(op->table()->id());
301
2.25k
      }
302
10.3M
      ++idx;
303
10.3M
    }
304
305
2.06M
    auto& responses = *resp->mutable_responses();
306
2.06M
    responses.Reserve(narrow_cast<int>(ops.size()));
307
10.3M
    for (const auto& op : ops) {
308
10.3M
      auto& op_resp = *responses.Add();
309
10.3M
      op_resp.Swap(op->mutable_response());
310
10.3M
      if (
op_resp.has_rows_data_sidecar()10.3M
) {
311
10.3M
        op_resp.set_rows_data_sidecar(narrow_cast<int>(context.AddRpcSidecar(op->rows_data())));
312
10.3M
      }
313
10.3M
    }
314
315
2.06M
    return Status::OK();
316
2.07M
  }
317
};
318
319
client::YBSessionPtr CreateSession(
320
12.6k
    client::YBClient* client, const scoped_refptr<ClockBase>& clock) {
321
12.6k
  auto result = std::make_shared<client::YBSession>(client, clock);
322
12.6k
  result->SetForceConsistentRead(client::ForceConsistentRead::kTrue);
323
12.6k
  result->set_allow_local_calls_in_curr_thread(false);
324
12.6k
  return result;
325
12.6k
}
326
327
} // namespace
328
329
PgClientSession::PgClientSession(
330
    client::YBClient* client, const scoped_refptr<ClockBase>& clock,
331
    std::reference_wrapper<const TransactionPoolProvider> transaction_pool_provider,
332
    PgTableCache* table_cache, uint64_t id)
333
    : client_(*client),
334
      clock_(clock),
335
      transaction_pool_provider_(transaction_pool_provider.get()),
336
6.09k
      table_cache_(*table_cache), id_(id) {
337
6.09k
}
338
339
2.49M
uint64_t PgClientSession::id() const {
340
2.49M
  return id_;
341
2.49M
}
342
343
Status PgClientSession::CreateTable(
344
5.05k
    const PgCreateTableRequestPB& req, PgCreateTableResponsePB* resp, rpc::RpcContext* context) {
345
5.05k
  PgCreateTable helper(req);
346
5.05k
  RETURN_NOT_OK(helper.Prepare());
347
5.05k
  const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata(
348
5.05k
      req.use_transaction(), context->GetClientDeadline()));
349
5.05k
  RETURN_NOT_OK(helper.Exec(&client(), metadata, context->GetClientDeadline()));
350
5.03k
  
VLOG_WITH_PREFIX2
(1) << __func__ << ": " << req.table_name()2
;
351
5.03k
  const auto& indexed_table_id = helper.indexed_table_id();
352
5.03k
  if (indexed_table_id.IsValid()) {
353
855
    table_cache_.Invalidate(indexed_table_id.GetYbTableId());
354
855
  }
355
5.03k
  return Status::OK();
356
5.05k
}
357
358
Status PgClientSession::CreateDatabase(
359
    const PgCreateDatabaseRequestPB& req, PgCreateDatabaseResponsePB* resp,
360
134
    rpc::RpcContext* context) {
361
134
  return client().CreateNamespace(
362
134
      req.database_name(),
363
134
      YQL_DATABASE_PGSQL,
364
134
      "" /* creator_role_name */,
365
134
      GetPgsqlNamespaceId(req.database_oid()),
366
134
      req.source_database_oid() != kPgInvalidOid
367
134
          ? 
GetPgsqlNamespaceId(req.source_database_oid())123
:
""11
,
368
134
      req.next_oid(),
369
134
      VERIFY_RESULT(GetDdlTransactionMetadata(req.use_transaction(), context->GetClientDeadline())),
370
0
      req.colocated(),
371
134
      context->GetClientDeadline());
372
134
}
373
374
Status PgClientSession::DropDatabase(
375
72
    const PgDropDatabaseRequestPB& req, PgDropDatabaseResponsePB* resp, rpc::RpcContext* context) {
376
72
  return client().DeleteNamespace(
377
72
      req.database_name(),
378
72
      YQL_DATABASE_PGSQL,
379
72
      GetPgsqlNamespaceId(req.database_oid()),
380
72
      context->GetClientDeadline());
381
72
}
382
383
Status PgClientSession::DropTable(
384
4.14k
    const PgDropTableRequestPB& req, PgDropTableResponsePB* resp, rpc::RpcContext* context) {
385
4.14k
  const auto yb_table_id = PgObjectId::GetYbTableIdFromPB(req.table_id());
386
4.14k
  if (req.index()) {
387
669
    client::YBTableName indexed_table;
388
669
    RETURN_NOT_OK(client().DeleteIndexTable(
389
669
        yb_table_id, &indexed_table, true, context->GetClientDeadline()));
390
667
    indexed_table.SetIntoTableIdentifierPB(resp->mutable_indexed_table());
391
667
    table_cache_.Invalidate(indexed_table.table_id());
392
667
    table_cache_.Invalidate(yb_table_id);
393
667
    return Status::OK();
394
669
  }
395
396
3.48k
  RETURN_NOT_OK(client().DeleteTable(yb_table_id, true, context->GetClientDeadline()));
397
3.47k
  table_cache_.Invalidate(yb_table_id);
398
3.47k
  return Status::OK();
399
3.48k
}
400
401
Status PgClientSession::AlterDatabase(
402
    const PgAlterDatabaseRequestPB& req, PgAlterDatabaseResponsePB* resp,
403
3
    rpc::RpcContext* context) {
404
3
  const auto alterer = client().NewNamespaceAlterer(
405
3
      req.database_name(), GetPgsqlNamespaceId(req.database_oid()));
406
3
  alterer->SetDatabaseType(YQL_DATABASE_PGSQL);
407
3
  alterer->RenameTo(req.new_name());
408
3
  return alterer->Alter(context->GetClientDeadline());
409
3
}
410
411
Status PgClientSession::AlterTable(
412
522
    const PgAlterTableRequestPB& req, PgAlterTableResponsePB* resp, rpc::RpcContext* context) {
413
522
  const auto table_id = PgObjectId::GetYbTableIdFromPB(req.table_id());
414
522
  const auto alterer = client().NewTableAlterer(table_id);
415
522
  const auto txn = VERIFY_RESULT(GetDdlTransactionMetadata(
416
522
      req.use_transaction(), context->GetClientDeadline()));
417
522
  if (txn) {
418
522
    alterer->part_of_transaction(txn);
419
522
  }
420
522
  for (const auto& add_column : req.add_columns()) {
421
234
    const auto yb_type = QLType::Create(static_cast<DataType>(add_column.attr_ybtype()));
422
234
    alterer->AddColumn(add_column.attr_name())
423
234
           ->Type(yb_type)->Order(add_column.attr_num())->PgTypeOid(add_column.attr_pgoid());
424
    // Do not set 'nullable' attribute as PgCreateTable::AddColumn() does not do it.
425
234
  }
426
522
  for (const auto& rename_column : req.rename_columns()) {
427
15
    alterer->AlterColumn(rename_column.old_name())->RenameTo(rename_column.new_name());
428
15
  }
429
522
  for (const auto& drop_column : req.drop_columns()) {
430
177
    alterer->DropColumn(drop_column);
431
177
  }
432
522
  if (!req.rename_table().table_name().empty()) {
433
115
    client::YBTableName new_table_name(
434
115
        YQL_DATABASE_PGSQL, req.rename_table().database_name(), req.rename_table().table_name());
435
115
    alterer->RenameTo(new_table_name);
436
115
  }
437
438
522
  alterer->timeout(context->GetClientDeadline() - CoarseMonoClock::now());
439
522
  RETURN_NOT_OK(alterer->Alter());
440
520
  table_cache_.Invalidate(table_id);
441
520
  return Status::OK();
442
522
}
443
444
Status PgClientSession::TruncateTable(
445
    const PgTruncateTableRequestPB& req, PgTruncateTableResponsePB* resp,
446
624
    rpc::RpcContext* context) {
447
624
  return client().TruncateTable(PgObjectId::GetYbTableIdFromPB(req.table_id()));
448
624
}
449
450
Status PgClientSession::BackfillIndex(
451
    const PgBackfillIndexRequestPB& req, PgBackfillIndexResponsePB* resp,
452
540
    rpc::RpcContext* context) {
453
540
  return client().BackfillIndex(
454
540
      PgObjectId::GetYbTableIdFromPB(req.table_id()), /* wait= */ true,
455
540
      context->GetClientDeadline());
456
540
}
457
458
Status PgClientSession::CreateTablegroup(
459
    const PgCreateTablegroupRequestPB& req, PgCreateTablegroupResponsePB* resp,
460
54
    rpc::RpcContext* context) {
461
54
  const auto id = PgObjectId::FromPB(req.tablegroup_id());
462
54
  auto tablespace_id = PgObjectId::FromPB(req.tablespace_id());
463
54
  auto s = client().CreateTablegroup(
464
54
      req.database_name(), GetPgsqlNamespaceId(id.database_oid),
465
54
      id.GetYbTablegroupId(),
466
54
      tablespace_id.IsValid() ? 
tablespace_id.GetYbTablespaceId()10
:
""44
);
467
54
  if (s.ok()) {
468
52
    return Status::OK();
469
52
  }
470
471
2
  if (s.IsAlreadyPresent()) {
472
0
    return STATUS(InvalidArgument, "Duplicate tablegroup");
473
0
  }
474
475
2
  if (s.IsNotFound()) {
476
0
    return STATUS(InvalidArgument, "Database not found", req.database_name());
477
0
  }
478
479
2
  return STATUS_FORMAT(
480
2
      InvalidArgument, "Invalid table definition: $0",
481
2
      s.ToString(false /* include_file_and_line */, false /* include_code */));
482
2
}
483
484
Status PgClientSession::DropTablegroup(
485
    const PgDropTablegroupRequestPB& req, PgDropTablegroupResponsePB* resp,
486
39
    rpc::RpcContext* context) {
487
39
  const auto id = PgObjectId::FromPB(req.tablegroup_id());
488
39
  const auto status = client().DeleteTablegroup(
489
39
      GetPgsqlNamespaceId(id.database_oid),
490
39
      GetPgsqlTablegroupId(id.database_oid, id.object_oid));
491
39
  if (status.IsNotFound()) {
492
0
    return Status::OK();
493
0
  }
494
39
  return status;
495
39
}
496
497
Status PgClientSession::RollbackSubTransaction(
498
    const PgRollbackSubTransactionRequestPB& req, PgRollbackSubTransactionResponsePB* resp,
499
13.5k
    rpc::RpcContext* context) {
500
13.5k
  
VLOG_WITH_PREFIX_AND_FUNC0
(2) << req.ShortDebugString()0
;
501
13.5k
  SCHECK(Transaction(PgClientSessionKind::kPlain), IllegalState,
502
13.5k
         Format("Rollback sub transaction $0, when not transaction is running",
503
13.5k
                req.sub_transaction_id()));
504
13.5k
  return Transaction(PgClientSessionKind::kPlain)->RollbackSubTransaction(req.sub_transaction_id());
505
13.5k
}
506
507
Status PgClientSession::SetActiveSubTransaction(
508
    const PgSetActiveSubTransactionRequestPB& req, PgSetActiveSubTransactionResponsePB* resp,
509
61.7k
    rpc::RpcContext* context) {
510
61.7k
  
VLOG_WITH_PREFIX_AND_FUNC23
(2) << req.ShortDebugString()23
;
511
512
61.7k
  if (req.has_options()) {
513
7.28k
    RETURN_NOT_OK(BeginTransactionIfNecessary(req.options(), context->GetClientDeadline()));
514
7.28k
    txn_serial_no_ = req.options().txn_serial_no();
515
7.28k
  }
516
517
61.7k
  SCHECK(Transaction(PgClientSessionKind::kPlain), IllegalState,
518
61.7k
         Format("Set active sub transaction $0, when not transaction is running",
519
61.7k
                req.sub_transaction_id()));
520
521
61.7k
  Transaction(PgClientSessionKind::kPlain)->SetActiveSubTransaction(req.sub_transaction_id());
522
61.7k
  return Status::OK();
523
61.7k
}
524
525
Status PgClientSession::FinishTransaction(
526
    const PgFinishTransactionRequestPB& req, PgFinishTransactionResponsePB* resp,
527
218k
    rpc::RpcContext* context) {
528
218k
  saved_priority_ = boost::none;
529
218k
  auto kind = req.ddl_mode() ? 
PgClientSessionKind::kDdl20.3k
:
PgClientSessionKind::kPlain198k
;
530
218k
  auto& txn = Transaction(kind);
531
218k
  if (!txn) {
532
1.55k
    
VLOG_WITH_PREFIX_AND_FUNC0
(2) << "ddl: " << req.ddl_mode() << ", no running transaction"0
;
533
1.55k
    return Status::OK();
534
1.55k
  }
535
216k
  const auto txn_value = std::move(txn);
536
216k
  Session(kind)->SetTransaction(nullptr);
537
538
216k
  if (req.commit()) {
539
191k
    const auto commit_status = txn_value->CommitFuture().get();
540
18.4E
    VLOG_WITH_PREFIX_AND_FUNC(2)
541
18.4E
        << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id()
542
18.4E
        << ", commit: " << commit_status;
543
191k
    return commit_status;
544
191k
  }
545
546
25.1k
  
VLOG_WITH_PREFIX_AND_FUNC12
(2)
547
12
      << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id() << ", abort";
548
25.1k
  txn_value->Abort();
549
25.1k
  return Status::OK();
550
216k
}
551
552
Status PgClientSession::Perform(
553
2.17M
    const PgPerformRequestPB& req, PgPerformResponsePB* resp, rpc::RpcContext* context) {
554
2.17M
  auto session = 
VERIFY_RESULT2.17M
(SetupSession(req, context->GetClientDeadline()));2.17M
555
556
2.17M
  auto ops = VERIFY_RESULT(PrepareOperations(req, session, &table_cache_));
557
0
  auto data = std::make_shared<PerformData>(PerformData {
558
2.17M
    .session_id = id_,
559
2.17M
    .req = &req,
560
2.17M
    .resp = resp,
561
2.17M
    .context = std::move(*context),
562
2.17M
    .ops = std::move(ops),
563
2.17M
    .table_cache = &table_cache_,
564
2.17M
  });
565
2.17M
  session->FlushAsync([data](client::FlushStatus* flush_status) {
566
2.17M
    data->FlushDone(flush_status);
567
2.17M
  });
568
2.17M
  return Status::OK();
569
2.17M
}
570
571
2.17M
void PgClientSession::ProcessReadTimeManipulation(ReadTimeManipulation manipulation) {
572
2.17M
  switch (manipulation) {
573
15.6k
    case ReadTimeManipulation::RESET: {
574
        // If a txn_ has been created, session_->read_point() returns the read point stored in txn_.
575
15.6k
        ConsistentReadPoint* rp = Session(PgClientSessionKind::kPlain)->read_point();
576
15.6k
        rp->SetCurrentReadTime();
577
578
15.6k
        VLOG
(1) << "Setting current ht as read point " << rp->GetReadTime()0
;
579
15.6k
      }
580
15.6k
      return;
581
163
    case ReadTimeManipulation::RESTART: {
582
163
        ConsistentReadPoint* rp = Session(PgClientSessionKind::kPlain)->read_point();
583
163
        rp->Restart();
584
585
163
        VLOG
(1) << "Restarted read point " << rp->GetReadTime()0
;
586
163
      }
587
163
      return;
588
2.15M
    case ReadTimeManipulation::NONE:
589
2.15M
      return;
590
0
    case ReadTimeManipulation::ReadTimeManipulation_INT_MIN_SENTINEL_DO_NOT_USE_:
591
0
    case ReadTimeManipulation::ReadTimeManipulation_INT_MAX_SENTINEL_DO_NOT_USE_:
592
0
      break;
593
2.17M
  }
594
0
  FATAL_INVALID_ENUM_VALUE(ReadTimeManipulation, manipulation);
595
0
}
596
597
Result<client::YBSession*> PgClientSession::SetupSession(
598
2.17M
    const PgPerformRequestPB& req, CoarseTimePoint deadline) {
599
600
2.17M
  const auto& options = req.options();
601
2.17M
  PgClientSessionKind kind;
602
2.17M
  if (options.use_catalog_session()) {
603
669k
    kind = PgClientSessionKind::kCatalog;
604
669k
    EnsureSession(kind);
605
1.50M
  } else if (options.ddl_mode()) {
606
566k
    kind = PgClientSessionKind::kDdl;
607
566k
    EnsureSession(kind);
608
566k
    RETURN_NOT_OK(GetDdlTransactionMetadata(true, deadline));
609
934k
  } else {
610
934k
    kind = PgClientSessionKind::kPlain;
611
934k
    RETURN_NOT_OK(BeginTransactionIfNecessary(options, deadline));
612
934k
  }
613
614
2.17M
  client::YBSession* session = Session(kind).get();
615
2.17M
  client::YBTransaction* transaction = Transaction(kind).get();
616
617
2.17M
  
VLOG_WITH_PREFIX483
(4) << __func__ << ": " << options.ShortDebugString()483
;
618
619
2.17M
  if (options.restart_transaction()) {
620
510
    if(options.ddl_mode()) {
621
0
      return STATUS(NotSupported, "Not supported to restart DDL transaction");
622
0
    }
623
510
    Transaction(kind) = VERIFY_RESULT(RestartTransaction(session, transaction));
624
0
    transaction = Transaction(kind).get();
625
2.16M
  } else {
626
2.16M
    ProcessReadTimeManipulation(options.read_time_manipulation());
627
2.16M
    if (options.has_read_time() &&
628
2.16M
        
(702k
options.read_time().has_read_ht()702k
||
options.use_catalog_session()77.1k
)) {
629
701k
      const auto read_time = options.read_time().has_read_ht()
630
701k
          ? 
ReadHybridTime::FromPB(options.read_time())624k
:
ReadHybridTime()77.1k
;
631
701k
      session->SetReadPoint(read_time);
632
701k
      if (read_time) {
633
18.4E
        VLOG_WITH_PREFIX(3) << "Read time: " << read_time;
634
624k
      } else {
635
77.3k
        
VLOG_WITH_PREFIX155
(3) << "Reset read time: " << session->read_point()->GetReadTime()155
;
636
77.3k
      }
637
1.46M
    } else if (!transaction &&
638
1.46M
               
(466k
options.ddl_mode()466k
||
txn_serial_no_ != options.txn_serial_no()466k
)) {
639
233k
      session->SetReadPoint(client::Restart::kFalse);
640
18.4E
      VLOG_WITH_PREFIX(3) << "New read time: " << session->read_point()->GetReadTime();
641
1.23M
    } else {
642
1.23M
      
VLOG_WITH_PREFIX48
(3) << "Keep read time: " << session->read_point()->GetReadTime()48
;
643
1.23M
    }
644
2.16M
  }
645
646
2.17M
  if (options.defer_read_point()) {
647
    // This call is idempotent, meaning it has no effect after the first call.
648
80
    session->DeferReadPoint();
649
80
  }
650
651
2.17M
  if (!options.ddl_mode() && 
!options.use_catalog_session()1.60M
) {
652
934k
    txn_serial_no_ = options.txn_serial_no();
653
654
934k
    const auto in_txn_limit = HybridTime::FromPB(options.in_txn_limit_ht());
655
934k
    if (in_txn_limit) {
656
18.4E
      VLOG_WITH_PREFIX(3) << "In txn limit: " << in_txn_limit;
657
864k
      session->SetInTxnLimit(in_txn_limit);
658
864k
    }
659
934k
  }
660
661
2.17M
  session->SetDeadline(deadline);
662
663
2.17M
  return session;
664
2.17M
}
665
666
0
std::string PgClientSession::LogPrefix() {
667
0
  return SessionLogPrefix(id_);
668
0
}
669
670
Status PgClientSession::BeginTransactionIfNecessary(
671
941k
    const PgPerformOptionsPB& options, CoarseTimePoint deadline) {
672
941k
  const auto isolation = static_cast<IsolationLevel>(options.isolation());
673
674
941k
  auto priority = options.priority();
675
941k
  auto& session = EnsureSession(PgClientSessionKind::kPlain);
676
941k
  auto& txn = Transaction(PgClientSessionKind::kPlain);
677
941k
  if (txn && 
txn_serial_no_ != options.txn_serial_no()260k
) {
678
18.4E
    VLOG_WITH_PREFIX(2)
679
18.4E
        << "Abort previous transaction, use existing priority: " << options.use_existing_priority()
680
18.4E
        << ", new isolation: " << IsolationLevel_Name(isolation);
681
682
69.0k
    if (
options.use_existing_priority()68.9k
) {
683
69.0k
      saved_priority_ = txn->GetPriority();
684
69.0k
    }
685
68.9k
    txn->Abort();
686
68.9k
    session->SetTransaction(nullptr);
687
68.9k
    txn = nullptr;
688
68.9k
  }
689
690
941k
  if (isolation == IsolationLevel::NON_TRANSACTIONAL) {
691
483k
    return Status::OK();
692
483k
  }
693
694
458k
  if (txn) {
695
191k
    return txn->isolation() != isolation
696
191k
        ? 
STATUS_FORMAT0
(
697
191k
            IllegalState,
698
191k
            "Attempt to change isolation level of running transaction from $0 to $1",
699
191k
            txn->isolation(), isolation)
700
191k
        : Status::OK();
701
191k
  }
702
703
267k
  txn = transaction_pool_provider_()->Take(
704
267k
      client::ForceGlobalTransaction(options.force_global_transaction()), deadline);
705
267k
  if ((isolation == IsolationLevel::SNAPSHOT_ISOLATION ||
706
267k
           
isolation == IsolationLevel::READ_COMMITTED172k
) &&
707
267k
      
txn_serial_no_ == options.txn_serial_no()100k
) {
708
13.4k
    txn->InitWithReadPoint(isolation, std::move(*session->read_point()));
709
18.4E
    VLOG_WITH_PREFIX(2) << "Start transaction " << IsolationLevel_Name(isolation)
710
18.4E
                        << ", id: " << txn->id()
711
18.4E
                        << ", kept read time: " << txn->read_point().GetReadTime();
712
253k
  } else {
713
253k
    
VLOG_WITH_PREFIX2
(2) << "Start transaction " << IsolationLevel_Name(isolation)
714
2
                        << ", id: " << txn->id()
715
2
                        << ", new read time";
716
253k
    RETURN_NOT_OK(txn->Init(isolation));
717
253k
  }
718
267k
  if (saved_priority_) {
719
69.0k
    priority = *saved_priority_;
720
69.0k
    saved_priority_ = boost::none;
721
69.0k
  }
722
267k
  txn->SetPriority(priority);
723
267k
  session->SetTransaction(txn);
724
725
267k
  return Status::OK();
726
267k
}
727
728
Result<const TransactionMetadata*> PgClientSession::GetDdlTransactionMetadata(
729
572k
    bool use_transaction, CoarseTimePoint deadline) {
730
572k
  if (!use_transaction) {
731
283
    return nullptr;
732
283
  }
733
734
571k
  auto& txn = Transaction(PgClientSessionKind::kDdl);
735
571k
  if (!txn) {
736
18.8k
    const auto isolation = FLAGS_ysql_serializable_isolation_for_ddl_txn
737
18.8k
        ? 
IsolationLevel::SERIALIZABLE_ISOLATION0
: IsolationLevel::SNAPSHOT_ISOLATION;
738
18.8k
    txn = VERIFY_RESULT(transaction_pool_provider_()->TakeAndInit(isolation, deadline));
739
18.8k
    ddl_txn_metadata_ = VERIFY_RESULT(Copy(txn->GetMetadata(deadline).get()));
740
0
    EnsureSession(PgClientSessionKind::kDdl)->SetTransaction(txn);
741
18.8k
  }
742
743
571k
  return &ddl_txn_metadata_;
744
571k
}
745
746
11.1k
client::YBClient& PgClientSession::client() {
747
11.1k
  return client_;
748
11.1k
}
749
750
Result<client::YBTransactionPtr> PgClientSession::RestartTransaction(
751
510
    client::YBSession* session, client::YBTransaction* transaction) {
752
510
  if (!transaction) {
753
508
    SCHECK(session->IsRestartRequired(), IllegalState,
754
508
           "Attempted to restart when session does not require restart");
755
756
508
    const auto old_read_time = session->read_point()->GetReadTime();
757
508
    session->SetReadPoint(client::Restart::kTrue);
758
508
    const auto new_read_time = session->read_point()->GetReadTime();
759
508
    
VLOG_WITH_PREFIX0
(3) << "Restarted read: " << old_read_time << " => " << new_read_time0
;
760
508
    
LOG_IF_WITH_PREFIX0
(DFATAL, old_read_time == new_read_time)
761
0
        << "Read time did not change during restart: " << old_read_time << " => " << new_read_time;
762
508
    return nullptr;
763
508
  }
764
765
2
  if (!transaction->IsRestartRequired()) {
766
0
    return STATUS(IllegalState, "Attempted to restart when transaction does not require restart");
767
0
  }
768
2
  const auto result = VERIFY_RESULT(transaction->CreateRestartedTransaction());
769
0
  session->SetTransaction(result);
770
2
  
VLOG_WITH_PREFIX0
(3) << "Restarted transaction"0
;
771
2
  return result;
772
2
}
773
774
Status PgClientSession::InsertSequenceTuple(
775
    const PgInsertSequenceTupleRequestPB& req, PgInsertSequenceTupleResponsePB* resp,
776
295
    rpc::RpcContext* context) {
777
295
  PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
778
295
  auto result = table_cache_.Get(table_oid.GetYbTableId());
779
295
  if (!result.ok()) {
780
92
    RETURN_NOT_OK(CreateSequencesDataTable(&client_, context->GetClientDeadline()));
781
    // Try one more time.
782
92
    result = table_cache_.Get(table_oid.GetYbTableId());
783
92
  }
784
295
  auto table = VERIFY_RESULT(std::move(result));
785
786
0
  auto psql_write(client::YBPgsqlWriteOp::NewInsert(table));
787
788
295
  auto write_request = psql_write->mutable_request();
789
295
  write_request->set_ysql_catalog_version(req.ysql_catalog_version());
790
791
295
  write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid());
792
295
  write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.seq_oid());
793
794
295
  PgsqlColumnValuePB* column_value = write_request->add_column_values();
795
295
  column_value->set_column_id(table->schema().ColumnId(kPgSequenceLastValueColIdx));
796
295
  column_value->mutable_expr()->mutable_value()->set_int64_value(req.last_val());
797
798
295
  column_value = write_request->add_column_values();
799
295
  column_value->set_column_id(table->schema().ColumnId(kPgSequenceIsCalledColIdx));
800
295
  column_value->mutable_expr()->mutable_value()->set_bool_value(req.is_called());
801
802
295
  auto& session = EnsureSession(PgClientSessionKind::kSequence);
803
295
  session->SetDeadline(context->GetClientDeadline());
804
295
  return session->ApplyAndFlush(std::move(psql_write));
805
295
}
806
807
Status PgClientSession::UpdateSequenceTuple(
808
    const PgUpdateSequenceTupleRequestPB& req, PgUpdateSequenceTupleResponsePB* resp,
809
2.97k
    rpc::RpcContext* context) {
810
2.97k
  PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
811
2.97k
  auto table = VERIFY_RESULT(table_cache_.Get(table_oid.GetYbTableId()));
812
813
0
  std::shared_ptr<client::YBPgsqlWriteOp> psql_write(client::YBPgsqlWriteOp::NewUpdate(table));
814
815
2.97k
  auto write_request = psql_write->mutable_request();
816
2.97k
  write_request->set_ysql_catalog_version(req.ysql_catalog_version());
817
818
2.97k
  write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid());
819
2.97k
  write_request->add_partition_column_values()->mutable_value()->set_int64_value(req.seq_oid());
820
821
2.97k
  PgsqlColumnValuePB* column_value = write_request->add_column_new_values();
822
2.97k
  column_value->set_column_id(table->schema().ColumnId(kPgSequenceLastValueColIdx));
823
2.97k
  column_value->mutable_expr()->mutable_value()->set_int64_value(req.last_val());
824
825
2.97k
  column_value = write_request->add_column_new_values();
826
2.97k
  column_value->set_column_id(table->schema().ColumnId(kPgSequenceIsCalledColIdx));
827
2.97k
  column_value->mutable_expr()->mutable_value()->set_bool_value(req.is_called());
828
829
2.97k
  auto where_pb = write_request->mutable_where_expr()->mutable_condition();
830
831
2.97k
  if (req.has_expected()) {
832
    // WHERE clause => WHERE last_val == expected_last_val AND is_called == expected_is_called.
833
2.95k
    where_pb->set_op(QL_OP_AND);
834
835
2.95k
    auto cond = where_pb->add_operands()->mutable_condition();
836
2.95k
    cond->set_op(QL_OP_EQUAL);
837
2.95k
    cond->add_operands()->set_column_id(table->schema().ColumnId(kPgSequenceLastValueColIdx));
838
2.95k
    cond->add_operands()->mutable_value()->set_int64_value(req.expected_last_val());
839
840
2.95k
    cond = where_pb->add_operands()->mutable_condition();
841
2.95k
    cond->set_op(QL_OP_EQUAL);
842
2.95k
    cond->add_operands()->set_column_id(table->schema().ColumnId(kPgSequenceIsCalledColIdx));
843
2.95k
    cond->add_operands()->mutable_value()->set_bool_value(req.expected_is_called());
844
2.95k
  } else {
845
29
    where_pb->set_op(QL_OP_EXISTS);
846
29
  }
847
848
  // For compatibility set deprecated column_refs
849
2.97k
  write_request->mutable_column_refs()->add_ids(
850
2.97k
      table->schema().ColumnId(kPgSequenceLastValueColIdx));
851
2.97k
  write_request->mutable_column_refs()->add_ids(
852
2.97k
      table->schema().ColumnId(kPgSequenceIsCalledColIdx));
853
  // Same values, to be consumed by current TServers
854
2.97k
  write_request->add_col_refs()->set_column_id(
855
2.97k
      table->schema().ColumnId(kPgSequenceLastValueColIdx));
856
2.97k
  write_request->add_col_refs()->set_column_id(
857
2.97k
      table->schema().ColumnId(kPgSequenceIsCalledColIdx));
858
859
2.97k
  auto& session = EnsureSession(PgClientSessionKind::kSequence);
860
2.97k
  session->SetDeadline(context->GetClientDeadline());
861
2.97k
  RETURN_NOT_OK(session->ApplyAndFlush(psql_write));
862
2.97k
  resp->set_skipped(psql_write->response().skipped());
863
2.97k
  return Status::OK();
864
2.97k
}
865
866
Status PgClientSession::ReadSequenceTuple(
867
    const PgReadSequenceTupleRequestPB& req, PgReadSequenceTupleResponsePB* resp,
868
3.23k
    rpc::RpcContext* context) {
869
3.23k
  using pggate::PgDocData;
870
3.23k
  using pggate::PgWireDataHeader;
871
872
3.23k
  PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
873
3.23k
  auto table = VERIFY_RESULT(table_cache_.Get(table_oid.GetYbTableId()));
874
875
0
  std::shared_ptr<client::YBPgsqlReadOp> psql_read(client::YBPgsqlReadOp::NewSelect(table));
876
877
3.23k
  auto read_request = psql_read->mutable_request();
878
3.23k
  read_request->set_ysql_catalog_version(req.ysql_catalog_version());
879
880
3.23k
  read_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid());
881
3.23k
  read_request->add_partition_column_values()->mutable_value()->set_int64_value(req.seq_oid());
882
883
3.23k
  read_request->add_targets()->set_column_id(
884
3.23k
      table->schema().ColumnId(kPgSequenceLastValueColIdx));
885
3.23k
  read_request->add_targets()->set_column_id(
886
3.23k
      table->schema().ColumnId(kPgSequenceIsCalledColIdx));
887
888
  // For compatibility set deprecated column_refs
889
3.23k
  read_request->mutable_column_refs()->add_ids(
890
3.23k
      table->schema().ColumnId(kPgSequenceLastValueColIdx));
891
3.23k
  read_request->mutable_column_refs()->add_ids(
892
3.23k
      table->schema().ColumnId(kPgSequenceIsCalledColIdx));
893
  // Same values, to be consumed by current TServers
894
3.23k
  read_request->add_col_refs()->set_column_id(
895
3.23k
      table->schema().ColumnId(kPgSequenceLastValueColIdx));
896
3.23k
  read_request->add_col_refs()->set_column_id(
897
3.23k
      table->schema().ColumnId(kPgSequenceIsCalledColIdx));
898
899
3.23k
  auto& session = EnsureSession(PgClientSessionKind::kSequence);
900
3.23k
  session->SetDeadline(context->GetClientDeadline());
901
3.23k
  RETURN_NOT_OK(session->ReadSync(psql_read));
902
903
3.23k
  Slice cursor;
904
3.23k
  int64_t row_count = 0;
905
3.23k
  PgDocData::LoadCache(psql_read->rows_data(), &row_count, &cursor);
906
3.23k
  if (row_count == 0) {
907
0
    return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", req.seq_oid());
908
0
  }
909
910
3.23k
  PgWireDataHeader header = PgDocData::ReadDataHeader(&cursor);
911
3.23k
  if (header.is_null()) {
912
0
    return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", req.seq_oid());
913
0
  }
914
3.23k
  int64_t last_val = 0;
915
3.23k
  size_t read_size = PgDocData::ReadNumber(&cursor, &last_val);
916
3.23k
  cursor.remove_prefix(read_size);
917
3.23k
  resp->set_last_val(last_val);
918
919
3.23k
  header = PgDocData::ReadDataHeader(&cursor);
920
3.23k
  if (header.is_null()) {
921
0
    return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", req.seq_oid());
922
0
  }
923
3.23k
  bool is_called = false;
924
3.23k
  read_size = PgDocData::ReadNumber(&cursor, &is_called);
925
3.23k
  resp->set_is_called(is_called);
926
3.23k
  return Status::OK();
927
3.23k
}
928
929
Status PgClientSession::DeleteSequenceTuple(
930
    const PgDeleteSequenceTupleRequestPB& req, PgDeleteSequenceTupleResponsePB* resp,
931
282
    rpc::RpcContext* context) {
932
282
  PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
933
282
  auto table = VERIFY_RESULT(table_cache_.Get(table_oid.GetYbTableId()));
934
935
0
  auto psql_delete(client::YBPgsqlWriteOp::NewDelete(table));
936
282
  auto delete_request = psql_delete->mutable_request();
937
938
282
  delete_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid());
939
282
  delete_request->add_partition_column_values()->mutable_value()->set_int64_value(req.seq_oid());
940
941
282
  auto& session = EnsureSession(PgClientSessionKind::kSequence);
942
282
  session->SetDeadline(context->GetClientDeadline());
943
282
  return session->ApplyAndFlush(std::move(psql_delete));
944
282
}
945
946
Status PgClientSession::DeleteDBSequences(
947
    const PgDeleteDBSequencesRequestPB& req, PgDeleteDBSequencesResponsePB* resp,
948
71
    rpc::RpcContext* context) {
949
71
  PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
950
71
  auto table_res = table_cache_.Get(table_oid.GetYbTableId());
951
71
  if (!table_res.ok()) {
952
    // Sequence table is not yet created.
953
58
    return Status::OK();
954
58
  }
955
956
13
  auto table = std::move(*table_res);
957
13
  if (table == nullptr) {
958
0
    return Status::OK();
959
0
  }
960
961
13
  auto psql_delete(client::YBPgsqlWriteOp::NewDelete(table));
962
13
  auto delete_request = psql_delete->mutable_request();
963
964
13
  delete_request->add_partition_column_values()->mutable_value()->set_int64_value(req.db_oid());
965
966
13
  auto& session = EnsureSession(PgClientSessionKind::kSequence);
967
13
  session->SetDeadline(context->GetClientDeadline());
968
13
  return session->ApplyAndFlush(std::move(psql_delete));
969
13
}
970
971
2.20M
client::YBSessionPtr& PgClientSession::EnsureSession(PgClientSessionKind kind) {
972
2.20M
  auto& session = Session(kind);
973
2.20M
  if (!session) {
974
12.6k
    session = CreateSession(&client_, clock_);
975
12.6k
  }
976
2.20M
  return session;
977
2.20M
}
978
979
4.60M
client::YBSessionPtr& PgClientSession::Session(PgClientSessionKind kind) {
980
4.60M
  return sessions_[to_underlying(kind)].session;
981
4.60M
}
982
983
4.05M
client::YBTransactionPtr& PgClientSession::Transaction(PgClientSessionKind kind) {
984
4.05M
  return sessions_[to_underlying(kind)].transaction;
985
4.05M
}
986
987
}  // namespace tserver
988
}  // namespace yb