YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/mini_master.cc
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/master/mini_master.h"

#include <string>

#include <glog/logging.h>

#include "yb/master/catalog_manager.h"
#include "yb/master/master.h"

#include "yb/rpc/messenger.h"

#include "yb/util/net/net_util.h"
#include "yb/util/net/sockaddr.h"
#include "yb/util/net/tunnel.h"
#include "yb/util/status.h"

using strings::Substitute;

DECLARE_bool(TEST_simulate_fs_create_failure);
DECLARE_bool(rpc_server_allow_ephemeral_ports);
DECLARE_double(leader_failure_max_missed_heartbeat_periods);
DECLARE_int32(TEST_nodes_per_cloud);
DECLARE_bool(durable_wal_write);

namespace yb {
namespace master {

MiniMaster::MiniMaster(Env* env, string fs_root, uint16_t rpc_port, uint16_t web_port, int index)
    : running_(false),
      env_(env),
      fs_root_(std::move(fs_root)),
      rpc_port_(rpc_port),
      web_port_(web_port),
      index_(index + 1) {}

MiniMaster::~MiniMaster() {
  if (running_) {
    LOG(WARNING) << "MiniMaster destructor called without clean shutdown for: "
                 << bound_rpc_addr_str();
  }
}

Status MiniMaster::Start(bool TEST_simulate_fs_create_failure) {
  CHECK(!running_);
  FLAGS_rpc_server_allow_ephemeral_ports = true;
  FLAGS_TEST_simulate_fs_create_failure = TEST_simulate_fs_create_failure;
  // Disable WAL fsync for tests
  FLAGS_durable_wal_write = false;
  RETURN_NOT_OK(StartOnPorts(rpc_port_, web_port_));
  return master_->WaitForCatalogManagerInit();
}


Status MiniMaster::StartDistributedMaster(const vector<uint16_t>& peer_ports) {
  CHECK(!running_);
  return StartDistributedMasterOnPorts(rpc_port_, web_port_, peer_ports);
}

void MiniMaster::Shutdown() {
  if (tunnel_) {
    tunnel_->Shutdown();
  }
  if (running_) {
    master_->Shutdown();
  }
  tunnel_.reset();
  running_ = false;
  master_.reset();
}

Status MiniMaster::StartOnPorts(uint16_t rpc_port, uint16_t web_port) {
  CHECK(!running_);
  CHECK(!master_);

  auto master_addresses = std::make_shared<server::MasterAddresses>();
  if (pass_master_addresses_) {
    HostPort local_host_port;
    RETURN_NOT_OK(local_host_port.ParseString(
        server::TEST_RpcBindEndpoint(index_, rpc_port), rpc_port));
    master_addresses->push_back({local_host_port});
  }
  MasterOptions opts(master_addresses);

  Status start_status = StartOnPorts(rpc_port, web_port, &opts);
  if (!start_status.ok()) {
    LOG(ERROR) << "MiniMaster failed to start on RPC port " << rpc_port
               << ", web port " << web_port << ": " << start_status;
    // Don't crash here. Handle the error in the caller (e.g. could retry there).
  }
  return start_status;
}

Status MiniMaster::StartOnPorts(uint16_t rpc_port, uint16_t web_port,
                                MasterOptions* opts) {
  opts->rpc_opts.rpc_bind_addresses = server::TEST_RpcBindEndpoint(index_, rpc_port);
  opts->webserver_opts.port = web_port;
  opts->fs_opts.wal_paths = { fs_root_ };
  opts->fs_opts.data_paths = { fs_root_ };
  // A.B.C.D.xip.io resolves to A.B.C.D so it is very useful for testing.
  opts->broadcast_addresses = {
      HostPort(server::TEST_RpcAddress(index_, server::Private::kFalse), rpc_port) };

  if (!opts->has_placement_cloud()) {
    opts->SetPlacement(
        Format("cloud$0", (index_ + 1) / FLAGS_TEST_nodes_per_cloud),
        Format("rack$0", index_), "zone");
  }

  std::unique_ptr<Master> server(new enterprise::Master(*opts));
  RETURN_NOT_OK(server->Init());

  server::TEST_SetupConnectivity(server->messenger(), index_);

  RETURN_NOT_OK(server->StartAsync());

  master_.swap(server);

  tunnel_ = std::make_unique<Tunnel>(&master_->messenger()->io_service());
  std::vector<Endpoint> local;
  RETURN_NOT_OK(opts->broadcast_addresses[0].ResolveAddresses(&local));
  Endpoint remote = VERIFY_RESULT(ParseEndpoint(opts->rpc_opts.rpc_bind_addresses, 0));
  RETURN_NOT_OK(tunnel_->Start(local.front(), remote));

  running_ = true;

  return Status::OK();
}

Status MiniMaster::StartDistributedMasterOnPorts(uint16_t rpc_port, uint16_t web_port,
                                                 const vector<uint16_t>& peer_ports) {
  CHECK(!running_);
  CHECK(!master_);

  auto peer_addresses = std::make_shared<server::MasterAddresses>();
  if (pass_master_addresses_) {
    peer_addresses->resize(peer_ports.size());

    int index = 0;
    for (uint16_t peer_port : peer_ports) {
      auto& addresses = (*peer_addresses)[index];
      ++index;
      addresses.push_back(VERIFY_RESULT(HostPort::FromString(
          server::TEST_RpcBindEndpoint(index, peer_port), peer_port)));
      addresses.push_back(VERIFY_RESULT(HostPort::FromString(
          server::TEST_RpcAddress(index, server::Private::kFalse), peer_port)));
    }
  }
  MasterOptions opts(peer_addresses);

  return StartOnPorts(rpc_port, web_port, &opts);
}

Status MiniMaster::Restart() {
  CHECK(running_);

  auto prev_rpc = bound_rpc_addr();
  Endpoint prev_http = bound_http_addr();
  auto master_addresses = master_->opts().GetMasterAddresses();
  Shutdown();

  MasterOptions opts(master_addresses);
  RETURN_NOT_OK(StartOnPorts(prev_rpc.port(), prev_http.port(), &opts));
  CHECK(running_);
  return WaitForCatalogManagerInit();
}

Status MiniMaster::WaitForCatalogManagerInit() {
  RETURN_NOT_OK(master_->catalog_manager()->WaitForWorkerPoolTests());
  return master_->WaitForCatalogManagerInit();
}

Status MiniMaster::WaitUntilCatalogManagerIsLeaderAndReadyForTests() {
  return master_->WaitUntilCatalogManagerIsLeaderAndReadyForTests();
}

HostPort MiniMaster::bound_rpc_addr() const {
  CHECK(running_);
  return HostPort::FromBoundEndpoint(master_->first_rpc_address());
}

Endpoint MiniMaster::bound_http_addr() const {
  CHECK(running_);
  return master_->first_http_address();
}

std::string MiniMaster::permanent_uuid() const {
  CHECK(master_);
  return DCHECK_NOTNULL(master_->fs_manager())->uuid();
}

std::string MiniMaster::bound_rpc_addr_str() const {
  return bound_rpc_addr().ToString();
}

CatalogManagerIf& MiniMaster::catalog_manager() const {
  return *master_->catalog_manager();
}

CatalogManager& MiniMaster::catalog_manager_impl() const {
  return *master_->catalog_manager_impl();
}

tablet::TabletPeerPtr MiniMaster::tablet_peer() const {
  return catalog_manager().tablet_peer();
}

rpc::Messenger& MiniMaster::messenger() const {
  return *master_->messenger();
}

master::SysCatalogTable& MiniMaster::sys_catalog() const {
  return *catalog_manager().sys_catalog();
}

master::TSManager& MiniMaster::ts_manager() const {
  return *master_->ts_manager();
}

master::FlushManager& MiniMaster::flush_manager() const {
  return *master_->flush_manager();
}

FsManager& MiniMaster::fs_manager() const {
  return *master_->fs_manager();
}

} // namespace master
} // namespace yb
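
For context, the sketch below shows one way a test might drive the MiniMaster lifecycle covered above: construct, Start, inspect the bound RPC address, Shutdown. It is a minimal illustration rather than code from the repository: the RunSingleMiniMaster helper is hypothetical, and the yb/util/env.h include, Env::Default(), and the use of port 0 for ephemeral ports are assumptions; only the MiniMaster members shown in mini_master.cc are taken as given.

#include <string>

#include <glog/logging.h>

#include "yb/master/mini_master.h"
#include "yb/util/env.h"
#include "yb/util/status.h"

namespace yb {
namespace master {

// Hypothetical helper (not part of mini_master.cc): starts one mini master,
// logs its bound RPC address, and shuts it down.
Status RunSingleMiniMaster(const std::string& fs_root) {
  // Assumption: port 0 is acceptable because Start() enables
  // FLAGS_rpc_server_allow_ephemeral_ports; index 0 becomes the first
  // test endpoint (the constructor stores index + 1).
  MiniMaster mini_master(Env::Default(), fs_root, /* rpc_port */ 0, /* web_port */ 0,
                         /* index */ 0);
  RETURN_NOT_OK(mini_master.Start(/* TEST_simulate_fs_create_failure */ false));
  LOG(INFO) << "Mini master bound to " << mini_master.bound_rpc_addr_str();

  // ... exercise catalog_manager(), ts_manager(), etc. here ...

  mini_master.Shutdown();  // The destructor warns if the master is still running.
  return Status::OK();
}

}  // namespace master
}  // namespace yb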