/Users/deen/code/yugabyte-db/src/yb/client/ql-transaction-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/client/error.h" |
17 | | #include "yb/client/schema.h" |
18 | | #include "yb/client/session.h" |
19 | | #include "yb/client/table.h" |
20 | | #include "yb/client/table_alterer.h" |
21 | | #include "yb/client/transaction.h" |
22 | | #include "yb/client/transaction_rpc.h" |
23 | | #include "yb/client/txn-test-base.h" |
24 | | #include "yb/client/yb_op.h" |
25 | | |
26 | | #include "yb/common/ql_value.h" |
27 | | |
28 | | #include "yb/consensus/consensus.h" |
29 | | #include "yb/consensus/log.h" |
30 | | |
31 | | #include "yb/rocksdb/db.h" |
32 | | |
33 | | #include "yb/rpc/rpc.h" |
34 | | |
35 | | #include "yb/tablet/tablet.h" |
36 | | #include "yb/tablet/tablet_bootstrap_if.h" |
37 | | #include "yb/tablet/tablet_peer.h" |
38 | | #include "yb/tablet/transaction_coordinator.h" |
39 | | |
40 | | #include "yb/tserver/mini_tablet_server.h" |
41 | | #include "yb/tserver/tablet_server.h" |
42 | | #include "yb/tserver/ts_tablet_manager.h" |
43 | | #include "yb/tserver/tserver_service.pb.h" |
44 | | |
45 | | #include "yb/util/async_util.h" |
46 | | #include "yb/util/random_util.h" |
47 | | #include "yb/util/scope_exit.h" |
48 | | #include "yb/util/size_literals.h" |
49 | | #include "yb/util/test_thread_holder.h" |
50 | | #include "yb/util/tsan_util.h" |
51 | | |
52 | | #include "yb/yql/cql/ql/util/errcodes.h" |
53 | | #include "yb/yql/cql/ql/util/statement_result.h" |
54 | | |
55 | | using namespace std::literals; |
56 | | |
57 | | using yb::tablet::GetTransactionTimeout; |
58 | | using yb::tablet::TabletPeer; |
59 | | |
60 | | DECLARE_bool(TEST_disable_proactive_txn_cleanup_on_abort); |
61 | | DECLARE_bool(TEST_fail_in_apply_if_no_metadata); |
62 | | DECLARE_bool(TEST_master_fail_transactional_tablet_lookups); |
63 | | DECLARE_bool(TEST_transaction_allow_rerequest_status); |
64 | | DECLARE_bool(delete_intents_sst_files); |
65 | | DECLARE_bool(enable_load_balancing); |
66 | | DECLARE_bool(fail_on_out_of_range_clock_skew); |
67 | | DECLARE_bool(flush_rocksdb_on_shutdown); |
68 | | DECLARE_bool(rocksdb_disable_compactions); |
69 | | DECLARE_int32(TEST_delay_init_tablet_peer_ms); |
70 | | DECLARE_int32(log_min_seconds_to_retain); |
71 | | DECLARE_int32(remote_bootstrap_max_chunk_size); |
72 | | DECLARE_int64(transaction_rpc_timeout_ms); |
73 | | DECLARE_uint64(TEST_transaction_delay_status_reply_usec_in_tests); |
74 | | DECLARE_uint64(aborted_intent_cleanup_ms); |
75 | | DECLARE_uint64(max_clock_skew_usec); |
76 | | DECLARE_uint64(transaction_heartbeat_usec); |
77 | | |
78 | | namespace yb { |
79 | | namespace client { |
80 | | |
// Knobs for QLTransactionTest::TestWriteConflicts.
// Kept as a plain aggregate so tests can fill it with designated initializers.
struct WriteConflictsOptions {
  // When set, a background thread keeps restarting tablet servers during the workload.
  bool do_restarts{false};
  // Cap on the number of writes that may be in flight at the same time.
  size_t active_transactions{50};
  // Upper bound of the key range; keys are drawn from RandomUniformInt<int>(1, total_keys).
  int total_keys{5};
  // When set, each write is randomly issued either inside or outside of a transaction.
  bool non_txn_writes{false};
};
87 | | |
88 | | class QLTransactionTest : public TransactionTestBase<MiniCluster> { |
89 | | protected: |
90 | 44 | void SetUp() override { |
91 | 44 | SetIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION); |
92 | 44 | TransactionTestBase::SetUp(); |
93 | 44 | } |
94 | | |
95 | | // We write data with first transaction then try to read it another one. |
96 | | // If commit is true, then first transaction is committed and second should be restarted. |
97 | | // Otherwise second transaction would see pending intents from first one and should not restart. |
98 | | void TestReadRestart(bool commit = true); |
99 | | |
100 | | void TestWriteConflicts(const WriteConflictsOptions& options); |
101 | | |
102 | | void TestReadOnlyTablets(IsolationLevel isolation_level, |
103 | | bool perform_write, |
104 | | bool written_intents_expected); |
105 | | |
106 | 0 | CHECKED_STATUS WaitTransactionsCleaned() { |
107 | 0 | return WaitFor( |
108 | 0 | [this] { return !HasTransactions(); }, kTransactionApplyTime, "Transactions cleaned"); |
109 | 0 | } |
110 | | |
111 | 0 | CHECKED_STATUS WaitIntentsCleaned() { |
112 | 0 | return WaitFor( |
113 | 0 | [this] { return CountIntents(cluster_.get()) == 0; }, kIntentsCleanupTime, "Intents cleaned"); |
114 | 0 | } |
115 | | }; |
116 | | |
117 | | typedef TransactionCustomLogSegmentSizeTest<0, QLTransactionTest> |
118 | | QLTransactionBigLogSegmentSizeTest; |
119 | | |
120 | 0 | TEST_F(QLTransactionTest, Simple) { |
121 | 0 | ASSERT_NO_FATALS(WriteData()); |
122 | 0 | ASSERT_NO_FATALS(VerifyData()); |
123 | 0 | ASSERT_OK(cluster_->RestartSync()); |
124 | 0 | AssertNoRunningTransactions(); |
125 | 0 | } |
126 | | |
127 | 0 | TEST_F(QLTransactionTest, LookupTabletFailure) { |
128 | 0 | FLAGS_TEST_master_fail_transactional_tablet_lookups = true; |
129 | |
|
130 | 0 | auto txn = CreateTransaction(); |
131 | 0 | auto result = WriteRow(CreateSession(txn), 0 /* key */, 1 /* value */); |
132 | |
|
133 | 0 | ASSERT_TRUE(!result.ok() && result.status().IsTimedOut()) << "Result: " << AsString(result); |
134 | 0 | } |
135 | | |
136 | 0 | TEST_F(QLTransactionTest, ReadWithTimeInFuture) { |
137 | 0 | FLAGS_fail_on_out_of_range_clock_skew = false; |
138 | |
|
139 | 0 | WriteData(); |
140 | 0 | server::SkewedClockDeltaChanger delta_changer(100ms, skewed_clock_); |
141 | 0 | for (size_t i = 0; i != 100; ++i) { |
142 | 0 | auto transaction = CreateTransaction2(); |
143 | 0 | auto session = CreateSession(transaction); |
144 | 0 | VerifyRows(session); |
145 | 0 | } |
146 | 0 | ASSERT_OK(cluster_->RestartSync()); |
147 | 0 | AssertNoRunningTransactions(); |
148 | 0 | } |
149 | | |
150 | | TEST_F(QLTransactionTest, WriteSameKey) { |
151 | | ASSERT_NO_FATALS(WriteDataWithRepetition()); |
152 | | std::this_thread::sleep_for(1s); // Wait some time for intents to apply. |
153 | | ASSERT_NO_FATALS(VerifyData()); |
154 | | ASSERT_OK(cluster_->RestartSync()); |
155 | | } |
156 | | |
157 | 0 | TEST_F(QLTransactionTest, WriteSameKeyWithIntents) { |
158 | 0 | DisableApplyingIntents(); |
159 | |
|
160 | 0 | ASSERT_NO_FATALS(WriteDataWithRepetition()); |
161 | 0 | ASSERT_NO_FATALS(VerifyData()); |
162 | 0 | ASSERT_OK(cluster_->RestartSync()); |
163 | 0 | } |
164 | | |
165 | | // Commit flags says whether we should commit write txn during this test. |
166 | 0 | void QLTransactionTest::TestReadRestart(bool commit) { |
167 | 0 | SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec); |
168 | |
|
169 | 0 | { |
170 | 0 | auto write_txn = CreateTransaction(); |
171 | 0 | ASSERT_OK(WriteRows(CreateSession(write_txn))); |
172 | 0 | if (commit) { |
173 | 0 | ASSERT_OK(write_txn->CommitFuture().get()); |
174 | 0 | } |
175 | 0 | auto se = ScopeExit([write_txn, commit] { |
176 | 0 | if (!commit) { |
177 | 0 | write_txn->Abort(); |
178 | 0 | } |
179 | 0 | }); |
180 | |
|
181 | 0 | server::SkewedClockDeltaChanger delta_changer(-100ms, skewed_clock_); |
182 | |
|
183 | 0 | auto txn1 = CreateTransaction2(SetReadTime::kTrue); |
184 | 0 | auto se2 = ScopeExit([txn1, commit] { |
185 | 0 | if (!commit) { |
186 | 0 | txn1->Abort(); |
187 | 0 | } |
188 | 0 | }); |
189 | 0 | auto session = CreateSession(txn1); |
190 | 0 | if (commit) { |
191 | 0 | for (size_t r = 0; r != kNumRows; ++r) { |
192 | 0 | auto row = SelectRow(session, KeyForTransactionAndIndex(0, r)); |
193 | 0 | ASSERT_NOK(row); |
194 | 0 | ASSERT_EQ(ql::ErrorCode::RESTART_REQUIRED, ql::GetErrorCode(row.status())) |
195 | 0 | << "Bad row: " << row; |
196 | 0 | } |
197 | 0 | auto txn2 = ASSERT_RESULT(txn1->CreateRestartedTransaction()); |
198 | 0 | auto se = ScopeExit([txn2] { |
199 | 0 | txn2->Abort(); |
200 | 0 | }); |
201 | 0 | session->SetTransaction(txn2); |
202 | 0 | VerifyRows(session); |
203 | 0 | VerifyData(); |
204 | 0 | } else { |
205 | 0 | for (size_t r = 0; r != kNumRows; ++r) { |
206 | 0 | auto row = SelectRow(session, KeyForTransactionAndIndex(0, r)); |
207 | 0 | ASSERT_TRUE(!row.ok() && row.status().IsNotFound()) << "Bad row: " << row; |
208 | 0 | } |
209 | 0 | } |
210 | 0 | } |
211 | |
|
212 | 0 | ASSERT_OK(cluster_->RestartSync()); |
213 | 0 | } |
214 | | |
215 | 0 | TEST_F(QLTransactionTest, ReadRestart) { |
216 | 0 | TestReadRestart(); |
217 | 0 | AssertNoRunningTransactions(); |
218 | 0 | } |
219 | | |
220 | 0 | TEST_F(QLTransactionTest, ReadRestartWithIntents) { |
221 | 0 | DisableApplyingIntents(); |
222 | 0 | TestReadRestart(); |
223 | 0 | } |
224 | | |
225 | 0 | TEST_F(QLTransactionTest, ReadRestartWithPendingIntents) { |
226 | 0 | FLAGS_TEST_transaction_allow_rerequest_status = false; |
227 | 0 | DisableApplyingIntents(); |
228 | 0 | TestReadRestart(false /* commit */); |
229 | 0 | } |
230 | | |
231 | | // Non transactional restart happens in server, so we just checking that we read correct values. |
232 | | // Skewed clocks are used because there could be case when applied intents or commit transaction |
233 | | // has time greater than max safetime to read, that causes restart. |
234 | 0 | TEST_F(QLTransactionTest, ReadRestartNonTransactional) { |
235 | 0 | const auto kClockSkew = 500ms; |
236 | |
|
237 | 0 | SetAtomicFlag(1000000ULL, &FLAGS_max_clock_skew_usec); |
238 | 0 | DisableTransactionTimeout(); |
239 | |
|
240 | 0 | auto delta_changers = SkewClocks(cluster_.get(), kClockSkew); |
241 | 0 | constexpr size_t kTotalTransactions = 10; |
242 | |
|
243 | 0 | for (size_t i = 0; i != kTotalTransactions; ++i) { |
244 | 0 | SCOPED_TRACE(Format("Transaction $0", i)); |
245 | 0 | auto txn = CreateTransaction(); |
246 | 0 | ASSERT_OK(WriteRows(CreateSession(txn), i)); |
247 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
248 | 0 | ASSERT_NO_FATALS(VerifyRows(CreateSession(), i)); |
249 | | |
250 | | // We propagate hybrid time, so when commit and read finishes, all servers has about the same |
251 | | // physical component. We are waiting double skew, until time on servers became skewed again. |
252 | 0 | std::this_thread::sleep_for(kClockSkew * 2); |
253 | 0 | } |
254 | |
|
255 | 0 | cluster_->Shutdown(); // Need to shutdown cluster before resetting clock back. |
256 | 0 | cluster_.reset(); |
257 | 0 | } |
258 | | |
259 | 0 | TEST_F(QLTransactionTest, WriteRestart) { |
260 | 0 | SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec); |
261 | |
|
262 | 0 | const std::string kExtraColumn = "v2"; |
263 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
264 | 0 | table_alterer->AddColumn(kExtraColumn)->Type(DataType::INT32); |
265 | 0 | ASSERT_OK(table_alterer->Alter()); |
266 | |
|
267 | 0 | ASSERT_OK(table_.Open(kTableName, client_.get())); // Reopen to update schema version. |
268 | |
|
269 | 0 | WriteData(); |
270 | |
|
271 | 0 | server::SkewedClockDeltaChanger delta_changer(-100ms, skewed_clock_); |
272 | 0 | auto txn1 = CreateTransaction2(SetReadTime::kTrue); |
273 | 0 | YBTransactionPtr txn2; |
274 | 0 | auto session = CreateSession(txn1); |
275 | 0 | for (bool retry : {false, true}) { |
276 | 0 | for (size_t r = 0; r != kNumRows; ++r) { |
277 | 0 | const auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_UPDATE); |
278 | 0 | auto* const req = op->mutable_request(); |
279 | 0 | auto key = KeyForTransactionAndIndex(0, r); |
280 | 0 | auto old_value = ValueForTransactionAndIndex(0, r, WriteOpType::INSERT); |
281 | 0 | auto value = ValueForTransactionAndIndex(0, r, WriteOpType::UPDATE); |
282 | 0 | QLAddInt32HashValue(req, key); |
283 | 0 | table_.AddInt32ColumnValue(req, kExtraColumn, value); |
284 | 0 | auto cond = req->mutable_where_expr()->mutable_condition(); |
285 | 0 | table_.SetInt32Condition(cond, kValueColumn, QLOperator::QL_OP_EQUAL, old_value); |
286 | 0 | req->mutable_column_refs()->add_ids(table_.ColumnId(kValueColumn)); |
287 | 0 | LOG(INFO) << "Updating value"; |
288 | 0 | auto status = session->ApplyAndFlush(op); |
289 | 0 | ASSERT_OK(status); |
290 | 0 | if (!retry) { |
291 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR, op->response().status()); |
292 | 0 | } else { |
293 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status()); |
294 | 0 | } |
295 | 0 | } |
296 | 0 | if (!retry) { |
297 | 0 | txn2 = ASSERT_RESULT(txn1->CreateRestartedTransaction()); |
298 | 0 | session->SetTransaction(txn2); |
299 | 0 | } |
300 | 0 | } |
301 | 0 | txn2->CommitFuture().wait(); |
302 | 0 | VerifyData(); |
303 | 0 | VerifyData(1, WriteOpType::UPDATE, kExtraColumn); |
304 | |
|
305 | 0 | ASSERT_OK(cluster_->RestartSync()); |
306 | 0 | AssertNoRunningTransactions(); |
307 | 0 | } |
308 | | |
309 | | // Check that we could write to transaction that were restarted. |
310 | 0 | TEST_F(QLTransactionTest, WriteAfterReadRestart) { |
311 | 0 | const auto kClockDelta = 100ms; |
312 | 0 | SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec); |
313 | |
|
314 | 0 | auto write_txn = CreateTransaction(); |
315 | 0 | ASSERT_OK(WriteRows(CreateSession(write_txn))); |
316 | 0 | ASSERT_OK(write_txn->CommitFuture().get()); |
317 | |
|
318 | 0 | server::SkewedClockDeltaChanger delta_changer(-kClockDelta, skewed_clock_); |
319 | |
|
320 | 0 | auto txn1 = CreateTransaction2(SetReadTime::kTrue); |
321 | 0 | auto session = CreateSession(txn1); |
322 | 0 | for (size_t r = 0; r != kNumRows; ++r) { |
323 | 0 | auto row = SelectRow(session, KeyForTransactionAndIndex(0, r)); |
324 | 0 | ASSERT_NOK(row); |
325 | 0 | ASSERT_EQ(ql::ErrorCode::RESTART_REQUIRED, ql::GetErrorCode(row.status())) |
326 | 0 | << "Bad row: " << row; |
327 | 0 | } |
328 | 0 | { |
329 | | // To reset clock back. |
330 | 0 | auto temp_delta_changed = std::move(delta_changer); |
331 | 0 | } |
332 | 0 | auto txn2 = ASSERT_RESULT(txn1->CreateRestartedTransaction()); |
333 | 0 | session->SetTransaction(txn2); |
334 | 0 | VerifyRows(session); |
335 | 0 | for (size_t r = 0; r != kNumRows; ++r) { |
336 | 0 | auto result = WriteRow( |
337 | 0 | session, KeyForTransactionAndIndex(0, r), |
338 | 0 | ValueForTransactionAndIndex(0, r, WriteOpType::UPDATE), WriteOpType::UPDATE); |
339 | 0 | ASSERT_OK(result); |
340 | 0 | } |
341 | |
|
342 | 0 | txn2->Abort(); |
343 | |
|
344 | 0 | VerifyData(); |
345 | 0 | } |
346 | | |
347 | 0 | TEST_F(QLTransactionTest, Child) { |
348 | 0 | auto txn = CreateTransaction(); |
349 | 0 | TransactionManager manager2(client_.get(), clock_, client::LocalTabletFilter()); |
350 | 0 | auto data_pb = txn->PrepareChildFuture(ForceConsistentRead::kFalse).get(); |
351 | 0 | ASSERT_OK(data_pb); |
352 | 0 | auto data = ChildTransactionData::FromPB(*data_pb); |
353 | 0 | ASSERT_OK(data); |
354 | 0 | auto txn2 = std::make_shared<YBTransaction>(&manager2, std::move(*data)); |
355 | |
|
356 | 0 | ASSERT_OK(WriteRows(CreateSession(txn2), 0)); |
357 | 0 | auto result = txn2->FinishChild(); |
358 | 0 | ASSERT_OK(result); |
359 | 0 | ASSERT_OK(txn->ApplyChildResult(*result)); |
360 | |
|
361 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
362 | |
|
363 | 0 | ASSERT_NO_FATALS(VerifyData()); |
364 | 0 | ASSERT_OK(cluster_->RestartSync()); |
365 | 0 | AssertNoRunningTransactions(); |
366 | 0 | } |
367 | | |
368 | 0 | TEST_F(QLTransactionTest, ChildReadRestart) { |
369 | 0 | SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec); |
370 | |
|
371 | 0 | { |
372 | 0 | auto write_txn = CreateTransaction(); |
373 | 0 | ASSERT_OK(WriteRows(CreateSession(write_txn))); |
374 | 0 | ASSERT_OK(write_txn->CommitFuture().get()); |
375 | 0 | } |
376 | |
|
377 | 0 | server::SkewedClockDeltaChanger delta_changer(-100ms, skewed_clock_); |
378 | 0 | auto parent_txn = CreateTransaction2(SetReadTime::kTrue); |
379 | |
|
380 | 0 | auto data_pb = parent_txn->PrepareChildFuture(ForceConsistentRead::kFalse).get(); |
381 | 0 | ASSERT_OK(data_pb); |
382 | 0 | auto data = ChildTransactionData::FromPB(*data_pb); |
383 | 0 | ASSERT_OK(data); |
384 | |
|
385 | 0 | server::ClockPtr clock3(new server::HybridClock(skewed_clock_)); |
386 | 0 | ASSERT_OK(clock3->Init()); |
387 | 0 | TransactionManager manager3(client_.get(), clock3, client::LocalTabletFilter()); |
388 | 0 | auto child_txn = std::make_shared<YBTransaction>(&manager3, std::move(*data)); |
389 | |
|
390 | 0 | auto session = CreateSession(child_txn); |
391 | 0 | for (size_t r = 0; r != kNumRows; ++r) { |
392 | 0 | auto row = SelectRow(session, KeyForTransactionAndIndex(0, r)); |
393 | 0 | ASSERT_NOK(row); |
394 | 0 | ASSERT_EQ(ql::ErrorCode::RESTART_REQUIRED, ql::GetErrorCode(row.status())) |
395 | 0 | << "Bad row: " << row; |
396 | 0 | } |
397 | |
|
398 | 0 | auto result = child_txn->FinishChild(); |
399 | 0 | ASSERT_OK(result); |
400 | 0 | ASSERT_OK(parent_txn->ApplyChildResult(*result)); |
401 | |
|
402 | 0 | auto master2_txn = ASSERT_RESULT(parent_txn->CreateRestartedTransaction()); |
403 | 0 | session->SetTransaction(master2_txn); |
404 | 0 | for (size_t r = 0; r != kNumRows; ++r) { |
405 | 0 | auto row = SelectRow(session, KeyForTransactionAndIndex(0, r)); |
406 | 0 | ASSERT_OK(row); |
407 | 0 | ASSERT_EQ(ValueForTransactionAndIndex(0, r, WriteOpType::INSERT), *row); |
408 | 0 | } |
409 | 0 | ASSERT_NO_FATALS(VerifyData()); |
410 | |
|
411 | 0 | ASSERT_OK(cluster_->RestartSync()); |
412 | 0 | AssertNoRunningTransactions(); |
413 | 0 | } |
414 | | |
415 | 0 | TEST_F(QLTransactionTest, InsertUpdate) { |
416 | 0 | DisableApplyingIntents(); |
417 | 0 | WriteData(); // Add data |
418 | 0 | WriteData(); // Update data |
419 | 0 | VerifyData(); |
420 | 0 | ASSERT_OK(cluster_->RestartSync()); |
421 | 0 | } |
422 | | |
423 | 0 | TEST_F(QLTransactionTest, Cleanup) { |
424 | 0 | WriteData(); |
425 | 0 | VerifyData(); |
426 | | |
427 | | // Wait transaction apply. Otherwise count could be non zero. |
428 | 0 | ASSERT_OK(WaitTransactionsCleaned()); |
429 | 0 | VerifyData(); |
430 | 0 | ASSERT_OK(cluster_->RestartSync()); |
431 | 0 | AssertNoRunningTransactions(); |
432 | 0 | } |
433 | | |
434 | 0 | TEST_F(QLTransactionTest, Heartbeat) { |
435 | 0 | auto txn = CreateTransaction(); |
436 | 0 | auto session = CreateSession(txn); |
437 | 0 | ASSERT_OK(WriteRows(session)); |
438 | 0 | std::this_thread::sleep_for(GetTransactionTimeout() * 2); |
439 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
440 | 0 | VerifyData(); |
441 | 0 | AssertNoRunningTransactions(); |
442 | 0 | } |
443 | | |
444 | 0 | TEST_F(QLTransactionTest, Expire) { |
445 | 0 | SetDisableHeartbeatInTests(true); |
446 | 0 | auto txn = CreateTransaction(); |
447 | 0 | auto session = CreateSession(txn); |
448 | 0 | ASSERT_OK(WriteRows(session)); |
449 | 0 | std::this_thread::sleep_for(GetTransactionTimeout() * 2); |
450 | 0 | auto commit_status = txn->CommitFuture().get(); |
451 | 0 | ASSERT_TRUE(commit_status.IsExpired()) << "Bad status: " << commit_status; |
452 | 0 | std::this_thread::sleep_for(std::chrono::microseconds(FLAGS_transaction_heartbeat_usec * 2)); |
453 | 0 | ASSERT_OK(cluster_->CleanTabletLogs()); |
454 | 0 | ASSERT_FALSE(HasTransactions()); |
455 | 0 | } |
456 | | |
457 | 0 | TEST_F(QLTransactionTest, PreserveLogs) { |
458 | 0 | FLAGS_transaction_rpc_timeout_ms = 60000; |
459 | 0 | SetDisableHeartbeatInTests(true); |
460 | 0 | DisableTransactionTimeout(); |
461 | 0 | std::vector<std::shared_ptr<YBTransaction>> transactions; |
462 | 0 | constexpr size_t kTransactions = 20; |
463 | 0 | for (size_t i = 0; i != kTransactions; ++i) { |
464 | 0 | auto txn = CreateTransaction(); |
465 | 0 | auto session = CreateSession(txn); |
466 | 0 | ASSERT_OK(WriteRows(session, i)); |
467 | 0 | transactions.push_back(std::move(txn)); |
468 | 0 | std::this_thread::sleep_for(100ms); |
469 | 0 | } |
470 | 0 | LOG(INFO) << "Request clean"; |
471 | 0 | ASSERT_OK(cluster_->CleanTabletLogs()); |
472 | 0 | ASSERT_OK(cluster_->RestartSync()); |
473 | 0 | CountDownLatch latch(kTransactions); |
474 | 0 | for (auto& transaction : transactions) { |
475 | 0 | transaction->Commit([&latch](const Status& status) { |
476 | 0 | EXPECT_OK(status); |
477 | 0 | latch.CountDown(); |
478 | 0 | }); |
479 | 0 | } |
480 | 0 | latch.Wait(); |
481 | 0 | VerifyData(kTransactions); |
482 | 0 | AssertNoRunningTransactions(); |
483 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll); |
484 | 0 | uint64_t max_active_segment_sequence_number = 0; |
485 | 0 | for (const auto& peer : peers) { |
486 | 0 | if (peer->table_type() != TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
487 | 0 | continue; |
488 | 0 | } |
489 | 0 | auto current_active_segment_sequence_number = peer->log()->active_segment_sequence_number(); |
490 | 0 | LOG(INFO) << peer->LogPrefix() << "active segment: " |
491 | 0 | << current_active_segment_sequence_number; |
492 | 0 | max_active_segment_sequence_number = std::max( |
493 | 0 | max_active_segment_sequence_number, current_active_segment_sequence_number); |
494 | 0 | } |
495 | | |
496 | | // Ensure that we had enough log segments, otherwise this test is pretty useless. |
497 | 0 | ASSERT_GE(max_active_segment_sequence_number, kTransactions / 4); |
498 | 0 | } |
499 | | |
500 | 0 | TEST_F(QLTransactionTest, ResendApplying) { |
501 | 0 | DisableApplyingIntents(); |
502 | 0 | WriteData(); |
503 | 0 | std::this_thread::sleep_for(5s); // Transaction should not be applied here. |
504 | 0 | ASSERT_TRUE(HasTransactions()); |
505 | |
|
506 | 0 | SetIgnoreApplyingProbability(0.0); |
507 | |
|
508 | 0 | ASSERT_OK(WaitTransactionsCleaned()); |
509 | 0 | VerifyData(); |
510 | 0 | ASSERT_OK(cluster_->RestartSync()); |
511 | 0 | AssertNoRunningTransactions(); |
512 | 0 | } |
513 | | |
514 | 0 | TEST_F(QLTransactionTest, ConflictResolution) { |
515 | 0 | constexpr int kTotalTransactions = 5; |
516 | 0 | constexpr int kNumRows = 10; |
517 | 0 | std::vector<YBTransactionPtr> transactions; |
518 | 0 | std::vector<YBSessionPtr> sessions; |
519 | 0 | std::vector<std::vector<YBqlWriteOpPtr>> write_ops(kTotalTransactions); |
520 | |
|
521 | 0 | CountDownLatch latch(kTotalTransactions); |
522 | 0 | for (int i = 0; i != kTotalTransactions; ++i) { |
523 | 0 | transactions.push_back(CreateTransaction()); |
524 | 0 | auto session = CreateSession(transactions.back()); |
525 | 0 | sessions.push_back(session); |
526 | 0 | for (int r = 0; r != kNumRows; ++r) { |
527 | 0 | write_ops[i].push_back(ASSERT_RESULT(WriteRow( |
528 | 0 | sessions.back(), r, i, WriteOpType::INSERT, Flush::kFalse))); |
529 | 0 | } |
530 | 0 | session->FlushAsync([&latch](FlushStatus* flush_status) { latch.CountDown(); }); |
531 | 0 | } |
532 | 0 | latch.Wait(); |
533 | |
|
534 | 0 | latch.Reset(transactions.size()); |
535 | 0 | std::atomic<size_t> successes(0); |
536 | 0 | std::atomic<size_t> failures(0); |
537 | |
|
538 | 0 | for (size_t i = 0; i != kTotalTransactions; ++i) { |
539 | 0 | bool success = true; |
540 | 0 | for (auto& op : write_ops[i]) { |
541 | 0 | if (!op->succeeded()) { |
542 | 0 | success = false; |
543 | 0 | break; |
544 | 0 | } |
545 | 0 | } |
546 | 0 | if (!success) { |
547 | 0 | failures.fetch_add(1, std::memory_order_release); |
548 | 0 | latch.CountDown(1); |
549 | 0 | continue; |
550 | 0 | } |
551 | 0 | transactions[i]->Commit([&latch, &successes, &failures](const Status& status) { |
552 | 0 | if (status.ok()) { |
553 | 0 | successes.fetch_add(1, std::memory_order_release); |
554 | 0 | } else { |
555 | 0 | failures.fetch_add(1, std::memory_order_release); |
556 | 0 | } |
557 | 0 | latch.CountDown(1); |
558 | 0 | }); |
559 | 0 | } |
560 | |
|
561 | 0 | latch.Wait(); |
562 | 0 | LOG(INFO) << "Committed, successes: " << successes.load() << ", failures: " << failures.load(); |
563 | |
|
564 | 0 | ASSERT_GE(successes.load(std::memory_order_acquire), 1); |
565 | 0 | ASSERT_GE(failures.load(std::memory_order_acquire), 1); |
566 | |
|
567 | 0 | auto session = CreateSession(); |
568 | 0 | std::vector<int32_t> values; |
569 | 0 | for (int r = 0; r != kNumRows; ++r) { |
570 | 0 | auto row = SelectRow(session, r); |
571 | 0 | ASSERT_OK(row); |
572 | 0 | values.push_back(*row); |
573 | 0 | } |
574 | 0 | for (const auto& value : values) { |
575 | 0 | ASSERT_EQ(values.front(), value) << "Values: " << yb::ToString(values); |
576 | 0 | } |
577 | 0 | } |
578 | | |
579 | 0 | TEST_F(QLTransactionTest, SimpleWriteConflict) { |
580 | 0 | auto transaction = CreateTransaction(); |
581 | 0 | ASSERT_OK(WriteRows(CreateSession(transaction))); |
582 | 0 | ASSERT_OK(WriteRows(CreateSession())); |
583 | |
|
584 | 0 | ASSERT_NOK(transaction->CommitFuture().get()); |
585 | 0 | } |
586 | | |
587 | | void QLTransactionTest::TestReadOnlyTablets(IsolationLevel isolation_level, |
588 | | bool perform_write, |
589 | 0 | bool written_intents_expected) { |
590 | 0 | SetIsolationLevel(isolation_level); |
591 | |
|
592 | 0 | YBTransactionPtr txn = CreateTransaction(); |
593 | 0 | YBSessionPtr session = CreateSession(txn); |
594 | |
|
595 | 0 | ReadRow(session, 0 /* key */); |
596 | 0 | if (perform_write) { |
597 | 0 | ASSERT_OK(WriteRow(session, 1 /* key */, 1 /* value */, WriteOpType::INSERT, Flush::kFalse)); |
598 | 0 | } |
599 | 0 | ASSERT_OK(session->Flush()); |
600 | | |
601 | | // Verify intents were written if expected. |
602 | 0 | if (written_intents_expected) { |
603 | 0 | ASSERT_GT(CountIntents(cluster_.get()), 0); |
604 | 0 | } else { |
605 | 0 | ASSERT_EQ(CountIntents(cluster_.get()), 0); |
606 | 0 | } |
607 | | |
608 | | // Commit and verify transaction and intents were applied/cleaned up. |
609 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
610 | 0 | ASSERT_OK(WaitTransactionsCleaned()); |
611 | 0 | ASSERT_OK(WaitIntentsCleaned()); |
612 | 0 | } |
613 | | |
614 | 0 | TEST_F(QLTransactionTest, ReadOnlyTablets) { |
615 | 0 | FLAGS_TEST_fail_in_apply_if_no_metadata = true; |
616 | | |
617 | | // In snapshot isolation, tablets only read from will not have metadata written, so applying |
618 | | // intents on this tablet would cause the test to fail. |
619 | 0 | TestReadOnlyTablets(IsolationLevel::SNAPSHOT_ISOLATION, |
620 | 0 | false /* perform_write */, |
621 | 0 | false /* written_intents_expected */); |
622 | | |
623 | | // Writes always write intents, so metadata should be written and intents should be applied |
624 | | // on this tablet. |
625 | 0 | TestReadOnlyTablets(IsolationLevel::SNAPSHOT_ISOLATION, |
626 | 0 | true /* perform_write */, |
627 | 0 | true /* written_intents_expected */); |
628 | | |
629 | | // In serializable isolation, reads write intents, so metadata should be written and intents |
630 | | // should be applied on this tablet. |
631 | 0 | TestReadOnlyTablets(IsolationLevel::SERIALIZABLE_ISOLATION, |
632 | 0 | false /* perform_write */, |
633 | 0 | true /* written_intents_expected */); |
634 | 0 | } |
635 | | |
636 | 0 | void QLTransactionTest::TestWriteConflicts(const WriteConflictsOptions& options) { |
637 | 0 | struct ActiveTransaction { |
638 | 0 | YBTransactionPtr transaction; |
639 | 0 | YBSessionPtr session; |
640 | 0 | std::future<FlushStatus> flush_future; |
641 | 0 | std::future<Status> commit_future; |
642 | |
|
643 | 0 | std::string ToString() const { |
644 | 0 | ANNOTATE_IGNORE_READS_BEGIN(); |
645 | 0 | auto str = transaction ? transaction->ToString() : "no-txn"; |
646 | 0 | ANNOTATE_IGNORE_READS_END(); |
647 | 0 | return str; |
648 | 0 | } |
649 | 0 | }; |
650 | |
|
651 | 0 | constexpr auto kTestTime = 60s; |
652 | 0 | std::vector<ActiveTransaction> active_transactions; |
653 | |
|
654 | 0 | auto stop = std::chrono::steady_clock::now() + kTestTime; |
655 | |
|
656 | 0 | std::thread restart_thread; |
657 | |
|
658 | 0 | if (options.do_restarts) { |
659 | 0 | restart_thread = std::thread([this, stop] { |
660 | 0 | CDSAttacher attacher; |
661 | 0 | int it = 0; |
662 | 0 | while (std::chrono::steady_clock::now() < stop) { |
663 | 0 | std::this_thread::sleep_for(5s); |
664 | 0 | ASSERT_OK(cluster_->mini_tablet_server(++it % cluster_->num_tablet_servers())->Restart()); |
665 | 0 | } |
666 | 0 | }); |
667 | 0 | } |
668 | |
|
669 | 0 | int value = 0; |
670 | 0 | size_t tries = 0; |
671 | 0 | size_t written = 0; |
672 | 0 | size_t flushed = 0; |
673 | 0 | for (;;) { |
674 | 0 | auto expired = std::chrono::steady_clock::now() >= stop; |
675 | 0 | if (expired) { |
676 | 0 | if (active_transactions.empty()) { |
677 | 0 | break; |
678 | 0 | } |
679 | 0 | LOG(INFO) << "Time expired, remaining transactions: " << active_transactions.size(); |
680 | 0 | for (const auto& txn : active_transactions) { |
681 | 0 | LOG(INFO) << "TXN: " << txn.ToString() << ", " |
682 | 0 | << (!txn.commit_future.valid() ? "Flushing" : "Committing"); |
683 | 0 | } |
684 | 0 | } |
685 | 0 | while (!expired && active_transactions.size() < options.active_transactions) { |
686 | 0 | auto key = RandomUniformInt<int>(1, options.total_keys); |
687 | 0 | ActiveTransaction active_txn; |
688 | 0 | if (!options.non_txn_writes || RandomUniformBool()) { |
689 | 0 | active_txn.transaction = CreateTransaction(); |
690 | 0 | } |
691 | 0 | active_txn.session = CreateSession(active_txn.transaction); |
692 | 0 | const auto op = table_.NewInsertOp(); |
693 | 0 | auto* const req = op->mutable_request(); |
694 | 0 | QLAddInt32HashValue(req, key); |
695 | 0 | const auto val = ++value; |
696 | 0 | table_.AddInt32ColumnValue(req, kValueColumn, val); |
697 | 0 | LOG(INFO) << "TXN: " << active_txn.ToString() << " write " << key << " = " << val; |
698 | 0 | active_txn.session->Apply(op); |
699 | 0 | active_txn.flush_future = active_txn.session->FlushFuture(); |
700 | |
|
701 | 0 | ++tries; |
702 | 0 | active_transactions.push_back(std::move(active_txn)); |
703 | 0 | } |
704 | |
|
705 | 0 | auto w = active_transactions.begin(); |
706 | 0 | for (auto i = active_transactions.begin(); i != active_transactions.end(); ++i) { |
707 | 0 | if (!i->commit_future.valid()) { |
708 | 0 | if (IsReady(i->flush_future)) { |
709 | 0 | auto flush_status = i->flush_future.get().status; |
710 | 0 | if (!flush_status.ok()) { |
711 | 0 | LOG(INFO) << "TXN: " << i->ToString() << ", flush failed: " << flush_status; |
712 | 0 | continue; |
713 | 0 | } |
714 | 0 | ++flushed; |
715 | 0 | LOG(INFO) << "TXN: " << i->ToString() << ", flushed"; |
716 | 0 | if (!i->transaction) { |
717 | 0 | ++written; |
718 | 0 | continue; |
719 | 0 | } |
720 | 0 | i->commit_future = i->transaction->CommitFuture(); |
721 | 0 | } |
722 | 0 | } else if (IsReady(i->commit_future)) { |
723 | 0 | auto commit_status = i->commit_future.get(); |
724 | 0 | if (!commit_status.ok()) { |
725 | 0 | LOG(INFO) << "TXN: " << i->ToString() << ", commit failed: " << commit_status; |
726 | 0 | continue; |
727 | 0 | } |
728 | 0 | LOG(INFO) << "TXN: " << i->ToString() << ", committed"; |
729 | 0 | ++written; |
730 | 0 | continue; |
731 | 0 | } |
732 | | |
733 | 0 | if (w != i) { |
734 | 0 | *w = std::move(*i); |
735 | 0 | } |
736 | 0 | ++w; |
737 | 0 | } |
738 | 0 | active_transactions.erase(w, active_transactions.end()); |
739 | |
|
740 | 0 | std::this_thread::sleep_for(expired ? 1s : 100ms); |
741 | 0 | } |
742 | |
|
743 | 0 | if (options.do_restarts) { |
744 | 0 | restart_thread.join(); |
745 | 0 | } |
746 | |
|
747 | 0 | LOG(INFO) << "Written: " << written << ", flushed: " << flushed << ", tries: " << tries; |
748 | |
|
749 | 0 | ASSERT_GE(written, options.total_keys); |
750 | 0 | ASSERT_GT(flushed, written); |
751 | 0 | ASSERT_GT(flushed, options.active_transactions); |
752 | 0 | ASSERT_GT(tries, flushed); |
753 | 0 | } |
754 | | |
755 | 0 | TEST_F_EX(QLTransactionTest, WriteConflicts, QLTransactionBigLogSegmentSizeTest) { |
756 | 0 | WriteConflictsOptions options = { |
757 | 0 | .do_restarts = false, |
758 | 0 | }; |
759 | 0 | TestWriteConflicts(options); |
760 | 0 | } |
761 | | |
// Same scenario as WriteConflicts, but with do_restarts enabled, so TestWriteConflicts also runs
// (and later joins) its restart thread.
762 | 0 | TEST_F_EX(QLTransactionTest, WriteConflictsWithRestarts, QLTransactionBigLogSegmentSizeTest) { |
763 | 0 | WriteConflictsOptions options = { |
764 | 0 | .do_restarts = true, |
765 | 0 | }; |
766 | 0 | TestWriteConflicts(options); |
767 | 0 | } |
768 | | |
// Runs TestWriteConflicts with non_txn_writes enabled, using a small configuration:
// 3 active transactions and 1 total key, without restarts.
// NOTE(review): non_txn_writes presumably mixes non-transactional writes with transactional
// ones - confirm against TestWriteConflicts.
769 | 0 | TEST_F_EX(QLTransactionTest, MixedWriteConflicts, QLTransactionBigLogSegmentSizeTest) { |
770 | 0 | WriteConflictsOptions options = { |
771 | 0 | .do_restarts = false, |
772 | 0 | .active_transactions = 3, |
773 | 0 | .total_keys = 1, |
774 | 0 | .non_txn_writes = true, |
775 | 0 | }; |
776 | 0 | TestWriteConflicts(options); |
777 | 0 | } |
778 | | |
// Writes and verifies data, then updates and verifies it again, all after calling
// DisableApplyingIntents(), and finally checks that the cluster restarts successfully.
779 | 0 | TEST_F(QLTransactionTest, ResolveIntentsWriteReadUpdateRead) { |
780 | 0 | DisableApplyingIntents(); |
781 | 

// Initial write followed by a read-back check.
782 | 0 | WriteData(); |
783 | 0 | VerifyData(); |
784 | 

// Overwrite the same data with UPDATE operations and verify the updated values.
785 | 0 | WriteData(WriteOpType::UPDATE); |
786 | 0 | VerifyData(1, WriteOpType::UPDATE); |
787 | 

788 | 0 | ASSERT_OK(cluster_->RestartSync()); |
789 | 0 | } |
790 | | |
// Checks that a transaction sees its own uncommitted updates and that, once it is aborted,
// plain reads observe the original values again. The test then waits for transactions and
// intents to be cleaned up and restarts the cluster.
791 | 0 | TEST_F(QLTransactionTest, ResolveIntentsWriteReadWithinTransactionAndRollback) { |
792 | 0 | SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test. |
793 | 0 | DisableApplyingIntents(); |
794 | | |
795 | | // Write { 1 -> 1, 2 -> 2 }. |
796 | 0 | { |
797 | 0 | auto session = CreateSession(); |
798 | 0 | ASSERT_OK(WriteRow(session, 1, 1)); |
799 | 0 | ASSERT_OK(WriteRow(session, 2, 2)); |
800 | 0 | } |
801 | 

802 | 0 | { |
803 | | // Start T1. |
804 | 0 | auto txn = CreateTransaction(); |
805 | 0 | auto session = CreateSession(txn); |
806 | | |
807 | | // T1: Update { 1 -> 11, 2 -> 12 }. |
808 | 0 | ASSERT_OK(UpdateRow(session, 1, 11)); |
809 | 0 | ASSERT_OK(UpdateRow(session, 2, 12)); |
810 | | |
811 | | // T1: Should read { 1 -> 11, 2 -> 12 }. |
812 | 0 | VERIFY_ROW(session, 1, 11); |
813 | 0 | VERIFY_ROW(session, 2, 12); |
814 | | |
815 | | // Need to wait for the transaction to be replicated to all tablet replicas, otherwise direct |
816 | | // intents cleanup could not happen. |
// NOTE(review): the expected count of 6 presumably equals (tablets touched) x (replicas per
// tablet) for this fixture - confirm against the cluster setup.
817 | 0 | ASSERT_OK(WaitFor([this] { |
818 | 0 | return CountRunningTransactions() == 6; |
819 | 0 | }, 10s, "Wait transactions replicated to all tablet replicas")); |
820 | 

821 | 0 | txn->Abort(); |
822 | 0 | } |
823 | 

824 | 0 | ASSERT_OK(WaitTransactionsCleaned()); |
825 | | |
826 | | // Should read { 1 -> 1, 2 -> 2 }, since T1 has been aborted. |
827 | 0 | { |
828 | 0 | auto session = CreateSession(); |
829 | 0 | VERIFY_ROW(session, 1, 1); |
830 | 0 | VERIFY_ROW(session, 2, 2); |
831 | 0 | } |
832 | 

833 | 0 | ASSERT_OK(WaitIntentsCleaned()); |
834 | 

835 | 0 | ASSERT_OK(cluster_->RestartSync()); |
836 | 0 | } |
837 | | |
// Aborts a transaction with proactive cleanup on abort disabled, waits for
// FLAGS_aborted_intent_cleanup_ms, compacts the tablets and then checks that reads return the
// pre-transaction values and that intents eventually get cleaned.
838 | 0 | TEST_F(QLTransactionTest, CheckCompactionAbortCleanup) { |
839 | 0 | SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test. |
840 | 0 | FLAGS_TEST_disable_proactive_txn_cleanup_on_abort = true; |
841 | 0 | FLAGS_aborted_intent_cleanup_ms = 1000; // 1 sec |
842 | | |
843 | | // Write { 1 -> 1, 2 -> 2 }. |
844 | 0 | { |
845 | 0 | auto session = CreateSession(); |
846 | 0 | ASSERT_OK(WriteRow(session, 1, 1)); |
847 | 0 | ASSERT_OK(WriteRow(session, 2, 2)); |
848 | 0 | } |
849 | 

850 | 0 | { |
851 | | // Start T1. |
852 | 0 | auto txn = CreateTransaction(); |
853 | 0 | auto session = CreateSession(txn); |
854 | | |
855 | | // T1: Update { 1 -> 11, 2 -> 12 }. |
856 | 0 | ASSERT_OK(UpdateRow(session, 1, 11)); |
857 | 0 | ASSERT_OK(UpdateRow(session, 2, 12)); |
858 | | |
859 | | // T1: Should read { 1 -> 11, 2 -> 12 }. |
860 | 0 | VERIFY_ROW(session, 1, 11); |
861 | 0 | VERIFY_ROW(session, 2, 12); |
862 | 

863 | 0 | txn->Abort(); |
864 | 0 | } |
865 | 

866 | 0 | ASSERT_OK(WaitTransactionsCleaned()); |
867 | 

// Sleep for the configured cleanup interval before compacting.
// NOTE(review): presumably the compaction only drops intents of transactions aborted at least
// FLAGS_aborted_intent_cleanup_ms ago - confirm against the compaction filter.
868 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); |
869 | 0 | ASSERT_OK(cluster_->CompactTablets()); |
870 | | |
871 | | // Should read { 1 -> 1, 2 -> 2 }, since T1 has been aborted. |
872 | 0 | { |
873 | 0 | auto session = CreateSession(); |
874 | 0 | VERIFY_ROW(session, 1, 1); |
875 | 0 | VERIFY_ROW(session, 2, 2); |
876 | 0 | } |
877 | 

