YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/twodc_test_base.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef ENT_SRC_YB_INTEGRATION_TESTS_TWODC_TEST_BASE_H
15
#define ENT_SRC_YB_INTEGRATION_TESTS_TWODC_TEST_BASE_H
16
17
#include <string>
18
19
#include "yb/client/transaction_manager.h"
20
21
#include "yb/integration-tests/cdc_test_util.h"
22
#include "yb/integration-tests/mini_cluster.h"
23
24
#include "yb/master/master_replication.fwd.h"
25
26
#include "yb/util/test_util.h"
27
#include "yb/util/tsan_util.h"
28
29
#include "yb/yql/pgwrapper/libpq_utils.h"
30
#include "yb/yql/pgwrapper/pg_wrapper.h"
31
32
DECLARE_int32(cdc_read_rpc_timeout_ms);
33
DECLARE_int32(cdc_write_rpc_timeout_ms);
34
DECLARE_bool(TEST_check_broadcast_address);
35
DECLARE_bool(flush_rocksdb_on_shutdown);
36
DECLARE_bool(cdc_enable_replicate_intents);
37
38
namespace yb {
39
40
using client::YBClient;
41
42
namespace enterprise {
43
44
constexpr int kRpcTimeout = NonTsanVsTsan(60, 120);
45
static const std::string kUniverseId = "test_universe";
46
static const std::string kNamespaceName = "test_namespace";
47
48
struct TwoDCTestParams {
49
  TwoDCTestParams(int batch_size_, bool enable_replicate_intents_) :
50
      batch_size(batch_size_), enable_replicate_intents(enable_replicate_intents_) {}
51
52
  int batch_size;
53
  bool enable_replicate_intents;
54
};
55
56
class TwoDCTestBase : public YBTest {
57
 public:
58
  class Cluster {
59
   public:
60
    std::unique_ptr<MiniCluster> mini_cluster_;
61
    std::unique_ptr<YBClient> client_;
62
    std::unique_ptr<yb::pgwrapper::PgSupervisor> pg_supervisor_;
63
    HostPort pg_host_port_;
64
    boost::optional<client::TransactionManager> txn_mgr_;
65
66
0
    Result<pgwrapper::PGConn> Connect() {
67
0
      return pgwrapper::PGConn::Connect(pg_host_port_);
68
0
    }
69
70
0
    Result<pgwrapper::PGConn> ConnectToDB(const std::string& dbname) {
71
0
      return pgwrapper::PGConn::Connect(pg_host_port_, dbname);
72
0
    }
73
  };
74
75
288
  void SetUp() override {
76
288
    YBTest::SetUp();
77
    // Allow for one-off network instability by ensuring a single CDC RPC timeout << test timeout.
78
288
    FLAGS_cdc_read_rpc_timeout_ms = (kRpcTimeout / 4) * 1000;
79
288
    FLAGS_cdc_write_rpc_timeout_ms = (kRpcTimeout / 4) * 1000;
80
    // Not a useful test for us. It's testing Public+Private IP NW errors and we're only public
81
288
    FLAGS_TEST_check_broadcast_address = false;
82
288
    FLAGS_flush_rocksdb_on_shutdown = false;
83
288
  }
84
85
  void TearDown() override;
86
87
  CHECKED_STATUS SetupUniverseReplication(
88
      MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client,
89
      const std::string& universe_id, const std::vector<std::shared_ptr<client::YBTable>>& tables,
90
      bool leader_only = true);
91
92
  CHECKED_STATUS VerifyUniverseReplication(
93
      MiniCluster* consumer_cluster, YBClient* consumer_client,
94
      const std::string& universe_id, master::GetUniverseReplicationResponsePB* resp);
95
96
  CHECKED_STATUS ToggleUniverseReplication(
97
      MiniCluster* consumer_cluster, YBClient* consumer_client,
98
      const std::string& universe_id, bool is_enabled);
99
100
  CHECKED_STATUS VerifyUniverseReplicationDeleted(MiniCluster* consumer_cluster,
101
      YBClient* consumer_client, const std::string& universe_id, int timeout);
102
103
  CHECKED_STATUS GetCDCStreamForTable(
104
      const std::string& table_id, master::ListCDCStreamsResponsePB* resp);
105
106
  uint32_t GetSuccessfulWriteOps(MiniCluster* cluster);
107
108
  CHECKED_STATUS DeleteUniverseReplication(const std::string& universe_id);
109
110
  CHECKED_STATUS DeleteUniverseReplication(
111
      const std::string& universe_id, YBClient* client, MiniCluster* cluster);
112
113
  size_t NumProducerTabletsPolled(MiniCluster* cluster);
114
115
  CHECKED_STATUS CorrectlyPollingAllTablets(MiniCluster* cluster, uint32_t num_producer_tablets);
116
117
0
  YBClient* producer_client() {
118
0
    return producer_cluster_.client_.get();
119
0
  }
120
121
0
  YBClient* consumer_client() {
122
0
    return consumer_cluster_.client_.get();
123
0
  }
124
125
146
  MiniCluster* producer_cluster() {
126
146
    return producer_cluster_.mini_cluster_.get();
127
146
  }
128
129
2
  MiniCluster* consumer_cluster() {
130
2
    return consumer_cluster_.mini_cluster_.get();
131
2
  }
132
133
0
  client::TransactionManager* producer_txn_mgr() {
134
0
    return producer_cluster_.txn_mgr_.get_ptr();
135
0
  }
136
137
0
  client::TransactionManager* consumer_txn_mgr() {
138
0
    return consumer_cluster_.txn_mgr_.get_ptr();
139
0
  }
140
141
 protected:
142
  Cluster producer_cluster_;
143
  Cluster consumer_cluster_;
144
};
145
146
} // namespace enterprise
147
} // namespace yb
148
149
#endif // ENT_SRC_YB_INTEGRATION_TESTS_TWODC_TEST_BASE_H