YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/twodc_test_base.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/integration-tests/twodc_test_base.h"
15
16
#include <string>
17
18
#include "yb/cdc/cdc_service.h"
19
20
#include "yb/client/client.h"
21
#include "yb/client/table.h"
22
23
#include "yb/common/wire_protocol.h"
24
25
#include "yb/gutil/casts.h"
26
27
#include "yb/integration-tests/cdc_test_util.h"
28
#include "yb/integration-tests/mini_cluster.h"
29
#include "yb/master/catalog_manager_if.h"
30
#include "yb/master/master_replication.proxy.h"
31
#include "yb/master/mini_master.h"
32
#include "yb/rpc/rpc_controller.h"
33
#include "yb/tserver/cdc_consumer.h"
34
#include "yb/tserver/mini_tablet_server.h"
35
#include "yb/tserver/tablet_server.h"
36
#include "yb/util/test_util.h"
37
#include "yb/yql/pgwrapper/libpq_utils.h"
38
#include "yb/yql/pgwrapper/pg_wrapper.h"
39
40
namespace yb {
41
42
using client::YBClient;
43
using tserver::enterprise::CDCConsumer;
44
45
namespace enterprise {
46
47
2
void TwoDCTestBase::TearDown() {
48
2
  LOG(INFO) << "Destroying CDC Clusters";
49
2
  if (consumer_cluster()) {
50
0
    if (consumer_cluster_.pg_supervisor_) {
51
0
      consumer_cluster_.pg_supervisor_->Stop();
52
0
    }
53
0
    consumer_cluster_.mini_cluster_->Shutdown();
54
0
    consumer_cluster_.mini_cluster_.reset();
55
0
  }
56
57
2
  if (producer_cluster()) {
58
2
    if (producer_cluster_.pg_supervisor_) {
59
0
      producer_cluster_.pg_supervisor_->Stop();
60
0
    }
61
2
    producer_cluster_.mini_cluster_->Shutdown();
62
2
    producer_cluster_.mini_cluster_.reset();
63
2
  }
64
65
2
  producer_cluster_.client_.reset();
66
2
  consumer_cluster_.client_.reset();
67
68
2
  YBTest::TearDown();
69
2
}
70
71
// Asks the consumer cluster's leader master to set up replication from the producer universe.
//
// producer_cluster  - cluster whose master addresses are sent as the replication source.
// consumer_cluster  - cluster whose leader master receives the SetupUniverseReplication RPC.
// consumer_client   - client whose proxy cache is used to reach the consumer master.
// universe_id       - value stored in the request's producer_id field.
// tables            - producer tables; their ids are added to the request.
// leader_only       - when true, only the producer's leader master address is sent instead of
//                     the full master address list.
//
// The RPC is retried for up to 30 seconds until it succeeds without an error in the response.
// Returns the status of that wait, or the error hit while resolving a leader master or parsing
// the master addresses.
Status TwoDCTestBase::SetupUniverseReplication(
    MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client,
    const std::string& universe_id, const std::vector<std::shared_ptr<client::YBTable>>& tables,
    bool leader_only) {
  master::SetupUniverseReplicationRequestPB req;
  master::SetupUniverseReplicationResponsePB resp;

  req.set_producer_id(universe_id);
  std::string master_addr = producer_cluster->GetMasterAddresses();
  if (leader_only) {
    master_addr = VERIFY_RESULT(producer_cluster->GetLeaderMiniMaster())->bound_rpc_addr_str();
  }
  auto hp_vec = VERIFY_RESULT(HostPort::ParseStrings(master_addr, 0));
  HostPortsToPBs(hp_vec, req.mutable_producer_master_addresses());

  req.mutable_producer_table_ids()->Reserve(narrow_cast<int>(tables.size()));
  for (const auto& table : tables) {
    req.add_producer_table_ids(table->id());
  }

  auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
      &consumer_client->proxy_cache(),
      VERIFY_RESULT(consumer_cluster->GetLeaderMiniMaster())->bound_rpc_addr());

