YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/xcluster-tablet-split-itest.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/cdc/cdc_service.proxy.h"
15
16
#include "yb/client/session.h"
17
#include "yb/client/table.h"
18
#include "yb/client/yb_table_name.h"
19
20
#include "yb/integration-tests/cdc_test_util.h"
21
#include "yb/integration-tests/tablet-split-itest-base.h"
22
#include "yb/master/master_ddl.proxy.h"
23
#include "yb/master/master_defaults.h"
24
#include "yb/tablet/tablet_peer.h"
25
#include "yb/tools/admin-test-base.h"
26
#include "yb/tserver/mini_tablet_server.h"
27
#include "yb/util/thread.h"
28
29
DECLARE_int32(cdc_state_table_num_tablets);
30
DECLARE_bool(enable_tablet_split_of_xcluster_replicated_tables);
31
DECLARE_uint64(snapshot_coordinator_poll_interval_ms);
32
DECLARE_bool(TEST_validate_all_tablet_candidates);
33
DECLARE_bool(TEST_xcluster_consumer_fail_after_process_split_op);
34
35
namespace yb {
36
37
class CdcTabletSplitITest : public TabletSplitITest {
38
 public:
39
11
  void SetUp() override {
40
11
    FLAGS_cdc_state_table_num_tablets = 1;
41
11
    TabletSplitITest::SetUp();
42
11
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_validate_all_tablet_candidates) = false;
43
11
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_xcluster_replicated_tables) = true;
44
45
11
    CreateSingleTablet();
46
11
  }
47
48
 protected:
49
0
  Status WaitForCdcStateTableToBeReady() {
50
0
    return WaitFor([&]() -> Result<bool> {
51
0
      master::IsCreateTableDoneRequestPB is_create_req;
52
0
      master::IsCreateTableDoneResponsePB is_create_resp;
53
54
0
      is_create_req.mutable_table()->set_table_name(master::kCdcStateTableName);
55
0
      is_create_req.mutable_table()->mutable_namespace_()->set_name(master::kSystemNamespaceName);
56
0
      master::MasterDdlProxy master_proxy(
57
0
          &client_->proxy_cache(), VERIFY_RESULT(cluster_->GetLeaderMasterBoundRpcAddr()));
58
0
      rpc::RpcController rpc;
59
0
      rpc.set_timeout(MonoDelta::FromSeconds(30));
60
61
0
      auto s = master_proxy.IsCreateTableDone(is_create_req, &is_create_resp, &rpc);
62
0
      return s.ok() && !is_create_resp.has_error() && is_create_resp.done();
63
0
    }, MonoDelta::FromSeconds(30), "Wait for cdc_state table creation to finish");
64
0
  }
65
66
  Result<std::unique_ptr<MiniCluster>> CreateNewUniverseAndTable(
67
0
      const string& cluster_id, client::TableHandle* table) {
68
    // First create the new cluster.
69
0
    MiniClusterOptions opts;
70
0
    opts.num_tablet_servers = 3;
71
0
    opts.cluster_id = cluster_id;
72
0
    std::unique_ptr<MiniCluster> cluster = std::make_unique<MiniCluster>(opts);
73
0
    RETURN_NOT_OK(cluster->Start());
74
0
    RETURN_NOT_OK(cluster->WaitForTabletServerCount(3));
75
0
    auto cluster_client = VERIFY_RESULT(cluster->CreateClient());
76
77
    // Create an identical table on the new cluster.
78
0
    client::kv_table_test::CreateTable(
79
0
        client::Transactional(GetIsolationLevel() != IsolationLevel::NON_TRANSACTIONAL),
80
0
        1,  // num_tablets
81
0
        cluster_client.get(),
82
0
        table);
83
0
    return cluster;
84
0
  }
