YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/cql-tablet-split-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <atomic>
15
#include <thread>
16
17
#include <boost/range/adaptors.hpp>
18
#include <gtest/gtest.h>
19
20
#include "yb/client/table_info.h"
21
22
#include "yb/consensus/consensus.h"
23
24
#include "yb/gutil/strings/join.h"
25
26
#include "yb/integration-tests/cluster_itest_util.h"
27
#include "yb/integration-tests/cql_test_base.h"
28
#include "yb/integration-tests/external_mini_cluster.h"
29
#include "yb/integration-tests/load_generator.h"
30
#include "yb/integration-tests/mini_cluster.h"
31
32
#include "yb/master/master_client.pb.h"
33
#include "yb/master/mini_master.h"
34
35
#include "yb/tablet/tablet_metadata.h"
36
#include "yb/tablet/tablet_peer.h"
37
38
#include "yb/util/format.h"
39
#include "yb/util/logging.h"
40
#include "yb/util/monotime.h"
41
#include "yb/util/random.h"
42
#include "yb/util/size_literals.h"
43
#include "yb/util/status_log.h"
44
#include "yb/util/sync_point.h"
45
#include "yb/util/test_thread_holder.h"
46
#include "yb/util/test_util.h"
47
#include "yb/util/tsan_util.h"
48
49
using namespace std::literals;  // NOLINT
50
51
DECLARE_int32(heartbeat_interval_ms);
52
DECLARE_int32(tserver_heartbeat_metrics_interval_ms);
53
DECLARE_int32(cleanup_split_tablets_interval_sec);
54
DECLARE_int64(db_block_size_bytes);
55
DECLARE_int64(db_filter_block_size_bytes);
56
DECLARE_int64(db_index_block_size_bytes);
57
DECLARE_int64(db_write_buffer_size);
58
DECLARE_int32(yb_num_shards_per_tserver);
59
DECLARE_bool(enable_automatic_tablet_splitting);
60
DECLARE_int64(tablet_split_low_phase_size_threshold_bytes);
61
DECLARE_int64(tablet_split_high_phase_size_threshold_bytes);
62
DECLARE_int64(tablet_split_low_phase_shard_count_per_node);
63
DECLARE_int64(tablet_split_high_phase_shard_count_per_node);
64
DECLARE_int64(tablet_force_split_threshold_bytes);
65
66
DECLARE_double(TEST_simulate_lookup_partition_list_mismatch_probability);
67
DECLARE_bool(TEST_reject_delete_not_serving_tablet_rpc);
68
69
namespace yb {
70
71
namespace {
72
73
0
// Returns how many tablet peers in `cluster` count as "active". A peer is active when it:
// - has both tablet metadata and a consensus instance,
// - does not belong to a transaction status table,
// - is not in the TABLET_DATA_SPLIT_COMPLETED state, and
// - has a consensus leader status other than NOT_LEADER.
size_t GetNumActiveTablets(MiniCluster* cluster) {
  auto is_active_tablet = [](const std::shared_ptr<tablet::TabletPeer>& peer) -> bool {
    const auto tablet_meta = peer->tablet_metadata();
    const auto peer_consensus = peer->shared_consensus();
    if (!tablet_meta || !peer_consensus) {
      return false;
    }
    if (tablet_meta->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
      return false;
    }
    if (tablet_meta->tablet_data_state() ==
        tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED) {
      return false;
    }
    return peer_consensus->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER;
  };
  const auto active_peers = ListTabletPeers(cluster, is_active_tablet);
  return active_peers.size();
}
87
88
// Returns the number of tablet locations reported for `table_name`, obtained through
// itest::GetTableLocations with the given `timeout` and `require_tablets_running` setting.
// Any error from the lookup is propagated to the caller.
Result<size_t> GetNumActiveTablets(
    ExternalMiniCluster* cluster, const client::YBTableName& table_name, const MonoDelta& timeout,
    const RequireTabletsRunning require_tablets_running) {
  master::GetTableLocationsResponsePB locations_resp;
  const auto lookup_status = itest::GetTableLocations(
      cluster, table_name, timeout, require_tablets_running, &locations_resp);
  RETURN_NOT_OK(lookup_status);
  return locations_resp.tablet_locations_size();
}
96
97
} // namespace
98
99
const auto kSecondaryIndexTestTableName =
100
    client::YBTableName(YQL_DATABASE_CQL, kCqlTestKeyspace, "cql_test_table");
101
102
// Test fixture that runs CQL workloads against a MiniCluster with automatic tablet splitting
// enabled. The low/high phase split thresholds are set to 0 and the force-split threshold to
// 64 KB, so tablets become split candidates after only a small amount of data is written.
class CqlTabletSplitTest : public CqlTestBase<MiniCluster> {
 protected:
  void SetUp() override {
    FLAGS_yb_num_shards_per_tserver = 1;
    FLAGS_enable_automatic_tablet_splitting = true;

    // Setting this very low just causes metrics to be included in every heartbeat; there is no
    // extra overhead in setting it lower than FLAGS_heartbeat_interval_ms.
    FLAGS_tserver_heartbeat_metrics_interval_ms = 1;
    FLAGS_heartbeat_interval_ms = 1000;

    // Reduce cleanup waiting time, so tests are completed faster.
    FLAGS_cleanup_split_tablets_interval_sec = 1;

    FLAGS_tablet_split_low_phase_size_threshold_bytes = 0;
    FLAGS_tablet_split_high_phase_size_threshold_bytes = 0;
    FLAGS_tablet_split_low_phase_shard_count_per_node = 0;
    FLAGS_tablet_split_high_phase_shard_count_per_node = 0;
    FLAGS_tablet_force_split_threshold_bytes = 64_KB;
    FLAGS_db_write_buffer_size = FLAGS_tablet_force_split_threshold_bytes;
    // Small SST blocks so even small tablets have several index entries.
    FLAGS_db_block_size_bytes = 2_KB;
    FLAGS_db_filter_block_size_bytes = 2_KB;
    FLAGS_db_index_block_size_bytes = 2_KB;
    CqlTestBase::SetUp();
  }

