YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_server.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/tablet_server.h"
34
35
#include <algorithm>
36
#include <limits>
37
#include <list>
38
#include <thread>
39
#include <utility>
40
#include <vector>
41
42
#include <glog/logging.h>
43
44
#include "yb/client/transaction_manager.h"
45
#include "yb/client/universe_key_client.h"
46
47
#include "yb/common/common_flags.h"
48
#include "yb/common/wire_protocol.h"
49
50
#include "yb/encryption/universe_key_manager.h"
51
52
#include "yb/fs/fs_manager.h"
53
54
#include "yb/gutil/strings/substitute.h"
55
56
#include "yb/master/master_heartbeat.pb.h"
57
58
#include "yb/rpc/messenger.h"
59
#include "yb/rpc/service_if.h"
60
#include "yb/rpc/yb_rpc.h"
61
62
#include "yb/server/rpc_server.h"
63
#include "yb/server/webserver.h"
64
65
#include "yb/tablet/maintenance_manager.h"
66
#include "yb/tablet/tablet_peer.h"
67
68
#include "yb/tserver/heartbeater.h"
69
#include "yb/tserver/heartbeater_factory.h"
70
#include "yb/tserver/metrics_snapshotter.h"
71
#include "yb/tserver/pg_client_service.h"
72
#include "yb/tserver/remote_bootstrap_service.h"
73
#include "yb/tserver/tablet_service.h"
74
#include "yb/tserver/ts_tablet_manager.h"
75
#include "yb/tserver/tserver-path-handlers.h"
76
#include "yb/tserver/tserver_service.proxy.h"
77
78
#include "yb/util/flag_tags.h"
79
#include "yb/util/logging.h"
80
#include "yb/util/net/net_util.h"
81
#include "yb/util/net/sockaddr.h"
82
#include "yb/util/random_util.h"
83
#include "yb/util/size_literals.h"
84
#include "yb/util/status.h"
85
#include "yb/util/status_log.h"
86
87
using std::make_shared;
88
using std::shared_ptr;
89
using std::vector;
90
using yb::rpc::ServiceIf;
91
using yb::tablet::TabletPeer;
92
93
using namespace yb::size_literals;
94
using namespace std::placeholders;
95
96
DEFINE_int32(tablet_server_svc_num_threads, -1,
97
             "Number of RPC worker threads for the TS service. If -1, it is auto configured.");
98
TAG_FLAG(tablet_server_svc_num_threads, advanced);
99
100
DEFINE_int32(ts_admin_svc_num_threads, 10,
101
             "Number of RPC worker threads for the TS admin service");
102
TAG_FLAG(ts_admin_svc_num_threads, advanced);
103
104
DEFINE_int32(ts_consensus_svc_num_threads, -1,
105
             "Number of RPC worker threads for the TS consensus service. If -1, it is auto "
106
             "configured.");
107
TAG_FLAG(ts_consensus_svc_num_threads, advanced);
108
109
DEFINE_int32(ts_remote_bootstrap_svc_num_threads, 10,
110
             "Number of RPC worker threads for the TS remote bootstrap service");
111
TAG_FLAG(ts_remote_bootstrap_svc_num_threads, advanced);
112
113
DEFINE_int32(tablet_server_svc_queue_length, yb::tserver::TabletServer::kDefaultSvcQueueLength,
114
             "RPC queue length for the TS service.");
115
TAG_FLAG(tablet_server_svc_queue_length, advanced);
116
117
DEFINE_int32(ts_admin_svc_queue_length, 50,
118
             "RPC queue length for the TS admin service");
119
TAG_FLAG(ts_admin_svc_queue_length, advanced);
120
121
DEFINE_int32(ts_consensus_svc_queue_length, yb::tserver::TabletServer::kDefaultSvcQueueLength,
122
             "RPC queue length for the TS consensus service.");
123
TAG_FLAG(ts_consensus_svc_queue_length, advanced);
124
125
DEFINE_int32(ts_remote_bootstrap_svc_queue_length, 50,
126
             "RPC queue length for the TS remote bootstrap service");
