YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/cql-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/consensus/raft_consensus.h"
15
16
#include "yb/integration-tests/cql_test_base.h"
17
18
#include "yb/master/mini_master.h"
19
20
#include "yb/tablet/tablet_peer.h"
21
22
#include "yb/util/random_util.h"
23
#include "yb/util/status_log.h"
24
#include "yb/util/test_macros.h"
25
#include "yb/util/test_util.h"
26
#include "yb/util/tsan_util.h"
27
28
using namespace std::literals;
29
30
// Runtime flags defined in other translation units that the tests in this file assign to.
// Each flag is declared exactly once.
DECLARE_bool(TEST_timeout_non_leader_master_rpcs);
DECLARE_int32(client_read_write_timeout_ms);
DECLARE_int32(partitions_vtable_cache_refresh_secs);
DECLARE_int64(cql_processors_limit);
DECLARE_string(TEST_fail_to_fast_resolve_address);
37
38
namespace yb {
39
40
// Fixture shared by the TEST_F cases in this file. All behavior comes from
// CqlTestBase<MiniCluster>; nothing is added here.
class CqlTest : public CqlTestBase<MiniCluster> {
 public:
  ~CqlTest() override = default;
};
44
45
0
// Sets cql_processors_limit to 1 and then tries to establish kSessions sessions. Every failed
// attempt must report ServiceUnavailable, and at least one attempt has to fail.
TEST_F(CqlTest, ProcessorsLimit) {
  FLAGS_cql_processors_limit = 1;
  constexpr int kSessions = 10;

  bool has_failures = false;
  // Sessions that were established stay in this vector until the test body returns.
  std::vector<CassandraSession> sessions;
  for (int attempt = 0; attempt != kSessions; ++attempt) {
    auto session = EstablishSession(driver_.get());
    if (session.ok()) {
      sessions.push_back(std::move(*session));
      continue;
    }

    LOG(INFO) << "Establish session failure: " << session.status();
    ASSERT_TRUE(session.status().IsServiceUnavailable());
    has_failures = true;
  }

  ASSERT_TRUE(has_failures);
}
64
65
// Execute delete in parallel to transactional update of the same row.
66
0
TEST_F(CqlTest, ConcurrentDeleteRowAndUpdateColumn) {
67
0
  constexpr int kIterations = 70;
68
0
  auto session1 = ASSERT_RESULT(EstablishSession(driver_.get()));
69
0
  auto session2 = ASSERT_RESULT(EstablishSession(driver_.get()));
70
0
  ASSERT_OK(session1.ExecuteQuery(
71
0
      "CREATE TABLE t (i INT PRIMARY KEY, j INT) WITH transactions = { 'enabled' : true }"));
72
0
  auto insert_prepared = ASSERT_RESULT(session1.Prepare("INSERT INTO t (i, j) VALUES (?, ?)"));
73
0
  for (int key = 1; key <= 2 * kIterations; ++key) {
74
0
    auto stmt = insert_prepared.Bind();
75
0
    stmt.Bind(0, key);
76
0
    stmt.Bind(1, key * 10);
77
0
    ASSERT_OK(session1.Execute(stmt));
78
0
  }
79
0
  auto update_prepared = ASSERT_RESULT(session1.Prepare(
80
0
      "BEGIN TRANSACTION "
81
0
      "  UPDATE t SET j = j + 1 WHERE i = ?;"
82
0
      "  UPDATE t SET j = j + 1 WHERE i = ?;"
83
0
      "END TRANSACTION;"));
84
0
  auto delete_prepared = ASSERT_RESULT(session1.Prepare("DELETE FROM t WHERE i = ?"));
85
0
  std::vector<CassandraFuture> futures;
86
0
  for (int i = 0; i < kIterations; ++i) {
87
0
    int k1 = i * 2 + 1;
88
0
    int k2 = i * 2 + 2;
89
90
0
    auto update_stmt = update_prepared.Bind();
91
0
    update_stmt.Bind(0, k1);
92
0
    update_stmt.Bind(1, k2);
93
0
    futures.push_back(session1.ExecuteGetFuture(update_stmt));
94
0
  }
95
96
0
  for (int i = 0; i < kIterations; ++i) {
97
0
    int k2 = i * 2 + 2;
98
99
0
    auto delete_stmt = delete_prepared.Bind();
100
0
    delete_stmt.Bind(0, k2);
101
0
    futures.push_back(session1.ExecuteGetFuture(delete_stmt));
102
0
  }
103
104
0
  for (auto& future : futures) {
105
0
    ASSERT_OK(future.Wait());
106
0
  }
107
108
0
  auto result = ASSERT_RESULT(session1.ExecuteWithResult("SELECT * FROM t"));
109
0
  auto iterator = result.CreateIterator();
110
0
  int num_rows = 0;
111
0
  int num_even = 0;
112
0
  while (iterator.Next()) {
113
0
    ++num_rows;
114
0
    auto row = iterator.Row();
115
0
    auto key = row.Value(0).As<int>();
116
0
    auto value = row.Value(1).As<int>();
117
0
    if ((key & 1) == 0) {
118
0
      LOG(ERROR) << "Even key: " << key;
119
0
      ++num_even;
120
0
    }
121
0
    ASSERT_EQ(value, key * 10 + 1);
122
0
    LOG(INFO) << "Row: " << key << " => " << value;
123
0
  }
124
0
  ASSERT_EQ(num_rows, kIterations);
125
0
  ASSERT_EQ(num_even, 0);
126
0
}
127
128
0
// Checks that updating a list element by index works after the whole list has been overwritten,
// both when the overwrite is done with UPDATE and when it is done with INSERT.
TEST_F(CqlTest, TestUpdateListIndexAfterOverwrite) {
  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));

  // Executes a statement and asserts that it succeeded. The query is taken by const reference so
  // that no std::string copy is made on each call.
  // Note: ASSERT_OK inside a lambda only returns from the lambda on failure.
  auto cql = [&](const std::string& query) {
    ASSERT_OK(session.ExecuteQuery(query));
  };
  cql("CREATE TABLE test(h INT, v LIST<INT>, PRIMARY KEY(h))");
  cql("INSERT INTO test (h, v) VALUES (1, [1, 2, 3])");

  // Reads the first row of the table, expects its key to be 1 and returns the string form of
  // the list column. Yields NotFound when the table has no rows.
  auto select = [&]() -> Result<string> {
    auto result = VERIFY_RESULT(session.ExecuteWithResult("SELECT * FROM test"));
    auto iter = result.CreateIterator();
    DFATAL_OR_RETURN_ERROR_IF(!iter.Next(), STATUS(NotFound, "Did not find result in test table."));
    auto row = iter.Row();
    auto key = row.Value(0).As<int>();
    EXPECT_EQ(key, 1);
    return row.Value(1).ToString();
  };

  // Overwrite via UPDATE, then update index 0.
  cql("UPDATE test SET v = [4, 5, 6] where h = 1");
  cql("UPDATE test SET v[0] = 7 WHERE h = 1");
  auto res1 = ASSERT_RESULT(select());
  EXPECT_EQ(res1, "[7, 5, 6]");

  // Overwrite via INSERT, then update index 0.
  cql("INSERT INTO test (h, v) VALUES (1, [10, 11, 12])");
  cql("UPDATE test SET v[0] = 8 WHERE h = 1");
  auto res2 = ASSERT_RESULT(select());
  EXPECT_EQ(res2, "[8, 11, 12]");
}
156
157
0
// Issues kOps transactional inserts to the same key without waiting for them, after asking every
// tablet peer to apply a 100ms TEST_DelayUpdate. Each request, whether it succeeds or fails, must
// be observed as finished within client_read_write_timeout_ms plus a 2s (scaled) allowance.
TEST_F(CqlTest, Timeout) {
  FLAGS_client_read_write_timeout_ms = 5000 * kTimeMultiplier;

  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
  ASSERT_OK(session.ExecuteQuery(
      "CREATE TABLE t (i INT PRIMARY KEY, j INT) WITH transactions = { 'enabled' : true }"));

  for (const auto& peer : ListTabletPeers(cluster_.get(), ListPeersFilter::kAll)) {
    peer->raft_consensus()->TEST_DelayUpdate(100ms);
  }

  auto prepared = ASSERT_RESULT(session.Prepare(
      "BEGIN TRANSACTION "
      "  INSERT INTO t (i, j) VALUES (?, ?);"
      "END TRANSACTION;"));

  // An issued request together with the time at which it was issued.
  struct Request {
    CassandraFuture future;
    CoarseTimePoint start_time;
  };

  constexpr int kOps = 50;
  constexpr int kKey = 42;
  std::deque<Request> requests;
  int executed_ops = 0;
  for (;;) {
    // Retire finished requests from the front of the queue. A failed insert is only logged;
    // what is asserted is how long the request took.
    while (!requests.empty() && requests.front().future.Ready()) {
      WARN_NOT_OK(requests.front().future.Wait(), "Insert failed");
      auto passed = CoarseMonoClock::now() - requests.front().start_time;
      ASSERT_LE(passed, FLAGS_client_read_write_timeout_ms * 1ms + 2s * kTimeMultiplier);
      requests.pop_front();
    }

    if (executed_ops < kOps) {
      // Still have requests to issue: send the next one right away.
      auto stmt = prepared.Bind();
      stmt.Bind(0, kKey);
      stmt.Bind(1, ++executed_ops);
      // Braced initialization evaluates left to right: the request is sent first, then the
      // clock is read.
      requests.push_back(Request{session.ExecuteGetFuture(stmt), CoarseMonoClock::now()});
      continue;
    }

    // Everything has been issued; finish once the queue is drained, otherwise poll again later.
    if (requests.empty()) {
      break;
    }
    std::this_thread::sleep_for(100ms);
  }
}
205
206
0
// Repeatedly creates a transactional table, writes kNumKeys rows to it in a single prepared
// transaction and drops the table again, so that each iteration works with a freshly created
// table of the same name.
TEST_F(CqlTest, RecreateTableWithInserts) {
  constexpr int kNumKeys = 4;
  constexpr int kNumIters = 2;
  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
  for (int i = 0; i != kNumIters; ++i) {
    SCOPED_TRACE(Format("Iteration: $0", i));
    ASSERT_OK(session.ExecuteQuery(
        "CREATE TABLE t (k INT PRIMARY KEY, v INT) WITH transactions = { 'enabled' : true }"));

    // One INSERT per key, all wrapped into a single transaction block.
    std::string expr = "BEGIN TRANSACTION ";
    for (int n = 0; n != kNumKeys; ++n) {
      expr += "INSERT INTO t (k, v) VALUES (?, ?); ";
    }
    expr += "END TRANSACTION;";

    auto prepared = ASSERT_RESULT(session.Prepare(expr));
    auto stmt = prepared.Bind();
    for (int key = 0; key != kNumKeys; ++key) {
      // Each INSERT owns two consecutive bind positions: (k, v).
      const size_t k_pos = 2 * static_cast<size_t>(key);
      const size_t v_pos = k_pos + 1;
      stmt.Bind(k_pos, RandomUniformInt<int32_t>(-1000, 1000));
      stmt.Bind(v_pos, -key);
    }
    ASSERT_OK(session.Execute(stmt));
    ASSERT_OK(session.ExecuteQuery("DROP TABLE t"));
  }
}
230
231
class CqlThreeMastersTest : public CqlTest {
232
 public:
233
2
  void SetUp() override {
234
2
    FLAGS_partitions_vtable_cache_refresh_secs = 0;
235
2
    CqlTest::SetUp();
236
2
  }
237
238
2
  int num_masters() override {
239
2
    return 3;
240
2
  }
241
};
242
243
0
// Reads every row of system.partitions through `session` and expects (EXPECT_EQ) that the column
// at index kReplicaAddressesIndex lists `expected_num_addrs` addresses.
//
// The number of addresses is derived from the string form of the column: a value no longer than
// "{}" counts as zero addresses, otherwise it is the number of commas plus one.
//
// Returns the query error if the SELECT fails and OK otherwise; count mismatches are reported
// as gtest expectation failures, not through the returned status.
Status CheckNumAddressesInYqlPartitionsTable(CassandraSession* session, int expected_num_addrs) {
  const int kReplicaAddressesIndex = 5;
  const size_t kEmptyValueLength = std::strlen("{}");

  auto result = VERIFY_RESULT(session->ExecuteWithResult("SELECT * FROM system.partitions"));
  for (auto iterator = result.CreateIterator(); iterator.Next();) {
    auto replica_addresses = iterator.Row().Value(kReplicaAddressesIndex).ToString();
    const ssize_t num_addrs =
        replica_addresses.size() > kEmptyValueLength
            ? std::count(replica_addresses.begin(), replica_addresses.end(), ',') + 1
            : 0;

    EXPECT_EQ(num_addrs, expected_num_addrs);
  }
  return Status::OK();
}
258
259
0
// With three masters, system.partitions is first expected to list three addresses per row. The
// leader master's hostname is then made to fail fast resolution and the leader is shut down;
// once a new leader is available, two addresses per row are expected.
TEST_F_EX(CqlTest, HostnameResolutionFailureInYqlPartitionsTable, CqlThreeMastersTest) {
  // Declared first so that it is the last local to be destroyed.
  google::FlagSaver flag_saver;
  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
  ASSERT_OK(CheckNumAddressesInYqlPartitionsTable(&session, 3));

  // TEST_RpcAddress is 1-indexed.
  const auto leader_rpc_index = cluster_->LeaderMasterIdx() + 1;
  string hostname = server::TEST_RpcAddress(leader_rpc_index, server::Private::kFalse);

  // Fail resolution of the old leader master's hostname.
  FLAGS_TEST_fail_to_fast_resolve_address = hostname;
  LOG(INFO) << "Setting FLAGS_TEST_fail_to_fast_resolve_address to: "
            << FLAGS_TEST_fail_to_fast_resolve_address;

  // Shutdown the master leader, and wait for new leader to get elected.
  ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->Shutdown();
  ASSERT_RESULT(cluster_->GetLeaderMiniMaster());

  // Assert that a new call will succeed, but will be missing the shutdown master address.
  ASSERT_OK(CheckNumAddressesInYqlPartitionsTable(&session, 2));
}
280
281
0
// Enables TEST_timeout_non_leader_master_rpcs, forces the leader master to step down, and then
// checks that an insert into a table that has not been written to before succeeds within three
// attempts. Any failed attempt must have a status message that contains "timed out".
TEST_F_EX(CqlTest, NonRespondingMaster, CqlThreeMastersTest) {
  FLAGS_TEST_timeout_non_leader_master_rpcs = true;
  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
  ASSERT_OK(session.ExecuteQuery("CREATE TABLE t1 (i INT PRIMARY KEY, j INT)"));
  ASSERT_OK(session.ExecuteQuery("INSERT INTO t1 (i, j) VALUES (1, 1)"));
  ASSERT_OK(session.ExecuteQuery("CREATE TABLE t2 (i INT PRIMARY KEY, j INT)"));

  LOG(INFO) << "Prepare";
  auto prepared = ASSERT_RESULT(session.Prepare("INSERT INTO t2 (i, j) VALUES (?, ?)"));

  LOG(INFO) << "Step down";
  auto peer = ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->tablet_peer();
  ASSERT_OK(StepDown(peer, std::string(), ForceStepDown::kTrue));

  LOG(INFO) << "Insert";
  FLAGS_client_read_write_timeout_ms = 5000;
  bool has_ok = false;
  for (int i = 0; i != 3; ++i) {
    auto stmt = prepared.Bind();
    stmt.Bind(0, i);
    stmt.Bind(1, 1);
    auto status = session.Execute(stmt);
    if (!status.ok()) {
      // Only timeouts are acceptable failures; retry with the next key.
      ASSERT_NE(status.message().ToBuffer().find("timed out"), std::string::npos) << status;
      continue;
    }
    has_ok = true;
    break;
  }
  ASSERT_TRUE(has_ok);
}
309
310
} // namespace yb