/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_server_main.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <chrono> |
34 | | #include <iostream> |
35 | | |
36 | | #include <boost/optional/optional.hpp> |
37 | | #include <glog/logging.h> |
38 | | |
39 | | #ifdef TCMALLOC_ENABLED |
40 | | #include <gperftools/malloc_extension.h> |
41 | | #endif |
42 | | |
43 | | #include "yb/consensus/log_util.h" |
44 | | #include "yb/consensus/consensus_queue.h" |
45 | | |
46 | | #include "yb/encryption/header_manager_impl.h" |
47 | | #include "yb/encryption/encrypted_file_factory.h" |
48 | | #include "yb/encryption/universe_key_manager.h" |
49 | | |
50 | | #include "yb/yql/cql/cqlserver/cql_server.h" |
51 | | #include "yb/yql/pgwrapper/pg_wrapper.h" |
52 | | #include "yb/yql/redis/redisserver/redis_server.h" |
53 | | |
54 | | #include "yb/gutil/strings/substitute.h" |
55 | | #include "yb/master/call_home.h" |
56 | | #include "yb/rpc/io_thread_pool.h" |
57 | | #include "yb/rpc/scheduler.h" |
58 | | #include "yb/server/skewed_clock.h" |
59 | | #include "yb/server/secure.h" |
60 | | #include "yb/tserver/factory.h" |
61 | | #include "yb/tserver/tablet_server.h" |
62 | | |
63 | | #include "yb/util/flags.h" |
64 | | #include "yb/util/init.h" |
65 | | #include "yb/util/logging.h" |
66 | | #include "yb/util/main_util.h" |
67 | | #include "yb/util/result.h" |
68 | | #include "yb/util/ulimit_util.h" |
69 | | #include "yb/util/size_literals.h" |
70 | | #include "yb/util/net/net_util.h" |
71 | | #include "yb/util/status_log.h" |
72 | | #include "yb/util/debug/trace_event.h" |
73 | | |
74 | | #include "yb/rocksutil/rocksdb_encrypted_file_factory.h" |
75 | | |
76 | | #include "yb/tserver/server_main_util.h" |
77 | | |
78 | | using namespace std::placeholders; |
79 | | |
80 | | using yb::redisserver::RedisServer; |
81 | | using yb::redisserver::RedisServerOptions; |
82 | | |
83 | | using yb::cqlserver::CQLServer; |
84 | | using yb::cqlserver::CQLServerOptions; |
85 | | |
86 | | using yb::pgwrapper::PgProcessConf; |
87 | | using yb::pgwrapper::PgWrapper; |
88 | | using yb::pgwrapper::PgSupervisor; |
89 | | |
90 | | using namespace yb::size_literals; // NOLINT |
91 | | |
// Flags defined by this file: on/off switches for the Redis and CQL proxies that are started
// alongside the tablet server, plus the CQL broadcast address.
92 | | DEFINE_bool(start_redis_proxy, true, "Starts a redis proxy along with the tablet server"); |
93 | | |
94 | | DEFINE_bool(start_cql_proxy, true, "Starts a CQL proxy along with the tablet server"); |
95 | | DEFINE_string(cql_proxy_broadcast_rpc_address, "", |
96 | | "RPC address to broadcast to other nodes. This is the broadcast_address used in the" |
97 | | " system.local table"); |
98 | | |
// The DECLARE_* lines below only declare flags; no matching DEFINE_* for them appears in this
// file. Several of them (e.g. callhome_enabled, logbuflevel, stderrthreshold, ysql_hba_conf,
// ysql_pg_conf and both *_rate_limit_bytes_per_sec flags) are not referenced in the code shown
// here.
99 | | DECLARE_string(rpc_bind_addresses); |
100 | | DECLARE_bool(callhome_enabled); |
101 | | DECLARE_int32(webserver_port); |
102 | | DECLARE_int32(logbuflevel); |
103 | | DECLARE_int32(stderrthreshold); |
104 | | |
// Bind address and web port of the Redis proxy.
105 | | DECLARE_string(redis_proxy_bind_address); |
106 | | DECLARE_int32(redis_proxy_webserver_port); |
107 | | |
// Bind address and web port of the CQL proxy.
108 | | DECLARE_string(cql_proxy_bind_address); |
109 | | DECLARE_int32(cql_proxy_webserver_port); |
110 | | |
// Either start_pgsql_proxy or enable_ysql being true causes TabletServerMain to start PostgreSQL.
111 | | DECLARE_string(pgsql_proxy_bind_address); |
112 | | DECLARE_bool(start_pgsql_proxy); |
113 | | DECLARE_bool(enable_ysql); |
114 | | |
115 | | DECLARE_int64(remote_bootstrap_rate_limit_bytes_per_sec); |
116 | | |
// TLS / certificate settings that TabletServerMain copies into the PostgreSQL process config.
117 | | DECLARE_bool(use_client_to_server_encryption); |
118 | | DECLARE_string(certs_dir); |
119 | | DECLARE_string(certs_for_client_dir); |
120 | | DECLARE_string(cert_node_filename); |
121 | | DECLARE_string(ysql_hba_conf); |
122 | | DECLARE_string(ysql_pg_conf); |
123 | | DECLARE_string(metric_node_name); |
124 | | |
125 | | // Deprecated because it's misspelled. But if set, this flag takes precedence over |
126 | | // remote_bootstrap_rate_limit_bytes_per_sec for compatibility. |
127 | | DECLARE_int64(remote_boostrap_rate_limit_bytes_per_sec); |
128 | | |
129 | | namespace yb { |
130 | | namespace tserver { |
131 | | namespace { |
132 | | |
// Fills in a proxy bind-address flag when the user left it empty.
//
// flag: pointer to the flag's string value; only modified when it is empty on entry.
// name: human-readable proxy name, used only in the log message.
// port: port that replaces the port of every address parsed from FLAGS_rpc_bind_addresses.
//
// A non-empty *flag is left untouched and nothing is logged.
133 | 23.2k | void SetProxyAddress(std::string* flag, const std::string& name, uint16_t port) { |
134 | 23.2k | if (flag->empty()) { |
135 | 4.75k | std::vector<HostPort> bind_addresses; |
136 | 4.75k | Status status = HostPort::ParseStrings(FLAGS_rpc_bind_addresses, 0, &bind_addresses); |
// A parse failure is only logged (DFATAL severity); there is no early return in this function.
// NOTE(review): the `0` fused onto `status` looks like a region count injected by the coverage
// report rather than part of the real source -- confirm against the original file.
137 | 4.75k | LOG_IF(DFATAL, !status.ok()) << "Bad public IPs " << FLAGS_rpc_bind_addresses << ": " << status0 ; |
// Only overwrite the flag when at least one address was parsed; otherwise it stays empty.
138 | 4.75k | if (!bind_addresses.empty()) { |
139 | 4.75k | for (auto& addr : bind_addresses) { |
140 | 4.75k | addr.set_port(port); |
141 | 4.75k | } |
142 | 4.75k | *flag = HostPort::ToCommaSeparatedString(bind_addresses); |
143 | 4.75k | LOG(INFO) << "Reset " << name << " bind address to " << *flag; |
144 | 4.75k | } |
145 | 4.75k | } |
146 | 23.2k | } |
147 | | |
148 | | // Helper function to set the proxy rpc addresses based on rpc_bind_addresses. |
// Logs the current FLAGS_rpc_bind_addresses value, then calls SetProxyAddress once each for the
// Redis ("YEDIS"), CQL ("YCQL") and PostgreSQL ("YSQL") bind-address flags, passing that
// proxy's kDefaultPort. Flags that are already non-empty are left unchanged by SetProxyAddress.
149 | 7.75k | void SetProxyAddresses() { |
150 | 7.75k | LOG(INFO) << "Using parsed rpc = " << FLAGS_rpc_bind_addresses; |
151 | 7.75k | SetProxyAddress(&FLAGS_redis_proxy_bind_address, "YEDIS", RedisServer::kDefaultPort); |
152 | 7.75k | SetProxyAddress(&FLAGS_cql_proxy_bind_address, "YCQL", CQLServer::kDefaultPort); |
153 | 7.75k | SetProxyAddress(&FLAGS_pgsql_proxy_bind_address, "YSQL", PgProcessConf::kDefaultPort); |
154 | 7.75k | } |
155 | | |
// Tablet server startup sequence. In order, it:
//   1. sets tablet-server-specific flag defaults and parses the command line;
//   2. derives any empty proxy bind addresses from rpc_bind_addresses;
//   3. creates the encrypted Envs and the tablet server, then runs Init() and Start();
//   4. schedules call-home;
//   5. optionally starts PostgreSQL, the Redis proxy and the CQL proxy;
//   6. sleeps forever.
//
// argc/argv: the process command line, passed by pointer to MasterTServerParseFlagsAndInit.
//
// Each LOG_AND_RETURN_FROM_MAIN_NOT_OK(...) presumably logs and returns from this function when
// its argument is not OK -- inferred from the macro name; its definition is not in this file.
//
// NOTE(review): numbers fused onto expressions below (e.g. `FLAGS_enable_ysql7.75k`,
// `...fs_manager())1.97k`, `""1.59k`) look like region counts from the coverage report rather
// than source text -- confirm against the original file.
156 | 7.75k | int TabletServerMain(int argc, char** argv) { |
// Only when NDEBUG is not defined: turn on the pretty HybridTime string form.
157 | 7.75k | #ifndef NDEBUG |
158 | 7.75k | HybridTime::TEST_SetPrettyToString(true); |
159 | 7.75k | #endif |
160 | | |
161 | | // Reset some default values before parsing gflags. |
162 | 7.75k | FLAGS_rpc_bind_addresses = strings::Substitute("0.0.0.0:$0", |
163 | 7.75k | TabletServer::kDefaultPort); |
164 | 7.75k | FLAGS_webserver_port = TabletServer::kDefaultWebPort; |
165 | 7.75k | FLAGS_redis_proxy_webserver_port = RedisServer::kDefaultWebPort; |
166 | 7.75k | FLAGS_cql_proxy_webserver_port = CQLServer::kDefaultWebPort; |
167 | | |
// Default metric_node_name to "<hostname>:<default web port>"; if the hostname lookup fails the
// flag keeps its existing value and only an INFO message is logged.
168 | 7.75k | string host_name; |
169 | 7.75k | if (GetHostname(&host_name).ok()) { |
170 | 7.75k | FLAGS_metric_node_name = strings::Substitute("$0:$1", host_name, TabletServer::kDefaultWebPort); |
171 | 7.75k | } else { |
172 | 0 | LOG(INFO) << "Failed to get tablet's host name, keeping default metric_node_name"; |
173 | 0 | } |
174 | | |
175 | 7.75k | LOG_AND_RETURN_FROM_MAIN_NOT_OK(MasterTServerParseFlagsAndInit( |
176 | 7.75k | TabletServerOptions::kServerType, &argc, &argv)); |
177 | | |
// Runs after flag parsing, so explicitly supplied proxy addresses are kept as given.
178 | 7.75k | SetProxyAddresses(); |
179 | | |
180 | | // Object that manages the universe key registry used for encrypting and decrypting data keys. |
181 | | // Copies are given to each Env. |
182 | 7.75k | auto universe_key_manager = std::make_unique<encryption::UniverseKeyManager>(); |
183 | | // Encrypted env for all non-rocksdb file i/o operations. |
184 | 7.75k | std::unique_ptr<yb::Env> env = |
185 | 7.75k | NewEncryptedEnv(DefaultHeaderManager(universe_key_manager.get())); |
186 | | // Encrypted env for all rocksdb file i/o operations. |
187 | 7.75k | std::unique_ptr<rocksdb::Env> rocksdb_env = |
188 | 7.75k | NewRocksDBEncryptedEnv(DefaultHeaderManager(universe_key_manager.get())); |
189 | | |
// The options receive raw (non-owning) pointers; ownership stays with the unique_ptr locals
// declared above.
190 | 7.75k | auto tablet_server_options = TabletServerOptions::CreateTabletServerOptions(); |
191 | 7.75k | LOG_AND_RETURN_FROM_MAIN_NOT_OK(tablet_server_options); |
192 | 7.75k | tablet_server_options->env = env.get(); |
193 | 7.75k | tablet_server_options->rocksdb_env = rocksdb_env.get(); |
194 | 7.75k | tablet_server_options->universe_key_manager = universe_key_manager.get(); |
195 | 7.75k | enterprise::Factory factory; |
196 | | |
197 | 7.75k | auto server = factory.CreateTabletServer(*tablet_server_options); |
198 | | |
199 | | // ---------------------------------------------------------------------------------------------- |
200 | | // Starting to instantiate servers |
201 | | // ---------------------------------------------------------------------------------------------- |
202 | | |
// Ulimits are initialized and logged after Init() and before Start().
203 | 7.75k | LOG(INFO) << "Initializing tablet server..."; |
204 | 7.75k | LOG_AND_RETURN_FROM_MAIN_NOT_OK(server->Init()); |
205 | 7.75k | LOG(INFO) << "Starting tablet server..."; |
206 | 7.75k | UlimitUtil::InitUlimits(); |
207 | 7.75k | LOG(INFO) << "ulimit cur(max)..." << UlimitUtil::GetUlimitInfo(); |
208 | 7.75k | LOG_AND_RETURN_FROM_MAIN_NOT_OK(server->Start()); |
209 | 7.75k | LOG(INFO) << "Tablet server successfully started."; |
210 | | |
// CallHome is created and scheduled unconditionally here; FLAGS_callhome_enabled is not
// consulted in this function.
211 | 7.75k | std::unique_ptr<CallHome> call_home; |
212 | 7.75k | call_home = std::make_unique<CallHome>(server.get(), ServerType::TSERVER); |
213 | 7.75k | call_home->ScheduleCallHome(); |
214 | | |
// PostgreSQL is started when either start_pgsql_proxy or enable_ysql is set.
215 | 7.75k | std::unique_ptr<PgSupervisor> pg_supervisor; |
216 | 7.75k | if (FLAGS_start_pgsql_proxy || FLAGS_enable_ysql7.75k ) { |
// The PostgreSQL data directory is "<first data path>/pg_data".
// NOTE(review): data_paths.front() is called with no emptiness check visible in this function --
// confirm that option creation guarantees at least one data path.
217 | 2.00k | auto pg_process_conf_result = PgProcessConf::CreateValidateAndRunInitDb( |
218 | 2.00k | FLAGS_pgsql_proxy_bind_address, |
219 | 2.00k | tablet_server_options->fs_opts.data_paths.front() + "/pg_data", |
220 | 2.00k | server->GetSharedMemoryFd()); |
221 | 2.00k | LOG_AND_RETURN_FROM_MAIN_NOT_OK(pg_process_conf_result); |
222 | 2.00k | auto& pg_process_conf = *pg_process_conf_result; |
223 | 2.00k | pg_process_conf.master_addresses = tablet_server_options->master_addresses_flag; |
// certs_dir: FLAGS_certs_dir if set, otherwise server::DefaultCertsDir(...).
224 | 2.00k | pg_process_conf.certs_dir = FLAGS_certs_dir.empty() |
225 | 2.00k | ? server::DefaultCertsDir(*server->fs_manager())1.97k |
226 | 2.00k | : FLAGS_certs_dir36 ; |
// certs_for_client_dir: FLAGS_certs_for_client_dir if set, otherwise the certs_dir chosen above.
227 | 2.00k | pg_process_conf.certs_for_client_dir = FLAGS_certs_for_client_dir.empty() |
228 | 2.00k | ? pg_process_conf.certs_dir1.99k |
229 | 2.00k | : FLAGS_certs_for_client_dir17 ; |
230 | 2.00k | pg_process_conf.enable_tls = FLAGS_use_client_to_server_encryption; |
231 | | |
232 | | // Follow the same logic as elsewhere, check FLAGS_cert_node_filename then |
233 | | // server_broadcast_addresses then rpc_bind_addresses. |
234 | 2.00k | if (!FLAGS_cert_node_filename.empty()) { |
235 | 0 | pg_process_conf.cert_base_name = FLAGS_cert_node_filename; |
236 | 2.00k | } else { |
// Both address lists are parsed up front; a parse failure in either one returns from main.
237 | 2.00k | const auto server_broadcast_addresses = |
238 | 2.00k | HostPort::ParseStrings(server->options().server_broadcast_addresses, 0); |
239 | 2.00k | LOG_AND_RETURN_FROM_MAIN_NOT_OK(server_broadcast_addresses); |
240 | 2.00k | const auto rpc_bind_addresses = |
241 | 2.00k | HostPort::ParseStrings(server->options().rpc_opts.rpc_bind_addresses, 0); |
242 | 2.00k | LOG_AND_RETURN_FROM_MAIN_NOT_OK(rpc_bind_addresses); |
// Host of the first broadcast address if there is one, else host of the first rpc bind address.
// NOTE(review): rpc_bind_addresses->front() is not guarded against an empty list here -- confirm
// that it cannot be empty at this point.
243 | 2.00k | pg_process_conf.cert_base_name = !server_broadcast_addresses->empty() |
244 | 2.00k | ? server_broadcast_addresses->front().host()0 |
245 | 2.00k | : rpc_bind_addresses->front().host(); |
246 | 2.00k | } |
247 | 2.00k | LOG(INFO) << "Starting PostgreSQL server listening on " |
248 | 2.00k | << pg_process_conf.listen_addresses << ", port " << pg_process_conf.pg_port; |
249 | | |
250 | 2.00k | pg_supervisor = std::make_unique<PgSupervisor>(pg_process_conf); |
251 | 2.00k | LOG_AND_RETURN_FROM_MAIN_NOT_OK(pg_supervisor->Start()); |
252 | 2.00k | } |
253 | | |
// Redis proxy: its options are built from the proxy flags and the tablet server's master
// addresses. dump_info_path gets a "-redis" suffix when the tablet server's path is non-empty.
254 | 7.75k | std::unique_ptr<RedisServer> redis_server; |
255 | 7.75k | if (FLAGS_start_redis_proxy) { |
256 | 2.93k | RedisServerOptions redis_server_options; |
257 | 2.93k | redis_server_options.rpc_opts.rpc_bind_addresses = FLAGS_redis_proxy_bind_address; |
258 | 2.93k | redis_server_options.webserver_opts.port = FLAGS_redis_proxy_webserver_port; |
259 | 2.93k | redis_server_options.master_addresses_flag = tablet_server_options->master_addresses_flag; |
260 | 2.93k | redis_server_options.SetMasterAddresses(tablet_server_options->GetMasterAddresses()); |
261 | 2.93k | redis_server_options.dump_info_path = |
262 | 2.93k | (tablet_server_options->dump_info_path.empty() |
263 | 2.93k | ? ""1.59k |
264 | 2.93k | : tablet_server_options->dump_info_path + "-redis"1.33k ); |
265 | 2.93k | redis_server.reset(new RedisServer(redis_server_options, server.get())); |
266 | 2.93k | LOG(INFO) << "Starting redis server..."; |
267 | 2.93k | LOG_AND_RETURN_FROM_MAIN_NOT_OK(redis_server->Start()); |
268 | 2.93k | LOG(INFO) << "Redis server successfully started."; |
269 | 2.93k | } |
270 | | |
271 | | // TODO(neil): After CQL server is starting, it blocks this thread from moving on. |
272 | | // This should be fixed such that all processes or service by tablet server are treated equally |
273 | | // by using different threads for each process. |
// CQL proxy: configured like the Redis proxy, with a "-cql" dump_info_path suffix, plus the
// broadcast RPC address flag.
274 | 7.75k | std::unique_ptr<CQLServer> cql_server; |
275 | 7.75k | if (FLAGS_start_cql_proxy) { |
276 | 6.11k | CQLServerOptions cql_server_options; |
277 | 6.11k | cql_server_options.rpc_opts.rpc_bind_addresses = FLAGS_cql_proxy_bind_address; |
278 | 6.11k | cql_server_options.broadcast_rpc_address = FLAGS_cql_proxy_broadcast_rpc_address; |
279 | 6.11k | cql_server_options.webserver_opts.port = FLAGS_cql_proxy_webserver_port; |
280 | 6.11k | cql_server_options.master_addresses_flag = tablet_server_options->master_addresses_flag; |
281 | 6.11k | cql_server_options.SetMasterAddresses(tablet_server_options->GetMasterAddresses()); |
282 | 6.11k | cql_server_options.dump_info_path = |
283 | 6.11k | (tablet_server_options->dump_info_path.empty() |
284 | 6.11k | ? ""4.77k |
285 | 6.11k | : tablet_server_options->dump_info_path + "-cql"1.33k ); |
// The io_service is local to this branch and is handed to the CQL server by pointer.
286 | 6.11k | boost::asio::io_service io; |
287 | 6.11k | cql_server = factory.CreateCQLServer(cql_server_options, &io, server.get()); |
288 | 6.11k | LOG(INFO) << "Starting CQL server..."; |
289 | 6.11k | LOG_AND_RETURN_FROM_MAIN_NOT_OK(cql_server->Start()); |
290 | 6.11k | LOG(INFO) << "CQL server successfully started."; |
291 | | |
292 | | // Should run forever unless there are some errors. |
// io.run(ec) reports failure through `ec`; any error is only logged as a warning.
293 | 6.11k | boost::system::error_code ec; |
294 | 6.11k | io.run(ec); |
295 | 6.11k | if (ec) { |
296 | 0 | LOG(WARNING) << "IO service run failure: " << ec; |
297 | 0 | } |
298 | | |
// Reached only once io.run() has returned: shut the CQL server down, then fall through to the
// sleep loop below (the process is not exited here).
299 | 6.11k | LOG (WARNING) << "CQL Server shutting down"; |
300 | 6.11k | cql_server->Shutdown(); |
301 | 6.11k | } |
302 | | |
// Park this thread forever in 60-second sleeps. The loop has no break, so the `return 0` after
// it is never reached.
303 | 9.98k | while (7.75k true) { |
304 | 2.23k | SleepFor(MonoDelta::FromSeconds(60)); |
305 | 2.23k | } |
306 | | |
307 | 7.75k | return 0; |
308 | 7.75k | } |
309 | | |
310 | | } // namespace |
311 | | } // namespace tserver |
312 | | } // namespace yb |
313 | | |
// Process entry point: forwards argc/argv to yb::tserver::TabletServerMain and returns its
// result as the exit status.
314 | 18.6k | int main(int argc, char** argv) { |
315 | 18.6k | return yb::tserver::TabletServerMain(argc, argv); |
316 | 18.6k | } |