YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_txn_manager.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/pggate/pg_txn_manager.h"
15
16
#include "yb/client/client.h"
17
#include "yb/client/session.h"
18
#include "yb/client/transaction.h"
19
20
#include "yb/common/common.pb.h"
21
#include "yb/common/transaction_priority.h"
22
#include "yb/common/ybc_util.h"
23
24
#include "yb/rpc/rpc_controller.h"
25
26
#include "yb/tserver/pg_client.pb.h"
27
#include "yb/tserver/tserver_service.proxy.h"
28
#include "yb/tserver/tserver_shared_mem.h"
29
30
#include "yb/util/debug-util.h"
31
#include "yb/util/format.h"
32
#include "yb/util/logging.h"
33
#include "yb/util/random_util.h"
34
#include "yb/util/shared_mem.h"
35
#include "yb/util/status.h"
36
#include "yb/util/status_format.h"
37
38
#include "yb/yql/pggate/pg_client.h"
39
#include "yb/yql/pggate/pggate_flags.h"
40
#include "yb/yql/pggate/ybc_pggate.h"
41
42
DEFINE_bool(use_node_hostname_for_local_tserver, false,
    "Connect to local t-server by using host name instead of local IP");

// A macro for logging the function name and the state of the current transaction.
// This macro is deliberately NOT enclosed in do { ... } while (false) because we want to be able
// to write additional information into the same log message, e.g.
//   VLOG_TXN_STATE(2) << "extra details";
// It refers to TxnStateDebugStr() and pg_callbacks_, so it can only be used where those are in
// scope (i.e. inside PgTxnManager member functions).
#define VLOG_TXN_STATE(vlog_level) \
    VLOG(vlog_level) << __func__ << ": " << TxnStateDebugStr() \
                     << "; query: { " << ::yb::pggate::GetDebugQueryString(pg_callbacks_) << " }; "

DECLARE_uint64(max_clock_skew_usec);

DECLARE_bool(ysql_forward_rpcs_to_local_tserver);
55
56
namespace {
57
58
// Local copies that can be modified.
59
uint64_t txn_priority_highpri_upper_bound = yb::kHighPriTxnUpperBound;
60
uint64_t txn_priority_highpri_lower_bound = yb::kHighPriTxnLowerBound;
61
62
uint64_t txn_priority_regular_upper_bound = yb::kRegularTxnUpperBound;
63
uint64_t txn_priority_regular_lower_bound = yb::kRegularTxnLowerBound;
64
65
// Converts a value in the range [0, 1] to a uint64_t value in the range [minValue, maxValue].
// Values <= 0 (and NaN) map to minValue; values >= 1 map to maxValue.
//
// The offset from minValue is computed in floating point and then added in integer arithmetic.
// Adding minValue in floating point would lose precision for bounds close to 2^64 (long double
// may be no wider than double on some platforms). The rounded sum could even exceed maxValue,
// which makes the conversion back to uint64_t undefined behavior.
uint64_t ConvertBound(long double value, uint64_t minValue, uint64_t maxValue) {
  // Written as !(value > 0) so that NaN, for which every comparison is false, also lands here
  // instead of reaching an undefined float-to-integer conversion below.
  if (!(value > 0.0)) {
    return minValue;
  }

  if (value >= 1.0) {
    return maxValue;
  }

  const uint64_t range = maxValue - minValue;
  const long double scaled = value * static_cast<long double>(range);
  // `range` may round up when converted to long double, so the product can reach a value that is
  // not representable as an offset within [0, range]; clamp before converting back.
  if (scaled >= static_cast<long double>(range)) {
    return maxValue;
  }
  return minValue + static_cast<uint64_t>(scaled);
}
77
78
7.24k
uint64_t ConvertRegularPriorityTxnBound(double value) {
79
7.24k
  return ConvertBound(value, yb::kRegularTxnLowerBound, yb::kRegularTxnUpperBound);
80
7.24k
}
81
82
7.24k
uint64_t ConvertHighPriorityTxnBound(double value) {
83
7.24k
  return ConvertBound(value, yb::kHighPriTxnLowerBound, yb::kHighPriTxnUpperBound);
84
7.24k
}
85
86
} // namespace
87
88
extern "C" {
89
90
3.62k
void YBCAssignTransactionPriorityLowerBound(double newval, void* extra) {
91
3.62k
  txn_priority_regular_lower_bound = ConvertRegularPriorityTxnBound(newval);
92
3.62k
  txn_priority_highpri_lower_bound = ConvertHighPriorityTxnBound(newval);
93
  // YSQL layer checks (guc.c) should ensure this.
94
3.62k
  DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_regular_upper_bound);
95
3.62k
  DCHECK_LE(txn_priority_highpri_lower_bound, txn_priority_highpri_upper_bound);
96
3.62k
  DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_highpri_lower_bound);
