/Users/deen/code/yugabyte-db/src/yb/master/master.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/master/master.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <list> |
37 | | #include <memory> |
38 | | #include <vector> |
39 | | |
40 | | #include <glog/logging.h> |
41 | | |
42 | | #include "yb/client/async_initializer.h" |
43 | | #include "yb/client/client.h" |
44 | | |
45 | | #include "yb/common/wire_protocol.h" |
46 | | |
47 | | #include "yb/consensus/consensus_meta.h" |
48 | | |
49 | | #include "yb/gutil/bind.h" |
50 | | |
51 | | #include "yb/master/master_fwd.h" |
52 | | #include "yb/master/catalog_manager.h" |
53 | | #include "yb/master/flush_manager.h" |
54 | | #include "yb/master/master-path-handlers.h" |
55 | | #include "yb/master/master_cluster.proxy.h" |
56 | | #include "yb/master/master_service.h" |
57 | | #include "yb/master/master_tablet_service.h" |
58 | | #include "yb/master/master_util.h" |
59 | | |
60 | | #include "yb/rpc/messenger.h" |
61 | | #include "yb/rpc/service_if.h" |
62 | | #include "yb/rpc/service_pool.h" |
63 | | #include "yb/rpc/yb_rpc.h" |
64 | | |
65 | | #include "yb/server/rpc_server.h" |
66 | | |
67 | | #include "yb/tablet/maintenance_manager.h" |
68 | | |
69 | | #include "yb/tserver/pg_client_service.h" |
70 | | #include "yb/tserver/remote_bootstrap_service.h" |
71 | | #include "yb/tserver/tablet_service.h" |
72 | | #include "yb/tserver/tserver_shared_mem.h" |
73 | | |
74 | | #include "yb/util/flag_tags.h" |
75 | | #include "yb/util/metrics.h" |
76 | | #include "yb/util/net/net_util.h" |
77 | | #include "yb/util/net/sockaddr.h" |
78 | | #include "yb/util/shared_lock.h" |
79 | | #include "yb/util/status.h" |
80 | | #include "yb/util/threadpool.h" |
81 | | |
// Timeout used in this file for the master client's admin operations and for
// the RPCs issued by ListMasters() and InformRemovedMaster().
DEFINE_int32(master_rpc_timeout_ms, 1500,
             "Timeout for retrieving master registration over RPC.");
TAG_FLAG(master_rpc_timeout_ms, experimental);

// Metric entity prototype instantiated by the Master constructor as "yb.cluster".
METRIC_DEFINE_entity(cluster);

using std::min;
using std::shared_ptr;
using std::vector;

using yb::consensus::RaftPeerPB;
using yb::rpc::ServiceIf;
using yb::tserver::ConsensusServiceImpl;
using strings::Substitute;

// Worker thread counts per service. NOTE(review): none of the *_num_threads
// flags below is read anywhere in this file - confirm where they are consumed.
DEFINE_int32(master_tserver_svc_num_threads, 10,
             "Number of RPC worker threads to run for the master tserver service");
TAG_FLAG(master_tserver_svc_num_threads, advanced);

DEFINE_int32(master_svc_num_threads, 10,
             "Number of RPC worker threads to run for the master service");
TAG_FLAG(master_svc_num_threads, advanced);

DEFINE_int32(master_consensus_svc_num_threads, 10,
             "Number of RPC threads for the master consensus service");
TAG_FLAG(master_consensus_svc_num_threads, advanced);

DEFINE_int32(master_remote_bootstrap_svc_num_threads, 10,
             "Number of RPC threads for the master remote bootstrap service");
TAG_FLAG(master_remote_bootstrap_svc_num_threads, advanced);

// Queue lengths passed to RegisterService() in Master::RegisterServices().
DEFINE_int32(master_tserver_svc_queue_length, 1000,
             "RPC queue length for master tserver service");
TAG_FLAG(master_tserver_svc_queue_length, advanced);

DEFINE_int32(master_svc_queue_length, 1000,
             "RPC queue length for master service");
TAG_FLAG(master_svc_queue_length, advanced);

DEFINE_int32(master_consensus_svc_queue_length, 1000,
             "RPC queue length for master consensus service");
TAG_FLAG(master_consensus_svc_queue_length, advanced);

