/Users/deen/code/yugabyte-db/src/yb/integration-tests/cql-tablet-split-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <atomic> |
15 | | #include <thread> |
16 | | |
17 | | #include <boost/range/adaptors.hpp> |
18 | | #include <gtest/gtest.h> |
19 | | |
20 | | #include "yb/client/table_info.h" |
21 | | |
22 | | #include "yb/consensus/consensus.h" |
23 | | |
24 | | #include "yb/gutil/strings/join.h" |
25 | | |
26 | | #include "yb/integration-tests/cluster_itest_util.h" |
27 | | #include "yb/integration-tests/cql_test_base.h" |
28 | | #include "yb/integration-tests/external_mini_cluster.h" |
29 | | #include "yb/integration-tests/load_generator.h" |
30 | | #include "yb/integration-tests/mini_cluster.h" |
31 | | |
32 | | #include "yb/master/master_client.pb.h" |
33 | | #include "yb/master/mini_master.h" |
34 | | |
35 | | #include "yb/tablet/tablet_metadata.h" |
36 | | #include "yb/tablet/tablet_peer.h" |
37 | | |
38 | | #include "yb/util/format.h" |
39 | | #include "yb/util/logging.h" |
40 | | #include "yb/util/monotime.h" |
41 | | #include "yb/util/random.h" |
42 | | #include "yb/util/size_literals.h" |
43 | | #include "yb/util/status_log.h" |
44 | | #include "yb/util/sync_point.h" |
45 | | #include "yb/util/test_thread_holder.h" |
46 | | #include "yb/util/test_util.h" |
47 | | #include "yb/util/tsan_util.h" |
48 | | |
49 | | using namespace std::literals; // NOLINT |
50 | | |
51 | | DECLARE_int32(heartbeat_interval_ms); |
52 | | DECLARE_int32(tserver_heartbeat_metrics_interval_ms); |
53 | | DECLARE_int32(cleanup_split_tablets_interval_sec); |
54 | | DECLARE_int64(db_block_size_bytes); |
55 | | DECLARE_int64(db_filter_block_size_bytes); |
56 | | DECLARE_int64(db_index_block_size_bytes); |
57 | | DECLARE_int64(db_write_buffer_size); |
58 | | DECLARE_int32(yb_num_shards_per_tserver); |
59 | | DECLARE_bool(enable_automatic_tablet_splitting); |
60 | | DECLARE_int64(tablet_split_low_phase_size_threshold_bytes); |
61 | | DECLARE_int64(tablet_split_high_phase_size_threshold_bytes); |
62 | | DECLARE_int64(tablet_split_low_phase_shard_count_per_node); |
63 | | DECLARE_int64(tablet_split_high_phase_shard_count_per_node); |
64 | | DECLARE_int64(tablet_force_split_threshold_bytes); |
65 | | |
66 | | DECLARE_double(TEST_simulate_lookup_partition_list_mismatch_probability); |
67 | | DECLARE_bool(TEST_reject_delete_not_serving_tablet_rpc); |
68 | | |
69 | | namespace yb { |
70 | | |
71 | | namespace { |
72 | | |
73 | 0 | size_t GetNumActiveTablets(MiniCluster* cluster) { |
74 | 0 | return ListTabletPeers( |
75 | 0 | cluster, |
76 | 0 | [](const std::shared_ptr<tablet::TabletPeer>& peer) -> bool { |
77 | 0 | const auto tablet_meta = peer->tablet_metadata(); |
78 | 0 | const auto consensus = peer->shared_consensus(); |
79 | 0 | return tablet_meta && consensus && |
80 | 0 | tablet_meta->table_type() != TableType::TRANSACTION_STATUS_TABLE_TYPE && |
81 | 0 | tablet_meta->tablet_data_state() != |
82 | 0 | tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED && |
83 | 0 | consensus->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER; |
84 | 0 | }) |
85 | 0 | .size(); |
86 | 0 | } |
87 | | |
88 | | Result<size_t> GetNumActiveTablets( |
89 | | ExternalMiniCluster* cluster, const client::YBTableName& table_name, const MonoDelta& timeout, |
90 | 114 | const RequireTabletsRunning require_tablets_running) { |
91 | 114 | master::GetTableLocationsResponsePB resp; |
92 | 114 | RETURN_NOT_OK(itest::GetTableLocations( |
93 | 114 | cluster, table_name, timeout, require_tablets_running, &resp)); |
94 | 114 | return resp.tablet_locations_size(); |
95 | 114 | } |
96 | | |
97 | | } // namespace |
98 | | |
99 | | const auto kSecondaryIndexTestTableName = |
100 | | client::YBTableName(YQL_DATABASE_CQL, kCqlTestKeyspace, "cql_test_table"); |
101 | | |
102 | | class CqlTabletSplitTest : public CqlTestBase<MiniCluster> { |
103 | | protected: |
104 | 2 | void SetUp() override { |
105 | 2 | FLAGS_yb_num_shards_per_tserver = 1; |
106 | 2 | FLAGS_enable_automatic_tablet_splitting = true; |
107 | | |
108 | | // Setting this very low will just cause to include metrics in every heartbeat, no overhead on |
109 | | // setting it lower than FLAGS_heartbeat_interval_ms. |
110 | 2 | FLAGS_tserver_heartbeat_metrics_interval_ms = 1; |
111 | 2 | FLAGS_heartbeat_interval_ms = 1000; |
112 | | |
113 | | // Reduce cleanup waiting time, so tests are completed faster. |
114 | 2 | FLAGS_cleanup_split_tablets_interval_sec = 1; |
115 | | |
116 | 2 | FLAGS_tablet_split_low_phase_size_threshold_bytes = 0; |
117 | 2 | FLAGS_tablet_split_high_phase_size_threshold_bytes = 0; |
118 | 2 | FLAGS_tablet_split_low_phase_shard_count_per_node = 0; |
119 | 2 | FLAGS_tablet_split_high_phase_shard_count_per_node = 0; |
120 | 2 | FLAGS_tablet_force_split_threshold_bytes = 64_KB; |
121 | 2 | FLAGS_db_write_buffer_size = FLAGS_tablet_force_split_threshold_bytes; |
122 | 2 | FLAGS_db_block_size_bytes = 2_KB; |
123 | 2 | FLAGS_db_filter_block_size_bytes = 2_KB; |
124 | 2 | FLAGS_db_index_block_size_bytes = 2_KB; |
125 | 2 | CqlTestBase::SetUp(); |
126 | 2 | } |
127 | | |
128 | 0 | void WaitUntilAllCommittedOpsApplied(const MonoDelta timeout) { |
129 | 0 | const auto splits_completion_deadline = MonoTime::Now() + timeout; |
130 | 0 | for (auto& peer : ListTabletPeers(cluster_.get(), ListPeersFilter::kAll)) { |
131 | 0 | auto consensus = peer->shared_consensus(); |
132 | 0 | if (consensus) { |
133 | 0 | ASSERT_OK(Wait([consensus]() -> Result<bool> { |
134 | 0 | return consensus->GetLastAppliedOpId() >= consensus->GetLastCommittedOpId(); |
135 | 0 | }, splits_completion_deadline, "Waiting for all committed ops to be applied")); |
136 | 0 | } |
137 | 0 | } |
138 | 0 | } |
139 | | |
140 | | // Disable splitting and wait for pending splits to complete. |
141 | 0 | void StopSplitsAndWait() { |
142 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_automatic_tablet_splitting) = false; |
143 | | // Give time to leaders for applying split ops that has been already scheduled. |
144 | 0 | std::this_thread::sleep_for(1s * kTimeMultiplier); |
145 | | // Wait until followers also apply those split ops. |
146 | 0 | ASSERT_NO_FATALS(WaitUntilAllCommittedOpsApplied(15s * kTimeMultiplier)); |
147 | 0 | LOG(INFO) << "Number of active tablets: " << GetNumActiveTablets(cluster_.get()); |
148 | 0 | } |
149 | | |
150 | 0 | void DoTearDown() override { |
151 | | // TODO(tsplit): remove this workaround after |
152 | | // https://github.com/yugabyte/yugabyte-db/issues/8222 is fixed. |
153 | 0 | StopSplitsAndWait(); |
154 | 0 | CqlTestBase::DoTearDown(); |
155 | 0 | } |
156 | | |
157 | | void StartSecondaryIndexTest(); |
158 | | void CompleteSecondaryIndexTest(int num_splits, MonoDelta timeout); |
159 | | |
160 | | int writer_threads_ = 2; |
161 | | int reader_threads_ = 4; |
162 | | int value_size_bytes_ = 1024; |
163 | | int max_write_errors_ = 100; |
164 | | int max_read_errors_ = 100; |
165 | | CassandraSession session_; |
166 | | std::atomic<bool> stop_requested_{false}; |
167 | | std::unique_ptr<load_generator::SessionFactory> load_session_factory_; |
168 | | std::unique_ptr<load_generator::MultiThreadedWriter> writer_; |
169 | | std::unique_ptr<load_generator::MultiThreadedReader> reader_; |
170 | | size_t start_num_active_tablets_; |
171 | | }; |
172 | | |
173 | | class CqlTabletSplitTestMultiMaster : public CqlTabletSplitTest { |
174 | | public: |
175 | 1 | int num_masters() override { |
176 | 1 | return 3; |
177 | 1 | } |
178 | | }; |
179 | | |
180 | | class CqlSecondaryIndexWriter : public load_generator::SingleThreadedWriter { |
181 | | public: |
182 | | CqlSecondaryIndexWriter( |
183 | | load_generator::MultiThreadedWriter* writer, int writer_index, CppCassandraDriver* driver) |
184 | 0 | : SingleThreadedWriter(writer, writer_index), driver_(driver) {} |
185 | | |
186 | | private: |
187 | | CppCassandraDriver* driver_; |
188 | | CassandraSession session_; |
189 | | CassandraPrepared prepared_insert_; |
190 | | |
191 | | void ConfigureSession() override; |
192 | | void CloseSession() override; |
193 | | bool Write(int64_t key_index, const string& key_str, const string& value_str) override; |
194 | | void HandleInsertionFailure(int64_t key_index, const string& key_str) override; |
195 | | }; |
196 | | |
197 | 0 | void CqlSecondaryIndexWriter::ConfigureSession() { |
198 | 0 | session_ = CHECK_RESULT(EstablishSession(driver_)); |
199 | 0 | prepared_insert_ = CHECK_RESULT(session_.Prepare(Format( |
200 | 0 | "INSERT INTO $0 (k, v) VALUES (?, ?)", kSecondaryIndexTestTableName.table_name()))); |
201 | 0 | } |
202 | | |
203 | 0 | void CqlSecondaryIndexWriter::CloseSession() { |
204 | 0 | session_.Reset(); |
205 | 0 | } |
206 | | |
207 | | bool CqlSecondaryIndexWriter::Write( |
208 | 0 | int64_t key_index, const string& key_str, const string& value_str) { |
209 | 0 | auto stmt = prepared_insert_.Bind(); |
210 | 0 | stmt.Bind(0, key_str); |
211 | 0 | stmt.Bind(1, value_str); |
212 | 0 | auto status = session_.Execute(stmt); |
213 | 0 | if (!status.ok()) { |
214 | 0 | LOG(INFO) << "Insert failed: " << AsString(status); |
215 | 0 | return false; |
216 | 0 | } |
217 | 0 | return true; |
218 | 0 | } |
219 | | |
220 | 0 | void CqlSecondaryIndexWriter::HandleInsertionFailure(int64_t key_index, const string& key_str) { |
221 | 0 | } |
222 | | |
223 | | class CqlSecondaryIndexReader : public load_generator::SingleThreadedReader { |
224 | | public: |
225 | | CqlSecondaryIndexReader( |
226 | | load_generator::MultiThreadedReader* writer, int writer_index, CppCassandraDriver* driver) |
227 | 0 | : SingleThreadedReader(writer, writer_index), driver_(driver) {} |
228 | | |
229 | | private: |
230 | | CppCassandraDriver* driver_; |
231 | | CassandraSession session_; |
232 | | CassandraPrepared prepared_select_; |
233 | | |
234 | | void ConfigureSession() override; |
235 | | void CloseSession() override; |
236 | | load_generator::ReadStatus PerformRead( |
237 | | int64_t key_index, const string& key_str, const string& expected_value) override; |
238 | | }; |
239 | | |
240 | 0 | void CqlSecondaryIndexReader::ConfigureSession() { |
241 | 0 | session_ = CHECK_RESULT(EstablishSession(driver_)); |
242 | 0 | prepared_select_ = CHECK_RESULT(session_.Prepare( |
243 | 0 | Format("SELECT k, v FROM $0 WHERE v = ?", kSecondaryIndexTestTableName.table_name()))); |
244 | 0 | } |
245 | | |
246 | 0 | void CqlSecondaryIndexReader::CloseSession() { |
247 | 0 | session_.Reset(); |
248 | 0 | } |
249 | | |
250 | | load_generator::ReadStatus CqlSecondaryIndexReader::PerformRead( |
251 | 0 | int64_t key_index, const string& key_str, const string& expected_value) { |
252 | 0 | auto stmt = prepared_select_.Bind(); |
253 | 0 | stmt.Bind(0, expected_value); |
254 | 0 | auto result = session_.ExecuteWithResult(stmt); |
255 | 0 | if (!result.ok()) { |
256 | 0 | LOG(WARNING) << "Select failed: " << AsString(result.status()); |
257 | 0 | return load_generator::ReadStatus::kOtherError; |
258 | 0 | } |
259 | 0 | auto iter = result->CreateIterator(); |
260 | 0 | auto values_formatter = [&] { |
261 | 0 | return Format( |
262 | 0 | "for v: '$0', expected key: '$1', key_index: $2", expected_value, key_str, key_index); |
263 | 0 | }; |
264 | 0 | if (!iter.Next()) { |
265 | 0 | LOG(ERROR) << "No rows found " << values_formatter(); |
266 | 0 | return load_generator::ReadStatus::kNoRows; |
267 | 0 | } |
268 | 0 | auto row = iter.Row(); |
269 | 0 | const auto k = row.Value(0).ToString(); |
270 | 0 | if (k != key_str) { |
271 | 0 | LOG(ERROR) << "Invalid k " << values_formatter() << " got k: " << k; |
272 | 0 | return load_generator::ReadStatus::kInvalidRead; |
273 | 0 | } |
274 | 0 | if (iter.Next()) { |
275 | 0 | return load_generator::ReadStatus::kExtraRows; |
276 | 0 | LOG(ERROR) << "More than 1 row found " << values_formatter(); |
277 | 0 | do { |
278 | 0 | LOG(ERROR) << "k: " << iter.Row().Value(0).ToString(); |
279 | 0 | } while (iter.Next()); |
280 | 0 | } |
281 | 0 | return load_generator::ReadStatus::kOk; |
282 | 0 | } |
283 | | |
284 | | class CqlSecondaryIndexSessionFactory : public load_generator::SessionFactory { |
285 | | public: |
286 | 0 | explicit CqlSecondaryIndexSessionFactory(CppCassandraDriver* driver) : driver_(driver) {} |
287 | | |
288 | 0 | std::string ClientId() override { return "CQL secondary index test client"; } |
289 | | |
290 | | load_generator::SingleThreadedWriter* GetWriter( |
291 | 0 | load_generator::MultiThreadedWriter* writer, int idx) override { |
292 | 0 | return new CqlSecondaryIndexWriter(writer, idx, driver_); |
293 | 0 | } |
294 | | |
295 | | load_generator::SingleThreadedReader* GetReader( |
296 | 0 | load_generator::MultiThreadedReader* reader, int idx) override { |
297 | 0 | return new CqlSecondaryIndexReader(reader, idx, driver_); |
298 | 0 | } |
299 | | |
300 | | protected: |
301 | | CppCassandraDriver* driver_; |
302 | | }; |
303 | | |
304 | 0 | void CqlTabletSplitTest::StartSecondaryIndexTest() { |
305 | 0 | const auto kNumRows = std::numeric_limits<int64_t>::max(); |
306 | |
|
307 | 0 | session_ = ASSERT_RESULT(EstablishSession(driver_.get())); |
308 | 0 | ASSERT_OK(session_.ExecuteQuery(Format( |
309 | 0 | "CREATE TABLE $0 (k varchar PRIMARY KEY, v varchar) WITH transactions = " |
310 | 0 | "{ 'enabled' : true }", |
311 | 0 | kSecondaryIndexTestTableName.table_name()))); |
312 | 0 | ASSERT_OK(session_.ExecuteQuery(Format( |
313 | 0 | "CREATE INDEX $0_by_value ON $0(v) WITH transactions = { 'enabled' : true }", |
314 | 0 | kSecondaryIndexTestTableName.table_name()))); |
315 | |
|
316 | 0 | start_num_active_tablets_ = GetNumActiveTablets(cluster_.get()); |
317 | 0 | LOG(INFO) << "Number of active tablets at workload start: " << start_num_active_tablets_; |
318 | |
|
319 | 0 | load_session_factory_ = std::make_unique<CqlSecondaryIndexSessionFactory>(driver_.get()); |
320 | 0 | stop_requested_ = false; |
321 | |
|
322 | 0 | writer_ = std::make_unique<load_generator::MultiThreadedWriter>( |
323 | 0 | kNumRows, /* start_key = */ 0, writer_threads_, load_session_factory_.get(), &stop_requested_, |
324 | 0 | value_size_bytes_, max_write_errors_); |
325 | 0 | reader_ = std::make_unique<load_generator::MultiThreadedReader>( |
326 | 0 | kNumRows, reader_threads_, load_session_factory_.get(), writer_->InsertionPoint(), |
327 | 0 | writer_->InsertedKeys(), writer_->FailedKeys(), &stop_requested_, value_size_bytes_, |
328 | 0 | max_read_errors_); |
329 | |
|
330 | 0 | LOG(INFO) << "Started workload"; |
331 | 0 | writer_->Start(); |
332 | 0 | reader_->Start(); |
333 | 0 | } |
334 | | |
335 | 0 | void CqlTabletSplitTest::CompleteSecondaryIndexTest(const int num_splits, const MonoDelta timeout) { |
336 | 0 | const auto num_wait_for_active_tablets = start_num_active_tablets_ + num_splits; |
337 | 0 | size_t num_active_tablets; |
338 | |
|
339 | 0 | ASSERT_OK(LoggedWaitFor( |
340 | 0 | [&]() { |
341 | 0 | num_active_tablets = GetNumActiveTablets(cluster_.get()); |
342 | 0 | YB_LOG_EVERY_N_SECS(INFO, 5) << "Number of active tablets: " << num_active_tablets; |
343 | 0 | if (!writer_->IsRunning()) { |
344 | 0 | return true; |
345 | 0 | } |
346 | 0 | if (num_active_tablets >= num_wait_for_active_tablets) { |
347 | 0 | return true; |
348 | 0 | } |
349 | 0 | return false; |
350 | 0 | }, |
351 | 0 | timeout, |
352 | 0 | Format("Waiting for $0 active tablets or writer stopped", num_wait_for_active_tablets))); |
353 | 0 | LOG(INFO) << "Number of active tablets: " << num_active_tablets; |
354 | |
|
355 | 0 | writer_->Stop(); |
356 | 0 | reader_->Stop(); |
357 | 0 | writer_->WaitForCompletion(); |
358 | 0 | reader_->WaitForCompletion(); |
359 | |
|
360 | 0 | LOG(INFO) << "Workload complete, num_writes: " << writer_->num_writes() |
361 | 0 | << ", num_write_errors: " << writer_->num_write_errors() |
362 | 0 | << ", num_reads: " << reader_->num_reads() |
363 | 0 | << ", num_read_errors: " << reader_->num_read_errors() |
364 | 0 | << ", splits done: " << num_active_tablets - start_num_active_tablets_; |
365 | 0 | ASSERT_EQ(reader_->read_status_stopped(), load_generator::ReadStatus::kOk) |
366 | 0 | << " reader stopped due to: " << AsString(reader_->read_status_stopped()); |
367 | 0 | ASSERT_LE(writer_->num_write_errors(), max_write_errors_); |
368 | 0 | } |
369 | | |
370 | 0 | TEST_F(CqlTabletSplitTest, SecondaryIndex) { |
371 | 0 | const auto kNumSplits = 10; |
372 | |
|
373 | 0 | ASSERT_NO_FATALS(StartSecondaryIndexTest()); |
374 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability) = 0.5; |
375 | 0 | ASSERT_NO_FATALS(CompleteSecondaryIndexTest(kNumSplits, 300s * kTimeMultiplier)); |
376 | 0 | } |
377 | | |
378 | 0 | TEST_F_EX(CqlTabletSplitTest, SecondaryIndexWithDrop, CqlTabletSplitTestMultiMaster) { |
379 | 0 | const auto kNumSplits = 3; |
380 | 0 | const auto kNumTestIters = 2; |
381 | |
|
382 | 0 | #ifndef NDEBUG |
383 | 0 | SyncPoint::GetInstance()->LoadDependency( |
384 | 0 | {{"CatalogManager::DeleteNotServingTablet:Reject", |
385 | 0 | "CqlTabletSplitTest::SecondaryIndexWithDrop:WaitForReject"}}); |
386 | 0 | #endif // NDEBUG |
387 | |
|
388 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
389 | |
|
390 | 0 | for (auto iter = 1; iter <= kNumTestIters; ++iter) { |
391 | 0 | LOG(INFO) << "Iteration " << iter; |
392 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_reject_delete_not_serving_tablet_rpc) = true; |
393 | 0 | #ifndef NDEBUG |
394 | 0 | SyncPoint::GetInstance()->EnableProcessing(); |
395 | 0 | #endif // NDEBUG |
396 | |
|
397 | 0 | ASSERT_NO_FATALS(StartSecondaryIndexTest()); |
398 | 0 | const auto table_info = |
399 | 0 | ASSERT_RESULT(client->GetYBTableInfo(kSecondaryIndexTestTableName)); |
400 | 0 | ASSERT_NO_FATALS(CompleteSecondaryIndexTest(kNumSplits, 300s * kTimeMultiplier)); |
401 | |
|
402 | 0 | TEST_SYNC_POINT("CqlTabletSplitTest::SecondaryIndexWithDrop:WaitForReject"); |
403 | |
|
404 | 0 | if (iter > 1) { |
405 | | // Test tracking split tablets in case of leader master failover. |
406 | 0 | const auto leader_master_idx = cluster_->LeaderMasterIdx(); |
407 | 0 | const auto sys_catalog_tablet_peer_leader = |
408 | 0 | cluster_->mini_master(leader_master_idx)->tablet_peer(); |
409 | 0 | const auto sys_catalog_tablet_peer_follower = |
410 | 0 | cluster_->mini_master((leader_master_idx + 1) % cluster_->num_masters())->tablet_peer(); |
411 | 0 | LOG(INFO) << "Iteration " << iter << ": stepping down master leader"; |
412 | 0 | ASSERT_OK(StepDown( |
413 | 0 | sys_catalog_tablet_peer_leader, sys_catalog_tablet_peer_follower->permanent_uuid(), |
414 | 0 | ForceStepDown::kFalse)); |
415 | 0 | LOG(INFO) << "Iteration " << iter << ": stepping down master leader - done"; |
416 | 0 | } |
417 | |
|
418 | 0 | LOG(INFO) << "Iteration " << iter << ": deleting test table"; |
419 | 0 | ASSERT_OK(session_.ExecuteQuery("DROP TABLE " + kSecondaryIndexTestTableName.table_name())); |
420 | 0 | LOG(INFO) << "Iteration " << iter << ": deleted test table"; |
421 | | |
422 | | // Make sure all table tablets deleted on all tservers. |
423 | 0 | auto peer_to_str = [](const tablet::TabletPeerPtr& peer) { return peer->LogPrefix(); }; |
424 | 0 | std::vector<tablet::TabletPeerPtr> tablet_peers; |
425 | 0 | auto s = LoggedWaitFor([&]() -> Result<bool> { |
426 | 0 | tablet_peers = ListTableTabletPeers(cluster_.get(), table_info.table_id); |
427 | 0 | return tablet_peers.size() == 0; |
428 | 0 | }, 10s * kTimeMultiplier, "Waiting for tablets to be deleted"); |
429 | 0 | ASSERT_TRUE(s.ok()) << AsString(s) + ": expected tablets to be deleted, but following left:\n" |
430 | 0 | << JoinStrings( |
431 | 0 | tablet_peers | boost::adaptors::transformed(peer_to_str), "\n"); |
432 | | |
433 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_reject_delete_not_serving_tablet_rpc) = false; |
434 | 0 | #ifndef NDEBUG |
435 | 0 | SyncPoint::GetInstance()->DisableProcessing(); |
436 | 0 | SyncPoint::GetInstance()->ClearTrace(); |
437 | 0 | #endif // NDEBUG |
438 | 0 | } |
439 | 0 | } |
440 | | |
441 | | class CqlTabletSplitTestExt : public CqlTestBase<ExternalMiniCluster> { |
442 | | protected: |
443 | 2 | void SetUpFlags() override { |
444 | 2 | const int64 kSplitThreshold = 64_KB; |
445 | | |
446 | 2 | std::vector<std::string> common_flags; |
447 | 2 | common_flags.push_back("--yb_num_shards_per_tserver=1"); |
448 | | |
449 | 2 | auto& master_flags = mini_cluster_opt_.extra_master_flags; |
450 | 2 | master_flags.push_back( |
451 | 2 | Format("--replication_factor=$0", std::min(num_tablet_servers(), 3))); |
452 | 2 | master_flags.push_back("--enable_automatic_tablet_splitting=true"); |
453 | 2 | master_flags.push_back("--tablet_split_low_phase_size_threshold_bytes=0"); |
454 | 2 | master_flags.push_back("--tablet_split_high_phase_size_threshold_bytes=0"); |
455 | 2 | master_flags.push_back("--tablet_split_low_phase_shard_count_per_node=0"); |
456 | 2 | master_flags.push_back("--tablet_split_high_phase_shard_count_per_node=0"); |
457 | 2 | master_flags.push_back(Format("--tablet_force_split_threshold_bytes=$0", kSplitThreshold)); |
458 | | |
459 | 2 | auto& tserver_flags = mini_cluster_opt_.extra_tserver_flags; |
460 | 2 | tserver_flags.push_back(Format("--db_write_buffer_size=$0", kSplitThreshold)); |
461 | | // Lower SST block size to have multiple entries in index and be able to detect a split key. |
462 | 2 | tserver_flags.push_back(Format( |
463 | 2 | "--db_block_size_bytes=$0", std::min(FLAGS_db_block_size_bytes, kSplitThreshold / 8))); |
464 | | |
465 | 2 | for (auto& flag : common_flags) { |
466 | 2 | master_flags.push_back(flag); |
467 | 2 | tserver_flags.push_back(flag); |
468 | 2 | } |
469 | 2 | } |
470 | | }; |
471 | | |
472 | | class CqlTabletSplitTestExtRf1 : public CqlTabletSplitTestExt { |
473 | | public: |
474 | 2 | int num_tablet_servers() override { return 1; } |
475 | | }; |
476 | | |
// One simulated metric of the batch timeseries workload.
struct BatchTimeseriesDataSource {
  explicit BatchTimeseriesDataSource(const std::string& metric_id_)
      : metric_id{metric_id_} {
  }

