YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/yql_partitions_vtable.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/yql_partitions_vtable.h"
15
16
#include "yb/common/ql_type.h"
17
#include "yb/common/ql_value.h"
18
#include "yb/common/schema.h"
19
20
#include "yb/master/catalog_entity_info.h"
21
#include "yb/master/catalog_manager_if.h"
22
#include "yb/master/master.h"
23
#include "yb/master/master_client.pb.h"
24
#include "yb/master/master_util.h"
25
26
#include "yb/rpc/messenger.h"
27
28
#include "yb/util/net/dns_resolver.h"
29
#include "yb/util/pb_util.h"
30
#include "yb/util/result.h"
31
#include "yb/util/status_log.h"
32
33
DECLARE_int32(partitions_vtable_cache_refresh_secs);

// Gates whether GenerateAndCacheData() may hand back the previously cached row block instead of
// rebuilding system.partitions.
DEFINE_bool(use_cache_for_partitions_vtable, true,
            "Whether we should use caching for system.partitions table.");

// Gates incremental maintenance of the partitions map as tablet mutations are processed
// (see GeneratePartitionsVTableOnChanges()).
DEFINE_bool(generate_partitions_vtable_on_changes, true,
            "Whether we should generate the system.partitions vtable "
            "whenever relevant partition changes occur.");
