YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tserver/heartbeater.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/heartbeater.h"
34
35
#include <cstdint>
36
#include <iosfwd>
37
#include <map>
38
#include <memory>
39
#include <mutex>
40
#include <ostream>
41
#include <string>
42
#include <vector>
43
44
#include <boost/function.hpp>
45
#include <glog/logging.h>
46
47
#include "yb/common/common_flags.h"
48
#include "yb/common/hybrid_time.h"
49
#include "yb/common/wire_protocol.h"
50
51
#include "yb/consensus/log_fwd.h"
52
53
#include "yb/docdb/docdb.pb.h"
54
55
#include "yb/gutil/atomicops.h"
56
#include "yb/gutil/bind.h"
57
#include "yb/gutil/macros.h"
58
#include "yb/gutil/ref_counted.h"
59
#include "yb/gutil/strings/substitute.h"
60
#include "yb/gutil/thread_annotations.h"
61
62
#include "yb/master/master_heartbeat.proxy.h"
63
#include "yb/master/master_rpc.h"
64
#include "yb/master/master_types.pb.h"
65
66
#include "yb/rocksdb/cache.h"
67
#include "yb/rocksdb/options.h"
68
#include "yb/rocksdb/statistics.h"
69
70
#include "yb/rpc/rpc_fwd.h"
71
72
#include "yb/server/hybrid_clock.h"
73
#include "yb/server/server_base.proxy.h"
74
75
#include "yb/tablet/tablet_options.h"
76
77
#include "yb/tserver/tablet_server.h"
78
#include "yb/tserver/ts_tablet_manager.h"
79
80
#include "yb/util/async_util.h"
81
#include "yb/util/capabilities.h"
82
#include "yb/util/countdown_latch.h"
83
#include "yb/util/enums.h"
84
#include "yb/util/flag_tags.h"
85
#include "yb/util/locks.h"
86
#include "yb/util/logging.h"
87
#include "yb/util/monotime.h"
88
#include "yb/util/net/net_util.h"
89
#include "yb/util/slice.h"
90
#include "yb/util/status.h"
91
#include "yb/util/status_format.h"
92
#include "yb/util/status_log.h"
93
#include "yb/util/strongly_typed_bool.h"
94
#include "yb/util/thread.h"
95
#include "yb/util/threadpool.h"
96
97
using namespace std::literals;
98
99
DEFINE_int32(heartbeat_rpc_timeout_ms, 15000,
100
             "Timeout used for the TS->Master heartbeat RPCs.");
101
TAG_FLAG(heartbeat_rpc_timeout_ms, advanced);
102
TAG_FLAG(heartbeat_rpc_timeout_ms, runtime);
103
104
DEFINE_int32(heartbeat_interval_ms, 1000,
105
             "Interval at which the TS heartbeats to the master.");
106
TAG_FLAG(heartbeat_interval_ms, advanced);
107
TAG_FLAG(heartbeat_interval_ms, runtime);
108
109
DEFINE_int32(heartbeat_max_failures_before_backoff, 3,
110
             "Maximum number of consecutive heartbeat failures until the "
111
             "Tablet Server backs off to the normal heartbeat interval, "
112
             "rather than retrying.");
