YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/integration-tests/external_mini_cluster.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/integration-tests/external_mini_cluster.h"
34
35
#include <atomic>
36
#include <chrono>
37
#include <functional>
38
#include <memory>
39
#include <mutex>
40
#include <string>
41
#include <thread>
42
#include <unordered_map>
43
#include <vector>
44
45
#include <glog/logging.h>
46
#include <gtest/gtest.h>
47
#include <rapidjson/document.h>
48
49
#include "yb/client/client.h"
50
51
#include "yb/common/wire_protocol.h"
52
53
#include "yb/consensus/consensus.proxy.h"
54
55
#include "yb/fs/fs_manager.h"
56
57
#include "yb/gutil/algorithm.h"
58
#include "yb/gutil/bind.h"
59
#include "yb/gutil/macros.h"
60
#include "yb/gutil/ref_counted.h"
61
#include "yb/gutil/singleton.h"
62
#include "yb/gutil/strings/join.h"
63
#include "yb/gutil/strings/substitute.h"
64
#include "yb/gutil/strings/util.h"
65
66
#include "yb/integration-tests/cluster_itest_util.h"
67
68
#include "yb/master/master_admin.proxy.h"
69
#include "yb/master/master_cluster.proxy.h"
70
#include "yb/master/master_rpc.h"
71
#include "yb/master/sys_catalog.h"
72
73
#include "yb/rpc/messenger.h"
74
#include "yb/rpc/proxy.h"
75
#include "yb/rpc/rpc_controller.h"
76
77
#include "yb/server/server_base.pb.h"
78
#include "yb/server/server_base.proxy.h"
79
80
#include "yb/tserver/tserver_admin.proxy.h"
81
#include "yb/tserver/tserver_service.proxy.h"
82
83
#include "yb/util/async_util.h"
84
#include "yb/util/curl_util.h"
85
#include "yb/util/env.h"
86
#include "yb/util/faststring.h"
87
#include "yb/util/format.h"
88
#include "yb/util/jsonreader.h"
89
#include "yb/util/logging.h"
90
#include "yb/util/metrics.h"
91
#include "yb/util/monotime.h"
92
#include "yb/util/net/net_fwd.h"
93
#include "yb/util/net/sockaddr.h"
94
#include "yb/util/path_util.h"
95
#include "yb/util/pb_util.h"
96
#include "yb/util/size_literals.h"
97
#include "yb/util/slice.h"
98
#include "yb/util/status_format.h"
99
#include "yb/util/status_log.h"
100
#include "yb/util/stopwatch.h"
101
#include "yb/util/subprocess.h"
102
#include "yb/util/test_util.h"
103
#include "yb/util/tsan_util.h"
104
105
using namespace std::literals;  // NOLINT
106
using namespace yb::size_literals;  // NOLINT
107
108
using std::atomic;
109
using std::lock_guard;
110
using std::mutex;
111
using std::shared_ptr;
112
using std::string;
113
using std::thread;
114
using std::unique_ptr;
115
116
using rapidjson::Value;
117
using strings::Substitute;
118
119
using yb::master::GetLeaderMasterRpc;
120
using yb::master::IsInitDbDoneRequestPB;
121
using yb::master::IsInitDbDoneResponsePB;
122
using yb::server::ServerStatusPB;
123
using yb::tserver::ListTabletsRequestPB;
124
using yb::tserver::ListTabletsResponsePB;
125
using yb::tserver::TabletServerErrorPB;
126
using yb::tserver::TabletServerServiceProxy;
127
using yb::consensus::ConsensusServiceProxy;
128
using yb::consensus::RaftPeerPB;
129
using yb::consensus::ChangeConfigRequestPB;
130
using yb::consensus::ChangeConfigResponsePB;
131
using yb::consensus::ChangeConfigType;
132
using yb::consensus::GetLastOpIdRequestPB;
133
using yb::consensus::GetLastOpIdResponsePB;
134
using yb::consensus::LeaderStepDownRequestPB;
135
using yb::consensus::LeaderStepDownResponsePB;
136
using yb::consensus::RunLeaderElectionRequestPB;
137
using yb::consensus::RunLeaderElectionResponsePB;
138
using yb::master::IsMasterLeaderReadyRequestPB;
139
using yb::master::IsMasterLeaderReadyResponsePB;
140
using yb::master::GetMasterClusterConfigRequestPB;
141
using yb::master::GetMasterClusterConfigResponsePB;
142
using yb::master::ChangeMasterClusterConfigRequestPB;
143
using yb::master::ChangeMasterClusterConfigResponsePB;
144
using yb::master::SysClusterConfigEntryPB;
145
using yb::tserver::ListTabletsForTabletServerRequestPB;
146
using yb::tserver::ListTabletsForTabletServerResponsePB;
147
using yb::master::ListMastersRequestPB;
148
using yb::master::ListMastersResponsePB;
149
using yb::tserver::TabletServerErrorPB;
150
using yb::rpc::RpcController;
151
152
typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;
153
154
DECLARE_string(vmodule);
155
DECLARE_int32(replication_factor);
156
DECLARE_bool(mem_tracker_logging);
157
DECLARE_bool(mem_tracker_log_stack_trace);
158
DECLARE_string(minicluster_daemon_id);
159
DECLARE_bool(use_libbacktrace);
160
DECLARE_bool(never_fsync);
161
162
DEFINE_string(external_daemon_heap_profile_prefix, "",
163
              "If this is not empty, tcmalloc's HEAPPROFILE is set this, followed by a unique "
164
              "suffix for external mini-cluster daemons.");
165
166
DEFINE_bool(external_daemon_safe_shutdown, false,
167
            "Shutdown external daemons using SIGTERM first. Disabled by default to avoid "
168
            "interfering with kill-testing.");
169
170
DECLARE_int64(outbound_rpc_block_size);
171
DECLARE_int64(outbound_rpc_memory_limit);
172
173
DEFINE_int64(external_mini_cluster_max_log_bytes, 50_MB * 100,
174
             "Max total size of log bytes produced by all external mini-cluster daemons. "
175
             "The test is shut down if this limit is exceeded.");
