/Users/deen/code/yugabyte-db/src/yb/client/serializable-txn-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/error.h" |
15 | | #include "yb/client/session.h" |
16 | | #include "yb/client/transaction.h" |
17 | | #include "yb/client/txn-test-base.h" |
18 | | #include "yb/client/yb_op.h" |
19 | | |
20 | | #include "yb/gutil/casts.h" |
21 | | |
22 | | #include "yb/util/async_util.h" |
23 | | #include "yb/util/random_util.h" |
24 | | #include "yb/util/thread.h" |
25 | | #include "yb/util/tsan_util.h" |
26 | | |
27 | | using namespace std::literals; |
28 | | |
29 | | DECLARE_int64(transaction_rpc_timeout_ms); |
30 | | DECLARE_int32(txn_max_apply_batch_records); |
31 | | |
32 | | namespace yb { |
33 | | namespace client { |
34 | | |
35 | | class SerializableTxnTest |
36 | | : public TransactionCustomLogSegmentSizeTest<0, TransactionTestBase<MiniCluster>> { |
37 | | protected: |
38 | 6 | void SetUp() override { |
39 | 6 | SetIsolationLevel(IsolationLevel::SERIALIZABLE_ISOLATION); |
40 | 6 | TransactionTestBase::SetUp(); |
41 | 6 | } |
42 | | |
43 | | void TestIncrements(bool transactional); |
44 | | void TestIncrement(int key, bool transactional); |
45 | | void TestColoring(); |
46 | | }; |
47 | | |
48 | 0 | TEST_F(SerializableTxnTest, NonConflictingWrites) { |
49 | 0 | const auto kTransactions = 10; |
50 | 0 | const auto kKey = 0; |
51 | |
|
52 | 0 | struct Entry { |
53 | 0 | YBTransactionPtr txn; |
54 | 0 | YBqlWriteOpPtr op; |
55 | 0 | std::future<FlushStatus> flush_future; |
56 | 0 | std::future<Status> commit_future; |
57 | 0 | bool done = false; |
58 | 0 | }; |
59 | |
|
60 | 0 | std::vector<Entry> entries; |
61 | 0 | for (int i = 0; i != kTransactions; ++i) { |
62 | 0 | entries.emplace_back(); |
63 | 0 | auto& entry = entries.back(); |
64 | 0 | entry.txn = CreateTransaction(); |
65 | 0 | auto session = CreateSession(entry.txn); |
66 | 0 | entry.op = ASSERT_RESULT(WriteRow(session, kKey, i)); |
67 | 0 | entry.flush_future = session->FlushFuture(); |
68 | 0 | } |
69 | |
|
70 | 0 | ASSERT_OK(WaitFor([&entries]() -> Result<bool> { |
71 | 0 | for (auto& entry : entries) { |
72 | 0 | if (entry.flush_future.valid() && IsReady(entry.flush_future)) { |
73 | 0 | LOG(INFO) << "Flush done"; |
74 | 0 | RETURN_NOT_OK(entry.flush_future.get().status); |
75 | 0 | entry.commit_future = entry.txn->CommitFuture(); |
76 | 0 | } |
77 | 0 | } |
78 | |
|
79 | 0 | for (auto& entry : entries) { |
80 | 0 | if (entry.commit_future.valid() && IsReady(entry.commit_future)) { |
81 | 0 | LOG(INFO) << "Commit done"; |
82 | 0 | RETURN_NOT_OK(entry.commit_future.get()); |
83 | 0 | entry.done = true; |
84 | 0 | } |
85 | 0 | } |
86 | |
|
87 | 0 | for (const auto& entry : entries) { |
88 | 0 | if (!entry.done) { |
89 | 0 | return false; |
90 | 0 | } |
91 | 0 | } |
92 | |
|
93 | 0 | return true; |
94 | 0 | }, 10s, "Complete all operations")); |
95 | |
|
96 | 0 | for (const auto& entry : entries) { |
97 | 0 | ASSERT_EQ(entry.op->response().status(), QLResponsePB::YQL_STATUS_OK); |
98 | 0 | } |
99 | 0 | } |
100 | | |
101 | 0 | TEST_F(SerializableTxnTest, ReadWriteConflict) { |
102 | 0 | const auto kKeys = 20; |
103 | |
|
104 | 0 | size_t reads_won = 0, writes_won = 0; |
105 | 0 | for (int i = 0; i != kKeys; ++i) { |
106 | 0 | auto read_txn = CreateTransaction(); |
107 | 0 | auto read_session = CreateSession(read_txn); |
108 | 0 | auto read = ReadRow(read_session, i); |
109 | 0 | ASSERT_OK(read_session->Flush()); |
110 | |
|
111 | 0 | auto write_txn = CreateTransaction(); |
112 | 0 | auto write_session = CreateSession(write_txn); |
113 | 0 | auto write_status = ResultToStatus(WriteRow( |
114 | 0 | write_session, i, i, WriteOpType::INSERT, Flush::kTrue)); |
115 | |
|
116 | 0 | auto read_commit_future = read_txn->CommitFuture(); |
117 | 0 | if (write_status.ok()) { |
118 | 0 | write_status = write_txn->CommitFuture().get(); |
119 | 0 | } |
120 | 0 | auto read_status = read_commit_future.get(); |
121 | |
|
122 | 0 | LOG(INFO) << "Read: " << read_status << ", write: " << write_status; |
123 | |
|
124 | 0 | if (!read_status.ok()) { |
125 | 0 | ASSERT_OK(write_status); |
126 | 0 | ++writes_won; |
127 | 0 | } else { |
128 | 0 | ASSERT_NOK(write_status); |
129 | 0 | ++reads_won; |
130 | 0 | } |
131 | 0 | } |
132 | |
|
133 | 0 | LOG(INFO) << "Reads won: " << reads_won << ", writes won: " << writes_won; |
134 | 0 | ASSERT_GE(reads_won, kKeys / 4); |
135 | 0 | ASSERT_GE(writes_won, kKeys / 4); |
136 | 0 | } |
137 | | |
138 | | // Execute UPDATE table SET value = value + 1 WHERE key = kKey in parallel, using |
139 | | // serializable isolation. |
140 | | // With retries the resulting value should be equal to number of increments. |
141 | 0 | void SerializableTxnTest::TestIncrement(int key, bool transactional) { |
142 | 0 | const auto kIncrements = RegularBuildVsSanitizers(100, 20); |
143 | |
|
144 | 0 | { |
145 | 0 | auto session = CreateSession(); |
146 | 0 | auto op = ASSERT_RESULT(WriteRow(session, key, 0)); |
147 | 0 | ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK); |
148 | 0 | } |
149 | |
|
150 | 0 | struct Entry { |
151 | 0 | YBqlWriteOpPtr op; |
152 | 0 | YBTransactionPtr txn; |
153 | 0 | YBSessionPtr session; |
154 | 0 | std::shared_future<FlushStatus> write_future; |
155 | 0 | std::shared_future<Status> commit_future; |
156 | 0 | }; |
157 | |
|
158 | 0 | std::vector<Entry> entries; |
159 | |
|
160 | 0 | for (int i = 0; i != kIncrements; ++i) { |
161 | 0 | Entry entry; |
162 | 0 | entry.txn = transactional ? CreateTransaction() : nullptr; |
163 | 0 | entry.session = CreateSession(entry.txn, clock_); |
164 | 0 | entry.session->SetReadPoint(Restart::kFalse); |
165 | 0 | entries.push_back(entry); |
166 | 0 | } |
167 | | |
168 | | // For each of entries we do the following: |
169 | | // 1) Write increment operation. |
170 | | // 2) Wait until write complete and commit transaction of this entry. |
171 | | // 3) Wait until commit complete. |
172 | | // When failure happens on any step - retry from step 1. |
173 | | // Exit from loop when all entries successfully committed their transactions. |
174 | | // We do all actions in busy loop to get most possible concurrency for operations. |
175 | 0 | for (;;) { |
176 | 0 | bool incomplete = false; |
177 | 0 | for (auto& entry : entries) { |
178 | 0 | bool entry_complete = false; |
179 | 0 | if (!entry.op) { |
180 | | // Execute UPDATE table SET value = value + 1 WHERE key = kKey |
181 | 0 | entry.session->SetTransaction(entry.txn); |
182 | 0 | entry.op = ASSERT_RESULT(kv_table_test::Increment(&table_, entry.session, key)); |
183 | 0 | entry.write_future = entry.session->FlushFuture(); |
184 | 0 | } else if (entry.write_future.valid()) { |
185 | 0 | if (IsReady(entry.write_future)) { |
186 | 0 | auto write_status = entry.write_future.get().status; |
187 | 0 | entry.write_future = std::shared_future<FlushStatus>(); |
188 | 0 | if (!write_status.ok()) { |
189 | 0 | ASSERT_TRUE(write_status.IsTryAgain() || |
190 | 0 | ((write_status.IsTimedOut() || write_status.IsServiceUnavailable()) |
191 | 0 | && transactional)) << write_status; |
192 | 0 | entry.txn = transactional ? CreateTransaction() : nullptr; |
193 | 0 | entry.op = nullptr; |
194 | 0 | } else { |
195 | 0 | if (entry.op->response().status() == QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) { |
196 | 0 | auto old_txn = entry.txn; |
197 | 0 | if (transactional) { |
198 | 0 | entry.txn = ASSERT_RESULT(entry.txn->CreateRestartedTransaction()); |
199 | 0 | } else { |
200 | 0 | entry.session->SetReadPoint(Restart::kTrue); |
201 | 0 | } |
202 | 0 | entry.op = nullptr; |
203 | 0 | } else { |
204 | 0 | ASSERT_EQ(entry.op->response().status(), QLResponsePB::YQL_STATUS_OK); |
205 | 0 | if (transactional) { |
206 | 0 | entry.commit_future = entry.txn->CommitFuture(); |
207 | 0 | } |
208 | 0 | } |
209 | 0 | } |
210 | 0 | } |
211 | 0 | } else if (entry.commit_future.valid()) { |
212 | 0 | if (IsReady(entry.commit_future)) { |
213 | 0 | auto status = entry.commit_future.get(); |
214 | 0 | if (status.IsExpired()) { |
215 | 0 | entry.txn = transactional ? CreateTransaction() : nullptr; |
216 | 0 | entry.op = nullptr; |
217 | 0 | } else { |
218 | 0 | ASSERT_OK(status); |
219 | 0 | entry.commit_future = std::shared_future<Status>(); |
220 | 0 | } |
221 | 0 | } |
222 | 0 | } else { |
223 | 0 | entry_complete = true; |
224 | 0 | } |
225 | 0 | incomplete = incomplete || !entry_complete; |
226 | 0 | } |
227 | 0 | if (!incomplete) { |
228 | 0 | break; |
229 | 0 | } |
230 | 0 | } |
231 | |
|
232 | 0 | auto value = ASSERT_RESULT(SelectRow(CreateSession(), key)); |
233 | 0 | ASSERT_EQ(value, kIncrements); |
234 | 0 | } |
235 | | |
236 | | // Execute UPDATE table SET value = value + 1 WHERE key = kKey in parallel, using |
237 | | // serializable isolation. |
238 | | // With retries the resulting value should be equal to number of increments. |
239 | 0 | void SerializableTxnTest::TestIncrements(bool transactional) { |
240 | 0 | FLAGS_transaction_rpc_timeout_ms = MonoDelta(1min).ToMilliseconds(); |
241 | |
|
242 | 0 | const auto kThreads = RegularBuildVsSanitizers(3, 2); |
243 | |
|
244 | 0 | std::vector<std::thread> threads; |
245 | 0 | while (threads.size() != kThreads) { |
246 | 0 | int key = narrow_cast<int>(threads.size()); |
247 | 0 | threads.emplace_back([this, key, transactional] { |
248 | 0 | CDSAttacher attacher; |
249 | 0 | TestIncrement(key, transactional); |
250 | 0 | }); |
251 | 0 | } |
252 | |
|
253 | 0 | for (auto& thread : threads) { |
254 | 0 | thread.join(); |
255 | 0 | } |
256 | 0 | } |
257 | | |
258 | 0 | TEST_F(SerializableTxnTest, Increment) { |
259 | 0 | TestIncrements(true /* transactional */); |
260 | 0 | } |
261 | | |
262 | 0 | TEST_F(SerializableTxnTest, IncrementNonTransactional) { |
263 | 0 | TestIncrements(false /* transactional */); |
264 | 0 | } |
265 | | |
266 | | // Test that repeats example from this article: |
267 | | // https://blogs.msdn.microsoft.com/craigfr/2007/05/16/serializable-vs-snapshot-isolation-level/ |
268 | | // |
269 | | // Multiple rows with values 0 and 1 are stored in table. |
270 | | // Two concurrent transaction fetches all rows from table and does the following. |
271 | | // First transaction changes value of all rows with value 0 to 1. |
272 | | // Second transaction changes value of all rows with value 1 to 0. |
273 | | // As outcome we should have rows with the same value. |
274 | | // |
275 | | // The described prodecure is repeated multiple times to increase probability of catching bug, |
276 | | // w/o running test multiple times. |
277 | 0 | void SerializableTxnTest::TestColoring() { |
278 | 0 | constexpr auto kKeys = 20; |
279 | 0 | constexpr auto kColors = 2; |
280 | 0 | constexpr auto kIterations = 20; |
281 | |
|
282 | 0 | size_t iterations_left = kIterations; |
283 | 0 | for (int i = 0; iterations_left > 0 && !testing::Test::HasFailure(); ++i) { |
284 | 0 | SCOPED_TRACE(Format("Iteration: $0", i)); |
285 | |
|
286 | 0 | auto session = CreateSession(nullptr /* transaction */, clock_); |
287 | 0 | session->SetForceConsistentRead(ForceConsistentRead::kTrue); |
288 | |
|
289 | 0 | { |
290 | 0 | std::vector<YBqlWriteOpPtr> ops; |
291 | 0 | for (int j = 0; j != kKeys; ++j) { |
292 | 0 | auto color = RandomUniformInt(0, kColors - 1); |
293 | 0 | ops.push_back(ASSERT_RESULT(WriteRow(session, |
294 | 0 | j, |
295 | 0 | color, |
296 | 0 | WriteOpType::INSERT, |
297 | 0 | Flush::kFalse))); |
298 | 0 | } |
299 | |
|
300 | 0 | ASSERT_OK(session->Flush()); |
301 | |
|
302 | 0 | for (const auto& op : ops) { |
303 | 0 | ASSERT_OK(CheckOp(op.get())); |
304 | 0 | } |
305 | 0 | } |
306 | |
|
307 | 0 | std::vector<std::thread> threads; |
308 | 0 | std::atomic<size_t> successes(0); |
309 | |
|
310 | 0 | while (threads.size() != kColors) { |
311 | 0 | int32_t color = narrow_cast<int32_t>(threads.size()); |
312 | 0 | threads.emplace_back([this, color, &successes, kKeys] { |
313 | 0 | CDSAttacher attacher; |
314 | 0 | for (;;) { |
315 | 0 | auto txn = CreateTransaction(); |
316 | 0 | LOG(INFO) << "Start: " << txn->id() << ", color: " << color; |
317 | 0 | auto session = CreateSession(txn); |
318 | 0 | session->SetTransaction(txn); |
319 | 0 | auto values = SelectAllRows(session); |
320 | 0 | if (!values.ok()) { |
321 | 0 | ASSERT_TRUE(values.status().IsTryAgain()) << values.status(); |
322 | 0 | continue; |
323 | 0 | } |
324 | 0 | ASSERT_EQ(values->size(), kKeys); |
325 | |
|
326 | 0 | std::vector<YBqlWriteOpPtr> ops; |
327 | 0 | for (const auto& p : *values) { |
328 | 0 | if (p.second == color) { |
329 | 0 | continue; |
330 | 0 | } |
331 | 0 | ops.push_back(ASSERT_RESULT(WriteRow( |
332 | 0 | session, p.first, color, WriteOpType::INSERT, Flush::kFalse))); |
333 | 0 | } |
334 | |
|
335 | 0 | if (ops.empty()) { |
336 | 0 | break; |
337 | 0 | } |
338 | | |
339 | 0 | auto flush_status = session->Flush(); |
340 | 0 | if (!flush_status.ok()) { |
341 | 0 | ASSERT_TRUE(flush_status.IsTryAgain()) << flush_status; |
342 | 0 | break; |
343 | 0 | } |
344 | | |
345 | 0 | for (const auto& op : ops) { |
346 | 0 | ASSERT_OK(CheckOp(op.get())); |
347 | 0 | } |
348 | |
|
349 | 0 | LOG(INFO) << "Commit: " << txn->id() << ", color: " << color; |
350 | 0 | auto commit_status = txn->CommitFuture().get(); |
351 | 0 | if (!commit_status.ok()) { |
352 | 0 | ASSERT_TRUE(commit_status.IsExpired()) << commit_status; |
353 | 0 | break; |
354 | 0 | } |
355 | | |
356 | 0 | ++successes; |
357 | 0 | break; |
358 | 0 | } |
359 | 0 | }); |
360 | 0 | } |
361 | |
|
362 | 0 | for (auto& thread : threads) { |
363 | 0 | thread.join(); |
364 | 0 | } |
365 | |
|
366 | 0 | if (successes == 0) { |
367 | 0 | continue; |
368 | 0 | } |
369 | | |
370 | 0 | session->SetReadPoint(Restart::kFalse); |
371 | 0 | auto values = ASSERT_RESULT(SelectAllRows(session)); |
372 | 0 | ASSERT_EQ(values.size(), kKeys); |
373 | 0 | LOG(INFO) << "Values: " << yb::ToString(values); |
374 | 0 | int32_t color = -1; |
375 | 0 | for (const auto& p : values) { |
376 | 0 | if (color == -1) { |
377 | 0 | color = p.second; |
378 | 0 | } else { |
379 | 0 | ASSERT_EQ(color, p.second); |
380 | 0 | } |
381 | 0 | } |
382 | 0 | --iterations_left; |
383 | 0 | } |
384 | 0 | } |
385 | | |
386 | 0 | TEST_F(SerializableTxnTest, Coloring) { |
387 | 0 | TestColoring(); |
388 | 0 | } |
389 | | |
390 | 0 | TEST_F(SerializableTxnTest, ColoringWithLongApply) { |
391 | 0 | FLAGS_txn_max_apply_batch_records = 3; |
392 | 0 | TestColoring(); |
393 | 0 | } |
394 | | |
395 | | } // namespace client |
396 | | } // namespace yb |