YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/server/server_base.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/server/server_base.h"
34
35
#include <algorithm>
36
#include <string>
37
#include <thread>
38
#include <vector>
39
40
#include <boost/algorithm/string/predicate.hpp>
41
42
#include "yb/common/wire_protocol.h"
43
44
#include "yb/encryption/encryption_util.h"
45
46
#include "yb/fs/fs_manager.h"
47
48
#include "yb/gutil/strings/strcat.h"
49
#include "yb/gutil/sysinfo.h"
50
#include "yb/gutil/walltime.h"
51
52
#include "yb/rpc/messenger.h"
53
#include "yb/rpc/proxy.h"
54
55
#include "yb/server/default-path-handlers.h"
56
#include "yb/server/generic_service.h"
57
#include "yb/server/glog_metrics.h"
58
#include "yb/server/hybrid_clock.h"
59
#include "yb/server/logical_clock.h"
60
#include "yb/server/rpc_server.h"
61
#include "yb/server/rpcz-path-handler.h"
62
#include "yb/server/server_base.pb.h"
63
#include "yb/server/server_base_options.h"
64
#include "yb/server/tcmalloc_metrics.h"
65
#include "yb/server/tracing-path-handlers.h"
66
#include "yb/server/webserver.h"
67
68
#include "yb/util/atomic.h"
69
#include "yb/util/concurrent_value.h"
70
#include "yb/util/env.h"
71
#include "yb/util/flag_tags.h"
72
#include "yb/util/jsonwriter.h"
73
#include "yb/util/mem_tracker.h"
74
#include "yb/util/metrics.h"
75
#include "yb/util/monotime.h"
76
#include "yb/util/net/net_util.h"
77
#include "yb/util/net/sockaddr.h"
78
#include "yb/util/pb_util.h"
79
#include "yb/util/rolling_log.h"
80
#include "yb/util/spinlock_profiling.h"
81
#include "yb/util/status.h"
82
#include "yb/util/status_log.h"
83
#include "yb/util/timestamp.h"
84
#include "yb/util/thread.h"
85
#include "yb/util/version_info.h"
86
87
DEFINE_int32(num_reactor_threads, -1,
88
             "Number of libev reactor threads to start. If -1, the value is automatically set.");
89
TAG_FLAG(num_reactor_threads, advanced);
90
91
DECLARE_bool(use_hybrid_clock);
92
93
DEFINE_int32(generic_svc_num_threads, 10,
94
             "Number of RPC worker threads to run for the generic service");
95
TAG_FLAG(generic_svc_num_threads, advanced);
96
97
DEFINE_int32(generic_svc_queue_length, 50,
98
             "RPC Queue length for the generic service");
99
TAG_FLAG(generic_svc_queue_length, advanced);
100
101
DEFINE_string(yb_test_name, "",
102
              "Specifies test name this daemon is running as part of.");
103
104
DEFINE_bool(TEST_check_broadcast_address, true, "Break connectivity in test mini cluster to "
105
            "check broadcast address.");
106
107
DEFINE_test_flag(string, public_hostname_suffix, ".ip.yugabyte", "Suffix for public hostnames.");
108
109
DEFINE_test_flag(bool, simulate_port_conflict_error, false,
110
                 "Simulate a port conflict error during initialization.");
111
112
DEFINE_test_flag(int32, nodes_per_cloud, 2,
113
                 "Number of nodes per cloud to test private and public addresses.");
114
115
METRIC_DEFINE_lag(server, server_uptime_ms,
116
                  "Server uptime",
117
                  "The amount of time a server has been up for.");