176
177
namespace yb {
178
179
static const char* const kMasterBinaryName = "yb-master";
180
static const char* const kTabletServerBinaryName = "yb-tserver";
181
static double kProcessStartTimeoutSeconds = 60.0;
182
static MonoDelta kTabletServerRegistrationTimeout = 60s;
183
184
static const int kHeapProfileSignal = SIGUSR1;
185
186
constexpr size_t kDefaultMemoryLimitHardBytes = NonTsanVsTsan(1_GB, 512_MB);
187
188
namespace {
189
190
2.91k
void AddExtraFlagsFromEnvVar(const char* env_var_name, std::vector<std::string>* args_dest) {
191
2.91k
  const char* extra_daemon_flags_env_var_value = getenv(env_var_name);
192
2.91k
  if (extra_daemon_flags_env_var_value) {
193
0
    LOG(INFO) << "Setting extra daemon flags as specified by env var " << env_var_name << ": "
194
0
              << extra_daemon_flags_env_var_value;
195
    // TODO: this has an issue with handling quoted arguments with embedded spaces.
196
0
    std::istringstream iss(extra_daemon_flags_env_var_value);
197
0
    copy(std::istream_iterator<string>(iss),
198
0
         std::istream_iterator<string>(),
199
0
         std::back_inserter(*args_dest));
200
2.91k
  } else {
201
2.91k
    LOG(INFO) << "Env var " << env_var_name << " not specified, not setting extra flags from it";
202
2.91k
  }
203
2.91k
}
204
205
std::vector<std::string> FsRootDirs(const std::string& data_dir,
206
1.35k
                                    uint16_t num_drives) {
207
1.35k
  if (num_drives == 1) {
208
1.32k
    return vector<string>{data_dir};
209
1.32k
  }
210
32
  vector<string> data_dirs;
211
98
  for (int drive =  1; drive <= num_drives; 
++drive66
) {
212
66
    data_dirs.push_back(JoinPathSegments(data_dir, Substitute("d-$0", drive)));
213
66
  }
214
32
  return data_dirs;
215
1.35k
}
216
217
std::vector<std::string> FsDataDirs(const std::string& data_dir,
218
                                    const std::string& server_type,
219
1.29k
                                    uint16_t num_drives) {
220
1.29k
  if (num_drives == 1) {
221
1.26k
    return vector<string>{GetServerTypeDataPath(data_dir, server_type)};
222
1.26k
  }
223
30
  vector<string> data_dirs;
224
91
  for (int drive =  1; drive <= num_drives; 
++drive61
) {
225
61
    data_dirs.push_back(GetServerTypeDataPath(
226
61
                          JoinPathSegments(data_dir, Substitute("d-$0", drive)), server_type));
227
61
  }
228
30
  return data_dirs;
229
1.29k
}
230
231
}  // anonymous namespace
232
233
// ------------------------------------------------------------------------------------------------
234
// ExternalMiniClusterOptions
235
// ------------------------------------------------------------------------------------------------
236
237
7
// Removes `port` from master_rpc_ports and decrements num_masters.
// Returns InvalidArgument (leaving the options untouched) when the port is not in the list.
Status ExternalMiniClusterOptions::RemovePort(const uint16_t port) {
  const auto pos = std::find(master_rpc_ports.begin(), master_rpc_ports.end(), port);
  if (pos != master_rpc_ports.end()) {
    master_rpc_ports.erase(pos);
    --num_masters;
    return Status::OK();
  }

  return STATUS(InvalidArgument, Substitute(
      "Port to be removed '$0' not found in existing list of $1 masters.",
      port, num_masters));
}
251
252
10
// Appends `port` to master_rpc_ports and increments num_masters.
// Returns InvalidArgument (leaving the options untouched) when the port is already in the list.
Status ExternalMiniClusterOptions::AddPort(const uint16_t port) {
  const auto pos = std::find(master_rpc_ports.begin(), master_rpc_ports.end(), port);
  if (pos == master_rpc_ports.end()) {
    master_rpc_ports.push_back(port);
    ++num_masters;
    return Status::OK();
  }

  return STATUS(InvalidArgument, Substitute(
      "Port to be added '$0' already found in the existing list of $1 masters.",
      port, num_masters));
}
266
267
441
void ExternalMiniClusterOptions::AdjustMasterRpcPorts() {
268
441
  if (master_rpc_ports.size() == 1 && 
master_rpc_ports[0] == 0393
) {
269
    // Add missing master ports to avoid errors when we try to start the cluster.
270
497
    while (master_rpc_ports.size() < num_masters) {
271
105
      master_rpc_ports.push_back(0);
272
105
    }
273
392
  }
274
441
}
275
276
// ------------------------------------------------------------------------------------------------
277
// ExternalMiniCluster
278
// ------------------------------------------------------------------------------------------------
279
280
// Copies `opts`, normalizes the master RPC port list, and prepares the extra command-line flags
// for masters and tablet servers: a set of common defaults is placed in front of the
// caller-provided flags, and flags from the YB_EXTRA_MASTER_FLAGS / YB_EXTRA_TSERVER_FLAGS
// environment variables are appended at the end.
ExternalMiniCluster::ExternalMiniCluster(const ExternalMiniClusterOptions& opts)
    : opts_(opts), add_new_master_at_(-1) {
  opts_.AdjustMasterRpcPorts();

  const std::string logging_flag =
      opts.log_to_file ? "--alsologtostderr" : "--logtostderr";
  const std::string slow_query_flag =
      IsTsan() ? "--rpc_slow_query_threshold_ms=20000" : "--rpc_slow_query_threshold_ms=10000";

  // These "extra mini cluster options" are added in the end of the command line.
  const std::vector<std::string> default_flags = {
      "--enable_tracing",
      Substitute("--memory_limit_hard_bytes=$0", kDefaultMemoryLimitHardBytes),
      Substitute("--never_fsync=$0", FLAGS_never_fsync),
      logging_flag,
      slow_query_flag,
  };

  // Common default extra flags are inserted in the beginning so that they can be overridden by
  // caller-specified flags.
  opts_.extra_master_flags.insert(
      opts_.extra_master_flags.begin(), default_flags.begin(), default_flags.end());
  opts_.extra_tserver_flags.insert(
      opts_.extra_tserver_flags.begin(), default_flags.begin(), default_flags.end());

  AddExtraFlagsFromEnvVar("YB_EXTRA_MASTER_FLAGS", &opts_.extra_master_flags);
  AddExtraFlagsFromEnvVar("YB_EXTRA_TSERVER_FLAGS", &opts_.extra_tserver_flags);
}
302
303
345
// Shuts the cluster down and then, if this object holds a messenger in messenger_holder_,
// shuts that messenger down as well.
ExternalMiniCluster::~ExternalMiniCluster() {
  Shutdown();
  if (!messenger_holder_) {
    return;
  }
  messenger_holder_->Shutdown();
}
309
310
440
// Stores "<directory of the current executable>/../bin" into `*ret`.
// Returns the error from Env::GetExecutablePath() if the executable path cannot be determined,
// in which case `*ret` is not modified.
Status ExternalMiniCluster::DeduceBinRoot(std::string* ret) {
  std::string executable_path;
  RETURN_NOT_OK(Env::Default()->GetExecutablePath(&executable_path));

  std::string bin_root = DirName(executable_path);
  bin_root += "/../bin";
  *ret = bin_root;
  return Status::OK();
}
316
317
325
std::string ExternalMiniCluster::GetClusterDataDirName() const {
318
325
  if (opts_.cluster_id == "") {
319
325
    return "minicluster-data";
320
325
  }
321
0
  return Format("minicluster-data-$0", opts_.cluster_id);
322
325
}
323
324
440
// Resolves daemon_bin_path_ and data_root_ from the options.
//
// - daemon_bin_path_ falls back to DeduceBinRoot() when the option is empty.
// - data_root_ falls back to a cluster directory below the gtest data directory when the option
//   is empty; see the comment on data_root_counter below for the sub-directory handling.
Status ExternalMiniCluster::HandleOptions() {
  daemon_bin_path_ = opts_.daemon_bin_path;
  if (daemon_bin_path_.empty()) {
    RETURN_NOT_OK(DeduceBinRoot(&daemon_bin_path_));
  }

  data_root_ = opts_.data_root;
  if (!data_root_.empty()) {
    return Status::OK();
  }

  // If they don't specify a data root, use the current gtest directory.
  data_root_ = JoinPathSegments(GetTestDataDirectory(), GetClusterDataDirName());

  // If "data_root_counter" is non-negative, and the auto-generated "data_root_" directory already
  // exists, create a subdirectory using the counter value as its name. The caller should maintain
  // this counter and increment it for each test run.
  if (opts_.data_root_counter < 0) {
    return Status::OK();
  }

  struct stat root_info;
  const bool root_is_existing_dir =
      stat(data_root_.c_str(), &root_info) == 0 && S_ISDIR(root_info.st_mode);
  if (root_is_existing_dir) {
    data_root_ = Substitute("$0/$1", data_root_, opts_.data_root_counter);
    // Aborts the process if the subdirectory cannot be created.
    CHECK_EQ(mkdir(data_root_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH), 0);
  }

  return Status::OK();
}
349
350
440
// Starts the cluster: resolves options, sets up the messenger and proxy cache, creates the data
// root, starts the masters and then the configured number of tablet servers, waiting for the
// tablet servers to be counted by WaitForTabletServerCount().
//
// `messenger` may be null, in which case a messenger owned by this object is built; otherwise
// the supplied messenger is used and not owned. Must only be called while no masters or tablet
// servers are tracked (enforced with CHECK). Returns the first error encountered.
Status ExternalMiniCluster::Start(rpc::Messenger* messenger) {
  CHECK(masters_.empty()) << "Masters are not empty (size: " << masters_.size()
      << "). Maybe you meant Restart()?";
  CHECK(tablet_servers_.empty()) << "Tablet servers are not empty (size: "
      << tablet_servers_.size() << "). Maybe you meant Restart()?";
  RETURN_NOT_OK(HandleOptions());
  FLAGS_replication_factor = narrow_cast<int>(opts_.num_masters);

  if (messenger != nullptr) {
    messenger_holder_ = nullptr;
    messenger_ = messenger;
  } else {
    rpc::MessengerBuilder builder("minicluster-messenger");
    builder.set_num_reactors(1);
    messenger_holder_ = VERIFY_RESULT_PREPEND(
        builder.Build(), "Failed to start Messenger for minicluster");
    messenger_ = messenger_holder_.get();
  }
  proxy_cache_ = std::make_unique<rpc::ProxyCache>(messenger_);

  RETURN_NOT_OK(Env::Default()->CreateDirs(data_root_));

  const char* const unique_loopback_text =
      opts_.bind_to_unique_loopback_addresses ? "true" : "false";
  LOG(INFO) << "Starting cluster with option bind_to_unique_loopback_addresses="
      << unique_loopback_text;

  LOG(INFO) << "Starting " << opts_.num_masters << " masters";
  RETURN_NOT_OK_PREPEND(StartMasters(), "Failed to start masters.");
  // New masters added later are numbered after the initial set.
  add_new_master_at_ = opts_.num_masters;

  const bool has_tablet_servers = opts_.num_tablet_servers > 0;
  if (!has_tablet_servers) {
    LOG(INFO) << "No need to start tablet servers";
  } else {
    LOG(INFO) << "Starting " << opts_.num_tablet_servers << " tablet servers";

    for (size_t ts_number = 1; ts_number <= opts_.num_tablet_servers; ++ts_number) {
      RETURN_NOT_OK_PREPEND(
          AddTabletServer(ExternalMiniClusterOptions::kDefaultStartCqlProxy),
          Substitute("Failed starting tablet server $0", ts_number));
    }
    RETURN_NOT_OK(WaitForTabletServerCount(
        opts_.num_tablet_servers, kTabletServerRegistrationTimeout));
  }

  running_ = true;
  return Status::OK();
}
396
397
694
// Shuts down every tablet server and, when `mode` is ALL, every non-null master as well.
// Marks the cluster as not running afterwards.
void ExternalMiniCluster::Shutdown(NodeSelectionMode mode) {
  // TODO: in the regular MiniCluster Shutdown is a no-op if running_ is false.
  // Therefore, in case of an error during cluster startup behavior might be different.
  const bool include_masters = (mode == ALL);
  if (include_masters) {
    for (const auto& master : masters_) {
      if (!master) {
        continue;
      }
      master->Shutdown();
    }
  }

  for (const auto& tablet_server : tablet_servers_) {
    tablet_server->Shutdown();
  }

  running_ = false;
}
413
414
2
// Restarts every tracked master and tablet server that reports IsShutdown(), then waits until
// the tablet server count matches the number of tracked tablet servers. Returns the first error
// encountered; marks the cluster as running on success.
Status ExternalMiniCluster::Restart() {
  LOG(INFO) << "Restarting cluster with " << masters_.size() << " masters.";

  for (const auto& master : masters_) {
    if (!master || !master->IsShutdown()) {
      continue;
    }
    RETURN_NOT_OK_PREPEND(master->Restart(), "Cannot restart master bound at: " +
                                             master->bound_rpc_hostport().ToString());
  }

  for (const auto& tablet_server : tablet_servers_) {
    if (!tablet_server->IsShutdown()) {
      continue;
    }
    RETURN_NOT_OK_PREPEND(tablet_server->Restart(), "Cannot restart tablet server bound at: " +
                                                    tablet_server->bound_rpc_hostport().ToString());
  }

  RETURN_NOT_OK(WaitForTabletServerCount(tablet_servers_.size(), kTabletServerRegistrationTimeout));

  running_ = true;
  return Status::OK();
}
435
436
1.74k
// Returns the path of the daemon executable named `binary`.
//
// The default location is "<daemon_bin_path_>/<binary>". If that file does not exist, the master
// and tablet server binaries are additionally looked up in "<daemon_bin_path_>/../master/" and
// "<daemon_bin_path_>/../tserver/" respectively. When no existing file is found, the default path
// is returned anyway (after logging a warning), so the caller sees the failure when it tries to
// run the binary. Requires daemon_bin_path_ to be set (enforced with CHECK).
string ExternalMiniCluster::GetBinaryPath(const string& binary) const {
  CHECK(!daemon_bin_path_.empty());
  string default_path = JoinPathSegments(daemon_bin_path_, binary);
  if (Env::Default()->FileExists(default_path)) {
    return default_path;
  }

  // In CLion-based builds we sometimes have to look for the binary in other directories.
  // The binary names are compared against the file-level constants rather than repeated literals
  // so that this lookup cannot drift from the names used to launch the daemons.
  string alternative_dir;
  if (binary == kMasterBinaryName) {
    alternative_dir = "master";
  } else if (binary == kTabletServerBinaryName) {
    alternative_dir = "tserver";
  } else {
    LOG(WARNING) << "Default path " << default_path << " for binary " << binary <<
      " does not exist, and no alternative directory is available for this binary";
    return default_path;
  }

  string alternative_path = JoinPathSegments(daemon_bin_path_,
    "../" + alternative_dir + "/" + binary);
  if (Env::Default()->FileExists(alternative_path)) {
    LOG(INFO) << "Default path " << default_path << " for binary " << binary <<
      " does not exist, using alternative location: " << alternative_path;
    return alternative_path;
  }

  LOG(WARNING) << "Neither " << default_path << " nor " << alternative_path << " exist";
  return default_path;
}
466
467
1.94k
// Returns the data directory for the daemon identified by `daemon_id`, i.e. `daemon_id` joined
// onto data_root_. Requires data_root_ to be set (enforced with CHECK).
string ExternalMiniCluster::GetDataPath(const string& daemon_id) const {
  CHECK(!data_root_.empty());
  string daemon_data_path = JoinPathSegments(data_root_, daemon_id);
  return daemon_data_path;
}
471
472
namespace {
473
1.93k
// Returns a copy of `orig_flags` in which the placeholder "${index}" in each flag is replaced by
// the decimal representation of `index`.
// NOTE(review): the trailing `true` passed to StringReplace presumably requests replacement of
// every occurrence rather than just the first - confirm against its declaration.
vector<string> SubstituteInFlags(const vector<string>& orig_flags, size_t index) {
  const string index_text = std::to_string(index);
  vector<string> substituted;
  std::transform(orig_flags.begin(), orig_flags.end(), std::back_inserter(substituted),
                 [&index_text](const string& flag) {
                   return StringReplace(flag, "${index}", index_text, true);
                 });
  return substituted;
}
481
482
}  // anonymous namespace
483
484
1
// Creates and starts a new external master, passing `peer_addrs` to it, on freshly allocated RPC
// and HTTP ports. The master is numbered with add_new_master_at_, which is incremented on success.
//
// Returns the started master, or the error from ExternalMaster::Start(). The returned pointer is
// not added to masters_ by this function.
Result<ExternalMaster *> ExternalMiniCluster::StartMasterWithPeers(const string& peer_addrs) {
  const uint16_t rpc_port = AllocateFreePort();
  const uint16_t http_port = AllocateFreePort();
  LOG(INFO) << "Using auto-assigned rpc_port " << rpc_port << "; http_port " << http_port
            << " to start a new external mini-cluster master with peers '" << peer_addrs << "'.";

  const string addr = MasterAddressForPort(rpc_port);
  const string exe = GetBinaryPath(kMasterBinaryName);

  ExternalMaster* master =
      new ExternalMaster(add_new_master_at_, messenger_, proxy_cache_.get(), exe,
                         GetDataPath(Substitute("master-$0", add_new_master_at_)),
                         opts_.extra_master_flags, addr, http_port, peer_addrs);

  const Status start_status = master->Start();
  if (!start_status.ok()) {
    // Nobody else references the object yet, so an early return would leak it. Adopting it into
    // a scoped_refptr releases it when this scope ends.
    // NOTE(review): assumes ExternalMaster follows the usual ref-counting contract where the
    // count drops to zero (and the object is destroyed) once this temporary owner goes away.
    scoped_refptr<ExternalMaster> failed_master(master);
    return start_status;
  }

  add_new_master_at_++;
  return master;
}
503
504
777
std::string ExternalMiniCluster::MasterAddressForPort(uint16_t port) const {
505
777
  return Format(opts_.use_even_ips ? 
"127.0.0.2:$0"7
:
"127.0.0.1:$0"770
, port);
506
777
}
507
508
10
// Creates and starts a new external master without any peer addresses on freshly allocated RPC
// and HTTP ports, and stores it in `*new_master`. The master is numbered with
// add_new_master_at_, which is incremented afterwards. A start failure is fatal (LOG(FATAL)).
void ExternalMiniCluster::StartShellMaster(ExternalMaster** new_master) {
  const uint16_t rpc_port = AllocateFreePort();
  const uint16_t http_port = AllocateFreePort();
  LOG(INFO) << "Using auto-assigned rpc_port " << rpc_port << "; http_port " << http_port
            << " to start a new external mini-cluster shell master.";

  const string addr = MasterAddressForPort(rpc_port);
  const string exe = GetBinaryPath(kMasterBinaryName);
  const string data_path = GetDataPath(Substitute("master-$0", add_new_master_at_));

  ExternalMaster* master = new ExternalMaster(
      add_new_master_at_, messenger_, proxy_cache_.get(), exe, data_path,
      opts_.extra_master_flags, addr, http_port, "");

  // NOTE(review): the `true` argument presumably selects shell mode - confirm against
  // ExternalMaster::Start().
  const Status start_status = master->Start(true);
  if (!start_status.ok()) {
    LOG(FATAL) << Substitute("Unable to start 'shell' mode master at index $0, due to error $1.",
                             add_new_master_at_, start_status.ToString());
  }

  add_new_master_at_++;
  *new_master = master;
}
539
540
17
// Verifies that the configured number of masters equals both the number of tracked masters and
// the number of master RPC ports. A mismatch is fatal (LOG(FATAL)); otherwise OK is returned.
Status ExternalMiniCluster::CheckPortAndMasterSizes() const {
  const bool sizes_agree =
      opts_.num_masters == masters_.size() &&
      opts_.num_masters == opts_.master_rpc_ports.size();
  if (!sizes_agree) {
    LOG(FATAL) << Substitute(
        "Mismatch number of masters in options $0, compared to masters vector $1 or rpc ports $2",
        opts_.num_masters, masters_.size(), opts_.master_rpc_ports.size());
  }

  return Status::OK();
}
551
552
10
// Starts tracking `master`: registers its bound RPC port in the options and appends it to
// masters_. Returns InvalidArgument if MasterComparator already matches a tracked master, or the
// error from AddPort(). Finishes with the size consistency check.
Status ExternalMiniCluster::AddMaster(ExternalMaster* master) {
  const bool already_tracked =
      std::any_of(masters_.begin(), masters_.end(), MasterComparator(master));
  if (already_tracked) {
    return STATUS(InvalidArgument, Substitute(
        "Master to be added '$0' already found in existing list of $1 masters.",
        master->bound_rpc_hostport().ToString(), opts_.num_masters));
  }

  RETURN_NOT_OK(opts_.AddPort(master->bound_rpc_hostport().port()));
  masters_.push_back(master);

  return CheckPortAndMasterSizes();
}
568
569
7
// Stops tracking `master`: removes its bound RPC port from the options and erases it from
// masters_. Returns InvalidArgument if MasterComparator matches no tracked master, or the error
// from RemovePort(). Finishes with the size consistency check.
Status ExternalMiniCluster::RemoveMaster(ExternalMaster* master) {
  const auto tracked_pos =
      std::find_if(masters_.begin(), masters_.end(), MasterComparator(master));
  const bool is_tracked = tracked_pos != masters_.end();
  if (!is_tracked) {
    return STATUS(InvalidArgument, Substitute(
        "Master to be removed '$0' not found in existing list of $1 masters.",
        master->bound_rpc_hostport().ToString(), opts_.num_masters));
  }

  // The port is read from `master` before the entry is erased from masters_.
  RETURN_NOT_OK(opts_.RemovePort(master->bound_rpc_hostport().port()));
  masters_.erase(tracked_pos);

  return CheckPortAndMasterSizes();
}
585
586
0
// Returns a consensus service proxy for the master returned by GetLeaderMaster().
ConsensusServiceProxy ExternalMiniCluster::GetLeaderConsensusProxy() {
  ExternalMaster* const leader = GetLeaderMaster();
  return GetConsensusProxy(leader);
}
589
590
155
// Returns a consensus service proxy for the given daemon, obtained through GetProxy<>().
ConsensusServiceProxy ExternalMiniCluster::GetConsensusProxy(ExternalDaemon* external_deamon) {
  using ProxyType = ConsensusServiceProxy;
  return GetProxy<ProxyType>(external_deamon);
}
593
594
10
// Sends a LeaderStepDown request for the sys catalog tablet to the current leader master.
//
// Returns the RPC status if the call itself fails; in that case `*error_code` is NOT written.
// If the response carries an error, its code is stored in `*error_code` and the corresponding
// status is returned. Returns OK when the step down succeeded.
Status ExternalMiniCluster::StepDownMasterLeader(TabletServerErrorPB::Code* error_code) {
  ExternalMaster* const leader = GetLeaderMaster();
  const string leader_uuid = leader->uuid();
  const auto leader_addr = leader->bound_rpc_addr();

  LeaderStepDownRequestPB request;
  request.set_tablet_id(yb::master::kSysCatalogTabletId);
  request.set_dest_uuid(leader_uuid);

  LeaderStepDownResponsePB response;
  RpcController controller;
  controller.set_timeout(opts_.timeout);

  ConsensusServiceProxy proxy(proxy_cache_.get(), leader_addr);
  RETURN_NOT_OK(proxy.LeaderStepDown(request, &response, &controller));

  if (!response.has_error()) {
    LOG(INFO) << "Leader at host/port '" << leader_addr << "' step down complete.";
    return Status::OK();
  }

  LOG(ERROR) << "LeaderStepDown for " << leader_uuid << " received error "
             << response.error().ShortDebugString();
  *error_code = response.error().code();
  return StatusFromPB(response.error().status());
}
617
618
5
// Repeatedly asks the current leader master to step down until GetLeaderMaster() reports a
// master with a different uuid than the one that was leader on entry.
//
// A step-down attempt that fails with LEADER_NOT_READY_TO_STEP_DOWN is retried; any other
// failure is returned to the caller. Each attempt is followed by a fixed 3 second sleep before
// the leader is looked up again.
Status ExternalMiniCluster::StepDownMasterLeaderAndWaitForNewLeader() {
  ExternalMaster* leader = GetLeaderMaster();
  const string old_leader_uuid = leader->uuid();
  string leader_uuid = old_leader_uuid;
  LOG(INFO) << "Starting step down of leader " << leader->bound_rpc_addr();

  // while loop will not be needed once JIRA ENG-49 is fixed.
  int iter = 1;
  while (leader_uuid == old_leader_uuid) {
    // The error code must start out fresh on every attempt: StepDownMasterLeader() only writes
    // it when the response carries an error, not when the RPC itself fails. Reusing the value of
    // a previous attempt would let a stale LEADER_NOT_READY_TO_STEP_DOWN hide such a failure and
    // keep this loop retrying.
    TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
    Status s = StepDownMasterLeader(&error_code);
    // If step down hits any error except not-ready, exit.
    if (!s.ok() && error_code != TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN) {
      return s;
    }
    sleep(3);  // TODO: add wait for election api.
    leader = GetLeaderMaster();
    leader_uuid = leader->uuid();
    LOG(INFO) << "Got new leader " << leader->bound_rpc_addr() << ", iter=" << iter;
    iter++;
  }

  return Status::OK();
}
642
643
// Sends a ChangeConfig request for the sys catalog tablet to the leader master in order to add
// or remove `master`, then updates the local bookkeeping via AddMaster()/RemoveMaster().
//
// Only ADD_SERVER and REMOVE_SERVER are accepted for `type`; anything else yields
// InvalidArgument. `member_type` is only applied for ADD_SERVER. When `use_hostport` is set the
// peer is sent with an empty permanent uuid and the request is flagged with use_host.
//
// If the master to be removed is the current leader (matched by uuid), the leader is first asked
// to step down. NOT_THE_LEADER and LEADER_NOT_READY_CHANGE_CONFIG responses are retried, with a
// one second pause, up to kMaxRetryIterations attempts; other response errors return
// RuntimeError, and exhausting the attempts returns IllegalState.
Status ExternalMiniCluster::ChangeConfig(ExternalMaster* master,
                                         ChangeConfigType type,
                                         consensus::PeerMemberType member_type,
                                         bool use_hostport) {
  const bool adding = (type == consensus::ADD_SERVER);
  const bool removing = (type == consensus::REMOVE_SERVER);
  if (!adding && !removing) {
    return STATUS(InvalidArgument, Substitute("Invalid Change Config type $0", type));
  }

  ChangeConfigRequestPB req;
  ChangeConfigResponsePB resp;
  rpc::RpcController rpc;
  rpc.set_timeout(opts_.timeout);

  // Describe the peer that is being added or removed.
  RaftPeerPB peer_pb;
  peer_pb.set_permanent_uuid(use_hostport ? "" : master->uuid());
  if (adding) {
    peer_pb.set_member_type(member_type);
  }
  HostPortToPB(master->bound_rpc_hostport(), peer_pb.mutable_last_known_private_addr()->Add());

  req.set_tablet_id(yb::master::kSysCatalogTabletId);
  req.set_type(type);
  req.set_use_host(use_hostport);
  *req.mutable_server() = peer_pb;

  // There could be timing window where we found the leader host/port, but an election in the
  // meantime could have made it step down. So we retry till we hit the leader correctly.
  for (int num_attempts = 1; ; ++num_attempts) {
    ExternalMaster* leader = GetLeaderMaster();
    auto leader_proxy = std::make_unique<ConsensusServiceProxy>(
        proxy_cache_.get(), leader->bound_rpc_addr());
    string leader_uuid = leader->uuid();

    // The leader cannot remove itself: make it step down and talk to its successor instead.
    if (removing && leader_uuid == req.server().permanent_uuid()) {
      RETURN_NOT_OK(StepDownMasterLeaderAndWaitForNewLeader());
      leader = GetLeaderMaster();
      leader_uuid = leader->uuid();
      leader_proxy.reset(new ConsensusServiceProxy(proxy_cache_.get(), leader->bound_rpc_addr()));
    }

    req.set_dest_uuid(leader_uuid);
    RETURN_NOT_OK(leader_proxy->ChangeConfig(req, &resp, &rpc));
    if (!resp.has_error()) {
      break;
    }

    const auto resp_error_code = resp.error().code();
    const bool retriable =
        resp_error_code == TabletServerErrorPB::NOT_THE_LEADER ||
        resp_error_code == TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG;
    if (!retriable) {
      return STATUS(RuntimeError, Substitute("Change Config RPC to leader hit error: $0",
                                             resp.error().ShortDebugString()));
    }

    // Need to retry as we come here with NOT_THE_LEADER.
    if (num_attempts >= kMaxRetryIterations) {
      return STATUS(IllegalState,
                    Substitute("Failed to complete ChangeConfig request '$0' even after maximum "
                               "number of attempts. Last error '$1'",
                               req.ShortDebugString(), resp.error().ShortDebugString()));
    }

    SleepFor(MonoDelta::FromSeconds(1));

    LOG(INFO) << "Resp error '" << resp.error().ShortDebugString() << "', num=" << num_attempts
              << ", retrying...";

    // The controller has to be reset before it can be used for the next attempt.
    rpc.Reset();
  }

  LOG(INFO) << "Master " << master->bound_rpc_hostport().ToString() << ", change type "
            << type << " to " << masters_.size() << " masters.";

  if (adding) {
    return AddMaster(master);
  }
  if (removing) {
    return RemoveMaster(master);
  }

  string err_msg = Substitute("Should not reach here - change type $0", type);

  LOG(FATAL) << err_msg;

  // Satisfy the compiler with a return from here
  return STATUS(RuntimeError, err_msg);
}
728
729
// We look for the exact master match. Since it is possible to stop/restart master on
730
// a given host/port, we do not want a stale master pointer input to match a newer master.
731
161
int ExternalMiniCluster::GetIndexOfMaster(ExternalMaster* master) const {
732
199
  for (size_t i = 0; i < masters_.size(); 
i++38
) {
733
199
    if (masters_[i].get() == master) {
734
161
      return narrow_cast<int>(i);
735
161
    }
736
199
  }
737
0
  return -1;
738
161
}
739
740
1
// Issues a Ping RPC to `master` and returns the RPC status.
//
// When the master is part of masters_ the proxy is looked up by index; otherwise the proxy is
// built from the master's bound RPC address. The call uses opts_.timeout.
Status ExternalMiniCluster::PingMaster(ExternalMaster* master) const {
  std::shared_ptr<server::GenericServiceProxy> ping_proxy;
  const int master_idx = GetIndexOfMaster(master);
  if (master_idx == -1) {
    ping_proxy = master_generic_proxy(master->bound_rpc_addr());
  } else {
    ping_proxy = master_generic_proxy(master_idx);
  }

  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);

