YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/mini_cluster.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/integration-tests/mini_cluster.h"
34
35
#include <algorithm>
36
37
#include "yb/client/client.h"
38
#include "yb/client/yb_table_name.h"
39
40
#include "yb/consensus/consensus.h"
41
#include "yb/consensus/consensus.pb.h"
42
43
#include "yb/gutil/casts.h"
44
#include "yb/gutil/strings/join.h"
45
#include "yb/gutil/strings/substitute.h"
46
47
#include "yb/master/catalog_entity_info.h"
48
#include "yb/master/catalog_manager_if.h"
49
#include "yb/master/master.h"
50
#include "yb/master/master_admin.pb.h"
51
#include "yb/master/master_client.pb.h"
52
#include "yb/master/master_cluster.pb.h"
53
#include "yb/master/mini_master.h"
54
#include "yb/master/scoped_leader_shared_lock.h"
55
#include "yb/master/ts_descriptor.h"
56
#include "yb/master/ts_manager.h"
57
58
#include "yb/rocksdb/db/db_impl.h"
59
#include "yb/rocksdb/rate_limiter.h"
60
61
#include "yb/rpc/messenger.h"
62
63
#include "yb/server/hybrid_clock.h"
64
#include "yb/server/skewed_clock.h"
65
66
#include "yb/tablet/tablet.h"
67
#include "yb/tablet/tablet_metadata.h"
68
#include "yb/tablet/tablet_peer.h"
69
#include "yb/tablet/transaction_participant.h"
70
71
#include "yb/tserver/mini_tablet_server.h"
72
#include "yb/tserver/tablet_server.h"
73
#include "yb/tserver/ts_tablet_manager.h"
74
75
#include "yb/util/debug/long_operation_tracker.h"
76
#include "yb/util/format.h"
77
#include "yb/util/path_util.h"
78
#include "yb/util/random_util.h"
79
#include "yb/util/scope_exit.h"
80
#include "yb/util/status.h"
81
#include "yb/util/status_format.h"
82
#include "yb/util/status_log.h"
83
#include "yb/util/stopwatch.h"
84
#include "yb/util/test_thread_holder.h"
85
#include "yb/util/test_util.h"
86
#include "yb/util/tsan_util.h"
87
88
using namespace std::literals;
89
using strings::Substitute;
90
91
DEFINE_string(mini_cluster_base_dir, "", "Directory for master/ts data");
92
DEFINE_bool(mini_cluster_reuse_data, false, "Reuse data of mini cluster");
93
DECLARE_int32(master_svc_num_threads);
94
DECLARE_int32(memstore_size_mb);
95
DECLARE_int32(master_consensus_svc_num_threads);
96
DECLARE_int32(master_remote_bootstrap_svc_num_threads);
97
DECLARE_int32(generic_svc_num_threads);
98
DECLARE_int32(tablet_server_svc_num_threads);
99
DECLARE_int32(ts_admin_svc_num_threads);
100
DECLARE_int32(ts_consensus_svc_num_threads);
101
DECLARE_int32(ts_remote_bootstrap_svc_num_threads);
102
DECLARE_int32(replication_factor);
103
DECLARE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec);
104
DECLARE_string(use_private_ip);
105
DECLARE_int32(load_balancer_initial_delay_secs);
106
DECLARE_int32(transaction_table_num_tablets);
107
108
namespace yb {
109
110
using client::YBClient;
111
using client::YBClientBuilder;
112
using master::CatalogManager;
113
using master::MiniMaster;
114
using master::TabletLocationsPB;
115
using master::TSDescriptor;
116
using std::shared_ptr;
117
using std::string;
118
using std::vector;
119
using tserver::MiniTabletServer;
120
using tserver::TabletServer;
121
using master::GetMasterClusterConfigResponsePB;
122
using master::ChangeMasterClusterConfigRequestPB;
123
using master::ChangeMasterClusterConfigResponsePB;
124
using master::SysClusterConfigEntryPB;
125
126
namespace {
127
128
const std::vector<uint16_t> EMPTY_MASTER_RPC_PORTS = {};
129
const int kMasterLeaderElectionWaitTimeSeconds = 20 * kTimeMultiplier;
130
const int kRegistrationWaitTimeSeconds = 45 * kTimeMultiplier;
131
const int kTabletReportWaitTimeSeconds = 5;
132
133
398
std::string GetClusterDataDirName(const MiniClusterOptions& options) {
134
398
  std::string cluster_name = "minicluster-data";
135
398
  if (options.cluster_id == "") {
136
398
    return cluster_name;
137
398
  }
138
0
  return Format("$0-$1", cluster_name, options.cluster_id);
139
0
}
140
141
398
std::string GetFsRoot(const MiniClusterOptions& options) {
142
398
  if (!options.data_root.empty()) {
143
0
    return options.data_root;
144
0
  }
145
398
  if (!FLAGS_mini_cluster_base_dir.empty()) {
146
0
    return FLAGS_mini_cluster_base_dir;
147
0
  }
148
398
  return JoinPathSegments(GetTestDataDirectory(), GetClusterDataDirName(options));
149
398
}
150
151
} // namespace
152
153
// Stores the options, resolves the filesystem root, and reserves one (initially empty) slot per
// requested master. Nothing is started here.
MiniCluster::MiniCluster(const MiniClusterOptions& options)
    : options_(options), fs_root_(GetFsRoot(options)) {
  mini_masters_.resize(options_.num_masters);
}
158
159
53
// Shuts the cluster down on destruction (Shutdown() is a no-op when the cluster is not running).
MiniCluster::~MiniCluster() { Shutdown(); }
162
163
398
// Starts the masters and then options_.num_tablet_servers tablet servers, and waits until all
// tablet servers have registered with the leader master.
//
// extra_tserver_options: either empty (default options are used for every tablet server) or
// exactly one entry per tablet server; any other size yields InvalidArgument.
//
// Also overrides a number of process-wide flags (see below).
Status MiniCluster::Start(const std::vector<tserver::TabletServerOptions>& extra_tserver_options) {
  CHECK(!fs_root_.empty()) << "No Fs root was provided";
  CHECK(!running_);

  EnsurePortsAllocated();

  if (!options_.master_env->FileExists(fs_root_)) {
    RETURN_NOT_OK(options_.master_env->CreateDir(fs_root_));
  }

  // TODO: properly handle setting these variables in case of multiple MiniClusters in the same
  // process.

  // Use conservative number of threads for the mini cluster for unit test env
  // where several unit tests tend to run in parallel.
  // To get default number of threads - try to find SERVICE_POOL_OPTIONS macro usage.
  FLAGS_master_svc_num_threads = 2;
  FLAGS_master_consensus_svc_num_threads = 2;
  FLAGS_master_remote_bootstrap_svc_num_threads = 2;
  FLAGS_generic_svc_num_threads = 2;

  FLAGS_tablet_server_svc_num_threads = 8;
  FLAGS_ts_admin_svc_num_threads = 2;
  FLAGS_ts_consensus_svc_num_threads = 8;
  FLAGS_ts_remote_bootstrap_svc_num_threads = 2;

  // Limit number of transaction table tablets to help avoid timeouts.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_transaction_table_num_tablets) =
      NumTabletsPerTransactionTable(options_);

  // We are testing public/private IPs using mini cluster. So set mode to 'cloud'.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_use_private_ip) = "cloud";

  // This dictates the RF of newly created tables.
  SetAtomicFlag(options_.num_tablet_servers >= 3 ? 3 : 1, &FLAGS_replication_factor);
  FLAGS_memstore_size_mb = 16;
  // Default master args to make sure we don't wait to trigger new LB tasks upon master leader
  // failover.
  FLAGS_load_balancer_initial_delay_secs = 0;

  // Masters come up first; tablet servers need their addresses.
  RETURN_NOT_OK_PREPEND(StartMasters(),
                        "Couldn't start distributed masters");

  const bool has_extra_options = !extra_tserver_options.empty();
  if (has_extra_options && extra_tserver_options.size() != options_.num_tablet_servers) {
    return STATUS_SUBSTITUTE(InvalidArgument, "num tserver options: $0 doesn't match with num "
        "tservers: $1", extra_tserver_options.size(), options_.num_tablet_servers);
  }

  for (size_t ts_idx = 0; ts_idx < options_.num_tablet_servers; ++ts_idx) {
    if (has_extra_options) {
      RETURN_NOT_OK_PREPEND(AddTabletServer(extra_tserver_options[ts_idx]),
                            Substitute("Error adding TS $0", ts_idx));
    } else {
      RETURN_NOT_OK_PREPEND(AddTabletServer(),
                            Substitute("Error adding TS $0", ts_idx));
    }
  }

  RETURN_NOT_OK_PREPEND(WaitForTabletServerCount(options_.num_tablet_servers),
                        "Waiting for tablet servers to start");

  running_ = true;
  return Status::OK();
}
230
231
398
// Creates and starts options_.num_masters distributed masters, then waits for the catalog
// manager of every master to initialize.
//
// If any step fails, every master created so far is shut down before the error is returned.
Status MiniCluster::StartMasters() {
  CHECK_GE(master_rpc_ports_.size(), options_.num_masters);
  EnsurePortsAllocated();

  LOG(INFO) << "Creating distributed mini masters. RPC ports: "
            << JoinInts(master_rpc_ports_, ", ");

  if (mini_masters_.size() < options_.num_masters) {
    mini_masters_.resize(options_.num_masters);
  }

  // Roll back partially started masters on any early return below.
  bool started = false;
  auto se = ScopeExit([this, &started] {
    if (!started) {
      for (const auto& master : mini_masters_) {
        if (master) {
          master->Shutdown();
        }
      }
    }
  });

  for (size_t i = 0; i < options_.num_masters; i++) {
    mini_masters_[i] = std::make_shared<MiniMaster>(
        options_.master_env, GetMasterFsRoot(i), master_rpc_ports_[i], master_web_ports_[i], i);
    auto status = mini_masters_[i]->StartDistributedMaster(master_rpc_ports_);
    LOG_IF(INFO, !status.ok()) << "Failed to start master: " << status;
    RETURN_NOT_OK_PREPEND(status, Substitute("Couldn't start follower $0", i));
    VLOG(1) << "Started MiniMaster with UUID " << mini_masters_[i]->permanent_uuid()
            << " at index " << i;
  }

  // Use a plain index here: the previous code incremented the counter inside the LOG statement,
  // so the error message below reported the index of the *next* master.
  for (size_t i = 0; i < mini_masters_.size(); ++i) {
    LOG(INFO) << "Waiting to initialize catalog manager on master " << i;
    RETURN_NOT_OK_PREPEND(mini_masters_[i]->WaitForCatalogManagerInit(),
                          Substitute("Could not initialize catalog manager on master $0", i));
  }
  started = true;
  return Status::OK();
}
271
272
0
// Starts the cluster and then waits for each tablet server, in order, to report that it has
// started. Returns the first error encountered.
Status MiniCluster::StartSync() {
  RETURN_NOT_OK(Start());
  for (size_t ts_idx = 0; ts_idx < mini_tablet_servers_.size(); ++ts_idx) {
    RETURN_NOT_OK_PREPEND(mini_tablet_servers_[ts_idx]->WaitStarted(),
                          Substitute("TabletServer $0 failed to start.", ts_idx));
  }
  return Status::OK();
}
282
283
0
// Restarts every tablet server and then every master, waiting for each one to come back before
// moving on, and finally waits for all tablet servers to register with the leader master.
// Restart failures are fatal (CHECK_OK); only the final registration wait returns an error.
Status MiniCluster::RestartSync() {
  const string banner(80, '-');
  LOG(INFO) << banner;
  LOG(INFO) << __FUNCTION__;
  LOG(INFO) << banner;

  LOG(INFO) << "Restart tablet server(s)...";
  for (auto& ts : mini_tablet_servers_) {
    CHECK_OK(ts->Restart());
    CHECK_OK(ts->WaitStarted());
  }

  LOG(INFO) << "Restart master server(s)...";
  for (auto& master : mini_masters_) {
    LOG(INFO) << "Restarting master " << master->permanent_uuid();
    // Tracks the restart as a long operation with a 5s threshold.
    LongOperationTracker long_operation_tracker("Master restart", 5s);
    CHECK_OK(master->Restart());
    LOG(INFO) << "Waiting for catalog manager at " << master->permanent_uuid();
    CHECK_OK(master->WaitForCatalogManagerInit());
  }

  LOG(INFO) << banner;
  LOG(INFO) << __FUNCTION__ << " done";
  LOG(INFO) << banner;

  RETURN_NOT_OK_PREPEND(WaitForAllTabletServers(),
                        "Waiting for tablet servers to start");
  running_ = true;
  return Status::OK();
}
310
311
657
// Creates, configures and starts one more tablet server using `extra_opts`, and appends it to
// the cluster on success.
//
// Returns IllegalState when there are no masters, or the error from starting the tablet server.
Status MiniCluster::AddTabletServer(const tserver::TabletServerOptions& extra_opts) {
  if (mini_masters_.empty()) {
    return STATUS(IllegalState, "Master not yet initialized");
  }
  auto ts_index = mini_tablet_servers_.size();

  // Make sure RPC/web ports exist for the tablet server being added.
  EnsurePortsAllocated(0 /* num_masters (will pick default) */, ts_index + 1);
  const uint16_t rpc_port = tserver_rpc_ports_[ts_index];

  std::shared_ptr<MiniTabletServer> new_ts;
  if (options_.num_drives != 1) {
    // Multi-drive layout: one directory per drive, passed for both directory lists.
    std::vector<std::string> drive_dirs;
    for (int drive = 0; drive < options_.num_drives; ++drive) {
      drive_dirs.push_back(GetTabletServerDrive(ts_index, drive));
    }
    new_ts = std::make_shared<MiniTabletServer>(
          drive_dirs, drive_dirs, rpc_port, extra_opts, ts_index);
  } else {
    new_ts = std::make_shared<MiniTabletServer>(
          GetTabletServerFsRoot(ts_index), rpc_port, extra_opts, ts_index);
  }

  // Collect, per master, its bound RPC address followed by its broadcast addresses.
  auto master_addresses = std::make_shared<server::MasterAddresses>();
  for (const auto& master : mini_masters_) {
    master_addresses->push_back({HostPort(master->bound_rpc_addr())});
    for (const auto& broadcast_hp : master->master()->opts().broadcast_addresses) {
      master_addresses->back().push_back(broadcast_hp);
    }
  }

  new_ts->options()->master_addresses_flag = server::MasterAddressesToString(*master_addresses);
  new_ts->options()->SetMasterAddresses(master_addresses);
  new_ts->options()->webserver_opts.port = tserver_web_ports_[ts_index];
  // Environment overrides are applied only when set in the cluster options.
  if (options_.ts_env) {
    new_ts->options()->env = options_.ts_env;
  }
  if (options_.ts_rocksdb_env) {
    new_ts->options()->rocksdb_env = options_.ts_rocksdb_env;
  }

  RETURN_NOT_OK(new_ts->Start());
  mini_tablet_servers_.push_back(new_ts);
  return Status::OK();
}
355
356
655
// Adds a tablet server built from freshly created default tablet server options.
Status MiniCluster::AddTabletServer() {
  auto default_options = tserver::TabletServerOptions::CreateTabletServerOptions();
  RETURN_NOT_OK(default_options);
  return AddTabletServer(*default_options);
}
361
362
namespace {
363
364
// Read-modify-write of the cluster config: fetches the current config from `catalog_manager`,
// lets `config_changer` mutate it, and writes the result back.
Status ChangeClusterConfig(
    master::CatalogManagerIf* catalog_manager,
    std::function<void(SysClusterConfigEntryPB*)> config_changer) {
  GetMasterClusterConfigResponsePB current_config;
  RETURN_NOT_OK(catalog_manager->GetClusterConfig(&current_config));

  // Move the fetched config into the change request and edit it in place.
  ChangeMasterClusterConfigRequestPB request;
  *request.mutable_cluster_config() = std::move(*current_config.mutable_cluster_config());
  config_changer(request.mutable_cluster_config());

  ChangeMasterClusterConfigResponsePB response;
  return catalog_manager->SetClusterConfig(&request, &response);
}
379
380
} // namespace
381
382
0
// Appends the bound RPC host/port of `ts` to the server blacklist in the cluster config held by
// the leader master.
Status MiniCluster::AddTServerToBlacklist(const MiniTabletServer& ts) {
  const auto* leader_master = VERIFY_RESULT(GetLeaderMiniMaster());

  auto add_to_blacklist = [&ts](SysClusterConfigEntryPB* config) {
    // Add tserver to blacklist.
    HostPortPB* entry = config->mutable_server_blacklist()->mutable_hosts()->Add();
    entry->set_host(ts.bound_rpc_addr().address().to_string());
    entry->set_port(ts.bound_rpc_addr().port());
  };
  RETURN_NOT_OK(ChangeClusterConfig(&leader_master->catalog_manager(), add_to_blacklist));

  LOG(INFO) << "TServer " << ts.server()->permanent_uuid() << " at "
            << ts.bound_rpc_addr().address().to_string() << ":" << ts.bound_rpc_addr().port()
            << " was added to the blacklist";

  return Status::OK();
}
399
400
0
// Clears both the server blacklist and the leader blacklist in the cluster config held by the
// leader master.
Status MiniCluster::ClearBlacklist() {
  const auto* leader_master = VERIFY_RESULT(GetLeaderMiniMaster());

  auto clear_blacklists = [](SysClusterConfigEntryPB* config) {
    config->mutable_server_blacklist()->Clear();
    config->mutable_leader_blacklist()->Clear();
  };
  RETURN_NOT_OK(ChangeClusterConfig(&leader_master->catalog_manager(), clear_blacklists));

  LOG(INFO) << "Blacklist has been cleared";

  return Status::OK();
}
413
414
0
// Returns the bound RPC addresses of all masters joined with ",".
string MiniCluster::GetMasterAddresses() const {
  string joined;
  for (const auto& master : mini_masters_) {
    // A separator is only inserted once something has been accumulated.
    if (!joined.empty()) {
      joined += ",";
    }
    joined += master->bound_rpc_addr_str();
  }
  return joined;
}
424
425
0
// Returns the bound HTTP addresses of all tablet servers joined with ",".
string MiniCluster::GetTserverHTTPAddresses() const {
  string joined;
  for (const auto& ts : mini_tablet_servers_) {
    // A separator is only inserted once something has been accumulated.
    if (!joined.empty()) {
      joined += ",";
    }
    joined += ts->bound_http_addr_str();
  }
  return joined;
}
435
436
6.21k
ssize_t MiniCluster::LeaderMasterIdx() {
437
6.21k
  Stopwatch sw;
438
6.21k
  sw.start();
439
167k
  while (sw.elapsed().wall_seconds() < kMasterLeaderElectionWaitTimeSeconds) {
440
328k
    for (size_t i = 0; i < mini_masters_.size(); i++) {
441
167k
      MiniMaster* master = mini_master(i);
442
167k
      if (master->master() == nullptr || master->master()->IsShutdown()) {
443
0
        continue;
444
0
      }
445
167k
      SCOPED_LEADER_SHARED_LOCK(l, master->master()->catalog_manager_impl());
446
167k
      if (l.catalog_status().ok() && l.leader_status().ok()) {
447
6.21k
        return i;
448
6.21k
      }
449
167k
    }
450
161k
    SleepFor(MonoDelta::FromMilliseconds(1));
451
161k
  }
452
0
  LOG(ERROR) << "No leader master elected after " << kMasterLeaderElectionWaitTimeSeconds
453
0
             << " seconds.";
454
0
  return -1;
455
6.21k
}
456
457
6.21k
// Returns the current leader master, or TimedOut when LeaderMasterIdx() found none.
Result<MiniMaster*> MiniCluster::GetLeaderMiniMaster() {
  const auto leader_idx = LeaderMasterIdx();
  if (leader_idx != -1) {
    return mini_master(leader_idx);
  }
  return STATUS(TimedOut, "No leader master has been elected");
}
464
465
109
void MiniCluster::Shutdown() {
466
109
  if (!running_)
467
88
    return;
468
469
21
  for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) {
470
21
    tablet_server->Shutdown();
471
21
  }
472
21
  mini_tablet_servers_.clear();
473
474
21
  for (shared_ptr<MiniMaster>& master_server : mini_masters_) {
475
21
    master_server->Shutdown();
476
21
    master_server.reset();
477
21
  }
478
21
  mini_masters_.clear();
479
480
21
  running_ = false;
481
21
}
482
483
0
// Flushes tablets on every tablet server with the given mode and flags; stops at the first error.
Status MiniCluster::FlushTablets(tablet::FlushMode mode, tablet::FlushFlags flags) {
  for (const auto& ts : mini_tablet_servers_) {
    RETURN_NOT_OK(ts->FlushTablets(mode, flags));
  }
  return Status::OK();
}
489
490
0
// Compacts tablets on every tablet server; stops at the first error.
Status MiniCluster::CompactTablets() {
  for (const auto& ts : mini_tablet_servers_) {
    RETURN_NOT_OK(ts->CompactTablets());
  }
  return Status::OK();
}
496
497
0
// Calls SwitchMemtables() on every tablet server; stops at the first error.
Status MiniCluster::SwitchMemtables() {
  for (const auto& ts : mini_tablet_servers_) {
    RETURN_NOT_OK(ts->SwitchMemtables());
  }
  return Status::OK();
}
503
504
0
// Calls CleanTabletLogs() on every tablet server; stops at the first error.
Status MiniCluster::CleanTabletLogs() {
  for (const auto& ts : mini_tablet_servers_) {
    RETURN_NOT_OK(ts->CleanTabletLogs());
  }
  return Status::OK();
}
510
511
0
void MiniCluster::ShutdownMasters() {
512
0
  for (shared_ptr<MiniMaster>& master_server : mini_masters_) {
513
0
    master_server->Shutdown();
514
0
    master_server.reset();
515
0
  }
516
0
}
517
518
173k
// Returns the master at `idx` (may be null if that slot is empty). CHECK-fails when `idx` is out
// of range. `idx` is unsigned, so no lower-bound check is needed.
MiniMaster* MiniCluster::mini_master(size_t idx) {
  CHECK_LT(idx, mini_masters_.size()) << "Master idx must be < num masters started";
  return mini_masters_[idx].get();
}
523
524
40
// Returns the tablet server at `idx`. CHECK-fails when `idx` is out of range. `idx` is unsigned,
// so no lower-bound check is needed.
MiniTabletServer* MiniCluster::mini_tablet_server(size_t idx) {
  CHECK_LT(idx, mini_tablet_servers_.size()) << "TabletServer idx must be < 'num_ts_started_'";
  return mini_tablet_servers_[idx].get();
}
529
530
0
// Returns the tablet server whose permanent UUID equals `uuid`, or nullptr when none matches.
// Tablet servers without a server object are ignored.
MiniTabletServer* MiniCluster::find_tablet_server(const std::string& uuid) {
  for (const auto& candidate : mini_tablet_servers_) {
    if (candidate->server() &&
        candidate->server()->instance_pb().permanent_uuid() == uuid) {
      return candidate.get();
    }
  }
  return nullptr;
}
541
542
464
// Returns "<fs_root>/master-<idx + 1>-root" (directory names are 1-based).
string MiniCluster::GetMasterFsRoot(size_t idx) {
  const auto dir_name = Substitute("master-$0-root", idx + 1);
  return JoinPathSegments(fs_root_, dir_name);
}
545
546
647
// Returns "<fs_root>/ts-<idx + 1>-root" (directory names are 1-based).
string MiniCluster::GetTabletServerFsRoot(size_t idx) {
  const auto dir_name = Substitute("ts-$0-root", idx + 1);
  return JoinPathSegments(fs_root_, dir_name);
}
549
550
75
// Returns the directory for drive `drive_index` of tablet server `idx`. With a single drive this
// is the tablet server's fs root; otherwise "<fs_root>/ts-<idx + 1>-drive-<drive_index + 1>".
string MiniCluster::GetTabletServerDrive(size_t idx, int drive_index) {
  if (options_.num_drives != 1) {
    return JoinPathSegments(fs_root_, Substitute("ts-$0-drive-$1", idx + 1, drive_index + 1));
  }
  return GetTabletServerFsRoot(idx);
}
556
557
28
// Returns the tablet manager of the tablet server at `idx`.
tserver::TSTabletManager* MiniCluster::GetTabletManager(size_t idx) {
  auto* ts = mini_tablet_server(idx);
  return ts->server()->tablet_manager();
}
560
561
0
// Returns the tablet peers reported by the tablet manager of the tablet server at `idx`.
std::vector<std::shared_ptr<tablet::TabletPeer>> MiniCluster::GetTabletPeers(size_t idx) {
  auto* tablet_manager = GetTabletManager(idx);
  return tablet_manager->GetTabletPeers();
}
564
565
// Polls the leader master for the locations of `tablet_id` until the expected replica count is
// observed, for at most kTabletReportWaitTimeSeconds.
//
// Success means either: locations are stale and expected_count == 0, or locations are not stale
// and contain exactly expected_count replicas. `locations` holds the last response.
// Returns TimedOut otherwise.
Status MiniCluster::WaitForReplicaCount(const string& tablet_id,
                                        int expected_count,
                                        TabletLocationsPB* locations) {
  Stopwatch sw;
  sw.start();
  while (sw.elapsed().wall_seconds() < kTabletReportWaitTimeSeconds) {
    auto leader = GetLeaderMiniMaster();
    if (!leader.ok()) {
      continue;
    }

    locations->Clear();
    Status s = (*leader)->master()->catalog_manager()->GetTabletLocations(tablet_id, locations);
    if (s.ok()) {
      const bool reached = locations->stale()
          ? expected_count == 0
          : locations->replicas_size() == expected_count;
      if (reached) {
        return Status::OK();
      }
    }

    SleepFor(MonoDelta::FromMilliseconds(1));
  }
  return STATUS(TimedOut, Substitute("Tablet $0 never reached expected replica count $1",
                                     tablet_id, expected_count));
}
590
591
0
// Waits until as many tablet servers as the cluster currently has are registered.
Status MiniCluster::WaitForAllTabletServers() {
  const auto expected = num_tablet_servers();
  return WaitForTabletServerCount(expected);
}
594
595
108
// Convenience overload for callers that do not need the descriptors: they are collected into a
// local vector and discarded.
Status MiniCluster::WaitForTabletServerCount(size_t count) {
  vector<shared_ptr<master::TSDescriptor> > ignored_descs;
  return WaitForTabletServerCount(count, &ignored_descs);
}
599
600
// Polls the leader master until exactly `count` tablet servers are registered and each registered
// descriptor matches (same uuid and seqno) a tablet server of this cluster.
//
// `descs` receives the descriptors from the last poll. Returns TimedOut after
// kRegistrationWaitTimeSeconds.
Status MiniCluster::WaitForTabletServerCount(size_t count,
                                             vector<shared_ptr<TSDescriptor> >* descs) {
  Stopwatch sw;
  sw.start();
  while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
    auto leader = GetLeaderMiniMaster();
    if (leader.ok()) {
      (*leader)->ts_manager().GetAllDescriptors(descs);
      if (descs->size() == count) {
        // GetAllDescriptors() may return servers that are no longer online.
        // Do a second step of verification to verify that the descs that we got
        // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
        size_t match_count = 0;
        for (const shared_ptr<TSDescriptor>& desc : *descs) {
          for (const auto& mini_tablet_server : mini_tablet_servers_) {
            const auto& ts = mini_tablet_server->server();
            // A tablet server without a server object cannot match any descriptor.
            if (!ts) {
              continue;
            }
            if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
                ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
              match_count++;
              break;
            }
          }
        }

        if (match_count == count) {
          LOG(INFO) << count << " TS(s) registered with Master after "
                    << sw.elapsed().wall_seconds() << "s";
          return Status::OK();
        }
      }

      YB_LOG_EVERY_N_SECS(INFO, 5) << "Registered: " << AsString(*descs);
    }

    SleepFor(MonoDelta::FromMilliseconds(1));
  }
  return STATUS(TimedOut, Substitute("$0 TS(s) never registered with master", count));
}
638
639
35
// Replaces the builder's master address list with the bound RPC addresses of this cluster's
// masters. CHECK-fails on a null builder or an empty master slot.
void MiniCluster::ConfigureClientBuilder(YBClientBuilder* builder) {
  CHECK_NOTNULL(builder);
  builder->clear_master_server_addrs();
  for (const auto& mini_master : mini_masters_) {
    CHECK(mini_master);
    builder->add_master_server_addr(mini_master->bound_rpc_addr_str());
  }
}
647
648
0
// Returns the bound RPC address of the leader master, or the error from locating the leader.
Result<HostPort> MiniCluster::DoGetLeaderMasterBoundRpcAddr() {
  auto* leader_master = VERIFY_RESULT(GetLeaderMiniMaster());
  return leader_master->bound_rpc_addr();
}
651
652
void MiniCluster::AllocatePortsForDaemonType(
653
    const string daemon_type,
654
    const size_t num_daemons,
655
    const string port_type,
656
5.81k
    std::vector<uint16_t>* ports) {
657
5.81k
  const size_t old_size = ports->size();
658
5.81k
  if (ports->size() < num_daemons) {
659
1.59k
    ports->resize(num_daemons, 0 /* default value */);
660
1.59k
  }
661
8.84k
  for (auto i = old_size; i < num_daemons; ++i) {
662
3.03k
    if ((*ports)[i] == 0) {
663
3.03k
      const uint16_t new_port = port_picker_.AllocateFreePort();
664
3.03k
      (*ports)[i] = new_port;
665
3.03k
      LOG(INFO) << "Using auto-assigned port " << new_port << " for a " << daemon_type
666
3.03k
                << " " << port_type << " port";
667
3.03k
    }
668
3.03k
  }
669
5.81k
}
670
671
1.45k
void MiniCluster::EnsurePortsAllocated(size_t new_num_masters, size_t new_num_tservers) {
672
1.45k
  if (new_num_masters == 0) {
673
1.45k
    new_num_masters = std::max(options_.num_masters, num_masters());
674
1.45k
  }
675
1.45k
  AllocatePortsForDaemonType("master", new_num_masters, "RPC", &master_rpc_ports_);
676
1.45k
  AllocatePortsForDaemonType("master", new_num_masters, "web", &master_web_ports_);
677
678
1.45k
  if (new_num_tservers == 0) {
679
796
    new_num_tservers = std::max(options_.num_tablet_servers, num_tablet_servers());
680
796
  }
681
1.45k
  AllocatePortsForDaemonType("tablet server", new_num_tservers, "RPC", &tserver_rpc_ports_);
682
1.45k
  AllocatePortsForDaemonType("tablet server", new_num_tservers, "web", &tserver_web_ports_);
683
1.45k
}
684
685
// Builds a SkewedClockDeltaChanger for `delta` on the physical clock of `server`.
// The server's clock is down-cast to HybridClock and its physical clock is cast to SkewedClock.
server::SkewedClockDeltaChanger JumpClock(
    server::RpcServerBase* server, std::chrono::milliseconds delta) {
  auto* hybrid_clock = down_cast<server::HybridClock*>(server->clock());
  auto skewed_clock =
      std::static_pointer_cast<server::SkewedClock>(hybrid_clock->physical_clock());
  return server::SkewedClockDeltaChanger(delta, std::move(skewed_clock));
}
691
692
std::vector<server::SkewedClockDeltaChanger> SkewClocks(
693
0
    MiniCluster* cluster, std::chrono::milliseconds clock_skew) {
694
0
  std::vector<server::SkewedClockDeltaChanger> delta_changers;
695
0
  for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) {
696
0
    delta_changers.push_back(JumpClock(cluster->mini_tablet_server(i)->server(), i * clock_skew));
697
0
  }
