YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/cdcsdk_test_base.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#ifndef ENT_SRC_YB_INTEGRATION_TESTS_CDCSDK_TEST_BASE_H
14
#define ENT_SRC_YB_INTEGRATION_TESTS_CDCSDK_TEST_BASE_H
15
16
#include <string>
17
18
#include "yb/client/transaction_manager.h"
19
20
#include "yb/integration-tests/cdc_test_util.h"
21
#include "yb/integration-tests/mini_cluster.h"
22
23
#include "yb/util/test_util.h"
24
#include "yb/util/tsan_util.h"
25
26
#include "yb/yql/pgwrapper/libpq_utils.h"
27
#include "yb/yql/pgwrapper/pg_wrapper.h"
28
29
DECLARE_int32(cdc_read_rpc_timeout_ms);
30
DECLARE_int32(cdc_write_rpc_timeout_ms);
31
DECLARE_bool(TEST_check_broadcast_address);
32
DECLARE_bool(flush_rocksdb_on_shutdown);
33
DECLARE_bool(cdc_enable_replicate_intents);
34
35
DECLARE_int32(replication_factor);
36
DECLARE_int32(cdc_max_apply_batch_num_records);
37
DECLARE_int32(client_read_write_timeout_ms);
38
DECLARE_int32(pgsql_proxy_webserver_port);
39
DECLARE_bool(enable_ysql);
40
DECLARE_bool(hide_pg_catalog_table_creation_logs);
41
DECLARE_bool(master_auto_run_initdb);
42
DECLARE_int32(pggate_rpc_timeout_secs);
43
44
namespace yb {
45
using client::YBClient;
46
using client::YBTableName;
47
48
namespace cdc {
49
namespace enterprise {
50
// Base RPC timeout used by the CDCSDK tests. SetUp() in this header multiplies a quarter of this
// value by 1000 before storing it in the *_rpc_timeout_ms flags, so it is treated as seconds.
// NOTE(review): NonTsanVsTsan presumably selects the first value for regular builds and the
// second for TSAN builds -- confirm against yb/util/tsan_util.h.
constexpr int kRpcTimeout = NonTsanVsTsan(60, 120);

// Default identifiers shared by the CDCSDK tests. kNamespaceName is the default namespace
// argument of CreateDatabase() and InitCreateStreamRequest() declared in this header.
static const std::string kUniverseId = "test_universe";
static const std::string kNamespaceName = "test_namespace";
constexpr static const char* const kTableName = "test_table";
constexpr static const char* const kKeyColumnName = "key";
constexpr static const char* const kValueColumnName = "value";
56
57
// Parameter bundle for CDCSDK tests: a batch size and whether intent replication is enabled.
// NOTE(review): presumably these correspond to the cdc_max_apply_batch_num_records and
// cdc_enable_replicate_intents flags declared in this header -- confirm against the tests
// that consume this struct.
struct CDCSDKTestParams {
  // Stores both values verbatim; no validation is performed.
  CDCSDKTestParams(int batch_size_, bool enable_replicate_intents_) :
      batch_size(batch_size_), enable_replicate_intents(enable_replicate_intents_) {}

  int batch_size;
  bool enable_replicate_intents;
};
64
65
class CDCSDKTestBase : public YBTest {
66
 public:
67
  class Cluster {
68
   public:
69
    std::unique_ptr<MiniCluster> mini_cluster_;
70
    std::unique_ptr<YBClient> client_;
71
    std::unique_ptr<yb::pgwrapper::PgSupervisor> pg_supervisor_;
72
    HostPort pg_host_port_;
73
    boost::optional<client::TransactionManager> txn_mgr_;
74
75
0
    Result<pgwrapper::PGConn> Connect() {
76
0
      return pgwrapper::PGConn::Connect(pg_host_port_);
77
0
    }
78
79
0
    Result<pgwrapper::PGConn> ConnectToDB(const std::string& dbname) {
80
0
      return pgwrapper::PGConn::Connect(pg_host_port_, dbname);
81
0
    }
82
  };
83
84
46
  void SetUp() override {
85
46
    YBTest::SetUp();
86
    // Allow for one-off network instability by ensuring a single CDC RPC timeout << test timeout.
87
46
    FLAGS_cdc_read_rpc_timeout_ms = (kRpcTimeout / 4) * 1000;
88
46
    FLAGS_cdc_write_rpc_timeout_ms = (kRpcTimeout / 4) * 1000;
89
90
46
    FLAGS_TEST_check_broadcast_address = false;
91
46
    FLAGS_flush_rocksdb_on_shutdown = false;
92
46
  }
93
94
  void TearDown() override;
95
96
  std::unique_ptr<CDCServiceProxy> GetCdcProxy();
97
98
23
  MiniCluster* test_cluster() {
99
23
    return test_cluster_.mini_cluster_.get();
100
23
  }
101
102
0
  client::TransactionManager* test_cluster_txn_mgr() {
103
0
    return test_cluster_.txn_mgr_.get_ptr();
104
0
  }
105
106
0
  YBClient* test_client() {
107
0
    return test_cluster_.client_.get();
108
0
  }
109
110
  Status CreateDatabase(
111
      Cluster* cluster,
112
      const std::string& namespace_name = kNamespaceName,
113
      bool colocated = false);
114
115
  Status InitPostgres(Cluster* cluster);
116
117
  Status SetUpWithParams(
118
      uint32_t replication_factor,
119
      uint32_t num_masters = 1,
120
      bool colocated = false);
121
122
  Result<YBTableName> GetTable(
123
      Cluster* cluster,
124
      const std::string& namespace_name,
125
      const std::string& table_name,
126
      bool verify_table_name = true,
127
      bool exclude_system_tables = true);
128
129
  Result<YBTableName> CreateTable(
130
      Cluster* cluster,
131
      const std::string& namespace_name,
132
      const std::string& table_name,
133
      const uint32_t num_tablets = 1,
134
      const bool add_primary_key = true,
135
      bool colocated = false,
136
      const int table_oid = 0);
137
138
  Result<std::string> GetNamespaceId(const std::string& namespace_name);
139
140
  Result<std::string> GetTableId(
141
      Cluster* cluster,
142
      const std::string& namespace_name,
143
      const std::string& table_name,
144
      bool verify_table_name = true,
145
      bool exclude_system_tables = true);
146
147
  void InitCreateStreamRequest(
148
      CreateCDCStreamRequestPB* create_req,
149
      const CDCCheckpointType& checkpoint_type = CDCCheckpointType::EXPLICIT,
150
      const std::string& namespace_name = kNamespaceName);
151
152
  Result<std::string> CreateDBStream(
153
      CDCCheckpointType checkpoint_type = CDCCheckpointType::EXPLICIT);
154
155
 protected:
156
  // Every test needs to initialize this cdc_proxy_.
157
  std::unique_ptr<CDCServiceProxy> cdc_proxy_;
158
159
  Cluster test_cluster_;
160
};
161
} // namespace enterprise
162
} // namespace cdc
163
} // namespace yb
164
165
#endif  // ENT_SRC_YB_INTEGRATION_TESTS_CDCSDK_TEST_BASE_H