  server::PingRequestPB ping_req;
  server::PingResponsePB ping_resp;
  return ping_proxy->Ping(ping_req, &ping_resp, &controller);
}
750
751
// Appends the RPC host/port of `ts` to the server blacklist of the cluster config.
//
// The current config is read from `master` with GetMasterClusterConfig, the tserver is appended
// to server_blacklist.hosts, and the result is written back with ChangeMasterClusterConfig.
// Returns InvalidArgument if `master` is not in masters_, RuntimeError if either RPC response
// carries an error, or the failing RPC status.
Status ExternalMiniCluster::AddTServerToBlacklist(
    ExternalMaster* master,
    ExternalTabletServer* ts) {
  const int master_idx = GetIndexOfMaster(master);
  if (master_idx == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  auto cluster_proxy = GetMasterProxy<master::MasterClusterProxy>(master_idx);
  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);

  // Step 1: fetch the config as it currently stands.
  GetMasterClusterConfigRequestPB fetch_req;
  GetMasterClusterConfigResponsePB fetch_resp;
  RETURN_NOT_OK(cluster_proxy.GetMasterClusterConfig(fetch_req, &fetch_resp, &controller));
  if (fetch_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "GetMasterClusterConfig RPC response hit error: $0",
        fetch_resp.error().ShortDebugString()));
  }

  // Step 2: start from the fetched config and add the tserver to the blacklist.
  ChangeMasterClusterConfigRequestPB update_req;
  SysClusterConfigEntryPB* updated_config = update_req.mutable_cluster_config();
  *updated_config = fetch_resp.cluster_config();
  HostPortToPB(ts->bound_rpc_hostport(),
               updated_config->mutable_server_blacklist()->mutable_hosts()->Add());

  // Step 3: push the modified config, reusing the controller for the second call.
  ChangeMasterClusterConfigResponsePB update_resp;
  controller.Reset();
  RETURN_NOT_OK(cluster_proxy.ChangeMasterClusterConfig(update_req, &update_resp, &controller));
  if (update_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "ChangeMasterClusterConfig RPC response hit error: $0",
        update_resp.error().ShortDebugString()));
  }

  LOG(INFO) << "TServer at " << ts->bound_rpc_hostport().ToString()
            << " was added to the blacklist";

  return Status::OK();
}
793
794
// Looks up min_num_replicas of the live-replica placement block identified by
// (cloud, region, zone) in the cluster config reported by `master`.
//
// A component matches when it is set in the block and equal to a non-empty argument, or when it
// is unset in the block and the argument is the empty string. The first block where all three
// components match is used. On success the value is stored in *min_num_replicas.
//
// Returns InvalidArgument if `master` is not in masters_, if the config has no live-replica
// placement info, or if no matching block with min_num_replicas exists; RuntimeError if the RPC
// response carries an error; or the failing RPC status.
Status ExternalMiniCluster::GetMinReplicaCountForPlacementBlock(
    ExternalMaster* master,
    const string& cloud, const string& region, const string& zone,
    int* min_num_replicas) {
  const int master_idx = GetIndexOfMaster(master);
  if (master_idx == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  GetMasterClusterConfigRequestPB fetch_req;
  GetMasterClusterConfigResponsePB fetch_resp;
  auto cluster_proxy = GetMasterProxy<master::MasterClusterProxy>(master_idx);
  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);
  RETURN_NOT_OK(cluster_proxy.GetMasterClusterConfig(fetch_req, &fetch_resp, &controller));
  if (fetch_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "GetMasterClusterConfig RPC response hit error: $0",
        fetch_resp.error().ShortDebugString()));
  }

  const SysClusterConfigEntryPB& cluster_config = fetch_resp.cluster_config();
  const bool has_live_replicas = cluster_config.has_replication_info() &&
                                 cluster_config.replication_info().has_live_replicas();
  if (!has_live_replicas) {
    return STATUS(InvalidArgument, Substitute(
        "Given placement block '$0.$1.$2' not in the current list of placement blocks.",
        cloud, region, zone));
  }

  const master::PlacementInfoPB& live_replicas = cluster_config.replication_info().live_replicas();

  // A set field must equal a non-empty wanted value; an unset field only matches "".
  const auto component_matches = [](bool is_set, const string& actual, const string& wanted) {
    return is_set ? (!wanted.empty() && actual == wanted) : wanted.empty();
  };

  int matched_block = -1;
  for (int block_idx = 0; block_idx < live_replicas.placement_blocks_size(); ++block_idx) {
    const auto& block = live_replicas.placement_blocks(block_idx);
    if (!block.has_cloud_info()) {
      continue;
    }

    const auto& cloud_info = block.cloud_info();
    const bool same_cloud = component_matches(
        cloud_info.has_placement_cloud(), cloud_info.placement_cloud(), cloud);
    const bool same_region = component_matches(
        cloud_info.has_placement_region(), cloud_info.placement_region(), region);
    const bool same_zone = component_matches(
        cloud_info.has_placement_zone(), cloud_info.placement_zone(), zone);

    if (same_cloud && same_region && same_zone) {
      matched_block = block_idx;
      break;
    }
  }

  if (matched_block < 0 || !live_replicas.placement_blocks(matched_block).has_min_num_replicas()) {
    return STATUS(InvalidArgument, Substitute(
        "Given placement block '$0.$1.$2' not in the current list of placement blocks.",
        cloud, region, zone));
  }

  *min_num_replicas = live_replicas.placement_blocks(matched_block).min_num_replicas();
  return Status::OK();
}
870
871
// Appends the RPC host/port of `ts` to the leader blacklist of the cluster config.
//
// The current config is read from `master` with GetMasterClusterConfig, the tserver is appended
// to leader_blacklist.hosts, and the result is written back with ChangeMasterClusterConfig.
// Returns InvalidArgument if `master` is not in masters_, RuntimeError if either RPC response
// carries an error, or the failing RPC status.
Status ExternalMiniCluster::AddTServerToLeaderBlacklist(
    ExternalMaster* master,
    ExternalTabletServer* ts) {
  const int master_idx = GetIndexOfMaster(master);
  if (master_idx == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  auto cluster_proxy = GetMasterProxy<master::MasterClusterProxy>(master_idx);
  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);

  // Step 1: fetch the config as it currently stands.
  GetMasterClusterConfigRequestPB fetch_req;
  GetMasterClusterConfigResponsePB fetch_resp;
  RETURN_NOT_OK(cluster_proxy.GetMasterClusterConfig(fetch_req, &fetch_resp, &controller));
  if (fetch_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "GetMasterClusterConfig RPC response hit error: $0",
        fetch_resp.error().ShortDebugString()));
  }

  // Step 2: start from the fetched config and add the tserver to the leader blacklist.
  ChangeMasterClusterConfigRequestPB update_req;
  SysClusterConfigEntryPB* updated_config = update_req.mutable_cluster_config();
  *updated_config = fetch_resp.cluster_config();
  HostPortToPB(ts->bound_rpc_hostport(),
               updated_config->mutable_leader_blacklist()->mutable_hosts()->Add());

  // Step 3: push the modified config, reusing the controller for the second call.
  ChangeMasterClusterConfigResponsePB update_resp;
  controller.Reset();
  RETURN_NOT_OK(cluster_proxy.ChangeMasterClusterConfig(update_req, &update_resp, &controller));
  if (update_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "ChangeMasterClusterConfig RPC response hit error: $0",
        update_resp.error().ShortDebugString()));
  }

  LOG(INFO) << "TServer at " << ts->bound_rpc_hostport().ToString()
            << " was added to the leader blacklist";

  return Status::OK();
}
913
914
// Empties both the server blacklist and the leader blacklist of the cluster config.
//
// The current config is read from `master`, the host lists of both blacklists are cleared, and
// the result is written back with ChangeMasterClusterConfig. Returns InvalidArgument if `master`
// is not in masters_, RuntimeError if either RPC response carries an error, or the failing RPC
// status.
Status ExternalMiniCluster::ClearBlacklist(
    ExternalMaster* master) {
  const int master_idx = GetIndexOfMaster(master);
  if (master_idx == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  auto cluster_proxy = GetMasterProxy<master::MasterClusterProxy>(master_idx);
  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);

  // Step 1: fetch the config as it currently stands.
  GetMasterClusterConfigRequestPB fetch_req;
  GetMasterClusterConfigResponsePB fetch_resp;
  RETURN_NOT_OK(cluster_proxy.GetMasterClusterConfig(fetch_req, &fetch_resp, &controller));
  if (fetch_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "GetMasterClusterConfig RPC response hit error: $0",
        fetch_resp.error().ShortDebugString()));
  }

  // Step 2: start from the fetched config and drop every blacklisted host.
  ChangeMasterClusterConfigRequestPB update_req;
  SysClusterConfigEntryPB* updated_config = update_req.mutable_cluster_config();
  *updated_config = fetch_resp.cluster_config();
  updated_config->mutable_server_blacklist()->mutable_hosts()->Clear();
  updated_config->mutable_leader_blacklist()->mutable_hosts()->Clear();

  // Step 3: push the modified config, reusing the controller for the second call.
  ChangeMasterClusterConfigResponsePB update_resp;
  controller.Reset();
  RETURN_NOT_OK(cluster_proxy.ChangeMasterClusterConfig(update_req, &update_resp, &controller));
  if (update_resp.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "ChangeMasterClusterConfig RPC response hit error: $0",
        update_resp.error().ShortDebugString()));
  }

  LOG(INFO) << "Blacklist cleared successfully";

  return Status::OK();
}
955
956
23
// Asks `master` for its list of masters and stores the number of entries in *num_peers.
//
// Returns InvalidArgument if `master` is not in masters_, RuntimeError if the ListMasters
// response carries an error, or the failing RPC status.
Status ExternalMiniCluster::GetNumMastersAsSeenBy(ExternalMaster* master, int* num_peers) {
  const int master_idx = GetIndexOfMaster(master);
  if (master_idx == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  ListMastersRequestPB request;
  ListMastersResponsePB response;
  auto cluster_proxy = GetMasterProxy<master::MasterClusterProxy>(master_idx);
  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);
  RETURN_NOT_OK(cluster_proxy.ListMasters(request, &response, &controller));
  if (response.has_error()) {
    return STATUS(RuntimeError, Substitute(
        "List Masters RPC response hit error: $0", response.error().ShortDebugString()));
  }

  const int peer_count = response.masters_size();
  LOG(INFO) << "List Masters for master at index " << master_idx
            << " got " << peer_count << " peers";

  *num_peers = peer_count;
  return Status::OK();
}
983
984
13
// Waits until the term of the leader master's last committed op id moves past its initial value.
//
// If the initial term is already non-zero (the restart case) no waiting is needed and OK is
// returned immediately. Otherwise the leader is polled, sleeping min(iteration, 10) ms between
// polls, until the term advances or opts_.timeout elapses (TimedOut). Errors from
// GetLastOpIdForLeader are returned as-is.
Status ExternalMiniCluster::WaitForLeaderCommitTermAdvance() {
  OpIdPB initial_opid;
  RETURN_NOT_OK(GetLastOpIdForLeader(&initial_opid));
  LOG(INFO) << "Start OPID : " << initial_opid.ShortDebugString();

  // Need not do any wait if it is a restart case - so the commit term will be > 0.
  if (initial_opid.term() != 0) {
    return Status::OK();
  }

  MonoTime current_time = MonoTime::Now();
  MonoTime deadline = current_time;
  deadline.AddDelta(opts_.timeout);

  OpIdPB latest_opid = initial_opid;
  int iteration = 1;
  while (current_time.ComesBefore(deadline)) {
    if (latest_opid.term() > initial_opid.term()) {
      LOG(INFO) << "Final OPID: " << latest_opid.ShortDebugString() << " after "
                << iteration << " iterations.";

      return Status::OK();
    }
    SleepFor(MonoDelta::FromMilliseconds(min(iteration, 10)));
    RETURN_NOT_OK(GetLastOpIdForLeader(&latest_opid));
    current_time = MonoTime::Now();
    ++iteration;
  }

  return STATUS(TimedOut, Substitute("Term did not advance from $0.", initial_opid.term()));
}
1012
1013
// Collects the last op id of the requested type from every master in masters_.
//
// `timeout` applies to each individual GetLastOpId RPC. `op_ids` is cleared first and then
// receives one entry per master, in masters_ order. The first RPC failure aborts the scan and is
// returned with a "Failed to fetch last op id from <port>" prefix; entries gathered before the
// failure remain in `op_ids`.
Status ExternalMiniCluster::GetLastOpIdForEachMasterPeer(
    const MonoDelta& timeout,
    consensus::OpIdType opid_type,
    vector<OpIdPB>* op_ids) {
  GetLastOpIdRequestPB opid_req;
  GetLastOpIdResponsePB opid_resp;
  // The tablet id and op id type are the same for every peer, so set them once up front.
  opid_req.set_tablet_id(yb::master::kSysCatalogTabletId);
  opid_req.set_opid_type(opid_type);
  RpcController controller;
  controller.set_timeout(timeout);

  op_ids->clear();
  // Iterate by const reference: copying the scoped_refptr would cost a ref-count increment and
  // decrement per master for no benefit.
  for (const auto& master : masters_) {
    opid_req.set_dest_uuid(master->uuid());
    RETURN_NOT_OK_PREPEND(
        GetConsensusProxy(master.get()).GetLastOpId(opid_req, &opid_resp, &controller),
        Substitute("Failed to fetch last op id from $0", master->bound_rpc_hostport().port()));
    op_ids->push_back(opid_resp.opid());
    // The controller has to be reset before it can be reused for the next peer.
    controller.Reset();
  }

  return Status::OK();
}
1036
1037
24
// Polls all masters until each one reports a committed op id index of at least `target_index`.
//
// The overall deadline is now + opts_.timeout. Between polls the function sleeps
// min(attempt * 100, 1000) ms. Once the deadline has passed, the last fetch error is returned if
// the most recent poll failed, otherwise TimedOut.
Status ExternalMiniCluster::WaitForMastersToCommitUpTo(int64_t target_index) {
  const auto deadline = CoarseMonoClock::Now() + opts_.timeout.ToSteadyDuration();

  int attempt = 1;
  while (true) {
    vector<OpIdPB> committed_ids;
    Status fetch_status =
        GetLastOpIdForEachMasterPeer(opts_.timeout, consensus::COMMITTED_OPID, &committed_ids);

    if (!fetch_status.ok()) {
      LOG(WARNING) << "Got error getting last opid for each replica: " << fetch_status.ToString();
    } else {
      bool all_caught_up = true;
      for (const auto& committed : committed_ids) {
        if (committed.index() < target_index) {
          all_caught_up = false;
          break;
        }
      }
      if (all_caught_up) {
        LOG(INFO) << "Committed up to " << target_index;
        return Status::OK();
      }
    }

    if (CoarseMonoClock::Now() >= deadline) {
      if (!fetch_status.ok()) {
        return fetch_status;
      }

      return STATUS_FORMAT(TimedOut,
                           "Index $0 not available on all replicas after $1. ",
                           target_index,
                           opts_.timeout);
    }

    SleepFor(MonoDelta::FromMilliseconds(min(attempt * 100, 1000)));
    ++attempt;
  }
}
1074
1075
125
// Sends IsMasterLeaderServiceReady to `master` and reports the outcome as a Status.
//
// Returns OK when the response carries no error, RuntimeError when it does, InvalidArgument if
// `master` is not in masters_, or the failing RPC status.
Status ExternalMiniCluster::GetIsMasterLeaderServiceReady(ExternalMaster* master) {
  const int master_idx = GetIndexOfMaster(master);
  if (master_idx == -1) {
    return STATUS(InvalidArgument, Substitute(
        "Given master '$0' not in the current list of $1 masters.",
        master->bound_rpc_hostport().ToString(), masters_.size()));
  }

  IsMasterLeaderReadyRequestPB request;
  IsMasterLeaderReadyResponsePB response;
  auto cluster_proxy = GetMasterProxy<master::MasterClusterProxy>(master_idx);
  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);
  RETURN_NOT_OK(cluster_proxy.IsMasterLeaderServiceReady(request, &response, &controller));

  if (!response.has_error()) {
    return Status::OK();
  }
  return STATUS(RuntimeError, Substitute(
      "Is master ready RPC response hit error: $0", response.error().ShortDebugString()));
}
1097
1098
42
// Fetches the last committed op id of the sys catalog tablet from the current leader master and
// stores it in *opid.
//
// Returns IllegalState when no leader master can be determined, or the error reported by
// itest::GetLastOpIdForMasterReplica. The RPC uses opts_.timeout.
Status ExternalMiniCluster::GetLastOpIdForLeader(OpIdPB* opid) {
  ExternalMaster* leader = GetLeaderMaster();
  // GetLeaderMaster() can come back empty-handed (AddTabletServer CHECKs for exactly that), so
  // report the condition instead of dereferencing a null pointer.
  if (leader == nullptr) {
    return STATUS(IllegalState, "No leader master available to fetch the last op id from");
  }

  auto leader_master_sock = leader->bound_rpc_addr();
  std::shared_ptr<ConsensusServiceProxy> leader_proxy =
    std::make_shared<ConsensusServiceProxy>(proxy_cache_.get(), leader_master_sock);

  RETURN_NOT_OK(itest::GetLastOpIdForMasterReplica(
      leader_proxy,
      yb::master::kSysCatalogTabletId,
      leader->uuid(),
      consensus::COMMITTED_OPID,
      opts_.timeout,
      opid));

  return Status::OK();
}
1114
1115
61
// Builds a comma-separated list of master addresses, one per configured master, from the first
// opts_.num_masters entries of opts_.master_rpc_ports.
string ExternalMiniCluster::GetMasterAddresses() const {
  string joined;
  for (size_t master_idx = 0; master_idx != opts_.num_masters; ++master_idx) {
    if (!joined.empty()) {
      joined.append(",");
    }
    joined.append(MasterAddressForPort(opts_.master_rpc_ports[master_idx]));
  }
  return joined;
}
1125
1126
0
// Builds a comma-separated list of "bind_host:rpc_port" entries, one per tablet server in
// tablet_servers_.
string ExternalMiniCluster::GetTabletServerAddresses() const {
  string joined;
  for (const auto& tserver : tablet_servers_) {
    if (!joined.empty()) {
      joined.append(",");
    }
    joined.append(HostPortToString(tserver->bind_host(), tserver->rpc_port()));
  }
  return joined;
}
1136
1137
0
// Builds a comma-separated list of "bind_host:http_port" entries, one per tablet server in
// tablet_servers_.
string ExternalMiniCluster::GetTabletServerHTTPAddresses() const {
  string joined;
  for (const auto& tserver : tablet_servers_) {
    if (!joined.empty()) {
      joined.append(",");
    }
    joined.append(HostPortToString(tserver->bind_host(), tserver->http_port()));
  }
  return joined;
}
1147
1148
440
// Launches opts_.num_masters external master processes and appends them to masters_.
//
// Any entry of opts_.master_rpc_ports that is 0 is replaced with a freshly allocated port. Every
// master receives the full comma-separated peer list plus a common set of test flags. When YSQL
// is enabled the function additionally waits for initdb to finish. A mismatch between
// num_masters and the number of configured ports is fatal.
Status ExternalMiniCluster::StartMasters() {
  const auto master_count = opts_.num_masters;

  if (opts_.master_rpc_ports.size() != master_count) {
    LOG(FATAL) << master_count << " masters requested, but " <<
        opts_.master_rpc_ports.size() << " ports specified in 'master_rpc_ports'";
  }

  // Fill in every RPC port that was left for auto-assignment.
  for (auto& rpc_port : opts_.master_rpc_ports) {
    if (rpc_port != 0) {
      continue;
    }
    rpc_port = AllocateFreePort();
    LOG(INFO) << "Using an auto-assigned port " << rpc_port
              << " to start an external mini-cluster master";
  }

  vector<string> peer_addrs;
  for (size_t master_idx = 0; master_idx < master_count; ++master_idx) {
    peer_addrs.push_back(MasterAddressForPort(opts_.master_rpc_ports[master_idx]));
  }
  const string peer_addrs_str = JoinStrings(peer_addrs, ",");

  vector<string> flags = opts_.extra_master_flags;
  // Disable WAL fsync for tests
  flags.push_back("--durable_wal_write=false");
  flags.push_back("--enable_leader_failure_detection=true");
  // Limit number of transaction table tablets to help avoid timeouts.
  const int num_transaction_table_tablets = NumTabletsPerTransactionTable(opts_);
  flags.push_back(Substitute("--transaction_table_num_tablets=$0", num_transaction_table_tablets));
  // For sanitizer builds, it is easy to overload the master, leading to quorum changes.
  // This could end up breaking even trivial DDLs like creating an initial table in the cluster.
  if (IsSanitizer()) {
    flags.push_back("--leader_failure_max_missed_heartbeat_periods=10");
  }
  flags.push_back(opts_.enable_ysql ? "--enable_ysql=true" : "--enable_ysql=false");
  if (opts_.enable_ysql) {
    flags.push_back("--master_auto_run_initdb");
  }

  const string exe = GetBinaryPath(kMasterBinaryName);

  // Start the masters.
  for (size_t master_idx = 0; master_idx < master_count; ++master_idx) {
    const uint16_t http_port = AllocateFreePort();
    scoped_refptr<ExternalMaster> new_master =
      new ExternalMaster(
        master_idx,
        messenger_,
        proxy_cache_.get(),
        exe,
        GetDataPath(Substitute("master-$0", master_idx)),
        SubstituteInFlags(flags, master_idx),
        peer_addrs[master_idx],
        http_port,
        peer_addrs_str);
    RETURN_NOT_OK_PREPEND(new_master->Start(),
                          Substitute("Unable to start Master at index $0", master_idx));
    masters_.push_back(new_master);
  }

  if (opts_.enable_ysql) {
    RETURN_NOT_OK(WaitForInitDb());
  }
  return Status::OK();
}
1214
1215
125
// Polls the masters with IsInitDbDone until one of them reports that initdb has completed.
//
// Each round visits masters 0..opts_.num_masters-1 and then sleeps 500ms. The function returns:
//   - OK as soon as a master reports done without an initdb error;
//   - RuntimeError if a master reports done with a non-empty initdb error, or if a response
//     carries an error other than NOT_THE_LEADER;
//   - the RPC status once kMaxTimeouts RPC timeouts have been seen in total;
//   - TimedOut when the overall kTimeout budget is exceeded.
Status ExternalMiniCluster::WaitForInitDb() {
  const auto start_time = std::chrono::steady_clock::now();
  const auto kTimeout = NonTsanVsTsan(1200s, 1800s);
  const int kMaxTimeouts = 10;
  int timeouts_seen = 0;

  for (;;) {
    for (size_t master_idx = 0; master_idx < opts_.num_masters; ++master_idx) {
      const auto elapsed_time = std::chrono::steady_clock::now() - start_time;
      if (elapsed_time > kTimeout) {
        return STATUS_FORMAT(
            TimedOut,
            "Timed out while waiting for initdb to complete: elapsed time is $0, timeout is $1",
            elapsed_time, kTimeout);
      }

      auto admin_proxy = GetMasterProxy<master::MasterAdminProxy>(master_idx);
      rpc::RpcController controller;
      controller.set_timeout(opts_.timeout);
      IsInitDbDoneRequestPB request;
      IsInitDbDoneResponsePB response;
      Status rpc_status = admin_proxy.IsInitDbDone(request, &response, &controller);

      if (rpc_status.IsTimedOut()) {
        ++timeouts_seen;
        LOG(WARNING) << rpc_status << " (seen " << timeouts_seen << " timeouts so far)";
        if (timeouts_seen == kMaxTimeouts) {
          LOG(ERROR) << "Reached " << kMaxTimeouts << " timeouts: " << rpc_status;
          return rpc_status;
        }
        continue;
      }

      // NOT_THE_LEADER is tolerated; any other response error is reported to the caller.
      if (response.has_error() &&
          response.error().code() != master::MasterErrorPB::NOT_THE_LEADER) {
        return STATUS(RuntimeError, Substitute(
            "IsInitDbDone RPC response hit error: $0",
            response.error().ShortDebugString()));
      }

      if (!response.done()) {
        continue;
      }

      if (response.has_initdb_error() && !response.initdb_error().empty()) {
        LOG(ERROR) << "master reported an initdb error: " << response.initdb_error();
        return STATUS(RuntimeError, "initdb failed: " + response.initdb_error());
      }
      LOG(INFO) << "master indicated that initdb is done";
      return Status::OK();
    }
    std::this_thread::sleep_for(500ms);
  }
}
1263
1264
48
// Reports whether the master considers tablet server `ts_idx` to be not alive.
//
// Issues ListTabletServers with `deadline` as the RPC timeout and looks for the entry whose
// permanent UUID equals that of tablet_server(ts_idx). Returns the negation of that entry's
// alive() flag. Fails with Uninitialized if any listed server lacks an instance id or permanent
// UUID, NotFound if the tablet server is not listed, or the failing RPC status.
Result<bool> ExternalMiniCluster::is_ts_stale(int ts_idx, MonoDelta deadline) {
  master::ListTabletServersRequestPB list_req;
  master::ListTabletServersResponsePB list_resp;

  auto cluster_proxy = GetMasterProxy<master::MasterClusterProxy>();
  auto controller = std::make_shared<rpc::RpcController>();
  controller->Reset();
  controller->set_timeout(deadline);

  RETURN_NOT_OK(cluster_proxy.ListTabletServers(list_req, &list_resp, controller.get()));

  bool ts_listed = false;
  bool ts_stale = false;
  for (int entry_idx = 0; entry_idx < list_resp.servers_size(); ++entry_idx) {
    const auto& entry = list_resp.servers(entry_idx);

    if (!entry.has_instance_id()) {
      return STATUS_SUBSTITUTE(
        Uninitialized,
        "ListTabletServers RPC returned a TS with uninitialized instance id."
      );
    }

    if (!entry.instance_id().has_permanent_uuid()) {
      return STATUS_SUBSTITUTE(
        Uninitialized,
        "ListTabletServers RPC returned a TS with uninitialized UUIDs."
      );
    }

    // No early exit: the whole list is validated, and the last matching entry wins.
    if (entry.instance_id().permanent_uuid() == tablet_server(ts_idx)->uuid()) {
      ts_listed = true;
      ts_stale = !entry.alive();
    }
  }

  if (ts_listed) {
    return ts_stale;
  }
  return STATUS_SUBSTITUTE(
      NotFound,
      "Given TS not found in ListTabletServers RPC."
  );
}
1304
1305
2
// Waits, for up to deadline * kTimeMultiplier, until is_ts_stale(ts_idx) reports false.
// An error from is_ts_stale is passed on to WaitFor as the condition result.
CHECKED_STATUS ExternalMiniCluster::WaitForMasterToMarkTSAlive(int ts_idx, MonoDelta deadline) {
  const auto ts_is_alive = [&]() -> Result<bool> {
    const bool stale = VERIFY_RESULT(is_ts_stale(ts_idx));
    return !stale;
  };
  RETURN_NOT_OK(WaitFor(ts_is_alive, deadline * kTimeMultiplier, "Is TS Alive", 1s));

  return Status::OK();
}
1312
1313
4
// Waits, for up to deadline * kTimeMultiplier, until is_ts_stale(ts_idx) reports true.
// An error from is_ts_stale is passed on to WaitFor as the condition result.
CHECKED_STATUS ExternalMiniCluster::WaitForMasterToMarkTSDead(int ts_idx, MonoDelta deadline) {
  const auto ts_is_dead = [&]() -> Result<bool> {
    return is_ts_stale(ts_idx);
  };
  RETURN_NOT_OK(WaitFor(ts_is_dead, deadline * kTimeMultiplier, "Is TS dead", 1s));

  return Status::OK();
}
1320
1321
1.29k
// Chooses the loopback address the tablet server at `index` binds to:
//   - opts_.use_even_ips: 127.0.0.<(index + 1) * 2>;
//   - opts_.bind_to_unique_loopback_addresses: 127.0.0.<index + 1> on Apple platforms, otherwise
//     127.<pid bits 8..15>.<pid bits 0..7>.<index>;
//   - neither option: 127.0.0.1.
string ExternalMiniCluster::GetBindIpForTabletServer(size_t index) const {
  if (opts_.use_even_ips) {
    return Substitute("127.0.0.$0", (index + 1) * 2);
  }
  if (!opts_.bind_to_unique_loopback_addresses) {
    return "127.0.0.1";
  }
#if defined(__APPLE__)
  return Substitute("127.0.0.$0", index + 1); // Use default 127.0.0.x IPs.
#else
  const pid_t p = getpid();
  return Substitute("127.$0.$1.$2", (p >> 8) & 0xff, p & 0xff, index);
#endif
}
1335
1336
// Creates and starts one more external tablet server and appends it to tablet_servers_.
//
// The new server is told the bound RPC host/port of every master. Its ports are copied from the
// first tablet server when this is not the first one and both opts_.use_same_ts_ports and
// opts_.bind_to_unique_loopback_addresses are set; otherwise eight fresh ports are allocated.
// Flags are opts_.extra_tserver_flags, then the enable_ysql flag, then `extra_flags`. A negative
// `num_drives` selects opts_.num_drives. A leader master must be available (CHECKed). Returns the
// status of starting the server; on failure the server is not added to tablet_servers_.
Status ExternalMiniCluster::AddTabletServer(
    bool start_cql_proxy, const std::vector<std::string>& extra_flags, int num_drives) {
  CHECK(GetLeaderMaster() != nullptr)
      << "Must have started at least 1 master before adding tablet servers";

  const size_t ts_idx = tablet_servers_.size();

  const string exe = GetBinaryPath(kTabletServerBinaryName);
  vector<HostPort> master_hostports;
  for (size_t master_idx = 0; master_idx < num_masters(); ++master_idx) {
    master_hostports.push_back(DCHECK_NOTNULL(master(master_idx))->bound_rpc_hostport());
  }

  uint16_t ts_rpc_port = 0;
  uint16_t ts_http_port = 0;
  uint16_t redis_rpc_port = 0;
  uint16_t redis_http_port = 0;
  uint16_t cql_rpc_port = 0;
  uint16_t cql_http_port = 0;
  uint16_t pgsql_rpc_port = 0;
  uint16_t pgsql_http_port = 0;

  const bool reuse_first_ts_ports =
      ts_idx > 0 && opts_.use_same_ts_ports && opts_.bind_to_unique_loopback_addresses;
  if (reuse_first_ts_ports) {
    ExternalTabletServer* first_ts = tablet_servers_[0].get();
    ts_rpc_port = first_ts->rpc_port();
    ts_http_port = first_ts->http_port();
    redis_rpc_port = first_ts->redis_rpc_port();
    redis_http_port = first_ts->redis_http_port();
    cql_rpc_port = first_ts->cql_rpc_port();
    cql_http_port = first_ts->cql_http_port();
    pgsql_rpc_port = first_ts->pgsql_rpc_port();
    pgsql_http_port = first_ts->pgsql_http_port();
  } else {
    ts_rpc_port = AllocateFreePort();
    ts_http_port = AllocateFreePort();
    redis_rpc_port = AllocateFreePort();
    redis_http_port = AllocateFreePort();
    cql_rpc_port = AllocateFreePort();
    cql_http_port = AllocateFreePort();
    pgsql_rpc_port = AllocateFreePort();
    pgsql_http_port = AllocateFreePort();
  }

  vector<string> flags = opts_.extra_tserver_flags;
  flags.push_back(opts_.enable_ysql ? "--enable_ysql=true" : "--enable_ysql=false");
  flags.insert(flags.end(), extra_flags.begin(), extra_flags.end());

  // A negative drive count means "use the cluster-wide setting".
  if (num_drives < 0) {
    num_drives = opts_.num_drives;
  }

  scoped_refptr<ExternalTabletServer> new_ts = new ExternalTabletServer(
      ts_idx, messenger_, proxy_cache_.get(), exe, GetDataPath(Substitute("ts-$0", ts_idx + 1)),
      num_drives, GetBindIpForTabletServer(ts_idx), ts_rpc_port, ts_http_port, redis_rpc_port,
      redis_http_port, cql_rpc_port, cql_http_port, pgsql_rpc_port, pgsql_http_port,
      master_hostports, SubstituteInFlags(flags, ts_idx));
  RETURN_NOT_OK(new_ts->Start(start_cql_proxy));
  tablet_servers_.push_back(new_ts);
  return Status::OK();
}
1400
1401
513
// Waits until some master lists at least `count` of this cluster's tablet servers.
//
// Each round resets the set of not-yet-matched servers to tablet_servers_ and queries every
// master with ListTabletServers, skipping masters whose RPC fails or whose response carries an
// error. A listed entry matches a local server when both permanent UUID and instance seqno are
// equal; a matched server is removed from the unmatched set, which is shared across the masters
// of that round. Rounds are separated by a 1ms sleep. After `timeout` the function returns
// TimedOut, with a distinct message when no master answered usefully in the last round.
Status ExternalMiniCluster::WaitForTabletServerCount(size_t count, const MonoDelta& timeout) {
  MonoTime deadline = MonoTime::Now();
  deadline.AddDelta(timeout);

  std::vector<scoped_refptr<ExternalTabletServer>> pending_servers = tablet_servers_;
  bool leader_seen = false;

  for (;;) {
    MonoDelta time_left = deadline - MonoTime::Now();
    if (time_left.ToSeconds() < 0) {
      std::vector<std::string> pending_uuids;
      pending_uuids.reserve(pending_servers.size());
      for (const auto& pending : pending_servers) {
        pending_uuids.push_back(pending->instance_id().permanent_uuid());
      }
      if (leader_seen) {
        return STATUS_FORMAT(TimedOut, "$0 TS(s) never registered with master (not registered $1)",
                             count, pending_uuids);
      }
      return STATUS(TimedOut, "Does not have active master leader to check tserver registration");
    }

    // We should give some time for RPC to proceed, otherwise all requests would fail.
    time_left = std::max<MonoDelta>(time_left, 250ms);

    pending_servers = tablet_servers_;
    leader_seen = false;
    for (size_t master_idx = 0; master_idx < masters_.size(); ++master_idx) {
      master::ListTabletServersRequestPB list_req;
      master::ListTabletServersResponsePB list_resp;
      rpc::RpcController controller;
      controller.set_timeout(time_left);
      auto status = GetMasterProxy<master::MasterClusterProxy>(master_idx).ListTabletServers(
          list_req, &list_resp, &controller);
      LOG_IF(WARNING, !status.ok()) << "ListTabletServers failed: " << status;
      const bool usable_response = status.ok() && !list_resp.has_error();
      if (!usable_response) {
        continue;
      }
      leader_seen = true;

      // ListTabletServers() may return servers that are no longer online.
      // Do a second step of verification to verify that the descs that we got
      // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
      size_t matched = 0;
      for (const master::ListTabletServersResponsePB_Entry& listed : list_resp.servers()) {
        auto candidate = pending_servers.begin();
        while (candidate != pending_servers.end()) {
          if ((**candidate).instance_id().permanent_uuid() ==
                  listed.instance_id().permanent_uuid() &&
              (**candidate).instance_id().instance_seqno() ==
                  listed.instance_id().instance_seqno()) {
            ++matched;
            pending_servers.erase(candidate);
            break;
          }
          ++candidate;
        }
      }
      if (matched >= count) {
        LOG(INFO) << count << " TS(s) registered with Master";
        return Status::OK();
      }
    }
    SleepFor(MonoDelta::FromMilliseconds(1));
  }
}
1462
1463
89
void ExternalMiniCluster::AssertNoCrashes() {
1464
89
  vector<ExternalDaemon*> daemons = this->daemons();
1465
329
  for (ExternalDaemon* d : daemons) {
1466
329
    if (d->IsShutdown()) 
continue0
;
1467
658
    EXPECT_TRUE(d->IsProcessAlive()) << "At least one process crashed. viz: "
1468
658
                                     << d->id();
1469
329
  }
1470
89
}
1471
1472
// Issues a ListTabletsForTabletServer RPC to 'ts' and returns a copy of every entry in the
// response. An RPC failure is returned as the error status.
Result<std::vector<ListTabletsForTabletServerResponsePB::Entry>> ExternalMiniCluster::GetTablets(
    ExternalTabletServer* ts) {
  using Entry = ListTabletsForTabletServerResponsePB::Entry;

  TabletServerServiceProxy ts_proxy(proxy_cache_.get(), ts->bound_rpc_addr());
  ListTabletsForTabletServerRequestPB request;
  ListTabletsForTabletServerResponsePB response;

  rpc::RpcController controller;
  controller.set_timeout(10s * kTimeMultiplier);
  RETURN_NOT_OK(ts_proxy.ListTabletsForTabletServer(request, &response, &controller));

  // Copy the repeated field into a plain vector for the caller.
  const auto& entries = response.entries();
  std::vector<Entry> tablets(entries.begin(), entries.end());
  return tablets;
}
1489
1490
// Asks each tablet server in index order for the split key of 'tablet_id' and returns the first
// response that carries no error. An RPC-level failure is returned immediately; if every
// response carries an error, IllegalState is returned.
Result<tserver::GetSplitKeyResponsePB> ExternalMiniCluster::GetSplitKey(
    const std::string& tablet_id) {
  for (size_t ts_idx = 0; ts_idx < this->num_tablet_servers(); ts_idx++) {
    tserver::TabletServerServiceProxy proxy(
        proxy_cache_.get(), this->tablet_server(ts_idx)->bound_rpc_addr());

    tserver::GetSplitKeyRequestPB request;
    request.set_tablet_id(tablet_id);
    tserver::GetSplitKeyResponsePB response;

    rpc::RpcController rpc;
    rpc.set_timeout(10s * kTimeMultiplier);
    RETURN_NOT_OK(proxy.GetSplitKey(request, &response, &rpc));

    if (response.has_error()) {
      // This tablet server could not answer; move on to the next one.
      continue;
    }
    return response;
  }
  return STATUS(IllegalState, "GetSplitKey failed on all TServers");
}
1508
1509
// Sends a FlushTablets request for 'tablet_ids' to the admin service of 'ts'. The operation is
// COMPACT when 'is_compaction' is true and FLUSH otherwise.
//
// Returns the RPC status if the call itself failed. If the call succeeded but the response
// carries an error, that error is converted to a Status and returned, so callers are no longer
// told OK when the tablet server rejected the request.
Status ExternalMiniCluster::FlushTabletsOnSingleTServer(
    ExternalTabletServer* ts, const std::vector<yb::TabletId> tablet_ids,
    bool is_compaction) {
  tserver::FlushTabletsRequestPB req;
  tserver::FlushTabletsResponsePB resp;
  rpc::RpcController controller;
  controller.set_timeout(10s * kTimeMultiplier);

  req.set_dest_uuid(ts->uuid());
  req.set_operation(is_compaction ? tserver::FlushTabletsRequestPB::COMPACT
                                  : tserver::FlushTabletsRequestPB::FLUSH);
  for (const auto& tablet_id : tablet_ids) {
    req.add_tablet_ids(tablet_id);
  }

  auto ts_admin_service_proxy = std::make_unique<tserver::TabletServerAdminServiceProxy>(
    proxy_cache_.get(), ts->bound_rpc_addr());
  RETURN_NOT_OK(ts_admin_service_proxy->FlushTablets(req, &resp, &controller));
  // A successful RPC can still report an application-level failure in the response.
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  return Status::OK();
}
1528
1529
3
// Issues a ListTablets RPC to 'ts' using the cluster-wide timeout and returns the raw response.
// Only RPC-level failures are reported as an error; the response is returned unexamined.
Result<tserver::ListTabletsResponsePB> ExternalMiniCluster::ListTablets(ExternalTabletServer* ts) {
  ListTabletsRequestPB request;
  ListTabletsResponsePB response;

  rpc::RpcController controller;
  controller.set_timeout(opts_.timeout);

  TabletServerServiceProxy ts_proxy(proxy_cache_.get(), ts->bound_rpc_addr());
  RETURN_NOT_OK(ts_proxy.ListTablets(request, &response, &controller));
  return response;
}
1538
1539
9
// Returns the tablet id of every entry reported by GetTablets(ts), in the same order.
Result<std::vector<std::string>> ExternalMiniCluster::GetTabletIds(ExternalTabletServer* ts) {
  auto tablet_entries = VERIFY_RESULT(GetTablets(ts));
  std::vector<std::string> ids;
  for (auto it = tablet_entries.begin(); it != tablet_entries.end(); ++it) {
    ids.push_back(it->tablet_id());
  }
  return ids;
}
1547
1548
// Polls 'ts' with ListTablets until every reported tablet is in the RUNNING state or 'timeout'
// elapses. RPC failures and response errors abort the wait immediately. On timeout the last
// response is included in the returned TimedOut status.
Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts,
                                                  const MonoDelta& timeout) {
  TabletServerServiceProxy ts_proxy(proxy_cache_.get(), ts->bound_rpc_addr());
  ListTabletsRequestPB request;
  ListTabletsResponsePB response;

  MonoTime deadline = MonoTime::Now();
  deadline.AddDelta(timeout);
  while (MonoTime::Now().ComesBefore(deadline)) {
    rpc::RpcController controller;
    controller.set_timeout(MonoDelta::FromSeconds(10));
    RETURN_NOT_OK(ts_proxy.ListTablets(request, &response, &controller));
    if (response.has_error()) {
      return StatusFromPB(response.error().status());
    }

    // Look for any tablet that has not reached RUNNING yet.
    bool all_running = true;
    for (const StatusAndSchemaPB& entry : response.status_and_schema()) {
      if (entry.tablet_status().state() != tablet::RUNNING) {
        all_running = false;
        break;
      }
    }
    if (all_running) {
      return Status::OK();
    }

    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  return STATUS(TimedOut, response.DebugString());
}
1580
1581
0
// Index-based convenience overload: resolves the tablet server at 'index' and delegates to the
// pointer-based WaitForTSToCrash.
Status ExternalMiniCluster::WaitForTSToCrash(size_t index, const MonoDelta& timeout) {
  return WaitForTSToCrash(tablet_server(index), timeout);
}
1585
1586
// Polls every 10 ms until the process of 'ts' is no longer alive. Returns OK as soon as that is
// observed, or TimedOut if 'timeout' elapses first. The deadline is checked before the liveness
// probe, so an already-expired timeout yields TimedOut without probing.
Status ExternalMiniCluster::WaitForTSToCrash(const ExternalTabletServer* ts,
                                             const MonoDelta& timeout) {
  MonoTime deadline = MonoTime::Now();
  deadline.AddDelta(timeout);
  for (;;) {
    if (!MonoTime::Now().ComesBefore(deadline)) {
      break;
    }
    if (!ts->IsProcessAlive()) {
      return Status::OK();
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  return STATUS(TimedOut, Substitute("TS $0 did not crash!", ts->instance_id().permanent_uuid()));
}
1598
1599
namespace {
1600
void LeaderMasterCallback(HostPort* dst_hostport,
1601
                          Synchronizer* sync,
1602
                          const Status& status,
1603
2.93k
                          const HostPort& result) {
1604
2.93k
  if (status.ok()) {
1605
2.92k
    *dst_hostport = result;
1606
2.92k
  }
1607
2.93k
  sync->StatusCB(status);
1608
2.93k
}
1609
}  // anonymous namespace
1610
1611
5
// Returns the index of the first master that is not the current leader.
Result<size_t> ExternalMiniCluster::GetFirstNonLeaderMasterIndex() {
  constexpr bool kWantLeader = false;
  return GetPeerMasterIndex(kWantLeader);
}
1614
1615
3.02k
// Returns the index of the current leader master.
Result<size_t> ExternalMiniCluster::GetLeaderMasterIndex() {
  constexpr bool kWantLeader = true;
  return GetPeerMasterIndex(kWantLeader);
}
1618
1619
3.03k
// Looks up the leader master among the live masters (5 second deadline) and returns the index
// in masters_ of the first master whose leader-ness equals 'is_leader'. Masters are matched
// against the leader by RPC port only.
//
// Returns IllegalState if no master process is alive, or the lookup error if the leader could
// not be determined. Not finding a matching master after a successful lookup is fatal.
Result<size_t> ExternalMiniCluster::GetPeerMasterIndex(bool is_leader) {
  Synchronizer sync;
  HostPort leader_hp;
  auto deadline = CoarseMonoClock::Now() + 5s;

  // Only masters whose process is alive are worth asking.
  server::MasterAddresses live_master_addrs;
  for (const scoped_refptr<ExternalMaster>& master : masters_) {
    if (!master->IsProcessAlive()) {
      continue;
    }
    live_master_addrs.push_back({ master->bound_rpc_addr() });
  }
  if (live_master_addrs.empty()) {
    return STATUS(IllegalState, "No running masters");
  }

  // 'rpcs' is declared after 'sync' and 'leader_hp' so that it is torn down before the objects
  // the callback writes to.
  rpc::Rpcs rpcs;
  auto leader_rpc = std::make_shared<GetLeaderMasterRpc>(
      Bind(&LeaderMasterCallback, &leader_hp, &sync),
      live_master_addrs,
      deadline,
      messenger_,
      proxy_cache_.get(),
      &rpcs);
  leader_rpc->SendRpc();
  RETURN_NOT_OK(sync.Wait());
  rpcs.Shutdown();

  const char* peer_type = is_leader ? "leader" : "non-leader";
  for (size_t idx = 0; idx < masters_.size(); idx++) {
    const bool is_leader_at_idx =
        masters_[idx]->bound_rpc_hostport().port() == leader_hp.port();
    if (is_leader_at_idx != is_leader) {
      continue;
    }
    LOG(INFO) << "Found peer " << peer_type << " at index " << idx << ".";
    return idx;
  }

  // There is never a situation where this should happen, so it's
  // better to exit with a FATAL log message right away vs. return a
  // Status::IllegalState().
  auto status = STATUS_FORMAT(NotFound, "Peer $0 master is not in masters_ list", peer_type);
  LOG(FATAL) << status;
  return status;
}
1661
1662
2.89k
// Returns the leader master, retrying the lookup up to kMaxRetryIterations times with a
// linearly growing sleep between attempts (the lookup can fail transiently, e.g. while an
// election is in progress). If every attempt fails, falls back to the master at index 0.
ExternalMaster* ExternalMiniCluster::GetLeaderMaster() {
  for (int attempt = 1;; ++attempt) {
    auto leader_idx = GetLeaderMasterIndex();
    if (leader_idx.ok()) {
      return master(*leader_idx);
    }
    LOG(INFO) << "GetLeaderMasterIndex@" << attempt << " hit error: " << leader_idx.status();
    if (attempt >= kMaxRetryIterations) {
      LOG(WARNING) << "Failed to get leader master after " << attempt << " attempts, "
                   << "returning the first master.";
      return master(0);
    }
    SleepFor(MonoDelta::FromMilliseconds(attempt * 10));
  }
}
1682
1683
4
// Scans the live tablet servers in index order and returns the index of the first one that
// reports itself as leader of 'tablet_id'. A failure to list tablets on any live server is
// returned immediately; NotFound is returned when no live server claims leadership.
Result<size_t> ExternalMiniCluster::GetTabletLeaderIndex(const std::string& tablet_id) {
  for (size_t ts_idx = 0; ts_idx < num_tablet_servers(); ++ts_idx) {
    auto* const ts = tablet_server(ts_idx);
    if (!ts->IsProcessAlive()) {
      continue;
    }
    auto entries = VERIFY_RESULT(GetTablets(ts));
    for (const auto& entry : entries) {
      if (entry.tablet_id() != tablet_id) {
        continue;
      }
      if (entry.is_leader()) {
        return ts_idx;
      }
    }
  }
  return STATUS(
      NotFound, Format("Could not find leader of tablet $0 among live tservers.", tablet_id));
}
1698
1699
96
// Returns the tablet server whose permanent uuid equals 'uuid', or nullptr if there is none.
ExternalTabletServer* ExternalMiniCluster::tablet_server_by_uuid(const std::string& uuid) const {
  ExternalTabletServer* found = nullptr;
  for (size_t i = 0; i < tablet_servers_.size() && found == nullptr; ++i) {
    if (tablet_servers_[i]->instance_id().permanent_uuid() == uuid) {
      found = tablet_servers_[i].get();
    }
  }
  return found;
}
1707
1708
6
int ExternalMiniCluster::tablet_server_index_by_uuid(const std::string& uuid) const {
1709
12
  for (size_t i = 0; i < tablet_servers_.size(); 
i++6
) {
1710
12
    if (tablet_servers_[i]->uuid() == uuid) {
1711
6
      return narrow_cast<int>(i);
1712
6
    }
1713
12
  }
1714
0
  return -1;
1715
6
}
1716
1717
282
// Returns non-owning pointers to all masters, in masters_ order.
vector<ExternalMaster*> ExternalMiniCluster::master_daemons() const {
  vector<ExternalMaster*> results;
  // The final size is known up front; reserve once, as tserver_daemons() does.
  results.reserve(masters_.size());
  for (const scoped_refptr<ExternalMaster>& master : masters_) {
    results.push_back(master.get());
  }
  return results;
}
1724
1725
89
// Returns non-owning pointers to every daemon in the cluster: all tablet servers first, then
// all masters.
vector<ExternalDaemon*> ExternalMiniCluster::daemons() const {
  vector<ExternalDaemon*> results;
  // The final size is known up front; reserve once, as tserver_daemons() does.
  results.reserve(tablet_servers_.size() + masters_.size());
  for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) {
    results.push_back(ts.get());
  }
  for (const scoped_refptr<ExternalMaster>& master : masters_) {
    results.push_back(master.get());
  }
  return results;
}
1735
1736
93
std::vector<ExternalTabletServer*> ExternalMiniCluster::tserver_daemons() const {
1737
93
  std::vector<ExternalTabletServer*> result;
1738
93
  result.reserve(tablet_servers_.size());
1739
291
  for (const auto& ts : tablet_servers_) {
1740
291
    result.push_back(ts.get());
1741
291
  }
1742
93
  return result;
1743
93
}
1744
1745
0
// Returns the bind host and PostgreSQL RPC port of the tablet server at 'node_index'.
//
// CHECK-fails on a negative or out-of-range index instead of indexing tablet_servers_ out of
// bounds; the upper bound is enforced by tablet_server().
HostPort ExternalMiniCluster::pgsql_hostport(int node_index) const {
  CHECK_GE(node_index, 0);
  ExternalTabletServer* const ts = tablet_server(static_cast<size_t>(node_index));
  return HostPort(ts->bind_host(), ts->pgsql_rpc_port());
}
1749
1750
420k
// Accessor for the messenger pointer held by this cluster (non-owning from the caller's view).
rpc::Messenger* ExternalMiniCluster::messenger() {
  rpc::Messenger* const cluster_messenger = messenger_;
  return cluster_messenger;
}
1753
1754
std::shared_ptr<server::GenericServiceProxy> ExternalMiniCluster::master_generic_proxy(
1755
1
    int idx) const {
1756
1
  CHECK_GE(idx, 0);
1757
1
  CHECK_LT(idx, masters_.size());
1758
1
  return std::make_shared<server::GenericServiceProxy>(
1759
1
    proxy_cache_.get(), CHECK_NOTNULL(master(idx))->bound_rpc_addr());
1760
1
}
1761
1762
std::shared_ptr<server::GenericServiceProxy> ExternalMiniCluster::master_generic_proxy(
1763
0
    const HostPort& bound_rpc_addr) const {
1764
0
  return std::make_shared<server::GenericServiceProxy>(proxy_cache_.get(), bound_rpc_addr);
1765
0
}
1766
1767
287
// Replaces the master address list of 'builder' with the bound RPC host:port of every master in
// this cluster. CHECK-fails if 'builder' is null or the cluster has no masters.
void ExternalMiniCluster::ConfigureClientBuilder(client::YBClientBuilder* builder) {
  CHECK_NOTNULL(builder);
  CHECK(!masters_.empty());
  builder->clear_master_server_addrs();
  for (size_t i = 0; i < masters_.size(); ++i) {
    const auto master_addr = masters_[i]->bound_rpc_hostport().ToString();
    builder->add_master_server_addr(master_addr);
  }
}
1775
1776
1.18k
// Returns the bound RPC address of whichever master GetLeaderMaster() resolves to.
Result<HostPort> ExternalMiniCluster::DoGetLeaderMasterBoundRpcAddr() {
  ExternalMaster* const leader = GetLeaderMaster();
  return leader->bound_rpc_addr();
}
1779
1780
// Force-sets gflag 'flag' to 'value' on 'daemon' via the generic service SetFlag RPC (30 second
// timeout). Returns the RPC error prefixed with "rpc failed", or RemoteError when the daemon
// answers with anything other than SUCCESS.
Status ExternalMiniCluster::SetFlag(ExternalDaemon* daemon,
                                    const string& flag,
                                    const string& value) {
  server::GenericServiceProxy generic_proxy(proxy_cache_.get(), daemon->bound_rpc_addr());

  server::SetFlagRequestPB request;
  request.set_flag(flag);
  request.set_value(value);
  request.set_force(true);
  server::SetFlagResponsePB response;

  rpc::RpcController rpc;
  rpc.set_timeout(MonoDelta::FromSeconds(30));
  RETURN_NOT_OK_PREPEND(generic_proxy.SetFlag(request, &response, &rpc),
                        "rpc failed");

  if (response.result() == server::SetFlagResponsePB::SUCCESS) {
    return Status::OK();
  }
  return STATUS(RemoteError, "failed to set flag",
                             response.ShortDebugString());
}
1800
1801
12
// Applies SetFlag(flag, value) to each master in order, stopping at the first failure.
Status ExternalMiniCluster::SetFlagOnMasters(const string& flag, const string& value) {
  for (size_t i = 0; i < masters_.size(); ++i) {
    RETURN_NOT_OK(SetFlag(masters_[i].get(), flag, value));
  }
  return Status::OK();
}
1807
1808
4
// Applies SetFlag(flag, value) to each tablet server in order, stopping at the first failure.
Status ExternalMiniCluster::SetFlagOnTServers(const string& flag, const string& value) {
  for (size_t i = 0; i < tablet_servers_.size(); ++i) {
    RETURN_NOT_OK(SetFlag(tablet_servers_[i].get(), flag, value));
  }
  return Status::OK();
}
1814
1815
1816
10.5k
uint16_t ExternalMiniCluster::AllocateFreePort() {
1817
  // This will take a file lock ensuring the port does not get claimed by another thread/process
1818
  // and add it to our vector of such locks that will be freed on minicluster shutdown.
1819
10.5k
  free_port_file_locks_.emplace_back();
1820
10.5k
  return GetFreePort(&free_port_file_locks_.back());
1821
10.5k
}
1822
1823
0
// Asks 'master' to run a leader election for the sys catalog tablet. Returns the RPC error if
// the call fails, or the response error (prefixed with its error code name) if the master
// rejects the request.
Status ExternalMiniCluster::StartElection(ExternalMaster* master) {
  ConsensusServiceProxy consensus_proxy(proxy_cache_.get(), master->bound_rpc_addr());

  RunLeaderElectionRequestPB request;
  request.set_dest_uuid(master->uuid());
  request.set_tablet_id(yb::master::kSysCatalogTabletId);
  RunLeaderElectionResponsePB response;

  RpcController controller;
  controller.set_timeout(opts_.timeout);
  RETURN_NOT_OK(consensus_proxy.RunLeaderElection(request, &response, &controller));

  if (!response.has_error()) {
    return Status::OK();
  }
  const auto& error = response.error();
  return StatusFromPB(error.status())
             .CloneAndPrepend(Substitute("Code $0",
                                         TabletServerErrorPB::Code_Name(error.code())));
}
1841
1842
57
// Single-master accessor: returns nullptr when there are no masters, and CHECK-fails when there
// is more than one (use GetLeaderMaster() in that case).
ExternalMaster* ExternalMiniCluster::master() const {
  if (!masters_.empty()) {
    CHECK_EQ(masters_.size(), 1)
        << "master() should not be used with multiple masters, use GetLeaderMaster() instead.";
    return master(0);
  }
  return nullptr;
}
1850
1851
// Return master at 'idx' or NULL if the master at 'idx' has not been started.
1852
61.9k
ExternalMaster* ExternalMiniCluster::master(size_t idx) const {
1853
61.9k
  CHECK_LT(idx, masters_.size());
1854
61.9k
  return masters_[idx].get();
1855
61.9k
}
1856
1857
3.88k
// Returns the pointer stored in tablet_servers_ at 'idx'. CHECK-fails when 'idx' is out of
// range.
ExternalTabletServer* ExternalMiniCluster::tablet_server(size_t idx) const {
  CHECK_LT(idx, tablet_servers_.size());
  const auto& ts_ref = tablet_servers_[idx];
  return ts_ref.get();
}
1861
1862
//------------------------------------------------------------
1863
// ExternalDaemon
1864
//------------------------------------------------------------
1865
1866
namespace {
1867
1868
// Global state to manage all log tailer threads. This state is managed using Singleton from gutil
1869
// and is never deallocated.
1870
struct GlobalLogTailerState {
1871
  mutex logging_mutex;
1872
  atomic<int> next_log_tailer_id{0};
1873
1874
  // We need some references to these heap-allocated atomic booleans so that ASAN would not consider
1875
  // them memory leaks.
1876
  mutex id_to_stopped_flag_mutex;
1877
  map<int, atomic<bool>*> id_to_stopped_flag;
1878
1879
  // This is used to limit the total amount of logs produced by external daemons over the lifetime
1880
  // of a test program. Guarded by logging_mutex.
1881
  size_t total_bytes_logged = 0;
1882
};
1883
1884
}  // anonymous namespace
1885
1886
// Copies the output of a child process, line by line, from a pipe file descriptor to an output
// stream, prefixing each line with 'line_prefix'. An optional StringListener is handed every
// line as well.
//
// The reading thread is detached and may stay blocked in fgets() after this object is
// destroyed, so it coordinates with the destructor through a heap-allocated "stopped" flag that
// is never deallocated.
class ExternalDaemon::LogTailerThread {
 public:
  // Starts tailing 'child_fd' into '*out'. The constructor takes over 'child_fd': it is wrapped
  // in a FILE* and closed by the tailer thread when reading ends.
  LogTailerThread(const string line_prefix,
                  const int child_fd,
                  ostream* const out)
      : id_(global_state()->next_log_tailer_id.fetch_add(1)),
        stopped_(CreateStoppedFlagForId(id_)),
        thread_desc_(Substitute("log tailer thread for prefix $0", line_prefix)),
        thread_([=] {
          VLOG(1) << "Starting " << thread_desc_;
          FILE* const fp = fdopen(child_fd, "rb");
          if (fp == nullptr) {
            // feof()/fgets() on a null stream is undefined behavior; give up on this stream.
            LOG(ERROR) << "fdopen failed for " << thread_desc_;
            return;
          }
          char buf[65536];
          const atomic<bool>* stopped;

          {
            lock_guard<mutex> l(state_lock_);
            stopped = stopped_;
          }

          // Instead of doing a nonblocking read, we detach this thread and allow it to block
          // indefinitely trying to read from a child process's stream where nothing is happening.
          // This is probably OK as long as we are careful to avoid accessing any state that might
          // have been already destructed (e.g. logging, cout/cerr, member fields of this class,
          // etc.) in case we do get unblocked. Instead, we keep a local pointer to the atomic
          // "stopped" flag, and that allows us to safely check if it is OK to print log messages.
          // The "stopped" flag itself is never deallocated.
          bool is_eof = false;
          bool is_fgets_null = false;
          auto& logging_mutex = global_state()->logging_mutex;
          auto& total_bytes_logged = global_state()->total_bytes_logged;
          while (!(is_eof = feof(fp)) &&
                 !(is_fgets_null = (fgets(buf, sizeof(buf), fp) == nullptr)) &&
                 !stopped->load()) {
            const size_t l = strlen(buf);
            const bool has_newline = l > 0 && buf[l - 1] == '\n';
            const char* maybe_end_of_line = has_newline ? "" : "\n";
            // Synchronize tailing output from all external daemons for simplicity.
            lock_guard<mutex> lock(logging_mutex);
            if (stopped->load()) break;
            // Make sure we always output an end-of-line character.
            *out << line_prefix << " " << buf << maybe_end_of_line;
            if (!stopped->load()) {
              auto listener = listener_.load(std::memory_order_acquire);
              if (!stopped->load() && listener) {
                // Hand the listener the line without its trailing newline. The previous code
                // tested the (never-null) maybe_end_of_line pointer, so the newline was never
                // stripped.
                listener->Handle(GStringPiece(buf, has_newline ? l - 1 : l));
              }
            }
            total_bytes_logged += l + strlen(maybe_end_of_line);
            // Abort the test if it produces too much log spew.
            CHECK_LE(total_bytes_logged, FLAGS_external_mini_cluster_max_log_bytes);
          }
          fclose(fp);
          if (!stopped->load()) {
            // It might not be safe to log anything if we have already stopped.
            VLOG(1) << "Exiting " << thread_desc_
                    << ": is_eof=" << is_eof
                    << ", is_fgets_null=" << is_fgets_null
                    << ", stopped=0";
          }
        }) {
    thread_.detach();
  }

  // Installs 'listener' as the receiver of every subsequently tailed line.
  void SetListener(StringListener* listener) {
    listener_ = listener;
  }

  // Clears the listener, but only if 'listener' is the one currently installed.
  void RemoveListener(StringListener* listener) {
    listener_.compare_exchange_strong(listener, nullptr);
  }

  // Marks the tailer as stopped and drops the listener. The detached thread is not joined.
  ~LogTailerThread() {
    VLOG(1) << "Stopping " << thread_desc_;
    lock_guard<mutex> l(state_lock_);
    stopped_->store(true);
    listener_ = nullptr;
  }

 private:
  static GlobalLogTailerState* global_state() {
    return Singleton<GlobalLogTailerState>::get();
  }

  static atomic<bool>* CreateStoppedFlagForId(int id) {
    lock_guard<mutex> lock(global_state()->id_to_stopped_flag_mutex);
    // This is never deallocated, but we add this pointer to the id_to_stopped_flag map referenced
    // from the global state singleton, and that apparently makes ASAN no longer consider this to be
    // a memory leak. We don't need to check if the id already exists in the map, because this
    // function is never invoked with a particular id more than once.
    auto* const stopped = new atomic<bool>();
    stopped->store(false);
    global_state()->id_to_stopped_flag[id] = stopped;
    return stopped;
  }

  const int id_;

  // This lock protects the stopped_ pointer in case of a race between tailer thread's
  // initialization (i.e. before it gets into its loop) and the destructor.
  mutex state_lock_;

  atomic<bool>* const stopped_;
  const string thread_desc_;  // A human-readable description of this thread.

  // Declared before thread_ on purpose: members are initialized in declaration order, and the
  // tailer thread reads listener_ as soon as it has a line to deliver.
  std::atomic<StringListener*> listener_{nullptr};
  thread thread_;
};
1991
1992
// Stores the daemon's identity, RPC plumbing, executable path, directories and extra flags. No
// process is started here.
ExternalDaemon::ExternalDaemon(
    std::string daemon_id,
    rpc::Messenger* messenger,
    rpc::ProxyCache* proxy_cache,
    const string& exe,
    const string& root_dir,
    const std::vector<std::string>& data_dirs,
    const vector<string>& extra_flags)
    : daemon_id_(daemon_id),
      messenger_(messenger),
      proxy_cache_(proxy_cache),
      exe_(exe),
      root_dir_(root_dir),
      data_dirs_(data_dirs),
      extra_flags_(extra_flags) {
}
2007
2008
1.50k
// Nothing to do explicitly; members clean up after themselves.
ExternalDaemon::~ExternalDaemon() {}
2010
2011
517k
bool ExternalDaemon::ServerInfoPathsExist() {
2012
517k
  return Env::Default()->FileExists(GetServerInfoPath());
2013
517k
}
2014
2015
2.01k
// Loads this daemon's status_ from its own info file.
Status ExternalDaemon::BuildServerStateFromInfoPath() {
  const string info_path = GetServerInfoPath();
  return BuildServerStateFromInfoPath(info_path, &status_);
}
2018
2019
// Replaces '*server_status' with a fresh ServerStatusPB and fills it from the protobuf file at
// 'info_path'. The pointer is reset even when reading fails.
Status ExternalDaemon::BuildServerStateFromInfoPath(
    const string& info_path, std::unique_ptr<ServerStatusPB>* server_status) {
  *server_status = std::make_unique<ServerStatusPB>();
  ServerStatusPB* const status_pb = server_status->get();
  RETURN_NOT_OK_PREPEND(pb_util::ReadPBFromPath(Env::Default(), info_path, status_pb),
                        "Failed to read info file from " + info_path);
  return Status::OK();
}
2026
2027
682k
// Path of the info file ("info.pb") inside this daemon's root directory.
string ExternalDaemon::GetServerInfoPath() {
  string info_path = JoinPathSegments(root_dir_, "info.pb");
  return info_path;
}
2030
2031
2.02k
// Deletes the server info file and returns the result of the deletion.
Status ExternalDaemon::DeleteServerInfoPaths() {
  const string info_path = GetServerInfoPath();
  return Env::Default()->DeleteFile(info_path);
}
2034
2035
2.02k
// Launches the daemon executable as a subprocess and waits for it to write its info file.
//
// The command line is assembled in this order: argv[0], 'user_flags', framework defaults,
// flags mirrored from the test driver, extra_flags_, and finally flags from the
// YB_EXTRA_DAEMON_FLAGS environment variable. The child's stdout and stderr are piped into log
// tailer threads.
//
// Returns:
//   - the start error if the subprocess could not be started;
//   - RuntimeError if the process exits before writing its info file;
//   - TimedOut (after SIGKILLing the process) if the info file does not appear within
//     kProcessStartTimeoutSeconds;
//   - the read error if the info file cannot be parsed.
// On success the subprocess is stored in process_.
Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
  CHECK(!process_);

  vector<string> argv;
  // First the exe for argv[0]
  argv.push_back(BaseName(exe_));

  // Then all the flags coming from the minicluster framework.
  argv.insert(argv.end(), user_flags.begin(), user_flags.end());

  // Disable callhome.
  argv.push_back("--callhome_enabled=false");

  // Disabled due to #4507.
  // TODO: Enable metrics logging after #4507 is fixed.
  //
  // Even though we set -logtostderr down below, metrics logs end up being written
  // based on -log_dir. So, we have to set that too.
  argv.push_back("--metrics_log_interval_ms=0");

  // Force set log_dir to empty value, process will chose default destination inside fs_data_dir
  // In other case log_dir value will be extracted from TEST_TMPDIR env variable but it is
  // inherited from test script
  argv.push_back("--log_dir=");

  // Tell the server to dump its port information so we can pick it up.
  const string info_path = GetServerInfoPath();
  argv.push_back("--server_dump_info_path=" + info_path);
  argv.push_back("--server_dump_info_format=pb");

  // We use ephemeral ports in many tests. They don't work for production, but are OK
  // in unit tests.
  argv.push_back("--rpc_server_allow_ephemeral_ports");

  // A previous instance of the daemon may have run in the same directory. So, remove
  // the previous info file if it's there.
  const Status delete_status = DeleteServerInfoPaths();
  if (!delete_status.ok() && !delete_status.IsNotFound()) {
    LOG(WARNING) << "Failed to delete info paths: " << delete_status.ToString();
  }

  // Ensure that logging goes to the test output doesn't get buffered.
  argv.push_back("--logbuflevel=-1");

  // Use the same verbose logging level in the child process as in the test driver.
  if (FLAGS_v != 0) {  // Skip this option if it has its default value (0).
    argv.push_back(Substitute("-v=$0", FLAGS_v));
  }
  if (!FLAGS_vmodule.empty()) {
    argv.push_back(Substitute("--vmodule=$0", FLAGS_vmodule));
  }
  if (FLAGS_mem_tracker_logging) {
    argv.push_back("--mem_tracker_logging");
  }
  if (FLAGS_mem_tracker_log_stack_trace) {
    argv.push_back("--mem_tracker_log_stack_trace");
  }
  if (FLAGS_use_libbacktrace) {
    argv.push_back("--use_libbacktrace");
  }

  const char* test_invocation_id = getenv("YB_TEST_INVOCATION_ID");
  if (test_invocation_id) {
    // We use --metric_node_name=... to include a unique "test invocation id" into the command
    // line so we can kill any stray processes later. --metric_node_name is normally how we pass
    // the Universe ID to the cluster. We could use any other flag that is present in yb-master
    // and yb-tserver for this.
    argv.push_back(Format("--metric_node_name=$0", test_invocation_id));
  }

  // Compute the prefix once and reuse it (the local used to be computed and then ignored in
  // favor of a second call).
  const string fatal_details_path_prefix = GetFatalDetailsPathPrefix();
  argv.push_back(Format(
      "--fatal_details_path_prefix=$0.$1", fatal_details_path_prefix, daemon_id_));

  argv.push_back(Format("--minicluster_daemon_id=$0", daemon_id_));

  // Finally, extra flags to override.
  // - extra_flags_ is taken from ExternalMiniCluster.opts_, which is often set by test subclasses'
  //   UpdateMiniClusterOptions.
  // - extra daemon flags is supplied by the user, either through environment variable or
  //   yb_build.sh --extra_daemon_flags (or --extra_daemon_args), so it should take highest
  //   precedence.
  argv.insert(argv.end(), extra_flags_.begin(), extra_flags_.end());
  AddExtraFlagsFromEnvVar("YB_EXTRA_DAEMON_FLAGS", &argv);

  auto p = std::make_unique<Subprocess>(exe_, argv);
  p->PipeParentStdout();
  p->PipeParentStderr();
  auto default_output_prefix = Substitute("[$0]", daemon_id_);
  LOG(INFO) << "Running " << default_output_prefix << ": " << exe_ << "\n"
    << JoinStrings(argv, "\n");
  if (!FLAGS_external_daemon_heap_profile_prefix.empty()) {
    p->SetEnv("HEAPPROFILE",
              FLAGS_external_daemon_heap_profile_prefix + "_" + daemon_id_);
    p->SetEnv("HEAPPROFILESIGNAL", std::to_string(kHeapProfileSignal));
  }

  // Give each daemon its own coverage profile file so they do not overwrite one another.
  const char* llvm_profile_env_var_value = getenv("LLVM_PROFILE_FILE");
  if (llvm_profile_env_var_value) {
    p->SetEnv("LLVM_PROFILE_FILE", Format("$0_$1", llvm_profile_env_var_value, daemon_id_));
  }

  RETURN_NOT_OK_PREPEND(p->Start(),
                        Substitute("Failed to start subprocess $0", exe_));

  stdout_tailer_thread_ = std::make_unique<LogTailerThread>(
      Substitute("[$0 stdout]", daemon_id_), p->ReleaseChildStdoutFd(), &std::cout);

  // We will mostly see stderr output from the child process (because of --logtostderr), so we'll
  // assume that by default in the output prefix.
  stderr_tailer_thread_ = std::make_unique<LogTailerThread>(
      default_output_prefix, p->ReleaseChildStderrFd(), &std::cerr);

  // The process is now starting -- wait for the bound port info to show up.
  Stopwatch sw;
  sw.start();
  bool success = false;
  while (sw.elapsed().wall_seconds() < kProcessStartTimeoutSeconds) {
    if (ServerInfoPathsExist()) {
      success = true;
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
    int rc;
    const Status wait_status = p->WaitNoBlock(&rc);
    if (wait_status.IsTimedOut()) {
      // The process is still running.
      continue;
    }
    // The process is gone (or could not be waited on) before it wrote its info file.
    RETURN_NOT_OK_PREPEND(wait_status, Substitute("Failed waiting on $0", exe_));
    return STATUS(RuntimeError,
      Substitute("Process exited with rc=$0", rc),
      exe_);
  }

  if (!success) {
    WARN_NOT_OK(p->Kill(SIGKILL), "Killing process failed");
    return STATUS(TimedOut,
        Substitute("Timed out after $0s waiting for process ($1) to write info file ($2)",
                   kProcessStartTimeoutSeconds, exe_, info_path));
  }

  RETURN_NOT_OK(BuildServerStateFromInfoPath());
  LOG(INFO) << "Started " << default_output_prefix << " " << exe_ << " as pid " << p->pid();
  VLOG(1) << exe_ << " instance information:\n" << status_->DebugString();

  process_.swap(p);
  return Status::OK();
}
2184
2185
57
// Sends SIGSTOP to the daemon process. A no-op returning OK when there is no process.
Status ExternalDaemon::Pause() {
  if (process_) {
    VLOG(1) << "Pausing " << ProcessNameAndPidStr();
    return process_->Kill(SIGSTOP);
  }
  return Status::OK();
}
2190
2191
58
// Sends SIGCONT to the daemon process. A no-op returning OK when there is no process.
Status ExternalDaemon::Resume() {
  if (process_) {
    VLOG(1) << "Resuming " << ProcessNameAndPidStr();
    return process_->Kill(SIGCONT);
  }
  return Status::OK();
}
2196
2197
0
// Sends 'signal' to the daemon process. A no-op returning OK when there is no process.
Status ExternalDaemon::Kill(int signal) {
  if (process_) {
    VLOG(1) << "Kill " << ProcessNameAndPidStr() << " with " << signal;
    return process_->Kill(signal);
  }
  return Status::OK();
}
2202
2203
15.0k
bool ExternalDaemon::IsShutdown() const {
2204
15.0k
  return process_.get() == nullptr;
2205
15.0k
}
2206
2207
13.1k
bool ExternalDaemon::IsProcessAlive() const {
2208
13.1k
  if (IsShutdown()) {
2209
91
    return false;
2210
91
  }
2211
2212
13.0k
  int rc = 0;
2213
13.0k
  Status s = process_->WaitNoBlock(&rc);
2214
  // If the non-blocking Wait "times out", that means the process
2215
  // is running.
2216
13.0k
  return s.IsTimedOut();
2217
13.1k
}
2218
2219
30
pid_t ExternalDaemon::pid() const {
2220
30
  return process_->pid();
2221
30
}
2222
2223
3.17k
void ExternalDaemon::Shutdown() {
2224
3.17k
  if (!process_) 
return1.59k
;
2225
2226
1.58k
  LOG_WITH_PREFIX(INFO) << "Starting Shutdown()";
2227
2228
  // Before we kill the process, store the addresses. If we're told to start again we'll reuse
2229
  // these.
2230
1.58k
  bound_rpc_ = bound_rpc_hostport();
2231
1.58k
  bound_http_ = bound_http_hostport();
2232
2233
1.58k
  if (IsProcessAlive()) {
2234
    // In coverage builds, ask the process nicely to flush coverage info
2235
    // before we kill -9 it. Otherwise, we never get any coverage from
2236
    // external clusters.
2237
1.57k
    FlushCoverage();
2238
2239
1.57k
    if (!FLAGS_external_daemon_heap_profile_prefix.empty()) {
2240
      // The child process has been configured using the HEAPPROFILESIGNAL environment variable to
2241
      // create a heap profile on receiving kHeapProfileSignal.
2242
0
      static const int kWaitMs = 100;
2243
0
      LOG(INFO) << "Sending signal " << kHeapProfileSignal << " to " << ProcessNameAndPidStr()
2244
0
                << " to capture a heap profile. Waiting for " << kWaitMs << " ms afterwards.";
2245
0
      WARN_NOT_OK(process_->Kill(kHeapProfileSignal), "Killing process failed");
2246
0
      std::this_thread::sleep_for(std::chrono::milliseconds(kWaitMs));
2247
0
    }
2248
2249
1.57k
    if (FLAGS_external_daemon_safe_shutdown) {
2250
      // We put 'SIGTERM' in quotes because an unquoted one would be treated as a test failure
2251
      // by our regular expressions in common-test-env.sh.
2252
0
      LOG(INFO) << "Terminating " << ProcessNameAndPidStr() << " using 'SIGTERM' signal";
2253
0
      WARN_NOT_OK(process_->Kill(SIGTERM), "Killing process failed");
2254
0
      int total_delay_ms = 0;
2255
0
      int current_delay_ms = 10;
2256
0
      for (int i = 0; i < 10 && IsProcessAlive(); ++i) {
2257
0
        std::this_thread::sleep_for(std::chrono::milliseconds(current_delay_ms));
2258
0
        total_delay_ms += current_delay_ms;
2259
0
        current_delay_ms += 10;  // will sleep for 10ms, then 20ms, etc.
2260
0
      }
2261
2262
0
      if (IsProcessAlive()) {
2263
0
        LOG(INFO) << "The process " << ProcessNameAndPidStr() << " is still running after "
2264
0
                  << total_delay_ms << " ms, will send SIGKILL";
2265
0
      }
2266
0
    }
2267
2268
1.57k
    if (IsProcessAlive()) {
2269
1.57k
      LOG(INFO) << "Killing " << ProcessNameAndPidStr() << " with SIGKILL";
2270
1.57k
      WARN_NOT_OK(process_->Kill(SIGKILL), "Killing process failed");
2271
1.57k
    }
2272
1.57k
  }
2273
1.58k
  int ret = 0;
2274
1.58k
  WARN_NOT_OK(process_->Wait(&ret), "Waiting on " + exe_);
2275
1.58k
  process_.reset();
2276
1.58k
}
2277
2278
1.57k
// Asks the daemon, over RPC, to flush its code-coverage data.
//
// Compiled to an immediate return unless COVERAGE_BUILD_ is defined. The flush is best-effort:
// an RPC failure, or a response that reports no success, is only logged as a warning.
void ExternalDaemon::FlushCoverage() {
#ifndef COVERAGE_BUILD_
  return;
#else
  // Purely informational. This must not be a FATAL log: that would abort the calling process
  // before the flush request is ever sent.
  LOG(INFO) << "Attempting to flush coverage for " << exe_ << " pid " << process_->pid();
  server::GenericServiceProxy proxy(proxy_cache_, bound_rpc_addr());

  server::FlushCoverageRequestPB req;
  server::FlushCoverageResponsePB resp;
  rpc::RpcController rpc;

  // Set a reasonably short timeout, since some of our tests kill servers which
  // are kill -STOPed.
  rpc.set_timeout(MonoDelta::FromMilliseconds(100));
  Status s = proxy.FlushCoverage(req, &resp, &rpc);
  if (s.ok() && !resp.success()) {
    s = STATUS(RemoteError, "Server does not appear to be running a coverage build");
  }
  WARN_NOT_OK(s, Substitute("Unable to flush coverage on $0 pid $1", exe_, process_->pid()));
#endif
}
2299
2300
1.57k
std::string ExternalDaemon::ProcessNameAndPidStr() {
2301
1.57k
  return Substitute("$0 with pid $1", exe_, process_->pid());
2302
1.57k
}
2303
2304
71.4k
// Returns the first bound RPC address recorded in the daemon's status.
// CHECK-fails when no status is loaded or when it lists no RPC address.
HostPort ExternalDaemon::bound_rpc_hostport() const {
  CHECK(status_);
  CHECK_GE(status_->bound_rpc_addresses_size(), 1);
  const auto& first_rpc_address = status_->bound_rpc_addresses(0);
  return HostPortFromPB(first_rpc_address);
}
2309
2310
64.1k
// Same value as bound_rpc_hostport(); this simply forwards to it.
HostPort ExternalDaemon::bound_rpc_addr() const {
  HostPort rpc_address = bound_rpc_hostport();
  return rpc_address;
}
2313
2314
2.17k
// Returns the first bound HTTP address recorded in the daemon's status.
// CHECK-fails when no status is loaded or when it lists no HTTP address.
HostPort ExternalDaemon::bound_http_hostport() const {
  CHECK(status_);
  CHECK_GE(status_->bound_http_addresses_size(), 1);
  const auto& first_http_address = status_->bound_http_addresses(0);
  return HostPortFromPB(first_http_address);
}
2319
2320
18.4k
// Returns the node instance stored in the daemon's status. CHECK-fails when no status is loaded.
const NodeInstancePB& ExternalDaemon::instance_id() const {
  CHECK(status_);
  const NodeInstancePB& node_instance = status_->node_instance();
  return node_instance;
}
2324
2325
696
// Returns the permanent UUID from the node instance stored in the daemon's status.
// CHECK-fails when no status is loaded.
const string& ExternalDaemon::uuid() const {
  CHECK(status_);
  const auto& node_instance = status_->node_instance();
  return node_instance.permanent_uuid();
}
2329
2330
// Prototype-based convenience overload: takes the entity and metric names from the given
// prototypes and forwards to the name-based overload.
Result<int64_t> ExternalDaemon::GetInt64MetricFromHost(const HostPort& hostport,
                                                       const MetricEntityPrototype* entity_proto,
                                                       const char* entity_id,
                                                       const MetricPrototype* metric_proto,
                                                       const char* value_field) {
  const auto entity_name = entity_proto->name();
  const auto metric_name = metric_proto->name();
  return GetInt64MetricFromHost(hostport, entity_name, entity_id, metric_name, value_field);
}
2338
2339
// Fetches http://<hostport>/jsonmetricz?metrics=<metric_proto_name> and extracts one int64 field
// of one metric from the JSON response.
//
// An entity matches when its "type" equals entity_proto_name and, if entity_id is non-null, its
// "id" equals entity_id. Within a matching entity, the first metric whose "name" equals
// metric_proto_name supplies the value of value_field.
//
// Returns NotFound when nothing matches; fetch, parse and extraction errors are propagated.
Result<int64_t> ExternalDaemon::GetInt64MetricFromHost(const HostPort& hostport,
                                                       const char* entity_proto_name,
                                                       const char* entity_id,
                                                       const char* metric_proto_name,
                                                       const char* value_field) {
  // Only ask the server for metrics with the requested name.
  const string url = Substitute(
      "http://$0/jsonmetricz?metrics=$1", hostport.ToString(), metric_proto_name);
  EasyCurl curl;
  faststring response_body;
  RETURN_NOT_OK(curl.FetchURL(url, &response_body));

  // The top level of the response is an array of entities.
  JsonReader reader(response_body.ToString());
  RETURN_NOT_OK(reader.Init());
  vector<const Value*> entities;
  RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(), nullptr, &entities));

  for (const Value* entity : entities) {
    string entity_type;
    RETURN_NOT_OK(reader.ExtractString(entity, "type", &entity_type));
    if (entity_type != entity_proto_name) {
      continue;
    }
    if (entity_id != nullptr) {
      string current_id;
      RETURN_NOT_OK(reader.ExtractString(entity, "id", &current_id));
      if (current_id != entity_id) {
        continue;
      }
    }

    // The entity matches; look for the metric inside it.
    vector<const Value*> metrics;
    RETURN_NOT_OK(reader.ExtractObjectArray(entity, "metrics", &metrics));
    for (const Value* metric : metrics) {
      string metric_name;
      RETURN_NOT_OK(reader.ExtractString(metric, "name", &metric_name));
      if (metric_name == metric_proto_name) {
        int64_t value;
        RETURN_NOT_OK(reader.ExtractInt64(metric, value_field, &value));
        return value;
      }
    }
  }

  string msg = entity_id != nullptr
      ? Substitute("Could not find metric $0.$1 for entity $2",
                   entity_proto_name, metric_proto_name, entity_id)
      : Substitute("Could not find metric $0.$1", entity_proto_name, metric_proto_name);
  return STATUS(NotFound, msg);
}
2398
2399
1.66k
// Prefix for log lines about this daemon: its id and the stored bound RPC address.
string ExternalDaemon::LogPrefix() {
  string prefix = Format("{ daemon_id: $0 bound_rpc: $1 } ", daemon_id_, bound_rpc_);
  return prefix;
}
2402
2403
4
// Installs the listener on both output tailers: stdout first, then stderr.
void ExternalDaemon::SetLogListener(StringListener* listener) {
  auto& stdout_tailer = *stdout_tailer_thread_;
  auto& stderr_tailer = *stderr_tailer_thread_;
  stdout_tailer.SetListener(listener);
  stderr_tailer.SetListener(listener);
}
2407
2408
4
// Detaches the listener from both output tailers: stdout first, then stderr.
void ExternalDaemon::RemoveLogListener(StringListener* listener) {
  auto& stdout_tailer = *stdout_tailer_thread_;
  auto& stderr_tailer = *stderr_tailer_thread_;
  stdout_tailer.RemoveListener(listener);
  stderr_tailer.RemoveListener(listener);
}
2412
2413
29
// Queries the daemon over RPC (30 second timeout) for the current value of a gflag.
// Returns RemoteError when the response is not marked valid; RPC errors are propagated.
Result<string> ExternalDaemon::GetFlag(const std::string& flag) {
  server::GenericServiceProxy proxy(proxy_cache_, bound_rpc_addr());

  server::GetFlagRequestPB request;
  request.set_flag(flag);
  server::GetFlagResponsePB response;

  rpc::RpcController rpc;
  rpc.set_timeout(MonoDelta::FromSeconds(30));

  RETURN_NOT_OK(proxy.GetFlag(request, &response, &rpc));
  if (response.valid()) {
    return response.value();
  }
  return STATUS_FORMAT(RemoteError, "Failed to get gflag $0 value.", flag);
}
2427
2428
// Reads a metric from this daemon's own bound HTTP address (prototype-based overload).
Result<int64_t> ExternalDaemon::GetInt64Metric(const MetricEntityPrototype* entity_proto,
                                               const char* entity_id,
                                               const MetricPrototype* metric_proto,
                                               const char* value_field) const {
  const HostPort http_address = bound_http_hostport();
  return GetInt64MetricFromHost(http_address, entity_proto, entity_id, metric_proto, value_field);
}
2435
2436
// Reads a metric from this daemon's own bound HTTP address (name-based overload).
Result<int64_t> ExternalDaemon::GetInt64Metric(const char* entity_proto_name,
                                               const char* entity_id,
                                               const char* metric_proto_name,
                                               const char* value_field) const {
  const HostPort http_address = bound_http_hostport();
  return GetInt64MetricFromHost(
      http_address, entity_proto_name, entity_id, metric_proto_name, value_field);
}
2443
2444
// Stores the daemon and the text to look for, then registers this object as the daemon's log
// listener.
LogWaiter::LogWaiter(ExternalDaemon* daemon, const std::string& string_to_wait)
    : daemon_(daemon),
      string_to_wait_(string_to_wait) {
  daemon_->SetLogListener(this);
}
2448
2449
73
void LogWaiter::Handle(const GStringPiece& s) {
2450
73
  if (s.contains(string_to_wait_)) {
2451
3
    event_occurred_ = true;
2452
3
  }
2453
73
}
2454
2455
3
// Waits until the awaited log text has been observed or the timeout expires, delegating to
// ::yb::WaitFor with an initial wait period of 100ms.
Status LogWaiter::WaitFor(const MonoDelta timeout) {
  constexpr auto kFirstPollDelay = 100ms;
  const auto event_seen = [this] { return event_occurred_.load(); };
  const auto description =
      Format("Waiting for log record '$0' on $1...", string_to_wait_, daemon_->id());
  return ::yb::WaitFor(event_seen, timeout, description, kFirstPollDelay);
}
2462
2463
3
// Unregisters this object from the daemon's log tailers.
LogWaiter::~LogWaiter() {
  ExternalDaemon* const observed_daemon = daemon_;
  observed_daemon->RemoveLogListener(this);
}
2466
2467
//------------------------------------------------------------
2468
// ScopedResumeExternalDaemon
2469
//------------------------------------------------------------
2470
2471
// Remembers the daemon to resume on destruction. CHECK-fails on a null daemon.
ScopedResumeExternalDaemon::ScopedResumeExternalDaemon(ExternalDaemon* daemon)
    : daemon_(CHECK_NOTNULL(daemon)) {}
