/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/twodc-test.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <algorithm> |
15 | | #include <map> |
16 | | #include <string> |
17 | | #include <utility> |
18 | | #include <chrono> |
19 | | #include <boost/assign.hpp> |
20 | | #include <gflags/gflags.h> |
21 | | #include <gtest/gtest.h> |
22 | | |
23 | | #include "yb/common/ql_value.h" |
24 | | #include "yb/common/schema.h" |
25 | | #include "yb/common/wire_protocol.h" |
26 | | |
27 | | #include "yb/cdc/cdc_service.h" |
28 | | #include "yb/cdc/cdc_service.pb.h" |
29 | | #include "yb/cdc/cdc_service.proxy.h" |
30 | | #include "yb/client/client.h" |
31 | | #include "yb/client/client-test-util.h" |
32 | | #include "yb/client/meta_cache.h" |
33 | | #include "yb/client/schema.h" |
34 | | #include "yb/client/session.h" |
35 | | #include "yb/client/table.h" |
36 | | #include "yb/client/table_alterer.h" |
37 | | #include "yb/client/table_creator.h" |
38 | | #include "yb/client/table_handle.h" |
39 | | #include "yb/client/transaction.h" |
40 | | #include "yb/client/yb_op.h" |
41 | | |
42 | | #include "yb/gutil/stl_util.h" |
43 | | #include "yb/gutil/strings/join.h" |
44 | | #include "yb/gutil/strings/substitute.h" |
45 | | #include "yb/integration-tests/cdc_test_util.h" |
46 | | #include "yb/integration-tests/mini_cluster.h" |
47 | | #include "yb/integration-tests/twodc_test_base.h" |
48 | | #include "yb/integration-tests/yb_mini_cluster_test_base.h" |
49 | | #include "yb/master/catalog_manager_if.h" |
50 | | #include "yb/master/master_defaults.h" |
51 | | #include "yb/master/mini_master.h" |
52 | | #include "yb/master/master_replication.proxy.h" |
53 | | #include "yb/master/master-test-util.h" |
54 | | |
55 | | #include "yb/master/cdc_consumer_registry_service.h" |
56 | | #include "yb/rpc/rpc_controller.h" |
57 | | #include "yb/server/hybrid_clock.h" |
58 | | #include "yb/tablet/tablet.h" |
59 | | #include "yb/tablet/tablet_peer.h" |
60 | | #include "yb/tserver/mini_tablet_server.h" |
61 | | #include "yb/tserver/tablet_server.h" |
62 | | #include "yb/tserver/ts_tablet_manager.h" |
63 | | |
64 | | #include "yb/tserver/cdc_consumer.h" |
65 | | #include "yb/util/atomic.h" |
66 | | #include "yb/util/faststring.h" |
67 | | #include "yb/util/metrics.h" |
68 | | #include "yb/util/random.h" |
69 | | #include "yb/util/status_log.h" |
70 | | #include "yb/util/stopwatch.h" |
71 | | #include "yb/util/test_util.h" |
72 | | |
73 | | using namespace std::literals; |
74 | | |
75 | | DECLARE_int32(replication_factor); |
76 | | DECLARE_bool(enable_ysql); |
77 | | DECLARE_bool(TEST_twodc_write_hybrid_time); |
78 | | DECLARE_int32(cdc_wal_retention_time_secs); |
79 | | DECLARE_int32(replication_failure_delay_exponent); |
80 | | DECLARE_double(TEST_respond_write_failed_probability); |
81 | | DECLARE_int32(cdc_max_apply_batch_num_records); |
82 | | DECLARE_int32(async_replication_idle_delay_ms); |
83 | | DECLARE_int32(async_replication_max_idle_wait); |
84 | | DECLARE_int32(external_intent_cleanup_secs); |
85 | | DECLARE_int32(yb_num_shards_per_tserver); |
86 | | DECLARE_uint64(TEST_yb_inbound_big_calls_parse_delay_ms); |
87 | | DECLARE_int64(rpc_throttle_threshold_bytes); |
88 | | DECLARE_bool(enable_automatic_tablet_splitting); |
89 | | |
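Note on the DECLARE_* block above: in gflags, DECLARE_* does not define a flag; it only re-exports one that is DEFINE_*d elsewhere in the production code, which is what lets these tests read and override runtime behavior by plain assignment. Roughly, and simplified (a sketch of the macro's effect, not YugabyteDB code):

    // What DECLARE_int32(replication_failure_delay_exponent) boils down to:
    extern int32_t FLAGS_replication_failure_delay_exponent;  // DEFINE_int32(...) lives elsewhere

    // ...which is why a test below can simply write:
    //   FLAGS_replication_failure_delay_exponent = 7;  // 2^7 == 128ms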
90 | | namespace yb { |
91 | | |
92 | | using client::YBClient; |
93 | | using client::YBClientBuilder; |
94 | | using client::YBColumnSchema; |
95 | | using client::YBError; |
96 | | using client::YBSchema; |
97 | | using client::YBSchemaBuilder; |
98 | | using client::YBSession; |
99 | | using client::YBTable; |
100 | | using client::YBTableAlterer; |
101 | | using client::YBTableCreator; |
102 | | using client::YBTableType; |
103 | | using client::YBTableName; |
104 | | using master::MiniMaster; |
105 | | using tserver::MiniTabletServer; |
106 | | using tserver::enterprise::CDCConsumer; |
107 | | |
108 | | namespace enterprise { |
109 | | |
110 | | using SessionTransactionPair = std::pair<client::YBSessionPtr, client::YBTransactionPtr>; |
111 | | |
112 | | class TwoDCTest : public TwoDCTestBase, public testing::WithParamInterface<TwoDCTestParams> { |
113 | | public: |
114 | | Result<std::vector<std::shared_ptr<client::YBTable>>> SetUpWithParams( |
115 | | const std::vector<uint32_t>& num_consumer_tablets, |
116 | | const std::vector<uint32_t>& num_producer_tablets, |
117 | | uint32_t replication_factor, |
118 | 0 | uint32_t num_masters = 1) { |
119 | 0 | FLAGS_enable_ysql = false; |
120 | 0 | TwoDCTestBase::SetUp(); |
121 | 0 | FLAGS_cdc_max_apply_batch_num_records = GetParam().batch_size; |
122 | 0 | FLAGS_cdc_enable_replicate_intents = GetParam().enable_replicate_intents; |
123 | 0 | FLAGS_yb_num_shards_per_tserver = 1; |
124 | |
125 | 0 | MiniClusterOptions opts; |
126 | 0 | opts.num_tablet_servers = replication_factor; |
127 | 0 | opts.num_masters = num_masters; |
128 | 0 | FLAGS_replication_factor = replication_factor; |
129 | 0 | opts.cluster_id = "producer"; |
130 | 0 | producer_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(opts); |
131 | 0 | RETURN_NOT_OK(producer_cluster()->StartSync()); |
132 | 0 | RETURN_NOT_OK(producer_cluster()->WaitForTabletServerCount(replication_factor)); |
133 | |
134 | 0 | opts.cluster_id = "consumer"; |
135 | 0 | consumer_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(opts); |
136 | 0 | RETURN_NOT_OK(consumer_cluster()->StartSync()); |
137 | 0 | RETURN_NOT_OK(consumer_cluster()->WaitForTabletServerCount(replication_factor)); |
138 | |
139 | 0 | producer_cluster_.client_ = VERIFY_RESULT(producer_cluster()->CreateClient()); |
140 | 0 | consumer_cluster_.client_ = VERIFY_RESULT(consumer_cluster()->CreateClient()); |
141 | |
142 | 0 | RETURN_NOT_OK(clock_->Init()); |
143 | 0 | producer_cluster_.txn_mgr_.emplace(producer_client(), clock_, client::LocalTabletFilter()); |
144 | 0 | consumer_cluster_.txn_mgr_.emplace(consumer_client(), clock_, client::LocalTabletFilter()); |
145 | |
146 | 0 | YBSchemaBuilder b; |
147 | 0 | b.AddColumn("c0")->Type(INT32)->NotNull()->HashPrimaryKey(); |
148 | | |
149 | | // Create a transactional table. |
150 | 0 | TableProperties table_properties; |
151 | 0 | table_properties.SetTransactional(true); |
152 | 0 | b.SetTableProperties(table_properties); |
153 | 0 | CHECK_OK(b.Build(&schema_)); |
154 | |
155 | 0 | YBSchema consumer_schema; |
156 | 0 | table_properties.SetDefaultTimeToLive(0); |
157 | 0 | b.SetTableProperties(table_properties); |
158 | 0 | CHECK_OK(b.Build(&consumer_schema)); |
159 | |
160 | 0 | if (num_consumer_tablets.size() != num_producer_tablets.size()) { |
161 | 0 | return STATUS(IllegalState, |
162 | 0 | Format("Num consumer tables: $0 num producer tables: $1 must be equal.", |
163 | 0 | num_consumer_tablets.size(), num_producer_tablets.size())); |
164 | 0 | } |
165 | | |
166 | 0 | std::vector<YBTableName> tables; |
167 | 0 | std::vector<std::shared_ptr<client::YBTable>> yb_tables; |
168 | 0 | for (uint32_t i = 0; i < num_consumer_tablets.size(); i++) { |
169 | 0 | RETURN_NOT_OK(CreateTable(i, num_producer_tablets[i], producer_client(), &tables)); |
170 | 0 | std::shared_ptr<client::YBTable> producer_table; |
171 | 0 | RETURN_NOT_OK(producer_client()->OpenTable(tables[i * 2], &producer_table)); |
172 | 0 | yb_tables.push_back(producer_table); |
173 | |
174 | 0 | RETURN_NOT_OK(CreateTable(i, num_consumer_tablets[i], consumer_client(), |
175 | 0 | consumer_schema, &tables)); |
176 | 0 | std::shared_ptr<client::YBTable> consumer_table; |
177 | 0 | RETURN_NOT_OK(consumer_client()->OpenTable(tables[(i * 2) + 1], &consumer_table)); |
178 | 0 | yb_tables.push_back(consumer_table); |
179 | 0 | } |
180 | |
181 | 0 | return yb_tables; |
182 | 0 | } |
183 | | |
184 | | Result<YBTableName> CreateTable(YBClient* client, const std::string& namespace_name, |
185 | | const std::string& table_name, uint32_t num_tablets, |
186 | 0 | const YBSchema* schema = nullptr) { |
187 | 0 | YBTableName table(YQL_DATABASE_CQL, namespace_name, table_name); |
188 | 0 | RETURN_NOT_OK(client->CreateNamespaceIfNotExists(table.namespace_name(), |
189 | 0 | table.namespace_type())); |
190 | |
191 | 0 | if (!schema) { |
192 | 0 | schema = &schema_; |
193 | 0 | } |
194 | | // Add a table, make sure it reports itself. |
195 | 0 | std::unique_ptr<YBTableCreator> table_creator(client->NewTableCreator()); |
196 | 0 | RETURN_NOT_OK(table_creator->table_name(table) |
197 | 0 | .schema(schema) |
198 | 0 | .table_type(YBTableType::YQL_TABLE_TYPE) |
199 | 0 | .num_tablets(num_tablets) |
200 | 0 | .Create()); |
201 | 0 | return table; |
202 | 0 | } |
203 | | |
204 | | Status CreateTable( |
205 | 0 | uint32_t idx, uint32_t num_tablets, YBClient* client, std::vector<YBTableName>* tables) { |
206 | 0 | auto table = VERIFY_RESULT(CreateTable(client, kNamespaceName, Format("test_table_$0", idx), |
207 | 0 | num_tablets)); |
208 | 0 | tables->push_back(table); |
209 | 0 | return Status::OK(); |
210 | 0 | } |
211 | | |
212 | | Status CreateTable(uint32_t idx, uint32_t num_tablets, YBClient* client, YBSchema schema, |
213 | 0 | std::vector<YBTableName>* tables) { |
214 | 0 | auto table = VERIFY_RESULT(CreateTable(client, kNamespaceName, Format("test_table_$0", idx), |
215 | 0 | num_tablets, &schema)); |
216 | 0 | tables->push_back(table); |
217 | 0 | return Status::OK(); |
218 | 0 | } |
219 | | |
220 | | void WriteWorkload(uint32_t start, uint32_t end, YBClient* client, const YBTableName& table, |
221 | 0 | bool delete_op = false) { |
222 | 0 | auto session = client->NewSession(); |
223 | 0 | client::TableHandle table_handle; |
224 | 0 | ASSERT_OK(table_handle.Open(table, client)); |
225 | 0 | std::vector<std::shared_ptr<client::YBqlOp>> ops; |
226 | |
227 | 0 | LOG(INFO) << "Writing " << end-start << (delete_op ? " deletes" : " inserts"); |
228 | 0 | for (uint32_t i = start; i < end; i++) { |
229 | 0 | auto op = delete_op ? table_handle.NewDeleteOp() : table_handle.NewInsertOp(); |
230 | 0 | int32_t key = i; |
231 | 0 | auto req = op->mutable_request(); |
232 | 0 | QLAddInt32HashValue(req, key); |
233 | 0 | ASSERT_OK(session->ApplyAndFlush(op)); |
234 | 0 | } |
235 | 0 | } |
236 | | |
237 | 0 | void DeleteWorkload(uint32_t start, uint32_t end, YBClient* client, const YBTableName& table) { |
238 | 0 | WriteWorkload(start, end, client, table, true /* delete_op */); |
239 | 0 | } |
240 | | |
241 | 0 | std::vector<string> ScanToStrings(const YBTableName& table_name, YBClient* client) { |
242 | 0 | client::TableHandle table; |
243 | 0 | EXPECT_OK(table.Open(table_name, client)); |
244 | 0 | auto result = ScanTableToStrings(table); |
245 | 0 | std::sort(result.begin(), result.end()); |
246 | 0 | return result; |
247 | 0 | } |
248 | | |
249 | | Status VerifyWrittenRecords(const YBTableName& producer_table, |
250 | | const YBTableName& consumer_table, |
251 | 0 | int timeout_secs = kRpcTimeout) { |
252 | 0 | return LoggedWaitFor([=]() -> Result<bool> { |
253 | 0 | auto producer_results = ScanToStrings(producer_table, producer_client()); |
254 | 0 | auto consumer_results = ScanToStrings(consumer_table, consumer_client()); |
255 | 0 | return producer_results == consumer_results; |
256 | 0 | }, MonoDelta::FromSeconds(timeout_secs), "Verify written records"); |
257 | 0 | } |
258 | | |
259 | 0 | Status VerifyNumRecords(const YBTableName& table, YBClient* client, size_t expected_size) { |
260 | 0 | return LoggedWaitFor([=]() -> Result<bool> { |
261 | 0 | auto results = ScanToStrings(table, client); |
262 | 0 | return results.size() == expected_size; |
263 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Verify number of records"); |
264 | 0 | } |
265 | | |
266 | | Result<SessionTransactionPair> CreateSessionWithTransaction( |
267 | 0 | YBClient* client, client::TransactionManager* txn_mgr) { |
268 | 0 | auto session = client->NewSession(); |
269 | 0 | auto transaction = std::make_shared<client::YBTransaction>(txn_mgr); |
270 | 0 | ReadHybridTime read_time; |
271 | 0 | RETURN_NOT_OK(transaction->Init(IsolationLevel::SNAPSHOT_ISOLATION, read_time)); |
272 | 0 | session->SetTransaction(transaction); |
273 | 0 | return std::make_pair(session, transaction); |
274 | 0 | } |
275 | | |
276 | | void WriteIntents(uint32_t start, uint32_t end, YBClient* client, |
277 | | const std::shared_ptr<YBSession>& session, const YBTableName& table, |
278 | 0 | bool delete_op = false) { |
279 | 0 | client::TableHandle table_handle; |
280 | 0 | ASSERT_OK(table_handle.Open(table, client)); |
281 | 0 | std::vector<std::shared_ptr<client::YBqlOp>> ops; |
282 | |
283 | 0 | for (uint32_t i = start; i < end; i++) { |
284 | 0 | auto op = delete_op ? table_handle.NewDeleteOp() : table_handle.NewInsertOp(); |
285 | 0 | int32_t key = i; |
286 | 0 | auto req = op->mutable_request(); |
287 | 0 | QLAddInt32HashValue(req, key); |
288 | 0 | ASSERT_OK(session->ApplyAndFlush(op)); |
289 | 0 | } |
290 | 0 | } |
291 | | |
292 | | void WriteTransactionalWorkload(uint32_t start, uint32_t end, YBClient* client, |
293 | | client::TransactionManager* txn_mgr, const YBTableName& table, |
294 | 0 | bool delete_op = false) { |
295 | 0 | auto pair = ASSERT_RESULT(CreateSessionWithTransaction(client, txn_mgr)); |
296 | 0 | ASSERT_NO_FATALS(WriteIntents(start, end, client, pair.first, table, delete_op)); |
297 | 0 | ASSERT_OK(pair.second->CommitFuture().get()); |
298 | 0 | } |
299 | | |
300 | | private: |
301 | | server::ClockPtr clock_{new server::HybridClock()}; |
302 | | |
303 | | YBSchema schema_; |
304 | | }; |
305 | | |
306 | | INSTANTIATE_TEST_CASE_P(TwoDCTestParams, TwoDCTest, |
307 | | ::testing::Values(TwoDCTestParams(1, true), TwoDCTestParams(1, false), |
308 | | TwoDCTestParams(0, true), TwoDCTestParams(0, false))); |
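TwoDCTestParams itself is defined in twodc_test_base.h. Reconstructed purely from its usage in this file (the two constructor arguments feed GetParam().batch_size and GetParam().enable_replicate_intents in SetUpWithParams), a plausible shape is:

    // Hypothetical reconstruction from usage in this file; the real definition in
    // twodc_test_base.h may carry additional fields.
    struct TwoDCTestParams {
      TwoDCTestParams(int batch_size_, bool enable_replicate_intents_)
          : batch_size(batch_size_), enable_replicate_intents(enable_replicate_intents_) {}

      int batch_size;                 // copied into FLAGS_cdc_max_apply_batch_num_records
      bool enable_replicate_intents;  // copied into FLAGS_cdc_enable_replicate_intents
    };

The four instantiations above therefore cover batch sizes {0, 1} crossed with intent replication on and off.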
309 | | |
310 | 0 | TEST_P(TwoDCTest, SetupUniverseReplication) { |
311 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({8, 4}, {6, 6}, 3)); |
312 | |
313 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
314 | | // tables contains both producer and consumer universe tables (alternately). |
315 | | // Pick out just the producer tables from the list. |
316 | 0 | producer_tables.reserve(tables.size() / 2); |
317 | 0 | for (size_t i = 0; i < tables.size(); i += 2) { |
318 | 0 | producer_tables.push_back(tables[i]); |
319 | 0 | } |
320 | 0 | ASSERT_OK(SetupUniverseReplication( |
321 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
322 | | |
323 | | // Verify that universe was setup on consumer. |
324 | 0 | master::GetUniverseReplicationResponsePB resp; |
325 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); |
326 | 0 | ASSERT_EQ(resp.entry().producer_id(), kUniverseId); |
327 | 0 | ASSERT_EQ(resp.entry().tables_size(), producer_tables.size()); |
328 | 0 | for (uint32_t i = 0; i < producer_tables.size(); i++) { |
329 | 0 | ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id()); |
330 | 0 | } |
331 | | |
332 | | // Verify that CDC streams were created on producer for all tables. |
333 | 0 | for (size_t i = 0; i < producer_tables.size(); i++) { |
334 | 0 | master::ListCDCStreamsResponsePB stream_resp; |
335 | 0 | ASSERT_OK(GetCDCStreamForTable(producer_tables[i]->id(), &stream_resp)); |
336 | 0 | ASSERT_EQ(stream_resp.streams_size(), 1); |
337 | 0 | ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), producer_tables[i]->id()); |
338 | 0 | } |
339 | |
340 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
341 | 0 | } |
342 | | |
343 | 0 | TEST_P(TwoDCTest, SetupUniverseReplicationErrorChecking) { |
344 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, 1)); |
345 | 0 | rpc::RpcController rpc; |
346 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
347 | 0 | &consumer_client()->proxy_cache(), |
348 | 0 | ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
349 | |
350 | 0 | { |
351 | 0 | rpc.Reset(); |
352 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
353 | 0 | master::SetupUniverseReplicationRequestPB setup_universe_req; |
354 | 0 | master::SetupUniverseReplicationResponsePB setup_universe_resp; |
355 | 0 | ASSERT_OK(master_proxy->SetupUniverseReplication( |
356 | 0 | setup_universe_req, &setup_universe_resp, &rpc)); |
357 | 0 | ASSERT_TRUE(setup_universe_resp.has_error()); |
358 | 0 | std::string prefix = "Producer universe ID must be provided"; |
359 | 0 | ASSERT_TRUE(setup_universe_resp.error().status().message().substr(0, prefix.size()) == prefix); |
360 | 0 | } |
361 | |
362 | 0 | { |
363 | 0 | rpc.Reset(); |
364 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
365 | 0 | master::SetupUniverseReplicationRequestPB setup_universe_req; |
366 | 0 | setup_universe_req.set_producer_id(kUniverseId); |
367 | 0 | master::SetupUniverseReplicationResponsePB setup_universe_resp; |
368 | 0 | ASSERT_OK(master_proxy->SetupUniverseReplication( |
369 | 0 | setup_universe_req, &setup_universe_resp, &rpc)); |
370 | 0 | ASSERT_TRUE(setup_universe_resp.has_error()); |
371 | 0 | std::string prefix = "Producer master address must be provided"; |
372 | 0 | ASSERT_TRUE(setup_universe_resp.error().status().message().substr(0, prefix.size()) == prefix); |
373 | 0 | } |
374 | |
375 | 0 | { |
376 | 0 | rpc.Reset(); |
377 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
378 | 0 | master::SetupUniverseReplicationRequestPB setup_universe_req; |
379 | 0 | setup_universe_req.set_producer_id(kUniverseId); |
380 | 0 | string master_addr = producer_cluster()->GetMasterAddresses(); |
381 | 0 | auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); |
382 | 0 | HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); |
383 | 0 | setup_universe_req.add_producer_table_ids("a"); |
384 | 0 | setup_universe_req.add_producer_table_ids("b"); |
385 | 0 | setup_universe_req.add_producer_bootstrap_ids("c"); |
386 | 0 | master::SetupUniverseReplicationResponsePB setup_universe_resp; |
387 | 0 | ASSERT_OK(master_proxy->SetupUniverseReplication( |
388 | 0 | setup_universe_req, &setup_universe_resp, &rpc)); |
389 | 0 | ASSERT_TRUE(setup_universe_resp.has_error()); |
390 | 0 | std::string prefix = "Number of bootstrap ids must be equal to number of tables"; |
391 | 0 | ASSERT_TRUE(setup_universe_resp.error().status().message().substr(0, prefix.size()) == prefix); |
392 | 0 | } |
393 | |
394 | 0 | { |
395 | 0 | rpc.Reset(); |
396 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
397 | |
398 | 0 | master::SetupUniverseReplicationRequestPB setup_universe_req; |
399 | 0 | master::SetupUniverseReplicationResponsePB setup_universe_resp; |
400 | 0 | setup_universe_req.set_producer_id(kUniverseId); |
401 | 0 | string master_addr = consumer_cluster()->GetMasterAddresses(); |
402 | 0 | auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); |
403 | 0 | HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); |
404 | |
405 | 0 | setup_universe_req.add_producer_table_ids("prod_table_id_1"); |
406 | 0 | setup_universe_req.add_producer_table_ids("prod_table_id_2"); |
407 | 0 | setup_universe_req.add_producer_bootstrap_ids("prod_bootstrap_id_1"); |
408 | 0 | setup_universe_req.add_producer_bootstrap_ids("prod_bootstrap_id_2"); |
409 | |
410 | 0 | ASSERT_OK(master_proxy->SetupUniverseReplication( |
411 | 0 | setup_universe_req, &setup_universe_resp, &rpc)); |
412 | 0 | ASSERT_TRUE(setup_universe_resp.has_error()); |
413 | 0 | std::string prefix = "Duplicate between request master addresses"; |
414 | 0 | ASSERT_TRUE(setup_universe_resp.error().status().message().substr(0, prefix.size()) == prefix); |
415 | 0 | } |
416 | |
417 | 0 | { |
418 | 0 | rpc.Reset(); |
419 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
420 | |
421 | 0 | master::SetupUniverseReplicationRequestPB setup_universe_req; |
422 | 0 | master::SetupUniverseReplicationResponsePB setup_universe_resp; |
423 | 0 | master::SysClusterConfigEntryPB cluster_info; |
424 | 0 | auto& cm = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->catalog_manager(); |
425 | 0 | CHECK_OK(cm.GetClusterConfig(&cluster_info)); |
426 | 0 | setup_universe_req.set_producer_id(cluster_info.cluster_uuid()); |
427 | |
428 | 0 | string master_addr = producer_cluster()->GetMasterAddresses(); |
429 | 0 | auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); |
430 | 0 | HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); |
431 | |
432 | 0 | setup_universe_req.add_producer_table_ids("prod_table_id_1"); |
433 | 0 | setup_universe_req.add_producer_table_ids("prod_table_id_2"); |
434 | 0 | setup_universe_req.add_producer_bootstrap_ids("prod_bootstrap_id_1"); |
435 | 0 | setup_universe_req.add_producer_bootstrap_ids("prod_bootstrap_id_2"); |
436 | |
437 | 0 | ASSERT_OK(master_proxy->SetupUniverseReplication( |
438 | 0 | setup_universe_req, &setup_universe_resp, &rpc)); |
439 | 0 | ASSERT_TRUE(setup_universe_resp.has_error()); |
440 | 0 | std::string prefix = "The request UUID and cluster UUID are identical."; |
441 | 0 | ASSERT_TRUE(setup_universe_resp.error().status().message().substr(0, prefix.size()) == prefix); |
442 | 0 | } |
443 | 0 | } |
444 | | |
445 | 0 | TEST_P(TwoDCTest, SetupUniverseReplicationWithProducerBootstrapId) { |
446 | 0 | constexpr int kNTabletsPerTable = 1; |
447 | 0 | std::vector<uint32_t> tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; |
448 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 3)); |
449 | |
450 | 0 | std::unique_ptr<client::YBClient> client; |
451 | 0 | std::unique_ptr<cdc::CDCServiceProxy> producer_cdc_proxy; |
452 | 0 | client = ASSERT_RESULT(consumer_cluster()->CreateClient()); |
453 | 0 | producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>( |
454 | 0 | &client->proxy_cache(), |
455 | 0 | HostPort::FromBoundEndpoint(producer_cluster()->mini_tablet_server(0)->bound_rpc_addr())); |
456 | | |
457 | | // tables contains both producer and consumer universe tables (alternately). |
458 | | // Pick out just the producer tables from the list. |
459 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
460 | 0 | std::vector<std::shared_ptr<client::YBTable>> consumer_tables; |
461 | 0 | producer_tables.reserve(tables.size() / 2); |
462 | 0 | consumer_tables.reserve(tables.size() / 2); |
463 | 0 | for (size_t i = 0; i < tables.size(); i++) { |
464 | 0 | if (i % 2 == 0) { |
465 | 0 | producer_tables.push_back(tables[i]); |
466 | 0 | } else { |
467 | 0 | consumer_tables.push_back(tables[i]); |
468 | 0 | } |
469 | 0 | } |
470 | | |
471 | | // 1. Write some data so that we can verify that only new records get replicated. |
472 | 0 | for (const auto& producer_table : producer_tables) { |
473 | 0 | LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); |
474 | 0 | WriteWorkload(0, 100, producer_client(), producer_table->name()); |
475 | 0 | } |
476 | |
477 | 0 | SleepFor(MonoDelta::FromSeconds(10)); |
478 | 0 | cdc::BootstrapProducerRequestPB req; |
479 | 0 | cdc::BootstrapProducerResponsePB resp; |
480 | |
481 | 0 | for (const auto& producer_table : producer_tables) { |
482 | 0 | req.add_table_ids(producer_table->id()); |
483 | 0 | } |
484 | |
485 | 0 | rpc::RpcController rpc; |
486 | 0 | ASSERT_OK(producer_cdc_proxy->BootstrapProducer(req, &resp, &rpc)); |
487 | 0 | ASSERT_FALSE(resp.has_error()); |
488 | |
489 | 0 | ASSERT_EQ(resp.cdc_bootstrap_ids().size(), producer_tables.size()); |
490 | |
491 | 0 | int table_idx = 0; |
492 | 0 | for (const auto& bootstrap_id : resp.cdc_bootstrap_ids()) { |
493 | 0 | LOG(INFO) << "Got bootstrap id " << bootstrap_id |
494 | 0 | << " for table " << producer_tables[table_idx++]->name().table_name(); |
495 | 0 | } |
496 | |
497 | 0 | std::unordered_map<std::string, int> tablet_bootstraps; |
498 | | |
499 | | // Verify that for each table's tablets, a new row with the returned bootstrap id was |
500 | | // inserted into the cdc_state table. |
501 | 0 | client::TableHandle table; |
502 | 0 | client::YBTableName cdc_state_table( |
503 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
504 | 0 | ASSERT_OK(table.Open(cdc_state_table, producer_client())); |
505 | | |
506 | | // 2 tables with kNTabletsPerTable tablets each. |
507 | 0 | ASSERT_EQ(tables_vector.size() * kNTabletsPerTable, boost::size(client::TableRange(table))); |
508 | 0 | int nrows = 0; |
509 | 0 | for (const auto& row : client::TableRange(table)) { |
510 | 0 | nrows++; |
511 | 0 | string stream_id = row.column(0).string_value(); |
512 | 0 | tablet_bootstraps[stream_id]++; |
513 | |
514 | 0 | string checkpoint = row.column(2).string_value(); |
515 | 0 | auto s = OpId::FromString(checkpoint); |
516 | 0 | ASSERT_OK(s); |
517 | 0 | OpId op_id = *s; |
518 | 0 | ASSERT_GT(op_id.index, 0); |
519 | |
520 | 0 | LOG(INFO) << "Bootstrap id " << stream_id |
521 | 0 | << " for tablet " << row.column(1).string_value(); |
522 | 0 | } |
523 | |
524 | 0 | ASSERT_EQ(tablet_bootstraps.size(), producer_tables.size()); |
525 | | // Check that each bootstrap id covers kNTabletsPerTable tablets. |
526 | 0 | for (const auto& e : tablet_bootstraps) { |
527 | 0 | ASSERT_EQ(e.second, kNTabletsPerTable); |
528 | 0 | } |
529 | | |
530 | | // Map table -> bootstrap_id. We will need this when setting up replication. |
531 | 0 | std::unordered_map<TableId, std::string> table_bootstrap_ids; |
532 | 0 | for (int i = 0; i < resp.cdc_bootstrap_ids_size(); i++) { |
533 | 0 | table_bootstrap_ids[req.table_ids(i)] = resp.cdc_bootstrap_ids(i); |
534 | 0 | } |
535 | | |
536 | | // 2. Set up replication. |
537 | 0 | master::SetupUniverseReplicationRequestPB setup_universe_req; |
538 | 0 | master::SetupUniverseReplicationResponsePB setup_universe_resp; |
539 | 0 | setup_universe_req.set_producer_id(kUniverseId); |
540 | 0 | string master_addr = producer_cluster()->GetMasterAddresses(); |
541 | 0 | auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); |
542 | 0 | HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); |
543 | |
544 | 0 | setup_universe_req.mutable_producer_table_ids()->Reserve( |
545 | 0 | narrow_cast<int>(producer_tables.size())); |
546 | 0 | for (const auto& producer_table : producer_tables) { |
547 | 0 | setup_universe_req.add_producer_table_ids(producer_table->id()); |
548 | 0 | const auto& iter = table_bootstrap_ids.find(producer_table->id()); |
549 | 0 | ASSERT_NE(iter, table_bootstrap_ids.end()); |
550 | 0 | setup_universe_req.add_producer_bootstrap_ids(iter->second); |
551 | 0 | } |
552 | |
553 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
554 | 0 | &consumer_client()->proxy_cache(), |
555 | 0 | ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
556 | |
557 | 0 | rpc.Reset(); |
558 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
559 | 0 | ASSERT_OK(master_proxy->SetupUniverseReplication( |
560 | 0 | setup_universe_req, &setup_universe_resp, &rpc)); |
561 | 0 | ASSERT_FALSE(setup_universe_resp.has_error()); |
562 | | |
563 | | // 3. Verify everything is set up correctly. |
564 | 0 | master::GetUniverseReplicationResponsePB get_universe_replication_resp; |
565 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, |
566 | 0 | &get_universe_replication_resp)); |
567 | 0 | ASSERT_OK(CorrectlyPollingAllTablets( |
568 | 0 | consumer_cluster(), narrow_cast<int32_t>(tables_vector.size() * kNTabletsPerTable))); |
569 | | |
570 | | // 4. Write more data. |
571 | 0 | for (const auto& producer_table : producer_tables) { |
572 | 0 | WriteWorkload(1000, 1005, producer_client(), producer_table->name()); |
573 | 0 | } |
574 | | |
575 | | // 5. Verify that only the new writes get replicated to the consumer: since we bootstrapped |
576 | | // the producer after the initial data had already been written, the old data (whatever |
577 | | // was there before the bootstrap) should not be replicated. |
578 | 0 | auto data_replicated_correctly = [&]() { |
579 | 0 | for (const auto& consumer_table : consumer_tables) { |
580 | 0 | LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); |
581 | 0 | std::vector<std::string> expected_results; |
582 | 0 | for (int key = 1000; key < 1005; key++) { |
583 | 0 | expected_results.emplace_back("{ int32:" + std::to_string(key) + " }"); |
584 | 0 | } |
585 | 0 | std::sort(expected_results.begin(), expected_results.end()); |
586 | |
587 | 0 | auto consumer_results = ScanToStrings(consumer_table->name(), consumer_client()); |
588 | 0 | std::sort(consumer_results.begin(), consumer_results.end()); |
589 | |
590 | 0 | if (expected_results.size() != consumer_results.size()) { |
591 | 0 | return false; |
592 | 0 | } |
593 | | |
594 | 0 | for (size_t idx = 0; idx < expected_results.size(); idx++) { |
595 | 0 | if (expected_results[idx] != consumer_results[idx]) { |
596 | 0 | return false; |
597 | 0 | } |
598 | 0 | } |
599 | 0 | } |
600 | 0 | return true; |
601 | 0 | }; |
602 | 0 | ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(); }, |
603 | 0 | MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); |
604 | 0 | } |
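For readers tracing the cdc_state verification in the test above, the layout below is inferred from the column accesses in the scan loop itself, not from the cdc_state schema definition:

    // How the loop above interprets each cdc_state row:
    //   row.column(0) -> CDC stream id (one of the bootstrap ids returned by BootstrapProducer)
    //   row.column(1) -> tablet id
    //   row.column(2) -> checkpoint, parsed as a Raft OpId ("term.index")
    // A tablet counts as bootstrapped once its checkpoint index is > 0, and each bootstrap id
    // must appear exactly kNTabletsPerTable times.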
605 | | |
606 | | // Test for #2250 to verify that replication for tables with the same prefix gets set up correctly. |
607 | 0 | TEST_P(TwoDCTest, SetupUniverseReplicationMultipleTables) { |
608 | | // Set up the two clusters without any tables. |
609 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({}, {}, 1)); |
610 | | |
611 | | // Create tables with the same prefix. |
612 | 0 | std::string table_names[2] = {"table", "table_index"}; |
613 | |
614 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
615 | 0 | for (int i = 0; i < 2; i++) { |
616 | 0 | auto t = ASSERT_RESULT(CreateTable(producer_client(), kNamespaceName, table_names[i], 3)); |
617 | 0 | std::shared_ptr<client::YBTable> producer_table; |
618 | 0 | ASSERT_OK(producer_client()->OpenTable(t, &producer_table)); |
619 | 0 | producer_tables.push_back(producer_table); |
620 | 0 | } |
621 | |
622 | 0 | for (int i = 0; i < 2; i++) { |
623 | 0 | ASSERT_RESULT(CreateTable(consumer_client(), kNamespaceName, table_names[i], 3)); |
624 | 0 | } |
625 | | |
626 | | // Set up universe replication on both these tables. |
627 | 0 | ASSERT_OK(SetupUniverseReplication( |
628 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
629 | | |
630 | | // Verify that universe was setup on consumer. |
631 | 0 | master::GetUniverseReplicationResponsePB resp; |
632 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); |
633 | 0 | ASSERT_EQ(resp.entry().producer_id(), kUniverseId); |
634 | 0 | ASSERT_EQ(resp.entry().tables_size(), producer_tables.size()); |
635 | 0 | for (uint32_t i = 0; i < producer_tables.size(); i++) { |
636 | 0 | ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id()); |
637 | 0 | } |
638 | |
639 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
640 | 0 | } |
641 | | |
642 | 0 | TEST_P(TwoDCTest, SetupUniverseReplicationLargeTableCount) { |
643 | 0 | if (IsSanitizer()) { |
644 | 0 | LOG(INFO) << "Skipping slow test"; |
645 | 0 | return; |
646 | 0 | } |
647 | | |
648 | | // Set up the two clusters without any tables. |
649 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({}, {}, 1)); |
650 | 0 | FLAGS_enable_automatic_tablet_splitting = false; |
651 | | |
652 | | // Create a large number of tables to test the performance of setup_replication. |
653 | 0 | int table_count = 2; |
654 | 0 | int amplification[2] = {1, 5}; |
655 | 0 | MonoDelta setup_latency[2]; |
656 | 0 | std::string table_prefix = "stress_table_"; |
657 | 0 | bool passed_test = false; |
658 | |
659 | 0 | for (int retries = 0; retries < 3 && !passed_test; ++retries) { |
660 | 0 | for (int a : {0, 1}) { |
661 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
662 | 0 | for (int i = 0; i < table_count * amplification[a]; i++) { |
663 | 0 | std::string cur_table = |
664 | 0 | table_prefix + std::to_string(amplification[a]) + "-" + std::to_string(i); |
665 | 0 | ASSERT_RESULT(CreateTable(consumer_client(), kNamespaceName, cur_table, 3)); |
666 | 0 | auto t = ASSERT_RESULT(CreateTable(producer_client(), kNamespaceName, cur_table, 3)); |
667 | 0 | std::shared_ptr<client::YBTable> producer_table; |
668 | 0 | ASSERT_OK(producer_client()->OpenTable(t, &producer_table)); |
669 | 0 | producer_tables.push_back(producer_table); |
670 | 0 | } |
671 | | |
672 | | // Add delays to all rpc calls to simulate a live environment and ensure the test is IO-bound. |
673 | 0 | FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 200; |
674 | 0 | FLAGS_rpc_throttle_threshold_bytes = 200; |
675 | |
676 | 0 | auto start_time = CoarseMonoClock::Now(); |
677 | | |
678 | | // Set up universe replication on all tables. |
679 | 0 | ASSERT_OK(SetupUniverseReplication( |
680 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
681 | | |
682 | | // Verify that universe was setup on consumer. |
683 | 0 | master::GetUniverseReplicationResponsePB resp; |
684 | 0 | ASSERT_OK( |
685 | 0 | VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); |
686 | 0 | ASSERT_EQ(resp.entry().producer_id(), kUniverseId); |
687 | 0 | ASSERT_EQ(resp.entry().tables_size(), producer_tables.size()); |
688 | 0 | for (uint32_t i = 0; i < producer_tables.size(); i++) { |
689 | 0 | ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id()); |
690 | 0 | } |
691 | |
692 | 0 | setup_latency[a] = CoarseMonoClock::Now() - start_time; |
693 | 0 | LOG(INFO) << "SetupReplication [" << a << "] took: " << setup_latency[a].ToSeconds() << "s"; |
694 | | |
695 | | // Remove delays for cleanup and next setup. |
696 | 0 | FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 0; |
697 | |
698 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
699 | 0 | } |
700 | | |
701 | | // We increased our table count by 5x, but we shouldn't have a linear latency increase. |
702 | 0 | passed_test = (setup_latency[1] < setup_latency[0] * 3); |
703 | 0 | } |
704 | |
705 | 0 | ASSERT_TRUE(passed_test); |
706 | 0 | } |
707 | | |
708 | 0 | TEST_P(TwoDCTest, BootstrapAndSetupLargeTableCount) { |
709 | 0 | if (IsSanitizer()) { |
710 | 0 | LOG(INFO) << "Skipping slow test"; |
711 | 0 | return; |
712 | 0 | } |
713 | | |
714 | | // Set up the two clusters without any tables. |
715 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({}, {}, 1)); |
716 | 0 | FLAGS_enable_automatic_tablet_splitting = false; |
717 | | |
718 | | // Create a medium, then large number of tables to test the performance of our CLI commands. |
719 | 0 | int table_count = 2; |
720 | 0 | int amplification[2] = {1, 5}; |
721 | 0 | MonoDelta bootstrap_latency[2]; |
722 | 0 | MonoDelta setup_latency[2]; |
723 | 0 | std::string table_prefix = "stress_table_"; |
724 | 0 | bool passed_test = false; |
725 | |
726 | 0 | for (int retries = 0; retries < 3 && !passed_test; ++retries) { |
727 | 0 | for (int a : {0, 1}) { |
728 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
729 | 0 | for (int i = 0; i < table_count * amplification[a]; i++) { |
730 | 0 | std::string cur_table = |
731 | 0 | table_prefix + std::to_string(amplification[a]) + "-" + std::to_string(i); |
732 | 0 | ASSERT_RESULT(CreateTable(consumer_client(), kNamespaceName, cur_table, 3)); |
733 | 0 | auto t = ASSERT_RESULT(CreateTable(producer_client(), kNamespaceName, cur_table, 3)); |
734 | 0 | std::shared_ptr<client::YBTable> producer_table; |
735 | 0 | ASSERT_OK(producer_client()->OpenTable(t, &producer_table)); |
736 | 0 | producer_tables.push_back(producer_table); |
737 | 0 | } |
738 | | |
739 | | // Add delays to all rpc calls to simulate a live environment and ensure the test is IO-bound. |
740 | 0 | FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 200; |
741 | 0 | FLAGS_rpc_throttle_threshold_bytes = 200; |
742 | | |
743 | | // Performance test of BootstrapProducer. |
744 | 0 | cdc::BootstrapProducerResponsePB boot_resp; |
745 | 0 | { |
746 | 0 | cdc::BootstrapProducerRequestPB req; |
747 | |
748 | 0 | for (const auto& producer_table : producer_tables) { |
749 | 0 | req.add_table_ids(producer_table->id()); |
750 | 0 | } |
751 | |
752 | 0 | auto start_time = CoarseMonoClock::Now(); |
753 | |
754 | 0 | auto producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>( |
755 | 0 | &producer_client()->proxy_cache(), |
756 | 0 | HostPort::FromBoundEndpoint( |
757 | 0 | producer_cluster()->mini_tablet_server(0)->bound_rpc_addr())); |
758 | 0 | rpc::RpcController rpc; |
759 | 0 | ASSERT_OK(producer_cdc_proxy->BootstrapProducer(req, &boot_resp, &rpc)); |
760 | 0 | ASSERT_FALSE(boot_resp.has_error()); |
761 | 0 | ASSERT_EQ(boot_resp.cdc_bootstrap_ids().size(), producer_tables.size()); |
762 | |
763 | 0 | bootstrap_latency[a] = CoarseMonoClock::Now() - start_time; |
764 | 0 | LOG(INFO) << "BootstrapProducer [" << a << "] took: " << bootstrap_latency[a].ToSeconds() |
765 | 0 | << "s"; |
766 | 0 | } |
767 | | |
768 | | // Performance test of SetupReplication, with Bootstrap IDs. |
769 | 0 | { |
770 | 0 | auto start_time = CoarseMonoClock::Now(); |
771 | | |
772 | | // Calling the SetupUniverse API directly so we can use producer_bootstrap_ids. |
773 | 0 | master::SetupUniverseReplicationRequestPB req; |
774 | 0 | master::SetupUniverseReplicationResponsePB resp; |
775 | 0 | req.set_producer_id(kUniverseId); |
776 | 0 | auto master_addrs = producer_cluster()->GetMasterAddresses(); |
777 | 0 | auto vec = ASSERT_RESULT(HostPort::ParseStrings(master_addrs, 0)); |
778 | 0 | HostPortsToPBs(vec, req.mutable_producer_master_addresses()); |
779 | 0 | for (const auto& table : producer_tables) { |
780 | 0 | req.add_producer_table_ids(table->id()); |
781 | 0 | } |
782 | 0 | for (const auto& bootstrap_id : boot_resp.cdc_bootstrap_ids()) { |
783 | 0 | req.add_producer_bootstrap_ids(bootstrap_id); |
784 | 0 | } |
785 | |
786 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
787 | 0 | &consumer_client()->proxy_cache(), |
788 | 0 | ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
789 | 0 | ASSERT_OK(WaitFor( |
790 | 0 | [&]() -> Result<bool> { |
791 | 0 | rpc::RpcController rpc; |
792 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
793 | 0 | if (!master_proxy->SetupUniverseReplication(req, &resp, &rpc).ok()) { |
794 | 0 | return false; |
795 | 0 | } |
796 | 0 | if (resp.has_error()) { |
797 | 0 | return false; |
798 | 0 | } |
799 | 0 | return true; |
800 | 0 | }, |
801 | 0 | MonoDelta::FromSeconds(30), "Setup universe replication")); |
802 | | |
803 | | // Verify that universe was setup on consumer. |
804 | 0 | { |
805 | 0 | master::GetUniverseReplicationResponsePB resp; |
806 | 0 | ASSERT_OK( |
807 | 0 | VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); |
808 | 0 | ASSERT_EQ(resp.entry().producer_id(), kUniverseId); |
809 | 0 | ASSERT_EQ(resp.entry().tables_size(), producer_tables.size()); |
810 | 0 | for (uint32_t i = 0; i < producer_tables.size(); i++) { |
811 | 0 | ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id()); |
812 | 0 | } |
813 | 0 | } |
814 | |
815 | 0 | setup_latency[a] = CoarseMonoClock::Now() - start_time; |
816 | 0 | LOG(INFO) << "SetupReplication [" << a << "] took: " << setup_latency[a].ToSeconds() << "s"; |
817 | 0 | } |
818 | | |
819 | | // Remove delays for cleanup and next setup. |
820 | 0 | FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms = 0; |
821 | |
822 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
823 | 0 | } |
824 | | // We increased our table count by 5x, but we shouldn't have a linear latency increase. |
825 | | // ASSERT_LT(bootstrap_latency[1], bootstrap_latency[0] * 5); |
826 | 0 | passed_test = (setup_latency[1] < setup_latency[0] * 3); |
827 | 0 | } |
828 | 0 | ASSERT_TRUE(passed_test); |
829 | 0 | } |
830 | | |
831 | 0 | TEST_P(TwoDCTest, PollWithConsumerRestart) { |
832 | | // Avoid long delays with node failures so we can run with more aggressive test timing |
833 | 0 | FLAGS_replication_failure_delay_exponent = 7; // 2^7 == 128ms |
834 | |
835 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
836 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({4}, {4}, replication_factor)); |
837 | |
838 | 0 | ASSERT_OK(SetupUniverseReplication( |
839 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, |
840 | 0 | {tables[0]} /* all producer tables */)); |
841 | | |
842 | | // After creating the cluster, make sure all tablets are being polled for. |
843 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); |
844 | |
845 | 0 | consumer_cluster()->mini_tablet_server(0)->Shutdown(); |
846 | | |
847 | | // After shutting down a single consumer node, the other consumers should pick up the slack. |
848 | 0 | if (replication_factor > 1) { |
849 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); |
850 | 0 | } |
851 | |
852 | 0 | ASSERT_OK(consumer_cluster()->mini_tablet_server(0)->Start()); |
853 | | |
854 | | // After restarting the node. |
855 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); |
856 | |
857 | 0 | ASSERT_OK(consumer_cluster()->RestartSync()); |
858 | | |
859 | | // After consumer cluster restart. |
860 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); |
861 | |
862 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
863 | 0 | } |
864 | | |
865 | 0 | TEST_P(TwoDCTest, PollWithProducerNodesRestart) { |
866 | | // Avoid long delays with node failures so we can run with more aggressive test timing |
867 | 0 | FLAGS_replication_failure_delay_exponent = 7; // 2^7 == 128ms |
868 | |
869 | 0 | uint32_t replication_factor = 3, tablet_count = 4, master_count = 3; |
870 | 0 | auto tables = ASSERT_RESULT( |
871 | 0 | SetUpWithParams({tablet_count}, {tablet_count}, replication_factor, master_count)); |
872 | |
873 | 0 | ASSERT_OK(SetupUniverseReplication( |
874 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, |
875 | 0 | {tables[0]} /* all producer tables */, false /* leader_only */)); |
876 | | |
877 | | // After creating the cluster, make sure all tablets are being polled for. |
878 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); |
879 | | |
880 | | // Stop the Master and wait for failover. |
881 | 0 | LOG(INFO) << "Failover to new Master"; |
882 | 0 | MiniMaster* old_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster()); |
883 | 0 | ASSERT_OK(old_master->WaitUntilCatalogManagerIsLeaderAndReadyForTests()); |
884 | 0 | ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster())->Shutdown(); |
885 | 0 | MiniMaster* new_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster()); |
886 | 0 | ASSERT_NE(nullptr, new_master); |
887 | 0 | ASSERT_NE(old_master, new_master); |
888 | 0 | ASSERT_OK(producer_cluster()->WaitForAllTabletServers()); |
889 | | |
890 | | // Stop a TServer on the Producer after failing its master. |
891 | 0 | producer_cluster()->mini_tablet_server(0)->Shutdown(); |
892 | | // This verifies: |
893 | | // 1. Consumer successfully transitions over to using the new master for Tablet lookup. |
894 | | // 2. Consumer cluster has rebalanced all the CDC Pollers. |
895 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); |
896 | 0 | WriteWorkload(0, 5, producer_client(), tables[0]->name()); |
897 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
898 | | |
899 | | // Restart the Producer TServer and verify that rebalancing happens. |
900 | 0 | ASSERT_OK(old_master->Start()); |
901 | 0 | ASSERT_OK(producer_cluster()->mini_tablet_server(0)->Start()); |
902 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); |
903 | 0 | WriteWorkload(6, 10, producer_client(), tables[0]->name()); |
904 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
905 | | |
906 | | // Cleanup. |
907 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
908 | 0 | } |
909 | | |
910 | 0 | TEST_P(TwoDCTest, PollWithProducerClusterRestart) { |
911 | | // Avoid long delays with node failures so we can run with more aggressive test timing |
912 | 0 | FLAGS_replication_failure_delay_exponent = 7; // 2^7 == 128ms |
913 | |
914 | 0 | uint32_t replication_factor = 3, tablet_count = 4; |
915 | 0 | auto tables = ASSERT_RESULT( |
916 | 0 | SetUpWithParams({tablet_count}, {tablet_count}, replication_factor)); |
917 | |
918 | 0 | ASSERT_OK(SetupUniverseReplication( |
919 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, |
920 | 0 | {tables[0]} /* all producer tables */)); |
921 | | |
922 | | // After creating the cluster, make sure all tablets are being polled for. |
923 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); |
924 | | |
925 | | // Restart the ENTIRE Producer cluster. |
926 | 0 | ASSERT_OK(producer_cluster()->RestartSync()); |
927 | | |
928 | | // After producer cluster restart. |
929 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); |
930 | 0 | WriteWorkload(0, 5, producer_client(), tables[0]->name()); |
931 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
932 | | |
933 | | // Cleanup. |
934 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
935 | 0 | } |
936 | | |
937 | | |
938 | 0 | TEST_P(TwoDCTest, PollAndObserveIdleDampening) { |
939 | 0 | uint32_t replication_factor = 3, tablet_count = 1, master_count = 1; |
940 | 0 | auto tables = ASSERT_RESULT( |
941 | 0 | SetUpWithParams({tablet_count}, {tablet_count}, replication_factor, master_count)); |
942 | |
943 | 0 | ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), |
944 | 0 | kUniverseId, {tables[0]} /* all producer tables */, false /* leader_only */)); |
945 | | |
946 | | // After creating the cluster, make sure all tablets are being polled for. |
947 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1)); |
948 | | |
949 | | // Write some data and query GetChanges to set up the CDCTabletMetrics. |
950 | 0 | WriteWorkload(0, 5, producer_client(), tables[0]->name()); |
951 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
952 | | |
953 | | /***************************************************************** |
954 | | * Find the CDC Tablet Metrics, which we will use for this test. * |
955 | | *****************************************************************/ |
956 | | // Find the stream. |
957 | 0 | master::ListCDCStreamsResponsePB stream_resp; |
958 | 0 | ASSERT_OK(GetCDCStreamForTable(tables[0]->id(), &stream_resp)); |
959 | 0 | ASSERT_EQ(stream_resp.streams_size(), 1); |
960 | 0 | ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), tables[0]->id()); |
961 | 0 | auto stream_id = stream_resp.streams(0).stream_id(); |
962 | | |
963 | | // Find the tablet id for the stream. |
964 | 0 | TabletId tablet_id; |
965 | 0 | { |
966 | 0 | yb::cdc::ListTabletsRequestPB tablets_req; |
967 | 0 | yb::cdc::ListTabletsResponsePB tablets_resp; |
968 | 0 | rpc::RpcController rpc; |
969 | 0 | tablets_req.set_stream_id(stream_id); |
970 | |
971 | 0 | auto producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>( |
972 | 0 | &producer_client()->proxy_cache(), |
973 | 0 | HostPort::FromBoundEndpoint(producer_cluster()->mini_tablet_server(0)->bound_rpc_addr())); |
974 | 0 | ASSERT_OK(producer_cdc_proxy->ListTablets(tablets_req, &tablets_resp, &rpc)); |
975 | 0 | ASSERT_FALSE(tablets_resp.has_error()); |
976 | 0 | ASSERT_EQ(tablets_resp.tablets_size(), 1); |
977 | 0 | tablet_id = tablets_resp.tablets(0).tablet_id(); |
978 | 0 | } |
979 | | |
980 | | // Find the TServer that is hosting this tablet. |
981 | 0 | tserver::TabletServer* cdc_ts = nullptr; |
982 | 0 | std::string ts_uuid; |
983 | 0 | std::mutex data_mutex; |
984 | 0 | { |
985 | 0 | ASSERT_OK(WaitFor([this, &tablet_id, &table = tables[0], &ts_uuid, &data_mutex] { |
986 | 0 | producer_client()->LookupTabletById( |
987 | 0 | tablet_id, |
988 | 0 | table, |
989 | | // TODO(tablet splitting + xCluster): After splitting integration is working (+ metrics |
990 | | // support), then set this to kTrue. |
991 | 0 | master::IncludeInactive::kFalse, |
992 | 0 | CoarseMonoClock::Now() + MonoDelta::FromSeconds(3), |
993 | 0 | [&ts_uuid, &data_mutex](const Result<client::internal::RemoteTabletPtr>& result) { |
994 | 0 | if (result.ok()) { |
995 | 0 | std::lock_guard<std::mutex> l(data_mutex); |
996 | 0 | ts_uuid = (*result)->LeaderTServer()->permanent_uuid(); |
997 | 0 | } |
998 | 0 | }, |
999 | 0 | client::UseCache::kFalse); |
1000 | 0 | std::lock_guard<std::mutex> l(data_mutex); |
1001 | 0 | return !ts_uuid.empty(); |
1002 | 0 | }, MonoDelta::FromSeconds(10), "Get TS for Tablet")); |
1003 | |
|
1004 | 0 | for (auto ts : producer_cluster()->mini_tablet_servers()) { |
1005 | 0 | if (ts->server()->permanent_uuid() == ts_uuid) { |
1006 | 0 | cdc_ts = ts->server(); |
1007 | 0 | break; |
1008 | 0 | } |
1009 | 0 | } |
1010 | 0 | } |
1011 | 0 | ASSERT_NOTNULL(cdc_ts); |
1012 | | |
1013 | | // Find the CDCTabletMetric associated with the above pair. |
1014 | 0 | auto cdc_service = dynamic_cast<cdc::CDCServiceImpl*>( |
1015 | 0 | cdc_ts->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); |
1016 | 0 | std::shared_ptr<cdc::CDCTabletMetrics> metrics = |
1017 | 0 | cdc_service->GetCDCTabletMetrics({"", stream_id, tablet_id}); |
1018 | | |
1019 | | /*********************************** |
1020 | | * Setup Complete. Starting test. * |
1021 | | ***********************************/ |
1022 | | // Log the first heartbeat count for baseline |
1023 | 0 | auto first_heartbeat_count = metrics->rpc_heartbeats_responded->value(); |
1024 | 0 | LOG(INFO) << "first_heartbeat_count = " << first_heartbeat_count; |
1025 | | |
1026 | | // Write some data to the producer, which should be consumed quickly by GetChanges. |
1027 | 0 | WriteWorkload(6, 10, producer_client(), tables[0]->name()); |
1028 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1029 | | |
1030 | | // Sleep for the idle timeout. |
1031 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_async_replication_idle_delay_ms)); |
1032 | 0 | auto active_heartbeat_count = metrics->rpc_heartbeats_responded->value(); |
1033 | 0 | LOG(INFO) << "active_heartbeat_count = " << active_heartbeat_count; |
1034 | | // The new heartbeat count should be at least FLAGS_async_replication_max_idle_wait. |
1035 | 0 | ASSERT_GE(active_heartbeat_count - first_heartbeat_count, FLAGS_async_replication_max_idle_wait); |
1036 | | |
1037 | | // Now wait past the update request frequency, so we should be using idle timing. |
1038 | 0 | auto multiplier = 2; |
1039 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_async_replication_idle_delay_ms * multiplier)); |
1040 | 0 | auto idle_heartbeat_count = metrics->rpc_heartbeats_responded->value(); |
1041 | 0 | ASSERT_LE(idle_heartbeat_count - active_heartbeat_count, multiplier + 1 /*allow subtle race*/); |
1042 | 0 | LOG(INFO) << "idle_heartbeat_count = " << idle_heartbeat_count; |
1043 | | |
1044 | | // Write some more data to the producer and call GetChanges with some real data. |
1045 | 0 | WriteWorkload(11, 15, producer_client(), tables[0]->name()); |
1046 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1047 | | |
1048 | | // Sleep for the idle timeout and verify that the idle behavior ended now that we have new data. |
1049 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_async_replication_idle_delay_ms)); |
1050 | 0 | active_heartbeat_count = metrics->rpc_heartbeats_responded->value(); |
1051 | 0 | LOG(INFO) << "active_heartbeat_count = " << active_heartbeat_count; |
1052 | | // The new heartbeat count should be at least FLAGS_async_replication_max_idle_wait. |
1053 | 0 | ASSERT_GE(active_heartbeat_count - idle_heartbeat_count, FLAGS_async_replication_max_idle_wait); |
1054 | | |
1055 | | // Cleanup. |
1056 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1057 | 0 | } |
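The heartbeat arithmetic behind the assertions in this test, spelled out (all names are the test's own flags and locals):

    // Active phase: with unconsumed data the poller issues GetChanges back-to-back, so a sleep
    // of FLAGS_async_replication_idle_delay_ms should observe at least
    // FLAGS_async_replication_max_idle_wait new heartbeats (the empty-poll threshold before
    // the poller dampens down to idle timing).
    // Idle phase: once dampened, polls arrive roughly once per idle delay, so sleeping for
    // multiplier * FLAGS_async_replication_idle_delay_ms should add at most multiplier + 1
    // heartbeats (the +1 absorbs a poll racing the sleep boundary).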
1058 | | |
1059 | 0 | TEST_P(TwoDCTest, ApplyOperations) { |
1060 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1061 | | // Use just one tablet here to more easily catch lower-level write issues with this test. |
1062 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor)); |
1063 | |
1064 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1065 | | // tables contains both producer and consumer universe tables (alternately). |
1066 | | // Pick out just the producer table from the list. |
1067 | 0 | producer_tables.reserve(1); |
1068 | 0 | producer_tables.push_back(tables[0]); |
1069 | 0 | ASSERT_OK(SetupUniverseReplication( |
1070 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1071 | | |
1072 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1073 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1)); |
1074 | |
1075 | 0 | WriteWorkload(0, 5, producer_client(), tables[0]->name()); |
1076 | | |
1077 | | // Check that all tablets continue to be polled for. |
1078 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1)); |
1079 | | |
1080 | | // Verify that both clusters have the same records. |
1081 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1082 | |
1083 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1084 | 0 | } |
1085 | | |
1086 | 0 | TEST_P(TwoDCTest, ApplyOperationsWithTransactions) { |
1087 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1088 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor)); |
1089 | |
1090 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1091 | | // tables contains both producer and consumer universe tables (alternately). |
1092 | | // Pick out just the producer table from the list. |
1093 | 0 | producer_tables.reserve(1); |
1094 | 0 | producer_tables.push_back(tables[0]); |
1095 | 0 | ASSERT_OK(SetupUniverseReplication( |
1096 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1097 | | |
1098 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1099 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
1100 | | |
1101 | | // Write some transactional rows. |
1102 | 0 | WriteTransactionalWorkload(0, 5, producer_client(), producer_txn_mgr(), tables[0]->name()); |
1103 | | |
1104 | | // Write some non-transactional rows. |
1105 | 0 | WriteWorkload(6, 10, producer_client(), tables[0]->name()); |
1106 | | |
1107 | | // Check that all tablets continue to be polled for. |
1108 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
1109 | | |
1110 | | // Verify that both clusters have the same records. |
1111 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1112 | |
1113 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1114 | 0 | } |
1115 | | |
1116 | | class TwoDCTestWithEnableIntentsReplication : public TwoDCTest { |
1117 | | }; |
1118 | | |
1119 | | INSTANTIATE_TEST_CASE_P(TwoDCTestParams, TwoDCTestWithEnableIntentsReplication, |
1120 | | ::testing::Values(TwoDCTestParams(0, true), TwoDCTestParams(1, true))); |
1121 | | |
1122 | 0 | TEST_P(TwoDCTestWithEnableIntentsReplication, UpdateWithinTransaction) { |
1123 | 0 | constexpr int kNumTablets = 1; |
1124 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1125 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({kNumTablets}, {kNumTablets}, replication_factor)); |
1126 | |
1127 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1128 | | // tables contains both producer and consumer universe tables (alternately). |
1129 | | // Pick out just the producer table from the list. |
1130 | 0 | producer_tables.reserve(1); |
1131 | 0 | producer_tables.push_back(tables[0]); |
1132 | 0 | ASSERT_OK(SetupUniverseReplication( |
1133 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1134 | | |
1135 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1136 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), kNumTablets)); |
1137 | | 
1138 | 0 | auto txn = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr())); |
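     | | // Write keys [1, 5) twice inside the same transaction: first as inserts
     | | // (del = false), then as deletes (del = true), so the commit below carries
     | | // an insert-then-delete history for each key.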
1139 | 0 | for (bool del : {false, true}) { |
1140 | 0 | WriteIntents(1, 5, producer_client(), txn.first, tables[0]->name(), del); |
1141 | 0 | } |
1142 | 0 | ASSERT_OK(txn.second->CommitFuture().get()); |
1143 | | 
1144 | 0 | txn.first->SetTransaction(nullptr); |
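     | | // With the transaction detached from the session, the insert of key 0 below
     | | // goes through the same table handle as a plain, non-transactional write.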
1145 | 0 | client::TableHandle table_handle; |
1146 | 0 | ASSERT_OK(table_handle.Open(tables[0]->name(), producer_client())); |
1147 | 0 | auto op = table_handle.NewInsertOp(); |
1148 | 0 | auto req = op->mutable_request(); |
1149 | 0 | QLAddInt32HashValue(req, 0); |
1150 | 0 | ASSERT_OK(txn.first->ApplyAndFlush(op)); |
1151 | | 
1152 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1153 | | |
1154 | | // Check that all tablets continue to be polled for. |
1155 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), kNumTablets)); |
1156 | | 
1157 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1158 | 0 | } |
1159 | | |
1160 | 0 | TEST_P(TwoDCTestWithEnableIntentsReplication, TransactionsWithRestart) { |
1161 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, 3)); |
1162 | | 
1163 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables = { tables[0] }; |
1164 | | // tables contains both producer and consumer universe tables (alternately). |
1165 | | // Pick out just the producer table from the list. |
1166 | 0 | ASSERT_OK(SetupUniverseReplication( |
1167 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1168 | | |
1169 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1170 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
1171 | | 
1172 | 0 | auto txn = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr())); |
1173 | | // Write some transactional rows. |
1174 | 0 | WriteTransactionalWorkload( |
1175 | 0 | 0, 5, producer_client(), producer_txn_mgr(), tables[0]->name(), /* delete_op */ false); |
1176 | | 
1177 | 0 | WriteWorkload(6, 10, producer_client(), tables[0]->name()); |
1178 | | 
1179 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
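     | | // Flush and restart the consumer while the producer transaction is still
     | | // pending; the commit below then has to apply intents that survived the
     | | // restart.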
1180 | 0 | std::this_thread::sleep_for(5s); |
1181 | 0 | ASSERT_OK(consumer_cluster()->FlushTablets( |
1182 | 0 | tablet::FlushMode::kSync, tablet::FlushFlags::kRegular)); |
1183 | 0 | LOG(INFO) << "Restart"; |
1184 | 0 | ASSERT_OK(consumer_cluster()->RestartSync()); |
1185 | 0 | std::this_thread::sleep_for(5s); |
1186 | 0 | LOG(INFO) << "Commit"; |
1187 | 0 | ASSERT_OK(txn.second->CommitFuture().get()); |
1188 | | |
1189 | | // Verify that both clusters have the same records. |
1190 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1191 | | 
1192 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1193 | 0 | } |
1194 | | |
1195 | 0 | TEST_P(TwoDCTestWithEnableIntentsReplication, MultipleTransactions) { |
1196 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1197 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor)); |
1198 | | 
1199 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1200 | | // tables contains both producer and consumer universe tables (alternately). |
1201 | | // Pick out just the producer table from the list. |
1202 | 0 | producer_tables.reserve(1); |
1203 | 0 | producer_tables.push_back(tables[0]); |
1204 | 0 | ASSERT_OK(SetupUniverseReplication( |
1205 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1206 | | |
1207 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1208 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1)); |
1209 | | 
1210 | 0 | auto txn_0 = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr())); |
1211 | 0 | auto txn_1 = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr())); |
1212 | | 
1213 | 0 | ASSERT_NO_FATALS(WriteIntents(0, 5, producer_client(), txn_0.first, tables[0]->name())); |
1214 | 0 | ASSERT_NO_FATALS(WriteIntents(5, 10, producer_client(), txn_0.first, tables[0]->name())); |
1215 | 0 | ASSERT_NO_FATALS(WriteIntents(10, 15, producer_client(), txn_1.first, tables[0]->name())); |
1216 | 0 | ASSERT_NO_FATALS(WriteIntents(10, 20, producer_client(), txn_1.first, tables[0]->name())); |
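     | | // Note that txn_1's second batch overlaps [10, 15), so keys 10..14 are
     | | // written twice within the same transaction.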
1217 | | 
1218 | 0 | ASSERT_OK(WaitFor([&]() { |
1219 | 0 | return CountIntents(consumer_cluster()) > 0; |
1220 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster replicated intents")); |
1221 | | |
1222 | | // Make sure that none of the intents replicated have been committed. |
1223 | 0 | auto consumer_results = ScanToStrings(tables[1]->name(), consumer_client()); |
1224 | 0 | ASSERT_EQ(consumer_results.size(), 0); |
1225 | | 
1226 | 0 | ASSERT_OK(txn_0.second->CommitFuture().get()); |
1227 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1228 | | 
1229 | 0 | ASSERT_OK(txn_1.second->CommitFuture().get()); |
1230 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1231 | 0 | ASSERT_OK(WaitFor([&]() { |
1232 | 0 | return CountIntents(consumer_cluster()) == 0; |
1233 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster cleaned up intents")); |
1234 | 0 | } |
1235 | | |
1236 | 0 | TEST_P(TwoDCTestWithEnableIntentsReplication, CleanupAbortedTransactions) { |
1237 | 0 | static const int kNumRecordsPerBatch = 5; |
1238 | 0 | const uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1239 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({1 /* num_consumer_tablets */}, |
1240 | 0 | {1 /* num_producer_tablets */}, |
1241 | 0 | replication_factor)); |
1242 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1243 | | // tables contains both producer and consumer universe tables (alternately). |
1244 | | // Pick out just the producer table from the list. |
1245 | 0 | producer_tables.reserve(1); |
1246 | 0 | producer_tables.push_back(tables[0]); |
1247 | 0 | ASSERT_OK(SetupUniverseReplication( |
1248 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1249 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1250 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1 /* num_producer_tablets */)); |
1251 | 0 | auto txn_0 = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr())); |
1252 | 0 | ASSERT_NO_FATALS(WriteIntents(0, kNumRecordsPerBatch, producer_client(), txn_0.first, |
1253 | 0 | tables[0]->name())); |
1254 | | // Wait for records to be replicated. |
1255 | 0 | ASSERT_OK(WaitFor([&]() { |
1256 | 0 | return CountIntents(consumer_cluster()) == kNumRecordsPerBatch * replication_factor; |
1257 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster created intents")); |
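     | | // Worked count for the wait above: with kNumRecordsPerBatch = 5 and a
     | | // non-TSAN replication factor of 3, each intent lands on every replica,
     | | // giving 5 * 3 = 15 intent records across the consumer cluster.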
1258 | 0 | ASSERT_OK(consumer_cluster()->FlushTablets()); |
1259 | | // Then, set timeout to 0 and make sure we do cleanup on the next compaction. |
1260 | 0 | SetAtomicFlag(0, &FLAGS_external_intent_cleanup_secs); |
1261 | 0 | ASSERT_NO_FATALS(WriteIntents(kNumRecordsPerBatch, kNumRecordsPerBatch * 2, producer_client(), |
1262 | 0 | txn_0.first, tables[0]->name())); |
1263 | | // Wait for records to be replicated. |
1264 | 0 | ASSERT_OK(WaitFor([&]() { |
1265 | 0 | return CountIntents(consumer_cluster()) == 2 * kNumRecordsPerBatch * replication_factor; |
1266 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster created intents")); |
1267 | 0 | ASSERT_OK(consumer_cluster()->CompactTablets()); |
1268 | 0 | ASSERT_OK(WaitFor([&]() { |
1269 | 0 | return CountIntents(consumer_cluster()) == 0; |
1270 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster cleaned up intents")); |
1271 | 0 | txn_0.second->Abort(); |
1272 | 0 | } |
1273 | | |
1274 | | // Make sure when we compact a tablet, we retain intents. |
1275 | 0 | TEST_P(TwoDCTestWithEnableIntentsReplication, NoCleanupOfFlushedFiles) { |
1276 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1277 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor)); |
1278 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1279 | | // tables contains both producer and consumer universe tables (alternately). |
1280 | | // Pick out just the producer table from the list. |
1281 | 0 | producer_tables.reserve(1); |
1282 | 0 | producer_tables.push_back(tables[0]); |
1283 | 0 | ASSERT_OK(SetupUniverseReplication( |
1284 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1285 | | |
1286 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1287 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1)); |
1288 | 0 | auto txn_0 = ASSERT_RESULT(CreateSessionWithTransaction(producer_client(), producer_txn_mgr())); |
1289 | 0 | ASSERT_NO_FATALS(WriteIntents(0, 5, producer_client(), txn_0.first, tables[0]->name())); |
1290 | 0 | auto consumer_results = ScanToStrings(tables[1]->name(), consumer_client()); |
1291 | 0 | ASSERT_EQ(consumer_results.size(), 0); |
1292 | 0 | ASSERT_OK(consumer_cluster()->FlushTablets()); |
1293 | 0 | ASSERT_NO_FATALS(WriteIntents(5, 10, producer_client(), txn_0.first, tables[0]->name())); |
1294 | 0 | ASSERT_OK(consumer_cluster()->FlushTablets()); |
1295 | 0 | ASSERT_OK(consumer_cluster()->CompactTablets()); |
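     | | // The compaction above must retain the flushed intent files, since txn_0 is
     | | // still open and its data has not yet been committed on the consumer.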
1296 | | // Wait for 5 seconds to make sure the background CleanupIntents thread doesn't clean up
1297 | | // intents on the consumer.
1298 | 0 | SleepFor(MonoDelta::FromSeconds(5)); |
1299 | 0 | ASSERT_OK(txn_0.second->CommitFuture().get()); |
1300 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1301 | 0 | ASSERT_OK(WaitFor([&]() { |
1302 | 0 | return CountIntents(consumer_cluster()) == 0; |
1303 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Consumer cluster cleaned up intents")); |
1304 | 0 | } |
1305 | | |
1306 | | |
1307 | 0 | TEST_P(TwoDCTestWithEnableIntentsReplication, ManyToOneTabletMapping) { |
1308 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1309 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({2}, {5}, replication_factor)); |
1310 | | 
1311 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1312 | | // tables contains both producer and consumer universe tables (alternately). |
1313 | | // Pick out just the producer table from the list. |
1314 | 0 | producer_tables.reserve(1); |
1315 | 0 | producer_tables.push_back(tables[0]); |
1316 | 0 | ASSERT_OK(SetupUniverseReplication( |
1317 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1318 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 5)); |
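     | | // Five producer tablets map onto two consumer tablets here, so several
     | | // pollers feed each consumer-side tablet.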
1319 | | 
1320 | 0 | WriteTransactionalWorkload(0, 100, producer_client(), producer_txn_mgr(), tables[0]->name()); |
1321 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name(), 60 /* timeout_secs */)); |
1322 | 0 | } |
1323 | | |
1324 | 0 | TEST_P(TwoDCTestWithEnableIntentsReplication, OneToManyTabletMapping) { |
1325 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1326 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({5}, {2}, replication_factor)); |
1327 | | 
1328 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1329 | | // tables contains both producer and consumer universe tables (alternately). |
1330 | | // Pick out just the producer table from the list. |
1331 | 0 | producer_tables.reserve(1); |
1332 | 0 | producer_tables.push_back(tables[0]); |
1333 | 0 | ASSERT_OK(SetupUniverseReplication( |
1334 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1335 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
1336 | 0 | WriteTransactionalWorkload(0, 50, producer_client(), producer_txn_mgr(), tables[0]->name()); |
1337 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name(), 60 /* timeout_secs */)); |
1338 | 0 | } |
1339 | | |
1340 | 0 | TEST_P(TwoDCTest, TestExternalWriteHybridTime) { |
1341 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1342 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor)); |
1343 | | 
1344 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1345 | 0 | producer_tables.push_back(tables[0]); |
1346 | 0 | ASSERT_OK(SetupUniverseReplication( |
1347 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1348 | | |
1349 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1350 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
1351 | | |
1352 | | // Write 2 rows. |
1353 | 0 | WriteWorkload(0, 2, producer_client(), tables[0]->name()); |
1354 | | |
1355 | | // Ensure that records can be read. |
1356 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1357 | | |
1358 | | // Delete 1 record. |
1359 | 0 | DeleteWorkload(0, 1, producer_client(), tables[0]->name()); |
1360 | | |
1361 | | // Ensure that record is deleted on both universes. |
1362 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1363 | | |
1364 | | // Delete 2nd record but replicate at a low timestamp (timestamp lower than insertion timestamp). |
1365 | 0 | FLAGS_TEST_twodc_write_hybrid_time = true; |
1366 | 0 | DeleteWorkload(1, 2, producer_client(), tables[0]->name()); |
1367 | | |
1368 | | // Verify that record exists on consumer universe, but is deleted from producer universe. |
1369 | 0 | ASSERT_OK(VerifyNumRecords(tables[0]->name(), producer_client(), 0)); |
1370 | 0 | ASSERT_OK(VerifyNumRecords(tables[1]->name(), consumer_client(), 1)); |
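     | | // The delete reached the consumer stamped with a hybrid time below the
     | | // original insert's, so conflict resolution keeps the newer insert there:
     | | // the producer ends with 0 rows while the consumer still has 1.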
1371 | | 
1372 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1373 | 0 | } |
1374 | | |
1375 | 0 | TEST_P(TwoDCTestWithEnableIntentsReplication, BiDirectionalWrites) { |
1376 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, 1)); |
1377 | | |
1378 | | // Setup bi-directional replication. |
1379 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1380 | 0 | producer_tables.push_back(tables[0]); |
1381 | 0 | ASSERT_OK(SetupUniverseReplication( |
1382 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1383 | | 
1384 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables_reverse; |
1385 | 0 | producer_tables_reverse.push_back(tables[1]); |
1386 | 0 | ASSERT_OK(SetupUniverseReplication( |
1387 | 0 | consumer_cluster(), producer_cluster(), producer_client(), kUniverseId, |
1388 | 0 | producer_tables_reverse)); |
1389 | | |
1390 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1391 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
1392 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(producer_cluster(), 2)); |
1393 | | |
1394 | | // Write non-conflicting rows on both clusters. |
1395 | 0 | WriteWorkload(0, 5, producer_client(), tables[0]->name()); |
1396 | 0 | WriteWorkload(5, 10, consumer_client(), tables[1]->name()); |
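     | | // The key ranges [0, 5) and [5, 10) are disjoint, so once both directions
     | | // catch up, each universe should converge to the same 10 rows.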
1397 | | |
1398 | | // Ensure that records are the same on both clusters. |
1399 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1400 | | // Ensure that both universes have all 10 records. |
1401 | 0 | ASSERT_OK(VerifyNumRecords(tables[0]->name(), producer_client(), 10)); |
1402 | | |
1403 | | // Write conflicting records on both clusters (one cluster adds the key, the other deletes it).
1404 | 0 | std::vector<std::thread> threads; |
1405 | 0 | for (int i = 0; i < 2; ++i) { |
1406 | 0 | auto client = i == 0 ? producer_client() : consumer_client(); |
1407 | 0 | int index = i; |
1408 | 0 | bool is_delete = i == 0; |
1409 | 0 | threads.emplace_back([this, client, index, tables, is_delete] { |
1410 | 0 | WriteWorkload(10, 20, client, tables[index]->name(), is_delete); |
1411 | 0 | }); |
1412 | 0 | } |
1413 | | 
1414 | 0 | for (auto& thread : threads) { |
1415 | 0 | thread.join(); |
1416 | 0 | } |
1417 | | |
1418 | | // Ensure that same records exist on both universes. |
1419 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1420 | | 
1421 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1422 | 0 | } |
1423 | | |
1424 | 0 | TEST_P(TwoDCTest, AlterUniverseReplicationMasters) { |
1425 | | // Tablets = Servers + 1 to stay simple but ensure round robin gives a tablet to everyone. |
1426 | 0 | uint32_t t_count = 2; |
1427 | 0 | int master_count = 3; |
1428 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams( |
1429 | 0 | {t_count, t_count}, {t_count, t_count}, 1, master_count)); |
1430 | | |
1431 | | // tables contains both producer and consumer universe tables (alternately). |
1432 | | // Pick out just the producer table from the list. |
1433 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables{tables[0], tables[2]}, |
1434 | 0 | initial_tables{tables[0]}; |
1435 | | |
1436 | | // SetupUniverseReplication only utilizes 1 master. |
1437 | 0 | ASSERT_OK(SetupUniverseReplication( |
1438 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, initial_tables)); |
1439 | | 
1440 | 0 | master::GetUniverseReplicationResponsePB v_resp; |
1441 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &v_resp)); |
1442 | 0 | ASSERT_EQ(v_resp.entry().producer_master_addresses_size(), 1); |
1443 | 0 | ASSERT_EQ(HostPortFromPB(v_resp.entry().producer_master_addresses(0)), |
1444 | 0 | ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
1445 | | |
1446 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1447 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), t_count)); |
1448 | | 
1449 | 0 | LOG(INFO) << "Alter Replication to include all Masters"; |
1450 | | // Alter Replication to include the other masters. |
1451 | 0 | { |
1452 | 0 | master::AlterUniverseReplicationRequestPB alter_req; |
1453 | 0 | master::AlterUniverseReplicationResponsePB alter_resp; |
1454 | 0 | alter_req.set_producer_id(kUniverseId); |
1455 | | |
1456 | | // GetMasterAddresses returns 3 masters. |
1457 | 0 | string master_addr = producer_cluster()->GetMasterAddresses(); |
1458 | 0 | auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); |
1459 | 0 | HostPortsToPBs(hp_vec, alter_req.mutable_producer_master_addresses()); |
1460 | | 
1461 | 0 | rpc::RpcController rpc; |
1462 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
1463 | | 
1464 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
1465 | 0 | &consumer_client()->proxy_cache(), |
1466 | 0 | ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
1467 | 0 | ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc)); |
1468 | 0 | ASSERT_FALSE(alter_resp.has_error()); |
1469 | | |
1470 | | // Verify that the consumer now has all masters. |
1471 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
1472 | 0 | master::GetUniverseReplicationResponsePB tmp_resp; |
1473 | 0 | return VerifyUniverseReplication(consumer_cluster(), consumer_client(), |
1474 | 0 | kUniverseId, &tmp_resp).ok() && |
1475 | 0 | tmp_resp.entry().producer_master_addresses_size() == master_count; |
1476 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Verify master count increased.")); |
1477 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), t_count)); |
1478 | 0 | } |
1479 | | |
1480 | | // Stop the old master. |
1481 | 0 | LOG(INFO) << "Failover to new Master"; |
1482 | 0 | MiniMaster* old_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster()); |
1483 | 0 | ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster())->Shutdown(); |
1484 | 0 | MiniMaster* new_master = ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster()); |
1485 | 0 | ASSERT_NE(nullptr, new_master); |
1486 | 0 | ASSERT_NE(old_master, new_master); |
1487 | 0 | ASSERT_OK(producer_cluster()->WaitForAllTabletServers()); |
1488 | | 
1489 | 0 | LOG(INFO) << "Add Table after Master Failover"; |
1490 | | // Add a new table to replication and ensure that it can read using the new master config. |
1491 | 0 | { |
1492 | 0 | master::AlterUniverseReplicationRequestPB alter_req; |
1493 | 0 | master::AlterUniverseReplicationResponsePB alter_resp; |
1494 | 0 | alter_req.set_producer_id(kUniverseId); |
1495 | 0 | alter_req.add_producer_table_ids_to_add(producer_tables[1]->id()); |
1496 | 0 | rpc::RpcController rpc; |
1497 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
1498 | | 
1499 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
1500 | 0 | &consumer_client()->proxy_cache(), |
1501 | 0 | ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
1502 | 0 | ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc)); |
1503 | 0 | ASSERT_FALSE(alter_resp.has_error()); |
1504 | | |
1505 | | // Verify that the consumer now has both tables in the universe. |
1506 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
1507 | 0 | master::GetUniverseReplicationResponsePB tmp_resp; |
1508 | 0 | return VerifyUniverseReplication(consumer_cluster(), consumer_client(), |
1509 | 0 | kUniverseId, &tmp_resp).ok() && |
1510 | 0 | tmp_resp.entry().tables_size() == 2; |
1511 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter.")); |
1512 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), t_count * 2)); |
1513 | 0 | } |
1514 | | 
1515 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1516 | 0 | } |
1517 | | |
1518 | 0 | TEST_P(TwoDCTest, AlterUniverseReplicationTables) { |
1519 | | // Setup the consumer and producer cluster. |
1520 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({3, 3}, {3, 3}, 1)); |
1521 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables{tables[0], tables[2]}; |
1522 | 0 | std::vector<std::shared_ptr<client::YBTable>> consumer_tables{tables[1], tables[3]}; |
1523 | | |
1524 | | // Setup universe replication on the first table. |
1525 | 0 | auto initial_table = { producer_tables[0] }; |
1526 | 0 | ASSERT_OK(SetupUniverseReplication( |
1527 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, initial_table)); |
1528 | | |
1529 | | // Verify that universe was setup on consumer. |
1530 | 0 | master::GetUniverseReplicationResponsePB v_resp; |
1531 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &v_resp)); |
1532 | 0 | ASSERT_EQ(v_resp.entry().producer_id(), kUniverseId); |
1533 | 0 | ASSERT_EQ(v_resp.entry().tables_size(), 1); |
1534 | 0 | ASSERT_EQ(v_resp.entry().tables(0), producer_tables[0]->id()); |
1535 | | 
1536 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 3)); |
1537 | | |
1538 | | // 'add_table'. Add the next table with the alter command. |
1539 | 0 | { |
1540 | 0 | master::AlterUniverseReplicationRequestPB alter_req; |
1541 | 0 | master::AlterUniverseReplicationResponsePB alter_resp; |
1542 | 0 | alter_req.set_producer_id(kUniverseId); |
1543 | 0 | alter_req.add_producer_table_ids_to_add(producer_tables[1]->id()); |
1544 | 0 | rpc::RpcController rpc; |
1545 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
1546 | | 
1547 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
1548 | 0 | &consumer_client()->proxy_cache(), |
1549 | 0 | ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
1550 | 0 | ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc)); |
1551 | 0 | ASSERT_FALSE(alter_resp.has_error()); |
1552 | | |
1553 | | // Verify that the consumer now has both tables in the universe. |
1554 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
1555 | 0 | master::GetUniverseReplicationResponsePB tmp_resp; |
1556 | 0 | return VerifyUniverseReplication(consumer_cluster(), consumer_client(), |
1557 | 0 | kUniverseId, &tmp_resp).ok() && |
1558 | 0 | tmp_resp.entry().tables_size() == 2; |
1559 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter.")); |
1560 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 6)); |
1561 | 0 | } |
1562 | | |
1563 | | // Write some rows to the new table on the Producer. Ensure that the Consumer gets it. |
1564 | 0 | WriteWorkload(6, 10, producer_client(), producer_tables[1]->name()); |
1565 | 0 | ASSERT_OK(VerifyWrittenRecords(producer_tables[1]->name(), consumer_tables[1]->name())); |
1566 | | |
1567 | | // 'remove_table'. Remove the original table, leaving only the new one. |
1568 | 0 | { |
1569 | 0 | master::AlterUniverseReplicationRequestPB alter_req; |
1570 | 0 | master::AlterUniverseReplicationResponsePB alter_resp; |
1571 | 0 | alter_req.set_producer_id(kUniverseId); |
1572 | 0 | alter_req.add_producer_table_ids_to_remove(producer_tables[0]->id()); |
1573 | 0 | rpc::RpcController rpc; |
1574 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
1575 | | 
1576 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
1577 | 0 | &consumer_client()->proxy_cache(), |
1578 | 0 | ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
1579 | 0 | ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc)); |
1580 | 0 | ASSERT_FALSE(alter_resp.has_error()); |
1581 | | |
1582 | | // Verify that the consumer now has only the new table created by the previous alter. |
1583 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
1584 | 0 | return VerifyUniverseReplication(consumer_cluster(), consumer_client(), |
1585 | 0 | kUniverseId, &v_resp).ok() && |
1586 | 0 | v_resp.entry().tables_size() == 1; |
1587 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Verify table removed with alter.")); |
1588 | 0 | ASSERT_EQ(v_resp.entry().tables(0), producer_tables[1]->id()); |
1589 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 3)); |
1590 | 0 | } |
1591 | | 
1592 | 0 | LOG(INFO) << "All alter tests passed. Tearing down..."; |
1593 | | 
1594 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1595 | 0 | } |
1596 | | |
1597 | 0 | TEST_P(TwoDCTest, ToggleReplicationEnabled) { |
1598 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1599 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor)); |
1600 | | 
1601 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1602 | | // tables contains both producer and consumer universe tables (alternately). |
1603 | | // Pick out just the producer table from the list. |
1604 | 0 | producer_tables.reserve(1); |
1605 | 0 | producer_tables.push_back(tables[0]); |
1606 | 0 | ASSERT_OK(SetupUniverseReplication( |
1607 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1608 | | |
1609 | | // Verify that universe is now ACTIVE |
1610 | 0 | master::GetUniverseReplicationResponsePB resp; |
1611 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); |
1612 | | |
1613 | | // After we know the universe is ACTIVE, make sure all tablets are getting polled. |
1614 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
1615 | | |
1616 | | // Disable the replication and ensure no tablets are being polled |
1617 | 0 | ASSERT_OK(ToggleUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, false)); |
1618 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 0)); |
1619 | | |
1620 | | // Enable replication and ensure that all the tablets start being polled again |
1621 | 0 | ASSERT_OK(ToggleUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, true)); |
1622 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
1623 | 0 | } |
1624 | | |
1625 | 0 | TEST_P(TwoDCTest, TestDeleteUniverse) { |
1626 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1627 | | 
1628 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({8, 4}, {6, 6}, replication_factor)); |
1629 | | 
1630 | 0 | ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), |
1631 | 0 | kUniverseId, {tables[0], tables[2]} /* all producer tables */)); |
1632 | | |
1633 | | // Verify that universe was setup on consumer. |
1634 | 0 | master::GetUniverseReplicationResponsePB resp; |
1635 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); |
1636 | | |
1637 | | // After creating the cluster, make sure all tablets are being polled for.
1638 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 12)); |
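     | | // 12 is the producer-side tablet total: 6 + 6 across the two replicated
     | | // tables.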
1639 | | 
1640 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1641 | | 
1642 | 0 | ASSERT_OK(VerifyUniverseReplicationDeleted(consumer_cluster(), consumer_client(), kUniverseId, |
1643 | 0 | FLAGS_cdc_read_rpc_timeout_ms * 2)); |
1644 | | 
1645 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 0)); |
1646 | 0 | } |
1647 | | |
1648 | 0 | TEST_P(TwoDCTest, TestWalRetentionSet) { |
1649 | 0 | FLAGS_cdc_wal_retention_time_secs = 8 * 3600; |
1650 | | 
1651 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1652 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({8, 4, 4, 12}, {8, 4, 12, 8}, replication_factor)); |
1653 | | 
1654 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1655 | | // tables contains both producer and consumer universe tables (alternately). |
1656 | | // Pick out just the producer tables from the list. |
1657 | 0 | producer_tables.reserve(tables.size() / 2); |
1658 | 0 | for (size_t i = 0; i < tables.size(); i += 2) { |
1659 | 0 | producer_tables.push_back(tables[i]); |
1660 | 0 | } |
1661 | 0 | ASSERT_OK(SetupUniverseReplication( |
1662 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1663 | | |
1664 | | // Verify that universe was setup on consumer. |
1665 | 0 | master::GetUniverseReplicationResponsePB resp; |
1666 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); |
1667 | | |
1668 | | // After creating the cluster, make sure all 32 tablets are being polled for.
1669 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 32)); |
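     | | // 32 = 8 + 4 + 12 + 8: the producer tablet counts summed over all four
     | | // replicated tables.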
1670 | | 
1671 | 0 | cdc::VerifyWalRetentionTime(producer_cluster(), "test_table_", FLAGS_cdc_wal_retention_time_secs); |
1672 | | 
1673 | 0 | YBTableName table_name(YQL_DATABASE_CQL, kNamespaceName, "test_table_0"); |
1674 | | |
1675 | | // Issue an ALTER TABLE request on the producer to verify that it doesn't crash. |
1676 | 0 | auto table_alterer = producer_client()->NewTableAlterer(table_name); |
1677 | 0 | table_alterer->AddColumn("new_col")->Type(INT32); |
1678 | 0 | ASSERT_OK(table_alterer->timeout(MonoDelta::FromSeconds(kRpcTimeout))->Alter()); |
1679 | | |
1680 | | // Verify that the table got altered on the producer. |
1681 | 0 | YBSchema schema; |
1682 | 0 | PartitionSchema partition_schema; |
1683 | 0 | ASSERT_OK(producer_client()->GetTableSchema(table_name, &schema, &partition_schema)); |
1684 | | 
1685 | 0 | ASSERT_NE(static_cast<int>(Schema::kColumnNotFound), schema.FindColumn("new_col")); |
1686 | 0 | } |
1687 | | |
1688 | 0 | TEST_P(TwoDCTest, TestProducerUniverseExpansion) { |
1689 | | // Test that after new node(s) are added to producer universe, we are able to get replicated data |
1690 | | // from the new node(s). |
1691 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, 1)); |
1692 | | 
1693 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1694 | | // tables contains both producer and consumer universe tables (alternately). |
1695 | | // Pick out just the producer table from the list. |
1696 | 0 | producer_tables.reserve(1); |
1697 | 0 | producer_tables.push_back(tables[0]); |
1698 | 0 | ASSERT_OK(SetupUniverseReplication( |
1699 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1700 | | |
1701 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1702 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
1703 | | 
1704 | 0 | WriteWorkload(0, 5, producer_client(), tables[0]->name()); |
1705 | | |
1706 | | // Add new node and wait for tablets to be rebalanced. |
1707 | | // After rebalancing, each node will be leader for 1 tablet. |
1708 | 0 | ASSERT_OK(producer_cluster()->AddTabletServer()); |
1709 | 0 | ASSERT_OK(producer_cluster()->WaitForTabletServerCount(2)); |
1710 | 0 | ASSERT_OK(WaitFor([&] () { return producer_client()->IsLoadBalanced(2); }, |
1711 | 0 | MonoDelta::FromSeconds(kRpcTimeout), "IsLoadBalanced")); |
1712 | | |
1713 | | // Check that all tablets continue to be polled for. |
1714 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); |
1715 | | |
1716 | | // Write some more rows. Note that some of these rows will have the new node as the tablet leader. |
1717 | 0 | WriteWorkload(6, 10, producer_client(), tables[0]->name()); |
1718 | | |
1719 | | // Verify that both clusters have the same records. |
1720 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1721 | 0 | } |
1722 | | |
1723 | 0 | TEST_P(TwoDCTest, ApplyOperationsRandomFailures) { |
1724 | 0 | SetAtomicFlag(0.25, &FLAGS_TEST_respond_write_failed_probability); |
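     | | // Roughly a quarter of write responses are made to fail artificially, so the
     | | // apply path has to retry; both universes should still converge.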
1725 | | 
1726 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1727 | | // Use unequal tablet counts so we have an M:N mapping and output to multiple tablets.
1728 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({3}, {5}, replication_factor)); |
1729 | | 
1730 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1731 | | // tables contains both producer and consumer universe tables (alternately). |
1732 | | // Pick out just the producer table from the list. |
1733 | 0 | producer_tables.reserve(1); |
1734 | 0 | producer_tables.push_back(tables[0]); |
1735 | 0 | ASSERT_OK(SetupUniverseReplication( |
1736 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1737 | | |
1738 | | // Set up bi-directional replication. |
1739 | 0 | std::vector<std::shared_ptr<client::YBTable>> consumer_tables; |
1740 | 0 | consumer_tables.reserve(1); |
1741 | 0 | consumer_tables.push_back(tables[1]); |
1742 | 0 | ASSERT_OK(SetupUniverseReplication( |
1743 | 0 | consumer_cluster(), producer_cluster(), producer_client(), kUniverseId, consumer_tables)); |
1744 | | |
1745 | | // After creating the cluster, make sure all producer tablets are being polled for. |
1746 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 5)); |
1747 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(producer_cluster(), 3)); |
1748 | | |
1749 | | // Write 1000 entries to each cluster. |
1750 | 0 | std::thread t1([&]() { WriteWorkload(0, 1000, producer_client(), tables[0]->name()); }); |
1751 | 0 | std::thread t2([&]() { WriteWorkload(1000, 2000, consumer_client(), tables[1]->name()); }); |
1752 | | 
1753 | 0 | t1.join(); |
1754 | 0 | t2.join(); |
1755 | | |
1756 | | // Verify that both clusters have the same records. |
1757 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1758 | | |
1759 | | // Stop replication on consumer. |
1760 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1761 | | |
1762 | | // Stop replication on producer.
1763 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId, producer_client(), producer_cluster())); |
1764 | 0 | } |
1765 | | |
1766 | 0 | TEST_P(TwoDCTest, TestInsertDeleteWorkloadWithRestart) { |
1767 | | // Good test for batching: make sure we can handle operations on the same key with different
1768 | | // hybrid times. Then, do a restart and make sure we can successfully bootstrap the batched data.
1769 | | // In addition, make sure we write exactly num_total_ops / batch_size batches to the cluster to
1770 | | // ensure batching is actually enabled.
1771 | 0 | constexpr uint32_t num_ops_per_workload = 100; |
1772 | 0 | constexpr uint32_t num_runs = 5; |
1773 | | 
1774 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1775 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor)); |
1776 | | 
1777 | 0 | WriteWorkload(0, num_ops_per_workload, producer_client(), tables[0]->name()); |
1778 | 0 | for (size_t i = 0; i < num_runs; i++) { |
1779 | 0 | WriteWorkload(0, num_ops_per_workload, producer_client(), tables[0]->name(), true); |
1780 | 0 | WriteWorkload(0, num_ops_per_workload, producer_client(), tables[0]->name()); |
1781 | 0 | } |
1782 | | |
1783 | | // Count the total number of ops; expect 1 batch if the batch flag is set to 0.
1784 | 0 | uint32_t expected_num_writes = FLAGS_cdc_max_apply_batch_num_records > 0 ? |
1785 | 0 | (num_ops_per_workload * (num_runs * 2 + 1)) / FLAGS_cdc_max_apply_batch_num_records : 1; |
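     | | // Worked example: num_ops_per_workload = 100 and num_runs = 5 give
     | | // 100 * (5 * 2 + 1) = 1100 total operations, so a batch size of 1 yields
     | | // 1100 apply batches, while a batch flag of 0 collapses them into a single
     | | // batch.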
1786 | | 
1787 | 0 | LOG(INFO) << "expected num writes: " << expected_num_writes;
1788 | | 
1789 | 0 | std::vector<std::shared_ptr<client::YBTable>> producer_tables; |
1790 | 0 | producer_tables.reserve(1); |
1791 | 0 | producer_tables.push_back(tables[0]); |
1792 | 0 | ASSERT_OK(SetupUniverseReplication( |
1793 | 0 | producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); |
1794 | | 
1795 | 0 | ASSERT_OK(LoggedWaitFor([&]() { |
1796 | 0 | return GetSuccessfulWriteOps(consumer_cluster()) == expected_num_writes; |
1797 | 0 | }, MonoDelta::FromSeconds(60), "Wait for all batches to finish.")); |
1798 | | |
1799 | | // Verify that both clusters have the same records. |
1800 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1801 | | 
1802 | 0 | ASSERT_OK(consumer_cluster()->RestartSync()); |
1803 | | |
1804 | | // Verify that both clusters have the same records. |
1805 | 0 | ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); |
1806 | | // Stop replication on consumer. |
1807 | 0 | ASSERT_OK(DeleteUniverseReplication(kUniverseId)); |
1808 | 0 | } |
1809 | | |
1810 | 0 | TEST_P(TwoDCTest, TestDeleteCDCStreamWithMissingStreams) { |
1811 | 0 | uint32_t replication_factor = NonTsanVsTsan(3, 1); |
1812 | | 
1813 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({8, 4}, {6, 6}, replication_factor)); |
1814 | | 
1815 | 0 | ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), |
1816 | 0 | kUniverseId, {tables[0], tables[2]} /* all producer tables */)); |
1817 | | |
1818 | | // Verify that universe was setup on consumer. |
1819 | 0 | master::GetUniverseReplicationResponsePB resp; |
1820 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); |
1821 | | |
1822 | | // After creating the cluster, make sure all tablets are being polled for.
1823 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 12)); |
1824 | | |
1825 | | // Delete the CDC stream on the producer for a table. |
1826 | 0 | master::ListCDCStreamsResponsePB stream_resp; |
1827 | 0 | ASSERT_OK(GetCDCStreamForTable(tables[0]->id(), &stream_resp)); |
1828 | 0 | ASSERT_EQ(stream_resp.streams_size(), 1); |
1829 | 0 | ASSERT_EQ(stream_resp.streams(0).table_id().Get(0), tables[0]->id()); |
1830 | 0 | auto stream_id = stream_resp.streams(0).stream_id(); |
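     | | // Each replicated table is expected to be bound to exactly one CDC stream;
     | | // its id is captured so the stream can be force-deleted out from under the
     | | // universe below.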
1831 | | 
1832 | 0 | rpc::RpcController rpc; |
1833 | 0 | auto producer_proxy = std::make_shared<master::MasterReplicationProxy>( |
1834 | 0 | &producer_client()->proxy_cache(), |
1835 | 0 | ASSERT_RESULT(producer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
1836 | | 
1837 | 0 | master::DeleteCDCStreamRequestPB delete_cdc_stream_req; |
1838 | 0 | master::DeleteCDCStreamResponsePB delete_cdc_stream_resp; |
1839 | 0 | delete_cdc_stream_req.add_stream_id(stream_id); |
1840 | 0 | delete_cdc_stream_req.set_force_delete(true); |
1841 | | 
1842 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
1843 | 0 | ASSERT_OK(producer_proxy->DeleteCDCStream( |
1844 | 0 | delete_cdc_stream_req, &delete_cdc_stream_resp, &rpc)); |
1845 | | |
1846 | | // Try to delete the universe. |
1847 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
1848 | 0 | &consumer_client()->proxy_cache(), |
1849 | 0 | ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
1850 | 0 | rpc.Reset(); |
1851 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
1852 | 0 | master::DeleteUniverseReplicationRequestPB delete_universe_req; |
1853 | 0 | master::DeleteUniverseReplicationResponsePB delete_universe_resp; |
1854 | 0 | delete_universe_req.set_producer_id(kUniverseId); |
1855 | 0 | delete_universe_req.set_ignore_errors(false); |
1856 | 0 | ASSERT_OK( |
1857 | 0 | master_proxy->DeleteUniverseReplication(delete_universe_req, &delete_universe_resp, &rpc)); |
1858 | | // Ensure that the error message describes the missing stream and related table. |
1859 | 0 | ASSERT_TRUE(delete_universe_resp.has_error()); |
1860 | 0 | std::string prefix = "Could not find the following streams:"; |
1861 | 0 | const auto error_str = delete_universe_resp.error().status().message(); |
1862 | 0 | ASSERT_TRUE(error_str.substr(0, prefix.size()) == prefix); |
1863 | 0 | ASSERT_NE(error_str.find(stream_id), string::npos); |
1864 | 0 | ASSERT_NE(error_str.find(tables[0]->id()), string::npos); |
1865 | | |
1866 | | // Force the delete. |
1867 | 0 | rpc.Reset(); |
1868 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
1869 | 0 | delete_universe_req.set_ignore_errors(true); |
1870 | 0 | ASSERT_OK( |
1871 | 0 | master_proxy->DeleteUniverseReplication(delete_universe_req, &delete_universe_resp, &rpc)); |
1872 | | |
1873 | | // Ensure that the delete is now successful.
1874 | 0 | ASSERT_OK(VerifyUniverseReplicationDeleted(consumer_cluster(), consumer_client(), kUniverseId, |
1875 | 0 | FLAGS_cdc_read_rpc_timeout_ms * 2)); |
1876 | | 
1877 | 0 | ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 0)); |
1878 | 0 | } |
1879 | | |
1880 | 0 | TEST_P(TwoDCTest, TestAlterWhenProducerIsInaccessible) { |
1881 | 0 | auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, 1)); |
1882 | | 
1883 | 0 | ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), |
1884 | 0 | kUniverseId, {tables[0]} /* all producer tables */)); |
1885 | | |
1886 | | // Verify everything is setup correctly. |
1887 | 0 | master::GetUniverseReplicationResponsePB resp; |
1888 | 0 | ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); |
1889 | | |
1890 | | // Stop the producer master. |
1891 | 0 | producer_cluster()->mini_master(0)->Shutdown(); |
1892 | | |
1893 | | // Try to alter replication. |
1894 | 0 | master::AlterUniverseReplicationRequestPB alter_req; |
1895 | 0 | master::AlterUniverseReplicationResponsePB alter_resp; |
1896 | 0 | alter_req.set_producer_id(kUniverseId); |
1897 | 0 | alter_req.add_producer_table_ids_to_add("123"); // Doesn't matter as we cannot connect. |
1898 | 0 | rpc::RpcController rpc; |
1899 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
1900 | | 
1901 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
1902 | 0 | &consumer_client()->proxy_cache(), |
1903 | 0 | ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); |
1904 | | |
1905 | | // Ensure that we just return an error and don't have a fatal. |
1906 | 0 | ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc)); |
1907 | 0 | ASSERT_TRUE(alter_resp.has_error()); |
1908 | 0 | } |
1909 | | |
1910 | | } // namespace enterprise |
1911 | | } // namespace yb |