  // For every tablet peer in the cluster (ListPeersFilter::kAll) that has a consensus instance,
  // waits until its last applied op id has caught up with its last committed op id. All waits
  // share one deadline of now + `timeout`; a timeout is reported through ASSERT_OK.
  void WaitUntilAllCommittedOpsApplied(const MonoDelta timeout) {
    const auto splits_completion_deadline = MonoTime::Now() + timeout;
    for (auto& peer : ListTabletPeers(cluster_.get(), ListPeersFilter::kAll)) {
      auto consensus = peer->shared_consensus();
      if (consensus) {
        ASSERT_OK(Wait([consensus]() -> Result<bool> {
          return consensus->GetLastAppliedOpId() >= consensus->GetLastCommittedOpId();
        }, splits_completion_deadline, "Waiting for all committed ops to be applied"));
      }
    }
  }

  // Disable splitting and wait for pending splits to complete.
  void StopSplitsAndWait() {
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_automatic_tablet_splitting) = false;
    // Give leaders time to apply split ops that have already been scheduled.
    std::this_thread::sleep_for(1s * kTimeMultiplier);
    // Wait until followers also apply those split ops.
    ASSERT_NO_FATALS(WaitUntilAllCommittedOpsApplied(15s * kTimeMultiplier));
    LOG(INFO) << "Number of active tablets: " << GetNumActiveTablets(cluster_.get());
  }

  void DoTearDown() override {
    // TODO(tsplit): remove this workaround after
    // https://github.com/yugabyte/yugabyte-db/issues/8222 is fixed.
    StopSplitsAndWait();
    CqlTestBase::DoTearDown();
  }

  // Creates the secondary index test table and starts the writer/reader workload.
  void StartSecondaryIndexTest();
  // Waits for `num_splits` additional active tablets (or the writer stopping), then stops the
  // workload and checks its results.
  void CompleteSecondaryIndexTest(int num_splits, MonoDelta timeout);

  // Workload configuration for the secondary index tests.
  int writer_threads_ = 2;
  int reader_threads_ = 4;
  int value_size_bytes_ = 1024;
  int max_write_errors_ = 100;
  int max_read_errors_ = 100;