113
TAG_FLAG(heartbeat_max_failures_before_backoff, advanced);
114
115
DEFINE_test_flag(bool, tserver_disable_heartbeat, false, "Should heartbeat be disabled");
116
TAG_FLAG(TEST_tserver_disable_heartbeat, runtime);
117
118
DEFINE_CAPABILITY(TabletReportLimit, 0xb1a2a020);
119
120
using google::protobuf::RepeatedPtrField;
121
using yb::HostPortPB;
122
using yb::consensus::RaftPeerPB;
123
using yb::master::GetLeaderMasterRpc;
124
using yb::rpc::RpcController;
125
using std::shared_ptr;
126
using std::vector;
127
using strings::Substitute;
128
129
namespace yb {
130
namespace tserver {
131
132
// Most of the actual logic of the heartbeater is inside this inner class,
133
// to avoid having too many dependencies from the header itself.
134
//
135
// This is basically the "PIMPL" pattern.
136
class Heartbeater::Thread {
137
 public:
138
  Thread(
139
      const TabletServerOptions& opts, TabletServer* server,
140
      std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers);
141
  Thread(const Thread& other) = delete;
142
  void operator=(const Thread& other) = delete;
143
144
  Status Start();
145
  Status Stop();
146
  void TriggerASAP();
147
148
135
  void set_master_addresses(server::MasterAddressesPtr master_addresses) {
149
135
    std::lock_guard<std::mutex> l(master_addresses_mtx_);
150
135
    master_addresses_ = std::move(master_addresses);
151
0
    VLOG_WITH_PREFIX(1) << "Setting master addresses to " << yb::ToString(master_addresses_);
152
135
  }
153
154
 private:
155
  void RunThread();
156
  Status FindLeaderMaster(CoarseTimePoint deadline, HostPort* leader_hostport);
157
  Status ConnectToMaster();
158
  int GetMinimumHeartbeatMillis() const;
159
  int GetMillisUntilNextHeartbeat() const;
160
  CHECKED_STATUS DoHeartbeat();
161
  CHECKED_STATUS TryHeartbeat();
162
  CHECKED_STATUS SetupRegistration(master::TSRegistrationPB* reg);
163
  void SetupCommonField(master::TSToMasterCommonPB* common);
164
  bool IsCurrentThread() const;
165
166
20.3k
  const std::string& LogPrefix() const {
167
20.3k
    return server_->LogPrefix();
168
20.3k
  }
169
170
8.85k
  server::MasterAddressesPtr get_master_addresses() {
171
8.85k
    std::lock_guard<std::mutex> l(master_addresses_mtx_);
172
8.85k
    CHECK_NOTNULL(master_addresses_.get());
173
8.85k
    return master_addresses_;
174
8.85k
  }
175
176
  // Protecting master_addresses_.
177
  std::mutex master_addresses_mtx_;
178
179
  // The hosts/ports of masters that we may heartbeat to.
180
  //
181
  // We keep the HostPort around rather than a Sockaddr because the
182
  // masters may change IP addresses, and we'd like to re-resolve on
183
  // every new attempt at connecting.
184
  server::MasterAddressesPtr master_addresses_;
185
186
  // The server for which we are heartbeating.
187
  TabletServer* const server_;
188
189
  // Roundtrip time of previous heartbeat to yb-master.
190
  MonoDelta heartbeat_rtt_ = MonoDelta::kZero;
191
192
  // The actual running thread (NULL before it is started)
193
  scoped_refptr<yb::Thread> thread_;
194
195
  // Host and port of the most recent leader master.
196
  HostPort leader_master_hostport_;
197
198
  // Current RPC proxy to the leader master.
199
  std::unique_ptr<master::MasterHeartbeatProxy> proxy_;
200
201
  // The most recent response from a heartbeat.
202
  master::TSHeartbeatResponsePB last_hb_response_;
203
204
  // Full reports can take multiple heartbeats.
205
  // Flag to indicate if next heartbeat is part of a full report.
206
  bool sending_full_report_ = false;
207
208
  // The number of heartbeats which have failed in a row.
209
  // This is tracked so as to back-off heartbeating.
210
  int consecutive_failed_heartbeats_ = 0;
211
212
  // Mutex/condition pair to trigger the heartbeater thread
213
  // to either heartbeat early or exit.
214
  Mutex mutex_;
215
  ConditionVariable cond_;
216
217
  // Protected by mutex_.
218
  bool should_run_ = false;
219
  bool heartbeat_asap_ = false;
220
221
  rpc::Rpcs rpcs_;
222
223
  std::vector<std::unique_ptr<HeartbeatDataProvider>> data_providers_;
224
};
225
226
////////////////////////////////////////////////////////////
227
// Heartbeater
228
////////////////////////////////////////////////////////////
229
230
Heartbeater::Heartbeater(
231
    const TabletServerOptions& opts, TabletServer* server,
232
    std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers)
233
5.81k
  : thread_(new Thread(opts, server, std::move(data_providers))) {
234
5.81k
}
235
236
73
Heartbeater::~Heartbeater() {
237
73
  WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread");
238
73
}
239
240
5.80k
Status Heartbeater::Start() { return thread_->Start(); }
241
153
Status Heartbeater::Stop() { return thread_->Stop(); }
242
338k
void Heartbeater::TriggerASAP() { thread_->TriggerASAP(); }
243
244
135
void Heartbeater::set_master_addresses(server::MasterAddressesPtr master_addresses) {
245
135
  thread_->set_master_addresses(std::move(master_addresses));
246
135
}
247
248
////////////////////////////////////////////////////////////
249
// Heartbeater::Thread
250
////////////////////////////////////////////////////////////
251
252
Heartbeater::Thread::Thread(
253
    const TabletServerOptions& opts, TabletServer* server,
254
    std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers)
255
  : master_addresses_(opts.GetMasterAddresses()),
256
    server_(server),
257
    cond_(&mutex_),
258
5.81k
    data_providers_(std::move(data_providers)) {
259
5.81k
  CHECK_NOTNULL(master_addresses_.get());
260
5.81k
  CHECK(!master_addresses_->empty());
261
0
  VLOG_WITH_PREFIX(1) << "Initializing heartbeater thread with master addresses: "
262
0
          << yb::ToString(master_addresses_);
263
5.81k
}
264
265
namespace {
266
267
struct FindLeaderMasterData {
268
  HostPort result;
269
  Synchronizer sync;
270
  std::shared_ptr<GetLeaderMasterRpc> rpc;
271
};
272
273
void LeaderMasterCallback(const std::shared_ptr<FindLeaderMasterData>& data,
274
                          const Status& status,
275
5.42k
                          const HostPort& result) {
276
5.42k
  if (status.ok()) {
277
5.37k
    data->result = result;
278
5.37k
  }
279
5.42k
  data->sync.StatusCB(status);
280
5.42k
}
281
282
} // anonymous namespace
283
284
6.86k
Status Heartbeater::Thread::FindLeaderMaster(CoarseTimePoint deadline, HostPort* leader_hostport) {
285
6.86k
  Status s = Status::OK();
286
6.86k
  const auto master_addresses = get_master_addresses();
287
6.86k
  if (master_addresses->size() == 1 && (*master_addresses)[0].size() == 1) {
288
    // "Shortcut" the process when a single master is specified.
289
1.14k
    *leader_hostport = (*master_addresses)[0][0];
290
1.14k
    return Status::OK();
291
1.14k
  }
292
5.72k
  auto master_sock_addrs = *master_addresses;
293
5.72k
  if (master_sock_addrs.empty()) {
294
0
    return STATUS(NotFound, "Unable to resolve any of the master addresses!");
295
0
  }
296
5.72k
  auto data = std::make_shared<FindLeaderMasterData>();
297
5.72k
  data->rpc = std::make_shared<GetLeaderMasterRpc>(
298
5.72k
      Bind(&LeaderMasterCallback, data),
299
5.72k
      master_sock_addrs,
300
5.72k
      deadline,
301
5.72k
      server_->messenger(),
302
5.72k
      &server_->proxy_cache(),
303
5.72k
      &rpcs_,
304
5.72k
      true /* should_timeout_to_follower_ */);
305
5.72k
  data->rpc->SendRpc();
306
5.72k
  auto status = data->sync.WaitFor(deadline - CoarseMonoClock::Now() + 1s);
307
5.72k
  if (status.ok()) {
308
5.37k
    *leader_hostport = data->result;
309
5.37k
  }
310
5.72k
  rpcs_.RequestAbortAll();
311
5.72k
  return status;
312
5.72k
}
313
314
6.86k
Status Heartbeater::Thread::ConnectToMaster() {
315
6.86k
  auto deadline = CoarseMonoClock::Now() + FLAGS_heartbeat_rpc_timeout_ms * 1ms;
316
  // TODO send heartbeats without tablet reports to non-leader masters.
317
6.86k
  Status s = FindLeaderMaster(deadline, &leader_master_hostport_);
318
6.86k
  if (!s.ok()) {
319
50
    LOG_WITH_PREFIX(INFO) << "Find leader master " <<  leader_master_hostport_.ToString()
320
50
                          << " hit error " << s;
321
50
    return s;
322
50
  }
323
324
  // Reset report state if we have master failover.
325
6.81k
  sending_full_report_ = false;
326
327
  // Pings are common for both Master and Tserver.
328
6.81k
  auto new_proxy = std::make_unique<server::GenericServiceProxy>(
329
6.81k
      &server_->proxy_cache(), leader_master_hostport_);
330
331
  // Ping the master to verify that it's alive.
332
6.81k
  server::PingRequestPB req;
333
6.81k
  server::PingResponsePB resp;
334
6.81k
  RpcController rpc;
335
6.81k
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));
336
6.81k
  RETURN_NOT_OK_PREPEND(new_proxy->Ping(req, &resp, &rpc),
337
6.51k
                        Format("Failed to ping master at $0", leader_master_hostport_));
338
6.51k
  LOG_WITH_PREFIX(INFO) << "Connected to a leader master server at " << leader_master_hostport_;
339
340
  // Save state in the instance.
341
6.51k
  proxy_ = std::make_unique<master::MasterHeartbeatProxy>(
342
6.51k
      &server_->proxy_cache(), leader_master_hostport_);
343
6.51k
  return Status::OK();
344
6.81k
}
345
346
403k
void Heartbeater::Thread::SetupCommonField(master::TSToMasterCommonPB* common) {
347
403k
  common->mutable_ts_instance()->CopyFrom(server_->instance_pb());
348
403k
}
349
350
5.95k
Status Heartbeater::Thread::SetupRegistration(master::TSRegistrationPB* reg) {
351
5.95k
  reg->Clear();
352
5.95k
  RETURN_NOT_OK(server_->GetRegistration(reg->mutable_common()));
353
354
5.95k
  return Status::OK();
355
5.95k
}
356
357
423k
int Heartbeater::Thread::GetMinimumHeartbeatMillis() const {
358
  // If we've failed a few heartbeats in a row, back off to the normal
359
  // interval, rather than retrying in a loop.
360
423k
  if (consecutive_failed_heartbeats_ == FLAGS_heartbeat_max_failures_before_backoff) {
361
114
    LOG_WITH_PREFIX(WARNING) << "Failed " << consecutive_failed_heartbeats_  <<" heartbeats "
362
114
                             << "in a row: no longer allowing fast heartbeat attempts.";
363
114
  }
364
365
423k
  return consecutive_failed_heartbeats_ > FLAGS_heartbeat_max_failures_before_backoff ?
366
423k
    FLAGS_heartbeat_interval_ms : 0;
367
423k
}
368
369
821k
int Heartbeater::Thread::GetMillisUntilNextHeartbeat() const {
370
  // If the master needs something from us, we should immediately
371
  // send another heartbeat with that info, rather than waiting for the interval.
372
821k
  if (sending_full_report_ ||
373
821k
      last_hb_response_.needs_reregister() ||
374
423k
      last_hb_response_.needs_full_tablet_report()) {
375
423k
    return GetMinimumHeartbeatMillis();
376
423k
  }
377
378
397k
  return FLAGS_heartbeat_interval_ms;
379
397k
}
380
381
382
403k
Status Heartbeater::Thread::TryHeartbeat() {
383
403k
  master::TSHeartbeatRequestPB req;
384
385
403k
  SetupCommonField(req.mutable_common());
386
403k
  if (last_hb_response_.needs_reregister()) {
387
5.95k
    LOG_WITH_PREFIX(INFO) << "Registering TS with master...";
388
5.95k
    RETURN_NOT_OK_PREPEND(SetupRegistration(req.mutable_registration()),
389
5.95k
                          "Unable to set up registration");
390
5.95k
    auto capabilities = Capabilities();
391
5.95k
    *req.mutable_registration()->mutable_capabilities() =
392
5.95k
        google::protobuf::RepeatedField<CapabilityId>(capabilities.begin(), capabilities.end());
393
5.95k
  }
394
395
403k
  if (last_hb_response_.needs_full_tablet_report()) {
396
5.59k
    LOG_WITH_PREFIX(INFO) << "Sending a full tablet report to master...";
397
5.59k
    server_->tablet_manager()->StartFullTabletReport(req.mutable_tablet_report());
398
5.59k
    sending_full_report_ = true;
399
398k
  } else {
400
398k
    if (sending_full_report_) {
401
249
      LOG_WITH_PREFIX(INFO) << "Continuing full tablet report to master...";
402
398k
    } else {
403
3
      VLOG_WITH_PREFIX(2) << "Sending an incremental tablet report to master...";
404
398k
    }
405
398k
    server_->tablet_manager()->GenerateTabletReport(req.mutable_tablet_report(),
406
398k
                                                    !sending_full_report_ /* include_bootstrap */);
407
398k
  }
408
403k
  req.mutable_tablet_report()->set_is_incremental(!sending_full_report_);
409
403k
  req.set_num_live_tablets(server_->tablet_manager()->GetNumLiveTablets());
410
403k
  req.set_leader_count(server_->tablet_manager()->GetLeaderCount());
411
412
403k
  for (auto& data_provider : data_providers_) {
413
403k
    data_provider->AddData(last_hb_response_, &req);
414
403k
  }
415
416
403k
  RpcController rpc;
417
403k
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));
418
419
403k
  req.set_config_index(server_->GetCurrentMasterIndex());
