YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_session.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//--------------------------------------------------------------------------------------------------
15
16
#include "yb/yql/pggate/pg_session.h"
17
18
#include <memory>
19
20
#include <boost/optional.hpp>
21
22
#include "yb/client/batcher.h"
23
#include "yb/client/error.h"
24
#include "yb/client/schema.h"
25
#include "yb/client/session.h"
26
#include "yb/client/table.h"
27
#include "yb/client/tablet_server.h"
28
#include "yb/client/transaction.h"
29
#include "yb/client/yb_op.h"
30
#include "yb/client/yb_table_name.h"
31
32
#include "yb/common/pg_types.h"
33
#include "yb/common/pgsql_error.h"
34
#include "yb/common/placement_info.h"
35
#include "yb/common/ql_expr.h"
36
#include "yb/common/ql_value.h"
37
#include "yb/common/row_mark.h"
38
#include "yb/common/schema.h"
39
#include "yb/common/transaction_error.h"
40
41
#include "yb/docdb/doc_key.h"
42
#include "yb/docdb/primitive_value.h"
43
#include "yb/docdb/value_type.h"
44
45
#include "yb/gutil/casts.h"
46
47
#include "yb/tserver/pg_client.pb.h"
48
#include "yb/tserver/tserver_shared_mem.h"
49
50
#include "yb/util/flag_tags.h"
51
#include "yb/util/format.h"
52
#include "yb/util/result.h"
53
#include "yb/util/shared_mem.h"
54
#include "yb/util/status_format.h"
55
#include "yb/util/string_util.h"
56
57
#include "yb/yql/pggate/pg_client.h"
58
#include "yb/yql/pggate/pg_expr.h"
59
#include "yb/yql/pggate/pg_op.h"
60
#include "yb/yql/pggate/pg_txn_manager.h"
61
#include "yb/yql/pggate/pggate_flags.h"
62
#include "yb/yql/pggate/ybc_pggate.h"
63
64
using namespace std::literals;
65
66
// Deprecated timeout flag; its description points users at
// backfill_index_client_rpc_timeout_ms instead.
DEFINE_int32(ysql_wait_until_index_permissions_timeout_ms,
             60 * 60 * 1000,  // 60 min.
             "DEPRECATED: use backfill_index_client_rpc_timeout_ms instead.");
TAG_FLAG(ysql_wait_until_index_permissions_timeout_ms, advanced);
DECLARE_int32(TEST_user_ddl_operation_timeout_sec);

