/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_txn-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/yql/pgwrapper/pg_mini_test_base.h" |
15 | | |
16 | | #include "yb/util/test_macros.h" |
17 | | #include "yb/util/test_thread_holder.h" |
18 | | |
19 | | using namespace std::literals; |
20 | | |
21 | | DECLARE_bool(TEST_fail_in_apply_if_no_metadata); |
22 | | |
23 | | namespace yb { |
24 | | namespace pgwrapper { |
25 | | |
26 | | class PgTxnTest : public PgMiniTestBase { |
27 | | |
28 | | }; |
29 | | |
30 | 0 | TEST_F(PgTxnTest, YB_DISABLE_TEST_IN_SANITIZERS(EmptyUpdate)) { |
31 | 0 | FLAGS_TEST_fail_in_apply_if_no_metadata = true; |
32 | |
|
33 | 0 | auto conn = ASSERT_RESULT(Connect()); |
34 | |
|
35 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE test (key TEXT, value TEXT, PRIMARY KEY((key) HASH))")); |
36 | 0 | ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); |
37 | 0 | ASSERT_OK(conn.Execute("UPDATE test SET value = 'a' WHERE key = 'b'")); |
38 | 0 | ASSERT_OK(conn.CommitTransaction()); |
39 | 0 | } |
40 | | |
41 | | class PgTxnRF1Test : public PgTxnTest { |
42 | | public: |
43 | 0 | size_t NumTabletServers() override { |
44 | 0 | return 1; |
45 | 0 | } |
46 | | }; |
47 | | |
48 | 0 | TEST_F_EX(PgTxnTest, YB_DISABLE_TEST_IN_TSAN(SelectRF1ReadOnlyDeferred), PgTxnRF1Test) { |
49 | 0 | auto conn = ASSERT_RESULT(Connect()); |
50 | |
|
51 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE test (key INT)")); |
52 | 0 | ASSERT_OK(conn.Execute("INSERT INTO test VALUES (1)")); |
53 | 0 | ASSERT_OK(conn.Execute("BEGIN ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE")); |
54 | 0 | auto res = ASSERT_RESULT(conn.FetchValue<int32_t>("SELECT * FROM test")); |
55 | 0 | ASSERT_EQ(res, 1); |
56 | 0 | ASSERT_OK(conn.Execute("COMMIT")); |
57 | 0 | } |
58 | | |
59 | 0 | TEST_F(PgTxnTest, YB_DISABLE_TEST_IN_TSAN(SerializableReadWriteConflicts)) { |
60 | 0 | auto conn1 = ASSERT_RESULT(Connect()); |
61 | 0 | auto conn2 = ASSERT_RESULT(Connect()); |
62 | 0 | constexpr double kPriorityBound = 0.5; |
63 | |
|
64 | 0 | ASSERT_OK(conn1.ExecuteFormat("SET yb_transaction_priority_lower_bound = $0", kPriorityBound)); |
65 | 0 | ASSERT_OK(conn1.Execute("CREATE TABLE test (key INT, value INT, PRIMARY KEY((key) HASH))")); |
66 | 0 | ASSERT_OK(conn1.Execute("CREATE INDEX idx ON test (value)")); |
67 | 0 | ASSERT_OK(conn1.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); |
68 | 0 | ASSERT_OK(conn1.Execute("INSERT INTO test VALUES (1, 1)")); |
69 | 0 | ASSERT_OK(conn2.ExecuteFormat("SET yb_transaction_priority_upper_bound = $0", kPriorityBound)); |
70 | 0 | ASSERT_OK(conn2.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); |
71 | 0 | ASSERT_NOK(conn2.Fetch("SELECT key FROM test WHERE value = 1")); |
72 | 0 | ASSERT_OK(conn1.Execute("COMMIT")); |
73 | 0 | } |
74 | | |
75 | | // Test concurrently insert increasing values, and in parallel perform read of several recent |
76 | | // values. |
77 | | // Checking that reads could be serialized. |
78 | 0 | TEST_F(PgTxnTest, YB_DISABLE_TEST_IN_TSAN(ReadRecentSet)) { |
79 | 0 | auto conn = ASSERT_RESULT(Connect()); |
80 | 0 | constexpr int kWriters = 16; |
81 | 0 | constexpr int kReaders = 16; |
82 | 0 | constexpr int kReadLength = 32; |
83 | |
|
84 | 0 | ASSERT_OK(conn.Execute( |
85 | 0 | "CREATE TABLE test (key INT, value INT, PRIMARY KEY((key) HASH)) SPLIT INTO 1 TABLETS")); |
86 | 0 | ASSERT_OK(conn.Execute( |
87 | 0 | "CREATE INDEX idx ON test (value) SPLIT INTO 2 TABLETS")); |
88 | 0 | TestThreadHolder thread_holder; |
89 | 0 | std::atomic<int> value(0); |
90 | 0 | for (int i = 0; i != kWriters; ++i) { |
91 | 0 | thread_holder.AddThreadFunctor( |
92 | 0 | [this, &stop = thread_holder.stop_flag(), &value] { |
93 | 0 | auto connection = ASSERT_RESULT(Connect()); |
94 | 0 | while (!stop.load()) { |
95 | 0 | int cur = value.fetch_add(1); |
96 | 0 | ASSERT_OK(connection.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); |
97 | 0 | auto status = connection.ExecuteFormat("INSERT INTO test VALUES ($0, $0)", cur); |
98 | 0 | if (status.ok()) { |
99 | 0 | status = connection.CommitTransaction(); |
100 | 0 | } |
101 | 0 | if (!status.ok()) { |
102 | 0 | ASSERT_OK(connection.RollbackTransaction()); |
103 | 0 | } |
104 | 0 | } |
105 | 0 | }); |
106 | 0 | } |
107 | 0 | std::mutex reads_mutex; |
108 | 0 | struct Read { |
109 | 0 | int read_min; |
110 | 0 | uint64_t mask; |
111 | |
|
112 | 0 | static std::string ValuesToString(int read_min, uint64_t mask) { |
113 | 0 | auto v = read_min; |
114 | 0 | auto m = mask; |
115 | 0 | std::vector<int> values; |
116 | 0 | while (m) { |
117 | 0 | if (m & 1ULL) { |
118 | 0 | values.push_back(v); |
119 | 0 | } |
120 | 0 | ++v; |
121 | 0 | m >>= 1ULL; |
122 | 0 | } |
123 | |
|
124 | 0 | return AsString(values); |
125 | 0 | } |
126 | |
|
127 | 0 | std::string ToString() const { |
128 | 0 | return Format("{ read range: $0-$1 values: $2 }", |
129 | 0 | read_min, read_min + kReadLength - 1, ValuesToString(read_min, mask)); |
130 | 0 | } |
131 | 0 | }; |
132 | 0 | std::vector<Read> reads; |
133 | 0 | for (int i = 0; i != kReaders; ++i) { |
134 | 0 | thread_holder.AddThreadFunctor( |
135 | 0 | [this, &stop = thread_holder.stop_flag(), &value, &reads, &reads_mutex] { |
136 | 0 | auto connection = ASSERT_RESULT(Connect()); |
137 | 0 | char str_buffer[0x200]; |
138 | 0 | while (!stop.load()) { |
139 | 0 | const auto read_min = std::max(value.load() - kReadLength, 0); |
140 | 0 | char* p = str_buffer; |
141 | 0 | for (auto v = read_min; v != read_min + kReadLength; ++v) { |
142 | 0 | if (p != str_buffer) { |
143 | 0 | *p++ = ','; |
144 | 0 | } |
145 | 0 | p = FastInt64ToBufferLeft(v, p); |
146 | 0 | } |
147 | 0 | ASSERT_OK(connection.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); |
148 | 0 | auto res = connection.FetchFormat("SELECT value FROM test WHERE value in ($0)", str_buffer); |
149 | 0 | if (!res.ok()) { |
150 | 0 | ASSERT_OK(connection.RollbackTransaction()); |
151 | 0 | continue; |
152 | 0 | } |
153 | 0 | auto status = connection.CommitTransaction(); |
154 | 0 | if (!status.ok()) { |
155 | 0 | ASSERT_OK(connection.RollbackTransaction()); |
156 | 0 | continue; |
157 | 0 | } |
158 | 0 | uint64_t mask = 0; |
159 | 0 | for (int j = 0, count = PQntuples(res->get()); j != count; ++j) { |
160 | 0 | mask |= 1ULL << (ASSERT_RESULT(GetInt32(res->get(), j, 0)) - read_min); |
161 | 0 | } |
162 | 0 | std::lock_guard<std::mutex> lock(reads_mutex); |
163 | 0 | Read new_read{read_min, mask}; |
164 | 0 | reads.erase(std::remove_if(reads.begin(), reads.end(), |
165 | 0 | [&new_read, &stop](const auto& old_read) { |
166 | 0 | int read_min_delta = new_read.read_min - old_read.read_min; |
167 | | // Existing read is too old, remove it. |
168 | 0 | if (read_min_delta >= 64) { |
169 | 0 | return true; |
170 | 0 | } |
171 | | // New read is too old, cannot check it. |
172 | 0 | if (read_min_delta <= -64) { |
173 | 0 | return false; |
174 | 0 | } |
175 | 0 | constexpr auto kFullMask = (1ULL << kReadLength) - 1ULL; |
176 | | // Extract only numbers that belong to both reads. |
177 | 0 | uint64_t lmask, rmask; |
178 | 0 | if (read_min_delta >= 0) { |
179 | 0 | lmask = new_read.mask & (kFullMask >> read_min_delta); |
180 | 0 | rmask = (old_read.mask >> read_min_delta) & kFullMask; |
181 | 0 | } else { |
182 | 0 | lmask = (new_read.mask >> -read_min_delta) & kFullMask; |
183 | 0 | rmask = old_read.mask & (kFullMask >> -read_min_delta); |
184 | 0 | } |
185 | | // Check that one set is subset of another subset. |
186 | | // I.e. only one set is allowed to have elements that is not contained in another set. |
187 | 0 | if ((lmask | rmask) != std::max(lmask, rmask)) { |
188 | 0 | const auto read = std::max(old_read.read_min, new_read.read_min); |
189 | 0 | ADD_FAILURE() << "R1: " << old_read.ToString() << "\nR2: " << new_read.ToString() |
190 | 0 | << "\nR1-R2: " << Read::ValuesToString(read, rmask ^ (lmask & rmask)) |
191 | 0 | << ", R2-R1: " << Read::ValuesToString(read, lmask ^ (lmask & rmask)); |
192 | 0 | stop.store(true); |
193 | 0 | } |
194 | 0 | return false; |
195 | 0 | }), reads.end()); |
196 | 0 | reads.push_back(new_read); |
197 | 0 | } |
198 | 0 | }); |
199 | 0 | } |
200 | |
|
201 | 0 | thread_holder.WaitAndStop(30s); |
202 | 0 | } |
203 | | |
204 | | } // namespace pgwrapper |
205 | | } // namespace yb |