/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_txn_manager.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/yql/pggate/pg_txn_manager.h" |
15 | | |
16 | | #include "yb/client/client.h" |
17 | | #include "yb/client/session.h" |
18 | | #include "yb/client/transaction.h" |
19 | | |
20 | | #include "yb/common/common.pb.h" |
21 | | #include "yb/common/transaction_priority.h" |
22 | | #include "yb/common/ybc_util.h" |
23 | | |
24 | | #include "yb/rpc/rpc_controller.h" |
25 | | |
26 | | #include "yb/tserver/pg_client.pb.h" |
27 | | #include "yb/tserver/tserver_service.proxy.h" |
28 | | #include "yb/tserver/tserver_shared_mem.h" |
29 | | |
30 | | #include "yb/util/debug-util.h" |
31 | | #include "yb/util/format.h" |
32 | | #include "yb/util/logging.h" |
33 | | #include "yb/util/random_util.h" |
34 | | #include "yb/util/shared_mem.h" |
35 | | #include "yb/util/status.h" |
36 | | #include "yb/util/status_format.h" |
37 | | |
38 | | #include "yb/yql/pggate/pg_client.h" |
39 | | #include "yb/yql/pggate/pggate_flags.h" |
40 | | #include "yb/yql/pggate/ybc_pggate.h" |
41 | | |
42 | | DEFINE_bool(use_node_hostname_for_local_tserver, false, |
43 | | "Connect to local t-server by using host name instead of local IP"); |
44 | | |
45 | | // A macro for logging the function name and the state of the current transaction. |
46 | | // This macro is not enclosed in do { ... } while (true) because we want to be able to write |
47 | | // additional information into the same log message. |
48 | | #define VLOG_TXN_STATE(vlog_level) \ |
49 | 3.69M | VLOG(vlog_level) << __func__ << ": " << TxnStateDebugStr() \ |
50 | 150 | << "; query: { " << ::yb::pggate::GetDebugQueryString(pg_callbacks_) << " }; " |
51 | | |
52 | | DECLARE_uint64(max_clock_skew_usec); |
53 | | |
54 | | DECLARE_bool(ysql_forward_rpcs_to_local_tserver); |
55 | | |
56 | | namespace { |
57 | | |
58 | | // Local copies that can be modified. |
59 | | uint64_t txn_priority_highpri_upper_bound = yb::kHighPriTxnUpperBound; |
60 | | uint64_t txn_priority_highpri_lower_bound = yb::kHighPriTxnLowerBound; |
61 | | |
62 | | uint64_t txn_priority_regular_upper_bound = yb::kRegularTxnUpperBound; |
63 | | uint64_t txn_priority_regular_lower_bound = yb::kRegularTxnLowerBound; |
64 | | |
65 | | // Converts double value in range 0..1 to uint64_t value in range [minValue, maxValue] |
66 | 14.4k | uint64_t ConvertBound(long double value, uint64_t minValue, uint64_t maxValue) { |
67 | 14.4k | if (value <= 0.0) { |
68 | 7.23k | return minValue; |
69 | 7.23k | } |
70 | | |
71 | 7.26k | if (value >= 1.0) { |
72 | 7.23k | return maxValue; |
73 | 7.23k | } |
74 | | |
75 | 28 | return minValue + value * (maxValue - minValue); |
76 | 28 | } |
77 | | |
78 | 7.24k | uint64_t ConvertRegularPriorityTxnBound(double value) { |
79 | 7.24k | return ConvertBound(value, yb::kRegularTxnLowerBound, yb::kRegularTxnUpperBound); |
80 | 7.24k | } |
81 | | |
82 | 7.24k | uint64_t ConvertHighPriorityTxnBound(double value) { |
83 | 7.24k | return ConvertBound(value, yb::kHighPriTxnLowerBound, yb::kHighPriTxnUpperBound); |
84 | 7.24k | } |
85 | | |
86 | | } // namespace |
87 | | |
88 | | extern "C" { |
89 | | |
90 | 3.62k | void YBCAssignTransactionPriorityLowerBound(double newval, void* extra) { |
91 | 3.62k | txn_priority_regular_lower_bound = ConvertRegularPriorityTxnBound(newval); |
92 | 3.62k | txn_priority_highpri_lower_bound = ConvertHighPriorityTxnBound(newval); |
93 | | // YSQL layer checks (guc.c) should ensure this. |
94 | 3.62k | DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_regular_upper_bound); |
95 | 3.62k | DCHECK_LE(txn_priority_highpri_lower_bound, txn_priority_highpri_upper_bound); |
96 | 3.62k | DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_highpri_lower_bound); |
97 | 3.62k | } |
98 | | |
99 | 3.62k | void YBCAssignTransactionPriorityUpperBound(double newval, void* extra) { |
100 | 3.62k | txn_priority_regular_upper_bound = ConvertRegularPriorityTxnBound(newval); |
101 | 3.62k | txn_priority_highpri_upper_bound = ConvertHighPriorityTxnBound(newval); |
102 | | // YSQL layer checks (guc.c) should ensure this. |
103 | 3.62k | DCHECK_LE(txn_priority_regular_lower_bound, txn_priority_regular_upper_bound); |
104 | 3.62k | DCHECK_LE(txn_priority_highpri_lower_bound, txn_priority_highpri_upper_bound); |
105 | 3.62k | DCHECK_LE(txn_priority_regular_upper_bound, txn_priority_highpri_lower_bound); |
106 | 3.62k | } |
107 | | |
108 | | int* YBCStatementTimeoutPtr = nullptr; |
109 | | |
110 | | } |
111 | | |
112 | | using namespace std::literals; |
113 | | using namespace std::placeholders; |
114 | | |
115 | | namespace yb { |
116 | | namespace pggate { |
117 | | |
118 | | using client::YBTransaction; |
119 | | using client::AsyncClientInitialiser; |
120 | | using client::TransactionManager; |
121 | | using client::YBTransactionPtr; |
122 | | using client::YBSession; |
123 | | using client::YBSessionPtr; |
124 | | using client::LocalTabletFilter; |
125 | | |
126 | | #if defined(__APPLE__) && !defined(NDEBUG) |
127 | | // We are experiencing more slowness in tests on macOS in debug mode. |
128 | | const int kDefaultPgYbSessionTimeoutMs = 120 * 1000; |
129 | | #else |
130 | | const int kDefaultPgYbSessionTimeoutMs = 60 * 1000; |
131 | | #endif |
132 | | |
133 | | DEFINE_int32(pg_yb_session_timeout_ms, kDefaultPgYbSessionTimeoutMs, |
134 | | "Timeout for operations between PostgreSQL server and YugaByte DocDB services"); |
135 | | |
136 | | std::shared_ptr<yb::client::YBSession> BuildSession( |
137 | | yb::client::YBClient* client, |
138 | 1.65k | const scoped_refptr<ClockBase>& clock) { |
139 | 1.65k | int statement_timeout = YBCStatementTimeoutPtr ? *YBCStatementTimeoutPtr : 0; |
140 | 1.65k | int session_timeout = FLAGS_pg_yb_session_timeout_ms; |
141 | 1.65k | if (statement_timeout > 0 && statement_timeout < session_timeout) { |
142 | 1 | session_timeout = statement_timeout; |
143 | 1 | } |
144 | 1.65k | auto session = std::make_shared<YBSession>(client, clock); |
145 | 1.65k | session->SetForceConsistentRead(client::ForceConsistentRead::kTrue); |
146 | 1.65k | session->SetTimeout(MonoDelta::FromMilliseconds(session_timeout)); |
147 | 1.65k | return session; |
148 | 1.65k | } |
149 | | |
150 | | PgTxnManager::PgTxnManager( |
151 | | PgClient* client, |
152 | | scoped_refptr<ClockBase> clock, |
153 | | const tserver::TServerSharedObject* tserver_shared_object, |
154 | | PgCallbacks pg_callbacks) |
155 | | : client_(client), |
156 | | clock_(std::move(clock)), |
157 | | tserver_shared_object_(tserver_shared_object), |
158 | 1.65k | pg_callbacks_(pg_callbacks) { |
159 | 1.65k | } |
160 | | |
161 | 1.65k | PgTxnManager::~PgTxnManager() { |
162 | | // Abort the transaction before the transaction manager gets destroyed. |
163 | 1.65k | WARN_NOT_OK(AbortTransaction(), "Failed to abort transaction in dtor"); |
164 | 1.65k | } |
165 | | |
166 | 160k | Status PgTxnManager::BeginTransaction() { |
167 | 160k | VLOG_TXN_STATE(2); |
168 | 160k | if (YBCIsInitDbModeEnvVarSet()) { |
169 | 0 | return Status::OK(); |
170 | 0 | } |
171 | 160k | if (IsTxnInProgress()) { |
172 | 0 | return STATUS(IllegalState, "Transaction is already in progress"); |
173 | 0 | } |
174 | 160k | return RecreateTransaction(SavePriority::kFalse /* save_priority */); |
175 | 160k | } |
176 | | |
177 | 20.2k | Status PgTxnManager::RecreateTransaction() { |
178 | 20.2k | VLOG_TXN_STATE(2); |
179 | 20.2k | return RecreateTransaction(SavePriority::kTrue /* save_priority */); |
180 | 20.2k | } |
181 | | |
182 | 180k | Status PgTxnManager::RecreateTransaction(const SavePriority save_priority) { |
183 | 180k | use_saved_priority_ = save_priority; |
184 | 180k | ResetTxnAndSession(); |
185 | 180k | txn_in_progress_ = true; |
186 | 180k | return Status::OK(); |
187 | 180k | } |
188 | | |
189 | 161k | Status PgTxnManager::SetPgIsolationLevel(int level) { |
190 | 161k | pg_isolation_level_ = static_cast<PgIsolationLevel>(level); |
191 | 161k | return Status::OK(); |
192 | 161k | } |
193 | | |
194 | 2.00M | PgIsolationLevel PgTxnManager::GetPgIsolationLevel() { |
195 | 2.00M | return pg_isolation_level_; |
196 | 2.00M | } |
197 | | |
198 | 160k | Status PgTxnManager::SetReadOnly(bool read_only) { |
199 | 160k | read_only_ = read_only; |
200 | 18.4E | VLOG(2) << __func__ << " set to " << read_only_ << " from " << GetStackTrace(); |
201 | 160k | return UpdateReadTimeForFollowerReadsIfRequired(); |
202 | 160k | } |
203 | | |
204 | 160k | Status PgTxnManager::EnableFollowerReads(bool enable_follower_reads, int32_t session_staleness) { |
205 | 57 | VLOG_TXN_STATE(2) << (enable_follower_reads ? "Enabling follower reads " |
206 | 57 | : "Disabling follower reads ") |
207 | 57 | << " with staleness " << session_staleness << " ms"; |
208 | 160k | enable_follower_reads_ = enable_follower_reads; |
209 | 160k | follower_read_staleness_ms_ = session_staleness; |
210 | 160k | return UpdateReadTimeForFollowerReadsIfRequired(); |
211 | 160k | } |
212 | | |
213 | 320k | Status PgTxnManager::UpdateReadTimeForFollowerReadsIfRequired() { |
214 | 320k | if (enable_follower_reads_ && read_only_ && !read_time_for_follower_reads_) { |
215 | 0 | constexpr uint64_t kMargin = 2; |
216 | 0 | RSTATUS_DCHECK( |
217 | 0 | follower_read_staleness_ms_ * 1000 > kMargin * GetAtomicFlag(&FLAGS_max_clock_skew_usec), |
218 | 0 | InvalidArgument, |
219 | 0 | Format("Setting follower read staleness less than the $0 x max_clock_skew.", kMargin)); |
220 | | // Add a delta to the start point to lower the read point. |
221 | 0 | read_time_for_follower_reads_ = clock_->Now().AddMilliseconds(-follower_read_staleness_ms_); |
222 | 0 | VLOG_TXN_STATE(2) << "Updating read-time with staleness " |
223 | 0 | << follower_read_staleness_ms_ << " to " |
224 | 0 | << read_time_for_follower_reads_; |
225 | 320k | } else { |
226 | 14 | VLOG(2) << " Not updating read-time " << yb::ToString(pg_isolation_level_) |
227 | 14 | << read_time_for_follower_reads_ |
228 | 14 | << (enable_follower_reads_ ? " Follower reads allowed." : " Follower reads DISallowed.") |
229 | 14 | << (read_only_ ? " Is read-only" : " Is NOT read-only"); |
230 | 320k | } |
231 | 320k | return Status::OK(); |
232 | 320k | } |
233 | | |
234 | 160k | Status PgTxnManager::SetDeferrable(bool deferrable) { |
235 | 160k | deferrable_ = deferrable; |
236 | 160k | return Status::OK(); |
237 | 160k | } |
238 | | |
239 | 75.8k | uint64_t PgTxnManager::NewPriority(TxnPriorityRequirement txn_priority_requirement) { |
240 | 75.8k | if (txn_priority_requirement == kHighestPriority) { |
241 | 385 | return txn_priority_highpri_upper_bound; |
242 | 385 | } |
243 | | |
244 | 75.4k | if (txn_priority_requirement == kHigherPriorityRange) { |
245 | 40 | return RandomUniformInt(txn_priority_highpri_lower_bound, |
246 | 40 | txn_priority_highpri_upper_bound); |
247 | 40 | } |
248 | | |
249 | 75.4k | return RandomUniformInt(txn_priority_regular_lower_bound, |
250 | 75.4k | txn_priority_regular_upper_bound); |
251 | 75.4k | } |
252 | | |
253 | | Status PgTxnManager::CalculateIsolation( |
254 | 1.80M | bool read_only_op, TxnPriorityRequirement txn_priority_requirement) { |
255 | 1.80M | if (ddl_mode_) { |
256 | 200k | VLOG_TXN_STATE(2); |
257 | 200k | return Status::OK(); |
258 | 200k | } |
259 | | |
260 | 1.60M | VLOG_TXN_STATE(2); |
261 | 1.60M | if (!txn_in_progress_) { |
262 | 0 | return RecreateTransaction(SavePriority::kFalse /* save_priority */); |
263 | 0 | } |
264 | | |
265 | | // Using pg_isolation_level_, read_only_, and deferrable_, determine the effective isolation level |
266 | | // to use at the DocDB layer, and the "deferrable" flag. |
267 | | // |
268 | | // Effective isolation means that sometimes SERIALIZABLE reads are internally executed as snapshot |
269 | | // isolation reads. This way we don't have to write read intents and we get higher peformance. |
270 | | // The resulting execution is still serializable: the order of transactions is the order of |
271 | | // timestamps, i.e. read timestamps (for read-only transactions executed at snapshot isolation) |
272 | | // and commit timestamps of serializable transactions. |
273 | | // |
274 | | // The "deferrable" flag that in SERIALIZABLE DEFERRABLE READ ONLY mode we will choose the read |
275 | | // timestamp as global_limit to avoid the possibility of read restarts. This results in waiting |
276 | | // out the maximum clock skew and is appropriate for non-latency-sensitive operations. |
277 | | |
278 | 1.60M | const IsolationLevel docdb_isolation = |
279 | 1.60M | (pg_isolation_level_ == PgIsolationLevel::SERIALIZABLE) && !read_only_ |
280 | 1.35M | ? IsolationLevel::SERIALIZABLE_ISOLATION |
281 | 253k | : (pg_isolation_level_ == PgIsolationLevel::READ_COMMITTED |
282 | 63.8k | ? IsolationLevel::READ_COMMITTED |
283 | 189k | : IsolationLevel::SNAPSHOT_ISOLATION); |
284 | 1.60M | const bool defer = read_only_ && deferrable_; |
285 | | |
286 | 32 | VLOG_TXN_STATE(2) << "DocDB isolation level: " << IsolationLevel_Name(docdb_isolation); |
287 | | |
288 | 1.60M | if (isolation_level_ != IsolationLevel::NON_TRANSACTIONAL) { |
289 | | // Sanity check: query layer should ensure that this does not happen. |
290 | 1.36M | if (isolation_level_ != docdb_isolation) { |
291 | 0 | return STATUS_FORMAT( |
292 | 0 | IllegalState, |
293 | 0 | "Attempt to change effective isolation from $0 to $1 in the middle of a transaction. " |
294 | 0 | "Postgres-level isolation: $2; read_only: $3.", |
295 | 0 | isolation_level_, IsolationLevel_Name(docdb_isolation), pg_isolation_level_, |
296 | 0 | read_only_); |
297 | 0 | } |
298 | 243k | } else if (read_only_op && |
299 | 191k | (docdb_isolation == IsolationLevel::SNAPSHOT_ISOLATION || |
300 | 147k | docdb_isolation == IsolationLevel::READ_COMMITTED)) { |
301 | 147k | if (defer) { |
302 | 0 | need_defer_read_point_ = true; |
303 | 0 | } |
304 | 96.2k | } else { |
305 | 96.2k | if (!use_saved_priority_) { |
306 | 75.9k | priority_ = NewPriority(txn_priority_requirement); |
307 | 75.9k | } |
308 | 96.2k | isolation_level_ = docdb_isolation; |
309 | | |
310 | 18.4E | VLOG_TXN_STATE(2) << "effective isolation level: " |
311 | 18.4E | << IsolationLevel_Name(docdb_isolation) |
312 | 18.4E | << "; transaction started successfully."; |
313 | 96.2k | } |
314 | | |
315 | 1.60M | return Status::OK(); |
316 | 1.60M | } |
317 | | |
318 | 0 | Status PgTxnManager::RestartTransaction() { |
319 | 0 | need_restart_ = true; |
320 | 0 | return Status::OK(); |
321 | 0 | } |
322 | | |
323 | | /* This is called at the start of each statement in READ COMMITTED isolation level */ |
324 | 24.3k | Status PgTxnManager::ResetTransactionReadPoint() { |
325 | 24.3k | read_time_manipulation_ = tserver::ReadTimeManipulation::RESET; |
326 | 24.3k | return Status::OK(); |
327 | 24.3k | } |
328 | | |
329 | | /* This is called when a read committed transaction wants to restart its read point */ |
330 | 1 | Status PgTxnManager::RestartReadPoint() { |
331 | 1 | read_time_manipulation_ = tserver::ReadTimeManipulation::RESTART; |
332 | 1 | return Status::OK(); |
333 | 1 | } |
334 | | |
335 | 155k | Status PgTxnManager::CommitTransaction() { |
336 | 155k | return FinishTransaction(Commit::kTrue); |
337 | 155k | } |
338 | | |
339 | 168k | Status PgTxnManager::FinishTransaction(Commit commit) { |
340 | | // If a DDL operation during a DDL txn fails the txn will be aborted before we get here. |
341 | | // However if there are failures afterwards (i.e. during COMMIT or catalog version increment), |
342 | | // then we might get here with a ddl_txn_. Clean it up in that case. |
343 | 168k | if (ddl_mode_ && !commit) { |
344 | 0 | RETURN_NOT_OK(ExitSeparateDdlTxnMode(commit)); |
345 | 0 | } |
346 | | |
347 | 168k | if (!txn_in_progress_) { |
348 | 18.4E | VLOG_TXN_STATE(2) << "No transaction in progress, nothing to commit."; |
349 | 8.42k | return Status::OK(); |
350 | 8.42k | } |
351 | | |
352 | 160k | if (isolation_level_ == IsolationLevel::NON_TRANSACTIONAL) { |
353 | 0 | VLOG_TXN_STATE(2) << "This was a read-only transaction, nothing to commit."; |
354 | 84.2k | ResetTxnAndSession(); |
355 | 84.2k | return Status::OK(); |
356 | 84.2k | } |
357 | | |
358 | 18.4E | VLOG_TXN_STATE(2) << (commit ? "Committing" : "Aborting") << " transaction."; |
359 | 76.0k | Status status = client_->FinishTransaction(commit, DdlMode::kFalse); |
360 | 18.4E | VLOG_TXN_STATE(2) << "Transaction " << (commit ? "commit" : "abort") << " status: " << status; |
361 | 76.0k | ResetTxnAndSession(); |
362 | 76.0k | return status; |
363 | 76.0k | } |
364 | | |
365 | 13.2k | Status PgTxnManager::AbortTransaction() { |
366 | 13.2k | return FinishTransaction(Commit::kFalse); |
367 | 13.2k | } |
368 | | |
369 | 340k | void PgTxnManager::ResetTxnAndSession() { |
370 | 340k | txn_in_progress_ = false; |
371 | 340k | isolation_level_ = IsolationLevel::NON_TRANSACTIONAL; |
372 | 340k | ++txn_serial_no_; |
373 | | |
374 | 340k | enable_follower_reads_ = false; |
375 | 340k | read_only_ = false; |
376 | 340k | read_time_for_follower_reads_ = HybridTime(); |
377 | 340k | read_time_manipulation_ = tserver::ReadTimeManipulation::NONE; |
378 | 340k | } |
379 | | |
380 | 6.33k | Status PgTxnManager::EnterSeparateDdlTxnMode() { |
381 | 6.33k | RSTATUS_DCHECK(!ddl_mode_, IllegalState, |
382 | 6.33k | "EnterSeparateDdlTxnMode called when already in a DDL transaction"); |
383 | 6.33k | VLOG_TXN_STATE(2); |
384 | 6.33k | ddl_mode_ = true; |
385 | 6.33k | VLOG_TXN_STATE(2); |
386 | 6.33k | return Status::OK(); |
387 | 6.33k | } |
388 | | |
389 | 6.36k | Status PgTxnManager::ExitSeparateDdlTxnMode(Commit commit) { |
390 | 6.36k | VLOG_TXN_STATE(2); |
391 | 6.36k | if (!ddl_mode_) { |
392 | 22 | RSTATUS_DCHECK(!commit, IllegalState, "Commit ddl txn called when not in a DDL transaction"); |
393 | 22 | return Status::OK(); |
394 | 22 | } |
395 | 6.33k | RETURN_NOT_OK(client_->FinishTransaction(commit, DdlMode::kTrue)); |
396 | 6.33k | ddl_mode_ = false; |
397 | 6.33k | return Status::OK(); |
398 | 6.33k | } |
399 | | |
400 | 0 | std::string PgTxnManager::TxnStateDebugStr() const { |
401 | 0 | return YB_CLASS_TO_STRING( |
402 | 0 | ddl_mode, |
403 | 0 | read_only, |
404 | 0 | deferrable, |
405 | 0 | txn_in_progress, |
406 | 0 | pg_isolation_level, |
407 | 0 | isolation_level); |
408 | 0 | } |
409 | | |
410 | 581k | void PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) { |
411 | 581k | if (!ddl_mode_ && !txn_in_progress_) { |
412 | 0 | ++txn_serial_no_; |
413 | 0 | } |
414 | 581k | options->set_isolation(isolation_level_); |
415 | 581k | options->set_ddl_mode(ddl_mode_); |
416 | 581k | options->set_txn_serial_no(txn_serial_no_); |
417 | 581k | if (use_saved_priority_) { |
418 | 20.3k | options->set_use_existing_priority(true); |
419 | 560k | } else { |
420 | 560k | options->set_priority(priority_); |
421 | 560k | } |
422 | 581k | if (need_restart_) { |
423 | 0 | options->set_restart_transaction(true); |
424 | 0 | need_restart_ = false; |
425 | 0 | } |
426 | 581k | if (need_defer_read_point_) { |
427 | 0 | options->set_defer_read_point(true); |
428 | 0 | need_defer_read_point_ = false; |
429 | 0 | } |
430 | 581k | options->set_read_time_manipulation(read_time_manipulation_); |
431 | 581k | read_time_manipulation_ = tserver::ReadTimeManipulation::NONE; |
432 | 581k | if (read_time_for_follower_reads_) { |
433 | 0 | ReadHybridTime::SingleTime(read_time_for_follower_reads_).ToPB(options->mutable_read_time()); |
434 | 0 | } |
435 | 581k | } |
436 | | |
437 | | } // namespace pggate |
438 | | } // namespace yb |