/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_mini-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <atomic> |
15 | | #include <thread> |
16 | | |
17 | | #include <boost/preprocessor/seq/for_each.hpp> |
18 | | |
19 | | #include <gtest/gtest.h> |
20 | | |
21 | | #include "yb/client/yb_table_name.h" |
22 | | |
23 | | #include "yb/common/pgsql_error.h" |
24 | | |
25 | | #include "yb/docdb/value_type.h" |
26 | | |
27 | | #include "yb/integration-tests/mini_cluster.h" |
28 | | |
29 | | #include "yb/master/catalog_entity_info.h" |
30 | | #include "yb/master/catalog_manager_if.h" |
31 | | #include "yb/master/mini_master.h" |
32 | | #include "yb/master/sys_catalog_constants.h" |
33 | | |
34 | | #include "yb/rocksdb/db.h" |
35 | | |
36 | | #include "yb/server/skewed_clock.h" |
37 | | |
38 | | #include "yb/tablet/tablet.h" |
39 | | #include "yb/tablet/tablet_peer.h" |
40 | | #include "yb/tablet/transaction_participant.h" |
41 | | |
42 | | #include "yb/tools/tools_test_utils.h" |
43 | | |
44 | | #include "yb/util/atomic.h" |
45 | | #include "yb/util/random_util.h" |
46 | | #include "yb/util/scope_exit.h" |
47 | | #include "yb/util/status_log.h" |
48 | | #include "yb/util/test_macros.h" |
49 | | #include "yb/util/test_thread_holder.h" |
50 | | #include "yb/util/test_util.h" |
51 | | #include "yb/util/tsan_util.h" |
52 | | |
53 | | #include "yb/yql/pggate/pggate_flags.h" |
54 | | #include "yb/yql/pgwrapper/pg_mini_test_base.h" |
55 | | |
56 | | using namespace std::literals; |
57 | | |
58 | | DECLARE_bool(flush_rocksdb_on_shutdown); |
59 | | DECLARE_bool(TEST_force_master_leader_resolution); |
60 | | DECLARE_bool(TEST_timeout_non_leader_master_rpcs); |
61 | | DECLARE_double(TEST_respond_write_failed_probability); |
62 | | DECLARE_double(TEST_transaction_ignore_applying_probability); |
63 | | DECLARE_int32(history_cutoff_propagation_interval_ms); |
64 | | DECLARE_int32(timestamp_history_retention_interval_sec); |
65 | | DECLARE_int32(txn_max_apply_batch_records); |
66 | | DECLARE_int64(apply_intents_task_injected_delay_ms); |
67 | | DECLARE_uint64(max_clock_skew_usec); |
68 | | DECLARE_int64(db_write_buffer_size); |
69 | | DECLARE_bool(rocksdb_use_logging_iterator); |
70 | | DECLARE_bool(enable_automatic_tablet_splitting); |
71 | | DECLARE_int32(yb_num_shards_per_tserver); |
72 | | DECLARE_int64(tablet_split_low_phase_size_threshold_bytes); |
73 | | DECLARE_int64(tablet_split_high_phase_size_threshold_bytes); |
74 | | DECLARE_int64(tablet_split_low_phase_shard_count_per_node); |
75 | | DECLARE_int64(tablet_split_high_phase_shard_count_per_node); |
76 | | |
77 | | DECLARE_int32(heartbeat_interval_ms); |
78 | | DECLARE_int32(tserver_heartbeat_metrics_interval_ms); |
79 | | DECLARE_int32(TEST_txn_participant_inject_latency_on_apply_update_txn_ms); |
80 | | |
81 | | DECLARE_int64(db_block_size_bytes); |
82 | | DECLARE_int64(db_filter_block_size_bytes); |
83 | | DECLARE_int64(db_index_block_size_bytes); |
84 | | DECLARE_int64(tablet_force_split_threshold_bytes); |
85 | | DECLARE_int64(TEST_inject_random_delay_on_txn_status_response_ms); |
86 | | |
87 | | namespace yb { |
88 | | namespace pgwrapper { |
89 | | namespace { |
90 | | |
91 | | template<IsolationLevel level> |
92 | | class TxnHelper { |
93 | | public: |
94 | 0 | static CHECKED_STATUS StartTxn(PGConn* connection) { |
95 | 0 | return connection->StartTransaction(level); |
96 | 0 | } Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE2EE8StartTxnEPNS0_6PGConnE Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE1EE8StartTxnEPNS0_6PGConnE |
97 | | |
98 | 0 | static CHECKED_STATUS ExecuteInTxn(PGConn* connection, const std::string& query) { |
99 | 0 | const auto guard = CreateTxnGuard(connection); |
100 | 0 | return connection->Execute(query); |
101 | 0 | } Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE2EE12ExecuteInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE1EE12ExecuteInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE |
102 | | |
103 | 0 | static Result<PGResultPtr> FetchInTxn(PGConn* connection, const std::string& query) { |
104 | 0 | const auto guard = CreateTxnGuard(connection); |
105 | 0 | return connection->Fetch(query); |
106 | 0 | } Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE2EE10FetchInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE1EE10FetchInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE |
107 | | |
108 | | private: |
109 | 0 | static auto CreateTxnGuard(PGConn* connection) { |
110 | 0 | EXPECT_OK(StartTxn(connection)); |
111 | 0 | return ScopeExit([connection]() { |
112 | | // Event in case some operations in transaction failed the COMMIT command |
113 | | // will complete successfully as ROLLBACK will be performed by postgres. |
114 | 0 | EXPECT_OK(connection->Execute("COMMIT")); |
115 | 0 | }); Unexecuted instantiation: pg_mini-test.cc:_ZZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE2EE14CreateTxnGuardEPNS0_6PGConnEENKUlvE_clEv Unexecuted instantiation: pg_mini-test.cc:_ZZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE1EE14CreateTxnGuardEPNS0_6PGConnEENKUlvE_clEv |
116 | 0 | } Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE2EE14CreateTxnGuardEPNS0_6PGConnE Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE1EE14CreateTxnGuardEPNS0_6PGConnE |
117 | | }; |
118 | | |
119 | 0 | std::string RowMarkTypeToPgsqlString(const RowMarkType row_mark_type) { |
120 | 0 | switch (row_mark_type) { |
121 | 0 | case RowMarkType::ROW_MARK_EXCLUSIVE: |
122 | 0 | return "UPDATE"; |
123 | 0 | case RowMarkType::ROW_MARK_NOKEYEXCLUSIVE: |
124 | 0 | return "NO KEY UPDATE"; |
125 | 0 | case RowMarkType::ROW_MARK_SHARE: |
126 | 0 | return "SHARE"; |
127 | 0 | case RowMarkType::ROW_MARK_KEYSHARE: |
128 | 0 | return "KEY SHARE"; |
129 | 0 | default: |
130 | | // We shouldn't get here because other row lock types are disabled at the postgres level. |
131 | 0 | LOG(DFATAL) << "Unsupported row lock of type " << RowMarkType_Name(row_mark_type); |
132 | 0 | return ""; |
133 | 0 | } |
134 | 0 | } |
135 | | |
136 | | YB_DEFINE_ENUM(TestStatement, (kInsert)(kDelete)); |
137 | | |
138 | | } // namespace |
139 | | |
140 | | class PgMiniTest : public PgMiniTestBase { |
141 | | protected: |
142 | | // Have several threads doing updates and several threads doing large scans in parallel. |
143 | | // If deferrable is true, then the scans are in deferrable transactions, so no read restarts are |
144 | | // expected. |
145 | | // Otherwise, the scans are in transactions with snapshot isolation, but we still don't expect any |
146 | | // read restarts to be observed because they should be transparently handled on the postgres side. |
147 | | void TestReadRestart(bool deferrable = true); |
148 | | |
149 | | // Run interleaved INSERT/DELETE, SELECT with specified isolation level and row mark. |
150 | | // Possible isolation levels are SNAPSHOT_ISOLATION and SERIALIZABLE_ISOLATION. |
151 | | // Possible row marks are ROW_MARK_KEYSHARE, ROW_MARK_SHARE, ROW_MARK_NOKEYEXCLUSIVE, and |
152 | | // ROW_MARK_EXCLUSIVE. |
153 | | void TestSelectRowLock(IsolationLevel isolation, RowMarkType row_mark, TestStatement statement); |
154 | | |
155 | | void TestForeignKey(IsolationLevel isolation); |
156 | | |
157 | | void TestBigInsert(bool restart); |
158 | | |
159 | | void CreateTableAndInitialize(std::string table_name, int num_tablets); |
160 | | |
161 | | void DestroyTable(std::string table_name); |
162 | | |
163 | | void GetTableIDFromTableName(const std::string table_name, std::string* table_id); |
164 | | |
165 | | void StartReadWriteThreads(std::string table_name, TestThreadHolder *thread_holder); |
166 | | |
167 | | void TestConcurrentDeleteRowAndUpdateColumn(bool select_before_update); |
168 | | |
169 | 0 | void FlushAndCompactTablets() { |
170 | 0 | FLAGS_timestamp_history_retention_interval_sec = 0; |
171 | 0 | FLAGS_history_cutoff_propagation_interval_ms = 1; |
172 | 0 | ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kSync)); |
173 | 0 | const auto compaction_start = MonoTime::Now(); |
174 | 0 | ASSERT_OK(cluster_->CompactTablets()); |
175 | 0 | const auto compaction_finish = MonoTime::Now(); |
176 | 0 | const double compaction_elapsed_time_sec = (compaction_finish - compaction_start).ToSeconds(); |
177 | 0 | LOG(INFO) << "Compaction duration: " << compaction_elapsed_time_sec << " s"; |
178 | 0 | } |
179 | | |
180 | | void RunManyConcurrentReadersTest(); |
181 | | }; |
182 | | |
183 | | class PgMiniSingleTServerTest : public PgMiniTest { |
184 | | public: |
185 | 0 | size_t NumTabletServers() override { |
186 | 0 | return 1; |
187 | 0 | } |
188 | | }; |
189 | | |
190 | | class PgMiniMasterFailoverTest : public PgMiniTest { |
191 | | public: |
192 | 0 | size_t NumMasters() override { |
193 | 0 | return 3; |
194 | 0 | } |
195 | | }; |
196 | | |
197 | | // Try to change this to test follower reads. |
198 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(FollowerReads)) { |
199 | 0 | auto conn = ASSERT_RESULT(Connect()); |
200 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t2 (key int PRIMARY KEY, word TEXT, phrase TEXT)")); |
201 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t2 (key, word, phrase) VALUES (1, 'old', 'old is gold')")); |
202 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t2 (key, word, phrase) VALUES (2, 'NEW', 'NEW is fine')")); |
203 | |
|
204 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, value TEXT)")); |
205 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t (key, value) VALUES (1, 'old')")); |
206 | |
|
207 | 0 | ASSERT_OK(conn.Execute("SET yb_debug_log_docdb_requests = true")); |
208 | 0 | ASSERT_OK(conn.Execute("SET yb_read_from_followers = true")); |
209 | | |
210 | | // Try to set a value < 2 * max_clock_skew (500ms) should fail. |
211 | 0 | ASSERT_NOK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", 400))); |
212 | 0 | ASSERT_NOK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", 999))); |
213 | | // Setting a value > 2 * max_clock_skew should work. |
214 | 0 | ASSERT_OK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", 1001))); |
215 | | |
216 | | // Setting staleness to what we require for the test. |
217 | | // Sleep and then perform an update, such that follower reads should see the old value. |
218 | | // But current reads will see the new/updated value. |
219 | 0 | constexpr int32_t kStalenessMs = 4000; |
220 | 0 | SleepFor(MonoDelta::FromMilliseconds(kStalenessMs)); |
221 | 0 | ASSERT_OK(conn.Execute("UPDATE t SET value = 'NEW' WHERE key = 1")); |
222 | 0 | auto kUpdateTime = MonoTime::Now(); |
223 | 0 | ASSERT_OK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", kStalenessMs))); |
224 | | |
225 | | // Follower reads will not be enabled unless a transaction block is marked read-only. |
226 | 0 | { |
227 | 0 | ASSERT_OK(conn.Execute("BEGIN TRANSACTION")); |
228 | 0 | auto value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1")); |
229 | 0 | ASSERT_EQ(value, "NEW"); |
230 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
231 | 0 | } |
232 | | |
233 | | // Follower reads will be enabled for transaction block(s) marked read-only. |
234 | 0 | { |
235 | 0 | ASSERT_OK(conn.Execute("BEGIN TRANSACTION READ ONLY")); |
236 | 0 | auto value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1")); |
237 | 0 | ASSERT_EQ(value, "old"); |
238 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
239 | 0 | } |
240 | | |
241 | | // Follower reads will not be enabled unless the session or statement is marked read-only. |
242 | 0 | auto value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1")); |
243 | 0 | ASSERT_EQ(value, "NEW"); |
244 | |
|
245 | 0 | value = ASSERT_RESULT( |
246 | 0 | conn.FetchValue<std::string>("SELECT phrase FROM t, t2 WHERE t.value = t2.word")); |
247 | 0 | ASSERT_EQ(value, "NEW is fine"); |
248 | | |
249 | | // Follower reads can be enabled for a single statement with a pg hint. |
250 | 0 | value = |
251 | 0 | ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ " |
252 | 0 | "SELECT value FROM t WHERE key = 1")); |
253 | 0 | ASSERT_EQ(value, "old"); |
254 | 0 | value = ASSERT_RESULT( |
255 | 0 | conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ " |
256 | 0 | "SELECT phrase FROM t, t2 WHERE t.value = t2.word")); |
257 | 0 | ASSERT_EQ(value, "old is gold"); |
258 | | |
259 | | // pg_hint only applies for the specific statement used. |
260 | | // Statements following it should not enable follower reads if it is not marked read-only. |
261 | 0 | value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1")); |
262 | 0 | ASSERT_EQ(value, "NEW"); |
263 | | |
264 | | // pg_hint should also apply for prepared statements, if the hint is provided |
265 | | // at PREPARE stage. |
266 | 0 | { |
267 | 0 | ASSERT_OK( |
268 | 0 | conn.Execute("PREPARE hinted_select_stmt (int) AS " |
269 | 0 | "/*+ Set(transaction_read_only on) */ " |
270 | 0 | "SELECT value FROM t WHERE key = $1")); |
271 | 0 | value = ASSERT_RESULT(conn.FetchValue<std::string>("EXECUTE hinted_select_stmt (1)")); |
272 | 0 | ASSERT_EQ(value, "old"); |
273 | 0 | value = |
274 | 0 | ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ " |
275 | 0 | "EXECUTE hinted_select_stmt (1)")); |
276 | 0 | ASSERT_EQ(value, "old"); |
277 | 0 | value = |
278 | 0 | ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only off) */ " |
279 | 0 | "EXECUTE hinted_select_stmt (1)")); |
280 | 0 | ASSERT_EQ(value, "old"); |
281 | 0 | } |
282 | | // Adding a pg_hint at the EXECUTE stage has no effect. |
283 | 0 | { |
284 | 0 | ASSERT_OK( |
285 | 0 | conn.Execute("PREPARE select_stmt (int) AS " |
286 | 0 | "SELECT value FROM t WHERE key = $1")); |
287 | 0 | value = ASSERT_RESULT(conn.FetchValue<std::string>("EXECUTE select_stmt (1)")); |
288 | 0 | ASSERT_EQ(value, "NEW"); |
289 | 0 | value = |
290 | 0 | ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ " |
291 | 0 | "EXECUTE select_stmt (1)")); |
292 | 0 | ASSERT_EQ(value, "NEW"); |
293 | 0 | } |
294 | | |
295 | | // pg_hint with func() |
296 | | // The hint may be provided when the function is defined, or when the function is |
297 | | // called. |
298 | 0 | { |
299 | 0 | ASSERT_OK( |
300 | 0 | conn.Execute("CREATE FUNCTION func() RETURNS text AS" |
301 | 0 | " $$ SELECT value FROM t WHERE key = 1 $$ LANGUAGE SQL")); |
302 | 0 | value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT func()")); |
303 | 0 | ASSERT_EQ(value, "NEW"); |
304 | 0 | value = |
305 | 0 | ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only off) */ " |
306 | 0 | "SELECT func()")); |
307 | 0 | ASSERT_EQ(value, "NEW"); |
308 | 0 | value = |
309 | 0 | ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ " |
310 | 0 | "SELECT func()")); |
311 | 0 | ASSERT_EQ(value, "old"); |
312 | 0 | ASSERT_OK(conn.Execute("DROP FUNCTION func()")); |
313 | 0 | } |
314 | 0 | { |
315 | 0 | ASSERT_OK( |
316 | 0 | conn.Execute("CREATE FUNCTION hinted_func() RETURNS text AS" |
317 | 0 | " $$ " |
318 | 0 | "/*+ Set(transaction_read_only on) */ " |
319 | 0 | "SELECT value FROM t WHERE key = 1" |
320 | 0 | " $$ LANGUAGE SQL")); |
321 | 0 | value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT hinted_func()")); |
322 | 0 | ASSERT_EQ(value, "old"); |
323 | 0 | value = |
324 | 0 | ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only off) */ " |
325 | 0 | "SELECT hinted_func()")); |
326 | 0 | ASSERT_EQ(value, "old"); |
327 | 0 | value = |
328 | 0 | ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ " |
329 | 0 | "SELECT hinted_func()")); |
330 | 0 | ASSERT_EQ(value, "old"); |
331 | 0 | ASSERT_OK(conn.Execute("DROP FUNCTION hinted_func()")); |
332 | 0 | } |
333 | |
|
334 | 0 | ASSERT_OK(conn.Execute("SET default_transaction_read_only = true")); |
335 | | // Follower reads will be enabled since the session is marked read-only. |
336 | 0 | value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1")); |
337 | 0 | ASSERT_EQ(value, "old"); |
338 | | |
339 | | // pg_hint can only mark a session from read-write to read-only. |
340 | | // Marking a statement in a read-only session as read-write is not allowed. |
341 | | // Writes operations fail. |
342 | | // Read operations will be performed as if they are read-write, so will not use follower |
343 | | // reads. |
344 | 0 | { |
345 | 0 | auto status = conn.Execute( |
346 | 0 | "/*+ Set(transaction_read_only off) */ " |
347 | 0 | "UPDATE t SET value = 'NEWER' WHERE key = 1"); |
348 | 0 | ASSERT_EQ(PgsqlError(status), YBPgErrorCode::YB_PG_READ_ONLY_SQL_TRANSACTION) << status; |
349 | 0 | ASSERT_STR_CONTAINS(status.ToString(), "cannot execute UPDATE in a read-only transaction"); |
350 | |
|
351 | 0 | auto value = |
352 | 0 | ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only off) */ " |
353 | 0 | "SELECT value FROM t WHERE key = 1")); |
354 | 0 | ASSERT_EQ(value, "old"); |
355 | 0 | } |
356 | | |
357 | | // After sufficient time has passed, even "follower reads" should see the newer value. |
358 | 0 | SleepFor(kUpdateTime + MonoDelta::FromMilliseconds(kStalenessMs) - MonoTime::Now()); |
359 | |
|
360 | 0 | ASSERT_OK(conn.Execute("SET default_transaction_read_only = false")); |
361 | 0 | value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1")); |
362 | 0 | ASSERT_EQ(value, "NEW"); |
363 | |
|
364 | 0 | ASSERT_OK(conn.Execute("SET default_transaction_read_only = true")); |
365 | 0 | value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1")); |
366 | 0 | ASSERT_EQ(value, "NEW"); |
367 | 0 | value = ASSERT_RESULT( |
368 | 0 | conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ " |
369 | 0 | "SELECT phrase FROM t, t2 WHERE t.value = t2.word")); |
370 | 0 | ASSERT_EQ(value, "NEW is fine"); |
371 | 0 | } |
372 | | |
373 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(MultiColFollowerReads)) { |
374 | 0 | auto conn = ASSERT_RESULT(Connect()); |
375 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (k int PRIMARY KEY, c1 TEXT, c2 TEXT)")); |
376 | 0 | ASSERT_OK(conn.Execute("SET yb_debug_log_docdb_requests = true")); |
377 | 0 | ASSERT_OK(conn.Execute("SET yb_read_from_followers = true")); |
378 | |
|
379 | 0 | constexpr int32_t kSleepTimeMs = 1200 * kTimeMultiplier; |
380 | |
|
381 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t (k, c1, c2) VALUES (1, 'old', 'old')")); |
382 | 0 | auto kUpdateTime0 = MonoTime::Now(); |
383 | |
|
384 | 0 | SleepFor(MonoDelta::FromMilliseconds(kSleepTimeMs)); |
385 | |
|
386 | 0 | ASSERT_OK(conn.Execute("UPDATE t SET c1 = 'NEW' WHERE k = 1")); |
387 | 0 | auto kUpdateTime1 = MonoTime::Now(); |
388 | |
|
389 | 0 | SleepFor(MonoDelta::FromMilliseconds(kSleepTimeMs)); |
390 | |
|
391 | 0 | ASSERT_OK(conn.Execute("UPDATE t SET c2 = 'NEW' WHERE k = 1")); |
392 | 0 | auto kUpdateTime2 = MonoTime::Now(); |
393 | |
|
394 | 0 | auto result = |
395 | 0 | ASSERT_RESULT(conn.Fetch("/*+ Set(transaction_read_only off) */ " |
396 | 0 | "SELECT * FROM t WHERE k = 1")); |
397 | 0 | ASSERT_EQ(1, ASSERT_RESULT(GetInt32(result.get(), 0, 0))); |
398 | 0 | ASSERT_EQ("NEW", ASSERT_RESULT(GetString(result.get(), 0, 1))); |
399 | 0 | ASSERT_EQ("NEW", ASSERT_RESULT(GetString(result.get(), 0, 2))); |
400 | |
|
401 | 0 | const int32_t kOpDurationMs = 10; |
402 | 0 | auto staleness_ms = (MonoTime::Now() - kUpdateTime0).ToMilliseconds() - kOpDurationMs; |
403 | 0 | ASSERT_OK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", staleness_ms))); |
404 | 0 | result = |
405 | 0 | ASSERT_RESULT(conn.Fetch("/*+ Set(transaction_read_only on) */ " |
406 | 0 | "SELECT * FROM t WHERE k = 1")); |
407 | 0 | ASSERT_EQ(1, ASSERT_RESULT(GetInt32(result.get(), 0, 0))); |
408 | 0 | ASSERT_EQ("old", ASSERT_RESULT(GetString(result.get(), 0, 1))); |
409 | 0 | ASSERT_EQ("old", ASSERT_RESULT(GetString(result.get(), 0, 2))); |
410 | |
|
411 | 0 | staleness_ms = (MonoTime::Now() - kUpdateTime1).ToMilliseconds() - kOpDurationMs; |
412 | 0 | ASSERT_OK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", staleness_ms))); |
413 | 0 | result = |
414 | 0 | ASSERT_RESULT(conn.Fetch("/*+ Set(transaction_read_only on) */ " |
415 | 0 | "SELECT * FROM t WHERE k = 1")); |
416 | 0 | ASSERT_EQ(1, ASSERT_RESULT(GetInt32(result.get(), 0, 0))); |
417 | 0 | ASSERT_EQ("NEW", ASSERT_RESULT(GetString(result.get(), 0, 1))); |
418 | 0 | ASSERT_EQ("old", ASSERT_RESULT(GetString(result.get(), 0, 2))); |
419 | |
|
420 | 0 | SleepFor(MonoDelta::FromMilliseconds(kSleepTimeMs)); |
421 | |
|
422 | 0 | staleness_ms = (MonoTime::Now() - kUpdateTime2).ToMilliseconds(); |
423 | 0 | ASSERT_OK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", staleness_ms))); |
424 | 0 | result = |
425 | 0 | ASSERT_RESULT(conn.Fetch("/*+ Set(transaction_read_only on) */ " |
426 | 0 | "SELECT * FROM t WHERE k = 1")); |
427 | 0 | ASSERT_EQ(1, ASSERT_RESULT(GetInt32(result.get(), 0, 0))); |
428 | 0 | ASSERT_EQ("NEW", ASSERT_RESULT(GetString(result.get(), 0, 1))); |
429 | 0 | ASSERT_EQ("NEW", ASSERT_RESULT(GetString(result.get(), 0, 2))); |
430 | 0 | } |
431 | | |
432 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(Simple)) { |
433 | 0 | auto conn = ASSERT_RESULT(Connect()); |
434 | |
|
435 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, value TEXT)")); |
436 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t (key, value) VALUES (1, 'hello')")); |
437 | |
|
438 | 0 | auto value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1")); |
439 | 0 | ASSERT_EQ(value, "hello"); |
440 | 0 | } |
441 | | |
442 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(RowLockWithoutTransaction)) { |
443 | 0 | auto conn = ASSERT_RESULT(Connect()); |
444 | |
|
445 | 0 | auto status = conn.Execute( |
446 | 0 | "SELECT tmplinline FROM pg_catalog.pg_pltemplate WHERE tmplname !~ tmplhandler FOR SHARE"); |
447 | 0 | ASSERT_NOK(status); |
448 | 0 | ASSERT_STR_CONTAINS(status.message().ToBuffer(), |
449 | 0 | "Read request with row mark types must be part of a transaction"); |
450 | 0 | } |
451 | | |
452 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(WriteRetry)) { |
453 | 0 | constexpr int kKeys = 100; |
454 | 0 | auto conn = ASSERT_RESULT(Connect()); |
455 | |
|
456 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)")); |
457 | |
|
458 | 0 | SetAtomicFlag(0.25, &FLAGS_TEST_respond_write_failed_probability); |
459 | |
|
460 | 0 | LOG(INFO) << "Insert " << kKeys << " keys"; |
461 | 0 | for (int key = 0; key != kKeys; ++key) { |
462 | 0 | auto status = conn.ExecuteFormat("INSERT INTO t (key) VALUES ($0)", key); |
463 | 0 | ASSERT_TRUE(status.ok() || PgsqlError(status) == YBPgErrorCode::YB_PG_UNIQUE_VIOLATION || |
464 | 0 | status.ToString().find("Already present: Duplicate request") != std::string::npos) |
465 | 0 | << status; |
466 | 0 | } |
467 | |
|
468 | 0 | SetAtomicFlag(0, &FLAGS_TEST_respond_write_failed_probability); |
469 | |
|
470 | 0 | auto result = ASSERT_RESULT(conn.FetchMatrix("SELECT * FROM t ORDER BY key", kKeys, 1)); |
471 | 0 | for (int key = 0; key != kKeys; ++key) { |
472 | 0 | auto fetched_key = ASSERT_RESULT(GetInt32(result.get(), key, 0)); |
473 | 0 | ASSERT_EQ(fetched_key, key); |
474 | 0 | } |
475 | |
|
476 | 0 | LOG(INFO) << "Insert duplicate key"; |
477 | 0 | auto status = conn.Execute("INSERT INTO t (key) VALUES (1)"); |
478 | 0 | ASSERT_EQ(PgsqlError(status), YBPgErrorCode::YB_PG_UNIQUE_VIOLATION) << status; |
479 | 0 | ASSERT_STR_CONTAINS(status.ToString(), "duplicate key value violates unique constraint"); |
480 | 0 | } |
481 | | |
482 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(With)) { |
483 | 0 | auto conn = ASSERT_RESULT(Connect()); |
484 | |
|
485 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE test (k int PRIMARY KEY, v int)")); |
486 | |
|
487 | 0 | ASSERT_OK(conn.Execute( |
488 | 0 | "WITH test2 AS (UPDATE test SET v = 2 WHERE k = 1) " |
489 | 0 | "UPDATE test SET v = 3 WHERE k = 1")); |
490 | 0 | } |
491 | | |
492 | 0 | void PgMiniTest::TestReadRestart(const bool deferrable) { |
493 | 0 | constexpr CoarseDuration kWaitTime = 60s; |
494 | 0 | constexpr int kKeys = 100; |
495 | 0 | constexpr int kNumReadThreads = 8; |
496 | 0 | constexpr int kNumUpdateThreads = 8; |
497 | 0 | constexpr int kRequiredNumReads = 500; |
498 | 0 | constexpr std::chrono::milliseconds kClockSkew = -100ms; |
499 | 0 | std::atomic<int> num_read_restarts(0); |
500 | 0 | std::atomic<int> num_read_successes(0); |
501 | 0 | TestThreadHolder thread_holder; |
502 | | |
503 | | // Set up table |
504 | 0 | auto setup_conn = ASSERT_RESULT(Connect()); |
505 | 0 | ASSERT_OK(setup_conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, value INT)")); |
506 | 0 | for (int key = 0; key != kKeys; ++key) { |
507 | 0 | ASSERT_OK(setup_conn.Execute(Format("INSERT INTO t (key, value) VALUES ($0, 0)", key))); |
508 | 0 | } |
509 | | |
510 | | // Introduce clock skew |
511 | 0 | auto delta_changers = SkewClocks(cluster_.get(), kClockSkew); |
512 | | |
513 | | // Start read threads |
514 | 0 | for (int i = 0; i < kNumReadThreads; ++i) { |
515 | 0 | thread_holder.AddThreadFunctor([this, deferrable, &num_read_restarts, &num_read_successes, |
516 | 0 | &stop = thread_holder.stop_flag()] { |
517 | 0 | auto read_conn = ASSERT_RESULT(Connect()); |
518 | 0 | while (!stop.load(std::memory_order_acquire)) { |
519 | 0 | if (deferrable) { |
520 | 0 | ASSERT_OK(read_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, " |
521 | 0 | "DEFERRABLE")); |
522 | 0 | } else { |
523 | 0 | ASSERT_OK(read_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ")); |
524 | 0 | } |
525 | 0 | auto result = read_conn.FetchMatrix("SELECT * FROM t", kKeys, 2); |
526 | 0 | if (!result.ok()) { |
527 | 0 | ASSERT_TRUE(result.status().IsNetworkError()) << result.status(); |
528 | 0 | ASSERT_EQ(PgsqlError(result.status()), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE) |
529 | 0 | << result.status(); |
530 | 0 | ASSERT_STR_CONTAINS(result.status().ToString(), "Restart read"); |
531 | 0 | ++num_read_restarts; |
532 | 0 | ASSERT_OK(read_conn.Execute("ABORT")); |
533 | 0 | break; |
534 | 0 | } else { |
535 | 0 | ASSERT_OK(read_conn.Execute("COMMIT")); |
536 | 0 | ++num_read_successes; |
537 | 0 | } |
538 | 0 | } |
539 | 0 | }); |
540 | 0 | } |
541 | | |
542 | | // Start update threads |
543 | 0 | for (int i = 0; i < kNumUpdateThreads; ++i) { |
544 | 0 | thread_holder.AddThreadFunctor([this, i, &stop = thread_holder.stop_flag()] { |
545 | 0 | auto update_conn = ASSERT_RESULT(Connect()); |
546 | 0 | while (!stop.load(std::memory_order_acquire)) { |
547 | 0 | for (int key = i; key < kKeys; key += kNumUpdateThreads) { |
548 | 0 | ASSERT_OK(update_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ")); |
549 | 0 | ASSERT_OK(update_conn.Execute( |
550 | 0 | Format("UPDATE t SET value = value + 1 WHERE key = $0", key))); |
551 | 0 | ASSERT_OK(update_conn.Execute("COMMIT")); |
552 | 0 | } |
553 | 0 | } |
554 | 0 | }); |
555 | 0 | } |
556 | | |
557 | | // Stop threads after a while |
558 | 0 | thread_holder.WaitAndStop(kWaitTime); |
559 | | |
560 | | // Count successful reads |
561 | 0 | int num_reads = (num_read_restarts.load(std::memory_order_acquire) |
562 | 0 | + num_read_successes.load(std::memory_order_acquire)); |
563 | 0 | LOG(INFO) << "Successful reads: " << num_read_successes.load(std::memory_order_acquire) << "/" |
564 | 0 | << num_reads; |
565 | 0 | ASSERT_EQ(num_read_restarts.load(std::memory_order_acquire), 0); |
566 | 0 | ASSERT_GT(num_read_successes.load(std::memory_order_acquire), kRequiredNumReads); |
567 | 0 | } |
568 | | |
569 | | class PgMiniLargeClockSkewTest : public PgMiniTest { |
570 | | public: |
571 | 0 | void SetUp() override { |
572 | 0 | SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec); |
573 | 0 | PgMiniTestBase::SetUp(); |
574 | 0 | } |
575 | | }; |
576 | | |
577 | | TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(ReadRestartSerializableDeferrable), |
578 | 0 | PgMiniLargeClockSkewTest) { |
579 | 0 | TestReadRestart(true /* deferrable */); |
580 | 0 | } |
581 | | |
582 | | TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(ReadRestartSnapshot), |
583 | 0 | PgMiniLargeClockSkewTest) { |
584 | 0 | TestReadRestart(false /* deferrable */); |
585 | 0 | } |
586 | | |
587 | | void PgMiniTest::TestSelectRowLock( |
588 | 0 | IsolationLevel isolation, RowMarkType row_mark, TestStatement statement) { |
589 | 0 | const std::string isolation_str = ( |
590 | 0 | isolation == IsolationLevel::SNAPSHOT_ISOLATION ? "REPEATABLE READ" : "SERIALIZABLE"); |
591 | 0 | const std::string row_mark_str = RowMarkTypeToPgsqlString(row_mark); |
592 | 0 | constexpr auto kSleepTime = 1s; |
593 | 0 | constexpr int kKeys = 3; |
594 | 0 | PGConn read_conn = ASSERT_RESULT(Connect()); |
595 | 0 | PGConn misc_conn = ASSERT_RESULT(Connect()); |
596 | 0 | PGConn write_conn = ASSERT_RESULT(Connect()); |
597 | | |
598 | | // Set up table |
599 | 0 | ASSERT_OK(misc_conn.Execute("CREATE TABLE t (i INT PRIMARY KEY, j INT)")); |
600 | | // TODO: remove this when issue #2857 is fixed. |
601 | 0 | std::this_thread::sleep_for(kSleepTime); |
602 | 0 | for (int i = 0; i < kKeys; ++i) { |
603 | 0 | ASSERT_OK(misc_conn.ExecuteFormat("INSERT INTO t (i, j) VALUES ($0, $0)", i)); |
604 | 0 | } |
605 | |
|
606 | 0 | ASSERT_OK(read_conn.ExecuteFormat("BEGIN TRANSACTION ISOLATION LEVEL $0", isolation_str)); |
607 | 0 | ASSERT_OK(read_conn.FetchFormat("SELECT * FROM t WHERE i = $0", -1)); |
608 | | |
609 | | // Sleep to ensure that read done in txn doesn't face kReadRestart after INSERT (a sleep will |
610 | | // ensure sufficient gap between write time and read point - more than clock skew). |
611 | 0 | std::this_thread::sleep_for(kSleepTime); |
612 | |
|
613 | 0 | if (statement == TestStatement::kInsert) { |
614 | 0 | ASSERT_OK(write_conn.ExecuteFormat("INSERT INTO t (i, j) VALUES ($0, $0)", kKeys)); |
615 | 0 | } else { |
616 | 0 | ASSERT_OK(write_conn.ExecuteFormat( |
617 | 0 | "DELETE FROM t WHERE i = $0", RandomUniformInt(0, kKeys - 1))); |
618 | 0 | } |
619 | 0 | auto result = read_conn.FetchFormat("SELECT * FROM t FOR $0", row_mark_str); |
620 | 0 | if (isolation == IsolationLevel::SNAPSHOT_ISOLATION && statement == TestStatement::kDelete) { |
621 | 0 | ASSERT_NOK(result); |
622 | 0 | ASSERT_TRUE(result.status().IsNetworkError()) << result.status(); |
623 | 0 | ASSERT_EQ(PgsqlError(result.status()), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE) |
624 | 0 | << result.status(); |
625 | 0 | ASSERT_STR_CONTAINS(result.status().ToString(), |
626 | 0 | "could not serialize access due to concurrent update"); |
627 | 0 | ASSERT_OK(read_conn.Execute("ABORT")); |
628 | 0 | } else { |
629 | 0 | ASSERT_OK(result); |
630 | | // NOTE: vanilla PostgreSQL expects kKeys rows, but kKeys +/- 1 rows are expected for Yugabyte. |
631 | 0 | auto expected_keys = kKeys; |
632 | 0 | if (isolation != IsolationLevel::SNAPSHOT_ISOLATION) { |
633 | 0 | expected_keys += statement == TestStatement::kInsert ? 1 : -1; |
634 | 0 | } |
635 | 0 | ASSERT_EQ(PQntuples(result.get().get()), expected_keys); |
636 | 0 | ASSERT_OK(read_conn.Execute("COMMIT")); |
637 | 0 | } |
638 | 0 | ASSERT_OK(read_conn.Execute("COMMIT")); |
639 | 0 | } |
640 | | |
641 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotInsertForUpdate)) { |
642 | 0 | TestSelectRowLock( |
643 | 0 | IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_EXCLUSIVE, TestStatement::kInsert); |
644 | 0 | } |
645 | | |
646 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableInsertForUpdate)) { |
647 | 0 | TestSelectRowLock( |
648 | 0 | IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_EXCLUSIVE, |
649 | 0 | TestStatement::kInsert); |
650 | 0 | } |
651 | | |
652 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotInsertForNoKeyUpdate)) { |
653 | 0 | TestSelectRowLock( |
654 | 0 | IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_NOKEYEXCLUSIVE, |
655 | 0 | TestStatement::kInsert); |
656 | 0 | } |
657 | | |
658 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableInsertForNoKeyUpdate)) { |
659 | 0 | TestSelectRowLock( |
660 | 0 | IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_NOKEYEXCLUSIVE, |
661 | 0 | TestStatement::kInsert); |
662 | 0 | } |
663 | | |
664 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotInsertForShare)) { |
665 | 0 | TestSelectRowLock( |
666 | 0 | IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_SHARE, TestStatement::kInsert); |
667 | 0 | } |
668 | | |
669 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableInsertForShare)) { |
670 | 0 | TestSelectRowLock( |
671 | 0 | IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_SHARE, TestStatement::kInsert); |
672 | 0 | } |
673 | | |
674 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotInsertForKeyShare)) { |
675 | 0 | TestSelectRowLock( |
676 | 0 | IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_KEYSHARE, TestStatement::kInsert); |
677 | 0 | } |
678 | | |
679 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableInsertForKeyShare)) { |
680 | 0 | TestSelectRowLock( |
681 | 0 | IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_KEYSHARE, |
682 | 0 | TestStatement::kInsert); |
683 | 0 | } |
684 | | |
685 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotDeleteForUpdate)) { |
686 | 0 | TestSelectRowLock( |
687 | 0 | IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_EXCLUSIVE, TestStatement::kDelete); |
688 | 0 | } |
689 | | |
690 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableDeleteForUpdate)) { |
691 | 0 | TestSelectRowLock( |
692 | 0 | IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_EXCLUSIVE, |
693 | 0 | TestStatement::kDelete); |
694 | 0 | } |
695 | | |
696 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotDeleteForNoKeyUpdate)) { |
697 | 0 | TestSelectRowLock( |
698 | 0 | IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_NOKEYEXCLUSIVE, |
699 | 0 | TestStatement::kDelete); |
700 | 0 | } |
701 | | |
702 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableDeleteForNoKeyUpdate)) { |
703 | 0 | TestSelectRowLock( |
704 | 0 | IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_NOKEYEXCLUSIVE, |
705 | 0 | TestStatement::kDelete); |
706 | 0 | } |
707 | | |
708 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotDeleteForShare)) { |
709 | 0 | TestSelectRowLock( |
710 | 0 | IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_SHARE, TestStatement::kDelete); |
711 | 0 | } |
712 | | |
713 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableDeleteForShare)) { |
714 | 0 | TestSelectRowLock( |
715 | 0 | IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_SHARE, TestStatement::kDelete); |
716 | 0 | } |
717 | | |
718 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotDeleteForKeyShare)) { |
719 | 0 | TestSelectRowLock( |
720 | 0 | IsolationLevel::SNAPSHOT_ISOLATION, RowMarkType::ROW_MARK_KEYSHARE, TestStatement::kDelete); |
721 | 0 | } |
722 | | |
723 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableDeleteForKeyShare)) { |
724 | 0 | TestSelectRowLock( |
725 | 0 | IsolationLevel::SERIALIZABLE_ISOLATION, RowMarkType::ROW_MARK_KEYSHARE, |
726 | 0 | TestStatement::kDelete); |
727 | 0 | } |
728 | | |
729 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SerializableReadOnly)) { |
730 | 0 | PGConn read_conn = ASSERT_RESULT(Connect()); |
731 | 0 | PGConn setup_conn = ASSERT_RESULT(Connect()); |
732 | 0 | PGConn write_conn = ASSERT_RESULT(Connect()); |
733 | | |
734 | | // Set up table |
735 | 0 | ASSERT_OK(setup_conn.Execute("CREATE TABLE t (i INT)")); |
736 | 0 | ASSERT_OK(setup_conn.Execute("INSERT INTO t (i) VALUES (0)")); |
737 | | |
738 | | // SERIALIZABLE, READ ONLY should use snapshot isolation |
739 | 0 | ASSERT_OK(read_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY")); |
740 | 0 | ASSERT_OK(write_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ WRITE")); |
741 | 0 | ASSERT_OK(write_conn.Execute("UPDATE t SET i = i + 1")); |
742 | 0 | ASSERT_OK(read_conn.Fetch("SELECT * FROM t")); |
743 | 0 | ASSERT_OK(read_conn.Execute("COMMIT")); |
744 | 0 | ASSERT_OK(write_conn.Execute("COMMIT")); |
745 | | |
746 | | // READ ONLY, SERIALIZABLE should use snapshot isolation |
747 | 0 | ASSERT_OK(read_conn.Execute("BEGIN TRANSACTION READ ONLY, ISOLATION LEVEL SERIALIZABLE")); |
748 | 0 | ASSERT_OK(write_conn.Execute("BEGIN TRANSACTION READ WRITE, ISOLATION LEVEL SERIALIZABLE")); |
749 | 0 | ASSERT_OK(read_conn.Fetch("SELECT * FROM t")); |
750 | 0 | ASSERT_OK(write_conn.Execute("UPDATE t SET i = i + 1")); |
751 | 0 | ASSERT_OK(read_conn.Execute("COMMIT")); |
752 | 0 | ASSERT_OK(write_conn.Execute("COMMIT")); |
753 | | |
754 | | // SHOW for READ ONLY should show serializable |
755 | 0 | ASSERT_OK(read_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY")); |
756 | 0 | Result<PGResultPtr> result = read_conn.Fetch("SHOW transaction_isolation"); |
757 | 0 | ASSERT_TRUE(result.ok()) << result.status(); |
758 | 0 | string value = ASSERT_RESULT(GetString(result.get().get(), 0, 0)); |
759 | 0 | ASSERT_EQ(value, "serializable"); |
760 | 0 | ASSERT_OK(read_conn.Execute("COMMIT")); |
761 | | |
762 | | // SHOW for READ WRITE to READ ONLY should show serializable and read_only |
763 | 0 | ASSERT_OK(write_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ WRITE")); |
764 | 0 | ASSERT_OK(write_conn.Execute("SET TRANSACTION READ ONLY")); |
765 | 0 | result = write_conn.Fetch("SHOW transaction_isolation"); |
766 | 0 | ASSERT_TRUE(result.ok()) << result.status(); |
767 | 0 | value = ASSERT_RESULT(GetString(result.get().get(), 0, 0)); |
768 | 0 | ASSERT_EQ(value, "serializable"); |
769 | 0 | result = write_conn.Fetch("SHOW transaction_read_only"); |
770 | 0 | ASSERT_TRUE(result.ok()) << result.status(); |
771 | 0 | value = ASSERT_RESULT(GetString(result.get().get(), 0, 0)); |
772 | 0 | ASSERT_EQ(value, "on"); |
773 | 0 | ASSERT_OK(write_conn.Execute("COMMIT")); |
774 | | |
775 | | // SERIALIZABLE, READ ONLY to READ WRITE should not use snapshot isolation |
776 | 0 | ASSERT_OK(read_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY")); |
777 | 0 | ASSERT_OK(write_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ WRITE")); |
778 | 0 | ASSERT_OK(read_conn.Execute("SET TRANSACTION READ WRITE")); |
779 | 0 | ASSERT_OK(write_conn.Execute("UPDATE t SET i = i + 1")); |
780 | | // The result of the following statement is probabilistic. If it does not fail now, then it |
781 | | // should fail during COMMIT. |
782 | 0 | result = read_conn.Fetch("SELECT * FROM t"); |
783 | 0 | if (result.ok()) { |
784 | 0 | ASSERT_OK(read_conn.Execute("COMMIT")); |
785 | 0 | Status status = write_conn.Execute("COMMIT"); |
786 | 0 | ASSERT_NOK(status); |
787 | 0 | ASSERT_TRUE(status.IsNetworkError()) << status; |
788 | 0 | ASSERT_EQ(PgsqlError(status), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE) << status; |
789 | 0 | } else { |
790 | 0 | ASSERT_TRUE(result.status().IsNetworkError()) << result.status(); |
791 | 0 | ASSERT_EQ(PgsqlError(result.status()), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE) |
792 | 0 | << result.status(); |
793 | 0 | ASSERT_STR_CONTAINS(result.status().ToString(), "Conflicts with higher priority transaction"); |
794 | 0 | } |
795 | 0 | } |
796 | | |
797 | 0 | void AssertAborted(const Status& status) { |
798 | 0 | ASSERT_NOK(status); |
799 | 0 | ASSERT_STR_CONTAINS(status.ToString(), "aborted"); |
800 | 0 | } |
801 | | |
802 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SelectModifySelect)) { |
803 | 0 | { |
804 | 0 | auto read_conn = ASSERT_RESULT(Connect()); |
805 | 0 | auto write_conn = ASSERT_RESULT(Connect()); |
806 | |
|
807 | 0 | ASSERT_OK(read_conn.Execute("CREATE TABLE t (i INT)")); |
808 | 0 | ASSERT_OK(read_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE")); |
809 | 0 | ASSERT_RESULT(read_conn.FetchMatrix("SELECT * FROM t", 0, 1)); |
810 | 0 | ASSERT_OK(write_conn.Execute("INSERT INTO t VALUES (1)")); |
811 | 0 | ASSERT_NO_FATALS(AssertAborted(ResultToStatus(read_conn.Fetch("SELECT * FROM t")))); |
812 | 0 | } |
813 | 0 | { |
814 | 0 | auto read_conn = ASSERT_RESULT(Connect()); |
815 | 0 | auto write_conn = ASSERT_RESULT(Connect()); |
816 | |
|
817 | 0 | ASSERT_OK(read_conn.Execute("CREATE TABLE t2 (i INT PRIMARY KEY)")); |
818 | 0 | ASSERT_OK(read_conn.Execute("INSERT INTO t2 VALUES (1)")); |
819 | |
|
820 | 0 | ASSERT_OK(read_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE")); |
821 | 0 | ASSERT_RESULT(read_conn.FetchMatrix("SELECT * FROM t2", 1, 1)); |
822 | 0 | ASSERT_OK(write_conn.Execute("DELETE FROM t2 WHERE i = 1")); |
823 | 0 | ASSERT_NO_FATALS(AssertAborted(ResultToStatus(read_conn.Fetch("SELECT * FROM t2")))); |
824 | 0 | } |
825 | 0 | } |
826 | | |
827 | | class PgMiniSmallWriteBufferTest : public PgMiniTest { |
828 | | public: |
829 | 0 | void SetUp() override { |
830 | 0 | FLAGS_db_write_buffer_size = 256_KB; |
831 | 0 | PgMiniTest::SetUp(); |
832 | 0 | } |
833 | | }; |
834 | | |
835 | 0 | TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BulkCopyWithRestart), PgMiniSmallWriteBufferTest) { |
836 | 0 | const std::string kTableName = "key_value"; |
837 | 0 | auto conn = ASSERT_RESULT(Connect()); |
838 | 0 | ASSERT_OK(conn.ExecuteFormat( |
839 | 0 | "CREATE TABLE $0 (key INTEGER NOT NULL PRIMARY KEY, value VARCHAR)", |
840 | 0 | kTableName)); |
841 | |
|
842 | 0 | TestThreadHolder thread_holder; |
843 | 0 | constexpr int kTotalBatches = RegularBuildVsSanitizers(50, 5); |
844 | 0 | constexpr int kBatchSize = 1000; |
845 | 0 | constexpr int kValueSize = 128; |
846 | |
|
847 | 0 | std::atomic<int> key(0); |
848 | |
|
849 | 0 | thread_holder.AddThreadFunctor([this, &kTableName, &stop = thread_holder.stop_flag(), &key] { |
850 | 0 | SetFlagOnExit set_flag(&stop); |
851 | 0 | auto connection = ASSERT_RESULT(Connect()); |
852 | |
|
853 | 0 | auto se = ScopeExit([&key] { |
854 | 0 | LOG(INFO) << "Total keys: " << key; |
855 | 0 | }); |
856 | |
|
857 | 0 | while (!stop.load(std::memory_order_acquire) && key < kBatchSize * kTotalBatches) { |
858 | 0 | ASSERT_OK(connection.CopyBegin(Format("COPY $0 FROM STDIN WITH BINARY", kTableName))); |
859 | 0 | for (int j = 0; j != kBatchSize; ++j) { |
860 | 0 | connection.CopyStartRow(2); |
861 | 0 | connection.CopyPutInt32(++key); |
862 | 0 | connection.CopyPutString(RandomHumanReadableString(kValueSize)); |
863 | 0 | } |
864 | |
|
865 | 0 | ASSERT_OK(connection.CopyEnd()); |
866 | 0 | } |
867 | 0 | }); |
868 | |
|
869 | 0 | thread_holder.AddThread(RestartsThread(cluster_.get(), 5s, &thread_holder.stop_flag())); |
870 | |
|
871 | 0 | thread_holder.WaitAndStop(120s); // Actually will stop when enough batches were copied |
872 | |
|
873 | 0 | ASSERT_EQ(key.load(std::memory_order_relaxed), kTotalBatches * kBatchSize); |
874 | |
|
875 | 0 | LOG(INFO) << "Restarting cluster"; |
876 | 0 | ASSERT_OK(RestartCluster()); |
877 | |
|
878 | 0 | ASSERT_OK(WaitFor([this, &conn, &key, &kTableName] { |
879 | 0 | auto intents_count = CountIntents(cluster_.get()); |
880 | 0 | LOG(INFO) << "Intents count: " << intents_count; |
881 | |
|
882 | 0 | if (intents_count <= 5000) { |
883 | 0 | return true; |
884 | 0 | } |
885 | | |
886 | | // We cleanup only transactions that were completely aborted/applied before last replication |
887 | | // happens. |
888 | | // So we could get into situation when intents of the last transactions are not cleaned. |
889 | | // To avoid such scenario in this test we write one more row to allow cleanup. |
890 | 0 | EXPECT_OK(conn.ExecuteFormat( |
891 | 0 | "INSERT INTO $0 VALUES ($1, '$2')", kTableName, ++key, |
892 | 0 | RandomHumanReadableString(kValueSize))); |
893 | |
|
894 | 0 | return false; |
895 | 0 | }, 10s * kTimeMultiplier, "Intents cleanup", 200ms)); |
896 | 0 | } |
897 | | |
898 | 0 | void PgMiniTest::TestForeignKey(IsolationLevel isolation_level) { |
899 | 0 | const std::string kDataTable = "data"; |
900 | 0 | const std::string kReferenceTable = "reference"; |
901 | 0 | constexpr int kRows = 10; |
902 | 0 | auto conn = ASSERT_RESULT(Connect()); |
903 | |
|
904 | 0 | ASSERT_OK(conn.ExecuteFormat( |
905 | 0 | "CREATE TABLE $0 (id int NOT NULL, name VARCHAR, PRIMARY KEY (id))", |
906 | 0 | kReferenceTable)); |
907 | 0 | ASSERT_OK(conn.ExecuteFormat( |
908 | 0 | "CREATE TABLE $0 (ref_id INTEGER, data_id INTEGER, name VARCHAR, " |
909 | 0 | "PRIMARY KEY (ref_id, data_id))", |
910 | 0 | kDataTable)); |
911 | 0 | ASSERT_OK(conn.ExecuteFormat( |
912 | 0 | "ALTER TABLE $0 ADD CONSTRAINT fk FOREIGN KEY(ref_id) REFERENCES $1(id) " |
913 | 0 | "ON DELETE CASCADE", |
914 | 0 | kDataTable, kReferenceTable)); |
915 | |
|
916 | 0 | ASSERT_OK(conn.ExecuteFormat( |
917 | 0 | "INSERT INTO $0 VALUES ($1, 'reference_$1')", kReferenceTable, 1)); |
918 | |
|
919 | 0 | for (int i = 1; i <= kRows; ++i) { |
920 | 0 | ASSERT_OK(conn.StartTransaction(isolation_level)); |
921 | 0 | ASSERT_OK(conn.ExecuteFormat( |
922 | 0 | "INSERT INTO $0 VALUES ($1, $2, 'data_$2')", kDataTable, 1, i)); |
923 | 0 | ASSERT_OK(conn.CommitTransaction()); |
924 | 0 | } |
925 | |
|
926 | 0 | ASSERT_OK(WaitFor([this] { |
927 | 0 | return CountIntents(cluster_.get()) == 0; |
928 | 0 | }, 15s, "Intents cleanup")); |
929 | 0 | } |
930 | | |
931 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(ForeignKeySerializable)) { |
932 | 0 | TestForeignKey(IsolationLevel::SERIALIZABLE_ISOLATION); |
933 | 0 | } |
934 | | |
935 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(ForeignKeySnapshot)) { |
936 | 0 | TestForeignKey(IsolationLevel::SNAPSHOT_ISOLATION); |
937 | 0 | } |
938 | | |
939 | | class PgMiniTestNoTxnRetry : public PgMiniTest { |
940 | | protected: |
941 | 0 | void BeforePgProcessStart() override { |
942 | 0 | FLAGS_ysql_sleep_before_retry_on_txn_conflict = false; |
943 | 0 | } |
944 | | }; |
945 | | |
946 | | template<IsolationLevel level> |
947 | | class PgMiniTestTxnHelper : public PgMiniTestNoTxnRetry { |
948 | | protected: |
949 | | |
950 | | // Check possibility of updating column in case row is referenced by foreign key from another txn. |
951 | 0 | void TestReferencedTableUpdate() { |
952 | 0 | auto conn = ASSERT_RESULT(Connect()); |
953 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE pktable (k INT PRIMARY KEY, v INT)")); |
954 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE fktable (k INT PRIMARY KEY, fk_p INT, v INT, " |
955 | 0 | "FOREIGN KEY(fk_p) REFERENCES pktable(k))")); |
956 | 0 | ASSERT_OK(conn.Execute("INSERT INTO pktable VALUES(1, 2)")); |
957 | 0 | auto extra_conn = ASSERT_RESULT(Connect()); |
958 | 0 | ASSERT_OK(StartTxn(&extra_conn)); |
959 | 0 | ASSERT_OK(extra_conn.Execute("INSERT INTO fktable VALUES(1, 1, 2)")); |
960 | 0 | ASSERT_OK(conn.Execute("UPDATE pktable SET v = 20 WHERE k = 1")); |
961 | | // extra_conn created strong read intent on (1, liveness) due to foreign key check. |
962 | | // As a result weak read intent is created for (1). |
963 | | // conn UPDATE created strong write intent on (1, v). |
964 | | // As a result weak write intent is created for (1). |
965 | | // Weak read + weak write on (1) has no conflicts. |
966 | 0 | ASSERT_OK(extra_conn.Execute("COMMIT")); |
967 | 0 | auto res = ASSERT_RESULT( |
968 | 0 | conn.template FetchValue<int64_t>("SELECT COUNT(*) FROM pktable WHERE v = 20")); |
969 | 0 | ASSERT_EQ(res, 1); |
970 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE25TestReferencedTableUpdateEv Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE25TestReferencedTableUpdateEv |
971 | | |
972 | | // Check that `FOR KEY SHARE` prevents rows from being deleted even in case not all key |
973 | | // components are specified (for SERIALIZABLE isolation level). For REPEATABLE READ, only rows |
974 | | // returned to the user are locked. |
975 | 0 | void TestRowKeyShareLock(const std::string& cur_name = "") { |
976 | 0 | auto conn = ASSERT_RESULT(SetHighPriTxn(Connect())); |
977 | 0 | auto extra_conn = ASSERT_RESULT(SetLowPriTxn(Connect())); |
978 | |
|
979 | 0 | ASSERT_OK(conn.Execute( |
980 | 0 | "CREATE TABLE t (h INT, r1 INT, r2 INT, v INT, PRIMARY KEY(h, r1, r2))")); |
981 | 0 | ASSERT_OK(conn.Execute( |
982 | 0 | "INSERT INTO t VALUES (1, 2, 3, 4), (1, 2, 30, 40), (1, 3, 4, 5), (10, 2, 3, 4)")); |
983 | | |
984 | | // Transaction 1. |
985 | | // For SERIALIZABLE level: |
986 | | // SELECT FOR KEY SHARE locks the sub doc key prefix (1, 2) as not all key components are |
987 | | // specified. This also means that no new rows with this matching prefix can be inserted. |
988 | | // For REPEATABLE READ level: |
989 | | // Only tuples returned by the SELECT statement are locked. |
990 | 0 | ASSERT_OK(StartTxn(&conn)); |
991 | 0 | RowLock(&conn, "SELECT * FROM t WHERE h = 1 AND r1 = 2 FOR KEY SHARE", cur_name); |
992 | |
|
993 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3")); |
994 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 30")); |
995 | | |
996 | | // Doc key (1, 3, 4) is not locked in both isolation levels. |
997 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 3 AND r2 = 4")); |
998 | | |
999 | | // New rows with prefix that matches (1, 2) can't be inserted in SERIALIZABLE level. |
1000 | 0 | if (level == IsolationLevel::SERIALIZABLE_ISOLATION) { |
1001 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 100, 100)")); |
1002 | | // Doc key (1, 2, 2) doesn't exist. But still it conflicts beause of a kStrongRead intent |
1003 | | // taken on (1, 2) as part of the "fetching" the row when taking for key share locks (which |
1004 | | // only requires kWeakRead). |
1005 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2")); |
1006 | 0 | } else { |
1007 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 100, 100)")); |
1008 | | // Delete the row again |
1009 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 100")); |
1010 | | |
1011 | | // Doc key (1, 2, 2) doesn't exist. |
1012 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2")); |
1013 | 0 | } |
1014 | |
|
1015 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1016 | | |
1017 | | // Transaction 2. |
1018 | | // For SERIALIZABLE level: |
1019 | | // SELECT FOR KEY SHARE locks the sub doc key prefix () as no prefix of the pk is specified. |
1020 | | // For REPEATABLE READ level: |
1021 | | // Only tuples returned by the SELECT statement are locked. |
1022 | 0 | ASSERT_OK(StartTxn(&conn)); |
1023 | 0 | RowLock(&conn, "SELECT * FROM t WHERE r2 = 2 FOR KEY SHARE", cur_name); |
1024 | |
|
1025 | 0 | if (level == IsolationLevel::SERIALIZABLE_ISOLATION) { |
1026 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3")); |
1027 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 10 AND r1 = 2 AND r2 = 3")); |
1028 | | // Doc key (1, 2, 2) doesn't exist. |
1029 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2")); |
1030 | 0 | } else { |
1031 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3")); |
1032 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 10 AND r1 = 2 AND r2 = 3")); |
1033 | | // Re-add the rows |
1034 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 3, 4), (10, 2, 3, 4)")); |
1035 | |
|
1036 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 100, 100)")); |
1037 | | // Delete the row again |
1038 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 100")); |
1039 | | |
1040 | | // Doc key (1, 2, 2) doesn't exist. |
1041 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2")); |
1042 | 0 | } |
1043 | |
|
1044 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1045 | | |
1046 | | // Transaction 3. |
1047 | | // For SERIALIZABLE level: |
1048 | | // SELECT FOR KEY SHARE locks the sub doc key prefix (1) as not all key components are |
1049 | | // specified. |
1050 | | // For REPEATABLE READ level: |
1051 | | // Only tuples returned by the SELECT statement are locked. |
1052 | 0 | ASSERT_OK(StartTxn(&conn)); |
1053 | 0 | RowLock(&conn, "SELECT * FROM t WHERE h = 1 AND r2 = 2 FOR KEY SHARE", cur_name); |
1054 | | |
1055 | | // Doc key (10, 2, 3) is not locked. |
1056 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 10 AND r1 = 2 AND r2 = 3")); |
1057 | |
|
1058 | 0 | if (level == IsolationLevel::SERIALIZABLE_ISOLATION) { |
1059 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3")); |
1060 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 100, 100)")); |
1061 | | |
1062 | | // Doc key (1, 2, 2) doesn't exist. |
1063 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2")); |
1064 | 0 | } else { |
1065 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3")); |
1066 | | // Re-add deleted row |
1067 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 3, 4)")); |
1068 | |
|
1069 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 100, 100)")); |
1070 | | // Delete the row again |
1071 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 100")); |
1072 | | |
1073 | | // Doc key (1, 2, 2) doesn't exist. |
1074 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2")); |
1075 | 0 | } |
1076 | |
|
1077 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1078 | | |
1079 | | // Transaction 4. |
1080 | | // SELECT FOR KEY SHARE locks one specific row with doc key (1, 2, 3) only. |
1081 | 0 | ASSERT_OK(StartTxn(&conn)); |
1082 | 0 | RowLock(&conn, "SELECT * FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3 FOR KEY SHARE", cur_name); |
1083 | |
|
1084 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3")); |
1085 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 30")); |
1086 | | |
1087 | | // Doc key (1, 2, 2) doesn't exist. |
1088 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2")); |
1089 | |
|
1090 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1091 | |
|
1092 | 0 | auto res = ASSERT_RESULT(conn.template FetchValue<int64_t>("SELECT COUNT(*) FROM t")); |
1093 | 0 | ASSERT_EQ(res, 1); |
1094 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE19TestRowKeyShareLockERKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEE Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE19TestRowKeyShareLockERKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEE |
1095 | | |
1096 | | // Check conflicts according to the following matrix (X - conflict, O - no conflict): |
1097 | | // | FOR KEY SHARE | FOR SHARE | FOR NO KEY UPDATE | FOR UPDATE |
1098 | | // ------------------+---------------+-----------+-------------------+----------- |
1099 | | // FOR KEY SHARE | O | O | O | X |
1100 | | // FOR SHARE | O | O | X | X |
1101 | | // FOR NO KEY UPDATE | O | X | X | X |
1102 | | // FOR UPDATE | X | X | X | X |
1103 | 0 | void TestRowLockConflictMatrix(const std::string& cur_name = "") { |
1104 | 0 | auto conn = ASSERT_RESULT(SetHighPriTxn(Connect())); |
1105 | 0 | auto extra_conn = ASSERT_RESULT(SetLowPriTxn(Connect())); |
1106 | |
|
1107 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (k INT PRIMARY KEY, v INT)")); |
1108 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t VALUES (1, 1)")); |
1109 | | |
1110 | | // Transaction 1. |
1111 | 0 | ASSERT_OK(StartTxn(&conn)); |
1112 | 0 | RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE", cur_name); |
1113 | |
|
1114 | 0 | ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE")); |
1115 | 0 | ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE")); |
1116 | 0 | ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE")); |
1117 | 0 | ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE")); |
1118 | |
|
1119 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1120 | | |
1121 | | // Transaction 2. |
1122 | 0 | ASSERT_OK(StartTxn(&conn)); |
1123 | 0 | RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE", cur_name); |
1124 | |
|
1125 | 0 | ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE")); |
1126 | 0 | ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE")); |
1127 | 0 | ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE")); |
1128 | 0 | ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE")); |
1129 | |
|
1130 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1131 | | |
1132 | | // Transaction 3. |
1133 | 0 | ASSERT_OK(StartTxn(&conn)); |
1134 | 0 | RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR SHARE", cur_name); |
1135 | |
|
1136 | 0 | ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE")); |
1137 | 0 | ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE")); |
1138 | 0 | ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE")); |
1139 | 0 | ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE")); |
1140 | |
|
1141 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1142 | | |
1143 | | // Transaction 4. |
1144 | 0 | ASSERT_OK(StartTxn(&conn)); |
1145 | 0 | RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE", cur_name); |
1146 | |
|
1147 | 0 | ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE")); |
1148 | 0 | ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE")); |
1149 | 0 | ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE")); |
1150 | |
|
1151 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1152 | | |
1153 | | // Transaction 5. |
1154 | | // Check FOR KEY SHARE + FOR UPDATE conflict separately |
1155 | | // as FOR KEY SHARE uses regular and FOR UPDATE uses high txn priority. |
1156 | 0 | ASSERT_OK(StartTxn(&conn)); |
1157 | 0 | RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE", cur_name); |
1158 | |
|
1159 | 0 | ASSERT_OK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE")); |
1160 | |
|
1161 | 0 | ASSERT_NOK(conn.Execute("COMMIT")); |
1162 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE25TestRowLockConflictMatrixERKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEE Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE25TestRowLockConflictMatrixERKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEE |
1163 | | |
1164 | 0 | void RowLock(PGConn* connection, const std::string& query, const std::string& cur_name) { |
1165 | 0 | std::string lock_stmt = query; |
1166 | 0 | if (!cur_name.empty()) { |
1167 | 0 | const std::string declare_stmt = Format("DECLARE $0 CURSOR FOR $1", cur_name, query); |
1168 | 0 | ASSERT_OK(connection->Execute(declare_stmt)); |
1169 | |
|
1170 | 0 | lock_stmt = Format("FETCH ALL $0", cur_name); |
1171 | 0 | } |
1172 | 0 | ASSERT_RESULT(connection->Fetch(lock_stmt)); |
1173 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE7RowLockEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEESE_ Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE7RowLockEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEESE_ |
1174 | | |
1175 | 0 | void TestInOperatorLock() { |
1176 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1177 | 0 | ASSERT_OK(conn.Execute("SET yb_transaction_priority_lower_bound = 0.9")); |
1178 | 0 | ASSERT_OK(conn.Execute( |
1179 | 0 | "CREATE TABLE t (h INT, r1 INT, r2 INT, PRIMARY KEY(h, r1 ASC, r2 ASC))")); |
1180 | 0 | ASSERT_OK(conn.Execute( |
1181 | 0 | "INSERT INTO t VALUES (1, 11, 1),(1, 12, 1),(1, 13, 1),(2, 11, 2),(2, 12, 2),(2, 13, 2)")); |
1182 | 0 | ASSERT_OK(StartTxn(&conn)); |
1183 | 0 | auto res = ASSERT_RESULT(conn.Fetch( |
1184 | 0 | "SELECT * FROM t WHERE h = 1 AND r1 IN (11, 12) AND r2 = 1 FOR KEY SHARE")); |
1185 | |
|
1186 | 0 | auto extra_conn = ASSERT_RESULT(Connect()); |
1187 | 0 | ASSERT_OK(extra_conn.Execute("SET yb_transaction_priority_upper_bound = 0.1")); |
1188 | 0 | ASSERT_OK(extra_conn.Execute("BEGIN")); |
1189 | 0 | ASSERT_NOK(extra_conn.Execute("DELETE FROM t WHERE h = 1 AND r1 = 11 AND r2 = 1")); |
1190 | 0 | ASSERT_OK(extra_conn.Execute("ROLLBACK")); |
1191 | 0 | ASSERT_OK(extra_conn.Execute("BEGIN")); |
1192 | 0 | ASSERT_OK(extra_conn.Execute("DELETE FROM t WHERE h = 1 AND r1 = 13 AND r2 = 1")); |
1193 | 0 | ASSERT_OK(extra_conn.Execute("COMMIT")); |
1194 | |
|
1195 | 0 | ASSERT_OK(conn.Execute("COMMIT;")); |
1196 | |
|
1197 | 0 | ASSERT_OK(conn.Execute("BEGIN;")); |
1198 | 0 | res = ASSERT_RESULT(conn.Fetch( |
1199 | 0 | "SELECT * FROM t WHERE h IN (1, 2) AND r1 = 11 FOR KEY SHARE")); |
1200 | |
|
1201 | 0 | ASSERT_OK(extra_conn.Execute("BEGIN")); |
1202 | 0 | ASSERT_NOK(extra_conn.Execute("DELETE FROM t WHERE h = 1 AND r1 = 11 AND r2 = 1")); |
1203 | 0 | ASSERT_OK(extra_conn.Execute("ROLLBACK")); |
1204 | 0 | ASSERT_OK(extra_conn.Execute("BEGIN")); |
1205 | 0 | ASSERT_NOK(extra_conn.Execute("DELETE FROM t WHERE h = 2 AND r1 = 11 AND r2 = 2")); |
1206 | 0 | ASSERT_OK(extra_conn.Execute("ROLLBACK")); |
1207 | 0 | ASSERT_OK(extra_conn.Execute("BEGIN")); |
1208 | 0 | ASSERT_OK(extra_conn.Execute("DELETE FROM t WHERE h = 2 AND r1 = 12 AND r2 = 2")); |
1209 | 0 | ASSERT_OK(extra_conn.Execute("COMMIT")); |
1210 | |
|
1211 | 0 | ASSERT_OK(conn.Execute("COMMIT;")); |
1212 | 0 | const auto count = ASSERT_RESULT(conn.template FetchValue<int64_t>("SELECT COUNT(*) FROM t")); |
1213 | 0 | ASSERT_EQ(4, count); |
1214 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE18TestInOperatorLockEv Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE18TestInOperatorLockEv |
1215 | | |
1216 | 0 | static Result<PGConn> SetHighPriTxn(Result<PGConn> connection) { |
1217 | 0 | return Execute(std::move(connection), "SET yb_transaction_priority_lower_bound=0.5"); |
1218 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE13SetHighPriTxnENS_6ResultINS0_6PGConnEEE Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE13SetHighPriTxnENS_6ResultINS0_6PGConnEEE |
1219 | | |
1220 | 0 | static Result<PGConn> SetLowPriTxn(Result<PGConn> connection) { |
1221 | 0 | return Execute(std::move(connection), "SET yb_transaction_priority_upper_bound=0.4"); |
1222 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE12SetLowPriTxnENS_6ResultINS0_6PGConnEEE Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE12SetLowPriTxnENS_6ResultINS0_6PGConnEEE |
1223 | | |
1224 | 0 | static Result<PGConn> Execute(Result<PGConn> connection, const std::string& query) { |
1225 | 0 | if (connection.ok()) { |
1226 | 0 | RETURN_NOT_OK((*connection).Execute(query)); |
1227 | 0 | } |
1228 | 0 | return connection; |
1229 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE7ExecuteENS_6ResultINS0_6PGConnEEERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE7ExecuteENS_6ResultINS0_6PGConnEEERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE |
1230 | | |
1231 | 0 | static CHECKED_STATUS StartTxn(PGConn* connection) { |
1232 | 0 | return TxnHelper<level>::StartTxn(connection); |
1233 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE8StartTxnEPNS0_6PGConnE Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE8StartTxnEPNS0_6PGConnE |
1234 | | |
1235 | 0 | static CHECKED_STATUS ExecuteInTxn(PGConn* connection, const std::string& query) { |
1236 | 0 | return TxnHelper<level>::ExecuteInTxn(connection, query); |
1237 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE12ExecuteInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEE Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE12ExecuteInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEE |
1238 | | |
1239 | 0 | static Result<PGResultPtr> FetchInTxn(PGConn* connection, const std::string& query) { |
1240 | 0 | return TxnHelper<level>::FetchInTxn(connection, query); |
1241 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE10FetchInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEE Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE10FetchInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEE |
1242 | | }; |
1243 | | |
1244 | | class PgMiniTestTxnHelperSerializable |
1245 | | : public PgMiniTestTxnHelper<IsolationLevel::SERIALIZABLE_ISOLATION> { |
1246 | | protected: |
1247 | | // Check two SERIALIZABLE txns has no conflict in case of updating same column in same row. |
1248 | 0 | void TestSameColumnUpdate() { |
1249 | 0 | auto conn = ASSERT_RESULT(SetHighPriTxn(Connect())); |
1250 | 0 | auto extra_conn = ASSERT_RESULT(SetLowPriTxn(Connect())); |
1251 | |
|
1252 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (k INT PRIMARY KEY, v1 INT, v2 INT)")); |
1253 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t VALUES(1, 2, 3)")); |
1254 | |
|
1255 | 0 | ASSERT_OK(StartTxn(&conn)); |
1256 | 0 | ASSERT_OK(conn.Execute("UPDATE t SET v1 = 20 WHERE k = 1")); |
1257 | |
|
1258 | 0 | ASSERT_OK(ExecuteInTxn(&extra_conn, "UPDATE t SET v1 = 40 WHERE k = 1")); |
1259 | |
|
1260 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1261 | |
|
1262 | 0 | auto res = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT COUNT(*) FROM t WHERE v1 = 20")); |
1263 | 0 | ASSERT_EQ(res, 1); |
1264 | |
|
1265 | 0 | ASSERT_OK(StartTxn(&conn)); |
1266 | | // Next statement will lock whole row for updates due to expression |
1267 | 0 | ASSERT_OK(conn.Execute("UPDATE t SET v2 = v2 * 2 WHERE k = 1")); |
1268 | |
|
1269 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "UPDATE t SET v2 = 10 WHERE k = 1")); |
1270 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "UPDATE t SET v1 = 10 WHERE k = 1")); |
1271 | |
|
1272 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1273 | |
|
1274 | 0 | res = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT COUNT(*) FROM t WHERE v2 = 6")); |
1275 | 0 | ASSERT_EQ(res, 1); |
1276 | 0 | } |
1277 | | }; |
1278 | | |
1279 | | class PgMiniTestTxnHelperSnapshot |
1280 | | : public PgMiniTestTxnHelper<IsolationLevel::SNAPSHOT_ISOLATION> { |
1281 | | protected: |
1282 | | // Check two SNAPSHOT txns has a conflict in case of updating same column in same row. |
1283 | 0 | void TestSameColumnUpdate() { |
1284 | 0 | auto conn = ASSERT_RESULT(SetHighPriTxn(Connect())); |
1285 | 0 | auto extra_conn = ASSERT_RESULT(SetLowPriTxn(Connect())); |
1286 | |
|
1287 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (k INT PRIMARY KEY, v INT)")); |
1288 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t VALUES(1, 2)")); |
1289 | |
|
1290 | 0 | ASSERT_OK(StartTxn(&conn)); |
1291 | 0 | ASSERT_OK(conn.Execute("UPDATE t SET v = 20 WHERE k = 1")); |
1292 | |
|
1293 | 0 | ASSERT_NOK(ExecuteInTxn(&extra_conn, "UPDATE t SET v = 40 WHERE k = 1")); |
1294 | |
|
1295 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
1296 | |
|
1297 | 0 | const auto res = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT COUNT(*) FROM t WHERE v = 20")); |
1298 | 0 | ASSERT_EQ(res, 1); |
1299 | 0 | } |
1300 | | }; |
1301 | | |
1302 | | TEST_F_EX(PgMiniTest, |
1303 | | YB_DISABLE_TEST_IN_TSAN(ReferencedTableUpdateSerializable), |
1304 | 0 | PgMiniTestTxnHelperSerializable) { |
1305 | 0 | TestReferencedTableUpdate(); |
1306 | 0 | } |
1307 | | |
1308 | | TEST_F_EX(PgMiniTest, |
1309 | | YB_DISABLE_TEST_IN_TSAN(ReferencedTableUpdateSnapshot), |
1310 | 0 | PgMiniTestTxnHelperSnapshot) { |
1311 | 0 | TestReferencedTableUpdate(); |
1312 | 0 | } |
1313 | | |
1314 | | TEST_F_EX(PgMiniTest, |
1315 | | YB_DISABLE_TEST_IN_TSAN(RowKeyShareLockSerializable), |
1316 | 0 | PgMiniTestTxnHelperSerializable) { |
1317 | 0 | TestRowKeyShareLock(); |
1318 | 0 | } |
1319 | | |
1320 | | TEST_F_EX(PgMiniTest, |
1321 | | YB_DISABLE_TEST_IN_TSAN(RowKeyShareLockSnapshot), |
1322 | 0 | PgMiniTestTxnHelperSnapshot) { |
1323 | 0 | TestRowKeyShareLock(); |
1324 | 0 | } |
1325 | | |
1326 | | TEST_F_EX(PgMiniTest, |
1327 | | YB_DISABLE_TEST_IN_TSAN(RowLockConflictMatrixSerializable), |
1328 | 0 | PgMiniTestTxnHelperSerializable) { |
1329 | 0 | TestRowLockConflictMatrix(); |
1330 | 0 | } |
1331 | | |
1332 | | TEST_F_EX(PgMiniTest, |
1333 | | YB_DISABLE_TEST_IN_TSAN(RowLockConflictMatrixSnapshot), |
1334 | 0 | PgMiniTestTxnHelperSnapshot) { |
1335 | 0 | TestRowLockConflictMatrix(); |
1336 | 0 | } |
1337 | | |
1338 | | TEST_F_EX(PgMiniTest, |
1339 | | YB_DISABLE_TEST_IN_TSAN(CursorRowKeyShareLockSerializable), |
1340 | 0 | PgMiniTestTxnHelperSerializable) { |
1341 | 0 | TestRowKeyShareLock("cur_name"); |
1342 | 0 | } |
1343 | | |
1344 | | TEST_F_EX(PgMiniTest, |
1345 | | YB_DISABLE_TEST_IN_TSAN(CursorRowKeyShareLockSnapshot), |
1346 | 0 | PgMiniTestTxnHelperSnapshot) { |
1347 | 0 | TestRowKeyShareLock("cur_name"); |
1348 | 0 | } |
1349 | | |
1350 | | TEST_F_EX(PgMiniTest, |
1351 | | YB_DISABLE_TEST_IN_TSAN(CursorRowLockConflictMatrixSerializable), |
1352 | 0 | PgMiniTestTxnHelperSerializable) { |
1353 | 0 | TestRowLockConflictMatrix("cur_name"); |
1354 | 0 | } |
1355 | | |
1356 | | TEST_F_EX(PgMiniTest, |
1357 | | YB_DISABLE_TEST_IN_TSAN(CursorRowLockConflictMatrixSnapshot), |
1358 | 0 | PgMiniTestTxnHelperSnapshot) { |
1359 | 0 | TestRowLockConflictMatrix("cur_name"); |
1360 | 0 | } |
1361 | | |
1362 | | TEST_F_EX(PgMiniTest, |
1363 | | YB_DISABLE_TEST_IN_TSAN(SameColumnUpdateSerializable), |
1364 | 0 | PgMiniTestTxnHelperSerializable) { |
1365 | 0 | TestSameColumnUpdate(); |
1366 | 0 | } |
1367 | | |
1368 | | TEST_F_EX(PgMiniTest, |
1369 | | YB_DISABLE_TEST_IN_TSAN(SameColumnUpdateSnapshot), |
1370 | 0 | PgMiniTestTxnHelperSnapshot) { |
1371 | 0 | TestSameColumnUpdate(); |
1372 | 0 | } |
1373 | | |
1374 | | // ------------------------------------------------------------------------------------------------ |
1375 | | // A test performing manual transaction control on system tables. |
1376 | | // ------------------------------------------------------------------------------------------------ |
1377 | | |
1378 | 0 | TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SystemTableTxnTest), PgMiniTestNoTxnRetry) { |
1379 | | |
1380 | | // Resolving conflicts between transactions on a system table. |
1381 | | // |
1382 | | // postgres=# \d pg_ts_dict; |
1383 | | // |
1384 | | // Table "pg_catalog.pg_ts_dict" |
1385 | | // Column | Type | Collation | Nullable | Default |
1386 | | // ----------------+------+-----------+----------+--------- |
1387 | | // dictname | name | | not null | |
1388 | | // dictnamespace | oid | | not null | |
1389 | | // dictowner | oid | | not null | |
1390 | | // dicttemplate | oid | | not null | |
1391 | | // dictinitoption | text | | | |
1392 | | // Indexes: |
1393 | | // "pg_ts_dict_oid_index" PRIMARY KEY, lsm (oid) |
1394 | | // "pg_ts_dict_dictname_index" UNIQUE, lsm (dictname, dictnamespace) |
1395 | |
|
1396 | 0 | auto conn1 = ASSERT_RESULT(Connect()); |
1397 | 0 | auto conn2 = ASSERT_RESULT(Connect()); |
1398 | 0 | ASSERT_OK(conn1.Execute("SET yb_non_ddl_txn_for_sys_tables_allowed=1")); |
1399 | 0 | ASSERT_OK(conn2.Execute("SET yb_non_ddl_txn_for_sys_tables_allowed=1")); |
1400 | |
|
1401 | 0 | size_t commit1_fail_count = 0; |
1402 | 0 | size_t commit2_fail_count = 0; |
1403 | 0 | size_t insert2_fail_count = 0; |
1404 | |
|
1405 | 0 | const auto kStartTxnStatementStr = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; |
1406 | 0 | const int iterations = 48; |
1407 | 0 | for (int i = 1; i <= iterations; ++i) { |
1408 | 0 | std::string dictname = Format("contendedkey$0", i); |
1409 | 0 | const int dictnamespace = i; |
1410 | 0 | ASSERT_OK(conn1.Execute(kStartTxnStatementStr)); |
1411 | 0 | ASSERT_OK(conn2.Execute(kStartTxnStatementStr)); |
1412 | | |
1413 | | // Insert a row in each transaction. The first insert should always succeed. |
1414 | 0 | ASSERT_OK(conn1.Execute( |
1415 | 0 | Format("INSERT INTO pg_ts_dict VALUES ('$0', $1, 1, 2, 'b')", dictname, dictnamespace))); |
1416 | 0 | Status insert_status2 = conn2.Execute( |
1417 | 0 | Format("INSERT INTO pg_ts_dict VALUES ('$0', $1, 3, 4, 'c')", dictname, dictnamespace)); |
1418 | 0 | if (!insert_status2.ok()) { |
1419 | 0 | LOG(INFO) << "MUST BE A CONFLICT: Insert failed: " << insert_status2; |
1420 | 0 | insert2_fail_count++; |
1421 | 0 | } |
1422 | |
|
1423 | 0 | Status commit_status1; |
1424 | 0 | Status commit_status2; |
1425 | 0 | if (RandomUniformBool()) { |
1426 | 0 | commit_status1 = conn1.Execute("COMMIT"); |
1427 | 0 | commit_status2 = conn2.Execute("COMMIT"); |
1428 | 0 | } else { |
1429 | 0 | commit_status2 = conn2.Execute("COMMIT"); |
1430 | 0 | commit_status1 = conn1.Execute("COMMIT"); |
1431 | 0 | } |
1432 | 0 | if (!commit_status1.ok()) { |
1433 | 0 | commit1_fail_count++; |
1434 | 0 | } |
1435 | 0 | if (!commit_status2.ok()) { |
1436 | 0 | commit2_fail_count++; |
1437 | 0 | } |
1438 | |
|
1439 | 0 | auto get_commit_statuses_str = [&commit_status1, &commit_status2]() { |
1440 | 0 | return Format("commit_status1=$0, commit_status2=$1", commit_status1, commit_status2); |
1441 | 0 | }; |
1442 | |
|
1443 | 0 | bool succeeded1 = commit_status1.ok(); |
1444 | 0 | bool succeeded2 = insert_status2.ok() && commit_status2.ok(); |
1445 | |
|
1446 | 0 | ASSERT_TRUE(!succeeded1 || !succeeded2) |
1447 | 0 | << "Both transactions can't commit. " << get_commit_statuses_str(); |
1448 | 0 | ASSERT_TRUE(succeeded1 || succeeded2) |
1449 | 0 | << "We expect one of the two transactions to succeed. " << get_commit_statuses_str(); |
1450 | 0 | if (!commit_status1.ok()) { |
1451 | 0 | ASSERT_OK(conn1.Execute("ROLLBACK")); |
1452 | 0 | } |
1453 | 0 | if (!commit_status2.ok()) { |
1454 | 0 | ASSERT_OK(conn2.Execute("ROLLBACK")); |
1455 | 0 | } |
1456 | |
|
1457 | 0 | if (RandomUniformBool()) { |
1458 | 0 | std::swap(conn1, conn2); |
1459 | 0 | } |
1460 | 0 | } |
1461 | 0 | LOG(INFO) << "Test stats: " |
1462 | 0 | << EXPR_VALUE_FOR_LOG(commit1_fail_count) << ", " |
1463 | 0 | << EXPR_VALUE_FOR_LOG(insert2_fail_count) << ", " |
1464 | 0 | << EXPR_VALUE_FOR_LOG(commit2_fail_count); |
1465 | 0 | ASSERT_GE(commit1_fail_count, iterations / 4); |
1466 | 0 | ASSERT_GE(insert2_fail_count, iterations / 4); |
1467 | 0 | ASSERT_EQ(commit2_fail_count, 0); |
1468 | 0 | } |
1469 | | |
1470 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(DropDBUpdateSysTablet)) { |
1471 | 0 | const std::string kDatabaseName = "testdb"; |
1472 | 0 | PGConn conn = ASSERT_RESULT(Connect()); |
1473 | 0 | std::array<int, 4> num_tables; |
1474 | |
|
1475 | 0 | auto* catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); |
1476 | 0 | auto sys_tablet = ASSERT_RESULT(catalog_manager->GetTabletInfo(master::kSysCatalogTabletId)); |
1477 | 0 | { |
1478 | 0 | auto tablet_lock = sys_tablet->LockForWrite(); |
1479 | 0 | num_tables[0] = tablet_lock->pb.table_ids_size(); |
1480 | 0 | } |
1481 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName)); |
1482 | 0 | { |
1483 | 0 | auto tablet_lock = sys_tablet->LockForWrite(); |
1484 | 0 | num_tables[1] = tablet_lock->pb.table_ids_size(); |
1485 | 0 | } |
1486 | 0 | ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName)); |
1487 | 0 | { |
1488 | 0 | auto tablet_lock = sys_tablet->LockForWrite(); |
1489 | 0 | num_tables[2] = tablet_lock->pb.table_ids_size(); |
1490 | 0 | } |
1491 | | // Make sure that the system catalog tablet table_ids is persisted. |
1492 | 0 | ASSERT_OK(RestartCluster()); |
1493 | 0 | catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); |
1494 | 0 | sys_tablet = ASSERT_RESULT(catalog_manager->GetTabletInfo(master::kSysCatalogTabletId)); |
1495 | 0 | { |
1496 | 0 | auto tablet_lock = sys_tablet->LockForWrite(); |
1497 | 0 | num_tables[3] = tablet_lock->pb.table_ids_size(); |
1498 | 0 | } |
1499 | 0 | ASSERT_LT(num_tables[0], num_tables[1]); |
1500 | 0 | ASSERT_EQ(num_tables[0], num_tables[2]); |
1501 | 0 | ASSERT_EQ(num_tables[0], num_tables[3]); |
1502 | 0 | } |
1503 | | |
1504 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(DropDBMarkDeleted)) { |
1505 | 0 | const std::string kDatabaseName = "testdb"; |
1506 | 0 | constexpr auto kSleepTime = 500ms; |
1507 | 0 | constexpr int kMaxNumSleeps = 20; |
1508 | 0 | auto *catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); |
1509 | 0 | PGConn conn = ASSERT_RESULT(Connect()); |
1510 | |
|
1511 | 0 | ASSERT_FALSE(catalog_manager->AreTablesDeleting()); |
1512 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName)); |
1513 | 0 | ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName)); |
1514 | | // System tables should be deleting then deleted. |
1515 | 0 | int num_sleeps = 0; |
1516 | 0 | while (catalog_manager->AreTablesDeleting() && (num_sleeps++ != kMaxNumSleeps)) { |
1517 | 0 | LOG(INFO) << "Tables are deleting..."; |
1518 | 0 | std::this_thread::sleep_for(kSleepTime); |
1519 | 0 | } |
1520 | 0 | ASSERT_FALSE(catalog_manager->AreTablesDeleting()) << "Tables should have finished deleting"; |
1521 | | // Make sure that the table deletions are persisted. |
1522 | 0 | ASSERT_OK(RestartCluster()); |
1523 | | // Refresh stale local variable after RestartSync. |
1524 | 0 | catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); |
1525 | 0 | ASSERT_FALSE(catalog_manager->AreTablesDeleting()); |
1526 | 0 | } |
1527 | | |
1528 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(DropDBWithTables)) { |
1529 | 0 | const std::string kDatabaseName = "testdb"; |
1530 | 0 | const std::string kTablePrefix = "testt"; |
1531 | 0 | constexpr auto kSleepTime = 500ms; |
1532 | 0 | constexpr int kMaxNumSleeps = 20; |
1533 | 0 | int num_tables_before, num_tables_after; |
1534 | 0 | auto *catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); |
1535 | 0 | PGConn conn = ASSERT_RESULT(Connect()); |
1536 | 0 | auto sys_tablet = ASSERT_RESULT(catalog_manager->GetTabletInfo(master::kSysCatalogTabletId)); |
1537 | |
|
1538 | 0 | { |
1539 | 0 | auto tablet_lock = sys_tablet->LockForWrite(); |
1540 | 0 | num_tables_before = tablet_lock->pb.table_ids_size(); |
1541 | 0 | } |
1542 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName)); |
1543 | 0 | { |
1544 | 0 | PGConn conn_new = ASSERT_RESULT(ConnectToDB(kDatabaseName)); |
1545 | 0 | for (int i = 0; i < 10; ++i) { |
1546 | 0 | ASSERT_OK(conn_new.ExecuteFormat("CREATE TABLE $0$1 (i int)", kTablePrefix, i)); |
1547 | 0 | } |
1548 | 0 | ASSERT_OK(conn_new.ExecuteFormat("INSERT INTO $0$1 (i) VALUES (1), (2), (3)", kTablePrefix, 5)); |
1549 | 0 | } |
1550 | 0 | ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName)); |
1551 | | // User and system tables should be deleting then deleted. |
1552 | 0 | int num_sleeps = 0; |
1553 | 0 | while (catalog_manager->AreTablesDeleting() && (num_sleeps++ != kMaxNumSleeps)) { |
1554 | 0 | LOG(INFO) << "Tables are deleting..."; |
1555 | 0 | std::this_thread::sleep_for(kSleepTime); |
1556 | 0 | } |
1557 | 0 | ASSERT_FALSE(catalog_manager->AreTablesDeleting()) << "Tables should have finished deleting"; |
1558 | | // Make sure that the table deletions are persisted. |
1559 | 0 | ASSERT_OK(RestartCluster()); |
1560 | 0 | catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); |
1561 | 0 | sys_tablet = ASSERT_RESULT(catalog_manager->GetTabletInfo(master::kSysCatalogTabletId)); |
1562 | 0 | ASSERT_FALSE(catalog_manager->AreTablesDeleting()); |
1563 | 0 | { |
1564 | 0 | auto tablet_lock = sys_tablet->LockForWrite(); |
1565 | 0 | num_tables_after = tablet_lock->pb.table_ids_size(); |
1566 | 0 | } |
1567 | 0 | ASSERT_EQ(num_tables_before, num_tables_after); |
1568 | 0 | } |
1569 | | |
1570 | | TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(DropAllTablesInColocatedDB), |
1571 | 0 | PgMiniMasterFailoverTest) { |
1572 | 0 | const std::string kDatabaseName = "testdb"; |
1573 | | // Create a colocated DB, create some tables, delete all of them. |
1574 | 0 | { |
1575 | 0 | PGConn conn = ASSERT_RESULT(Connect()); |
1576 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0 with colocated=true", kDatabaseName)); |
1577 | 0 | { |
1578 | 0 | PGConn conn_new = ASSERT_RESULT(ConnectToDB(kDatabaseName)); |
1579 | 0 | ASSERT_OK(conn_new.Execute("CREATE TABLE foo (i int)")); |
1580 | 0 | ASSERT_OK(conn_new.Execute("DROP TABLE foo")); |
1581 | 0 | } |
1582 | 0 | } |
1583 | | // Failover to a new master. |
1584 | 0 | LOG(INFO) << "Failover to new Master"; |
1585 | 0 | auto old_master = ASSERT_RESULT(cluster_->GetLeaderMiniMaster()); |
1586 | 0 | ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->Shutdown(); |
1587 | 0 | auto new_master = ASSERT_RESULT(cluster_->GetLeaderMiniMaster()); |
1588 | 0 | ASSERT_NE(nullptr, new_master); |
1589 | 0 | ASSERT_NE(old_master, new_master); |
1590 | | // Wait for all the TabletServers to report in, so we can run CREATE TABLE with working replicas. |
1591 | 0 | ASSERT_OK(cluster_->WaitForAllTabletServers()); |
1592 | | // Ensure we can still access the colocated DB on restart. |
1593 | 0 | { |
1594 | 0 | PGConn conn_new = ASSERT_RESULT(ConnectToDB(kDatabaseName)); |
1595 | 0 | ASSERT_OK(conn_new.Execute("CREATE TABLE foo (i int)")); |
1596 | 0 | } |
1597 | 0 | } |
1598 | | |
1599 | | |
1600 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigSelect)) { |
1601 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1602 | |
|
1603 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, value TEXT)")); |
1604 | |
|
1605 | 0 | constexpr size_t kRows = 400; |
1606 | 0 | constexpr size_t kValueSize = RegularBuildVsSanitizers(256_KB, 4_KB); |
1607 | |
|
1608 | 0 | for (size_t i = 0; i != kRows; ++i) { |
1609 | 0 | ASSERT_OK(conn.ExecuteFormat( |
1610 | 0 | "INSERT INTO t VALUES ($0, '$1')", i, RandomHumanReadableString(kValueSize))); |
1611 | 0 | } |
1612 | |
|
1613 | 0 | auto start = MonoTime::Now(); |
1614 | 0 | auto res = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT COUNT(DISTINCT(value)) FROM t")); |
1615 | 0 | auto finish = MonoTime::Now(); |
1616 | 0 | LOG(INFO) << "Time: " << finish - start; |
1617 | 0 | ASSERT_EQ(res, kRows); |
1618 | 0 | } |
1619 | | |
1620 | 0 | TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(ManyRowsInsert), PgMiniSingleTServerTest) { |
1621 | 0 | constexpr int kRows = 100000; |
1622 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1623 | |
|
1624 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)")); |
1625 | |
|
1626 | 0 | auto start = MonoTime::Now(); |
1627 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO t SELECT generate_series(1, $0)", kRows)); |
1628 | 0 | auto finish = MonoTime::Now(); |
1629 | 0 | LOG(INFO) << "Time: " << finish - start; |
1630 | 0 | } |
1631 | | |
1632 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(MoveMaster)) { |
1633 | 0 | ShutdownAllMasters(cluster_.get()); |
1634 | 0 | cluster_->mini_master(0)->set_pass_master_addresses(false); |
1635 | 0 | ASSERT_OK(StartAllMasters(cluster_.get())); |
1636 | |
|
1637 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1638 | |
|
1639 | 0 | ASSERT_OK(WaitFor([&conn] { |
1640 | 0 | auto status = conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)"); |
1641 | 0 | WARN_NOT_OK(status, "Failed to create table"); |
1642 | 0 | return status.ok(); |
1643 | 0 | }, 15s, "Create table")); |
1644 | 0 | } |
1645 | | |
1646 | | class PgMiniBigPrefetchTest : public PgMiniSingleTServerTest { |
1647 | | protected: |
1648 | 0 | void SetUp() override { |
1649 | 0 | FLAGS_ysql_prefetch_limit = 20000000; |
1650 | 0 | PgMiniTest::SetUp(); |
1651 | 0 | } |
1652 | | |
1653 | 0 | void Run(int rows, int block_size, int reads, bool compact = false) { |
1654 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1655 | |
|
1656 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (a int PRIMARY KEY) SPLIT INTO 1 TABLETS")); |
1657 | 0 | auto last_row = 0; |
1658 | 0 | while (last_row < rows) { |
1659 | 0 | auto first_row = last_row + 1; |
1660 | 0 | last_row = std::min(rows, last_row + block_size); |
1661 | 0 | ASSERT_OK(conn.ExecuteFormat( |
1662 | 0 | "INSERT INTO t SELECT generate_series($0, $1)", first_row, last_row)); |
1663 | 0 | } |
1664 | |
|
1665 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll); |
1666 | 0 | for (const auto& peer : peers) { |
1667 | 0 | auto tp = peer->tablet()->transaction_participant(); |
1668 | 0 | if (tp) { |
1669 | 0 | LOG(INFO) << peer->LogPrefix() << "Intents: " << tp->TEST_CountIntents().first; |
1670 | 0 | } |
1671 | 0 | } |
1672 | |
|
1673 | 0 | if (compact) { |
1674 | 0 | FlushAndCompactTablets(); |
1675 | 0 | } |
1676 | |
|
1677 | 0 | LOG(INFO) << "Perform read"; |
1678 | |
|
1679 | 0 | if (VLOG_IS_ON(4)) { |
1680 | 0 | google::SetVLOGLevel("intent_aware_iterator", 4); |
1681 | 0 | google::SetVLOGLevel("docdb_rocksdb_util", 4); |
1682 | 0 | google::SetVLOGLevel("docdb", 4); |
1683 | 0 | } |
1684 | |
|
1685 | 0 | for (int i = 0; i != reads; ++i) { |
1686 | 0 | auto start = MonoTime::Now(); |
1687 | 0 | auto fetched_rows = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT count(*) FROM t")); |
1688 | 0 | auto finish = MonoTime::Now(); |
1689 | 0 | ASSERT_EQ(rows, fetched_rows); |
1690 | 0 | LOG(INFO) << i << ") Full Time: " << finish - start; |
1691 | 0 | } |
1692 | 0 | } |
1693 | | }; |
1694 | | |
1695 | 0 | TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigRead), PgMiniBigPrefetchTest) { |
1696 | 0 | constexpr int kRows = RegularBuildVsDebugVsSanitizers(1000000, 100000, 10000); |
1697 | 0 | constexpr int kBlockSize = 1000; |
1698 | 0 | constexpr int kReads = 3; |
1699 | |
|
1700 | 0 | Run(kRows, kBlockSize, kReads); |
1701 | 0 | } |
1702 | | |
1703 | 0 | TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigReadWithCompaction), PgMiniBigPrefetchTest) { |
1704 | 0 | constexpr int kRows = RegularBuildVsDebugVsSanitizers(1000000, 100000, 10000); |
1705 | 0 | constexpr int kBlockSize = 1000; |
1706 | 0 | constexpr int kReads = 3; |
1707 | |
|
1708 | 0 | Run(kRows, kBlockSize, kReads, /* compact= */ true); |
1709 | 0 | } |
1710 | | |
1711 | 0 | TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SmallRead), PgMiniBigPrefetchTest) { |
1712 | 0 | constexpr int kRows = 10; |
1713 | 0 | constexpr int kBlockSize = kRows; |
1714 | 0 | constexpr int kReads = 1; |
1715 | |
|
1716 | 0 | Run(kRows, kBlockSize, kReads); |
1717 | 0 | } |
1718 | | |
1719 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(DDLWithRestart)) { |
1720 | 0 | SetAtomicFlag(1.0, &FLAGS_TEST_transaction_ignore_applying_probability); |
1721 | 0 | FLAGS_TEST_force_master_leader_resolution = true; |
1722 | |
|
1723 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1724 | |
|
1725 | 0 | ASSERT_OK(conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); |
1726 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (a int PRIMARY KEY)")); |
1727 | 0 | ASSERT_OK(conn.CommitTransaction()); |
1728 | |
|
1729 | 0 | ShutdownAllMasters(cluster_.get()); |
1730 | |
|
1731 | 0 | LOG(INFO) << "Start masters"; |
1732 | 0 | ASSERT_OK(StartAllMasters(cluster_.get())); |
1733 | |
|
1734 | 0 | auto res = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT COUNT(*) FROM t")); |
1735 | 0 | ASSERT_EQ(res, 0); |
1736 | 0 | } |
1737 | | |
1738 | | class PgMiniRocksDbIteratorLoggingTest : public PgMiniSingleTServerTest { |
1739 | | public: |
1740 | | struct IteratorLoggingTestConfig { |
1741 | | int num_non_pk_columns; |
1742 | | int num_rows; |
1743 | | int num_overwrites; |
1744 | | int first_row_to_scan; |
1745 | | int last_row_to_scan; |
1746 | | }; |
1747 | | |
1748 | 0 | void RunIteratorLoggingTest(const IteratorLoggingTestConfig& config) { |
1749 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1750 | |
|
1751 | 0 | std::string non_pk_columns_schema; |
1752 | 0 | std::string non_pk_column_names; |
1753 | 0 | for (int i = 0; i < config.num_non_pk_columns; ++i) { |
1754 | 0 | non_pk_columns_schema += Format(", $0 TEXT", GetNonPkColName(i)); |
1755 | 0 | non_pk_column_names += Format(", $0", GetNonPkColName(i)); |
1756 | 0 | } |
1757 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLE t (pk TEXT, PRIMARY KEY (pk ASC)$0)", |
1758 | 0 | non_pk_columns_schema)); |
1759 | | // Delete and overwrite every row multiple times. |
1760 | 0 | for (int overwrite_index = 0; overwrite_index < config.num_overwrites; ++overwrite_index) { |
1761 | 0 | for (int row_index = 0; row_index < config.num_rows; ++row_index) { |
1762 | 0 | string non_pk_values; |
1763 | 0 | for (int non_pk_col_index = 0; |
1764 | 0 | non_pk_col_index < config.num_non_pk_columns; |
1765 | 0 | ++non_pk_col_index) { |
1766 | 0 | non_pk_values += Format(", '$0'", GetNonPkColValue( |
1767 | 0 | non_pk_col_index, row_index, overwrite_index)); |
1768 | 0 | } |
1769 | |
|
1770 | 0 | const auto pk_value = GetPkForRow(row_index); |
1771 | 0 | ASSERT_OK(conn.ExecuteFormat( |
1772 | 0 | "INSERT INTO t(pk$0) VALUES('$1'$2)", non_pk_column_names, pk_value, non_pk_values)); |
1773 | 0 | if (overwrite_index != config.num_overwrites - 1) { |
1774 | 0 | ASSERT_OK(conn.ExecuteFormat("DELETE FROM t WHERE pk = '$0'", pk_value)); |
1775 | 0 | } |
1776 | 0 | } |
1777 | 0 | } |
1778 | 0 | const auto first_pk_to_scan = GetPkForRow(config.first_row_to_scan); |
1779 | 0 | const auto last_pk_to_scan = GetPkForRow(config.last_row_to_scan); |
1780 | 0 | auto count_stmt_str = Format( |
1781 | 0 | "SELECT COUNT(*) FROM t WHERE pk >= '$0' AND pk <= '$1'", |
1782 | 0 | first_pk_to_scan, |
1783 | 0 | last_pk_to_scan); |
1784 | | // Do the same scan twice, and only turn on iterator logging on the second scan. |
1785 | | // This way we won't be logging system table operations needed to fetch PostgreSQL metadata. |
1786 | 0 | for (bool is_warmup : {true, false}) { |
1787 | 0 | if (!is_warmup) { |
1788 | 0 | SetAtomicFlag(true, &FLAGS_rocksdb_use_logging_iterator); |
1789 | 0 | } |
1790 | 0 | auto count_result = ASSERT_RESULT(conn.Fetch(count_stmt_str)); |
1791 | 0 | ASSERT_EQ(PQntuples(count_result.get()), 1); |
1792 | |
|
1793 | 0 | auto actual_num_rows = ASSERT_RESULT(GetInt64(count_result.get(), 0, 0)); |
1794 | 0 | const int expected_num_rows = config.last_row_to_scan - config.first_row_to_scan + 1; |
1795 | 0 | ASSERT_EQ(expected_num_rows, actual_num_rows); |
1796 | 0 | } |
1797 | 0 | SetAtomicFlag(false, &FLAGS_rocksdb_use_logging_iterator); |
1798 | 0 | } |
1799 | | |
1800 | | private: |
1801 | 0 | std::string GetNonPkColName(int non_pk_col_index) { |
1802 | 0 | return Format("non_pk_col$0", non_pk_col_index); |
1803 | 0 | } |
1804 | | |
1805 | 0 | std::string GetPkForRow(int row_index) { |
1806 | 0 | return Format("PrimaryKeyForRow$0", row_index); |
1807 | 0 | } |
1808 | | |
1809 | 0 | std::string GetNonPkColValue(int non_pk_col_index, int row_index, int overwrite_index) { |
1810 | 0 | return Format("NonPkCol$0ValueForRow$1Overwrite$2", |
1811 | 0 | non_pk_col_index, row_index, overwrite_index); |
1812 | 0 | } |
1813 | | }; |
1814 | | |
1815 | | TEST_F_EX(PgMiniTest, |
1816 | 0 | YB_DISABLE_TEST_IN_TSAN(IteratorLogPkOnly), PgMiniRocksDbIteratorLoggingTest) { |
1817 | 0 | RunIteratorLoggingTest({ |
1818 | 0 | .num_non_pk_columns = 0, |
1819 | 0 | .num_rows = 5, |
1820 | 0 | .num_overwrites = 100, |
1821 | 0 | .first_row_to_scan = 1, // 0-based |
1822 | 0 | .last_row_to_scan = 3, |
1823 | 0 | }); |
1824 | 0 | } |
1825 | | |
1826 | | TEST_F_EX(PgMiniTest, |
1827 | 0 | YB_DISABLE_TEST_IN_TSAN(IteratorLogTwoNonPkCols), PgMiniRocksDbIteratorLoggingTest) { |
1828 | 0 | RunIteratorLoggingTest({ |
1829 | 0 | .num_non_pk_columns = 2, |
1830 | 0 | .num_rows = 5, |
1831 | 0 | .num_overwrites = 100, |
1832 | 0 | .first_row_to_scan = 1, // 0-based |
1833 | 0 | .last_row_to_scan = 3, |
1834 | 0 | }); |
1835 | 0 | } |
1836 | | |
1837 | | // ------------------------------------------------------------------------------------------------ |
1838 | | // Backward scan on an index |
1839 | | // ------------------------------------------------------------------------------------------------ |
1840 | | |
// Fixture for descending-order scans that can be served by an ascending secondary index,
// optionally while another connection holds an open, uncommitted insert into the same table.
1841 | | class PgMiniBackwardIndexScanTest : public PgMiniSingleTServerTest { |
1842 | | protected: |
// Creates events_backwardscan with an ascending index on `inserted`, loads one row per minute
// for every day of January 2020, and checks that `ORDER BY inserted DESC LIMIT 100` returns
// exactly 100 rows. When uncommitted_intents is true, a second connection keeps an uncommitted
// insert open while the scan runs and rolls it back afterwards.
1843 | 0 | void BackwardIndexScanTest(bool uncommitted_intents) { |
1844 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1845 | 

1846 | 0 | ASSERT_OK(conn.Execute(R"#( |
1847 | 0 | create table events_backwardscan ( |
1848 | 0 | 

1849 | 0 | log text not null, |
1850 | 0 | src text not null, |
1851 | 0 | inserted timestamp(3) without time zone not null, |
1852 | 0 | created timestamp(3) without time zone not null, |
1853 | 0 | data jsonb not null, |
1854 | 0 | 

1855 | 0 | primary key (log, src, created) |
1856 | 0 | ); |
1857 | 0 | )#")); |
1858 | 0 | ASSERT_OK(conn.Execute("create index on events_backwardscan (inserted asc);")); |
1859 | 

// One INSERT ... SELECT per day; $0 is substituted with the day of the month.
1860 | 0 | for (int day = 1; day <= 31; ++day) { |
1861 | 0 | ASSERT_OK(conn.ExecuteFormat(R"#( |
1862 | 0 | insert into events_backwardscan |
1863 | 0 | 

1864 | 0 | select |
1865 | 0 | 'log', |
1866 | 0 | 'src', |
1867 | 0 | t, |
1868 | 0 | t, |
1869 | 0 | '{}' |
1870 | 0 | 

1871 | 0 | from generate_series( |
1872 | 0 | timestamp '2020-01-$0 00:00:00', |
1873 | 0 | timestamp '2020-01-$0 23:59:59', |
1874 | 0 | interval '1 minute' |
1875 | 0 | ) |
1876 | 0 | 

1877 | 0 | as t(day); |
1878 | 0 | )#", day)); |
1879 | 0 | } |
1880 | 

// Optionally leave an insert uncommitted on a separate connection for the duration of the scan.
1881 | 0 | boost::optional<PGConn> uncommitted_intents_conn; |
1882 | 0 | if (uncommitted_intents) { |
1883 | 0 | uncommitted_intents_conn = ASSERT_RESULT(Connect()); |
1884 | 0 | ASSERT_OK(uncommitted_intents_conn->Execute("BEGIN")); |
1885 | 0 | auto ts = "1970-01-01 00:00:00"; |
1886 | 0 | ASSERT_OK(uncommitted_intents_conn->ExecuteFormat( |
1887 | 0 | "insert into events_backwardscan values ('log', 'src', '$0', '$0', '{}')", ts, ts)); |
1888 | 0 | } |
1889 | 

1890 | 0 | auto count = ASSERT_RESULT( |
1891 | 0 | conn.FetchValue<int64_t>("SELECT COUNT(*) FROM events_backwardscan")); |
1892 | 0 | LOG(INFO) << "Total rows inserted: " << count; |
1893 | 

// The query under test: descending order on the ascending-indexed column.
1894 | 0 | auto select_result = ASSERT_RESULT(conn.Fetch( |
1895 | 0 | "select * from events_backwardscan order by inserted desc limit 100" |
1896 | 0 | )); |
1897 | 0 | ASSERT_EQ(PQntuples(select_result.get()), 100); |
1898 | 

1899 | 0 | if (uncommitted_intents) { |
1900 | 0 | ASSERT_OK(uncommitted_intents_conn->Execute("ROLLBACK")); |
1901 | 0 | } |
1902 | 0 | } |
1903 | | }; |
1904 | | |
// Backward index scan with no concurrent uncommitted transaction.
1905 | | TEST_F_EX(PgMiniTest, |
1906 | | YB_DISABLE_TEST_IN_TSAN(BackwardIndexScanNoIntents), |
1907 | 0 | PgMiniBackwardIndexScanTest) { |
1908 | 0 | BackwardIndexScanTest(/* uncommitted_intents */ false); |
1909 | 0 | } |
1910 | | |
// Backward index scan while another connection holds an uncommitted insert into the table.
1911 | | TEST_F_EX(PgMiniTest, |
1912 | | YB_DISABLE_TEST_IN_TSAN(BackwardIndexScanWithIntents), |
1913 | 0 | PgMiniBackwardIndexScanTest) { |
1914 | 0 | BackwardIndexScanTest(/* uncommitted_intents */ true); |
1915 | 0 | } |
1916 | | |
// Creates a database and then restarts the cluster with FLAGS_flush_rocksdb_on_shutdown turned
// off; the test passes if both the CREATE DATABASE and the restart succeed.
1917 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(CreateDatabase)) { |
1918 | 0 | FLAGS_flush_rocksdb_on_shutdown = false; |
1919 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1920 | 0 | const std::string kDatabaseName = "testdb"; |
1921 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName)); |
1922 | 0 | ASSERT_OK(RestartCluster()); |
1923 | 0 | } |
1924 | | |
// Inserts kNumRows rows in a single statement while a background thread keeps reading
// SUM(a), and checks that the reader only ever observes either nothing from the big insert or
// all of it. If `restart` is true, the cluster is restarted right after the insert.
// Afterwards the test waits for CountIntents to reach zero, flushes and compacts, and asserts
// that no key left in any tablet's DB starts with the kTransactionApplyState byte.
1925 | 0 | void PgMiniTest::TestBigInsert(bool restart) { |
1926 | 0 | constexpr int64_t kNumRows = RegularBuildVsSanitizers(100000, 10000); |
// Batch limit is a tenth of the row count, so the big insert spans several apply batches
// (presumably that is what this flag controls; verify against its definition).
1927 | 0 | FLAGS_txn_max_apply_batch_records = kNumRows / 10; |
1928 | 

1929 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1930 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (a int PRIMARY KEY) SPLIT INTO 1 TABLETS")); |
// Seed row with value 0, so SUM(a) is 0 until the big insert becomes visible.
1931 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t VALUES (0)")); |
1932 | 

1933 | 0 | TestThreadHolder thread_holder; |
1934 | 

1935 | 0 | std::atomic<int> post_insert_reads{0}; |
1936 | 0 | std::atomic<bool> restarted{false}; |
// Reader thread: polls SUM(a) until stopped, reconnecting if the server drops the connection.
1937 | 0 | thread_holder.AddThreadFunctor( |
1938 | 0 | [this, &stop = thread_holder.stop_flag(), &post_insert_reads, &restarted] { |
1939 | 0 | auto connection = ASSERT_RESULT(Connect()); |
1940 | 0 | while (!stop.load(std::memory_order_acquire)) { |
1941 | 0 | auto res = connection.FetchValue<int64_t>("SELECT SUM(a) FROM t"); |
1942 | 0 | if (!res.ok()) { |
// The only tolerated failure is a dropped connection; wait for the main thread to signal
// that the restart finished (or for stop), then reconnect.
1943 | 0 | auto msg = res.status().message().ToBuffer(); |
1944 | 0 | ASSERT_TRUE(msg.find("server closed the connection unexpectedly") != std::string::npos) |
1945 | 0 | << res.status(); |
1946 | 0 | while (!restarted.load() && !stop.load()) { |
1947 | 0 | std::this_thread::sleep_for(10ms); |
1948 | 0 | } |
1949 | 0 | std::this_thread::sleep_for(1s); |
1950 | 0 | LOG(INFO) << "Establishing new connection"; |
1951 | 0 | connection = ASSERT_RESULT(Connect()); |
1952 | 0 | restarted = false; |
1953 | 0 | continue; |
1954 | 0 | } |
1955 | | 
1956 | | // We should see zero or full sum only. |
// Full sum is 0 + 1 + ... + kNumRows = kNumRows * (kNumRows + 1) / 2.
1957 | 0 | if (*res) { |
1958 | 0 | ASSERT_EQ(*res, kNumRows * (kNumRows + 1) / 2); |
1959 | 0 | ++post_insert_reads; |
1960 | 0 | } |
1961 | 0 | } |
1962 | 0 | }); |
1963 | 

1964 | 0 | ASSERT_OK(conn.ExecuteFormat( |
1965 | 0 | "INSERT INTO t SELECT generate_series(1, $0)", kNumRows)); |
1966 | 

1967 | 0 | if (restart) { |
1968 | 0 | LOG(INFO) << "Restart cluster"; |
1969 | 0 | ASSERT_OK(RestartCluster()); |
1970 | 0 | restarted = true; |
1971 | 0 | } |
1972 | 

// Done only when no intents remain and the reader has seen the full sum at least once.
1973 | 0 | ASSERT_OK(WaitFor([this, &post_insert_reads] { |
1974 | 0 | auto intents_count = CountIntents(cluster_.get()); |
1975 | 0 | LOG(INFO) << "Intents count: " << intents_count; |
1976 | 

1977 | 0 | return intents_count == 0 && post_insert_reads.load(std::memory_order_acquire) > 0; |
1978 | 0 | }, 60s * kTimeMultiplier, "Intents cleanup", 200ms)); |
1979 | 

1980 | 0 | thread_holder.Stop(); |
1981 | 

1982 | 0 | FlushAndCompactTablets(); |
1983 | 

// Scan every tablet peer's DB (peers without one are skipped) and fail on any key that
// begins with the kTransactionApplyState value-type byte.
1984 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll); |
1985 | 0 | for (const auto& peer : peers) { |
1986 | 0 | auto db = peer->tablet()->TEST_db(); |
1987 | 0 | if (!db) { |
1988 | 0 | continue; |
1989 | 0 | } |
1990 | 0 | rocksdb::ReadOptions read_opts; |
1991 | 0 | read_opts.query_id = rocksdb::kDefaultQueryId; |
1992 | 0 | std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_opts)); |
1993 | 