// Per its description: when true, failed DocDB requests are logged.
DEFINE_bool(ysql_log_failed_docdb_requests, false, "Log failed docdb requests.");
72
73
namespace yb {
74
namespace pggate {
75
76
using std::make_shared;
77
using std::unique_ptr;
78
using std::shared_ptr;
79
using std::string;
80
81
using client::YBSession;
82
using client::YBMetaDataCache;
83
using client::YBSchema;
84
using client::YBOperation;
85
using client::YBTable;
86
using client::YBTableName;
87
using client::YBTableType;
88
89
using yb::master::GetNamespaceInfoResponsePB;
90
91
using yb::tserver::TServerSharedObject;
92
93
namespace {
94
95
189k
// Returns the null marker used for a key column with the given sorting type:
// kNullHigh for the "nulls last" orderings (ascending or descending), kNullLow otherwise.
docdb::PrimitiveValue NullValue(SortingType sorting) {
  const bool nulls_last = sorting == SortingType::kAscendingNullsLast ||
                          sorting == SortingType::kDescendingNullsLast;
  return docdb::PrimitiveValue(nulls_last ? docdb::ValueType::kNullHigh
                                          : docdb::ValueType::kNullLow);
}
103
104
void InitKeyColumnPrimitiveValues(
105
    const google::protobuf::RepeatedPtrField<PgsqlExpressionPB> &column_values,
106
    const Schema &schema,
107
    size_t start_idx,
108
2.64M
    vector<docdb::PrimitiveValue> *components) {
109
2.64M
  size_t column_idx = start_idx;
110
4.05M
  for (const auto& column_value : column_values) {
111
4.05M
    const auto sorting_type = schema.column(column_idx).sorting_type();
112
4.05M
    if (column_value.has_value()) {
113
4.05M
      const auto& value = column_value.value();
114
4.05M
      components->push_back(
115
4.05M
          IsNull(value)
116
4.05M
          ? 
NullValue(sorting_type)189k
117
4.05M
          : 
docdb::PrimitiveValue::FromQLValuePB(value, sorting_type)3.86M
);
118
4.05M
    } else {
119
      // TODO(neil) The current setup only works for CQL as it assumes primary key value must not
120
      // be dependent on any column values. This needs to be fixed as PostgreSQL expression might
121
      // require a read from a table.
122
      //
123
      // Use regular executor for now.
124
127
      QLExprExecutor executor;
125
127
      QLExprResult result;
126
127
      auto s = executor.EvalExpr(column_value, nullptr, result.Writer());
127
128
127
      components->push_back(docdb::PrimitiveValue::FromQLValuePB(result.Value(), sorting_type));
129
127
    }
130
4.05M
    ++column_idx;
131
4.05M
  }
132
2.64M
}
133
134
892k
bool IsTableUsedByRequest(const PgsqlReadRequestPB& request, const string& table_id) {
135
892k
  return request.table_id() == table_id ||
136
892k
      
(864k
request.has_index_request()864k
&&
IsTableUsedByRequest(request.index_request(), table_id)316k
);
137
892k
}
138
139
601
bool IsTableUsedByRequest(const PgsqlWriteRequestPB& request, const string& table_id) {
140
601
  return request.table_id() == table_id;
141
601
}
142
143
576k
bool IsTableUsedByOperation(const PgsqlOp& op, const string& table_id) {
144
576k
  if (op.is_read()) {
145
575k
    return IsTableUsedByRequest(down_cast<const PgsqlReadOp&>(op).read_request(), table_id);
146
575k
  } else {
147
605
    return IsTableUsedByRequest(down_cast<const PgsqlWriteOp&>(op).write_request(), table_id);
148
605
  }
149
576k
}
150
151
struct PgForeignKeyReferenceLightweight {
152
  PgOid table_id;
153
  Slice ybctid;
154
};
155
156
13.2M
// Hashes a (table_id, ybctid bytes) pair.
// It is shared by hash_value(PgForeignKeyReference) and the lookup hasher in Find(), so both
// key representations hash identically.
size_t ForeignKeyReferenceHash(PgOid table_id, const char* begin, const char* end) {
  size_t seed = 0;
  boost::hash_combine(seed, table_id);
  boost::hash_range(seed, begin, end);
  return seed;
}
162
163
template<class Container>
164
5.59M
auto Find(const Container& container, PgOid table_id, const Slice& ybctid) {
165
5.59M
  return container.find(PgForeignKeyReferenceLightweight{table_id, ybctid},
166
5.59M
      [](const auto& k) {
167
5.59M
        return ForeignKeyReferenceHash(k.table_id, k.ybctid.cdata(), k.ybctid.cend()); },
168
5.59M
      [](const auto& l, const auto& r) {
169
4.32M
        return l.table_id == r.table_id && 
l.ybctid == r.ybctid3.84M
; });
170
5.59M
}
171
172
template<class Container>
173
726k
bool Erase(Container* container, PgOid table_id, const Slice& ybctid) {
174
726k
  const auto it = Find(*container, table_id, ybctid);
175
726k
  if (it != container->end()) {
176
4.18k
    container->erase(it);
177
4.18k
    return true;
178
4.18k
  }
179
722k
  return false;
180
726k
}
181
182
// Session an operation is routed through; chosen per operation by GetRequiredSessionType().
YB_DEFINE_ENUM(SessionType, (kRegular)(kTransactional)(kCatalog));
183
184
// Decides whether `op` on `table` must go through the transactional session.
// - Returns false for non-transactional tables, for ops that don't need a transaction, and in
//   initdb mode.
// - For non-catalog tables a transaction must be in progress (IllegalState otherwise), and the
//   answer is true.
// - For YSQL catalog tables the answer is true in DDL mode, or when
//   yb_non_ddl_txn_for_sys_tables_allowed is set and a transaction is in progress. Otherwise
//   catalog reads return false and catalog writes fail with IllegalState.
Result<bool> ShouldHandleTransactionally(
  const PgTxnManager& txn_manager, const PgTableDesc& table, const PgsqlOp& op) {
  const bool skip_transaction =
      !table.schema().table_properties().is_transactional() ||
      !op.need_transaction() ||
      YBCIsInitDbModeEnvVarSet();
  if (skip_transaction) {
    return false;
  }

  const auto has_non_ddl_txn = txn_manager.IsTxnInProgress();
  const bool is_catalog_table = table.schema().table_properties().is_ysql_catalog_table();
  if (!is_catalog_table) {
    SCHECK(has_non_ddl_txn, IllegalState, "Transactional operation requires transaction");
    return true;
  }

  // Previously, yb_non_ddl_txn_for_sys_tables_allowed flag caused CREATE VIEW to fail with
  // read restart error because subsequent cache refresh used an outdated txn to read from the
  // system catalog.
  // As a quick fix, we prevent yb_non_ddl_txn_for_sys_tables_allowed from affecting reads.
  const bool use_txn_for_catalog =
      txn_manager.IsDdlMode() || (yb_non_ddl_txn_for_sys_tables_allowed && has_non_ddl_txn);
  if (use_txn_for_catalog) {
    return true;
  }

  if (!op.is_write()) {
    return false;
  }
  // For consistent read from catalog tables all write operations must be done in transaction.
  return STATUS_FORMAT(IllegalState,
                       "Transaction for catalog table write operation '$0' not found",
                       table.table_name().table_name());
}
211
212
// Maps `op` to the session it must be sent through.
// Returns kTransactional when ShouldHandleTransactionally() says so. Otherwise returns kCatalog
// for YSQL catalog tables (outside initdb mode) and kRegular for everything else.
Result<SessionType> GetRequiredSessionType(
  const PgTxnManager& txn_manager, const PgTableDesc& table, const PgsqlOp& op) {
  const bool transactional = VERIFY_RESULT(ShouldHandleTransactionally(txn_manager, table, op));
  if (transactional) {
    return SessionType::kTransactional;
  }
  if (YBCIsInitDbModeEnvVarSet()) {
    return SessionType::kRegular;
  }
  return table.schema().table_properties().is_ysql_catalog_table()
      ? SessionType::kCatalog
      : SessionType::kRegular;
}
222
223
} // namespace
224
225
226
// Wraps a pending Perform() result. The relation ids are moved out of `*relations`, which is
// left moved-from.
PerformFuture::PerformFuture(
    std::future<PerformResult> future, PgSession* session, PgObjectIds* relations)
    : future_(std::move(future)),
      session_(session),
      relations_(std::move(*relations)) {
}
229
230
35.5M
bool PerformFuture::Valid() const {
231
35.5M
  return session_ != nullptr;
232
35.5M
}
233
234
2.17M
// Waits for the Perform() result and detaches from the session, so Valid() becomes false.
// Calling Get() again would dereference a null session.
// The returned catalog read time is handed to TrySetCatalogReadPoint(), and the response status
// is passed through PgSession::PatchStatus() together with the involved relations.
CHECKED_STATUS PerformFuture::Get() {
  PerformResult result = future_.get();
  PgSession* const session = session_;
  session_ = nullptr;
  session->TrySetCatalogReadPoint(result.catalog_read_time);
  return session->PatchStatus(result.status, relations_);
}
241
242
//--------------------------------------------------------------------------------------------------
243
// Class PgSession::RunHelper
244
//--------------------------------------------------------------------------------------------------
245
246
// Collects operations for one session type (regular, transactional or catalog).
// Write operations may be placed into the PgSession buffers. Everything else is gathered into a
// local batch that Flush() sends with a single Perform() call.
class PgSession::RunHelper {
 public:
  // `buffer_` is the session's transactional buffer for kTransactional and its regular buffer
  // for the other session types.
  RunHelper(PgSession* pg_session, SessionType session_type)
      : pg_session_(*pg_session),
        session_type_(session_type),
        buffer_(IsTransactional(session_type) ? pg_session_.buffered_txn_ops_
                                              : pg_session_.buffered_ops_) {
  }

