/Users/deen/code/yugabyte-db/src/yb/integration-tests/xcluster-tablet-split-itest.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/cdc/cdc_service.proxy.h" |
15 | | |
16 | | #include "yb/client/session.h" |
17 | | #include "yb/client/table.h" |
18 | | #include "yb/client/yb_table_name.h" |
19 | | |
20 | | #include "yb/integration-tests/cdc_test_util.h" |
21 | | #include "yb/integration-tests/tablet-split-itest-base.h" |
22 | | #include "yb/master/master_ddl.proxy.h" |
23 | | #include "yb/master/master_defaults.h" |
24 | | #include "yb/tablet/tablet_peer.h" |
25 | | #include "yb/tools/admin-test-base.h" |
26 | | #include "yb/tserver/mini_tablet_server.h" |
27 | | #include "yb/util/thread.h" |
28 | | |
29 | | DECLARE_int32(cdc_state_table_num_tablets); |
30 | | DECLARE_bool(enable_tablet_split_of_xcluster_replicated_tables); |
31 | | DECLARE_uint64(snapshot_coordinator_poll_interval_ms); |
32 | | DECLARE_bool(TEST_validate_all_tablet_candidates); |
33 | | DECLARE_bool(TEST_xcluster_consumer_fail_after_process_split_op); |
34 | | |
35 | | namespace yb { |
36 | | |
// Tablet-split test fixture with CDC in the picture: creates a single-tablet
// test table and keeps the system cdc_state table on one tablet so readiness
// checks are cheap. Splitting of xCluster-replicated tables is enabled here;
// derived fixtures may override that (see NotSupportedTabletSplitITest).
class CdcTabletSplitITest : public TabletSplitITest {
 public:
  void SetUp() override {
    // Must be set before the cluster starts (inside TabletSplitITest::SetUp)
    // so the cdc_state table is created with a single tablet.
    FLAGS_cdc_state_table_num_tablets = 1;
    TabletSplitITest::SetUp();
    // Allow any tablet to be chosen as a split candidate, and allow splitting
    // tables that are part of xCluster replication.
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_validate_all_tablet_candidates) = false;
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_xcluster_replicated_tables) = true;

    CreateSingleTablet();
  }

 protected:
  // Polls the master (up to 30s) until creation of the system cdc_state table
  // has finished. Call this before relying on CDC metadata in a test.
  Status WaitForCdcStateTableToBeReady() {
    return WaitFor([&]() -> Result<bool> {
      master::IsCreateTableDoneRequestPB is_create_req;
      master::IsCreateTableDoneResponsePB is_create_resp;

      is_create_req.mutable_table()->set_table_name(master::kCdcStateTableName);
      is_create_req.mutable_table()->mutable_namespace_()->set_name(master::kSystemNamespaceName);
      master::MasterDdlProxy master_proxy(
          &client_->proxy_cache(), VERIFY_RESULT(cluster_->GetLeaderMasterBoundRpcAddr()));
      rpc::RpcController rpc;
      rpc.set_timeout(MonoDelta::FromSeconds(30));

      // Treat any RPC/application error as "not done yet" and keep polling.
      auto s = master_proxy.IsCreateTableDone(is_create_req, &is_create_resp, &rpc);
      return s.ok() && !is_create_resp.has_error() && is_create_resp.done();
    }, MonoDelta::FromSeconds(30), "Wait for cdc_state table creation to finish");
  }

  // Starts a brand-new 3-tserver MiniCluster with the given cluster id, and
  // creates a single-tablet table on it matching this test's schema and
  // transactionality. The created table is returned through `table`; the new
  // cluster is returned to the caller, which owns its lifetime/shutdown.
  Result<std::unique_ptr<MiniCluster>> CreateNewUniverseAndTable(
      const string& cluster_id, client::TableHandle* table) {
    // First create the new cluster.
    MiniClusterOptions opts;
    opts.num_tablet_servers = 3;
    opts.cluster_id = cluster_id;
    std::unique_ptr<MiniCluster> cluster = std::make_unique<MiniCluster>(opts);
    RETURN_NOT_OK(cluster->Start());
    RETURN_NOT_OK(cluster->WaitForTabletServerCount(3));
    auto cluster_client = VERIFY_RESULT(cluster->CreateClient());

    // Create an identical table on the new cluster.
    client::kv_table_test::CreateTable(
        client::Transactional(GetIsolationLevel() != IsolationLevel::NON_TRANSACTIONAL),
        1,  // num_tablets
        cluster_client.get(),
        table);
    return cluster;
  }
};
86 | | |
// Verifies that a CDC GetChanges call keeps working against a split parent
// tablet (which the stream protects from immediate deletion), survives a
// leadership change, and finally starts failing once WAL retention is lowered
// and the background task cleans the parent up.
TEST_F(CdcTabletSplitITest, GetChangesOnSplitParentTablet) {
  constexpr auto kNumRows = kDefaultNumRows;
  // Create a cdc stream for this tablet.
  auto cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(&client_->proxy_cache(),
      HostPort::FromBoundEndpoint(cluster_->mini_tablet_servers().front()->bound_rpc_addr()));
  CDCStreamId stream_id;
  cdc::CreateCDCStream(cdc_proxy, table_->id(), &stream_id);
  // Ensure that the cdc_state table is ready before inserting rows and splitting.
  ASSERT_OK(WaitForCdcStateTableToBeReady());

  LOG(INFO) << "Created a CDC stream for table " << table_.name().table_name()
            << " with stream id " << stream_id;

  // Write some rows to the tablet, then split on the middle hash code. The CDC
  // stream is expected to keep the parent tablet from being deleted.
  const auto split_hash_code = ASSERT_RESULT(WriteRowsAndGetMiddleHashCode(kNumRows));
  const auto source_tablet_id = ASSERT_RESULT(SplitTabletAndValidate(
      split_hash_code, kNumRows, /* parent_tablet_protected_from_deletion */ true));

  // Ensure that a GetChanges still works on the source tablet.
  cdc::GetChangesRequestPB change_req;
  cdc::GetChangesResponsePB change_resp;

  // Read from the very beginning of the WAL (term 0, index 0).
  change_req.set_tablet_id(source_tablet_id);
  change_req.set_stream_id(stream_id);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

  rpc::RpcController rpc;
  ASSERT_OK(cdc_proxy->GetChanges(change_req, &change_resp, &rpc));
  ASSERT_FALSE(change_resp.has_error());

  // Test that if the tablet leadership of the parent tablet changes we can still call GetChanges.
  StepDownAllTablets(cluster_.get());

  rpc.Reset();
  ASSERT_OK(cdc_proxy->GetChanges(change_req, &change_resp, &rpc));
  ASSERT_FALSE(change_resp.has_error()) << change_resp.ShortDebugString();

  // Now let the table get deleted by the background task. Need to lower the wal_retention_secs.
  master::AlterTableRequestPB alter_table_req;
  master::AlterTableResponsePB alter_table_resp;
  alter_table_req.mutable_table()->set_table_id(table_->id());
  alter_table_req.set_wal_retention_secs(1);

  master::MasterDdlProxy master_proxy(
      &client_->proxy_cache(), ASSERT_RESULT(cluster_->GetLeaderMasterBoundRpcAddr()));
  rpc.Reset();
  ASSERT_OK(master_proxy.AlterTable(alter_table_req, &alter_table_resp, &rpc));

  // Give the snapshot coordinator at least two poll cycles to run cleanup.
  SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_snapshot_coordinator_poll_interval_ms));

  // Try to do a GetChanges again, it should fail.
  rpc.Reset();
  ASSERT_OK(cdc_proxy->GetChanges(change_req, &change_resp, &rpc));
  ASSERT_TRUE(change_resp.has_error());
}
143 | | |
144 | | // For testing xCluster setups. Since most test utility functions expect there to be only one |
145 | | // cluster, they implicitly use cluster_ / client_ / table_ everywhere. For this test, we default |
146 | | // those to point to the producer cluster, but allow calls to SwitchToProducer/Consumer, to swap |
147 | | // those to point to the other cluster. |
// For testing xCluster setups. Since most test utility functions expect there to be only one
// cluster, they implicitly use cluster_ / client_ / table_ everywhere. For this test, we default
// those to point to the producer cluster, but allow calls to SwitchToProducer/Consumer, to swap
// those to point to the other cluster.
class XClusterTabletSplitITest : public CdcTabletSplitITest {
 public:
  void SetUp() override {
    CdcTabletSplitITest::SetUp();

    // Also create the consumer cluster.
    consumer_cluster_ = ASSERT_RESULT(CreateNewUniverseAndTable("consumer", &consumer_table_));
    consumer_client_ = ASSERT_RESULT(consumer_cluster_->CreateClient());

    // Wire up producer -> consumer universe replication via the admin tool.
    ASSERT_OK(tools::RunAdminToolCommand(
        consumer_cluster_->GetMasterAddresses(), "setup_universe_replication", kProducerClusterId,
        cluster_->GetMasterAddresses(), table_->id()));
  }