1994 | 0 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { |
1995 | 0 | Slice key = iter->key(); |
1996 | 0 | ASSERT_FALSE(key.TryConsumeByte(docdb::ValueTypeAsChar::kTransactionApplyState)) |
1997 | 0 | << "Key: " << iter->key().ToDebugString() << ", value: " << iter->value().ToDebugString(); |
1998 | 0 | } |
1999 | 0 | } |
2000 | 0 | } |
2001 | | |
// Big single-statement insert without a cluster restart.
2002 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigInsert)) { |
2003 | 0 | TestBigInsert(/* restart= */ false); |
2004 | 0 | } |
2005 | | |
// Big single-statement insert followed by a cluster restart, with
// FLAGS_apply_intents_task_injected_delay_ms set to 200.
2006 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigInsertWithRestart)) { |
2007 | 0 | FLAGS_apply_intents_task_injected_delay_ms = 200; |
2008 | 0 | TestBigInsert(/* restart= */ true); |
2009 | 0 | } |
2010 | | |
// Inserts kNumRows rows in one statement and drops the table immediately afterwards, with the
// apply batch size set to a tenth of the row count and a 200 ms injected apply-intents delay.
// The test passes if every statement, including the DROP TABLE, succeeds.
2011 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigInsertWithDropTable)) { |
2012 | 0 | constexpr int kNumRows = 10000; |
2013 | 0 | FLAGS_txn_max_apply_batch_records = kNumRows / 10; |
2014 | 0 | FLAGS_apply_intents_task_injected_delay_ms = 200; |
2015 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2016 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t(id int) SPLIT INTO 1 TABLETS")); |
2017 | 0 | ASSERT_OK(conn.ExecuteFormat( |
2018 | 0 | "INSERT INTO t SELECT generate_series(1, $0)", kNumRows)); |
2019 | 0 | ASSERT_OK(conn.Execute("DROP TABLE t")); |
2020 | 0 | } |
2021 | | |
// conn1 opens a snapshot-isolation transaction, conn2 deletes row i = 2 outside of it, and then
// conn1 tries to update that same row.
//  - select_before_update == true: conn1 reads the table before the delete; the UPDATE must
//    then fail with a message containing "Value write after transaction start".
//  - select_before_update == false: the UPDATE and the commit must succeed, and the table must
//    end up holding only rows (1, 10) and (3, 30).
2022 | 0 | void PgMiniTest::TestConcurrentDeleteRowAndUpdateColumn(bool select_before_update) { |
2023 | 0 | auto conn1 = ASSERT_RESULT(Connect()); |
2024 | 0 | auto conn2 = ASSERT_RESULT(Connect()); |
2025 | 0 | ASSERT_OK(conn1.Execute("CREATE TABLE t (i INT PRIMARY KEY, j INT)")); |
2026 | 0 | ASSERT_OK(conn1.Execute("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)")); |
2027 | 0 | ASSERT_OK(conn1.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); |
2028 | 0 | if (select_before_update) { |
2029 | 0 | ASSERT_OK(conn1.Fetch("SELECT * FROM t")); |
2030 | 0 | } |
2031 | 0 | ASSERT_OK(conn2.Execute("DELETE FROM t WHERE i = 2")); |
2032 | 0 | auto status = conn1.Execute("UPDATE t SET j = 21 WHERE i = 2"); |
2033 | 0 | if (select_before_update) { |
2034 | 0 | ASSERT_NOK(status); |
2035 | 0 | ASSERT_STR_CONTAINS(status.message().ToBuffer(), "Value write after transaction start"); |
2036 | 0 | return; |
2037 | 0 | } |
2038 | 0 | ASSERT_OK(status); |
2039 | 0 | ASSERT_OK(conn1.CommitTransaction()); |
// Expect a 2x2 result: the deleted row must not have been resurrected by the UPDATE.
2040 | 0 | auto result = ASSERT_RESULT(conn1.FetchMatrix("SELECT * FROM t ORDER BY i", 2, 2)); |
2041 | 0 | auto value = ASSERT_RESULT(GetInt32(result.get(), 0, 0)); |
2042 | 0 | ASSERT_EQ(value, 1); |
2043 | 0 | value = ASSERT_RESULT(GetInt32(result.get(), 0, 1)); |
2044 | 0 | ASSERT_EQ(value, 10); |
2045 | 0 | value = ASSERT_RESULT(GetInt32(result.get(), 1, 0)); |
2046 | 0 | ASSERT_EQ(value, 3); |
2047 | 0 | value = ASSERT_RESULT(GetInt32(result.get(), 1, 1)); |
2048 | 0 | ASSERT_EQ(value, 30); |
2049 | 0 | } |
2050 | | |
// Variant where the transaction does not read the table before the concurrent delete.
2051 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentDeleteRowAndUpdateColumn)) { |
2052 | 0 | TestConcurrentDeleteRowAndUpdateColumn(/* select_before_update= */ false); |
2053 | 0 | } |
2054 | | |
// Variant where the transaction reads the table before the concurrent delete.
2055 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentDeleteRowAndUpdateColumnWithSelect)) { |
2056 | 0 | TestConcurrentDeleteRowAndUpdateColumn(/* select_before_update= */ true); |
2057 | 0 | } |
2058 | | |
2059 | | // Test that a second read on the same table is not restarted if intents were written |
2060 | | // after the first read. GH #6972. |
// conn1 reads row a = 1 inside a snapshot transaction; conn2 then updates row a = 2 and
// commits; conn1's second read (row a = 2) must still return the old value 1.
2061 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(NoRestartSecondRead)) { |
// Very large clock skew, so the concurrent update is guaranteed to land inside the skew window
// (asserted below).
2062 | 0 | FLAGS_max_clock_skew_usec = 1000000000LL * kTimeMultiplier; |
2063 | 0 | auto conn1 = ASSERT_RESULT(Connect()); |
2064 | 0 | auto conn2 = ASSERT_RESULT(Connect()); |
2065 | 0 | ASSERT_OK(conn1.Execute("CREATE TABLE t (a int PRIMARY KEY, b int) SPLIT INTO 1 TABLETS")); |
2066 | 0 | ASSERT_OK(conn1.Execute("INSERT INTO t VALUES (1, 1), (2, 1), (3, 1)")); |
2067 | 0 | auto start_time = MonoTime::Now(); |
2068 | 0 | ASSERT_OK(conn1.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); |
2069 | 0 | LOG(INFO) << "Select1"; |
2070 | 0 | auto res = ASSERT_RESULT(conn1.FetchValue<int32_t>("SELECT b FROM t WHERE a = 1")); |
2071 | 0 | ASSERT_EQ(res, 1); |
2072 | 0 | LOG(INFO) << "Update"; |
2073 | 0 | ASSERT_OK(conn2.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); |
2074 | 0 | ASSERT_OK(conn2.Execute("UPDATE t SET b = 2 WHERE a = 2")); |
2075 | 0 | ASSERT_OK(conn2.CommitTransaction()); |
// Sanity check: the update finished within max_clock_skew of the transaction start.
2076 | 0 | auto update_time = MonoTime::Now(); |
2077 | 0 | ASSERT_LE(update_time, start_time + FLAGS_max_clock_skew_usec * 1us); |
2078 | 0 | LOG(INFO) << "Select2"; |
2079 | 0 | res = ASSERT_RESULT(conn1.FetchValue<int32_t>("SELECT b FROM t WHERE a = 2")); |
2080 | 0 | ASSERT_EQ(res, 1); |
2081 | 0 | ASSERT_OK(conn1.CommitTransaction()); |
2082 | 0 | } |
2083 | | |
// Runs TestInOperatorLock() on the PgMiniTestTxnHelperSnapshot fixture.
2084 | | TEST_F_EX( |
2085 | | PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SnapshotInOperatorLock), |
2086 | 0 | PgMiniTestTxnHelperSnapshot) { |
2087 | 0 | TestInOperatorLock(); |
2088 | 0 | } |
2089 | | |
// Runs TestInOperatorLock() on the PgMiniTestTxnHelperSerializable fixture.
2090 | | TEST_F_EX( |
2091 | | PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SerializableInOperatorLock), |
2092 | 0 | PgMiniTestTxnHelperSerializable) { |
2093 | 0 | TestInOperatorLock(); |
2094 | 0 | } |
2095 | | |
2096 | | // ------------------------------------------------------------------------------------------------ |
2097 | | // Tablet Splitting Tests |
2098 | | // ------------------------------------------------------------------------------------------------ |
2099 | | |
// Fixture that turns on FLAGS_TEST_index_read_multiple_partitions before the base SetUp runs.
2100 | | class PgMiniTestAutoScanNextPartitions : public PgMiniTest { |
2101 | | public: |
2102 | 0 | void SetUp() override { |
2103 | 0 | FLAGS_TEST_index_read_multiple_partitions = true; |
2104 | 0 | PgMiniTest::SetUp(); |
2105 | 0 | } |
2106 | | }; |
2107 | | |
// Inserts numRows rows that all share v1 = 1 into a 6-tablet table with an index on (v1, v2),
// then checks that a lookup by v1 = 1 returns every row.
2108 | | TEST_F_EX( |
2109 | | PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(AutoScanNextPartitions), |
2110 | 0 | PgMiniTestAutoScanNextPartitions) { |
2111 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2112 | 0 | constexpr int numRows = 100; |
2113 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (k INT PRIMARY KEY, v1 INT, v2 INT) " |
2114 | 0 | "SPLIT INTO 6 TABLETS")); |
2115 | 0 | ASSERT_OK(conn.Execute("CREATE INDEX ON t(v1, v2)")); |
2116 | | 
2117 | | // Insert elements into the table |
2118 | 0 | for (int i = 0; i < numRows; i++) { |
2119 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO t (k, v1, v2) VALUES ($0, $1, $2)", i, 1, i)); |
2120 | 0 | } |
2121 | | 
2122 | | // Secondary index read from the table |
2123 | | // While performing secondary index read on ybctids, the pggate layer batches requests belonging |
2124 | | // to the same tablet. However, if the tablet is split after batching, we need a mechanism to |
2125 | | // execute the batched request across both the sub-tablets. We create a scenario to test this |
2126 | | // phenomenon here. |
2127 | | // |
2128 | | // FLAGS_TEST_index_read_multiple_partitions is a test flag that, when set, creates a scenario to |
2129 | | // check if index scans of ybctids span across multiple tablets. Specifically in this example, we |
2130 | | // try to scan for the elements that are present in tablets 0,1 which contain value v1=1 and see |
2131 | | // if they match the expected number of rows. |
2132 | 0 | auto res = ASSERT_RESULT(conn.Fetch("SELECT k FROM t WHERE v1 = 1")); |
2133 | 0 | auto lines = PQntuples(res.get()); |
2134 | 0 | ASSERT_EQ(lines, numRows); |
2135 | 0 | } |
2136 | | |
// Fixture for tablet-splitting tests. SetUp() overrides a set of flags before the base SetUp
// runs: one shard per tserver, zeroed low/high phase split thresholds and shard counts, a 30 KB
// force-split threshold, small write-buffer and block sizes, 1 s heartbeat intervals, a 4000 ms
// injected delay between "prepare ybctid" and "execute batch ybctid", and a prefetch limit
// of 32.
2137 | | class PgMiniTabletSplitTest : public PgMiniTest { |
2138 | | public: |
2139 | 0 | void SetUp() override { |
2140 | 0 | FLAGS_yb_num_shards_per_tserver = 1; |
2141 | 0 | FLAGS_tablet_split_low_phase_size_threshold_bytes = 0; |
2142 | 0 | FLAGS_tablet_split_high_phase_size_threshold_bytes = 0; |
2143 | 0 | FLAGS_tablet_split_low_phase_shard_count_per_node = 0; |
2144 | 0 | FLAGS_tablet_split_high_phase_shard_count_per_node = 0; |
2145 | 0 | FLAGS_tablet_force_split_threshold_bytes = 30_KB; |
// Write buffer is a quarter of the force-split threshold.
2146 | 0 | FLAGS_db_write_buffer_size = FLAGS_tablet_force_split_threshold_bytes / 4; |
2147 | 0 | FLAGS_db_block_size_bytes = 2_KB; |
2148 | 0 | FLAGS_db_filter_block_size_bytes = 2_KB; |
2149 | 0 | FLAGS_db_index_block_size_bytes = 2_KB; |
2150 | 0 | FLAGS_heartbeat_interval_ms = 1000; |
2151 | 0 | FLAGS_tserver_heartbeat_metrics_interval_ms = 1000; |
2152 | 0 | FLAGS_TEST_inject_delay_between_prepare_ybctid_execute_batch_ybctid_ms = 4000; |
2153 | 0 | FLAGS_ysql_prefetch_limit = 32; |
2154 | 0 | PgMiniTest::SetUp(); |
2155 | 0 | } |
2156 | | }; |
2157 | | |
// Creates `table_name` with primary key ((h1, h2) HASH, r ASC) split into `num_tablets` tablets,
// adds the secondary index `<table_name>_idx` on (i HASH, r ASC), and loads 500 rows of the
// form (n, n, n, 1) for n in 1..500. Automatic tablet splitting is switched off first.
2158 | 0 | void PgMiniTest::CreateTableAndInitialize(std::string table_name, int num_tablets) { |
2159 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2160 | 

2161 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_automatic_tablet_splitting) = false; |
2162 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (h1 int, h2 int, r int, i int, " |
2163 | 0 | "PRIMARY KEY ((h1, h2) HASH, r ASC)) " |
2164 | 0 | "SPLIT INTO $1 TABLETS", table_name, num_tablets)); |
2165 | 

