YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/yql_partitions_vtable.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/yql_partitions_vtable.h"
15
16
#include "yb/common/ql_type.h"
17
#include "yb/common/ql_value.h"
18
#include "yb/common/schema.h"
19
20
#include "yb/master/catalog_entity_info.h"
21
#include "yb/master/catalog_manager_if.h"
22
#include "yb/master/master.h"
23
#include "yb/master/master_client.pb.h"
24
#include "yb/master/master_util.h"
25
26
#include "yb/rpc/messenger.h"
27
28
#include "yb/util/net/dns_resolver.h"
29
#include "yb/util/pb_util.h"
30
#include "yb/util/result.h"
31
#include "yb/util/status_log.h"
32
33
// Refresh period flag; it is defined in another translation unit and only read here.
DECLARE_int32(partitions_vtable_cache_refresh_secs);

// When true, GenerateAndCacheData may return the cached row block if the cached versions still
// match the catalog manager's versions.
DEFINE_bool(use_cache_for_partitions_vtable, true,
            "Whether we should use caching for system.partitions table.");

// When true, the row map is updated as tablets are mutated; this also disables the
// background-task mode (see GeneratePartitionsVTableWithBgTask).
DEFINE_bool(generate_partitions_vtable_on_changes, true,
            "Whether we should generate the system.partitions vtable "
            "whenever relevant partition changes occur.");