 protected:
  void DoBeforeTearDown() override {
    // Tear down replication from the consumer side before shutting anything down.
    SwitchToConsumer();
    ASSERT_OK(tools::RunAdminToolCommand(
        cluster_->GetMasterAddresses(), "delete_universe_replication", kProducerClusterId));

    // Shutdown producer cluster here, CdcTabletSplitITest will shutdown cluster_ (consumer).
    producer_cluster_->Shutdown();
    CdcTabletSplitITest::DoBeforeTearDown();
  }

  // Makes cluster_ / client_ / table_ point at the producer cluster, stashing
  // the consumer handles aside. No-op if already on the producer.
  void SwitchToProducer() {
    if (!producer_cluster_) {
      return;
    }
    // cluster_ is currently the consumer.
    consumer_cluster_ = std::move(cluster_);
    consumer_client_ = std::move(client_);
    consumer_table_ = std::move(table_);
    cluster_ = std::move(producer_cluster_);
    client_ = std::move(producer_client_);
    table_ = std::move(producer_table_);
    LOG(INFO) << "Swapped to the producer cluster.";
  }

  // Mirror of SwitchToProducer: points cluster_ / client_ / table_ at the
  // consumer cluster. No-op if already on the consumer.
  void SwitchToConsumer() {
    if (!consumer_cluster_) {
      return;
    }
    // cluster_ is currently the producer.
    producer_cluster_ = std::move(cluster_);
    producer_client_ = std::move(client_);
    producer_table_ = std::move(table_);
    cluster_ = std::move(consumer_cluster_);
    client_ = std::move(consumer_client_);
    table_ = std::move(consumer_table_);
    LOG(INFO) << "Swapped to the consumer cluster.";
  }