698
0
  return delta_changers;
699
0
}
700
701
std::vector<server::SkewedClockDeltaChanger> JumpClocks(
702
0
    MiniCluster* cluster, std::chrono::milliseconds delta) {
703
0
  std::vector<server::SkewedClockDeltaChanger> delta_changers;
704
0
  auto num_masters = cluster->num_masters();
705
0
  auto num_tservers = cluster->num_tablet_servers();
706
0
  delta_changers.reserve(num_masters + num_tservers);
707
0
  for (size_t i = 0; i != num_masters; ++i) {
708
0
    delta_changers.push_back(JumpClock(cluster->mini_master(i)->master(), delta));
709
0
  }
710
0
  for (size_t i = 0; i != num_tservers; ++i) {
711
0
    delta_changers.push_back(JumpClock(cluster->mini_tablet_server(i)->server(), delta));
712
0
  }
713
0
  return delta_changers;
714
0
}
715
716
0
// Sends a leader step-down request to the consensus of every tablet peer on every tablet server.
// The first failing step-down is reported via ASSERT_OK.
void StepDownAllTablets(MiniCluster* cluster) {
  for (size_t ts_idx = 0; ts_idx != cluster->num_tablet_servers(); ++ts_idx) {
    for (const auto& tablet_peer : cluster->GetTabletPeers(ts_idx)) {
      consensus::LeaderStepDownRequestPB request;
      consensus::LeaderStepDownResponsePB response;
      request.set_tablet_id(tablet_peer->tablet_id());
      ASSERT_OK(tablet_peer->consensus()->StepDown(&request, &response));
    }
  }
}
726
727
0
// Picks one random leader peer (if any) and sends it a leader step-down request.
void StepDownRandomTablet(MiniCluster* cluster) {
  auto leaders = ListTabletPeers(cluster, ListPeersFilter::kLeaders);
  if (leaders.empty()) {
    return;
  }
  auto chosen = RandomElement(leaders);

  consensus::LeaderStepDownRequestPB request;
  consensus::LeaderStepDownResponsePB response;
  request.set_tablet_id(chosen->tablet_id());
  ASSERT_OK(chosen->consensus()->StepDown(&request, &response));
}
738
739
0
// Returns the ids of all tablets, across all tablet servers, whose metadata reports `table_id`.
// Peers without tablet metadata are skipped.
std::unordered_set<string> ListTabletIdsForTable(MiniCluster* cluster, const string& table_id) {
  std::unordered_set<string> tablet_ids;
  for (const auto& peer : ListTabletPeers(cluster, ListPeersFilter::kAll)) {
    // Other filters in this file treat tablet_metadata() as nullable; do the same here.
    const auto tablet_meta = peer->tablet_metadata();
    if (tablet_meta && tablet_meta->table_id() == table_id) {
      tablet_ids.insert(peer->tablet_id());
    }
  }
  return tablet_ids;
}
748
749
std::unordered_set<string> ListActiveTabletIdsForTable(
750
0
    MiniCluster* cluster, const string& table_id) {
751
0
  std::unordered_set<string> tablet_ids;
752
0
  for (auto peer : ListTableActiveTabletPeers(cluster, table_id)) {
753
0
    tablet_ids.insert(peer->tablet_id());
754
0
  }
755
0
  return tablet_ids;
756
0
}
757
758
0
// Lists tablet peers across the cluster according to `filter`:
//   kAll        - every peer;
//   kLeaders    - peers with a consensus whose leader status is not NOT_LEADER;
//   kNonLeaders - peers with a consensus whose leader status is NOT_LEADER.
// Any other enum value is fatal.
std::vector<tablet::TabletPeerPtr> ListTabletPeers(MiniCluster* cluster, ListPeersFilter filter) {
  switch (filter) {
    case ListPeersFilter::kAll:
      return ListTabletPeers(cluster, [](const auto& peer) { return true; });
    case ListPeersFilter::kLeaders:
      return ListTabletPeers(cluster, [](const auto& peer) {
        auto peer_consensus = peer->shared_consensus();
        return peer_consensus &&
               peer_consensus->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER;
      });
    case ListPeersFilter::kNonLeaders:
      return ListTabletPeers(cluster, [](const auto& peer) {
        auto peer_consensus = peer->shared_consensus();
        return peer_consensus &&
               peer_consensus->GetLeaderStatus() == consensus::LeaderStatus::NOT_LEADER;
      });
  }

  FATAL_INVALID_ENUM_VALUE(ListPeersFilter, filter);
}
776
777
std::vector<tablet::TabletPeerPtr> ListTabletPeers(
778
    MiniCluster* cluster,
779
0
    const std::function<bool(const std::shared_ptr<tablet::TabletPeer>&)>& filter) {
780
0
  std::vector<tablet::TabletPeerPtr> result;
781
782
0
  for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) {
783
0
    auto server = cluster->mini_tablet_server(i)->server();
784
0
    if (!server) { // Server is shut down.
785
0
      continue;
786
0
    }
787
0
    auto peers = server->tablet_manager()->GetTabletPeers();
788
0
    for (const auto& peer : peers) {
789
0
      WARN_NOT_OK(
790
0
          WaitFor(
791
0
              [peer] { return peer->consensus() != nullptr || peer->IsShutdownStarted(); }, 5s,
792
0
              Format("Waiting peer T $0 P $1 ready", peer->tablet_id(), peer->permanent_uuid())),
793
0
          "List tablet peers failure");
794
0
      if (peer->consensus() != nullptr && filter(peer)) {
795
0
        result.push_back(peer);
796
0
      }
797
0
    }
798
0
  }