878 | 0 | ASSERT_OK(WaitIntentsCleaned()); |
879 | 

880 | 0 | ASSERT_OK(cluster_->RestartSync()); |
881 | 0 | } |
882 | | |
// Fixture that sets FLAGS_rocksdb_disable_compactions before running the base
// QLTransactionTest::SetUp(), so the flag is already in effect while the base fixture is set up.
883 | | class QLTransactionTestWithDisabledCompactions : public QLTransactionTest { |
884 | | public: |
885 | 1 | void SetUp() override { |
886 | 1 | FLAGS_rocksdb_disable_compactions = true; |
887 | 1 | QLTransactionTest::SetUp(); |
888 | 1 | } |
889 | | }; |
890 | | |
// Writes and aborts kTransactions transactions while compactions are disabled, shuts the cluster
// down, re-enables compactions and restarts. The test passes once the intents DB of the tablet
// peers has read at least kRequiredCompactedBytes during compaction.
891 | 0 | TEST_F_EX(QLTransactionTest, IntentsCleanupAfterRestart, QLTransactionTestWithDisabledCompactions) { |
892 | 0 | SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test. |
893 | 0 | FLAGS_TEST_disable_proactive_txn_cleanup_on_abort = true; |
894 | 0 | FLAGS_aborted_intent_cleanup_ms = 1000; // 1 sec |
895 | 0 | FLAGS_delete_intents_sst_files = false; |
896 | 

// Debug builds (NDEBUG not defined) use fewer transactions than release builds.
897 | 0 | #ifndef NDEBUG |
898 | 0 | constexpr int kTransactions = 10; |
899 | | #else |
900 | | constexpr int kTransactions = 20; |
901 | | #endif |
902 | | // Empirically determined constant. |
903 | 0 | constexpr int kBytesPerRow = 75; |
904 | 0 | constexpr int kRequiredCompactedBytes = kTransactions * kNumRows * kBytesPerRow; |
905 | 

906 | 0 | LOG(INFO) << "Write values"; |
907 | 

// Each transaction writes kNumRows rows with keys unique to that transaction, triggers an
// asynchronous flush and is then aborted.
908 | 0 | for (int i = 0; i != kTransactions; ++i) { |
909 | 0 | SCOPED_TRACE(Format("Transaction $0", i)); |
910 | 0 | auto txn = CreateTransaction(); |
911 | 0 | auto session = CreateSession(txn); |
912 | 0 | for (int row = 0; row != kNumRows; ++row) { |
913 | 0 | ASSERT_OK(WriteRow(session, i * kNumRows + row, row)); |
914 | 0 | } |
915 | 0 | ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kAsync)); |
916 | | |
917 | | // Need some time for flush to be initiated. |
918 | 0 | std::this_thread::sleep_for(100ms); |
919 | 