420
403k
  req.set_cluster_config_version(server_->cluster_config_version());
421
403k
  req.set_rtt_us(heartbeat_rtt_.ToMicroseconds());
422
423
  // Include the hybrid time of this tablet server in the heartbeat.
424
403k
  auto* hybrid_clock = dynamic_cast<server::HybridClock*>(server_->Clock());
425
403k
  if (hybrid_clock) {
426
403k
    req.set_ts_hybrid_time(hybrid_clock->Now().ToUint64());
427
    // Also include the physical clock time of this tablet server in the heartbeat.
428
403k
    Result<PhysicalTime> now = hybrid_clock->physical_clock()->Now();
429
403k
    if (!now.ok()) {
430
0
      YB_LOG_EVERY_N_SECS(WARNING, 10) << "Failed to read clock: " << now.status();
431
0
      req.set_ts_physical_time(0);
432
403k
    } else {
433
403k
      req.set_ts_physical_time(now->time_point);
434
403k
    }
435
273
  } else {
436
273
    req.set_ts_hybrid_time(0);
437
273
    req.set_ts_physical_time(0);
438
273
  }
439
440
403k
  {
441
3
    VLOG_WITH_PREFIX(2) << "Sending heartbeat:\n" << req.DebugString();
442
403k
    heartbeat_rtt_ = MonoDelta::kZero;
443
403k
    MonoTime start_time = MonoTime::Now();
444
403k
    master::TSHeartbeatResponsePB resp;
445
403k
    RETURN_NOT_OK_PREPEND(proxy_->TSHeartbeat(req, &resp, &rpc),
446
403k
                          "Failed to send heartbeat");
447
403k
    MonoTime end_time = MonoTime::Now();
448
403k
    if (resp.has_error()) {
449
695
      if (resp.error().code() != master::MasterErrorPB::NOT_THE_LEADER) {
450
0
        return StatusFromPB(resp.error().status());
451
695
      } else {
452
695
        DCHECK(!resp.leader_master());
453
        // Treat a not-the-leader error code as leader_master=false.
454
695
        if (resp.leader_master()) {
455
0
          LOG_WITH_PREFIX(WARNING) << "Setting leader master to false for "
456
0
                                   << resp.error().code() << " code.";
457
0
          resp.set_leader_master(false);
458
0
        }
459
695
      }
460
695
    }
461
462
31
    VLOG_WITH_PREFIX(2) << "Received heartbeat response:\n" << resp.DebugString();
463
403k
    if (resp.has_master_config()) {
464
135
      LOG_WITH_PREFIX(INFO) << "Received heartbeat response with config " << resp.DebugString();
465
466
135
      RETURN_NOT_OK(server_->UpdateMasterAddresses(resp.master_config(), resp.leader_master()));
467
135
    }
468
469
403k
    if (!resp.leader_master()) {
470
      // If the master is no longer a leader, reset proxy so that we can
471
      // determine the master and attempt to heartbeat during in the
472
      // next heartbeat interval.
473
695
      proxy_.reset();
474
695
      return STATUS_FORMAT(ServiceUnavailable, "Master is no longer the leader: $0", resp.error());
475
695
    }
476
477
    // Check for a universe key registry for encryption.
478
402k
    if (resp.has_universe_key_registry()) {
479
3
      RETURN_NOT_OK(server_->SetUniverseKeyRegistry(resp.universe_key_registry()));
480
3
    }
481
482
    // Check for CDC Universe Replication.
483
402k
    if (resp.has_consumer_registry()) {
484
0
      int32_t cluster_config_version = -1;
485
0
      if (!resp.has_cluster_config_version()) {
486
0
        YB_LOG_EVERY_N_SECS(INFO, 30)
487
0
            << "Invalid heartbeat response without a cluster config version";
488
0
      } else {
489
0
        cluster_config_version = resp.cluster_config_version();
490
0
      }
491
0
      RETURN_NOT_OK(static_cast<enterprise::TabletServer*>(server_)->
492
0
          SetConfigVersionAndConsumerRegistry(cluster_config_version, &resp.consumer_registry()));
493
402k
    } else if (resp.has_cluster_config_version()) {
494
402k
      RETURN_NOT_OK(static_cast<enterprise::TabletServer*>(server_)->
495
402k
          SetConfigVersionAndConsumerRegistry(resp.cluster_config_version(), nullptr));
496
402k
    }
497
498
    // At this point we know resp is a successful heartbeat response from the master so set it as
499
    // the last heartbeat response. This invalidates resp so we should use last_hb_response_ instead
500
    // below (hence using the nested scope for resp until here).
501
402k
    last_hb_response_.Swap(&resp);
502
402k
    heartbeat_rtt_ = end_time.GetDeltaSince(start_time);
503
402k
  }
