YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/twodc_ysql-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <algorithm>
15
#include <map>
16
#include <string>
17
#include <utility>
18
#include <chrono>
19
#include <boost/assign.hpp>
20
#include <gflags/gflags.h>
21
#include <gtest/gtest.h>
22
23
#include "yb/common/common.pb.h"
24
#include "yb/common/entity_ids.h"
25
#include "yb/common/ql_value.h"
26
#include "yb/common/schema.h"
27
#include "yb/common/wire_protocol.h"
28
29
#include "yb/cdc/cdc_service.h"
30
#include "yb/cdc/cdc_service.pb.h"
31
#include "yb/cdc/cdc_service.proxy.h"
32
#include "yb/client/client.h"
33
#include "yb/client/client-test-util.h"
34
#include "yb/client/meta_cache.h"
35
#include "yb/client/schema.h"
36
#include "yb/client/session.h"
37
#include "yb/client/table.h"
38
#include "yb/client/table_alterer.h"
39
#include "yb/client/table_creator.h"
40
#include "yb/client/table_handle.h"
41
#include "yb/client/transaction.h"
42
#include "yb/client/yb_op.h"
43
44
#include "yb/gutil/stl_util.h"
45
#include "yb/gutil/strings/join.h"
46
#include "yb/gutil/strings/substitute.h"
47
#include "yb/integration-tests/cdc_test_util.h"
48
#include "yb/integration-tests/mini_cluster.h"
49
#include "yb/integration-tests/twodc_test_base.h"
50
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
51
52
#include "yb/master/cdc_consumer_registry_service.h"
53
#include "yb/master/mini_master.h"
54
#include "yb/master/master.h"
55
#include "yb/master/master_cluster.proxy.h"
56
#include "yb/master/master_ddl.proxy.h"
57
#include "yb/master/master_replication.proxy.h"
58
#include "yb/master/master-test-util.h"
59
#include "yb/master/sys_catalog_initialization.h"
60
61
#include "yb/rpc/rpc_controller.h"
62
#include "yb/tablet/tablet.h"
63
#include "yb/tablet/tablet_peer.h"
64
#include "yb/tserver/mini_tablet_server.h"
65
#include "yb/tserver/tablet_server.h"
66
#include "yb/tserver/ts_tablet_manager.h"
67
68
#include "yb/tserver/cdc_consumer.h"
69
#include "yb/util/atomic.h"
70
#include "yb/util/faststring.h"
71
#include "yb/util/format.h"
72
#include "yb/util/monotime.h"
73
#include "yb/util/random.h"
74
#include "yb/util/random_util.h"
75
#include "yb/util/result.h"
76
#include "yb/util/stopwatch.h"
77
#include "yb/util/test_util.h"
78
#include "yb/util/test_macros.h"
79
#include "yb/yql/pgwrapper/libpq_utils.h"
80
#include "yb/yql/pgwrapper/pg_wrapper.h"
81
82
DECLARE_int32(replication_factor);
83
DECLARE_int32(cdc_max_apply_batch_num_records);
84
DECLARE_int32(client_read_write_timeout_ms);
85
DECLARE_int32(pgsql_proxy_webserver_port);
86
DECLARE_bool(enable_ysql);
87
DECLARE_bool(hide_pg_catalog_table_creation_logs);
88
DECLARE_bool(master_auto_run_initdb);
89
DECLARE_int32(pggate_rpc_timeout_secs);
90
DECLARE_bool(enable_delete_truncate_xcluster_replicated_table);
91
92
namespace yb {
93
94
using client::YBClient;
95
using client::YBClientBuilder;
96
using client::YBColumnSchema;
97
using client::YBError;
98
using client::YBSchema;
99
using client::YBSchemaBuilder;
100
using client::YBSession;
101
using client::YBTable;
102
using client::YBTableAlterer;
103
using client::YBTableCreator;
104
using client::YBTableType;
105
using client::YBTableName;
106
using master::GetNamespaceInfoResponsePB;
107
using master::MiniMaster;
108
using tserver::MiniTabletServer;
109
using tserver::enterprise::CDCConsumer;
110
111
using pgwrapper::ToString;
112
using pgwrapper::GetInt32;
113
using pgwrapper::PGConn;
114
using pgwrapper::PGResultPtr;
115
using pgwrapper::PgSupervisor;
116
117
namespace enterprise {
118
119
// Name of the single primary-key column of every table created by these tests.
static constexpr const char* kKeyColumnName = "key";
120
121
class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterface<TwoDCTestParams> {
122
 public:
123
0
  // Starts the producer and the consumer mini-cluster with YSQL enabled, creates a client for
  // each of them and launches one postgres process per cluster.
  //
  // replication_factor is used both as the number of tablet servers per cluster and as the value
  // of FLAGS_replication_factor; num_masters is the number of masters per cluster.
  // Returns the first failure encountered while bringing the clusters up.
  Status Initialize(uint32_t replication_factor, uint32_t num_masters = 1) {
    master::SetDefaultInitialSysCatalogSnapshotFlags();
    TwoDCTestBase::SetUp();

    // Flags required to run YSQL inside a mini-cluster.
    FLAGS_enable_ysql = true;
    FLAGS_master_auto_run_initdb = true;
    FLAGS_hide_pg_catalog_table_creation_logs = true;
    FLAGS_pggate_rpc_timeout_secs = 120;

    // Flags taken from the current test parameter.
    FLAGS_cdc_max_apply_batch_num_records = GetParam().batch_size;
    FLAGS_cdc_enable_replicate_intents = GetParam().enable_replicate_intents;

    MiniClusterOptions opts;
    opts.num_tablet_servers = replication_factor;
    opts.num_masters = num_masters;
    FLAGS_replication_factor = replication_factor;

    // Creates the mini-cluster for `cluster` under the given id, then waits until all tablet
    // servers have registered and initdb has finished.
    auto start_cluster = [&](Cluster* cluster, const char* cluster_id) -> Status {
      opts.cluster_id = cluster_id;
      cluster->mini_cluster_ = std::make_unique<MiniCluster>(opts);
      auto* mini_cluster = cluster->mini_cluster_.get();
      RETURN_NOT_OK(mini_cluster->StartSync());
      RETURN_NOT_OK(mini_cluster->WaitForTabletServerCount(replication_factor));
      return WaitForInitDb(mini_cluster);
    };
    RETURN_NOT_OK(start_cluster(&producer_cluster_, "producer"));
    RETURN_NOT_OK(start_cluster(&consumer_cluster_, "consumer"));

    producer_cluster_.client_ = VERIFY_RESULT(producer_cluster()->CreateClient());
    consumer_cluster_.client_ = VERIFY_RESULT(consumer_cluster()->CreateClient());

    RETURN_NOT_OK(InitPostgres(&producer_cluster_));
    return InitPostgres(&consumer_cluster_);
  }
157
158
  // Initializes both clusters, creates the test database (and the tablegroup, when one is
  // requested) on each side, then creates one table per entry of the tablet-count vectors.
  //
  // num_consumer_tablets / num_producer_tablets: tablet count of the i-th table on each side.
  // Returns the opened tables interleaved as producer_0, consumer_0, producer_1, consumer_1, ...
  // Returns IllegalState when the two vectors have different lengths.
  Result<std::vector<std::shared_ptr<client::YBTable>>>
      SetUpWithParams(std::vector<uint32_t> num_consumer_tablets,
                      std::vector<uint32_t> num_producer_tablets,
                      uint32_t replication_factor,
                      uint32_t num_masters = 1,
                      bool colocated = false,
                      boost::optional<std::string> tablegroup_name = boost::none) {
    RETURN_NOT_OK(Initialize(replication_factor, num_masters));

    const auto num_tables = num_consumer_tablets.size();
    if (num_tables != num_producer_tablets.size()) {
      return STATUS(IllegalState,
                    Format("Num consumer tables: $0 num producer tables: $1 must be equal.",
                           num_tables, num_producer_tablets.size()));
    }

    RETURN_NOT_OK(CreateDatabase(&producer_cluster_, kNamespaceName, colocated));
    RETURN_NOT_OK(CreateDatabase(&consumer_cluster_, kNamespaceName, colocated));

    if (tablegroup_name) {
      RETURN_NOT_OK(CreateTablegroup(&producer_cluster_, kNamespaceName, *tablegroup_name));
      RETURN_NOT_OK(CreateTablegroup(&consumer_cluster_, kNamespaceName, *tablegroup_name));
    }

    // table_names receives one entry per CreateTable call, so the producer table of index idx
    // sits at position 2 * idx and the matching consumer table right after it.
    std::vector<YBTableName> table_names;
    std::vector<std::shared_ptr<client::YBTable>> opened_tables;
    for (uint32_t idx = 0; idx < num_tables; ++idx) {
      const size_t producer_pos = idx * 2;
      const size_t consumer_pos = producer_pos + 1;

      RETURN_NOT_OK(CreateTable(idx, num_producer_tablets[idx], &producer_cluster_,
                                &table_names, tablegroup_name, colocated));
      std::shared_ptr<client::YBTable> producer_table;
      RETURN_NOT_OK(producer_client()->OpenTable(table_names[producer_pos], &producer_table));
      opened_tables.push_back(producer_table);

      RETURN_NOT_OK(CreateTable(idx, num_consumer_tablets[idx], &consumer_cluster_,
                                &table_names, tablegroup_name, colocated));
      std::shared_ptr<client::YBTable> consumer_table;
      RETURN_NOT_OK(consumer_client()->OpenTable(table_names[consumer_pos], &consumer_table));
      opened_tables.push_back(consumer_table);
    }

    return opened_tables;
  }
199
200
0
  // Starts a postgres process attached to a randomly chosen tablet server of `cluster` and
  // records the address it listens on in cluster->pg_host_port_.
  // Also overwrites FLAGS_pgsql_proxy_webserver_port with a freshly allocated port.
  Status InitPostgres(Cluster* cluster) {
    auto* mini_cluster = cluster->mini_cluster_.get();
    auto pg_ts = RandomElement(mini_cluster->mini_tablet_servers());
    auto pg_port = mini_cluster->AllocateFreePort();

    // Runs initdb (if needed) and builds the configuration for the postgres process.
    yb::pgwrapper::PgProcessConf pg_process_conf =
        VERIFY_RESULT(yb::pgwrapper::PgProcessConf::CreateValidateAndRunInitDb(
            yb::ToString(Endpoint(pg_ts->bound_rpc_addr().address(), pg_port)),
            pg_ts->options()->fs_opts.data_paths.front() + "/pg_data",
            pg_ts->server()->GetSharedMemoryFd()));
    pg_process_conf.master_addresses = pg_ts->options()->master_addresses_flag;
    pg_process_conf.force_disable_log_file = true;

    FLAGS_pgsql_proxy_webserver_port = mini_cluster->AllocateFreePort();

    LOG(INFO) << "Starting PostgreSQL server listening on "
              << pg_process_conf.listen_addresses << ":" << pg_process_conf.pg_port
              << ", data: " << pg_process_conf.data_dir
              << ", pgsql webserver port: " << FLAGS_pgsql_proxy_webserver_port;

    cluster->pg_supervisor_ = std::make_unique<yb::pgwrapper::PgSupervisor>(pg_process_conf);
    RETURN_NOT_OK(cluster->pg_supervisor_->Start());

    cluster->pg_host_port_ = HostPort(pg_process_conf.listen_addresses, pg_process_conf.pg_port);
    return Status::OK();
  }
222
223
  // Creates the YSQL database `namespace_name` on `cluster`, optionally as a colocated database.
  //
  // Connection and statement failures are returned to the caller instead of only being recorded
  // as non-fatal gtest expectations, so that RETURN_NOT_OK / ASSERT_OK at the call site stops the
  // test at the first real error.
  Status CreateDatabase(Cluster* cluster,
                        const std::string& namespace_name = kNamespaceName,
                        bool colocated = false) {
    auto conn = VERIFY_RESULT(cluster->Connect());
    return conn.ExecuteFormat("CREATE DATABASE $0$1",
                              namespace_name, colocated ? " colocated = true" : "");
  }
231
232
0
  // Asks the leader master of `cluster` for its cluster config and returns the cluster UUID
  // stored in it. Returns IllegalState when the response carries an error.
  Result<string> GetUniverseId(Cluster* cluster) {
    master::MasterClusterProxy cluster_proxy(
        &cluster->client_->proxy_cache(),
        VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMasterBoundRpcAddr()));

