/Users/deen/code/yugabyte-db/src/yb/integration-tests/mini_cluster.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/integration-tests/mini_cluster.h" |
34 | | |
35 | | #include <algorithm> |
36 | | |
37 | | #include "yb/client/client.h" |
38 | | #include "yb/client/yb_table_name.h" |
39 | | |
40 | | #include "yb/consensus/consensus.h" |
41 | | #include "yb/consensus/consensus.pb.h" |
42 | | |
43 | | #include "yb/gutil/casts.h" |
44 | | #include "yb/gutil/strings/join.h" |
45 | | #include "yb/gutil/strings/substitute.h" |
46 | | |
47 | | #include "yb/master/catalog_entity_info.h" |
48 | | #include "yb/master/catalog_manager_if.h" |
49 | | #include "yb/master/master.h" |
50 | | #include "yb/master/master_admin.pb.h" |
51 | | #include "yb/master/master_client.pb.h" |
52 | | #include "yb/master/master_cluster.pb.h" |
53 | | #include "yb/master/mini_master.h" |
54 | | #include "yb/master/scoped_leader_shared_lock.h" |
55 | | #include "yb/master/ts_descriptor.h" |
56 | | #include "yb/master/ts_manager.h" |
57 | | |
58 | | #include "yb/rocksdb/db/db_impl.h" |
59 | | #include "yb/rocksdb/rate_limiter.h" |
60 | | |
61 | | #include "yb/rpc/messenger.h" |
62 | | |
63 | | #include "yb/server/hybrid_clock.h" |
64 | | #include "yb/server/skewed_clock.h" |
65 | | |
66 | | #include "yb/tablet/tablet.h" |
67 | | #include "yb/tablet/tablet_metadata.h" |
68 | | #include "yb/tablet/tablet_peer.h" |
69 | | #include "yb/tablet/transaction_participant.h" |
70 | | |
71 | | #include "yb/tserver/mini_tablet_server.h" |
72 | | #include "yb/tserver/tablet_server.h" |
73 | | #include "yb/tserver/ts_tablet_manager.h" |
74 | | |
75 | | #include "yb/util/debug/long_operation_tracker.h" |
76 | | #include "yb/util/format.h" |
77 | | #include "yb/util/path_util.h" |
78 | | #include "yb/util/random_util.h" |
79 | | #include "yb/util/scope_exit.h" |
80 | | #include "yb/util/status.h" |
81 | | #include "yb/util/status_format.h" |
82 | | #include "yb/util/status_log.h" |
83 | | #include "yb/util/stopwatch.h" |
84 | | #include "yb/util/test_thread_holder.h" |
85 | | #include "yb/util/test_util.h" |
86 | | #include "yb/util/tsan_util.h" |
87 | | |
88 | | using namespace std::literals; |
89 | | using strings::Substitute; |
90 | | |
91 | | DEFINE_string(mini_cluster_base_dir, "", "Directory for master/ts data"); |
92 | | DEFINE_bool(mini_cluster_reuse_data, false, "Reuse data of mini cluster"); |
93 | | DECLARE_int32(master_svc_num_threads); |
94 | | DECLARE_int32(memstore_size_mb); |
95 | | DECLARE_int32(master_consensus_svc_num_threads); |
96 | | DECLARE_int32(master_remote_bootstrap_svc_num_threads); |
97 | | DECLARE_int32(generic_svc_num_threads); |
98 | | DECLARE_int32(tablet_server_svc_num_threads); |
99 | | DECLARE_int32(ts_admin_svc_num_threads); |
100 | | DECLARE_int32(ts_consensus_svc_num_threads); |
101 | | DECLARE_int32(ts_remote_bootstrap_svc_num_threads); |
102 | | DECLARE_int32(replication_factor); |
103 | | DECLARE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec); |
104 | | DECLARE_string(use_private_ip); |
105 | | DECLARE_int32(load_balancer_initial_delay_secs); |
106 | | DECLARE_int32(transaction_table_num_tablets); |
107 | | |
108 | | namespace yb { |
109 | | |
110 | | using client::YBClient; |
111 | | using client::YBClientBuilder; |
112 | | using master::CatalogManager; |
113 | | using master::MiniMaster; |
114 | | using master::TabletLocationsPB; |
115 | | using master::TSDescriptor; |
116 | | using std::shared_ptr; |
117 | | using std::string; |
118 | | using std::vector; |
119 | | using tserver::MiniTabletServer; |
120 | | using tserver::TabletServer; |
121 | | using master::GetMasterClusterConfigResponsePB; |
122 | | using master::ChangeMasterClusterConfigRequestPB; |
123 | | using master::ChangeMasterClusterConfigResponsePB; |
124 | | using master::SysClusterConfigEntryPB; |
125 | | |
126 | | namespace { |
127 | | |
128 | | const std::vector<uint16_t> EMPTY_MASTER_RPC_PORTS = {}; |
129 | | const int kMasterLeaderElectionWaitTimeSeconds = 20 * kTimeMultiplier; |
130 | | const int kRegistrationWaitTimeSeconds = 45 * kTimeMultiplier; |
131 | | const int kTabletReportWaitTimeSeconds = 5; |
132 | | |
133 | 398 | std::string GetClusterDataDirName(const MiniClusterOptions& options) { |
134 | 398 | std::string cluster_name = "minicluster-data"; |
135 | 398 | if (options.cluster_id == "") { |
136 | 398 | return cluster_name; |
137 | 398 | } |
138 | 0 | return Format("$0-$1", cluster_name, options.cluster_id); |
139 | 0 | } |
140 | | |
141 | 398 | std::string GetFsRoot(const MiniClusterOptions& options) { |
142 | 398 | if (!options.data_root.empty()) { |
143 | 0 | return options.data_root; |
144 | 0 | } |
145 | 398 | if (!FLAGS_mini_cluster_base_dir.empty()) { |
146 | 0 | return FLAGS_mini_cluster_base_dir; |
147 | 0 | } |
148 | 398 | return JoinPathSegments(GetTestDataDirectory(), GetClusterDataDirName(options)); |
149 | 398 | } |
150 | | |
151 | | } // namespace |
152 | | |
153 | | MiniCluster::MiniCluster(const MiniClusterOptions& options) |
154 | | : options_(options), |
155 | 398 | fs_root_(GetFsRoot(options)) { |
156 | 398 | mini_masters_.resize(options_.num_masters); |
157 | 398 | } |
158 | | |
159 | 53 | MiniCluster::~MiniCluster() { |
160 | 53 | Shutdown(); |
161 | 53 | } |
162 | | |
163 | 398 | Status MiniCluster::Start(const std::vector<tserver::TabletServerOptions>& extra_tserver_options) { |
164 | 0 | CHECK(!fs_root_.empty()) << "No Fs root was provided"; |
165 | 398 | CHECK(!running_); |
166 | | |
167 | 398 | EnsurePortsAllocated(); |
168 | | |
169 | 398 | if (!options_.master_env->FileExists(fs_root_)) { |
170 | 398 | RETURN_NOT_OK(options_.master_env->CreateDir(fs_root_)); |
171 | 398 | } |
172 | | |
173 | | // TODO: properly handle setting these variables in case of multiple MiniClusters in the same |
174 | | // process. |
175 | | |
176 | | // Use conservative number of threads for the mini cluster for unit test env |
177 | | // where several unit tests tend to run in parallel. |
178 | | // To get default number of threads - try to find SERVICE_POOL_OPTIONS macro usage. |
179 | 398 | FLAGS_master_svc_num_threads = 2; |
180 | 398 | FLAGS_master_consensus_svc_num_threads = 2; |
181 | 398 | FLAGS_master_remote_bootstrap_svc_num_threads = 2; |
182 | 398 | FLAGS_generic_svc_num_threads = 2; |
183 | | |
184 | 398 | FLAGS_tablet_server_svc_num_threads = 8; |
185 | 398 | FLAGS_ts_admin_svc_num_threads = 2; |
186 | 398 | FLAGS_ts_consensus_svc_num_threads = 8; |
187 | 398 | FLAGS_ts_remote_bootstrap_svc_num_threads = 2; |
188 | | |
189 | | // Limit number of transaction table tablets to help avoid timeouts. |
190 | 398 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_transaction_table_num_tablets) = |
191 | 398 | NumTabletsPerTransactionTable(options_); |
192 | | |
193 | | // We are testing public/private IPs using mini cluster. So set mode to 'cloud'. |
194 | 398 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_use_private_ip) = "cloud"; |
195 | | |
196 | | // This dictates the RF of newly created tables. |
197 | 325 | SetAtomicFlag(options_.num_tablet_servers >= 3 ? 3 : 1, &FLAGS_replication_factor); |
198 | 398 | FLAGS_memstore_size_mb = 16; |
199 | | // Default master args to make sure we don't wait to trigger new LB tasks upon master leader |
200 | | // failover. |
201 | 398 | FLAGS_load_balancer_initial_delay_secs = 0; |
202 | | |
203 | | // start the masters |
204 | 398 | RETURN_NOT_OK_PREPEND(StartMasters(), |
205 | 393 | "Couldn't start distributed masters"); |
206 | | |
207 | 393 | if (!extra_tserver_options.empty() && |
208 | 2 | extra_tserver_options.size() != options_.num_tablet_servers) { |
209 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "num tserver options: $0 doesn't match with num " |
210 | 0 | "tservers: $1", extra_tserver_options.size(), options_.num_tablet_servers); |
211 | 0 | } |
212 | | |
213 | 1.04k | for (size_t i = 0; i < options_.num_tablet_servers; i++) { |
214 | 656 | if (!extra_tserver_options.empty()) { |
215 | 2 | RETURN_NOT_OK_PREPEND(AddTabletServer(extra_tserver_options[i]), |
216 | 2 | Substitute("Error adding TS $0", i)); |
217 | 654 | } else { |
218 | 654 | RETURN_NOT_OK_PREPEND(AddTabletServer(), |
219 | 653 | Substitute("Error adding TS $0", i)); |
220 | 653 | } |
221 | | |
222 | 656 | } |
223 | | |
224 | 392 | RETURN_NOT_OK_PREPEND(WaitForTabletServerCount(options_.num_tablet_servers), |
225 | 392 | "Waiting for tablet servers to start"); |
226 | | |
227 | 392 | running_ = true; |
228 | 392 | return Status::OK(); |
229 | 392 | } |
230 | | |
231 | 398 | Status MiniCluster::StartMasters() { |
232 | 398 | CHECK_GE(master_rpc_ports_.size(), options_.num_masters); |
233 | 398 | EnsurePortsAllocated(); |
234 | | |
235 | 398 | LOG(INFO) << "Creating distributed mini masters. RPC ports: " |
236 | 398 | << JoinInts(master_rpc_ports_, ", "); |
237 | | |
238 | 398 | if (mini_masters_.size() < options_.num_masters) { |
239 | 0 | mini_masters_.resize(options_.num_masters); |
240 | 0 | } |
241 | | |
242 | 398 | bool started = false; |
243 | 370 | auto se = ScopeExit([this, &started] { |
244 | 370 | if (!started) { |
245 | 15 | for (const auto& master : mini_masters_) { |
246 | 15 | if (master) { |
247 | 15 | master->Shutdown(); |
248 | 15 | } |
249 | 15 | } |
250 | 5 | } |
251 | 370 | }); |
252 | | |
253 | 857 | for (size_t i = 0; i < options_.num_masters; i++) { |
254 | 464 | mini_masters_[i] = std::make_shared<MiniMaster>( |
255 | 464 | options_.master_env, GetMasterFsRoot(i), master_rpc_ports_[i], master_web_ports_[i], i); |
256 | 464 | auto status = mini_masters_[i]->StartDistributedMaster(master_rpc_ports_); |
257 | 33 | LOG_IF(INFO, !status.ok()) << "Failed to start master: " << status; |
258 | 464 | RETURN_NOT_OK_PREPEND(status, Substitute("Couldn't start follower $0", i)); |
259 | 28 | VLOG(1) << "Started MiniMaster with UUID " << mini_masters_[i]->permanent_uuid() |
260 | 28 | << " at index " << i; |
261 | 459 | } |
262 | 393 | int i = 0; |
263 | 365 | for (const shared_ptr<MiniMaster>& master : mini_masters_) { |
264 | 365 | LOG(INFO) << "Waiting to initialize catalog manager on master " << i++; |
265 | 365 | RETURN_NOT_OK_PREPEND(master->WaitForCatalogManagerInit(), |
266 | 365 | Substitute("Could not initialize catalog manager on master $0", i)); |
267 | 365 | } |
268 | 393 | started = true; |
269 | 393 | return Status::OK(); |
270 | 393 | } |
271 | | |
272 | 0 | Status MiniCluster::StartSync() { |
273 | 0 | RETURN_NOT_OK(Start()); |
274 | 0 | int count = 0; |
275 | 0 | for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) { |
276 | 0 | RETURN_NOT_OK_PREPEND(tablet_server->WaitStarted(), |
277 | 0 | Substitute("TabletServer $0 failed to start.", count)); |
278 | 0 | count++; |
279 | 0 | } |
280 | 0 | return Status::OK(); |
281 | 0 | } |
282 | | |
283 | 0 | Status MiniCluster::RestartSync() { |
284 | 0 | LOG(INFO) << string(80, '-'); |
285 | 0 | LOG(INFO) << __FUNCTION__; |
286 | 0 | LOG(INFO) << string(80, '-'); |
287 | |
|
288 | 0 | LOG(INFO) << "Restart tablet server(s)..."; |
289 | 0 | for (auto& tablet_server : mini_tablet_servers_) { |
290 | 0 | CHECK_OK(tablet_server->Restart()); |
291 | 0 | CHECK_OK(tablet_server->WaitStarted()); |
292 | 0 | } |
293 | 0 | LOG(INFO) << "Restart master server(s)..."; |
294 | 0 | for (auto& master_server : mini_masters_) { |
295 | 0 | LOG(INFO) << "Restarting master " << master_server->permanent_uuid(); |
296 | 0 | LongOperationTracker long_operation_tracker("Master restart", 5s); |
297 | 0 | CHECK_OK(master_server->Restart()); |
298 | 0 | LOG(INFO) << "Waiting for catalog manager at " << master_server->permanent_uuid(); |
299 | 0 | CHECK_OK(master_server->WaitForCatalogManagerInit()); |
300 | 0 | } |
301 | 0 | LOG(INFO) << string(80, '-'); |
302 | 0 | LOG(INFO) << __FUNCTION__ << " done"; |
303 | 0 | LOG(INFO) << string(80, '-'); |
304 | |
|
305 | 0 | RETURN_NOT_OK_PREPEND(WaitForAllTabletServers(), |
306 | 0 | "Waiting for tablet servers to start"); |
307 | 0 | running_ = true; |
308 | 0 | return Status::OK(); |
309 | 0 | } |
310 | | |
311 | 657 | Status MiniCluster::AddTabletServer(const tserver::TabletServerOptions& extra_opts) { |
312 | 657 | if (mini_masters_.empty()) { |
313 | 0 | return STATUS(IllegalState, "Master not yet initialized"); |
314 | 0 | } |
315 | 657 | auto new_idx = mini_tablet_servers_.size(); |
316 | | |
317 | 657 | EnsurePortsAllocated(0 /* num_masters (will pick default) */, new_idx + 1); |
318 | 657 | const uint16_t ts_rpc_port = tserver_rpc_ports_[new_idx]; |
319 | | |
320 | 657 | std::shared_ptr<MiniTabletServer> tablet_server; |
321 | 657 | if (options_.num_drives == 1) { |
322 | 647 | tablet_server = std::make_shared<MiniTabletServer>( |
323 | 647 | GetTabletServerFsRoot(new_idx), ts_rpc_port, extra_opts, new_idx); |
324 | 10 | } else { |
325 | 10 | std::vector<std::string> dirs; |
326 | 40 | for (int i = 0; i < options_.num_drives; ++i) { |
327 | 30 | dirs.push_back(GetTabletServerDrive(new_idx, i)); |
328 | 30 | } |
329 | 10 | tablet_server = std::make_shared<MiniTabletServer>( |
330 | 10 | dirs, dirs, ts_rpc_port, extra_opts, new_idx); |
331 | 10 | } |
332 | | |
333 | | // set the master addresses |
334 | 657 | auto master_addr = std::make_shared<server::MasterAddresses>(); |
335 | 657 | for (const shared_ptr<MiniMaster>& master : mini_masters_) { |
336 | 657 | master_addr->push_back({HostPort(master->bound_rpc_addr())}); |
337 | 657 | for (const auto& hp : master->master()->opts().broadcast_addresses) { |
338 | 657 | master_addr->back().push_back(hp); |
339 | 657 | } |
340 | 657 | } |
341 | | |
342 | 657 | tablet_server->options()->master_addresses_flag = server::MasterAddressesToString(*master_addr); |
343 | 657 | tablet_server->options()->SetMasterAddresses(master_addr); |
344 | 657 | tablet_server->options()->webserver_opts.port = tserver_web_ports_[new_idx]; |
345 | 657 | if (options_.ts_env) { |
346 | 10 | tablet_server->options()->env = options_.ts_env; |
347 | 10 | } |
348 | 657 | if (options_.ts_rocksdb_env) { |
349 | 10 | tablet_server->options()->rocksdb_env = options_.ts_rocksdb_env; |
350 | 10 | } |
351 | 657 | RETURN_NOT_OK(tablet_server->Start()); |
352 | 656 | mini_tablet_servers_.push_back(tablet_server); |
353 | 656 | return Status::OK(); |
354 | 657 | } |
355 | | |
356 | 655 | Status MiniCluster::AddTabletServer() { |
357 | 655 | auto options = tserver::TabletServerOptions::CreateTabletServerOptions(); |
358 | 655 | RETURN_NOT_OK(options); |
359 | 655 | return AddTabletServer(*options); |
360 | 655 | } |
361 | | |
362 | | namespace { |
363 | | |
364 | | Status ChangeClusterConfig( |
365 | | master::CatalogManagerIf* catalog_manager, |
366 | 0 | std::function<void(SysClusterConfigEntryPB*)> config_changer) { |
367 | 0 | GetMasterClusterConfigResponsePB config_resp; |
368 | 0 | RETURN_NOT_OK(catalog_manager->GetClusterConfig(&config_resp)); |
369 | |
|
370 | 0 | ChangeMasterClusterConfigRequestPB change_req; |
371 | 0 | *change_req.mutable_cluster_config() = std::move(*config_resp.mutable_cluster_config()); |
372 | 0 | SysClusterConfigEntryPB* config = change_req.mutable_cluster_config(); |
373 | |
|
374 | 0 | config_changer(config); |
375 | |
|
376 | 0 | ChangeMasterClusterConfigResponsePB change_resp; |
377 | 0 | return catalog_manager->SetClusterConfig(&change_req, &change_resp); |
378 | 0 | } |
379 | | |
380 | | } // namespace |
381 | | |
382 | 0 | Status MiniCluster::AddTServerToBlacklist(const MiniTabletServer& ts) { |
383 | 0 | const auto* master = VERIFY_RESULT(GetLeaderMiniMaster()); |
384 | |
|
385 | 0 | RETURN_NOT_OK(ChangeClusterConfig( |
386 | 0 | &master->catalog_manager(), [&ts](SysClusterConfigEntryPB* config) { |
387 | | // Add tserver to blacklist. |
388 | 0 | HostPortPB* blacklist_host_pb = config->mutable_server_blacklist()->mutable_hosts()->Add(); |
389 | 0 | blacklist_host_pb->set_host(ts.bound_rpc_addr().address().to_string()); |
390 | 0 | blacklist_host_pb->set_port(ts.bound_rpc_addr().port()); |
391 | 0 | })); |
392 | |
|
393 | 0 | LOG(INFO) << "TServer " << ts.server()->permanent_uuid() << " at " |
394 | 0 | << ts.bound_rpc_addr().address().to_string() << ":" << ts.bound_rpc_addr().port() |
395 | 0 | << " was added to the blacklist"; |
396 | |
|
397 | 0 | return Status::OK(); |
398 | 0 | } |
399 | | |
400 | 0 | Status MiniCluster::ClearBlacklist() { |
401 | 0 | const auto* master = VERIFY_RESULT(GetLeaderMiniMaster()); |
402 | |
|
403 | 0 | RETURN_NOT_OK( |
404 | 0 | ChangeClusterConfig(&master->catalog_manager(), [](SysClusterConfigEntryPB* config) { |
405 | 0 | config->mutable_server_blacklist()->Clear(); |
406 | 0 | config->mutable_leader_blacklist()->Clear(); |
407 | 0 | })); |
408 | |
|
409 | 0 | LOG(INFO) << "Blacklist has been cleared"; |
410 | |
|
411 | 0 | return Status::OK(); |
412 | 0 | } |
413 | | |
414 | 0 | string MiniCluster::GetMasterAddresses() const { |
415 | 0 | string peer_addrs = ""; |
416 | 0 | for (const auto& master : mini_masters_) { |
417 | 0 | if (!peer_addrs.empty()) { |
418 | 0 | peer_addrs += ","; |
419 | 0 | } |
420 | 0 | peer_addrs += master->bound_rpc_addr_str(); |
421 | 0 | } |
422 | 0 | return peer_addrs; |
423 | 0 | } |
424 | | |
425 | 0 | string MiniCluster::GetTserverHTTPAddresses() const { |
426 | 0 | string peer_addrs = ""; |
427 | 0 | for (const auto& tserver : mini_tablet_servers_) { |
428 | 0 | if (!peer_addrs.empty()) { |
429 | 0 | peer_addrs += ","; |
430 | 0 | } |
431 | 0 | peer_addrs += tserver->bound_http_addr_str(); |
432 | 0 | } |
433 | 0 | return peer_addrs; |
434 | 0 | } |
435 | | |
436 | 6.21k | ssize_t MiniCluster::LeaderMasterIdx() { |
437 | 6.21k | Stopwatch sw; |
438 | 6.21k | sw.start(); |
439 | 167k | while (sw.elapsed().wall_seconds() < kMasterLeaderElectionWaitTimeSeconds) { |
440 | 328k | for (size_t i = 0; i < mini_masters_.size(); i++) { |
441 | 167k | MiniMaster* master = mini_master(i); |
442 | 167k | if (master->master() == nullptr || master->master()->IsShutdown()) { |
443 | 0 | continue; |
444 | 0 | } |
445 | 167k | SCOPED_LEADER_SHARED_LOCK(l, master->master()->catalog_manager_impl()); |
446 | 167k | if (l.catalog_status().ok() && l.leader_status().ok()) { |
447 | 6.21k | return i; |
448 | 6.21k | } |
449 | 167k | } |
450 | 161k | SleepFor(MonoDelta::FromMilliseconds(1)); |
451 | 161k | } |
452 | 0 | LOG(ERROR) << "No leader master elected after " << kMasterLeaderElectionWaitTimeSeconds |
453 | 0 | << " seconds."; |
454 | 0 | return -1; |
455 | 6.21k | } |
456 | | |
457 | 6.21k | Result<MiniMaster*> MiniCluster::GetLeaderMiniMaster() { |
458 | 6.21k | const auto idx = LeaderMasterIdx(); |
459 | 6.21k | if (idx == -1) { |
460 | 0 | return STATUS(TimedOut, "No leader master has been elected"); |
461 | 0 | } |
462 | 6.21k | return mini_master(idx); |
463 | 6.21k | } |
464 | | |
465 | 109 | void MiniCluster::Shutdown() { |
466 | 109 | if (!running_) |
467 | 88 | return; |
468 | | |
469 | 21 | for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) { |
470 | 21 | tablet_server->Shutdown(); |
471 | 21 | } |
472 | 21 | mini_tablet_servers_.clear(); |
473 | | |
474 | 21 | for (shared_ptr<MiniMaster>& master_server : mini_masters_) { |
475 | 21 | master_server->Shutdown(); |
476 | 21 | master_server.reset(); |
477 | 21 | } |
478 | 21 | mini_masters_.clear(); |
479 | | |
480 | 21 | running_ = false; |
481 | 21 | } |
482 | | |
483 | 0 | Status MiniCluster::FlushTablets(tablet::FlushMode mode, tablet::FlushFlags flags) { |
484 | 0 | for (const auto& tablet_server : mini_tablet_servers_) { |
485 | 0 | RETURN_NOT_OK(tablet_server->FlushTablets(mode, flags)); |
486 | 0 | } |
487 | 0 | return Status::OK(); |
488 | 0 | } |
489 | | |
490 | 0 | Status MiniCluster::CompactTablets() { |
491 | 0 | for (const auto& tablet_server : mini_tablet_servers_) { |
492 | 0 | RETURN_NOT_OK(tablet_server->CompactTablets()); |
493 | 0 | } |
494 | 0 | return Status::OK(); |
495 | 0 | } |
496 | | |
497 | 0 | Status MiniCluster::SwitchMemtables() { |
498 | 0 | for (const auto& tablet_server : mini_tablet_servers_) { |
499 | 0 | RETURN_NOT_OK(tablet_server->SwitchMemtables()); |
500 | 0 | } |
501 | 0 | return Status::OK(); |
502 | 0 | } |
503 | | |
504 | 0 | Status MiniCluster::CleanTabletLogs() { |
505 | 0 | for (const auto& tablet_server : mini_tablet_servers_) { |
506 | 0 | RETURN_NOT_OK(tablet_server->CleanTabletLogs()); |
507 | 0 | } |
508 | 0 | return Status::OK(); |
509 | 0 | } |
510 | | |
511 | 0 | void MiniCluster::ShutdownMasters() { |
512 | 0 | for (shared_ptr<MiniMaster>& master_server : mini_masters_) { |
513 | 0 | master_server->Shutdown(); |
514 | 0 | master_server.reset(); |
515 | 0 | } |
516 | 0 | } |
517 | | |
518 | 173k | MiniMaster* MiniCluster::mini_master(size_t idx) { |
519 | 0 | CHECK_GE(idx, 0) << "Master idx must be >= 0"; |
520 | 0 | CHECK_LT(idx, mini_masters_.size()) << "Master idx must be < num masters started"; |
521 | 173k | return mini_masters_[idx].get(); |
522 | 173k | } |
523 | | |
524 | 40 | MiniTabletServer* MiniCluster::mini_tablet_server(size_t idx) { |
525 | 0 | CHECK_GE(idx, 0) << "TabletServer idx must be >= 0"; |
526 | 0 | CHECK_LT(idx, mini_tablet_servers_.size()) << "TabletServer idx must be < 'num_ts_started_'"; |
527 | 40 | return mini_tablet_servers_[idx].get(); |
528 | 40 | } |
529 | | |
530 | 0 | MiniTabletServer* MiniCluster::find_tablet_server(const std::string& uuid) { |
531 | 0 | for (const auto& server : mini_tablet_servers_) { |
532 | 0 | if (!server->server()) { |
533 | 0 | continue; |
534 | 0 | } |
535 | 0 | if (server->server()->instance_pb().permanent_uuid() == uuid) { |
536 | 0 | return server.get(); |
537 | 0 | } |
538 | 0 | } |
539 | 0 | return nullptr; |
540 | 0 | } |
541 | | |
542 | 464 | string MiniCluster::GetMasterFsRoot(size_t idx) { |
543 | 464 | return JoinPathSegments(fs_root_, Substitute("master-$0-root", idx + 1)); |
544 | 464 | } |
545 | | |
546 | 647 | string MiniCluster::GetTabletServerFsRoot(size_t idx) { |
547 | 647 | return JoinPathSegments(fs_root_, Substitute("ts-$0-root", idx + 1)); |
548 | 647 | } |
549 | | |
550 | 75 | string MiniCluster::GetTabletServerDrive(size_t idx, int drive_index) { |
551 | 75 | if (options_.num_drives == 1) { |
552 | 0 | return GetTabletServerFsRoot(idx); |
553 | 0 | } |
554 | 75 | return JoinPathSegments(fs_root_, Substitute("ts-$0-drive-$1", idx + 1, drive_index + 1)); |
555 | 75 | } |
556 | | |
557 | 28 | tserver::TSTabletManager* MiniCluster::GetTabletManager(size_t idx) { |
558 | 28 | return mini_tablet_server(idx)->server()->tablet_manager(); |
559 | 28 | } |
560 | | |
561 | 0 | std::vector<std::shared_ptr<tablet::TabletPeer>> MiniCluster::GetTabletPeers(size_t idx) { |
562 | 0 | return GetTabletManager(idx)->GetTabletPeers(); |
563 | 0 | } |
564 | | |
565 | | Status MiniCluster::WaitForReplicaCount(const string& tablet_id, |
566 | | int expected_count, |
567 | 0 | TabletLocationsPB* locations) { |
568 | 0 | Stopwatch sw; |
569 | 0 | sw.start(); |
570 | 0 | while (sw.elapsed().wall_seconds() < kTabletReportWaitTimeSeconds) { |
571 | 0 | auto leader_mini_master = GetLeaderMiniMaster(); |
572 | 0 | if (!leader_mini_master.ok()) { |
573 | 0 | continue; |
574 | 0 | } |
575 | 0 | locations->Clear(); |
576 | 0 | Status s = (*leader_mini_master) |
577 | 0 | ->master() |
578 | 0 | ->catalog_manager() |
579 | 0 | ->GetTabletLocations(tablet_id, locations); |
580 | 0 | if (s.ok() && ((locations->stale() && expected_count == 0) || |
581 | 0 | (!locations->stale() && locations->replicas_size() == expected_count))) { |
582 | 0 | return Status::OK(); |
583 | 0 | } |
584 | | |
585 | 0 | SleepFor(MonoDelta::FromMilliseconds(1)); |
586 | 0 | } |
587 | 0 | return STATUS(TimedOut, Substitute("Tablet $0 never reached expected replica count $1", |
588 | 0 | tablet_id, expected_count)); |
589 | 0 | } |
590 | | |
591 | 0 | Status MiniCluster::WaitForAllTabletServers() { |
592 | 0 | return WaitForTabletServerCount(num_tablet_servers()); |
593 | 0 | } |
594 | | |
595 | 108 | Status MiniCluster::WaitForTabletServerCount(size_t count) { |
596 | 108 | vector<shared_ptr<master::TSDescriptor> > descs; |
597 | 108 | return WaitForTabletServerCount(count, &descs); |
598 | 108 | } |
599 | | |
600 | | Status MiniCluster::WaitForTabletServerCount(size_t count, |
601 | 109 | vector<shared_ptr<TSDescriptor> >* descs) { |
602 | 109 | Stopwatch sw; |
603 | 109 | sw.start(); |
604 | 6.21k | while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) { |
605 | 6.21k | auto leader = GetLeaderMiniMaster(); |
606 | 6.21k | if (leader.ok()) { |
607 | 6.21k | (*leader)->ts_manager().GetAllDescriptors(descs); |
608 | 6.21k | if (descs->size() == count) { |
609 | | // GetAllDescriptors() may return servers that are no longer online. |
610 | | // Do a second step of verification to verify that the descs that we got |
611 | | // are aligned (same uuid/seqno) with the TSs that we have in the cluster. |
612 | 109 | size_t match_count = 0; |
613 | 109 | for (const shared_ptr<TSDescriptor>& desc : *descs) { |
614 | 109 | for (auto mini_tablet_server : mini_tablet_servers_) { |
615 | 109 | auto ts = mini_tablet_server->server(); |
616 | 109 | if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() && |
617 | 109 | ts->instance_pb().instance_seqno() == desc->latest_seqno()) { |
618 | 109 | match_count++; |
619 | 109 | break; |
620 | 109 | } |
621 | 109 | } |
622 | 109 | } |
623 | | |
624 | 109 | if (match_count == count) { |
625 | 109 | LOG(INFO) << count << " TS(s) registered with Master after " |
626 | 109 | << sw.elapsed().wall_seconds() << "s"; |
627 | 109 | return Status::OK(); |
628 | 109 | } |
629 | 6.11k | } |
630 | | |
631 | 6.11k | YB_LOG_EVERY_N_SECS(INFO, 5) << "Registered: " << AsString(*descs); |
632 | 6.11k | } |
633 | | |
634 | 6.11k | SleepFor(MonoDelta::FromMilliseconds(1)); |
635 | 6.11k | } |
636 | 0 | return STATUS(TimedOut, Substitute("$0 TS(s) never registered with master", count)); |
637 | 109 | } |
638 | | |
639 | 35 | void MiniCluster::ConfigureClientBuilder(YBClientBuilder* builder) { |
640 | 35 | CHECK_NOTNULL(builder); |
641 | 35 | builder->clear_master_server_addrs(); |
642 | 35 | for (const shared_ptr<MiniMaster>& master : mini_masters_) { |
643 | 35 | CHECK(master); |
644 | 35 | builder->add_master_server_addr(master->bound_rpc_addr_str()); |
645 | 35 | } |
646 | 35 | } |
647 | | |
648 | 0 | Result<HostPort> MiniCluster::DoGetLeaderMasterBoundRpcAddr() { |
649 | 0 | return VERIFY_RESULT(GetLeaderMiniMaster())->bound_rpc_addr(); |
650 | 0 | } |
651 | | |
652 | | void MiniCluster::AllocatePortsForDaemonType( |
653 | | const string daemon_type, |
654 | | const size_t num_daemons, |
655 | | const string port_type, |
656 | 5.81k | std::vector<uint16_t>* ports) { |
657 | 5.81k | const size_t old_size = ports->size(); |
658 | 5.81k | if (ports->size() < num_daemons) { |
659 | 1.59k | ports->resize(num_daemons, 0 /* default value */); |
660 | 1.59k | } |
661 | 8.84k | for (auto i = old_size; i < num_daemons; ++i) { |
662 | 3.03k | if ((*ports)[i] == 0) { |
663 | 3.03k | const uint16_t new_port = port_picker_.AllocateFreePort(); |
664 | 3.03k | (*ports)[i] = new_port; |
665 | 3.03k | LOG(INFO) << "Using auto-assigned port " << new_port << " for a " << daemon_type |
666 | 3.03k | << " " << port_type << " port"; |
667 | 3.03k | } |
668 | 3.03k | } |
669 | 5.81k | } |
670 | | |
671 | 1.45k | void MiniCluster::EnsurePortsAllocated(size_t new_num_masters, size_t new_num_tservers) { |
672 | 1.45k | if (new_num_masters == 0) { |
673 | 1.45k | new_num_masters = std::max(options_.num_masters, num_masters()); |
674 | 1.45k | } |
675 | 1.45k | AllocatePortsForDaemonType("master", new_num_masters, "RPC", &master_rpc_ports_); |
676 | 1.45k | AllocatePortsForDaemonType("master", new_num_masters, "web", &master_web_ports_); |
677 | | |
678 | 1.45k | if (new_num_tservers == 0) { |
679 | 796 | new_num_tservers = std::max(options_.num_tablet_servers, num_tablet_servers()); |
680 | 796 | } |
681 | 1.45k | AllocatePortsForDaemonType("tablet server", new_num_tservers, "RPC", &tserver_rpc_ports_); |
682 | 1.45k | AllocatePortsForDaemonType("tablet server", new_num_tservers, "web", &tserver_web_ports_); |
683 | 1.45k | } |
684 | | |
685 | | server::SkewedClockDeltaChanger JumpClock( |
686 | 0 | server::RpcServerBase* server, std::chrono::milliseconds delta) { |
687 | 0 | auto* hybrid_clock = down_cast<server::HybridClock*>(server->clock()); |
688 | 0 | return server::SkewedClockDeltaChanger( |
689 | 0 | delta, std::static_pointer_cast<server::SkewedClock>(hybrid_clock->physical_clock())); |
690 | 0 | } |
691 | | |
692 | | std::vector<server::SkewedClockDeltaChanger> SkewClocks( |
693 | 0 | MiniCluster* cluster, std::chrono::milliseconds clock_skew) { |
694 | 0 | std::vector<server::SkewedClockDeltaChanger> delta_changers; |
695 | 0 | for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) { |
696 | 0 | delta_changers.push_back(JumpClock(cluster->mini_tablet_server(i)->server(), i * clock_skew)); |
697 | 0 | } |
698 | 0 | return delta_changers; |
699 | 0 | } |
700 | | |
701 | | std::vector<server::SkewedClockDeltaChanger> JumpClocks( |
702 | 0 | MiniCluster* cluster, std::chrono::milliseconds delta) { |
703 | 0 | std::vector<server::SkewedClockDeltaChanger> delta_changers; |
704 | 0 | auto num_masters = cluster->num_masters(); |
705 | 0 | auto num_tservers = cluster->num_tablet_servers(); |
706 | 0 | delta_changers.reserve(num_masters + num_tservers); |
707 | 0 | for (size_t i = 0; i != num_masters; ++i) { |
708 | 0 | delta_changers.push_back(JumpClock(cluster->mini_master(i)->master(), delta)); |
709 | 0 | } |
710 | 0 | for (size_t i = 0; i != num_tservers; ++i) { |
711 | 0 | delta_changers.push_back(JumpClock(cluster->mini_tablet_server(i)->server(), delta)); |
712 | 0 | } |
713 | 0 | return delta_changers; |
714 | 0 | } |
715 | | |
716 | 0 | void StepDownAllTablets(MiniCluster* cluster) { |
717 | 0 | for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) { |
718 | 0 | for (const auto& peer : cluster->GetTabletPeers(i)) { |
719 | 0 | consensus::LeaderStepDownRequestPB req; |
720 | 0 | req.set_tablet_id(peer->tablet_id()); |
721 | 0 | consensus::LeaderStepDownResponsePB resp; |
722 | 0 | ASSERT_OK(peer->consensus()->StepDown(&req, &resp)); |
723 | 0 | } |
724 | 0 | } |
725 | 0 | } |
726 | | |
727 | 0 | void StepDownRandomTablet(MiniCluster* cluster) { |
728 | 0 | auto peers = ListTabletPeers(cluster, ListPeersFilter::kLeaders); |
729 | 0 | if (!peers.empty()) { |
730 | 0 | auto peer = RandomElement(peers); |
731 | |
|
732 | 0 | consensus::LeaderStepDownRequestPB req; |
733 | 0 | req.set_tablet_id(peer->tablet_id()); |
734 | 0 | consensus::LeaderStepDownResponsePB resp; |
735 | 0 | ASSERT_OK(peer->consensus()->StepDown(&req, &resp)); |
736 | 0 | } |
737 | 0 | } |
738 | | |
739 | 0 | std::unordered_set<string> ListTabletIdsForTable(MiniCluster* cluster, const string& table_id) { |
740 | 0 | std::unordered_set<string> tablet_ids; |
741 | 0 | for (auto peer : ListTabletPeers(cluster, ListPeersFilter::kAll)) { |
742 | 0 | if (peer->tablet_metadata()->table_id() == table_id) { |
743 | 0 | tablet_ids.insert(peer->tablet_id()); |
744 | 0 | } |
745 | 0 | } |
746 | 0 | return tablet_ids; |
747 | 0 | } |
748 | | |
749 | | std::unordered_set<string> ListActiveTabletIdsForTable( |
750 | 0 | MiniCluster* cluster, const string& table_id) { |
751 | 0 | std::unordered_set<string> tablet_ids; |
752 | 0 | for (auto peer : ListTableActiveTabletPeers(cluster, table_id)) { |
753 | 0 | tablet_ids.insert(peer->tablet_id()); |
754 | 0 | } |
755 | 0 | return tablet_ids; |
756 | 0 | } |
757 | | |
758 | 0 | std::vector<tablet::TabletPeerPtr> ListTabletPeers(MiniCluster* cluster, ListPeersFilter filter) { |
759 | 0 | switch (filter) { |
760 | 0 | case ListPeersFilter::kAll: |
761 | 0 | return ListTabletPeers(cluster, [](const auto& peer) { return true; }); |
762 | 0 | case ListPeersFilter::kLeaders: |
763 | 0 | return ListTabletPeers(cluster, [](const auto& peer) { |
764 | 0 | auto consensus = peer->shared_consensus(); |
765 | 0 | return consensus && consensus->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER; |
766 | 0 | }); |
767 | 0 | case ListPeersFilter::kNonLeaders: |
768 | 0 | return ListTabletPeers(cluster, [](const auto& peer) { |
769 | 0 | auto consensus = peer->shared_consensus(); |
770 | 0 | return consensus && consensus->GetLeaderStatus() == consensus::LeaderStatus::NOT_LEADER; |
771 | 0 | }); |
772 | 0 | } |
773 | | |
774 | 0 | FATAL_INVALID_ENUM_VALUE(ListPeersFilter, filter); |
775 | 0 | } |
776 | | |
777 | | std::vector<tablet::TabletPeerPtr> ListTabletPeers( |
778 | | MiniCluster* cluster, |
779 | 0 | const std::function<bool(const std::shared_ptr<tablet::TabletPeer>&)>& filter) { |
780 | 0 | std::vector<tablet::TabletPeerPtr> result; |
781 | |
|
782 | 0 | for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) { |
783 | 0 | auto server = cluster->mini_tablet_server(i)->server(); |
784 | 0 | if (!server) { // Server is shut down. |
785 | 0 | continue; |
786 | 0 | } |
787 | 0 | auto peers = server->tablet_manager()->GetTabletPeers(); |
788 | 0 | for (const auto& peer : peers) { |
789 | 0 | WARN_NOT_OK( |
790 | 0 | WaitFor( |
791 | 0 | [peer] { return peer->consensus() != nullptr || peer->IsShutdownStarted(); }, 5s, |
792 | 0 | Format("Waiting peer T $0 P $1 ready", peer->tablet_id(), peer->permanent_uuid())), |
793 | 0 | "List tablet peers failure"); |
794 | 0 | if (peer->consensus() != nullptr && filter(peer)) { |
795 | 0 | result.push_back(peer); |
796 | 0 | } |
797 | 0 | } |
798 | 0 | } |
799 | |
|
800 | 0 | return result; |
801 | 0 | } |
802 | | |
803 | | std::vector<tablet::TabletPeerPtr> ListTableActiveTabletLeadersPeers( |
804 | 0 | MiniCluster* cluster, const TableId& table_id) { |
805 | 0 | return ListTabletPeers(cluster, [&table_id](const auto& peer) { |
806 | 0 | return peer->tablet_metadata() && |
807 | 0 | peer->tablet_metadata()->table_id() == table_id && |
808 | 0 | peer->tablet_metadata()->tablet_data_state() != |
809 | 0 | tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED && |
810 | 0 | peer->consensus()->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER; |
811 | 0 | }); |
812 | 0 | } |
813 | | |
814 | | std::vector<tablet::TabletPeerPtr> ListTableTabletPeers( |
815 | 0 | MiniCluster* cluster, const TableId& table_id) { |
816 | 0 | return ListTabletPeers(cluster, [table_id](const std::shared_ptr<tablet::TabletPeer>& peer) { |
817 | 0 | return peer->tablet_metadata()->table_id() == table_id; |
818 | 0 | }); |
819 | 0 | } |
820 | | |
821 | | namespace { |
822 | | |
823 | 0 | bool IsActive(tablet::TabletDataState tablet_data_state) { |
824 | 0 | return tablet_data_state != tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED && |
825 | 0 | tablet_data_state != tablet::TabletDataState::TABLET_DATA_TOMBSTONED && |
826 | 0 | tablet_data_state != tablet::TabletDataState::TABLET_DATA_DELETED; |
827 | 0 | } |
828 | | |
829 | | } // namespace |
830 | | |
831 | | std::vector<tablet::TabletPeerPtr> ListTableActiveTabletPeers( |
832 | 0 | MiniCluster* cluster, const TableId& table_id) { |
833 | 0 | std::vector<tablet::TabletPeerPtr> result; |
834 | 0 | for (auto peer : ListTableTabletPeers(cluster, table_id)) { |
835 | 0 | const auto tablet_meta = peer->tablet_metadata(); |
836 | 0 | if (IsActive(tablet_meta->tablet_data_state())) { |
837 | 0 | result.push_back(peer); |
838 | 0 | } |
839 | 0 | } |
840 | 0 | return result; |
841 | 0 | } |
842 | | |
843 | 0 | std::vector<tablet::TabletPeerPtr> ListActiveTabletLeadersPeers(MiniCluster* cluster) { |
844 | 0 | return ListTabletPeers(cluster, [](const auto& peer) { |
845 | 0 | const auto tablet_meta = peer->tablet_metadata(); |
846 | 0 | const auto consensus = peer->shared_consensus(); |
847 | 0 | return tablet_meta && tablet_meta->table_type() != TableType::TRANSACTION_STATUS_TABLE_TYPE && |
848 | 0 | IsActive(tablet_meta->tablet_data_state()) && |
849 | 0 | consensus->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER; |
850 | 0 | }); |
851 | 0 | } |
852 | | |
853 | | std::vector<tablet::TabletPeerPtr> ListTableInactiveSplitTabletPeers( |
854 | 0 | MiniCluster* cluster, const TableId& table_id) { |
855 | 0 | std::vector<tablet::TabletPeerPtr> result; |
856 | 0 | for (auto peer : ListTableTabletPeers(cluster, table_id)) { |
857 | 0 | if (peer->tablet()->metadata()->tablet_data_state() == |
858 | 0 | tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED) { |
859 | 0 | result.push_back(peer); |
860 | 0 | } |
861 | 0 | } |
862 | 0 | return result; |
863 | 0 | } |
864 | | |
865 | | Result<std::vector<tablet::TabletPeerPtr>> WaitForTableActiveTabletLeadersPeers( |
866 | | MiniCluster* cluster, const TableId& table_id, |
867 | 0 | const size_t num_active_leaders, const MonoDelta timeout) { |
868 | 0 | SCHECK_NOTNULL(cluster); |
869 | |
|
870 | 0 | std::vector<tablet::TabletPeerPtr> active_leaders_peers; |
871 | 0 | RETURN_NOT_OK(LoggedWaitFor([&] { |
872 | 0 | active_leaders_peers = ListTableActiveTabletLeadersPeers(cluster, table_id); |
873 | 0 | LOG(INFO) << "active_leader_peers.size(): " << active_leaders_peers.size(); |
874 | 0 | return active_leaders_peers.size() == num_active_leaders; |
875 | 0 | }, timeout, "Waiting for leaders ...")); |
876 | 0 | return active_leaders_peers; |
877 | 0 | } |
878 | | |
879 | | Status WaitUntilTabletHasLeader( |
880 | 0 | MiniCluster* cluster, const string& tablet_id, MonoTime deadline) { |
881 | 0 | return Wait([cluster, &tablet_id] { |
882 | 0 | auto tablet_peers = ListTabletPeers(cluster, [&tablet_id](auto peer) { |
883 | 0 | return peer->tablet_id() == tablet_id |
884 | 0 | && peer->consensus()->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER; |
885 | 0 | }); |
886 | 0 | return tablet_peers.size() == 1; |
887 | 0 | }, deadline, "Waiting for election in tablet " + tablet_id); |
888 | 0 | } |
889 | | |
890 | 0 | CHECKED_STATUS WaitUntilMasterHasLeader(MiniCluster* cluster, MonoDelta timeout) { |
891 | 0 | return WaitFor([cluster] { |
892 | 0 | for (size_t i = 0; i != cluster->num_masters(); ++i) { |
893 | 0 | auto tablet_peer = cluster->mini_master(i)->tablet_peer(); |
894 | 0 | if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) { |
895 | 0 | return true; |
896 | 0 | } |
897 | 0 | } |
898 | 0 | return false; |
899 | 0 | }, timeout, "Waiting for master leader"); |
900 | 0 | } |
901 | | |
902 | | Status WaitForLeaderOfSingleTablet( |
903 | | MiniCluster* cluster, tablet::TabletPeerPtr leader, MonoDelta duration, |
904 | 0 | const std::string& description) { |
905 | 0 | return WaitFor([cluster, &leader] { |
906 | 0 | auto new_leaders = ListTabletPeers(cluster, ListPeersFilter::kLeaders); |
907 | 0 | return new_leaders.size() == 1 && new_leaders[0] == leader; |
908 | 0 | }, duration, description); |
909 | 0 | } |
910 | | |
911 | | Status StepDown( |
912 | | tablet::TabletPeerPtr leader, const std::string& new_leader_uuid, |
913 | 0 | ForceStepDown force_step_down) { |
914 | 0 | consensus::LeaderStepDownRequestPB req; |
915 | 0 | req.set_tablet_id(leader->tablet_id()); |
916 | 0 | req.set_new_leader_uuid(new_leader_uuid); |
917 | 0 | if (force_step_down) { |
918 | 0 | req.set_force_step_down(true); |
919 | 0 | } |
920 | 0 | consensus::LeaderStepDownResponsePB resp; |
921 | 0 | RETURN_NOT_OK(leader->consensus()->StepDown(&req, &resp)); |
922 | 0 | if (resp.has_error()) { |
923 | 0 | return STATUS_FORMAT(RuntimeError, "Step down failed: $0", resp); |
924 | 0 | } |
925 | 0 | return Status::OK(); |
926 | 0 | } |
927 | | |
928 | | std::thread RestartsThread( |
929 | 0 | MiniCluster* cluster, CoarseDuration interval, std::atomic<bool>* stop_flag) { |
930 | 0 | return std::thread([cluster, interval, stop_flag] { |
931 | 0 | CDSAttacher attacher; |
932 | 0 | SetFlagOnExit set_stop_on_exit(stop_flag); |
933 | 0 | int it = 0; |
934 | 0 | while (!stop_flag->load(std::memory_order_acquire)) { |
935 | 0 | std::this_thread::sleep_for(interval); |
936 | 0 | ASSERT_OK(cluster->mini_tablet_server(++it % cluster->num_tablet_servers())->Restart()); |
937 | 0 | } |
938 | 0 | }); |
939 | 0 | } |
940 | | |
941 | 0 | Status WaitAllReplicasReady(MiniCluster* cluster, MonoDelta timeout) { |
942 | 0 | return WaitFor([cluster] { |
943 | 0 | std::unordered_set<std::string> tablet_ids; |
944 | 0 | auto peers = ListTabletPeers(cluster, ListPeersFilter::kAll); |
945 | 0 | for (const auto& peer : peers) { |
946 | 0 | if (peer->state() != tablet::RaftGroupStatePB::RUNNING) { |
947 | 0 | return false; |
948 | 0 | } |
949 | 0 | tablet_ids.insert(peer->tablet_id()); |
950 | 0 | } |
951 | 0 | auto replication_factor = cluster->num_tablet_servers(); |
952 | 0 | return tablet_ids.size() * replication_factor == peers.size(); |
953 | 0 | }, timeout, "Wait all replicas to be ready"); |
954 | 0 | } |
955 | | |
956 | 0 | Status WaitAllReplicasHaveIndex(MiniCluster* cluster, int64_t index, MonoDelta timeout) { |
957 | 0 | return WaitFor([cluster, index] { |
958 | 0 | std::unordered_set<std::string> tablet_ids; |
959 | 0 | auto peers = ListTabletPeers(cluster, ListPeersFilter::kAll); |
960 | 0 | for (const auto& peer : peers) { |
961 | 0 | if (peer->GetLatestLogEntryOpId().index < index) { |
962 | 0 | return false; |
963 | 0 | } |
964 | 0 | tablet_ids.insert(peer->tablet_id()); |
965 | 0 | } |
966 | 0 | auto replication_factor = cluster->num_tablet_servers(); |
967 | 0 | return tablet_ids.size() * replication_factor == peers.size(); |
968 | 0 | }, timeout, "Wait for all replicas to have a specific Raft index"); |
969 | 0 | } |
970 | | |
// Appends `value` to `collection` unless it compares equal to nullptr.
template <class Collection>
void PushBackIfNotNull(const typename Collection::value_type& value, Collection* collection) {
  if (value == nullptr) {
    return;
  }
  collection->push_back(value);
}
977 | | |
978 | 0 | std::vector<rocksdb::DB*> GetAllRocksDbs(MiniCluster* cluster, bool include_intents) { |
979 | 0 | std::vector<rocksdb::DB*> dbs; |
980 | 0 | for (auto& peer : ListTabletPeers(cluster, ListPeersFilter::kAll)) { |
981 | 0 | const auto* tablet = peer->tablet(); |
982 | 0 | PushBackIfNotNull(tablet->TEST_db(), &dbs); |
983 | 0 | if (include_intents) { |
984 | 0 | PushBackIfNotNull(tablet->TEST_intents_db(), &dbs); |
985 | 0 | } |
986 | 0 | } |
987 | 0 | return dbs; |
988 | 0 | } |
989 | | |
990 | 0 | int NumTotalRunningCompactions(MiniCluster* cluster) { |
991 | 0 | int compactions = 0; |
992 | 0 | for (auto* db : GetAllRocksDbs(cluster)) { |
993 | 0 | compactions += down_cast<rocksdb::DBImpl*>(db)->TEST_NumTotalRunningCompactions(); |
994 | 0 | } |
995 | 0 | return compactions; |
996 | 0 | } |
997 | | |
998 | 0 | int NumRunningFlushes(MiniCluster* cluster) { |
999 | 0 | int flushes = 0; |
1000 | 0 | for (auto* db : GetAllRocksDbs(cluster)) { |
1001 | 0 | flushes += down_cast<rocksdb::DBImpl*>(db)->TEST_NumRunningFlushes(); |
1002 | 0 | } |
1003 | 0 | return flushes; |
1004 | 0 | } |
1005 | | |
1006 | | Result<scoped_refptr<master::TableInfo>> FindTable( |
1007 | 0 | MiniCluster* cluster, const client::YBTableName& table_name) { |
1008 | 0 | auto& catalog_manager = VERIFY_RESULT(cluster->GetLeaderMiniMaster())->catalog_manager(); |
1009 | 0 | master::TableIdentifierPB identifier; |
1010 | 0 | table_name.SetIntoTableIdentifierPB(&identifier); |
1011 | 0 | return catalog_manager.FindTable(identifier); |
1012 | 0 | } |
1013 | | |
1014 | 0 | Status WaitForInitDb(MiniCluster* cluster) { |
1015 | 0 | const auto start_time = CoarseMonoClock::now(); |
1016 | 0 | const auto kTimeout = RegularBuildVsSanitizers(600s, 1800s); |
1017 | 0 | while (CoarseMonoClock::now() <= start_time + kTimeout) { |
1018 | 0 | auto leader_mini_master = cluster->GetLeaderMiniMaster(); |
1019 | 0 | if (!leader_mini_master.ok()) { |
1020 | 0 | continue; |
1021 | 0 | } |
1022 | 0 | auto& catalog_manager = (*leader_mini_master)->catalog_manager(); |
1023 | 0 | master::IsInitDbDoneRequestPB req; |
1024 | 0 | master::IsInitDbDoneResponsePB resp; |
1025 | 0 | auto status = catalog_manager.IsInitDbDone(&req, &resp); |
1026 | 0 | if (!status.ok()) { |
1027 | 0 | LOG(INFO) << "IsInitDbDone failure: " << status; |
1028 | 0 | continue; |
1029 | 0 | } |
1030 | 0 | if (resp.has_initdb_error()) { |
1031 | 0 | return STATUS_FORMAT(RuntimeError, "Init DB failed: $0", resp.initdb_error()); |
1032 | 0 | } |
1033 | 0 | if (resp.done()) { |
1034 | 0 | return Status::OK(); |
1035 | 0 | } |
1036 | 0 | std::this_thread::sleep_for(500ms); |
1037 | 0 | } |
1038 | |
|
1039 | 0 | return STATUS_FORMAT(TimedOut, "Unable to init db in $0", kTimeout); |
1040 | 0 | } |
1041 | | |
1042 | 0 | size_t CountIntents(MiniCluster* cluster, const TabletPeerFilter& filter) { |
1043 | 0 | size_t result = 0; |
1044 | 0 | auto peers = ListTabletPeers(cluster, ListPeersFilter::kAll); |
1045 | 0 | for (const auto &peer : peers) { |
1046 | 0 | auto participant = peer->tablet() ? peer->tablet()->transaction_participant() : nullptr; |
1047 | 0 | if (!participant) { |
1048 | 0 | continue; |
1049 | 0 | } |
1050 | 0 | if (filter && !filter(peer.get())) { |
1051 | 0 | continue; |
1052 | 0 | } |
1053 | 0 | auto intents_count = participant->TEST_CountIntents(); |
1054 | 0 | if (intents_count.first) { |
1055 | 0 | result += intents_count.first; |
1056 | 0 | LOG(INFO) << Format("T $0 P $1: Intents present: $2, transactions: $3", peer->tablet_id(), |
1057 | 0 | peer->permanent_uuid(), intents_count.first, intents_count.second); |
1058 | 0 | } |
1059 | 0 | } |
1060 | 0 | return result; |
1061 | 0 | } |
1062 | | |
1063 | 0 | MiniTabletServer* FindTabletLeader(MiniCluster* cluster, const TabletId& tablet_id) { |
1064 | 0 | for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) { |
1065 | 0 | auto server = cluster->mini_tablet_server(i); |
1066 | 0 | if (!server->server()) { // Server is shut down. |
1067 | 0 | continue; |
1068 | 0 | } |
1069 | 0 | if (server->server()->LeaderAndReady(tablet_id)) { |
1070 | 0 | return server; |
1071 | 0 | } |
1072 | 0 | } |
1073 | |
|
1074 | 0 | return nullptr; |
1075 | 0 | } |
1076 | | |
1077 | 0 | void ShutdownAllTServers(MiniCluster* cluster) { |
1078 | 0 | for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) { |
1079 | 0 | cluster->mini_tablet_server(i)->Shutdown(); |
1080 | 0 | } |
1081 | 0 | } |
1082 | | |
1083 | 0 | Status StartAllTServers(MiniCluster* cluster) { |
1084 | 0 | for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) { |
1085 | 0 | RETURN_NOT_OK(cluster->mini_tablet_server(i)->Start()); |
1086 | 0 | } |
1087 | |
|
1088 | 0 | return Status::OK(); |
1089 | 0 | } |
1090 | | |
1091 | 0 | void ShutdownAllMasters(MiniCluster* cluster) { |
1092 | 0 | for (size_t i = 0; i != cluster->num_masters(); ++i) { |
1093 | 0 | cluster->mini_master(i)->Shutdown(); |
1094 | 0 | } |
1095 | 0 | } |
1096 | | |
1097 | 0 | Status StartAllMasters(MiniCluster* cluster) { |
1098 | 0 | for (size_t i = 0; i != cluster->num_masters(); ++i) { |
1099 | 0 | RETURN_NOT_OK(cluster->mini_master(i)->Start()); |
1100 | 0 | } |
1101 | |
|
1102 | 0 | return Status::OK(); |
1103 | 0 | } |
1104 | | |
1105 | | void SetupConnectivity( |
1106 | 0 | rpc::Messenger* messenger, const IpAddress& address, Connectivity connectivity) { |
1107 | 0 | switch (connectivity) { |
1108 | 0 | case Connectivity::kOn: |
1109 | 0 | messenger->RestoreConnectivityTo(address); |
1110 | 0 | return; |
1111 | 0 | case Connectivity::kOff: |
1112 | 0 | messenger->BreakConnectivityTo(address); |
1113 | 0 | return; |
1114 | 0 | } |
1115 | 0 | FATAL_INVALID_ENUM_VALUE(Connectivity, connectivity); |
1116 | 0 | } |
1117 | | |
1118 | | Status SetupConnectivity( |
1119 | 0 | MiniCluster* cluster, size_t idx1, size_t idx2, Connectivity connectivity) { |
1120 | 0 | for (auto from_idx : {idx1, idx2}) { |
1121 | 0 | auto to_idx = idx1 ^ idx2 ^ from_idx; |
1122 | 0 | for (auto type : {server::Private::kFalse, server::Private::kTrue}) { |
1123 | | // TEST_RpcAddress is 1-indexed; we expect from_idx/to_idx to be 0-indexed. |
1124 | 0 | auto address = VERIFY_RESULT(HostToAddress(TEST_RpcAddress(to_idx + 1, type))); |
1125 | 0 | if (from_idx < cluster->num_masters()) { |
1126 | 0 | SetupConnectivity( |
1127 | 0 | cluster->mini_master(from_idx)->master()->messenger(), address, connectivity); |
1128 | 0 | } |
1129 | 0 | if (from_idx < cluster->num_tablet_servers()) { |
1130 | 0 | SetupConnectivity( |
1131 | 0 | cluster->mini_tablet_server(from_idx)->server()->messenger(), address, connectivity); |
1132 | 0 | } |
1133 | 0 | } |
1134 | 0 | } |
1135 | |
|
1136 | 0 | return Status::OK(); |
1137 | 0 | } |
1138 | | |
1139 | 0 | Status BreakConnectivity(MiniCluster* cluster, size_t idx1, size_t idx2) { |
1140 | 0 | return SetupConnectivity(cluster, idx1, idx2, Connectivity::kOff); |
1141 | 0 | } |
1142 | | |
1143 | 0 | Result<size_t> ServerWithLeaders(MiniCluster* cluster) { |
1144 | 0 | for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) { |
1145 | 0 | auto* server = cluster->mini_tablet_server(i)->server(); |
1146 | 0 | if (!server) { |
1147 | 0 | continue; |
1148 | 0 | } |
1149 | 0 | auto* ts_manager = server->tablet_manager(); |
1150 | 0 | if (ts_manager->GetLeaderCount() != 0) { |
1151 | 0 | return i; |
1152 | 0 | } |
1153 | 0 | } |
1154 | 0 | return STATUS(NotFound, "No tablet server with leaders"); |
1155 | 0 | } |
1156 | | |
1157 | 0 | void SetCompactFlushRateLimitBytesPerSec(MiniCluster* cluster, const size_t bytes_per_sec) { |
1158 | 0 | LOG(INFO) << "Setting FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec to: " << bytes_per_sec |
1159 | 0 | << " and updating compact/flush rate in existing tablets"; |
1160 | 0 | FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec = bytes_per_sec; |
1161 | 0 | for (auto& tablet_peer : ListTabletPeers(cluster, ListPeersFilter::kAll)) { |
1162 | 0 | auto tablet = tablet_peer->shared_tablet(); |
1163 | 0 | for (auto* db : { tablet->TEST_db(), tablet->TEST_intents_db() }) { |
1164 | 0 | if (db) { |
1165 | 0 | db->GetDBOptions().rate_limiter->SetBytesPerSecond(bytes_per_sec); |
1166 | 0 | } |
1167 | 0 | } |
1168 | 0 | } |
1169 | 0 | } |
1170 | | |
1171 | | CHECKED_STATUS WaitAllReplicasSynchronizedWithLeader( |
1172 | 0 | MiniCluster* cluster, CoarseTimePoint deadline) { |
1173 | 0 | auto leaders = ListTabletPeers(cluster, ListPeersFilter::kLeaders); |
1174 | 0 | std::unordered_map<TabletId, int64_t> last_committed_idx; |
1175 | 0 | for (const auto& peer : leaders) { |
1176 | 0 | auto idx = peer->consensus()->GetLastCommittedOpId().index; |
1177 | 0 | last_committed_idx.emplace(peer->tablet_id(), idx); |
1178 | 0 | LOG(INFO) << "Committed op id for " << peer->tablet_id() << ": " << idx; |
1179 | 0 | } |
1180 | 0 | auto non_leaders = ListTabletPeers(cluster, ListPeersFilter::kNonLeaders); |
1181 | 0 | for (const auto& peer : non_leaders) { |
1182 | 0 | auto it = last_committed_idx.find(peer->tablet_id()); |
1183 | 0 | if (it == last_committed_idx.end()) { |
1184 | 0 | return STATUS_FORMAT(IllegalState, "Unknown committed op id for $0", peer->tablet_id()); |
1185 | 0 | } |
1186 | 0 | RETURN_NOT_OK(Wait([idx = it->second, peer]() { |
1187 | 0 | return peer->consensus()->GetLastCommittedOpId().index >= idx; |
1188 | 0 | }, |
1189 | 0 | deadline, Format("Wait T $0 P $1 commit $2", |
1190 | 0 | peer->tablet_id(), peer->permanent_uuid(), it->second))); |
1191 | 0 | } |
1192 | 0 | return Status::OK(); |
1193 | 0 | } |
1194 | | |
1195 | | } // namespace yb |