127
TAG_FLAG(ts_remote_bootstrap_svc_queue_length, advanced);
128
129
DEFINE_int32(pg_client_svc_queue_length, yb::tserver::TabletServer::kDefaultSvcQueueLength,
130
             "RPC queue length for the Pg Client service.");
131
TAG_FLAG(pg_client_svc_queue_length, advanced);
132
133
DEFINE_bool(enable_direct_local_tablet_server_call,
134
            true,
135
            "Enable direct call to local tablet server");
136
TAG_FLAG(enable_direct_local_tablet_server_call, advanced);
137
138
DEFINE_string(redis_proxy_bind_address, "", "Address to bind the redis proxy to");
139
DEFINE_int32(redis_proxy_webserver_port, 0, "Webserver port for redis proxy");
140
141
DEFINE_string(cql_proxy_bind_address, "", "Address to bind the CQL proxy to");
142
DEFINE_int32(cql_proxy_webserver_port, 0, "Webserver port for CQL proxy");
143
144
DEFINE_string(pgsql_proxy_bind_address, "", "Address to bind the PostgreSQL proxy to");
145
DECLARE_int32(pgsql_proxy_webserver_port);
146
147
DEFINE_int64(inbound_rpc_memory_limit, 0, "Inbound RPC memory limit");
148
149
DEFINE_bool(start_pgsql_proxy, false,
150
            "Whether to run a PostgreSQL server as a child process of the tablet server");