2166 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx " |
2167 | 0 | "ON $1(i HASH, r ASC)", table_name, table_name)); |
2168 | 

2169 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 SELECT i, i, i, 1 FROM " |
2170 | 0 | "(SELECT generate_series(1, 500) i) t", table_name)); |
2171 | 0 | } |
2172 | | |
// Drops `table_name` over a fresh connection.
2173 | 0 | void PgMiniTest::DestroyTable(std::string table_name) { |
2174 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2175 | 0 | ASSERT_OK(conn.ExecuteFormat("DROP TABLE $0", table_name)); |
2176 | 0 | } |
2177 | | |
// Looks up the table called `table_name` through a YBClient and stores its id in `*table_id`.
// The first entry in client->ListTables() whose name matches is used. If nothing matches,
// `*table_id` is left untouched.
2178 | 0 | void PgMiniTest::GetTableIDFromTableName(const std::string table_name, std::string* table_id) { |
2179 | | // Get YBClient handler and table ID. Using this we can get the number of tablets before starting |
2180 | | // the test and before the test ends. With this we can ensure that tablet splitting has occurred. |
2181 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
2182 | 0 | const auto tables = ASSERT_RESULT(client->ListTables()); |
2183 | 0 | for (const auto& table : tables) { |
// Match on the requested name; a hard-coded literal here would silently ignore the parameter.
2184 | 0 | if (table.has_table() && table.table_name() == table_name) { |
2185 | 0 | table_id->assign(table.table_id()); |
2186 | 0 | break; |
2187 | 0 | } |
2188 | 0 | } |
2189 | 0 | } |
2190 | | |
// Adds two threads to `thread_holder`:
//  - a writer that inserts rows (n, n, n, 1) for n in 501..1999 into `table_name`;
//  - a reader that repeatedly runs `SELECT * ... WHERE i = 1 order by r` and asserts that the
//    values in result column 2 (the `r` column, given the h1, h2, r, i column order used by
//    CreateTableAndInitialize) come back sorted. The reader loops until the holder's stop flag
//    is set and always runs at least once.
2191 | | void PgMiniTest::StartReadWriteThreads(const std::string table_name, |
2192 | 0 | TestThreadHolder *thread_holder) { |
2193 | | // Writer thread that does parallel writes into table |
2194 | 0 | thread_holder->AddThread([this, table_name] { |
2195 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2196 | 0 | for (int i = 501; i < 2000; i++) { |
2197 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1, $2, $3, $4)", |
2198 | 0 | table_name, i, i, i, 1)); |
2199 | 0 | } |
2200 | 0 | }); |
2201 | | 
2202 | | // Index read from the table |
2203 | 0 | thread_holder->AddThread([this, &stop = thread_holder->stop_flag(), table_name] { |
2204 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2205 | 0 | do { |
2206 | 0 | auto result = ASSERT_RESULT(conn.FetchFormat("SELECT * FROM $0 WHERE i = 1 order by r", |
2207 | 0 | table_name)); |
2208 | 0 | std::vector<int> sort_check; |
2209 | 0 | for(int x = 0; x < PQntuples(result.get()); x++) { |
2210 | 0 | auto value = ASSERT_RESULT(GetInt32(result.get(), x, 2)); |
2211 | 0 | sort_check.push_back(value); |
2212 | 0 | } |
2213 | 0 | ASSERT_TRUE(std::is_sorted(sort_check.begin(), sort_check.end())); |
2214 | 0 | } while (!stop.load(std::memory_order_acquire)); |
2215 | 0 | }); |
2216 | 0 | } |
2217 | | |
// Runs concurrent writes and secondary-index reads against a table while automatic tablet
// splitting is enabled, once starting from 1 tablet and once from 3, and asserts each time that
// the number of active tablet leaders grew.
2218 | | TEST_F_EX( |
2219 | | PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(TabletSplitSecondaryIndexYSQL), |
2220 | 0 | PgMiniTabletSplitTest) { |
2221 | 

2222 | 0 | std::string table_name = "update_pk_complex_two_hash_one_range_keys"; |
2223 | 0 | CreateTableAndInitialize(table_name, 1); |
2224 | 

2225 | 0 | std::string table_id; |
2226 | 0 | GetTableIDFromTableName(table_name, &table_id); |
2227 | 0 | auto start_num_tablets = ListTableActiveTabletLeadersPeers(cluster_.get(), table_id).size(); |
2228 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_automatic_tablet_splitting) = true; |
2229 | | 
2230 | | // Insert elements into the table using a parallel thread |
2231 | 0 | TestThreadHolder thread_holder; |
2232 | | 
2233 | | /* |
2234 | | * Writer thread writes into the table continuously, while the index read thread does a secondary |
2235 | | * index lookup. During the index lookup, we inject artificial delays, specified by the flag |
2236 | | * FLAGS_TEST_inject_delay_between_prepare_ybctid_execute_batch_ybctid_ms. Tablets will split in |
2237 | | * between those delays into two different partitions. |
2238 | | * |
2239 | | * The purpose of this test is to verify that when the secondary index read request is being |
2240 | | * executed, the results from both the tablets are being represented. Without the fix from |
2241 | | * the pggate layer, only one half of the results will be obtained. Hence we verify that after the |
2242 | | * split the number of elements is > 500, which is the number of elements inserted before the |
2243 | | * split. |
2244 | | */ |
// NOTE(review): the reader thread started below only asserts that the fetched `r` values are
// sorted; no "> 500" row-count check is visible in this test. Confirm whether one is intended.
2245 | 0 | StartReadWriteThreads(table_name, &thread_holder); |
2246 | 