  CassandraSession session_;
  // Shared stop flag handed to both the writer and the reader.
  std::atomic<bool> stop_requested_{false};
  std::unique_ptr<load_generator::SessionFactory> load_session_factory_;
  std::unique_ptr<load_generator::MultiThreadedWriter> writer_;
  std::unique_ptr<load_generator::MultiThreadedReader> reader_;
  // Number of active tablets recorded by StartSecondaryIndexTest() when the workload started.
  // Default-initialized so it never holds an indeterminate value before that.
  size_t start_num_active_tablets_ = 0;
};
172
173
class CqlTabletSplitTestMultiMaster : public CqlTabletSplitTest {
174
 public:
175
1
  int num_masters() override {
176
1
    return 3;
177
1
  }
178
};
179
180
class CqlSecondaryIndexWriter : public load_generator::SingleThreadedWriter {
181
 public:
182
  CqlSecondaryIndexWriter(
183
      load_generator::MultiThreadedWriter* writer, int writer_index, CppCassandraDriver* driver)
184
0
      : SingleThreadedWriter(writer, writer_index), driver_(driver) {}
185
186
 private:
187
  CppCassandraDriver* driver_;
188
  CassandraSession session_;
189
  CassandraPrepared prepared_insert_;
190
191
  void ConfigureSession() override;
192
  void CloseSession() override;
193
  bool Write(int64_t key_index, const string& key_str, const string& value_str) override;
194
  void HandleInsertionFailure(int64_t key_index, const string& key_str) override;
195
};
196
197
0
void CqlSecondaryIndexWriter::ConfigureSession() {
198
0
  session_ = CHECK_RESULT(EstablishSession(driver_));
199
0
  prepared_insert_ = CHECK_RESULT(session_.Prepare(Format(
200
0
      "INSERT INTO $0 (k, v) VALUES (?, ?)", kSecondaryIndexTestTableName.table_name())));
201
0
}
202
203
0
void CqlSecondaryIndexWriter::CloseSession() {
204
0
  session_.Reset();
205
0
}
206
207
bool CqlSecondaryIndexWriter::Write(
208
0
    int64_t key_index, const string& key_str, const string& value_str) {
209
0
  auto stmt = prepared_insert_.Bind();
210
0
  stmt.Bind(0, key_str);
211
0
  stmt.Bind(1, value_str);
212
0
  auto status = session_.Execute(stmt);
213
0
  if (!status.ok()) {
214
0
    LOG(INFO) << "Insert failed: " << AsString(status);
215
0
    return false;
216
0
  }
217
0
  return true;
218
0
}
219
220
0
// No-op: this writer takes no extra action when an insertion fails.
void CqlSecondaryIndexWriter::HandleInsertionFailure(
    int64_t /* key_index */, const string& /* key_str */) {}
