YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/external_mini_cluster.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/integration-tests/external_mini_cluster.h"
34
35
#include <atomic>
36
#include <chrono>
37
#include <functional>
38
#include <memory>
39
#include <mutex>
40
#include <string>
41
#include <thread>
42
#include <unordered_map>
43
#include <vector>
44
45
#include <glog/logging.h>
46
#include <gtest/gtest.h>
47
#include <rapidjson/document.h>
48
49
#include "yb/client/client.h"
50
51
#include "yb/common/wire_protocol.h"
52
53
#include "yb/consensus/consensus.proxy.h"
54
55
#include "yb/fs/fs_manager.h"
56
57
#include "yb/gutil/algorithm.h"
58
#include "yb/gutil/bind.h"
59
#include "yb/gutil/macros.h"
60
#include "yb/gutil/ref_counted.h"
61
#include "yb/gutil/singleton.h"
62
#include "yb/gutil/strings/join.h"
63
#include "yb/gutil/strings/substitute.h"
64
#include "yb/gutil/strings/util.h"
65
66
#include "yb/integration-tests/cluster_itest_util.h"
67
68
#include "yb/master/master_admin.proxy.h"
69
#include "yb/master/master_cluster.proxy.h"
70
#include "yb/master/master_rpc.h"
71
#include "yb/master/sys_catalog.h"
72
73
#include "yb/rpc/messenger.h"
74
#include "yb/rpc/proxy.h"
75
#include "yb/rpc/rpc_controller.h"
76
77
#include "yb/server/server_base.pb.h"
78
#include "yb/server/server_base.proxy.h"
79
80
#include "yb/tserver/tserver_admin.proxy.h"
81
#include "yb/tserver/tserver_service.proxy.h"
82
83
#include "yb/util/async_util.h"
84
#include "yb/util/curl_util.h"
85
#include "yb/util/env.h"
86
#include "yb/util/faststring.h"
87
#include "yb/util/format.h"
88
#include "yb/util/jsonreader.h"
89
#include "yb/util/logging.h"
90
#include "yb/util/metrics.h"
91
#include "yb/util/monotime.h"
92
#include "yb/util/net/net_fwd.h"
93
#include "yb/util/net/sockaddr.h"
94
#include "yb/util/path_util.h"
95
#include "yb/util/pb_util.h"
96
#include "yb/util/size_literals.h"
97
#include "yb/util/slice.h"
98
#include "yb/util/status_format.h"
99
#include "yb/util/status_log.h"
100
#include "yb/util/stopwatch.h"
101
#include "yb/util/subprocess.h"
102
#include "yb/util/test_util.h"
103
#include "yb/util/tsan_util.h"
104
105
using namespace std::literals;  // NOLINT
106
using namespace yb::size_literals;  // NOLINT
107
108
using std::atomic;
109
using std::lock_guard;
110
using std::mutex;
111
using std::shared_ptr;
112
using std::string;
113
using std::thread;
114
using std::unique_ptr;
115
116
using rapidjson::Value;
117
using strings::Substitute;
118
119
using yb::master::GetLeaderMasterRpc;
120
using yb::master::IsInitDbDoneRequestPB;
121
using yb::master::IsInitDbDoneResponsePB;
122
using yb::server::ServerStatusPB;
123
using yb::tserver::ListTabletsRequestPB;
124
using yb::tserver::ListTabletsResponsePB;
125
using yb::tserver::TabletServerErrorPB;
126
using yb::tserver::TabletServerServiceProxy;
127
using yb::consensus::ConsensusServiceProxy;
128
using yb::consensus::RaftPeerPB;
129
using yb::consensus::ChangeConfigRequestPB;
130
using yb::consensus::ChangeConfigResponsePB;
131
using yb::consensus::ChangeConfigType;
132
using yb::consensus::GetLastOpIdRequestPB;
133
using yb::consensus::GetLastOpIdResponsePB;
134
using yb::consensus::LeaderStepDownRequestPB;
135
using yb::consensus::LeaderStepDownResponsePB;
136
using yb::consensus::RunLeaderElectionRequestPB;
137
using yb::consensus::RunLeaderElectionResponsePB;
138
using yb::master::IsMasterLeaderReadyRequestPB;
139
using yb::master::IsMasterLeaderReadyResponsePB;
140
using yb::master::GetMasterClusterConfigRequestPB;
141
using yb::master::GetMasterClusterConfigResponsePB;
142
using yb::master::ChangeMasterClusterConfigRequestPB;
143
using yb::master::ChangeMasterClusterConfigResponsePB;
144
using yb::master::SysClusterConfigEntryPB;
145
using yb::tserver::ListTabletsForTabletServerRequestPB;
146
using yb::tserver::ListTabletsForTabletServerResponsePB;
147
using yb::master::ListMastersRequestPB;
148
using yb::master::ListMastersResponsePB;
149
using yb::tserver::TabletServerErrorPB;
150
using yb::rpc::RpcController;
151
152
// Short alias for the StatusAndSchemaPB message nested in ListTabletsResponsePB.
using StatusAndSchemaPB = ListTabletsResponsePB::StatusAndSchemaPB;
153
154
DECLARE_string(vmodule);
155
DECLARE_int32(replication_factor);
156
DECLARE_bool(mem_tracker_logging);
157
DECLARE_bool(mem_tracker_log_stack_trace);
158
DECLARE_string(minicluster_daemon_id);
159
DECLARE_bool(use_libbacktrace);
160
DECLARE_bool(never_fsync);
161
162
// Flags controlling the daemons launched by the external mini-cluster.
DEFINE_string(external_daemon_heap_profile_prefix, "",
              "If this is not empty, tcmalloc's HEAPPROFILE is set to this value, followed by a "
              "unique suffix for external mini-cluster daemons.");

DEFINE_bool(external_daemon_safe_shutdown, false,
            "Shutdown external daemons using SIGTERM first. Disabled by default to avoid "
            "interfering with kill-testing.");

DECLARE_int64(outbound_rpc_block_size);
DECLARE_int64(outbound_rpc_memory_limit);

// Upper bound on the combined log output of all daemons (50_MB * 100 by default).
DEFINE_int64(external_mini_cluster_max_log_bytes, 50_MB * 100,
             "Max total size of log bytes produced by all external mini-cluster daemons. "
             "The test is shut down if this limit is exceeded.");