  // Applies `op` on `table`.
  // A write is buffered when buffering is enabled, nothing has been applied through this helper
  // yet, and `force_non_bufferable` is false.
  // Otherwise, pending buffered operations are flushed, or adopted into this helper's batch when
  // that is safe, and `op` is appended to the batch.
  // For the transactional session the transaction isolation is then recalculated.
  // `read_time` may be null; a non-zero `*read_time` on the transactional session forces a
  // full flush of the buffers.
  CHECKED_STATUS Apply(const PgTableDesc& table,
                       const PgsqlOpPtr& op,
                       uint64_t* read_time,
                       bool force_non_bufferable) {
    auto& buffered_keys = pg_session_.buffered_keys_;
    // Try buffering this operation if it is a write operation, buffering is enabled and no
    // operations have been already applied to current session (yb session does not exist).
    if (operations_.empty() && pg_session_.buffering_enabled_ &&
        !force_non_bufferable && op->is_write()) {
      const auto& wop = down_cast<PgsqlWriteOp&>(*op).write_request();
      // Check for buffered operation related to same row.
      // If multiple operations are performed in context of single RPC second operation will not
      // see the results of first operation on DocDB side.
      // Multiple operations on same row must be performed in context of different RPC.
      // Flush is required in this case.
      RowIdentifier row_id(table.schema(), wop);
      if (PREDICT_FALSE(!buffered_keys.insert(row_id).second)) {
        RETURN_NOT_OK(pg_session_.FlushBufferedOperations());
        buffered_keys.insert(row_id);
      }
      if (PREDICT_FALSE(yb_debug_log_docdb_requests)) {
        LOG(INFO) << "Buffering operation: " << wop.ShortDebugString();
      }
      buffer_.Add(op, table.id());
      // Flush buffers in case limit of operations in single RPC exceeded.
      return PREDICT_TRUE(buffered_keys.size() < FLAGS_ysql_session_max_batch_size)
          ? Status::OK()
          : pg_session_.FlushBufferedOperations();
    }
    bool read_only = op->is_read();
    // Flush all buffered operations (if any) before performing non-bufferable operation
    if (!buffered_keys.empty()) {
      SCHECK(operations_.empty(),
             IllegalState,
             "Buffered operations must be flushed before applying first non-bufferable operation");
      // Buffered operations can't be combined within single RPC with non bufferable operation
      // in case non bufferable operation has preset read_time.
      // Buffered operations must be flushed independently in this case.
      // Operations for the catalog session can't be combined with buffered operations either:
      // buffered operations belong to the regular or transactional session, so a full flush is
      // required when this helper serves the catalog session.
      bool full_flush_required = (IsTransactional() && read_time && *read_time) || IsCatalog();
      // Check for buffered operation that affected same table as current operation.
      for (auto i = buffered_keys.begin(); !full_flush_required && i != buffered_keys.end(); ++i) {
        full_flush_required = IsTableUsedByOperation(*op, i->table_id());
      }
      if (full_flush_required) {
        RETURN_NOT_OK(pg_session_.FlushBufferedOperations());
      } else {
        RETURN_NOT_OK(pg_session_.FlushBufferedOperationsImpl(
            [this](auto ops, auto transactional) -> Status {
              if (transactional == IsTransactional()) {
                // Save buffered operations for further applying before non-buffered operation.
                operations_.Swap(&ops);
                return Status::OK();
              }
              return pg_session_.FlushOperations(std::move(ops), transactional);
            }));
        // Adopted buffered writes make the batch no longer read-only.
        read_only = read_only && operations_.empty();
      }
    }

    if (PREDICT_FALSE(yb_debug_log_docdb_requests)) {
      LOG(INFO) << "Applying operation: " << op->ToString();
    }

    operations_.Add(op, table.id());

    if (!IsTransactional()) {
      return Status::OK();
    }

    TxnPriorityRequirement txn_priority_requirement = kLowerPriorityRange;
    if (pg_session_.GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED) {
      txn_priority_requirement = kHighestPriority;
    } else if (op->is_read()) {
      const auto& read_req = down_cast<PgsqlReadOp&>(*op).read_request();
      auto row_mark_type = GetRowMarkTypeFromPB(read_req);
      // Reads with a valid row mark take locks, so they are not treated as read-only.
      read_only = read_only && !IsValidRowMarkType(row_mark_type);
      if (RowMarkNeedsHigherPriority(static_cast<RowMarkType>(row_mark_type))) {
        txn_priority_requirement = kHigherPriorityRange;
      }
    }
    pg_session_.UpdateInTxnLimit(read_time);
    return pg_session_.pg_txn_manager_->CalculateIsolation(read_only, txn_priority_requirement);
  }

  // Sends the collected batch with one Perform() call.
  // Returns a default-constructed PerformFuture when everything was buffered.
  Result<PerformFuture> Flush() {
    if (operations_.empty()) {
      // All operations were buffered, no need to flush.
      return PerformFuture();
    }

    // The promise is shared with the callback, so it stays alive however long the callback
    // runs.
    auto promise = std::make_shared<std::promise<PerformResult>>();

    pg_session_.Perform(&operations_.operations, IsCatalog(), [promise](PerformResult result) {
      promise->set_value(std::move(result));
    });
    return PerformFuture(promise->get_future(), &pg_session_, &operations_.relations);
  }

 private:
  static bool IsTransactional(SessionType type) {
    return type == SessionType::kTransactional;
  }

  bool IsTransactional() const {
    return IsTransactional(session_type_);
  }

  bool IsCatalog() const {
    return session_type_ == SessionType::kCatalog;
  }

