YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/cdc_service-int-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
3
#include <gflags/gflags_declare.h>
4
5
#include "yb/common/wire_protocol.h"
6
#include "yb/common/wire_protocol-test-util.h"
7
#include "yb/common/ql_value.h"
8
9
#include "yb/consensus/log.h"
10
#include "yb/consensus/log_reader.h"
11
12
#include "yb/cdc/cdc_service.h"
13
#include "yb/cdc/cdc_service.proxy.h"
14
#include "yb/client/error.h"
15
#include "yb/client/schema.h"
16
#include "yb/client/session.h"
17
#include "yb/client/table.h"
18
#include "yb/client/table_handle.h"
19
#include "yb/client/yb_op.h"
20
#include "yb/client/yb_table_name.h"
21
#include "yb/client/client-test-util.h"
22
#include "yb/docdb/primitive_value.h"
23
#include "yb/docdb/value_type.h"
24
25
#include "yb/gutil/casts.h"
26
#include "yb/gutil/walltime.h"
27
28
#include "yb/integration-tests/cdc_test_util.h"
29
#include "yb/integration-tests/mini_cluster.h"
30
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
31
32
#include "yb/master/master_defaults.h"
33
#include "yb/master/mini_master.h"
34
#include "yb/rpc/messenger.h"
35
#include "yb/rpc/rpc_controller.h"
36
#include "yb/tablet/tablet.h"
37
#include "yb/tablet/tablet_metadata.h"
38
#include "yb/tablet/tablet_peer.h"
39
40
#include "yb/tserver/mini_tablet_server.h"
41
#include "yb/tserver/tablet_server.h"
42
#include "yb/tserver/ts_tablet_manager.h"
43
#include "yb/tserver/tserver_service.proxy.h"
44
45
#include "yb/util/format.h"
46
#include "yb/util/metrics.h"
47
#include "yb/util/monotime.h"
48
#include "yb/util/slice.h"
49
#include "yb/util/status_format.h"
50
#include "yb/util/tsan_util.h"
51
#include "yb/yql/cql/ql/util/errcodes.h"
52
#include "yb/yql/cql/ql/util/statement_result.h"
53
54
DECLARE_bool(TEST_record_segments_violate_max_time_policy);
55
DECLARE_bool(TEST_record_segments_violate_min_space_policy);
56
DECLARE_bool(enable_load_balancing);
57
DECLARE_bool(enable_log_retention_by_op_idx);
58
DECLARE_bool(enable_ysql);
59
DECLARE_double(leader_failure_max_missed_heartbeat_periods);
60
DECLARE_int32(cdc_min_replicated_index_considered_stale_secs);
61
DECLARE_int32(cdc_state_checkpoint_update_interval_ms);
62
DECLARE_int32(cdc_wal_retention_time_secs);
63
DECLARE_int32(client_read_write_timeout_ms);
64
DECLARE_int32(follower_unavailable_considered_failed_sec);
65
DECLARE_int32(log_max_seconds_to_retain);
66
DECLARE_int32(log_min_seconds_to_retain);
67
DECLARE_int32(log_min_segments_to_retain);
68
DECLARE_int32(update_min_cdc_indices_interval_secs);
69
DECLARE_int64(TEST_simulate_free_space_bytes);
70
DECLARE_int64(log_stop_retaining_min_disk_mb);
71
DECLARE_uint64(log_segment_size_bytes);
72
DECLARE_int32(update_metrics_interval_ms);
73
DECLARE_bool(enable_collect_cdc_metrics);
74
DECLARE_bool(cdc_enable_replicate_intents);
75
DECLARE_bool(get_changes_honor_deadline);
76
DECLARE_int32(cdc_read_rpc_timeout_ms);
77
DECLARE_int32(TEST_get_changes_read_loop_delay_ms);
78
DECLARE_double(cdc_read_safe_deadline_ratio);
79
80
METRIC_DECLARE_entity(cdc);
81
METRIC_DECLARE_gauge_int64(last_read_opid_index);
82
83
namespace yb {
84
85
// Forward declaration of log::LogReader.
namespace log { class LogReader; }  // namespace log
88
89
namespace cdc {
90
91
using client::TableHandle;
92
using client::YBSessionPtr;
93
using master::MiniMaster;
94
using rpc::RpcController;
95
96
const std::string kCDCTestKeyspace = "my_keyspace";
97
const std::string kCDCTestTableName = "cdc_test_table";
98
const client::YBTableName kTableName(YQL_DATABASE_CQL, kCDCTestKeyspace, kCDCTestTableName);
99
const client::YBTableName kCdcStateTableName(
100
    YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
101
102
0
// Returns the CDCServiceImpl registered under "yb.cdc.CDCService" on the RPC server of
// `tserver`. The returned pointer is non-owning.
CDCServiceImpl* CDCService(tserver::TabletServer* tserver) {
  const auto& pool = tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService");
  const auto& service = pool->TEST_get_service();
  return down_cast<CDCServiceImpl*>(service.get());
}
106
107
// Fixture for CDC service integration tests. Each test runs against a MiniCluster with one
// master and server_count() tablet servers. The bool test parameter controls
// FLAGS_cdc_enable_replicate_intents.
class CDCServiceTest : public YBMiniClusterTestBase<MiniCluster>,
                       public testing::WithParamInterface<bool> {
 protected:
  // Starts the cluster and creates the client. Also creates a CDC proxy pointed at the first
  // tablet server, and the test table `kTableName` with tablet_count() tablets.
  void SetUp() override {
    YBMiniClusterTestBase::SetUp();

    MiniClusterOptions opts;
    SetAtomicFlag(false, &FLAGS_enable_ysql);
    SetAtomicFlag(GetParam(), &FLAGS_cdc_enable_replicate_intents);
    SetAtomicFlag(1000, &FLAGS_update_metrics_interval_ms);
    opts.num_tablet_servers = server_count();
    opts.num_masters = 1;
    cluster_.reset(new MiniCluster(opts));
    ASSERT_OK(cluster_->Start());

    client_ = ASSERT_RESULT(cluster_->CreateClient());
    cdc_proxy_ = std::make_unique<CDCServiceProxy>(
        &client_->proxy_cache(),
        HostPort::FromBoundEndpoint(cluster_->mini_tablet_server(0)->bound_rpc_addr()));

    // CreateTable reports failures through gtest assertions, which only return from
    // CreateTable itself. Propagate them so SetUp stops here.
    ASSERT_NO_FATALS(CreateTable(tablet_count(), &table_));
  }

  // Deletes the stream created by the test (if any) and the test table, then shuts down the
  // cluster. Cleanup failures are reported with EXPECT_* rather than ASSERT_* so that the
  // cluster shutdown and the base-class teardown below always run.
  void DoTearDown() override {
    // SetUp may have failed before the client was created; there is nothing to clean up
    // through the client in that case.
    if (client_) {
      if (!stream_id_.empty()) {
        // Errors are ignored because many tests already delete their stream themselves.
        EXPECT_OK(client_->DeleteCDCStream(stream_id_,
                                           false /* force_delete */,
                                           true /* ignore_errors */));
      }

      auto exists = client_->TableExists(kTableName);
      EXPECT_OK(exists);
      if (exists.ok() && *exists) {
        EXPECT_OK(client_->DeleteTable(kTableName));
      }

      client_.reset();
    }

    if (cluster_) {
      cluster_->Shutdown();
      cluster_.reset();
    }

    YBMiniClusterTestBase::DoTearDown();
  }

  void CreateTable(int num_tablets, TableHandle* table);
  void GetTablets(std::vector<TabletId>* tablet_ids,
                  const client::YBTableName& table_name = kTableName);
  void GetTablet(std::string* tablet_id,
                 const client::YBTableName& table_name = kTableName);
  void GetChanges(const TabletId& tablet_id, const CDCStreamId& stream_id,
      int64_t term, int64_t index, bool* has_error = nullptr);
  void WriteTestRow(int32_t key, int32_t int_val, const string& string_val,
      const TabletId& tablet_id, const std::shared_ptr<tserver::TabletServerServiceProxy>& proxy);
  CHECKED_STATUS WriteToProxyWithRetries(
      const std::shared_ptr<tserver::TabletServerServiceProxy>& proxy,
      const tserver::WriteRequestPB& req, tserver::WriteResponsePB* resp, RpcController* rpc);
  CHECKED_STATUS GetChangesWithRetries(
      const GetChangesRequestPB& change_req, GetChangesResponsePB* change_resp,
      int timeout_ms, int max_attempts = 3);
  tserver::MiniTabletServer* GetLeaderForTablet(const std::string& tablet_id);

  // Number of tablet servers in the cluster; subclasses override to change the topology.
  virtual int server_count() { return 1; }
  // Number of tablets of the test table; subclasses override to change the split.
  virtual int tablet_count() { return 1; }

  std::unique_ptr<CDCServiceProxy> cdc_proxy_;
  std::unique_ptr<client::YBClient> client_;
  // Stream created by the test, if any; deleted in DoTearDown.
  CDCStreamId stream_id_;
  TableHandle table_;
};
178
179
// Run every CDCServiceTest case twice: once with the test parameter false and once true.
// CDCServiceTest::SetUp copies the parameter into FLAGS_cdc_enable_replicate_intents.
INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTest, ::testing::Bool());
180
181
0
void CDCServiceTest::CreateTable(int num_tablets, TableHandle* table) {
182
0
  ASSERT_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(),
183
0
                                                kTableName.namespace_type()));
184
185
0
  client::YBSchemaBuilder builder;
186
0
  builder.AddColumn("key")->Type(INT32)->HashPrimaryKey()->NotNull();
187
0
  builder.AddColumn("int_val")->Type(INT32);
188
0
  builder.AddColumn("string_val")->Type(STRING);
189
190
0
  TableProperties table_properties;
191
0
  table_properties.SetTransactional(true);
192
0
  builder.SetTableProperties(table_properties);
193
194
0
  ASSERT_OK(table->Create(kTableName, num_tablets, client_.get(), &builder));
195
0
}
196
197
// Asserts that `changes` holds exactly two entries, in this order:
// "int_val" == expected_int, then "string_val" == expected_str.
// `expected_str` is taken by const reference; it is only read, so a by-value copy is not needed.
void AssertChangeRecords(const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>& changes,
                         int32_t expected_int, const std::string& expected_str) {
  ASSERT_EQ(changes.size(), 2);
  ASSERT_EQ(changes[0].key(), "int_val");
  ASSERT_EQ(changes[0].value().int32_value(), expected_int);
  ASSERT_EQ(changes[1].key(), "string_val");
  ASSERT_EQ(changes[1].value().string_value(), expected_str);
}
205
206
0
// Asserts that cdc_state contains exactly one row. Also asserts that the row's checkpoint
// parses as an OpId whose index is greater than 0.
void VerifyCdcStateNotEmpty(client::YBClient* client) {
  client::TableHandle table;
  ASSERT_OK(table.Open(kCdcStateTableName, client));
  ASSERT_EQ(1, boost::size(client::TableRange(table)));

  // Keep the range in a named local so it outlives the iterator taken from it. The original
  // code took begin() of a temporary range, which was destroyed at the end of that statement.
  auto range = client::TableRange(table);
  auto row = range.begin();
  const string checkpoint = row->column(master::kCdcCheckpointIdx).string_value();
  const OpId op_id = ASSERT_RESULT(OpId::FromString(checkpoint));
  // Verify that op id index has been advanced and is not 0.
  ASSERT_GT(op_id.index, 0);
}
220
221
void VerifyCdcStateMatches(client::YBClient* client,
222
                           const CDCStreamId& stream_id,
223
                           const TabletId& tablet_id,
224
                           uint64_t term,
225
0
                           uint64_t index)  {
226
0
  client::TableHandle table;
227
0
  client::YBTableName cdc_state_table(
228
0
      YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
229
0
  ASSERT_OK(table.Open(cdc_state_table, client));
230
0
  const auto op = table.NewReadOp();
231
0
  auto* const req = op->mutable_request();
232
0
  QLAddStringHashValue(req, tablet_id);
233
0
  auto cond = req->mutable_where_expr()->mutable_condition();
234
0
  cond->set_op(QLOperator::QL_OP_AND);
235
0
  QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL,
236
0
      stream_id);
237
0
  table.AddColumns({master::kCdcCheckpoint}, req);
238
239
0
  auto session = client->NewSession();
240
0
  ASSERT_OK(session->ApplyAndFlush(op));
241
242
0
  LOG(INFO) << strings::Substitute("Verifying tablet: $0, stream: $1, op_id: $2",
243
0
      tablet_id, stream_id, OpId(term, index).ToString());
244
245
0
  auto row_block = ql::RowsResult(op.get()).GetRowBlock();
246
0
  ASSERT_EQ(row_block->row_count(), 1);
247
248
0
  string checkpoint = row_block->row(0).column(0).string_value();
249
0
  auto result = OpId::FromString(checkpoint);
250
0
  ASSERT_OK(result);
251
0
  OpId op_id = *result;
252
253
0
  ASSERT_EQ(op_id.term, term);
254
0
  ASSERT_EQ(op_id.index, index);
255
0
}
256
257
void VerifyStreamDeletedFromCdcState(client::YBClient* client,
258
                                     const CDCStreamId& stream_id,
259
                                     const TabletId& tablet_id,
260
0
                                     int timeout_secs = 10) {
261
0
  client::TableHandle table;
262
0
  const client::YBTableName cdc_state_table(
263
0
      YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
264
0
  ASSERT_OK(table.Open(cdc_state_table, client));
265
266
0
  const auto op = table.NewReadOp();
267
0
  auto* const req = op->mutable_request();
268
0
  QLAddStringHashValue(req, tablet_id);
269
270
0
  auto cond = req->mutable_where_expr()->mutable_condition();
271
0
  cond->set_op(QLOperator::QL_OP_AND);
272
0
  QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL,
273
0
      stream_id);
274
275
0
  table.AddColumns({master::kCdcCheckpoint}, req);
276
0
  auto session = client->NewSession();
277
278
  // The deletion of cdc_state rows for the specified stream happen in an asynchronous thread,
279
  // so even if the request has returned, it doesn't mean that the rows have been deleted yet.
280
0
  ASSERT_OK(WaitFor([&](){
281
0
    EXPECT_OK(session->ApplyAndFlush(op));
282
0
    auto row_block = ql::RowsResult(op.get()).GetRowBlock();
283
0
    if (row_block->row_count() == 0) {
284
0
      return true;
285
0
    }
286
0
    return false;
287
0
  }, MonoDelta::FromSeconds(timeout_secs) * kTimeMultiplier,
288
0
      "Stream rows in cdc_state have been deleted."));
289
0
}
290
291
void CDCServiceTest::GetTablets(std::vector<TabletId>* tablet_ids,
292
0
                                const client::YBTableName& table_name) {
293
0
  std::vector<std::string> ranges;
294
0
  ASSERT_OK(client_->GetTablets(
295
0
      table_name, 0 /* max_tablets */, tablet_ids, &ranges, nullptr /* locations */,
296
0
      RequireTabletsRunning::kFalse, master::IncludeInactive::kTrue));
297
0
  ASSERT_EQ(tablet_ids->size(), tablet_count());
298
0
}
299
300
void CDCServiceTest::GetTablet(std::string* tablet_id,
301
0
                               const client::YBTableName& table_name) {
302
0
  std::vector<TabletId> tablet_ids;
303
0
  GetTablets(&tablet_ids, table_name);
304
0
  *tablet_id = tablet_ids[0];
305
0
}
306
307
void CDCServiceTest::GetChanges(const TabletId& tablet_id, const CDCStreamId& stream_id,
308
0
                                int64_t term, int64_t index, bool* has_error) {
309
0
  GetChangesRequestPB change_req;
310
0
  GetChangesResponsePB change_resp;
311
312
0
  change_req.set_tablet_id(tablet_id);
313
0
  change_req.set_stream_id(stream_id);
314
0
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(term);
315
0
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(index);
316
0
  change_req.set_serve_as_proxy(true);
317
318
0
  {
319
0
    RpcController rpc;
320
0
    rpc.set_timeout(MonoDelta::FromSeconds(10.0) * kTimeMultiplier);
321
0
    SCOPED_TRACE(change_req.DebugString());
322
0
    auto s = cdc_proxy_->GetChanges(change_req, &change_resp, &rpc);
323
0
    if (!has_error) {
324
0
      ASSERT_OK(s);
325
0
      ASSERT_FALSE(change_resp.has_error());
326
0
    } else if (!s.ok() || change_resp.has_error()) {
327
0
      *has_error = true;
328
0
      return;
329
0
    }
330
0
  }
331
0
}
332
333
void CDCServiceTest::WriteTestRow(int32_t key,
334
                                  int32_t int_val,
335
                                  const string& string_val,
336
                                  const TabletId& tablet_id,
337
0
                                  const std::shared_ptr<tserver::TabletServerServiceProxy>& proxy) {
338
0
  tserver::WriteRequestPB write_req;
339
0
  tserver::WriteResponsePB write_resp;
340
0
  write_req.set_tablet_id(tablet_id);
341
342
0
  RpcController rpc;
343
0
  AddTestRowInsert(key, int_val, string_val, &write_req);
344
0
  SCOPED_TRACE(write_req.DebugString());
345
0
  ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
346
0
  SCOPED_TRACE(write_resp.DebugString());
347
0
  ASSERT_FALSE(write_resp.has_error());
348
0
}
349
350
// Sends `req` through `proxy`. It retries for up to 10s (scaled by kTimeMultiplier) while either
// the RPC status or the error embedded in `resp` is TryAgain, and resets `rpc` before each retry.
// Any other RPC failure is returned immediately. A non-TryAgain error inside `resp` still
// counts as success here; callers check resp->has_error() themselves.
Status CDCServiceTest::WriteToProxyWithRetries(
    const std::shared_ptr<tserver::TabletServerServiceProxy>& proxy,
    const tserver::WriteRequestPB& req,
    tserver::WriteResponsePB* resp,
    RpcController* rpc) {
  auto attempt_write = [&req, resp, rpc, proxy]() -> Result<bool> {
    auto status = proxy->Write(req, resp, rpc);
    const bool should_retry =
        status.IsTryAgain() ||
        (resp->has_error() && StatusFromPB(resp->error().status()).IsTryAgain());
    if (!should_retry) {
      RETURN_NOT_OK(status);
      return true;
    }
    rpc->Reset();
    return false;
  };
  return LoggedWaitFor(attempt_write, MonoDelta::FromSeconds(10) * kTimeMultiplier,
                       "Write test row");
}
368
369
// Calls GetChanges until one RPC succeeds or `max_attempts` RPCs have failed. Each attempt gets
// a fresh RpcController with a `timeout_ms` timeout.
// Returns OK once an RPC succeeds; the response may still carry an application-level error,
// which callers check. Returns TimedOut, including the last RPC error, once the attempts are
// used up.
Status CDCServiceTest::GetChangesWithRetries(
    const GetChangesRequestPB& req,
    GetChangesResponsePB* resp,
    int timeout_ms,
    int max_attempts) {
  // Keep track of number of attempts.
  int num_attempts = 0;

  return LoggedWaitFor(
      [&]() -> Result<bool> {
        // RpcController needs to be rebuilt every time since the deadline is constant.
        RpcController rpc;
        rpc.set_timeout(MonoDelta::FromMilliseconds(timeout_ms));

        auto s = cdc_proxy_->GetChanges(req, resp, &rpc);
        ++num_attempts;

        // Success must be reported as `true`. The previous code returned Status::OK() from this
        // Result<bool> lambda on the last attempt, and a Result built from an OK Status is not
        // a valid success value.
        if (s.ok()) {
          return true;
        }

        // Exit if we exhausted the number of attempts.
        if (num_attempts >= max_attempts) {
          return STATUS_FORMAT(
              TimedOut, "Tried calling GetChanges $0 times with no success. Last error: $1",
              num_attempts, s);
        }

        // Try again if we still have attempts left.
        return false;
      },
      MonoDelta::FromSeconds(5) * max_attempts * kTimeMultiplier, "Call GetChanges");
}
404
405
0
// Scans the tablet servers in index order and returns the first one whose server reports
// LeaderAndReady(tablet_id). Returns nullptr if no server currently does.
tserver::MiniTabletServer* CDCServiceTest::GetLeaderForTablet(const std::string& tablet_id) {
  const size_t num_servers = cluster_->num_tablet_servers();
  for (size_t idx = 0; idx != num_servers; ++idx) {
    auto* const candidate = cluster_->mini_tablet_server(idx);
    if (candidate->server()->LeaderAndReady(tablet_id)) {
      return candidate;
    }
  }
  return nullptr;
}
413
414
0
// Verifies that, for a table with a hash + range primary key, GetChanges returns one WRITE
// record per row written in a single batch. Each record carries both key components and the
// written value.
TEST_P(CDCServiceTest, TestCompoundKey) {
  // Create a table with a compound primary key.
  static const std::string kCDCTestTableCompoundKeyName = "cdc_test_table_compound_key";
  static const client::YBTableName kTableNameCompoundKey(
    YQL_DATABASE_CQL, kCDCTestKeyspace, kCDCTestTableCompoundKeyName);

  client::YBSchemaBuilder builder;
  builder.AddColumn("hash_key")->Type(STRING)->HashPrimaryKey()->NotNull();
  builder.AddColumn("range_key")->Type(STRING)->PrimaryKey()->NotNull();
  builder.AddColumn("val")->Type(INT32);

  TableProperties table_properties;
  table_properties.SetTransactional(true);
  builder.SetTableProperties(table_properties);

  TableHandle table;
  ASSERT_OK(table.Create(kTableNameCompoundKey, tablet_count(), client_.get(), &builder));

  // Create a stream on the table. The helpers report problems via gtest assertions, so fatal
  // failures are propagated instead of continuing with an empty stream or tablet id.
  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table.table()->id(), &stream_id_));

  std::string tablet_id;
  ASSERT_NO_FATALS(GetTablet(&tablet_id, table.name()));

  // Now apply two ops with same hash key but different range key in a batch.
  auto session = client_->NewSession();
  for (int i = 0; i < 2; i++) {
    const auto op = table.NewUpdateOp();
    auto* const req = op->mutable_request();
    QLAddStringHashValue(req, "hk");
    QLAddStringRangeValue(req, Format("rk_$0", i));
    table.AddInt32ColumnValue(req, "val", i);
    session->Apply(op);
  }
  ASSERT_OK(session->Flush());

  // Get CDC changes starting from checkpoint OpId 0.0.
  GetChangesRequestPB change_req;
  GetChangesResponsePB change_resp;

  change_req.set_tablet_id(tablet_id);
  change_req.set_stream_id(stream_id_);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

  {
    RpcController rpc;
    SCOPED_TRACE(change_req.DebugString());
    ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
    ASSERT_FALSE(change_resp.has_error());
    ASSERT_EQ(change_resp.records_size(), 2);
  }

  // Verify the results.
  for (int i = 0; i < change_resp.records_size(); i++) {
    const auto& record = change_resp.records(i);
    ASSERT_EQ(record.operation(), CDCRecordPB::WRITE);

    ASSERT_EQ(record.key_size(), 2);
    // Check the key.
    ASSERT_EQ(record.key(0).value().string_value(), "hk");
    ASSERT_EQ(record.key(1).value().string_value(), Format("rk_$0", i));

    ASSERT_EQ(record.changes_size(), 1);
    ASSERT_EQ(record.changes(0).value().int32_value(), i);
  }
}
480
481
0
// Verifies that a newly created CDC stream can be looked up through the client. Also checks
// that the stream's first table id is the test table.
TEST_P(CDCServiceTest, TestCreateCDCStream) {
  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_));

  NamespaceId ns_id;
  std::vector<TableId> table_ids;
  std::unordered_map<std::string, std::string> options;
  ASSERT_OK(client_->GetCDCStream(stream_id_, &ns_id, &table_ids, &options));
  // Guard front(): calling it on an empty vector is undefined behavior.
  ASSERT_FALSE(table_ids.empty());
  ASSERT_EQ(table_ids.front(), table_.table()->id());
}
490
491
0
// Verifies that creating a stream applies FLAGS_cdc_wal_retention_time_secs as the WAL retention
// time of the test table's tablets.
TEST_P(CDCServiceTest, TestCreateCDCStreamWithDefaultRententionTime) {
  // Set default WAL retention time to 10 hours.
  FLAGS_cdc_wal_retention_time_secs = 36000;

  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_));

  NamespaceId ns_id;
  std::vector<TableId> table_ids;
  std::unordered_map<std::string, std::string> options;
  ASSERT_OK(client_->GetCDCStream(stream_id_, &ns_id, &table_ids, &options));

  // Verify that the wal retention time was set at the tablet level.
  ASSERT_NO_FATALS(VerifyWalRetentionTime(
      cluster_.get(), kCDCTestTableName, FLAGS_cdc_wal_retention_time_secs));
}
506
507
0
// Verifies the effects of deleting a CDC stream:
//   * the stream is gone from the master;
//   * its cdc_state rows are removed;
//   * a later GetChanges on the deleted stream does not re-populate those rows.
TEST_P(CDCServiceTest, TestDeleteCDCStream) {
  FLAGS_cdc_state_checkpoint_update_interval_ms = 0;
  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_));

  NamespaceId ns_id;
  std::vector<TableId> table_ids;
  std::unordered_map<std::string, std::string> options;
  ASSERT_OK(client_->GetCDCStream(stream_id_, &ns_id, &table_ids, &options));
  ASSERT_FALSE(table_ids.empty());
  ASSERT_EQ(table_ids.front(), table_.table()->id());

  std::vector<std::string> tablet_ids;
  std::vector<std::string> ranges;
  ASSERT_OK(client_->GetTablets(
      table_.table()->name(), 0 /* max_tablets */, &tablet_ids, &ranges));

  for (const auto& tablet_id : tablet_ids) {
    ASSERT_NO_FATALS(VerifyCdcStateMatches(client_.get(), stream_id_, tablet_id, 0, 0));
  }

  {
    const auto& tserver = cluster_->mini_tablet_server(0)->server();
    std::string tablet_id;
    ASSERT_NO_FATALS(GetTablet(&tablet_id, table_.name()));
    ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, tserver->proxy()));
  }

  ASSERT_OK(client_->DeleteCDCStream(stream_id_));

  // Check that the stream no longer exists.
  ns_id.clear();
  table_ids.clear();
  options.clear();
  Status s = client_->GetCDCStream(stream_id_, &ns_id, &table_ids, &options);
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();

  for (const auto& tablet_id : tablet_ids) {
    ASSERT_NO_FATALS(VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id));
  }

  // Once the CatalogManager has cleaned up cdc_state, make sure a subsequent call to GetChanges
  // doesn't re-populate cdc_state with cleaned up entries. UpdateCheckpoint will be called since
  // this is the first time we're calling GetChanges.
  bool get_changes_error = false;
  for (const auto& tablet_id : tablet_ids) {
    ASSERT_NO_FATALS(GetChanges(tablet_id, stream_id_, 0, 0, &get_changes_error));
    ASSERT_FALSE(get_changes_error);
  }

  for (const auto& tablet_id : tablet_ids) {
    ASSERT_NO_FATALS(VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id));
  }
}
559
560
0
// Verifies two things about the async replication lag metrics (sent and committed):
// they become positive after a write, and they drop back to 0 once the stream is deleted.
TEST_P(CDCServiceTest, TestMetricsOnDeletedReplication) {
  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_));
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = true;

  std::string tablet_id;
  ASSERT_NO_FATALS(GetTablet(&tablet_id));

  const auto& tserver = cluster_->mini_tablet_server(0)->server();
  // Use the tserver proxy to most accurately simulate normal requests.
  const auto& proxy = tserver->proxy();

  // Issue an initial GetChanges from checkpoint OpId 0.0.
  GetChangesRequestPB change_req;
  GetChangesResponsePB change_resp;
  change_req.set_tablet_id(tablet_id);
  change_req.set_stream_id(stream_id_);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);
  {
    RpcController rpc;
    SCOPED_TRACE(change_req.DebugString());
    ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
  }

  // Insert two test rows; both go into a single write request.
  tserver::WriteRequestPB write_req;
  tserver::WriteResponsePB write_resp;
  write_req.set_tablet_id(tablet_id);
  {
    RpcController rpc;
    AddTestRowInsert(1, 11, "key1", &write_req);
    AddTestRowInsert(2, 22, "key2", &write_req);
    SCOPED_TRACE(write_req.DebugString());
    ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
    SCOPED_TRACE(write_resp.DebugString());
    ASSERT_FALSE(write_resp.has_error());
  }

  auto cdc_service = CDCService(tserver);
  // Assert that leader lag > 0.
  ASSERT_OK(WaitFor([&]() -> Result<bool> {
    auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id});
    return metrics->async_replication_sent_lag_micros->value() > 0 &&
        metrics->async_replication_committed_lag_micros->value() > 0;
  }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for Lag > 0"));

  // Now, delete the replication stream and assert that lag is 0.
  ASSERT_OK(client_->DeleteCDCStream(stream_id_));
  ASSERT_OK(WaitFor([&]() -> Result<bool> {
    auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id});
    return metrics->async_replication_sent_lag_micros->value() == 0 &&
        metrics->async_replication_committed_lag_micros->value() == 0;
  }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for Lag = 0"));
}
613
614
615
0
// Exercises GetChanges end to end in three steps:
//   1. Read two inserted rows from OpId 0.0 and check the CDC service metrics.
//   2. Read one further insert incrementally from the returned checkpoint.
//   3. Read a delete incrementally.
// The stream is deleted at the end and its cdc_state rows are checked to be gone.
TEST_P(CDCServiceTest, TestGetChanges) {
  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_));
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = true;

  std::string tablet_id;
  ASSERT_NO_FATALS(GetTablet(&tablet_id));

  const auto& tserver = cluster_->mini_tablet_server(0)->server();
  // Use the tserver proxy to most accurately simulate normal requests.
  const auto& proxy = tserver->proxy();

  // Insert two test rows; both go into a single write request.
  tserver::WriteRequestPB write_req;
  tserver::WriteResponsePB write_resp;
  write_req.set_tablet_id(tablet_id);
  {
    RpcController rpc;
    AddTestRowInsert(1, 11, "key1", &write_req);
    AddTestRowInsert(2, 22, "key2", &write_req);
    SCOPED_TRACE(write_req.DebugString());
    ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
    SCOPED_TRACE(write_resp.DebugString());
    ASSERT_FALSE(write_resp.has_error());
  }

  // Get CDC changes starting from checkpoint OpId 0.0.
  GetChangesRequestPB change_req;
  GetChangesResponsePB change_resp;

  change_req.set_tablet_id(tablet_id);
  change_req.set_stream_id(stream_id_);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

  {
    RpcController rpc;
    SCOPED_TRACE(change_req.DebugString());
    ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
    SCOPED_TRACE(change_resp.DebugString());
    ASSERT_FALSE(change_resp.has_error());
    ASSERT_EQ(change_resp.records_size(), 2);

    const std::pair<int, std::string> expected_results[2] =
        {std::make_pair(11, "key1"), std::make_pair(22, "key2")};
    for (int i = 0; i < change_resp.records_size(); i++) {
      ASSERT_EQ(change_resp.records(i).operation(), CDCRecordPB::WRITE);

      // Check the key.
      ASSERT_NO_FATALS(AssertIntKey(change_resp.records(i).key(), i + 1));

      // Check the change records.
      ASSERT_NO_FATALS(AssertChangeRecords(change_resp.records(i).changes(),
                                           expected_results[i].first,
                                           expected_results[i].second));
    }

    // Verify the CDC Service-level metrics match what we just did.
    auto cdc_service = CDCService(tserver);
    auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id});
    ASSERT_EQ(metrics->last_read_opid_index->value(), metrics->last_readable_opid_index->value());
    ASSERT_EQ(metrics->last_read_opid_index->value(), change_resp.records_size() + 1 /* checkpt */);
    ASSERT_EQ(metrics->rpc_payload_bytes_responded->TotalCount(), 1);
  }

  // Insert another row.
  {
    write_req.Clear();
    write_req.set_tablet_id(tablet_id);
    AddTestRowInsert(3, 33, "key3", &write_req);

    RpcController rpc;
    SCOPED_TRACE(write_req.DebugString());
    ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
    SCOPED_TRACE(write_resp.DebugString());
    ASSERT_FALSE(write_resp.has_error());
  }

  // Get next set of changes.
  // Copy checkpoint received from previous GetChanges CDC request.
  change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint());
  change_resp.Clear();
  {
    RpcController rpc;
    SCOPED_TRACE(change_req.DebugString());
    ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
    SCOPED_TRACE(change_resp.DebugString());
    ASSERT_FALSE(change_resp.has_error());
    ASSERT_EQ(change_resp.records_size(), 1);
    ASSERT_EQ(change_resp.records(0).operation(), CDCRecordPB_OperationType_WRITE);

    // Check the key.
    ASSERT_NO_FATALS(AssertIntKey(change_resp.records(0).key(), 3));

    // Check the change records.
    ASSERT_NO_FATALS(AssertChangeRecords(change_resp.records(0).changes(), 33, "key3"));
  }

  // Delete a row.
  {
    write_req.Clear();
    write_req.set_tablet_id(tablet_id);
    AddTestRowDelete(1, &write_req);

    RpcController rpc;
    SCOPED_TRACE(write_req.DebugString());
    ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
    SCOPED_TRACE(write_resp.DebugString());
    ASSERT_FALSE(write_resp.has_error());
  }

  // Get next set of changes.
  // Copy checkpoint received from previous GetChanges CDC request.
  change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint());
  change_resp.Clear();
  {
    RpcController rpc;
    SCOPED_TRACE(change_req.DebugString());
    ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
    SCOPED_TRACE(change_resp.DebugString());
    ASSERT_FALSE(change_resp.has_error());
    ASSERT_EQ(change_resp.records_size(), 1);
    ASSERT_EQ(change_resp.records(0).operation(), CDCRecordPB_OperationType_DELETE);

    // Check the key deleted.
    ASSERT_NO_FATALS(AssertIntKey(change_resp.records(0).key(), 1));
  }

  // Cleanup stream before shutdown.
  ASSERT_OK(client_->DeleteCDCStream(stream_id_));
  ASSERT_NO_FATALS(VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id));
}
746
747
0
// Verifies that GetChanges honors the RPC deadline.
//   * With a read-loop delay and a short timeout, a request for all records returns a
//     non-empty partial result.
//   * With no delay and a long timeout, the same request returns every record.
TEST_P(CDCServiceTest, TestGetChangesWithDeadline) {
  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_));
  FLAGS_TEST_get_changes_read_loop_delay_ms = kTimeMultiplier;
  FLAGS_log_segment_size_bytes = 100;
  FLAGS_cdc_read_rpc_timeout_ms = 100 * kTimeMultiplier;
  FLAGS_get_changes_honor_deadline = true;
  FLAGS_cdc_read_safe_deadline_ratio = 0.30;

  std::string tablet_id;
  ASSERT_NO_FATALS(GetTablet(&tablet_id));

  const auto& tserver = cluster_->mini_tablet_server(0)->server();
  // Use the tserver proxy to most accurately simulate normal requests.
  const auto& proxy = tserver->proxy();

  // Insert <num_records> test rows. Stop at the first failed write instead of attempting the
  // remaining writes after a fatal failure.
  const int num_records = 500;
  for (int i = 0; i < num_records; i++) {
    ASSERT_NO_FATALS(WriteTestRow(i, i, Format("key$0", i), tablet_id, proxy));
  }

  // Get CDC changes. Note that the timeout value and read delay
  // should ensure that some, but not all records are read.
  GetChangesRequestPB change_req;
  GetChangesResponsePB change_resp;

  change_req.set_tablet_id(tablet_id);
  change_req.set_stream_id(stream_id_);
  change_req.set_max_records(num_records);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

  SCOPED_TRACE(change_req.DebugString());
  ASSERT_OK(GetChangesWithRetries(change_req, &change_resp,
                                  FLAGS_cdc_read_rpc_timeout_ms));

  SCOPED_TRACE(change_resp.DebugString());
  ASSERT_FALSE(change_resp.has_error());

  // Ensure that partial results are returned
  ASSERT_GT(change_resp.records_size(), 0);
  ASSERT_LT(change_resp.records_size(), num_records);

  // Try again, but use a timeout value large enough such that
  // all records should be read before timeout.
  FLAGS_TEST_get_changes_read_loop_delay_ms = 0;
  FLAGS_cdc_read_rpc_timeout_ms = 30 * 1000 * kTimeMultiplier;

  // Start from an empty response so nothing left over from the partial read above can
  // influence the checks below.
  change_resp.Clear();
  ASSERT_OK(GetChangesWithRetries(change_req, &change_resp,
                                  FLAGS_cdc_read_rpc_timeout_ms));
  SCOPED_TRACE(change_resp.DebugString());
  ASSERT_FALSE(change_resp.has_error());

  // This time, we expect all records to be read.
  ASSERT_EQ(change_resp.records_size(), num_records);

  // Cleanup stream before shutdown.
  ASSERT_OK(client_->DeleteCDCStream(stream_id_));
  ASSERT_NO_FATALS(VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id));
}
807
808
0
// Verifies that GetChanges for a stream id that does not exist succeeds at the RPC level but
// returns an error in the response.
TEST_P(CDCServiceTest, TestGetChangesInvalidStream) {
  std::string tablet_id;
  // Stop if the tablet lookup fails; otherwise an empty tablet id could make the error below
  // come from the wrong cause.
  ASSERT_NO_FATALS(GetTablet(&tablet_id));

  // Get CDC changes for non-existent stream.
  GetChangesRequestPB change_req;
  GetChangesResponsePB change_resp;

  change_req.set_tablet_id(tablet_id);
  change_req.set_stream_id("InvalidStreamId");
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

  RpcController rpc;
  ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
  ASSERT_TRUE(change_resp.has_error());
}
825
826
0
// Verifies that GetCheckpoint for a freshly created stream returns checkpoint OpId 0.0.
TEST_P(CDCServiceTest, TestGetCheckpoint) {
  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_));

  std::string tablet_id;
  ASSERT_NO_FATALS(GetTablet(&tablet_id));

  GetCheckpointRequestPB req;
  GetCheckpointResponsePB resp;

  req.set_tablet_id(tablet_id);
  req.set_stream_id(stream_id_);

  {
    RpcController rpc;
    SCOPED_TRACE(req.DebugString());
    ASSERT_OK(cdc_proxy_->GetCheckpoint(req, &resp, &rpc));
    SCOPED_TRACE(resp.DebugString());
    ASSERT_FALSE(resp.has_error());
    ASSERT_EQ(resp.checkpoint().op_id().term(), 0);
    ASSERT_EQ(resp.checkpoint().op_id().index(), 0);
  }
}
848
849
class CDCServiceTestMultipleServersOneTablet : public CDCServiceTest {
850
0
  virtual int server_count() override { return 3; }
851
0
  virtual int tablet_count() override { return 1; }
852
};
853
854
INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestMultipleServersOneTablet,
855
                        ::testing::Bool());
