/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/twodc_ysql-test.cc
| Line | Count | Source (jump to first uncovered line) | 
| 1 |  | // Copyright (c) YugaByte, Inc. | 
| 2 |  | // | 
| 3 |  | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | 
| 4 |  | // in compliance with the License.  You may obtain a copy of the License at | 
| 5 |  | // | 
| 6 |  | // http://www.apache.org/licenses/LICENSE-2.0 | 
| 7 |  | // | 
| 8 |  | // Unless required by applicable law or agreed to in writing, software distributed under the License | 
| 9 |  | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | 
| 10 |  | // or implied.  See the License for the specific language governing permissions and limitations | 
| 11 |  | // under the License. | 
| 12 |  | // | 
| 13 |  |  | 
| 14 |  | #include <algorithm> | 
| 15 |  | #include <map> | 
| 16 |  | #include <string> | 
| 17 |  | #include <utility> | 
| 18 |  | #include <chrono> | 
| 19 |  | #include <boost/assign.hpp> | 
| 20 |  | #include <gflags/gflags.h> | 
| 21 |  | #include <gtest/gtest.h> | 
| 22 |  |  | 
| 23 |  | #include "yb/common/common.pb.h" | 
| 24 |  | #include "yb/common/entity_ids.h" | 
| 25 |  | #include "yb/common/ql_value.h" | 
| 26 |  | #include "yb/common/schema.h" | 
| 27 |  | #include "yb/common/wire_protocol.h" | 
| 28 |  |  | 
| 29 |  | #include "yb/cdc/cdc_service.h" | 
| 30 |  | #include "yb/cdc/cdc_service.pb.h" | 
| 31 |  | #include "yb/cdc/cdc_service.proxy.h" | 
| 32 |  | #include "yb/client/client.h" | 
| 33 |  | #include "yb/client/client-test-util.h" | 
| 34 |  | #include "yb/client/meta_cache.h" | 
| 35 |  | #include "yb/client/schema.h" | 
| 36 |  | #include "yb/client/session.h" | 
| 37 |  | #include "yb/client/table.h" | 
| 38 |  | #include "yb/client/table_alterer.h" | 
| 39 |  | #include "yb/client/table_creator.h" | 
| 40 |  | #include "yb/client/table_handle.h" | 
| 41 |  | #include "yb/client/transaction.h" | 
| 42 |  | #include "yb/client/yb_op.h" | 
| 43 |  |  | 
| 44 |  | #include "yb/gutil/stl_util.h" | 
| 45 |  | #include "yb/gutil/strings/join.h" | 
| 46 |  | #include "yb/gutil/strings/substitute.h" | 
| 47 |  | #include "yb/integration-tests/cdc_test_util.h" | 
| 48 |  | #include "yb/integration-tests/mini_cluster.h" | 
| 49 |  | #include "yb/integration-tests/twodc_test_base.h" | 
| 50 |  | #include "yb/integration-tests/yb_mini_cluster_test_base.h" | 
| 51 |  |  | 
| 52 |  | #include "yb/master/cdc_consumer_registry_service.h" | 
| 53 |  | #include "yb/master/mini_master.h" | 
| 54 |  | #include "yb/master/master.h" | 
| 55 |  | #include "yb/master/master_cluster.proxy.h" | 
| 56 |  | #include "yb/master/master_ddl.proxy.h" | 
| 57 |  | #include "yb/master/master_replication.proxy.h" | 
| 58 |  | #include "yb/master/master-test-util.h" | 
| 59 |  | #include "yb/master/sys_catalog_initialization.h" | 
| 60 |  |  | 
| 61 |  | #include "yb/rpc/rpc_controller.h" | 
| 62 |  | #include "yb/tablet/tablet.h" | 
| 63 |  | #include "yb/tablet/tablet_peer.h" | 
| 64 |  | #include "yb/tserver/mini_tablet_server.h" | 
| 65 |  | #include "yb/tserver/tablet_server.h" | 
| 66 |  | #include "yb/tserver/ts_tablet_manager.h" | 
| 67 |  |  | 
| 68 |  | #include "yb/tserver/cdc_consumer.h" | 
| 69 |  | #include "yb/util/atomic.h" | 
| 70 |  | #include "yb/util/faststring.h" | 
| 71 |  | #include "yb/util/format.h" | 
| 72 |  | #include "yb/util/monotime.h" | 
| 73 |  | #include "yb/util/random.h" | 
| 74 |  | #include "yb/util/random_util.h" | 
| 75 |  | #include "yb/util/result.h" | 
| 76 |  | #include "yb/util/stopwatch.h" | 
| 77 |  | #include "yb/util/test_util.h" | 
| 78 |  | #include "yb/util/test_macros.h" | 
| 79 |  | #include "yb/yql/pgwrapper/libpq_utils.h" | 
| 80 |  | #include "yb/yql/pgwrapper/pg_wrapper.h" | 
| 81 |  |  | 
| 82 |  | DECLARE_int32(replication_factor); | 
| 83 |  | DECLARE_int32(cdc_max_apply_batch_num_records); | 
| 84 |  | DECLARE_int32(client_read_write_timeout_ms); | 
| 85 |  | DECLARE_int32(pgsql_proxy_webserver_port); | 
| 86 |  | DECLARE_bool(enable_ysql); | 
| 87 |  | DECLARE_bool(hide_pg_catalog_table_creation_logs); | 
| 88 |  | DECLARE_bool(master_auto_run_initdb); | 
| 89 |  | DECLARE_int32(pggate_rpc_timeout_secs); | 
| 90 |  | DECLARE_bool(enable_delete_truncate_xcluster_replicated_table); | 
| 91 |  |  | 
| 92 |  | namespace yb { | 
| 93 |  |  | 
| 94 |  | using client::YBClient; | 
| 95 |  | using client::YBClientBuilder; | 
| 96 |  | using client::YBColumnSchema; | 
| 97 |  | using client::YBError; | 
| 98 |  | using client::YBSchema; | 
| 99 |  | using client::YBSchemaBuilder; | 
| 100 |  | using client::YBSession; | 
| 101 |  | using client::YBTable; | 
| 102 |  | using client::YBTableAlterer; | 
| 103 |  | using client::YBTableCreator; | 
| 104 |  | using client::YBTableType; | 
| 105 |  | using client::YBTableName; | 
| 106 |  | using master::GetNamespaceInfoResponsePB; | 
| 107 |  | using master::MiniMaster; | 
| 108 |  | using tserver::MiniTabletServer; | 
| 109 |  | using tserver::enterprise::CDCConsumer; | 
| 110 |  |  | 
| 111 |  | using pgwrapper::ToString; | 
| 112 |  | using pgwrapper::GetInt32; | 
| 113 |  | using pgwrapper::PGConn; | 
| 114 |  | using pgwrapper::PGResultPtr; | 
| 115 |  | using pgwrapper::PgSupervisor; | 
| 116 |  |  | 
| 117 |  | namespace enterprise { | 
| 118 |  |  | 
| 119 |  | constexpr static const char* const kKeyColumnName = "key"; | 
| 120 |  |  | 
| 121 |  | class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterface<TwoDCTestParams> { | 
| 122 |  |  public: | 
| 123 | 0 |   Status Initialize(uint32_t replication_factor, uint32_t num_masters = 1) { | 
| 124 | 0 |     master::SetDefaultInitialSysCatalogSnapshotFlags(); | 
| 125 | 0 |     TwoDCTestBase::SetUp(); | 
| 126 | 0 |     FLAGS_enable_ysql = true; | 
| 127 | 0 |     FLAGS_master_auto_run_initdb = true; | 
| 128 | 0 |     FLAGS_hide_pg_catalog_table_creation_logs = true; | 
| 129 | 0 |     FLAGS_pggate_rpc_timeout_secs = 120; | 
| 130 | 0 |     FLAGS_cdc_max_apply_batch_num_records = GetParam().batch_size; | 
| 131 | 0 |     FLAGS_cdc_enable_replicate_intents = GetParam().enable_replicate_intents; | 
| 132 |  | 
 | 
| 133 | 0 |     MiniClusterOptions opts; | 
| 134 | 0 |     opts.num_tablet_servers = replication_factor; | 
| 135 | 0 |     opts.num_masters = num_masters; | 
| 136 | 0 |     FLAGS_replication_factor = replication_factor; | 
| 137 | 0 |     opts.cluster_id = "producer"; | 
| 138 | 0 |     producer_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(opts); | 
| 139 | 0 |     RETURN_NOT_OK(producer_cluster()->StartSync()); | 
| 140 | 0 |     RETURN_NOT_OK(producer_cluster()->WaitForTabletServerCount(replication_factor)); | 
| 141 | 0 |     RETURN_NOT_OK(WaitForInitDb(producer_cluster())); | 
| 142 |  | 
 | 
| 143 | 0 |     opts.cluster_id = "consumer"; | 
| 144 | 0 |     consumer_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(opts); | 
| 145 | 0 |     RETURN_NOT_OK(consumer_cluster()->StartSync()); | 
| 146 | 0 |     RETURN_NOT_OK(consumer_cluster()->WaitForTabletServerCount(replication_factor)); | 
| 147 | 0 |     RETURN_NOT_OK(WaitForInitDb(consumer_cluster())); | 
| 148 |  | 
 | 
| 149 | 0 |     producer_cluster_.client_ = VERIFY_RESULT(producer_cluster()->CreateClient()); | 
| 150 | 0 |     consumer_cluster_.client_ = VERIFY_RESULT(consumer_cluster()->CreateClient()); | 
| 151 |  | 
 | 
| 152 | 0 |     RETURN_NOT_OK(InitPostgres(&producer_cluster_)); | 
| 153 | 0 |     RETURN_NOT_OK(InitPostgres(&consumer_cluster_)); | 
| 154 |  | 
 | 
| 155 | 0 |     return Status::OK(); | 
| 156 | 0 |   } | 
| 157 |  |  | 
| 158 |  |   Result<std::vector<std::shared_ptr<client::YBTable>>> | 
| 159 |  |       SetUpWithParams(std::vector<uint32_t> num_consumer_tablets, | 
| 160 |  |                       std::vector<uint32_t> num_producer_tablets, | 
| 161 |  |                       uint32_t replication_factor, | 
| 162 |  |                       uint32_t num_masters = 1, | 
| 163 |  |                       bool colocated = false, | 
| 164 | 0 |                       boost::optional<std::string> tablegroup_name = boost::none) { | 
| 165 | 0 |     RETURN_NOT_OK(Initialize(replication_factor, num_masters)); | 
| 166 |  | 
 | 
| 167 | 0 |     if (num_consumer_tablets.size() != num_producer_tablets.size()) { | 
| 168 | 0 |       return STATUS(IllegalState, | 
| 169 | 0 |                     Format("Num consumer tables: $0 num producer tables: $1 must be equal.", | 
| 170 | 0 |                            num_consumer_tablets.size(), num_producer_tablets.size())); | 
| 171 | 0 |     } | 
| 172 |  |  | 
| 173 | 0 |     RETURN_NOT_OK(CreateDatabase(&producer_cluster_, kNamespaceName, colocated)); | 
| 174 | 0 |     RETURN_NOT_OK(CreateDatabase(&consumer_cluster_, kNamespaceName, colocated)); | 
| 175 |  | 
 | 
| 176 | 0 |     if (tablegroup_name.has_value()) { | 
| 177 | 0 |       RETURN_NOT_OK(CreateTablegroup(&producer_cluster_, kNamespaceName, tablegroup_name.get())); | 
| 178 | 0 |       RETURN_NOT_OK(CreateTablegroup(&consumer_cluster_, kNamespaceName, tablegroup_name.get())); | 
| 179 | 0 |     } | 
| 180 |  | 
 | 
| 181 | 0 |     std::vector<YBTableName> tables; | 
| 182 | 0 |     std::vector<std::shared_ptr<client::YBTable>> yb_tables; | 
| 183 | 0 |     for (uint32_t i = 0; i < num_consumer_tablets.size(); i++) { | 
| 184 | 0 |       RETURN_NOT_OK(CreateTable(i, num_producer_tablets[i], &producer_cluster_, | 
| 185 | 0 |                                 &tables, tablegroup_name, colocated)); | 
| 186 | 0 |       std::shared_ptr<client::YBTable> producer_table; | 
| 187 | 0 |       RETURN_NOT_OK(producer_client()->OpenTable(tables[i * 2], &producer_table)); | 
| 188 | 0 |       yb_tables.push_back(producer_table); | 
| 189 |  | 
 | 
| 190 | 0 |       RETURN_NOT_OK(CreateTable(i, num_consumer_tablets[i], &consumer_cluster_, | 
| 191 | 0 |                                 &tables, tablegroup_name, colocated)); | 
| 192 | 0 |       std::shared_ptr<client::YBTable> consumer_table; | 
| 193 | 0 |       RETURN_NOT_OK(consumer_client()->OpenTable(tables[(i * 2) + 1], &consumer_table)); | 
| 194 | 0 |       yb_tables.push_back(consumer_table); | 
| 195 | 0 |     } | 
| 196 |  | 
 | 
| 197 | 0 |     return yb_tables; | 
| 198 | 0 |   } | 
| 199 |  |  | 
| 200 | 0 |   Status InitPostgres(Cluster* cluster) { | 
| 201 | 0 |     auto pg_ts = RandomElement(cluster->mini_cluster_->mini_tablet_servers()); | 
| 202 | 0 |     auto port = cluster->mini_cluster_->AllocateFreePort(); | 
| 203 | 0 |     yb::pgwrapper::PgProcessConf pg_process_conf = | 
| 204 | 0 |         VERIFY_RESULT(yb::pgwrapper::PgProcessConf::CreateValidateAndRunInitDb( | 
| 205 | 0 |             yb::ToString(Endpoint(pg_ts->bound_rpc_addr().address(), port)), | 
| 206 | 0 |             pg_ts->options()->fs_opts.data_paths.front() + "/pg_data", | 
| 207 | 0 |             pg_ts->server()->GetSharedMemoryFd())); | 
| 208 | 0 |     pg_process_conf.master_addresses = pg_ts->options()->master_addresses_flag; | 
| 209 | 0 |     pg_process_conf.force_disable_log_file = true; | 
| 210 | 0 |     FLAGS_pgsql_proxy_webserver_port = cluster->mini_cluster_->AllocateFreePort(); | 
| 211 |  | 
 | 
| 212 | 0 |     LOG(INFO) << "Starting PostgreSQL server listening on " | 
| 213 | 0 |               << pg_process_conf.listen_addresses << ":" << pg_process_conf.pg_port << ", data: " | 
| 214 | 0 |               << pg_process_conf.data_dir | 
| 215 | 0 |               << ", pgsql webserver port: " << FLAGS_pgsql_proxy_webserver_port; | 
| 216 | 0 |     cluster->pg_supervisor_ = std::make_unique<yb::pgwrapper::PgSupervisor>(pg_process_conf); | 
| 217 | 0 |     RETURN_NOT_OK(cluster->pg_supervisor_->Start()); | 
| 218 |  | 
 | 
| 219 | 0 |     cluster->pg_host_port_ = HostPort(pg_process_conf.listen_addresses, pg_process_conf.pg_port); | 
| 220 | 0 |     return Status::OK(); | 
| 221 | 0 |   } | 
| 222 |  |  | 
| 223 |  |   Status CreateDatabase(Cluster* cluster, | 
| 224 |  |                         const std::string& namespace_name = kNamespaceName, | 
| 225 | 0 |                         bool colocated = false) { | 
| 226 | 0 |     auto conn = EXPECT_RESULT(cluster->Connect()); | 
| 227 | 0 |     EXPECT_OK(conn.ExecuteFormat("CREATE DATABASE $0$1", | 
| 228 | 0 |                                  namespace_name, colocated ? " colocated = true" : "")); | 
| 229 | 0 |     return Status::OK(); | 
| 230 | 0 |   } | 
| 231 |  |  | 
| 232 | 0 |   Result<string> GetUniverseId(Cluster* cluster) { | 
| 233 | 0 |     master::GetMasterClusterConfigRequestPB req; | 
| 234 | 0 |     master::GetMasterClusterConfigResponsePB resp; | 
| 235 |  | 
 | 
| 236 | 0 |     master::MasterClusterProxy master_proxy( | 
| 237 | 0 |         &cluster->client_->proxy_cache(), | 
| 238 | 0 |         VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMasterBoundRpcAddr())); | 
| 239 |  | 
 | 
| 240 | 0 |     rpc::RpcController rpc; | 
| 241 | 0 |     rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); | 
| 242 | 0 |     RETURN_NOT_OK(master_proxy.GetMasterClusterConfig(req, &resp, &rpc)); | 
| 243 | 0 |     if (resp.has_error()) { | 
| 244 | 0 |       return STATUS(IllegalState, "Error getting cluster config"); | 
| 245 | 0 |     } | 
| 246 | 0 |     return resp.cluster_config().cluster_uuid(); | 
| 247 | 0 |   } | 
| 248 |  |  | 
| 249 |  |   Result<YBTableName> CreateTable(Cluster* cluster, | 
| 250 |  |                                   const std::string& namespace_name, | 
| 251 |  |                                   const std::string& table_name, | 
| 252 |  |                                   const boost::optional<std::string>& tablegroup_name, | 
| 253 |  |                                   uint32_t num_tablets, | 
| 254 |  |                                   bool colocated = false, | 
| 255 | 0 |                                   const int table_oid = 0) { | 
| 256 | 0 |     auto conn = EXPECT_RESULT(cluster->ConnectToDB(namespace_name)); | 
| 257 | 0 |     std::string table_oid_string = ""; | 
| 258 | 0 |     if (table_oid > 0) { | 
| 259 |  |       // Need to turn on session flag to allow for CREATE WITH table_oid. | 
| 260 | 0 |       EXPECT_OK(conn.Execute("set yb_enable_create_with_table_oid=true")); | 
| 261 | 0 |       table_oid_string = Format("table_oid = $0", table_oid); | 
| 262 | 0 |     } | 
| 263 | 0 |     std::string query = Format("CREATE TABLE $0($1 int PRIMARY KEY) ", table_name, kKeyColumnName); | 
| 264 |  |     // One cannot use tablegroup together with split into tablets. | 
| 265 | 0 |     if (tablegroup_name.has_value()) { | 
| 266 | 0 |       std::string with_clause = | 
| 267 | 0 |           table_oid_string.empty() ? "" : Format("WITH ($0) ", table_oid_string); | 
| 268 | 0 |       std::string tablegroup_clause = Format("TABLEGROUP $0", tablegroup_name.value()); | 
| 269 | 0 |       query += Format("$0$1", with_clause, tablegroup_clause); | 
| 270 | 0 |     } else { | 
| 271 | 0 |       std::string colocated_clause = Format("colocated = $0", colocated); | 
| 272 | 0 |       std::string with_clause = | 
| 273 | 0 |           table_oid_string.empty() ? colocated_clause | 
| 274 | 0 |                                    : Format("$0, $1", table_oid_string, colocated_clause); | 
| 275 | 0 |       query += Format("WITH ($0) SPLIT INTO $1 TABLETS", with_clause, num_tablets); | 
| 276 | 0 |     } | 
| 277 | 0 |     EXPECT_OK(conn.Execute(query)); | 
| 278 | 0 |     return GetTable(cluster, namespace_name, table_name); | 
| 279 | 0 |   } | 
| 280 |  |  | 
| 281 |  |   Status CreateTable(uint32_t idx, uint32_t num_tablets, Cluster* cluster, | 
| 282 |  |                      std::vector<YBTableName>* tables, | 
| 283 |  |                      const boost::optional<std::string>& tablegroup_name, | 
| 284 | 0 |                      bool colocated = false) { | 
| 285 |  |     /* | 
| 286 |  |      * If we either have tablegroup name or colocated flag | 
| 287 |  |      * Generate table_oid based on index so that we have the same table_oid for producer/consumer. | 
| 288 |  |      */ | 
| 289 | 0 |     const int table_oid = (tablegroup_name.has_value() || colocated) ? (idx + 1) * 111111 : 0; | 
| 290 | 0 |     auto table = VERIFY_RESULT(CreateTable(cluster, kNamespaceName, Format("test_table_$0", idx), | 
| 291 | 0 |                                            tablegroup_name, num_tablets, colocated, table_oid)); | 
| 292 | 0 |     tables->push_back(table); | 
| 293 | 0 |     return Status::OK(); | 
| 294 | 0 |   } | 
| 295 |  |  | 
| 296 |  |   Result<YBTableName> GetTable(Cluster* cluster, | 
| 297 |  |                                const std::string& namespace_name, | 
| 298 |  |                                const std::string& table_name, | 
| 299 |  |                                bool verify_table_name = true, | 
| 300 | 0 |                                bool exclude_system_tables = true) { | 
| 301 | 0 |     master::ListTablesRequestPB req; | 
| 302 | 0 |     master::ListTablesResponsePB resp; | 
| 303 |  | 
 | 
| 304 | 0 |     req.set_name_filter(table_name); | 
| 305 | 0 |     req.mutable_namespace_()->set_name(namespace_name); | 
| 306 | 0 |     req.mutable_namespace_()->set_database_type(YQL_DATABASE_PGSQL); | 
| 307 | 0 |     if (!exclude_system_tables) { | 
| 308 | 0 |       req.set_exclude_system_tables(true); | 
| 309 | 0 |       req.add_relation_type_filter(master::USER_TABLE_RELATION); | 
| 310 | 0 |     } | 
| 311 |  | 
 | 
| 312 | 0 |     master::MasterDdlProxy master_proxy( | 
| 313 | 0 |         &cluster->client_->proxy_cache(), | 
| 314 | 0 |         VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr()); | 
| 315 |  | 
 | 
| 316 | 0 |     rpc::RpcController rpc; | 
| 317 | 0 |     rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); | 
| 318 | 0 |     RETURN_NOT_OK(master_proxy.ListTables(req, &resp, &rpc)); | 
| 319 | 0 |     if (resp.has_error()) { | 
| 320 | 0 |       return STATUS(IllegalState, "Failed listing tables"); | 
| 321 | 0 |     } | 
| 322 |  |  | 
| 323 |  |     // Now need to find the table and return it. | 
| 324 | 0 |     for (const auto& table : resp.tables()) { | 
| 325 |  |       // If !verify_table_name, just return the first table. | 
| 326 | 0 |       if (!verify_table_name || | 
| 327 | 0 |           (table.name() == table_name && table.namespace_().name() == namespace_name)) { | 
| 328 | 0 |         YBTableName yb_table; | 
| 329 | 0 |         yb_table.set_table_id(table.id()); | 
| 330 | 0 |         yb_table.set_namespace_id(table.namespace_().id()); | 
| 331 | 0 |         yb_table.set_pgschema_name(table.pgschema_name()); | 
| 332 | 0 |         return yb_table; | 
| 333 | 0 |       } | 
| 334 | 0 |     } | 
| 335 | 0 |     return STATUS(IllegalState, | 
| 336 | 0 |                   strings::Substitute("Unable to find table $0 in namespace $1", | 
| 337 | 0 |                                       table_name, namespace_name)); | 
| 338 | 0 |   } | 
| 339 |  |  | 
| 340 |  |   /* | 
| 341 |  |    * TODO (#11597): Given one is not able to get tablegroup ID by name, currently this works by | 
| 342 |  |    * getting the first available tablegroup appearing in the namespace. | 
| 343 |  |    */ | 
| 344 | 0 |   Result<TablegroupId> GetTablegroup(Cluster* cluster, const std::string& namespace_name) { | 
| 345 |  |     // Lookup the namespace id from the namespace name. | 
| 346 | 0 |     std::string namespace_id; | 
| 347 | 0 |     { | 
| 348 | 0 |       master::ListNamespacesRequestPB req; | 
| 349 | 0 |       master::ListNamespacesResponsePB resp; | 
| 350 | 0 |       master::MasterDdlProxy master_proxy( | 
| 351 | 0 |           &cluster->client_->proxy_cache(), | 
| 352 | 0 |           VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr()); | 
| 353 |  | 
 | 
| 354 | 0 |       rpc::RpcController rpc; | 
| 355 | 0 |       rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); | 
| 356 |  | 
 | 
| 357 | 0 |       RETURN_NOT_OK(master_proxy.ListNamespaces(req, &resp, &rpc)); | 
| 358 | 0 |       if (resp.has_error()) { | 
| 359 | 0 |         return STATUS(IllegalState, "Failed to get namespace info"); | 
| 360 | 0 |       } | 
| 361 |  |  | 
| 362 |  |       // Find and return the namespace id. | 
| 363 | 0 |       bool namespaceFound = false; | 
| 364 | 0 |       for (const auto& entry : resp.namespaces()) { | 
| 365 | 0 |         if (entry.name() == namespace_name) { | 
| 366 | 0 |           namespaceFound = true; | 
| 367 | 0 |           namespace_id = entry.id(); | 
| 368 | 0 |           break; | 
| 369 | 0 |         } | 
| 370 | 0 |       } | 
| 371 |  | 
 | 
| 372 | 0 |       if (!namespaceFound) { | 
| 373 | 0 |         return STATUS(IllegalState, "Failed to find namespace"); | 
| 374 | 0 |       } | 
| 375 | 0 |     } | 
| 376 |  |  | 
| 377 | 0 |     master::ListTablegroupsRequestPB req; | 
| 378 | 0 |     master::ListTablegroupsResponsePB resp; | 
| 379 | 0 |     master::MasterDdlProxy master_proxy( | 
| 380 | 0 |         &cluster->client_->proxy_cache(), | 
| 381 | 0 |         VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr()); | 
| 382 |  | 
 | 
