YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_server.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/tablet_server.h"
34
35
#include <algorithm>
36
#include <limits>
37
#include <list>
38
#include <thread>
39
#include <utility>
40
#include <vector>
41
42
#include <glog/logging.h>
43
44
#include "yb/client/transaction_manager.h"
45
#include "yb/client/universe_key_client.h"
46
47
#include "yb/common/common_flags.h"
48
#include "yb/common/wire_protocol.h"
49
50
#include "yb/encryption/universe_key_manager.h"
51
52
#include "yb/fs/fs_manager.h"
53
54
#include "yb/gutil/strings/substitute.h"
55
56
#include "yb/master/master_heartbeat.pb.h"
57
58
#include "yb/rpc/messenger.h"
59
#include "yb/rpc/service_if.h"
60
#include "yb/rpc/yb_rpc.h"
61
62
#include "yb/server/rpc_server.h"
63
#include "yb/server/webserver.h"
64
65
#include "yb/tablet/maintenance_manager.h"
66
#include "yb/tablet/tablet_peer.h"
67
68
#include "yb/tserver/heartbeater.h"
69
#include "yb/tserver/heartbeater_factory.h"
70
#include "yb/tserver/metrics_snapshotter.h"
71
#include "yb/tserver/pg_client_service.h"
72
#include "yb/tserver/remote_bootstrap_service.h"
73
#include "yb/tserver/tablet_service.h"
74
#include "yb/tserver/ts_tablet_manager.h"
75
#include "yb/tserver/tserver-path-handlers.h"
76
#include "yb/tserver/tserver_service.proxy.h"
77
78
#include "yb/util/flag_tags.h"
79
#include "yb/util/logging.h"
80
#include "yb/util/net/net_util.h"
81
#include "yb/util/net/sockaddr.h"
82
#include "yb/util/random_util.h"
83
#include "yb/util/size_literals.h"
84
#include "yb/util/status.h"
85
#include "yb/util/status_log.h"
86
87
using std::make_shared;
88
using std::shared_ptr;
89
using std::vector;
90
using yb::rpc::ServiceIf;
91
using yb::tablet::TabletPeer;
92
93
using namespace yb::size_literals;
94
using namespace std::placeholders;
95
96
// ---- Worker-thread counts for the tserver RPC services ----------------------------------------
// A value of -1 is replaced at startup by TabletServer::AutoInitServiceFlags().

DEFINE_int32(
    tablet_server_svc_num_threads, -1,
    "Number of RPC worker threads for the TS service. If -1, it is auto configured.");
TAG_FLAG(tablet_server_svc_num_threads, advanced);

DEFINE_int32(
    ts_admin_svc_num_threads, 10,
    "Number of RPC worker threads for the TS admin service");
TAG_FLAG(ts_admin_svc_num_threads, advanced);

DEFINE_int32(
    ts_consensus_svc_num_threads, -1,
    "Number of RPC worker threads for the TS consensus service. If -1, it is auto "
    "configured.");
TAG_FLAG(ts_consensus_svc_num_threads, advanced);

DEFINE_int32(
    ts_remote_bootstrap_svc_num_threads, 10,
    "Number of RPC worker threads for the TS remote bootstrap service");
TAG_FLAG(ts_remote_bootstrap_svc_num_threads, advanced);

// ---- Queue lengths for the tserver RPC services -----------------------------------------------

DEFINE_int32(
    tablet_server_svc_queue_length, yb::tserver::TabletServer::kDefaultSvcQueueLength,
    "RPC queue length for the TS service.");
TAG_FLAG(tablet_server_svc_queue_length, advanced);

DEFINE_int32(
    ts_admin_svc_queue_length, 50,
    "RPC queue length for the TS admin service");
TAG_FLAG(ts_admin_svc_queue_length, advanced);

DEFINE_int32(
    ts_consensus_svc_queue_length, yb::tserver::TabletServer::kDefaultSvcQueueLength,
    "RPC queue length for the TS consensus service.");
TAG_FLAG(ts_consensus_svc_queue_length, advanced);

DEFINE_int32(
    ts_remote_bootstrap_svc_queue_length, 50,
    "RPC queue length for the TS remote bootstrap service");