    rpc::RpcController controller;
    controller.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    master::GetMasterClusterConfigRequestPB request;
    master::GetMasterClusterConfigResponsePB response;
    RETURN_NOT_OK(cluster_proxy.GetMasterClusterConfig(request, &response, &controller));
    if (response.has_error()) {
      return STATUS(IllegalState, "Error getting cluster config");
    }

    return response.cluster_config().cluster_uuid();
  }
248
249
  // Creates table `table_name` with a single int primary-key column in database
  // `namespace_name` and returns its YBTableName as looked up on the master.
  //
  // tablegroup_name: when set, the table is placed in that tablegroup; num_tablets and colocated
  //                  are then not used, since a tablegroup cannot be combined with SPLIT INTO.
  // num_tablets:     number of tablets to split into (non-tablegroup case only).
  // colocated:       value of the "colocated" storage option (non-tablegroup case only).
  // table_oid:       when positive, the table is created with this explicit table_oid.
  //
  // Connection and statement failures are propagated to the caller rather than being recorded
  // as non-fatal gtest expectations, so a failed CREATE TABLE is reported directly instead of
  // surfacing later as a table-lookup error.
  Result<YBTableName> CreateTable(Cluster* cluster,
                                  const std::string& namespace_name,
                                  const std::string& table_name,
                                  const boost::optional<std::string>& tablegroup_name,
                                  uint32_t num_tablets,
                                  bool colocated = false,
                                  const int table_oid = 0) {
    auto conn = VERIFY_RESULT(cluster->ConnectToDB(namespace_name));
    std::string table_oid_string;
    if (table_oid > 0) {
      // Need to turn on session flag to allow for CREATE WITH table_oid.
      RETURN_NOT_OK(conn.Execute("set yb_enable_create_with_table_oid=true"));
      table_oid_string = Format("table_oid = $0", table_oid);
    }
    std::string query = Format("CREATE TABLE $0($1 int PRIMARY KEY) ", table_name, kKeyColumnName);
    // One cannot use tablegroup together with split into tablets.
    if (tablegroup_name.has_value()) {
      std::string with_clause =
          table_oid_string.empty() ? "" : Format("WITH ($0) ", table_oid_string);
      std::string tablegroup_clause = Format("TABLEGROUP $0", tablegroup_name.value());
      query += Format("$0$1", with_clause, tablegroup_clause);
    } else {
      std::string colocated_clause = Format("colocated = $0", colocated);
      std::string with_clause =
          table_oid_string.empty() ? colocated_clause
                                   : Format("$0, $1", table_oid_string, colocated_clause);
      query += Format("WITH ($0) SPLIT INTO $1 TABLETS", with_clause, num_tablets);
    }
    RETURN_NOT_OK(conn.Execute(query));
    return GetTable(cluster, namespace_name, table_name);
  }
280
281
  // Creates the table "test_table_<idx>" in kNamespaceName on `cluster` and appends its name to
  // `tables`.
  //
  // For tablegroup or colocated tables the table_oid is derived from idx, so that the producer
  // and the consumer side end up with the same table_oid for the same index.
  Status CreateTable(uint32_t idx, uint32_t num_tablets, Cluster* cluster,
                     std::vector<YBTableName>* tables,
                     const boost::optional<std::string>& tablegroup_name,
                     bool colocated = false) {
    const bool needs_fixed_oid = tablegroup_name.has_value() || colocated;
    const int table_oid = needs_fixed_oid ? (idx + 1) * 111111 : 0;
    const auto table_name = Format("test_table_$0", idx);

    auto created_table = VERIFY_RESULT(CreateTable(
        cluster, kNamespaceName, table_name, tablegroup_name, num_tablets, colocated, table_oid));
    tables->push_back(created_table);
    return Status::OK();
  }
295
296
  // Looks up a YSQL table through the leader master's ListTables RPC and returns a YBTableName
  // carrying its table id, namespace id and pg schema name.
  //
  // verify_table_name: when true, only a table whose name and namespace name match exactly is
  //                    returned; when false, the first table in the response is returned.
  // Returns IllegalState when the RPC response has an error or no table qualifies.
  //
  // NOTE(review): the system-table filter below is applied when exclude_system_tables is FALSE,
  // which looks inverted relative to the parameter name. It is kept as-is to preserve existing
  // behavior; confirm the intent against the callers before changing it.
  Result<YBTableName> GetTable(Cluster* cluster,
                               const std::string& namespace_name,
                               const std::string& table_name,
                               bool verify_table_name = true,
                               bool exclude_system_tables = true) {
    master::ListTablesRequestPB list_req;
    list_req.set_name_filter(table_name);
    auto* namespace_filter = list_req.mutable_namespace_();
    namespace_filter->set_name(namespace_name);
    namespace_filter->set_database_type(YQL_DATABASE_PGSQL);
    if (!exclude_system_tables) {
      list_req.set_exclude_system_tables(true);
      list_req.add_relation_type_filter(master::USER_TABLE_RELATION);
    }

    master::MasterDdlProxy ddl_proxy(
        &cluster->client_->proxy_cache(),
        VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr());

    rpc::RpcController controller;
    controller.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    master::ListTablesResponsePB list_resp;
    RETURN_NOT_OK(ddl_proxy.ListTables(list_req, &list_resp, &controller));
    if (list_resp.has_error()) {
      return STATUS(IllegalState, "Failed listing tables");
    }

    // Pick the first acceptable entry of the response.
    for (const auto& candidate : list_resp.tables()) {
      if (verify_table_name &&
          (candidate.name() != table_name || candidate.namespace_().name() != namespace_name)) {
        continue;
      }
      YBTableName found;
      found.set_table_id(candidate.id());
      found.set_namespace_id(candidate.namespace_().id());
      found.set_pgschema_name(candidate.pgschema_name());
      return found;
    }

    return STATUS(IllegalState,
                  strings::Substitute("Unable to find table $0 in namespace $1",
                                      table_name, namespace_name));
  }
339
340
  /*
341
   * TODO (#11597): Given one is not able to get tablegroup ID by name, currently this works by
342
   * getting the first available tablegroup appearing in the namespace.
343
   */
344
0
  Result<TablegroupId> GetTablegroup(Cluster* cluster, const std::string& namespace_name) {
345
    // Lookup the namespace id from the namespace name.
346
0
    std::string namespace_id;
347
0
    {
348
0
      master::ListNamespacesRequestPB req;
349
0
      master::ListNamespacesResponsePB resp;
350
0
      master::MasterDdlProxy master_proxy(
351
0
          &cluster->client_->proxy_cache(),
352
0
          VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr());
353
354
0
      rpc::RpcController rpc;
355
0
      rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
356
357
0
      RETURN_NOT_OK(master_proxy.ListNamespaces(req, &resp, &rpc));
358
0
      if (resp.has_error()) {
359
0
        return STATUS(IllegalState, "Failed to get namespace info");
360
0
      }
361
362
      // Find and return the namespace id.
363
0
      bool namespaceFound = false;
364
0
      for (const auto& entry : resp.namespaces()) {
365
0
        if (entry.name() == namespace_name) {
366
0
          namespaceFound = true;
367
0
          namespace_id = entry.id();
368
0
          break;
369
0
        }
370
0
      }
371
372
0
      if (!namespaceFound) {
373
0
        return STATUS(IllegalState, "Failed to find namespace");
374
0
      }
375
0
    }
376
377
0
    master::ListTablegroupsRequestPB req;
378
0
    master::ListTablegroupsResponsePB resp;
379
0
    master::MasterDdlProxy master_proxy(
380
0
        &cluster->client_->proxy_cache(),
381
0
        VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr());
382
383
0
    req.set_namespace_id(namespace_id);
384
385
0
    rpc::RpcController rpc;
386
0
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
387
388
0
    RETURN_NOT_OK(master_proxy.ListTablegroups(req, &resp, &rpc));
389
0
    if (resp.has_error()) {
390
0
      return STATUS(IllegalState, "Failed listing tablegroups");
391
0
    }
392
393
    // Find and return the tablegroup.
394
0
    if (resp.tablegroups().empty()) {
395
0
      return STATUS(IllegalState,
396
0
                    Format("Unable to find tablegroup in namespace $0", namespace_name));
397
0
    }
398
399
0
    return resp.tablegroups()[0].id() + master::kTablegroupParentTableIdSuffix;
400
0
  }
401
402
  // Creates tablegroup `tablegroup_name` in database `namespace_name` on `cluster`.
  //
  // Connection and statement failures are returned to the caller instead of only being recorded
  // as non-fatal gtest expectations, so that RETURN_NOT_OK / ASSERT_OK at the call site stops the
  // test at the first real error.
  Status CreateTablegroup(Cluster* cluster,
                          const std::string& namespace_name,
                          const std::string& tablegroup_name) {
    auto conn = VERIFY_RESULT(cluster->ConnectToDB(namespace_name));
    return conn.ExecuteFormat("CREATE TABLEGROUP $0", tablegroup_name);
  }
409
410
  void WriteWorkload(uint32_t start, uint32_t end, Cluster* cluster, const YBTableName& table,
411
0
                     bool delete_op = false) {
412
0
    auto conn = EXPECT_RESULT(cluster->ConnectToDB(table.namespace_name()));
413
414
0
    LOG(INFO) << "Writing " << end-start << (delete_op ? " deletes" : " inserts");
415
0
    for (uint32_t i = start; i < end; i++) {
416
0
      if (delete_op) {
417
0
        EXPECT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE $1 = $2",
418
0
                                     table.table_name(), kKeyColumnName, i));
419
0
      } else {
420
        // TODO(#6582) transactions currently don't work, so don't use ON CONFLICT DO NOTHING now.
421
0
        EXPECT_OK(conn.ExecuteFormat("INSERT INTO $0($1) VALUES ($2)", // ON CONFLICT DO NOTHING",
422
0
                                     table.table_name(), kKeyColumnName, i));
423
0
      }
424
0
    }
425
0
  }
