YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_txn_manager.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/pggate/pg_txn_manager.h"
15
16
#include "yb/client/client.h"
17
#include "yb/client/session.h"
18
#include "yb/client/transaction.h"
19
20
#include "yb/common/common.pb.h"
21
#include "yb/common/transaction_priority.h"
22
#include "yb/common/ybc_util.h"
23
24
#include "yb/rpc/rpc_controller.h"
25
26
#include "yb/tserver/pg_client.pb.h"
27
#include "yb/tserver/tserver_service.proxy.h"
28
#include "yb/tserver/tserver_shared_mem.h"
29
30
#include "yb/util/debug-util.h"
31
#include "yb/util/format.h"
32
#include "yb/util/logging.h"
33
#include "yb/util/random_util.h"
34
#include "yb/util/shared_mem.h"
35
#include "yb/util/status.h"
36
#include "yb/util/status_format.h"
37
38
#include "yb/yql/pggate/pg_client.h"
39
#include "yb/yql/pggate/pggate_flags.h"
40
#include "yb/yql/pggate/ybc_pggate.h"
41
42
DEFINE_bool(use_node_hostname_for_local_tserver, false,
43
    "Connect to local t-server by using host name instead of local IP");
44
45
// Logs the name of the calling function together with the current transaction state and the
// query being executed. The expansion is deliberately left as an open stream expression (it is
// not wrapped in do { ... } while (false)) so that call sites can append further details to the
// same log message with operator<<.
#define VLOG_TXN_STATE(verbosity) \
    VLOG(verbosity) << __func__ << ": " << TxnStateDebugStr() \
                    << "; query: { " << ::yb::pggate::GetDebugQueryString(pg_callbacks_) << " }; "
51
52
DECLARE_uint64(max_clock_skew_usec);
53
54
DECLARE_bool(ysql_forward_rpcs_to_local_tserver);
55
56
namespace {
57
58
// Local copies that can be modified.
59
uint64_t txn_priority_highpri_upper_bound = yb::kHighPriTxnUpperBound;
60
uint64_t txn_priority_highpri_lower_bound = yb::kHighPriTxnLowerBound;
61
62
uint64_t txn_priority_regular_upper_bound = yb::kRegularTxnUpperBound;
63
uint64_t txn_priority_regular_lower_bound = yb::kRegularTxnLowerBound;
64
65
// Converts double value in range 0..1 to uint64_t value in range [minValue, maxValue]
66
32.2k
// Maps `value` linearly onto [minValue, maxValue]:
//   - anything at or below 0, and NaN, yields minValue;
//   - anything at or above 1 yields maxValue;
//   - values strictly between 0 and 1 are scaled and truncated towards zero.
// Assumes minValue <= maxValue (the unsigned subtraction below would wrap otherwise).
uint64_t ConvertBound(long double value, uint64_t minValue, uint64_t maxValue) {
  // Written as a negated comparison so that NaN, which fails every ordered comparison, is
  // mapped to the lower bound instead of reaching the float-to-integer conversion below
  // (converting NaN to an integer type is undefined behaviour).
  if (!(value > 0.0L)) {
    return minValue;
  }

  if (value >= 1.0L) {
    return maxValue;
  }

  const long double span = static_cast<long double>(maxValue - minValue);
  const long double offset = value * span;
  // Where long double has fewer than 64 mantissa bits, `span` and the product can round up to a
  // value that does not fit into uint64_t. Clamp before converting so the cast is always in
  // range.
  if (offset >= span) {
    return maxValue;
  }

  // 0 <= offset < span here, so the cast is well defined and the sum cannot exceed maxValue.
  return minValue + static_cast<uint64_t>(offset);
}
77
78
16.1k
uint64_t ConvertRegularPriorityTxnBound(double value) {
79
16.1k
  return ConvertBound(value, yb::kRegularTxnLowerBound, yb::kRegularTxnUpperBound);
80
16.1k
}
81
82
16.1k
uint64_t ConvertHighPriorityTxnBound(double value) {
83
16.1k
  return ConvertBound(value, yb::kHighPriTxnLowerBound, yb::kHighPriTxnUpperBound);
84
16.1k
}
85
86
} // namespace
87
88
extern "C" {
89
90
8.06k
void YBCAssignTransactionPriorityLowerBound(double newval, void* extra) {
91
8.06k
  txn_priority_regular_lower_bound = ConvertRegularPriorityTxnBound(newval);
92
8.06k
  txn_priority_highpri_lower_bound = ConvertHighPriorityTxnBound(newval);
93
  // YSQL layer checks (guc.c) should ensure this.
94
8.06k
  DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_regular_upper_bound);
95
8.06k
  DCHECK_LE(txn_priority_highpri_lower_bound, txn_priority_highpri_upper_bound);
96
8.06k
  DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_highpri_lower_bound);