151
152
DEFINE_bool(tserver_enable_metrics_snapshotter, false, "Should metrics snapshotter be enabled");
153
DECLARE_int32(num_concurrent_backfills_allowed);
154
DECLARE_int32(svc_queue_length_default);
155
156
namespace yb {
157
namespace tserver {
158
159
// Builds a tablet server from `opts`.
//
// The member initializers create the tablet manager, the web path handlers and the maintenance
// manager. The constructor body then installs the connection context factory used for inbound
// RPC connections, which is parameterized by FLAGS_inbound_rpc_memory_limit and this server's
// memory tracker.
TabletServer::TabletServer(const TabletServerOptions& opts)
    : DbServerBase(
          "TabletServer", opts, "yb.tabletserver", server::CreateMemTrackerForServer()),
      fail_heartbeats_for_tests_(false),
      opts_(opts),
      tablet_manager_(new TSTabletManager(fs_manager_.get(), this, metric_registry())),
      path_handlers_(new TabletServerPathHandlers(this)),
      maintenance_manager_(new MaintenanceManager(MaintenanceManager::DEFAULT_OPTIONS)),
      master_config_index_(0),
      tablet_server_service_(nullptr) {
  auto inbound_context_factory =
      rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>(
          FLAGS_inbound_rpc_memory_limit, mem_tracker());
  SetConnectionContextFactory(std::move(inbound_context_factory));

  // Log the object addresses so they can be correlated with later log lines.
  LOG(INFO) << "yb::tserver::TabletServer created at " << this;
  LOG(INFO) << "yb::tserver::TSTabletManager created at " << tablet_manager_.get();
}
175
176
166
// Destroying the server shuts it down. Shutdown() only tears components down while initted_ is
// still set, so an earlier explicit Shutdown() makes this call a no-op apart from logging.
TabletServer::~TabletServer() {
  Shutdown();
}
179
180
0
std::string TabletServer::ToString() const {
181
0
  return strings::Substitute("TabletServer : rpc=$0, uuid=$1",
182
0
                             yb::ToString(first_rpc_address()),
183
0
                             fs_manager_->uuid());
184
0
}
185
186
9.27k
// Checks that the master addresses configured in opts_ can be resolved, converting the result of
// server::ResolveMasterAddresses into a plain Status.
Status TabletServer::ValidateMasterAddressResolution() const {
  const auto master_addresses = opts_.GetMasterAddresses();
  return ResultToStatus(server::ResolveMasterAddresses(*master_addresses));
}
189
190
// Replaces the master address list used by this tablet server with one derived from
// `new_config`, then hands the new list to both opts_ and the heartbeater.
//
// - From the master leader (`is_master_leader` is true): the current master config index is set
//   to new_config.opid_index() and the new list consists of exactly the peers in `new_config`.
// - From a follower: the currently configured addresses are kept, and every peer of `new_config`
//   whose sorted address list is not already present is appended.
//
// Always returns Status::OK().
Status TabletServer::UpdateMasterAddresses(const consensus::RaftConfigPB& new_config,
                                           bool is_master_leader) {
  // Collects a peer's private addresses followed by its broadcast addresses.
  const auto collect_peer_addresses = [](const auto& peer) {
    std::vector<HostPort> addresses;
    for (const auto& hp : peer.last_known_private_addr()) {
      addresses.push_back(HostPortFromPB(hp));
    }
    for (const auto& hp : peer.last_known_broadcast_addr()) {
      addresses.push_back(HostPortFromPB(hp));
    }
    return addresses;
  };

  shared_ptr<server::MasterAddresses> new_master_addresses;
  if (is_master_leader) {
    // The index only needs to be set once per update.
    SetCurrentMasterIndex(new_config.opid_index());
    new_master_addresses = make_shared<server::MasterAddresses>();

    for (const auto& peer : new_config.peers()) {
      new_master_addresses->push_back(collect_peer_addresses(peer));
    }
  } else {
    new_master_addresses = make_shared<server::MasterAddresses>(*opts_.GetMasterAddresses());

    // Sort every existing entry so that it can be compared against the sorted peer lists below.
    for (auto& list : *new_master_addresses) {
      std::sort(list.begin(), list.end());
    }

    for (const auto& peer : new_config.peers()) {
      auto list = collect_peer_addresses(peer);
      std::sort(list.begin(), list.end());

      const bool already_known =
          std::find(new_master_addresses->begin(), new_master_addresses->end(), list) !=
          new_master_addresses->end();
      if (!already_known) {
        new_master_addresses->push_back(std::move(list));
      }
    }
  }

  LOG(INFO) << "Got new list of " << new_config.peers_size() << " masters at index "
            << new_config.opid_index() << " old masters = "
            << yb::ToString(opts_.GetMasterAddresses())
            << " new masters = " << yb::ToString(new_master_addresses) << " from "
            << (is_master_leader ? "leader." : "follower.");

  opts_.SetMasterAddresses(new_master_addresses);

  heartbeater_->set_master_addresses(new_master_addresses);

  return Status::OK();
}
250
251
0
// Forwards `universe_keys` to the universe key manager held in opts_.
void TabletServer::SetUniverseKeys(const encryption::UniverseKeysPB& universe_keys) {
  auto& key_manager = *opts_.universe_key_manager;
  key_manager.SetUniverseKeys(universe_keys);
}
254
255
0
void TabletServer::GetUniverseKeyRegistrySync() {
256
0
  universe_key_client_->GetUniverseKeyRegistrySync();
257
0
}
258
259
9.27k
// Initializes the tablet server. Must be called exactly once (CHECK-fails if the server is
// already initialized).
//
// In order: validates master address resolution, initializes the RPC/web server base, optionally
// creates the local tablet server proxy, registers the web path handlers, creates the heartbeater
// and (if enabled by flag) the metrics snapshotter, wires up the universe key client and manager,
// initializes the tablet manager, marks the server as initialized, publishes the host endpoint
// and a random Postgres auth key to the shared object, and parses the PostgreSQL proxy bind
// address.
//
// Returns the first non-OK status produced by any of these steps, or OK.
Status TabletServer::Init() {
  CHECK(!initted_.load(std::memory_order_acquire));

  // Validate that the passed master address actually resolves.
  // We don't validate that we can connect at this point -- it should
  // be allowed to start the TS and the master in whichever order --
  // our heartbeat thread will loop until successfully connecting.
  RETURN_NOT_OK(ValidateMasterAddressResolution());

  RETURN_NOT_OK(RpcAndWebServerBase::Init());

  // If enabled, creates a proxy to call this tablet server locally.
  if (FLAGS_enable_direct_local_tablet_server_call) {
    proxy_ = std::make_shared<TabletServerServiceProxy>(proxy_cache_.get(), HostPort());
  }

  RETURN_NOT_OK(path_handlers_->Register(web_server_.get()));

  log_prefix_ = Format("P $0: ", permanent_uuid());

  heartbeater_ = CreateHeartbeater(opts_, this);

  if (FLAGS_tserver_enable_metrics_snapshotter) {
    metrics_snapshotter_.reset(new MetricsSnapshotter(opts_, this));
  }

  // Flatten all configured master addresses into a single list for the universe key client.
  // The handle returned by GetMasterAddresses() is kept in a local for the duration of the loop
  // instead of dereferencing a temporary in the range-for header.
  const auto master_addresses = opts_.GetMasterAddresses();
  std::vector<HostPort> hps;
  for (const auto& master_addr_vector : *master_addresses) {
    hps.insert(hps.end(), master_addr_vector.begin(), master_addr_vector.end());
  }

  // Both callbacks below are stored and invoked later; they only touch members of this object,
  // so capture `this` explicitly rather than everything by reference.
  universe_key_client_ = std::make_unique<client::UniverseKeyClient>(
      hps, proxy_cache_.get(), [this](const encryption::UniverseKeysPB& universe_keys) {
        opts_.universe_key_manager->SetUniverseKeys(universe_keys);
      });
  opts_.universe_key_manager->SetGetUniverseKeysCallback([this]() {
    universe_key_client_->GetUniverseKeyRegistrySync();
  });
  RETURN_NOT_OK_PREPEND(tablet_manager_->Init(),
                        "Could not init Tablet Manager");

  initted_.store(true, std::memory_order_release);

  auto bound_addresses = rpc_server()->GetBoundAddresses();
  if (!bound_addresses.empty()) {
    ServerRegistrationPB reg;
    RETURN_NOT_OK(GetRegistration(&reg, server::RpcOnly::kTrue));
    shared_object().SetHostEndpoint(bound_addresses.front(), PublicHostPort(reg).host());
  }

  // 5433 is kDefaultPort in src/yb/yql/pgwrapper/pg_wrapper.h.
  RETURN_NOT_OK(pgsql_proxy_bind_address_.ParseString(FLAGS_pgsql_proxy_bind_address, 5433));
  shared_object().SetPostgresAuthKey(RandomUniformInt<uint64_t>());

  return Status::OK();
}
317
318
26.9k
// Fills `reg` via RpcAndWebServerBase::GetRegistration and then adds the port of the PostgreSQL
// proxy bind address. Returns the base class error, if any, without touching the pg port.
Status TabletServer::GetRegistration(ServerRegistrationPB* reg, server::RpcOnly rpc_only) const {
  RETURN_NOT_OK(RpcAndWebServerBase::GetRegistration(reg, rpc_only));

  const auto pg_port = pgsql_proxy_bind_address().port();
  reg->set_pg_port(pg_port);
  return Status::OK();
}
323
324
14
// Returns the result of the tablet manager's WaitForAllBootstrapsToFinish().
Status TabletServer::WaitInited() {
  auto& tablet_manager = *tablet_manager_;
  return tablet_manager.WaitForAllBootstrapsToFinish();
}
327
328
8.74k
void TabletServer::AutoInitServiceFlags() {
329
8.74k
  const int32 num_cores = base::NumCPUs();
330
331
8.74k
  if (FLAGS_tablet_server_svc_num_threads == -1) {
332
    // Auto select number of threads for the TS service based on number of cores.
333
    // But bound it between 64 & 512.
334
6.43k
    const int32 num_threads = std::min(512, num_cores * 32);
335
6.43k
    FLAGS_tablet_server_svc_num_threads = std::max(64, num_threads);
336
6.43k
    LOG(INFO) << "Auto setting FLAGS_tablet_server_svc_num_threads to "
337
6.43k
              << FLAGS_tablet_server_svc_num_threads;
338
6.43k
  }
339
340
8.74k
  if (FLAGS_num_concurrent_backfills_allowed == -1) {
341
8.66k
    const int32 num_threads = std::min(8, num_cores / 2);
342
8.66k
    FLAGS_num_concurrent_backfills_allowed = std::max(1, num_threads);
343
8.66k
    LOG(INFO) << "Auto setting FLAGS_num_concurrent_backfills_allowed to "
344
8.66k
              << FLAGS_num_concurrent_backfills_allowed;
345
8.66k
  }
346
347
8.74k
  if (FLAGS_ts_consensus_svc_num_threads == -1) {
348
    // Auto select number of threads for the TS service based on number of cores.
349
    // But bound it between 64 & 512.
350
6.43k
    const int32 num_threads = std::min(512, num_cores * 32);
351
6.43k
    FLAGS_ts_consensus_svc_num_threads = std::max(64, num_threads);
352
6.43k
    LOG(INFO) << "Auto setting FLAGS_ts_consensus_svc_num_threads to "
353
6.43k
              << FLAGS_ts_consensus_svc_num_threads;
354
6.43k
  }
355
8.74k
}
356
357
8.74k
// Creates and registers the RPC services hosted by the tablet server: tablet service, admin
// service, consensus service (registered with high priority), remote bootstrap service, forward
// service and Pg client service. Returns the first registration error, or OK.
Status TabletServer::RegisterServices() {
  // Tablet service. A raw pointer is kept in tablet_server_service_ (it is also passed to the
  // forward service below), while ownership is handed to RegisterService.
  tablet_server_service_ = new TabletServiceImpl(this);
  LOG(INFO) << "yb::tserver::TabletServiceImpl created at " << tablet_server_service_;
  std::unique_ptr<ServiceIf> ts_service(tablet_server_service_);
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_tablet_server_svc_queue_length, std::move(ts_service)));

