YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/master_heartbeat_service.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/common/common_flags.h"
15
16
#include "yb/master/catalog_entity_info.pb.h"
17
#include "yb/master/catalog_manager.h"
18
#include "yb/master/master_heartbeat.service.h"
19
#include "yb/master/master_service_base.h"
20
#include "yb/master/master_service_base-internal.h"
21
#include "yb/master/ts_manager.h"
22
23
#include "yb/util/flag_tags.h"
24
25
// Runtime flag bounding how many tablets are reported in one heartbeat. TSHeartbeat() below
// copies the current value into the heartbeat response (tablet_report_limit field) for tablet
// servers that have the TabletReportLimit capability. Tagged as an "advanced" flag.
DEFINE_int32(tablet_report_limit, 1000,
26
             "Max Number of tablets to report during a single heartbeat. "
27
             "If this is set to INT32_MAX, then heartbeat will report all dirty tablets.");
28
TAG_FLAG(tablet_report_limit, advanced);
29
30
DECLARE_int32(heartbeat_rpc_timeout_ms);
31
32
DECLARE_CAPABILITY(TabletReportLimit);
33
34
using namespace std::literals;
35
36
namespace yb {
37
namespace master {
38
39
namespace {
40
41
class MasterHeartbeatServiceImpl : public MasterServiceBase, public MasterHeartbeatIf {
42
 public:
43
  explicit MasterHeartbeatServiceImpl(Master* master)
44
8.03k
      : MasterServiceBase(master), MasterHeartbeatIf(master->metric_entity()) {}
45
46
  void TSHeartbeat(const TSHeartbeatRequestPB* req,
47
                   TSHeartbeatResponsePB* resp,
48
5.17M
                   rpc::RpcContext rpc) override {
49
5.17M
    LongOperationTracker long_operation_tracker("TSHeartbeat", 1s);
50
51
    // If CatalogManager is not initialized don't even know whether or not we will
52
    // be a leader (so we can't tell whether or not we can accept tablet reports).
53
5.17M
    SCOPED_LEADER_SHARED_LOCK(l, server_->catalog_manager_impl());
54
55
5.17M
    consensus::ConsensusStatePB cpb;
56
5.17M
    Status s = server_->catalog_manager_impl()->GetCurrentConfig(&cpb);
57
5.17M
    if (!s.ok()) {
58
      // For now, we skip setting the config on errors (hopefully next heartbeat will work).
59
      // We could enhance to fail rpc, if there are too many error, on a case by case error basis.
60
8
      LOG(WARNING) << "Could not set master raft config : " << s.ToString();
61
5.17M
    } else 
if (5.17M
cpb.has_config()5.17M
) {
62
5.17M
      if (cpb.config().opid_index() > req->config_index()) {
63
224
        *resp->mutable_master_config() = std::move(cpb.config());
64
224
        LOG(INFO) << "Set config at index " << resp->master_config().opid_index() << " for ts uuid "
65
224
                  << req->common().ts_instance().permanent_uuid();
66
224
      }
67
5.17M
    } // Do nothing if config not ready.
68
69
5.17M
    if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, &rpc)) {
70
368k
      resp->set_leader_master(false);
71
368k
      return;
72
368k
    }
73
74
4.80M
    resp->mutable_master_instance()->CopyFrom(server_->instance_pb());
75
4.80M
    resp->set_leader_master(true);
76
77
    // If the TS is registering, register in the TS manager.
78
4.80M
    if (req->has_registration()) {
79
8.20k
      Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
80
8.20k
                                                   req->registration(),
81
8.20k
                                                   server_->MakeCloudInfoPB(),
82
8.20k
                                                   &server_->proxy_cache());
83
8.20k
      if (!s.ok()) {
84
0
        LOG(WARNING) << "Unable to register tablet server (" << rpc.requestor_string() << "): "
85
0
                     << s.ToString();
86
        // TODO: add service-specific errors.
87
0
        rpc.RespondFailure(s);
88
0
        return;
89
0
      }
90
8.20k
      SysClusterConfigEntryPB cluster_config;
91
8.20k
      s = server_->catalog_manager_impl()->GetClusterConfig(&cluster_config);
92
8.20k
      if (!s.ok()) {
93
0
        LOG(WARNING) << "Unable to get cluster configuration: " << s.ToString();
94
0
        rpc.RespondFailure(s);
95
0
      }
96
8.20k
      resp->set_cluster_uuid(cluster_config.cluster_uuid());
97
8.20k
    }
98
99
4.80M
    s = server_->catalog_manager_impl()->FillHeartbeatResponse(req, resp);
100
4.80M
    if (!s.ok()) {
101
0
      LOG(WARNING) << "Unable to fill heartbeat response: " << s.ToString();
102
0
      rpc.RespondFailure(s);
103
0
    }
104
105
    // Look up the TS -- if it just registered above, it will be found here.
106
    // This allows the TS to register and tablet-report in the same RPC.
107
4.80M
    TSDescriptorPtr ts_desc;
108
4.80M
    s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
109
4.80M
    if (s.IsNotFound()) {
110
201
      LOG(INFO) << "Got heartbeat from unknown tablet server { "
111
201
                << req->common().ts_instance().ShortDebugString()
112
201
                << " } as " << rpc.requestor_string()
113
201
                << "; Asking this server to re-register.";
114
201
      resp->set_needs_reregister(true);
115
201
      resp->set_needs_full_tablet_report(true);
116
201
      rpc.RespondSuccess();
117
201
      return;
118
4.80M
    } else if (!s.ok()) {
119
0
      LOG(WARNING) << "Unable to look up tablet server for heartbeat request "
120
0
                   << req->DebugString() << " from " << rpc.requestor_string()
121
0
                   << "\nStatus: " << s.ToString();
122
0
      rpc.RespondFailure(s.CloneAndPrepend("Unable to lookup TS"));
123
0
      return;
124
0
    }
125
126
4.80M
    ts_desc->UpdateHeartbeat(req);
127
128
    // Adjust the table report limit per heartbeat so this can be dynamically changed.
129
4.80M
    if (
ts_desc->HasCapability(CAPABILITY_TabletReportLimit)4.80M
) {
130
4.80M
      resp->set_tablet_report_limit(FLAGS_tablet_report_limit);
131
4.80M
    }
132
133
    // Set the TServer metrics in TS Descriptor.
134
4.80M
    if (req->has_metrics()) {
135
956k
      ts_desc->UpdateMetrics(req->metrics());
136
956k
    }
137
138
4.80M
    if (
req->has_tablet_report()4.80M
) {
139
4.80M
      s = server_->catalog_manager_impl()->ProcessTabletReport(
140
4.80M
        ts_desc.get(), req->tablet_report(), resp->mutable_tablet_report(), &rpc);
141
4.80M
      if (!s.ok()) {
142
2
        rpc.RespondFailure(s.CloneAndPrepend("Failed to process tablet report"));
143
2
        return;
144
2
      }
145
4.80M
    }
146
147
4.80M
    
if (4.80M
!req->has_tablet_report()4.80M
|| req->tablet_report().is_incremental()) {
148
      // Only process split tablets if we have plenty of time to process the work
149
      // (> 50% of timeout).
150
4.79M
      auto safe_time_left = CoarseMonoClock::Now() + (FLAGS_heartbeat_rpc_timeout_ms * 1ms / 2);
151
152
4.79M
      safe_time_left = CoarseMonoClock::Now() + (FLAGS_heartbeat_rpc_timeout_ms * 1ms / 2);
153
4.79M
      if (rpc.GetClientDeadline() > safe_time_left) {
154
4.73M
        for (const auto& storage_metadata : req->storage_metadata()) {
155
2.19M
          server_->catalog_manager_impl()->ProcessTabletStorageMetadata(
156
2.19M
                ts_desc.get()->permanent_uuid(), storage_metadata);
157
2.19M
        }
158
4.73M
      }
159
160
      // Only set once. It may take multiple heartbeats to receive a full tablet report.
161
4.79M
      if (!ts_desc->has_tablet_report()) {
162
8.03k
        resp->set_needs_full_tablet_report(true);
163
8.03k
      }
164
4.79M
    }
165
166
    // Retrieve all the nodes known by the master.
167
4.80M
    std::vector<std::shared_ptr<TSDescriptor>> descs;
168
4.80M
    server_->ts_manager()->GetAllLiveDescriptors(&descs);
169
17.6M
    for (const auto& desc : descs) {
170
17.6M
      *resp->add_tservers() = *desc->GetTSInformationPB();
171
17.6M
    }
172
173
    // Retrieve the ysql catalog schema version.
174
4.80M
    uint64_t last_breaking_version = 0;
175
4.80M
    uint64_t catalog_version = 0;
176
4.80M
    s = server_->catalog_manager_impl()->GetYsqlCatalogVersion(
177
4.80M
        &catalog_version, &last_breaking_version);
178
4.80M
    if (
s.ok()4.80M
) {
179
4.80M
      resp->set_ysql_catalog_version(catalog_version);
180
4.80M
      resp->set_ysql_last_breaking_catalog_version(last_breaking_version);
181
4.80M
      if (FLAGS_log_ysql_catalog_versions) {
182
0
        VLOG_WITH_FUNC(1) << "responding (to ts " << req->common().ts_instance().permanent_uuid()
183
0
                          << ") catalog version: " << catalog_version
184
0
                          << ", breaking version: " << last_breaking_version;
185
0
      }
186
18.4E
    } else {
187
18.4E
      LOG(WARNING) << "Could not get YSQL catalog version for heartbeat response: "
188
18.4E
                   << s.ToUserMessage();
189
18.4E
    }
190
191
4.80M
    uint64_t transaction_tables_version = server_->catalog_manager()->GetTransactionTablesVersion();
192
4.80M
    resp->set_transaction_tables_version(transaction_tables_version);
193
194
4.80M
    rpc.RespondSuccess();
195
4.80M
  }
196
197
};
198
199
} // namespace
200
201
8.03k
// Factory for the heartbeat service: builds a MasterHeartbeatServiceImpl bound to 'master' and
// returns it through the generic rpc::ServiceIf interface.
std::unique_ptr<rpc::ServiceIf> MakeMasterHeartbeatService(Master* master) {
  std::unique_ptr<rpc::ServiceIf> service = std::make_unique<MasterHeartbeatServiceImpl>(master);
  return service;
}
204
205
} // namespace master
206
} // namespace yb