/Users/deen/code/yugabyte-db/src/yb/integration-tests/external_mini_cluster.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/integration-tests/external_mini_cluster.h" |
34 | | |
35 | | #include <atomic> |
36 | | #include <chrono> |
37 | | #include <functional> |
38 | | #include <memory> |
39 | | #include <mutex> |
40 | | #include <string> |
41 | | #include <thread> |
42 | | #include <unordered_map> |
43 | | #include <vector> |
44 | | |
45 | | #include <glog/logging.h> |
46 | | #include <gtest/gtest.h> |
47 | | #include <rapidjson/document.h> |
48 | | |
49 | | #include "yb/client/client.h" |
50 | | |
51 | | #include "yb/common/wire_protocol.h" |
52 | | |
53 | | #include "yb/consensus/consensus.proxy.h" |
54 | | |
55 | | #include "yb/fs/fs_manager.h" |
56 | | |
57 | | #include "yb/gutil/algorithm.h" |
58 | | #include "yb/gutil/bind.h" |
59 | | #include "yb/gutil/macros.h" |
60 | | #include "yb/gutil/ref_counted.h" |
61 | | #include "yb/gutil/singleton.h" |
62 | | #include "yb/gutil/strings/join.h" |
63 | | #include "yb/gutil/strings/substitute.h" |
64 | | #include "yb/gutil/strings/util.h" |
65 | | |
66 | | #include "yb/integration-tests/cluster_itest_util.h" |
67 | | |
68 | | #include "yb/master/master_admin.proxy.h" |
69 | | #include "yb/master/master_cluster.proxy.h" |
70 | | #include "yb/master/master_rpc.h" |
71 | | #include "yb/master/sys_catalog.h" |
72 | | |
73 | | #include "yb/rpc/messenger.h" |
74 | | #include "yb/rpc/proxy.h" |
75 | | #include "yb/rpc/rpc_controller.h" |
76 | | |
77 | | #include "yb/server/server_base.pb.h" |
78 | | #include "yb/server/server_base.proxy.h" |
79 | | |
80 | | #include "yb/tserver/tserver_admin.proxy.h" |
81 | | #include "yb/tserver/tserver_service.proxy.h" |
82 | | |
83 | | #include "yb/util/async_util.h" |
84 | | #include "yb/util/curl_util.h" |
85 | | #include "yb/util/env.h" |
86 | | #include "yb/util/faststring.h" |
87 | | #include "yb/util/format.h" |
88 | | #include "yb/util/jsonreader.h" |
89 | | #include "yb/util/logging.h" |
90 | | #include "yb/util/metrics.h" |
91 | | #include "yb/util/monotime.h" |
92 | | #include "yb/util/net/net_fwd.h" |
93 | | #include "yb/util/net/sockaddr.h" |
94 | | #include "yb/util/path_util.h" |
95 | | #include "yb/util/pb_util.h" |
96 | | #include "yb/util/size_literals.h" |
97 | | #include "yb/util/slice.h" |
98 | | #include "yb/util/status_format.h" |
99 | | #include "yb/util/status_log.h" |
100 | | #include "yb/util/stopwatch.h" |
101 | | #include "yb/util/subprocess.h" |
102 | | #include "yb/util/test_util.h" |
103 | | #include "yb/util/tsan_util.h" |
104 | | |
105 | | using namespace std::literals; // NOLINT |
106 | | using namespace yb::size_literals; // NOLINT |
107 | | |
108 | | using std::atomic; |
109 | | using std::lock_guard; |
110 | | using std::mutex; |
111 | | using std::shared_ptr; |
112 | | using std::string; |
113 | | using std::thread; |
114 | | using std::unique_ptr; |
115 | | |
116 | | using rapidjson::Value; |
117 | | using strings::Substitute; |
118 | | |
119 | | using yb::master::GetLeaderMasterRpc; |
120 | | using yb::master::IsInitDbDoneRequestPB; |
121 | | using yb::master::IsInitDbDoneResponsePB; |
122 | | using yb::server::ServerStatusPB; |
123 | | using yb::tserver::ListTabletsRequestPB; |
124 | | using yb::tserver::ListTabletsResponsePB; |
125 | | using yb::tserver::TabletServerErrorPB; |
126 | | using yb::tserver::TabletServerServiceProxy; |
127 | | using yb::consensus::ConsensusServiceProxy; |
128 | | using yb::consensus::RaftPeerPB; |
129 | | using yb::consensus::ChangeConfigRequestPB; |
130 | | using yb::consensus::ChangeConfigResponsePB; |
131 | | using yb::consensus::ChangeConfigType; |
132 | | using yb::consensus::GetLastOpIdRequestPB; |
133 | | using yb::consensus::GetLastOpIdResponsePB; |
134 | | using yb::consensus::LeaderStepDownRequestPB; |
135 | | using yb::consensus::LeaderStepDownResponsePB; |
136 | | using yb::consensus::RunLeaderElectionRequestPB; |
137 | | using yb::consensus::RunLeaderElectionResponsePB; |
138 | | using yb::master::IsMasterLeaderReadyRequestPB; |
139 | | using yb::master::IsMasterLeaderReadyResponsePB; |
140 | | using yb::master::GetMasterClusterConfigRequestPB; |
141 | | using yb::master::GetMasterClusterConfigResponsePB; |
142 | | using yb::master::ChangeMasterClusterConfigRequestPB; |
143 | | using yb::master::ChangeMasterClusterConfigResponsePB; |
144 | | using yb::master::SysClusterConfigEntryPB; |
145 | | using yb::tserver::ListTabletsForTabletServerRequestPB; |
146 | | using yb::tserver::ListTabletsForTabletServerResponsePB; |
147 | | using yb::master::ListMastersRequestPB; |
148 | | using yb::master::ListMastersResponsePB; |
149 | | using yb::tserver::TabletServerErrorPB; |
150 | | using yb::rpc::RpcController; |
151 | | |
152 | | typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB; |
153 | | |
154 | | DECLARE_string(vmodule); |
155 | | DECLARE_int32(replication_factor); |
156 | | DECLARE_bool(mem_tracker_logging); |
157 | | DECLARE_bool(mem_tracker_log_stack_trace); |
158 | | DECLARE_string(minicluster_daemon_id); |
159 | | DECLARE_bool(use_libbacktrace); |
160 | | DECLARE_bool(never_fsync); |
161 | | |
162 | | DEFINE_string(external_daemon_heap_profile_prefix, "", |
163 | | "If this is not empty, tcmalloc's HEAPPROFILE is set this, followed by a unique " |
164 | | "suffix for external mini-cluster daemons."); |
165 | | |
166 | | DEFINE_bool(external_daemon_safe_shutdown, false, |
167 | | "Shutdown external daemons using SIGTERM first. Disabled by default to avoid " |
168 | | "interfering with kill-testing."); |
169 | | |
170 | | DECLARE_int64(outbound_rpc_block_size); |
171 | | DECLARE_int64(outbound_rpc_memory_limit); |
172 | | |
173 | | DEFINE_int64(external_mini_cluster_max_log_bytes, 50_MB * 100, |
174 | | "Max total size of log bytes produced by all external mini-cluster daemons. " |
175 | | "The test is shut down if this limit is exceeded."); |
176 | | |
177 | | namespace yb { |
178 | | |
179 | | static const char* const kMasterBinaryName = "yb-master"; |
180 | | static const char* const kTabletServerBinaryName = "yb-tserver"; |
181 | | static double kProcessStartTimeoutSeconds = 60.0; |
182 | | static MonoDelta kTabletServerRegistrationTimeout = 60s; |
183 | | |
184 | | static const int kHeapProfileSignal = SIGUSR1; |
185 | | |
186 | | constexpr size_t kDefaultMemoryLimitHardBytes = NonTsanVsTsan(1_GB, 512_MB); |
187 | | |
188 | | namespace { |
189 | | |
190 | 1.53k | void AddExtraFlagsFromEnvVar(const char* env_var_name, std::vector<std::string>* args_dest) { |
191 | 1.53k | const char* extra_daemon_flags_env_var_value = getenv(env_var_name); |
192 | 1.53k | if (extra_daemon_flags_env_var_value) { |
193 | 0 | LOG(INFO) << "Setting extra daemon flags as specified by env var " << env_var_name << ": " |
194 | 0 | << extra_daemon_flags_env_var_value; |
195 | | // TODO: this has an issue with handling quoted arguments with embedded spaces. |
196 | 0 | std::istringstream iss(extra_daemon_flags_env_var_value); |
197 | 0 | copy(std::istream_iterator<string>(iss), |
198 | 0 | std::istream_iterator<string>(), |
199 | 0 | std::back_inserter(*args_dest)); |
200 | 1.53k | } else { |
201 | 1.53k | LOG(INFO) << "Env var " << env_var_name << " not specified, not setting extra flags from it"; |
202 | 1.53k | } |
203 | 1.53k | } |
204 | | |
205 | | std::vector<std::string> FsRootDirs(const std::string& data_dir, |
206 | 738 | uint16_t num_drives) { |
207 | 738 | if (num_drives == 1) { |
208 | 706 | return vector<string>{data_dir}; |
209 | 706 | } |
210 | 32 | vector<string> data_dirs; |
211 | 98 | for (int drive = 1; drive <= num_drives; ++drive) { |
212 | 66 | data_dirs.push_back(JoinPathSegments(data_dir, Substitute("d-$0", drive))); |
213 | 66 | } |
214 | 32 | return data_dirs; |
215 | 32 | } |
216 | | |
217 | | std::vector<std::string> FsDataDirs(const std::string& data_dir, |
218 | | const std::string& server_type, |
219 | 676 | uint16_t num_drives) { |
220 | 676 | if (num_drives == 1) { |
221 | 646 | return vector<string>{GetServerTypeDataPath(data_dir, server_type)}; |
222 | 646 | } |
223 | 30 | vector<string> data_dirs; |
224 | 91 | for (int drive = 1; drive <= num_drives; ++drive) { |
225 | 61 | data_dirs.push_back(GetServerTypeDataPath( |
226 | 61 | JoinPathSegments(data_dir, Substitute("d-$0", drive)), server_type)); |
227 | 61 | } |
228 | 30 | return data_dirs; |
229 | 30 | } |
230 | | |
231 | | } // anonymous namespace |
232 | | |
233 | | // ------------------------------------------------------------------------------------------------ |
234 | | // ExternalMiniClusterOptions |
235 | | // ------------------------------------------------------------------------------------------------ |
236 | | |
237 | 7 | Status ExternalMiniClusterOptions::RemovePort(const uint16_t port) { |
238 | 7 | auto iter = std::find(master_rpc_ports.begin(), master_rpc_ports.end(), port); |
239 | | |
240 | 7 | if (iter == master_rpc_ports.end()) { |
241 | 0 | return STATUS(InvalidArgument, Substitute( |
242 | 0 | "Port to be removed '$0' not found in existing list of $1 masters.", |
243 | 0 | port, num_masters)); |
244 | 0 | } |
245 | | |
246 | 7 | master_rpc_ports.erase(iter); |
247 | 7 | --num_masters; |
248 | | |
249 | 7 | return Status::OK(); |
250 | 7 | } |
251 | | |
252 | 10 | Status ExternalMiniClusterOptions::AddPort(const uint16_t port) { |
253 | 10 | auto iter = std::find(master_rpc_ports.begin(), master_rpc_ports.end(), port); |
254 | | |
255 | 10 | if (iter != master_rpc_ports.end()) { |
256 | 0 | return STATUS(InvalidArgument, Substitute( |
257 | 0 | "Port to be added '$0' already found in the existing list of $1 masters.", |
258 | 0 | port, num_masters)); |
259 | 0 | } |
260 | | |
261 | 10 | master_rpc_ports.push_back(port); |
262 | 10 | ++num_masters; |
263 | | |
264 | 10 | return Status::OK(); |
265 | 10 | } |
266 | | |
267 | 231 | void ExternalMiniClusterOptions::AdjustMasterRpcPorts() { |
268 | 231 | if (master_rpc_ports.size() == 1 && master_rpc_ports[0] == 0) { |
269 | | // Add missing master ports to avoid errors when we try to start the cluster. |
270 | 201 | while (master_rpc_ports.size() < num_masters) { |
271 | 8 | master_rpc_ports.push_back(0); |
272 | 8 | } |
273 | 193 | } |
274 | 231 | } |
275 | | |
276 | | // ------------------------------------------------------------------------------------------------ |
277 | | // ExternalMiniCluster |
278 | | // ------------------------------------------------------------------------------------------------ |
279 | | |
280 | | ExternalMiniCluster::ExternalMiniCluster(const ExternalMiniClusterOptions& opts) |
281 | 231 | : opts_(opts), add_new_master_at_(-1) { |
282 | 231 | opts_.AdjustMasterRpcPorts(); |
283 | | // These "extra mini cluster options" are added in the end of the command line. |
284 | 231 | const auto common_extra_flags = { |
285 | 231 | "--enable_tracing"s, |
286 | 231 | Substitute("--memory_limit_hard_bytes=$0", kDefaultMemoryLimitHardBytes), |
287 | 231 | Substitute("--never_fsync=$0", FLAGS_never_fsync), |
288 | 230 | (opts.log_to_file ? "--alsologtostderr"s : "--logtostderr"s), |
289 | 0 | (IsTsan() ? "--rpc_slow_query_threshold_ms=20000"s : |
290 | 231 | "--rpc_slow_query_threshold_ms=10000"s) |
291 | 231 | }; |
292 | 462 | for (auto* extra_flags : {&opts_.extra_master_flags, &opts_.extra_tserver_flags}) { |
293 | | // Common default extra flags are inserted in the beginning so that they can be overridden by |
294 | | // caller-specified flags. |
295 | 462 | extra_flags->insert(extra_flags->begin(), |
296 | 462 | common_extra_flags.begin(), |
297 | 462 | common_extra_flags.end()); |
298 | 462 | } |
299 | 231 | AddExtraFlagsFromEnvVar("YB_EXTRA_MASTER_FLAGS", &opts_.extra_master_flags); |
300 | 231 | AddExtraFlagsFromEnvVar("YB_EXTRA_TSERVER_FLAGS", &opts_.extra_tserver_flags); |
301 | 231 | } |
302 | | |
303 | 170 | ExternalMiniCluster::~ExternalMiniCluster() { |
304 | 170 | Shutdown(); |
305 | 170 | if (messenger_holder_) { |
306 | 169 | messenger_holder_->Shutdown(); |
307 | 169 | } |
308 | 170 | } |
309 | | |
310 | 230 | Status ExternalMiniCluster::DeduceBinRoot(std::string* ret) { |
311 | 230 | string exe; |
312 | 230 | RETURN_NOT_OK(Env::Default()->GetExecutablePath(&exe)); |
313 | 230 | *ret = DirName(exe) + "/../bin"; |
314 | 230 | return Status::OK(); |
315 | 230 | } |
316 | | |
317 | 183 | std::string ExternalMiniCluster::GetClusterDataDirName() const { |
318 | 183 | if (opts_.cluster_id == "") { |
319 | 183 | return "minicluster-data"; |
320 | 183 | } |
321 | 0 | return Format("minicluster-data-$0", opts_.cluster_id); |
322 | 0 | } |
323 | | |
324 | 230 | Status ExternalMiniCluster::HandleOptions() { |
325 | 230 | daemon_bin_path_ = opts_.daemon_bin_path; |
326 | 230 | if (daemon_bin_path_.empty()) { |
327 | 230 | RETURN_NOT_OK(DeduceBinRoot(&daemon_bin_path_)); |
328 | 230 | } |
329 | | |
330 | 230 | data_root_ = opts_.data_root; |
331 | 230 | if (data_root_.empty()) { |
332 | | // If they don't specify a data root, use the current gtest directory. |
333 | 183 | data_root_ = JoinPathSegments(GetTestDataDirectory(), GetClusterDataDirName()); |
334 | | |
335 | | // If "data_root_counter" is non-negative, and the auto-generated "data_root_" directory already |
336 | | // exists, create a subdirectory using the counter value as its name. The caller should maintain |
337 | | // this counter and increment it for each test run. |
338 | 183 | if (opts_.data_root_counter >= 0) { |
339 | 0 | struct stat sb; |
340 | 0 | if (stat(data_root_.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { |
341 | 0 | data_root_ = Substitute("$0/$1", data_root_, opts_.data_root_counter); |
342 | 0 | CHECK_EQ(mkdir(data_root_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH), 0); |
343 | 0 | } |
344 | 0 | } |
345 | 183 | } |
346 | | |
347 | 230 | return Status::OK(); |
348 | 230 | } |
349 | | |
350 | 230 | Status ExternalMiniCluster::Start(rpc::Messenger* messenger) { |
351 | 0 | CHECK(masters_.empty()) << "Masters are not empty (size: " << masters_.size() |
352 | 0 | << "). Maybe you meant Restart()?"; |
353 | 0 | CHECK(tablet_servers_.empty()) << "Tablet servers are not empty (size: " |
354 | 0 | << tablet_servers_.size() << "). Maybe you meant Restart()?"; |
355 | 230 | RETURN_NOT_OK(HandleOptions()); |
356 | 230 | FLAGS_replication_factor = narrow_cast<int>(opts_.num_masters); |
357 | | |
358 | 230 | if (messenger == nullptr) { |
359 | 230 | rpc::MessengerBuilder builder("minicluster-messenger"); |
360 | 230 | builder.set_num_reactors(1); |
361 | 230 | messenger_holder_ = VERIFY_RESULT_PREPEND( |
362 | 230 | builder.Build(), "Failed to start Messenger for minicluster"); |
363 | 230 | messenger_ = messenger_holder_.get(); |
364 | 0 | } else { |
365 | 0 | messenger_holder_ = nullptr; |
366 | 0 | messenger_ = messenger; |
367 | 0 | } |
368 | 230 | proxy_cache_ = std::make_unique<rpc::ProxyCache>(messenger_); |
369 | | |
370 | 230 | RETURN_NOT_OK(Env::Default()->CreateDirs(data_root_)); |
371 | | |
372 | 230 | LOG(INFO) << "Starting cluster with option bind_to_unique_loopback_addresses=" |
373 | 177 | << (opts_.bind_to_unique_loopback_addresses ? "true" : "false"); |
374 | | |
375 | 230 | LOG(INFO) << "Starting " << opts_.num_masters << " masters"; |
376 | 230 | RETURN_NOT_OK_PREPEND(StartMasters(), "Failed to start masters."); |
377 | 230 | add_new_master_at_ = opts_.num_masters; |
378 | | |
379 | 230 | if (opts_.num_tablet_servers > 0) { |
380 | 216 | LOG(INFO) << "Starting " << opts_.num_tablet_servers << " tablet servers"; |
381 | | |
382 | 864 | for (size_t i = 1; i <= opts_.num_tablet_servers; i++) { |
383 | 648 | RETURN_NOT_OK_PREPEND( |
384 | 648 | AddTabletServer(ExternalMiniClusterOptions::kDefaultStartCqlProxy), |
385 | 648 | Substitute("Failed starting tablet server $0", i)); |
386 | 648 | } |
387 | 216 | RETURN_NOT_OK(WaitForTabletServerCount( |
388 | 216 | opts_.num_tablet_servers, kTabletServerRegistrationTimeout)); |
389 | 14 | } else { |
390 | 14 | LOG(INFO) << "No need to start tablet servers"; |
391 | 14 | } |
392 | | |
393 | 229 | running_ = true; |
394 | 229 | return Status::OK(); |
395 | 230 | } |
396 | | |
397 | 343 | void ExternalMiniCluster::Shutdown(NodeSelectionMode mode) { |
398 | | // TODO: in the regular MiniCluster Shutdown is a no-op if running_ is false. |
399 | | // Therefore, in case of an error during cluster startup behavior might be different. |
400 | 343 | if (mode == ALL) { |
401 | 498 | for (const scoped_refptr<ExternalMaster>& master : masters_) { |
402 | 498 | if (master) { |
403 | 498 | master->Shutdown(); |
404 | 498 | } |
405 | 498 | } |
406 | 342 | } |
407 | | |
408 | 945 | for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) { |
409 | 945 | ts->Shutdown(); |
410 | 945 | } |
411 | 343 | running_ = false; |
412 | 343 | } |
413 | | |
414 | 2 | Status ExternalMiniCluster::Restart() { |
415 | 2 | LOG(INFO) << "Restarting cluster with " << masters_.size() << " masters."; |
416 | 5 | for (const scoped_refptr<ExternalMaster>& master : masters_) { |
417 | 5 | if (master && master->IsShutdown()) { |
418 | 4 | RETURN_NOT_OK_PREPEND(master->Restart(), "Cannot restart master bound at: " + |
419 | 4 | master->bound_rpc_hostport().ToString()); |
420 | 4 | } |
421 | 5 | } |
422 | | |
423 | 3 | for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) { |
424 | 3 | if (ts->IsShutdown()) { |
425 | 3 | RETURN_NOT_OK_PREPEND(ts->Restart(), "Cannot restart tablet server bound at: " + |
426 | 3 | ts->bound_rpc_hostport().ToString()); |
427 | 3 | } |
428 | 3 | } |
429 | | |
430 | 2 | RETURN_NOT_OK(WaitForTabletServerCount(tablet_servers_.size(), kTabletServerRegistrationTimeout)); |
431 | | |
432 | 2 | running_ = true; |
433 | 2 | return Status::OK(); |
434 | 2 | } |
435 | | |
436 | 919 | string ExternalMiniCluster::GetBinaryPath(const string& binary) const { |
437 | 919 | CHECK(!daemon_bin_path_.empty()); |
438 | 919 | string default_path = JoinPathSegments(daemon_bin_path_, binary); |
439 | 919 | if (Env::Default()->FileExists(default_path)) { |
440 | 919 | return default_path; |
441 | 919 | } |
442 | | |
443 | | // In CLion-based builds we sometimes have to look for the binary in other directories. |
444 | 0 | string alternative_dir; |
445 | 0 | if (binary == "yb-master") { |
446 | 0 | alternative_dir = "master"; |
447 | 0 | } else if (binary == "yb-tserver") { |
448 | 0 | alternative_dir = "tserver"; |
449 | 0 | } else { |
450 | 0 | LOG(WARNING) << "Default path " << default_path << " for binary " << binary << |
451 | 0 | " does not exist, and no alternative directory is available for this binary"; |
452 | 0 | return default_path; |
453 | 0 | } |
454 | | |
455 | 0 | string alternative_path = JoinPathSegments(daemon_bin_path_, |
456 | 0 | "../" + alternative_dir + "/" + binary); |
457 | 0 | if (Env::Default()->FileExists(alternative_path)) { |
458 | 0 | LOG(INFO) << "Default path " << default_path << " for binary " << binary << |
459 | 0 | " does not exist, using alternative location: " << alternative_path; |
460 | 0 | return alternative_path; |
461 | 0 | } else { |
462 | 0 | LOG(WARNING) << "Neither " << default_path << " nor " << alternative_path << " exist"; |
463 | 0 | return default_path; |
464 | 0 | } |
465 | 0 | } |
466 | | |
467 | 997 | string ExternalMiniCluster::GetDataPath(const string& daemon_id) const { |
468 | 997 | CHECK(!data_root_.empty()); |
469 | 997 | return JoinPathSegments(data_root_, daemon_id); |
470 | 997 | } |
471 | | |
472 | | namespace { |
473 | 987 | vector<string> SubstituteInFlags(const vector<string>& orig_flags, size_t index) { |
474 | 987 | string str_index = std::to_string(index); |
475 | 987 | vector<string> ret; |
476 | 9.62k | for (const string& orig : orig_flags) { |
477 | 9.62k | ret.push_back(StringReplace(orig, "${index}", str_index, true)); |
478 | 9.62k | } |
479 | 987 | return ret; |
480 | 987 | } |
481 | | |
482 | | } // anonymous namespace |
483 | | |
484 | 1 | Result<ExternalMaster *> ExternalMiniCluster::StartMasterWithPeers(const string& peer_addrs) { |
485 | 1 | uint16_t rpc_port = AllocateFreePort(); |
486 | 1 | uint16_t http_port = AllocateFreePort(); |
487 | 1 | LOG(INFO) << "Using auto-assigned rpc_port " << rpc_port << "; http_port " << http_port |
488 | 1 | << " to start a new external mini-cluster master with peers '" << peer_addrs << "'."; |
489 | | |
490 | 1 | string addr = MasterAddressForPort(rpc_port); |
491 | 1 | string exe = GetBinaryPath(kMasterBinaryName); |
492 | | |
493 | 1 | ExternalMaster* master = |
494 | 1 | new ExternalMaster(add_new_master_at_, messenger_, proxy_cache_.get(), exe, |
495 | 1 | GetDataPath(Substitute("master-$0", add_new_master_at_)), |
496 | 1 | opts_.extra_master_flags, addr, http_port, peer_addrs); |
497 | | |
498 | 1 | RETURN_NOT_OK(master->Start()); |
499 | | |
500 | 1 | add_new_master_at_++; |
501 | 1 | return master; |
502 | 1 | } |
503 | | |
504 | 364 | std::string ExternalMiniCluster::MasterAddressForPort(uint16_t port) const { |
505 | 364 | return Format(opts_.use_even_ips ? "127.0.0.2:$0" : "127.0.0.1:$0", port); |
506 | 364 | } |
507 | | |
508 | 9 | void ExternalMiniCluster::StartShellMaster(ExternalMaster** new_master) { |
509 | 9 | uint16_t rpc_port = AllocateFreePort(); |
510 | 9 | uint16_t http_port = AllocateFreePort(); |
511 | 9 | LOG(INFO) << "Using auto-assigned rpc_port " << rpc_port << "; http_port " << http_port |
512 | 9 | << " to start a new external mini-cluster shell master."; |
513 | | |
514 | 9 | string addr = MasterAddressForPort(rpc_port); |
515 | | |
516 | 9 | string exe = GetBinaryPath(kMasterBinaryName); |
517 | | |
518 | 9 | ExternalMaster* master = new ExternalMaster( |
519 | 9 | add_new_master_at_, |
520 | 9 | messenger_, |
521 | 9 | proxy_cache_.get(), |
522 | 9 | exe, |
523 | 9 | GetDataPath(Substitute("master-$0", add_new_master_at_)), |
524 | 9 | opts_.extra_master_flags, |
525 | 9 | addr, |
526 | 9 | http_port, |
527 | 9 | ""); |
528 | | |
529 | 9 | Status s = master->Start(true); |
530 | | |
531 | 9 | if (!s.ok()) { |
532 | 0 | LOG(FATAL) << Substitute("Unable to start 'shell' mode master at index $0, due to error $1.", |
533 | 0 | add_new_master_at_, s.ToString()); |
534 | 0 | } |
535 | | |
536 | 9 | add_new_master_at_++; |
537 | 9 | *new_master = master; |
538 | 9 | } |
539 | | |
540 | 17 | Status ExternalMiniCluster::CheckPortAndMasterSizes() const { |
541 | 17 | if (opts_.num_masters != masters_.size() || |
542 | 17 | opts_.num_masters != opts_.master_rpc_ports.size()) { |
543 | 0 | string fatal_err_msg = Substitute( |
544 | 0 | "Mismatch number of masters in options $0, compared to masters vector $1 or rpc ports $2", |
545 | 0 | opts_.num_masters, masters_.size(), opts_.master_rpc_ports.size()); |
546 | 0 | LOG(FATAL) << fatal_err_msg; |
547 | 0 | } |
548 | | |
549 | 17 | return Status::OK(); |
550 | 17 | } |
551 | | |
552 | 10 | Status ExternalMiniCluster::AddMaster(ExternalMaster* master) { |
553 | 10 | auto iter = std::find_if(masters_.begin(), masters_.end(), MasterComparator(master)); |
554 | | |
555 | 10 | if (iter != masters_.end()) { |
556 | 0 | return STATUS(InvalidArgument, Substitute( |
557 | 0 | "Master to be added '$0' already found in existing list of $1 masters.", |
558 | 0 | master->bound_rpc_hostport().ToString(), opts_.num_masters)); |
559 | 0 | } |
560 | | |
561 | 10 | RETURN_NOT_OK(opts_.AddPort(master->bound_rpc_hostport().port())); |
562 | 10 | masters_.push_back(master); |
563 | | |
564 | 10 | RETURN_NOT_OK(CheckPortAndMasterSizes()); |
565 | | |
566 | 10 | return Status::OK(); |
567 | 10 | } |
568 | | |
569 | 7 | Status ExternalMiniCluster::RemoveMaster(ExternalMaster* master) { |
570 | 7 | auto iter = std::find_if(masters_.begin(), masters_.end(), MasterComparator(master)); |
571 | | |
572 | 7 | if (iter == masters_.end()) { |
573 | 0 | return STATUS(InvalidArgument, Substitute( |
574 | 0 | "Master to be removed '$0' not found in existing list of $1 masters.", |
575 | 0 | master->bound_rpc_hostport().ToString(), opts_.num_masters)); |
576 | 0 | } |
577 | | |
578 | 7 | RETURN_NOT_OK(opts_.RemovePort(master->bound_rpc_hostport().port())); |
579 | 7 | masters_.erase(iter); |
580 | | |
581 | 7 | RETURN_NOT_OK(CheckPortAndMasterSizes()); |
582 | | |
583 | 7 | return Status::OK(); |
584 | 7 | } |
585 | | |
586 | 0 | ConsensusServiceProxy ExternalMiniCluster::GetLeaderConsensusProxy() { |
587 | 0 | return GetConsensusProxy(GetLeaderMaster()); |
588 | 0 | } |
589 | | |
// Returns a consensus service proxy for the given daemon, obtained through GetProxy<>.
ConsensusServiceProxy ExternalMiniCluster::GetConsensusProxy(ExternalDaemon* external_deamon) {
  return GetProxy<ConsensusServiceProxy>(external_deamon);
}
593 | | |
594 | 7 | Status ExternalMiniCluster::StepDownMasterLeader(TabletServerErrorPB::Code* error_code) { |
595 | 7 | ExternalMaster* leader = GetLeaderMaster(); |
596 | 7 | string leader_uuid = leader->uuid(); |
597 | 7 | auto host_port = leader->bound_rpc_addr(); |
598 | 7 | LeaderStepDownRequestPB lsd_req; |
599 | 7 | lsd_req.set_tablet_id(yb::master::kSysCatalogTabletId); |
600 | 7 | lsd_req.set_dest_uuid(leader_uuid); |
601 | 7 | LeaderStepDownResponsePB lsd_resp; |
602 | 7 | RpcController lsd_rpc; |
603 | 7 | lsd_rpc.set_timeout(opts_.timeout); |
604 | 7 | ConsensusServiceProxy proxy(proxy_cache_.get(), host_port); |
605 | 7 | RETURN_NOT_OK(proxy.LeaderStepDown(lsd_req, &lsd_resp, &lsd_rpc)); |
606 | 7 | if (lsd_resp.has_error()) { |
607 | 1 | LOG(ERROR) << "LeaderStepDown for " << leader_uuid << " received error " |
608 | 1 | << lsd_resp.error().ShortDebugString(); |
609 | 1 | *error_code = lsd_resp.error().code(); |
610 | 1 | return StatusFromPB(lsd_resp.error().status()); |
611 | 1 | } |
612 | | |
613 | 6 | LOG(INFO) << "Leader at host/port '" << host_port << "' step down complete."; |
614 | | |
615 | 6 | return Status::OK(); |
616 | 6 | } |
617 | | |
618 | 4 | Status ExternalMiniCluster::StepDownMasterLeaderAndWaitForNewLeader() { |
619 | 4 | ExternalMaster* leader = GetLeaderMaster(); |
620 | 4 | string old_leader_uuid = leader->uuid(); |
621 | 4 | string leader_uuid = old_leader_uuid; |
622 | 4 | TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR; |
623 | 4 | LOG(INFO) << "Starting step down of leader " << leader->bound_rpc_addr(); |
624 | | |
625 | | // while loop will not be needed once JIRA ENG-49 is fixed. |
626 | 4 | int iter = 1; |
627 | 9 | while (leader_uuid == old_leader_uuid) { |
628 | 5 | Status s = StepDownMasterLeader(&error_code); |
629 | | // If step down hits any error except not-ready, exit. |
630 | 5 | if (!s.ok() && error_code != TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN) { |
631 | 0 | return s; |
632 | 0 | } |
633 | 5 | sleep(3); // TODO: add wait for election api. |
634 | 5 | leader = GetLeaderMaster(); |
635 | 5 | leader_uuid = leader->uuid(); |
636 | 5 | LOG(INFO) << "Got new leader " << leader->bound_rpc_addr() << ", iter=" << iter; |
637 | 5 | iter++; |
638 | 5 | } |
639 | | |
640 | 4 | return Status::OK(); |
641 | 4 | } |
642 | | |
643 | | Status ExternalMiniCluster::ChangeConfig(ExternalMaster* master, |
644 | | ChangeConfigType type, |
645 | | consensus::PeerMemberType member_type, |
646 | 17 | bool use_hostport) { |
647 | 17 | if (type != consensus::ADD_SERVER && type != consensus::REMOVE_SERVER) { |
648 | 0 | return STATUS(InvalidArgument, Substitute("Invalid Change Config type $0", type)); |
649 | 0 | } |
650 | | |
651 | 17 | ChangeConfigRequestPB req; |
652 | 17 | ChangeConfigResponsePB resp; |
653 | 17 | rpc::RpcController rpc; |
654 | 17 | rpc.set_timeout(opts_.timeout); |
655 | | |
656 | 17 | RaftPeerPB peer_pb; |
657 | 16 | peer_pb.set_permanent_uuid(use_hostport ? "" : master->uuid()); |
658 | 17 | if (type == consensus::ADD_SERVER) { |
659 | 10 | peer_pb.set_member_type(member_type); |
660 | 10 | } |
661 | 17 | HostPortToPB(master->bound_rpc_hostport(), peer_pb.mutable_last_known_private_addr()->Add()); |
662 | 17 | req.set_tablet_id(yb::master::kSysCatalogTabletId); |
663 | 17 | req.set_type(type); |
664 | 17 | req.set_use_host(use_hostport); |
665 | 17 | *req.mutable_server() = peer_pb; |
666 | | |
667 | | // There could be timing window where we found the leader host/port, but an election in the |
668 | | // meantime could have made it step down. So we retry till we hit the leader correctly. |
669 | 17 | int num_attempts = 1; |
670 | 19 | while (true) { |
671 | 19 | ExternalMaster* leader = GetLeaderMaster(); |
672 | 19 | auto leader_proxy = std::make_unique<ConsensusServiceProxy>( |
673 | 19 | proxy_cache_.get(), leader->bound_rpc_addr()); |
674 | 19 | string leader_uuid = leader->uuid(); |
675 | | |
676 | 19 | if (type == consensus::REMOVE_SERVER && leader_uuid == req.server().permanent_uuid()) { |
677 | 3 | RETURN_NOT_OK(StepDownMasterLeaderAndWaitForNewLeader()); |
678 | 3 | leader = GetLeaderMaster(); |
679 | 3 | leader_uuid = leader->uuid(); |
680 | 3 | leader_proxy.reset(new ConsensusServiceProxy(proxy_cache_.get(), leader->bound_rpc_addr())); |
681 | 3 | } |
682 | | |
683 | 19 | req.set_dest_uuid(leader_uuid); |
684 | 19 | RETURN_NOT_OK(leader_proxy->ChangeConfig(req, &resp, &rpc)); |
685 | 19 | if (resp.has_error()) { |
686 | 2 | if (resp.error().code() != TabletServerErrorPB::NOT_THE_LEADER && |
687 | 2 | resp.error().code() != TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG) { |
688 | 0 | return STATUS(RuntimeError, Substitute("Change Config RPC to leader hit error: $0", |
689 | 0 | resp.error().ShortDebugString())); |
690 | 0 | } |
691 | 17 | } else { |
692 | 17 | break; |
693 | 17 | } |
694 | | |
695 | | // Need to retry as we come here with NOT_THE_LEADER. |
696 | 2 | if (num_attempts >= kMaxRetryIterations) { |
697 | 0 | return STATUS(IllegalState, |
698 | 0 | Substitute("Failed to complete ChangeConfig request '$0' even after maximum " |
699 | 0 | "number of attempts. Last error '$1'", |
700 | 0 | req.ShortDebugString(), resp.error().ShortDebugString())); |
701 | 0 | } |
702 | | |
703 | 2 | SleepFor(MonoDelta::FromSeconds(1)); |
704 | | |
705 | 2 | LOG(INFO) << "Resp error '" << resp.error().ShortDebugString() << "', num=" << num_attempts |
706 | 2 | << ", retrying..."; |
707 | | |
708 | 2 | rpc.Reset(); |
709 | 2 | num_attempts++; |
710 | 2 | } |
711 | | |
712 | 17 | LOG(INFO) << "Master " << master->bound_rpc_hostport().ToString() << ", change type " |
713 | 17 | << type << " to " << masters_.size() << " masters."; |
714 | | |
715 | 17 | if (type == consensus::ADD_SERVER) { |
716 | 10 | return AddMaster(master); |
717 | 7 | } else if (type == consensus::REMOVE_SERVER) { |
718 | 7 | return RemoveMaster(master); |
719 | 7 | } |
720 | | |
721 | 0 | string err_msg = Substitute("Should not reach here - change type $0", type); |
722 | |
|
723 | 0 | LOG(FATAL) << err_msg; |
724 | | |
725 | | // Satisfy the compiler with a return from here |
726 | 0 | return STATUS(RuntimeError, err_msg); |
727 | 0 | } |
728 | | |
729 | | // We look for the exact master match. Since it is possible to stop/restart master on |
730 | | // a given host/port, we do not want a stale master pointer input to match a newer master. |
731 | 37 | int ExternalMiniCluster::GetIndexOfMaster(ExternalMaster* master) const { |
732 | 73 | for (size_t i = 0; i < masters_.size(); i++) { |
733 | 73 | if (masters_[i].get() == master) { |
734 | 37 | return narrow_cast<int>(i); |
735 | 37 | } |
736 | 73 | } |
737 | 0 | return -1; |
738 | 37 | } |
739 | | |
740 | 1 | Status ExternalMiniCluster::PingMaster(ExternalMaster* master) const { |
741 | 1 | int index = GetIndexOfMaster(master); |
742 | 1 | server::PingRequestPB req; |
743 | 1 | server::PingResponsePB resp; |
744 | 1 | std::shared_ptr<server::GenericServiceProxy> proxy = |
745 | 1 | index == -1 ? master_generic_proxy(master->bound_rpc_addr()) : master_generic_proxy(index); |
746 | 1 | rpc::RpcController rpc; |
747 | 1 | rpc.set_timeout(opts_.timeout); |
748 | 1 | return proxy->Ping(req, &resp, &rpc); |
749 | 1 | } |
750 | | |
751 | | Status ExternalMiniCluster::AddTServerToBlacklist( |
752 | | ExternalMaster* master, |
753 | 5 | ExternalTabletServer* ts) { |
754 | 5 | GetMasterClusterConfigRequestPB config_req; |
755 | 5 | GetMasterClusterConfigResponsePB config_resp; |
756 | 5 | int index = GetIndexOfMaster(master); |
757 | | |
758 | 5 | if (index == -1) { |
759 | 0 | return STATUS(InvalidArgument, Substitute( |
760 | 0 | "Given master '$0' not in the current list of $1 masters.", |
761 | 0 | master->bound_rpc_hostport().ToString(), masters_.size())); |
762 | 0 | } |
763 | | |
764 | 5 | auto proxy = GetMasterProxy<master::MasterClusterProxy>(index); |
765 | 5 | rpc::RpcController rpc; |
766 | 5 | rpc.set_timeout(opts_.timeout); |
767 | 5 | RETURN_NOT_OK(proxy.GetMasterClusterConfig(config_req, &config_resp, &rpc)); |
768 | 5 | if (config_resp.has_error()) { |
769 | 0 | return STATUS(RuntimeError, Substitute( |
770 | 0 | "GetMasterClusterConfig RPC response hit error: $0", |
771 | 0 | config_resp.error().ShortDebugString())); |
772 | 0 | } |
773 | | // Get current config |
774 | 5 | ChangeMasterClusterConfigRequestPB change_req; |
775 | 5 | SysClusterConfigEntryPB config = *config_resp.mutable_cluster_config(); |
776 | | // add tserver to blacklist |
777 | 5 | HostPortToPB(ts->bound_rpc_hostport(), config.mutable_server_blacklist()->mutable_hosts()->Add()); |
778 | 5 | *change_req.mutable_cluster_config() = config; |
779 | 5 | ChangeMasterClusterConfigResponsePB change_resp; |
780 | 5 | rpc.Reset(); |
781 | 5 | RETURN_NOT_OK(proxy.ChangeMasterClusterConfig(change_req, &change_resp, &rpc)); |
782 | 5 | if (change_resp.has_error()) { |
783 | 0 | return STATUS(RuntimeError, Substitute( |
784 | 0 | "ChangeMasterClusterConfig RPC response hit error: $0", |
785 | 0 | change_resp.error().ShortDebugString())); |
786 | 0 | } |
787 | | |
788 | 5 | LOG(INFO) << "TServer at " << ts->bound_rpc_hostport().ToString() |
789 | 5 | << " was added to the blacklist"; |
790 | | |
791 | 5 | return Status::OK(); |
792 | 5 | } |
793 | | |
794 | | Status ExternalMiniCluster::GetMinReplicaCountForPlacementBlock( |
795 | | ExternalMaster* master, |
796 | | const string& cloud, const string& region, const string& zone, |
797 | 1 | int* min_num_replicas) { |
798 | 1 | GetMasterClusterConfigRequestPB config_req; |
799 | 1 | GetMasterClusterConfigResponsePB config_resp; |
800 | 1 | int index = GetIndexOfMaster(master); |
801 | | |
802 | 1 | if (index == -1) { |
803 | 0 | return STATUS(InvalidArgument, Substitute( |
804 | 0 | "Given master '$0' not in the current list of $1 masters.", |
805 | 0 | master->bound_rpc_hostport().ToString(), masters_.size())); |
806 | 0 | } |
807 | | |
808 | 1 | auto proxy = GetMasterProxy<master::MasterClusterProxy>(index); |
809 | 1 | rpc::RpcController rpc; |
810 | 1 | rpc.set_timeout(opts_.timeout); |
811 | 1 | RETURN_NOT_OK(proxy.GetMasterClusterConfig(config_req, &config_resp, &rpc)); |
812 | 1 | if (config_resp.has_error()) { |
813 | 0 | return STATUS(RuntimeError, Substitute( |
814 | 0 | "GetMasterClusterConfig RPC response hit error: $0", |
815 | 0 | config_resp.error().ShortDebugString())); |
816 | 0 | } |
817 | 1 | const SysClusterConfigEntryPB& config = config_resp.cluster_config(); |
818 | | |
819 | 1 | if (!config.has_replication_info() || !config.replication_info().has_live_replicas()) { |
820 | 0 | return STATUS(InvalidArgument, Substitute( |
821 | 0 | "Given placement block '$0.$1.$2' not in the current list of placement blocks.", |
822 | 0 | cloud, region, zone)); |
823 | 0 | } |
824 | | |
825 | 1 | const master::PlacementInfoPB& pi = config.replication_info().live_replicas(); |
826 | | |
827 | 1 | int found_index = -1; |
828 | 1 | bool found = false; |
829 | 1 | for (int i = 0; i < pi.placement_blocks_size(); i++) { |
830 | 1 | if (!pi.placement_blocks(i).has_cloud_info()) { |
831 | 0 | continue; |
832 | 0 | } |
833 | | |
834 | 1 | bool is_cloud_same = false, is_region_same = false, is_zone_same = false; |
835 | | |
836 | 1 | if (pi.placement_blocks(i).cloud_info().has_placement_cloud() && cloud != "") { |
837 | 1 | is_cloud_same = pi.placement_blocks(i).cloud_info().placement_cloud() == cloud; |
838 | 0 | } else if (!pi.placement_blocks(i).cloud_info().has_placement_cloud() && cloud == "") { |
839 | 0 | is_cloud_same = true; |
840 | 0 | } |
841 | | |
842 | 1 | if (pi.placement_blocks(i).cloud_info().has_placement_region() && region != "") { |
843 | 1 | is_region_same = pi.placement_blocks(i).cloud_info().placement_region() == region; |
844 | 0 | } else if (!pi.placement_blocks(i).cloud_info().has_placement_region() && region == "") { |
845 | 0 | is_region_same = true; |
846 | 0 | } |
847 | | |
848 | 1 | if (pi.placement_blocks(i).cloud_info().has_placement_zone() && zone != "") { |
849 | 0 | is_zone_same = pi.placement_blocks(i).cloud_info().placement_zone() == zone; |
850 | 1 | } else if (!pi.placement_blocks(i).cloud_info().has_placement_zone() && zone == "") { |
851 | 1 | is_zone_same = true; |
852 | 1 | } |
853 | | |
854 | 1 | if (is_cloud_same && is_region_same && is_zone_same) { |
855 | 1 | found = true; |
856 | 1 | found_index = i; |
857 | 1 | break; |
858 | 1 | } |
859 | 1 | } |
860 | | |
861 | 1 | if (!found || !pi.placement_blocks(found_index).has_min_num_replicas()) { |
862 | 0 | return STATUS(InvalidArgument, Substitute( |
863 | 0 | "Given placement block '$0.$1.$2' not in the current list of placement blocks.", |
864 | 0 | cloud, region, zone)); |
865 | 0 | } |
866 | | |
867 | 1 | *min_num_replicas = pi.placement_blocks(found_index).min_num_replicas(); |
868 | 1 | return Status::OK(); |
869 | 1 | } |
870 | | |
871 | | Status ExternalMiniCluster::AddTServerToLeaderBlacklist( |
872 | | ExternalMaster* master, |
873 | 3 | ExternalTabletServer* ts) { |
874 | 3 | GetMasterClusterConfigRequestPB config_req; |
875 | 3 | GetMasterClusterConfigResponsePB config_resp; |
876 | 3 | int index = GetIndexOfMaster(master); |
877 | | |
878 | 3 | if (index == -1) { |
879 | 0 | return STATUS(InvalidArgument, Substitute( |
880 | 0 | "Given master '$0' not in the current list of $1 masters.", |
881 | 0 | master->bound_rpc_hostport().ToString(), masters_.size())); |
882 | 0 | } |
883 | | |
884 | 3 | auto proxy = GetMasterProxy<master::MasterClusterProxy>(index); |
885 | 3 | rpc::RpcController rpc; |
886 | 3 | rpc.set_timeout(opts_.timeout); |
887 | 3 | RETURN_NOT_OK(proxy.GetMasterClusterConfig(config_req, &config_resp, &rpc)); |
888 | 3 | if (config_resp.has_error()) { |
889 | 0 | return STATUS(RuntimeError, Substitute( |
890 | 0 | "GetMasterClusterConfig RPC response hit error: $0", |
891 | 0 | config_resp.error().ShortDebugString())); |
892 | 0 | } |
893 | | // Get current config |
894 | 3 | ChangeMasterClusterConfigRequestPB change_req; |
895 | 3 | SysClusterConfigEntryPB config = *config_resp.mutable_cluster_config(); |
896 | | // add tserver to blacklist |
897 | 3 | HostPortToPB(ts->bound_rpc_hostport(), config.mutable_leader_blacklist()->mutable_hosts()->Add()); |
898 | 3 | *change_req.mutable_cluster_config() = config; |
899 | 3 | ChangeMasterClusterConfigResponsePB change_resp; |
900 | 3 | rpc.Reset(); |
901 | 3 | RETURN_NOT_OK(proxy.ChangeMasterClusterConfig(change_req, &change_resp, &rpc)); |
902 | 3 | if (change_resp.has_error()) { |
903 | 0 | return STATUS(RuntimeError, Substitute( |
904 | 0 | "ChangeMasterClusterConfig RPC response hit error: $0", |
905 | 0 | change_resp.error().ShortDebugString())); |
906 | 0 | } |
907 | | |
908 | 3 | LOG(INFO) << "TServer at " << ts->bound_rpc_hostport().ToString() |
909 | 3 | << " was added to the leader blacklist"; |
910 | | |
911 | 3 | return Status::OK(); |
912 | 3 | } |
913 | | |
914 | | Status ExternalMiniCluster::ClearBlacklist( |
915 | 3 | ExternalMaster* master) { |
916 | 3 | GetMasterClusterConfigRequestPB config_req; |
917 | 3 | GetMasterClusterConfigResponsePB config_resp; |
918 | 3 | int index = GetIndexOfMaster(master); |
919 | | |
920 | 3 | if (index == -1) { |
921 | 0 | return STATUS(InvalidArgument, Substitute( |
922 | 0 | "Given master '$0' not in the current list of $1 masters.", |
923 | 0 | master->bound_rpc_hostport().ToString(), masters_.size())); |
924 | 0 | } |
925 | | |
926 | 3 | auto proxy = GetMasterProxy<master::MasterClusterProxy>(index); |
927 | 3 | rpc::RpcController rpc; |
928 | 3 | rpc.set_timeout(opts_.timeout); |
929 | 3 | RETURN_NOT_OK(proxy.GetMasterClusterConfig(config_req, &config_resp, &rpc)); |
930 | 3 | if (config_resp.has_error()) { |
931 | 0 | return STATUS(RuntimeError, Substitute( |
932 | 0 | "GetMasterClusterConfig RPC response hit error: $0", |
933 | 0 | config_resp.error().ShortDebugString())); |
934 | 0 | } |
935 | | // Get current config. |
936 | 3 | ChangeMasterClusterConfigRequestPB change_req; |
937 | 3 | SysClusterConfigEntryPB config = *config_resp.mutable_cluster_config(); |
938 | | // Clear blacklist. |
939 | 3 | config.mutable_server_blacklist()->mutable_hosts()->Clear(); |
940 | 3 | config.mutable_leader_blacklist()->mutable_hosts()->Clear(); |
941 | 3 | *change_req.mutable_cluster_config() = config; |
942 | 3 | ChangeMasterClusterConfigResponsePB change_resp; |
943 | 3 | rpc.Reset(); |
944 | 3 | RETURN_NOT_OK(proxy.ChangeMasterClusterConfig(change_req, &change_resp, &rpc)); |
945 | 3 | if (change_resp.has_error()) { |
946 | 0 | return STATUS(RuntimeError, Substitute( |
947 | 0 | "ChangeMasterClusterConfig RPC response hit error: $0", |
948 | 0 | change_resp.error().ShortDebugString())); |
949 | 0 | } |
950 | | |
951 | 3 | LOG(INFO) << "Blacklist cleared successfully"; |
952 | | |
953 | 3 | return Status::OK(); |
954 | 3 | } |
955 | | |
956 | 23 | Status ExternalMiniCluster::GetNumMastersAsSeenBy(ExternalMaster* master, int* num_peers) { |
957 | 23 | ListMastersRequestPB list_req; |
958 | 23 | ListMastersResponsePB list_resp; |
959 | 23 | int index = GetIndexOfMaster(master); |
960 | | |
961 | 23 | if (index == -1) { |
962 | 0 | return STATUS(InvalidArgument, Substitute( |
963 | 0 | "Given master '$0' not in the current list of $1 masters.", |
964 | 0 | master->bound_rpc_hostport().ToString(), masters_.size())); |
965 | 0 | } |
966 | | |
967 | 23 | auto proxy = GetMasterProxy<master::MasterClusterProxy>(index); |
968 | 23 | rpc::RpcController rpc; |
969 | 23 | rpc.set_timeout(opts_.timeout); |
970 | 23 | RETURN_NOT_OK(proxy.ListMasters(list_req, &list_resp, &rpc)); |
971 | 23 | if (list_resp.has_error()) { |
972 | 0 | return STATUS(RuntimeError, Substitute( |
973 | 0 | "List Masters RPC response hit error: $0", list_resp.error().ShortDebugString())); |
974 | 0 | } |
975 | | |
976 | 23 | LOG(INFO) << "List Masters for master at index " << index |
977 | 23 | << " got " << list_resp.masters_size() << " peers"; |
978 | | |
979 | 23 | *num_peers = list_resp.masters_size(); |
980 | | |
981 | 23 | return Status::OK(); |
982 | 23 | } |
983 | | |
984 | 13 | Status ExternalMiniCluster::WaitForLeaderCommitTermAdvance() { |
985 | 13 | OpIdPB start_opid; |
986 | 13 | RETURN_NOT_OK(GetLastOpIdForLeader(&start_opid)); |
987 | 13 | LOG(INFO) << "Start OPID : " << start_opid.ShortDebugString(); |
988 | | |
989 | | // Need not do any wait if it is a restart case - so the commit term will be > 0. |
990 | 13 | if (start_opid.term() != 0) |
991 | 13 | return Status::OK(); |
992 | | |
993 | 0 | MonoTime now = MonoTime::Now(); |
994 | 0 | MonoTime deadline = now; |
995 | 0 | deadline.AddDelta(opts_.timeout); |
996 | 0 | auto opid = start_opid; |
997 | |
|
998 | 0 | for (int i = 1; now.ComesBefore(deadline); ++i) { |
999 | 0 | if (opid.term() > start_opid.term()) { |
1000 | 0 | LOG(INFO) << "Final OPID: " << opid.ShortDebugString() << " after " |
1001 | 0 | << i << " iterations."; |
1002 | |
|
1003 | 0 | return Status::OK(); |
1004 | 0 | } |
1005 | 0 | SleepFor(MonoDelta::FromMilliseconds(min(i, 10))); |
1006 | 0 | RETURN_NOT_OK(GetLastOpIdForLeader(&opid)); |
1007 | 0 | now = MonoTime::Now(); |
1008 | 0 | } |
1009 | |
|
1010 | 0 | return STATUS(TimedOut, Substitute("Term did not advance from $0.", start_opid.term())); |
1011 | 0 | } |
1012 | | |
1013 | | Status ExternalMiniCluster::GetLastOpIdForEachMasterPeer( |
1014 | | const MonoDelta& timeout, |
1015 | | consensus::OpIdType opid_type, |
1016 | 44 | vector<OpIdPB>* op_ids) { |
1017 | 44 | GetLastOpIdRequestPB opid_req; |
1018 | 44 | GetLastOpIdResponsePB opid_resp; |
1019 | 44 | opid_req.set_tablet_id(yb::master::kSysCatalogTabletId); |
1020 | 44 | RpcController controller; |
1021 | 44 | controller.set_timeout(timeout); |
1022 | | |
1023 | 44 | op_ids->clear(); |
1024 | 154 | for (scoped_refptr<ExternalMaster> master : masters_) { |
1025 | 154 | opid_req.set_dest_uuid(master->uuid()); |
1026 | 154 | opid_req.set_opid_type(opid_type); |
1027 | 154 | RETURN_NOT_OK_PREPEND( |
1028 | 154 | GetConsensusProxy(master.get()).GetLastOpId(opid_req, &opid_resp, &controller), |
1029 | 154 | Substitute("Failed to fetch last op id from $0", master->bound_rpc_hostport().port())); |
1030 | 154 | op_ids->push_back(opid_resp.opid()); |
1031 | 154 | controller.Reset(); |
1032 | 154 | } |
1033 | | |
1034 | 44 | return Status::OK(); |
1035 | 44 | } |
1036 | | |
1037 | 24 | Status ExternalMiniCluster::WaitForMastersToCommitUpTo(int64_t target_index) { |
1038 | 24 | auto deadline = CoarseMonoClock::Now() + opts_.timeout.ToSteadyDuration(); |
1039 | | |
1040 | 44 | for (int i = 1;; i++) { |
1041 | 44 | vector<OpIdPB> ids; |
1042 | 44 | Status s = GetLastOpIdForEachMasterPeer(opts_.timeout, consensus::COMMITTED_OPID, &ids); |
1043 | | |
1044 | 44 | if (s.ok()) { |
1045 | 44 | bool any_behind = false; |
1046 | 109 | for (const auto& id : ids) { |
1047 | 109 | if (id.index() < target_index) { |
1048 | 20 | any_behind = true; |
1049 | 20 | break; |
1050 | 20 | } |
1051 | 109 | } |
1052 | 44 | if (!any_behind) { |
1053 | 24 | LOG(INFO) << "Committed up to " << target_index; |
1054 | 24 | return Status::OK(); |
1055 | 24 | } |
1056 | 0 | } else { |
1057 | 0 | LOG(WARNING) << "Got error getting last opid for each replica: " << s.ToString(); |
1058 | 0 | } |
1059 | | |
1060 | 20 | if (CoarseMonoClock::Now() >= deadline) { |
1061 | 0 | if (!s.ok()) { |
1062 | 0 | return s; |
1063 | 0 | } |
1064 | | |
1065 | 0 | return STATUS_FORMAT(TimedOut, |
1066 | 0 | "Index $0 not available on all replicas after $1. ", |
1067 | 0 | target_index, |
1068 | 0 | opts_.timeout); |
1069 | 0 | } |
1070 | | |
1071 | 20 | SleepFor(MonoDelta::FromMilliseconds(min(i * 100, 1000))); |
1072 | 20 | } |
1073 | 24 | } |
1074 | | |
1075 | 1 | Status ExternalMiniCluster::GetIsMasterLeaderServiceReady(ExternalMaster* master) { |
1076 | 1 | IsMasterLeaderReadyRequestPB req; |
1077 | 1 | IsMasterLeaderReadyResponsePB resp; |
1078 | 1 | int index = GetIndexOfMaster(master); |
1079 | | |
1080 | 1 | if (index == -1) { |
1081 | 0 | return STATUS(InvalidArgument, Substitute( |
1082 | 0 | "Given master '$0' not in the current list of $1 masters.", |
1083 | 0 | master->bound_rpc_hostport().ToString(), masters_.size())); |
1084 | 0 | } |
1085 | | |
1086 | 1 | auto proxy = GetMasterProxy<master::MasterClusterProxy>(index); |
1087 | 1 | rpc::RpcController rpc; |
1088 | 1 | rpc.set_timeout(opts_.timeout); |
1089 | 1 | RETURN_NOT_OK(proxy.IsMasterLeaderServiceReady(req, &resp, &rpc)); |
1090 | 1 | if (resp.has_error()) { |
1091 | 0 | return STATUS(RuntimeError, Substitute( |
1092 | 0 | "Is master ready RPC response hit error: $0", resp.error().ShortDebugString())); |
1093 | 0 | } |
1094 | | |
1095 | 1 | return Status::OK(); |
1096 | 1 | } |
1097 | | |
1098 | 42 | Status ExternalMiniCluster::GetLastOpIdForLeader(OpIdPB* opid) { |
1099 | 42 | ExternalMaster* leader = GetLeaderMaster(); |
1100 | 42 | auto leader_master_sock = leader->bound_rpc_addr(); |
1101 | 42 | std::shared_ptr<ConsensusServiceProxy> leader_proxy = |
1102 | 42 | std::make_shared<ConsensusServiceProxy>(proxy_cache_.get(), leader_master_sock); |
1103 | | |
1104 | 42 | RETURN_NOT_OK(itest::GetLastOpIdForMasterReplica( |
1105 | 42 | leader_proxy, |
1106 | 42 | yb::master::kSysCatalogTabletId, |
1107 | 42 | leader->uuid(), |
1108 | 42 | consensus::COMMITTED_OPID, |
1109 | 42 | opts_.timeout, |
1110 | 42 | opid)); |
1111 | | |
1112 | 42 | return Status::OK(); |
1113 | 42 | } |
1114 | | |
1115 | 26 | string ExternalMiniCluster::GetMasterAddresses() const { |
1116 | 26 | string peer_addrs = ""; |
1117 | 68 | for (size_t i = 0; i < opts_.num_masters; i++) { |
1118 | 42 | if (!peer_addrs.empty()) { |
1119 | 16 | peer_addrs += ","; |
1120 | 16 | } |
1121 | 42 | peer_addrs += MasterAddressForPort(opts_.master_rpc_ports[i]); |
1122 | 42 | } |
1123 | 26 | return peer_addrs; |
1124 | 26 | } |
1125 | | |
1126 | 0 | string ExternalMiniCluster::GetTabletServerAddresses() const { |
1127 | 0 | string peer_addrs = ""; |
1128 | 0 | for (const auto& ts : tablet_servers_) { |
1129 | 0 | if (!peer_addrs.empty()) { |
1130 | 0 | peer_addrs += ","; |
1131 | 0 | } |
1132 | 0 | peer_addrs += HostPortToString(ts->bind_host(), ts->rpc_port()); |
1133 | 0 | } |
1134 | 0 | return peer_addrs; |
1135 | 0 | } |
1136 | | |
1137 | 0 | string ExternalMiniCluster::GetTabletServerHTTPAddresses() const { |
1138 | 0 | string peer_addrs = ""; |
1139 | 0 | for (const auto& ts : tablet_servers_) { |
1140 | 0 | if (!peer_addrs.empty()) { |
1141 | 0 | peer_addrs += ","; |
1142 | 0 | } |
1143 | 0 | peer_addrs += HostPortToString(ts->bind_host(), ts->http_port()); |
1144 | 0 | } |
1145 | 0 | return peer_addrs; |
1146 | 0 | } |
1147 | | |
1148 | 230 | Status ExternalMiniCluster::StartMasters() { |
1149 | 230 | auto num_masters = opts_.num_masters; |
1150 | | |
1151 | 230 | if (opts_.master_rpc_ports.size() != num_masters) { |
1152 | 0 | LOG(FATAL) << num_masters << " masters requested, but " << |
1153 | 0 | opts_.master_rpc_ports.size() << " ports specified in 'master_rpc_ports'"; |
1154 | 0 | } |
1155 | | |
1156 | 312 | for (auto& port : opts_.master_rpc_ports) { |
1157 | 312 | if (port == 0) { |
1158 | 311 | port = AllocateFreePort(); |
1159 | 311 | LOG(INFO) << "Using an auto-assigned port " << port |
1160 | 311 | << " to start an external mini-cluster master"; |
1161 | 311 | } |
1162 | 312 | } |
1163 | | |
1164 | 230 | vector<string> peer_addrs; |
1165 | 542 | for (size_t i = 0; i < num_masters; i++) { |
1166 | 312 | string addr = MasterAddressForPort(opts_.master_rpc_ports[i]); |
1167 | 312 | peer_addrs.push_back(addr); |
1168 | 312 | } |
1169 | 230 | string peer_addrs_str = JoinStrings(peer_addrs, ","); |
1170 | 230 | vector<string> flags = opts_.extra_master_flags; |
1171 | | // Disable WAL fsync for tests |
1172 | 230 | flags.push_back("--durable_wal_write=false"); |
1173 | 230 | flags.push_back("--enable_leader_failure_detection=true"); |
1174 | | // Limit number of transaction table tablets to help avoid timeouts. |
1175 | 230 | int num_transaction_table_tablets = NumTabletsPerTransactionTable(opts_); |
1176 | 230 | flags.push_back(Substitute("--transaction_table_num_tablets=$0", num_transaction_table_tablets)); |
1177 | | // For sanitizer builds, it is easy to overload the master, leading to quorum changes. |
1178 | | // This could end up breaking ever trivial DDLs like creating an initial table in the cluster. |
1179 | 230 | if (IsSanitizer()) { |
1180 | 0 | flags.push_back("--leader_failure_max_missed_heartbeat_periods=10"); |
1181 | 0 | } |
1182 | 230 | if (opts_.enable_ysql) { |
1183 | 5 | flags.push_back("--enable_ysql=true"); |
1184 | 5 | flags.push_back("--master_auto_run_initdb"); |
1185 | 225 | } else { |
1186 | 225 | flags.push_back("--enable_ysql=false"); |
1187 | 225 | } |
1188 | 230 | string exe = GetBinaryPath(kMasterBinaryName); |
1189 | | |
1190 | | // Start the masters. |
1191 | 542 | for (size_t i = 0; i < num_masters; i++) { |
1192 | 312 | uint16_t http_port = AllocateFreePort(); |
1193 | 312 | scoped_refptr<ExternalMaster> peer = |
1194 | 312 | new ExternalMaster( |
1195 | 312 | i, |
1196 | 312 | messenger_, |
1197 | 312 | proxy_cache_.get(), |
1198 | 312 | exe, |
1199 | 312 | GetDataPath(Substitute("master-$0", i)), |
1200 | 312 | SubstituteInFlags(flags, i), |
1201 | 312 | peer_addrs[i], |
1202 | 312 | http_port, |
1203 | 312 | peer_addrs_str); |
1204 | 312 | RETURN_NOT_OK_PREPEND(peer->Start(), |
1205 | 312 | Substitute("Unable to start Master at index $0", i)); |
1206 | 312 | masters_.push_back(peer); |
1207 | 312 | } |
1208 | | |
1209 | 230 | if (opts_.enable_ysql) { |
1210 | 5 | RETURN_NOT_OK(WaitForInitDb()); |
1211 | 5 | } |
1212 | 230 | return Status::OK(); |
1213 | 230 | } |
1214 | | |
1215 | 5 | Status ExternalMiniCluster::WaitForInitDb() { |
1216 | 5 | const auto start_time = std::chrono::steady_clock::now(); |
1217 | 5 | const auto kTimeout = NonTsanVsTsan(1200s, 1800s); |
1218 | 5 | int num_timeouts = 0; |
1219 | 5 | const int kMaxTimeouts = 10; |
1220 | 66 | while (true) { |
1221 | 157 | for (size_t i = 0; i < opts_.num_masters; i++) { |
1222 | 96 | auto elapsed_time = std::chrono::steady_clock::now() - start_time; |
1223 | 96 | if (elapsed_time > kTimeout) { |
1224 | 0 | return STATUS_FORMAT( |
1225 | 0 | TimedOut, |
1226 | 0 | "Timed out while waiting for initdb to complete: elapsed time is $0, timeout is $1", |
1227 | 0 | elapsed_time, kTimeout); |
1228 | 0 | } |
1229 | 96 | auto proxy = GetMasterProxy<master::MasterAdminProxy>(i); |
1230 | 96 | rpc::RpcController rpc; |
1231 | 96 | rpc.set_timeout(opts_.timeout); |
1232 | 96 | IsInitDbDoneRequestPB req; |
1233 | 96 | IsInitDbDoneResponsePB resp; |
1234 | 96 | Status status = proxy.IsInitDbDone(req, &resp, &rpc); |
1235 | 96 | if (status.IsTimedOut()) { |
1236 | 0 | num_timeouts++; |
1237 | 0 | LOG(WARNING) << status << " (seen " << num_timeouts << " timeouts so far)"; |
1238 | 0 | if (num_timeouts == kMaxTimeouts) { |
1239 | 0 | LOG(ERROR) << "Reached " << kMaxTimeouts << " timeouts: " << status; |
1240 | 0 | return status; |
1241 | 0 | } |
1242 | 0 | continue; |
1243 | 0 | } |
1244 | 96 | if (resp.has_error() && |
1245 | 91 | resp.error().code() != master::MasterErrorPB::NOT_THE_LEADER) { |
1246 | |
|
1247 | 0 | return STATUS(RuntimeError, Substitute( |
1248 | 0 | "IsInitDbDone RPC response hit error: $0", |
1249 | 0 | resp.error().ShortDebugString())); |
1250 | 0 | } |
1251 | 96 | if (resp.done()) { |
1252 | 5 | if (resp.has_initdb_error() && !resp.initdb_error().empty()) { |
1253 | 0 | LOG(ERROR) << "master reported an initdb error: " << resp.initdb_error(); |
1254 | 0 | return STATUS(RuntimeError, "initdb failed: " + resp.initdb_error()); |
1255 | 0 | } |
1256 | 5 | LOG(INFO) << "master indicated that initdb is done"; |
1257 | 5 | return Status::OK(); |
1258 | 5 | } |
1259 | 96 | } |
1260 | 61 | std::this_thread::sleep_for(500ms); |
1261 | 61 | } |
1262 | 5 | } |
1263 | | |
1264 | 41 | Result<bool> ExternalMiniCluster::is_ts_stale(int ts_idx, MonoDelta deadline) { |
1265 | 41 | auto proxy = GetMasterProxy<master::MasterClusterProxy>(); |
1266 | 41 | std::shared_ptr<rpc::RpcController> controller = std::make_shared<rpc::RpcController>(); |
1267 | 41 | master::ListTabletServersRequestPB req; |
1268 | 41 | master::ListTabletServersResponsePB resp; |
1269 | 41 | controller->Reset(); |
1270 | 41 | controller->set_timeout(deadline); |
1271 | | |
1272 | 41 | RETURN_NOT_OK(proxy.ListTabletServers(req, &resp, controller.get())); |
1273 | | |
1274 | 41 | bool is_stale = false, is_ts_found = false; |
1275 | 177 | for (int i = 0; i < resp.servers_size(); i++) { |
1276 | 136 | if (!resp.servers(i).has_instance_id()) { |
1277 | 0 | return STATUS_SUBSTITUTE( |
1278 | 0 | Uninitialized, |
1279 | 0 | "ListTabletServers RPC returned a TS with uninitialized instance id." |
1280 | 0 | ); |
1281 | 0 | } |
1282 | | |
1283 | 136 | if (!resp.servers(i).instance_id().has_permanent_uuid()) { |
1284 | 0 | return STATUS_SUBSTITUTE( |
1285 | 0 | Uninitialized, |
1286 | 0 | "ListTabletServers RPC returned a TS with uninitialized UUIDs." |
1287 | 0 | ); |
1288 | 0 | } |
1289 | | |
1290 | 136 | if (resp.servers(i).instance_id().permanent_uuid() == tablet_server(ts_idx)->uuid()) { |
1291 | 41 | is_ts_found = true; |
1292 | 41 | is_stale = !resp.servers(i).alive(); |
1293 | 41 | } |
1294 | 136 | } |
1295 | | |
1296 | 41 | if (!is_ts_found) { |
1297 | 0 | return STATUS_SUBSTITUTE( |
1298 | 0 | NotFound, |
1299 | 0 | "Given TS not found in ListTabletServers RPC." |
1300 | 0 | ); |
1301 | 0 | } |
1302 | 41 | return is_stale; |
1303 | 41 | } |
1304 | | |
1305 | 2 | CHECKED_STATUS ExternalMiniCluster::WaitForMasterToMarkTSAlive(int ts_idx, MonoDelta deadline) { |
1306 | 2 | RETURN_NOT_OK(WaitFor([&]() -> Result<bool> { |
1307 | 2 | return !VERIFY_RESULT(is_ts_stale(ts_idx)); |
1308 | 2 | }, deadline * kTimeMultiplier, "Is TS Alive", 1s)); |
1309 | | |
1310 | 2 | return Status::OK(); |
1311 | 2 | } |
1312 | | |
1313 | 4 | CHECKED_STATUS ExternalMiniCluster::WaitForMasterToMarkTSDead(int ts_idx, MonoDelta deadline) { |
1314 | 4 | RETURN_NOT_OK(WaitFor([&]() -> Result<bool> { |
1315 | 4 | return is_ts_stale(ts_idx); |
1316 | 4 | }, deadline * kTimeMultiplier, "Is TS dead", 1s)); |
1317 | | |
1318 | 4 | return Status::OK(); |
1319 | 4 | } |
1320 | | |
1321 | 678 | string ExternalMiniCluster::GetBindIpForTabletServer(size_t index) const { |
1322 | 678 | if (opts_.use_even_ips) { |
1323 | 0 | return Substitute("127.0.0.$0", (index + 1) * 2); |
1324 | 678 | } else if (opts_.bind_to_unique_loopback_addresses) { |
1325 | 159 | #if defined(__APPLE__) |
1326 | 159 | return Substitute("127.0.0.$0", index + 1); // Use default 127.0.0.x IPs. |
1327 | | #else |
1328 | | const pid_t p = getpid(); |
1329 | | return Substitute("127.$0.$1.$2", (p >> 8) & 0xff, p & 0xff, index); |
1330 | | #endif |
1331 | 519 | } else { |
1332 | 519 | return "127.0.0.1"; |
1333 | 519 | } |
1334 | 678 | } |
1335 | | |
1336 | | Status ExternalMiniCluster::AddTabletServer( |
1337 | 679 | bool start_cql_proxy, const std::vector<std::string>& extra_flags, int num_drives) { |
1338 | 0 | CHECK(GetLeaderMaster() != nullptr) |
1339 | 0 | << "Must have started at least 1 master before adding tablet servers"; |
1340 | | |
1341 | 679 | size_t idx = tablet_servers_.size(); |
1342 | | |
1343 | 679 | string exe = GetBinaryPath(kTabletServerBinaryName); |
1344 | 679 | vector<HostPort> master_hostports; |
1345 | 1.52k | for (size_t i = 0; i < num_masters(); i++) { |
1346 | 845 | master_hostports.push_back(DCHECK_NOTNULL(master(i))->bound_rpc_hostport()); |
1347 | 845 | } |
1348 | | |
1349 | 679 | uint16_t ts_rpc_port = 0; |
1350 | 679 | uint16_t ts_http_port = 0; |
1351 | 679 | uint16_t redis_rpc_port = 0; |
1352 | 679 | uint16_t redis_http_port = 0; |
1353 | 679 | uint16_t cql_rpc_port = 0; |
1354 | 679 | uint16_t cql_http_port = 0; |
1355 | 679 | uint16_t pgsql_rpc_port = 0; |
1356 | 679 | uint16_t pgsql_http_port = 0; |
1357 | | |
1358 | 679 | if (idx > 0 && opts_.use_same_ts_ports && opts_.bind_to_unique_loopback_addresses) { |
1359 | 106 | const scoped_refptr<ExternalTabletServer>& first_ts = tablet_servers_[0]; |
1360 | 106 | ts_rpc_port = first_ts->rpc_port(); |
1361 | 106 | ts_http_port = first_ts->http_port(); |
1362 | 106 | redis_rpc_port = first_ts->redis_rpc_port(); |
1363 | 106 | redis_http_port = first_ts->redis_http_port(); |
1364 | 106 | cql_rpc_port = first_ts->cql_rpc_port(); |
1365 | 106 | cql_http_port = first_ts->cql_http_port(); |
1366 | 106 | pgsql_rpc_port = first_ts->pgsql_rpc_port(); |
1367 | 106 | pgsql_http_port = first_ts->pgsql_http_port(); |
1368 | 573 | } else { |
1369 | 573 | ts_rpc_port = AllocateFreePort(); |
1370 | 573 | ts_http_port = AllocateFreePort(); |
1371 | 573 | redis_rpc_port = AllocateFreePort(); |
1372 | 573 | redis_http_port = AllocateFreePort(); |
1373 | 573 | cql_rpc_port = AllocateFreePort(); |
1374 | 573 | cql_http_port = AllocateFreePort(); |
1375 | 573 | pgsql_rpc_port = AllocateFreePort(); |
1376 | 573 | pgsql_http_port = AllocateFreePort(); |
1377 | 573 | } |
1378 | | |
1379 | 679 | vector<string> flags = opts_.extra_tserver_flags; |
1380 | 679 | if (opts_.enable_ysql) { |
1381 | 18 | flags.push_back("--enable_ysql=true"); |
1382 | 661 | } else { |
1383 | 661 | flags.push_back("--enable_ysql=false"); |
1384 | 661 | } |
1385 | 679 | flags.insert(flags.end(), extra_flags.begin(), extra_flags.end()); |
1386 | | |
1387 | 679 | if (num_drives < 0) { |
1388 | 675 | num_drives = opts_.num_drives; |
1389 | 675 | } |
1390 | | |
1391 | 679 | scoped_refptr<ExternalTabletServer> ts = new ExternalTabletServer( |
1392 | 679 | idx, messenger_, proxy_cache_.get(), exe, GetDataPath(Substitute("ts-$0", idx + 1)), |
1393 | 679 | num_drives, GetBindIpForTabletServer(idx), ts_rpc_port, ts_http_port, redis_rpc_port, |
1394 | 679 | redis_http_port, cql_rpc_port, cql_http_port, pgsql_rpc_port, pgsql_http_port, |
1395 | 679 | master_hostports, SubstituteInFlags(flags, idx)); |
1396 | 679 | RETURN_NOT_OK(ts->Start(start_cql_proxy)); |
1397 | 678 | tablet_servers_.push_back(ts); |
1398 | 678 | return Status::OK(); |
1399 | 679 | } |
1400 | | |
1401 | 288 | Status ExternalMiniCluster::WaitForTabletServerCount(size_t count, const MonoDelta& timeout) { |
1402 | 288 | MonoTime deadline = MonoTime::Now(); |
1403 | 288 | deadline.AddDelta(timeout); |
1404 | | |
1405 | 288 | std::vector<scoped_refptr<ExternalTabletServer>> last_unmatched = tablet_servers_; |
1406 | 288 | bool had_leader = false; |
1407 | | |
1408 | 40.1k | while (true) { |
1409 | 40.1k | MonoDelta remaining = deadline - MonoTime::Now(); |
1410 | 40.1k | if (remaining.ToSeconds() < 0) { |
1411 | 1 | std::vector<std::string> unmatched_uuids; |
1412 | 1 | unmatched_uuids.reserve(last_unmatched.size()); |
1413 | 1 | for (const auto& server : last_unmatched) { |
1414 | 1 | unmatched_uuids.push_back(server->instance_id().permanent_uuid()); |
1415 | 1 | } |
1416 | 1 | if (!had_leader) { |
1417 | 1 | return STATUS(TimedOut, "Does not have active master leader to check tserver registration"); |
1418 | 1 | } |
1419 | 0 | return STATUS_FORMAT(TimedOut, "$0 TS(s) never registered with master (not registered $1)", |
1420 | 0 | count, unmatched_uuids); |
1421 | 0 | } |
1422 | | |
1423 | | // We should give some time for RPC to proceed, otherwise all requests would fail. |
1424 | 40.1k | remaining = std::max<MonoDelta>(remaining, 250ms); |
1425 | | |
1426 | 40.1k | last_unmatched = tablet_servers_; |
1427 | 40.1k | had_leader = false; |
1428 | 80.4k | for (size_t i = 0; i < masters_.size(); i++) { |
1429 | 40.6k | master::ListTabletServersRequestPB req; |
1430 | 40.6k | master::ListTabletServersResponsePB resp; |
1431 | 40.6k | rpc::RpcController rpc; |
1432 | 40.6k | rpc.set_timeout(remaining); |
1433 | 40.6k | auto status = GetMasterProxy<master::MasterClusterProxy>(i).ListTabletServers( |
1434 | 40.6k | req, &resp, &rpc); |
1435 | 26.2k | LOG_IF(WARNING, !status.ok()) << "ListTabletServers failed: " << status; |
1436 | 40.6k | if (!status.ok() || resp.has_error()) { |
1437 | 38.0k | continue; |
1438 | 38.0k | } |
1439 | 2.55k | had_leader = true; |
1440 | | // ListTabletServers() may return servers that are no longer online. |
1441 | | // Do a second step of verification to verify that the descs that we got |
1442 | | // are aligned (same uuid/seqno) with the TSs that we have in the cluster. |
1443 | 2.55k | size_t match_count = 0; |
1444 | 3.52k | for (const master::ListTabletServersResponsePB_Entry& e : resp.servers()) { |
1445 | 10.6k | for (auto it = last_unmatched.begin(); it != last_unmatched.end(); ++it) { |
1446 | 10.6k | if ((**it).instance_id().permanent_uuid() == e.instance_id().permanent_uuid() && |
1447 | 3.52k | (**it).instance_id().instance_seqno() == e.instance_id().instance_seqno()) { |
1448 | 3.52k | match_count++; |
1449 | 3.52k | last_unmatched.erase(it); |
1450 | 3.52k | break; |
1451 | 3.52k | } |
1452 | 10.6k | } |
1453 | 3.52k | } |
1454 | 2.55k | if (match_count >= count) { |
1455 | 287 | LOG(INFO) << count << " TS(s) registered with Master"; |
1456 | 287 | return Status::OK(); |
1457 | 287 | } |
1458 | 2.55k | } |
1459 | 39.8k | SleepFor(MonoDelta::FromMilliseconds(1)); |
1460 | 39.8k | } |
1461 | 288 | } |
1462 | | |
1463 | 89 | void ExternalMiniCluster::AssertNoCrashes() { |
1464 | 89 | vector<ExternalDaemon*> daemons = this->daemons(); |
1465 | 329 | for (ExternalDaemon* d : daemons) { |
1466 | 329 | if (d->IsShutdown()) continue; |
1467 | 658 | EXPECT_TRUE(d->IsProcessAlive()) << "At least one process crashed. viz: " |
1468 | 658 | << d->id(); |
1469 | 329 | } |
1470 | 89 | } |
1471 | | |
1472 | | Result<std::vector<ListTabletsForTabletServerResponsePB::Entry>> ExternalMiniCluster::GetTablets( |
1473 | 892 | ExternalTabletServer* ts) { |
1474 | 892 | TabletServerServiceProxy proxy(proxy_cache_.get(), ts->bound_rpc_addr()); |
1475 | 892 | ListTabletsForTabletServerRequestPB req; |
1476 | 892 | ListTabletsForTabletServerResponsePB resp; |
1477 | | |
1478 | 892 | rpc::RpcController rpc; |
1479 | 892 | rpc.set_timeout(10s * kTimeMultiplier); |
1480 | 892 | RETURN_NOT_OK(proxy.ListTabletsForTabletServer(req, &resp, &rpc)); |
1481 | | |
1482 | 892 | std::vector<ListTabletsForTabletServerResponsePB::Entry> result; |
1483 | 3.77k | for (const ListTabletsForTabletServerResponsePB::Entry& entry : resp.entries()) { |
1484 | 3.77k | result.push_back(entry); |
1485 | 3.77k | } |
1486 | | |
1487 | 892 | return result; |
1488 | 892 | } |
1489 | | |
1490 | | Result<tserver::GetSplitKeyResponsePB> ExternalMiniCluster::GetSplitKey( |
1491 | 1 | const std::string& tablet_id) { |
1492 | 3 | for (size_t i = 0; i < this->num_tablet_servers(); i++) { |
1493 | 3 | auto tserver = this->tablet_server(i); |
1494 | 3 | auto ts_service_proxy = std::make_unique<tserver::TabletServerServiceProxy>( |
1495 | 3 | proxy_cache_.get(), tserver->bound_rpc_addr()); |
1496 | 3 | tserver::GetSplitKeyRequestPB req; |
1497 | 3 | req.set_tablet_id(tablet_id); |
1498 | 3 | rpc::RpcController controller; |
1499 | 3 | controller.set_timeout(10s * kTimeMultiplier); |
1500 | 3 | tserver::GetSplitKeyResponsePB resp; |
1501 | 3 | RETURN_NOT_OK(ts_service_proxy->GetSplitKey(req, &resp, &controller)); |
1502 | 3 | if (!resp.has_error()) { |
1503 | 1 | return resp; |
1504 | 1 | } |
1505 | 3 | } |
1506 | 0 | return STATUS(IllegalState, "GetSplitKey failed on all TServers"); |
1507 | 1 | } |
1508 | | |
1509 | | Status ExternalMiniCluster::FlushTabletsOnSingleTServer( |
1510 | | ExternalTabletServer* ts, const std::vector<yb::TabletId> tablet_ids, |
1511 | 5 | bool is_compaction) { |
1512 | 5 | tserver::FlushTabletsRequestPB req; |
1513 | 5 | tserver::FlushTabletsResponsePB resp; |
1514 | 5 | rpc::RpcController controller; |
1515 | 5 | controller.set_timeout(10s * kTimeMultiplier); |
1516 | | |
1517 | 5 | req.set_dest_uuid(ts->uuid()); |
1518 | 0 | req.set_operation(is_compaction ? tserver::FlushTabletsRequestPB::COMPACT |
1519 | 5 | : tserver::FlushTabletsRequestPB::FLUSH); |
1520 | 5 | for (const auto& tablet_id : tablet_ids) { |
1521 | 5 | req.add_tablet_ids(tablet_id); |
1522 | 5 | } |
1523 | | |
1524 | 5 | auto ts_admin_service_proxy = std::make_unique<tserver::TabletServerAdminServiceProxy>( |
1525 | 5 | proxy_cache_.get(), ts->bound_rpc_addr()); |
1526 | 5 | return ts_admin_service_proxy->FlushTablets(req, &resp, &controller); |
1527 | 5 | } |
1528 | | |
1529 | 3 | Result<tserver::ListTabletsResponsePB> ExternalMiniCluster::ListTablets(ExternalTabletServer* ts) { |
1530 | 3 | rpc::RpcController rpc; |
1531 | 3 | ListTabletsRequestPB req; |
1532 | 3 | ListTabletsResponsePB resp; |
1533 | 3 | rpc.set_timeout(opts_.timeout); |
1534 | 3 | TabletServerServiceProxy proxy(proxy_cache_.get(), ts->bound_rpc_addr()); |
1535 | 3 | RETURN_NOT_OK(proxy.ListTablets(req, &resp, &rpc)); |
1536 | 3 | return resp; |
1537 | 3 | } |
1538 | | |
1539 | 3 | Result<std::vector<std::string>> ExternalMiniCluster::GetTabletIds(ExternalTabletServer* ts) { |
1540 | 3 | auto tablets = VERIFY_RESULT(GetTablets(ts)); |
1541 | 3 | std::vector<std::string> result; |
1542 | 3 | for (const auto& tablet : tablets) { |
1543 | 3 | result.push_back(tablet.tablet_id()); |
1544 | 3 | } |
1545 | 3 | return result; |
1546 | 3 | } |
1547 | | |
1548 | | Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts, |
1549 | 25 | const MonoDelta& timeout) { |
1550 | 25 | TabletServerServiceProxy proxy(proxy_cache_.get(), ts->bound_rpc_addr()); |
1551 | 25 | ListTabletsRequestPB req; |
1552 | 25 | ListTabletsResponsePB resp; |
1553 | | |
1554 | 25 | MonoTime deadline = MonoTime::Now(); |
1555 | 25 | deadline.AddDelta(timeout); |
1556 | 25 | while (MonoTime::Now().ComesBefore(deadline)) { |
1557 | 25 | rpc::RpcController rpc; |
1558 | 25 | rpc.set_timeout(MonoDelta::FromSeconds(10)); |
1559 | 25 | RETURN_NOT_OK(proxy.ListTablets(req, &resp, &rpc)); |
1560 | 25 | if (resp.has_error()) { |
1561 | 0 | return StatusFromPB(resp.error().status()); |
1562 | 0 | } |
1563 | | |
1564 | 25 | int num_not_running = 0; |
1565 | 74 | for (const StatusAndSchemaPB& status : resp.status_and_schema()) { |
1566 | 74 | if (status.tablet_status().state() != tablet::RUNNING) { |
1567 | 0 | num_not_running++; |
1568 | 0 | } |
1569 | 74 | } |
1570 | | |
1571 | 25 | if (num_not_running == 0) { |
1572 | 25 | return Status::OK(); |
1573 | 25 | } |
1574 | | |
1575 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
1576 | 0 | } |
1577 | | |
1578 | 0 | return STATUS(TimedOut, resp.DebugString()); |
1579 | 25 | } |
1580 | | |
1581 | 0 | Status ExternalMiniCluster::WaitForTSToCrash(size_t index, const MonoDelta& timeout) { |
1582 | 0 | ExternalTabletServer* ts = tablet_server(index); |
1583 | 0 | return WaitForTSToCrash(ts, timeout); |
1584 | 0 | } |
1585 | | |
1586 | | Status ExternalMiniCluster::WaitForTSToCrash(const ExternalTabletServer* ts, |
1587 | 7 | const MonoDelta& timeout) { |
1588 | 7 | MonoTime deadline = MonoTime::Now(); |
1589 | 7 | deadline.AddDelta(timeout); |
1590 | 33 | while (MonoTime::Now().ComesBefore(deadline)) { |
1591 | 33 | if (!ts->IsProcessAlive()) { |
1592 | 7 | return Status::OK(); |
1593 | 7 | } |
1594 | 26 | SleepFor(MonoDelta::FromMilliseconds(10)); |
1595 | 26 | } |
1596 | 0 | return STATUS(TimedOut, Substitute("TS $0 did not crash!", ts->instance_id().permanent_uuid())); |
1597 | 7 | } |
1598 | | |
1599 | | namespace { |
1600 | | void LeaderMasterCallback(HostPort* dst_hostport, |
1601 | | Synchronizer* sync, |
1602 | | const Status& status, |
1603 | 2.28k | const HostPort& result) { |
1604 | 2.28k | if (status.ok()) { |
1605 | 2.28k | *dst_hostport = result; |
1606 | 2.28k | } |
1607 | 2.28k | sync->StatusCB(status); |
1608 | 2.28k | } |
1609 | | } // anonymous namespace |
1610 | | |
1611 | 2 | Result<size_t> ExternalMiniCluster::GetFirstNonLeaderMasterIndex() { |
1612 | 2 | return GetPeerMasterIndex(false); |
1613 | 2 | } |
1614 | | |
1615 | 2.38k | Result<size_t> ExternalMiniCluster::GetLeaderMasterIndex() { |
1616 | 2.38k | return GetPeerMasterIndex(true); |
1617 | 2.38k | } |
1618 | | |
1619 | 2.38k | Result<size_t> ExternalMiniCluster::GetPeerMasterIndex(bool is_leader) { |
1620 | 2.38k | Synchronizer sync; |
1621 | 2.38k | server::MasterAddresses addrs; |
1622 | 2.38k | HostPort leader_master_hp; |
1623 | 2.38k | auto deadline = CoarseMonoClock::Now() + 5s; |
1624 | | |
1625 | 2.85k | for (const scoped_refptr<ExternalMaster>& master : masters_) { |
1626 | 2.85k | if (master->IsProcessAlive()) { |
1627 | 2.74k | addrs.push_back({ master->bound_rpc_addr() }); |
1628 | 2.74k | } |
1629 | 2.85k | } |
1630 | 2.38k | if (addrs.empty()) { |
1631 | 99 | return STATUS(IllegalState, "No running masters"); |
1632 | 99 | } |
1633 | 2.28k | rpc::Rpcs rpcs; |
1634 | 2.28k | auto rpc = std::make_shared<GetLeaderMasterRpc>( |
1635 | 2.28k | Bind(&LeaderMasterCallback, &leader_master_hp, &sync), |
1636 | 2.28k | addrs, |
1637 | 2.28k | deadline, |
1638 | 2.28k | messenger_, |
1639 | 2.28k | proxy_cache_.get(), |
1640 | 2.28k | &rpcs); |
1641 | 2.28k | rpc->SendRpc(); |
1642 | 2.28k | RETURN_NOT_OK(sync.Wait()); |
1643 | 2.28k | rpcs.Shutdown(); |
1644 | | |
1645 | 2.28k | const char* peer_type = is_leader ? "leader" : "non-leader"; |
1646 | 2.54k | for (size_t i = 0; i < masters_.size(); i++) { |
1647 | 2.54k | bool matches_leader = masters_[i]->bound_rpc_hostport().port() == leader_master_hp.port(); |
1648 | 2.54k | if (is_leader == matches_leader) { |
1649 | 2.28k | LOG(INFO) << "Found peer " << peer_type << " at index " << i << "."; |
1650 | 2.28k | return i; |
1651 | 2.28k | } |
1652 | 2.54k | } |
1653 | | |
1654 | | // There is never a situation where this should happen, so it's |
1655 | | // better to exit with a FATAL log message right away vs. return a |
1656 | | // Status::IllegalState(). |
1657 | 0 | auto status = STATUS_FORMAT(NotFound, "Peer $0 master is not in masters_ list", peer_type); |
1658 | 0 | LOG(FATAL) << status; |
1659 | 0 | return status; |
1660 | 2.28k | } |
1661 | | |
1662 | 2.25k | ExternalMaster* ExternalMiniCluster::GetLeaderMaster() { |
1663 | 2.25k | int num_attempts = 0; |
1664 | | // Retry to get the leader master's index - due to timing issues (like election in progress). |
1665 | 2.35k | for (;;) { |
1666 | 2.35k | ++num_attempts; |
1667 | 2.35k | auto idx = GetLeaderMasterIndex(); |
1668 | 2.35k | if (idx.ok()) { |
1669 | 2.25k | return master(*idx); |
1670 | 2.25k | } |
1671 | 104 | LOG(INFO) << "GetLeaderMasterIndex@" << num_attempts << " hit error: " << idx.status(); |
1672 | 104 | if (num_attempts >= kMaxRetryIterations) { |
1673 | 1 | LOG(WARNING) << "Failed to get leader master after " << num_attempts << " attempts, " |
1674 | 1 | << "returning the first master."; |
1675 | 1 | break; |
1676 | 1 | } |
1677 | 103 | SleepFor(MonoDelta::FromMilliseconds(num_attempts * 10)); |
1678 | 103 | } |
1679 | | |
1680 | 1 | return master(0); |
1681 | 2.25k | } |
1682 | | |
1683 | 4 | Result<size_t> ExternalMiniCluster::GetTabletLeaderIndex(const std::string& tablet_id) { |
1684 | 8 | for (size_t i = 0; i < num_tablet_servers(); ++i) { |
1685 | 8 | auto tserver = tablet_server(i); |
1686 | 8 | if (tserver->IsProcessAlive()) { |
1687 | 7 | auto tablets = VERIFY_RESULT(GetTablets(tserver)); |
1688 | 24 | for (const auto& tablet : tablets) { |
1689 | 24 | if (tablet.tablet_id() == tablet_id && tablet.is_leader()) { |
1690 | 4 | return i; |
1691 | 4 | } |
1692 | 24 | } |
1693 | 7 | } |
1694 | 8 | } |
1695 | 0 | return STATUS( |
1696 | 4 | NotFound, Format("Could not find leader of tablet $0 among live tservers.", tablet_id)); |
1697 | 4 | } |
1698 | | |
1699 | 72 | ExternalTabletServer* ExternalMiniCluster::tablet_server_by_uuid(const std::string& uuid) const { |
1700 | 156 | for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) { |
1701 | 156 | if (ts->instance_id().permanent_uuid() == uuid) { |
1702 | 72 | return ts.get(); |
1703 | 72 | } |
1704 | 156 | } |
1705 | 0 | return nullptr; |
1706 | 72 | } |
1707 | | |
1708 | 6 | int ExternalMiniCluster::tablet_server_index_by_uuid(const std::string& uuid) const { |
1709 | 14 | for (size_t i = 0; i < tablet_servers_.size(); i++) { |
1710 | 14 | if (tablet_servers_[i]->uuid() == uuid) { |
1711 | 6 | return narrow_cast<int>(i); |
1712 | 6 | } |
1713 | 14 | } |
1714 | 0 | return -1; |
1715 | 6 | } |
1716 | | |
1717 | 71 | vector<ExternalMaster*> ExternalMiniCluster::master_daemons() const { |
1718 | 71 | vector<ExternalMaster*> results; |
1719 | 75 | for (const scoped_refptr<ExternalMaster>& master : masters_) { |
1720 | 75 | results.push_back(master.get()); |
1721 | 75 | } |
1722 | 71 | return results; |
1723 | 71 | } |
1724 | | |
1725 | 89 | vector<ExternalDaemon*> ExternalMiniCluster::daemons() const { |
1726 | 89 | vector<ExternalDaemon*> results; |
1727 | 237 | for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) { |
1728 | 237 | results.push_back(ts.get()); |
1729 | 237 | } |
1730 | 92 | for (const scoped_refptr<ExternalMaster>& master : masters_) { |
1731 | 92 | results.push_back(master.get()); |
1732 | 92 | } |
1733 | 89 | return results; |
1734 | 89 | } |
1735 | | |
1736 | 29 | std::vector<ExternalTabletServer*> ExternalMiniCluster::tserver_daemons() const { |
1737 | 29 | std::vector<ExternalTabletServer*> result; |
1738 | 29 | result.reserve(tablet_servers_.size()); |
1739 | 88 | for (const auto& ts : tablet_servers_) { |
1740 | 88 | result.push_back(ts.get()); |
1741 | 88 | } |
1742 | 29 | return result; |
1743 | 29 | } |
1744 | | |
1745 | 0 | HostPort ExternalMiniCluster::pgsql_hostport(int node_index) const { |
1746 | 0 | return HostPort(tablet_servers_[node_index]->bind_host(), |
1747 | 0 | tablet_servers_[node_index]->pgsql_rpc_port()); |
1748 | 0 | } |
1749 | | |
1750 | 395k | rpc::Messenger* ExternalMiniCluster::messenger() { |
1751 | 395k | return messenger_; |
1752 | 395k | } |
1753 | | |
1754 | | std::shared_ptr<server::GenericServiceProxy> ExternalMiniCluster::master_generic_proxy( |
1755 | 1 | int idx) const { |
1756 | 1 | CHECK_GE(idx, 0); |
1757 | 1 | CHECK_LT(idx, masters_.size()); |
1758 | 1 | return std::make_shared<server::GenericServiceProxy>( |
1759 | 1 | proxy_cache_.get(), CHECK_NOTNULL(master(idx))->bound_rpc_addr()); |
1760 | 1 | } |
1761 | | |
1762 | | std::shared_ptr<server::GenericServiceProxy> ExternalMiniCluster::master_generic_proxy( |
1763 | 0 | const HostPort& bound_rpc_addr) const { |
1764 | 0 | return std::make_shared<server::GenericServiceProxy>(proxy_cache_.get(), bound_rpc_addr); |
1765 | 0 | } |
1766 | | |
1767 | 223 | void ExternalMiniCluster::ConfigureClientBuilder(client::YBClientBuilder* builder) { |
1768 | 223 | CHECK_NOTNULL(builder); |
1769 | 223 | CHECK(!masters_.empty()); |
1770 | 223 | builder->clear_master_server_addrs(); |
1771 | 281 | for (const scoped_refptr<ExternalMaster>& master : masters_) { |
1772 | 281 | builder->add_master_server_addr(master->bound_rpc_hostport().ToString()); |
1773 | 281 | } |
1774 | 223 | } |
1775 | | |
1776 | 1.24k | Result<HostPort> ExternalMiniCluster::DoGetLeaderMasterBoundRpcAddr() { |
1777 | 1.24k | return GetLeaderMaster()->bound_rpc_addr(); |
1778 | 1.24k | } |
1779 | | |
1780 | | Status ExternalMiniCluster::SetFlag(ExternalDaemon* daemon, |
1781 | | const string& flag, |
1782 | 97 | const string& value) { |
1783 | 97 | server::GenericServiceProxy proxy(proxy_cache_.get(), daemon->bound_rpc_addr()); |
1784 | | |
1785 | 97 | rpc::RpcController controller; |
1786 | 97 | controller.set_timeout(MonoDelta::FromSeconds(30)); |
1787 | 97 | server::SetFlagRequestPB req; |
1788 | 97 | server::SetFlagResponsePB resp; |
1789 | 97 | req.set_flag(flag); |
1790 | 97 | req.set_value(value); |
1791 | 97 | req.set_force(true); |
1792 | 97 | RETURN_NOT_OK_PREPEND(proxy.SetFlag(req, &resp, &controller), |
1793 | 96 | "rpc failed"); |
1794 | 96 | if (resp.result() != server::SetFlagResponsePB::SUCCESS) { |
1795 | 0 | return STATUS(RemoteError, "failed to set flag", |
1796 | 0 | resp.ShortDebugString()); |
1797 | 0 | } |
1798 | 96 | return Status::OK(); |
1799 | 96 | } |
1800 | | |
1801 | 8 | Status ExternalMiniCluster::SetFlagOnMasters(const string& flag, const string& value) { |
1802 | 8 | for (const auto& master : masters_) { |
1803 | 8 | RETURN_NOT_OK(SetFlag(master.get(), flag, value)); |
1804 | 8 | } |
1805 | 8 | return Status::OK(); |
1806 | 8 | } |
1807 | | |
1808 | 3 | Status ExternalMiniCluster::SetFlagOnTServers(const string& flag, const string& value) { |
1809 | 11 | for (const auto& tablet_server : tablet_servers_) { |
1810 | 11 | RETURN_NOT_OK(SetFlag(tablet_server.get(), flag, value)); |
1811 | 11 | } |
1812 | 2 | return Status::OK(); |
1813 | 3 | } |
1814 | | |
1815 | | |
1816 | 5.31k | uint16_t ExternalMiniCluster::AllocateFreePort() { |
1817 | | // This will take a file lock ensuring the port does not get claimed by another thread/process |
1818 | | // and add it to our vector of such locks that will be freed on minicluster shutdown. |
1819 | 5.31k | free_port_file_locks_.emplace_back(); |
1820 | 5.31k | return GetFreePort(&free_port_file_locks_.back()); |
1821 | 5.31k | } |
1822 | | |
1823 | 0 | Status ExternalMiniCluster::StartElection(ExternalMaster* master) { |
1824 | 0 | auto master_sock = master->bound_rpc_addr(); |
1825 | 0 | auto master_proxy = std::make_shared<ConsensusServiceProxy>(proxy_cache_.get(), master_sock); |
1826 | |
|
1827 | 0 | RunLeaderElectionRequestPB req; |
1828 | 0 | req.set_dest_uuid(master->uuid()); |
1829 | 0 | req.set_tablet_id(yb::master::kSysCatalogTabletId); |
1830 | 0 | RunLeaderElectionResponsePB resp; |
1831 | 0 | RpcController rpc; |
1832 | 0 | rpc.set_timeout(opts_.timeout); |
1833 | 0 | RETURN_NOT_OK(master_proxy->RunLeaderElection(req, &resp, &rpc)); |
1834 | 0 | if (resp.has_error()) { |
1835 | 0 | return StatusFromPB(resp.error().status()) |
1836 | 0 | .CloneAndPrepend(Substitute("Code $0", |
1837 | 0 | TabletServerErrorPB::Code_Name(resp.error().code()))); |
1838 | 0 | } |
1839 | 0 | return Status::OK(); |
1840 | 0 | } |
1841 | | |
1842 | 35 | ExternalMaster* ExternalMiniCluster::master() const { |
1843 | 35 | if (masters_.empty()) |
1844 | 0 | return nullptr; |
1845 | | |
1846 | 0 | CHECK_EQ(masters_.size(), 1) |
1847 | 0 | << "master() should not be used with multiple masters, use GetLeaderMaster() instead."; |
1848 | 35 | return master(0); |
1849 | 35 | } |
1850 | | |
1851 | | // Return master at 'idx' or NULL if the master at 'idx' has not been started. |
1852 | 44.5k | ExternalMaster* ExternalMiniCluster::master(size_t idx) const { |
1853 | 44.5k | CHECK_LT(idx, masters_.size()); |
1854 | 44.5k | return masters_[idx].get(); |
1855 | 44.5k | } |
1856 | | |
1857 | 3.25k | ExternalTabletServer* ExternalMiniCluster::tablet_server(size_t idx) const { |
1858 | 3.25k | CHECK_LT(idx, tablet_servers_.size()); |
1859 | 3.25k | return tablet_servers_[idx].get(); |
1860 | 3.25k | } |
1861 | | |
1862 | | //------------------------------------------------------------ |
1863 | | // ExternalDaemon |
1864 | | //------------------------------------------------------------ |
1865 | | |
1866 | | namespace { |
1867 | | |
1868 | | // Global state to manage all log tailer threads. This state is managed using Singleton from gutil |
1869 | | // and is never deallocated. |
1870 | | struct GlobalLogTailerState { |
1871 | | mutex logging_mutex; |
1872 | | atomic<int> next_log_tailer_id{0}; |
1873 | | |
1874 | | // We need some references to these heap-allocated atomic booleans so that ASAN would not consider |
1875 | | // them memory leaks. |
1876 | | mutex id_to_stopped_flag_mutex; |
1877 | | map<int, atomic<bool>*> id_to_stopped_flag; |
1878 | | |
1879 | | // This is used to limit the total amount of logs produced by external daemons over the lifetime |
1880 | | // of a test program. Guarded by logging_mutex. |
1881 | | size_t total_bytes_logged = 0; |
1882 | | }; |
1883 | | |
1884 | | } // anonymous namespace |
1885 | | |
1886 | | class ExternalDaemon::LogTailerThread { |
1887 | | public: |
1888 | | LogTailerThread(const string line_prefix, |
1889 | | const int child_fd, |
1890 | | ostream* const out) |
1891 | | : id_(global_state()->next_log_tailer_id.fetch_add(1)), |
1892 | | stopped_(CreateStoppedFlagForId(id_)), |
1893 | | thread_desc_(Substitute("log tailer thread for prefix $0", line_prefix)), |
1894 | 2.13k | thread_([=] { |
1895 | 18.4E | VLOG(1) << "Starting " << thread_desc_; |
1896 | 2.13k | FILE* const fp = fdopen(child_fd, "rb"); |
1897 | 2.13k | char buf[65536]; |
1898 | 2.13k | const atomic<bool>* stopped; |
1899 | | |
1900 | 2.13k | { |
1901 | 2.13k | lock_guard<mutex> l(state_lock_); |
1902 | 2.13k | stopped = stopped_; |
1903 | 2.13k | } |
1904 | | |
1905 | | // Instead of doing a nonblocking read, we detach this thread and allow it to block |
1906 | | // indefinitely trying to read from a child process's stream where nothing is happening. |
1907 | | // This is probably OK as long as we are careful to avoid accessing any state that might |
1908 | | // have been already destructed (e.g. logging, cout/cerr, member fields of this class, |
1909 | | // etc.) in case we do get unblocked. Instead, we keep a local pointer to the atomic |
1910 | | // "stopped" flag, and that allows us to safely check if it is OK to print log messages. |
1911 | | // The "stopped" flag itself is never deallocated. |
1912 | 2.13k | bool is_eof = false; |
1913 | 2.13k | bool is_fgets_null = false; |
1914 | 2.13k | auto& logging_mutex = global_state()->logging_mutex; |
1915 | 2.13k | auto& total_bytes_logged = global_state()->total_bytes_logged; |
1916 | 828k | while (!(is_eof = feof(fp)) && |
1917 | 828k | !(is_fgets_null = (fgets(buf, sizeof(buf), fp) == nullptr)) && |
1918 | 826k | !stopped->load()) { |
1919 | 826k | size_t l = strlen(buf); |
1920 | 18.4E | const char* maybe_end_of_line = l > 0 && buf[l - 1] == '\n' ? "" : "\n"; |
1921 | | // Synchronize tailing output from all external daemons for simplicity. |
1922 | 826k | lock_guard<mutex> lock(logging_mutex); |
1923 | 826k | if (stopped->load()) break; |
1924 | | // Make sure we always output an end-of-line character. |
1925 | 826k | *out << line_prefix << " " << buf << maybe_end_of_line; |
1926 | 826k | if (!stopped->load()) { |
1927 | 826k | auto listener = listener_.load(std::memory_order_acquire); |
1928 | 826k | if (!stopped->load() && listener) { |
1929 | 907 | listener->Handle(GStringPiece(buf, maybe_end_of_line ? l : l - 1)); |
1930 | 907 | } |
1931 | 826k | } |
1932 | 826k | total_bytes_logged += strlen(buf) + strlen(maybe_end_of_line); |
1933 | | // Abort the test if it produces too much log spew. |
1934 | 826k | CHECK_LE(total_bytes_logged, FLAGS_external_mini_cluster_max_log_bytes); |
1935 | 826k | } |
1936 | 2.13k | fclose(fp); |
1937 | 2.13k | if (!stopped->load()) { |
1938 | | // It might not be safe to log anything if we have already stopped. |
1939 | 4 | VLOG(1) << "Exiting " << thread_desc_ |
1940 | 4 | << ": is_eof=" << is_eof |
1941 | 4 | << ", is_fgets_null=" << is_fgets_null |
1942 | 4 | << ", stopped=0"; |
1943 | 1.57k | } |
1944 | 2.14k | }) { |
1945 | 2.14k | thread_.detach(); |
1946 | 2.14k | } |
1947 | | |
1948 | 8 | void SetListener(StringListener* listener) { |
1949 | 8 | listener_ = listener; |
1950 | 8 | } |
1951 | | |
1952 | 8 | void RemoveListener(StringListener* listener) { |
1953 | 8 | listener_.compare_exchange_strong(listener, nullptr); |
1954 | 8 | } |
1955 | | |
1956 | 1.59k | ~LogTailerThread() { |
1957 | 0 | VLOG(1) << "Stopping " << thread_desc_; |
1958 | 1.59k | lock_guard<mutex> l(state_lock_); |
1959 | 1.59k | stopped_->store(true); |
1960 | 1.59k | listener_ = nullptr; |
1961 | 1.59k | } |
1962 | | |
1963 | | private: |
1964 | 10.7k | static GlobalLogTailerState* global_state() { |
1965 | 10.7k | return Singleton<GlobalLogTailerState>::get(); |
1966 | 10.7k | } |
1967 | | |
1968 | 2.14k | static atomic<bool>* CreateStoppedFlagForId(int id) { |
1969 | 2.14k | lock_guard<mutex> lock(global_state()->id_to_stopped_flag_mutex); |
1970 | | // This is never deallocated, but we add this pointer to the id_to_stopped_flag map referenced |
1971 | | // from the global state singleton, and that apparently makes ASAN no longer consider this to be |
1972 | | // a memory leak. We don't need to check if the id already exists in the map, because this |
1973 | | // function is never invoked with a particular id more than once. |
1974 | 2.14k | auto* const stopped = new atomic<bool>(); |
1975 | 2.14k | stopped->store(false); |
1976 | 2.14k | global_state()->id_to_stopped_flag[id] = stopped; |
1977 | 2.14k | return stopped; |
1978 | 2.14k | } |
1979 | | |
1980 | | const int id_; |
1981 | | |
1982 | | // This lock protects the stopped_ pointer in case of a race between tailer thread's |
1983 | | // initialization (i.e. before it gets into its loop) and the destructor. |
1984 | | mutex state_lock_; |
1985 | | |
1986 | | atomic<bool>* const stopped_; |
1987 | | const string thread_desc_; // A human-readable description of this thread. |
1988 | | thread thread_; |
1989 | | std::atomic<StringListener*> listener_{nullptr}; |
1990 | | }; |
1991 | | |
1992 | | ExternalDaemon::ExternalDaemon( |
1993 | | std::string daemon_id, |
1994 | | rpc::Messenger* messenger, |
1995 | | rpc::ProxyCache* proxy_cache, |
1996 | | const string& exe, |
1997 | | const string& root_dir, |
1998 | | const std::vector<std::string>& data_dirs, |
1999 | | const vector<string>& extra_flags) |
2000 | | : daemon_id_(daemon_id), |
2001 | | messenger_(messenger), |
2002 | | proxy_cache_(proxy_cache), |
2003 | | exe_(exe), |
2004 | | root_dir_(root_dir), |
2005 | | data_dirs_(data_dirs), |
2006 | 997 | extra_flags_(extra_flags) {} |
2007 | | |
2008 | 718 | ExternalDaemon::~ExternalDaemon() { |
2009 | 718 | } |
2010 | | |
2011 | 149k | bool ExternalDaemon::ServerInfoPathsExist() { |
2012 | 149k | return Env::Default()->FileExists(GetServerInfoPath()); |
2013 | 149k | } |
2014 | | |
2015 | 1.07k | Status ExternalDaemon::BuildServerStateFromInfoPath() { |
2016 | 1.07k | return BuildServerStateFromInfoPath(GetServerInfoPath(), &status_); |
2017 | 1.07k | } |
2018 | | |
2019 | | Status ExternalDaemon::BuildServerStateFromInfoPath( |
2020 | 1.80k | const string& info_path, std::unique_ptr<ServerStatusPB>* server_status) { |
2021 | 1.80k | server_status->reset(new ServerStatusPB()); |
2022 | 1.80k | RETURN_NOT_OK_PREPEND(pb_util::ReadPBFromPath(Env::Default(), info_path, (*server_status).get()), |
2023 | 1.80k | "Failed to read info file from " + info_path); |
2024 | 1.80k | return Status::OK(); |
2025 | 1.80k | } |
2026 | | |
2027 | 165k | string ExternalDaemon::GetServerInfoPath() { |
2028 | 165k | return JoinPathSegments(root_dir_, "info.pb"); |
2029 | 165k | } |
2030 | | |
2031 | 1.07k | Status ExternalDaemon::DeleteServerInfoPaths() { |
2032 | 1.07k | return Env::Default()->DeleteFile(GetServerInfoPath()); |
2033 | 1.07k | } |
2034 | | |
2035 | 1.07k | Status ExternalDaemon::StartProcess(const vector<string>& user_flags) { |
2036 | 1.07k | CHECK(!process_); |
2037 | | |
2038 | 1.07k | vector<string> argv; |
2039 | | // First the exe for argv[0] |
2040 | 1.07k | argv.push_back(BaseName(exe_)); |
2041 | | |
2042 | | // Then all the flags coming from the minicluster framework. |
2043 | 1.07k | argv.insert(argv.end(), user_flags.begin(), user_flags.end()); |
2044 | | |
2045 | | // Disable callhome. |
2046 | 1.07k | argv.push_back("--callhome_enabled=false"); |
2047 | | |
2048 | | // Disabled due to #4507. |
2049 | | // TODO: Enable metrics logging after #4507 is fixed. |
2050 | | // |
2051 | | // Even though we set -logtostderr down below, metrics logs end up being written |
2052 | | // based on -log_dir. So, we have to set that too. |
2053 | 1.07k | argv.push_back("--metrics_log_interval_ms=0"); |
2054 | | |
2055 | | // Force set log_dir to empty value, process will chose default destination inside fs_data_dir |
2056 | | // In other case log_dir value will be extracted from TEST_TMPDIR env variable but it is |
2057 | | // inherited from test script |
2058 | 1.07k | argv.push_back("--log_dir="); |
2059 | | |
2060 | | // Tell the server to dump its port information so we can pick it up. |
2061 | 1.07k | const string info_path = GetServerInfoPath(); |
2062 | 1.07k | argv.push_back("--server_dump_info_path=" + info_path); |
2063 | 1.07k | argv.push_back("--server_dump_info_format=pb"); |
2064 | | |
2065 | | // We use ephemeral ports in many tests. They don't work for production, but are OK |
2066 | | // in unit tests. |
2067 | 1.07k | argv.push_back("--rpc_server_allow_ephemeral_ports"); |
2068 | | |
2069 | | // A previous instance of the daemon may have run in the same directory. So, remove |
2070 | | // the previous info file if it's there. |
2071 | 1.07k | Status s = DeleteServerInfoPaths(); |
2072 | 1.07k | if (!s.ok() && !s.IsNotFound()) { |
2073 | 0 | LOG (WARNING) << "Failed to delete info paths: " << s.ToString(); |
2074 | 0 | } |
2075 | | |
2076 | | // Ensure that logging goes to the test output doesn't get buffered. |
2077 | 1.07k | argv.push_back("--logbuflevel=-1"); |
2078 | | |
2079 | | // Use the same verbose logging level in the child process as in the test driver. |
2080 | 1.07k | if (FLAGS_v != 0) { // Skip this option if it has its default value (0). |
2081 | 0 | argv.push_back(Substitute("-v=$0", FLAGS_v)); |
2082 | 0 | } |
2083 | 1.07k | if (!FLAGS_vmodule.empty()) { |
2084 | 0 | argv.push_back(Substitute("--vmodule=$0", FLAGS_vmodule)); |
2085 | 0 | } |
2086 | 1.07k | if (FLAGS_mem_tracker_logging) { |
2087 | 0 | argv.push_back("--mem_tracker_logging"); |
2088 | 0 | } |
2089 | 1.07k | if (FLAGS_mem_tracker_log_stack_trace) { |
2090 | 0 | argv.push_back("--mem_tracker_log_stack_trace"); |
2091 | 0 | } |
2092 | 1.07k | if (FLAGS_use_libbacktrace) { |
2093 | 0 | argv.push_back("--use_libbacktrace"); |
2094 | 0 | } |
2095 | | |
2096 | 1.07k | const char* test_invocation_id = getenv("YB_TEST_INVOCATION_ID"); |
2097 | 1.07k | if (test_invocation_id) { |
2098 | | // We use --metric_node_name=... to include a unique "test invocation id" into the command |
2099 | | // line so we can kill any stray processes later. --metric_node_name is normally how we pass |
2100 | | // the Universe ID to the cluster. We could use any other flag that is present in yb-master |
2101 | | // and yb-tserver for this. |
2102 | 1.07k | argv.push_back(Format("--metric_node_name=$0", test_invocation_id)); |
2103 | 1.07k | } |
2104 | | |
2105 | 1.07k | string fatal_details_path_prefix = GetFatalDetailsPathPrefix(); |
2106 | 1.07k | argv.push_back(Format( |
2107 | 1.07k | "--fatal_details_path_prefix=$0.$1", GetFatalDetailsPathPrefix(), daemon_id_)); |
2108 | | |
2109 | 1.07k | argv.push_back(Format("--minicluster_daemon_id=$0", daemon_id_)); |
2110 | | |
2111 | | // Finally, extra flags to override. |
2112 | | // - extra_flags_ is taken from ExternalMiniCluster.opts_, which is often set by test subclasses' |
2113 | | // UpdateMiniClusterOptions. |
2114 | | // - extra daemon flags is supplied by the user, either through environment variable or |
2115 | | // yb_build.sh --extra_daemon_flags (or --extra_daemon_args), so it should take highest |
2116 | | // precedence. |
2117 | 1.07k | argv.insert(argv.end(), extra_flags_.begin(), extra_flags_.end()); |
2118 | 1.07k | AddExtraFlagsFromEnvVar("YB_EXTRA_DAEMON_FLAGS", &argv); |
2119 | | |
2120 | 1.07k | std::unique_ptr<Subprocess> p(new Subprocess(exe_, argv)); |
2121 | 1.07k | p->PipeParentStdout(); |
2122 | 1.07k | p->PipeParentStderr(); |
2123 | 1.07k | auto default_output_prefix = Substitute("[$0]", daemon_id_); |
2124 | 1.07k | LOG(INFO) << "Running " << default_output_prefix << ": " << exe_ << "\n" |
2125 | 1.07k | << JoinStrings(argv, "\n"); |
2126 | 1.07k | if (!FLAGS_external_daemon_heap_profile_prefix.empty()) { |
2127 | 0 | p->SetEnv("HEAPPROFILE", |
2128 | 0 | FLAGS_external_daemon_heap_profile_prefix + "_" + daemon_id_); |
2129 | 0 | p->SetEnv("HEAPPROFILESIGNAL", std::to_string(kHeapProfileSignal)); |
2130 | 0 | } |
2131 | | |
2132 | 1.07k | const char* llvm_profile_env_var_value = getenv("LLVM_PROFILE_FILE"); |
2133 | 1.07k | if (llvm_profile_env_var_value) { |
2134 | 1.07k | p->SetEnv("LLVM_PROFILE_FILE", Format("$0_$1", llvm_profile_env_var_value, daemon_id_)); |
2135 | 1.07k | } |
2136 | | |
2137 | 1.07k | RETURN_NOT_OK_PREPEND(p->Start(), |
2138 | 1.07k | Substitute("Failed to start subprocess $0", exe_)); |
2139 | | |
2140 | 1.07k | stdout_tailer_thread_ = std::make_unique<LogTailerThread>( |
2141 | 1.07k | Substitute("[$0 stdout]", daemon_id_), p->ReleaseChildStdoutFd(), &std::cout); |
2142 | | |
2143 | | // We will mostly see stderr output from the child process (because of --logtostderr), so we'll |
2144 | | // assume that by default in the output prefix. |
2145 | 1.07k | stderr_tailer_thread_ = std::make_unique<LogTailerThread>( |
2146 | 1.07k | default_output_prefix, p->ReleaseChildStderrFd(), &std::cerr); |
2147 | | |
2148 | | // The process is now starting -- wait for the bound port info to show up. |
2149 | 1.07k | Stopwatch sw; |
2150 | 1.07k | sw.start(); |
2151 | 1.07k | bool success = false; |
2152 | 149k | while (sw.elapsed().wall_seconds() < kProcessStartTimeoutSeconds) { |
2153 | 149k | if (ServerInfoPathsExist()) { |
2154 | 1.07k | success = true; |
2155 | 1.07k | break; |
2156 | 1.07k | } |
2157 | 148k | SleepFor(MonoDelta::FromMilliseconds(10)); |
2158 | 148k | int rc; |
2159 | 148k | Status s = p->WaitNoBlock(&rc); |
2160 | 148k | if (s.IsTimedOut()) { |
2161 | | // The process is still running. |
2162 | 148k | continue; |
2163 | 148k | } |
2164 | 1 | RETURN_NOT_OK_PREPEND(s, Substitute("Failed waiting on $0", exe_)); |
2165 | 1 | return STATUS(RuntimeError, |
2166 | 1 | Substitute("Process exited with rc=$0", rc), |
2167 | 1 | exe_); |
2168 | 1 | } |
2169 | | |
2170 | 1.07k | if (!success) { |
2171 | 0 | WARN_NOT_OK(p->Kill(SIGKILL), "Killing process failed"); |
2172 | 0 | return STATUS(TimedOut, |
2173 | 0 | Substitute("Timed out after $0s waiting for process ($1) to write info file ($2)", |
2174 | 0 | kProcessStartTimeoutSeconds, exe_, info_path)); |
2175 | 0 | } |
2176 | | |
2177 | 1.07k | RETURN_NOT_OK(BuildServerStateFromInfoPath()); |
2178 | 1.07k | LOG(INFO) << "Started " << default_output_prefix << " " << exe_ << " as pid " << p->pid(); |
2179 | 0 | VLOG(1) << exe_ << " instance information:\n" << status_->DebugString(); |
2180 | | |
2181 | 1.07k | process_.swap(p); |
2182 | 1.07k | return Status::OK(); |
2183 | 1.07k | } |
2184 | | |
2185 | 65 | Status ExternalDaemon::Pause() { |
2186 | 65 | if (!process_) return Status::OK(); |
2187 | 0 | VLOG(1) << "Pausing " << ProcessNameAndPidStr(); |
2188 | 65 | return process_->Kill(SIGSTOP); |
2189 | 65 | } |
2190 | | |
2191 | 68 | Status ExternalDaemon::Resume() { |
2192 | 68 | if (!process_) return Status::OK(); |
2193 | 0 | VLOG(1) << "Resuming " << ProcessNameAndPidStr(); |
2194 | 68 | return process_->Kill(SIGCONT); |
2195 | 68 | } |
2196 | | |
2197 | 0 | Status ExternalDaemon::Kill(int signal) { |
2198 | 0 | if (!process_) return Status::OK(); |
2199 | 0 | VLOG(1) << "Kill " << ProcessNameAndPidStr() << " with " << signal; |
2200 | 0 | return process_->Kill(signal); |
2201 | 0 | } |
2202 | | |
2203 | 6.64k | bool ExternalDaemon::IsShutdown() const { |
2204 | 6.64k | return process_.get() == nullptr; |
2205 | 6.64k | } |
2206 | | |
2207 | 5.23k | bool ExternalDaemon::IsProcessAlive() const { |
2208 | 5.23k | if (IsShutdown()) { |
2209 | 86 | return false; |
2210 | 86 | } |
2211 | | |
2212 | 5.14k | int rc = 0; |
2213 | 5.14k | Status s = process_->WaitNoBlock(&rc); |
2214 | | // If the non-blocking Wait "times out", that means the process |
2215 | | // is running. |
2216 | 5.14k | return s.IsTimedOut(); |
2217 | 5.14k | } |
2218 | | |
2219 | 44 | pid_t ExternalDaemon::pid() const { |
2220 | 44 | return process_->pid(); |
2221 | 44 | } |
2222 | | |
2223 | 1.61k | void ExternalDaemon::Shutdown() { |
2224 | 1.61k | if (!process_) return; |
2225 | | |
2226 | 802 | LOG_WITH_PREFIX(INFO) << "Starting Shutdown()"; |
2227 | | |
2228 | | // Before we kill the process, store the addresses. If we're told to start again we'll reuse |
2229 | | // these. |
2230 | 802 | bound_rpc_ = bound_rpc_hostport(); |
2231 | 802 | bound_http_ = bound_http_hostport(); |
2232 | | |
2233 | 802 | if (IsProcessAlive()) { |
2234 | | // In coverage builds, ask the process nicely to flush coverage info |
2235 | | // before we kill -9 it. Otherwise, we never get any coverage from |
2236 | | // external clusters. |
2237 | 789 | FlushCoverage(); |
2238 | | |
2239 | 789 | if (!FLAGS_external_daemon_heap_profile_prefix.empty()) { |
2240 | | // The child process has been configured using the HEAPPROFILESIGNAL environment variable to |
2241 | | // create a heap profile on receiving kHeapProfileSignal. |
2242 | 0 | static const int kWaitMs = 100; |
2243 | 0 | LOG(INFO) << "Sending signal " << kHeapProfileSignal << " to " << ProcessNameAndPidStr() |
2244 | 0 | << " to capture a heap profile. Waiting for " << kWaitMs << " ms afterwards."; |
2245 | 0 | WARN_NOT_OK(process_->Kill(kHeapProfileSignal), "Killing process failed"); |
2246 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(kWaitMs)); |
2247 | 0 | } |
2248 | | |
2249 | 789 | if (FLAGS_external_daemon_safe_shutdown) { |
2250 | | // We put 'SIGTERM' in quotes because an unquoted one would be treated as a test failure |
2251 | | // by our regular expressions in common-test-env.sh. |
2252 | 0 | LOG(INFO) << "Terminating " << ProcessNameAndPidStr() << " using 'SIGTERM' signal"; |
2253 | 0 | WARN_NOT_OK(process_->Kill(SIGTERM), "Killing process failed"); |
2254 | 0 | int total_delay_ms = 0; |
2255 | 0 | int current_delay_ms = 10; |
2256 | 0 | for (int i = 0; i < 10 && IsProcessAlive(); ++i) { |
2257 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(current_delay_ms)); |
2258 | 0 | total_delay_ms += current_delay_ms; |
2259 | 0 | current_delay_ms += 10; // will sleep for 10ms, then 20ms, etc. |
2260 | 0 | } |
2261 | |
|
2262 | 0 | if (IsProcessAlive()) { |
2263 | 0 | LOG(INFO) << "The process " << ProcessNameAndPidStr() << " is still running after " |
2264 | 0 | << total_delay_ms << " ms, will send SIGKILL"; |
2265 | 0 | } |
2266 | 0 | } |
2267 | | |
2268 | 789 | if (IsProcessAlive()) { |
2269 | 789 | LOG(INFO) << "Killing " << ProcessNameAndPidStr() << " with SIGKILL"; |
2270 | 789 | WARN_NOT_OK(process_->Kill(SIGKILL), "Killing process failed"); |
2271 | 789 | } |
2272 | 789 | } |
2273 | 802 | int ret = 0; |
2274 | 802 | WARN_NOT_OK(process_->Wait(&ret), "Waiting on " + exe_); |
2275 | 802 | process_.reset(); |
2276 | 802 | } |
2277 | | |
2278 | 789 | void ExternalDaemon::FlushCoverage() { |
2279 | 789 | #ifndef COVERAGE_BUILD_ |
2280 | 789 | return; |
2281 | | #else |
2282 | | LOG(FATAL) << "Attempting to flush coverage for " << exe_ << " pid " << process_->pid(); |
2283 | | server::GenericServiceProxy proxy(proxy_cache_, bound_rpc_addr()); |
2284 | | |
2285 | | server::FlushCoverageRequestPB req; |
2286 | | server::FlushCoverageResponsePB resp; |
2287 | | rpc::RpcController rpc; |
2288 | | |
2289 | | // Set a reasonably short timeout, since some of our tests kill servers which |
2290 | | // are kill -STOPed. |
2291 | | rpc.set_timeout(MonoDelta::FromMilliseconds(100)); |
2292 | | Status s = proxy.FlushCoverage(req, &resp, &rpc); |
2293 | | if (s.ok() && !resp.success()) { |
2294 | | s = STATUS(RemoteError, "Server does not appear to be running a coverage build"); |
2295 | | } |
2296 | | WARN_NOT_OK(s, Substitute("Unable to flush coverage on $0 pid $1", exe_, process_->pid())); |
2297 | | #endif |
2298 | 789 | } |
2299 | | |
2300 | 789 | std::string ExternalDaemon::ProcessNameAndPidStr() { |
2301 | 789 | return Substitute("$0 with pid $1", exe_, process_->pid()); |
2302 | 789 | } |
2303 | | |
2304 | 51.4k | HostPort ExternalDaemon::bound_rpc_hostport() const { |
2305 | 51.4k | CHECK(status_); |
2306 | 51.4k | CHECK_GE(status_->bound_rpc_addresses_size(), 1); |
2307 | 51.4k | return HostPortFromPB(status_->bound_rpc_addresses(0)); |
2308 | 51.4k | } |
2309 | | |
2310 | 46.8k | HostPort ExternalDaemon::bound_rpc_addr() const { |
2311 | 46.8k | return bound_rpc_hostport(); |
2312 | 46.8k | } |
2313 | | |
2314 | 971 | HostPort ExternalDaemon::bound_http_hostport() const { |
2315 | 971 | CHECK(status_); |
2316 | 971 | CHECK_GE(status_->bound_http_addresses_size(), 1); |
2317 | 971 | return HostPortFromPB(status_->bound_http_addresses(0)); |
2318 | 971 | } |
2319 | | |
2320 | 16.5k | const NodeInstancePB& ExternalDaemon::instance_id() const { |
2321 | 16.5k | CHECK(status_); |
2322 | 16.5k | return status_->node_instance(); |
2323 | 16.5k | } |
2324 | | |
2325 | 671 | const string& ExternalDaemon::uuid() const { |
2326 | 671 | CHECK(status_); |
2327 | 671 | return status_->node_instance().permanent_uuid(); |
2328 | 671 | } |
2329 | | |
2330 | | Result<int64_t> ExternalDaemon::GetInt64MetricFromHost(const HostPort& hostport, |
2331 | | const MetricEntityPrototype* entity_proto, |
2332 | | const char* entity_id, |
2333 | | const MetricPrototype* metric_proto, |
2334 | 170 | const char* value_field) { |
2335 | 170 | return GetInt64MetricFromHost(hostport, entity_proto->name(), entity_id, metric_proto->name(), |
2336 | 170 | value_field); |
2337 | 170 | } |
2338 | | |
2339 | | Result<int64_t> ExternalDaemon::GetInt64MetricFromHost(const HostPort& hostport, |
2340 | | const char* entity_proto_name, |
2341 | | const char* entity_id, |
2342 | | const char* metric_proto_name, |
2343 | 170 | const char* value_field) { |
2344 | | // Fetch metrics whose name matches the given prototype. |
2345 | 170 | string url = Substitute( |
2346 | 170 | "http://$0/jsonmetricz?metrics=$1", |
2347 | 170 | hostport.ToString(), |
2348 | 170 | metric_proto_name); |
2349 | 170 | EasyCurl curl; |
2350 | 170 | faststring dst; |
2351 | 170 | RETURN_NOT_OK(curl.FetchURL(url, &dst)); |
2352 | | |
2353 | | // Parse the results, beginning with the top-level entity array. |
2354 | 170 | JsonReader r(dst.ToString()); |
2355 | 170 | RETURN_NOT_OK(r.Init()); |
2356 | 170 | vector<const Value*> entities; |
2357 | 170 | RETURN_NOT_OK(r.ExtractObjectArray(r.root(), NULL, &entities)); |
2358 | 170 | for (const Value* entity : entities) { |
2359 | | // Find the desired entity. |
2360 | 170 | string type; |
2361 | 170 | RETURN_NOT_OK(r.ExtractString(entity, "type", &type)); |
2362 | 170 | if (type != entity_proto_name) { |
2363 | 0 | continue; |
2364 | 0 | } |
2365 | 170 | if (entity_id) { |
2366 | 116 | string id; |
2367 | 116 | RETURN_NOT_OK(r.ExtractString(entity, "id", &id)); |
2368 | 116 | if (id != entity_id) { |
2369 | 0 | continue; |
2370 | 0 | } |
2371 | 170 | } |
2372 | | |
2373 | | // Find the desired metric within the entity. |
2374 | 170 | vector<const Value*> metrics; |
2375 | 170 | RETURN_NOT_OK(r.ExtractObjectArray(entity, "metrics", &metrics)); |
2376 | 170 | for (const Value* metric : metrics) { |
2377 | 170 | string name; |
2378 | 170 | RETURN_NOT_OK(r.ExtractString(metric, "name", &name)); |
2379 | 170 | if (name != metric_proto_name) { |
2380 | 0 | continue; |
2381 | 0 | } |
2382 | 170 | int64_t value; |
2383 | 170 | RETURN_NOT_OK(r.ExtractInt64(metric, value_field, &value)); |
2384 | 170 | return value; |
2385 | 170 | } |
2386 | 170 | } |
2387 | 0 | string msg; |
2388 | 0 | if (entity_id) { |
2389 | 0 | msg = Substitute("Could not find metric $0.$1 for entity $2", |
2390 | 0 | entity_proto_name, metric_proto_name, |
2391 | 0 | entity_id); |
2392 | 0 | } else { |
2393 | 0 | msg = Substitute("Could not find metric $0.$1", |
2394 | 0 | entity_proto_name, metric_proto_name); |
2395 | 0 | } |
2396 | 0 | return STATUS(NotFound, msg); |
2397 | 170 | } |
2398 | | |
2399 | 878 | string ExternalDaemon::LogPrefix() { |
2400 | 878 | return Format("{ daemon_id: $0 bound_rpc: $1 } ", daemon_id_, bound_rpc_); |
2401 | 878 | } |
2402 | | |
2403 | 4 | void ExternalDaemon::SetLogListener(StringListener* listener) { |
2404 | 4 | stdout_tailer_thread_->SetListener(listener); |
2405 | 4 | stderr_tailer_thread_->SetListener(listener); |
2406 | 4 | } |
2407 | | |
2408 | 4 | void ExternalDaemon::RemoveLogListener(StringListener* listener) { |
2409 | 4 | stdout_tailer_thread_->RemoveListener(listener); |
2410 | 4 | stderr_tailer_thread_->RemoveListener(listener); |
2411 | 4 | } |
2412 | | |
2413 | 25 | Result<string> ExternalDaemon::GetFlag(const std::string& flag) { |
2414 | 25 | server::GenericServiceProxy proxy(proxy_cache_, bound_rpc_addr()); |
2415 | | |
2416 | 25 | rpc::RpcController controller; |
2417 | 25 | controller.set_timeout(MonoDelta::FromSeconds(30)); |
2418 | 25 | server::GetFlagRequestPB req; |
2419 | 25 | server::GetFlagResponsePB resp; |
2420 | 25 | req.set_flag(flag); |
2421 | 25 | RETURN_NOT_OK(proxy.GetFlag(req, &resp, &controller)); |
2422 | 25 | if (!resp.valid()) { |
2423 | 0 | return STATUS_FORMAT(RemoteError, "Failed to get gflag $0 value.", flag); |
2424 | 0 | } |
2425 | 25 | return resp.value(); |
2426 | 25 | } |
2427 | | |
2428 | | Result<int64_t> ExternalDaemon::GetInt64Metric(const MetricEntityPrototype* entity_proto, |
2429 | | const char* entity_id, |
2430 | | const MetricPrototype* metric_proto, |
2431 | 98 | const char* value_field) const { |
2432 | 98 | return GetInt64MetricFromHost( |
2433 | 98 | bound_http_hostport(), entity_proto, entity_id, metric_proto, value_field); |
2434 | 98 | } |
2435 | | |
2436 | | Result<int64_t> ExternalDaemon::GetInt64Metric(const char* entity_proto_name, |
2437 | | const char* entity_id, |
2438 | | const char* metric_proto_name, |
2439 | 0 | const char* value_field) const { |
2440 | 0 | return GetInt64MetricFromHost( |
2441 | 0 | bound_http_hostport(), entity_proto_name, entity_id, metric_proto_name, value_field); |
2442 | 0 | } |
2443 | | |
2444 | | LogWaiter::LogWaiter(ExternalDaemon* daemon, const std::string& string_to_wait) : |
2445 | 3 | daemon_(daemon), string_to_wait_(string_to_wait) { |
2446 | 3 | daemon_->SetLogListener(this); |
2447 | 3 | } |
2448 | | |
2449 | 73 | void LogWaiter::Handle(const GStringPiece& s) { |
2450 | 73 | if (s.contains(string_to_wait_)) { |
2451 | 3 | event_occurred_ = true; |
2452 | 3 | } |
2453 | 73 | } |
2454 | | |
2455 | 3 | Status LogWaiter::WaitFor(const MonoDelta timeout) { |
2456 | 3 | constexpr auto kInitialWaitPeriod = 100ms; |
2457 | 3 | return ::yb::WaitFor( |
2458 | 17 | [this]{ return event_occurred_.load(); }, timeout, |
2459 | 3 | Format("Waiting for log record '$0' on $1...", string_to_wait_, daemon_->id()), |
2460 | 3 | kInitialWaitPeriod); |
2461 | 3 | } |
2462 | | |
2463 | 3 | LogWaiter::~LogWaiter() { |
2464 | 3 | daemon_->RemoveLogListener(this); |
2465 | 3 | } |
2466 | | |
2467 | | //------------------------------------------------------------ |
2468 | | // ScopedResumeExternalDaemon |
2469 | | //------------------------------------------------------------ |
2470 | | |
2471 | | ScopedResumeExternalDaemon::ScopedResumeExternalDaemon(ExternalDaemon* daemon) |
2472 | 18 | : daemon_(CHECK_NOTNULL(daemon)) { |
2473 | 18 | } |
2474 | | |
2475 | 18 | ScopedResumeExternalDaemon::~ScopedResumeExternalDaemon() { |
2476 | 18 | CHECK_OK(daemon_->Resume()); |
2477 | 18 | } |
2478 | | |
2479 | | //------------------------------------------------------------ |
2480 | | // ExternalMaster |
2481 | | //------------------------------------------------------------ |
2482 | | ExternalMaster::ExternalMaster( |
2483 | | size_t master_index, |
2484 | | rpc::Messenger* messenger, |
2485 | | rpc::ProxyCache* proxy_cache, |
2486 | | const string& exe, |
2487 | | const string& data_dir, |
2488 | | const std::vector<string>& extra_flags, |
2489 | | const string& rpc_bind_address, |
2490 | | uint16_t http_port, |
2491 | | const string& master_addrs) |
2492 | | : ExternalDaemon(Substitute("m-$0", master_index + 1), messenger, |
2493 | | proxy_cache, exe, data_dir, |
2494 | | {GetServerTypeDataPath(data_dir, "master")}, extra_flags), |
2495 | | rpc_bind_address_(rpc_bind_address), |
2496 | | master_addrs_(master_addrs), |
2497 | 322 | http_port_(http_port) { |
2498 | 322 | } |
2499 | | |
2500 | 252 | ExternalMaster::~ExternalMaster() { |
2501 | 252 | } |
2502 | | |
2503 | | namespace { |
2504 | | |
2505 | | class Flags { |
2506 | | public: |
2507 | | template <class Value> |
2508 | 12.3k | void Add(const std::string& name, const Value& value) { |
2509 | 12.3k | value_.push_back(Format("--$0=$1", name, value)); |
2510 | 12.3k | } external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddINS_8HostPortEEEvRKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEERKT_ Line | Count | Source | 2508 | 2.95k | void Add(const std::string& name, const Value& value) { | 2509 | 2.95k | value_.push_back(Format("--$0=$1", name, value)); | 2510 | 2.95k | } |
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddINSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEEEEvRKS9_RKT_ Line | Count | Source | 2508 | 3.20k | void Add(const std::string& name, const Value& value) { | 2509 | 3.20k | value_.push_back(Format("--$0=$1", name, value)); | 2510 | 3.20k | } |
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddIA10_cEEvRKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEERKT_ Line | Count | Source | 2508 | 337 | void Add(const std::string& name, const Value& value) { | 2509 | 337 | value_.push_back(Format("--$0=$1", name, value)); | 2510 | 337 | } |
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddItEEvRKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEERKT_ Line | Count | Source | 2508 | 3.28k | void Add(const std::string& name, const Value& value) { | 2509 | 3.28k | value_.push_back(Format("--$0=$1", name, value)); | 2510 | 3.28k | } |
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddIiEEvRKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEERKT_ Line | Count | Source | 2508 | 337 | void Add(const std::string& name, const Value& value) { | 2509 | 337 | value_.push_back(Format("--$0=$1", name, value)); | 2510 | 337 | } |
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddIbEEvRKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEERKT_ Line | Count | Source | 2508 | 738 | void Add(const std::string& name, const Value& value) { | 2509 | 738 | value_.push_back(Format("--$0=$1", name, value)); | 2510 | 738 | } |
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddIA3_cEEvRKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEERKT_ Line | Count | Source | 2508 | 1.47k | void Add(const std::string& name, const Value& value) { | 2509 | 1.47k | value_.push_back(Format("--$0=$1", name, value)); | 2510 | 1.47k | } |
|
2511 | | |
2512 | 2.95k | void AddHostPort(const std::string& name, const std::string& host, uint16_t port) { |
2513 | 2.95k | Add(name, HostPort(host, port)); |
2514 | 2.95k | } |
2515 | | |
2516 | 1.07k | const std::vector<std::string>& value() const { |
2517 | 1.07k | return value_; |
2518 | 1.07k | } |
2519 | | |
2520 | | private: |
2521 | | std::vector<std::string> value_; |
2522 | | }; |
2523 | | |
2524 | | } // namespace |
2525 | | |
2526 | 337 | Status ExternalMaster::Start(bool shell_mode) { |
2527 | 337 | Flags flags; |
2528 | 337 | flags.Add("fs_data_dirs", root_dir_); |
2529 | 337 | flags.Add("rpc_bind_addresses", rpc_bind_address_); |
2530 | 337 | flags.Add("webserver_interface", "localhost"); |
2531 | 337 | flags.Add("webserver_port", http_port_); |
2532 | | // Default master args to make sure we don't wait to trigger new LB tasks upon master leader |
2533 | | // failover. |
2534 | 337 | flags.Add("load_balancer_initial_delay_secs", 0); |
2535 | | // On first start, we need to tell the masters their list of expected peers. |
2536 | | // For 'shell' master, there is no master addresses. |
2537 | 337 | if (!shell_mode) { |
2538 | 313 | flags.Add("master_addresses", master_addrs_); |
2539 | 313 | } |
2540 | 337 | RETURN_NOT_OK(StartProcess(flags.value())); |
2541 | 337 | return Status::OK(); |
2542 | 337 | } |
2543 | | |
2544 | 15 | Status ExternalMaster::Restart() { |
2545 | 15 | LOG_WITH_PREFIX(INFO) << "Restart()"; |
2546 | 15 | if (!IsProcessAlive()) { |
2547 | | // Make sure this function could be safely called if the process has already crashed. |
2548 | 15 | Shutdown(); |
2549 | 15 | } |
2550 | | // We store the addresses on shutdown so make sure we did that first. |
2551 | 15 | if (bound_rpc_.port() == 0) { |
2552 | 0 | return STATUS(IllegalState, "Master cannot be restarted. Must call Shutdown() first."); |
2553 | 0 | } |
2554 | 15 | return Start(true); |
2555 | 15 | } |
2556 | | |
2557 | | //------------------------------------------------------------ |
2558 | | // ExternalTabletServer |
2559 | | //------------------------------------------------------------ |
2560 | | |
2561 | | ExternalTabletServer::ExternalTabletServer( |
2562 | | size_t tablet_server_index, rpc::Messenger* messenger, rpc::ProxyCache* proxy_cache, |
2563 | | const std::string& exe, const std::string& data_dir, uint16_t num_drives, |
2564 | | std::string bind_host, uint16_t rpc_port, uint16_t http_port, uint16_t redis_rpc_port, |
2565 | | uint16_t redis_http_port, uint16_t cql_rpc_port, uint16_t cql_http_port, |
2566 | | uint16_t pgsql_rpc_port, uint16_t pgsql_http_port, |
2567 | | const std::vector<HostPort>& master_addrs, const std::vector<std::string>& extra_flags) |
2568 | | : ExternalDaemon(Substitute("ts-$0", tablet_server_index + 1), |
2569 | | messenger, proxy_cache, exe, data_dir, |
2570 | | FsDataDirs(data_dir, "tserver", num_drives), extra_flags), |
2571 | | master_addrs_(HostPort::ToCommaSeparatedString(master_addrs)), |
2572 | | bind_host_(std::move(bind_host)), |
2573 | | rpc_port_(rpc_port), |
2574 | | http_port_(http_port), |
2575 | | redis_rpc_port_(redis_rpc_port), |
2576 | | redis_http_port_(redis_http_port), |
2577 | | pgsql_rpc_port_(pgsql_rpc_port), |
2578 | | pgsql_http_port_(pgsql_http_port), |
2579 | | cql_rpc_port_(cql_rpc_port), |
2580 | | cql_http_port_(cql_http_port), |
2581 | 675 | num_drives_(num_drives) {} |
2582 | | |
2583 | 466 | ExternalTabletServer::~ExternalTabletServer() { |
2584 | 466 | } |
2585 | | |
2586 | | Status ExternalTabletServer::Start( |
2587 | | bool start_cql_proxy, bool set_proxy_addrs, |
2588 | 738 | std::vector<std::pair<string, string>> extra_flags) { |
2589 | 738 | auto dirs = FsRootDirs(root_dir_, num_drives_); |
2590 | 772 | for (const auto& dir : dirs) { |
2591 | 772 | RETURN_NOT_OK(Env::Default()->CreateDirs(dir)); |
2592 | 772 | } |
2593 | 738 | start_cql_proxy_ = start_cql_proxy; |
2594 | 738 | Flags flags; |
2595 | 738 | flags.Add("fs_data_dirs", JoinStrings(dirs, ",")); |
2596 | 738 | flags.AddHostPort("rpc_bind_addresses", bind_host_, rpc_port_); |
2597 | 738 | flags.Add("webserver_interface", bind_host_); |
2598 | 738 | flags.Add("webserver_port", http_port_); |
2599 | 738 | flags.Add("redis_proxy_webserver_port", redis_http_port_); |
2600 | 738 | flags.Add("pgsql_proxy_webserver_port", pgsql_http_port_); |
2601 | 738 | flags.Add("cql_proxy_webserver_port", cql_http_port_); |
2602 | | |
2603 | 738 | if (set_proxy_addrs) { |
2604 | 738 | flags.AddHostPort("redis_proxy_bind_address", bind_host_, redis_rpc_port_); |
2605 | 738 | flags.AddHostPort("pgsql_proxy_bind_address", bind_host_, pgsql_rpc_port_); |
2606 | 738 | flags.AddHostPort("cql_proxy_bind_address", bind_host_, cql_rpc_port_); |
2607 | 738 | } |
2608 | | |
2609 | 738 | flags.Add("start_cql_proxy", start_cql_proxy_); |
2610 | 738 | flags.Add("tserver_master_addrs", master_addrs_); |
2611 | | |
2612 | | // Use conservative number of threads for the mini cluster for unit test env |
2613 | | // where several unit tests tend to run in parallel. |
2614 | 738 | flags.Add("tablet_server_svc_num_threads", "64"); |
2615 | 738 | flags.Add("ts_consensus_svc_num_threads", "20"); |
2616 | | |
2617 | 4 | for (const auto& flag_value : extra_flags) { |
2618 | 4 | flags.Add(flag_value.first, flag_value.second); |
2619 | 4 | } |
2620 | | |
2621 | 738 | RETURN_NOT_OK(StartProcess(flags.value())); |
2622 | | |
2623 | 736 | return Status::OK(); |
2624 | 738 | } |
2625 | | |
2626 | 736 | Status ExternalTabletServer::BuildServerStateFromInfoPath() { |
2627 | 736 | RETURN_NOT_OK(ExternalDaemon::BuildServerStateFromInfoPath()); |
2628 | 736 | if (start_cql_proxy_) { |
2629 | 735 | RETURN_NOT_OK(ExternalDaemon::BuildServerStateFromInfoPath(GetCQLServerInfoPath(), |
2630 | 735 | &cqlserver_status_)); |
2631 | 735 | } |
2632 | 736 | return Status::OK(); |
2633 | 736 | } |
2634 | | |
2635 | 12.5k | string ExternalTabletServer::GetCQLServerInfoPath() { |
2636 | 12.5k | return ExternalDaemon::GetServerInfoPath() + "-cql"; |
2637 | 12.5k | } |
2638 | | |
2639 | 111k | bool ExternalTabletServer::ServerInfoPathsExist() { |
2640 | 111k | if (start_cql_proxy_) { |
2641 | 110k | return ExternalDaemon::ServerInfoPathsExist() && |
2642 | 11.0k | Env::Default()->FileExists(GetCQLServerInfoPath()); |
2643 | 110k | } |
2644 | 158 | return ExternalDaemon::ServerInfoPathsExist(); |
2645 | 158 | } |
2646 | | |
2647 | 738 | Status ExternalTabletServer::DeleteServerInfoPaths() { |
2648 | | // We want to try a deletion for both files. |
2649 | 738 | Status s1 = ExternalDaemon::DeleteServerInfoPaths(); |
2650 | 738 | Status s2 = Env::Default()->DeleteFile(GetCQLServerInfoPath()); |
2651 | 738 | RETURN_NOT_OK(s1); |
2652 | 63 | RETURN_NOT_OK(s2); |
2653 | 63 | return Status::OK(); |
2654 | 63 | } |
2655 | | |
2656 | | Status ExternalTabletServer::Restart( |
2657 | 61 | bool start_cql_proxy, std::vector<std::pair<string, string>> flags) { |
2658 | 61 | LOG_WITH_PREFIX(INFO) << "Restart: start_cql_proxy=" << start_cql_proxy; |
2659 | 61 | if (!IsProcessAlive()) { |
2660 | | // Make sure this function could be safely called if the process has already crashed. |
2661 | 61 | Shutdown(); |
2662 | 61 | } |
2663 | | // We store the addresses on shutdown so make sure we did that first. |
2664 | 61 | if (bound_rpc_.port() == 0) { |
2665 | 0 | return STATUS(IllegalState, "Tablet server cannot be restarted. Must call Shutdown() first."); |
2666 | 0 | } |
2667 | 61 | return Start(start_cql_proxy, true /* set_proxy_addrs */, flags); |
2668 | 61 | } |
2669 | | |
2670 | | Result<int64_t> ExternalTabletServer::GetInt64CQLMetric(const MetricEntityPrototype* entity_proto, |
2671 | | const char* entity_id, |
2672 | | const MetricPrototype* metric_proto, |
2673 | 0 | const char* value_field) const { |
2674 | 0 | return GetInt64MetricFromHost( |
2675 | 0 | HostPort(bind_host(), cql_http_port()), |
2676 | 0 | entity_proto, entity_id, metric_proto, value_field); |
2677 | 0 | } |
2678 | | |
2679 | 1 | Status ExternalTabletServer::SetNumDrives(uint16_t num_drives) { |
2680 | 1 | if (IsProcessAlive()) { |
2681 | 0 | return STATUS(IllegalState, "Cann't set num drives on running Tablet server. " |
2682 | 0 | "Must call Shutdown() first."); |
2683 | 0 | } |
2684 | 1 | num_drives_ = num_drives; |
2685 | 1 | data_dirs_ = FsDataDirs(root_dir_, "tserver", num_drives_); |
2686 | 1 | return Status::OK(); |
2687 | 1 | } |
2688 | | |
2689 | 2 | Status RestartAllMasters(ExternalMiniCluster* cluster) { |
2690 | 4 | for (size_t i = 0; i != cluster->num_masters(); ++i) { |
2691 | 2 | cluster->master(i)->Shutdown(); |
2692 | 2 | } |
2693 | 4 | for (size_t i = 0; i != cluster->num_masters(); ++i) { |
2694 | 2 | RETURN_NOT_OK(cluster->master(i)->Restart()); |
2695 | 2 | } |
2696 | | |
2697 | 2 | return Status::OK(); |
2698 | 2 | } |
2699 | | |
2700 | | } // namespace yb |