/Users/deen/code/yugabyte-db/src/yb/client/snapshot-txn-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <atomic> |
15 | | #include <memory> |
16 | | #include <mutex> |
17 | | #include <string> |
18 | | #include <utility> |
19 | | #include <vector> |
20 | | |
21 | | #include <boost/optional/optional_fwd.hpp> |
22 | | |
23 | | #include "yb/client/session.h" |
24 | | #include "yb/client/table.h" |
25 | | #include "yb/client/transaction.h" |
26 | | #include "yb/client/transaction_pool.h" |
27 | | #include "yb/client/txn-test-base.h" |
28 | | #include "yb/client/yb_op.h" |
29 | | |
30 | | #include "yb/common/entity_ids_types.h" |
31 | | #include "yb/common/ql_value.h" |
32 | | |
33 | | #include "yb/consensus/consensus.h" |
34 | | #include "yb/consensus/consensus.pb.h" |
35 | | |
36 | | #include "yb/docdb/consensus_frontier.h" |
37 | | |
38 | | #include "yb/gutil/casts.h" |
39 | | |
40 | | #include "yb/rocksdb/db.h" |
41 | | |
42 | | #include "yb/tablet/tablet.h" |
43 | | #include "yb/tablet/tablet_peer.h" |
44 | | #include "yb/tablet/transaction_participant.h" |
45 | | |
46 | | #include "yb/tserver/mini_tablet_server.h" |
47 | | #include "yb/tserver/tablet_server.h" |
48 | | |
49 | | #include "yb/util/debug/long_operation_tracker.h" |
50 | | #include "yb/util/enums.h" |
51 | | #include "yb/util/lockfree.h" |
52 | | #include "yb/util/opid.h" |
53 | | #include "yb/util/random_util.h" |
54 | | #include "yb/util/result.h" |
55 | | #include "yb/util/scope_exit.h" |
56 | | #include "yb/util/test_thread_holder.h" |
57 | | #include "yb/util/tsan_util.h" |
58 | | |
59 | | #include "yb/yql/cql/ql/util/errcodes.h" |
60 | | #include "yb/yql/cql/ql/util/statement_result.h" |
61 | | |
using namespace std::literals;

// Flags defined in other translation units that tests in this file read or override.
DECLARE_bool(TEST_disallow_lmp_failures);
DECLARE_bool(fail_on_out_of_range_clock_skew);
DECLARE_bool(ycql_consistent_transactional_paging);
DECLARE_int32(TEST_inject_load_transaction_delay_ms);
DECLARE_int32(TEST_inject_mvcc_delay_add_leader_pending_ms);
DECLARE_int32(TEST_inject_status_resolver_delay_ms);
DECLARE_int32(log_min_seconds_to_retain);
DECLARE_int32(txn_max_apply_batch_records);
DECLARE_int64(transaction_rpc_timeout_ms);
DECLARE_uint64(max_clock_skew_usec);
DECLARE_uint64(max_transactions_in_status_request);
DECLARE_uint64(clock_skew_force_crash_bound_usec);