  // Waits (up to 60s) until the consumer table holds exactly
  // expected_num_rows rows. Works regardless of which cluster is currently
  // active: if consumer_cluster_ is unset, cluster_/table_ ARE the consumer.
  CHECKED_STATUS CheckForNumRowsOnConsumer(size_t expected_num_rows) {
    client::YBClient* consumer_client(consumer_cluster_ ? consumer_client_.get() : client_.get());
    client::TableHandle* consumer_table(consumer_cluster_ ? &consumer_table_ : &table_);

    client::YBSessionPtr consumer_session = consumer_client->NewSession();
    consumer_session->SetTimeout(60s);
    size_t num_rows = 0;
    Status s = WaitFor([&]() -> Result<bool> {
      num_rows = VERIFY_RESULT(SelectRowsCount(consumer_session, *consumer_table));
      return num_rows == expected_num_rows;
    }, MonoDelta::FromSeconds(60), "Wait for data to be replicated");

    // Log the last observed count even on timeout, to aid debugging.
    LOG(INFO) << "Found " << num_rows << " rows on consumer, expected " << expected_num_rows;

    return s;
  }

  // Splits every active tablet leader of table_ on the currently active
  // cluster_ and waits for the splits to complete. cur_num_tablets is the
  // expected tablet count before splitting; afterwards there are twice as
  // many live tablets, plus the hidden parents when they are protected from
  // deletion (e.g. by CDC/xCluster streams).
  CHECKED_STATUS SplitAllTablets(
      int cur_num_tablets, bool parent_tablet_protected_from_deletion = true) {
    // Splits all tablets for cluster_.
    auto* catalog_mgr = VERIFY_RESULT(catalog_manager());
    auto tablet_peers = ListTableActiveTabletLeadersPeers(cluster_.get(), table_->id());
    EXPECT_EQ(tablet_peers.size(), cur_num_tablets);
    for (const auto& peer : tablet_peers) {
      const auto source_tablet_ptr = peer->tablet();
      EXPECT_NE(source_tablet_ptr, nullptr);
      const auto& source_tablet = *source_tablet_ptr;
      RETURN_NOT_OK(SplitTablet(catalog_mgr, source_tablet));
    }
    // Each split produces 2 children; `2 * n - 1` counts all hidden ancestors
    // accumulated across successive splits of the same key range.
    size_t expected_non_split_tablets = cur_num_tablets * 2;
    size_t expected_split_tablets = parent_tablet_protected_from_deletion
        ? cur_num_tablets * 2 - 1
        : 0;
    return WaitForTabletSplitCompletion(expected_non_split_tablets, expected_split_tablets);
  }