| 383 | 0 |     req.set_namespace_id(namespace_id); | 
| 384 |  | 
 | 
| 385 | 0 |     rpc::RpcController rpc; | 
| 386 | 0 |     rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); | 
| 387 |  | 
 | 
| 388 | 0 |     RETURN_NOT_OK(master_proxy.ListTablegroups(req, &resp, &rpc)); | 
| 389 | 0 |     if (resp.has_error()) { | 
| 390 | 0 |       return STATUS(IllegalState, "Failed listing tablegroups"); | 
| 391 | 0 |     } | 
| 392 |  |  | 
| 393 |  |     // Find and return the tablegroup. | 
| 394 | 0 |     if (resp.tablegroups().empty()) { | 
| 395 | 0 |       return STATUS(IllegalState, | 
| 396 | 0 |                     Format("Unable to find tablegroup in namespace $0", namespace_name)); | 
| 397 | 0 |     } | 
| 398 |  |  | 
| 399 | 0 |     return resp.tablegroups()[0].id() + master::kTablegroupParentTableIdSuffix; | 
| 400 | 0 |   } | 
| 401 |  |  | 
| 402 |  |   Status CreateTablegroup(Cluster* cluster, | 
| 403 |  |                           const std::string& namespace_name, | 
| 404 | 0 |                           const std::string& tablegroup_name) { | 
| 405 | 0 |     auto conn = EXPECT_RESULT(cluster->ConnectToDB(namespace_name)); | 
| 406 | 0 |     EXPECT_OK(conn.ExecuteFormat("CREATE TABLEGROUP $0", tablegroup_name)); | 
| 407 | 0 |     return Status::OK(); | 
| 408 | 0 |   } | 
| 409 |  |  | 
| 410 |  |   void WriteWorkload(uint32_t start, uint32_t end, Cluster* cluster, const YBTableName& table, | 
| 411 | 0 |                      bool delete_op = false) { | 
| 412 | 0 |     auto conn = EXPECT_RESULT(cluster->ConnectToDB(table.namespace_name())); | 
| 413 |  | 
 | 
| 414 | 0 |     LOG(INFO) << "Writing " << end-start << (delete_op ? " deletes" : " inserts"); | 
| 415 | 0 |     for (uint32_t i = start; i < end; i++) { | 
| 416 | 0 |       if (delete_op) { | 
| 417 | 0 |         EXPECT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE $1 = $2", | 
| 418 | 0 |                                      table.table_name(), kKeyColumnName, i)); | 
| 419 | 0 |       } else { | 
| 420 |  |         // TODO(#6582) transactions currently don't work, so don't use ON CONFLICT DO NOTHING now. | 
| 421 | 0 |         EXPECT_OK(conn.ExecuteFormat("INSERT INTO $0($1) VALUES ($2)", // ON CONFLICT DO NOTHING", | 
| 422 | 0 |                                      table.table_name(), kKeyColumnName, i)); | 
| 423 | 0 |       } | 
| 424 | 0 |     } | 
| 425 | 0 |   } | 
| 426 |  |  | 
| 427 |  |   void WriteTransactionalWorkload(uint32_t start, uint32_t end, Cluster* cluster, | 
| 428 | 0 |                                   const YBTableName& table) { | 
| 429 | 0 |     auto conn = EXPECT_RESULT(cluster->ConnectToDB(table.namespace_name())); | 
| 430 | 0 |     EXPECT_OK(conn.Execute("BEGIN")); | 
| 431 | 0 |     for (uint32_t i = start; i < end; i++) { | 
| 432 | 0 |       EXPECT_OK(conn.ExecuteFormat("INSERT INTO $0($1) VALUES ($2) ON CONFLICT DO NOTHING", | 
| 433 | 0 |                                    table.table_name(), kKeyColumnName, i)); | 
| 434 | 0 |     } | 
| 435 | 0 |     EXPECT_OK(conn.Execute("COMMIT")); | 
| 436 | 0 |   } | 
| 437 |  |  | 
| 438 | 0 |   void DeleteWorkload(uint32_t start, uint32_t end, Cluster* cluster, const YBTableName& table) { | 
| 439 | 0 |     WriteWorkload(start, end, cluster, table, true /* delete_op */); | 
| 440 | 0 |   } | 
| 441 |  |  | 
| 442 | 0 |   PGResultPtr ScanToStrings(const YBTableName& table_name, Cluster* cluster) { | 
| 443 | 0 |     auto conn = EXPECT_RESULT(cluster->ConnectToDB(table_name.namespace_name())); | 
| 444 | 0 |     auto result = | 
| 445 | 0 |         EXPECT_RESULT(conn.FetchFormat("SELECT * FROM $0 ORDER BY key", table_name.table_name())); | 
| 446 | 0 |     return result; | 
| 447 | 0 |   } | 
| 448 |  |  | 
| 449 |  |   Status VerifyWrittenRecords(const YBTableName& producer_table, | 
| 450 | 0 |                               const YBTableName& consumer_table) { | 
| 451 | 0 |     return LoggedWaitFor([=]() -> Result<bool> { | 
| 452 | 0 |       auto producer_results = ScanToStrings(producer_table, &producer_cluster_); | 
| 453 | 0 |       auto consumer_results = ScanToStrings(consumer_table, &consumer_cluster_); | 
| 454 | 0 |       if (PQntuples(producer_results.get()) != PQntuples(consumer_results.get())) { | 
| 455 | 0 |         return false; | 
| 456 | 0 |       } | 
| 457 | 0 |       for (int i = 0; i < PQntuples(producer_results.get()); ++i) { | 
| 458 | 0 |         auto prod_val = EXPECT_RESULT(ToString(producer_results.get(), i, 0)); | 
| 459 | 0 |         auto cons_val = EXPECT_RESULT(ToString(consumer_results.get(), i, 0)); | 
| 460 | 0 |         if (prod_val != cons_val) { | 
| 461 | 0 |           return false; | 
| 462 | 0 |         } | 
| 463 | 0 |       } | 
| 464 | 0 |       return true; | 
| 465 | 0 |     }, MonoDelta::FromSeconds(kRpcTimeout), "Verify written records"); | 
| 466 | 0 |   } | 
| 467 |  |  | 
| 468 | 0 |   Status VerifyNumRecords(const YBTableName& table, Cluster* cluster, int expected_size) { | 
| 469 | 0 |     return LoggedWaitFor([=]() -> Result<bool> { | 
| 470 | 0 |       auto results = ScanToStrings(table, cluster); | 
| 471 | 0 |       return PQntuples(results.get()) == expected_size; | 
| 472 | 0 |     }, MonoDelta::FromSeconds(kRpcTimeout), "Verify number of records"); | 
| 473 | 0 |   } | 
| 474 |  |  | 
| 475 |  |   Status DeleteTable(Cluster* cluster, | 
| 476 | 0 |                      TableId* table_id /* = nullptr */) { | 
| 477 | 0 |     RETURN_NOT_OK(cluster->client_->DeleteTable(*table_id)); | 
| 478 |  | 
 | 
| 479 | 0 |     return Status::OK(); | 
| 480 | 0 |   } | 
| 481 |  |  | 
| 482 |  |   Status TruncateTable(Cluster* cluster, | 
| 483 | 0 |                        std::vector<string> table_ids) { | 
| 484 | 0 |     RETURN_NOT_OK(cluster->client_->TruncateTables(table_ids)); | 
| 485 | 0 |     return Status::OK(); | 
| 486 | 0 |   } | 
| 487 |  | }; | 
| 488 |  |  | 
| 489 |  | INSTANTIATE_TEST_CASE_P(TwoDCTestParams, TwoDCYsqlTest, | 
| 490 |  |                         ::testing::Values(TwoDCTestParams(1, true), TwoDCTestParams(1, false), | 
| 491 |  |                                           TwoDCTestParams(0, true), TwoDCTestParams(0, false))); | 
| 492 |  |  | 
| 493 |  |  | 
| 494 | 0 | TEST_P(TwoDCYsqlTest, SetupUniverseReplication) { | 
| 495 | 0 |   YB_SKIP_TEST_IN_TSAN(); | 
| 496 | 0 |   auto tables = ASSERT_RESULT(SetUpWithParams({8, 4}, {6, 6}, 3, 1, false /* colocated */)); | 
| 497 | 0 |   const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); | 
| 498 |  | 
 | 