504
505
402k
  if (last_hb_response_.has_cluster_uuid() && !last_hb_response_.cluster_uuid().empty()) {
506
5.59k
    server_->set_cluster_uuid(last_hb_response_.cluster_uuid());
507
5.59k
  }
508
509
  // The Master responds with the max entries for a single Tablet Report to avoid overwhelming it.
510
402k
  if (last_hb_response_.has_tablet_report_limit()) {
511
402k
    server_->tablet_manager()->SetReportLimit(last_hb_response_.tablet_report_limit());
512
402k
  }
513
514
402k
  if (last_hb_response_.needs_full_tablet_report()) {
515
5.59k
    return STATUS(TryAgain, "");
516
5.59k
  }
517
518
  // Handle TSHeartbeatResponsePB (e.g. tablets ack'd by master as processed)
519
396k
  bool all_processed = req.tablet_report().remaining_tablet_count() == 0 &&
520
396k
                       !last_hb_response_.tablet_report().processing_truncated();
521
396k
  server_->tablet_manager()->MarkTabletReportAcknowledged(
522
396k
      req.tablet_report().sequence_number(), last_hb_response_.tablet_report(), all_processed);
523
524
  // Trigger another heartbeat ASAP if we didn't process all tablets on this request.
525
396k
  sending_full_report_ = sending_full_report_ && !all_processed;