920 | 0 | txn->Abort(); |
921 | 0 | } |
922 | 

923 | 0 | ASSERT_OK(WaitTransactionsCleaned()); |
924 | 

925 | 0 | LOG(INFO) << "Shutdown cluster"; |
926 | 0 | cluster_->Shutdown(); |
927 | 

// Let the aborted-intent cleanup interval elapse while the cluster is down.
928 | 0 | std::this_thread::sleep_for(FLAGS_aborted_intent_cleanup_ms * 1ms); |
929 | 

// Compactions are enabled again only for the restarted cluster.
930 | 0 | FLAGS_TEST_delay_init_tablet_peer_ms = 100; |
931 | 0 | FLAGS_rocksdb_disable_compactions = false; |
932 | 

933 | 0 | LOG(INFO) << "Start cluster"; |
934 | 0 | ASSERT_OK(cluster_->StartSync()); |
935 | 

// Sum COMPACT_READ_BYTES over the intents DB statistics of every peer that has a tablet and
// wait until the total reaches the required threshold.
936 | 0 | ASSERT_OK(WaitFor([cluster = cluster_.get()] { |
937 | 0 | auto peers = ListTabletPeers(cluster, ListPeersFilter::kAll); |
938 | 0 | int64_t bytes = 0; |
939 | 0 | for (const auto& peer : peers) { |
940 | 0 | if (peer->tablet()) { |
941 | 0 | bytes += |
942 | 0 | peer->tablet()->intentsdb_statistics()->getTickerCount(rocksdb::COMPACT_READ_BYTES); |
943 | 0 | } |
944 | 0 | } |
945 | 0 | LOG(INFO) << "Compact read bytes: " << bytes; |
946 | 

947 | 0 | return bytes >= kRequiredCompactedBytes; |
948 | 0 | }, 10s, "Enough compactions happen")); |
949 | 0 | } |
950 | | |
// Checks that transaction T2 keeps reading the values that existed before T1's updates, both
// before and after T1 commits, while a plain (non-transactional) session started after the commit
// sees T1's values.
951 | 0 | TEST_F(QLTransactionTest, ResolveIntentsWriteReadBeforeAndAfterCommit) { |
952 | 0 | SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test. |
953 | 0 | DisableApplyingIntents(); |
954 | | |
955 | | // Write { 1 -> 1, 2 -> 2 }. |
956 | 0 | { |
957 | 0 | auto session = CreateSession(); |
958 | 0 | ASSERT_OK(WriteRow(session, 1, 1)); |
959 | 0 | ASSERT_OK(WriteRow(session, 2, 2)); |
960 | 0 | } |
961 | | |
962 | | // Start T1. |
963 | 0 | auto txn1 = CreateTransaction(); |
964 | 0 | auto session1 = CreateSession(txn1); |
965 | | |
966 | | // T1: Update { 1 -> 11, 2 -> 12 }. |
967 | 0 | ASSERT_OK(UpdateRow(session1, 1, 11)); |
968 | 0 | ASSERT_OK(UpdateRow(session1, 2, 12)); |
969 | | |
970 | | // Start T2. |
971 | 0 | auto txn2 = CreateTransaction(); |
972 | 0 | auto session2 = CreateSession(txn2); |
973 | | |
974 | | // T2: Should read { 1 -> 1, 2 -> 2 }. |
975 | 0 | VERIFY_ROW(session2, 1, 1); |
976 | 0 | VERIFY_ROW(session2, 2, 2); |
977 | | |
978 | | // T1: Commit |
979 | 0 | CommitAndResetSync(&txn1); |
980 | | |
981 | | // T2: Should still read { 1 -> 1, 2 -> 2 }, because it should read as of the time of its start. |
982 | 0 | VERIFY_ROW(session2, 1, 1); |
983 | 0 | VERIFY_ROW(session2, 2, 2); |
984 | | |
985 | | // Simple read should get { 1 -> 11, 2 -> 12 }, since T1 has been already committed. |
986 | 0 | { |
987 | 0 | auto session = CreateSession(); |
988 | 0 | VERIFY_ROW(session, 1, 11); |
989 | 0 | VERIFY_ROW(session, 2, 12); |
990 | 0 | } |
991 | 

992 | 0 | ASSERT_NO_FATALS(CommitAndResetSync(&txn2)); |
993 | 

994 | 0 | ASSERT_OK(cluster_->RestartSync()); |
995 | 0 | } |
996 | | |
// Checks that a transaction created with SetReadTime::kTrue reads a consistent snapshot even when
// another transaction commits between its two reads: T2 reads key 1, waits for T1's commit
// callback, then reads key 2 and must still see the old value.
997 | 0 | TEST_F(QLTransactionTest, ResolveIntentsCheckConsistency) { |
998 | 0 | SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test. |
999 | 0 | DisableApplyingIntents(); |
1000 | | |
1001 | | // Write { 1 -> 1, 2 -> 2 }. |
1002 | 0 | { |
1003 | 0 | auto session = CreateSession(); |
1004 | 0 | ASSERT_OK(WriteRow(session, 1, 1)); |
1005 | 0 | ASSERT_OK(WriteRow(session, 2, 2)); |
1006 | 0 | } |
1007 | | |
1008 | | // Start T1. |
1009 | 0 | auto txn1 = CreateTransaction(); |
1010 | | |
1011 | | // T1: Update { 1 -> 11, 2 -> 12 }. |
1012 | 0 | { |
1013 | 0 | auto session = CreateSession(txn1); |
1014 | 0 | ASSERT_OK(UpdateRow(session, 1, 11)); |
1015 | 0 | ASSERT_OK(UpdateRow(session, 2, 12)); |
1016 | 0 | } |
1017 | | |
1018 | | // T1: Request commit. |
// The commit is asynchronous: the latch is counted down from the commit callback. If the status
// is not OK, ASSERT_OK returns from the callback before CountDown, so the latch is never released.
1019 | 0 | CountDownLatch commit_latch(1); |
1020 | 0 | txn1->Commit([&commit_latch](const Status& status) { |
1021 | 0 | ASSERT_OK(status); |
1022 | 0 | commit_latch.CountDown(1); |
1023 | 0 | }); |
1024 | | |
1025 | | // Start T2. |
1026 | 0 | auto txn2 = CreateTransaction(SetReadTime::kTrue); |
1027 | | |
1028 | | // T2: Should read { 1 -> 1, 2 -> 2 } even in case T1 is committed between reading k1 and k2. |
1029 | 0 | { |
1030 | 0 | auto session = CreateSession(txn2); |
1031 | 0 | VERIFY_ROW(session, 1, 1); |
1032 | 0 | commit_latch.Wait(); |
1033 | 0 | VERIFY_ROW(session, 2, 2); |
1034 | 0 | } |
1035 | | |
1036 | | // Simple read should get { 1 -> 11, 2 -> 12 }, since T1 has been already committed. |
1037 | 0 | { |
1038 | 0 | auto session = CreateSession(); |
1039 | 0 | VERIFY_ROW(session, 1, 11); |
1040 | 0 | VERIFY_ROW(session, 2, 12); |
1041 | 0 | } |
1042 | 

1043 | 0 | CommitAndResetSync(&txn2); |
1044 | 

1045 | 0 | ASSERT_OK(cluster_->RestartSync()); |
1046 | 0 | } |
1047 | | |
1048 | | // This test launches a write thread that writes an increasing value to a key using transactions. |
1049 | | // Then it launches multiple read threads, each of which tries to read this key and |
1050 | | // verifies that its value is at least the value that was written before the read was started. |
1051 | | // |
1052 | | // It is done for multiple keys sequentially. So those keys are located on different tablets |
1053 | | // and tablet servers, and we test different cases of clock skew. |
// For each of 10 keys, runs one transactional writer and kConcurrentReads readers for up to 10s
// under skewed clocks and delayed transaction status replies. Every read must return a value that
// is not smaller than the last committed value observed before the read was issued.
1054 | 0 | TEST_F_EX(QLTransactionTest, CorrectStatusRequestBatching, QLTransactionBigLogSegmentSizeTest) { |
1055 | 0 | const auto kClockSkew = 100ms; |
1056 | 0 | constexpr auto kMinWrites = RegularBuildVsSanitizers(25, 1); |
1057 | 0 | constexpr auto kMinReads = 10; |
1058 | 0 | constexpr size_t kConcurrentReads = RegularBuildVsSanitizers<size_t>(20, 5); |
1059 | 

// Status replies are delayed by 200000 usec and the allowed clock skew is set to 3 * kClockSkew.
1060 | 0 | FLAGS_TEST_transaction_delay_status_reply_usec_in_tests = 200000; |
1061 | 0 | SetAtomicFlag(std::chrono::microseconds(kClockSkew).count() * 3, &FLAGS_max_clock_skew_usec); |
1062 | 

1063 | 0 | auto delta_changers = SkewClocks(cluster_.get(), kClockSkew); |
1064 | 

1065 | 0 | for (int32_t key = 0; key != 10; ++key) { |
1066 | 0 | std::atomic<bool> stop(false); |
1067 | 0 | std::atomic<int32_t> value(0); |
1068 | 

// Writer: tries to write value + 1 in a fresh transaction; the shared counter is advanced only
// after both the write and the commit succeeded.
1069 | 0 | std::thread write_thread([this, key, &stop, &value] { |
1070 | 0 | CDSAttacher attacher; |
1071 | 0 | auto session = CreateSession(); |
1072 | 0 | while (!stop) { |
1073 | 0 | auto txn = CreateTransaction(); |
1074 | 0 | session->SetTransaction(txn); |
1075 | 0 | auto write_result = WriteRow(session, key, value + 1); |
1076 | 0 | if (write_result.ok()) { |
1077 | 0 | auto status = txn->CommitFuture().get(); |
1078 | 0 | if (status.ok()) { |
1079 | 0 | ++value; |
1080 | 0 | } |
1081 | 0 | } |
1082 | 0 | } |
1083 | 0 | }); |
1084 | 

// Per-reader counters of successful reads, zeroed explicitly before the readers start.
1085 | 0 | std::vector<std::thread> read_threads; |
1086 | 0 | std::array<std::atomic<size_t>, kConcurrentReads> reads; |
1087 | 0 | for (auto& read : reads) { |
1088 | 0 | read.store(0); |
1089 | 0 | } |
1090 | 

// Readers: snapshot the committed counter, read the key without a transaction and require the
// value read to be >= the snapshot. A missing row is treated as value 0.
1091 | 0 | for (size_t i = 0; i != kConcurrentReads; ++i) { |
1092 | 0 | read_threads.emplace_back([this, key, &stop, &value, &read = reads[i]] { |
1093 | 0 | CDSAttacher attacher; |
1094 | 0 | auto session = CreateSession(); |
1095 | 0 | StopOnFailure stop_on_failure(&stop); |
1096 | 0 | while (!stop) { |
1097 | 0 | auto value_before_start = value.load(); |
1098 | 0 | YBqlReadOpPtr op = ReadRow(session, key); |
1099 | 0 | ASSERT_OK(session->Flush()); |
1100 | 0 | ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK) |
1101 | 0 | << op->response().ShortDebugString(); |
1102 | 0 | auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock(); |
1103 | 0 | int32_t current_value; |
1104 | 0 | if (rowblock->row_count() == 0) { |
1105 | 0 | current_value = 0; |
1106 | 0 | } else { |
1107 | 0 | current_value = rowblock->row(0).column(0).int32_value(); |
1108 | 0 | } |
1109 | 0 | ASSERT_GE(current_value, value_before_start); |
1110 | 0 | ++read; |
1111 | 0 | } |
1112 | 0 | stop_on_failure.Success(); |
1113 | 0 | }); |
1114 | 0 | } |
1115 | 

1116 | 0 | WaitStopped(10s, &stop); |
1117 | | |
1118 | | // exchange() returns the previous value: true means the stop flag had already been set before |
// the main thread requested the stop (presumably by StopOnFailure in a reader - confirm).
1119 | 0 | bool failed = stop.exchange(true); |
1120 | 0 | write_thread.join(); |
1121 | 

1122 | 0 | for (auto& thread : read_threads) { |
1123 | 0 | thread.join(); |
1124 | 0 | } |
1125 | 

// Do not continue with further keys after a failure.
1126 | 0 | if (failed) { |
1127 | 0 | break; |
1128 | 0 | } |
1129 | | |
1130 | 0 | LOG(INFO) << "Writes: " << value.load() << ", reads: " << yb::ToString(reads); |
1131 | 

// Sanity check that the writer and every reader made a minimum amount of progress.
1132 | 0 | EXPECT_GE(value.load(), kMinWrites); |
1133 | 0 | for (auto& read : reads) { |
1134 | 0 | EXPECT_GE(read.load(), kMinReads); |
1135 | 0 | } |
1136 | 0 | } |
1137 | 

