YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/heartbeater.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/heartbeater.h"
34
35
#include <cstdint>
36
#include <iosfwd>
37
#include <map>
38
#include <memory>
39
#include <mutex>
40
#include <ostream>
41
#include <string>
42
#include <vector>
43
44
#include <boost/function.hpp>
45
#include <glog/logging.h>
46
47
#include "yb/common/common_flags.h"
48
#include "yb/common/hybrid_time.h"
49
#include "yb/common/wire_protocol.h"
50
51
#include "yb/consensus/log_fwd.h"
52
53
#include "yb/docdb/docdb.pb.h"
54
55
#include "yb/gutil/atomicops.h"
56
#include "yb/gutil/bind.h"
57
#include "yb/gutil/macros.h"
58
#include "yb/gutil/ref_counted.h"
59
#include "yb/gutil/strings/substitute.h"
60
#include "yb/gutil/thread_annotations.h"
61
62
#include "yb/master/master_heartbeat.proxy.h"
63
#include "yb/master/master_rpc.h"
64
#include "yb/master/master_types.pb.h"
65
66
#include "yb/rocksdb/cache.h"
67
#include "yb/rocksdb/options.h"
68
#include "yb/rocksdb/statistics.h"
69
70
#include "yb/rpc/rpc_fwd.h"
71
72
#include "yb/server/hybrid_clock.h"
73
#include "yb/server/server_base.proxy.h"
74
75
#include "yb/tablet/tablet_options.h"
76
77
#include "yb/tserver/tablet_server.h"
78
#include "yb/tserver/ts_tablet_manager.h"
79
80
#include "yb/util/async_util.h"
81
#include "yb/util/capabilities.h"
82
#include "yb/util/countdown_latch.h"
83
#include "yb/util/enums.h"
84
#include "yb/util/flag_tags.h"
85
#include "yb/util/locks.h"
86
#include "yb/util/logging.h"
87
#include "yb/util/monotime.h"
88
#include "yb/util/net/net_util.h"
89
#include "yb/util/slice.h"
90
#include "yb/util/status.h"
91
#include "yb/util/status_format.h"
92
#include "yb/util/status_log.h"
93
#include "yb/util/strongly_typed_bool.h"
94
#include "yb/util/thread.h"
95
#include "yb/util/threadpool.h"
96
97
using namespace std::literals;
98
99
DEFINE_int32(heartbeat_rpc_timeout_ms, 15000,
100
             "Timeout used for the TS->Master heartbeat RPCs.");
101
TAG_FLAG(heartbeat_rpc_timeout_ms, advanced);
102
TAG_FLAG(heartbeat_rpc_timeout_ms, runtime);
103
104
DEFINE_int32(heartbeat_interval_ms, 1000,
105
             "Interval at which the TS heartbeats to the master.");
106
TAG_FLAG(heartbeat_interval_ms, advanced);
107
TAG_FLAG(heartbeat_interval_ms, runtime);
108
109
DEFINE_int32(heartbeat_max_failures_before_backoff, 3,
110
             "Maximum number of consecutive heartbeat failures until the "
111
             "Tablet Server backs off to the normal heartbeat interval, "
112
             "rather than retrying.");
