YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/master_heartbeat_service.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/common/common_flags.h"
15
16
#include "yb/master/catalog_entity_info.pb.h"
17
#include "yb/master/catalog_manager.h"
18
#include "yb/master/master_heartbeat.service.h"
19
#include "yb/master/master_service_base.h"
20
#include "yb/master/master_service_base-internal.h"
21
#include "yb/master/ts_manager.h"
22
23
#include "yb/util/flag_tags.h"
24
25
// Upper bound on the number of tablets reported in a single heartbeat. The master reads this flag
// on every TSHeartbeat call and sends it back in the response to tablet servers that advertise
// the TabletReportLimit capability. Tagged as an advanced flag.
DEFINE_int32(tablet_report_limit, 1000,
26
             "Max Number of tablets to report during a single heartbeat. "
27
             "If this is set to INT32_MAX, then heartbeat will report all dirty tablets.");
28
TAG_FLAG(tablet_report_limit, advanced);
29
30
DECLARE_int32(heartbeat_rpc_timeout_ms);
31
32
DECLARE_CAPABILITY(TabletReportLimit);
33
34
using namespace std::literals;
35
36
namespace yb {
37
namespace master {
38
39
namespace {
40
41
class MasterHeartbeatServiceImpl : public MasterServiceBase, public MasterHeartbeatIf {
42
 public:
43
  explicit MasterHeartbeatServiceImpl(Master* master)
44
5.42k
      : MasterServiceBase(master), MasterHeartbeatIf(master->metric_entity()) {}
45
46
  void TSHeartbeat(const TSHeartbeatRequestPB* req,
47
                   TSHeartbeatResponsePB* resp,
48
383k
                   rpc::RpcContext rpc) override {
49
383k
    LongOperationTracker long_operation_tracker("TSHeartbeat", 1s);
50
51
    // If CatalogManager is not initialized don't even know whether or not we will
52
    // be a leader (so we can't tell whether or not we can accept tablet reports).
53
383k
    SCOPED_LEADER_SHARED_LOCK(l, server_->catalog_manager_impl());
54
55
383k
    consensus::ConsensusStatePB cpb;
56
383k
    Status s = server_->catalog_manager_impl()->GetCurrentConfig(&cpb);
57
383k
    if (!s.ok()) {
58
      // For now, we skip setting the config on errors (hopefully next heartbeat will work).
59
      // We could enhance to fail rpc, if there are too many error, on a case by case error basis.
60
0
      LOG(WARNING) << "Could not set master raft config : " << s.ToString();
61
384k
    } else if (cpb.has_config()) {
62
384k
      if (cpb.config().opid_index() > req->config_index()) {
63
135
        *resp->mutable_master_config() = std::move(cpb.config());
64
135
        LOG(INFO) << "Set config at index " << resp->master_config().opid_index() << " for ts uuid "
65
135
                  << req->common().ts_instance().permanent_uuid();
66
135
      }
67
384k
    } // Do nothing if config not ready.
68
69
383k
    if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, &rpc)) {
70
707
      resp->set_leader_master(false);
71
707
      return;
72
707
    }
73
74
382k
    resp->mutable_master_instance()->CopyFrom(server_->instance_pb());
75
382k
    resp->set_leader_master(true);
76
77
    // If the TS is registering, register in the TS manager.
78
382k
    if (req->has_registration()) {
79
5.59k
      Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
80
5.59k
                                                   req->registration(),
81
5.59k
                                                   server_->MakeCloudInfoPB(),
82
5.59k
                                                   &server_->proxy_cache());
83
5.59k
      if (!s.ok()) {
84
0
        LOG(WARNING) << "Unable to register tablet server (" << rpc.requestor_string() << "): "
85
0
                     << s.ToString();
86
        // TODO: add service-specific errors.
87
0
        rpc.RespondFailure(s);
88
0
        return;
89
0
      }
90
5.59k
      SysClusterConfigEntryPB cluster_config;
91
5.59k
      s = server_->catalog_manager_impl()->GetClusterConfig(&cluster_config);
92
5.59k
      if (!s.ok()) {
93
0
        LOG(WARNING) << "Unable to get cluster configuration: " << s.ToString();
94
0
        rpc.RespondFailure(s);
95
0
      }
96
5.59k
      resp->set_cluster_uuid(cluster_config.cluster_uuid());
97
5.59k
    }
98
99
382k
    s = server_->catalog_manager_impl()->FillHeartbeatResponse(req, resp);
100
382k
    if (!s.ok()) {
101
0
      LOG(WARNING) << "Unable to fill heartbeat response: " << s.ToString();
102
0
      rpc.RespondFailure(s);
103
0
    }
104
105
    // Look up the TS -- if it just registered above, it will be found here.
106
    // This allows the TS to register and tablet-report in the same RPC.
107
382k
    TSDescriptorPtr ts_desc;
108
382k
    s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
109
382k
    if (s.IsNotFound()) {
110
134
      LOG(INFO) << "Got heartbeat from unknown tablet server { "
111
134
                << req->common().ts_instance().ShortDebugString()
112
134
                << " } as " << rpc.requestor_string()
113
134
                << "; Asking this server to re-register.";
114
134
      resp->set_needs_reregister(true);
115
134
      resp->set_needs_full_tablet_report(true);
116
134
      rpc.RespondSuccess();
117
134
      return;
118
382k
    } else if (!s.ok()) {
119
0
      LOG(WARNING) << "Unable to look up tablet server for heartbeat request "
120
0
                   << req->DebugString() << " from " << rpc.requestor_string()
121
0
                   << "\nStatus: " << s.ToString();
122
0
      rpc.RespondFailure(s.CloneAndPrepend("Unable to lookup TS"));
123
0
      return;
124
0
    }
125
126
382k
    ts_desc->UpdateHeartbeat(req);
127
128
    // Adjust the table report limit per heartbeat so this can be dynamically changed.
129
383k
    if (ts_desc->HasCapability(CAPABILITY_TabletReportLimit)) {
130
383k
      resp->set_tablet_report_limit(FLAGS_tablet_report_limit);
131
383k
    }
132
133
    // Set the TServer metrics in TS Descriptor.
134
382k
    if (req->has_metrics()) {
135
33.7k
      ts_desc->UpdateMetrics(req->metrics());
136
33.7k
    }
137
138
382k
    if (req->has_tablet_report()) {
139
382k
      s = server_->catalog_manager_impl()->ProcessTabletReport(
140
382k
        ts_desc.get(), req->tablet_report(), resp->mutable_tablet_report(), &rpc);
141
382k
      if (!s.ok()) {
142
0
        rpc.RespondFailure(s.CloneAndPrepend("Failed to process tablet report"));
143
0
        return;
144
0
      }
145
382k
    }
146
147
383k
    if (!req->has_tablet_report() || req->tablet_report().is_incremental()) {
148
      // Only process split tablets if we have plenty of time to process the work
149
      // (> 50% of timeout).
150
376k
      auto safe_time_left = CoarseMonoClock::Now() + (FLAGS_heartbeat_rpc_timeout_ms * 1ms / 2);
151
152
376k
      safe_time_left = CoarseMonoClock::Now() + (FLAGS_heartbeat_rpc_timeout_ms * 1ms / 2);
153
376k
      if (rpc.GetClientDeadline() > safe_time_left) {
154
286k
        for (const auto& storage_metadata : req->storage_metadata()) {
155
286k
          server_->catalog_manager_impl()->ProcessTabletStorageMetadata(
156
286k
                ts_desc.get()->permanent_uuid(), storage_metadata);
157
286k
        }
158
374k
      }
159
160
      // Only set once. It may take multiple heartbeats to receive a full tablet report.
161
376k
      if (!ts_desc->has_tablet_report()) {
162
5.46k
        resp->set_needs_full_tablet_report(true);
163
5.46k
      }
164
376k
    }
165
166
    // Retrieve all the nodes known by the master.
167
382k
    std::vector<std::shared_ptr<TSDescriptor>> descs;
168
382k
    server_->ts_manager()->GetAllLiveDescriptors(&descs);
169
1.25M
    for (const auto& desc : descs) {
170
1.25M
      *resp->add_tservers() = *desc->GetTSInformationPB();
171
1.25M
    }
172
173
    // Retrieve the ysql catalog schema version.
174
382k
    uint64_t last_breaking_version = 0;
175
382k
    uint64_t catalog_version = 0;
176
382k
    s = server_->catalog_manager_impl()->GetYsqlCatalogVersion(
177
382k
        &catalog_version, &last_breaking_version);
178
382k
    if (s.ok()) {
179
382k
      resp->set_ysql_catalog_version(catalog_version);
180
382k
      resp->set_ysql_last_breaking_catalog_version(last_breaking_version);
181
382k
      if (FLAGS_log_ysql_catalog_versions) {
182
0
        VLOG_WITH_FUNC(1) << "responding (to ts " << req->common().ts_instance().permanent_uuid()
183
0
                          << ") catalog version: " << catalog_version
184
0
                          << ", breaking version: " << last_breaking_version;
185
0
      }
186
18.4E
    } else {
187
18.4E
      LOG(WARNING) << "Could not get YSQL catalog version for heartbeat response: "
188
18.4E
                   << s.ToUserMessage();
189
18.4E
    }
190
191
382k
    uint64_t transaction_tables_version = server_->catalog_manager()->GetTransactionTablesVersion();
192
382k
    resp->set_transaction_tables_version(transaction_tables_version);
193
194
382k
    rpc.RespondSuccess();
195
382k
  }
196
197
};
198
199
} // namespace
200
201
5.42k
// Factory for the master heartbeat RPC service. Returns a newly allocated
// MasterHeartbeatServiceImpl constructed from `master`, owned by the caller through the generic
// rpc::ServiceIf interface.
std::unique_ptr<rpc::ServiceIf> MakeMasterHeartbeatService(Master* master) {
  std::unique_ptr<rpc::ServiceIf> heartbeat_service =
      std::make_unique<MasterHeartbeatServiceImpl>(master);
  return heartbeat_service;
}
204
205
} // namespace master
206
} // namespace yb