YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/integration-tests/mini_cluster.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/integration-tests/mini_cluster.h"
34
35
#include <algorithm>
36
37
#include "yb/client/client.h"
38
#include "yb/client/yb_table_name.h"
39
40
#include "yb/consensus/consensus.h"
41
#include "yb/consensus/consensus.pb.h"
42
43
#include "yb/gutil/casts.h"
44
#include "yb/gutil/strings/join.h"
45
#include "yb/gutil/strings/substitute.h"
46
47
#include "yb/master/catalog_entity_info.h"
48
#include "yb/master/catalog_manager_if.h"
49
#include "yb/master/master.h"
50
#include "yb/master/master_admin.pb.h"
51
#include "yb/master/master_client.pb.h"
52
#include "yb/master/master_cluster.pb.h"
53
#include "yb/master/mini_master.h"
54
#include "yb/master/scoped_leader_shared_lock.h"
55
#include "yb/master/ts_descriptor.h"
56
#include "yb/master/ts_manager.h"
57
58
#include "yb/rocksdb/db/db_impl.h"
59
#include "yb/rocksdb/rate_limiter.h"
60
61
#include "yb/rpc/messenger.h"
62
63
#include "yb/server/hybrid_clock.h"
64
#include "yb/server/skewed_clock.h"
65
66
#include "yb/tablet/tablet.h"
67
#include "yb/tablet/tablet_metadata.h"
68
#include "yb/tablet/tablet_peer.h"
69
#include "yb/tablet/transaction_participant.h"
70
71
#include "yb/tserver/mini_tablet_server.h"
72
#include "yb/tserver/tablet_server.h"
73
#include "yb/tserver/ts_tablet_manager.h"
74
75
#include "yb/util/debug/long_operation_tracker.h"
76
#include "yb/util/format.h"
77
#include "yb/util/path_util.h"
78
#include "yb/util/random_util.h"
79
#include "yb/util/scope_exit.h"
80
#include "yb/util/status.h"
81
#include "yb/util/status_format.h"
82
#include "yb/util/status_log.h"
83
#include "yb/util/stopwatch.h"
84
#include "yb/util/test_thread_holder.h"
85
#include "yb/util/test_util.h"
86
#include "yb/util/tsan_util.h"
87
88
using namespace std::literals;
89
using strings::Substitute;
90
91
// Base directory for master/tserver data. GetFsRoot() uses it when it is non-empty and
// MiniClusterOptions::data_root is empty.
DEFINE_string(mini_cluster_base_dir, /* default_value= */ "",
              "Directory for master/ts data");
// Boolean flag, default false; see its help text.
DEFINE_bool(mini_cluster_reuse_data, /* default_value= */ false,
            "Reuse data of mini cluster");
