YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_server_main.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <chrono>
34
#include <iostream>
35
36
#include <boost/optional/optional.hpp>
37
#include <glog/logging.h>
38
39
#ifdef TCMALLOC_ENABLED
40
#include <gperftools/malloc_extension.h>
41
#endif
42
43
#include "yb/consensus/log_util.h"
44
#include "yb/consensus/consensus_queue.h"
45
46
#include "yb/encryption/header_manager_impl.h"
47
#include "yb/encryption/encrypted_file_factory.h"
48
#include "yb/encryption/universe_key_manager.h"
49
50
#include "yb/yql/cql/cqlserver/cql_server.h"
51
#include "yb/yql/pgwrapper/pg_wrapper.h"
52
#include "yb/yql/redis/redisserver/redis_server.h"
53
54
#include "yb/gutil/strings/substitute.h"
55
#include "yb/master/call_home.h"
56
#include "yb/rpc/io_thread_pool.h"
57
#include "yb/rpc/scheduler.h"
58
#include "yb/server/skewed_clock.h"
59
#include "yb/server/secure.h"
60
#include "yb/tserver/factory.h"
61
#include "yb/tserver/tablet_server.h"
62
63
#include "yb/util/flags.h"
64
#include "yb/util/init.h"
65
#include "yb/util/logging.h"
66
#include "yb/util/main_util.h"
67
#include "yb/util/result.h"
68
#include "yb/util/ulimit_util.h"
69
#include "yb/util/size_literals.h"
70
#include "yb/util/net/net_util.h"
71
#include "yb/util/status_log.h"
72
#include "yb/util/debug/trace_event.h"
73
74
#include "yb/rocksutil/rocksdb_encrypted_file_factory.h"
75
76
#include "yb/tserver/server_main_util.h"
77
78
using namespace std::placeholders;
79
80
using yb::redisserver::RedisServer;
81
using yb::redisserver::RedisServerOptions;
82
83
using yb::cqlserver::CQLServer;
84
using yb::cqlserver::CQLServerOptions;
85
86
using yb::pgwrapper::PgProcessConf;
87
using yb::pgwrapper::PgWrapper;
88
using yb::pgwrapper::PgSupervisor;
89
90
using namespace yb::size_literals;  // NOLINT
91
92
DEFINE_bool(start_redis_proxy, true, "Starts a redis proxy along with the tablet server");
93
94
DEFINE_bool(start_cql_proxy, true, "Starts a CQL proxy along with the tablet server");
95
DEFINE_string(cql_proxy_broadcast_rpc_address, "",
96
              "RPC address to broadcast to other nodes. This is the broadcast_address used in the"
97
                  " system.local table");