  // Admin service.
  std::unique_ptr<ServiceIf> admin_service = std::make_unique<TabletServiceAdminImpl>(this);
  LOG(INFO) << "yb::tserver::TabletServiceAdminImpl created at " << admin_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_ts_admin_svc_queue_length, std::move(admin_service)));

  // Consensus service.
  std::unique_ptr<ServiceIf> consensus_service =
      std::make_unique<ConsensusServiceImpl>(metric_entity(), tablet_manager_.get());
  LOG(INFO) << "yb::tserver::ConsensusServiceImpl created at " << consensus_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_ts_consensus_svc_queue_length,
      std::move(consensus_service),
      rpc::ServicePriority::kHigh));

  // Remote bootstrap service.
  std::unique_ptr<ServiceIf> remote_bootstrap_service =
      std::make_unique<RemoteBootstrapServiceImpl>(
          fs_manager_.get(), tablet_manager_.get(), metric_entity());
  LOG(INFO) << "yb::tserver::RemoteBootstrapServiceImpl created at " <<
    remote_bootstrap_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_ts_remote_bootstrap_svc_queue_length, std::move(remote_bootstrap_service)));

  // Forward service; shares the queue length flag of the tablet service.
  std::unique_ptr<ServiceIf> forward_service =
      std::make_unique<TabletServerForwardServiceImpl>(tablet_server_service_, this);
  LOG(INFO) << "yb::tserver::ForwardServiceImpl created at " << forward_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_tablet_server_svc_queue_length, std::move(forward_service)));

  // Pg client service.
  auto pg_client_service = std::make_unique<PgClientServiceImpl>(
      tablet_manager_->client_future(),
      clock(),
      std::bind(&TabletServer::TransactionPool, this),
      metric_entity(),
      &messenger()->scheduler());
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_pg_client_svc_queue_length, std::move(pg_client_service)));

  return Status::OK();
}
401
402
8.74k
// Starts the tablet server. Init() must have completed first (CHECK-fails otherwise).
//
// Auto-configures thread-count flags, registers the RPC services, then starts the RPC/web server
// base, the tablet manager, the heartbeater, the metrics snapshotter (when enabled) and the
// maintenance manager, in that order. Returns the first non-OK status, or OK.
Status TabletServer::Start() {
  CHECK(initted_.load(std::memory_order_acquire));

  AutoInitServiceFlags();

  RETURN_NOT_OK(RegisterServices());
  RETURN_NOT_OK(RpcAndWebServerBase::Start());

  RETURN_NOT_OK(tablet_manager_->Start());

  RETURN_NOT_OK(heartbeater_->Start());

  // The snapshotter object only exists if the flag was already set when Init() ran, so check the
  // pointer as well as the flag before dereferencing it.
  if (FLAGS_tserver_enable_metrics_snapshotter && metrics_snapshotter_) {
    RETURN_NOT_OK(metrics_snapshotter_->Start());
  }

  RETURN_NOT_OK(maintenance_manager_->Init());

  google::FlushLogFiles(google::INFO); // Flush the startup messages.

  return Status::OK();
}
424
425
525
void TabletServer::Shutdown() {
426
525
  LOG(INFO) << "TabletServer shutting down...";
427
428
525
  bool expected = true;
429
525
  if (initted_.compare_exchange_strong(expected, false, std::memory_order_acq_rel)) {
430
195
    maintenance_manager_->Shutdown();
431
195
    WARN_NOT_OK(heartbeater_->Stop(), "Failed to stop TS Heartbeat thread");
432
433
195
    if (FLAGS_tserver_enable_metrics_snapshotter) {
434
1
      WARN_NOT_OK(metrics_snapshotter_->Stop(), "Failed to stop TS Metrics Snapshotter thread");
435
1
    }
436
437
195
    {
438
195
      std::lock_guard<simple_spinlock> l(lock_);
439
195
      tablet_server_service_ = nullptr;
440
195
    }
441
195
    tablet_manager_->StartShutdown();
442
195
    RpcAndWebServerBase::Shutdown();
443
195
    tablet_manager_->CompleteShutdown();
444
195
  }
445
446
525
  LOG(INFO) << "TabletServer shut down complete. Bye!";
447
525
}
448
449
4.82M
// Replaces the cached list of live tablet servers with the list carried by `heartbeat_resp`.
// Always returns OK.
Status TabletServer::PopulateLiveTServers(const master::TSHeartbeatResponsePB& heartbeat_resp) {
  const auto& tservers = heartbeat_resp.tservers();

  std::lock_guard<simple_spinlock> l(lock_);
  // We reset the list each time, since we want to keep the tservers that are live from the
  // master's perspective.
  // TODO: In the future, we should enhance the logic here to keep track information retrieved
  // from the master and compare it with information stored here. Based on this information, we
  // can only send diff updates CQL clients about whether a node came up or went down.
  live_tservers_.assign(tservers.begin(), tservers.end());
  return Status::OK();
}
459
460
// Copies the cached list of live tablet servers into `*live_tservers` while holding lock_.
// Always returns OK.
Status TabletServer::GetLiveTServers(
    std::vector<master::TSInformationPB> *live_tservers) const {
  {
    std::lock_guard<simple_spinlock> l(lock_);
    *live_tservers = live_tservers_;
  }
  return Status::OK();
}
466
467
// Looks up the tablet named in `req` and writes its status into `resp`.
// Returns NotFound if the tablet manager does not know the tablet, OK otherwise.
Status TabletServer::GetTabletStatus(const GetTabletStatusRequestPB* req,
                                     GetTabletStatusResponsePB* resp) const {
  const auto& tablet_id = req->tablet_id();
  VLOG(3) << "GetTabletStatus called for tablet " << tablet_id;

  tablet::TabletPeerPtr peer;
  if (!tablet_manager_->LookupTablet(tablet_id, &peer)) {
    return STATUS(NotFound, "Tablet not found", tablet_id);
  }

  auto* tablet_status = resp->mutable_tablet_status();
  peer->GetTabletStatusPB(tablet_status);
  return Status::OK();
}
477
478
467
bool TabletServer::LeaderAndReady(const TabletId& tablet_id, bool allow_stale) const {
479
467
  tablet::TabletPeerPtr peer;
480
467
  if (!tablet_manager_->LookupTablet(tablet_id, &peer)) {
481
0
    return false;
482
0
  }
483
467
  return peer->LeaderStatus(allow_stale) == consensus::LeaderStatus::LEADER_AND_READY;
484
467
}
485
486
// Currently a no-op: the registry argument is ignored and OK is returned.
Status TabletServer::SetUniverseKeyRegistry(
    const encryption::UniverseKeyRegistryPB& universe_key_registry) {
  return Status::OK();
}
490
491
8.16k
void TabletServer::set_cluster_uuid(const std::string& cluster_uuid) {
492
8.16k
  std::lock_guard<simple_spinlock> l(lock_);
493
8.16k
  cluster_uuid_ = cluster_uuid;
494
8.16k
}
495
496
0
std::string TabletServer::cluster_uuid() const {
497
0
  std::lock_guard<simple_spinlock> l(lock_);
498
0
  return cluster_uuid_;
499
0
}
500
501
0
// Returns the cached tablet service pointer, read while holding lock_. The pointer is set in
// RegisterServices() and reset to nullptr by Shutdown().
TabletServiceImpl* TabletServer::tablet_server_service() {
  std::lock_guard<simple_spinlock> guard(lock_);
  return tablet_server_service_;
}
505
506
// Builds the URL "http://<host>:<port><path>" into `*url`.
//
// The host is taken from `hostport`, unless that host is a wildcard address, in which case
// `http_addr_host` is used. The port of the result is always `port`. Returns the parse error if
// `hostport` cannot be parsed, OK otherwise.
Status GetDynamicUrlTile(
  const string& path, const string& hostport, const int port,
  const string& http_addr_host, string* url) {
  // We get an incoming hostport string like '127.0.0.1:5433' or '[::1]:5433' or [::1]
  // and a port 13000 which has to be converted to '127.0.0.1:13000'. If the hostport is
  // a wildcard - 0.0.0.0 - the URLs are formed based on the http address for web instead
  // of the wildcard host.
  HostPort endpoint;
  RETURN_NOT_OK(endpoint.ParseString(hostport, port));

  const bool host_is_wildcard = IsWildcardAddress(endpoint.host());
  if (host_is_wildcard) {
    endpoint.set_host(http_addr_host);
  }
  // Whatever port `hostport` carried is replaced by the requested one.
  endpoint.set_port(port);

  *url = strings::Substitute("http://$0$1", endpoint.ToString(), path);
  return Status::OK();
}
522
523
0
// Writes the icon tiles linking to the live-operations pages into `output`: one tile for the
// tablet server itself, followed by tiles for the YCQL, YEDIS and YSQL proxies. Proxy URLs are
// derived from the respective bind address and webserver port flags, using the host of this
// server's first HTTP address. Returns the first error encountered, or OK.
Status TabletServer::DisplayRpcIcons(std::stringstream* output) {
  ServerRegistrationPB reg;
  RETURN_NOT_OK(GetRegistration(&reg));
  string http_addr_host = reg.http_addresses(0).host();

  // Resolves the URL of one proxy page and emits its tile.
  const auto add_proxy_tile = [&](const char* title, const char* path,
                                  const string& bind_address, const int port) -> Status {
    string tile_url;
    RETURN_NOT_OK(GetDynamicUrlTile(path, bind_address, port, http_addr_host, &tile_url));
    DisplayIconTile(output, "fa-tasks", title, tile_url);
    return Status::OK();
  };

  // RPCs in Progress.
  DisplayIconTile(output, "fa-tasks", "TServer Live Ops", "/rpcz");

  // YCQL RPCs in Progress.
  RETURN_NOT_OK(add_proxy_tile(
      "YCQL Live Ops", "/rpcz", FLAGS_cql_proxy_bind_address, FLAGS_cql_proxy_webserver_port));

  // YEDIS RPCs in Progress.
  RETURN_NOT_OK(add_proxy_tile(
      "YEDIS Live Ops", "/rpcz", FLAGS_redis_proxy_bind_address,
      FLAGS_redis_proxy_webserver_port));

  // YSQL RPCs in Progress.
  RETURN_NOT_OK(add_proxy_tile(
      "YSQL Live Ops", "/rpcz", FLAGS_pgsql_proxy_bind_address,
      FLAGS_pgsql_proxy_webserver_port));

  // YSQL All Ops
  RETURN_NOT_OK(add_proxy_tile(
      "YSQL All Ops", "/statements", FLAGS_pgsql_proxy_bind_address,
      FLAGS_pgsql_proxy_webserver_port));

  return Status::OK();
}
559
560
1.09M
// Returns the Env pointer stored in the server options.
Env* TabletServer::GetEnv() {
  Env* const env = opts_.env;
  return env;
}
563
564
8.74k
// Returns the RocksDB Env pointer stored in the server options.
rocksdb::Env* TabletServer::GetRocksDBEnv() {
  rocksdb::Env* const rocksdb_env = opts_.rocksdb_env;
  return rocksdb_env;
}
567
568
1.96k
uint64_t TabletServer::GetSharedMemoryPostgresAuthKey() {
569
1.96k
  return shared_object().postgres_auth_key();
570
1.96k
}
571
572
4.82M
void TabletServer::SetYSQLCatalogVersion(uint64_t new_version, uint64_t new_breaking_version) {
573
4.82M
  std::lock_guard<simple_spinlock> l(lock_);
574
575
4.82M
  if (new_version > ysql_catalog_version_) {
576
19.3k
    ysql_catalog_version_ = new_version;
577
19.3k
    shared_object().SetYSQLCatalogVersion(new_version);
578
19.3k
    ysql_last_breaking_catalog_version_ = new_breaking_version;
579
19.3k
    if (FLAGS_log_ysql_catalog_versions) {
580
0
      LOG_WITH_FUNC(INFO) << "set catalog version: " << new_version << ", breaking version: "
581
0
                          << new_breaking_version;
582
0
    }
583
4.80M
  } else if (new_version < ysql_catalog_version_) {
584
0
    LOG(DFATAL) << "Ignoring ysql catalog version update: new version too old. "
585
0
                 << "New: " << new_version << ", Old: " << ysql_catalog_version_;
586
0
  }
587
4.82M
}
588
589
4.82M
void TabletServer::UpdateTransactionTablesVersion(uint64_t new_version) {
590
4.82M
  const auto transaction_manager = transaction_manager_.load(std::memory_order_acquire);
591
4.82M
  if (transaction_manager) {
592
88.5k
    transaction_manager->UpdateTransactionTablesVersion(new_version);
593
88.5k
  }
594
4.82M
}
595
596
13.6M
// Exposes the tablet manager through the TabletPeerLookupIf interface.
TabletPeerLookupIf* TabletServer::tablet_peer_lookup() {
  TabletPeerLookupIf* const lookup = tablet_manager_.get();
  return lookup;
}
599
600
75.0k
const std::shared_future<client::YBClient*>& TabletServer::client_future() const {
601
75.0k
  return tablet_manager_->client_future();
602
75.0k
}
603
604
287k
// Forwards to DbServerBase::TransactionPool().
client::TransactionPool* TabletServer::TransactionPool() {
  client::TransactionPool* const pool = DbServerBase::TransactionPool();
  return pool;
}
607
608
1.04k
// Returns a filter that calls TSTabletManager::PreserveLocalLeadersOnly on this server's tablet
// manager with the single argument it is invoked with.
client::LocalTabletFilter TabletServer::CreateLocalTabletFilter() {
  auto* const manager = tablet_manager();
  return std::bind(&TSTabletManager::PreserveLocalLeadersOnly, manager, _1);
}
611
612
329k
const std::shared_ptr<MemTracker>& TabletServer::mem_tracker() const {
613
329k
  return RpcServerBase::mem_tracker();
614
329k
}
615
616
572
// Stores a heap-allocated copy of `service` (moved from the argument) in publish_service_ptr_,
// replacing whatever was held before.
void TabletServer::SetPublisher(rpc::Publisher service) {
  auto* const publisher = new rpc::Publisher(std::move(service));
  publish_service_ptr_.reset(publisher);
}
619
620
}  // namespace tserver
621
}  // namespace yb