| 499 | 0 |   std::vector<std::shared_ptr<client::YBTable>> producer_tables; | 
| 500 |  |   // tables contains both producer and consumer universe tables (alternately). | 
| 501 |  |   // Pick out just the producer tables from the list. | 
| 502 | 0 |   producer_tables.reserve(tables.size() / 2); | 
| 503 | 0 |   for (size_t i = 0; i < tables.size(); i += 2) { | 
| 504 | 0 |     producer_tables.push_back(tables[i]); | 
| 505 | 0 |   } | 
| 506 | 0 |   ASSERT_OK(SetupUniverseReplication( | 
| 507 | 0 |       producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); | 
| 508 |  |  | 
| 509 |  |   // Verify that universe was setup on consumer. | 
| 510 | 0 |   master::GetUniverseReplicationResponsePB resp; | 
| 511 | 0 |   ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); | 
| 512 | 0 |   ASSERT_EQ(resp.entry().producer_id(), kUniverseId); | 
| 513 | 0 |   ASSERT_EQ(resp.entry().tables_size(), producer_tables.size()); | 
| 514 | 0 |   for (size_t i = 0; i < producer_tables.size(); i++) { | 
| 515 | 0 |     ASSERT_EQ(resp.entry().tables(narrow_cast<int>(i)), producer_tables[i]->id()); | 
| 516 | 0 |   } | 
| 517 |  |  | 
| 518 |  |   // Verify that CDC streams were created on producer for all tables. | 
| 519 | 0 |   for (size_t i = 0; i < producer_tables.size(); i++) { | 
| 520 | 0 |     master::ListCDCStreamsResponsePB stream_resp; | 
| 521 | 0 |     ASSERT_OK(GetCDCStreamForTable(producer_tables[i]->id(), &stream_resp)); | 
| 522 | 0 |     ASSERT_EQ(stream_resp.streams_size(), 1); | 
| 523 | 0 |     ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), producer_tables[i]->id()); | 
| 524 | 0 |   } | 
| 525 |  | 
 | 