426
427
  void WriteTransactionalWorkload(uint32_t start, uint32_t end, Cluster* cluster,
428
0
                                  const YBTableName& table) {
429
0
    auto conn = EXPECT_RESULT(cluster->ConnectToDB(table.namespace_name()));
430
0
    EXPECT_OK(conn.Execute("BEGIN"));
431
0
    for (uint32_t i = start; i < end; i++) {
432
0
      EXPECT_OK(conn.ExecuteFormat("INSERT INTO $0($1) VALUES ($2) ON CONFLICT DO NOTHING",
433
0
                                   table.table_name(), kKeyColumnName, i));
434
0
    }
435
0
    EXPECT_OK(conn.Execute("COMMIT"));
436
0
  }
437
438
0
  void DeleteWorkload(uint32_t start, uint32_t end, Cluster* cluster, const YBTableName& table) {
439
0
    WriteWorkload(start, end, cluster, table, true /* delete_op */);
440
0
  }
441
442
0
  // Runs "SELECT * ... ORDER BY key" on the given table of `cluster` and returns the raw libpq
  // result. Connection and query failures are reported through gtest expectations.
  PGResultPtr ScanToStrings(const YBTableName& table_name, Cluster* cluster) {
    const auto& database = table_name.namespace_name();
    auto conn = EXPECT_RESULT(cluster->ConnectToDB(database));

    PGResultPtr rows = EXPECT_RESULT(
        conn.FetchFormat("SELECT * FROM $0 ORDER BY key", table_name.table_name()));
    return rows;
  }