856
857
0
TEST_P(CDCServiceTestMultipleServersOneTablet, TestMetricsAfterServerFailure) {
  // Test that the metric value is not time since epoch after a leadership change.
  SetAtomicFlag(0, &FLAGS_cdc_state_checkpoint_update_interval_ms);
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);
  // Presumably stops background metric collection, so that the value checked below comes from
  // the explicit UpdateLagMetrics() call -- confirm against the flag definition.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = false;

  std::string tablet_id;
  GetTablet(&tablet_id);

  // Polls until GetLeaderForTablet() returns a server, storing it in leader_mini_tserver.
  tserver::MiniTabletServer* leader_mini_tserver = nullptr;
  auto wait_for_leader = [&] {
    return WaitFor([&]() -> Result<bool> {
      leader_mini_tserver = GetLeaderForTablet(tablet_id);
      return leader_mini_tserver != nullptr;
    }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait for tablet to have a leader.");
  };

  ASSERT_OK(wait_for_leader());
  const auto timestamp_before_write = GetCurrentTimeMicros();
  ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, leader_mini_tserver->server()->proxy()));
  ASSERT_NO_FATALS(GetChanges(tablet_id, stream_id_, 0, 0));

  // Restart the current leader, then wait until the tablet again reports a leader.
  ASSERT_OK(leader_mini_tserver->Restart());
  leader_mini_tserver = nullptr;
  ASSERT_OK(wait_for_leader());

  auto leader_tserver = leader_mini_tserver->server();
  auto cdc_service = CDCService(leader_tserver);
  cdc_service->UpdateLagMetrics();
  auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id});
  const auto timestamp_after_write = GetCurrentTimeMicros();
  const auto value = metrics->async_replication_committed_lag_micros->value();
  // The lag must be bounded by the wall-clock span of this test; a lag measured against the
  // epoch would be far larger.
  ASSERT_GE(value, 0);
  ASSERT_LE(value, timestamp_after_write - timestamp_before_write);
}
893
894
0
TEST_P(CDCServiceTestMultipleServersOneTablet, TestUpdateLagMetrics) {
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = true;

  std::string tablet_id;
  GetTablet(&tablet_id);

  // Find a server whose peer for the tablet is LEADER_AND_READY, and a server holding some other
  // peer of it. Servers where the peer lookup fails are skipped.
  tserver::MiniTabletServer* leader_mini_tserver = nullptr;
  tserver::MiniTabletServer* follower_mini_tserver = nullptr;
  ASSERT_OK(WaitFor([&]() -> Result<bool> {
    leader_mini_tserver = nullptr;
    follower_mini_tserver = nullptr;
    for (size_t idx = 0; idx < cluster_->num_tablet_servers(); ++idx) {
      auto* mini_ts = cluster_->mini_tablet_server(idx);
      std::shared_ptr<tablet::TabletPeer> peer;
      Status lookup = mini_ts->server()->tablet_manager()->GetTabletPeer(tablet_id, &peer);
      if (!lookup.ok()) {
        continue;
      }
      if (peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
        leader_mini_tserver = mini_ts;
      } else {
        follower_mini_tserver = mini_ts;
      }
    }
    return leader_mini_tserver != nullptr && follower_mini_tserver != nullptr;
  }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait for tablet to have a leader."));

  // Builds a CDC service proxy aimed directly at the given tablet server.
  auto cdc_proxy_for = [this](tserver::MiniTabletServer* mini_ts) {
    return std::make_unique<CDCServiceProxy>(
        &client_->proxy_cache(), HostPort::FromBoundEndpoint(mini_ts->bound_rpc_addr()));
  };
  auto leader_proxy = cdc_proxy_for(leader_mini_tserver);
  auto follower_proxy = cdc_proxy_for(follower_mini_tserver);

  auto leader_tserver = leader_mini_tserver->server();
  auto follower_tserver = follower_mini_tserver->server();
  // Write through the leader's own tserver proxy, to most accurately simulate normal requests.
  const auto& proxy = leader_tserver->proxy();

  auto cdc_service = CDCService(leader_tserver);
  auto cdc_service_follower = CDCService(follower_tserver);

  // Fetches this stream/tablet's metrics from the given CDC service.
  auto tablet_metrics_of = [&](const auto& service) {
    return service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id});
  };

  // At the start of time, assert both leader and follower at 0 lag.
  ASSERT_OK(WaitFor([&]() -> Result<bool> {
    auto leader_metrics = tablet_metrics_of(cdc_service);
    if (leader_metrics->async_replication_sent_lag_micros->value() != 0 ||
        leader_metrics->async_replication_committed_lag_micros->value() != 0) {
      return false;
    }
    auto follower_metrics = tablet_metrics_of(cdc_service_follower);
    return follower_metrics->async_replication_sent_lag_micros->value() == 0 &&
           follower_metrics->async_replication_committed_lag_micros->value() == 0;
  }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "At start, wait for Lag = 0"));

  // Create the in-memory structures for both follower and leader by polling for the tablet.
  GetChangesRequestPB change_req;
  GetChangesResponsePB change_resp;
  change_req.set_tablet_id(tablet_id);
  change_req.set_stream_id(stream_id_);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);
  {
    RpcController rpc;
    SCOPED_TRACE(change_req.DebugString());
    ASSERT_OK(leader_proxy->GetChanges(change_req, &change_resp, &rpc));
    change_resp.Clear();
    rpc.Reset();
    ASSERT_OK(follower_proxy->GetChanges(change_req, &change_resp, &rpc));
  }

  // Inserts a single row through `proxy`. Rows are inserted one at a time so they have different
  // hybrid times.
  tserver::WriteRequestPB write_req;
  tserver::WriteResponsePB write_resp;
  auto insert_row = [&](int key, int int_val, const std::string& str_val) {
    write_req.Clear();
    write_req.set_tablet_id(tablet_id);
    RpcController rpc;
    AddTestRowInsert(key, int_val, str_val, &write_req);
    SCOPED_TRACE(write_req.DebugString());
    ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
    SCOPED_TRACE(write_resp.DebugString());
    ASSERT_FALSE(write_resp.has_error());
  };
  ASSERT_NO_FATALS(insert_row(1, 11, "key1"));
  ASSERT_NO_FATALS(insert_row(2, 22, "key2"));

  // Assert that leader lag > 0.
  ASSERT_OK(WaitFor([&]() -> Result<bool> {
    auto metrics = tablet_metrics_of(cdc_service);
    return metrics->async_replication_sent_lag_micros->value() > 0 &&
           metrics->async_replication_committed_lag_micros->value() > 0;
  }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for Lag > 0"));

  {
    // Make sure we wait for follower update thread to run at least once.
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_update_metrics_interval_ms));
    // On the follower, we shouldn't create metrics for tablets that we're not leader for, so these
    // should be 0 even if there are un-polled for records.
    auto metrics_follower = tablet_metrics_of(cdc_service_follower);
    ASSERT_TRUE(metrics_follower->async_replication_sent_lag_micros->value() == 0 &&
                metrics_follower->async_replication_committed_lag_micros->value() == 0);
  }

  // Moves change_req's from_checkpoint to the checkpoint held in change_resp, then polls the
  // leader again (overwriting change_resp).
  auto poll_leader_from_last_checkpoint = [&] {
    change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint());
    change_resp.Clear();
    RpcController rpc;
    SCOPED_TRACE(change_req.DebugString());
    ASSERT_OK(leader_proxy->GetChanges(change_req, &change_resp, &rpc));
  };

  // NOTE(review): on this first call change_resp still holds the *follower's* reply to the
  // initial poll, so the checkpoint copied into the request comes from that reply rather than
  // the leader's. Confirm this is intended.
  ASSERT_NO_FATALS(poll_leader_from_last_checkpoint());

  // When we GetChanges the first time, only the read lag metric should be 0.
  ASSERT_OK(WaitFor([&]() -> Result<bool> {
    auto metrics = tablet_metrics_of(cdc_service);
    return metrics->async_replication_sent_lag_micros->value() == 0 &&
           metrics->async_replication_committed_lag_micros->value() > 0;
  }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for Read Lag = 0"));

  ASSERT_NO_FATALS(poll_leader_from_last_checkpoint());

  // When we GetChanges the second time, both the lag metrics should be 0.
  ASSERT_OK(WaitFor([&]() -> Result<bool> {
    auto metrics = tablet_metrics_of(cdc_service);
    return metrics->async_replication_sent_lag_micros->value() == 0 &&
           metrics->async_replication_committed_lag_micros->value() == 0;
  }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for All Lag = 0"));
}
1048
1049
class CDCServiceTestMultipleServers : public CDCServiceTest {
1050
 public:
1051
0
  virtual int server_count() override { return 2; }
1052
0
  virtual int tablet_count() override { return 4; }
1053
};
1054
1055
INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestMultipleServers, ::testing::Bool());
1056
1057
0
TEST_P(CDCServiceTestMultipleServers, TestListTablets) {
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);

  std::string tablet_id;
  GetTablet(&tablet_id);

  ListTabletsRequestPB req;
  ListTabletsResponsePB resp;
  req.set_stream_id(stream_id_);

  // Broadcast address of tablet server 0. cdc_proxy_ presumably points at that server --
  // confirm in the fixture.
  auto first_ts_bcast_addr = cluster_->mini_tablet_server(0)->options()->broadcast_addresses[0];
  int tablets_on_first_ts = 0;

  // Test a simple query for all tablets, counting those whose first listed tserver is server 0.
  {
    RpcController rpc;
    SCOPED_TRACE(req.DebugString());
    ASSERT_OK(cdc_proxy_->ListTablets(req, &resp, &rpc));
    SCOPED_TRACE(resp.DebugString());
    ASSERT_FALSE(resp.has_error());

    ASSERT_EQ(resp.tablets_size(), tablet_count());
    ASSERT_EQ(resp.tablets(0).tablet_id(), tablet_id);

    for (const auto& tablet : resp.tablets()) {
      auto owner_tserver = HostPort::FromPB(tablet.tservers(0).broadcast_addresses(0));
      if (owner_tserver == first_ts_bcast_addr) {
        ++tablets_on_first_ts;
      }
    }
  }

  // Query for tablets only on the first server.  We should only get a subset.
  {
    req.set_local_only(true);
    RpcController rpc;
    SCOPED_TRACE(req.DebugString());
    ASSERT_OK(cdc_proxy_->ListTablets(req, &resp, &rpc));
    SCOPED_TRACE(resp.DebugString());
    ASSERT_FALSE(resp.has_error());
    ASSERT_EQ(resp.tablets_size(), tablets_on_first_ts);
  }
}
1101
1102
0
TEST_P(CDCServiceTestMultipleServers, TestGetChangesProxyRouting) {
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);

  // Figure out [1] all tablets and [2] which ones are local to the first server.
  std::vector<std::string> local_tablets, all_tablets;
  for (bool is_local : {true, false}) {
    RpcController rpc;
    ListTabletsRequestPB req;
    ListTabletsResponsePB resp;
    req.set_stream_id(stream_id_);
    req.set_local_only(is_local);
    ASSERT_OK(cdc_proxy_->ListTablets(req, &resp, &rpc));
    ASSERT_FALSE(resp.has_error());
    auto& cur_tablets = is_local ? local_tablets : all_tablets;
    for (const auto& tablet : resp.tablets()) {
      cur_tablets.push_back(tablet.tablet_id());
    }
    // Sorted order is required by set_intersection / set_difference below.
    std::sort(cur_tablets.begin(), cur_tablets.end());
  }
  ASSERT_LT(local_tablets.size(), all_tablets.size());
  ASSERT_LT(0, local_tablets.size());
  {
    // Overlap between these two lists should be all the local tablets. Compare whole vectors:
    // a three-iterator std::equal would read past the end of tablet_intersection if it were
    // shorter than local_tablets, which is exactly the failure this check is meant to catch.
    std::vector<std::string> tablet_intersection;
    std::set_intersection(local_tablets.begin(), local_tablets.end(),
        all_tablets.begin(), all_tablets.end(),
        std::back_inserter(tablet_intersection));
    ASSERT_EQ(local_tablets, tablet_intersection);
  }
  // Difference should be all tablets on the other server.
  std::vector<std::string> remote_tablets;
  std::set_difference(all_tablets.begin(), all_tablets.end(),
      local_tablets.begin(), local_tablets.end(),
      std::back_inserter(remote_tablets));
  ASSERT_LT(0, remote_tablets.size());
  ASSERT_EQ(all_tablets.size() - local_tablets.size(), remote_tablets.size());

  // Insert test rows, equal amount per tablet.
  int cur_row = 1;
  const int to_write = 2;
  for (bool is_local : {true, false}) {
    // Named `ts` rather than `tserver` so it does not shadow the tserver:: namespace used below.
    const auto& ts = cluster_->mini_tablet_server(is_local ? 0 : 1)->server();
    // Use the tablet server's own proxy to most accurately simulate normal requests.
    const auto& proxy = ts->proxy();
    const auto& cur_tablets = is_local ? local_tablets : remote_tablets;
    for (const auto& tablet_id : cur_tablets) {
      tserver::WriteRequestPB write_req;
      tserver::WriteResponsePB write_resp;
      write_req.set_tablet_id(tablet_id);
      RpcController rpc;
      for (int i = 1; i <= to_write; ++i) {
        AddTestRowInsert(cur_row, 11 * cur_row, "key" + std::to_string(cur_row), &write_req);
        ++cur_row;
      }

      SCOPED_TRACE(write_req.DebugString());
      ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
      SCOPED_TRACE(write_resp.DebugString());
      ASSERT_FALSE(write_resp.has_error());
    }
  }

  // Query for all tablets on the first server. Ensure the non-local ones have errors.
  for (bool is_local : {true, false}) {
    const auto& cur_tablets = is_local ? local_tablets : remote_tablets;
    for (const auto& tablet_id : cur_tablets) {
      std::vector<bool> proxy_options{false};
      // Verify that remote tablet queries work only when proxy forwarding is enabled.
      if (!is_local) proxy_options.push_back(true);
      for (bool use_proxy : proxy_options) {
        GetChangesRequestPB change_req;
        GetChangesResponsePB change_resp;
        change_req.set_tablet_id(tablet_id);
        change_req.set_stream_id(stream_id_);
        change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
        change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);
        change_req.set_serve_as_proxy(use_proxy);
        RpcController rpc;
        SCOPED_TRACE(change_req.DebugString());
        ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
        SCOPED_TRACE(change_resp.DebugString());
        const bool should_error = !(is_local || use_proxy);
        ASSERT_EQ(change_resp.has_error(), should_error);
        if (!should_error) {
          ASSERT_EQ(to_write, change_resp.records_size());
        }
      }
    }
  }

  // Verify the CDC metrics match what we just did: one proxied request per remote tablet.
  auto cdc_service = CDCService(cluster_->mini_tablet_server(0)->server());
  auto server_metrics = cdc_service->GetCDCServerMetrics();
  ASSERT_EQ(server_metrics->cdc_rpc_proxy_count->value(), remote_tablets.size());
}
1199
1200
0
TEST_P(CDCServiceTest, TestOnlyGetLocalChanges) {
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);

  std::string tablet_id;
  GetTablet(&tablet_id);

  const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy();

  {
    // Insert local test rows.
    tserver::WriteRequestPB write_req;
    tserver::WriteResponsePB write_resp;
    write_req.set_tablet_id(tablet_id);
    RpcController rpc;
    AddTestRowInsert(1, 11, "key1", &write_req);
    AddTestRowInsert(2, 22, "key2", &write_req);

    SCOPED_TRACE(write_req.DebugString());
    ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
    SCOPED_TRACE(write_resp.DebugString());
    ASSERT_FALSE(write_resp.has_error());
  }

  {
    // Insert remote test rows (marked via external_hybrid_time).
    tserver::WriteRequestPB write_req;
    tserver::WriteResponsePB write_resp;
    write_req.set_tablet_id(tablet_id);
    // Apply at the lowest possible hybrid time.
    write_req.set_external_hybrid_time(yb::kInitialHybridTimeValue);

    RpcController rpc;
    AddTestRowInsert(1, 11, "key1_ext", &write_req);
    AddTestRowInsert(3, 33, "key3_ext", &write_req);

    SCOPED_TRACE(write_req.DebugString());
    ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
    SCOPED_TRACE(write_resp.DebugString());
    ASSERT_FALSE(write_resp.has_error());
  }

  // Checks that GetChanges from 0.0 reports only the two local rows, and that a full table scan
  // sees key1 (local), key2 (local) and key3_ext (remote).
  auto CheckChangesAndTable = [&]() {
    // Get CDC changes.
    GetChangesRequestPB change_req;
    GetChangesResponsePB change_resp;

    change_req.set_tablet_id(tablet_id);
    change_req.set_stream_id(stream_id_);
    change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
    change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

    {
      // Make sure only the two local test rows show up.
      RpcController rpc;
      SCOPED_TRACE(change_req.DebugString());
      ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
      SCOPED_TRACE(change_resp.DebugString());
      ASSERT_FALSE(change_resp.has_error());
      ASSERT_EQ(change_resp.records_size(), 2);

      std::pair<int, std::string> expected_results[2] =
          {std::make_pair(11, "key1"), std::make_pair(22, "key2")};
      for (int i = 0; i < change_resp.records_size(); i++) {
        ASSERT_EQ(change_resp.records(i).operation(), CDCRecordPB::WRITE);

        // Check the key.
        ASSERT_NO_FATALS(AssertIntKey(change_resp.records(i).key(), i + 1));

        // Check the change records.
        ASSERT_NO_FATALS(AssertChangeRecords(change_resp.records(i).changes(),
                                             expected_results[i].first,
                                             expected_results[i].second));
      }
    }

    // Now, fetch the entire table and ensure that we fetch all the keys inserted. Opening the
    // table is a hard precondition: scanning an unopened handle cannot produce a meaningful result,
    // so stop here on failure instead of continuing as EXPECT_OK would.
    client::TableHandle table;
    ASSERT_OK(table.Open(table_.table()->name(), client_.get()));
    auto result = ScanTableToStrings(table);
    std::sort(result.begin(), result.end());

    ASSERT_EQ(3, result.size());

    // Make sure that key1 and not key1_ext shows up, since we applied key1_ext at a lower hybrid
    // time.
    ASSERT_EQ("{ int32:1, int32:11, string:\"key1\" }", result[0]);
    ASSERT_EQ("{ int32:2, int32:22, string:\"key2\" }", result[1]);
    ASSERT_EQ("{ int32:3, int32:33, string:\"key3_ext\" }", result[2]);
  };

  ASSERT_NO_FATALS(CheckChangesAndTable());

  ASSERT_OK(cluster_->RestartSync());

  ASSERT_OK(WaitFor([&](){
    std::shared_ptr<tablet::TabletPeer> tablet_peer;
    if (!cluster_->mini_tablet_server(0)->server()->tablet_manager()->
        LookupTablet(tablet_id, &tablet_peer)) {
      return false;
    }
    return tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY;
  }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait until tablet has a leader."));

  // The same results must hold after the cluster restart.
  ASSERT_NO_FATALS(CheckChangesAndTable());

  // Cleanup stream before shutdown.
  ASSERT_OK(client_->DeleteCDCStream(stream_id_));
  VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id);
}
1309
1310
0
TEST_P(CDCServiceTest, TestCheckpointUpdatedForRemoteRows) {
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);

  std::string tablet_id;
  GetTablet(&tablet_id);

  const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy();

  {
    // Insert remote test rows, applied at the lowest possible hybrid time.
    tserver::WriteRequestPB write_req;
    write_req.set_tablet_id(tablet_id);
    write_req.set_external_hybrid_time(yb::kInitialHybridTimeValue);
    AddTestRowInsert(1, 11, "key1_ext", &write_req);
    AddTestRowInsert(3, 33, "key3_ext", &write_req);

    tserver::WriteResponsePB write_resp;
    RpcController rpc;
    SCOPED_TRACE(write_req.DebugString());
    ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
    SCOPED_TRACE(write_resp.DebugString());
    ASSERT_FALSE(write_resp.has_error());
  }

  {
    // Get CDC changes from 0.0. The only rows written carry an external hybrid time and yield
    // no CDC records, yet the returned checkpoint must still move past index 0.
    GetChangesRequestPB change_req;
    change_req.set_tablet_id(tablet_id);
    change_req.set_stream_id(stream_id_);
    change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
    change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

    GetChangesResponsePB change_resp;
    RpcController rpc;
    SCOPED_TRACE(change_req.DebugString());
    ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
    SCOPED_TRACE(change_resp.DebugString());
    ASSERT_FALSE(change_resp.has_error());
    ASSERT_EQ(change_resp.records_size(), 0);
    ASSERT_GT(change_resp.checkpoint().op_id().index(), 0);
  }

  // Cleanup stream before shutdown.
  ASSERT_OK(client_->DeleteCDCStream(stream_id_));
  VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id);
}
1364
1365
// Test to ensure that cdc_state table's checkpoint is updated as expected.
1366
// This also tests for #2897 to ensure that cdc_state table checkpoint is not overwritten to 0.0
1367
// in case the consumer does not send from checkpoint.
1368
0
TEST_P(CDCServiceTest, TestCheckpointUpdate) {
1369
0
  FLAGS_cdc_state_checkpoint_update_interval_ms = 0;
1370
1371
0
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);
1372
1373
0
  std::string tablet_id;