  // Only one set of these is valid at any time.
  // The other cluster is accessible via cluster_ / client_ / table_.
  std::unique_ptr<MiniCluster> consumer_cluster_;
  std::unique_ptr<client::YBClient> consumer_client_;
  client::TableHandle consumer_table_;

  std::unique_ptr<MiniCluster> producer_cluster_;
  std::unique_ptr<client::YBClient> producer_client_;
  client::TableHandle producer_table_;

  const string kProducerClusterId = "producer";
};
249 | | |
TEST_F(XClusterTabletSplitITest, SplittingWithXClusterReplicationOnConsumer) {
  // Perform a split on the consumer side and ensure replication still works.

  // To begin with, cluster_ will be our producer.
  // Write some rows to the producer.
  auto split_hash_code = ASSERT_RESULT(WriteRowsAndGetMiddleHashCode(kDefaultNumRows));

  // Wait until the rows are all replicated on the consumer.
  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  SwitchToConsumer();

  // Perform a split on the CONSUMER cluster.
  ASSERT_OK(SplitTabletAndValidate(split_hash_code, kDefaultNumRows));

  SwitchToProducer();

  // Write another set of rows, and make sure the new poller picks up on the changes.
  // Keys start at kDefaultNumRows + 1 to avoid overlapping the first batch.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1));

  ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows));
}
272 | | |
TEST_F(XClusterTabletSplitITest, SplittingWithXClusterReplicationOnProducer) {
  // Perform a split on the producer side and ensure replication still works.

  // Default cluster_ will be our producer.
  auto split_hash_code = ASSERT_RESULT(WriteRowsAndGetMiddleHashCode(kDefaultNumRows));

  // Wait until the rows are all replicated on the consumer.
  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // Split the tablet on the producer. Note that parent tablet will only be HIDDEN and not deleted.
  ASSERT_OK(SplitTabletAndValidate(
      split_hash_code, kDefaultNumRows, /* parent_tablet_protected_from_deletion */ true));

  // Write another set of rows, and make sure the consumer picks up on the changes.
  // Keys start at kDefaultNumRows + 1 to avoid overlapping the first batch.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1));

  ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows));
}
291 | | |
TEST_F(XClusterTabletSplitITest, MultipleSplitsDuringPausedReplication) {
  // Simulate network partition with paused replication, then perform multiple splits on producer
  // before re-enabling replication. Should be able to handle all of the splits.

  // Default cluster_ will be our producer.
  // Start with replication disabled ("0" = disabled).
  ASSERT_OK(tools::RunAdminToolCommand(
      consumer_cluster_->GetMasterAddresses(), "set_universe_replication_enabled",
      kProducerClusterId, "0"));

  // Perform one tablet split.
  auto split_hash_code = ASSERT_RESULT(WriteRowsAndGetMiddleHashCode(kDefaultNumRows));
  ASSERT_OK(SplitTabletAndValidate(
      split_hash_code, kDefaultNumRows, /* parent_tablet_protected_from_deletion */ true));

  // Write some more rows, and then perform another split on both children.
  // Each WriteRows batch uses a fresh key range so counts accumulate cleanly.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1));
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 2));
  ASSERT_RESULT(WriteRows(kDefaultNumRows, 2 * kDefaultNumRows + 1));
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 4));

  // Now re-enable replication.
  ASSERT_OK(tools::RunAdminToolCommand(
      consumer_cluster_->GetMasterAddresses(), "set_universe_replication_enabled",
      kProducerClusterId, "1"));

  // Ensure all the rows are all replicated on the consumer.
  ASSERT_OK(CheckForNumRowsOnConsumer(3 * kDefaultNumRows));

  // Write another set of rows, and make sure the consumer picks up on the changes.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, 3 * kDefaultNumRows + 1));

  ASSERT_OK(CheckForNumRowsOnConsumer(4 * kDefaultNumRows));
}
326 | | |
TEST_F(XClusterTabletSplitITest, MultipleSplitsInSequence) {
  // Handle case where there are multiple SPLIT_OPs immediately after each other.
  // This is to test when we receive an older SPLIT_OP that has already been processed, and its
  // children have also been processed - see the "Unable to find matching source tablet" warning.

  // Default cluster_ will be our producer.
  ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows));

  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // Perform one tablet split (1 -> 2 tablets).
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 1));

  // Perform another tablet split immediately after (2 -> 4 tablets).
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 2));

  // Write some more rows and check that everything is replicated correctly.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1));
  ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows));
}
347 | | |
TEST_F(XClusterTabletSplitITest, SplittingOnProducerAndConsumer) {
  // Test splits on both producer and consumer while writes to the producer are happening.

  // Default cluster_ will be our producer.
  // Start by writing some rows and waiting for them to be replicated.
  ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows));
  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // Setup a new thread for continuous writing to producer.
  std::atomic<bool> stop(false);
  std::thread write_thread([this, &stop] {
    CDSAttacher attacher;
    // Open a dedicated table handle/session on this thread; the test body
    // swaps table_/client_ back and forth between clusters underneath us.
    client::TableHandle producer_table;
    ASSERT_OK(producer_table.Open(table_->name(), client_.get()));
    auto producer_session = client_->NewSession();
    producer_session->SetTimeout(60s);
    // Continue from where the initial batch left off so keys stay unique.
    int32_t key = kDefaultNumRows;
    while (!stop) {
      key = (key + 1);
      ASSERT_RESULT(client::kv_table_test::WriteRow(
          &producer_table, producer_session, key, key,
          client::WriteOpType::INSERT, client::Flush::kTrue));
    }
  });

  // Perform tablet splits on both sides. On the consumer side the parent
  // tablets are not protected from deletion (no stream reads from them).
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 1));
  SwitchToConsumer();
  ASSERT_OK(SplitAllTablets(
      /* cur_num_tablets */ 1, /* parent_tablet_protected_from_deletion */ false));
  SwitchToProducer();
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 2));
  SwitchToConsumer();
  ASSERT_OK(SplitAllTablets(
      /* cur_num_tablets */ 2, /* parent_tablet_protected_from_deletion */ false));
  SwitchToProducer();

  // Stop writes.
  stop.store(true, std::memory_order_release);
  write_thread.join();

  // Verify that both sides have the same number of rows. The writer thread's
  // final count is unknown ahead of time, so read it off the producer first.
  client::YBSessionPtr producer_session = client_->NewSession();
  producer_session->SetTimeout(60s);
  size_t num_rows = ASSERT_RESULT(SelectRowsCount(producer_session, table_));

  ASSERT_OK(CheckForNumRowsOnConsumer(num_rows));
}
396 | | |
// Verifies that the xCluster consumer stalls (rather than losing data) while
// it repeatedly fails to process a producer SPLIT_OP, and that replication
// catches up once the injected failure is cleared.
TEST_F(XClusterTabletSplitITest, ConsumerClusterFailureWhenProcessingSplitOp) {
  ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows));
  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // Force consumer to fail after processing the split op.
  SetAtomicFlag(true, &FLAGS_TEST_xcluster_consumer_fail_after_process_split_op);

  // Perform a split.
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 1));
  // Write some additional rows.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1));

  // Wait for a bit, as the consumer keeps trying to process the split_op but fails.
  SleepFor(10s);
  // Check that these new rows aren't replicated since we're stuck on the split_op.
  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // Allow for the split op to be processed properly, and check that everything is replicated.
  SetAtomicFlag(false, &FLAGS_TEST_xcluster_consumer_fail_after_process_split_op);
  ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows));

  // One more write/replicate round to confirm steady state after recovery.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, 2 * kDefaultNumRows + 1));
  ASSERT_OK(CheckForNumRowsOnConsumer(3 * kDefaultNumRows));
}
421 | | |
// Variant of XClusterTabletSplitITest where replication is NOT set up in
// SetUp; tests drive the CDC bootstrap flow themselves via BootstrapProducer
// and SetupReplication.
class XClusterBootstrapTabletSplitITest : public XClusterTabletSplitITest {
 public:
  void SetUp() override {
    // Deliberately skip XClusterTabletSplitITest::SetUp, which would set up
    // universe replication immediately.
    CdcTabletSplitITest::SetUp();

    // Create the consumer cluster, but don't setup the universe replication yet.
    consumer_cluster_ = ASSERT_RESULT(CreateNewUniverseAndTable("consumer", &consumer_table_));
    consumer_client_ = ASSERT_RESULT(consumer_cluster_->CreateClient());
  }

