YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/cql-index-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/integration-tests/cql_test_base.h"
15
#include "yb/integration-tests/mini_cluster_utils.h"
16
17
#include "yb/util/atomic.h"
18
#include "yb/util/logging.h"
19
#include "yb/util/random_util.h"
20
#include "yb/util/status_log.h"
21
#include "yb/util/test_thread_holder.h"
22
#include "yb/util/test_util.h"
23
#include "yb/util/tsan_util.h"
24
25
using namespace std::literals;
26
27
DECLARE_bool(allow_index_table_read_write);
28
DECLARE_int32(cql_prepare_child_threshold_ms);
29
DECLARE_bool(disable_index_backfill);
30
DECLARE_bool(transactions_poll_check_aborted);
31
DECLARE_bool(TEST_disable_proactive_txn_cleanup_on_abort);
32
DECLARE_int32(client_read_write_timeout_ms);
33
DECLARE_int32(rpc_workers_limit);
34
DECLARE_uint64(transaction_manager_workers_limit);
35
DECLARE_uint64(TEST_inject_txn_get_status_delay_ms);
36
DECLARE_int64(transaction_abort_check_interval_ms);
37
38
namespace yb {
39
40
class CqlIndexTest : public CqlTestBase<MiniCluster> {
41
 public:
42
0
  virtual ~CqlIndexTest() = default;
43
44
  void TestTxnCleanup(size_t max_remaining_txns_per_tablet);
45
  void TestConcurrentModify2Columns(const std::string& expr);
46
};
47
48
YB_STRONGLY_TYPED_BOOL(UniqueIndex);
49
50
CHECKED_STATUS CreateIndexedTable(
51
0
    CassandraSession* session, UniqueIndex unique_index = UniqueIndex::kFalse) {
52
0
  RETURN_NOT_OK(
53
0
      session->ExecuteQuery("CREATE TABLE IF NOT EXISTS t (key INT PRIMARY KEY, value INT) WITH "
54
0
                            "transactions = { 'enabled' : true }"));
55
0
  return session->ExecuteQuery(
56
0
      Format("CREATE $0 INDEX IF NOT EXISTS idx ON T (value)", unique_index ? "UNIQUE" : ""));
57
0
}
58
59
0
TEST_F(CqlIndexTest, Simple) {
60
0
  constexpr int kKey = 1;
61
0
  constexpr int kValue = 2;
62
63
0
  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
64
65
0
  ASSERT_OK(CreateIndexedTable(&session));
66
67
0
  ASSERT_OK(session.ExecuteQuery("INSERT INTO t (key, value) VALUES (1, 2)"));
68
0
  auto result = ASSERT_RESULT(session.ExecuteWithResult("SELECT * FROM t WHERE value = 2"));
69
0
  auto iter = result.CreateIterator();
70
0
  ASSERT_TRUE(iter.Next());
71
0
  auto row = iter.Row();
72
0
  ASSERT_EQ(row.Value(0).As<cass_int32_t>(), kKey);
73
0
  ASSERT_EQ(row.Value(1).As<cass_int32_t>(), kValue);
74
0
  ASSERT_FALSE(iter.Next());
75
0
}
76
77
0
TEST_F(CqlIndexTest, MultipleIndex) {
78
0
  FLAGS_disable_index_backfill = false;
79
0
  auto session1 = ASSERT_RESULT(EstablishSession(driver_.get()));
80
0
  auto session2 = ASSERT_RESULT(EstablishSession(driver_.get()));
81
82
0
  WARN_NOT_OK(session1.ExecuteQuery(
83
0
      "CREATE TABLE t (key INT PRIMARY KEY, value INT) WITH transactions = { 'enabled' : true }"),
84
0
          "Create table failed.");
85
0
  auto future1 = session1.ExecuteGetFuture("CREATE INDEX idx1 ON T (value)");
86
0
  auto future2 = session2.ExecuteGetFuture("CREATE INDEX idx2 ON T (value)");
87
88
0
  constexpr auto kNamespace = "test";
89
0
  const client::YBTableName table_name(YQL_DATABASE_CQL, kNamespace, "t");
90
0
  const client::YBTableName index_table_name1(YQL_DATABASE_CQL, kNamespace, "idx1");
91
0
  const client::YBTableName index_table_name2(YQL_DATABASE_CQL, kNamespace, "idx2");
92
93
0
  LOG(INFO) << "Waiting for idx1 got " << future1.Wait();
94
0
  auto perm1 = ASSERT_RESULT(client_->WaitUntilIndexPermissionsAtLeast(
95
0
      table_name, index_table_name1, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE));
96
0
  CHECK_EQ(perm1, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE);
97
0
  LOG(INFO) << "Waiting for idx2 got " << future2.Wait();
98
0
  auto perm2 = ASSERT_RESULT(client_->WaitUntilIndexPermissionsAtLeast(
99
0
      table_name, index_table_name2, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE));
100
0
  CHECK_EQ(perm2, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE);
101
0
}
102
103
class CqlIndexSmallWorkersTest : public CqlIndexTest {
104
 public:
105
1
  void SetUp() override {
106
1
    FLAGS_rpc_workers_limit = 4;
107
1
    FLAGS_transaction_manager_workers_limit = 4;
108
1
    CqlIndexTest::SetUp();
109
1
  }
110
};
111
112
0
TEST_F_EX(CqlIndexTest, ConcurrentIndexUpdate, CqlIndexSmallWorkersTest) {
113
0
  constexpr int kThreads = 8;
114
0
  constexpr cass_int32_t kKeys = kThreads / 2;
115
0
  constexpr cass_int32_t kValues = kKeys;
116
0
  constexpr int kNumInserts = kThreads * 5;
117
118
0
  FLAGS_client_read_write_timeout_ms = 10000 * kTimeMultiplier;
119
0
  SetAtomicFlag(1000, &FLAGS_TEST_inject_txn_get_status_delay_ms);
120
0
  FLAGS_transaction_abort_check_interval_ms = 100000;
121
122
0
  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
123
124
0
  ASSERT_OK(LoggedWaitFor(
125
0
      [&session]() { return CreateIndexedTable(&session).ok(); }, 60s, "create table", 12s));
126
0
  constexpr auto kNamespace = "test";
127
0
  const client::YBTableName table_name(YQL_DATABASE_CQL, kNamespace, "t");
128
0
  const client::YBTableName index_table_name(YQL_DATABASE_CQL, kNamespace, "idx");
129
0
  ASSERT_OK(LoggedWaitFor(
130
0
      [this, table_name, index_table_name]() {
131
0
        auto result = client_->WaitUntilIndexPermissionsAtLeast(
132
0
            table_name, index_table_name, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE);
133
0
        return result.ok() && *result == IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE;
134
0
      },
135
0
      90s,
136
0
      "wait for create index to complete",
137
0
      12s));
138
139
0
  TestThreadHolder thread_holder;
140
0
  std::atomic<int> inserts(0);
141
0
  for (int i = 0; i != kThreads; ++i) {
142
0
    thread_holder.AddThreadFunctor([this, &inserts, &stop = thread_holder.stop_flag()] {
143
0
      auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
144
0
      auto prepared = ASSERT_RESULT(session.Prepare("INSERT INTO t (key, value) VALUES (?, ?)"));
145
0
      while (!stop.load(std::memory_order_acquire)) {
146
0
        auto stmt = prepared.Bind();
147
0
        stmt.Bind(0, RandomUniformInt<cass_int32_t>(1, kKeys));
148
0
        stmt.Bind(1, RandomUniformInt<cass_int32_t>(1, kValues));
149
0
        auto status = session.Execute(stmt);
150
0
        if (status.ok()) {
151
0
          ++inserts;
152
0
        } else {
153
0
          LOG(INFO) << "Insert failed: " << status;
154
0
        }
155
0
      }
156
0
    });
157
0
  }
158
159
0
  while (!thread_holder.stop_flag().load(std::memory_order_acquire)) {
160
0
    auto num_inserts = inserts.load(std::memory_order_acquire);
161
0
    if (num_inserts >= kNumInserts) {
162
0
      break;
163
0
    }
164
0
    YB_LOG_EVERY_N_SECS(INFO, 5) << "Num inserts " << num_inserts << " of " << kNumInserts;
165
0
    std::this_thread::sleep_for(100ms);
166
0
  }
167
168
0
  thread_holder.Stop();
169
170
0
  SetAtomicFlag(0, &FLAGS_TEST_inject_txn_get_status_delay_ms);
171
0
}
172
173
0
TEST_F(CqlIndexTest, TestSaturatedWorkers) {
174
  /*
175
   * (#11258) We set a very short timeout to force failure if child transaction
176
   * is not created quickly enough.
177
178
   * TODO: when switching to a fully asynchronous model, this failure will disappear.
179
   */
180
0
  FLAGS_cql_prepare_child_threshold_ms = 1;
181
182
0
  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
183
0
  ASSERT_OK(session.ExecuteQuery(
184
0
      "CREATE TABLE t (key INT PRIMARY KEY, v1 INT, v2 INT) WITH "
185
0
      "transactions = { 'enabled' : true }"));
186
0
  ASSERT_OK(session.ExecuteQuery(
187
0
      "CREATE INDEX i1 ON t(key, v1) WITH "
188
0
      "transactions = { 'enabled' : true }"));
189
0
  ASSERT_OK(session.ExecuteQuery(
190
0
      "CREATE INDEX i2 ON t(key, v2) WITH "
191
0
      "transactions = { 'enabled' : true }"));
192
193
0
  constexpr int kKeys = 10000;
194
0
  std::string expr = "BEGIN TRANSACTION ";
195
0
  for (int i = 0; i < kKeys; i++) {
196
0
    expr += Format("INSERT INTO t (key, v1, v2) VALUES ($0, $1, $2); ", i, i, i);
197
0
  }
198
0
  expr += "END TRANSACTION;";
199
200
  // We should expect to see timed out error
201
0
  auto status = session.ExecuteQuery(expr);
202
0
  ASSERT_FALSE(status.ok());
203
0
  ASSERT_NE(status.message().ToBuffer().find("Timed out waiting for prepare child status"),
204
0
            std::string::npos) << status;
205
0
}
206
207
YB_STRONGLY_TYPED_BOOL(CheckReady);
208
209
0
void CleanFutures(std::deque<CassandraFuture>* futures, CheckReady check_ready) {
210
0
  while (!futures->empty() && (!check_ready || futures->front().Ready())) {
211
0
    auto status = futures->front().Wait();
212
0
    if (!status.ok() && !status.IsTimedOut()) {
213
0
      auto msg = status.message().ToBuffer();
214
0
      if (msg.find("Duplicate value disallowed by unique index") == std::string::npos) {
215
0
        ASSERT_OK(status);
216
0
      }
217
0
    }
218
0
    futures->pop_front();
219
0
  }
220
0
}
221
222
0
void CqlIndexTest::TestTxnCleanup(size_t max_remaining_txns_per_tablet) {
223
0
  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
224
225
0
  ASSERT_OK(CreateIndexedTable(&session, UniqueIndex::kTrue));
226
0
  std::deque<CassandraFuture> futures;
227
228
0
  auto prepared = ASSERT_RESULT(session.Prepare("INSERT INTO t (key, value) VALUES (?, ?)"));
229
230
0
  for (int i = 0; i != RegularBuildVsSanitizers(100, 30); ++i) {
231
0
    ASSERT_NO_FATALS(CleanFutures(&futures, CheckReady::kTrue));
232
233
0
    auto stmt = prepared.Bind();
234
0
    stmt.Bind(0, i);
235
0
    stmt.Bind(1, RandomUniformInt<cass_int32_t>(1, 10));
236
0
    futures.push_back(session.ExecuteGetFuture(stmt));
237
0
  }
238
239
0
  ASSERT_NO_FATALS(CleanFutures(&futures, CheckReady::kFalse));
240
241
0
  AssertRunningTransactionsCountLessOrEqualTo(cluster_.get(), max_remaining_txns_per_tablet);
242
0
}
243
244
// Test proactive aborted transactions cleanup.
245
0
TEST_F(CqlIndexTest, TxnCleanup) {
246
0
  FLAGS_transactions_poll_check_aborted = false;
247
248
0
  TestTxnCleanup(/* max_remaining_txns_per_tablet= */ 5);
249
0
}
250
251
// Test poll based aborted transactions cleanup.
252
0
TEST_F(CqlIndexTest, TxnPollCleanup) {
253
0
  FLAGS_TEST_disable_proactive_txn_cleanup_on_abort = true;
254
0
  FLAGS_transaction_abort_check_interval_ms = 1000;
255
256
0
  TestTxnCleanup(/* max_remaining_txns_per_tablet= */ 0);
257
0
}
258
259
0
void CqlIndexTest::TestConcurrentModify2Columns(const std::string& expr) {
260
0
  FLAGS_allow_index_table_read_write = true;
261
262
0
  constexpr int kKeys = RegularBuildVsSanitizers(50, 10);
263
264
0
  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
265
0
  ASSERT_OK(session.ExecuteQuery(
266
0
      "CREATE TABLE t (key INT PRIMARY KEY, v1 INT, v2 INT) WITH "
267
0
      "transactions = { 'enabled' : true }"));
268
0
  ASSERT_OK(session.ExecuteQuery("CREATE INDEX v1_idx ON t (v1)"));
269
270
0
  auto prepared1 = ASSERT_RESULT(session.Prepare(Format(expr, "v1")));
271
0
  auto prepared2 = ASSERT_RESULT(session.Prepare(Format(expr, "v2")));
272
273
0
  std::vector<CassandraFuture> futures;
274
275
0
  for (int i = 0; i != kKeys; ++i) {
276
0
    for (auto* prepared : {&prepared1, &prepared2}) {
277
0
      auto statement = prepared->Bind();
278
0
      statement.Bind(0, i);
279
0
      statement.Bind(1, i);
280
0
      futures.push_back(session.ExecuteGetFuture(statement));
281
0
    }
282
0
  }
283
284
0
  int good = 0;
285
0
  int bad = 0;
286
0
  for (auto& future : futures) {
287
0
    auto status = future.Wait();
288
0
    if (status.ok()) {
289
0
      ++good;
290
0
    } else {
291
0
      ASSERT_TRUE(status.IsTimedOut()) << status;
292
0
      ++bad;
293
0
    }
294
0
  }
295
296
0
  ASSERT_GE(good, bad * 4);
297
298
0
  std::vector<bool> present(kKeys);
299
300
0
  int read_keys = 0;
301
0
  auto result = ASSERT_RESULT(session.ExecuteWithResult("SELECT * FROM v1_idx"));
302
0
  auto iter = result.CreateIterator();
303
0
  while (iter.Next()) {
304
0
    auto row = iter.Row();
305
0
    auto key = row.Value(0).As<int32_t>();
306
0
    auto value = row.Value(1);
307
0
    LOG(INFO) << key << ", " << value.ToString();
308
309
0
    ASSERT_FALSE(present[key]) << key;
310
0
    present[key] = true;
311
0
    ++read_keys;
312
0
    ASSERT_FALSE(value.IsNull());
313
0
    ASSERT_EQ(key, value.As<int32_t>());
314
0
  }
315
316
0
  ASSERT_EQ(read_keys, kKeys);
317
0
}
318
319
0
TEST_F(CqlIndexTest, ConcurrentInsert2Columns) {
320
0
  TestConcurrentModify2Columns("INSERT INTO t ($0, key) VALUES (?, ?)");
321
0
}
322
323
0
TEST_F(CqlIndexTest, ConcurrentUpdate2Columns) {
324
0
  TestConcurrentModify2Columns("UPDATE t SET $0 = ? WHERE key = ?");
325
0
}
326
327
} // namespace yb