93
DECLARE_int32(master_svc_num_threads);
94
DECLARE_int32(memstore_size_mb);
95
DECLARE_int32(master_consensus_svc_num_threads);
96
DECLARE_int32(master_remote_bootstrap_svc_num_threads);
97
DECLARE_int32(generic_svc_num_threads);
98
DECLARE_int32(tablet_server_svc_num_threads);
99
DECLARE_int32(ts_admin_svc_num_threads);
100
DECLARE_int32(ts_consensus_svc_num_threads);
101
DECLARE_int32(ts_remote_bootstrap_svc_num_threads);
102
DECLARE_int32(replication_factor);
103
DECLARE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec);
104
DECLARE_string(use_private_ip);
105
DECLARE_int32(load_balancer_initial_delay_secs);
106
DECLARE_int32(transaction_table_num_tablets);
107
108
namespace yb {
109
110
using client::YBClient;
111
using client::YBClientBuilder;
112
using master::CatalogManager;
113
using master::MiniMaster;
114
using master::TabletLocationsPB;
115
using master::TSDescriptor;
116
using std::shared_ptr;
117
using std::string;
118
using std::vector;
119
using tserver::MiniTabletServer;
120
using tserver::TabletServer;
121
using master::GetMasterClusterConfigResponsePB;
122
using master::ChangeMasterClusterConfigRequestPB;
123
using master::ChangeMasterClusterConfigResponsePB;
124
using master::SysClusterConfigEntryPB;
125
126
namespace {
127
128
const std::vector<uint16_t> EMPTY_MASTER_RPC_PORTS = {};
129
const int kMasterLeaderElectionWaitTimeSeconds = 20 * kTimeMultiplier;
130
const int kRegistrationWaitTimeSeconds = 45 * kTimeMultiplier;
131
const int kTabletReportWaitTimeSeconds = 5;
132
133
992
std::string GetClusterDataDirName(const MiniClusterOptions& options) {
134
992
  std::string cluster_name = "minicluster-data";
135
992
  if (options.cluster_id == "") {
136
825
    return cluster_name;
137
825
  }
138
167
  return Format("$0-$1", cluster_name, options.cluster_id);
139
992
}
140
141
992
std::string GetFsRoot(const MiniClusterOptions& options) {
142
992
  if (!options.data_root.empty()) {
143
0
    return options.data_root;
144
0
  }
145
992
  if (!FLAGS_mini_cluster_base_dir.empty()) {
146
0
    return FLAGS_mini_cluster_base_dir;
147
0
  }
148
992
  return JoinPathSegments(GetTestDataDirectory(), GetClusterDataDirName(options));
149
992
}
150
151
} // namespace
152
153
// Copies the options, resolves the data root once via GetFsRoot(), and reserves one (empty)
// master slot per configured master; StartMasters() fills the slots in.
MiniCluster::MiniCluster(const MiniClusterOptions& options)
    : options_(options), fs_root_(GetFsRoot(options)) {
  mini_masters_.assign(options_.num_masters, nullptr);
}
158
159
81
// Delegates all teardown to Shutdown().
MiniCluster::~MiniCluster() {
  this->Shutdown();
}
162
163
992
// Starts the masters, then options_.num_tablet_servers tablet servers, and waits until all
// tablet servers have registered with the leader master. running_ becomes true only on success.
//
// extra_tserver_options: either empty (each tserver gets default options) or exactly one entry
// per tablet server; any other size is rejected with InvalidArgument before anything is started.
//
// Also overwrites several process-wide gflags (service thread counts, transaction table tablet
// count, private-IP mode, replication factor, memstore size, load balancer delay).
//
// If a step after StartMasters() fails, every server started so far is shut down before the
// error is returned. running_ is still false at that point, so Shutdown() (and therefore the
// destructor) would otherwise be a no-op and leave those servers running.
Status MiniCluster::Start(const std::vector<tserver::TabletServerOptions>& extra_tserver_options) {
  CHECK(!fs_root_.empty()) << "No Fs root was provided";
  CHECK(!running_);

  // Validate arguments before any side effects (ports, directories, flags, masters).
  if (!extra_tserver_options.empty() &&
      extra_tserver_options.size() != options_.num_tablet_servers) {
    return STATUS_SUBSTITUTE(InvalidArgument, "num tserver options: $0 doesn't match with num "
        "tservers: $1", extra_tserver_options.size(), options_.num_tablet_servers);
  }

  EnsurePortsAllocated();

  if (!options_.master_env->FileExists(fs_root_)) {
    RETURN_NOT_OK(options_.master_env->CreateDir(fs_root_));
  }

  // TODO: properly handle setting these variables in case of multiple MiniClusters in the same
  // process.

  // Use conservative number of threads for the mini cluster for unit test env
  // where several unit tests tend to run in parallel.
  // To get default number of threads - try to find SERVICE_POOL_OPTIONS macro usage.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_master_svc_num_threads) = 2;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_master_consensus_svc_num_threads) = 2;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_master_remote_bootstrap_svc_num_threads) = 2;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_generic_svc_num_threads) = 2;

  ANNOTATE_UNPROTECTED_WRITE(FLAGS_tablet_server_svc_num_threads) = 8;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_ts_admin_svc_num_threads) = 2;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_ts_consensus_svc_num_threads) = 8;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_ts_remote_bootstrap_svc_num_threads) = 2;

  // Limit number of transaction table tablets to help avoid timeouts.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_transaction_table_num_tablets) =
      NumTabletsPerTransactionTable(options_);

  // We are testing public/private IPs using mini cluster. So set mode to 'cloud'.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_use_private_ip) = "cloud";

  // This dictates the RF of newly created tables.
  SetAtomicFlag(options_.num_tablet_servers >= 3 ? 3 : 1, &FLAGS_replication_factor);
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_memstore_size_mb) = 16;
  // Default master args to make sure we don't wait to trigger new LB tasks upon master leader
  // failover.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_load_balancer_initial_delay_secs) = 0;

  // StartMasters() shuts its own masters down if it fails.
  RETURN_NOT_OK_PREPEND(StartMasters(), "Couldn't start distributed masters");

  // From here on, any early return must stop the servers started so far.
  auto shutdown_on_failure = ScopeExit([this] {
    if (running_) {
      return;
    }
    for (const auto& tablet_server : mini_tablet_servers_) {
      tablet_server->Shutdown();
    }
    for (const auto& master : mini_masters_) {
      if (master) {
        master->Shutdown();
      }
    }
  });

  for (size_t i = 0; i < options_.num_tablet_servers; i++) {
    Status status = extra_tserver_options.empty() ? AddTabletServer()
                                                  : AddTabletServer(extra_tserver_options[i]);
    RETURN_NOT_OK_PREPEND(status, Substitute("Error adding TS $0", i));
  }

  RETURN_NOT_OK_PREPEND(WaitForTabletServerCount(options_.num_tablet_servers),
                        "Waiting for tablet servers to start");

  running_ = true;
  return Status::OK();
}
230
231
992
// Creates options_.num_masters MiniMasters, starts each as a member of the distributed master
// quorum on the allocated RPC/web ports, and waits for every master's catalog manager to
// initialize. On any failure, all masters created so far are shut down before the error
// is returned.
Status MiniCluster::StartMasters() {
  // Allocate first, then verify. The check previously came first, so calling this method
  // without a prior EnsurePortsAllocated() CHECK-failed even though ports were about to be
  // assigned.
  EnsurePortsAllocated();
  CHECK_GE(master_rpc_ports_.size(), options_.num_masters);

  LOG(INFO) << "Creating distributed mini masters. RPC ports: "
            << JoinInts(master_rpc_ports_, ", ");

  if (mini_masters_.size() < options_.num_masters) {
    mini_masters_.resize(options_.num_masters);
  }

  bool started = false;
  auto se = ScopeExit([this, &started] {
    if (!started) {
      for (const auto& master : mini_masters_) {
        if (master) {
          master->Shutdown();
        }
      }
    }
  });

  for (size_t i = 0; i < options_.num_masters; i++) {
    mini_masters_[i] = std::make_shared<MiniMaster>(
        options_.master_env, GetMasterFsRoot(i), master_rpc_ports_[i], master_web_ports_[i], i);
    auto status = mini_masters_[i]->StartDistributedMaster(master_rpc_ports_);
    LOG_IF(INFO, !status.ok()) << "Failed to start master: " << status;
    RETURN_NOT_OK_PREPEND(status, Substitute("Couldn't start follower $0", i));
    VLOG(1) << "Started MiniMaster with UUID " << mini_masters_[i]->permanent_uuid()
            << " at index " << i;
  }

  // Use the same index in the log line and in the error. Previously the counter was
  // post-incremented inside the log statement, so a failure on master N was reported as N+1.
  for (size_t i = 0; i < mini_masters_.size(); ++i) {
    LOG(INFO) << "Waiting to initialize catalog manager on master " << i;
    RETURN_NOT_OK_PREPEND(mini_masters_[i]->WaitForCatalogManagerInit(),
                          Substitute("Could not initialize catalog manager on master $0", i));
  }
  started = true;
  return Status::OK();
}
271
272
167
// Start() followed by a blocking wait until each tablet server reports that it has started.
Status MiniCluster::StartSync() {
  RETURN_NOT_OK(Start());
  for (size_t ts_idx = 0; ts_idx < mini_tablet_servers_.size(); ++ts_idx) {
    RETURN_NOT_OK_PREPEND(mini_tablet_servers_[ts_idx]->WaitStarted(),
                          Substitute("TabletServer $0 failed to start.", ts_idx));
  }
  return Status::OK();
}
282
283
0
// Restarts every tablet server (waiting for each to start), then every master (waiting for its
// catalog manager to initialize), then waits for all tablet servers to register again.
//
// Errors are returned as a Status instead of aborting the process through CHECK_OK. This
// matches the Status-returning signature and lets the calling test report the failure.
Status MiniCluster::RestartSync() {
  LOG(INFO) << string(80, '-');
  LOG(INFO) << __FUNCTION__;
  LOG(INFO) << string(80, '-');

  LOG(INFO) << "Restart tablet server(s)...";
  for (auto& tablet_server : mini_tablet_servers_) {
    RETURN_NOT_OK(tablet_server->Restart());
    RETURN_NOT_OK(tablet_server->WaitStarted());
  }
  LOG(INFO) << "Restart master server(s)...";
  for (auto& master_server : mini_masters_) {
    LOG(INFO) << "Restarting master " << master_server->permanent_uuid();
    // Reports if a single master restart takes longer than 5s.
    LongOperationTracker long_operation_tracker("Master restart", 5s);
    RETURN_NOT_OK(master_server->Restart());
    LOG(INFO) << "Waiting for catalog manager at " << master_server->permanent_uuid();
    RETURN_NOT_OK(master_server->WaitForCatalogManagerInit());
  }
  LOG(INFO) << string(80, '-');
  LOG(INFO) << __FUNCTION__ << " done";
  LOG(INFO) << string(80, '-');

  RETURN_NOT_OK_PREPEND(WaitForAllTabletServers(),
                        "Waiting for tablet servers to start");
  running_ = true;
  return Status::OK();
}
310
311
1.47k
// Creates, configures and starts one tablet server at index mini_tablet_servers_.size().
// The server is given the address list of every current master (bound RPC address plus that
// master's broadcast addresses). It is appended to mini_tablet_servers_ only after Start()
// succeeds. Returns IllegalState if no master slots exist yet.
Status MiniCluster::AddTabletServer(const tserver::TabletServerOptions& extra_opts) {
  if (mini_masters_.empty()) {
    return STATUS(IllegalState, "Master not yet initialized");
  }
  const auto new_idx = mini_tablet_servers_.size();

  EnsurePortsAllocated(0 /* num_masters (will pick default) */, new_idx + 1);
  const uint16_t ts_rpc_port = tserver_rpc_ports_[new_idx];

  std::shared_ptr<MiniTabletServer> tablet_server;
  if (options_.num_drives != 1) {
    // Multi-drive layout: one directory per drive, passed as both directory lists.
    std::vector<std::string> drive_dirs;
    for (int drive = 0; drive < options_.num_drives; ++drive) {
      drive_dirs.push_back(GetTabletServerDrive(new_idx, drive));
    }
    tablet_server = std::make_shared<MiniTabletServer>(
        drive_dirs, drive_dirs, ts_rpc_port, extra_opts, new_idx);
  } else {
    tablet_server = std::make_shared<MiniTabletServer>(
        GetTabletServerFsRoot(new_idx), ts_rpc_port, extra_opts, new_idx);
  }

  // One entry per master: its bound RPC address followed by its broadcast addresses.
  auto master_addr = std::make_shared<server::MasterAddresses>();
  for (const auto& master : mini_masters_) {
    master_addr->push_back({HostPort(master->bound_rpc_addr())});
    auto& addresses_of_master = master_addr->back();
    for (const auto& broadcast_hp : master->master()->opts().broadcast_addresses) {
      addresses_of_master.push_back(broadcast_hp);
    }
  }

  tablet_server->options()->master_addresses_flag = server::MasterAddressesToString(*master_addr);
  tablet_server->options()->SetMasterAddresses(master_addr);
  tablet_server->options()->webserver_opts.port = tserver_web_ports_[new_idx];
  if (options_.ts_env) {
    tablet_server->options()->env = options_.ts_env;
  }
  if (options_.ts_rocksdb_env) {
    tablet_server->options()->rocksdb_env = options_.ts_rocksdb_env;
  }

  RETURN_NOT_OK(tablet_server->Start());
  mini_tablet_servers_.push_back(tablet_server);
  return Status::OK();
}
355
356
1.47k
// Adds a tablet server using freshly created default TabletServerOptions; propagates any
// error from creating those options.
Status MiniCluster::AddTabletServer() {
  const auto options = VERIFY_RESULT(tserver::TabletServerOptions::CreateTabletServerOptions());
  return AddTabletServer(options);
}
361
362
namespace {
363
364
Status ChangeClusterConfig(
365
    master::CatalogManagerIf* catalog_manager,
366
0
    std::function<void(SysClusterConfigEntryPB*)> config_changer) {
367
0
  GetMasterClusterConfigResponsePB config_resp;
368
0
  RETURN_NOT_OK(catalog_manager->GetClusterConfig(&config_resp));
369
370
0
  ChangeMasterClusterConfigRequestPB change_req;
371
0
  *change_req.mutable_cluster_config() = std::move(*config_resp.mutable_cluster_config());
372
0
  SysClusterConfigEntryPB* config = change_req.mutable_cluster_config();
373
374
0
  config_changer(config);
375
376
0
  ChangeMasterClusterConfigResponsePB change_resp;
377
0
  return catalog_manager->SetClusterConfig(&change_req, &change_resp);
378
0
}
379
380
} // namespace
381
382
0
// Appends ts's bound RPC host and port to the server blacklist in the cluster config, via the
// current leader master.
Status MiniCluster::AddTServerToBlacklist(const MiniTabletServer& ts) {
  const auto* leader = VERIFY_RESULT(GetLeaderMiniMaster());

  auto add_to_blacklist = [&ts](SysClusterConfigEntryPB* config) {
    auto* host_pb = config->mutable_server_blacklist()->mutable_hosts()->Add();
    host_pb->set_host(ts.bound_rpc_addr().address().to_string());
    host_pb->set_port(ts.bound_rpc_addr().port());
  };
  RETURN_NOT_OK(ChangeClusterConfig(&leader->catalog_manager(), add_to_blacklist));

  LOG(INFO) << "TServer " << ts.server()->permanent_uuid() << " at "
            << ts.bound_rpc_addr().address().to_string() << ":" << ts.bound_rpc_addr().port()
            << " was added to the blacklist";
  return Status::OK();
}
399
400
0
// Empties both the server blacklist and the leader blacklist in the cluster config, via the
// current leader master.
Status MiniCluster::ClearBlacklist() {
  const auto* leader = VERIFY_RESULT(GetLeaderMiniMaster());

  auto clear_blacklists = [](SysClusterConfigEntryPB* config) {
    config->mutable_server_blacklist()->Clear();
    config->mutable_leader_blacklist()->Clear();
  };
  RETURN_NOT_OK(ChangeClusterConfig(&leader->catalog_manager(), clear_blacklists));

  LOG(INFO) << "Blacklist has been cleared";
  return Status::OK();
}
413
414
0
// Comma-separated list of every master's bound RPC address string, in slot order.
string MiniCluster::GetMasterAddresses() const {
  string joined;
  for (const auto& master : mini_masters_) {
    joined += joined.empty() ? "" : ",";
    joined += master->bound_rpc_addr_str();
  }
  return joined;
}
424
425
0
// Comma-separated list of every tablet server's bound HTTP address string, in index order.
string MiniCluster::GetTserverHTTPAddresses() const {
  string joined;
  for (const auto& tserver : mini_tablet_servers_) {
    joined += joined.empty() ? "" : ",";
    joined += tserver->bound_http_addr_str();
  }
  return joined;
}
435
436
55.6k
// Polls the masters every 1ms for up to kMasterLeaderElectionWaitTimeSeconds. Returns the index
// of the first master whose scoped leader lock reports both catalog_status() and
// leader_status() as OK, or -1 on timeout.
//
// Skipped masters:
//   - null slots (ShutdownMasters() resets the pointers but keeps them in mini_masters_);
//   - masters with no server object;
//   - masters that are shut down.
ssize_t MiniCluster::LeaderMasterIdx() {
  Stopwatch sw;
  sw.start();
  while (sw.elapsed().wall_seconds() < kMasterLeaderElectionWaitTimeSeconds) {
    for (size_t i = 0; i < mini_masters_.size(); i++) {
      MiniMaster* master = mini_master(i);
      if (master == nullptr || master->master() == nullptr || master->master()->IsShutdown()) {
        continue;
      }
      SCOPED_LEADER_SHARED_LOCK(l, master->master()->catalog_manager_impl());
      if (l.catalog_status().ok() && l.leader_status().ok()) {
        return i;
      }
    }
    SleepFor(MonoDelta::FromMilliseconds(1));
  }
  LOG(ERROR) << "No leader master elected after " << kMasterLeaderElectionWaitTimeSeconds
             << " seconds.";
  return -1;
}
456
457
55.6k
// Returns the current leader master, or TimedOut if LeaderMasterIdx() found none.
Result<MiniMaster*> MiniCluster::GetLeaderMiniMaster() {
  const auto leader_idx = LeaderMasterIdx();
  if (leader_idx != -1) {
    return mini_master(leader_idx);
  }
  return STATUS(TimedOut, "No leader master has been elected");
}
464
465
186
void MiniCluster::Shutdown() {
466
186
  if (!running_)
467
139
    return;
468
469
47
  for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) {
470
47
    tablet_server->Shutdown();
471
47
  }
