YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/cdcsdk_test_base.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/integration-tests/cdcsdk_test_base.h"
14
15
#include <algorithm>
16
#include <utility>
17
#include <string>
18
#include <chrono>
19
#include <boost/assign.hpp>
20
#include <gtest/gtest.h>
21
22
#include "yb/cdc/cdc_service.h"
23
#include "yb/cdc/cdc_service.proxy.h"
24
25
#include "yb/client/client.h"
26
#include "yb/client/meta_cache.h"
27
#include "yb/client/schema.h"
28
#include "yb/client/session.h"
29
#include "yb/client/table.h"
30
#include "yb/client/table_alterer.h"
31
#include "yb/client/table_creator.h"
32
#include "yb/client/table_handle.h"
33
#include "yb/client/transaction.h"
34
#include "yb/client/yb_op.h"
35
36
#include "yb/common/common.pb.h"
37
#include "yb/common/entity_ids.h"
38
#include "yb/common/ql_value.h"
39
40
#include "yb/gutil/stl_util.h"
41
#include "yb/gutil/strings/join.h"
42
#include "yb/gutil/strings/substitute.h"
43
44
#include "yb/integration-tests/mini_cluster.h"
45
46
#include "yb/master/catalog_manager.h"
47
#include "yb/master/cdc_consumer_registry_service.h"
48
#include "yb/master/master.h"
49
#include "yb/master/master_client.pb.h"
50
#include "yb/master/master_ddl.pb.h"
51
#include "yb/master/master_ddl.proxy.h"
52
#include "yb/master/master_replication.proxy.h"
53
#include "yb/master/mini_master.h"
54
#include "yb/master/sys_catalog_initialization.h"
55
56
#include "yb/rpc/rpc_controller.h"
57
58
#include "yb/tablet/tablet.h"
59
#include "yb/tablet/tablet_peer.h"
60
61
#include "yb/tserver/mini_tablet_server.h"
62
#include "yb/tserver/tablet_server.h"
63
#include "yb/tserver/ts_tablet_manager.h"
64
65
#include "yb/util/test_util.h"
66
67
#include "yb/yql/pgwrapper/libpq_utils.h"
68
#include "yb/yql/pgwrapper/pg_wrapper.h"
69
70
namespace yb {
71
using client::YBClient;
72
using client::YBTableName;
73
74
namespace cdc {
75
namespace enterprise {
76
77
0
void CDCSDKTestBase::TearDown() {
78
0
  YBTest::TearDown();
79
80
0
  LOG(INFO) << "Destroying cluster for CDCSDK";
81
82
0
  if (test_cluster()) {
83
0
    if (test_cluster_.pg_supervisor_) {
84
0
      test_cluster_.pg_supervisor_->Stop();
85
0
    }
86
0
    test_cluster_.mini_cluster_->Shutdown();
87
0
    test_cluster_.mini_cluster_.reset();
88
0
  }
89
0
  test_cluster_.client_.reset();
90
0
}
91
92
0
std::unique_ptr<CDCServiceProxy> CDCSDKTestBase::GetCdcProxy() {
93
0
  YBClient *client_ = test_client();
94
0
  const auto mini_server = test_cluster()->mini_tablet_servers().front();
95
0
  std::unique_ptr<CDCServiceProxy> proxy = std::make_unique<CDCServiceProxy>(
96
0
      &client_->proxy_cache(), HostPort::FromBoundEndpoint(mini_server->bound_rpc_addr()));
97
0
  return proxy;
98
0
}
99
100
// Create a test database to work on.
101
Status CDCSDKTestBase::CreateDatabase(
102
    Cluster* cluster,
103
    const std::string& namespace_name,
104
0
    bool colocated) {
105
0
  auto conn = VERIFY_RESULT(cluster->Connect());
106
0
      RETURN_NOT_OK(conn.ExecuteFormat(
107
0
      "CREATE DATABASE $0$1", namespace_name, colocated ? " colocated = true" : ""));
108
0
  return Status::OK();
109
0
}
110
111
0
// Starts a PostgreSQL process for `cluster` and records where it listens.
//
// A tablet server is picked with RandomElement(); its bound RPC address, first data path and
// shared-memory fd are used to create the process configuration. Two free ports are allocated
// from the mini cluster: the first for Postgres itself, the second is stored in the global
// FLAGS_pgsql_proxy_webserver_port. On success, cluster->pg_supervisor_ and
// cluster->pg_host_port_ are populated.
Status CDCSDKTestBase::InitPostgres(Cluster* cluster) {
  auto& mini_cluster = *cluster->mini_cluster_;
  auto tserver = RandomElement(mini_cluster.mini_tablet_servers());
  auto pg_port = mini_cluster.AllocateFreePort();

  pgwrapper::PgProcessConf conf =
      VERIFY_RESULT(pgwrapper::PgProcessConf::CreateValidateAndRunInitDb(
          AsString(Endpoint(tserver->bound_rpc_addr().address(), pg_port)),
          tserver->options()->fs_opts.data_paths.front() + "/pg_data",
          tserver->server()->GetSharedMemoryFd()));
  conf.master_addresses = tserver->options()->master_addresses_flag;
  conf.force_disable_log_file = true;

  // The webserver port is allocated after the Postgres port, matching the original ordering.
  FLAGS_pgsql_proxy_webserver_port = mini_cluster.AllocateFreePort();

  LOG(INFO) << "Starting PostgreSQL server listening on " << conf.listen_addresses
            << ":" << conf.pg_port << ", data: " << conf.data_dir
            << ", pgsql webserver port: " << FLAGS_pgsql_proxy_webserver_port;

  cluster->pg_supervisor_ = std::make_unique<pgwrapper::PgSupervisor>(conf);
  RETURN_NOT_OK(cluster->pg_supervisor_->Start());

  cluster->pg_host_port_ = HostPort(conf.listen_addresses, conf.pg_port);
  return Status::OK();
}
132
133
// Set up a cluster with the specified parameters.
134
Status CDCSDKTestBase::SetUpWithParams(
135
    uint32_t replication_factor,
136
    uint32_t num_masters,
137
23
    bool colocated) {
138
23
  master::SetDefaultInitialSysCatalogSnapshotFlags();
139
23
  CDCSDKTestBase::SetUp();
140
23
  FLAGS_enable_ysql = true;
141
23
  FLAGS_master_auto_run_initdb = true;
142
23
  FLAGS_hide_pg_catalog_table_creation_logs = true;
143
23
  FLAGS_pggate_rpc_timeout_secs = 120;
144
23
  FLAGS_cdc_max_apply_batch_num_records = 1;
145
23
  FLAGS_cdc_enable_replicate_intents = true;
146
23
  FLAGS_replication_factor = replication_factor;
147
148
23
  MiniClusterOptions opts;
149
23
  opts.num_masters = num_masters;
150
23
  opts.num_tablet_servers = replication_factor;
151
23
  opts.cluster_id = "cdcsdk_cluster";
152
153
23
  test_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(opts);
154
155
23
  RETURN_NOT_OK(test_cluster()->StartSync());
156
23
  RETURN_NOT_OK(test_cluster()->WaitForTabletServerCount(replication_factor));
157
23
  RETURN_NOT_OK(WaitForInitDb(test_cluster()));
158
23
  test_cluster_.client_ = VERIFY_RESULT(test_cluster()->CreateClient());
159
23
    RETURN_NOT_OK(InitPostgres(&test_cluster_));
160
23
    RETURN_NOT_OK(CreateDatabase(&test_cluster_, kNamespaceName, colocated));
161
162
23
  cdc_proxy_ = GetCdcProxy();
163
164
23
  LOG(INFO) << "Cluster created successfully for CDCSDK";
165
23
  return Status::OK();
166
23
}
167
168
// Looks up a table through the leader master's ListTables RPC and returns a YBTableName carrying
// its table id and namespace id.
//
// The request is filtered by `table_name` and by `namespace_name` (as a YQL_DATABASE_PGSQL
// namespace). When `exclude_system_tables` is true, the request also asks the master to exclude
// system tables and restricts the results to master::USER_TABLE_RELATION.
//
// When `verify_table_name` is false the first listed table is returned; otherwise the first table
// whose name and namespace name both match exactly is returned.
//
// Returns IllegalState if the response carries an error or if no listed table qualifies; RPC and
// leader-master lookup failures are propagated.
Result<YBTableName> CDCSDKTestBase::GetTable(
    Cluster* cluster,
    const std::string& namespace_name,
    const std::string& table_name,
    bool verify_table_name,
    bool exclude_system_tables) {
  master::ListTablesRequestPB req;
  master::ListTablesResponsePB resp;

  req.set_name_filter(table_name);
  req.mutable_namespace_()->set_name(namespace_name);
  req.mutable_namespace_()->set_database_type(YQL_DATABASE_PGSQL);
  // The filter must be applied when exclusion is requested; the previous code tested the negated
  // flag and therefore did the opposite of what the caller asked for.
  if (exclude_system_tables) {
    req.set_exclude_system_tables(true);
    req.add_relation_type_filter(master::USER_TABLE_RELATION);
  }

  master::MasterDdlProxy master_proxy(
      &cluster->client_->proxy_cache(),
      VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMasterBoundRpcAddr()));