113
TAG_FLAG(heartbeat_max_failures_before_backoff, advanced);
114
115
DEFINE_test_flag(bool, tserver_disable_heartbeat, false, "Should heartbeat be disabled");
116
TAG_FLAG(TEST_tserver_disable_heartbeat, runtime);
117
118
DEFINE_CAPABILITY(TabletReportLimit, 0xb1a2a020);
119
120
using google::protobuf::RepeatedPtrField;
121
using yb::HostPortPB;
122
using yb::consensus::RaftPeerPB;
123
using yb::master::GetLeaderMasterRpc;
124
using yb::rpc::RpcController;
125
using std::shared_ptr;
126
using std::vector;
127
using strings::Substitute;
128
129
namespace yb {
130
namespace tserver {
131
132
// Most of the actual logic of the heartbeater is inside this inner class,
133
// to avoid having too many dependencies from the header itself.
134
//
135
// This is basically the "PIMPL" pattern.
136
class Heartbeater::Thread {
137
 public:
138
  Thread(
139
      const TabletServerOptions& opts, TabletServer* server,
140
      std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers);
141
  Thread(const Thread& other) = delete;
142
  void operator=(const Thread& other) = delete;
143
144
  Status Start();
145
  Status Stop();
146
  void TriggerASAP();
147
148
224
  void set_master_addresses(server::MasterAddressesPtr master_addresses) {
149
224
    std::lock_guard<std::mutex> l(master_addresses_mtx_);
150
224
    master_addresses_ = std::move(master_addresses);
151
224
    
VLOG_WITH_PREFIX0
(1) << "Setting master addresses to " << yb::ToString(master_addresses_)0
;
152
224
  }
153
154
 private:
155
  void RunThread();
156
  Status FindLeaderMaster(CoarseTimePoint deadline, HostPort* leader_hostport);
157
  Status ConnectToMaster();
158
  int GetMinimumHeartbeatMillis() const;
159
  int GetMillisUntilNextHeartbeat() const;
160
  CHECKED_STATUS DoHeartbeat();
161
  CHECKED_STATUS TryHeartbeat();
162
  CHECKED_STATUS SetupRegistration(master::TSRegistrationPB* reg);
163
  void SetupCommonField(master::TSToMasterCommonPB* common);
164
  bool IsCurrentThread() const;
165
166
873k
  const std::string& LogPrefix() const {
167
873k
    return server_->LogPrefix();
168
873k
  }
169
170
856k
  server::MasterAddressesPtr get_master_addresses() {
171
856k
    std::lock_guard<std::mutex> l(master_addresses_mtx_);
172
856k
    CHECK_NOTNULL(master_addresses_.get());
173
856k
    return master_addresses_;
174
856k
  }
175
176
  // Protecting master_addresses_.
177
  std::mutex master_addresses_mtx_;
178
179
  // The hosts/ports of masters that we may heartbeat to.
180
  //
181
  // We keep the HostPort around rather than a Sockaddr because the
182
  // masters may change IP addresses, and we'd like to re-resolve on
183
  // every new attempt at connecting.
184
  server::MasterAddressesPtr master_addresses_;
185
186
  // The server for which we are heartbeating.
187
  TabletServer* const server_;
188
189
  // Roundtrip time of previous heartbeat to yb-master.
190
  MonoDelta heartbeat_rtt_ = MonoDelta::kZero;
191
192
  // The actual running thread (NULL before it is started)
193
  scoped_refptr<yb::Thread> thread_;
194
195
  // Host and port of the most recent leader master.
196
  HostPort leader_master_hostport_;
197
198
  // Current RPC proxy to the leader master.
199
  std::unique_ptr<master::MasterHeartbeatProxy> proxy_;
200
201
  // The most recent response from a heartbeat.
202
  master::TSHeartbeatResponsePB last_hb_response_;
203
204
  // Full reports can take multiple heartbeats.
205
  // Flag to indicate if next heartbeat is part of a full report.
206
  bool sending_full_report_ = false;
207
208
  // The number of heartbeats which have failed in a row.
209
  // This is tracked so as to back-off heartbeating.
210
  int consecutive_failed_heartbeats_ = 0;
211
212
  // Mutex/condition pair to trigger the heartbeater thread
213
  // to either heartbeat early or exit.
214
  Mutex mutex_;
215
  ConditionVariable cond_;
216
217
  // Protected by mutex_.
218
  bool should_run_ = false;
219
  bool heartbeat_asap_ = false;
220
221
  rpc::Rpcs rpcs_;
222
223
  std::vector<std::unique_ptr<HeartbeatDataProvider>> data_providers_;
224
};
225
226
////////////////////////////////////////////////////////////
227
// Heartbeater
228
////////////////////////////////////////////////////////////
229
230
// Builds the implementation object; no thread is started until Start() is called.
Heartbeater::Heartbeater(
    const TabletServerOptions& opts,
    TabletServer* server,
    std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers)
    : thread_(new Thread(opts, server, std::move(data_providers))) {}
235
236
89
// Stops the heartbeater on destruction; a failure to stop is only logged as a warning.
Heartbeater::~Heartbeater() {
  WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread");
}
239
240
8.58k
// Forwards to the implementation object.
Status Heartbeater::Start() {
  return thread_->Start();
}
241
288
// Forwards to the implementation object.
Status Heartbeater::Stop() {
  return thread_->Stop();
}
242
592k
// Forwards to the implementation object.
void Heartbeater::TriggerASAP() {
  thread_->TriggerASAP();
}
243
244
224
// Hands the new master address list over to the implementation object.
void Heartbeater::set_master_addresses(server::MasterAddressesPtr master_addresses) {
  thread_->set_master_addresses(std::move(master_addresses));
}
247
248
////////////////////////////////////////////////////////////
249
// Heartbeater::Thread
250
////////////////////////////////////////////////////////////
251
252
// Captures the master addresses from the options, the owning server and the data providers.
// The address list must be non-null and non-empty; both conditions are CHECKed.
Heartbeater::Thread::Thread(
    const TabletServerOptions& opts,
    TabletServer* server,
    std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers)
    : master_addresses_(opts.GetMasterAddresses()),
      server_(server),
      cond_(&mutex_),
      data_providers_(std::move(data_providers)) {
  CHECK_NOTNULL(master_addresses_.get());
  CHECK(!master_addresses_->empty());
  VLOG_WITH_PREFIX(1) << "Initializing heartbeater thread with master addresses: "
                      << yb::ToString(master_addresses_);
}
264
265
namespace {
266
267
struct FindLeaderMasterData {
268
  HostPort result;
269
  Synchronizer sync;
270
  std::shared_ptr<GetLeaderMasterRpc> rpc;
271
};
272
273
void LeaderMasterCallback(const std::shared_ptr<FindLeaderMasterData>& data,
274
                          const Status& status,
275
417k
                          const HostPort& result) {
276
417k
  if (status.ok()) {
277
405k
    data->result = result;
278
405k
  }
279
417k
  data->sync.StatusCB(status);
280
417k
}
281
282
} // anonymous namespace
283
284
420k
// Determines the host/port of the leader master.
//
// deadline        - passed to the lookup RPC; the local wait lasts one second longer.
// leader_hostport - out parameter, written only when the lookup succeeds.
//
// Returns OK on success, NotFound when the master address list is empty, or the status
// reported by the lookup otherwise.
Status Heartbeater::Thread::FindLeaderMaster(CoarseTimePoint deadline, HostPort* leader_hostport) {
  const auto master_addresses = get_master_addresses();
  if (master_addresses->size() == 1 && (*master_addresses)[0].size() == 1) {
    // "Shortcut" the process when a single master is specified.
    *leader_hostport = (*master_addresses)[0][0];
    return Status::OK();
  }
  auto master_sock_addrs = *master_addresses;
  if (master_sock_addrs.empty()) {
    return STATUS(NotFound, "Unable to resolve any of the master addresses!");
  }
  auto data = std::make_shared<FindLeaderMasterData>();
  data->rpc = std::make_shared<GetLeaderMasterRpc>(
      Bind(&LeaderMasterCallback, data),
      master_sock_addrs,
      deadline,
      server_->messenger(),
      &server_->proxy_cache(),
      &rpcs_,
      true /* should_timeout_to_follower_ */);
  data->rpc->SendRpc();
  // Wait slightly past the RPC deadline for the callback to report a status.
  auto status = data->sync.WaitFor(deadline - CoarseMonoClock::Now() + 1s);
  if (status.ok()) {
    *leader_hostport = data->result;
  }
  // Request abort of whatever is still registered in rpcs_, whether or not the lookup succeeded.
  rpcs_.RequestAbortAll();
  return status;
}
313
314
420k
// Locates the leader master, pings it, and on success installs a fresh heartbeat proxy
// in proxy_. Returns the first error hit along the way.
Status Heartbeater::Thread::ConnectToMaster() {
  const auto deadline = CoarseMonoClock::Now() + FLAGS_heartbeat_rpc_timeout_ms * 1ms;
  // TODO send heartbeats without tablet reports to non-leader masters.
  Status find_status = FindLeaderMaster(deadline, &leader_master_hostport_);
  if (!find_status.ok()) {
    LOG_WITH_PREFIX(INFO) << "Find leader master " <<  leader_master_hostport_.ToString()
                          << " hit error " << find_status;
    return find_status;
  }

  // Reset report state if we have master failover.
  sending_full_report_ = false;

  // Pings are common for both Master and Tserver.
  auto ping_proxy = std::make_unique<server::GenericServiceProxy>(
      &server_->proxy_cache(), leader_master_hostport_);

  // Ping the master to verify that it's alive.
  server::PingRequestPB ping_req;
  server::PingResponsePB ping_resp;
  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));
  RETURN_NOT_OK_PREPEND(ping_proxy->Ping(ping_req, &ping_resp, &controller),
                        Format("Failed to ping master at $0", leader_master_hostport_));
  LOG_WITH_PREFIX(INFO) << "Connected to a leader master server at " << leader_master_hostport_;