472
47
  mini_tablet_servers_.clear();
473
474
47
  for (shared_ptr<MiniMaster>& master_server : mini_masters_) {
475
33
    master_server->Shutdown();
476
33
    master_server.reset();
477
33
  }
478
47
  mini_masters_.clear();
479
480
47
  running_ = false;
481
47
}
482
483
0
// Flushes the tablets on every tablet server with the given mode and flags, stopping at the
// first error.
Status MiniCluster::FlushTablets(tablet::FlushMode mode, tablet::FlushFlags flags) {
  for (size_t ts_idx = 0; ts_idx < mini_tablet_servers_.size(); ++ts_idx) {
    RETURN_NOT_OK(mini_tablet_servers_[ts_idx]->FlushTablets(mode, flags));
  }
  return Status::OK();
}
489
490
0
// Compacts the tablets on every tablet server, stopping at the first error.
Status MiniCluster::CompactTablets() {
  for (size_t ts_idx = 0; ts_idx < mini_tablet_servers_.size(); ++ts_idx) {
    RETURN_NOT_OK(mini_tablet_servers_[ts_idx]->CompactTablets());
  }
  return Status::OK();
}
496
497
0
// Calls SwitchMemtables() on every tablet server, stopping at the first error.
Status MiniCluster::SwitchMemtables() {
  for (size_t ts_idx = 0; ts_idx < mini_tablet_servers_.size(); ++ts_idx) {
    RETURN_NOT_OK(mini_tablet_servers_[ts_idx]->SwitchMemtables());
  }
  return Status::OK();
}
503
504
0
// Calls CleanTabletLogs() on every tablet server, stopping at the first error.
Status MiniCluster::CleanTabletLogs() {
  for (size_t ts_idx = 0; ts_idx < mini_tablet_servers_.size(); ++ts_idx) {
    RETURN_NOT_OK(mini_tablet_servers_[ts_idx]->CleanTabletLogs());
  }
  return Status::OK();
}
510
511
0
void MiniCluster::ShutdownMasters() {
512
0
  for (shared_ptr<MiniMaster>& master_server : mini_masters_) {
513
0
    master_server->Shutdown();
514
0
    master_server.reset();
515
0
  }
516
0
}
517
518
692k
// Returns the MiniMaster in slot idx. The result may be null if ShutdownMasters() reset that
// slot. CHECK-fails when idx is out of range. idx is unsigned, so the former "idx >= 0" check
// was always true and has been removed.
MiniMaster* MiniCluster::mini_master(size_t idx) {
  CHECK_LT(idx, mini_masters_.size()) << "Master idx must be < num masters started";
  return mini_masters_[idx].get();
}
523
524
162
// Returns the tablet server at index idx; CHECK-fails when idx is out of range.
// idx is unsigned, so the former "idx >= 0" check was always true and has been removed. The
// message no longer names the nonexistent 'num_ts_started_' member.
MiniTabletServer* MiniCluster::mini_tablet_server(size_t idx) {
  CHECK_LT(idx, mini_tablet_servers_.size())
      << "TabletServer idx must be < num tablet servers started";
  return mini_tablet_servers_[idx].get();
}
529
530
0
// Returns the first tablet server whose running server instance has permanent UUID uuid, or
// nullptr if none matches. Servers without a server object (shut down) are ignored.
MiniTabletServer* MiniCluster::find_tablet_server(const std::string& uuid) {
  const auto it = std::find_if(
      mini_tablet_servers_.begin(), mini_tablet_servers_.end(), [&uuid](const auto& candidate) {
        return candidate->server() &&
               candidate->server()->instance_pb().permanent_uuid() == uuid;
      });
  return it == mini_tablet_servers_.end() ? nullptr : it->get();
}
541
542
1.07k
// Data directory of master idx under fs_root_. Names are 1-based: master 0 uses "master-1-root".
string MiniCluster::GetMasterFsRoot(size_t idx) {
  const auto dir_name = Substitute("master-$0-root", idx + 1);
  return JoinPathSegments(fs_root_, dir_name);
}
545
546
1.46k
// Data directory of tablet server idx under fs_root_. Names are 1-based: tserver 0 uses
// "ts-1-root".
string MiniCluster::GetTabletServerFsRoot(size_t idx) {
  const auto dir_name = Substitute("ts-$0-root", idx + 1);
  return JoinPathSegments(fs_root_, dir_name);
}
549
550
75
// Directory for drive drive_index of tablet server idx. With a multi-drive layout it is
// "ts-<idx+1>-drive-<drive_index+1>" under fs_root_. With a single drive it is the tserver's
// root directory.
string MiniCluster::GetTabletServerDrive(size_t idx, int drive_index) {
  if (options_.num_drives != 1) {
    return JoinPathSegments(fs_root_, Substitute("ts-$0-drive-$1", idx + 1, drive_index + 1));
  }
  return GetTabletServerFsRoot(idx);
}
556
557
28
// Tablet manager of tablet server idx (CHECK-fails on a bad index via mini_tablet_server()).
tserver::TSTabletManager* MiniCluster::GetTabletManager(size_t idx) {
  const auto ts = mini_tablet_server(idx)->server();
  return ts->tablet_manager();
}
560
561
0
// All tablet peers hosted by tablet server idx, as reported by its tablet manager.
std::vector<std::shared_ptr<tablet::TabletPeer>> MiniCluster::GetTabletPeers(size_t idx) {
  auto* tablet_manager = GetTabletManager(idx);
  return tablet_manager->GetTabletPeers();
}
564
565
// Polls the leader master for tablet_id's locations for up to kTabletReportWaitTimeSeconds.
// Succeeds once either:
//   - the locations are stale and expected_count is 0, or
//   - the locations are not stale and list exactly expected_count replicas.
// locations holds the last fetched value. Returns TimedOut otherwise.
//
// The 1ms back-off now also applies when no leader was found; the previous `continue`
// skipped it.
Status MiniCluster::WaitForReplicaCount(const string& tablet_id,
                                        int expected_count,
                                        TabletLocationsPB* locations) {
  Stopwatch sw;
  sw.start();
  while (sw.elapsed().wall_seconds() < kTabletReportWaitTimeSeconds) {
    auto leader_mini_master = GetLeaderMiniMaster();
    if (leader_mini_master.ok()) {
      locations->Clear();
      Status s = (*leader_mini_master)
                     ->master()
                     ->catalog_manager()
                     ->GetTabletLocations(tablet_id, locations);
      if (s.ok() &&
          ((locations->stale() && expected_count == 0) ||
           (!locations->stale() && locations->replicas_size() == expected_count))) {
        return Status::OK();
      }
    }
    SleepFor(MonoDelta::FromMilliseconds(1));
  }
  return STATUS(TimedOut, Substitute("Tablet $0 never reached expected replica count $1",
                                     tablet_id, expected_count));
}
590
591
0
// Waits until every tablet server currently in the cluster has registered with the leader.
Status MiniCluster::WaitForAllTabletServers() {
  const auto expected = num_tablet_servers();
  return WaitForTabletServerCount(expected);
}
594
595
298
// Convenience overload that discards the registered-descriptor list.
Status MiniCluster::WaitForTabletServerCount(size_t count) {
  vector<shared_ptr<master::TSDescriptor>> unused_descs;
  return WaitForTabletServerCount(count, &unused_descs);
}
599
600
// Polls the leader master's TS manager (every 1ms, up to kRegistrationWaitTimeSeconds) until it
// reports exactly `count` descriptors. Each descriptor must match (same permanent UUID and
// instance seqno) a live tablet server of this cluster. *descs holds the last descriptor list
// fetched. Returns TimedOut on expiry.
//
// Tablet servers whose server() is null (shut down) are skipped rather than dereferenced. Each
// shared_ptr is iterated by reference to avoid an atomic ref-count round trip per comparison.
Status MiniCluster::WaitForTabletServerCount(size_t count,
                                             vector<shared_ptr<TSDescriptor> >* descs) {
  Stopwatch sw;
  sw.start();
  while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
    auto leader = GetLeaderMiniMaster();
    if (leader.ok()) {
      (*leader)->ts_manager().GetAllDescriptors(descs);
      if (descs->size() == count) {
        // GetAllDescriptors() may return servers that are no longer online.
        // Do a second step of verification to verify that the descs that we got
        // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
        size_t match_count = 0;
        for (const shared_ptr<TSDescriptor>& desc : *descs) {
          for (const auto& mini_tablet_server : mini_tablet_servers_) {
            auto ts = mini_tablet_server->server();
            if (ts && ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
                ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
              match_count++;
              break;
            }
          }
        }

        if (match_count == count) {
          LOG(INFO) << count << " TS(s) registered with Master after "
                    << sw.elapsed().wall_seconds() << "s";
          return Status::OK();
        }
      }

      YB_LOG_EVERY_N_SECS(INFO, 5) << "Registered: " << AsString(*descs);
    }

    SleepFor(MonoDelta::FromMilliseconds(1));
  }
  return STATUS(TimedOut, Substitute("$0 TS(s) never registered with master", count));
}
638
639
79
// Replaces builder's master address list with the bound RPC address of every master.
// CHECK-fails on a null builder or a null master slot.
void MiniCluster::ConfigureClientBuilder(YBClientBuilder* builder) {
  CHECK_NOTNULL(builder);
  builder->clear_master_server_addrs();
  for (size_t master_idx = 0; master_idx < mini_masters_.size(); ++master_idx) {
    const auto& master = mini_masters_[master_idx];
    CHECK(master);
    builder->add_master_server_addr(master->bound_rpc_addr_str());
  }
}
647
648
0
// Bound RPC address of the current leader master; propagates GetLeaderMiniMaster() errors.
Result<HostPort> MiniCluster::DoGetLeaderMasterBoundRpcAddr() {
  MiniMaster* leader = VERIFY_RESULT(GetLeaderMiniMaster());
  return leader->bound_rpc_addr();
}
651
652
void MiniCluster::AllocatePortsForDaemonType(
653
    const string daemon_type,
654
    const size_t num_daemons,
655
    const string port_type,
656
13.8k
    std::vector<uint16_t>* ports) {
657
13.8k
  const size_t old_size = ports->size();
658
13.8k
  if (ports->size() < num_daemons) {
659
3.97k
    ports->resize(num_daemons, 0 /* default value */);
660
3.97k
  }
661
20.6k
  for (auto i = old_size; i < num_daemons; 
++i6.82k
) {
662
6.82k
    if ((*ports)[i] == 0) {
663
6.82k
      const uint16_t new_port = port_picker_.AllocateFreePort();
664
6.82k
      (*ports)[i] = new_port;
665
6.82k
      LOG(INFO) << "Using auto-assigned port " << new_port << " for a " << daemon_type
666
6.82k
                << " " << port_type << " port";
667
6.82k
    }
668
6.82k
  }
669
13.8k
}
670
671
3.45k
void MiniCluster::EnsurePortsAllocated(size_t new_num_masters, size_t new_num_tservers) {
672
3.45k
  if (new_num_masters == 0) {
673
3.45k
    new_num_masters = std::max(options_.num_masters, num_masters());
674
3.45k
  }
675
3.45k
  AllocatePortsForDaemonType("master", new_num_masters, "RPC", &master_rpc_ports_);
676
3.45k
  AllocatePortsForDaemonType("master", new_num_masters, "web", &master_web_ports_);
677
678
3.45k
  if (new_num_tservers == 0) {
679
1.98k
    new_num_tservers = std::max(options_.num_tablet_servers, num_tablet_servers());
680
1.98k
  }
681
3.45k
  AllocatePortsForDaemonType("tablet server", new_num_tservers, "RPC", &tserver_rpc_ports_);
682
3.45k
  AllocatePortsForDaemonType("tablet server", new_num_tservers, "web", &tserver_web_ports_);
683
3.45k
}
684
685
// Shifts server's physical clock by delta for the lifetime of the returned changer. Assumes
// the server's clock is a HybridClock backed by a SkewedClock; both casts are unchecked.
server::SkewedClockDeltaChanger JumpClock(
    server::RpcServerBase* server, std::chrono::milliseconds delta) {
  auto* hybrid_clock = down_cast<server::HybridClock*>(server->clock());
  auto skewed_clock =
      std::static_pointer_cast<server::SkewedClock>(hybrid_clock->physical_clock());
  return server::SkewedClockDeltaChanger(delta, std::move(skewed_clock));
}
691
692
std::vector<server::SkewedClockDeltaChanger> SkewClocks(
693
0
    MiniCluster* cluster, std::chrono::milliseconds clock_skew) {
694
0
  std::vector<server::SkewedClockDeltaChanger> delta_changers;
695
0
  for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) {
696
0
    delta_changers.push_back(JumpClock(cluster->mini_tablet_server(i)->server(), i * clock_skew));
697
0
  }
