YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/ts_descriptor.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/master/ts_descriptor.h"
34
35
#include <vector>
36
37
#include "yb/common/common.pb.h"
38
#include "yb/common/wire_protocol.h"
39
#include "yb/common/wire_protocol.pb.h"
40
41
#include "yb/master/master_fwd.h"
42
#include "yb/master/catalog_manager_util.h"
43
#include "yb/master/master_cluster.pb.h"
44
#include "yb/master/master_heartbeat.pb.h"
45
46
#include "yb/util/atomic.h"
47
#include "yb/util/flag_tags.h"
48
#include "yb/util/status_format.h"
49
50
// Heartbeat-silence threshold in milliseconds (default 60 * 1000). IsLive() compares the time
// since the last heartbeat against this value.
DEFINE_int32(
    tserver_unresponsive_timeout_ms, 60 * 1000,
    "The period of time that a Master can go without receiving a heartbeat from a tablet server "
    "before considering it unresponsive. Unresponsive servers are not selected when assigning "
    "replicas during table creation or re-replication.");
TAG_FLAG(tserver_unresponsive_timeout_ms, advanced);
55
56
57
namespace yb {
58
namespace master {
59
60
// Creates a descriptor keyed by instance.permanent_uuid() and registers it with the supplied
// registration data. Returns the new descriptor, or the error produced by Register().
Result<TSDescriptorPtr> TSDescriptor::RegisterNew(
    const NodeInstancePB& instance,
    const TSRegistrationPB& registration,
    CloudInfoPB local_cloud_info,
    rpc::ProxyCache* proxy_cache,
    RegisteredThroughHeartbeat registered_through_heartbeat) {
  auto descriptor = std::make_shared<enterprise::TSDescriptor>(instance.permanent_uuid());

  if (!registered_through_heartbeat) {
    // No heartbeat has been received from this tserver: override the constructor's "now"
    // heartbeat time with MonoTime::kMin and record that registration did not come from a
    // heartbeat.
    std::lock_guard<decltype(descriptor->lock_)> guard(descriptor->lock_);
    descriptor->last_heartbeat_ = MonoTime::kMin;
    descriptor->registered_through_heartbeat_ = RegisteredThroughHeartbeat::kFalse;
  }

  RETURN_NOT_OK(
      descriptor->Register(instance, registration, std::move(local_cloud_info), proxy_cache));

  // The explicit move converts the derived-type shared_ptr into the returned Result.
  return std::move(descriptor);
}
77
78
TSDescriptor::TSDescriptor(std::string perm_id)
79
    : permanent_uuid_(std::move(perm_id)),
80
      last_heartbeat_(MonoTime::Now()),
81
      has_tablet_report_(false),
82
      recent_replica_creations_(0),
83
      last_replica_creations_decay_(MonoTime::Now()),
84
5.64k
      num_live_replicas_(0) {
85
5.64k
}
86
87
126
// Nothing to release explicitly; members clean up after themselves.
TSDescriptor::~TSDescriptor() = default;
89
90
// Locking wrapper around RegisterUnlocked(): acquires lock_ exclusively, then forwards all
// arguments and returns the resulting status.
Status TSDescriptor::Register(
    const NodeInstancePB& instance,
    const TSRegistrationPB& registration,
    CloudInfoPB local_cloud_info,
    rpc::ProxyCache* proxy_cache) {
  std::lock_guard<decltype(lock_)> guard(lock_);
  return RegisterUnlocked(instance, registration, std::move(local_cloud_info), proxy_cache);
}
97
98
// Applies a (re-)registration to this descriptor. Register() calls this with lock_ held.
//
// CHECK-fails if instance.permanent_uuid() differs from permanent_uuid_.
// Returns AlreadyPresent when the incoming instance sequence number is lower than the stored
// one. Otherwise replaces the stored TSInformationPB, clears the tablet-report flag, proxies and
// capabilities, refreshes the placement fields, and returns OK.
Status TSDescriptor::RegisterUnlocked(
    const NodeInstancePB& instance,
    const TSRegistrationPB& registration,
    CloudInfoPB local_cloud_info,
    rpc::ProxyCache* proxy_cache) {
  CHECK_EQ(instance.permanent_uuid(), permanent_uuid_);

  // -1 means no registration has been stored yet.
  int64_t stored_seqno = -1;
  if (ts_information_) {
    stored_seqno = ts_information_->tserver_instance().instance_seqno();
  }
  const auto incoming_seqno = instance.instance_seqno();

  if (incoming_seqno < stored_seqno) {
    return STATUS(AlreadyPresent,
                  strings::Substitute("Cannot register with sequence number $0:"
                                      " Already have a registration from sequence number $1",
                                      incoming_seqno,
                                      stored_seqno));
  }
  if (incoming_seqno == stored_seqno) {
    // The TS may have registered already while our response to it was lost, so it is
    // registering again with the same sequence number. That is acceptable.
    LOG(INFO) << "Processing retry of TS registration from " << instance.ShortDebugString();
  }

  // After re-registering, make the TS re-report its tablets.
  has_tablet_report_ = false;

  ts_information_ = std::make_shared<TSInformationPB>();
  ts_information_->mutable_registration()->CopyFrom(registration);
  auto* stored_instance = ts_information_->mutable_tserver_instance();
  stored_instance->set_permanent_uuid(permanent_uuid_);
  stored_instance->set_instance_seqno(incoming_seqno);

  const auto& common = registration.common();
  placement_id_ = generate_placement_id(common.cloud_info());

  proxies_.reset();

  // An absent placement UUID is stored as the empty string.
  placement_uuid_ = common.has_placement_uuid() ? common.placement_uuid() : std::string();
  local_cloud_info_ = std::move(local_cloud_info);
  proxy_cache_ = proxy_cache;

  const auto& reported_capabilities = registration.capabilities();
  capabilities_.clear();
  capabilities_.insert(reported_capabilities.begin(), reported_capabilities.end());

  return Status::OK();
}
146
147
3.89M
std::string TSDescriptor::placement_uuid() const {
148
3.89M
  SharedLock<decltype(lock_)> l(lock_);
149
3.89M
  return placement_uuid_;
150
3.89M
}
151
152
145k
std::string TSDescriptor::generate_placement_id(const CloudInfoPB& ci) {
153
145k
  return strings::Substitute(
154
145k
      "$0:$1:$2", ci.placement_cloud(), ci.placement_region(), ci.placement_zone());
155
145k
}
156
157
45
std::string TSDescriptor::placement_id() const {
158
45
  SharedLock<decltype(lock_)> l(lock_);
159
45
  return placement_id_;
160
45
}
161
162
383k
void TSDescriptor::UpdateHeartbeat(const TSHeartbeatRequestPB* req) {
163
383k
  DCHECK_GE(req->num_live_tablets(), 0);
164
383k
  DCHECK_GE(req->leader_count(), 0);
165
383k
  {
166
383k
    std::lock_guard<decltype(lock_)> l(lock_);
167
383k
    last_heartbeat_ = MonoTime::Now();
168
383k
    num_live_replicas_ = req->num_live_tablets();
169
383k
    leader_count_ = req->leader_count();
170
383k
    physical_time_ = req->ts_physical_time();
171
383k
    hybrid_time_ = HybridTime::FromPB(req->ts_hybrid_time());
172
383k
    heartbeat_rtt_ = MonoDelta::FromMicroseconds(req->rtt_us());
173
383k
  }
174
383k
}
175
176
7.32M
// Returns the time elapsed between last_heartbeat_ and now.
MonoDelta TSDescriptor::TimeSinceHeartbeat() const {
  // The clock is sampled before the shared lock is taken.
  MonoTime sampled_now = MonoTime::Now();
  SharedLock<decltype(lock_)> shared_guard(lock_);
  return sampled_now.GetDeltaSince(last_heartbeat_);
}
181
182
383k
int64_t TSDescriptor::latest_seqno() const {
183
383k
  SharedLock<decltype(lock_)> l(lock_);
184
383k
  return ts_information_->tserver_instance().instance_seqno();
185
383k
}
186
187
5.02M
bool TSDescriptor::has_tablet_report() const {
188
5.02M
  SharedLock<decltype(lock_)> l(lock_);
189
5.02M
  return has_tablet_report_;
190
5.02M
}
191
192
5.58k
void TSDescriptor::set_has_tablet_report(bool has_report) {
193
5.58k
  std::lock_guard<decltype(lock_)> l(lock_);
194
5.58k
  has_tablet_report_ = has_report;
195
5.58k
}
196
197
378k
bool TSDescriptor::registered_through_heartbeat() const {
198
378k
  SharedLock<decltype(lock_)> l(lock_);
199
378k
  return registered_through_heartbeat_;
200
378k
}
201
202
82.1k
void TSDescriptor::DecayRecentReplicaCreationsUnlocked() {
203
  // In most cases, we won't have any recent replica creations, so
204
  // we don't need to bother calling the clock, etc.
205
82.1k
  if (recent_replica_creations_ == 0) return;
206
207
78.1k
  const double kHalflifeSecs = 60;
208
78.1k
  MonoTime now = MonoTime::Now();
209
78.1k
  double secs_since_last_decay = now.GetDeltaSince(last_replica_creations_decay_).ToSeconds();
210
78.1k
  recent_replica_creations_ *= pow(0.5, secs_since_last_decay / kHalflifeSecs);
211
212
  // If sufficiently small, reset down to 0 to take advantage of the fast path above.
213
78.1k
  if (recent_replica_creations_ < 1e-12) {
214
0
    recent_replica_creations_ = 0;
215
0
  }
216
78.1k
  last_replica_creations_decay_ = now;
217
78.1k
}
218
219
82.1k
void TSDescriptor::IncrementRecentReplicaCreations() {
220
82.1k
  std::lock_guard<decltype(lock_)> l(lock_);
221
82.1k
  DecayRecentReplicaCreationsUnlocked();
222
82.1k
  recent_replica_creations_ += 1;
223
82.1k
}
224
225
3
double TSDescriptor::RecentReplicaCreations() {
226
3
  std::lock_guard<decltype(lock_)> l(lock_);
227
3
  DecayRecentReplicaCreationsUnlocked();
228
3
  return recent_replica_creations_;
229
3
}
230
231
715k
// Returns a copy of the stored registration, taken under a shared lock.
// ts_information_ is dereferenced without a null check.
TSRegistrationPB TSDescriptor::GetRegistration() const {
  SharedLock<decltype(lock_)> shared_guard(lock_);
  const auto& stored_registration = ts_information_->registration();
  return stored_registration;
}
235
236
2.22M
const std::shared_ptr<TSInformationPB> TSDescriptor::GetTSInformationPB() const {
237
2.22M
  SharedLock<decltype(lock_)> l(lock_);
238
421
  CHECK(ts_information_) << "No stored information";
239
2.22M
  return ts_information_;
240
2.22M
}
241
242
173k
bool TSDescriptor::MatchesCloudInfo(const CloudInfoPB& cloud_info) const {
243
173k
  SharedLock<decltype(lock_)> l(lock_);
244
173k
  const auto& ts_ci = ts_information_->registration().common().cloud_info();
245
246
  // cloud_info should be a prefix of ts_ci.
247
173k
  return CatalogManagerUtil::IsCloudInfoPrefix(cloud_info, ts_ci);
248
173k
}
249
250
26.2k
// Returns a copy of the cloud info from the stored registration, taken under a shared lock.
CloudInfoPB TSDescriptor::GetCloudInfo() const {
  SharedLock<decltype(lock_)> shared_guard(lock_);
  const auto& common = ts_information_->registration().common();
  return common.cloud_info();
}
254
255
template<typename Lambda>
256
631k
bool TSDescriptor::DoesRegistrationMatch(Lambda predicate) const {
257
631k
  TSRegistrationPB reg = GetRegistration();
258
631k
  if (std::find_if(reg.common().private_rpc_addresses().begin(),
259
631k
                   reg.common().private_rpc_addresses().end(),
260
32.2k
                   predicate) != reg.common().private_rpc_addresses().end()) {
261
32.2k
    return true;
262
32.2k
  }
263
599k
  if (std::find_if(reg.common().broadcast_addresses().begin(),
264
599k
                   reg.common().broadcast_addresses().end(),
265
0
                   predicate) != reg.common().broadcast_addresses().end()) {
266
0
    return true;
267
0
  }
268
599k
  return false;
269
599k
}
ts_descriptor.cc:_ZNK2yb6master12TSDescriptor21DoesRegistrationMatchIZNKS1_13IsBlacklistedERKNSt3__113unordered_setINS_8HostPortENS_12HostPortHashENS3_8equal_toIS5_EENS3_9allocatorIS5_EEEEE3$_0EEbT_
Line
Count
Source
256
444k
bool TSDescriptor::DoesRegistrationMatch(Lambda predicate) const {
257
444k
  TSRegistrationPB reg = GetRegistration();
258
444k
  if (std::find_if(reg.common().private_rpc_addresses().begin(),
259
444k
                   reg.common().private_rpc_addresses().end(),
260
104
                   predicate) != reg.common().private_rpc_addresses().end()) {
261
104
    return true;
262
104
  }
263
444k
  if (std::find_if(reg.common().broadcast_addresses().begin(),
264
444k
                   reg.common().broadcast_addresses().end(),
265
0
                   predicate) != reg.common().broadcast_addresses().end()) {
266
0
    return true;
267
0
  }
268
444k
  return false;
269
444k
}
ts_descriptor.cc:_ZNK2yb6master12TSDescriptor21DoesRegistrationMatchIZNKS1_11IsRunningOnERKNS_10HostPortPBEE3$_1EEbT_
Line
Count
Source
256
187k
bool TSDescriptor::DoesRegistrationMatch(Lambda predicate) const {
257
187k
  TSRegistrationPB reg = GetRegistration();
258
187k
  if (std::find_if(reg.common().private_rpc_addresses().begin(),
259
187k
                   reg.common().private_rpc_addresses().end(),
260
32.1k
                   predicate) != reg.common().private_rpc_addresses().end()) {
261
32.1k
    return true;
262
32.1k
  }
263
154k
  if (std::find_if(reg.common().broadcast_addresses().begin(),
264
154k
                   reg.common().broadcast_addresses().end(),
265
0
                   predicate) != reg.common().broadcast_addresses().end()) {
266
0
    return true;
267
0
  }
268
154k
  return false;
269
154k
}
270
271
444k
bool TSDescriptor::IsBlacklisted(const BlacklistSet& blacklist) const {
272
444k
  auto predicate = [&blacklist](const HostPortPB& rhs) {
273
444k
    return blacklist.count(HostPortFromPB(rhs)) > 0;
274
444k
  };
275
444k
  return DoesRegistrationMatch(predicate);
276
444k
}
277
278
187k
bool TSDescriptor::IsRunningOn(const HostPortPB& hp) const {
279
187k
  auto predicate = [&hp](const HostPortPB& rhs) {
280
187k
    return rhs.host() == hp.host() && rhs.port() == hp.port();
281
187k
  };
282
187k
  return DoesRegistrationMatch(predicate);
283
187k
}
284
285
16.5k
// Resolves the address to use for this tablet server by passing the stored registration and
// local_cloud_info_ to DesiredHostPort(). Returns NetworkError if the chosen address has an
// empty host. Takes no lock itself.
// NOTE(review): the "Unlocked" suffix suggests the caller must hold lock_ - confirm.
Result<HostPort> TSDescriptor::GetHostPortUnlocked() const {
  const auto& registration = ts_information_->registration();
  const auto& addr = DesiredHostPort(registration.common(), local_cloud_info_);
  if (!addr.host().empty()) {
    return HostPortFromPB(addr);
  }

  return STATUS_FORMAT(NetworkError, "Unable to find the TS address for $0: $1",
                       permanent_uuid_, registration.ShortDebugString());
}
294
295
0
// Always returns true; this implementation does not consult `replication_info`.
bool TSDescriptor::IsAcceptingLeaderLoad(
    const ReplicationInfoPB& replication_info) const {
  return true;
}
298
299
33.7k
void TSDescriptor::UpdateMetrics(const TServerMetricsPB& metrics) {
300
33.7k
  std::lock_guard<decltype(lock_)> l(lock_);
301
33.7k
  ts_metrics_.total_memory_usage = metrics.total_ram_usage();
302
33.7k
  ts_metrics_.total_sst_file_size = metrics.total_sst_file_size();
303
33.7k
  ts_metrics_.uncompressed_sst_file_size = metrics.uncompressed_sst_file_size();
304
33.7k
  ts_metrics_.num_sst_files = metrics.num_sst_files();
305
33.7k
  ts_metrics_.read_ops_per_sec = metrics.read_ops_per_sec();
306
33.7k
  ts_metrics_.write_ops_per_sec = metrics.write_ops_per_sec();
307
33.7k
  ts_metrics_.uptime_seconds = metrics.uptime_seconds();
308
23.1k
  for (const auto& path_metric : metrics.path_metrics()) {
309
23.1k
    ts_metrics_.path_metrics[path_metric.path_id()] =
310
23.1k
        { path_metric.used_space(), path_metric.total_space() };
311
23.1k
  }
312
33.7k
}
313
314
20.4k
// Writes the stored metrics into `*metrics` under a shared lock, appending one path_metrics
// entry per stored path. CHECK-fails if `metrics` is null.
void TSDescriptor::GetMetrics(TServerMetricsPB* metrics) {
  CHECK(metrics);
  SharedLock<decltype(lock_)> shared_guard(lock_);

  const auto& stored = ts_metrics_;
  metrics->set_total_ram_usage(stored.total_memory_usage);
  metrics->set_total_sst_file_size(stored.total_sst_file_size);
  metrics->set_uncompressed_sst_file_size(stored.uncompressed_sst_file_size);
  metrics->set_num_sst_files(stored.num_sst_files);
  metrics->set_read_ops_per_sec(stored.read_ops_per_sec);
  metrics->set_write_ops_per_sec(stored.write_ops_per_sec);
  metrics->set_uptime_seconds(stored.uptime_seconds);

  for (const auto& stored_path : stored.path_metrics) {
    auto* out_path = metrics->add_path_metrics();
    out_path->set_path_id(stored_path.first);
    out_path->set_used_space(stored_path.second.used_space);
    out_path->set_total_space(stored_path.second.total_space);
  }
}
331
332
577k
bool TSDescriptor::HasTabletDeletePending() const {
333
577k
  SharedLock<decltype(lock_)> l(lock_);
334
577k
  return !tablets_pending_delete_.empty();
335
577k
}
336
337
47.8k
bool TSDescriptor::IsTabletDeletePending(const std::string& tablet_id) const {
338
47.8k
  SharedLock<decltype(lock_)> l(lock_);
339
47.8k
  return tablets_pending_delete_.count(tablet_id);
340
47.8k
}
341
342
16.0k
std::string TSDescriptor::PendingTabletDeleteToString() const {
343
16.0k
  SharedLock<decltype(lock_)> l(lock_);
344
16.0k
  return yb::ToString(tablets_pending_delete_);
345
16.0k
}
346
347
47.0k
void TSDescriptor::AddPendingTabletDelete(const std::string& tablet_id) {
348
47.0k
  std::lock_guard<decltype(lock_)> l(lock_);
349
47.0k
  tablets_pending_delete_.insert(tablet_id);
350
47.0k
}
351
352
46.9k
void TSDescriptor::ClearPendingTabletDelete(const std::string& tablet_id) {
353
46.9k
  std::lock_guard<decltype(lock_)> l(lock_);
354
46.9k
  tablets_pending_delete_.erase(tablet_id);
355
46.9k
}
356
357
0
std::size_t TSDescriptor::NumTasks() const {
358
0
  SharedLock<decltype(lock_)> l(lock_);
359
0
  return tablets_pending_delete_.size();
360
0
}
361
362
6.93M
bool TSDescriptor::IsLive() const {
363
6.93M
  return TimeSinceHeartbeat().ToMilliseconds() <
364
6.88M
         GetAtomicFlag(&FLAGS_tserver_unresponsive_timeout_ms) && !IsRemoved();
365
6.93M
}
366
367
4.28M
bool TSDescriptor::IsLiveAndHasReported() const {
368
4.28M
  return IsLive() && has_tablet_report();
369
4.28M
}
370
371
49.6k
std::string TSDescriptor::ToString() const {
372
49.6k
  SharedLock<decltype(lock_)> l(lock_);
373
49.6k
  return Format("{ permanent_uuid: $0 registration: $1 placement_id: $2 }",
374
49.6k
                permanent_uuid_, ts_information_->registration(), placement_id_);
375
49.6k
}
376
377
} // namespace master
378
} // namespace yb