526
527
  // Update the master's YSQL catalog version (i.e. if there were schema changes for YSQL objects).
528
396k
  if (last_hb_response_.has_ysql_catalog_version()) {
529
396k
    if (FLAGS_log_ysql_catalog_versions) {
530
0
      VLOG_WITH_FUNC(1) << "got master catalog version: "
531
0
                        << last_hb_response_.ysql_catalog_version()
532
0
                        << ", breaking version: "
533
0
                        << (last_hb_response_.has_ysql_last_breaking_catalog_version()
534
0
                            ? Format("$1", last_hb_response_.ysql_last_breaking_catalog_version())
535
0
                            : "(none)");
536
0
    }
537
396k
    if (last_hb_response_.has_ysql_last_breaking_catalog_version()) {
538
396k
      server_->SetYSQLCatalogVersion(last_hb_response_.ysql_catalog_version(),
539
396k
                                     last_hb_response_.ysql_last_breaking_catalog_version());
540
0
    } else {
541
      /* Assuming all changes are breaking if last breaking version not explicitly set. */
542
0
      server_->SetYSQLCatalogVersion(last_hb_response_.ysql_catalog_version(),
543
0
                                     last_hb_response_.ysql_catalog_version());
544
0
    }
545
396k
  }
546
547
396k
  RETURN_NOT_OK(server_->tablet_manager()->UpdateSnapshotsInfo(last_hb_response_.snapshots_info()));