// Plain global (not a gflag) defined outside this file; BankAccountsDelayCreate sets it to 0.5.
// NOTE(review): presumably the probability of artificially delaying transaction creation, going by
// its name — confirm at the definition.
extern double TEST_delay_create_transaction_probability;
78 | | |
79 | | namespace yb { |
80 | | namespace client { |
81 | | |
82 | | YB_DEFINE_ENUM(BankAccountsOption, |
83 | | (kTimeStrobe) // Perform time stobe during test. |
84 | | (kStepDown) // Perform leader step downs during test. |
85 | | (kTimeJump) // Perform time jumps during test. |
86 | | (kNetworkPartition) // Partition network during test. |
87 | | (kNoSelectRead)) // Don't use select-read for updating balance, i.e. use only |
88 | | // "update set balance = balance + delta". |
89 | | typedef EnumBitSet<BankAccountsOption> BankAccountsOptions; |
90 | | |
91 | | class SnapshotTxnTest |
92 | | : public TransactionCustomLogSegmentSizeTest<0, TransactionTestBase<MiniCluster>> { |
93 | | protected: |
94 | 15 | void SetUp() override { |
95 | 15 | SetIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION); |
96 | 15 | TransactionTestBase::SetUp(); |
97 | 15 | } |
98 | | |
99 | | void TestBankAccounts(BankAccountsOptions options, CoarseDuration duration, |
100 | | int minimal_updates_per_second, double select_update_probability = 0.5); |
101 | | void TestBankAccountsThread( |
102 | | int accounts, double select_update_probability, std::atomic<bool>* stop, |
103 | | std::atomic<int64_t>* updates, TransactionPool* pool); |
104 | | void TestRemoteBootstrap(); |
105 | | void TestMultiWriteWithRestart(); |
106 | | }; |
107 | | |
108 | 0 | bool TransactionalFailure(const Status& status) { |
109 | 0 | return status.IsTryAgain() || status.IsExpired() || status.IsNotFound() || status.IsTimedOut(); |
110 | 0 | } |
111 | | |
112 | | void SnapshotTxnTest::TestBankAccountsThread( |
113 | | int accounts, double select_update_probability, std::atomic<bool>* stop, |
114 | 0 | std::atomic<int64_t>* updates, TransactionPool* pool) { |
115 | 0 | auto session = CreateSession(); |
116 | 0 | YBTransactionPtr txn; |
117 | 0 | int32_t key1 = 0, key2 = 0; |
118 | 0 | while (!stop->load(std::memory_order_acquire)) { |
119 | 0 | if (!txn) { |
120 | 0 | key1 = RandomUniformInt(1, accounts); |
121 | 0 | key2 = RandomUniformInt(1, accounts - 1); |
122 | 0 | if (key2 >= key1) { |
123 | 0 | ++key2; |
124 | 0 | } |
125 | 0 | txn = ASSERT_RESULT(pool->TakeAndInit(GetIsolationLevel())); |
126 | 0 | } |
127 | 0 | session->SetTransaction(txn); |
128 | 0 | int transfer = RandomUniformInt(1, 250); |
129 | |
|
130 | 0 | auto txn_id = txn->id(); |
131 | 0 | LOG(INFO) << txn_id << " transferring (" << key1 << ") => (" << key2 << "), delta: " |
132 | 0 | << transfer; |
133 | |
|
134 | 0 | Status status; |
135 | 0 | if (RandomActWithProbability(select_update_probability)) { |
136 | 0 | auto result = SelectRow(session, key1); |
137 | 0 | int32_t balance1 = -1; |
138 | 0 | if (result.ok()) { |
139 | 0 | balance1 = *result; |
140 | 0 | result = SelectRow(session, key2); |
141 | 0 | } |
142 | 0 | if (!result.ok()) { |
143 | 0 | if (txn->IsRestartRequired()) { |
144 | 0 | ASSERT_TRUE(result.status().IsQLError()) << result; |
145 | 0 | auto txn_result = pool->TakeRestarted(txn); |
146 | 0 | if (!txn_result.ok()) { |
147 | 0 | ASSERT_TRUE(txn_result.status().IsIllegalState()) << txn_result.status(); |
148 | 0 | txn = nullptr; |
149 | 0 | } else { |
150 | 0 | txn = *txn_result; |
151 | 0 | } |
152 | 0 | continue; |
153 | 0 | } |
154 | 0 | if (result.status().IsTimedOut() || result.status().IsQLError()) { |
155 | 0 | txn = nullptr; |
156 | 0 | continue; |
157 | 0 | } |
158 | 0 | ASSERT_TRUE(result.ok()) |
159 | 0 | << Format("$0, TXN: $0, key1: $1, key2: $2", result.status(), txn->id(), key1, key2); |
160 | 0 | } |
161 | 0 | auto balance2 = *result; |
162 | 0 | status = ResultToStatus(WriteRow(session, key1, balance1 - transfer)); |
163 | 0 | if (status.ok()) { |
164 | 0 | status = ResultToStatus(WriteRow(session, key2, balance2 + transfer)); |
165 | 0 | } |
166 | 0 | } else { |
167 | 0 | status = ResultToStatus(kv_table_test::Increment( |
168 | 0 | &table_, session, key1, -transfer, Flush::kTrue)); |
169 | 0 | if (status.ok()) { |
170 | 0 | status = ResultToStatus(kv_table_test::Increment( |
171 | 0 | &table_, session, key2, transfer, Flush::kTrue)); |
172 | 0 | } |
173 | 0 | } |
174 | |
|
175 | 0 | if (status.ok()) { |
176 | 0 | status = txn->CommitFuture().get(); |
177 | 0 | } |
178 | 0 | txn = nullptr; |
179 | 0 | if (status.ok()) { |
180 | 0 | LOG(INFO) << txn_id << " transferred (" << key1 << ") => (" << key2 << "), delta: " |
181 | 0 | << transfer; |
182 | 0 | updates->fetch_add(1); |
183 | 0 | } else { |
184 | 0 | ASSERT_TRUE( |
185 | 0 | status.IsTryAgain() || status.IsExpired() || status.IsNotFound() || status.IsTimedOut() || |
186 | 0 | ql::QLError(status) == ql::ErrorCode::RESTART_REQUIRED) << status; |
187 | 0 | } |
188 | 0 | } |
189 | 0 | } |
190 | | |
191 | 0 | std::thread RandomClockSkewWalkThread(MiniCluster* cluster, std::atomic<bool>* stop) { |
192 | | // Clock skew is modified by a random amount every 100ms. |
193 | 0 | return std::thread([cluster, stop] { |
194 | 0 | const server::SkewedClock::DeltaTime upperbound = |
195 | 0 | std::chrono::microseconds(GetAtomicFlag(&FLAGS_max_clock_skew_usec)) / 2; |
196 | 0 | const auto lowerbound = -upperbound; |
197 | 0 | while (!stop->load(std::memory_order_acquire)) { |
198 | 0 | auto num_servers = cluster->num_tablet_servers(); |
199 | 0 | std::vector<server::SkewedClock::DeltaTime> time_deltas(num_servers); |
200 | |
|
201 | 0 | for (size_t i = 0; i != num_servers; ++i) { |
202 | 0 | auto* tserver = cluster->mini_tablet_server(i)->server(); |
203 | 0 | auto* hybrid_clock = down_cast<server::HybridClock*>(tserver->clock()); |
204 | 0 | auto skewed_clock = |
205 | 0 | std::static_pointer_cast<server::SkewedClock>(hybrid_clock->physical_clock()); |
206 | 0 | auto shift = RandomUniformInt(-10, 10); |
207 | 0 | std::chrono::milliseconds change(1 << std::abs(shift)); |
208 | 0 | if (shift < 0) { |
209 | 0 | change = -change; |
210 | 0 | } |
211 | |
|
212 | 0 | time_deltas[i] += change; |
213 | 0 | time_deltas[i] = std::max(std::min(time_deltas[i], upperbound), lowerbound); |
214 | 0 | LOG(INFO) << "Set delta " << i << ": " << time_deltas[i].count(); |
215 | 0 | skewed_clock->SetDelta(time_deltas[i]); |
216 | |
|
217 | 0 | std::this_thread::sleep_for(100ms); |
218 | 0 | } |
219 | 0 | } |
220 | 0 | }); |
221 | 0 | } |
222 | | |
223 | 0 | std::thread StrobeThread(MiniCluster* cluster, std::atomic<bool>* stop) { |
224 | | // When strobe time is enabled we greatly change time delta for a short amount of time, |
225 | | // then change it back to 0. |
226 | 0 | return std::thread([cluster, stop] { |
227 | 0 | int iteration = 0; |
228 | 0 | while (!stop->load(std::memory_order_acquire)) { |
229 | 0 | for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) { |
230 | 0 | auto* tserver = cluster->mini_tablet_server(i)->server(); |
231 | 0 | auto* hybrid_clock = down_cast<server::HybridClock*>(tserver->clock()); |
232 | 0 | auto skewed_clock = |
233 | 0 | std::static_pointer_cast<server::SkewedClock>(hybrid_clock->physical_clock()); |
234 | 0 | server::SkewedClock::DeltaTime time_delta; |
235 | 0 | if (iteration & 1) { |
236 | 0 | time_delta = server::SkewedClock::DeltaTime(); |
237 | 0 | } else { |
238 | 0 | auto shift = RandomUniformInt(-16, 16); |
239 | 0 | time_delta = std::chrono::microseconds(1 << (12 + std::abs(shift))); |
240 | 0 | if (shift < 0) { |
241 | 0 | time_delta = -time_delta; |
242 | 0 | } |
243 | 0 | } |
244 | 0 | skewed_clock->SetDelta(time_delta); |
245 | 0 | std::this_thread::sleep_for(15ms); |
246 | 0 | } |
247 | 0 | } |
248 | 0 | }); |
249 | 0 | } |
250 | | |
// Runs the bank accounts workload for `duration`.
//
// kAccounts accounts are first seeded with kInitialAmount each. Then kThreads
// TestBankAccountsThread workers transfer money between random pairs of accounts, while this
// thread repeatedly reads all rows in a transaction and asserts that the total balance is
// unchanged.
//
// `options` enables extra disruptions:
//   - kTimeStrobe: StrobeThread.
//   - kNetworkPartition: repeated tserver isolation.
//   - kTimeJump: a one-off clock delta on tserver 0.
//   - kStepDown: a random tablet step down after each verification read.
//
// On exit, asserts that more than minimal_updates_per_second * (duration in seconds) transfers
// committed. `select_update_probability` is forwarded to the workers.
void SnapshotTxnTest::TestBankAccounts(
    BankAccountsOptions options, CoarseDuration duration, int minimal_updates_per_second,
    double select_update_probability) {
  TransactionPool pool(transaction_manager_.get_ptr(), nullptr /* metric_entity */);
  const int kAccounts = 20;
  const int kThreads = 5;
  const int kInitialAmount = 10000;

  {
    // Seed every account with kInitialAmount in a single transaction.
    auto txn = ASSERT_RESULT(pool.TakeAndInit(GetIsolationLevel()));
    LOG(INFO) << "Initial write transaction: " << txn->id();
    auto init_session = CreateSession(txn);
    for (int i = 1; i <= kAccounts; ++i) {
      ASSERT_OK(WriteRow(init_session, i, kInitialAmount));
    }
    ASSERT_OK(txn->CommitFuture().get());
  }

  TestThreadHolder threads;
  if (options.Test(BankAccountsOption::kTimeStrobe)) {
    threads.AddThread(StrobeThread(cluster_.get(), &threads.stop_flag()));
  }

  if (options.Test(BankAccountsOption::kNetworkPartition)) {
    // Repeatedly cut a random tserver off from all others for 10s, then reconnect it for 30s.
    // `stop` is only checked between full off/on cycles, so stopping may wait up to ~40s, but
    // connectivity is always restored first.
    threads.AddThreadFunctor([cluster = cluster_.get(), &stop = threads.stop_flag()]() {
      auto num_tservers = cluster->num_tablet_servers();
      while (!stop.load(std::memory_order_acquire)) {
        auto partitioned = RandomUniformInt<size_t>(0, num_tservers - 1);
        for (auto connectivity : {Connectivity::kOff, Connectivity::kOn}) {
          for (size_t i = 0; i != num_tservers; ++i) {
            if (i == partitioned) {
              continue;
            }
            ASSERT_OK(SetupConnectivity(cluster, i, partitioned, connectivity));
          }
          std::this_thread::sleep_for(connectivity == Connectivity::kOff ? 10s : 30s);
        }
      }
    });
  }

  std::atomic<int64_t> updates(0);
  // Declared after `threads` and `updates`, so it runs before either is destroyed. It stops all
  // helper threads, then checks that enough transfers committed.
  auto se = ScopeExit(
      [&threads, &updates, duration, minimal_updates_per_second] {
    threads.Stop();

    LOG(INFO) << "Total updates: " << updates.load(std::memory_order_acquire);
    ASSERT_GT(updates.load(std::memory_order_acquire),
              minimal_updates_per_second * duration / 1s);
  });

  for (int i = 0; i != kThreads; ++i) {
    threads.AddThreadFunctor(std::bind(
        &SnapshotTxnTest::TestBankAccountsThread, this, kAccounts, select_update_probability,
        &threads.stop_flag(), &updates, &pool));
  }

  auto end_time = CoarseMonoClock::now() + duration;

  if (options.Test(BankAccountsOption::kTimeJump)) {
    // Set tablet server 0's clock delta to `duration` for one second, then restore the old delta.
    auto* tserver = cluster_->mini_tablet_server(0)->server();
    auto* hybrid_clock = down_cast<server::HybridClock*>(tserver->clock());
    auto skewed_clock =
        std::static_pointer_cast<server::SkewedClock>(hybrid_clock->physical_clock());
    auto old_delta = skewed_clock->SetDelta(duration);
    std::this_thread::sleep_for(1s);
    skewed_clock->SetDelta(old_delta);
  }

  // Verifier: until end_time (or until the thread holder's stop flag is set), read all rows in a
  // transaction and assert that the total balance is conserved.
  auto session = CreateSession();
  YBTransactionPtr txn;
  while (CoarseMonoClock::now() < end_time &&
         !threads.stop_flag().load(std::memory_order_acquire)) {
    if (!txn) {
      txn = ASSERT_RESULT(pool.TakeAndInit(GetIsolationLevel()));
    }
    auto txn_id = txn->id();
    session->SetTransaction(txn);
    auto rows = SelectAllRows(session);
    if (!rows.ok()) {
      // On read restart, retry with the restarted transaction if the pool provides one.
      // On any other failure, start over with a new transaction.
      if (txn->IsRestartRequired()) {
        auto txn_result = pool.TakeRestarted(txn);
        if (!txn_result.ok()) {
          ASSERT_TRUE(txn_result.status().IsIllegalState()) << txn_result.status();
          txn = nullptr;
        } else {
          txn = *txn_result;
        }
      } else {
        txn = nullptr;
      }
      continue;
    }
    txn = nullptr;
    int sum_balance = 0;
    for (const auto& pair : *rows) {
      sum_balance += pair.second;
    }
    LOG(INFO) << txn_id << ", read done, values: " << AsString(*rows);
    ASSERT_EQ(sum_balance, kAccounts * kInitialAmount);

    if (options.Test(BankAccountsOption::kStepDown)) {
      StepDownRandomTablet(cluster_.get());
    }
  }
}
357 | | |
358 | 0 | TEST_F(SnapshotTxnTest, BankAccounts) { |
359 | 0 | FLAGS_TEST_disallow_lmp_failures = true; |
360 | 0 | TestBankAccounts({}, 30s, RegularBuildVsSanitizers(10, 1) /* minimal_updates_per_second */); |
361 | 0 | } |
362 | | |
363 | 0 | TEST_F(SnapshotTxnTest, BankAccountsPartitioned) { |
364 | 0 | TestBankAccounts( |
365 | 0 | BankAccountsOptions{BankAccountsOption::kNetworkPartition}, 150s, |
366 | 0 | RegularBuildVsSanitizers(10, 1) /* minimal_updates_per_second */); |
367 | 0 | } |
368 | | |
369 | 0 | TEST_F(SnapshotTxnTest, BankAccountsWithTimeStrobe) { |
370 | 0 | FLAGS_fail_on_out_of_range_clock_skew = false; |
371 | 0 | FLAGS_clock_skew_force_crash_bound_usec = 0; |
372 | |
|
373 | 0 | TestBankAccounts( |
374 | 0 | BankAccountsOptions{BankAccountsOption::kTimeStrobe}, 300s, |
375 | 0 | RegularBuildVsSanitizers(10, 1) /* minimal_updates_per_second */); |
376 | 0 | } |
377 | | |
378 | 0 | TEST_F(SnapshotTxnTest, BankAccountsWithTimeJump) { |
379 | 0 | FLAGS_fail_on_out_of_range_clock_skew = false; |
380 | |
|
381 | 0 | TestBankAccounts( |
382 | 0 | BankAccountsOptions{BankAccountsOption::kTimeJump, BankAccountsOption::kStepDown}, 30s, |
383 | 0 | RegularBuildVsSanitizers(3, 1) /* minimal_updates_per_second */); |
384 | 0 | } |
385 | | |
386 | 0 | TEST_F(SnapshotTxnTest, BankAccountsDelayCreate) { |
387 | 0 | FLAGS_transaction_rpc_timeout_ms = 500 * kTimeMultiplier; |
388 | 0 | TEST_delay_create_transaction_probability = 0.5; |
389 | |
|
390 | 0 | TestBankAccounts({}, 30s, RegularBuildVsSanitizers(10, 1) /* minimal_updates_per_second */, |
391 | 0 | 0.0 /* select_update_probability */); |
392 | 0 | } |
393 | | |
394 | 0 | TEST_F(SnapshotTxnTest, BankAccountsDelayAddLeaderPending) { |
395 | 0 | FLAGS_TEST_disallow_lmp_failures = true; |
396 | 0 | FLAGS_TEST_inject_mvcc_delay_add_leader_pending_ms = 20; |
397 | 0 | TestBankAccounts({}, 30s, RegularBuildVsSanitizers(5, 1) /* minimal_updates_per_second */); |
398 | 0 | } |
399 | | |
400 | | struct PagingReadCounts { |
401 | | int good = 0; |
402 | | int failed = 0; |
403 | | int inconsistent = 0; |
404 | | int timed_out = 0; |
405 | | |
406 | 0 | std::string ToString() const { |
407 | 0 | return Format("{ good: $0 failed: $1 inconsistent: $2 timed_out: $3 }", |
408 | 0 | good, failed, inconsistent, timed_out); |
409 | 0 | } |
410 | | |
411 | 0 | PagingReadCounts& operator+=(const PagingReadCounts& rhs) { |
412 | 0 | good += rhs.good; |
413 | 0 | failed += rhs.failed; |
414 | 0 | inconsistent += rhs.inconsistent; |
415 | 0 | timed_out += rhs.timed_out; |
416 | 0 | return *this; |
417 | 0 | } |
418 | | }; |
419 | | |
420 | | class SingleTabletSnapshotTxnTest : public SnapshotTxnTest { |
421 | | protected: |
422 | 0 | int NumTablets() { |
423 | 0 | return 1; |
424 | 0 | } |
425 | | |
426 | | Result<PagingReadCounts> TestPaging(); |
427 | | }; |
428 | | |
429 | | // Test reading from a transactional table using paging. |
430 | | // Writes values in one thread, and reads them using paging in another thread. |
431 | | // |
432 | | // Clock skew is randomized, so we expect failures because of that. |
433 | | // When ycql_consistent_transactional_paging is true we expect read restart failures. |
434 | | // And we expect missing values when ycql_consistent_transactional_paging is false. |
435 | 0 | Result<PagingReadCounts> SingleTabletSnapshotTxnTest::TestPaging() { |
436 | 0 | constexpr int kReadThreads = 4; |
437 | 0 | constexpr int kWriteThreads = 4; |
438 | | |
439 | | // Writer with index j writes keys starting with j * kWriterMul + 1 |
440 | 0 | constexpr int kWriterMul = 100000; |
441 | |
|
442 | 0 | std::array<std::atomic<int32_t>, kWriteThreads> last_written_values; |
443 | 0 | for (auto& value : last_written_values) { |
444 | 0 | value.store(0, std::memory_order_release); |
445 | 0 | } |
446 | |
|
447 | 0 | TestThreadHolder thread_holder; |
448 | |
|
449 | 0 | for (int j = 0; j != kWriteThreads; ++j) { |
450 | 0 | thread_holder.AddThreadFunctor( |
451 | 0 | [this, j, &stop = thread_holder.stop_flag(), &last_written = last_written_values[j]] { |
452 | 0 | auto session = CreateSession(); |
453 | 0 | int i = 1; |
454 | 0 | int base = j * kWriterMul; |
455 | 0 | while (!stop.load(std::memory_order_acquire)) { |
456 | 0 | auto txn = CreateTransaction2(); |
457 | 0 | session->SetTransaction(txn); |
458 | 0 | ASSERT_OK(WriteRow(session, base + i, -(base + i))); |
459 | 0 | auto commit_status = txn->CommitFuture().get(); |
460 | 0 | if (!commit_status.ok()) { |
461 | | // That could happen because of time jumps. |
462 | 0 | ASSERT_TRUE(commit_status.IsExpired()) << commit_status; |
463 | 0 | continue; |
464 | 0 | } |
465 | 0 | last_written.store(i, std::memory_order_release); |
466 | 0 | ++i; |
467 | 0 | } |
468 | 0 | }); |
469 | 0 | } |
470 | |
|
471 | 0 | thread_holder.AddThread(RandomClockSkewWalkThread(cluster_.get(), &thread_holder.stop_flag())); |
472 | |
|
473 | 0 | std::vector<PagingReadCounts> per_thread_counts(kReadThreads); |
474 | |
|
475 | 0 | for (int i = 0; i != kReadThreads; ++i) { |
476 | 0 | thread_holder.AddThreadFunctor([ |
477 | 0 | this, &stop = thread_holder.stop_flag(), &last_written_values, |
478 | 0 | &counts = per_thread_counts[i]] { |
479 | 0 | auto session = CreateSession(nullptr /* transaction */, clock_); |
480 | 0 | while (!stop.load(std::memory_order_acquire)) { |
481 | 0 | std::vector<int32_t> keys; |
482 | 0 | QLPagingStatePB paging_state; |
483 | 0 | std::array<int32_t, kWriteThreads> written_value; |
484 | 0 | int32_t total_values = 0; |
485 | 0 | for (int j = 0; j != kWriteThreads; ++j) { |
486 | 0 | written_value[j] = last_written_values[j].load(std::memory_order_acquire); |
487 | 0 | total_values += written_value[j]; |
488 | 0 | } |
489 | 0 | bool failed = false; |
490 | 0 | session->SetReadPoint(client::Restart::kFalse); |
491 | 0 | session->SetForceConsistentRead(ForceConsistentRead::kFalse); |
492 | |
|
493 | 0 | for (;;) { |
494 | 0 | const YBqlReadOpPtr op = table_.NewReadOp(); |
495 | 0 | auto* const req = op->mutable_request(); |
496 | 0 | table_.AddColumns(table_.AllColumnNames(), req); |
497 | 0 | req->set_limit(total_values / 2 + 10); |
498 | 0 | req->set_return_paging_state(true); |
499 | 0 | if (paging_state.has_table_id()) { |
500 | 0 | if (paging_state.has_read_time()) { |
501 | 0 | ReadHybridTime read_time = ReadHybridTime::FromPB(paging_state.read_time()); |
502 | 0 | if (read_time) { |
503 | 0 | session->SetReadPoint(read_time); |
504 | 0 | } |
505 | 0 | } |
506 | 0 | session->SetForceConsistentRead(ForceConsistentRead::kTrue); |
507 | 0 | *req->mutable_paging_state() = std::move(paging_state); |
508 | 0 | } |
509 | 0 | auto flush_status = session->ApplyAndFlush(op); |
510 | |
|
511 | 0 | if (!flush_status.ok() || !op->succeeded()) { |
512 | 0 | if (flush_status.IsTimedOut()) { |
513 | 0 | ++counts.timed_out; |
514 | 0 | } else { |
515 | 0 | ++counts.failed; |
516 | 0 | } |
517 | 0 | failed = true; |
518 | 0 | break; |
519 | 0 | } |
520 | | |
521 | 0 | auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock(); |
522 | 0 | for (const auto& row : rowblock->rows()) { |
523 | 0 | auto key = row.column(0).int32_value(); |
524 | 0 | ASSERT_EQ(key, -row.column(1).int32_value()); |
525 | 0 | keys.push_back(key); |
526 | 0 | } |
527 | 0 | if (!op->response().has_paging_state()) { |
528 | 0 | break; |
529 | 0 | } |
530 | 0 | paging_state = op->response().paging_state(); |
531 | 0 | } |
532 | |
|
533 | 0 | if (failed) { |
534 | 0 | continue; |
535 | 0 | } |
536 | | |
537 | 0 | std::sort(keys.begin(), keys.end()); |
538 | | |
539 | | // Check that there are no duplicates. |
540 | 0 | ASSERT_TRUE(std::unique(keys.begin(), keys.end()) == keys.end()); |
541 | |
|
542 | 0 | bool good = true; |
543 | 0 | size_t idx = 0; |
544 | 0 | for (int j = 0; j != kWriteThreads; ++j) { |
545 | | // If current writer did not write anything, then check is done. |
546 | 0 | if (written_value[j] == 0) { |
547 | 0 | continue; |
548 | 0 | } |
549 | | |
550 | | // Writer with index j writes the following keys: |
551 | | // j * kWriteMul + 1, j * kWriteMul + 2, ..., j * kWriteMul + written_value[j] |
552 | 0 | int32_t base = j * kWriterMul; |
553 | | // Find first key related to the current writer. |
554 | 0 | while (idx < keys.size() && keys[idx] < base) { |
555 | 0 | ++idx; |
556 | 0 | } |
557 | | // Since we sorted keys and removed duplicates we could just check first and last |
558 | | // entry of interval for current writer. |
559 | 0 | size_t last_idx = idx + written_value[j] - 1; |
560 | 0 | if (keys[idx] != base + 1 || last_idx >= keys.size() || |
561 | 0 | keys[last_idx] != base + written_value[j]) { |
562 | 0 | LOG(INFO) << "Inconsistency, written values: " << yb::ToString(written_value) |
563 | 0 | << ", keys: " << yb::ToString(keys); |
564 | 0 | good = false; |
565 | 0 | break; |
566 | 0 | } |
567 | 0 | idx = last_idx + 1; |
568 | 0 | } |
569 | 0 | if (good) { |
570 | 0 | ++counts.good; |
571 | 0 | } else { |
572 | 0 | ++counts.inconsistent; |
573 | 0 | } |
574 | 0 | } |
575 | 0 | }); |
576 | 0 | } |
577 | |
|
578 | 0 | thread_holder.WaitAndStop(120s); |
579 | |
|
580 | 0 | int32_t total_values = 0; |
581 | 0 | for (auto& value : last_written_values) { |
582 | 0 | total_values += value.load(std::memory_order_acquire); |
583 | 0 | } |
584 | |
|
585 | 0 | EXPECT_GE(total_values, RegularBuildVsSanitizers(1000, 100)); |
586 | |
|
587 | 0 | PagingReadCounts counts; |
588 | |
|
589 | 0 | for(const auto& entry : per_thread_counts) { |
590 | 0 | counts += entry; |
591 | 0 | } |
592 | |
|
593 | 0 | LOG(INFO) << "Read counts: " << counts.ToString(); |
594 | 0 | return counts; |
595 | 0 | } |
596 | | |
597 | | constexpr auto kExpectedMinCount = RegularBuildVsSanitizers(20, 1); |
598 | | |
599 | 0 | TEST_F_EX(SnapshotTxnTest, Paging, SingleTabletSnapshotTxnTest) { |
600 | 0 | FLAGS_ycql_consistent_transactional_paging = true; |
601 | |
|
602 | 0 | auto counts = ASSERT_RESULT(TestPaging()); |
603 | |
|
604 | 0 | EXPECT_GE(counts.good, kExpectedMinCount); |
605 | 0 | EXPECT_GE(counts.failed, kExpectedMinCount); |
606 | 0 | EXPECT_EQ(counts.inconsistent, 0); |
607 | 0 | } |
608 | | |
609 | 0 | TEST_F_EX(SnapshotTxnTest, InconsistentPaging, SingleTabletSnapshotTxnTest) { |
610 | 0 | FLAGS_ycql_consistent_transactional_paging = false; |
611 | |
|
612 | 0 | auto counts = ASSERT_RESULT(TestPaging()); |
613 | |
|
614 | 0 | EXPECT_GE(counts.good, kExpectedMinCount); |
615 | | // We need high operation rate to catch inconsistency, so doing this check only in release mode. |
616 | 0 | if (!IsSanitizer()) { |
617 | 0 | EXPECT_GE(counts.inconsistent, 1); |
618 | 0 | } |
619 | 0 | EXPECT_EQ(counts.failed, 0); |
620 | 0 | } |
621 | | |
622 | 0 | TEST_F(SnapshotTxnTest, HotRow) { |
623 | 0 | constexpr int kBlockSize = RegularBuildVsSanitizers(1000, 100); |
624 | 0 | constexpr int kNumBlocks = 10; |
625 | 0 | constexpr int kIterations = kBlockSize * kNumBlocks; |
626 | 0 | constexpr int kKey = 42; |
627 | |
|
628 | 0 | MonoDelta block_time; |
629 | 0 | TransactionPool pool(transaction_manager_.get_ptr(), nullptr /* metric_entity */); |
630 | 0 | auto session = CreateSession(); |
631 | 0 | MonoTime start = MonoTime::Now(); |
632 | 0 | for (int i = 1; i <= kIterations; ++i) { |
633 | 0 | auto txn = ASSERT_RESULT(pool.TakeAndInit(GetIsolationLevel())); |
634 | 0 | session->SetTransaction(txn); |
635 | |
|
636 | 0 | ASSERT_OK(kv_table_test::Increment(&table_, session, kKey)); |
637 | 0 | ASSERT_OK(session->Flush()); |
638 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
639 | 0 | if (i % kBlockSize == 0) { |
640 | 0 | auto now = MonoTime::Now(); |
641 | 0 | auto passed = now - start; |
642 | 0 | start = now; |
643 | |
|
644 | 0 | LOG(INFO) << "Written: " << i << " for " << passed; |
645 | 0 | if (block_time) { |
646 | 0 | ASSERT_LE(passed, block_time * 2); |
647 | 0 | } else { |
648 | 0 | block_time = passed; |
649 | 0 | } |
650 | 0 | } |
651 | 0 | } |
652 | 0 | } |
653 | | |
654 | | struct KeyToCheck { |
655 | | int value; |
656 | | TransactionId txn_id; |
657 | | KeyToCheck* next = nullptr; |
658 | | |
659 | 0 | explicit KeyToCheck(int value_, const TransactionId& txn_id_) : value(value_), txn_id(txn_id_) {} |
660 | | |
661 | 0 | friend void SetNext(KeyToCheck* key_to_check, KeyToCheck* next) { |
662 | 0 | key_to_check->next = next; |
663 | 0 | } |
664 | | |
665 | 0 | friend KeyToCheck* GetNext(KeyToCheck* key_to_check) { |
666 | 0 | return key_to_check->next; |
667 | 0 | } |
668 | | }; |
669 | | |
670 | 0 | bool IntermittentTxnFailure(const Status& status) { |
671 | 0 | static const std::vector<std::string> kAllowedMessages = { |
672 | 0 | "Commit of expired transaction"s, |
673 | 0 | "Leader does not have a valid lease"s, |
674 | 0 | "Network error"s, |
675 | 0 | "Not the leader"s, |
676 | 0 | "Service is shutting down"s, |
677 | 0 | "Timed out"s, |
678 | 0 | "Transaction aborted"s, |
679 | 0 | "expired or aborted by a conflict"s, |
680 | 0 | "Transaction metadata missing"s, |
681 | 0 | "Unknown transaction, could be recently aborted"s, |
682 | 0 | "Transaction was recently aborted"s, |
683 | 0 | }; |
684 | 0 | auto msg = status.ToString(); |
685 | 0 | for (const auto& allowed : kAllowedMessages) { |
686 | 0 | if (msg.find(allowed) != std::string::npos) { |
687 | 0 | return true; |
688 | 0 | } |
689 | 0 | } |
690 | |
|
691 | 0 | return false; |
692 | 0 | } |
693 | | |
// Concurrently executes many transactions, each of which writes the same key several times,
// while tablet servers are restarted in parallel.
// This test checks that transaction participant state is correctly restored after restart.
void SnapshotTxnTest::TestMultiWriteWithRestart() {
  constexpr int kNumWritesPerKey = 10;

  // NOTE(review): presumably slows down loading of transactions (e.g. after a restart) — confirm.
  FLAGS_TEST_inject_load_transaction_delay_ms = 25;

  TestThreadHolder thread_holder;

  // Restarter: every 5 seconds restarts the next tablet server, round-robin starting at index 1.
  thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] {
    auto se = ScopeExit([] {
      LOG(INFO) << "Restarts done";
    });
    int ts_idx_to_restart = 0;
    while (!stop.load(std::memory_order_acquire)) {
      std::this_thread::sleep_for(5s);
      ts_idx_to_restart = (ts_idx_to_restart + 1) % cluster_->num_tablet_servers();
      LongOperationTracker long_operation_tracker("Restart", 20s);
      ASSERT_OK(cluster_->mini_tablet_server(ts_idx_to_restart)->Restart());
    }
  });

  // Keys whose transaction committed. Writers push heap-allocated nodes; whoever pops a node
  // frees it.
  MPSCQueue<KeyToCheck> keys_to_check;
  TransactionPool pool(transaction_manager_.get_ptr(), nullptr /* metric_entity */);
  std::atomic<int> key(0);        // Next key to use; also counts write attempts.
  std::atomic<int> good_keys(0);  // Number of keys whose transaction committed.
  // 25 writers. Each iteration writes values 1..kNumWritesPerKey (100ms apart) to a fresh key in
  // one transaction. Only failures accepted by IntermittentTxnFailure are tolerated.
  for (int i = 0; i != 25; ++i) {
    thread_holder.AddThreadFunctor(
        [this, &stop = thread_holder.stop_flag(), &pool, &key, &keys_to_check, &good_keys] {
      auto se = ScopeExit([] {
        LOG(INFO) << "Write done";
      });
      auto session = CreateSession();
      while (!stop.load(std::memory_order_acquire)) {
        int k = key.fetch_add(1, std::memory_order_acq_rel);
        auto txn = ASSERT_RESULT(pool.TakeAndInit(GetIsolationLevel()));
        session->SetTransaction(txn);
        bool good = true;
        for (int j = 1; j <= kNumWritesPerKey; ++j) {
          if (j > 1) {
            std::this_thread::sleep_for(100ms);
          }
          auto write_result = WriteRow(session, k, j);
          if (!write_result.ok()) {
            ASSERT_TRUE(IntermittentTxnFailure(write_result.status())) << write_result.status();
            good = false;
            break;
          }
        }
        if (!good) {
          continue;
        }
        auto commit_status = txn->CommitFuture().get();
        if (!commit_status.ok()) {
          ASSERT_TRUE(IntermittentTxnFailure(commit_status)) << commit_status;
        } else {
          keys_to_check.Push(new KeyToCheck(k, txn->id()));
          good_keys.fetch_add(1, std::memory_order_acq_rel);
        }
      }
    });
  }

  // Reader: pops committed keys and checks that each reads back as kNumWritesPerKey, the last
  // value its transaction wrote. Exits once the queue is empty and stop has been requested.
  thread_holder.AddThreadFunctor(
      [this, &stop = thread_holder.stop_flag(), &keys_to_check, kNumWritesPerKey] {
    auto se = ScopeExit([] {
      LOG(INFO) << "Read done";
    });
    auto session = CreateSession();
    for (;;) {
      std::unique_ptr<KeyToCheck> key(keys_to_check.Pop());
      if (key == nullptr) {
        if (stop.load(std::memory_order_acquire)) {
          break;
        }
        std::this_thread::sleep_for(10ms);
        continue;
      }
      SCOPED_TRACE(Format("Reading $0, written with: $1", key->value, key->txn_id));
      YBqlReadOpPtr op;
      // Retry until the read succeeds. A failed Flush is retried silently, and a failed op is only
      // tolerated when its error message contains "timed out after".
      // NOTE(review): this loop does not check `stop` and retries without backoff.
      for (;;) {
        op = ReadRow(session, key->value);
        auto flush_result = session->Flush();
        if (flush_result.ok()) {
          if (op->succeeded()) {
            break;
          }
          if (op->response().error_message().find("timed out after") == std::string::npos) {
            ASSERT_TRUE(op->succeeded()) << "Read failed: " << op->response().ShortDebugString();
          }
        }
      }
      auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock();
      ASSERT_EQ(rowblock->row_count(), 1);
      // NOTE(review): presumably ReadRow selects only the value column, so column 0 is the value.
      const auto& first_column = rowblock->row(0).column(0);
      ASSERT_EQ(InternalType::kInt32Value, first_column.type());
      ASSERT_EQ(first_column.int32_value(), kNumWritesPerKey);
    }
  });

  LOG(INFO) << "Running";
  thread_holder.WaitAndStop(60s);

  LOG(INFO) << "Stopped";

  // Free any queued keys the reader did not get to before stopping.
  for (;;) {
    std::unique_ptr<KeyToCheck> key(keys_to_check.Pop());
    if (key == nullptr) {
      break;
    }
  }

  // At least 70% of attempted keys must have committed despite the restarts.
  ASSERT_GE(good_keys.load(std::memory_order_relaxed), key.load(std::memory_order_relaxed) * 0.7);

  LOG(INFO) << "Done";
}
811 | | |
812 | 0 | TEST_F(SnapshotTxnTest, MultiWriteWithRestart) { |
813 | 0 | TestMultiWriteWithRestart(); |
814 | 0 | } |
815 | | |
816 | 0 | TEST_F(SnapshotTxnTest, MultiWriteWithRestartAndLongApply) { |
817 | 0 | FLAGS_txn_max_apply_batch_records = 3; |
818 | 0 | TestMultiWriteWithRestart(); |
819 | 0 | } |
820 | | |
821 | | using RemoteBootstrapOnStartBase = TransactionCustomLogSegmentSizeTest<128, SnapshotTxnTest>; |
822 | | |
823 | 0 | void SnapshotTxnTest::TestRemoteBootstrap() { |
824 | 0 | constexpr int kTransactionsCount = RegularBuildVsSanitizers(100, 10); |
825 | 0 | FLAGS_log_min_seconds_to_retain = 1; |
826 | 0 | DisableTransactionTimeout(); |
827 | |
|
828 | 0 | for (int iteration = 0; iteration != 4; ++iteration) { |
829 | 0 | DisableApplyingIntents(); |
830 | |
|
831 | 0 | TestThreadHolder thread_holder; |
832 | |
|
833 | 0 | std::atomic<int> transactions(0); |
834 | |
|
835 | 0 | thread_holder.AddThreadFunctor( |
836 | 0 | [this, &stop = thread_holder.stop_flag(), &transactions] { |
837 | 0 | auto session = CreateSession(); |
838 | 0 | for (int transaction_idx = 0; !stop.load(std::memory_order_acquire); ++transaction_idx) { |
839 | 0 | auto txn = CreateTransaction(); |
840 | 0 | session->SetTransaction(txn); |
841 | 0 | if (WriteRows(session, transaction_idx).ok() && txn->CommitFuture().get().ok()) { |
842 | 0 | transactions.fetch_add(1); |
843 | 0 | } |
844 | 0 | } |
845 | 0 | }); |
846 | |
|
847 | 0 | ASSERT_OK(thread_holder.WaitCondition([&transactions] { |
848 | 0 | return transactions.load(std::memory_order_acquire) >= kTransactionsCount; |
849 | 0 | })); |
850 | |
|
851 | 0 | cluster_->mini_tablet_server(0)->Shutdown(); |
852 | |
|
853 | 0 | SetIgnoreApplyingProbability(0.0); |
854 | |
|
855 | 0 | std::this_thread::sleep_for(FLAGS_log_min_seconds_to_retain * 1s); |
856 | |
|
857 | 0 | auto start_transactions = transactions.load(std::memory_order_acquire); |
858 | 0 | ASSERT_OK(thread_holder.WaitCondition([&transactions, start_transactions] { |
859 | 0 | return transactions.load(std::memory_order_acquire) >= |
860 | 0 | start_transactions + kTransactionsCount; |
861 | 0 | })); |
862 | |
|
863 | 0 | thread_holder.Stop(); |
864 | |
|
865 | 0 | LOG(INFO) << "Flushing"; |
866 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
867 | |
|
868 | 0 | LOG(INFO) << "Clean logs"; |
869 | 0 | ASSERT_OK(cluster_->CleanTabletLogs()); |
870 | | |
871 | | // Shutdown to reset cached logs. |
872 | 0 | for (size_t i = 1; i != cluster_->num_tablet_servers(); ++i) { |
873 | 0 | cluster_->mini_tablet_server(i)->Shutdown(); |
874 | 0 | } |
875 | | |
876 | | // Start all servers. Cluster verifier should check that all tablets are synchronized. |
877 | 0 | for (auto i = cluster_->num_tablet_servers(); i > 0;) { |
878 | 0 | --i; |
879 | 0 | ASSERT_OK(cluster_->mini_tablet_server(i)->Start()); |
880 | 0 | } |
881 | |
|
882 | 0 | ASSERT_OK(WaitFor([this] { return CheckAllTabletsRunning(); }, 20s * kTimeMultiplier, |
883 | 0 | "All tablets running")); |
884 | 0 | } |
885 | 0 | } |
886 | | |
887 | 0 | TEST_F_EX(SnapshotTxnTest, RemoteBootstrapOnStart, RemoteBootstrapOnStartBase) { |
888 | 0 | TestRemoteBootstrap(); |
889 | 0 | } |
890 | | |
891 | 0 | TEST_F_EX(SnapshotTxnTest, TruncateDuringShutdown, RemoteBootstrapOnStartBase) { |
892 | 0 | FLAGS_TEST_inject_load_transaction_delay_ms = 50; |
893 | |
|
894 | 0 | constexpr int kTransactionsCount = RegularBuildVsSanitizers(20, 5); |
895 | 0 | FLAGS_log_min_seconds_to_retain = 1; |
896 | 0 | DisableTransactionTimeout(); |
897 | |
|
898 | 0 | DisableApplyingIntents(); |
899 | |
|
900 | 0 | TestThreadHolder thread_holder; |
901 | |
|
902 | 0 | std::atomic<int> transactions(0); |
903 | |
|
904 | 0 | thread_holder.AddThreadFunctor( |
905 | 0 | [this, &stop = thread_holder.stop_flag(), &transactions] { |
906 | 0 | auto session = CreateSession(); |
907 | 0 | for (int transaction_idx = 0; !stop.load(std::memory_order_acquire); ++transaction_idx) { |
908 | 0 | auto txn = CreateTransaction(); |
909 | 0 | session->SetTransaction(txn); |
910 | 0 | if (WriteRows(session, transaction_idx).ok() && txn->CommitFuture().get().ok()) { |
911 | 0 | transactions.fetch_add(1); |
912 | 0 | } |
913 | 0 | } |
914 | 0 | }); |
915 | |
|
916 | 0 | while (transactions.load(std::memory_order_acquire) < kTransactionsCount) { |
917 | 0 | std::this_thread::sleep_for(100ms); |
918 | 0 | } |
919 | |
|
920 | 0 | cluster_->mini_tablet_server(0)->Shutdown(); |
921 | |
|
922 | 0 | thread_holder.Stop(); |
923 | |
|
924 | 0 | ASSERT_OK(client_->TruncateTable(table_.table()->id())); |
925 | |
|
926 | 0 | ASSERT_OK(cluster_->mini_tablet_server(0)->Start()); |
927 | |
|
928 | 0 | ASSERT_OK(WaitFor([this] { return CheckAllTabletsRunning(); }, 20s * kTimeMultiplier, |
929 | 0 | "All tablets running")); |
930 | 0 | } |
931 | | |
932 | 0 | TEST_F_EX(SnapshotTxnTest, ResolveIntents, SingleTabletSnapshotTxnTest) { |
933 | 0 | SetIgnoreApplyingProbability(0.5); |
934 | |
|
935 | 0 | TransactionPool pool(transaction_manager_.get_ptr(), nullptr /* metric_entity */); |
936 | 0 | auto session = CreateSession(); |
937 | 0 | auto prev_ht = clock_->Now(); |
938 | 0 | for (int i = 0; i != 4; ++i) { |
939 | 0 | auto txn = ASSERT_RESULT(pool.TakeAndInit(isolation_level_)); |
940 | 0 | session->SetTransaction(txn); |
941 | 0 | ASSERT_OK(WriteRow(session, i, -i)); |
942 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
943 | |
|
944 | 0 | auto peers = ListTabletPeers( |
945 | 0 | cluster_.get(), [](const std::shared_ptr<tablet::TabletPeer>& peer) { |
946 | 0 | if (peer->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
947 | 0 | return false; |
948 | 0 | } |
949 | 0 | return peer->consensus()->GetLeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY; |
950 | 0 | }); |
951 | 0 | ASSERT_EQ(peers.size(), 1); |
952 | 0 | auto peer = peers[0]; |
953 | 0 | auto tablet = peer->tablet(); |
954 | 0 | ASSERT_OK(tablet->transaction_participant()->ResolveIntents( |
955 | 0 | peer->clock().Now(), CoarseTimePoint::max())); |
956 | 0 | auto current_ht = clock_->Now(); |
957 | 0 | ASSERT_OK(tablet->Flush(tablet::FlushMode::kSync)); |
958 | 0 | bool found = false; |
959 | 0 | auto files = tablet->TEST_db()->GetLiveFilesMetaData(); |
960 | 0 | for (const auto& meta : files) { |
961 | 0 | auto min_ht = down_cast<docdb::ConsensusFrontier&>( |
962 | 0 | *meta.smallest.user_frontier).hybrid_time(); |
963 | 0 | auto max_ht = down_cast<docdb::ConsensusFrontier&>( |
964 | 0 | *meta.largest.user_frontier).hybrid_time(); |
965 | 0 | if (min_ht > prev_ht && max_ht < current_ht) { |
966 | 0 | found = true; |
967 | 0 | break; |
968 | 0 | } |
969 | 0 | } |
970 | |
|
971 | 0 | ASSERT_TRUE(found) << "Cannot find SST file that fits into " << prev_ht << " - " << current_ht |
972 | 0 | << " range, files: " << AsString(files); |
973 | | |
974 | 0 | prev_ht = current_ht; |
975 | 0 | } |
976 | 0 | } |
977 | | |
978 | 0 | TEST_F(SnapshotTxnTest, DeleteOnLoad) { |
979 | 0 | constexpr int kTransactions = 400; |
980 | |
|
981 | 0 | FLAGS_TEST_inject_status_resolver_delay_ms = 150 * kTimeMultiplier; |
982 | |
|
983 | 0 | DisableApplyingIntents(); |
984 | |
|
985 | 0 | TransactionPool pool(transaction_manager_.get_ptr(), nullptr /* metric_entity */); |
986 | 0 | auto session = CreateSession(); |
987 | 0 | for (int i = 0; i != kTransactions; ++i) { |
988 | 0 | WriteData(WriteOpType::INSERT, i); |
989 | 0 | } |
990 | |
|
991 | 0 | cluster_->mini_tablet_server(0)->Shutdown(); |
992 | |
|
993 | 0 | ASSERT_OK(client_->DeleteTable(table_.table()->name(), /* wait= */ false)); |
994 | | |
995 | | // Wait delete table request to replicate on alive node. |
996 | 0 | std::this_thread::sleep_for(1s * kTimeMultiplier); |
997 | |
|
998 | 0 | ASSERT_OK(cluster_->mini_tablet_server(0)->Start()); |
999 | 0 | } |
1000 | | |
1001 | | } // namespace client |
1002 | | } // namespace yb |