85
};
86
87
0
// Checks that GetChanges on a split parent tablet succeeds right after the split and after a
// leader step-down, and that it returns an error once wal_retention_secs has been lowered and
// the test has waited two snapshot-coordinator poll intervals.
TEST_F(CdcTabletSplitITest, GetChangesOnSplitParentTablet) {
  constexpr auto kNumRows = kDefaultNumRows;
  // Create a cdc stream for this tablet.
  auto cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(&client_->proxy_cache(),
      HostPort::FromBoundEndpoint(cluster_->mini_tablet_servers().front()->bound_rpc_addr()));
  CDCStreamId stream_id;
  cdc::CreateCDCStream(cdc_proxy, table_->id(), &stream_id);
  // Ensure that the cdc_state table is ready before inserting rows and splitting.
  ASSERT_OK(WaitForCdcStateTableToBeReady());

  LOG(INFO) << "Created a CDC stream for table " << table_.name().table_name()
            << " with stream id " << stream_id;

  // Write some rows to the tablet.
  const auto split_hash_code = ASSERT_RESULT(WriteRowsAndGetMiddleHashCode(kNumRows));
  const auto source_tablet_id = ASSERT_RESULT(SplitTabletAndValidate(
      split_hash_code, kNumRows, /* parent_tablet_protected_from_deletion */ true));

  // Ensure that a GetChanges still works on the source tablet.
  cdc::GetChangesRequestPB change_req;
  cdc::GetChangesResponsePB change_resp;

  change_req.set_tablet_id(source_tablet_id);
  change_req.set_stream_id(stream_id);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

  rpc::RpcController rpc;
  ASSERT_OK(cdc_proxy->GetChanges(change_req, &change_resp, &rpc));
  ASSERT_FALSE(change_resp.has_error()) << change_resp.ShortDebugString();

  // Test that if the tablet leadership of the parent tablet changes we can still call GetChanges.
  StepDownAllTablets(cluster_.get());

  rpc.Reset();
  ASSERT_OK(cdc_proxy->GetChanges(change_req, &change_resp, &rpc));
  ASSERT_FALSE(change_resp.has_error()) << change_resp.ShortDebugString();

  // Now let the parent tablet get deleted by the background task. Need to lower the
  // wal_retention_secs.
  master::AlterTableRequestPB alter_table_req;
  master::AlterTableResponsePB alter_table_resp;
  alter_table_req.mutable_table()->set_table_id(table_->id());
  alter_table_req.set_wal_retention_secs(1);

  master::MasterDdlProxy master_proxy(
      &client_->proxy_cache(), ASSERT_RESULT(cluster_->GetLeaderMasterBoundRpcAddr()));
  rpc.Reset();
  ASSERT_OK(master_proxy.AlterTable(alter_table_req, &alter_table_resp, &rpc));
  // A successful RPC can still carry an application-level error; without this check a rejected
  // alter would only surface as a confusing failure of the final assertion.
  ASSERT_FALSE(alter_table_resp.has_error()) << alter_table_resp.ShortDebugString();

  SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_snapshot_coordinator_poll_interval_ms));

  // Try to do a GetChanges again, it should fail.
  rpc.Reset();
  ASSERT_OK(cdc_proxy->GetChanges(change_req, &change_resp, &rpc));
  ASSERT_TRUE(change_resp.has_error()) << change_resp.ShortDebugString();
}
143
144
// For testing xCluster setups. Since most test utility functions expect there to be only one
145
// cluster, they implicitly use cluster_ / client_ / table_ everywhere. For this test, we default
146
// those to point to the producer cluster, but allow calls to SwitchToProducer/Consumer, to swap
147
// those to point to the other cluster.
148
class XClusterTabletSplitITest : public CdcTabletSplitITest {
149
 public:
150
6
  void SetUp() override {
151
6
    CdcTabletSplitITest::SetUp();
152
153
    // Also create the consumer cluster.
154
0
    consumer_cluster_ = ASSERT_RESULT(CreateNewUniverseAndTable("consumer", &consumer_table_));
155
0
    consumer_client_ = ASSERT_RESULT(consumer_cluster_->CreateClient());
156
157
0
    ASSERT_OK(tools::RunAdminToolCommand(
158
0
        consumer_cluster_->GetMasterAddresses(), "setup_universe_replication", kProducerClusterId,
159
0
        cluster_->GetMasterAddresses(), table_->id()));
160
0
  }
161
162
 protected:
163
0
  void DoBeforeTearDown() override {
164
0
    SwitchToConsumer();
165
0
    ASSERT_OK(tools::RunAdminToolCommand(
166
0
        cluster_->GetMasterAddresses(), "delete_universe_replication", kProducerClusterId));
167
168
    // Shutdown producer cluster here, CdcTabletSplitITest will shutdown cluster_ (consumer).
169
0
    producer_cluster_->Shutdown();
170
0
    CdcTabletSplitITest::DoBeforeTearDown();
171
0
  }
172
173
0
  void SwitchToProducer() {
174
0
    if (!producer_cluster_) {
175
0
      return;
176
0
    }
177
    // cluster_ is currently the consumer.
178
0
    consumer_cluster_ = std::move(cluster_);
179
0
    consumer_client_ = std::move(client_);
180
0
    consumer_table_ = std::move(table_);
181
0
    cluster_ = std::move(producer_cluster_);
182
0
    client_ = std::move(producer_client_);
183
0
    table_ = std::move(producer_table_);
184
0
    LOG(INFO) << "Swapped to the producer cluster.";
185
0
  }
186
187
0
  void SwitchToConsumer() {
188
0
    if (!consumer_cluster_) {
189
0
      return;
190
0
    }
191
    // cluster_ is currently the producer.
192
0
    producer_cluster_ = std::move(cluster_);
193
0
    producer_client_ = std::move(client_);
194
0
    producer_table_ = std::move(table_);
195
0
    cluster_ = std::move(consumer_cluster_);
196
0
    client_ = std::move(consumer_client_);
197
0
    table_ = std::move(consumer_table_);
198
0
    LOG(INFO) << "Swapped to the consumer cluster.";
199
0
  }
200
201
0
  CHECKED_STATUS CheckForNumRowsOnConsumer(size_t expected_num_rows) {
202
0
    client::YBClient* consumer_client(consumer_cluster_ ? consumer_client_.get() : client_.get());
203
0
    client::TableHandle* consumer_table(consumer_cluster_ ? &consumer_table_ : &table_);
204
205
0
    client::YBSessionPtr consumer_session = consumer_client->NewSession();
206
0
    consumer_session->SetTimeout(60s);
207
0
    size_t num_rows = 0;
208
0
    Status s = WaitFor([&]() -> Result<bool> {
209
0
      num_rows = VERIFY_RESULT(SelectRowsCount(consumer_session, *consumer_table));
210
0
      return num_rows == expected_num_rows;
211
0
    }, MonoDelta::FromSeconds(60), "Wait for data to be replicated");
212
213
0
    LOG(INFO) << "Found " << num_rows << " rows on consumer, expected " << expected_num_rows;
214
215
0
    return s;
216
0
  }
217
218
  CHECKED_STATUS SplitAllTablets(
219
0
      int cur_num_tablets, bool parent_tablet_protected_from_deletion = true) {
220
    // Splits all tablets for cluster_.
221
0
    auto* catalog_mgr = VERIFY_RESULT(catalog_manager());
222
0
    auto tablet_peers = ListTableActiveTabletLeadersPeers(cluster_.get(), table_->id());
223
0
    EXPECT_EQ(tablet_peers.size(), cur_num_tablets);
224
0
    for (const auto& peer : tablet_peers) {
225
0
      const auto source_tablet_ptr = peer->tablet();
226
0
      EXPECT_NE(source_tablet_ptr, nullptr);
227
0
      const auto& source_tablet = *source_tablet_ptr;
228
0
      RETURN_NOT_OK(SplitTablet(catalog_mgr, source_tablet));
229
0
    }
230
0
    size_t expected_non_split_tablets = cur_num_tablets * 2;
231
0
    size_t expected_split_tablets = parent_tablet_protected_from_deletion
232
0
                                    ? cur_num_tablets * 2 - 1
233
0
                                    : 0;
234
0
    return WaitForTabletSplitCompletion(expected_non_split_tablets, expected_split_tablets);
235
0
  }
236
237
  // Only one set of these is valid at any time.
238
  // The other cluster is accessible via cluster_ / client_ / table_.
239
  std::unique_ptr<MiniCluster> consumer_cluster_;
240
  std::unique_ptr<client::YBClient> consumer_client_;
241
  client::TableHandle consumer_table_;
242
243
  std::unique_ptr<MiniCluster> producer_cluster_;
244
  std::unique_ptr<client::YBClient> producer_client_;
245
  client::TableHandle producer_table_;
246
247
  const string kProducerClusterId = "producer";
248
};
249
250
0
// Splits the tablet on the consumer cluster and checks that rows written to the producer
// afterwards still show up on the consumer.
TEST_F(XClusterTabletSplitITest, SplittingWithXClusterReplicationOnConsumer) {
  // cluster_ starts out as the producer: load the first batch there and wait until the consumer
  // has all of it.
  const auto middle_hash_code = ASSERT_RESULT(WriteRowsAndGetMiddleHashCode(kDefaultNumRows));
  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // Do the split on the CONSUMER side, then go back to the producer.
  SwitchToConsumer();
  ASSERT_OK(SplitTabletAndValidate(middle_hash_code, kDefaultNumRows));
  SwitchToProducer();

  // A second batch must be picked up by the new poller.
  const auto second_batch_start_key = kDefaultNumRows + 1;
  ASSERT_RESULT(WriteRows(kDefaultNumRows, second_batch_start_key));

  ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows));
}
272
273
0
// Splits the tablet on the producer cluster and checks that rows written afterwards are still
// replicated to the consumer.
TEST_F(XClusterTabletSplitITest, SplittingWithXClusterReplicationOnProducer) {
  // cluster_ is the producer throughout this test.
  const auto middle_hash_code = ASSERT_RESULT(WriteRowsAndGetMiddleHashCode(kDefaultNumRows));

  // The first batch has to reach the consumer before we split.
  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // Producer-side split. The parent tablet is expected to be only HIDDEN, not deleted.
  constexpr bool kParentTabletProtectedFromDeletion = true;
  ASSERT_OK(SplitTabletAndValidate(
      middle_hash_code, kDefaultNumRows, kParentTabletProtectedFromDeletion));

  // A second batch must also make it to the consumer.
  const auto second_batch_start_key = kDefaultNumRows + 1;
  ASSERT_RESULT(WriteRows(kDefaultNumRows, second_batch_start_key));

  ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows));
}
291
292
0
// Disables replication, performs several rounds of producer-side splits interleaved with
// writes, re-enables replication and checks that the consumer ends up with every row.
TEST_F(XClusterTabletSplitITest, MultipleSplitsDuringPausedReplication) {
  // cluster_ is the producer throughout. Pausing replication stands in for a network partition.
  ASSERT_OK(tools::RunAdminToolCommand(
      consumer_cluster_->GetMasterAddresses(), "set_universe_replication_enabled",
      kProducerClusterId, "0"));

  // Round one: a single tablet is split in two.
  const auto middle_hash_code = ASSERT_RESULT(WriteRowsAndGetMiddleHashCode(kDefaultNumRows));
  constexpr bool kParentTabletProtectedFromDeletion = true;
  ASSERT_OK(SplitTabletAndValidate(
      middle_hash_code, kDefaultNumRows, kParentTabletProtectedFromDeletion));

  // Rounds two and three: more rows, then split every child again (2 -> 4 -> 8 tablets).
  ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1));
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 2));
  ASSERT_RESULT(WriteRows(kDefaultNumRows, 2 * kDefaultNumRows + 1));
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 4));

  // Resume replication.
  ASSERT_OK(tools::RunAdminToolCommand(
      consumer_cluster_->GetMasterAddresses(), "set_universe_replication_enabled",
      kProducerClusterId, "1"));

  // All three batches written while paused must arrive.
  ASSERT_OK(CheckForNumRowsOnConsumer(3 * kDefaultNumRows));

  // And so must a batch written after replication resumed.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, 3 * kDefaultNumRows + 1));

  ASSERT_OK(CheckForNumRowsOnConsumer(4 * kDefaultNumRows));
}
326
327
0
// Performs two rounds of producer-side splits back to back and checks that replication still
// delivers rows written afterwards.
TEST_F(XClusterTabletSplitITest, MultipleSplitsInSequence) {
  // Covers SPLIT_OPs that immediately follow each other: the consumer may receive an older
  // SPLIT_OP that was already processed together with its children - see the "Unable to find
  // matching source tablet" warning.

  // cluster_ is the producer throughout.
  ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows));

  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // First split (1 -> 2 tablets), directly followed by a second one (2 -> 4 tablets).
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 1));
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 2));

  // New rows must still be replicated correctly.
  const auto second_batch_start_key = kDefaultNumRows + 1;
  ASSERT_RESULT(WriteRows(kDefaultNumRows, second_batch_start_key));
  ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows));
}
347
348
0
// Splits tablets on both the producer and the consumer while a background thread keeps writing
// to the producer, then checks that both sides end up with the same number of rows.
TEST_F(XClusterTabletSplitITest, SplittingOnProducerAndConsumer) {
  // Default cluster_ will be our producer.
  // Start by writing some rows and waiting for them to be replicated.
  ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows));
  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // Resolve the producer handles on this thread. SwitchToConsumer/SwitchToProducer below move
  // client_ and table_, so the writer thread must not read those members itself.
  const auto producer_table_name = table_->name();
  client::YBClient* const producer_client = client_.get();

  // Setup a new thread for continuous writing to producer.
  std::atomic<bool> stop(false);
  std::thread write_thread([producer_table_name, producer_client, &stop] {
    CDSAttacher attacher;
    client::TableHandle producer_table;
    ASSERT_OK(producer_table.Open(producer_table_name, producer_client));
    auto producer_session = producer_client->NewSession();
    producer_session->SetTimeout(60s);
    int32_t key = kDefaultNumRows;
    while (!stop) {
      key = (key + 1);
      ASSERT_RESULT(client::kv_table_test::WriteRow(
          &producer_table, producer_session, key, key,
          client::WriteOpType::INSERT, client::Flush::kTrue));
    }
  });

  // Stops and joins the writer on every exit path. A fatal assertion below returns from this
  // function, and destroying a still-joinable std::thread would terminate the process.
  struct WriterStopper {
    std::atomic<bool>* stop_flag;
    std::thread* writer;
    ~WriterStopper() {
      stop_flag->store(true, std::memory_order_release);
      if (writer->joinable()) {
        writer->join();
      }
    }
  } writer_stopper{&stop, &write_thread};

  // Perform tablet splits on both sides.
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 1));
  SwitchToConsumer();
  ASSERT_OK(SplitAllTablets(
      /* cur_num_tablets */ 1, /* parent_tablet_protected_from_deletion */ false));
  SwitchToProducer();
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 2));
  SwitchToConsumer();
  ASSERT_OK(SplitAllTablets(
      /* cur_num_tablets */ 2, /* parent_tablet_protected_from_deletion */ false));
  SwitchToProducer();

  // Stop writes.
  stop.store(true, std::memory_order_release);
  write_thread.join();

  // Verify that both sides have the same number of rows.
  client::YBSessionPtr producer_session = client_->NewSession();
  producer_session->SetTimeout(60s);
  size_t num_rows = ASSERT_RESULT(SelectRowsCount(producer_session, table_));

  ASSERT_OK(CheckForNumRowsOnConsumer(num_rows));
}
396
397
0
// Makes the consumer fail after processing a split op (via a test flag), checks that rows
// written after the split are not replicated while the flag is set, then clears the flag and
// checks that replication catches up and keeps working.
TEST_F(XClusterTabletSplitITest, ConsumerClusterFailureWhenProcessingSplitOp) {
  ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows));
  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // From here on the consumer fails every time it has processed the split op.
  SetAtomicFlag(true, &FLAGS_TEST_xcluster_consumer_fail_after_process_split_op);

  // Split on the producer, then add a second batch of rows.
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 1));
  ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1));

  // Give the consumer time to retry the split_op (and keep failing).
  SleepFor(10s);
  // The second batch must not have arrived, since the consumer is stuck on the split_op.
  ASSERT_OK(CheckForNumRowsOnConsumer(kDefaultNumRows));

  // Let the split op go through; now both batches must be replicated.
  SetAtomicFlag(false, &FLAGS_TEST_xcluster_consumer_fail_after_process_split_op);
  ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows));

  // A third batch confirms replication keeps working afterwards.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, 2 * kDefaultNumRows + 1));
  ASSERT_OK(CheckForNumRowsOnConsumer(3 * kDefaultNumRows));
}
421
422
class XClusterBootstrapTabletSplitITest : public XClusterTabletSplitITest {
423
 public:
424
1
  void SetUp() override {
425
1
    CdcTabletSplitITest::SetUp();
426
427
    // Create the consumer cluster, but don't setup the universe replication yet.
428
0
    consumer_cluster_ = ASSERT_RESULT(CreateNewUniverseAndTable("consumer", &consumer_table_));
429
0
    consumer_client_ = ASSERT_RESULT(consumer_cluster_->CreateClient());
430
0
  }
431
432
 protected:
433
0
  Result<string> BootstrapProducer() {
434
0
    const int kStreamUuidLength = 32;
435
0
    string output = VERIFY_RESULT(tools::RunAdminToolCommand(
436
0
        cluster_->GetMasterAddresses(), "bootstrap_cdc_producer", table_->id()));
437
    // Get the bootstrap id (output format is "table id: 123, CDC bootstrap id: 123\n").
438
0
    string bootstrap_id = output.substr(output.find_last_of(' ') + 1, kStreamUuidLength);
439
0
    return bootstrap_id;
440
0
  }
441
442
0
  CHECKED_STATUS SetupReplication(const string& bootstrap_id = "") {
443
0
    VERIFY_RESULT(tools::RunAdminToolCommand(
444
0
        consumer_cluster_->GetMasterAddresses(), "setup_universe_replication", kProducerClusterId,
445
0
        cluster_->GetMasterAddresses(), table_->id(), bootstrap_id));
446
0
    return Status::OK();
447
0
  }
448
};
449
450
0
// Bootstraps the producer, splits its tablets before replication is set up, and checks that
// replication set up with the bootstrap id delivers rows written both before and after setup.
TEST_F(XClusterBootstrapTabletSplitITest, BootstrapWithSplits) {
  // Seed the producer and take the bootstrap id.
  ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows));

  const auto bootstrap_id = ASSERT_RESULT(BootstrapProducer());

  // In place of a backup/restore, write the very same rows to the consumer directly.
  SwitchToConsumer();
  ASSERT_RESULT(WriteRowsAndFlush(kDefaultNumRows));
  SwitchToProducer();

  // Replication is still not set up: split, write a second batch, and split again.
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 1));
  ASSERT_RESULT(WriteRows(kDefaultNumRows, kDefaultNumRows + 1));
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 2));

  // Only now set up replication, using the bootstrap id taken earlier.
  ASSERT_OK(SetupReplication(bootstrap_id));

  // Both batches must be present on the consumer.
  ASSERT_OK(CheckForNumRowsOnConsumer(2 * kDefaultNumRows));

  // One more write + split round once replication is running.
  ASSERT_RESULT(WriteRows(kDefaultNumRows, 2 * kDefaultNumRows + 1));
  ASSERT_OK(SplitAllTablets(/* cur_num_tablets */ 4));

  ASSERT_OK(CheckForNumRowsOnConsumer(3 * kDefaultNumRows));
}
478
479
class NotSupportedTabletSplitITest : public CdcTabletSplitITest {
480
 public:
481
3
  void SetUp() override {
482
3
    CdcTabletSplitITest::SetUp();
483
3
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_tablet_split_of_xcluster_replicated_tables) = false;
484
3
  }