  rpc::RpcController rpc;
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  RETURN_NOT_OK(master_proxy.ListTables(req, &resp, &rpc));
  if (resp.has_error()) {
    return STATUS(IllegalState, "Failed listing tables");
  }

  // Now need to find the table and return it.
  for (const auto& table : resp.tables()) {
    // If !verify_table_name, just return the first table.
    if (!verify_table_name ||
        (table.name() == table_name && table.namespace_().name() == namespace_name)) {
      YBTableName yb_table;
      yb_table.set_table_id(table.id());
      yb_table.set_namespace_id(table.namespace_().id());
      return yb_table;
    }
  }
  return STATUS_FORMAT(
      IllegalState, "Unable to find table $0 in namespace $1", table_name, namespace_name);
}
210
211
// Creates a two-column integer table (kKeyColumnName, kValueColumnName) named `table_name` in the
// database `namespace_name`, then looks it up with GetTable() and returns the result.
//
// `add_primary_key` marks the key column as PRIMARY KEY. `colocated` is substituted into the
// "colocated = ..." storage option and `num_tablets` into the SPLIT INTO clause. A positive
// `table_oid` first enables the yb_enable_create_with_table_oid session setting and then adds a
// "table_oid = <oid>," storage option ahead of the colocated option.
Result<YBTableName> CDCSDKTestBase::CreateTable(
    Cluster* cluster,
    const std::string& namespace_name,
    const std::string& table_name,
    const uint32_t num_tablets,
    const bool add_primary_key,
    bool colocated,
    const int table_oid) {
  auto conn = VERIFY_RESULT(cluster->ConnectToDB(namespace_name));

  std::string table_oid_clause;
  if (table_oid > 0) {
    // Need to turn on session flag to allow for CREATE WITH table_oid.
    RETURN_NOT_OK(conn.Execute("set yb_enable_create_with_table_oid=true"));
    table_oid_clause = Format("table_oid = $0,", table_oid);
  }

  const char* const primary_key_clause = add_primary_key ? "PRIMARY KEY" : "";
  RETURN_NOT_OK(conn.ExecuteFormat(
      "CREATE TABLE $0($1 int $2, $3 int) WITH ($4colocated = $5) "
      "SPLIT INTO $6 TABLETS",
      table_name, kKeyColumnName, primary_key_clause, kValueColumnName,
      table_oid_clause, colocated, num_tablets));

  return GetTable(cluster, namespace_name, table_name);
}
233
234
0
// Returns the id of the YQL_DATABASE_PGSQL namespace called `namespace_name`, as reported by the
// test client's GetNamespaceInfo(). An empty namespace id is passed so the lookup is by name.
// Any failure from GetNamespaceInfo() is propagated.
Result<std::string> CDCSDKTestBase::GetNamespaceId(const std::string& namespace_name) {
  master::GetNamespaceInfoResponsePB namespace_info_resp;

  // Look up the namespace the caller asked for; the previous code ignored `namespace_name` and
  // always queried kNamespaceName.
  RETURN_NOT_OK(test_client()->GetNamespaceInfo(
      std::string(), namespace_name, YQL_DATABASE_PGSQL, &namespace_info_resp));

  // Return namespace_id.
  return namespace_info_resp.namespace_().id();
}
243
244
// Looks up a table through the leader master's ListTables RPC and returns its table id.
//
// The request is filtered by `table_name` and by `namespace_name` (as a YQL_DATABASE_PGSQL
// namespace). When `exclude_system_tables` is true, the request also asks the master to exclude
// system tables and restricts the results to master::USER_TABLE_RELATION.
//
// When `verify_table_name` is false the id of the first listed table is returned; otherwise the
// id of the first table whose name and namespace name both match exactly is returned.
//
// Returns IllegalState if the response carries an error or if no listed table qualifies; RPC and
// leader-master lookup failures are propagated.
Result<std::string> CDCSDKTestBase::GetTableId(
    Cluster* cluster,
    const std::string& namespace_name,
    const std::string& table_name,
    bool verify_table_name,
    bool exclude_system_tables) {
  master::ListTablesRequestPB req;
  master::ListTablesResponsePB resp;

  req.set_name_filter(table_name);
  req.mutable_namespace_()->set_name(namespace_name);
  req.mutable_namespace_()->set_database_type(YQL_DATABASE_PGSQL);
  // The filter must be applied when exclusion is requested; the previous code tested the negated
  // flag and therefore did the opposite of what the caller asked for.
  if (exclude_system_tables) {
    req.set_exclude_system_tables(true);
    req.add_relation_type_filter(master::USER_TABLE_RELATION);
  }

  master::MasterDdlProxy master_proxy(
      &cluster->client_->proxy_cache(),
      VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMasterBoundRpcAddr()));

