YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/cdcsdk_stream-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include <algorithm>
14
#include <chrono>
15
#include <utility>
16
#include <boost/assign.hpp>
17
#include <gflags/gflags.h>
18
#include <gtest/gtest.h>
19
20
#include "yb/cdc/cdc_service.h"
21
#include "yb/cdc/cdc_service.pb.h"
22
23
#include "yb/client/client-test-util.h"
24
#include "yb/client/client.h"
25
#include "yb/client/meta_cache.h"
26
#include "yb/client/schema.h"
27
#include "yb/client/session.h"
28
#include "yb/client/table.h"
29
#include "yb/client/table_alterer.h"
30
#include "yb/client/table_creator.h"
31
#include "yb/client/table_handle.h"
32
#include "yb/client/transaction.h"
33
#include "yb/client/yb_op.h"
34
35
#include "yb/common/common.pb.h"
36
#include "yb/common/entity_ids.h"
37
#include "yb/common/ql_value.h"
38
39
#include "yb/gutil/stl_util.h"
40
#include "yb/gutil/strings/join.h"
41
#include "yb/gutil/strings/substitute.h"
42
43
#include "yb/integration-tests/cdcsdk_test_base.h"
44
#include "yb/integration-tests/mini_cluster.h"
45
46
#include "yb/master/master.h"
47
#include "yb/master/master_client.pb.h"
48
#include "yb/master/master_ddl.pb.h"
49
#include "yb/master/master_replication.proxy.h"
50
#include "yb/master/mini_master.h"
51
52
#include "yb/rpc/rpc_controller.h"
53
54
#include "yb/tablet/tablet.h"
55
#include "yb/tablet/tablet_peer.h"
56
57
#include "yb/tserver/cdc_consumer.h"
58
#include "yb/tserver/mini_tablet_server.h"
59
#include "yb/tserver/tablet_server.h"
60
#include "yb/tserver/ts_tablet_manager.h"
61
62
#include "yb/util/monotime.h"
63
#include "yb/util/result.h"
64
#include "yb/util/test_macros.h"
65
66
#include "yb/yql/pgwrapper/libpq_utils.h"
67
#include "yb/yql/pgwrapper/pg_wrapper.h"
68
69
namespace yb {
70
71
using client::YBClient;
72
using client::YBClientBuilder;
73
using client::YBColumnSchema;
74
using client::YBError;
75
using client::YBSchema;
76
using client::YBSchemaBuilder;
77
using client::YBSession;
78
using client::YBTable;
79
using client::YBTableAlterer;
80
using client::YBTableCreator;
81
using client::YBTableName;
82
using client::YBTableType;
83
using master::GetNamespaceInfoResponsePB;
84
using master::MiniMaster;
85
using tserver::MiniTabletServer;
86
using tserver::enterprise::CDCConsumer;
87
88
using pgwrapper::GetInt32;
89
using pgwrapper::PGConn;
90
using pgwrapper::PGResultPtr;
91
using pgwrapper::PgSupervisor;
92
using pgwrapper::ToString;
93
94
using rpc::RpcController;
95
96
namespace cdc {
97
namespace enterprise {
98
class CDCSDKStreamTest : public CDCSDKTestBase {
99
 public:
100
  struct ExpectedRecord {
101
    std::string key;
102
    std::string value;
103
  };
104
105
0
  CHECKED_STATUS DeleteCDCStream(const std::string& db_stream_id) {
106
0
    RpcController delete_rpc;
107
0
    delete_rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
108
109
0
    DeleteCDCStreamRequestPB delete_req;
110
0
    DeleteCDCStreamResponsePB delete_resp;
111
0
    delete_req.add_stream_id(db_stream_id);
112
113
    // The following line assumes that cdc_proxy_ has been initialized in the test already
114
0
    return cdc_proxy_->DeleteCDCStream(delete_req, &delete_resp, &delete_rpc);
115
0
  }
116
117
0
  Result<std::vector<std::string>> CreateDBStreams(const int num_streams) {
118
0
    std::vector<std::string> created_streams;
119
    // We will create some DB Streams to be listed out later.
120
0
    for (int i = 0; i < num_streams; i++) {
121
0
      std::string db_stream_id = VERIFY_RESULT(CreateDBStream());
122
0
      SCHECK(!db_stream_id.empty(), IllegalState, "The created db_stream_id is empty!");
123
0
      created_streams.push_back(db_stream_id);
124
0
    }
125
126
    // Sorting the stream IDs in order to simplify assertion.
127
0
    std::sort(created_streams.begin(), created_streams.end());
128
0
    return created_streams;
129
0
  }
130
131
  Result<google::protobuf::RepeatedPtrField<yb::master::CDCStreamInfoPB>> ListDBStreams(
132
0
      const std::string& namespace_name = kNamespaceName, const TableId table_id = "") {
133
    // Listing the streams now.
134
0
    master::ListCDCStreamsRequestPB list_req;
135
0
    master::ListCDCStreamsResponsePB list_resp;
136
137
    // If table_id is passed i.e. it is not empty, it means that now the xCluster streams are being
138
    // requested, so we will be doing further operations based on the same check.
139
0
    if (!table_id.empty()) {
140
0
      list_req.set_id_type(master::IdTypePB::TABLE_ID);
141
0
      list_req.set_table_id(table_id);
142
0
    } else {
143
0
      list_req.set_id_type(master::IdTypePB::NAMESPACE_ID);
144
0
      list_req.set_namespace_id(VERIFY_RESULT(GetNamespaceId(kNamespaceName)));
145
0
    }
146
147
0
    RpcController list_rpc;
148
0
    list_rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
149
150
0
    master::MasterReplicationProxy master_proxy_(
151
0
        &test_client()->proxy_cache(),
152
0
        VERIFY_RESULT(test_cluster_.mini_cluster_->GetLeaderMasterBoundRpcAddr()));
153
154
0
    RETURN_NOT_OK(master_proxy_.ListCDCStreams(list_req, &list_resp, &list_rpc));
155
156
0
    if (list_resp.has_error()) {
157
0
      return StatusFromPB(list_resp.error().status());
158
0
    }
159
160
0
    return list_resp.streams();
161
0
  }
162
163
0
  Result<master::GetCDCDBStreamInfoResponsePB> GetDBStreamInfo(std::string db_stream_id) {
164
0
    master::GetCDCDBStreamInfoRequestPB get_req;
165
0
    master::GetCDCDBStreamInfoResponsePB get_resp;
166
0
    get_req.set_db_stream_id(db_stream_id);
167
168
0
    RpcController get_rpc;
169
0
    get_rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
170
171
0
    master::MasterReplicationProxy master_proxy_(
172
0
        &test_client()->proxy_cache(),
173
0
        VERIFY_RESULT(test_cluster_.mini_cluster_->GetLeaderMasterBoundRpcAddr()));
174
175
0
    RETURN_NOT_OK(master_proxy_.GetCDCDBStreamInfo(get_req, &get_resp, &get_rpc));
176
177
0
    return get_resp;
178
0
  }
179
180
0
  void TestListDBStreams(bool with_table) {
181
    // Create one table.
182
0
    std::string table_id;
183
184
0
    if (with_table) {
185
0
      auto table =
186
0
          ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));
187
188
      // Get the table_id of the created table.
189
0
      table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));
190
0
    }
191
    // We will create some DB Streams to be listed out later.
192
0
    auto created_streams = ASSERT_RESULT(CreateDBStreams(3));
193
194
0
    const size_t total_created_streams = created_streams.size();
195
196
0
    google::protobuf::RepeatedPtrField<yb::master::CDCStreamInfoPB> list_streams =
197
0
        ASSERT_RESULT(ListDBStreams());
198
199
0
    const uint32_t num_streams = list_streams.size();
200
0
    ASSERT_EQ(total_created_streams, num_streams);
201
202
0
    std::vector<std::string> resp_stream_ids;
203
0
    for (uint32_t i = 0; i < num_streams; ++i) {
204
0
      if (with_table) {
205
        // Since there is one table, all the streams would contain one table_id in their response.
206
0
        ASSERT_EQ(1, list_streams.Get(i).table_id_size());
207
        // That particular table_id would be equal to the created table id.
208
0
        ASSERT_EQ(table_id, list_streams.Get(i).table_id(0));
209
0
      } else {
210
        // Since there are no tables in DB, there would be no table_ids in the response.
211
0
        ASSERT_EQ(0, list_streams.Get(i).table_id_size());
212
0
      }
213
0
      resp_stream_ids.push_back(list_streams.Get(i).stream_id());
214
0
    }
215
    // Sorting to simplify assertion.
216
0
    std::sort(resp_stream_ids.begin(), resp_stream_ids.end());
217
218
    // Verify if the stream ids returned with the response are the same as the ones created.
219
0
    for (uint32_t i = 0; i < resp_stream_ids.size(); ++i) {
220
0
      ASSERT_EQ(created_streams[i], resp_stream_ids[i]);
221
0
    }
222
0
  }