698
0
  return delta_changers;
699
0
}
700
701
std::vector<server::SkewedClockDeltaChanger> JumpClocks(
702
0
    MiniCluster* cluster, std::chrono::milliseconds delta) {
703
0
  std::vector<server::SkewedClockDeltaChanger> delta_changers;
704
0
  auto num_masters = cluster->num_masters();
705
0
  auto num_tservers = cluster->num_tablet_servers();
706
0
  delta_changers.reserve(num_masters + num_tservers);
707
0
  for (size_t i = 0; i != num_masters; ++i) {
708
0
    delta_changers.push_back(JumpClock(cluster->mini_master(i)->master(), delta));
709
0
  }
710
0
  for (size_t i = 0; i != num_tservers; ++i) {
711
0
    delta_changers.push_back(JumpClock(cluster->mini_tablet_server(i)->server(), delta));
712
0
  }
713
0
  return delta_changers;
714
0
}
715
716
0
// Sends a leader step-down request to every tablet peer on every tablet server. Uses
// ASSERT_OK, so the first failing step-down fails the test and returns early.
void StepDownAllTablets(MiniCluster* cluster) {
  for (size_t ts_idx = 0; ts_idx < cluster->num_tablet_servers(); ++ts_idx) {
    const auto peers = cluster->GetTabletPeers(ts_idx);
    for (const auto& peer : peers) {
      consensus::LeaderStepDownRequestPB req;
      consensus::LeaderStepDownResponsePB resp;
      req.set_tablet_id(peer->tablet_id());
      ASSERT_OK(peer->consensus()->StepDown(&req, &resp));
    }
  }
}
726
727
0
// Picks one current leader peer at random and asks it to step down; does nothing when there
// are no leaders.
void StepDownRandomTablet(MiniCluster* cluster) {
  auto leaders = ListTabletPeers(cluster, ListPeersFilter::kLeaders);
  if (leaders.empty()) {
    return;
  }
  auto peer = RandomElement(leaders);

  consensus::LeaderStepDownRequestPB req;
  consensus::LeaderStepDownResponsePB resp;
  req.set_tablet_id(peer->tablet_id());
  ASSERT_OK(peer->consensus()->StepDown(&req, &resp));
}
738
739
0
// Returns the distinct ids of all tablets (across all tablet servers) that belong to table_id.
// Peers are iterated by reference to avoid a shared_ptr copy per element. Peers without
// metadata are skipped, as the other metadata-based filters in this file do.
std::unordered_set<string> ListTabletIdsForTable(MiniCluster* cluster, const string& table_id) {
  std::unordered_set<string> tablet_ids;
  for (const auto& peer : ListTabletPeers(cluster, ListPeersFilter::kAll)) {
    const auto tablet_meta = peer->tablet_metadata();
    if (tablet_meta && tablet_meta->table_id() == table_id) {
      tablet_ids.insert(peer->tablet_id());
    }
  }
  return tablet_ids;
}
748
749
std::unordered_set<string> ListActiveTabletIdsForTable(
750
0
    MiniCluster* cluster, const string& table_id) {
751
0
  std::unordered_set<string> tablet_ids;
752
0
  for (auto peer : ListTableActiveTabletPeers(cluster, table_id)) {
753
0
    tablet_ids.insert(peer->tablet_id());
754
0
  }
755
0
  return tablet_ids;
756
0
}
757
758
0
// Lists tablet peers selected by an enum filter:
//   kAll        - every peer;
//   kLeaders    - peers with consensus whose leader status is not NOT_LEADER;
//   kNonLeaders - peers with consensus whose leader status is NOT_LEADER.
// Peers without consensus match neither kLeaders nor kNonLeaders.
std::vector<tablet::TabletPeerPtr> ListTabletPeers(MiniCluster* cluster, ListPeersFilter filter) {
  switch (filter) {
    case ListPeersFilter::kAll:
      return ListTabletPeers(cluster, [](const auto&) { return true; });
    case ListPeersFilter::kLeaders:
    case ListPeersFilter::kNonLeaders: {
      const bool want_leaders = filter == ListPeersFilter::kLeaders;
      return ListTabletPeers(cluster, [want_leaders](const auto& peer) {
        auto peer_consensus = peer->shared_consensus();
        if (!peer_consensus) {
          return false;
        }
        const bool is_leader =
            peer_consensus->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER;
        return is_leader == want_leaders;
      });
    }
  }

  FATAL_INVALID_ENUM_VALUE(ListPeersFilter, filter);
}
776
777
std::vector<tablet::TabletPeerPtr> ListTabletPeers(
778
    MiniCluster* cluster,
779
0
    const std::function<bool(const std::shared_ptr<tablet::TabletPeer>&)>& filter) {
780
0
  std::vector<tablet::TabletPeerPtr> result;
781
782
0
  for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) {
783
0
    auto server = cluster->mini_tablet_server(i)->server();
784
0
    if (!server) { // Server is shut down.
785
0
      continue;
786
0
    }
787
0
    auto peers = server->tablet_manager()->GetTabletPeers();
788
0
    for (const auto& peer : peers) {
789
0
      WARN_NOT_OK(
790
0
          WaitFor(
791
0
              [peer] { return peer->consensus() != nullptr || peer->IsShutdownStarted(); }, 5s,
792
0
              Format("Waiting peer T $0 P $1 ready", peer->tablet_id(), peer->permanent_uuid())),
793
0
          "List tablet peers failure");
794
0
      if (peer->consensus() != nullptr && filter(peer)) {
795
0
        result.push_back(peer);
796
0
      }
797
0
    }
798
0
  }