2474
2475
18
// Resumes the daemon when the scope ends. CHECK-fails if Resume() returns an error.
ScopedResumeExternalDaemon::~ScopedResumeExternalDaemon() {
  const Status resume_status = daemon_->Resume();
  CHECK_OK(resume_status);
}
2478
2479
//------------------------------------------------------------
2480
// ExternalMaster
2481
//------------------------------------------------------------
2482
// Creates a handle for an external master. The daemon id is "m-<master_index + 1>", and the base
// class receives a single data directory, GetServerTypeDataPath(data_dir, "master").
ExternalMaster::ExternalMaster(
    size_t master_index, rpc::Messenger* messenger, rpc::ProxyCache* proxy_cache,
    const string& exe, const string& data_dir, const std::vector<string>& extra_flags,
    const string& rpc_bind_address, uint16_t http_port, const string& master_addrs)
    : ExternalDaemon(
          Substitute("m-$0", master_index + 1), messenger, proxy_cache, exe, data_dir,
          {GetServerTypeDataPath(data_dir, "master")}, extra_flags),
      rpc_bind_address_(rpc_bind_address),
      master_addrs_(master_addrs),
      http_port_(http_port) {}
2499
2500
518
// Nothing master-specific to clean up here.
ExternalMaster::~ExternalMaster() = default;
2502
2503
namespace {
2504
2505
class Flags {
2506
 public:
2507
  template <class Value>
2508
22.9k
  void Add(const std::string& name, const Value& value) {
2509
22.9k
    value_.push_back(Format("--$0=$1", name, value));
2510
22.9k
  }
external_mini_cluster.cc:void yb::(anonymous namespace)::Flags::Add<yb::HostPort>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::HostPort const&)
Line
Count
Source
2508
5.41k
  void Add(const std::string& name, const Value& value) {
2509
5.41k
    value_.push_back(Format("--$0=$1", name, value));
2510
5.41k
  }
external_mini_cluster.cc:void yb::(anonymous namespace)::Flags::Add<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)
Line
Count
Source
2508
6.05k
  void Add(const std::string& name, const Value& value) {
2509
6.05k
    value_.push_back(Format("--$0=$1", name, value));
2510
6.05k
  }