1374
0
  GetTablet(&tablet_id);
1375
1376
0
  const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy();
1377
1378
  // Insert test rows.
1379
0
  tserver::WriteRequestPB write_req;
1380
0
  tserver::WriteResponsePB write_resp;
1381
0
  write_req.set_tablet_id(tablet_id);
1382
0
  {
1383
0
    RpcController rpc;
1384
0
    AddTestRowInsert(1, 11, "key1", &write_req);
1385
0
    AddTestRowInsert(2, 22, "key2", &write_req);
1386
1387
0
    SCOPED_TRACE(write_req.DebugString());
1388
0
    ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc));
1389
0
    SCOPED_TRACE(write_resp.DebugString());
1390
0
    ASSERT_FALSE(write_resp.has_error());
1391
0
  }
1392
1393
  // Get CDC changes.
1394
0
  GetChangesRequestPB change_req;
1395
0
  GetChangesResponsePB change_resp;
1396
1397
0
  change_req.set_tablet_id(tablet_id);
1398
0
  change_req.set_stream_id(stream_id_);
1399
0
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
1400
0
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);
1401
1402
0
  {
1403
0
    RpcController rpc;
1404
0
    SCOPED_TRACE(change_req.DebugString());
1405
0
    ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
1406
0
    SCOPED_TRACE(change_resp.DebugString());
1407
0
    ASSERT_FALSE(change_resp.has_error());
1408
0
    ASSERT_EQ(change_resp.records_size(), 2);
1409
0
  }
