/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/cdcsdk_test_base.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/integration-tests/cdcsdk_test_base.h" |
14 | | |
15 | | #include <algorithm> |
16 | | #include <utility> |
17 | | #include <string> |
18 | | #include <chrono> |
19 | | #include <boost/assign.hpp> |
20 | | #include <gtest/gtest.h> |
21 | | |
22 | | #include "yb/cdc/cdc_service.h" |
23 | | #include "yb/cdc/cdc_service.proxy.h" |
24 | | |
25 | | #include "yb/client/client.h" |
26 | | #include "yb/client/meta_cache.h" |
27 | | #include "yb/client/schema.h" |
28 | | #include "yb/client/session.h" |
29 | | #include "yb/client/table.h" |
30 | | #include "yb/client/table_alterer.h" |
31 | | #include "yb/client/table_creator.h" |
32 | | #include "yb/client/table_handle.h" |
33 | | #include "yb/client/transaction.h" |
34 | | #include "yb/client/yb_op.h" |
35 | | |
36 | | #include "yb/common/common.pb.h" |
37 | | #include "yb/common/entity_ids.h" |
38 | | #include "yb/common/ql_value.h" |
39 | | |
40 | | #include "yb/gutil/stl_util.h" |
41 | | #include "yb/gutil/strings/join.h" |
42 | | #include "yb/gutil/strings/substitute.h" |
43 | | |
44 | | #include "yb/integration-tests/mini_cluster.h" |
45 | | |
46 | | #include "yb/master/catalog_manager.h" |
47 | | #include "yb/master/cdc_consumer_registry_service.h" |
48 | | #include "yb/master/master.h" |
49 | | #include "yb/master/master_client.pb.h" |
50 | | #include "yb/master/master_ddl.pb.h" |
51 | | #include "yb/master/master_ddl.proxy.h" |
52 | | #include "yb/master/master_replication.proxy.h" |
53 | | #include "yb/master/mini_master.h" |
54 | | #include "yb/master/sys_catalog_initialization.h" |
55 | | |
56 | | #include "yb/rpc/rpc_controller.h" |
57 | | |
58 | | #include "yb/tablet/tablet.h" |
59 | | #include "yb/tablet/tablet_peer.h" |
60 | | |
61 | | #include "yb/tserver/mini_tablet_server.h" |
62 | | #include "yb/tserver/tablet_server.h" |
63 | | #include "yb/tserver/ts_tablet_manager.h" |
64 | | |
65 | | #include "yb/util/test_util.h" |
66 | | |
67 | | #include "yb/yql/pgwrapper/libpq_utils.h" |
68 | | #include "yb/yql/pgwrapper/pg_wrapper.h" |
69 | | |
70 | | namespace yb { |
71 | | using client::YBClient; |
72 | | using client::YBTableName; |
73 | | |
74 | | namespace cdc { |
75 | | namespace enterprise { |
76 | | |
77 | 0 | void CDCSDKTestBase::TearDown() { |
78 | 0 | YBTest::TearDown(); |
79 | |
|
80 | 0 | LOG(INFO) << "Destroying cluster for CDCSDK"; |
81 | |
|
82 | 0 | if (test_cluster()) { |
83 | 0 | if (test_cluster_.pg_supervisor_) { |
84 | 0 | test_cluster_.pg_supervisor_->Stop(); |
85 | 0 | } |
86 | 0 | test_cluster_.mini_cluster_->Shutdown(); |
87 | 0 | test_cluster_.mini_cluster_.reset(); |
88 | 0 | } |
89 | 0 | test_cluster_.client_.reset(); |
90 | 0 | } |
91 | | |
92 | 0 | std::unique_ptr<CDCServiceProxy> CDCSDKTestBase::GetCdcProxy() { |
93 | 0 | YBClient *client_ = test_client(); |
94 | 0 | const auto mini_server = test_cluster()->mini_tablet_servers().front(); |
95 | 0 | std::unique_ptr<CDCServiceProxy> proxy = std::make_unique<CDCServiceProxy>( |
96 | 0 | &client_->proxy_cache(), HostPort::FromBoundEndpoint(mini_server->bound_rpc_addr())); |
97 | 0 | return proxy; |
98 | 0 | } |
99 | | |
100 | | // Create a test database to work on. |
101 | | Status CDCSDKTestBase::CreateDatabase( |
102 | | Cluster* cluster, |
103 | | const std::string& namespace_name, |
104 | 0 | bool colocated) { |
105 | 0 | auto conn = VERIFY_RESULT(cluster->Connect()); |
106 | 0 | RETURN_NOT_OK(conn.ExecuteFormat( |
107 | 0 | "CREATE DATABASE $0$1", namespace_name, colocated ? " colocated = true" : "")); |
108 | 0 | return Status::OK(); |
109 | 0 | } |
110 | | |
111 | 0 | Status CDCSDKTestBase::InitPostgres(Cluster* cluster) { |
112 | 0 | auto pg_ts = RandomElement(cluster->mini_cluster_->mini_tablet_servers()); |
113 | 0 | auto port = cluster->mini_cluster_->AllocateFreePort(); |
114 | 0 | pgwrapper::PgProcessConf pg_process_conf = |
115 | 0 | VERIFY_RESULT(pgwrapper::PgProcessConf::CreateValidateAndRunInitDb( |
116 | 0 | AsString(Endpoint(pg_ts->bound_rpc_addr().address(), port)), |
117 | 0 | pg_ts->options()->fs_opts.data_paths.front() + "/pg_data", |
118 | 0 | pg_ts->server()->GetSharedMemoryFd())); |
119 | 0 | pg_process_conf.master_addresses = pg_ts->options()->master_addresses_flag; |
120 | 0 | pg_process_conf.force_disable_log_file = true; |
121 | 0 | FLAGS_pgsql_proxy_webserver_port = cluster->mini_cluster_->AllocateFreePort(); |
122 | |
|
123 | 0 | LOG(INFO) << "Starting PostgreSQL server listening on " << pg_process_conf.listen_addresses |
124 | 0 | << ":" << pg_process_conf.pg_port << ", data: " << pg_process_conf.data_dir |
125 | 0 | << ", pgsql webserver port: " << FLAGS_pgsql_proxy_webserver_port; |
126 | 0 | cluster->pg_supervisor_ = std::make_unique<pgwrapper::PgSupervisor>(pg_process_conf); |
127 | 0 | RETURN_NOT_OK(cluster->pg_supervisor_->Start()); |
128 | | |
129 | 0 | cluster->pg_host_port_ = HostPort(pg_process_conf.listen_addresses, pg_process_conf.pg_port); |
130 | 0 | return Status::OK(); |
131 | 0 | } |
132 | | |
133 | | // Set up a cluster with the specified parameters. |
134 | | Status CDCSDKTestBase::SetUpWithParams( |
135 | | uint32_t replication_factor, |
136 | | uint32_t num_masters, |
137 | 23 | bool colocated) { |
138 | 23 | master::SetDefaultInitialSysCatalogSnapshotFlags(); |
139 | 23 | CDCSDKTestBase::SetUp(); |
140 | 23 | FLAGS_enable_ysql = true; |
141 | 23 | FLAGS_master_auto_run_initdb = true; |
142 | 23 | FLAGS_hide_pg_catalog_table_creation_logs = true; |
143 | 23 | FLAGS_pggate_rpc_timeout_secs = 120; |
144 | 23 | FLAGS_cdc_max_apply_batch_num_records = 1; |
145 | 23 | FLAGS_cdc_enable_replicate_intents = true; |
146 | 23 | FLAGS_replication_factor = replication_factor; |
147 | | |
148 | 23 | MiniClusterOptions opts; |
149 | 23 | opts.num_masters = num_masters; |
150 | 23 | opts.num_tablet_servers = replication_factor; |
151 | 23 | opts.cluster_id = "cdcsdk_cluster"; |
152 | | |
153 | 23 | test_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(opts); |
154 | | |
155 | 23 | RETURN_NOT_OK(test_cluster()->StartSync()); |
156 | 23 | RETURN_NOT_OK(test_cluster()->WaitForTabletServerCount(replication_factor)); |
157 | 23 | RETURN_NOT_OK(WaitForInitDb(test_cluster())); |
158 | 23 | test_cluster_.client_ = VERIFY_RESULT(test_cluster()->CreateClient()); |
159 | 23 | RETURN_NOT_OK(InitPostgres(&test_cluster_)); |
160 | 23 | RETURN_NOT_OK(CreateDatabase(&test_cluster_, kNamespaceName, colocated)); |
161 | | |
162 | 23 | cdc_proxy_ = GetCdcProxy(); |
163 | | |
164 | 23 | LOG(INFO) << "Cluster created successfully for CDCSDK"; |
165 | 23 | return Status::OK(); |
166 | 23 | } |
167 | | |
168 | | Result<YBTableName> CDCSDKTestBase::GetTable( |
169 | | Cluster* cluster, |
170 | | const std::string& namespace_name, |
171 | | const std::string& table_name, |
172 | | bool verify_table_name, |
173 | 0 | bool exclude_system_tables) { |
174 | 0 | master::ListTablesRequestPB req; |
175 | 0 | master::ListTablesResponsePB resp; |
176 | |
|
177 | 0 | req.set_name_filter(table_name); |
178 | 0 | req.mutable_namespace_()->set_name(namespace_name); |
179 | 0 | req.mutable_namespace_()->set_database_type(YQL_DATABASE_PGSQL); |
180 | 0 | if (!exclude_system_tables) { |
181 | 0 | req.set_exclude_system_tables(true); |
182 | 0 | req.add_relation_type_filter(master::USER_TABLE_RELATION); |
183 | 0 | } |
184 | |
|
185 | 0 | master::MasterDdlProxy master_proxy( |
186 | 0 | &cluster->client_->proxy_cache(), |
187 | 0 | VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMasterBoundRpcAddr())); |
188 | | |
189 | 0 | rpc::RpcController rpc; |
190 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
191 | 0 | RETURN_NOT_OK(master_proxy.ListTables(req, &resp, &rpc)); |
192 | 0 | if (resp.has_error()) { |
193 | 0 | return STATUS(IllegalState, "Failed listing tables"); |
194 | 0 | } |
195 | | |
196 | | // Now need to find the table and return it. |
197 | 0 | for (const auto& table : resp.tables()) { |
198 | | // If !verify_table_name, just return the first table. |
199 | 0 | if (!verify_table_name || |
200 | 0 | (table.name() == table_name && table.namespace_().name() == namespace_name)) { |
201 | 0 | YBTableName yb_table; |
202 | 0 | yb_table.set_table_id(table.id()); |
203 | 0 | yb_table.set_namespace_id(table.namespace_().id()); |
204 | 0 | return yb_table; |
205 | 0 | } |
206 | 0 | } |
207 | 0 | return STATUS_FORMAT( |
208 | 0 | IllegalState, "Unable to find table $0 in namespace $1", table_name, namespace_name); |
209 | 0 | } |
210 | | |
211 | | Result<YBTableName> CDCSDKTestBase::CreateTable( |
212 | | Cluster* cluster, |
213 | | const std::string& namespace_name, |
214 | | const std::string& table_name, |
215 | | const uint32_t num_tablets, |
216 | | const bool add_primary_key, |
217 | | bool colocated, |
218 | 0 | const int table_oid) { |
219 | 0 | auto conn = VERIFY_RESULT(cluster->ConnectToDB(namespace_name)); |
220 | 0 | std::string table_oid_string = ""; |
221 | 0 | if (table_oid > 0) { |
222 | | // Need to turn on session flag to allow for CREATE WITH table_oid. |
223 | 0 | RETURN_NOT_OK(conn.Execute("set yb_enable_create_with_table_oid=true")); |
224 | 0 | table_oid_string = Format("table_oid = $0,", table_oid); |
225 | 0 | } |
226 | 0 | RETURN_NOT_OK(conn.ExecuteFormat( |
227 | 0 | "CREATE TABLE $0($1 int $2, $3 int) WITH ($4colocated = $5) " |
228 | 0 | "SPLIT INTO $6 TABLETS", |
229 | 0 | table_name, kKeyColumnName, (add_primary_key) ? "PRIMARY KEY" : "", kValueColumnName, |
230 | 0 | table_oid_string, colocated, num_tablets)); |
231 | 0 | return GetTable(cluster, namespace_name, table_name); |
232 | 0 | } |
233 | | |
234 | 0 | Result<std::string> CDCSDKTestBase::GetNamespaceId(const std::string& namespace_name) { |
235 | 0 | master::GetNamespaceInfoResponsePB namespace_info_resp; |
236 | |
|
237 | 0 | RETURN_NOT_OK(test_client()->GetNamespaceInfo( |
238 | 0 | std::string(), kNamespaceName, YQL_DATABASE_PGSQL, &namespace_info_resp)); |
239 | | |
240 | | // Return namespace_id. |
241 | 0 | return namespace_info_resp.namespace_().id(); |
242 | 0 | } |
243 | | |
244 | | Result<std::string> CDCSDKTestBase::GetTableId( |
245 | | Cluster* cluster, |
246 | | const std::string& namespace_name, |
247 | | const std::string& table_name, |
248 | | bool verify_table_name, |
249 | 0 | bool exclude_system_tables) { |
250 | 0 | master::ListTablesRequestPB req; |
251 | 0 | master::ListTablesResponsePB resp; |
252 | |
|
253 | 0 | req.set_name_filter(table_name); |
254 | 0 | req.mutable_namespace_()->set_name(namespace_name); |
255 | 0 | req.mutable_namespace_()->set_database_type(YQL_DATABASE_PGSQL); |
256 | 0 | if (!exclude_system_tables) { |
257 | 0 | req.set_exclude_system_tables(true); |
258 | 0 | req.add_relation_type_filter(master::USER_TABLE_RELATION); |
259 | 0 | } |
260 | |
|
261 | 0 | master::MasterDdlProxy master_proxy( |
262 | 0 | &cluster->client_->proxy_cache(), |
263 | 0 | VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMasterBoundRpcAddr())); |
264 | | |
265 | 0 | rpc::RpcController rpc; |
266 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); |
267 | 0 | RETURN_NOT_OK(master_proxy.ListTables(req, &resp, &rpc)); |
268 | 0 | if (resp.has_error()) { |
269 | 0 | return STATUS(IllegalState, "Failed listing tables"); |
270 | 0 | } |
271 | | |
272 | | // Now need to find the table and return it. |
273 | 0 | for (const auto& table : resp.tables()) { |
274 | | // If !verify_table_name, just return the first table. |
275 | 0 | if (!verify_table_name || |
276 | 0 | (table.name() == table_name && table.namespace_().name() == namespace_name)) { |
277 | 0 | return table.id(); |
278 | 0 | } |
279 | 0 | } |
280 | 0 | return STATUS_FORMAT( |
281 | 0 | IllegalState, "Unable to find table id for $0 in $1", table_name, namespace_name); |
282 | 0 | } |
283 | | |
284 | | // Initialize a CreateCDCStreamRequest to be used while creating a DB stream ID. |
285 | | void CDCSDKTestBase::InitCreateStreamRequest( |
286 | | CreateCDCStreamRequestPB* create_req, |
287 | | const CDCCheckpointType& checkpoint_type, |
288 | 0 | const std::string& namespace_name) { |
289 | 0 | create_req->set_namespace_name(namespace_name); |
290 | 0 | create_req->set_checkpoint_type(checkpoint_type); |
291 | 0 | create_req->set_record_type(CDCRecordType::CHANGE); |
292 | 0 | create_req->set_record_format(CDCRecordFormat::PROTO); |
293 | 0 | create_req->set_source_type(CDCSDK); |
294 | 0 | } |
295 | | |
296 | | // This creates a DB stream on the database kNamespaceName by default. |
297 | 0 | Result<std::string> CDCSDKTestBase::CreateDBStream(CDCCheckpointType checkpoint_type) { |
298 | 0 | CreateCDCStreamRequestPB req; |
299 | 0 | CreateCDCStreamResponsePB resp; |
300 | |
|
301 | 0 | rpc::RpcController rpc; |
302 | 0 | rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms)); |
303 | |
|
304 | 0 | InitCreateStreamRequest(&req, checkpoint_type); |
305 | |
|
306 | 0 | RETURN_NOT_OK(cdc_proxy_->CreateCDCStream(req, &resp, &rpc)); |
307 | | |
308 | 0 | return resp.db_stream_id(); |
309 | 0 | } |
310 | | |
311 | | } // namespace enterprise |
312 | | } // namespace cdc |
313 | | } // namespace yb |