YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/cdc_test_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/integration-tests/cdc_test_util.h"
15
16
#include <gtest/gtest.h>
17
18
#include "yb/consensus/log.h"
19
20
#include "yb/rpc/rpc_controller.h"
21
22
#include "yb/tablet/tablet_metadata.h"
23
#include "yb/tablet/tablet_peer.h"
24
25
#include "yb/tserver/mini_tablet_server.h"
26
#include "yb/tserver/tablet_server.h"
27
#include "yb/tserver/ts_tablet_manager.h"
28
29
#include "yb/util/result.h"
30
#include "yb/util/test_macros.h"
31
#include "yb/util/test_util.h"
32
33
namespace yb {
34
namespace cdc {
35
36
using yb::MiniCluster;
37
38
void AssertIntKey(const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>& key,
39
                  int32_t value) {
40
  ASSERT_EQ(key.size(), 1);
41
  ASSERT_EQ(key[0].key(), "key");
42
  ASSERT_EQ(key[0].value().int32_value(), value);
43
}
44
45
void CreateCDCStream(const std::unique_ptr<CDCServiceProxy>& cdc_proxy,
46
                     const TableId& table_id,
47
                     CDCStreamId* stream_id,
48
0
                     cdc::CDCRequestSource source_type) {
49
0
  CreateCDCStreamRequestPB req;
50
0
  CreateCDCStreamResponsePB resp;
51
0
  req.set_table_id(table_id);
52
0
  req.set_source_type(source_type);
53
54
0
  rpc::RpcController rpc;
55
0
  ASSERT_OK(cdc_proxy->CreateCDCStream(req, &resp, &rpc));
56
0
  ASSERT_FALSE(resp.has_error());
57
58
0
  if (stream_id) {
59
0
    *stream_id = resp.stream_id();
60
0
  }
61
0
}
62
63
void WaitUntilWalRetentionSecs(std::function<int()> get_wal_retention_secs,
64
                               uint32_t expected_wal_retention_secs,
65
0
                               const TableName& table_name) {
66
0
  ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
67
0
    uint32_t wal_retention_secs = get_wal_retention_secs();
68
0
    if (wal_retention_secs == expected_wal_retention_secs) {
69
0
      return true;
70
0
    } else {
71
0
      LOG(INFO) << "wal_retention_secs " << wal_retention_secs
72
0
                << " doesn't match expected " << expected_wal_retention_secs
73
0
                << " for table " << table_name;
74
0
      return false;
75
0
    }
76
0
  }, MonoDelta::FromSeconds(20), "Verify wal retention set on Producer."));
77
0
}
78
79
void VerifyWalRetentionTime(MiniCluster* cluster,
80
                            const std::string& table_name_start,
81
0
                            uint32_t expected_wal_retention_secs) {
82
0
  int ntablets_checked = 0;
83
0
  for (const auto& mini_tserver : cluster->mini_tablet_servers()) {
84
0
    auto peers = mini_tserver->server()->tablet_manager()->GetTabletPeers();
85
0
    for (const auto& peer : peers) {
86
0
      const std::string& table_name = peer->tablet_metadata()->table_name();
87
0
      if (table_name.substr(0, table_name_start.length()) == table_name_start) {
88
0
        auto table_id = peer->tablet_metadata()->table_id();
89
0
        WaitUntilWalRetentionSecs([&peer]() { return peer->log()->wal_retention_secs(); },
90
0
            expected_wal_retention_secs, table_name);
91
0
        WaitUntilWalRetentionSecs(
92
0
            [&peer]() { return peer->tablet_metadata()->wal_retention_secs(); },
93
0
            expected_wal_retention_secs, table_name);
94
0
        ntablets_checked++;
95
0
      }
96
0
    }
97
0
  }
98
0
  ASSERT_GT(ntablets_checked, 0);
99
0
}
100
101
} // namespace cdc
102
} // namespace yb