118
119
using namespace std::literals;
120
using namespace std::placeholders;
121
122
using std::shared_ptr;
123
using std::string;
124
using std::stringstream;
125
using std::vector;
126
using strings::Substitute;
127
128
namespace yb {
129
namespace server {
130
131
namespace {
132
133
// Disambiguates between servers when in a minicluster.
134
// Counter used to derive per-server mem tracker names. It starts at -1 and is incremented by
// CreateMemTrackerForServer() before use; an id of 0 keeps the bare tracker name.
AtomicInt<int32_t> mem_tracker_id_counter(-1);
135
136
// Base name of the per-server mem tracker. It is only ever read, so it is declared const to
// match its constant-style name and to prevent accidental modification.
const std::string kServerMemTrackerName = "server";
137
138
struct CommonMemTrackers {
139
  std::vector<MemTrackerPtr> trackers;
140
141
0
  ~CommonMemTrackers() {
142
#if defined(TCMALLOC_ENABLED)
143
    // Prevent root mem tracker from accessing common mem trackers.
144
    auto root = MemTracker::GetRootTracker();
145
    root->SetPollChildrenConsumptionFunctors(nullptr);
146
#endif
147
0
  }
148
};
149
150
// Process-wide holder for the common mem trackers. Empty until the RpcServerBase constructor
// creates it (only done when TCMALLOC_ENABLED is defined).
std::unique_ptr<CommonMemTrackers> common_mem_trackers;
151
152
} // anonymous namespace
153
154
17.3k
std::shared_ptr<MemTracker> CreateMemTrackerForServer() {
155
17.3k
  int32_t id = mem_tracker_id_counter.Increment();
156
17.3k
  std::string id_str = kServerMemTrackerName;
157
17.3k
  if (id != 0) {
158
1.58k
    StrAppend(&id_str, " ", id);
159
1.58k
  }
160
17.3k
  return MemTracker::CreateTracker(id_str);
161
17.3k
}
162
163
#if defined(TCMALLOC_ENABLED)
164
void RegisterTCMallocTracker(const char* name, const char* prop) {
165
  common_mem_trackers->trackers.push_back(MemTracker::CreateTracker(
166
      -1, "TCMalloc "s + name, std::bind(&MemTracker::GetTCMallocProperty, prop)));
167
}
168
#endif
169
170
// Constructs the server base: attaches the mem tracker to the server metric entity, registers
// the TCMalloc mem trackers when the tracker carries the bare server name (TCMalloc builds only),
// and selects the clock: the caller-supplied one, a HybridClock, or a LogicalClock.
RpcServerBase::RpcServerBase(string name, const ServerBaseOptions& options,
                             const string& metric_namespace,
                             MemTrackerPtr mem_tracker,
                             const scoped_refptr<server::Clock>& clock)
    : name_(std::move(name)),
      mem_tracker_(std::move(mem_tracker)),
      metric_registry_(new MetricRegistry()),
      metric_entity_(METRIC_ENTITY_server.Instantiate(metric_registry_.get(), metric_namespace)),
      options_(options),
      initialized_(false),
      stop_metrics_logging_latch_(1) {
  mem_tracker_->SetMetricEntity(metric_entity_);

#if defined(TCMALLOC_ENABLED)
  // The tracker of the first server has the bare server name; that is the moment to register the
  // mem trackers that report tcmalloc status.
  const bool is_first_server = mem_tracker_->id() == kServerMemTrackerName;
  if (is_first_server) {
    common_mem_trackers = std::make_unique<CommonMemTrackers>();

    RegisterTCMallocTracker("Thread Cache", "tcmalloc.thread_cache_free_bytes");
    RegisterTCMallocTracker("Central Cache", "tcmalloc.central_cache_free_bytes");
    RegisterTCMallocTracker("Transfer Cache", "tcmalloc.transfer_cache_free_bytes");
    RegisterTCMallocTracker("PageHeap Free", "tcmalloc.pageheap_free_bytes");

    // Have the root tracker refresh the consumption of every common tracker when it polls.
    MemTracker::GetRootTracker()->SetPollChildrenConsumptionFunctors([]() {
          for (auto& common_tracker : common_mem_trackers->trackers) {
            common_tracker->UpdateConsumption();
          }
        });
  }
#endif

  if (!clock) {
    // No clock supplied by the caller: create one according to the use_hybrid_clock flag.
    if (FLAGS_use_hybrid_clock) {
      clock_ = new HybridClock();
    } else {
      clock_ = LogicalClock::CreateStartingAt(HybridTime::kInitial);
    }
  } else {
    clock_ = clock;
    external_clock_ = true;
  }
}
212
213
void RpcServerBase::SetConnectionContextFactory(
214
26.4k
    rpc::ConnectionContextFactoryPtr connection_context_factory) {
215
26.4k
  rpc_server_.reset(new RpcServer(name_, options_.rpc_opts, std::move(connection_context_factory)));
216
26.4k
}
217
218
249
// Shuts the server down, releases the RPC server and then the messenger, and finally detaches
// the mem tracker from its parent if it has one.
RpcServerBase::~RpcServerBase() {
  Shutdown();

  rpc_server_.reset();
  messenger_.reset();

  if (mem_tracker_->parent() != nullptr) {
    mem_tracker_->UnregisterFromParent();
  }
}
226
227
7.94k
const std::vector<Endpoint>& RpcServerBase::rpc_addresses() const {
228
7.94k
  return rpc_server_->GetBoundAddresses();
229
7.94k
}
230
231
187k
// Returns the first address the RPC server is bound to. CHECK-fails with "Not bound" if the RPC
// server has no bound addresses.
Endpoint RpcServerBase::first_rpc_address() const {
  const auto& bound_addresses = rpc_server_->GetBoundAddresses();
  CHECK(!bound_addresses.empty()) << "Not bound";
  return bound_addresses.front();
}
236
237
8.04k
const std::string RpcServerBase::get_hostname() const {
238
8.04k
  auto hostname = GetHostname();
239
8.04k
  if (hostname.ok()) {
240
8.04k
    
YB_LOG_FIRST_N7.97k
(INFO, 1) << "Running on host: " << *hostname7.97k
;
241
8.04k
    return *hostname;
242
8.04k
  } else {
243
0
    YB_LOG_FIRST_N(WARNING, 1) << "Failed to get current host name: " << hostname.status();
244
0
    return "unknown_hostname";
245
0
  }
246
8.04k
}
247
248
67.0M
// Returns the node instance descriptor. The stored pointer is passed through DCHECK_NOTNULL
// before being dereferenced.
const NodeInstancePB& RpcServerBase::instance_pb() const {
  const NodeInstancePB* instance = DCHECK_NOTNULL(instance_pb_.get());
  return *instance;
}
251
252
26.3k
// Configures `builder` with the reactor count, this server's metric entity and the connection
// keepalive time from the RPC options. When num_reactor_threads is -1 the flag itself is
// overwritten with min(16, number of CPUs) first.
Status RpcServerBase::SetupMessengerBuilder(rpc::MessengerBuilder* builder) {
  const bool auto_select_reactors = FLAGS_num_reactor_threads == -1;
  if (auto_select_reactors) {
    // Derive the reactor count from the number of cores, capped at 16.
    const int cpu_count = static_cast<int>(base::NumCPUs());
    FLAGS_num_reactor_threads = std::min(16, cpu_count);
    LOG(INFO) << "Auto setting FLAGS_num_reactor_threads to " << FLAGS_num_reactor_threads;
  }

  builder->set_num_reactors(FLAGS_num_reactor_threads);
  builder->set_metric_entity(metric_entity());
  builder->set_connection_keepalive_time(options_.rpc_opts.connection_keepalive_time_ms * 1ms);

  return Status::OK();
}
266
267
26.3k
// Initializes the server. Must be called at most once (CHECK-enforced). In order: registers
// glog, tcmalloc and spinlock metrics, installs the stack trace signal, initializes the clock
// unless it was supplied externally, builds the messenger and proxy cache, initializes and binds
// the RPC server, and starts metrics logging.
Status RpcServerBase::Init() {
  CHECK(!initialized_);

  // Process-level metrics and profiling.
  glog_metrics_.reset(new ScopedGLogMetrics(metric_entity_));
  tcmalloc::RegisterMetrics(metric_entity_);
  RegisterSpinLockContentionMetrics(metric_entity_);
  InitSpinLockContentionProfiling();

  RETURN_NOT_OK(SetStackTraceSignal(SIGUSR2));

  // The clock is initialized right away: this verifies that it is synchronized, which makes it
  // less likely to end up with a partially initialized on-disk state if the clock is broken.
  // An externally supplied clock is left alone.
  if (!external_clock_) {
    RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");
  }

  // Messenger and proxy cache.
  rpc::MessengerBuilder messenger_builder(name_);
  messenger_builder.UseDefaultConnectionContextFactory(mem_tracker());
  RETURN_NOT_OK(SetupMessengerBuilder(&messenger_builder));
  messenger_ = VERIFY_RESULT(messenger_builder.Build());
  proxy_cache_ = std::make_unique<rpc::ProxyCache>(messenger_.get());

  // RPC server.
  RETURN_NOT_OK(rpc_server_->Init(messenger_.get()));
  RETURN_NOT_OK(rpc_server_->Bind());
  clock_->RegisterMetrics(metric_entity_);

  RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging");

  initialized_ = true;
  return Status::OK();
}
301
302
0
// Returns "<name> : rpc=<first bound RPC address>".
string RpcServerBase::ToString() const {
  const auto rpc_address = yb::ToString(first_rpc_address());
  return Substitute("$0 : rpc=$1", name_, rpc_address);
}
305
306
4.70k
// Fills `status` with the node instance, every bound RPC address and the version info.
void RpcServerBase::GetStatusPB(ServerStatusPB* status) const {
  // Node instance.
  status->mutable_node_instance()->CopyFrom(*instance_pb_);

  // RPC ports.
  for (const Endpoint& endpoint : rpc_server_->GetBoundAddresses()) {
    HostPortPB* host_port = status->add_bound_rpc_addresses();
    host_port->set_host(endpoint.address().to_string());
    host_port->set_port(endpoint.port());
  }

  // Version.
  VersionInfo::GetVersionInfoPB(status->mutable_version_info());
}
321
322
31.8k
// Returns the cloud info built by the server options.
CloudInfoPB RpcServerBase::MakeCloudInfoPB() const {
  CloudInfoPB cloud_info = options_.MakeCloudInfoPB();
  return cloud_info;
}
325
326
// Writes the server status to `path`. `format` is matched case-insensitively: "json" writes
// pretty-printed JSON, "pb" writes the protobuf without syncing. Any other format yields an
// InvalidArgument status.
Status RpcServerBase::DumpServerInfo(const string& path,
                                     const string& format) const {
  ServerStatusPB server_status;
  GetStatusPB(&server_status);

  const bool want_json = boost::iequals(format, "json");
  if (!want_json && !boost::iequals(format, "pb")) {
    return STATUS(InvalidArgument, "bad format", format);
  }

  if (want_json) {
    string serialized = JsonWriter::ToJson(server_status, JsonWriter::PRETTY);
    RETURN_NOT_OK(WriteStringToFile(options_.env, Slice(serialized), path));
  } else {
    // TODO: Use PB container format?
    // NO_SYNC: durability doesn't matter for this file.
    RETURN_NOT_OK(pb_util::WritePBToPath(options_.env, path, server_status,
                                         pb_util::NO_SYNC));
  }

  LOG(INFO) << "Dumped server information to " << path;
  return Status::OK();
}
345
346
// Forwards the service registration, with its queue limit and priority, to the RPC server.
Status RpcServerBase::RegisterService(size_t queue_limit,
                                      rpc::ServiceIfPtr rpc_impl,
                                      rpc::ServicePriority priority) {
  Status registration = rpc_server_->RegisterService(queue_limit, std::move(rpc_impl), priority);
  return registration;
}
351
352
25.8k
// Starts the background thread that periodically writes metrics to a rolling log.
// Does nothing and returns OK when options_.metrics_log_interval_ms is not positive.
// Otherwise returns the status of the thread creation.
Status RpcServerBase::StartMetricsLogging() {
  if (options_.metrics_log_interval_ms <= 0) {
    return Status::OK();
  }

  // MetricsLoggingThread is a member of RpcServerBase and `this` is an RpcServerBase, so the
  // member is named through this class rather than through the derived RpcAndWebServerBase.
  return Thread::Create("server", "metrics-logger", &RpcServerBase::MetricsLoggingThread,
                        this, &metrics_logging_thread_);
}
360
361
0
void RpcServerBase::MetricsLoggingThread() {
362
0
  RollingLog log(Env::Default(), FLAGS_log_dir, "metrics");
363
364
  // How long to wait before trying again if we experience a failure
365
  // logging metrics.
366
0
  const MonoDelta kWaitBetweenFailures = MonoDelta::FromSeconds(60);
367
368
0
  MonoTime next_log = MonoTime::Now();
369
0
  while (!stop_metrics_logging_latch_.WaitUntil(next_log)) {
370
0
    next_log = MonoTime::Now();
371
0
    next_log.AddDelta(MonoDelta::FromMilliseconds(options_.metrics_log_interval_ms));
372
373
0
    std::stringstream buf;
374
0
    buf << "metrics " << GetCurrentTimeMicros() << " ";
375
376
    // Collect the metrics JSON string.
377
0
    vector<string> metrics;
378
0
    metrics.push_back("*");
379
0
    MetricJsonOptions opts;
380
0
    opts.include_raw_histograms = true;
381
382
0
    JsonWriter writer(&buf, JsonWriter::COMPACT);
383
0
    Status s = metric_registry_->WriteAsJson(&writer, metrics, opts);
384
0
    if (!s.ok()) {
385
0
      WARN_NOT_OK(s, "Unable to collect metrics to log");
386
0
      next_log.AddDelta(kWaitBetweenFailures);
387
0
      continue;
388
0
    }
389
390
0
    buf << "\n";
391
392
0
    s = log.Append(buf.str());
393
0
    if (!s.ok()) {
394
0
      WARN_NOT_OK(s, "Unable to write metrics to log");
395
0
      next_log.AddDelta(kWaitBetweenFailures);
396
0
      continue;
397
0
    }
398
0
  }
399
400
0
  WARN_NOT_OK(log.Close(), "Unable to close metric log");
401
0
}
402
403
25.7k
// Registers the generic service, using generic_svc_queue_length as its queue limit, and then
// starts the RPC server.
Status RpcServerBase::Start() {
  std::unique_ptr<rpc::ServiceIf> generic_service(new GenericServiceImpl(this));
  RETURN_NOT_OK(RegisterService(FLAGS_generic_svc_queue_length, std::move(generic_service)));

  return StartRpcServer();
}
411
412
25.7k
// Starts the RPC server; Init() must have completed (CHECK-enforced). If a dump path is set in
// the options, the server info is then written there in the configured format.
Status RpcServerBase::StartRpcServer() {
  CHECK(initialized_);

  RETURN_NOT_OK(rpc_server_->Start());

  const auto& dump_path = options_.dump_info_path;
  if (dump_path.empty()) {
    return Status::OK();
  }

  RETURN_NOT_OK_PREPEND(DumpServerInfo(dump_path, options_.dump_info_format),
                        "Failed to dump server info to " + dump_path);
  return Status::OK();
}
424
425
879
void RpcServerBase::Shutdown() {
426
879
  if (metrics_logging_thread_) {
427
0
    stop_metrics_logging_latch_.CountDown();
428
0
    metrics_logging_thread_->Join();
429
0
  }
430
879
  if (rpc_server_) {
431
871
    rpc_server_->Shutdown();
432
871
  }
433
879
  if (messenger_) {
434
625
    messenger_->Shutdown();
435
625
  }
436
879
}
437
438
// Constructs a server that has both an RPC endpoint and an embedded web server.
//
// name             - server name; moved into the base class (the web server reads name_).
// options          - server options; supply the web server options, FS paths, server type and env.
// metric_namespace - forwarded to RpcServerBase.
// mem_tracker      - server mem tracker; also used as the parent tracker of the FsManager.
// clock            - optional external clock, forwarded to RpcServerBase.
//
// Also creates the FsManager and starts thread instrumentation; a failure of the latter is fatal
// (CHECK_OK).
RpcAndWebServerBase::RpcAndWebServerBase(
    string name, const ServerBaseOptions& options,
    const std::string& metric_namespace,
    MemTrackerPtr mem_tracker,
    const scoped_refptr<server::Clock>& clock)
    // `name` is a by-value parameter that is not used again below, so move it instead of copying.
    : RpcServerBase(std::move(name), options, metric_namespace, std::move(mem_tracker), clock),
      web_server_(new Webserver(options_.CompleteWebserverOptions(), name_)) {
  FsManagerOpts fs_opts;
  fs_opts.metric_entity = metric_entity_;
  fs_opts.parent_mem_tracker = mem_tracker_;
  fs_opts.wal_paths = options.fs_opts.wal_paths;
  fs_opts.data_paths = options.fs_opts.data_paths;
  fs_opts.server_type = options.server_type;
  fs_manager_.reset(new FsManager(options.env, fs_opts));

  CHECK_OK(StartThreadInstrumentation(metric_entity_, web_server_.get()));
}
455
456
254
// Shuts down the RPC side and stops the web server via Shutdown().
RpcAndWebServerBase::~RpcAndWebServerBase() {
  this->Shutdown();
}
459
460
113
// Returns the first address the web server is bound to. A failure to fetch the addresses is only
// logged as a warning; having no address at all CHECK-fails with "Not bound".
Endpoint RpcAndWebServerBase::first_http_address() const {
  std::vector<Endpoint> bound_addresses;
  WARN_NOT_OK(web_server_->GetBoundAddresses(&bound_addresses),
              "Couldn't get bound webserver addresses");
  CHECK(!bound_addresses.empty()) << "Not bound";
  return bound_addresses.front();
}
467
468
25.8k
void RpcAndWebServerBase::GenerateInstanceID() {
469
25.8k
  instance_pb_.reset(new NodeInstancePB);
470
25.8k
  instance_pb_->set_permanent_uuid(fs_manager_->uuid());
471
25.8k
  auto now = Env::Default()->NowMicros();
472
473
25.8k
  server_uptime_ms_metric_ = metric_entity_->FindOrCreateAtomicMillisLag(&METRIC_server_uptime_ms);
474
475
  // TODO: maybe actually bump a sequence number on local disk instead of
476
  // using time.
477
25.8k
  instance_pb_->set_instance_seqno(now);
478
25.8k
  instance_pb_->set_start_time_us(now);
479
25.8k
}
480
481
26.4k
// Initializes OpenSSL and the file system layout, then runs RpcServerBase::Init(). A new FS
// layout is created when opening the roots reports NotFound, or fails while lock files are
// present. Returns a simulated NetworkError when TEST_simulate_port_conflict_error is set.
Status RpcAndWebServerBase::Init() {
  encryption::InitOpenSSL();

  Status fs_status = fs_manager_->CheckAndOpenFileSystemRoots();
  const bool create_layout =
      fs_status.IsNotFound() || (!fs_status.ok() && fs_manager_->HasAnyLockFiles());
  if (create_layout) {
    LOG(INFO) << "Could not load existing FS layout: " << fs_status.ToString();
    LOG(INFO) << "Creating new FS layout";
    RETURN_NOT_OK_PREPEND(fs_manager_->CreateInitialFileSystemLayout(true),
                          "Could not create new FS layout");
    // Retry opening now that the layout exists.
    fs_status = fs_manager_->CheckAndOpenFileSystemRoots();
  }
  RETURN_NOT_OK_PREPEND(fs_status, "Failed to load FS layout");

  if (PREDICT_FALSE(FLAGS_TEST_simulate_port_conflict_error)) {
    return STATUS(NetworkError, "Simulated port conflict error");
  }

  return RpcServerBase::Init();
}
502
503
4.70k
// Fills `status` with everything RpcServerBase::GetStatusPB reports plus every bound HTTP
// address. Failing to obtain the web server addresses is fatal (CHECK_OK).
void RpcAndWebServerBase::GetStatusPB(ServerStatusPB* status) const {
  RpcServerBase::GetStatusPB(status);

  // HTTP ports.
  std::vector<Endpoint> http_endpoints;
  CHECK_OK(web_server_->GetBoundAddresses(&http_endpoints));
  for (const Endpoint& endpoint : http_endpoints) {
    HostPortPB* host_port = status->add_bound_http_addresses();
    host_port->set_host(endpoint.address().to_string());
    host_port->set_port(endpoint.port());
  }
}
517
518
71.8k
// Fills `reg` with the private RPC addresses, the broadcast addresses, the HTTP addresses
// (skipped when `rpc_only` is set), the placement cloud/region/zone and the placement uuid.
// A configured address is used as-is only if it is a single, non-wildcard host with a non-zero
// port; otherwise the bound addresses are used instead.
Status RpcAndWebServerBase::GetRegistration(ServerRegistrationPB* reg, RpcOnly rpc_only) const {
  std::vector<HostPort> rpc_host_ports = CHECK_NOTNULL(rpc_server())->GetRpcHostPort();
  DCHECK_GE(rpc_host_ports.size(), 1);

  const bool rpc_address_usable = rpc_host_ports.size() == 1 &&
                                  !IsWildcardAddress(rpc_host_ports[0].host()) &&
                                  rpc_host_ports[0].port() != 0;
  if (rpc_address_usable) {
    HostPortsToPBs(rpc_host_ports, reg->mutable_private_rpc_addresses());
    LOG(INFO) << "Using private rpc address "
              << reg->private_rpc_addresses(0).host();
  } else {
    // Fall back to the bound addresses if the rpc hostname is a wildcard.
    vector<Endpoint> bound_rpc_endpoints =
        CHECK_NOTNULL(rpc_server())->GetBoundAddresses();
    RETURN_NOT_OK_PREPEND(
        AddHostPortPBs(bound_rpc_endpoints, reg->mutable_private_rpc_addresses()),
        "Failed to add RPC endpoints to registration");
    for (const auto& rpc_address : reg->private_rpc_addresses()) {
      LOG(INFO) << " Using private rpc addresses: ( " << rpc_address.ShortDebugString()
                << " )";
    }
  }

  HostPortsToPBs(options_.broadcast_addresses, reg->mutable_broadcast_addresses());

  if (!rpc_only) {
    HostPort web_host_port;
    RETURN_NOT_OK(CHECK_NOTNULL(web_server())->GetInputHostPort(&web_host_port));

    const bool web_address_usable =
        !IsWildcardAddress(web_host_port.host()) && web_host_port.port() != 0;
    if (web_address_usable) {
      HostPortsToPBs({ web_host_port }, reg->mutable_http_addresses());
      LOG(INFO) << "Using http address " << reg->http_addresses(0).host();
    } else {
      std::vector<Endpoint> bound_web_endpoints;
      RETURN_NOT_OK_PREPEND(
          CHECK_NOTNULL(web_server())->GetBoundAddresses(&bound_web_endpoints),
          "Unable to get bound HTTP addresses");
      RETURN_NOT_OK_PREPEND(AddHostPortPBs(
          bound_web_endpoints, reg->mutable_http_addresses()),
          "Failed to add HTTP addresses to registration");
      for (const auto& http_address : reg->http_addresses()) {
        LOG(INFO) << "Using http addresses: ( " << http_address.ShortDebugString() << " )";
      }
    }
  }

  auto* cloud_info = reg->mutable_cloud_info();
  cloud_info->set_placement_cloud(options_.placement_cloud());
  cloud_info->set_placement_region(options_.placement_region());
  cloud_info->set_placement_zone(options_.placement_zone());
  reg->set_placement_uuid(options_.placement_uuid);
  return Status::OK();
}
566
567
25.8k
// Returns the HTML welcome message that FooterHtml() embeds in the page footer.
string RpcAndWebServerBase::GetEasterEggMessage() const {
  static const char* const kWelcomeMessage =
      "Congratulations on installing YugabyteDB. "
      "We'd like to welcome you to the community with a free t-shirt and pack of stickers! "
      "Please claim your reward here: <a href='https://www.yugabyte.com/community-rewards/'>"
      "https://www.yugabyte.com/community-rewards/</a>";
  return kWelcomeMessage;
}
574
575
25.8k
// Builds the footer HTML: the easter egg message, followed by the short version string, the
// server uuid and the current local time.
string RpcAndWebServerBase::FooterHtml() const {
  static const char* const kFooterTemplate =
      "<pre class='message'><i class=\"fa-lg fa fa-gift\" aria-hidden=\"true\"></i>"
      " $0</pre><pre>$1\nserver uuid $2 local time $3</pre>";
  return Substitute(kFooterTemplate,
                    GetEasterEggMessage(),
                    VersionInfo::GetShortVersionString(),
                    instance_pb_->permanent_uuid(),
                    Timestamp(GetCurrentTimeMicros()).ToHumanReadableTime());
}
583
584
// Writes one tile to `output`: a link to `url` containing the icon with CSS class `icon` and the
// heading `caption`.
void RpcAndWebServerBase::DisplayIconTile(std::stringstream* output, const string icon,
                                          const string caption, const string url) {
  std::stringstream& out = *output;
  out << "  <div class='col-sm-4 col-md-4 dbg-tile'>\n";
  out << "    <a href='" << url << "' class='thumbnail'>\n";
  out << "      <div class='dbg-icon'>\n";
  out << "        <i class='fa " << icon << "' aria-hidden='true'></i>\n";
  out << "      </div>\n";
  out << "      <div class='caption dbg-caption'>\n";
  out << "        <h3>" << caption << "</h3>\n";
  out << "      </div> <!-- caption -->\n";
  out << "    </a> <!-- thumbnail -->\n";
  out << "  </div> <!-- col-sm-4 col-md-4 -->\n";
}
597
598
0
void RpcAndWebServerBase::DisplayGeneralInfoIcons(std::stringstream* output) {
599
  // Logs.
600
0
  DisplayIconTile(output, "fa-files-o", "Logs", "/logs");
601
  // GFlags.
602
0
  DisplayIconTile(output, "fa-flag-o", "GFlags", "/varz");
603
  // Memory trackers.
604
0
  DisplayIconTile(output, "fa-bar-chart", "Memory Breakdown", "/mem-trackers");
605
  // Total memory.
606
0
  DisplayIconTile(output, "fa-cog", "Total Memory", "/memz");
607
  // Metrics.
608
0
  DisplayIconTile(output, "fa-line-chart", "Metrics", "/prometheus-metrics");
609
  // Threads.
610
0
  DisplayIconTile(output, "fa-microchip", "Threads", "/threadz");
611
  // Drives.
612
0
  DisplayIconTile(output, "fa-hdd-o", "Drives", "/drives");
613
0
}
614
615
0
// Writes the RPC tile (a link to /rpcz, the RPCs in progress) to `output`. Always returns OK.
Status RpcAndWebServerBase::DisplayRpcIcons(std::stringstream* output) {
  const char* const kIcon = "fa-tasks";
  const char* const kCaption = "Server RPCs";
  const char* const kUrl = "/rpcz";
  DisplayIconTile(output, kIcon, kCaption, kUrl);
  return Status::OK();
}
620
621
// Renders the "Debug Utilities" page into resp->output: the general info tiles followed by the
// RPC tiles. `req` is not consulted. Returns the status of DisplayRpcIcons on failure.
Status RpcAndWebServerBase::HandleDebugPage(const Webserver::WebRequest& req,
                                            Webserver::WebResponse* resp) {
  std::stringstream& page = resp->output;

  page << "<h1>Debug Utilities</h1>\n"
       << "<div class='row debug-tiles'>\n"
       << "<h2> General Info </h2>";
  DisplayGeneralInfoIcons(&page);
  page << "</div> <!-- row -->\n"
       << "<h2> RPCs In Progress </h2>"
       << "<div class='row debug-tiles'>\n";
  RETURN_NOT_OK(DisplayRpcIcons(&page));
  page << "</div> <!-- row -->\n";

  return Status::OK();
}
636
637
25.8k
// Generates the instance id, registers all web handlers (default paths, rpcz, metrics, path
// usage, tracing and the /utilz page), sets the footer, starts the web server and finally
// starts the RPC side through RpcServerBase::Start().
Status RpcAndWebServerBase::Start() {
  GenerateInstanceID();

  Webserver* const web = web_server_.get();
  AddDefaultPathHandlers(web);
  AddRpczPathHandlers(messenger_.get(), web);
  RegisterMetricsJsonHandler(web, metric_registry_.get());
  RegisterPathUsageHandler(web, fs_manager_.get());
  TracingPathHandlers::RegisterHandlers(web);
  web->RegisterPathHandler("/utilz", "Utilities",
                           std::bind(&RpcAndWebServerBase::HandleDebugPage, this, _1, _2),
                           true, true, "fa fa-wrench");
  web->set_footer_html(FooterHtml());
  RETURN_NOT_OK(web->Start());

  return RpcServerBase::Start();
}
655
656
630
void RpcAndWebServerBase::Shutdown() {
657
630
  RpcServerBase::Shutdown();
658
630
  web_server_->Stop();
659
630
}
660
661
80.3k
std::string TEST_RpcAddress(size_t index, Private priv) {
662
80.3k
  return Format("127.0.0.$0$1",
663
80.3k
                index * 2 + (priv ? 
041.1k
:
139.1k
), priv ?
""41.1k
:
FLAGS_TEST_public_hostname_suffix39.1k
);
664
80.3k
}
665
666
4.14k
// Returns "<private test address of server `index`>:<port>" as produced by HostPortToString.
string TEST_RpcBindEndpoint(size_t index, uint16_t port) {
  const auto private_address = TEST_RpcAddress(index, Private::kTrue);
  return HostPortToString(private_address, port);
}
669
670
// Highest server index handled by the TEST_ connectivity helpers in this file.
constexpr int kMaxServers = 20;
671
// Lowest server index handled by the TEST_ connectivity helpers in this file.
constexpr int kMinServerIdx = 1;
672
673
// We group servers by two. Servers in the same group communicate via private connection. Servers in
674
// different groups communicate via public connection.
675
36.9k
size_t ServerGroupNum(size_t server_idx) {
676
36.9k
  return (server_idx - 1) / FLAGS_TEST_nodes_per_cloud;
677
36.9k
}
678
679
1.94k
// Configures `messenger` of test server `index` so that servers in the same group are reachable
// only through their private address and all other servers only through their public address.
// Does nothing unless FLAGS_TEST_check_broadcast_address is set. `index` must lie within
// [kMinServerIdx, kMaxServers] (CHECK-enforced).
void TEST_SetupConnectivity(rpc::Messenger* messenger, size_t index) {
  if (!FLAGS_TEST_check_broadcast_address) {
    return;
  }

  CHECK_GE(index, kMinServerIdx);
  CHECK_LE(index, kMaxServers);

  messenger->TEST_SetOutboundIpBase(
      CHECK_RESULT(HostToAddress(TEST_RpcAddress(index, Private::kTrue))));

  const auto own_group = ServerGroupNum(index);
  for (int peer_idx = kMinServerIdx; peer_idx <= kMaxServers; ++peer_idx) {
    // Peers in our own group must be reached via their private address, so their public address
    // is broken; for peers in other groups it is the other way around.
    const bool in_own_group = ServerGroupNum(peer_idx) == own_group;

    auto blocked_address = CHECK_RESULT(
        HostToAddress(TEST_RpcAddress(peer_idx, Private(!in_own_group))));
    LOG(INFO) << "Break " << index << " => " << blocked_address;
    messenger->BreakConnectivityWith(blocked_address);

    auto allowed_address = CHECK_RESULT(
        HostToAddress(TEST_RpcAddress(peer_idx, Private(in_own_group))));
    messenger->RestoreConnectivityWith(allowed_address);
  }
}
703
704
0
// Breaks connectivity from `messenger` to both the private and the public test address of every
// server index in [kMinServerIdx, kMaxServers].
void TEST_Isolate(rpc::Messenger* messenger) {
  for (int peer_idx = kMinServerIdx; peer_idx <= kMaxServers; ++peer_idx) {
    auto private_address =
        CHECK_RESULT(HostToAddress(TEST_RpcAddress(peer_idx, Private::kTrue)));
    messenger->BreakConnectivityWith(private_address);

    auto public_address =
        CHECK_RESULT(HostToAddress(TEST_RpcAddress(peer_idx, Private::kFalse)));
    messenger->BreakConnectivityWith(public_address);
  }
}
712
713
} // namespace server
714
} // namespace yb