176
177
namespace yb {
178
179
static const char* const kMasterBinaryName = "yb-master";
180
static const char* const kTabletServerBinaryName = "yb-tserver";
181
static double kProcessStartTimeoutSeconds = 60.0;
182
static MonoDelta kTabletServerRegistrationTimeout = 60s;
183
184
static const int kHeapProfileSignal = SIGUSR1;
185
186
constexpr size_t kDefaultMemoryLimitHardBytes = NonTsanVsTsan(1_GB, 512_MB);
187
188
namespace {
189
190
1.53k
void AddExtraFlagsFromEnvVar(const char* env_var_name, std::vector<std::string>* args_dest) {
191
1.53k
  const char* extra_daemon_flags_env_var_value = getenv(env_var_name);
192
1.53k
  if (extra_daemon_flags_env_var_value) {
193
0
    LOG(INFO) << "Setting extra daemon flags as specified by env var " << env_var_name << ": "
194
0
              << extra_daemon_flags_env_var_value;
195
    // TODO: this has an issue with handling quoted arguments with embedded spaces.
196
0
    std::istringstream iss(extra_daemon_flags_env_var_value);
197
0
    copy(std::istream_iterator<string>(iss),
198
0
         std::istream_iterator<string>(),
199
0
         std::back_inserter(*args_dest));
200
1.53k
  } else {
201
1.53k
    LOG(INFO) << "Env var " << env_var_name << " not specified, not setting extra flags from it";
202
1.53k
  }
203
1.53k
}
204
205
std::vector<std::string> FsRootDirs(const std::string& data_dir,
206
738
                                    uint16_t num_drives) {
207
738
  if (num_drives == 1) {
208
706
    return vector<string>{data_dir};
209
706
  }
210
32
  vector<string> data_dirs;
211
98
  for (int drive =  1; drive <= num_drives; ++drive) {
212
66
    data_dirs.push_back(JoinPathSegments(data_dir, Substitute("d-$0", drive)));
213
66
  }
214
32
  return data_dirs;
215
32
}
216
217
std::vector<std::string> FsDataDirs(const std::string& data_dir,
218
                                    const std::string& server_type,
219
676
                                    uint16_t num_drives) {
220
676
  if (num_drives == 1) {
221
646
    return vector<string>{GetServerTypeDataPath(data_dir, server_type)};
222
646
  }
223
30
  vector<string> data_dirs;
224
91
  for (int drive =  1; drive <= num_drives; ++drive) {
225
61
    data_dirs.push_back(GetServerTypeDataPath(
226
61
                          JoinPathSegments(data_dir, Substitute("d-$0", drive)), server_type));
227
61
  }
228
30
  return data_dirs;
229
30
}
230
231
}  // anonymous namespace
232
233
// ------------------------------------------------------------------------------------------------
234
// ExternalMiniClusterOptions
235
// ------------------------------------------------------------------------------------------------
236
237
7
// Drops `port` from master_rpc_ports and decrements num_masters so both stay in sync.
// Returns InvalidArgument (leaving the options untouched) if the port is not listed.
Status ExternalMiniClusterOptions::RemovePort(const uint16_t port) {
  auto& ports = master_rpc_ports;
  const auto match = std::find(ports.begin(), ports.end(), port);
  const bool present = match != ports.end();
  if (!present) {
    return STATUS(InvalidArgument, Substitute(
        "Port to be removed '$0' not found in existing list of $1 masters.",
        port, num_masters));
  }

  ports.erase(match);
  --num_masters;
  return Status::OK();
}
251
252
10
// Appends `port` to master_rpc_ports and increments num_masters so both stay in sync.
// Returns InvalidArgument (leaving the options untouched) if the port is already listed.
Status ExternalMiniClusterOptions::AddPort(const uint16_t port) {
  auto& ports = master_rpc_ports;
  const bool duplicate = std::find(ports.begin(), ports.end(), port) != ports.end();
  if (duplicate) {
    return STATUS(InvalidArgument, Substitute(
        "Port to be added '$0' already found in the existing list of $1 masters.",
        port, num_masters));
  }

  ports.push_back(port);
  ++num_masters;
  return Status::OK();
}
266
267
231
void ExternalMiniClusterOptions::AdjustMasterRpcPorts() {
268
231
  if (master_rpc_ports.size() == 1 && master_rpc_ports[0] == 0) {
269
    // Add missing master ports to avoid errors when we try to start the cluster.
270
201
    while (master_rpc_ports.size() < num_masters) {
271
8
      master_rpc_ports.push_back(0);
272
8
    }
273
193
  }
274
231
}
275
276
// ------------------------------------------------------------------------------------------------
277
// ExternalMiniCluster
278
// ------------------------------------------------------------------------------------------------
279
280
// Copies `opts`, pads the master port list, and builds the extra command-line flags for the
// master and tablet-server daemons: common defaults first, then the caller's own flags, then
// flags taken from YB_EXTRA_MASTER_FLAGS / YB_EXTRA_TSERVER_FLAGS.
ExternalMiniCluster::ExternalMiniCluster(const ExternalMiniClusterOptions& opts)
    : opts_(opts), add_new_master_at_(-1) {
  opts_.AdjustMasterRpcPorts();

  // These "extra mini cluster options" are added in the end of the command line.
  const std::string log_destination_flag =
      opts.log_to_file ? "--alsologtostderr"s : "--logtostderr"s;
  const std::string slow_query_threshold_flag =
      IsTsan() ? "--rpc_slow_query_threshold_ms=20000"s : "--rpc_slow_query_threshold_ms=10000"s;
  const std::vector<std::string> common_extra_flags = {
      "--enable_tracing"s,
      Substitute("--memory_limit_hard_bytes=$0", kDefaultMemoryLimitHardBytes),
      Substitute("--never_fsync=$0", FLAGS_never_fsync),
      log_destination_flag,
      slow_query_threshold_flag,
  };

  // The common defaults go to the *front* of each list so that caller-specified flags, which
  // appear later on the command line, can override them.
  const auto prepend_common_flags = [&common_extra_flags](auto* flags) {
    flags->insert(flags->begin(), common_extra_flags.begin(), common_extra_flags.end());
  };
  prepend_common_flags(&opts_.extra_master_flags);
  prepend_common_flags(&opts_.extra_tserver_flags);

  AddExtraFlagsFromEnvVar("YB_EXTRA_MASTER_FLAGS", &opts_.extra_master_flags);
  AddExtraFlagsFromEnvVar("YB_EXTRA_TSERVER_FLAGS", &opts_.extra_tserver_flags);
}
302
303
170
// Stops every daemon, then shuts down the messenger if this cluster created (and holds) one.
ExternalMiniCluster::~ExternalMiniCluster() {
  Shutdown();
  if (!messenger_holder_) {
    return;
  }
  messenger_holder_->Shutdown();
}
309
310
230
// Stores "<directory of the running executable>/../bin" in *ret. Fails only if the executable
// path cannot be determined.
Status ExternalMiniCluster::DeduceBinRoot(std::string* ret) {
  std::string executable_path;
  RETURN_NOT_OK(Env::Default()->GetExecutablePath(&executable_path));
  const std::string executable_dir = DirName(executable_path);
  *ret = executable_dir + "/../bin";
  return Status::OK();
}
316
317
183
std::string ExternalMiniCluster::GetClusterDataDirName() const {
318
183
  if (opts_.cluster_id == "") {
319
183
    return "minicluster-data";
320
183
  }
321
0
  return Format("minicluster-data-$0", opts_.cluster_id);
322
0
}
323
324
230
// Resolves daemon_bin_path_ and data_root_ from the options, filling in defaults when they
// are empty. Returns an error (instead of aborting the process) if the per-run data
// subdirectory cannot be created.
Status ExternalMiniCluster::HandleOptions() {
  daemon_bin_path_ = opts_.daemon_bin_path;
  if (daemon_bin_path_.empty()) {
    RETURN_NOT_OK(DeduceBinRoot(&daemon_bin_path_));
  }

  data_root_ = opts_.data_root;
  if (data_root_.empty()) {
    // If they don't specify a data root, use the current gtest directory.
    data_root_ = JoinPathSegments(GetTestDataDirectory(), GetClusterDataDirName());

    // If "data_root_counter" is non-negative, and the auto-generated "data_root_" directory already
    // exists, create a subdirectory using the counter value as its name. The caller should maintain
    // this counter and increment it for each test run.
    if (opts_.data_root_counter >= 0) {
      struct stat sb;
      if (stat(data_root_.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) {
        data_root_ = Substitute("$0/$1", data_root_, opts_.data_root_counter);
        // mkdir fails e.g. with EEXIST when the counter was reused; surface that through the
        // returned Status rather than CHECK-crashing the whole test binary.
        if (mkdir(data_root_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) != 0) {
          const int mkdir_errno = errno;
          return STATUS_FORMAT(IOError, "Failed to create data root directory $0 (errno $1)",
                               data_root_, mkdir_errno);
        }
      }
    }
  }

  return Status::OK();
}
349
350
230
// Starts a fresh cluster: resolves options, sets up the messenger and proxy cache, creates
// the data root, starts the masters, then starts opts_.num_tablet_servers tablet servers and
// waits for all of them. Must not be called on a cluster that already has daemons.
Status ExternalMiniCluster::Start(rpc::Messenger* messenger) {
  CHECK(masters_.empty()) << "Masters are not empty (size: " << masters_.size()
      << "). Maybe you meant Restart()?";
  CHECK(tablet_servers_.empty()) << "Tablet servers are not empty (size: "
      << tablet_servers_.size() << "). Maybe you meant Restart()?";
  RETURN_NOT_OK(HandleOptions());
  // The replication factor flag follows the number of masters.
  FLAGS_replication_factor = narrow_cast<int>(opts_.num_masters);

  // Borrow the caller's messenger when given one; otherwise build a private one and own it.
  if (messenger != nullptr) {
    messenger_holder_ = nullptr;
    messenger_ = messenger;
  } else {
    rpc::MessengerBuilder messenger_builder("minicluster-messenger");
    messenger_builder.set_num_reactors(1);
    messenger_holder_ = VERIFY_RESULT_PREPEND(
        messenger_builder.Build(), "Failed to start Messenger for minicluster");
    messenger_ = messenger_holder_.get();
  }
  proxy_cache_ = std::make_unique<rpc::ProxyCache>(messenger_);

  RETURN_NOT_OK(Env::Default()->CreateDirs(data_root_));

  const char* const unique_loopback = opts_.bind_to_unique_loopback_addresses ? "true" : "false";
  LOG(INFO) << "Starting cluster with option bind_to_unique_loopback_addresses="
            << unique_loopback;

  LOG(INFO) << "Starting " << opts_.num_masters << " masters";
  RETURN_NOT_OK_PREPEND(StartMasters(), "Failed to start masters.");
  // Masters added later get indexes following the initial ones.
  add_new_master_at_ = opts_.num_masters;

  if (opts_.num_tablet_servers > 0) {
    LOG(INFO) << "Starting " << opts_.num_tablet_servers << " tablet servers";
    for (size_t ts_number = 1; ts_number <= opts_.num_tablet_servers; ++ts_number) {
      RETURN_NOT_OK_PREPEND(
          AddTabletServer(ExternalMiniClusterOptions::kDefaultStartCqlProxy),
          Substitute("Failed starting tablet server $0", ts_number));
    }
    RETURN_NOT_OK(WaitForTabletServerCount(
        opts_.num_tablet_servers, kTabletServerRegistrationTimeout));
  } else {
    LOG(INFO) << "No need to start tablet servers";
  }

  running_ = true;
  return Status::OK();
}
396
397
343
// Shuts down all tablet servers and, when mode == ALL, all masters as well. Empty (null)
// slots in either list are skipped. Marks the cluster as not running.
void ExternalMiniCluster::Shutdown(NodeSelectionMode mode) {
  // TODO: in the regular MiniCluster Shutdown is a no-op if running_ is false.
  // Therefore, in case of an error during cluster startup behavior might be different.
  if (mode == ALL) {
    for (const scoped_refptr<ExternalMaster>& master : masters_) {
      if (master) {
        master->Shutdown();
      }
    }
  }

  for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) {
    // Same null guard as for masters: this also runs from the destructor, possibly after a
    // partially failed startup.
    if (ts) {
      ts->Shutdown();
    }
  }
  running_ = false;
}
413
414
2
// Restarts every master and tablet server that is currently shut down. It then waits, up to
// kTabletServerRegistrationTimeout, for all tablet servers to be counted again, and marks the
// cluster as running.
Status ExternalMiniCluster::Restart() {
  LOG(INFO) << "Restarting cluster with " << masters_.size() << " masters.";

  for (const auto& master : masters_) {
    if (!master || !master->IsShutdown()) {
      continue;
    }
    RETURN_NOT_OK_PREPEND(master->Restart(), "Cannot restart master bound at: " +
                                             master->bound_rpc_hostport().ToString());
  }

  for (const auto& ts : tablet_servers_) {
    if (!ts->IsShutdown()) {
      continue;
    }
    RETURN_NOT_OK_PREPEND(ts->Restart(), "Cannot restart tablet server bound at: " +
                                         ts->bound_rpc_hostport().ToString());
  }

  RETURN_NOT_OK(WaitForTabletServerCount(tablet_servers_.size(), kTabletServerRegistrationTimeout));

  running_ = true;
  return Status::OK();
}
435
436
919
// Returns the path of daemon executable `binary`. It prefers "<daemon_bin_path_>/<binary>".
// For the master and tablet-server binaries it falls back to
// "<daemon_bin_path_>/../master/<binary>" or "<daemon_bin_path_>/../tserver/<binary>" when that
// file exists. Otherwise the default path is returned anyway, with a warning.
string ExternalMiniCluster::GetBinaryPath(const string& binary) const {
  CHECK(!daemon_bin_path_.empty());
  string default_path = JoinPathSegments(daemon_bin_path_, binary);
  if (Env::Default()->FileExists(default_path)) {
    return default_path;
  }

  // In CLion-based builds we sometimes have to look for the binary in other directories.
  const char* alternative_dir = nullptr;
  if (binary == kMasterBinaryName) {
    alternative_dir = "master";
  } else if (binary == kTabletServerBinaryName) {
    alternative_dir = "tserver";
  }

  if (alternative_dir == nullptr) {
    LOG(WARNING) << "Default path " << default_path << " for binary " << binary <<
      " does not exist, and no alternative directory is available for this binary";
    return default_path;
  }

  string alternative_path =
      JoinPathSegments(daemon_bin_path_, string("../") + alternative_dir + "/" + binary);
  if (!Env::Default()->FileExists(alternative_path)) {
    LOG(WARNING) << "Neither " << default_path << " nor " << alternative_path << " exist";
    return default_path;
  }

  LOG(INFO) << "Default path " << default_path << " for binary " << binary <<
    " does not exist, using alternative location: " << alternative_path;
  return alternative_path;
}
466
467
997
// Data directory for daemon `daemon_id`: a child of data_root_ named after the id.
// data_root_ must already have been resolved (CHECK-enforced).
string ExternalMiniCluster::GetDataPath(const string& daemon_id) const {
  const bool have_root = !data_root_.empty();
  CHECK(have_root);
  return JoinPathSegments(data_root_, daemon_id);
}
471
472
namespace {
473
987
vector<string> SubstituteInFlags(const vector<string>& orig_flags, size_t index) {
474
987
  string str_index = std::to_string(index);
475
987
  vector<string> ret;
476
9.62k
  for (const string& orig : orig_flags) {
477
9.62k
    ret.push_back(StringReplace(orig, "${index}", str_index, true));
478
9.62k
  }
479
987
  return ret;
480
987
}
481
482
}  // anonymous namespace
483
484
1
// Launches one extra master on freshly allocated RPC/HTTP ports with the given peer list, at
// index add_new_master_at_, and advances that index on success. The new master is returned
// but not added to masters_ here.
Result<ExternalMaster *> ExternalMiniCluster::StartMasterWithPeers(const string& peer_addrs) {
  const uint16_t rpc_port = AllocateFreePort();
  const uint16_t http_port = AllocateFreePort();
  LOG(INFO) << "Using auto-assigned rpc_port " << rpc_port << "; http_port " << http_port
            << " to start a new external mini-cluster master with peers '" << peer_addrs << "'.";

  const string master_addr = MasterAddressForPort(rpc_port);
  const string master_exe = GetBinaryPath(kMasterBinaryName);

  auto* master = new ExternalMaster(
      add_new_master_at_, messenger_, proxy_cache_.get(), master_exe,
      GetDataPath(Substitute("master-$0", add_new_master_at_)),
      opts_.extra_master_flags, master_addr, http_port, peer_addrs);

  // NOTE(review): if Start() fails, `master` is neither stored anywhere nor released here;
  // confirm who owns the object on that path.
  RETURN_NOT_OK(master->Start());

  ++add_new_master_at_;
  return master;
}
503
504
364
std::string ExternalMiniCluster::MasterAddressForPort(uint16_t port) const {
505
364
  return Format(opts_.use_even_ips ? "127.0.0.2:$0" : "127.0.0.1:$0", port);
506
364
}
507
508
9
// Starts a master with an empty peer list via Start(true) (the 'shell' mode named in the fatal
// error below) on freshly allocated ports, at index add_new_master_at_. Stores it in
// *new_master. Any start failure is fatal (LOG(FATAL)).
void ExternalMiniCluster::StartShellMaster(ExternalMaster** new_master) {
  const uint16_t rpc_port = AllocateFreePort();
  const uint16_t http_port = AllocateFreePort();
  LOG(INFO) << "Using auto-assigned rpc_port " << rpc_port << "; http_port " << http_port
            << " to start a new external mini-cluster shell master.";

  const string master_addr = MasterAddressForPort(rpc_port);
  const string master_exe = GetBinaryPath(kMasterBinaryName);

  auto* master = new ExternalMaster(add_new_master_at_, messenger_, proxy_cache_.get(),
                                    master_exe,
                                    GetDataPath(Substitute("master-$0", add_new_master_at_)),
                                    opts_.extra_master_flags, master_addr, http_port, "");

  const Status start_status = master->Start(true);
  if (!start_status.ok()) {
    LOG(FATAL) << Substitute("Unable to start 'shell' mode master at index $0, due to error $1.",
                             add_new_master_at_, start_status.ToString());
  }

  ++add_new_master_at_;
  *new_master = master;
}
539
540
17
// Verifies that opts_.num_masters, masters_.size() and opts_.master_rpc_ports.size() agree.
// On mismatch the error is logged and returned as IllegalState, so callers' RETURN_NOT_OK can
// propagate it. Previously it aborted the process and this Status was always OK.
Status ExternalMiniCluster::CheckPortAndMasterSizes() const {
  if (opts_.num_masters != masters_.size() ||
      opts_.num_masters != opts_.master_rpc_ports.size()) {
    const string err_msg = Substitute(
        "Mismatch number of masters in options $0, compared to masters vector $1 or rpc ports $2",
        opts_.num_masters, masters_.size(), opts_.master_rpc_ports.size());
    LOG(ERROR) << err_msg;
    return STATUS(IllegalState, err_msg);
  }

  return Status::OK();
}
551
552
10
// Registers `master` with the cluster: records its RPC port in the options, appends it to
// masters_, and checks that counts still agree. Fails with InvalidArgument if a matching
// master is already registered.
Status ExternalMiniCluster::AddMaster(ExternalMaster* master) {
  const auto existing = std::find_if(masters_.begin(), masters_.end(), MasterComparator(master));
  const bool already_registered = existing != masters_.end();
  if (already_registered) {
    return STATUS(InvalidArgument, Substitute(
        "Master to be added '$0' already found in existing list of $1 masters.",
        master->bound_rpc_hostport().ToString(), opts_.num_masters));
  }

  RETURN_NOT_OK(opts_.AddPort(master->bound_rpc_hostport().port()));
  masters_.push_back(master);
  return CheckPortAndMasterSizes();
}
568
569
7
// Unregisters `master`: drops its RPC port from the options, erases it from masters_, and
// checks that counts still agree. Fails with InvalidArgument if no matching master is
// registered.
Status ExternalMiniCluster::RemoveMaster(ExternalMaster* master) {
  const auto match = std::find_if(masters_.begin(), masters_.end(), MasterComparator(master));
  const bool registered = match != masters_.end();
  if (!registered) {
    return STATUS(InvalidArgument, Substitute(
        "Master to be removed '$0' not found in existing list of $1 masters.",
        master->bound_rpc_hostport().ToString(), opts_.num_masters));
  }

  RETURN_NOT_OK(opts_.RemovePort(master->bound_rpc_hostport().port()));
  masters_.erase(match);
  return CheckPortAndMasterSizes();
}
585
586
0
// Consensus proxy addressed to the master that GetLeaderMaster() currently reports.
ConsensusServiceProxy ExternalMiniCluster::GetLeaderConsensusProxy() {
  ExternalMaster* const leader = GetLeaderMaster();
  return GetConsensusProxy(leader);
}
589
590
155
// Consensus service proxy addressed to the given external daemon.
// (Parameter renamed from the misspelled `external_deamon`; C++ callers are unaffected.)
ConsensusServiceProxy ExternalMiniCluster::GetConsensusProxy(ExternalDaemon* external_daemon) {
  return GetProxy<ConsensusServiceProxy>(external_daemon);
}
593
594
7
// Asks the current master leader to step down as leader of the sys catalog tablet.
// If the server replies with an error, that error's code is written to *error_code (when
// error_code is non-null) and the error status is returned. RPC-level failures are returned
// without touching *error_code.
Status ExternalMiniCluster::StepDownMasterLeader(TabletServerErrorPB::Code* error_code) {
  ExternalMaster* leader = GetLeaderMaster();
  string leader_uuid = leader->uuid();
  auto host_port = leader->bound_rpc_addr();
  LeaderStepDownRequestPB lsd_req;
  lsd_req.set_tablet_id(yb::master::kSysCatalogTabletId);
  lsd_req.set_dest_uuid(leader_uuid);
  LeaderStepDownResponsePB lsd_resp;
  RpcController lsd_rpc;
  lsd_rpc.set_timeout(opts_.timeout);
  ConsensusServiceProxy proxy(proxy_cache_.get(), host_port);
  RETURN_NOT_OK(proxy.LeaderStepDown(lsd_req, &lsd_resp, &lsd_rpc));
  if (lsd_resp.has_error()) {
    LOG(ERROR) << "LeaderStepDown for " << leader_uuid << " received error "
               << lsd_resp.error().ShortDebugString();
    // Callers that do not care about the code may pass nullptr.
    if (error_code != nullptr) {
      *error_code = lsd_resp.error().code();
    }
    return StatusFromPB(lsd_resp.error().status());
  }

  LOG(INFO) << "Leader at host/port '" << host_port << "' step down complete.";

  return Status::OK();
}
617
618
4
// Repeatedly asks the master leader to step down until GetLeaderMaster() reports a different
// uuid. A LEADER_NOT_READY_TO_STEP_DOWN reply is retried; any other failure is returned.
Status ExternalMiniCluster::StepDownMasterLeaderAndWaitForNewLeader() {
  ExternalMaster* leader = GetLeaderMaster();
  const string old_leader_uuid = leader->uuid();
  string leader_uuid = old_leader_uuid;
  LOG(INFO) << "Starting step down of leader " << leader->bound_rpc_addr();

  // while loop will not be needed once JIRA ENG-49 is fixed.
  int iter = 1;
  while (leader_uuid == old_leader_uuid) {
    // StepDownMasterLeader() writes error_code only when the server replies with an error, so
    // start each attempt from a neutral value. Otherwise a LEADER_NOT_READY_TO_STEP_DOWN left
    // over from an earlier attempt would mask an unrelated failure (e.g. an RPC error) later.
    TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
    Status s = StepDownMasterLeader(&error_code);
    // If step down hits any error except not-ready, exit.
    if (!s.ok() && error_code != TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN) {
      return s;
    }
    SleepFor(MonoDelta::FromSeconds(3));  // TODO: add wait for election api.
    leader = GetLeaderMaster();
    leader_uuid = leader->uuid();
    LOG(INFO) << "Got new leader " << leader->bound_rpc_addr() << ", iter=" << iter;
    iter++;
  }

  return Status::OK();
}
642
643
// Adds `master` to, or removes it from, the sys catalog Raft config via the current master
// leader, then updates masters_/opts_ to match (through AddMaster/RemoveMaster).
//   type:         consensus::ADD_SERVER or consensus::REMOVE_SERVER; anything else is rejected
//                 with InvalidArgument.
//   member_type:  member type of the added peer (used only for ADD_SERVER).
//   use_hostport: identify the peer by address only (empty permanent uuid).
// NOT_THE_LEADER / LEADER_NOT_READY_CHANGE_CONFIG replies are retried up to
// kMaxRetryIterations times, one second apart. If the master to remove is the current leader,
// that leader is first asked to step down.
Status ExternalMiniCluster::ChangeConfig(ExternalMaster* master,
                                         ChangeConfigType type,
                                         consensus::PeerMemberType member_type,
                                         bool use_hostport) {
  if (type != consensus::ADD_SERVER && type != consensus::REMOVE_SERVER) {
    return STATUS(InvalidArgument, Substitute("Invalid Change Config type $0", type));
  }

  ChangeConfigRequestPB req;
  ChangeConfigResponsePB resp;
  rpc::RpcController rpc;
  rpc.set_timeout(opts_.timeout);

  RaftPeerPB peer_pb;
  peer_pb.set_permanent_uuid(use_hostport ? "" : master->uuid());
  if (type == consensus::ADD_SERVER) {
    peer_pb.set_member_type(member_type);
  }
  HostPortToPB(master->bound_rpc_hostport(), peer_pb.mutable_last_known_private_addr()->Add());
  req.set_tablet_id(yb::master::kSysCatalogTabletId);
  req.set_type(type);
  req.set_use_host(use_hostport);
  *req.mutable_server() = peer_pb;

  // There could be timing window where we found the leader host/port, but an election in the
  // meantime could have made it step down. So we retry till we hit the leader correctly.
  int num_attempts = 1;
  while (true) {
    ExternalMaster* leader = GetLeaderMaster();
    string leader_uuid = leader->uuid();

    // NOTE(review): with use_hostport the request carries an empty permanent uuid, so this
    // check cannot detect that the master being removed is the current leader. Confirm this
    // is intended.
    if (type == consensus::REMOVE_SERVER && leader_uuid == req.server().permanent_uuid()) {
      RETURN_NOT_OK(StepDownMasterLeaderAndWaitForNewLeader());
      leader = GetLeaderMaster();
      leader_uuid = leader->uuid();
    }
    // Build the proxy only once the target leader is final.
    ConsensusServiceProxy leader_proxy(proxy_cache_.get(), leader->bound_rpc_addr());

    req.set_dest_uuid(leader_uuid);
    // Defensive: never let an error from a previous attempt linger in the response.
    resp.Clear();
    RETURN_NOT_OK(leader_proxy.ChangeConfig(req, &resp, &rpc));
    if (!resp.has_error()) {
      break;
    }
    const auto error_code = resp.error().code();
    if (error_code != TabletServerErrorPB::NOT_THE_LEADER &&
        error_code != TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG) {
      return STATUS(RuntimeError, Substitute("Change Config RPC to leader hit error: $0",
                                             resp.error().ShortDebugString()));
    }

    // Need to retry as we come here with NOT_THE_LEADER.
    if (num_attempts >= kMaxRetryIterations) {
      return STATUS(IllegalState,
                    Substitute("Failed to complete ChangeConfig request '$0' even after maximum "
                               "number of attempts. Last error '$1'",
                               req.ShortDebugString(), resp.error().ShortDebugString()));
    }

    SleepFor(MonoDelta::FromSeconds(1));

    LOG(INFO) << "Resp error '" << resp.error().ShortDebugString() << "', num=" << num_attempts
              << ", retrying...";

    rpc.Reset();
    num_attempts++;
  }

  LOG(INFO) << "Master " << master->bound_rpc_hostport().ToString() << ", change type "
            << type << " to " << masters_.size() << " masters.";

  // `type` was validated on entry, so it is either ADD_SERVER or REMOVE_SERVER here.
  if (type == consensus::ADD_SERVER) {
    return AddMaster(master);
  }
  return RemoveMaster(master);
}
728
729
// We look for the exact master match. Since it is possible to stop/restart master on
730
// a given host/port, we do not want a stale master pointer input to match a newer master.
731
37
// Returns the position of `master` in masters_ (matched by raw pointer identity), or -1 when the
// pointer is not one of the cluster's current masters.
int ExternalMiniCluster::GetIndexOfMaster(ExternalMaster* master) const {
  const size_t num_current = masters_.size();
  for (size_t idx = 0; idx != num_current; ++idx) {
    if (master == masters_[idx].get()) {
      return narrow_cast<int>(idx);
    }
  }
  return -1;
}
739
740
1
// Sends a Ping RPC (with opts_.timeout) to `master` and returns the RPC status. If `master` is not
// one of the current masters, the proxy is built from its bound RPC address instead of its index.
Status ExternalMiniCluster::PingMaster(ExternalMaster* master) const {
  const int master_idx = GetIndexOfMaster(master);
  std::shared_ptr<server::GenericServiceProxy> ping_proxy;
  if (master_idx == -1) {
    ping_proxy = master_generic_proxy(master->bound_rpc_addr());
  } else {
    ping_proxy = master_generic_proxy(master_idx);
  }

  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);
  server::PingRequestPB ping_req;
  server::PingResponsePB ping_resp;
  return ping_proxy->Ping(ping_req, &ping_resp, &controller);
}
750
751
// Appends `ts`'s bound RPC host/port to the cluster config's server blacklist. The config is read
// with GetMasterClusterConfig and written back with ChangeMasterClusterConfig, both sent to
// `master`, which must be one of this cluster's current masters (InvalidArgument otherwise).
// Returns RuntimeError if either RPC response carries an error.
Status ExternalMiniCluster::AddTServerToBlacklist(
    ExternalMaster* master,
    ExternalTabletServer* ts) {
  GetMasterClusterConfigRequestPB config_req;
  GetMasterClusterConfigResponsePB config_resp;
  int index = GetIndexOfMaster(master);

  if (index == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  auto proxy = GetMasterProxy<master::MasterClusterProxy>(index);
  rpc::RpcController rpc;
  rpc.set_timeout(opts_.timeout);
  RETURN_NOT_OK(proxy.GetMasterClusterConfig(config_req, &config_resp, &rpc));
  if (config_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "GetMasterClusterConfig RPC response hit error: $0",
        config_resp.error().ShortDebugString()));
  }

  // Move the current config straight into the change request instead of deep-copying it twice;
  // config_resp is not used after this point.
  ChangeMasterClusterConfigRequestPB change_req;
  SysClusterConfigEntryPB* config = change_req.mutable_cluster_config();
  config->Swap(config_resp.mutable_cluster_config());
  // Add the tserver to the server blacklist.
  HostPortToPB(ts->bound_rpc_hostport(), config->mutable_server_blacklist()->mutable_hosts()->Add());

  ChangeMasterClusterConfigResponsePB change_resp;
  rpc.Reset();
  RETURN_NOT_OK(proxy.ChangeMasterClusterConfig(change_req, &change_resp, &rpc));
  if (change_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "ChangeMasterClusterConfig RPC response hit error: $0",
        change_resp.error().ShortDebugString()));
  }

  LOG(INFO) << "TServer at " << ts->bound_rpc_hostport().ToString()
            << " was added to the blacklist";

  return Status::OK();
}
793
794
// Looks up the live-replica placement block matching cloud/region/zone in the cluster config read
// from `master`, and stores its min_num_replicas in `*min_num_replicas`.
// An empty cloud/region/zone argument matches only a block whose corresponding field is unset.
// Only the first matching block is considered. Returns InvalidArgument if `master` is unknown,
// there is no live placement info, no block matches, or the matching block has no
// min_num_replicas; RuntimeError if the config RPC response carries an error.
Status ExternalMiniCluster::GetMinReplicaCountForPlacementBlock(
    ExternalMaster* master,
    const string& cloud, const string& region, const string& zone,
    int* min_num_replicas) {
  const int master_idx = GetIndexOfMaster(master);
  if (master_idx == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  GetMasterClusterConfigRequestPB config_req;
  GetMasterClusterConfigResponsePB config_resp;
  auto cluster_proxy = GetMasterProxy<master::MasterClusterProxy>(master_idx);
  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);
  RETURN_NOT_OK(cluster_proxy.GetMasterClusterConfig(config_req, &config_resp, &controller));
  if (config_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "GetMasterClusterConfig RPC response hit error: $0",
        config_resp.error().ShortDebugString()));
  }

  const SysClusterConfigEntryPB& cluster_config = config_resp.cluster_config();
  if (!cluster_config.has_replication_info() ||
      !cluster_config.replication_info().has_live_replicas()) {
    return STATUS(InvalidArgument, Substitute(
        "Given placement block '$0.$1.$2' not in the current list of placement blocks.",
        cloud, region, zone));
  }

  // A component matches when the field is set and equals a non-empty requested value, or when the
  // field is unset and the requested value is empty.
  const auto component_matches = [](bool is_set, const string& actual, const string& wanted) {
    return is_set ? (!wanted.empty() && actual == wanted) : wanted.empty();
  };

  const master::PlacementInfoPB& live = cluster_config.replication_info().live_replicas();
  for (int block_idx = 0; block_idx < live.placement_blocks_size(); ++block_idx) {
    const auto& block = live.placement_blocks(block_idx);
    if (!block.has_cloud_info()) {
      continue;
    }
    const auto& ci = block.cloud_info();
    const bool matches =
        component_matches(ci.has_placement_cloud(), ci.placement_cloud(), cloud) &&
        component_matches(ci.has_placement_region(), ci.placement_region(), region) &&
        component_matches(ci.has_placement_zone(), ci.placement_zone(), zone);
    if (!matches) {
      continue;
    }
    // Only the first matching block counts; without min_num_replicas it is reported as not found.
    if (!block.has_min_num_replicas()) {
      break;
    }
    *min_num_replicas = block.min_num_replicas();
    return Status::OK();
  }

  return STATUS(InvalidArgument, Substitute(
      "Given placement block '$0.$1.$2' not in the current list of placement blocks.",
      cloud, region, zone));
}
870
871
// Appends `ts`'s bound RPC host/port to the cluster config's leader blacklist. The config is read
// with GetMasterClusterConfig and written back with ChangeMasterClusterConfig, both sent to
// `master`, which must be one of this cluster's current masters (InvalidArgument otherwise).
// Returns RuntimeError if either RPC response carries an error.
Status ExternalMiniCluster::AddTServerToLeaderBlacklist(
    ExternalMaster* master,
    ExternalTabletServer* ts) {
  GetMasterClusterConfigRequestPB config_req;
  GetMasterClusterConfigResponsePB config_resp;
  int index = GetIndexOfMaster(master);

  if (index == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  auto proxy = GetMasterProxy<master::MasterClusterProxy>(index);
  rpc::RpcController rpc;
  rpc.set_timeout(opts_.timeout);
  RETURN_NOT_OK(proxy.GetMasterClusterConfig(config_req, &config_resp, &rpc));
  if (config_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "GetMasterClusterConfig RPC response hit error: $0",
        config_resp.error().ShortDebugString()));
  }

  // Move the current config straight into the change request instead of deep-copying it twice;
  // config_resp is not used after this point.
  ChangeMasterClusterConfigRequestPB change_req;
  SysClusterConfigEntryPB* config = change_req.mutable_cluster_config();
  config->Swap(config_resp.mutable_cluster_config());
  // Add the tserver to the leader blacklist.
  HostPortToPB(ts->bound_rpc_hostport(), config->mutable_leader_blacklist()->mutable_hosts()->Add());

  ChangeMasterClusterConfigResponsePB change_resp;
  rpc.Reset();
  RETURN_NOT_OK(proxy.ChangeMasterClusterConfig(change_req, &change_resp, &rpc));
  if (change_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "ChangeMasterClusterConfig RPC response hit error: $0",
        change_resp.error().ShortDebugString()));
  }

  LOG(INFO) << "TServer at " << ts->bound_rpc_hostport().ToString()
            << " was added to the leader blacklist";

  return Status::OK();
}
913
914
// Removes every host from both the server blacklist and the leader blacklist of the cluster
// config. The config is read with GetMasterClusterConfig and written back with
// ChangeMasterClusterConfig, both sent to `master`, which must be one of this cluster's current
// masters (InvalidArgument otherwise). Returns RuntimeError if either RPC response has an error.
Status ExternalMiniCluster::ClearBlacklist(
    ExternalMaster* master) {
  GetMasterClusterConfigRequestPB config_req;
  GetMasterClusterConfigResponsePB config_resp;
  int index = GetIndexOfMaster(master);

  if (index == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  auto proxy = GetMasterProxy<master::MasterClusterProxy>(index);
  rpc::RpcController rpc;
  rpc.set_timeout(opts_.timeout);
  RETURN_NOT_OK(proxy.GetMasterClusterConfig(config_req, &config_resp, &rpc));
  if (config_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "GetMasterClusterConfig RPC response hit error: $0",
        config_resp.error().ShortDebugString()));
  }

  // Move the current config straight into the change request instead of deep-copying it twice;
  // config_resp is not used after this point.
  ChangeMasterClusterConfigRequestPB change_req;
  SysClusterConfigEntryPB* config = change_req.mutable_cluster_config();
  config->Swap(config_resp.mutable_cluster_config());
  // Clear both blacklists.
  config->mutable_server_blacklist()->mutable_hosts()->Clear();
  config->mutable_leader_blacklist()->mutable_hosts()->Clear();

  ChangeMasterClusterConfigResponsePB change_resp;
  rpc.Reset();
  RETURN_NOT_OK(proxy.ChangeMasterClusterConfig(change_req, &change_resp, &rpc));
  if (change_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "ChangeMasterClusterConfig RPC response hit error: $0",
        change_resp.error().ShortDebugString()));
  }

  LOG(INFO) << "Blacklist cleared successfully";

  return Status::OK();
}
955
956
23
// Stores in `*num_peers` the number of masters reported by a ListMasters RPC sent to `master`.
// Returns InvalidArgument if `master` is not one of the current masters, and RuntimeError if the
// RPC response carries an error.
Status ExternalMiniCluster::GetNumMastersAsSeenBy(ExternalMaster* master, int* num_peers) {
  const int master_idx = GetIndexOfMaster(master);
  if (master_idx == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  ListMastersRequestPB req;
  ListMastersResponsePB resp;
  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);
  auto cluster_proxy = GetMasterProxy<master::MasterClusterProxy>(master_idx);
  RETURN_NOT_OK(cluster_proxy.ListMasters(req, &resp, &controller));
  if (resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "List Masters RPC response hit error: $0", resp.error().ShortDebugString()));
  }

  const int peer_count = resp.masters_size();
  LOG(INFO) << "List Masters for master at index " << master_idx
            << " got " << peer_count << " peers";
  *num_peers = peer_count;
  return Status::OK();
}
983
984
13
// Waits for the leader master's committed op id term to move past its starting value. If the
// starting committed term is already non-zero, returns OK at once. Otherwise polls
// GetLastOpIdForLeader() with a short sleep (1ms growing to 10ms) until the term increases, or
// returns TimedOut once opts_.timeout has elapsed.
Status ExternalMiniCluster::WaitForLeaderCommitTermAdvance() {
  OpIdPB start_opid;
  RETURN_NOT_OK(GetLastOpIdForLeader(&start_opid));
  LOG(INFO) << "Start OPID : " << start_opid.ShortDebugString();

  // Need not do any wait if it is a restart case - so the commit term will be > 0.
  if (start_opid.term() != 0) {
    return Status::OK();
  }

  MonoTime now = MonoTime::Now();
  MonoTime deadline = now;
  deadline.AddDelta(opts_.timeout);
  OpIdPB opid = start_opid;

  int iteration = 1;
  while (now.ComesBefore(deadline)) {
    if (opid.term() > start_opid.term()) {
      LOG(INFO) << "Final OPID: " << opid.ShortDebugString() << " after "
                << iteration << " iterations.";
      return Status::OK();
    }
    SleepFor(MonoDelta::FromMilliseconds(min(iteration, 10)));
    RETURN_NOT_OK(GetLastOpIdForLeader(&opid));
    now = MonoTime::Now();
    ++iteration;
  }

  return STATUS(TimedOut, Substitute("Term did not advance from $0.", start_opid.term()));
}
1012
1013
// Replaces `*op_ids` with the last op id (of `opid_type`) of the sys catalog tablet on each
// master, in masters_ order. Each GetLastOpId RPC uses `timeout`. Returns the first RPC failure,
// prefixed with the failing master's RPC port.
Status ExternalMiniCluster::GetLastOpIdForEachMasterPeer(
    const MonoDelta& timeout,
    consensus::OpIdType opid_type,
    vector<OpIdPB>* op_ids) {
  GetLastOpIdRequestPB opid_req;
  GetLastOpIdResponsePB opid_resp;
  opid_req.set_tablet_id(yb::master::kSysCatalogTabletId);
  // The op id type is the same for every peer, so set it once outside the loop.
  opid_req.set_opid_type(opid_type);
  RpcController controller;
  controller.set_timeout(timeout);

  op_ids->clear();
  op_ids->reserve(masters_.size());
  // Iterate by const reference: copying each scoped_refptr would bump its refcount for nothing.
  for (const auto& master : masters_) {
    opid_req.set_dest_uuid(master->uuid());
    RETURN_NOT_OK_PREPEND(
        GetConsensusProxy(master.get()).GetLastOpId(opid_req, &opid_resp, &controller),
        Substitute("Failed to fetch last op id from $0", master->bound_rpc_hostport().port()));
    op_ids->push_back(opid_resp.opid());
    controller.Reset();
  }

  return Status::OK();
}
1036
1037
24
// Polls every master's committed op id until all of them are at or beyond `target_index`.
// Between polls it sleeps i*100ms, capped at 1s. Once opts_.timeout has passed, returns the last
// op id fetch error if the final poll failed, otherwise TimedOut.
Status ExternalMiniCluster::WaitForMastersToCommitUpTo(int64_t target_index) {
  const auto deadline = CoarseMonoClock::Now() + opts_.timeout.ToSteadyDuration();

  for (int attempt = 1;; ++attempt) {
    vector<OpIdPB> committed_ids;
    Status s = GetLastOpIdForEachMasterPeer(
        opts_.timeout, consensus::COMMITTED_OPID, &committed_ids);

    if (!s.ok()) {
      LOG(WARNING) << "Got error getting last opid for each replica: " << s.ToString();
    } else if (std::none_of(committed_ids.begin(), committed_ids.end(),
                            [target_index](const OpIdPB& id) {
                              return id.index() < target_index;
                            })) {
      LOG(INFO) << "Committed up to " << target_index;
      return Status::OK();
    }

    if (CoarseMonoClock::Now() >= deadline) {
      if (!s.ok()) {
        return s;
      }
      return STATUS_FORMAT(TimedOut,
                           "Index $0 not available on all replicas after $1. ",
                           target_index,
                           opts_.timeout);
    }

    SleepFor(MonoDelta::FromMilliseconds(min(attempt * 100, 1000)));
  }
}
1074
1075
1
// Sends IsMasterLeaderServiceReady to `master`. Returns OK when the RPC succeeds and its
// response has no error. Returns InvalidArgument if `master` is not one of the current masters,
// RuntimeError if the response carries an error, or the RPC failure itself.
Status ExternalMiniCluster::GetIsMasterLeaderServiceReady(ExternalMaster* master) {
  const int master_idx = GetIndexOfMaster(master);
  if (master_idx == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  IsMasterLeaderReadyRequestPB req;
  IsMasterLeaderReadyResponsePB resp;
  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);
  auto cluster_proxy = GetMasterProxy<master::MasterClusterProxy>(master_idx);
  RETURN_NOT_OK(cluster_proxy.IsMasterLeaderServiceReady(req, &resp, &controller));
  if (!resp.has_error()) {
    return Status::OK();
  }
  return STATUS(RuntimeError, Substitute(
      "Is master ready RPC response hit error: $0", resp.error().ShortDebugString()));
}
1097
1098
42
// Fetches into `opid` the committed op id of the sys catalog tablet on the master returned by
// GetLeaderMaster(). Returns IllegalState if no leader master is available.
Status ExternalMiniCluster::GetLastOpIdForLeader(OpIdPB* opid) {
  ExternalMaster* leader = GetLeaderMaster();
  // AddTabletServer() CHECKs GetLeaderMaster() against nullptr; report an error here instead of
  // dereferencing a null pointer below.
  if (leader == nullptr) {
    return STATUS(IllegalState, "No leader master found");
  }
  auto leader_master_sock = leader->bound_rpc_addr();
  std::shared_ptr<ConsensusServiceProxy> leader_proxy =
      std::make_shared<ConsensusServiceProxy>(proxy_cache_.get(), leader_master_sock);

  RETURN_NOT_OK(itest::GetLastOpIdForMasterReplica(
      leader_proxy,
      yb::master::kSysCatalogTabletId,
      leader->uuid(),
      consensus::COMMITTED_OPID,
      opts_.timeout,
      opid));

  return Status::OK();
}
1114
1115
26
// Returns MasterAddressForPort() of each of the first opts_.num_masters entries of
// opts_.master_rpc_ports, joined with commas.
string ExternalMiniCluster::GetMasterAddresses() const {
  string joined;
  for (size_t idx = 0; idx < opts_.num_masters; ++idx) {
    if (!joined.empty()) {
      joined.push_back(',');
    }
    joined.append(MasterAddressForPort(opts_.master_rpc_ports[idx]));
  }
  return joined;
}
1125
1126
0
// Returns HostPortToString(bind host, RPC port) for every tablet server, joined with commas.
string ExternalMiniCluster::GetTabletServerAddresses() const {
  string result;
  for (const auto& tserver : tablet_servers_) {
    const string host_port = HostPortToString(tserver->bind_host(), tserver->rpc_port());
    if (!result.empty()) {
      result.push_back(',');
    }
    result.append(host_port);
  }
  return result;
}
1136
1137
0
// Returns HostPortToString(bind host, HTTP port) for every tablet server, joined with commas.
string ExternalMiniCluster::GetTabletServerHTTPAddresses() const {
  string result;
  for (const auto& tserver : tablet_servers_) {
    const string host_port = HostPortToString(tserver->bind_host(), tserver->http_port());
    if (!result.empty()) {
      result.push_back(',');
    }
    result.append(host_port);
  }
  return result;
}
1147
1148
230
// Starts opts_.num_masters master processes and appends them to masters_.
// Zero entries in opts_.master_rpc_ports are replaced in place with auto-allocated ports.
// Every master gets opts_.extra_master_flags plus the test defaults below, and the full
// comma-separated peer list. When YSQL is enabled, also waits for initdb to finish.
// Aborts (LOG(FATAL)) if the number of configured RPC ports differs from the number of masters.
Status ExternalMiniCluster::StartMasters() {
  auto num_masters = opts_.num_masters;

  if (opts_.master_rpc_ports.size() != num_masters) {
    LOG(FATAL) << num_masters << " masters requested, but " <<
        opts_.master_rpc_ports.size() << " ports specified in 'master_rpc_ports'";
  }

  // Replace any port left as 0 with a freshly allocated free port (stored back into opts_).
  for (auto& port : opts_.master_rpc_ports) {
    if (port == 0) {
      port = AllocateFreePort();
      LOG(INFO) << "Using an auto-assigned port " << port
                << " to start an external mini-cluster master";
    }
  }

  vector<string> peer_addrs;
  peer_addrs.reserve(num_masters);
  for (size_t i = 0; i < num_masters; i++) {
    peer_addrs.push_back(MasterAddressForPort(opts_.master_rpc_ports[i]));
  }
  string peer_addrs_str = JoinStrings(peer_addrs, ",");
  vector<string> flags = opts_.extra_master_flags;
  // Disable WAL fsync for tests
  flags.push_back("--durable_wal_write=false");
  flags.push_back("--enable_leader_failure_detection=true");
  // Limit number of transaction table tablets to help avoid timeouts.
  int num_transaction_table_tablets = NumTabletsPerTransactionTable(opts_);
  flags.push_back(Substitute("--transaction_table_num_tablets=$0", num_transaction_table_tablets));
  // For sanitizer builds, it is easy to overload the master, leading to quorum changes.
  // This could end up breaking even trivial DDLs like creating an initial table in the cluster.
  if (IsSanitizer()) {
    flags.push_back("--leader_failure_max_missed_heartbeat_periods=10");
  }
  if (opts_.enable_ysql) {
    flags.push_back("--enable_ysql=true");
    flags.push_back("--master_auto_run_initdb");
  } else {
    flags.push_back("--enable_ysql=false");
  }
  string exe = GetBinaryPath(kMasterBinaryName);

  // Start the masters.
  for (size_t i = 0; i < num_masters; i++) {
    uint16_t http_port = AllocateFreePort();
    scoped_refptr<ExternalMaster> peer =
      new ExternalMaster(
        i,
        messenger_,
        proxy_cache_.get(),
        exe,
        GetDataPath(Substitute("master-$0", i)),
        SubstituteInFlags(flags, i),
        peer_addrs[i],
        http_port,
        peer_addrs_str);
    RETURN_NOT_OK_PREPEND(peer->Start(),
                          Substitute("Unable to start Master at index $0", i));
    masters_.push_back(peer);
  }

  // With YSQL enabled the masters were started with --master_auto_run_initdb; wait for it.
  if (opts_.enable_ysql) {
    RETURN_NOT_OK(WaitForInitDb());
  }
  return Status::OK();
}
1214
1215
5
// Polls every master with IsInitDbDone (500ms between rounds) until one reports that initdb is
// done. Returns:
//   - RuntimeError if initdb reported an error, or if a master returns an error other than
//     NOT_THE_LEADER;
//   - the RPC status after kMaxTimeouts RPC timeouts in total;
//   - TimedOut once kTimeout has elapsed.
// Other RPC failures are logged and retried.
Status ExternalMiniCluster::WaitForInitDb() {
  const auto start_time = std::chrono::steady_clock::now();
  const auto kTimeout = NonTsanVsTsan(1200s, 1800s);
  int num_timeouts = 0;
  const int kMaxTimeouts = 10;
  while (true) {
    for (size_t i = 0; i < opts_.num_masters; i++) {
      auto elapsed_time = std::chrono::steady_clock::now() - start_time;
      if (elapsed_time > kTimeout) {
        return STATUS_FORMAT(
            TimedOut,
            "Timed out while waiting for initdb to complete: elapsed time is $0, timeout is $1",
            elapsed_time, kTimeout);
      }
      auto proxy = GetMasterProxy<master::MasterAdminProxy>(i);
      rpc::RpcController rpc;
      rpc.set_timeout(opts_.timeout);
      IsInitDbDoneRequestPB req;
      IsInitDbDoneResponsePB resp;
      Status status = proxy.IsInitDbDone(req, &resp, &rpc);
      if (status.IsTimedOut()) {
        num_timeouts++;
        LOG(WARNING) << status << " (seen " << num_timeouts << " timeouts so far)";
        if (num_timeouts == kMaxTimeouts) {
          LOG(ERROR) << "Reached " << kMaxTimeouts << " timeouts: " << status;
          return status;
        }
        continue;
      }
      if (!status.ok()) {
        // Other RPC failures are retried too. Log them instead of silently falling through to
        // inspect a response the failed RPC may not have filled in.
        LOG(WARNING) << "IsInitDbDone RPC to master " << i << " failed: " << status;
        continue;
      }
      if (resp.has_error() &&
          resp.error().code() != master::MasterErrorPB::NOT_THE_LEADER) {

        return STATUS(RuntimeError, Substitute(
            "IsInitDbDone RPC response hit error: $0",
            resp.error().ShortDebugString()));
      }
      if (resp.done()) {
        if (resp.has_initdb_error() && !resp.initdb_error().empty()) {
          LOG(ERROR) << "master reported an initdb error: " << resp.initdb_error();
          return STATUS(RuntimeError, "initdb failed: " + resp.initdb_error());
        }
        LOG(INFO) << "master indicated that initdb is done";
        return Status::OK();
      }
    }
    std::this_thread::sleep_for(500ms);
  }
}
1263
1264
41
// Sends ListTabletServers to the master returned by GetMasterProxy<MasterClusterProxy>(), with
// `deadline` as the RPC timeout (despite the name, it is a duration). Returns true if tablet
// server `ts_idx` is listed but not alive, and false if it is listed and alive.
// Returns an error if:
//   - the RPC fails or its response carries an error;
//   - a listed entry lacks an instance id or permanent uuid;
//   - the tablet server is not listed at all (NotFound).
Result<bool> ExternalMiniCluster::is_ts_stale(int ts_idx, MonoDelta deadline) {
  auto proxy = GetMasterProxy<master::MasterClusterProxy>();
  // A stack-allocated controller is enough: the call below is synchronous and the controller is
  // not shared, so neither shared_ptr nor an extra Reset() is needed.
  rpc::RpcController controller;
  controller.set_timeout(deadline);
  master::ListTabletServersRequestPB req;
  master::ListTabletServersResponsePB resp;

  RETURN_NOT_OK(proxy.ListTabletServers(req, &resp, &controller));
  // Like the other master RPC helpers, surface an error in the response instead of misreporting
  // it as "TS not found".
  if (resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "ListTabletServers RPC response hit error: $0", resp.error().ShortDebugString()));
  }

  // Loop-invariant: the uuid being searched for.
  const auto& ts_uuid = tablet_server(ts_idx)->uuid();
  bool is_stale = false, is_ts_found = false;
  for (const auto& server : resp.servers()) {
    if (!server.has_instance_id()) {
      return STATUS_SUBSTITUTE(
        Uninitialized,
        "ListTabletServers RPC returned a TS with uninitialized instance id."
      );
    }

    if (!server.instance_id().has_permanent_uuid()) {
      return STATUS_SUBSTITUTE(
        Uninitialized,
        "ListTabletServers RPC returned a TS with uninitialized UUIDs."
      );
    }

    if (server.instance_id().permanent_uuid() == ts_uuid) {
      is_ts_found = true;
      is_stale = !server.alive();
    }
  }

  if (!is_ts_found) {
    return STATUS_SUBSTITUTE(
        NotFound,
        "Given TS not found in ListTabletServers RPC."
    );
  }
  return is_stale;
}
1304
1305
2
// Uses WaitFor() to poll is_ts_stale(ts_idx) until the master no longer reports tablet server
// `ts_idx` as stale, giving up after deadline * kTimeMultiplier. Errors from is_ts_stale() are
// propagated through the condition.
CHECKED_STATUS ExternalMiniCluster::WaitForMasterToMarkTSAlive(int ts_idx, MonoDelta deadline) {
  const auto ts_is_alive = [this, ts_idx]() -> Result<bool> {
    return !VERIFY_RESULT(is_ts_stale(ts_idx));
  };
  return WaitFor(ts_is_alive, deadline * kTimeMultiplier, "Is TS Alive", 1s);
}
1312
1313
4
// Uses WaitFor() to poll is_ts_stale(ts_idx) until the master reports tablet server `ts_idx` as
// stale, giving up after deadline * kTimeMultiplier. Errors from is_ts_stale() are propagated
// through the condition.
CHECKED_STATUS ExternalMiniCluster::WaitForMasterToMarkTSDead(int ts_idx, MonoDelta deadline) {
  const auto ts_is_dead = [this, ts_idx]() -> Result<bool> {
    return is_ts_stale(ts_idx);
  };
  return WaitFor(ts_is_dead, deadline * kTimeMultiplier, "Is TS dead", 1s);
}
1320
1321
678
// Chooses the loopback IP that tablet server `index` binds to:
//   - opts_.use_even_ips:                        127.0.0.<2*(index+1)>
//   - opts_.bind_to_unique_loopback_addresses:   127.0.0.<index+1> on macOS; otherwise
//     127.<pid bits 8-15>.<pid bits 0-7>.<index>
//   - neither:                                   127.0.0.1
string ExternalMiniCluster::GetBindIpForTabletServer(size_t index) const {
  if (opts_.use_even_ips) {
    return Substitute("127.0.0.$0", (index + 1) * 2);
  }
  if (!opts_.bind_to_unique_loopback_addresses) {
    return "127.0.0.1";
  }
#if defined(__APPLE__)
  // Use default 127.0.0.x IPs.
  return Substitute("127.0.0.$0", index + 1);
#else
  // The middle octets come from the pid, presumably so concurrently running test processes do
  // not pick the same addresses.
  const pid_t pid = getpid();
  return Substitute("127.$0.$1.$2", (pid >> 8) & 0xff, pid & 0xff, index);
#endif
}
1335
1336
// Starts one more tablet server, pointed at the bound RPC host/ports of all current masters, and
// appends it to tablet_servers_ once Start() succeeds. A negative `num_drives` means "use
// opts_.num_drives". `extra_flags` are appended after opts_.extra_tserver_flags and the
// --enable_ysql flag. CHECK-fails if GetLeaderMaster() returns null.
Status ExternalMiniCluster::AddTabletServer(
    bool start_cql_proxy, const std::vector<std::string>& extra_flags, int num_drives) {
  CHECK(GetLeaderMaster() != nullptr)
      << "Must have started at least 1 master before adding tablet servers";

  const size_t idx = tablet_servers_.size();
  string exe = GetBinaryPath(kTabletServerBinaryName);

  vector<HostPort> master_hostports;
  for (size_t master_idx = 0; master_idx < num_masters(); ++master_idx) {
    master_hostports.push_back(DCHECK_NOTNULL(master(master_idx))->bound_rpc_hostport());
  }

  uint16_t ts_rpc_port = 0;
  uint16_t ts_http_port = 0;
  uint16_t redis_rpc_port = 0;
  uint16_t redis_http_port = 0;
  uint16_t cql_rpc_port = 0;
  uint16_t cql_http_port = 0;
  uint16_t pgsql_rpc_port = 0;
  uint16_t pgsql_http_port = 0;

  const bool reuse_first_ts_ports =
      idx > 0 && opts_.use_same_ts_ports && opts_.bind_to_unique_loopback_addresses;
  if (!reuse_first_ts_ports) {
    ts_rpc_port = AllocateFreePort();
    ts_http_port = AllocateFreePort();
    redis_rpc_port = AllocateFreePort();
    redis_http_port = AllocateFreePort();
    cql_rpc_port = AllocateFreePort();
    cql_http_port = AllocateFreePort();
    pgsql_rpc_port = AllocateFreePort();
    pgsql_http_port = AllocateFreePort();
  } else {
    // With unique loopback addresses each tserver binds to its own IP
    // (see GetBindIpForTabletServer), so the first tserver's ports are reused.
    const scoped_refptr<ExternalTabletServer>& template_ts = tablet_servers_[0];
    ts_rpc_port = template_ts->rpc_port();
    ts_http_port = template_ts->http_port();
    redis_rpc_port = template_ts->redis_rpc_port();
    redis_http_port = template_ts->redis_http_port();
    cql_rpc_port = template_ts->cql_rpc_port();
    cql_http_port = template_ts->cql_http_port();
    pgsql_rpc_port = template_ts->pgsql_rpc_port();
    pgsql_http_port = template_ts->pgsql_http_port();
  }

  vector<string> flags = opts_.extra_tserver_flags;
  flags.push_back(opts_.enable_ysql ? "--enable_ysql=true" : "--enable_ysql=false");
  flags.insert(flags.end(), extra_flags.begin(), extra_flags.end());

  if (num_drives < 0) {
    num_drives = opts_.num_drives;
  }

  scoped_refptr<ExternalTabletServer> new_ts = new ExternalTabletServer(
      idx, messenger_, proxy_cache_.get(), exe, GetDataPath(Substitute("ts-$0", idx + 1)),
      num_drives, GetBindIpForTabletServer(idx), ts_rpc_port, ts_http_port, redis_rpc_port,
      redis_http_port, cql_rpc_port, cql_http_port, pgsql_rpc_port, pgsql_http_port,
      master_hostports, SubstituteInFlags(flags, idx));
  RETURN_NOT_OK(new_ts->Start(start_cql_proxy));
  tablet_servers_.push_back(new_ts);
  return Status::OK();
}
1400
1401
288
// Polls ListTabletServers on every master until one successful response contains at least
// `count` entries whose uuid and instance seqno both match a tablet server currently in
// tablet_servers_. On timeout, returns TimedOut. The message says no master answered without
// error in the last round, or else lists the uuids that were still unmatched.
Status ExternalMiniCluster::WaitForTabletServerCount(size_t count, const MonoDelta& timeout) {
  MonoTime deadline = MonoTime::Now();
  deadline.AddDelta(timeout);

  std::vector<scoped_refptr<ExternalTabletServer>> last_unmatched = tablet_servers_;
  bool had_leader = false;

  for (;;) {
    MonoDelta remaining = deadline - MonoTime::Now();
    if (remaining.ToSeconds() < 0) {
      if (!had_leader) {
        return STATUS(TimedOut, "Does not have active master leader to check tserver registration");
      }
      std::vector<std::string> unmatched_uuids;
      unmatched_uuids.reserve(last_unmatched.size());
      for (const auto& pending : last_unmatched) {
        unmatched_uuids.push_back(pending->instance_id().permanent_uuid());
      }
      return STATUS_FORMAT(TimedOut, "$0 TS(s) never registered with master (not registered $1)",
                           count, unmatched_uuids);
    }

    // Give the RPC a minimum amount of time; otherwise every request near the deadline would fail.
    remaining = std::max<MonoDelta>(remaining, 250ms);

    last_unmatched = tablet_servers_;
    had_leader = false;
    for (size_t master_idx = 0; master_idx < masters_.size(); ++master_idx) {
      master::ListTabletServersRequestPB req;
      master::ListTabletServersResponsePB resp;
      rpc::RpcController rpc;
      rpc.set_timeout(remaining);
      const auto status = GetMasterProxy<master::MasterClusterProxy>(master_idx).ListTabletServers(
          req, &resp, &rpc);
      LOG_IF(WARNING, !status.ok()) << "ListTabletServers failed: " << status;
      if (!status.ok() || resp.has_error()) {
        continue;
      }
      had_leader = true;

      // ListTabletServers() may return servers that are no longer online, so only count entries
      // whose uuid AND instance seqno match a tablet server that is still unmatched.
      size_t match_count = 0;
      for (const master::ListTabletServersResponsePB_Entry& entry : resp.servers()) {
        const auto hit = std::find_if(
            last_unmatched.begin(), last_unmatched.end(),
            [&entry](const scoped_refptr<ExternalTabletServer>& candidate) {
              return candidate->instance_id().permanent_uuid() ==
                         entry.instance_id().permanent_uuid() &&
                     candidate->instance_id().instance_seqno() ==
                         entry.instance_id().instance_seqno();
            });
        if (hit != last_unmatched.end()) {
          ++match_count;
          last_unmatched.erase(hit);
        }
      }
      if (match_count >= count) {
        LOG(INFO) << count << " TS(s) registered with Master";
        return Status::OK();
      }
    }
    SleepFor(MonoDelta::FromMilliseconds(1));
  }
}
1462
1463
89
void ExternalMiniCluster::AssertNoCrashes() {
1464
89
  vector<ExternalDaemon*> daemons = this->daemons();
1465
329
  for (ExternalDaemon* d : daemons) {
1466
329
    if (d->IsShutdown()) continue;
1467
658
    EXPECT_TRUE(d->IsProcessAlive()) << "At least one process crashed. viz: "
1468
658
                                     << d->id();
1469
329
  }
1470
89
}
1471
1472
// Lists every tablet hosted by `ts` using the ListTabletsForTabletServer RPC, with a timeout of
// 10s scaled by kTimeMultiplier. An RPC failure is propagated. On success the response entries
// are returned as a copied vector, in response order.
Result<std::vector<ListTabletsForTabletServerResponsePB::Entry>> ExternalMiniCluster::GetTablets(
    ExternalTabletServer* ts) {
  TabletServerServiceProxy proxy(proxy_cache_.get(), ts->bound_rpc_addr());
  ListTabletsForTabletServerRequestPB req;
  ListTabletsForTabletServerResponsePB resp;

  rpc::RpcController rpc;
  rpc.set_timeout(10s * kTimeMultiplier);
  RETURN_NOT_OK(proxy.ListTabletsForTabletServer(req, &resp, &rpc));

  const auto& entries = resp.entries();
  return std::vector<ListTabletsForTabletServerResponsePB::Entry>(entries.begin(), entries.end());
}
1489
1490
// Asks each tablet server in index order for the split key of `tablet_id`. Returns the first
// response that carries no error. An RPC failure on any tserver is returned immediately. If
// every tserver answers with an error, returns IllegalState.
Result<tserver::GetSplitKeyResponsePB> ExternalMiniCluster::GetSplitKey(
    const std::string& tablet_id) {
  // The request is identical for every tserver, so build it once.
  tserver::GetSplitKeyRequestPB req;
  req.set_tablet_id(tablet_id);

  for (size_t idx = 0; idx < num_tablet_servers(); ++idx) {
    tserver::TabletServerServiceProxy proxy(
        proxy_cache_.get(), tablet_server(idx)->bound_rpc_addr());
    rpc::RpcController controller;
    controller.set_timeout(10s * kTimeMultiplier);
    tserver::GetSplitKeyResponsePB resp;
    RETURN_NOT_OK(proxy.GetSplitKey(req, &resp, &controller));
    if (resp.has_error()) {
      continue;
    }
    return resp;
  }
  return STATUS(IllegalState, "GetSplitKey failed on all TServers");
}
1508
1509
// Sends a FlushTablets admin RPC to `ts` for `tablet_ids`. Requests a compaction when
// `is_compaction` is true and a plain flush otherwise. Uses a timeout of 10s scaled by
// kTimeMultiplier.
//
// Returns the RPC failure if the call itself fails. Otherwise returns the error reported in the
// response body, or OK.
Status ExternalMiniCluster::FlushTabletsOnSingleTServer(
    ExternalTabletServer* ts, const std::vector<yb::TabletId> tablet_ids,
    bool is_compaction) {
  tserver::FlushTabletsRequestPB req;
  tserver::FlushTabletsResponsePB resp;
  rpc::RpcController controller;
  controller.set_timeout(10s * kTimeMultiplier);

  req.set_dest_uuid(ts->uuid());
  req.set_operation(is_compaction ? tserver::FlushTabletsRequestPB::COMPACT
                                  : tserver::FlushTabletsRequestPB::FLUSH);
  for (const auto& tablet_id : tablet_ids) {
    req.add_tablet_ids(tablet_id);
  }

  tserver::TabletServerAdminServiceProxy proxy(proxy_cache_.get(), ts->bound_rpc_addr());
  RETURN_NOT_OK(proxy.FlushTablets(req, &resp, &controller));
  // A successful RPC can still carry an application-level failure in the response. Surface it
  // instead of reporting OK to the caller.
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  return Status::OK();
}
1528
1529
3
// Returns the raw ListTablets response from `ts`, using the cluster-wide RPC timeout
// (opts_.timeout). Only an RPC failure becomes an error status; resp.error() is not inspected.
Result<tserver::ListTabletsResponsePB> ExternalMiniCluster::ListTablets(ExternalTabletServer* ts) {
  TabletServerServiceProxy proxy(proxy_cache_.get(), ts->bound_rpc_addr());
  ListTabletsRequestPB req;
  ListTabletsResponsePB resp;
  rpc::RpcController rpc;
  rpc.set_timeout(opts_.timeout);
  RETURN_NOT_OK(proxy.ListTablets(req, &resp, &rpc));
  return resp;
}
1538
1539
3
// Returns the ids of all tablets hosted by `ts`, in the order GetTablets() reports them.
Result<std::vector<std::string>> ExternalMiniCluster::GetTabletIds(ExternalTabletServer* ts) {
  const auto tablets = VERIFY_RESULT(GetTablets(ts));
  std::vector<std::string> ids;
  ids.reserve(tablets.size());
  for (const auto& entry : tablets) {
    ids.push_back(entry.tablet_id());
  }
  return ids;
}
1547
1548
// Polls `ts` with ListTablets every 10ms until every tablet it reports is in tablet::RUNNING.
// An RPC failure or a response-level error is returned immediately. If `timeout` elapses first,
// returns TimedOut carrying the last response.
Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts,
                                                  const MonoDelta& timeout) {
  TabletServerServiceProxy proxy(proxy_cache_.get(), ts->bound_rpc_addr());
  ListTabletsRequestPB req;
  ListTabletsResponsePB resp;

  MonoTime deadline = MonoTime::Now();
  deadline.AddDelta(timeout);
  while (MonoTime::Now().ComesBefore(deadline)) {
    rpc::RpcController rpc;
    rpc.set_timeout(MonoDelta::FromSeconds(10));
    RETURN_NOT_OK(proxy.ListTablets(req, &resp, &rpc));
    if (resp.has_error()) {
      return StatusFromPB(resp.error().status());
    }

    bool all_running = true;
    for (const StatusAndSchemaPB& entry : resp.status_and_schema()) {
      if (entry.tablet_status().state() != tablet::RUNNING) {
        all_running = false;
        break;
      }
    }
    if (all_running) {
      return Status::OK();
    }

    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  return STATUS(TimedOut, resp.DebugString());
}
1580
1581
0
// Index-based overload: waits for the tablet server at `index` to crash (see the pointer
// overload). tablet_server() CHECK-fails on an out-of-range index.
Status ExternalMiniCluster::WaitForTSToCrash(size_t index, const MonoDelta& timeout) {
  return WaitForTSToCrash(tablet_server(index), timeout);
}
1585
1586
// Polls every 10ms until the process of `ts` is no longer alive. Returns OK as soon as it is
// gone. If it is still alive when `timeout` elapses, returns TimedOut naming the tserver's
// permanent uuid.
Status ExternalMiniCluster::WaitForTSToCrash(const ExternalTabletServer* ts,
                                             const MonoDelta& timeout) {
  MonoTime deadline = MonoTime::Now();
  deadline.AddDelta(timeout);
  for (; MonoTime::Now().ComesBefore(deadline); SleepFor(MonoDelta::FromMilliseconds(10))) {
    if (!ts->IsProcessAlive()) {
      return Status::OK();
    }
  }
  return STATUS(TimedOut, Substitute("TS $0 did not crash!", ts->instance_id().permanent_uuid()));
}
1598
1599
namespace {
1600
void LeaderMasterCallback(HostPort* dst_hostport,
1601
                          Synchronizer* sync,
1602
                          const Status& status,
1603
2.28k
                          const HostPort& result) {
1604
2.28k
  if (status.ok()) {
1605
2.28k
    *dst_hostport = result;
1606
2.28k
  }
1607
2.28k
  sync->StatusCB(status);
1608
2.28k
}
1609
}  // anonymous namespace
1610
1611
2
// Index into masters_ of the first master that is not the current leader.
Result<size_t> ExternalMiniCluster::GetFirstNonLeaderMasterIndex() {
  constexpr bool kWantLeader = false;
  return GetPeerMasterIndex(kWantLeader);
}
1614
1615
2.38k
// Index into masters_ of the current leader master.
Result<size_t> ExternalMiniCluster::GetLeaderMasterIndex() {
  constexpr bool kWantLeader = true;
  return GetPeerMasterIndex(kWantLeader);
}
1618
1619
2.38k
// Discovers the current leader among the masters whose processes are alive, using
// GetLeaderMasterRpc with a 5s deadline. Returns the index in masters_ of:
//  - the leader, when `is_leader` is true;
//  - the first master that is not the leader, when `is_leader` is false.
// Masters are matched against the leader by RPC port only.
// Returns IllegalState when no master process is running, and propagates a failure of the
// leader lookup. LOG(FATAL)s if no entry of masters_ has the requested role.
Result<size_t> ExternalMiniCluster::GetPeerMasterIndex(bool is_leader) {
  // `sync` and `leader_hp` are handed to the RPC callback by pointer. They are declared before
  // `rpcs` so that they are destroyed after it.
  Synchronizer sync;
  server::MasterAddresses live_addrs;
  HostPort leader_hp;
  auto deadline = CoarseMonoClock::Now() + 5s;

  for (const scoped_refptr<ExternalMaster>& master : masters_) {
    if (master->IsProcessAlive()) {
      live_addrs.push_back({ master->bound_rpc_addr() });
    }
  }
  if (live_addrs.empty()) {
    return STATUS(IllegalState, "No running masters");
  }

  rpc::Rpcs rpcs;
  auto rpc = std::make_shared<GetLeaderMasterRpc>(
      Bind(&LeaderMasterCallback, &leader_hp, &sync), live_addrs, deadline, messenger_,
      proxy_cache_.get(), &rpcs);
  rpc->SendRpc();
  RETURN_NOT_OK(sync.Wait());
  rpcs.Shutdown();

  const char* peer_type = is_leader ? "leader" : "non-leader";
  const auto leader_port = leader_hp.port();
  for (size_t idx = 0; idx < masters_.size(); ++idx) {
    const bool is_leader_master = masters_[idx]->bound_rpc_hostport().port() == leader_port;
    if (is_leader_master != is_leader) {
      continue;
    }
    LOG(INFO) << "Found peer " << peer_type << " at index " << idx << ".";
    return idx;
  }

  // This should never happen. Fail loudly with LOG(FATAL) rather than hand callers (some of
  // which retry on error) a status they might paper over.
  auto status = STATUS_FORMAT(NotFound, "Peer $0 master is not in masters_ list", peer_type);
  LOG(FATAL) << status;
  return status;
}
1661
1662
2.25k
// Returns the current leader master. GetLeaderMasterIndex() is retried with a linearly growing
// sleep (10ms * attempt) to ride out timing issues such as an election in progress. After
// kMaxRetryIterations failed attempts, logs a warning and falls back to master(0).
ExternalMaster* ExternalMiniCluster::GetLeaderMaster() {
  for (int attempt = 1;; ++attempt) {
    auto idx = GetLeaderMasterIndex();
    if (idx.ok()) {
      return master(*idx);
    }
    LOG(INFO) << "GetLeaderMasterIndex@" << attempt << " hit error: " << idx.status();
    if (attempt >= kMaxRetryIterations) {
      LOG(WARNING) << "Failed to get leader master after " << attempt << " attempts, "
                   << "returning the first master.";
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(attempt * 10));
  }
  return master(0);
}
1682
1683
4
// Returns the index of the first live tablet server, in index order, that reports itself
// leader of `tablet_id`. Dead tservers are skipped. A GetTablets() failure on a live tserver
// is returned. Returns NotFound if no live tserver leads the tablet.
Result<size_t> ExternalMiniCluster::GetTabletLeaderIndex(const std::string& tablet_id) {
  for (size_t idx = 0; idx < num_tablet_servers(); ++idx) {
    auto* ts = tablet_server(idx);
    if (!ts->IsProcessAlive()) {
      continue;
    }
    const auto tablets = VERIFY_RESULT(GetTablets(ts));
    for (const auto& entry : tablets) {
      if (entry.tablet_id() == tablet_id && entry.is_leader()) {
        return idx;
      }
    }
  }
  return STATUS(
      NotFound, Format("Could not find leader of tablet $0 among live tservers.", tablet_id));
}
1698
1699
72
// Linear search for the tablet server whose instance permanent uuid equals `uuid`.
// Returns nullptr when no tserver matches.
ExternalTabletServer* ExternalMiniCluster::tablet_server_by_uuid(const std::string& uuid) const {
  for (size_t idx = 0; idx < tablet_servers_.size(); ++idx) {
    const auto& candidate = tablet_servers_[idx];
    if (candidate->instance_id().permanent_uuid() == uuid) return candidate.get();
  }
  return nullptr;
}
1707
1708
6
int ExternalMiniCluster::tablet_server_index_by_uuid(const std::string& uuid) const {
1709
14
  for (size_t i = 0; i < tablet_servers_.size(); i++) {
1710
14
    if (tablet_servers_[i]->uuid() == uuid) {
1711
6
      return narrow_cast<int>(i);
1712
6
    }
1713
14
  }
1714
0
  return -1;
1715
6
}
1716
1717
71
// Non-owning raw pointers to every master, in masters_ order.
vector<ExternalMaster*> ExternalMiniCluster::master_daemons() const {
  vector<ExternalMaster*> out;
  out.reserve(masters_.size());
  for (const auto& m : masters_) {
    out.push_back(m.get());
  }
  return out;
}
1724
1725
89
// Non-owning pointers to every daemon: all tablet servers first, then all masters.
vector<ExternalDaemon*> ExternalMiniCluster::daemons() const {
  vector<ExternalDaemon*> out;
  out.reserve(tablet_servers_.size() + masters_.size());
  for (const auto& ts : tablet_servers_) out.push_back(ts.get());
  for (const auto& m : masters_) out.push_back(m.get());
  return out;
}
1735
1736
29
std::vector<ExternalTabletServer*> ExternalMiniCluster::tserver_daemons() const {
1737
29
  std::vector<ExternalTabletServer*> result;
1738
29
  result.reserve(tablet_servers_.size());
1739
88
  for (const auto& ts : tablet_servers_) {
1740
88
    result.push_back(ts.get());
1741
88
  }
1742
29
  return result;
1743
29
}
1744
1745
0
// Returns the bind host and pgsql_rpc_port() of the tablet server at `node_index`.
// CHECK-fails on a negative or out-of-range index rather than indexing past tablet_servers_,
// which would be undefined behavior.
HostPort ExternalMiniCluster::pgsql_hostport(int node_index) const {
  CHECK_GE(node_index, 0);
  // tablet_server() CHECKs the upper bound.
  ExternalTabletServer* const ts = tablet_server(static_cast<size_t>(node_index));
  return HostPort(ts->bind_host(), ts->pgsql_rpc_port());
}
1749
1750
395k
// Non-owning accessor for messenger_, the messenger this cluster hands to its RPCs
// (e.g. GetLeaderMasterRpc).
rpc::Messenger* ExternalMiniCluster::messenger() { return messenger_; }
1753
1754
std::shared_ptr<server::GenericServiceProxy> ExternalMiniCluster::master_generic_proxy(
1755
1
    int idx) const {
1756
1
  CHECK_GE(idx, 0);
1757
1
  CHECK_LT(idx, masters_.size());
1758
1
  return std::make_shared<server::GenericServiceProxy>(
1759
1
    proxy_cache_.get(), CHECK_NOTNULL(master(idx))->bound_rpc_addr());
1760
1
}
1761
1762
std::shared_ptr<server::GenericServiceProxy> ExternalMiniCluster::master_generic_proxy(
1763
0
    const HostPort& bound_rpc_addr) const {
1764
0
  return std::make_shared<server::GenericServiceProxy>(proxy_cache_.get(), bound_rpc_addr);
1765
0
}
1766
1767
223
// Replaces the master addresses in `builder` with the bound RPC host:port of every master in
// masters_. No liveness check is made, so masters that are not running are included.
// CHECK-fails on a null builder or an empty master list.
void ExternalMiniCluster::ConfigureClientBuilder(client::YBClientBuilder* builder) {
  CHECK_NOTNULL(builder);
  CHECK(!masters_.empty());
  builder->clear_master_server_addrs();
  for (const auto& m : masters_) {
    const auto addr = m->bound_rpc_hostport().ToString();
    builder->add_master_server_addr(addr);
  }
}
1775
1776
1.24k
// Bound RPC address of the master chosen by GetLeaderMaster(). That function falls back to
// master 0 when the leader cannot be determined.
Result<HostPort> ExternalMiniCluster::DoGetLeaderMasterBoundRpcAddr() {
  ExternalMaster* leader = GetLeaderMaster();
  return leader->bound_rpc_addr();
}
1779
1780
// Sets gflag `flag` to `value` on `daemon` through the generic SetFlag RPC, with force=true and
// a 30s timeout. An RPC failure is returned prefixed with "rpc failed". If the daemon answers
// with any result other than SUCCESS, returns RemoteError.
Status ExternalMiniCluster::SetFlag(ExternalDaemon* daemon,
                                    const string& flag,
                                    const string& value) {
  server::SetFlagRequestPB req;
  req.set_flag(flag);
  req.set_value(value);
  req.set_force(true);

  server::SetFlagResponsePB resp;
  rpc::RpcController controller;
  controller.set_timeout(MonoDelta::FromSeconds(30));

  server::GenericServiceProxy proxy(proxy_cache_.get(), daemon->bound_rpc_addr());
  RETURN_NOT_OK_PREPEND(proxy.SetFlag(req, &resp, &controller), "rpc failed");
  if (resp.result() == server::SetFlagResponsePB::SUCCESS) {
    return Status::OK();
  }
  return STATUS(RemoteError, "failed to set flag", resp.ShortDebugString());
}
1800
1801
8
// Applies SetFlag() to every master in masters_ order, stopping at the first failure.
Status ExternalMiniCluster::SetFlagOnMasters(const string& flag, const string& value) {
  for (size_t idx = 0; idx < masters_.size(); ++idx) {
    RETURN_NOT_OK(SetFlag(masters_[idx].get(), flag, value));
  }
  return Status::OK();
}
1807
1808
3
// Applies SetFlag() to every tablet server in tablet_servers_ order, stopping at the first
// failure.
Status ExternalMiniCluster::SetFlagOnTServers(const string& flag, const string& value) {
  for (size_t idx = 0; idx < tablet_servers_.size(); ++idx) {
    RETURN_NOT_OK(SetFlag(tablet_servers_[idx].get(), flag, value));
  }
  return Status::OK();
}
1814
1815
1816
5.31k
uint16_t ExternalMiniCluster::AllocateFreePort() {
1817
  // This will take a file lock ensuring the port does not get claimed by another thread/process
1818
  // and add it to our vector of such locks that will be freed on minicluster shutdown.
1819
5.31k
  free_port_file_locks_.emplace_back();
1820
5.31k
  return GetFreePort(&free_port_file_locks_.back());
1821
5.31k
}
1822
1823
0
// Asks `master` to run a leader election for the sys catalog tablet via RunLeaderElection,
// using opts_.timeout. An RPC failure is returned as is. A response error is returned
// prefixed with the name of its TabletServerErrorPB code.
Status ExternalMiniCluster::StartElection(ExternalMaster* master) {
  ConsensusServiceProxy proxy(proxy_cache_.get(), master->bound_rpc_addr());

  RunLeaderElectionRequestPB req;
  req.set_dest_uuid(master->uuid());
  req.set_tablet_id(yb::master::kSysCatalogTabletId);

  RunLeaderElectionResponsePB resp;
  RpcController rpc;
  rpc.set_timeout(opts_.timeout);
  RETURN_NOT_OK(proxy.RunLeaderElection(req, &resp, &rpc));
  if (!resp.has_error()) {
    return Status::OK();
  }
  const auto code_name = TabletServerErrorPB::Code_Name(resp.error().code());
  return StatusFromPB(resp.error().status()).CloneAndPrepend(Substitute("Code $0", code_name));
}
1841
1842
35
// Accessor for single-master clusters. Returns nullptr when there are no masters and
// CHECK-fails when there is more than one (use GetLeaderMaster() then). Otherwise returns the
// only master.
ExternalMaster* ExternalMiniCluster::master() const {
  if (masters_.empty()) {
    return nullptr;
  }
  CHECK_EQ(masters_.size(), 1)
      << "master() should not be used with multiple masters, use GetLeaderMaster() instead.";
  return master(0);
}
1850
1851
// Return master at 'idx' or NULL if the master at 'idx' has not been started.
1852
44.5k
ExternalMaster* ExternalMiniCluster::master(size_t idx) const {
1853
44.5k
  CHECK_LT(idx, masters_.size());
1854
44.5k
  return masters_[idx].get();
1855
44.5k
}
1856
1857
3.25k
// Returns the raw pointer held in tablet_servers_[idx]; CHECK-fails if 'idx' is out of range.
ExternalTabletServer* ExternalMiniCluster::tablet_server(size_t idx) const {
  CHECK_LT(idx, tablet_servers_.size());
  const auto& slot = tablet_servers_[idx];
  return slot.get();
}
1861
1862
//------------------------------------------------------------
1863
// ExternalDaemon
1864
//------------------------------------------------------------
1865
1866
namespace {
1867
1868
// Global state to manage all log tailer threads. This state is managed using Singleton from gutil
1869
// and is never deallocated.
1870
struct GlobalLogTailerState {
1871
  mutex logging_mutex;
1872
  atomic<int> next_log_tailer_id{0};
1873
1874
  // We need some references to these heap-allocated atomic booleans so that ASAN would not consider
1875
  // them memory leaks.
1876
  mutex id_to_stopped_flag_mutex;
1877
  map<int, atomic<bool>*> id_to_stopped_flag;
1878
1879
  // This is used to limit the total amount of logs produced by external daemons over the lifetime
1880
  // of a test program. Guarded by logging_mutex.
1881
  size_t total_bytes_logged = 0;
1882
};
1883
1884
}  // anonymous namespace
1885
1886
// Tails one output stream of a child daemon on a detached background thread.
//
// Each line read from `child_fd` is written to `out`, prefixed with `line_prefix`. If a
// StringListener is installed, the line is also passed to it without its trailing newline.
// Output from all tailers is serialized through GlobalLogTailerState::logging_mutex. The total
// volume is capped by FLAGS_external_mini_cluster_max_log_bytes; exceeding it CHECK-fails.
class ExternalDaemon::LogTailerThread {
 public:
  LogTailerThread(const string line_prefix,
                  const int child_fd,
                  ostream* const out)
      : id_(global_state()->next_log_tailer_id.fetch_add(1)),
        stopped_(CreateStoppedFlagForId(id_)),
        thread_desc_(Substitute("log tailer thread for prefix $0", line_prefix)),
        thread_([=] {
          VLOG(1) << "Starting " << thread_desc_;
          FILE* const fp = fdopen(child_fd, "rb");
          char buf[65536];
          const atomic<bool>* stopped;

          {
            lock_guard<mutex> l(state_lock_);
            stopped = stopped_;
          }

          // Instead of doing a nonblocking read, we detach this thread and allow it to block
          // indefinitely trying to read from a child process's stream where nothing is happening.
          // This is probably OK as long as we are careful to avoid accessing any state that might
          // have been already destructed (e.g. logging, cout/cerr, member fields of this class,
          // etc.) in case we do get unblocked. Instead, we keep a local pointer to the atomic
          // "stopped" flag, and that allows us to safely check if it is OK to print log messages.
          // The "stopped" flag itself is never deallocated.
          bool is_eof = false;
          bool is_fgets_null = false;
          auto& logging_mutex = global_state()->logging_mutex;
          auto& total_bytes_logged = global_state()->total_bytes_logged;
          while (!(is_eof = feof(fp)) &&
                 !(is_fgets_null = (fgets(buf, sizeof(buf), fp) == nullptr)) &&
                 !stopped->load()) {
            size_t l = strlen(buf);
            const char* maybe_end_of_line = l > 0 && buf[l - 1] == '\n' ? "" : "\n";
            // Synchronize tailing output from all external daemons for simplicity.
            lock_guard<mutex> lock(logging_mutex);
            if (stopped->load()) break;
            // Make sure we always output an end-of-line character.
            *out << line_prefix << " " << buf << maybe_end_of_line;
            // NOTE(review): stopped is checked here without holding state_lock_, and listener_
            // (a member of this object) is read afterwards. The destructor could in principle
            // complete in between; confirm whether that window matters.
            if (!stopped->load()) {
              auto listener = listener_.load(std::memory_order_acquire);
              if (!stopped->load() && listener) {
                // maybe_end_of_line is never null. It is empty exactly when buf already ends
                // with '\n' (so l > 0), and then that newline is dropped for the listener. The
                // string must be dereferenced here; testing the pointer is always true.
                const size_t line_len = *maybe_end_of_line ? l : l - 1;
                listener->Handle(GStringPiece(buf, line_len));
              }
            }
            total_bytes_logged += l + strlen(maybe_end_of_line);
            // Abort the test if it produces too much log spew.
            CHECK_LE(total_bytes_logged, FLAGS_external_mini_cluster_max_log_bytes);
          }
          fclose(fp);
          if (!stopped->load()) {
            // It might not be safe to log anything if we have already stopped.
            VLOG(1) << "Exiting " << thread_desc_
                    << ": is_eof=" << is_eof
                    << ", is_fgets_null=" << is_fgets_null
                    << ", stopped=0";
          }
        }) {
    thread_.detach();
  }

  // Installs `listener` to receive every subsequent line (without trailing newline).
  void SetListener(StringListener* listener) {
    listener_ = listener;
  }

  // Uninstalls `listener`, but only if it is still the installed one.
  void RemoveListener(StringListener* listener) {
    listener_.compare_exchange_strong(listener, nullptr);
  }

  // Marks this tailer as stopped (under state_lock_) and drops the listener. The detached
  // thread observes the stopped flag and stops producing output.
  ~LogTailerThread() {
    VLOG(1) << "Stopping " << thread_desc_;
    lock_guard<mutex> l(state_lock_);
    stopped_->store(true);
    listener_ = nullptr;
  }

 private:
  static GlobalLogTailerState* global_state() {
    return Singleton<GlobalLogTailerState>::get();
  }

  static atomic<bool>* CreateStoppedFlagForId(int id) {
    lock_guard<mutex> lock(global_state()->id_to_stopped_flag_mutex);
    // This is never deallocated, but we add this pointer to the id_to_stopped_flag map referenced
    // from the global state singleton, and that apparently makes ASAN no longer consider this to be
    // a memory leak. We don't need to check if the id already exists in the map, because this
    // function is never invoked with a particular id more than once.
    auto* const stopped = new atomic<bool>();
    stopped->store(false);
    global_state()->id_to_stopped_flag[id] = stopped;
    return stopped;
  }

  const int id_;

  // This lock protects the stopped_ pointer in case of a race between tailer thread's
  // initialization (i.e. before it gets into its loop) and the destructor.
  mutex state_lock_;

  atomic<bool>* const stopped_;
  const string thread_desc_;  // A human-readable description of this thread.
  std::atomic<StringListener*> listener_{nullptr};
  // Must be declared last. Members are initialized in declaration order, and the tailer thread
  // reads the fields above (including listener_) as soon as it starts.
  thread thread_;
};
1991
1992
// Records the daemon's identity, RPC plumbing, and launch configuration (binary, root and data
// directories, extra flags). The body is empty; the process is launched by StartProcess().
ExternalDaemon::ExternalDaemon(
    std::string daemon_id,
    rpc::Messenger* messenger,
    rpc::ProxyCache* proxy_cache,
    const string& exe,
    const string& root_dir,
    const std::vector<std::string>& data_dirs,
    const vector<string>& extra_flags)
    : daemon_id_(daemon_id),
      messenger_(messenger),
      proxy_cache_(proxy_cache),
      exe_(exe),
      root_dir_(root_dir),
      data_dirs_(data_dirs),
      extra_flags_(extra_flags) {
}
2007
2008
718
// Defined out of line, in this file where LogTailerThread is a complete type, so the members
// holding it can be destroyed.
ExternalDaemon::~ExternalDaemon() = default;
2010
2011
149k
bool ExternalDaemon::ServerInfoPathsExist() {
2012
149k
  return Env::Default()->FileExists(GetServerInfoPath());
2013
149k
}
2014
2015
1.07k
// Loads status_ from this daemon's own info file.
Status ExternalDaemon::BuildServerStateFromInfoPath() {
  const string info_path = GetServerInfoPath();
  return BuildServerStateFromInfoPath(info_path, &status_);
}
2018
2019
// Replaces *server_status with a freshly allocated ServerStatusPB and fills it from the
// protobuf file at `info_path`. On a read failure, returns the error prefixed with the path;
// *server_status still holds the newly allocated message.
Status ExternalDaemon::BuildServerStateFromInfoPath(
    const string& info_path, std::unique_ptr<ServerStatusPB>* server_status) {
  *server_status = std::make_unique<ServerStatusPB>();
  RETURN_NOT_OK_PREPEND(pb_util::ReadPBFromPath(Env::Default(), info_path, server_status->get()),
                        "Failed to read info file from " + info_path);
  return Status::OK();
}
2026
2027
165k
// Path of the info file ("info.pb" under root_dir_) that the daemon writes once it is up.
string ExternalDaemon::GetServerInfoPath() {
  static const char* const kInfoFileName = "info.pb";
  return JoinPathSegments(root_dir_, kInfoFileName);
}
2030
2031
1.07k
// Deletes the daemon's info file, returning Env's status (including a not-found error).
Status ExternalDaemon::DeleteServerInfoPaths() {
  const string info_path = GetServerInfoPath();
  return Env::Default()->DeleteFile(info_path);
}
2034
2035
1.07k
// Launches `exe_` as a child process. The command line is `user_flags` plus the flags the
// minicluster framework always adds, followed by extra_flags_ and YB_EXTRA_DAEMON_FLAGS. The
// child's stdout/stderr are tailed into the test output. The function then waits up to
// kProcessStartTimeoutSeconds for the child to write its server info file. On success, status_
// is loaded from that file and process_ takes ownership of the child.
//
// Returns an error when:
//  - the process cannot be started;
//  - it exits before writing the info file;
//  - the info file does not appear in time (the child is then SIGKILLed);
//  - the info file cannot be parsed.
// CHECK-fails if a process is already owned.
Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
  CHECK(!process_);

  vector<string> argv;
  // First the exe for argv[0]
  argv.push_back(BaseName(exe_));

  // Then all the flags coming from the minicluster framework.
  argv.insert(argv.end(), user_flags.begin(), user_flags.end());

  // Disable callhome.
  argv.push_back("--callhome_enabled=false");

  // Disabled due to #4507.
  // TODO: Enable metrics logging after #4507 is fixed.
  //
  // Even though we set -logtostderr down below, metrics logs end up being written
  // based on -log_dir. So, we have to set that too.
  argv.push_back("--metrics_log_interval_ms=0");

  // Force log_dir to an empty value so the process chooses its default destination inside
  // fs_data_dir. Otherwise log_dir would be taken from the TEST_TMPDIR env variable, which is
  // inherited from the test script.
  argv.push_back("--log_dir=");

  // Tell the server to dump its port information so we can pick it up.
  const string info_path = GetServerInfoPath();
  argv.push_back("--server_dump_info_path=" + info_path);
  argv.push_back("--server_dump_info_format=pb");

  // We use ephemeral ports in many tests. They don't work for production, but are OK
  // in unit tests.
  argv.push_back("--rpc_server_allow_ephemeral_ports");

  // A previous instance of the daemon may have run in the same directory. So, remove
  // the previous info file if it's there.
  Status s = DeleteServerInfoPaths();
  if (!s.ok() && !s.IsNotFound()) {
    LOG(WARNING) << "Failed to delete info paths: " << s.ToString();
  }

  // Ensure that logging which goes to the test output doesn't get buffered.
  argv.push_back("--logbuflevel=-1");

  // Use the same verbose logging level in the child process as in the test driver.
  if (FLAGS_v != 0) {  // Skip this option if it has its default value (0).
    argv.push_back(Substitute("-v=$0", FLAGS_v));
  }
  if (!FLAGS_vmodule.empty()) {
    argv.push_back(Substitute("--vmodule=$0", FLAGS_vmodule));
  }
  if (FLAGS_mem_tracker_logging) {
    argv.push_back("--mem_tracker_logging");
  }
  if (FLAGS_mem_tracker_log_stack_trace) {
    argv.push_back("--mem_tracker_log_stack_trace");
  }
  if (FLAGS_use_libbacktrace) {
    argv.push_back("--use_libbacktrace");
  }

  const char* test_invocation_id = getenv("YB_TEST_INVOCATION_ID");
  if (test_invocation_id) {
    // We use --metric_node_name=... to include a unique "test invocation id" into the command
    // line so we can kill any stray processes later. --metric_node_name is normally how we pass
    // the Universe ID to the cluster. We could use any other flag that is present in yb-master
    // and yb-tserver for this.
    argv.push_back(Format("--metric_node_name=$0", test_invocation_id));
  }

  const string fatal_details_path_prefix = GetFatalDetailsPathPrefix();
  argv.push_back(Format(
      "--fatal_details_path_prefix=$0.$1", fatal_details_path_prefix, daemon_id_));

  argv.push_back(Format("--minicluster_daemon_id=$0", daemon_id_));

  // Finally, extra flags to override.
  // - extra_flags_ is taken from ExternalMiniCluster.opts_, which is often set by test subclasses'
  //   UpdateMiniClusterOptions.
  // - extra daemon flags is supplied by the user, either through environment variable or
  //   yb_build.sh --extra_daemon_flags (or --extra_daemon_args), so it should take highest
  //   precedence.
  argv.insert(argv.end(), extra_flags_.begin(), extra_flags_.end());
  AddExtraFlagsFromEnvVar("YB_EXTRA_DAEMON_FLAGS", &argv);

  auto p = std::make_unique<Subprocess>(exe_, argv);
  p->PipeParentStdout();
  p->PipeParentStderr();
  auto default_output_prefix = Substitute("[$0]", daemon_id_);
  LOG(INFO) << "Running " << default_output_prefix << ": " << exe_ << "\n"
    << JoinStrings(argv, "\n");
  if (!FLAGS_external_daemon_heap_profile_prefix.empty()) {
    p->SetEnv("HEAPPROFILE",
              FLAGS_external_daemon_heap_profile_prefix + "_" + daemon_id_);
    p->SetEnv("HEAPPROFILESIGNAL", std::to_string(kHeapProfileSignal));
  }

  const char* llvm_profile_env_var_value = getenv("LLVM_PROFILE_FILE");
  if (llvm_profile_env_var_value) {
    p->SetEnv("LLVM_PROFILE_FILE", Format("$0_$1", llvm_profile_env_var_value, daemon_id_));
  }

  RETURN_NOT_OK_PREPEND(p->Start(),
                        Substitute("Failed to start subprocess $0", exe_));

  stdout_tailer_thread_ = std::make_unique<LogTailerThread>(
      Substitute("[$0 stdout]", daemon_id_), p->ReleaseChildStdoutFd(), &std::cout);

  // We will mostly see stderr output from the child process (because of --logtostderr), so we'll
  // assume that by default in the output prefix.
  stderr_tailer_thread_ = std::make_unique<LogTailerThread>(
      default_output_prefix, p->ReleaseChildStderrFd(), &std::cerr);

  // The process is now starting -- wait for the bound port info to show up.
  Stopwatch sw;
  sw.start();
  bool success = false;
  while (sw.elapsed().wall_seconds() < kProcessStartTimeoutSeconds) {
    if (ServerInfoPathsExist()) {
      success = true;
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
    int rc = 0;
    Status s = p->WaitNoBlock(&rc);
    if (s.IsTimedOut()) {
      // The process is still running.
      continue;
    }
    RETURN_NOT_OK_PREPEND(s, Substitute("Failed waiting on $0", exe_));
    return STATUS(RuntimeError,
      Substitute("Process exited with rc=$0", rc),
      exe_);
  }

  if (!success) {
    WARN_NOT_OK(p->Kill(SIGKILL), "Killing process failed");
    return STATUS(TimedOut,
        Substitute("Timed out after $0s waiting for process ($1) to write info file ($2)",
                   kProcessStartTimeoutSeconds, exe_, info_path));
  }

  RETURN_NOT_OK(BuildServerStateFromInfoPath());
  LOG(INFO) << "Started " << default_output_prefix << " " << exe_ << " as pid " << p->pid();
  VLOG(1) << exe_ << " instance information:\n" << status_->DebugString();

  process_.swap(p);
  return Status::OK();
}
2184
2185
65
// Sends SIGSTOP to the daemon's process. Returns OK without doing anything when there is no
// process.
Status ExternalDaemon::Pause() {
  if (process_ == nullptr) {
    return Status::OK();
  }
  VLOG(1) << "Pausing " << ProcessNameAndPidStr();
  return process_->Kill(SIGSTOP);
}
2190
2191
68
// Sends SIGCONT to the daemon's process. Returns OK without doing anything when there is no
// process.
Status ExternalDaemon::Resume() {
  if (process_ == nullptr) {
    return Status::OK();
  }
  VLOG(1) << "Resuming " << ProcessNameAndPidStr();
  return process_->Kill(SIGCONT);
}
2196
2197
0
// Sends `signal` to the daemon's process. Returns OK without doing anything when there is no
// process.
Status ExternalDaemon::Kill(int signal) {
  if (process_ == nullptr) {
    return Status::OK();
  }
  VLOG(1) << "Kill " << ProcessNameAndPidStr() << " with " << signal;
  return process_->Kill(signal);
}
2202
2203
6.64k
bool ExternalDaemon::IsShutdown() const {
2204
6.64k
  return process_.get() == nullptr;
2205
6.64k
}
2206
2207
5.23k
bool ExternalDaemon::IsProcessAlive() const {
2208
5.23k
  if (IsShutdown()) {
2209
86
    return false;
2210
86
  }
2211
2212
5.14k
  int rc = 0;
2213
5.14k
  Status s = process_->WaitNoBlock(&rc);
2214
  // If the non-blocking Wait "times out", that means the process
2215
  // is running.
2216
5.14k
  return s.IsTimedOut();
2217
5.14k
}
2218
2219
44
pid_t ExternalDaemon::pid() const {
2220
44
  return process_->pid();
2221
44
}
2222
2223
1.61k
void ExternalDaemon::Shutdown() {
2224
1.61k
  if (!process_) return;
2225
2226
802
  LOG_WITH_PREFIX(INFO) << "Starting Shutdown()";
2227
2228
  // Before we kill the process, store the addresses. If we're told to start again we'll reuse
2229
  // these.
2230
802
  bound_rpc_ = bound_rpc_hostport();
2231
802
  bound_http_ = bound_http_hostport();
2232
2233
802
  if (IsProcessAlive()) {
2234
    // In coverage builds, ask the process nicely to flush coverage info
2235
    // before we kill -9 it. Otherwise, we never get any coverage from
2236
    // external clusters.
2237
789
    FlushCoverage();
2238
2239
789
    if (!FLAGS_external_daemon_heap_profile_prefix.empty()) {
2240
      // The child process has been configured using the HEAPPROFILESIGNAL environment variable to
2241
      // create a heap profile on receiving kHeapProfileSignal.
2242
0
      static const int kWaitMs = 100;
2243
0
      LOG(INFO) << "Sending signal " << kHeapProfileSignal << " to " << ProcessNameAndPidStr()
2244
0
                << " to capture a heap profile. Waiting for " << kWaitMs << " ms afterwards.";
2245
0
      WARN_NOT_OK(process_->Kill(kHeapProfileSignal), "Killing process failed");
2246
0
      std::this_thread::sleep_for(std::chrono::milliseconds(kWaitMs));
2247
0
    }
2248
2249
789
    if (FLAGS_external_daemon_safe_shutdown) {
2250
      // We put 'SIGTERM' in quotes because an unquoted one would be treated as a test failure
2251
      // by our regular expressions in common-test-env.sh.
2252
0
      LOG(INFO) << "Terminating " << ProcessNameAndPidStr() << " using 'SIGTERM' signal";
2253
0
      WARN_NOT_OK(process_->Kill(SIGTERM), "Killing process failed");
2254
0
      int total_delay_ms = 0;
2255
0
      int current_delay_ms = 10;
2256
0
      for (int i = 0; i < 10 && IsProcessAlive(); ++i) {
2257
0
        std::this_thread::sleep_for(std::chrono::milliseconds(current_delay_ms));
2258
0
        total_delay_ms += current_delay_ms;
2259
0
        current_delay_ms += 10;  // will sleep for 10ms, then 20ms, etc.
2260
0
      }
2261
2262
0
      if (IsProcessAlive()) {
2263
0
        LOG(INFO) << "The process " << ProcessNameAndPidStr() << " is still running after "
2264
0
                  << total_delay_ms << " ms, will send SIGKILL";
2265
0
      }
2266
0
    }
2267
2268
789
    if (IsProcessAlive()) {
2269
789
      LOG(INFO) << "Killing " << ProcessNameAndPidStr() << " with SIGKILL";
2270
789
      WARN_NOT_OK(process_->Kill(SIGKILL), "Killing process failed");
2271
789
    }
2272
789
  }
2273
802
  int ret = 0;
2274
802
  WARN_NOT_OK(process_->Wait(&ret), "Waiting on " + exe_);
2275
802
  process_.reset();
2276
802
}
2277
2278
789
// Asks the daemon to flush its coverage counters over RPC. In non-coverage builds this is
// a no-op. Failures are only logged as warnings: this runs during Shutdown() and the
// server may be unresponsive (e.g. SIGSTOP'ed).
void ExternalDaemon::FlushCoverage() {
#ifndef COVERAGE_BUILD_
  return;
#else
  // Informational only: a FATAL here would abort the test and make the flush below unreachable.
  LOG(INFO) << "Attempting to flush coverage for " << exe_ << " pid " << process_->pid();
  server::GenericServiceProxy proxy(proxy_cache_, bound_rpc_addr());

  server::FlushCoverageRequestPB req;
  server::FlushCoverageResponsePB resp;
  rpc::RpcController rpc;

  // Set a reasonably short timeout, since some of our tests kill servers which
  // are kill -STOPed.
  rpc.set_timeout(MonoDelta::FromMilliseconds(100));
  Status s = proxy.FlushCoverage(req, &resp, &rpc);
  if (s.ok() && !resp.success()) {
    s = STATUS(RemoteError, "Server does not appear to be running a coverage build");
  }
  WARN_NOT_OK(s, Substitute("Unable to flush coverage on $0 pid $1", exe_, process_->pid()));
#endif
}
2299
2300
789
std::string ExternalDaemon::ProcessNameAndPidStr() {
2301
789
  return Substitute("$0 with pid $1", exe_, process_->pid());
2302
789
}
2303
2304
51.4k
// First RPC address the server reported as bound. CHECK-fails if the server status is
// unavailable or lists no RPC addresses.
HostPort ExternalDaemon::bound_rpc_hostport() const {
  CHECK(status_);
  CHECK_GE(status_->bound_rpc_addresses_size(), 1);
  const auto& first_rpc_address = status_->bound_rpc_addresses(0);
  return HostPortFromPB(first_rpc_address);
}
2309
2310
46.8k
// Alias of bound_rpc_hostport().
HostPort ExternalDaemon::bound_rpc_addr() const {
  HostPort rpc_hostport = bound_rpc_hostport();
  return rpc_hostport;
}
2313
2314
971
// First HTTP (web server) address the server reported as bound. CHECK-fails if the server
// status is unavailable or lists no HTTP addresses.
HostPort ExternalDaemon::bound_http_hostport() const {
  CHECK(status_);
  CHECK_GE(status_->bound_http_addresses_size(), 1);
  const auto& first_http_address = status_->bound_http_addresses(0);
  return HostPortFromPB(first_http_address);
}
2319
2320
16.5k
// Node instance descriptor from the server status; CHECK-fails if status is unavailable.
const NodeInstancePB& ExternalDaemon::instance_id() const {
  CHECK(status_);
  const NodeInstancePB& instance = status_->node_instance();
  return instance;
}
2324
2325
671
// Permanent UUID of the node instance; CHECK-fails if status is unavailable.
const string& ExternalDaemon::uuid() const {
  CHECK(status_);
  const auto& instance = status_->node_instance();
  return instance.permanent_uuid();
}
2329
2330
// Prototype-based convenience overload: resolves the entity and metric prototype names
// and forwards to the name-based overload.
Result<int64_t> ExternalDaemon::GetInt64MetricFromHost(const HostPort& hostport,
                                                       const MetricEntityPrototype* entity_proto,
                                                       const char* entity_id,
                                                       const MetricPrototype* metric_proto,
                                                       const char* value_field) {
  const char* const entity_type_name = entity_proto->name();
  const char* const metric_name = metric_proto->name();
  return GetInt64MetricFromHost(hostport, entity_type_name, entity_id, metric_name,
                                value_field);
}
2338
2339
// Fetches /jsonmetricz from `hostport` and returns the int64 `value_field` of the first
// metric named `metric_proto_name` that belongs to an entity of type `entity_proto_name`
// (and, when `entity_id` is non-null, whose id equals `entity_id`).
// Returns NotFound if no such metric exists; fetch/parse errors are propagated.
Result<int64_t> ExternalDaemon::GetInt64MetricFromHost(const HostPort& hostport,
                                                       const char* entity_proto_name,
                                                       const char* entity_id,
                                                       const char* metric_proto_name,
                                                       const char* value_field) {
  // Only request metrics whose name matches, to keep the response small.
  const string url = Substitute("http://$0/jsonmetricz?metrics=$1",
                                hostport.ToString(), metric_proto_name);
  EasyCurl curl;
  faststring body;
  RETURN_NOT_OK(curl.FetchURL(url, &body));

  // The response's top level is an array of entity objects.
  JsonReader reader(body.ToString());
  RETURN_NOT_OK(reader.Init());
  vector<const Value*> entities;
  RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(), nullptr, &entities));

  for (const Value* entity : entities) {
    string entity_type;
    RETURN_NOT_OK(reader.ExtractString(entity, "type", &entity_type));
    if (entity_type != entity_proto_name) {
      continue;
    }
    if (entity_id != nullptr) {
      string found_id;
      RETURN_NOT_OK(reader.ExtractString(entity, "id", &found_id));
      if (found_id != entity_id) {
        continue;
      }
    }

    vector<const Value*> metrics;
    RETURN_NOT_OK(reader.ExtractObjectArray(entity, "metrics", &metrics));
    for (const Value* metric : metrics) {
      string metric_name;
      RETURN_NOT_OK(reader.ExtractString(metric, "name", &metric_name));
      if (metric_name != metric_proto_name) {
        continue;
      }
      int64_t value = 0;
      RETURN_NOT_OK(reader.ExtractInt64(metric, value_field, &value));
      return value;
    }
  }

  const string msg = (entity_id != nullptr)
      ? Substitute("Could not find metric $0.$1 for entity $2",
                   entity_proto_name, metric_proto_name, entity_id)
      : Substitute("Could not find metric $0.$1", entity_proto_name, metric_proto_name);
  return STATUS(NotFound, msg);
}
2398
2399
878
// Prefix used by LOG_WITH_PREFIX: identifies the daemon and its last saved RPC address.
string ExternalDaemon::LogPrefix() {
  string prefix = Format("{ daemon_id: $0 bound_rpc: $1 } ", daemon_id_, bound_rpc_);
  return prefix;
}
2402
2403
4
// Attaches `listener` to both the stdout and the stderr tailer threads (stdout first).
void ExternalDaemon::SetLogListener(StringListener* listener) {
  auto& out_tailer = stdout_tailer_thread_;
  auto& err_tailer = stderr_tailer_thread_;
  out_tailer->SetListener(listener);
  err_tailer->SetListener(listener);
}
2407
2408
4
// Detaches `listener` from both the stdout and the stderr tailer threads (stdout first).
void ExternalDaemon::RemoveLogListener(StringListener* listener) {
  auto& out_tailer = stdout_tailer_thread_;
  auto& err_tailer = stderr_tailer_thread_;
  out_tailer->RemoveListener(listener);
  err_tailer->RemoveListener(listener);
}
2412
2413
25
// Reads the current value of gflag `flag` from the running server via the generic
// service RPC (30s timeout). Returns RemoteError if the server reports the flag invalid.
Result<string> ExternalDaemon::GetFlag(const std::string& flag) {
  server::GetFlagRequestPB req;
  req.set_flag(flag);
  server::GetFlagResponsePB resp;

  rpc::RpcController controller;
  controller.set_timeout(MonoDelta::FromSeconds(30));

  server::GenericServiceProxy proxy(proxy_cache_, bound_rpc_addr());
  RETURN_NOT_OK(proxy.GetFlag(req, &resp, &controller));
  if (resp.valid()) {
    return resp.value();
  }
  return STATUS_FORMAT(RemoteError, "Failed to get gflag $0 value.", flag);
}
2427
2428
// Reads an int64 metric from this daemon's own HTTP endpoint (prototype-based lookup).
Result<int64_t> ExternalDaemon::GetInt64Metric(const MetricEntityPrototype* entity_proto,
                                               const char* entity_id,
                                               const MetricPrototype* metric_proto,
                                               const char* value_field) const {
  const HostPort http_hostport = bound_http_hostport();
  return GetInt64MetricFromHost(http_hostport, entity_proto, entity_id, metric_proto,
                                value_field);
}
2435
2436
// Reads an int64 metric from this daemon's own HTTP endpoint (name-based lookup).
Result<int64_t> ExternalDaemon::GetInt64Metric(const char* entity_proto_name,
                                               const char* entity_id,
                                               const char* metric_proto_name,
                                               const char* value_field) const {
  const HostPort http_hostport = bound_http_hostport();
  return GetInt64MetricFromHost(http_hostport, entity_proto_name, entity_id,
                                metric_proto_name, value_field);
}
2443
2444
// Registers this waiter as a log listener on `daemon`; the destructor unregisters it.
LogWaiter::LogWaiter(ExternalDaemon* daemon, const std::string& string_to_wait)
    : daemon_(daemon),
      string_to_wait_(string_to_wait) {
  daemon_->SetLogListener(this);
}
2448
2449
73
void LogWaiter::Handle(const GStringPiece& s) {
2450
73
  if (s.contains(string_to_wait_)) {
2451
3
    event_occurred_ = true;
2452
3
  }
2453
73
}
2454
2455
3
// Polls (starting at a 100ms period) until the awaited log text has been seen or `timeout`
// expires.
Status LogWaiter::WaitFor(const MonoDelta timeout) {
  constexpr auto kInitialWaitPeriod = 100ms;
  const auto description =
      Format("Waiting for log record '$0' on $1...", string_to_wait_, daemon_->id());
  auto seen = [this] { return event_occurred_.load(); };
  return ::yb::WaitFor(seen, timeout, description, kInitialWaitPeriod);
}
2462
2463
3
// Unregisters the listener installed by the constructor.
LogWaiter::~LogWaiter() {
  ExternalDaemon* const watched = daemon_;
  watched->RemoveLogListener(this);
}
2466
2467
//------------------------------------------------------------
2468
// ScopedResumeExternalDaemon
2469
//------------------------------------------------------------
2470
2471
// Remembers `daemon` (must be non-null) so the destructor can resume it.
ScopedResumeExternalDaemon::ScopedResumeExternalDaemon(ExternalDaemon* daemon)
    : daemon_(CHECK_NOTNULL(daemon)) {}