| 526 | 0 |   ASSERT_OK(DeleteUniverseReplication(kUniverseId)); | 
| 527 | 0 | } | 
| 528 |  |  | 
| 529 | 0 | TEST_P(TwoDCYsqlTest, SimpleReplication) { | 
| 530 | 0 |   YB_SKIP_TEST_IN_TSAN(); | 
| 531 | 0 |   constexpr int kNTabletsPerTable = 1; | 
| 532 | 0 |   std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; | 
| 533 | 0 |   auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1)); | 
| 534 | 0 |   const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); | 
| 535 |  |  | 
| 536 |  |   // tables contains both producer and consumer universe tables (alternately). | 
| 537 |  |   // Pick out just the producer tables from the list. | 
| 538 | 0 |   std::vector<std::shared_ptr<client::YBTable>> producer_tables; | 
| 539 | 0 |   std::vector<std::shared_ptr<client::YBTable>> consumer_tables; | 
| 540 | 0 |   producer_tables.reserve(tables.size() / 2); | 
| 541 | 0 |   consumer_tables.reserve(tables.size() / 2); | 
| 542 | 0 |   for (size_t i = 0; i < tables.size(); ++i) { | 
| 543 | 0 |     if (i % 2 == 0) { | 
| 544 | 0 |       producer_tables.push_back(tables[i]); | 
| 545 | 0 |     } else { | 
| 546 | 0 |       consumer_tables.push_back(tables[i]); | 
| 547 | 0 |     } | 
| 548 | 0 |   } | 
| 549 |  |  | 
| 550 |  |   // 1. Write some data. | 
| 551 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 552 | 0 |     LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); | 
| 553 | 0 |     WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); | 
| 554 | 0 |   } | 
| 555 |  |  | 
| 556 |  |   // Verify data is written on the producer. | 
| 557 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 558 | 0 |     auto producer_results = ScanToStrings(producer_table->name(), &producer_cluster_); | 
| 559 | 0 |     ASSERT_EQ(100, PQntuples(producer_results.get())); | 
| 560 | 0 |     int result; | 
| 561 | 0 |     for (int i = 0; i < 100; ++i) { | 
| 562 | 0 |       result = ASSERT_RESULT(GetInt32(producer_results.get(), i, 0)); | 
| 563 | 0 |       ASSERT_EQ(i, result); | 
| 564 | 0 |     } | 
| 565 | 0 |   } | 
| 566 |  |  | 
| 567 |  |   // 2. Setup replication. | 
| 568 | 0 |   ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), | 
| 569 | 0 |                                      kUniverseId, producer_tables)); | 
| 570 |  |  | 
| 571 |  |   // 3. Verify everything is setup correctly. | 
| 572 | 0 |   master::GetUniverseReplicationResponsePB get_universe_replication_resp; | 
| 573 | 0 |   ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, | 
| 574 | 0 |       &get_universe_replication_resp)); | 
| 575 | 0 |   ASSERT_OK(CorrectlyPollingAllTablets( | 
| 576 | 0 |       consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable))); | 
| 577 |  | 
 | 
| 578 | 0 |   auto data_replicated_correctly = [&](int num_results) -> Result<bool> { | 
| 579 | 0 |     for (const auto& consumer_table : consumer_tables) { | 
| 580 | 0 |       LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); | 
| 581 | 0 |       auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); | 
| 582 |  | 
 | 
| 583 | 0 |       if (num_results != PQntuples(consumer_results.get())) { | 
| 584 | 0 |         return false; | 
| 585 | 0 |       } | 
| 586 | 0 |       int result; | 
| 587 | 0 |       for (int i = 0; i < num_results; ++i) { | 
| 588 | 0 |         result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); | 
| 589 | 0 |         if (i != result) { | 
| 590 | 0 |           return false; | 
| 591 | 0 |         } | 
| 592 | 0 |       } | 
| 593 | 0 |     } | 
| 594 | 0 |     return true; | 
| 595 | 0 |   }; | 
| 596 | 0 |   ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); }, | 
| 597 | 0 |                     MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); | 
| 598 |  |  | 
| 599 |  |   // 4. Write more data. | 
| 600 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 601 | 0 |     WriteWorkload(100, 105, &producer_cluster_, producer_table->name()); | 
| 602 | 0 |   } | 
| 603 |  |  | 
| 604 |  |   // 5. Make sure this data is also replicated now. | 
| 605 | 0 |   ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(105); }, | 
| 606 | 0 |                     MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); | 
| 607 | 0 | } | 
| 608 |  |  | 
| 609 | 0 | TEST_P(TwoDCYsqlTest, SetupUniverseReplicationWithProducerBootstrapId) { | 
| 610 | 0 |   YB_SKIP_TEST_IN_TSAN(); | 
| 611 | 0 |   constexpr int kNTabletsPerTable = 1; | 
| 612 | 0 |   std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; | 
| 613 | 0 |   auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 3)); | 
| 614 | 0 |   const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); | 
| 615 | 0 |   auto* producer_leader_mini_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster()); | 
| 616 | 0 |   auto producer_master_proxy = std::make_shared<master::MasterReplicationProxy>( | 
| 617 | 0 |       &producer_client()->proxy_cache(), | 
| 618 | 0 |       producer_leader_mini_master->bound_rpc_addr()); | 
| 619 |  | 
 | 
