/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_on_conflict-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/common/common.pb.h" |
14 | | |
15 | | #include "yb/util/random_util.h" |
16 | | #include "yb/util/scope_exit.h" |
17 | | #include "yb/util/test_thread_holder.h" |
18 | | #include "yb/util/tsan_util.h" |
19 | | |
20 | | #include "yb/yql/pgwrapper/libpq_test_base.h" |
21 | | #include "yb/yql/pgwrapper/libpq_utils.h" |
22 | | |
23 | | using namespace std::literals; |
24 | | |
25 | | DECLARE_int64(external_mini_cluster_max_log_bytes); |
26 | | |
27 | | namespace yb { |
28 | | namespace pgwrapper { |
29 | | |
30 | | class PgOnConflictTest : public LibPqTestBase { |
31 | | protected: |
32 | | void TestOnConflict(bool kill_master, const MonoDelta& duration); |
33 | | }; |
34 | | |
35 | | namespace { |
36 | | |
// State of one key that writers are currently appending characters to.
struct OnConflictKey {
  // Value of the key column. Explicitly zero-initialized so that a default-initialized instance
  // never carries an indeterminate value.
  int key = 0;
  // Index of the next character to append for this key.
  size_t operation_index = 0;
};
41 | | |
// Upper bound on the number of statements in one transaction; also the capacity of
// TransactionInfo::Batches.
constexpr int kMaxBatchSize{5};
43 | | |
44 | | struct BatchInfo { |
45 | | int key; |
46 | | char append_char; // Zero means read request |
47 | | std::string read_value; |
48 | | |
49 | 0 | std::string ToString() const { |
50 | 0 | if (append_char) { |
51 | 0 | char x[2] = {append_char, 0}; |
52 | 0 | return Format("[$0+$1]", key, x); |
53 | 0 | } else { |
54 | 0 | return Format("[$0 $1]", key, read_value); |
55 | 0 | } |
56 | 0 | } |
57 | | |
58 | 0 | bool ComesBefore(const BatchInfo& rhs) const { |
59 | 0 | if (key != rhs.key) { |
60 | 0 | return false; |
61 | 0 | } |
62 | 0 | if (append_char) { |
63 | 0 | if (rhs.append_char) { |
64 | 0 | return false; |
65 | 0 | } |
66 | | // rhs see our append |
67 | 0 | return rhs.read_value.find(append_char) != std::string::npos; |
68 | 0 | } else if (!rhs.append_char) { |
69 | | // rhs has larger list |
70 | 0 | return read_value.length() < rhs.read_value.length(); |
71 | 0 | } else { |
72 | | // we don't see the result of rhs |
73 | 0 | return read_value.find(rhs.append_char) == std::string::npos; |
74 | 0 | } |
75 | 0 | } |
76 | | }; |
77 | | |
78 | | struct TransactionInfo { |
79 | | typedef std::array<BatchInfo, kMaxBatchSize> Batches; |
80 | | typedef Batches::const_iterator const_iterator; |
81 | | |
82 | | int batch_size = 0; |
83 | | Batches batches; |
84 | | int last_visit = 0; // Used to check whether this vertex was visited by current DFS run. |
85 | | |
86 | 0 | const_iterator begin() const { |
87 | 0 | return batches.begin(); |
88 | 0 | } |
89 | | |
90 | 0 | const_iterator end() const { |
91 | 0 | return batches.begin() + batch_size; |
92 | 0 | } |
93 | | |
94 | 0 | bool ComesBefore(const TransactionInfo& rhs) const { |
95 | 0 | for (const auto& lbatch : *this) { |
96 | 0 | for (const auto& rbatch : rhs) { |
97 | 0 | if (lbatch.ComesBefore(rbatch)) { |
98 | 0 | return true; |
99 | 0 | } |
100 | 0 | } |
101 | 0 | } |
102 | 0 | return false; |
103 | 0 | } |
104 | | }; |
105 | | |
106 | | class OnConflictHelper { |
107 | | public: |
108 | | explicit OnConflictHelper(size_t concurrent_keys) |
109 | 0 | : concurrent_keys_(concurrent_keys), active_keys_(concurrent_keys) { |
110 | 0 | for(size_t i = 0; i != concurrent_keys; ++i) { |
111 | 0 | active_keys_[i].key = ++next_key_; |
112 | 0 | } |
113 | 0 | for (auto i = 'A'; i <= 'Z'; ++i) { |
114 | 0 | chars_.push_back(i); |
115 | 0 | } |
116 | 0 | } |
117 | | |
118 | 0 | std::pair<int, char> RandomPair() { |
119 | 0 | size_t i = RandomUniformInt<size_t>(0, concurrent_keys_ - 1); |
120 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
121 | 0 | auto& key = active_keys_[i]; |
122 | 0 | char append_char; |
123 | 0 | if (RandomUniformBool()) { |
124 | 0 | append_char = 0; // Read key |
125 | 0 | } else { |
126 | 0 | append_char = chars_[key.operation_index]; |
127 | 0 | if (++key.operation_index == chars_.size()) { |
128 | 0 | key.key = ++next_key_; |
129 | 0 | key.operation_index = 0; |
130 | 0 | } |
131 | 0 | } |
132 | 0 | return std::make_pair(key.key, append_char); |
133 | 0 | } |
134 | | |
135 | 0 | void Committed(TransactionInfo&& info) { |
136 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
137 | 0 | committed_.push_back(std::move(info)); |
138 | 0 | } |
139 | | |
140 | 0 | void Report() { |
141 | 0 | LOG(INFO) << "Committed transactions:"; |
142 | |
|
143 | 0 | ordered_.reserve(committed_.size()); |
144 | | // Iteration order does not matter here, so we iterate from end to have lower keys at the start |
145 | | // of the list. |
146 | 0 | for (auto it = committed_.rbegin(); it != committed_.rend(); ++it) { |
147 | 0 | if (it->last_visit == 0) { |
148 | 0 | DepthFirstSearch(&*it, nullptr /* dest */); |
149 | 0 | } |
150 | 0 | } |
151 | |
|
152 | 0 | std::reverse(ordered_.begin(), ordered_.end()); |
153 | |
|
154 | 0 | for (const auto* info : ordered_) { |
155 | 0 | LOG(INFO) << " " << yb::ToString(*info); |
156 | 0 | } |
157 | |
|
158 | 0 | int inversions = 0; |
159 | 0 | for (auto it = ordered_.begin(); it != ordered_.end(); ++it) { |
160 | 0 | for (auto j = ordered_.begin(); j != it; ++j) { |
161 | 0 | if ((**it).ComesBefore(**j)) { |
162 | 0 | LOG(INFO) << "Order inversion: " << yb::ToString(**it) << " and " << yb::ToString(**j); |
163 | 0 | ++inversions; |
164 | 0 | ++query_; |
165 | 0 | DepthFirstSearch(*j, *it); |
166 | 0 | } |
167 | 0 | } |
168 | 0 | } |
169 | |
|
170 | 0 | ASSERT_EQ(inversions, 0); |
171 | 0 | } |
172 | | |
173 | | private: |
174 | | // Returns true if dest was reached. |
175 | 0 | bool DepthFirstSearch(TransactionInfo* v, TransactionInfo* dest) { |
176 | 0 | v->last_visit = query_; |
177 | 0 | if (v == dest) { |
178 | 0 | LOG(INFO) << " " << yb::ToString(*v); |
179 | 0 | return true; |
180 | 0 | } |
181 | 0 | for (auto& target : committed_) { |
182 | 0 | if (target.last_visit < query_ && v->ComesBefore(target)) { |
183 | 0 | if (DepthFirstSearch(&target, dest)) { |
184 | 0 | LOG(INFO) << " " << yb::ToString(*v); |
185 | 0 | return true; |
186 | 0 | } |
187 | 0 | } |
188 | 0 | } |
189 | 0 | if (!dest) { |
190 | 0 | ordered_.push_back(v); |
191 | 0 | } |
192 | 0 | return false; |
193 | 0 | } |
194 | | |
195 | | const size_t concurrent_keys_; |
196 | | std::string chars_; |
197 | | |
198 | | std::mutex mutex_; |
199 | | int next_key_ = 0; |
200 | | std::vector<OnConflictKey> active_keys_; |
201 | | std::vector<TransactionInfo> committed_; |
202 | | std::vector<TransactionInfo*> ordered_; |
203 | | // Number of depth-first search run, used to filter visited vertexes. |
204 | | int query_ = 1; |
205 | | }; |
206 | | |
207 | | } // anonymous namespace |
208 | | |
209 | | // Check that INSERT .. ON CONFLICT .. does not generate duplicate key errors. |
210 | 0 | void PgOnConflictTest::TestOnConflict(bool kill_master, const MonoDelta& duration) { |
211 | 0 | #ifndef NDEBUG |
212 | 0 | constexpr int kWriters = RegularBuildVsSanitizers(15, 5); |
213 | | #else |
214 | | constexpr int kWriters = 25; |
215 | | #endif |
216 | 0 | auto conn = ASSERT_RESULT(Connect()); |
217 | |
|
218 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE test (k int PRIMARY KEY, v TEXT)")); |
219 | |
|
220 | 0 | std::atomic<int> processed(0); |
221 | 0 | TestThreadHolder thread_holder; |
222 | 0 | OnConflictHelper helper(3); |
223 | 0 | for (int i = 0; i != kWriters; ++i) { |
224 | 0 | thread_holder.AddThreadFunctor( |
225 | 0 | [this, &stop = thread_holder.stop_flag(), &processed, &helper] { |
226 | 0 | SetFlagOnExit set_flag_on_exit(&stop); |
227 | 0 | auto connection = ASSERT_RESULT(Connect()); |
228 | 0 | char value[2] = "0"; |
229 | 0 | while (!stop.load(std::memory_order_acquire)) { |
230 | 0 | int batch_size = RandomUniformInt(2, kMaxBatchSize); |
231 | 0 | TransactionInfo transaction_info; |
232 | 0 | transaction_info.batch_size = batch_size; |
233 | 0 | bool ok = false; |
234 | 0 | if (batch_size != 1) { |
235 | 0 | ASSERT_OK(connection.Execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")); |
236 | 0 | } |
237 | 0 | auto se = ScopeExit([&connection, batch_size, &ok, &processed, &helper, &transaction_info] { |
238 | 0 | if (batch_size != 1) { |
239 | 0 | if (ok) { |
240 | 0 | auto status = connection.Execute("COMMIT"); |
241 | 0 | if (status.ok()) { |
242 | 0 | ++processed; |
243 | 0 | helper.Committed(std::move(transaction_info)); |
244 | 0 | return; |
245 | 0 | } |
246 | 0 | auto msg = status.message().ToBuffer(); |
247 | 0 | if (msg.find("expired or aborted by a conflict") == std::string::npos && |
248 | 0 | msg.find("Transaction aborted") == std::string::npos) { |
249 | 0 | ASSERT_OK(status); |
250 | 0 | } |
251 | 0 | } |
252 | 0 | ASSERT_OK(connection.Execute("ROLLBACK")); |
253 | 0 | } else if (ok) { |
254 | | // To re-enable this we need to decrease the lower bound of batch_size to 1. |
255 | 0 | ++processed; |
256 | 0 | } |
257 | 0 | }); |
258 | 0 | ok = true; |
259 | 0 | for (int j = 0; j != batch_size; ++j) { |
260 | 0 | auto key_and_appended_char = helper.RandomPair(); |
261 | 0 | Status status; |
262 | 0 | auto& current_batch = transaction_info.batches[j]; |
263 | 0 | current_batch.key = key_and_appended_char.first; |
264 | 0 | current_batch.append_char = key_and_appended_char.second; |
265 | 0 | if (key_and_appended_char.second) { |
266 | 0 | value[0] = key_and_appended_char.second; |
267 | 0 | status = connection.ExecuteFormat( |
268 | 0 | "INSERT INTO test (k, v) VALUES ($0, '$1') ON CONFLICT (K) DO " |
269 | 0 | "UPDATE SET v = CONCAT(test.v, '$1')", |
270 | 0 | key_and_appended_char.first, value); |
271 | 0 | } else { |
272 | 0 | auto result = connection.FetchFormat( |
273 | 0 | "SELECT v FROM test WHERE k = $0", key_and_appended_char.first); |
274 | 0 | if (!result.ok()) { |
275 | 0 | status = result.status(); |
276 | 0 | } else { |
277 | 0 | auto tuples = PQntuples(result->get()); |
278 | 0 | if (tuples == 1) { |
279 | 0 | ASSERT_EQ(PQnfields(result->get()), 1); |
280 | 0 | current_batch.read_value = ASSERT_RESULT( |
281 | 0 | GetString(result->get(), 0, 0)); |
282 | 0 | } else { |
283 | 0 | ASSERT_EQ(tuples, 0); |
284 | 0 | } |
285 | 0 | } |
286 | 0 | } |
287 | 0 | if (status.ok()) { |
288 | 0 | continue; |
289 | 0 | } |
290 | 0 | ok = false; |
291 | 0 | if (TransactionalFailure(status)) { |
292 | 0 | break; |
293 | 0 | } |
294 | 0 | auto msg = status.message().ToBuffer(); |
295 | 0 | if (msg.find("Snapshot too old: Snapshot too old.") != std::string::npos || |
296 | 0 | msg.find("Commit of expired transaction") != std::string::npos || |
297 | 0 | msg.find("Catalog Version Mismatch") != std::string::npos || |
298 | 0 | msg.find("Soft memory limit exceeded") != std::string::npos || |
299 | 0 | msg.find("Missing metadata for transaction") != std::string::npos || |
300 | 0 | msg.find("timed out after deadline expired") != std::string::npos) { |
301 | 0 | break; |
302 | 0 | } |
303 | | |
304 | 0 | ASSERT_OK(status); |
305 | 0 | } |
306 | 0 | } |
307 | 0 | }); |
308 | 0 | } |
309 | |
|
310 | 0 | if (!kill_master) { |
311 | 0 | thread_holder.WaitAndStop(duration.ToSteadyDuration()); |
312 | 0 | } else { |
313 | | // Every 15 seconds, pick a random master, then kill it if it is running, otherwise resume it. |
314 | 0 | auto deadline = CoarseMonoClock::now() + duration; |
315 | 0 | auto num_masters = cluster_->num_masters(); |
316 | 0 | while (!thread_holder.stop_flag().load(std::memory_order_acquire)) { |
317 | 0 | MonoDelta left(deadline - CoarseMonoClock::now()); |
318 | 0 | if (left < MonoDelta::kZero) { |
319 | 0 | break; |
320 | 0 | } |
321 | 0 | auto* master = cluster_->master(RandomUniformInt<size_t>(0, num_masters - 1)); |
322 | 0 | if (master->IsProcessAlive()) { |
323 | 0 | std::this_thread::sleep_for( |
324 | 0 | std::min(left, MonoDelta(20s) * kTimeMultiplier).ToSteadyDuration()); |
325 | 0 | LOG(INFO) << "Killing: " << master->uuid(); |
326 | 0 | master->Shutdown(); |
327 | 0 | } else { |
328 | 0 | std::this_thread::sleep_for( |
329 | 0 | std::min(left, MonoDelta(15s)).ToSteadyDuration()); |
330 | 0 | LOG(INFO) << "Resuming: " << master->uuid(); |
331 | 0 | ASSERT_OK(master->Start()); |
332 | 0 | } |
333 | 0 | int live_masters = 0; |
334 | 0 | for (size_t i = 0; i != num_masters; ++i) { |
335 | 0 | if (cluster_->master(i)->IsProcessAlive()) { |
336 | 0 | ++live_masters; |
337 | 0 | } |
338 | 0 | } |
339 | 0 | LOG(INFO) << "Live masters: " << live_masters; |
340 | 0 | } |
341 | |
|
342 | 0 | for (size_t i = 0; i != num_masters; ++i) { |
343 | 0 | if (!cluster_->master(i)->IsProcessAlive()) { |
344 | 0 | ASSERT_OK(cluster_->master(i)->Start()); |
345 | 0 | } |
346 | 0 | } |
347 | |
|
348 | 0 | thread_holder.Stop(); |
349 | 0 | } |
350 | |
|
351 | 0 | for (;;) { |
352 | 0 | auto res = conn.Fetch("SELECT * FROM test ORDER BY k"); |
353 | 0 | if (!res.ok()) { |
354 | 0 | ASSERT_TRUE(TransactionalFailure(res.status())) << res.status(); |
355 | 0 | continue; |
356 | 0 | } |
357 | 0 | int cols = PQnfields(res->get()); |
358 | 0 | ASSERT_EQ(cols, 2); |
359 | 0 | int rows = PQntuples(res->get()); |
360 | 0 | for (int i = 0; i != rows; ++i) { |
361 | 0 | auto key = GetInt32(res->get(), i, 0); |
362 | 0 | auto value = GetString(res->get(), i, 1); |
363 | 0 | LOG(INFO) << " " << key << ": " << value; |
364 | 0 | } |
365 | 0 | LOG(INFO) << "Total processed: " << processed.load(std::memory_order_acquire); |
366 | 0 | break; |
367 | 0 | } |
368 | |
|
369 | 0 | helper.Report(); |
370 | 0 | } |
371 | | |
372 | 0 | TEST_F(PgOnConflictTest, YB_DISABLE_TEST_IN_TSAN(OnConflict)) { |
373 | 0 | TestOnConflict(false /* kill_master */, 120s); |
374 | 0 | } |
375 | | |
376 | 0 | TEST_F(PgOnConflictTest, YB_DISABLE_TEST_IN_TSAN(OnConflictWithKillMaster)) { |
377 | 0 | TestOnConflict(true /* kill_master */, 180s); |
378 | 0 | } |
379 | | |
380 | | // When auto-commit fails, the transaction block state is switched to TBLOCK_ABORT, |
381 | | // but the correct state in this case is TBLOCK_DEFAULT. |
382 | | // https://github.com/YugaByte/yugabyte-db/commit/73e966e5735efc21bf2ad43f9d961a488afbe050 |
383 | 0 | TEST_F(PgOnConflictTest, YB_DISABLE_TEST_IN_TSAN(NoTxnOnConflict)) { |
384 | 0 | constexpr int kWriters = 5; |
385 | 0 | constexpr int kKeys = 20; |
386 | 0 | auto conn = ASSERT_RESULT(Connect()); |
387 | |
|
388 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE test (k int PRIMARY KEY, v TEXT)")); |
389 | |
|
390 | 0 | TestThreadHolder thread_holder; |
391 | 0 | for (int i = 0; i != kWriters; ++i) { |
392 | 0 | thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] { |
393 | 0 | SetFlagOnExit set_flag_on_exit(&stop); |
394 | 0 | auto connection = ASSERT_RESULT(Connect()); |
395 | 0 | char value[2] = "0"; |
396 | 0 | while (!stop.load(std::memory_order_acquire)) { |
397 | 0 | int key = RandomUniformInt(1, kKeys); |
398 | 0 | value[0] = RandomUniformInt('A', 'Z'); |
399 | 0 | auto status = connection.ExecuteFormat( |
400 | 0 | "INSERT INTO test (k, v) VALUES ($0, '$1') ON CONFLICT (K) DO " |
401 | 0 | "UPDATE SET v = CONCAT(test.v, '$1')", |
402 | 0 | key, value); |
403 | 0 | if (status.ok() || TransactionalFailure(status)) { |
404 | 0 | continue; |
405 | 0 | } |
406 | 0 | ASSERT_OK(status); |
407 | 0 | } |
408 | 0 | }); |
409 | 0 | } |
410 | |
|
411 | 0 | thread_holder.WaitAndStop(30s); |
412 | 0 | LogResult(ASSERT_RESULT(conn.Fetch("SELECT * FROM test ORDER BY k")).get()); |
413 | 0 | } |
414 | | |
415 | 0 | TEST_F(PgOnConflictTest, YB_DISABLE_TEST_IN_TSAN(ValidSessionAfterTxnCommitConflict)) { |
416 | 0 | auto conn = ASSERT_RESULT(Connect()); |
417 | 0 | ASSERT_OK(conn.Execute("CREATE TABLE test (k int PRIMARY KEY)")); |
418 | 0 | ASSERT_OK(conn.Execute("BEGIN")); |
419 | 0 | ASSERT_OK(conn.Execute("INSERT INTO test VALUES(1)")); |
420 | 0 | auto extra_conn = ASSERT_RESULT(Connect()); |
421 | 0 | ASSERT_OK(extra_conn.Execute("INSERT INTO test VALUES(1)")); |
422 | 0 | ASSERT_NOK(conn.Execute("COMMIT")); |
423 | | // Check connection is in valid state after failed COMMIT |
424 | 0 | auto result_ptr = ASSERT_RESULT(conn.Fetch("SELECT * FROM test")); |
425 | 0 | auto value = ASSERT_RESULT(GetInt32(result_ptr.get(), 0, 0)); |
426 | 0 | ASSERT_EQ(value, 1); |
427 | 0 | } |
428 | | |
429 | | } // namespace pgwrapper |
430 | | } // namespace yb |