YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/twodc-test.cc
Source listing (every instrumented line in this report has an execution count of 0)
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include <algorithm>
#include <map>
#include <string>
#include <utility>
#include <chrono>
#include <boost/assign.hpp>
#include <gflags/gflags.h>
#include <gtest/gtest.h>

#include "yb/common/ql_value.h"
#include "yb/common/schema.h"
#include "yb/common/wire_protocol.h"

#include "yb/cdc/cdc_service.h"
#include "yb/cdc/cdc_service.pb.h"
#include "yb/cdc/cdc_service.proxy.h"
#include "yb/client/client.h"
#include "yb/client/client-test-util.h"
#include "yb/client/meta_cache.h"
#include "yb/client/schema.h"
#include "yb/client/session.h"
#include "yb/client/table.h"
#include "yb/client/table_alterer.h"
#include "yb/client/table_creator.h"
#include "yb/client/table_handle.h"
#include "yb/client/transaction.h"
#include "yb/client/yb_op.h"

#include "yb/gutil/stl_util.h"
#include "yb/gutil/strings/join.h"
#include "yb/gutil/strings/substitute.h"
#include "yb/integration-tests/cdc_test_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/twodc_test_base.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
#include "yb/master/catalog_manager_if.h"
#include "yb/master/master_defaults.h"
#include "yb/master/mini_master.h"
#include "yb/master/master_replication.proxy.h"
#include "yb/master/master-test-util.h"

#include "yb/master/cdc_consumer_registry_service.h"
#include "yb/rpc/rpc_controller.h"
#include "yb/server/hybrid_clock.h"
#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_peer.h"
#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"
#include "yb/tserver/ts_tablet_manager.h"

#include "yb/tserver/cdc_consumer.h"
#include "yb/util/atomic.h"
#include "yb/util/faststring.h"
#include "yb/util/metrics.h"
#include "yb/util/random.h"
#include "yb/util/status_log.h"
#include "yb/util/stopwatch.h"
#include "yb/util/test_util.h"

using namespace std::literals;

DECLARE_int32(replication_factor);
DECLARE_bool(enable_ysql);
DECLARE_bool(TEST_twodc_write_hybrid_time);
DECLARE_int32(cdc_wal_retention_time_secs);
DECLARE_int32(replication_failure_delay_exponent);
DECLARE_double(TEST_respond_write_failed_probability);
DECLARE_int32(cdc_max_apply_batch_num_records);
DECLARE_bool(cdc_enable_replicate_intents);  // Used by SetUpWithParams below.
DECLARE_int32(async_replication_idle_delay_ms);
DECLARE_int32(async_replication_max_idle_wait);
DECLARE_int32(external_intent_cleanup_secs);
DECLARE_int32(yb_num_shards_per_tserver);
DECLARE_uint64(TEST_yb_inbound_big_calls_parse_delay_ms);
DECLARE_int64(rpc_throttle_threshold_bytes);
DECLARE_bool(enable_automatic_tablet_splitting);

namespace yb {

using client::YBClient;
using client::YBClientBuilder;
using client::YBColumnSchema;
using client::YBError;
using client::YBSchema;
using client::YBSchemaBuilder;
using client::YBSession;
using client::YBTable;
using client::YBTableAlterer;
using client::YBTableCreator;
using client::YBTableType;
using client::YBTableName;
using master::MiniMaster;
using tserver::MiniTabletServer;
using tserver::enterprise::CDCConsumer;

namespace enterprise {

using SessionTransactionPair = std::pair<client::YBSessionPtr, client::YBTransactionPtr>;

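// TwoDCTest runs every TEST_P below once per TwoDCTestParams combination. Judging by
// SetUpWithParams, the two parameters are the CDC apply batch size (fed into
// FLAGS_cdc_max_apply_batch_num_records) and the enable_replicate_intents toggle (fed into
// FLAGS_cdc_enable_replicate_intents).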
class TwoDCTest : public TwoDCTestBase, public testing::WithParamInterface<TwoDCTestParams> {
 public:
  Result<std::vector<std::shared_ptr<client::YBTable>>> SetUpWithParams(
      const std::vector<uint32_t>& num_consumer_tablets,
      const std::vector<uint32_t>& num_producer_tablets,
      uint32_t replication_factor,
      uint32_t num_masters = 1) {
    FLAGS_enable_ysql = false;
    TwoDCTestBase::SetUp();
    FLAGS_cdc_max_apply_batch_num_records = GetParam().batch_size;
    FLAGS_cdc_enable_replicate_intents = GetParam().enable_replicate_intents;
    FLAGS_yb_num_shards_per_tserver = 1;

    MiniClusterOptions opts;
    opts.num_tablet_servers = replication_factor;
    opts.num_masters = num_masters;
    FLAGS_replication_factor = replication_factor;
    opts.cluster_id = "producer";
    producer_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(opts);
    RETURN_NOT_OK(producer_cluster()->StartSync());
    RETURN_NOT_OK(producer_cluster()->WaitForTabletServerCount(replication_factor));

    opts.cluster_id = "consumer";
    consumer_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(opts);
    RETURN_NOT_OK(consumer_cluster()->StartSync());
    RETURN_NOT_OK(consumer_cluster()->WaitForTabletServerCount(replication_factor));

    producer_cluster_.client_ = VERIFY_RESULT(producer_cluster()->CreateClient());
    consumer_cluster_.client_ = VERIFY_RESULT(consumer_cluster()->CreateClient());

    RETURN_NOT_OK(clock_->Init());
    producer_cluster_.txn_mgr_.emplace(producer_client(), clock_, client::LocalTabletFilter());
    consumer_cluster_.txn_mgr_.emplace(consumer_client(), clock_, client::LocalTabletFilter());

    YBSchemaBuilder b;
    b.AddColumn("c0")->Type(INT32)->NotNull()->HashPrimaryKey();

    // Create transactional table.
    TableProperties table_properties;
    table_properties.SetTransactional(true);
    b.SetTableProperties(table_properties);
    CHECK_OK(b.Build(&schema_));

    YBSchema consumer_schema;
    table_properties.SetDefaultTimeToLive(0);
    b.SetTableProperties(table_properties);
    CHECK_OK(b.Build(&consumer_schema));

    if (num_consumer_tablets.size() != num_producer_tablets.size()) {
      return STATUS(IllegalState,
                    Format("Num consumer tables: $0 num producer tables: $1 must be equal.",
                           num_consumer_tablets.size(), num_producer_tablets.size()));
    }

    std::vector<YBTableName> tables;
    std::vector<std::shared_ptr<client::YBTable>> yb_tables;
    for (uint32_t i = 0; i < num_consumer_tablets.size(); i++) {
      RETURN_NOT_OK(CreateTable(i, num_producer_tablets[i], producer_client(), &tables));
      std::shared_ptr<client::YBTable> producer_table;
      RETURN_NOT_OK(producer_client()->OpenTable(tables[i * 2], &producer_table));
      yb_tables.push_back(producer_table);

      RETURN_NOT_OK(CreateTable(i, num_consumer_tablets[i], consumer_client(),
                                consumer_schema, &tables));
      std::shared_ptr<client::YBTable> consumer_table;
      RETURN_NOT_OK(consumer_client()->OpenTable(tables[(i * 2) + 1], &consumer_table));
      yb_tables.push_back(consumer_table);
    }

    return yb_tables;
  }
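
  // The returned vector interleaves producer and consumer tables: [p0, c0, p1, c1, ...],
  // which is why callers pick producer tables at even indices. For example (as in
  // PollWithConsumerRestart below):
  //   auto tables = ASSERT_RESULT(SetUpWithParams({4}, {4}, 3));  // one table, 4 tablets, RF 3
  //   tables[0];  // producer-side table
  //   tables[1];  // consumer-side table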

  Result<YBTableName> CreateTable(YBClient* client, const std::string& namespace_name,
                                  const std::string& table_name, uint32_t num_tablets,
                                  const YBSchema* schema = nullptr) {
    YBTableName table(YQL_DATABASE_CQL, namespace_name, table_name);
    RETURN_NOT_OK(client->CreateNamespaceIfNotExists(table.namespace_name(),
                                                     table.namespace_type()));

    if (!schema) {
      schema = &schema_;
    }
    // Add a table, make sure it reports itself.
    std::unique_ptr<YBTableCreator> table_creator(client->NewTableCreator());
    RETURN_NOT_OK(table_creator->table_name(table)
                      .schema(schema)
                      .table_type(YBTableType::YQL_TABLE_TYPE)
                      .num_tablets(num_tablets)
                      .Create());
    return table;
  }

  Status CreateTable(
      uint32_t idx, uint32_t num_tablets, YBClient* client, std::vector<YBTableName>* tables) {
    auto table = VERIFY_RESULT(CreateTable(client, kNamespaceName, Format("test_table_$0", idx),
                                           num_tablets));
    tables->push_back(table);
    return Status::OK();
  }

  Status CreateTable(uint32_t idx, uint32_t num_tablets, YBClient* client, YBSchema schema,
                     std::vector<YBTableName>* tables) {
    auto table = VERIFY_RESULT(CreateTable(client, kNamespaceName, Format("test_table_$0", idx),
                                           num_tablets, &schema));
    tables->push_back(table);
    return Status::OK();
  }

  void WriteWorkload(uint32_t start, uint32_t end, YBClient* client, const YBTableName& table,
                     bool delete_op = false) {
    auto session = client->NewSession();
    client::TableHandle table_handle;
    ASSERT_OK(table_handle.Open(table, client));
    std::vector<std::shared_ptr<client::YBqlOp>> ops;

    LOG(INFO) << "Writing " << end - start << (delete_op ? " deletes" : " inserts");
    for (uint32_t i = start; i < end; i++) {
      auto op = delete_op ? table_handle.NewDeleteOp() : table_handle.NewInsertOp();
      int32_t key = i;
      auto req = op->mutable_request();
      QLAddInt32HashValue(req, key);
      ASSERT_OK(session->ApplyAndFlush(op));
    }
  }
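
  // Note: the range is half-open, so WriteWorkload(0, 5, ...) writes keys 0..4. Each op is
  // applied and flushed individually; the `ops` vector above appears to be unused.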

  void DeleteWorkload(uint32_t start, uint32_t end, YBClient* client, const YBTableName& table) {
    WriteWorkload(start, end, client, table, true /* delete_op */);
  }

  std::vector<string> ScanToStrings(const YBTableName& table_name, YBClient* client) {
    client::TableHandle table;
    EXPECT_OK(table.Open(table_name, client));
    auto result = ScanTableToStrings(table);
    std::sort(result.begin(), result.end());
    return result;
  }

  Status VerifyWrittenRecords(const YBTableName& producer_table,
                              const YBTableName& consumer_table,
                              int timeout_secs = kRpcTimeout) {
    return LoggedWaitFor([=]() -> Result<bool> {
      auto producer_results = ScanToStrings(producer_table, producer_client());
      auto consumer_results = ScanToStrings(consumer_table, consumer_client());
      return producer_results == consumer_results;
    }, MonoDelta::FromSeconds(timeout_secs), "Verify written records");
  }

  Status VerifyNumRecords(const YBTableName& table, YBClient* client, size_t expected_size) {
    return LoggedWaitFor([=]() -> Result<bool> {
      auto results = ScanToStrings(table, client);
      return results.size() == expected_size;
    }, MonoDelta::FromSeconds(kRpcTimeout), "Verify number of records");
  }
266
  Result<SessionTransactionPair> CreateSessionWithTransaction(
267
0
      YBClient* client, client::TransactionManager* txn_mgr) {
268
0
    auto session = client->NewSession();
269
0
    auto transaction = std::make_shared<client::YBTransaction>(txn_mgr);
270
0
    ReadHybridTime read_time;
271
0
    RETURN_NOT_OK(transaction->Init(IsolationLevel::SNAPSHOT_ISOLATION, read_time));
272
0
    session->SetTransaction(transaction);
273
0
    return std::make_pair(session, transaction);
274
0
  }

  void WriteIntents(uint32_t start, uint32_t end, YBClient* client,
                    const std::shared_ptr<YBSession>& session, const YBTableName& table,
                    bool delete_op = false) {
    client::TableHandle table_handle;
    ASSERT_OK(table_handle.Open(table, client));
    std::vector<std::shared_ptr<client::YBqlOp>> ops;

    for (uint32_t i = start; i < end; i++) {
      auto op = delete_op ? table_handle.NewDeleteOp() : table_handle.NewInsertOp();
      int32_t key = i;
      auto req = op->mutable_request();
      QLAddInt32HashValue(req, key);
      ASSERT_OK(session->ApplyAndFlush(op));
    }
  }

  void WriteTransactionalWorkload(uint32_t start, uint32_t end, YBClient* client,
                                  client::TransactionManager* txn_mgr, const YBTableName& table,
                                  bool delete_op = false) {
    auto pair = ASSERT_RESULT(CreateSessionWithTransaction(client, txn_mgr));
    ASSERT_NO_FATALS(WriteIntents(start, end, client, pair.first, table, delete_op));
    ASSERT_OK(pair.second->CommitFuture().get());
  }
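
  // Unlike WriteWorkload, WriteIntents applies rows on a session bound to an open transaction,
  // so on the producer they remain provisional records (intents) until the caller commits.
  // WriteTransactionalWorkload wraps one such batch in a single transaction and commits it.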

 private:
  server::ClockPtr clock_{new server::HybridClock()};

  YBSchema schema_;
};

INSTANTIATE_TEST_CASE_P(TwoDCTestParams, TwoDCTest,
                        ::testing::Values(TwoDCTestParams(1, true), TwoDCTestParams(1, false),
                                          TwoDCTestParams(0, true), TwoDCTestParams(0, false)));

TEST_P(TwoDCTest, SetupUniverseReplication) {
  auto tables = ASSERT_RESULT(SetUpWithParams({8, 4}, {6, 6}, 3));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer tables from the list.
  producer_tables.reserve(tables.size() / 2);
  for (size_t i = 0; i < tables.size(); i += 2) {
    producer_tables.push_back(tables[i]);
  }
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // Verify that the universe was set up on the consumer.
  master::GetUniverseReplicationResponsePB resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));
  ASSERT_EQ(resp.entry().producer_id(), kUniverseId);
  ASSERT_EQ(resp.entry().tables_size(), producer_tables.size());
  for (uint32_t i = 0; i < producer_tables.size(); i++) {
    ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id());
  }

  // Verify that CDC streams were created on producer for all tables.
  for (size_t i = 0; i < producer_tables.size(); i++) {
    master::ListCDCStreamsResponsePB stream_resp;
    ASSERT_OK(GetCDCStreamForTable(producer_tables[i]->id(), &stream_resp));
    ASSERT_EQ(stream_resp.streams_size(), 1);
    ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), producer_tables[i]->id());
  }

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}
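
// Each block below sends one malformed SetupUniverseReplicationRequestPB directly to the
// consumer master and asserts on the expected validation error: missing universe ID, missing
// master addresses, mismatched bootstrap-id count, duplicate master addresses, and a producer
// ID that matches the consumer's own cluster UUID.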
TEST_P(TwoDCTest, SetupUniverseReplicationErrorChecking) {
  auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, 1));
  rpc::RpcController rpc;
  auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
      &consumer_client()->proxy_cache(),
      ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());

  {
    rpc.Reset();
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
    master::SetupUniverseReplicationRequestPB setup_universe_req;
    master::SetupUniverseReplicationResponsePB setup_universe_resp;
    ASSERT_OK(master_proxy->SetupUniverseReplication(
      setup_universe_req, &setup_universe_resp, &rpc));
    ASSERT_TRUE(setup_universe_resp.has_error());
    std::string prefix = "Producer universe ID must be provided";
    ASSERT_TRUE(setup_universe_resp.error().status().message().substr(0, prefix.size()) == prefix);
  }

  {
    rpc.Reset();
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
    master::SetupUniverseReplicationRequestPB setup_universe_req;
    setup_universe_req.set_producer_id(kUniverseId);
    master::SetupUniverseReplicationResponsePB setup_universe_resp;
    ASSERT_OK(master_proxy->SetupUniverseReplication(
      setup_universe_req, &setup_universe_resp, &rpc));
    ASSERT_TRUE(setup_universe_resp.has_error());
    std::string prefix = "Producer master address must be provided";
    ASSERT_TRUE(setup_universe_resp.error().status().message().substr(0, prefix.size()) == prefix);
  }

  {
    rpc.Reset();
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
    master::SetupUniverseReplicationRequestPB setup_universe_req;
    setup_universe_req.set_producer_id(kUniverseId);
    string master_addr = producer_cluster()->GetMasterAddresses();
    auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
    HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses());
    setup_universe_req.add_producer_table_ids("a");
    setup_universe_req.add_producer_table_ids("b");
    setup_universe_req.add_producer_bootstrap_ids("c");
    master::SetupUniverseReplicationResponsePB setup_universe_resp;
    ASSERT_OK(master_proxy->SetupUniverseReplication(
      setup_universe_req, &setup_universe_resp, &rpc));
    ASSERT_TRUE(setup_universe_resp.has_error());
    std::string prefix = "Number of bootstrap ids must be equal to number of tables";
    ASSERT_TRUE(setup_universe_resp.error().status().message().substr(0, prefix.size()) == prefix);
  }

  {
    rpc.Reset();
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    master::SetupUniverseReplicationRequestPB setup_universe_req;
    master::SetupUniverseReplicationResponsePB setup_universe_resp;
    setup_universe_req.set_producer_id(kUniverseId);
    string master_addr = consumer_cluster()->GetMasterAddresses();
    auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
    HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses());

    setup_universe_req.add_producer_table_ids("prod_table_id_1");
    setup_universe_req.add_producer_table_ids("prod_table_id_2");
    setup_universe_req.add_producer_bootstrap_ids("prod_bootstrap_id_1");
    setup_universe_req.add_producer_bootstrap_ids("prod_bootstrap_id_2");

    ASSERT_OK(master_proxy->SetupUniverseReplication(
      setup_universe_req, &setup_universe_resp, &rpc));
    ASSERT_TRUE(setup_universe_resp.has_error());
    std::string prefix = "Duplicate between request master addresses";
    ASSERT_TRUE(setup_universe_resp.error().status().message().substr(0, prefix.size()) == prefix);
  }

  {
    rpc.Reset();
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    master::SetupUniverseReplicationRequestPB setup_universe_req;
    master::SetupUniverseReplicationResponsePB setup_universe_resp;
    master::SysClusterConfigEntryPB cluster_info;
    auto& cm = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->catalog_manager();
    CHECK_OK(cm.GetClusterConfig(&cluster_info));
    setup_universe_req.set_producer_id(cluster_info.cluster_uuid());

    string master_addr = producer_cluster()->GetMasterAddresses();
    auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
    HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses());

    setup_universe_req.add_producer_table_ids("prod_table_id_1");
    setup_universe_req.add_producer_table_ids("prod_table_id_2");
    setup_universe_req.add_producer_bootstrap_ids("prod_bootstrap_id_1");
    setup_universe_req.add_producer_bootstrap_ids("prod_bootstrap_id_2");

    ASSERT_OK(master_proxy->SetupUniverseReplication(
      setup_universe_req, &setup_universe_resp, &rpc));
    ASSERT_TRUE(setup_universe_resp.has_error());
    std::string prefix = "The request UUID and cluster UUID are identical.";
    ASSERT_TRUE(setup_universe_resp.error().status().message().substr(0, prefix.size()) == prefix);
  }
}
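
// BootstrapProducer records a checkpoint per producer tablet in the cdc_state table; setting up
// replication with the returned bootstrap IDs starts polling from those checkpoints, so rows
// written before the bootstrap (keys 0..99 here) are expected to be skipped.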
TEST_P(TwoDCTest, SetupUniverseReplicationWithProducerBootstrapId) {
  constexpr int kNTabletsPerTable = 1;
  std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable};
  auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 3));

  std::unique_ptr<client::YBClient> client;
  std::unique_ptr<cdc::CDCServiceProxy> producer_cdc_proxy;
  client = ASSERT_RESULT(consumer_cluster()->CreateClient());
  producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(
      &client->proxy_cache(),
      HostPort::FromBoundEndpoint(producer_cluster()->mini_tablet_server(0)->bound_rpc_addr()));

  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer tables from the list.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
  producer_tables.reserve(tables.size() / 2);
  consumer_tables.reserve(tables.size() / 2);
  for (size_t i = 0; i < tables.size(); i++) {
    if (i % 2 == 0) {
      producer_tables.push_back(tables[i]);
    } else {
      consumer_tables.push_back(tables[i]);
    }
  }

  // 1. Write some data so that we can verify that only new records get replicated.
  for (const auto& producer_table : producer_tables) {
    LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
    WriteWorkload(0, 100, producer_client(), producer_table->name());
  }

  SleepFor(MonoDelta::FromSeconds(10));
  cdc::BootstrapProducerRequestPB req;
  cdc::BootstrapProducerResponsePB resp;

  for (const auto& producer_table : producer_tables) {
    req.add_table_ids(producer_table->id());
  }

  rpc::RpcController rpc;
  ASSERT_OK(producer_cdc_proxy->BootstrapProducer(req, &resp, &rpc));
  ASSERT_FALSE(resp.has_error());

  ASSERT_EQ(resp.cdc_bootstrap_ids().size(), producer_tables.size());

  int table_idx = 0;
  for (const auto& bootstrap_id : resp.cdc_bootstrap_ids()) {
    LOG(INFO) << "Got bootstrap id " << bootstrap_id
              << " for table " << producer_tables[table_idx++]->name().table_name();
  }

  std::unordered_map<std::string, int> tablet_bootstraps;

  // Verify that for each of the table's tablets, a new row in cdc_state table with the returned
  // id was inserted.
  client::TableHandle table;
  client::YBTableName cdc_state_table(
      YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
  ASSERT_OK(table.Open(cdc_state_table, producer_client()));

  // 2 tables with kNTabletsPerTable tablets each.
  ASSERT_EQ(tables_vector.size() * kNTabletsPerTable, boost::size(client::TableRange(table)));
  int nrows = 0;
  for (const auto& row : client::TableRange(table)) {
    nrows++;
    string stream_id = row.column(0).string_value();
    tablet_bootstraps[stream_id]++;

    string checkpoint = row.column(2).string_value();
    auto s = OpId::FromString(checkpoint);
    ASSERT_OK(s);
    OpId op_id = *s;
    ASSERT_GT(op_id.index, 0);

    LOG(INFO) << "Bootstrap id " << stream_id
              << " for tablet " << row.column(1).string_value();
  }

  ASSERT_EQ(tablet_bootstraps.size(), producer_tables.size());
  // Check that each bootstrap id covers kNTabletsPerTable tablets.
  for (const auto& e : tablet_bootstraps) {
    ASSERT_EQ(e.second, kNTabletsPerTable);
  }

  // Map table -> bootstrap_id. We will need it when setting up replication.
  std::unordered_map<TableId, std::string> table_bootstrap_ids;
  for (int i = 0; i < resp.cdc_bootstrap_ids_size(); i++) {
    table_bootstrap_ids[req.table_ids(i)] = resp.cdc_bootstrap_ids(i);
  }

  // 2. Setup replication.
  master::SetupUniverseReplicationRequestPB setup_universe_req;
  master::SetupUniverseReplicationResponsePB setup_universe_resp;
  setup_universe_req.set_producer_id(kUniverseId);
  string master_addr = producer_cluster()->GetMasterAddresses();
  auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
  HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses());

  setup_universe_req.mutable_producer_table_ids()->Reserve(
      narrow_cast<int>(producer_tables.size()));
  for (const auto& producer_table : producer_tables) {
    setup_universe_req.add_producer_table_ids(producer_table->id());
    const auto& iter = table_bootstrap_ids.find(producer_table->id());
    ASSERT_NE(iter, table_bootstrap_ids.end());
    setup_universe_req.add_producer_bootstrap_ids(iter->second);
  }

  auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
      &consumer_client()->proxy_cache(),
      ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());

  rpc.Reset();
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  ASSERT_OK(master_proxy->SetupUniverseReplication(
    setup_universe_req, &setup_universe_resp, &rpc));
  ASSERT_FALSE(setup_universe_resp.has_error());

  // 3. Verify everything is set up correctly.
  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
      &get_universe_replication_resp));
  ASSERT_OK(CorrectlyPollingAllTablets(
      consumer_cluster(), narrow_cast<int32_t>(tables_vector.size() * kNTabletsPerTable)));

  // 4. Write more data.
  for (const auto& producer_table : producer_tables) {
    WriteWorkload(1000, 1005, producer_client(), producer_table->name());
  }

  // 5. Verify that only new writes get replicated to consumer since we bootstrapped the producer
  // after we had already written some data, therefore the old data (whatever was there before we
  // bootstrapped the producer) should not be replicated.
  auto data_replicated_correctly = [&]() {
    for (const auto& consumer_table : consumer_tables) {
      LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
      std::vector<std::string> expected_results;
      for (int key = 1000; key < 1005; key++) {
        expected_results.emplace_back("{ int32:" + std::to_string(key) + " }");
      }
      std::sort(expected_results.begin(), expected_results.end());

      auto consumer_results = ScanToStrings(consumer_table->name(), consumer_client());
      std::sort(consumer_results.begin(), consumer_results.end());

      if (expected_results.size() != consumer_results.size()) {
        return false;
      }

      for (size_t idx = 0; idx < expected_results.size(); idx++) {
        if (expected_results[idx] != consumer_results[idx]) {
          return false;
        }
      }
    }
    return true;
  };
  ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(); },
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));
}

// Test for #2250 to verify that replication for tables with the same prefix gets set up correctly.
TEST_P(TwoDCTest, SetupUniverseReplicationMultipleTables) {
  // Setup the two clusters without any tables.
  auto tables = ASSERT_RESULT(SetUpWithParams({}, {}, 1));

  // Create tables with the same prefix.
  std::string table_names[2] = {"table", "table_index"};

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  for (int i = 0; i < 2; i++) {
    auto t = ASSERT_RESULT(CreateTable(producer_client(), kNamespaceName, table_names[i], 3));
    std::shared_ptr<client::YBTable> producer_table;
    ASSERT_OK(producer_client()->OpenTable(t, &producer_table));
    producer_tables.push_back(producer_table);
  }

  for (int i = 0; i < 2; i++) {
    ASSERT_RESULT(CreateTable(consumer_client(), kNamespaceName, table_names[i], 3));
  }

  // Setup universe replication on both these tables.
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // Verify that the universe was set up on the consumer.
  master::GetUniverseReplicationResponsePB resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));
  ASSERT_EQ(resp.entry().producer_id(), kUniverseId);
  ASSERT_EQ(resp.entry().tables_size(), producer_tables.size());
  for (uint32_t i = 0; i < producer_tables.size(); i++) {
    ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id());
  }

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

TEST_P(TwoDCTest, SetupUniverseReplicationLargeTableCount) {
  if (IsSanitizer()) {
    LOG(INFO) << "Skipping slow test";
    return;
  }

  // Setup the two clusters without any tables.
  auto tables = ASSERT_RESULT(SetUpWithParams({}, {}, 1));
  FLAGS_enable_automatic_tablet_splitting = false;

  // Create a large number of tables to test the performance of setup_replication.
  int table_count = 2;
  int amplification[2] = {1, 5};
  MonoDelta setup_latency[2];
  std::string table_prefix = "stress_table_";
  bool passed_test = false;

  for (int retries = 0; retries < 3 && !passed_test; ++retries) {
    for (int a : {0, 1}) {
      std::vector<std::shared_ptr<client::YBTable>> producer_tables;
      for (int i = 0; i < table_count * amplification[a]; i++) {
        std::string cur_table =
            table_prefix + std::to_string(amplification[a]) + "-" + std::to_string(i);
        ASSERT_RESULT(CreateTable(consumer_client(), kNamespaceName, cur_table, 3));
        auto t = ASSERT_RESULT(CreateTable(producer_client(), kNamespaceName, cur_table, 3));
        std::shared_ptr<client::YBTable> producer_table;
        ASSERT_OK(producer_client()->OpenTable(t, &producer_table));
        producer_tables.push_back(producer_table);
      }

      // Add delays to all rpc calls to simulate live environment and ensure the test is IO bound.
      FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 200;
      FLAGS_rpc_throttle_threshold_bytes = 200;

      auto start_time = CoarseMonoClock::Now();

      // Setup universe replication on all tables.
      ASSERT_OK(SetupUniverseReplication(
          producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

      // Verify that the universe was set up on the consumer.
      master::GetUniverseReplicationResponsePB resp;
      ASSERT_OK(
          VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));
      ASSERT_EQ(resp.entry().producer_id(), kUniverseId);
      ASSERT_EQ(resp.entry().tables_size(), producer_tables.size());
      for (uint32_t i = 0; i < producer_tables.size(); i++) {
        ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id());
      }

      setup_latency[a] = CoarseMonoClock::Now() - start_time;
      LOG(INFO) << "SetupReplication [" << a << "] took: " << setup_latency[a].ToSeconds() << "s";

      // Remove delays for cleanup and next setup.
      FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 0;

      ASSERT_OK(DeleteUniverseReplication(kUniverseId));
    }

    // We increased our table count by 5x, but we shouldn't have a linear latency increase.
    passed_test = (setup_latency[1] < setup_latency[0] * 3);
  }

  ASSERT_TRUE(passed_test);
}
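
// Same scale test as above, but also measures BootstrapProducer latency before timing
// SetupUniverseReplication with the resulting bootstrap IDs.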
TEST_P(TwoDCTest, BootstrapAndSetupLargeTableCount) {
  if (IsSanitizer()) {
    LOG(INFO) << "Skipping slow test";
    return;
  }

  // Setup the two clusters without any tables.
  auto tables = ASSERT_RESULT(SetUpWithParams({}, {}, 1));
  FLAGS_enable_automatic_tablet_splitting = false;

  // Create a medium, then large number of tables to test the performance of our CLI commands.
  int table_count = 2;
  int amplification[2] = {1, 5};
  MonoDelta bootstrap_latency[2];
  MonoDelta setup_latency[2];
  std::string table_prefix = "stress_table_";
  bool passed_test = false;

  for (int retries = 0; retries < 3 && !passed_test; ++retries) {
    for (int a : {0, 1}) {
      std::vector<std::shared_ptr<client::YBTable>> producer_tables;
      for (int i = 0; i < table_count * amplification[a]; i++) {
        std::string cur_table =
            table_prefix + std::to_string(amplification[a]) + "-" + std::to_string(i);
        ASSERT_RESULT(CreateTable(consumer_client(), kNamespaceName, cur_table, 3));
        auto t = ASSERT_RESULT(CreateTable(producer_client(), kNamespaceName, cur_table, 3));
        std::shared_ptr<client::YBTable> producer_table;
        ASSERT_OK(producer_client()->OpenTable(t, &producer_table));
        producer_tables.push_back(producer_table);
      }

      // Add delays to all rpc calls to simulate live environment and ensure the test is IO bound.
      FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 200;
      FLAGS_rpc_throttle_threshold_bytes = 200;

      // Performance test of BootstrapProducer.
      cdc::BootstrapProducerResponsePB boot_resp;
      {
        cdc::BootstrapProducerRequestPB req;

        for (const auto& producer_table : producer_tables) {
          req.add_table_ids(producer_table->id());
        }

        auto start_time = CoarseMonoClock::Now();

        auto producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(
            &producer_client()->proxy_cache(),
            HostPort::FromBoundEndpoint(
                producer_cluster()->mini_tablet_server(0)->bound_rpc_addr()));
        rpc::RpcController rpc;
        ASSERT_OK(producer_cdc_proxy->BootstrapProducer(req, &boot_resp, &rpc));
        ASSERT_FALSE(boot_resp.has_error());
        ASSERT_EQ(boot_resp.cdc_bootstrap_ids().size(), producer_tables.size());

        bootstrap_latency[a] = CoarseMonoClock::Now() - start_time;
        LOG(INFO) << "BootstrapProducer [" << a << "] took: " << bootstrap_latency[a].ToSeconds()
                  << "s";
      }

      // Performance test of SetupReplication, with Bootstrap IDs.
      {
        auto start_time = CoarseMonoClock::Now();

        // Calling the SetupUniverse API directly so we can use producer_bootstrap_ids.
        master::SetupUniverseReplicationRequestPB req;
        master::SetupUniverseReplicationResponsePB resp;
        req.set_producer_id(kUniverseId);
        auto master_addrs = producer_cluster()->GetMasterAddresses();
        auto vec = ASSERT_RESULT(HostPort::ParseStrings(master_addrs, 0));
        HostPortsToPBs(vec, req.mutable_producer_master_addresses());
        for (const auto& table : producer_tables) {
          req.add_producer_table_ids(table->id());
        }
        for (const auto& bootstrap_id : boot_resp.cdc_bootstrap_ids()) {
          req.add_producer_bootstrap_ids(bootstrap_id);
        }

        auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
            &consumer_client()->proxy_cache(),
            ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
        ASSERT_OK(WaitFor(
            [&]() -> Result<bool> {
              rpc::RpcController rpc;
              rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
              if (!master_proxy->SetupUniverseReplication(req, &resp, &rpc).ok()) {
                return false;
              }
              if (resp.has_error()) {
                return false;
              }
              return true;
            },
            MonoDelta::FromSeconds(30), "Setup universe replication"));

        // Verify that the universe was set up on the consumer.
        {
          master::GetUniverseReplicationResponsePB resp;
          ASSERT_OK(
              VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));
          ASSERT_EQ(resp.entry().producer_id(), kUniverseId);
          ASSERT_EQ(resp.entry().tables_size(), producer_tables.size());
          for (uint32_t i = 0; i < producer_tables.size(); i++) {
            ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id());
          }
        }

        setup_latency[a] = CoarseMonoClock::Now() - start_time;
        LOG(INFO) << "SetupReplication [" << a << "] took: " << setup_latency[a].ToSeconds() << "s";
      }

      // Remove delays for cleanup and next setup.
      FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 0;

      ASSERT_OK(DeleteUniverseReplication(kUniverseId));
    }
    // We increased our table count by 5x, but we shouldn't have a linear latency increase.
    // ASSERT_LT(bootstrap_latency[1], bootstrap_latency[0] * 5);
    passed_test = (setup_latency[1] < setup_latency[0] * 3);
  }
  ASSERT_TRUE(passed_test);
}

TEST_P(TwoDCTest, PollWithConsumerRestart) {
  // Avoid long delays with node failures so we can run with more aggressive test timing.
  FLAGS_replication_failure_delay_exponent = 7;  // 2^7 == 128ms

  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({4}, {4}, replication_factor));

  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId,
      {tables[0]} /* all producer tables */));

  // After creating the cluster, make sure all tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4));

  consumer_cluster()->mini_tablet_server(0)->Shutdown();

  // After shutting down a single consumer node, the other consumers should pick up the slack.
  if (replication_factor > 1) {
    ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4));
  }

  ASSERT_OK(consumer_cluster()->mini_tablet_server(0)->Start());

  // After restarting the node.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4));

  ASSERT_OK(consumer_cluster()->RestartSync());

  // After consumer cluster restart.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4));

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}
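
// The next two tests exercise producer-side failures: a master failover plus a tserver outage,
// then a full producer cluster restart. In each case the consumer's pollers are expected to
// recover and keep covering all four tablets.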
TEST_P(TwoDCTest, PollWithProducerNodesRestart) {
  // Avoid long delays with node failures so we can run with more aggressive test timing.
  FLAGS_replication_failure_delay_exponent = 7;  // 2^7 == 128ms

  uint32_t replication_factor = 3, tablet_count = 4, master_count = 3;
  auto tables = ASSERT_RESULT(
      SetUpWithParams({tablet_count}, {tablet_count}, replication_factor, master_count));

  ASSERT_OK(SetupUniverseReplication(
    producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId,
    {tables[0]} /* all producer tables */, false /* leader_only */));

  // After creating the cluster, make sure all tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4));

  // Stop the Master and wait for failover.
  LOG(INFO) << "Failover to new Master";
  MiniMaster* old_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster());
  ASSERT_OK(old_master->WaitUntilCatalogManagerIsLeaderAndReadyForTests());
  ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster())->Shutdown();
  MiniMaster* new_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster());
  ASSERT_NE(nullptr, new_master);
  ASSERT_NE(old_master, new_master);
  ASSERT_OK(producer_cluster()->WaitForAllTabletServers());

  // Stop a TServer on the Producer after failing its master.
  producer_cluster()->mini_tablet_server(0)->Shutdown();
  // This verifies:
  // 1. Consumer successfully transitions over to using the new master for tablet lookup.
  // 2. Consumer cluster has rebalanced all the CDC Pollers.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4));
  WriteWorkload(0, 5, producer_client(), tables[0]->name());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  // Restart the Producer TServer and verify that rebalancing happens.
  ASSERT_OK(old_master->Start());
  ASSERT_OK(producer_cluster()->mini_tablet_server(0)->Start());
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4));
  WriteWorkload(6, 10, producer_client(), tables[0]->name());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  // Cleanup.
  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

TEST_P(TwoDCTest, PollWithProducerClusterRestart) {
  // Avoid long delays with node failures so we can run with more aggressive test timing.
  FLAGS_replication_failure_delay_exponent = 7;  // 2^7 == 128ms

  uint32_t replication_factor = 3, tablet_count = 4;
  auto tables = ASSERT_RESULT(
      SetUpWithParams({tablet_count}, {tablet_count}, replication_factor));

  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId,
      {tables[0]} /* all producer tables */));

  // After creating the cluster, make sure all tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4));

  // Restart the ENTIRE Producer cluster.
  ASSERT_OK(producer_cluster()->RestartSync());

  // After producer cluster restart.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4));
  WriteWorkload(0, 5, producer_client(), tables[0]->name());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  // Cleanup.
  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}
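
// Idle dampening, as exercised below: after several empty GetChanges responses the poller
// appears to back off to FLAGS_async_replication_idle_delay_ms between requests. The test reads
// the producer's rpc_heartbeats_responded metric to confirm polling slows while idle and picks
// back up once new data arrives.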
TEST_P(TwoDCTest, PollAndObserveIdleDampening) {
  uint32_t replication_factor = 3, tablet_count = 1, master_count = 1;
  auto tables = ASSERT_RESULT(
      SetUpWithParams({tablet_count}, {tablet_count}, replication_factor, master_count));

  ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
                                     kUniverseId, {tables[0]}, false /* leader_only */));

  // After creating the cluster, make sure all tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1));

  // Write some data and query GetChanges to set up the CDCTabletMetrics.
  WriteWorkload(0, 5, producer_client(), tables[0]->name());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  /*****************************************************************
   * Find the CDC Tablet Metrics, which we will use for this test. *
   *****************************************************************/
  // Find the stream.
  master::ListCDCStreamsResponsePB stream_resp;
  ASSERT_OK(GetCDCStreamForTable(tables[0]->id(), &stream_resp));
  ASSERT_EQ(stream_resp.streams_size(), 1);
  ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), tables[0]->id());
  auto stream_id = stream_resp.streams(0).stream_id();

  // Find the tablet id for the stream.
  TabletId tablet_id;
  {
    yb::cdc::ListTabletsRequestPB tablets_req;
    yb::cdc::ListTabletsResponsePB tablets_resp;
    rpc::RpcController rpc;
    tablets_req.set_stream_id(stream_id);

    auto producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(
        &producer_client()->proxy_cache(),
        HostPort::FromBoundEndpoint(producer_cluster()->mini_tablet_server(0)->bound_rpc_addr()));
    ASSERT_OK(producer_cdc_proxy->ListTablets(tablets_req, &tablets_resp, &rpc));
    ASSERT_FALSE(tablets_resp.has_error());
    ASSERT_EQ(tablets_resp.tablets_size(), 1);
    tablet_id = tablets_resp.tablets(0).tablet_id();
  }

  // Find the TServer that is hosting this tablet.
  tserver::TabletServer* cdc_ts = nullptr;
  std::string ts_uuid;
  std::mutex data_mutex;
  {
    ASSERT_OK(WaitFor([this, &tablet_id, &table = tables[0], &ts_uuid, &data_mutex] {
        producer_client()->LookupTabletById(
            tablet_id,
            table,
            // TODO(tablet splitting + xCluster): After splitting integration is working (+ metrics
            // support), then set this to kTrue.
            master::IncludeInactive::kFalse,
            CoarseMonoClock::Now() + MonoDelta::FromSeconds(3),
            [&ts_uuid, &data_mutex](const Result<client::internal::RemoteTabletPtr>& result) {
              if (result.ok()) {
                std::lock_guard<std::mutex> l(data_mutex);
                ts_uuid = (*result)->LeaderTServer()->permanent_uuid();
              }
            },
            client::UseCache::kFalse);
        std::lock_guard<std::mutex> l(data_mutex);
        return !ts_uuid.empty();
      }, MonoDelta::FromSeconds(10), "Get TS for Tablet"));

    for (auto ts : producer_cluster()->mini_tablet_servers()) {
      if (ts->server()->permanent_uuid() == ts_uuid) {
        cdc_ts = ts->server();
        break;
      }
    }
  }
  ASSERT_NOTNULL(cdc_ts);

  // Find the CDCTabletMetrics associated with the above pair.
  auto cdc_service = dynamic_cast<cdc::CDCServiceImpl*>(
    cdc_ts->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get());
  std::shared_ptr<cdc::CDCTabletMetrics> metrics =
      cdc_service->GetCDCTabletMetrics({"", stream_id, tablet_id});

  /***********************************
   * Setup Complete.  Starting test. *
   ***********************************/
  // Log the first heartbeat count for a baseline.
  auto first_heartbeat_count = metrics->rpc_heartbeats_responded->value();
  LOG(INFO) << "first_heartbeat_count = " << first_heartbeat_count;

  // Write some data to the producer, which should be consumed quickly by GetChanges.
  WriteWorkload(6, 10, producer_client(), tables[0]->name());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  // Sleep for the idle timeout.
  SleepFor(MonoDelta::FromMilliseconds(FLAGS_async_replication_idle_delay_ms));
  auto active_heartbeat_count = metrics->rpc_heartbeats_responded->value();
  LOG(INFO) << "active_heartbeat_count = " << active_heartbeat_count;
  // The new heartbeat count should be at least 3 (FLAGS_async_replication_max_idle_wait).
  ASSERT_GE(active_heartbeat_count - first_heartbeat_count, FLAGS_async_replication_max_idle_wait);

  // Now, wait past the update request frequency, so we should be using idle timing.
  auto multiplier = 2;
  SleepFor(MonoDelta::FromMilliseconds(FLAGS_async_replication_idle_delay_ms * multiplier));
  auto idle_heartbeat_count = metrics->rpc_heartbeats_responded->value();
  ASSERT_LE(idle_heartbeat_count - active_heartbeat_count, multiplier + 1 /* allow subtle race */);
  LOG(INFO) << "idle_heartbeat_count = " << idle_heartbeat_count;

  // Write some more data to the producer and call GetChanges with some real data.
  WriteWorkload(11, 15, producer_client(), tables[0]->name());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  // Sleep for the idle timeout and verify that the idle behavior ended now that we have new data.
  SleepFor(MonoDelta::FromMilliseconds(FLAGS_async_replication_idle_delay_ms));
  active_heartbeat_count = metrics->rpc_heartbeats_responded->value();
  LOG(INFO) << "active_heartbeat_count = " << active_heartbeat_count;
  // Again, the heartbeat count should be at least 3 (FLAGS_async_replication_max_idle_wait).
  ASSERT_GE(active_heartbeat_count - idle_heartbeat_count, FLAGS_async_replication_max_idle_wait);

  // Cleanup.
  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

TEST_P(TwoDCTest, ApplyOperations) {
  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  // Use just one tablet here to more easily catch lower-level write issues with this test.
  auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1));

  WriteWorkload(0, 5, producer_client(), tables[0]->name());

  // Check that all tablets continue to be polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1));

  // Verify that both clusters have the same records.
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}
1086
0
TEST_P(TwoDCTest, ApplyOperationsWithTransactions) {
1087
0
  uint32_t replication_factor = NonTsanVsTsan(3, 1);
1088
0
  auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor));
1089
1090
0
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
1091
  // tables contains both producer and consumer universe tables (alternately).
1092
  // Pick out just the producer table from the list.
1093
0
  producer_tables.reserve(1);
1094
0
  producer_tables.push_back(tables[0]);
1095
0
  ASSERT_OK(SetupUniverseReplication(
1096
0
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));
1097
1098
  // After creating the cluster, make sure all producer tablets are being polled for.
1099
0
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));
1100
1101
  // Write some transactional rows.
1102
0
  WriteTransactionalWorkload(0, 5, producer_client(), producer_txn_mgr(), tables[0]->name());
1103
1104
  // Write some non-transactional rows.
1105
0
  WriteWorkload(6, 10, producer_client(), tables[0]->name());
1106
1107
  // Check that all tablets continue to be polled for.
1108
0
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));
1109
1110
  // Verify that both clusters have the same records.
1111
0
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));
1112
1113
0
  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
1114
0
}

class TwoDCTestWithEnableIntentsReplication : public TwoDCTest {
};

INSTANTIATE_TEST_CASE_P(TwoDCTestParams, TwoDCTestWithEnableIntentsReplication,
                        ::testing::Values(TwoDCTestParams(0, true), TwoDCTestParams(1, true)));
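
// The tests below rely on provisional records (intents) reaching the consumer before commit, so
// this fixture is instantiated only with enable_replicate_intents = true (and, presumably,
// batch sizes 0 and 1 as the first parameter).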

TEST_P(TwoDCTestWithEnableIntentsReplication, UpdateWithinTransaction) {
  constexpr int kNumTablets = 1;
  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({kNumTablets}, {kNumTablets}, replication_factor));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), kNumTablets));

  auto txn = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr()));
  for (bool del : {false, true}) {
    WriteIntents(1, 5, producer_client(), txn.first, tables[0]->name(), del);
  }
  ASSERT_OK(txn.second->CommitFuture().get());

  txn.first->SetTransaction(nullptr);
  client::TableHandle table_handle;
  ASSERT_OK(table_handle.Open(tables[0]->name(), producer_client()));
  auto op = table_handle.NewInsertOp();
  auto req = op->mutable_request();
  QLAddInt32HashValue(req, 0);
  ASSERT_OK(txn.first->ApplyAndFlush(op));

  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  // Check that all tablets continue to be polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), kNumTablets));

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

TEST_P(TwoDCTestWithEnableIntentsReplication, TransactionsWithRestart) {
  auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, 3));

  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables = { tables[0] };
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));

  auto txn = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr()));
  // Write some transactional rows.
  WriteTransactionalWorkload(
      0, 5, producer_client(), producer_txn_mgr(), tables[0]->name(), /* delete_op */ false);

  WriteWorkload(6, 10, producer_client(), tables[0]->name());

  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));
  std::this_thread::sleep_for(5s);
  ASSERT_OK(consumer_cluster()->FlushTablets(
      tablet::FlushMode::kSync, tablet::FlushFlags::kRegular));
  LOG(INFO) << "Restart";
  ASSERT_OK(consumer_cluster()->RestartSync());
  std::this_thread::sleep_for(5s);
  LOG(INFO) << "Commit";
  ASSERT_OK(txn.second->CommitFuture().get());

  // Verify that both clusters have the same records.
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}
1194
1195
0
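// MultipleTransactions interleaves intents from two uncommitted transactions,
// verifies that the replicated intents are not visible to reads on the
// consumer until each transaction commits, and then waits for the consumer's
// intent records to be cleaned up once both have committed.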
TEST_P(TwoDCTestWithEnableIntentsReplication, MultipleTransactions) {
  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1));

  auto txn_0 = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr()));
  auto txn_1 = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr()));

  ASSERT_NO_FATALS(WriteIntents(0, 5, producer_client(), txn_0.first, tables[0]->name()));
  ASSERT_NO_FATALS(WriteIntents(5, 10, producer_client(), txn_0.first, tables[0]->name()));
  ASSERT_NO_FATALS(WriteIntents(10, 15, producer_client(), txn_1.first, tables[0]->name()));
  ASSERT_NO_FATALS(WriteIntents(10, 20, producer_client(), txn_1.first, tables[0]->name()));

  ASSERT_OK(WaitFor([&]() {
    return CountIntents(consumer_cluster()) > 0;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster replicated intents"));

  // Make sure that none of the intents replicated have been committed.
  auto consumer_results = ScanToStrings(tables[1]->name(), consumer_client());
  ASSERT_EQ(consumer_results.size(), 0);

  ASSERT_OK(txn_0.second->CommitFuture().get());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  ASSERT_OK(txn_1.second->CommitFuture().get());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));
  ASSERT_OK(WaitFor([&]() {
    return CountIntents(consumer_cluster()) == 0;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster cleaned up intents"));
}

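// CleanupAbortedTransactions replicates intents for a transaction that is
// never committed, drops FLAGS_external_intent_cleanup_secs to 0, and then
// verifies that a compaction on the consumer removes the orphaned external
// intents.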
TEST_P(TwoDCTestWithEnableIntentsReplication, CleanupAbortedTransactions) {
  static const int kNumRecordsPerBatch = 5;
  const uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({1 /* num_consumer_tablets */},
                                              {1 /* num_producer_tablets */},
                                              replication_factor));
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));
  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1 /* num_producer_tablets */));
  auto txn_0 = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr()));
  ASSERT_NO_FATALS(WriteIntents(0, kNumRecordsPerBatch, producer_client(), txn_0.first,
                                tables[0]->name()));
  // Wait for records to be replicated.
  ASSERT_OK(WaitFor([&]() {
    return CountIntents(consumer_cluster()) == kNumRecordsPerBatch * replication_factor;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster created intents"));
  ASSERT_OK(consumer_cluster()->FlushTablets());
  // Then, set timeout to 0 and make sure we do cleanup on the next compaction.
  SetAtomicFlag(0, &FLAGS_external_intent_cleanup_secs);
  ASSERT_NO_FATALS(WriteIntents(kNumRecordsPerBatch, kNumRecordsPerBatch * 2, producer_client(),
                                txn_0.first, tables[0]->name()));
  // Wait for records to be replicated.
  ASSERT_OK(WaitFor([&]() {
    return CountIntents(consumer_cluster()) == 2 * kNumRecordsPerBatch * replication_factor;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster created intents"));
  ASSERT_OK(consumer_cluster()->CompactTablets());
  ASSERT_OK(WaitFor([&]() {
    return CountIntents(consumer_cluster()) == 0;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster cleaned up intents"));
  txn_0.second->Abort();
}

// Make sure when we compact a tablet, we retain intents.
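// (The transaction below stays uncommitted across FlushTablets and
// CompactTablets; the consumer must keep its replicated intents through both,
// since it presumably cannot tell whether the producer transaction will still
// commit.)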
TEST_P(TwoDCTestWithEnableIntentsReplication, NoCleanupOfFlushedFiles) {
  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor));
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1));
  auto txn_0 = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr()));
  ASSERT_NO_FATALS(WriteIntents(0, 5, producer_client(), txn_0.first, tables[0]->name()));
  auto consumer_results = ScanToStrings(tables[1]->name(), consumer_client());
  ASSERT_EQ(consumer_results.size(), 0);
  ASSERT_OK(consumer_cluster()->FlushTablets());
  ASSERT_NO_FATALS(WriteIntents(5, 10, producer_client(), txn_0.first, tables[0]->name()));
  ASSERT_OK(consumer_cluster()->FlushTablets());
  ASSERT_OK(consumer_cluster()->CompactTablets());
  // Wait for 5 seconds to make sure the background CleanupIntents thread doesn't clean up
  // intents on the consumer.
  SleepFor(MonoDelta::FromSeconds(5));
  ASSERT_OK(txn_0.second->CommitFuture().get());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));
  ASSERT_OK(WaitFor([&]() {
    return CountIntents(consumer_cluster()) == 0;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster cleaned up intents"));
}


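// ManyToOneTabletMapping replicates 5 producer tablets into a consumer table
// with only 2 tablets, so multiple producer pollers fan in to the same
// consumer tablet.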
TEST_P(TwoDCTestWithEnableIntentsReplication, ManyToOneTabletMapping) {
  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({2}, {5}, replication_factor));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 5));

  WriteTransactionalWorkload(0, 100, producer_client(), producer_txn_mgr(), tables[0]->name());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name(), 60 /* timeout_secs */));
}

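// OneToManyTabletMapping is the inverse case: 2 producer tablets replicated
// into a 5-tablet consumer table, so a single poller's batch may be split
// across several consumer tablets.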
TEST_P(TwoDCTestWithEnableIntentsReplication, OneToManyTabletMapping) {
  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({5}, {2}, replication_factor));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));
  WriteTransactionalWorkload(0, 50, producer_client(), producer_txn_mgr(), tables[0]->name());
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name(), 60 /* timeout_secs */));
}

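// TestExternalWriteHybridTime uses FLAGS_TEST_twodc_write_hybrid_time to
// replicate a delete at a hybrid time below the original insert's. The
// producer applies the delete normally, while on the consumer the back-dated
// delete is shadowed by the newer insert, so the row survives there.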
TEST_P(TwoDCTest, TestExternalWriteHybridTime) {
  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));

  // Write 2 rows.
  WriteWorkload(0, 2, producer_client(), tables[0]->name());

  // Ensure that records can be read.
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  // Delete 1 record.
  DeleteWorkload(0, 1, producer_client(), tables[0]->name());

  // Ensure that record is deleted on both universes.
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  // Delete 2nd record but replicate at a low timestamp (timestamp lower than insertion timestamp).
  FLAGS_TEST_twodc_write_hybrid_time = true;
  DeleteWorkload(1, 2, producer_client(), tables[0]->name());

  // Verify that record exists on consumer universe, but is deleted from producer universe.
  ASSERT_OK(VerifyNumRecords(tables[0]->name(), producer_client(), 0));
  ASSERT_OK(VerifyNumRecords(tables[1]->name(), consumer_client(), 1));

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

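// BiDirectionalWrites sets up replication in both directions (each table is a
// producer for the other cluster), writes disjoint key ranges on each side,
// then writes a conflicting range (insert on one side, delete on the other)
// and checks that the two universes still converge to identical contents.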
TEST_P(TwoDCTestWithEnableIntentsReplication, BiDirectionalWrites) {
  auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, 1));

  // Setup bi-directional replication.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables_reverse;
  producer_tables_reverse.push_back(tables[1]);
  ASSERT_OK(SetupUniverseReplication(
      consumer_cluster(), producer_cluster(), producer_client(), kUniverseId,
      producer_tables_reverse));

  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));
  ASSERT_OK(CorrectlyPollingAllTablets(producer_cluster(), 2));

  // Write non-conflicting rows on both clusters.
  WriteWorkload(0, 5, producer_client(), tables[0]->name());
  WriteWorkload(5, 10, consumer_client(), tables[1]->name());

  // Ensure that records are the same on both clusters.
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));
  // Ensure that both universes have all 10 records.
  ASSERT_OK(VerifyNumRecords(tables[0]->name(), producer_client(), 10));

  // Write conflicting records on both clusters (one cluster adds a key, the other deletes it).
  std::vector<std::thread> threads;
  for (int i = 0; i < 2; ++i) {
    auto client = i == 0 ? producer_client() : consumer_client();
    int index = i;
    bool is_delete = i == 0;
    threads.emplace_back([this, client, index, tables, is_delete] {
      WriteWorkload(10, 20, client, tables[index]->name(), is_delete);
    });
  }

  for (auto& thread : threads) {
    thread.join();
  }

  // Ensure that same records exist on both universes.
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

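// AlterUniverseReplicationMasters starts replication knowing only one producer
// master address, uses AlterUniverseReplication to register all three, then
// kills the original leader master and verifies that a new table can still be
// added to replication via the remaining masters.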
TEST_P(TwoDCTest, AlterUniverseReplicationMasters) {
  // Tablets = Servers + 1 to stay simple but ensure round robin gives a tablet to everyone.
  uint32_t t_count = 2;
  int master_count = 3;
  auto tables = ASSERT_RESULT(SetUpWithParams(
      {t_count, t_count}, {t_count, t_count}, 1, master_count));

  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables{tables[0], tables[2]},
    initial_tables{tables[0]};

  // SetupUniverseReplication only utilizes 1 master.
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, initial_tables));

  master::GetUniverseReplicationResponsePB v_resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &v_resp));
  ASSERT_EQ(v_resp.entry().producer_master_addresses_size(), 1);
  ASSERT_EQ(HostPortFromPB(v_resp.entry().producer_master_addresses(0)),
            ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());

  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), t_count));

  LOG(INFO) << "Alter Replication to include all Masters";
  // Alter Replication to include the other masters.
  {
    master::AlterUniverseReplicationRequestPB alter_req;
    master::AlterUniverseReplicationResponsePB alter_resp;
    alter_req.set_producer_id(kUniverseId);

    // GetMasterAddresses returns 3 masters.
    string master_addr = producer_cluster()->GetMasterAddresses();
    auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
    HostPortsToPBs(hp_vec, alter_req.mutable_producer_master_addresses());

    rpc::RpcController rpc;
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
        &consumer_client()->proxy_cache(),
        ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
    ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
    ASSERT_FALSE(alter_resp.has_error());

    // Verify that the consumer now has all masters.
    ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
      master::GetUniverseReplicationResponsePB tmp_resp;
      return VerifyUniverseReplication(consumer_cluster(), consumer_client(),
          kUniverseId, &tmp_resp).ok() &&
          tmp_resp.entry().producer_master_addresses_size() == master_count;
    }, MonoDelta::FromSeconds(kRpcTimeout), "Verify master count increased."));
    ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), t_count));
  }

  // Stop the old master.
  LOG(INFO) << "Failover to new Master";
  MiniMaster* old_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster());
  ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster())->Shutdown();
  MiniMaster* new_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster());
  ASSERT_NE(nullptr, new_master);
  ASSERT_NE(old_master, new_master);
  ASSERT_OK(producer_cluster()->WaitForAllTabletServers());

  LOG(INFO) << "Add Table after Master Failover";
  // Add a new table to replication and ensure that it can read using the new master config.
  {
    master::AlterUniverseReplicationRequestPB alter_req;
    master::AlterUniverseReplicationResponsePB alter_resp;
    alter_req.set_producer_id(kUniverseId);
    alter_req.add_producer_table_ids_to_add(producer_tables[1]->id());
    rpc::RpcController rpc;
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
        &consumer_client()->proxy_cache(),
        ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
    ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
    ASSERT_FALSE(alter_resp.has_error());

    // Verify that the consumer now has both tables in the universe.
    ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
      master::GetUniverseReplicationResponsePB tmp_resp;
      return VerifyUniverseReplication(consumer_cluster(), consumer_client(),
          kUniverseId, &tmp_resp).ok() &&
          tmp_resp.entry().tables_size() == 2;
    }, MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter."));
    ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), t_count * 2));
  }

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

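// AlterUniverseReplicationTables drives the same AlterUniverseReplication RPC
// through both directions of table management: producer_table_ids_to_add to
// grow the replicated set, then producer_table_ids_to_remove to shrink it,
// verifying that the consumer's poller count (3 tablets per table here)
// tracks each change.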
TEST_P(TwoDCTest, AlterUniverseReplicationTables) {
  // Setup the consumer and producer cluster.
  auto tables = ASSERT_RESULT(SetUpWithParams({3, 3}, {3, 3}, 1));
  std::vector<std::shared_ptr<client::YBTable>> producer_tables{tables[0], tables[2]};
  std::vector<std::shared_ptr<client::YBTable>> consumer_tables{tables[1], tables[3]};

  // Setup universe replication on the first table.
  auto initial_table = { producer_tables[0] };
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, initial_table));

  // Verify that the universe was set up on the consumer.
  master::GetUniverseReplicationResponsePB v_resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &v_resp));
  ASSERT_EQ(v_resp.entry().producer_id(), kUniverseId);
  ASSERT_EQ(v_resp.entry().tables_size(), 1);
  ASSERT_EQ(v_resp.entry().tables(0), producer_tables[0]->id());

  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 3));

  // 'add_table'. Add the next table with the alter command.
  {
    master::AlterUniverseReplicationRequestPB alter_req;
    master::AlterUniverseReplicationResponsePB alter_resp;
    alter_req.set_producer_id(kUniverseId);
    alter_req.add_producer_table_ids_to_add(producer_tables[1]->id());
    rpc::RpcController rpc;
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
        &consumer_client()->proxy_cache(),
        ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
    ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
    ASSERT_FALSE(alter_resp.has_error());

    // Verify that the consumer now has both tables in the universe.
    ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
      master::GetUniverseReplicationResponsePB tmp_resp;
      return VerifyUniverseReplication(consumer_cluster(), consumer_client(),
                                          kUniverseId, &tmp_resp).ok() &&
             tmp_resp.entry().tables_size() == 2;
    }, MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter."));
    ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 6));
  }

  // Write some rows to the new table on the Producer. Ensure that the Consumer gets them.
  WriteWorkload(6, 10, producer_client(), producer_tables[1]->name());
  ASSERT_OK(VerifyWrittenRecords(producer_tables[1]->name(), consumer_tables[1]->name()));

  // 'remove_table'. Remove the original table, leaving only the new one.
  {
    master::AlterUniverseReplicationRequestPB alter_req;
    master::AlterUniverseReplicationResponsePB alter_resp;
    alter_req.set_producer_id(kUniverseId);
    alter_req.add_producer_table_ids_to_remove(producer_tables[0]->id());
    rpc::RpcController rpc;
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
        &consumer_client()->proxy_cache(),
        ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
    ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
    ASSERT_FALSE(alter_resp.has_error());

    // Verify that the consumer now has only the new table created by the previous alter.
    ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
      return VerifyUniverseReplication(consumer_cluster(), consumer_client(),
          kUniverseId, &v_resp).ok() &&
          v_resp.entry().tables_size() == 1;
    }, MonoDelta::FromSeconds(kRpcTimeout), "Verify table removed with alter."));
    ASSERT_EQ(v_resp.entry().tables(0), producer_tables[1]->id());
    ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 3));
  }

  LOG(INFO) << "All alter tests passed.  Tearing down...";

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

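// ToggleReplicationEnabled checks that disabling replication stops all
// consumer pollers (0 tablets polled) and that re-enabling brings them all
// back, without tearing the universe down.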
TEST_P(TwoDCTest, ToggleReplicationEnabled) {
  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // Verify that the universe is now ACTIVE.
  master::GetUniverseReplicationResponsePB resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));

  // After we know the universe is ACTIVE, make sure all tablets are getting polled.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));

  // Disable the replication and ensure no tablets are being polled.
  ASSERT_OK(ToggleUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, false));
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 0));

  // Enable replication and ensure that all the tablets start being polled again.
  ASSERT_OK(ToggleUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, true));
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));
}

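// TestDeleteUniverse replicates two tables (6 tablets each), deletes the
// universe, and confirms both that the replication metadata is gone and that
// no consumer tablets are polled afterwards.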
TEST_P(TwoDCTest, TestDeleteUniverse) {
  uint32_t replication_factor = NonTsanVsTsan(3, 1);

  auto tables = ASSERT_RESULT(SetUpWithParams({8, 4}, {6, 6}, replication_factor));

  ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
      kUniverseId, {tables[0], tables[2]} /* all producer tables */));

  // Verify that the universe was set up on the consumer.
  master::GetUniverseReplicationResponsePB resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));

  // After creating the cluster, make sure all tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 12));

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));

  ASSERT_OK(VerifyUniverseReplicationDeleted(consumer_cluster(), consumer_client(), kUniverseId,
      FLAGS_cdc_read_rpc_timeout_ms * 2));

  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 0));
}

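// TestWalRetentionSet raises cdc_wal_retention_time_secs to 8 hours, verifies
// the retention is applied to the producer's WALs, and then runs an ALTER
// TABLE on a replicated table to confirm schema changes still work (and don't
// crash) while replication is active.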
TEST_P(TwoDCTest, TestWalRetentionSet) {
  FLAGS_cdc_wal_retention_time_secs = 8 * 3600;

  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({8, 4, 4, 12}, {8, 4, 12, 8}, replication_factor));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer tables from the list.
  producer_tables.reserve(tables.size() / 2);
  for (size_t i = 0; i < tables.size(); i += 2) {
    producer_tables.push_back(tables[i]);
  }
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // Verify that the universe was set up on the consumer.
  master::GetUniverseReplicationResponsePB resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));

  // After creating the cluster, make sure all 32 tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 32));

  cdc::VerifyWalRetentionTime(producer_cluster(), "test_table_", FLAGS_cdc_wal_retention_time_secs);

  YBTableName table_name(YQL_DATABASE_CQL, kNamespaceName, "test_table_0");

  // Issue an ALTER TABLE request on the producer to verify that it doesn't crash.
  auto table_alterer = producer_client()->NewTableAlterer(table_name);
  table_alterer->AddColumn("new_col")->Type(INT32);
  ASSERT_OK(table_alterer->timeout(MonoDelta::FromSeconds(kRpcTimeout))->Alter());

  // Verify that the table got altered on the producer.
  YBSchema schema;
  PartitionSchema partition_schema;
  ASSERT_OK(producer_client()->GetTableSchema(table_name, &schema, &partition_schema));

  ASSERT_NE(static_cast<int>(Schema::kColumnNotFound), schema.FindColumn("new_col"));
}

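// TestProducerUniverseExpansion adds a tablet server to the producer after
// replication is set up and verifies that rows written once leadership has
// rebalanced onto the new node are still replicated.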
TEST_P(TwoDCTest, TestProducerUniverseExpansion) {
  // Test that after new node(s) are added to the producer universe, we are able to get
  // replicated data from the new node(s).
  auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, 1));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));

  WriteWorkload(0, 5, producer_client(), tables[0]->name());

  // Add new node and wait for tablets to be rebalanced.
  // After rebalancing, each node will be leader for 1 tablet.
  ASSERT_OK(producer_cluster()->AddTabletServer());
  ASSERT_OK(producer_cluster()->WaitForTabletServerCount(2));
  ASSERT_OK(WaitFor([&] () { return producer_client()->IsLoadBalanced(2); },
                    MonoDelta::FromSeconds(kRpcTimeout), "IsLoadBalanced"));

  // Check that all tablets continue to be polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2));

  // Write some more rows. Note that some of these rows will have the new node as the tablet leader.
  WriteWorkload(6, 10, producer_client(), tables[0]->name());

  // Verify that both clusters have the same records.
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));
}

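// ApplyOperationsRandomFailures injects a 25% write-failure probability on
// both sides while two threads pump 1000 rows into each cluster of a
// bi-directional setup; the apply path must retry through the injected
// failures until both universes hold identical data.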
TEST_P(TwoDCTest, ApplyOperationsRandomFailures) {
  SetAtomicFlag(0.25, &FLAGS_TEST_respond_write_failed_probability);

  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  // Use unequal table count so we have M:N mapping and output to multiple tablets.
  auto tables = ASSERT_RESULT(SetUpWithParams({3}, {5}, replication_factor));

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  // tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer table from the list.
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // Set up bi-directional replication.
  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
  consumer_tables.reserve(1);
  consumer_tables.push_back(tables[1]);
  ASSERT_OK(SetupUniverseReplication(
      consumer_cluster(), producer_cluster(), producer_client(), kUniverseId, consumer_tables));

  // After creating the cluster, make sure all producer tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 5));
  ASSERT_OK(CorrectlyPollingAllTablets(producer_cluster(), 3));

  // Write 1000 entries to each cluster.
  std::thread t1([&]() { WriteWorkload(0, 1000, producer_client(), tables[0]->name()); });
  std::thread t2([&]() { WriteWorkload(1000, 2000, consumer_client(), tables[1]->name()); });

  t1.join();
  t2.join();

  // Verify that both clusters have the same records.
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  // Stop replication on consumer.
  ASSERT_OK(DeleteUniverseReplication(kUniverseId));

  // Stop replication on producer.
  ASSERT_OK(DeleteUniverseReplication(kUniverseId, producer_client(), producer_cluster()));
}

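// TestInsertDeleteWorkloadWithRestart writes the same 100 keys 11 times
// (1 insert pass + 5 delete/insert pairs, i.e. 100 * 11 = 1100 ops) before
// replication starts, so the expected_num_writes computation below reduces to
// 1100 / FLAGS_cdc_max_apply_batch_num_records applied batches (or a single
// batch when the flag is 0, i.e. batching by record count is unlimited).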
TEST_P(TwoDCTest, TestInsertDeleteWorkloadWithRestart) {
  // Good test for batching: make sure we can handle operations on the same key with different
  // hybrid times. Then, do a restart and make sure we can successfully bootstrap the batched data.
  // In addition, make sure we write exactly num_total_ops / batch_size batches to the cluster to
  // ensure batching is actually enabled.
  constexpr uint32_t num_ops_per_workload = 100;
  constexpr uint32_t num_runs = 5;

  uint32_t replication_factor = NonTsanVsTsan(3, 1);
  auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor));

  WriteWorkload(0, num_ops_per_workload, producer_client(), tables[0]->name());
  for (size_t i = 0; i < num_runs; i++) {
    WriteWorkload(0, num_ops_per_workload, producer_client(), tables[0]->name(), true);
    WriteWorkload(0, num_ops_per_workload, producer_client(), tables[0]->name());
  }

  // Count the number of ops in total, expect 1 batch if the batch flag is set to 0.
  uint32_t expected_num_writes = FLAGS_cdc_max_apply_batch_num_records > 0 ?
      (num_ops_per_workload * (num_runs * 2 + 1)) / FLAGS_cdc_max_apply_batch_num_records : 1;

  LOG(INFO) << "expected num writes: " << expected_num_writes;

  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  producer_tables.reserve(1);
  producer_tables.push_back(tables[0]);
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  ASSERT_OK(LoggedWaitFor([&]() {
    return GetSuccessfulWriteOps(consumer_cluster()) == expected_num_writes;
  }, MonoDelta::FromSeconds(60), "Wait for all batches to finish."));

  // Verify that both clusters have the same records.
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

  ASSERT_OK(consumer_cluster()->RestartSync());

  // Verify that both clusters have the same records.
  ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));
  // Stop replication on consumer.
  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

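// TestDeleteCDCStreamWithMissingStreams force-deletes a producer CDC stream
// out from under the universe, confirms that a normal delete of the universe
// then fails with an error naming the missing stream and its table, and that
// setting ignore_errors lets the delete go through anyway.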
TEST_P(TwoDCTest, TestDeleteCDCStreamWithMissingStreams) {
  uint32_t replication_factor = NonTsanVsTsan(3, 1);

  auto tables = ASSERT_RESULT(SetUpWithParams({8, 4}, {6, 6}, replication_factor));

  ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
      kUniverseId, {tables[0], tables[2]} /* all producer tables */));

  // Verify that the universe was set up on the consumer.
  master::GetUniverseReplicationResponsePB resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));

  // After creating the cluster, make sure all tablets are being polled for.
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 12));

  // Delete the CDC stream on the producer for a table.
  master::ListCDCStreamsResponsePB stream_resp;
  ASSERT_OK(GetCDCStreamForTable(tables[0]->id(), &stream_resp));
  ASSERT_EQ(stream_resp.streams_size(), 1);
  ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), tables[0]->id());
  auto stream_id = stream_resp.streams(0).stream_id();

  rpc::RpcController rpc;
  auto producer_proxy = std::make_shared<master::MasterReplicationProxy>(
      &producer_client()->proxy_cache(),
      ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());

  master::DeleteCDCStreamRequestPB delete_cdc_stream_req;
  master::DeleteCDCStreamResponsePB delete_cdc_stream_resp;
  delete_cdc_stream_req.add_stream_id(stream_id);
  delete_cdc_stream_req.set_force_delete(true);

  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  ASSERT_OK(producer_proxy->DeleteCDCStream(
      delete_cdc_stream_req, &delete_cdc_stream_resp, &rpc));

  // Try to delete the universe.
  auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
      &consumer_client()->proxy_cache(),
      ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
  rpc.Reset();
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  master::DeleteUniverseReplicationRequestPB delete_universe_req;
  master::DeleteUniverseReplicationResponsePB delete_universe_resp;
  delete_universe_req.set_producer_id(kUniverseId);
  delete_universe_req.set_ignore_errors(false);
  ASSERT_OK(
      master_proxy->DeleteUniverseReplication(delete_universe_req, &delete_universe_resp, &rpc));
  // Ensure that the error message describes the missing stream and related table.
  ASSERT_TRUE(delete_universe_resp.has_error());
  std::string prefix = "Could not find the following streams:";
  const auto error_str = delete_universe_resp.error().status().message();
  ASSERT_TRUE(error_str.substr(0, prefix.size()) == prefix);
  ASSERT_NE(error_str.find(stream_id), string::npos);
  ASSERT_NE(error_str.find(tables[0]->id()), string::npos);

  // Force the delete.
  rpc.Reset();
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  delete_universe_req.set_ignore_errors(true);
  ASSERT_OK(
      master_proxy->DeleteUniverseReplication(delete_universe_req, &delete_universe_resp, &rpc));

  // Ensure that the delete is now successful.
  ASSERT_OK(VerifyUniverseReplicationDeleted(consumer_cluster(), consumer_client(), kUniverseId,
      FLAGS_cdc_read_rpc_timeout_ms * 2));

  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 0));
}

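// TestAlterWhenProducerIsInaccessible shuts down the producer master and then
// issues an AlterUniverseReplication; the consumer master must fail the
// request with an error rather than crash.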
TEST_P(TwoDCTest, TestAlterWhenProducerIsInaccessible) {
  auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, 1));

  ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
      kUniverseId, {tables[0]} /* all producer tables */));

  // Verify everything is set up correctly.
  master::GetUniverseReplicationResponsePB resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));

  // Stop the producer master.
  producer_cluster()->mini_master(0)->Shutdown();

  // Try to alter replication.
  master::AlterUniverseReplicationRequestPB alter_req;
  master::AlterUniverseReplicationResponsePB alter_resp;
  alter_req.set_producer_id(kUniverseId);
  alter_req.add_producer_table_ids_to_add("123");  // Doesn't matter as we cannot connect.
  rpc::RpcController rpc;
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

  auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
      &consumer_client()->proxy_cache(),
      ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());

  // Ensure that we just return an error and don't hit a fatal.
  ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
  ASSERT_TRUE(alter_resp.has_error());
}

} // namespace enterprise
} // namespace yb