/Users/deen/code/yugabyte-db/src/yb/integration-tests/cql-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/consensus/raft_consensus.h" |
15 | | |
16 | | #include "yb/integration-tests/cql_test_base.h" |
17 | | |
18 | | #include "yb/master/mini_master.h" |
19 | | |
20 | | #include "yb/tablet/tablet_peer.h" |
21 | | |
22 | | #include "yb/util/random_util.h" |
23 | | #include "yb/util/status_log.h" |
24 | | #include "yb/util/test_macros.h" |
25 | | #include "yb/util/test_util.h" |
26 | | #include "yb/util/tsan_util.h" |
27 | | |
28 | | using namespace std::literals; |
29 | | |
30 | | DECLARE_bool(TEST_timeout_non_leader_master_rpcs); |
31 | | DECLARE_int64(cql_processors_limit); |
32 | | DECLARE_int32(client_read_write_timeout_ms); |
33 | | |
34 | | DECLARE_string(TEST_fail_to_fast_resolve_address); |
35 | | DECLARE_int32(partitions_vtable_cache_refresh_secs); |
36 | | DECLARE_int32(client_read_write_timeout_ms); |
37 | | |
38 | | namespace yb { |
39 | | |
40 | | class CqlTest : public CqlTestBase<MiniCluster> { |
41 | | public: |
42 | 0 | virtual ~CqlTest() = default; |
43 | | }; |
44 | | |
45 | 0 | TEST_F(CqlTest, ProcessorsLimit) { |
46 | 0 | constexpr int kSessions = 10; |
47 | 0 | FLAGS_cql_processors_limit = 1; |
48 | |
|
49 | 0 | std::vector<CassandraSession> sessions; |
50 | 0 | bool has_failures = false; |
51 | 0 | for (int i = 0; i != kSessions; ++i) { |
52 | 0 | auto session = EstablishSession(driver_.get()); |
53 | 0 | if (!session.ok()) { |
54 | 0 | LOG(INFO) << "Establish session failure: " << session.status(); |
55 | 0 | ASSERT_TRUE(session.status().IsServiceUnavailable()); |
56 | 0 | has_failures = true; |
57 | 0 | } else { |
58 | 0 | sessions.push_back(std::move(*session)); |
59 | 0 | } |
60 | 0 | } |
61 | |
|
62 | 0 | ASSERT_TRUE(has_failures); |
63 | 0 | } |
64 | | |
65 | | // Execute delete in parallel to transactional update of the same row. |
66 | 0 | TEST_F(CqlTest, ConcurrentDeleteRowAndUpdateColumn) { |
67 | 0 | constexpr int kIterations = 70; |
68 | 0 | auto session1 = ASSERT_RESULT(EstablishSession(driver_.get())); |
69 | 0 | auto session2 = ASSERT_RESULT(EstablishSession(driver_.get())); |
70 | 0 | ASSERT_OK(session1.ExecuteQuery( |
71 | 0 | "CREATE TABLE t (i INT PRIMARY KEY, j INT) WITH transactions = { 'enabled' : true }")); |
72 | 0 | auto insert_prepared = ASSERT_RESULT(session1.Prepare("INSERT INTO t (i, j) VALUES (?, ?)")); |
73 | 0 | for (int key = 1; key <= 2 * kIterations; ++key) { |
74 | 0 | auto stmt = insert_prepared.Bind(); |
75 | 0 | stmt.Bind(0, key); |
76 | 0 | stmt.Bind(1, key * 10); |
77 | 0 | ASSERT_OK(session1.Execute(stmt)); |
78 | 0 | } |
79 | 0 | auto update_prepared = ASSERT_RESULT(session1.Prepare( |
80 | 0 | "BEGIN TRANSACTION " |
81 | 0 | " UPDATE t SET j = j + 1 WHERE i = ?;" |
82 | 0 | " UPDATE t SET j = j + 1 WHERE i = ?;" |
83 | 0 | "END TRANSACTION;")); |
84 | 0 | auto delete_prepared = ASSERT_RESULT(session1.Prepare("DELETE FROM t WHERE i = ?")); |
85 | 0 | std::vector<CassandraFuture> futures; |
86 | 0 | for (int i = 0; i < kIterations; ++i) { |
87 | 0 | int k1 = i * 2 + 1; |
88 | 0 | int k2 = i * 2 + 2; |
89 | |
|
90 | 0 | auto update_stmt = update_prepared.Bind(); |
91 | 0 | update_stmt.Bind(0, k1); |
92 | 0 | update_stmt.Bind(1, k2); |
93 | 0 | futures.push_back(session1.ExecuteGetFuture(update_stmt)); |
94 | 0 | } |
95 | |
|
96 | 0 | for (int i = 0; i < kIterations; ++i) { |
97 | 0 | int k2 = i * 2 + 2; |
98 | |
|
99 | 0 | auto delete_stmt = delete_prepared.Bind(); |
100 | 0 | delete_stmt.Bind(0, k2); |
101 | 0 | futures.push_back(session1.ExecuteGetFuture(delete_stmt)); |
102 | 0 | } |
103 | |
|
104 | 0 | for (auto& future : futures) { |
105 | 0 | ASSERT_OK(future.Wait()); |
106 | 0 | } |
107 | |
|
108 | 0 | auto result = ASSERT_RESULT(session1.ExecuteWithResult("SELECT * FROM t")); |
109 | 0 | auto iterator = result.CreateIterator(); |
110 | 0 | int num_rows = 0; |
111 | 0 | int num_even = 0; |
112 | 0 | while (iterator.Next()) { |
113 | 0 | ++num_rows; |
114 | 0 | auto row = iterator.Row(); |
115 | 0 | auto key = row.Value(0).As<int>(); |
116 | 0 | auto value = row.Value(1).As<int>(); |
117 | 0 | if ((key & 1) == 0) { |
118 | 0 | LOG(ERROR) << "Even key: " << key; |
119 | 0 | ++num_even; |
120 | 0 | } |
121 | 0 | ASSERT_EQ(value, key * 10 + 1); |
122 | 0 | LOG(INFO) << "Row: " << key << " => " << value; |
123 | 0 | } |
124 | 0 | ASSERT_EQ(num_rows, kIterations); |
125 | 0 | ASSERT_EQ(num_even, 0); |
126 | 0 | } |
127 | | |
128 | 0 | TEST_F(CqlTest, TestUpdateListIndexAfterOverwrite) { |
129 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
130 | 0 | auto cql = [&](const std::string query) { |
131 | 0 | ASSERT_OK(session.ExecuteQuery(query)); |
132 | 0 | }; |
133 | 0 | cql("CREATE TABLE test(h INT, v LIST<INT>, PRIMARY KEY(h))"); |
134 | 0 | cql("INSERT INTO test (h, v) VALUES (1, [1, 2, 3])"); |
135 | |
|
136 | 0 | auto select = [&]() -> Result<string> { |
137 | 0 | auto result = VERIFY_RESULT(session.ExecuteWithResult("SELECT * FROM test")); |
138 | 0 | auto iter = result.CreateIterator(); |
139 | 0 | DFATAL_OR_RETURN_ERROR_IF(!iter.Next(), STATUS(NotFound, "Did not find result in test table.")); |
140 | 0 | auto row = iter.Row(); |
141 | 0 | auto key = row.Value(0).As<int>(); |
142 | 0 | EXPECT_EQ(key, 1); |
143 | 0 | return row.Value(1).ToString(); |
144 | 0 | }; |
145 | |
|
146 | 0 | cql("UPDATE test SET v = [4, 5, 6] where h = 1"); |
147 | 0 | cql("UPDATE test SET v[0] = 7 WHERE h = 1"); |
148 | 0 | auto res1 = ASSERT_RESULT(select()); |
149 | 0 | EXPECT_EQ(res1, "[7, 5, 6]"); |
150 | |
|
151 | 0 | cql("INSERT INTO test (h, v) VALUES (1, [10, 11, 12])"); |
152 | 0 | cql("UPDATE test SET v[0] = 8 WHERE h = 1"); |
153 | 0 | auto res2 = ASSERT_RESULT(select()); |
154 | 0 | EXPECT_EQ(res2, "[8, 11, 12]"); |
155 | 0 | } |
156 | | |
157 | 0 | TEST_F(CqlTest, Timeout) { |
158 | 0 | FLAGS_client_read_write_timeout_ms = 5000 * kTimeMultiplier; |
159 | |
|
160 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
161 | 0 | ASSERT_OK(session.ExecuteQuery( |
162 | 0 | "CREATE TABLE t (i INT PRIMARY KEY, j INT) WITH transactions = { 'enabled' : true }")); |
163 | |
|
164 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll); |
165 | 0 | for (const auto& peer : peers) { |
166 | 0 | peer->raft_consensus()->TEST_DelayUpdate(100ms); |
167 | 0 | } |
168 | |
|
169 | 0 | auto prepared = ASSERT_RESULT(session.Prepare( |
170 | 0 | "BEGIN TRANSACTION " |
171 | 0 | " INSERT INTO t (i, j) VALUES (?, ?);" |
172 | 0 | "END TRANSACTION;")); |
173 | 0 | struct Request { |
174 | 0 | CassandraFuture future; |
175 | 0 | CoarseTimePoint start_time; |
176 | 0 | }; |
177 | 0 | std::deque<Request> requests; |
178 | 0 | constexpr int kOps = 50; |
179 | 0 | constexpr int kKey = 42; |
180 | 0 | int executed_ops = 0; |
181 | 0 | for (;;) { |
182 | 0 | while (!requests.empty() && requests.front().future.Ready()) { |
183 | 0 | WARN_NOT_OK(requests.front().future.Wait(), "Insert failed"); |
184 | 0 | auto passed = CoarseMonoClock::now() - requests.front().start_time; |
185 | 0 | ASSERT_LE(passed, FLAGS_client_read_write_timeout_ms * 1ms + 2s * kTimeMultiplier); |
186 | 0 | requests.pop_front(); |
187 | 0 | } |
188 | 0 | if (executed_ops >= kOps) { |
189 | 0 | if (requests.empty()) { |
190 | 0 | break; |
191 | 0 | } |
192 | 0 | std::this_thread::sleep_for(100ms); |
193 | 0 | continue; |
194 | 0 | } |
195 | | |
196 | 0 | auto stmt = prepared.Bind(); |
197 | 0 | stmt.Bind(0, kKey); |
198 | 0 | stmt.Bind(1, ++executed_ops); |
199 | 0 | requests.push_back(Request { |
200 | 0 | .future = session.ExecuteGetFuture(stmt), |
201 | 0 | .start_time = CoarseMonoClock::now(), |
202 | 0 | }); |
203 | 0 | } |
204 | 0 | } |
205 | | |
206 | 0 | TEST_F(CqlTest, RecreateTableWithInserts) { |
207 | 0 | const auto kNumKeys = 4; |
208 | 0 | const auto kNumIters = 2; |
209 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
210 | 0 | for (int i = 0; i != kNumIters; ++i) { |
211 | 0 | SCOPED_TRACE(Format("Iteration: $0", i)); |
212 | 0 | ASSERT_OK(session.ExecuteQuery( |
213 | 0 | "CREATE TABLE t (k INT PRIMARY KEY, v INT) WITH transactions = { 'enabled' : true }")); |
214 | 0 | std::string expr = "BEGIN TRANSACTION "; |
215 | 0 | for (int key = 0; key != kNumKeys; ++key) { |
216 | 0 | expr += "INSERT INTO t (k, v) VALUES (?, ?); "; |
217 | 0 | } |
218 | 0 | expr += "END TRANSACTION;"; |
219 | 0 | auto prepared = ASSERT_RESULT(session.Prepare(expr)); |
220 | 0 | auto stmt = prepared.Bind(); |
221 | 0 | size_t idx = 0; |
222 | 0 | for (int key = 0; key != kNumKeys; ++key) { |
223 | 0 | stmt.Bind(idx++, RandomUniformInt<int32_t>(-1000, 1000)); |
224 | 0 | stmt.Bind(idx++, -key); |
225 | 0 | } |
226 | 0 | ASSERT_OK(session.Execute(stmt)); |
227 | 0 | ASSERT_OK(session.ExecuteQuery("DROP TABLE t")); |
228 | 0 | } |
229 | 0 | } |
230 | | |
231 | | class CqlThreeMastersTest : public CqlTest { |
232 | | public: |
233 | 2 | void SetUp() override { |
234 | 2 | FLAGS_partitions_vtable_cache_refresh_secs = 0; |
235 | 2 | CqlTest::SetUp(); |
236 | 2 | } |
237 | | |
238 | 2 | int num_masters() override { |
239 | 2 | return 3; |
240 | 2 | } |
241 | | }; |
242 | | |
243 | 0 | Status CheckNumAddressesInYqlPartitionsTable(CassandraSession* session, int expected_num_addrs) { |
244 | 0 | const int kReplicaAddressesIndex = 5; |
245 | 0 | auto result = VERIFY_RESULT(session->ExecuteWithResult("SELECT * FROM system.partitions")); |
246 | 0 | auto iterator = result.CreateIterator(); |
247 | 0 | while (iterator.Next()) { |
248 | 0 | auto replica_addresses = iterator.Row().Value(kReplicaAddressesIndex).ToString(); |
249 | 0 | ssize_t num_addrs = 0; |
250 | 0 | if (replica_addresses.size() > std::strlen("{}")) { |
251 | 0 | num_addrs = std::count(replica_addresses.begin(), replica_addresses.end(), ',') + 1; |
252 | 0 | } |
253 | |
|
254 | 0 | EXPECT_EQ(num_addrs, expected_num_addrs); |
255 | 0 | } |
256 | 0 | return Status::OK(); |
257 | 0 | } |
258 | | |
259 | 0 | TEST_F_EX(CqlTest, HostnameResolutionFailureInYqlPartitionsTable, CqlThreeMastersTest) { |
260 | 0 | google::FlagSaver flag_saver; |
261 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
262 | 0 | ASSERT_OK(CheckNumAddressesInYqlPartitionsTable(&session, 3)); |
263 | | |
264 | | // TEST_RpcAddress is 1-indexed. |
265 | 0 | string hostname = server::TEST_RpcAddress(cluster_->LeaderMasterIdx() + 1, |
266 | 0 | server::Private::kFalse); |
267 | | |
268 | | // Fail resolution of the old leader master's hostname. |
269 | 0 | FLAGS_TEST_fail_to_fast_resolve_address = hostname; |
270 | 0 | LOG(INFO) << "Setting FLAGS_TEST_fail_to_fast_resolve_address to: " |
271 | 0 | << FLAGS_TEST_fail_to_fast_resolve_address; |
272 | | |
273 | | // Shutdown the master leader, and wait for new leader to get elected. |
274 | 0 | ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->Shutdown(); |
275 | 0 | ASSERT_RESULT(cluster_->GetLeaderMiniMaster()); |
276 | | |
277 | | // Assert that a new call will succeed, but will be missing the shutdown master address. |
278 | 0 | ASSERT_OK(CheckNumAddressesInYqlPartitionsTable(&session, 2)); |
279 | 0 | } |
280 | | |
281 | 0 | TEST_F_EX(CqlTest, NonRespondingMaster, CqlThreeMastersTest) { |
282 | 0 | FLAGS_TEST_timeout_non_leader_master_rpcs = true; |
283 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
284 | 0 | ASSERT_OK(session.ExecuteQuery("CREATE TABLE t1 (i INT PRIMARY KEY, j INT)")); |
285 | 0 | ASSERT_OK(session.ExecuteQuery("INSERT INTO t1 (i, j) VALUES (1, 1)")); |
286 | 0 | ASSERT_OK(session.ExecuteQuery("CREATE TABLE t2 (i INT PRIMARY KEY, j INT)")); |
287 | |
|
288 | 0 | LOG(INFO) << "Prepare"; |
289 | 0 | auto prepared = ASSERT_RESULT(session.Prepare("INSERT INTO t2 (i, j) VALUES (?, ?)")); |
290 | 0 | LOG(INFO) << "Step down"; |
291 | 0 | auto peer = ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->tablet_peer(); |
292 | 0 | ASSERT_OK(StepDown(peer, std::string(), ForceStepDown::kTrue)); |
293 | 0 | LOG(INFO) << "Insert"; |
294 | 0 | FLAGS_client_read_write_timeout_ms = 5000; |
295 | 0 | bool has_ok = false; |
296 | 0 | for (int i = 0; i != 3; ++i) { |
297 | 0 | auto stmt = prepared.Bind(); |
298 | 0 | stmt.Bind(0, i); |
299 | 0 | stmt.Bind(1, 1); |
300 | 0 | auto status = session.Execute(stmt); |
301 | 0 | if (status.ok()) { |
302 | 0 | has_ok = true; |
303 | 0 | break; |
304 | 0 | } |
305 | 0 | ASSERT_NE(status.message().ToBuffer().find("timed out"), std::string::npos) << status; |
306 | 0 | } |
307 | 0 | ASSERT_TRUE(has_ok); |
308 | 0 | } |
309 | | |
310 | | } // namespace yb |