YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tserver/pg_client_session.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tserver/pg_client_session.h"
15
16
#include "yb/client/batcher.h"
17
#include "yb/client/client.h"
18
#include "yb/client/error.h"
19
#include "yb/client/namespace_alterer.h"
20
#include "yb/client/session.h"
21
#include "yb/client/table.h"
22
#include "yb/client/table_alterer.h"
23
#include "yb/client/transaction.h"
24
#include "yb/client/transaction_pool.h"
25
#include "yb/client/yb_op.h"
26
27
#include "yb/common/ql_type.h"
28
#include "yb/common/pgsql_error.h"
29
#include "yb/common/transaction_error.h"
30
#include "yb/common/schema.h"
31
#include "yb/common/wire_protocol.h"
32
33
#include "yb/gutil/casts.h"
34
35
#include "yb/rpc/rpc_context.h"
36
37
#include "yb/tserver/pg_client.pb.h"
38
#include "yb/tserver/pg_create_table.h"
39
#include "yb/tserver/pg_table_cache.h"
40
41
#include "yb/util/logging.h"
42
#include "yb/util/result.h"
43
#include "yb/util/scope_exit.h"
44
#include "yb/util/status_format.h"
45
#include "yb/util/string_util.h"
46
#include "yb/util/yb_pg_errcodes.h"
47
48
DECLARE_bool(ysql_serializable_isolation_for_ddl_txn);
49
50
namespace yb {
51
namespace tserver {
52
53
namespace {
54
55
0
std::string SessionLogPrefix(uint64_t id) {
56
0
  return Format("S $0: ", id);
57
0
}
58
59
18.5k
string GetStatusStringSet(const client::CollectedErrors& errors) {
60
18.5k
  std::set<string> status_strings;
61
394k
  for (const auto& error : errors) {
62
394k
    status_strings.insert(error->status().ToString());
63
394k
  }
64
18.5k
  return RangeToString(status_strings.begin(), status_strings.end());
65
18.5k
}
66
67
45.7k
bool IsHomogeneousErrors(const client::CollectedErrors& errors) {
68
45.7k
  if (errors.size() < 2) {
69
27.2k
    return true;
70
27.2k
  }
71
18.5k
  auto i = errors.begin();
72
18.5k
  const auto& status = (**i).status();
73
18.5k
  const auto codes = status.ErrorCodesSlice();
74
232k
  for (++i; i != errors.end(); ++i) {
75
225k
    const auto& s = (**i).status();
76
225k
    if (s.code() != status.code() || codes != s.ErrorCodesSlice()) {
77
11.6k
      return false;
78
11.6k
    }
79
225k
  }
80
6.81k
  return true;
81
18.5k
}
82
83
295k
boost::optional<YBPgErrorCode> PsqlErrorCode(const Status& status) {
84
295k
  const uint8_t* err_data = status.ErrorData(PgsqlErrorTag::kCategory);
85
295k
  if (err_data) {
86
151k
    return PgsqlErrorTag::Decode(err_data);
87
151k
  }
88
144k
  return boost::none;
89
144k
}
90
91
// Get a common Postgres error code from the status and all errors, and append it to a previous
92
// Status.
93
// If any of those have different conflicting error codes, previous result is returned as-is.
94
CHECKED_STATUS AppendPsqlErrorCode(const Status& status,
95
11.6k
                                   const client::CollectedErrors& errors) {
96
11.6k
  boost::optional<YBPgErrorCode> common_psql_error =  boost::make_optional(false, YBPgErrorCode());
97
295k
  for(const auto& error : errors) {
98
295k
    const auto psql_error = PsqlErrorCode(error->status());
99
295k
    if (!common_psql_error) {
100
79.3k
      common_psql_error = psql_error;
101
216k
    } else if (psql_error && common_psql_error != psql_error) {
102
0
      common_psql_error = boost::none;
103
0
      break;
104
0
    }
105
295k
  }
106
18.4E
  return common_psql_error ? status.CloneAndAddErrorCode(PgsqlError(*common_psql_error)) : status;
107
11.6k
}
108
109
// Get a common transaction error code for all the errors and append it to the previous Status.
110
11.6k
CHECKED_STATUS AppendTxnErrorCode(const Status& status, const client::CollectedErrors& errors) {
111
11.6k
  TransactionErrorCode common_txn_error = TransactionErrorCode::kNone;
112
295k
  for (const auto& error : errors) {
113
295k
    const TransactionErrorCode txn_error = TransactionError(error->status()).value();
114
295k
    if (txn_error == TransactionErrorCode::kNone ||
115
295k
        txn_error == common_txn_error) {
116
192k
      continue;
117
192k
    }
118
103k
    if (common_txn_error == TransactionErrorCode::kNone) {
119
11.6k
      common_txn_error = txn_error;
120
11.6k
      continue;
121
11.6k
    }
122
    // If we receive a list of errors, with one as kConflict and others as kAborted, we retain the
123
    // error as kConflict, since in case of a batched request the first operation would receive the
124
    // kConflict and all the others would receive the kAborted error.
125
91.4k
    if ((txn_error == TransactionErrorCode::kConflict &&
126
5.88k
         common_txn_error == TransactionErrorCode::kAborted) ||
127
85.5k
        (txn_error == TransactionErrorCode::kAborted &&
128
91.4k
         common_txn_error == TransactionErrorCode::kConflict)) {
129
91.4k
      common_txn_error = TransactionErrorCode::kConflict;
130
91.4k
      continue;
131
91.4k
    }
132
133
    // In all the other cases, reset the common_txn_error to kNone.
134
18.4E
    common_txn_error = TransactionErrorCode::kNone;
135
18.4E
    break;
136
18.4E
  }
137
138
11.6k
  return (common_txn_error != TransactionErrorCode::kNone) ?
139
11.6k
    status.CloneAndAddErrorCode(TransactionError(common_txn_error)) : status;
140
11.6k
}
141
142
// Given a set of errors from operations, this function attempts to combine them into one status
143
// that is later passed to PostgreSQL and further converted into a more specific error code.
144
775k
CHECKED_STATUS CombineErrorsToStatus(const client::CollectedErrors& errors, const Status& status) {
145
775k
  if (errors.empty())
146
729k
    return status;
147
148
45.7k
  if (status.IsIOError() &&
149
      // TODO: move away from string comparison here and use a more specific status than IOError.
150
      // See https://github.com/YugaByte/yugabyte-db/issues/702
151
45.7k
      status.message() == client::internal::Batcher::kErrorReachingOutToTServersMsg &&
152
45.7k
      IsHomogeneousErrors(errors)) {
153
34.0k
    const auto& result = errors.front()->status();
154
34.0k
    if (errors.size() == 1) {
155
27.2k
      return result;
156
27.2k
    }
157
6.81k
    return Status(result.code(),
158
6.81k
                  __FILE__,
159
6.81k
                  __LINE__,
160
6.81k
                  GetStatusStringSet(errors),
161
6.81k
                  result.ErrorCodesSlice(),
162
6.81k
                  DupFileName::kFalse);
163
6.81k
  }
164
165
11.7k
  Status result =
166
11.7k
    status.ok()
167
0
    ? STATUS(InternalError, GetStatusStringSet(errors))
168
11.7k
    : status.CloneAndAppend(". Errors from tablet servers: " + GetStatusStringSet(errors));
169
170
11.7k
  return AppendTxnErrorCode(AppendPsqlErrorCode(result, errors), errors);
171
11.7k
}
172
173
3.46M
Status HandleResponse(const client::YBPgsqlOp& op, PgPerformResponsePB* resp) {
174
3.46M
  const auto& response = op.response();
175
3.46M
  if (response.status() == PgsqlResponsePB::PGSQL_STATUS_OK) {
176
3.46M
    if (op.read_only() && op.table()->schema().table_properties().is_ysql_catalog_table()) {
177
380k
      const auto& pgsql_op = down_cast<const client::YBPgsqlReadOp&>(op);
178
380k
      if (pgsql_op.used_read_time()) {
179
        // Non empty used_read_time field in catalog read operation means this is the very first
180
        // catalog read operation after catalog read time resetting. read_time for the operation
181
        // has been chosen by master. All further reads from catalog must use same read point.
182
18.5k
        auto catalog_read_time = pgsql_op.used_read_time();
183
184
        // We set global limit to local limit to avoid read restart errors because they are
185
        // disruptive to system catalog reads and it is not always possible to handle them there.
186
        // This might lead to reading slightly outdated state of the system catalog if a recently
187
        // committed DDL transaction used a transaction status tablet whose leader's clock is skewed
188
        // and is in the future compared to the master leader's clock.
189
        // TODO(dmitry) This situation will be handled in context of #7964.
190
18.5k
        catalog_read_time.global_limit = catalog_read_time.local_limit;
191
18.5k
        catalog_read_time.ToPB(resp->mutable_catalog_read_time());
192
18.5k
      }
193
380k
    }
194
3.46M
    return Status::OK();
195
3.46M
  }
196
197
707
  auto status = STATUS(
198
707
      QLError, response.error_message(), Slice(), PgsqlRequestStatus(response.status()));
199
200
707
  if (response.has_pg_error_code()) {
201
1
    status = status.CloneAndAddErrorCode(
202
1
        PgsqlError(static_cast<YBPgErrorCode>(response.pg_error_code())));
203
1
  }
204
205
707
  if (response.has_txn_error_code()) {
206
1
    status = status.CloneAndAddErrorCode(
207
1
        TransactionError(static_cast<TransactionErrorCode>(response.txn_error_code())));
208
1
  }
209
210
707
  return status;
211
707
}
212
213
4.02M
CHECKED_STATUS GetTable(const TableId& table_id, PgTableCache* cache, client::YBTablePtr* table) {
214
4.02M
  if (*table && (**table).id() == table_id) {
215
2.83M
    return Status::OK();
216
2.83M
  }
217
1.19M
  *table = VERIFY_RESULT(cache->Get(table_id));
218
1.19M
  return Status::OK();
219
1.19M
}
220
221
Result<PgClientSessionOperations> PrepareOperations(
222
774k
    const PgPerformRequestPB& req, client::YBSession* session, PgTableCache* table_cache) {
223
774k
  auto write_time = HybridTime::FromPB(req.write_time());
224
774k
  std::vector<std::shared_ptr<client::YBPgsqlOp>> ops;
225
774k
  ops.reserve(req.ops().size());
226
774k
  client::YBTablePtr table;
227
774k
  bool finished = false;
228
775k
  auto se = ScopeExit([&finished, session] {
229
775k
    if (!finished) {
230
6
      session->Abort();
231
6
    }
232
775k
  });
233
4.02M
  for (const auto& op : req.ops()) {
234
4.02M
    if (op.has_read()) {
235
1.89M
      const auto& read = op.read();
236
1.89M
      RETURN_NOT_OK(GetTable(read.table_id(), table_cache, &table));
237
1.89M
      const auto read_op = std::make_shared<client::YBPgsqlReadOp>(
238
1.89M
          table, const_cast<PgsqlReadRequestPB*>(&read));
239
1.89M
      if (op.read_from_followers()) {
240
0
        read_op->set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX);
241
0
      }
242
1.89M
      ops.push_back(read_op);
243
1.89M
      session->Apply(std::move(read_op));
244
2.12M
    } else {
245
2.12M
      const auto& write = op.write();
246
2.12M
      RETURN_NOT_OK(GetTable(write.table_id(), table_cache, &table));
247
2.12M
      const auto write_op = std::make_shared<client::YBPgsqlWriteOp>(
248
2.12M
          table, const_cast<PgsqlWriteRequestPB*>(&write));
249
2.12M
      if (write_time) {
250
226
        write_op->SetWriteTime(write_time);
251
226
        write_time = HybridTime::kInvalid;
252
226
      }
253
2.12M
      ops.push_back(write_op);
254
2.12M
      session->Apply(std::move(write_op));
255
2.12M
    }
256
4.02M
  }
257
774k
  finished = true;
258
774k
  return ops;
259
774k
}
260
261
struct PerformData {
262
  uint64_t session_id;
263
  const PgPerformRequestPB* req;
264
  PgPerformResponsePB* resp;
265
  rpc::RpcContext context;
266
  PgClientSessionOperations ops;
267
  PgTableCache* table_cache;
268
269
775k
  void FlushDone(client::FlushStatus* flush_status) {
270
775k
    auto status = CombineErrorsToStatus(flush_status->errors, flush_status->status);
271
775k
    if (status.ok()) {
272
729k
      status = ProcessResponse();
273
729k
    }
274
775k
    if (!status.ok()) {
275
46.7k
      StatusToPB(status, resp->mutable_status());
276
46.7k
    }
277
775k
    context.RespondSuccess();
278
775k
  }
279
280
729k
  CHECKED_STATUS ProcessResponse() {
281
729k
    int idx = 0;
282
3.46M
    for (const auto& op : ops) {
283
3.46M
      const auto status = HandleResponse(*op, resp);
284
3.46M
      if (!status.ok()) {
285
1.03k
        if (PgsqlRequestStatus(status) == PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH) {
286
11
          table_cache->Invalidate(op->table()->id());
287
11
        }
288
18.4E
        VLOG(2) << SessionLogPrefix(session_id) << "Failed op " << idx << ": " << status;
289
1.03k
        return status.CloneAndAddErrorCode(OpIndex(idx));
290
1.03k
      }
291
3.45M
      const auto& req_op = req->ops()[idx];
292
3.45M
      if (req_op.has_read() && req_op.read().is_for_backfill() &&
293
289
          op->response().is_backfill_batch_done()) {
294
        // After backfill table schema version is updated, so we reset cache in advance.
295
257
        table_cache->Invalidate(op->table()->id());
296
257
      }
297
3.45M
      ++idx;
298
3.45M
    }
299
300
728k
    auto& responses = *resp->mutable_responses();
301
728k
    responses.Reserve(narrow_cast<int>(ops.size()));
302
3.45M
    for (const auto& op : ops) {
303
3.45M
      auto& op_resp = *responses.Add();
304
3.45M
      op_resp.Swap(op->mutable_response());
305
3.45M
      if (op_resp.has_rows_data_sidecar()) {
306
3.45M
        op_resp.set_rows_data_sidecar(narrow_cast<int>(context.AddRpcSidecar(op->rows_data())));
307
3.45M
      }
308
3.45M
    }
309
310
728k
    return Status::OK();
311
729k
  }
312
};
313
314
client::YBSessionPtr CreateSession(
315
4.95k
    client::YBClient* client, const scoped_refptr<ClockBase>& clock) {
316
4.95k
  auto result = std::make_shared<client::YBSession>(client, clock);
317
4.95k
  result->SetForceConsistentRead(client::ForceConsistentRead::kTrue);
318
4.95k
  result->set_allow_local_calls_in_curr_thread(false);
319
4.95k
  return result;
320
4.95k
}
321
322
} // namespace
323
324
PgClientSession::PgClientSession(
325
    client::YBClient* client, const scoped_refptr<ClockBase>& clock,
326
    std::reference_wrapper<const TransactionPoolProvider> transaction_pool_provider,
327
    PgTableCache* table_cache, uint64_t id)
328
    : client_(*client),
329
      transaction_pool_provider_(transaction_pool_provider.get()),
330
      table_cache_(*table_cache), id_(id),
331
      session_(CreateSession(client, clock)),
332
      ddl_session_(CreateSession(client, clock)),
333
1.65k
      catalog_session_(CreateSession(client, clock)) {
334
1.65k
}
335
336
936k
uint64_t PgClientSession::id() const {
337
936k
  return id_;
338
936k
}
339
340
Status PgClientSession::CreateTable(
341
1.41k
    const PgCreateTableRequestPB& req, PgCreateTableResponsePB* resp, rpc::RpcContext* context) {
342
1.41k
  PgCreateTable helper(req);
343
1.41k
  RETURN_NOT_OK(helper.Prepare());
344
1.41k
  const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata(req.use_transaction()));
345
1.41k
  RETURN_NOT_OK(helper.Exec(&client(), metadata, context->GetClientDeadline()));
346
0
  VLOG_WITH_PREFIX(1) << __func__ << ": " << req.table_name();
347
1.41k
  const auto& indexed_table_id = helper.indexed_table_id();
348
1.41k
  if (indexed_table_id.IsValid()) {
349
150
    table_cache_.Invalidate(indexed_table_id.GetYBTableId());
350
150
  }
351
1.41k
  return Status::OK();
352
1.41k
}
353
354
Status PgClientSession::CreateDatabase(
355
    const PgCreateDatabaseRequestPB& req, PgCreateDatabaseResponsePB* resp,
356
22
    rpc::RpcContext* context) {
357
22
  return client().CreateNamespace(
358
22
      req.database_name(),
359
22
      YQL_DATABASE_PGSQL,
360
22
      "" /* creator_role_name */,
361
22
      GetPgsqlNamespaceId(req.database_oid()),
362
22
      req.source_database_oid() != kPgInvalidOid
363
22
          ? GetPgsqlNamespaceId(req.source_database_oid()) : "",
364
22
      req.next_oid(),
365
22
      VERIFY_RESULT(GetDdlTransactionMetadata(req.use_transaction())),
366
22
      req.colocated(),
367
22
      context->GetClientDeadline());
368
22
}
369
370
Status PgClientSession::DropDatabase(
371
21
    const PgDropDatabaseRequestPB& req, PgDropDatabaseResponsePB* resp, rpc::RpcContext* context) {
372
21
  return client().DeleteNamespace(
373
21
      req.database_name(),
374
21
      YQL_DATABASE_PGSQL,
375
21
      GetPgsqlNamespaceId(req.database_oid()),
376
21
      context->GetClientDeadline());
377
21
}
378
379
Status PgClientSession::DropTable(
380
1.17k
    const PgDropTableRequestPB& req, PgDropTableResponsePB* resp, rpc::RpcContext* context) {
381
1.17k
  const auto yb_table_id = PgObjectId::GetYBTableIdFromPB(req.table_id());
382
1.17k
  if (req.index()) {
383
141
    client::YBTableName indexed_table;
384
141
    RETURN_NOT_OK(client().DeleteIndexTable(
385
141
        yb_table_id, &indexed_table, true, context->GetClientDeadline()));
386
140
    indexed_table.SetIntoTableIdentifierPB(resp->mutable_indexed_table());
387
140
    table_cache_.Invalidate(indexed_table.table_id());
388
140
    table_cache_.Invalidate(yb_table_id);
389
140
    return Status::OK();
390
1.03k
  }
391
392
1.03k
  RETURN_NOT_OK(client().DeleteTable(yb_table_id, true, context->GetClientDeadline()));
393
1.03k
  table_cache_.Invalidate(yb_table_id);
394
1.03k
  return Status::OK();
395
1.03k
}
396
397
Status PgClientSession::AlterDatabase(
398
    const PgAlterDatabaseRequestPB& req, PgAlterDatabaseResponsePB* resp,
399
0
    rpc::RpcContext* context) {
400
0
  const auto alterer = client().NewNamespaceAlterer(
401
0
      req.database_name(), GetPgsqlNamespaceId(req.database_oid()));
402
0
  alterer->SetDatabaseType(YQL_DATABASE_PGSQL);
403
0
  alterer->RenameTo(req.new_name());
404
0
  return alterer->Alter(context->GetClientDeadline());
405
0
}
406
407
Status PgClientSession::AlterTable(
408
155
    const PgAlterTableRequestPB& req, PgAlterTableResponsePB* resp, rpc::RpcContext* context) {
409
155
  const auto table_id = PgObjectId::GetYBTableIdFromPB(req.table_id());
410
155
  const auto alterer = client().NewTableAlterer(table_id);
411
155
  const auto txn = VERIFY_RESULT(GetDdlTransactionMetadata(req.use_transaction()));
412
155
  if (txn) {
413
155
    alterer->part_of_transaction(txn);
414
155
  }
415
69
  for (const auto& add_column : req.add_columns()) {
416
69
    const auto yb_type = QLType::Create(static_cast<DataType>(add_column.attr_ybtype()));
417
69
    alterer->AddColumn(add_column.attr_name())
418
69
           ->Type(yb_type)->Order(add_column.attr_num())->PgTypeOid(add_column.attr_pgoid());
419
    // Do not set 'nullable' attribute as PgCreateTable::AddColumn() does not do it.
420
69
  }
421
1
  for (const auto& rename_column : req.rename_columns()) {
422
1
    alterer->AlterColumn(rename_column.old_name())->RenameTo(rename_column.new_name());
423
1
  }
424
50
  for (const auto& drop_column : req.drop_columns()) {
425
50
    alterer->DropColumn(drop_column);
426
50
  }
427
155
  if (!req.rename_table().table_name().empty()) {
428
42
    client::YBTableName new_table_name(
429
42
        YQL_DATABASE_PGSQL, req.rename_table().database_name(), req.rename_table().table_name());
430
42
    alterer->RenameTo(new_table_name);
431
42
  }
432
433
155
  alterer->timeout(context->GetClientDeadline() - CoarseMonoClock::now());
434
155
  RETURN_NOT_OK(alterer->Alter());
435
154
  table_cache_.Invalidate(table_id);
436
154
  return Status::OK();
437
155
}
438
439
Status PgClientSession::TruncateTable(
440
    const PgTruncateTableRequestPB& req, PgTruncateTableResponsePB* resp,
441
31
    rpc::RpcContext* context) {
442
31
  return client().TruncateTable(PgObjectId::GetYBTableIdFromPB(req.table_id()));
443
31
}
444
445
Status PgClientSession::BackfillIndex(
446
    const PgBackfillIndexRequestPB& req, PgBackfillIndexResponsePB* resp,
447
89
    rpc::RpcContext* context) {
448
89
  return client().BackfillIndex(
449
89
      PgObjectId::GetYBTableIdFromPB(req.table_id()), /* wait= */ true,
450
89
      context->GetClientDeadline());
451
89
}
452
453
Status PgClientSession::CreateTablegroup(
454
    const PgCreateTablegroupRequestPB& req, PgCreateTablegroupResponsePB* resp,
455
1
    rpc::RpcContext* context) {
456
1
  const auto id = PgObjectId::FromPB(req.tablegroup_id());
457
1
  auto tablespace_id = PgObjectId::FromPB(req.tablespace_id());
458
1
  auto s = client().CreateTablegroup(
459
1
      req.database_name(), GetPgsqlNamespaceId(id.database_oid),
460
1
      id.GetYBTablegroupId(),
461
1
      tablespace_id.IsValid() ? tablespace_id.GetYBTablespaceId() : "");
462
1
  if (s.ok()) {
463
1
    return Status::OK();
464
1
  }
465
466
0
  if (s.IsAlreadyPresent()) {
467
0
    return STATUS(InvalidArgument, "Duplicate tablegroup");
468
0
  }
469
470
0
  if (s.IsNotFound()) {
471
0
    return STATUS(InvalidArgument, "Database not found", req.database_name());
472
0
  }
473
474
0
  return STATUS_FORMAT(
475
0
      InvalidArgument, "Invalid table definition: $0",
476
0
      s.ToString(false /* include_file_and_line */, false /* include_code */));
477
0
}
478
479
Status PgClientSession::DropTablegroup(
480
    const PgDropTablegroupRequestPB& req, PgDropTablegroupResponsePB* resp,
481
1
    rpc::RpcContext* context) {
482
1
  const auto id = PgObjectId::FromPB(req.tablegroup_id());
483
1
  const auto status = client().DeleteTablegroup(
484
1
      GetPgsqlNamespaceId(id.database_oid),
485
1
      GetPgsqlTablegroupId(id.database_oid, id.object_oid));
486
1
  if (status.IsNotFound()) {
487
0
    return Status::OK();
488
0
  }
489
1
  return status;
490
1
}
491
492
Status PgClientSession::RollbackSubTransaction(
493
    const PgRollbackSubTransactionRequestPB& req, PgRollbackSubTransactionResponsePB* resp,
494
23.5k
    rpc::RpcContext* context) {
495
0
  VLOG_WITH_PREFIX_AND_FUNC(2) << req.ShortDebugString();
496
23.5k
  SCHECK(txn_, IllegalState,
497
23.5k
         Format("Rollback sub transaction $0, when not transaction is running",
498
23.5k
                req.sub_transaction_id()));
499
23.5k
  return txn_->RollbackSubTransaction(req.sub_transaction_id());
500
23.5k
}
501
502
Status PgClientSession::SetActiveSubTransaction(
503
    const PgSetActiveSubTransactionRequestPB& req, PgSetActiveSubTransactionResponsePB* resp,
504
48.8k
    rpc::RpcContext* context) {
505
0
  VLOG_WITH_PREFIX_AND_FUNC(2) << req.ShortDebugString();
506
507
48.8k
  if (req.has_options()) {
508
307
    RETURN_NOT_OK(BeginTransactionIfNecessary(req.options()));
509
307
    txn_serial_no_ = req.options().txn_serial_no();
510
307
  }
511
512
48.8k
  SCHECK(txn_, IllegalState,
513
48.8k
         Format("Set active sub transaction $0, when not transaction is running",
514
48.8k
                req.sub_transaction_id()));
515
516
48.8k
  txn_->SetActiveSubTransaction(req.sub_transaction_id());
517
48.8k
  return Status::OK();
518
48.8k
}
519
520
Status PgClientSession::FinishTransaction(
521
    const PgFinishTransactionRequestPB& req, PgFinishTransactionResponsePB* resp,
522
82.4k
    rpc::RpcContext* context) {
523
82.4k
  saved_priority_ = boost::none;
524
76.1k
  auto& txn = req.ddl_mode() ? ddl_txn_ : txn_;
525
82.4k
  if (!txn) {
526
0
    VLOG_WITH_PREFIX_AND_FUNC(2) << "ddl: " << req.ddl_mode() << ", no running transaction";
527
513
    return Status::OK();
528
513
  }
529
81.9k
  const auto txn_value = std::move(txn);
530
76.1k
  (req.ddl_mode() ? ddl_session_ : session_)->SetTransaction(nullptr);
531
532
81.9k
  if (req.commit()) {
533
78.6k
    const auto commit_status = txn_value->CommitFuture().get();
534
18.4E
    VLOG_WITH_PREFIX_AND_FUNC(2)
535
18.4E
        << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id()
536
18.4E
        << ", commit: " << commit_status;
537
78.6k
    return commit_status;
538
78.6k
  }
539
540
3.27k
  VLOG_WITH_PREFIX_AND_FUNC(2)
541
17
      << "ddl: " << req.ddl_mode() << ", txn: " << txn_value->id() << ", abort";
542
3.27k
  txn_value->Abort();
543
3.27k
  return Status::OK();
544
3.27k
}
545
546
Status PgClientSession::Perform(
547
775k
    const PgPerformRequestPB& req, PgPerformResponsePB* resp, rpc::RpcContext* context) {
548
775k
  auto session = VERIFY_RESULT(SetupSession(req));
549
550
775k
  session->SetDeadline(context->GetClientDeadline());
551
552
775k
  auto ops = VERIFY_RESULT(PrepareOperations(req, session, &table_cache_));
553
775k
  auto data = std::make_shared<PerformData>(PerformData {
554
775k
    .session_id = id_,
555
775k
    .req = &req,
556
775k
    .resp = resp,
557
775k
    .context = std::move(*context),
558
775k
    .ops = std::move(ops),
559
775k
    .table_cache = &table_cache_,
560
775k
  });
561
775k
  session->FlushAsync([data](client::FlushStatus* flush_status) {
562
775k
    data->FlushDone(flush_status);
563
775k
  });
564
775k
  return Status::OK();
565
775k
}
566
567
775k
void PgClientSession::ProcessReadTimeManipulation(ReadTimeManipulation manipulation) {
568
775k
  switch (manipulation) {
569
23.6k
    case ReadTimeManipulation::RESET: {
570
        // If a txn_ has been created, session_->read_point() returns the read point stored in txn_.
571
23.6k
        ConsistentReadPoint* rp = session_->read_point();
572
23.6k
        rp->SetCurrentReadTime();
573
574
0
        VLOG(1) << "Setting current ht as read point " << rp->GetReadTime();
575
23.6k
      }
576
23.6k
      return;
577
1
    case ReadTimeManipulation::RESTART: {
578
1
        ConsistentReadPoint* rp = session_->read_point();
579
1
        rp->Restart();
580
581
0
        VLOG(1) << "Restarted read point " << rp->GetReadTime();
582
1
      }
583
1
      return;
584
751k
    case ReadTimeManipulation::NONE:
585
751k
      return;
586
0
    case ReadTimeManipulation::ReadTimeManipulation_INT_MIN_SENTINEL_DO_NOT_USE_:
587
0
    case ReadTimeManipulation::ReadTimeManipulation_INT_MAX_SENTINEL_DO_NOT_USE_:
588
0
      break;
589
0
  }
590
0
  FATAL_INVALID_ENUM_VALUE(ReadTimeManipulation, manipulation);
591
0
}
592
593
775k
Result<client::YBSession*> PgClientSession::SetupSession(const PgPerformRequestPB& req) {
594
775k
  client::YBSession* session;
595
775k
  client::YBTransaction* transaction;
596
597
775k
  const auto& options = req.options();
598
775k
  if (options.use_catalog_session()) {
599
194k
    session = catalog_session_.get();
600
194k
    transaction = nullptr;
601
580k
  } else if (options.ddl_mode()) {
602
200k
    RETURN_NOT_OK(GetDdlTransactionMetadata(true));
603
200k
    session = ddl_session_.get();
604
200k
    transaction = ddl_txn_.get();
605
380k
  } else {
606
380k
    RETURN_NOT_OK(BeginTransactionIfNecessary(options));
607
608
380k
    session = session_.get();
609
380k
    transaction = txn_.get();
610
380k
  }
611
612
62
  VLOG_WITH_PREFIX(4) << __func__ << ": " << options.ShortDebugString();
613
614
775k
  if (options.restart_transaction()) {
615
0
    if(options.ddl_mode()) {
616
0
      return STATUS(NotSupported, "Not supported to restart DDL transaction");
617
0
    }
618
0
    txn_ = VERIFY_RESULT(RestartTransaction(session, transaction));
619
0
    transaction = txn_.get();
620
775k
  } else {
621
775k
    ProcessReadTimeManipulation(options.read_time_manipulation());
622
775k
    if (options.has_read_time() &&
623
211k
        (options.read_time().has_read_ht() || options.use_catalog_session())) {
624
211k
      const auto read_time = options.read_time().has_read_ht()
625
192k
          ? ReadHybridTime::FromPB(options.read_time()) : ReadHybridTime();
626
211k
      session->SetReadPoint(read_time);
627
211k
      if (read_time) {
628
18.4E
        VLOG_WITH_PREFIX(3) << "Read time: " << read_time;
629
18.5k
      } else {
630
18.4E
        VLOG_WITH_PREFIX(3) << "Reset read time: " << session->read_point()->GetReadTime();
631
18.5k
      }
632
564k
    } else if (!transaction &&
633
191k
               (options.ddl_mode() || txn_serial_no_ != options.txn_serial_no())) {
634
59.8k
      session->SetReadPoint(client::Restart::kFalse);
635
0
      VLOG_WITH_PREFIX(3) << "New read time: " << session->read_point()->GetReadTime();
636
504k
    } else {
637
36
      VLOG_WITH_PREFIX(3) << "Keep read time: " << session->read_point()->GetReadTime();
638
504k
    }
639
775k
  }
640
641
775k
  if (options.defer_read_point()) {
642
    // This call is idempotent, meaning it has no effect after the first call.
643
0
    session->DeferReadPoint();
644
0
  }
645
646
775k
  if (!options.ddl_mode() && !options.use_catalog_session()) {
647
380k
    txn_serial_no_ = options.txn_serial_no();
648
649
380k
    const auto in_txn_limit = HybridTime::FromPB(options.in_txn_limit_ht());
650
380k
    if (in_txn_limit) {
651
34
      VLOG_WITH_PREFIX(3) << "In txn limit: " << in_txn_limit;
652
380k
      session->SetInTxnLimit(in_txn_limit);
653
380k
    }
654
380k
  }
655
775k
  return session;
656
775k
}
657
658
0
std::string PgClientSession::LogPrefix() {
659
0
  return SessionLogPrefix(id_);
660
0
}
661
662
380k
Status PgClientSession::BeginTransactionIfNecessary(const PgPerformOptionsPB& options) {
663
380k
  const auto isolation = static_cast<IsolationLevel>(options.isolation());
664
665
380k
  auto priority = options.priority();
666
380k
  if (txn_ && txn_serial_no_ != options.txn_serial_no()) {
667
1
    VLOG_WITH_PREFIX(2)
668
1
        << "Abort previous transaction, use existing priority: " << options.use_existing_priority()
669
1
        << ", new isolation: " << IsolationLevel_Name(isolation);
670
671
20.2k
    if (options.use_existing_priority()) {
672
20.2k
      saved_priority_ = txn_->GetPriority();
673
20.2k
    }
674
20.2k
    txn_->Abort();
675
20.2k
    session_->SetTransaction(nullptr);
676
20.2k
    txn_ = nullptr;
677
20.2k
  }
678
679
380k
  if (isolation == IsolationLevel::NON_TRANSACTIONAL) {
680
199k
    return Status::OK();
681
199k
  }
682
683
180k
  if (txn_) {
684
84.6k
    return txn_->isolation() != isolation
685
0
        ? STATUS_FORMAT(
686
84.6k
            IllegalState,
687
84.6k
            "Attempt to change isolation level of running transaction from $0 to $1",
688
84.6k
            txn_->isolation(), isolation)
689
84.6k
        : Status::OK();
690
84.6k
  }
691
692
96.3k
  txn_ = transaction_pool_provider_()->Take(
693
96.3k
      client::ForceGlobalTransaction(options.force_global_transaction()));
694
96.3k
  if ((isolation == IsolationLevel::SNAPSHOT_ISOLATION ||
695
74.6k
           isolation == IsolationLevel::READ_COMMITTED) &&
696
22.1k
      txn_serial_no_ == options.txn_serial_no()) {
697
628
    txn_->InitWithReadPoint(isolation, std::move(*session_->read_point()));
698
0
    VLOG_WITH_PREFIX(2) << "Start transaction " << IsolationLevel_Name(isolation)
699
0
                        << ", id: " << txn_->id()
700
0
                        << ", kept read time: " << txn_->read_point().GetReadTime();
701
95.7k
  } else {
702
4
    VLOG_WITH_PREFIX(2) << "Start transaction " << IsolationLevel_Name(isolation)
703
4
                        << ", id: " << txn_->id()
704
4
                        << ", new read time";
705
95.7k
    RETURN_NOT_OK(txn_->Init(isolation));
706
95.7k
  }
707
96.3k
  if (saved_priority_) {
708
20.2k
    priority = *saved_priority_;
709
20.2k
    saved_priority_ = boost::none;
710
20.2k
  }
711
96.3k
  txn_->SetPriority(priority);
712
96.3k
  session_->SetTransaction(txn_);
713
714
96.3k
  return Status::OK();
715
96.3k
}
716
717
Result<const TransactionMetadata*> PgClientSession::GetDdlTransactionMetadata(
718
201k
    bool use_transaction) {
719
201k
  if (!use_transaction) {
720
0
    return nullptr;
721
0
  }
722
201k
  if (!ddl_txn_) {
723
5.82k
    const auto isolation = FLAGS_ysql_serializable_isolation_for_ddl_txn
724
5.82k
        ? IsolationLevel::SERIALIZABLE_ISOLATION : IsolationLevel::SNAPSHOT_ISOLATION;
725
5.82k
    ddl_txn_ = VERIFY_RESULT(transaction_pool_provider_()->TakeAndInit(isolation));
726
5.82k
    ddl_txn_metadata_ = VERIFY_RESULT(Copy(ddl_txn_->GetMetadata().get()));
727
5.82k
    ddl_session_->SetTransaction(ddl_txn_);
728
5.82k
  }
729
730
201k
  return &ddl_txn_metadata_;
731
201k
}
732
733
2.91k
client::YBClient& PgClientSession::client() {
734
2.91k
  return client_;
735
2.91k
}
736
737
Result<client::YBTransactionPtr> PgClientSession::RestartTransaction(
738
0
    client::YBSession* session, client::YBTransaction* transaction) {
739
0
  if (!transaction) {
740
0
    SCHECK(session->IsRestartRequired(), IllegalState,
741
0
           "Attempted to restart when session does not require restart");
742
743
0
    const auto old_read_time = session->read_point()->GetReadTime();
744
0
    session->SetReadPoint(client::Restart::kTrue);
745
0
    const auto new_read_time = session->read_point()->GetReadTime();
746
0
    VLOG_WITH_PREFIX(3) << "Restarted read: " << old_read_time << " => " << new_read_time;
747
0
    LOG_IF_WITH_PREFIX(DFATAL, old_read_time == new_read_time)
748
0
        << "Read time did not change during restart: " << old_read_time << " => " << new_read_time;
749
0
    return nullptr;
750
0
  }
751
752
0
  if (!transaction->IsRestartRequired()) {
753
0
    return STATUS(IllegalState, "Attempted to restart when transaction does not require restart");
754
0
  }
755
0
  const auto result = VERIFY_RESULT(transaction->CreateRestartedTransaction());
756
0
  session->SetTransaction(result);
757
0
  VLOG_WITH_PREFIX(3) << "Restarted transaction";
758
0
  return result;
759
0
}
760
761
}  // namespace tserver
762
}  // namespace yb