1138 | 0 | cluster_->Shutdown(); // Need to shutdown cluster before resetting clock back. |
1139 | 0 | cluster_.reset(); |
1140 | 0 | } |
1141 | | |
// Per-transaction bookkeeping used by the StatusEvolution test: the transaction handle, the
// pending futures for its metadata, commit and status requests, and the last observed
// status together with the hybrid time reported for it.
1142 | | struct TransactionState { |
1143 | | YBTransactionPtr transaction; |
1144 | | std::shared_future<Result<TransactionMetadata>> metadata_future; |
1145 | | std::future<Status> commit_future; |
1146 | | std::future<Result<tserver::GetTransactionStatusResponsePB>> status_future; |
1147 | | TransactionMetadata metadata; |
1148 | | HybridTime status_time = HybridTime::kMin; |
1149 | | TransactionStatus last_status = TransactionStatus::PENDING; |
1150 | | |
// Waits for the pending status response and validates the status transition against the
// previously observed status and time:
//   PENDING -> PENDING:     new time >= old time.
//   PENDING -> COMMITTED:   new time >  old time.
//   COMMITTED -> COMMITTED: new time == old time.
// An ABORTED response is accepted only if a commit was already requested; in that case the
// transaction pointer is reset and last_status/status_time are left untouched.
1151 | | void CheckStatus() { |
1152 | | ASSERT_TRUE(status_future.valid()); |
1153 | | ASSERT_EQ(status_future.wait_for(NonTsanVsTsan(3s, 10s)), std::future_status::ready); |
1154 | | auto resp = status_future.get(); |
1155 | | ASSERT_OK(resp); |
1156 | | |
// Exactly one transaction id was requested, so exactly one status/time pair is expected.
1157 | | ASSERT_EQ(1, resp->status().size()); |
1158 | | ASSERT_EQ(1, resp->status_hybrid_time().size()); |
1159 | | |
1160 | | if (resp->status(0) == TransactionStatus::ABORTED) { |
1161 | | ASSERT_TRUE(commit_future.valid()); |
1162 | | transaction = nullptr; |
1163 | | return; |
1164 | | } |
1165 | | |
1166 | | auto new_time = HybridTime(resp->status_hybrid_time()[0]); |
1167 | | if (last_status == TransactionStatus::PENDING) { |
1168 | | if (resp->status(0) == TransactionStatus::PENDING) { |
1169 | | ASSERT_GE(new_time, status_time); |
1170 | | } else { |
1171 | | ASSERT_EQ(TransactionStatus::COMMITTED, resp->status(0)); |
1172 | | ASSERT_GT(new_time, status_time); |
1173 | | } |
1174 | | } else { |
1175 | | ASSERT_EQ(last_status, TransactionStatus::COMMITTED); |
1176 | | ASSERT_EQ(resp->status(0), TransactionStatus::COMMITTED) |
1177 | | << "Bad transaction status: " << TransactionStatus_Name(resp->status(0)); |
1178 | | ASSERT_EQ(status_time, new_time); |
1179 | | } |
1180 | | status_time = new_time; |
1181 | | last_status = resp->status(0); |
1182 | | } |
1183 | | }; |
1184 | | |
1185 | | // Test transaction status evolution. |
1186 | | // The following should happen: |
1187 | | // If both previous and new transaction state are PENDING, then the new time of status is >= the |
1188 | | // old time of status. |
1189 | | // Previous - PENDING, new - COMMITTED, new_time > old_time. |
1190 | | // Previous - COMMITTED, new - COMMITTED, new_time == old_time. |
1191 | | // All other cases are invalid. |
// Each loop iteration may create a transaction, may request a commit for one live transaction,
// then sends a status request for every live transaction and validates the responses via
// TransactionState::CheckStatus(). The loop ends when no transactions remain to be created and
// no transaction is still tracked as active.
1192 | 0 | TEST_F(QLTransactionTest, StatusEvolution) { |
1193 | | // We don't care about exact probability of create/commit operations. |
1194 | | // Just create rate should be higher than commit one. |
1195 | 0 | const int kTransactionCreateChance = 10; |
1196 | 0 | const int kTransactionCommitChance = 20; |
1197 | 0 | size_t transactions_to_create = 10; |
1198 | 0 | size_t active_transactions = 0; |
1199 | 0 | std::vector<TransactionState> states; |
1200 | 0 | rpc::Rpcs rpcs; |
1201 | 0 | states.reserve(transactions_to_create); |
1202 | 

1203 | 0 | while (transactions_to_create || active_transactions) { |
// Always create a transaction when none is active; otherwise create one by chance.
1204 | 0 | if (transactions_to_create && |
1205 | 0 | (!active_transactions || RandomWithChance(kTransactionCreateChance))) { |
1206 | 0 | LOG(INFO) << "Create transaction"; |
1207 | 0 | auto txn = CreateTransaction(); |
1208 | 0 | { |
1209 | 0 | auto session = CreateSession(txn); |
1210 | | // Insert using different keys to avoid conflicts. |
1211 | 0 | int idx = narrow_cast<int>(states.size()); |
1212 | 0 | ASSERT_OK(WriteRow(session, idx, idx)); |
1213 | 0 | } |
1214 | 0 | states.push_back({ txn, txn->GetMetadata() }); |
1215 | 0 | ++active_transactions; |
1216 | 0 | --transactions_to_create; |
1217 | 0 | } |
// By chance, request a commit for the idx-th state (1-based) that still has a transaction.
1218 | 0 | if (active_transactions && RandomWithChance(kTransactionCommitChance)) { |
1219 | 0 | LOG(INFO) << "Destroy transaction"; |
1220 | 0 | size_t idx = RandomUniformInt<size_t>(1, active_transactions); |
1221 | 0 | for (auto& state : states) { |
1222 | 0 | if (!state.transaction) { |
1223 | 0 | continue; |
1224 | 0 | } |
1225 | 0 | if (!--idx) { |
1226 | 0 | state.commit_future = state.transaction->CommitFuture(); |
1227 | 0 | break; |
1228 | 0 | } |
1229 | 0 | } |
1230 | 0 | } |
1231 | 

// Issue a status request for every state that still has a transaction.
1232 | 0 | for (auto& state : states) { |
1233 | 0 | if (!state.transaction) { |
1234 | 0 | continue; |
1235 | 0 | } |
// NON_TRANSACTIONAL isolation is used here as the marker for "metadata not fetched yet"
// (presumably the default of TransactionMetadata - confirm); skip until the future is ready.
1236 | 0 | if (state.metadata.isolation == IsolationLevel::NON_TRANSACTIONAL) { |
1237 | 0 | if (!IsReady(state.metadata_future)) { |
1238 | 0 | continue; |
1239 | 0 | } |
1240 | 0 | state.metadata = ASSERT_RESULT(Copy(state.metadata_future.get())); |
1241 | 0 | } |
1242 | 0 | tserver::GetTransactionStatusRequestPB req; |
1243 | 0 | req.set_tablet_id(state.metadata.status_tablet); |
1244 | 0 | req.add_transaction_id()->assign( |
1245 | 0 | pointer_cast<const char*>(state.metadata.transaction_id.data()), |
1246 | 0 | state.metadata.transaction_id.size()); |
1247 | 0 | state.status_future = rpc::WrapRpcFuture<tserver::GetTransactionStatusResponsePB>( |
1248 | 0 | GetTransactionStatus, &rpcs)( |
1249 | 0 | TransactionRpcDeadline(), nullptr /* tablet */, client_.get(), &req); |
1250 | 0 | } |
// Validate the responses; CheckStatus() resets state.transaction on ABORTED, which is how a
// transaction stops being counted as active.
1251 | 0 | for (auto& state : states) { |
1252 | 0 | if (!state.transaction) { |
1253 | 0 | continue; |
1254 | 0 | } |
1255 | 0 | state.CheckStatus(); |
1256 | 0 | if (!state.transaction) { |
1257 | 0 | --active_transactions; |
1258 | 0 | } |
1259 | 0 | } |
1260 | 0 | } |
1261 | 

// Every commit request issued above must complete within the timeout.
1262 | 0 | for (auto& state : states) { |
1263 | 0 | ASSERT_EQ(state.commit_future.wait_for(NonTsanVsTsan(3s, 15s)), std::future_status::ready); |
1264 | 0 | } |
1265 | 0 | } |
1266 | | |
1267 | | // Writing multiple keys concurrently, each key is increasing by 1 at each step. |
1268 | | // At the same time concurrently execute several transactions that read all those keys. |
1269 | | // Suppose two transactions have read values t1_i and t2_i respectively. |
1270 | | // And t1_j > t2_j for some j, then we expect that t1_i >= t2_i for all i. |
1271 | | // |
1272 | | // Suppose we have 2 transactions, both reading k1 (from tablet1) and k2 (from tablet2). |
1273 | | // ht1 - read time of first transaction, and ht2 - read time of second transaction. |
1274 | | // Suppose ht1 <= ht2 for simplicity. |
1275 | | // Old value of k1 is v1before, and after ht_k1 it has v1after. |
1276 | | // Old value of k2 is v2before, and after ht_k2 it has v2after. |
1277 | | // ht_k1 <= ht1, ht_k2 <= ht1. |
1278 | | // |
1279 | | // Suppose following sequence of read requests: |
1280 | | // 1) The read request for the first transaction arrives at tablet1 when it has safe read |
1281 | | // time < ht1. But it is already replicating k1 (with ht_k1). Then it would read v1before for k1. |
1282 | | // 2) The read request for the second transaction arrives at tablet2 when it has safe read |
1283 | | // time < ht2. But it is already replicating k2 (with ht_k2). So it reads v2before for k2. |
1284 | | // 3) The remaining read requests arrive after the appropriate operations have replicated. |
1285 | | // So we get v2after in the first transaction and v1after for the second. |
1286 | | // The read result for the first transaction is (v1before, v2after), for the second it is |
1287 | | // (v1after, v2before). |
1288 | | // |
1289 | | // Such read is inconsistent. |
1290 | | // |
1291 | | // This test addresses this issue. |
// kWriteThreads writer threads keep incrementing their own key without transactions. In each of
// kCycles cycles, kConcurrentReads transactions read all keys concurrently; after sorting the
// per-transaction result vectors, every vector must be element-wise >= the previous one, i.e.
// the snapshots read by the transactions must be totally ordered.
1292 | 0 | TEST_F_EX(QLTransactionTest, WaitRead, QLTransactionBigLogSegmentSizeTest) { |
1293 | 0 | constexpr int kWriteThreads = 10; |
1294 | 0 | constexpr size_t kCycles = 100; |
1295 | 0 | constexpr size_t kConcurrentReads = 4; |
1296 | 

1297 | 0 | SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test. |
1298 | 

1299 | 0 | std::atomic<bool> stop(false); |
1300 | 0 | std::vector<std::thread> threads; |
1301 | 

// Writers: thread i writes an ever increasing value to key i until asked to stop.
1302 | 0 | for (int i = 0; i != kWriteThreads; ++i) { |
1303 | 0 | threads.emplace_back([this, i, &stop] { |
1304 | 0 | CDSAttacher attacher; |
1305 | 0 | auto session = CreateSession(); |
1306 | 0 | int32_t value = 0; |
1307 | 0 | while (!stop) { |
1308 | 0 | ASSERT_OK(WriteRow(session, i, ++value)); |
1309 | 0 | } |
1310 | 0 | }); |
1311 | 0 | } |
1312 | 

1313 | 0 | CountDownLatch latch(kConcurrentReads); |
1314 | 

1315 | 0 | std::vector<std::vector<YBqlReadOpPtr>> reads(kConcurrentReads); |
1316 | 0 | std::vector<std::shared_future<Status>> futures(kConcurrentReads); |
1317 | | // values[i] contains values read by i-th transaction. |
1318 | 0 | std::vector<std::vector<int32_t>> values(kConcurrentReads); |
1319 | 

1320 | 0 | for (size_t i = 0; i != kCycles; ++i) { |
1321 | 0 | latch.Reset(kConcurrentReads); |
1322 | 0 | for (size_t j = 0; j != kConcurrentReads; ++j) { |
// Drop the read operations of the previous cycle. Without this, stale operations accumulate in
// reads[j] and their old results get mixed into the consistency check of the current cycle.
1323 | 0 | reads[j].clear(); |
1324 | 0 | auto session = CreateSession(CreateTransaction()); |
1325 | 0 | for (int key = 0; key != kWriteThreads; ++key) { |
1326 | 0 | reads[j].push_back(ReadRow(session, key)); |
1327 | 0 | } |
1328 | 0 | session->FlushAsync([&latch](FlushStatus* flush_status) { |
1329 | 0 | ASSERT_OK(flush_status->status); |
1330 | 0 | latch.CountDown(); |
1331 | 0 | }); |
1332 | 0 | } |
// Wait until all asynchronous flushes of this cycle have completed.
1333 | 0 | latch.Wait(); |
// Collect the values read by each transaction; a missing row counts as 0.
1334 | 0 | for (size_t j = 0; j != kConcurrentReads; ++j) { |
1335 | 0 | values[j].clear(); |
1336 | 0 | for (auto& op : reads[j]) { |
1337 | 0 | ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK) |
1338 | 0 | << op->response().ShortDebugString(); |
1339 | 0 | auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock(); |
1340 | 0 | if (rowblock->row_count() == 1) { |
1341 | 0 | values[j].push_back(rowblock->row(0).column(0).int32_value()); |
1342 | 0 | } else { |
1343 | 0 | values[j].push_back(0); |
1344 | 0 | } |
1345 | 0 | } |
1346 | 0 | } |
// After a lexicographic sort, consistent snapshots must be ordered in every component.
1347 | 0 | std::sort(values.begin(), values.end()); |
1348 | 0 | for (size_t j = 1; j != kConcurrentReads; ++j) { |
1349 | 0 | for (size_t k = 0; k != values[j].size(); ++k) { |
1350 | 0 | ASSERT_GE(values[j][k], values[j - 1][k]); |
1351 | 0 | } |
1352 | 0 | } |
1353 | 0 | } |
1354 | 

