/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_libpq-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include <signal.h> |
14 | | |
15 | | #include <fstream> |
16 | | #include <thread> |
17 | | |
18 | | #include "yb/client/client_fwd.h" |
19 | | #include "yb/client/table_info.h" |
20 | | #include "yb/client/yb_table_name.h" |
21 | | |
22 | | #include "yb/common/common.pb.h" |
23 | | #include "yb/common/pgsql_error.h" |
24 | | #include "yb/common/schema.h" |
25 | | |
26 | | #include "yb/master/master_client.pb.h" |
27 | | #include "yb/master/master_defaults.h" |
28 | | |
29 | | #include "yb/util/async_util.h" |
30 | | #include "yb/util/barrier.h" |
31 | | #include "yb/util/metrics.h" |
32 | | #include "yb/util/monotime.h" |
33 | | #include "yb/util/path_util.h" |
34 | | #include "yb/util/random_util.h" |
35 | | #include "yb/util/scope_exit.h" |
36 | | #include "yb/util/status_log.h" |
37 | | #include "yb/util/test_thread_holder.h" |
38 | | #include "yb/util/tsan_util.h" |
39 | | |
40 | | #include "yb/yql/pgwrapper/libpq_test_base.h" |
41 | | #include "yb/yql/pgwrapper/libpq_utils.h" |
42 | | |
43 | | using namespace std::literals; |
44 | | |
45 | | DECLARE_int64(external_mini_cluster_max_log_bytes); |
46 | | |
47 | | METRIC_DECLARE_entity(tablet); |
48 | | METRIC_DECLARE_counter(transaction_not_found); |
49 | | |
50 | | METRIC_DECLARE_entity(server); |
51 | | METRIC_DECLARE_counter(rpc_inbound_calls_created); |
52 | | |
53 | | namespace yb { |
54 | | namespace pgwrapper { |
55 | | |
56 | | class PgLibPqTest : public LibPqTestBase { |
57 | | protected: |
58 | | void TestUriAuth(); |
59 | | |
60 | | void TestMultiBankAccount(IsolationLevel isolation); |
61 | | |
62 | | void DoIncrement(int key, int num_increments, IsolationLevel isolation); |
63 | | |
64 | | void TestParallelCounter(IsolationLevel isolation); |
65 | | |
66 | | void TestConcurrentCounter(IsolationLevel isolation); |
67 | | |
68 | | void TestOnConflict(bool kill_master, const MonoDelta& duration); |
69 | | |
70 | | void TestCacheRefreshRetry(const bool is_retry_disabled); |
71 | | |
72 | | const std::vector<std::string> names{ |
73 | | "uppercase:P", |
74 | | "space: ", |
75 | | "symbol:#", |
76 | | "single_quote:'", |
77 | | "double_quote:\"", |
78 | | "backslash:\\", |
79 | | "mixed:P #'\"\\", |
80 | | }; |
81 | | }; |
82 | | |
83 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(Simple)) { |
84 | 0 | auto conn = ASSERT_RESULT(Connect()); |
85 | |
|
86 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT, value TEXT)")); |
87 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t (key, value) VALUES (1, 'hello')")); |
88 | |
|
89 | 0 | auto res = ASSERT_RESULT(conn.Fetch("SELECT * FROM t")); |
90 | |
|
91 | 0 | { |
92 | 0 | auto lines = PQntuples(res.get()); |
93 | 0 | ASSERT_EQ(1, lines); |
94 | |
|
95 | 0 | auto columns = PQnfields(res.get()); |
96 | 0 | ASSERT_EQ(2, columns); |
97 | |
|
98 | 0 | auto key = ASSERT_RESULT(GetInt32(res.get(), 0, 0)); |
99 | 0 | ASSERT_EQ(key, 1); |
100 | 0 | auto value = ASSERT_RESULT(GetString(res.get(), 0, 1)); |
101 | 0 | ASSERT_EQ(value, "hello"); |
102 | 0 | } |
103 | 0 | } |
104 | | |
105 | | // Test libpq connection to various database names. |
106 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(DatabaseNames)) { |
107 | 0 | PGConn conn = ASSERT_RESULT(Connect()); |
108 | |
|
109 | 0 | for (const std::string& db_name : names) { |
110 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", PqEscapeIdentifier(db_name))); |
111 | 0 | ASSERT_OK(ConnectToDB(db_name)); |
112 | 0 | } |
113 | 0 | } |
114 | | |
115 | | // Test libpq connection to various user names. |
116 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(UserNames)) { |
117 | 0 | PGConn conn = ASSERT_RESULT(Connect()); |
118 | |
|
119 | 0 | for (const std::string& user_name : names) { |
120 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE USER $0", PqEscapeIdentifier(user_name))); |
121 | 0 | ASSERT_OK(ConnectToDBAsUser("" /* db_name */, user_name)); |
122 | 0 | } |
123 | 0 | } |
124 | | |
125 | | // Test libpq connection using URI connection string. |
126 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(Uri)) { |
127 | 0 | const std::string& host = pg_ts->bind_host(); |
128 | 0 | const uint16_t port = pg_ts->pgsql_rpc_port(); |
129 | 0 | { |
130 | 0 | const std::string& conn_str = Format("postgres://yugabyte@$0:$1", host, port); |
131 | 0 | LOG(INFO) << "Connecting using string: " << conn_str; |
132 | 0 | PGConn conn = ASSERT_RESULT(ConnectUsingString(conn_str)); |
133 | 0 | { |
134 | 0 | auto res = ASSERT_RESULT(conn.Fetch("select current_database()")); |
135 | 0 | auto answer = ASSERT_RESULT(GetString(res.get(), 0, 0)); |
136 | 0 | ASSERT_EQ(answer, "yugabyte"); |
137 | 0 | } |
138 | 0 | { |
139 | 0 | auto res = ASSERT_RESULT(conn.Fetch("select current_user")); |
140 | 0 | auto answer = ASSERT_RESULT(GetString(res.get(), 0, 0)); |
141 | 0 | ASSERT_EQ(answer, "yugabyte"); |
142 | 0 | } |
143 | 0 | { |
144 | 0 | auto res = ASSERT_RESULT(conn.Fetch("show listen_addresses")); |
145 | 0 | auto answer = ASSERT_RESULT(GetString(res.get(), 0, 0)); |
146 | 0 | ASSERT_EQ(answer, host); |
147 | 0 | } |
148 | 0 | { |
149 | 0 | auto res = ASSERT_RESULT(conn.Fetch("show port")); |
150 | 0 | auto answer = ASSERT_RESULT(GetString(res.get(), 0, 0)); |
151 | 0 | ASSERT_EQ(answer, std::to_string(port)); |
152 | 0 | } |
153 | 0 | } |
154 | | // Supply database name. |
155 | 0 | { |
156 | 0 | const std::string& conn_str = Format("postgres://yugabyte@$0:$1/template1", host, port); |
157 | 0 | LOG(INFO) << "Connecting using string: " << conn_str; |
158 | 0 | PGConn conn = ASSERT_RESULT(ConnectUsingString(conn_str)); |
159 | 0 | { |
160 | 0 | auto res = ASSERT_RESULT(conn.Fetch("select current_database()")); |
161 | 0 | auto answer = ASSERT_RESULT(GetString(res.get(), 0, 0)); |
162 | 0 | ASSERT_EQ(answer, "template1"); |
163 | 0 | } |
164 | 0 | } |
165 | | // Supply an incorrect password. Since HBA config gives the yugabyte user trust access, postgres |
166 | | // won't request a password, our client won't send this password, and the authentication should |
167 | | // succeed. |
168 | 0 | { |
169 | 0 | const std::string& conn_str = Format("postgres://yugabyte:monkey123@$0:$1", host, port); |
170 | 0 | LOG(INFO) << "Connecting using string: " << conn_str; |
171 | 0 | ASSERT_OK(ConnectUsingString(conn_str)); |
172 | 0 | } |
173 | 0 | } |
174 | | |
175 | 0 | void PgLibPqTest::TestUriAuth() { |
176 | 0 | const std::string& host = pg_ts->bind_host(); |
177 | 0 | const uint16_t port = pg_ts->pgsql_rpc_port(); |
178 | | // Don't supply password. |
179 | 0 | { |
180 | 0 | const std::string& conn_str = Format("postgres://yugabyte@$0:$1", host, port); |
181 | 0 | LOG(INFO) << "Connecting using string: " << conn_str; |
182 | 0 | Result<PGConn> result = ConnectUsingString( |
183 | 0 | conn_str, |
184 | 0 | CoarseMonoClock::Now() + 2s /* deadline */); |
185 | 0 | ASSERT_NOK(result); |
186 | 0 | ASSERT_TRUE(result.status().IsNetworkError()); |
187 | 0 | ASSERT_TRUE(result.status().message().ToBuffer().find("Connect failed") != std::string::npos) |
188 | 0 | << result.status(); |
189 | 0 | } |
190 | | // Supply an incorrect password. |
191 | 0 | { |
192 | 0 | const std::string& conn_str = Format("postgres://yugabyte:monkey123@$0:$1", host, port); |
193 | 0 | LOG(INFO) << "Connecting using string: " << conn_str; |
194 | 0 | Result<PGConn> result = ConnectUsingString( |
195 | 0 | conn_str, |
196 | 0 | CoarseMonoClock::Now() + 2s /* deadline */); |
197 | 0 | ASSERT_NOK(result); |
198 | 0 | ASSERT_TRUE(result.status().IsNetworkError()); |
199 | 0 | ASSERT_TRUE(result.status().message().ToBuffer().find("Connect failed") != std::string::npos) |
200 | 0 | << result.status(); |
201 | 0 | } |
202 | | // Supply the correct password. |
203 | 0 | { |
204 | 0 | const std::string& conn_str = Format("postgres://yugabyte:yugabyte@$0:$1", host, port); |
205 | 0 | LOG(INFO) << "Connecting using string: " << conn_str; |
206 | 0 | ASSERT_OK(ConnectUsingString(conn_str)); |
207 | 0 | } |
208 | 0 | } |
209 | | |
210 | | // Enable authentication using password. This scheme requests the plain password. You may still |
211 | | // use SSL for encryption on the wire. |
212 | | class PgLibPqTestAuthPassword : public PgLibPqTest { |
213 | 0 | void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { |
214 | 0 | options->extra_tserver_flags.push_back("--ysql_hba_conf_csv=host all all samehost password"); |
215 | 0 | } |
216 | | }; |
217 | | |
218 | 0 | TEST_F_EX(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(UriPassword), PgLibPqTestAuthPassword) { |
219 | 0 | TestUriAuth(); |
220 | 0 | } |
221 | | |
222 | | // Enable authentication using md5. This scheme is a challenge and response, so the plain password |
223 | | // isn't sent. |
224 | | class PgLibPqTestAuthMd5 : public PgLibPqTest { |
225 | 0 | void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { |
226 | 0 | options->extra_tserver_flags.push_back("--ysql_hba_conf_csv=host all all samehost md5"); |
227 | 0 | } |
228 | | }; |
229 | | |
230 | 0 | TEST_F_EX(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(UriMd5), PgLibPqTestAuthMd5) { |
231 | 0 | TestUriAuth(); |
232 | 0 | } |
233 | | |
234 | | // Test that repeats example from this article: |
235 | | // https://blogs.msdn.microsoft.com/craigfr/2007/05/16/serializable-vs-snapshot-isolation-level/ |
236 | | // |
237 | | // Multiple rows with values 0 and 1 are stored in table. |
239 | | // Two concurrent transactions fetch all rows from the table and do the following. |
239 | | // First transaction changes value of all rows with value 0 to 1. |
240 | | // Second transaction changes value of all rows with value 1 to 0. |
241 | | // As outcome we should have rows with the same value. |
242 | | // |
243 | | // The described procedure is repeated multiple times to increase the probability of catching the bug, |
244 | | // w/o running test multiple times. |
245 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(SerializableColoring)) { |
246 | 0 | constexpr auto kKeys = RegularBuildVsSanitizers(10, 20); |
247 | 0 | constexpr auto kColors = 2; |
248 | 0 | constexpr auto kIterations = 20; |
249 | |
|
250 | 0 | auto conn = ASSERT_RESULT(Connect()); |
251 | |
|
252 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, color INT)")); |
253 | |
|
254 | 0 | auto iterations_left = kIterations; |
255 | |
|
256 | 0 | for (int iteration = 0; iterations_left > 0; ++iteration) { |
257 | 0 | auto iteration_title = Format("Iteration: $0", iteration); |
258 | 0 | SCOPED_TRACE(iteration_title); |
259 | 0 | LOG(INFO) << iteration_title; |
260 | |
|
261 | 0 | auto s = conn.Execute("DELETE FROM t"); |
262 | 0 | if (!s.ok()) { |
263 | 0 | ASSERT_TRUE(HasTryAgain(s)) << s; |
264 | 0 | continue; |
265 | 0 | } |
266 | 0 | for (int k = 0; k != kKeys; ++k) { |
267 | 0 | int32_t color = RandomUniformInt(0, kColors - 1); |
268 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO t (key, color) VALUES ($0, $1)", k, color)); |
269 | 0 | } |
270 | |
|
271 | 0 | std::atomic<int> complete{ 0 }; |
272 | 0 | std::vector<std::thread> threads; |
273 | 0 | for (int i = 0; i != kColors; ++i) { |
274 | 0 | int32_t color = i; |
275 | 0 | threads.emplace_back([this, color, kKeys, &complete] { |
276 | 0 | auto connection = ASSERT_RESULT(Connect()); |
277 | |
|
278 | 0 | ASSERT_OK(connection.Execute("BEGIN")); |
279 | 0 | ASSERT_OK(connection.Execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")); |
280 | |
|
281 | 0 | auto res = connection.Fetch("SELECT * FROM t"); |
282 | 0 | if (!res.ok()) { |
283 | 0 | ASSERT_TRUE(HasTryAgain(res.status())) << res.status(); |
284 | 0 | return; |
285 | 0 | } |
286 | 0 | auto columns = PQnfields(res->get()); |
287 | 0 | ASSERT_EQ(2, columns); |
288 | |
|
289 | 0 | auto lines = PQntuples(res->get()); |
290 | 0 | ASSERT_EQ(kKeys, lines); |
291 | 0 | for (int j = 0; j != lines; ++j) { |
292 | 0 | if (ASSERT_RESULT(GetInt32(res->get(), j, 1)) == color) { |
293 | 0 | continue; |
294 | 0 | } |
295 | | |
296 | 0 | auto key = ASSERT_RESULT(GetInt32(res->get(), j, 0)); |
297 | 0 | auto status = connection.ExecuteFormat( |
298 | 0 | "UPDATE t SET color = $1 WHERE key = $0", key, color); |
299 | 0 | if (!status.ok()) { |
300 | 0 | auto msg = status.message().ToBuffer(); |
301 | | // Missing metadata means that transaction was aborted and cleaned. |
302 | 0 | ASSERT_TRUE(HasTryAgain(status) || |
303 | 0 | msg.find("Missing metadata") != std::string::npos) << status; |
304 | 0 | break; |
305 | 0 | } |
306 | 0 | } |
307 | |
|
308 | 0 | auto status = connection.Execute("COMMIT"); |
309 | 0 | if (!status.ok()) { |
310 | 0 | auto msg = status.message().ToBuffer(); |
311 | 0 | ASSERT_TRUE(msg.find("Operation expired") != std::string::npos) << status; |
312 | 0 | return; |
313 | 0 | } |
314 | | |
315 | 0 | ++complete; |
316 | 0 | }); |
317 | 0 | } |
318 | |
|
319 | 0 | for (auto& thread : threads) { |
320 | 0 | thread.join(); |
321 | 0 | } |
322 | |
|
323 | 0 | if (complete == 0) { |
324 | 0 | continue; |
325 | 0 | } |
326 | | |
327 | 0 | auto res = ASSERT_RESULT(conn.Fetch("SELECT * FROM t")); |
328 | 0 | auto columns = PQnfields(res.get()); |
329 | 0 | ASSERT_EQ(2, columns); |
330 | |
|
331 | 0 | auto lines = PQntuples(res.get()); |
332 | 0 | ASSERT_EQ(kKeys, lines); |
333 | |
|
334 | 0 | std::vector<int32_t> zeroes, ones; |
335 | 0 | for (int i = 0; i != lines; ++i) { |
336 | 0 | auto key = ASSERT_RESULT(GetInt32(res.get(), i, 0)); |
337 | 0 | auto current = ASSERT_RESULT(GetInt32(res.get(), i, 1)); |
338 | 0 | if (current == 0) { |
339 | 0 | zeroes.push_back(key); |
340 | 0 | } else { |
341 | 0 | ones.push_back(key); |
342 | 0 | } |
343 | 0 | } |
344 | |
|
345 | 0 | std::sort(ones.begin(), ones.end()); |
346 | 0 | std::sort(zeroes.begin(), zeroes.end()); |
347 | |
|
348 | 0 | LOG(INFO) << "Zeroes: " << yb::ToString(zeroes) << ", ones: " << yb::ToString(ones); |
349 | 0 | ASSERT_TRUE(zeroes.empty() || ones.empty()); |
350 | |
|
351 | 0 | --iterations_left; |
352 | 0 | } |
353 | 0 | } |
354 | | |
355 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(SerializableReadWriteConflict)) { |
356 | 0 | const auto kKeys = RegularBuildVsSanitizers(20, 5); |
357 | 0 | const auto kNumTries = RegularBuildVsSanitizers(4, 1); |
358 | 0 | auto tries = 1; |
359 | 0 | for (; tries <= kNumTries; ++tries) { |
360 | 0 | auto conn = ASSERT_RESULT(Connect()); |
361 | 0 | ASSERT_OK(conn.Execute("DROP TABLE IF EXISTS t")); |
362 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)")); |
363 | |
|
364 | 0 | size_t reads_won = 0, writes_won = 0; |
365 | 0 | for (int i = 0; i != kKeys; ++i) { |
366 | 0 | auto read_conn = ASSERT_RESULT(Connect()); |
367 | 0 | ASSERT_OK(read_conn.Execute("BEGIN ISOLATION LEVEL SERIALIZABLE")); |
368 | 0 | auto res = read_conn.FetchFormat("SELECT * FROM t WHERE key = $0", i); |
369 | 0 | auto read_status = ResultToStatus(res); |
370 | |
|
371 | 0 | auto write_conn = ASSERT_RESULT(Connect()); |
372 | 0 | ASSERT_OK(write_conn.Execute("BEGIN ISOLATION LEVEL SERIALIZABLE")); |
373 | 0 | auto write_status = write_conn.ExecuteFormat("INSERT INTO t (key) VALUES ($0)", i); |
374 | |
|
375 | 0 | std::thread read_commit_thread([&read_conn, &read_status] { |
376 | 0 | if (read_status.ok()) { |
377 | 0 | read_status = read_conn.Execute("COMMIT"); |
378 | 0 | } |
379 | 0 | }); |
380 | |
|
381 | 0 | std::thread write_commit_thread([&write_conn, &write_status] { |
382 | 0 | if (write_status.ok()) { |
383 | 0 | write_status = write_conn.Execute("COMMIT"); |
384 | 0 | } |
385 | 0 | }); |
386 | |
|
387 | 0 | read_commit_thread.join(); |
388 | 0 | write_commit_thread.join(); |
389 | |
|
390 | 0 | LOG(INFO) << "Read: " << read_status << ", write: " << write_status; |
391 | |
|
392 | 0 | if (!read_status.ok()) { |
393 | 0 | ASSERT_OK(write_status); |
394 | 0 | ++writes_won; |
395 | 0 | } else { |
396 | 0 | ASSERT_NOK(write_status); |
397 | 0 | ++reads_won; |
398 | 0 | } |
399 | 0 | } |
400 | |
|
401 | 0 | LOG(INFO) << "Reads won: " << reads_won << ", writes won: " << writes_won |
402 | 0 | << " (" << tries << "/" << kNumTries << ")"; |
403 | | // always pass for TSAN, we're just looking for memory issues |
404 | 0 | if (RegularBuildVsSanitizers(false, true)) { |
405 | 0 | break; |
406 | 0 | } |
407 | | // break (succeed) if we hit 25% on our "coin toss" transaction conflict above |
408 | 0 | if (reads_won >= kKeys / 4 && writes_won >= kKeys / 4) { |
409 | 0 | break; |
410 | 0 | } |
411 | | // otherwise, retry and see if this is consistent behavior |
412 | 0 | } |
413 | 0 | ASSERT_LE(tries, kNumTries); |
414 | 0 | } |
415 | | |
416 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ReadRestart)) { |
417 | 0 | auto conn = ASSERT_RESULT(Connect()); |
418 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)")); |
419 | |
|
420 | 0 | std::atomic<bool> stop(false); |
421 | 0 | std::atomic<int> last_written(0); |
422 | |
|
423 | 0 | std::thread write_thread([this, &stop, &last_written] { |
424 | 0 | auto write_conn = ASSERT_RESULT(Connect()); |
425 | 0 | int write_key = 1; |
426 | 0 | while (!stop.load(std::memory_order_acquire)) { |
427 | 0 | SCOPED_TRACE(Format("Writing: $0", write_key)); |
428 | |
|
429 | 0 | ASSERT_OK(write_conn.Execute("BEGIN")); |
430 | 0 | auto status = write_conn.ExecuteFormat("INSERT INTO t (key) VALUES ($0)", write_key); |
431 | 0 | if (status.ok()) { |
432 | 0 | status = write_conn.Execute("COMMIT"); |
433 | 0 | } |
434 | 0 | if (status.ok()) { |
435 | 0 | last_written.store(write_key, std::memory_order_release); |
436 | 0 | ++write_key; |
437 | 0 | } else { |
438 | 0 | LOG(INFO) << "Write " << write_key << " failed: " << status; |
439 | 0 | } |
440 | 0 | } |
441 | 0 | }); |
442 | |
|
443 | 0 | auto se = ScopeExit([&stop, &write_thread] { |
444 | 0 | stop.store(true, std::memory_order_release); |
445 | 0 | write_thread.join(); |
446 | 0 | }); |
447 | |
|
448 | 0 | auto deadline = CoarseMonoClock::now() + 30s; |
449 | |
|
450 | 0 | while (CoarseMonoClock::now() < deadline) { |
451 | 0 | int read_key = last_written.load(std::memory_order_acquire); |
452 | 0 | if (read_key == 0) { |
453 | 0 | std::this_thread::sleep_for(100ms); |
454 | 0 | continue; |
455 | 0 | } |
456 | | |
457 | 0 | SCOPED_TRACE(Format("Reading: $0", read_key)); |
458 | |
|
459 | 0 | ASSERT_OK(conn.Execute("BEGIN")); |
460 | |
|
461 | 0 | auto res = ASSERT_RESULT(conn.FetchFormat("SELECT * FROM t WHERE key = $0", read_key)); |
462 | 0 | auto columns = PQnfields(res.get()); |
463 | 0 | ASSERT_EQ(1, columns); |
464 | |
|
465 | 0 | auto lines = PQntuples(res.get()); |
466 | 0 | ASSERT_EQ(1, lines); |
467 | |
|
468 | 0 | auto key = ASSERT_RESULT(GetInt32(res.get(), 0, 0)); |
469 | 0 | ASSERT_EQ(key, read_key); |
470 | |
|
471 | 0 | ASSERT_OK(conn.Execute("ROLLBACK")); |
472 | 0 | } |
473 | |
|
474 | 0 | ASSERT_GE(last_written.load(std::memory_order_acquire), 100); |
475 | 0 | } |
476 | | |
477 | | // Concurrently insert records into tables with foreign key relationship while truncating both. |
478 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentInsertTruncateForeignKey)) { |
479 | 0 | auto conn = ASSERT_RESULT(Connect()); |
480 | |
|
481 | 0 | ASSERT_OK(conn.Execute("DROP TABLE IF EXISTS t2")); |
482 | 0 | ASSERT_OK(conn.Execute("DROP TABLE IF EXISTS t1")); |
483 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t1 (k int primary key, v int)")); |
484 | 0 | ASSERT_OK(conn.Execute( |
485 | 0 | "CREATE TABLE t2 (k int primary key, t1_k int, FOREIGN KEY (t1_k) REFERENCES t1 (k))")); |
486 | |
|
487 | 0 | const int kMaxKeys = 1 << 20; |
488 | |
|
489 | 0 | constexpr auto kWriteThreads = 4; |
490 | 0 | constexpr auto kTruncateThreads = 2; |
491 | |
|
492 | 0 | TestThreadHolder thread_holder; |
493 | 0 | for (int i = 0; i != kWriteThreads; ++i) { |
494 | 0 | thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] { |
495 | 0 | auto write_conn = ASSERT_RESULT(Connect()); |
496 | 0 | while (!stop.load(std::memory_order_acquire)) { |
497 | 0 | int t1_k = RandomUniformInt(0, kMaxKeys - 1); |
498 | 0 | int t1_v = RandomUniformInt(0, kMaxKeys - 1); |
499 | 0 | auto status = write_conn.ExecuteFormat("INSERT INTO t1 VALUES ($0, $1)", t1_k, t1_v); |
500 | 0 | int t2_k = RandomUniformInt(0, kMaxKeys - 1); |
501 | 0 | status = write_conn.ExecuteFormat("INSERT INTO t2 VALUES ($0, $1)", t2_k, t1_k); |
502 | 0 | } |
503 | 0 | }); |
504 | 0 | } |
505 | |
|
506 | 0 | for (int i = 0; i != kTruncateThreads; ++i) { |
507 | 0 | thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] { |
508 | 0 | auto truncate_conn = ASSERT_RESULT(Connect()); |
509 | 0 | int idx = 0; |
510 | 0 | while (!stop.load(std::memory_order_acquire)) { |
511 | 0 | auto status = truncate_conn.Execute("TRUNCATE TABLE t1, t2 CASCADE"); |
512 | 0 | ++idx; |
513 | 0 | std::this_thread::sleep_for(100ms); |
514 | 0 | } |
515 | 0 | }); |
516 | 0 | } |
517 | |
|
518 | 0 | thread_holder.WaitAndStop(30s); |
519 | 0 | } |
520 | | |
521 | | // Concurrently insert records to table with index. |
522 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentIndexInsert)) { |
523 | 0 | auto conn = ASSERT_RESULT(Connect()); |
524 | |
|
525 | 0 | ASSERT_OK(conn.Execute( |
526 | 0 | "CREATE TABLE IF NOT EXISTS users(id text, ename text, age int, PRIMARY KEY(id))")); |
527 | |
|
528 | 0 | ASSERT_OK(conn.Execute( |
529 | 0 | "CREATE INDEX IF NOT EXISTS name_idx ON users(ename)")); |
530 | |
|
531 | 0 | constexpr auto kWriteThreads = 4; |
532 | |
|
533 | 0 | std::atomic<bool> stop(false); |
534 | 0 | std::vector<std::thread> write_threads; |
535 | |
|
536 | 0 | while (write_threads.size() != kWriteThreads) { |
537 | 0 | write_threads.emplace_back([this, &stop] { |
538 | 0 | auto write_conn = ASSERT_RESULT(Connect()); |
539 | 0 | auto this_thread_id = std::this_thread::get_id(); |
540 | 0 | auto tid = std::hash<decltype(this_thread_id)>()(this_thread_id); |
541 | 0 | int idx = 0; |
542 | 0 | while (!stop.load(std::memory_order_acquire)) { |
543 | 0 | ASSERT_OK(write_conn.ExecuteFormat( |
544 | 0 | "INSERT INTO users (id, ename, age) VALUES ('user-$0-$1', 'name-$1', $2)", |
545 | 0 | tid, idx, 20 + (idx % 50))); |
546 | 0 | ++idx; |
547 | 0 | } |
548 | 0 | }); |
549 | 0 | } |
550 | |
|
551 | 0 | auto se = ScopeExit([&stop, &write_threads] { |
552 | 0 | stop.store(true, std::memory_order_release); |
553 | 0 | for (auto& thread : write_threads) { |
554 | 0 | thread.join(); |
555 | 0 | } |
556 | 0 | }); |
557 | |
|
558 | 0 | std::this_thread::sleep_for(30s); |
559 | 0 | } |
560 | | |
561 | | Result<int64_t> ReadSumBalance( |
562 | | PGConn* conn, int accounts, IsolationLevel isolation, |
563 | 0 | std::atomic<int>* counter) { |
564 | 0 | RETURN_NOT_OK(conn->StartTransaction(isolation)); |
565 | 0 | bool failed = true; |
566 | 0 | auto se = ScopeExit([conn, &failed] { |
567 | 0 | if (failed) { |
568 | 0 | EXPECT_OK(conn->Execute("ROLLBACK")); |
569 | 0 | } |
570 | 0 | }); |
571 | |
|
572 | 0 | std::string query = ""; |
573 | 0 | for (int i = 1; i <= accounts; ++i) { |
574 | 0 | if (!query.empty()) { |
575 | 0 | query += " UNION "; |
576 | 0 | } |
577 | 0 | query += Format("SELECT balance, id FROM account_$0 WHERE id = $0", i); |
578 | 0 | } |
579 | |
|
580 | 0 | auto res = VERIFY_RESULT(conn->FetchMatrix(query, accounts, 2)); |
581 | 0 | int64_t sum = 0; |
582 | 0 | for (int i = 0; i != accounts; ++i) { |
583 | 0 | sum += VERIFY_RESULT(GetValue<int64_t>(res.get(), i, 0)); |
584 | 0 | } |
585 | |
|
586 | 0 | failed = false; |
587 | 0 | RETURN_NOT_OK(conn->Execute("COMMIT")); |
588 | 0 | return sum; |
589 | 0 | } |
590 | | |
591 | 0 | void PgLibPqTest::TestMultiBankAccount(IsolationLevel isolation) { |
592 | 0 | constexpr int kAccounts = RegularBuildVsSanitizers(20, 10); |
593 | 0 | constexpr int64_t kInitialBalance = 100; |
594 | |
|
595 | 0 | #ifndef NDEBUG |
596 | 0 | const auto kTimeout = 180s; |
597 | 0 | constexpr int kThreads = RegularBuildVsSanitizers(12, 5); |
598 | | #else |
599 | | const auto kTimeout = 60s; |
600 | | constexpr int kThreads = 5; |
601 | | #endif |
602 | |
|
603 | 0 | PGConn conn = ASSERT_RESULT(Connect()); |
604 | 0 | std::vector<PGConn> thread_connections; |
605 | 0 | for (int i = 0; i < kThreads; ++i) { |
606 | 0 | thread_connections.push_back(ASSERT_RESULT(Connect())); |
607 | 0 | } |
608 | |
|
609 | 0 | for (int i = 1; i <= kAccounts; ++i) { |
610 | 0 | ASSERT_OK(conn.ExecuteFormat( |
611 | 0 | "CREATE TABLE account_$0 (id int, balance bigint, PRIMARY KEY(id))", i)); |
612 | 0 | ASSERT_OK(conn.ExecuteFormat( |
613 | 0 | "INSERT INTO account_$0 (id, balance) VALUES ($0, $1)", i, kInitialBalance)); |
614 | 0 | } |
615 | |
|
616 | 0 | std::atomic<int> writes(0); |
617 | 0 | std::atomic<int> reads(0); |
618 | |
|
619 | 0 | constexpr auto kRequiredReads = RegularBuildVsSanitizers(5, 2); |
620 | 0 | constexpr auto kRequiredWrites = RegularBuildVsSanitizers(1000, 500); |
621 | |
|
622 | 0 | std::atomic<int> counter(100000); |
623 | 0 | TestThreadHolder thread_holder; |
624 | 0 | for (int i = 1; i <= kThreads; ++i) { |
625 | 0 | thread_holder.AddThreadFunctor( |
626 | 0 | [&conn = thread_connections[i - 1], &writes, &isolation, |
627 | 0 | &stop_flag = thread_holder.stop_flag()]() { |
628 | 0 | while (!stop_flag.load(std::memory_order_acquire)) { |
629 | 0 | int from = RandomUniformInt(1, kAccounts); |
630 | 0 | int to = RandomUniformInt(1, kAccounts - 1); |
631 | 0 | if (to >= from) { |
632 | 0 | ++to; |
633 | 0 | } |
634 | 0 | int64_t amount = RandomUniformInt(1, 10); |
635 | 0 | ASSERT_OK(conn.StartTransaction(isolation)); |
636 | 0 | auto status = conn.ExecuteFormat( |
637 | 0 | "UPDATE account_$0 SET balance = balance - $1 WHERE id = $0", from, amount); |
638 | 0 | if (status.ok()) { |
639 | 0 | status = conn.ExecuteFormat( |
640 | 0 | "UPDATE account_$0 SET balance = balance + $1 WHERE id = $0", to, amount); |
641 | 0 | } |
642 | 0 | if (status.ok()) { |
643 | 0 | status = conn.Execute("COMMIT;"); |
644 | 0 | } else { |
645 | 0 | ASSERT_OK(conn.Execute("ROLLBACK;")); |
646 | 0 | } |
647 | 0 | if (!status.ok()) { |
648 | 0 | ASSERT_TRUE(TransactionalFailure(status)) << status; |
649 | 0 | } else { |
650 | 0 | LOG(INFO) << "Updated: " << from << " => " << to << " by " << amount; |
651 | 0 | ++writes; |
652 | 0 | } |
653 | 0 | } |
654 | 0 | }); |
655 | 0 | } |
656 | |
|
657 | 0 | thread_holder.AddThreadFunctor( |
658 | 0 | [this, &counter, &reads, &writes, isolation, &stop_flag = thread_holder.stop_flag()]() { |
659 | 0 | SetFlagOnExit set_flag_on_exit(&stop_flag); |
660 | 0 | auto connection = ASSERT_RESULT(Connect()); |
661 | 0 | auto failures_in_row = 0; |
662 | 0 | while (!stop_flag.load(std::memory_order_acquire)) { |
663 | 0 | if (isolation == IsolationLevel::SERIALIZABLE_ISOLATION) { |
664 | 0 | auto lower_bound = reads.load() * kRequiredWrites < writes.load() * kRequiredReads |
665 | 0 | ? 1.0 - 1.0 / (1ULL << failures_in_row) : 0.0; |
666 | 0 | ASSERT_OK(connection.ExecuteFormat( |
667 | 0 | "SET yb_transaction_priority_lower_bound = $0", lower_bound)); |
668 | 0 | } |
669 | 0 | auto sum = ReadSumBalance(&connection, kAccounts, isolation, &counter); |
670 | 0 | if (!sum.ok()) { |
671 | | // Do not overflow long when doing bitshift above. |
672 | 0 | failures_in_row = std::min(failures_in_row + 1, 63); |
673 | 0 | ASSERT_TRUE(TransactionalFailure(sum.status())) << sum.status(); |
674 | 0 | } else { |
675 | 0 | failures_in_row = 0; |
676 | 0 | ASSERT_EQ(*sum, kAccounts * kInitialBalance); |
677 | 0 | ++reads; |
678 | 0 | } |
679 | 0 | } |
680 | 0 | }); |
681 | |
|
682 | 0 | auto wait_status = WaitFor([&reads, &writes, &stop = thread_holder.stop_flag()] { |
683 | 0 | return stop.load() || (writes.load() >= kRequiredWrites && reads.load() >= kRequiredReads); |
684 | 0 | }, kTimeout, Format("At least $0 reads and $1 writes", kRequiredReads, kRequiredWrites)); |
685 | |
|
686 | 0 | LOG(INFO) << "Writes: " << writes.load() << ", reads: " << reads.load(); |
687 | |
|
688 | 0 | ASSERT_OK(wait_status); |
689 | |
|
690 | 0 | thread_holder.Stop(); |
691 | |
|
692 | 0 | ASSERT_OK(WaitFor([&conn, isolation, &counter]() -> Result<bool> { |
693 | 0 | auto sum = ReadSumBalance(&conn, kAccounts, isolation, &counter); |
694 | 0 | if (!sum.ok()) { |
695 | 0 | if (!TransactionalFailure(sum.status())) { |
696 | 0 | return sum.status(); |
697 | 0 | } |
698 | 0 | return false; |
699 | 0 | } |
700 | 0 | EXPECT_EQ(*sum, kAccounts * kInitialBalance); |
701 | 0 | return true; |
702 | 0 | }, 10s, "Final read")); |
703 | |
|
704 | 0 | auto total_not_found = 0; |
705 | 0 | for (auto* tserver : cluster_->tserver_daemons()) { |
706 | 0 | auto tablets = ASSERT_RESULT(cluster_->GetTabletIds(tserver)); |
707 | 0 | for (const auto& tablet : tablets) { |
708 | 0 | auto result = tserver->GetInt64Metric( |
709 | 0 | &METRIC_ENTITY_tablet, tablet.c_str(), &METRIC_transaction_not_found, "value"); |
710 | 0 | if (result.ok()) { |
711 | 0 | total_not_found += *result; |
712 | 0 | } else { |
713 | 0 | ASSERT_TRUE(result.status().IsNotFound()) << result.status(); |
714 | 0 | } |
715 | 0 | } |
716 | 0 | } |
717 | |
|
718 | 0 | LOG(INFO) << "Total not found: " << total_not_found; |
719 | | // Check that total not found is not too big. |
720 | 0 | ASSERT_LE(total_not_found, 200); |
721 | 0 | } |
722 | | |
723 | | class PgLibPqSmallClockSkewTest : public PgLibPqTest { |
724 | 0 | void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { |
725 | | // Use small clock skew, to decrease number of read restarts. |
726 | 0 | options->extra_tserver_flags.push_back("--max_clock_skew_usec=5000"); |
727 | 0 | } |
728 | | }; |
729 | | |
730 | | TEST_F_EX(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(MultiBankAccountSnapshot), |
731 | 0 | PgLibPqSmallClockSkewTest) { |
732 | 0 | TestMultiBankAccount(IsolationLevel::SNAPSHOT_ISOLATION); |
733 | 0 | } |
734 | | |
735 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(MultiBankAccountSerializable)) { |
736 | 0 | TestMultiBankAccount(IsolationLevel::SERIALIZABLE_ISOLATION); |
737 | 0 | } |
738 | | |
739 | 0 | void PgLibPqTest::DoIncrement(int key, int num_increments, IsolationLevel isolation) { |
740 | 0 | auto conn = ASSERT_RESULT(Connect()); |
741 | | |
742 | | // Perform increments |
743 | 0 | int succeeded_incs = 0; |
744 | 0 | while (succeeded_incs < num_increments) { |
745 | 0 | ASSERT_OK(conn.StartTransaction(isolation)); |
746 | 0 | bool committed = false; |
747 | 0 | auto exec_status = conn.ExecuteFormat("UPDATE t SET value = value + 1 WHERE key = $0", key); |
748 | 0 | if (exec_status.ok()) { |
749 | 0 | auto commit_status = conn.Execute("COMMIT"); |
750 | 0 | if (commit_status.ok()) { |
751 | 0 | succeeded_incs++; |
752 | 0 | committed = true; |
753 | 0 | } |
754 | 0 | } |
755 | 0 | if (!committed) { |
756 | 0 | ASSERT_OK(conn.Execute("ROLLBACK")); |
757 | 0 | } |
758 | 0 | } |
759 | 0 | } |
760 | | |
761 | 0 | void PgLibPqTest::TestParallelCounter(IsolationLevel isolation) { |
762 | 0 | auto conn = ASSERT_RESULT(Connect()); |
763 | |
|
764 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT, value INT)")); |
765 | |
|
766 | 0 | const auto kThreads = RegularBuildVsSanitizers(3, 2); |
767 | 0 | const auto kIncrements = RegularBuildVsSanitizers(100, 20); |
768 | | |
769 | | // Make a counter for each thread and have each thread increment it |
770 | 0 | std::vector<std::thread> threads; |
771 | 0 | while (threads.size() != kThreads) { |
772 | 0 | int key = narrow_cast<int>(threads.size()); |
773 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO t (key, value) VALUES ($0, 0)", key)); |
774 | |
|
775 | 0 | threads.emplace_back([this, key, isolation] { |
776 | 0 | DoIncrement(key, kIncrements, isolation); |
777 | 0 | }); |
778 | 0 | } |
779 | | |
780 | | // Wait for completion |
781 | 0 | for (auto& thread : threads) { |
782 | 0 | thread.join(); |
783 | 0 | } |
784 | | |
785 | | // Check each counter |
786 | 0 | for (int i = 0; i < kThreads; i++) { |
787 | 0 | auto res = ASSERT_RESULT(conn.FetchFormat("SELECT value FROM t WHERE key = $0", i)); |
788 | |
|
789 | 0 | auto row_val = ASSERT_RESULT(GetInt32(res.get(), 0, 0)); |
790 | 0 | ASSERT_EQ(row_val, kIncrements); |
791 | 0 | } |
792 | 0 | } |
793 | | |
// Per-thread counters under serializable isolation.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TestParallelCounterSerializable)) {
  TestParallelCounter(IsolationLevel::SERIALIZABLE_ISOLATION);
}
797 | | |
// Per-thread counters under snapshot (repeatable read) isolation.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TestParallelCounterRepeatableRead)) {
  TestParallelCounter(IsolationLevel::SNAPSHOT_ISOLATION);
}
801 | | |
802 | 0 | void PgLibPqTest::TestConcurrentCounter(IsolationLevel isolation) { |
803 | 0 | auto conn = ASSERT_RESULT(Connect()); |
804 | |
|
805 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT, value INT)")); |
806 | |
|
807 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t (key, value) VALUES (0, 0)")); |
808 | |
|
809 | 0 | const auto kThreads = RegularBuildVsSanitizers(3, 2); |
810 | 0 | const auto kIncrements = RegularBuildVsSanitizers(100, 20); |
811 | | |
812 | | // Have each thread increment the same already-created counter |
813 | 0 | std::vector<std::thread> threads; |
814 | 0 | while (threads.size() != kThreads) { |
815 | 0 | threads.emplace_back([this, isolation] { |
816 | 0 | DoIncrement(0, kIncrements, isolation); |
817 | 0 | }); |
818 | 0 | } |
819 | | |
820 | | // Wait for completion |
821 | 0 | for (auto& thread : threads) { |
822 | 0 | thread.join(); |
823 | 0 | } |
824 | | |
825 | | // Check that we incremented exactly the desired number of times |
826 | 0 | auto res = ASSERT_RESULT(conn.Fetch("SELECT value FROM t WHERE key = 0")); |
827 | |
|
828 | 0 | auto row_val = ASSERT_RESULT(GetInt32(res.get(), 0, 0)); |
829 | 0 | ASSERT_EQ(row_val, kThreads * kIncrements); |
830 | 0 | } |
831 | | |
// Shared counter under serializable isolation.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TestConcurrentCounterSerializable)) {
  TestConcurrentCounter(IsolationLevel::SERIALIZABLE_ISOLATION);
}
835 | | |
// Shared counter under snapshot (repeatable read) isolation.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TestConcurrentCounterRepeatableRead)) {
  TestConcurrentCounter(IsolationLevel::SNAPSHOT_ISOLATION);
}
839 | | |
// Concurrently inserts rows into a table with a secondary index while other
// iterations read the most recently acknowledged row back through that index,
// checking that index reads observe acknowledged writes.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(SecondaryIndexInsertSelect)) {
  constexpr int kThreads = 4;

  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.Execute("CREATE TABLE t (a INT PRIMARY KEY, b INT)"));
  ASSERT_OK(conn.Execute("CREATE INDEX ON t (b, a)"));

  TestThreadHolder holder;
  // written[i] is the count of rows thread i has successfully inserted.
  // Release stores paired with acquire loads ensure a published count implies
  // the corresponding INSERT was acknowledged.
  std::array<std::atomic<int>, kThreads> written;
  for (auto& w : written) {
    w.store(0, std::memory_order_release);
  }

  for (int i = 0; i != kThreads; ++i) {
    holder.AddThread([this, i, &stop = holder.stop_flag(), &written] {
      auto connection = ASSERT_RESULT(Connect());
      int key = 0;

      while (!stop.load(std::memory_order_acquire)) {
        if (RandomUniformBool()) {
          // Writer step: insert a row whose 'a' column encodes this thread id.
          int a = i * 1000000 + key;
          int b = key;
          ASSERT_OK(connection.ExecuteFormat("INSERT INTO t (a, b) VALUES ($0, $1)", a, b));
          written[i].store(++key, std::memory_order_release);
        } else {
          // Reader step: pick a random writer and fetch its latest published
          // row via the secondary index on (b, a).
          int writer_index = RandomUniformInt(0, kThreads - 1);
          int num_written = written[writer_index].load(std::memory_order_acquire);
          if (num_written == 0) {
            continue;  // that writer hasn't published anything yet
          }
          int read_key = num_written - 1;
          int b = read_key;
          int read_a = ASSERT_RESULT(connection.FetchValue<int32_t>(
              Format("SELECT a FROM t WHERE b = $0 LIMIT 1", b)));
          // Any row found through the index must decode back to the read key.
          ASSERT_EQ(read_a % 1000000, read_key);
        }
      }
    });
  }

  holder.WaitAndStop(60s);
}
883 | | |
884 | 0 | void AssertRows(PGConn *conn, int expected_num_rows) { |
885 | 0 | auto res = ASSERT_RESULT(conn->Fetch("SELECT * FROM test")); |
886 | 0 | ASSERT_EQ(PQntuples(res.get()), expected_num_rows); |
887 | 0 | } |
888 | | |
// Verifies that a delete followed by a re-insert inside one transaction is
// visible to later statements of that transaction and survives the commit.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(InTxnDelete)) {
  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.Execute("CREATE TABLE test (pk int PRIMARY KEY)"));
  ASSERT_OK(conn.Execute("BEGIN"));
  ASSERT_OK(conn.Execute("INSERT INTO test VALUES (1)"));
  ASSERT_NO_FATALS(AssertRows(&conn, 1));
  ASSERT_OK(conn.Execute("DELETE FROM test"));
  ASSERT_NO_FATALS(AssertRows(&conn, 0));
  ASSERT_OK(conn.Execute("INSERT INTO test VALUES (1)"));
  ASSERT_NO_FATALS(AssertRows(&conn, 1));
  ASSERT_OK(conn.Execute("COMMIT"));

  // After commit, the re-inserted row must still be present.
  ASSERT_NO_FATALS(AssertRows(&conn, 1));
}
904 | | |
905 | | namespace { |
906 | | |
907 | | Result<string> GetNamespaceIdByNamespaceName( |
908 | 0 | client::YBClient* client, const string& namespace_name) { |
909 | 0 | const auto namespaces = VERIFY_RESULT(client->ListNamespaces(YQL_DATABASE_PGSQL)); |
910 | 0 | for (const auto& ns : namespaces) { |
911 | 0 | if (ns.name() == namespace_name) { |
912 | 0 | return ns.id(); |
913 | 0 | } |
914 | 0 | } |
915 | 0 | return STATUS(NotFound, "The namespace does not exist"); |
916 | 0 | } |
917 | | |
918 | | Result<string> GetTableIdByTableName( |
919 | 0 | client::YBClient* client, const string& namespace_name, const string& table_name) { |
920 | 0 | const auto tables = VERIFY_RESULT(client->ListTables()); |
921 | 0 | for (const auto& t : tables) { |
922 | 0 | if (t.namespace_name() == namespace_name && t.table_name() == table_name) { |
923 | 0 | return t.table_id(); |
924 | 0 | } |
925 | 0 | } |
926 | 0 | return STATUS(NotFound, "The table does not exist"); |
927 | 0 | } |
928 | | |
929 | | } // namespace |
930 | | |
// Verifies that a compound primary key is reported with key columns first
// (hash column, then range columns in key order), followed by the remaining
// columns in declaration order, regardless of the CREATE TABLE column order.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CompoundKeyColumnOrder)) {
  const string namespace_name = "yugabyte";
  const string table_name = "test";
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat(
      "CREATE TABLE $0 (r2 int, r1 int, h int, v2 int, v1 int, primary key (h, r1, r2))",
      table_name));
  auto client = ASSERT_RESULT(cluster_->CreateClient());

  std::string table_id =
      ASSERT_RESULT(GetTableIdByTableName(client.get(), namespace_name, table_name));
  std::shared_ptr<client::YBTableInfo> table_info = std::make_shared<client::YBTableInfo>();
  {
    // GetTableSchemaById is asynchronous; block until its callback completes.
    Synchronizer sync;
    ASSERT_OK(client->GetTableSchemaById(table_id, table_info, sync.AsStatusCallback()));
    ASSERT_OK(sync.Wait());
  }

  // Expected order: key columns (h, r1, r2), then values in declared order.
  const auto& columns = table_info->schema.columns();
  std::array<string, 5> expected_column_names{"h", "r1", "r2", "v2", "v1"};
  ASSERT_EQ(expected_column_names.size(), columns.size());
  for (size_t i = 0; i < expected_column_names.size(); ++i) {
    ASSERT_EQ(columns[i].name(), expected_column_names[i]);
  }
}
956 | | |
957 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(BulkCopy)) { |
958 | 0 | const std::string kTableName = "customer"; |
959 | 0 | auto conn = ASSERT_RESULT(Connect()); |
960 | 0 | ASSERT_OK(conn.ExecuteFormat( |
961 | 0 | "CREATE TABLE CUSTOMER ( CUSTKEY INTEGER NOT NULL PRIMARY KEY,\n" |
962 | 0 | " NAME VARCHAR(25) NOT NULL,\n" |
963 | 0 | " ADDRESS VARCHAR(40) NOT NULL,\n" |
964 | 0 | " NATIONKEY INTEGER NOT NULL,\n" |
965 | 0 | " PHONE CHAR(15) NOT NULL,\n" |
966 | 0 | " MKTSEGMENT CHAR(10) NOT NULL,\n" |
967 | 0 | " COMMENT VARCHAR(117) NOT NULL);", |
968 | 0 | kTableName)); |
969 | |
|
970 | 0 | constexpr int kNumBatches = 10; |
971 | 0 | constexpr int kBatchSize = 1000; |
972 | |
|
973 | 0 | int customer_key = 0; |
974 | 0 | for (int i = 0; i != kNumBatches; ++i) { |
975 | 0 | ASSERT_OK(conn.CopyBegin(Format("COPY $0 FROM STDIN WITH BINARY", kTableName))); |
976 | 0 | for (int j = 0; j != kBatchSize; ++j) { |
977 | 0 | conn.CopyStartRow(7); |
978 | 0 | conn.CopyPutInt32(++customer_key); |
979 | 0 | conn.CopyPutString(Format("Name $0 $1", i, j)); |
980 | 0 | conn.CopyPutString(Format("Address $0 $1", i, j)); |
981 | 0 | conn.CopyPutInt32(i); |
982 | 0 | conn.CopyPutString(std::to_string(999999876543210 + customer_key)); |
983 | 0 | conn.CopyPutString(std::to_string(9876543210 + customer_key)); |
984 | 0 | conn.CopyPutString(Format("Comment $0 $1", i, j)); |
985 | 0 | } |
986 | |
|
987 | 0 | ASSERT_OK(conn.CopyEnd()); |
988 | 0 | } |
989 | |
|
990 | 0 | LOG(INFO) << "Finished copy"; |
991 | 0 | for (;;) { |
992 | 0 | auto result = conn.FetchFormat("SELECT COUNT(*) FROM $0", kTableName); |
993 | 0 | if (result.ok()) { |
994 | 0 | LogResult(result->get()); |
995 | 0 | auto count = ASSERT_RESULT(GetInt64(result->get(), 0, 0)); |
996 | 0 | LOG(INFO) << "Total count: " << count; |
997 | 0 | ASSERT_EQ(count, kNumBatches * kBatchSize); |
998 | 0 | break; |
999 | 0 | } else { |
1000 | 0 | auto message = result.status().ToString(); |
1001 | 0 | ASSERT_TRUE(message.find("Snaphost too old") != std::string::npos) << result.status(); |
1002 | 0 | } |
1003 | 0 | } |
1004 | 0 | } |
1005 | | |
1006 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CatalogManagerMapsTest)) { |
1007 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1008 | 0 | ASSERT_OK(conn.Execute("CREATE DATABASE test_db")); |
1009 | 0 | { |
1010 | 0 | auto test_conn = ASSERT_RESULT(ConnectToDB("test_db")); |
1011 | 0 | ASSERT_OK(test_conn.Execute("CREATE TABLE foo (a int PRIMARY KEY)")); |
1012 | 0 | ASSERT_OK(test_conn.Execute("ALTER TABLE foo RENAME TO bar")); |
1013 | 0 | ASSERT_OK(test_conn.Execute("ALTER TABLE bar RENAME COLUMN a to b")); |
1014 | 0 | } |
1015 | 0 | ASSERT_OK(conn.Execute("ALTER DATABASE test_db RENAME TO test_db_renamed")); |
1016 | |
|
1017 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
1018 | 0 | Result<bool> result(false); |
1019 | 0 | result = client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, "test_db_renamed", "bar")); |
1020 | 0 | ASSERT_OK(result); |
1021 | 0 | ASSERT_TRUE(result.get()); |
1022 | 0 | result = client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, "test_db_renamed", "foo")); |
1023 | 0 | ASSERT_OK(result); |
1024 | 0 | ASSERT_FALSE(result.get()); |
1025 | 0 | result = client->NamespaceExists("test_db_renamed", YQL_DATABASE_PGSQL); |
1026 | 0 | ASSERT_OK(result); |
1027 | 0 | ASSERT_TRUE(result.get()); |
1028 | 0 | result = client->NamespaceExists("test_db", YQL_DATABASE_PGSQL); |
1029 | 0 | ASSERT_OK(result); |
1030 | 0 | ASSERT_FALSE(result.get()); |
1031 | |
|
1032 | 0 | std::string table_id = |
1033 | 0 | ASSERT_RESULT(GetTableIdByTableName(client.get(), "test_db_renamed", "bar")); |
1034 | 0 | std::shared_ptr<client::YBTableInfo> table_info = std::make_shared<client::YBTableInfo>(); |
1035 | 0 | { |
1036 | 0 | Synchronizer sync; |
1037 | 0 | ASSERT_OK(client->GetTableSchemaById(table_id, table_info, sync.AsStatusCallback())); |
1038 | 0 | ASSERT_OK(sync.Wait()); |
1039 | 0 | } |
1040 | 0 | ASSERT_EQ(table_info->schema.num_columns(), 1); |
1041 | 0 | ASSERT_EQ(table_info->schema.Column(0).name(), "b"); |
1042 | 0 | } |
1043 | | |
// Verifies that a failed CREATE TABLE rolls back its system-catalog changes:
// after the statement fails, pg_class must contain no entry for the table.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TestSystemTableRollback)) {
  auto conn1 = ASSERT_RESULT(Connect());
  ASSERT_OK(conn1.Execute("CREATE TABLE pktable (ptest1 int PRIMARY KEY);"));
  // Expected to fail: the inet column can't reference the int primary key.
  Status s = conn1.Execute("CREATE TABLE fktable (ftest1 inet REFERENCES pktable);");
  LOG(INFO) << "Status of second table creation: " << s;
  auto res = ASSERT_RESULT(conn1.Fetch("SELECT * FROM pg_class WHERE relname='fktable'"));
  ASSERT_EQ(0, PQntuples(res.get()));
}
1052 | | |
1053 | | namespace { |
1054 | | |
// Waits until the colocated "parent" tablet of |database_name| becomes
// visible to the master (it is created asynchronously with the database) and
// returns its tablet locations. Fails if the tablet does not appear within
// |timeout|.
Result<master::TabletLocationsPB> GetColocatedTabletLocations(
    client::YBClient* client,
    std::string database_name,
    MonoDelta timeout) {
  const string ns_id =
      VERIFY_RESULT(GetNamespaceIdByNamespaceName(client, database_name));

  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;

  // Get TabletLocations for the colocated tablet. NotFound is treated as
  // "not yet created" and retried; any other error aborts the wait.
  RETURN_NOT_OK(WaitFor(
      [&]() -> Result<bool> {
        Status s = client->GetTabletsFromTableId(
            ns_id + master::kColocatedParentTableIdSuffix,
            0 /* max_tablets */,
            &tablets);
        if (s.ok()) {
          // The colocated parent table is backed by exactly one tablet.
          return tablets.size() == 1;
        } else if (s.IsNotFound()) {
          return false;
        } else {
          return s;
        }
      },
      timeout,
      "wait for colocated parent tablet"));

  return tablets[0];
}
1084 | | |
1085 | 0 | const TableId GetTableGroupTableId(const std::string& tablegroup_id) { |
1086 | 0 | return tablegroup_id + master::kTablegroupParentTableIdSuffix; |
1087 | 0 | } |
1088 | | |
// Waits until the parent tablet of tablegroup |tablegroup_id| in
// |database_name| becomes visible to the master and returns its tablet
// locations. Returns NotFound immediately if the tablegroup itself does not
// exist; fails if the tablet does not appear within |timeout|.
Result<master::TabletLocationsPB> GetTablegroupTabletLocations(
    client::YBClient* client,
    std::string database_name,
    std::string tablegroup_id,
    MonoDelta timeout) {
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;

  bool exists = VERIFY_RESULT(client->TablegroupExists(database_name, tablegroup_id));
  if (!exists) {
    return STATUS(NotFound, "tablegroup does not exist");
  }

  // Get TabletLocations for the tablegroup tablet. NotFound is treated as
  // "not yet created" and retried; any other error aborts the wait.
  RETURN_NOT_OK(WaitFor(
      [&]() -> Result<bool> {
        Status s = client->GetTabletsFromTableId(
            GetTableGroupTableId(tablegroup_id),
            0 /* max_tablets */,
            &tablets);
        if (s.ok()) {
          // The tablegroup parent table is backed by exactly one tablet.
          return tablets.size() == 1;
        } else if (s.IsNotFound()) {
          return false;
        } else {
          return s;
        }
      },
      timeout,
      "wait for tablegroup parent tablet"));

  return tablets[0];
}
1121 | | |
1122 | | } // namespace |
1123 | | |
// End-to-end check of table colocation: tables created in a colocated
// database share the database's parent tablet unless they opt out with
// "colocated = false", and dropping tables or the database cleans up the
// corresponding tablets.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TableColocation)) {
  auto client = ASSERT_RESULT(cluster_->CreateClient());
  const string kDatabaseName = "test_db";
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets_bar_index;

  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0 WITH colocated = true", kDatabaseName));
  conn = ASSERT_RESULT(ConnectToDB(kDatabaseName));

  // A parent table with one tablet should be created when the database is created.
  const auto colocated_tablet_locations = ASSERT_RESULT(GetColocatedTabletLocations(
      client.get(),
      kDatabaseName,
      30s));
  const auto colocated_tablet_id = colocated_tablet_locations.tablet_id();
  const auto colocated_table = ASSERT_RESULT(client->OpenTable(
      colocated_tablet_locations.table_id()));

  // Create a range partition table, the table should share the tablet with the parent table.
  ASSERT_OK(conn.Execute("CREATE TABLE foo (a INT, PRIMARY KEY (a ASC))"));
  auto table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "foo"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), colocated_tablet_id);

  // Create a colocated index table.
  ASSERT_OK(conn.Execute("CREATE INDEX foo_index1 ON foo (a)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "foo_index1"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), colocated_tablet_id);

  // Create a hash partition table and opt out of using the parent tablet.
  ASSERT_OK(conn.Execute("CREATE TABLE bar (a INT) WITH (colocated = false)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "bar"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  for (auto& tablet : tablets) {
    ASSERT_NE(tablet.tablet_id(), colocated_tablet_id);
  }

  // Create an index on the non-colocated table. The index should follow the table and opt out of
  // colocation.
  ASSERT_OK(conn.Execute("CREATE INDEX bar_index ON bar (a)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "bar_index"));
  const auto table_bar_index = ASSERT_RESULT(client->OpenTable(table_id));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  for (auto& tablet : tablets) {
    ASSERT_NE(tablet.tablet_id(), colocated_tablet_id);
  }
  // Remember bar_index's tablets so their deletion can be verified after DROP.
  tablets_bar_index.Swap(&tablets);

  // Create a range partition table without specifying primary key.
  ASSERT_OK(conn.Execute("CREATE TABLE baz (a INT)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "baz"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), colocated_tablet_id);

  // Create another table and index.
  ASSERT_OK(conn.Execute("CREATE TABLE qux (a INT, PRIMARY KEY (a ASC)) WITH (colocated = true)"));
  ASSERT_OK(conn.Execute("CREATE INDEX qux_index ON qux (a)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "qux_index"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));

  // Drop a table in the parent tablet.
  ASSERT_OK(conn.Execute("DROP TABLE qux"));
  ASSERT_FALSE(ASSERT_RESULT(
      client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "qux"))));
  ASSERT_FALSE(ASSERT_RESULT(
      client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "qux_index"))));

  // Drop a table that is opted out.
  ASSERT_OK(conn.Execute("DROP TABLE bar"));
  ASSERT_FALSE(ASSERT_RESULT(
      client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "bar"))));
  ASSERT_FALSE(ASSERT_RESULT(
      client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "bar_index"))));

  // The tablets for bar_index should be deleted.
  // NOTE(review): LookupTabletById completes these callbacks asynchronously,
  // so the writes into tablet_founds may race with the all_of() reads below —
  // and std::vector<bool> packs elements into shared words, so even writes to
  // distinct indices are not independent. Consider
  // std::array<std::atomic<bool>, N>; confirm which thread runs the callback.
  std::vector<bool> tablet_founds(tablets_bar_index.size(), true);
  ASSERT_OK(WaitFor(
      [&] {
        for (int i = 0; i < tablets_bar_index.size(); ++i) {
          client->LookupTabletById(
              tablets_bar_index[i].tablet_id(),
              table_bar_index,
              master::IncludeInactive::kFalse,
              CoarseMonoClock::Now() + 30s,
              [&, i](const Result<client::internal::RemoteTabletPtr>& result) {
                tablet_founds[i] = result.ok();
              },
              client::UseCache::kFalse);
        }
        // Success when every lookup reports the tablet as gone.
        return std::all_of(
            tablet_founds.cbegin(),
            tablet_founds.cend(),
            [](bool tablet_found) {
              return !tablet_found;
            });
      },
      30s, "Drop table opted out of colocation"));

  // Drop the database.
  conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName));
  ASSERT_FALSE(ASSERT_RESULT(client->NamespaceExists(kDatabaseName, YQL_DATABASE_PGSQL)));
  ASSERT_FALSE(ASSERT_RESULT(
      client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "foo"))));
  ASSERT_FALSE(ASSERT_RESULT(
      client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "foo_index1"))));

  // The colocation tablet should be deleted.
  bool tablet_found = true;
  int rpc_calls = 0;  // counts lookups whose callback has not yet run
  ASSERT_OK(WaitFor(
      [&] {
        rpc_calls++;
        client->LookupTabletById(
            colocated_tablet_id,
            colocated_table,
            master::IncludeInactive::kFalse,
            CoarseMonoClock::Now() + 30s,
            [&](const Result<client::internal::RemoteTabletPtr>& result) {
              tablet_found = result.ok();
              rpc_calls--;
            },
            client::UseCache::kFalse);
        return !tablet_found;
      },
      30s, "Drop colocated database"));
  // To prevent an "AddressSanitizer: stack-use-after-scope", do not return from this function until
  // all callbacks are done.
  ASSERT_OK(WaitFor(
      [&rpc_calls] {
        LOG(INFO) << "Waiting for " << rpc_calls << " RPCs to run callbacks";
        return rpc_calls == 0;
      },
      30s, "Drop colocated database (wait for RPCs to finish)"));
}
1264 | | |
// Test for ensuring that transaction conflicts work as expected for colocated tables.
// Related to https://github.com/yugabyte/yugabyte-db/issues/3251.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TxnConflictsForColocatedTables)) {
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE DATABASE test_db WITH colocated = true"));

  auto conn1 = ASSERT_RESULT(ConnectToDB("test_db"));
  auto conn2 = ASSERT_RESULT(ConnectToDB("test_db"));

  ASSERT_OK(conn1.Execute("CREATE TABLE t (a INT, PRIMARY KEY (a ASC))"));
  ASSERT_OK(conn1.Execute("INSERT INTO t(a) VALUES(1)"));

  // From conn1, select the row in UPDATE row lock mode. From conn2, delete the row.
  // Ensure that conn1's transaction will detect a conflict at the time of commit.
  ASSERT_OK(conn1.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
  auto res = ASSERT_RESULT(conn1.Fetch("SELECT * FROM t FOR UPDATE"));
  ASSERT_EQ(PQntuples(res.get()), 1);

  // conn2's delete conflicts with conn1's row lock and must fail with a
  // serialization error naming the conflicting transaction.
  auto status = conn2.Execute("DELETE FROM t WHERE a = 1");
  ASSERT_FALSE(status.ok());
  ASSERT_EQ(PgsqlError(status), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE) << status;
  ASSERT_STR_CONTAINS(status.ToString(), "Conflicts with higher priority transaction");

  ASSERT_OK(conn1.CommitTransaction());

  // Ensure that reads to separate tables in a colocated database do not conflict.
  ASSERT_OK(conn1.Execute("CREATE TABLE t2 (a INT, PRIMARY KEY (a ASC))"));
  ASSERT_OK(conn1.Execute("INSERT INTO t2(a) VALUES(1)"));

  ASSERT_OK(conn1.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
  ASSERT_OK(conn2.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));

  res = ASSERT_RESULT(conn1.Fetch("SELECT * FROM t FOR UPDATE"));
  ASSERT_EQ(PQntuples(res.get()), 1);
  res = ASSERT_RESULT(conn2.Fetch("SELECT * FROM t2 FOR UPDATE"));
  ASSERT_EQ(PQntuples(res.get()), 1);

  // Both transactions locked different tables, so both commits must succeed.
  ASSERT_OK(conn1.CommitTransaction());
  ASSERT_OK(conn2.CommitTransaction());
}
1305 | | |
// Ensure tablet bootstrap doesn't crash when replaying change metadata operations
// for a deleted colocated table. This is a regression test for #6096.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ReplayDeletedTableInColocatedDB)) {
  const std::string kDatabaseName = "testdb";
  constexpr int kTimeoutSecs = 30;
  auto client = ASSERT_RESULT(cluster_->CreateClient());

  PGConn conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0 with colocated=true", kDatabaseName));

  PGConn conn_new = ASSERT_RESULT(ConnectToDB(kDatabaseName));
  ASSERT_OK(conn_new.Execute("CREATE TABLE foo (i int)"));
  ASSERT_OK(conn_new.Execute("INSERT INTO foo VALUES (10)"));

  // Flush tablets; requests from here on will be replayed from the WAL during bootstrap.
  auto table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "foo"));
  ASSERT_OK(client->FlushTables(
      {table_id},
      false /* add_indexes */,
      kTimeoutSecs,
      false /* is_compaction */));

  // ALTER requires foo's table id to be in the TS raft metadata
  ASSERT_OK(conn_new.Execute("ALTER TABLE foo ADD c char"));
  ASSERT_OK(conn_new.Execute("ALTER TABLE foo RENAME COLUMN c to d"));
  // but DROP will remove foo's table id from the TS raft metadata
  ASSERT_OK(conn_new.Execute("DROP TABLE foo"));
  ASSERT_OK(conn_new.Execute("CREATE TABLE bar (c char)"));

  // Restart a TS that serves this tablet so we do a local bootstrap and replay WAL files.
  // Ensure we don't crash here due to missing table info in metadata when replaying the ALTER.
  ASSERT_NO_FATALS(cluster_->tablet_server(0)->Shutdown());

  LOG(INFO) << "Start tserver";
  ASSERT_OK(cluster_->tablet_server(0)->Restart());
  ASSERT_OK(cluster_->WaitForTabletsRunning(cluster_->tablet_server(0),
                                            MonoDelta::FromSeconds(60)));

  // Ensure the rest of the WAL replayed successfully: the post-drop CREATE
  // must be intact and the new table empty.
  PGConn conn_after = ASSERT_RESULT(ConnectToDB(kDatabaseName));
  auto res = ASSERT_RESULT(conn_after.FetchValue<int64_t>("SELECT COUNT(*) FROM bar"));
  ASSERT_EQ(res, 0);
}
1349 | | |
// Test fixture that enables the (beta) tablegroup feature on both the masters
// and the tablet servers of the mini cluster.
class PgLibPqTablegroupTest : public PgLibPqTest {
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
    // Enable tablegroup beta feature
    options->extra_tserver_flags.push_back("--ysql_beta_feature_tablegroup=true");
    options->extra_master_flags.push_back("--ysql_beta_feature_tablegroup=true");
  }
};
1357 | | |
1358 | | namespace { |
1359 | | |
// Aggregated identifiers for a tablegroup as resolved by SelectTableGroup.
struct TableGroupInfo {
  int oid;                                 // oid from pg_yb_tablegroup
  std::string id;                          // tablegroup id derived from database oid + group oid
  TabletId tablet_id;                      // tablet backing the tablegroup's parent table
  std::shared_ptr<client::YBTable> table;  // open handle to the parent table
};
1366 | | |
// Resolves |group_name| within |database_name| to a TableGroupInfo: queries
// the pg catalog for the database and tablegroup oids, derives the tablegroup
// id, waits for its backing parent tablet, and opens the parent table.
Result<TableGroupInfo> SelectTableGroup(
    client::YBClient* client, PGConn* conn, const std::string& database_name,
    const std::string& group_name) {
  TableGroupInfo group_info;
  auto res = VERIFY_RESULT(
      conn->FetchFormat("SELECT oid FROM pg_database WHERE datname=\'$0\'", database_name));
  const int database_oid = VERIFY_RESULT(GetInt32(res.get(), 0, 0));
  res = VERIFY_RESULT(
      conn->FetchFormat("SELECT oid FROM pg_yb_tablegroup WHERE grpname=\'$0\'", group_name));
  group_info.oid = VERIFY_RESULT(GetInt32(res.get(), 0, 0));

  // The tablegroup id is a function of the database oid and the group oid.
  group_info.id = GetPgsqlTablegroupId(database_oid, group_info.oid);
  group_info.tablet_id = VERIFY_RESULT(GetTablegroupTabletLocations(
      client,
      database_name,
      group_info.id,
      30s))
          .tablet_id();
  group_info.table = VERIFY_RESULT(client->OpenTable(GetTableGroupTableId(group_info.id)));
  return group_info;
}
1388 | | |
1389 | | } // namespace |
1390 | | |
1391 | | TEST_F_EX(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ColocatedTablegroups), |
1392 | 0 | PgLibPqTablegroupTest) { |
1393 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
1394 | 0 | const string kDatabaseName ="tgroup_test_db"; |
1395 | 0 | const string kTablegroupName ="test_tgroup"; |
1396 | 0 | const string kTablegroupAltName ="test_alt_tgroup"; |
1397 | 0 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets; |
1398 | 0 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets_bar_index; |
1399 | |
|
1400 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1401 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName)); |
1402 | 0 | conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); |
1403 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLEGROUP $0", kTablegroupName)); |
1404 | | |
1405 | | // A parent table with one tablet should be created when the tablegroup is created. |
1406 | 0 | const auto tablegroup = ASSERT_RESULT(SelectTableGroup( |
1407 | 0 | client.get(), &conn, kDatabaseName, kTablegroupName)); |
1408 | | |
1409 | | // Create a range partition table, the table should share the tablet with the parent table. |
1410 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLE foo (a INT, PRIMARY KEY (a ASC)) TABLEGROUP $0", |
1411 | 0 | kTablegroupName)); |
1412 | 0 | auto table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "foo")); |
1413 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
1414 | 0 | ASSERT_EQ(tablets.size(), 1); |
1415 | 0 | ASSERT_EQ(tablets[0].tablet_id(), tablegroup.tablet_id); |
1416 | | |
1417 | | // Create a index table that uses the tablegroup by default. |
1418 | 0 | ASSERT_OK(conn.Execute("CREATE INDEX foo_index1 ON foo (a)")); |
1419 | 0 | table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "foo_index1")); |
1420 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
1421 | 0 | ASSERT_EQ(tablets.size(), 1); |
1422 | 0 | ASSERT_EQ(tablets[0].tablet_id(), tablegroup.tablet_id); |
1423 | | |
1424 | | // Create a hash partition table and dont use tablegroup. |
1425 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE bar (a INT)")); |
1426 | 0 | table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "bar")); |
1427 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
1428 | 0 | for (auto& tablet : tablets) { |
1429 | 0 | ASSERT_NE(tablet.tablet_id(), tablegroup.tablet_id); |
1430 | 0 | } |
1431 | | |
1432 | | // Create an index on the table not in a tablegroup. The index should follow the table |
1433 | | // and opt out of the tablegroup. |
1434 | 0 | ASSERT_OK(conn.Execute("CREATE INDEX bar_index ON bar (a)")); |
1435 | 0 | table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "bar_index")); |
1436 | 0 | const auto table_bar_index = ASSERT_RESULT(client->OpenTable(table_id)); |
1437 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
1438 | 0 | for (auto& tablet : tablets) { |
1439 | 0 | ASSERT_NE(tablet.tablet_id(), tablegroup.tablet_id); |
1440 | 0 | } |
1441 | 0 | tablets_bar_index.Swap(&tablets); |
1442 | | |
1443 | | // Create a range partition table without specifying primary key. |
1444 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLE baz (a INT) TABLEGROUP $0", kTablegroupName)); |
1445 | 0 | table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "baz")); |
1446 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
1447 | 0 | ASSERT_EQ(tablets.size(), 1); |
1448 | 0 | ASSERT_EQ(tablets[0].tablet_id(), tablegroup.tablet_id); |
1449 | | |
1450 | | // Create another table and index. |
1451 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLE qux (a INT, PRIMARY KEY (a ASC)) TABLEGROUP $0", |
1452 | 0 | kTablegroupName)); |
1453 | 0 | ASSERT_OK(conn.Execute("CREATE INDEX qux_index ON qux (a)")); |
1454 | 0 | table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "qux")); |
1455 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
1456 | 0 | ASSERT_EQ(tablets[0].tablet_id(), tablegroup.tablet_id); |
1457 | 0 | table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "qux_index")); |
1458 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
1459 | 0 | ASSERT_EQ(tablets[0].tablet_id(), tablegroup.tablet_id); |
1460 | | |
1461 | | // Now create a second tablegroup. |
1462 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLEGROUP $0", kTablegroupAltName)); |
1463 | | |
1464 | | // A parent table with one tablet should be created when the tablegroup is created. |
1465 | 0 | auto tablegroup_alt = ASSERT_RESULT(SelectTableGroup( |
1466 | 0 | client.get(), &conn, kDatabaseName, kTablegroupAltName)); |
1467 | | |
1468 | | // Create another range partition table - should be part of the second tablegroup |
1469 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLE quuz (a INT, PRIMARY KEY (a ASC)) TABLEGROUP $0", |
1470 | 0 | kTablegroupAltName)); |
1471 | 0 | table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "quuz")); |
1472 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
1473 | 0 | ASSERT_EQ(tablets.size(), 1); |
1474 | 0 | ASSERT_EQ(tablets[0].tablet_id(), tablegroup_alt.tablet_id); |
1475 | | |
1476 | | // Drop a table in the parent tablet. |
1477 | 0 | ASSERT_OK(conn.Execute("DROP TABLE quuz")); |
1478 | 0 | ASSERT_FALSE(ASSERT_RESULT( |
1479 | 0 | client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "quuz")))); |
1480 | 0 | ASSERT_FALSE(ASSERT_RESULT( |
1481 | 0 | client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "quuz_index")))); |
1482 | | |
1483 | | // Drop a table that is opted out. |
1484 | 0 | ASSERT_OK(conn.Execute("DROP TABLE bar")); |
1485 | 0 | ASSERT_FALSE(ASSERT_RESULT( |
1486 | 0 | client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "bar")))); |
1487 | 0 | ASSERT_FALSE(ASSERT_RESULT( |
1488 | 0 | client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "bar_index")))); |
1489 | | |
1490 | | // The tablets for bar_index should be deleted. |
1491 | 0 | std::vector<bool> tablet_founds(tablets_bar_index.size(), true); |
1492 | 0 | ASSERT_OK(WaitFor( |
1493 | 0 | [&] { |
1494 | 0 | for (int i = 0; i < tablets_bar_index.size(); ++i) { |
1495 | 0 | client->LookupTabletById( |
1496 | 0 | tablets_bar_index[i].tablet_id(), |
1497 | 0 | table_bar_index, |
1498 | 0 | master::IncludeInactive::kFalse, |
1499 | 0 | CoarseMonoClock::Now() + 30s, |
1500 | 0 | [&, i](const Result<client::internal::RemoteTabletPtr>& result) { |
1501 | 0 | tablet_founds[i] = result.ok(); |
1502 | 0 | }, |
1503 | 0 | client::UseCache::kFalse); |
1504 | 0 | } |
1505 | 0 | return std::all_of( |
1506 | 0 | tablet_founds.cbegin(), |
1507 | 0 | tablet_founds.cend(), |
1508 | 0 | [](bool tablet_found) { |
1509 | 0 | return !tablet_found; |
1510 | 0 | }); |
1511 | 0 | }, |
1512 | 0 | 30s, "Drop table did not use tablegroups")); |
1513 | | |
1514 | | // Drop a tablegroup. |
1515 | 0 | ASSERT_OK(conn.ExecuteFormat("DROP TABLEGROUP $0", kTablegroupAltName)); |
1516 | 0 | ASSERT_FALSE(ASSERT_RESULT(client->TablegroupExists(kDatabaseName, kTablegroupAltName))); |
1517 | | |
1518 | | // The alt tablegroup tablet should be deleted after dropping the tablegroup. |
1519 | 0 | bool alt_tablet_found = true; |
1520 | 0 | int rpc_calls = 0; |
1521 | 0 | ASSERT_OK(WaitFor( |
1522 | 0 | [&] { |
1523 | 0 | rpc_calls++; |
1524 | 0 | client->LookupTabletById( |
1525 | 0 | tablegroup_alt.tablet_id, |
1526 | 0 | tablegroup_alt.table, |
1527 | 0 | master::IncludeInactive::kFalse, |
1528 | 0 | CoarseMonoClock::Now() + 30s, |
1529 | 0 | [&](const Result<client::internal::RemoteTabletPtr>& result) { |
1530 | 0 | alt_tablet_found = result.ok(); |
1531 | 0 | rpc_calls--; |
1532 | 0 | }, |
1533 | 0 | client::UseCache::kFalse); |
1534 | 0 | return !alt_tablet_found; |
1535 | 0 | }, |
1536 | 0 | 30s, "Drop tablegroup")); |
1537 | | |
1538 | | // Recreate that tablegroup. Being able to recreate it and add tables to it tests that it was |
1539 | | // properly cleaned up from catalog manager maps and postgres metadata at time of DROP. |
1540 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLEGROUP $0", kTablegroupAltName)); |
1541 | | |
1542 | | // A parent table with one tablet should be created when the tablegroup is created. |
1543 | 0 | tablegroup_alt = ASSERT_RESULT(SelectTableGroup( |
1544 | 0 | client.get(), &conn, kDatabaseName, kTablegroupAltName)); |
1545 | | |
1546 | | // Add a table back in and ensure that it is part of the recreated tablegroup. |
1547 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TABLE quuz (a INT, PRIMARY KEY (a ASC)) TABLEGROUP $0", |
1548 | 0 | kTablegroupAltName)); |
1549 | 0 | table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "quuz")); |
1550 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
1551 | 0 | ASSERT_EQ(tablets.size(), 1); |
1552 | 0 | ASSERT_EQ(tablets[0].tablet_id(), tablegroup_alt.tablet_id); |
1553 | | |
1554 | | // Drop the database. |
1555 | 0 | conn = ASSERT_RESULT(Connect()); |
1556 | 0 | ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName)); |
1557 | 0 | ASSERT_FALSE(ASSERT_RESULT(client->NamespaceExists(kDatabaseName, YQL_DATABASE_PGSQL))); |
1558 | 0 | ASSERT_FALSE(ASSERT_RESULT( |
1559 | 0 | client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "foo")))); |
1560 | 0 | ASSERT_FALSE(ASSERT_RESULT( |
1561 | 0 | client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "foo_index1")))); |
1562 | | |
1563 | | // The original tablegroup tablet should be deleted after dropping the database. |
1564 | 0 | bool orig_tablet_found = true; |
1565 | 0 | ASSERT_OK(WaitFor( |
1566 | 0 | [&] { |
1567 | 0 | rpc_calls++; |
1568 | 0 | client->LookupTabletById( |
1569 | 0 | tablegroup.tablet_id, |
1570 | 0 | tablegroup.table, |
1571 | 0 | master::IncludeInactive::kFalse, |
1572 | 0 | CoarseMonoClock::Now() + 30s, |
1573 | 0 | [&](const Result<client::internal::RemoteTabletPtr>& result) { |
1574 | 0 | orig_tablet_found = result.ok(); |
1575 | 0 | rpc_calls--; |
1576 | 0 | }, |
1577 | 0 | client::UseCache::kFalse); |
1578 | 0 | return !orig_tablet_found; |
1579 | 0 | }, |
1580 | 0 | 30s, "Drop database with tablegroup")); |
1581 | | |
1582 | | // The second tablegroup tablet should also be deleted after dropping the database. |
1583 | 0 | bool second_tablet_found = true; |
1584 | 0 | ASSERT_OK(WaitFor( |
1585 | 0 | [&] { |
1586 | 0 | rpc_calls++; |
1587 | 0 | client->LookupTabletById( |
1588 | 0 | tablegroup_alt.tablet_id, |
1589 | 0 | tablegroup_alt.table, |
1590 | 0 | master::IncludeInactive::kFalse, |
1591 | 0 | CoarseMonoClock::Now() + 30s, |
1592 | 0 | [&](const Result<client::internal::RemoteTabletPtr>& result) { |
1593 | 0 | second_tablet_found = result.ok(); |
1594 | 0 | rpc_calls--; |
1595 | 0 | }, |
1596 | 0 | client::UseCache::kFalse); |
1597 | 0 | return !second_tablet_found; |
1598 | 0 | }, |
1599 | 0 | 30s, "Drop database with tablegroup")); |
1600 | | |
1601 | | // To prevent an "AddressSanitizer: stack-use-after-scope", do not return from this function until |
1602 | | // all callbacks are done. |
1603 | 0 | ASSERT_OK(WaitFor( |
1604 | 0 | [&rpc_calls] { |
1605 | 0 | LOG(INFO) << "Waiting for " << rpc_calls << " RPCs to run callbacks"; |
1606 | 0 | return rpc_calls == 0; |
1607 | 0 | }, |
1608 | 0 | 30s, "Drop database with tablegroup (wait for RPCs to finish)")); |
1609 | 0 | } |
1610 | | |
1611 | | // Test that the number of RPCs sent to master upon first connection is not too high. |
1612 | | // See https://github.com/yugabyte/yugabyte-db/issues/3049 |
1613 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(NumberOfInitialRpcs)) { |
1614 | 0 | auto get_master_inbound_rpcs_created = [&cluster_ = this->cluster_]() -> Result<int64_t> { |
1615 | 0 | int64_t m_in_created = 0; |
1616 | 0 | for (auto* master : cluster_->master_daemons()) { |
1617 | 0 | m_in_created += VERIFY_RESULT(master->GetInt64Metric( |
1618 | 0 | &METRIC_ENTITY_server, "yb.master", &METRIC_rpc_inbound_calls_created, "value")); |
1619 | 0 | } |
1620 | 0 | return m_in_created; |
1621 | 0 | }; |
1622 | |
|
1623 | 0 | int64_t rpcs_before = ASSERT_RESULT(get_master_inbound_rpcs_created()); |
1624 | 0 | ASSERT_RESULT(Connect()); |
1625 | 0 | int64_t rpcs_after = ASSERT_RESULT(get_master_inbound_rpcs_created()); |
1626 | 0 | int64_t rpcs_during = rpcs_after - rpcs_before; |
1627 | | |
1628 | | // Real-world numbers (debug build, local Mac): 328 RPCs before, 95 after the fix for #3049 |
1629 | 0 | LOG(INFO) << "Master inbound RPC during connection: " << rpcs_during; |
1630 | | // RPC counter is affected no only by table read/write operations but also by heartbeat mechanism. |
1631 | | // As far as ASAN/TSAN builds are slower they can receive more heartbeats while |
1632 | | // processing requests. As a result RPC count might be higher in comparison to other build types. |
1633 | 0 | ASSERT_LT(rpcs_during, RegularBuildVsSanitizers(150, 200)); |
1634 | 0 | } |
1635 | | |
1636 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(RangePresplit)) { |
1637 | 0 | const string kDatabaseName ="yugabyte"; |
1638 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
1639 | |
|
1640 | 0 | auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); |
1641 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE range(a int, PRIMARY KEY(a ASC)) " \ |
1642 | 0 | "SPLIT AT VALUES ((100), (1000))")); |
1643 | |
|
1644 | 0 | auto ns_id = ASSERT_RESULT(GetNamespaceIdByNamespaceName(client.get(), kDatabaseName)); |
1645 | 0 | ASSERT_FALSE(ns_id.empty()); |
1646 | |
|
1647 | 0 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets; |
1648 | 0 | auto table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "range")); |
1649 | | |
1650 | | // Validate that number of tablets created is 3. |
1651 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
1652 | 0 | ASSERT_EQ(tablets.size(), 3); |
1653 | 0 | } |
1654 | | |
1655 | | // Override the base test to start a cluster that kicks out unresponsive tservers faster. |
1656 | | class PgLibPqTestSmallTSTimeout : public PgLibPqTest { |
1657 | | public: |
1658 | 0 | void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { |
1659 | 0 | options->extra_master_flags.push_back("--tserver_unresponsive_timeout_ms=8000"); |
1660 | 0 | options->extra_master_flags.push_back("--unresponsive_ts_rpc_timeout_ms=10000"); |
1661 | 0 | options->extra_tserver_flags.push_back("--follower_unavailable_considered_failed_sec=8"); |
1662 | 0 | } |
1663 | | }; |
1664 | | |
1665 | | // Test that adding a tserver and removing a tserver causes the colocation tablet to adjust raft |
1666 | | // configuration off the old tserver and onto the new tserver. |
1667 | | TEST_F_EX(PgLibPqTest, |
1668 | | YB_DISABLE_TEST_IN_TSAN(LoadBalanceSingleColocatedDB), |
1669 | 0 | PgLibPqTestSmallTSTimeout) { |
1670 | 0 | const std::string kDatabaseName = "co"; |
1671 | 0 | const auto kTimeout = 60s; |
1672 | 0 | const auto starting_num_tablet_servers = cluster_->num_tablet_servers(); |
1673 | 0 | ExternalMiniClusterOptions opts; |
1674 | 0 | std::map<std::string, int> ts_loads; |
1675 | 0 | static const int tserver_unresponsive_timeout_ms = 8000; |
1676 | |
|
1677 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
1678 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1679 | |
|
1680 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0 WITH colocated = true", kDatabaseName)); |
1681 | | |
1682 | | // Collect colocation tablet replica locations. |
1683 | 0 | { |
1684 | 0 | master::TabletLocationsPB tablet_locations = ASSERT_RESULT(GetColocatedTabletLocations( |
1685 | 0 | client.get(), |
1686 | 0 | kDatabaseName, |
1687 | 0 | kTimeout)); |
1688 | 0 | for (const auto& replica : tablet_locations.replicas()) { |
1689 | 0 | ts_loads[replica.ts_info().permanent_uuid()]++; |
1690 | 0 | } |
1691 | 0 | } |
1692 | | |
1693 | | // Ensure each tserver has exactly one colocation tablet replica. |
1694 | 0 | ASSERT_EQ(ts_loads.size(), starting_num_tablet_servers); |
1695 | 0 | for (const auto& entry : ts_loads) { |
1696 | 0 | ASSERT_NOTNULL(cluster_->tablet_server_by_uuid(entry.first)); |
1697 | 0 | ASSERT_EQ(entry.second, 1); |
1698 | 0 | LOG(INFO) << "found ts " << entry.first << " has " << entry.second << " replicas"; |
1699 | 0 | } |
1700 | | |
1701 | | // Add a tablet server. |
1702 | 0 | UpdateMiniClusterOptions(&opts); |
1703 | 0 | ASSERT_OK(cluster_->AddTabletServer(ExternalMiniClusterOptions::kDefaultStartCqlProxy, |
1704 | 0 | opts.extra_tserver_flags)); |
1705 | 0 | ASSERT_OK(cluster_->WaitForTabletServerCount(starting_num_tablet_servers + 1, kTimeout)); |
1706 | | |
1707 | | // Wait for load balancing. This should move some tablet-peers (e.g. of the colocation tablet, |
1708 | | // system.transactions tablets) to the new tserver. |
1709 | 0 | ASSERT_OK(WaitFor( |
1710 | 0 | [&]() -> Result<bool> { |
1711 | 0 | bool is_idle = VERIFY_RESULT(client->IsLoadBalancerIdle()); |
1712 | 0 | return !is_idle; |
1713 | 0 | }, |
1714 | 0 | kTimeout, |
1715 | 0 | "wait for load balancer to be active")); |
1716 | 0 | ASSERT_OK(WaitFor( |
1717 | 0 | [&]() -> Result<bool> { |
1718 | 0 | return client->IsLoadBalancerIdle(); |
1719 | 0 | }, |
1720 | 0 | kTimeout, |
1721 | 0 | "wait for load balancer to be idle")); |
1722 | | |
1723 | | // Remove a tablet server. |
1724 | 0 | cluster_->tablet_server(0)->Shutdown(); |
1725 | | |
1726 | | // Wait for the master leader to mark it dead. |
1727 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
1728 | 0 | return cluster_->is_ts_stale(0); |
1729 | 0 | }, |
1730 | 0 | MonoDelta::FromMilliseconds(2 * tserver_unresponsive_timeout_ms), |
1731 | 0 | "Is TS dead", |
1732 | 0 | MonoDelta::FromSeconds(1))); |
1733 | | |
1734 | | // Collect colocation tablet replica locations and verify that load has been moved off |
1735 | | // from the dead TS. |
1736 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
1737 | 0 | master::TabletLocationsPB tablet_locations = VERIFY_RESULT(GetColocatedTabletLocations( |
1738 | 0 | client.get(), |
1739 | 0 | kDatabaseName, |
1740 | 0 | kTimeout)); |
1741 | 0 | ts_loads.clear(); |
1742 | 0 | for (const auto& replica : tablet_locations.replicas()) { |
1743 | 0 | ts_loads[replica.ts_info().permanent_uuid()]++; |
1744 | 0 | } |
1745 | | // Ensure each colocation tablet replica is on the three tablet servers excluding the first one, |
1746 | | // which is shut down. |
1747 | 0 | if (ts_loads.size() != starting_num_tablet_servers) { |
1748 | 0 | return false; |
1749 | 0 | } |
1750 | 0 | for (const auto& entry : ts_loads) { |
1751 | 0 | ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(entry.first); |
1752 | 0 | if (ts == nullptr || ts == cluster_->tablet_server(0) || entry.second != 1) { |
1753 | 0 | return false; |
1754 | 0 | } |
1755 | 0 | } |
1756 | 0 | return true; |
1757 | 0 | }, |
1758 | 0 | kTimeout, |
1759 | 0 | "Wait for load to be moved off from tserver 0")); |
1760 | 0 | } |
1761 | | |
1762 | | // Test that adding a tserver causes colocation tablets to offload tablet-peers to the new tserver. |
1763 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(LoadBalanceMultipleColocatedDB)) { |
1764 | 0 | constexpr int kNumDatabases = 3; |
1765 | 0 | const auto kTimeout = 60s; |
1766 | 0 | const size_t starting_num_tablet_servers = cluster_->num_tablet_servers(); |
1767 | 0 | const std::string kDatabasePrefix = "co"; |
1768 | 0 | std::map<std::string, int> ts_loads; |
1769 | |
|
1770 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
1771 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1772 | |
|
1773 | 0 | for (int i = 0; i < kNumDatabases; ++i) { |
1774 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0$1 WITH colocated = true", kDatabasePrefix, i)); |
1775 | 0 | } |
1776 | | |
1777 | | // Add a tablet server. |
1778 | 0 | ASSERT_OK(cluster_->AddTabletServer()); |
1779 | 0 | ASSERT_OK(cluster_->WaitForTabletServerCount(starting_num_tablet_servers + 1, kTimeout)); |
1780 | | |
1781 | | // Wait for load balancing. This should move some tablet-peers (e.g. of the colocation tablets, |
1782 | | // system.transactions tablets) to the new tserver. |
1783 | 0 | ASSERT_OK(WaitFor( |
1784 | 0 | [&]() -> Result<bool> { |
1785 | 0 | bool is_idle = VERIFY_RESULT(client->IsLoadBalancerIdle()); |
1786 | 0 | return !is_idle; |
1787 | 0 | }, |
1788 | 0 | kTimeout, |
1789 | 0 | "wait for load balancer to be active")); |
1790 | 0 | ASSERT_OK(WaitFor( |
1791 | 0 | [&]() -> Result<bool> { |
1792 | 0 | return client->IsLoadBalancerIdle(); |
1793 | 0 | }, |
1794 | 0 | kTimeout, |
1795 | 0 | "wait for load balancer to be idle")); |
1796 | | |
1797 | | // Collect colocation tablets' replica locations. |
1798 | 0 | for (int i = 0; i < kNumDatabases; ++i) { |
1799 | 0 | master::TabletLocationsPB tablet_locations = ASSERT_RESULT(GetColocatedTabletLocations( |
1800 | 0 | client.get(), |
1801 | 0 | Format("$0$1", kDatabasePrefix, i), |
1802 | 0 | kTimeout)); |
1803 | 0 | for (const auto& replica : tablet_locations.replicas()) { |
1804 | 0 | ts_loads[replica.ts_info().permanent_uuid()]++; |
1805 | 0 | } |
1806 | 0 | } |
1807 | | |
1808 | | // Ensure that the load is properly distributed. |
1809 | 0 | int min_load = kNumDatabases; |
1810 | 0 | int max_load = 0; |
1811 | 0 | for (const auto& entry : ts_loads) { |
1812 | 0 | if (entry.second < min_load) { |
1813 | 0 | min_load = entry.second; |
1814 | 0 | } else if (entry.second > max_load) { |
1815 | 0 | max_load = entry.second; |
1816 | 0 | } |
1817 | 0 | } |
1818 | 0 | LOG(INFO) << "Found max_load on a TS = " << max_load << ", and min_load on a ts = " << min_load; |
1819 | 0 | ASSERT_LT(max_load - min_load, 2); |
1820 | 0 | ASSERT_EQ(ts_loads.size(), kNumDatabases + 1); |
1821 | 0 | } |
1822 | | |
1823 | | // Override the base test to start a cluster with transparent retries on cache version mismatch |
1824 | | // disabled. |
1825 | | class PgLibPqTestNoRetry : public PgLibPqTest { |
1826 | | public: |
1827 | 0 | void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { |
1828 | 0 | options->extra_tserver_flags.push_back( |
1829 | 0 | "--TEST_ysql_disable_transparent_cache_refresh_retry=true"); |
1830 | 0 | } |
1831 | | }; |
1832 | | |
1833 | | // This test is like "TestPgCacheConsistency#testVersionMismatchWithFailedRetry". That one gets |
1834 | | // failures because the queries are "parse" message types, and we don't consider retry for those. |
1835 | | // These queries are "simple query" message types, so they should be considered for transparent |
1836 | | // retry. The last factor is whether `--TEST_ysql_disable_transparent_cache_refresh_retry` is |
1837 | | // specified. |
1838 | 0 | void PgLibPqTest::TestCacheRefreshRetry(const bool is_retry_disabled) { |
1839 | 0 | constexpr int kNumTries = 5; |
1840 | 0 | const std::string kNamespaceName = "yugabyte"; |
1841 | 0 | const std::string kTableName = "t"; |
1842 | 0 | int num_successes = 0; |
1843 | 0 | std::array<PGConn, 2> conns = { |
1844 | 0 | ASSERT_RESULT(ConnectToDB(kNamespaceName, true /* simple_query_protocol */)), |
1845 | 0 | ASSERT_RESULT(ConnectToDB(kNamespaceName, true /* simple_query_protocol */)), |
1846 | 0 | }; |
1847 | |
|
1848 | 0 | ASSERT_OK(conns[0].ExecuteFormat("CREATE TABLE $0 (i int)", kTableName)); |
1849 | | // Make the catalog version cache up to date. |
1850 | 0 | ASSERT_OK(conns[1].FetchFormat("SELECT * FROM $0", kTableName)); |
1851 | |
|
1852 | 0 | for (int i = 0; i < kNumTries; ++i) { |
1853 | 0 | ASSERT_OK(conns[0].ExecuteFormat("ALTER TABLE $0 ADD COLUMN j$1 int", kTableName, i)); |
1854 | 0 | auto res = conns[1].FetchFormat("SELECT * FROM $0", kTableName); |
1855 | 0 | if (is_retry_disabled) { |
1856 | | // Ensure that we fall under one of two cases: |
1857 | | // 1. tserver gets updated catalog version before SELECT (rare) |
1858 | | // - YBCheckSharedCatalogCacheVersion causes YBRefreshCache |
1859 | | // - trying the SELECT requires getting the table schema, but it will be a cache miss since |
1860 | | // the whole cache was invalidated, so we get the up-to-date table schema and succeed |
1861 | | // 2. tserver doesn't get updated catalog version before SELECT (common) |
1862 | | // - trying the SELECT causes catalog version mismatch |
1863 | 0 | if (res.ok()) { |
1864 | 0 | LOG(WARNING) << "SELECT was ok"; |
1865 | 0 | num_successes++; |
1866 | 0 | continue; |
1867 | 0 | } |
1868 | 0 | auto msg = res.status().message().ToBuffer(); |
1869 | 0 | ASSERT_TRUE(msg.find("Catalog Version Mismatch") != std::string::npos) << res.status(); |
1870 | 0 | } else { |
1871 | | // Ensure that the request is successful (thanks to retry). |
1872 | 0 | if (!res.ok()) { |
1873 | 0 | LOG(WARNING) << "SELECT was not ok: " << res.status(); |
1874 | 0 | continue; |
1875 | 0 | } |
1876 | 0 | num_successes++; |
1877 | 0 | } |
1878 | | // Make the catalog version cache up to date, if needed. |
1879 | 0 | ASSERT_OK(conns[1].FetchFormat("SELECT * FROM $0", kTableName)); |
1880 | 0 | } |
1881 | |
|
1882 | 0 | LOG(INFO) << "number of successes: " << num_successes << "/" << kNumTries; |
1883 | 0 | if (is_retry_disabled) { |
1884 | | // Expect at least half of the tries to fail with catalog version mismatch. There can be some |
1885 | | // successes because, between the ALTER and SELECT, the catalog version could have propogated |
1886 | | // through shared memory (see `YBCheckSharedCatalogCacheVersion`). |
1887 | 0 | const int num_failures = kNumTries - num_successes; |
1888 | 0 | ASSERT_GE(num_failures, kNumTries / 2); |
1889 | 0 | } else { |
1890 | | // Expect all the tries to succeed. This is because it is unacceptable to fail when retries are |
1891 | | // enabled. |
1892 | 0 | ASSERT_EQ(num_successes, kNumTries); |
1893 | 0 | } |
1894 | 0 | } |
1895 | | |
1896 | | TEST_F_EX(PgLibPqTest, |
1897 | | YB_DISABLE_TEST_IN_TSAN(CacheRefreshRetryDisabled), |
1898 | 0 | PgLibPqTestNoRetry) { |
1899 | 0 | TestCacheRefreshRetry(true /* is_retry_disabled */); |
1900 | 0 | } |
1901 | | |
1902 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CacheRefreshRetryEnabled)) { |
1903 | 0 | TestCacheRefreshRetry(false /* is_retry_disabled */); |
1904 | 0 | } |
1905 | | |
1906 | | class PgLibPqDatabaseTimeoutTest : public PgLibPqTest { |
1907 | 0 | void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { |
1908 | 0 | options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=1"); |
1909 | 0 | options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=5000"); |
1910 | 0 | } |
1911 | | }; |
1912 | | |
1913 | 0 | TEST_F(PgLibPqDatabaseTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseTimeoutGC)) { |
1914 | 0 | NamespaceName test_name = "test_pgsql"; |
1915 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
1916 | | |
1917 | | // Create Database: will timeout because the admin setting is lower than the DB create latency. |
1918 | 0 | { |
1919 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1920 | 0 | ASSERT_NOK(conn.Execute("CREATE DATABASE " + test_name)); |
1921 | 0 | } |
1922 | | |
1923 | | // Verify DocDB Database creation, even though it failed in PG layer. |
1924 | | // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. |
1925 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
1926 | 0 | Result<bool> ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); |
1927 | 0 | WARN_NOT_OK(ResultToStatus(ret), "" /* prefix */); |
1928 | 0 | return ret.ok() && ret.get(); |
1929 | 0 | }, MonoDelta::FromSeconds(60), |
1930 | 0 | "Verify Namespace was created in DocDB")); |
1931 | | |
1932 | | // After bg_task_wait, DocDB will notice the PG layer failure because the transaction aborts. |
1933 | | // Confirm that DocDB async deletes the namespace. |
1934 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
1935 | 0 | Result<bool> ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); |
1936 | 0 | WARN_NOT_OK(ResultToStatus(ret), "ret"); |
1937 | 0 | return ret.ok() && ret.get() == false; |
1938 | 0 | }, MonoDelta::FromSeconds(20), "Verify Namespace was removed by Transaction GC")); |
1939 | 0 | } |
1940 | | |
1941 | 0 | TEST_F(PgLibPqDatabaseTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseTimeoutAndRestartGC)) { |
1942 | 0 | NamespaceName test_name = "test_pgsql"; |
1943 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
1944 | | |
1945 | | // Create Database: will timeout because the admin setting is lower than the DB create latency. |
1946 | 0 | { |
1947 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1948 | 0 | ASSERT_NOK(conn.Execute("CREATE DATABASE " + test_name)); |
1949 | 0 | } |
1950 | | |
1951 | | // Verify DocDB Database creation, even though it fails in PG layer. |
1952 | | // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. |
1953 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
1954 | 0 | Result<bool> ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); |
1955 | 0 | WARN_NOT_OK(ResultToStatus(ret), ""); |
1956 | 0 | return ret.ok() && ret.get() == true; |
1957 | 0 | }, MonoDelta::FromSeconds(60), |
1958 | 0 | "Verify Namespace was created in DocDB")); |
1959 | |
|
1960 | 0 | LOG(INFO) << "Restarting Master."; |
1961 | | |
1962 | | // Restart the master before the BG task can kick in and GC the failed transaction. |
1963 | 0 | auto master = cluster_->GetLeaderMaster(); |
1964 | 0 | master->Shutdown(); |
1965 | 0 | ASSERT_OK(master->Restart()); |
1966 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
1967 | 0 | auto s = cluster_->GetIsMasterLeaderServiceReady(master); |
1968 | 0 | return s.ok(); |
1969 | 0 | }, MonoDelta::FromSeconds(20), "Wait for Master to be ready.")); |
1970 | | |
1971 | | // Confirm that Catalog Loader deletes the namespace on master restart. |
1972 | 0 | client = ASSERT_RESULT(cluster_->CreateClient()); // Reinit the YBClient after restart. |
1973 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
1974 | 0 | Result<bool> ret = client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL); |
1975 | 0 | WARN_NOT_OK(ResultToStatus(ret), ""); |
1976 | 0 | return ret.ok() && ret.get() == false; |
1977 | 0 | }, MonoDelta::FromSeconds(20), "Verify Namespace was removed by Transaction GC")); |
1978 | 0 | } |
1979 | | |
1980 | | class PgLibPqTableTimeoutTest : public PgLibPqTest { |
1981 | 0 | void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { |
1982 | | // Use small clock skew, to decrease number of read restarts. |
1983 | 0 | options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=1"); |
1984 | 0 | options->extra_master_flags.push_back("--TEST_simulate_slow_table_create_secs=2"); |
1985 | 0 | options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=3000"); |
1986 | 0 | } |
1987 | | }; |
1988 | | |
1989 | 0 | TEST_F(PgLibPqTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestTableTimeoutGC)) { |
1990 | 0 | const string kDatabaseName ="yugabyte"; |
1991 | 0 | NamespaceName test_name = "test_pgsql_table"; |
1992 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
1993 | | |
1994 | | // Create Table: will timeout because the admin setting is lower than the DB create latency. |
1995 | 0 | { |
1996 | 0 | auto conn = ASSERT_RESULT(Connect()); |
1997 | 0 | ASSERT_NOK(conn.Execute("CREATE TABLE " + test_name + " (key INT PRIMARY KEY)")); |
1998 | 0 | } |
1999 | | |
2000 | | // Wait for DocDB Table creation, even though it will fail in PG layer. |
2001 | | // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. |
2002 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
2003 | 0 | LOG(INFO) << "Requesting TableExists"; |
2004 | 0 | auto ret = client->TableExists( |
2005 | 0 | client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); |
2006 | 0 | WARN_NOT_OK(ResultToStatus(ret), ""); |
2007 | 0 | return ret.ok() && ret.get() == true; |
2008 | 0 | }, MonoDelta::FromSeconds(20), "Verify Table was created in DocDB")); |
2009 | | |
2010 | | // DocDB will notice the PG layer failure because the transaction aborts. |
2011 | | // Confirm that DocDB async deletes the namespace. |
2012 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
2013 | 0 | auto ret = client->TableExists( |
2014 | 0 | client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); |
2015 | 0 | WARN_NOT_OK(ResultToStatus(ret), ""); |
2016 | 0 | return ret.ok() && ret.get() == false; |
2017 | 0 | }, MonoDelta::FromSeconds(20), "Verify Table was removed by Transaction GC")); |
2018 | 0 | } |
2019 | | |
2020 | 0 | TEST_F(PgLibPqTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestTableTimeoutAndRestartGC)) { |
2021 | 0 | const string kDatabaseName ="yugabyte"; |
2022 | 0 | NamespaceName test_name = "test_pgsql_table"; |
2023 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
2024 | | |
2025 | | // Create Table: will timeout because the admin setting is lower than the DB create latency. |
2026 | 0 | { |
2027 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2028 | 0 | ASSERT_NOK(conn.Execute("CREATE TABLE " + test_name + " (key INT PRIMARY KEY)")); |
2029 | 0 | } |
2030 | | |
2031 | | // Wait for DocDB Table creation, even though it will fail in PG layer. |
2032 | | // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. |
2033 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
2034 | 0 | LOG(INFO) << "Requesting TableExists"; |
2035 | 0 | auto ret = client->TableExists( |
2036 | 0 | client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); |
2037 | 0 | WARN_NOT_OK(ResultToStatus(ret), ""); |
2038 | 0 | return ret.ok() && ret.get() == true; |
2039 | 0 | }, MonoDelta::FromSeconds(20), "Verify Table was created in DocDB")); |
2040 | |
|
2041 | 0 | LOG(INFO) << "Restarting Master."; |
2042 | | |
2043 | | // Restart the master before the BG task can kick in and GC the failed transaction. |
2044 | 0 | auto master = cluster_->GetLeaderMaster(); |
2045 | 0 | master->Shutdown(); |
2046 | 0 | ASSERT_OK(master->Restart()); |
2047 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
2048 | 0 | auto s = cluster_->GetIsMasterLeaderServiceReady(master); |
2049 | 0 | return s.ok(); |
2050 | 0 | }, MonoDelta::FromSeconds(20), "Wait for Master to be ready.")); |
2051 | | |
2052 | | // Confirm that Catalog Loader deletes the namespace on master restart. |
2053 | 0 | client = ASSERT_RESULT(cluster_->CreateClient()); // Reinit the YBClient after restart. |
2054 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
2055 | 0 | auto ret = client->TableExists( |
2056 | 0 | client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name)); |
2057 | 0 | WARN_NOT_OK(ResultToStatus(ret), ""); |
2058 | 0 | return ret.ok() && ret.get() == false; |
2059 | 0 | }, MonoDelta::FromSeconds(20), "Verify Table was removed by Transaction GC")); |
2060 | 0 | } |
2061 | | |
2062 | | class PgLibPqIndexTableTimeoutTest : public PgLibPqTest { |
2063 | 0 | void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { |
2064 | 0 | options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=10"); |
2065 | 0 | } |
2066 | | }; |
2067 | | |
2068 | 0 | TEST_F(PgLibPqIndexTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestIndexTableTimeoutGC)) { |
2069 | 0 | const string kDatabaseName ="yugabyte"; |
2070 | 0 | NamespaceName test_name = "test_pgsql_table"; |
2071 | 0 | NamespaceName test_name_idx = test_name + "_idx"; |
2072 | |
|
2073 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
2074 | | |
2075 | | // Lower the delays so we successfully create this first table. |
2076 | 0 | ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10")); |
2077 | 0 | ASSERT_OK(cluster_->SetFlagOnMasters("TEST_simulate_slow_table_create_secs", "0")); |
2078 | | |
2079 | | // Create Table that Index will be set on. |
2080 | 0 | { |
2081 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2082 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE " + test_name + " (key INT PRIMARY KEY)")); |
2083 | 0 | } |
2084 | | |
2085 | | // After successfully creating the first table, set to flags similar to: PgLibPqTableTimeoutTest. |
2086 | 0 | ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "13000")); |
2087 | 0 | ASSERT_OK(cluster_->SetFlagOnMasters("TEST_simulate_slow_table_create_secs", "12")); |
2088 | | |
2089 | | // Create Index: will timeout because the admin setting is lower than the DB create latency. |
2090 | 0 | { |
2091 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2092 | 0 | ASSERT_NOK(conn.Execute("CREATE INDEX " + test_name_idx + " ON " + test_name + "(key)")); |
2093 | 0 | } |
2094 | | |
2095 | | // Wait for DocDB Table creation, even though it will fail in PG layer. |
2096 | | // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC. |
2097 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
2098 | 0 | LOG(INFO) << "Requesting TableExists"; |
2099 | 0 | auto ret = client->TableExists( |
2100 | 0 | client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name_idx)); |
2101 | 0 | WARN_NOT_OK(ResultToStatus(ret), ""); |
2102 | 0 | return ret.ok() && ret.get() == true; |
2103 | 0 | }, MonoDelta::FromSeconds(40), "Verify Index Table was created in DocDB")); |
2104 | | |
2105 | | // DocDB will notice the PG layer failure because the transaction aborts. |
2106 | | // Confirm that DocDB async deletes the namespace. |
2107 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
2108 | 0 | auto ret = client->TableExists( |
2109 | 0 | client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, test_name_idx)); |
2110 | 0 | WARN_NOT_OK(ResultToStatus(ret), ""); |
2111 | 0 | return ret.ok() && ret.get() == false; |
2112 | 0 | }, MonoDelta::FromSeconds(40), "Verify Index Table was removed by Transaction GC")); |
2113 | 0 | } |
2114 | | |
2115 | | class PgLibPqTestEnumType: public PgLibPqTest { |
2116 | | public: |
2117 | 0 | void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { |
2118 | 0 | options->extra_tserver_flags.push_back("--TEST_do_not_add_enum_sort_order=true"); |
2119 | 0 | } |
2120 | | }; |
2121 | | |
2122 | | // Make sure that enum type backfill works. |
2123 | | TEST_F_EX(PgLibPqTest, |
2124 | | YB_DISABLE_TEST_IN_TSAN(EnumType), |
2125 | 0 | PgLibPqTestEnumType) { |
2126 | 0 | const string kDatabaseName ="yugabyte"; |
2127 | 0 | const string kTableName ="enum_table"; |
2128 | 0 | const string kEnumTypeName ="enum_type"; |
2129 | 0 | auto conn = std::make_unique<PGConn>(ASSERT_RESULT(ConnectToDB(kDatabaseName))); |
2130 | 0 | ASSERT_OK(conn->ExecuteFormat( |
2131 | 0 | "CREATE TYPE $0 as enum('b', 'e', 'f', 'c', 'a', 'd')", kEnumTypeName)); |
2132 | 0 | ASSERT_OK(conn->ExecuteFormat( |
2133 | 0 | "CREATE TABLE $0 (id $1)", |
2134 | 0 | kTableName, |
2135 | 0 | kEnumTypeName)); |
2136 | 0 | ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('a')", kTableName)); |
2137 | 0 | ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('b')", kTableName)); |
2138 | 0 | ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('c')", kTableName)); |
2139 | | |
2140 | | // Do table scan to verify contents the table with an ORDER BY clause. This |
2141 | | // ensures that old enum values which did not have sort order can be read back, |
2142 | | // sorted and displayed correctly. |
2143 | 0 | const std::string query = Format("SELECT * FROM $0 ORDER BY id", kTableName); |
2144 | 0 | ASSERT_FALSE(ASSERT_RESULT(conn->HasIndexScan(query))); |
2145 | 0 | PGResultPtr res = ASSERT_RESULT(conn->Fetch(query)); |
2146 | 0 | ASSERT_EQ(PQntuples(res.get()), 3); |
2147 | 0 | ASSERT_EQ(PQnfields(res.get()), 1); |
2148 | 0 | std::vector<string> values = { |
2149 | 0 | ASSERT_RESULT(GetString(res.get(), 0, 0)), |
2150 | 0 | ASSERT_RESULT(GetString(res.get(), 1, 0)), |
2151 | 0 | ASSERT_RESULT(GetString(res.get(), 2, 0)), |
2152 | 0 | }; |
2153 | 0 | ASSERT_EQ(values[0], "b"); |
2154 | 0 | ASSERT_EQ(values[1], "c"); |
2155 | 0 | ASSERT_EQ(values[2], "a"); |
2156 | | |
2157 | | // Now alter the gflag so any new values will have sort order added. |
2158 | 0 | ASSERT_OK(cluster_->SetFlagOnTServers( |
2159 | 0 | "TEST_do_not_add_enum_sort_order", "false")); |
2160 | | |
2161 | | // Disconnect from the database so we don't have a case where the |
2162 | | // postmaster dies while clients are still connected. |
2163 | 0 | conn = nullptr; |
2164 | | |
2165 | | // For each tablet server, kill the corresponding PostgreSQL process. |
2166 | | // A new PostgreSQL process will be respawned by the tablet server and |
2167 | | // inherit the new --TEST_do_not_add_enum_sort_order flag from the tablet |
2168 | | // server. |
2169 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) { |
2170 | 0 | ExternalTabletServer* ts = cluster_->tablet_server(i); |
2171 | 0 | const string pg_pid_file = JoinPathSegments(ts->GetRootDir(), "pg_data", |
2172 | 0 | "postmaster.pid"); |
2173 | |
|
2174 | 0 | LOG(INFO) << "pg_pid_file: " << pg_pid_file; |
2175 | 0 | ASSERT_TRUE(Env::Default()->FileExists(pg_pid_file)); |
2176 | 0 | std::ifstream pg_pid_in; |
2177 | 0 | pg_pid_in.open(pg_pid_file, std::ios_base::in); |
2178 | 0 | ASSERT_FALSE(pg_pid_in.eof()); |
2179 | 0 | pid_t pg_pid = 0; |
2180 | 0 | pg_pid_in >> pg_pid; |
2181 | 0 | ASSERT_GT(pg_pid, 0); |
2182 | 0 | LOG(INFO) << "Killing PostgresSQL process: " << pg_pid; |
2183 | 0 | ASSERT_EQ(kill(pg_pid, SIGKILL), 0); |
2184 | 0 | } |
2185 | | |
2186 | | // Reconnect to the database after the new PostgreSQL starts. |
2187 | 0 | conn = std::make_unique<PGConn>(ASSERT_RESULT(ConnectToDB(kDatabaseName))); |
2188 | | |
2189 | | // Insert three more rows with --TEST_do_not_add_enum_sort_order=false. |
2190 | | // The new enum values will have sort order added. |
2191 | 0 | ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('d')", kTableName)); |
2192 | 0 | ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('e')", kTableName)); |
2193 | 0 | ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('f')", kTableName)); |
2194 | | |
2195 | | // Do table scan again to verify contents the table with an ORDER BY clause. |
2196 | | // This ensures that old enum values which did not have sort order, mixed |
2197 | | // with new enum values which have sort order, can be read back, sorted and |
2198 | | // displayed correctly. |
2199 | 0 | ASSERT_FALSE(ASSERT_RESULT(conn->HasIndexScan(query))); |
2200 | 0 | res = ASSERT_RESULT(conn->Fetch(query)); |
2201 | 0 | ASSERT_EQ(PQntuples(res.get()), 6); |
2202 | 0 | ASSERT_EQ(PQnfields(res.get()), 1); |
2203 | 0 | values = { |
2204 | 0 | ASSERT_RESULT(GetString(res.get(), 0, 0)), |
2205 | 0 | ASSERT_RESULT(GetString(res.get(), 1, 0)), |
2206 | 0 | ASSERT_RESULT(GetString(res.get(), 2, 0)), |
2207 | 0 | ASSERT_RESULT(GetString(res.get(), 3, 0)), |
2208 | 0 | ASSERT_RESULT(GetString(res.get(), 4, 0)), |
2209 | 0 | ASSERT_RESULT(GetString(res.get(), 5, 0)), |
2210 | 0 | }; |
2211 | 0 | ASSERT_EQ(values[0], "b"); |
2212 | 0 | ASSERT_EQ(values[1], "e"); |
2213 | 0 | ASSERT_EQ(values[2], "f"); |
2214 | 0 | ASSERT_EQ(values[3], "c"); |
2215 | 0 | ASSERT_EQ(values[4], "a"); |
2216 | 0 | ASSERT_EQ(values[5], "d"); |
2217 | | |
2218 | | // Create an index on the enum table column. |
2219 | 0 | ASSERT_OK(conn->ExecuteFormat("CREATE INDEX ON $0 (id ASC)", kTableName)); |
2220 | | |
2221 | | // Index only scan to verify contents of index table. |
2222 | 0 | ASSERT_TRUE(ASSERT_RESULT(conn->HasIndexScan(query))); |
2223 | 0 | res = ASSERT_RESULT(conn->Fetch(query)); |
2224 | 0 | ASSERT_EQ(PQntuples(res.get()), 6); |
2225 | 0 | ASSERT_EQ(PQnfields(res.get()), 1); |
2226 | 0 | values = { |
2227 | 0 | ASSERT_RESULT(GetString(res.get(), 0, 0)), |
2228 | 0 | ASSERT_RESULT(GetString(res.get(), 1, 0)), |
2229 | 0 | ASSERT_RESULT(GetString(res.get(), 2, 0)), |
2230 | 0 | ASSERT_RESULT(GetString(res.get(), 3, 0)), |
2231 | 0 | ASSERT_RESULT(GetString(res.get(), 4, 0)), |
2232 | 0 | ASSERT_RESULT(GetString(res.get(), 5, 0)), |
2233 | 0 | }; |
2234 | 0 | ASSERT_EQ(values[0], "b"); |
2235 | 0 | ASSERT_EQ(values[1], "e"); |
2236 | 0 | ASSERT_EQ(values[2], "f"); |
2237 | 0 | ASSERT_EQ(values[3], "c"); |
2238 | 0 | ASSERT_EQ(values[4], "a"); |
2239 | 0 | ASSERT_EQ(values[5], "d"); |
2240 | | |
2241 | | // Test where clause. |
2242 | 0 | const std::string query2 = Format("SELECT * FROM $0 where id = 'b'", kTableName); |
2243 | 0 | ASSERT_TRUE(ASSERT_RESULT(conn->HasIndexScan(query2))); |
2244 | 0 | res = ASSERT_RESULT(conn->Fetch(query2)); |
2245 | 0 | ASSERT_EQ(PQntuples(res.get()), 1); |
2246 | 0 | ASSERT_EQ(PQnfields(res.get()), 1); |
2247 | 0 | const string value = ASSERT_RESULT(GetString(res.get(), 0, 0)); |
2248 | 0 | ASSERT_EQ(value, "b"); |
2249 | 0 | } |
2250 | | |
2251 | | // Test postgres large oid (>= 2^31). Internally postgres oid is an unsigned 32-bit integer. But |
2252 | | // when extended to Datum type (unsigned long), the sign-bit is extended so that the high 32-bit |
2253 | | // is ffffffff. This caused unexpected assertion failures and errors. |
2254 | | class PgLibPqLargeOidTest: public PgLibPqTest { |
2255 | | public: |
2256 | 0 | void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { |
2257 | 0 | options->extra_tserver_flags.push_back( |
2258 | 0 | Format("--TEST_ysql_oid_prefetch_adjustment=$0", kOidAdjustment)); |
2259 | 0 | } |
2260 | | const Oid kOidAdjustment = 2147483648U - kPgFirstNormalObjectId; // 2^31 - 16384 |
2261 | | }; |
2262 | | TEST_F_EX(PgLibPqTest, |
2263 | | YB_DISABLE_TEST_IN_TSAN(LargeOid), |
2264 | 0 | PgLibPqLargeOidTest) { |
2265 | | // Test large OID with enum type which had Postgres Assert failure. |
2266 | 0 | const string kDatabaseName ="yugabyte"; |
2267 | 0 | const string kTableName ="enum_table"; |
2268 | 0 | const string kEnumTypeName ="enum_type"; |
2269 | 0 | PGConn conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); |
2270 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE TYPE $0 as enum('a', 'c')", kEnumTypeName)); |
2271 | | // Do ALTER TYPE to ensure we correctly put sort order as the high 32-bit after clearing |
2272 | | // the signed extended ffffffff. The following index scan would yield wrong order if we |
2273 | | // left ffffffff in the high 32-bit. |
2274 | 0 | ASSERT_OK(conn.ExecuteFormat("ALTER TYPE $0 ADD VALUE 'b' BEFORE 'c'", kEnumTypeName)); |
2275 | 0 | std::string query = "SELECT oid FROM pg_enum"; |
2276 | 0 | PGResultPtr res = ASSERT_RESULT(conn.Fetch(query)); |
2277 | 0 | ASSERT_EQ(PQntuples(res.get()), 3); |
2278 | 0 | ASSERT_EQ(PQnfields(res.get()), 1); |
2279 | 0 | std::vector<int32> enum_oids = { |
2280 | 0 | ASSERT_RESULT(GetInt32(res.get(), 0, 0)), |
2281 | 0 | ASSERT_RESULT(GetInt32(res.get(), 1, 0)), |
2282 | 0 | ASSERT_RESULT(GetInt32(res.get(), 2, 0)), |
2283 | 0 | }; |
2284 | | // Ensure that we do see large OIDs in pg_enum table. |
2285 | 0 | LOG(INFO) << "enum_oids: " << (Oid)enum_oids[0] << "," |
2286 | 0 | << (Oid)enum_oids[1] << "," << (Oid)enum_oids[2]; |
2287 | 0 | ASSERT_GT((Oid)enum_oids[0], kOidAdjustment); |
2288 | 0 | ASSERT_GT((Oid)enum_oids[1], kOidAdjustment); |
2289 | 0 | ASSERT_GT((Oid)enum_oids[2], kOidAdjustment); |
2290 | | |
2291 | | // Create a table using the enum type and insert a few rows. |
2292 | 0 | ASSERT_OK(conn.ExecuteFormat( |
2293 | 0 | "CREATE TABLE $0 (id $1)", |
2294 | 0 | kTableName, |
2295 | 0 | kEnumTypeName)); |
2296 | 0 | ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ('a'), ('b'), ('c')", kTableName)); |
2297 | | |
2298 | | // Create an index on the enum table column. |
2299 | 0 | ASSERT_OK(conn.ExecuteFormat("CREATE INDEX ON $0 (id ASC)", kTableName)); |
2300 | | |
2301 | | // Index only scan to verify that with large OIDs, the contents of index table |
2302 | | // is still correct. This also triggers index backfill statement, which used to |
2303 | | // fail on large oid such as: |
2304 | | // BACKFILL INDEX 2147500041 WITH x'0880011a00' READ TIME 6725053491126669312 PARTITION x'5555'; |
2305 | | // We fix the syntax error by rewriting it to |
2306 | | // BACKFILL INDEX -2147467255 WITH x'0880011a00' READ TIME 6725053491126669312 PARTITION x'5555'; |
2307 | | // Internally, -2147467255 will be reinterpreted as OID 2147500041 which is the OID of the index. |
2308 | 0 | query = Format("SELECT * FROM $0 ORDER BY id", kTableName); |
2309 | 0 | ASSERT_TRUE(ASSERT_RESULT(conn.HasIndexScan(query))); |
2310 | 0 | res = ASSERT_RESULT(conn.Fetch(query)); |
2311 | 0 | ASSERT_EQ(PQntuples(res.get()), 3); |
2312 | 0 | ASSERT_EQ(PQnfields(res.get()), 1); |
2313 | 0 | std::vector<string> enum_values = { |
2314 | 0 | ASSERT_RESULT(GetString(res.get(), 0, 0)), |
2315 | 0 | ASSERT_RESULT(GetString(res.get(), 1, 0)), |
2316 | 0 | ASSERT_RESULT(GetString(res.get(), 2, 0)), |
2317 | 0 | }; |
2318 | 0 | ASSERT_EQ(enum_values[0], "a"); |
2319 | 0 | ASSERT_EQ(enum_values[1], "b"); |
2320 | 0 | ASSERT_EQ(enum_values[2], "c"); |
2321 | 0 | } |
2322 | | |
2323 | | namespace { |
2324 | | |
// Runs a set of commands, one per dedicated thread, in lock step: every thread
// waits on a shared barrier before each round, so all commands start each
// iteration together. Execution repeats until Stop() is called or any command
// returns a non-OK status.
class CoordinatedRunner {
 public:
  // One unit of repeatable work; a non-OK result halts all runners.
  using RepeatableCommand = std::function<Status()>;

  // Spawns one thread per command. barrier_ is sized to the command count so a
  // round begins only once every live thread has arrived.
  explicit CoordinatedRunner(std::vector<RepeatableCommand> commands)
      : barrier_(commands.size()) {
    threads_.reserve(commands.size());
    for (auto& c : commands) {
      threads_.emplace_back([this, cmd = std::move(c)] () {
        // Loop until someone requests a stop or any thread has reported an
        // error (acquire pairs with the release stores below / in Stop()).
        while (!(stop_.load(std::memory_order_acquire) ||
            error_detected_.load(std::memory_order_acquire))) {
          barrier_.Wait();
          const auto status = cmd();
          if (!status.ok()) {
            LOG(ERROR) << "Error detected: " << status;
            error_detected_.store(true, std::memory_order_release);
          }
        }
        // Leave the barrier so the remaining threads are not blocked waiting
        // for this exited thread. NOTE(review): relies on Barrier::Detach
        // semantics from yb/util/barrier.h — confirm it unblocks waiters.
        barrier_.Detach();
      });
    }
  }

  // Signals all threads to finish their current round and joins them.
  void Stop() {
    stop_.store(true, std::memory_order_release);
    for (auto& thread : threads_) {
      thread.join();
    }
  }

  // Returns true if any command ever returned a non-OK status.
  bool HasError() {
    return error_detected_.load(std::memory_order_acquire);
  }

 private:
  std::vector<std::thread> threads_;   // one worker per command
  Barrier barrier_;                    // per-round rendezvous point
  std::atomic<bool> stop_{false};            // set by Stop()
  std::atomic<bool> error_detected_{false};  // set by any failing command
};
2365 | | |
2366 | 0 | bool RetryableError(const Status& status) { |
2367 | 0 | const auto msg = status.message().ToBuffer(); |
2368 | 0 | const std::string expected_errors[] = {"Try again", |
2369 | 0 | "Catalog Version Mismatch", |
2370 | 0 | "Restart read required at", |
2371 | 0 | "schema version mismatch for table"}; |
2372 | 0 | for (const auto& expected : expected_errors) { |
2373 | 0 | if (msg.find(expected) != std::string::npos) { |
2374 | 0 | return true; |
2375 | 0 | } |
2376 | 0 | } |
2377 | 0 | return false; |
2378 | 0 | } |
2379 | | |
2380 | | } // namespace |
2381 | | |
2382 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(PagingReadRestart)) { |
2383 | 0 | auto conn = ASSERT_RESULT(Connect()); |
2384 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)")); |
2385 | 0 | ASSERT_OK(conn.Execute("INSERT INTO t SELECT generate_series(1, 5000)")); |
2386 | 0 | const size_t reader_count = 20; |
2387 | 0 | std::vector<CoordinatedRunner::RepeatableCommand> commands; |
2388 | 0 | commands.reserve(reader_count + 1); |
2389 | 0 | commands.emplace_back( |
2390 | 0 | [connection = std::make_shared<PGConn>(ASSERT_RESULT(Connect()))] () -> Status { |
2391 | 0 | RETURN_NOT_OK(connection->Execute("ALTER TABLE t ADD COLUMN v INT DEFAULT 100")); |
2392 | 0 | RETURN_NOT_OK(connection->Execute("ALTER TABLE t DROP COLUMN v")); |
2393 | 0 | return Status::OK(); |
2394 | 0 | }); |
2395 | 0 | for (size_t i = 0; i < reader_count; ++i) { |
2396 | 0 | commands.emplace_back( |
2397 | 0 | [connection = std::make_shared<PGConn>(ASSERT_RESULT(Connect()))] () -> Status { |
2398 | 0 | const auto res = connection->Fetch("SELECT key FROM t"); |
2399 | 0 | return (res.ok() || RetryableError(res.status())) ? Status::OK() : res.status(); |
2400 | 0 | }); |
2401 | 0 | } |
2402 | 0 | CoordinatedRunner runner(std::move(commands)); |
2403 | 0 | std::this_thread::sleep_for(10s); |
2404 | 0 | runner.Stop(); |
2405 | 0 | ASSERT_FALSE(runner.HasError()); |
2406 | 0 | } |
2407 | | |
2408 | 0 | TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CollationRangePresplit)) { |
2409 | 0 | const string kDatabaseName ="yugabyte"; |
2410 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
2411 | |
|
2412 | 0 | auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); |
2413 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE collrange(a text COLLATE \"en-US-x-icu\", " |
2414 | 0 | "PRIMARY KEY(a ASC)) SPLIT AT VALUES (('100'), ('200'))")); |
2415 | |
|
2416 | 0 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets; |
2417 | 0 | auto table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "collrange")); |
2418 | | |
2419 | | // Validate that number of tablets created is 3. |
2420 | 0 | ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets)); |
2421 | 0 | ASSERT_EQ(tablets.size(), 3); |
2422 | | // Partition key length of plain encoded '100' or '200'. |
2423 | 0 | const size_t partition_key_length = 7; |
2424 | | // When a text value is collation encoded, we need at least 3 extra bytes. |
2425 | 0 | const size_t min_collation_extra_bytes = 3; |
2426 | 0 | for (const auto& tablet : tablets) { |
2427 | 0 | ASSERT_TRUE(tablet.has_partition()); |
2428 | 0 | auto partition_start = tablet.partition().partition_key_start(); |
2429 | 0 | auto partition_end = tablet.partition().partition_key_end(); |
2430 | 0 | LOG(INFO) << "partition_start: " << b2a_hex(partition_start) |
2431 | 0 | << ", partition_end: " << b2a_hex(partition_end); |
2432 | 0 | ASSERT_TRUE(partition_start.empty() || |
2433 | 0 | partition_start.size() >= partition_key_length + min_collation_extra_bytes); |
2434 | 0 | ASSERT_TRUE(partition_end.empty() || |
2435 | 0 | partition_end.size() >= partition_key_length + min_collation_extra_bytes); |
2436 | 0 | } |
2437 | 0 | } |
2438 | | |
2439 | | } // namespace pgwrapper |
2440 | | } // namespace yb |