/Users/deen/code/yugabyte-db/src/yb/integration-tests/cdc_test_util.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/integration-tests/cdc_test_util.h" |
15 | | |
16 | | #include <gtest/gtest.h> |
17 | | |
18 | | #include "yb/consensus/log.h" |
19 | | |
20 | | #include "yb/rpc/rpc_controller.h" |
21 | | |
22 | | #include "yb/tablet/tablet_metadata.h" |
23 | | #include "yb/tablet/tablet_peer.h" |
24 | | |
25 | | #include "yb/tserver/mini_tablet_server.h" |
26 | | #include "yb/tserver/tablet_server.h" |
27 | | #include "yb/tserver/ts_tablet_manager.h" |
28 | | |
29 | | #include "yb/util/result.h" |
30 | | #include "yb/util/test_macros.h" |
31 | | #include "yb/util/test_util.h" |
32 | | |
33 | | namespace yb { |
34 | | namespace cdc { |
35 | | |
36 | | using yb::MiniCluster; |
37 | | |
38 | | void AssertIntKey(const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>& key, |
39 | 0 | int32_t value) { |
40 | 0 | ASSERT_EQ(key.size(), 1); |
41 | 0 | ASSERT_EQ(key[0].key(), "key"); |
42 | 0 | ASSERT_EQ(key[0].value().int32_value(), value); |
43 | 0 | } |
44 | | |
45 | | void CreateCDCStream(const std::unique_ptr<CDCServiceProxy>& cdc_proxy, |
46 | | const TableId& table_id, |
47 | | CDCStreamId* stream_id, |
48 | 0 | cdc::CDCRequestSource source_type) { |
49 | 0 | CreateCDCStreamRequestPB req; |
50 | 0 | CreateCDCStreamResponsePB resp; |
51 | 0 | req.set_table_id(table_id); |
52 | 0 | req.set_source_type(source_type); |
53 | |
|
54 | 0 | rpc::RpcController rpc; |
55 | 0 | ASSERT_OK(cdc_proxy->CreateCDCStream(req, &resp, &rpc)); |
56 | 0 | ASSERT_FALSE(resp.has_error()); |
57 | | |
58 | 0 | if (stream_id) { |
59 | 0 | *stream_id = resp.stream_id(); |
60 | 0 | } |
61 | 0 | } |
62 | | |
63 | | void WaitUntilWalRetentionSecs(std::function<int()> get_wal_retention_secs, |
64 | | uint32_t expected_wal_retention_secs, |
65 | 0 | const TableName& table_name) { |
66 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
67 | 0 | uint32_t wal_retention_secs = get_wal_retention_secs(); |
68 | 0 | if (wal_retention_secs == expected_wal_retention_secs) { |
69 | 0 | return true; |
70 | 0 | } else { |
71 | 0 | LOG(INFO) << "wal_retention_secs " << wal_retention_secs |
72 | 0 | << " doesn't match expected " << expected_wal_retention_secs |
73 | 0 | << " for table " << table_name; |
74 | 0 | return false; |
75 | 0 | } |
76 | 0 | }, MonoDelta::FromSeconds(20), "Verify wal retention set on Producer.")); |
77 | 0 | } |
78 | | |
79 | | void VerifyWalRetentionTime(MiniCluster* cluster, |
80 | | const std::string& table_name_start, |
81 | 0 | uint32_t expected_wal_retention_secs) { |
82 | 0 | int ntablets_checked = 0; |
83 | 0 | for (const auto& mini_tserver : cluster->mini_tablet_servers()) { |
84 | 0 | auto peers = mini_tserver->server()->tablet_manager()->GetTabletPeers(); |
85 | 0 | for (const auto& peer : peers) { |
86 | 0 | const std::string& table_name = peer->tablet_metadata()->table_name(); |
87 | 0 | if (table_name.substr(0, table_name_start.length()) == table_name_start) { |
88 | 0 | auto table_id = peer->tablet_metadata()->table_id(); |
89 | 0 | WaitUntilWalRetentionSecs([&peer]() { return peer->log()->wal_retention_secs(); }, |
90 | 0 | expected_wal_retention_secs, table_name); |
91 | 0 | WaitUntilWalRetentionSecs( |
92 | 0 | [&peer]() { return peer->tablet_metadata()->wal_retention_secs(); }, |
93 | 0 | expected_wal_retention_secs, table_name); |
94 | 0 | ntablets_checked++; |
95 | 0 | } |
96 | 0 | } |
97 | 0 | } |
98 | 0 | ASSERT_GT(ntablets_checked, 0); |
99 | 0 | } |
100 | | |
101 | | } // namespace cdc |
102 | | } // namespace yb |