1355 | 0 | stop = true; |
1356 | 0 | for (auto& thread : threads) { |
1357 | 0 | thread.join(); |
1358 | 0 | } |
1359 | 0 | } |
1360 | | |
// Inserts and then deletes the same key within one transaction, commits it, and checks that a
// subsequent non-transactional select of that key does not succeed.
1361 | 0 | TEST_F(QLTransactionTest, InsertDelete) { |
1362 | 0 | DisableApplyingIntents(); |
1363 | 

1364 | 0 | auto txn = CreateTransaction(); |
1365 | 0 | auto session = CreateSession(txn); |
1366 | 0 | ASSERT_OK(WriteRow(session, 1 /* key */, 10 /* value */, WriteOpType::INSERT)); |
1367 | 0 | ASSERT_OK(DeleteRow(session, 1 /* key */)); |
1368 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
1369 | 

// Read back outside of the transaction; the row must be gone.
1370 | 0 | session = CreateSession(); |
1371 | 0 | auto row = SelectRow(session, 1 /* key */); |
1372 | 0 | ASSERT_FALSE(row.ok()) << "Row: " << row; |
1373 | 0 | } |
1374 | | |
// Inserts kKeys rows, updates all of them inside one transaction, restarts the cluster while the
// transaction is still open, then deletes all rows in the same transaction and commits. Finally
// verifies that none of the keys can be selected any more.
1375 | 0 | TEST_F(QLTransactionTest, InsertDeleteWithClusterRestart) { |
1376 | 0 | DisableApplyingIntents(); |
1377 | 0 | DisableTransactionTimeout(); |
1378 | 0 | constexpr int kKeys = 100; |
1379 | 

// Initial non-transactional inserts, one fresh session per row.
1380 | 0 | for (int i = 0; i != kKeys; ++i) { |
1381 | 0 | ASSERT_OK(WriteRow(CreateSession(), i /* key */, i * 2 /* value */, WriteOpType::INSERT)); |
1382 | 0 | } |
1383 | 

1384 | 0 | auto txn = CreateTransaction(); |
1385 | 0 | auto session = CreateSession(txn); |
1386 | 0 | for (int i = 0; i != kKeys; ++i) { |
1387 | 0 | SCOPED_TRACE(Format("Key: $0", i)); |
1388 | 0 | ASSERT_OK(WriteRow(session, i /* key */, i * 3 /* value */, WriteOpType::UPDATE)); |
1389 | 0 | } |
1390 | 

1391 | 0 | std::this_thread::sleep_for(1s); // Wait some time for intents to populate. |
1392 | 0 | ASSERT_OK(cluster_->RestartSync()); |
1393 | 

// Continue the same transaction after the restart.
1394 | 0 | for (int i = 0; i != kKeys; ++i) { |
1395 | 0 | SCOPED_TRACE(Format("Key: $0", i)); |
1396 | 0 | ASSERT_OK(DeleteRow(session, i /* key */)); |
1397 | 0 | } |
1398 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
1399 | 

// Every deleted key must be absent, so select key i rather than a fixed key.
1400 | 0 | session = CreateSession(); |
1401 | 0 | for (int i = 0; i != kKeys; ++i) { |
1402 | 0 | SCOPED_TRACE(Format("Key: $0", i)); |
1403 | 0 | auto row = SelectRow(session, i /* key */); |
1404 | 0 | ASSERT_FALSE(row.ok()) << "Row: " << row; |
1405 | 0 | } |
1406 | 0 | } |
1407 | | |
// Runs kThreads writer threads that continuously commit transactions while the main thread
// repeatedly asks tablet leaders that currently coordinate transactions to step down. Commits may
// fail only with an Expired status, and such failures must stay within 5% of the successes.
1408 | 0 | TEST_F_EX(QLTransactionTest, ChangeLeader, QLTransactionBigLogSegmentSizeTest) { |
1409 | 0 | constexpr size_t kThreads = 2; |
1410 | 0 | constexpr auto kTestTime = 5s; |
1411 | 

1412 | 0 | DisableTransactionTimeout(); |
1413 | 0 | FLAGS_transaction_rpc_timeout_ms = MonoDelta(1min).ToMilliseconds(); |
1414 | 

1415 | 0 | std::vector<std::thread> threads; |
1416 | 0 | std::atomic<bool> stopped{false}; |
1417 | 0 | std::atomic<int> successes{0}; |
1418 | 0 | std::atomic<int> expirations{0}; |
// Thread i uses indexes i, i + kThreads, i + 2 * kThreads, ... so threads never share an index.
1419 | 0 | for (size_t i = 0; i != kThreads; ++i) { |
1420 | 0 | threads.emplace_back([this, i, &stopped, &successes, &expirations] { |
1421 | 0 | CDSAttacher attacher; |
1422 | 0 | size_t idx = i; |
1423 | 0 | while (!stopped) { |
1424 | 0 | auto txn = CreateTransaction(); |
1425 | 0 | ASSERT_OK(WriteRows(CreateSession(txn), idx, WriteOpType::INSERT)); |
1426 | 0 | auto status = txn->CommitFuture().get(); |
1427 | 0 | if (status.ok()) { |
1428 | 0 | ++successes; |
1429 | 0 | } else { |
1430 | | // We allow expiration on commit, because it means that the commit succeeded after a leader |
1431 | | // change and we just did not receive the response. But the rate of such cases should be small. |
1432 | 0 | ASSERT_TRUE(status.IsExpired()) << status; |
1433 | 0 | ++expirations; |
1434 | 0 | } |
1435 | 0 | idx += kThreads; |
1436 | 0 | } |
1437 | 0 | }); |
1438 | 0 | } |
1439 | 

// Until kTestTime has elapsed: every 3 seconds, step down each peer that is not NOT_LEADER and
// whose transaction coordinator reports a non-zero transaction count.
1440 | 0 | auto test_finish = std::chrono::steady_clock::now() + kTestTime; |
1441 | 0 | while (std::chrono::steady_clock::now() < test_finish) { |
1442 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
1443 | 0 | auto peers = cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers(); |
1444 | 0 | for (const auto& peer : peers) { |
1445 | 0 | if (peer->consensus() && |
1446 | 0 | peer->consensus()->GetLeaderStatus() != |
1447 | 0 | consensus::LeaderStatus::NOT_LEADER && |
1448 | 0 | peer->tablet()->transaction_coordinator() && |
1449 | 0 | peer->tablet()->transaction_coordinator()->test_count_transactions()) { |
1450 | 0 | consensus::LeaderStepDownRequestPB req; |
1451 | 0 | req.set_tablet_id(peer->tablet_id()); |
1452 | 0 | consensus::LeaderStepDownResponsePB resp; |
1453 | 0 | ASSERT_OK(peer->consensus()->StepDown(&req, &resp)); |
1454 | 0 | } |
1455 | 0 | } |
1456 | 0 | } |
1457 | 0 | std::this_thread::sleep_for(3s); |
1458 | 0 | } |
1459 | 0 | stopped = true; |
1460 | 

