/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/cdcsdk_test_base.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #ifndef ENT_SRC_YB_INTEGRATION_TESTS_CDCSDK_TEST_BASE_H |
14 | | #define ENT_SRC_YB_INTEGRATION_TESTS_CDCSDK_TEST_BASE_H |
15 | | |
16 | | #include <string> |
17 | | |
18 | | #include "yb/client/transaction_manager.h" |
19 | | |
20 | | #include "yb/integration-tests/cdc_test_util.h" |
21 | | #include "yb/integration-tests/mini_cluster.h" |
22 | | |
23 | | #include "yb/util/test_util.h" |
24 | | #include "yb/util/tsan_util.h" |
25 | | |
26 | | #include "yb/yql/pgwrapper/libpq_utils.h" |
27 | | #include "yb/yql/pgwrapper/pg_wrapper.h" |
28 | | |
29 | | DECLARE_int32(cdc_read_rpc_timeout_ms); |
30 | | DECLARE_int32(cdc_write_rpc_timeout_ms); |
31 | | DECLARE_bool(TEST_check_broadcast_address); |
32 | | DECLARE_bool(flush_rocksdb_on_shutdown); |
33 | | DECLARE_bool(cdc_enable_replicate_intents); |
34 | | |
35 | | DECLARE_int32(replication_factor); |
36 | | DECLARE_int32(cdc_max_apply_batch_num_records); |
37 | | DECLARE_int32(client_read_write_timeout_ms); |
38 | | DECLARE_int32(pgsql_proxy_webserver_port); |
39 | | DECLARE_bool(enable_ysql); |
40 | | DECLARE_bool(hide_pg_catalog_table_creation_logs); |
41 | | DECLARE_bool(master_auto_run_initdb); |
42 | | DECLARE_int32(pggate_rpc_timeout_secs); |
43 | | |
44 | | namespace yb { |
45 | | using client::YBClient; |
46 | | using client::YBTableName; |
47 | | |
48 | | namespace cdc { |
49 | | namespace enterprise { |
50 | | constexpr int kRpcTimeout = NonTsanVsTsan(60, 120); |
51 | | static const std::string kUniverseId = "test_universe"; |
52 | | static const std::string kNamespaceName = "test_namespace"; |
53 | | constexpr static const char* const kTableName = "test_table"; |
54 | | constexpr static const char* const kKeyColumnName = "key"; |
55 | | constexpr static const char* const kValueColumnName = "value"; |
56 | | |
// Pair of settings used to parameterize a CDCSDK test: a batch size and a
// switch for replicating intents. Both fields are public and mutable.
struct CDCSDKTestParams {
  // Copies each argument into the field of the same name (minus the trailing
  // underscore).
  CDCSDKTestParams(int batch_size_, bool enable_replicate_intents_)
      : batch_size{batch_size_},
        enable_replicate_intents{enable_replicate_intents_} {
  }

  int batch_size;
  bool enable_replicate_intents;
};
64 | | |
65 | | class CDCSDKTestBase : public YBTest { |
66 | | public: |
67 | | class Cluster { |
68 | | public: |
69 | | std::unique_ptr<MiniCluster> mini_cluster_; |
70 | | std::unique_ptr<YBClient> client_; |
71 | | std::unique_ptr<yb::pgwrapper::PgSupervisor> pg_supervisor_; |
72 | | HostPort pg_host_port_; |
73 | | boost::optional<client::TransactionManager> txn_mgr_; |
74 | | |
75 | 0 | Result<pgwrapper::PGConn> Connect() { |
76 | 0 | return pgwrapper::PGConn::Connect(pg_host_port_); |
77 | 0 | } |
78 | | |
79 | 0 | Result<pgwrapper::PGConn> ConnectToDB(const std::string& dbname) { |
80 | 0 | return pgwrapper::PGConn::Connect(pg_host_port_, dbname); |
81 | 0 | } |
82 | | }; |
83 | | |
84 | 46 | void SetUp() override { |
85 | 46 | YBTest::SetUp(); |
86 | | // Allow for one-off network instability by ensuring a single CDC RPC timeout << test timeout. |
87 | 46 | FLAGS_cdc_read_rpc_timeout_ms = (kRpcTimeout / 4) * 1000; |
88 | 46 | FLAGS_cdc_write_rpc_timeout_ms = (kRpcTimeout / 4) * 1000; |
89 | | |
90 | 46 | FLAGS_TEST_check_broadcast_address = false; |
91 | 46 | FLAGS_flush_rocksdb_on_shutdown = false; |
92 | 46 | } |
93 | | |
94 | | void TearDown() override; |
95 | | |
96 | | std::unique_ptr<CDCServiceProxy> GetCdcProxy(); |
97 | | |
98 | 23 | MiniCluster* test_cluster() { |
99 | 23 | return test_cluster_.mini_cluster_.get(); |
100 | 23 | } |
101 | | |
102 | 0 | client::TransactionManager* test_cluster_txn_mgr() { |
103 | 0 | return test_cluster_.txn_mgr_.get_ptr(); |
104 | 0 | } |
105 | | |
106 | 0 | YBClient* test_client() { |
107 | 0 | return test_cluster_.client_.get(); |
108 | 0 | } |
109 | | |
110 | | Status CreateDatabase( |
111 | | Cluster* cluster, |
112 | | const std::string& namespace_name = kNamespaceName, |
113 | | bool colocated = false); |
114 | | |
115 | | Status InitPostgres(Cluster* cluster); |
116 | | |
117 | | Status SetUpWithParams( |
118 | | uint32_t replication_factor, |
119 | | uint32_t num_masters = 1, |
120 | | bool colocated = false); |
121 | | |
122 | | Result<YBTableName> GetTable( |
123 | | Cluster* cluster, |
124 | | const std::string& namespace_name, |
125 | | const std::string& table_name, |
126 | | bool verify_table_name = true, |
127 | | bool exclude_system_tables = true); |
128 | | |
129 | | Result<YBTableName> CreateTable( |
130 | | Cluster* cluster, |
131 | | const std::string& namespace_name, |
132 | | const std::string& table_name, |
133 | | const uint32_t num_tablets = 1, |
134 | | const bool add_primary_key = true, |
135 | | bool colocated = false, |
136 | | const int table_oid = 0); |
137 | | |
138 | | Result<std::string> GetNamespaceId(const std::string& namespace_name); |
139 | | |
140 | | Result<std::string> GetTableId( |
141 | | Cluster* cluster, |
142 | | const std::string& namespace_name, |
143 | | const std::string& table_name, |
144 | | bool verify_table_name = true, |
145 | | bool exclude_system_tables = true); |
146 | | |
147 | | void InitCreateStreamRequest( |
148 | | CreateCDCStreamRequestPB* create_req, |
149 | | const CDCCheckpointType& checkpoint_type = CDCCheckpointType::EXPLICIT, |
150 | | const std::string& namespace_name = kNamespaceName); |
151 | | |
152 | | Result<std::string> CreateDBStream( |
153 | | CDCCheckpointType checkpoint_type = CDCCheckpointType::EXPLICIT); |
154 | | |
155 | | protected: |
156 | | // Every test needs to initialize this cdc_proxy_. |
157 | | std::unique_ptr<CDCServiceProxy> cdc_proxy_; |
158 | | |
159 | | Cluster test_cluster_; |
160 | | }; |
161 | | } // namespace enterprise |
162 | | } // namespace cdc |
163 | | } // namespace yb |
164 | | |
165 | | #endif // ENT_SRC_YB_INTEGRATION_TESTS_CDCSDK_TEST_BASE_H |