41
42
namespace yb {
43
namespace master {
44
45
namespace {

// Column names of the system.partitions virtual table, shared by CreateSchema() and the
// row-population code.
const std::string kKeyspaceName{"keyspace_name"};
const std::string kTableName{"table_name"};
const std::string kStartKey{"start_key"};
const std::string kEndKey{"end_key"};
const std::string kId{"id"};
const std::string kReplicaAddresses{"replica_addresses"};

}  // namespace
55
56
35.2k
bool YQLPartitionsVTable::GeneratePartitionsVTableWithBgTask() {
57
35.2k
  return FLAGS_partitions_vtable_cache_refresh_secs > 0 &&
58
334
         !FLAGS_generate_partitions_vtable_on_changes;
59
35.2k
}
60
61
311k
bool YQLPartitionsVTable::GeneratePartitionsVTableOnChanges() {
62
311k
  return FLAGS_generate_partitions_vtable_on_changes;
63
311k
}
64
65
// Builds the system.partitions virtual table; the column layout comes from CreateSchema().
YQLPartitionsVTable::YQLPartitionsVTable(const TableName& table_name,
                                         const NamespaceName& namespace_name,
                                         Master* const master)
    : YQLVirtualTable(table_name, namespace_name, master, CreateSchema()) {}
70
71
// Returns the rows of system.partitions, serving the cached row block whenever the active
// maintenance mode allows it and falling back to GenerateAndCacheData() otherwise.
Result<std::shared_ptr<QLRowBlock>> YQLPartitionsVTable::RetrieveData(
    const QLReadRequestPB& request) const {
  if (GeneratePartitionsVTableWithBgTask()) {
    SharedLock<std::shared_timed_mutex> shared_guard(mutex_);
    // The cached versions start out invalid; until the first generation completes (which may
    // race with this call) the data is built on this thread below.
    const bool cache_initialized =
        cached_tablets_version_ >= 0 && cached_tablet_locations_version_ >= 0;
    if (cache_initialized) {
      // No version comparison needed: a background task keeps the cache fresh.
      return cache_;
    }
  } else if (GeneratePartitionsVTableOnChanges()) {
    bool needs_full_rebuild = false;
    {
      SharedLock<std::shared_timed_mutex> shared_guard(mutex_);
      if (!update_cache_) {
        // Nothing changed since the last rebuild; shared access is sufficient.
        return cache_;
      }
      // After a reset the map is not trustworthy and the whole vtable must be regenerated.
      needs_full_rebuild = cached_tablets_version_ == kInvalidCache ||
                           cached_tablet_locations_version_ == kInvalidCache;
    }
    if (!needs_full_rebuild) {
      // Incremental path: rebuild the row block from the maintained map.
      std::lock_guard<std::shared_timed_mutex> exclusive_guard(mutex_);
      return GetTableFromMap();
    }
  }

  return GenerateAndCacheData();
}
102
103
2.01k
// Produces the system.partitions row block. It reuses the cache when the catalog manager's
// tablet versions match the cached ones, and reuses the incrementally maintained map when
// on-change generation has already populated it. Otherwise it rebuilds the map from every
// YCQL table and records the versions it was built from.
Result<std::shared_ptr<QLRowBlock>> YQLPartitionsVTable::GenerateAndCacheData() const {
  auto* cm = &catalog_manager();

  // Fast path under a shared lock: the cached block is current and no rebuild is pending.
  {
    SharedLock<std::shared_timed_mutex> shared_guard(mutex_);
    const bool cache_current =
        FLAGS_use_cache_for_partitions_vtable &&
        cm->tablets_version() == cached_tablets_version_ &&
        cm->tablet_locations_version() == cached_tablet_locations_version_ &&
        !update_cache_;
    if (cache_current) {
      return cache_;
    }
  }

  std::lock_guard<std::shared_timed_mutex> exclusive_guard(mutex_);
  const auto latest_tablets_version = cm->tablets_version();
  const auto latest_locations_version = cm->tablet_locations_version();

  // Serve from the map if either:
  //  - another caller brought the versions up to date between the two locks, or
  //  - on-change generation already populated the map (only the first call needs to build it;
  //    the exclusive lock is still required because update_cache_ may be set).
  const bool versions_caught_up =
      FLAGS_use_cache_for_partitions_vtable &&
      latest_tablets_version == cached_tablets_version_ &&
      latest_locations_version == cached_tablet_locations_version_;
  if (versions_caught_up ||
      (GeneratePartitionsVTableOnChanges() &&
       cached_tablets_version_ >= 0 &&
       cached_tablet_locations_version_ >= 0)) {
    return GetTableFromMap();
  }

  // Full regeneration of the map from every client-visible YCQL table.
  table_to_partition_start_to_row_map_.clear();
  update_cache_ = true;

  const auto tables = master_->catalog_manager()->GetTables(GetTablesMode::kVisibleToClient);
  for (const scoped_refptr<TableInfo>& table : tables) {
    if (IsYcqlTable(*table)) {
      TabletInfos table_tablets = table->GetTablets();
      RETURN_NOT_OK(ProcessTablets(table_tablets));
    }
  }

  // Remember which catalog versions this map reflects.
  cached_tablets_version_ = latest_tablets_version;
  cached_tablet_locations_version_ = latest_locations_version;

  return GetTableFromMap();
}
158
159
49.9k
// Inserts or refreshes one row per tablet in table_to_partition_start_to_row_map_.
// Expects the caller to hold mutex_ exclusively, since it mutates the map and update_cache_.
//
// All tablet locations are fetched first, with their DNS lookups started asynchronously. Every
// lookup is then awaited once, and finally the rows are populated. Tablets whose locations
// could not be obtained (empty table_id) are skipped. Hosts that fail to resolve are logged
// and simply omitted from the replica_addresses column.
Status YQLPartitionsVTable::ProcessTablets(const std::vector<TabletInfoPtr>& tablets) const {
  if (tablets.empty()) {
    return Status::OK();
  }

  google::protobuf::Arena arena;
  DnsLookupMap dns_lookups;
  std::vector<TabletData> tablet_data;
  tablet_data.reserve(tablets.size());

  // Gather TabletData for each tablet; this also kicks off DNS resolution per distinct host.
  for (const auto& t : tablets) {
    tablet_data.push_back(VERIFY_RESULT(GetTabletData(t, &dns_lookups, &arena)));
  }

  // Wait for all outstanding DNS lookups.
  std::unordered_map<std::string, InetAddress> dns_results;
  for (auto& p : dns_lookups) {
    const auto res = p.second.get();
    if (!res.ok()) {
      YB_LOG_EVERY_N_SECS(WARNING, 30) << "Unable to resolve host " << p.first << ": " << res;
    } else {
      dns_results.emplace(p.first, InetAddress(res.get()));
    }
  }

  // One immutable schema instance is shared by all rows created in this call, and it is only
  // allocated if a new row is actually needed. Previously a full Schema copy was built for every
  // tablet, even when its row already existed and the copy was immediately discarded.
  std::shared_ptr<const Schema> row_schema;
  for (const auto& data : tablet_data) {
    // Skip not-found tablets: they might not be running yet or have been deleted.
    if (data.locations->table_id().empty()) {
      continue;
    }

    auto& rows = table_to_partition_start_to_row_map_[data.table_id];
    const auto& partition_start = data.locations->partition().partition_key_start();
    auto row_it = rows.find(partition_start);
    if (row_it == rows.end()) {
      if (!row_schema) {
        row_schema = std::make_shared<const Schema>(*schema_);
      }
      // QLRow has no default constructor, so construct it in place.
      row_it = rows.emplace(std::piecewise_construct,
                            std::forward_as_tuple(partition_start),
                            std::forward_as_tuple(row_schema)).first;
    }
    RETURN_NOT_OK(InsertTabletIntoRowUnlocked(data, &row_it->second, dns_results));
    // The map changed, so the cached row block must be rebuilt.
    update_cache_ = true;
  }

  return Status::OK();
}
202
203
// Collects the naming information and tablet locations for `tablet`. For every replica host not
// yet present in `dns_lookups`, it starts an asynchronous DNS resolution and stores the future
// there. The locations message is allocated on `arena`.
Result<YQLPartitionsVTable::TabletData> YQLPartitionsVTable::GetTabletData(
    const scoped_refptr<TabletInfo>& tablet,
    DnsLookupMap* dns_lookups,
    google::protobuf::Arena* arena) const {
  auto data = TabletData {
    .namespace_name = tablet->table()->namespace_name(),
    .table_name = tablet->table()->name(),
    .table_id = tablet->table()->id(),
    .tablet_id = tablet->tablet_id(),
    .locations = google::protobuf::Arena::Create<TabletLocationsPB>(arena),
  };

  const auto locations_status =
      master_->catalog_manager()->GetTabletLocations(tablet, data.locations);
  if (!locations_status.ok()) {
    // Leave an empty message; ProcessTablets() skips entries with an empty table_id.
    data.locations->Clear();
  }

  for (const auto& replica : data.locations->replicas()) {
    const auto host = DesiredHostPort(replica.ts_info(), CloudInfoPB()).host();
    if (dns_lookups->find(host) != dns_lookups->end()) {
      continue;  // Resolution for this host was already requested.
    }
    dns_lookups->emplace(host, master_->messenger()->resolver().ResolveFuture(host));
  }
  return data;
}
227
228
// Fills every column of `row` from `tablet`. The replica_addresses map only gets entries for
// replica hosts present in `dns_results`; hosts that failed to resolve are left out. "Unlocked"
// means the caller is responsible for synchronization.
Status YQLPartitionsVTable::InsertTabletIntoRowUnlocked(
    const TabletData& tablet, QLRow* row,
    const std::unordered_map<std::string, InetAddress>& dns_results) const {
  const PartitionPB& partition = tablet.locations->partition();

  RETURN_NOT_OK(SetColumnValue(kKeyspaceName, tablet.namespace_name, row));
  RETURN_NOT_OK(SetColumnValue(kTableName, tablet.table_name, row));
  RETURN_NOT_OK(SetColumnValue(kStartKey, partition.partition_key_start(), row));
  RETURN_NOT_OK(SetColumnValue(kEndKey, partition.partition_key_end(), row));

  // The tablet id is a hex string in host byte order.
  Uuid tablet_uuid;
  RETURN_NOT_OK(tablet_uuid.FromHexString(tablet.tablet_id));
  RETURN_NOT_OK(SetColumnValue(kId, tablet_uuid, row));

  // Build the address -> role map for the tablet's replicas.
  QLValuePB replica_addresses;
  auto* const address_map = replica_addresses.mutable_map_value();
  for (const auto& replica : tablet.locations->replicas()) {
    const auto host = DesiredHostPort(replica.ts_info(), CloudInfoPB()).host();
    const auto resolved = dns_results.find(host);
    if (resolved == dns_results.end()) {
      continue;  // DNS resolution failed for this host.
    }
    QLValue::set_inetaddress_value(resolved->second, address_map->add_keys());
    address_map->add_values()->set_string_value(PeerRole_Name(replica.role()));
  }
  RETURN_NOT_OK(SetColumnValue(kReplicaAddresses, replica_addresses, row));

  return Status::OK();
}
260
261
1.55k
void YQLPartitionsVTable::RemoveFromCache(const TableId& table_id) const {
262
1.55k
  std::lock_guard<std::shared_timed_mutex> lock(mutex_);
263
1.55k
  table_to_partition_start_to_row_map_.erase(table_id);
264
  // Need to update the cache as the map has been modified.
265
1.55k
  update_cache_ = true;
266
1.55k
}
267
268
64.4k
bool HasRelevantPbChanges(const SysTabletsEntryPB& old_pb, const SysTabletsEntryPB& new_pb) {
269
64.4k
  if (old_pb.has_state() && new_pb.has_state() && old_pb.state() != new_pb.state()) {
270
13.1k
    return true;
271
13.1k
  }
272
51.2k
  if (old_pb.has_partition() && new_pb.has_partition() &&
273
51.2k
      !pb_util::ArePBsEqual(old_pb.partition(), new_pb.partition(), /* diff_str */ nullptr)) {
274
0
    return true;
275
0
  }
276
51.2k
  if (old_pb.has_committed_consensus_state() && new_pb.has_committed_consensus_state() &&
277
51.2k
      !pb_util::ArePBsEqual(old_pb.committed_consensus_state(),
278
51.2k
                            new_pb.committed_consensus_state(),
279
4.54k
                            /* diff_str */ nullptr)) {
280
4.54k
    return true;
281
4.54k
  }
282
46.7k
  return false;
283
46.7k
}
284
285
// Selects the mutated YCQL tablets whose metadata changes matter for system.partitions.
// Returns an empty list when on-change generation is disabled.
Result<std::vector<TabletInfoPtr>> YQLPartitionsVTable::FilterRelevantTablets(
    const std::vector<TabletInfo*>& mutated_tablets) const {
  std::vector<TabletInfoPtr> relevant;
  if (GeneratePartitionsVTableOnChanges()) {
    for (TabletInfo* tablet : mutated_tablets) {
      if (IsYcqlTable(*tablet->table()) &&
          HasRelevantPbChanges(tablet->old_pb(), tablet->new_pb())) {
        relevant.emplace_back(tablet);
      }
    }
  }
  return relevant;
}
303
304
// Applies already-filtered tablet mutations to the partitions map under the exclusive lock.
// This is a no-op when on-change generation is disabled or there is nothing to process.
// `tablet_write_locks` is not consulted here.
Status YQLPartitionsVTable::ProcessMutatedTablets(
    const std::vector<TabletInfoPtr>& mutated_tablets,
    const std::map<TabletId, TabletInfo::WriteLock>& tablet_write_locks) const {
  if (!GeneratePartitionsVTableOnChanges() || mutated_tablets.empty()) {
    return Status::OK();
  }
  std::lock_guard<std::shared_timed_mutex> exclusive_guard(mutex_);
  return ProcessTablets(mutated_tablets);
}
314
315
5.55k
// Returns cache_. If update_cache_ is set, it first rebuilds cache_ from every row in
// table_to_partition_start_to_row_map_ and clears the flag. Every caller in this file holds
// mutex_ exclusively around this call, since it writes cache_ and update_cache_.
Result<std::shared_ptr<QLRowBlock>> YQLPartitionsVTable::GetTableFromMap() const {
  if (!update_cache_) {
    return cache_;
  }

  auto rebuilt = std::make_shared<QLRowBlock>(*schema_);
  for (const auto& table_entry : table_to_partition_start_to_row_map_) {
    for (const auto& partition_entry : table_entry.second) {
      RETURN_NOT_OK(rebuilt->AddRow(partition_entry.second));
    }
  }

  cache_ = std::move(rebuilt);
  update_cache_ = false;
  return cache_;
}
331
332
bool YQLPartitionsVTable::CheckTableIsPresent(
333
1.91k
    const TableId& table_id, size_t expected_num_tablets) const {
334
1.91k
  SharedLock<std::shared_timed_mutex> read_lock(mutex_);
335
1.91k
  auto it = table_to_partition_start_to_row_map_.find(table_id);
336
1.91k
  return it != table_to_partition_start_to_row_map_.end() &&
337
1.91k
         it->second.size() == expected_num_tablets;
338
1.91k
}
339
340
2.00k
void YQLPartitionsVTable::ResetAndRegenerateCache() const {
341
2.00k
  {
342
2.00k
    std::lock_guard<std::shared_timed_mutex> lock(mutex_);
343
2.00k
    cached_tablets_version_ = kInvalidCache;
344
2.00k
    cached_tablet_locations_version_ = kInvalidCache;
345
2.00k
    update_cache_ = true;
346
2.00k
  }
347
2.00k
  WARN_NOT_OK(ResultToStatus(GenerateAndCacheData()),
348
2.00k
              "Error while regenerating YQL system.partitions cache");
349
2.00k
}
350
351
5.02k
// Rebuilds the cached row block from the map if a rebuild has been requested.
// It checks the flag under a shared lock first and takes the exclusive lock only when needed.
Status YQLPartitionsVTable::UpdateCache() const {
  bool rebuild_needed = false;
  {
    SharedLock<std::shared_timed_mutex> shared_guard(mutex_);
    rebuild_needed = update_cache_;
  }
  if (!rebuild_needed) {
    return Status::OK();
  }
  std::lock_guard<std::shared_timed_mutex> exclusive_guard(mutex_);
  // GetTableFromMap() re-checks update_cache_, so a concurrent rebuild is harmless.
  return ResultToStatus(GetTableFromMap());
}
361
362
2.00k
// Describes the system.partitions columns. keyspace_name is added as the hash key column;
// table_name and start_key are added as further key columns; end_key, id and replica_addresses
// are regular columns. Column order is significant and must not change.
Schema YQLPartitionsVTable::CreateSchema() const {
  SchemaBuilder schema_builder;

  // Key columns.
  CHECK_OK(schema_builder.AddHashKeyColumn(kKeyspaceName, QLType::Create(DataType::STRING)));
  CHECK_OK(schema_builder.AddKeyColumn(kTableName, QLType::Create(DataType::STRING)));
  CHECK_OK(schema_builder.AddKeyColumn(kStartKey, QLType::Create(DataType::BINARY)));

  // Regular columns.
  CHECK_OK(schema_builder.AddColumn(kEndKey, QLType::Create(DataType::BINARY)));
  CHECK_OK(schema_builder.AddColumn(kId, QLType::Create(DataType::UUID)));
  const auto replica_map_type = QLType::CreateTypeMap(DataType::INET, DataType::STRING);
  CHECK_OK(schema_builder.AddColumn(kReplicaAddresses, replica_map_type));

  return schema_builder.Build();
}
373
374
}  // namespace master
375
}  // namespace yb