1461 | 0 | for (auto& thread : threads) { |
1462 | 0 | thread.join(); |
1463 | 0 | } |
1464 | | |
1465 | | // Allow expirations to be 5% of successful commits. |
1466 | 0 | ASSERT_LE(expirations.load() * 100, successes * 5); |
1467 | 0 | } |
1468 | | |
// Fixture for the RemoteBootstrap test: before the base SetUp() runs, it sets the remote bootstrap
// chunk size to 1_KB and the minimum log retention to 1 second.
1469 | | class RemoteBootstrapTest : public QLTransactionTest { |
1470 | | protected: |
1471 | 1 | void SetUp() override { |
1472 | 1 | FLAGS_remote_bootstrap_max_chunk_size = 1_KB; |
1473 | 1 | FLAGS_log_min_seconds_to_retain = 1; |
1474 | 1 | QLTransactionTest::SetUp(); |
1475 | 1 | } |
1476 | | }; |
1477 | | |
1478 | | // Check that we do correct remote bootstrap for intents db. |
1479 | | // Workflow is the following: |
1480 | | // Shutdown TServer with index 0. |
1481 | | // Write some data to two remaining servers. |
1482 | | // Flush data and clean logs. |
1483 | | // Restart cluster. |
1484 | | // Verify that all tablets at all tservers are up and running. |
1485 | | // Verify that all tservers have same amount of running tablets. |
1486 | | // During test tear down cluster verifier will check that all servers have same data. |
// Writes data while tablet server 0 is down, flushes and cleans the logs on the running servers,
// then restarts all servers and waits until all tablets are running.
1487 | 0 | TEST_F_EX(QLTransactionTest, RemoteBootstrap, RemoteBootstrapTest) { |
1488 | 0 | constexpr size_t kNumWrites = 10; |
1489 | 0 | constexpr size_t kTransactionalWrites = 8; |
1490 | 0 | constexpr size_t kNumRows = 30; |
1491 | 

1492 | 0 | DisableTransactionTimeout(); |
1493 | 0 | DisableApplyingIntents(); |
1494 | 

1495 | 0 | cluster_->mini_tablet_server(0)->Shutdown(); |
1496 | 

// The first kTransactionalWrites batches are written in transactions; the remaining batches use
// a session without a transaction.
1497 | 0 | for (size_t i = 0; i != kNumWrites; ++i) { |
1498 | 0 | auto transaction = i < kTransactionalWrites ? CreateTransaction() : nullptr; |
1499 | 0 | auto session = CreateSession(transaction); |
1500 | 0 | for (size_t r = 0; r != kNumRows; ++r) { |
1501 | 0 | ASSERT_OK(WriteRow( |
1502 | 0 | session, |
1503 | 0 | KeyForTransactionAndIndex(i, r), |
1504 | 0 | ValueForTransactionAndIndex(i, r, WriteOpType::INSERT))); |
1505 | 0 | } |
1506 | 0 | if (transaction) { |
1507 | 0 | ASSERT_OK(transaction->CommitFuture().get()); |
1508 | 0 | } |
1509 | 0 | } |
1510 | 

1511 | 0 | VerifyData(kNumWrites); |
1512 | | |
1513 | | // Wait until all tablets done writing to db. |
1514 | 0 | std::this_thread::sleep_for(5s); |
1515 | 

1516 | 0 | LOG(INFO) << "Flushing"; |
1517 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
1518 | 

1519 | 0 | LOG(INFO) << "Clean logs"; |
1520 | 0 | ASSERT_OK(cluster_->CleanTabletLogs()); |
1521 | | |
1522 | | // Wait logs cleanup. |
1523 | 0 | std::this_thread::sleep_for(5s * kTimeMultiplier); |
1524 | | |
1525 | | // Shutdown to reset cached logs. |
// Server 0 is already down, so only servers 1..N-1 are shut down here.
1526 | 0 | for (size_t i = 1; i != cluster_->num_tablet_servers(); ++i) { |
1527 | 0 | cluster_->mini_tablet_server(i)->Shutdown(); |
1528 | 0 | } |
1529 | | |
1530 | | // Start all servers. Cluster verifier should check that all tablets are synchronized. |
1531 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
1532 | 0 | ASSERT_OK(cluster_->mini_tablet_server(i)->Start()); |
1533 | 0 | } |
1534 | 