223
224
  void TestDBStreamInfo(
225
0
      const vector<std::string>& table_with_pk, const vector<std::string>& table_without_pk) {
226
0
    std::vector<std::string>::size_type num_of_tables_with_pk = table_with_pk.size();
227
228
0
    for (const auto& table_name : table_with_pk) {
229
0
      ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, table_name));
230
0
    }
231
232
0
    for (const auto& table_name : table_without_pk) {
233
0
      ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, table_name,
234
0
                                1 /* num_tablets */, false));
235
0
    }
236
237
0
    std::vector<std::string> created_table_ids_with_pk;
238
239
0
    for (const auto& table_name : table_with_pk) {
240
0
      created_table_ids_with_pk.push_back(
241
0
          ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, table_name)));
242
0
    }
243
244
0
    std::vector<std::string> created_table_ids_without_pk;
245
246
    // Sorting would make assertion easier later on.
247
0
    std::sort(created_table_ids_with_pk.begin(), created_table_ids_with_pk.end());
248
0
    std::string db_stream_id = ASSERT_RESULT(CreateDBStream());
249
250
0
    auto get_resp = ASSERT_RESULT(GetDBStreamInfo(db_stream_id));
251
0
    ASSERT_FALSE(get_resp.has_error());
252
253
    // Get the namespace ID.