485
486
 protected:
487
0
  Result<docdb::DocKeyHash> SplitTabletAndCheckForNotSupported(bool restart_server) {
488
0
    auto split_hash_code = VERIFY_RESULT(WriteRowsAndGetMiddleHashCode(kDefaultNumRows));
489
0
    auto s = SplitTabletAndValidate(split_hash_code, kDefaultNumRows);
490
0
    EXPECT_NOT_OK(s);
491
0
    EXPECT_TRUE(s.status().IsNotSupported()) << s.status();
492
493
0
    if (restart_server) {
494
      // Now try to restart the cluster and check that tablet splitting still fails.
495
0
      RETURN_NOT_OK(cluster_->RestartSync());
496
497
0
      s = SplitTabletAndValidate(split_hash_code, kDefaultNumRows);
498
0
      EXPECT_NOT_OK(s);
499
0
      EXPECT_TRUE(s.status().IsNotSupported()) << s.status();
500
0
    }
501
502
0
    return split_hash_code;
503
0
  }
504
};
505
506
0
// With splitting of replicated tables disabled, a table that has a CDC stream must refuse to
// split.
TEST_F(NotSupportedTabletSplitITest, SplittingWithCdcStream) {
  // Set up a CDC stream on the test table through the first tablet server.
  const auto first_tserver_addr =
      HostPort::FromBoundEndpoint(cluster_->mini_tablet_servers().front()->bound_rpc_addr());
  auto cdc_proxy =
      std::make_unique<cdc::CDCServiceProxy>(&client_->proxy_cache(), first_tserver_addr);
  CDCStreamId stream_id;
  cdc::CreateCDCStream(cdc_proxy, table_->id(), &stream_id);
  // The cdc_state table has to be ready before rows are inserted and a split is attempted.
  ASSERT_OK(WaitForCdcStateTableToBeReady());

  LOG(INFO) << "Created a CDC stream for table " << table_.name().table_name()
            << " with stream id " << stream_id;

  // The split attempt is expected to be rejected; no restart in this test.
  constexpr bool kRestartServer = false;
  ASSERT_RESULT(SplitTabletAndCheckForNotSupported(kRestartServer));
}
521
522
0
// With splitting of replicated tables disabled, a table acting as xCluster producer must refuse
// to split (also after a restart) and must become splittable once replication is deleted.
TEST_F(NotSupportedTabletSplitITest, SplittingWithXClusterReplicationOnProducer) {
  // cluster_ plays the producer. Bring up a consumer universe with a matching table and set up
  // replication into it.
  client::TableHandle consumer_table_handle;
  auto consumer_universe =
      ASSERT_RESULT(CreateNewUniverseAndTable("consumer", &consumer_table_handle));

  ASSERT_OK(tools::RunAdminToolCommand(
      consumer_universe->GetMasterAddresses(),
      "setup_universe_replication",
      "",  // Producer cluster id (default is set to "").
      cluster_->GetMasterAddresses(),
      table_->id()));

  // The split must be rejected, both now and after a server restart.
  constexpr bool kRestartServer = true;
  const auto split_hash_code = ASSERT_RESULT(SplitTabletAndCheckForNotSupported(kRestartServer));

  // Remove the replication again; after that the split has to go through.
  ASSERT_OK(tools::RunAdminToolCommand(
      consumer_universe->GetMasterAddresses(), "delete_universe_replication", ""));
  // CDC streams are deleted asynchronously, so retry the split until it succeeds.
  const auto split_succeeds = [&]() -> Result<bool> {
    return SplitTabletAndValidate(split_hash_code, kDefaultNumRows).ok();
  };
  ASSERT_OK(WaitFor(
      split_succeeds, 20s * kTimeMultiplier, "Split tablet after deleting xCluster replication"));

  consumer_universe->Shutdown();
}
549
550
0
// With splitting of replicated tables disabled, a table acting as xCluster consumer must refuse
// to split (also after a restart) and must become splittable once replication is deleted.
TEST_F(NotSupportedTabletSplitITest, SplittingWithXClusterReplicationOnConsumer) {
  // Here cluster_ plays the consumer. Bring up a producer universe with a matching table and
  // replicate from it into cluster_.
  const string kProducerClusterId = "producer";
  client::TableHandle producer_table_handle;
  auto producer_universe =
      ASSERT_RESULT(CreateNewUniverseAndTable(kProducerClusterId, &producer_table_handle));

  ASSERT_OK(tools::RunAdminToolCommand(
      cluster_->GetMasterAddresses(),
      "setup_universe_replication",
      kProducerClusterId,
      producer_universe->GetMasterAddresses(),
      producer_table_handle->id()));

  // The split must be rejected, both now and after a server restart.
  constexpr bool kRestartServer = true;
  const auto split_hash_code = ASSERT_RESULT(SplitTabletAndCheckForNotSupported(kRestartServer));

  // Remove the replication again; after that the split has to go through.
  ASSERT_OK(tools::RunAdminToolCommand(
      cluster_->GetMasterAddresses(), "delete_universe_replication", kProducerClusterId));
  ASSERT_OK(SplitTabletAndValidate(split_hash_code, kDefaultNumRows));

  producer_universe->Shutdown();
}
575
576
}  // namespace yb