| 620 | 0 |   std::unique_ptr<client::YBClient> client; | 
| 621 | 0 |   std::unique_ptr<cdc::CDCServiceProxy> producer_cdc_proxy; | 
| 622 | 0 |   client = ASSERT_RESULT(consumer_cluster()->CreateClient()); | 
| 623 | 0 |   producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>( | 
| 624 | 0 |       &client->proxy_cache(), | 
| 625 | 0 |       HostPort::FromBoundEndpoint(producer_cluster()->mini_tablet_server(0)->bound_rpc_addr())); | 
| 626 |  |  | 
| 627 |  |   // tables contains both producer and consumer universe tables (alternately). | 
| 628 |  |   // Pick out just the producer tables from the list. | 
| 629 | 0 |   std::vector<std::shared_ptr<client::YBTable>> producer_tables; | 
| 630 | 0 |   std::vector<std::shared_ptr<client::YBTable>> consumer_tables; | 
| 631 | 0 |   producer_tables.reserve(tables.size() / 2); | 
| 632 | 0 |   consumer_tables.reserve(tables.size() / 2); | 
| 633 | 0 |   for (size_t i = 0; i < tables.size(); ++i) { | 
| 634 | 0 |     if (i % 2 == 0) { | 
| 635 | 0 |       producer_tables.push_back(tables[i]); | 
| 636 | 0 |     } else { | 
| 637 | 0 |       consumer_tables.push_back(tables[i]); | 
| 638 | 0 |     } | 
| 639 | 0 |   } | 
| 640 |  |  | 
| 641 |  |   // 1. Write some data so that we can verify that only new records get replicated. | 
| 642 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 643 | 0 |     LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); | 
| 644 | 0 |     WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); | 
| 645 | 0 |   } | 
| 646 |  | 
 | 
| 647 | 0 |   SleepFor(MonoDelta::FromSeconds(10)); | 
| 648 | 0 |   cdc::BootstrapProducerRequestPB req; | 
| 649 | 0 |   cdc::BootstrapProducerResponsePB resp; | 
| 650 |  | 
 | 
| 651 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 652 | 0 |     req.add_table_ids(producer_table->id()); | 
| 653 | 0 |   } | 
| 654 |  | 
 | 
| 655 | 0 |   rpc::RpcController rpc; | 
| 656 | 0 |   ASSERT_OK(producer_cdc_proxy->BootstrapProducer(req, &resp, &rpc)); | 
| 657 | 0 |   ASSERT_FALSE(resp.has_error()); | 
| 658 |  | 
 | 
| 659 | 0 |   ASSERT_EQ(resp.cdc_bootstrap_ids().size(), producer_tables.size()); | 
| 660 |  | 
 | 
| 661 | 0 |   int table_idx = 0; | 
| 662 | 0 |   for (const auto& bootstrap_id : resp.cdc_bootstrap_ids()) { | 
| 663 | 0 |     LOG(INFO) << "Got bootstrap id " << bootstrap_id | 
| 664 | 0 |               << " for table " << producer_tables[table_idx++]->name().table_name(); | 
| 665 | 0 |   } | 
| 666 |  | 
 | 
| 667 | 0 |   std::unordered_map<std::string, int> tablet_bootstraps; | 
| 668 |  |  | 
| 669 |  |   // Verify that for each of the table's tablets, a new row in cdc_state table with the returned | 
| 670 |  |   // id was inserted. | 
| 671 | 0 |   client::TableHandle table; | 
| 672 | 0 |   client::YBTableName cdc_state_table( | 
| 673 | 0 |       YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); | 
| 674 | 0 |   ASSERT_OK(table.Open(cdc_state_table, producer_client())); | 
| 675 |  |  | 
| 676 |  |   // 2 tables with 8 tablets each. | 
| 677 | 0 |   ASSERT_EQ(tables_vector.size() * kNTabletsPerTable, boost::size(client::TableRange(table))); | 
| 678 | 0 |   int nrows = 0; | 
| 679 | 0 |   for (const auto& row : client::TableRange(table)) { | 
| 680 | 0 |     nrows++; | 
| 681 | 0 |     string stream_id = row.column(0).string_value(); | 
| 682 | 0 |     tablet_bootstraps[stream_id]++; | 
| 683 |  | 
 | 
| 684 | 0 |     string checkpoint = row.column(2).string_value(); | 
| 685 | 0 |     auto s = OpId::FromString(checkpoint); | 
| 686 | 0 |     ASSERT_OK(s); | 
| 687 | 0 |     OpId op_id = *s; | 
| 688 | 0 |     ASSERT_GT(op_id.index, 0); | 
| 689 |  | 
 | 
| 690 | 0 |     LOG(INFO) << "Bootstrap id " << stream_id | 
| 691 | 0 |               << " for tablet " << row.column(1).string_value(); | 
| 692 | 0 |   } | 
| 693 |  | 
 | 
| 694 | 0 |   ASSERT_EQ(tablet_bootstraps.size(), producer_tables.size()); | 
| 695 |  |   // Check that each bootstrap id has 8 tablets. | 
| 696 | 0 |   for (const auto& e : tablet_bootstraps) { | 
| 697 | 0 |     ASSERT_EQ(e.second, kNTabletsPerTable); | 
| 698 | 0 |   } | 
| 699 |  |  | 
| 700 |  |   // Map table -> bootstrap_id. We will need when setting up replication. | 
| 701 | 0 |   std::unordered_map<TableId, std::string> table_bootstrap_ids; | 
| 702 | 0 |   for (int i = 0; i < resp.cdc_bootstrap_ids_size(); i++) { | 
| 703 | 0 |     table_bootstrap_ids[req.table_ids(i)] = resp.cdc_bootstrap_ids(i); | 
| 704 | 0 |   } | 
| 705 |  |  | 
| 706 |  |   // 2. Setup replication. | 
| 707 | 0 |   master::SetupUniverseReplicationRequestPB setup_universe_req; | 
| 708 | 0 |   master::SetupUniverseReplicationResponsePB setup_universe_resp; | 
| 709 | 0 |   setup_universe_req.set_producer_id(kUniverseId); | 
| 710 | 0 |   string master_addr = producer_cluster()->GetMasterAddresses(); | 
| 711 | 0 |   auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); | 
| 712 | 0 |   HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); | 
| 713 |  | 
 | 
| 714 | 0 |   setup_universe_req.mutable_producer_table_ids()->Reserve( | 
| 715 | 0 |       narrow_cast<int>(producer_tables.size())); | 
| 716 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 717 | 0 |     setup_universe_req.add_producer_table_ids(producer_table->id()); | 
| 718 | 0 |     const auto& iter = table_bootstrap_ids.find(producer_table->id()); | 
| 719 | 0 |     ASSERT_NE(iter, table_bootstrap_ids.end()); | 
| 720 | 0 |     setup_universe_req.add_producer_bootstrap_ids(iter->second); | 
| 721 | 0 |   } | 
| 722 |  | 
 | 
| 723 | 0 |   auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster()); | 
| 724 | 0 |   master::MasterReplicationProxy master_proxy( | 
| 725 | 0 |       &consumer_client()->proxy_cache(), | 
| 726 | 0 |       consumer_leader_mini_master->bound_rpc_addr()); | 
| 727 |  | 
 | 
| 728 | 0 |   rpc.Reset(); | 
| 729 | 0 |   rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); | 
| 730 | 0 |   ASSERT_OK(master_proxy.SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc)); | 
| 731 | 0 |   ASSERT_FALSE(setup_universe_resp.has_error()); | 
| 732 |  |  | 
| 733 |  |   // 3. Verify everything is setup correctly. | 
| 734 | 0 |   master::GetUniverseReplicationResponsePB get_universe_replication_resp; | 
| 735 | 0 |   ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, | 
| 736 | 0 |       &get_universe_replication_resp)); | 
| 737 | 0 |   ASSERT_OK(CorrectlyPollingAllTablets( | 
| 738 | 0 |       consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable))); | 
| 739 |  |  | 
| 740 |  |   // 4. Write more data. | 
| 741 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 742 | 0 |     WriteWorkload(1000, 1005, &producer_cluster_, producer_table->name()); | 
| 743 | 0 |   } | 
| 744 |  |  | 
| 745 |  |   // 5. Verify that only new writes get replicated to consumer since we bootstrapped the producer | 
| 746 |  |   // after we had already written some data, therefore the old data (whatever was there before we | 
| 747 |  |   // bootstrapped the producer) should not be replicated. | 
| 748 | 0 |   auto data_replicated_correctly = [&]() -> Result<bool> { | 
| 749 | 0 |     for (const auto& consumer_table : consumer_tables) { | 
| 750 | 0 |       LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); | 
| 751 | 0 |       auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); | 
| 752 |  | 
 | 
| 753 | 0 |       if (5 != PQntuples(consumer_results.get())) { | 
| 754 | 0 |         return false; | 
| 755 | 0 |       } | 
| 756 | 0 |       int result; | 
| 757 | 0 |       for (int i = 0; i < 5; ++i) { | 
| 758 | 0 |         result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); | 
| 759 | 0 |         if ((1000 + i) != result) { | 
| 760 | 0 |           return false; | 
| 761 | 0 |         } | 
| 762 | 0 |       } | 
| 763 | 0 |     } | 
| 764 | 0 |     return true; | 
| 765 | 0 |   }; | 
| 766 |  | 
 | 
