/Users/deen/code/yugabyte-db/src/yb/master/master_util.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/master_util.h" |
15 | | |
16 | | #include <boost/container/stable_vector.hpp> |
17 | | |
18 | | #include "yb/common/redis_constants_common.h" |
19 | | #include "yb/common/wire_protocol.h" |
20 | | |
21 | | #include "yb/consensus/metadata.pb.h" |
22 | | |
23 | | #include "yb/master/master_client.pb.h" |
24 | | #include "yb/master/master_cluster.proxy.h" |
25 | | #include "yb/master/master_defaults.h" |
26 | | #include "yb/master/master_error.h" |
27 | | |
28 | | #include "yb/rpc/rpc_controller.h" |
29 | | |
30 | | #include "yb/util/countdown_latch.h" |
31 | | #include "yb/util/net/net_util.h" |
32 | | #include "yb/util/result.h" |
33 | | #include "yb/util/status_format.h" |
34 | | |
35 | | namespace yb { |
36 | | namespace master { |
37 | | |
38 | | namespace { |
39 | | |
40 | | struct GetMasterRegistrationData { |
41 | | GetMasterRegistrationRequestPB req; |
42 | | GetMasterRegistrationResponsePB resp; |
43 | | rpc::RpcController controller; |
44 | | MasterClusterProxy proxy; |
45 | | |
46 | | GetMasterRegistrationData(rpc::ProxyCache* proxy_cache, const HostPort& hp) |
47 | 328 | : proxy(proxy_cache, hp) {} |
48 | | }; |
49 | | |
50 | | } // namespace |
51 | | |
52 | | Status GetMasterEntryForHosts(rpc::ProxyCache* proxy_cache, |
53 | | const std::vector<HostPort>& hostports, |
54 | | MonoDelta timeout, |
55 | 322 | ServerEntryPB* e) { |
56 | 322 | CHECK(!hostports.empty()); |
57 | | |
58 | 322 | boost::container::stable_vector<GetMasterRegistrationData> datas; |
59 | 322 | datas.reserve(hostports.size()); |
60 | 322 | std::atomic<GetMasterRegistrationData*> last_data{nullptr}; |
61 | 322 | CountDownLatch latch(hostports.size()); |
62 | 650 | for (size_t i = 0; i != hostports.size(); ++i) { |
63 | 328 | datas.emplace_back(proxy_cache, hostports[i]); |
64 | 328 | auto& data = datas.back(); |
65 | 328 | data.controller.set_timeout(timeout); |
66 | 328 | data.proxy.GetMasterRegistrationAsync( |
67 | 328 | data.req, &data.resp, &data.controller, |
68 | 328 | [&data, &latch, &last_data] { |
69 | 328 | last_data.store(&data, std::memory_order_release); |
70 | 328 | latch.CountDown(); |
71 | 328 | }); |
72 | 328 | } |
73 | | |
74 | 322 | latch.Wait(); |
75 | | |
76 | 322 | for (const auto& data : datas) { |
77 | 322 | if (!data.controller.status().ok() || data.resp.has_error()) { |
78 | 0 | continue; |
79 | 0 | } |
80 | 322 | e->mutable_instance_id()->CopyFrom(data.resp.instance_id()); |
81 | 322 | e->mutable_registration()->CopyFrom(data.resp.registration()); |
82 | 322 | e->set_role(data.resp.role()); |
83 | 322 | return Status::OK(); |
84 | 322 | } |
85 | | |
86 | 0 | auto last_data_value = last_data.load(std::memory_order_acquire); |
87 | 0 | if (last_data_value->controller.status().ok()) { |
88 | 0 | return StatusFromPB(last_data_value->resp.error().status()); |
89 | 0 | } else { |
90 | 0 | return last_data_value->controller.status(); |
91 | 0 | } |
92 | 0 | } |
93 | | |
94 | 275k | const HostPortPB& DesiredHostPort(const TSInfoPB& ts_info, const CloudInfoPB& from) { |
95 | 275k | return DesiredHostPort(ts_info.broadcast_addresses(), ts_info.private_rpc_addresses(), |
96 | 275k | ts_info.cloud_info(), from); |
97 | 275k | } |
98 | | |
99 | 0 | void TakeRegistration(consensus::RaftPeerPB* source, TSInfoPB* dest) { |
100 | 0 | dest->mutable_private_rpc_addresses()->Swap(source->mutable_last_known_private_addr()); |
101 | 0 | dest->mutable_broadcast_addresses()->Swap(source->mutable_last_known_broadcast_addr()); |
102 | 0 | dest->mutable_cloud_info()->Swap(source->mutable_cloud_info()); |
103 | 0 | } |
104 | | |
105 | 416k | void CopyRegistration(const consensus::RaftPeerPB& source, TSInfoPB* dest) { |
106 | 416k | *dest->mutable_private_rpc_addresses() = source.last_known_private_addr(); |
107 | 416k | *dest->mutable_broadcast_addresses() = source.last_known_broadcast_addr(); |
108 | 416k | *dest->mutable_cloud_info() = source.cloud_info(); |
109 | 416k | } |
110 | | |
111 | 0 | void TakeRegistration(ServerRegistrationPB* source, TSInfoPB* dest) { |
112 | 0 | dest->mutable_private_rpc_addresses()->Swap(source->mutable_private_rpc_addresses()); |
113 | 0 | dest->mutable_broadcast_addresses()->Swap(source->mutable_broadcast_addresses()); |
114 | 0 | dest->mutable_cloud_info()->Swap(source->mutable_cloud_info()); |
115 | 0 | } |
116 | | |
117 | 584k | void CopyRegistration(const ServerRegistrationPB& source, TSInfoPB* dest) { |
118 | 584k | dest->mutable_private_rpc_addresses()->CopyFrom(source.private_rpc_addresses()); |
119 | 584k | dest->mutable_broadcast_addresses()->CopyFrom(source.broadcast_addresses()); |
120 | 584k | dest->mutable_cloud_info()->CopyFrom(source.cloud_info()); |
121 | 584k | } |
122 | | |
123 | 5.02M | bool IsSystemNamespace(const std::string& namespace_name) { |
124 | 5.02M | return namespace_name == master::kSystemNamespaceName || |
125 | 4.72M | namespace_name == master::kSystemAuthNamespaceName || |
126 | 4.71M | namespace_name == master::kSystemDistributedNamespaceName || |
127 | 4.71M | namespace_name == master::kSystemSchemaNamespaceName || |
128 | 4.48M | namespace_name == master::kSystemTracesNamespaceName; |
129 | 5.02M | } |
130 | | |
131 | 21.0k | YQLDatabase GetDefaultDatabaseType(const std::string& keyspace_name) { |
132 | 731 | return keyspace_name == common::kRedisKeyspaceName ? YQLDatabase::YQL_DATABASE_REDIS |
133 | 20.2k | : YQLDatabase::YQL_DATABASE_CQL; |
134 | 21.0k | } |
135 | | |
136 | 24.9k | YQLDatabase GetDatabaseTypeForTable(const TableType table_type) { |
137 | 24.9k | switch (table_type) { |
138 | 20.0k | case TableType::YQL_TABLE_TYPE: |
139 | 20.0k | return YQLDatabase::YQL_DATABASE_CQL; |
140 | 107 | case TableType::REDIS_TABLE_TYPE: |
141 | 107 | return YQLDatabase::YQL_DATABASE_REDIS; |
142 | 3.99k | case TableType::PGSQL_TABLE_TYPE: |
143 | 3.99k | return YQLDatabase::YQL_DATABASE_PGSQL; |
144 | 790 | case TableType::TRANSACTION_STATUS_TABLE_TYPE: |
145 | | // Transactions status table is created in "system" keyspace in CQL. |
146 | 790 | return YQLDatabase::YQL_DATABASE_CQL; |
147 | 0 | } |
148 | 0 | return YQL_DATABASE_UNKNOWN; |
149 | 0 | } |
150 | | |
151 | 6 | TableType GetTableTypeForDatabase(const YQLDatabase database_type) { |
152 | 6 | switch (database_type) { |
153 | 1 | case YQLDatabase::YQL_DATABASE_CQL: |
154 | 1 | return TableType::YQL_TABLE_TYPE; |
155 | 0 | case YQLDatabase::YQL_DATABASE_REDIS: |
156 | 0 | return TableType::REDIS_TABLE_TYPE; |
157 | 5 | case YQLDatabase::YQL_DATABASE_PGSQL: |
158 | 5 | return TableType::PGSQL_TABLE_TYPE; |
159 | 0 | default: |
160 | 0 | DCHECK_EQ(database_type, YQLDatabase::YQL_DATABASE_UNKNOWN); |
161 | 0 | return TableType::DEFAULT_TABLE_TYPE; |
162 | 6 | } |
163 | 6 | } |
164 | | |
165 | | Result<bool> NamespaceMatchesIdentifier( |
166 | | const NamespaceId& namespace_id, YQLDatabase db_type, const NamespaceName& namespace_name, |
167 | 8 | const NamespaceIdentifierPB& ns_identifier) { |
168 | 8 | if (ns_identifier.has_id()) { |
169 | 2 | return namespace_id == ns_identifier.id(); |
170 | 2 | } |
171 | 6 | if (ns_identifier.has_database_type() && ns_identifier.database_type() != db_type) { |
172 | 2 | return false; |
173 | 2 | } |
174 | 4 | if (ns_identifier.has_name()) { |
175 | 4 | return namespace_name == ns_identifier.name(); |
176 | 4 | } |
177 | 0 | return STATUS_FORMAT( |
178 | 0 | InvalidArgument, "Wrong namespace identifier format: $0", ns_identifier); |
179 | 0 | } |
180 | | |
181 | | Result<bool> TableMatchesIdentifier( |
182 | 18 | const TableId& id, const SysTablesEntryPB& table, const TableIdentifierPB& table_identifier) { |
183 | 18 | if (table_identifier.has_table_id()) { |
184 | 2 | return id == table_identifier.table_id(); |
185 | 2 | } |
186 | 16 | if (!table_identifier.table_name().empty() && table_identifier.table_name() != table.name()) { |
187 | 8 | return false; |
188 | 8 | } |
189 | 8 | if (table_identifier.has_namespace_()) { |
190 | 8 | return NamespaceMatchesIdentifier( |
191 | 8 | table.namespace_id(), master::GetDatabaseTypeForTable(table.table_type()), |
192 | 8 | table.namespace_name(), table_identifier.namespace_()); |
193 | 8 | } |
194 | 0 | return STATUS_FORMAT( |
195 | 0 | InvalidArgument, "Wrong table identifier format: $0", table_identifier); |
196 | 0 | } |
197 | | |
198 | 187 | CHECKED_STATUS SetupError(MasterErrorPB* error, const Status& s) { |
199 | 187 | StatusToPB(s, error->mutable_status()); |
200 | 187 | error->set_code(MasterError::ValueFromStatus(s).get_value_or(MasterErrorPB::UNKNOWN_ERROR)); |
201 | 187 | return s; |
202 | 187 | } |
203 | | |
204 | | } // namespace master |
205 | | } // namespace yb |