/Users/deen/code/yugabyte-db/src/yb/client/seal-txn-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/error.h" |
15 | | #include "yb/client/session.h" |
16 | | #include "yb/client/transaction.h" |
17 | | #include "yb/client/txn-test-base.h" |
18 | | |
19 | | #include "yb/tablet/tablet.h" |
20 | | #include "yb/tablet/tablet_peer.h" |
21 | | #include "yb/tablet/transaction_participant.h" |
22 | | |
23 | | #include "yb/util/bitmap.h" |
24 | | #include "yb/util/tsan_util.h" |
25 | | |
26 | | using namespace std::literals; |
27 | | |
28 | | DECLARE_bool(enable_load_balancing); |
29 | | DECLARE_bool(enable_transaction_sealing); |
30 | | DECLARE_bool(TEST_fail_on_replicated_batch_idx_set_in_txn_record); |
31 | | DECLARE_double(transaction_max_missed_heartbeat_periods); |
32 | | DECLARE_int32(TEST_write_rejection_percentage); |
33 | | DECLARE_int64(transaction_rpc_timeout_ms); |
34 | | |
35 | | namespace yb { |
36 | | namespace client { |
37 | | |
38 | | class SealTxnTest : public TransactionTestBase<MiniCluster> { |
39 | | protected: |
40 | 6 | void SetUp() override { |
41 | 6 | FLAGS_enable_transaction_sealing = true; |
42 | | |
43 | 6 | SetIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION); |
44 | 6 | TransactionTestBase::SetUp(); |
45 | 6 | } |
46 | | |
47 | | void TestNumBatches(bool restart); |
48 | | }; |
49 | | |
50 | | // Writes some data as part of transaction and check that batches are correcly tracked by |
51 | | // transaction participant. |
52 | 0 | void SealTxnTest::TestNumBatches(bool restart) { |
53 | 0 | auto txn = CreateTransaction(); |
54 | 0 | auto session = CreateSession(txn); |
55 | |
|
56 | 0 | size_t prev_num_non_empty = 0; |
57 | 0 | for (auto op_type : {WriteOpType::INSERT, WriteOpType::UPDATE}) { |
58 | 0 | ASSERT_OK(WriteRows(session, /* transaction= */ 0, op_type, Flush::kFalse)); |
59 | 0 | ASSERT_OK(session->Flush()); |
60 | |
|
61 | 0 | size_t num_non_empty = 0; |
62 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders); |
63 | 0 | for (const auto& peer : peers) { |
64 | 0 | auto txn_participant = peer->tablet()->transaction_participant(); |
65 | 0 | if (!txn_participant) { |
66 | 0 | continue; |
67 | 0 | } |
68 | 0 | auto replicated_batch_idx_set = txn_participant->TEST_TransactionReplicatedBatches(txn->id()); |
69 | 0 | LOG(INFO) << peer->tablet_id() << ": " << replicated_batch_idx_set.ToString(); |
70 | 0 | if (replicated_batch_idx_set.CountSet() != 0) { |
71 | 0 | ++num_non_empty; |
72 | 0 | ASSERT_EQ(replicated_batch_idx_set.ToString(), |
73 | 0 | op_type == WriteOpType::INSERT ? "[0]" : "[0, 1]"); |
74 | 0 | } |
75 | 0 | } |
76 | |
|
77 | 0 | if (op_type == WriteOpType::INSERT) { |
78 | 0 | ASSERT_GT(num_non_empty, 0); |
79 | 0 | if (restart) { |
80 | 0 | ASSERT_OK(cluster_->RestartSync()); |
81 | 0 | } |
82 | 0 | } else { |
83 | 0 | ASSERT_EQ(num_non_empty, prev_num_non_empty); |
84 | 0 | } |
85 | 0 | prev_num_non_empty = num_non_empty; |
86 | 0 | } |
87 | 0 | } |
88 | | |
89 | 0 | TEST_F(SealTxnTest, NumBatches) { |
90 | 0 | TestNumBatches(/* restart= */ false); |
91 | 0 | } |
92 | | |
93 | 0 | TEST_F(SealTxnTest, NumBatchesWithRestart) { |
94 | | // Restarting whole cluster could result in expired transaction, that is not expected by the test. |
95 | | // Increase transaction timeout to avoid such kind of failures. |
96 | 0 | FLAGS_transaction_max_missed_heartbeat_periods = 50; |
97 | 0 | TestNumBatches(/* restart= */ true); |
98 | 0 | } |
99 | | |
100 | 0 | TEST_F(SealTxnTest, NumBatchesWithRejection) { |
101 | 0 | FLAGS_TEST_write_rejection_percentage = 75; |
102 | 0 | TestNumBatches(/* restart= */ false); |
103 | 0 | } |
104 | | |
105 | | // Check that we could disable writing information about the number of batches, |
106 | | // since it is required for backward compatibility. |
107 | 0 | TEST_F(SealTxnTest, NumBatchesDisable) { |
108 | 0 | DisableTransactionTimeout(); |
109 | | // Should be enough for the restarted servers to be back online |
110 | 0 | FLAGS_transaction_rpc_timeout_ms = 20000 * kTimeMultiplier; |
111 | 0 | FLAGS_enable_transaction_sealing = false; |
112 | 0 | FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record = true; |
113 | |
|
114 | 0 | auto txn = CreateTransaction(); |
115 | 0 | auto session = CreateSession(txn); |
116 | 0 | ASSERT_OK(WriteRows(session)); |
117 | 0 | ASSERT_OK(cluster_->RestartSync()); |
118 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
119 | 0 | } |
120 | | |
121 | 0 | TEST_F(SealTxnTest, Simple) { |
122 | 0 | auto txn = CreateTransaction(); |
123 | 0 | auto session = CreateSession(txn); |
124 | 0 | ASSERT_OK(WriteRows(session, /* transaction = */ 0, WriteOpType::INSERT, Flush::kFalse)); |
125 | 0 | auto flush_future = session->FlushFuture(); |
126 | 0 | auto commit_future = txn->CommitFuture(CoarseTimePoint(), SealOnly::kTrue); |
127 | 0 | ASSERT_OK(flush_future.get().status); |
128 | 0 | LOG(INFO) << "Flushed: " << txn->id(); |
129 | 0 | ASSERT_OK(commit_future.get()); |
130 | 0 | LOG(INFO) << "Committed: " << txn->id(); |
131 | 0 | ASSERT_NO_FATALS(VerifyData()); |
132 | 0 | ASSERT_OK(cluster_->RestartSync()); |
133 | 0 | AssertNoRunningTransactions(); |
134 | 0 | } |
135 | | |
136 | 0 | TEST_F(SealTxnTest, Update) { |
137 | 0 | auto txn = CreateTransaction(); |
138 | 0 | auto session = CreateSession(txn); |
139 | 0 | LOG(INFO) << "Inserting rows"; |
140 | 0 | ASSERT_OK(WriteRows(session, /* transaction = */ 0)); |
141 | 0 | LOG(INFO) << "Updating rows"; |
142 | 0 | ASSERT_OK(WriteRows(session, /* transaction = */ 0, WriteOpType::UPDATE, Flush::kFalse)); |
143 | 0 | auto flush_future = session->FlushFuture(); |
144 | 0 | auto commit_future = txn->CommitFuture(CoarseTimePoint(), SealOnly::kTrue); |
145 | 0 | ASSERT_OK(flush_future.get().status); |
146 | 0 | LOG(INFO) << "Flushed: " << txn->id(); |
147 | 0 | ASSERT_OK(commit_future.get()); |
148 | 0 | LOG(INFO) << "Committed: " << txn->id(); |
149 | 0 | ASSERT_NO_FATALS(VerifyData(1, WriteOpType::UPDATE)); |
150 | 0 | ASSERT_OK(cluster_->RestartSync()); |
151 | 0 | AssertNoRunningTransactions(); |
152 | 0 | } |
153 | | |
154 | | } // namespace client |
155 | | } // namespace yb |