external_mini_cluster.cc:void yb::(anonymous namespace)::Flags::Add<char [10]>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, char const (&) [10])
Line
Count
Source
2508
675
  void Add(const std::string& name, const Value& value) {
2509
675
    value_.push_back(Format("--$0=$1", name, value));
2510
675
  }
external_mini_cluster.cc:void yb::(anonymous namespace)::Flags::Add<unsigned short>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned short const&)
Line
Count
Source
2508
6.09k
  void Add(const std::string& name, const Value& value) {
2509
6.09k
    value_.push_back(Format("--$0=$1", name, value));
2510
6.09k
  }
external_mini_cluster.cc:void yb::(anonymous namespace)::Flags::Add<int>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int const&)
Line
Count
Source
2508
675
  void Add(const std::string& name, const Value& value) {
2509
675
    value_.push_back(Format("--$0=$1", name, value));
2510
675
  }
external_mini_cluster.cc:void yb::(anonymous namespace)::Flags::Add<bool>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, bool const&)
Line
Count
Source
2508
1.35k
  void Add(const std::string& name, const Value& value) {
2509
1.35k
    value_.push_back(Format("--$0=$1", name, value));
2510
1.35k
  }
external_mini_cluster.cc:void yb::(anonymous namespace)::Flags::Add<char [3]>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, char const (&) [3])
Line
Count
Source
2508
2.70k
  void Add(const std::string& name, const Value& value) {
2509
2.70k
    value_.push_back(Format("--$0=$1", name, value));
2510
2.70k
  }