799
800
0
  return result;
801
0
}
802
803
// Leader peers of table_id's tablets that have metadata and are not in the
// TABLET_DATA_SPLIT_COMPLETED state.
std::vector<tablet::TabletPeerPtr> ListTableActiveTabletLeadersPeers(
    MiniCluster* cluster, const TableId& table_id) {
  return ListTabletPeers(cluster, [&table_id](const auto& peer) {
    const auto metadata = peer->tablet_metadata();
    if (!metadata || metadata->table_id() != table_id) {
      return false;
    }
    if (metadata->tablet_data_state() == tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED) {
      return false;
    }
    return peer->consensus()->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER;
  });
}
813
814
std::vector<tablet::TabletPeerPtr> ListTableTabletPeers(
815
0
      MiniCluster* cluster, const TableId& table_id) {
816
0
  return ListTabletPeers(cluster, [table_id](const std::shared_ptr<tablet::TabletPeer>& peer) {
817
0
    return peer->tablet_metadata()->table_id() == table_id;
818
0
  });
819
0
}
820
821
namespace {
822
823
0
bool IsActive(tablet::TabletDataState tablet_data_state) {
824
0
  return tablet_data_state != tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED &&
825
0
         tablet_data_state != tablet::TabletDataState::TABLET_DATA_TOMBSTONED &&
826
0
         tablet_data_state != tablet::TabletDataState::TABLET_DATA_DELETED;
827
0
}
828
829
} // namespace
830
831
std::vector<tablet::TabletPeerPtr> ListTableActiveTabletPeers(
832
0
    MiniCluster* cluster, const TableId& table_id) {
833
0
  std::vector<tablet::TabletPeerPtr> result;
834
0
  for (auto peer : ListTableTabletPeers(cluster, table_id)) {
835
0
    const auto tablet_meta = peer->tablet_metadata();
836
0
    if (IsActive(tablet_meta->tablet_data_state())) {
837
0
      result.push_back(peer);
838
0
    }
839
0
  }
840
0
  return result;
841
0
}
842
843
0
// Returns, across all tables, the peers that have tablet metadata, are not transaction status
// tablets, are active per IsActive, and whose consensus reports a status other than NOT_LEADER.
std::vector<tablet::TabletPeerPtr> ListActiveTabletLeadersPeers(MiniCluster* cluster) {
  return ListTabletPeers(cluster, [](const auto& peer) {
    const auto metadata = peer->tablet_metadata();
    // Named so that it does not shadow the `consensus` namespace used below.
    const auto raft_consensus = peer->shared_consensus();
    if (!metadata) {
      return false;
    }
    if (metadata->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
      return false;
    }
    if (!IsActive(metadata->tablet_data_state())) {
      return false;
    }
    return raft_consensus->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER;
  });
}
852
853
std::vector<tablet::TabletPeerPtr> ListTableInactiveSplitTabletPeers(
854
0
    MiniCluster* cluster, const TableId& table_id) {
855
0
  std::vector<tablet::TabletPeerPtr> result;
856
0
  for (auto peer : ListTableTabletPeers(cluster, table_id)) {
857
0
    if (peer->tablet()->metadata()->tablet_data_state() ==
858
0
        tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED) {
859
0
      result.push_back(peer);
860
0
    }
861
0
  }
862
0
  return result;
863
0
}
864
865
// Repeatedly lists the active leader peers of `table_id` (via ListTableActiveTabletLeadersPeers)
// until exactly `num_active_leaders` are found or `timeout` expires. On success returns the
// peers from the last listing; on timeout returns the error produced by LoggedWaitFor.
Result<std::vector<tablet::TabletPeerPtr>> WaitForTableActiveTabletLeadersPeers(
    MiniCluster* cluster, const TableId& table_id,
    const size_t num_active_leaders, const MonoDelta timeout) {
  SCHECK_NOTNULL(cluster);

  std::vector<tablet::TabletPeerPtr> leaders;
  const auto has_expected_leader_count = [&]() {
    leaders = ListTableActiveTabletLeadersPeers(cluster, table_id);
    const auto num_found = leaders.size();
    LOG(INFO) << "active_leader_peers.size(): " << num_found;
    return num_found == num_active_leaders;
  };
  RETURN_NOT_OK(LoggedWaitFor(has_expected_leader_count, timeout, "Waiting for leaders ..."));
  return leaders;
}
878
879
// Waits until `deadline` for exactly one peer of `tablet_id` to report a leader status other than
// NOT_LEADER.
Status WaitUntilTabletHasLeader(
    MiniCluster* cluster, const string& tablet_id, MonoTime deadline) {
  const auto has_single_leader = [cluster, &tablet_id] {
    const auto leader_peers = ListTabletPeers(cluster, [&tablet_id](const auto& peer) {
      if (peer->tablet_id() != tablet_id) {
        return false;
      }
      return peer->consensus()->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER;
    });
    return leader_peers.size() == 1;
  };
  return Wait(has_single_leader, deadline, "Waiting for election in tablet " + tablet_id);
}
889
890
0
// Waits up to `timeout` until some master's tablet peer reports LEADER_AND_READY.
CHECKED_STATUS WaitUntilMasterHasLeader(MiniCluster* cluster, MonoDelta timeout) {
  const auto some_master_is_ready_leader = [cluster] {
    for (size_t master_idx = 0; master_idx < cluster->num_masters(); ++master_idx) {
      const auto master_peer = cluster->mini_master(master_idx)->tablet_peer();
      if (master_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
        return true;
      }
    }
    return false;
  };
  return WaitFor(some_master_is_ready_leader, timeout, "Waiting for master leader");
}
901
902
// Waits up to `duration` until the cluster's leader list (ListPeersFilter::kLeaders) consists of
// exactly one peer and that peer is `leader`.
Status WaitForLeaderOfSingleTablet(
    MiniCluster* cluster, tablet::TabletPeerPtr leader, MonoDelta duration,
    const std::string& description) {
  const auto leader_is_sole_leader = [cluster, &leader] {
    const auto current_leaders = ListTabletPeers(cluster, ListPeersFilter::kLeaders);
    if (current_leaders.size() != 1) {
      return false;
    }
    return current_leaders.front() == leader;
  };
  return WaitFor(leader_is_sole_leader, duration, description);
}
910
911
// Asks `leader`'s consensus to step down in favour of `new_leader_uuid`, setting the force flag
// when `force_step_down` is true. Returns the StepDown call's own error if it fails, or a
// RuntimeError when the response carries an error.
Status StepDown(
    tablet::TabletPeerPtr leader, const std::string& new_leader_uuid,
    ForceStepDown force_step_down) {
  consensus::LeaderStepDownRequestPB request;
  request.set_tablet_id(leader->tablet_id());
  request.set_new_leader_uuid(new_leader_uuid);
  if (force_step_down) {
    request.set_force_step_down(true);
  }

  consensus::LeaderStepDownResponsePB response;
  RETURN_NOT_OK(leader->consensus()->StepDown(&request, &response));
  if (!response.has_error()) {
    return Status::OK();
  }
  return STATUS_FORMAT(RuntimeError, "Step down failed: $0", response);
}
927
928
std::thread RestartsThread(
929
0
    MiniCluster* cluster, CoarseDuration interval, std::atomic<bool>* stop_flag) {
930
0
  return std::thread([cluster, interval, stop_flag] {
931
0
    CDSAttacher attacher;
932
0
    SetFlagOnExit set_stop_on_exit(stop_flag);
933
0
    int it = 0;
934
0
    while (!stop_flag->load(std::memory_order_acquire)) {
935
0
      std::this_thread::sleep_for(interval);
936
0
      ASSERT_OK(cluster->mini_tablet_server(++it % cluster->num_tablet_servers())->Restart());
937
0
    }
938
0
  });
939
0
}
940
941
0
// Waits up to `timeout` until every listed peer is RUNNING and the number of peers equals
// (distinct tablets) x (number of tablet servers), i.e. each tablet has a replica per server.
Status WaitAllReplicasReady(MiniCluster* cluster, MonoDelta timeout) {
  return WaitFor([cluster] {
    const auto all_peers = ListTabletPeers(cluster, ListPeersFilter::kAll);
    std::unordered_set<std::string> distinct_tablets;
    for (const auto& peer : all_peers) {
      if (peer->state() != tablet::RaftGroupStatePB::RUNNING) {
        return false;
      }
      distinct_tablets.insert(peer->tablet_id());
    }
    const auto expected_replicas = distinct_tablets.size() * cluster->num_tablet_servers();
    return expected_replicas == all_peers.size();
  }, timeout, "Wait all replicas to be ready");
}
955
956
0
// Waits up to `timeout` until every listed peer's latest log entry index is at least `index` and
// the number of peers equals (distinct tablets) x (number of tablet servers).
Status WaitAllReplicasHaveIndex(MiniCluster* cluster, int64_t index, MonoDelta timeout) {
  return WaitFor([cluster, index] {
    const auto all_peers = ListTabletPeers(cluster, ListPeersFilter::kAll);
    std::unordered_set<std::string> distinct_tablets;
    for (const auto& peer : all_peers) {
      const bool lagging = peer->GetLatestLogEntryOpId().index < index;
      if (lagging) {
        return false;
      }
      distinct_tablets.insert(peer->tablet_id());
    }
    const auto expected_replicas = distinct_tablets.size() * cluster->num_tablet_servers();
    return expected_replicas == all_peers.size();
  }, timeout, "Wait for all replicas to have a specific Raft index");
}
970
971
// Appends `value` to `*collection` unless it compares equal to nullptr.
template <class Collection>
void PushBackIfNotNull(const typename Collection::value_type& value, Collection* collection) {
  if (value == nullptr) {
    return;
  }
  collection->push_back(value);
}
977
978
0
// Collects the non-null regular RocksDB instances (and, when `include_intents` is true, the
// intents RocksDB instances) of every tablet peer in the cluster.
// Peers without a tablet object are skipped; CountIntents in this file guards peer->tablet()
// against null in the same way, whereas dereferencing it unconditionally would crash.
std::vector<rocksdb::DB*> GetAllRocksDbs(MiniCluster* cluster, bool include_intents) {
  std::vector<rocksdb::DB*> dbs;
  for (auto& peer : ListTabletPeers(cluster, ListPeersFilter::kAll)) {
    const auto* tablet = peer->tablet();
    if (!tablet) {
      continue;
    }
    PushBackIfNotNull(tablet->TEST_db(), &dbs);
    if (include_intents) {
      PushBackIfNotNull(tablet->TEST_intents_db(), &dbs);
    }
  }
  return dbs;
}
989
990
0
// Sums TEST_NumTotalRunningCompactions() over the RocksDB instances returned by GetAllRocksDbs
// with its default arguments.
int NumTotalRunningCompactions(MiniCluster* cluster) {
  int total = 0;
  const auto dbs = GetAllRocksDbs(cluster);
  for (auto* db : dbs) {
    auto* db_impl = down_cast<rocksdb::DBImpl*>(db);
    total += db_impl->TEST_NumTotalRunningCompactions();
  }
  return total;
}
997
998
0
// Sums TEST_NumRunningFlushes() over the RocksDB instances returned by GetAllRocksDbs with its
// default arguments.
int NumRunningFlushes(MiniCluster* cluster) {
  int total = 0;
  const auto dbs = GetAllRocksDbs(cluster);
  for (auto* db : dbs) {
    auto* db_impl = down_cast<rocksdb::DBImpl*>(db);
    total += db_impl->TEST_NumRunningFlushes();
  }
  return total;
}
1005
1006
// Looks up `table_name` in the catalog manager of the current leader master. Fails if the leader
// master cannot be determined; otherwise returns whatever CatalogManager::FindTable returns.
Result<scoped_refptr<master::TableInfo>> FindTable(
    MiniCluster* cluster, const client::YBTableName& table_name) {
  auto leader_master = VERIFY_RESULT(cluster->GetLeaderMiniMaster());
  master::TableIdentifierPB table_identifier;
  table_name.SetIntoTableIdentifierPB(&table_identifier);
  return leader_master->catalog_manager().FindTable(table_identifier);
}
1013
1014
14
// Polls the leader master's IsInitDbDone until init DB reports done (OK), reports an initdb
// error (RuntimeError), or the timeout elapses (TimedOut). The timeout is chosen by
// RegularBuildVsSanitizers(600s, 1800s).
//
// Every iteration that does not return sleeps before retrying, including when no leader master
// is available yet or the IsInitDbDone call fails. Previously those paths `continue`d past the
// sleep and busy-looped (and spammed the log) for the whole timeout.
Status WaitForInitDb(MiniCluster* cluster) {
  const auto start_time = CoarseMonoClock::now();
  const auto kTimeout = RegularBuildVsSanitizers(600s, 1800s);
  const auto kRetryDelay = 500ms;
  while (CoarseMonoClock::now() <= start_time + kTimeout) {
    auto leader_mini_master = cluster->GetLeaderMiniMaster();
    if (leader_mini_master.ok()) {
      auto& catalog_manager = (*leader_mini_master)->catalog_manager();
      master::IsInitDbDoneRequestPB req;
      master::IsInitDbDoneResponsePB resp;
      auto status = catalog_manager.IsInitDbDone(&req, &resp);
      if (!status.ok()) {
        LOG(INFO) << "IsInitDbDone failure: " << status;
      } else if (resp.has_initdb_error()) {
        return STATUS_FORMAT(RuntimeError, "Init DB failed: $0", resp.initdb_error());
      } else if (resp.done()) {
        return Status::OK();
      }
    }
    std::this_thread::sleep_for(kRetryDelay);
  }

  return STATUS_FORMAT(TimedOut, "Unable to init db in $0", kTimeout);
}
1041
1042
0
// Sums TEST_CountIntents().first over every peer that has a tablet with a transaction
// participant and, if `filter` is set, passes `filter`. Logs each peer with a non-zero count.
size_t CountIntents(MiniCluster* cluster, const TabletPeerFilter& filter) {
  size_t total_intents = 0;
  const auto all_peers = ListTabletPeers(cluster, ListPeersFilter::kAll);
  for (const auto& peer : all_peers) {
    auto* tablet = peer->tablet();
    if (tablet == nullptr) {
      continue;
    }
    auto participant = tablet->transaction_participant();
    if (!participant) {
      continue;
    }
    if (filter && !filter(peer.get())) {
      continue;
    }
    const auto counts = participant->TEST_CountIntents();
    if (!counts.first) {
      continue;
    }
    total_intents += counts.first;
    LOG(INFO) << Format("T $0 P $1: Intents present: $2, transactions: $3", peer->tablet_id(),
                        peer->permanent_uuid(), counts.first, counts.second);
  }
  return total_intents;
}
1062
1063
0
// Returns the first running tablet server whose server reports LeaderAndReady for `tablet_id`,
// or nullptr when no such server exists.
MiniTabletServer* FindTabletLeader(MiniCluster* cluster, const TabletId& tablet_id) {
  for (size_t ts_idx = 0; ts_idx < cluster->num_tablet_servers(); ++ts_idx) {
    auto mini_ts = cluster->mini_tablet_server(ts_idx);
    auto tablet_server = mini_ts->server();
    // A null server means this tablet server has been shut down.
    if (tablet_server && tablet_server->LeaderAndReady(tablet_id)) {
      return mini_ts;
    }
  }
  return nullptr;
}
1076
1077
0
// Calls Shutdown() on every tablet server of `cluster`, in index order.
void ShutdownAllTServers(MiniCluster* cluster) {
  for (size_t ts_idx = 0; ts_idx < cluster->num_tablet_servers(); ++ts_idx) {
    auto mini_ts = cluster->mini_tablet_server(ts_idx);
    mini_ts->Shutdown();
  }
}
1082
1083
0
// Starts every tablet server of `cluster` in index order, stopping at the first failure.
Status StartAllTServers(MiniCluster* cluster) {
  for (size_t ts_idx = 0; ts_idx < cluster->num_tablet_servers(); ++ts_idx) {
    auto mini_ts = cluster->mini_tablet_server(ts_idx);
    RETURN_NOT_OK(mini_ts->Start());
  }
  return Status::OK();
}
1090
1091
0
// Calls Shutdown() on every master of `cluster`, in index order.
void ShutdownAllMasters(MiniCluster* cluster) {
  for (size_t master_idx = 0; master_idx < cluster->num_masters(); ++master_idx) {
    auto mini_master = cluster->mini_master(master_idx);
    mini_master->Shutdown();
  }
}
1096
1097
0
// Starts every master of `cluster` in index order, stopping at the first failure.
Status StartAllMasters(MiniCluster* cluster) {
  for (size_t master_idx = 0; master_idx < cluster->num_masters(); ++master_idx) {
    auto mini_master = cluster->mini_master(master_idx);
    RETURN_NOT_OK(mini_master->Start());
  }
  return Status::OK();
}
1104
1105
void SetupConnectivity(
1106
0
    rpc::Messenger* messenger, const IpAddress& address, Connectivity connectivity) {
1107
0
  switch (connectivity) {
1108
0
    case Connectivity::kOn:
1109
0
      messenger->RestoreConnectivityTo(address);
1110
0
      return;
1111
0
    case Connectivity::kOff:
1112
0
      messenger->BreakConnectivityTo(address);
1113
0
      return;
1114
0
  }
1115
0
  FATAL_INVALID_ENUM_VALUE(Connectivity, connectivity);
1116
0
}
1117
1118
// Applies `connectivity` between the nodes at 0-based indexes `idx1` and `idx2`, in both
// directions. This covers both private and non-private RPC addresses. For each direction, the
// master and/or tablet server living at the source index (if any) has its messenger updated.
Status SetupConnectivity(
    MiniCluster* cluster, size_t idx1, size_t idx2, Connectivity connectivity) {
  const auto apply_one_way = [cluster, connectivity](size_t from_idx, size_t to_idx) -> Status {
    for (auto type : {server::Private::kFalse, server::Private::kTrue}) {
      // TEST_RpcAddress is 1-indexed; we expect from_idx/to_idx to be 0-indexed.
      auto address = VERIFY_RESULT(HostToAddress(TEST_RpcAddress(to_idx + 1, type)));
      if (from_idx < cluster->num_masters()) {
        auto* master_messenger = cluster->mini_master(from_idx)->master()->messenger();
        SetupConnectivity(master_messenger, address, connectivity);
      }
      if (from_idx < cluster->num_tablet_servers()) {
        auto* ts_messenger = cluster->mini_tablet_server(from_idx)->server()->messenger();
        SetupConnectivity(ts_messenger, address, connectivity);
      }
    }
    return Status::OK();
  };

  RETURN_NOT_OK(apply_one_way(idx1, idx2));
  RETURN_NOT_OK(apply_one_way(idx2, idx1));
  return Status::OK();
}
1138
1139
0
// Cuts connectivity in both directions between the nodes at 0-based indexes `idx1` and `idx2`.
Status BreakConnectivity(MiniCluster* cluster, size_t idx1, size_t idx2) {
  constexpr auto kDisconnected = Connectivity::kOff;
  return SetupConnectivity(cluster, idx1, idx2, kDisconnected);
}
1142
1143
0
// Returns the index of the first running tablet server whose tablet manager reports a non-zero
// leader count, or NotFound if there is none.
Result<size_t> ServerWithLeaders(MiniCluster* cluster) {
  for (size_t ts_idx = 0; ts_idx < cluster->num_tablet_servers(); ++ts_idx) {
    auto* tablet_server = cluster->mini_tablet_server(ts_idx)->server();
    if (tablet_server != nullptr &&
        tablet_server->tablet_manager()->GetLeaderCount() != 0) {
      return ts_idx;
    }
  }
  return STATUS(NotFound, "No tablet server with leaders");
}
1156
1157
0
// Sets FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec to `bytes_per_sec` and applies the
// same rate to the rate limiters of the existing tablets' regular and intents RocksDB instances.
//
// Peers without a tablet object are skipped; CountIntents in this file also treats the tablet as
// possibly null. A RocksDB instance configured without a rate limiter is skipped with a warning
// instead of dereferencing a null pointer.
void SetCompactFlushRateLimitBytesPerSec(MiniCluster* cluster, const size_t bytes_per_sec) {
  LOG(INFO) << "Setting FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec to: " << bytes_per_sec
            << " and updating compact/flush rate in existing tablets";
  FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec = bytes_per_sec;
  for (auto& tablet_peer : ListTabletPeers(cluster, ListPeersFilter::kAll)) {
    auto tablet = tablet_peer->shared_tablet();
    if (!tablet) {
      continue;
    }
    for (auto* db : { tablet->TEST_db(), tablet->TEST_intents_db() }) {
      if (!db) {
        continue;
      }
      auto rate_limiter = db->GetDBOptions().rate_limiter;
      if (!rate_limiter) {
        LOG(WARNING) << "No rate limiter configured for a RocksDB instance of tablet "
                     << tablet_peer->tablet_id() << ", skipping";
        continue;
      }
      rate_limiter->SetBytesPerSecond(bytes_per_sec);
    }
  }
}
1170
1171
// Snapshots each tablet leader's last committed op index. It then waits, until `deadline`, for
// every non-leader replica to reach at least the index recorded for its tablet. Returns
// IllegalState if a non-leader's tablet had no leader in the snapshot.
CHECKED_STATUS WaitAllReplicasSynchronizedWithLeader(
    MiniCluster* cluster, CoarseTimePoint deadline) {
  std::unordered_map<TabletId, int64_t> leader_committed_index;
  for (const auto& leader : ListTabletPeers(cluster, ListPeersFilter::kLeaders)) {
    const auto committed_index = leader->consensus()->GetLastCommittedOpId().index;
    leader_committed_index.emplace(leader->tablet_id(), committed_index);
    LOG(INFO) << "Committed op id for " << leader->tablet_id() << ": " << committed_index;
  }

  for (const auto& follower : ListTabletPeers(cluster, ListPeersFilter::kNonLeaders)) {
    const auto entry = leader_committed_index.find(follower->tablet_id());
    if (entry == leader_committed_index.end()) {
      return STATUS_FORMAT(IllegalState, "Unknown committed op id for $0", follower->tablet_id());
    }
    const auto target_index = entry->second;
    const auto caught_up = [target_index, follower]() {
      return follower->consensus()->GetLastCommittedOpId().index >= target_index;
    };
    RETURN_NOT_OK(Wait(caught_up, deadline, Format("Wait T $0 P $1 commit $2",
                       follower->tablet_id(), follower->permanent_uuid(), target_index)));
  }
  return Status::OK();
}
1194
1195
}  // namespace yb