TAG_FLAG(ts_remote_bootstrap_svc_queue_length, advanced);

// ---- Local proxy ------------------------------------------------------------------------------

DEFINE_bool(
    enable_direct_local_tablet_server_call, true,
    "Enable direct call to local tablet server");
TAG_FLAG(enable_direct_local_tablet_server_call, advanced);

// ---- Query-layer proxy endpoints --------------------------------------------------------------
// The bind addresses and webserver ports below are read by TabletServer::DisplayRpcIcons() to
// build links to each proxy's web pages.

DEFINE_string(redis_proxy_bind_address, "", "Address to bind the redis proxy to");
DEFINE_int32(redis_proxy_webserver_port, 0, "Webserver port for redis proxy");

DEFINE_string(cql_proxy_bind_address, "", "Address to bind the CQL proxy to");
DEFINE_int32(cql_proxy_webserver_port, 0, "Webserver port for CQL proxy");

DEFINE_string(pgsql_proxy_bind_address, "", "Address to bind the PostgreSQL proxy to");
DECLARE_int32(pgsql_proxy_webserver_port);

// ---- Miscellaneous ----------------------------------------------------------------------------

DEFINE_int64(inbound_rpc_memory_limit, 0, "Inbound RPC memory limit");

DEFINE_bool(
    start_pgsql_proxy, false,
    "Whether to run a PostgreSQL server as a child process of the tablet server");

DEFINE_bool(tserver_enable_metrics_snapshotter, false, "Should metrics snapshotter be enabled");