254
0
    std::string namespace_id = ASSERT_RESULT(GetNamespaceId(kNamespaceName));
255
256
    // We have only 1 table, so the response will (should) have 1 table info only.
257
0
    uint32_t table_info_size = get_resp.table_info_size();
258
0
    ASSERT_EQ(num_of_tables_with_pk, table_info_size);
259
260
    // Check whether the namespace ID in the response is correct.
261
0
    ASSERT_EQ(namespace_id, get_resp.namespace_id());
262
263
    // Store the table IDs received in the response.
264
0
    std::vector<std::string> table_ids_in_resp;
265
0
    for (uint32_t i = 0; i < table_info_size; ++i) {
266
      // Also assert that all the table_info(s) contain the same db_stream_id.
267
0
      ASSERT_EQ(db_stream_id, get_resp.table_info(i).stream_id());
268
269
0
      table_ids_in_resp.push_back(get_resp.table_info(i).table_id());
270
0
    }
271
0
    std::sort(table_ids_in_resp.begin(), table_ids_in_resp.end());
272
273
    // Verifying that the table IDs received in the response are for the tables which were
274
    // created earlier.
275
0
    for (uint32_t i = 0; i < table_ids_in_resp.size(); ++i) {
276
0
      ASSERT_EQ(created_table_ids_with_pk[i], table_ids_in_resp[i]);
277
0
    }
278
0
  }
279
};
280
281
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(CreateCDCSDKStreamImplicit)) {
282
  // Create a cluster.
283
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
284
285
0
  std::string db_stream_id = ASSERT_RESULT(CreateDBStream(CDCCheckpointType::IMPLICIT));
286
0
  ASSERT_NE(0, db_stream_id.length());
287
0
}
288
289
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(CreateCDCSDKStreamExplicit)) {
290
  // Create a cluster.
291
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
292
293
  // The function CreateDBStream() creates a stream with EXPLICIT checkpointing by default.
294
0
  std::string db_stream_id = ASSERT_RESULT(CreateDBStream());
295
0
  ASSERT_NE(0, db_stream_id.length());
296
0
}
297
298
// This test is to verify the fix for the following:
299
// [#10945] Error while creating a DB Stream if any table in the database is without a primary key.
300
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(TestStreamCreation)) {
301
  // Create a cluster.
302
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
303
304
  // Create a table with primary key.
305
0
  auto table1 =
306
0
      ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, "table_with_pk"));
307
  // Create another table without primary key.
308
0
  auto table2 = ASSERT_RESULT(
309
0
      CreateTable(&test_cluster_, kNamespaceName, "table_without_pk", 1 /* num_tablets */, false));
310
311
  // We have a table with primary key and one without primary key so while creating
312
  // the DB Stream ID, the latter one will be ignored and will not be a part of streaming with CDC.
313
  // Now we just need to ensure that everything is working fine.
314
0
  std::string db_stream_id = ASSERT_RESULT(CreateDBStream());
315
0
  ASSERT_NE(0, db_stream_id.length());
