/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/twodc_ysql-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <algorithm> |
15 | | #include <map> |
16 | | #include <string> |
17 | | #include <utility> |
18 | | #include <chrono> |
19 | | #include <boost/assign.hpp> |
20 | | #include <gflags/gflags.h> |
21 | | #include <gtest/gtest.h> |
22 | | |
23 | | #include "yb/common/common.pb.h" |
24 | | #include "yb/common/entity_ids.h" |
25 | | #include "yb/common/ql_value.h" |
26 | | #include "yb/common/schema.h" |
27 | | #include "yb/common/wire_protocol.h" |
28 | | |
29 | | #include "yb/cdc/cdc_service.h" |
30 | | #include "yb/cdc/cdc_service.pb.h" |
31 | | #include "yb/cdc/cdc_service.proxy.h" |
32 | | #include "yb/client/client.h" |
33 | | #include "yb/client/client-test-util.h" |
34 | | #include "yb/client/meta_cache.h" |
35 | | #include "yb/client/schema.h" |
36 | | #include "yb/client/session.h" |
37 | | #include "yb/client/table.h" |
38 | | #include "yb/client/table_alterer.h" |
39 | | #include "yb/client/table_creator.h" |
40 | | #include "yb/client/table_handle.h" |
41 | | #include "yb/client/transaction.h" |
42 | | #include "yb/client/yb_op.h" |
43 | | |
44 | | #include "yb/gutil/stl_util.h" |
45 | | #include "yb/gutil/strings/join.h" |
46 | | #include "yb/gutil/strings/substitute.h" |
47 | | #include "yb/integration-tests/cdc_test_util.h" |
48 | | #include "yb/integration-tests/mini_cluster.h" |
49 | | #include "yb/integration-tests/twodc_test_base.h" |
50 | | #include "yb/integration-tests/yb_mini_cluster_test_base.h" |
51 | | |
52 | | #include "yb/master/cdc_consumer_registry_service.h" |
53 | | #include "yb/master/mini_master.h" |
54 | | #include "yb/master/master.h" |
55 | | #include "yb/master/master_cluster.proxy.h" |
56 | | #include "yb/master/master_ddl.proxy.h" |
57 | | #include "yb/master/master_replication.proxy.h" |
58 | | #include "yb/master/master-test-util.h" |
59 | | #include "yb/master/sys_catalog_initialization.h" |
60 | | |
61 | | #include "yb/rpc/rpc_controller.h" |
62 | | #include "yb/tablet/tablet.h" |
63 | | #include "yb/tablet/tablet_peer.h" |
64 | | #include "yb/tserver/mini_tablet_server.h" |
65 | | #include "yb/tserver/tablet_server.h" |
66 | | #include "yb/tserver/ts_tablet_manager.h" |
67 | | |
68 | | #include "yb/tserver/cdc_consumer.h" |
69 | | #include "yb/util/atomic.h" |
70 | | #include "yb/util/faststring.h" |
71 | | #include "yb/util/format.h" |
72 | | #include "yb/util/monotime.h" |
73 | | #include "yb/util/random.h" |
74 | | #include "yb/util/random_util.h" |
75 | | #include "yb/util/result.h" |
76 | | #include "yb/util/stopwatch.h" |
77 | | #include "yb/util/test_util.h" |
78 | | #include "yb/util/test_macros.h" |
79 | | #include "yb/yql/pgwrapper/libpq_utils.h" |
80 | | #include "yb/yql/pgwrapper/pg_wrapper.h" |
81 | | |
82 | | DECLARE_int32(replication_factor); |
83 | | DECLARE_int32(cdc_max_apply_batch_num_records); |
84 | | DECLARE_int32(client_read_write_timeout_ms); |
85 | | DECLARE_int32(pgsql_proxy_webserver_port); |
86 | | DECLARE_bool(enable_ysql); |
87 | | DECLARE_bool(hide_pg_catalog_table_creation_logs); |
88 | | DECLARE_bool(master_auto_run_initdb); |
89 | | DECLARE_int32(pggate_rpc_timeout_secs); |
90 | | DECLARE_bool(enable_delete_truncate_xcluster_replicated_table); |
91 | | |
92 | | namespace yb { |
93 | | |
94 | | using client::YBClient; |
95 | | using client::YBClientBuilder; |
96 | | using client::YBColumnSchema; |
97 | | using client::YBError; |
98 | | using client::YBSchema; |
99 | | using client::YBSchemaBuilder; |
100 | | using client::YBSession; |
101 | | using client::YBTable; |
102 | | using client::YBTableAlterer; |
103 | | using client::YBTableCreator; |
104 | | using client::YBTableType; |
105 | | using client::YBTableName; |
106 | | using master::GetNamespaceInfoResponsePB; |
107 | | using master::MiniMaster; |
108 | | using tserver::MiniTabletServer; |
109 | | using tserver::enterprise::CDCConsumer; |
110 | | |
111 | | using pgwrapper::ToString; |
112 | | using pgwrapper::GetInt32; |
113 | | using pgwrapper::PGConn; |
114 | | using pgwrapper::PGResultPtr; |
115 | | using pgwrapper::PgSupervisor; |
116 | | |
117 | | namespace enterprise { |
118 | | |
119 | | constexpr static const char* const kKeyColumnName = "key"; |
120 | | |
121 | | class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterface<TwoDCTestParams> { |
122 | | public: |
123 | 0 | Status Initialize(uint32_t replication_factor, uint32_t num_masters = 1) { |
124 | 0 | master::SetDefaultInitialSysCatalogSnapshotFlags(); |
125 | 0 | TwoDCTestBase::SetUp(); |
126 | 0 | FLAGS_enable_ysql = true; |
127 | 0 | FLAGS_master_auto_run_initdb = true; |
128 | 0 | FLAGS_hide_pg_catalog_table_creation_logs = true; |
129 | 0 | FLAGS_pggate_rpc_timeout_secs = 120; |
130 | 0 | FLAGS_cdc_max_apply_batch_num_records = GetParam().batch_size; |
131 | 0 | FLAGS_cdc_enable_replicate_intents = GetParam().enable_replicate_intents; |
132 | |
|
133 | 0 | MiniClusterOptions opts; |
134 | 0 | opts.num_tablet_servers = replication_factor; |
135 | 0 | opts.num_masters = num_masters; |
136 | 0 | FLAGS_replication_factor = replication_factor; |
137 | 0 | opts.cluster_id = "producer"; |
138 | 0 | producer_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(opts); |
139 | 0 | RETURN_NOT_OK(producer_cluster()->StartSync()); |
140 | 0 | RETURN_NOT_OK(producer_cluster()->WaitForTabletServerCount(replication_factor)); |
141 | 0 | RETURN_NOT_OK(WaitForInitDb(producer_cluster())); |
142 | |
|
143 | 0 | opts.cluster_id = "consumer"; |
144 | 0 | consumer_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(opts); |
145 | 0 | RETURN_NOT_OK(consumer_cluster()->StartSync()); |
146 | 0 | RETURN_NOT_OK(consumer_cluster()->WaitForTabletServerCount(replication_factor)); |
147 | 0 | RETURN_NOT_OK(WaitForInitDb(consumer_cluster())); |
148 | |
|
149 | 0 | producer_cluster_.client_ = VERIFY_RESULT(producer_cluster()->CreateClient()); |
150 | 0 | consumer_cluster_.client_ = VERIFY_RESULT(consumer_cluster()->CreateClient()); |
151 | |
|
152 | 0 | RETURN_NOT_OK(InitPostgres(&producer_cluster_)); |
153 | 0 | RETURN_NOT_OK(InitPostgres(&consumer_cluster_)); |
154 | |
|
155 | 0 | return Status::OK(); |
156 | 0 | } |
157 | | |
158 | | Result<std::vector<std::shared_ptr<client::YBTable>>> |
159 | | SetUpWithParams(std::vector<uint32_t> num_consumer_tablets, |
160 | | std::vector<uint32_t> num_producer_tablets, |
161 | | uint32_t replication_factor, |
162 | | uint32_t num_masters = 1, |
163 | | bool colocated = false, |
164 | 0 | boost::optional<std::string> tablegroup_name = boost::none) { |
165 | 0 | RETURN_NOT_OK(Initialize(replication_factor, num_masters)); |
166 | |
|
167 | 0 | if (num_consumer_tablets.size() != num_producer_tablets.size()) { |
168 | 0 | return STATUS(IllegalState, |
169 | 0 | Format("Num consumer tables: $0 num producer tables: $1 must be equal.", |
170 | 0 | num_consumer_tablets.size(), num_producer_tablets.size())); |
171 | 0 | } |
172 | | |
173 | 0 | RETURN_NOT_OK(CreateDatabase(&producer_cluster_, kNamespaceName, colocated)); |
174 | 0 | RETURN_NOT_OK(CreateDatabase(&consumer_cluster_, kNamespaceName, colocated)); |
175 | |
|
176 | 0 | if (tablegroup_name.has_value()) { |
177 | 0 | RETURN_NOT_OK(CreateTablegroup(&producer_cluster_, kNamespaceName, tablegroup_name.get())); |
178 | 0 | RETURN_NOT_OK(CreateTablegroup(&consumer_cluster_, kNamespaceName, tablegroup_name.get())); |
179 | 0 | } |
180 | |
|
181 | 0 | std::vector<YBTableName> tables; |
182 | 0 | std::vector<std::shared_ptr<client::YBTable>> yb_tables; |
183 | 0 | for (uint32_t i = 0; i < num_consumer_tablets.size(); i++) { |
184 | 0 | RETURN_NOT_OK(CreateTable(i, num_producer_tablets[i], &producer_cluster_, |
185 | 0 | &tables, tablegroup_name, colocated)); |
186 | 0 | std::shared_ptr<client::YBTable> producer_table; |
187 | 0 | RETURN_NOT_OK(producer_client()->OpenTable(tables[i * 2], &producer_table)); |
188 | 0 | yb_tables.push_back(producer_table); |
189 | |
|
190 | 0 | RETURN_NOT_OK(CreateTable(i, num_consumer_tablets[i], &consumer_cluster_, |
191 | 0 | &tables, tablegroup_name, colocated)); |
192 | 0 | std::shared_ptr<client::YBTable> consumer_table; |
193 | 0 | RETURN_NOT_OK(consumer_client()->OpenTable(tables[(i * 2) + 1], &consumer_table)); |
194 | 0 | yb_tables.push_back(consumer_table); |
195 | 0 | } |
196 | |
|
197 | 0 | return yb_tables; |
198 | 0 | } |
199 | | |
200 | 0 | Status InitPostgres(Cluster* cluster) { |
201 | 0 | auto pg_ts = RandomElement(cluster->mini_cluster_->mini_tablet_servers()); |
202 | 0 | auto port = cluster->mini_cluster_->AllocateFreePort(); |
203 | 0 | yb::pgwrapper::PgProcessConf pg_process_conf = |
204 | 0 | VERIFY_RESULT(yb::pgwrapper::PgProcessConf::CreateValidateAndRunInitDb( |
205 | 0 | yb::ToString(Endpoint(pg_ts->bound_rpc_addr().address(), port)), |
206 | 0 | pg_ts->options()->fs_opts.data_paths.front() + "/pg_data", |
207 | 0 | pg_ts->server()->GetSharedMemoryFd())); |
208 | 0 | pg_process_conf.master_addresses = pg_ts->options()->master_addresses_flag; |
209 | 0 | pg_process_conf.force_disable_log_file = true; |
210 | 0 | FLAGS_pgsql_proxy_webserver_port = cluster->mini_cluster_->AllocateFreePort(); |
211 | |
|
212 | 0 | LOG(INFO) << "Starting PostgreSQL server listening on " |
213 | 0 | << pg_process_conf.listen_addresses << ":" << pg_process_conf.pg_port << ", data: " |
214 | 0 | << pg_process_conf.data_dir |
215 | 0 | << ", pgsql webserver port: " << FLAGS_pgsql_proxy_webserver_port; |
216 | 0 | cluster->pg_supervisor_ = std::make_unique<yb::pgwrapper::PgSupervisor>(pg_process_conf); |
217 | 0 | RETURN_NOT_OK(cluster->pg_supervisor_->Start()); |
218 | |
|
219 | 0 | cluster->pg_host_port_ = HostPort(pg_process_conf.listen_addresses, pg_process_conf.pg_port); |
220 | 0 | return Status::OK(); |
221 | 0 | } |
222 | | |
223 | | Status CreateDatabase(Cluster* cluster, |
224 | | const std::string& namespace_name = kNamespaceName, |
225 | 0 | bool colocated = false) { |
226 | 0 | auto conn = EXPECT_RESULT(cluster->Connect()); |
227 | 0 | EXPECT_OK(conn.ExecuteFormat("CREATE DATABASE $0$1", |
228 | 0 | namespace_name, colocated ? " colocated = true" : "")); |
229 | 0 | return Status::OK(); |
230 | 0 | } |
231 | | |
232 | 0 | Result<string> GetUniverseId(Cluster* cluster) { |
233 | 0 | master::GetMasterClusterConfigRequestPB req; |
234 | 0 | master::GetMasterClusterConfigResponsePB resp; |
235 | |
|
236 | 0 | master::MasterClusterProxy master_proxy( |
237 | 0 | &cluster->client_->proxy_cache(), |
238 | 0 | VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMasterBoundRpcAddr())); |
239 | |
|
240 | 0 | rpc::RpcController rpc; |
241 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
242 | 0 | RETURN_NOT_OK(master_proxy.GetMasterClusterConfig(req, &resp, &rpc)); |
243 | 0 | if (resp.has_error()) { |
244 | 0 | return STATUS(IllegalState, "Error getting cluster config"); |
245 | 0 | } |
246 | 0 | return resp.cluster_config().cluster_uuid(); |
247 | 0 | } |
248 | | |
249 | | Result<YBTableName> CreateTable(Cluster* cluster, |
250 | | const std::string& namespace_name, |
251 | | const std::string& table_name, |
252 | | const boost::optional<std::string>& tablegroup_name, |
253 | | uint32_t num_tablets, |
254 | | bool colocated = false, |
255 | 0 | const int table_oid = 0) { |
256 | 0 | auto conn = EXPECT_RESULT(cluster->ConnectToDB(namespace_name)); |
257 | 0 | std::string table_oid_string = ""; |
258 | 0 | if (table_oid > 0) { |
259 | | // Need to turn on session flag to allow for CREATE WITH table_oid. |
260 | 0 | EXPECT_OK(conn.Execute("set yb_enable_create_with_table_oid=true")); |
261 | 0 | table_oid_string = Format("table_oid = $0", table_oid); |
262 | 0 | } |
263 | 0 | std::string query = Format("CREATE TABLE $0($1 int PRIMARY KEY) ", table_name, kKeyColumnName); |
264 | | // One cannot use tablegroup together with split into tablets. |
265 | 0 | if (tablegroup_name.has_value()) { |
266 | 0 | std::string with_clause = |
267 | 0 | table_oid_string.empty() ? "" : Format("WITH ($0) ", table_oid_string); |
268 | 0 | std::string tablegroup_clause = Format("TABLEGROUP $0", tablegroup_name.value()); |
269 | 0 | query += Format("$0$1", with_clause, tablegroup_clause); |
270 | 0 | } else { |
271 | 0 | std::string colocated_clause = Format("colocated = $0", colocated); |
272 | 0 | std::string with_clause = |
273 | 0 | table_oid_string.empty() ? colocated_clause |
274 | 0 | : Format("$0, $1", table_oid_string, colocated_clause); |
275 | 0 | query += Format("WITH ($0) SPLIT INTO $1 TABLETS", with_clause, num_tablets); |
276 | 0 | } |
277 | 0 | EXPECT_OK(conn.Execute(query)); |
278 | 0 | return GetTable(cluster, namespace_name, table_name); |
279 | 0 | } |
280 | | |
281 | | Status CreateTable(uint32_t idx, uint32_t num_tablets, Cluster* cluster, |
282 | | std::vector<YBTableName>* tables, |
283 | | const boost::optional<std::string>& tablegroup_name, |
284 | 0 | bool colocated = false) { |
285 | | /* |
286 | | * If we either have tablegroup name or colocated flag |
287 | | * Generate table_oid based on index so that we have the same table_oid for producer/consumer. |
288 | | */ |
289 | 0 | const int table_oid = (tablegroup_name.has_value() || colocated) ? (idx + 1) * 111111 : 0; |
290 | 0 | auto table = VERIFY_RESULT(CreateTable(cluster, kNamespaceName, Format("test_table_$0", idx), |
291 | 0 | tablegroup_name, num_tablets, colocated, table_oid)); |
292 | 0 | tables->push_back(table); |
293 | 0 | return Status::OK(); |
294 | 0 | } |
295 | | |
296 | | Result<YBTableName> GetTable(Cluster* cluster, |
297 | | const std::string& namespace_name, |
298 | | const std::string& table_name, |
299 | | bool verify_table_name = true, |
300 | 0 | bool exclude_system_tables = true) { |
301 | 0 | master::ListTablesRequestPB req; |
302 | 0 | master::ListTablesResponsePB resp; |
303 | |
|
304 | 0 | req.set_name_filter(table_name); |
305 | 0 | req.mutable_namespace_()->set_name(namespace_name); |
306 | 0 | req.mutable_namespace_()->set_database_type(YQL_DATABASE_PGSQL); |
307 | 0 | if (!exclude_system_tables) { |
308 | 0 | req.set_exclude_system_tables(true); |
309 | 0 | req.add_relation_type_filter(master::USER_TABLE_RELATION); |
310 | 0 | } |
311 | |
|
312 | 0 | master::MasterDdlProxy master_proxy( |
313 | 0 | &cluster->client_->proxy_cache(), |
314 | 0 | VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr()); |
315 | |
|
316 | 0 | rpc::RpcController rpc; |
317 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
318 | 0 | RETURN_NOT_OK(master_proxy.ListTables(req, &resp, &rpc)); |
319 | 0 | if (resp.has_error()) { |
320 | 0 | return STATUS(IllegalState, "Failed listing tables"); |
321 | 0 | } |
322 | | |
323 | | // Now need to find the table and return it. |
324 | 0 | for (const auto& table : resp.tables()) { |
325 | | // If !verify_table_name, just return the first table. |
326 | 0 | if (!verify_table_name || |
327 | 0 | (table.name() == table_name && table.namespace_().name() == namespace_name)) { |
328 | 0 | YBTableName yb_table; |
329 | 0 | yb_table.set_table_id(table.id()); |
330 | 0 | yb_table.set_namespace_id(table.namespace_().id()); |
331 | 0 | yb_table.set_pgschema_name(table.pgschema_name()); |
332 | 0 | return yb_table; |
333 | 0 | } |
334 | 0 | } |
335 | 0 | return STATUS(IllegalState, |
336 | 0 | strings::Substitute("Unable to find table $0 in namespace $1", |
337 | 0 | table_name, namespace_name)); |
338 | 0 | } |
339 | | |
340 | | /* |
341 | | * TODO (#11597): Given one is not able to get tablegroup ID by name, currently this works by |
342 | | * getting the first available tablegroup appearing in the namespace. |
343 | | */ |
344 | 0 | Result<TablegroupId> GetTablegroup(Cluster* cluster, const std::string& namespace_name) { |
345 | | // Lookup the namespace id from the namespace name. |
346 | 0 | std::string namespace_id; |
347 | 0 | { |
348 | 0 | master::ListNamespacesRequestPB req; |
349 | 0 | master::ListNamespacesResponsePB resp; |
350 | 0 | master::MasterDdlProxy master_proxy( |
351 | 0 | &cluster->client_->proxy_cache(), |
352 | 0 | VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr()); |
353 | |
|
354 | 0 | rpc::RpcController rpc; |
355 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
356 | |
|
357 | 0 | RETURN_NOT_OK(master_proxy.ListNamespaces(req, &resp, &rpc)); |
358 | 0 | if (resp.has_error()) { |
359 | 0 | return STATUS(IllegalState, "Failed to get namespace info"); |
360 | 0 | } |
361 | | |
362 | | // Find and return the namespace id. |
363 | 0 | bool namespaceFound = false; |
364 | 0 | for (const auto& entry : resp.namespaces()) { |
365 | 0 | if (entry.name() == namespace_name) { |
366 | 0 | namespaceFound = true; |
367 | 0 | namespace_id = entry.id(); |
368 | 0 | break; |
369 | 0 | } |
370 | 0 | } |
371 | |
|
372 | 0 | if (!namespaceFound) { |
373 | 0 | return STATUS(IllegalState, "Failed to find namespace"); |
374 | 0 | } |
375 | 0 | } |
376 | | |
377 | 0 | master::ListTablegroupsRequestPB req; |
378 | 0 | master::ListTablegroupsResponsePB resp; |
379 | 0 | master::MasterDdlProxy master_proxy( |
380 | 0 | &cluster->client_->proxy_cache(), |
381 | 0 | VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr()); |
382 | |
|
383 | 0 | req.set_namespace_id(namespace_id); |
384 | |
|
385 | 0 | rpc::RpcController rpc; |
386 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
387 | |
|
388 | 0 | RETURN_NOT_OK(master_proxy.ListTablegroups(req, &resp, &rpc)); |
389 | 0 | if (resp.has_error()) { |
390 | 0 | return STATUS(IllegalState, "Failed listing tablegroups"); |
391 | 0 | } |
392 | | |
393 | | // Find and return the tablegroup. |
394 | 0 | if (resp.tablegroups().empty()) { |
395 | 0 | return STATUS(IllegalState, |
396 | 0 | Format("Unable to find tablegroup in namespace $0", namespace_name)); |
397 | 0 | } |
398 | | |
399 | 0 | return resp.tablegroups()[0].id() + master::kTablegroupParentTableIdSuffix; |
400 | 0 | } |
401 | | |
402 | | Status CreateTablegroup(Cluster* cluster, |
403 | | const std::string& namespace_name, |
404 | 0 | const std::string& tablegroup_name) { |
405 | 0 | auto conn = EXPECT_RESULT(cluster->ConnectToDB(namespace_name)); |
406 | 0 | EXPECT_OK(conn.ExecuteFormat("CREATE TABLEGROUP $0", tablegroup_name)); |
407 | 0 | return Status::OK(); |
408 | 0 | } |
409 | | |
410 | | void WriteWorkload(uint32_t start, uint32_t end, Cluster* cluster, const YBTableName& table, |
411 | 0 | bool delete_op = false) { |
412 | 0 | auto conn = EXPECT_RESULT(cluster->ConnectToDB(table.namespace_name())); |
413 | |
|
414 | 0 | LOG(INFO) << "Writing " << end-start << (delete_op ? " deletes" : " inserts"); |
415 | 0 | for (uint32_t i = start; i < end; i++) { |
416 | 0 | if (delete_op) { |
417 | 0 | EXPECT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE $1 = $2", |
418 | 0 | table.table_name(), kKeyColumnName, i)); |
419 | 0 | } else { |
420 | | // TODO(#6582) transactions currently don't work, so don't use ON CONFLICT DO NOTHING now. |
421 | 0 | EXPECT_OK(conn.ExecuteFormat("INSERT INTO $0($1) VALUES ($2)", // ON CONFLICT DO NOTHING", |
422 | 0 | table.table_name(), kKeyColumnName, i)); |
423 | 0 | } |
424 | 0 | } |
425 | 0 | } |
426 | | |
427 | | void WriteTransactionalWorkload(uint32_t start, uint32_t end, Cluster* cluster, |
428 | 0 | const YBTableName& table) { |
429 | 0 | auto conn = EXPECT_RESULT(cluster->ConnectToDB(table.namespace_name())); |
430 | 0 | EXPECT_OK(conn.Execute("BEGIN")); |
431 | 0 | for (uint32_t i = start; i < end; i++) { |
432 | 0 | EXPECT_OK(conn.ExecuteFormat("INSERT INTO $0($1) VALUES ($2) ON CONFLICT DO NOTHING", |
433 | 0 | table.table_name(), kKeyColumnName, i)); |
434 | 0 | } |
435 | 0 | EXPECT_OK(conn.Execute("COMMIT")); |
436 | 0 | } |
437 | | |
438 | 0 | void DeleteWorkload(uint32_t start, uint32_t end, Cluster* cluster, const YBTableName& table) { |
439 | 0 | WriteWorkload(start, end, cluster, table, true /* delete_op */); |
440 | 0 | } |
441 | | |
442 | 0 | PGResultPtr ScanToStrings(const YBTableName& table_name, Cluster* cluster) { |
443 | 0 | auto conn = EXPECT_RESULT(cluster->ConnectToDB(table_name.namespace_name())); |
444 | 0 | auto result = |
445 | 0 | EXPECT_RESULT(conn.FetchFormat("SELECT * FROM $0 ORDER BY key", table_name.table_name())); |
446 | 0 | return result; |
447 | 0 | } |
448 | | |
449 | | Status VerifyWrittenRecords(const YBTableName& producer_table, |
450 | 0 | const YBTableName& consumer_table) { |
451 | 0 | return LoggedWaitFor([=]() -> Result<bool> { |
452 | 0 | auto producer_results = ScanToStrings(producer_table, &producer_cluster_); |
453 | 0 | auto consumer_results = ScanToStrings(consumer_table, &consumer_cluster_); |
454 | 0 | if (PQntuples(producer_results.get()) != PQntuples(consumer_results.get())) { |
455 | 0 | return false; |
456 | 0 | } |
457 | 0 | for (int i = 0; i < PQntuples(producer_results.get()); ++i) { |
458 | 0 | auto prod_val = EXPECT_RESULT(ToString(producer_results.get(), i, 0)); |
459 | 0 | auto cons_val = EXPECT_RESULT(ToString(consumer_results.get(), i, 0)); |
460 | 0 | if (prod_val != cons_val) { |
461 | 0 | return false; |
462 | 0 | } |
463 | 0 | } |
464 | 0 | return true; |
465 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Verify written records"); |
466 | 0 | } |
467 | | |
468 | 0 | Status VerifyNumRecords(const YBTableName& table, Cluster* cluster, int expected_size) { |
469 | 0 | return LoggedWaitFor([=]() -> Result<bool> { |
470 | 0 | auto results = ScanToStrings(table, cluster); |
471 | 0 | return PQntuples(results.get()) == expected_size; |
472 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Verify number of records"); |
473 | 0 | } |
474 | | |
475 | | Status DeleteTable(Cluster* cluster, |
476 | 0 | TableId* table_id /* = nullptr */) { |
477 | 0 | RETURN_NOT_OK(cluster->client_->DeleteTable(*table_id)); |
478 | |
|
479 | 0 | return Status::OK(); |
480 | 0 | } |
481 | | |
482 | | Status TruncateTable(Cluster* cluster, |
483 | 0 | std::vector<string> table_ids) { |
484 | 0 | RETURN_NOT_OK(cluster->client_->TruncateTables(table_ids)); |
485 | 0 | return Status::OK(); |
486 | 0 | } |
487 | | }; |
488 | | |
// Run every TEST_P in this file under all four parameter combinations.
// NOTE(review): the two TwoDCTestParams ctor args presumably map to
// (batch_size, enable_replicate_intents) — the fields read in Initialize();
// confirm against the TwoDCTestParams declaration.
INSTANTIATE_TEST_CASE_P(TwoDCTestParams, TwoDCYsqlTest,
                        ::testing::Values(TwoDCTestParams(1, true), TwoDCTestParams(1, false),
                                          TwoDCTestParams(0, true), TwoDCTestParams(0, false)));
492 | | |
493 | | |
494 | 0 | TEST_P(TwoDCYsqlTest, SetupUniverseReplication) { |
495 | 0 | YB_SKIP_TEST_IN_TSAN(); |
496 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({8, 4}, {6, 6}, 3, 1, false /* colocated */)); |
497 | 0 | const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); |
498 | |
|
499 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
500 | | // tables contains both producer and consumer universe tables (alternately). |
501 | | // Pick out just the producer tables from the list. |
502 | 0 | producer_tables.reserve(tables.size() / 2); |
503 | 0 | for (size_t i = 0; i < tables.size(); i += 2) { |
504 | 0 | producer_tables.push_back(tables[i]); |
505 | 0 | } |
506 | 0 | ASSERT_OK(SetupUniverseReplication( |
507 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
508 | | |
509 | | // Verify that universe was setup on consumer. |
510 | 0 | master::GetUniverseReplicationResponsePB resp; |
511 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); |
512 | 0 | ASSERT_EQ(resp.entry().producer_id(), kUniverseId); |
513 | 0 | ASSERT_EQ(resp.entry().tables_size(), producer_tables.size()); |
514 | 0 | for (size_t i = 0; i < producer_tables.size(); i++) { |
515 | 0 | ASSERT_EQ(resp.entry().tables(narrow_cast<int>(i)), producer_tables[i]->id()); |
516 | 0 | } |
517 | | |
518 | | // Verify that CDC streams were created on producer for all tables. |
519 | 0 | for (size_t i = 0; i < producer_tables.size(); i++) { |
520 | 0 | master::ListCDCStreamsResponsePB stream_resp; |
521 | 0 | ASSERT_OK(GetCDCStreamForTable(producer_tables[i]->id(), &stream_resp)); |
522 | 0 | ASSERT_EQ(stream_resp.streams_size(), 1); |
523 | 0 | ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), producer_tables[i]->id()); |
524 | 0 | } |
525 | |
|
526 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
527 | 0 | } |
528 | | |
529 | 0 | TEST_P(TwoDCYsqlTest, SimpleReplication) { |
530 | 0 | YB_SKIP_TEST_IN_TSAN(); |
531 | 0 | constexpr int kNTabletsPerTable = 1; |
532 | 0 | std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; |
533 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1)); |
534 | 0 | const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); |
535 | | |
536 | | // tables contains both producer and consumer universe tables (alternately). |
537 | | // Pick out just the producer tables from the list. |
538 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
539 | 0 | std::vector<std::shared_ptr<client::YBTable>> consumer_tables; |
540 | 0 | producer_tables.reserve(tables.size() / 2); |
541 | 0 | consumer_tables.reserve(tables.size() / 2); |
542 | 0 | for (size_t i = 0; i < tables.size(); ++i) { |
543 | 0 | if (i % 2 == 0) { |
544 | 0 | producer_tables.push_back(tables[i]); |
545 | 0 | } else { |
546 | 0 | consumer_tables.push_back(tables[i]); |
547 | 0 | } |
548 | 0 | } |
549 | | |
550 | | // 1. Write some data. |
551 | 0 | for (const auto& producer_table : producer_tables) { |
552 | 0 | LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); |
553 | 0 | WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); |
554 | 0 | } |
555 | | |
556 | | // Verify data is written on the producer. |
557 | 0 | for (const auto& producer_table : producer_tables) { |
558 | 0 | auto producer_results = ScanToStrings(producer_table->name(), &producer_cluster_); |
559 | 0 | ASSERT_EQ(100, PQntuples(producer_results.get())); |
560 | 0 | int result; |
561 | 0 | for (int i = 0; i < 100; ++i) { |
562 | 0 | result = ASSERT_RESULT(GetInt32(producer_results.get(), i, 0)); |
563 | 0 | ASSERT_EQ(i, result); |
564 | 0 | } |
565 | 0 | } |
566 | | |
567 | | // 2. Setup replication. |
568 | 0 | ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), |
569 | 0 | kUniverseId, producer_tables)); |
570 | | |
571 | | // 3. Verify everything is setup correctly. |
572 | 0 | master::GetUniverseReplicationResponsePB get_universe_replication_resp; |
573 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, |
574 | 0 | &get_universe_replication_resp)); |
575 | 0 | ASSERT_OK(CorrectlyPollingAllTablets( |
576 | 0 | consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable))); |
577 | |
|
578 | 0 | auto data_replicated_correctly = [&](int num_results) -> Result<bool> { |
579 | 0 | for (const auto& consumer_table : consumer_tables) { |
580 | 0 | LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); |
581 | 0 | auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); |
582 | |
|
583 | 0 | if (num_results != PQntuples(consumer_results.get())) { |
584 | 0 | return false; |
585 | 0 | } |
586 | 0 | int result; |
587 | 0 | for (int i = 0; i < num_results; ++i) { |
588 | 0 | result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); |
589 | 0 | if (i != result) { |
590 | 0 | return false; |
591 | 0 | } |
592 | 0 | } |
593 | 0 | } |
594 | 0 | return true; |
595 | 0 | }; |
596 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); }, |
597 | 0 | MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); |
598 | | |
599 | | // 4. Write more data. |
600 | 0 | for (const auto& producer_table : producer_tables) { |
601 | 0 | WriteWorkload(100, 105, &producer_cluster_, producer_table->name()); |
602 | 0 | } |
603 | | |
604 | | // 5. Make sure this data is also replicated now. |
605 | 0 | ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(105); }, |
606 | 0 | MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); |
607 | 0 | } |
608 | | |
609 | 0 | TEST_P(TwoDCYsqlTest, SetupUniverseReplicationWithProducerBootstrapId) { |
610 | 0 | YB_SKIP_TEST_IN_TSAN(); |
611 | 0 | constexpr int kNTabletsPerTable = 1; |
612 | 0 | std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; |
613 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 3)); |
614 | 0 | const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); |
615 | 0 | auto* producer_leader_mini_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster()); |
616 | 0 | auto producer_master_proxy = std::make_shared<master::MasterReplicationProxy>( |
617 | 0 | &producer_client()->proxy_cache(), |
618 | 0 | producer_leader_mini_master->bound_rpc_addr()); |
619 | |
|
620 | 0 | std::unique_ptr<client::YBClient> client; |
621 | 0 | std::unique_ptr<cdc::CDCServiceProxy> producer_cdc_proxy; |
622 | 0 | client = ASSERT_RESULT(consumer_cluster()->CreateClient()); |
623 | 0 | producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>( |
624 | 0 | &client->proxy_cache(), |
625 | 0 | HostPort::FromBoundEndpoint(producer_cluster()->mini_tablet_server(0)->bound_rpc_addr())); |
626 | | |
627 | | // tables contains both producer and consumer universe tables (alternately). |
628 | | // Pick out just the producer tables from the list. |
629 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
630 | 0 | std::vector<std::shared_ptr<client::YBTable>> consumer_tables; |
631 | 0 | producer_tables.reserve(tables.size() / 2); |
632 | 0 | consumer_tables.reserve(tables.size() / 2); |
633 | 0 | for (size_t i = 0; i < tables.size(); ++i) { |
634 | 0 | if (i % 2 == 0) { |
635 | 0 | producer_tables.push_back(tables[i]); |
636 | 0 | } else { |
637 | 0 | consumer_tables.push_back(tables[i]); |
638 | 0 | } |
639 | 0 | } |
640 | | |
641 | | // 1. Write some data so that we can verify that only new records get replicated. |
642 | 0 | for (const auto& producer_table : producer_tables) { |
643 | 0 | LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); |
644 | 0 | WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); |
645 | 0 | } |
646 | |
|
647 | 0 | SleepFor(MonoDelta::FromSeconds(10)); |
648 | 0 | cdc::BootstrapProducerRequestPB req; |
649 | 0 | cdc::BootstrapProducerResponsePB resp; |
650 | |
|
651 | 0 | for (const auto& producer_table : producer_tables) { |
652 | 0 | req.add_table_ids(producer_table->id()); |
653 | 0 | } |
654 | |
|
655 | 0 | rpc::RpcController rpc; |
656 | 0 | ASSERT_OK(producer_cdc_proxy->BootstrapProducer(req, &resp, &rpc)); |
657 | 0 | ASSERT_FALSE(resp.has_error()); |
658 | |
|
659 | 0 | ASSERT_EQ(resp.cdc_bootstrap_ids().size(), producer_tables.size()); |
660 | |
|
661 | 0 | int table_idx = 0; |
662 | 0 | for (const auto& bootstrap_id : resp.cdc_bootstrap_ids()) { |
663 | 0 | LOG(INFO) << "Got bootstrap id " << bootstrap_id |
664 | 0 | << " for table " << producer_tables[table_idx++]->name().table_name(); |
665 | 0 | } |
666 | |
|
667 | 0 | std::unordered_map<std::string, int> tablet_bootstraps; |
668 | | |
669 | | // Verify that for each of the table's tablets, a new row in cdc_state table with the returned |
670 | | // id was inserted. |
671 | 0 | client::TableHandle table; |
672 | 0 | client::YBTableName cdc_state_table( |
673 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
674 | 0 | ASSERT_OK(table.Open(cdc_state_table, producer_client())); |
675 | | |
676 | | // 2 tables with 8 tablets each. |
677 | 0 | ASSERT_EQ(tables_vector.size() * kNTabletsPerTable, boost::size(client::TableRange(table))); |
678 | 0 | int nrows = 0; |
679 | 0 | for (const auto& row : client::TableRange(table)) { |
680 | 0 | nrows++; |
681 | 0 | string stream_id = row.column(0).string_value(); |
682 | 0 | tablet_bootstraps[stream_id]++; |
683 | |
|
684 | 0 | string checkpoint = row.column(2).string_value(); |
685 | 0 | auto s = OpId::FromString(checkpoint); |
686 | 0 | ASSERT_OK(s); |
687 | 0 | OpId op_id = *s; |
688 | 0 | ASSERT_GT(op_id.index, 0); |
689 | |
|
690 | 0 | LOG(INFO) << "Bootstrap id " << stream_id |
691 | 0 | << " for tablet " << row.column(1).string_value(); |
692 | 0 | } |
693 | |
|
694 | 0 | ASSERT_EQ(tablet_bootstraps.size(), producer_tables.size()); |
695 | | // Check that each bootstrap id has 8 tablets. |
696 | 0 | for (const auto& e : tablet_bootstraps) { |
697 | 0 | ASSERT_EQ(e.second, kNTabletsPerTable); |
698 | 0 | } |
699 | | |
700 | | // Map table -> bootstrap_id. We will need when setting up replication. |
701 | 0 | std::unordered_map<TableId, std::string> table_bootstrap_ids; |
702 | 0 | for (int i = 0; i < resp.cdc_bootstrap_ids_size(); i++) { |
703 | 0 | table_bootstrap_ids[req.table_ids(i)] = resp.cdc_bootstrap_ids(i); |
704 | 0 | } |
705 | | |
706 | | // 2. Setup replication. |
707 | 0 | master::SetupUniverseReplicationRequestPB setup_universe_req; |
708 | 0 | master::SetupUniverseReplicationResponsePB setup_universe_resp; |
709 | 0 | setup_universe_req.set_producer_id(kUniverseId); |
710 | 0 | string master_addr = producer_cluster()->GetMasterAddresses(); |
711 | 0 | auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); |
712 | 0 | HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); |
713 | |
|
714 | 0 | setup_universe_req.mutable_producer_table_ids()->Reserve( |
715 | 0 | narrow_cast<int>(producer_tables.size())); |
716 | 0 | for (const auto& producer_table : producer_tables) { |
717 | 0 | setup_universe_req.add_producer_table_ids(producer_table->id()); |
718 | 0 | const auto& iter = table_bootstrap_ids.find(producer_table->id()); |
719 | 0 | ASSERT_NE(iter, table_bootstrap_ids.end()); |
720 | 0 | setup_universe_req.add_producer_bootstrap_ids(iter->second); |
721 | 0 | } |
722 | |
|
723 | 0 | auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster()); |
724 | 0 | master::MasterReplicationProxy master_proxy( |
725 | 0 | &consumer_client()->proxy_cache(), |
726 | 0 | consumer_leader_mini_master->bound_rpc_addr()); |
727 | |
|
728 | 0 | rpc.Reset(); |
729 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
730 | 0 | ASSERT_OK(master_proxy.SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc)); |
731 | 0 | ASSERT_FALSE(setup_universe_resp.has_error()); |
732 | | |
733 | | // 3. Verify everything is setup correctly. |
734 | 0 | master::GetUniverseReplicationResponsePB get_universe_replication_resp; |
735 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, |
736 | 0 | &get_universe_replication_resp)); |
737 | 0 | ASSERT_OK(CorrectlyPollingAllTablets( |
738 | 0 | consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable))); |
739 | | |
740 | | // 4. Write more data. |
741 | 0 | for (const auto& producer_table : producer_tables) { |
742 | 0 | WriteWorkload(1000, 1005, &producer_cluster_, producer_table->name()); |
743 | 0 | } |
744 | | |
745 | | // 5. Verify that only new writes get replicated to consumer since we bootstrapped the producer |
746 | | // after we had already written some data, therefore the old data (whatever was there before we |
747 | | // bootstrapped the producer) should not be replicated. |
748 | 0 | auto data_replicated_correctly = [&]() -> Result<bool> { |
749 | 0 | for (const auto& consumer_table : consumer_tables) { |
750 | 0 | LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); |
751 | 0 | auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); |
752 | |
|
753 | 0 | if (5 != PQntuples(consumer_results.get())) { |
754 | 0 | return false; |
755 | 0 | } |
756 | 0 | int result; |
757 | 0 | for (int i = 0; i < 5; ++i) { |
758 | 0 | result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); |
759 | 0 | if ((1000 + i) != result) { |
760 | 0 | return false; |
761 | 0 | } |
762 | 0 | } |
763 | 0 | } |
764 | 0 | return true; |
765 | 0 | }; |
766 | |
|
767 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(); }, |
768 | 0 | MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); |
769 | 0 | } |
770 | | |
// End-to-end test of xCluster replication for a colocated YSQL database:
//  - replicates the colocated parent tablet first (all colocated tables ride along),
//  - verifies a non-colocated table in the same database is NOT replicated,
//  - then adds the non-colocated table via AlterUniverseReplication and verifies it too.
TEST_P(TwoDCYsqlTest, ColocatedDatabaseReplication) {
  YB_SKIP_TEST_IN_TSAN();
  constexpr int kNTabletsPerColocatedTable = 1;
  constexpr int kNTabletsPerTable = 3;
  std::vector<uint32_t> tables_vector = {kNTabletsPerColocatedTable, kNTabletsPerColocatedTable};
  // Create two colocated tables on each cluster.
  auto colocated_tables =
      ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 3, 1, true /* colocated */));
  const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_));

  // Also create an additional non-colocated table in each database.
  // Note: the same local is reused for the consumer-side handle below; the producer
  // handle is opened into non_colocated_producer_table before it is overwritten.
  auto non_colocated_table = ASSERT_RESULT(CreateTable(&producer_cluster_,
                                                       kNamespaceName,
                                                       "test_table_2",
                                                       boost::none /* tablegroup */,
                                                       kNTabletsPerTable,
                                                       false /* colocated */));
  std::shared_ptr<client::YBTable> non_colocated_producer_table;
  ASSERT_OK(producer_client()->OpenTable(non_colocated_table, &non_colocated_producer_table));
  non_colocated_table = ASSERT_RESULT(CreateTable(&consumer_cluster_,
                                                  kNamespaceName,
                                                  "test_table_2",
                                                  boost::none /* tablegroup */,
                                                  kNTabletsPerTable,
                                                  false /* colocated */));
  std::shared_ptr<client::YBTable> non_colocated_consumer_table;
  ASSERT_OK(consumer_client()->OpenTable(non_colocated_table, &non_colocated_consumer_table));

  // colocated_tables contains both producer and consumer universe tables (alternately).
  // Pick out just the producer tables from the list. The producer/consumer lists also
  // get the non-colocated table appended at the end; the colocated_* lists do not.
  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
  std::vector<std::shared_ptr<client::YBTable>> colocated_producer_tables;
  std::vector<std::shared_ptr<client::YBTable>> colocated_consumer_tables;
  producer_tables.reserve(colocated_tables.size() / 2 + 1);
  consumer_tables.reserve(colocated_tables.size() / 2 + 1);
  colocated_producer_tables.reserve(colocated_tables.size() / 2);
  colocated_consumer_tables.reserve(colocated_tables.size() / 2);
  for (size_t i = 0; i < colocated_tables.size(); ++i) {
    if (i % 2 == 0) {
      producer_tables.push_back(colocated_tables[i]);
      colocated_producer_tables.push_back(colocated_tables[i]);
    } else {
      consumer_tables.push_back(colocated_tables[i]);
      colocated_consumer_tables.push_back(colocated_tables[i]);
    }
  }
  producer_tables.push_back(non_colocated_producer_table);
  consumer_tables.push_back(non_colocated_consumer_table);

  // 1. Write some data to all tables.
  for (const auto& producer_table : producer_tables) {
    LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
    WriteWorkload(0, 100, &producer_cluster_, producer_table->name());
  }

  // 2. Setup replication for only the colocated tables.
  // Get the producer namespace id, so we can construct the colocated parent table id.
  GetNamespaceInfoResponsePB ns_resp;
  ASSERT_OK(producer_client()->GetNamespaceInfo("", kNamespaceName, YQL_DATABASE_PGSQL, &ns_resp));

  rpc::RpcController rpc;
  master::SetupUniverseReplicationRequestPB setup_universe_req;
  master::SetupUniverseReplicationResponsePB setup_universe_resp;
  setup_universe_req.set_producer_id(kUniverseId);
  string master_addr = producer_cluster()->GetMasterAddresses();
  auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
  HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses());
  // Only need to add the colocated parent table id.
  // The parent table id is <producer namespace id> + the colocated suffix.
  setup_universe_req.mutable_producer_table_ids()->Reserve(1);
  setup_universe_req.add_producer_table_ids(
      ns_resp.namespace_().id() + master::kColocatedParentTableIdSuffix);
  auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster());
  auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
      &consumer_client()->proxy_cache(),
      consumer_leader_mini_master->bound_rpc_addr());

  // Reset the controller before reuse and give the setup RPC a finite deadline.
  rpc.Reset();
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc));
  ASSERT_FALSE(setup_universe_resp.has_error());

  // 3. Verify everything is setup correctly.
  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
      &get_universe_replication_resp));
  ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), kNTabletsPerColocatedTable));

  // 4. Check that colocated tables are being replicated.
  // Checker lambda: returns true iff every consumer table in the selected set has exactly
  // num_results rows whose first column equals the row index (the workload writes 0..n-1).
  auto data_replicated_correctly = [&](int num_results, bool onlyColocated) -> Result<bool> {
    auto &tables = onlyColocated ? colocated_consumer_tables : consumer_tables;
    for (const auto& consumer_table : tables) {
      LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
      auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_);

      if (num_results != PQntuples(consumer_results.get())) {
        return false;
      }
      int result;
      for (int i = 0; i < num_results; ++i) {
        result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0));
        if (i != result) {
          return false;
        }
      }
    }
    return true;
  };
  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100, true); },
      MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));
  // Ensure that the non colocated table is not replicated.
  auto non_coloc_results = ScanToStrings(non_colocated_consumer_table->name(), &consumer_cluster_);
  ASSERT_EQ(0, PQntuples(non_coloc_results.get()));

  // 5. Add the regular table to replication.
  // Prepare and send AlterUniverseReplication command.
  master::AlterUniverseReplicationRequestPB alter_req;
  master::AlterUniverseReplicationResponsePB alter_resp;
  alter_req.set_producer_id(kUniverseId);
  alter_req.add_producer_table_ids_to_add(non_colocated_producer_table->id());

  rpc.Reset();
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
  ASSERT_FALSE(alter_resp.has_error());
  // Wait until we have 2 tables (colocated tablet + regular table) logged.
  ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
    master::GetUniverseReplicationResponsePB tmp_resp;
    return VerifyUniverseReplication(consumer_cluster(), consumer_client(),
        kUniverseId, &tmp_resp).ok() &&
        tmp_resp.entry().tables_size() == 2;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter."));

  ASSERT_OK(CorrectlyPollingAllTablets(
      consumer_cluster(), kNTabletsPerColocatedTable + kNTabletsPerTable));
  // Check that all data is replicated for the new table as well.
  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100, false); },
      MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));

  // 6. Add additional data to all tables
  for (const auto& producer_table : producer_tables) {
    LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
    WriteWorkload(100, 150, &producer_cluster_, producer_table->name());
  }

  // 7. Verify all tables are properly replicated.
  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(150, false); },
      MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));
}
920 | | |
921 | 0 | TEST_P(TwoDCYsqlTest, ColocatedDatabaseDifferentTableOids) { |
922 | 0 | YB_SKIP_TEST_IN_TSAN(); |
923 | 0 | auto colocated_tables = ASSERT_RESULT(SetUpWithParams({}, {}, 3, 1, true /* colocated */)); |
924 | 0 | const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); |
925 | | |
926 | | // Create two tables with different table oids. |
927 | 0 | auto conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(kNamespaceName)); |
928 | 0 | auto table_info = ASSERT_RESULT(CreateTable(&producer_cluster_, |
929 | 0 | kNamespaceName, |
930 | 0 | "test_table_0", |
931 | 0 | boost::none /* tablegroup */, |
932 | 0 | 1 /* num_tablets */, |
933 | 0 | true /* colocated */, |
934 | 0 | 123456 /* table_oid */)); |
935 | 0 | ASSERT_RESULT(CreateTable(&consumer_cluster_, |
936 | 0 | kNamespaceName, |
937 | 0 | "test_table_0", |
938 | 0 | boost::none /* tablegroup */, |
939 | 0 | 1 /* num_tablets */, |
940 | 0 | true /* colocated */, |
941 | 0 | 123457 /* table_oid */)); |
942 | 0 | std::shared_ptr<client::YBTable> producer_table; |
943 | 0 | ASSERT_OK(producer_client()->OpenTable(table_info, &producer_table)); |
944 | | |
945 | | // Try to setup replication, should fail on schema validation due to different table oids. |
946 | 0 | ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), |
947 | 0 | kUniverseId, {producer_table})); |
948 | 0 | master::GetUniverseReplicationResponsePB get_universe_replication_resp; |
949 | 0 | ASSERT_NOK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, |
950 | 0 | &get_universe_replication_resp)); |
951 | 0 | } |
952 | | |
953 | 0 | TEST_P(TwoDCYsqlTest, TablegroupReplication) { |
954 | 0 | YB_SKIP_TEST_IN_TSAN(); |
955 | |
|
956 | 0 | std::vector<uint32_t> tables_vector = {1, 1}; |
957 | 0 | boost::optional<std::string> kTablegroupName("mytablegroup"); |
958 | 0 | auto tables = ASSERT_RESULT( |
959 | 0 | SetUpWithParams(tables_vector, tables_vector, 1, 1, false /* colocated */, kTablegroupName)); |
960 | 0 | const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); |
961 | | |
962 | | // tables contains both producer and consumer universe tables (alternately). |
963 | | // Pick out just the producer tables from the list. |
964 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
965 | 0 | std::vector<std::shared_ptr<client::YBTable>> consumer_tables; |
966 | 0 | producer_tables.reserve(tables.size() / 2); |
967 | 0 | consumer_tables.reserve(tables.size() / 2); |
968 | 0 | for (size_t i = 0; i < tables.size(); ++i) { |
969 | 0 | if (i % 2 == 0) { |
970 | 0 | producer_tables.push_back(tables[i]); |
971 | 0 | } else { |
972 | 0 | consumer_tables.push_back(tables[i]); |
973 | 0 | } |
974 | 0 | } |
975 | | |
976 | | // 1. Write some data to all tables. |
977 | 0 | for (const auto& producer_table : producer_tables) { |
978 | 0 | LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); |
979 | 0 | WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); |
980 | 0 | } |
981 | | |
982 | | // 2. Setup replication for the tablegroup. |
983 | 0 | auto tablegroup_id = ASSERT_RESULT(GetTablegroup(&producer_cluster_, kNamespaceName)); |
984 | 0 | LOG(INFO) << "Tablegroup id to replicate: " << tablegroup_id; |
985 | |
|
986 | 0 | rpc::RpcController rpc; |
987 | 0 | master::SetupUniverseReplicationRequestPB setup_universe_req; |
988 | 0 | master::SetupUniverseReplicationResponsePB setup_universe_resp; |
989 | 0 | setup_universe_req.set_producer_id(kUniverseId); |
990 | 0 | string master_addr = producer_cluster()->GetMasterAddresses(); |
991 | 0 | auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); |
992 | 0 | HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); |
993 | 0 | setup_universe_req.mutable_producer_table_ids()->Reserve(1); |
994 | 0 | setup_universe_req.add_producer_table_ids(tablegroup_id); |
995 | |
|
996 | 0 | auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster()); |
997 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
998 | 0 | &consumer_client()->proxy_cache(), |
999 | 0 | consumer_leader_mini_master->bound_rpc_addr()); |
1000 | |
|
1001 | 0 | rpc.Reset(); |
1002 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
1003 | 0 | ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc)); |
1004 | 0 | ASSERT_FALSE(setup_universe_resp.has_error()); |
1005 | | |
1006 | | // 3. Verify everything is setup correctly. |
1007 | 0 | master::GetUniverseReplicationResponsePB get_universe_replication_resp; |
1008 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, |
1009 | 0 | &get_universe_replication_resp)); |
1010 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1)); |
1011 | | |
1012 | | // 4. Check that tables are being replicated. |
1013 | 0 | auto data_replicated_correctly = [&](int num_results) -> Result<bool> { |
1014 | 0 | for (const auto& consumer_table : consumer_tables) { |
1015 | 0 | LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); |
1016 | 0 | auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); |
1017 | |
|
1018 | 0 | if (num_results != PQntuples(consumer_results.get())) { |
1019 | 0 | return false; |
1020 | 0 | } |
1021 | 0 | int result; |
1022 | 0 | for (int i = 0; i < num_results; ++i) { |
1023 | 0 | result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); |
1024 | 0 | if (i != result) { |
1025 | 0 | return false; |
1026 | 0 | } |
1027 | 0 | } |
1028 | 0 | } |
1029 | 0 | return true; |
1030 | 0 | }; |
1031 | |
|
1032 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); }, |
1033 | 0 | MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); |
1034 | | |
1035 | | // 5. Write more data. |
1036 | 0 | for (const auto& producer_table : producer_tables) { |
1037 | 0 | WriteWorkload(100, 105, &producer_cluster_, producer_table->name()); |
1038 | 0 | } |
1039 | |
|
1040 | 0 | ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(105); }, |
1041 | 0 | MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); |
1042 | 0 | } |
1043 | | |
1044 | 0 | TEST_P(TwoDCYsqlTest, TablegroupReplicationMismatch) { |
1045 | 0 | YB_SKIP_TEST_IN_TSAN(); |
1046 | 0 | ASSERT_OK(Initialize(1 /* replication_factor */)); |
1047 | |
|
1048 | 0 | boost::optional<std::string> tablegroup_name("mytablegroup"); |
1049 | |
|
1050 | 0 | ASSERT_OK(CreateDatabase(&producer_cluster_, kNamespaceName, false /* colocated */)); |
1051 | 0 | ASSERT_OK(CreateDatabase(&consumer_cluster_, kNamespaceName, false /* colocated */)); |
1052 | 0 | ASSERT_OK(CreateTablegroup(&producer_cluster_, kNamespaceName, tablegroup_name.get())); |
1053 | 0 | ASSERT_OK(CreateTablegroup(&consumer_cluster_, kNamespaceName, tablegroup_name.get())); |
1054 | | |
1055 | | // We intentionally set up so that the number of producer and consumer tables don't match. |
1056 | | // The replication should fail during validation. |
1057 | 0 | const uint32_t num_producer_tables = 2; |
1058 | 0 | const uint32_t num_consumer_tables = 3; |
1059 | 0 | std::vector<YBTableName> tables; |
1060 | 0 | for (uint32_t i = 0; i < num_producer_tables; i++) { |
1061 | 0 | ASSERT_OK(CreateTable(i, 1 /* num_tablets */, &producer_cluster_, |
1062 | 0 | &tables, tablegroup_name, false /* colocated */)); |
1063 | 0 | } |
1064 | 0 | for (uint32_t i = 0; i < num_consumer_tables; i++) { |
1065 | 0 | ASSERT_OK(CreateTable(i, 1 /* num_tablets */, &consumer_cluster_, |
1066 | 0 | &tables, tablegroup_name, false /* colocated */)); |
1067 | 0 | } |
1068 | |
|
1069 | 0 | auto tablegroup_id = ASSERT_RESULT(GetTablegroup(&producer_cluster_, kNamespaceName)); |
1070 | | |
1071 | | // Try to set up replication. |
1072 | 0 | rpc::RpcController rpc; |
1073 | 0 | master::SetupUniverseReplicationRequestPB setup_universe_req; |
1074 | 0 | master::SetupUniverseReplicationResponsePB setup_universe_resp; |
1075 | 0 | setup_universe_req.set_producer_id(kUniverseId); |
1076 | 0 | string master_addr = producer_cluster()->GetMasterAddresses(); |
1077 | 0 | auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); |
1078 | 0 | HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); |
1079 | 0 | setup_universe_req.mutable_producer_table_ids()->Reserve(1); |
1080 | 0 | setup_universe_req.add_producer_table_ids(tablegroup_id); |
1081 | |
|
1082 | 0 | auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster()); |
1083 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
1084 | 0 | &consumer_client()->proxy_cache(), |
1085 | 0 | consumer_leader_mini_master->bound_rpc_addr()); |
1086 | |
|
1087 | 0 | rpc.Reset(); |
1088 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
1089 | 0 | ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc)); |
1090 | 0 | ASSERT_FALSE(setup_universe_resp.has_error()); |
1091 | | |
1092 | | // The schema validation should fail. |
1093 | 0 | master::GetUniverseReplicationResponsePB get_universe_replication_resp; |
1094 | 0 | ASSERT_NOK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, |
1095 | 0 | &get_universe_replication_resp)); |
1096 | 0 | } |
1097 | | |
1098 | | // TODO adapt rest of twodc-test.cc tests. |
1099 | | |
1100 | 0 | TEST_P(TwoDCYsqlTest, DeleteTableChecks) { |
1101 | 0 | YB_SKIP_TEST_IN_TSAN(); |
1102 | 0 | constexpr int kNTabletsPerTable = 1; |
1103 | 0 | std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; |
1104 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1)); |
1105 | 0 | const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); |
1106 | | |
1107 | | // tables contains both producer and consumer universe tables (alternately). |
1108 | | // Pick out just the producer tables from the list. |
1109 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1110 | 0 | std::vector<std::shared_ptr<client::YBTable>> consumer_tables; |
1111 | 0 | producer_tables.reserve(tables.size() / 2); |
1112 | 0 | consumer_tables.reserve(tables.size() / 2); |
1113 | 0 | for (size_t i = 0; i < tables.size(); ++i) { |
1114 | 0 | if (i % 2 == 0) { |
1115 | 0 | producer_tables.push_back(tables[i]); |
1116 | 0 | } else { |
1117 | 0 | consumer_tables.push_back(tables[i]); |
1118 | 0 | } |
1119 | 0 | } |
1120 | | |
1121 | | // 1. Write some data. |
1122 | 0 | for (const auto& producer_table : producer_tables) { |
1123 | 0 | LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); |
1124 | 0 | WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); |
1125 | 0 | } |
1126 | | |
1127 | | // Verify data is written on the producer. |
1128 | 0 | for (const auto& producer_table : producer_tables) { |
1129 | 0 | auto producer_results = ScanToStrings(producer_table->name(), &producer_cluster_); |
1130 | 0 | ASSERT_EQ(100, PQntuples(producer_results.get())); |
1131 | 0 | int result; |
1132 | 0 | for (int i = 0; i < 100; ++i) { |
1133 | 0 | result = ASSERT_RESULT(GetInt32(producer_results.get(), i, 0)); |
1134 | 0 | ASSERT_EQ(i, result); |
1135 | 0 | } |
1136 | 0 | } |
1137 | | |
1138 | | // 2. Setup replication. |
1139 | 0 | ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), |
1140 | 0 | kUniverseId, producer_tables)); |
1141 | | |
1142 | | // 3. Verify everything is setup correctly. |
1143 | 0 | master::GetUniverseReplicationResponsePB get_universe_replication_resp; |
1144 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, |
1145 | 0 | &get_universe_replication_resp)); |
1146 | 0 | ASSERT_OK(CorrectlyPollingAllTablets( |
1147 | 0 | consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable))); |
1148 | |
|
1149 | 0 | auto data_replicated_correctly = [&](int num_results) -> Result<bool> { |
1150 | 0 | for (const auto& consumer_table : consumer_tables) { |
1151 | 0 | LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); |
1152 | 0 | auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); |
1153 | |
|
1154 | 0 | if (num_results != PQntuples(consumer_results.get())) { |
1155 | 0 | return false; |
1156 | 0 | } |
1157 | 0 | int result; |
1158 | 0 | for (int i = 0; i < num_results; ++i) { |
1159 | 0 | result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); |
1160 | 0 | if (i != result) { |
1161 | 0 | return false; |
1162 | 0 | } |
1163 | 0 | } |
1164 | 0 | } |
1165 | 0 | return true; |
1166 | 0 | }; |
1167 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); }, |
1168 | 0 | MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); |
1169 | | |
1170 | | // Attempt to destroy the producer and consumer tables. |
1171 | 0 | string producer_table_name = producer_tables[0]->name().ToString(); |
1172 | 0 | string producer_table_id = producer_tables[0]->id(); |
1173 | 0 | string consumer_table_name = consumer_tables[0]->name().ToString(); |
1174 | 0 | string consumer_table_id = consumer_tables[0]->id(); |
1175 | 0 | ASSERT_NOK(DeleteTable(&producer_cluster_, &producer_table_id)); |
1176 | 0 | ASSERT_NOK(DeleteTable(&consumer_cluster_, &consumer_table_id)); |
1177 | |
|
1178 | 0 | FLAGS_enable_delete_truncate_xcluster_replicated_table = true; |
1179 | 0 | ASSERT_OK(DeleteTable(&producer_cluster_, &producer_table_id)); |
1180 | 0 | ASSERT_OK(DeleteTable(&consumer_cluster_, &consumer_table_id)); |
1181 | 0 | } |
1182 | | |
1183 | 0 | TEST_P(TwoDCYsqlTest, TruncateTableChecks) { |
1184 | 0 | YB_SKIP_TEST_IN_TSAN(); |
1185 | 0 | constexpr int kNTabletsPerTable = 1; |
1186 | 0 | std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; |
1187 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1)); |
1188 | 0 | const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); |
1189 | | |
1190 | | // tables contains both producer and consumer universe tables (alternately). |
1191 | | // Pick out just the producer tables from the list. |
1192 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1193 | 0 | std::vector<std::shared_ptr<client::YBTable>> consumer_tables; |
1194 | 0 | producer_tables.reserve(tables.size() / 2); |
1195 | 0 | consumer_tables.reserve(tables.size() / 2); |
1196 | 0 | for (size_t i = 0; i < tables.size(); ++i) { |
1197 | 0 | if (i % 2 == 0) { |
1198 | 0 | producer_tables.push_back(tables[i]); |
1199 | 0 | } else { |
1200 | 0 | consumer_tables.push_back(tables[i]); |
1201 | 0 | } |
1202 | 0 | } |
1203 | | |
1204 | | // 1. Write some data. |
1205 | 0 | for (const auto& producer_table : producer_tables) { |
1206 | 0 | LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); |
1207 | 0 | WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); |
1208 | 0 | } |
1209 | | |
1210 | | // Verify data is written on the producer. |
1211 | 0 | for (const auto& producer_table : producer_tables) { |
1212 | 0 | auto producer_results = ScanToStrings(producer_table->name(), &producer_cluster_); |
1213 | 0 | ASSERT_EQ(100, PQntuples(producer_results.get())); |
1214 | 0 | int result; |
1215 | 0 | for (int i = 0; i < 100; ++i) { |
1216 | 0 | result = ASSERT_RESULT(GetInt32(producer_results.get(), i, 0)); |
1217 | 0 | ASSERT_EQ(i, result); |
1218 | 0 | } |
1219 | 0 | } |
1220 | | |
1221 | | // 2. Setup replication. |
1222 | 0 | ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), |
1223 | 0 | kUniverseId, producer_tables)); |
1224 | | |
1225 | | // 3. Verify everything is setup correctly. |
1226 | 0 | master::GetUniverseReplicationResponsePB get_universe_replication_resp; |
1227 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, |
1228 | 0 | &get_universe_replication_resp)); |
1229 | 0 | ASSERT_OK(CorrectlyPollingAllTablets( |
1230 | 0 | consumer_cluster(), narrow_cast<uint32_t>(tables_vector.size() * kNTabletsPerTable))); |
1231 | |
|
1232 | 0 | auto data_replicated_correctly = [&](int num_results) -> Result<bool> { |
1233 | 0 | for (const auto& consumer_table : consumer_tables) { |
1234 | 0 | LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); |
1235 | 0 | auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); |
1236 | |
|
1237 | 0 | if (num_results != PQntuples(consumer_results.get())) { |
1238 | 0 | return false; |
1239 | 0 | } |
1240 | 0 | int result; |
1241 | 0 | for (int i = 0; i < num_results; ++i) { |
1242 | 0 | result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); |
1243 | 0 | if (i != result) { |
1244 | 0 | return false; |
1245 | 0 | } |
1246 | 0 | } |
1247 | 0 | } |
1248 | 0 | return true; |
1249 | 0 | }; |
1250 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); }, |
1251 | 0 | MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); |
1252 | | |
1253 | | // Attempt to Truncate the producer and consumer tables. |
1254 | 0 | string producer_table_id = producer_tables[0]->id(); |
1255 | 0 | string consumer_table_id = consumer_tables[0]->id(); |
1256 | 0 | ASSERT_NOK(TruncateTable(&producer_cluster_, {producer_table_id})); |
1257 | 0 | ASSERT_NOK(TruncateTable(&consumer_cluster_, {consumer_table_id})); |
1258 | |
|
1259 | 0 | FLAGS_enable_delete_truncate_xcluster_replicated_table = true; |
1260 | 0 | ASSERT_OK(TruncateTable(&producer_cluster_, {producer_table_id})); |
1261 | 0 | ASSERT_OK(TruncateTable(&consumer_cluster_, {consumer_table_id})); |
1262 | 0 | } |
1263 | | |
1264 | | } // namespace enterprise |
1265 | | } // namespace yb |