/Users/deen/code/yugabyte-db/ent/src/yb/master/master-test_ent.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include <boost/algorithm/string.hpp> |
14 | | #include <boost/lexical_cast.hpp> |
15 | | |
16 | | #include "yb/common/schema.h" |
17 | | #include "yb/common/wire_protocol.h" |
18 | | |
19 | | #include "yb/gutil/casts.h" |
20 | | |
21 | | #include "yb/master/master_ddl.proxy.h" |
22 | | #include "yb/master/master_replication.proxy.h" |
23 | | #include "yb/master/master_defaults.h" |
24 | | |
25 | | #include "../../src/yb/master/master-test_base.h" |
26 | | |
27 | | #include "yb/util/result.h" |
28 | | |
29 | | DECLARE_int32(cdc_state_table_num_tablets); |
30 | | |
31 | | namespace yb { |
32 | | namespace master { |
33 | | namespace enterprise { |
34 | | |
35 | | constexpr const char* kTableName = "cdc_table"; |
36 | | static const Schema kTableSchema({ ColumnSchema("key", INT32), |
37 | | ColumnSchema("v1", UINT64), |
38 | | ColumnSchema("v2", STRING) }, |
39 | | 1); |
40 | | |
41 | | class MasterTestEnt : public MasterTestBase { |
42 | | protected: |
43 | | Status CreateCDCStream(const TableId& table_id, CDCStreamId* stream_id); |
44 | | Status GetCDCStream(const CDCStreamId& stream_id, GetCDCStreamResponsePB* resp); |
45 | | Status DeleteCDCStream(const CDCStreamId& stream_id); |
46 | | Status ListCDCStreams(ListCDCStreamsResponsePB* resp); |
47 | | |
48 | | Status SetupUniverseReplication( |
49 | | const std::string& producer_id, const std::vector<std::string>& master_addr, |
50 | | const std::vector<std::string>& tables); |
51 | | Status DeleteUniverseReplication(const std::string& producer_id); |
52 | | Status GetUniverseReplication( |
53 | | const std::string& producer_id, GetUniverseReplicationResponsePB* resp); |
54 | | |
55 | | }; |
56 | | |
57 | 4 | Status MasterTestEnt::CreateCDCStream(const TableId& table_id, CDCStreamId* stream_id) { |
58 | 4 | CreateCDCStreamRequestPB req; |
59 | 4 | CreateCDCStreamResponsePB resp; |
60 | | |
61 | 4 | req.set_table_id(table_id); |
62 | 4 | RETURN_NOT_OK(proxy_replication_->CreateCDCStream(req, &resp, ResetAndGetController())); |
63 | 4 | if (resp.has_error()) { |
64 | 0 | RETURN_NOT_OK(StatusFromPB(resp.error().status())); |
65 | 0 | } |
66 | | |
67 | 4 | RETURN_NOT_OK(WaitFor([&](){ |
68 | 4 | IsCreateTableDoneRequestPB is_create_req; |
69 | 4 | IsCreateTableDoneResponsePB is_create_resp; |
70 | | |
71 | 4 | is_create_req.mutable_table()->set_table_name(master::kCdcStateTableName); |
72 | 4 | is_create_req.mutable_table()->mutable_namespace_()->set_name(master::kSystemNamespaceName); |
73 | | |
74 | 4 | auto s = proxy_ddl_->IsCreateTableDone(is_create_req, &is_create_resp, ResetAndGetController()); |
75 | 4 | if (!s.ok()) { |
76 | 4 | return false; |
77 | 4 | } |
78 | 4 | return true; |
79 | 4 | }, MonoDelta::FromSeconds(30), "Wait for cdc_state table creation to finish")); |
80 | | |
81 | 4 | *stream_id = resp.stream_id(); |
82 | 4 | return Status::OK(); |
83 | 4 | } |
84 | | |
85 | 5 | Status MasterTestEnt::GetCDCStream(const CDCStreamId& stream_id, GetCDCStreamResponsePB* resp) { |
86 | 5 | GetCDCStreamRequestPB req; |
87 | 5 | req.set_stream_id(stream_id); |
88 | | |
89 | 5 | RETURN_NOT_OK(proxy_replication_->GetCDCStream(req, resp, ResetAndGetController())); |
90 | 5 | if (resp->has_error()) { |
91 | 1 | RETURN_NOT_OK(StatusFromPB(resp->error().status())); |
92 | 1 | } |
93 | 4 | return Status::OK(); |
94 | 5 | } |
95 | | |
96 | 1 | Status MasterTestEnt::DeleteCDCStream(const CDCStreamId& stream_id) { |
97 | 1 | DeleteCDCStreamRequestPB req; |
98 | 1 | DeleteCDCStreamResponsePB resp; |
99 | 1 | req.add_stream_id(stream_id); |
100 | | |
101 | 1 | RETURN_NOT_OK(proxy_replication_->DeleteCDCStream(req, &resp, ResetAndGetController())); |
102 | 1 | if (resp.has_error()) { |
103 | 0 | RETURN_NOT_OK(StatusFromPB(resp.error().status())); |
104 | 0 | } |
105 | 1 | return Status::OK(); |
106 | 1 | } |
107 | | |
108 | 1 | Status MasterTestEnt::ListCDCStreams(ListCDCStreamsResponsePB* resp) { |
109 | 1 | ListCDCStreamsRequestPB req; |
110 | | |
111 | 1 | RETURN_NOT_OK(proxy_replication_->ListCDCStreams(req, resp, ResetAndGetController())); |
112 | 1 | if (resp->has_error()) { |
113 | 0 | RETURN_NOT_OK(StatusFromPB(resp->error().status())); |
114 | 0 | } |
115 | 1 | return Status::OK(); |
116 | 1 | } |
117 | | |
118 | | Status MasterTestEnt::SetupUniverseReplication( |
119 | | const std::string& producer_id, const std::vector<std::string>& producer_master_addrs, |
120 | 2 | const std::vector<TableId>& tables) { |
121 | 2 | SetupUniverseReplicationRequestPB req; |
122 | 2 | SetupUniverseReplicationResponsePB resp; |
123 | | |
124 | 2 | req.set_producer_id(producer_id); |
125 | 2 | req.mutable_producer_master_addresses()->Reserve(narrow_cast<int>(producer_master_addrs.size())); |
126 | 2 | for (const auto& addr : producer_master_addrs) { |
127 | 2 | std::vector<std::string> hp; |
128 | 2 | boost::split(hp, addr, boost::is_any_of(":")); |
129 | 2 | CHECK_EQ(hp.size(), 2); |
130 | 2 | auto* master = req.add_producer_master_addresses(); |
131 | 2 | master->set_host(hp[0]); |
132 | 2 | master->set_port(boost::lexical_cast<uint32_t>(hp[1])); |
133 | 2 | } |
134 | 2 | req.mutable_producer_table_ids()->Reserve(narrow_cast<int>(tables.size())); |
135 | 2 | for (const auto& table : tables) { |
136 | 2 | req.add_producer_table_ids(table); |
137 | 2 | } |
138 | | |
139 | 2 | RETURN_NOT_OK(proxy_replication_->SetupUniverseReplication(req, &resp, ResetAndGetController())); |
140 | 2 | if (resp.has_error()) { |
141 | 0 | RETURN_NOT_OK(StatusFromPB(resp.error().status())); |
142 | 0 | } |
143 | 2 | return Status::OK(); |
144 | 2 | } |
145 | | |
146 | | Status MasterTestEnt::GetUniverseReplication( |
147 | 0 | const std::string& producer_id, GetUniverseReplicationResponsePB* resp) { |
148 | 0 | GetUniverseReplicationRequestPB req; |
149 | 0 | req.set_producer_id(producer_id); |
150 | |
|
151 | 0 | RETURN_NOT_OK(proxy_replication_->GetUniverseReplication(req, resp, ResetAndGetController())); |
152 | 0 | if (resp->has_error()) { |
153 | 0 | RETURN_NOT_OK(StatusFromPB(resp->error().status())); |
154 | 0 | } |
155 | 0 | return Status::OK(); |
156 | 0 | } |
157 | | |
158 | 0 | Status MasterTestEnt::DeleteUniverseReplication(const std::string& producer_id) { |
159 | 0 | DeleteUniverseReplicationRequestPB req; |
160 | 0 | DeleteUniverseReplicationResponsePB resp; |
161 | 0 | req.set_producer_id(producer_id); |
162 | |
|
163 | 0 | RETURN_NOT_OK(proxy_replication_->DeleteUniverseReplication(req, &resp, ResetAndGetController())); |
164 | 0 | if (resp.has_error()) { |
165 | 0 | RETURN_NOT_OK(StatusFromPB(resp.error().status())); |
166 | 0 | } |
167 | 0 | return Status::OK(); |
168 | 0 | } |
169 | | |
170 | 1 | TEST_F(MasterTestEnt, TestCreateCDCStreamInvalidTable) { |
171 | 1 | CreateCDCStreamRequestPB req; |
172 | 1 | CreateCDCStreamResponsePB resp; |
173 | | |
174 | 1 | req.set_table_id("invalidid"); |
175 | 1 | ASSERT_OK(proxy_replication_->CreateCDCStream(req, &resp, ResetAndGetController())); |
176 | 1 | SCOPED_TRACE(resp.DebugString()); |
177 | 1 | ASSERT_TRUE(resp.has_error()); |
178 | 1 | ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, resp.error().code()); |
179 | 1 | } |
180 | | |
181 | 1 | TEST_F(MasterTestEnt, TestCreateCDCStream) { |
182 | 1 | TableId table_id; |
183 | 1 | ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id)); |
184 | | |
185 | 1 | CDCStreamId stream_id; |
186 | 1 | FLAGS_cdc_state_table_num_tablets = 1; |
187 | 1 | ASSERT_OK(CreateCDCStream(table_id, &stream_id)); |
188 | | |
189 | 1 | GetCDCStreamResponsePB resp; |
190 | 1 | ASSERT_OK(GetCDCStream(stream_id, &resp)); |
191 | 1 | ASSERT_EQ(resp.stream().table_id().Get(0), table_id); |
192 | 1 | } |
193 | | |
194 | 1 | TEST_F(MasterTestEnt, TestDeleteCDCStream) { |
195 | 1 | TableId table_id; |
196 | 1 | ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id)); |
197 | | |
198 | 1 | CDCStreamId stream_id; |
199 | 1 | FLAGS_cdc_state_table_num_tablets = 1; |
200 | 1 | ASSERT_OK(CreateCDCStream(table_id, &stream_id)); |
201 | | |
202 | 1 | GetCDCStreamResponsePB resp; |
203 | 1 | ASSERT_OK(GetCDCStream(stream_id, &resp)); |
204 | 1 | ASSERT_EQ(resp.stream().table_id().Get(0), table_id); |
205 | | |
206 | 1 | ASSERT_OK(DeleteCDCStream(stream_id)); |
207 | | |
208 | 1 | resp.Clear(); |
209 | 1 | ASSERT_NOK(GetCDCStream(stream_id, &resp)); |
210 | 1 | ASSERT_TRUE(resp.has_error()); |
211 | 1 | ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, resp.error().code()); |
212 | 1 | } |
213 | | |
214 | 1 | TEST_F(MasterTestEnt, TestDeleteTableWithCDCStream) { |
215 | 1 | TableId table_id; |
216 | 1 | ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id)); |
217 | | |
218 | 1 | CDCStreamId stream_id; |
219 | 1 | FLAGS_cdc_state_table_num_tablets = 1; |
220 | 1 | ASSERT_OK(CreateCDCStream(table_id, &stream_id)); |
221 | | |
222 | 1 | GetCDCStreamResponsePB resp; |
223 | 1 | ASSERT_OK(GetCDCStream(stream_id, &resp)); |
224 | 1 | ASSERT_EQ(resp.stream().table_id().Get(0), table_id); |
225 | | |
226 | | // Deleting the table will fail since it has a CDC stream attached. |
227 | 1 | TableId id; |
228 | 1 | ASSERT_NOK(DeleteTableSync(default_namespace_name, kTableName, &id)); |
229 | | |
230 | 1 | ASSERT_OK(GetCDCStream(stream_id, &resp)); |
231 | 1 | } |
232 | | |
233 | 1 | TEST_F(MasterTestEnt, TestListCDCStreams) { |
234 | 1 | TableId table_id; |
235 | 1 | ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id)); |
236 | | |
237 | 1 | CDCStreamId stream_id; |
238 | 1 | FLAGS_cdc_state_table_num_tablets = 1; |
239 | 1 | ASSERT_OK(CreateCDCStream(table_id, &stream_id)); |
240 | | |
241 | 1 | ListCDCStreamsResponsePB resp; |
242 | 1 | ASSERT_OK(ListCDCStreams(&resp)); |
243 | 1 | ASSERT_EQ(1, resp.streams_size()); |
244 | 1 | ASSERT_EQ(stream_id, resp.streams(0).stream_id()); |
245 | 1 | } |
246 | | |
247 | 1 | TEST_F(MasterTestEnt, TestSetupUniverseReplication) { |
248 | 1 | std::string producer_id = "producer_universe"; |
249 | 1 | std::vector<std::string> producer_masters {"127.0.0.1:7100"}; |
250 | 1 | std::vector<std::string> tables {"some_table_id"}; |
251 | | // Always fails because we don't have actual producer. |
252 | 1 | ASSERT_NOK(SetupUniverseReplication(producer_id, producer_masters, tables)); |
253 | | |
254 | 0 | GetUniverseReplicationResponsePB resp; |
255 | 0 | ASSERT_OK(GetUniverseReplication(producer_id, &resp)); |
256 | 0 | ASSERT_EQ(resp.entry().producer_id(), producer_id); |
257 | |
|
258 | 0 | ASSERT_EQ(resp.entry().producer_master_addresses_size(), 1); |
259 | 0 | std::string addr; |
260 | 0 | const auto& hp = resp.entry().producer_master_addresses(0); |
261 | 0 | addr = hp.host() + ":" + std::to_string(hp.port()); |
262 | 0 | ASSERT_EQ(addr, "127.0.0.1:7100"); |
263 | |
|
264 | 0 | ASSERT_EQ(resp.entry().tables_size(), 1); |
265 | 0 | ASSERT_EQ(resp.entry().tables(0), "some_table_id"); |
266 | 0 | } |
267 | | |
268 | 1 | TEST_F(MasterTestEnt, TestDeleteUniverseReplication) { |
269 | 1 | std::string producer_id = "producer_universe"; |
270 | 1 | std::vector<std::string> producer_masters {"127.0.0.1:7100"}; |
271 | 1 | std::vector<std::string> tables {"some_table_id"}; |
272 | | // Always fails because we don't have actual producer. |
273 | 1 | ASSERT_NOK(SetupUniverseReplication(producer_id, producer_masters, tables)); |
274 | | |
275 | | // Verify that universe was created. |
276 | 0 | GetUniverseReplicationResponsePB resp; |
277 | 0 | ASSERT_OK(GetUniverseReplication(producer_id, &resp)); |
278 | 0 | ASSERT_EQ(resp.entry().producer_id(), producer_id); |
279 | |
|
280 | 0 | ASSERT_OK(DeleteUniverseReplication(producer_id)); |
281 | |
|
282 | 0 | resp.Clear(); |
283 | 0 | ASSERT_NOK(GetUniverseReplication(producer_id, &resp)); |
284 | 0 | ASSERT_TRUE(resp.has_error()); |
285 | 0 | ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, resp.error().code()); |
286 | 0 | } |
287 | | |
288 | | } // namespace enterprise |
289 | | } // namespace master |
290 | | } // namespace yb |