1410
1411
  // Call GetChanges again and pass in checkpoint that producer can mark as committed.
1412
0
  change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint());
1413
0
  change_resp.Clear();
1414
0
  {
1415
0
    RpcController rpc;
1416
0
    SCOPED_TRACE(change_req.DebugString());
1417
0
    ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
1418
0
    SCOPED_TRACE(change_resp.DebugString());
1419
0
    ASSERT_FALSE(change_resp.has_error());
1420
    // No more changes, so 0 records should be received.
1421
0
    ASSERT_EQ(change_resp.records_size(), 0);
1422
0
  }
1423
1424
  // Verify that cdc_state table has correct checkpoint.
1425
0
  ASSERT_NO_FATALS(VerifyCdcStateNotEmpty(client_.get()));
1426
1427
  // Call GetChanges again but without any from checkpoint.
1428
0
  change_req.Clear();
1429
0
  change_req.set_tablet_id(tablet_id);
1430
0
  change_req.set_stream_id(stream_id_);
1431
0
  change_resp.Clear();
1432
0
  {
1433
0
    RpcController rpc;
1434
0
    SCOPED_TRACE(change_req.DebugString());
1435
0
    ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
1436
0
    SCOPED_TRACE(change_resp.DebugString());
1437
0
    ASSERT_FALSE(change_resp.has_error());
1438
    // Verify that producer uses the "from_checkpoint" from cdc_state table and does not send back
1439
    // any records.
1440
0
    ASSERT_EQ(change_resp.records_size(), 0);
1441
0
  }