2511
2512
5.41k
  void AddHostPort(const std::string& name, const std::string& host, uint16_t port) {
2513
5.41k
    Add(name, HostPort(host, port));
2514
5.41k
  }
2515
2516
2.02k
  const std::vector<std::string>& value() const {
2517
2.02k
    return value_;
2518
2.02k
  }
2519
2520
 private:
2521
  std::vector<std::string> value_;
2522
};
2523
2524
} // namespace
2525
2526
675
// Launches the master process with its standard flags. When shell_mode is true the
// master_addresses flag is omitted, since a 'shell' master has no master addresses.
Status ExternalMaster::Start(bool shell_mode) {
  Flags master_flags;
  master_flags.Add("fs_data_dirs", root_dir_);
  master_flags.Add("rpc_bind_addresses", rpc_bind_address_);
  master_flags.Add("webserver_interface", "localhost");
  master_flags.Add("webserver_port", http_port_);
  // No initial delay, so that new load balancer tasks are not held back after a master leader
  // failover.
  master_flags.Add("load_balancer_initial_delay_secs", 0);
  // On first start the masters must be told their list of expected peers.
  if (!shell_mode) {
    master_flags.Add("master_addresses", master_addrs_);
  }
  return StartProcess(master_flags.value());
}
2543
2544
23
// Starts the master again in shell mode. If the process is not alive, Shutdown() is called first
// so that the call is safe after a crash. Returns IllegalState when no bound RPC port has been
// stored (the addresses are stored by Shutdown()).
Status ExternalMaster::Restart() {
  LOG_WITH_PREFIX(INFO) << "Restart()";

  const bool process_running = IsProcessAlive();
  if (!process_running) {
    // Covers the case where the process has already crashed.
    Shutdown();
  }

  // The addresses are stored on shutdown, so a zero port means Shutdown() has not run.
  const bool have_stored_address = bound_rpc_.port() != 0;
  if (!have_stored_address) {
    return STATUS(IllegalState, "Master cannot be restarted. Must call Shutdown() first.");
  }
  return Start(true);
}
2556
2557
//------------------------------------------------------------
2558
// ExternalTabletServer
2559
//------------------------------------------------------------
2560
2561
// Creates a handle for an external tablet server. The daemon id is "ts-<tablet_server_index + 1>",
// the data directories come from FsDataDirs(data_dir, "tserver", num_drives), and the master
// addresses are stored as one comma-separated string.
ExternalTabletServer::ExternalTabletServer(
    size_t tablet_server_index,
    rpc::Messenger* messenger,
    rpc::ProxyCache* proxy_cache,
    const std::string& exe,
    const std::string& data_dir,
    uint16_t num_drives,
    std::string bind_host,
    uint16_t rpc_port,
    uint16_t http_port,
    uint16_t redis_rpc_port,
    uint16_t redis_http_port,
    uint16_t cql_rpc_port,
    uint16_t cql_http_port,
    uint16_t pgsql_rpc_port,
    uint16_t pgsql_http_port,
    const std::vector<HostPort>& master_addrs,
    const std::vector<std::string>& extra_flags)
    : ExternalDaemon(
          Substitute("ts-$0", tablet_server_index + 1), messenger, proxy_cache, exe, data_dir,
          FsDataDirs(data_dir, "tserver", num_drives), extra_flags),
      master_addrs_(HostPort::ToCommaSeparatedString(master_addrs)),
      bind_host_(std::move(bind_host)),
      rpc_port_(rpc_port),
      http_port_(http_port),
      redis_rpc_port_(redis_rpc_port),
      redis_http_port_(redis_http_port),
      pgsql_rpc_port_(pgsql_rpc_port),
      pgsql_http_port_(pgsql_http_port),
      cql_rpc_port_(cql_rpc_port),
      cql_http_port_(cql_http_port),
      num_drives_(num_drives) {
}
2582
2583
986
// Nothing tablet-server-specific to clean up here.
ExternalTabletServer::~ExternalTabletServer() = default;
2585
2586
// Creates the root directories, assembles the tablet server command line and launches the
// process.
//
// start_cql_proxy is remembered in start_cql_proxy_ and passed on as a flag. The redis, pgsql and
// cql proxy bind addresses are only passed when set_proxy_addrs is true. extra_flags are appended
// last as additional "--name=value" arguments.
Status ExternalTabletServer::Start(
    bool start_cql_proxy, bool set_proxy_addrs,
    std::vector<std::pair<string, string>> extra_flags) {
  auto root_dirs = FsRootDirs(root_dir_, num_drives_);
  for (const auto& root_dir : root_dirs) {
    RETURN_NOT_OK(Env::Default()->CreateDirs(root_dir));
  }
  start_cql_proxy_ = start_cql_proxy;

  Flags ts_flags;
  ts_flags.Add("fs_data_dirs", JoinStrings(root_dirs, ","));
  ts_flags.AddHostPort("rpc_bind_addresses", bind_host_, rpc_port_);
  ts_flags.Add("webserver_interface", bind_host_);
  ts_flags.Add("webserver_port", http_port_);
  ts_flags.Add("redis_proxy_webserver_port", redis_http_port_);
  ts_flags.Add("pgsql_proxy_webserver_port", pgsql_http_port_);
  ts_flags.Add("cql_proxy_webserver_port", cql_http_port_);

  if (set_proxy_addrs) {
    ts_flags.AddHostPort("redis_proxy_bind_address", bind_host_, redis_rpc_port_);
    ts_flags.AddHostPort("pgsql_proxy_bind_address", bind_host_, pgsql_rpc_port_);
    ts_flags.AddHostPort("cql_proxy_bind_address", bind_host_, cql_rpc_port_);
  }

  ts_flags.Add("start_cql_proxy", start_cql_proxy_);
  ts_flags.Add("tserver_master_addrs", master_addrs_);

  // Keep thread counts conservative: in the unit test environment several tests tend to run in
  // parallel.
  ts_flags.Add("tablet_server_svc_num_threads", "64");
  ts_flags.Add("ts_consensus_svc_num_threads", "20");

  for (const auto& name_and_value : extra_flags) {
    ts_flags.Add(name_and_value.first, name_and_value.second);
  }

  return StartProcess(ts_flags.value());
}
2625
2626
1.33k
// Loads the tablet server status through the base class and, when the CQL proxy was started,
// also loads the CQL server status from its own info file into cqlserver_status_.
Status ExternalTabletServer::BuildServerStateFromInfoPath() {
  RETURN_NOT_OK(ExternalDaemon::BuildServerStateFromInfoPath());
  if (!start_cql_proxy_) {
    return Status::OK();
  }
  return ExternalDaemon::BuildServerStateFromInfoPath(GetCQLServerInfoPath(),
                                                      &cqlserver_status_);
}
2634
2635
158k
// Path of the CQL server info file: the base server info path with "-cql" appended.
string ExternalTabletServer::GetCQLServerInfoPath() {
  string cql_info_path = ExternalDaemon::GetServerInfoPath();
  cql_info_path += "-cql";
  return cql_info_path;
}
2638
2639
410k
bool ExternalTabletServer::ServerInfoPathsExist() {
2640
410k
  if (start_cql_proxy_) {
2641
409k
    return ExternalDaemon::ServerInfoPathsExist() &&
2642
409k
        
Env::Default()->FileExists(GetCQLServerInfoPath())156k
;
2643
409k
  }
2644
880
  return ExternalDaemon::ServerInfoPathsExist();
2645
410k
}
2646
2647
1.35k
// Deletes the base info paths and the CQL info file. Both deletions are always attempted; the
// base-class error is reported first, otherwise the result of the CQL deletion.
Status ExternalTabletServer::DeleteServerInfoPaths() {
  const Status base_status = ExternalDaemon::DeleteServerInfoPaths();
  const Status cql_status = Env::Default()->DeleteFile(GetCQLServerInfoPath());
  RETURN_NOT_OK(base_status);
  return cql_status;
}
2655
2656
// Starts the tablet server again with proxy addresses set and the given extra flags. If the
// process is not alive, Shutdown() is called first so that the call is safe after a crash.
// Returns IllegalState when no bound RPC port has been stored (the addresses are stored by
// Shutdown()).
Status ExternalTabletServer::Restart(
    bool start_cql_proxy, std::vector<std::pair<string, string>> flags) {
  LOG_WITH_PREFIX(INFO) << "Restart: start_cql_proxy=" << start_cql_proxy;

  const bool process_running = IsProcessAlive();
  if (!process_running) {
    // Covers the case where the process has already crashed.
    Shutdown();
  }

  // The addresses are stored on shutdown, so a zero port means Shutdown() has not run.
  const bool have_stored_address = bound_rpc_.port() != 0;
  if (!have_stored_address) {
    return STATUS(IllegalState, "Tablet server cannot be restarted. Must call Shutdown() first.");
  }
  return Start(start_cql_proxy, true /* set_proxy_addrs */, flags);
}
2669
2670
// Reads a metric from the address formed by this server's bind host and CQL HTTP port.
Result<int64_t> ExternalTabletServer::GetInt64CQLMetric(const MetricEntityPrototype* entity_proto,
                                                        const char* entity_id,
                                                        const MetricPrototype* metric_proto,
                                                        const char* value_field) const {
  const HostPort cql_http_address(bind_host(), cql_http_port());
  return GetInt64MetricFromHost(
      cql_http_address, entity_proto, entity_id, metric_proto, value_field);
}
2678
2679
1
// Changes the number of drives and recomputes data_dirs_ to match. Only allowed while the
// process is not running; otherwise returns IllegalState.
Status ExternalTabletServer::SetNumDrives(uint16_t num_drives) {
  const bool process_running = IsProcessAlive();
  if (process_running) {
    return STATUS(IllegalState, "Cann't set num drives on running Tablet server. "
                                "Must call Shutdown() first.");
  }

  num_drives_ = num_drives;
  data_dirs_ = FsDataDirs(root_dir_, "tserver", num_drives_);
  return Status::OK();
}
2688
2689
2
// Shuts down every master in the cluster, then restarts them one by one in index order.
// Stops at, and returns, the first Restart() error.
Status RestartAllMasters(ExternalMiniCluster* cluster) {
  // First pass: stop all masters before any of them is brought back.
  for (size_t idx = 0; idx < cluster->num_masters(); ++idx) {
    auto* master = cluster->master(idx);
    master->Shutdown();
  }

  // Second pass: bring them back.
  for (size_t idx = 0; idx < cluster->num_masters(); ++idx) {
    auto* master = cluster->master(idx);
    RETURN_NOT_OK(master->Restart());
  }

  return Status::OK();
}
2699
2700
}  // namespace yb