2474
2475
18
// Resumes (SIGCONT) the daemon on scope exit; CHECK-fails if resuming fails.
ScopedResumeExternalDaemon::~ScopedResumeExternalDaemon() {
  CHECK_OK(daemon_->Resume());
}
2478
2479
//------------------------------------------------------------
2480
// ExternalMaster
2481
//------------------------------------------------------------
2482
// Master daemon named "m-<index+1>", with its data under the "master" server-type path.
ExternalMaster::ExternalMaster(
    size_t master_index,
    rpc::Messenger* messenger,
    rpc::ProxyCache* proxy_cache,
    const string& exe,
    const string& data_dir,
    const std::vector<string>& extra_flags,
    const string& rpc_bind_address,
    uint16_t http_port,
    const string& master_addrs)
    : ExternalDaemon(
          Substitute("m-$0", master_index + 1),
          messenger,
          proxy_cache,
          exe,
          data_dir,
          {GetServerTypeDataPath(data_dir, "master")},
          extra_flags),
      rpc_bind_address_(rpc_bind_address),
      master_addrs_(master_addrs),
      http_port_(http_port) {}
2499
2500
252
// Nothing to release beyond what the base class and members already own.
ExternalMaster::~ExternalMaster() = default;
2502
2503
namespace {
2504
2505
class Flags {
2506
 public:
2507
  template <class Value>
2508
12.3k
  void Add(const std::string& name, const Value& value) {
2509
12.3k
    value_.push_back(Format("--$0=$1", name, value));
2510
12.3k
  }
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddINS_8HostPortEEEvRKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEERKT_
Line
Count
Source
2508
2.95k
  void Add(const std::string& name, const Value& value) {
2509
2.95k
    value_.push_back(Format("--$0=$1", name, value));
2510
2.95k
  }
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddINSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEEEEvRKS9_RKT_
Line
Count
Source
2508
3.20k
  void Add(const std::string& name, const Value& value) {
2509
3.20k
    value_.push_back(Format("--$0=$1", name, value));
2510
3.20k
  }
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddIA10_cEEvRKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEERKT_
Line
Count
Source
2508
337
  void Add(const std::string& name, const Value& value) {
2509
337
    value_.push_back(Format("--$0=$1", name, value));
2510
337
  }
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddItEEvRKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEERKT_
Line
Count
Source
2508
3.28k
  void Add(const std::string& name, const Value& value) {
2509
3.28k
    value_.push_back(Format("--$0=$1", name, value));
2510
3.28k
  }
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddIiEEvRKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEERKT_
Line
Count
Source
2508
337
  void Add(const std::string& name, const Value& value) {
2509
337
    value_.push_back(Format("--$0=$1", name, value));
2510
337
  }
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddIbEEvRKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEERKT_
Line
Count
Source
2508
738
  void Add(const std::string& name, const Value& value) {
2509
738
    value_.push_back(Format("--$0=$1", name, value));
2510
738
  }
external_mini_cluster.cc:_ZN2yb12_GLOBAL__N_15Flags3AddIA3_cEEvRKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEERKT_
Line
Count
Source
2508
1.47k
  void Add(const std::string& name, const Value& value) {
2509
1.47k
    value_.push_back(Format("--$0=$1", name, value));
2510
1.47k
  }
2511
2512
2.95k
  void AddHostPort(const std::string& name, const std::string& host, uint16_t port) {
2513
2.95k
    Add(name, HostPort(host, port));
2514
2.95k
  }
2515
2516
1.07k
  const std::vector<std::string>& value() const {
2517
1.07k
    return value_;
2518
1.07k
  }
2519
2520
 private:
2521
  std::vector<std::string> value_;
2522
};
2523
2524
} // namespace
2525
2526
337
// Launches the master process. In shell mode the master_addresses flag is omitted, so the
// master starts without a list of expected peers.
Status ExternalMaster::Start(bool shell_mode) {
  Flags flags;
  flags.Add("fs_data_dirs", root_dir_);
  flags.Add("rpc_bind_addresses", rpc_bind_address_);
  flags.Add("webserver_interface", "localhost");
  flags.Add("webserver_port", http_port_);
  // No initial delay, so new load-balancer tasks fire right after a master leader failover.
  flags.Add("load_balancer_initial_delay_secs", 0);
  // Normal (non-shell) start: tell the master which peers to expect.
  const bool pass_peer_list = !shell_mode;
  if (pass_peer_list) {
    flags.Add("master_addresses", master_addrs_);
  }
  RETURN_NOT_OK(StartProcess(flags.value()));
  return Status::OK();
}
2543
2544
15
// Restarts the master in shell mode (no master_addresses flag). If the process already died
// on its own, Shutdown() is called first so this is safe after a crash.
Status ExternalMaster::Restart() {
  LOG_WITH_PREFIX(INFO) << "Restart()";
  if (!IsProcessAlive()) {
    Shutdown();
  }
  // Shutdown() saves bound_rpc_; a zero port means no addresses were saved yet.
  const bool addresses_saved = bound_rpc_.port() != 0;
  if (!addresses_saved) {
    return STATUS(IllegalState, "Master cannot be restarted. Must call Shutdown() first.");
  }
  return Start(true /* shell_mode */);
}
2556
2557
//------------------------------------------------------------
2558
// ExternalTabletServer
2559
//------------------------------------------------------------
2560
2561
// Tablet server daemon named "ts-<index+1>", with `num_drives` "tserver" data dirs and the
// given bind host, RPC/HTTP ports, and master list.
ExternalTabletServer::ExternalTabletServer(
    size_t tablet_server_index, rpc::Messenger* messenger, rpc::ProxyCache* proxy_cache,
    const std::string& exe, const std::string& data_dir, uint16_t num_drives,
    std::string bind_host, uint16_t rpc_port, uint16_t http_port, uint16_t redis_rpc_port,
    uint16_t redis_http_port, uint16_t cql_rpc_port, uint16_t cql_http_port,
    uint16_t pgsql_rpc_port, uint16_t pgsql_http_port,
    const std::vector<HostPort>& master_addrs, const std::vector<std::string>& extra_flags)
    : ExternalDaemon(
          Substitute("ts-$0", tablet_server_index + 1),
          messenger,
          proxy_cache,
          exe,
          data_dir,
          FsDataDirs(data_dir, "tserver", num_drives),
          extra_flags),
      master_addrs_(HostPort::ToCommaSeparatedString(master_addrs)),
      bind_host_(std::move(bind_host)),
      rpc_port_(rpc_port),
      http_port_(http_port),
      redis_rpc_port_(redis_rpc_port),
      redis_http_port_(redis_http_port),
      pgsql_rpc_port_(pgsql_rpc_port),
      pgsql_http_port_(pgsql_http_port),
      cql_rpc_port_(cql_rpc_port),
      cql_http_port_(cql_http_port),
      num_drives_(num_drives) {
}
2582
2583
466
// Nothing to release beyond what the base class and members already own.
ExternalTabletServer::~ExternalTabletServer() = default;
2585
2586
// Creates the data directories and launches the tablet server process.
//   start_cql_proxy: passed as --start_cql_proxy and remembered for info-file handling.
//   set_proxy_addrs: when true, binds the redis/pgsql/cql proxies to bind_host_.
//   extra_flags:     additional name/value pairs appended after the defaults.
Status ExternalTabletServer::Start(
    bool start_cql_proxy, bool set_proxy_addrs,
    std::vector<std::pair<string, string>> extra_flags) {
  const auto data_roots = FsRootDirs(root_dir_, num_drives_);
  for (const auto& root : data_roots) {
    RETURN_NOT_OK(Env::Default()->CreateDirs(root));
  }
  start_cql_proxy_ = start_cql_proxy;

  Flags flags;
  flags.Add("fs_data_dirs", JoinStrings(data_roots, ","));
  flags.AddHostPort("rpc_bind_addresses", bind_host_, rpc_port_);
  flags.Add("webserver_interface", bind_host_);
  flags.Add("webserver_port", http_port_);
  flags.Add("redis_proxy_webserver_port", redis_http_port_);
  flags.Add("pgsql_proxy_webserver_port", pgsql_http_port_);
  flags.Add("cql_proxy_webserver_port", cql_http_port_);

  if (set_proxy_addrs) {
    flags.AddHostPort("redis_proxy_bind_address", bind_host_, redis_rpc_port_);
    flags.AddHostPort("pgsql_proxy_bind_address", bind_host_, pgsql_rpc_port_);
    flags.AddHostPort("cql_proxy_bind_address", bind_host_, cql_rpc_port_);
  }

  flags.Add("start_cql_proxy", start_cql_proxy_);
  flags.Add("tserver_master_addrs", master_addrs_);

  // Keep service thread counts modest: several unit tests usually run in parallel.
  flags.Add("tablet_server_svc_num_threads", "64");
  flags.Add("ts_consensus_svc_num_threads", "20");

  for (const auto& extra : extra_flags) {
    flags.Add(extra.first, extra.second);
  }

  RETURN_NOT_OK(StartProcess(flags.value()));
  return Status::OK();
}
2625
2626
736
// Loads the base server state and, when the CQL proxy was started, also the CQL server
// state from its "-cql" info file into cqlserver_status_.
Status ExternalTabletServer::BuildServerStateFromInfoPath() {
  RETURN_NOT_OK(ExternalDaemon::BuildServerStateFromInfoPath());
  if (!start_cql_proxy_) {
    return Status::OK();
  }
  RETURN_NOT_OK(ExternalDaemon::BuildServerStateFromInfoPath(
      GetCQLServerInfoPath(), &cqlserver_status_));
  return Status::OK();
}
2634
2635
12.5k
// Info file path for the CQL server: the base info path with a "-cql" suffix.
string ExternalTabletServer::GetCQLServerInfoPath() {
  string cql_info_path = ExternalDaemon::GetServerInfoPath();
  cql_info_path += "-cql";
  return cql_info_path;
}
2638
2639
111k
bool ExternalTabletServer::ServerInfoPathsExist() {
2640
111k
  if (start_cql_proxy_) {
2641
110k
    return ExternalDaemon::ServerInfoPathsExist() &&
2642
11.0k
        Env::Default()->FileExists(GetCQLServerInfoPath());
2643
110k
  }
2644
158
  return ExternalDaemon::ServerInfoPathsExist();
2645
158
}
2646
2647
738
// Deletes both the base and the CQL info files. Both deletions are always attempted; the
// base-file error, if any, takes precedence in the returned status.
Status ExternalTabletServer::DeleteServerInfoPaths() {
  const Status base_delete = ExternalDaemon::DeleteServerInfoPaths();
  const Status cql_delete = Env::Default()->DeleteFile(GetCQLServerInfoPath());
  RETURN_NOT_OK(base_delete);
  RETURN_NOT_OK(cql_delete);
  return Status::OK();
}
2655
2656
// Restarts the tablet server with proxy addresses bound. If the process already died on its
// own, Shutdown() is called first so this is safe after a crash.
//   start_cql_proxy: forwarded to Start().
//   flags:           extra name/value flags forwarded to Start().
Status ExternalTabletServer::Restart(
    bool start_cql_proxy, std::vector<std::pair<string, string>> flags) {
  LOG_WITH_PREFIX(INFO) << "Restart: start_cql_proxy=" << start_cql_proxy;
  if (!IsProcessAlive()) {
    // Make sure this function could be safely called if the process has already crashed.
    Shutdown();
  }
  // We store the addresses on shutdown so make sure we did that first.
  if (bound_rpc_.port() == 0) {
    return STATUS(IllegalState, "Tablet server cannot be restarted. Must call Shutdown() first.");
  }
  // `flags` is our own by-value copy and is not used again, so move it instead of copying
  // it into Start()'s by-value parameter.
  return Start(start_cql_proxy, true /* set_proxy_addrs */, std::move(flags));
}
2669
2670
// Reads an int64 metric from the CQL proxy's web server at bind_host():cql_http_port().
Result<int64_t> ExternalTabletServer::GetInt64CQLMetric(const MetricEntityPrototype* entity_proto,
                                                        const char* entity_id,
                                                        const MetricPrototype* metric_proto,
                                                        const char* value_field) const {
  const HostPort cql_http_hostport(bind_host(), cql_http_port());
  return GetInt64MetricFromHost(cql_http_hostport, entity_proto, entity_id, metric_proto,
                                value_field);
}
2678
2679
1
// Changes how many data drives the (stopped) tablet server uses and recomputes data_dirs_.
// Returns IllegalState if the process is still running.
Status ExternalTabletServer::SetNumDrives(uint16_t num_drives) {
  if (IsProcessAlive()) {
    return STATUS(IllegalState, "Can't set num drives on running Tablet server. "
                                "Must call Shutdown() first.");
  }
  num_drives_ = num_drives;
  data_dirs_ = FsDataDirs(root_dir_, "tserver", num_drives_);
  return Status::OK();
}
2688
2689
2
// Stops every master first, then restarts them one by one; returns the first restart error.
Status RestartAllMasters(ExternalMiniCluster* cluster) {
  const size_t master_count = cluster->num_masters();
  for (size_t idx = 0; idx < master_count; ++idx) {
    cluster->master(idx)->Shutdown();
  }
  for (size_t idx = 0; idx < cluster->num_masters(); ++idx) {
    RETURN_NOT_OK(cluster->master(idx)->Restart());
  }
  return Status::OK();
}
2699
2700
}  // namespace yb