1442
1443
  // Verify that cdc_state table's checkpoint is unaffected.
1444
0
  ASSERT_NO_FATALS(VerifyCdcStateNotEmpty(client_.get()));
1445
1446
  // Cleanup stream before shutdown.
1447
0
  ASSERT_OK(client_->DeleteCDCStream(stream_id_));
1448
0
  VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id);
1449
0
}
1450
1451
namespace {
1452
void WaitForCDCIndex(const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
1453
                     int64_t expected_index,
1454
0
                     int timeout_secs) {
1455
0
  LOG(INFO) << "Waiting until index equals " << expected_index
1456
0
            << ". Timeout: " << timeout_secs;
1457
0
  ASSERT_OK(WaitFor([&](){
1458
0
    if (tablet_peer->log_available() &&
1459
0
        tablet_peer->log()->cdc_min_replicated_index() == expected_index &&
1460
0
        tablet_peer->tablet_metadata()->cdc_min_replicated_index() == expected_index) {
1461
0
      return true;
1462
0
    }
1463
0
    return false;
1464
0
  }, MonoDelta::FromSeconds(timeout_secs) * kTimeMultiplier,
1465
0
      "Wait until cdc min replicated index."));
1466
0
  LOG(INFO) << "Done waiting";
1467
0
}
1468
} // namespace
1469
1470
class CDCServiceTestMaxRentionTime : public CDCServiceTest {
1471
 public:
1472
0
  void SetUp() override {
1473
    // Immediately write any index provided by a GetChanges request to cdc_state table.
1474
0
    FLAGS_cdc_state_checkpoint_update_interval_ms = 0;
1475
0
    FLAGS_log_min_segments_to_retain = 1;
1476
0
    FLAGS_log_min_seconds_to_retain = 1;
1477
0
    FLAGS_cdc_wal_retention_time_secs = 1;
1478
0
    FLAGS_enable_log_retention_by_op_idx = true;
1479
0
    FLAGS_log_max_seconds_to_retain = kMaxSecondsToRetain;
1480
0
    FLAGS_TEST_record_segments_violate_max_time_policy = true;
1481
0
    FLAGS_update_min_cdc_indices_interval_secs = 1;
1482
1483
    // This will rollover log segments a lot faster.
1484
0
    FLAGS_log_segment_size_bytes = 100;
1485
0
    CDCServiceTest::SetUp();
1486
0
  }
1487
  const int kMaxSecondsToRetain = 30;
1488
};
1489
1490
INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestMaxRentionTime, ::testing::Bool());
1491
1492
0
TEST_P(CDCServiceTestMaxRentionTime, TestLogRetentionByOpId_MaxRentionTime) {
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);

  std::string tablet_id;
  GetTablet(&tablet_id);

  const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy();

  std::shared_ptr<tablet::TabletPeer> tablet_peer;
  ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(
      tablet_id, &tablet_peer));

  // The helpers below are void functions containing fatal assertions. Wrap them in
  // ASSERT_NO_FATALS (as the rest of this file does) so a failure inside stops this test.

  // Write a row so that the next GetChanges request doesn't fail.
  ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, proxy));

  // Get CDC changes.
  ASSERT_NO_FATALS(GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 0));

  ASSERT_NO_FATALS(
      WaitForCDCIndex(tablet_peer, 0, 4 * FLAGS_update_min_cdc_indices_interval_secs));

  MonoTime start = MonoTime::Now();
  // Write a lot more data to generate many log files that can be GCed. This should take less
  // than kMaxSecondsToRetain for the next check to succeed.
  for (int i = 1; i <= 100; i++) {
    ASSERT_NO_FATALS(WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy));
  }
  MonoDelta elapsed = MonoTime::Now().GetDeltaSince(start);
  ASSERT_LT(elapsed.ToSeconds(), kMaxSecondsToRetain);
  MonoDelta time_to_sleep = MonoDelta::FromSeconds(kMaxSecondsToRetain + 10) - elapsed;

  // Since we haven't updated the minimum cdc index, and the elapsed time is less than
  // kMaxSecondsToRetain, no log files should be returned.
  log::SegmentSequence segment_sequence;
  ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(),
                                                        &segment_sequence));
  ASSERT_EQ(segment_sequence.size(), 0);
  LOG(INFO) << "No segments to be GCed because less than " << kMaxSecondsToRetain
            << " seconds have elapsed";

  // Once more than kMaxSecondsToRetain has passed, the segments that violate the max-time
  // policy become eligible for GC.
  SleepFor(time_to_sleep);

  ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(),
                                                        &segment_sequence));
  ASSERT_GT(segment_sequence.size(), 0);
  ASSERT_EQ(segment_sequence.size(),
            tablet_peer->log()->reader_->segments_violate_max_time_policy_->size());

  for (size_t i = 0; i < segment_sequence.size(); i++) {
    ASSERT_EQ(segment_sequence[i]->path(),
              (*tablet_peer->log()->reader_->segments_violate_max_time_policy_)[i]->path());
    LOG(INFO) << "Segment " << segment_sequence[i]->path() << " to be GCed";
  }
}
1545
1546
class CDCServiceTestDurableMinReplicatedIndex : public CDCServiceTest {
1547
 public:
1548
0
  void SetUp() override {
1549
    // Immediately write any index provided by a GetChanges request to cdc_state table.
1550
0
    FLAGS_cdc_state_checkpoint_update_interval_ms = 0;
1551
0
    FLAGS_update_min_cdc_indices_interval_secs = 1;
1552
0
    FLAGS_enable_log_retention_by_op_idx = true;
1553
0
    CDCServiceTest::SetUp();
1554
0
  }
1555
};
1556
1557
INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestDurableMinReplicatedIndex,
1558
                        ::testing::Bool());