97
3.62k
}
98
99
3.62k
void YBCAssignTransactionPriorityUpperBound(double newval, void* extra) {
100
3.62k
  txn_priority_regular_upper_bound = ConvertRegularPriorityTxnBound(newval);
101
3.62k
  txn_priority_highpri_upper_bound = ConvertHighPriorityTxnBound(newval);
102
  // YSQL layer checks (guc.c) should ensure this.
103
3.62k
  DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_regular_upper_bound);
104
3.62k
  DCHECK_LE(txn_priority_highpri_lower_bound, txn_priority_highpri_upper_bound);
105
3.62k
  DCHECK_LE(txn_priority_regular_upper_bound, txn_priority_highpri_lower_bound);
106
3.62k
}
107
108
// Optional pointer to the current statement timeout (milliseconds). When non-null and positive,
// BuildSession() uses it to shorten the session timeout.
int* YBCStatementTimeoutPtr{nullptr};
109
110
}
111
112
using namespace std::literals;
113
using namespace std::placeholders;
114
115
namespace yb {
116
namespace pggate {
117
118
using client::YBTransaction;
119
using client::AsyncClientInitialiser;
120
using client::TransactionManager;
121
using client::YBTransactionPtr;
122
using client::YBSession;
123
using client::YBSessionPtr;
124
using client::LocalTabletFilter;
125
126
// Default for FLAGS_pg_yb_session_timeout_ms. Debug builds on macOS are noticeably slower in
// tests, so they get twice the usual 60 seconds.
#if !defined(__APPLE__) || defined(NDEBUG)
const int kDefaultPgYbSessionTimeoutMs = 60 * 1000;
#else
const int kDefaultPgYbSessionTimeoutMs = 120 * 1000;
#endif
132
133
// Session timeout in milliseconds; BuildSession() may lower it to the statement timeout.
DEFINE_int32(
    pg_yb_session_timeout_ms,
    kDefaultPgYbSessionTimeoutMs,
    "Timeout for operations between PostgreSQL server and YugaByte DocDB services");
135
136
std::shared_ptr<yb::client::YBSession> BuildSession(
137
    yb::client::YBClient* client,
138
1.65k
    const scoped_refptr<ClockBase>& clock) {
139
1.65k
  int statement_timeout = YBCStatementTimeoutPtr ? *YBCStatementTimeoutPtr : 0;
140
1.65k
  int session_timeout = FLAGS_pg_yb_session_timeout_ms;
141
1.65k
  if (statement_timeout > 0 && statement_timeout < session_timeout) {
142
1
    session_timeout = statement_timeout;
143
1
  }
144
1.65k
  auto session = std::make_shared<YBSession>(client, clock);
145
1.65k
  session->SetForceConsistentRead(client::ForceConsistentRead::kTrue);
146
1.65k
  session->SetTimeout(MonoDelta::FromMilliseconds(session_timeout));
147
1.65k
  return session;
148
1.65k
}
149
150
PgTxnManager::PgTxnManager(
151
    PgClient* client,
152
    scoped_refptr<ClockBase> clock,
153
    const tserver::TServerSharedObject* tserver_shared_object,
154
    PgCallbacks pg_callbacks)
155
    : client_(client),
156
      clock_(std::move(clock)),
157
      tserver_shared_object_(tserver_shared_object),
158
1.65k
      pg_callbacks_(pg_callbacks) {
159
1.65k
}
160
161
1.65k
PgTxnManager::~PgTxnManager() {
  // Abort any open transaction before the manager goes away. A destructor cannot report the
  // failure, so it is only logged as a warning.
  Status abort_status = AbortTransaction();
  WARN_NOT_OK(abort_status, "Failed to abort transaction in dtor");
}
165
166
160k
// Starts a new transaction with a fresh (not saved) priority. It is a no-op when the initdb-mode
// environment variable is set, and an IllegalState error if a transaction is already running.
Status PgTxnManager::BeginTransaction() {
  VLOG_TXN_STATE(2);
  if (YBCIsInitDbModeEnvVarSet()) {
    return Status::OK();
  }
  if (!IsTxnInProgress()) {
    return RecreateTransaction(SavePriority::kFalse /* save_priority */);
  }
  return STATUS(IllegalState, "Transaction is already in progress");
}
176
177
20.2k
// Restarts the transaction state from scratch while asking to reuse the saved priority.
Status PgTxnManager::RecreateTransaction() {
  VLOG_TXN_STATE(2);
  const SavePriority keep_priority = SavePriority::kTrue;
  return RecreateTransaction(keep_priority);
}
181
182
180k
// Clears all per-transaction state, records whether the saved priority should be reused, and
// marks a new transaction as in progress.
Status PgTxnManager::RecreateTransaction(const SavePriority save_priority) {
  ResetTxnAndSession();
  use_saved_priority_ = save_priority;
  txn_in_progress_ = true;
  return Status::OK();
}
188
189
161k
// Records the Postgres-level isolation. `level` is cast to PgIsolationLevel without validation.
Status PgTxnManager::SetPgIsolationLevel(int level) {
  const auto requested_level = static_cast<PgIsolationLevel>(level);
  pg_isolation_level_ = requested_level;
  return Status::OK();
}
193
194
2.00M
// Returns the Postgres-level isolation last recorded by SetPgIsolationLevel().
PgIsolationLevel PgTxnManager::GetPgIsolationLevel() {
  const PgIsolationLevel current_level = pg_isolation_level_;
  return current_level;
}
197
198
160k
// Records the read-only flag and then re-evaluates whether a follower-read read time is needed.
Status PgTxnManager::SetReadOnly(bool read_only) {
  read_only_ = read_only;
  VLOG(2) << __func__ << " set to " << read_only_
          << " from " << GetStackTrace();
  Status update_status = UpdateReadTimeForFollowerReadsIfRequired();
  return update_status;
}
203
204
160k
// Records the follower-read setting and staleness (ms), then re-evaluates whether a
// follower-read read time is needed.
Status PgTxnManager::EnableFollowerReads(bool enable_follower_reads, int32_t session_staleness) {
  const char* const action =
      enable_follower_reads ? "Enabling follower reads " : "Disabling follower reads ";
  VLOG_TXN_STATE(2) << action << " with staleness " << session_staleness << " ms";
  follower_read_staleness_ms_ = session_staleness;
  enable_follower_reads_ = enable_follower_reads;
  return UpdateReadTimeForFollowerReadsIfRequired();
}
212
213
320k
// When follower reads are enabled for a read-only transaction and no follower-read read time has
// been chosen yet, sets read_time_for_follower_reads_ to the current clock value moved back by
// follower_read_staleness_ms_. Otherwise only logs why nothing was updated.
//
// The staleness must be positive and exceed kMargin x max_clock_skew_usec; otherwise the
// RSTATUS_DCHECK fails with InvalidArgument.
Status PgTxnManager::UpdateReadTimeForFollowerReadsIfRequired() {
  if (enable_follower_reads_ && read_only_ && !read_time_for_follower_reads_) {
    constexpr uint64_t kMargin = 2;
    // Use 64-bit signed arithmetic for the microsecond value. A 32-bit millisecond value times
    // 1000 can overflow int. Also, comparing a negative int directly with the unsigned skew bound
    // would convert it to a huge unsigned value and wrongly pass the check, producing a read time
    // in the future.
    const int64_t staleness_us = static_cast<int64_t>(follower_read_staleness_ms_) * 1000;
    const uint64_t min_staleness_us = kMargin * GetAtomicFlag(&FLAGS_max_clock_skew_usec);
    RSTATUS_DCHECK(
        staleness_us > 0 && static_cast<uint64_t>(staleness_us) > min_staleness_us,
        InvalidArgument,
        Format("Setting follower read staleness less than the $0 x max_clock_skew.", kMargin));
    // Subtract the staleness from the current time to lower the read point.
    read_time_for_follower_reads_ = clock_->Now().AddMilliseconds(-follower_read_staleness_ms_);
    VLOG_TXN_STATE(2) << "Updating read-time with staleness "
                      << follower_read_staleness_ms_ << " to "
                      << read_time_for_follower_reads_;
  } else {
    VLOG(2) << " Not updating read-time " << yb::ToString(pg_isolation_level_)
            << " read time: " << read_time_for_follower_reads_
            << (enable_follower_reads_ ? " Follower reads allowed." : " Follower reads DISallowed.")
            << (read_only_ ? " Is read-only" : " Is NOT read-only");
  }
  return Status::OK();
}
233
234
160k
// Records the DEFERRABLE flag; CalculateIsolation() combines it with read_only_.
Status PgTxnManager::SetDeferrable(bool deferrable) {
  deferrable_ = deferrable;
  const Status ok_status = Status::OK();
  return ok_status;
}
238
239
75.8k
// Picks a transaction priority:
//  - kHighestPriority: the top of the high-priority range;
//  - kHigherPriorityRange: uniformly random within the high-priority range;
//  - anything else: uniformly random within the regular range.
uint64_t PgTxnManager::NewPriority(TxnPriorityRequirement txn_priority_requirement) {
  if (txn_priority_requirement == kHighestPriority) {
    return txn_priority_highpri_upper_bound;
  }

  const bool use_highpri_range = txn_priority_requirement == kHigherPriorityRange;
  const uint64_t range_lower = use_highpri_range ? txn_priority_highpri_lower_bound
                                                 : txn_priority_regular_lower_bound;
  const uint64_t range_upper = use_highpri_range ? txn_priority_highpri_upper_bound
                                                 : txn_priority_regular_upper_bound;
  return RandomUniformInt(range_lower, range_upper);
}
252
253
// Determines the DocDB isolation level for the next operation of the current transaction.
//
// - In DDL mode nothing is done.
// - If no transaction is in progress, a new one is created (with a fresh priority) and the
//   function returns.
// - While isolation_level_ is still NON_TRANSACTIONAL:
//   - Read-only operations under snapshot or read-committed isolation leave it that way (and may
//     request a deferred read point).
//   - Any other operation fixes isolation_level_ and, unless the saved priority is reused, picks a
//     new priority_ via NewPriority(txn_priority_requirement).
// - Once isolation_level_ has been fixed, a different effective isolation is reported as
//   IllegalState.
Status PgTxnManager::CalculateIsolation(
    bool read_only_op, TxnPriorityRequirement txn_priority_requirement) {
  if (ddl_mode_) {
    VLOG_TXN_STATE(2);
    return Status::OK();
  }

  VLOG_TXN_STATE(2);
  if (!txn_in_progress_) {
    return RecreateTransaction(SavePriority::kFalse /* save_priority */);
  }

  // Using pg_isolation_level_, read_only_, and deferrable_, determine the effective isolation level
  // to use at the DocDB layer, and the "deferrable" flag.
  //
  // Effective isolation means that sometimes SERIALIZABLE reads are internally executed as snapshot
  // isolation reads. This way we don't have to write read intents, and we get higher performance.
  // The resulting execution is still serializable: the order of transactions is the order of
  // timestamps, i.e. read timestamps (for read-only transactions executed at snapshot isolation)
  // and commit timestamps of serializable transactions.
  //
  // The "deferrable" flag means that in SERIALIZABLE DEFERRABLE READ ONLY mode we choose the read
  // timestamp as global_limit to avoid the possibility of read restarts. This results in waiting
  // out the maximum clock skew and is appropriate for non-latency-sensitive operations.

  const IsolationLevel docdb_isolation =
      (pg_isolation_level_ == PgIsolationLevel::SERIALIZABLE) && !read_only_
          ? IsolationLevel::SERIALIZABLE_ISOLATION
          : (pg_isolation_level_ == PgIsolationLevel::READ_COMMITTED
              ? IsolationLevel::READ_COMMITTED
              : IsolationLevel::SNAPSHOT_ISOLATION);
  const bool defer = read_only_ && deferrable_;

  VLOG_TXN_STATE(2) << "DocDB isolation level: " << IsolationLevel_Name(docdb_isolation);

  if (isolation_level_ != IsolationLevel::NON_TRANSACTIONAL) {
    // Sanity check: query layer should ensure that this does not happen.
    if (isolation_level_ != docdb_isolation) {
      return STATUS_FORMAT(
          IllegalState,
          "Attempt to change effective isolation from $0 to $1 in the middle of a transaction. "
          "Postgres-level isolation: $2; read_only: $3.",
          isolation_level_, IsolationLevel_Name(docdb_isolation), pg_isolation_level_,
          read_only_);
    }
  } else if (read_only_op &&
             (docdb_isolation == IsolationLevel::SNAPSHOT_ISOLATION ||
              docdb_isolation == IsolationLevel::READ_COMMITTED)) {
    // Stay non-transactional for this read; only remember to defer the read point if requested.
    if (defer) {
      need_defer_read_point_ = true;
    }
  } else {
    if (!use_saved_priority_) {
      priority_ = NewPriority(txn_priority_requirement);
    }
    isolation_level_ = docdb_isolation;

    VLOG_TXN_STATE(2) << "effective isolation level: "
                      << IsolationLevel_Name(docdb_isolation)
                      << "; transaction started successfully.";
  }

  return Status::OK();
}
317
318
0
// Requests a transaction restart. The flag is sent with, and cleared by, the next
// SetupPerformOptions() call.
Status PgTxnManager::RestartTransaction() {
  need_restart_ = true;
  const Status ok_status = Status::OK();
  return ok_status;
}
322
323
/* This is called at the start of each statement in READ COMMITTED isolation level */
324
24.3k
Status PgTxnManager::ResetTransactionReadPoint() {
325
24.3k
  read_time_manipulation_ = tserver::ReadTimeManipulation::RESET;
326
24.3k
  return Status::OK();
327
24.3k
}
328
329
/* This is called when a read committed transaction wants to restart its read point */
330
1
Status PgTxnManager::RestartReadPoint() {
331
1
  read_time_manipulation_ = tserver::ReadTimeManipulation::RESTART;
332
1
  return Status::OK();
333
1
}
334
335
155k
// Commits the current transaction (see FinishTransaction).
Status PgTxnManager::CommitTransaction() {
  const Commit do_commit = Commit::kTrue;
  return FinishTransaction(do_commit);
}
338
339
168k
// Commits or aborts the current transaction and resets per-transaction state.
// - When aborting in DDL mode, DDL mode is exited first.
// - Without a transaction in progress this is a no-op.
// - A transaction whose isolation level never left NON_TRANSACTIONAL is only reset locally.
// - Otherwise the client finishes it and the client's status is returned.
Status PgTxnManager::FinishTransaction(Commit commit) {
  // A DDL operation that fails during a DDL txn aborts that txn before we get here. Failures after
  // that point (e.g. during COMMIT or the catalog version increment) can leave DDL mode active,
  // so clean it up on abort.
  if (ddl_mode_ && !commit) {
    RETURN_NOT_OK(ExitSeparateDdlTxnMode(commit));
  }

  if (!txn_in_progress_) {
    VLOG_TXN_STATE(2) << "No transaction in progress, nothing to commit.";
    return Status::OK();
  }

  const bool nothing_to_finish = isolation_level_ == IsolationLevel::NON_TRANSACTIONAL;
  if (nothing_to_finish) {
    VLOG_TXN_STATE(2) << "This was a read-only transaction, nothing to commit.";
    ResetTxnAndSession();
    return Status::OK();
  }

  const char* const verb = commit ? "Committing" : "Aborting";
  const char* const noun = commit ? "commit" : "abort";
  VLOG_TXN_STATE(2) << verb << " transaction.";
  Status finish_status = client_->FinishTransaction(commit, DdlMode::kFalse);
  VLOG_TXN_STATE(2) << "Transaction " << noun << " status: " << finish_status;
  ResetTxnAndSession();
  return finish_status;
}
364
365
13.2k
// Aborts the current transaction (see FinishTransaction).
Status PgTxnManager::AbortTransaction() {
  const Commit do_commit = Commit::kFalse;
  return FinishTransaction(do_commit);
}
368
369
340k
void PgTxnManager::ResetTxnAndSession() {
370
340k
  txn_in_progress_ = false;
371
340k
  isolation_level_ = IsolationLevel::NON_TRANSACTIONAL;
372
340k
  ++txn_serial_no_;
373
374
340k
  enable_follower_reads_ = false;
375
340k
  read_only_ = false;
376
340k
  read_time_for_follower_reads_ = HybridTime();
377
340k
  read_time_manipulation_ = tserver::ReadTimeManipulation::NONE;
378
340k
}
379
380
6.33k
// Switches into separate-DDL-transaction mode. Entering it twice fails the RSTATUS_DCHECK with
// IllegalState. The state is logged both before and after the switch.
Status PgTxnManager::EnterSeparateDdlTxnMode() {
  RSTATUS_DCHECK(
      !ddl_mode_,
      IllegalState,
      "EnterSeparateDdlTxnMode called when already in a DDL transaction");

  VLOG_TXN_STATE(2);
  ddl_mode_ = true;
  VLOG_TXN_STATE(2);

  return Status::OK();
}
388
389
6.36k
// Leaves separate-DDL-transaction mode, committing or aborting the DDL transaction via the client.
// ddl_mode_ is cleared only if the client call succeeds. Outside DDL mode an abort is a no-op,
// while a commit fails the RSTATUS_DCHECK with IllegalState.
Status PgTxnManager::ExitSeparateDdlTxnMode(Commit commit) {
  VLOG_TXN_STATE(2);
  if (ddl_mode_) {
    RETURN_NOT_OK(client_->FinishTransaction(commit, DdlMode::kTrue));
    ddl_mode_ = false;
    return Status::OK();
  }
  RSTATUS_DCHECK(!commit, IllegalState, "Commit ddl txn called when not in a DDL transaction");
  return Status::OK();
}
399
400
0
std::string PgTxnManager::TxnStateDebugStr() const {
401
0
  return YB_CLASS_TO_STRING(
402
0
      ddl_mode,
403
0
      read_only,
404
0
      deferrable,
405
0
      txn_in_progress,
406
0
      pg_isolation_level,
407
0
      isolation_level);
408
0
}
409
410
581k
// Fills `options` for the next perform call from the current transaction state. One-shot
// requests (restart, deferred read point, read-time manipulation) are consumed here.
void PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) {
  // Outside of both DDL mode and a transaction, every call advances the serial number.
  if (!(ddl_mode_ || txn_in_progress_)) {
    ++txn_serial_no_;
  }

  options->set_isolation(isolation_level_);
  options->set_ddl_mode(ddl_mode_);
  options->set_txn_serial_no(txn_serial_no_);

  if (!use_saved_priority_) {
    options->set_priority(priority_);
  } else {
    options->set_use_existing_priority(true);
  }

  // One-shot requests: forwarded with this call only, then cleared.
  if (need_restart_) {
    need_restart_ = false;
    options->set_restart_transaction(true);
  }
  if (need_defer_read_point_) {
    need_defer_read_point_ = false;
    options->set_defer_read_point(true);
  }
  options->set_read_time_manipulation(read_time_manipulation_);
  read_time_manipulation_ = tserver::ReadTimeManipulation::NONE;

  // A chosen follower-read time pins the read time for this call.
  if (read_time_for_follower_reads_) {
    ReadHybridTime::SingleTime(read_time_for_follower_reads_).ToPB(options->mutable_read_time());
  }
}
436
437
}  // namespace pggate
438
}  // namespace yb