/Users/deen/code/yugabyte-db/src/yb/tserver/heartbeater.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tserver/heartbeater.h" |
34 | | |
35 | | #include <cstdint> |
36 | | #include <iosfwd> |
37 | | #include <map> |
38 | | #include <memory> |
39 | | #include <mutex> |
40 | | #include <ostream> |
41 | | #include <string> |
42 | | #include <vector> |
43 | | |
44 | | #include <boost/function.hpp> |
45 | | #include <glog/logging.h> |
46 | | |
47 | | #include "yb/common/common_flags.h" |
48 | | #include "yb/common/hybrid_time.h" |
49 | | #include "yb/common/wire_protocol.h" |
50 | | |
51 | | #include "yb/consensus/log_fwd.h" |
52 | | |
53 | | #include "yb/docdb/docdb.pb.h" |
54 | | |
55 | | #include "yb/gutil/atomicops.h" |
56 | | #include "yb/gutil/bind.h" |
57 | | #include "yb/gutil/macros.h" |
58 | | #include "yb/gutil/ref_counted.h" |
59 | | #include "yb/gutil/strings/substitute.h" |
60 | | #include "yb/gutil/thread_annotations.h" |
61 | | |
62 | | #include "yb/master/master_heartbeat.proxy.h" |
63 | | #include "yb/master/master_rpc.h" |
64 | | #include "yb/master/master_types.pb.h" |
65 | | |
66 | | #include "yb/rocksdb/cache.h" |
67 | | #include "yb/rocksdb/options.h" |
68 | | #include "yb/rocksdb/statistics.h" |
69 | | |
70 | | #include "yb/rpc/rpc_fwd.h" |
71 | | |
72 | | #include "yb/server/hybrid_clock.h" |
73 | | #include "yb/server/server_base.proxy.h" |
74 | | |
75 | | #include "yb/tablet/tablet_options.h" |
76 | | |
77 | | #include "yb/tserver/tablet_server.h" |
78 | | #include "yb/tserver/ts_tablet_manager.h" |
79 | | |
80 | | #include "yb/util/async_util.h" |
81 | | #include "yb/util/capabilities.h" |
82 | | #include "yb/util/countdown_latch.h" |
83 | | #include "yb/util/enums.h" |
84 | | #include "yb/util/flag_tags.h" |
85 | | #include "yb/util/locks.h" |
86 | | #include "yb/util/logging.h" |
87 | | #include "yb/util/monotime.h" |
88 | | #include "yb/util/net/net_util.h" |
89 | | #include "yb/util/slice.h" |
90 | | #include "yb/util/status.h" |
91 | | #include "yb/util/status_format.h" |
92 | | #include "yb/util/status_log.h" |
93 | | #include "yb/util/strongly_typed_bool.h" |
94 | | #include "yb/util/thread.h" |
95 | | #include "yb/util/threadpool.h" |
96 | | |
97 | | using namespace std::literals; |
98 | | |
99 | | DEFINE_int32(heartbeat_rpc_timeout_ms, 15000, |
100 | | "Timeout used for the TS->Master heartbeat RPCs."); |
101 | | TAG_FLAG(heartbeat_rpc_timeout_ms, advanced); |
102 | | TAG_FLAG(heartbeat_rpc_timeout_ms, runtime); |
103 | | |
104 | | DEFINE_int32(heartbeat_interval_ms, 1000, |
105 | | "Interval at which the TS heartbeats to the master."); |
106 | | TAG_FLAG(heartbeat_interval_ms, advanced); |
107 | | TAG_FLAG(heartbeat_interval_ms, runtime); |
108 | | |
109 | | DEFINE_int32(heartbeat_max_failures_before_backoff, 3, |
110 | | "Maximum number of consecutive heartbeat failures until the " |
111 | | "Tablet Server backs off to the normal heartbeat interval, " |
112 | | "rather than retrying."); |
113 | | TAG_FLAG(heartbeat_max_failures_before_backoff, advanced); |
114 | | |
115 | | DEFINE_test_flag(bool, tserver_disable_heartbeat, false, "Should heartbeat be disabled"); |
116 | | TAG_FLAG(TEST_tserver_disable_heartbeat, runtime); |
117 | | |
118 | | DEFINE_CAPABILITY(TabletReportLimit, 0xb1a2a020); |
119 | | |
120 | | using google::protobuf::RepeatedPtrField; |
121 | | using yb::HostPortPB; |
122 | | using yb::consensus::RaftPeerPB; |
123 | | using yb::master::GetLeaderMasterRpc; |
124 | | using yb::rpc::RpcController; |
125 | | using std::shared_ptr; |
126 | | using std::vector; |
127 | | using strings::Substitute; |
128 | | |
129 | | namespace yb { |
130 | | namespace tserver { |
131 | | |
132 | | // Most of the actual logic of the heartbeater is inside this inner class, |
133 | | // to avoid having too many dependencies from the header itself. |
134 | | // |
135 | | // This is basically the "PIMPL" pattern. |
136 | | class Heartbeater::Thread { |
137 | | public: |
138 | | Thread( |
139 | | const TabletServerOptions& opts, TabletServer* server, |
140 | | std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers); |
141 | | Thread(const Thread& other) = delete; |
142 | | void operator=(const Thread& other) = delete; |
143 | | |
144 | | Status Start(); |
145 | | Status Stop(); |
146 | | void TriggerASAP(); |
147 | | |
148 | 224 | void set_master_addresses(server::MasterAddressesPtr master_addresses) { |
149 | 224 | std::lock_guard<std::mutex> l(master_addresses_mtx_); |
150 | 224 | master_addresses_ = std::move(master_addresses); |
151 | 224 | VLOG_WITH_PREFIX0 (1) << "Setting master addresses to " << yb::ToString(master_addresses_)0 ; |
152 | 224 | } |
153 | | |
154 | | private: |
155 | | void RunThread(); |
156 | | Status FindLeaderMaster(CoarseTimePoint deadline, HostPort* leader_hostport); |
157 | | Status ConnectToMaster(); |
158 | | int GetMinimumHeartbeatMillis() const; |
159 | | int GetMillisUntilNextHeartbeat() const; |
160 | | CHECKED_STATUS DoHeartbeat(); |
161 | | CHECKED_STATUS TryHeartbeat(); |
162 | | CHECKED_STATUS SetupRegistration(master::TSRegistrationPB* reg); |
163 | | void SetupCommonField(master::TSToMasterCommonPB* common); |
164 | | bool IsCurrentThread() const; |
165 | | |
166 | 873k | const std::string& LogPrefix() const { |
167 | 873k | return server_->LogPrefix(); |
168 | 873k | } |
169 | | |
170 | 856k | server::MasterAddressesPtr get_master_addresses() { |
171 | 856k | std::lock_guard<std::mutex> l(master_addresses_mtx_); |
172 | 856k | CHECK_NOTNULL(master_addresses_.get()); |
173 | 856k | return master_addresses_; |
174 | 856k | } |
175 | | |
176 | | // Protecting master_addresses_. |
177 | | std::mutex master_addresses_mtx_; |
178 | | |
179 | | // The hosts/ports of masters that we may heartbeat to. |
180 | | // |
181 | | // We keep the HostPort around rather than a Sockaddr because the |
182 | | // masters may change IP addresses, and we'd like to re-resolve on |
183 | | // every new attempt at connecting. |
184 | | server::MasterAddressesPtr master_addresses_; |
185 | | |
186 | | // The server for which we are heartbeating. |
187 | | TabletServer* const server_; |
188 | | |
189 | | // Roundtrip time of previous heartbeat to yb-master. |
190 | | MonoDelta heartbeat_rtt_ = MonoDelta::kZero; |
191 | | |
192 | | // The actual running thread (NULL before it is started) |
193 | | scoped_refptr<yb::Thread> thread_; |
194 | | |
195 | | // Host and port of the most recent leader master. |
196 | | HostPort leader_master_hostport_; |
197 | | |
198 | | // Current RPC proxy to the leader master. |
199 | | std::unique_ptr<master::MasterHeartbeatProxy> proxy_; |
200 | | |
201 | | // The most recent response from a heartbeat. |
202 | | master::TSHeartbeatResponsePB last_hb_response_; |
203 | | |
204 | | // Full reports can take multiple heartbeats. |
205 | | // Flag to indicate if next heartbeat is part of a full report. |
206 | | bool sending_full_report_ = false; |
207 | | |
208 | | // The number of heartbeats which have failed in a row. |
209 | | // This is tracked so as to back-off heartbeating. |
210 | | int consecutive_failed_heartbeats_ = 0; |
211 | | |
212 | | // Mutex/condition pair to trigger the heartbeater thread |
213 | | // to either heartbeat early or exit. |
214 | | Mutex mutex_; |
215 | | ConditionVariable cond_; |
216 | | |
217 | | // Protected by mutex_. |
218 | | bool should_run_ = false; |
219 | | bool heartbeat_asap_ = false; |
220 | | |
221 | | rpc::Rpcs rpcs_; |
222 | | |
223 | | std::vector<std::unique_ptr<HeartbeatDataProvider>> data_providers_; |
224 | | }; |
225 | | |
226 | | //////////////////////////////////////////////////////////// |
227 | | // Heartbeater |
228 | | //////////////////////////////////////////////////////////// |
229 | | |
230 | | Heartbeater::Heartbeater( |
231 | | const TabletServerOptions& opts, TabletServer* server, |
232 | | std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers) |
233 | 8.74k | : thread_(new Thread(opts, server, std::move(data_providers))) { |
234 | 8.74k | } |
235 | | |
236 | 89 | Heartbeater::~Heartbeater() { |
237 | 89 | WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread"); |
238 | 89 | } |
239 | | |
240 | 8.58k | Status Heartbeater::Start() { return thread_->Start(); } |
241 | 288 | Status Heartbeater::Stop() { return thread_->Stop(); } |
242 | 592k | void Heartbeater::TriggerASAP() { thread_->TriggerASAP(); } |
243 | | |
244 | 224 | void Heartbeater::set_master_addresses(server::MasterAddressesPtr master_addresses) { |
245 | 224 | thread_->set_master_addresses(std::move(master_addresses)); |
246 | 224 | } |
247 | | |
248 | | //////////////////////////////////////////////////////////// |
249 | | // Heartbeater::Thread |
250 | | //////////////////////////////////////////////////////////// |
251 | | |
252 | | Heartbeater::Thread::Thread( |
253 | | const TabletServerOptions& opts, TabletServer* server, |
254 | | std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers) |
255 | | : master_addresses_(opts.GetMasterAddresses()), |
256 | | server_(server), |
257 | | cond_(&mutex_), |
258 | 8.74k | data_providers_(std::move(data_providers)) { |
259 | 8.74k | CHECK_NOTNULL(master_addresses_.get()); |
260 | 8.74k | CHECK(!master_addresses_->empty()); |
261 | 8.74k | VLOG_WITH_PREFIX0 (1) << "Initializing heartbeater thread with master addresses: " |
262 | 0 | << yb::ToString(master_addresses_); |
263 | 8.74k | } |
264 | | |
265 | | namespace { |
266 | | |
267 | | struct FindLeaderMasterData { |
268 | | HostPort result; |
269 | | Synchronizer sync; |
270 | | std::shared_ptr<GetLeaderMasterRpc> rpc; |
271 | | }; |
272 | | |
273 | | void LeaderMasterCallback(const std::shared_ptr<FindLeaderMasterData>& data, |
274 | | const Status& status, |
275 | 417k | const HostPort& result) { |
276 | 417k | if (status.ok()) { |
277 | 405k | data->result = result; |
278 | 405k | } |
279 | 417k | data->sync.StatusCB(status); |
280 | 417k | } |
281 | | |
282 | | } // anonymous namespace |
283 | | |
284 | 420k | Status Heartbeater::Thread::FindLeaderMaster(CoarseTimePoint deadline, HostPort* leader_hostport) { |
285 | 420k | Status s = Status::OK(); |
286 | 420k | const auto master_addresses = get_master_addresses(); |
287 | 420k | if (master_addresses->size() == 1 && (*master_addresses)[0].size() == 13.73k ) { |
288 | | // "Shortcut" the process when a single master is specified. |
289 | 1.71k | *leader_hostport = (*master_addresses)[0][0]; |
290 | 1.71k | return Status::OK(); |
291 | 1.71k | } |
292 | 418k | auto master_sock_addrs = *master_addresses; |
293 | 418k | if (master_sock_addrs.empty()) { |
294 | 0 | return STATUS(NotFound, "Unable to resolve any of the master addresses!"); |
295 | 0 | } |
296 | 418k | auto data = std::make_shared<FindLeaderMasterData>(); |
297 | 418k | data->rpc = std::make_shared<GetLeaderMasterRpc>( |
298 | 418k | Bind(&LeaderMasterCallback, data), |
299 | 418k | master_sock_addrs, |
300 | 418k | deadline, |
301 | 418k | server_->messenger(), |
302 | 418k | &server_->proxy_cache(), |
303 | 418k | &rpcs_, |
304 | 418k | true /* should_timeout_to_follower_ */); |
305 | 418k | data->rpc->SendRpc(); |
306 | 418k | auto status = data->sync.WaitFor(deadline - CoarseMonoClock::Now() + 1s); |
307 | 418k | if (status.ok()) { |
308 | 405k | *leader_hostport = data->result; |
309 | 405k | } |
310 | 418k | rpcs_.RequestAbortAll(); |
311 | 418k | return status; |
312 | 418k | } |
313 | | |
314 | 420k | Status Heartbeater::Thread::ConnectToMaster() { |
315 | 420k | auto deadline = CoarseMonoClock::Now() + FLAGS_heartbeat_rpc_timeout_ms * 1ms; |
316 | | // TODO send heartbeats without tablet reports to non-leader masters. |
317 | 420k | Status s = FindLeaderMaster(deadline, &leader_master_hostport_); |
318 | 420k | if (!s.ok()) { |
319 | 12.6k | LOG_WITH_PREFIX(INFO) << "Find leader master " << leader_master_hostport_.ToString() |
320 | 12.6k | << " hit error " << s; |
321 | 12.6k | return s; |
322 | 12.6k | } |
323 | | |
324 | | // Reset report state if we have master failover. |
325 | 407k | sending_full_report_ = false; |
326 | | |
327 | | // Pings are common for both Master and Tserver. |
328 | 407k | auto new_proxy = std::make_unique<server::GenericServiceProxy>( |
329 | 407k | &server_->proxy_cache(), leader_master_hostport_); |
330 | | |
331 | | // Ping the master to verify that it's alive. |
332 | 407k | server::PingRequestPB req; |
333 | 407k | server::PingResponsePB resp; |
334 | 407k | RpcController rpc; |
335 | 407k | rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms)); |
336 | 407k | RETURN_NOT_OK_PREPEND(new_proxy->Ping(req, &resp, &rpc), |
337 | 407k | Format("Failed to ping master at $0", leader_master_hostport_)); |
338 | 407k | LOG_WITH_PREFIX(INFO) << "Connected to a leader master server at " << leader_master_hostport_; |
339 | | |
340 | | // Save state in the instance. |
341 | 407k | proxy_ = std::make_unique<master::MasterHeartbeatProxy>( |
342 | 407k | &server_->proxy_cache(), leader_master_hostport_); |
343 | 407k | return Status::OK(); |
344 | 407k | } |
345 | | |
346 | 5.25M | void Heartbeater::Thread::SetupCommonField(master::TSToMasterCommonPB* common) { |
347 | 5.25M | common->mutable_ts_instance()->CopyFrom(server_->instance_pb()); |
348 | 5.25M | } |
349 | | |
350 | 9.44k | Status Heartbeater::Thread::SetupRegistration(master::TSRegistrationPB* reg) { |
351 | 9.44k | reg->Clear(); |
352 | 9.44k | RETURN_NOT_OK(server_->GetRegistration(reg->mutable_common())); |
353 | | |
354 | 9.44k | return Status::OK(); |
355 | 9.44k | } |
356 | | |
357 | 355k | int Heartbeater::Thread::GetMinimumHeartbeatMillis() const { |
358 | | // If we've failed a few heartbeats in a row, back off to the normal |
359 | | // interval, rather than retrying in a loop. |
360 | 355k | if (consecutive_failed_heartbeats_ == FLAGS_heartbeat_max_failures_before_backoff) { |
361 | 304 | LOG_WITH_PREFIX(WARNING) << "Failed " << consecutive_failed_heartbeats_ <<" heartbeats " |
362 | 304 | << "in a row: no longer allowing fast heartbeat attempts."; |
363 | 304 | } |
364 | | |
365 | 355k | return consecutive_failed_heartbeats_ > FLAGS_heartbeat_max_failures_before_backoff ? |
366 | 354k | FLAGS_heartbeat_interval_ms718 : 0; |
367 | 355k | } |
368 | | |
369 | 5.61M | int Heartbeater::Thread::GetMillisUntilNextHeartbeat() const { |
370 | | // If the master needs something from us, we should immediately |
371 | | // send another heartbeat with that info, rather than waiting for the interval. |
372 | 5.61M | if (sending_full_report_ || |
373 | 5.61M | last_hb_response_.needs_reregister()5.61M || |
374 | 5.61M | last_hb_response_.needs_full_tablet_report()5.25M ) { |
375 | 355k | return GetMinimumHeartbeatMillis(); |
376 | 355k | } |
377 | | |
378 | 5.25M | return FLAGS_heartbeat_interval_ms; |
379 | 5.61M | } |
380 | | |
381 | | |
382 | 5.25M | Status Heartbeater::Thread::TryHeartbeat() { |
383 | 5.25M | master::TSHeartbeatRequestPB req; |
384 | | |
385 | 5.25M | SetupCommonField(req.mutable_common()); |
386 | 5.25M | if (last_hb_response_.needs_reregister()) { |
387 | 9.44k | LOG_WITH_PREFIX(INFO) << "Registering TS with master..."; |
388 | 9.44k | RETURN_NOT_OK_PREPEND(SetupRegistration(req.mutable_registration()), |
389 | 9.44k | "Unable to set up registration"); |
390 | 9.44k | auto capabilities = Capabilities(); |
391 | 9.44k | *req.mutable_registration()->mutable_capabilities() = |
392 | 9.44k | google::protobuf::RepeatedField<CapabilityId>(capabilities.begin(), capabilities.end()); |
393 | 9.44k | } |
394 | | |
395 | 5.25M | if (last_hb_response_.needs_full_tablet_report()) { |
396 | 8.21k | LOG_WITH_PREFIX(INFO) << "Sending a full tablet report to master..."; |
397 | 8.21k | server_->tablet_manager()->StartFullTabletReport(req.mutable_tablet_report()); |
398 | 8.21k | sending_full_report_ = true; |
399 | 5.24M | } else { |
400 | 5.24M | if (sending_full_report_) { |
401 | 249 | LOG_WITH_PREFIX(INFO) << "Continuing full tablet report to master..."; |
402 | 5.24M | } else { |
403 | 5.24M | VLOG_WITH_PREFIX3 (2) << "Sending an incremental tablet report to master..."3 ; |
404 | 5.24M | } |
405 | 5.24M | server_->tablet_manager()->GenerateTabletReport(req.mutable_tablet_report(), |
406 | 5.24M | !sending_full_report_ /* include_bootstrap */); |
407 | 5.24M | } |
408 | 5.25M | req.mutable_tablet_report()->set_is_incremental(!sending_full_report_); |
409 | 5.25M | req.set_num_live_tablets(server_->tablet_manager()->GetNumLiveTablets()); |
410 | 5.25M | req.set_leader_count(server_->tablet_manager()->GetLeaderCount()); |
411 | | |
412 | 5.25M | for (auto& data_provider : data_providers_) { |
413 | 5.25M | data_provider->AddData(last_hb_response_, &req); |
414 | 5.25M | } |
415 | | |
416 | 5.25M | RpcController rpc; |
417 | 5.25M | rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms)); |
418 | | |
419 | 5.25M | req.set_config_index(server_->GetCurrentMasterIndex()); |
420 | 5.25M | req.set_cluster_config_version(server_->cluster_config_version()); |
421 | 5.25M | req.set_rtt_us(heartbeat_rtt_.ToMicroseconds()); |
422 | | |
423 | | // Include the hybrid time of this tablet server in the heartbeat. |
424 | 5.25M | auto* hybrid_clock = dynamic_cast<server::HybridClock*>(server_->Clock()); |
425 | 5.25M | if (hybrid_clock) { |
426 | 5.25M | req.set_ts_hybrid_time(hybrid_clock->Now().ToUint64()); |
427 | | // Also include the physical clock time of this tablet server in the heartbeat. |
428 | 5.25M | Result<PhysicalTime> now = hybrid_clock->physical_clock()->Now(); |
429 | 5.25M | if (!now.ok()) { |
430 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 10) << "Failed to read clock: " << now.status(); |
431 | 0 | req.set_ts_physical_time(0); |
432 | 5.25M | } else { |
433 | 5.25M | req.set_ts_physical_time(now->time_point); |
434 | 5.25M | } |
435 | 5.25M | } else { |
436 | 321 | req.set_ts_hybrid_time(0); |
437 | 321 | req.set_ts_physical_time(0); |
438 | 321 | } |
439 | | |
440 | 5.25M | { |
441 | 5.25M | VLOG_WITH_PREFIX15 (2) << "Sending heartbeat:\n" << req.DebugString()15 ; |
442 | 5.25M | heartbeat_rtt_ = MonoDelta::kZero; |
443 | 5.25M | MonoTime start_time = MonoTime::Now(); |
444 | 5.25M | master::TSHeartbeatResponsePB resp; |
445 | 5.25M | RETURN_NOT_OK_PREPEND(proxy_->TSHeartbeat(req, &resp, &rpc), |
446 | 5.25M | "Failed to send heartbeat"); |
447 | 5.22M | MonoTime end_time = MonoTime::Now(); |
448 | 5.22M | if (resp.has_error()) { |
449 | 398k | if (resp.error().code() != master::MasterErrorPB::NOT_THE_LEADER) { |
450 | 0 | return StatusFromPB(resp.error().status()); |
451 | 398k | } else { |
452 | 398k | DCHECK(!resp.leader_master()); |
453 | | // Treat a not-the-leader error code as leader_master=false. |
454 | 398k | if (resp.leader_master()) { |
455 | 0 | LOG_WITH_PREFIX(WARNING) << "Setting leader master to false for " |
456 | 0 | << resp.error().code() << " code."; |
457 | 0 | resp.set_leader_master(false); |
458 | 0 | } |
459 | 398k | } |
460 | 398k | } |
461 | | |
462 | 5.22M | VLOG_WITH_PREFIX160 (2) << "Received heartbeat response:\n" << resp.DebugString()160 ; |
463 | 5.22M | if (resp.has_master_config()) { |
464 | 224 | LOG_WITH_PREFIX(INFO) << "Received heartbeat response with config " << resp.DebugString(); |
465 | | |
466 | 224 | RETURN_NOT_OK(server_->UpdateMasterAddresses(resp.master_config(), resp.leader_master())); |
467 | 224 | } |
468 | | |
469 | 5.22M | if (!resp.leader_master()) { |
470 | | // If the master is no longer a leader, reset proxy so that we can |
471 | | // determine the master and attempt to heartbeat during in the |
472 | | // next heartbeat interval. |
473 | 398k | proxy_.reset(); |
474 | 398k | return STATUS_FORMAT(ServiceUnavailable, "Master is no longer the leader: $0", resp.error()); |
475 | 398k | } |
476 | | |
477 | | // Check for a universe key registry for encryption. |
478 | 4.83M | if (resp.has_universe_key_registry()) { |
479 | 46 | RETURN_NOT_OK(server_->SetUniverseKeyRegistry(resp.universe_key_registry())); |
480 | 46 | } |
481 | | |
482 | | // Check for CDC Universe Replication. |
483 | 4.83M | if (resp.has_consumer_registry()) { |
484 | 0 | int32_t cluster_config_version = -1; |
485 | 0 | if (!resp.has_cluster_config_version()) { |
486 | 0 | YB_LOG_EVERY_N_SECS(INFO, 30) |
487 | 0 | << "Invalid heartbeat response without a cluster config version"; |
488 | 0 | } else { |
489 | 0 | cluster_config_version = resp.cluster_config_version(); |
490 | 0 | } |
491 | 0 | RETURN_NOT_OK(static_cast<enterprise::TabletServer*>(server_)-> |
492 | 0 | SetConfigVersionAndConsumerRegistry(cluster_config_version, &resp.consumer_registry())); |
493 | 4.83M | } else if (resp.has_cluster_config_version()) { |
494 | 4.83M | RETURN_NOT_OK(static_cast<enterprise::TabletServer*>(server_)-> |
495 | 4.83M | SetConfigVersionAndConsumerRegistry(resp.cluster_config_version(), nullptr)); |
496 | 4.83M | } |
497 | | |
498 | | // At this point we know resp is a successful heartbeat response from the master so set it as |
499 | | // the last heartbeat response. This invalidates resp so we should use last_hb_response_ instead |
500 | | // below (hence using the nested scope for resp until here). |
501 | 4.83M | last_hb_response_.Swap(&resp); |
502 | 4.83M | heartbeat_rtt_ = end_time.GetDeltaSince(start_time); |
503 | 4.83M | } |
504 | | |
505 | 4.83M | if (last_hb_response_.has_cluster_uuid() && !last_hb_response_.cluster_uuid().empty()8.16k ) { |
506 | 8.16k | server_->set_cluster_uuid(last_hb_response_.cluster_uuid()); |
507 | 8.16k | } |
508 | | |
509 | | // The Master responds with the max entries for a single Tablet Report to avoid overwhelming it. |
510 | 4.83M | if (last_hb_response_.has_tablet_report_limit()) { |
511 | 4.83M | server_->tablet_manager()->SetReportLimit(last_hb_response_.tablet_report_limit()); |
512 | 4.83M | } |
513 | | |
514 | 4.83M | if (last_hb_response_.needs_full_tablet_report()) { |
515 | 8.21k | return STATUS(TryAgain, ""); |
516 | 8.21k | } |
517 | | |
518 | | // Handle TSHeartbeatResponsePB (e.g. tablets ack'd by master as processed) |
519 | 4.82M | bool all_processed = req.tablet_report().remaining_tablet_count() == 0 && |
520 | 4.82M | !last_hb_response_.tablet_report().processing_truncated()4.82M ; |
521 | 4.82M | server_->tablet_manager()->MarkTabletReportAcknowledged( |
522 | 4.82M | req.tablet_report().sequence_number(), last_hb_response_.tablet_report(), all_processed); |
523 | | |
524 | | // Trigger another heartbeat ASAP if we didn't process all tablets on this request. |
525 | 4.82M | sending_full_report_ = sending_full_report_ && !all_processed8.41k ; |
526 | | |
527 | | // Update the master's YSQL catalog version (i.e. if there were schema changes for YSQL objects). |
528 | 4.82M | if (last_hb_response_.has_ysql_catalog_version()) { |
529 | 4.82M | if (FLAGS_log_ysql_catalog_versions) { |
530 | 0 | VLOG_WITH_FUNC(1) << "got master catalog version: " |
531 | 0 | << last_hb_response_.ysql_catalog_version() |
532 | 0 | << ", breaking version: " |
533 | 0 | << (last_hb_response_.has_ysql_last_breaking_catalog_version() |
534 | 0 | ? Format("$1", last_hb_response_.ysql_last_breaking_catalog_version()) |
535 | 0 | : "(none)"); |
536 | 0 | } |
537 | 4.82M | if (last_hb_response_.has_ysql_last_breaking_catalog_version()) { |
538 | 4.82M | server_->SetYSQLCatalogVersion(last_hb_response_.ysql_catalog_version(), |
539 | 4.82M | last_hb_response_.ysql_last_breaking_catalog_version()); |
540 | 4.82M | } else { |
541 | | /* Assuming all changes are breaking if last breaking version not explicitly set. */ |
542 | 0 | server_->SetYSQLCatalogVersion(last_hb_response_.ysql_catalog_version(), |
543 | 0 | last_hb_response_.ysql_catalog_version()); |
544 | 0 | } |
545 | 4.82M | } |
546 | | |
547 | 4.82M | RETURN_NOT_OK(server_->tablet_manager()->UpdateSnapshotsInfo(last_hb_response_.snapshots_info())); |
548 | | |
549 | 4.82M | if (last_hb_response_.has_transaction_tables_version()) { |
550 | 4.82M | server_->UpdateTransactionTablesVersion(last_hb_response_.transaction_tables_version()); |
551 | 4.82M | } |
552 | | |
553 | | // Update the live tserver list. |
554 | 4.82M | return server_->PopulateLiveTServers(last_hb_response_); |
555 | 4.82M | } |
556 | | |
557 | 5.60M | Status Heartbeater::Thread::DoHeartbeat() { |
558 | 5.60M | if (PREDICT_FALSE(server_->fail_heartbeats_for_tests())) { |
559 | 97 | return STATUS(IOError, "failing all heartbeats for tests"); |
560 | 97 | } |
561 | | |
562 | 5.60M | if (PREDICT_FALSE(FLAGS_TEST_tserver_disable_heartbeat)) { |
563 | 344k | YB_LOG_EVERY_N_SECS(INFO, 1) << "Heartbeat disabled for testing."118 ; |
564 | 344k | return Status::OK(); |
565 | 344k | } |
566 | | |
567 | 5.25M | CHECK(IsCurrentThread()); |
568 | | |
569 | 5.25M | if (!proxy_) { |
570 | 420k | VLOG_WITH_PREFIX0 (1) << "No valid master proxy. Connecting..."0 ; |
571 | 420k | RETURN_NOT_OK(ConnectToMaster()); |
572 | 407k | DCHECK(proxy_); |
573 | 407k | } |
574 | | |
575 | 5.25M | for (;;)5.24M { |
576 | 5.25M | auto status = TryHeartbeat(); |
577 | 5.25M | if (!status.ok() && status.IsTryAgain()431k ) { |
578 | 8.21k | continue; |
579 | 8.21k | } |
580 | 5.24M | return status; |
581 | 5.25M | } |
582 | | |
583 | 552 | return Status::OK(); |
584 | 5.24M | } |
585 | | |
586 | 8.58k | void Heartbeater::Thread::RunThread() { |
587 | 8.58k | CHECK(IsCurrentThread()); |
588 | 8.58k | VLOG_WITH_PREFIX0 (1) << "Heartbeat thread starting"0 ; |
589 | | |
590 | | // Config the "last heartbeat response" to indicate that we need to register |
591 | | // -- since we've never registered before, we know this to be true. |
592 | 8.58k | last_hb_response_.set_needs_reregister(true); |
593 | | // Have the Master request a full tablet report on 2nd HB, once it knows our capabilities. |
594 | 8.58k | last_hb_response_.set_needs_full_tablet_report(false); |
595 | | |
596 | 5.62M | while (true) { |
597 | 5.61M | MonoTime next_heartbeat = MonoTime::Now(); |
598 | 5.61M | next_heartbeat.AddDelta(MonoDelta::FromMilliseconds(GetMillisUntilNextHeartbeat())); |
599 | | |
600 | | // Wait for either the heartbeat interval to elapse, or for an "ASAP" heartbeat, |
601 | | // or for the signal to shut down. |
602 | 5.61M | { |
603 | 5.61M | MutexLock l(mutex_); |
604 | 10.7M | while (true) { |
605 | 10.7M | MonoDelta remaining = next_heartbeat.GetDeltaSince(MonoTime::Now()); |
606 | 10.7M | if (remaining.ToMilliseconds() <= 0 || |
607 | 10.7M | heartbeat_asap_5.41M || |
608 | 10.7M | !should_run_5.11M ) { |
609 | 5.60M | break; |
610 | 5.60M | } |
611 | 5.11M | cond_.TimedWait(remaining); |
612 | 5.11M | } |
613 | | |
614 | 5.61M | heartbeat_asap_ = false; |
615 | | |
616 | 5.61M | if (!should_run_) { |
617 | 102 | VLOG_WITH_PREFIX0 (1) << "Heartbeat thread finished"0 ; |
618 | 102 | return; |
619 | 102 | } |
620 | 5.61M | } |
621 | | |
622 | 5.61M | Status s = DoHeartbeat(); |
623 | 5.61M | if (!s.ok()) { |
624 | 436k | const auto master_addresses = get_master_addresses(); |
625 | 436k | LOG_WITH_PREFIX(WARNING) |
626 | 436k | << "Failed to heartbeat to " << leader_master_hostport_.ToString() |
627 | 436k | << ": " << s << " tries=" << consecutive_failed_heartbeats_ |
628 | 436k | << ", num=" << master_addresses->size() |
629 | 436k | << ", masters=" << yb::ToString(master_addresses) |
630 | 436k | << ", code=" << s.CodeAsString(); |
631 | 436k | consecutive_failed_heartbeats_++; |
632 | | // If there's multiple masters... |
633 | 436k | if (master_addresses->size() > 1 || (*master_addresses)[0].size() > 125.6k ) { |
634 | | // If we encountered a network error (e.g., connection refused) or reached our failure |
635 | | // threshold. Try determining the leader master again. Heartbeats function as a watchdog, |
636 | | // so timeouts should be considered normal failures. |
637 | 411k | if (s.IsNetworkError() || |
638 | 411k | consecutive_failed_heartbeats_ == FLAGS_heartbeat_max_failures_before_backoff411k ) { |
639 | 1.00k | proxy_.reset(); |
640 | 1.00k | } |
641 | 411k | } |
642 | 436k | continue; |
643 | 436k | } |
644 | 5.17M | consecutive_failed_heartbeats_ = 0; |
645 | 5.17M | } |
646 | 8.58k | } |
647 | | |
648 | 5.26M | bool Heartbeater::Thread::IsCurrentThread() const { |
649 | 5.26M | return thread_.get() == yb::Thread::current_thread(); |
650 | 5.26M | } |
651 | | |
652 | 8.58k | Status Heartbeater::Thread::Start() { |
653 | 8.58k | CHECK(thread_ == nullptr); |
654 | | |
655 | 8.58k | should_run_ = true; |
656 | 8.58k | return yb::Thread::Create("heartbeater", "heartbeat", |
657 | 8.58k | &Heartbeater::Thread::RunThread, this, &thread_); |
658 | 8.58k | } |
659 | | |
660 | 288 | Status Heartbeater::Thread::Stop() { |
661 | 288 | if (!thread_) { |
662 | 183 | return Status::OK(); |
663 | 183 | } |
664 | | |
665 | 105 | { |
666 | 105 | MutexLock l(mutex_); |
667 | 105 | should_run_ = false; |
668 | 105 | cond_.Signal(); |
669 | 105 | } |
670 | 105 | RETURN_NOT_OK(ThreadJoiner(thread_.get()).Join()); |
671 | 105 | thread_ = nullptr; |
672 | 105 | return Status::OK(); |
673 | 105 | } |
674 | | |
675 | 592k | void Heartbeater::Thread::TriggerASAP() { |
676 | 592k | MutexLock l(mutex_); |
677 | 592k | heartbeat_asap_ = true; |
678 | 592k | cond_.Signal(); |
679 | 592k | } |
680 | | |
681 | | |
682 | 0 | const std::string& HeartbeatDataProvider::LogPrefix() const { |
683 | 0 | return server_.LogPrefix(); |
684 | 0 | } |
685 | | |
686 | | void PeriodicalHeartbeatDataProvider::AddData( |
687 | 5.25M | const master::TSHeartbeatResponsePB& last_resp, master::TSHeartbeatRequestPB* req) { |
688 | 5.25M | if (prev_run_time_ + period_ < CoarseMonoClock::Now()) { |
689 | 1.07M | DoAddData(last_resp, req); |
690 | 1.07M | prev_run_time_ = CoarseMonoClock::Now(); |
691 | 1.07M | } |
692 | 5.25M | } |
693 | | |
694 | | } // namespace tserver |
695 | | } // namespace yb |