41
42
namespace yb {
43
namespace master {
44
45
namespace {

// Column names of the system.partitions virtual table, used both when building the schema
// (CreateSchema) and when filling rows (InsertTabletIntoRowUnlocked).
const std::string kKeyspaceName{"keyspace_name"};
const std::string kTableName{"table_name"};
const std::string kStartKey{"start_key"};
const std::string kEndKey{"end_key"};
const std::string kId{"id"};
const std::string kReplicaAddresses{"replica_addresses"};

}  // namespace
55
56
414k
bool YQLPartitionsVTable::GeneratePartitionsVTableWithBgTask() {
57
414k
  return FLAGS_partitions_vtable_cache_refresh_secs > 0 &&
58
414k
         
!FLAGS_generate_partitions_vtable_on_changes386
;
59
414k
}
60
61
741k
bool YQLPartitionsVTable::GeneratePartitionsVTableOnChanges() {
62
741k
  return FLAGS_generate_partitions_vtable_on_changes;
63
741k
}
64
65
// Constructs the virtual table by forwarding the names and the master to the YQLVirtualTable base,
// together with the schema produced by CreateSchema().
YQLPartitionsVTable::YQLPartitionsVTable(
    const TableName& table_name,
    const NamespaceName& namespace_name,
    Master* const master)
    : YQLVirtualTable(table_name, namespace_name, master, CreateSchema()) {}
70
71
// Returns the row block backing system.partitions.
//
// The `request` argument is not read by this function. Depending on the flags, one of three paths
// is taken:
//  * background-task mode: return the cached block as soon as both cached versions are
//    non-negative;
//  * generate-on-changes mode: return the cached block when it is not marked stale, or rebuild it
//    from the row map when it is stale but the versions are valid;
//  * otherwise (or when the paths above could not answer): fall back to GenerateAndCacheData().
Result<std::shared_ptr<QLRowBlock>> YQLPartitionsVTable::RetrieveData(
    const QLReadRequestPB& request) const {
  if (GeneratePartitionsVTableWithBgTask()) {
    SharedLock<std::shared_timed_mutex> shared_guard(mutex_);
    // The cached versions are initialized to -1, so if there is a race, we may still generate the
    // cache on the calling thread.
    const bool cache_initialized =
        cached_tablets_version_ >= 0 && cached_tablet_locations_version_ >= 0;
    if (cache_initialized) {
      // No version match is required here: the background task keeps the cache refreshed.
      return cache_;
    }
  } else if (GeneratePartitionsVTableOnChanges()) {
    bool needs_full_rebuild = false;
    {
      SharedLock<std::shared_timed_mutex> shared_guard(mutex_);
      // A shared lock is sufficient as long as the cache does not have to be rewritten.
      if (!update_cache_) {
        return cache_;
      }
      // If the table has just been reset, the entire vtable has to be regenerated.
      needs_full_rebuild = cached_tablets_version_ == kInvalidCache ||
                           cached_tablet_locations_version_ == kInvalidCache;
    }
    if (!needs_full_rebuild) {
      // The row map is already current; only the cached block needs to be rebuilt from it, which
      // writes to the cache and therefore takes the exclusive lock.
      std::lock_guard<std::shared_timed_mutex> exclusive_guard(mutex_);
      return GetTableFromMap();
    }
  }

  return GenerateAndCacheData();
}
102
103
3.00k
// Returns the cached row block, regenerating the row map from the catalog manager's tables when
// the cache cannot be reused.
//
// The check is done twice: first under a shared lock (fast path), then again under the exclusive
// lock because another thread may have refreshed the cache in between. On a full regeneration the
// row map is cleared, every YCQL table's tablets are processed, and the cached versions are set to
// the values read right after the exclusive lock was taken.
//
// Returns the first non-OK status from ProcessTablets or GetTableFromMap, if any.
Result<std::shared_ptr<QLRowBlock>> YQLPartitionsVTable::GenerateAndCacheData() const {
  auto* catalog_manager = &this->catalog_manager();
  {
    SharedLock<std::shared_timed_mutex> shared_guard(mutex_);
    const bool cache_is_current =
        FLAGS_use_cache_for_partitions_vtable &&
        catalog_manager->tablets_version() == cached_tablets_version_ &&
        catalog_manager->tablet_locations_version() == cached_tablet_locations_version_ &&
        !update_cache_;
    if (cache_is_current) {
      // Cache is up to date, so we could use it.
      return cache_;
    }
  }

  std::lock_guard<std::shared_timed_mutex> exclusive_guard(mutex_);
  const auto latest_tablets_version = catalog_manager->tablets_version();
  const auto latest_locations_version = catalog_manager->tablet_locations_version();
  if (FLAGS_use_cache_for_partitions_vtable &&
      latest_tablets_version == cached_tablets_version_ &&
      latest_locations_version == cached_tablet_locations_version_) {
    // Cache was updated between locks, and now it is up to date.
    return GetTableFromMap();
  }

  const bool already_generated =
      GeneratePartitionsVTableOnChanges() &&
      cached_tablets_version_ >= 0 &&
      cached_tablet_locations_version_ >= 0;
  if (already_generated) {
    // Only need to generate on first call, all later calls can just return here (need write lock
    // in case of update_cache_).
    return GetTableFromMap();
  }

  // Fully regenerate the entire vtable.
  table_to_partition_start_to_row_map_.clear();
  update_cache_ = true;

  auto tables = master_->catalog_manager()->GetTables(GetTablesMode::kVisibleToClient);
  for (const auto& table_info : tables) {
    // Only YQL tables contribute rows.
    if (IsYcqlTable(*table_info)) {
      const TabletInfos tablets_of_table = table_info->GetTablets();
      RETURN_NOT_OK(ProcessTablets(tablets_of_table));
    }
  }

  // Update cache and versions.
  cached_tablets_version_ = latest_tablets_version;
  cached_tablet_locations_version_ = latest_locations_version;

  return GetTableFromMap();
}
158
159
69.2k
// Refreshes the rows of table_to_partition_start_to_row_map_ for the given tablets.
//
// For every tablet the locations are fetched and DNS lookups for the replica hosts are started
// (GetTabletData). All lookups are then awaited in a single pass, and each tablet is written into
// the row keyed by its table id and partition start key. Tablets whose locations carry an empty
// table id are skipped (GetTabletData clears the locations when the lookup fails). Hosts that fail
// to resolve are logged and omitted from the DNS results.
//
// This function does not take mutex_ itself; the callers visible in this file hold the exclusive
// lock around the call.
//
// Returns the first non-OK status from GetTabletData or InsertTabletIntoRowUnlocked, else OK.
Status YQLPartitionsVTable::ProcessTablets(const std::vector<TabletInfoPtr>& tablets) const {
  if (tablets.empty()) {
    return Status::OK();
  }

  google::protobuf::Arena arena;
  DnsLookupMap dns_lookups;
  std::vector<TabletData> tablet_data;
  // Exactly one entry is appended per tablet, so allocate once up front.
  tablet_data.reserve(tablets.size());

  // Get TabletData for each tablet.
  for (const auto& t : tablets) {
    tablet_data.push_back(VERIFY_RESULT(GetTabletData(t, &dns_lookups, &arena)));
  }

  // Process all dns_lookups futures at the end.
  std::unordered_map<std::string, InetAddress> dns_results;
  for (auto& p : dns_lookups) {
    const auto res = p.second.get();
    if (!res.ok()) {
      YB_LOG_EVERY_N_SECS(WARNING, 30) << "Unable to resolve host: " << res;
    } else {
      dns_results.emplace(p.first, InetAddress(res.get()));
    }
  }

  // Rows only hold the schema as pointer-to-const, so a single immutable copy can be shared by
  // every row touched in this call. Creating it inside the loop would deep-copy the schema once
  // per tablet, including for rows that already exist (where the copy is simply discarded).
  const auto row_schema = std::make_shared<const Schema>(*schema_);

  for (const auto& data : tablet_data) {
    // Skip not-found tablets: they might not be running yet or have been deleted.
    if (data.locations->table_id().empty()) {
      continue;
    }

    // QLRow doesn't have default ctor, so need to emplace using std::piecewise_construct.
    auto row = table_to_partition_start_to_row_map_[data.table_id].emplace(
        std::piecewise_construct,
        std::forward_as_tuple(data.locations->partition().partition_key_start()),
        std::forward_as_tuple(row_schema));
    RETURN_NOT_OK(InsertTabletIntoRowUnlocked(data, &row.first->second, dns_results));
    // Will need to update the cache as the map has been modified.
    update_cache_ = true;
  }

  return Status::OK();
}
202
203
// Collects the data needed to build the row for one tablet and starts DNS resolution for its
// replica hosts.
//
// The TabletLocationsPB is allocated on `arena`. If fetching the locations fails, the message is
// cleared, which leaves it without replicas. For every replica host not yet present in
// `dns_lookups`, a resolve future is started and stored there; hosts already present are not
// resolved again.
Result<YQLPartitionsVTable::TabletData> YQLPartitionsVTable::GetTabletData(
    const scoped_refptr<TabletInfo>& tablet,
    DnsLookupMap* dns_lookups,
    google::protobuf::Arena* arena) const {
  auto tablet_data = TabletData {
    .namespace_name = tablet->table()->namespace_name(),
    .table_name = tablet->table()->name(),
    .table_id = tablet->table()->id(),
    .tablet_id = tablet->tablet_id(),
    .locations = google::protobuf::Arena::Create<TabletLocationsPB>(arena),
  };

  const auto locations_status =
      master_->catalog_manager()->GetTabletLocations(tablet, tablet_data.locations);
  if (!locations_status.ok()) {
    // Drop whatever was filled in before the failure.
    tablet_data.locations->Clear();
  }

  for (const auto& replica : tablet_data.locations->replicas()) {
    auto replica_host = DesiredHostPort(replica.ts_info(), CloudInfoPB()).host();
    if (dns_lookups->count(replica_host) != 0) {
      // A lookup for this host has already been started.
      continue;
    }
    dns_lookups->emplace(
        replica_host, master_->messenger()->resolver().ResolveFuture(replica_host));
  }
  return tablet_data;
}
227
228
// Fills all columns of `row` from `tablet`: keyspace and table name, partition start and end
// keys, the tablet id as a UUID, and a map from each replica's resolved address to its role name.
//
// Replicas whose host is absent from `dns_results` are left out of the map. Does not lock mutex_
// itself. Returns the first non-OK status from setting a column or parsing the tablet id.
Status YQLPartitionsVTable::InsertTabletIntoRowUnlocked(
    const TabletData& tablet, QLRow* row,
    const std::unordered_map<std::string, InetAddress>& dns_results) const {
  RETURN_NOT_OK(SetColumnValue(kKeyspaceName, tablet.namespace_name, row));
  RETURN_NOT_OK(SetColumnValue(kTableName, tablet.table_name, row));

  const PartitionPB& partition_pb = tablet.locations->partition();
  RETURN_NOT_OK(SetColumnValue(kStartKey, partition_pb.partition_key_start(), row));
  RETURN_NOT_OK(SetColumnValue(kEndKey, partition_pb.partition_key_end(), row));

  // Note: tablet id is in host byte order.
  Uuid tablet_uuid;
  RETURN_NOT_OK(tablet_uuid.FromHexString(tablet.tablet_id));
  RETURN_NOT_OK(SetColumnValue(kId, tablet_uuid, row));

  // Build the address -> role map for the tablet's replicas.
  QLValuePB replica_addresses;
  QLMapValuePB* address_to_role = replica_addresses.mutable_map_value();
  for (const auto& replica : tablet.locations->replicas()) {
    auto replica_host = DesiredHostPort(replica.ts_info(), CloudInfoPB()).host();

    // A host whose resolution failed has no entry in dns_results; such replicas are skipped.
    const auto resolved = dns_results.find(replica_host);
    if (resolved == dns_results.end()) {
      continue;
    }
    QLValue::set_inetaddress_value(resolved->second, address_to_role->add_keys());
    address_to_role->add_values()->set_string_value(PeerRole_Name(replica.role()));
  }
  RETURN_NOT_OK(SetColumnValue(kReplicaAddresses, replica_addresses, row));

  return Status::OK();
}
260
261
1.63k
void YQLPartitionsVTable::RemoveFromCache(const TableId& table_id) const {
262
1.63k
  std::lock_guard<std::shared_timed_mutex> lock(mutex_);
263
1.63k
  table_to_partition_start_to_row_map_.erase(table_id);
264
  // Need to update the cache as the map has been modified.
265
1.63k
  update_cache_ = true;
266
1.63k
}
267
268
70.1k
bool HasRelevantPbChanges(const SysTabletsEntryPB& old_pb, const SysTabletsEntryPB& new_pb) {
269
70.1k
  if (old_pb.has_state() && 
new_pb.has_state()70.1k
&&
old_pb.state() != new_pb.state()70.1k
) {
270
13.7k
    return true;
271
13.7k
  }
272
56.3k
  if (old_pb.has_partition() && 
new_pb.has_partition()56.3k
&&
273
56.3k
      
!pb_util::ArePBsEqual(old_pb.partition(), new_pb.partition(), /* diff_str */ nullptr)56.3k
) {
274
0
    return true;
275
0
  }
276
56.3k
  if (old_pb.has_committed_consensus_state() && 
new_pb.has_committed_consensus_state()56.3k
&&
277
56.3k
      !pb_util::ArePBsEqual(old_pb.committed_consensus_state(),
278
56.3k
                            new_pb.committed_consensus_state(),
279
56.3k
                            /* diff_str */ nullptr)) {
280
7.34k
    return true;
281
7.34k
  }
282
49.0k
  return false;
283
56.3k
}
284
285
// Selects, from the mutated tablets, those that belong to a YCQL table and whose old and new
// entries differ according to HasRelevantPbChanges. Returns an empty vector when generating the
// vtable on changes is disabled.
Result<std::vector<TabletInfoPtr>> YQLPartitionsVTable::FilterRelevantTablets(
    const std::vector<TabletInfo*>& mutated_tablets) const {
  std::vector<TabletInfoPtr> relevant;
  if (!GeneratePartitionsVTableOnChanges()) {
    return relevant;
  }

  for (TabletInfo* candidate : mutated_tablets) {
    const bool is_relevant = IsYcqlTable(*candidate->table()) &&
                             HasRelevantPbChanges(candidate->old_pb(), candidate->new_pb());
    if (is_relevant) {
      relevant.push_back(candidate);
    }
  }
  return relevant;
}
303
304
// Applies the given mutated tablets to the row map under the exclusive lock. Does nothing (and
// returns OK) when generating on changes is disabled or the list is empty.
//
// `tablet_write_locks` is not read in this function.
// NOTE(review): presumably it is passed to show that the caller holds the tablets' write locks
// -- confirm against the call site.
Status YQLPartitionsVTable::ProcessMutatedTablets(
    const std::vector<TabletInfoPtr>& mutated_tablets,
    const std::map<TabletId, TabletInfo::WriteLock>& tablet_write_locks) const {
  if (!GeneratePartitionsVTableOnChanges() || mutated_tablets.empty()) {
    return Status::OK();
  }

  std::lock_guard<std::shared_timed_mutex> exclusive_guard(mutex_);
  return ProcessTablets(mutated_tablets);
}
314
315
7.01k
// Returns the cached row block, first rebuilding it from the row map if it is marked stale.
//
// On rebuild, every row of every table in the map is appended to a fresh block, which then
// replaces cache_, and the stale flag is cleared. If adding a row fails, that status is returned
// and cache_ / update_cache_ are left untouched. Does not lock mutex_ itself.
Result<std::shared_ptr<QLRowBlock>> YQLPartitionsVTable::GetTableFromMap() const {
  if (!update_cache_) {
    return cache_;
  }

  auto rebuilt_block = std::make_shared<QLRowBlock>(*schema_);
  for (const auto& table_entry : table_to_partition_start_to_row_map_) {
    for (const auto& row_entry : table_entry.second) {
      RETURN_NOT_OK(rebuilt_block->AddRow(row_entry.second));
    }
  }

  cache_ = rebuilt_block;
  update_cache_ = false;
  return cache_;
}
331
332
bool YQLPartitionsVTable::CheckTableIsPresent(
333
2.01k
    const TableId& table_id, size_t expected_num_tablets) const {
334
2.01k
  SharedLock<std::shared_timed_mutex> read_lock(mutex_);
335
2.01k
  auto it = table_to_partition_start_to_row_map_.find(table_id);
336
2.01k
  return it != table_to_partition_start_to_row_map_.end() &&
337
2.01k
         it->second.size() == expected_num_tablets;
338
2.01k
}
339
340
3.00k
void YQLPartitionsVTable::ResetAndRegenerateCache() const {
341
3.00k
  {
342
3.00k
    std::lock_guard<std::shared_timed_mutex> lock(mutex_);
343
3.00k
    cached_tablets_version_ = kInvalidCache;
344
3.00k
    cached_tablet_locations_version_ = kInvalidCache;
345
3.00k
    update_cache_ = true;
346
3.00k
  }
347
3.00k
  WARN_NOT_OK(ResultToStatus(GenerateAndCacheData()),
348
3.00k
              "Error while regenerating YQL system.partitions cache");
349
3.00k
}
350
351
153k
// Rebuilds the cached row block from the row map if it is marked stale; otherwise returns OK
// without taking the exclusive lock.
Status YQLPartitionsVTable::UpdateCache() const {
  bool cache_is_stale = false;
  {
    SharedLock<std::shared_timed_mutex> shared_guard(mutex_);
    cache_is_stale = update_cache_;
  }
  if (!cache_is_stale) {
    return Status::OK();
  }

  // GetTableFromMap() re-checks the stale flag, so a refresh done by another thread between the
  // two locks is handled there.
  std::lock_guard<std::shared_timed_mutex> exclusive_guard(mutex_);
  return ResultToStatus(GetTableFromMap());
}
361
362
3.00k
// Builds the schema of system.partitions:
//   keyspace_name (hash key, STRING), table_name (key, STRING), start_key (key, BINARY),
//   end_key (BINARY), id (UUID), replica_addresses (map from INET to STRING).
// Any failure while adding a column trips CHECK_OK.
Schema YQLPartitionsVTable::CreateSchema() const {
  SchemaBuilder schema_builder;

  // Key columns.
  CHECK_OK(schema_builder.AddHashKeyColumn(kKeyspaceName, QLType::Create(DataType::STRING)));
  CHECK_OK(schema_builder.AddKeyColumn(kTableName, QLType::Create(DataType::STRING)));
  CHECK_OK(schema_builder.AddKeyColumn(kStartKey, QLType::Create(DataType::BINARY)));

  // Value columns.
  CHECK_OK(schema_builder.AddColumn(kEndKey, QLType::Create(DataType::BINARY)));
  CHECK_OK(schema_builder.AddColumn(kId, QLType::Create(DataType::UUID)));
  CHECK_OK(schema_builder.AddColumn(
      kReplicaAddresses, QLType::CreateTypeMap(DataType::INET, DataType::STRING)));

  return schema_builder.Build();
}
373
374
}  // namespace master
375
}  // namespace yb