YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/ts_descriptor.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/master/ts_descriptor.h"
34
35
#include <vector>
36
37
#include "yb/common/common.pb.h"
38
#include "yb/common/wire_protocol.h"
39
#include "yb/common/wire_protocol.pb.h"
40
41
#include "yb/master/master_fwd.h"
42
#include "yb/master/catalog_manager_util.h"
43
#include "yb/master/master_cluster.pb.h"
44
#include "yb/master/master_heartbeat.pb.h"
45
46
#include "yb/util/atomic.h"
47
#include "yb/util/flag_tags.h"
48
#include "yb/util/status_format.h"
49
50
DEFINE_int32(tserver_unresponsive_timeout_ms, 60 * 1000,
51
             "The period of time that a Master can go without receiving a heartbeat from a "
52
             "tablet server before considering it unresponsive. Unresponsive servers are not "
53
             "selected when assigning replicas during table creation or re-replication.");
54
TAG_FLAG(tserver_unresponsive_timeout_ms, advanced);
55
56
57
namespace yb {
58
namespace master {
59
60
Result<TSDescriptorPtr> TSDescriptor::RegisterNew(
61
    const NodeInstancePB& instance,
62
    const TSRegistrationPB& registration,
63
    CloudInfoPB local_cloud_info,
64
    rpc::ProxyCache* proxy_cache,
65
8.15k
    RegisteredThroughHeartbeat registered_through_heartbeat) {
66
8.15k
  auto result = std::make_shared<enterprise::TSDescriptor>(
67
8.15k
      instance.permanent_uuid());
68
8.15k
  if (!registered_through_heartbeat) {
69
    // This tserver hasn't actually heartbeated, so register using a last_heartbeat_ time of 0.
70
84
    std::lock_guard<decltype(result->lock_)> l(result->lock_);
71
84
    result->last_heartbeat_ = MonoTime::kMin;
72
84
    result->registered_through_heartbeat_ = RegisteredThroughHeartbeat::kFalse;
73
84
  }
74
8.15k
  RETURN_NOT_OK(result->Register(instance, registration, std::move(local_cloud_info), proxy_cache));
75
8.15k
  return std::move(result);
76
8.15k
}
77
78
TSDescriptor::TSDescriptor(std::string perm_id)
79
    : permanent_uuid_(std::move(perm_id)),
80
      last_heartbeat_(MonoTime::Now()),
81
      has_tablet_report_(false),
82
      recent_replica_creations_(0),
83
      last_replica_creations_decay_(MonoTime::Now()),
84
8.25k
      num_live_replicas_(0) {
85
8.25k
}
86
87
132
TSDescriptor::~TSDescriptor() {
88
132
}
89
90
Status TSDescriptor::Register(const NodeInstancePB& instance,
91
                              const TSRegistrationPB& registration,
92
                              CloudInfoPB local_cloud_info,
93
8.40k
                              rpc::ProxyCache* proxy_cache) {
94
8.40k
  std::lock_guard<decltype(lock_)> l(lock_);
95
8.40k
  return RegisterUnlocked(instance, registration, std::move(local_cloud_info), proxy_cache);
96
8.40k
}
97
98
Status TSDescriptor::RegisterUnlocked(
99
    const NodeInstancePB& instance,
100
    const TSRegistrationPB& registration,
101
    CloudInfoPB local_cloud_info,
102
8.40k
    rpc::ProxyCache* proxy_cache) {
103
8.40k
  CHECK_EQ(instance.permanent_uuid(), permanent_uuid_);
104
105
8.40k
  int64_t latest_seqno = ts_information_
106
8.40k
      ? ts_information_->tserver_instance().instance_seqno()    [region count for `ts_information_->tserver_instance().instance_seqno()`: 145]
107
8.40k
      : -1;    [region count for `-1`: 8.25k]
108
8.40k
  if (instance.instance_seqno() < latest_seqno) {
109
9
    return STATUS(AlreadyPresent,
110
9
      strings::Substitute("Cannot register with sequence number $0:"
111
9
                          " Already have a registration from sequence number $1",
112
9
                          instance.instance_seqno(),
113
9
                          latest_seqno));
114
8.39k
  } else if (instance.instance_seqno() == latest_seqno) {
115
    // It's possible that the TS registered, but our response back to it
116
    // got lost, so it's trying to register again with the same sequence
117
    // number. That's fine.
118
1
    LOG(INFO) << "Processing retry of TS registration from " << instance.ShortDebugString();
119
1
  }
120
121
8.39k
  latest_seqno = instance.instance_seqno();
122
  // After re-registering, make the TS re-report its tablets.
123
8.39k
  has_tablet_report_ = false;
124
125
8.39k
  ts_information_ = std::make_shared<TSInformationPB>();
126
8.39k
  ts_information_->mutable_registration()->CopyFrom(registration);
127
8.39k
  ts_information_->mutable_tserver_instance()->set_permanent_uuid(permanent_uuid_);
128
8.39k
  ts_information_->mutable_tserver_instance()->set_instance_seqno(latest_seqno);
129
130
8.39k
  placement_id_ = generate_placement_id(registration.common().cloud_info());
131
132
8.39k
  proxies_.reset();
133
134
8.39k
  placement_uuid_ = "";
135
8.39k
  if (registration.common().has_placement_uuid()) {
136
8.30k
    placement_uuid_ = registration.common().placement_uuid();
137
8.30k
  }
138
8.39k
  local_cloud_info_ = std::move(local_cloud_info);
139
8.39k
  proxy_cache_ = proxy_cache;
140
141
8.39k
  capabilities_.clear();
142
8.39k
  capabilities_.insert(registration.capabilities().begin(), registration.capabilities().end());
143
144
8.39k
  return Status::OK();
145
8.40k
}
146
147
15.3M
std::string TSDescriptor::placement_uuid() const {
148
15.3M
  SharedLock<decltype(lock_)> l(lock_);
149
15.3M
  return placement_uuid_;
150
15.3M
}
151
152
3.35M
std::string TSDescriptor::generate_placement_id(const CloudInfoPB& ci) {
153
3.35M
  return strings::Substitute(
154
3.35M
      "$0:$1:$2", ci.placement_cloud(), ci.placement_region(), ci.placement_zone());
155
3.35M
}
156
157
45
std::string TSDescriptor::placement_id() const {
158
45
  SharedLock<decltype(lock_)> l(lock_);
159
45
  return placement_id_;
160
45
}
161
162
4.81M
void TSDescriptor::UpdateHeartbeat(const TSHeartbeatRequestPB* req) {
163
4.81M
  DCHECK_GE(req->num_live_tablets(), 0);
164
4.81M
  DCHECK_GE(req->leader_count(), 0);
165
4.81M
  {
166
4.81M
    std::lock_guard<decltype(lock_)> l(lock_);
167
4.81M
    last_heartbeat_ = MonoTime::Now();
168
4.81M
    num_live_replicas_ = req->num_live_tablets();
169
4.81M
    leader_count_ = req->leader_count();
170
4.81M
    physical_time_ = req->ts_physical_time();
171
4.81M
    hybrid_time_ = HybridTime::FromPB(req->ts_hybrid_time());
172
4.81M
    heartbeat_rtt_ = MonoDelta::FromMicroseconds(req->rtt_us());
173
4.81M
  }
174
4.81M
}
175
176
54.7M
MonoDelta TSDescriptor::TimeSinceHeartbeat() const {
177
54.7M
  MonoTime now(MonoTime::Now());
178
54.7M
  SharedLock<decltype(lock_)> l(lock_);
179
54.7M
  return now.GetDeltaSince(last_heartbeat_);
180
54.7M
}
181
182
4.80M
int64_t TSDescriptor::latest_seqno() const {
183
4.80M
  SharedLock<decltype(lock_)> l(lock_);
184
4.80M
  return ts_information_->tserver_instance().instance_seqno();
185
4.80M
}
186
187
31.5M
bool TSDescriptor::has_tablet_report() const {
188
31.5M
  SharedLock<decltype(lock_)> l(lock_);
189
31.5M
  return has_tablet_report_;
190
31.5M
}
191
192
8.23k
void TSDescriptor::set_has_tablet_report(bool has_report) {
193
8.23k
  std::lock_guard<decltype(lock_)> l(lock_);
194
8.23k
  has_tablet_report_ = has_report;
195
8.23k
}
196
197
664k
bool TSDescriptor::registered_through_heartbeat() const {
198
664k
  SharedLock<decltype(lock_)> l(lock_);
199
664k
  return registered_through_heartbeat_;
200
664k
}
201
202
140k
void TSDescriptor::DecayRecentReplicaCreationsUnlocked() {
203
  // In most cases, we won't have any recent replica creations, so
204
  // we don't need to bother calling the clock, etc.
205
140k
  if (recent_replica_creations_ == 0) return;    [region count for `return`: 6.29k]
206
207
133k
  const double kHalflifeSecs = 60;
208
133k
  MonoTime now = MonoTime::Now();
209
133k
  double secs_since_last_decay = now.GetDeltaSince(last_replica_creations_decay_).ToSeconds();
210
133k
  recent_replica_creations_ *= pow(0.5, secs_since_last_decay / kHalflifeSecs);
211
212
  // If sufficiently small, reset down to 0 to take advantage of the fast path above.
213
133k
  if (recent_replica_creations_ < 1e-12) {
214
0
    recent_replica_creations_ = 0;
215
0
  }
216
133k
  last_replica_creations_decay_ = now;
217
133k
}
218
219
140k
void TSDescriptor::IncrementRecentReplicaCreations() {
220
140k
  std::lock_guard<decltype(lock_)> l(lock_);
221
140k
  DecayRecentReplicaCreationsUnlocked();
222
140k
  recent_replica_creations_ += 1;
223
140k
}
224
225
3
double TSDescriptor::RecentReplicaCreations() {
226
3
  std::lock_guard<decltype(lock_)> l(lock_);
227
3
  DecayRecentReplicaCreationsUnlocked();
228
3
  return recent_replica_creations_;
229
3
}
230
231
1.28M
TSRegistrationPB TSDescriptor::GetRegistration() const {
232
1.28M
  SharedLock<decltype(lock_)> l(lock_);
233
1.28M
  return ts_information_->registration();
234
1.28M
}
235
236
18.8M
const std::shared_ptr<TSInformationPB> TSDescriptor::GetTSInformationPB() const {
237
18.8M
  SharedLock<decltype(lock_)> l(lock_);
238
18.8M
  CHECK(ts_information_) << "No stored information";    [region count for `(ts_information_) << "No stored information"`: 1.77k]
239
18.8M
  return ts_information_;
240
18.8M
}
241
242
4.96M
bool TSDescriptor::MatchesCloudInfo(const CloudInfoPB& cloud_info) const {
243
4.96M
  SharedLock<decltype(lock_)> l(lock_);
244
4.96M
  const auto& ts_ci = ts_information_->registration().common().cloud_info();
245
246
  // cloud_info should be a prefix of ts_ci.
247
4.96M
  return CatalogManagerUtil::IsCloudInfoPrefix(cloud_info, ts_ci);
248
4.96M
}
249
250
77.8k
CloudInfoPB TSDescriptor::GetCloudInfo() const {
251
77.8k
  SharedLock<decltype(lock_)> l(lock_);
252
77.8k
  return ts_information_->registration().common().cloud_info();
253
77.8k
}
254
255
template<typename Lambda>
256
1.14M
bool TSDescriptor::DoesRegistrationMatch(Lambda predicate) const {
257
1.14M
  TSRegistrationPB reg = GetRegistration();
258
1.14M
  if (std::find_if(reg.common().private_rpc_addresses().begin(),
259
1.14M
                   reg.common().private_rpc_addresses().end(),
260
1.14M
                   predicate) != reg.common().private_rpc_addresses().end()) {
261
64.4k
    return true;
262
64.4k
  }
263
1.07M
  if (std::find_if(reg.common().broadcast_addresses().begin(),
264
1.07M
                   reg.common().broadcast_addresses().end(),
265
1.07M
                   predicate) != reg.common().broadcast_addresses().end()) {
266
0
    return true;
267
0
  }
268
1.07M
  return false;
269
1.07M
}
ts_descriptor.cc:bool yb::master::TSDescriptor::DoesRegistrationMatch<yb::master::TSDescriptor::IsBlacklisted(std::__1::unordered_set<yb::HostPort, yb::HostPortHash, std::__1::equal_to<yb::HostPort>, std::__1::allocator<yb::HostPort> > const&) const::$_0>(yb::master::TSDescriptor::IsBlacklisted(std::__1::unordered_set<yb::HostPort, yb::HostPortHash, std::__1::equal_to<yb::HostPort>, std::__1::allocator<yb::HostPort> > const&) const::$_0) const
Line
Count
Source
256
763k
bool TSDescriptor::DoesRegistrationMatch(Lambda predicate) const {
257
763k
  TSRegistrationPB reg = GetRegistration();
258
763k
  if (std::find_if(reg.common().private_rpc_addresses().begin(),
259
763k
                   reg.common().private_rpc_addresses().end(),
260
763k
                   predicate) != reg.common().private_rpc_addresses().end()) {
261
110
    return true;
262
110
  }
263
763k
  if (std::find_if(reg.common().broadcast_addresses().begin(),
264
763k
                   reg.common().broadcast_addresses().end(),
265
763k
                   predicate) != reg.common().broadcast_addresses().end()) {
266
0
    return true;
267
0
  }
268
763k
  return false;
269
763k
}
ts_descriptor.cc:bool yb::master::TSDescriptor::DoesRegistrationMatch<yb::master::TSDescriptor::IsRunningOn(yb::HostPortPB const&) const::$_1>(yb::master::TSDescriptor::IsRunningOn(yb::HostPortPB const&) const::$_1) const
Line
Count
Source
256
380k
bool TSDescriptor::DoesRegistrationMatch(Lambda predicate) const {
257
380k
  TSRegistrationPB reg = GetRegistration();
258
380k
  if (std::find_if(reg.common().private_rpc_addresses().begin(),
259
380k
                   reg.common().private_rpc_addresses().end(),
260
380k
                   predicate) != reg.common().private_rpc_addresses().end()) {
261
64.3k
    return true;
262
64.3k
  }
263
316k
  if (std::find_if(reg.common().broadcast_addresses().begin(),
264
316k
                   reg.common().broadcast_addresses().end(),
265
316k
                   predicate) != reg.common().broadcast_addresses().end()) {
266
0
    return true;
267
0
  }
268
316k
  return false;
269
316k
}
270
271
763k
bool TSDescriptor::IsBlacklisted(const BlacklistSet& blacklist) const {
272
764k
  auto predicate = [&blacklist](const HostPortPB& rhs) {
273
764k
    return blacklist.count(HostPortFromPB(rhs)) > 0;
274
764k
  };
275
763k
  return DoesRegistrationMatch(predicate);
276
763k
}
277
278
380k
bool TSDescriptor::IsRunningOn(const HostPortPB& hp) const {
279
380k
  auto predicate = [&hp](const HostPortPB& rhs) {
280
380k
    return rhs.host() == hp.host() && rhs.port() == hp.port();    [region count for `rhs.port() == hp.port()`: 65.7k]
281
380k
  };
282
380k
  return DoesRegistrationMatch(predicate);
283
380k
}
284
285
26.0k
Result<HostPort> TSDescriptor::GetHostPortUnlocked() const {
286
26.0k
  const auto& addr = DesiredHostPort(ts_information_->registration().common(), local_cloud_info_);
287
26.0k
  if (addr.host().empty()) {
288
0
    return STATUS_FORMAT(NetworkError, "Unable to find the TS address for $0: $1",
289
0
                         permanent_uuid_, ts_information_->registration().ShortDebugString());
290
0
  }
291
292
26.0k
  return HostPortFromPB(addr);
293
26.0k
}
294
295
0
bool TSDescriptor::IsAcceptingLeaderLoad(const ReplicationInfoPB& replication_info) const {
296
0
  return true;
297
0
}
298
299
957k
void TSDescriptor::UpdateMetrics(const TServerMetricsPB& metrics) {
300
957k
  std::lock_guard<decltype(lock_)> l(lock_);
301
957k
  ts_metrics_.total_memory_usage = metrics.total_ram_usage();
302
957k
  ts_metrics_.total_sst_file_size = metrics.total_sst_file_size();
303
957k
  ts_metrics_.uncompressed_sst_file_size = metrics.uncompressed_sst_file_size();
304
957k
  ts_metrics_.num_sst_files = metrics.num_sst_files();
305
957k
  ts_metrics_.read_ops_per_sec = metrics.read_ops_per_sec();
306
957k
  ts_metrics_.write_ops_per_sec = metrics.write_ops_per_sec();
307
957k
  ts_metrics_.uptime_seconds = metrics.uptime_seconds();
308
957k
  ts_metrics_.path_metrics.clear();
309
957k
  for (const auto& path_metric : metrics.path_metrics()) {
310
209k
    ts_metrics_.path_metrics[path_metric.path_id()] =
311
209k
        { path_metric.used_space(), path_metric.total_space() };
312
209k
  }
313
957k
  ts_metrics_.disable_tablet_split_if_default_ttl = metrics.disable_tablet_split_if_default_ttl();
314
957k
}
315
316
34.5k
void TSDescriptor::GetMetrics(TServerMetricsPB* metrics) {
317
34.5k
  CHECK(metrics);
318
34.5k
  SharedLock<decltype(lock_)> l(lock_);
319
34.5k
  metrics->set_total_ram_usage(ts_metrics_.total_memory_usage);
320
34.5k
  metrics->set_total_sst_file_size(ts_metrics_.total_sst_file_size);
321
34.5k
  metrics->set_uncompressed_sst_file_size(ts_metrics_.uncompressed_sst_file_size);
322
34.5k
  metrics->set_num_sst_files(ts_metrics_.num_sst_files);
323
34.5k
  metrics->set_read_ops_per_sec(ts_metrics_.read_ops_per_sec);
324
34.5k
  metrics->set_write_ops_per_sec(ts_metrics_.write_ops_per_sec);
325
34.5k
  metrics->set_uptime_seconds(ts_metrics_.uptime_seconds);
326
34.5k
  for (const auto& path_metric : ts_metrics_.path_metrics) {
327
31.3k
    auto* new_path_metric = metrics->add_path_metrics();
328
31.3k
    new_path_metric->set_path_id(path_metric.first);
329
31.3k
    new_path_metric->set_used_space(path_metric.second.used_space);
330
31.3k
    new_path_metric->set_total_space(path_metric.second.total_space);
331
31.3k
  }
332
34.5k
  metrics->set_disable_tablet_split_if_default_ttl(ts_metrics_.disable_tablet_split_if_default_ttl);
333
34.5k
}
334
335
5.89M
bool TSDescriptor::HasTabletDeletePending() const {
336
5.89M
  SharedLock<decltype(lock_)> l(lock_);
337
5.89M
  return !tablets_pending_delete_.empty();
338
5.89M
}
339
340
75.8k
bool TSDescriptor::IsTabletDeletePending(const std::string& tablet_id) const {
341
75.8k
  SharedLock<decltype(lock_)> l(lock_);
342
75.8k
  return tablets_pending_delete_.count(tablet_id);
343
75.8k
}
344
345
34.3k
std::string TSDescriptor::PendingTabletDeleteToString() const {
346
34.3k
  SharedLock<decltype(lock_)> l(lock_);
347
34.3k
  return yb::ToString(tablets_pending_delete_);
348
34.3k
}
349
350
74.2k
void TSDescriptor::AddPendingTabletDelete(const std::string& tablet_id) {
351
74.2k
  std::lock_guard<decltype(lock_)> l(lock_);
352
74.2k
  tablets_pending_delete_.insert(tablet_id);
353
74.2k
}
354
355
73.9k
void TSDescriptor::ClearPendingTabletDelete(const std::string& tablet_id) {
356
73.9k
  std::lock_guard<decltype(lock_)> l(lock_);
357
73.9k
  tablets_pending_delete_.erase(tablet_id);
358
73.9k
}
359
360
0
std::size_t TSDescriptor::NumTasks() const {
361
0
  SharedLock<decltype(lock_)> l(lock_);
362
0
  return tablets_pending_delete_.size();
363
0
}
364
365
51.8M
bool TSDescriptor::IsLive() const {
366
51.8M
  return TimeSinceHeartbeat().ToMilliseconds() <
367
51.8M
         GetAtomicFlag(&FLAGS_tserver_unresponsive_timeout_ms) && !IsRemoved();    [region count for `!IsRemoved()`: 50.0M]
368
51.8M
}
369
370
22.6M
bool TSDescriptor::IsLiveAndHasReported() const {
371
22.6M
  return IsLive() && has_tablet_report();    [region count for `has_tablet_report()`: 21.9M]
372
22.6M
}
373
374
87.4k
std::string TSDescriptor::ToString() const {
375
87.4k
  SharedLock<decltype(lock_)> l(lock_);
376
87.4k
  return Format("{ permanent_uuid: $0 registration: $1 placement_id: $2 }",
377
87.4k
                permanent_uuid_, ts_information_->registration(), placement_id_);
378
87.4k
}
379
380
} // namespace master
381
} // namespace yb