/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_server.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tserver/tablet_server.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <limits> |
37 | | #include <list> |
38 | | #include <thread> |
39 | | #include <utility> |
40 | | #include <vector> |
41 | | |
42 | | #include <glog/logging.h> |
43 | | |
44 | | #include "yb/client/transaction_manager.h" |
45 | | #include "yb/client/universe_key_client.h" |
46 | | |
47 | | #include "yb/common/common_flags.h" |
48 | | #include "yb/common/wire_protocol.h" |
49 | | |
50 | | #include "yb/encryption/universe_key_manager.h" |
51 | | |
52 | | #include "yb/fs/fs_manager.h" |
53 | | |
54 | | #include "yb/gutil/strings/substitute.h" |
55 | | |
56 | | #include "yb/master/master_heartbeat.pb.h" |
57 | | |
58 | | #include "yb/rpc/messenger.h" |
59 | | #include "yb/rpc/service_if.h" |
60 | | #include "yb/rpc/yb_rpc.h" |
61 | | |
62 | | #include "yb/server/rpc_server.h" |
63 | | #include "yb/server/webserver.h" |
64 | | |
65 | | #include "yb/tablet/maintenance_manager.h" |
66 | | #include "yb/tablet/tablet_peer.h" |
67 | | |
68 | | #include "yb/tserver/heartbeater.h" |
69 | | #include "yb/tserver/heartbeater_factory.h" |
70 | | #include "yb/tserver/metrics_snapshotter.h" |
71 | | #include "yb/tserver/pg_client_service.h" |
72 | | #include "yb/tserver/remote_bootstrap_service.h" |
73 | | #include "yb/tserver/tablet_service.h" |
74 | | #include "yb/tserver/ts_tablet_manager.h" |
75 | | #include "yb/tserver/tserver-path-handlers.h" |
76 | | #include "yb/tserver/tserver_service.proxy.h" |
77 | | |
78 | | #include "yb/util/flag_tags.h" |
79 | | #include "yb/util/logging.h" |
80 | | #include "yb/util/net/net_util.h" |
81 | | #include "yb/util/net/sockaddr.h" |
82 | | #include "yb/util/random_util.h" |
83 | | #include "yb/util/size_literals.h" |
84 | | #include "yb/util/status.h" |
85 | | #include "yb/util/status_log.h" |
86 | | |
87 | | using std::make_shared; |
88 | | using std::shared_ptr; |
89 | | using std::vector; |
90 | | using yb::rpc::ServiceIf; |
91 | | using yb::tablet::TabletPeer; |
92 | | |
93 | | using namespace yb::size_literals; |
94 | | using namespace std::placeholders; |
95 | | |
96 | | DEFINE_int32(tablet_server_svc_num_threads, -1, |
97 | | "Number of RPC worker threads for the TS service. If -1, it is auto configured."); |
98 | | TAG_FLAG(tablet_server_svc_num_threads, advanced); |
99 | | |
100 | | DEFINE_int32(ts_admin_svc_num_threads, 10, |
101 | | "Number of RPC worker threads for the TS admin service"); |
102 | | TAG_FLAG(ts_admin_svc_num_threads, advanced); |
103 | | |
104 | | DEFINE_int32(ts_consensus_svc_num_threads, -1, |
105 | | "Number of RPC worker threads for the TS consensus service. If -1, it is auto " |
106 | | "configured."); |
107 | | TAG_FLAG(ts_consensus_svc_num_threads, advanced); |
108 | | |
109 | | DEFINE_int32(ts_remote_bootstrap_svc_num_threads, 10, |
110 | | "Number of RPC worker threads for the TS remote bootstrap service"); |
111 | | TAG_FLAG(ts_remote_bootstrap_svc_num_threads, advanced); |
112 | | |
113 | | DEFINE_int32(tablet_server_svc_queue_length, yb::tserver::TabletServer::kDefaultSvcQueueLength, |
114 | | "RPC queue length for the TS service."); |
115 | | TAG_FLAG(tablet_server_svc_queue_length, advanced); |
116 | | |
117 | | DEFINE_int32(ts_admin_svc_queue_length, 50, |
118 | | "RPC queue length for the TS admin service"); |
119 | | TAG_FLAG(ts_admin_svc_queue_length, advanced); |
120 | | |
121 | | DEFINE_int32(ts_consensus_svc_queue_length, yb::tserver::TabletServer::kDefaultSvcQueueLength, |
122 | | "RPC queue length for the TS consensus service."); |
123 | | TAG_FLAG(ts_consensus_svc_queue_length, advanced); |
124 | | |
125 | | DEFINE_int32(ts_remote_bootstrap_svc_queue_length, 50, |
126 | | "RPC queue length for the TS remote bootstrap service"); |
127 | | TAG_FLAG(ts_remote_bootstrap_svc_queue_length, advanced); |
128 | | |
129 | | DEFINE_bool(enable_direct_local_tablet_server_call, |
130 | | true, |
131 | | "Enable direct call to local tablet server"); |
132 | | TAG_FLAG(enable_direct_local_tablet_server_call, advanced); |
133 | | |
134 | | DEFINE_string(redis_proxy_bind_address, "", "Address to bind the redis proxy to"); |
135 | | DEFINE_int32(redis_proxy_webserver_port, 0, "Webserver port for redis proxy"); |
136 | | |
137 | | DEFINE_string(cql_proxy_bind_address, "", "Address to bind the CQL proxy to"); |
138 | | DEFINE_int32(cql_proxy_webserver_port, 0, "Webserver port for CQL proxy"); |
139 | | |
140 | | DEFINE_string(pgsql_proxy_bind_address, "", "Address to bind the PostgreSQL proxy to"); |
141 | | DECLARE_int32(pgsql_proxy_webserver_port); |
142 | | |
143 | | DEFINE_int64(inbound_rpc_memory_limit, 0, "Inbound RPC memory limit"); |
144 | | |
145 | | DEFINE_bool(start_pgsql_proxy, false, |
146 | | "Whether to run a PostgreSQL server as a child process of the tablet server"); |
147 | | |
148 | | DEFINE_bool(tserver_enable_metrics_snapshotter, false, "Should metrics snapshotter be enabled"); |
149 | | DECLARE_int32(num_concurrent_backfills_allowed); |
150 | | DECLARE_int32(svc_queue_length_default); |
151 | | |
152 | | namespace yb { |
153 | | namespace tserver { |
154 | | |
155 | | TabletServer::TabletServer(const TabletServerOptions& opts) |
156 | | : DbServerBase( |
157 | | "TabletServer", opts, "yb.tabletserver", server::CreateMemTrackerForServer()), |
158 | | fail_heartbeats_for_tests_(false), |
159 | | opts_(opts), |
160 | | tablet_manager_(new TSTabletManager(fs_manager_.get(), this, metric_registry())), |
161 | | path_handlers_(new TabletServerPathHandlers(this)), |
162 | | maintenance_manager_(new MaintenanceManager(MaintenanceManager::DEFAULT_OPTIONS)), |
163 | | master_config_index_(0), |
164 | 6.10k | tablet_server_service_(nullptr) { |
165 | 6.10k | SetConnectionContextFactory(rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>( |
166 | 6.10k | FLAGS_inbound_rpc_memory_limit, mem_tracker())); |
167 | | |
168 | 6.10k | LOG(INFO) << "yb::tserver::TabletServer created at " << this; |
169 | 6.10k | LOG(INFO) << "yb::tserver::TSTabletManager created at " << tablet_manager_.get(); |
170 | 6.10k | } |
171 | | |
172 | 113 | TabletServer::~TabletServer() { |
173 | 113 | Shutdown(); |
174 | 113 | } |
175 | | |
176 | 0 | std::string TabletServer::ToString() const { |
177 | 0 | return strings::Substitute("TabletServer : rpc=$0, uuid=$1", |
178 | 0 | yb::ToString(first_rpc_address()), |
179 | 0 | fs_manager_->uuid()); |
180 | 0 | } |
181 | | |
182 | 6.10k | Status TabletServer::ValidateMasterAddressResolution() const { |
183 | 6.10k | return ResultToStatus(server::ResolveMasterAddresses(*opts_.GetMasterAddresses())); |
184 | 6.10k | } |
185 | | |
186 | | Status TabletServer::UpdateMasterAddresses(const consensus::RaftConfigPB& new_config, |
187 | 135 | bool is_master_leader) { |
188 | 135 | shared_ptr<server::MasterAddresses> new_master_addresses; |
189 | 135 | if (is_master_leader) { |
190 | 124 | SetCurrentMasterIndex(new_config.opid_index()); |
191 | 124 | new_master_addresses = make_shared<server::MasterAddresses>(); |
192 | | |
193 | 124 | SetCurrentMasterIndex(new_config.opid_index()); |
194 | | |
195 | 428 | for (const auto& peer : new_config.peers()) { |
196 | 428 | std::vector<HostPort> list; |
197 | 428 | for (const auto& hp : peer.last_known_private_addr()) { |
198 | 428 | list.push_back(HostPortFromPB(hp)); |
199 | 428 | } |
200 | 0 | for (const auto& hp : peer.last_known_broadcast_addr()) { |
201 | 0 | list.push_back(HostPortFromPB(hp)); |
202 | 0 | } |
203 | 428 | new_master_addresses->push_back(std::move(list)); |
204 | 428 | } |
205 | 11 | } else { |
206 | 11 | new_master_addresses = make_shared<server::MasterAddresses>(*opts_.GetMasterAddresses()); |
207 | | |
208 | 32 | for (auto& list : *new_master_addresses) { |
209 | 32 | std::sort(list.begin(), list.end()); |
210 | 32 | } |
211 | | |
212 | 34 | for (const auto& peer : new_config.peers()) { |
213 | 34 | std::vector<HostPort> list; |
214 | 34 | for (const auto& hp : peer.last_known_private_addr()) { |
215 | 34 | list.push_back(HostPortFromPB(hp)); |
216 | 34 | } |
217 | 0 | for (const auto& hp : peer.last_known_broadcast_addr()) { |
218 | 0 | list.push_back(HostPortFromPB(hp)); |
219 | 0 | } |
220 | 34 | std::sort(list.begin(), list.end()); |
221 | 34 | bool found = false; |
222 | 91 | for (const auto& existing : *new_master_addresses) { |
223 | 91 | if (existing == list) { |
224 | 20 | found = true; |
225 | 20 | break; |
226 | 20 | } |
227 | 91 | } |
228 | 34 | if (!found) { |
229 | 14 | new_master_addresses->push_back(std::move(list)); |
230 | 14 | } |
231 | 34 | } |
232 | 11 | } |
233 | | |
234 | 135 | LOG(INFO) << "Got new list of " << new_config.peers_size() << " masters at index " |
235 | 135 | << new_config.opid_index() << " old masters = " |
236 | 135 | << yb::ToString(opts_.GetMasterAddresses()) |
237 | 135 | << " new masters = " << yb::ToString(new_master_addresses) << " from " |
238 | 124 | << (is_master_leader ? "leader." : "follower."); |
239 | | |
240 | 135 | opts_.SetMasterAddresses(new_master_addresses); |
241 | | |
242 | 135 | heartbeater_->set_master_addresses(new_master_addresses); |
243 | | |
244 | 135 | return Status::OK(); |
245 | 135 | } |
246 | | |
247 | 0 | void TabletServer::SetUniverseKeys(const encryption::UniverseKeysPB& universe_keys) { |
248 | 0 | opts_.universe_key_manager->SetUniverseKeys(universe_keys); |
249 | 0 | } |
250 | | |
251 | 0 | void TabletServer::GetUniverseKeyRegistrySync() { |
252 | 0 | universe_key_client_->GetUniverseKeyRegistrySync(); |
253 | 0 | } |
254 | | |
255 | 6.10k | Status TabletServer::Init() { |
256 | 6.10k | CHECK(!initted_.load(std::memory_order_acquire)); |
257 | | |
258 | | // Validate that the passed master address actually resolves. |
259 | | // We don't validate that we can connect at this point -- it should |
260 | | // be allowed to start the TS and the master in whichever order -- |
261 | | // our heartbeat thread will loop until successfully connecting. |
262 | 6.10k | RETURN_NOT_OK(ValidateMasterAddressResolution()); |
263 | | |
264 | 6.10k | RETURN_NOT_OK(RpcAndWebServerBase::Init()); |
265 | 6.10k | RETURN_NOT_OK(path_handlers_->Register(web_server_.get())); |
266 | | |
267 | 6.10k | log_prefix_ = Format("P $0: ", permanent_uuid()); |
268 | | |
269 | 6.10k | heartbeater_ = CreateHeartbeater(opts_, this); |
270 | | |
271 | 6.10k | if (FLAGS_tserver_enable_metrics_snapshotter) { |
272 | 0 | metrics_snapshotter_.reset(new MetricsSnapshotter(opts_, this)); |
273 | 0 | } |
274 | | |
275 | 6.10k | std::vector<HostPort> hps; |
276 | 15.0k | for (const auto& master_addr_vector : *opts_.GetMasterAddresses()) { |
277 | 15.4k | for (const auto& master_addr : master_addr_vector) { |
278 | 15.4k | hps.push_back(master_addr); |
279 | 15.4k | } |
280 | 15.0k | } |
281 | | |
282 | 6.10k | universe_key_client_ = std::make_unique<client::UniverseKeyClient>( |
283 | 0 | hps, proxy_cache_.get(), [&] (const encryption::UniverseKeysPB& universe_keys) { |
284 | 0 | opts_.universe_key_manager->SetUniverseKeys(universe_keys); |
285 | 0 | }); |
286 | 0 | opts_.universe_key_manager->SetGetUniverseKeysCallback([&]() { |
287 | 0 | universe_key_client_->GetUniverseKeyRegistrySync(); |
288 | 0 | }); |
289 | 6.10k | RETURN_NOT_OK_PREPEND(tablet_manager_->Init(), |
290 | 6.10k | "Could not init Tablet Manager"); |
291 | | |
292 | 6.10k | initted_.store(true, std::memory_order_release); |
293 | | |
294 | 6.10k | auto bound_addresses = rpc_server()->GetBoundAddresses(); |
295 | 6.10k | if (!bound_addresses.empty()) { |
296 | 5.81k | ServerRegistrationPB reg; |
297 | 5.81k | RETURN_NOT_OK(GetRegistration(®, server::RpcOnly::kTrue)); |
298 | 5.81k | shared_object().SetHostEndpoint(bound_addresses.front(), PublicHostPort(reg).host()); |
299 | 5.81k | } |
300 | | |
301 | | // 5433 is kDefaultPort in src/yb/yql/pgwrapper/pg_wrapper.h. |
302 | 6.10k | RETURN_NOT_OK(pgsql_proxy_bind_address_.ParseString(FLAGS_pgsql_proxy_bind_address, 5433)); |
303 | 6.10k | shared_object().SetPostgresAuthKey(RandomUniformInt<uint64_t>()); |
304 | | |
305 | 6.10k | return Status::OK(); |
306 | 6.10k | } |
307 | | |
308 | 17.5k | Status TabletServer::GetRegistration(ServerRegistrationPB* reg, server::RpcOnly rpc_only) const { |
309 | 17.5k | RETURN_NOT_OK(RpcAndWebServerBase::GetRegistration(reg, rpc_only)); |
310 | 17.5k | reg->set_pg_port(pgsql_proxy_bind_address().port()); |
311 | 17.5k | return Status::OK(); |
312 | 17.5k | } |
313 | | |
314 | 14 | Status TabletServer::WaitInited() { |
315 | 14 | return tablet_manager_->WaitForAllBootstrapsToFinish(); |
316 | 14 | } |
317 | | |
318 | 5.81k | void TabletServer::AutoInitServiceFlags() { |
319 | 5.81k | const int32 num_cores = base::NumCPUs(); |
320 | | |
321 | 5.81k | if (FLAGS_tablet_server_svc_num_threads == -1) { |
322 | | // Auto select number of threads for the TS service based on number of cores. |
323 | | // But bound it between 64 & 512. |
324 | 4.69k | const int32 num_threads = std::min(512, num_cores * 32); |
325 | 4.69k | FLAGS_tablet_server_svc_num_threads = std::max(64, num_threads); |
326 | 4.69k | LOG(INFO) << "Auto setting FLAGS_tablet_server_svc_num_threads to " |
327 | 4.69k | << FLAGS_tablet_server_svc_num_threads; |
328 | 4.69k | } |
329 | | |
330 | 5.81k | if (FLAGS_num_concurrent_backfills_allowed == -1) { |
331 | 5.73k | const int32 num_threads = std::min(8, num_cores / 2); |
332 | 5.73k | FLAGS_num_concurrent_backfills_allowed = std::max(1, num_threads); |
333 | 5.73k | LOG(INFO) << "Auto setting FLAGS_num_concurrent_backfills_allowed to " |
334 | 5.73k | << FLAGS_num_concurrent_backfills_allowed; |
335 | 5.73k | } |
336 | | |
337 | 5.81k | if (FLAGS_ts_consensus_svc_num_threads == -1) { |
338 | | // Auto select number of threads for the TS service based on number of cores. |
339 | | // But bound it between 64 & 512. |
340 | 4.69k | const int32 num_threads = std::min(512, num_cores * 32); |
341 | 4.69k | FLAGS_ts_consensus_svc_num_threads = std::max(64, num_threads); |
342 | 4.69k | LOG(INFO) << "Auto setting FLAGS_ts_consensus_svc_num_threads to " |
343 | 4.69k | << FLAGS_ts_consensus_svc_num_threads; |
344 | 4.69k | } |
345 | 5.81k | } |
346 | | |
347 | 5.81k | Status TabletServer::RegisterServices() { |
348 | 5.81k | tablet_server_service_ = new TabletServiceImpl(this); |
349 | 5.81k | LOG(INFO) << "yb::tserver::TabletServiceImpl created at " << tablet_server_service_; |
350 | 5.81k | std::unique_ptr<ServiceIf> ts_service(tablet_server_service_); |
351 | 5.81k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_tablet_server_svc_queue_length, |
352 | 5.81k | std::move(ts_service))); |
353 | | |
354 | 5.81k | std::unique_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this)); |
355 | 5.81k | LOG(INFO) << "yb::tserver::TabletServiceAdminImpl created at " << admin_service.get(); |
356 | 5.81k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_admin_svc_queue_length, |
357 | 5.81k | std::move(admin_service))); |
358 | | |
359 | 5.81k | std::unique_ptr<ServiceIf> consensus_service(new ConsensusServiceImpl(metric_entity(), |
360 | 5.81k | tablet_manager_.get())); |
361 | 5.81k | LOG(INFO) << "yb::tserver::ConsensusServiceImpl created at " << consensus_service.get(); |
362 | 5.81k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_consensus_svc_queue_length, |
363 | 5.81k | std::move(consensus_service), |
364 | 5.81k | rpc::ServicePriority::kHigh)); |
365 | | |
366 | 5.81k | std::unique_ptr<ServiceIf> remote_bootstrap_service = |
367 | 5.81k | std::make_unique<RemoteBootstrapServiceImpl>( |
368 | 5.81k | fs_manager_.get(), tablet_manager_.get(), metric_entity()); |
369 | 5.81k | LOG(INFO) << "yb::tserver::RemoteBootstrapServiceImpl created at " << |
370 | 5.81k | remote_bootstrap_service.get(); |
371 | 5.81k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_remote_bootstrap_svc_queue_length, |
372 | 5.81k | std::move(remote_bootstrap_service))); |
373 | | |
374 | 5.81k | std::unique_ptr<ServiceIf> forward_service = |
375 | 5.81k | std::make_unique<TabletServerForwardServiceImpl>(tablet_server_service_, this); |
376 | 5.81k | LOG(INFO) << "yb::tserver::ForwardServiceImpl created at " << forward_service.get(); |
377 | 5.81k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_tablet_server_svc_queue_length, |
378 | 5.81k | std::move(forward_service))); |
379 | | |
380 | 5.81k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService( |
381 | 5.81k | FLAGS_svc_queue_length_default, |
382 | 5.81k | std::make_unique<PgClientServiceImpl>( |
383 | 5.81k | tablet_manager_->client_future(), |
384 | 5.81k | clock(), |
385 | 5.81k | std::bind(&TabletServer::TransactionPool, this), |
386 | 5.81k | metric_entity(), |
387 | 5.81k | &messenger()->scheduler()))); |
388 | | |
389 | 5.81k | return Status::OK(); |
390 | 5.81k | } |
391 | | |
392 | 5.81k | Status TabletServer::Start() { |
393 | 5.81k | CHECK(initted_.load(std::memory_order_acquire)); |
394 | | |
395 | 5.81k | AutoInitServiceFlags(); |
396 | | |
397 | 5.81k | RETURN_NOT_OK(RegisterServices()); |
398 | 5.81k | RETURN_NOT_OK(RpcAndWebServerBase::Start()); |
399 | | |
400 | | // If enabled, creates a proxy to call this tablet server locally. |
401 | 5.81k | if (FLAGS_enable_direct_local_tablet_server_call) { |
402 | 5.80k | proxy_ = std::make_shared<TabletServerServiceProxy>(proxy_cache_.get(), HostPort()); |
403 | 5.80k | } |
404 | | |
405 | 5.81k | RETURN_NOT_OK(tablet_manager_->Start()); |
406 | | |
407 | 5.81k | RETURN_NOT_OK(heartbeater_->Start()); |
408 | | |
409 | 5.81k | if (FLAGS_tserver_enable_metrics_snapshotter) { |
410 | 0 | RETURN_NOT_OK(metrics_snapshotter_->Start()); |
411 | 0 | } |
412 | | |
413 | 5.81k | RETURN_NOT_OK(maintenance_manager_->Init()); |
414 | | |
415 | 5.81k | google::FlushLogFiles(google::INFO); // Flush the startup messages. |
416 | | |
417 | 5.81k | return Status::OK(); |
418 | 5.81k | } |
419 | | |
420 | 300 | void TabletServer::Shutdown() { |
421 | 300 | LOG(INFO) << "TabletServer shutting down..."; |
422 | | |
423 | 300 | bool expected = true; |
424 | 300 | if (initted_.compare_exchange_strong(expected, false, std::memory_order_acq_rel)) { |
425 | 76 | maintenance_manager_->Shutdown(); |
426 | 76 | WARN_NOT_OK(heartbeater_->Stop(), "Failed to stop TS Heartbeat thread"); |
427 | | |
428 | 76 | if (FLAGS_tserver_enable_metrics_snapshotter) { |
429 | 0 | WARN_NOT_OK(metrics_snapshotter_->Stop(), "Failed to stop TS Metrics Snapshotter thread"); |
430 | 0 | } |
431 | | |
432 | 76 | { |
433 | 76 | std::lock_guard<simple_spinlock> l(lock_); |
434 | 76 | tablet_server_service_ = nullptr; |
435 | 76 | } |
436 | 76 | tablet_manager_->StartShutdown(); |
437 | 76 | RpcAndWebServerBase::Shutdown(); |
438 | 76 | tablet_manager_->CompleteShutdown(); |
439 | 76 | } |
440 | | |
441 | 300 | LOG(INFO) << "TabletServer shut down complete. Bye!"; |
442 | 300 | } |
443 | | |
444 | 396k | Status TabletServer::PopulateLiveTServers(const master::TSHeartbeatResponsePB& heartbeat_resp) { |
445 | 396k | std::lock_guard<simple_spinlock> l(lock_); |
446 | | // We reset the list each time, since we want to keep the tservers that are live from the |
447 | | // master's perspective. |
448 | | // TODO: In the future, we should enhance the logic here to keep track information retrieved |
449 | | // from the master and compare it with information stored here. Based on this information, we |
450 | | // can only send diff updates CQL clients about whether a node came up or went down. |
451 | 396k | live_tservers_.assign(heartbeat_resp.tservers().begin(), heartbeat_resp.tservers().end()); |
452 | 396k | return Status::OK(); |
453 | 396k | } |
454 | | |
455 | | Status TabletServer::GetLiveTServers( |
456 | 10.8k | std::vector<master::TSInformationPB> *live_tservers) const { |
457 | 10.8k | std::lock_guard<simple_spinlock> l(lock_); |
458 | 10.8k | *live_tservers = live_tservers_; |
459 | 10.8k | return Status::OK(); |
460 | 10.8k | } |
461 | | |
462 | | Status TabletServer::GetTabletStatus(const GetTabletStatusRequestPB* req, |
463 | 955 | GetTabletStatusResponsePB* resp) const { |
464 | 0 | VLOG(3) << "GetTabletStatus called for tablet " << req->tablet_id(); |
465 | 955 | tablet::TabletPeerPtr peer; |
466 | 955 | if (!tablet_manager_->LookupTablet(req->tablet_id(), &peer)) { |
467 | 523 | return STATUS(NotFound, "Tablet not found", req->tablet_id()); |
468 | 523 | } |
469 | 432 | peer->GetTabletStatusPB(resp->mutable_tablet_status()); |
470 | 432 | return Status::OK(); |
471 | 432 | } |
472 | | |
473 | 391 | bool TabletServer::LeaderAndReady(const TabletId& tablet_id, bool allow_stale) const { |
474 | 391 | tablet::TabletPeerPtr peer; |
475 | 391 | if (!tablet_manager_->LookupTablet(tablet_id, &peer)) { |
476 | 0 | return false; |
477 | 0 | } |
478 | 391 | return peer->LeaderStatus(allow_stale) == consensus::LeaderStatus::LEADER_AND_READY; |
479 | 391 | } |
480 | | |
481 | | Status TabletServer::SetUniverseKeyRegistry( |
482 | 0 | const encryption::UniverseKeyRegistryPB& universe_key_registry) { |
483 | 0 | return Status::OK(); |
484 | 0 | } |
485 | | |
486 | 5.59k | void TabletServer::set_cluster_uuid(const std::string& cluster_uuid) { |
487 | 5.59k | std::lock_guard<simple_spinlock> l(lock_); |
488 | 5.59k | cluster_uuid_ = cluster_uuid; |
489 | 5.59k | } |
490 | | |
491 | 0 | std::string TabletServer::cluster_uuid() const { |
492 | 0 | std::lock_guard<simple_spinlock> l(lock_); |
493 | 0 | return cluster_uuid_; |
494 | 0 | } |
495 | | |
496 | 0 | TabletServiceImpl* TabletServer::tablet_server_service() { |
497 | 0 | std::lock_guard<simple_spinlock> l(lock_); |
498 | 0 | return tablet_server_service_; |
499 | 0 | } |
500 | | |
501 | | Status GetDynamicUrlTile( |
502 | | const string& path, const string& hostport, const int port, |
503 | 0 | const string& http_addr_host, string* url) { |
504 | | // We get an incoming hostport string like '127.0.0.1:5433' or '[::1]:5433' or [::1] |
505 | | // and a port 13000 which has to be converted to '127.0.0.1:13000'. If the hostport is |
506 | | // a wildcard - 0.0.0.0 - the URLs are formed based on the http address for web instead |
507 | 0 | HostPort hp; |
508 | 0 | RETURN_NOT_OK(hp.ParseString(hostport, port)); |
509 | 0 | if (IsWildcardAddress(hp.host())) { |
510 | 0 | hp.set_host(http_addr_host); |
511 | 0 | } |
512 | 0 | hp.set_port(port); |
513 | |
|
514 | 0 | *url = strings::Substitute("http://$0$1", hp.ToString(), path); |
515 | 0 | return Status::OK(); |
516 | 0 | } |
517 | | |
518 | 0 | Status TabletServer::DisplayRpcIcons(std::stringstream* output) { |
519 | 0 | ServerRegistrationPB reg; |
520 | 0 | RETURN_NOT_OK(GetRegistration(®)); |
521 | 0 | string http_addr_host = reg.http_addresses(0).host(); |
522 | | |
523 | | // RPCs in Progress. |
524 | 0 | DisplayIconTile(output, "fa-tasks", "TServer Live Ops", "/rpcz"); |
525 | | // YCQL RPCs in Progress. |
526 | 0 | string cass_url; |
527 | 0 | RETURN_NOT_OK(GetDynamicUrlTile( |
528 | 0 | "/rpcz", FLAGS_cql_proxy_bind_address, FLAGS_cql_proxy_webserver_port, |
529 | 0 | http_addr_host, &cass_url)); |
530 | 0 | DisplayIconTile(output, "fa-tasks", "YCQL Live Ops", cass_url); |
531 | | |
532 | | // YEDIS RPCs in Progress. |
533 | 0 | string redis_url; |
534 | 0 | RETURN_NOT_OK(GetDynamicUrlTile( |
535 | 0 | "/rpcz", FLAGS_redis_proxy_bind_address, FLAGS_redis_proxy_webserver_port, |
536 | 0 | http_addr_host, &redis_url)); |
537 | 0 | DisplayIconTile(output, "fa-tasks", "YEDIS Live Ops", redis_url); |
538 | | |
539 | | // YSQL RPCs in Progress. |
540 | 0 | string sql_url; |
541 | 0 | RETURN_NOT_OK(GetDynamicUrlTile( |
542 | 0 | "/rpcz", FLAGS_pgsql_proxy_bind_address, FLAGS_pgsql_proxy_webserver_port, |
543 | 0 | http_addr_host, &sql_url)); |
544 | 0 | DisplayIconTile(output, "fa-tasks", "YSQL Live Ops", sql_url); |
545 | | |
546 | | // YSQL All Ops |
547 | 0 | string sql_all_url; |
548 | 0 | RETURN_NOT_OK(GetDynamicUrlTile( |
549 | 0 | "/statements", FLAGS_pgsql_proxy_bind_address, FLAGS_pgsql_proxy_webserver_port, |
550 | 0 | http_addr_host, &sql_all_url)); |
551 | 0 | DisplayIconTile(output, "fa-tasks", "YSQL All Ops", sql_all_url); |
552 | 0 | return Status::OK(); |
553 | 0 | } |
554 | | |
555 | 44.2k | Env* TabletServer::GetEnv() { |
556 | 44.2k | return opts_.env; |
557 | 44.2k | } |
558 | | |
559 | 5.81k | rocksdb::Env* TabletServer::GetRocksDBEnv() { |
560 | 5.81k | return opts_.rocksdb_env; |
561 | 5.81k | } |
562 | | |
563 | 235 | uint64_t TabletServer::GetSharedMemoryPostgresAuthKey() { |
564 | 235 | return shared_object().postgres_auth_key(); |
565 | 235 | } |
566 | | |
567 | 396k | void TabletServer::SetYSQLCatalogVersion(uint64_t new_version, uint64_t new_breaking_version) { |
568 | 396k | std::lock_guard<simple_spinlock> l(lock_); |
569 | | |
570 | 396k | if (new_version > ysql_catalog_version_) { |
571 | 5.73k | ysql_catalog_version_ = new_version; |
572 | 5.73k | shared_object().SetYSQLCatalogVersion(new_version); |
573 | 5.73k | ysql_last_breaking_catalog_version_ = new_breaking_version; |
574 | 5.73k | if (FLAGS_log_ysql_catalog_versions) { |
575 | 0 | LOG_WITH_FUNC(INFO) << "set catalog version: " << new_version << ", breaking version: " |
576 | 0 | << new_breaking_version; |
577 | 0 | } |
578 | 391k | } else if (new_version < ysql_catalog_version_) { |
579 | 0 | LOG(DFATAL) << "Ignoring ysql catalog version update: new version too old. " |
580 | 0 | << "New: " << new_version << ", Old: " << ysql_catalog_version_; |
581 | 0 | } |
582 | 396k | } |
583 | | |
584 | 396k | void TabletServer::UpdateTransactionTablesVersion(uint64_t new_version) { |
585 | 396k | const auto transaction_manager = transaction_manager_.load(std::memory_order_acquire); |
586 | 396k | if (transaction_manager) { |
587 | 38.8k | transaction_manager->UpdateTransactionTablesVersion(new_version); |
588 | 38.8k | } |
589 | 396k | } |
590 | | |
591 | 7.31M | TabletPeerLookupIf* TabletServer::tablet_peer_lookup() { |
592 | 7.31M | return tablet_manager_.get(); |
593 | 7.31M | } |
594 | | |
595 | 45.9k | const std::shared_future<client::YBClient*>& TabletServer::client_future() const { |
596 | 45.9k | return tablet_manager_->client_future(); |
597 | 45.9k | } |
598 | | |
599 | 103k | client::TransactionPool* TabletServer::TransactionPool() { |
600 | 103k | return DbServerBase::TransactionPool(); |
601 | 103k | } |
602 | | |
603 | 658 | client::LocalTabletFilter TabletServer::CreateLocalTabletFilter() { |
604 | 658 | return std::bind(&TSTabletManager::PreserveLocalLeadersOnly, tablet_manager(), _1); |
605 | 658 | } |
606 | | |
607 | 196k | const std::shared_ptr<MemTracker>& TabletServer::mem_tracker() const { |
608 | 196k | return RpcServerBase::mem_tracker(); |
609 | 196k | } |
610 | | |
611 | 181 | void TabletServer::SetPublisher(rpc::Publisher service) { |
612 | 181 | publish_service_ptr_.reset(new rpc::Publisher(std::move(service))); |
613 | 181 | } |
614 | | |
615 | | } // namespace tserver |
616 | | } // namespace yb |