YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_session.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_YQL_PGGATE_PG_SESSION_H_
15
#define YB_YQL_PGGATE_PG_SESSION_H_
16
17
#include <unordered_set>
18
19
#include <boost/optional.hpp>
20
#include <boost/unordered_set.hpp>
21
22
#include "yb/client/client_fwd.h"
23
#include "yb/client/session.h"
24
#include "yb/client/transaction.h"
25
26
#include "yb/common/pg_types.h"
27
#include "yb/common/transaction.h"
28
29
#include "yb/gutil/ref_counted.h"
30
31
#include "yb/server/hybrid_clock.h"
32
33
#include "yb/tserver/tserver_util_fwd.h"
34
35
#include "yb/util/oid_generator.h"
36
#include "yb/util/result.h"
37
38
#include "yb/yql/pggate/pg_client.h"
39
#include "yb/yql/pggate/pg_gate_fwd.h"
40
#include "yb/yql/pggate/pg_env.h"
41
#include "yb/yql/pggate/pg_tabledesc.h"
42
#include "yb/yql/pggate/pg_txn_manager.h"
43
44
namespace yb {
45
namespace pggate {
46
47
// Boolean flag types declared through YB_STRONGLY_TYPED_BOOL (the macro is defined outside this
// file). InvalidateOnPgClient is the flag type taken by PgSession::InvalidateTableCache();
// OpBuffered is not referenced anywhere else in this header.
YB_STRONGLY_TYPED_BOOL(OpBuffered);
48
YB_STRONGLY_TYPED_BOOL(InvalidateOnPgClient);
49
50
// Forward declarations. PgSession is referenced by PerformFuture before PgSession is defined.
class PgTxnManager;
51
class PgSession;
52
53
// A batch of operations together with the relation each operation was added for.
// Add(), Clear() and Swap() always update `operations` and `relations` together, so as long as
// the public fields are only changed through these helpers both containers hold the same number
// of entries, in the same order.
struct BufferableOperations {
54
  // Operations in the order they were added.
  PgsqlOps operations;
55
  // Relation ids in the order they were added; one entry per Add() call.
  PgObjectIds relations;
56
57
4.02M
  // Appends `op` (moved into the batch) and the id of the relation it belongs to.
  void Add(PgsqlOpPtr op, const PgObjectId& relation) {
58
4.02M
    operations.push_back(std::move(op));
59
4.02M
    relations.push_back(relation);
60
4.02M
  }
61
62
1.62M
  // Removes all operations and all relation ids.
  void Clear() {
63
1.62M
    operations.clear();
64
1.62M
    relations.clear();
65
1.62M
  }
66
67
31.0k
  // Exchanges the contents of this batch with `*rhs`. `rhs` is dereferenced unconditionally and
  // therefore must not be null.
  void Swap(BufferableOperations* rhs) {
68
31.0k
    operations.swap(rhs->operations);
69
31.0k
    relations.swap(rhs->relations);
70
31.0k
  }
71
72
8.26M
  // True when no operation is held. Only `operations` is consulted.
  bool empty() const {
73
8.26M
    return operations.empty();
74
8.26M
  }
75
76
301k
  // Number of operations held. Only `operations` is consulted.
  size_t size() const {
77
301k
    return operations.size();
78
301k
  }
79
};
80
81
// Key type of the foreign key caches kept by PgSession (fk_reference_cache_ and
// fk_reference_intent_, both boost::unordered_set<PgForeignKeyReference>): a table OID paired
// with a ybctid. The hash and equality the sets need are not declared in this header.
struct PgForeignKeyReference {
82
  // Defined out of line; presumably stores both arguments in the fields below - confirm in the
  // .cc file.
  PgForeignKeyReference(PgOid table_id, std::string ybctid);
83
  PgOid table_id;
84
  std::string ybctid;
85
};
86
87
// Represents row id (ybctid) from the DocDB's point of view.
// PgSession stores these in buffered_keys_
// (std::unordered_set<RowIdentifier, boost::hash<RowIdentifier>>); the hash and equality that
// set needs are not declared in this header.
88
class RowIdentifier {
89
 public:
90
  // Builds the identifier from a write request and the schema of its table. Defined out of line.
  RowIdentifier(const Schema& schema, const PgsqlWriteRequestPB& request);
91
  // Accessors are declared inline here; their definitions live outside this chunk.
  inline const std::string& ybctid() const;
92
  inline const std::string& table_id() const;
93
94
 private:
95
  // Non-owning pointers.
  // NOTE(review): presumably these point into the request passed to the constructor, which would
  // then have to outlive this object - confirm against the constructor.
  const std::string* table_id_;
96
  const std::string* ybctid_;
97
  // Owned string; presumably holds a ybctid that had to be computed rather than referenced -
  // confirm against the constructor.
  std::string        ybctid_holder_;
98
};
99
100
// More flag types declared through YB_STRONGLY_TYPED_BOOL (macro defined outside this file).
// IsTransactionalSession is the flag PgSession passes to its flusher callbacks and to RunHelper;
// IsReadOnlyOperation and IsCatalogOperation are not referenced anywhere else in this header.
YB_STRONGLY_TYPED_BOOL(IsTransactionalSession);
101
YB_STRONGLY_TYPED_BOOL(IsReadOnlyOperation);
102
YB_STRONGLY_TYPED_BOOL(IsCatalogOperation);
103
104
// Result handle of an asynchronously started batch of operations. This is the value type carried
// by the Result returned from PgSession::RunOneAsync() / RunAsync().
class PerformFuture {
105
 public:
106
4.81M
  // Creates an empty handle: default-constructed future_, null session_, empty relations_.
  PerformFuture() = default;
107
  // Defined out of line. `relations` is taken by pointer while relations_ is stored by value.
  // NOTE(review): presumably the pointed-to ids are moved or swapped into relations_ - confirm.
  PerformFuture(std::future<PerformResult> future, PgSession* session, PgObjectIds* relations);
108
109
  // Defined out of line; presumably reports whether future_ refers to a pending or ready result -
  // confirm in the .cc file.
  bool Valid() const;
110
111
  // Defined out of line; presumably waits for future_ and returns the resulting status - confirm
  // in the .cc file.
  CHECKED_STATUS Get();
112
 private:
113
  std::future<PerformResult> future_;
114
  // Non-owning; null for a default-constructed handle.
  PgSession* session_ = nullptr;
115
  PgObjectIds relations_;
116
};
117
118
// Per-backend session object of the pggate layer. It exposes DDL helpers, sequence helpers,
// write buffering control, asynchronous execution of read/write operations and the foreign key
// reference cache declared below.
//
// This class is not thread-safe as it is mostly used by a single-threaded PostgreSQL backend
119
// process.
120
class PgSession : public RefCountedThreadSafe<PgSession> {
121
 public:
122
  // Public types.
123
  using ScopedRefPtr = scoped_refptr<PgSession>;
124
125
  // Constructors.
  // `pg_client` is taken by pointer but kept as a reference member (pg_client_).
  // NOTE(review): presumably it must be non-null - confirm against the constructor definition.
126
  PgSession(client::YBClient* client,
127
            PgClient* pg_client,
128
            const std::string& database_name,
129
            scoped_refptr<PgTxnManager> pg_txn_manager,
130
            scoped_refptr<server::HybridClock> clock,
131
            const tserver::TServerSharedObject* tserver_shared_object,
132
            const YBCPgCallbacks& pg_callbacks);
133
  virtual ~PgSession();
134
135
  // Resets the read point for catalog tables.
136
  // Next catalog read operation will read the very latest catalog's state.
137
  void ResetCatalogReadPoint();
138
139
  //------------------------------------------------------------------------------------------------
140
  // Operations on Session.
141
  //------------------------------------------------------------------------------------------------
142
143
  CHECKED_STATUS ConnectDatabase(const std::string& database_name);
144
145
  // The answer is returned through the `colocated` out-parameter.
  CHECKED_STATUS IsDatabaseColocated(const PgOid database_oid, bool *colocated);
146
147
  //------------------------------------------------------------------------------------------------
148
  // Operations on Database Objects.
149
  //------------------------------------------------------------------------------------------------
150
151
  // API for database operations.
152
  CHECKED_STATUS DropDatabase(const std::string& database_name, PgOid database_oid);
153
154
  // The version is returned through the `version` out-parameter.
  CHECKED_STATUS GetCatalogMasterVersion(uint64_t *version);
155
156
  // API for sequences data operations.
157
  CHECKED_STATUS CreateSequencesDataTable();
158
159
  CHECKED_STATUS InsertSequenceTuple(int64_t db_oid,
160
                                     int64_t seq_oid,
161
                                     uint64_t ysql_catalog_version,
162
                                     int64_t last_val,
163
                                     bool is_called);
164
165
  // `expected_last_val` / `expected_is_called` are optional; `skipped` is an out-parameter.
  // NOTE(review): presumably the expected values make the update conditional and `skipped`
  // reports that the condition did not hold - confirm in the .cc file.
  CHECKED_STATUS UpdateSequenceTuple(int64_t db_oid,
166
                                     int64_t seq_oid,
167
                                     uint64_t ysql_catalog_version,
168
                                     int64_t last_val,
169
                                     bool is_called,
170
                                     boost::optional<int64_t> expected_last_val,
171
                                     boost::optional<bool> expected_is_called,
172
                                     bool* skipped);
173
174
  // Results are returned through the `last_val` and `is_called` out-parameters.
  CHECKED_STATUS ReadSequenceTuple(int64_t db_oid,
175
                                   int64_t seq_oid,
176
                                   uint64_t ysql_catalog_version,
177
                                   int64_t *last_val,
178
                                   bool *is_called);
179
180
  CHECKED_STATUS DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid);
181
182
  CHECKED_STATUS DeleteDBSequences(int64_t db_oid);
183
184
  //------------------------------------------------------------------------------------------------
185
  // Operations on Tablegroup.
186
  //------------------------------------------------------------------------------------------------
187
188
  CHECKED_STATUS DropTablegroup(const PgOid database_oid,
189
                                PgOid tablegroup_oid);
190
191
  // API for schema operations.
192
  // TODO(neil) Schema should be a sub-database that have some specialized property.
193
  CHECKED_STATUS CreateSchema(const std::string& schema_name, bool if_not_exist);
194
  CHECKED_STATUS DropSchema(const std::string& schema_name, bool if_exist);
195
196
  // API for table operations.
197
  CHECKED_STATUS DropTable(const PgObjectId& table_id);
198
  // `indexed_table_name` is an optional out-parameter; it defaults to nullptr.
  CHECKED_STATUS DropIndex(
199
      const PgObjectId& index_id,
200
      client::YBTableName* indexed_table_name = nullptr);
201
  Result<PgTableDescPtr> LoadTable(const PgObjectId& table_id);
202
  void InvalidateTableCache(
203
      const PgObjectId& table_id, InvalidateOnPgClient invalidate_on_pg_client);
204
205
  // Start operation buffering. Buffering must not be in progress.
206
  CHECKED_STATUS StartOperationsBuffering();
207
  // Flush all pending buffered operations and stop further buffering.
208
  // Buffering must be in progress.
209
  CHECKED_STATUS StopOperationsBuffering();
210
  // Drop all pending buffered operations and stop further buffering. Buffering may be in any state.
211
  void ResetOperationsBuffering();
212
213
  // Flush all pending buffered operations. Buffering mode remains unchanged.
214
  CHECKED_STATUS FlushBufferedOperations();
215
  // Drop all pending buffered operations. Buffering mode remains unchanged.
216
  void DropBufferedOperations();
217
218
  PgIsolationLevel GetIsolationLevel();
219
220
  // Run (apply + flush) the given operation to read and write database content.
221
  // Template is used here to handle all kinds of derived operations
222
  // (shared_ptr<YBPgsqlReadOp>, shared_ptr<YBPgsqlWriteOp>)
223
  // without implicit conversion to shared_ptr<YBPgsqlReadOp>.
224
  // Conversion to shared_ptr<YBPgsqlOp> will be done later and result will be re-used with move.
  // The single operation is forwarded to DoRunAsync() as a one-element array.
225
  template<class Op, class... Args>
226
  Result<PerformFuture> RunOneAsync(const Op& op, Args&& ...args) {
227
    return DoRunAsync(&op, 1, std::forward<Args>(args)...);
228
  }
229
230
  // Run (apply + flush) list of given operations to read and write database content.
  // DoRunAsync() rejects an empty list with an IllegalState error.
231
  template<class Op, class... Args>
232
  Result<PerformFuture> RunAsync(const std::vector<Op>& ops, Args&& ...args) {
233
    return DoRunAsync(ops.data(), ops.size(), std::forward<Args>(args)...);
234
  }
235
236
  // Same as above for a contiguous array of `ops_count` operations starting at `ops`.
  template<class Op, class... Args>
237
2.74M
  Result<PerformFuture> RunAsync(const Op* ops, size_t ops_count, Args&& ...args) {
238
2.74M
    return DoRunAsync(ops, ops_count, std::forward<Args>(args)...);
239
2.74M
  }
240
241
  // Smart driver functions.
242
  // -------------
243
  Result<client::TabletServersInfo> ListTabletServers();
244
245
  //------------------------------------------------------------------------------------------------
246
  // Access functions.
247
  // TODO(neil) Need to double check these code later.
248
  // - This code in CQL processor has a lock. CQL comment: It can be accessed by multiple calls in
249
  //   parallel so they need to be thread-safe for shared reads / exclusive writes.
250
  //
251
  // - Currently, for each session, server executes the client requests sequentially, so
252
  //   the following mutex is not necessary. I don't think we're capable of parallel-processing
253
  //   multiple statements within one session.
254
  //
255
  // TODO(neil) MUST ADD A LOCK FOR ACCESSING AND MODIFYING DATABASE BECAUSE WE USE THIS VARIABLE
256
  // AS INDICATOR FOR ALIVE OR DEAD SESSIONS.
257
258
  // Access functions for connected database.
  // The returned pointer/reference refers to connected_database_ and is invalidated when that
  // member is modified.
259
0
  const char* connected_dbname() const {
260
0
    return connected_database_.c_str();
261
0
  }
262
263
0
  const std::string& connected_database() const {
264
0
    return connected_database_;
265
0
  }
266
0
  void set_connected_database(const std::string& database) {
267
0
    connected_database_ = database;
268
0
  }
269
0
  void reset_connected_database() {
270
0
    connected_database_.clear();
271
0
  }
272
273
  // Generate a new random and unique rowid. It is a v4 UUID.
274
483k
  std::string GenerateNewRowid() {
275
483k
    return GenerateObjectId(true /* binary_id */);
276
483k
  }
277
278
  void InvalidateAllTablesCache();
279
280
347k
  // Empties both the foreign key reference cache and the set of pending reference intents.
  void InvalidateForeignKeyReferenceCache() {
281
347k
    fk_reference_cache_.clear();
282
347k
    fk_reference_intent_.clear();
283
347k
  }
284
285
  // Check if initdb has already been run before. Needed to make initdb idempotent.
286
  Result<bool> IsInitDbDone();
287
288
  // Return the local tserver's catalog version stored in shared memory or an error if the shared
289
  // memory has not been initialized (e.g. in initdb).
290
  Result<uint64_t> GetSharedCatalogVersion();
291
  // Return the local tserver's postgres authentication key stored in shared memory or an error if
292
  // the shared memory has not been initialized (e.g. in initdb).
293
  Result<uint64_t> GetSharedAuthKey();
294
295
  // Callback type handed to ForeignKeyReferenceExists(): takes a table OID and a list of ybctids
  // and returns a list of ybctid strings.
  // NOTE(review): presumably the returned list is the subset that exists in the table - confirm.
  using YbctidReader =
296
      std::function<Result<std::vector<std::string>>(PgOid, const std::vector<Slice>&)>;
297
  Result<bool> ForeignKeyReferenceExists(
298
      PgOid table_id, const Slice& ybctid, const YbctidReader& reader);
299
  void AddForeignKeyReferenceIntent(PgOid table_id, const Slice& ybctid);
300
  void AddForeignKeyReference(PgOid table_id, const Slice& ybctid);
301
302
  // Deletes the row referenced by ybctid from FK reference cache.
303
  void DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid);
304
305
  CHECKED_STATUS PatchStatus(const Status& status, const PgObjectIds& relations);
306
307
  Result<int> TabletServerCount(bool primary_only = false);
308
309
  // Sets the specified timeout in the rpc service.
310
  void SetTimeout(int timeout_ms);
311
312
  CHECKED_STATUS ValidatePlacement(const std::string& placement_info);
313
314
  void TrySetCatalogReadPoint(const ReadHybridTime& read_ht);
315
316
1.71k
  // Returns the PgClient bound at construction. The method is const yet hands out a mutable
  // reference because pg_client_ is a reference member.
  PgClient& pg_client() const {
317
1.71k
    return pg_client_;
318
1.71k
  }
319
320
  bool ShouldUseFollowerReads() const;
321
322
  CHECKED_STATUS SetActiveSubTransaction(SubTransactionId id);
323
  CHECKED_STATUS RollbackSubTransaction(SubTransactionId id);
324
325
 private:
326
  // Callback that flushes one batch of operations in the given transactional mode.
  using Flusher = std::function<Status(BufferableOperations, IsTransactionalSession)>;
327
328
  CHECKED_STATUS FlushBufferedOperationsImpl(const Flusher& flusher);
329
  CHECKED_STATUS FlushOperations(BufferableOperations ops, IsTransactionalSession transactional);
330
331
  // Run multiple operations.
  // Fails with IllegalState when `ops_count` is zero. Whether the batch is handled
  // transactionally is decided once, from the first operation, and applied to every operation.
  // Each operation is applied through RunHelper, then the helper is flushed and its future
  // returned. The first failing Apply() aborts the batch and its status is returned.
332
  template<class Op>
333
  Result<PerformFuture> DoRunAsync(const Op* op,
334
                                   size_t ops_count,
335
                                   const PgTableDesc& table,
336
                                   const PgObjectId& relation_id,
337
                                   uint64_t* read_time,
338
2.74M
                                   bool force_non_bufferable) {
339
2.74M
    SCHECK_GT(ops_count, 0ULL, IllegalState, "Operation list must not be empty");
340
2.74M
    const IsTransactionalSession transactional(VERIFY_RESULT(
341
2.74M
        ShouldHandleTransactionally(table, **op)));
342
2.74M
    RunHelper runner(relation_id, this, transactional);
343
6.77M
    for (auto end = op + ops_count; op != end; ++op) {
344
4.03M
      RETURN_NOT_OK(runner.Apply(table.schema(), *op, read_time, force_non_bufferable));
345
4.03M
    }
346
2.74M
    return runner.Flush();
347
2.74M
  }
348
349
  // Helper class to run multiple operations on single session.
350
  // This class allows to keep implementation of RunAsync template method simple
351
  // without moving its implementation details into header file.
  // It stores references to the relation id and to a BufferableOperations object, so an instance
  // must not outlive either of them. DoRunAsync() uses it as a local variable only.
352
  class RunHelper {
353
   public:
354
    RunHelper(
355
        const PgObjectId& relation_id, PgSession* pg_session, IsTransactionalSession transactional);
356
    CHECKED_STATUS Apply(
357
        const Schema& schema, const PgsqlOpPtr& op, uint64_t* read_time, bool force_non_bufferable);
358
    Result<PerformFuture> Flush();
359
360
   private:
361
    const PgObjectId& relation_id_;
362
    PgSession& pg_session_;
363
    const IsTransactionalSession transactional_;
364
    // NOTE(review): presumably refers to one of the session's buffers (buffered_ops_ or
    // buffered_txn_ops_), chosen by transactional_ - confirm against the constructor.
    BufferableOperations& buffer_;
365
    // Operations owned by this helper.
    // NOTE(review): presumably the ones that are not buffered and are sent by Flush() - confirm.
    BufferableOperations operations_;
366
  };
367
368
  // Flush buffered write operations from the given buffer.
  // NOTE(review): declared with plain Status and a bool flag, unlike the sibling flush methods
  // that use CHECKED_STATUS and IsTransactionalSession - confirm this declaration is still used.
369
  Status FlushBufferedWriteOperations(BufferableOperations* write_ops, bool transactional);
370
371
  // Whether we should use transactional or non-transactional session.
372
  // Error is raised in case operation requires transaction session but it has not been created.
373
  Result<bool> ShouldHandleTransactionally(const PgTableDesc& table, const PgsqlOp& op);
374
375
  void Perform(PgsqlOps* operations, const PerformCallback& callback);
376
377
  void UpdateInTxnLimit(uint64_t* read_time);
378
379
  // YBSession to execute operations.
380
  std::shared_ptr<client::YBSession> session_;
381
382
  // Client returned by pg_client(); held by reference, not owned.
  PgClient& pg_client_;
383
384
  // Connected database.
385
  std::string connected_database_;
386
387
  // A transaction manager allowing to begin/abort/commit transactions.
388
  scoped_refptr<PgTxnManager> pg_txn_manager_;
389
390
  const scoped_refptr<server::HybridClock> clock_;
391
392
  // Read time for catalog tables; empty when none is set.
  // NOTE(review): presumably set by TrySetCatalogReadPoint() and cleared by
  // ResetCatalogReadPoint() - confirm in the .cc file.
393
  boost::optional<ReadHybridTime> catalog_read_time_;
394
395
  // Execution status.
396
  Status status_;
397
  std::string errmsg_;
398
399
  CoarseTimePoint invalidate_table_cache_time_;
400
  // Table descriptors keyed by table id.
  std::unordered_map<PgObjectId, PgTableDescPtr, PgObjectIdHash> table_cache_;
401
  // Both sets are emptied together by InvalidateForeignKeyReferenceCache().
  boost::unordered_set<PgForeignKeyReference> fk_reference_cache_;
402
  boost::unordered_set<PgForeignKeyReference> fk_reference_intent_;
403
404
  // Should write operations be buffered?
405
  bool buffering_enabled_ = false;
406
  BufferableOperations buffered_ops_;
407
  BufferableOperations buffered_txn_ops_;
408
  // NOTE(review): presumably the row ids of the operations currently held in the two buffers
  // above - confirm in the .cc file.
  std::unordered_set<RowIdentifier, boost::hash<RowIdentifier>> buffered_keys_;
409
410
  HybridTime in_txn_limit_;
411
  bool use_catalog_session_ = false;
412
413
  // Not owned.
  // NOTE(review): the comments on GetSharedCatalogVersion() / GetSharedAuthKey() suggest this
  // may be unset when shared memory is not initialized - confirm.
  const tserver::TServerSharedObject* const tserver_shared_object_;
414
  // Stored by reference: the callbacks object passed to the constructor must outlive the session.
  const YBCPgCallbacks& pg_callbacks_;
415
};
416
417
}  // namespace pggate
418
}  // namespace yb
419
420
#endif // YB_YQL_PGGATE_PG_SESSION_H_