  PgSession& pg_session_;
  const SessionType session_type_;
  // Session-level buffer (transactional or regular) that bufferable writes are added to.
  BufferableOperations& buffer_;
  // Operations collected by this helper and sent by Flush().
  BufferableOperations operations_;
};
373
374
//--------------------------------------------------------------------------------------------------
375
// Class PgForeignKeyReference
376
//--------------------------------------------------------------------------------------------------
377
378
// Owning foreign key reference; the ybctid string is moved in.
PgForeignKeyReference::PgForeignKeyReference(PgOid tid, std::string yid)
    : table_id(tid),
      ybctid(std::move(yid)) {
}
381
382
4.17M
// References are equal when both the table id and the ybctid match.
bool operator==(const PgForeignKeyReference& k1, const PgForeignKeyReference& k2) {
  if (k1.table_id != k2.table_id) {
    return false;
  }
  return k1.ybctid == k2.ybctid;
}
385
386
7.65M
size_t hash_value(const PgForeignKeyReference& key) {
387
7.65M
  return ForeignKeyReferenceHash(
388
7.65M
      key.table_id, key.ybctid.c_str(), key.ybctid.c_str() + key.ybctid.length());
389
7.65M
}
390
391
//--------------------------------------------------------------------------------------------------
392
// Class RowIdentifier
393
//--------------------------------------------------------------------------------------------------
394
395
// Identifies the row targeted by a write request: the request's table id plus the row key.
// If the request carries an explicit ybctid, that value is referenced in place.
// Otherwise a DocKey is built from the partition (hash) and range column values, encoded, and
// stored in ybctid_holder_.
RowIdentifier::RowIdentifier(const Schema& schema, const PgsqlWriteRequestPB& request)
    : table_id_(&request.table_id()) {
  if (request.has_ybctid_column_value()) {
    ybctid_ = &request.ybctid_column_value().value().binary_value();
    return;
  }

  vector<docdb::PrimitiveValue> hashed_components;
  InitKeyColumnPrimitiveValues(
      request.partition_column_values(), schema, 0 /* start_idx */, &hashed_components);
  vector<docdb::PrimitiveValue> range_components;
  InitKeyColumnPrimitiveValues(
      request.range_column_values(), schema, schema.num_hash_key_columns(), &range_components);

  docdb::DocKey doc_key = hashed_components.empty()
      ? docdb::DocKey(std::move(range_components))
      : docdb::DocKey(request.hash_code(),
                      std::move(hashed_components),
                      std::move(range_components));
  ybctid_holder_ = doc_key.Encode().ToStringBuffer();
  ybctid_ = nullptr;
}
420
421
10.7M
// Returns the request's own ybctid when one was referenced, otherwise the locally encoded key.
const string& RowIdentifier::ybctid() const {
  if (ybctid_) {
    return *ybctid_;
  }
  return ybctid_holder_;
}
424
425
11.8M
// Table id of the write request this identifier was built from.
const string& RowIdentifier::table_id() const {
  const string& id = *table_id_;
  return id;
}
428
429
2.03M
// Two identifiers are equal when they refer to the same table and the same row key.
bool operator==(const RowIdentifier& k1, const RowIdentifier& k2) {
  if (k1.table_id() != k2.table_id()) {
    return false;
  }
  return k1.ybctid() == k2.ybctid();
}
432
433
7.17M
size_t hash_value(const RowIdentifier& key) {
434
7.17M
  size_t hash = 0;
435
7.17M
  boost::hash_combine(hash, key.table_id());
436
7.17M
  boost::hash_combine(hash, key.ybctid());
437
7.17M
  return hash;
438
7.17M
}
439
440
//--------------------------------------------------------------------------------------------------
441
// Class PgSession
442
//--------------------------------------------------------------------------------------------------
443
444
// Stores the collaborators; `pg_client` must be non-null and is held by reference.
// `database_name` is not used by this constructor.
PgSession::PgSession(
    PgClient* pg_client,
    const string& database_name,
    scoped_refptr<PgTxnManager> pg_txn_manager,
    scoped_refptr<server::HybridClock> clock,
    const tserver::TServerSharedObject* tserver_shared_object,
    const YBCPgCallbacks& pg_callbacks)
    : pg_client_(*pg_client),
      pg_txn_manager_(std::move(pg_txn_manager)),
      clock_(std::move(clock)),
      tserver_shared_object_(tserver_shared_object),
      pg_callbacks_(pg_callbacks) {}
457
458
6.06k
// No explicit cleanup; members release their own resources.
PgSession::~PgSession() = default;
460
461
//--------------------------------------------------------------------------------------------------
462
463
6.08k
// Records the database this session is connected to; never fails.
Status PgSession::ConnectDatabase(const string& database_name) {
  connected_database_ = database_name;
  return Status::OK();
}
467
468
5.72k
// Fetches database info from the pg client and reports whether the database is colocated.
// `*colocated` is written only on success.
Status PgSession::IsDatabaseColocated(const PgOid database_oid, bool *colocated) {
  const auto db_info = VERIFY_RESULT(pg_client_.GetDatabaseInfo(database_oid));
  *colocated = db_info.colocated();
  return Status::OK();
}
473
474
//--------------------------------------------------------------------------------------------------
475
476
72
// Drops the database, then deletes its sequences. Stops at the first failure.
Status PgSession::DropDatabase(const string& database_name, PgOid database_oid) {
  tserver::PgDropDatabaseRequestPB drop_request;
  drop_request.set_database_name(database_name);
  drop_request.set_database_oid(database_oid);

  RETURN_NOT_OK(pg_client_.DropDatabase(&drop_request, CoarseTimePoint()));
  RETURN_NOT_OK(DeleteDBSequences(database_oid));
  return Status::OK();
}
485
486
22
// Queries the catalog version from the master via the pg client.
// `*version` is written only on success.
Status PgSession::GetCatalogMasterVersion(uint64_t *version) {
  const auto master_version = VERIFY_RESULT(pg_client_.GetCatalogMasterVersion());
  *version = master_version;
  return Status::OK();
}
490
491
0
// Thin forwarder to PgClient::CreateSequencesDataTable().
Status PgSession::CreateSequencesDataTable() {
  const Status status = pg_client_.CreateSequencesDataTable();
  return status;
}
494
495
// Forwards a sequence-row insert to the pg client unchanged.
Status PgSession::InsertSequenceTuple(int64_t db_oid,
                                      int64_t seq_oid,
                                      uint64_t ysql_catalog_version,
                                      int64_t last_val,
                                      bool is_called) {
  return pg_client_.InsertSequenceTuple(db_oid,
                                        seq_oid,
                                        ysql_catalog_version,
                                        last_val,
                                        is_called);
}
503
504
// Forwards a sequence-row update to the pg client unchanged.
// The optional expected values are passed through as given.
Result<bool> PgSession::UpdateSequenceTuple(int64_t db_oid,
                                            int64_t seq_oid,
                                            uint64_t ysql_catalog_version,
                                            int64_t last_val,
                                            bool is_called,
                                            boost::optional<int64_t> expected_last_val,
                                            boost::optional<bool> expected_is_called) {
  return pg_client_.UpdateSequenceTuple(db_oid,
                                        seq_oid,
                                        ysql_catalog_version,
                                        last_val,
                                        is_called,
                                        expected_last_val,
                                        expected_is_called);
}
515
516
// Reads a sequence row through the pg client; the pair is returned as produced by
// PgClient::ReadSequenceTuple().
Result<std::pair<int64_t, bool>> PgSession::ReadSequenceTuple(int64_t db_oid,
                                                              int64_t seq_oid,
                                                              uint64_t ysql_catalog_version) {
  return pg_client_.ReadSequenceTuple(db_oid,
                                      seq_oid,
                                      ysql_catalog_version);
}
521
522
282
// Forwards a single sequence-row delete to the pg client.
Status PgSession::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) {
  const Status status = pg_client_.DeleteSequenceTuple(db_oid, seq_oid);
  return status;
}
525
526
71
// Forwards deletion of all sequences of database `db_oid` to the pg client.
Status PgSession::DeleteDBSequences(int64_t db_oid) {
  const Status status = pg_client_.DeleteDBSequences(db_oid);
  return status;
}
529
530
//--------------------------------------------------------------------------------------------------
531
532
3.48k
// Drops the table identified by `table_id`.
// The table name returned by PgClient::DropTable() (consumed by DropIndex()) is discarded here.
Status PgSession::DropTable(const PgObjectId& table_id) {
  tserver::PgDropTableRequestPB drop_request;
  table_id.ToPB(drop_request.mutable_table_id());
  auto drop_result = pg_client_.DropTable(&drop_request, CoarseTimePoint());
  return ResultToStatus(drop_result);
}
537
538
// Drops the index `index_id`.
// When `indexed_table_name` is non-null it receives the name returned by the drop call.
Status PgSession::DropIndex(
    const PgObjectId& index_id,
    client::YBTableName* indexed_table_name) {
  tserver::PgDropTableRequestPB drop_request;
  index_id.ToPB(drop_request.mutable_table_id());
  drop_request.set_index(true);
  auto indexed_table = VERIFY_RESULT(pg_client_.DropTable(&drop_request, CoarseTimePoint()));
  if (indexed_table_name != nullptr) {
    *indexed_table_name = std::move(indexed_table);
  }
  return Status::OK();
}
550
551
// Drops the tablegroup, then evicts its entry from the local table cache.
// The cache is touched regardless of the drop outcome, and the drop status is returned.
Status PgSession::DropTablegroup(const PgOid database_oid,
                                 PgOid tablegroup_oid) {
  const PgObjectId tablegroup_id(database_oid, tablegroup_oid);
  tserver::PgDropTablegroupRequestPB drop_request;
  tablegroup_id.ToPB(drop_request.mutable_tablegroup_id());
  Status drop_status = pg_client_.DropTablegroup(&drop_request, CoarseTimePoint());
  InvalidateTableCache(tablegroup_id, InvalidateOnPgClient::kFalse);
  return drop_status;
}
560
561
//--------------------------------------------------------------------------------------------------
562
563
16.7M
// Returns the cached descriptor for `table_id`, opening the table through the pg client on a
// miss.
// A cache entry holding nullptr is a reopen marker left by InvalidateTableCache(); its presence
// is passed to OpenTable() as the reopen flag. The pending invalidation time is sent once and
// then reset.
Result<PgTableDescPtr> PgSession::LoadTable(const PgObjectId& table_id) {
  VLOG(3) << "Loading table descriptor for " << table_id;

  const auto entry = table_cache_.find(table_id);
  const bool has_entry = entry != table_cache_.end();
  if (has_entry && entry->second) {
    return entry->second;
  }

  VLOG(4) << "Table cache MISS: " << table_id;
  auto table = VERIFY_RESULT(
      pg_client_.OpenTable(table_id, has_entry, invalidate_table_cache_time_));
  invalidate_table_cache_time_ = CoarseTimePoint();
  if (!has_entry) {
    table_cache_.emplace(table_id, table);
  } else {
    entry->second = table;
  }
  return table;
}
582
583
void PgSession::InvalidateTableCache(
584
6.19k
    const PgObjectId& table_id, InvalidateOnPgClient invalidate_on_pg_client) {
585
6.19k
  if (invalidate_on_pg_client) {
586
    // Keep special record about this table_id, so when we would open this table again,
587
    // reopen flag will be sent to pg client service.
588
    // This flag means that pg client service should remove table from his cache and fetch
589
    // new data from master.
590
    // It is optional optimization, but some tests fails w/o it, since they expect that
591
    // local table information is updated after alter table operation.
592
9
    table_cache_[table_id] = nullptr;
593
6.18k
  } else {
594
6.18k
    auto it = table_cache_.find(table_id);
595
6.18k
    if (it != table_cache_.end() && 
it->second2.92k
) {
596
2.92k
      table_cache_.erase(it);
597
2.92k
    }
598
6.18k
  }
599
6.19k
}
600
601
1.23k
void PgSession::InvalidateAllTablesCache() {
602
1.23k
  invalidate_table_cache_time_ = CoarseMonoClock::now();
603
1.23k
  table_cache_.clear();
604
1.23k
}
605
606
667k
// Enables write buffering. Fails with IllegalState if buffering is already on.
// Leftover buffered keys indicate a bookkeeping bug and are reported via LOG(DFATAL).
Status PgSession::StartOperationsBuffering() {
  SCHECK(!buffering_enabled_, IllegalState, "Buffering has been already started");
  const bool has_leftovers = !buffered_keys_.empty();
  if (PREDICT_FALSE(has_leftovers)) {
    LOG(DFATAL) << "Buffering hasn't been started yet but "
                << buffered_keys_.size()
                << " buffered operations found";
  }
  buffering_enabled_ = true;
  return Status::OK();
}
616
617
610k
// Disables write buffering and flushes whatever was buffered.
// Fails with IllegalState if buffering was not on.
Status PgSession::StopOperationsBuffering() {
  SCHECK(buffering_enabled_, IllegalState, "Buffering hasn't been started");
  buffering_enabled_ = false;
  const Status flush_status = FlushBufferedOperations();
  return flush_status;
}
622
623
114k
void PgSession::ResetOperationsBuffering() {
624
114k
  DropBufferedOperations();
625
114k
  buffering_enabled_ = false;
626
114k
}
627
628
1.82M
// Sends every buffered batch immediately through FlushOperations().
Status PgSession::FlushBufferedOperations() {
  const auto send_now = [this](auto ops, auto txn) {
    return this->FlushOperations(std::move(ops), txn);
  };
  return FlushBufferedOperationsImpl(send_now);
}
633
634
249k
void PgSession::DropBufferedOperations() {
635
249k
  VLOG_IF(1, !buffered_keys_.empty())
636
193
          << "Dropping " << buffered_keys_.size() << " pending operations";
637
249k
  buffered_keys_.clear();
638
249k
  buffered_ops_.Clear();
639
249k
  buffered_txn_ops_.Clear();
640
249k
}
641
642
3.95M
// Isolation level as reported by the transaction manager.
PgIsolationLevel PgSession::GetIsolationLevel() {
  const auto level = pg_txn_manager_->GetPgIsolationLevel();
  return level;
}
645
646
1.92M
// Hands the buffered operations to `flusher`: non-transactional batch first, then the
// transactional batch.
// The buffers are moved out and cleared before `flusher` runs, so the session's buffers are
// empty afterwards even if flushing fails.
// Transactional operations are rejected with IllegalState in initdb mode.
Status PgSession::FlushBufferedOperationsImpl(const Flusher& flusher) {
  auto regular_ops = std::move(buffered_ops_);
  auto txn_ops = std::move(buffered_txn_ops_);
  buffered_keys_.clear();
  buffered_ops_.Clear();
  buffered_txn_ops_.Clear();

  if (!regular_ops.empty()) {
    RETURN_NOT_OK(flusher(std::move(regular_ops), IsTransactionalSession::kFalse));
  }
  if (txn_ops.empty()) {
    return Status::OK();
  }
  SCHECK(!YBCIsInitDbModeEnvVarSet(),
         IllegalState,
         "No transactional operations are expected in the initdb mode");
  RETURN_NOT_OK(flusher(std::move(txn_ops), IsTransactionalSession::kTrue));
  return Status::OK();
}
663
664
2
// Asks the pg client whether initdb has completed.
Result<bool> PgSession::IsInitDbDone() {
  auto done = pg_client_.IsInitDbDone();
  return done;
}
667
668
400k
// Sends `ops` through the regular (non-catalog) session and waits for the result.
// For transactional batches the isolation is recalculated first (highest priority under
// READ COMMITTED), and in_txn_limit_ is set to the current hybrid time.
Status PgSession::FlushOperations(BufferableOperations ops, IsTransactionalSession transactional) {
  DCHECK(ops.size() > 0 && ops.size() <= FLAGS_ysql_session_max_batch_size);

  if (PREDICT_FALSE(yb_debug_log_docdb_requests)) {
    LOG(INFO) << "Flushing buffered operations, using "
              << (transactional ? "transactional" : "non-transactional")
              << " session (num ops: " << ops.size() << ")";
  }

  if (transactional) {
    TxnPriorityRequirement txn_priority_requirement = kLowerPriorityRange;
    if (GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED) {
      txn_priority_requirement = kHighestPriority;
    }

    RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation(false, txn_priority_requirement));
    in_txn_limit_ = clock_->Now();
  }

