YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/master/cdc_consumer_registry_service.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/cdc_consumer_registry_service.h"
15
16
#include "yb/master/catalog_entity_info.h"
17
#include "yb/master/cdc_rpc_tasks.h"
18
#include "yb/master/master_client.pb.h"
19
#include "yb/master/master_ddl.pb.h"
20
#include "yb/master/master_util.h"
21
22
#include "yb/cdc/cdc_consumer.pb.h"
23
#include "yb/client/client.h"
24
#include "yb/client/yb_table_name.h"
25
#include "yb/common/wire_protocol.h"
26
27
#include "yb/util/random_util.h"
28
#include "yb/util/result.h"
29
#include "yb/util/status_format.h"
30
31
namespace yb {
32
namespace master {
33
namespace enterprise {
34
35
std::map<std::string, std::string> GetPartitionStartKeyConsumerTabletMapping(
36
0
    const GetTableLocationsResponsePB& consumer_tablets_resp) {
37
0
  std::map<std::string, std::string> partitions_map;
38
0
  for (const auto& tablet_location : consumer_tablets_resp.tablet_locations()) {
39
0
    partitions_map[tablet_location.partition().partition_key_start()] = tablet_location.tablet_id();
40
0
  }
41
0
  return partitions_map;
42
0
}
43
44
// Fills 'stream_entry' with the consumer-tablet -> producer-tablets mapping for one replicated
// table pair and collects the producer tserver addresses into 'tserver_addrs'.
//
// producer_table_id:     table whose tablet locations are fetched through 'cdc_rpc_tasks'.
// consumer_table_id:     recorded in 'stream_entry' as the consumer side of the stream.
// producer_id:           not used by this function.
// producer_master_addrs: not used by this function.
// consumer_tablets_resp: tablet locations of the consumer table.
// tserver_addrs:         out-param; receives the broadcast addresses of every producer tablet
//                        replica, or its private RPC addresses when it has no broadcast address.
// stream_entry:          out-param; table ids, the same-tablet-count flag and the tablet map are
//                        written here. It may already be partially populated when an error is
//                        returned.
// cdc_rpc_tasks:         used to look up the producer table's tablet locations.
//
// Returns the error from the producer tablet lookup, or IllegalState when the tablet counts match
// but a producer tablet's partition start key has no consumer counterpart, or when the consumer
// table has no tablets while the producer table has some.
Status CreateTabletMapping(
    const std::string& producer_table_id,
    const std::string& consumer_table_id,
    const std::string& producer_id,
    const std::string& producer_master_addrs,
    const GetTableLocationsResponsePB& consumer_tablets_resp,
    std::unordered_set<HostPort, HostPortHash>* tserver_addrs,
    cdc::StreamEntryPB* stream_entry,
    std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks) {

  // Get the tablets in the producer table.
  auto producer_table_locations =
      VERIFY_RESULT(cdc_rpc_tasks->GetTableLocations(producer_table_id));

  auto consumer_tablets_size = consumer_tablets_resp.tablet_locations_size();
  auto partitions_map = GetPartitionStartKeyConsumerTabletMapping(consumer_tablets_resp);
  stream_entry->set_consumer_table_id(consumer_table_id);
  stream_entry->set_producer_table_id(producer_table_id);
  auto* mutable_map = stream_entry->mutable_consumer_producer_tablet_map();
  bool same_tablet_count = consumer_tablets_size == producer_table_locations.size();
  LOG(INFO) << Format("For producer table id $0 and consumer table id $1, same num tablets: $2",
                      producer_table_id, consumer_table_id, same_tablet_count);
  stream_entry->set_same_num_producer_consumer_tablets(same_tablet_count);

  // The round-robin fallback below computes 'i % consumer_tablets_size'. With zero consumer
  // tablets (and therefore at least one producer tablet, since the counts differ) that would be a
  // modulo by zero, so fail cleanly instead.
  if (!same_tablet_count && consumer_tablets_size == 0) {
    return STATUS_SUBSTITUTE(
        IllegalState, "Consumer table $0 has no tablets to map the tablets of producer table $1 to",
        consumer_table_id, producer_table_id);
  }

  // Create the mapping between consumer and producer tablets.
  for (int i = 0; i < producer_table_locations.size(); i++) {
    const auto& producer_location = producer_table_locations.Get(i);
    const auto& producer = producer_location.tablet_id();
    std::string consumer;
    if (same_tablet_count) {
      // We can optimize if we have the same tablet count in the producer and consumer table by
      // mapping key ranges to each other.
      const auto it = partitions_map.find(producer_location.partition().partition_key_start());
      if (it == partitions_map.end()) {
        return STATUS_SUBSTITUTE(
            IllegalState, "When producer and consumer tablet counts are the same, could not find "
                          "matching keyrange for tablet $0", producer);
      }
      consumer = it->second;
    } else {
      // Different tablet counts: spread the producer tablets over the consumer tablets round-robin.
      consumer = consumer_tablets_resp.tablet_locations(i % consumer_tablets_size).tablet_id();
    }

    // Append in place; operator[] creates an empty list the first time a consumer tablet is seen.
    *(*mutable_map)[consumer].add_tablets() = producer;

    // For external CDC Consumers, populate the list of TServers they can connect to as proxies.
    for (const auto& replica : producer_location.replicas()) {
      // Use the public IP addresses since we're cross-universe
      for (const auto& addr : replica.ts_info().broadcast_addresses()) {
        tserver_addrs->insert(HostPortFromPB(addr));
      }
      // Rarely a viable setup for production replication, but used in testing...
      if (replica.ts_info().broadcast_addresses_size() == 0) {
        LOG(WARNING) << "No public broadcast addresses found for "
                     << replica.ts_info().permanent_uuid()
                     << ".  Using private addresses instead.";
        for (const auto& addr : replica.ts_info().private_rpc_addresses()) {
          tserver_addrs->insert(HostPortFromPB(addr));
        }
      }
    }
  }
  return Status::OK();
}
113
114
// Rewrites the consumer -> producer tablet map in 'stream_entry' after a consumer tablet split:
// the entry keyed by 'split_tablet_ids.source' is removed and the producer tablets it held are
// dealt out alternately to the two child tablets. Always returns Status::OK().
Status UpdateTableMappingOnTabletSplit(
    cdc::StreamEntryPB* stream_entry,
    const SplitTabletIds& split_tablet_ids) {
  auto* tablet_map = stream_entry->mutable_consumer_producer_tablet_map();

  // Take a copy of the parent's producer list before its entry is erased.
  auto parent_producers = (*tablet_map)[split_tablet_ids.source];
  tablet_map->erase(split_tablet_ids.source);

  // TODO introduce a better mapping of tablets to improve locality (GH #10186).
  // For now we just distribute the producer tablets between both children.
  const int num_producers = parent_producers.tablets().size();
  for (int idx = 0; idx < num_producers; ++idx) {
    // Odd positions go to the first child, even positions to the second.
    const auto& child = (idx % 2 != 0) ? split_tablet_ids.children.first
                                       : split_tablet_ids.children.second;
    *(*tablet_map)[child].add_tablets() = parent_producers.tablets(idx);
  }
  return Status::OK();
}
131
132
// Test helper that pairs each non-system producer table with the consumer table of the same
// "<namespace>:<table>" name and gives every pair a random 16-character stream id.
//
// producer_master_addrs: master addresses used to create the CDCRpcTasks that lists the producer
//                        tables.
// resp:                  ListTables response whose tables are the candidate consumer tables.
//
// Returns one CDCConsumerStreamInfo per non-system producer table, or the error from creating the
// RPC tasks or listing the producer tables. A producer table with no same-named consumer table
// gets the default (empty) string as its consumer_table_id.
Result<std::vector<CDCConsumerStreamInfo>> TEST_GetConsumerProducerTableMap(
    const std::string& producer_master_addrs,
    const ListTablesResponsePB& resp) {

  auto rpc_tasks = VERIFY_RESULT(CDCRpcTasks::CreateWithMasterAddrs(
      "" /* producer_id */, producer_master_addrs));
  auto producer_tables = VERIFY_RESULT(rpc_tasks->ListTables());

  // Index the consumer table ids by "<namespace>:<table>".
  std::unordered_map<std::string, std::string> consumer_id_by_name;
  for (const auto& consumer_table : resp.tables()) {
    const auto qualified_name =
        Format("$0:$1", consumer_table.namespace_().name(), consumer_table.name());
    consumer_id_by_name[qualified_name] = consumer_table.id();
  }

  std::vector<CDCConsumerStreamInfo> streams;
  for (const auto& producer_entry : producer_tables) {
    const auto& producer_name = producer_entry.second;
    // TODO(Rahul): Fix this for YSQL workload testing.
    if (master::IsSystemNamespace(producer_name.namespace_name())) {
      continue;
    }
    const auto qualified_name =
        Format("$0:$1", producer_name.namespace_name(), producer_name.table_name());
    CDCConsumerStreamInfo info;
    info.stream_id = RandomHumanReadableString(16);
    info.producer_table_id = producer_entry.first;
    info.consumer_table_id = consumer_id_by_name[qualified_name];
    streams.push_back(std::move(info));
  }
  return streams;
}
161
162
} // namespace enterprise
163
} // namespace master
164
} // namespace yb