316
0
}
317
318
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(TestOnSingleRF)) {
319
  // Create a cluster.
320
0
  ASSERT_OK(SetUpWithParams(1, 1, false));
321
322
0
  std::string db_stream_id = ASSERT_RESULT(CreateDBStream());
323
0
  ASSERT_NE(0, db_stream_id.length());
324
0
}
325
326
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(DeleteDBStream)) {
327
  // Setup cluster.
328
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
329
330
  // Create a DB Stream ID to be deleted later on.
331
0
  std::string db_stream_id = ASSERT_RESULT(CreateDBStream());
332
0
  ASSERT_NE(0, db_stream_id.length());
333
334
  // Deleting the created DB Stream ID.
335
0
  ASSERT_OK(DeleteCDCStream(db_stream_id));
336
0
}
337
338
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(CreateMultipleStreams)) {
339
  // Setup cluster.
340
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
341
342
0
  auto stream_ids = ASSERT_RESULT(CreateDBStreams(3));
343
0
  ASSERT_EQ(3, stream_ids.size());
344
0
}
345
346
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(DeleteMultipleStreams)) {
347
  // Setup cluster.
348
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
349
350
0
  auto stream_ids = ASSERT_RESULT(CreateDBStreams(3));
351
0
  ASSERT_EQ(3, stream_ids.size());
352
353
0
  for (const auto& stream_id : stream_ids) {
354
    // Since we have created 3 streams, we will be deleting 3 streams too.
355
0
    ASSERT_OK(DeleteCDCStream(stream_id));
356
0
  }
357
0
}
358
359
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(ListDBStreams)) {
360
  // Setup cluster.
361
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
362
363
0
  TestListDBStreams(true);
364
0
}
365
366
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(ListDBStreams_NoTablesInDB)) {
367
  // Setup cluster.
368
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
369
370
0
  TestListDBStreams(false);
371
0
}
372
373
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(DBStreamInfoTest)) {
374
  // Set up a cluster with RF 3.
375
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
376
377
0
  TestDBStreamInfo(std::vector<std::string>{kTableName}, {});
378
0
}
379
380
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(DBStreamInfoTest_MultipleTablesInDB)) {
381
  // Set up a cluster with RF 3.
382
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
383
0
  std::vector<std::string> table_names_with_pk = {
384
0
      "pk_table1", "pk_table2", "pk_table3", "pk_table4"};
385
0
  std::vector<std::string> table_names_without_pk = {"table_without_pk"};
386
387
0
  TestDBStreamInfo(table_names_with_pk, table_names_without_pk);
388
0
}
389
390
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(DBStreamInfoTest_NoTablesInDB)) {
391
  // Set up a cluster with RF 3.
392
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
393
394
0
  TestDBStreamInfo({}, {});
395
0
}
396
397
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(DBStreamInfoTest_AllTablesWithoutPrimaryKey)) {
398
  // Set up a cluster with RF 3.
399
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
400
0
  std::vector<std::string> table_names_without_pk = {"table_without_pk_1", "table_without_pk_2"};
401
402
0
  TestDBStreamInfo({}, table_names_without_pk);
403
0
}
404
405
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(CDCWithXclusterEnabled)) {
406
  // Set up an RF 3 cluster.
407
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
408
0
  auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));
409
410
  // We not need to create both xcluster and cdc streams on a table,
411
  // and we will list them to check that they are not the same.
412
413
0
  const uint32_t num_of_streams = 100;
414
415
  // Creating CDC DB streams on the table.
416
  // We get a sorted vector from CreateDBStreams() function already.
417
0
  std::vector<CDCStreamId> created_db_streams = ASSERT_RESULT(CreateDBStreams(num_of_streams));
418
419
  // Creating xCluster streams now.
420
0
  std::vector<CDCStreamId> created_xcluster_streams;
421
0
  for (uint32_t i = 0; i < num_of_streams; ++i) {
422
0
    RpcController rpc;
423
0
    CreateCDCStreamRequestPB create_req;
424
0
    CreateCDCStreamResponsePB create_resp;
425
426
0
    create_req.set_table_id(table.table_id());
427
0
    ASSERT_OK(cdc_proxy_->CreateCDCStream(create_req, &create_resp, &rpc));
428
429
    // Assert that there is no DB stream ID in the response while creating xCluster stream.
430
0
    ASSERT_FALSE(create_resp.has_db_stream_id());
431
432
0
    created_xcluster_streams.push_back(create_resp.stream_id());
433
0
  }
434
0
  std::sort(created_xcluster_streams.begin(), created_xcluster_streams.end());
435
436
  // Ensure that created streams are all different.
437
0
  for (uint32_t i = 0; i < num_of_streams; ++i) {
438
0
    ASSERT_NE(created_db_streams[i], created_xcluster_streams[i]);
439
0
  }