  // The promise is shared with the callback (as RunHelper::Flush does) rather than being a stack
  // object captured by reference. If PerformAsync invokes the callback on another thread,
  // set_value() may still be running when Get() returns and this frame unwinds.
  auto promise = std::make_shared<std::promise<PerformResult>>();
  Perform(
    &ops.operations, /* use_catalog_session */ false, [promise](const PerformResult& result) {
    promise->set_value(result);
  });
  PerformFuture future(promise->get_future(), this, &ops.relations);
  return future.Get();
}
695
696
void PgSession::Perform(
697
2.16M
  PgsqlOps* operations, bool use_catalog_session, const PerformCallback& callback) {
698
2.16M
  tserver::PgPerformOptionsPB options;
699
700
2.16M
  if (use_catalog_session) {
701
669k
    if (catalog_read_time_) {
702
669k
      if (*catalog_read_time_) {
703
592k
        catalog_read_time_->ToPB(options.mutable_read_time());
704
592k
      } else {
705
77.2k
        options.mutable_read_time();
706
77.2k
      }
707
669k
    }
708
669k
    options.set_use_catalog_session(true);
709
1.49M
  } else {
710
1.49M
    pg_txn_manager_->SetupPerformOptions(&options);
711
712
1.49M
    if (in_txn_limit_ && 
pg_txn_manager_->IsTxnInProgress()1.42M
) {
713
1.42M
      options.set_in_txn_limit_ht(in_txn_limit_.ToUint64());
714
1.42M
    }
715
1.49M
  }
716
2.16M
  options.set_force_global_transaction(yb_force_global_transaction);
717
718
2.16M
  pg_client_.PerformAsync(&options, operations, callback);
719
2.16M
}
720
721
442k
Result<uint64_t> PgSession::GetSharedCatalogVersion() {
722
442k
  if (tserver_shared_object_) {
723
442k
    return (**tserver_shared_object_).ysql_catalog_version();
724
442k
  } else {
725
91
    return STATUS(NotSupported, "Tablet server shared memory has not been opened");
726
91
  }
727
442k
}
728
729
1.95k
Result<uint64_t> PgSession::GetSharedAuthKey() {
730
1.95k
  if (tserver_shared_object_) {
731
1.95k
    return (**tserver_shared_object_).postgres_auth_key();
732
1.95k
  } else {
733
0
    return STATUS(NotSupported, "Tablet server shared memory has not been opened");
734
0
  }
735
1.95k
}
736
737
Result<bool> PgSession::ForeignKeyReferenceExists(PgOid table_id,
738
                                                  const Slice& ybctid,
739
237k
                                                  const YbctidReader& reader) {
740
237k
  if (Find(fk_reference_cache_, table_id, ybctid) != fk_reference_cache_.end()) {
741
235k
    return true;
742
235k
  }
743
744
  // Check existence of required FK intent.
745
  // Absence means the key was checked by previous batched request and was not found.
746
1.99k
  if (!Erase(&fk_reference_intent_, table_id, ybctid)) {
747
4
    return false;
748
4
  }
749
1.99k
  std::vector<Slice> ybctids;
750
1.99k
  const auto reserved_size = std::min<size_t>(FLAGS_ysql_session_max_batch_size,
751
1.99k
                                              fk_reference_intent_.size() + 1);
752
1.99k
  ybctids.reserve(reserved_size);
753
1.99k
  ybctids.push_back(ybctid);
754
  // TODO(dmitry): In case number of keys for same table > FLAGS_ysql_session_max_batch_size
755
  // two strategy are possible:
756
  // 1. select keys belonging to same tablet to reduce number of simultaneous RPC
757
  // 2. select keys belonging to different tablets to distribute reads among different nodes
758
195k
  const auto intent_match = [table_id](const auto& key) { return key.table_id == table_id; };
759
1.99k
  for (auto it = fk_reference_intent_.begin();
760
101k
       it != fk_reference_intent_.end() && 
ybctids.size() < FLAGS_ysql_session_max_batch_size100k
;
761
99.9k
       ++it) {
762
99.9k
    if (intent_match(*it)) {
763
99.8k
      ybctids.push_back(it->ybctid);
764
99.8k
    }
765
99.9k
  }
766
101k
  for (auto& r : 
VERIFY_RESULT1.94k
(reader(table_id, ybctids)))1.94k
{
767
101k
    fk_reference_cache_.emplace(table_id, std::move(r));
768
101k
  }
769
  // Remove used intents.
770
1.94k
  auto intent_count_for_remove = ybctids.size() - 1;
771
1.94k
  if (intent_count_for_remove == fk_reference_intent_.size()) {
772
1.69k
    fk_reference_intent_.clear();
773
1.69k
  } else {
774
244
    for (auto it = fk_reference_intent_.begin();
775
96.3k
        it != fk_reference_intent_.end() && 
intent_count_for_remove > 096.3k
;) {
776
96.0k
      if (intent_match(*it)) {
777
96.0k
        it = fk_reference_intent_.erase(it);
778
96.0k
        --intent_count_for_remove;
779
96.0k
      } else {
780
4
        ++it;
781
4
      }
782
96.0k
    }
783
244
  }
784
1.94k
  return Find(fk_reference_cache_, table_id, ybctid) != fk_reference_cache_.end();
785
1.99k
}
786
787
245k
void PgSession::AddForeignKeyReferenceIntent(PgOid table_id, const Slice& ybctid) {
788
245k
  if (Find(fk_reference_cache_, table_id, ybctid) == fk_reference_cache_.end()) {
789
241k
    fk_reference_intent_.emplace(table_id, ybctid.ToBuffer());
790
241k
  }
791
245k
}
792
793
4.38M
void PgSession::AddForeignKeyReference(PgOid table_id, const Slice& ybctid) {
794
4.38M
  if (Find(fk_reference_cache_, table_id, ybctid) == fk_reference_cache_.end()) {
795
4.38M
    fk_reference_cache_.emplace(table_id, ybctid.ToBuffer());
796
4.38M
  }
797
4.38M
}
798
799
724k
void PgSession::DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid) {
800
724k
  Erase(&fk_reference_cache_, table_id, ybctid);
801
724k
}
802
803
2.17M
Status PgSession::PatchStatus(const Status& status, const PgObjectIds& relations) {
804
2.17M
  if (PgsqlRequestStatus(status) == PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR) {
805
11.1k
    auto op_index = OpIndex::ValueFromStatus(status);
806
11.1k
    if (op_index && *op_index < relations.size()) {
807
11.1k
      char constraint_name[0xFF];
808
11.1k
      constraint_name[sizeof(constraint_name) - 1] = 0;
809
11.1k
      pg_callbacks_.FetchUniqueConstraintName(relations[*op_index].object_oid,
810
11.1k
                                              constraint_name,
811
11.1k
                                              sizeof(constraint_name) - 1);
812
11.1k
      return STATUS(
813
11.1k
          AlreadyPresent,
814
11.1k
          Format("duplicate key value violates unique constraint \"$0\"", Slice(constraint_name)),
815
11.1k
          Slice(),
816
11.1k
          PgsqlError(YBPgErrorCode::YB_PG_UNIQUE_VIOLATION));
817
11.1k
    }
818
11.1k
  }
819
2.15M
  return status;
820
2.17M
}
821
822
6.04k
Result<int> PgSession::TabletServerCount(bool primary_only) {
823
6.04k
  return pg_client_.TabletServerCount(primary_only);
824
6.04k
}
825
826
4
Result<client::TabletServersInfo> PgSession::ListTabletServers() {
827
4
  return pg_client_.ListLiveTabletServers(false);
828
4
}
829
830
1.59M
bool PgSession::ShouldUseFollowerReads() const {
831
1.59M
  return pg_txn_manager_->ShouldUseFollowerReads();
832
1.59M
}
833
834
2
void PgSession::SetTimeout(const int timeout_ms) {
835
2
  pg_client_.SetTimeout(timeout_ms * 1ms);
836
2
}
837
838
2.37M
void PgSession::ResetCatalogReadPoint() {
839
2.37M
  catalog_read_time_ = ReadHybridTime();
840
2.37M
}
841
842
2.17M
void PgSession::TrySetCatalogReadPoint(const ReadHybridTime& read_ht) {
843
2.17M
  if (read_ht) {
844
77.1k
    catalog_read_time_ = read_ht;
845
77.1k
  }
846
2.17M
}
847
848
61.7k
Status PgSession::SetActiveSubTransaction(SubTransactionId id) {
849
  // It's required that we flush all buffered operations before changing the SubTransactionMetadata
850
  // used by the underlying batcher and RPC logic, as this will snapshot the current
851
  // SubTransactionMetadata for use in construction of RPCs for already-queued operations, thereby
852
  // ensuring that previous operations use previous SubTransactionMetadata. If we do not flush here,
853
  // already queued operations may incorrectly use this newly modified SubTransactionMetadata when
854
  // they are eventually sent to DocDB.
855
61.7k
  RETURN_NOT_OK(FlushBufferedOperations());
856
61.7k
  tserver::PgPerformOptionsPB* options_ptr = nullptr;
857
61.7k
  tserver::PgPerformOptionsPB options;
858
61.7k
  if (pg_txn_manager_->GetIsolationLevel() == IsolationLevel::NON_TRANSACTIONAL) {
859
7.28k
    auto txn_priority_requirement = kLowerPriorityRange;
860
7.28k
    if (pg_txn_manager_->GetPgIsolationLevel() == PgIsolationLevel::READ_COMMITTED) {
861
6.33k
      txn_priority_requirement = kHighestPriority;
862
6.33k
    }
863
7.28k
    RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation(
864
7.28k
        IsReadOnlyOperation::kFalse, txn_priority_requirement));
865
7.28k
    options_ptr = &options;
866
7.28k
    pg_txn_manager_->SetupPerformOptions(&options);
867
7.28k
  }