2247 | 0 | thread_holder.WaitAndStop(200s); |
2248 | 0 | auto end_num_tablets = ListTableActiveTabletLeadersPeers(cluster_.get(), table_id).size(); |
2249 | 0 | ASSERT_GT(end_num_tablets, start_num_tablets); |
2250 | 0 | DestroyTable(table_name); |
2251 | | 
2252 | | // Rerun the same test where table is created with 3 tablets. |
2253 | | // When a table is created with three tablets, the lower and upper bounds are as follows; |
2254 | | // tablet 1 -- empty to A |
2255 | | // tablet 2 -- A to B |
2256 | | // tablet 3 -- B to empty |
2257 | | // However, in situations where tables are created with just one tablet lower_bound and |
2258 | | // upper_bound for the tablet is empty to empty. Hence, to test both situations we run this test |
2259 | | // with one tablet and three tablets respectively. |
2260 | 0 | CreateTableAndInitialize(table_name, 3); |
2261 | 0 | GetTableIDFromTableName(table_name, &table_id); |
2262 | 0 | start_num_tablets = ListTableActiveTabletLeadersPeers(cluster_.get(), table_id).size(); |
2263 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_automatic_tablet_splitting) = true; |
2264 | 

// NOTE(review): the same TestThreadHolder is reused after WaitAndStop(); confirm that its stop
// flag is cleared, otherwise the reader's do/while loop runs only a single iteration here.
2265 | 0 | StartReadWriteThreads(table_name, &thread_holder); |
2266 | 0 | thread_holder.WaitAndStop(200s); |
2267 | 

