/Users/deen/code/yugabyte-db/src/yb/server/server_base.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/server/server_base.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <string> |
37 | | #include <thread> |
38 | | #include <vector> |
39 | | |
40 | | #include <boost/algorithm/string/predicate.hpp> |
41 | | |
42 | | #include "yb/common/wire_protocol.h" |
43 | | |
44 | | #include "yb/encryption/encryption_util.h" |
45 | | |
46 | | #include "yb/fs/fs_manager.h" |
47 | | |
48 | | #include "yb/gutil/strings/strcat.h" |
49 | | #include "yb/gutil/sysinfo.h" |
50 | | #include "yb/gutil/walltime.h" |
51 | | |
52 | | #include "yb/rpc/messenger.h" |
53 | | #include "yb/rpc/proxy.h" |
54 | | |
55 | | #include "yb/server/default-path-handlers.h" |
56 | | #include "yb/server/generic_service.h" |
57 | | #include "yb/server/glog_metrics.h" |
58 | | #include "yb/server/hybrid_clock.h" |
59 | | #include "yb/server/logical_clock.h" |
60 | | #include "yb/server/rpc_server.h" |
61 | | #include "yb/server/rpcz-path-handler.h" |
62 | | #include "yb/server/server_base.pb.h" |
63 | | #include "yb/server/server_base_options.h" |
64 | | #include "yb/server/tcmalloc_metrics.h" |
65 | | #include "yb/server/tracing-path-handlers.h" |
66 | | #include "yb/server/webserver.h" |
67 | | |
68 | | #include "yb/util/atomic.h" |
69 | | #include "yb/util/concurrent_value.h" |
70 | | #include "yb/util/env.h" |
71 | | #include "yb/util/flag_tags.h" |
72 | | #include "yb/util/jsonwriter.h" |
73 | | #include "yb/util/mem_tracker.h" |
74 | | #include "yb/util/metrics.h" |
75 | | #include "yb/util/monotime.h" |
76 | | #include "yb/util/net/net_util.h" |
77 | | #include "yb/util/net/sockaddr.h" |
78 | | #include "yb/util/pb_util.h" |
79 | | #include "yb/util/rolling_log.h" |
80 | | #include "yb/util/spinlock_profiling.h" |
81 | | #include "yb/util/status.h" |
82 | | #include "yb/util/status_log.h" |
83 | | #include "yb/util/timestamp.h" |
84 | | #include "yb/util/thread.h" |
85 | | #include "yb/util/version_info.h" |
86 | | |
87 | | DEFINE_int32(num_reactor_threads, -1, |
88 | | "Number of libev reactor threads to start. If -1, the value is automatically set."); |
89 | | TAG_FLAG(num_reactor_threads, advanced); |
90 | | |
91 | | DECLARE_bool(use_hybrid_clock); |
92 | | |
93 | | DEFINE_int32(generic_svc_num_threads, 10, |
94 | | "Number of RPC worker threads to run for the generic service"); |
95 | | TAG_FLAG(generic_svc_num_threads, advanced); |
96 | | |
97 | | DEFINE_int32(generic_svc_queue_length, 50, |
98 | | "RPC Queue length for the generic service"); |
99 | | TAG_FLAG(generic_svc_queue_length, advanced); |
100 | | |
101 | | DEFINE_string(yb_test_name, "", |
102 | | "Specifies test name this daemon is running as part of."); |
103 | | |
104 | | DEFINE_bool(TEST_check_broadcast_address, true, "Break connectivity in test mini cluster to " |
105 | | "check broadcast address."); |
106 | | |
107 | | DEFINE_test_flag(string, public_hostname_suffix, ".ip.yugabyte", "Suffix for public hostnames."); |
108 | | |
109 | | DEFINE_test_flag(bool, simulate_port_conflict_error, false, |
110 | | "Simulate a port conflict error during initialization."); |
111 | | |
112 | | DEFINE_test_flag(int32, nodes_per_cloud, 2, |
113 | | "Number of nodes per cloud to test private and public addresses."); |
114 | | |
115 | | METRIC_DEFINE_lag(server, server_uptime_ms, |
116 | | "Server uptime", |
117 | | "The amount of time a server has been up for."); |
118 | | |
119 | | using namespace std::literals; |
120 | | using namespace std::placeholders; |
121 | | |
122 | | using std::shared_ptr; |
123 | | using std::string; |
124 | | using std::stringstream; |
125 | | using std::vector; |
126 | | using strings::Substitute; |
127 | | |
128 | | namespace yb { |
129 | | namespace server { |
130 | | |
131 | | namespace { |
132 | | |
133 | | // Disambiguates between servers when in a minicluster. |
134 | | AtomicInt<int32_t> mem_tracker_id_counter(-1); |
135 | | |
136 | | std::string kServerMemTrackerName = "server"; |
137 | | |
138 | | struct CommonMemTrackers { |
139 | | std::vector<MemTrackerPtr> trackers; |
140 | | |
141 | 0 | ~CommonMemTrackers() { |
142 | | #if defined(TCMALLOC_ENABLED) |
143 | | // Prevent root mem tracker from accessing common mem trackers. |
144 | | auto root = MemTracker::GetRootTracker(); |
145 | | root->SetPollChildrenConsumptionFunctors(nullptr); |
146 | | #endif |
147 | 0 | } |
148 | | }; |
149 | | |
150 | | std::unique_ptr<CommonMemTrackers> common_mem_trackers; |
151 | | |
152 | | } // anonymous namespace |
153 | | |
154 | 11.5k | std::shared_ptr<MemTracker> CreateMemTrackerForServer() { |
155 | 11.5k | int32_t id = mem_tracker_id_counter.Increment(); |
156 | 11.5k | std::string id_str = kServerMemTrackerName; |
157 | 11.5k | if (id != 0) { |
158 | 752 | StrAppend(&id_str, " ", id); |
159 | 752 | } |
160 | 11.5k | return MemTracker::CreateTracker(id_str); |
161 | 11.5k | } |
162 | | |
163 | | #if defined(TCMALLOC_ENABLED) |
164 | | void RegisterTCMallocTracker(const char* name, const char* prop) { |
165 | | common_mem_trackers->trackers.push_back(MemTracker::CreateTracker( |
166 | | -1, "TCMalloc "s + name, std::bind(&MemTracker::GetTCMallocProperty, prop))); |
167 | | } |
168 | | #endif |
169 | | |
170 | | RpcServerBase::RpcServerBase(string name, const ServerBaseOptions& options, |
171 | | const string& metric_namespace, |
172 | | MemTrackerPtr mem_tracker, |
173 | | const scoped_refptr<server::Clock>& clock) |
174 | | : name_(std::move(name)), |
175 | | mem_tracker_(std::move(mem_tracker)), |
176 | | metric_registry_(new MetricRegistry()), |
177 | | metric_entity_(METRIC_ENTITY_server.Instantiate(metric_registry_.get(), metric_namespace)), |
178 | | options_(options), |
179 | | initialized_(false), |
180 | 17.5k | stop_metrics_logging_latch_(1) { |
181 | 17.5k | mem_tracker_->SetMetricEntity(metric_entity_); |
182 | | |
183 | | #if defined(TCMALLOC_ENABLED) |
184 | | // When the mem tracker for the first server is created, we register mem trackers that report |
185 | | // tcmalloc status. |
186 | | if (mem_tracker_->id() == kServerMemTrackerName) { |
187 | | common_mem_trackers = std::make_unique<CommonMemTrackers>(); |
188 | | |
189 | | RegisterTCMallocTracker("Thread Cache", "tcmalloc.thread_cache_free_bytes"); |
190 | | RegisterTCMallocTracker("Central Cache", "tcmalloc.central_cache_free_bytes"); |
191 | | RegisterTCMallocTracker("Transfer Cache", "tcmalloc.transfer_cache_free_bytes"); |
192 | | RegisterTCMallocTracker("PageHeap Free", "tcmalloc.pageheap_free_bytes"); |
193 | | |
194 | | auto root = MemTracker::GetRootTracker(); |
195 | | root->SetPollChildrenConsumptionFunctors([]() { |
196 | | for (auto& tracker : common_mem_trackers->trackers) { |
197 | | tracker->UpdateConsumption(); |
198 | | } |
199 | | }); |
200 | | } |
201 | | #endif |
202 | | |
203 | 17.5k | if (clock) { |
204 | 4.54k | clock_ = clock; |
205 | 4.54k | external_clock_ = true; |
206 | 12.9k | } else if (FLAGS_use_hybrid_clock) { |
207 | 12.8k | clock_ = new HybridClock(); |
208 | 76 | } else { |
209 | 76 | clock_ = LogicalClock::CreateStartingAt(HybridTime::kInitial); |
210 | 76 | } |
211 | 17.5k | } |
212 | | |
213 | | void RpcServerBase::SetConnectionContextFactory( |
214 | 17.5k | rpc::ConnectionContextFactoryPtr connection_context_factory) { |
215 | 17.5k | rpc_server_.reset(new RpcServer(name_, options_.rpc_opts, std::move(connection_context_factory))); |
216 | 17.5k | } |
217 | | |
218 | 204 | RpcServerBase::~RpcServerBase() { |
219 | 204 | Shutdown(); |
220 | 204 | rpc_server_.reset(); |
221 | 204 | messenger_.reset(); |
222 | 204 | if (mem_tracker_->parent()) { |
223 | 204 | mem_tracker_->UnregisterFromParent(); |
224 | 204 | } |
225 | 204 | } |
226 | | |
227 | 5.35k | const std::vector<Endpoint>& RpcServerBase::rpc_addresses() const { |
228 | 5.35k | return rpc_server_->GetBoundAddresses(); |
229 | 5.35k | } |
230 | | |
231 | 59.3k | Endpoint RpcServerBase::first_rpc_address() const { |
232 | 59.3k | const auto& addrs = rpc_server_->GetBoundAddresses(); |
233 | 0 | CHECK(!addrs.empty()) << "Not bound"; |
234 | 59.3k | return addrs[0]; |
235 | 59.3k | } |
236 | | |
237 | 5.43k | const std::string RpcServerBase::get_hostname() const { |
238 | 5.43k | auto hostname = GetHostname(); |
239 | 5.43k | if (hostname.ok()) { |
240 | 5.38k | YB_LOG_FIRST_N(INFO, 1) << "Running on host: " << *hostname; |
241 | 5.43k | return *hostname; |
242 | 0 | } else { |
243 | 0 | YB_LOG_FIRST_N(WARNING, 1) << "Failed to get current host name: " << hostname.status(); |
244 | 0 | return "unknown_hostname"; |
245 | 0 | } |
246 | 5.43k | } |
247 | | |
248 | 15.8M | const NodeInstancePB& RpcServerBase::instance_pb() const { |
249 | 15.8M | return *DCHECK_NOTNULL(instance_pb_.get()); |
250 | 15.8M | } |
251 | | |
252 | 17.5k | Status RpcServerBase::SetupMessengerBuilder(rpc::MessengerBuilder* builder) { |
253 | 17.5k | if (FLAGS_num_reactor_threads == -1) { |
254 | | // Auto set the number of reactors based on the number of cores. |
255 | 10.8k | FLAGS_num_reactor_threads = |
256 | 10.8k | std::min(16, static_cast<int>(base::NumCPUs())); |
257 | 10.8k | LOG(INFO) << "Auto setting FLAGS_num_reactor_threads to " << FLAGS_num_reactor_threads; |
258 | 10.8k | } |
259 | | |
260 | 17.5k | builder->set_num_reactors(FLAGS_num_reactor_threads); |
261 | 17.5k | builder->set_metric_entity(metric_entity()); |
262 | 17.5k | builder->set_connection_keepalive_time(options_.rpc_opts.connection_keepalive_time_ms * 1ms); |
263 | | |
264 | 17.5k | return Status::OK(); |
265 | 17.5k | } |
266 | | |
267 | 17.5k | Status RpcServerBase::Init() { |
268 | 17.5k | CHECK(!initialized_); |
269 | | |
270 | 17.5k | glog_metrics_.reset(new ScopedGLogMetrics(metric_entity_)); |
271 | 17.5k | tcmalloc::RegisterMetrics(metric_entity_); |
272 | 17.5k | RegisterSpinLockContentionMetrics(metric_entity_); |
273 | | |
274 | 17.5k | InitSpinLockContentionProfiling(); |
275 | | |
276 | 17.5k | RETURN_NOT_OK(SetStackTraceSignal(SIGUSR2)); |
277 | | |
278 | | // Initialize the clock immediately. This checks that the clock is synchronized |
279 | | // so we're less likely to get into a partially initialized state on disk during startup |
280 | | // if we're having clock problems. |
281 | 17.5k | if (!external_clock_) { |
282 | 12.9k | RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock"); |
283 | 12.9k | } |
284 | | |
285 | | // Create the Messenger. |
286 | 17.5k | rpc::MessengerBuilder builder(name_); |
287 | 17.5k | builder.UseDefaultConnectionContextFactory(mem_tracker()); |
288 | 17.5k | RETURN_NOT_OK(SetupMessengerBuilder(&builder)); |
289 | 17.5k | messenger_ = VERIFY_RESULT(builder.Build()); |
290 | 17.5k | proxy_cache_ = std::make_unique<rpc::ProxyCache>(messenger_.get()); |
291 | | |
292 | 17.5k | RETURN_NOT_OK(rpc_server_->Init(messenger_.get())); |
293 | 17.5k | RETURN_NOT_OK(rpc_server_->Bind()); |
294 | 17.4k | clock_->RegisterMetrics(metric_entity_); |
295 | | |
296 | 17.4k | RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging"); |
297 | | |
298 | 17.4k | initialized_ = true; |
299 | 17.4k | return Status::OK(); |
300 | 17.4k | } |
301 | | |
302 | 0 | string RpcServerBase::ToString() const { |
303 | 0 | return strings::Substitute("$0 : rpc=$1", name_, yb::ToString(first_rpc_address())); |
304 | 0 | } |
305 | | |
306 | 2.54k | void RpcServerBase::GetStatusPB(ServerStatusPB* status) const { |
307 | | // Node instance |
308 | 2.54k | status->mutable_node_instance()->CopyFrom(*instance_pb_); |
309 | | |
310 | | // RPC ports |
311 | 2.54k | { |
312 | 2.54k | for (const auto& addr : rpc_server_->GetBoundAddresses()) { |
313 | 2.54k | HostPortPB* pb = status->add_bound_rpc_addresses(); |
314 | 2.54k | pb->set_host(addr.address().to_string()); |
315 | 2.54k | pb->set_port(addr.port()); |
316 | 2.54k | } |
317 | 2.54k | } |
318 | | |
319 | 2.54k | VersionInfo::GetVersionInfoPB(status->mutable_version_info()); |
320 | 2.54k | } |
321 | | |
322 | 19.6k | CloudInfoPB RpcServerBase::MakeCloudInfoPB() const { |
323 | 19.6k | return options_.MakeCloudInfoPB(); |
324 | 19.6k | } |
325 | | |
326 | | Status RpcServerBase::DumpServerInfo(const string& path, |
327 | 2.54k | const string& format) const { |
328 | 2.54k | ServerStatusPB status; |
329 | 2.54k | GetStatusPB(&status); |
330 | | |
331 | 2.54k | if (boost::iequals(format, "json")) { |
332 | 0 | string json = JsonWriter::ToJson(status, JsonWriter::PRETTY); |
333 | 0 | RETURN_NOT_OK(WriteStringToFile(options_.env, Slice(json), path)); |
334 | 2.54k | } else if (boost::iequals(format, "pb")) { |
335 | | // TODO: Use PB container format? |
336 | 2.54k | RETURN_NOT_OK(pb_util::WritePBToPath(options_.env, path, status, |
337 | 2.54k | pb_util::NO_SYNC)); // durability doesn't matter |
338 | 0 | } else { |
339 | 0 | return STATUS(InvalidArgument, "bad format", format); |
340 | 0 | } |
341 | | |
342 | 2.54k | LOG(INFO) << "Dumped server information to " << path; |
343 | 2.54k | return Status::OK(); |
344 | 2.54k | } |
345 | | |
346 | | Status RpcServerBase::RegisterService(size_t queue_limit, |
347 | | rpc::ServiceIfPtr rpc_impl, |
348 | 140k | rpc::ServicePriority priority) { |
349 | 140k | return rpc_server_->RegisterService(queue_limit, std::move(rpc_impl), priority); |
350 | 140k | } |
351 | | |
352 | 17.1k | Status RpcServerBase::StartMetricsLogging() { |
353 | 17.1k | if (options_.metrics_log_interval_ms <= 0) { |
354 | 17.1k | return Status::OK(); |
355 | 17.1k | } |
356 | | |
357 | 0 | return Thread::Create("server", "metrics-logger", &RpcAndWebServerBase::MetricsLoggingThread, |
358 | 0 | this, &metrics_logging_thread_); |
359 | 0 | } |
360 | | |
361 | 0 | void RpcServerBase::MetricsLoggingThread() { |
362 | 0 | RollingLog log(Env::Default(), FLAGS_log_dir, "metrics"); |
363 | | |
364 | | // How long to wait before trying again if we experience a failure |
365 | | // logging metrics. |
366 | 0 | const MonoDelta kWaitBetweenFailures = MonoDelta::FromSeconds(60); |
367 | |
|
368 | 0 | MonoTime next_log = MonoTime::Now(); |
369 | 0 | while (!stop_metrics_logging_latch_.WaitUntil(next_log)) { |
370 | 0 | next_log = MonoTime::Now(); |
371 | 0 | next_log.AddDelta(MonoDelta::FromMilliseconds(options_.metrics_log_interval_ms)); |
372 | |
|
373 | 0 | std::stringstream buf; |
374 | 0 | buf << "metrics " << GetCurrentTimeMicros() << " "; |
375 | | |
376 | | // Collect the metrics JSON string. |
377 | 0 | vector<string> metrics; |
378 | 0 | metrics.push_back("*"); |
379 | 0 | MetricJsonOptions opts; |
380 | 0 | opts.include_raw_histograms = true; |
381 | |
|
382 | 0 | JsonWriter writer(&buf, JsonWriter::COMPACT); |
383 | 0 | Status s = metric_registry_->WriteAsJson(&writer, metrics, opts); |
384 | 0 | if (!s.ok()) { |
385 | 0 | WARN_NOT_OK(s, "Unable to collect metrics to log"); |
386 | 0 | next_log.AddDelta(kWaitBetweenFailures); |
387 | 0 | continue; |
388 | 0 | } |
389 | | |
390 | 0 | buf << "\n"; |
391 | |
|
392 | 0 | s = log.Append(buf.str()); |
393 | 0 | if (!s.ok()) { |
394 | 0 | WARN_NOT_OK(s, "Unable to write metrics to log"); |
395 | 0 | next_log.AddDelta(kWaitBetweenFailures); |
396 | 0 | continue; |
397 | 0 | } |
398 | 0 | } |
399 | |
|
400 | 0 | WARN_NOT_OK(log.Close(), "Unable to close metric log"); |
401 | 0 | } |
402 | | |
403 | 17.1k | Status RpcServerBase::Start() { |
404 | 17.1k | std::unique_ptr<rpc::ServiceIf> gsvc_impl(new GenericServiceImpl(this)); |
405 | 17.1k | RETURN_NOT_OK(RegisterService(FLAGS_generic_svc_queue_length, std::move(gsvc_impl))); |
406 | | |
407 | 17.1k | RETURN_NOT_OK(StartRpcServer()); |
408 | | |
409 | 17.1k | return Status::OK(); |
410 | 17.1k | } |
411 | | |
412 | 17.1k | Status RpcServerBase::StartRpcServer() { |
413 | 17.1k | CHECK(initialized_); |
414 | | |
415 | 17.1k | RETURN_NOT_OK(rpc_server_->Start()); |
416 | | |
417 | 17.1k | if (!options_.dump_info_path.empty()) { |
418 | 2.54k | RETURN_NOT_OK_PREPEND(DumpServerInfo(options_.dump_info_path, options_.dump_info_format), |
419 | 2.54k | "Failed to dump server info to " + options_.dump_info_path); |
420 | 2.54k | } |
421 | | |
422 | 17.1k | return Status::OK(); |
423 | 17.1k | } |
424 | | |
425 | 662 | void RpcServerBase::Shutdown() { |
426 | 662 | if (metrics_logging_thread_) { |
427 | 0 | stop_metrics_logging_latch_.CountDown(); |
428 | 0 | metrics_logging_thread_->Join(); |
429 | 0 | } |
430 | 662 | if (rpc_server_) { |
431 | 660 | rpc_server_->Shutdown(); |
432 | 660 | } |
433 | 662 | if (messenger_) { |
434 | 580 | messenger_->Shutdown(); |
435 | 580 | } |
436 | 662 | } |
437 | | |
438 | | RpcAndWebServerBase::RpcAndWebServerBase( |
439 | | string name, const ServerBaseOptions& options, |
440 | | const std::string& metric_namespace, |
441 | | MemTrackerPtr mem_tracker, |
442 | | const scoped_refptr<server::Clock>& clock) |
443 | | : RpcServerBase(name, options, metric_namespace, std::move(mem_tracker), clock), |
444 | 17.5k | web_server_(new Webserver(options_.CompleteWebserverOptions(), name_)) { |
445 | 17.5k | FsManagerOpts fs_opts; |
446 | 17.5k | fs_opts.metric_entity = metric_entity_; |
447 | 17.5k | fs_opts.parent_mem_tracker = mem_tracker_; |
448 | 17.5k | fs_opts.wal_paths = options.fs_opts.wal_paths; |
449 | 17.5k | fs_opts.data_paths = options.fs_opts.data_paths; |
450 | 17.5k | fs_opts.server_type = options.server_type; |
451 | 17.5k | fs_manager_.reset(new FsManager(options.env, fs_opts)); |
452 | | |
453 | 17.5k | CHECK_OK(StartThreadInstrumentation(metric_entity_, web_server_.get())); |
454 | 17.5k | } |
455 | | |
456 | 204 | RpcAndWebServerBase::~RpcAndWebServerBase() { |
457 | 204 | Shutdown(); |
458 | 204 | } |
459 | | |
460 | 86 | Endpoint RpcAndWebServerBase::first_http_address() const { |
461 | 86 | std::vector<Endpoint> addrs; |
462 | 86 | WARN_NOT_OK(web_server_->GetBoundAddresses(&addrs), |
463 | 86 | "Couldn't get bound webserver addresses"); |
464 | 0 | CHECK(!addrs.empty()) << "Not bound"; |
465 | 86 | return addrs[0]; |
466 | 86 | } |
467 | | |
468 | 17.1k | void RpcAndWebServerBase::GenerateInstanceID() { |
469 | 17.1k | instance_pb_.reset(new NodeInstancePB); |
470 | 17.1k | instance_pb_->set_permanent_uuid(fs_manager_->uuid()); |
471 | 17.1k | auto now = Env::Default()->NowMicros(); |
472 | | |
473 | 17.1k | server_uptime_ms_metric_ = metric_entity_->FindOrCreateAtomicMillisLag(&METRIC_server_uptime_ms); |
474 | | |
475 | | // TODO: maybe actually bump a sequence number on local disk instead of |
476 | | // using time. |
477 | 17.1k | instance_pb_->set_instance_seqno(now); |
478 | 17.1k | instance_pb_->set_start_time_us(now); |
479 | 17.1k | } |
480 | | |
481 | 17.5k | Status RpcAndWebServerBase::Init() { |
482 | 17.5k | encryption::InitOpenSSL(); |
483 | | |
484 | 17.5k | Status s = fs_manager_->Open(); |
485 | 17.5k | if (s.IsNotFound() || (!s.ok() && fs_manager_->HasAnyLockFiles())) { |
486 | 11.4k | LOG(INFO) << "Could not load existing FS layout: " << s.ToString(); |
487 | 11.4k | LOG(INFO) << "Creating new FS layout"; |
488 | 11.4k | RETURN_NOT_OK_PREPEND(fs_manager_->CreateInitialFileSystemLayout(true), |
489 | 11.4k | "Could not create new FS layout"); |
490 | 11.4k | s = fs_manager_->Open(); |
491 | 11.4k | } |
492 | 17.5k | RETURN_NOT_OK_PREPEND(s, "Failed to load FS layout"); |
493 | | |
494 | 17.5k | if (PREDICT_FALSE(FLAGS_TEST_simulate_port_conflict_error)) { |
495 | 1 | return STATUS(NetworkError, "Simulated port conflict error"); |
496 | 1 | } |
497 | | |
498 | 17.5k | RETURN_NOT_OK(RpcServerBase::Init()); |
499 | | |
500 | 17.4k | return Status::OK(); |
501 | 17.5k | } |
502 | | |
503 | 2.54k | void RpcAndWebServerBase::GetStatusPB(ServerStatusPB* status) const { |
504 | 2.54k | RpcServerBase::GetStatusPB(status); |
505 | | |
506 | | // HTTP ports |
507 | 2.54k | { |
508 | 2.54k | std::vector<Endpoint> addrs; |
509 | 2.54k | CHECK_OK(web_server_->GetBoundAddresses(&addrs)); |
510 | 2.87k | for (const auto& addr : addrs) { |
511 | 2.87k | HostPortPB* pb = status->add_bound_http_addresses(); |
512 | 2.87k | pb->set_host(addr.address().to_string()); |
513 | 2.87k | pb->set_port(addr.port()); |
514 | 2.87k | } |
515 | 2.54k | } |
516 | 2.54k | } |
517 | | |
518 | 48.3k | Status RpcAndWebServerBase::GetRegistration(ServerRegistrationPB* reg, RpcOnly rpc_only) const { |
519 | 48.3k | std::vector<HostPort> addrs = CHECK_NOTNULL(rpc_server())->GetRpcHostPort(); |
520 | 48.3k | DCHECK_GE(addrs.size(), 1); |
521 | | |
522 | | // Fall back to hostname resolution if the rpc hostname is a wildcard. |
523 | 48.3k | if (addrs.size() != 1 || IsWildcardAddress(addrs[0].host()) || addrs[0].port() == 0) { |
524 | 107 | vector<Endpoint> endpoints = |
525 | 107 | CHECK_NOTNULL(rpc_server())->GetBoundAddresses(); |
526 | 107 | RETURN_NOT_OK_PREPEND( |
527 | 107 | AddHostPortPBs(endpoints, reg->mutable_private_rpc_addresses()), |
528 | 107 | "Failed to add RPC endpoints to registration"); |
529 | 108 | for (const auto &addr : reg->private_rpc_addresses()) { |
530 | 108 | LOG(INFO) << " Using private rpc addresses: ( " << addr.ShortDebugString() |
531 | 108 | << " )"; |
532 | 108 | } |
533 | 48.1k | } else { |
534 | 48.1k | HostPortsToPBs(addrs, reg->mutable_private_rpc_addresses()); |
535 | 48.1k | LOG(INFO) << "Using private rpc address " |
536 | 48.1k | << reg->private_rpc_addresses(0).host(); |
537 | 48.1k | } |
538 | | |
539 | 48.3k | HostPortsToPBs(options_.broadcast_addresses, reg->mutable_broadcast_addresses()); |
540 | | |
541 | 48.3k | if (!rpc_only) { |
542 | 11.3k | HostPort web_input_hp; |
543 | 11.3k | RETURN_NOT_OK(CHECK_NOTNULL(web_server())->GetInputHostPort(&web_input_hp)); |
544 | 11.3k | if (IsWildcardAddress(web_input_hp.host()) || web_input_hp.port() == 0) { |
545 | 2 | std::vector<Endpoint> web_addrs; |
546 | 2 | RETURN_NOT_OK_PREPEND( |
547 | 2 | CHECK_NOTNULL(web_server())->GetBoundAddresses(&web_addrs), |
548 | 0 | "Unable to get bound HTTP addresses"); |
549 | 0 | RETURN_NOT_OK_PREPEND(AddHostPortPBs( |
550 | 0 | web_addrs, reg->mutable_http_addresses()), |
551 | 0 | "Failed to add HTTP addresses to registration"); |
552 | 0 | for (const auto &addr : reg->http_addresses()) { |
553 | 0 | LOG(INFO) << "Using http addresses: ( " << addr.ShortDebugString() << " )"; |
554 | 0 | } |
555 | 11.3k | } else { |
556 | 11.3k | HostPortsToPBs({ web_input_hp }, reg->mutable_http_addresses()); |
557 | 11.3k | LOG(INFO) << "Using http address " << reg->http_addresses(0).host(); |
558 | 11.3k | } |
559 | 11.3k | } |
560 | 48.3k | reg->mutable_cloud_info()->set_placement_cloud(options_.placement_cloud()); |
561 | 48.3k | reg->mutable_cloud_info()->set_placement_region(options_.placement_region()); |
562 | 48.3k | reg->mutable_cloud_info()->set_placement_zone(options_.placement_zone()); |
563 | 48.3k | reg->set_placement_uuid(options_.placement_uuid); |
564 | 48.3k | return Status::OK(); |
565 | 48.3k | } |
566 | | |
567 | 17.1k | string RpcAndWebServerBase::GetEasterEggMessage() const { |
568 | 17.1k | return "Congratulations on installing YugabyteDB. " |
569 | 17.1k | "We'd like to welcome you to the community with a free t-shirt and pack of stickers! " |
570 | 17.1k | "Please claim your reward here: <a href='https://www.yugabyte.com/community-rewards/'>" |
571 | 17.1k | "https://www.yugabyte.com/community-rewards/</a>"; |
572 | | |
573 | 17.1k | } |
574 | | |
575 | 17.1k | string RpcAndWebServerBase::FooterHtml() const { |
576 | 17.1k | return Substitute("<pre class='message'><i class=\"fa-lg fa fa-gift\" aria-hidden=\"true\"></i>" |
577 | 17.1k | " $0</pre><pre>$1\nserver uuid $2 local time $3</pre>", |
578 | 17.1k | GetEasterEggMessage(), |
579 | 17.1k | VersionInfo::GetShortVersionString(), |
580 | 17.1k | instance_pb_->permanent_uuid(), |
581 | 17.1k | Timestamp(GetCurrentTimeMicros()).ToHumanReadableTime()); |
582 | 17.1k | } |
583 | | |
584 | | void RpcAndWebServerBase::DisplayIconTile(std::stringstream* output, const string icon, |
585 | 0 | const string caption, const string url) { |
586 | 0 | *output << " <div class='col-sm-4 col-md-4 dbg-tile'>\n" |
587 | 0 | << " <a href='" << url << "' class='thumbnail'>\n" |
588 | 0 | << " <div class='dbg-icon'>\n" |
589 | 0 | << " <i class='fa " << icon << "' aria-hidden='true'></i>\n" |
590 | 0 | << " </div>\n" |
591 | 0 | << " <div class='caption dbg-caption'>\n" |
592 | 0 | << " <h3>" << caption << "</h3>\n" |
593 | 0 | << " </div> <!-- caption -->\n" |
594 | 0 | << " </a> <!-- thumbnail -->\n" |
595 | 0 | << " </div> <!-- col-sm-4 col-md-4 -->\n"; |
596 | 0 | } |
597 | | |
598 | 0 | void RpcAndWebServerBase::DisplayGeneralInfoIcons(std::stringstream* output) { |
599 | | // Logs. |
600 | 0 | DisplayIconTile(output, "fa-files-o", "Logs", "/logs"); |
601 | | // GFlags. |
602 | 0 | DisplayIconTile(output, "fa-flag-o", "GFlags", "/varz"); |
603 | | // Memory trackers. |
604 | 0 | DisplayIconTile(output, "fa-bar-chart", "Memory Breakdown", "/mem-trackers"); |
605 | | // Total memory. |
606 | 0 | DisplayIconTile(output, "fa-cog", "Total Memory", "/memz"); |
607 | | // Metrics. |
608 | 0 | DisplayIconTile(output, "fa-line-chart", "Metrics", "/prometheus-metrics"); |
609 | | // Threads. |
610 | 0 | DisplayIconTile(output, "fa-microchip", "Threads", "/threadz"); |
611 | | // Drives. |
612 | 0 | DisplayIconTile(output, "fa-hdd-o", "Drives", "/drives"); |
613 | 0 | } |
614 | | |
615 | 0 | Status RpcAndWebServerBase::DisplayRpcIcons(std::stringstream* output) { |
616 | | // RPCs in Progress. |
617 | 0 | DisplayIconTile(output, "fa-tasks", "Server RPCs", "/rpcz"); |
618 | 0 | return Status::OK(); |
619 | 0 | } |
620 | | |
621 | | Status RpcAndWebServerBase::HandleDebugPage(const Webserver::WebRequest& req, |
622 | 0 | Webserver::WebResponse* resp) { |
623 | 0 | std::stringstream *output = &resp->output; |
624 | 0 | *output << "<h1>Debug Utilities</h1>\n"; |
625 | |
|
626 | 0 | *output << "<div class='row debug-tiles'>\n"; |
627 | 0 | *output << "<h2> General Info </h2>"; |
628 | 0 | DisplayGeneralInfoIcons(output); |
629 | 0 | *output << "</div> <!-- row -->\n"; |
630 | 0 | *output << "<h2> RPCs In Progress </h2>"; |
631 | 0 | *output << "<div class='row debug-tiles'>\n"; |
632 | 0 | RETURN_NOT_OK(DisplayRpcIcons(output)); |
633 | 0 | *output << "</div> <!-- row -->\n"; |
634 | 0 | return Status::OK(); |
635 | 0 | } |
636 | | |
637 | 17.1k | Status RpcAndWebServerBase::Start() { |
638 | 17.1k | GenerateInstanceID(); |
639 | | |
640 | 17.1k | AddDefaultPathHandlers(web_server_.get()); |
641 | 17.1k | AddRpczPathHandlers(messenger_.get(), web_server_.get()); |
642 | 17.1k | RegisterMetricsJsonHandler(web_server_.get(), metric_registry_.get()); |
643 | 17.1k | RegisterPathUsageHandler(web_server_.get(), fs_manager_.get()); |
644 | 17.1k | TracingPathHandlers::RegisterHandlers(web_server_.get()); |
645 | 17.1k | web_server_->RegisterPathHandler("/utilz", "Utilities", |
646 | 17.1k | std::bind(&RpcAndWebServerBase::HandleDebugPage, this, _1, _2), |
647 | 17.1k | true, true, "fa fa-wrench"); |
648 | 17.1k | web_server_->set_footer_html(FooterHtml()); |
649 | 17.1k | RETURN_NOT_OK(web_server_->Start()); |
650 | | |
651 | 17.1k | RETURN_NOT_OK(RpcServerBase::Start()); |
652 | | |
653 | 17.1k | return Status::OK(); |
654 | 17.1k | } |
655 | | |
656 | 458 | void RpcAndWebServerBase::Shutdown() { |
657 | 458 | RpcServerBase::Shutdown(); |
658 | 458 | web_server_->Stop(); |
659 | 458 | } |
660 | | |
661 | 41.1k | std::string TEST_RpcAddress(size_t index, Private priv) { |
662 | 41.1k | return Format("127.0.0.$0$1", |
663 | 21.1k | index * 2 + (priv ? 0 : 1), priv ? "" : FLAGS_TEST_public_hostname_suffix); |
664 | 41.1k | } |
665 | | |
666 | 2.02k | string TEST_RpcBindEndpoint(size_t index, uint16_t port) { |
667 | 2.02k | return HostPortToString(TEST_RpcAddress(index, Private::kTrue), port); |
668 | 2.02k | } |
669 | | |
670 | | constexpr int kMaxServers = 20; |
671 | | constexpr int kMinServerIdx = 1; |
672 | | |
673 | | // We group servers by two. Servers in the same group communicate via private connection. Servers in |
674 | | // different groups communicate via public connection. |
675 | 19.0k | size_t ServerGroupNum(size_t server_idx) { |
676 | 19.0k | return (server_idx - 1) / FLAGS_TEST_nodes_per_cloud; |
677 | 19.0k | } |
678 | | |
679 | 911 | void TEST_SetupConnectivity(rpc::Messenger* messenger, size_t index) { |
680 | 911 | if (!FLAGS_TEST_check_broadcast_address) { |
681 | 2 | return; |
682 | 2 | } |
683 | | |
684 | 909 | CHECK_GE(index, kMinServerIdx); |
685 | 909 | CHECK_LE(index, kMaxServers); |
686 | | |
687 | 909 | messenger->TEST_SetOutboundIpBase( |
688 | 909 | CHECK_RESULT(HostToAddress(TEST_RpcAddress(index, Private::kTrue)))); |
689 | 909 | auto server_group = ServerGroupNum(index); |
690 | 19.0k | for (int other_server_idx = kMinServerIdx; other_server_idx <= kMaxServers; ++other_server_idx) { |
691 | | // We group servers by 2. When servers belong to the same group, they should use |
692 | | // private ip for communication, otherwise public ip should be used. |
693 | 18.1k | bool same_group = ServerGroupNum(other_server_idx) == server_group; |
694 | 18.1k | auto broken_address = CHECK_RESULT( |
695 | 18.1k | HostToAddress(TEST_RpcAddress(other_server_idx, Private(!same_group)))); |
696 | 18.1k | LOG(INFO) << "Break " << index << " => " << broken_address; |
697 | 18.1k | messenger->BreakConnectivityWith(broken_address); |
698 | 18.1k | auto working_address = CHECK_RESULT( |
699 | 18.1k | HostToAddress(TEST_RpcAddress(other_server_idx, Private(same_group)))); |
700 | 18.1k | messenger->RestoreConnectivityWith(working_address); |
701 | 18.1k | } |
702 | 909 | } |
703 | | |
704 | 0 | void TEST_Isolate(rpc::Messenger* messenger) { |
705 | 0 | for (int other_server_idx = kMinServerIdx; other_server_idx <= kMaxServers; ++other_server_idx) { |
706 | 0 | messenger->BreakConnectivityWith( |
707 | 0 | CHECK_RESULT(HostToAddress(TEST_RpcAddress(other_server_idx, Private::kTrue)))); |
708 | 0 | messenger->BreakConnectivityWith( |
709 | 0 | CHECK_RESULT(HostToAddress(TEST_RpcAddress(other_server_idx, Private::kFalse)))); |
710 | 0 | } |
711 | 0 | } |
712 | | |
713 | | } // namespace server |
714 | | } // namespace yb |