/Users/deen/code/yugabyte-db/src/yb/master/master_util.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/master_util.h" |
15 | | |
16 | | #include <boost/container/stable_vector.hpp> |
17 | | |
18 | | #include "yb/common/redis_constants_common.h" |
19 | | #include "yb/common/wire_protocol.h" |
20 | | |
21 | | #include "yb/consensus/metadata.pb.h" |
22 | | |
23 | | #include "yb/master/master_client.pb.h" |
24 | | #include "yb/master/master_cluster.proxy.h" |
25 | | #include "yb/master/master_defaults.h" |
26 | | #include "yb/master/master_error.h" |
27 | | |
28 | | #include "yb/rpc/rpc_controller.h" |
29 | | |
30 | | #include "yb/util/countdown_latch.h" |
31 | | #include "yb/util/net/net_util.h" |
32 | | #include "yb/util/result.h" |
33 | | #include "yb/util/status_format.h" |
34 | | |
35 | | namespace yb { |
36 | | namespace master { |
37 | | |
38 | | namespace { |
39 | | |
40 | | struct GetMasterRegistrationData { |
41 | | GetMasterRegistrationRequestPB req; |
42 | | GetMasterRegistrationResponsePB resp; |
43 | | rpc::RpcController controller; |
44 | | MasterClusterProxy proxy; |
45 | | |
46 | | GetMasterRegistrationData(rpc::ProxyCache* proxy_cache, const HostPort& hp) |
47 | 451 | : proxy(proxy_cache, hp) {} |
48 | | }; |
49 | | |
50 | | } // namespace |
51 | | |
52 | | Status GetMasterEntryForHosts(rpc::ProxyCache* proxy_cache, |
53 | | const std::vector<HostPort>& hostports, |
54 | | MonoDelta timeout, |
55 | 445 | ServerEntryPB* e) { |
56 | 445 | CHECK(!hostports.empty()); |
57 | | |
58 | 445 | boost::container::stable_vector<GetMasterRegistrationData> datas; |
59 | 445 | datas.reserve(hostports.size()); |
60 | 445 | std::atomic<GetMasterRegistrationData*> last_data{nullptr}; |
61 | 445 | CountDownLatch latch(hostports.size()); |
62 | 896 | for (size_t i = 0; i != hostports.size(); ++i451 ) { |
63 | 451 | datas.emplace_back(proxy_cache, hostports[i]); |
64 | 451 | auto& data = datas.back(); |
65 | 451 | data.controller.set_timeout(timeout); |
66 | 451 | data.proxy.GetMasterRegistrationAsync( |
67 | 451 | data.req, &data.resp, &data.controller, |
68 | 451 | [&data, &latch, &last_data] { |
69 | 451 | last_data.store(&data, std::memory_order_release); |
70 | 451 | latch.CountDown(); |
71 | 451 | }); |
72 | 451 | } |
73 | | |
74 | 445 | latch.Wait(); |
75 | | |
76 | 445 | for (const auto& data : datas) { |
77 | 445 | if (!data.controller.status().ok() || data.resp.has_error()440 ) { |
78 | 5 | continue; |
79 | 5 | } |
80 | 440 | e->mutable_instance_id()->CopyFrom(data.resp.instance_id()); |
81 | 440 | e->mutable_registration()->CopyFrom(data.resp.registration()); |
82 | 440 | e->set_role(data.resp.role()); |
83 | 440 | return Status::OK(); |
84 | 445 | } |
85 | | |
86 | 5 | auto last_data_value = last_data.load(std::memory_order_acquire); |
87 | 5 | if (last_data_value->controller.status().ok()) { |
88 | 0 | return StatusFromPB(last_data_value->resp.error().status()); |
89 | 5 | } else { |
90 | 5 | return last_data_value->controller.status(); |
91 | 5 | } |
92 | 5 | } |
93 | | |
94 | 371k | const HostPortPB& DesiredHostPort(const TSInfoPB& ts_info, const CloudInfoPB& from) { |
95 | 371k | return DesiredHostPort(ts_info.broadcast_addresses(), ts_info.private_rpc_addresses(), |
96 | 371k | ts_info.cloud_info(), from); |
97 | 371k | } |
98 | | |
99 | 0 | void TakeRegistration(consensus::RaftPeerPB* source, TSInfoPB* dest) { |
100 | 0 | dest->mutable_private_rpc_addresses()->Swap(source->mutable_last_known_private_addr()); |
101 | 0 | dest->mutable_broadcast_addresses()->Swap(source->mutable_last_known_broadcast_addr()); |
102 | 0 | dest->mutable_cloud_info()->Swap(source->mutable_cloud_info()); |
103 | 0 | } |
104 | | |
105 | 559k | void CopyRegistration(const consensus::RaftPeerPB& source, TSInfoPB* dest) { |
106 | 559k | *dest->mutable_private_rpc_addresses() = source.last_known_private_addr(); |
107 | 559k | *dest->mutable_broadcast_addresses() = source.last_known_broadcast_addr(); |
108 | 559k | *dest->mutable_cloud_info() = source.cloud_info(); |
109 | 559k | } |
110 | | |
111 | 0 | void TakeRegistration(ServerRegistrationPB* source, TSInfoPB* dest) { |
112 | 0 | dest->mutable_private_rpc_addresses()->Swap(source->mutable_private_rpc_addresses()); |
113 | 0 | dest->mutable_broadcast_addresses()->Swap(source->mutable_broadcast_addresses()); |
114 | 0 | dest->mutable_cloud_info()->Swap(source->mutable_cloud_info()); |
115 | 0 | } |
116 | | |
117 | 853k | void CopyRegistration(const ServerRegistrationPB& source, TSInfoPB* dest) { |
118 | 853k | dest->mutable_private_rpc_addresses()->CopyFrom(source.private_rpc_addresses()); |
119 | 853k | dest->mutable_broadcast_addresses()->CopyFrom(source.broadcast_addresses()); |
120 | 853k | dest->mutable_cloud_info()->CopyFrom(source.cloud_info()); |
121 | 853k | } |
122 | | |
123 | 9.89M | bool IsSystemNamespace(const std::string& namespace_name) { |
124 | 9.89M | return namespace_name == master::kSystemNamespaceName || |
125 | 9.89M | namespace_name == master::kSystemAuthNamespaceName9.56M || |
126 | 9.89M | namespace_name == master::kSystemDistributedNamespaceName9.55M || |
127 | 9.89M | namespace_name == master::kSystemSchemaNamespaceName9.55M || |
128 | 9.89M | namespace_name == master::kSystemTracesNamespaceName9.32M ; |
129 | 9.89M | } |
130 | | |
131 | 35.4k | YQLDatabase GetDefaultDatabaseType(const std::string& keyspace_name) { |
132 | 35.4k | return keyspace_name == common::kRedisKeyspaceName ? YQLDatabase::YQL_DATABASE_REDIS2.38k |
133 | 35.4k | : YQLDatabase::YQL_DATABASE_CQL33.0k ; |
134 | 35.4k | } |
135 | | |
136 | 63.0k | YQLDatabase GetDatabaseTypeForTable(const TableType table_type) { |
137 | 63.0k | switch (table_type) { |
138 | 21.4k | case TableType::YQL_TABLE_TYPE: |
139 | 21.4k | return YQLDatabase::YQL_DATABASE_CQL; |
140 | 566 | case TableType::REDIS_TABLE_TYPE: |
141 | 566 | return YQLDatabase::YQL_DATABASE_REDIS; |
142 | 39.6k | case TableType::PGSQL_TABLE_TYPE: |
143 | 39.6k | return YQLDatabase::YQL_DATABASE_PGSQL; |
144 | 1.34k | case TableType::TRANSACTION_STATUS_TABLE_TYPE: |
145 | | // Transactions status table is created in "system" keyspace in CQL. |
146 | 1.34k | return YQLDatabase::YQL_DATABASE_CQL; |
147 | 63.0k | } |
148 | 0 | return YQL_DATABASE_UNKNOWN; |
149 | 63.0k | } |
150 | | |
151 | 17 | TableType GetTableTypeForDatabase(const YQLDatabase database_type) { |
152 | 17 | switch (database_type) { |
153 | 1 | case YQLDatabase::YQL_DATABASE_CQL: |
154 | 1 | return TableType::YQL_TABLE_TYPE; |
155 | 0 | case YQLDatabase::YQL_DATABASE_REDIS: |
156 | 0 | return TableType::REDIS_TABLE_TYPE; |
157 | 16 | case YQLDatabase::YQL_DATABASE_PGSQL: |
158 | 16 | return TableType::PGSQL_TABLE_TYPE; |
159 | 0 | default: |
160 | 0 | DCHECK_EQ(database_type, YQLDatabase::YQL_DATABASE_UNKNOWN); |
161 | 0 | return TableType::DEFAULT_TABLE_TYPE; |
162 | 17 | } |
163 | 17 | } |
164 | | |
165 | | Result<bool> NamespaceMatchesIdentifier( |
166 | | const NamespaceId& namespace_id, YQLDatabase db_type, const NamespaceName& namespace_name, |
167 | 8.49k | const NamespaceIdentifierPB& ns_identifier) { |
168 | 8.49k | if (ns_identifier.has_id()) { |
169 | 8.48k | return namespace_id == ns_identifier.id(); |
170 | 8.48k | } |
171 | 6 | if (ns_identifier.has_database_type() && ns_identifier.database_type() != db_type4 ) { |
172 | 2 | return false; |
173 | 2 | } |
174 | 4 | if (ns_identifier.has_name()) { |
175 | 4 | return namespace_name == ns_identifier.name(); |
176 | 4 | } |
177 | 0 | return STATUS_FORMAT( |
178 | 4 | InvalidArgument, "Wrong namespace identifier format: $0", ns_identifier); |
179 | 4 | } |
180 | | |
181 | | Result<bool> TableMatchesIdentifier( |
182 | 8.37k | const TableId& id, const SysTablesEntryPB& table, const TableIdentifierPB& table_identifier) { |
183 | 8.37k | if (table_identifier.has_table_id()) { |
184 | 2 | return id == table_identifier.table_id(); |
185 | 2 | } |
186 | 8.37k | if (!table_identifier.table_name().empty() && table_identifier.table_name() != table.name()16 ) { |
187 | 8 | return false; |
188 | 8 | } |
189 | 8.36k | if (table_identifier.has_namespace_()) { |
190 | 8.36k | return NamespaceMatchesIdentifier( |
191 | 8.36k | table.namespace_id(), master::GetDatabaseTypeForTable(table.table_type()), |
192 | 8.36k | table.namespace_name(), table_identifier.namespace_()); |
193 | 8.36k | } |
194 | 0 | return STATUS_FORMAT( |
195 | 8.36k | InvalidArgument, "Wrong table identifier format: $0", table_identifier); |
196 | 8.36k | } |
197 | | |
198 | 410 | CHECKED_STATUS SetupError(MasterErrorPB* error, const Status& s) { |
199 | 410 | StatusToPB(s, error->mutable_status()); |
200 | 410 | error->set_code(MasterError::ValueFromStatus(s).get_value_or(MasterErrorPB::UNKNOWN_ERROR)); |
201 | 410 | return s; |
202 | 410 | } |
203 | | |
204 | | } // namespace master |
205 | | } // namespace yb |