  // Identifier bound to the `metric_id` column.
  const std::string metric_id;
  // First timestamp this source emits.
  const int64_t data_emit_start_ts{1};
  // Starts at -1, meaning nothing has been emitted yet; updated by writer threads.
  std::atomic<int64_t> last_emitted_ts{-1};
};
484 | | |
485 | | CHECKED_STATUS RunBatchTimeSeriesTest( |
486 | | ExternalMiniCluster* cluster, CppCassandraDriver* driver, const int num_splits, |
487 | 2 | const MonoDelta timeout) { |
488 | 2 | const auto kWriterThreads = 4; |
489 | 2 | const auto kReaderThreads = 4; |
490 | 2 | const auto kMinMetricsCount = 10000; |
491 | 2 | const auto kMaxMetricsCount = 20000; |
492 | 2 | const auto kReadBatchSize = 100; |
493 | 2 | const auto kWriteBatchSize = 500; |
494 | 2 | const auto kReadBackDeltaTime = 100; |
495 | 2 | const auto kValueSize = 100; |
496 | | |
497 | 2 | const auto kMaxWriteErrors = 100; |
498 | 2 | const auto kMaxReadErrors = 100; |
499 | | |
500 | 2 | const auto kTableTtlSeconds = MonoDelta(24h).ToSeconds(); |
501 | | |
502 | 2 | const client::YBTableName kTableName(YQL_DATABASE_CQL, "test", "batch_timeseries_test"); |
503 | | |
504 | 2 | std::atomic_int num_reads(0); |
505 | 2 | std::atomic_int num_writes(0); |
506 | 2 | std::atomic_int num_read_errors(0); |
507 | 2 | std::atomic_int num_write_errors(0); |
508 | | |
509 | 2 | std::mt19937_64 rng(/* seed */ 29383); |
510 | 2 | const auto num_metrics = RandomUniformInt<>(kMinMetricsCount, kMaxMetricsCount - 1, &rng); |
511 | 2 | std::vector<std::unique_ptr<BatchTimeseriesDataSource>> data_sources; |
512 | 28.9k | for (int i = 0; i < num_metrics; ++i) { |
513 | 28.9k | data_sources.emplace_back(std::make_unique<BatchTimeseriesDataSource>(Format("metric-$0", i))); |
514 | 28.9k | } |
515 | | |
516 | 2 | auto session = CHECK_RESULT(EstablishSession(driver)); |
517 | | |
518 | 2 | RETURN_NOT_OK( |
519 | 2 | session.ExecuteQuery(Format( |
520 | 2 | "CREATE TABLE $0 (" |
521 | 2 | "metric_id varchar, " |
522 | 2 | "ts bigint, " |
523 | 2 | "value varchar, " |
524 | 2 | "primary key (metric_id, ts)) " |
525 | 2 | "WITH default_time_to_live = $1", kTableName.table_name(), kTableTtlSeconds))); |
526 | | |
527 | 2 | const auto start_num_active_tablets = VERIFY_RESULT(GetNumActiveTablets( |
528 | 2 | cluster, kTableName, 60s * kTimeMultiplier, RequireTabletsRunning::kTrue)); |
529 | 2 | LOG(INFO) << "Number of active tablets at workload start: " << start_num_active_tablets; |
530 | | |
531 | 2 | auto prepared_write = VERIFY_RESULT(session.Prepare(Format( |
532 | 2 | "INSERT INTO $0 (metric_id, ts, value) VALUES (?, ?, ?)", kTableName.table_name()))); |
533 | | |
534 | 2 | auto prepared_read = VERIFY_RESULT(session.Prepare(Format( |
535 | 2 | "SELECT * from $0 WHERE metric_id = ? AND ts > ? AND ts < ? ORDER BY ts DESC LIMIT ?", |
536 | 2 | kTableName.table_name()))); |
537 | | |
538 | 2 | std::mutex random_mutex; |
539 | 936k | auto get_random_source = [&rng, &random_mutex, &data_sources]() -> BatchTimeseriesDataSource* { |
540 | 936k | std::lock_guard<decltype(random_mutex)> lock(random_mutex); |
541 | 936k | return RandomElement(data_sources, &rng).get(); |
542 | 936k | }; |
543 | | |
544 | 368k | auto get_value = [](int64_t ts, std::string* value) { |
545 | 368k | value->clear(); |
546 | 368k | value->append(AsString(ts)); |
547 | 368k | const auto suffix_size = value->size() >= kValueSize ? 0 : kValueSize - value->size(); |
548 | 9.57M | while (value->size() < kValueSize) { |
549 | 9.20M | value->append(Uint16ToHexString(RandomUniformInt<uint16_t>())); |
550 | 9.20M | } |
551 | 368k | if (suffix_size > 0) { |
552 | 368k | value->resize(kValueSize); |
553 | 368k | } |
554 | 368k | return value; |
555 | 368k | }; |
556 | | |
557 | 2 | TestThreadHolder io_threads; |
558 | 8 | auto reader = [&, &stop = io_threads.stop_flag()]() -> void { |
559 | 936k | while (!stop.load(std::memory_order_acquire)) { |
560 | 936k | auto& source = *CHECK_NOTNULL(get_random_source()); |
561 | 936k | if (source.last_emitted_ts < source.data_emit_start_ts) { |
562 | 931k | continue; |
563 | 931k | } |
564 | 4.33k | const int64_t end_ts = source.last_emitted_ts + 1; |
565 | 4.33k | const int64_t start_ts = std::max(end_ts - kReadBackDeltaTime, source.data_emit_start_ts); |
566 | | |
567 | 4.33k | auto stmt = prepared_read.Bind(); |
568 | 4.33k | stmt.Bind(0, source.metric_id); |
569 | 4.33k | stmt.Bind(1, start_ts); |
570 | 4.33k | stmt.Bind(2, end_ts); |
571 | 4.33k | stmt.Bind(3, kReadBatchSize); |
572 | 4.33k | auto status = session.Execute(stmt); |
573 | 4.33k | if (!status.ok()) { |
574 | 0 | YB_LOG_EVERY_N_SECS(INFO, 1) << "Read failed: " << AsString(status); |
575 | 0 | num_read_errors++; |
576 | 4.33k | } else { |
577 | 4.33k | num_reads += 1; |
578 | 4.33k | } |
579 | 4.33k | YB_LOG_EVERY_N_SECS(INFO, 5) |
580 | 12 | << "Completed " << num_reads << " reads, read errors: " << num_read_errors; |
581 | 4.33k | } |
582 | 8 | }; |
583 | | |
584 | 2 | std::atomic<bool> stop_writes{false}; |
585 | 8 | auto writer = [&, &stop = io_threads.stop_flag()]() -> void { |
586 | 8 | std::string value; |
587 | | // Reserve more bytes, because we fill by 2-byte blocks and might overfill and then |
588 | | // truncate, but don't want reallocation on growth. |
589 | 8 | value.reserve(kValueSize + 1); |
590 | 745 | while (!stop.load(std::memory_order_acquire) && !stop_writes.load(std::memory_order_acquire)) { |
591 | 737 | auto& source = *CHECK_NOTNULL(get_random_source()); |
592 | | |
593 | 737 | if (source.last_emitted_ts == -1) { |
594 | 728 | source.last_emitted_ts = source.data_emit_start_ts; |
595 | 728 | } |
596 | 737 | auto ts = source.last_emitted_ts.load(std::memory_order_acquire); |
597 | 737 | CassandraBatch batch(CassBatchType::CASS_BATCH_TYPE_LOGGED); |
598 | | |
599 | 369k | for (int i = 0; i < kWriteBatchSize; ++i) { |
600 | 368k | auto stmt = prepared_write.Bind(); |
601 | 368k | stmt.Bind(0, source.metric_id); |
602 | 368k | stmt.Bind(1, ts); |
603 | 368k | get_value(ts, &value); |
604 | 368k | stmt.Bind(2, value); |
605 | 368k | batch.Add(&stmt); |
606 | 368k | ts++; |
607 | 368k | } |
608 | | |
609 | 737 | auto status = session.ExecuteBatch(batch); |
610 | 737 | if (!status.ok()) { |
611 | 0 | YB_LOG_EVERY_N_SECS(INFO, 1) << "Write failed: " << AsString(status); |
612 | 0 | num_write_errors++; |
613 | 737 | } else { |
614 | 737 | num_writes += kWriteBatchSize; |
615 | 737 | source.last_emitted_ts = ts; |
616 | 737 | } |
617 | 737 | YB_LOG_EVERY_N_SECS(INFO, 5) |
618 | 12 | << "Completed " << num_writes << " writes, num_write_errors: " << num_write_errors; |
619 | 737 | } |
620 | 8 | }; |
621 | | |
622 | 10 | for (int i = 0; i < kReaderThreads; ++i) { |
623 | 8 | io_threads.AddThreadFunctor(reader); |
624 | 8 | } |
625 | 10 | for (int i = 0; i < kWriterThreads; ++i) { |
626 | 8 | io_threads.AddThreadFunctor(writer); |
627 | 8 | } |
628 | | |
629 | 2 | const auto deadline = CoarseMonoClock::Now() + timeout; |
630 | 2 | const auto num_wait_for_active_tablets = start_num_active_tablets + num_splits; |
631 | 2 | size_t num_active_tablets = start_num_active_tablets; |
632 | 112 | while (CoarseMonoClock::Now() < deadline && num_read_errors < kMaxReadErrors && |
633 | 112 | num_write_errors < kMaxWriteErrors) { |
634 | | // When we get num_wait_for_active_tablets active tablets (not necessarily running), we stop |
635 | | // writes to avoid creating too many post-split tablets and overloading local cluster. |
636 | | // After stopping writes, we continue reads and wait for num_wait_for_active_tablets active |
637 | | // running tablets. |
638 | 112 | auto num_active_tablets_res = GetNumActiveTablets( |
639 | 112 | cluster, kTableName, 10s * kTimeMultiplier, |
640 | 112 | RequireTabletsRunning(stop_writes.load(std::memory_order_acquire))); |
641 | 112 | if (num_active_tablets_res.ok()) { |
642 | 112 | num_active_tablets = *num_active_tablets_res; |
643 | 112 | YB_LOG_EVERY_N_SECS(INFO, 3) << "Number of active tablets: " << num_active_tablets; |
644 | 112 | if (num_active_tablets >= num_wait_for_active_tablets) { |
645 | 4 | if (stop_writes.load(std::memory_order_acquire)) { |
646 | 2 | break; |
647 | 2 | } else { |
648 | 2 | LOG(INFO) << "Stopping writes"; |
649 | 2 | stop_writes = true; |
650 | 2 | } |
651 | 4 | } |
652 | 112 | } |
653 | 110 | SleepFor(500ms); |
654 | 110 | } |
655 | 2 | if (CoarseMonoClock::Now() >= deadline) { |
656 | | // Produce a core dump for investigation. |
657 | 0 | for (auto* daemon : cluster->daemons()) { |
658 | 0 | ERROR_NOT_OK(daemon->Kill(SIGSEGV), "Failed to crash process: "); |
659 | 0 | } |
660 | 0 | } |
661 | | |
662 | 2 | io_threads.Stop(); |
663 | 2 | LOG(INFO) << "num_reads: " << num_reads; |
664 | 2 | LOG(INFO) << "num_writes: " << num_writes; |
665 | 2 | LOG(INFO) << "num_read_errors: " << num_read_errors; |
666 | 2 | LOG(INFO) << "num_write_errors: " << num_write_errors; |
667 | 2 | EXPECT_LE(num_read_errors, kMaxReadErrors); |
668 | 2 | EXPECT_LE(num_write_errors, kMaxWriteErrors); |
669 | 2 | SCHECK_GE( |
670 | 2 | num_active_tablets, num_wait_for_active_tablets, IllegalState, |
671 | 2 | Format("Didn't achieve $0 splits", num_splits)); |
672 | | |
673 | 2 | return Status::OK(); |
674 | 2 | } |
675 | | |
676 | 1 | TEST_F_EX(CqlTabletSplitTest, BatchTimeseries, CqlTabletSplitTestExt) { |
677 | | // TODO(#10498) - Set this back to 20 once outstanding_tablet_split_limit is set back to a higher |
678 | | // value. |
679 | 1 | const auto kNumSplits = 4; |
680 | 1 | ASSERT_OK( |
681 | 1 | RunBatchTimeSeriesTest(cluster_.get(), driver_.get(), kNumSplits, 300s * kTimeMultiplier)); |
682 | 1 | } |
683 | | |
684 | 1 | TEST_F_EX(CqlTabletSplitTest, BatchTimeseriesRf1, CqlTabletSplitTestExtRf1) { |
685 | | // TODO(#10498) - Set this back to 20 once outstanding_tablet_split_limit is set back to a higher |
686 | | // value. |
687 | 1 | const auto kNumSplits = 4; |
688 | 1 | ASSERT_OK( |
689 | 1 | RunBatchTimeSeriesTest(cluster_.get(), driver_.get(), kNumSplits, 300s * kTimeMultiplier)); |
690 | 1 | } |
691 | | |
692 | | } // namespace yb |