97
8.06k
}
98
99
8.05k
void YBCAssignTransactionPriorityUpperBound(double newval, void* extra) {
100
8.05k
  txn_priority_regular_upper_bound = ConvertRegularPriorityTxnBound(newval);
101
8.05k
  txn_priority_highpri_upper_bound = ConvertHighPriorityTxnBound(newval);
102
  // YSQL layer checks (guc.c) should ensure this.
103
8.05k
  DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_regular_upper_bound);
104
8.05k
  DCHECK_LE(txn_priority_highpri_lower_bound, txn_priority_highpri_upper_bound);
105
8.05k
  DCHECK_LE(txn_priority_regular_upper_bound, txn_priority_highpri_lower_bound);
106
8.05k
}
107
108
int* YBCStatementTimeoutPtr = nullptr;
109
110
}
111
112
using namespace std::literals;
113
using namespace std::placeholders;
114
115
namespace yb {
116
namespace pggate {
117
118
using client::YBTransaction;
119
using client::AsyncClientInitialiser;
120
using client::TransactionManager;
121
using client::YBTransactionPtr;
122
using client::YBSession;
123
using client::YBSessionPtr;
124
using client::LocalTabletFilter;
125
126
#if defined(__APPLE__) && !defined(NDEBUG)
127
// We are experiencing more slowness in tests on macOS in debug mode.
128
const int kDefaultPgYbSessionTimeoutMs = 120 * 1000;
129
#else
130
const int kDefaultPgYbSessionTimeoutMs = 60 * 1000;
131
#endif
132
133
DEFINE_int32(pg_yb_session_timeout_ms, kDefaultPgYbSessionTimeoutMs,
134
             "Timeout for operations between PostgreSQL server and YugaByte DocDB services");
135
136
// Stores the collaborators handed in by the caller; no other work is done at construction time.
PgTxnManager::PgTxnManager(PgClient* client,
                           scoped_refptr<ClockBase> clock,
                           const tserver::TServerSharedObject* tserver_shared_object,
                           PgCallbacks pg_callbacks)
    : client_(client),
      clock_(std::move(clock)),
      tserver_shared_object_(tserver_shared_object),
      pg_callbacks_(pg_callbacks) {}
146
147
6.06k
PgTxnManager::~PgTxnManager() {
  // Make sure any transaction is aborted before the manager goes away. A failure can only be
  // logged here, since a destructor has no way to report it to the caller.
  const Status abort_status = AbortTransaction();
  WARN_NOT_OK(abort_status, "Failed to abort transaction in dtor");
}
151
152
415k
// Starts a new transaction. Returns OK without doing anything when
// YBCIsInitDbModeEnvVarSet() reports true, and IllegalState when a transaction is already in
// progress.
Status PgTxnManager::BeginTransaction() {
  VLOG_TXN_STATE(2);

  const bool init_db_mode = YBCIsInitDbModeEnvVarSet();
  if (init_db_mode) {
    return Status::OK();
  }

  if (!IsTxnInProgress()) {
    // A brand-new transaction does not reuse a previously saved priority.
    return RecreateTransaction(SavePriority::kFalse /* save_priority */);
  }

  return STATUS(IllegalState, "Transaction is already in progress");
}
162
163
69.0k
// Re-creates the transaction, asking the overload below to keep the saved priority.
Status PgTxnManager::RecreateTransaction() {
  VLOG_TXN_STATE(2);
  constexpr auto kKeepPriority = SavePriority::kTrue;
  return RecreateTransaction(kKeepPriority);
}
167
168
483k
// Resets all per-transaction state and marks a fresh transaction as in progress.
// `save_priority` is remembered in use_saved_priority_. Always returns OK.
Status PgTxnManager::RecreateTransaction(const SavePriority save_priority) {
  use_saved_priority_ = save_priority;

  // ResetTxnAndSession() clears txn_in_progress_, so the flag has to be raised afterwards.
  ResetTxnAndSession();
  txn_in_progress_ = true;

  return Status::OK();
}
174
175
436k
// Records the isolation level, given as a raw integer that is cast to PgIsolationLevel without
// validation. Always returns OK.
Status PgTxnManager::SetPgIsolationLevel(int level) {
  const auto requested_level = static_cast<PgIsolationLevel>(level);
  pg_isolation_level_ = requested_level;
  return Status::OK();
}
179
180
3.96M
// Returns the isolation level most recently stored by SetPgIsolationLevel().
PgIsolationLevel PgTxnManager::GetPgIsolationLevel() {
  const PgIsolationLevel current_level = pg_isolation_level_;
  return current_level;
}
183
184
414k
// Records the read-only flag and then re-evaluates whether a follower-read time has to be
// chosen. Returns the status of that re-evaluation.
Status PgTxnManager::SetReadOnly(bool read_only) {
  read_only_ = read_only;

  // The stack trace is only collected when verbose logging at level 2 is enabled.
  VLOG(2) << __func__ << " set to " << read_only_ << " from " << GetStackTrace();

  return UpdateReadTimeForFollowerReadsIfRequired();
}
189
190
414k
// Stores the follower-read setting and the staleness (logged in ms), then re-evaluates whether a
// follower-read time has to be chosen. Returns the status of that re-evaluation.
Status PgTxnManager::EnableFollowerReads(bool enable_follower_reads, int32_t session_staleness) {
  const char* const action_text =
      enable_follower_reads ? "Enabling follower reads " : "Disabling follower reads ";
  VLOG_TXN_STATE(2) << action_text << " with staleness " << session_staleness << " ms";

  enable_follower_reads_ = enable_follower_reads;
  follower_read_staleness_ms_ = session_staleness;

  return UpdateReadTimeForFollowerReadsIfRequired();
}
198
199
829k
// Picks the read time used for follower reads, but only when follower reads are enabled, the
// transaction is read-only, and no such read time has been chosen yet. In every other case the
// current settings are merely logged.
Status PgTxnManager::UpdateReadTimeForFollowerReadsIfRequired() {
  if (!(enable_follower_reads_ && read_only_ && !read_time_for_follower_reads_)) {
    VLOG(2) << " Not updating read-time " << yb::ToString(pg_isolation_level_)
            << read_time_for_follower_reads_
            << (enable_follower_reads_ ? " Follower reads allowed."
                                       : " Follower reads DISallowed.")
            << (read_only_ ? " Is read-only" : " Is NOT read-only");
    return Status::OK();
  }

  // The requested staleness has to exceed kMargin times the maximum clock skew.
  constexpr uint64_t kMargin = 2;
  RSTATUS_DCHECK(
      follower_read_staleness_ms_ * 1000 > kMargin * GetAtomicFlag(&FLAGS_max_clock_skew_usec),
      InvalidArgument,
      Format("Setting follower read staleness less than the $0 x max_clock_skew.", kMargin));

  // Move the read point into the past by the configured staleness.
  read_time_for_follower_reads_ = clock_->Now().AddMilliseconds(-follower_read_staleness_ms_);
  VLOG_TXN_STATE(2) << "Updating read-time with staleness "
                    << follower_read_staleness_ms_ << " to "
                    << read_time_for_follower_reads_;
  return Status::OK();
}
219
220
414k
// Records the deferrable flag; CalculateIsolation() reads it together with read_only_.
// Always returns OK.
Status PgTxnManager::SetDeferrable(bool deferrable) {
  deferrable_ = deferrable;
  return Status::OK();
}
224
225
197k
// Chooses a priority for a new transaction:
//   - kHighestPriority:     the upper bound of the high-priority range;
//   - kHigherPriorityRange: a random value from the high-priority range;
//   - anything else:        a random value from the regular range.
uint64_t PgTxnManager::NewPriority(TxnPriorityRequirement txn_priority_requirement) {
  const bool wants_highest = (txn_priority_requirement == kHighestPriority);
  if (wants_highest) {
    return txn_priority_highpri_upper_bound;
  }

  const bool wants_high_range = (txn_priority_requirement == kHigherPriorityRange);
  if (wants_high_range) {
    return RandomUniformInt(txn_priority_highpri_lower_bound, txn_priority_highpri_upper_bound);
  }

  return RandomUniformInt(txn_priority_regular_lower_bound, txn_priority_regular_upper_bound);
}
238
239
// Derives the DocDB isolation level for the current operation and, when the transaction has no
// effective isolation level yet and the operation requires one, fixes it together with the
// transaction priority. Returns IllegalState if the effective isolation level would change in
// the middle of a transaction.
Status PgTxnManager::CalculateIsolation(
    bool read_only_op, TxnPriorityRequirement txn_priority_requirement) {
  VLOG_TXN_STATE(2);
  if (ddl_mode_) {
    // Nothing is calculated while in DDL mode.
    return Status::OK();
  }

  if (!txn_in_progress_) {
    return RecreateTransaction(SavePriority::kFalse /* save_priority */);
  }

  // Using pg_isolation_level_, read_only_, and deferrable_, determine the effective isolation
  // level to use at the DocDB layer, and the "deferrable" flag.
  //
  // Effective isolation means that sometimes SERIALIZABLE reads are internally executed as
  // snapshot isolation reads. This way we don't have to write read intents and we get higher
  // performance. The resulting execution is still serializable: the order of transactions is the
  // order of timestamps, i.e. read timestamps (for read-only transactions executed at snapshot
  // isolation) and commit timestamps of serializable transactions.
  //
  // The "deferrable" flag means that in SERIALIZABLE DEFERRABLE READ ONLY mode we will choose the
  // read timestamp as global_limit to avoid the possibility of read restarts. This results in
  // waiting out the maximum clock skew and is appropriate for non-latency-sensitive operations.
  IsolationLevel effective_isolation = IsolationLevel::SNAPSHOT_ISOLATION;
  if ((pg_isolation_level_ == PgIsolationLevel::SERIALIZABLE) && !read_only_) {
    effective_isolation = IsolationLevel::SERIALIZABLE_ISOLATION;
  } else if (pg_isolation_level_ == PgIsolationLevel::READ_COMMITTED) {
    effective_isolation = IsolationLevel::READ_COMMITTED;
  }
  const IsolationLevel docdb_isolation = effective_isolation;
  const bool defer = read_only_ && deferrable_;

  VLOG_TXN_STATE(2) << "DocDB isolation level: " << IsolationLevel_Name(docdb_isolation);

  if (isolation_level_ != IsolationLevel::NON_TRANSACTIONAL) {
    // The effective isolation level was fixed earlier in this transaction.
    if (isolation_level_ == docdb_isolation) {
      return Status::OK();
    }
    // Sanity check: query layer should ensure that this does not happen.
    return STATUS_FORMAT(
        IllegalState,
        "Attempt to change effective isolation from $0 to $1 in the middle of a transaction. "
        "Postgres-level isolation: $2; read_only: $3.",
        isolation_level_, IsolationLevel_Name(docdb_isolation), pg_isolation_level_,
        read_only_);
  }

  if (read_only_op &&
      (docdb_isolation == IsolationLevel::SNAPSHOT_ISOLATION ||
       docdb_isolation == IsolationLevel::READ_COMMITTED)) {
    // Read-only operation at snapshot or read-committed isolation: the effective isolation level
    // stays unset; only the deferred read point request is recorded.
    if (defer) {
      need_defer_read_point_ = true;
    }
    return Status::OK();
  }

  if (!use_saved_priority_) {
    priority_ = NewPriority(txn_priority_requirement);
  }
  isolation_level_ = docdb_isolation;

  VLOG_TXN_STATE(2) << "effective isolation level: "
                    << IsolationLevel_Name(docdb_isolation)
                    << "; transaction started successfully.";
  return Status::OK();
}
303
304
510
// Only records the restart request; SetupPerformOptions() copies it into the next set of perform
// options and clears the flag. Always returns OK.
Status PgTxnManager::RestartTransaction() {
  need_restart_ = true;
  return Status::OK();
}
308
309
/* This is called at the start of each statement in READ COMMITTED isolation level */
310
28.6k
Status PgTxnManager::ResetTransactionReadPoint() {
311
28.6k
  read_time_manipulation_ = tserver::ReadTimeManipulation::RESET;
312
28.6k
  return Status::OK();
313
28.6k
}
314
315
/* This is called when a read committed transaction wants to restart its read point */
316
163
Status PgTxnManager::RestartReadPoint() {
317
163
  read_time_manipulation_ = tserver::ReadTimeManipulation::RESTART;
318
163
  return Status::OK();
319
163
}
320
321
381k
// Finishes the current transaction by committing it; see FinishTransaction().
Status PgTxnManager::CommitTransaction() {
  constexpr auto kDoCommit = Commit::kTrue;
  return FinishTransaction(kDoCommit);
}
324
325
437k
// Commits or aborts the current transaction, depending on `commit`, and resets the
// per-transaction state. Returns OK when there is nothing to finish; otherwise returns the status
// reported by the client.
Status PgTxnManager::FinishTransaction(Commit commit) {
  // If a DDL operation during a DDL txn fails the txn will be aborted before we get here.
  // However if there are failures afterwards (i.e. during COMMIT or catalog version increment),
  // then we might get here with a ddl_txn_. Clean it up in that case.
  if (ddl_mode_ && !commit) {
    RETURN_NOT_OK(ExitSeparateDdlTxnMode(commit));
  }

  if (!txn_in_progress_) {
    VLOG_TXN_STATE(2) << "No transaction in progress, nothing to commit.";
    return Status::OK();
  }

  if (isolation_level_ == IsolationLevel::NON_TRANSACTIONAL) {
    // No effective isolation level was ever set, so the client is not contacted.
    VLOG_TXN_STATE(2) << "This was a read-only transaction, nothing to commit.";
    ResetTxnAndSession();
    return Status::OK();
  }

  const char* const progress_word = commit ? "Committing" : "Aborting";
  const char* const outcome_word = commit ? "commit" : "abort";

  VLOG_TXN_STATE(2) << progress_word << " transaction.";
  Status status = client_->FinishTransaction(commit, DdlMode::kFalse);
  VLOG_TXN_STATE(2) << "Transaction " << outcome_word << " status: " << status;

  // The local state is reset regardless of whether the client call succeeded.
  ResetTxnAndSession();
  return status;
}
350
351
56.6k
// Finishes the current transaction by aborting it; see FinishTransaction().
Status PgTxnManager::AbortTransaction() {
  constexpr auto kDoAbort = Commit::kFalse;
  return FinishTransaction(kDoAbort);
}
354
355
898k
void PgTxnManager::ResetTxnAndSession() {
356
898k
  txn_in_progress_ = false;
357
898k
  isolation_level_ = IsolationLevel::NON_TRANSACTIONAL;
358
898k
  ++txn_serial_no_;
359
360
898k
  enable_follower_reads_ = false;
361
898k
  read_only_ = false;
362
898k
  read_time_for_follower_reads_ = HybridTime();
363
898k
  read_time_manipulation_ = tserver::ReadTimeManipulation::NONE;
364
898k
}
365
366
20.3k
// Switches the manager into DDL mode. Entering while already in DDL mode is rejected by
// RSTATUS_DCHECK with IllegalState.
Status PgTxnManager::EnterSeparateDdlTxnMode() {
  RSTATUS_DCHECK(!ddl_mode_, IllegalState,
                 "EnterSeparateDdlTxnMode called when already in a DDL transaction");

  // The state is logged both before and after the switch.
  VLOG_TXN_STATE(2);
  ddl_mode_ = true;
  VLOG_TXN_STATE(2);

  return Status::OK();
}
374
375
20.4k
// Finishes the DDL transaction through the client, committing or aborting according to `commit`,
// and leaves DDL mode. When not in DDL mode, an abort is a no-op and a commit is rejected by
// RSTATUS_DCHECK with IllegalState.
Status PgTxnManager::ExitSeparateDdlTxnMode(Commit commit) {
  VLOG_TXN_STATE(2);

  if (ddl_mode_) {
    // ddl_mode_ is cleared only after the client call succeeded; on failure the macro returns
    // early and the manager stays in DDL mode.
    RETURN_NOT_OK(client_->FinishTransaction(commit, DdlMode::kTrue));
    ddl_mode_ = false;
    return Status::OK();
  }

  RSTATUS_DCHECK(!commit, IllegalState, "Commit ddl txn called when not in a DDL transaction");
  return Status::OK();
}
385
386
0
std::string PgTxnManager::TxnStateDebugStr() const {
387
0
  return YB_CLASS_TO_STRING(
388
0
      ddl_mode,
389
0
      read_only,
390
0
      deferrable,
391
0
      txn_in_progress,
392
0
      pg_isolation_level,
393
0
      isolation_level);
394
0
}
395
396
1.50M
// Copies the current transaction state into `options`. The one-shot requests (restart, deferred
// read point, read time manipulation) are cleared once they have been copied.
void PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) {
  // Outside of DDL mode and with no transaction in progress, the serial number is advanced
  // before it is copied into the options.
  if (!ddl_mode_ && !txn_in_progress_) {
    ++txn_serial_no_;
  }

  options->set_isolation(isolation_level_);
  options->set_ddl_mode(ddl_mode_);
  options->set_txn_serial_no(txn_serial_no_);

  if (!use_saved_priority_) {
    options->set_priority(priority_);
  } else {
    options->set_use_existing_priority(true);
  }

  if (need_restart_) {
    options->set_restart_transaction(true);
    need_restart_ = false;
  }

  if (need_defer_read_point_) {
    options->set_defer_read_point(true);
    need_defer_read_point_ = false;
  }

  // Hand over the pending read time manipulation and reset it to NONE.
  const auto pending_manipulation = read_time_manipulation_;
  read_time_manipulation_ = tserver::ReadTimeManipulation::NONE;
  options->set_read_time_manipulation(pending_manipulation);

  if (read_time_for_follower_reads_) {
    ReadHybridTime::SingleTime(read_time_for_follower_reads_).ToPB(options->mutable_read_time());
  }
}
422
423
}  // namespace pggate
424
}  // namespace yb