/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_explicit_lock-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/yql/pggate/pggate_flags.h" |
15 | | #include "yb/yql/pgwrapper/pg_mini_test_base.h" |
16 | | |
17 | | using namespace std::literals; |
18 | | |
19 | | namespace yb { |
20 | | namespace pgwrapper { |
21 | | |
22 | | template<IsolationLevel level> |
23 | | class PgExplicitLockTest : public PgMiniTestBase { |
24 | | protected: |
25 | 0 | void BeforePgProcessStart() override { |
26 | 0 | FLAGS_ysql_sleep_before_retry_on_txn_conflict = false; |
27 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper18PgExplicitLockTestILNS_14IsolationLevelE1EE20BeforePgProcessStartEv Unexecuted instantiation: _ZN2yb9pgwrapper18PgExplicitLockTestILNS_14IsolationLevelE2EE20BeforePgProcessStartEv |
28 | | |
29 | 0 | void TestRowLockInJoin() { |
30 | 0 | PGConn join_conn = ASSERT_RESULT(Connect()); |
31 | 0 | ASSERT_OK(join_conn.Execute("SET yb_transaction_priority_upper_bound=0.4")); |
32 | 0 | PGConn misc_conn = ASSERT_RESULT(Connect()); |
33 | 0 | PGConn select_conn = ASSERT_RESULT(Connect()); |
34 | 0 | ASSERT_OK(select_conn.Execute("SET yb_transaction_priority_lower_bound=0.5")); |
35 | | |
36 | | // Set up tables |
37 | 0 | ASSERT_OK(misc_conn.Execute( |
38 | 0 | "create table employees (k int primary key, profession text, email text)")); |
39 | 0 | ASSERT_OK(misc_conn.Execute("create table physicians (k int primary key, email text)")); |
40 | 0 | ASSERT_OK(misc_conn.Execute("insert into employees values (1, 'sales', 'salesman1@xyz.com')")); |
41 | 0 | ASSERT_OK(misc_conn.Execute("insert into employees values (2, 'sales', 'salesman2@xyz.com')")); |
42 | 0 | ASSERT_OK(misc_conn.Execute("insert into employees values (3, 'physician', 'phy1@xyz.com')")); |
43 | 0 | ASSERT_OK(misc_conn.Execute("insert into employees values (4, 'physician', 'phy2@xyz.com')")); |
44 | 0 | ASSERT_OK(misc_conn.Execute("insert into physicians values (1, 'phy1@xyz.com')")); |
45 | 0 | ASSERT_OK(misc_conn.Execute("insert into physicians values (2, 'phy2@xyz.com')")); |
46 | | |
47 | | // Test case 1: Join returns no rows. |
48 | 0 | ASSERT_OK(StartTxn(&join_conn)); |
49 | | |
50 | | // 1. For SERIALIZABLE level: all tablets of the table are locked since we need to lock the |
51 | | // whole predicate. |
52 | | // 2. For REPEATABLE READ level: No rows are locked since none match the conditions. |
53 | 0 | auto res = ASSERT_RESULT( |
54 | 0 | join_conn.Fetch( |
55 | 0 | "select * from physicians, employees where employees.profession = 'sales' and " |
56 | 0 | "employees.email = physicians.email for update")); |
57 | 0 | ASSERT_EQ(PQntuples(res.get()), 0); |
58 | | |
59 | | // The below statement will have a higher priority than the above txn. |
60 | | // Given this, the join txn will face following fate based on isolation level - |
61 | | // 1. SERIALIZABLE level: aborted due to conflicting locks. |
62 | | // 2. REPEATABLE READ level: not aborted since no locks taken. |
63 | 0 | res = ASSERT_RESULT(select_conn.Fetch("select * from employees for update")); |
64 | 0 | ASSERT_EQ(PQntuples(res.get()), 4); |
65 | 0 | if (level == IsolationLevel::SERIALIZABLE_ISOLATION) { |
66 | 0 | ASSERT_NOK(join_conn.Execute("COMMIT")); |
67 | 0 | } else { |
68 | 0 | ASSERT_OK(join_conn.Execute("COMMIT")); |
69 | 0 | } |
70 | | |
71 | | // Test case 2: Join returns 2 rows (but differernt from those returned later a by singular |
72 | | // select statement). |
73 | 0 | ASSERT_OK(StartTxn(&join_conn)); |
74 | | |
75 | | // 1. For SERIALIZABLE level: all tablets of the table are locked since we need to lock the |
76 | | // whole predicate. |
77 | | // 2. For REPEATABLE READ level: 2 rows are locked (the 'physician' ones). |
78 | 0 | res = ASSERT_RESULT( |
79 | 0 | join_conn.Fetch( |
80 | 0 | "select * from physicians, employees where employees.profession = 'physician' and " |
81 | 0 | "employees.email = physicians.email for update;")); |
82 | 0 | ASSERT_EQ(PQntuples(res.get()), 2); |
83 | | |
84 | | // The below statement will have a higher priority than the above txn. |
85 | | // Given this, the join txn will face following fate based on isolation level - |
86 | | // 1. SERIALIZABLE level: aborted due to conflicting locks. |
87 | | // 2. REPEATABLE READ level: not aborted since locks are on different sets of rows. |
88 | 0 | res = ASSERT_RESULT(select_conn.Fetch( |
89 | 0 | "select * from employees where employees.profession = 'sales' for update;")); |
90 | 0 | ASSERT_EQ(PQntuples(res.get()), 2); |
91 | 0 | if (level == IsolationLevel::SERIALIZABLE_ISOLATION) { |
92 | 0 | ASSERT_NOK(join_conn.Execute("COMMIT")); |
93 | 0 | } else { |
94 | 0 | ASSERT_OK(join_conn.Execute("COMMIT")); |
95 | 0 | } |
96 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper18PgExplicitLockTestILNS_14IsolationLevelE1EE17TestRowLockInJoinEv Unexecuted instantiation: _ZN2yb9pgwrapper18PgExplicitLockTestILNS_14IsolationLevelE2EE17TestRowLockInJoinEv |
97 | | |
98 | 0 | static CHECKED_STATUS StartTxn(PGConn* connection) { |
99 | 0 | return connection->StartTransaction(level); |
100 | 0 | } Unexecuted instantiation: _ZN2yb9pgwrapper18PgExplicitLockTestILNS_14IsolationLevelE1EE8StartTxnEPNS0_6PGConnE Unexecuted instantiation: _ZN2yb9pgwrapper18PgExplicitLockTestILNS_14IsolationLevelE2EE8StartTxnEPNS0_6PGConnE |
101 | | }; |
102 | | |
103 | | class PgExplicitLockTestSerializable |
104 | | : public PgExplicitLockTest<IsolationLevel::SERIALIZABLE_ISOLATION> { |
105 | | }; |
106 | | |
107 | | class PgExplicitLockTestSnapshot : public PgExplicitLockTest<IsolationLevel::SNAPSHOT_ISOLATION> { |
108 | | protected: |
109 | | void TestSkipLocked(); |
110 | | }; |
111 | | |
112 | | // Currently SKIP LOCKED is supported only SELECT statements in REPEATABLE READ isolation level. |
113 | 0 | void PgExplicitLockTestSnapshot::TestSkipLocked() { |
114 | 0 | PGConn misc_conn = ASSERT_RESULT(Connect()); |
115 | | |
116 | | // Set up table |
117 | 0 | ASSERT_OK(misc_conn.Execute("create table test (k int primary key, v int)")); |
118 | 0 | ASSERT_OK(misc_conn.Execute("insert into test values (1, 10), (2, 20), (3, 30)")); |
119 | | |
120 | | // Test case 1: 2 REPEATABLE READ txns skipping rows locked by each other. |
121 | 0 | PGConn txn1_conn = ASSERT_RESULT(Connect()); |
122 | 0 | PGConn txn2_conn = ASSERT_RESULT(Connect()); |
123 | 0 | ASSERT_OK(StartTxn(&txn1_conn)); |
124 | 0 | ASSERT_OK(StartTxn(&txn2_conn)); |
125 | |
|
126 | 0 | auto res = ASSERT_RESULT(txn1_conn.Fetch("select * from test for update skip locked limit 1")); |
127 | 0 | ASSERT_EQ(PQntuples(res.get()), 1); |
128 | 0 | auto assert_val = [](PGResultPtr& res, int row, int col, int expected_val) { |
129 | 0 | auto val = ASSERT_RESULT(GetInt32(res.get(), row, col)); |
130 | 0 | ASSERT_EQ(val, expected_val); |
131 | 0 | }; |
132 | |
|
133 | 0 | assert_val(res, 0, 0, 1); |
134 | 0 | assert_val(res, 0, 1, 10); |
135 | 0 | assert_val(res, 0, 0, 1); |
136 | 0 | assert_val(res, 0, 1, 10); |
137 | |
|
138 | 0 | res = ASSERT_RESULT(txn2_conn.Fetch("select * from test for update skip locked limit 1")); |
139 | 0 | ASSERT_EQ(PQntuples(res.get()), 1); |
140 | 0 | assert_val(res, 0, 0, 2); |
141 | 0 | assert_val(res, 0, 1, 20); |
142 | |
|
143 | 0 | res = ASSERT_RESULT(txn1_conn.Fetch("select * from test for update skip locked limit 2")); |
144 | 0 | ASSERT_EQ(PQntuples(res.get()), 2); |
145 | 0 | assert_val(res, 0, 0, 1); |
146 | 0 | assert_val(res, 0, 1, 10); |
147 | 0 | assert_val(res, 1, 0, 3); |
148 | 0 | assert_val(res, 1, 1, 30); |
149 | |
|
150 | 0 | res = ASSERT_RESULT(txn2_conn.Fetch("select * from test for update skip locked limit 2")); |
151 | 0 | ASSERT_EQ(PQntuples(res.get()), 1); |
152 | 0 | assert_val(res, 0, 0, 2); |
153 | 0 | assert_val(res, 0, 1, 20); |
154 | |
|
155 | 0 | ASSERT_OK(txn1_conn.Execute("COMMIT")); |
156 | 0 | ASSERT_OK(txn2_conn.Execute("COMMIT")); |
157 | | |
158 | | // Test case 2: A txn holds lock on some rows. A single statement then skips the locked rows. |
159 | 0 | ASSERT_OK(StartTxn(&txn1_conn)); |
160 | 0 | res = ASSERT_RESULT(txn1_conn.Fetch("select * from test for update skip locked limit 1")); |
161 | 0 | ASSERT_EQ(PQntuples(res.get()), 1); |
162 | 0 | assert_val(res, 0, 0, 1); |
163 | 0 | assert_val(res, 0, 1, 10); |
164 | |
|
165 | 0 | PGConn single_stmt_conn = ASSERT_RESULT(Connect()); |
166 | 0 | res = ASSERT_RESULT(single_stmt_conn.Fetch("select * from test for update skip locked limit 1")); |
167 | 0 | ASSERT_EQ(PQntuples(res.get()), 1); |
168 | 0 | assert_val(res, 0, 0, 2); |
169 | 0 | assert_val(res, 0, 1, 20); |
170 | |
|
171 | 0 | res = ASSERT_RESULT(txn1_conn.Fetch("select * from test for update skip locked limit 2")); |
172 | 0 | ASSERT_EQ(PQntuples(res.get()), 2); |
173 | 0 | assert_val(res, 0, 0, 1); |
174 | 0 | assert_val(res, 0, 1, 10); |
175 | 0 | assert_val(res, 1, 0, 2); |
176 | 0 | assert_val(res, 1, 1, 20); |
177 | |
|
178 | 0 | ASSERT_OK(txn1_conn.Execute("COMMIT")); |
179 | | |
180 | | // Test case 3: |
181 | | // Use a join (involving 2 tables) that tries to lock rows based on some join predicate i.e., |
182 | | // locks 2 rows, one from each table (say r1 and r2) and also has SKIP LOCKED clause. But the join |
183 | | // finds that one of those rows (say r1) is already locked by some other txn. In this case assert |
184 | | // two things - |
185 | | // 1. the join should move on to the next set of rows that satisfy the predicate and lock those. |
186 | | // 2. r2 should still be available for locking |
187 | 0 | ASSERT_OK(misc_conn.Execute("create table test2 (k int primary key, v int)")); |
188 | 0 | ASSERT_OK(misc_conn.Execute("insert into test2 values (4, 10), (5, 20), (6, 30)")); |
189 | |
|
190 | 0 | ASSERT_OK(StartTxn(&txn1_conn)); |
191 | 0 | ASSERT_OK(StartTxn(&txn2_conn)); |
192 | 0 | res = ASSERT_RESULT(txn1_conn.Fetch("select * from test where k=1 for update;")); |
193 | 0 | ASSERT_EQ(PQntuples(res.get()), 1); |
194 | 0 | assert_val(res, 0, 0, 1); |
195 | 0 | assert_val(res, 0, 1, 10); |
196 | |
|
197 | 0 | res = ASSERT_RESULT(txn2_conn.Fetch("select * from test, test2 where test.v=test2.v for update " |
198 | 0 | "skip locked limit 1;")); |
199 | 0 | ASSERT_EQ(PQntuples(res.get()), 1); |
200 | 0 | assert_val(res, 0, 0, 2); |
201 | 0 | assert_val(res, 0, 1, 20); |
202 | 0 | assert_val(res, 0, 2, 5); |
203 | 0 | assert_val(res, 0, 3, 20); |
204 | |
|
205 | 0 | res = ASSERT_RESULT(txn1_conn.Fetch("select * from test2 where k=4 for update;")); |
206 | 0 | ASSERT_EQ(PQntuples(res.get()), 1); |
207 | 0 | assert_val(res, 0, 0, 4); |
208 | 0 | assert_val(res, 0, 1, 10); |
209 | |
|
210 | 0 | ASSERT_OK(txn1_conn.Execute("COMMIT")); |
211 | 0 | ASSERT_OK(txn2_conn.Execute("COMMIT")); |
212 | 0 | } |
213 | | |
214 | 0 | TEST_F(PgExplicitLockTestSnapshot, YB_DISABLE_TEST_IN_SANITIZERS(RowLockInJoin)) { |
215 | 0 | TestRowLockInJoin(); |
216 | 0 | } |
217 | | |
218 | 0 | TEST_F(PgExplicitLockTestSerializable, YB_DISABLE_TEST_IN_SANITIZERS(RowLockInJoin)) { |
219 | 0 | TestRowLockInJoin(); |
220 | 0 | } |
221 | | |
222 | 0 | TEST_F(PgExplicitLockTestSnapshot, YB_DISABLE_TEST_IN_SANITIZERS(SkipLocked)) { |
223 | 0 | TestSkipLocked(); |
224 | 0 | } |
225 | | |
226 | | } // namespace pgwrapper |
227 | | } // namespace yb |