/Users/deen/code/yugabyte-db/src/yb/master/ts_descriptor.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/master/ts_descriptor.h" |
34 | | |
35 | | #include <vector> |
36 | | |
37 | | #include "yb/common/common.pb.h" |
38 | | #include "yb/common/wire_protocol.h" |
39 | | #include "yb/common/wire_protocol.pb.h" |
40 | | |
41 | | #include "yb/master/master_fwd.h" |
42 | | #include "yb/master/catalog_manager_util.h" |
43 | | #include "yb/master/master_cluster.pb.h" |
44 | | #include "yb/master/master_heartbeat.pb.h" |
45 | | |
46 | | #include "yb/util/atomic.h" |
47 | | #include "yb/util/flag_tags.h" |
48 | | #include "yb/util/status_format.h" |
49 | | |
50 | | DEFINE_int32(tserver_unresponsive_timeout_ms, 60 * 1000, |
51 | | "The period of time that a Master can go without receiving a heartbeat from a " |
52 | | "tablet server before considering it unresponsive. Unresponsive servers are not " |
53 | | "selected when assigning replicas during table creation or re-replication."); |
54 | | TAG_FLAG(tserver_unresponsive_timeout_ms, advanced); |
55 | | |
56 | | |
57 | | namespace yb { |
58 | | namespace master { |
59 | | |
60 | | Result<TSDescriptorPtr> TSDescriptor::RegisterNew( |
61 | | const NodeInstancePB& instance, |
62 | | const TSRegistrationPB& registration, |
63 | | CloudInfoPB local_cloud_info, |
64 | | rpc::ProxyCache* proxy_cache, |
65 | 5.53k | RegisteredThroughHeartbeat registered_through_heartbeat) { |
66 | 5.53k | auto result = std::make_shared<enterprise::TSDescriptor>( |
67 | 5.53k | instance.permanent_uuid()); |
68 | 5.53k | if (!registered_through_heartbeat) { |
69 | | // This tserver hasn't actually heartbeated, so register using a last_heartbeat_ time of 0. |
70 | 46 | std::lock_guard<decltype(result->lock_)> l(result->lock_); |
71 | 46 | result->last_heartbeat_ = MonoTime::kMin; |
72 | 46 | result->registered_through_heartbeat_ = RegisteredThroughHeartbeat::kFalse; |
73 | 46 | } |
74 | 5.53k | RETURN_NOT_OK(result->Register(instance, registration, std::move(local_cloud_info), proxy_cache)); |
75 | 5.53k | return std::move(result); |
76 | 5.53k | } |
77 | | |
78 | | TSDescriptor::TSDescriptor(std::string perm_id) |
79 | | : permanent_uuid_(std::move(perm_id)), |
80 | | last_heartbeat_(MonoTime::Now()), |
81 | | has_tablet_report_(false), |
82 | | recent_replica_creations_(0), |
83 | | last_replica_creations_decay_(MonoTime::Now()), |
84 | 5.64k | num_live_replicas_(0) { |
85 | 5.64k | } |
86 | | |
87 | 126 | TSDescriptor::~TSDescriptor() { |
88 | 126 | } |
89 | | |
90 | | Status TSDescriptor::Register(const NodeInstancePB& instance, |
91 | | const TSRegistrationPB& registration, |
92 | | CloudInfoPB local_cloud_info, |
93 | 5.75k | rpc::ProxyCache* proxy_cache) { |
94 | 5.75k | std::lock_guard<decltype(lock_)> l(lock_); |
95 | 5.75k | return RegisterUnlocked(instance, registration, std::move(local_cloud_info), proxy_cache); |
96 | 5.75k | } |
97 | | |
98 | | Status TSDescriptor::RegisterUnlocked( |
99 | | const NodeInstancePB& instance, |
100 | | const TSRegistrationPB& registration, |
101 | | CloudInfoPB local_cloud_info, |
102 | 5.75k | rpc::ProxyCache* proxy_cache) { |
103 | 5.75k | CHECK_EQ(instance.permanent_uuid(), permanent_uuid_); |
104 | | |
105 | 5.75k | int64_t latest_seqno = ts_information_ |
106 | 116 | ? ts_information_->tserver_instance().instance_seqno() |
107 | 5.63k | : -1; |
108 | 5.75k | if (instance.instance_seqno() < latest_seqno) { |
109 | 9 | return STATUS(AlreadyPresent, |
110 | 9 | strings::Substitute("Cannot register with sequence number $0:" |
111 | 9 | " Already have a registration from sequence number $1", |
112 | 9 | instance.instance_seqno(), |
113 | 9 | latest_seqno)); |
114 | 5.74k | } else if (instance.instance_seqno() == latest_seqno) { |
115 | | // It's possible that the TS registered, but our response back to it |
116 | | // got lost, so it's trying to register again with the same sequence |
117 | | // number. That's fine. |
118 | 1 | LOG(INFO) << "Processing retry of TS registration from " << instance.ShortDebugString(); |
119 | 1 | } |
120 | | |
121 | 5.74k | latest_seqno = instance.instance_seqno(); |
122 | | // After re-registering, make the TS re-report its tablets. |
123 | 5.74k | has_tablet_report_ = false; |
124 | | |
125 | 5.74k | ts_information_ = std::make_shared<TSInformationPB>(); |
126 | 5.74k | ts_information_->mutable_registration()->CopyFrom(registration); |
127 | 5.74k | ts_information_->mutable_tserver_instance()->set_permanent_uuid(permanent_uuid_); |
128 | 5.74k | ts_information_->mutable_tserver_instance()->set_instance_seqno(latest_seqno); |
129 | | |
130 | 5.74k | placement_id_ = generate_placement_id(registration.common().cloud_info()); |
131 | | |
132 | 5.74k | proxies_.reset(); |
133 | | |
134 | 5.74k | placement_uuid_ = ""; |
135 | 5.74k | if (registration.common().has_placement_uuid()) { |
136 | 5.65k | placement_uuid_ = registration.common().placement_uuid(); |
137 | 5.65k | } |
138 | 5.74k | local_cloud_info_ = std::move(local_cloud_info); |
139 | 5.74k | proxy_cache_ = proxy_cache; |
140 | | |
141 | 5.74k | capabilities_.clear(); |
142 | 5.74k | capabilities_.insert(registration.capabilities().begin(), registration.capabilities().end()); |
143 | | |
144 | 5.74k | return Status::OK(); |
145 | 5.75k | } |
146 | | |
147 | 3.89M | std::string TSDescriptor::placement_uuid() const { |
148 | 3.89M | SharedLock<decltype(lock_)> l(lock_); |
149 | 3.89M | return placement_uuid_; |
150 | 3.89M | } |
151 | | |
152 | 145k | std::string TSDescriptor::generate_placement_id(const CloudInfoPB& ci) { |
153 | 145k | return strings::Substitute( |
154 | 145k | "$0:$1:$2", ci.placement_cloud(), ci.placement_region(), ci.placement_zone()); |
155 | 145k | } |
156 | | |
157 | 45 | std::string TSDescriptor::placement_id() const { |
158 | 45 | SharedLock<decltype(lock_)> l(lock_); |
159 | 45 | return placement_id_; |
160 | 45 | } |
161 | | |
162 | 383k | void TSDescriptor::UpdateHeartbeat(const TSHeartbeatRequestPB* req) { |
163 | 383k | DCHECK_GE(req->num_live_tablets(), 0); |
164 | 383k | DCHECK_GE(req->leader_count(), 0); |
165 | 383k | { |
166 | 383k | std::lock_guard<decltype(lock_)> l(lock_); |
167 | 383k | last_heartbeat_ = MonoTime::Now(); |
168 | 383k | num_live_replicas_ = req->num_live_tablets(); |
169 | 383k | leader_count_ = req->leader_count(); |
170 | 383k | physical_time_ = req->ts_physical_time(); |
171 | 383k | hybrid_time_ = HybridTime::FromPB(req->ts_hybrid_time()); |
172 | 383k | heartbeat_rtt_ = MonoDelta::FromMicroseconds(req->rtt_us()); |
173 | 383k | } |
174 | 383k | } |
175 | | |
176 | 7.32M | MonoDelta TSDescriptor::TimeSinceHeartbeat() const { |
177 | 7.32M | MonoTime now(MonoTime::Now()); |
178 | 7.32M | SharedLock<decltype(lock_)> l(lock_); |
179 | 7.32M | return now.GetDeltaSince(last_heartbeat_); |
180 | 7.32M | } |
181 | | |
182 | 383k | int64_t TSDescriptor::latest_seqno() const { |
183 | 383k | SharedLock<decltype(lock_)> l(lock_); |
184 | 383k | return ts_information_->tserver_instance().instance_seqno(); |
185 | 383k | } |
186 | | |
187 | 5.02M | bool TSDescriptor::has_tablet_report() const { |
188 | 5.02M | SharedLock<decltype(lock_)> l(lock_); |
189 | 5.02M | return has_tablet_report_; |
190 | 5.02M | } |
191 | | |
192 | 5.58k | void TSDescriptor::set_has_tablet_report(bool has_report) { |
193 | 5.58k | std::lock_guard<decltype(lock_)> l(lock_); |
194 | 5.58k | has_tablet_report_ = has_report; |
195 | 5.58k | } |
196 | | |
197 | 378k | bool TSDescriptor::registered_through_heartbeat() const { |
198 | 378k | SharedLock<decltype(lock_)> l(lock_); |
199 | 378k | return registered_through_heartbeat_; |
200 | 378k | } |
201 | | |
202 | 82.1k | void TSDescriptor::DecayRecentReplicaCreationsUnlocked() { |
203 | | // In most cases, we won't have any recent replica creations, so |
204 | | // we don't need to bother calling the clock, etc. |
205 | 82.1k | if (recent_replica_creations_ == 0) return; |
206 | | |
207 | 78.1k | const double kHalflifeSecs = 60; |
208 | 78.1k | MonoTime now = MonoTime::Now(); |
209 | 78.1k | double secs_since_last_decay = now.GetDeltaSince(last_replica_creations_decay_).ToSeconds(); |
210 | 78.1k | recent_replica_creations_ *= pow(0.5, secs_since_last_decay / kHalflifeSecs); |
211 | | |
212 | | // If sufficiently small, reset down to 0 to take advantage of the fast path above. |
213 | 78.1k | if (recent_replica_creations_ < 1e-12) { |
214 | 0 | recent_replica_creations_ = 0; |
215 | 0 | } |
216 | 78.1k | last_replica_creations_decay_ = now; |
217 | 78.1k | } |
218 | | |
219 | 82.1k | void TSDescriptor::IncrementRecentReplicaCreations() { |
220 | 82.1k | std::lock_guard<decltype(lock_)> l(lock_); |
221 | 82.1k | DecayRecentReplicaCreationsUnlocked(); |
222 | 82.1k | recent_replica_creations_ += 1; |
223 | 82.1k | } |
224 | | |
225 | 3 | double TSDescriptor::RecentReplicaCreations() { |
226 | 3 | std::lock_guard<decltype(lock_)> l(lock_); |
227 | 3 | DecayRecentReplicaCreationsUnlocked(); |
228 | 3 | return recent_replica_creations_; |
229 | 3 | } |
230 | | |
231 | 715k | TSRegistrationPB TSDescriptor::GetRegistration() const { |
232 | 715k | SharedLock<decltype(lock_)> l(lock_); |
233 | 715k | return ts_information_->registration(); |
234 | 715k | } |
235 | | |
236 | 2.22M | const std::shared_ptr<TSInformationPB> TSDescriptor::GetTSInformationPB() const { |
237 | 2.22M | SharedLock<decltype(lock_)> l(lock_); |
238 | 421 | CHECK(ts_information_) << "No stored information"; |
239 | 2.22M | return ts_information_; |
240 | 2.22M | } |
241 | | |
242 | 173k | bool TSDescriptor::MatchesCloudInfo(const CloudInfoPB& cloud_info) const { |
243 | 173k | SharedLock<decltype(lock_)> l(lock_); |
244 | 173k | const auto& ts_ci = ts_information_->registration().common().cloud_info(); |
245 | | |
246 | | // cloud_info should be a prefix of ts_ci. |
247 | 173k | return CatalogManagerUtil::IsCloudInfoPrefix(cloud_info, ts_ci); |
248 | 173k | } |
249 | | |
250 | 26.2k | CloudInfoPB TSDescriptor::GetCloudInfo() const { |
251 | 26.2k | SharedLock<decltype(lock_)> l(lock_); |
252 | 26.2k | return ts_information_->registration().common().cloud_info(); |
253 | 26.2k | } |
254 | | |
255 | | template<typename Lambda> |
256 | 631k | bool TSDescriptor::DoesRegistrationMatch(Lambda predicate) const { |
257 | 631k | TSRegistrationPB reg = GetRegistration(); |
258 | 631k | if (std::find_if(reg.common().private_rpc_addresses().begin(), |
259 | 631k | reg.common().private_rpc_addresses().end(), |
260 | 32.2k | predicate) != reg.common().private_rpc_addresses().end()) { |
261 | 32.2k | return true; |
262 | 32.2k | } |
263 | 599k | if (std::find_if(reg.common().broadcast_addresses().begin(), |
264 | 599k | reg.common().broadcast_addresses().end(), |
265 | 0 | predicate) != reg.common().broadcast_addresses().end()) { |
266 | 0 | return true; |
267 | 0 | } |
268 | 599k | return false; |
269 | 599k | } ts_descriptor.cc:_ZNK2yb6master12TSDescriptor21DoesRegistrationMatchIZNKS1_13IsBlacklistedERKNSt3__113unordered_setINS_8HostPortENS_12HostPortHashENS3_8equal_toIS5_EENS3_9allocatorIS5_EEEEE3$_0EEbT_ Line | Count | Source | 256 | 444k | bool TSDescriptor::DoesRegistrationMatch(Lambda predicate) const { | 257 | 444k | TSRegistrationPB reg = GetRegistration(); | 258 | 444k | if (std::find_if(reg.common().private_rpc_addresses().begin(), | 259 | 444k | reg.common().private_rpc_addresses().end(), | 260 | 104 | predicate) != reg.common().private_rpc_addresses().end()) { | 261 | 104 | return true; | 262 | 104 | } | 263 | 444k | if (std::find_if(reg.common().broadcast_addresses().begin(), | 264 | 444k | reg.common().broadcast_addresses().end(), | 265 | 0 | predicate) != reg.common().broadcast_addresses().end()) { | 266 | 0 | return true; | 267 | 0 | } | 268 | 444k | return false; | 269 | 444k | } |
ts_descriptor.cc:_ZNK2yb6master12TSDescriptor21DoesRegistrationMatchIZNKS1_11IsRunningOnERKNS_10HostPortPBEE3$_1EEbT_ Line | Count | Source | 256 | 187k | bool TSDescriptor::DoesRegistrationMatch(Lambda predicate) const { | 257 | 187k | TSRegistrationPB reg = GetRegistration(); | 258 | 187k | if (std::find_if(reg.common().private_rpc_addresses().begin(), | 259 | 187k | reg.common().private_rpc_addresses().end(), | 260 | 32.1k | predicate) != reg.common().private_rpc_addresses().end()) { | 261 | 32.1k | return true; | 262 | 32.1k | } | 263 | 154k | if (std::find_if(reg.common().broadcast_addresses().begin(), | 264 | 154k | reg.common().broadcast_addresses().end(), | 265 | 0 | predicate) != reg.common().broadcast_addresses().end()) { | 266 | 0 | return true; | 267 | 0 | } | 268 | 154k | return false; | 269 | 154k | } |
|
270 | | |
271 | 444k | bool TSDescriptor::IsBlacklisted(const BlacklistSet& blacklist) const { |
272 | 444k | auto predicate = [&blacklist](const HostPortPB& rhs) { |
273 | 444k | return blacklist.count(HostPortFromPB(rhs)) > 0; |
274 | 444k | }; |
275 | 444k | return DoesRegistrationMatch(predicate); |
276 | 444k | } |
277 | | |
278 | 187k | bool TSDescriptor::IsRunningOn(const HostPortPB& hp) const { |
279 | 187k | auto predicate = [&hp](const HostPortPB& rhs) { |
280 | 187k | return rhs.host() == hp.host() && rhs.port() == hp.port(); |
281 | 187k | }; |
282 | 187k | return DoesRegistrationMatch(predicate); |
283 | 187k | } |
284 | | |
285 | 16.5k | Result<HostPort> TSDescriptor::GetHostPortUnlocked() const { |
286 | 16.5k | const auto& addr = DesiredHostPort(ts_information_->registration().common(), local_cloud_info_); |
287 | 16.5k | if (addr.host().empty()) { |
288 | 0 | return STATUS_FORMAT(NetworkError, "Unable to find the TS address for $0: $1", |
289 | 0 | permanent_uuid_, ts_information_->registration().ShortDebugString()); |
290 | 0 | } |
291 | | |
292 | 16.5k | return HostPortFromPB(addr); |
293 | 16.5k | } |
294 | | |
295 | 0 | bool TSDescriptor::IsAcceptingLeaderLoad(const ReplicationInfoPB& replication_info) const { |
296 | 0 | return true; |
297 | 0 | } |
298 | | |
299 | 33.7k | void TSDescriptor::UpdateMetrics(const TServerMetricsPB& metrics) { |
300 | 33.7k | std::lock_guard<decltype(lock_)> l(lock_); |
301 | 33.7k | ts_metrics_.total_memory_usage = metrics.total_ram_usage(); |
302 | 33.7k | ts_metrics_.total_sst_file_size = metrics.total_sst_file_size(); |
303 | 33.7k | ts_metrics_.uncompressed_sst_file_size = metrics.uncompressed_sst_file_size(); |
304 | 33.7k | ts_metrics_.num_sst_files = metrics.num_sst_files(); |
305 | 33.7k | ts_metrics_.read_ops_per_sec = metrics.read_ops_per_sec(); |
306 | 33.7k | ts_metrics_.write_ops_per_sec = metrics.write_ops_per_sec(); |
307 | 33.7k | ts_metrics_.uptime_seconds = metrics.uptime_seconds(); |
308 | 23.1k | for (const auto& path_metric : metrics.path_metrics()) { |
309 | 23.1k | ts_metrics_.path_metrics[path_metric.path_id()] = |
310 | 23.1k | { path_metric.used_space(), path_metric.total_space() }; |
311 | 23.1k | } |
312 | 33.7k | } |
313 | | |
314 | 20.4k | void TSDescriptor::GetMetrics(TServerMetricsPB* metrics) { |
315 | 20.4k | CHECK(metrics); |
316 | 20.4k | SharedLock<decltype(lock_)> l(lock_); |
317 | 20.4k | metrics->set_total_ram_usage(ts_metrics_.total_memory_usage); |
318 | 20.4k | metrics->set_total_sst_file_size(ts_metrics_.total_sst_file_size); |
319 | 20.4k | metrics->set_uncompressed_sst_file_size(ts_metrics_.uncompressed_sst_file_size); |
320 | 20.4k | metrics->set_num_sst_files(ts_metrics_.num_sst_files); |
321 | 20.4k | metrics->set_read_ops_per_sec(ts_metrics_.read_ops_per_sec); |
322 | 20.4k | metrics->set_write_ops_per_sec(ts_metrics_.write_ops_per_sec); |
323 | 20.4k | metrics->set_uptime_seconds(ts_metrics_.uptime_seconds); |
324 | 17.8k | for (const auto& path_metric : ts_metrics_.path_metrics) { |
325 | 17.8k | auto* new_path_metric = metrics->add_path_metrics(); |
326 | 17.8k | new_path_metric->set_path_id(path_metric.first); |
327 | 17.8k | new_path_metric->set_used_space(path_metric.second.used_space); |
328 | 17.8k | new_path_metric->set_total_space(path_metric.second.total_space); |
329 | 17.8k | } |
330 | 20.4k | } |
331 | | |
332 | 577k | bool TSDescriptor::HasTabletDeletePending() const { |
333 | 577k | SharedLock<decltype(lock_)> l(lock_); |
334 | 577k | return !tablets_pending_delete_.empty(); |
335 | 577k | } |
336 | | |
337 | 47.8k | bool TSDescriptor::IsTabletDeletePending(const std::string& tablet_id) const { |
338 | 47.8k | SharedLock<decltype(lock_)> l(lock_); |
339 | 47.8k | return tablets_pending_delete_.count(tablet_id); |
340 | 47.8k | } |
341 | | |
342 | 16.0k | std::string TSDescriptor::PendingTabletDeleteToString() const { |
343 | 16.0k | SharedLock<decltype(lock_)> l(lock_); |
344 | 16.0k | return yb::ToString(tablets_pending_delete_); |
345 | 16.0k | } |
346 | | |
347 | 47.0k | void TSDescriptor::AddPendingTabletDelete(const std::string& tablet_id) { |
348 | 47.0k | std::lock_guard<decltype(lock_)> l(lock_); |
349 | 47.0k | tablets_pending_delete_.insert(tablet_id); |
350 | 47.0k | } |
351 | | |
352 | 46.9k | void TSDescriptor::ClearPendingTabletDelete(const std::string& tablet_id) { |
353 | 46.9k | std::lock_guard<decltype(lock_)> l(lock_); |
354 | 46.9k | tablets_pending_delete_.erase(tablet_id); |
355 | 46.9k | } |
356 | | |
357 | 0 | std::size_t TSDescriptor::NumTasks() const { |
358 | 0 | SharedLock<decltype(lock_)> l(lock_); |
359 | 0 | return tablets_pending_delete_.size(); |
360 | 0 | } |
361 | | |
362 | 6.93M | bool TSDescriptor::IsLive() const { |
363 | 6.93M | return TimeSinceHeartbeat().ToMilliseconds() < |
364 | 6.88M | GetAtomicFlag(&FLAGS_tserver_unresponsive_timeout_ms) && !IsRemoved(); |
365 | 6.93M | } |
366 | | |
367 | 4.28M | bool TSDescriptor::IsLiveAndHasReported() const { |
368 | 4.28M | return IsLive() && has_tablet_report(); |
369 | 4.28M | } |
370 | | |
371 | 49.6k | std::string TSDescriptor::ToString() const { |
372 | 49.6k | SharedLock<decltype(lock_)> l(lock_); |
373 | 49.6k | return Format("{ permanent_uuid: $0 registration: $1 placement_id: $2 }", |
374 | 49.6k | permanent_uuid_, ts_information_->registration(), placement_id_); |
375 | 49.6k | } |
376 | | |
377 | | } // namespace master |
378 | | } // namespace yb |