2268 | 0 | end_num_tablets = ListTableActiveTabletLeadersPeers(cluster_.get(), table_id).size(); |
2269 | 0 | ASSERT_GT(end_num_tablets, start_num_tablets); |
2270 | 0 | DestroyTable(table_name); |
2271 | 0 | } |
2272 | | |
// One writer thread and kNumConcurrentRead reader threads run against table "savepoints" for
// 60 seconds.
//
// Each writer iteration inserts five consecutive values inside one snapshot transaction and
// rolls back to savepoint "one" twice, so only the first value (write_start) and the last
// (write_start + 4) survive. Readers query that range while the commit is in flight; any
// non-empty result must contain exactly those two values. At the end the test expects at least
// kMinNumNonEmptyReads non-empty reads in total.
2273 | 0 | void PgMiniTest::RunManyConcurrentReadersTest() { |
2274 | 0 | constexpr int kNumConcurrentRead = 8; |
2275 | 0 | constexpr int kMinNumNonEmptyReads = 10; |
2276 | 0 | const std::string kTableName = "savepoints"; |
2277 | 0 | TestThreadHolder thread_holder; |
2278 | 

// next_write_start: base value of the writer's current batch, also read by the readers.
// writer_latch: counted down by the writer to release readers for the current batch.
// reader_latch: counted down by readers; the writer waits on it after committing.
2279 | 0 | std::atomic<int32_t> next_write_start{0}; |
2280 | 0 | std::atomic<int32_t> num_non_empty_reads{0}; |
2281 | 0 | CountDownLatch reader_latch(0); |
2282 | 0 | CountDownLatch writer_latch(1); |
2283 | 0 | std::atomic<bool> writer_thread_is_stopped{false}; |
2284 | 0 | CountDownLatch reader_threads_are_stopped(kNumConcurrentRead); |
2285 | 

2286 | 0 | { |
2287 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2288 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (a int)", kTableName)); |
2289 | 0 | } |
2290 | 

// Writer thread.
2291 | 0 | thread_holder.AddThreadFunctor([ |
2292 | 0 | &stop = thread_holder.stop_flag(), &next_write_start, &reader_latch, &writer_latch, |
2293 | 0 | &writer_thread_is_stopped, kTableName, this] { |
2294 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2295 | 0 | while (!stop.load(std::memory_order_acquire)) { |
2296 | 0 | auto write_start = (next_write_start += 5); |
// Values +1, +2 and +3 are all written after SAVEPOINT one and undone by the two rollbacks.
2297 | 0 | ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); |
2298 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1)", kTableName, write_start)); |
2299 | 0 | ASSERT_OK(conn.Execute("SAVEPOINT one")); |
2300 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1)", kTableName, write_start + 1)); |
2301 | 0 | ASSERT_OK(conn.Execute("SAVEPOINT two")); |
2302 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1)", kTableName, write_start + 2)); |
2303 | 0 | ASSERT_OK(conn.Execute("ROLLBACK TO SAVEPOINT one")); |
2304 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1)", kTableName, write_start + 3)); |
2305 | 0 | ASSERT_OK(conn.Execute("ROLLBACK TO SAVEPOINT one")); |
2306 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1)", kTableName, write_start + 4)); |
2307 | | 
2308 | | // Start concurrent reader threads |
2309 | 0 | reader_latch.Reset(kNumConcurrentRead * 5); |
2310 | 0 | writer_latch.CountDown(); |
2311 | | 
2312 | | // Commit while reader threads are running |
2313 | 0 | ASSERT_OK(conn.CommitTransaction()); |
2314 | | 
2315 | | // Allow reader threads to complete and halt. |
2316 | 0 | ASSERT_TRUE(reader_latch.WaitFor(5s * kTimeMultiplier)); |
2317 | 0 | writer_latch.Reset(1); |
2318 | 0 | } |
2319 | 0 | writer_thread_is_stopped = true; |
2320 | 0 | }); |
2321 | 