1535 | 0 | ASSERT_OK(WaitFor([this] { return CheckAllTabletsRunning(); }, 20s * kTimeMultiplier, |
1536 | 0 | "All tablets running")); |
1537 | 0 | } |
1538 | | |
// Writes data, synchronously flushes only the intents (FlushFlags::kIntents), and restarts the
// cluster with FLAGS_flush_rocksdb_on_shutdown disabled. The data must verify both before and
// after the restart.
1539 | 0 | TEST_F(QLTransactionTest, FlushIntents) { |
1540 | 0 | FLAGS_flush_rocksdb_on_shutdown = false; |
1541 | 

// One WriteData() call plus one batch written through a session without a transaction.
1542 | 0 | WriteData(); |
1543 | 0 | ASSERT_OK(WriteRows(CreateSession(), 1)); |
1544 | 

1545 | 0 | VerifyData(2); |
1546 | 

1547 | 0 | ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kSync, tablet::FlushFlags::kIntents)); |
1548 | 0 | cluster_->Shutdown(); |
1549 | 0 | ASSERT_OK(cluster_->StartSync()); |
1550 | 

1551 | 0 | VerifyData(2); |
1552 | 0 | } |
1553 | | |
1554 | | // This test checks that read restart never happen during first read request to single table. |
1555 | 0 | TEST_F_EX(QLTransactionTest, PickReadTimeAtServer, QLTransactionBigLogSegmentSizeTest) { |
1556 | 0 | constexpr int kKeys = 10; |
1557 | 0 | constexpr int kThreads = 5; |
1558 | |
|
1559 | 0 | std::atomic<bool> stop(false); |
1560 | 0 | std::vector<std::thread> threads; |
1561 | 0 | while (threads.size() != kThreads) { |
1562 | 0 | threads.emplace_back([this, &stop] { |
1563 | 0 | CDSAttacher attacher; |
1564 | 0 | StopOnFailure stop_on_failure(&stop); |
1565 | 0 | while (!stop.load(std::memory_order_acquire)) { |
1566 | 0 | auto txn = CreateTransaction(); |
1567 | 0 | auto session = CreateSession(txn); |
1568 | 0 | auto key = RandomUniformInt(1, kKeys); |
1569 | 0 | auto value_result = SelectRow(session, key); |
1570 | 0 | int value; |
1571 | 0 | if (value_result.ok()) { |
1572 | 0 | value = *value_result; |
1573 | 0 | } else { |
1574 | 0 | ASSERT_TRUE(value_result.status().IsNotFound()) << value_result.status(); |
1575 | 0 | value = 0; |
1576 | 0 | } |
1577 | 0 | auto status = ResultToStatus(WriteRow(session, key, value)); |
1578 | 0 | if (status.ok()) { |
1579 | 0 | status = txn->CommitFuture().get(); |
1580 | 0 | } |
1581 | | // Write or commit could fail because of conflict during write or transaction conflict |
1582 | | // during commit. |
1583 | 0 | ASSERT_TRUE(status.ok() || status.IsTryAgain() || status.IsExpired()) << status; |
1584 | 0 | } |
1585 | 0 | stop_on_failure.Success(); |
1586 | 0 | }); |
1587 | 0 | } |
1588 | |
|
1589 | 0 | WaitStopped(30s, &stop); |
1590 | |
|
1591 | 0 | stop.store(true, std::memory_order_release); |
1592 | |
|
1593 | 0 | for (auto& thread : threads) { |
1594 | 0 | thread.join(); |
1595 | 0 | } |
1596 | 0 | } |
1597 | | |
1598 | | // Test that we could init transaction after it was originally created. |
1599 | 0 | TEST_F(QLTransactionTest, DelayedInit) { |
1600 | 0 | SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test. |
1601 | |
|
1602 | 0 | auto txn1 = std::make_shared<YBTransaction>(transaction_manager_.get_ptr()); |
1603 | 0 | auto txn2 = std::make_shared<YBTransaction>(transaction_manager_.get_ptr()); |
1604 | |
|
1605 | 0 | auto write_session = CreateSession(); |
1606 | 0 | ASSERT_OK(WriteRow(write_session, 0, 0)); |
1607 | |
|
1608 | 0 | ConsistentReadPoint read_point(transaction_manager_->clock()); |
1609 | 0 | read_point.SetCurrentReadTime(); |
1610 | |
|
1611 | 0 | ASSERT_OK(WriteRow(write_session, 1, 1)); |
1612 | |
|
1613 | 0 | ASSERT_OK(txn1->Init(IsolationLevel::SNAPSHOT_ISOLATION, read_point.GetReadTime())); |
1614 | | // To check delayed init we specify read time here. |
1615 | 0 | ASSERT_OK(txn2->Init( |
1616 | 0 | IsolationLevel::SNAPSHOT_ISOLATION, |
1617 | 0 | ReadHybridTime::FromHybridTimeRange(transaction_manager_->clock()->NowRange()))); |
1618 | |
|
1619 | 0 | ASSERT_OK(WriteRow(write_session, 2, 2)); |
1620 | |
|
1621 | 0 | { |
1622 | 0 | auto read_session = CreateSession(txn1); |
1623 | 0 | auto row0 = ASSERT_RESULT(SelectRow(read_session, 0)); |
1624 | 0 | ASSERT_EQ(0, row0); |
1625 | 0 | auto row1 = SelectRow(read_session, 1); |
1626 | 0 | ASSERT_TRUE(!row1.ok() && row1.status().IsNotFound()) << row1; |
1627 | 0 | auto row2 = SelectRow(read_session, 2); |
1628 | 0 | ASSERT_TRUE(!row2.ok() && row2.status().IsNotFound()) << row2; |
1629 | 0 | } |
1630 | | |
1631 | 0 | { |
1632 | 0 | auto read_session = CreateSession(txn2); |
1633 | 0 | auto row0 = ASSERT_RESULT(SelectRow(read_session, 0)); |
1634 | 0 | ASSERT_EQ(0, row0); |
1635 | 0 | auto row1 = ASSERT_RESULT(SelectRow(read_session, 1)); |
1636 | 0 | ASSERT_EQ(1, row1); |
1637 | 0 | auto row2 = SelectRow(read_session, 2); |
1638 | 0 | ASSERT_TRUE(!row2.ok() && row2.status().IsNotFound()) << row2; |
1639 | 0 | } |
1640 | 0 | } |
1641 | | |
1642 | | class QLTransactionTestSingleTablet : |
1643 | | public TransactionCustomLogSegmentSizeTest<4_KB, QLTransactionTest> { |
1644 | | public: |
1645 | 0 | int NumTablets() override { |
1646 | 0 | return 1; |
1647 | 0 | } |
1648 | | }; |
1649 | | |
1650 | 0 | TEST_F_EX(QLTransactionTest, DeleteFlushedIntents, QLTransactionTestSingleTablet) { |
1651 | 0 | constexpr int kNumWrites = 10; |
1652 | |
|
1653 | 0 | auto session = CreateSession(); |
1654 | 0 | for (size_t idx = 0; idx != kNumWrites; ++idx) { |
1655 | 0 | auto txn = CreateTransaction(); |
1656 | 0 | session->SetTransaction(txn); |
1657 | 0 | ASSERT_OK(WriteRows(session, idx, WriteOpType::INSERT)); |
1658 | 0 | ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kSync, tablet::FlushFlags::kIntents)); |
1659 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
1660 | 0 | } |
1661 | |
|
1662 | 0 | ASSERT_OK(WaitFor([this] { |
1663 | 0 | if (CountIntents(cluster_.get()) != 0) { |
1664 | 0 | return false; |
1665 | 0 | } |
1666 | |
|
1667 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll); |
1668 | 0 | size_t total_sst_files = 0; |
1669 | 0 | for (auto& peer : peers) { |
1670 | 0 | auto intents_db = peer->tablet()->TEST_intents_db(); |
1671 | 0 | if (!intents_db) { |
1672 | 0 | continue; |
1673 | 0 | } |
1674 | 0 | std::vector<rocksdb::LiveFileMetaData> files; |
1675 | 0 | intents_db->GetLiveFilesMetaData(&files); |
1676 | 0 | LOG(INFO) << "T " << peer->tablet_id() << " P " << peer->permanent_uuid() << ": files: " |
1677 | 0 | << AsString(files); |
1678 | 0 | total_sst_files += files.size(); |
1679 | 0 | } |
1680 | |
|
1681 | 0 | return total_sst_files == 0; |
1682 | 0 | }, 15s, "Intents and files are removed")); |
1683 | 0 | } |
1684 | | |
1685 | | // Test performs transactional writes to get flushed intents. |
1686 | | // Then performs non transactional writes and checks that log size stabilizes, meaning |
1687 | | // log gc is working. |
1688 | 0 | TEST_F_EX(QLTransactionTest, GCLogsAfterTransactionalWritesStop, QLTransactionTestSingleTablet) { |
1689 | | // An amount of time during which we require log size to be stable. |
1690 | 0 | const MonoDelta kStableTimePeriod = 10s; |
1691 | 0 | const MonoDelta kTimeout = 30s + kStableTimePeriod; |
1692 | |
|
1693 | 0 | LOG(INFO) << "Perform transactional writes, to get non empty intents db"; |
1694 | 0 | TestThreadHolder thread_holder; |
1695 | 0 | std::atomic<bool> use_transaction(true); |
1696 | | // This thread first does transactional writes and then switches to doing non-transactional |
1697 | | // writes. |
1698 | 0 | thread_holder.AddThreadFunctor([this, &use_transaction, &stop = thread_holder.stop_flag()] { |
1699 | 0 | SetFlagOnExit set_flag_on_exit(&stop); |
1700 | 0 | auto session = CreateSession(); |
1701 | 0 | int txn_idx = 0; |
1702 | 0 | while (!stop.load(std::memory_order_acquire)) { |
1703 | 0 | YBTransactionPtr write_txn = use_transaction.load(std::memory_order_acquire) |
1704 | 0 | ? CreateTransaction() : nullptr; |
1705 | 0 | session->SetTransaction(write_txn); |
1706 | 0 | ASSERT_OK(WriteRows(session, txn_idx++)); |
1707 | 0 | if (write_txn) { |
1708 | 0 | ASSERT_OK(write_txn->CommitFuture().get()); |
1709 | 0 | } |
1710 | 0 | } |
1711 | 0 | }); |
1712 | | // Waiting for some intent SSTables to be flushed. |
1713 | 0 | bool has_flushed_intents_db = false; |
1714 | 0 | while (!has_flushed_intents_db && !thread_holder.stop_flag().load(std::memory_order_acquire)) { |
1715 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
1716 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders); |
1717 | 0 | for (const auto& peer : peers) { |
1718 | 0 | auto* tablet = peer->tablet(); |
1719 | 0 | if (!tablet) { |
1720 | 0 | continue; |
1721 | 0 | } |
1722 | 0 | auto persistent_op_id = ASSERT_RESULT(tablet->MaxPersistentOpId()); |
1723 | 0 | if (persistent_op_id.intents.index > 10) { |
1724 | 0 | has_flushed_intents_db = true; |
1725 | 0 | break; |
1726 | 0 | } |
1727 | 0 | } |
1728 | 0 | std::this_thread::sleep_for(10ms); |
1729 | 0 | } |
1730 | | |
1731 | | // We are expecting the log size to stay bounded, which means the maximum log size we've ever |
1732 | | // seen for any tablet should stabilize. That would indicate that the bug with unbounded log |
1733 | | // growth (https://github.com/YugaByte/yugabyte-db/issues/2221) is not happening. |
1734 | 0 | LOG(INFO) << "Perform non transactional writes"; |
1735 | |
|
1736 | 0 | use_transaction.store(false, std::memory_order_release); |
1737 | 0 | uint64_t max_log_size = 0; |
1738 | 0 | auto last_log_size_increment = CoarseMonoClock::now(); |
1739 | 0 | auto deadline = last_log_size_increment + kTimeout; |
1740 | 0 | while (!thread_holder.stop_flag().load(std::memory_order_acquire)) { |
1741 | 0 | auto now = CoarseMonoClock::now(); |
1742 | 0 | ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kSync, tablet::FlushFlags::kRegular)); |
1743 | 0 | ASSERT_OK(cluster_->CleanTabletLogs()); |
1744 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders); |
1745 | 0 | for (const auto& peer : peers) { |
1746 | 0 | auto* log = peer->log(); |
1747 | 0 | if (!log) { |
1748 | 0 | continue; |
1749 | 0 | } |
1750 | 0 | uint64_t current_log_size = log->OnDiskSize(); |
1751 | 0 | if (current_log_size > max_log_size) { |
1752 | 0 | LOG(INFO) << Format("T $1 P $0: Log size increased: $2", peer->permanent_uuid(), |
1753 | 0 | peer->tablet_id(), current_log_size); |
1754 | 0 | last_log_size_increment = now; |
1755 | 0 | max_log_size = current_log_size; |
1756 | 0 | } |
1757 | 0 | } |
1758 | 0 | if (now - last_log_size_increment > kStableTimePeriod) { |
1759 | 0 | break; |
1760 | 0 | } else { |
1761 | 0 | ASSERT_LE(last_log_size_increment + kStableTimePeriod, deadline) |
1762 | 0 | << "Log size would not stabilize in " << kTimeout; |
1763 | 0 | } |
1764 | 0 | std::this_thread::sleep_for(100ms); |
1765 | 0 | } |
1766 | |
|
1767 | 0 | thread_holder.Stop(); |
1768 | 0 | } |
1769 | | |
1770 | 0 | TEST_F(QLTransactionTest, DeleteTableDuringWrite) { |
1771 | 0 | DisableApplyingIntents(); |
1772 | 0 | ASSERT_NO_FATALS(WriteData()); |
1773 | 0 | ASSERT_OK(client_->DeleteTable(table_.table()->id())); |
1774 | 0 | SetIgnoreApplyingProbability(0.0); |
1775 | 0 | ASSERT_OK(WaitFor([this] { |
1776 | 0 | return !HasTransactions(); |
1777 | 0 | }, 10s * kTimeMultiplier, "Cleanup transactions from coordinator")); |
1778 | 0 | } |
1779 | | |
1780 | | } // namespace client |
1781 | | } // namespace yb |