1559
1560
0
TEST_P(CDCServiceTestDurableMinReplicatedIndex, TestBootstrapProducer) {
  constexpr int kNRows = 100;

  std::string tablet_id;
  GetTablet(&tablet_id);

  // Populate the tablet before bootstrapping so the returned checkpoint reflects real writes.
  const auto& ts_proxy = cluster_->mini_tablet_server(0)->server()->proxy();
  for (int row = 0; row < kNRows; ++row) {
    WriteTestRow(row, 10 + row, "key" + std::to_string(row), tablet_id, ts_proxy);
  }

  // Bootstrap the test table; exactly one bootstrap id is expected back.
  BootstrapProducerRequestPB bootstrap_req;
  BootstrapProducerResponsePB bootstrap_resp;
  bootstrap_req.add_table_ids(table_.table()->id());
  rpc::RpcController bootstrap_rpc;
  ASSERT_OK(cdc_proxy_->BootstrapProducer(bootstrap_req, &bootstrap_resp, &bootstrap_rpc));
  ASSERT_FALSE(bootstrap_resp.has_error());
  ASSERT_EQ(bootstrap_resp.cdc_bootstrap_ids().size(), 1);
  const std::string bootstrap_id = bootstrap_resp.cdc_bootstrap_ids(0);

  // Every tablet of the table must now have a cdc_state row carrying the returned id.
  client::YBTableName cdc_state_table_name(
      YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
  client::TableHandle cdc_state;
  ASSERT_OK(cdc_state.Open(cdc_state_table_name, client_.get()));
  ASSERT_EQ(1, boost::size(client::TableRange(cdc_state)));

  int rows_seen = 0;
  for (const auto& row : client::TableRange(cdc_state)) {
    ++rows_seen;
    stream_id_ = row.column(master::kCdcStreamIdIdx).string_value();
    ASSERT_EQ(stream_id_, bootstrap_id);

    auto checkpoint = OpId::FromString(row.column(master::kCdcCheckpointIdx).string_value());
    ASSERT_OK(checkpoint);
    const OpId& checkpoint_op_id = *checkpoint;
    // With no writes the checkpoint index is 1; add one for the ALTER WAL RETENTION TIME issued
    // when CDC is enabled on a table, plus one per written row.
    ASSERT_EQ(checkpoint_op_id.index, 2 + kNRows);
  }

  // The test table has a single tablet.
  ASSERT_EQ(rows_seen, 1);

  // After bootstrap the tablet's cdc_min_replicated_index must reach the latest WAL entry.
  std::shared_ptr<tablet::TabletPeer> tablet_peer;
  ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(
      tablet_id, &tablet_peer));
  const auto latest_opid = tablet_peer->log()->GetLatestEntryOpId();
  WaitForCDCIndex(tablet_peer, latest_opid.index, 4 * FLAGS_update_min_cdc_indices_interval_secs);
}
1615
1616
0
TEST_P(CDCServiceTestDurableMinReplicatedIndex, TestLogCDCMinReplicatedIndexIsDurable) {
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);

  std::string tablet_id;
  GetTablet(&tablet_id);

  const auto& ts_proxy = cluster_->mini_tablet_server(0)->server()->proxy();

  std::shared_ptr<tablet::TabletPeer> tablet_peer;
  ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(
      tablet_id, &tablet_peer));

  // GetChanges needs at least one row, otherwise the request fails.
  WriteTestRow(0, 10, "key0", tablet_id, ts_proxy);

  // Report index 10 through CDC and wait for the tablet's min replicated index to pick it up.
  GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 10);
  WaitForCDCIndex(tablet_peer, 10, 4 * FLAGS_update_min_cdc_indices_interval_secs);

  // Restart the whole cluster so the CDC tablet metadata has to be loaded back from disk.
  ASSERT_OK(cluster_->RestartSync());

  // The server objects are recreated by the restart, so the tablet manager is looked up afresh
  // on every poll and tablet_peer is re-bound to the post-restart peer.
  auto tablet_is_ready = [&]() {
    if (!cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(
            tablet_id, &tablet_peer)) {
      return false;
    }
    if (tablet_peer->LeaderStatus() != consensus::LeaderStatus::LEADER_AND_READY ||
        tablet_peer->log() == nullptr) {
      return false;
    }
    LOG(INFO) << "TServer is ready ";
    return true;
  };
  ASSERT_OK(WaitFor(tablet_is_ready, MonoDelta::FromSeconds(30) * kTimeMultiplier,
                    "Wait until tablet has a leader."));

  // Both the log and the tablet metadata must have recovered the index from disk.
  ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), 10);
  ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), 10);
}
1654
1655
class CDCServiceTestMinSpace : public CDCServiceTest {
1656
 public:
1657
0
  void SetUp() override {
1658
0
    FLAGS_cdc_state_checkpoint_update_interval_ms = 0;
1659
0
    FLAGS_log_min_segments_to_retain = 1;
1660
0
    FLAGS_log_min_seconds_to_retain = 1;
1661
0
    FLAGS_cdc_wal_retention_time_secs = 1;
1662
0
    FLAGS_enable_log_retention_by_op_idx = true;
1663
    // We want the logs to be GCed because of space, not because they exceeded the maximum time to
1664
    // be retained.
1665
0
    FLAGS_log_max_seconds_to_retain = 10 * 3600; // 10 hours.
1666
0
    FLAGS_log_stop_retaining_min_disk_mb = 1;
1667
0
    FLAGS_TEST_record_segments_violate_min_space_policy = true;
1668
1669
    // This will rollover log segments a lot faster.
1670
0
    FLAGS_log_segment_size_bytes = 500;
1671
0
    CDCServiceTest::SetUp();
1672
0
  }
1673
};
1674
1675
0
TEST_P(CDCServiceTestMinSpace, TestLogRetentionByOpId_MinSpace) {
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);

  std::string tablet_id;
  GetTablet(&tablet_id);

  const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy();

  std::shared_ptr<tablet::TabletPeer> tablet_peer;
  ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(tablet_id,
      &tablet_peer));
  // Write a row so that the next GetChanges request doesn't fail.
  WriteTestRow(0, 10, "key0", tablet_id, proxy);

  // Get CDC changes from index 0 so the min CDC index stays at 0.
  GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 0);

  WaitForCDCIndex(tablet_peer, 0, 4 * FLAGS_update_min_cdc_indices_interval_secs);

  // Write enough data to roll over many small log segments that could be GCed. Max-time
  // retention is set far in the future by the fixture, so age alone does not make them
  // GC candidates.
  for (int i = 1; i <= 5000; i++) {
    WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy);
  }

  // With the CDC index pinned at 0 and no simulated space pressure, nothing is returned for GC.
  log::SegmentSequence segment_sequence;
  ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(),
                                                        &segment_sequence));
  ASSERT_EQ(segment_sequence.size(), 0);

  // Simulate an almost-full disk. The segments chosen for GC must be exactly the ones recorded
  // as violating the min-space policy.
  FLAGS_TEST_simulate_free_space_bytes = 128;

  ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(),
                                                        &segment_sequence));
  ASSERT_GT(segment_sequence.size(), 0);
  ASSERT_EQ(segment_sequence.size(),
            tablet_peer->log()->reader_->segments_violate_min_space_policy_->size());

  for (size_t i = 0; i < segment_sequence.size(); i++) {
    ASSERT_EQ(segment_sequence[i]->path(),
              (*tablet_peer->log()->reader_->segments_violate_min_space_policy_)[i]->path());
    LOG(INFO) << "Segment " << segment_sequence[i]->path() << " to be GCed";
  }

  int32_t num_gced(0);
  ASSERT_OK(tablet_peer->log()->GC(std::numeric_limits<int64_t>::max(), &num_gced));
  // num_gced is signed while size() is unsigned. Check the sign first, then compare as size_t
  // instead of relying on an implicit signed/unsigned comparison.
  ASSERT_GE(num_gced, 0);
  ASSERT_EQ(static_cast<size_t>(num_gced), segment_sequence.size());

  // Read from 0.0.  This should start reading from the beginning of the logs.
  GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 0);
}
1726
1727
class CDCLogAndMetaIndex : public CDCServiceTest {
1728
 public:
1729
0
  void SetUp() override {
1730
    // Immediately write any index provided by a GetChanges request to cdc_state table.
1731
0
    FLAGS_cdc_state_checkpoint_update_interval_ms = 0;
1732
0
    FLAGS_update_min_cdc_indices_interval_secs = 1;
1733
0
    FLAGS_cdc_min_replicated_index_considered_stale_secs = 5;
1734
0
    FLAGS_enable_log_retention_by_op_idx = true;
1735
0
    CDCServiceTest::SetUp();
1736
0
  }
1737
};
1738
1739
INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCLogAndMetaIndex, ::testing::Bool());
1740
1741
0
TEST_P(CDCLogAndMetaIndex, TestLogAndMetaCdcIndex) {
  constexpr int kNStreams = 5;
  // Value both index copies hold before any CDC request has been made.
  constexpr int64_t kNoCdcIndex = std::numeric_limits<int64_t>::max();

  // Small segments make the log roll over much more often.
  FLAGS_log_segment_size_bytes = 100;

  CDCStreamId stream_id[kNStreams];
  for (auto& id : stream_id) {
    CreateCDCStream(cdc_proxy_, table_.table()->id(), &id);
  }

  std::string tablet_id;
  GetTablet(&tablet_id);

  // Insert test rows (one per stream).
  const auto& ts_proxy = cluster_->mini_tablet_server(0)->server()->proxy();
  for (int key = 1; key <= kNStreams; ++key) {
    WriteTestRow(key, 10 + key, "key" + std::to_string(key), tablet_id, ts_proxy);
  }

  std::shared_ptr<tablet::TabletPeer> tablet_peer;
  ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(
      tablet_id, &tablet_peer));

  ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), kNoCdcIndex);
  ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), kNoCdcIndex);

  // Stream i reports index i, so the minimum across streams is stream 0's index 0.
  for (int i = 0; i < kNStreams; ++i) {
    GetChanges(tablet_id, stream_id[i], /* term */ 0, /* index */ i);
  }
  WaitForCDCIndex(tablet_peer, 0, 4 * FLAGS_update_min_cdc_indices_interval_secs);

  // Moving stream 0 up to index 4 leaves stream 1 (index 1) as the slowest stream. The new
  // minimum must be reflected in the log object too.
  GetChanges(tablet_id, stream_id[0], /* term */ 0, /* index */ 4);
  WaitForCDCIndex(tablet_peer, 1, 4 * FLAGS_update_min_cdc_indices_interval_secs);

  for (const auto& id : stream_id) {
    ASSERT_OK(client_->DeleteCDCStream(id, true /* force_delete */, true /* ignore_errors */));
  }
}
1794
1795
class CDCLogAndMetaIndexReset : public CDCLogAndMetaIndex {
1796
 public:
1797
0
  void SetUp() override {
1798
0
    FLAGS_cdc_min_replicated_index_considered_stale_secs = 5;
1799
    // This will rollover log segments a lot faster.
1800
0
    FLAGS_log_segment_size_bytes = 100;
1801
0
    CDCLogAndMetaIndex::SetUp();
1802
0
  }
1803
};
1804
1805
INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCLogAndMetaIndexReset, ::testing::Bool());
1806
1807
// Test that when all the streams for a specific tablet have been deleted, the log and meta
1808
// cdc min replicated index is reset to max int64.
1809
0
TEST_P(CDCLogAndMetaIndexReset, TestLogAndMetaCdcIndexAreReset) {
1810
0
  constexpr int kNStreams = 5;
1811
1812
  // This will rollover log segments a lot faster.
1813
0
  FLAGS_log_segment_size_bytes = 100;
1814
1815
0
  CDCStreamId stream_id[kNStreams];
1816
1817
0
  for (int i = 0; i < kNStreams; i++) {
1818
0
    CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id[i]);
1819
0
  }