440
441
  // List streams for CDC and xCluster. They both should not be the same.
442
0
  google::protobuf::RepeatedPtrField<yb::master::CDCStreamInfoPB> list_cdc_resp =
443
0
      ASSERT_RESULT(ListDBStreams(kNamespaceName));
444
0
  std::vector<std::string> db_streams;
445
0
  for (int32_t i = 0; i < list_cdc_resp.size(); ++i) {
446
0
    db_streams.push_back(list_cdc_resp.Get(i).stream_id());
447
0
  }
448
0
  std::sort(db_streams.begin(), db_streams.end());
449
450
  // List the streams for xCluster.
451
0
  google::protobuf::RepeatedPtrField<yb::master::CDCStreamInfoPB> list_xcluster_resp =
452
0
      ASSERT_RESULT(ListDBStreams(kNamespaceName, table.table_id()));
453
0
  std::vector<std::string> xcluster_streams;
454
0
  for (int32_t i = 0; i < list_xcluster_resp.size(); ++i) {
455
0
    xcluster_streams.push_back(list_xcluster_resp.Get(i).stream_id());
456
0
  }
457
0
  std::sort(xcluster_streams.begin(), xcluster_streams.end());
458
459
  // Ensuring that the streams we got in both the cases are different in order to make sure that
460
  // there are no clashes.
461
0
  for (uint32_t i = 0; i < num_of_streams; ++i) {
462
0
    ASSERT_NE(db_streams[i], xcluster_streams[i]);
463
0
  }
464
0
}
465
466
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(ImplicitCheckPointValidate)) {
467
  // Create a cluster.
468
0
  ASSERT_OK(SetUpWithParams(3, 1, false));
469
470
  // Create a DB Stream.
471
0
  std::string db_stream_id = ASSERT_RESULT(CreateDBStream(CDCCheckpointType::IMPLICIT));
472
0
  ASSERT_NE(0, db_stream_id.length());
473
474
  // Get the list of dbstream.
475
0
  google::protobuf::RepeatedPtrField<yb::master::CDCStreamInfoPB> list_streams =
476
0
      ASSERT_RESULT(ListDBStreams(kNamespaceName));
477
0
  const uint32_t num_streams = list_streams.size();
478
479
0
  for (uint32_t i = 0; i < num_streams; ++i) {
480
    // Validate the streamid.
481
0
    ASSERT_EQ(db_stream_id, list_streams.Get(i).stream_id());
482
483
0
    const uint32_t options_sz = list_streams.Get(i).options_size();
484
0
    for (uint32_t j = 0; j < options_sz; j++) {
485
      // Validate the checkpoint type IMPLICIT.
486
0
      string cur_key = list_streams.Get(i).options(j).key();
487
0
      string cur_value = list_streams.Get(i).options(j).value();
488
0
      if (cur_key == string("checkpoint_type")) {
489
0
        ASSERT_EQ(cur_value, string("IMPLICIT"));
490
0
      }
491
0
    }
492
0
  }
493
0
}
494
495
0
TEST_F(CDCSDKStreamTest, YB_DISABLE_TEST_IN_TSAN(ExplicitCheckPointValidate)) {
496
    // Create a cluster.
497
0
    ASSERT_OK(SetUpWithParams(3, 1, false));
498
499
    // Create a DB Stream.
500
0
    std::string db_stream_id = ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT));
501
0
    ASSERT_NE(0, db_stream_id.length());
502
503
    // Get the list of dbstream.
504
0
    google::protobuf::RepeatedPtrField<yb::master::CDCStreamInfoPB> list_streams =
505
0
        ASSERT_RESULT(ListDBStreams(kNamespaceName));
506
0
    const uint32_t num_streams = list_streams.size();
507
508
0
    for (uint32_t i = 0; i < num_streams; ++i) {
509
      // Validate the streamid.
510
0
      ASSERT_EQ(db_stream_id, list_streams.Get(i).stream_id());
511
512
0
      const uint32_t options_sz = list_streams.Get(i).options_size();
513
0
      for (uint32_t j = 0; j < options_sz; j++) {
514
        // Validate the checkpoint type EXPLICIT.
515
0
        string cur_key = list_streams.Get(i).options(j).key();
516
0
        string cur_value = list_streams.Get(i).options(j).value();
517
0
        if (cur_key == string("checkpoint_type")) {
518
0
          ASSERT_EQ(cur_value, string("EXPLICIT"));
519
0
        }
520
0
      }
521
0
    }
522
0
}
523
524
}  // namespace enterprise
525
}  // namespace cdc
526
}  // namespace yb