/Users/deen/code/yugabyte-db/src/yb/master/yql_partitions_vtable.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/yql_partitions_vtable.h" |
15 | | |
16 | | #include "yb/common/ql_type.h" |
17 | | #include "yb/common/ql_value.h" |
18 | | #include "yb/common/schema.h" |
19 | | |
20 | | #include "yb/master/catalog_entity_info.h" |
21 | | #include "yb/master/catalog_manager_if.h" |
22 | | #include "yb/master/master.h" |
23 | | #include "yb/master/master_client.pb.h" |
24 | | #include "yb/master/master_util.h" |
25 | | |
26 | | #include "yb/rpc/messenger.h" |
27 | | |
28 | | #include "yb/util/net/dns_resolver.h" |
29 | | #include "yb/util/pb_util.h" |
30 | | #include "yb/util/result.h" |
31 | | #include "yb/util/status_log.h" |
32 | | |
// Refresh-interval flag declared here (presumably defined in another file -- TODO confirm).
// GeneratePartitionsVTableWithBgTask() in this file checks whether it is greater than zero.
33 | | DECLARE_int32(partitions_vtable_cache_refresh_secs); |
34 | | |
// Read by GenerateAndCacheData(): both of its "cache is up to date" version comparisons are
// only attempted when this flag is true.
35 | | DEFINE_bool(use_cache_for_partitions_vtable, true, |
36 | | "Whether we should use caching for system.partitions table."); |
37 | | |
// Returned verbatim by GeneratePartitionsVTableOnChanges(); when true it also makes
// GeneratePartitionsVTableWithBgTask() return false.
38 | | DEFINE_bool(generate_partitions_vtable_on_changes, true, |
39 | | "Whether we should generate the system.partitions vtable whenever relevant partition " |
40 | | "changes occur."); |
41 | | |
42 | | namespace yb { |
43 | | namespace master { |
44 | | |
namespace {

// Column names of the system.partitions virtual table. CreateSchema() registers them and
// InsertTabletIntoRowUnlocked() uses them to fill each row.
const std::string kKeyspaceName{"keyspace_name"};
const std::string kTableName{"table_name"};
const std::string kStartKey{"start_key"};
const std::string kEndKey{"end_key"};
const std::string kId{"id"};
const std::string kReplicaAddresses{"replica_addresses"};

}  // namespace
55 | | |
56 | 35.2k | bool YQLPartitionsVTable::GeneratePartitionsVTableWithBgTask() { |
57 | 35.2k | return FLAGS_partitions_vtable_cache_refresh_secs > 0 && |
58 | 334 | !FLAGS_generate_partitions_vtable_on_changes; |
59 | 35.2k | } |
60 | | |
61 | 311k | bool YQLPartitionsVTable::GeneratePartitionsVTableOnChanges() { |
62 | 311k | return FLAGS_generate_partitions_vtable_on_changes; |
63 | 311k | } |
64 | | |
65 | | YQLPartitionsVTable::YQLPartitionsVTable(const TableName& table_name, |
66 | | const NamespaceName& namespace_name, |
67 | | Master * const master) |
68 | 2.00k | : YQLVirtualTable(table_name, namespace_name, master, CreateSchema()) { |
69 | 2.00k | } |
70 | | |
71 | | Result<std::shared_ptr<QLRowBlock>> YQLPartitionsVTable::RetrieveData( |
72 | 14.2k | const QLReadRequestPB& request) const { |
73 | 14.2k | if (GeneratePartitionsVTableWithBgTask()) { |
74 | 105 | SharedLock<std::shared_timed_mutex> read_lock(mutex_); |
75 | | // The cached versions are initialized to -1, so if there is a race, we may still generate the |
76 | | // cache on the calling thread. |
77 | 105 | if (cached_tablets_version_ >= 0 && cached_tablet_locations_version_ >= 0) { |
78 | | // Don't need a version match here, since we have a bg task handling cache refreshing. |
79 | 105 | return cache_; |
80 | 105 | } |
81 | 14.1k | } else if (GeneratePartitionsVTableOnChanges()) { |
82 | 14.1k | bool require_full_vtable_reset = false; |
83 | 14.1k | { |
84 | 14.1k | SharedLock<std::shared_timed_mutex> read_lock(mutex_); |
85 | | // If we don't need to update the cache, then a read lock is enough. |
86 | 14.1k | if (!update_cache_) { |
87 | 11.3k | return cache_; |
88 | 11.3k | } |
89 | | // If we have just reset the table, then we need to do regenerate the entire vtable. |
90 | 2.80k | require_full_vtable_reset = cached_tablets_version_ == kInvalidCache || |
91 | 2.80k | cached_tablet_locations_version_ == kInvalidCache; |
92 | 2.80k | } |
93 | 2.80k | if (!require_full_vtable_reset) { |
94 | 2.80k | std::lock_guard<std::shared_timed_mutex> lock(mutex_); |
95 | | // If we don't need to regenerate the entire vtable, then we can just update it using the map. |
96 | 2.80k | return GetTableFromMap(); |
97 | 2.80k | } |
98 | 1 | } |
99 | | |
100 | 1 | return GenerateAndCacheData(); |
101 | 1 | } |
102 | | |
103 | 2.01k | Result<std::shared_ptr<QLRowBlock>> YQLPartitionsVTable::GenerateAndCacheData() const { |
104 | 2.01k | auto* catalog_manager = &this->catalog_manager(); |
105 | 2.01k | { |
106 | 2.01k | SharedLock<std::shared_timed_mutex> read_lock(mutex_); |
107 | 2.01k | if (FLAGS_use_cache_for_partitions_vtable && |
108 | 2.01k | catalog_manager->tablets_version() == cached_tablets_version_ && |
109 | 1 | catalog_manager->tablet_locations_version() == cached_tablet_locations_version_ && |
110 | 1 | !update_cache_) { |
111 | | // Cache is up to date, so we could use it. |
112 | 1 | return cache_; |
113 | 1 | } |
114 | 2.00k | } |
115 | | |
116 | 2.00k | std::lock_guard<std::shared_timed_mutex> lock(mutex_); |
117 | 2.00k | auto new_tablets_version = catalog_manager->tablets_version(); |
118 | 2.00k | auto new_tablet_locations_version = catalog_manager->tablet_locations_version(); |
119 | 2.00k | { |
120 | 2.00k | if (FLAGS_use_cache_for_partitions_vtable && |
121 | 2.00k | new_tablets_version == cached_tablets_version_ && |
122 | 0 | new_tablet_locations_version == cached_tablet_locations_version_) { |
123 | | // Cache was updated between locks, and now it is up to date. |
124 | 0 | return GetTableFromMap(); |
125 | 0 | } |
126 | 2.00k | } |
127 | | |
128 | 2.00k | if (GeneratePartitionsVTableOnChanges() && |
129 | 2.00k | cached_tablets_version_ >= 0 && |
130 | 0 | cached_tablet_locations_version_ >= 0) { |
131 | | // Only need to generate on first call, all later calls can just return here (need write lock |
132 | | // in case of update_cache_). |
133 | 0 | return GetTableFromMap(); |
134 | 0 | } |
135 | | |
136 | | // Fully regenerate the entire vtable. |
137 | 2.00k | table_to_partition_start_to_row_map_.clear(); |
138 | 2.00k | update_cache_ = true; |
139 | | |
140 | 2.00k | auto tables = master_->catalog_manager()->GetTables(GetTablesMode::kVisibleToClient); |
141 | | |
142 | 237k | for (const scoped_refptr<TableInfo>& table : tables) { |
143 | | // Skip non-YQL tables. |
144 | 237k | if (!IsYcqlTable(*table)) { |
145 | 205k | continue; |
146 | 205k | } |
147 | | |
148 | 32.1k | TabletInfos tablet_infos = table->GetTablets(); |
149 | 32.1k | RETURN_NOT_OK(ProcessTablets(tablet_infos)); |
150 | 32.1k | } |
151 | | |
152 | | // Update cache and versions. |
153 | 2.00k | cached_tablets_version_ = new_tablets_version; |
154 | 2.00k | cached_tablet_locations_version_ = new_tablet_locations_version; |
155 | | |
156 | 2.00k | return GetTableFromMap(); |
157 | 2.00k | } |
158 | | |
159 | 49.9k | Status YQLPartitionsVTable::ProcessTablets(const std::vector<TabletInfoPtr>& tablets) const { |
160 | 49.9k | if (tablets.empty()) { |
161 | 0 | return Status::OK(); |
162 | 0 | } |
163 | | |
164 | 49.9k | google::protobuf::Arena arena; |
165 | 49.9k | DnsLookupMap dns_lookups; |
166 | 49.9k | std::vector<TabletData> tablet_data; |
167 | | |
168 | | // Get TabletData for each tablet. |
169 | 50.1k | for (const auto& t : tablets) { |
170 | 50.1k | tablet_data.push_back(VERIFY_RESULT(GetTabletData(t, &dns_lookups, &arena))); |
171 | 50.1k | } |
172 | | |
173 | | // Process all dns_lookups futures at the end. |
174 | 49.9k | std::unordered_map<std::string, InetAddress> dns_results; |
175 | 132k | for (auto& p : dns_lookups) { |
176 | 132k | const auto res = p.second.get(); |
177 | 132k | if (!res.ok()) { |
178 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 30) << "Unable to resolve host: " << res; |
179 | 132k | } else { |
180 | 132k | dns_results.emplace(p.first, InetAddress(res.get())); |
181 | 132k | } |
182 | 132k | } |
183 | | |
184 | 50.1k | for (const auto& data : tablet_data) { |
185 | | // Skip not-found tablets: they might not be running yet or have been deleted. |
186 | 50.1k | if (data.locations->table_id().empty()) { |
187 | 24 | continue; |
188 | 24 | } |
189 | | |
190 | | // QLRow doesn't have default ctor, so need to emplace using std::piecewise_construct. |
191 | 50.0k | auto row = table_to_partition_start_to_row_map_[data.table_id].emplace( |
192 | 50.0k | std::piecewise_construct, |
193 | 50.0k | std::forward_as_tuple(data.locations->partition().partition_key_start()), |
194 | 50.0k | std::forward_as_tuple(std::make_shared<const Schema>(*schema_))); |
195 | 50.0k | RETURN_NOT_OK(InsertTabletIntoRowUnlocked(data, &row.first->second, dns_results)); |
196 | | // Will need to update the cache as the map has been modified. |
197 | 50.0k | update_cache_ = true; |
198 | 50.0k | } |
199 | | |
200 | 49.9k | return Status::OK(); |
201 | 49.9k | } |
202 | | |
203 | | Result<YQLPartitionsVTable::TabletData> YQLPartitionsVTable::GetTabletData( |
204 | | const scoped_refptr<TabletInfo>& tablet, |
205 | | DnsLookupMap* dns_lookups, |
206 | 50.1k | google::protobuf::Arena* arena) const { |
207 | 50.1k | auto data = TabletData { |
208 | 50.1k | .namespace_name = tablet->table()->namespace_name(), |
209 | 50.1k | .table_name = tablet->table()->name(), |
210 | 50.1k | .table_id = tablet->table()->id(), |
211 | 50.1k | .tablet_id = tablet->tablet_id(), |
212 | 50.1k | .locations = google::protobuf::Arena::Create<TabletLocationsPB>(arena), |
213 | 50.1k | }; |
214 | | |
215 | 50.1k | auto s = master_->catalog_manager()->GetTabletLocations(tablet, data.locations); |
216 | 50.1k | if (!s.ok()) { |
217 | 24 | data.locations->Clear(); |
218 | 24 | } |
219 | 137k | for (const auto& replica : data.locations->replicas()) { |
220 | 137k | auto host = DesiredHostPort(replica.ts_info(), CloudInfoPB()).host(); |
221 | 137k | if (dns_lookups->count(host) == 0) { |
222 | 132k | dns_lookups->emplace(host, master_->messenger()->resolver().ResolveFuture(host)); |
223 | 132k | } |
224 | 137k | } |
225 | 50.1k | return data; |
226 | 50.1k | } |
227 | | |
228 | | Status YQLPartitionsVTable::InsertTabletIntoRowUnlocked( |
229 | | const TabletData& tablet, QLRow* row, |
230 | 50.0k | const std::unordered_map<std::string, InetAddress>& dns_results) const { |
231 | 50.0k | RETURN_NOT_OK(SetColumnValue(kKeyspaceName, tablet.namespace_name, row)); |
232 | 50.0k | RETURN_NOT_OK(SetColumnValue(kTableName, tablet.table_name, row)); |
233 | | |
234 | 50.0k | const PartitionPB& partition = tablet.locations->partition(); |
235 | 50.0k | RETURN_NOT_OK(SetColumnValue(kStartKey, partition.partition_key_start(), row)); |
236 | 50.0k | RETURN_NOT_OK(SetColumnValue(kEndKey, partition.partition_key_end(), row)); |
237 | | |
238 | | // Note: tablet id is in host byte order. |
239 | 50.0k | Uuid uuid; |
240 | 50.0k | RETURN_NOT_OK(uuid.FromHexString(tablet.tablet_id)); |
241 | 50.0k | RETURN_NOT_OK(SetColumnValue(kId, uuid, row)); |
242 | | |
243 | | // Get replicas for tablet. |
244 | 50.0k | QLValuePB replica_addresses; |
245 | 50.0k | QLMapValuePB *map_value = replica_addresses.mutable_map_value(); |
246 | 137k | for (const auto& replica : tablet.locations->replicas()) { |
247 | 137k | auto host = DesiredHostPort(replica.ts_info(), CloudInfoPB()).host(); |
248 | | |
249 | | // In case of resolution failure, we may not find the host in dns_results. |
250 | 137k | const auto addr = dns_results.find(host); |
251 | 137k | if (addr != dns_results.end()) { |
252 | 137k | QLValue::set_inetaddress_value(addr->second, map_value->add_keys()); |
253 | 137k | map_value->add_values()->set_string_value(PeerRole_Name(replica.role())); |
254 | 137k | } |
255 | 137k | } |
256 | 50.0k | RETURN_NOT_OK(SetColumnValue(kReplicaAddresses, replica_addresses, row)); |
257 | | |
258 | 50.0k | return Status::OK(); |
259 | 50.0k | } |
260 | | |
261 | 1.55k | void YQLPartitionsVTable::RemoveFromCache(const TableId& table_id) const { |
262 | 1.55k | std::lock_guard<std::shared_timed_mutex> lock(mutex_); |
263 | 1.55k | table_to_partition_start_to_row_map_.erase(table_id); |
264 | | // Need to update the cache as the map has been modified. |
265 | 1.55k | update_cache_ = true; |
266 | 1.55k | } |
267 | | |
268 | 64.4k | bool HasRelevantPbChanges(const SysTabletsEntryPB& old_pb, const SysTabletsEntryPB& new_pb) { |
269 | 64.4k | if (old_pb.has_state() && new_pb.has_state() && old_pb.state() != new_pb.state()) { |
270 | 13.1k | return true; |
271 | 13.1k | } |
272 | 51.2k | if (old_pb.has_partition() && new_pb.has_partition() && |
273 | 51.2k | !pb_util::ArePBsEqual(old_pb.partition(), new_pb.partition(), /* diff_str */ nullptr)) { |
274 | 0 | return true; |
275 | 0 | } |
276 | 51.2k | if (old_pb.has_committed_consensus_state() && new_pb.has_committed_consensus_state() && |
277 | 51.2k | !pb_util::ArePBsEqual(old_pb.committed_consensus_state(), |
278 | 51.2k | new_pb.committed_consensus_state(), |
279 | 4.54k | /* diff_str */ nullptr)) { |
280 | 4.54k | return true; |
281 | 4.54k | } |
282 | 46.7k | return false; |
283 | 46.7k | } |
284 | | |
285 | | Result<std::vector<TabletInfoPtr>> YQLPartitionsVTable::FilterRelevantTablets( |
286 | 259k | const std::vector<TabletInfo*>& mutated_tablets) const { |
287 | 259k | std::vector<TabletInfoPtr> tablets; |
288 | 259k | if (!GeneratePartitionsVTableOnChanges()) { |
289 | 77 | return tablets; |
290 | 77 | } |
291 | | |
292 | 259k | for (const auto& mt : mutated_tablets) { |
293 | 131k | if (!IsYcqlTable(*mt->table())) { |
294 | 66.9k | continue; |
295 | 66.9k | } |
296 | | |
297 | 64.4k | if (HasRelevantPbChanges(mt->old_pb(), mt->new_pb())) { |
298 | 17.7k | tablets.push_back(mt); |
299 | 17.7k | } |
300 | 64.4k | } |
301 | 259k | return tablets; |
302 | 259k | } |
303 | | |
304 | | Status YQLPartitionsVTable::ProcessMutatedTablets( |
305 | | const std::vector<TabletInfoPtr>& mutated_tablets, |
306 | 17.7k | const std::map<TabletId, TabletInfo::WriteLock>& tablet_write_locks) const { |
307 | 17.7k | if (GeneratePartitionsVTableOnChanges() && !mutated_tablets.empty()) { |
308 | 17.7k | std::lock_guard<std::shared_timed_mutex> lock(mutex_); |
309 | 17.7k | RETURN_NOT_OK(ProcessTablets(mutated_tablets)); |
310 | 17.7k | } |
311 | | |
312 | 17.7k | return Status::OK(); |
313 | 17.7k | } |
314 | | |
315 | 5.55k | Result<std::shared_ptr<QLRowBlock>> YQLPartitionsVTable::GetTableFromMap() const { |
316 | 5.55k | if (update_cache_) { |
317 | 5.55k | auto vtable = std::make_shared<QLRowBlock>(*schema_); |
318 | | |
319 | 93.6k | for (const auto& partition_start_to_row_map : table_to_partition_start_to_row_map_) { |
320 | 123k | for (const auto& row : partition_start_to_row_map.second) { |
321 | 123k | RETURN_NOT_OK(vtable->AddRow(row.second)); |
322 | 123k | } |
323 | 93.6k | } |
324 | | |
325 | 5.55k | cache_ = vtable; |
326 | 5.55k | update_cache_ = false; |
327 | 5.55k | } |
328 | | |
329 | 5.55k | return cache_; |
330 | 5.55k | } |
331 | | |
332 | | bool YQLPartitionsVTable::CheckTableIsPresent( |
333 | 1.91k | const TableId& table_id, size_t expected_num_tablets) const { |
334 | 1.91k | SharedLock<std::shared_timed_mutex> read_lock(mutex_); |
335 | 1.91k | auto it = table_to_partition_start_to_row_map_.find(table_id); |
336 | 1.91k | return it != table_to_partition_start_to_row_map_.end() && |
337 | 1.91k | it->second.size() == expected_num_tablets; |
338 | 1.91k | } |
339 | | |
340 | 2.00k | void YQLPartitionsVTable::ResetAndRegenerateCache() const { |
341 | 2.00k | { |
342 | 2.00k | std::lock_guard<std::shared_timed_mutex> lock(mutex_); |
343 | 2.00k | cached_tablets_version_ = kInvalidCache; |
344 | 2.00k | cached_tablet_locations_version_ = kInvalidCache; |
345 | 2.00k | update_cache_ = true; |
346 | 2.00k | } |
347 | 2.00k | WARN_NOT_OK(ResultToStatus(GenerateAndCacheData()), |
348 | 2.00k | "Error while regenerating YQL system.partitions cache"); |
349 | 2.00k | } |
350 | | |
351 | 5.02k | Status YQLPartitionsVTable::UpdateCache() const { |
352 | 5.02k | { |
353 | 5.02k | SharedLock<std::shared_timed_mutex> read_lock(mutex_); |
354 | 5.02k | if (!update_cache_) { |
355 | 4.28k | return Status::OK(); |
356 | 4.28k | } |
357 | 743 | } |
358 | 743 | std::lock_guard<std::shared_timed_mutex> lock(mutex_); |
359 | 743 | return ResultToStatus(GetTableFromMap()); |
360 | 743 | } |
361 | | |
362 | 2.00k | Schema YQLPartitionsVTable::CreateSchema() const { |
363 | 2.00k | SchemaBuilder builder; |
364 | 2.00k | CHECK_OK(builder.AddHashKeyColumn(kKeyspaceName, QLType::Create(DataType::STRING))); |
365 | 2.00k | CHECK_OK(builder.AddKeyColumn(kTableName, QLType::Create(DataType::STRING))); |
366 | 2.00k | CHECK_OK(builder.AddKeyColumn(kStartKey, QLType::Create(DataType::BINARY))); |
367 | 2.00k | CHECK_OK(builder.AddColumn(kEndKey, QLType::Create(DataType::BINARY))); |
368 | 2.00k | CHECK_OK(builder.AddColumn(kId, QLType::Create(DataType::UUID))); |
369 | 2.00k | CHECK_OK(builder.AddColumn(kReplicaAddresses, |
370 | 2.00k | QLType::CreateTypeMap(DataType::INET, DataType::STRING))); |
371 | 2.00k | return builder.Build(); |
372 | 2.00k | } |
373 | | |
374 | | } // namespace master |
375 | | } // namespace yb |