/Users/deen/code/yugabyte-db/src/yb/tserver/mini_tablet_server.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tserver/mini_tablet_server.h" |
34 | | |
35 | | #include <functional> |
36 | | #include <memory> |
37 | | #include <string> |
38 | | #include <utility> |
39 | | |
40 | | #include <glog/logging.h> |
41 | | |
42 | | #include "yb/common/index.h" |
43 | | #include "yb/common/partition.h" |
44 | | #include "yb/common/schema.h" |
45 | | |
46 | | #include "yb/consensus/consensus.pb.h" |
47 | | |
48 | | #include "yb/encryption/encrypted_file_factory.h" |
49 | | #include "yb/encryption/header_manager_impl.h" |
50 | | #include "yb/encryption/universe_key_manager.h" |
51 | | |
52 | | #include "yb/rocksutil/rocksdb_encrypted_file_factory.h" |
53 | | |
54 | | #include "yb/rpc/messenger.h" |
55 | | |
56 | | #include "yb/server/rpc_server.h" |
57 | | |
58 | | #include "yb/tablet/tablet-harness.h" |
59 | | #include "yb/tablet/tablet.h" |
60 | | #include "yb/tablet/tablet_metadata.h" |
61 | | #include "yb/tablet/tablet_peer.h" |
62 | | |
63 | | #include "yb/tserver/tablet_server.h" |
64 | | #include "yb/tserver/ts_tablet_manager.h" |
65 | | |
66 | | #include "yb/util/flag_tags.h" |
67 | | #include "yb/util/net/sockaddr.h" |
68 | | #include "yb/util/net/tunnel.h" |
69 | | #include "yb/util/scope_exit.h" |
70 | | #include "yb/util/status.h" |
71 | | |
72 | | using std::pair; |
73 | | |
74 | | using yb::consensus::Consensus; |
75 | | using yb::consensus::ConsensusOptions; |
76 | | using yb::consensus::RaftPeerPB; |
77 | | using yb::consensus::RaftConfigPB; |
78 | | using yb::log::Log; |
79 | | using strings::Substitute; |
80 | | using yb::tablet::TabletPeer; |
81 | | |
82 | | DECLARE_bool(rpc_server_allow_ephemeral_ports); |
83 | | DECLARE_double(leader_failure_max_missed_heartbeat_periods); |
84 | | DECLARE_int32(TEST_nodes_per_cloud); |
85 | | |
86 | | DEFINE_test_flag(bool, private_broadcast_address, false, |
87 | | "Use private address for broadcast address in tests."); |
88 | | |
89 | | namespace yb { |
90 | | namespace tserver { |
91 | | |
92 | | MiniTabletServer::MiniTabletServer(const std::vector<std::string>& wal_paths, |
93 | | const std::vector<std::string>& data_paths, |
94 | | uint16_t rpc_port, |
95 | | const TabletServerOptions& extra_opts, int index) |
96 | | : started_(false), |
97 | | opts_(extra_opts), |
98 | | index_(index + 1), |
99 | | universe_key_manager_(new encryption::UniverseKeyManager()), |
100 | | encrypted_env_(NewEncryptedEnv(encryption::DefaultHeaderManager(universe_key_manager_.get()))), |
101 | | rocksdb_encrypted_env_( |
102 | 1.52k | NewRocksDBEncryptedEnv(encryption::DefaultHeaderManager(universe_key_manager_.get()))) { |
103 | | |
104 | | // Start RPC server on loopback. |
105 | 1.52k | FLAGS_rpc_server_allow_ephemeral_ports = true; |
106 | 1.52k | opts_.rpc_opts.rpc_bind_addresses = server::TEST_RpcBindEndpoint(index_, rpc_port); |
107 | | // A.B.C.D.xip.io resolves to A.B.C.D so it is very useful for testing. |
108 | 1.52k | opts_.broadcast_addresses = { |
109 | 1.52k | HostPort(server::TEST_RpcAddress(index_, |
110 | 1.52k | server::Private(FLAGS_TEST_private_broadcast_address)), |
111 | 1.52k | rpc_port) }; |
112 | 1.52k | opts_.webserver_opts.port = 0; |
113 | 1.52k | opts_.webserver_opts.bind_interface = opts_.broadcast_addresses.front().host(); |
114 | 1.52k | if (!opts_.has_placement_cloud()) { |
115 | 1.52k | opts_.SetPlacement(Format("cloud$0", (index_ + 1) / FLAGS_TEST_nodes_per_cloud), |
116 | 1.52k | Format("rack$0", index_), "zone"); |
117 | 1.52k | } |
118 | 1.52k | opts_.fs_opts.wal_paths = wal_paths; |
119 | 1.52k | opts_.fs_opts.data_paths = data_paths; |
120 | 1.52k | opts_.universe_key_manager = universe_key_manager_.get(); |
121 | 1.52k | opts_.env = encrypted_env_.get(); |
122 | 1.52k | opts_.rocksdb_env = rocksdb_encrypted_env_.get(); |
123 | 1.52k | } |
124 | | |
125 | | MiniTabletServer::MiniTabletServer(const string& fs_root, |
126 | | uint16_t rpc_port, |
127 | | const TabletServerOptions& extra_opts, |
128 | | int index) |
129 | 1.49k | : MiniTabletServer({ fs_root }, { fs_root }, rpc_port, extra_opts, index) { |
130 | 1.49k | } Unexecuted instantiation: yb::tserver::MiniTabletServer::MiniTabletServer(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned short, yb::tserver::TabletServerOptions const&, int) yb::tserver::MiniTabletServer::MiniTabletServer(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned short, yb::tserver::TabletServerOptions const&, int) Line | Count | Source | 129 | 1.49k | : MiniTabletServer({ fs_root }, { fs_root }, rpc_port, extra_opts, index) { | 130 | 1.49k | } |
|
131 | | |
132 | 210 | MiniTabletServer::~MiniTabletServer() { |
133 | 210 | } |
134 | | |
135 | | Result<std::unique_ptr<MiniTabletServer>> MiniTabletServer::CreateMiniTabletServer( |
136 | 35 | const string& fs_root, uint16_t rpc_port, int index) { |
137 | 35 | auto options_result = TabletServerOptions::CreateTabletServerOptions(); |
138 | 35 | RETURN_NOT_OK(options_result); |
139 | 35 | return std::make_unique<MiniTabletServer>(fs_root, rpc_port, *options_result, index); |
140 | 35 | } |
141 | | |
142 | 1.52k | Status MiniTabletServer::Start() { |
143 | 1.52k | CHECK(!started_); |
144 | | |
145 | 1.52k | std::unique_ptr<TabletServer> server(new enterprise::TabletServer(opts_)); |
146 | 1.52k | RETURN_NOT_OK(server->Init()); |
147 | | |
148 | 1.52k | RETURN_NOT_OK(server->Start()); |
149 | | |
150 | 1.43k | server_.swap(server); |
151 | | |
152 | 1.43k | RETURN_NOT_OK(Reconnect()); |
153 | | |
154 | 1.43k | started_ = true; |
155 | 1.43k | return Status::OK(); |
156 | 1.43k | } |
157 | | |
158 | 0 | void MiniTabletServer::Isolate() { |
159 | 0 | server::TEST_Isolate(server_->messenger()); |
160 | 0 | tunnel_->Shutdown(); |
161 | 0 | } |
162 | | |
163 | 838 | Status MiniTabletServer::Reconnect() { |
164 | 838 | server::TEST_SetupConnectivity(server_->messenger(), index_); |
165 | | |
166 | 838 | if (FLAGS_TEST_private_broadcast_address) { |
167 | 8 | return Status::OK(); |
168 | 8 | } |
169 | | |
170 | 830 | tunnel_ = std::make_unique<Tunnel>(&server_->messenger()->io_service()); |
171 | 830 | auto started_tunnel = false; |
172 | 830 | auto se = ScopeExit([this, &started_tunnel] { |
173 | 829 | if (!started_tunnel) { |
174 | 1 | tunnel_->Shutdown(); |
175 | 1 | } |
176 | 829 | }); |
177 | | |
178 | 830 | std::vector<Endpoint> local; |
179 | 830 | RETURN_NOT_OK(opts_.broadcast_addresses[0].ResolveAddresses(&local)); |
180 | 830 | Endpoint remote = VERIFY_RESULT(ParseEndpoint(opts_.rpc_opts.rpc_bind_addresses, 0)); |
181 | 830 | RETURN_NOT_OK(tunnel_->Start( |
182 | 830 | local.front(), remote, [messenger = server_->messenger()](const IpAddress& address) { |
183 | 830 | return !messenger->TEST_ShouldArtificiallyRejectIncomingCallsFrom(address); |
184 | 830 | })); |
185 | 829 | started_tunnel = true; |
186 | 829 | return Status::OK(); |
187 | 830 | } |
188 | | |
189 | 14 | Status MiniTabletServer::WaitStarted() { |
190 | 14 | return server_->WaitInited(); |
191 | 14 | } |
192 | | |
193 | 99 | void MiniTabletServer::Shutdown() { |
194 | 99 | if (tunnel_) { |
195 | 99 | tunnel_->Shutdown(); |
196 | 99 | } |
197 | 99 | if (started_) { |
198 | | // Save bind address and port so we can later restart the server. |
199 | 99 | opts_.rpc_opts.rpc_bind_addresses = server::TEST_RpcBindEndpoint( |
200 | 99 | index_, bound_rpc_addr().port()); |
201 | 99 | opts_.webserver_opts.port = bound_http_addr().port(); |
202 | 99 | server_->Shutdown(); |
203 | 99 | tunnel_.reset(); |
204 | 99 | server_.reset(); |
205 | 99 | } |
206 | 99 | started_ = false; |
207 | 99 | } |
208 | | |
209 | | namespace { |
210 | | |
211 | | CHECKED_STATUS ForAllTablets( |
212 | | MiniTabletServer* mts, |
213 | 0 | std::function<Status(TabletPeer* tablet_peer)> action) { |
214 | 0 | if (!mts->server()) { |
215 | 0 | return STATUS(IllegalState, "Server is not running"); |
216 | 0 | } |
217 | 0 | auto tablets = mts->server()->tablet_manager()->GetTabletPeers(); |
218 | 0 | for (const auto& tablet : tablets) { |
219 | 0 | RETURN_NOT_OK(action(tablet.get())); |
220 | 0 | } |
221 | 0 | return Status::OK(); |
222 | 0 | } |
223 | | |
224 | | } // namespace |
225 | | |
226 | 0 | Status MiniTabletServer::FlushTablets(tablet::FlushMode mode, tablet::FlushFlags flags) { |
227 | 0 | if (!server_) { |
228 | 0 | return Status::OK(); |
229 | 0 | } |
230 | 0 | return ForAllTablets(this, [mode, flags](TabletPeer* tablet_peer) -> Status { |
231 | 0 | if (!tablet_peer->tablet()) { |
232 | 0 | return Status::OK(); |
233 | 0 | } |
234 | 0 | return tablet_peer->tablet()->Flush(mode, flags); |
235 | 0 | }); |
236 | 0 | } |
237 | | |
238 | 0 | Status MiniTabletServer::CompactTablets() { |
239 | 0 | if (!server_) { |
240 | 0 | return Status::OK(); |
241 | 0 | } |
242 | 0 | return ForAllTablets(this, [](TabletPeer* tablet_peer) { |
243 | 0 | if (tablet_peer->tablet()) { |
244 | 0 | tablet_peer->tablet()->ForceRocksDBCompactInTest(); |
245 | 0 | } |
246 | 0 | return Status::OK(); |
247 | 0 | }); |
248 | 0 | } |
249 | | |
250 | 0 | Status MiniTabletServer::SwitchMemtables() { |
251 | 0 | return ForAllTablets(this, [](TabletPeer* tablet_peer) { |
252 | 0 | return tablet_peer->tablet()->TEST_SwitchMemtable(); |
253 | 0 | }); |
254 | 0 | } |
255 | | |
256 | 0 | Status MiniTabletServer::CleanTabletLogs() { |
257 | 0 | if (!server_) { |
258 | | // Nothing to clean. |
259 | 0 | return Status::OK(); |
260 | 0 | } |
261 | 0 | return ForAllTablets(this, [](TabletPeer* tablet_peer) { |
262 | 0 | return tablet_peer->RunLogGC(); |
263 | 0 | }); |
264 | 0 | } |
265 | | |
266 | 0 | Status MiniTabletServer::Restart() { |
267 | 0 | CHECK(started_); |
268 | 0 | Shutdown(); |
269 | 0 | return Start(); |
270 | 0 | } |
271 | | |
272 | 0 | Status MiniTabletServer::RestartStoppedServer() { |
273 | 0 | Shutdown(); |
274 | 0 | return Start(); |
275 | 0 | } |
276 | | |
277 | 40 | RaftConfigPB MiniTabletServer::CreateLocalConfig() const { |
278 | 40 | CHECK(started_) << "Must Start()"0 ; |
279 | 40 | RaftConfigPB config; |
280 | 40 | RaftPeerPB* peer = config.add_peers(); |
281 | 40 | peer->set_permanent_uuid(server_->instance_pb().permanent_uuid()); |
282 | 40 | peer->set_member_type(consensus::PeerMemberType::VOTER); |
283 | 40 | auto host_port = peer->mutable_last_known_private_addr()->Add(); |
284 | 40 | host_port->set_host(bound_rpc_addr().address().to_string()); |
285 | 40 | host_port->set_port(bound_rpc_addr().port()); |
286 | 40 | return config; |
287 | 40 | } |
288 | | |
289 | | Status MiniTabletServer::AddTestTablet(const std::string& ns_id, |
290 | | const std::string& table_id, |
291 | | const std::string& tablet_id, |
292 | | const Schema& schema, |
293 | 31 | TableType table_type) { |
294 | 31 | return AddTestTablet(ns_id, table_id, tablet_id, schema, CreateLocalConfig(), table_type); |
295 | 31 | } |
296 | | |
297 | | Status MiniTabletServer::AddTestTablet(const std::string& ns_id, |
298 | | const std::string& table_id, |
299 | | const std::string& tablet_id, |
300 | | const Schema& schema, |
301 | | const RaftConfigPB& config, |
302 | 31 | TableType table_type) { |
303 | 31 | CHECK(started_) << "Must Start()"0 ; |
304 | 31 | Schema schema_with_ids = SchemaBuilder(schema).Build(); |
305 | 31 | pair<PartitionSchema, Partition> partition = tablet::CreateDefaultPartition(schema_with_ids); |
306 | | |
307 | 31 | auto table_info = std::make_shared<tablet::TableInfo>( |
308 | 31 | table_id, ns_id, table_id, table_type, schema_with_ids, IndexMap(), |
309 | 31 | boost::none /* index_info */, 0 /* schema_version */, partition.first); |
310 | | |
311 | 31 | return ResultToStatus(server_->tablet_manager()->CreateNewTablet( |
312 | 31 | table_info, tablet_id, partition.second, config)); |
313 | 31 | } |
314 | | |
315 | 7 | void MiniTabletServer::FailHeartbeats(bool fail_heartbeats_for_tests) { |
316 | 7 | server_->set_fail_heartbeats_for_tests(fail_heartbeats_for_tests); |
317 | 7 | } |
318 | | |
319 | 227 | Endpoint MiniTabletServer::bound_rpc_addr() const { |
320 | 227 | CHECK(started_); |
321 | 227 | return server_->first_rpc_address(); |
322 | 227 | } |
323 | | |
324 | 104 | Endpoint MiniTabletServer::bound_http_addr() const { |
325 | 104 | CHECK(started_); |
326 | 104 | return server_->first_http_address(); |
327 | 104 | } |
328 | | |
329 | 0 | std::string MiniTabletServer::bound_http_addr_str() const { |
330 | 0 | return HostPort::FromBoundEndpoint(bound_http_addr()).ToString(); |
331 | 0 | } |
332 | | |
333 | 0 | std::string MiniTabletServer::bound_rpc_addr_str() const { |
334 | 0 | return HostPort::FromBoundEndpoint(bound_rpc_addr()).ToString(); |
335 | 0 | } |
336 | | |
337 | 0 | FsManager& MiniTabletServer::fs_manager() const { |
338 | 0 | CHECK(started_); |
339 | 0 | return *server_->fs_manager(); |
340 | 0 | } |
341 | | |
342 | | } // namespace tserver |
343 | | } // namespace yb |