222
223
class CqlSecondaryIndexReader : public load_generator::SingleThreadedReader {
224
 public:
225
  CqlSecondaryIndexReader(
226
      load_generator::MultiThreadedReader* writer, int writer_index, CppCassandraDriver* driver)
227
0
      : SingleThreadedReader(writer, writer_index), driver_(driver) {}
228
229
 private:
230
  CppCassandraDriver* driver_;
231
  CassandraSession session_;
232
  CassandraPrepared prepared_select_;
233
234
  void ConfigureSession() override;
235
  void CloseSession() override;
236
  load_generator::ReadStatus PerformRead(
237
      int64_t key_index, const string& key_str, const string& expected_value) override;
238
};
239
240
0
void CqlSecondaryIndexReader::ConfigureSession() {
241
0
  session_ = CHECK_RESULT(EstablishSession(driver_));
242
0
  prepared_select_ = CHECK_RESULT(session_.Prepare(
243
0
      Format("SELECT k, v FROM $0 WHERE v = ?", kSecondaryIndexTestTableName.table_name())));
244
0
}
245
246
0
void CqlSecondaryIndexReader::CloseSession() {
247
0
  session_.Reset();
248
0
}
249
250
// Selects rows whose secondary-indexed column `v` equals `expected_value` and checks that exactly
// one row is returned and that its key equals `key_str`.
//
// Returns:
//   kOtherError  - the SELECT itself failed;
//   kNoRows      - no row matched;
//   kInvalidRead - the first row's key differs from `key_str`;
//   kExtraRows   - more than one row matched (every extra key is logged first);
//   kOk          - exactly one matching row with the expected key.
load_generator::ReadStatus CqlSecondaryIndexReader::PerformRead(
    int64_t key_index, const string& key_str, const string& expected_value) {
  auto stmt = prepared_select_.Bind();
  stmt.Bind(0, expected_value);
  auto result = session_.ExecuteWithResult(stmt);
  if (!result.ok()) {
    LOG(WARNING) << "Select failed: " << AsString(result.status());
    return load_generator::ReadStatus::kOtherError;
  }
  auto iter = result->CreateIterator();
  auto values_formatter = [&] {
    return Format(
        "for v: '$0', expected key: '$1', key_index: $2", expected_value, key_str, key_index);
  };
  if (!iter.Next()) {
    LOG(ERROR) << "No rows found " << values_formatter();
    return load_generator::ReadStatus::kNoRows;
  }
  auto row = iter.Row();
  const auto k = row.Value(0).ToString();
  if (k != key_str) {
    LOG(ERROR) << "Invalid k " << values_formatter() << " got k: " << k;
    return load_generator::ReadStatus::kInvalidRead;
  }
  if (iter.Next()) {
    // Log the unexpected rows before reporting the failure so it can be diagnosed; returning
    // first would make this logging unreachable.
    LOG(ERROR) << "More than 1 row found " << values_formatter();
    do {
      LOG(ERROR) << "k: " << iter.Row().Value(0).ToString();
    } while (iter.Next());
    return load_generator::ReadStatus::kExtraRows;
  }
  return load_generator::ReadStatus::kOk;
}
283
284
class CqlSecondaryIndexSessionFactory : public load_generator::SessionFactory {
285
 public:
286
0
  explicit CqlSecondaryIndexSessionFactory(CppCassandraDriver* driver) : driver_(driver) {}
287
288
0
  std::string ClientId() override { return "CQL secondary index test client"; }
289
290
  load_generator::SingleThreadedWriter* GetWriter(
291
0
      load_generator::MultiThreadedWriter* writer, int idx) override {
292
0
    return new CqlSecondaryIndexWriter(writer, idx, driver_);
293
0
  }
294
295
  load_generator::SingleThreadedReader* GetReader(
296
0
      load_generator::MultiThreadedReader* reader, int idx) override {
297
0
    return new CqlSecondaryIndexReader(reader, idx, driver_);
298
0
  }
299
300
 protected:
301
  CppCassandraDriver* driver_;
302
};
303
304
0
void CqlTabletSplitTest::StartSecondaryIndexTest() {
305
0
  const auto kNumRows = std::numeric_limits<int64_t>::max();
306
307
0
  session_ = ASSERT_RESULT(EstablishSession(driver_.get()));
308
0
  ASSERT_OK(session_.ExecuteQuery(Format(
309
0
      "CREATE TABLE $0 (k varchar PRIMARY KEY, v varchar) WITH transactions = "
310
0
      "{ 'enabled' : true }",
311
0
      kSecondaryIndexTestTableName.table_name())));
312
0
  ASSERT_OK(session_.ExecuteQuery(Format(
313
0
      "CREATE INDEX $0_by_value ON $0(v) WITH transactions = { 'enabled' : true }",
314
0
      kSecondaryIndexTestTableName.table_name())));
315
316
0
  start_num_active_tablets_ = GetNumActiveTablets(cluster_.get());
317
0
  LOG(INFO) << "Number of active tablets at workload start: " << start_num_active_tablets_;
318
319
0
  load_session_factory_ = std::make_unique<CqlSecondaryIndexSessionFactory>(driver_.get());
320
0
  stop_requested_ = false;
321
322
0
  writer_ = std::make_unique<load_generator::MultiThreadedWriter>(
323
0
      kNumRows, /* start_key = */ 0, writer_threads_, load_session_factory_.get(), &stop_requested_,
324
0
      value_size_bytes_, max_write_errors_);
325
0
  reader_ = std::make_unique<load_generator::MultiThreadedReader>(
326
0
      kNumRows, reader_threads_, load_session_factory_.get(), writer_->InsertionPoint(),
327
0
      writer_->InsertedKeys(), writer_->FailedKeys(), &stop_requested_, value_size_bytes_,
328
0
      max_read_errors_);
329
330
0
  LOG(INFO) << "Started workload";
331
0
  writer_->Start();
332
0
  reader_->Start();
333
0
}
334
335
0
void CqlTabletSplitTest::CompleteSecondaryIndexTest(const int num_splits, const MonoDelta timeout) {
336
0
  const auto num_wait_for_active_tablets = start_num_active_tablets_ + num_splits;
337
0
  size_t num_active_tablets;
338
339
0
  ASSERT_OK(LoggedWaitFor(
340
0
      [&]() {
341
0
        num_active_tablets = GetNumActiveTablets(cluster_.get());
342
0
        YB_LOG_EVERY_N_SECS(INFO, 5) << "Number of active tablets: " << num_active_tablets;
343
0
        if (!writer_->IsRunning()) {
344
0
          return true;
345
0
        }
346
0
        if (num_active_tablets >= num_wait_for_active_tablets) {
347
0
          return true;
348
0
        }
349
0
        return false;
350
0
      },
351
0
      timeout,
352
0
      Format("Waiting for $0 active tablets or writer stopped", num_wait_for_active_tablets)));
353
0
  LOG(INFO) << "Number of active tablets: " << num_active_tablets;
354
355
0
  writer_->Stop();
356
0
  reader_->Stop();
357
0
  writer_->WaitForCompletion();
358
0
  reader_->WaitForCompletion();
359
360
0
  LOG(INFO) << "Workload complete, num_writes: " << writer_->num_writes()
361
0
            << ", num_write_errors: " << writer_->num_write_errors()
362
0
            << ", num_reads: " << reader_->num_reads()
363
0
            << ", num_read_errors: " << reader_->num_read_errors()
364
0
            << ", splits done: " << num_active_tablets - start_num_active_tablets_;
365
0
  ASSERT_EQ(reader_->read_status_stopped(), load_generator::ReadStatus::kOk)
366
0
      << " reader stopped due to: " << AsString(reader_->read_status_stopped());
367
0
  ASSERT_LE(writer_->num_write_errors(), max_write_errors_);
368
0
}
369
370
0
// Runs the secondary index workload and waits for 10 tablet splits while
// FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability is set to 0.5.
TEST_F(CqlTabletSplitTest, SecondaryIndex) {
  constexpr int kNumSplits = 10;
  constexpr double kPartitionListMismatchProbability = 0.5;

  ASSERT_NO_FATALS(StartSecondaryIndexTest());
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability) =
      kPartitionListMismatchProbability;
  ASSERT_NO_FATALS(CompleteSecondaryIndexTest(kNumSplits, 300s * kTimeMultiplier));
}
377
378
0
// On a 3-master cluster, repeatedly runs the secondary index workload and then drops the table.
// Each iteration checks that all tablets of the dropped table disappear from every tserver within
// 10s * kTimeMultiplier.
//
// FLAGS_TEST_reject_delete_not_serving_tablet_rpc is enabled while the workload runs. Judging by
// its name, it makes the master reject DeleteNotServingTablet RPCs (TODO confirm). On iterations
// after the first, the master leader is stepped down before the drop.
TEST_F_EX(CqlTabletSplitTest, SecondaryIndexWithDrop, CqlTabletSplitTestMultiMaster) {
  const auto kNumSplits = 3;
  const auto kNumTestIters = 2;

#ifndef NDEBUG
  // Pair the catalog manager's "Reject" sync point with this test's "WaitForReject" sync point so
  // the test can wait for a rejection to have happened.
  SyncPoint::GetInstance()->LoadDependency(
      {{"CatalogManager::DeleteNotServingTablet:Reject",
        "CqlTabletSplitTest::SecondaryIndexWithDrop:WaitForReject"}});
#endif  // NDEBUG

  auto client = ASSERT_RESULT(cluster_->CreateClient());

  for (int iteration = 1; iteration <= kNumTestIters; ++iteration) {
    LOG(INFO) << "Iteration " << iteration;
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_reject_delete_not_serving_tablet_rpc) = true;
#ifndef NDEBUG
    SyncPoint::GetInstance()->EnableProcessing();
#endif  // NDEBUG

    ASSERT_NO_FATALS(StartSecondaryIndexTest());
    // Fetched before the drop so the table id is available for the leftover-tablet check below.
    const auto table_info = ASSERT_RESULT(client->GetYBTableInfo(kSecondaryIndexTestTableName));
    ASSERT_NO_FATALS(CompleteSecondaryIndexTest(kNumSplits, 300s * kTimeMultiplier));

    TEST_SYNC_POINT("CqlTabletSplitTest::SecondaryIndexWithDrop:WaitForReject");

    if (iteration > 1) {
      // Test tracking split tablets in case of leader master failover.
      const auto leader_idx = cluster_->LeaderMasterIdx();
      const auto leader_peer = cluster_->mini_master(leader_idx)->tablet_peer();
      const auto follower_peer =
          cluster_->mini_master((leader_idx + 1) % cluster_->num_masters())->tablet_peer();
      LOG(INFO) << "Iteration " << iteration << ": stepping down master leader";
      ASSERT_OK(StepDown(leader_peer, follower_peer->permanent_uuid(), ForceStepDown::kFalse));
      LOG(INFO) << "Iteration " << iteration << ": stepping down master leader - done";
    }

    LOG(INFO) << "Iteration " << iteration << ": deleting test table";
    ASSERT_OK(session_.ExecuteQuery("DROP TABLE " + kSecondaryIndexTestTableName.table_name()));
    LOG(INFO) << "Iteration " << iteration << ": deleted test table";

    // Make sure all table tablets are deleted on all tservers.
    auto peer_log_prefix = [](const tablet::TabletPeerPtr& peer) { return peer->LogPrefix(); };
    std::vector<tablet::TabletPeerPtr> leftover_peers;
    const auto wait_status = LoggedWaitFor(
        [&]() -> Result<bool> {
          leftover_peers = ListTableTabletPeers(cluster_.get(), table_info.table_id);
          return leftover_peers.empty();
        },
        10s * kTimeMultiplier, "Waiting for tablets to be deleted");
    ASSERT_TRUE(wait_status.ok())
        << AsString(wait_status) + ": expected tablets to be deleted, but following left:\n"
        << JoinStrings(leftover_peers | boost::adaptors::transformed(peer_log_prefix), "\n");

    ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_reject_delete_not_serving_tablet_rpc) = false;