1820
1821
0
  std::string tablet_id;
1822
0
  GetTablet(&tablet_id);
1823
1824
0
  const auto &proxy = cluster_->mini_tablet_server(0)->server()->proxy();
1825
1826
  // Insert test rows.
1827
0
  for (int i = 1; i <= kNStreams; i++) {
1828
0
    WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy);
1829
0
  }
1830
1831
0
  std::shared_ptr<tablet::TabletPeer> tablet_peer;
1832
0
  ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(tablet_id,
1833
0
      &tablet_peer));
1834
1835
  // Before any cdc request, the min index should be max value.
1836
0
  ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), std::numeric_limits<int64_t>::max());
1837
0
  ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(),
1838
0
            std::numeric_limits<int64_t>::max());
1839
1840
1841
0
  for (int i = 0; i < kNStreams; i++) {
1842
    // Get CDC changes.
1843
0
    GetChanges(tablet_id, stream_id[i], /* term */ 0, /* index */ 5);
1844
0
  }
1845
1846
  // After the request succeeded, verify that the min cdc limit was set correctly. In this case
1847
  // all the streams have index 5.
1848
0
  WaitForCDCIndex(tablet_peer, 5, 4 * FLAGS_update_min_cdc_indices_interval_secs);
1849
1850
0
  client::TableHandle table;