98
99
DECLARE_string(rpc_bind_addresses);
100
DECLARE_bool(callhome_enabled);
101
DECLARE_int32(webserver_port);
102
DECLARE_int32(logbuflevel);
103
DECLARE_int32(stderrthreshold);
104
105
DECLARE_string(redis_proxy_bind_address);
106
DECLARE_int32(redis_proxy_webserver_port);
107
108
DECLARE_string(cql_proxy_bind_address);
109
DECLARE_int32(cql_proxy_webserver_port);
110
111
DECLARE_string(pgsql_proxy_bind_address);
112
DECLARE_bool(start_pgsql_proxy);
113
DECLARE_bool(enable_ysql);
114
115
DECLARE_int64(remote_bootstrap_rate_limit_bytes_per_sec);
116
117
DECLARE_bool(use_client_to_server_encryption);
118
DECLARE_string(certs_dir);
119
DECLARE_string(certs_for_client_dir);
120
DECLARE_string(cert_node_filename);
121
DECLARE_string(ysql_hba_conf);
122
DECLARE_string(ysql_pg_conf);
123
DECLARE_string(metric_node_name);
124
125
// Deprecated because it's misspelled.  But if set, this flag takes precedence over
126
// remote_bootstrap_rate_limit_bytes_per_sec for compatibility.
127
DECLARE_int64(remote_boostrap_rate_limit_bytes_per_sec);
128
129
namespace yb {
130
namespace tserver {
131
namespace {
132
133
23.2k
void SetProxyAddress(std::string* flag, const std::string& name, uint16_t port) {
134
23.2k
  if (flag->empty()) {
135
4.75k
    std::vector<HostPort> bind_addresses;
136
4.75k
    Status status = HostPort::ParseStrings(FLAGS_rpc_bind_addresses, 0, &bind_addresses);
137
4.75k
    LOG_IF
(DFATAL, !status.ok()) << "Bad public IPs " << FLAGS_rpc_bind_addresses << ": " << status0
;
138
4.75k
    if (!bind_addresses.empty()) {
139
4.75k
      for (auto& addr : bind_addresses) {
140
4.75k
        addr.set_port(port);
141
4.75k
      }
142
4.75k
      *flag = HostPort::ToCommaSeparatedString(bind_addresses);
143
4.75k
      LOG(INFO) << "Reset " << name << " bind address to " << *flag;
144
4.75k
    }
145
4.75k
  }
146
23.2k
}
147
148
// Helper function to set the proxy rpc addresses based on rpc_bind_addresses.
149
7.75k
void SetProxyAddresses() {
150
7.75k
  LOG(INFO) << "Using parsed rpc = " << FLAGS_rpc_bind_addresses;
151
7.75k
  SetProxyAddress(&FLAGS_redis_proxy_bind_address, "YEDIS", RedisServer::kDefaultPort);
152
7.75k
  SetProxyAddress(&FLAGS_cql_proxy_bind_address, "YCQL", CQLServer::kDefaultPort);
153
7.75k
  SetProxyAddress(&FLAGS_pgsql_proxy_bind_address, "YSQL", PgProcessConf::kDefaultPort);
154
7.75k
}
155
156
7.75k
int TabletServerMain(int argc, char** argv) {
157
7.75k
#ifndef NDEBUG
158
7.75k
  HybridTime::TEST_SetPrettyToString(true);
159
7.75k
#endif
160
161
  // Reset some default values before parsing gflags.
162
7.75k
  FLAGS_rpc_bind_addresses = strings::Substitute("0.0.0.0:$0",
163
7.75k
                                                 TabletServer::kDefaultPort);
164
7.75k
  FLAGS_webserver_port = TabletServer::kDefaultWebPort;
165
7.75k
  FLAGS_redis_proxy_webserver_port = RedisServer::kDefaultWebPort;
166
7.75k
  FLAGS_cql_proxy_webserver_port = CQLServer::kDefaultWebPort;
167
168
7.75k
  string host_name;
169
7.75k
  if (GetHostname(&host_name).ok()) {
170
7.75k
    FLAGS_metric_node_name = strings::Substitute("$0:$1", host_name, TabletServer::kDefaultWebPort);
171
7.75k
  } else {
172
0
    LOG(INFO) << "Failed to get tablet's host name, keeping default metric_node_name";
173
0
  }
174
175
7.75k
  LOG_AND_RETURN_FROM_MAIN_NOT_OK(MasterTServerParseFlagsAndInit(
176
7.75k
      TabletServerOptions::kServerType, &argc, &argv));
177
178
7.75k
  SetProxyAddresses();
179
180
  // Object that manages the universe key registry used for encrypting and decrypting data keys.
181
  // Copies are given to each Env.
182
7.75k
  auto universe_key_manager = std::make_unique<encryption::UniverseKeyManager>();
183
  // Encrypted env for all non-rocksdb file i/o operations.
184
7.75k
  std::unique_ptr<yb::Env> env =
185
7.75k
      NewEncryptedEnv(DefaultHeaderManager(universe_key_manager.get()));
186
  // Encrypted env for all rocksdb file i/o operations.
187
7.75k
  std::unique_ptr<rocksdb::Env> rocksdb_env =
188
7.75k
      NewRocksDBEncryptedEnv(DefaultHeaderManager(universe_key_manager.get()));
189
190
7.75k
  auto tablet_server_options = TabletServerOptions::CreateTabletServerOptions();
191
7.75k
  LOG_AND_RETURN_FROM_MAIN_NOT_OK(tablet_server_options);
192
7.75k
  tablet_server_options->env = env.get();
193
7.75k
  tablet_server_options->rocksdb_env = rocksdb_env.get();
194
7.75k
  tablet_server_options->universe_key_manager = universe_key_manager.get();
195
7.75k
  enterprise::Factory factory;
196
197
7.75k
  auto server = factory.CreateTabletServer(*tablet_server_options);
198
199
  // ----------------------------------------------------------------------------------------------
200
  // Starting to instantiate servers
201
  // ----------------------------------------------------------------------------------------------
202
203
7.75k
  LOG(INFO) << "Initializing tablet server...";
204
7.75k
  LOG_AND_RETURN_FROM_MAIN_NOT_OK(server->Init());
205
7.75k
  LOG(INFO) << "Starting tablet server...";
206
7.75k
  UlimitUtil::InitUlimits();
207
7.75k
  LOG(INFO) << "ulimit cur(max)..." << UlimitUtil::GetUlimitInfo();
208
7.75k
  LOG_AND_RETURN_FROM_MAIN_NOT_OK(server->Start());
209
7.75k
  LOG(INFO) << "Tablet server successfully started.";
210
211
7.75k
  std::unique_ptr<CallHome> call_home;
212
7.75k
  call_home = std::make_unique<CallHome>(server.get(), ServerType::TSERVER);
213
7.75k
  call_home->ScheduleCallHome();
214
215
7.75k
  std::unique_ptr<PgSupervisor> pg_supervisor;
216
7.75k
  if (FLAGS_start_pgsql_proxy || 
FLAGS_enable_ysql7.75k
) {
217
2.00k
    auto pg_process_conf_result = PgProcessConf::CreateValidateAndRunInitDb(
218
2.00k
        FLAGS_pgsql_proxy_bind_address,
219
2.00k
        tablet_server_options->fs_opts.data_paths.front() + "/pg_data",
220
2.00k
        server->GetSharedMemoryFd());
221
2.00k
    LOG_AND_RETURN_FROM_MAIN_NOT_OK(pg_process_conf_result);
222
2.00k
    auto& pg_process_conf = *pg_process_conf_result;
223
2.00k
    pg_process_conf.master_addresses = tablet_server_options->master_addresses_flag;
224
2.00k
    pg_process_conf.certs_dir = FLAGS_certs_dir.empty()
225
2.00k
        ? 
server::DefaultCertsDir(*server->fs_manager())1.97k
226
2.00k
        : 
FLAGS_certs_dir36
;
227
2.00k
    pg_process_conf.certs_for_client_dir = FLAGS_certs_for_client_dir.empty()
228
2.00k
        ? 
pg_process_conf.certs_dir1.99k
229
2.00k
        : 
FLAGS_certs_for_client_dir17
;
230
2.00k
    pg_process_conf.enable_tls = FLAGS_use_client_to_server_encryption;
231
232
    // Follow the same logic as elsewhere, check FLAGS_cert_node_filename then
233
    // server_broadcast_addresses then rpc_bind_addresses.
234
2.00k
    if (!FLAGS_cert_node_filename.empty()) {
235
0
      pg_process_conf.cert_base_name = FLAGS_cert_node_filename;
236
2.00k
    } else {
237
2.00k
      const auto server_broadcast_addresses =
238
2.00k
          HostPort::ParseStrings(server->options().server_broadcast_addresses, 0);
239
2.00k
      LOG_AND_RETURN_FROM_MAIN_NOT_OK(server_broadcast_addresses);
240
2.00k
      const auto rpc_bind_addresses =
241
2.00k
          HostPort::ParseStrings(server->options().rpc_opts.rpc_bind_addresses, 0);
242
2.00k
      LOG_AND_RETURN_FROM_MAIN_NOT_OK(rpc_bind_addresses);
243
2.00k
      pg_process_conf.cert_base_name = !server_broadcast_addresses->empty()
244
2.00k
                                     ? 
server_broadcast_addresses->front().host()0
245
2.00k
                                     : rpc_bind_addresses->front().host();
246
2.00k
    }
247
2.00k
    LOG(INFO) << "Starting PostgreSQL server listening on "
248
2.00k
              << pg_process_conf.listen_addresses << ", port " << pg_process_conf.pg_port;
249
250
2.00k
    pg_supervisor = std::make_unique<PgSupervisor>(pg_process_conf);
251
2.00k
    LOG_AND_RETURN_FROM_MAIN_NOT_OK(pg_supervisor->Start());
252
2.00k
  }
253
254
7.75k
  std::unique_ptr<RedisServer> redis_server;
255
7.75k
  if (FLAGS_start_redis_proxy) {
256
2.93k
    RedisServerOptions redis_server_options;
257
2.93k
    redis_server_options.rpc_opts.rpc_bind_addresses = FLAGS_redis_proxy_bind_address;
258
2.93k
    redis_server_options.webserver_opts.port = FLAGS_redis_proxy_webserver_port;
259
2.93k
    redis_server_options.master_addresses_flag = tablet_server_options->master_addresses_flag;
260
2.93k
    redis_server_options.SetMasterAddresses(tablet_server_options->GetMasterAddresses());
261
2.93k
    redis_server_options.dump_info_path =
262
2.93k
        (tablet_server_options->dump_info_path.empty()
263
2.93k
             ? 
""1.59k
264
2.93k
             : 
tablet_server_options->dump_info_path + "-redis"1.33k
);
265
2.93k
    redis_server.reset(new RedisServer(redis_server_options, server.get()));
266
2.93k
    LOG(INFO) << "Starting redis server...";
267
2.93k
    LOG_AND_RETURN_FROM_MAIN_NOT_OK(redis_server->Start());
268
2.93k
    LOG(INFO) << "Redis server successfully started.";
269
2.93k
  }
270
271
  // TODO(neil): After CQL server is starting, it blocks this thread from moving on.
272
  // This should be fixed such that all processes or service by tablet server are treated equally
273
  // by using different threads for each process.
274
7.75k
  std::unique_ptr<CQLServer> cql_server;
275
7.75k
  if (FLAGS_start_cql_proxy) {
276
6.11k
    CQLServerOptions cql_server_options;
277
6.11k
    cql_server_options.rpc_opts.rpc_bind_addresses = FLAGS_cql_proxy_bind_address;
278
6.11k
    cql_server_options.broadcast_rpc_address = FLAGS_cql_proxy_broadcast_rpc_address;
279
6.11k
    cql_server_options.webserver_opts.port = FLAGS_cql_proxy_webserver_port;
280
6.11k
    cql_server_options.master_addresses_flag = tablet_server_options->master_addresses_flag;
281
6.11k
    cql_server_options.SetMasterAddresses(tablet_server_options->GetMasterAddresses());
282
6.11k
    cql_server_options.dump_info_path =
283
6.11k
        (tablet_server_options->dump_info_path.empty()
284
6.11k
             ? 
""4.77k
285
6.11k
             : 
tablet_server_options->dump_info_path + "-cql"1.33k
);
286
6.11k
    boost::asio::io_service io;
287
6.11k
    cql_server = factory.CreateCQLServer(cql_server_options, &io, server.get());
288
6.11k
    LOG(INFO) << "Starting CQL server...";
289
6.11k
    LOG_AND_RETURN_FROM_MAIN_NOT_OK(cql_server->Start());
290
6.11k
    LOG(INFO) << "CQL server successfully started.";
291
292
    // Should run forever unless there are some errors.
293
6.11k
    boost::system::error_code ec;
294
6.11k
    io.run(ec);
295
6.11k
    if (ec) {
296
0
      LOG(WARNING) << "IO service run failure: " << ec;
297
0
    }
298
299
6.11k
    LOG (WARNING) << "CQL Server shutting down";
300
6.11k
    cql_server->Shutdown();
301
6.11k
  }
302
303
9.98k
  
while (7.75k
true) {
304
2.23k
    SleepFor(MonoDelta::FromSeconds(60));
305
2.23k
  }
306
307
7.75k
  return 0;
308
7.75k
}
309
310
}  // namespace
311
}  // namespace tserver
312
}  // namespace yb
313
314
18.6k
int main(int argc, char** argv) {
315
18.6k
  return yb::tserver::TabletServerMain(argc, argv);
316
18.6k
}