/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_txn_manager.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/yql/pggate/pg_txn_manager.h" |
15 | | |
16 | | #include "yb/client/client.h" |
17 | | #include "yb/client/session.h" |
18 | | #include "yb/client/transaction.h" |
19 | | |
20 | | #include "yb/common/common.pb.h" |
21 | | #include "yb/common/transaction_priority.h" |
22 | | #include "yb/common/ybc_util.h" |
23 | | |
24 | | #include "yb/rpc/rpc_controller.h" |
25 | | |
26 | | #include "yb/tserver/pg_client.pb.h" |
27 | | #include "yb/tserver/tserver_service.proxy.h" |
28 | | #include "yb/tserver/tserver_shared_mem.h" |
29 | | |
30 | | #include "yb/util/debug-util.h" |
31 | | #include "yb/util/format.h" |
32 | | #include "yb/util/logging.h" |
33 | | #include "yb/util/random_util.h" |
34 | | #include "yb/util/shared_mem.h" |
35 | | #include "yb/util/status.h" |
36 | | #include "yb/util/status_format.h" |
37 | | |
38 | | #include "yb/yql/pggate/pg_client.h" |
39 | | #include "yb/yql/pggate/pggate_flags.h" |
40 | | #include "yb/yql/pggate/ybc_pggate.h" |
41 | | |
42 | | DEFINE_bool(use_node_hostname_for_local_tserver, false, |
43 | | "Connect to local t-server by using host name instead of local IP"); |
44 | | |
45 | | // A macro for logging the function name and the state of the current transaction. |
46 | | // This macro is not enclosed in do { ... } while (false) because we want to be able to write |
47 | | // additional information into the same log message. |
// The expansion calls TxnStateDebugStr() and reads pg_callbacks_, so both must be accessible at
// the point of use; it ends in an open stream expression so callers can append more with <<.
48 | | #define VLOG_TXN_STATE(vlog_level) \ |
49 | 5.08M | VLOG(vlog_level) << __func__ << ": " << TxnStateDebugStr() \ |
50 | 3.40M | << "; query: { " << ::yb::pggate::GetDebugQueryString(pg_callbacks_) << " }; " |
51 | | |
52 | | DECLARE_uint64(max_clock_skew_usec); |
53 | | |
54 | | DECLARE_bool(ysql_forward_rpcs_to_local_tserver); |
55 | | |
56 | | namespace { |
57 | | |
58 | | // Local copies that can be modified. |
// They start at the yb::k*Txn*Bound constants, are overwritten by
// YBCAssignTransactionPriorityLowerBound / YBCAssignTransactionPriorityUpperBound, and are read
// by PgTxnManager::NewPriority when a transaction priority is picked.
59 | | uint64_t txn_priority_highpri_upper_bound = yb::kHighPriTxnUpperBound; |
60 | | uint64_t txn_priority_highpri_lower_bound = yb::kHighPriTxnLowerBound; |
61 | | |
62 | | uint64_t txn_priority_regular_upper_bound = yb::kRegularTxnUpperBound; |
63 | | uint64_t txn_priority_regular_lower_bound = yb::kRegularTxnLowerBound; |
64 | | |
65 | | // Converts a fraction in range 0..1 to uint64_t value in range [minValue, maxValue]. |
// Values <= 0 yield minValue and values >= 1 yield maxValue. Anything in between is interpolated
// linearly in long double arithmetic; the conversion back to uint64_t drops the fractional part.
66 | 32.2k | uint64_t ConvertBound(long double value, uint64_t minValue, uint64_t maxValue) { |
67 | 32.2k | if (value <= 0.0) { |
68 | 16.0k | return minValue; |
69 | 16.0k | } |
70 | | |
71 | 16.1k | if (value >= 1.0) { |
72 | 16.0k | return maxValue; |
73 | 16.0k | } |
74 | | |
75 | 52 | return minValue + value * (maxValue - minValue); |
76 | 16.1k | } |
77 | | |
// Maps a 0..1 fraction onto the regular-priority range
// [yb::kRegularTxnLowerBound, yb::kRegularTxnUpperBound] using ConvertBound.
78 | 16.1k | uint64_t ConvertRegularPriorityTxnBound(double value) { |
79 | 16.1k | return ConvertBound(value, yb::kRegularTxnLowerBound, yb::kRegularTxnUpperBound); |
80 | 16.1k | } |
81 | | |
// Maps a 0..1 fraction onto the high-priority range
// [yb::kHighPriTxnLowerBound, yb::kHighPriTxnUpperBound] using ConvertBound.
82 | 16.1k | uint64_t ConvertHighPriorityTxnBound(double value) { |
83 | 16.1k | return ConvertBound(value, yb::kHighPriTxnLowerBound, yb::kHighPriTxnUpperBound); |
84 | 16.1k | } |
85 | | |
86 | | } // namespace |
87 | | |
88 | | extern "C" { |
89 | | |
// Recomputes the lower bound of both priority ranges (regular and high-priority) from `newval`,
// a fraction that ConvertBound clamps to 0..1. `extra` is not used.
// The signature looks like a GUC assign hook installed by the YSQL layer -- TODO confirm.
// NOTE(review): the last check compares the two lower bounds, whereas
// YBCAssignTransactionPriorityUpperBound checks regular upper <= high-priority lower -- confirm
// the difference is intended.
90 | 8.06k | void YBCAssignTransactionPriorityLowerBound(double newval, void* extra) { |
91 | 8.06k | txn_priority_regular_lower_bound = ConvertRegularPriorityTxnBound(newval); |
92 | 8.06k | txn_priority_highpri_lower_bound = ConvertHighPriorityTxnBound(newval); |
93 | | // YSQL layer checks (guc.c) should ensure this. |
94 | 8.06k | DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_regular_upper_bound); |
95 | 8.06k | DCHECK_LE(txn_priority_highpri_lower_bound, txn_priority_highpri_upper_bound); |
96 | 8.06k | DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_highpri_lower_bound); |
97 | 8.06k | } |
98 | | |
// Recomputes the upper bound of both priority ranges (regular and high-priority) from `newval`,
// a fraction that ConvertBound clamps to 0..1. `extra` is not used.
// The signature looks like a GUC assign hook installed by the YSQL layer -- TODO confirm.
// The checks assert that each range stays ordered and that the regular range ends at or below
// the start of the high-priority range.
99 | 8.05k | void YBCAssignTransactionPriorityUpperBound(double newval, void* extra) { |
100 | 8.05k | txn_priority_regular_upper_bound = ConvertRegularPriorityTxnBound(newval); |
101 | 8.05k | txn_priority_highpri_upper_bound = ConvertHighPriorityTxnBound(newval); |
102 | | // YSQL layer checks (guc.c) should ensure this. |
103 | 8.05k | DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_regular_upper_bound); |
104 | 8.05k | DCHECK_LE(txn_priority_highpri_lower_bound, txn_priority_highpri_upper_bound); |
105 | 8.05k | DCHECK_LE(txn_priority_regular_upper_bound, txn_priority_highpri_lower_bound); |
106 | 8.05k | } |
107 | | |
// Exported with C linkage and null by default; nothing in this file assigns or reads it.
// Presumably pointed at the statement timeout setting by the YSQL layer -- TODO confirm.
108 | | int* YBCStatementTimeoutPtr = nullptr; |
109 | | |
110 | | } |
111 | | |
112 | | using namespace std::literals; |
113 | | using namespace std::placeholders; |
114 | | |
115 | | namespace yb { |
116 | | namespace pggate { |
117 | | |
118 | | using client::YBTransaction; |
119 | | using client::AsyncClientInitialiser; |
120 | | using client::TransactionManager; |
121 | | using client::YBTransactionPtr; |
122 | | using client::YBSession; |
123 | | using client::YBSessionPtr; |
124 | | using client::LocalTabletFilter; |
125 | | |
// Default value of the pg_yb_session_timeout_ms flag, in milliseconds: 60 seconds, raised to
// 120 seconds on Apple platforms when NDEBUG is not defined.
126 | | #if defined(__APPLE__) && !defined(NDEBUG) |
127 | | // We are experiencing more slowness in tests on macOS in debug mode. |
128 | | const int kDefaultPgYbSessionTimeoutMs = 120 * 1000; |
129 | | #else |
130 | | const int kDefaultPgYbSessionTimeoutMs = 60 * 1000; |
131 | | #endif |
132 | | |
133 | | DEFINE_int32(pg_yb_session_timeout_ms, kDefaultPgYbSessionTimeoutMs, |
134 | | "Timeout for operations between PostgreSQL server and YugaByte DocDB services"); |
135 | | |
// Stores the supplied client, clock, tserver shared-object pointer and PostgreSQL callbacks in
// the corresponding members (the clock handle is moved in); the body does nothing else.
136 | | PgTxnManager::PgTxnManager( |
137 | | PgClient* client, |
138 | | scoped_refptr<ClockBase> clock, |
139 | | const tserver::TServerSharedObject* tserver_shared_object, |
140 | | PgCallbacks pg_callbacks) |
141 | | : client_(client), |
142 | | clock_(std::move(clock)), |
143 | | tserver_shared_object_(tserver_shared_object), |
144 | 6.08k | pg_callbacks_(pg_callbacks) { |
145 | 6.08k | } |
146 | | |
// Calls AbortTransaction() on destruction; a non-OK result is passed to WARN_NOT_OK together
// with the message below rather than being returned to anyone.
147 | 6.06k | PgTxnManager::~PgTxnManager() { |
148 | | // Abort the transaction before the transaction manager gets destroyed. |
149 | 6.06k | WARN_NOT_OK(AbortTransaction(), "Failed to abort transaction in dtor"); |
150 | 6.06k | } |
151 | | |
// Starts a new transaction.
// Returns OK without doing anything when YBCIsInitDbModeEnvVarSet() is true, IllegalState when a
// transaction is already in progress, and otherwise the result of recreating the transaction
// state with SavePriority::kFalse (the previously used priority is not kept).
152 | 415k | Status PgTxnManager::BeginTransaction() { |
153 | 415k | VLOG_TXN_STATE(2); |
154 | 415k | if (YBCIsInitDbModeEnvVarSet()) { |
155 | 830 | return Status::OK(); |
156 | 830 | } |
157 | 414k | if (IsTxnInProgress()) { |
158 | 0 | return STATUS(IllegalState, "Transaction is already in progress"); |
159 | 0 | } |
160 | 414k | return RecreateTransaction(SavePriority::kFalse /* save_priority */); |
161 | 414k | } |
162 | | |
// Logs the current state and re-initializes the transaction with SavePriority::kTrue, i.e. the
// saved priority is to be reused (see the overload taking SavePriority).
163 | 69.0k | Status PgTxnManager::RecreateTransaction() { |
164 | 69.0k | VLOG_TXN_STATE(2); |
165 | 69.0k | return RecreateTransaction(SavePriority::kTrue /* save_priority */); |
166 | 69.0k | } |
167 | | |
// Records `save_priority` in use_saved_priority_, clears the per-transaction state through
// ResetTxnAndSession() and marks a transaction as in progress. Always returns OK.
168 | 483k | Status PgTxnManager::RecreateTransaction(const SavePriority save_priority) { |
169 | 483k | use_saved_priority_ = save_priority; |
170 | 483k | ResetTxnAndSession(); |
// Must follow ResetTxnAndSession(), which sets txn_in_progress_ to false.
171 | 483k | txn_in_progress_ = true; |
172 | 483k | return Status::OK(); |
173 | 483k | } |
174 | | |
// Stores `level` as the PostgreSQL-level isolation. The integer is cast to PgIsolationLevel
// without any range check here. Always returns OK.
175 | 436k | Status PgTxnManager::SetPgIsolationLevel(int level) { |
176 | 436k | pg_isolation_level_ = static_cast<PgIsolationLevel>(level); |
177 | 436k | return Status::OK(); |
178 | 436k | } |
179 | | |
// Returns the stored PostgreSQL-level isolation (pg_isolation_level_), not the DocDB one.
180 | 3.96M | PgIsolationLevel PgTxnManager::GetPgIsolationLevel() { |
181 | 3.96M | return pg_isolation_level_; |
182 | 3.96M | } |
183 | | |
// Records the read-only flag, logs the new value with the caller's stack trace at verbosity 2,
// and re-evaluates the follower-read read time, whose status is returned.
184 | 414k | Status PgTxnManager::SetReadOnly(bool read_only) { |
185 | 414k | read_only_ = read_only; |
186 | 414k | VLOG(2) << __func__ << " set to " << read_only_ << " from " << GetStackTrace()82 ; |
187 | 414k | return UpdateReadTimeForFollowerReadsIfRequired(); |
188 | 414k | } |
189 | | |
// Stores the follower-reads switch and the requested staleness (logged as milliseconds), then
// re-evaluates the follower-read read time and returns that status.
190 | 414k | Status PgTxnManager::EnableFollowerReads(bool enable_follower_reads, int32_t session_staleness) { |
191 | 414k | VLOG_TXN_STATE413 (2) << (413 enable_follower_reads413 ? "Enabling follower reads "0 |
192 | 413 | : "Disabling follower reads ") |
193 | 413 | << " with staleness " << session_staleness << " ms"; |
194 | 414k | enable_follower_reads_ = enable_follower_reads; |
195 | 414k | follower_read_staleness_ms_ = session_staleness; |
196 | 414k | return UpdateReadTimeForFollowerReadsIfRequired(); |
197 | 414k | } |
198 | | |
// Picks the read time used for follower reads, once per transaction.
// Only acts when follower reads are enabled, the transaction is read-only and no follower-read
// time has been chosen yet; in every other case it just logs why nothing was updated.
// The staleness (ms, scaled to microseconds) must exceed kMargin times the max_clock_skew_usec
// flag; RSTATUS_DCHECK is given an InvalidArgument status for the violation.
199 | 829k | Status PgTxnManager::UpdateReadTimeForFollowerReadsIfRequired() { |
200 | 829k | if (enable_follower_reads_ && read_only_15.4k && !read_time_for_follower_reads_16 ) { |
201 | 16 | constexpr uint64_t kMargin = 2; |
// NOTE(review): the multiplication by 1000 is evaluated in the type of
// follower_read_staleness_ms_ (declared outside this file) before the comparison with the
// unsigned right-hand side -- confirm large or negative staleness values cannot slip through.
202 | 16 | RSTATUS_DCHECK( |
203 | 16 | follower_read_staleness_ms_ * 1000 > kMargin * GetAtomicFlag(&FLAGS_max_clock_skew_usec), |
204 | 16 | InvalidArgument, |
205 | 16 | Format("Setting follower read staleness less than the $0 x max_clock_skew.", kMargin)); |
206 | | // Add a delta to the start point to lower the read point. |
207 | 16 | read_time_for_follower_reads_ = clock_->Now().AddMilliseconds(-follower_read_staleness_ms_); |
208 | 16 | VLOG_TXN_STATE0 (2) << "Updating read-time with staleness " |
209 | 0 | << follower_read_staleness_ms_ << " to " |
210 | 0 | << read_time_for_follower_reads_; |
211 | 829k | } else { |
212 | 829k | VLOG(2) << " Not updating read-time " << yb::ToString(pg_isolation_level_) |
213 | 255 | << read_time_for_follower_reads_ |
214 | 255 | << (enable_follower_reads_ ? " Follower reads allowed."0 : " Follower reads DISallowed.") |
215 | 255 | << (read_only_ ? " Is read-only"0 : " Is NOT read-only"); |
216 | 829k | } |
217 | 829k | return Status::OK(); |
218 | 829k | } |
219 | | |
// Records the deferrable flag; it is combined with read_only_ in CalculateIsolation.
// Always returns OK.
220 | 414k | Status PgTxnManager::SetDeferrable(bool deferrable) { |
221 | 414k | deferrable_ = deferrable; |
222 | 414k | return Status::OK(); |
223 | 414k | } |
224 | | |
// Chooses a priority for a new transaction from the file-level bounds:
//   kHighestPriority     -> the upper bound of the high-priority range (not random);
//   kHigherPriorityRange -> RandomUniformInt over the high-priority bounds;
//   anything else        -> RandomUniformInt over the regular bounds.
225 | 197k | uint64_t PgTxnManager::NewPriority(TxnPriorityRequirement txn_priority_requirement) { |
226 | 197k | if (txn_priority_requirement == kHighestPriority) { |
227 | 6.40k | return txn_priority_highpri_upper_bound; |
228 | 6.40k | } |
229 | | |
230 | 190k | if (txn_priority_requirement == kHigherPriorityRange) { |
231 | 1.97k | return RandomUniformInt(txn_priority_highpri_lower_bound, |
232 | 1.97k | txn_priority_highpri_upper_bound); |
233 | 1.97k | } |
234 | | |
235 | 188k | return RandomUniformInt(txn_priority_regular_lower_bound, |
236 | 188k | txn_priority_regular_upper_bound); |
237 | 190k | } |
238 | | |
// Decides which DocDB isolation level the current operation runs at and, when the operation
// needs one, records the isolation level and priority for the transaction.
//   - In DDL mode nothing is computed; returns OK.
//   - With no transaction in progress the state is recreated (priority not saved) and that status
//     is returned without computing an isolation level.
//   - If an isolation level is already set, it must match the newly computed one, otherwise
//     IllegalState is returned.
// `read_only_op` says whether the operation itself only reads; `txn_priority_requirement` is
// forwarded to NewPriority unless the saved priority is being reused.
239 | | Status PgTxnManager::CalculateIsolation( |
240 | 3.96M | bool read_only_op, TxnPriorityRequirement txn_priority_requirement) { |
241 | 3.96M | if (ddl_mode_) { |
242 | 566k | VLOG_TXN_STATE(2); |
243 | 566k | return Status::OK(); |
244 | 566k | } |
245 | | |
246 | 3.96M | VLOG_TXN_STATE(2); |
247 | 3.39M | if (!txn_in_progress_) { |
248 | 0 | return RecreateTransaction(SavePriority::kFalse /* save_priority */); |
249 | 0 | } |
250 | | |
251 | | // Using pg_isolation_level_, read_only_, and deferrable_, determine the effective isolation level |
252 | | // to use at the DocDB layer, and the "deferrable" flag. |
253 | | // |
254 | | // Effective isolation means that sometimes SERIALIZABLE reads are internally executed as snapshot |
255 | | // isolation reads. This way we don't have to write read intents and we get higher performance. |
256 | | // The resulting execution is still serializable: the order of transactions is the order of |
257 | | // timestamps, i.e. read timestamps (for read-only transactions executed at snapshot isolation) |
258 | | // and commit timestamps of serializable transactions. |
259 | | // |
260 | | // The "deferrable" flag means that in SERIALIZABLE DEFERRABLE READ ONLY mode we will choose the read |
261 | | // timestamp as global_limit to avoid the possibility of read restarts. This results in waiting |
262 | | // out the maximum clock skew and is appropriate for non-latency-sensitive operations. |
263 | | |
264 | 3.39M | const IsolationLevel docdb_isolation = |
265 | 3.39M | (pg_isolation_level_ == PgIsolationLevel::SERIALIZABLE) && !read_only_2.82M |
266 | 3.39M | ? IsolationLevel::SERIALIZABLE_ISOLATION2.82M |
267 | 3.39M | : (570k pg_isolation_level_ == PgIsolationLevel::READ_COMMITTED570k |
268 | 570k | ? IsolationLevel::READ_COMMITTED66.4k |
269 | 570k | : IsolationLevel::SNAPSHOT_ISOLATION504k ); |
270 | 3.39M | const bool defer = read_only_ && deferrable_80 ; |
271 | | |
272 | 3.39M | VLOG_TXN_STATE1.96k (2) << "DocDB isolation level: " << IsolationLevel_Name(docdb_isolation)1.96k ; |
273 | | |
274 | 3.39M | if (isolation_level_ != IsolationLevel::NON_TRANSACTIONAL) { |
275 | | // Sanity check: query layer should ensure that this does not happen. |
276 | 2.82M | if (isolation_level_ != docdb_isolation) { |
277 | 0 | return STATUS_FORMAT( |
278 | 0 | IllegalState, |
279 | 0 | "Attempt to change effective isolation from $0 to $1 in the middle of a transaction. " |
280 | 0 | "Postgres-level isolation: $2; read_only: $3.", |
281 | 0 | isolation_level_, IsolationLevel_Name(docdb_isolation), pg_isolation_level_, |
282 | 0 | read_only_); |
283 | 0 | } |
284 | 2.82M | } else if (573k read_only_op573k && |
285 | 573k | (410k docdb_isolation == IsolationLevel::SNAPSHOT_ISOLATION410k || |
286 | 410k | docdb_isolation == IsolationLevel::READ_COMMITTED115k )) { |
// Read-only operation at snapshot / read committed isolation: isolation_level_ and priority_
// are left untouched here; only the deferred read point may be requested.
287 | 306k | if (defer) { |
288 | 80 | need_defer_read_point_ = true; |
289 | 80 | } |
290 | 306k | } else { |
291 | 266k | if (!use_saved_priority_) { |
292 | 196k | priority_ = NewPriority(txn_priority_requirement); |
293 | 196k | } |
294 | 266k | isolation_level_ = docdb_isolation; |
295 | | |
296 | 266k | VLOG_TXN_STATE256 (2) << "effective isolation level: " |
297 | 256 | << IsolationLevel_Name(docdb_isolation) |
298 | 256 | << "; transaction started successfully."; |
299 | 266k | } |
300 | | |
301 | 3.39M | return Status::OK(); |
302 | 3.39M | } |
303 | | |
// Only sets need_restart_. The next SetupPerformOptions call turns it into
// restart_transaction=true on the outgoing options and clears the flag. Always returns OK.
304 | 510 | Status PgTxnManager::RestartTransaction() { |
305 | 510 | need_restart_ = true; |
306 | 510 | return Status::OK(); |
307 | 510 | } |
308 | | |
309 | | /* This is called at the start of each statement in READ COMMITTED isolation level */ |
// Only records ReadTimeManipulation::RESET; the next SetupPerformOptions call copies it into the
// outgoing options and sets the member back to NONE. Always returns OK.
310 | 28.6k | Status PgTxnManager::ResetTransactionReadPoint() { |
311 | 28.6k | read_time_manipulation_ = tserver::ReadTimeManipulation::RESET; |
312 | 28.6k | return Status::OK(); |
313 | 28.6k | } |
314 | | |
315 | | /* This is called when a read committed transaction wants to restart its read point */ |
// Only records ReadTimeManipulation::RESTART; the next SetupPerformOptions call copies it into
// the outgoing options and sets the member back to NONE. Always returns OK.
316 | 163 | Status PgTxnManager::RestartReadPoint() { |
317 | 163 | read_time_manipulation_ = tserver::ReadTimeManipulation::RESTART; |
318 | 163 | return Status::OK(); |
319 | 163 | } |
320 | | |
// Commits the current transaction: FinishTransaction with Commit::kTrue.
321 | 381k | Status PgTxnManager::CommitTransaction() { |
322 | 381k | return FinishTransaction(Commit::kTrue); |
323 | 381k | } |
324 | | |
// Commits (`commit` true) or aborts (`commit` false) the current transaction.
//   1. When aborting while still in DDL mode, the DDL transaction is finished first; a failure
//      there is returned immediately.
//   2. With no transaction in progress this is a no-op returning OK.
//   3. If isolation_level_ is still NON_TRANSACTIONAL (logged as a read-only transaction), only
//      the local state is reset.
//   4. Otherwise the client finishes the transaction; local state is reset whether or not that
//      call succeeded, and its status is returned.
325 | 437k | Status PgTxnManager::FinishTransaction(Commit commit) { |
326 | | // If a DDL operation during a DDL txn fails the txn will be aborted before we get here. |
327 | | // However if there are failures afterwards (i.e. during COMMIT or catalog version increment), |
328 | | // then we might get here with a ddl_txn_. Clean it up in that case. |
329 | 437k | if (ddl_mode_ && !commit177 ) { |
330 | 6 | RETURN_NOT_OK(ExitSeparateDdlTxnMode(commit)); |
331 | 6 | } |
332 | | |
333 | 437k | if (!txn_in_progress_) { |
334 | 18.4E | VLOG_TXN_STATE(2) << "No transaction in progress, nothing to commit."; |
335 | 22.9k | return Status::OK(); |
336 | 22.9k | } |
337 | | |
338 | 414k | if (isolation_level_ == IsolationLevel::NON_TRANSACTIONAL) { |
339 | 216k | VLOG_TXN_STATE47 (2) << "This was a read-only transaction, nothing to commit."47 ; |
340 | 216k | ResetTxnAndSession(); |
341 | 216k | return Status::OK(); |
342 | 216k | } |
343 | | |
344 | 18.4E | VLOG_TXN_STATE(2) << (commit ? "Committing"0 : "Aborting") << " transaction."; |
345 | 197k | Status status = client_->FinishTransaction(commit, DdlMode::kFalse); |
346 | 18.4E | VLOG_TXN_STATE(2) << "Transaction " << (commit ? "commit"0 : "abort") << " status: " << status; |
347 | 197k | ResetTxnAndSession(); |
348 | 197k | return status; |
349 | 414k | } |
350 | | |
// Aborts the current transaction: FinishTransaction with Commit::kFalse.
351 | 56.6k | Status PgTxnManager::AbortTransaction() { |
352 | 56.6k | return FinishTransaction(Commit::kFalse); |
353 | 56.6k | } |
354 | | |
// Returns the per-transaction members to their idle values and bumps txn_serial_no_.
// Cleared here: txn_in_progress_, isolation_level_, enable_follower_reads_, read_only_,
// read_time_for_follower_reads_ and read_time_manipulation_. Other members (for example
// ddl_mode_, deferrable_, pg_isolation_level_, priority_) are not touched by this function.
355 | 898k | void PgTxnManager::ResetTxnAndSession() { |
356 | 898k | txn_in_progress_ = false; |
357 | 898k | isolation_level_ = IsolationLevel::NON_TRANSACTIONAL; |
358 | 898k | ++txn_serial_no_; |
359 | | |
360 | 898k | enable_follower_reads_ = false; |
361 | 898k | read_only_ = false; |
362 | 898k | read_time_for_follower_reads_ = HybridTime(); |
363 | 898k | read_time_manipulation_ = tserver::ReadTimeManipulation::NONE; |
364 | 898k | } |
365 | | |
// Switches the manager into DDL mode. Being in DDL mode already is reported through
// RSTATUS_DCHECK with an IllegalState status. The state is logged before and after the switch.
366 | 20.3k | Status PgTxnManager::EnterSeparateDdlTxnMode() { |
367 | 20.3k | RSTATUS_DCHECK(!ddl_mode_, IllegalState, |
368 | 20.3k | "EnterSeparateDdlTxnMode called when already in a DDL transaction"); |
369 | 20.3k | VLOG_TXN_STATE(2); |
370 | 20.3k | ddl_mode_ = true; |
371 | 20.3k | VLOG_TXN_STATE(2); |
372 | 20.3k | return Status::OK(); |
373 | 20.3k | } |
374 | | |
// Leaves DDL mode by committing or aborting the DDL transaction through the client.
// Outside DDL mode an abort request is a no-op returning OK, while a commit request is reported
// through RSTATUS_DCHECK with an IllegalState status.
// ddl_mode_ is cleared only after the client call succeeds; if RETURN_NOT_OK returns early on a
// failure, the manager stays in DDL mode.
375 | 20.4k | Status PgTxnManager::ExitSeparateDdlTxnMode(Commit commit) { |
376 | 20.4k | VLOG_TXN_STATE(2); |
377 | 20.4k | if (!ddl_mode_) { |
378 | 37 | RSTATUS_DCHECK(!commit, IllegalState, "Commit ddl txn called when not in a DDL transaction"); |
379 | 37 | return Status::OK(); |
380 | 37 | } |
381 | 20.3k | RETURN_NOT_OK(client_->FinishTransaction(commit, DdlMode::kTrue)); |
382 | 20.3k | ddl_mode_ = false; |
383 | 20.3k | return Status::OK(); |
384 | 20.3k | } |
385 | | |
// Builds the state string that VLOG_TXN_STATE prints, passing the six fields named below to
// YB_CLASS_TO_STRING.
386 | 0 | std::string PgTxnManager::TxnStateDebugStr() const { |
387 | 0 | return YB_CLASS_TO_STRING( |
388 | 0 | ddl_mode, |
389 | 0 | read_only, |
390 | 0 | deferrable, |
391 | 0 | txn_in_progress, |
392 | 0 | pg_isolation_level, |
393 | 0 | isolation_level); |
394 | 0 | } |
395 | | |
// Fills `options` with the transaction settings for the next perform request.
// Outside DDL mode and with no transaction in progress, txn_serial_no_ is bumped first.
// Either use_existing_priority or an explicit priority is set, never both.
// need_restart_, need_defer_read_point_ and read_time_manipulation_ are one-shot: each is copied
// into `options` and then cleared, so it affects only this call.
// read_time is filled only when a follower-read time has been chosen.
396 | 1.50M | void PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) { |
397 | 1.50M | if (!ddl_mode_ && !txn_in_progress_941k ) { |
398 | 69.1k | ++txn_serial_no_; |
399 | 69.1k | } |
400 | 1.50M | options->set_isolation(isolation_level_); |
401 | 1.50M | options->set_ddl_mode(ddl_mode_); |
402 | 1.50M | options->set_txn_serial_no(txn_serial_no_); |
403 | 1.50M | if (use_saved_priority_) { |
404 | 75.2k | options->set_use_existing_priority(true); |
405 | 1.43M | } else { |
406 | 1.43M | options->set_priority(priority_); |
407 | 1.43M | } |
408 | 1.50M | if (need_restart_) { |
409 | 509 | options->set_restart_transaction(true); |
410 | 509 | need_restart_ = false; |
411 | 509 | } |
412 | 1.50M | if (need_defer_read_point_) { |
413 | 80 | options->set_defer_read_point(true); |
414 | 80 | need_defer_read_point_ = false; |
415 | 80 | } |
416 | 1.50M | options->set_read_time_manipulation(read_time_manipulation_); |
417 | 1.50M | read_time_manipulation_ = tserver::ReadTimeManipulation::NONE; |
418 | 1.50M | if (read_time_for_follower_reads_) { |
419 | 73 | ReadHybridTime::SingleTime(read_time_for_follower_reads_).ToPB(options->mutable_read_time()); |
420 | 73 | } |
421 | 1.50M | } |
422 | | |
423 | | } // namespace pggate |
424 | | } // namespace yb |