/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_server.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tserver/tablet_server.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <limits> |
37 | | #include <list> |
38 | | #include <thread> |
39 | | #include <utility> |
40 | | #include <vector> |
41 | | |
42 | | #include <glog/logging.h> |
43 | | |
44 | | #include "yb/client/transaction_manager.h" |
45 | | #include "yb/client/universe_key_client.h" |
46 | | |
47 | | #include "yb/common/common_flags.h" |
48 | | #include "yb/common/wire_protocol.h" |
49 | | |
50 | | #include "yb/encryption/universe_key_manager.h" |
51 | | |
52 | | #include "yb/fs/fs_manager.h" |
53 | | |
54 | | #include "yb/gutil/strings/substitute.h" |
55 | | |
56 | | #include "yb/master/master_heartbeat.pb.h" |
57 | | |
58 | | #include "yb/rpc/messenger.h" |
59 | | #include "yb/rpc/service_if.h" |
60 | | #include "yb/rpc/yb_rpc.h" |
61 | | |
62 | | #include "yb/server/rpc_server.h" |
63 | | #include "yb/server/webserver.h" |
64 | | |
65 | | #include "yb/tablet/maintenance_manager.h" |
66 | | #include "yb/tablet/tablet_peer.h" |
67 | | |
68 | | #include "yb/tserver/heartbeater.h" |
69 | | #include "yb/tserver/heartbeater_factory.h" |
70 | | #include "yb/tserver/metrics_snapshotter.h" |
71 | | #include "yb/tserver/pg_client_service.h" |
72 | | #include "yb/tserver/remote_bootstrap_service.h" |
73 | | #include "yb/tserver/tablet_service.h" |
74 | | #include "yb/tserver/ts_tablet_manager.h" |
75 | | #include "yb/tserver/tserver-path-handlers.h" |
76 | | #include "yb/tserver/tserver_service.proxy.h" |
77 | | |
78 | | #include "yb/util/flag_tags.h" |
79 | | #include "yb/util/logging.h" |
80 | | #include "yb/util/net/net_util.h" |
81 | | #include "yb/util/net/sockaddr.h" |
82 | | #include "yb/util/random_util.h" |
83 | | #include "yb/util/size_literals.h" |
84 | | #include "yb/util/status.h" |
85 | | #include "yb/util/status_log.h" |
86 | | |
87 | | using std::make_shared; |
88 | | using std::shared_ptr; |
89 | | using std::vector; |
90 | | using yb::rpc::ServiceIf; |
91 | | using yb::tablet::TabletPeer; |
92 | | |
93 | | using namespace yb::size_literals; |
94 | | using namespace std::placeholders; |
95 | | |
96 | | DEFINE_int32(tablet_server_svc_num_threads, -1, |
97 | | "Number of RPC worker threads for the TS service. If -1, it is auto configured."); |
98 | | TAG_FLAG(tablet_server_svc_num_threads, advanced); |
99 | | |
100 | | DEFINE_int32(ts_admin_svc_num_threads, 10, |
101 | | "Number of RPC worker threads for the TS admin service"); |
102 | | TAG_FLAG(ts_admin_svc_num_threads, advanced); |
103 | | |
104 | | DEFINE_int32(ts_consensus_svc_num_threads, -1, |
105 | | "Number of RPC worker threads for the TS consensus service. If -1, it is auto " |
106 | | "configured."); |
107 | | TAG_FLAG(ts_consensus_svc_num_threads, advanced); |
108 | | |
109 | | DEFINE_int32(ts_remote_bootstrap_svc_num_threads, 10, |
110 | | "Number of RPC worker threads for the TS remote bootstrap service"); |
111 | | TAG_FLAG(ts_remote_bootstrap_svc_num_threads, advanced); |
112 | | |
113 | | DEFINE_int32(tablet_server_svc_queue_length, yb::tserver::TabletServer::kDefaultSvcQueueLength, |
114 | | "RPC queue length for the TS service."); |
115 | | TAG_FLAG(tablet_server_svc_queue_length, advanced); |
116 | | |
117 | | DEFINE_int32(ts_admin_svc_queue_length, 50, |
118 | | "RPC queue length for the TS admin service"); |
119 | | TAG_FLAG(ts_admin_svc_queue_length, advanced); |
120 | | |
121 | | DEFINE_int32(ts_consensus_svc_queue_length, yb::tserver::TabletServer::kDefaultSvcQueueLength, |
122 | | "RPC queue length for the TS consensus service."); |
123 | | TAG_FLAG(ts_consensus_svc_queue_length, advanced); |
124 | | |
125 | | DEFINE_int32(ts_remote_bootstrap_svc_queue_length, 50, |
126 | | "RPC queue length for the TS remote bootstrap service"); |
127 | | TAG_FLAG(ts_remote_bootstrap_svc_queue_length, advanced); |
128 | | |
129 | | DEFINE_int32(pg_client_svc_queue_length, yb::tserver::TabletServer::kDefaultSvcQueueLength, |
130 | | "RPC queue length for the Pg Client service."); |
131 | | TAG_FLAG(pg_client_svc_queue_length, advanced); |
132 | | |
133 | | DEFINE_bool(enable_direct_local_tablet_server_call, |
134 | | true, |
135 | | "Enable direct call to local tablet server"); |
136 | | TAG_FLAG(enable_direct_local_tablet_server_call, advanced); |
137 | | |
138 | | DEFINE_string(redis_proxy_bind_address, "", "Address to bind the redis proxy to"); |
139 | | DEFINE_int32(redis_proxy_webserver_port, 0, "Webserver port for redis proxy"); |
140 | | |
141 | | DEFINE_string(cql_proxy_bind_address, "", "Address to bind the CQL proxy to"); |
142 | | DEFINE_int32(cql_proxy_webserver_port, 0, "Webserver port for CQL proxy"); |
143 | | |
144 | | DEFINE_string(pgsql_proxy_bind_address, "", "Address to bind the PostgreSQL proxy to"); |
145 | | DECLARE_int32(pgsql_proxy_webserver_port); |
146 | | |
147 | | DEFINE_int64(inbound_rpc_memory_limit, 0, "Inbound RPC memory limit"); |
148 | | |
149 | | DEFINE_bool(start_pgsql_proxy, false, |
150 | | "Whether to run a PostgreSQL server as a child process of the tablet server"); |
151 | | |
152 | | DEFINE_bool(tserver_enable_metrics_snapshotter, false, "Should metrics snapshotter be enabled"); |
153 | | DECLARE_int32(num_concurrent_backfills_allowed); |
154 | | DECLARE_int32(svc_queue_length_default); |
155 | | |
156 | | namespace yb { |
157 | | namespace tserver { |
158 | | |
159 | | TabletServer::TabletServer(const TabletServerOptions& opts) |
160 | | : DbServerBase( |
161 | | "TabletServer", opts, "yb.tabletserver", server::CreateMemTrackerForServer()), |
162 | | fail_heartbeats_for_tests_(false), |
163 | | opts_(opts), |
164 | | tablet_manager_(new TSTabletManager(fs_manager_.get(), this, metric_registry())), |
165 | | path_handlers_(new TabletServerPathHandlers(this)), |
166 | | maintenance_manager_(new MaintenanceManager(MaintenanceManager::DEFAULT_OPTIONS)), |
167 | | master_config_index_(0), |
168 | 9.28k | tablet_server_service_(nullptr) { |
169 | 9.28k | SetConnectionContextFactory(rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>( |
170 | 9.28k | FLAGS_inbound_rpc_memory_limit, mem_tracker())); |
171 | | |
172 | 9.28k | LOG(INFO) << "yb::tserver::TabletServer created at " << this; |
173 | 9.28k | LOG(INFO) << "yb::tserver::TSTabletManager created at " << tablet_manager_.get(); |
174 | 9.28k | } |
175 | | |
176 | 166 | TabletServer::~TabletServer() { |
177 | 166 | Shutdown(); |
178 | 166 | } |
179 | | |
180 | 0 | std::string TabletServer::ToString() const { |
181 | 0 | return strings::Substitute("TabletServer : rpc=$0, uuid=$1", |
182 | 0 | yb::ToString(first_rpc_address()), |
183 | 0 | fs_manager_->uuid()); |
184 | 0 | } |
185 | | |
186 | 9.27k | Status TabletServer::ValidateMasterAddressResolution() const { |
187 | 9.27k | return ResultToStatus(server::ResolveMasterAddresses(*opts_.GetMasterAddresses())); |
188 | 9.27k | } |
189 | | |
190 | | Status TabletServer::UpdateMasterAddresses(const consensus::RaftConfigPB& new_config, |
191 | 224 | bool is_master_leader) { |
192 | 224 | shared_ptr<server::MasterAddresses> new_master_addresses; |
193 | 224 | if (is_master_leader) { |
194 | 208 | SetCurrentMasterIndex(new_config.opid_index()); |
195 | 208 | new_master_addresses = make_shared<server::MasterAddresses>(); |
196 | | |
197 | 208 | SetCurrentMasterIndex(new_config.opid_index()); |
198 | | |
199 | 701 | for (const auto& peer : new_config.peers()) { |
200 | 701 | std::vector<HostPort> list; |
201 | 701 | for (const auto& hp : peer.last_known_private_addr()) { |
202 | 701 | list.push_back(HostPortFromPB(hp)); |
203 | 701 | } |
204 | 701 | for (const auto& hp : peer.last_known_broadcast_addr()) { |
205 | 0 | list.push_back(HostPortFromPB(hp)); |
206 | 0 | } |
207 | 701 | new_master_addresses->push_back(std::move(list)); |
208 | 701 | } |
209 | 208 | } else { |
210 | 16 | new_master_addresses = make_shared<server::MasterAddresses>(*opts_.GetMasterAddresses()); |
211 | | |
212 | 46 | for (auto& list : *new_master_addresses) { |
213 | 46 | std::sort(list.begin(), list.end()); |
214 | 46 | } |
215 | | |
216 | 44 | for (const auto& peer : new_config.peers()) { |
217 | 44 | std::vector<HostPort> list; |
218 | 44 | for (const auto& hp : peer.last_known_private_addr()) { |
219 | 44 | list.push_back(HostPortFromPB(hp)); |
220 | 44 | } |
221 | 44 | for (const auto& hp : peer.last_known_broadcast_addr()) { |
222 | 0 | list.push_back(HostPortFromPB(hp)); |
223 | 0 | } |
224 | 44 | std::sort(list.begin(), list.end()); |
225 | 44 | bool found = false; |
226 | 122 | for (const auto& existing : *new_master_addresses) { |
227 | 122 | if (existing == list) { |
228 | 22 | found = true; |
229 | 22 | break; |
230 | 22 | } |
231 | 122 | } |
232 | 44 | if (!found) { |
233 | 22 | new_master_addresses->push_back(std::move(list)); |
234 | 22 | } |
235 | 44 | } |
236 | 16 | } |
237 | | |
238 | 224 | LOG(INFO) << "Got new list of " << new_config.peers_size() << " masters at index " |
239 | 224 | << new_config.opid_index() << " old masters = " |
240 | 224 | << yb::ToString(opts_.GetMasterAddresses()) |
241 | 224 | << " new masters = " << yb::ToString(new_master_addresses) << " from " |
242 | 224 | << (is_master_leader ? "leader."208 : "follower."16 ); |
243 | | |
244 | 224 | opts_.SetMasterAddresses(new_master_addresses); |
245 | | |
246 | 224 | heartbeater_->set_master_addresses(new_master_addresses); |
247 | | |
248 | 224 | return Status::OK(); |
249 | 224 | } |
250 | | |
251 | 0 | void TabletServer::SetUniverseKeys(const encryption::UniverseKeysPB& universe_keys) { |
252 | 0 | opts_.universe_key_manager->SetUniverseKeys(universe_keys); |
253 | 0 | } |
254 | | |
255 | 0 | void TabletServer::GetUniverseKeyRegistrySync() { |
256 | 0 | universe_key_client_->GetUniverseKeyRegistrySync(); |
257 | 0 | } |
258 | | |
259 | 9.27k | Status TabletServer::Init() { |
260 | 9.27k | CHECK(!initted_.load(std::memory_order_acquire)); |
261 | | |
262 | | // Validate that the passed master address actually resolves. |
263 | | // We don't validate that we can connect at this point -- it should |
264 | | // be allowed to start the TS and the master in whichever order -- |
265 | | // our heartbeat thread will loop until successfully connecting. |
266 | 9.27k | RETURN_NOT_OK(ValidateMasterAddressResolution()); |
267 | | |
268 | 9.27k | RETURN_NOT_OK(RpcAndWebServerBase::Init()); |
269 | | |
270 | | // If enabled, creates a proxy to call this tablet server locally. |
271 | 9.27k | if (FLAGS_enable_direct_local_tablet_server_call) { |
272 | 8.75k | proxy_ = std::make_shared<TabletServerServiceProxy>(proxy_cache_.get(), HostPort()); |
273 | 8.75k | } |
274 | | |
275 | 9.27k | RETURN_NOT_OK(path_handlers_->Register(web_server_.get())); |
276 | | |
277 | 9.27k | log_prefix_ = Format("P $0: ", permanent_uuid()); |
278 | | |
279 | 9.27k | heartbeater_ = CreateHeartbeater(opts_, this); |
280 | | |
281 | 9.27k | if (FLAGS_tserver_enable_metrics_snapshotter) { |
282 | 1 | metrics_snapshotter_.reset(new MetricsSnapshotter(opts_, this)); |
283 | 1 | } |
284 | | |
285 | 9.27k | std::vector<HostPort> hps; |
286 | 21.8k | for (const auto& master_addr_vector : *opts_.GetMasterAddresses()) { |
287 | 22.7k | for (const auto& master_addr : master_addr_vector) { |
288 | 22.7k | hps.push_back(master_addr); |
289 | 22.7k | } |
290 | 21.8k | } |
291 | | |
292 | 9.27k | universe_key_client_ = std::make_unique<client::UniverseKeyClient>( |
293 | 9.27k | hps, proxy_cache_.get(), [&] (const encryption::UniverseKeysPB& universe_keys) { |
294 | 0 | opts_.universe_key_manager->SetUniverseKeys(universe_keys); |
295 | 0 | }); |
296 | 9.27k | opts_.universe_key_manager->SetGetUniverseKeysCallback([&]() { |
297 | 0 | universe_key_client_->GetUniverseKeyRegistrySync(); |
298 | 0 | }); |
299 | 9.27k | RETURN_NOT_OK_PREPEND(tablet_manager_->Init(), |
300 | 9.27k | "Could not init Tablet Manager"); |
301 | | |
302 | 9.27k | initted_.store(true, std::memory_order_release); |
303 | | |
304 | 9.27k | auto bound_addresses = rpc_server()->GetBoundAddresses(); |
305 | 9.27k | if (!bound_addresses.empty()) { |
306 | 8.74k | ServerRegistrationPB reg; |
307 | 8.74k | RETURN_NOT_OK(GetRegistration(®, server::RpcOnly::kTrue)); |
308 | 8.74k | shared_object().SetHostEndpoint(bound_addresses.front(), PublicHostPort(reg).host()); |
309 | 8.74k | } |
310 | | |
311 | | // 5433 is kDefaultPort in src/yb/yql/pgwrapper/pg_wrapper.h. |
312 | 9.27k | RETURN_NOT_OK(pgsql_proxy_bind_address_.ParseString(FLAGS_pgsql_proxy_bind_address, 5433)); |
313 | 9.27k | shared_object().SetPostgresAuthKey(RandomUniformInt<uint64_t>()); |
314 | | |
315 | 9.27k | return Status::OK(); |
316 | 9.27k | } |
317 | | |
318 | 26.9k | Status TabletServer::GetRegistration(ServerRegistrationPB* reg, server::RpcOnly rpc_only) const { |
319 | 26.9k | RETURN_NOT_OK(RpcAndWebServerBase::GetRegistration(reg, rpc_only)); |
320 | 26.9k | reg->set_pg_port(pgsql_proxy_bind_address().port()); |
321 | 26.9k | return Status::OK(); |
322 | 26.9k | } |
323 | | |
324 | 14 | Status TabletServer::WaitInited() { |
325 | 14 | return tablet_manager_->WaitForAllBootstrapsToFinish(); |
326 | 14 | } |
327 | | |
328 | 8.74k | void TabletServer::AutoInitServiceFlags() { |
329 | 8.74k | const int32 num_cores = base::NumCPUs(); |
330 | | |
331 | 8.74k | if (FLAGS_tablet_server_svc_num_threads == -1) { |
332 | | // Auto select number of threads for the TS service based on number of cores. |
333 | | // But bound it between 64 & 512. |
334 | 6.43k | const int32 num_threads = std::min(512, num_cores * 32); |
335 | 6.43k | FLAGS_tablet_server_svc_num_threads = std::max(64, num_threads); |
336 | 6.43k | LOG(INFO) << "Auto setting FLAGS_tablet_server_svc_num_threads to " |
337 | 6.43k | << FLAGS_tablet_server_svc_num_threads; |
338 | 6.43k | } |
339 | | |
340 | 8.74k | if (FLAGS_num_concurrent_backfills_allowed == -1) { |
341 | 8.66k | const int32 num_threads = std::min(8, num_cores / 2); |
342 | 8.66k | FLAGS_num_concurrent_backfills_allowed = std::max(1, num_threads); |
343 | 8.66k | LOG(INFO) << "Auto setting FLAGS_num_concurrent_backfills_allowed to " |
344 | 8.66k | << FLAGS_num_concurrent_backfills_allowed; |
345 | 8.66k | } |
346 | | |
347 | 8.74k | if (FLAGS_ts_consensus_svc_num_threads == -1) { |
348 | | // Auto select number of threads for the TS service based on number of cores. |
349 | | // But bound it between 64 & 512. |
350 | 6.43k | const int32 num_threads = std::min(512, num_cores * 32); |
351 | 6.43k | FLAGS_ts_consensus_svc_num_threads = std::max(64, num_threads); |
352 | 6.43k | LOG(INFO) << "Auto setting FLAGS_ts_consensus_svc_num_threads to " |
353 | 6.43k | << FLAGS_ts_consensus_svc_num_threads; |
354 | 6.43k | } |
355 | 8.74k | } |
356 | | |
357 | 8.74k | Status TabletServer::RegisterServices() { |
358 | 8.74k | tablet_server_service_ = new TabletServiceImpl(this); |
359 | 8.74k | LOG(INFO) << "yb::tserver::TabletServiceImpl created at " << tablet_server_service_; |
360 | 8.74k | std::unique_ptr<ServiceIf> ts_service(tablet_server_service_); |
361 | 8.74k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_tablet_server_svc_queue_length, |
362 | 8.74k | std::move(ts_service))); |
363 | | |
364 | 8.74k | std::unique_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this)); |
365 | 8.74k | LOG(INFO) << "yb::tserver::TabletServiceAdminImpl created at " << admin_service.get(); |
366 | 8.74k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_admin_svc_queue_length, |
367 | 8.74k | std::move(admin_service))); |
368 | | |
369 | 8.74k | std::unique_ptr<ServiceIf> consensus_service(new ConsensusServiceImpl(metric_entity(), |
370 | 8.74k | tablet_manager_.get())); |
371 | 8.74k | LOG(INFO) << "yb::tserver::ConsensusServiceImpl created at " << consensus_service.get(); |
372 | 8.74k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_consensus_svc_queue_length, |
373 | 8.74k | std::move(consensus_service), |
374 | 8.74k | rpc::ServicePriority::kHigh)); |
375 | | |
376 | 8.74k | std::unique_ptr<ServiceIf> remote_bootstrap_service = |
377 | 8.74k | std::make_unique<RemoteBootstrapServiceImpl>( |
378 | 8.74k | fs_manager_.get(), tablet_manager_.get(), metric_entity()); |
379 | 8.74k | LOG(INFO) << "yb::tserver::RemoteBootstrapServiceImpl created at " << |
380 | 8.74k | remote_bootstrap_service.get(); |
381 | 8.74k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_remote_bootstrap_svc_queue_length, |
382 | 8.74k | std::move(remote_bootstrap_service))); |
383 | | |
384 | 8.74k | std::unique_ptr<ServiceIf> forward_service = |
385 | 8.74k | std::make_unique<TabletServerForwardServiceImpl>(tablet_server_service_, this); |
386 | 8.74k | LOG(INFO) << "yb::tserver::ForwardServiceImpl created at " << forward_service.get(); |
387 | 8.74k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_tablet_server_svc_queue_length, |
388 | 8.74k | std::move(forward_service))); |
389 | | |
390 | 8.74k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService( |
391 | 8.74k | FLAGS_pg_client_svc_queue_length, |
392 | 8.74k | std::make_unique<PgClientServiceImpl>( |
393 | 8.74k | tablet_manager_->client_future(), |
394 | 8.74k | clock(), |
395 | 8.74k | std::bind(&TabletServer::TransactionPool, this), |
396 | 8.74k | metric_entity(), |
397 | 8.74k | &messenger()->scheduler()))); |
398 | | |
399 | 8.74k | return Status::OK(); |
400 | 8.74k | } |
401 | | |
402 | 8.74k | Status TabletServer::Start() { |
403 | 8.74k | CHECK(initted_.load(std::memory_order_acquire)); |
404 | | |
405 | 8.74k | AutoInitServiceFlags(); |
406 | | |
407 | 8.74k | RETURN_NOT_OK(RegisterServices()); |
408 | 8.74k | RETURN_NOT_OK(RpcAndWebServerBase::Start()); |
409 | | |
410 | 8.65k | RETURN_NOT_OK(tablet_manager_->Start()); |
411 | | |
412 | 8.65k | RETURN_NOT_OK(heartbeater_->Start()); |
413 | | |
414 | 8.65k | if (FLAGS_tserver_enable_metrics_snapshotter) { |
415 | 1 | RETURN_NOT_OK(metrics_snapshotter_->Start()); |
416 | 1 | } |
417 | | |
418 | 8.65k | RETURN_NOT_OK(maintenance_manager_->Init()); |
419 | | |
420 | 8.65k | google::FlushLogFiles(google::INFO); // Flush the startup messages. |
421 | | |
422 | 8.65k | return Status::OK(); |
423 | 8.65k | } |
424 | | |
425 | 525 | void TabletServer::Shutdown() { |
426 | 525 | LOG(INFO) << "TabletServer shutting down..."; |
427 | | |
428 | 525 | bool expected = true; |
429 | 525 | if (initted_.compare_exchange_strong(expected, false, std::memory_order_acq_rel)) { |
430 | 195 | maintenance_manager_->Shutdown(); |
431 | 195 | WARN_NOT_OK(heartbeater_->Stop(), "Failed to stop TS Heartbeat thread"); |
432 | | |
433 | 195 | if (FLAGS_tserver_enable_metrics_snapshotter) { |
434 | 1 | WARN_NOT_OK(metrics_snapshotter_->Stop(), "Failed to stop TS Metrics Snapshotter thread"); |
435 | 1 | } |
436 | | |
437 | 195 | { |
438 | 195 | std::lock_guard<simple_spinlock> l(lock_); |
439 | 195 | tablet_server_service_ = nullptr; |
440 | 195 | } |
441 | 195 | tablet_manager_->StartShutdown(); |
442 | 195 | RpcAndWebServerBase::Shutdown(); |
443 | 195 | tablet_manager_->CompleteShutdown(); |
444 | 195 | } |
445 | | |
446 | 525 | LOG(INFO) << "TabletServer shut down complete. Bye!"; |
447 | 525 | } |
448 | | |
449 | 4.82M | Status TabletServer::PopulateLiveTServers(const master::TSHeartbeatResponsePB& heartbeat_resp) { |
450 | 4.82M | std::lock_guard<simple_spinlock> l(lock_); |
451 | | // We reset the list each time, since we want to keep the tservers that are live from the |
452 | | // master's perspective. |
453 | | // TODO: In the future, we should enhance the logic here to keep track information retrieved |
454 | | // from the master and compare it with information stored here. Based on this information, we |
455 | | // can only send diff updates CQL clients about whether a node came up or went down. |
456 | 4.82M | live_tservers_.assign(heartbeat_resp.tservers().begin(), heartbeat_resp.tservers().end()); |
457 | 4.82M | return Status::OK(); |
458 | 4.82M | } |
459 | | |
460 | | Status TabletServer::GetLiveTServers( |
461 | 37.2k | std::vector<master::TSInformationPB> *live_tservers) const { |
462 | 37.2k | std::lock_guard<simple_spinlock> l(lock_); |
463 | 37.2k | *live_tservers = live_tservers_; |
464 | 37.2k | return Status::OK(); |
465 | 37.2k | } |
466 | | |
467 | | Status TabletServer::GetTabletStatus(const GetTabletStatusRequestPB* req, |
468 | 1.08k | GetTabletStatusResponsePB* resp) const { |
469 | 1.08k | VLOG(3) << "GetTabletStatus called for tablet " << req->tablet_id()0 ; |
470 | 1.08k | tablet::TabletPeerPtr peer; |
471 | 1.08k | if (!tablet_manager_->LookupTablet(req->tablet_id(), &peer)) { |
472 | 557 | return STATUS(NotFound, "Tablet not found", req->tablet_id()); |
473 | 557 | } |
474 | 528 | peer->GetTabletStatusPB(resp->mutable_tablet_status()); |
475 | 528 | return Status::OK(); |
476 | 1.08k | } |
477 | | |
478 | 467 | bool TabletServer::LeaderAndReady(const TabletId& tablet_id, bool allow_stale) const { |
479 | 467 | tablet::TabletPeerPtr peer; |
480 | 467 | if (!tablet_manager_->LookupTablet(tablet_id, &peer)) { |
481 | 0 | return false; |
482 | 0 | } |
483 | 467 | return peer->LeaderStatus(allow_stale) == consensus::LeaderStatus::LEADER_AND_READY; |
484 | 467 | } |
485 | | |
486 | | Status TabletServer::SetUniverseKeyRegistry( |
487 | 0 | const encryption::UniverseKeyRegistryPB& universe_key_registry) { |
488 | 0 | return Status::OK(); |
489 | 0 | } |
490 | | |
491 | 8.16k | void TabletServer::set_cluster_uuid(const std::string& cluster_uuid) { |
492 | 8.16k | std::lock_guard<simple_spinlock> l(lock_); |
493 | 8.16k | cluster_uuid_ = cluster_uuid; |
494 | 8.16k | } |
495 | | |
496 | 0 | std::string TabletServer::cluster_uuid() const { |
497 | 0 | std::lock_guard<simple_spinlock> l(lock_); |
498 | 0 | return cluster_uuid_; |
499 | 0 | } |
500 | | |
501 | 0 | TabletServiceImpl* TabletServer::tablet_server_service() { |
502 | 0 | std::lock_guard<simple_spinlock> l(lock_); |
503 | 0 | return tablet_server_service_; |
504 | 0 | } |
505 | | |
506 | | Status GetDynamicUrlTile( |
507 | | const string& path, const string& hostport, const int port, |
508 | 0 | const string& http_addr_host, string* url) { |
509 | | // We get an incoming hostport string like '127.0.0.1:5433' or '[::1]:5433' or [::1] |
510 | | // and a port 13000 which has to be converted to '127.0.0.1:13000'. If the hostport is |
511 | | // a wildcard - 0.0.0.0 - the URLs are formed based on the http address for web instead |
512 | 0 | HostPort hp; |
513 | 0 | RETURN_NOT_OK(hp.ParseString(hostport, port)); |
514 | 0 | if (IsWildcardAddress(hp.host())) { |
515 | 0 | hp.set_host(http_addr_host); |
516 | 0 | } |
517 | 0 | hp.set_port(port); |
518 | |
|
519 | 0 | *url = strings::Substitute("http://$0$1", hp.ToString(), path); |
520 | 0 | return Status::OK(); |
521 | 0 | } |
522 | | |
523 | 0 | Status TabletServer::DisplayRpcIcons(std::stringstream* output) { |
524 | 0 | ServerRegistrationPB reg; |
525 | 0 | RETURN_NOT_OK(GetRegistration(®)); |
526 | 0 | string http_addr_host = reg.http_addresses(0).host(); |
527 | | |
528 | | // RPCs in Progress. |
529 | 0 | DisplayIconTile(output, "fa-tasks", "TServer Live Ops", "/rpcz"); |
530 | | // YCQL RPCs in Progress. |
531 | 0 | string cass_url; |
532 | 0 | RETURN_NOT_OK(GetDynamicUrlTile( |
533 | 0 | "/rpcz", FLAGS_cql_proxy_bind_address, FLAGS_cql_proxy_webserver_port, |
534 | 0 | http_addr_host, &cass_url)); |
535 | 0 | DisplayIconTile(output, "fa-tasks", "YCQL Live Ops", cass_url); |
536 | | |
537 | | // YEDIS RPCs in Progress. |
538 | 0 | string redis_url; |
539 | 0 | RETURN_NOT_OK(GetDynamicUrlTile( |
540 | 0 | "/rpcz", FLAGS_redis_proxy_bind_address, FLAGS_redis_proxy_webserver_port, |
541 | 0 | http_addr_host, &redis_url)); |
542 | 0 | DisplayIconTile(output, "fa-tasks", "YEDIS Live Ops", redis_url); |
543 | | |
544 | | // YSQL RPCs in Progress. |
545 | 0 | string sql_url; |
546 | 0 | RETURN_NOT_OK(GetDynamicUrlTile( |
547 | 0 | "/rpcz", FLAGS_pgsql_proxy_bind_address, FLAGS_pgsql_proxy_webserver_port, |
548 | 0 | http_addr_host, &sql_url)); |
549 | 0 | DisplayIconTile(output, "fa-tasks", "YSQL Live Ops", sql_url); |
550 | | |
551 | | // YSQL All Ops |
552 | 0 | string sql_all_url; |
553 | 0 | RETURN_NOT_OK(GetDynamicUrlTile( |
554 | 0 | "/statements", FLAGS_pgsql_proxy_bind_address, FLAGS_pgsql_proxy_webserver_port, |
555 | 0 | http_addr_host, &sql_all_url)); |
556 | 0 | DisplayIconTile(output, "fa-tasks", "YSQL All Ops", sql_all_url); |
557 | 0 | return Status::OK(); |
558 | 0 | } |
559 | | |
560 | 1.09M | Env* TabletServer::GetEnv() { |
561 | 1.09M | return opts_.env; |
562 | 1.09M | } |
563 | | |
564 | 8.74k | rocksdb::Env* TabletServer::GetRocksDBEnv() { |
565 | 8.74k | return opts_.rocksdb_env; |
566 | 8.74k | } |
567 | | |
568 | 1.96k | uint64_t TabletServer::GetSharedMemoryPostgresAuthKey() { |
569 | 1.96k | return shared_object().postgres_auth_key(); |
570 | 1.96k | } |
571 | | |
572 | 4.82M | void TabletServer::SetYSQLCatalogVersion(uint64_t new_version, uint64_t new_breaking_version) { |
573 | 4.82M | std::lock_guard<simple_spinlock> l(lock_); |
574 | | |
575 | 4.82M | if (new_version > ysql_catalog_version_) { |
576 | 19.3k | ysql_catalog_version_ = new_version; |
577 | 19.3k | shared_object().SetYSQLCatalogVersion(new_version); |
578 | 19.3k | ysql_last_breaking_catalog_version_ = new_breaking_version; |
579 | 19.3k | if (FLAGS_log_ysql_catalog_versions) { |
580 | 0 | LOG_WITH_FUNC(INFO) << "set catalog version: " << new_version << ", breaking version: " |
581 | 0 | << new_breaking_version; |
582 | 0 | } |
583 | 4.80M | } else if (new_version < ysql_catalog_version_) { |
584 | 0 | LOG(DFATAL) << "Ignoring ysql catalog version update: new version too old. " |
585 | 0 | << "New: " << new_version << ", Old: " << ysql_catalog_version_; |
586 | 0 | } |
587 | 4.82M | } |
588 | | |
589 | 4.82M | void TabletServer::UpdateTransactionTablesVersion(uint64_t new_version) { |
590 | 4.82M | const auto transaction_manager = transaction_manager_.load(std::memory_order_acquire); |
591 | 4.82M | if (transaction_manager) { |
592 | 88.5k | transaction_manager->UpdateTransactionTablesVersion(new_version); |
593 | 88.5k | } |
594 | 4.82M | } |
595 | | |
596 | 13.6M | TabletPeerLookupIf* TabletServer::tablet_peer_lookup() { |
597 | 13.6M | return tablet_manager_.get(); |
598 | 13.6M | } |
599 | | |
600 | 75.0k | const std::shared_future<client::YBClient*>& TabletServer::client_future() const { |
601 | 75.0k | return tablet_manager_->client_future(); |
602 | 75.0k | } |
603 | | |
604 | 287k | client::TransactionPool* TabletServer::TransactionPool() { |
605 | 287k | return DbServerBase::TransactionPool(); |
606 | 287k | } |
607 | | |
608 | 1.04k | client::LocalTabletFilter TabletServer::CreateLocalTabletFilter() { |
609 | 1.04k | return std::bind(&TSTabletManager::PreserveLocalLeadersOnly, tablet_manager(), _1); |
610 | 1.04k | } |
611 | | |
612 | 329k | const std::shared_ptr<MemTracker>& TabletServer::mem_tracker() const { |
613 | 329k | return RpcServerBase::mem_tracker(); |
614 | 329k | } |
615 | | |
616 | 572 | void TabletServer::SetPublisher(rpc::Publisher service) { |
617 | 572 | publish_service_ptr_.reset(new rpc::Publisher(std::move(service))); |
618 | 572 | } |
619 | | |
620 | | } // namespace tserver |
621 | | } // namespace yb |