448
449
  // Waits (up to kRpcTimeout seconds) until the producer table and the consumer table hold the
  // same number of rows and the first column of every row, in key order, compares equal.
  Status VerifyWrittenRecords(const YBTableName& producer_table,
                              const YBTableName& consumer_table) {
    return LoggedWaitFor([=]() -> Result<bool> {
      auto producer_rows = ScanToStrings(producer_table, &producer_cluster_);
      auto consumer_rows = ScanToStrings(consumer_table, &consumer_cluster_);

      const int num_producer_rows = PQntuples(producer_rows.get());
      if (num_producer_rows != PQntuples(consumer_rows.get())) {
        return false;
      }

      for (int row = 0; row < num_producer_rows; ++row) {
        auto producer_value = EXPECT_RESULT(ToString(producer_rows.get(), row, 0));
        auto consumer_value = EXPECT_RESULT(ToString(consumer_rows.get(), row, 0));
        if (!(producer_value == consumer_value)) {
          return false;
        }
      }
      return true;
    }, MonoDelta::FromSeconds(kRpcTimeout), "Verify written records");
  }
467
468
0
  // Waits (up to kRpcTimeout seconds) until a full scan of `table` on `cluster` returns exactly
  // expected_size rows.
  Status VerifyNumRecords(const YBTableName& table, Cluster* cluster, int expected_size) {
    auto row_count_matches = [=]() -> Result<bool> {
      auto rows = ScanToStrings(table, cluster);
      const int num_rows = PQntuples(rows.get());
      return num_rows == expected_size;
    };
    return LoggedWaitFor(
        row_count_matches, MonoDelta::FromSeconds(kRpcTimeout), "Verify number of records");
  }
474
475
  // Deletes the table identified by *table_id through the client of `cluster`.
  //
  // table_id must not be null: the signature comment advertises a nullptr default, so a null
  // pointer is rejected with InvalidArgument instead of being dereferenced.
  // Otherwise returns the status of the client's DeleteTable call.
  Status DeleteTable(Cluster* cluster,
                     TableId* table_id /* = nullptr */) {
    if (table_id == nullptr) {
      return STATUS(InvalidArgument, "DeleteTable requires a non-null table id");
    }
    return cluster->client_->DeleteTable(*table_id);
  }
481
482
  // Truncates all tables listed in `table_ids` through the client of `cluster` and returns the
  // status of that call.
  Status TruncateTable(Cluster* cluster,
                       std::vector<string> table_ids) {
    return cluster->client_->TruncateTables(table_ids);
  }
487
};
488
489
// Runs every TwoDCYsqlTest case once per parameter combination below.
// NOTE(review): the arguments presumably map to (batch_size, enable_replicate_intents), the two
// fields read in Initialize() - confirm against the TwoDCTestParams constructor.
INSTANTIATE_TEST_CASE_P(
    TwoDCTestParams, TwoDCYsqlTest,
    ::testing::Values(
        TwoDCTestParams(1, true),
        TwoDCTestParams(1, false),
        TwoDCTestParams(0, true),
        TwoDCTestParams(0, false)));
492
493
494
0
// Sets up replication for two tables, checks that the consumer registered the universe with
// exactly the producer tables, checks that each producer table got one CDC stream, and finally
// deletes the replication again.
TEST_P(TwoDCYsqlTest, SetupUniverseReplication) {
  YB_SKIP_TEST_IN_TSAN();
  auto tables = ASSERT_RESULT(SetUpWithParams({8, 4}, {6, 6}, 3, 1, false /* colocated */));
  const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_));

  // tables alternates producer and consumer entries; the producer tables are the ones at even
  // positions.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  producer_tables.reserve(tables.size() / 2);
  for (size_t pos = 0; pos < tables.size(); pos += 2) {
    producer_tables.push_back(tables[pos]);
  }

  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // The consumer must know the universe and list the producer tables in the same order.
  master::GetUniverseReplicationResponsePB resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp));
  ASSERT_EQ(resp.entry().producer_id(), kUniverseId);
  ASSERT_EQ(resp.entry().tables_size(), producer_tables.size());
  for (size_t idx = 0; idx < producer_tables.size(); ++idx) {
    const auto& producer_table_id = producer_tables[idx]->id();
    ASSERT_EQ(resp.entry().tables(narrow_cast<int>(idx)), producer_table_id);
  }

  // Every producer table must have exactly one CDC stream, created for that table.
  for (const auto& producer_table : producer_tables) {
    const auto& producer_table_id = producer_table->id();
    master::ListCDCStreamsResponsePB stream_resp;
    ASSERT_OK(GetCDCStreamForTable(producer_table_id, &stream_resp));
    ASSERT_EQ(stream_resp.streams_size(), 1);
    ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), producer_table_id);
  }

  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}