  // The master answered: keep a heartbeat proxy to it in the instance.
  proxy_ = std::make_unique<master::MasterHeartbeatProxy>(
      &server_->proxy_cache(), leader_master_hostport_);
  return Status::OK();
}
345
346
5.25M
// Copies this server's instance descriptor into the common part of a request.
void Heartbeater::Thread::SetupCommonField(master::TSToMasterCommonPB* common) {
  auto* ts_instance = common->mutable_ts_instance();
  ts_instance->CopyFrom(server_->instance_pb());
}
349
350
9.44k
// Clears `reg` and fills its common section from the server's registration.
// Returns whatever status the server reports.
Status Heartbeater::Thread::SetupRegistration(master::TSRegistrationPB* reg) {
  reg->Clear();
  auto* common_registration = reg->mutable_common();
  RETURN_NOT_OK(server_->GetRegistration(common_registration));
  return Status::OK();
}
356
357
355k
int Heartbeater::Thread::GetMinimumHeartbeatMillis() const {
358
  // If we've failed a few heartbeats in a row, back off to the normal
359
  // interval, rather than retrying in a loop.
360
355k
  if (consecutive_failed_heartbeats_ == FLAGS_heartbeat_max_failures_before_backoff) {
361
304
    LOG_WITH_PREFIX(WARNING) << "Failed " << consecutive_failed_heartbeats_  <<" heartbeats "
362
304
                             << "in a row: no longer allowing fast heartbeat attempts.";
363
304
  }
364
365
355k
  return consecutive_failed_heartbeats_ > FLAGS_heartbeat_max_failures_before_backoff ?
366
354k
    
FLAGS_heartbeat_interval_ms718
: 0;
367
355k
}
368
369
5.61M
int Heartbeater::Thread::GetMillisUntilNextHeartbeat() const {
370
  // If the master needs something from us, we should immediately
371
  // send another heartbeat with that info, rather than waiting for the interval.
372
5.61M
  if (sending_full_report_ ||
373
5.61M
      
last_hb_response_.needs_reregister()5.61M
||
374
5.61M
      
last_hb_response_.needs_full_tablet_report()5.25M
) {
375
355k
    return GetMinimumHeartbeatMillis();
376
355k
  }
377
378
5.25M
  return FLAGS_heartbeat_interval_ms;
379
5.61M
}
380
381
382
5.25M
// Builds one heartbeat request, sends it through proxy_ and applies the response.
//
// The request carries the server instance, a registration (when the last response asked for
// re-registration), a full or incremental tablet report, data from the registered data
// providers, and clock readings. On a successful response from the leader the response
// becomes last_hb_response_ and its contents are applied to the server.
//
// Returns:
//   - TryAgain when the master asked for a full tablet report (DoHeartbeat() retries at once);
//   - ServiceUnavailable when the contacted master is not the leader (proxy_ is reset);
//   - the status of any failing step otherwise, or the result of PopulateLiveTServers().
Status Heartbeater::Thread::TryHeartbeat() {
  master::TSHeartbeatRequestPB req;

  SetupCommonField(req.mutable_common());
  if (last_hb_response_.needs_reregister()) {
    LOG_WITH_PREFIX(INFO) << "Registering TS with master...";
    RETURN_NOT_OK_PREPEND(SetupRegistration(req.mutable_registration()),
                          "Unable to set up registration");
    auto capabilities = Capabilities();
    *req.mutable_registration()->mutable_capabilities() =
        google::protobuf::RepeatedField<CapabilityId>(capabilities.begin(), capabilities.end());
  }

  if (last_hb_response_.needs_full_tablet_report()) {
    LOG_WITH_PREFIX(INFO) << "Sending a full tablet report to master...";
    server_->tablet_manager()->StartFullTabletReport(req.mutable_tablet_report());
    sending_full_report_ = true;
  } else {
    if (sending_full_report_) {
      LOG_WITH_PREFIX(INFO) << "Continuing full tablet report to master...";
    } else {
      VLOG_WITH_PREFIX(2) << "Sending an incremental tablet report to master...";
    }
    server_->tablet_manager()->GenerateTabletReport(req.mutable_tablet_report(),
                                                    !sending_full_report_ /* include_bootstrap */);
  }
  req.mutable_tablet_report()->set_is_incremental(!sending_full_report_);
  req.set_num_live_tablets(server_->tablet_manager()->GetNumLiveTablets());
  req.set_leader_count(server_->tablet_manager()->GetLeaderCount());

  for (auto& data_provider : data_providers_) {
    data_provider->AddData(last_hb_response_, &req);
  }

  RpcController rpc;
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));

  req.set_config_index(server_->GetCurrentMasterIndex());
  req.set_cluster_config_version(server_->cluster_config_version());
  req.set_rtt_us(heartbeat_rtt_.ToMicroseconds());

  // Include the hybrid time of this tablet server in the heartbeat.
  auto* hybrid_clock = dynamic_cast<server::HybridClock*>(server_->Clock());
  if (hybrid_clock) {
    req.set_ts_hybrid_time(hybrid_clock->Now().ToUint64());
    // Also include the physical clock time of this tablet server in the heartbeat.
    Result<PhysicalTime> now = hybrid_clock->physical_clock()->Now();
    if (!now.ok()) {
      YB_LOG_EVERY_N_SECS(WARNING, 10) << "Failed to read clock: " << now.status();
      req.set_ts_physical_time(0);
    } else {
      req.set_ts_physical_time(now->time_point);
    }
  } else {
    // The server clock is not a HybridClock: report zero for both times.
    req.set_ts_hybrid_time(0);
    req.set_ts_physical_time(0);
  }

  {
    VLOG_WITH_PREFIX(2) << "Sending heartbeat:\n" << req.DebugString();
    // The RTT stays zero unless this heartbeat completes successfully.
    heartbeat_rtt_ = MonoDelta::kZero;
    MonoTime start_time = MonoTime::Now();
    master::TSHeartbeatResponsePB resp;
    RETURN_NOT_OK_PREPEND(proxy_->TSHeartbeat(req, &resp, &rpc),
                          "Failed to send heartbeat");
    MonoTime end_time = MonoTime::Now();
    if (resp.has_error()) {
      if (resp.error().code() != master::MasterErrorPB::NOT_THE_LEADER) {
        return StatusFromPB(resp.error().status());
      } else {
        DCHECK(!resp.leader_master());
        // Treat a not-the-leader error code as leader_master=false.
        if (resp.leader_master()) {
          LOG_WITH_PREFIX(WARNING) << "Setting leader master to false for "
                                   << resp.error().code() << " code.";
          resp.set_leader_master(false);
        }
      }
    }

    VLOG_WITH_PREFIX(2) << "Received heartbeat response:\n" << resp.DebugString();
    if (resp.has_master_config()) {
      LOG_WITH_PREFIX(INFO) << "Received heartbeat response with config " << resp.DebugString();

      RETURN_NOT_OK(server_->UpdateMasterAddresses(resp.master_config(), resp.leader_master()));
    }

    if (!resp.leader_master()) {
      // If the master is no longer a leader, reset proxy so that we can
      // determine the master and attempt to heartbeat during the
      // next heartbeat interval.
      proxy_.reset();
      return STATUS_FORMAT(ServiceUnavailable, "Master is no longer the leader: $0", resp.error());
    }

    // Check for a universe key registry for encryption.
    if (resp.has_universe_key_registry()) {
      RETURN_NOT_OK(server_->SetUniverseKeyRegistry(resp.universe_key_registry()));
    }

    // Check for CDC Universe Replication.
    if (resp.has_consumer_registry()) {
      int32_t cluster_config_version = -1;
      if (!resp.has_cluster_config_version()) {
        YB_LOG_EVERY_N_SECS(INFO, 30)
            << "Invalid heartbeat response without a cluster config version";
      } else {
        cluster_config_version = resp.cluster_config_version();
      }
      RETURN_NOT_OK(static_cast<enterprise::TabletServer*>(server_)->
          SetConfigVersionAndConsumerRegistry(cluster_config_version, &resp.consumer_registry()));
    } else if (resp.has_cluster_config_version()) {
      RETURN_NOT_OK(static_cast<enterprise::TabletServer*>(server_)->
          SetConfigVersionAndConsumerRegistry(resp.cluster_config_version(), nullptr));
    }

    // At this point we know resp is a successful heartbeat response from the master so set it as
    // the last heartbeat response. This invalidates resp so we should use last_hb_response_ instead
    // below (hence using the nested scope for resp until here).
    last_hb_response_.Swap(&resp);
    heartbeat_rtt_ = end_time.GetDeltaSince(start_time);
  }

  if (last_hb_response_.has_cluster_uuid() && !last_hb_response_.cluster_uuid().empty()) {
    server_->set_cluster_uuid(last_hb_response_.cluster_uuid());
  }

  // The Master responds with the max entries for a single Tablet Report to avoid overwhelming it.
  if (last_hb_response_.has_tablet_report_limit()) {
    server_->tablet_manager()->SetReportLimit(last_hb_response_.tablet_report_limit());
  }

  // The master wants a full report: tell the caller to heartbeat again right away.
  if (last_hb_response_.needs_full_tablet_report()) {
    return STATUS(TryAgain, "");
  }

  // Handle TSHeartbeatResponsePB (e.g. tablets ack'd by master as processed)
  bool all_processed = req.tablet_report().remaining_tablet_count() == 0 &&
                       !last_hb_response_.tablet_report().processing_truncated();
  server_->tablet_manager()->MarkTabletReportAcknowledged(
      req.tablet_report().sequence_number(), last_hb_response_.tablet_report(), all_processed);

  // Trigger another heartbeat ASAP if we didn't process all tablets on this request.
  sending_full_report_ = sending_full_report_ && !all_processed;

  // Update the master's YSQL catalog version (i.e. if there were schema changes for YSQL objects).
  if (last_hb_response_.has_ysql_catalog_version()) {
    if (FLAGS_log_ysql_catalog_versions) {
      // Format() placeholders are zero-based: "$0" is the single argument passed here.
      VLOG_WITH_FUNC(1) << "got master catalog version: "
                        << last_hb_response_.ysql_catalog_version()
                        << ", breaking version: "
                        << (last_hb_response_.has_ysql_last_breaking_catalog_version()
                            ? Format("$0", last_hb_response_.ysql_last_breaking_catalog_version())
                            : "(none)");
    }
    if (last_hb_response_.has_ysql_last_breaking_catalog_version()) {
      server_->SetYSQLCatalogVersion(last_hb_response_.ysql_catalog_version(),
                                     last_hb_response_.ysql_last_breaking_catalog_version());
    } else {
      /* Assuming all changes are breaking if last breaking version not explicitly set. */
      server_->SetYSQLCatalogVersion(last_hb_response_.ysql_catalog_version(),
                                     last_hb_response_.ysql_catalog_version());
    }
  }

  RETURN_NOT_OK(server_->tablet_manager()->UpdateSnapshotsInfo(last_hb_response_.snapshots_info()));

  if (last_hb_response_.has_transaction_tables_version()) {
    server_->UpdateTransactionTablesVersion(last_hb_response_.transaction_tables_version());
  }

  // Update the live tserver list.
  return server_->PopulateLiveTServers(last_hb_response_);
}
556
557
5.60M
// Performs one heartbeat round on the heartbeater thread.
//
// Connects to the leader master first when there is no proxy, then calls TryHeartbeat()
// repeatedly for as long as it reports TryAgain. Returns the first status that is not
// TryAgain. Two test hooks short-circuit the round: fail_heartbeats_for_tests() yields an
// IOError, and FLAGS_TEST_tserver_disable_heartbeat yields OK without contacting the master.
Status Heartbeater::Thread::DoHeartbeat() {
  if (PREDICT_FALSE(server_->fail_heartbeats_for_tests())) {
    return STATUS(IOError, "failing all heartbeats for tests");
  }

  if (PREDICT_FALSE(FLAGS_TEST_tserver_disable_heartbeat)) {
    YB_LOG_EVERY_N_SECS(INFO, 1) << "Heartbeat disabled for testing.";
    return Status::OK();
  }

  CHECK(IsCurrentThread());

  if (!proxy_) {
    VLOG_WITH_PREFIX(1) << "No valid master proxy. Connecting...";
    RETURN_NOT_OK(ConnectToMaster());
    DCHECK(proxy_);
  }

  // The loop only exits by returning, so no statement is needed after it.
  for (;;) {
    auto status = TryHeartbeat();
    if (!status.IsTryAgain()) {
      return status;
    }
  }
}
585
586
8.58k
void Heartbeater::Thread::RunThread() {
587
8.58k
  CHECK(IsCurrentThread());
588
8.58k
  
VLOG_WITH_PREFIX0
(1) << "Heartbeat thread starting"0
;
589
590
  // Config the "last heartbeat response" to indicate that we need to register
591
  // -- since we've never registered before, we know this to be true.
592
8.58k
  last_hb_response_.set_needs_reregister(true);
593
  // Have the Master request a full tablet report on 2nd HB, once it knows our capabilities.
594
8.58k
  last_hb_response_.set_needs_full_tablet_report(false);
595
596
5.62M
  while (true) {
597
5.61M
    MonoTime next_heartbeat = MonoTime::Now();
598
5.61M
    next_heartbeat.AddDelta(MonoDelta::FromMilliseconds(GetMillisUntilNextHeartbeat()));
599
600
    // Wait for either the heartbeat interval to elapse, or for an "ASAP" heartbeat,
601
    // or for the signal to shut down.
602
5.61M
    {
603
5.61M
      MutexLock l(mutex_);
604
10.7M
      while (true) {
605
10.7M
        MonoDelta remaining = next_heartbeat.GetDeltaSince(MonoTime::Now());
606
10.7M
        if (remaining.ToMilliseconds() <= 0 ||
607
10.7M
            
heartbeat_asap_5.41M
||
608
10.7M
            
!should_run_5.11M
) {
609
5.60M
          break;
610
5.60M
        }
611
5.11M
        cond_.TimedWait(remaining);
612
5.11M
      }
613
614
5.61M
      heartbeat_asap_ = false;
615
616
5.61M
      if (!should_run_) {
617
102
        
VLOG_WITH_PREFIX0
(1) << "Heartbeat thread finished"0
;
618
102
        return;
619
102
      }
620
5.61M
    }
621
622
5.61M
    Status s = DoHeartbeat();
623
5.61M
    if (!s.ok()) {
624
436k
      const auto master_addresses = get_master_addresses();
625
436k
      LOG_WITH_PREFIX(WARNING)
626
436k
          << "Failed to heartbeat to " << leader_master_hostport_.ToString()
627
436k
          << ": " << s << " tries=" << consecutive_failed_heartbeats_
628
436k
          << ", num=" << master_addresses->size()
629
436k
          << ", masters=" << yb::ToString(master_addresses)
630
436k
          << ", code=" << s.CodeAsString();
631
436k
      consecutive_failed_heartbeats_++;
632
      // If there's multiple masters...
633
436k
      if (master_addresses->size() > 1 || 
(*master_addresses)[0].size() > 125.6k
) {
634
        // If we encountered a network error (e.g., connection refused) or reached our failure
635
        // threshold.  Try determining the leader master again.  Heartbeats function as a watchdog,
636
        // so timeouts should be considered normal failures.
637
411k
        if (s.IsNetworkError() ||
638
411k
            
consecutive_failed_heartbeats_ == FLAGS_heartbeat_max_failures_before_backoff411k
) {
639
1.00k
          proxy_.reset();
640
1.00k
        }
641
411k
      }
642
436k
      continue;
643
436k
    }
644
5.17M
    consecutive_failed_heartbeats_ = 0;
645
5.17M
  }
646
8.58k
}
647
648
5.26M
bool Heartbeater::Thread::IsCurrentThread() const {
649
5.26M
  return thread_.get() == yb::Thread::current_thread();
650
5.26M
}
651
652
8.58k
// Creates the heartbeater thread, which runs RunThread(). Must not be called while a
// thread already exists (CHECKed). Returns the status of the thread creation.
Status Heartbeater::Thread::Start() {
  CHECK(thread_ == nullptr);

  // Set before the thread is created, so RunThread() sees it from its first iteration.
  should_run_ = true;
  return yb::Thread::Create(
      "heartbeater", "heartbeat", &Heartbeater::Thread::RunThread, this, &thread_);
}
659
660
288
// Asks the heartbeater thread to exit and joins it. Returns OK right away when no thread
// exists; otherwise returns the join error, or OK after the thread handle has been cleared.
Status Heartbeater::Thread::Stop() {
  if (thread_) {
    {
      // Clear the run flag under the mutex and wake the thread out of its timed wait.
      MutexLock lock(mutex_);
      should_run_ = false;
      cond_.Signal();
    }
    RETURN_NOT_OK(ThreadJoiner(thread_.get()).Join());
    thread_ = nullptr;
  }
  return Status::OK();
}
674
675
592k
void Heartbeater::Thread::TriggerASAP() {
676
592k
  MutexLock l(mutex_);
677
592k
  heartbeat_asap_ = true;
678
592k
  cond_.Signal();
679
592k
}
680
681
682
0
const std::string& HeartbeatDataProvider::LogPrefix() const {
683
0
  return server_.LogPrefix();
684
0
}
685
686
void PeriodicalHeartbeatDataProvider::AddData(
687
5.25M
    const master::TSHeartbeatResponsePB& last_resp, master::TSHeartbeatRequestPB* req) {
688
5.25M
  if (prev_run_time_ + period_ < CoarseMonoClock::Now()) {
689
1.07M
    DoAddData(last_resp, req);
690
1.07M
    prev_run_time_ = CoarseMonoClock::Now();
691
1.07M
  }
692
5.25M
}
693
694
} // namespace tserver
695
} // namespace yb