/Users/deen/code/yugabyte-db/src/yb/master/master_heartbeat_service.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/common/common_flags.h" |
15 | | |
16 | | #include "yb/master/catalog_entity_info.pb.h" |
17 | | #include "yb/master/catalog_manager.h" |
18 | | #include "yb/master/master_heartbeat.service.h" |
19 | | #include "yb/master/master_service_base.h" |
20 | | #include "yb/master/master_service_base-internal.h" |
21 | | #include "yb/master/ts_manager.h" |
22 | | |
23 | | #include "yb/util/flag_tags.h" |
24 | | |
25 | | DEFINE_int32(tablet_report_limit, 1000, |
26 | | "Max Number of tablets to report during a single heartbeat. " |
27 | | "If this is set to INT32_MAX, then heartbeat will report all dirty tablets."); |
28 | | TAG_FLAG(tablet_report_limit, advanced); |
29 | | |
30 | | DECLARE_int32(heartbeat_rpc_timeout_ms); |
31 | | |
32 | | DECLARE_CAPABILITY(TabletReportLimit); |
33 | | |
34 | | using namespace std::literals; |
35 | | |
36 | | namespace yb { |
37 | | namespace master { |
38 | | |
39 | | namespace { |
40 | | |
41 | | class MasterHeartbeatServiceImpl : public MasterServiceBase, public MasterHeartbeatIf { |
42 | | public: |
43 | | explicit MasterHeartbeatServiceImpl(Master* master) |
44 | 5.42k | : MasterServiceBase(master), MasterHeartbeatIf(master->metric_entity()) {} |
45 | | |
46 | | void TSHeartbeat(const TSHeartbeatRequestPB* req, |
47 | | TSHeartbeatResponsePB* resp, |
48 | 383k | rpc::RpcContext rpc) override { |
49 | 383k | LongOperationTracker long_operation_tracker("TSHeartbeat", 1s); |
50 | | |
51 | | // If CatalogManager is not initialized don't even know whether or not we will |
52 | | // be a leader (so we can't tell whether or not we can accept tablet reports). |
53 | 383k | SCOPED_LEADER_SHARED_LOCK(l, server_->catalog_manager_impl()); |
54 | | |
55 | 383k | consensus::ConsensusStatePB cpb; |
56 | 383k | Status s = server_->catalog_manager_impl()->GetCurrentConfig(&cpb); |
57 | 383k | if (!s.ok()) { |
58 | | // For now, we skip setting the config on errors (hopefully next heartbeat will work). |
59 | | // We could enhance to fail rpc, if there are too many error, on a case by case error basis. |
60 | 0 | LOG(WARNING) << "Could not set master raft config : " << s.ToString(); |
61 | 384k | } else if (cpb.has_config()) { |
62 | 384k | if (cpb.config().opid_index() > req->config_index()) { |
63 | 135 | *resp->mutable_master_config() = std::move(cpb.config()); |
64 | 135 | LOG(INFO) << "Set config at index " << resp->master_config().opid_index() << " for ts uuid " |
65 | 135 | << req->common().ts_instance().permanent_uuid(); |
66 | 135 | } |
67 | 384k | } // Do nothing if config not ready. |
68 | | |
69 | 383k | if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, &rpc)) { |
70 | 707 | resp->set_leader_master(false); |
71 | 707 | return; |
72 | 707 | } |
73 | | |
74 | 382k | resp->mutable_master_instance()->CopyFrom(server_->instance_pb()); |
75 | 382k | resp->set_leader_master(true); |
76 | | |
77 | | // If the TS is registering, register in the TS manager. |
78 | 382k | if (req->has_registration()) { |
79 | 5.59k | Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(), |
80 | 5.59k | req->registration(), |
81 | 5.59k | server_->MakeCloudInfoPB(), |
82 | 5.59k | &server_->proxy_cache()); |
83 | 5.59k | if (!s.ok()) { |
84 | 0 | LOG(WARNING) << "Unable to register tablet server (" << rpc.requestor_string() << "): " |
85 | 0 | << s.ToString(); |
86 | | // TODO: add service-specific errors. |
87 | 0 | rpc.RespondFailure(s); |
88 | 0 | return; |
89 | 0 | } |
90 | 5.59k | SysClusterConfigEntryPB cluster_config; |
91 | 5.59k | s = server_->catalog_manager_impl()->GetClusterConfig(&cluster_config); |
92 | 5.59k | if (!s.ok()) { |
93 | 0 | LOG(WARNING) << "Unable to get cluster configuration: " << s.ToString(); |
94 | 0 | rpc.RespondFailure(s); |
95 | 0 | } |
96 | 5.59k | resp->set_cluster_uuid(cluster_config.cluster_uuid()); |
97 | 5.59k | } |
98 | | |
99 | 382k | s = server_->catalog_manager_impl()->FillHeartbeatResponse(req, resp); |
100 | 382k | if (!s.ok()) { |
101 | 0 | LOG(WARNING) << "Unable to fill heartbeat response: " << s.ToString(); |
102 | 0 | rpc.RespondFailure(s); |
103 | 0 | } |
104 | | |
105 | | // Look up the TS -- if it just registered above, it will be found here. |
106 | | // This allows the TS to register and tablet-report in the same RPC. |
107 | 382k | TSDescriptorPtr ts_desc; |
108 | 382k | s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc); |
109 | 382k | if (s.IsNotFound()) { |
110 | 134 | LOG(INFO) << "Got heartbeat from unknown tablet server { " |
111 | 134 | << req->common().ts_instance().ShortDebugString() |
112 | 134 | << " } as " << rpc.requestor_string() |
113 | 134 | << "; Asking this server to re-register."; |
114 | 134 | resp->set_needs_reregister(true); |
115 | 134 | resp->set_needs_full_tablet_report(true); |
116 | 134 | rpc.RespondSuccess(); |
117 | 134 | return; |
118 | 382k | } else if (!s.ok()) { |
119 | 0 | LOG(WARNING) << "Unable to look up tablet server for heartbeat request " |
120 | 0 | << req->DebugString() << " from " << rpc.requestor_string() |
121 | 0 | << "\nStatus: " << s.ToString(); |
122 | 0 | rpc.RespondFailure(s.CloneAndPrepend("Unable to lookup TS")); |
123 | 0 | return; |
124 | 0 | } |
125 | | |
126 | 382k | ts_desc->UpdateHeartbeat(req); |
127 | | |
128 | | // Adjust the table report limit per heartbeat so this can be dynamically changed. |
129 | 383k | if (ts_desc->HasCapability(CAPABILITY_TabletReportLimit)) { |
130 | 383k | resp->set_tablet_report_limit(FLAGS_tablet_report_limit); |
131 | 383k | } |
132 | | |
133 | | // Set the TServer metrics in TS Descriptor. |
134 | 382k | if (req->has_metrics()) { |
135 | 33.7k | ts_desc->UpdateMetrics(req->metrics()); |
136 | 33.7k | } |
137 | | |
138 | 382k | if (req->has_tablet_report()) { |
139 | 382k | s = server_->catalog_manager_impl()->ProcessTabletReport( |
140 | 382k | ts_desc.get(), req->tablet_report(), resp->mutable_tablet_report(), &rpc); |
141 | 382k | if (!s.ok()) { |
142 | 0 | rpc.RespondFailure(s.CloneAndPrepend("Failed to process tablet report")); |
143 | 0 | return; |
144 | 0 | } |
145 | 382k | } |
146 | | |
147 | 383k | if (!req->has_tablet_report() || req->tablet_report().is_incremental()) { |
148 | | // Only process split tablets if we have plenty of time to process the work |
149 | | // (> 50% of timeout). |
150 | 376k | auto safe_time_left = CoarseMonoClock::Now() + (FLAGS_heartbeat_rpc_timeout_ms * 1ms / 2); |
151 | | |
152 | 376k | safe_time_left = CoarseMonoClock::Now() + (FLAGS_heartbeat_rpc_timeout_ms * 1ms / 2); |
153 | 376k | if (rpc.GetClientDeadline() > safe_time_left) { |
154 | 286k | for (const auto& storage_metadata : req->storage_metadata()) { |
155 | 286k | server_->catalog_manager_impl()->ProcessTabletStorageMetadata( |
156 | 286k | ts_desc.get()->permanent_uuid(), storage_metadata); |
157 | 286k | } |
158 | 374k | } |
159 | | |
160 | | // Only set once. It may take multiple heartbeats to receive a full tablet report. |
161 | 376k | if (!ts_desc->has_tablet_report()) { |
162 | 5.46k | resp->set_needs_full_tablet_report(true); |
163 | 5.46k | } |
164 | 376k | } |
165 | | |
166 | | // Retrieve all the nodes known by the master. |
167 | 382k | std::vector<std::shared_ptr<TSDescriptor>> descs; |
168 | 382k | server_->ts_manager()->GetAllLiveDescriptors(&descs); |
169 | 1.25M | for (const auto& desc : descs) { |
170 | 1.25M | *resp->add_tservers() = *desc->GetTSInformationPB(); |
171 | 1.25M | } |
172 | | |
173 | | // Retrieve the ysql catalog schema version. |
174 | 382k | uint64_t last_breaking_version = 0; |
175 | 382k | uint64_t catalog_version = 0; |
176 | 382k | s = server_->catalog_manager_impl()->GetYsqlCatalogVersion( |
177 | 382k | &catalog_version, &last_breaking_version); |
178 | 382k | if (s.ok()) { |
179 | 382k | resp->set_ysql_catalog_version(catalog_version); |
180 | 382k | resp->set_ysql_last_breaking_catalog_version(last_breaking_version); |
181 | 382k | if (FLAGS_log_ysql_catalog_versions) { |
182 | 0 | VLOG_WITH_FUNC(1) << "responding (to ts " << req->common().ts_instance().permanent_uuid() |
183 | 0 | << ") catalog version: " << catalog_version |
184 | 0 | << ", breaking version: " << last_breaking_version; |
185 | 0 | } |
186 | 18.4E | } else { |
187 | 18.4E | LOG(WARNING) << "Could not get YSQL catalog version for heartbeat response: " |
188 | 18.4E | << s.ToUserMessage(); |
189 | 18.4E | } |
190 | | |
191 | 382k | uint64_t transaction_tables_version = server_->catalog_manager()->GetTransactionTablesVersion(); |
192 | 382k | resp->set_transaction_tables_version(transaction_tables_version); |
193 | | |
194 | 382k | rpc.RespondSuccess(); |
195 | 382k | } |
196 | | |
197 | | }; |
198 | | |
199 | | } // namespace |
200 | | |
201 | 5.42k | std::unique_ptr<rpc::ServiceIf> MakeMasterHeartbeatService(Master* master) { |
202 | 5.42k | return std::make_unique<MasterHeartbeatServiceImpl>(master); |
203 | 5.42k | } |
204 | | |
205 | | } // namespace master |
206 | | } // namespace yb |