  return WaitFor([&] () -> Result<bool> {
    // Use a fresh controller and a cleared response for every attempt instead of re-issuing a
    // call on a controller that has already been used; this mirrors the other retrying helpers
    // in this file.
    rpc::RpcController rpc;
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
    resp.Clear();

    if (!master_proxy->SetupUniverseReplication(req, &resp, &rpc).ok()) {
      return false;
    }
    return !resp.has_error();
  }, MonoDelta::FromSeconds(30), "Setup universe replication");
}
107
108
// Polls the consumer cluster's leader master until the replication entry for universe_id is
// reported as ACTIVE, or until kRpcTimeout seconds have elapsed.
//
// resp is cleared at the start of every attempt and receives the most recent
// GetUniverseReplication response.
Status TwoDCTestBase::VerifyUniverseReplication(
    MiniCluster* consumer_cluster, YBClient* consumer_client,
    const std::string& universe_id, master::GetUniverseReplicationResponsePB* resp) {
  const auto replication_is_active = [=]() -> Result<bool> {
    master::GetUniverseReplicationRequestPB request;
    request.set_producer_id(universe_id);
    resp->Clear();

    // The leader is looked up on every attempt; a lookup failure aborts the wait with that error.
    auto leader_master = VERIFY_RESULT(consumer_cluster->GetLeaderMiniMaster());
    auto proxy = std::make_shared<master::MasterReplicationProxy>(
        &consumer_client->proxy_cache(), leader_master->bound_rpc_addr());

    rpc::RpcController controller;
    controller.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    const Status rpc_status = proxy->GetUniverseReplication(request, resp, &controller);
    if (!rpc_status.ok() || resp->has_error()) {
      return false;
    }
    return resp->entry().state() == master::SysUniverseReplicationEntryPB::ACTIVE;
  };

  return LoggedWaitFor(
      replication_is_active, MonoDelta::FromSeconds(kRpcTimeout), "Verify universe replication");
}
127
128
// Sends a single SetUniverseReplicationEnabled RPC to the consumer cluster's leader master,
// enabling or disabling replication for universe_id according to is_enabled.
//
// Returns the RPC failure, the error carried in the response, or OK.
Status TwoDCTestBase::ToggleUniverseReplication(
    MiniCluster* consumer_cluster, YBClient* consumer_client,
    const std::string& universe_id, bool is_enabled) {
  master::SetUniverseReplicationEnabledRequestPB request;
  request.set_producer_id(universe_id);
  request.set_is_enabled(is_enabled);

  auto leader_master = VERIFY_RESULT(consumer_cluster->GetLeaderMiniMaster());
  auto proxy = std::make_shared<master::MasterReplicationProxy>(
      &consumer_client->proxy_cache(), leader_master->bound_rpc_addr());

  master::SetUniverseReplicationEnabledResponsePB response;
  rpc::RpcController controller;
  controller.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  RETURN_NOT_OK(proxy->SetUniverseReplicationEnabled(request, &response, &controller));

  return response.has_error() ? StatusFromPB(response.error().status()) : Status::OK();
}
149
150
// Polls the consumer cluster's leader master until GetUniverseReplication for universe_id
// reports OBJECT_NOT_FOUND, i.e. the replication entry no longer exists.
//
// timeout is the overall wait budget in milliseconds. Each individual RPC uses kRpcTimeout
// seconds.
Status TwoDCTestBase::VerifyUniverseReplicationDeleted(MiniCluster* consumer_cluster,
    YBClient* consumer_client, const std::string& universe_id, int timeout) {
  return LoggedWaitFor([=]() -> Result<bool> {
    master::GetUniverseReplicationRequestPB req;
    master::GetUniverseReplicationResponsePB resp;
    req.set_producer_id(universe_id);

    auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
        &consumer_client->proxy_cache(),
        VERIFY_RESULT(consumer_cluster->GetLeaderMiniMaster())->bound_rpc_addr());
    rpc::RpcController rpc;
    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    const Status s = master_proxy->GetUniverseReplication(req, &resp, &rpc);
    if (!s.ok()) {
      // The RPC itself failed, so the response says nothing about the entry; try again.
      return false;
    }
    return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
  }, MonoDelta::FromMilliseconds(timeout), "Verify universe replication deleted");
}
167
168
// Waits (up to kRpcTimeout seconds) until the producer cluster's leader master lists exactly
// one CDC stream for table_id.
//
// resp is cleared before every attempt and holds the latest ListCDCStreams response.
Status TwoDCTestBase::GetCDCStreamForTable(
    const std::string& table_id, master::ListCDCStreamsResponsePB* resp) {
  const auto has_single_stream = [=]() -> Result<bool> {
    master::ListCDCStreamsRequestPB request;
    request.set_table_id(table_id);
    resp->Clear();

    // No leader yet is not fatal here: just try again on the next iteration.
    auto leader = producer_cluster()->GetLeaderMiniMaster();
    if (!leader.ok()) {
      return false;
    }

    const Status list_status = (*leader)->catalog_manager().ListCDCStreams(&request, resp);
    if (!list_status.ok() || resp->has_error()) {
      return false;
    }
    return resp->streams_size() == 1;
  };

  return LoggedWaitFor(
      has_single_stream, MonoDelta::FromSeconds(kRpcTimeout), "Get CDC stream for table");
}
183
184
0
// Sums GetNumSuccessfulWriteRpcs() over the CDC consumers of all tablet servers in the cluster.
// Servers that are not enterprise tablet servers, or that have no CDC consumer, contribute 0.
uint32_t TwoDCTestBase::GetSuccessfulWriteOps(MiniCluster* cluster) {
  uint32_t total_write_rpcs = 0;
  for (const auto& mini_ts : cluster->mini_tablet_servers()) {
    auto* enterprise_ts = dynamic_cast<tserver::enterprise::TabletServer*>(mini_ts->server());
    if (!enterprise_ts) {
      continue;
    }
    CDCConsumer* consumer = enterprise_ts->GetCDCConsumer();
    if (!consumer) {
      continue;
    }
    total_write_rpcs += consumer->GetNumSuccessfulWriteRpcs();
  }
  return total_write_rpcs;
}
195
196
0
// Convenience overload: deletes the replication for universe_id using this fixture's consumer
// client and consumer cluster.
Status TwoDCTestBase::DeleteUniverseReplication(const std::string& universe_id) {
  YBClient* const client = consumer_client();
  MiniCluster* const cluster = consumer_cluster();
  return DeleteUniverseReplication(universe_id, client, cluster);
}
199
200
// Sends a single DeleteUniverseReplication RPC for universe_id to the leader master of the
// given cluster, using the given client's proxy cache.
//
// Returns the RPC failure, the error carried in the response, or OK when the master accepted
// the deletion.
Status TwoDCTestBase::DeleteUniverseReplication(
    const std::string& universe_id, YBClient* client, MiniCluster* cluster) {
  master::DeleteUniverseReplicationRequestPB req;
  master::DeleteUniverseReplicationResponsePB resp;

  req.set_producer_id(universe_id);

  auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
      &client->proxy_cache(),
      VERIFY_RESULT(cluster->GetLeaderMiniMaster())->bound_rpc_addr());

  rpc::RpcController rpc;
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  RETURN_NOT_OK(master_proxy->DeleteUniverseReplication(req, &resp, &rpc));
  // A delivered RPC can still carry a master-side failure; surface it instead of reporting
  // success, as ToggleUniverseReplication does.
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  LOG(INFO) << "Delete universe succeeded";
  return Status::OK();
}
217
218
0
// Counts the entries returned by TEST_producer_tablets_running() across the CDC consumers of
// all tablet servers in the cluster. Servers that are not enterprise tablet servers, or that
// have no CDC consumer, contribute 0.
size_t TwoDCTestBase::NumProducerTabletsPolled(MiniCluster* cluster) {
  size_t polled_tablets = 0;
  for (const auto& mini_ts : cluster->mini_tablet_servers()) {
    auto* enterprise_ts = dynamic_cast<tserver::enterprise::TabletServer*>(mini_ts->server());
    if (!enterprise_ts) {
      continue;
    }
    CDCConsumer* consumer = enterprise_ts->GetCDCConsumer();
    if (!consumer) {
      continue;
    }
    polled_tablets += consumer->TEST_producer_tablets_running().size();
  }
  return polled_tablets;
}
232
233
// Waits (up to kRpcTimeout seconds) until the number of producer tablets polled by the cluster
// stays equal to num_producer_tablets across a run of consecutive checks.
//
// The wait succeeds on the first matching check that follows kNumIterationsWithCorrectResult
// earlier consecutive matches; any mismatch restarts the streak.
Status TwoDCTestBase::CorrectlyPollingAllTablets(
    MiniCluster* cluster, uint32_t num_producer_tablets) {
  // The streak counter is local to this call so that a wait which times out mid-streak cannot
  // leak a partial count into a later invocation. Capturing it by reference is safe because
  // the wait completes before this function returns.
  int consecutive_matches = 0;
  return LoggedWaitFor([&]() -> Result<bool> {
    constexpr int kNumIterationsWithCorrectResult = 5;
    const auto cur_tablets = NumProducerTabletsPolled(cluster);
    if (cur_tablets == num_producer_tablets) {
      if (consecutive_matches++ == kNumIterationsWithCorrectResult) {
        return true;
      }
    } else {
      consecutive_matches = 0;
    }
    LOG(INFO) << "Tablets being polled: " << cur_tablets;
    return false;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Num producer tablets being polled");
}
251
252
} // namespace enterprise
253
} // namespace yb