1851
0
  client::YBTableName cdc_state_table(
1852
0
      YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
1853
0
  ASSERT_OK(table.Open(cdc_state_table, client_.get()));
1854
1855
0
  auto session = client_->NewSession();
1856
0
  for (int i = 0; i < kNStreams; i++) {
1857
0
    const auto delete_op = table.NewDeleteOp();
1858
0
    auto* delete_req = delete_op->mutable_request();
1859
0
    QLAddStringHashValue(delete_req, tablet_id);
1860
0
    QLAddStringRangeValue(delete_req, stream_id[i]);
1861
0
    session->Apply(delete_op);
1862
0
  }
1863
0
  ASSERT_OK(session->Flush());
1864
0
  LOG(INFO) << "Successfully deleted all streams from cdc_state";
1865
1866
0
  SleepFor(MonoDelta::FromSeconds(FLAGS_cdc_min_replicated_index_considered_stale_secs + 1));
1867
1868
0
  LOG(INFO) << "Done sleeping";
1869
  // RunLogGC should reset cdc min replicated index to max int64 because more than
1870
  // FLAGS_cdc_min_replicated_index_considered_stale_secs seconds have elapsed since the index
1871
  // was last updated.
1872
0
  ASSERT_OK(tablet_peer->RunLogGC());
1873
0
  LOG(INFO) << "GC done running";
1874
0
  ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), std::numeric_limits<int64_t>::max());
1875
0
  ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(),
1876
0
      std::numeric_limits<int64_t>::max());
1877
1878
0
  for (int i = 0; i < kNStreams; i++) {
1879
0
    ASSERT_OK(client_->DeleteCDCStream(stream_id[i],
1880
0
                                       true /*force_delete*/,
1881
0
                                       true /*ignore_errors*/));
1882
0
  }
1883
0
}
1884
1885
class CDCServiceTestThreeServers : public CDCServiceTest {
1886
 public:
1887
0
  void SetUp() override {
1888
    // We don't want the tablets to move in the middle of the test.
1889
0
    FLAGS_enable_load_balancing = false;
1890
0
    FLAGS_leader_failure_max_missed_heartbeat_periods = 12.0;
1891
0
    FLAGS_update_min_cdc_indices_interval_secs = 5;
1892
0
    FLAGS_enable_log_retention_by_op_idx = true;
1893
0
    FLAGS_client_read_write_timeout_ms = 20 * 1000 * kTimeMultiplier;
1894
1895
    // Always update cdc_state table.
1896
0
    FLAGS_cdc_state_checkpoint_update_interval_ms = 0;
1897
1898
0
    FLAGS_follower_unavailable_considered_failed_sec = 20 * kTimeMultiplier;
1899
1900
0
    CDCServiceTest::SetUp();
1901
0
  }
1902
1903
0
  void DoTearDown() override {
1904
0
    YBMiniClusterTestBase::DoTearDown();
1905
0
  }
1906
1907
0
  virtual int server_count() override { return 3; }
1908
0
  virtual int tablet_count() override { return 3; }
1909
1910
  // Get the first tablet_id for which any peer is a leader.
1911
  void GetFirstTabletIdAndLeaderPeer(TabletId* tablet_id, ssize_t* leader_idx, int timeout_secs);
1912
};
1913
1914
INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestThreeServers, ::testing::Bool());
1915
1916
// Sometimes leadership takes a while. Keep retrying until timeout_secs seconds have elapsed.
1917
void CDCServiceTestThreeServers::GetFirstTabletIdAndLeaderPeer(TabletId* tablet_id,
1918
                                                               ssize_t* leader_idx,
1919
0
                                                               int timeout_secs) {
1920
0
  std::vector<TabletId> tablet_ids;
1921
  // Verify that we are only returning a tablet that belongs to the table created for this test.
1922
0
  GetTablets(&tablet_ids);
1923
0
  ASSERT_EQ(tablet_ids.size(), tablet_count());
1924
1925
0
  MonoTime now = MonoTime::Now();
1926
0
  MonoTime deadline = now + MonoDelta::FromSeconds(timeout_secs);
1927
0
  while(now.ComesBefore(deadline) && (!tablet_id || tablet_id->empty())) {
1928
0
    for (size_t idx = 0; idx < cluster_->num_tablet_servers(); idx++) {
1929
0
      auto peers = cluster_->mini_tablet_server(idx)->server()->tablet_manager()->GetTabletPeers();
1930
0
      ASSERT_GT(peers.size(), 0);
1931
1932
0
      for (const auto &peer : peers) {
1933
0
        auto it = std::find(tablet_ids.begin(), tablet_ids.end(), peer->tablet_id());
1934
0
        if (it != tablet_ids.end() &&
1935
0
            peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
1936
0
          *tablet_id = peer->tablet_id();
1937
0
          *leader_idx = idx;
1938
0
          LOG(INFO) << "Selected tablet " << tablet_id << " for tablet server " << idx;
1939
0
          break;
1940
0
        }
1941
0
      }
1942
0
    }
1943
0
    now = MonoTime::Now();
1944
0
  }
1945
0
}
1946
1947
1948
// Test that whenever a leader change happens (forced here by shutting down the tablet leader),
1949
// next leader correctly reads the minimum applied cdc index by reading the cdc_state table.
1950
0
TEST_P(CDCServiceTestThreeServers, TestNewLeaderUpdatesLogCDCAppliedIndex) {
1951
0
  constexpr int kNRecords = 30;
1952
0
  constexpr int kGettingLeaderTimeoutSecs = 20;
1953
1954
0
  TabletId tablet_id;
1955
  // Index of the TS that is the leader for the selected tablet_id.
1956
0
  ssize_t leader_idx = -1;
1957
1958
0
  GetFirstTabletIdAndLeaderPeer(&tablet_id, &leader_idx, kGettingLeaderTimeoutSecs);
1959
0
  ASSERT_FALSE(tablet_id.empty());
1960
0
  ASSERT_GE(leader_idx, 0);
1961
1962
0
  const auto &proxy = cluster_->mini_tablet_server(leader_idx)->server()->proxy();
1963
0
  for (int i = 0; i < kNRecords; i++) {
1964
0
    WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy);
1965
0
  }
1966
0
  LOG(INFO) << "Inserted " << kNRecords << " records";
1967
1968
0
  CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_);
1969
0
  LOG(INFO) << "Created cdc stream " << stream_id_;
1970
1971
0
  std::shared_ptr<tablet::TabletPeer> tablet_peer;
1972
  // Check that the index hasn't been updated in any of the peers.
1973
0
  for (int idx = 0; idx < server_count(); idx++) {
1974
0
    if (cluster_->mini_tablet_server(idx)->server()->tablet_manager()->
1975
0
        LookupTablet(tablet_id, &tablet_peer)) {
1976
0
      ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), std::numeric_limits<int64>::max());
1977
0
      ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(),
1978
0
                std::numeric_limits<int64>::max());
1979
1980
0
    }
1981
0
  }
1982
1983
  // Kill the tablet leader tserver so that another tserver becomes the leader.
1984
0
  cluster_->mini_tablet_server(leader_idx)->Shutdown();
1985
0
  LOG(INFO) << "tserver " << leader_idx << " was shutdown";
1986
1987
  // CDC Proxy is pinned to the first TServer, so we need to update the proxy if we kill that one.
1988
0
  if (leader_idx == 0) {
1989
0
    cdc_proxy_ = std::make_unique<CDCServiceProxy>(
1990
0
        &client_->proxy_cache(),
1991
0
        HostPort::FromBoundEndpoint(cluster_->mini_tablet_server(1)->bound_rpc_addr()));
1992
0
  }
1993
1994
  // Wait until GetChanges doesn't return any errors. This means that we are able to write to
1995
  // the cdc_state table.
1996
0
  ASSERT_OK(WaitFor([&](){
1997
0
    bool has_error = false;
1998
0
    GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 5, &has_error);
1999
0
    return !has_error;
2000
0
  }, MonoDelta::FromSeconds(180) * kTimeMultiplier, "Wait until cdc state table can take writes."));
2001
2002
0
  std::unique_ptr<CDCServiceProxy> cdc_proxy;
2003
0
  ASSERT_OK(WaitFor([&](){
2004
0
    for (int idx = 0; idx < server_count(); idx++) {
2005
0
      if (idx == leader_idx) {
2006
        // This TServer is shutdown for now.
2007
0
        continue;
2008
0
      }
2009
0
      if (cluster_->mini_tablet_server(idx)->server()->tablet_manager()->
2010
0
          LookupTablet(tablet_id, &tablet_peer)) {
2011
0
        if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
2012
0
          LOG(INFO) << "Found new leader for tablet " << tablet_id << " in TS " << idx;
2013
0
          return true;
2014
0
        }
2015
0
      }
2016
0
    }
2017
0
    return false;
2018
0
  }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait until tablet has a leader."));
2019
2020
0
  SleepFor(MonoDelta::FromSeconds((FLAGS_update_min_cdc_indices_interval_secs * 3)));
2021
0
  LOG(INFO) << "Done sleeping";
2022
2023
0
  ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), 5);
2024
0
  ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), 5);
2025
2026
0
  ASSERT_OK(cluster_->mini_tablet_server(leader_idx)->Start());
2027
0
  ASSERT_OK(cluster_->mini_tablet_server(leader_idx)->WaitStarted());
2028
0
}
2029
2030
} // namespace cdc
2031
} // namespace yb