799
800
0
  return result;
801
0
}
802
803
std::vector<tablet::TabletPeerPtr> ListTableActiveTabletLeadersPeers(
804
0
    MiniCluster* cluster, const TableId& table_id) {
805
0
  return ListTabletPeers(cluster, [&table_id](const auto& peer) {
806
0
    return peer->tablet_metadata() &&
807
0
           peer->tablet_metadata()->table_id() == table_id &&
808
0
           peer->tablet_metadata()->tablet_data_state() !=
809
0
               tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED &&
810
0
           peer->consensus()->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER;
811
0
  });
812
0
}
813
814
std::vector<tablet::TabletPeerPtr> ListTableTabletPeers(
815
0
      MiniCluster* cluster, const TableId& table_id) {
816
0
  return ListTabletPeers(cluster, [table_id](const std::shared_ptr<tablet::TabletPeer>& peer) {
817
0
    return peer->tablet_metadata()->table_id() == table_id;
818
0
  });
819
0
}
820
821
namespace {
822
823
0
bool IsActive(tablet::TabletDataState tablet_data_state) {
824
0
  return tablet_data_state != tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED &&
825
0
         tablet_data_state != tablet::TabletDataState::TABLET_DATA_TOMBSTONED &&
826
0
         tablet_data_state != tablet::TabletDataState::TABLET_DATA_DELETED;
827
0
}
828
829
} // namespace
830
831
std::vector<tablet::TabletPeerPtr> ListTableActiveTabletPeers(
832
0
    MiniCluster* cluster, const TableId& table_id) {
833
0
  std::vector<tablet::TabletPeerPtr> result;
834
0
  for (auto peer : ListTableTabletPeers(cluster, table_id)) {
835
0
    const auto tablet_meta = peer->tablet_metadata();
836
0
    if (IsActive(tablet_meta->tablet_data_state())) {
837
0
      result.push_back(peer);
838
0
    }
839
0
  }
840
0
  return result;
841
0
}
842
843
0
// Returns peers across the whole cluster that have tablet metadata, do not belong to a
// transaction status table, are in an active data state (see IsActive) and whose consensus
// reports a leader status other than NOT_LEADER.
std::vector<tablet::TabletPeerPtr> ListActiveTabletLeadersPeers(MiniCluster* cluster) {
  const auto is_active_leader = [](const auto& peer) {
    const auto meta = peer->tablet_metadata();
    const auto peer_consensus = peer->shared_consensus();
    if (!meta) {
      return false;
    }
    if (meta->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
      return false;
    }
    if (!IsActive(meta->tablet_data_state())) {
      return false;
    }
    return peer_consensus->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER;
  };
  return ListTabletPeers(cluster, is_active_leader);
}
852
853
std::vector<tablet::TabletPeerPtr> ListTableInactiveSplitTabletPeers(
854
0
    MiniCluster* cluster, const TableId& table_id) {
855
0
  std::vector<tablet::TabletPeerPtr> result;
856
0
  for (auto peer : ListTableTabletPeers(cluster, table_id)) {
857
0
    if (peer->tablet()->metadata()->tablet_data_state() ==
858
0
        tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED) {
859
0
      result.push_back(peer);
860
0
    }
861
0
  }
862
0
  return result;
863
0
}
864
865
// Waits up to `timeout` until ListTableActiveTabletLeadersPeers reports exactly
// `num_active_leaders` peers for `table_id`, logging the observed count on every attempt.
// Returns the peers seen by the last attempt, or the wait error.
Result<std::vector<tablet::TabletPeerPtr>> WaitForTableActiveTabletLeadersPeers(
    MiniCluster* cluster, const TableId& table_id,
    const size_t num_active_leaders, const MonoDelta timeout) {
  SCHECK_NOTNULL(cluster);

  std::vector<tablet::TabletPeerPtr> leaders;
  const auto has_expected_leader_count = [&leaders, &table_id, cluster, num_active_leaders] {
    leaders = ListTableActiveTabletLeadersPeers(cluster, table_id);
    LOG(INFO) << "active_leader_peers.size(): " << leaders.size();
    return leaders.size() == num_active_leaders;
  };
  RETURN_NOT_OK(LoggedWaitFor(has_expected_leader_count, timeout, "Waiting for leaders ..."));
  return leaders;
}
878
879
// Waits until `deadline` for exactly one peer of `tablet_id` to report a leader status other
// than NOT_LEADER.
Status WaitUntilTabletHasLeader(
    MiniCluster* cluster, const string& tablet_id, MonoTime deadline) {
  const auto has_single_leader = [cluster, &tablet_id] {
    const auto leaders = ListTabletPeers(cluster, [&tablet_id](auto peer) {
      if (peer->tablet_id() != tablet_id) {
        return false;
      }
      return peer->consensus()->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER;
    });
    return leaders.size() == 1;
  };
  return Wait(has_single_leader, deadline, "Waiting for election in tablet " + tablet_id);
}
889
890
0
// Waits up to `timeout` until the tablet peer of at least one master in `cluster` reports
// LEADER_AND_READY.
CHECKED_STATUS WaitUntilMasterHasLeader(MiniCluster* cluster, MonoDelta timeout) {
  const auto some_master_is_leader = [cluster] {
    size_t master_idx = 0;
    while (master_idx != cluster->num_masters()) {
      const auto master_peer = cluster->mini_master(master_idx)->tablet_peer();
      if (master_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
        return true;
      }
      ++master_idx;
    }
    return false;
  };
  return WaitFor(some_master_is_leader, timeout, "Waiting for master leader");
}
901
902
// Waits up to `duration` until the cluster has exactly one leader peer and that peer is `leader`.
// `description` is forwarded to WaitFor.
Status WaitForLeaderOfSingleTablet(
    MiniCluster* cluster, tablet::TabletPeerPtr leader, MonoDelta duration,
    const std::string& description) {
  const auto is_sole_leader = [cluster, &leader] {
    const auto current_leaders = ListTabletPeers(cluster, ListPeersFilter::kLeaders);
    if (current_leaders.size() != 1) {
      return false;
    }
    return current_leaders[0] == leader;
  };
  return WaitFor(is_sole_leader, duration, description);
}
910
911
// Asks the consensus of `leader` to step down in favor of `new_leader_uuid`. The force flag is
// set on the request only when `force_step_down` is true. Returns the StepDown call's error, a
// RuntimeError when the response carries an error, and OK otherwise.
Status StepDown(
    tablet::TabletPeerPtr leader, const std::string& new_leader_uuid,
    ForceStepDown force_step_down) {
  consensus::LeaderStepDownRequestPB request;
  request.set_tablet_id(leader->tablet_id());
  request.set_new_leader_uuid(new_leader_uuid);
  if (force_step_down) {
    request.set_force_step_down(true);
  }

  consensus::LeaderStepDownResponsePB response;
  RETURN_NOT_OK(leader->consensus()->StepDown(&request, &response));
  if (!response.has_error()) {
    return Status::OK();
  }
  return STATUS_FORMAT(RuntimeError, "Step down failed: $0", response);
}
927
928
std::thread RestartsThread(
929
0
    MiniCluster* cluster, CoarseDuration interval, std::atomic<bool>* stop_flag) {
930
0
  return std::thread([cluster, interval, stop_flag] {
931
0
    CDSAttacher attacher;
932
0
    SetFlagOnExit set_stop_on_exit(stop_flag);
933
0
    int it = 0;
934
0
    while (!stop_flag->load(std::memory_order_acquire)) {
935
0
      std::this_thread::sleep_for(interval);
936
0
      ASSERT_OK(cluster->mini_tablet_server(++it % cluster->num_tablet_servers())->Restart());
937
0
    }
938
0
  });
939
0
}
940
941
0
// Waits up to `timeout` until every listed tablet peer is in RUNNING state and the number of
// peers equals (number of distinct tablets) * (number of tablet servers), i.e. each tablet has a
// replica on every tablet server.
Status WaitAllReplicasReady(MiniCluster* cluster, MonoDelta timeout) {
  const auto all_replicas_running = [cluster] {
    std::unordered_set<std::string> distinct_tablets;
    const auto all_peers = ListTabletPeers(cluster, ListPeersFilter::kAll);
    for (const auto& replica : all_peers) {
      const bool running = replica->state() == tablet::RaftGroupStatePB::RUNNING;
      if (!running) {
        return false;
      }
      distinct_tablets.insert(replica->tablet_id());
    }
    // The mini cluster is expected to place one replica of each tablet on every tablet server.
    const auto expected_replicas = distinct_tablets.size() * cluster->num_tablet_servers();
    return expected_replicas == all_peers.size();
  };
  return WaitFor(all_replicas_running, timeout, "Wait all replicas to be ready");
}
955
956
0
// Waits up to `timeout` until the latest log entry of every listed tablet peer has an index of
// at least `index` and the number of peers equals
// (number of distinct tablets) * (number of tablet servers).
Status WaitAllReplicasHaveIndex(MiniCluster* cluster, int64_t index, MonoDelta timeout) {
  const auto all_replicas_reached_index = [cluster, index] {
    std::unordered_set<std::string> distinct_tablets;
    const auto all_peers = ListTabletPeers(cluster, ListPeersFilter::kAll);
    for (const auto& replica : all_peers) {
      const bool behind = replica->GetLatestLogEntryOpId().index < index;
      if (behind) {
        return false;
      }
      distinct_tablets.insert(replica->tablet_id());
    }
    // The mini cluster is expected to place one replica of each tablet on every tablet server.
    const auto expected_replicas = distinct_tablets.size() * cluster->num_tablet_servers();
    return expected_replicas == all_peers.size();
  };
  return WaitFor(
      all_replicas_reached_index, timeout, "Wait for all replicas to have a specific Raft index");
}
970
971
// Appends `value` to `*collection` unless it compares equal to nullptr.
template <class Collection>
void PushBackIfNotNull(const typename Collection::value_type& value, Collection* collection) {
  const bool has_value = value != nullptr;
  if (!has_value) {
    return;
  }
  collection->push_back(value);
}
977
978
0
// Collects the non-null RocksDB instances of every tablet peer in `cluster`: the regular DB of
// each tablet and, when `include_intents` is true, its intents DB as well.
//
// Peers without a tablet object are skipped instead of being dereferenced (CountIntents in this
// file treats a null tablet as a possible state as well).
std::vector<rocksdb::DB*> GetAllRocksDbs(MiniCluster* cluster, bool include_intents) {
  std::vector<rocksdb::DB*> dbs;
  for (auto& peer : ListTabletPeers(cluster, ListPeersFilter::kAll)) {
    const auto* tablet = peer->tablet();
    if (tablet == nullptr) {
      continue;
    }
    PushBackIfNotNull(tablet->TEST_db(), &dbs);
    if (include_intents) {
      PushBackIfNotNull(tablet->TEST_intents_db(), &dbs);
    }
  }
  return dbs;
}
989
990
0
// Sums TEST_NumTotalRunningCompactions() over every RocksDB instance returned by
// GetAllRocksDbs(cluster).
int NumTotalRunningCompactions(MiniCluster* cluster) {
  const auto all_dbs = GetAllRocksDbs(cluster);
  int running_compactions = 0;
  for (auto* db : all_dbs) {
    auto* db_impl = down_cast<rocksdb::DBImpl*>(db);
    running_compactions += db_impl->TEST_NumTotalRunningCompactions();
  }
  return running_compactions;
}
997
998
0
// Sums TEST_NumRunningFlushes() over every RocksDB instance returned by GetAllRocksDbs(cluster).
int NumRunningFlushes(MiniCluster* cluster) {
  const auto all_dbs = GetAllRocksDbs(cluster);
  int running_flushes = 0;
  for (auto* db : all_dbs) {
    auto* db_impl = down_cast<rocksdb::DBImpl*>(db);
    running_flushes += db_impl->TEST_NumRunningFlushes();
  }
  return running_flushes;
}
1005
1006
// Looks up `table_name` in the catalog manager of the current leader master. Fails if no leader
// master can be obtained or if the catalog manager lookup fails.
Result<scoped_refptr<master::TableInfo>> FindTable(
    MiniCluster* cluster, const client::YBTableName& table_name) {
  auto leader_master = VERIFY_RESULT(cluster->GetLeaderMiniMaster());

  master::TableIdentifierPB table_identifier;
  table_name.SetIntoTableIdentifierPB(&table_identifier);

  return leader_master->catalog_manager().FindTable(table_identifier);
}
1013
1014
0
// Polls the leader master until initdb has finished.
//
// Returns OK once IsInitDbDone reports done, RuntimeError if it reports an initdb error, and
// TimedOut if neither happens within the timeout (600s for regular builds, 1800s for sanitizer
// builds, as selected by RegularBuildVsSanitizers).
//
// Every unsuccessful attempt, including "no leader master yet" and a failed IsInitDbDone call,
// is followed by a pause before retrying, so the loop never spins or floods the log.
Status WaitForInitDb(MiniCluster* cluster) {
  const auto kTimeout = RegularBuildVsSanitizers(600s, 1800s);
  const auto kRetryInterval = 500ms;
  const auto deadline = CoarseMonoClock::now() + kTimeout;

  while (CoarseMonoClock::now() <= deadline) {
    auto leader_mini_master = cluster->GetLeaderMiniMaster();
    if (leader_mini_master.ok()) {
      auto& catalog_manager = (*leader_mini_master)->catalog_manager();
      master::IsInitDbDoneRequestPB req;
      master::IsInitDbDoneResponsePB resp;
      auto status = catalog_manager.IsInitDbDone(&req, &resp);
      if (status.ok()) {
        if (resp.has_initdb_error()) {
          return STATUS_FORMAT(RuntimeError, "Init DB failed: $0", resp.initdb_error());
        }
        if (resp.done()) {
          return Status::OK();
        }
      } else {
        LOG(INFO) << "IsInitDbDone failure: " << status;
      }
    }
    // Reached on every non-terminal outcome: back off before the next attempt.
    std::this_thread::sleep_for(kRetryInterval);
  }

  return STATUS_FORMAT(TimedOut, "Unable to init db in $0", kTimeout);
}
1041
1042
0
// Returns the total number of intents reported by the transaction participants of the tablet
// peers in `cluster`. Peers without a tablet or without a transaction participant are ignored,
// as are peers rejected by a non-empty `filter`. Each peer that still has intents is logged
// together with its intent and transaction counts.
size_t CountIntents(MiniCluster* cluster, const TabletPeerFilter& filter) {
  size_t total_intents = 0;
  for (const auto& peer : ListTabletPeers(cluster, ListPeersFilter::kAll)) {
    auto participant = peer->tablet() ? peer->tablet()->transaction_participant() : nullptr;
    if (!participant) {
      continue;
    }
    const bool rejected_by_filter = filter && !filter(peer.get());
    if (rejected_by_filter) {
      continue;
    }
    // first: number of intents, second: number of transactions (as used in the log line below).
    const auto counts = participant->TEST_CountIntents();
    if (!counts.first) {
      continue;
    }
    total_intents += counts.first;
    LOG(INFO) << Format("T $0 P $1: Intents present: $2, transactions: $3", peer->tablet_id(),
                        peer->permanent_uuid(), counts.first, counts.second);
  }
  return total_intents;
}
1062
1063
0
// Returns the first running mini tablet server whose server reports LeaderAndReady for
// `tablet_id`, or nullptr when there is none. Shut down servers are skipped.
MiniTabletServer* FindTabletLeader(MiniCluster* cluster, const TabletId& tablet_id) {
  for (size_t ts_idx = 0; ts_idx != cluster->num_tablet_servers(); ++ts_idx) {
    auto mini_ts = cluster->mini_tablet_server(ts_idx);
    const bool is_running = static_cast<bool>(mini_ts->server());
    if (is_running && mini_ts->server()->LeaderAndReady(tablet_id)) {
      return mini_ts;
    }
  }

  return nullptr;
}
1076
1077
0
// Shuts down every mini tablet server of `cluster`, in index order.
void ShutdownAllTServers(MiniCluster* cluster) {
  size_t ts_idx = 0;
  while (ts_idx != cluster->num_tablet_servers()) {
    cluster->mini_tablet_server(ts_idx)->Shutdown();
    ++ts_idx;
  }
}
1082
1083
0
// Starts every mini tablet server of `cluster` in index order, stopping at and returning the
// first failure.
Status StartAllTServers(MiniCluster* cluster) {
  size_t ts_idx = 0;
  while (ts_idx != cluster->num_tablet_servers()) {
    RETURN_NOT_OK(cluster->mini_tablet_server(ts_idx)->Start());
    ++ts_idx;
  }

  return Status::OK();
}
1090
1091
0
// Shuts down every mini master of `cluster`, in index order.
void ShutdownAllMasters(MiniCluster* cluster) {
  size_t master_idx = 0;
  while (master_idx != cluster->num_masters()) {
    cluster->mini_master(master_idx)->Shutdown();
    ++master_idx;
  }
}
1096
1097
0
// Starts every mini master of `cluster` in index order, stopping at and returning the first
// failure.
Status StartAllMasters(MiniCluster* cluster) {
  size_t master_idx = 0;
  while (master_idx != cluster->num_masters()) {
    RETURN_NOT_OK(cluster->mini_master(master_idx)->Start());
    ++master_idx;
  }

  return Status::OK();
}
1104
1105
void SetupConnectivity(
1106
0
    rpc::Messenger* messenger, const IpAddress& address, Connectivity connectivity) {
1107
0
  switch (connectivity) {
1108
0
    case Connectivity::kOn:
1109
0
      messenger->RestoreConnectivityTo(address);
1110
0
      return;
1111
0
    case Connectivity::kOff:
1112
0
      messenger->BreakConnectivityTo(address);
1113
0
      return;
1114
0
  }
1115
0
  FATAL_INVALID_ENUM_VALUE(Connectivity, connectivity);
1116
0
}
1117
1118
// Applies `connectivity` in both directions between the servers with indexes `idx1` and `idx2`.
//
// For each of the two indexes taken as the source, the messenger of the master and of the tablet
// server at that index (when the index is below the respective server count) is updated for the
// RPC address of the other index, once for each value of server::Private. Returns an error if an
// address cannot be resolved.
Status SetupConnectivity(
    MiniCluster* cluster, size_t idx1, size_t idx2, Connectivity connectivity) {
  for (auto from_idx : {idx1, idx2}) {
    // The peer on the other side of the link.
    const auto to_idx = (from_idx == idx1) ? idx2 : idx1;
    for (auto type : {server::Private::kFalse, server::Private::kTrue}) {
      // TEST_RpcAddress is 1-indexed; we expect from_idx/to_idx to be 0-indexed.
      const auto host = TEST_RpcAddress(to_idx + 1, type);
      auto address = VERIFY_RESULT(HostToAddress(host));

      const bool has_master = from_idx < cluster->num_masters();
      if (has_master) {
        auto* master_messenger = cluster->mini_master(from_idx)->master()->messenger();
        SetupConnectivity(master_messenger, address, connectivity);
      }

      const bool has_tserver = from_idx < cluster->num_tablet_servers();
      if (has_tserver) {
        auto* tserver_messenger = cluster->mini_tablet_server(from_idx)->server()->messenger();
        SetupConnectivity(tserver_messenger, address, connectivity);
      }
    }
  }

  return Status::OK();
}
1138
1139
0
// Breaks the connectivity between the servers with indexes `idx1` and `idx2`; shorthand for
// SetupConnectivity with Connectivity::kOff.
Status BreakConnectivity(MiniCluster* cluster, size_t idx1, size_t idx2) {
  const auto disconnected = Connectivity::kOff;
  return SetupConnectivity(cluster, idx1, idx2, disconnected);
}
1142
1143
0
// Returns the index of the first running tablet server whose tablet manager reports a non-zero
// leader count, or NotFound when no such server exists. Shut down servers are skipped.
Result<size_t> ServerWithLeaders(MiniCluster* cluster) {
  for (size_t ts_idx = 0; ts_idx != cluster->num_tablet_servers(); ++ts_idx) {
    auto* tserver = cluster->mini_tablet_server(ts_idx)->server();
    if (tserver && tserver->tablet_manager()->GetLeaderCount() != 0) {
      return ts_idx;
    }
  }
  return STATUS(NotFound, "No tablet server with leaders");
}
1156
1157
0
// Sets FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec to `bytes_per_sec` and pushes the
// new limit into the rate limiter of the regular and intents RocksDB of every existing tablet.
//
// Peers without a tablet object and DBs without a configured rate limiter are skipped instead of
// being dereferenced.
void SetCompactFlushRateLimitBytesPerSec(MiniCluster* cluster, const size_t bytes_per_sec) {
  LOG(INFO) << "Setting FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec to: " << bytes_per_sec
            << " and updating compact/flush rate in existing tablets";
  FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec = bytes_per_sec;
  for (auto& tablet_peer : ListTabletPeers(cluster, ListPeersFilter::kAll)) {
    auto tablet = tablet_peer->shared_tablet();
    if (!tablet) {
      continue;
    }
    for (auto* db : { tablet->TEST_db(), tablet->TEST_intents_db() }) {
      if (!db) {
        continue;
      }
      // NOTE(review): presumably a DB opened without rate limiting has a null rate_limiter;
      // guard so that such a DB is left untouched rather than crashing the test process.
      const auto& rate_limiter = db->GetDBOptions().rate_limiter;
      if (rate_limiter) {
        rate_limiter->SetBytesPerSecond(bytes_per_sec);
      }
    }
  }
}
1170
1171
// Records the last committed op index of every current leader peer, then waits until `deadline`
// for each non-leader peer to commit at least the index recorded for its tablet.
//
// Returns IllegalState if a non-leader peer belongs to a tablet for which no leader was found,
// or the wait error if a peer does not catch up in time.
CHECKED_STATUS WaitAllReplicasSynchronizedWithLeader(
    MiniCluster* cluster, CoarseTimePoint deadline) {
  std::unordered_map<TabletId, int64_t> leader_committed_idx;
  const auto leader_peers = ListTabletPeers(cluster, ListPeersFilter::kLeaders);
  for (const auto& leader : leader_peers) {
    const auto committed = leader->consensus()->GetLastCommittedOpId().index;
    leader_committed_idx.emplace(leader->tablet_id(), committed);
    LOG(INFO) << "Committed op id for " << leader->tablet_id() << ": " << committed;
  }

  const auto follower_peers = ListTabletPeers(cluster, ListPeersFilter::kNonLeaders);
  for (const auto& follower : follower_peers) {
    const auto found = leader_committed_idx.find(follower->tablet_id());
    if (found == leader_committed_idx.end()) {
      return STATUS_FORMAT(IllegalState, "Unknown committed op id for $0", follower->tablet_id());
    }

    const auto caught_up = [idx = found->second, follower]() {
      return follower->consensus()->GetLastCommittedOpId().index >= idx;
    };
    const auto description = Format(
        "Wait T $0 P $1 commit $2",
        follower->tablet_id(), follower->permanent_uuid(), found->second);
    RETURN_NOT_OK(Wait(caught_up, deadline, description));
  }
  return Status::OK();
}
1194
1195
}  // namespace yb