| 767 | 0 |   ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(); }, | 
| 768 | 0 |                     MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); | 
| 769 | 0 | } | 
| 770 |  |  | 
| 771 | 0 | TEST_P(TwoDCYsqlTest, ColocatedDatabaseReplication) { | 
| 772 | 0 |   YB_SKIP_TEST_IN_TSAN(); | 
| 773 | 0 |   constexpr int kNTabletsPerColocatedTable = 1; | 
| 774 | 0 |   constexpr int kNTabletsPerTable = 3; | 
| 775 | 0 |   std::vector<uint32_t> tables_vector = {kNTabletsPerColocatedTable, kNTabletsPerColocatedTable}; | 
| 776 |  |   // Create two colocated tables on each cluster. | 
| 777 | 0 |   auto colocated_tables = | 
| 778 | 0 |       ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 3, 1, true /* colocated */)); | 
| 779 | 0 |   const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); | 
| 780 |  |  | 
| 781 |  |   // Also create an additional non-colocated table in each database. | 
| 782 | 0 |   auto non_colocated_table = ASSERT_RESULT(CreateTable(&producer_cluster_, | 
| 783 | 0 |                                                        kNamespaceName, | 
| 784 | 0 |                                                        "test_table_2", | 
| 785 | 0 |                                                        boost::none /* tablegroup */, | 
| 786 | 0 |                                                        kNTabletsPerTable, | 
| 787 | 0 |                                                        false /* colocated */)); | 
| 788 | 0 |   std::shared_ptr<client::YBTable> non_colocated_producer_table; | 
| 789 | 0 |   ASSERT_OK(producer_client()->OpenTable(non_colocated_table, &non_colocated_producer_table)); | 
| 790 | 0 |   non_colocated_table = ASSERT_RESULT(CreateTable(&consumer_cluster_, | 
| 791 | 0 |                                                   kNamespaceName, | 
| 792 | 0 |                                                   "test_table_2", | 
| 793 | 0 |                                                   boost::none /* tablegroup */, | 
| 794 | 0 |                                                   kNTabletsPerTable, | 
| 795 | 0 |                                                   false /* colocated */)); | 
| 796 | 0 |   std::shared_ptr<client::YBTable> non_colocated_consumer_table; | 
| 797 | 0 |   ASSERT_OK(consumer_client()->OpenTable(non_colocated_table, &non_colocated_consumer_table)); | 
| 798 |  |  | 
| 799 |  |   // colocated_tables contains both producer and consumer universe tables (alternately). | 
| 800 |  |   // Pick out just the producer tables from the list. | 
| 801 | 0 |   std::vector<std::shared_ptr<client::YBTable>> producer_tables; | 
| 802 | 0 |   std::vector<std::shared_ptr<client::YBTable>> consumer_tables; | 
| 803 | 0 |   std::vector<std::shared_ptr<client::YBTable>> colocated_producer_tables; | 
| 804 | 0 |   std::vector<std::shared_ptr<client::YBTable>> colocated_consumer_tables; | 
| 805 | 0 |   producer_tables.reserve(colocated_tables.size() / 2 + 1); | 
| 806 | 0 |   consumer_tables.reserve(colocated_tables.size() / 2 + 1); | 
| 807 | 0 |   colocated_producer_tables.reserve(colocated_tables.size() / 2); | 
| 808 | 0 |   colocated_consumer_tables.reserve(colocated_tables.size() / 2); | 
| 809 | 0 |   for (size_t i = 0; i < colocated_tables.size(); ++i) { | 
| 810 | 0 |     if (i % 2 == 0) { | 
| 811 | 0 |       producer_tables.push_back(colocated_tables[i]); | 
| 812 | 0 |       colocated_producer_tables.push_back(colocated_tables[i]); | 
| 813 | 0 |     } else { | 
| 814 | 0 |       consumer_tables.push_back(colocated_tables[i]); | 
| 815 | 0 |       colocated_consumer_tables.push_back(colocated_tables[i]); | 
| 816 | 0 |     } | 
| 817 | 0 |   } | 
| 818 | 0 |   producer_tables.push_back(non_colocated_producer_table); | 
| 819 | 0 |   consumer_tables.push_back(non_colocated_consumer_table); | 
| 820 |  |  | 
| 821 |  |   // 1. Write some data to all tables. | 
| 822 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 823 | 0 |     LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); | 
| 824 | 0 |     WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); | 
| 825 | 0 |   } | 
| 826 |  |  | 
| 827 |  |   // 2. Setup replication for only the colocated tables. | 
| 828 |  |   // Get the producer namespace id, so we can construct the colocated parent table id. | 
| 829 | 0 |   GetNamespaceInfoResponsePB ns_resp; | 
| 830 | 0 |   ASSERT_OK(producer_client()->GetNamespaceInfo("", kNamespaceName, YQL_DATABASE_PGSQL, &ns_resp)); | 
| 831 |  | 
 | 
| 832 | 0 |   rpc::RpcController rpc; | 
| 833 | 0 |   master::SetupUniverseReplicationRequestPB setup_universe_req; | 
| 834 | 0 |   master::SetupUniverseReplicationResponsePB setup_universe_resp; | 
| 835 | 0 |   setup_universe_req.set_producer_id(kUniverseId); | 
| 836 | 0 |   string master_addr = producer_cluster()->GetMasterAddresses(); | 
| 837 | 0 |   auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); | 
| 838 | 0 |   HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); | 
| 839 |  |   // Only need to add the colocated parent table id. | 
| 840 | 0 |   setup_universe_req.mutable_producer_table_ids()->Reserve(1); | 
| 841 | 0 |   setup_universe_req.add_producer_table_ids( | 
| 842 | 0 |       ns_resp.namespace_().id() + master::kColocatedParentTableIdSuffix); | 
| 843 | 0 |   auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster()); | 
| 844 | 0 |   auto master_proxy = std::make_shared<master::MasterReplicationProxy>( | 
| 845 | 0 |       &consumer_client()->proxy_cache(), | 
| 846 | 0 |       consumer_leader_mini_master->bound_rpc_addr()); | 
| 847 |  | 
 | 
| 848 | 0 |   rpc.Reset(); | 
| 849 | 0 |   rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); | 
| 850 | 0 |   ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc)); | 
| 851 | 0 |   ASSERT_FALSE(setup_universe_resp.has_error()); | 
| 852 |  |  | 
| 853 |  |   // 3. Verify everything is setup correctly. | 
| 854 | 0 |   master::GetUniverseReplicationResponsePB get_universe_replication_resp; | 
| 855 | 0 |   ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, | 
| 856 | 0 |       &get_universe_replication_resp)); | 
| 857 | 0 |   ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), kNTabletsPerColocatedTable)); | 
| 858 |  |  | 
| 859 |  |   // 4. Check that colocated tables are being replicated. | 
| 860 | 0 |   auto data_replicated_correctly = [&](int num_results, bool onlyColocated) -> Result<bool> { | 
| 861 | 0 |     auto &tables = onlyColocated ? colocated_consumer_tables : consumer_tables; | 
| 862 | 0 |     for (const auto& consumer_table : tables) { | 
| 863 | 0 |       LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); | 
| 864 | 0 |       auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); | 
| 865 |  | 
 | 
| 866 | 0 |       if (num_results != PQntuples(consumer_results.get())) { | 
| 867 | 0 |         return false; | 
| 868 | 0 |       } | 
| 869 | 0 |       int result; | 
| 870 | 0 |       for (int i = 0; i < num_results; ++i) { | 
| 871 | 0 |         result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); | 
| 872 | 0 |         if (i != result) { | 
| 873 | 0 |           return false; | 
| 874 | 0 |         } | 
| 875 | 0 |       } | 
| 876 | 0 |     } | 
| 877 | 0 |     return true; | 
| 878 | 0 |   }; | 
| 879 | 0 |   ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100, true); }, | 
| 880 | 0 |                     MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); | 
| 881 |  |   // Ensure that the non colocated table is not replicated. | 
| 882 | 0 |   auto non_coloc_results = ScanToStrings(non_colocated_consumer_table->name(), &consumer_cluster_); | 
| 883 | 0 |   ASSERT_EQ(0, PQntuples(non_coloc_results.get())); | 
| 884 |  |  | 
| 885 |  |   // 5. Add the regular table to replication. | 
| 886 |  |   // Prepare and send AlterUniverseReplication command. | 
| 887 | 0 |   master::AlterUniverseReplicationRequestPB alter_req; | 
| 888 | 0 |   master::AlterUniverseReplicationResponsePB alter_resp; | 
| 889 | 0 |   alter_req.set_producer_id(kUniverseId); | 
| 890 | 0 |   alter_req.add_producer_table_ids_to_add(non_colocated_producer_table->id()); | 
| 891 |  | 
 | 
| 892 | 0 |   rpc.Reset(); | 
| 893 | 0 |   rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); | 
| 894 | 0 |   ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc)); | 
| 895 | 0 |   ASSERT_FALSE(alter_resp.has_error()); | 
| 896 |  |   // Wait until we have 2 tables (colocated tablet + regular table) logged. | 
| 897 | 0 |   ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { | 
| 898 | 0 |     master::GetUniverseReplicationResponsePB tmp_resp; | 
| 899 | 0 |     return VerifyUniverseReplication(consumer_cluster(), consumer_client(), | 
| 900 | 0 |         kUniverseId, &tmp_resp).ok() && | 
| 901 | 0 |         tmp_resp.entry().tables_size() == 2; | 
| 902 | 0 |   }, MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter.")); | 
| 903 |  | 
 | 