// Reader threads.
2322 | 0 | for (int reader_idx = 0; reader_idx < kNumConcurrentRead; ++reader_idx) { |
2323 | 0 | thread_holder.AddThreadFunctor([ |
2324 | 0 | &stop = thread_holder.stop_flag(), &next_write_start, &num_non_empty_reads, |
2325 | 0 | &reader_latch, &writer_latch, &reader_threads_are_stopped, kTableName, this] { |
2326 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2327 | 0 | while (!stop.load(std::memory_order_acquire)) { |
2328 | 0 | ASSERT_TRUE(writer_latch.WaitFor(10s * kTimeMultiplier)); |
2329 | 

2330 | 0 | auto read_start = next_write_start.load(); |
2331 | 0 | auto read_end = read_start + 4; |
2332 | 0 | auto fetch_query = strings::Substitute( |
2333 | 0 | "SELECT * FROM $0 WHERE a BETWEEN $1 AND $2 ORDER BY a ASC", |
2334 | 0 | kTableName, read_start, read_end); |
2335 | 

2336 | 0 | auto res = ASSERT_RESULT(conn.Fetch(fetch_query)); |
2337 | 0 | auto fetched_rows = PQntuples(res.get()); |
// An empty result is acceptable (nothing from this batch is visible yet); otherwise exactly
// the two surviving rows must be visible. Unexpected results are logged before the EXPECTs.
2338 | 0 | if (fetched_rows != 0) { |
2339 | 0 | num_non_empty_reads++; |
2340 | 0 | if (fetched_rows != 2) { |
2341 | 0 | LOG(INFO) |
2342 | 0 | << "Expected to fetch (" << read_start << ") and (" << read_end << "). " |
2343 | 0 | << "Instead, got the following results:"; |
2344 | 0 | for (int i = 0; i < fetched_rows; ++i) { |
2345 | 0 | auto fetched_val = CHECK_RESULT(GetInt32(res.get(), i, 0)); |
2346 | 0 | LOG(INFO) << "Result " << i << " - " << fetched_val; |
2347 | 0 | } |
2348 | 0 | } |
2349 | 0 | EXPECT_EQ(fetched_rows, 2); |
2350 | 0 | auto first_fetched_val = ASSERT_RESULT(GetInt32(res.get(), 0, 0)); |
2351 | 0 | EXPECT_EQ(read_start, first_fetched_val); |
2352 | 0 | auto second_fetched_val = ASSERT_RESULT(GetInt32(res.get(), 1, 0)); |
2353 | 0 | EXPECT_EQ(read_start + 4, second_fetched_val); |
2354 | 0 | } |
2355 | 0 | reader_latch.CountDown(1); |
2356 | 0 | } |
2357 | 0 | reader_threads_are_stopped.CountDown(1); |
2358 | 0 | }); |
2359 | 0 | } |
2360 | 