  rpc::RpcController rpc;
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  RETURN_NOT_OK(master_proxy.ListTables(req, &resp, &rpc));
  if (resp.has_error()) {
    return STATUS(IllegalState, "Failed listing tables");
  }

  // Now need to find the table and return it.
  for (const auto& table : resp.tables()) {
    // If !verify_table_name, just return the first table.
    if (!verify_table_name ||
        (table.name() == table_name && table.namespace_().name() == namespace_name)) {
      return table.id();
    }
  }
  return STATUS_FORMAT(
      IllegalState, "Unable to find table id for $0 in $1", table_name, namespace_name);
}
283
284
// Initialize a CreateCDCStreamRequest to be used while creating a DB stream ID.
285
void CDCSDKTestBase::InitCreateStreamRequest(
286
    CreateCDCStreamRequestPB* create_req,
287
    const CDCCheckpointType& checkpoint_type,
288
0
    const std::string& namespace_name) {
289
0
  create_req->set_namespace_name(namespace_name);
290
0
  create_req->set_checkpoint_type(checkpoint_type);
291
0
  create_req->set_record_type(CDCRecordType::CHANGE);
292
0
  create_req->set_record_format(CDCRecordFormat::PROTO);
293
0
  create_req->set_source_type(CDCSDK);
294
0
}
295
296
// This creates a DB stream on the database kNamespaceName by default.
297
0
Result<std::string> CDCSDKTestBase::CreateDBStream(CDCCheckpointType checkpoint_type) {
298
0
  CreateCDCStreamRequestPB req;
299
0
  CreateCDCStreamResponsePB resp;
300
301
0
  rpc::RpcController rpc;
302
0
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
303
304
0
  InitCreateStreamRequest(&req, checkpoint_type);
305
306
0
  RETURN_NOT_OK(cdc_proxy_->CreateCDCStream(req, &resp, &rpc));
307
308
0
  return resp.db_stream_id();
309
0
}
310
311
} // namespace enterprise
312
} // namespace cdc
313
} // namespace yb