 protected:
  // Runs `bootstrap_cdc_producer` against the producer and parses the
  // 32-char bootstrap id out of the admin tool's textual output.
  Result<string> BootstrapProducer() {
    const int kStreamUuidLength = 32;
    string output = VERIFY_RESULT(tools::RunAdminToolCommand(
        cluster_->GetMasterAddresses(), "bootstrap_cdc_producer", table_->id()));
    // Get the bootstrap id (output format is "table id: 123, CDC bootstrap id: 123\n").
    string bootstrap_id = output.substr(output.find_last_of(' ') + 1, kStreamUuidLength);
    return bootstrap_id;
  }

  // Sets up producer -> consumer universe replication, optionally resuming
  // from a previously captured bootstrap id (empty = no bootstrap).
  CHECKED_STATUS SetupReplication(const string& bootstrap_id = "") {
    VERIFY_RESULT(tools::RunAdminToolCommand(
        consumer_cluster_->GetMasterAddresses(), "setup_universe_replication", kProducerClusterId,
        cluster_->GetMasterAddresses(), table_->id(), bootstrap_id));
    return Status::OK();
  }
};
449 | | |
// Verifies that replication set up from a bootstrap id tolerates tablet
// splits performed on the producer between bootstrap and setup, as well as
// splits performed after replication is live.
TEST_F(XClusterBootstrapTabletSplitITest, BootstrapWithSplits) {
  // Start by writing some rows to the producer.
  ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows));

  string bootstrap_id = ASSERT_RESULT(BootstrapProducer());

  // Instead of doing a backup, we'll just rewrite the same rows to the consumer.
  SwitchToConsumer();
  ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows));
  SwitchToProducer();

  // Now before setting up replication, lets perform some splits and write some more rows.
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 1));
  ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1));
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 2));

  // Now setup replication.
  ASSERT_OK(SetupReplication(bootstrap_id));

  // Replication should work fine.
  ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows));

  // Perform an additional write + split afterwards.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, 2 * kDefaultNumRows + 1));
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 4));

  ASSERT_OK(CheckForNumRowsOnConsumer(3 * kDefaultNumRows));
}
478 | | |
// Fixture for verifying that tablet splitting is rejected with NotSupported
// while the table participates in CDC / xCluster replication (the feature
// flag enabled by the base fixture is turned back off here).
class NotSupportedTabletSplitITest : public CdcTabletSplitITest {
 public:
  void SetUp() override {
    CdcTabletSplitITest::SetUp();
    // Disallow splitting of xCluster-replicated tables again (the base
    // fixture enables it).
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_xcluster_replicated_tables) = false;
  }

 protected:
  // Writes rows, attempts a split, and expects it to fail with NotSupported.
  // If restart_server is true, also restarts the cluster and retries to make
  // sure the rejection persists across a restart. Returns the middle hash
  // code so callers can retry the same split later (e.g. after replication
  // has been deleted).
  Result<docdb::DocKeyHash> SplitTabletAndCheckForNotSupported(bool restart_server) {
    auto split_hash_code = VERIFY_RESULT(WriteRowsAndGetMiddleHashCode(kDefaultNumRows));
    auto s = SplitTabletAndValidate(split_hash_code, kDefaultNumRows);
    EXPECT_NOT_OK(s);
    EXPECT_TRUE(s.status().IsNotSupported()) << s.status();

    if (restart_server) {
      // Now try to restart the cluster and check that tablet splitting still fails.
      RETURN_NOT_OK(cluster_->RestartSync());

      s = SplitTabletAndValidate(split_hash_code, kDefaultNumRows);
      EXPECT_NOT_OK(s);
      EXPECT_TRUE(s.status().IsNotSupported()) << s.status();
    }

    return split_hash_code;
  }
};
505 | | |
// A plain CDC stream (no xCluster) must also block tablet splitting when the
// feature flag is off.
TEST_F(NotSupportedTabletSplitITest, SplittingWithCdcStream) {
  // Create a cdc stream for this tablet.
  auto cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(&client_->proxy_cache(),
      HostPort::FromBoundEndpoint(cluster_->mini_tablet_servers().front()->bound_rpc_addr()));
  CDCStreamId stream_id;
  cdc::CreateCDCStream(cdc_proxy, table_->id(), &stream_id);
  // Ensure that the cdc_state table is ready before inserting rows and splitting.
  ASSERT_OK(WaitForCdcStateTableToBeReady());

  LOG(INFO) << "Created a CDC stream for table " << table_.name().table_name()
            << " with stream id " << stream_id;

  // Try splitting this tablet; the split must be rejected as NotSupported.
  ASSERT_RESULT(SplitTabletAndCheckForNotSupported(false /* restart_server */));
}
521 | | |
// Splitting on the PRODUCER side of an xCluster setup must be rejected while
// replication exists, and become possible again after replication is deleted.
TEST_F(NotSupportedTabletSplitITest, SplittingWithXClusterReplicationOnProducer) {
  // Default cluster_ will be our producer.
  // Create a consumer universe and table, then setup universe replication.
  client::TableHandle consumer_cluster_table;
  auto consumer_cluster =
      ASSERT_RESULT(CreateNewUniverseAndTable("consumer", &consumer_cluster_table));

  ASSERT_OK(tools::RunAdminToolCommand(consumer_cluster->GetMasterAddresses(),
                                       "setup_universe_replication",
                                       "",  // Producer cluster id (default is set to "").
                                       cluster_->GetMasterAddresses(),
                                       table_->id()));

  // Try splitting this tablet, and restart the server to ensure split still fails after a restart.
  const auto split_hash_code =
      ASSERT_RESULT(SplitTabletAndCheckForNotSupported(true /* restart_server */));

  // Now delete replication and verify that the tablet can now be split.
  ASSERT_OK(tools::RunAdminToolCommand(
      consumer_cluster->GetMasterAddresses(), "delete_universe_replication", ""));
  // Deleting cdc streams is async so wait for that to complete.
  ASSERT_OK(WaitFor([&]() -> Result<bool> {
    return SplitTabletAndValidate(split_hash_code, kDefaultNumRows).ok();
  }, 20s * kTimeMultiplier, "Split tablet after deleting xCluster replication"));

  // This test owns the consumer cluster, so shut it down explicitly.
  consumer_cluster->Shutdown();
}
549 | | |
// Splitting on the CONSUMER side of an xCluster setup must be rejected while
// replication exists, and become possible again after replication is deleted.
TEST_F(NotSupportedTabletSplitITest, SplittingWithXClusterReplicationOnConsumer) {
  // Default cluster_ will be our consumer.
  // Create a producer universe and table, then setup universe replication.
  const string kProducerClusterId = "producer";
  client::TableHandle producer_cluster_table;
  auto producer_cluster =
      ASSERT_RESULT(CreateNewUniverseAndTable(kProducerClusterId, &producer_cluster_table));

  ASSERT_OK(tools::RunAdminToolCommand(cluster_->GetMasterAddresses(),
                                       "setup_universe_replication",
                                       kProducerClusterId,
                                       producer_cluster->GetMasterAddresses(),
                                       producer_cluster_table->id()));

  // Try splitting this tablet, and restart the server to ensure split still fails after a restart.
  const auto split_hash_code =
      ASSERT_RESULT(SplitTabletAndCheckForNotSupported(true /* restart_server */));

  // Now delete replication and verify that the tablet can now be split.
  ASSERT_OK(tools::RunAdminToolCommand(
      cluster_->GetMasterAddresses(), "delete_universe_replication", kProducerClusterId));
  ASSERT_OK(SplitTabletAndValidate(split_hash_code, kDefaultNumRows));

  // This test owns the producer cluster, so shut it down explicitly.
  producer_cluster->Shutdown();
}
575 | | |
576 | | } // namespace yb |