528
529
0
// Writes rows on the producer before replication is set up, sets up replication, and checks
// that both the pre-existing rows and rows written afterwards show up on the consumer.
TEST_P(TwoDCYsqlTest, SimpleReplication) {
  YB_SKIP_TEST_IN_TSAN();
  constexpr int kNTabletsPerTable = 1;
  std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable};
  auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1));
  const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_));

  // tables alternates producer (even positions) and consumer (odd positions) entries; split
  // them into two lists.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
  producer_tables.reserve(tables.size() / 2);
  consumer_tables.reserve(tables.size() / 2);
  for (size_t pos = 0; pos < tables.size(); ++pos) {
    auto& destination = (pos % 2 == 0) ? producer_tables : consumer_tables;
    destination.push_back(tables[pos]);
  }

  // 1. Write some data.
  for (const auto& producer_table : producer_tables) {
    LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
    WriteWorkload(0, 100, &producer_cluster_, producer_table->name());
  }

  // Verify data is written on the producer: keys 0..99, in order.
  for (const auto& producer_table : producer_tables) {
    auto producer_rows = ScanToStrings(producer_table->name(), &producer_cluster_);
    ASSERT_EQ(100, PQntuples(producer_rows.get()));
    for (int row = 0; row < 100; ++row) {
      const int key = ASSERT_RESULT(GetInt32(producer_rows.get(), row, 0));
      ASSERT_EQ(row, key);
    }
  }

  // 2. Setup replication.
  ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
                                     kUniverseId, producer_tables));

  // 3. Verify everything is setup correctly.
  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
      &get_universe_replication_resp));
  ASSERT_OK(CorrectlyPollingAllTablets(
      consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable)));

  // True once every consumer table holds exactly the keys 0..num_results-1, in order.
  auto data_replicated_correctly = [&](int num_results) -> Result<bool> {
    for (const auto& consumer_table : consumer_tables) {
      LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
      auto consumer_rows = ScanToStrings(consumer_table->name(), &consumer_cluster_);

      if (PQntuples(consumer_rows.get()) != num_results) {
        return false;
      }
      for (int row = 0; row < num_results; ++row) {
        const int key = VERIFY_RESULT(GetInt32(consumer_rows.get(), row, 0));
        if (key != row) {
          return false;
        }
      }
    }
    return true;
  };
  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); },
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));

  // 4. Write more data.
  for (const auto& producer_table : producer_tables) {
    WriteWorkload(100, 105, &producer_cluster_, producer_table->name());
  }

  // 5. Make sure this data is also replicated now.
  ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(105); },
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));
}
608
609
0
TEST_P(TwoDCYsqlTest, SetupUniverseReplicationWithProducerBootstrapId) {
610
0
  YB_SKIP_TEST_IN_TSAN();
611
0
  constexpr int kNTabletsPerTable = 1;
612
0
  std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable};
613
0
  auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 3));
614
0
  const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_));
615
0
  auto* producer_leader_mini_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster());
616
0
  auto producer_master_proxy = std::make_shared<master::MasterReplicationProxy>(
617
0
      &producer_client()->proxy_cache(),
618
0
      producer_leader_mini_master->bound_rpc_addr());
619
620
0
  std::unique_ptr<client::YBClient> client;
621
0
  std::unique_ptr<cdc::CDCServiceProxy> producer_cdc_proxy;
622
0
  client = ASSERT_RESULT(consumer_cluster()->CreateClient());
623
0
  producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(
624
0
      &client->proxy_cache(),
625
0
      HostPort::FromBoundEndpoint(producer_cluster()->mini_tablet_server(0)->bound_rpc_addr()));
626
627
  // tables contains both producer and consumer universe tables (alternately).
628
  // Pick out just the producer tables from the list.
629
0
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
630
0
  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
631
0
  producer_tables.reserve(tables.size() / 2);
632
0
  consumer_tables.reserve(tables.size() / 2);
633
0
  for (size_t i = 0; i < tables.size(); ++i) {
634
0
    if (i % 2 == 0) {
635
0
      producer_tables.push_back(tables[i]);
636
0
    } else {
637
0
      consumer_tables.push_back(tables[i]);
638
0
    }
639
0
  }
640
641
  // 1. Write some data so that we can verify that only new records get replicated.
642
0
  for (const auto& producer_table : producer_tables) {
643
0
    LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
644
0
    WriteWorkload(0, 100, &producer_cluster_, producer_table->name());
645
0
  }
646
647
0
  SleepFor(MonoDelta::FromSeconds(10));
648
0
  cdc::BootstrapProducerRequestPB req;
649
0
  cdc::BootstrapProducerResponsePB resp;
650
651
0
  for (const auto& producer_table : producer_tables) {
652
0
    req.add_table_ids(producer_table->id());
653
0
  }
654
655
0
  rpc::RpcController rpc;
656
0
  ASSERT_OK(producer_cdc_proxy->BootstrapProducer(req, &resp, &rpc));
657
0
  ASSERT_FALSE(resp.has_error());
658
659
0
  ASSERT_EQ(resp.cdc_bootstrap_ids().size(), producer_tables.size());
660
661
0
  int table_idx = 0;
662
0
  for (const auto& bootstrap_id : resp.cdc_bootstrap_ids()) {
663
0
    LOG(INFO) << "Got bootstrap id " << bootstrap_id
664
0
              << " for table " << producer_tables[table_idx++]->name().table_name();
665
0
  }
666
667
0
  std::unordered_map<std::string, int> tablet_bootstraps;
668
669
  // Verify that for each of the table's tablets, a new row in cdc_state table with the returned
670
  // id was inserted.
671
0
  client::TableHandle table;
672
0
  client::YBTableName cdc_state_table(
673
0
      YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
674
0
  ASSERT_OK(table.Open(cdc_state_table, producer_client()));
675
676
  // 2 tables with 8 tablets each.
677
0
  ASSERT_EQ(tables_vector.size() * kNTabletsPerTable, boost::size(client::TableRange(table)));
678
0
  int nrows = 0;
679
0
  for (const auto& row : client::TableRange(table)) {
680
0
    nrows++;
681
0
    string stream_id = row.column(0).string_value();
682
0
    tablet_bootstraps[stream_id]++;
683
684
0
    string checkpoint = row.column(2).string_value();
685
0
    auto s = OpId::FromString(checkpoint);
686
0
    ASSERT_OK(s);
687
0
    OpId op_id = *s;
688
0
    ASSERT_GT(op_id.index, 0);
689
690
0
    LOG(INFO) << "Bootstrap id " << stream_id
691
0
              << " for tablet " << row.column(1).string_value();
692
0
  }
693
694
0
  ASSERT_EQ(tablet_bootstraps.size(), producer_tables.size());
695
  // Check that each bootstrap id has 8 tablets.
696
0
  for (const auto& e : tablet_bootstraps) {
697
0
    ASSERT_EQ(e.second, kNTabletsPerTable);
698
0
  }
699
700
  // Map table -> bootstrap_id. We will need when setting up replication.
701
0
  std::unordered_map<TableId, std::string> table_bootstrap_ids;
702
0
  for (int i = 0; i < resp.cdc_bootstrap_ids_size(); i++) {
703
0
    table_bootstrap_ids[req.table_ids(i)] = resp.cdc_bootstrap_ids(i);
704
0
  }
705
706
  // 2. Setup replication.
707
0
  master::SetupUniverseReplicationRequestPB setup_universe_req;
708
0
  master::SetupUniverseReplicationResponsePB setup_universe_resp;
709
0
  setup_universe_req.set_producer_id(kUniverseId);
710
0
  string master_addr = producer_cluster()->GetMasterAddresses();
711
0
  auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
712
0
  HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses());
713
714
0
  setup_universe_req.mutable_producer_table_ids()->Reserve(
715
0
      narrow_cast<int>(producer_tables.size()));
716
0
  for (const auto& producer_table : producer_tables) {
717
0
    setup_universe_req.add_producer_table_ids(producer_table->id());
718
0
    const auto& iter = table_bootstrap_ids.find(producer_table->id());
719
0
    ASSERT_NE(iter, table_bootstrap_ids.end());
720
0
    setup_universe_req.add_producer_bootstrap_ids(iter->second);
721
0
  }
722
723
0
  auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster());
724
0
  master::MasterReplicationProxy master_proxy(
725
0
      &consumer_client()->proxy_cache(),
726
0
      consumer_leader_mini_master->bound_rpc_addr());
727
728
0
  rpc.Reset();
729
0
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
730
0
  ASSERT_OK(master_proxy.SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc));
