YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_session.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_YQL_PGGATE_PG_SESSION_H_
15
#define YB_YQL_PGGATE_PG_SESSION_H_
16
17
#include <unordered_set>
18
19
#include <boost/optional.hpp>
20
#include <boost/unordered_set.hpp>
21
22
#include "yb/client/client_fwd.h"
23
#include "yb/client/session.h"
24
#include "yb/client/transaction.h"
25
26
#include "yb/common/pg_types.h"
27
#include "yb/common/transaction.h"
28
29
#include "yb/gutil/ref_counted.h"
30
31
#include "yb/server/hybrid_clock.h"
32
33
#include "yb/tserver/tserver_util_fwd.h"
34
35
#include "yb/util/oid_generator.h"
36
#include "yb/util/result.h"
37
38
#include "yb/yql/pggate/pg_client.h"
39
#include "yb/yql/pggate/pg_gate_fwd.h"
40
#include "yb/yql/pggate/pg_env.h"
41
#include "yb/yql/pggate/pg_tabledesc.h"
42
#include "yb/yql/pggate/pg_txn_manager.h"
43
44
namespace yb {
45
namespace pggate {
46
47
YB_STRONGLY_TYPED_BOOL(OpBuffered);
48
YB_STRONGLY_TYPED_BOOL(InvalidateOnPgClient);
49
50
class PgTxnManager;
51
class PgSession;
52
53
struct BufferableOperations {
54
  PgsqlOps operations;
55
  PgObjectIds relations;
56
57
11.5M
  void Add(PgsqlOpPtr op, const PgObjectId& relation) {
58
11.5M
    operations.push_back(std::move(op));
59
11.5M
    relations.push_back(relation);
60
11.5M
  }
61
62
4.34M
  void Clear() {
63
4.34M
    operations.clear();
64
4.34M
    relations.clear();
65
4.34M
  }
66
67
93.6k
  void Swap(BufferableOperations* rhs) {
68
93.6k
    operations.swap(rhs->operations);
69
93.6k
    relations.swap(rhs->relations);
70
93.6k
  }
71
72
24.5M
  bool empty() const {
73
24.5M
    return operations.empty();
74
24.5M
  }
75
76
800k
  size_t size() const {
77
800k
    return operations.size();
78
800k
  }
79
};
80
81
struct PgForeignKeyReference {
82
  PgForeignKeyReference(PgOid table_id, std::string ybctid);
83
  PgOid table_id;
84
  std::string ybctid;
85
};
86
87
// Represents row id (ybctid) from the DocDB's point of view.
88
class RowIdentifier {
89
 public:
90
  RowIdentifier(const Schema& schema, const PgsqlWriteRequestPB& request);
91
  inline const std::string& ybctid() const;
92
  inline const std::string& table_id() const;
93
94
 private:
95
  const std::string* table_id_;
96
  const std::string* ybctid_;
97
  std::string        ybctid_holder_;
98
};
99
100
YB_STRONGLY_TYPED_BOOL(IsTransactionalSession);
101
YB_STRONGLY_TYPED_BOOL(IsReadOnlyOperation);
102
YB_STRONGLY_TYPED_BOOL(IsCatalogOperation);
103
104
class PerformFuture {
105
 public:
106
15.9M
  PerformFuture() = default;
107
  PerformFuture(std::future<PerformResult> future, PgSession* session, PgObjectIds* relations);
108
109
  bool Valid() const;
110
111
  CHECKED_STATUS Get();
112
 private:
113
  std::future<PerformResult> future_;
114
  PgSession* session_ = nullptr;
115
  PgObjectIds relations_;
116
};
117
118
// This class is not thread-safe as it is mostly used by a single-threaded PostgreSQL backend
119
// process.
120
class PgSession : public RefCountedThreadSafe<PgSession> {
121
 public:
122
  // Public types.
123
  typedef scoped_refptr<PgSession> ScopedRefPtr;
124
125
  // Constructors.
126
  PgSession(PgClient* pg_client,
127
            const string& database_name,
128
            scoped_refptr<PgTxnManager> pg_txn_manager,
129
            scoped_refptr<server::HybridClock> clock,
130
            const tserver::TServerSharedObject* tserver_shared_object,
131
            const YBCPgCallbacks& pg_callbacks);
132
  virtual ~PgSession();
133
134
  // Resets the read point for catalog tables.
135
  // Next catalog read operation will read the very latest catalog's state.
136
  void ResetCatalogReadPoint();
137
138
  //------------------------------------------------------------------------------------------------
139
  // Operations on Session.
140
  //------------------------------------------------------------------------------------------------
141
142
  CHECKED_STATUS ConnectDatabase(const std::string& database_name);
143
144
  CHECKED_STATUS IsDatabaseColocated(const PgOid database_oid, bool *colocated);
145
146
  //------------------------------------------------------------------------------------------------
147
  // Operations on Database Objects.
148
  //------------------------------------------------------------------------------------------------
149
150
  // API for database operations.
151
  CHECKED_STATUS DropDatabase(const std::string& database_name, PgOid database_oid);
152
153
  CHECKED_STATUS GetCatalogMasterVersion(uint64_t *version);
154
155
  // API for sequences data operations.
156
  CHECKED_STATUS CreateSequencesDataTable();
157
158
  CHECKED_STATUS InsertSequenceTuple(int64_t db_oid,
159
                                     int64_t seq_oid,
160
                                     uint64_t ysql_catalog_version,
161
                                     int64_t last_val,
162
                                     bool is_called);
163
164
  Result<bool> UpdateSequenceTuple(int64_t db_oid,
165
                                   int64_t seq_oid,
166
                                   uint64_t ysql_catalog_version,
167
                                   int64_t last_val,
168
                                   bool is_called,
169
                                   boost::optional<int64_t> expected_last_val,
170
                                   boost::optional<bool> expected_is_called);
171
172
  Result<std::pair<int64_t, bool>> ReadSequenceTuple(int64_t db_oid,
173
                                                     int64_t seq_oid,
174
                                                     uint64_t ysql_catalog_version);
175
176
  CHECKED_STATUS DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid);
177
178
  CHECKED_STATUS DeleteDBSequences(int64_t db_oid);
179
180
  //------------------------------------------------------------------------------------------------
181
  // Operations on Tablegroup.
182
  //------------------------------------------------------------------------------------------------
183
184
  CHECKED_STATUS DropTablegroup(const PgOid database_oid,
185
                                PgOid tablegroup_oid);
186
187
  // API for schema operations.
188
  // TODO(neil) Schema should be a sub-database that have some specialized property.
189
  CHECKED_STATUS CreateSchema(const std::string& schema_name, bool if_not_exist);
190
  CHECKED_STATUS DropSchema(const std::string& schema_name, bool if_exist);
191
192
  // API for table operations.
193
  CHECKED_STATUS DropTable(const PgObjectId& table_id);
194
  CHECKED_STATUS DropIndex(
195
      const PgObjectId& index_id,
196
      client::YBTableName* indexed_table_name = nullptr);
197
  Result<PgTableDescPtr> LoadTable(const PgObjectId& table_id);
198
  void InvalidateTableCache(
199
      const PgObjectId& table_id, InvalidateOnPgClient invalidate_on_pg_client);
200
201
  // Start operation buffering. Buffering must not be in progress.
202
  CHECKED_STATUS StartOperationsBuffering();
203
  // Flush all pending buffered operation and stop further buffering.
204
  // Buffering must be in progress.
205
  CHECKED_STATUS StopOperationsBuffering();
206
  // Drop all pending buffered operations and stop further buffering. Buffering may be in any state.
207
  void ResetOperationsBuffering();
208
209
  // Flush all pending buffered operations. Buffering mode remain unchanged.
210
  CHECKED_STATUS FlushBufferedOperations();
211
  // Drop all pending buffered operations. Buffering mode remain unchanged.
212
  void DropBufferedOperations();
213
214
  PgIsolationLevel GetIsolationLevel();
215
216
  // Run (apply + flush) list of given operations to read and write database content.
217
  struct TableOperation {
218
    const PgsqlOpPtr* operation = nullptr;
219
    const PgTableDesc* table = nullptr;
220
  };
221
222
  using OperationGenerator = std::function<TableOperation()>;
223
224
  template<class... Args>
225
  Result<PerformFuture> RunAsync(
226
8.92M
    const PgsqlOpPtr* ops, size_t ops_count, const PgTableDesc& table, Args&&... args) {
227
20.4M
    const auto generator = [ops, end = ops + ops_count, &table]() mutable {
228
20.4M
        return ops != end
229
20.4M
            ? 
TableOperation { .operation = ops++, .table = &table }11.5M
230
20.4M
            : 
TableOperation()8.91M
;
231
20.4M
    };
232
8.92M
    return RunAsync(generator, std::forward<Args>(args)...);
233
8.92M
  }
234
235
  Result<PerformFuture> RunAsync(
236
    const OperationGenerator& generator, uint64_t* read_time, bool force_non_bufferable);
237
238
  // Smart driver functions.
239
  // -------------
240
  Result<client::TabletServersInfo> ListTabletServers();
241
242
  //------------------------------------------------------------------------------------------------
243
  // Access functions.
244
  // TODO(neil) Need to double check these code later.
245
  // - This code in CQL processor has a lock. CQL comment: It can be accessed by multiple calls in
246
  //   parallel so they need to be thread-safe for shared reads / exclusive writes.
247
  //
248
  // - Currently, for each session, server executes the client requests sequentially, so the
249
  //   the following mutex is not necessary. I don't think we're capable of parallel-processing
250
  //   multiple statements within one session.
251
  //
252
  // TODO(neil) MUST ADD A LOCK FOR ACCESSING AND MODIFYING DATABASE BECAUSE WE USE THIS VARIABLE
253
  // AS INDICATOR FOR ALIVE OR DEAD SESSIONS.
254
255
  // Access functions for connected database.
256
0
  const char* connected_dbname() const {
257
0
    return connected_database_.c_str();
258
0
  }
259
260
0
  const string& connected_database() const {
261
0
    return connected_database_;
262
0
  }
263
0
  void set_connected_database(const std::string& database) {
264
0
    connected_database_ = database;
265
0
  }
266
0
  void reset_connected_database() {
267
0
    connected_database_ = "";
268
0
  }
269
270
  // Generate a new random and unique rowid. It is a v4 UUID.
271
1.22M
  string GenerateNewRowid() {
272
1.22M
    return GenerateObjectId(true /* binary_id */);
273
1.22M
  }
274
275
  void InvalidateAllTablesCache();
276
277
916k
  void InvalidateForeignKeyReferenceCache() {
278
916k
    fk_reference_cache_.clear();
279
916k
    fk_reference_intent_.clear();
280
916k
  }
281
282
  // Check if initdb has already been run before. Needed to make initdb idempotent.
283
  Result<bool> IsInitDbDone();
284
285
  // Return the local tserver's catalog version stored in shared memory or an error if the shared
286
  // memory has not been initialized (e.g. in initdb).
287
  Result<uint64_t> GetSharedCatalogVersion();
288
  // Return the local tserver's postgres authentication key stored in shared memory or an error if
289
  // the shared memory has not been initialized (e.g. in initdb).
290
  Result<uint64_t> GetSharedAuthKey();
291
292
  using YbctidReader =
293
      std::function<Result<std::vector<std::string>>(PgOid, const std::vector<Slice>&)>;
294
  Result<bool> ForeignKeyReferenceExists(
295
      PgOid table_id, const Slice& ybctid, const YbctidReader& reader);
296
  void AddForeignKeyReferenceIntent(PgOid table_id, const Slice& ybctid);
297
  void AddForeignKeyReference(PgOid table_id, const Slice& ybctid);
298
299
  // Deletes the row referenced by ybctid from FK reference cache.
300
  void DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid);
301
302
  CHECKED_STATUS PatchStatus(const Status& status, const PgObjectIds& relations);
303
304
  Result<int> TabletServerCount(bool primary_only = false);
305
306
  // Sets the specified timeout in the rpc service.
307
  void SetTimeout(int timeout_ms);
308
309
  CHECKED_STATUS ValidatePlacement(const string& placement_info);
310
311
  void TrySetCatalogReadPoint(const ReadHybridTime& read_ht);
312
313
6.97k
  PgClient& pg_client() const {
314
6.97k
    return pg_client_;
315
6.97k
  }
316
317
  bool ShouldUseFollowerReads() const;
318
319
  CHECKED_STATUS SetActiveSubTransaction(SubTransactionId id);
320
  CHECKED_STATUS RollbackSubTransaction(SubTransactionId id);
321
322
 private:
323
  using Flusher = std::function<Status(BufferableOperations, IsTransactionalSession)>;
324
325
  CHECKED_STATUS FlushBufferedOperationsImpl(const Flusher& flusher);
326
  CHECKED_STATUS FlushOperations(BufferableOperations ops, IsTransactionalSession transactional);
327
328
  class RunHelper;
329
330
  // Flush buffered write operations from the given buffer.
331
  Status FlushBufferedWriteOperations(BufferableOperations* write_ops, bool transactional);
332
333
  void Perform(PgsqlOps* operations, bool use_catalog_session, const PerformCallback& callback);
334
335
  void UpdateInTxnLimit(uint64_t* read_time);
336
337
  PgClient& pg_client_;
338
339
  // Connected database.
340
  std::string connected_database_;
341
342
  // A transaction manager allowing to begin/abort/commit transactions.
343
  scoped_refptr<PgTxnManager> pg_txn_manager_;
344
345
  const scoped_refptr<server::HybridClock> clock_;
346
347
  // YBSession to read data from catalog tables.
348
  boost::optional<ReadHybridTime> catalog_read_time_;
349
350
  // Execution status.
351
  Status status_;
352
  string errmsg_;
353
354
  CoarseTimePoint invalidate_table_cache_time_;
355
  std::unordered_map<PgObjectId, PgTableDescPtr, PgObjectIdHash> table_cache_;
356
  boost::unordered_set<PgForeignKeyReference> fk_reference_cache_;
357
  boost::unordered_set<PgForeignKeyReference> fk_reference_intent_;
358
359
  // Should write operations be buffered?
360
  bool buffering_enabled_ = false;
361
  BufferableOperations buffered_ops_;
362
  BufferableOperations buffered_txn_ops_;
363
  std::unordered_set<RowIdentifier, boost::hash<RowIdentifier>> buffered_keys_;
364
365
  HybridTime in_txn_limit_;
366
367
  const tserver::TServerSharedObject* const tserver_shared_object_;
368
  const YBCPgCallbacks& pg_callbacks_;
369
};
370
371
}  // namespace pggate
372
}  // namespace yb
373
374
#endif // YB_YQL_PGGATE_PG_SESSION_H_