/Users/deen/code/yugabyte-db/src/yb/integration-tests/cql-index-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/integration-tests/cql_test_base.h" |
15 | | #include "yb/integration-tests/mini_cluster_utils.h" |
16 | | |
17 | | #include "yb/util/atomic.h" |
18 | | #include "yb/util/logging.h" |
19 | | #include "yb/util/random_util.h" |
20 | | #include "yb/util/status_log.h" |
21 | | #include "yb/util/test_thread_holder.h" |
22 | | #include "yb/util/test_util.h" |
23 | | #include "yb/util/tsan_util.h" |
24 | | |
25 | | using namespace std::literals; |
26 | | |
27 | | DECLARE_bool(allow_index_table_read_write); |
28 | | DECLARE_int32(cql_prepare_child_threshold_ms); |
29 | | DECLARE_bool(disable_index_backfill); |
30 | | DECLARE_bool(transactions_poll_check_aborted); |
31 | | DECLARE_bool(TEST_disable_proactive_txn_cleanup_on_abort); |
32 | | DECLARE_int32(client_read_write_timeout_ms); |
33 | | DECLARE_int32(rpc_workers_limit); |
34 | | DECLARE_uint64(transaction_manager_workers_limit); |
35 | | DECLARE_uint64(TEST_inject_txn_get_status_delay_ms); |
36 | | DECLARE_int64(transaction_abort_check_interval_ms); |
37 | | |
38 | | namespace yb { |
39 | | |
40 | | class CqlIndexTest : public CqlTestBase<MiniCluster> { |
41 | | public: |
42 | 0 | virtual ~CqlIndexTest() = default; |
43 | | |
44 | | void TestTxnCleanup(size_t max_remaining_txns_per_tablet); |
45 | | void TestConcurrentModify2Columns(const std::string& expr); |
46 | | }; |
47 | | |
48 | | YB_STRONGLY_TYPED_BOOL(UniqueIndex); |
49 | | |
50 | | CHECKED_STATUS CreateIndexedTable( |
51 | 0 | CassandraSession* session, UniqueIndex unique_index = UniqueIndex::kFalse) { |
52 | 0 | RETURN_NOT_OK( |
53 | 0 | session->ExecuteQuery("CREATE TABLE IF NOT EXISTS t (key INT PRIMARY KEY, value INT) WITH " |
54 | 0 | "transactions = { 'enabled' : true }")); |
55 | 0 | return session->ExecuteQuery( |
56 | 0 | Format("CREATE $0 INDEX IF NOT EXISTS idx ON T (value)", unique_index ? "UNIQUE" : "")); |
57 | 0 | } |
58 | | |
59 | 0 | TEST_F(CqlIndexTest, Simple) { |
60 | 0 | constexpr int kKey = 1; |
61 | 0 | constexpr int kValue = 2; |
62 | |
|
63 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
64 | |
|
65 | 0 | ASSERT_OK(CreateIndexedTable(&session)); |
66 | |
|
67 | 0 | ASSERT_OK(session.ExecuteQuery("INSERT INTO t (key, value) VALUES (1, 2)")); |
68 | 0 | auto result = ASSERT_RESULT(session.ExecuteWithResult("SELECT * FROM t WHERE value = 2")); |
69 | 0 | auto iter = result.CreateIterator(); |
70 | 0 | ASSERT_TRUE(iter.Next()); |
71 | 0 | auto row = iter.Row(); |
72 | 0 | ASSERT_EQ(row.Value(0).As<cass_int32_t>(), kKey); |
73 | 0 | ASSERT_EQ(row.Value(1).As<cass_int32_t>(), kValue); |
74 | 0 | ASSERT_FALSE(iter.Next()); |
75 | 0 | } |
76 | | |
77 | 0 | TEST_F(CqlIndexTest, MultipleIndex) { |
78 | 0 | FLAGS_disable_index_backfill = false; |
79 | 0 | auto session1 = ASSERT_RESULT(EstablishSession(driver_.get())); |
80 | 0 | auto session2 = ASSERT_RESULT(EstablishSession(driver_.get())); |
81 | |
|
82 | 0 | WARN_NOT_OK(session1.ExecuteQuery( |
83 | 0 | "CREATE TABLE t (key INT PRIMARY KEY, value INT) WITH transactions = { 'enabled' : true }"), |
84 | 0 | "Create table failed."); |
85 | 0 | auto future1 = session1.ExecuteGetFuture("CREATE INDEX idx1 ON T (value)"); |
86 | 0 | auto future2 = session2.ExecuteGetFuture("CREATE INDEX idx2 ON T (value)"); |
87 | |
|
88 | 0 | constexpr auto kNamespace = "test"; |
89 | 0 | const client::YBTableName table_name(YQL_DATABASE_CQL, kNamespace, "t"); |
90 | 0 | const client::YBTableName index_table_name1(YQL_DATABASE_CQL, kNamespace, "idx1"); |
91 | 0 | const client::YBTableName index_table_name2(YQL_DATABASE_CQL, kNamespace, "idx2"); |
92 | |
|
93 | 0 | LOG(INFO) << "Waiting for idx1 got " << future1.Wait(); |
94 | 0 | auto perm1 = ASSERT_RESULT(client_->WaitUntilIndexPermissionsAtLeast( |
95 | 0 | table_name, index_table_name1, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE)); |
96 | 0 | CHECK_EQ(perm1, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE); |
97 | 0 | LOG(INFO) << "Waiting for idx2 got " << future2.Wait(); |
98 | 0 | auto perm2 = ASSERT_RESULT(client_->WaitUntilIndexPermissionsAtLeast( |
99 | 0 | table_name, index_table_name2, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE)); |
100 | 0 | CHECK_EQ(perm2, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE); |
101 | 0 | } |
102 | | |
103 | | class CqlIndexSmallWorkersTest : public CqlIndexTest { |
104 | | public: |
105 | 1 | void SetUp() override { |
106 | 1 | FLAGS_rpc_workers_limit = 4; |
107 | 1 | FLAGS_transaction_manager_workers_limit = 4; |
108 | 1 | CqlIndexTest::SetUp(); |
109 | 1 | } |
110 | | }; |
111 | | |
112 | 0 | TEST_F_EX(CqlIndexTest, ConcurrentIndexUpdate, CqlIndexSmallWorkersTest) { |
113 | 0 | constexpr int kThreads = 8; |
114 | 0 | constexpr cass_int32_t kKeys = kThreads / 2; |
115 | 0 | constexpr cass_int32_t kValues = kKeys; |
116 | 0 | constexpr int kNumInserts = kThreads * 5; |
117 | |
|
118 | 0 | FLAGS_client_read_write_timeout_ms = 10000 * kTimeMultiplier; |
119 | 0 | SetAtomicFlag(1000, &FLAGS_TEST_inject_txn_get_status_delay_ms); |
120 | 0 | FLAGS_transaction_abort_check_interval_ms = 100000; |
121 | |
|
122 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
123 | |
|
124 | 0 | ASSERT_OK(LoggedWaitFor( |
125 | 0 | [&session]() { return CreateIndexedTable(&session).ok(); }, 60s, "create table", 12s)); |
126 | 0 | constexpr auto kNamespace = "test"; |
127 | 0 | const client::YBTableName table_name(YQL_DATABASE_CQL, kNamespace, "t"); |
128 | 0 | const client::YBTableName index_table_name(YQL_DATABASE_CQL, kNamespace, "idx"); |
129 | 0 | ASSERT_OK(LoggedWaitFor( |
130 | 0 | [this, table_name, index_table_name]() { |
131 | 0 | auto result = client_->WaitUntilIndexPermissionsAtLeast( |
132 | 0 | table_name, index_table_name, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE); |
133 | 0 | return result.ok() && *result == IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE; |
134 | 0 | }, |
135 | 0 | 90s, |
136 | 0 | "wait for create index to complete", |
137 | 0 | 12s)); |
138 | |
|
139 | 0 | TestThreadHolder thread_holder; |
140 | 0 | std::atomic<int> inserts(0); |
141 | 0 | for (int i = 0; i != kThreads; ++i) { |
142 | 0 | thread_holder.AddThreadFunctor([this, &inserts, &stop = thread_holder.stop_flag()] { |
143 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
144 | 0 | auto prepared = ASSERT_RESULT(session.Prepare("INSERT INTO t (key, value) VALUES (?, ?)")); |
145 | 0 | while (!stop.load(std::memory_order_acquire)) { |
146 | 0 | auto stmt = prepared.Bind(); |
147 | 0 | stmt.Bind(0, RandomUniformInt<cass_int32_t>(1, kKeys)); |
148 | 0 | stmt.Bind(1, RandomUniformInt<cass_int32_t>(1, kValues)); |
149 | 0 | auto status = session.Execute(stmt); |
150 | 0 | if (status.ok()) { |
151 | 0 | ++inserts; |
152 | 0 | } else { |
153 | 0 | LOG(INFO) << "Insert failed: " << status; |
154 | 0 | } |
155 | 0 | } |
156 | 0 | }); |
157 | 0 | } |
158 | |
|
159 | 0 | while (!thread_holder.stop_flag().load(std::memory_order_acquire)) { |
160 | 0 | auto num_inserts = inserts.load(std::memory_order_acquire); |
161 | 0 | if (num_inserts >= kNumInserts) { |
162 | 0 | break; |
163 | 0 | } |
164 | 0 | YB_LOG_EVERY_N_SECS(INFO, 5) << "Num inserts " << num_inserts << " of " << kNumInserts; |
165 | 0 | std::this_thread::sleep_for(100ms); |
166 | 0 | } |
167 | |
|
168 | 0 | thread_holder.Stop(); |
169 | |
|
170 | 0 | SetAtomicFlag(0, &FLAGS_TEST_inject_txn_get_status_delay_ms); |
171 | 0 | } |
172 | | |
173 | 0 | TEST_F(CqlIndexTest, TestSaturatedWorkers) { |
174 | | /* |
175 | | * (#11258) We set a very short timeout to force failure if child transaction |
176 | | * is not created quickly enough. |
177 | | |
178 | | * TODO: when switching to a fully asynchronous model, this failure will disappear. |
179 | | */ |
180 | 0 | FLAGS_cql_prepare_child_threshold_ms = 1; |
181 | |
|
182 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
183 | 0 | ASSERT_OK(session.ExecuteQuery( |
184 | 0 | "CREATE TABLE t (key INT PRIMARY KEY, v1 INT, v2 INT) WITH " |
185 | 0 | "transactions = { 'enabled' : true }")); |
186 | 0 | ASSERT_OK(session.ExecuteQuery( |
187 | 0 | "CREATE INDEX i1 ON t(key, v1) WITH " |
188 | 0 | "transactions = { 'enabled' : true }")); |
189 | 0 | ASSERT_OK(session.ExecuteQuery( |
190 | 0 | "CREATE INDEX i2 ON t(key, v2) WITH " |
191 | 0 | "transactions = { 'enabled' : true }")); |
192 | |
|
193 | 0 | constexpr int kKeys = 10000; |
194 | 0 | std::string expr = "BEGIN TRANSACTION "; |
195 | 0 | for (int i = 0; i < kKeys; i++) { |
196 | 0 | expr += Format("INSERT INTO t (key, v1, v2) VALUES ($0, $1, $2); ", i, i, i); |
197 | 0 | } |
198 | 0 | expr += "END TRANSACTION;"; |
199 | | |
200 | | // We should expect to see timed out error |
201 | 0 | auto status = session.ExecuteQuery(expr); |
202 | 0 | ASSERT_FALSE(status.ok()); |
203 | 0 | ASSERT_NE(status.message().ToBuffer().find("Timed out waiting for prepare child status"), |
204 | 0 | std::string::npos) << status; |
205 | 0 | } |
206 | | |
207 | | YB_STRONGLY_TYPED_BOOL(CheckReady); |
208 | | |
209 | 0 | void CleanFutures(std::deque<CassandraFuture>* futures, CheckReady check_ready) { |
210 | 0 | while (!futures->empty() && (!check_ready || futures->front().Ready())) { |
211 | 0 | auto status = futures->front().Wait(); |
212 | 0 | if (!status.ok() && !status.IsTimedOut()) { |
213 | 0 | auto msg = status.message().ToBuffer(); |
214 | 0 | if (msg.find("Duplicate value disallowed by unique index") == std::string::npos) { |
215 | 0 | ASSERT_OK(status); |
216 | 0 | } |
217 | 0 | } |
218 | 0 | futures->pop_front(); |
219 | 0 | } |
220 | 0 | } |
221 | | |
222 | 0 | void CqlIndexTest::TestTxnCleanup(size_t max_remaining_txns_per_tablet) { |
223 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
224 | |
|
225 | 0 | ASSERT_OK(CreateIndexedTable(&session, UniqueIndex::kTrue)); |
226 | 0 | std::deque<CassandraFuture> futures; |
227 | |
|
228 | 0 | auto prepared = ASSERT_RESULT(session.Prepare("INSERT INTO t (key, value) VALUES (?, ?)")); |
229 | |
|
230 | 0 | for (int i = 0; i != RegularBuildVsSanitizers(100, 30); ++i) { |
231 | 0 | ASSERT_NO_FATALS(CleanFutures(&futures, CheckReady::kTrue)); |
232 | |
|
233 | 0 | auto stmt = prepared.Bind(); |
234 | 0 | stmt.Bind(0, i); |
235 | 0 | stmt.Bind(1, RandomUniformInt<cass_int32_t>(1, 10)); |
236 | 0 | futures.push_back(session.ExecuteGetFuture(stmt)); |
237 | 0 | } |
238 | |
|
239 | 0 | ASSERT_NO_FATALS(CleanFutures(&futures, CheckReady::kFalse)); |
240 | |
|
241 | 0 | AssertRunningTransactionsCountLessOrEqualTo(cluster_.get(), max_remaining_txns_per_tablet); |
242 | 0 | } |
243 | | |
244 | | // Test proactive aborted transactions cleanup. |
245 | 0 | TEST_F(CqlIndexTest, TxnCleanup) { |
246 | 0 | FLAGS_transactions_poll_check_aborted = false; |
247 | |
|
248 | 0 | TestTxnCleanup(/* max_remaining_txns_per_tablet= */ 5); |
249 | 0 | } |
250 | | |
251 | | // Test poll based aborted transactions cleanup. |
252 | 0 | TEST_F(CqlIndexTest, TxnPollCleanup) { |
253 | 0 | FLAGS_TEST_disable_proactive_txn_cleanup_on_abort = true; |
254 | 0 | FLAGS_transaction_abort_check_interval_ms = 1000; |
255 | |
|
256 | 0 | TestTxnCleanup(/* max_remaining_txns_per_tablet= */ 0); |
257 | 0 | } |
258 | | |
259 | 0 | void CqlIndexTest::TestConcurrentModify2Columns(const std::string& expr) { |
260 | 0 | FLAGS_allow_index_table_read_write = true; |
261 | |
|
262 | 0 | constexpr int kKeys = RegularBuildVsSanitizers(50, 10); |
263 | |
|
264 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
265 | 0 | ASSERT_OK(session.ExecuteQuery( |
266 | 0 | "CREATE TABLE t (key INT PRIMARY KEY, v1 INT, v2 INT) WITH " |
267 | 0 | "transactions = { 'enabled' : true }")); |
268 | 0 | ASSERT_OK(session.ExecuteQuery("CREATE INDEX v1_idx ON t (v1)")); |
269 | |
|
270 | 0 | auto prepared1 = ASSERT_RESULT(session.Prepare(Format(expr, "v1"))); |
271 | 0 | auto prepared2 = ASSERT_RESULT(session.Prepare(Format(expr, "v2"))); |
272 | |
|
273 | 0 | std::vector<CassandraFuture> futures; |
274 | |
|
275 | 0 | for (int i = 0; i != kKeys; ++i) { |
276 | 0 | for (auto* prepared : {&prepared1, &prepared2}) { |
277 | 0 | auto statement = prepared->Bind(); |
278 | 0 | statement.Bind(0, i); |
279 | 0 | statement.Bind(1, i); |
280 | 0 | futures.push_back(session.ExecuteGetFuture(statement)); |
281 | 0 | } |
282 | 0 | } |
283 | |
|
284 | 0 | int good = 0; |
285 | 0 | int bad = 0; |
286 | 0 | for (auto& future : futures) { |
287 | 0 | auto status = future.Wait(); |
288 | 0 | if (status.ok()) { |
289 | 0 | ++good; |
290 | 0 | } else { |
291 | 0 | ASSERT_TRUE(status.IsTimedOut()) << status; |
292 | 0 | ++bad; |
293 | 0 | } |
294 | 0 | } |
295 | |
|
296 | 0 | ASSERT_GE(good, bad * 4); |
297 | |
|
298 | 0 | std::vector<bool> present(kKeys); |
299 | |
|
300 | 0 | int read_keys = 0; |
301 | 0 | auto result = ASSERT_RESULT(session.ExecuteWithResult("SELECT * FROM v1_idx")); |
302 | 0 | auto iter = result.CreateIterator(); |
303 | 0 | while (iter.Next()) { |
304 | 0 | auto row = iter.Row(); |
305 | 0 | auto key = row.Value(0).As<int32_t>(); |
306 | 0 | auto value = row.Value(1); |
307 | 0 | LOG(INFO) << key << ", " << value.ToString(); |
308 | |
|
309 | 0 | ASSERT_FALSE(present[key]) << key; |
310 | 0 | present[key] = true; |
311 | 0 | ++read_keys; |
312 | 0 | ASSERT_FALSE(value.IsNull()); |
313 | 0 | ASSERT_EQ(key, value.As<int32_t>()); |
314 | 0 | } |
315 | |
|
316 | 0 | ASSERT_EQ(read_keys, kKeys); |
317 | 0 | } |
318 | | |
319 | 0 | TEST_F(CqlIndexTest, ConcurrentInsert2Columns) { |
320 | 0 | TestConcurrentModify2Columns("INSERT INTO t ($0, key) VALUES (?, ?)"); |
321 | 0 | } |
322 | | |
323 | 0 | TEST_F(CqlIndexTest, ConcurrentUpdate2Columns) { |
324 | 0 | TestConcurrentModify2Columns("UPDATE t SET $0 = ? WHERE key = ?"); |
325 | 0 | } |
326 | | |
327 | | } // namespace yb |