731
0
  ASSERT_FALSE(setup_universe_resp.has_error());
732
733
  // 3. Verify everything is setup correctly.
734
0
  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
735
0
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
736
0
      &get_universe_replication_resp));
737
0
  ASSERT_OK(CorrectlyPollingAllTablets(
738
0
      consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable)));
739
740
  // 4. Write more data.
741
0
  for (const auto& producer_table : producer_tables) {
742
0
    WriteWorkload(1000, 1005, &producer_cluster_, producer_table->name());
743
0
  }
744
745
  // 5. Verify that only new writes get replicated to consumer since we bootstrapped the producer
746
  // after we had already written some data, therefore the old data (whatever was there before we
747
  // bootstrapped the producer) should not be replicated.
748
0
  auto data_replicated_correctly = [&]() -> Result<bool> {
749
0
    for (const auto& consumer_table : consumer_tables) {
750
0
      LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
751
0
      auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_);
752
753
0
      if (5 != PQntuples(consumer_results.get())) {
754
0
        return false;
755
0
      }
756
0
      int result;
757
0
      for (int i = 0; i < 5; ++i) {
758
0
        result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0));
759
0
        if ((1000 + i) != result) {
760
0
          return false;
761
0
        }
762
0
      }
763
0
    }
764
0
    return true;
765
0
  };
766
767
0
  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(); },
