/Users/deen/code/yugabyte-db/ent/src/yb/master/cdc_consumer_registry_service.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/cdc_consumer_registry_service.h" |
15 | | |
16 | | #include "yb/master/catalog_entity_info.h" |
17 | | #include "yb/master/cdc_rpc_tasks.h" |
18 | | #include "yb/master/master_client.pb.h" |
19 | | #include "yb/master/master_ddl.pb.h" |
20 | | #include "yb/master/master_util.h" |
21 | | |
22 | | #include "yb/cdc/cdc_consumer.pb.h" |
23 | | #include "yb/client/client.h" |
24 | | #include "yb/client/yb_table_name.h" |
25 | | #include "yb/common/wire_protocol.h" |
26 | | |
27 | | #include "yb/util/random_util.h" |
28 | | #include "yb/util/result.h" |
29 | | #include "yb/util/status_format.h" |
30 | | |
31 | | namespace yb { |
32 | | namespace master { |
33 | | namespace enterprise { |
34 | | |
35 | | std::map<std::string, std::string> GetPartitionStartKeyConsumerTabletMapping( |
36 | 0 | const GetTableLocationsResponsePB& consumer_tablets_resp) { |
37 | 0 | std::map<std::string, std::string> partitions_map; |
38 | 0 | for (const auto& tablet_location : consumer_tablets_resp.tablet_locations()) { |
39 | 0 | partitions_map[tablet_location.partition().partition_key_start()] = tablet_location.tablet_id(); |
40 | 0 | } |
41 | 0 | return partitions_map; |
42 | 0 | } |
43 | | |
44 | | Status CreateTabletMapping( |
45 | | const std::string& producer_table_id, |
46 | | const std::string& consumer_table_id, |
47 | | const std::string& producer_id, |
48 | | const std::string& producer_master_addrs, |
49 | | const GetTableLocationsResponsePB& consumer_tablets_resp, |
50 | | std::unordered_set<HostPort, HostPortHash>* tserver_addrs, |
51 | | cdc::StreamEntryPB* stream_entry, |
52 | 0 | std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks) { |
53 | | |
54 | | // Get the tablets in the producer table. |
55 | 0 | auto producer_table_locations = |
56 | 0 | VERIFY_RESULT(cdc_rpc_tasks->GetTableLocations(producer_table_id)); |
57 | |
|
58 | 0 | auto consumer_tablets_size = consumer_tablets_resp.tablet_locations_size(); |
59 | 0 | auto partitions_map = GetPartitionStartKeyConsumerTabletMapping(consumer_tablets_resp); |
60 | 0 | stream_entry->set_consumer_table_id(consumer_table_id); |
61 | 0 | stream_entry->set_producer_table_id(producer_table_id); |
62 | 0 | auto* mutable_map = stream_entry->mutable_consumer_producer_tablet_map(); |
63 | 0 | bool same_tablet_count = consumer_tablets_size == producer_table_locations.size(); |
64 | 0 | LOG(INFO) << Format("For producer table id $0 and consumer table id $1, same num tablets: $2", |
65 | 0 | producer_table_id, consumer_table_id, same_tablet_count); |
66 | 0 | stream_entry->set_same_num_producer_consumer_tablets(same_tablet_count); |
67 | | // Create the mapping between consumer and producer tablets. |
68 | 0 | for (int i = 0; i < producer_table_locations.size(); i++) { |
69 | 0 | const auto& producer = producer_table_locations.Get(i).tablet_id(); |
70 | 0 | std::string consumer; |
71 | 0 | if (same_tablet_count) { |
72 | | // We can optimize if we have the same tablet count in the producer and consumer table by |
73 | | // mapping key ranges to each other. |
74 | 0 | const auto& it = |
75 | 0 | partitions_map.find(producer_table_locations.Get(i).partition().partition_key_start()); |
76 | 0 | if (it == partitions_map.end()) { |
77 | 0 | return STATUS_SUBSTITUTE( |
78 | 0 | IllegalState, "When producer and consumer tablet counts are the same, could not find " |
79 | 0 | "matching keyrange for tablet $0", producer); |
80 | 0 | } |
81 | 0 | consumer = it->second; |
82 | 0 | } else { |
83 | 0 | consumer = consumer_tablets_resp.tablet_locations(i % consumer_tablets_size).tablet_id(); |
84 | 0 | } |
85 | |
|
86 | 0 | cdc::ProducerTabletListPB producer_tablets; |
87 | 0 | auto it = mutable_map->find(consumer); |
88 | 0 | if (it != mutable_map->end()) { |
89 | 0 | producer_tablets = it->second; |
90 | 0 | } |
91 | 0 | *producer_tablets.add_tablets() = producer; |
92 | 0 | (*mutable_map)[consumer] = producer_tablets; |
93 | | |
94 | | // For external CDC Consumers, populate the list of TServers they can connect to as proxies. |
95 | 0 | for (const auto& replica : producer_table_locations.Get(i).replicas()) { |
96 | | // Use the public IP addresses since we're cross-universe |
97 | 0 | for (const auto& addr : replica.ts_info().broadcast_addresses()) { |
98 | 0 | tserver_addrs->insert(HostPortFromPB(addr)); |
99 | 0 | } |
100 | | // Rarely a viable setup for production replication, but used in testing... |
101 | 0 | if (replica.ts_info().broadcast_addresses_size() == 0) { |
102 | 0 | LOG(WARNING) << "No public broadcast addresses found for " |
103 | 0 | << replica.ts_info().permanent_uuid() |
104 | 0 | << ". Using private addresses instead."; |
105 | 0 | for (const auto& addr : replica.ts_info().private_rpc_addresses()) { |
106 | 0 | tserver_addrs->insert(HostPortFromPB(addr)); |
107 | 0 | } |
108 | 0 | } |
109 | 0 | } |
110 | 0 | } |
111 | 0 | return Status::OK(); |
112 | 0 | } |
113 | | |
114 | | Status UpdateTableMappingOnTabletSplit( |
115 | | cdc::StreamEntryPB* stream_entry, |
116 | 0 | const SplitTabletIds& split_tablet_ids) { |
117 | 0 | auto* mutable_map = stream_entry->mutable_consumer_producer_tablet_map(); |
118 | 0 | auto producer_tablets = (*mutable_map)[split_tablet_ids.source]; |
119 | 0 | mutable_map->erase(split_tablet_ids.source); |
120 | | // TODO introduce a better mapping of tablets to improve locality (GH #10186). |
121 | | // For now we just distribute the producer tablets between both children. |
122 | 0 | for (int i = 0; i < producer_tablets.tablets().size(); ++i) { |
123 | 0 | if (i % 2) { |
124 | 0 | *(*mutable_map)[split_tablet_ids.children.first].add_tablets() = producer_tablets.tablets(i); |
125 | 0 | } else { |
126 | 0 | *(*mutable_map)[split_tablet_ids.children.second].add_tablets() = producer_tablets.tablets(i); |
127 | 0 | } |
128 | 0 | } |
129 | 0 | return Status::OK(); |
130 | 0 | } |
131 | | |
132 | | Result<std::vector<CDCConsumerStreamInfo>> TEST_GetConsumerProducerTableMap( |
133 | | const std::string& producer_master_addrs, |
134 | 0 | const ListTablesResponsePB& resp) { |
135 | |
|
136 | 0 | auto cdc_rpc_tasks = VERIFY_RESULT(CDCRpcTasks::CreateWithMasterAddrs( |
137 | 0 | "" /* producer_id */, producer_master_addrs)); |
138 | 0 | auto producer_tables = VERIFY_RESULT(cdc_rpc_tasks->ListTables()); |
139 | |
|
140 | 0 | std::unordered_map<std::string, std::string> consumer_tables_map; |
141 | 0 | for (const auto& table_info : resp.tables()) { |
142 | 0 | const auto& table_name_str = Format("$0:$1", table_info.namespace_().name(), table_info.name()); |
143 | 0 | consumer_tables_map[table_name_str] = table_info.id(); |
144 | 0 | } |
145 | |
|
146 | 0 | std::vector<CDCConsumerStreamInfo> consumer_producer_list; |
147 | 0 | for (const auto& table : producer_tables) { |
148 | | // TODO(Rahul): Fix this for YSQL workload testing. |
149 | 0 | if (!master::IsSystemNamespace(table.second.namespace_name())) { |
150 | 0 | const auto& table_name_str = |
151 | 0 | Format("$0:$1", table.second.namespace_name(), table.second.table_name()); |
152 | 0 | CDCConsumerStreamInfo stream_info; |
153 | 0 | stream_info.stream_id = RandomHumanReadableString(16); |
154 | 0 | stream_info.producer_table_id = table.first; |
155 | 0 | stream_info.consumer_table_id = consumer_tables_map[table_name_str]; |
156 | 0 | consumer_producer_list.push_back(std::move(stream_info)); |
157 | 0 | } |
158 | 0 | } |
159 | 0 | return consumer_producer_list; |
160 | 0 | } |
161 | | |
162 | | } // namespace enterprise |
163 | | } // namespace master |
164 | | } // namespace yb |