868
61.7k
  return pg_client_.SetActiveSubTransaction(id, options_ptr);
869
61.7k
}
870
871
13.5k
Status PgSession::RollbackSubTransaction(SubTransactionId id) {
872
  // TODO(savepoints) -- send async RPC to transaction status tablet, or rely on heartbeater to
873
  // eventually send this metadata.
874
  // See comment in SetActiveSubTransaction -- we must flush buffered operations before updating any
875
  // SubTransactionMetadata.
876
13.5k
  RETURN_NOT_OK(FlushBufferedOperations());
877
13.5k
  return pg_client_.RollbackSubTransaction(id);
878
13.5k
}
879
880
3.67M
void PgSession::UpdateInTxnLimit(uint64_t* read_time) {
881
3.67M
  if (!read_time) {
882
0
    return;
883
0
  }
884
885
3.67M
  if (!*read_time) {
886
748k
    *read_time = clock_->Now().ToUint64();
887
748k
  }
888
3.67M
  in_txn_limit_ = HybridTime(*read_time);
889
3.67M
}
890
891
1
Status PgSession::ValidatePlacement(const string& placement_info) {
892
1
  tserver::PgValidatePlacementRequestPB req;
893
894
1
  Result<PlacementInfoConverter::Placement> result =
895
1
      PlacementInfoConverter::FromString(placement_info);
896
897
  // For validation, if there is no replica_placement option, we default to the
898
  // cluster configuration which the user is responsible for maintaining
899
1
  if (!result.ok() && 
result.status().IsInvalidArgument()0
) {
900
0
    return Status::OK();
901
0
  }
902
903
1
  RETURN_NOT_OK(result);
904
905
1
  PlacementInfoConverter::Placement placement = result.get();
906
1
  for (const auto& block : placement.placement_infos) {
907
1
    auto pb = req.add_placement_infos();
908
1
    pb->set_cloud(block.cloud);
909
1
    pb->set_region(block.region);
910
1
    pb->set_zone(block.zone);
911
1
    pb->set_min_num_replicas(block.min_num_replicas);
912
1
  }
913
1
  req.set_num_replicas(placement.num_replicas);
914
915
1
  return pg_client_.ValidatePlacement(&req);
916
1
}
917
918
Result<PerformFuture> PgSession::RunAsync(
919
8.92M
  const OperationGenerator& generator, uint64_t* read_time, bool force_non_bufferable) {
920
8.92M
  auto table_op = generator();
921
8.92M
  SCHECK(table_op.operation, IllegalState, "Operation list must not be empty");
922
8.92M
  const auto* table = table_op.table;
923
8.92M
  const auto* op = table_op.operation;
924
8.92M
  const auto group_session_type = 
VERIFY_RESULT8.92M
(GetRequiredSessionType(
925
8.92M
    *pg_txn_manager_, *table, **op));
926
0
  RunHelper runner(this, group_session_type);
927
20.4M
  for (; table_op.operation; 
table_op = generator()11.5M
) {
928
11.5M
    table = table_op.table;
929
11.5M
    op = table_op.operation;
930
11.5M
    const auto op_session_type = VERIFY_RESULT(GetRequiredSessionType(
931
11.5M
      *pg_txn_manager_, *table, **op));
932
11.5M
    SCHECK_EQ(op_session_type,
933
11.5M
              group_session_type,
934
11.5M
              IllegalState,
935
11.5M
              "Operations on different sessions can't be mixed");
936
11.5M
    RETURN_NOT_OK(runner.Apply(*table, *op, read_time, force_non_bufferable));
937
11.5M
  }
938
8.92M
  return runner.Flush();
939
8.92M
}
940
941
}  // namespace pggate
942
}  // namespace yb