YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/master/master-test_ent.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include <boost/algorithm/string.hpp>
14
#include <boost/lexical_cast.hpp>
15
16
#include "yb/common/schema.h"
17
#include "yb/common/wire_protocol.h"
18
19
#include "yb/gutil/casts.h"
20
21
#include "yb/master/master_ddl.proxy.h"
22
#include "yb/master/master_replication.proxy.h"
23
#include "yb/master/master_defaults.h"
24
25
#include "../../src/yb/master/master-test_base.h"
26
27
#include "yb/util/result.h"
28
29
DECLARE_int32(cdc_state_table_num_tablets);
30
31
namespace yb {
32
namespace master {
33
namespace enterprise {
34
35
constexpr const char* kTableName = "cdc_table";
36
static const Schema kTableSchema({ ColumnSchema("key", INT32),
37
                                   ColumnSchema("v1", UINT64),
38
                                   ColumnSchema("v2", STRING) },
39
                                 1);
40
41
class MasterTestEnt  : public MasterTestBase {
42
 protected:
43
  Status CreateCDCStream(const TableId& table_id, CDCStreamId* stream_id);
44
  Status GetCDCStream(const CDCStreamId& stream_id, GetCDCStreamResponsePB* resp);
45
  Status DeleteCDCStream(const CDCStreamId& stream_id);
46
  Status ListCDCStreams(ListCDCStreamsResponsePB* resp);
47
48
  Status SetupUniverseReplication(
49
      const std::string& producer_id, const std::vector<std::string>& master_addr,
50
      const std::vector<std::string>& tables);
51
  Status DeleteUniverseReplication(const std::string& producer_id);
52
  Status GetUniverseReplication(
53
      const std::string& producer_id, GetUniverseReplicationResponsePB* resp);
54
55
};
56
57
4
Status MasterTestEnt::CreateCDCStream(const TableId& table_id, CDCStreamId* stream_id) {
58
4
  CreateCDCStreamRequestPB req;
59
4
  CreateCDCStreamResponsePB resp;
60
61
4
  req.set_table_id(table_id);
62
4
  RETURN_NOT_OK(proxy_replication_->CreateCDCStream(req, &resp, ResetAndGetController()));
63
4
  if (resp.has_error()) {
64
0
    RETURN_NOT_OK(StatusFromPB(resp.error().status()));
65
0
  }
66
67
4
  RETURN_NOT_OK(WaitFor([&](){
68
4
    IsCreateTableDoneRequestPB is_create_req;
69
4
    IsCreateTableDoneResponsePB is_create_resp;
70
71
4
    is_create_req.mutable_table()->set_table_name(master::kCdcStateTableName);
72
4
    is_create_req.mutable_table()->mutable_namespace_()->set_name(master::kSystemNamespaceName);
73
74
4
    auto s = proxy_ddl_->IsCreateTableDone(is_create_req, &is_create_resp, ResetAndGetController());
75
4
    if (!s.ok()) {
76
4
      return false;
77
4
    }
78
4
    return true;
79
4
  }, MonoDelta::FromSeconds(30), "Wait for cdc_state table creation to finish"));
80
81
4
  *stream_id = resp.stream_id();
82
4
  return Status::OK();
83
4
}
84
85
5
Status MasterTestEnt::GetCDCStream(const CDCStreamId& stream_id, GetCDCStreamResponsePB* resp) {
86
5
  GetCDCStreamRequestPB req;
87
5
  req.set_stream_id(stream_id);
88
89
5
  RETURN_NOT_OK(proxy_replication_->GetCDCStream(req, resp, ResetAndGetController()));
90
5
  if (resp->has_error()) {
91
1
    RETURN_NOT_OK(StatusFromPB(resp->error().status()));
92
1
  }
93
4
  return Status::OK();
94
5
}
95
96
1
Status MasterTestEnt::DeleteCDCStream(const CDCStreamId& stream_id) {
97
1
  DeleteCDCStreamRequestPB req;
98
1
  DeleteCDCStreamResponsePB resp;
99
1
  req.add_stream_id(stream_id);
100
101
1
  RETURN_NOT_OK(proxy_replication_->DeleteCDCStream(req, &resp, ResetAndGetController()));
102
1
  if (resp.has_error()) {
103
0
    RETURN_NOT_OK(StatusFromPB(resp.error().status()));
104
0
  }
105
1
  return Status::OK();
106
1
}
107
108
1
Status MasterTestEnt::ListCDCStreams(ListCDCStreamsResponsePB* resp) {
109
1
  ListCDCStreamsRequestPB req;
110
111
1
  RETURN_NOT_OK(proxy_replication_->ListCDCStreams(req, resp, ResetAndGetController()));
112
1
  if (resp->has_error()) {
113
0
    RETURN_NOT_OK(StatusFromPB(resp->error().status()));
114
0
  }
115
1
  return Status::OK();
116
1
}
117
118
Status MasterTestEnt::SetupUniverseReplication(
119
    const std::string& producer_id, const std::vector<std::string>& producer_master_addrs,
120
2
    const std::vector<TableId>& tables) {
121
2
  SetupUniverseReplicationRequestPB req;
122
2
  SetupUniverseReplicationResponsePB resp;
123
124
2
  req.set_producer_id(producer_id);
125
2
  req.mutable_producer_master_addresses()->Reserve(narrow_cast<int>(producer_master_addrs.size()));
126
2
  for (const auto& addr : producer_master_addrs) {
127
2
    std::vector<std::string> hp;
128
2
    boost::split(hp, addr, boost::is_any_of(":"));
129
2
    CHECK_EQ(hp.size(), 2);
130
2
    auto* master = req.add_producer_master_addresses();
131
2
    master->set_host(hp[0]);
132
2
    master->set_port(boost::lexical_cast<uint32_t>(hp[1]));
133
2
  }
134
2
  req.mutable_producer_table_ids()->Reserve(narrow_cast<int>(tables.size()));
135
2
  for (const auto& table : tables) {
136
2
    req.add_producer_table_ids(table);
137
2
  }
138
139
2
  RETURN_NOT_OK(proxy_replication_->SetupUniverseReplication(req, &resp, ResetAndGetController()));
140
2
  if (resp.has_error()) {
141
0
    RETURN_NOT_OK(StatusFromPB(resp.error().status()));
142
0
  }
143
2
  return Status::OK();
144
2
}
145
146
Status MasterTestEnt::GetUniverseReplication(
147
0
    const std::string& producer_id, GetUniverseReplicationResponsePB* resp) {
148
0
  GetUniverseReplicationRequestPB req;
149
0
  req.set_producer_id(producer_id);
150
151
0
  RETURN_NOT_OK(proxy_replication_->GetUniverseReplication(req, resp, ResetAndGetController()));
152
0
  if (resp->has_error()) {
153
0
    RETURN_NOT_OK(StatusFromPB(resp->error().status()));
154
0
  }
155
0
  return Status::OK();
156
0
}
157
158
0
Status MasterTestEnt::DeleteUniverseReplication(const std::string& producer_id) {
159
0
  DeleteUniverseReplicationRequestPB req;
160
0
  DeleteUniverseReplicationResponsePB resp;
161
0
  req.set_producer_id(producer_id);
162
163
0
  RETURN_NOT_OK(proxy_replication_->DeleteUniverseReplication(req, &resp, ResetAndGetController()));
164
0
  if (resp.has_error()) {
165
0
    RETURN_NOT_OK(StatusFromPB(resp.error().status()));
166
0
  }
167
0
  return Status::OK();
168
0
}
169
170
1
TEST_F(MasterTestEnt, TestCreateCDCStreamInvalidTable) {
171
1
  CreateCDCStreamRequestPB req;
172
1
  CreateCDCStreamResponsePB resp;
173
174
1
  req.set_table_id("invalidid");
175
1
  ASSERT_OK(proxy_replication_->CreateCDCStream(req, &resp, ResetAndGetController()));
176
1
  SCOPED_TRACE(resp.DebugString());
177
1
  ASSERT_TRUE(resp.has_error());
178
1
  ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, resp.error().code());
179
1
}
180
181
1
TEST_F(MasterTestEnt, TestCreateCDCStream) {
182
1
  TableId table_id;
183
1
  ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
184
185
1
  CDCStreamId stream_id;
186
1
  FLAGS_cdc_state_table_num_tablets = 1;
187
1
  ASSERT_OK(CreateCDCStream(table_id, &stream_id));
188
189
1
  GetCDCStreamResponsePB resp;
190
1
  ASSERT_OK(GetCDCStream(stream_id, &resp));
191
1
  ASSERT_EQ(resp.stream().table_id().Get(0), table_id);
192
1
}
193
194
1
TEST_F(MasterTestEnt, TestDeleteCDCStream) {
195
1
  TableId table_id;
196
1
  ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
197
198
1
  CDCStreamId stream_id;
199
1
  FLAGS_cdc_state_table_num_tablets = 1;
200
1
  ASSERT_OK(CreateCDCStream(table_id, &stream_id));
201
202
1
  GetCDCStreamResponsePB resp;
203
1
  ASSERT_OK(GetCDCStream(stream_id, &resp));
204
1
  ASSERT_EQ(resp.stream().table_id().Get(0), table_id);
205
206
1
  ASSERT_OK(DeleteCDCStream(stream_id));
207
208
1
  resp.Clear();
209
1
  ASSERT_NOK(GetCDCStream(stream_id, &resp));
210
1
  ASSERT_TRUE(resp.has_error());
211
1
  ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, resp.error().code());
212
1
}
213
214
1
TEST_F(MasterTestEnt, TestDeleteTableWithCDCStream) {
215
1
  TableId table_id;
216
1
  ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
217
218
1
  CDCStreamId stream_id;
219
1
  FLAGS_cdc_state_table_num_tablets = 1;
220
1
  ASSERT_OK(CreateCDCStream(table_id, &stream_id));
221
222
1
  GetCDCStreamResponsePB resp;
223
1
  ASSERT_OK(GetCDCStream(stream_id, &resp));
224
1
  ASSERT_EQ(resp.stream().table_id().Get(0), table_id);
225
226
  // Deleting the table will fail since it has a CDC stream attached.
227
1
  TableId id;
228
1
  ASSERT_NOK(DeleteTableSync(default_namespace_name, kTableName, &id));
229
230
1
  ASSERT_OK(GetCDCStream(stream_id, &resp));
231
1
}
232
233
1
TEST_F(MasterTestEnt, TestListCDCStreams) {
234
1
  TableId table_id;
235
1
  ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
236
237
1
  CDCStreamId stream_id;
238
1
  FLAGS_cdc_state_table_num_tablets = 1;
239
1
  ASSERT_OK(CreateCDCStream(table_id, &stream_id));
240
241
1
  ListCDCStreamsResponsePB resp;
242
1
  ASSERT_OK(ListCDCStreams(&resp));
243
1
  ASSERT_EQ(1, resp.streams_size());
244
1
  ASSERT_EQ(stream_id, resp.streams(0).stream_id());
245
1
}
246
247
1
TEST_F(MasterTestEnt, TestSetupUniverseReplication) {
248
1
  std::string producer_id = "producer_universe";
249
1
  std::vector<std::string> producer_masters {"127.0.0.1:7100"};
250
1
  std::vector<std::string> tables {"some_table_id"};
251
  // Always fails because we don't have actual producer.
252
1
  ASSERT_NOK(SetupUniverseReplication(producer_id, producer_masters, tables));
253
254
0
  GetUniverseReplicationResponsePB resp;
255
0
  ASSERT_OK(GetUniverseReplication(producer_id, &resp));
256
0
  ASSERT_EQ(resp.entry().producer_id(), producer_id);
257
258
0
  ASSERT_EQ(resp.entry().producer_master_addresses_size(), 1);
259
0
  std::string addr;
260
0
  const auto& hp = resp.entry().producer_master_addresses(0);
261
0
  addr = hp.host() + ":" + std::to_string(hp.port());
262
0
  ASSERT_EQ(addr, "127.0.0.1:7100");
263
264
0
  ASSERT_EQ(resp.entry().tables_size(), 1);
265
0
  ASSERT_EQ(resp.entry().tables(0), "some_table_id");
266
0
}
267
268
1
TEST_F(MasterTestEnt, TestDeleteUniverseReplication) {
269
1
  std::string producer_id = "producer_universe";
270
1
  std::vector<std::string> producer_masters {"127.0.0.1:7100"};
271
1
  std::vector<std::string> tables {"some_table_id"};
272
  // Always fails because we don't have actual producer.
273
1
  ASSERT_NOK(SetupUniverseReplication(producer_id, producer_masters, tables));
274
275
  // Verify that universe was created.
276
0
  GetUniverseReplicationResponsePB resp;
277
0
  ASSERT_OK(GetUniverseReplication(producer_id, &resp));
278
0
  ASSERT_EQ(resp.entry().producer_id(), producer_id);
279
280
0
  ASSERT_OK(DeleteUniverseReplication(producer_id));
281
282
0
  resp.Clear();
283
0
  ASSERT_NOK(GetUniverseReplication(producer_id, &resp));
284
0
  ASSERT_TRUE(resp.has_error());
285
0
  ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, resp.error().code());
286
0
}
287
288
} // namespace enterprise
289
} // namespace master
290
} // namespace yb