| 904 | 0 |   ASSERT_OK(CorrectlyPollingAllTablets( | 
| 905 | 0 |       consumer_cluster(), kNTabletsPerColocatedTable + kNTabletsPerTable)); | 
| 906 |  |   // Check that all data is replicated for the new table as well. | 
| 907 | 0 |   ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100, false); }, | 
| 908 | 0 |                     MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); | 
| 909 |  |  | 
| 910 |  |   // 6. Add additional data to all tables | 
| 911 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 912 | 0 |     LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); | 
| 913 | 0 |     WriteWorkload(100, 150, &producer_cluster_, producer_table->name()); | 
| 914 | 0 |   } | 
| 915 |  |  | 
| 916 |  |   // 7. Verify all tables are properly replicated. | 
| 917 | 0 |   ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(150, false); }, | 
| 918 | 0 |                     MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); | 
| 919 | 0 | } | 
| 920 |  |  | 
| 921 | 0 | TEST_P(TwoDCYsqlTest, ColocatedDatabaseDifferentTableOids) { | 
| 922 | 0 |   YB_SKIP_TEST_IN_TSAN(); | 
| 923 | 0 |   auto colocated_tables = ASSERT_RESULT(SetUpWithParams({}, {}, 3, 1, true /* colocated */)); | 
| 924 | 0 |   const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); | 
| 925 |  |  | 
| 926 |  |   // Create two tables with different table oids. | 
| 927 | 0 |   auto conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(kNamespaceName)); | 
| 928 | 0 |   auto table_info = ASSERT_RESULT(CreateTable(&producer_cluster_, | 
| 929 | 0 |                                               kNamespaceName, | 
| 930 | 0 |                                               "test_table_0", | 
| 931 | 0 |                                               boost::none /* tablegroup */, | 
| 932 | 0 |                                               1 /* num_tablets */, | 
| 933 | 0 |                                               true /* colocated */, | 
| 934 | 0 |                                               123456 /* table_oid */)); | 
| 935 | 0 |   ASSERT_RESULT(CreateTable(&consumer_cluster_, | 
| 936 | 0 |                             kNamespaceName, | 
| 937 | 0 |                             "test_table_0", | 
| 938 | 0 |                             boost::none /* tablegroup */, | 
| 939 | 0 |                             1 /* num_tablets */, | 
| 940 | 0 |                             true /* colocated */, | 
| 941 | 0 |                             123457 /* table_oid */)); | 
| 942 | 0 |   std::shared_ptr<client::YBTable> producer_table; | 
| 943 | 0 |   ASSERT_OK(producer_client()->OpenTable(table_info, &producer_table)); | 
| 944 |  |  | 
| 945 |  |   // Try to setup replication, should fail on schema validation due to different table oids. | 
| 946 | 0 |   ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), | 
| 947 | 0 |                                      kUniverseId, {producer_table})); | 
| 948 | 0 |   master::GetUniverseReplicationResponsePB get_universe_replication_resp; | 
| 949 | 0 |   ASSERT_NOK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, | 
| 950 | 0 |       &get_universe_replication_resp)); | 
| 951 | 0 | } | 
| 952 |  |  | 
| 953 | 0 | TEST_P(TwoDCYsqlTest, TablegroupReplication) { | 
| 954 | 0 |   YB_SKIP_TEST_IN_TSAN(); | 
| 955 |  | 
 | 
| 956 | 0 |   std::vector<uint32_t> tables_vector = {1, 1}; | 
| 957 | 0 |   boost::optional<std::string> kTablegroupName("mytablegroup"); | 
| 958 | 0 |   auto tables = ASSERT_RESULT( | 
| 959 | 0 |       SetUpWithParams(tables_vector, tables_vector, 1, 1, false /* colocated */, kTablegroupName)); | 
| 960 | 0 |   const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); | 
| 961 |  |  | 
| 962 |  |   // tables contains both producer and consumer universe tables (alternately). | 
| 963 |  |   // Pick out just the producer tables from the list. | 
| 964 | 0 |   std::vector<std::shared_ptr<client::YBTable>> producer_tables; | 
| 965 | 0 |   std::vector<std::shared_ptr<client::YBTable>> consumer_tables; | 
| 966 | 0 |   producer_tables.reserve(tables.size() / 2); | 
| 967 | 0 |   consumer_tables.reserve(tables.size() / 2); | 
| 968 | 0 |   for (size_t i = 0; i < tables.size(); ++i) { | 
| 969 | 0 |     if (i % 2 == 0) { | 
| 970 | 0 |       producer_tables.push_back(tables[i]); | 
| 971 | 0 |     } else { | 
| 972 | 0 |       consumer_tables.push_back(tables[i]); | 
| 973 | 0 |     } | 
| 974 | 0 |   } | 
| 975 |  |  | 
| 976 |  |   // 1. Write some data to all tables. | 
| 977 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 978 | 0 |     LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); | 
| 979 | 0 |     WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); | 
| 980 | 0 |   } | 
| 981 |  |  | 
| 982 |  |   // 2. Setup replication for the tablegroup. | 
| 983 | 0 |   auto tablegroup_id = ASSERT_RESULT(GetTablegroup(&producer_cluster_, kNamespaceName)); | 
| 984 | 0 |   LOG(INFO) << "Tablegroup id to replicate: " << tablegroup_id; | 
| 985 |  | 
 | 
| 986 | 0 |   rpc::RpcController rpc; | 
| 987 | 0 |   master::SetupUniverseReplicationRequestPB setup_universe_req; | 
| 988 | 0 |   master::SetupUniverseReplicationResponsePB setup_universe_resp; | 
| 989 | 0 |   setup_universe_req.set_producer_id(kUniverseId); | 
| 990 | 0 |   string master_addr = producer_cluster()->GetMasterAddresses(); | 
| 991 | 0 |   auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); | 
| 992 | 0 |   HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); | 
| 993 | 0 |   setup_universe_req.mutable_producer_table_ids()->Reserve(1); | 
| 994 | 0 |   setup_universe_req.add_producer_table_ids(tablegroup_id); | 
| 995 |  | 
 | 
| 996 | 0 |   auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster()); | 
| 997 | 0 |   auto master_proxy = std::make_shared<master::MasterReplicationProxy>( | 
| 998 | 0 |       &consumer_client()->proxy_cache(), | 
| 999 | 0 |       consumer_leader_mini_master->bound_rpc_addr()); | 
| 1000 |  | 
 | 
| 1001 | 0 |   rpc.Reset(); | 
| 1002 | 0 |   rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); | 
| 1003 | 0 |   ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc)); | 
| 1004 | 0 |   ASSERT_FALSE(setup_universe_resp.has_error()); | 
| 1005 |  |  | 
| 1006 |  |   // 3. Verify everything is setup correctly. | 
| 1007 | 0 |   master::GetUniverseReplicationResponsePB get_universe_replication_resp; | 
| 1008 | 0 |   ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, | 
| 1009 | 0 |       &get_universe_replication_resp)); | 
| 1010 | 0 |   ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1)); | 
| 1011 |  |  | 
| 1012 |  |   // 4. Check that tables are being replicated. | 
| 1013 | 0 |   auto data_replicated_correctly = [&](int num_results) -> Result<bool> { | 
| 1014 | 0 |     for (const auto& consumer_table : consumer_tables) { | 
| 1015 | 0 |       LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); | 
| 1016 | 0 |       auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); | 
| 1017 |  | 
 | 
| 1018 | 0 |       if (num_results != PQntuples(consumer_results.get())) { | 
| 1019 | 0 |         return false; | 
| 1020 | 0 |       } | 
| 1021 | 0 |       int result; | 
| 1022 | 0 |       for (int i = 0; i < num_results; ++i) { | 
| 1023 | 0 |         result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); | 
| 1024 | 0 |         if (i != result) { | 
| 1025 | 0 |           return false; | 
| 1026 | 0 |         } | 
| 1027 | 0 |       } | 
| 1028 | 0 |     } | 
| 1029 | 0 |     return true; | 
| 1030 | 0 |   }; | 
| 1031 |  | 
 | 
| 1032 | 0 |   ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); }, | 
| 1033 | 0 |                     MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); | 
| 1034 |  |  | 
| 1035 |  |   // 5. Write more data. | 
| 1036 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 1037 | 0 |     WriteWorkload(100, 105, &producer_cluster_, producer_table->name()); | 
| 1038 | 0 |   } | 
| 1039 |  | 
 | 
| 1040 | 0 |   ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(105); }, | 
| 1041 | 0 |                     MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); | 
| 1042 | 0 | } | 
| 1043 |  |  | 
| 1044 | 0 | TEST_P(TwoDCYsqlTest, TablegroupReplicationMismatch) { | 
| 1045 | 0 |   YB_SKIP_TEST_IN_TSAN(); | 
| 1046 | 0 |   ASSERT_OK(Initialize(1 /* replication_factor */)); | 
| 1047 |  | 
 | 
| 1048 | 0 |   boost::optional<std::string> tablegroup_name("mytablegroup"); | 
| 1049 |  | 
 | 
| 1050 | 0 |   ASSERT_OK(CreateDatabase(&producer_cluster_, kNamespaceName, false /* colocated */)); | 
| 1051 | 0 |   ASSERT_OK(CreateDatabase(&consumer_cluster_, kNamespaceName, false /* colocated */)); | 
| 1052 | 0 |   ASSERT_OK(CreateTablegroup(&producer_cluster_, kNamespaceName, tablegroup_name.get())); | 
| 1053 | 0 |   ASSERT_OK(CreateTablegroup(&consumer_cluster_, kNamespaceName, tablegroup_name.get())); | 
| 1054 |  |  | 
| 1055 |  |   // We intentionally set up so that the number of producer and consumer tables don't match. | 
| 1056 |  |   // The replication should fail during validation. | 
| 1057 | 0 |   const uint32_t num_producer_tables = 2; | 
| 1058 | 0 |   const uint32_t num_consumer_tables = 3; | 
| 1059 | 0 |   std::vector<YBTableName> tables; | 
| 1060 | 0 |   for (uint32_t i = 0; i < num_producer_tables; i++) { | 
| 1061 | 0 |     ASSERT_OK(CreateTable(i, 1 /* num_tablets */, &producer_cluster_, | 
| 1062 | 0 |                           &tables, tablegroup_name, false /* colocated */)); | 
| 1063 | 0 |   } | 
| 1064 | 0 |   for (uint32_t i = 0; i < num_consumer_tables; i++) { | 
| 1065 | 0 |     ASSERT_OK(CreateTable(i, 1 /* num_tablets */, &consumer_cluster_, | 
| 1066 | 0 |                           &tables, tablegroup_name, false /* colocated */)); | 
| 1067 | 0 |   } | 
| 1068 |  | 
 | 
| 1069 | 0 |   auto tablegroup_id = ASSERT_RESULT(GetTablegroup(&producer_cluster_, kNamespaceName)); | 
| 1070 |  |  | 
| 1071 |  |   // Try to set up replication. | 
| 1072 | 0 |   rpc::RpcController rpc; | 
| 1073 | 0 |   master::SetupUniverseReplicationRequestPB setup_universe_req; | 
| 1074 | 0 |   master::SetupUniverseReplicationResponsePB setup_universe_resp; | 
| 1075 | 0 |   setup_universe_req.set_producer_id(kUniverseId); | 
| 1076 | 0 |   string master_addr = producer_cluster()->GetMasterAddresses(); | 
| 1077 | 0 |   auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); | 
| 1078 | 0 |   HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); | 
| 1079 | 0 |   setup_universe_req.mutable_producer_table_ids()->Reserve(1); | 
| 1080 | 0 |   setup_universe_req.add_producer_table_ids(tablegroup_id); | 
| 1081 |  | 
 | 
| 1082 | 0 |   auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster()); | 
| 1083 | 0 |   auto master_proxy = std::make_shared<master::MasterReplicationProxy>( | 
| 1084 | 0 |       &consumer_client()->proxy_cache(), | 
| 1085 | 0 |       consumer_leader_mini_master->bound_rpc_addr()); | 
| 1086 |  | 
 | 