DEFINE_int32(master_remote_bootstrap_svc_queue_length, 50,
             "RPC queue length for master remote bootstrap service");
TAG_FLAG(master_remote_bootstrap_svc_queue_length, advanced);

// Test-only: when non-empty, Master::ListMasters() appends this host:port to
// the address list it tries for every peer.
DEFINE_test_flag(string, master_extra_list_host_port, "",
                 "Additional host port used in list masters");

// Defined elsewhere; used by the constructor for the inbound connection context.
DECLARE_int64(inbound_rpc_memory_limit);

// Defined elsewhere; used as the admin operation timeout of the CDC state client.
DECLARE_int32(master_ts_rpc_timeout_ms);
135 | | namespace yb { |
136 | | namespace master { |
137 | | |
138 | | Master::Master(const MasterOptions& opts) |
139 | | : DbServerBase("Master", opts, "yb.master", server::CreateMemTrackerForServer()), |
140 | | state_(kStopped), |
141 | | ts_manager_(new TSManager()), |
142 | | catalog_manager_(new enterprise::CatalogManager(this)), |
143 | | path_handlers_(new MasterPathHandlers(this)), |
144 | | flush_manager_(new FlushManager(this, catalog_manager())), |
145 | | init_future_(init_status_.get_future()), |
146 | | opts_(opts), |
147 | | maintenance_manager_(new MaintenanceManager(MaintenanceManager::DEFAULT_OPTIONS)), |
148 | | metric_entity_cluster_(METRIC_ENTITY_cluster.Instantiate(metric_registry_.get(), |
149 | | "yb.cluster")), |
150 | 5.45k | master_tablet_server_(new MasterTabletServer(this, metric_entity())) { |
151 | 5.45k | SetConnectionContextFactory(rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>( |
152 | 5.45k | GetAtomicFlag(&FLAGS_inbound_rpc_memory_limit), |
153 | 5.45k | mem_tracker())); |
154 | | |
155 | 5.45k | LOG(INFO) << "yb::master::Master created at " << this; |
156 | 5.45k | LOG(INFO) << "yb::master::TSManager created at " << ts_manager_.get(); |
157 | 5.45k | LOG(INFO) << "yb::master::CatalogManager created at " << catalog_manager_.get(); |
158 | 5.45k | } |
159 | | |
160 | 92 | Master::~Master() { |
161 | 92 | Shutdown(); |
162 | 92 | } |
163 | | |
164 | 189 | string Master::ToString() const { |
165 | 189 | if (state_ != kRunning) { |
166 | 92 | return "Master (stopped)"; |
167 | 92 | } |
168 | 97 | return strings::Substitute("Master@$0", yb::ToString(first_rpc_address())); |
169 | 97 | } |
170 | | |
171 | 5.45k | Status Master::Init() { |
172 | 5.45k | CHECK_EQ(kStopped, state_); |
173 | | |
174 | 5.45k | RETURN_NOT_OK(ThreadPoolBuilder("init").set_max_threads(1).Build(&init_pool_)); |
175 | | |
176 | 5.45k | RETURN_NOT_OK(RpcAndWebServerBase::Init()); |
177 | | |
178 | 5.45k | RETURN_NOT_OK(path_handlers_->Register(web_server_.get())); |
179 | | |
180 | 5.45k | auto bound_addresses = rpc_server()->GetBoundAddresses(); |
181 | 5.45k | if (!bound_addresses.empty()) { |
182 | 5.42k | shared_object().SetHostEndpoint(bound_addresses.front(), get_hostname()); |
183 | 5.42k | } |
184 | | |
185 | 5.45k | async_client_init_ = std::make_unique<client::AsyncClientInitialiser>( |
186 | 5.45k | "master_client", 0 /* num_reactors */, |
187 | | // TODO: use the correct flag |
188 | 5.45k | 60, // FLAGS_tserver_yb_client_default_timeout_ms / 1000, |
189 | 5.45k | "" /* tserver_uuid */, |
190 | 5.45k | &options(), |
191 | 5.45k | metric_entity(), |
192 | 5.45k | mem_tracker(), |
193 | 5.45k | messenger()); |
194 | 5.45k | async_client_init_->builder() |
195 | 5.45k | .set_master_address_flag_name("master_addresses") |
196 | 5.45k | .default_admin_operation_timeout(MonoDelta::FromMilliseconds(FLAGS_master_rpc_timeout_ms)) |
197 | 12.8k | .AddMasterAddressSource([this] { |
198 | 12.8k | return catalog_manager_->GetMasterAddresses(); |
199 | 12.8k | }); |
200 | 5.45k | async_client_init_->Start(); |
201 | | |
202 | 5.45k | cdc_state_client_init_ = std::make_unique<client::AsyncClientInitialiser>( |
203 | 5.45k | "cdc_state_client", 0 /* num_reactors */, |
204 | | // TODO: use the correct flag |
205 | 5.45k | 60, // FLAGS_tserver_yb_client_default_timeout_ms / 1000, |
206 | 5.45k | "" /* tserver_uuid */, |
207 | 5.45k | &options(), |
208 | 5.45k | metric_entity(), |
209 | 5.45k | mem_tracker(), |
210 | 5.45k | messenger()); |
211 | 5.45k | cdc_state_client_init_->builder() |
212 | 5.45k | .set_master_address_flag_name("master_addresses") |
213 | 5.45k | .default_admin_operation_timeout(MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms)) |
214 | 6.04k | .AddMasterAddressSource([this] { |
215 | 6.04k | return catalog_manager_->GetMasterAddresses(); |
216 | 6.04k | }); |
217 | 5.45k | cdc_state_client_init_->Start(); |
218 | | |
219 | 5.45k | state_ = kInitialized; |
220 | 5.45k | return Status::OK(); |
221 | 5.45k | } |
222 | | |
223 | 4.93k | Status Master::Start() { |
224 | 4.93k | RETURN_NOT_OK(StartAsync()); |
225 | 4.93k | RETURN_NOT_OK(WaitForCatalogManagerInit()); |
226 | 4.93k | google::FlushLogFiles(google::INFO); // Flush the startup messages. |
227 | 4.93k | return Status::OK(); |
228 | 4.93k | } |
229 | | |
230 | 5.42k | Status Master::RegisterServices() { |
231 | 5.42k | RETURN_NOT_OK(RegisterService(FLAGS_master_svc_queue_length, MakeMasterAdminService(this))); |
232 | 5.42k | RETURN_NOT_OK(RegisterService(FLAGS_master_svc_queue_length, MakeMasterClientService(this))); |
233 | 5.42k | RETURN_NOT_OK(RegisterService(FLAGS_master_svc_queue_length, MakeMasterClusterService(this))); |
234 | 5.42k | RETURN_NOT_OK(RegisterService(FLAGS_master_svc_queue_length, MakeMasterDclService(this))); |
235 | 5.42k | RETURN_NOT_OK(RegisterService(FLAGS_master_svc_queue_length, MakeMasterDdlService(this))); |
236 | 5.42k | RETURN_NOT_OK(RegisterService(FLAGS_master_svc_queue_length, MakeMasterEncryptionService(this))); |
237 | 5.42k | RETURN_NOT_OK(RegisterService(FLAGS_master_svc_queue_length, MakeMasterHeartbeatService(this))); |
238 | 5.42k | RETURN_NOT_OK(RegisterService(FLAGS_master_svc_queue_length, MakeMasterReplicationService(this))); |
239 | | |
240 | 5.42k | std::unique_ptr<ServiceIf> master_tablet_service( |
241 | 5.42k | new MasterTabletServiceImpl(master_tablet_server_.get(), this)); |
242 | 5.42k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_master_tserver_svc_queue_length, |
243 | 5.42k | std::move(master_tablet_service))); |
244 | | |
245 | 5.42k | std::unique_ptr<ServiceIf> consensus_service( |
246 | 5.42k | new ConsensusServiceImpl(metric_entity(), catalog_manager_.get())); |
247 | 5.42k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_master_consensus_svc_queue_length, |
248 | 5.42k | std::move(consensus_service), |
249 | 5.42k | rpc::ServicePriority::kHigh)); |
250 | | |
251 | 5.42k | std::unique_ptr<ServiceIf> remote_bootstrap_service( |
252 | 5.42k | new tserver::RemoteBootstrapServiceImpl( |
253 | 5.42k | fs_manager_.get(), catalog_manager_.get(), metric_entity())); |
254 | 5.42k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_master_remote_bootstrap_svc_queue_length, |
255 | 5.42k | std::move(remote_bootstrap_service))); |
256 | | |
257 | 5.42k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService( |
258 | 5.42k | FLAGS_master_svc_queue_length, |
259 | 5.42k | std::make_unique<tserver::PgClientServiceImpl>( |
260 | 5.42k | client_future(), clock(), std::bind(&Master::TransactionPool, this), metric_entity(), |
261 | 5.42k | &messenger()->scheduler()))); |
262 | | |
263 | 5.42k | return Status::OK(); |
264 | 5.42k | } |
265 | | |
266 | 0 | void Master::DisplayGeneralInfoIcons(std::stringstream* output) { |
267 | 0 | server::RpcAndWebServerBase::DisplayGeneralInfoIcons(output); |
268 | | // Tasks. |
269 | 0 | DisplayIconTile(output, "fa-check", "Tasks", "/tasks"); |
270 | 0 | DisplayIconTile(output, "fa-clone", "Replica Info", "/tablet-replication"); |
271 | 0 | DisplayIconTile(output, "fa-check", "TServer Clocks", "/tablet-server-clocks"); |
272 | 0 | } |
273 | | |
274 | 5.42k | Status Master::StartAsync() { |
275 | 5.42k | CHECK_EQ(kInitialized, state_); |
276 | | |
277 | 5.42k | RETURN_NOT_OK(maintenance_manager_->Init()); |
278 | 5.42k | RETURN_NOT_OK(RegisterServices()); |
279 | 5.42k | RETURN_NOT_OK(RpcAndWebServerBase::Start()); |
280 | | |
281 | | // Now that we've bound, construct our ServerRegistrationPB. |
282 | 5.42k | RETURN_NOT_OK(InitMasterRegistration()); |
283 | | |
284 | | // Start initializing the catalog manager. |
285 | 5.42k | RETURN_NOT_OK(init_pool_->SubmitClosure(Bind(&Master::InitCatalogManagerTask, |
286 | 5.42k | Unretained(this)))); |
287 | | |
288 | 5.42k | state_ = kRunning; |
289 | 5.42k | return Status::OK(); |
290 | 5.42k | } |
291 | | |
292 | 5.42k | void Master::InitCatalogManagerTask() { |
293 | 5.42k | Status s = InitCatalogManager(); |
294 | 5.42k | if (!s.ok()) { |
295 | 10 | LOG(ERROR) << ToString() << ": Unable to init master catalog manager: " << s.ToString(); |
296 | 10 | } |
297 | 5.42k | init_status_.set_value(s); |
298 | 5.42k | } |
299 | | |
300 | 5.42k | Status Master::InitCatalogManager() { |
301 | 5.42k | if (catalog_manager_->IsInitialized()) { |
302 | 0 | return STATUS(IllegalState, "Catalog manager is already initialized"); |
303 | 0 | } |
304 | 5.42k | RETURN_NOT_OK_PREPEND(catalog_manager_->Init(), |
305 | 5.41k | "Unable to initialize catalog manager"); |
306 | 5.41k | return Status::OK(); |
307 | 5.42k | } |
308 | | |
309 | 5.35k | Status Master::WaitForCatalogManagerInit() { |
310 | 5.35k | CHECK_EQ(state_, kRunning); |
311 | | |
312 | 5.35k | return init_future_.get(); |
313 | 5.35k | } |
314 | | |
315 | 57 | Status Master::WaitUntilCatalogManagerIsLeaderAndReadyForTests(const MonoDelta& timeout) { |
316 | 57 | RETURN_NOT_OK(catalog_manager_->WaitForWorkerPoolTests(timeout)); |
317 | 57 | Status s; |
318 | 57 | MonoTime start = MonoTime::Now(); |
319 | 57 | int backoff_ms = 1; |
320 | 57 | const int kMaxBackoffMs = 256; |
321 | 1.19k | do { |
322 | 1.19k | SCOPED_LEADER_SHARED_LOCK(l, catalog_manager_.get()); |
323 | 1.19k | if (l.catalog_status().ok() && l.leader_status().ok()) { |
324 | 56 | return Status::OK(); |
325 | 56 | } |
326 | 1.13k | l.Unlock(); |
327 | | |
328 | 1.13k | SleepFor(MonoDelta::FromMilliseconds(backoff_ms)); |
329 | 1.13k | backoff_ms = min(backoff_ms << 1, kMaxBackoffMs); |
330 | 1.13k | } while (MonoTime::Now().GetDeltaSince(start).LessThan(timeout)); |
331 | 1 | return STATUS(TimedOut, "Maximum time exceeded waiting for master leadership", |
332 | 57 | s.ToString()); |
333 | 57 | } |
334 | | |
335 | 179 | void Master::Shutdown() { |
336 | 179 | if (state_ == kRunning) { |
337 | 87 | string name = ToString(); |
338 | 87 | LOG(INFO) << name << " shutting down..."; |
339 | 87 | maintenance_manager_->Shutdown(); |
340 | | // We shutdown RpcAndWebServerBase here in order to shutdown messenger and reactor threads |
341 | | // before shutting down catalog manager. This is needed to prevent async calls callbacks |
342 | | // (running on reactor threads) from trying to use catalog manager thread pool which would be |
343 | | // already shutdown. |
344 | 87 | auto started = catalog_manager_->StartShutdown(); |
345 | 0 | LOG_IF(DFATAL, !started) << name << " catalog manager shutdown already in progress"; |
346 | 87 | async_client_init_->Shutdown(); |
347 | 87 | cdc_state_client_init_->Shutdown(); |
348 | 87 | RpcAndWebServerBase::Shutdown(); |
349 | 87 | catalog_manager_->CompleteShutdown(); |
350 | 87 | LOG(INFO) << name << " shutdown complete."; |
351 | 92 | } else { |
352 | 92 | LOG(INFO) << ToString() << " did not start, shutting down all that started..."; |
353 | 92 | RpcAndWebServerBase::Shutdown(); |
354 | 92 | } |
355 | 179 | state_ = kStopped; |
356 | 179 | } |
357 | | |
358 | 3.90M | Status Master::GetMasterRegistration(ServerRegistrationPB* reg) const { |
359 | 3.90M | auto* registration = registration_.get(); |
360 | 3.90M | if (!registration) { |
361 | 0 | return STATUS(ServiceUnavailable, "Master startup not complete"); |
362 | 0 | } |
363 | 3.90M | reg->CopyFrom(*registration); |
364 | 3.90M | return Status::OK(); |
365 | 3.90M | } |
366 | | |
367 | 5.42k | Status Master::InitMasterRegistration() { |
368 | 5.42k | CHECK(!registration_.get()); |
369 | | |
370 | 5.42k | auto reg = std::make_unique<ServerRegistrationPB>(); |
371 | 5.42k | RETURN_NOT_OK(GetRegistration(reg.get())); |
372 | 5.42k | registration_.reset(reg.release()); |
373 | | |
374 | 5.42k | return Status::OK(); |
375 | 5.42k | } |
376 | | |
377 | 364 | Status Master::ResetMemoryState(const consensus::RaftConfigPB& config) { |
378 | 364 | LOG(INFO) << "Memory state set to config: " << config.ShortDebugString(); |
379 | | |
380 | 364 | auto master_addr = std::make_shared<server::MasterAddresses>(); |
381 | 1.32k | for (const RaftPeerPB& peer : config.peers()) { |
382 | 1.32k | master_addr->push_back({HostPortFromPB(DesiredHostPort(peer, opts_.MakeCloudInfoPB()))}); |
383 | 1.32k | } |
384 | | |
385 | 364 | SetMasterAddresses(std::move(master_addr)); |
386 | | |
387 | 364 | return Status::OK(); |
388 | 364 | } |
389 | | |
390 | 0 | void Master::DumpMasterOptionsInfo(std::ostream* out) { |
391 | 0 | *out << "Master options : "; |
392 | 0 | auto master_addresses_shared_ptr = opts_.GetMasterAddresses(); // ENG-285 |
393 | 0 | bool first = true; |
394 | 0 | for (const auto& list : *master_addresses_shared_ptr) { |
395 | 0 | if (first) { |
396 | 0 | first = false; |
397 | 0 | } else { |
398 | 0 | *out << ", "; |
399 | 0 | } |
400 | 0 | bool need_comma = false; |
401 | 0 | for (const HostPort& hp : list) { |
402 | 0 | if (need_comma) { |
403 | 0 | *out << "/ "; |
404 | 0 | } |
405 | 0 | need_comma = true; |
406 | 0 | *out << hp.ToString(); |
407 | 0 | } |
408 | 0 | } |
409 | 0 | *out << "\n"; |
410 | 0 | } |
411 | | |
412 | 5.31k | Status Master::ListRaftConfigMasters(std::vector<RaftPeerPB>* masters) const { |
413 | 5.31k | consensus::ConsensusStatePB cpb; |
414 | 5.31k | RETURN_NOT_OK(catalog_manager_->GetCurrentConfig(&cpb)); |
415 | 5.31k | if (cpb.has_config()) { |
416 | 14.4k | for (RaftPeerPB peer : cpb.config().peers()) { |
417 | 14.4k | masters->push_back(peer); |
418 | 14.4k | } |
419 | 5.31k | return Status::OK(); |
420 | 0 | } else { |
421 | 0 | return STATUS(NotFound, "No raft config found."); |
422 | 0 | } |
423 | 5.31k | } |
424 | | |
425 | 142 | Status Master::ListMasters(std::vector<ServerEntryPB>* masters) const { |
426 | 142 | if (IsShellMode()) { |
427 | 0 | ServerEntryPB local_entry; |
428 | 0 | local_entry.mutable_instance_id()->CopyFrom(catalog_manager_->NodeInstance()); |
429 | 0 | RETURN_NOT_OK(GetMasterRegistration(local_entry.mutable_registration())); |
430 | 0 | local_entry.set_role(IsShellMode() ? PeerRole::NON_PARTICIPANT : PeerRole::LEADER); |
431 | 0 | masters->push_back(local_entry); |
432 | 0 | return Status::OK(); |
433 | 142 | } |
434 | | |
435 | 142 | consensus::ConsensusStatePB cpb; |
436 | 142 | RETURN_NOT_OK(catalog_manager_->GetCurrentConfig(&cpb)); |
437 | 142 | if (!cpb.has_config()) { |
438 | 0 | return STATUS(NotFound, "No raft config found."); |
439 | 0 | } |
440 | | |
441 | 322 | for (const RaftPeerPB& peer : cpb.config().peers()) { |
442 | | // Get all network addresses associated with this peer master |
443 | 322 | std::vector<HostPort> addrs; |
444 | 322 | for (const auto& hp : peer.last_known_private_addr()) { |
445 | 322 | addrs.push_back(HostPortFromPB(hp)); |
446 | 322 | } |
447 | 6 | for (const auto& hp : peer.last_known_broadcast_addr()) { |
448 | 6 | addrs.push_back(HostPortFromPB(hp)); |
449 | 6 | } |
450 | 322 | if (!FLAGS_TEST_master_extra_list_host_port.empty()) { |
451 | 0 | addrs.push_back(VERIFY_RESULT(HostPort::FromString( |
452 | 0 | FLAGS_TEST_master_extra_list_host_port, 0))); |
453 | 0 | } |
454 | | |
455 | | // Make GetMasterRegistration calls for peer master info. |
456 | 322 | ServerEntryPB peer_entry; |
457 | 322 | Status s = GetMasterEntryForHosts( |
458 | 322 | proxy_cache_.get(), addrs, MonoDelta::FromMilliseconds(FLAGS_master_rpc_timeout_ms), |
459 | 322 | &peer_entry); |
460 | 322 | if (!s.ok()) { |
461 | | // In case of errors talking to the peer master, |
462 | | // fill in fields from our catalog best as we can. |
463 | 0 | s = s.CloneAndPrepend( |
464 | 0 | Format("Unable to get registration information for peer ($0) id ($1)", |
465 | 0 | addrs, peer.permanent_uuid())); |
466 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 5) << "ListMasters: " << s; |
467 | 0 | StatusToPB(s, peer_entry.mutable_error()); |
468 | 0 | peer_entry.mutable_instance_id()->set_permanent_uuid(peer.permanent_uuid()); |
469 | 0 | peer_entry.mutable_instance_id()->set_instance_seqno(0); |
470 | 0 | auto reg = peer_entry.mutable_registration(); |
471 | 0 | reg->mutable_private_rpc_addresses()->CopyFrom(peer.last_known_private_addr()); |
472 | 0 | reg->mutable_broadcast_addresses()->CopyFrom(peer.last_known_broadcast_addr()); |
473 | 0 | } |
474 | 322 | masters->push_back(peer_entry); |
475 | 322 | } |
476 | | |
477 | 142 | return Status::OK(); |
478 | 142 | } |
479 | | |
480 | 32 | Status Master::InformRemovedMaster(const HostPortPB& hp_pb) { |
481 | 32 | HostPort hp(hp_pb.host(), hp_pb.port()); |
482 | 32 | MasterClusterProxy proxy(proxy_cache_.get(), hp); |
483 | 32 | RemovedMasterUpdateRequestPB req; |
484 | 32 | RemovedMasterUpdateResponsePB resp; |
485 | 32 | rpc::RpcController controller; |
486 | 32 | controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_master_rpc_timeout_ms)); |
487 | 32 | RETURN_NOT_OK(proxy.RemovedMasterUpdate(req, &resp, &controller)); |
488 | 26 | if (resp.has_error()) { |
489 | 0 | return StatusFromPB(resp.error().status()); |
490 | 0 | } |
491 | | |
492 | 26 | return Status::OK(); |
493 | 26 | } |
494 | | |
495 | | scoped_refptr<Histogram> Master::GetMetric( |
496 | 518k | const std::string& metric_identifier, MasterMetricType type, const std::string& description) { |
497 | 518k | std::string temp_metric_identifier = Format("$0_$1", metric_identifier, |
498 | 272k | (type == TaskMetric ? "Task" : "Attempt")); |
499 | 518k | EscapeMetricNameForPrometheus(&temp_metric_identifier); |
500 | 518k | { |
501 | 518k | std::lock_guard<std::mutex> lock(master_metrics_mutex_); |
502 | 518k | std::map<std::string, scoped_refptr<Histogram>>* master_metrics_ptr = master_metrics(); |
503 | 518k | auto it = master_metrics_ptr->find(temp_metric_identifier); |
504 | 518k | if (it == master_metrics_ptr->end()) { |
505 | 11.6k | std::unique_ptr<HistogramPrototype> histogram = std::make_unique<OwningHistogramPrototype>( |
506 | 11.6k | "server", temp_metric_identifier, description, yb::MetricUnit::kMicroseconds, |
507 | 11.6k | description, yb::MetricLevel::kInfo, 0, 10000000, 2); |
508 | 11.6k | scoped_refptr<Histogram> temp = |
509 | 11.6k | metric_entity()->FindOrCreateHistogram(std::move(histogram)); |
510 | 11.6k | (*master_metrics_ptr)[temp_metric_identifier] = temp; |
511 | 11.6k | return temp; |
512 | 11.6k | } |
513 | 506k | return it->second; |
514 | 506k | } |
515 | 506k | } |
516 | | |
517 | 27 | Status Master::GoIntoShellMode() { |
518 | 27 | maintenance_manager_->Shutdown(); |
519 | 27 | RETURN_NOT_OK(catalog_manager_impl()->GoIntoShellMode()); |
520 | 27 | return Status::OK(); |
521 | 27 | } |
522 | | |
523 | 5.42k | const std::shared_future<client::YBClient*>& Master::client_future() const { |
524 | 5.42k | return async_client_init_->get_client_future(); |
525 | 5.42k | } |
526 | | |
527 | 10.8k | scoped_refptr<MetricEntity> Master::metric_entity_cluster() { |
528 | 10.8k | return metric_entity_cluster_; |
529 | 10.8k | } |
530 | | |
531 | 0 | client::LocalTabletFilter Master::CreateLocalTabletFilter() { |
532 | 0 | return client::LocalTabletFilter(); |
533 | 0 | } |
534 | | |
535 | 2.00M | CatalogManagerIf* Master::catalog_manager() const { |
536 | 2.00M | return catalog_manager_.get(); |
537 | 2.00M | } |
538 | | |
539 | 2 | SysCatalogTable& Master::sys_catalog() const { |
540 | 2 | return *catalog_manager_->sys_catalog(); |
541 | 2 | } |
542 | | |
543 | 120k | PermissionsManager& Master::permissions_manager() { |
544 | 120k | return *catalog_manager_->permissions_manager(); |
545 | 120k | } |
546 | | |
547 | 9.22k | EncryptionManager& Master::encryption_manager() { |
548 | 9.22k | return catalog_manager_->encryption_manager(); |
549 | 9.22k | } |
550 | | |
551 | | } // namespace master |
552 | | } // namespace yb |