// Let the workload run, then request stop. Both latches are repeatedly reset to 0 until the
// writer and every reader have left their loops (presumably so that no thread stays blocked in
// WaitFor; verify against CountDownLatch::Reset semantics).
2361 | 0 | std::this_thread::sleep_for(60s); |
2362 | 0 | thread_holder.stop_flag().store(true, std::memory_order_release); |
2363 | 0 | while (!writer_thread_is_stopped.load(std::memory_order_acquire) || |
2364 | 0 | reader_threads_are_stopped.count() != 0) { |
2365 | 0 | reader_latch.Reset(0); |
2366 | 0 | writer_latch.Reset(0); |
2367 | 0 | std::this_thread::sleep_for(10ms * kTimeMultiplier); |
2368 | 0 | } |
2369 | 0 | thread_holder.Stop(); |
2370 | 0 | EXPECT_GE(num_non_empty_reads, kMinNumNonEmptyReads); |
2371 | 0 | } |
2372 | | |
// Inserts kNumRows rows one by one inside a single serializable transaction. Every row whose
// number is divisible by kRowNumModToAbort is wrapped in SAVEPOINT A / ROLLBACK TO A and so
// rolled back. After the commit the cluster is restarted, the test waits for CountIntents to
// reach zero, and then checks row by row that rolled-back rows are absent and all others are
// present.
2373 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigInsertWithAbortedIntentsAndRestart)) { |
2374 | 0 | FLAGS_apply_intents_task_injected_delay_ms = 200; |
2375 | 

2376 | 0 | constexpr int64_t kRowNumModToAbort = 7; |
2377 | 0 | constexpr int64_t kNumBatches = 10; |
2378 | 0 | constexpr int64_t kNumRows = RegularBuildVsSanitizers(10000, 1000); |
2379 | 0 | FLAGS_txn_max_apply_batch_records = kNumRows / kNumBatches; |
2380 | 

2381 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2382 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (a int PRIMARY KEY) SPLIT INTO 1 TABLETS")); |
2383 | 

2384 | 0 | ASSERT_OK(conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); |
2385 | 0 | for (int64_t row_num = 0; row_num < kNumRows; ++row_num) { |
2386 | 0 | auto should_abort = row_num % kRowNumModToAbort == 0; |
2387 | 0 | if (should_abort) { |
2388 | 0 | ASSERT_OK(conn.Execute("SAVEPOINT A")); |
2389 | 0 | } |
2390 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO t VALUES ($0)", row_num)); |
2391 | 0 | if (should_abort) { |
2392 | 0 | ASSERT_OK(conn.Execute("ROLLBACK TO A")); |
2393 | 0 | } |
2394 | 0 | } |
2395 | 

2396 | 0 | ASSERT_OK(conn.CommitTransaction()); |
2397 | 

2398 | 0 | LOG(INFO) << "Restart cluster"; |
2399 | 0 | ASSERT_OK(RestartCluster()); |
// The old connection does not survive the restart; open a new one.
2400 | 0 | conn = ASSERT_RESULT(Connect()); |
2401 | 

2402 | 0 | ASSERT_OK(WaitFor([this] { |
2403 | 0 | auto intents_count = CountIntents(cluster_.get()); |
2404 | 0 | LOG(INFO) << "Intents count: " << intents_count; |
2405 | 

2406 | 0 | return intents_count == 0; |
2407 | 0 | }, 60s * kTimeMultiplier, "Intents cleanup", 200ms)); |
2408 | 

// Verification: for a rolled-back row, reading cell (0, 0) of the result must fail; for any
// other row it must succeed and equal the row number.
2409 | 0 | for (int64_t row_num = 0; row_num < kNumRows; ++row_num) { |
2410 | 0 | auto should_abort = row_num % kRowNumModToAbort == 0; |
2411 | 

2412 | 0 | auto res = ASSERT_RESULT(conn.FetchFormat("SELECT * FROM t WHERE a = $0", row_num)); |
2413 | 0 | if (should_abort) { |
2414 | 0 | EXPECT_NOT_OK(GetInt32(res.get(), 0, 0)) << "Did not expect to find value for: " << row_num; |
2415 | 0 | } else { |
2416 | 0 | int64_t value = EXPECT_RESULT(GetInt32(res.get(), 0, 0)); |
2417 | 0 | EXPECT_EQ(value, row_num) << "Expected to find " << row_num << ", found " << value << "."; |
2418 | 0 | } |
2419 | 0 | } |
2420 | 0 | } |
2421 | | |
// Runs RunManyConcurrentReadersTest() with a 10000 ms injected apply-intents task delay. It
// first waits for all tablet servers and then sleeps a fixed 10 s before setting the flag.
2422 | | TEST_F( |
2423 | | PgMiniTest, |
2424 | 0 | YB_DISABLE_TEST_IN_SANITIZERS(TestConcurrentReadersMaskAbortedIntentsWithApplyDelay)) { |
2425 | 0 | ASSERT_OK(cluster_->WaitForAllTabletServers()); |
2426 | 0 | std::this_thread::sleep_for(10s); |
2427 | 0 | FLAGS_apply_intents_task_injected_delay_ms = 10000; |
2428 | 0 | RunManyConcurrentReadersTest(); |
2429 | 0 | } |
2430 | | |
// Runs RunManyConcurrentReadersTest() with
// FLAGS_TEST_inject_random_delay_on_txn_status_response_ms set to 30. It first waits for all
// tablet servers and then sleeps a fixed 10 s before setting the flag.
2431 | | TEST_F( |
2432 | | PgMiniTest, |
2433 | 0 | YB_DISABLE_TEST_IN_SANITIZERS(TestConcurrentReadersMaskAbortedIntentsWithResponseDelay)) { |
2434 | 0 | ASSERT_OK(cluster_->WaitForAllTabletServers()); |
2435 | 0 | std::this_thread::sleep_for(10s); |
2436 | 0 | FLAGS_TEST_inject_random_delay_on_txn_status_response_ms = 30; |
2437 | 0 | RunManyConcurrentReadersTest(); |
2438 | 0 | } |
2439 | | |
// Runs RunManyConcurrentReadersTest() with
// FLAGS_TEST_txn_participant_inject_latency_on_apply_update_txn_ms set to 30. It first waits for
// all tablet servers and then sleeps a fixed 10 s before setting the flag.
2440 | | TEST_F( |
2441 | | PgMiniTest, |
2442 | 0 | YB_DISABLE_TEST_IN_SANITIZERS(TestConcurrentReadersMaskAbortedIntentsWithUpdateDelay)) { |
2443 | 0 | ASSERT_OK(cluster_->WaitForAllTabletServers()); |
2444 | 0 | std::this_thread::sleep_for(10s); |
2445 | 0 | FLAGS_TEST_txn_participant_inject_latency_on_apply_update_txn_ms = 30; |
2446 | 0 | RunManyConcurrentReadersTest(); |
2447 | 0 | } |
2448 | | |
// conn1 reads row a = 90 inside a serializable transaction, but does so between SAVEPOINT A and
// ROLLBACK TO A. conn2 then updates the same row in its own serializable transaction, and conn1
// commits. Exactly one of conn2's UPDATE and conn1's commit must succeed, i.e. rolling back to
// the savepoint must not release the conflict created by the read.
2449 | 0 | TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(TestSerializableStrongReadLockNotAborted)) { |
2450 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2451 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (a int PRIMARY KEY, b int) SPLIT INTO 1 TABLETS")); |
2452 | 0 | for (int i = 0; i < 100; ++i) { |
2453 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO t VALUES ($0, $0)", i)); |
2454 | 0 | } |
2455 | 

2456 | 0 | auto conn1 = ASSERT_RESULT(Connect()); |
2457 | 0 | ASSERT_OK(conn1.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); |
2458 | 0 | ASSERT_OK(conn1.Execute("SAVEPOINT A")); |
2459 | 0 | auto res1 = ASSERT_RESULT(conn1.FetchFormat("SELECT b FROM t WHERE a = $0", 90)); |
2460 | 0 | ASSERT_OK(conn1.Execute("ROLLBACK TO A")); |
2461 | 

2462 | 0 | auto conn2 = ASSERT_RESULT(Connect()); |
2463 | 0 | ASSERT_OK(conn2.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); |
// Neither status is asserted on its own; only their combination is checked below.
2464 | 0 | auto update_status = conn2.ExecuteFormat("UPDATE t SET b = $0 WHERE a = $1", 1000, 90); |
2465 | 

2466 | 0 | auto commit_status = conn1.CommitTransaction(); |
2467 | 

2468 | 0 | EXPECT_TRUE(commit_status.ok() ^ update_status.ok()) |
2469 | 0 | << "Expected exactly one of commit of first transaction or update of second transaction to " |
2470 | 0 | << "fail.\n" |
2471 | 0 | << "Commit status: " << commit_status << ".\n" |
2472 | 0 | << "Update status: " << update_status << ".\n"; |
2473 | 0 | } |
2474 | | |
2475 | | // Use a special mode in which non-leader masters time out all RPCs. |
2476 | | // Then step down the master leader and perform a backup. |
2477 | | TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS_OR_MAC(NonRespondingMaster), |
2478 | 0 | PgMiniMasterFailoverTest) { |
2479 | 0 | FLAGS_TEST_timeout_non_leader_master_rpcs = true; |
2480 | 0 | tools::TmpDirProvider tmp_dir; |
2481 | 

// Create database "test" with one table so that there is something to back up.
2482 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2483 | 0 | ASSERT_OK(conn.Execute("CREATE DATABASE test")); |
2484 | 0 | conn = ASSERT_RESULT(ConnectToDB("test")); |
2485 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (i INT)")); |
2486 | 

// Force the current master leader to step down and wait (up to 10 s) until a master with a
// different permanent UUID is reported as leader.
2487 | 0 | auto peer = ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->tablet_peer(); |
2488 | 0 | LOG(INFO) << "Old leader: " << peer->permanent_uuid(); |
2489 | 0 | ASSERT_OK(StepDown(peer, /* new_leader_uuid */ std::string(), ForceStepDown::kTrue)); |
2490 | 0 | ASSERT_OK(WaitFor([this, peer]() -> Result<bool> { |
2491 | 0 | auto leader = VERIFY_RESULT(cluster_->GetLeaderMiniMaster())->tablet_peer(); |
2492 | 0 | if (leader->permanent_uuid() != peer->permanent_uuid()) { |
2493 | 0 | LOG(INFO) << "New leader: " << leader->permanent_uuid(); |
2494 | 0 | return true; |
2495 | 0 | } |
2496 | 0 | return false; |
2497 | 0 | }, 10s, "Wait leader change")); |
2498 | 

// Create a backup of keyspace ysql.test under the temp directory, with --no_upload.
2499 | 0 | ASSERT_OK(tools::RunBackupCommand( |
2500 | 0 | pg_host_port(), cluster_->GetMasterAddresses(), cluster_->GetTserverHTTPAddresses(), |
2501 | 0 | *tmp_dir, {"--backup_location", tmp_dir / "backup", "--no_upload", "--keyspace", "ysql.test", |
2502 | 0 | "create"})); |
2503 | 0 | } |
2504 | | |
2505 | | } // namespace pgwrapper |
2506 | | } // namespace yb |