548
549
396k
  if (last_hb_response_.has_transaction_tables_version()) {
550
396k
    server_->UpdateTransactionTablesVersion(last_hb_response_.transaction_tables_version());
551
396k
  }
552
553
  // Update the live tserver list.
554
396k
  return server_->PopulateLiveTServers(last_hb_response_);
555
396k
}
556
557
816k
Status Heartbeater::Thread::DoHeartbeat() {
558
816k
  if (PREDICT_FALSE(server_->fail_heartbeats_for_tests())) {
559
95
    return STATUS(IOError, "failing all heartbeats for tests");
560
95
  }
561
562
816k
  if (PREDICT_FALSE(FLAGS_TEST_tserver_disable_heartbeat)) {
563
417k
    YB_LOG_EVERY_N_SECS(INFO, 1) << "Heartbeat disabled for testing.";
564
417k
    return Status::OK();
565
417k
  }
566
567
399k
  CHECK(IsCurrentThread());
568
569
399k
  if (!proxy_) {
570
0
    VLOG_WITH_PREFIX(1) << "No valid master proxy. Connecting...";
571
6.86k
    RETURN_NOT_OK(ConnectToMaster());
572
6.51k
    DCHECK(proxy_);
573
6.51k
  }
574
575
403k
  for (;;) {
576
403k
    auto status = TryHeartbeat();
577
403k
    if (!status.ok() && status.IsTryAgain()) {
578
5.59k
      continue;
579
5.59k
    }
580
398k
    return status;
581
398k
  }
582
583
296
  return Status::OK();
584
398k
}
585
586
5.80k
void Heartbeater::Thread::RunThread() {
587
5.80k
  CHECK(IsCurrentThread());
588
0
  VLOG_WITH_PREFIX(1) << "Heartbeat thread starting";
589
590
  // Config the "last heartbeat response" to indicate that we need to register
591
  // -- since we've never registered before, we know this to be true.
592
5.80k
  last_hb_response_.set_needs_reregister(true);
593
  // Have the Master request a full tablet report on 2nd HB, once it knows our capabilities.
594
5.80k
  last_hb_response_.set_needs_full_tablet_report(false);
595
596
827k
  while (true) {
597
821k
    MonoTime next_heartbeat = MonoTime::Now();
598
821k
    next_heartbeat.AddDelta(MonoDelta::FromMilliseconds(GetMillisUntilNextHeartbeat()));
599
600
    // Wait for either the heartbeat interval to elapse, or for an "ASAP" heartbeat,
601
    // or for the signal to shut down.
602
821k
    {
603
821k
      MutexLock l(mutex_);
604
1.13M
      while (true) {
605
1.13M
        MonoDelta remaining = next_heartbeat.GetDeltaSince(MonoTime::Now());
606
1.13M
        if (remaining.ToMilliseconds() <= 0 ||
607
483k
            heartbeat_asap_ ||
608
816k
            !should_run_) {
609
816k
          break;
610
816k
        }
611
314k
        cond_.TimedWait(remaining);
612
314k
      }
613
614
821k
      heartbeat_asap_ = false;
615
616
821k
      if (!should_run_) {
617
0
        VLOG_WITH_PREFIX(1) << "Heartbeat thread finished";
618
74
        return;
619
74
      }
620
821k
    }
621
622
821k
    Status s = DoHeartbeat();
623
821k
    if (!s.ok()) {
624
1.98k
      const auto master_addresses = get_master_addresses();
625
1.98k
      LOG_WITH_PREFIX(WARNING)
626
1.98k
          << "Failed to heartbeat to " << leader_master_hostport_.ToString()
627
1.98k
          << ": " << s << " tries=" << consecutive_failed_heartbeats_
628
1.98k
          << ", num=" << master_addresses->size()
629
1.98k
          << ", masters=" << yb::ToString(master_addresses)
630
1.98k
          << ", code=" << s.CodeAsString();
631
1.98k
      consecutive_failed_heartbeats_++;
632
      // If there's multiple masters...
633
1.98k
      if (master_addresses->size() > 1 || (*master_addresses)[0].size() > 1) {
634
        // If we encountered a network error (e.g., connection refused) or reached our failure
635
        // threshold.  Try determining the leader master again.  Heartbeats function as a watchdog,
636
        // so timeouts should be considered normal failures.
637
1.15k
        if (s.IsNetworkError() ||
638
788
            consecutive_failed_heartbeats_ == FLAGS_heartbeat_max_failures_before_backoff) {
639
485
          proxy_.reset();
640
485
        }
641
1.15k
      }
642
1.98k
      continue;
643
1.98k
    }
644
819k
    consecutive_failed_heartbeats_ = 0;
645
819k
  }
646
5.80k
}
647
648
404k
bool Heartbeater::Thread::IsCurrentThread() const {
649
404k
  return thread_.get() == yb::Thread::current_thread();
650
404k
}
651
652
5.80k
Status Heartbeater::Thread::Start() {
653
5.80k
  CHECK(thread_ == nullptr);
654
655
5.80k
  should_run_ = true;
656
5.80k
  return yb::Thread::Create("heartbeater", "heartbeat",
657
5.80k
      &Heartbeater::Thread::RunThread, this, &thread_);
658
5.80k
}
659
660
153
Status Heartbeater::Thread::Stop() {
661
153
  if (!thread_) {
662
76
    return Status::OK();
663
76
  }
664
665
77
  {
666
77
    MutexLock l(mutex_);
667
77
    should_run_ = false;
668
77
    cond_.Signal();
669
77
  }
670
77
  RETURN_NOT_OK(ThreadJoiner(thread_.get()).Join());
671
77
  thread_ = nullptr;
672
77
  return Status::OK();
673
77
}
674
675
338k
void Heartbeater::Thread::TriggerASAP() {
676
338k
  MutexLock l(mutex_);
677
338k
  heartbeat_asap_ = true;
678
338k
  cond_.Signal();
679
338k
}
680
681
682
0
const std::string& HeartbeatDataProvider::LogPrefix() const {
683
0
  return server_.LogPrefix();
684
0
}
685
686
void PeriodicalHeartbeatDataProvider::AddData(
687
403k
    const master::TSHeartbeatResponsePB& last_resp, master::TSHeartbeatRequestPB* req) {
688
403k
  if (prev_run_time_ + period_ < CoarseMonoClock::Now()) {
689
37.7k
    DoAddData(last_resp, req);
690
37.7k
    prev_run_time_ = CoarseMonoClock::Now();
691
37.7k
  }
692
403k
}
693
694
} // namespace tserver
695
} // namespace yb