#ifndef NDEBUG
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearTrace();
#endif  // NDEBUG
  }
}
440
441
// Fixture for external-cluster tablet split tests. It configures masters to auto-split tablets
// once they reach 64 KB, with phase thresholds disabled. It lowers the tserver write buffer and
// SST block size so that split keys can be found in small tablets.
class CqlTabletSplitTestExt : public CqlTestBase<ExternalMiniCluster> {
 protected:
  void SetUpFlags() override {
    const int64 kSplitThreshold = 64_KB;

    // Flags passed to both masters and tservers (appended after the role-specific ones).
    const std::vector<std::string> common_flags = {"--yb_num_shards_per_tserver=1"};

    auto& master_flags = mini_cluster_opt_.extra_master_flags;
    master_flags.push_back(Format("--replication_factor=$0", std::min(num_tablet_servers(), 3)));
    for (const char* split_flag : {
             "--enable_automatic_tablet_splitting=true",
             "--tablet_split_low_phase_size_threshold_bytes=0",
             "--tablet_split_high_phase_size_threshold_bytes=0",
             "--tablet_split_low_phase_shard_count_per_node=0",
             "--tablet_split_high_phase_shard_count_per_node=0"}) {
      master_flags.push_back(split_flag);
    }
    master_flags.push_back(Format("--tablet_force_split_threshold_bytes=$0", kSplitThreshold));

    auto& tserver_flags = mini_cluster_opt_.extra_tserver_flags;
    tserver_flags.push_back(Format("--db_write_buffer_size=$0", kSplitThreshold));
    // Lower SST block size to have multiple entries in index and be able to detect a split key.
    const auto block_size = std::min(FLAGS_db_block_size_bytes, kSplitThreshold / 8);
    tserver_flags.push_back(Format("--db_block_size_bytes=$0", block_size));

    for (const auto& flag : common_flags) {
      master_flags.push_back(flag);
      tserver_flags.push_back(flag);
    }
  }
};
471
472
class CqlTabletSplitTestExtRf1 : public CqlTabletSplitTestExt {
473
 public:
474
2
  int num_tablet_servers() override { return 1; }
475
};
476
477
// Per-metric state shared by the batch time-series reader and writer threads.
struct BatchTimeseriesDataSource {
  explicit BatchTimeseriesDataSource(const std::string& id) : metric_id(id) {}

