/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/twodc_test_base.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/integration-tests/twodc_test_base.h" |
15 | | |
16 | | #include <string> |
17 | | |
18 | | #include "yb/cdc/cdc_service.h" |
19 | | |
20 | | #include "yb/client/client.h" |
21 | | #include "yb/client/table.h" |
22 | | |
23 | | #include "yb/common/wire_protocol.h" |
24 | | |
25 | | #include "yb/gutil/casts.h" |
26 | | |
27 | | #include "yb/integration-tests/cdc_test_util.h" |
28 | | #include "yb/integration-tests/mini_cluster.h" |
29 | | #include "yb/master/catalog_manager_if.h" |
30 | | #include "yb/master/master_replication.proxy.h" |
31 | | #include "yb/master/mini_master.h" |
32 | | #include "yb/rpc/rpc_controller.h" |
33 | | #include "yb/tserver/cdc_consumer.h" |
34 | | #include "yb/tserver/mini_tablet_server.h" |
35 | | #include "yb/tserver/tablet_server.h" |
36 | | #include "yb/util/test_util.h" |
37 | | #include "yb/yql/pgwrapper/libpq_utils.h" |
38 | | #include "yb/yql/pgwrapper/pg_wrapper.h" |
39 | | |
40 | | namespace yb { |
41 | | |
42 | | using client::YBClient; |
43 | | using tserver::enterprise::CDCConsumer; |
44 | | |
45 | | namespace enterprise { |
46 | | |
47 | 2 | void TwoDCTestBase::TearDown() { |
48 | 2 | LOG(INFO) << "Destroying CDC Clusters"; |
49 | 2 | if (consumer_cluster()) { |
50 | 0 | if (consumer_cluster_.pg_supervisor_) { |
51 | 0 | consumer_cluster_.pg_supervisor_->Stop(); |
52 | 0 | } |
53 | 0 | consumer_cluster_.mini_cluster_->Shutdown(); |
54 | 0 | consumer_cluster_.mini_cluster_.reset(); |
55 | 0 | } |
56 | | |
57 | 2 | if (producer_cluster()) { |
58 | 2 | if (producer_cluster_.pg_supervisor_) { |
59 | 0 | producer_cluster_.pg_supervisor_->Stop(); |
60 | 0 | } |
61 | 2 | producer_cluster_.mini_cluster_->Shutdown(); |
62 | 2 | producer_cluster_.mini_cluster_.reset(); |
63 | 2 | } |
64 | | |
65 | 2 | producer_cluster_.client_.reset(); |
66 | 2 | consumer_cluster_.client_.reset(); |
67 | | |
68 | 2 | YBTest::TearDown(); |
69 | 2 | } |
70 | | |
71 | | Status TwoDCTestBase::SetupUniverseReplication( |
72 | | MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client, |
73 | | const std::string& universe_id, const std::vector<std::shared_ptr<client::YBTable>>& tables, |
74 | 0 | bool leader_only) { |
75 | 0 | master::SetupUniverseReplicationRequestPB req; |
76 | 0 | master::SetupUniverseReplicationResponsePB resp; |
77 | |
|
78 | 0 | req.set_producer_id(universe_id); |
79 | 0 | string master_addr = producer_cluster->GetMasterAddresses(); |
80 | 0 | if (leader_only) { |
81 | 0 | master_addr = VERIFY_RESULT(producer_cluster->GetLeaderMiniMaster())->bound_rpc_addr_str(); |
82 | 0 | } |
83 | 0 | auto hp_vec = VERIFY_RESULT(HostPort::ParseStrings(master_addr, 0)); |
84 | 0 | HostPortsToPBs(hp_vec, req.mutable_producer_master_addresses()); |
85 | |
|
86 | 0 | req.mutable_producer_table_ids()->Reserve(narrow_cast<int>(tables.size())); |
87 | 0 | for (const auto& table : tables) { |
88 | 0 | req.add_producer_table_ids(table->id()); |
89 | 0 | } |
90 | |
|
91 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
92 | 0 | &consumer_client->proxy_cache(), |
93 | 0 | VERIFY_RESULT(consumer_cluster->GetLeaderMiniMaster())->bound_rpc_addr()); |
94 | | |
95 | 0 | rpc::RpcController rpc; |
96 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
97 | 0 | return WaitFor([&] () -> Result<bool> { |
98 | 0 | if (!master_proxy->SetupUniverseReplication(req, &resp, &rpc).ok()) { |
99 | 0 | return false; |
100 | 0 | } |
101 | 0 | if (resp.has_error()) { |
102 | 0 | return false; |
103 | 0 | } |
104 | 0 | return true; |
105 | 0 | }, MonoDelta::FromSeconds(30), "Setup universe replication"); |
106 | 0 | } |
107 | | |
108 | | Status TwoDCTestBase::VerifyUniverseReplication( |
109 | | MiniCluster* consumer_cluster, YBClient* consumer_client, |
110 | 0 | const std::string& universe_id, master::GetUniverseReplicationResponsePB* resp) { |
111 | 0 | return LoggedWaitFor([=]() -> Result<bool> { |
112 | 0 | master::GetUniverseReplicationRequestPB req; |
113 | 0 | req.set_producer_id(universe_id); |
114 | 0 | resp->Clear(); |
115 | |
|
116 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
117 | 0 | &consumer_client->proxy_cache(), |
118 | 0 | VERIFY_RESULT(consumer_cluster->GetLeaderMiniMaster())->bound_rpc_addr()); |
119 | 0 | rpc::RpcController rpc; |
120 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
121 | |
|
122 | 0 | Status s = master_proxy->GetUniverseReplication(req, resp, &rpc); |
123 | 0 | return s.ok() && !resp->has_error() && |
124 | 0 | resp->entry().state() == master::SysUniverseReplicationEntryPB::ACTIVE; |
125 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Verify universe replication"); |
126 | 0 | } |
127 | | |
128 | | Status TwoDCTestBase::ToggleUniverseReplication( |
129 | | MiniCluster* consumer_cluster, YBClient* consumer_client, |
130 | 0 | const std::string& universe_id, bool is_enabled) { |
131 | 0 | master::SetUniverseReplicationEnabledRequestPB req; |
132 | 0 | master::SetUniverseReplicationEnabledResponsePB resp; |
133 | |
|
134 | 0 | req.set_producer_id(universe_id); |
135 | 0 | req.set_is_enabled(is_enabled); |
136 | |
|
137 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
138 | 0 | &consumer_client->proxy_cache(), |
139 | 0 | VERIFY_RESULT(consumer_cluster->GetLeaderMiniMaster())->bound_rpc_addr()); |
140 | | |
141 | 0 | rpc::RpcController rpc; |
142 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
143 | 0 | RETURN_NOT_OK(master_proxy->SetUniverseReplicationEnabled(req, &resp, &rpc)); |
144 | 0 | if (resp.has_error()) { |
145 | 0 | return StatusFromPB(resp.error().status()); |
146 | 0 | } |
147 | 0 | return Status::OK(); |
148 | 0 | } |
149 | | |
150 | | Status TwoDCTestBase::VerifyUniverseReplicationDeleted(MiniCluster* consumer_cluster, |
151 | 0 | YBClient* consumer_client, const std::string& universe_id, int timeout) { |
152 | 0 | return LoggedWaitFor([=]() -> Result<bool> { |
153 | 0 | master::GetUniverseReplicationRequestPB req; |
154 | 0 | master::GetUniverseReplicationResponsePB resp; |
155 | 0 | req.set_producer_id(universe_id); |
156 | |
|
157 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
158 | 0 | &consumer_client->proxy_cache(), |
159 | 0 | VERIFY_RESULT(consumer_cluster->GetLeaderMiniMaster())->bound_rpc_addr()); |
160 | 0 | rpc::RpcController rpc; |
161 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
162 | |
|
163 | 0 | Status s = master_proxy->GetUniverseReplication(req, &resp, &rpc); |
164 | 0 | return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND; |
165 | 0 | }, MonoDelta::FromMilliseconds(timeout), "Verify universe replication deleted"); |
166 | 0 | } |
167 | | |
168 | | Status TwoDCTestBase::GetCDCStreamForTable( |
169 | 0 | const std::string& table_id, master::ListCDCStreamsResponsePB* resp) { |
170 | 0 | return LoggedWaitFor([=]() -> Result<bool> { |
171 | 0 | master::ListCDCStreamsRequestPB req; |
172 | 0 | req.set_table_id(table_id); |
173 | 0 | resp->Clear(); |
174 | |
|
175 | 0 | auto leader_mini_master = producer_cluster()->GetLeaderMiniMaster(); |
176 | 0 | if (!leader_mini_master.ok()) { |
177 | 0 | return false; |
178 | 0 | } |
179 | 0 | Status s = (*leader_mini_master)->catalog_manager().ListCDCStreams(&req, resp); |
180 | 0 | return s.ok() && !resp->has_error() && resp->streams_size() == 1; |
181 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Get CDC stream for table"); |
182 | 0 | } |
183 | | |
184 | 0 | uint32_t TwoDCTestBase::GetSuccessfulWriteOps(MiniCluster* cluster) { |
185 | 0 | uint32_t size = 0; |
186 | 0 | for (const auto& mini_tserver : cluster->mini_tablet_servers()) { |
187 | 0 | auto* tserver = dynamic_cast<tserver::enterprise::TabletServer*>(mini_tserver->server()); |
188 | 0 | CDCConsumer* cdc_consumer; |
189 | 0 | if (tserver && (cdc_consumer = tserver->GetCDCConsumer())) { |
190 | 0 | size += cdc_consumer->GetNumSuccessfulWriteRpcs(); |
191 | 0 | } |
192 | 0 | } |
193 | 0 | return size; |
194 | 0 | } |
195 | | |
196 | 0 | Status TwoDCTestBase::DeleteUniverseReplication(const std::string& universe_id) { |
197 | 0 | return DeleteUniverseReplication(universe_id, consumer_client(), consumer_cluster()); |
198 | 0 | } |
199 | | |
200 | | Status TwoDCTestBase::DeleteUniverseReplication( |
201 | 0 | const std::string& universe_id, YBClient* client, MiniCluster* cluster) { |
202 | 0 | master::DeleteUniverseReplicationRequestPB req; |
203 | 0 | master::DeleteUniverseReplicationResponsePB resp; |
204 | |
|
205 | 0 | req.set_producer_id(universe_id); |
206 | |
|
207 | 0 | auto master_proxy = std::make_shared<master::MasterReplicationProxy>( |
208 | 0 | &client->proxy_cache(), |
209 | 0 | VERIFY_RESULT(cluster->GetLeaderMiniMaster())->bound_rpc_addr()); |
210 | | |
211 | 0 | rpc::RpcController rpc; |
212 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
213 | 0 | RETURN_NOT_OK(master_proxy->DeleteUniverseReplication(req, &resp, &rpc)); |
214 | 0 | LOG(INFO) << "Delete universe succeeded"; |
215 | 0 | return Status::OK(); |
216 | 0 | } |
217 | | |
218 | 0 | size_t TwoDCTestBase::NumProducerTabletsPolled(MiniCluster* cluster) { |
219 | 0 | size_t size = 0; |
220 | 0 | for (const auto& mini_tserver : cluster->mini_tablet_servers()) { |
221 | 0 | size_t new_size = 0; |
222 | 0 | auto* tserver = dynamic_cast<tserver::enterprise::TabletServer*>(mini_tserver->server()); |
223 | 0 | CDCConsumer* cdc_consumer; |
224 | 0 | if (tserver && (cdc_consumer = tserver->GetCDCConsumer())) { |
225 | 0 | auto tablets_running = cdc_consumer->TEST_producer_tablets_running(); |
226 | 0 | new_size = tablets_running.size(); |
227 | 0 | } |
228 | 0 | size += new_size; |
229 | 0 | } |
230 | 0 | return size; |
231 | 0 | } |
232 | | |
233 | | Status TwoDCTestBase::CorrectlyPollingAllTablets( |
234 | 0 | MiniCluster* cluster, uint32_t num_producer_tablets) { |
235 | 0 | return LoggedWaitFor([=]() -> Result<bool> { |
236 | 0 | static int i = 0; |
237 | 0 | constexpr int kNumIterationsWithCorrectResult = 5; |
238 | 0 | auto cur_tablets = NumProducerTabletsPolled(cluster); |
239 | 0 | if (cur_tablets == num_producer_tablets) { |
240 | 0 | if (i++ == kNumIterationsWithCorrectResult) { |
241 | 0 | i = 0; |
242 | 0 | return true; |
243 | 0 | } |
244 | 0 | } else { |
245 | 0 | i = 0; |
246 | 0 | } |
247 | 0 | LOG(INFO) << "Tablets being polled: " << cur_tablets; |
248 | 0 | return false; |
249 | 0 | }, MonoDelta::FromSeconds(kRpcTimeout), "Num producer tablets being polled"); |
250 | 0 | } |
251 | | |
252 | | } // namespace enterprise |
253 | | } // namespace yb |