// Flags defined in other translation units but read in this file.
DECLARE_int32(num_concurrent_backfills_allowed);
DECLARE_int32(svc_queue_length_default);
151
152
namespace yb {
153
namespace tserver {
154
155
// Creates the server's owned components (tablet manager, web path handlers, maintenance manager)
// and installs the inbound connection context factory. The factory is built from
// --inbound_rpc_memory_limit and this server's memory tracker. Nothing is started here.
TabletServer::TabletServer(const TabletServerOptions& opts)
    : DbServerBase(
          "TabletServer", opts, "yb.tabletserver", server::CreateMemTrackerForServer()),
      fail_heartbeats_for_tests_(false),
      opts_(opts),
      tablet_manager_(new TSTabletManager(fs_manager_.get(), this, metric_registry())),
      path_handlers_(new TabletServerPathHandlers(this)),
      maintenance_manager_(new MaintenanceManager(MaintenanceManager::DEFAULT_OPTIONS)),
      master_config_index_(0),
      tablet_server_service_(nullptr) {
  SetConnectionContextFactory(
      rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>(
          FLAGS_inbound_rpc_memory_limit, mem_tracker()));

  // Log the object addresses so later log lines that print raw pointers can be correlated.
  LOG(INFO) << "yb::tserver::TabletServer created at " << this;
  LOG(INFO) << "yb::tserver::TSTabletManager created at " << tablet_manager_.get();
}
171
172
113
// Destruction always goes through Shutdown(); Shutdown() itself only tears components down when
// the server is still marked as initialized.
TabletServer::~TabletServer() {
  Shutdown();
}
175
176
0
std::string TabletServer::ToString() const {
177
0
  return strings::Substitute("TabletServer : rpc=$0, uuid=$1",
178
0
                             yb::ToString(first_rpc_address()),
179
0
                             fs_manager_->uuid());
180
0
}
181
182
6.10k
// Checks that the master addresses currently held in the options can be resolved.
// Returns the resolution error, if any; the resolved addresses themselves are discarded.
Status TabletServer::ValidateMasterAddressResolution() const {
  const auto master_addresses = opts_.GetMasterAddresses();
  return ResultToStatus(server::ResolveMasterAddresses(*master_addresses));
}
185
186
// Replaces the set of known master addresses using a Raft config received from a master.
//
// new_config:       master Raft config carrying each peer's last known addresses.
// is_master_leader: true when the config came from the master leader.
//
// When the config comes from the leader it is taken as authoritative: the current master config
// index is updated and the address list is rebuilt from the config's peers only. When it comes
// from a follower, the existing addresses are kept and any peer whose (sorted) address list is
// not already present is appended.
//
// The new list is stored in the options and handed to the heartbeater. Always returns OK.
Status TabletServer::UpdateMasterAddresses(const consensus::RaftConfigPB& new_config,
                                           bool is_master_leader) {
  // Gathers a peer's private addresses followed by its broadcast addresses.
  const auto peer_host_ports = [](const auto& peer) {
    std::vector<HostPort> host_ports;
    for (const auto& hp : peer.last_known_private_addr()) {
      host_ports.push_back(HostPortFromPB(hp));
    }
    for (const auto& hp : peer.last_known_broadcast_addr()) {
      host_ports.push_back(HostPortFromPB(hp));
    }
    return host_ports;
  };

  shared_ptr<server::MasterAddresses> new_master_addresses;
  if (is_master_leader) {
    SetCurrentMasterIndex(new_config.opid_index());
    new_master_addresses = make_shared<server::MasterAddresses>();

    for (const auto& peer : new_config.peers()) {
      new_master_addresses->push_back(peer_host_ports(peer));
    }
  } else {
    new_master_addresses = make_shared<server::MasterAddresses>(*opts_.GetMasterAddresses());

    // Sort every existing entry so it can be compared against the sorted per-peer lists below.
    for (auto& list : *new_master_addresses) {
      std::sort(list.begin(), list.end());
    }

    for (const auto& peer : new_config.peers()) {
      auto list = peer_host_ports(peer);
      std::sort(list.begin(), list.end());
      const bool already_known =
          std::find(new_master_addresses->begin(), new_master_addresses->end(), list) !=
          new_master_addresses->end();
      if (!already_known) {
        new_master_addresses->push_back(std::move(list));
      }
    }
  }

  LOG(INFO) << "Got new list of " << new_config.peers_size() << " masters at index "
            << new_config.opid_index() << " old masters = "
            << yb::ToString(opts_.GetMasterAddresses())
            << " new masters = " << yb::ToString(new_master_addresses) << " from "
            << (is_master_leader ? "leader." : "follower.");

  opts_.SetMasterAddresses(new_master_addresses);

  heartbeater_->set_master_addresses(new_master_addresses);

  return Status::OK();
}
246
247
0
// Forwards the given universe keys to the universe key manager held in the server options.
void TabletServer::SetUniverseKeys(const encryption::UniverseKeysPB& universe_keys) {
  auto& key_manager = *opts_.universe_key_manager;
  key_manager.SetUniverseKeys(universe_keys);
}
250
251
0
void TabletServer::GetUniverseKeyRegistrySync() {
252
0
  universe_key_client_->GetUniverseKeyRegistrySync();
253
0
}
254
255
6.10k
// One-time initialization; must be called before Start() and must not be called twice (CHECKed).
//
// In order: validates that the master addresses resolve, initializes the RPC/web base, registers
// the web path handlers, creates the heartbeater (and the metrics snapshotter when
// --tserver_enable_metrics_snapshotter is set), wires up the universe key client and manager,
// initializes the tablet manager, marks the server initialized, and finally publishes the host
// endpoint and a random Postgres auth key through the shared object.
//
// Returns the first error encountered, or OK.
Status TabletServer::Init() {
  CHECK(!initted_.load(std::memory_order_acquire));

  // Validate that the passed master address actually resolves.
  // We don't validate that we can connect at this point -- it should
  // be allowed to start the TS and the master in whichever order --
  // our heartbeat thread will loop until successfully connecting.
  RETURN_NOT_OK(ValidateMasterAddressResolution());

  RETURN_NOT_OK(RpcAndWebServerBase::Init());
  RETURN_NOT_OK(path_handlers_->Register(web_server_.get()));

  log_prefix_ = Format("P $0: ", permanent_uuid());

  heartbeater_ = CreateHeartbeater(opts_, this);

  if (FLAGS_tserver_enable_metrics_snapshotter) {
    metrics_snapshotter_.reset(new MetricsSnapshotter(opts_, this));
  }

  // Flatten the per-master address lists into one list for the universe key client.
  std::vector<HostPort> hps;
  for (const auto& master_addr_vector : *opts_.GetMasterAddresses()) {
    hps.insert(hps.end(), master_addr_vector.begin(), master_addr_vector.end());
  }

  // Both callbacks below are stored and invoked after Init() returns, so they capture `this`
  // explicitly rather than using a blanket by-reference capture: nothing local to Init() may be
  // referenced from them.
  universe_key_client_ = std::make_unique<client::UniverseKeyClient>(
      hps, proxy_cache_.get(), [this](const encryption::UniverseKeysPB& universe_keys) {
        opts_.universe_key_manager->SetUniverseKeys(universe_keys);
      });
  opts_.universe_key_manager->SetGetUniverseKeysCallback([this]() {
    universe_key_client_->GetUniverseKeyRegistrySync();
  });
  RETURN_NOT_OK_PREPEND(tablet_manager_->Init(),
                        "Could not init Tablet Manager");

  initted_.store(true, std::memory_order_release);

  auto bound_addresses = rpc_server()->GetBoundAddresses();
  if (!bound_addresses.empty()) {
    ServerRegistrationPB reg;
    RETURN_NOT_OK(GetRegistration(&reg, server::RpcOnly::kTrue));
    shared_object().SetHostEndpoint(bound_addresses.front(), PublicHostPort(reg).host());
  }

  // 5433 is kDefaultPort in src/yb/yql/pgwrapper/pg_wrapper.h.
  RETURN_NOT_OK(pgsql_proxy_bind_address_.ParseString(FLAGS_pgsql_proxy_bind_address, 5433));
  shared_object().SetPostgresAuthKey(RandomUniformInt<uint64_t>());

  return Status::OK();
}
307
308
17.5k
// Fills `reg` with the base RPC/web registration and then adds the port of the PostgreSQL proxy
// bind address. Returns the base class's error without touching pg_port if that step fails.
Status TabletServer::GetRegistration(ServerRegistrationPB* reg, server::RpcOnly rpc_only) const {
  RETURN_NOT_OK(RpcAndWebServerBase::GetRegistration(reg, rpc_only));

  const auto pg_port = pgsql_proxy_bind_address().port();
  reg->set_pg_port(pg_port);
  return Status::OK();
}
313
314
14
// Waits on the tablet manager until all tablet bootstraps have finished and returns its status.
Status TabletServer::WaitInited() {
  auto& tablet_manager = *tablet_manager_;
  return tablet_manager.WaitForAllBootstrapsToFinish();
}
317
318
5.81k
void TabletServer::AutoInitServiceFlags() {
319
5.81k
  const int32 num_cores = base::NumCPUs();
320
321
5.81k
  if (FLAGS_tablet_server_svc_num_threads == -1) {
322
    // Auto select number of threads for the TS service based on number of cores.
323
    // But bound it between 64 & 512.
324
4.69k
    const int32 num_threads = std::min(512, num_cores * 32);
325
4.69k
    FLAGS_tablet_server_svc_num_threads = std::max(64, num_threads);
326
4.69k
    LOG(INFO) << "Auto setting FLAGS_tablet_server_svc_num_threads to "
327
4.69k
              << FLAGS_tablet_server_svc_num_threads;
328
4.69k
  }
329
330
5.81k
  if (FLAGS_num_concurrent_backfills_allowed == -1) {
331
5.73k
    const int32 num_threads = std::min(8, num_cores / 2);
332
5.73k
    FLAGS_num_concurrent_backfills_allowed = std::max(1, num_threads);
333
5.73k
    LOG(INFO) << "Auto setting FLAGS_num_concurrent_backfills_allowed to "
334
5.73k
              << FLAGS_num_concurrent_backfills_allowed;
335
5.73k
  }
336
337
5.81k
  if (FLAGS_ts_consensus_svc_num_threads == -1) {
338
    // Auto select number of threads for the TS service based on number of cores.
339
    // But bound it between 64 & 512.
340
4.69k
    const int32 num_threads = std::min(512, num_cores * 32);
341
4.69k
    FLAGS_ts_consensus_svc_num_threads = std::max(64, num_threads);
342
4.69k
    LOG(INFO) << "Auto setting FLAGS_ts_consensus_svc_num_threads to "
343
4.69k
              << FLAGS_ts_consensus_svc_num_threads;
344
4.69k
  }
345
5.81k
}
346
347
5.81k
// Creates and registers every RPC service this tablet server exposes: the tablet service, the
// admin service, the consensus service (registered with high priority), the remote bootstrap
// service, the forward service and the PG client service.
//
// A non-owning pointer to the tablet service is kept in tablet_server_service_. That member is
// read and cleared under lock_ elsewhere in this class, so it is written under lock_ here too,
// and it is reset to nullptr if registering the tablet service fails so that it never refers to
// a service whose ownership was given away to a failed registration.
//
// Returns the first registration error, or OK.
Status TabletServer::RegisterServices() {
  auto ts_service = std::make_unique<TabletServiceImpl>(this);
  TabletServiceImpl* const ts_service_ptr = ts_service.get();
  {
    std::lock_guard<simple_spinlock> l(lock_);
    tablet_server_service_ = ts_service_ptr;
  }
  LOG(INFO) << "yb::tserver::TabletServiceImpl created at " << ts_service_ptr;
  const Status ts_service_status = RpcAndWebServerBase::RegisterService(
      FLAGS_tablet_server_svc_queue_length, std::move(ts_service));
  if (!ts_service_status.ok()) {
    std::lock_guard<simple_spinlock> l(lock_);
    tablet_server_service_ = nullptr;
    return ts_service_status;
  }

  std::unique_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this));
  LOG(INFO) << "yb::tserver::TabletServiceAdminImpl created at " << admin_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_admin_svc_queue_length,
                                                     std::move(admin_service)));

  std::unique_ptr<ServiceIf> consensus_service(new ConsensusServiceImpl(metric_entity(),
                                                                        tablet_manager_.get()));
  LOG(INFO) << "yb::tserver::ConsensusServiceImpl created at " << consensus_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_consensus_svc_queue_length,
                                                     std::move(consensus_service),
                                                     rpc::ServicePriority::kHigh));

  std::unique_ptr<ServiceIf> remote_bootstrap_service =
      std::make_unique<RemoteBootstrapServiceImpl>(
          fs_manager_.get(), tablet_manager_.get(), metric_entity());
  LOG(INFO) << "yb::tserver::RemoteBootstrapServiceImpl created at " <<
    remote_bootstrap_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_remote_bootstrap_svc_queue_length,
                                                     std::move(remote_bootstrap_service)));

  // The forward service is handed the tablet service registered above and shares its queue
  // length flag.
  std::unique_ptr<ServiceIf> forward_service =
    std::make_unique<TabletServerForwardServiceImpl>(ts_service_ptr, this);
  LOG(INFO) << "yb::tserver::ForwardServiceImpl created at " << forward_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_tablet_server_svc_queue_length,
                                                     std::move(forward_service)));

  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_svc_queue_length_default,
      std::make_unique<PgClientServiceImpl>(
          tablet_manager_->client_future(),
          clock(),
          std::bind(&TabletServer::TransactionPool, this),
          metric_entity(),
          &messenger()->scheduler())));

  return Status::OK();
}
391
392
5.81k
// Starts the server. Init() must have completed successfully first (CHECKed).
//
// In order: resolves auto-configured flags, registers the RPC services, starts the RPC/web base,
// optionally creates the local proxy, then starts the tablet manager, the heartbeater, the
// metrics snapshotter (if any) and the maintenance manager.
//
// Returns the first error encountered, or OK.
Status TabletServer::Start() {
  CHECK(initted_.load(std::memory_order_acquire));

  AutoInitServiceFlags();

  RETURN_NOT_OK(RegisterServices());
  RETURN_NOT_OK(RpcAndWebServerBase::Start());

  // If enabled, creates a proxy to call this tablet server locally.
  if (FLAGS_enable_direct_local_tablet_server_call) {
    proxy_ = std::make_shared<TabletServerServiceProxy>(proxy_cache_.get(), HostPort());
  }

  RETURN_NOT_OK(tablet_manager_->Start());

  RETURN_NOT_OK(heartbeater_->Start());

  // The snapshotter only exists when the flag was set at Init() time. The flag is re-read here,
  // so also check the pointer: a flag flipped in between must not cause a null dereference.
  if (FLAGS_tserver_enable_metrics_snapshotter && metrics_snapshotter_) {
    RETURN_NOT_OK(metrics_snapshotter_->Start());
  }

  RETURN_NOT_OK(maintenance_manager_->Init());

  google::FlushLogFiles(google::INFO); // Flush the startup messages.

  return Status::OK();
}
419
420
300
void TabletServer::Shutdown() {
421
300
  LOG(INFO) << "TabletServer shutting down...";
422
423
300
  bool expected = true;
424
300
  if (initted_.compare_exchange_strong(expected, false, std::memory_order_acq_rel)) {
425
76
    maintenance_manager_->Shutdown();
426
76
    WARN_NOT_OK(heartbeater_->Stop(), "Failed to stop TS Heartbeat thread");
427
428
76
    if (FLAGS_tserver_enable_metrics_snapshotter) {
429
0
      WARN_NOT_OK(metrics_snapshotter_->Stop(), "Failed to stop TS Metrics Snapshotter thread");
430
0
    }
431
432
76
    {
433
76
      std::lock_guard<simple_spinlock> l(lock_);
434
76
      tablet_server_service_ = nullptr;
435
76
    }
436
76
    tablet_manager_->StartShutdown();
437
76
    RpcAndWebServerBase::Shutdown();
438
76
    tablet_manager_->CompleteShutdown();
439
76
  }
440
441
300
  LOG(INFO) << "TabletServer shut down complete. Bye!";
442
300
}
443
444
396k
// Replaces the cached list of live tablet servers with the list carried by a master heartbeat
// response. Always returns OK.
Status TabletServer::PopulateLiveTServers(const master::TSHeartbeatResponsePB& heartbeat_resp) {
  const auto& reported_tservers = heartbeat_resp.tservers();

  std::lock_guard<simple_spinlock> l(lock_);
  // The list is overwritten every time because it should hold exactly the tservers that are live
  // from the master's perspective.
  // TODO: In the future, we should enhance the logic here to keep track of the information
  // retrieved from the master and compare it with the information stored here. Based on that, we
  // could send only diff updates to CQL clients about whether a node came up or went down.
  live_tservers_.assign(reported_tservers.begin(), reported_tservers.end());
  return Status::OK();
}
454
455
// Copies the cached list of live tablet servers into `*live_tservers`, overwriting its previous
// contents. The copy is made under lock_. Always returns OK.
Status TabletServer::GetLiveTServers(
    std::vector<master::TSInformationPB> *live_tservers) const {
  {
    std::lock_guard<simple_spinlock> l(lock_);
    *live_tservers = live_tservers_;
  }
  return Status::OK();
}
461
462
// Looks up the tablet named in `req` and writes its status into `resp`.
// Returns NotFound when the tablet manager does not know the tablet, OK otherwise.
Status TabletServer::GetTabletStatus(const GetTabletStatusRequestPB* req,
                                     GetTabletStatusResponsePB* resp) const {
  const auto& tablet_id = req->tablet_id();
  VLOG(3) << "GetTabletStatus called for tablet " << tablet_id;

  tablet::TabletPeerPtr peer;
  if (!tablet_manager_->LookupTablet(tablet_id, &peer)) {
    return STATUS(NotFound, "Tablet not found", tablet_id);
  }

  auto* tablet_status = resp->mutable_tablet_status();
  peer->GetTabletStatusPB(tablet_status);
  return Status::OK();
}
472
473
391
bool TabletServer::LeaderAndReady(const TabletId& tablet_id, bool allow_stale) const {
474
391
  tablet::TabletPeerPtr peer;
475
391
  if (!tablet_manager_->LookupTablet(tablet_id, &peer)) {
476
0
    return false;
477
0
  }
478
391
  return peer->LeaderStatus(allow_stale) == consensus::LeaderStatus::LEADER_AND_READY;
479
391
}
480
481
// Intentionally does nothing with the registry in this implementation; always returns OK.
Status TabletServer::SetUniverseKeyRegistry(
    const encryption::UniverseKeyRegistryPB& universe_key_registry) {
  return Status::OK();
}
485
486
5.59k
void TabletServer::set_cluster_uuid(const std::string& cluster_uuid) {
487
5.59k
  std::lock_guard<simple_spinlock> l(lock_);
488
5.59k
  cluster_uuid_ = cluster_uuid;
489
5.59k
}
490
491
0
std::string TabletServer::cluster_uuid() const {
492
0
  std::lock_guard<simple_spinlock> l(lock_);
493
0
  return cluster_uuid_;
494
0
}
495
496
0
// Returns the tablet service pointer, read under lock_. The pointer is nullptr before
// RegisterServices() has run and after Shutdown() has cleared it.
TabletServiceImpl* TabletServer::tablet_server_service() {
  std::lock_guard<simple_spinlock> guard(lock_);
  return tablet_server_service_;
}
500
501
// Builds "http://<host>:<port><path>" for a proxy's web page and stores it in `*url`.
//
// path:           URL path to append, e.g. "/rpcz".
// hostport:       the proxy's bind address, such as '127.0.0.1:5433', '[::1]:5433' or '[::1]'.
// port:           the web server port to use in the URL; it replaces any port in `hostport`.
// http_addr_host: host to substitute when `hostport` is a wildcard address (e.g. 0.0.0.0).
//
// Example: hostport '127.0.0.1:5433' with port 13000 becomes 'http://127.0.0.1:13000<path>'.
// Returns the parse error if `hostport` cannot be parsed, OK otherwise.
Status GetDynamicUrlTile(
  const string& path, const string& hostport, const int port,
  const string& http_addr_host, string* url) {
  HostPort hp;
  RETURN_NOT_OK(hp.ParseString(hostport, port));

  const bool bound_to_wildcard = IsWildcardAddress(hp.host());
  if (bound_to_wildcard) {
    hp.set_host(http_addr_host);
  }
  hp.set_port(port);

  *url = strings::Substitute("http://$0$1", hp.ToString(), path);
  return Status::OK();
}
517
518
0
// Writes the "live ops" icon tiles for this tablet server's home page into `output`: one tile for
// the tserver itself, then one each for the YCQL, YEDIS and YSQL proxies, plus a YSQL
// "all ops" tile.
//
// Proxy URLs are derived from each proxy's bind-address and webserver-port flags; the host of
// this server's first registered HTTP address is substituted when a proxy is bound to a wildcard
// address.
//
// Returns an error if the registration cannot be obtained, if it contains no HTTP address, or if
// a proxy bind address cannot be parsed. Tiles written before the failure remain in `output`.
Status TabletServer::DisplayRpcIcons(std::stringstream* output) {
  ServerRegistrationPB reg;
  RETURN_NOT_OK(GetRegistration(&reg));
  // Indexing an empty repeated field is invalid, so fail cleanly instead.
  if (reg.http_addresses_size() == 0) {
    return STATUS(IllegalState, "No HTTP address registered for this tablet server");
  }
  const string http_addr_host = reg.http_addresses(0).host();

  // Emits one tile that links to `path` on the web server of the proxy described by
  // `bind_address` and `webserver_port`.
  const auto add_proxy_tile = [&](const string& caption, const string& path,
                                  const string& bind_address, int webserver_port) -> Status {
    string url;
    RETURN_NOT_OK(GetDynamicUrlTile(path, bind_address, webserver_port, http_addr_host, &url));
    DisplayIconTile(output, "fa-tasks", caption, url);
    return Status::OK();
  };

  // RPCs in Progress.
  DisplayIconTile(output, "fa-tasks", "TServer Live Ops", "/rpcz");

  // YCQL RPCs in Progress.
  RETURN_NOT_OK(add_proxy_tile(
      "YCQL Live Ops", "/rpcz", FLAGS_cql_proxy_bind_address, FLAGS_cql_proxy_webserver_port));

  // YEDIS RPCs in Progress.
  RETURN_NOT_OK(add_proxy_tile(
      "YEDIS Live Ops", "/rpcz", FLAGS_redis_proxy_bind_address,
      FLAGS_redis_proxy_webserver_port));

  // YSQL RPCs in Progress.
  RETURN_NOT_OK(add_proxy_tile(
      "YSQL Live Ops", "/rpcz", FLAGS_pgsql_proxy_bind_address,
      FLAGS_pgsql_proxy_webserver_port));

  // YSQL All Ops
  RETURN_NOT_OK(add_proxy_tile(
      "YSQL All Ops", "/statements", FLAGS_pgsql_proxy_bind_address,
      FLAGS_pgsql_proxy_webserver_port));

  return Status::OK();
}
554
555
44.2k
// Returns the Env stored in the server options.
Env* TabletServer::GetEnv() {
  Env* const env = opts_.env;
  return env;
}
558
559
5.81k
// Returns the RocksDB Env stored in the server options.
rocksdb::Env* TabletServer::GetRocksDBEnv() {
  rocksdb::Env* const rocksdb_env = opts_.rocksdb_env;
  return rocksdb_env;
}
562
563
235
uint64_t TabletServer::GetSharedMemoryPostgresAuthKey() {
564
235
  return shared_object().postgres_auth_key();
565
235
}
566
567
396k
void TabletServer::SetYSQLCatalogVersion(uint64_t new_version, uint64_t new_breaking_version) {
568
396k
  std::lock_guard<simple_spinlock> l(lock_);
569
570
396k
  if (new_version > ysql_catalog_version_) {
571
5.73k
    ysql_catalog_version_ = new_version;
572
5.73k
    shared_object().SetYSQLCatalogVersion(new_version);
573
5.73k
    ysql_last_breaking_catalog_version_ = new_breaking_version;
574
5.73k
    if (FLAGS_log_ysql_catalog_versions) {
575
0
      LOG_WITH_FUNC(INFO) << "set catalog version: " << new_version << ", breaking version: "
576
0
                          << new_breaking_version;
577
0
    }
578
391k
  } else if (new_version < ysql_catalog_version_) {
579
0
    LOG(DFATAL) << "Ignoring ysql catalog version update: new version too old. "
580
0
                 << "New: " << new_version << ", Old: " << ysql_catalog_version_;
581
0
  }
582
396k
}
583
584
396k
void TabletServer::UpdateTransactionTablesVersion(uint64_t new_version) {
585
396k
  const auto transaction_manager = transaction_manager_.load(std::memory_order_acquire);
586
396k
  if (transaction_manager) {
587
38.8k
    transaction_manager->UpdateTransactionTablesVersion(new_version);
588
38.8k
  }
589
396k
}
590
591
7.31M
// Exposes the tablet manager through its TabletPeerLookupIf interface.
TabletPeerLookupIf* TabletServer::tablet_peer_lookup() {
  TabletPeerLookupIf* const lookup = tablet_manager_.get();
  return lookup;
}
594
595
45.9k
const std::shared_future<client::YBClient*>& TabletServer::client_future() const {
596
45.9k
  return tablet_manager_->client_future();
597
45.9k
}
598
599
103k
// Forwards to DbServerBase::TransactionPool(). The call is explicitly qualified so that it
// resolves to the base-class implementation.
client::TransactionPool* TabletServer::TransactionPool() {
  client::TransactionPool* const pool = DbServerBase::TransactionPool();
  return pool;
}
602
603
658
// Returns a filter bound to this server's tablet manager: invoking the filter calls
// TSTabletManager::PreserveLocalLeadersOnly on that manager with the filter's single argument.
client::LocalTabletFilter TabletServer::CreateLocalTabletFilter() {
  auto filter = std::bind(&TSTabletManager::PreserveLocalLeadersOnly, tablet_manager(), _1);
  return filter;
}
606
607
196k
const std::shared_ptr<MemTracker>& TabletServer::mem_tracker() const {
608
196k
  return RpcServerBase::mem_tracker();
609
196k
}
610
611
181
// Installs `service` as the publisher: it is moved into a newly allocated rpc::Publisher that
// replaces whatever publish_service_ptr_ held before.
void TabletServer::SetPublisher(rpc::Publisher service) {
  auto* const publisher = new rpc::Publisher(std::move(service));
  publish_service_ptr_.reset(publisher);
}
614
615
}  // namespace tserver
616
}  // namespace yb