| 1087 | 0 |   rpc.Reset(); | 
| 1088 | 0 |   rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); | 
| 1089 | 0 |   ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc)); | 
| 1090 | 0 |   ASSERT_FALSE(setup_universe_resp.has_error()); | 
| 1091 |  |  | 
| 1092 |  |   // The schema validation should fail. | 
| 1093 | 0 |   master::GetUniverseReplicationResponsePB get_universe_replication_resp; | 
| 1094 | 0 |   ASSERT_NOK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, | 
| 1095 | 0 |       &get_universe_replication_resp)); | 
| 1096 | 0 | } | 
| 1097 |  |  | 
| 1098 |  | // TODO adapt rest of twodc-test.cc tests. | 
| 1099 |  |  | 
| 1100 | 0 | TEST_P(TwoDCYsqlTest, DeleteTableChecks) { | 
| 1101 | 0 |   YB_SKIP_TEST_IN_TSAN(); | 
| 1102 | 0 |   constexpr int kNTabletsPerTable = 1; | 
| 1103 | 0 |   std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; | 
| 1104 | 0 |   auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1)); | 
| 1105 | 0 |   const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); | 
| 1106 |  |  | 
| 1107 |  |   // tables contains both producer and consumer universe tables (alternately). | 
| 1108 |  |   // Pick out just the producer tables from the list. | 
| 1109 | 0 |   std::vector<std::shared_ptr<client::YBTable>> producer_tables; | 
| 1110 | 0 |   std::vector<std::shared_ptr<client::YBTable>> consumer_tables; | 
| 1111 | 0 |   producer_tables.reserve(tables.size() / 2); | 
| 1112 | 0 |   consumer_tables.reserve(tables.size() / 2); | 
| 1113 | 0 |   for (size_t i = 0; i < tables.size(); ++i) { | 
| 1114 | 0 |     if (i % 2 == 0) { | 
| 1115 | 0 |       producer_tables.push_back(tables[i]); | 
| 1116 | 0 |     } else { | 
| 1117 | 0 |       consumer_tables.push_back(tables[i]); | 
| 1118 | 0 |     } | 
| 1119 | 0 |   } | 
| 1120 |  |  | 
| 1121 |  |   // 1. Write some data. | 
| 1122 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 1123 | 0 |     LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); | 
| 1124 | 0 |     WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); | 
| 1125 | 0 |   } | 
| 1126 |  |  | 
| 1127 |  |   // Verify data is written on the producer. | 
| 1128 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 1129 | 0 |     auto producer_results = ScanToStrings(producer_table->name(), &producer_cluster_); | 
| 1130 | 0 |     ASSERT_EQ(100, PQntuples(producer_results.get())); | 
| 1131 | 0 |     int result; | 
| 1132 | 0 |     for (int i = 0; i < 100; ++i) { | 
| 1133 | 0 |       result = ASSERT_RESULT(GetInt32(producer_results.get(), i, 0)); | 
| 1134 | 0 |       ASSERT_EQ(i, result); | 
| 1135 | 0 |     } | 
| 1136 | 0 |   } | 
| 1137 |  |  | 
| 1138 |  |   // 2. Setup replication. | 
| 1139 | 0 |   ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), | 
| 1140 | 0 |                                      kUniverseId, producer_tables)); | 
| 1141 |  |  | 
| 1142 |  |   // 3. Verify everything is setup correctly. | 
| 1143 | 0 |   master::GetUniverseReplicationResponsePB get_universe_replication_resp; | 
| 1144 | 0 |   ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, | 
| 1145 | 0 |       &get_universe_replication_resp)); | 
| 1146 | 0 |   ASSERT_OK(CorrectlyPollingAllTablets( | 
| 1147 | 0 |       consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable))); | 
| 1148 |  | 
 | 
| 1149 | 0 |   auto data_replicated_correctly = [&](int num_results) -> Result<bool> { | 
| 1150 | 0 |     for (const auto& consumer_table : consumer_tables) { | 
| 1151 | 0 |       LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); | 
| 1152 | 0 |       auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); | 
| 1153 |  | 
 | 
| 1154 | 0 |       if (num_results != PQntuples(consumer_results.get())) { | 
| 1155 | 0 |         return false; | 
| 1156 | 0 |       } | 
| 1157 | 0 |       int result; | 
| 1158 | 0 |       for (int i = 0; i < num_results; ++i) { | 
| 1159 | 0 |         result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); | 
| 1160 | 0 |         if (i != result) { | 
| 1161 | 0 |           return false; | 
| 1162 | 0 |         } | 
| 1163 | 0 |       } | 
| 1164 | 0 |     } | 
| 1165 | 0 |     return true; | 
| 1166 | 0 |   }; | 
| 1167 | 0 |   ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); }, | 
| 1168 | 0 |                     MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); | 
| 1169 |  |  | 
| 1170 |  |   // Attempt to destroy the producer and consumer tables. | 
| 1171 | 0 |   string producer_table_name = producer_tables[0]->name().ToString(); | 
| 1172 | 0 |   string producer_table_id = producer_tables[0]->id(); | 
| 1173 | 0 |   string consumer_table_name = consumer_tables[0]->name().ToString(); | 
| 1174 | 0 |   string consumer_table_id = consumer_tables[0]->id(); | 
| 1175 | 0 |   ASSERT_NOK(DeleteTable(&producer_cluster_, &producer_table_id)); | 
| 1176 | 0 |   ASSERT_NOK(DeleteTable(&consumer_cluster_, &consumer_table_id)); | 
| 1177 |  | 
 | 
| 1178 | 0 |   FLAGS_enable_delete_truncate_xcluster_replicated_table = true; | 
| 1179 | 0 |   ASSERT_OK(DeleteTable(&producer_cluster_, &producer_table_id)); | 
| 1180 | 0 |   ASSERT_OK(DeleteTable(&consumer_cluster_, &consumer_table_id)); | 
| 1181 | 0 | } | 
| 1182 |  |  | 
| 1183 | 0 | TEST_P(TwoDCYsqlTest, TruncateTableChecks) { | 
| 1184 | 0 |   YB_SKIP_TEST_IN_TSAN(); | 
| 1185 | 0 |   constexpr int kNTabletsPerTable = 1; | 
| 1186 | 0 |   std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; | 
| 1187 | 0 |   auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1)); | 
| 1188 | 0 |   const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); | 
| 1189 |  |  | 
| 1190 |  |   // tables contains both producer and consumer universe tables (alternately). | 
| 1191 |  |   // Pick out just the producer tables from the list. | 
| 1192 | 0 |   std::vector<std::shared_ptr<client::YBTable>> producer_tables; | 
| 1193 | 0 |   std::vector<std::shared_ptr<client::YBTable>> consumer_tables; | 
| 1194 | 0 |   producer_tables.reserve(tables.size() / 2); | 
| 1195 | 0 |   consumer_tables.reserve(tables.size() / 2); | 
| 1196 | 0 |   for (size_t i = 0; i < tables.size(); ++i) { | 
| 1197 | 0 |     if (i % 2 == 0) { | 
| 1198 | 0 |       producer_tables.push_back(tables[i]); | 
| 1199 | 0 |     } else { | 
| 1200 | 0 |       consumer_tables.push_back(tables[i]); | 
| 1201 | 0 |     } | 
| 1202 | 0 |   } | 
| 1203 |  |  | 
| 1204 |  |   // 1. Write some data. | 
| 1205 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 1206 | 0 |     LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); | 
| 1207 | 0 |     WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); | 
| 1208 | 0 |   } | 
| 1209 |  |  | 
| 1210 |  |   // Verify data is written on the producer. | 
| 1211 | 0 |   for (const auto& producer_table : producer_tables) { | 
| 1212 | 0 |     auto producer_results = ScanToStrings(producer_table->name(), &producer_cluster_); | 
| 1213 | 0 |     ASSERT_EQ(100, PQntuples(producer_results.get())); | 
| 1214 | 0 |     int result; | 
| 1215 | 0 |     for (int i = 0; i < 100; ++i) { | 
| 1216 | 0 |       result = ASSERT_RESULT(GetInt32(producer_results.get(), i, 0)); | 
| 1217 | 0 |       ASSERT_EQ(i, result); | 
| 1218 | 0 |     } | 
| 1219 | 0 |   } | 
| 1220 |  |  | 
| 1221 |  |   // 2. Setup replication. | 
| 1222 | 0 |   ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), | 
| 1223 | 0 |                                      kUniverseId, producer_tables)); | 
| 1224 |  |  | 
| 1225 |  |   // 3. Verify everything is setup correctly. | 
| 1226 | 0 |   master::GetUniverseReplicationResponsePB get_universe_replication_resp; | 
| 1227 | 0 |   ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, | 
| 1228 | 0 |       &get_universe_replication_resp)); | 
| 1229 | 0 |   ASSERT_OK(CorrectlyPollingAllTablets( | 
| 1230 | 0 |       consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable))); | 
| 1231 |  | 
 | 
| 1232 | 0 |   auto data_replicated_correctly = [&](int num_results) -> Result<bool> { | 
| 1233 | 0 |     for (const auto& consumer_table : consumer_tables) { | 
| 1234 | 0 |       LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); | 
| 1235 | 0 |       auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); | 
| 1236 |  | 
 | 
| 1237 | 0 |       if (num_results != PQntuples(consumer_results.get())) { | 
| 1238 | 0 |         return false; | 
| 1239 | 0 |       } | 
| 1240 | 0 |       int result; | 
| 1241 | 0 |       for (int i = 0; i < num_results; ++i) { | 
| 1242 | 0 |         result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); | 
| 1243 | 0 |         if (i != result) { | 
| 1244 | 0 |           return false; | 
| 1245 | 0 |         } | 
| 1246 | 0 |       } | 
| 1247 | 0 |     } | 
| 1248 | 0 |     return true; | 
| 1249 | 0 |   }; | 
| 1250 | 0 |   ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); }, | 
| 1251 | 0 |                     MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); | 
| 1252 |  |  | 
| 1253 |  |   // Attempt to Truncate the producer and consumer tables. | 
| 1254 | 0 |   string producer_table_id = producer_tables[0]->id(); | 
| 1255 | 0 |   string consumer_table_id = consumer_tables[0]->id(); | 
| 1256 | 0 |   ASSERT_NOK(TruncateTable(&producer_cluster_, {producer_table_id})); | 
| 1257 | 0 |   ASSERT_NOK(TruncateTable(&consumer_cluster_, {consumer_table_id})); | 
| 1258 |  | 
 | 
| 1259 | 0 |   FLAGS_enable_delete_truncate_xcluster_replicated_table = true; | 
| 1260 | 0 |   ASSERT_OK(TruncateTable(&producer_cluster_, {producer_table_id})); | 
| 1261 | 0 |   ASSERT_OK(TruncateTable(&consumer_cluster_, {consumer_table_id})); | 
| 1262 | 0 | } | 
| 1263 |  |  | 
| 1264 |  | } // namespace enterprise | 
| 1265 |  | } // namespace yb |