768
0
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));
769
0
}
770
771
0
// Exercises replication for a colocated database. Replication is first set up for the colocated
// parent table only, then a regular table is added through AlterUniverseReplication, and finally
// fresh writes to every table are expected to show up on the consumer.
TEST_P(TwoDCYsqlTest, ColocatedDatabaseReplication) {
  YB_SKIP_TEST_IN_TSAN();
  constexpr int kNTabletsPerColocatedTable = 1;
  constexpr int kNTabletsPerTable = 3;
  std::vector<uint32_t> tables_vector = {kNTabletsPerColocatedTable, kNTabletsPerColocatedTable};
  // Two colocated tables are created on each cluster.
  auto colocated_tables =
      ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 3, 1, true /* colocated */));
  const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_));

  // Each database additionally gets one non-colocated table.
  auto non_colocated_table = ASSERT_RESULT(CreateTable(
      &producer_cluster_, kNamespaceName, "test_table_2", boost::none /* tablegroup */,
      kNTabletsPerTable, false /* colocated */));
  std::shared_ptr<client::YBTable> non_colocated_producer_table;
  ASSERT_OK(producer_client()->OpenTable(non_colocated_table, &non_colocated_producer_table));
  non_colocated_table = ASSERT_RESULT(CreateTable(
      &consumer_cluster_, kNamespaceName, "test_table_2", boost::none /* tablegroup */,
      kNTabletsPerTable, false /* colocated */));
  std::shared_ptr<client::YBTable> non_colocated_consumer_table;
  ASSERT_OK(consumer_client()->OpenTable(non_colocated_table, &non_colocated_consumer_table));

  // colocated_tables alternates producer and consumer universe tables: even positions belong to
  // the producer, odd positions to the consumer. Split them into per-universe lists, keeping a
  // separate list with only the colocated tables of each side.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
  std::vector<std::shared_ptr<client::YBTable>> colocated_producer_tables;
  std::vector<std::shared_ptr<client::YBTable>> colocated_consumer_tables;
  const size_t num_colocated_per_universe = colocated_tables.size() / 2;
  // One extra slot for the non-colocated table appended below.
  producer_tables.reserve(num_colocated_per_universe + 1);
  consumer_tables.reserve(num_colocated_per_universe + 1);
  colocated_producer_tables.reserve(num_colocated_per_universe);
  colocated_consumer_tables.reserve(num_colocated_per_universe);
  for (size_t idx = 0; idx < colocated_tables.size(); ++idx) {
    const bool is_producer = (idx % 2 == 0);
    auto& all_tables = is_producer ? producer_tables : consumer_tables;
    auto& colocated_only = is_producer ? colocated_producer_tables : colocated_consumer_tables;
    all_tables.push_back(colocated_tables[idx]);
    colocated_only.push_back(colocated_tables[idx]);
  }
  producer_tables.push_back(non_colocated_producer_table);
  consumer_tables.push_back(non_colocated_consumer_table);

  // 1. Write some data to all tables.
  for (const auto& table : producer_tables) {
    LOG(INFO) << "Writing records for table " << table->name().ToString();
    WriteWorkload(0, 100, &producer_cluster_, table->name());
  }

  // 2. Setup replication for only the colocated tables.
  // The producer namespace id is needed to build the colocated parent table id.
  GetNamespaceInfoResponsePB ns_resp;
  ASSERT_OK(producer_client()->GetNamespaceInfo("", kNamespaceName, YQL_DATABASE_PGSQL, &ns_resp));
  const auto colocated_parent_table_id =
      ns_resp.namespace_().id() + master::kColocatedParentTableIdSuffix;

  rpc::RpcController rpc;
  master::SetupUniverseReplicationRequestPB setup_universe_req;
  master::SetupUniverseReplicationResponsePB setup_universe_resp;
  setup_universe_req.set_producer_id(kUniverseId);
  string master_addr = producer_cluster()->GetMasterAddresses();
  auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
  HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses());
  // The colocated parent table id is the only table id put into the request.
  setup_universe_req.mutable_producer_table_ids()->Reserve(1);
  setup_universe_req.add_producer_table_ids(colocated_parent_table_id);
  auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster());
  auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
      &consumer_client()->proxy_cache(),
      consumer_leader_mini_master->bound_rpc_addr());

  rpc.Reset();
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc));
  ASSERT_FALSE(setup_universe_resp.has_error());

  // 3. Verify everything is setup correctly.
  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
      &get_universe_replication_resp));
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), kNTabletsPerColocatedTable));

  // 4. Check that colocated tables are being replicated.
  // Returns true when every checked consumer table holds exactly num_results rows whose first
  // column equals the row position (0, 1, ..., num_results - 1). With only_colocated set, just
  // the colocated consumer tables are checked; otherwise all consumer tables are.
  auto data_replicated_correctly = [&](int num_results, bool only_colocated) -> Result<bool> {
    const auto& tables_to_check = only_colocated ? colocated_consumer_tables : consumer_tables;
    for (const auto& consumer_table : tables_to_check) {
      LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
      auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_);

      if (PQntuples(consumer_results.get()) != num_results) {
        return false;
      }
      for (int row = 0; row < num_results; ++row) {
        const int value = VERIFY_RESULT(GetInt32(consumer_results.get(), row, 0));
        if (value != row) {
          return false;
        }
      }
    }
    return true;
  };
  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100, true); },
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));
  // Ensure that the non colocated table is not replicated.
  auto non_coloc_results = ScanToStrings(non_colocated_consumer_table->name(), &consumer_cluster_);
  ASSERT_EQ(0, PQntuples(non_coloc_results.get()));

  // 5. Add the regular table to replication.
  // Prepare and send AlterUniverseReplication command.
  master::AlterUniverseReplicationRequestPB alter_req;
  master::AlterUniverseReplicationResponsePB alter_resp;
  alter_req.set_producer_id(kUniverseId);
  alter_req.add_producer_table_ids_to_add(non_colocated_producer_table->id());

  rpc.Reset();
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
  ASSERT_FALSE(alter_resp.has_error());
  // Wait until we have 2 tables (colocated tablet + regular table) logged.
  ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
    master::GetUniverseReplicationResponsePB tmp_resp;
    const bool verified = VerifyUniverseReplication(
        consumer_cluster(), consumer_client(), kUniverseId, &tmp_resp).ok();
    if (!verified) {
      return false;
    }
    return tmp_resp.entry().tables_size() == 2;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter."));

  ASSERT_OK(CorrectlyPollingAllTablets(
      consumer_cluster(), kNTabletsPerColocatedTable + kNTabletsPerTable));
  // Check that all data is replicated for the new table as well.
  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100, false); },
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));

  // 6. Add additional data to all tables
  for (const auto& table : producer_tables) {
    LOG(INFO) << "Writing records for table " << table->name().ToString();
    WriteWorkload(100, 150, &producer_cluster_, table->name());
  }

  // 7. Verify all tables are properly replicated.
  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(150, false); },
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));
}
920
921
0
// Creates a same-named colocated table on both universes but with different table oids, then
// checks that the replication setup request is accepted while the subsequent verification of the
// universe replication fails.
TEST_P(TwoDCYsqlTest, ColocatedDatabaseDifferentTableOids) {
  YB_SKIP_TEST_IN_TSAN();
  auto colocated_tables = ASSERT_RESULT(SetUpWithParams({}, {}, 3, 1, true /* colocated */));
  const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_));

  // Create two tables with different table oids.
  auto conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(kNamespaceName));
  auto producer_table_info = ASSERT_RESULT(CreateTable(
      &producer_cluster_, kNamespaceName, "test_table_0", boost::none /* tablegroup */,
      1 /* num_tablets */, true /* colocated */, 123456 /* table_oid */));
  ASSERT_RESULT(CreateTable(
      &consumer_cluster_, kNamespaceName, "test_table_0", boost::none /* tablegroup */,
      1 /* num_tablets */, true /* colocated */, 123457 /* table_oid */));
  std::shared_ptr<client::YBTable> producer_table;
  ASSERT_OK(producer_client()->OpenTable(producer_table_info, &producer_table));

  // Try to setup replication, should fail on schema validation due to different table oids.
  // The setup call itself is expected to succeed; the failure is observed by the verification.
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, {producer_table}));
  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
  ASSERT_NOK(VerifyUniverseReplication(
      consumer_cluster(), consumer_client(), kUniverseId, &get_universe_replication_resp));
}
952
953
0
// Sets up replication for a tablegroup by passing the tablegroup id as the only producer table
// id, then checks that both the initial rows and later writes reach the consumer tables.
TEST_P(TwoDCYsqlTest, TablegroupReplication) {
  YB_SKIP_TEST_IN_TSAN();

  std::vector<uint32_t> tables_vector = {1, 1};
  boost::optional<std::string> tablegroup_name("mytablegroup");
  auto tables = ASSERT_RESULT(
      SetUpWithParams(tables_vector, tables_vector, 1, 1, false /* colocated */, tablegroup_name));
  const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_));

  // tables alternates producer and consumer universe tables: even positions belong to the
  // producer, odd positions to the consumer. Split them into one list per universe.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
  const size_t tables_per_universe = tables.size() / 2;
  producer_tables.reserve(tables_per_universe);
  consumer_tables.reserve(tables_per_universe);
  for (size_t idx = 0; idx < tables.size(); ++idx) {
    auto& destination = (idx % 2 == 0) ? producer_tables : consumer_tables;
    destination.push_back(tables[idx]);
  }

  // 1. Write some data to all tables.
  for (const auto& table : producer_tables) {
    LOG(INFO) << "Writing records for table " << table->name().ToString();
    WriteWorkload(0, 100, &producer_cluster_, table->name());
  }

  // 2. Setup replication for the tablegroup.
  auto tablegroup_id = ASSERT_RESULT(GetTablegroup(&producer_cluster_, kNamespaceName));
  LOG(INFO) << "Tablegroup id to replicate: " << tablegroup_id;

  rpc::RpcController rpc;
  master::SetupUniverseReplicationRequestPB setup_universe_req;
  master::SetupUniverseReplicationResponsePB setup_universe_resp;
  setup_universe_req.set_producer_id(kUniverseId);
  string master_addr = producer_cluster()->GetMasterAddresses();
  auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
  HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses());
  // The tablegroup id is the only table id put into the request.
  setup_universe_req.mutable_producer_table_ids()->Reserve(1);
  setup_universe_req.add_producer_table_ids(tablegroup_id);

  auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster());
  auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
      &consumer_client()->proxy_cache(),
      consumer_leader_mini_master->bound_rpc_addr());

  rpc.Reset();
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc));
  ASSERT_FALSE(setup_universe_resp.has_error());

  // 3. Verify everything is setup correctly.
  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
      &get_universe_replication_resp));
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1));

  // 4. Check that tables are being replicated.
  // Returns true when every consumer table holds exactly num_results rows whose first column
  // equals the row position (0, 1, ..., num_results - 1).
  auto data_replicated_correctly = [&](int num_results) -> Result<bool> {
    for (const auto& consumer_table : consumer_tables) {
      LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
      auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_);

      if (PQntuples(consumer_results.get()) != num_results) {
        return false;
      }
      for (int row = 0; row < num_results; ++row) {
        const int value = VERIFY_RESULT(GetInt32(consumer_results.get(), row, 0));
        if (value != row) {
          return false;
        }
      }
    }
    return true;
  };

  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); },
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));

  // 5. Write more data.
  for (const auto& table : producer_tables) {
    WriteWorkload(100, 105, &producer_cluster_, table->name());
  }

  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(105); },
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));
}
1043
1044
0
// Creates the same tablegroup on both universes but with a different number of tables in it on
// each side, then checks that the replication setup request is accepted while the subsequent
// verification of the universe replication fails.
TEST_P(TwoDCYsqlTest, TablegroupReplicationMismatch) {
  YB_SKIP_TEST_IN_TSAN();
  ASSERT_OK(Initialize(1 /* replication_factor */));

  boost::optional<std::string> tablegroup_name("mytablegroup");

  ASSERT_OK(CreateDatabase(&producer_cluster_, kNamespaceName, false /* colocated */));
  ASSERT_OK(CreateDatabase(&consumer_cluster_, kNamespaceName, false /* colocated */));
  ASSERT_OK(CreateTablegroup(&producer_cluster_, kNamespaceName, tablegroup_name.get()));
  ASSERT_OK(CreateTablegroup(&consumer_cluster_, kNamespaceName, tablegroup_name.get()));

  // The producer and consumer table counts are deliberately different so that the replication
  // fails during validation.
  const uint32_t num_producer_tables = 2;
  const uint32_t num_consumer_tables = 3;
  std::vector<YBTableName> tables;
  for (uint32_t table_idx = 0; table_idx < num_producer_tables; ++table_idx) {
    ASSERT_OK(CreateTable(table_idx, 1 /* num_tablets */, &producer_cluster_,
                          &tables, tablegroup_name, false /* colocated */));
  }
  for (uint32_t table_idx = 0; table_idx < num_consumer_tables; ++table_idx) {
    ASSERT_OK(CreateTable(table_idx, 1 /* num_tablets */, &consumer_cluster_,
                          &tables, tablegroup_name, false /* colocated */));
  }

  auto tablegroup_id = ASSERT_RESULT(GetTablegroup(&producer_cluster_, kNamespaceName));

  // Try to set up replication, using the tablegroup id as the only producer table id.
  rpc::RpcController rpc;
  master::SetupUniverseReplicationRequestPB setup_universe_req;
  master::SetupUniverseReplicationResponsePB setup_universe_resp;
  setup_universe_req.set_producer_id(kUniverseId);
  string master_addr = producer_cluster()->GetMasterAddresses();
  auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
  HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses());
  setup_universe_req.mutable_producer_table_ids()->Reserve(1);
  setup_universe_req.add_producer_table_ids(tablegroup_id);

  auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster());
  auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
      &consumer_client()->proxy_cache(),
      consumer_leader_mini_master->bound_rpc_addr());

  rpc.Reset();
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  // The RPC itself is expected to succeed without an error in the response.
  ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc));
  ASSERT_FALSE(setup_universe_resp.has_error());

  // The schema validation should fail.
  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
  ASSERT_NOK(VerifyUniverseReplication(
      consumer_cluster(), consumer_client(), kUniverseId, &get_universe_replication_resp));
}
1097
1098
// TODO adapt rest of twodc-test.cc tests.
1099
1100
0
// Checks the guard around deleting tables that are under replication: deleting a replicated
// producer or consumer table is expected to fail until
// FLAGS_enable_delete_truncate_xcluster_replicated_table is turned on, after which both deletes
// are expected to succeed.
TEST_P(TwoDCYsqlTest, DeleteTableChecks) {
  YB_SKIP_TEST_IN_TSAN();
  constexpr int kNTabletsPerTable = 1;
  std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable};
  auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1));
  const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_));

  // tables contains both producer and consumer universe tables (alternately): even positions
  // belong to the producer, odd positions to the consumer. Split them into one list per universe.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
  producer_tables.reserve(tables.size() / 2);
  consumer_tables.reserve(tables.size() / 2);
  for (size_t i = 0; i < tables.size(); ++i) {
    if (i % 2 == 0) {
      producer_tables.push_back(tables[i]);
    } else {
      consumer_tables.push_back(tables[i]);
    }
  }

  // 1. Write some data.
  for (const auto& producer_table : producer_tables) {
    LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
    WriteWorkload(0, 100, &producer_cluster_, producer_table->name());
  }

  // Verify data is written on the producer: 100 rows whose first column equals the row position.
  for (const auto& producer_table : producer_tables) {
    auto producer_results = ScanToStrings(producer_table->name(), &producer_cluster_);
    ASSERT_EQ(100, PQntuples(producer_results.get()));
    for (int i = 0; i < 100; ++i) {
      const int result = ASSERT_RESULT(GetInt32(producer_results.get(), i, 0));
      ASSERT_EQ(i, result);
    }
  }

  // 2. Setup replication.
  ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
                                     kUniverseId, producer_tables));

  // 3. Verify everything is setup correctly.
  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
      &get_universe_replication_resp));
  ASSERT_OK(CorrectlyPollingAllTablets(
      consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable)));

  // Returns true when every consumer table holds exactly num_results rows whose first column
  // equals the row position (0, 1, ..., num_results - 1).
  auto data_replicated_correctly = [&](int num_results) -> Result<bool> {
    for (const auto& consumer_table : consumer_tables) {
      LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
      auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_);

      if (num_results != PQntuples(consumer_results.get())) {
        return false;
      }
      for (int i = 0; i < num_results; ++i) {
        const int result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0));
        if (i != result) {
          return false;
        }
      }
    }
    return true;
  };
  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); },
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));

  // Attempt to destroy the producer and consumer tables. Only the table ids are needed for the
  // delete calls; both attempts are expected to fail while the flag is off.
  string producer_table_id = producer_tables[0]->id();
  string consumer_table_id = consumer_tables[0]->id();
  ASSERT_NOK(DeleteTable(&producer_cluster_, &producer_table_id));
  ASSERT_NOK(DeleteTable(&consumer_cluster_, &consumer_table_id));

  // With the flag enabled the same deletes are expected to go through.
  FLAGS_enable_delete_truncate_xcluster_replicated_table = true;
  ASSERT_OK(DeleteTable(&producer_cluster_, &producer_table_id));
  ASSERT_OK(DeleteTable(&consumer_cluster_, &consumer_table_id));
}
1182
1183
0
// Checks the guard around truncating tables that are under replication: truncating a replicated
// producer or consumer table is expected to fail until
// FLAGS_enable_delete_truncate_xcluster_replicated_table is turned on, after which both
// truncations are expected to succeed.
TEST_P(TwoDCYsqlTest, TruncateTableChecks) {
  YB_SKIP_TEST_IN_TSAN();
  constexpr int kNTabletsPerTable = 1;
  std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable};
  auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1));
  const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_));

  // tables alternates producer and consumer universe tables: even positions belong to the
  // producer, odd positions to the consumer. Split them into one list per universe.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
  const size_t tables_per_universe = tables.size() / 2;
  producer_tables.reserve(tables_per_universe);
  consumer_tables.reserve(tables_per_universe);
  for (size_t idx = 0; idx < tables.size(); ++idx) {
    auto& destination = (idx % 2 == 0) ? producer_tables : consumer_tables;
    destination.push_back(tables[idx]);
  }

  // 1. Write some data.
  for (const auto& table : producer_tables) {
    LOG(INFO) << "Writing records for table " << table->name().ToString();
    WriteWorkload(0, 100, &producer_cluster_, table->name());
  }

  // Verify data is written on the producer: 100 rows whose first column equals the row position.
  for (const auto& table : producer_tables) {
    auto producer_results = ScanToStrings(table->name(), &producer_cluster_);
    ASSERT_EQ(100, PQntuples(producer_results.get()));
    for (int row = 0; row < 100; ++row) {
      const int value = ASSERT_RESULT(GetInt32(producer_results.get(), row, 0));
      ASSERT_EQ(row, value);
    }
  }

  // 2. Setup replication.
  ASSERT_OK(SetupUniverseReplication(
      producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables));

  // 3. Verify everything is setup correctly.
  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
  ASSERT_OK(VerifyUniverseReplication(
      consumer_cluster(), consumer_client(), kUniverseId, &get_universe_replication_resp));
  ASSERT_OK(CorrectlyPollingAllTablets(
      consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable)));

  // Returns true when every consumer table holds exactly num_results rows whose first column
  // equals the row position (0, 1, ..., num_results - 1).
  auto data_replicated_correctly = [&](int num_results) -> Result<bool> {
    for (const auto& consumer_table : consumer_tables) {
      LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
      auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_);

      if (PQntuples(consumer_results.get()) != num_results) {
        return false;
      }
      for (int row = 0; row < num_results; ++row) {
        const int value = VERIFY_RESULT(GetInt32(consumer_results.get(), row, 0));
        if (value != row) {
          return false;
        }
      }
    }
    return true;
  };
  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); },
                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));

  // Attempt to Truncate the producer and consumer tables; both attempts are expected to fail
  // while the flag is off.
  string producer_table_id = producer_tables[0]->id();
  string consumer_table_id = consumer_tables[0]->id();
  ASSERT_NOK(TruncateTable(&producer_cluster_, {producer_table_id}));
  ASSERT_NOK(TruncateTable(&consumer_cluster_, {consumer_table_id}));

  // With the flag enabled the same truncations are expected to go through.
  FLAGS_enable_delete_truncate_xcluster_replicated_table = true;
  ASSERT_OK(TruncateTable(&producer_cluster_, {producer_table_id}));
  ASSERT_OK(TruncateTable(&consumer_cluster_, {consumer_table_id}));
}
1263
1264
} // namespace enterprise
1265
} // namespace yb