  // Partition key value used for this metric's rows.
  const std::string metric_id;
  // First timestamp a writer emits for this metric.
  const int64_t data_emit_start_ts = 1;
  // -1 until a writer first touches this metric; afterwards writers advance it past each batch
  // they successfully write.
  std::atomic<int64_t> last_emitted_ts{-1};
};
484
485
// Runs a batch time-series CQL workload against `cluster`. It stops when the test table has
// `num_splits` more active tablets than at workload start, when `timeout` expires, or when the
// read/write error budget is used up.
//
// Writer threads insert LOGGED batches of kWriteBatchSize consecutive timestamps for randomly
// chosen metrics. Reader threads run range queries over the most recently written timestamps of
// random metrics.
//
// When the target tablet count is first observed (tablets not necessarily running), writes are
// stopped so the local cluster is not overloaded with post-split tablets. Polling then continues
// until that many tablets are reported as running.
//
// If the deadline is reached, every daemon of `cluster` is sent SIGSEGV to produce core dumps.
//
// Returns an error if the session, table, statements or initial tablet count cannot be set up.
// Returns IllegalState if the target tablet count was never reached. The error budgets are
// checked with EXPECT_LE.
CHECKED_STATUS RunBatchTimeSeriesTest(
    ExternalMiniCluster* cluster, CppCassandraDriver* driver, const int num_splits,
    const MonoDelta timeout) {
  const auto kWriterThreads = 4;
  const auto kReaderThreads = 4;
  const auto kMinMetricsCount = 10000;
  const auto kMaxMetricsCount = 20000;
  const auto kReadBatchSize = 100;
  const auto kWriteBatchSize = 500;
  const auto kReadBackDeltaTime = 100;
  const auto kValueSize = 100;

  const auto kMaxWriteErrors = 100;
  const auto kMaxReadErrors = 100;

  const auto kTableTtlSeconds = MonoDelta(24h).ToSeconds();

  const client::YBTableName kTableName(YQL_DATABASE_CQL, "test", "batch_timeseries_test");

  std::atomic_int num_reads(0);
  std::atomic_int num_writes(0);
  std::atomic_int num_read_errors(0);
  std::atomic_int num_write_errors(0);

  // Fixed seed so the number of metrics is reproducible between runs.
  std::mt19937_64 rng(/* seed */ 29383);
  const auto num_metrics = RandomUniformInt<>(kMinMetricsCount, kMaxMetricsCount - 1, &rng);
  std::vector<std::unique_ptr<BatchTimeseriesDataSource>> data_sources;
  data_sources.reserve(num_metrics);
  for (int i = 0; i < num_metrics; ++i) {
    data_sources.emplace_back(std::make_unique<BatchTimeseriesDataSource>(Format("metric-$0", i)));
  }

  // Propagate connection failures as a Status like the rest of this function, rather than
  // crashing the whole test binary with CHECK_RESULT.
  auto session = VERIFY_RESULT(EstablishSession(driver));

  RETURN_NOT_OK(
      session.ExecuteQuery(Format(
          "CREATE TABLE $0 ("
            "metric_id varchar, "
            "ts bigint, "
            "value varchar, "
            "primary key (metric_id, ts)) "
            "WITH default_time_to_live = $1", kTableName.table_name(), kTableTtlSeconds)));

  const auto start_num_active_tablets = VERIFY_RESULT(GetNumActiveTablets(
      cluster, kTableName, 60s * kTimeMultiplier, RequireTabletsRunning::kTrue));
  LOG(INFO) << "Number of active tablets at workload start: " << start_num_active_tablets;

  auto prepared_write = VERIFY_RESULT(session.Prepare(Format(
      "INSERT INTO $0 (metric_id, ts, value) VALUES (?, ?, ?)", kTableName.table_name())));

  auto prepared_read = VERIFY_RESULT(session.Prepare(Format(
      "SELECT * from $0 WHERE metric_id = ? AND ts > ? AND ts < ? ORDER BY ts DESC LIMIT ?",
      kTableName.table_name())));

  // `rng` is shared by all I/O threads, so access to it is serialized.
  std::mutex random_mutex;
  auto get_random_source = [&rng, &random_mutex, &data_sources]() -> BatchTimeseriesDataSource* {
    std::lock_guard<decltype(random_mutex)> lock(random_mutex);
    return RandomElement(data_sources, &rng).get();
  };

  // Fills `value` with the decimal form of `ts`, padded with random Uint16ToHexString chunks and
  // trimmed to exactly kValueSize bytes. If the decimal form alone already has at least kValueSize
  // bytes, it is left as is.
  auto get_value = [](int64_t ts, std::string* value) {
    value->clear();
    value->append(AsString(ts));
    if (value->size() >= kValueSize) {
      return;
    }
    while (value->size() < kValueSize) {
      value->append(Uint16ToHexString(RandomUniformInt<uint16_t>()));
    }
    // Padding is appended in multi-character chunks and may overshoot; trim to the exact size.
    value->resize(kValueSize);
  };

  TestThreadHolder io_threads;
  auto reader = [&, &stop = io_threads.stop_flag()]() -> void {
    while (!stop.load(std::memory_order_acquire)) {
      auto& source = *CHECK_NOTNULL(get_random_source());
      if (source.last_emitted_ts < source.data_emit_start_ts) {
        // Nothing written for this metric yet.
        continue;
      }
      const int64_t end_ts = source.last_emitted_ts + 1;
      const int64_t start_ts = std::max(end_ts - kReadBackDeltaTime, source.data_emit_start_ts);

      auto stmt = prepared_read.Bind();
      stmt.Bind(0, source.metric_id);
      stmt.Bind(1, start_ts);
      stmt.Bind(2, end_ts);
      stmt.Bind(3, kReadBatchSize);
      auto status = session.Execute(stmt);
      if (!status.ok()) {
        YB_LOG_EVERY_N_SECS(INFO, 1) << "Read failed: " << AsString(status);
        num_read_errors++;
      } else {
        num_reads += 1;
      }
      YB_LOG_EVERY_N_SECS(INFO, 5)
          << "Completed " << num_reads << " reads, read errors: " << num_read_errors;
    }
  };

  std::atomic<bool> stop_writes{false};
  auto writer = [&, &stop = io_threads.stop_flag()]() -> void {
    std::string value;
    // Reserve more bytes, because we fill by 2-byte blocks and might overfill and then
    // truncate, but don't want reallocation on growth.
    value.reserve(kValueSize + 1);
    while (!stop.load(std::memory_order_acquire) && !stop_writes.load(std::memory_order_acquire)) {
      auto& source = *CHECK_NOTNULL(get_random_source());

      if (source.last_emitted_ts == -1) {
        source.last_emitted_ts = source.data_emit_start_ts;
      }
      auto ts = source.last_emitted_ts.load(std::memory_order_acquire);
      CassandraBatch batch(CassBatchType::CASS_BATCH_TYPE_LOGGED);

      for (int i = 0; i < kWriteBatchSize; ++i) {
        auto stmt = prepared_write.Bind();
        stmt.Bind(0, source.metric_id);
        stmt.Bind(1, ts);
        get_value(ts, &value);
        stmt.Bind(2, value);
        batch.Add(&stmt);
        ts++;
      }

      auto status = session.ExecuteBatch(batch);
      if (!status.ok()) {
        YB_LOG_EVERY_N_SECS(INFO, 1) << "Write failed: " << AsString(status);
        num_write_errors++;
      } else {
        num_writes += kWriteBatchSize;
        // Only advance after a successful batch so readers query written timestamps only.
        source.last_emitted_ts = ts;
      }
      YB_LOG_EVERY_N_SECS(INFO, 5)
          << "Completed " << num_writes << " writes, num_write_errors: " << num_write_errors;
    }
  };

  for (int i = 0; i < kReaderThreads; ++i) {
    io_threads.AddThreadFunctor(reader);
  }
  for (int i = 0; i < kWriterThreads; ++i) {
    io_threads.AddThreadFunctor(writer);
  }

  const auto deadline = CoarseMonoClock::Now() + timeout;
  const auto num_wait_for_active_tablets = start_num_active_tablets + num_splits;
  size_t num_active_tablets = start_num_active_tablets;
  while (CoarseMonoClock::Now() < deadline && num_read_errors < kMaxReadErrors &&
         num_write_errors < kMaxWriteErrors) {
    // When we get num_wait_for_active_tablets active tablets (not necessarily running), we stop
    // writes to avoid creating too many post-split tablets and overloading local cluster.
    // After stopping writes, we continue reads and wait for num_wait_for_active_tablets active
    // running tablets.
    auto num_active_tablets_res = GetNumActiveTablets(
        cluster, kTableName, 10s * kTimeMultiplier,
        RequireTabletsRunning(stop_writes.load(std::memory_order_acquire)));
    // A failed lookup is retried on the next poll.
    if (num_active_tablets_res.ok()) {
      num_active_tablets = *num_active_tablets_res;
      YB_LOG_EVERY_N_SECS(INFO, 3) << "Number of active tablets: " << num_active_tablets;
      if (num_active_tablets >= num_wait_for_active_tablets) {
        if (stop_writes.load(std::memory_order_acquire)) {
          break;
        } else {
          LOG(INFO) << "Stopping writes";
          stop_writes = true;
        }
      }
    }
    SleepFor(500ms);
  }
  if (CoarseMonoClock::Now() >= deadline) {
    // Produce a core dump for investigation.
    for (auto* daemon : cluster->daemons()) {
      ERROR_NOT_OK(daemon->Kill(SIGSEGV), "Failed to crash process: ");
    }
  }

  io_threads.Stop();
  LOG(INFO) << "num_reads: " << num_reads;
  LOG(INFO) << "num_writes: " << num_writes;
  LOG(INFO) << "num_read_errors: " << num_read_errors;
  LOG(INFO) << "num_write_errors: " << num_write_errors;
  EXPECT_LE(num_read_errors, kMaxReadErrors);
  EXPECT_LE(num_write_errors, kMaxWriteErrors);
  SCHECK_GE(
      num_active_tablets, num_wait_for_active_tablets, IllegalState,
      Format("Didn't achieve $0 splits", num_splits));

  return Status::OK();
}
675
676
1
// Batch time-series workload on the default external cluster; expects 4 tablet splits.
TEST_F_EX(CqlTabletSplitTest, BatchTimeseries, CqlTabletSplitTestExt) {
  // TODO(#10498): raise kNumSplits back to 20 once outstanding_tablet_split_limit is restored to a
  // higher value.
  constexpr int kNumSplits = 4;
  const auto kTimeout = 300s * kTimeMultiplier;
  ASSERT_OK(RunBatchTimeSeriesTest(cluster_.get(), driver_.get(), kNumSplits, kTimeout));
}
683
684
1
// Batch time-series workload on a single-tserver (RF=1) external cluster; expects 4 tablet splits.
TEST_F_EX(CqlTabletSplitTest, BatchTimeseriesRf1, CqlTabletSplitTestExtRf1) {
  // TODO(#10498): raise kNumSplits back to 20 once outstanding_tablet_split_limit is restored to a
  // higher value.
  constexpr int kNumSplits = 4;
  const auto kTimeout = 300s * kTimeMultiplier;
  ASSERT_OK(RunBatchTimeSeriesTest(cluster_.get(), driver_.get(), kNumSplits, kTimeout));
}
691
692
}  // namespace yb